LSTMによる文書分類#

import pandas as pd
import numpy as np
import torch
import re
import nltk
from nltk.tokenize import word_tokenize
nltk.download('punkt')
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset
import torch.nn as nn
import torch.optim as optim
from collections import Counter
from torchtext.vocab import vocab
from sklearn.metrics import accuracy_score, f1_score
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
[nltk_data] Downloading package punkt to /Users/ryozawau/nltk_data...
[nltk_data]   Package punkt is already up-to-date!

データ準備#

CSVファイルを読み込む#

df= pd.read_csv('./Data/twitter_training.csv',names=['index','brand','sentiment','text'])
df.head()
index brand sentiment text
0 2401 Borderlands Positive im getting on borderlands and i will murder yo...
1 2401 Borderlands Positive I am coming to the borders and I will kill you...
2 2401 Borderlands Positive im getting on borderlands and i will kill you ...
3 2401 Borderlands Positive im coming on borderlands and i will murder you...
4 2401 Borderlands Positive im getting on borderlands 2 and i will murder ...

ラベルデータの処理#

df["label"]=df["sentiment"].replace({"Positive":2,"Negative":0,"Neutral":1,"Irrelevant":np.nan})
df.dropna(inplace=True)
df.head()
index brand sentiment text label
0 2401 Borderlands Positive im getting on borderlands and i will murder yo... 2.0
1 2401 Borderlands Positive I am coming to the borders and I will kill you... 2.0
2 2401 Borderlands Positive im getting on borderlands and i will kill you ... 2.0
3 2401 Borderlands Positive im coming on borderlands and i will murder you... 2.0
4 2401 Borderlands Positive im getting on borderlands 2 and i will murder ... 2.0

テキストデータの前処理#

  • テキストを小文字に変換

  • 句読点を削除

  • トークン化

  • 単語ID化

Tokenization#

def preprocess_text(text):
    text = text.lower()  # Lowercasing
    text = re.sub(r'\W+', ' ', text)  # Remove punctuation
    tokens = word_tokenize(text)  # Tokenization
    return tokens
df["processed_text"]=df["text"].apply(preprocess_text)
df.head()
index brand sentiment text label processed_text
0 2401 Borderlands Positive im getting on borderlands and i will murder yo... 2.0 [im, getting, on, borderlands, and, i, will, m...
1 2401 Borderlands Positive I am coming to the borders and I will kill you... 2.0 [i, am, coming, to, the, borders, and, i, will...
2 2401 Borderlands Positive im getting on borderlands and i will kill you ... 2.0 [im, getting, on, borderlands, and, i, will, k...
3 2401 Borderlands Positive im coming on borderlands and i will murder you... 2.0 [im, coming, on, borderlands, and, i, will, mu...
4 2401 Borderlands Positive im getting on borderlands 2 and i will murder ... 2.0 [im, getting, on, borderlands, 2, and, i, will...

単語辞書#

Vocabは、各単語(トークン)に対して一意のインデックス(またはID)を割り当てます。このマッピングにより、テキストデータを数値データに変換することができます。

counter = Counter()
for line in df["processed_text"]:
    counter.update(line)
Vocab = vocab(counter, min_freq=1)
# 単語からインデックスへのマッピング
word_to_index = Vocab.get_stoi()

# 最初の5つのアイテムを取得して表示
for i, (word, index) in enumerate(word_to_index.items()):
    if i >= 5:  # 最初の5つのアイテムのみ表示
        break
    print(f"'{word}': {index}")
'gfn': 26881
'elim': 26879
'maxbit': 26876
'challen': 26875
'techsall': 26874
df['numericalized_text'] = df["processed_text"].apply(lambda x: [Vocab[token] for token in x])
df.head()
index brand sentiment text label processed_text numericalized_text
0 2401 Borderlands Positive im getting on borderlands and i will murder yo... 2.0 [im, getting, on, borderlands, and, i, will, m... [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
1 2401 Borderlands Positive I am coming to the borders and I will kill you... 2.0 [i, am, coming, to, the, borders, and, i, will... [5, 10, 11, 12, 13, 14, 4, 5, 6, 15, 8, 9]
2 2401 Borderlands Positive im getting on borderlands and i will kill you ... 2.0 [im, getting, on, borderlands, and, i, will, k... [0, 1, 2, 3, 4, 5, 6, 15, 8, 9]
3 2401 Borderlands Positive im coming on borderlands and i will murder you... 2.0 [im, coming, on, borderlands, and, i, will, mu... [0, 11, 2, 3, 4, 5, 6, 7, 8, 9]
4 2401 Borderlands Positive im getting on borderlands 2 and i will murder ... 2.0 [im, getting, on, borderlands, 2, and, i, will... [0, 1, 2, 3, 16, 4, 5, 6, 7, 8, 17, 9]
def pad_sequences(seq, max_len):
    padded = np.zeros((max_len,), dtype=np.int64)
    if len(seq) > max_len: padded[:] = seq[:max_len]
    else: padded[:len(seq)] = seq
    return padded

Padding#

ニューラルネットワークは、入力データが固定長であることを前提としていますので、テキストシーケンスを特定の最大長にパディング(埋める)する必要があります。

df["text_length"]=df["numericalized_text"].apply(lambda x: len(x)) 
df["text_length"].describe()
count    61121.000000
mean        19.455212
std         14.430986
min          0.000000
25%          8.000000
50%         16.000000
75%         27.000000
max        198.000000
Name: text_length, dtype: float64
max_len=30
df['padded_text'] = df['numericalized_text'].apply(lambda x: pad_sequences(x, max_len))
df.head()
index brand sentiment text label processed_text numericalized_text text_length padded_text
0 2401 Borderlands Positive im getting on borderlands and i will murder yo... 2.0 [im, getting, on, borderlands, and, i, will, m... [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 10 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 0, 0, 0, 0, ...
1 2401 Borderlands Positive I am coming to the borders and I will kill you... 2.0 [i, am, coming, to, the, borders, and, i, will... [5, 10, 11, 12, 13, 14, 4, 5, 6, 15, 8, 9] 12 [5, 10, 11, 12, 13, 14, 4, 5, 6, 15, 8, 9, 0, ...
2 2401 Borderlands Positive im getting on borderlands and i will kill you ... 2.0 [im, getting, on, borderlands, and, i, will, k... [0, 1, 2, 3, 4, 5, 6, 15, 8, 9] 10 [0, 1, 2, 3, 4, 5, 6, 15, 8, 9, 0, 0, 0, 0, 0,...
3 2401 Borderlands Positive im coming on borderlands and i will murder you... 2.0 [im, coming, on, borderlands, and, i, will, mu... [0, 11, 2, 3, 4, 5, 6, 7, 8, 9] 10 [0, 11, 2, 3, 4, 5, 6, 7, 8, 9, 0, 0, 0, 0, 0,...
4 2401 Borderlands Positive im getting on borderlands 2 and i will murder ... 2.0 [im, getting, on, borderlands, 2, and, i, will... [0, 1, 2, 3, 16, 4, 5, 6, 7, 8, 17, 9] 12 [0, 1, 2, 3, 16, 4, 5, 6, 7, 8, 17, 9, 0, 0, 0...

学習用データセットの作成(Batch Datasets)#

# Split the original dataset into training plus validation and testing sets
train_val_df, test_df = train_test_split(df, test_size=0.2)

# Split the training plus validation set into separate training and validation sets
train_df, val_df = train_test_split(train_val_df, test_size=0.25)
# Create TensorDatasets
train_data = TensorDataset(torch.LongTensor(train_df['padded_text'].tolist()), torch.LongTensor(train_df['label'].tolist()))
val_data = TensorDataset(torch.LongTensor(val_df['padded_text'].tolist()), torch.LongTensor(val_df['label'].tolist()))
test_data = TensorDataset(torch.LongTensor(test_df['padded_text'].tolist()), torch.LongTensor(test_df['label'].tolist()))

# Create DataLoaders
batch_size = 32
train_loader = DataLoader(train_data, shuffle=True, batch_size=batch_size)
val_loader = DataLoader(val_data, batch_size=batch_size)
test_loader = DataLoader(test_data, batch_size=batch_size)
/var/folders/wm/5xxpvjcj15g89khxd5jwn9200000gn/T/ipykernel_18989/2868894412.py:2: UserWarning: Creating a tensor from a list of numpy.ndarrays is extremely slow. Please consider converting the list to a single numpy.ndarray with numpy.array() before converting to a tensor. (Triggered internally at /Users/runner/work/pytorch/pytorch/pytorch/torch/csrc/utils/tensor_new.cpp:264.)
  train_data = TensorDataset(torch.LongTensor(train_df['padded_text'].tolist()), torch.LongTensor(train_df['label'].tolist()))

モデルの作成#

メソッドの説明#

nn.Embedding#

  • nn.Embeddingは単語の埋め込みを行うために使用されます。単語の埋め込みとは、単語を固定長のベクトルに変換することを指します。このベクトルは、単語の意味的な特性を捉えることができます。nn.Embeddingの主なパラメータは以下の通りです:

    • num_embeddings:埋め込みを行う単語の総数。通常は語彙のサイズに設定します。

    • embedding_dim:各単語の埋め込みベクトルの次元数。

  • nn.Embeddingは、整数のインデックスを入力として受け取り、それに対応する埋め込みベクトルを出力します。

  • 下の例では、inputの各インデックスが対応する埋め込みベクトルに置き換えられ、embeddedはサイズ(batch_size, sequence_length, embedding_dim)のテンソルになります。

nn.Dropout#

  • ドロップアウトは、ニューラルネットワークの訓練中にランダムにノードを「ドロップアウト」(つまり無効化)することで、過学習を防ぐための一般的なテクニックですnn.Dropoutの主なパラメータは以下の通りです:

    • p:ノードをドロップアウトする確率。0.0(ノードをドロップアウトしない)から1.0(全てのノードをドロップアウトする)までの値を取ります。デフォルトは0.5です。

  • nn.Dropoutは、訓練中にのみドロップアウトを適用し、評価(つまりモデルが.eval()モードにあるとき)中にはドロップアウトを適用しません。これは、訓練中にはモデルのロバスト性を向上させるためにランダム性が必要である一方、評価中にはモデルの全ての学習特性を使用して一貫した出力を得る必要があるためです。

embedding = nn.Embedding(num_embeddings=10000, embedding_dim=300)
input = torch.LongTensor([[1, 2, 4, 5], [4, 3, 2, 9]])
embedded = embedding(input)
embedded.shape
torch.Size([2, 4, 300])

モデルの定義#

hidden = self.dropout(torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim = 1))

ここでは、双方向LSTMの最後の隠れ状態を取り扱っています。

双方向LSTMは、順方向と逆方向の2つのLSTMを使用します。順方向のLSTMはシーケンスを通常の順序で処理し、逆方向のLSTMはシーケンスを逆順で処理します。その結果、各時間ステップで2つの隠れ状態(順方向と逆方向のそれぞれから1つずつ)が得られます。

  • hidden[-2,:,:]hidden[-1,:,:]は、それぞれ最後の時間ステップでの順方向と逆方向の隠れ状態を取得しています。

  • torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim = 1)は、これら2つの隠れ状態を結合しています。結合はdim=1(つまり、特徴量の次元)に沿って行われます。

その結果、順方向と逆方向の隠れ状態が1つのベクトルに結合され、そのベクトルは次の全結合層に入力されます。

self.fc = nn.Linear(hidden_dim * 2 if bidirectional else hidden_dim, output_dim)

self.fcは全結合層で、LSTMからの出力を最終的な出力次元に変換します。この出力は、分類タスクのクラス数に等しいなります。

全結合層の入力次元は、LSTMの隠れ状態の次元数に依存します。

  • LSTMが双方向の場合(bidirectional=True)、順方向と逆方向の隠れ状態が結合されるため、隠れ状態の次元数はhidden_dim * 2になります。

  • LSTMが一方向の場合(bidirectional=False)、隠れ状態の次元数はhidden_dimになります。

したがって、nn.Linear(hidden_dim * 2 if bidirectional else hidden_dim, output_dim)は、LSTMの方向性に応じて全結合層の入力次元を適切に設定します。

出力次元output_dimは、タスクのクラス数または回帰の出力次元に設定します。

batch_first=True

batch_first=Trueを設定すると、

  • 入力テンソルの形状は(batch_size, sequence_length, input_size)と解釈されます。つまり、バッチの次元が最初に来ます。

  • outputテンソルの形状は(batch_size, seq_len, num_directions * hidden_size)になります。

batch_first=Trueを使用する主な理由は、多くの場合、バッチの次元を最初に持ってくると、テンソル操作が直感的になり、コードが読みやすくなります。

class LSTMClassifier(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, bidirectional, dropout):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, 
                            hidden_dim, 
                            num_layers=n_layers, 
                            bidirectional=bidirectional, 
                            dropout=dropout, 
                            batch_first=True)
        self.fc = nn.Linear(hidden_dim * 2 if bidirectional else hidden_dim, output_dim)
        self.bidirectional = bidirectional
    
    def forward(self, text):
        embedded = self.embedding(text)
        output, (hidden, cell) = self.lstm(embedded)
        if self.bidirectional:
            hidden = torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim = 1)
        else:
            hidden = hidden[-1,:,:]
        return self.fc(hidden.squeeze(0))
vocab_size = len(Vocab)
embedding_dim = 100  
hidden_dim = 256     
output_dim = 3 
n_layers = 2        
bidirectional = True 
dropout = 0.2        

model = LSTMClassifier(vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, bidirectional, dropout)
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())
def train_model(model, train_loader, val_loader, optimizer, criterion, n_epochs):
    model.train()
    for epoch in range(n_epochs):
        for texts, labels in train_loader:
            texts, labels = texts.to(device), labels.to(device)
            optimizer.zero_grad()
            predictions = model(texts)
            loss = criterion(predictions, labels)
            loss.backward()
            optimizer.step()

        # Validation
        model.eval()
        with torch.no_grad():
            val_labels = []
            val_preds = []
            for texts, labels in val_loader:
                texts, labels = texts.to(device), labels.to(device)
                predictions = model(texts)
                val_labels.extend(labels.tolist())
                val_preds.extend(torch.argmax(predictions, dim=1).tolist())

            accuracy = accuracy_score(val_labels, val_preds)
            f1 = f1_score(val_labels, val_preds, average='weighted')
            print(f"Epoch {epoch+1}, Loss: {loss.item()}, Accuracy: {accuracy}, F1 Score: {f1}")
        model.train()
from torch.utils.tensorboard import SummaryWriter

def train_model(model, train_loader, val_loader, optimizer, criterion, n_epochs, tensorboard=False, tensorboard_path='./runs'):
    # Initialize TensorBoard writer if tensorboard logging is enabled
    writer = SummaryWriter(tensorboard_path) if tensorboard else None

    model.train()
    for epoch in range(n_epochs):
        for texts, labels in train_loader:
            texts, labels = texts.to(device), labels.to(device)
            optimizer.zero_grad()
            predictions = model(texts)
            loss = criterion(predictions, labels)
            loss.backward()
            optimizer.step()

        # Validation
        model.eval()
        with torch.no_grad():
            val_labels = []
            val_preds = []
            for texts, labels in val_loader:
                texts, labels = texts.to(device), labels.to(device)
                predictions = model(texts)
                val_labels.extend(labels.tolist())
                val_preds.extend(torch.argmax(predictions, dim=1).tolist())

            accuracy = accuracy_score(val_labels, val_preds)
            f1 = f1_score(val_labels, val_preds, average='weighted')

            # Log metrics to TensorBoard
            if tensorboard:
                writer.add_scalar('Loss/train', loss.item(), epoch)
                writer.add_scalar('Accuracy/val', accuracy, epoch)
                writer.add_scalar('F1-Score/val', f1, epoch)

            print(f"Epoch {epoch+1}, Loss: {loss.item()}, Accuracy: {accuracy}, F1 Score: {f1}")

        model.train()

    # Close the TensorBoard writer
    if tensorboard:
        writer.close()
# Train the model
n_epochs = 30
#train_model(model, train_loader, val_loader, optimizer, criterion, n_epochs, tensorboard=True, tensorboard_path='./runs/lstm')