본문 바로가기

AI/Toy Project

[NLP] Attention을 이용하여 한-영 번역기 만들기

Github 링크

 

GitHub - RealMyeong/Going_Deeper_NLP

Contribute to RealMyeong/Going_Deeper_NLP development by creating an account on GitHub.

github.com

사용 데이터 링크

 

GitHub - jungyeul/korean-parallel-corpora: Korean Parallel Corpus

Korean Parallel Corpus. Contribute to jungyeul/korean-parallel-corpora development by creating an account on GitHub.

github.com


1. 데이터 확인 및 전처리

import tensorflow as tf
import numpy as np

from sklearn.model_selection import train_test_split

import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import matplotlib.font_manager as fm
# matplotlib에 한글 지원이 기본적으로 안되기때문에 설정을 따로 해줍니다.
fontpath = '/usr/share/fonts/truetype/nanum/NanumBarunGothic.ttf'
font = fm.FontProperties(fname=fontpath, size=9)
plt.rc('font', family='NanumBarunGothic') 
mpl.font_manager.findfont(font)

import time
import re
import io

en_file_path = '.../korean-english-park.train.en'
ko_file_path = '.../korean-english-park.train.ko'

# 파일 읽어들이고 10개만 한 번 확인 해줍니다.
with open(en_file_path, 'r') as f:
  en_raw = f.read().splitlines()
 
print('Data Size: ', len(en_raw))
print('Examples: ')

for sen in en_raw[:10]:
  print('>>', sen)

with open(ko_file_path, 'r') as f:
  ko_raw = f.read().splitlines()

print('Data Size: ', len(ko_raw))
print('Examples: ')

for sen in ko_raw[0:10]:
  print('>>', sen)

데이터의 중복이 있을 수 있기 때문에 중복 치를 제거해줍니다. 

이때 중복을 허락하지 않는 특징이 있는 set(.)을 사용해줍니다. 

하지만! 번역에는 source와 target의 쌍이 흐트러지면 안 되기 때문에 순서를 보존하지 않는 set(.) 사용할 때 조심해야 합니다. 

zip(.)을 이용해 두 개를 묶어서 동시에 진행해주겠습니다.

 

그리고 데이터를 살펴보면 한 문장 내에서도 같은 내용이 반복되는 경우가 있습니다. 

이를 drop_duplicate_in_sen() 함수를 정의하여 처리해주겠습니다.

sen_zip = zip(ko_raw, en_raw)
sen_zip = set(sen_zip)
cleaned_ko, cleaned_en = zip(*sen_zip)

def drop_duplicates_in_sen(sen):
  sen = sen.split()
  drop_dup_sen = []
  dup_sen = []
  for i in sen:
    if i not in drop_dup_sen:
      drop_dup_sen.append(i)
  sen = ' '.join(drop_dup_sen)

  return sen

non_dup_ko = [drop_duplicates_in_sen(x) for x in cleaned_ko]
non_dup_en = [drop_duplicates_in_sen(x) for x in cleaned_en]


print(len(non_dup_ko), len(non_dup_en))

# output
78968 78968

 

중복은 지워줬으니 이제 전처리를 하겠습니다. 

전처리는 아래 기능을 포함하는 전처리 함수를 정의하여 진행해줍니다. 기능은 아래와 같습니다.

  1. 문장 부호('?', '!', '.', ',') 앞 뒤로 공백 하나씩 추가
  2. 큰 따옴표(" ") 삭제
  3. 숫자, 한글, 알파벳 제외하고 공백으로 바꿔줌
def preprocess_sentence(sentence, s_token=False, e_token=False, ko=False):
    # 한국어 전처리 할 때는 ko=True로 입력해줍니다.
    if ko:
      sentence = sentence.strip()
      sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
      sentence = re.sub(r'[" "]+', " ", sentence)
      sentence = re.sub(r"[^0-9a-zA-Z가-힣?.!,]+", " ", sentence)
      sentence = sentence.strip()

    else:
      sentence = sentence.lower().strip()
      sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
      sentence = re.sub(r'[" "]+', " ", sentence)
      sentence = re.sub(r"[^0-9a-zA-Z?.!,]+", " ", sentence)

      sentence = sentence.strip()
    
    # 디코더 문장의 앞 뒤에는 특수한 토큰이 필요하기 때문에 파라미터로 받아서 설정해줍니다.
    if s_token:
        sentence = '<start> ' + sentence

    if e_token:
        sentence += ' <end>'
    
    return sentence
# 인코더의 입력이 될 한글 문장
cleaned_ko = [preprocess_sentence(x, ko=True) for x in cleaned_ko] 

# target 문장
cleaned_en = [preprocess_sentence(x, s_token=True, e_token=True) for x in cleaned_en]

 

이번에는 한국어 문장, 영어 문장의 길이 분포를 확인해보겠습니다. 

너무 길거나 너무 짧은 문장은 오히려 학습에 방해가 되기 때문에 삭제해주겠습니다.

%matplotlib inline

min_len = 999
max_len = 0
sum_len = 0

for sen in non_dup_ko:
  length = len(sen)
  if min_len > length:
    min_len = length
  if max_len < length:
    max_len = length
  sum_len += length

print('문장의 최단 길이 : ', min_len)
print('문장의 최장 길이 : ', max_len)
print('문장의 평균 길이 : ', sum_len //len(non_dup_ko))

sentence_length = np.zeros((max_len), dtype=np.int)

# 길이가 같은 문장의 개수
for sen in non_dup_ko:
  sentence_length[len(sen)-1] += 1

plt.bar(range(max_len), sentence_length, width=1.0)
plt.title('Ko Sentence Length Distribution')
plt.show()

%matplotlib inline

min_len = 999
max_len = 0
sum_len = 0

for sen in non_dup_en:
  length = len(sen)
  if min_len > length:
    min_len = length
  if max_len < length:
    max_len = length
  sum_len += length

print('문장의 최단 길이 : ', min_len)
print('문장의 최장 길이 : ', max_len)
print('문장의 평균 길이 : ', sum_len //len(non_dup_en))

sentence_length = np.zeros((max_len), dtype=np.int)

# 길이가 같은 문장의 개수
for sen in non_dup_en:
  sentence_length[len(sen)-1] += 1

plt.bar(range(max_len), sentence_length, width=1.0)
plt.title('Eng Sentence Length Distribution')
plt.show()

문장의 길이를 확인해봤는데 한국어 문장에 비해 영어 문장의 길이가 평균적으로 더 긴 것을 볼 수 있습니다.

따라서 문장의 길이를 기준으로 정리하기보다는 토큰화를 진행한 후 토큰의 개수를 기준으로 데이터를 나눠주겠습니다.

한국어 토크 나이저로는 Konlpy의 Mecab을 사용해줬습니다.

from konlpy.tag import Mecab

mecab = Mecab()
ko_token = [[len(mecab.morphs(sen)),idx] for idx, sen in enumerate(kor_corpus)]
# 영어는 단순 공백을 기준으로 토큰화 진행
en_token = [[len(sen.split()),idx] for idx, sen in enumerate(eng_corpus)]

print('한국어 문장 최대 토큰 개수 : ',max(ko_token))
print('영어 문장 최대 토큰 개수 : ', max(en_token))
print('한국어 문장 최소 토큰 개수 : ', min(ko_token))
print('영어 문장 최소 토큰 개수 : ', min(en_token))


# output
한국어 문장 최대 토큰 개수 :  [109, 67704]
영어 문장 최대 토큰 개수 :  [82, 47490]
한국어 문장 최소 토큰 개수 :  [1, 6]
영어 문장 최소 토큰 개수 :  [2, 1331]

 

source와 target문장을 선택할 때 두 문장의 토큰 개수의 차이가 5개 이하인 애들만 사용해주겠습니다.

또한 target문장에는 시작 토큰과 마무리 토큰 2개가 추가되었기 때문에 토큰의 개수가 3개 이상인 애들을 사용해줍니다.

filtered_kor_corpus = []
filtered_eng_corpus = []

max_token_ko = 0
max_token_en = 0

for ko, en in zip(ko_token, en_token):
  # 토큰의 개수 차이 5개 이하이면서 영어는 최소 3개 이상
  if -5 <= (ko[0] - en[0]) <= 5 and en[0] > 2:
    filtered_kor_corpus.append(kor_corpus[ko[1]])
    filtered_eng_corpus.append(eng_corpus[en[1]])
    
    if max_token_ko < ko[0]:
      max_token_ko = ko[0]
    if max_token_en < en[0]:
      max_token_en = en[0]


# 각 언어별 최대 토큰의 개수와
# 남아있는 데이터의 수 출력
print(max_token_ko)
print(max_token_en)
print(len(filtered_kor_corpus))
print(len(filtered_eng_corpus))

# output
61
63
38382
38382

2. 데이터 토큰화 및 패딩 진행

앞에서 사용할 데이터를 뽑았으니 모델에 사용하기 위해 토크나이저를 만들어주고, 토큰화 진행 후 패딩까지 진행해주겠습니다.

 

# 패딩 최대 길이
max_len = 63

# 토큰화 함수 정의
# 파라미터 설정을 통해 언어에 따라 다르게 진행
def tokenize(corpus, ko=False):
    if ko:
      mecab = Mecab()
      morph = [(mecab.morphs(sen)) for sen in corpus]
      tokenizer = tf.keras.preprocessing.text.Tokenizer(filters='')
      tokenizer.fit_on_texts(morph)
      tensor = tokenizer.texts_to_sequences(morph)
      # 인코더의 입력으로 들어가는 시퀀스는 역순으로 뒤집어서 넣어줌
      tensor = [list(reversed(sen)) for sen in tensor]
      tensor = tf.keras.preprocessing.sequence.pad_sequences(tensor, padding='pre', maxlen=max_len)

    else:
      tokenizer = tf.keras.preprocessing.text.Tokenizer(filters='')
      tokenizer.fit_on_texts(corpus)

      tensor = tokenizer.texts_to_sequences(corpus)

      tensor = tf.keras.preprocessing.sequence.pad_sequences(tensor, padding='post', maxlen=max_len)

    return tensor, tokenizer
# 토큰화하기
enc_tensor, enc_tokenizer = tokenize(filtered_kor_corpus, ko=True)
dec_tensor, dec_tokenizer = tokenize(filtered_eng_corpus)

enc_train, enc_test, dec_train, dec_test = train_test_split(enc_tensor, dec_tensor, test_size=0.1)

print("Korean Vocab Size:", len(enc_tokenizer.index_word))  
print("English Vocab Size:", len(dec_tokenizer.index_word))

# output
Korean Vocab Size: 31275
English Vocab Size: 32442

# 토큰화와 패딩이 잘 진행됐는지 한 번 확인해줌

3. 모델 설계

어텐션 메커니즘과 GRU를 사용하여 한-영 번역기를 설계해줍니다.

# 어텐션 클래스 정의 
class BahdanauAttention(tf.keras.layers.Layer):
    def __init__(self, units):
        super(BahdanauAttention, self).__init__()
        self.w_dec = tf.keras.layers.Dense(units)
        self.w_enc = tf.keras.layers.Dense(units)
        self.w_com = tf.keras.layers.Dense(1)
    
    def call(self, h_enc, h_dec):
        # h_enc shape: [batch x length x units]
        # h_dec shape: [batch x units]

        h_enc = self.w_enc(h_enc)
        h_dec = tf.expand_dims(h_dec, 1)
        h_dec = self.w_dec(h_dec)

        score = self.w_com(tf.nn.tanh(h_dec + h_enc))
        
        attn = tf.nn.softmax(score, axis=1)

        context_vec = attn * h_enc
        context_vec = tf.reduce_sum(context_vec, axis=1)

        return context_vec, attn

# 인코더 클래스
class Encoder(tf.keras.Model):
    def __init__(self, vocab_size, embedding_dim, enc_units):
        super(Encoder, self).__init__()
        self.enc_units = enc_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(enc_units, return_sequences=True)
		
    def call(self, x):
        x = self.embedding(x)

        output = self.gru(x)

        return output

# 디코더 클래스
class Decoder(tf.keras.Model):
    def __init__(self, vocab_size, embedding_dim, dec_units):
        super(Decoder, self).__init__()
        self.dec_units = dec_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(dec_units, return_sequences=True, return_state=True)
        self.fc = tf.keras.layers.Dense(vocab_size)
        self.attention = BahdanauAttention(self.dec_units)
        
    def call(self, x, h_dec, enc_out):
        context_vec, attn = self.attention(enc_out, h_dec)

        x = self.embedding(x)
        x = tf.concat([tf.expand_dims(context_vec, 1), x], axis=-1)

        x, h_dec = self.gru(x)
        x = tf.reshape(x, (-1, x.shape[2]))
        x = self.fc(x)

        return x, h_dec, attn

# optimizer, loss 정의
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0002)
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')

def loss_function(real, pred):
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    loss = loss_object(real, pred)
    
    mask = tf.cast(mask, dtype=loss.dtype)
    loss *= mask
    
    return tf.reduce_mean(loss)

# train step 정의
@tf.function
def train_step(src, tgt, encoder, decoder, optimizer, dec_tok):
    bsz = src.shape[0]
    loss = 0

    with tf.GradientTape() as tape:
        enc_out = encoder(src)
        h_dec = enc_out[:, -1]
        
        dec_src = tf.expand_dims([dec_tok.word_index['<start>']] * bsz, 1)

        for t in range(1, tgt.shape[1]):
            pred, h_dec, _ = decoder(dec_src, h_dec, enc_out)

            loss += loss_function(tgt[:, t], pred)
            dec_src = tf.expand_dims(tgt[:, t], 1)
        
    batch_loss = (loss / int(tgt.shape[1]))

    variables = encoder.trainable_variables + decoder.trainable_variables
    gradients = tape.gradient(loss, variables)
    optimizer.apply_gradients(zip(gradients, variables))
    
    return batch_loss

4. 훈련 후 결과 시각화

  • Loss 그래프

  • 실험 결과표

 

결과를 보면 초반에는 loss가 고만고만하다가 확 내려가는 부분이 있는데 바로 패딩의 위치를 인코더와 디코더에 다르게 적용했을 때입니다.

순환 신경망의 특성상 앞에서부터 순차적으로 정보를 저장하다 보니 예측을 해야 하는 단어와 정보를 가져와야 할 단어의 거리가 가까우면 좋겠다는 생각에 인코더에서는 패딩을 앞쪽으로, 디코더에서는 패딩을 뒤쪽으로 줬더니 성능이 올라갔습니다. 

 

다음 실험(8th)에서는 저번에 seq2seq 논문에서 봤던 source문장의 순서를 뒤집어서 입력을 해줬더니 성능이 더 좋았다고 한 게 생각이 나서 뒤집어서 넣어봤습니다. 그랬더니 loss가 눈에 띄게 줄어든 것을 볼 수 있습니다. 

 

근데 패딩 부분에 마스킹도 해주고, attention메커니즘을 사용했는데도 이렇게 입력과 예측 단어 사이의 거리가 가까워지니 성능이 올라가는 게 조금 이해가 안 되네요.... ㅋㅋㅋ 아무리 어텐션 메커니즘을 사용해도 recurrent기반의 한계는 있나 봅니다. 이렇게 여러 가지 실험을 해보면서 loss를 줄인다고 줄였는데도 번역은 잘 안되긴 하더군요.... 다음에는 Transformer를 이용해서 한 번 번역기를 만들어보겠습니다.

'AI > Toy Project' 카테고리의 다른 글

뉴스 요약봇 만들기  (1) 2022.09.22
seq2seq 모델을 이용한 번역기 만들기  (1) 2022.09.22
네이버 영화리뷰 감성분석하기  (1) 2022.09.21