본문 바로가기

카테고리 없음

char RNN: 셰익스피어 흉내 내는 문장 생성하기 (훈련 모드, 생성 모드 분기)

char RNN은 낱자로 문장을 생성하는 알고리즘을 말한다. 대표적으로 구글에서 제공하는 예제 코드로서 세익스피어 문장을 흉내내는 코드이다. 

https://www.tensorflow.org/tutorials/text/text_generation?hl=ko 

 

'밑바닥부터 시작하는 딥러닝2'에 나온 PTB 데이터셋을 이용한 문장생성을 응용해, 영화 줄거리 생성을 시도하다가,

데이터 전처리가 너무 까다로워 일단 접어둔 상태다. 그 책에 나온 코드는 PTB라는, 상당히 공을 들여 전처리를 한 데이터를 이용해 단어 기반 문장 생성을 하는 것이라, 다른 종류의 텍스트 데이터를 적용하기에는 상당히 까다로운 코드다.

 

그래서 전처리를 거의 하지 않아도 되는 char RNN의 대표 주자 셰익스피어 문장 생성 코드를 사용해 이것저것 시도해보고 있다. 셰익스피어 char RNN 코드는 별다른 수정없이  다른 텍스트 데이터, 가령 노래 가사, 시, 소설 등에 적용할 수 있다. 이런저런 테스트를 해보니 char RNN의 한계가 보인다. char RNN에 덧붙여 word RNN(단어 기반 문장 생성), sentence RNN(문장 기반 문장들 생성)을 활용하고 감성 분석 및 적용 알고리즘, 전체 맥락 적용 알고리즘(가령 줄거리 적용할 경우), 표절 평가 알고리즘, 등을 혼용해야 제대로 된 수준 높은 문장이 나올 것으로 사료된다.

 

일단은 그 시작점이 char RNN인 것은 다행스런 일이다. char RNN은 전처리를 거의 필요로 하지 않고, 텐서 플로 등을 사용할 경우, 코드도 100~200줄 정도로 간단하게 구현할 수 있기 때문이다. 구글에서 이미 그 코드를 제공하고 있기도 하고.

 

특히 한글 지원도 유니코드 지원 차원에서 파이썬에서 아주 잘 지원하고 있다는 점은 정말 다행스런 일이 아닐 수 없다. 아스키 코드, EUC_KR로 한글을 처리하던 기억을 떠올린다면 정말 좋은 세상이 된 것이다. (하지만 영어의 간결성, 엄격한 문법에 반해 한국어 문어의 복잡성과, 문법의 비정규성 때문에 인공지능에서 아주 다루기 어려운 게 한국어라고 생각한다.)

 

처음에 텍스트 자료를 불러올 때(코드의 첫 부분), utf-8로 디코딩을 해주는 게 중요하다. 리눅스 파이썬에서는 .decode... 코드를 빼도 잘 작동했는데, 윈도우에서는 decode 부분을 뺄 경우, 한글을 바이너리로 인식해 정수로 취급한다. decode 처리를 할 경우, 양쪽 OS 모두, 한글 유니코드 문자열로 처리하게 된다.

 

filename = "korean_poems.txt"

text = open(filename, 'rb').read().decode(encoding='utf-8')

 

또 하나의 문제(제일 중요한 문제)는 훈련 코드와 문장 생성 코드를 분리하는 이슈이다.

 

두어달 전에 이 게시판에 올린 '네이버 영화 평점' 코드에서는 훈련코드와 평가 코드를 분리할 수 있었는데, char RNN 코드에서는 그게 쉽지 않다. 

 

그 이유는 코드에 주석으로 설명해 놓았다. 

 

하나의 코드 파일 안에 분기하여 기능을 구현하면, 훈련 계속과 문장 생성에 필요한 변수값들을 그대로 사용할 수 있기 때문에 아주 편리하고 효율적이다. 

 

1. 훈련을 지속하려면...

훈련할 때마다 처음부터 다시 할 수는 없다. 이전에 학습한 weights값을 이어받아 훈련하고 그 결과를 다시 저장하면, 시간 날 때마다 틈틈이 훈련을 시킬 수 있다.

훈련이 끝날 때마다 모델을 파일로 저장해둔다. 훈련을 재개할 때는 그 파일을 불러와 모델을 생성한다. 이제 이전에 훈련을 마칠 때 상황으로 돌아온 것이다.

 

모델을 불러 올 때, 커스터마이징한 loss 함수를 지정해줘야 하는데, 

 

model = tf.keras.models.load_model('saved_model/my_model', custom_objects={'loss':loss})

 

이 코드를 명기해준다. 커스텀 객체를 지정해주는 것인데 컴파일에 필요한 loss 함수를 지정해줘야 하는데, 정해주지 않으면 loss 함수를 알지 못해서 컴파일을 할 수가 없다. 따라서,

 

코드 2/3 지점 즈음에

def loss(labels, logits): 

로 정의해준 게 있는데, 이 함수명을 전달해 줘야 한다.

 

custom_objects={'loss':loss}

 

컴파일을 하지 않겠다면  compile=False 로 명기하면 loss 함수를 지정해주지 않아도 넘어갈 수 있지만, 그럴 경우는 아마 드물 것이다.

 

훈련을 마치면 이 모델을 다시 저장하면 된다.

 

2. 문장 생성을 할 때

문장 생성을 할 때는 훈련할 때 마련된 변수값들과, 기 훈련된 weights 값들이 필요하다. 저장된 모델을 문장생성에 재활용할 수 없는 까닭은, 문장 생성에 사용되는 모델의 인풋 배치 사이즈는 1인 반면, 훈련 때 LSTM 인풋 데이터로 들어간 배치 사이즈는 64나 128, 256 정도일 것이기 때문이다. 인풋 shape 이 다르므로 기 모델을 활용할 수 없기 때문에, 모델을 새로 생성한다.

 

따라서 훈련 시에 weights 값도 저장을 해둬야 한다. 모델도 저장을 해둬야 하고... 저장된 모델은 훈련 재개에 사용되고, 저장된 가중치값들은 문장 생성에 사용되는 것이다. 훈련 시에 만들어진 온갖 변수값들은 문장 생성용 모델에  꼭 필요하기 때문에, 훈련코드와 문장 생성 코드의 파일을 분리하기가 어려운 것이다!

 

그래서 훈련 재개는,

keep_training = 1 # 혹은 0

 

문장 생성 모드는,

do_generate = 0  # 혹은 1

 

로 활성화, 비활성화 하는 것이다. True, False를 사용해도 마찬가지인데, 0, 1을 사용하는 게 오타 가능성을 줄이고, 타이핑 시간을 절약할 수 있다는...

 

모델과 가중치 저장에 사용되는 폴더는 미리 만들어둬야 한다.

 

기존 코드 마지막 즈음에,

 

keep_training = 0 # 혹은 1

do_generate = 0 # 혹은 1

 

if do_generate == False:

  저장된 모델 불러와서 훈련 제개

if do_generate: 

  저장된 가중치 불러오고 모델 새로 생성해서 문장 생성.

 

이 정도만 달라진 것이라, 이해하기에 어렵지는 않을 것이다.

 

다음은 각기 에폭을 10, 40, 200을 주었을 때 생성된 문장들이다.

 

사용된 코드는 마지막에 붙여 놓았다.

 

--- 10 에폭시

The Man: 
FRIAR LAURENCE:
Is slain such feart
Behold thee suned; but if they have villain--
My fatiers dangural ceose cannot heard
The splitled like a drown a play-facord you,
To have him staingain--mayst thou hast submers.
What this? have I than keep cer me afternied;
The ill-sworn a sightons hear'd to sign to-night. He would have tow'd,
Shall do you to me this faith in time
O: how we'll father.

Third Citizen:
Or do not so?'
But one that cannot God less her breath,
I'll gentle mosts to wome faith in ever,
Exce again, no told of time to slept,
And forward and rath under him: which,
Like a shame in revenge and thee at the law, town without lose.

 

철자가 옳은 단어는 많이 보이는데 문장 수준에서는 엉망이다.

 

----- 40 에폭시

 The Man: 
Provost:
And so on Wedds best, who hast thou mean'st not at my common
creature, and my children with a sight in Mantua's wr:
She are the traitor's forenal innocence, nor he took and all this land
Wherein thou wilt, with traitorder, horns!

VALERIA:
Yield so; let the struck my master.
Post thou to church. You, transmething liege.

DUCHESS OF YORK:
White-say not, I can scarch too?

--- 120 에폭 시

 The Man: 
Pray now, I pray you.

ANGELO:
The sunnet that beseems thee fork.

TRANIO:
Signior Holts, I hope to thy love to hold on sin:
She will not come again unto my good worms,
Comes it and fly. Thou art a traitor to
Go to; let me formake them still in a winter's light
And oppose some point of mine own Richmond in none.

GLOUCESTER:
Fly to my good lord, by your opinion like deform
England kiss a thousand times and many,
With all the effection of my grief to this enemy,
Where on a sister of his merit. Has

First Citizen:
Well, say to his design of the greatest prodigal
Is true and measure of their master; here, I say!
The church disposition tears direct with age!

Second Officer:
He is our subject, Mowbray; for me, so your souls that would take
out of accident?

SEBASTIAN:
Ay, or truly, my mother, wither'd Lord Aumerle,

 

Lord no:
And though thy hand I hope,
I'll know the infant-leaf it profane.

DUKE VINCENTIO:
It cannot but be! is she be found,
And not offending her own limittedied his son,
Who

 

말이 되는 문장들이 제법 등장했다.

"The Man: \n" 으로 운을 띄웠을 때 만들어낸 문장들인데,

글자 하나씩 받아서 예측한 것치곤 상당히 잘했다고 본다.

 

 

import tensorflow as tf
import numpy as np
import os, sys
import time

filename = "shakespeare.txt"
#filename = "all_lyrics.txt"

# 문서 파일을 읽는다.
text = open(filename, 'rb').read().decode(encoding='utf-8')
#text  = text[:100000]

# 텍스트의 길이는 그 안에 있는 문자의 수다.
print("텍스트의 길이: {}자".format(len(text)))

# 처음 250자를 살펴본다.
print(text[:250])

# 파일의 고유 문자수를 출력.
vocab = sorted(set(text))
print(vocab)
print(len(vocab))

# 텍스트를 벡터화. char2idx, idx2char
char2idx = {u:i for i, u in enumerate(vocab)} # 0, 'A' -> {'\n': 0, ' ': 1, '!': 2, '$': 3, '&':  4....}
print(char2idx)

idx2char = np.array(vocab)
print(idx2char)
print(idx2char[:5])
print(text[:10])
tmp = [char2idx[c] for c in text]
print(tmp[:10])
print(char2idx['F'], char2idx['i'],char2idx['r'], char2idx['s'], char2idx['t'])
text_as_int = np.array(tmp)
print(text_as_int[:100])

print ('{} ---- 문자들이 다음의 정수로 매핑되었습니다 ----> {}'.format(repr(text[:13]), text_as_int[:13]))

# 훈련 샘플과 타깃 만들기
# 단일 입력에 대해 원하는 문장의 최대 길이
seq_length = 100
examples_per_epoch = len(text) # seq_length
print(examples_per_epoch)

# 훈련 샘플/타깃 만들기
char_dataset = tf.data.Dataset.from_tensor_slices(text_as_int)
for num in char_dataset.take(5):
  print(num)
  print(idx2char[num])

# batch 메서드를 사용하면, 개별문자들을 원하는 크기의 시퀀스로 쉽게 변환할 수 있다.
sequences = char_dataset.batch(seq_length + 1, drop_remainder=True)
for item in sequences.take(5):
  print(item)
  print(repr(''.join(idx2char[item])))
  
def split_input_target(chunk):
  input_text = chunk[:-1]
  target_text = chunk[1:]

  return input_text, target_text

dataset = sequences.map(split_input_target)

# 첫번째 샘플의 타깃 값을 출력한다.
for input_example, target_example in dataset.take(1):
  print('입력 데이터: ', repr(''.join(idx2char[input_example])))
  print('타깃 데이터: ', repr(''.join(idx2char[target_example])))
  print(input_example[:5])
  print(target_example[:5])

for i, (input_idx, target_idx) in enumerate(zip(input_example[:5], target_example[:5])):
  print("{:4d}단계".format(i))
  print(" 입력: {} {:s}".format(input_idx, repr(idx2char[input_idx])))
  print(" 예상 출력: {} {:s}".format(target_idx, repr(idx2char[target_idx])))

# 훈련 배치 생성
# 배치 크기
BATCH_SIZE = 64
# 데이터셋을 섞을 버퍼 크기
buffer_size = 10000

dataset = dataset.shuffle(buffer_size).batch(BATCH_SIZE, drop_remainder=True)
print(dataset)

'''모델 설계
모델 정의: tf.keras.Sequential
- tf.keras.layers.Embedding: 입력층. embedding_dim 차원 벡터에 각 문자의 정수 코드를 매핑하는 훈련 가능한 검색 테이블
- tf.keras.layers.GRU: 크기가 units =rnn_units인 RNN타입. 여기서 LSTM을 사용할 수도 있음.
- tf.keras.layers.Dense: 크기가 vocab_size인 출력을 생성하는 출력층.'''

# 문자로 된 어휘 사전의 크기
vocab_size = len(vocab)
print(vocab_size)
embedding_dim = 256
rnn_units = 1024

def build_model(vocab_size, embedding_dim, rnn_units, batch_size):
  model = tf.keras.Sequential([
    tf.keras.layers.Embedding(vocab_size, embedding_dim, batch_input_shape=[batch_size, None]),
    tf.keras.layers.LSTM(rnn_units, return_sequences=True,
                         stateful=True, kernel_initializer='glorot_uniform'),  # Xavier 정규분포 초기값 설정기
    tf.keras.layers.Dense(vocab_size)
  ])
  return model

model = build_model(
  vocab_size = len(vocab),
  embedding_dim=embedding_dim,
  rnn_units=rnn_units,
  batch_size=BATCH_SIZE)  

  # 모델 사용
for input_example_batch, target_example_batch in dataset.take(1):
  example_batch_pred = model(input_example_batch)
  print(example_batch_pred.shape)  
  print(example_batch_pred[0])

  print(input_example_batch)
  print(target_example_batch)
  example_batch_pred = model(input_example_batch)

# 출력은 (batch_size, sequence length, vocab_size)
# 위 예제에서는 시퀀스 길이를 100으로 설정했으면 임의의 길이를 입력해서 모델을 실행할 수 있다.
model.summary()

# 배치의 첫번째 샘플링 시도
sampled_indices = tf.random.categorical(example_batch_pred[0], num_samples=1)
#print(sampled_indices)
sampled_indices = tf.squeeze(sampled_indices, axis=-1)
#print(sampled_indices)

#print(repr(''.join(idx2char[sampled_indices])))
#for pred in example_batch_pred:
#print(np.argmax(pred))
#print(repr(idx2char[np.argmax(pred, axis=-1)]))
#sampled_indices = tf.random.categorical()

# 아직은 훈련되지 않은 모델에 의해 예측된 데이터이다. 이를 알기 쉽게 복호화한다.
print(input_example_batch[0])
print("-----------------------")
print("입력: \n", repr("".join(idx2char[input_example_batch[0]])))
print()
print("예측된 다음 문자: \n", repr("".join(idx2char[sampled_indices])))

# 모델 훈련: 표준 분류 문제로 다룰 수 있다. 이전 RNN 상태와 이번 타임 스텝의 입력으로 다음 문자의 클래스를 예측합니다.
# 옵티마이저와 손실함수 넣기
def loss(labels, logits):
  return tf.keras.losses.sparse_categorical_crossentropy(labels, logits, from_logits=True)
example_batch_loss = loss(target_example_batch, example_batch_pred)
print("예측 배열 크기(shape): ", example_batch_pred.shape, " # (배치 크기, 시퀀스 길이, 어휘사전 크기)")
print("스칼라 손실:     ", example_batch_loss.numpy().mean())

# 모델 컴파일
model.compile(optimizer='adam', loss=loss) 

# 체크포인트를 사용하여 훈련 중 체크포인트가 저장되도록 합니다.
# 체크포인트가 저장될 폴더
checkpoint_dir = './training_checkpoints'
# 체크 포인트 파일이름
checkpoint_filename = os.path.join(checkpoint_dir, "ckpt_{epoch}")

checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath = checkpoint_filename, save_weights_only=True)

# 훈련실행
epochs = 10
keep_training = 0 # 1: 모델을 불러와서 훈련을 계속할 때 0: 훈련을 처음할 때

# 문장 생성 모드
do_generate = 0  # 1: 문장 생성을 할 때, 0: 문장 생성 안 할 때

# 문장 생성을 할 때는 모델을 불러오지 않는다. 훈련을 하지도, 따라서 훈련 결과를 저장하지도 않는다.
# 지금까지 실행된 코드 결과(만들어진 변수값)를 기반으로, 
# 모델을 만들고(배치사이즈를 반드시 1로 해야 함, 안 그러면 에러 남), 체크포인트를 불러와서 훈련 weights를 적재한다.
# 모델을 그냥 불러와서 문장 생성을 하지 못하는 이유는, 훈련 때의 LSTM 인풋 데이터 shape과 문장 생성 시의 인풋 데이터
# shape이 다르기 때문이다. 문장생성 때는 배치를 하지 않고 하나씩 문자를 넣어줘서 예측을 하기 때문이다.

if do_generate == 0:
  if keep_training:
    # If you load model only for prediction (without training), you need to set compile flag to False:
    # model = load_model('saved_model/my_model', compile=False)
    model = tf.keras.models.load_model('saved_model/my_model', custom_objects={'loss':loss})

  history = model.fit(dataset, epochs=epochs, callbacks=[checkpoint_callback])    
  model.save('saved_model/my_model')

# 텍스트 생성
# 최근 체크포인트 복원

# 문장 생성을 한다면, 모델을 별도로 만들어야 한다. (배치사이즈를 1로 해야 하기 때문에)
# 이때는 위의 온갓 char2idx, idx2char 등 온갖 변수값들이 필요하기 때문에, 문장생성 코드를 별도 파일로 작성하기가 난감하다.
# 그래서 조건문으로 처리.
if do_generate:
  print("latest_checkpoint: ", tf.train.latest_checkpoint(checkpoint_dir))

  model = build_model(vocab_size, embedding_dim, rnn_units, batch_size=1)
  model.load_weights(tf.train.latest_checkpoint(checkpoint_dir))
  #model.load_weights("./training_checkpoints\ckpt_100")

  model.build(tf.TensorShape([1, None]))
  #model.summary()

def generate_text(model, start_string):
  # 평가 단계 (학습된 모델을 사용하여 텍스트 생성)
  # 생성할 문자의 수
  num_generate = 1000

  # 시작 문자열을 숫자로 변환(벡터화)
  input_eval =  [char2idx[s] for s in start_string]        # ex. ROMEO
  input_eval = tf.expand_dims(input_eval, 0)

  # 결과를 저장할 빈 문자열
  text_generated = []

  # 온도가 낮으면 더 예측 가능한 텍스트가 됩니다.
  # 온도가 높으면 더 의외의 텍스트가 됩니다.
  # 최적의 세팅을 찾기 위한 실험
  temperature = 1.0

  # 여기에서 배치 크기 == 1
  model.reset_states()

  for i in range(num_generate): # 0~ 1000
    predictions = model(input_eval)
    # 배치 차원 제거
    predictions = tf.squeeze(predictions, 0)

    # 범주형 분포를 사용하여 모델에서 리턴한 단어 예측
    predictions = predictions / temperature  # 
    predicted_id = tf.random.categorical(predictions, num_samples=1)[-1, 0].numpy() # should check

    # 예측된 단어를 다음 입력으로 모델에 전달
    # 이전 은닉 상태와 함께
    input_eval = tf.expand_dims([predicted_id], 0)

    text_generated.append(idx2char[predicted_id])
  
  return (start_string + ''.join(text_generated))

# mytext = "ROMEO"
# myinput = [char2idx[s] for s in mytext] 
# print(myinput)
# # 차원을 하나 줄임: 2차원 배열 -> 1차원 배열
# myinput = tf.expand_dims(myinput, 0)
# print(myinput)
# print(myinput[0])
# print(tf.squeeze(myinput))

if do_generate:
  article = generate_text(model, start_string=u"The Man: \n")
  print("\n", article)