1. 데이터 확인 및 전처리
import tensorflow as tf
import numpy as np
from sklearn.model_selection import train_test_split
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import matplotlib.font_manager as fm
# matplotlib에 한글 지원이 기본적으로 안되기때문에 설정을 따로 해줍니다.
fontpath = '/usr/share/fonts/truetype/nanum/NanumBarunGothic.ttf'
font = fm.FontProperties(fname=fontpath, size=9)
plt.rc('font', family='NanumBarunGothic')
mpl.font_manager.findfont(font)
import time
import re
import io
en_file_path = '.../korean-english-park.train.en'
ko_file_path = '.../korean-english-park.train.ko'
# 파일 읽어들이고 10개만 한 번 확인 해줍니다.
with open(en_file_path, 'r') as f:
en_raw = f.read().splitlines()
print('Data Size: ', len(en_raw))
print('Examples: ')
for sen in en_raw[:10]:
print('>>', sen)
with open(ko_file_path, 'r') as f:
ko_raw = f.read().splitlines()
print('Data Size: ', len(ko_raw))
print('Examples: ')
for sen in ko_raw[0:10]:
print('>>', sen)
데이터의 중복이 있을 수 있기 때문에 중복 치를 제거해줍니다.
이때 중복을 허락하지 않는 특징이 있는 set(.)을 사용해줍니다.
하지만! 번역에는 source와 target의 쌍이 흐트러지면 안 되기 때문에 순서를 보존하지 않는 set(.) 사용할 때 조심해야 합니다.
zip(.)을 이용해 두 개를 묶어서 동시에 진행해주겠습니다.
그리고 데이터를 살펴보면 한 문장 내에서도 같은 내용이 반복되는 경우가 있습니다.
이를 drop_duplicate_in_sen() 함수를 정의하여 처리해주겠습니다.
sen_zip = zip(ko_raw, en_raw)
sen_zip = set(sen_zip)
cleaned_ko, cleaned_en = zip(*sen_zip)
def drop_duplicates_in_sen(sen):
sen = sen.split()
drop_dup_sen = []
dup_sen = []
for i in sen:
if i not in drop_dup_sen:
drop_dup_sen.append(i)
sen = ' '.join(drop_dup_sen)
return sen
non_dup_ko = [drop_duplicates_in_sen(x) for x in cleaned_ko]
non_dup_en = [drop_duplicates_in_sen(x) for x in cleaned_en]
print(len(non_dup_ko), len(non_dup_en))
# output
78968 78968
중복은 지워줬으니 이제 전처리를 하겠습니다.
전처리는 아래 기능을 포함하는 전처리 함수를 정의하여 진행해줍니다. 기능은 아래와 같습니다.
- 문장 부호('?', '!', '.', ',') 앞 뒤로 공백 하나씩 추가
- 큰 따옴표(" ") 삭제
- 숫자, 한글, 알파벳 제외하고 공백으로 바꿔줌
def preprocess_sentence(sentence, s_token=False, e_token=False, ko=False):
# 한국어 전처리 할 때는 ko=True로 입력해줍니다.
if ko:
sentence = sentence.strip()
sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
sentence = re.sub(r'[" "]+', " ", sentence)
sentence = re.sub(r"[^0-9a-zA-Z가-힣?.!,]+", " ", sentence)
sentence = sentence.strip()
else:
sentence = sentence.lower().strip()
sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
sentence = re.sub(r'[" "]+', " ", sentence)
sentence = re.sub(r"[^0-9a-zA-Z?.!,]+", " ", sentence)
sentence = sentence.strip()
# 디코더 문장의 앞 뒤에는 특수한 토큰이 필요하기 때문에 파라미터로 받아서 설정해줍니다.
if s_token:
sentence = '<start> ' + sentence
if e_token:
sentence += ' <end>'
return sentence
# 인코더의 입력이 될 한글 문장
cleaned_ko = [preprocess_sentence(x, ko=True) for x in cleaned_ko]
# target 문장
cleaned_en = [preprocess_sentence(x, s_token=True, e_token=True) for x in cleaned_en]
이번에는 한국어 문장, 영어 문장의 길이 분포를 확인해보겠습니다.
너무 길거나 너무 짧은 문장은 오히려 학습에 방해가 되기 때문에 삭제해주겠습니다.
%matplotlib inline
min_len = 999
max_len = 0
sum_len = 0
for sen in non_dup_ko:
length = len(sen)
if min_len > length:
min_len = length
if max_len < length:
max_len = length
sum_len += length
print('문장의 최단 길이 : ', min_len)
print('문장의 최장 길이 : ', max_len)
print('문장의 평균 길이 : ', sum_len //len(non_dup_ko))
sentence_length = np.zeros((max_len), dtype=np.int)
# 길이가 같은 문장의 개수
for sen in non_dup_ko:
sentence_length[len(sen)-1] += 1
plt.bar(range(max_len), sentence_length, width=1.0)
plt.title('Ko Sentence Length Distribution')
plt.show()
%matplotlib inline
min_len = 999
max_len = 0
sum_len = 0
for sen in non_dup_en:
length = len(sen)
if min_len > length:
min_len = length
if max_len < length:
max_len = length
sum_len += length
print('문장의 최단 길이 : ', min_len)
print('문장의 최장 길이 : ', max_len)
print('문장의 평균 길이 : ', sum_len //len(non_dup_en))
sentence_length = np.zeros((max_len), dtype=np.int)
# 길이가 같은 문장의 개수
for sen in non_dup_en:
sentence_length[len(sen)-1] += 1
plt.bar(range(max_len), sentence_length, width=1.0)
plt.title('Eng Sentence Length Distribution')
plt.show()
문장의 길이를 확인해봤는데 한국어 문장에 비해 영어 문장의 길이가 평균적으로 더 긴 것을 볼 수 있습니다.
따라서 문장의 길이를 기준으로 정리하기보다는 토큰화를 진행한 후 토큰의 개수를 기준으로 데이터를 나눠주겠습니다.
한국어 토크 나이저로는 Konlpy의 Mecab을 사용해줬습니다.
from konlpy.tag import Mecab
mecab = Mecab()
ko_token = [[len(mecab.morphs(sen)),idx] for idx, sen in enumerate(kor_corpus)]
# 영어는 단순 공백을 기준으로 토큰화 진행
en_token = [[len(sen.split()),idx] for idx, sen in enumerate(eng_corpus)]
print('한국어 문장 최대 토큰 개수 : ',max(ko_token))
print('영어 문장 최대 토큰 개수 : ', max(en_token))
print('한국어 문장 최소 토큰 개수 : ', min(ko_token))
print('영어 문장 최소 토큰 개수 : ', min(en_token))
# output
한국어 문장 최대 토큰 개수 : [109, 67704]
영어 문장 최대 토큰 개수 : [82, 47490]
한국어 문장 최소 토큰 개수 : [1, 6]
영어 문장 최소 토큰 개수 : [2, 1331]
source와 target문장을 선택할 때 두 문장의 토큰 개수의 차이가 5개 이하인 애들만 사용해주겠습니다.
또한 target문장에는 시작 토큰과 마무리 토큰 2개가 추가되었기 때문에 토큰의 개수가 3개 이상인 애들을 사용해줍니다.
filtered_kor_corpus = []
filtered_eng_corpus = []
max_token_ko = 0
max_token_en = 0
for ko, en in zip(ko_token, en_token):
# 토큰의 개수 차이 5개 이하이면서 영어는 최소 3개 이상
if -5 <= (ko[0] - en[0]) <= 5 and en[0] > 2:
filtered_kor_corpus.append(kor_corpus[ko[1]])
filtered_eng_corpus.append(eng_corpus[en[1]])
if max_token_ko < ko[0]:
max_token_ko = ko[0]
if max_token_en < en[0]:
max_token_en = en[0]
# 각 언어별 최대 토큰의 개수와
# 남아있는 데이터의 수 출력
print(max_token_ko)
print(max_token_en)
print(len(filtered_kor_corpus))
print(len(filtered_eng_corpus))
# output
61
63
38382
38382
2. 데이터 토큰화 및 패딩 진행
앞에서 사용할 데이터를 뽑았으니 모델에 사용하기 위해 토크나이저를 만들어주고, 토큰화 진행 후 패딩까지 진행해주겠습니다.
# 패딩 최대 길이
max_len = 63
# 토큰화 함수 정의
# 파라미터 설정을 통해 언어에 따라 다르게 진행
def tokenize(corpus, ko=False):
if ko:
mecab = Mecab()
morph = [(mecab.morphs(sen)) for sen in corpus]
tokenizer = tf.keras.preprocessing.text.Tokenizer(filters='')
tokenizer.fit_on_texts(morph)
tensor = tokenizer.texts_to_sequences(morph)
# 인코더의 입력으로 들어가는 시퀀스는 역순으로 뒤집어서 넣어줌
tensor = [list(reversed(sen)) for sen in tensor]
tensor = tf.keras.preprocessing.sequence.pad_sequences(tensor, padding='pre', maxlen=max_len)
else:
tokenizer = tf.keras.preprocessing.text.Tokenizer(filters='')
tokenizer.fit_on_texts(corpus)
tensor = tokenizer.texts_to_sequences(corpus)
tensor = tf.keras.preprocessing.sequence.pad_sequences(tensor, padding='post', maxlen=max_len)
return tensor, tokenizer
# 토큰화하기
enc_tensor, enc_tokenizer = tokenize(filtered_kor_corpus, ko=True)
dec_tensor, dec_tokenizer = tokenize(filtered_eng_corpus)
enc_train, enc_test, dec_train, dec_test = train_test_split(enc_tensor, dec_tensor, test_size=0.1)
print("Korean Vocab Size:", len(enc_tokenizer.index_word))
print("English Vocab Size:", len(dec_tokenizer.index_word))
# output
Korean Vocab Size: 31275
English Vocab Size: 32442
# 토큰화와 패딩이 잘 진행됐는지 한 번 확인해줌
3. 모델 설계
어텐션 메커니즘과 GRU를 사용하여 한-영 번역기를 설계해줍니다.
# 어텐션 클래스 정의
class BahdanauAttention(tf.keras.layers.Layer):
def __init__(self, units):
super(BahdanauAttention, self).__init__()
self.w_dec = tf.keras.layers.Dense(units)
self.w_enc = tf.keras.layers.Dense(units)
self.w_com = tf.keras.layers.Dense(1)
def call(self, h_enc, h_dec):
# h_enc shape: [batch x length x units]
# h_dec shape: [batch x units]
h_enc = self.w_enc(h_enc)
h_dec = tf.expand_dims(h_dec, 1)
h_dec = self.w_dec(h_dec)
score = self.w_com(tf.nn.tanh(h_dec + h_enc))
attn = tf.nn.softmax(score, axis=1)
context_vec = attn * h_enc
context_vec = tf.reduce_sum(context_vec, axis=1)
return context_vec, attn
# 인코더 클래스
class Encoder(tf.keras.Model):
def __init__(self, vocab_size, embedding_dim, enc_units):
super(Encoder, self).__init__()
self.enc_units = enc_units
self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
self.gru = tf.keras.layers.GRU(enc_units, return_sequences=True)
def call(self, x):
x = self.embedding(x)
output = self.gru(x)
return output
# 디코더 클래스
class Decoder(tf.keras.Model):
def __init__(self, vocab_size, embedding_dim, dec_units):
super(Decoder, self).__init__()
self.dec_units = dec_units
self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
self.gru = tf.keras.layers.GRU(dec_units, return_sequences=True, return_state=True)
self.fc = tf.keras.layers.Dense(vocab_size)
self.attention = BahdanauAttention(self.dec_units)
def call(self, x, h_dec, enc_out):
context_vec, attn = self.attention(enc_out, h_dec)
x = self.embedding(x)
x = tf.concat([tf.expand_dims(context_vec, 1), x], axis=-1)
x, h_dec = self.gru(x)
x = tf.reshape(x, (-1, x.shape[2]))
x = self.fc(x)
return x, h_dec, attn
# optimizer, loss 정의
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0002)
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True, reduction='none')
def loss_function(real, pred):
mask = tf.math.logical_not(tf.math.equal(real, 0))
loss = loss_object(real, pred)
mask = tf.cast(mask, dtype=loss.dtype)
loss *= mask
return tf.reduce_mean(loss)
# train step 정의
@tf.function
def train_step(src, tgt, encoder, decoder, optimizer, dec_tok):
bsz = src.shape[0]
loss = 0
with tf.GradientTape() as tape:
enc_out = encoder(src)
h_dec = enc_out[:, -1]
dec_src = tf.expand_dims([dec_tok.word_index['<start>']] * bsz, 1)
for t in range(1, tgt.shape[1]):
pred, h_dec, _ = decoder(dec_src, h_dec, enc_out)
loss += loss_function(tgt[:, t], pred)
dec_src = tf.expand_dims(tgt[:, t], 1)
batch_loss = (loss / int(tgt.shape[1]))
variables = encoder.trainable_variables + decoder.trainable_variables
gradients = tape.gradient(loss, variables)
optimizer.apply_gradients(zip(gradients, variables))
return batch_loss
4. 훈련 후 결과 시각화
- Loss 그래프
- 실험 결과표
결과를 보면 초반에는 loss가 고만고만하다가 확 내려가는 부분이 있는데 바로 패딩의 위치를 인코더와 디코더에 다르게 적용했을 때입니다.
순환 신경망의 특성상 앞에서부터 순차적으로 정보를 저장하다 보니 예측을 해야 하는 단어와 정보를 가져와야 할 단어의 거리가 가까우면 좋겠다는 생각에 인코더에서는 패딩을 앞쪽으로, 디코더에서는 패딩을 뒤쪽으로 줬더니 성능이 올라갔습니다.
다음 실험(8th)에서는 저번에 seq2seq 논문에서 봤던 source문장의 순서를 뒤집어서 입력을 해줬더니 성능이 더 좋았다고 한 게 생각이 나서 뒤집어서 넣어봤습니다. 그랬더니 loss가 눈에 띄게 줄어든 것을 볼 수 있습니다.
근데 패딩 부분에 마스킹도 해주고, attention메커니즘을 사용했는데도 이렇게 입력과 예측 단어 사이의 거리가 가까워지니 성능이 올라가는 게 조금 이해가 안 되네요.... ㅋㅋㅋ 아무리 어텐션 메커니즘을 사용해도 recurrent기반의 한계는 있나 봅니다. 이렇게 여러 가지 실험을 해보면서 loss를 줄인다고 줄였는데도 번역은 잘 안되긴 하더군요.... 다음에는 Transformer를 이용해서 한 번 번역기를 만들어보겠습니다.
'AI > Toy Project' 카테고리의 다른 글
뉴스 요약봇 만들기 (1) | 2022.09.22 |
---|---|
seq2seq 모델을 이용한 번역기 만들기 (1) | 2022.09.22 |
네이버 영화리뷰 감성분석하기 (1) | 2022.09.21 |