SageMakerでTensorflowのオートエンコーダをトレーニングする

こんにちは。
データインテグレーショングループの長谷川です。

今回はTensorFlowでオートエンコーダを作成し、SageMakerのトレーニングジョブを使って分散学習を行いたいと思います。

SageMakerとは

SageMakerはAWSが提供するフルマネージドな機械学習サービスであり、SageMakerを使うとクリック1つでJupyterノートブックを起動でき、モデルのトレーニングから推論まですべてをAWS上のコンテナで完結できます。

さらにSageMaker上で行うモデルのトレーニングは分散学習に対応しており、SageMaker組み込みモデルの他、TensorFlowやKerasで記述したモデルでもSageMaker上で分散学習を行うことが可能です。

オートエンコーダ

オートエンコーダは自己符号化器とも呼ばれ、ニューラルネットワークによって次元削減(エンコード)した特徴量を復元(デコード)させるモデルを指します。

オートエンコーダは入力と同じ出力を正解として用いる教師あり学習で、オートエンコードモデルを通してデコードされた画像と入力画像との誤差が小さければ小さいほど良く学習されたモデルと言えます。

TensorFlowを使ったオートエンコーダの実装

SageMakerで実行する前に、まずは単一のTensorFlowを使ったソースコードでmnistのオートエンコーダを実装してみたいと思います。

mnistの入力は高さ28、幅28、チャンネル数1のグレースケールのため、入力データ1つ1つのサイズは28 * 28 * 1となり、出力結果も同じサイズとなります。

mnistのデータ取得はTensorFlowのmnistチュートリアルのinput_dataを使用します。

今回はシンプルなオートエンコーダなので、エンコーダ、デコーダ共に1層のニューラルネットワークで重みは128として実装します。

import numpy as np
import tensorflow as tf

h_size = 128
imH = 28
imW = 28
imC = 1
flatten_size = imH * imW * imC

エンコーダ、デコーダの実装は以下となります。

エンコーダ

def encoder(x, flatten_size):
    w_enc = tf.Variable(tf.random_normal([flatten_size, h_size], mean=0.0, stddev=0.05), dtype=tf.float32, name="w_enc")
    b_enc = tf.Variable(tf.random_normal([h_size], mean=0.0, stddev=0.05), dtype=tf.float32, name="b_enc")
    enc = tf.nn.softsign(tf.matmul(x,w_enc) + b_enc, name="enc")
    return enc

デコーダ

def decoder(enc, flatten_size):
    w_dec = tf.Variable(tf.random_normal([h_size, flatten_size], mean=0.0, stddev=0.05), dtype=tf.float32, name="w_dec")
    b_dec = tf.Variable(tf.random_normal([flatten_size], mean=0.0, stddev=0.05), dtype=tf.float32, name="b_dec")
    dec = tf.nn.relu(tf.matmul(enc, w_dec) + b_dec, name="dec")
    return dec

訓練にはAdam最適化を使用し、入力画像とデコード結果の二乗誤差をバックプロパゲーションすることでオートエンコーダのトレーニングを行います。

トレーニングモデル

def training(loss):
    optimizer = tf.train.AdamOptimizer(
        learning_rate = learning_rate,
        beta1 = 0.9,
        beta2 = 0.999,
        epsilon = 1e-08,
        use_locking=False,
        name="Adam"
    )
    train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
    return train_op

誤差関数

def l2loss(decoded, x, flatten_size):
    loss = tf.nn.l2_loss(decoded - x)
    return loss

エポック数とバッチサイズを適宜設定してトレーニングするスクリプトを記述します(ここではエポック数を1000、バッチサイズを100としています)。

autoencoder.py

mnist = input_data.read_data_sets("data/", one_hot=True)

learning_rate = 0.01
training_epochs = 1000
batch_size = 100
display_step = 100

x = tf.placeholder("float", [None, 784])
encoded = encoder(x, flatten_size)
decoded = decoder(encoded, flatten_size)
loss = l2loss(decoded, x, flatten_size)
train_op = training(loss)

sess = tf.Session()
init_op = tf.global_variables_initializer()
sess.run(init_op)

for epoch in range(training_epochs):
    avg_loss = 0.
    total_batch = int(
        mnist.train.num_examples / batch_size
    )
    for i in range(total_batch):
        mbatch_x, mbatch_y = mnist.train.next_batch(
            batch_size
        )
        _, new_loss = sess.run([train_op,loss],
                                              feed_dict={x: mbatch_x}
        )

        avg_loss += new_loss/total_batch

    if epoch % display_step == 0:
        print("Epoch:","%04d" % (epoch + 1),"loss = ","{:.9f}".format(avg_loss))
print("Last Loss:",avg_loss)

これでオートエンコーダの実装が一通りできました。

こちらをSageMakerのトレーニングジョブで分散学習できるように書き換えていきたいと思います。

tf.estimator

TensorFlowにはtf.estimatorという学習と推論を簡単に扱うための高次のAPIが用意されています。

たとえば深層学習の実装はtf.estimator に組み込まれたDNNClassifierを使うことで以下のように記述できます。

# DNNClassifierを使い、重みが300と100の二つの隠れ層と分類数が10の出力層を作成するサンプル
feature_cols= tf.contrib.learn.infer_real_valued_columns_from_input(train_data)
dnn_clf = tf.contrib.learn.DNNClassifier(hidden_units=[300,100], 
                                        n_classes=10,
                                        feature_columns=feature_cols)

tf.estimator は組み込みクラスのほかに独自のモデルも構築することができ、SageMakerのトレーニングジョブでTensorFlowを実行するにはtf.estimator のAPIが必要となるため、先ほどのソースをtf.estimatorに合わせて修正していきます。

tf.estimator.Estimator

tf.estimator を使った独自モデル構築を作成する場合、tf.estimator.Estimator の引数 model_fn に独自のモデルを渡すことで、tf.estimatorのAPIを使用したトレーニングと推論が可能となります。

今回の場合はオートエンコーダのモデルをmodel_fn に渡す形となり、またmodel_fnに渡すモデルは戻り値はtf.estimator.EstimatorSpec() となります。

my_model = tf.estimator.Estimator(model_fn = XXXXX)
# XXXXは独自モデル。独自モデルはtf.estimator.EstimatorSpec()を返す。

tf.estimator.EstimatorSpec

tf.estimator.EstimatorSpectf.estimator がモデルを構築するのに必要な情報をパラメタとして持ちます。

具体的にはトレーニングするためのオペレーションや、バックプロパゲーションのための誤差、どのように推論するか、また呼び出し時のモード(トレーニングか推論か)などのパラメタを指定します。

  • mode ・・・ 実行するジョブがトレーニングか推論か
  • loss ・・・ モデルの誤差関数
  • train_op ・・・ モデルのトレーニング関数
  • predictions ・・・ モデルの推論

今回のオートエンコーダの場合は以下のように指定します。

# featuresのキーを'inputs'としてるのはSageMakerの推論時に'inputs'としないと値が渡らないケースがあったため。
INPUT_TENSOR_NAME = 'inputs'

def model_fn(features, labels, mode, params):
    x = features[INPUT_TENSOR_NAME]
    encoded = encoder(x,flatten_size) # エンコーダ
    decoded = decoder(encoded,flatten_size) # デコーダ
    loss = l2loss(decoded,x,flatten_size) # 
    train_op = training(loss)
    predictions = decoded
    return tf.estimator.EstimatorSpec(
        mode = mode, # estimatorの状態(トレーニングか推論か)
        loss = loss, # バックプロパゲーションのための誤差関数
        train_op = train_op, # モデルを訓練するための最適化関数
        predictions = predictions # モデルが推論を行うための関数
    )

tf.estimator.Estimatormodel_fnには、先ほど定義したmodel_fnを指定します。

autoencoder= tf.estimator.Estimator(model_fn = model_fn) 

トレーニング用データ関数

tf.estimatorモデルができたら、モデルが学習するのに必要なトレーニングデータを渡すための関数をtrain_input_fnという名前で作成します。
関数名は何でもいいのですが、SageMakerでトレーニングを行う際の入力データ関数をtrain_input_fn という名称で作る必要があるため、組み込みしやすいように事前にこの名前で関数を作成します。

今回はnumpy arrayを使用するため、tf.estimator.inputs.numpy_input_fnを使用します。

tf.estimator.inputs.numpy_input_fnに対象となるデータセットとエポック数、バッチサイズ、シャッフルするかどうかを引数として指定するとTensorFlowはその引数に合わせて適切なトレーニングデータを返却するので、バッチサイズを独自で分割したりシャッフルする必要はありません。

def train_input_fn():
    training_set = input_data.read_data_sets("data/", one_hot=True)
    return tf.estimator.inputs.numpy_input_fn(
        x={INPUT_TENSOR_NAME:training_set.train.images},
        num_epochs=None,
        batch_size=100,
        shuffle=True
    )()

あとはtf.estimator.Estimator で作成したモデルに対し、train()の引数input_fntrain_input_fn を渡せば訓練が開始されます。

autoencoder.train(input_fn=train_input_fn, steps=10000)

指定したステップ回数に到達するとトレーニング完了となります(この場合は10000)。

WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmpX1FRQG
INFO:tensorflow:Using config: {'_save_checkpoints_secs': 10, '_session_config': None, '_keep_checkpoint_max': 5, '_task_type': 'worker', '_global_id_in_cluster': 0, '_is_chief': True, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f27187db790>, '_evaluation_master': '', '_save_checkpoints_steps': None, '_keep_checkpoint_every_n_hours': 10000, '_service': None, '_num_ps_replicas': 0, '_tf_random_seed': None, '_master': '', '_device_fn': None, '_num_worker_replicas': 1, '_task_id': 0, '_log_step_count_steps': 100, '_model_dir': '/tmp/tmpX1FRQG', '_train_distribute': None, '_save_summary_steps': 1200}
WARNING:tensorflow:Estimator's model_fn (<function model_fn at 0x7f275f17de60>) includes params argument, but params are not passed to Estimator.
Extracting data/train-images-idx3-ubyte.gz
Extracting data/train-labels-idx1-ubyte.gz
Extracting data/t10k-images-idx3-ubyte.gz
Extracting data/t10k-labels-idx1-ubyte.gz
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Saving checkpoints for 0 into /tmp/tmpX1FRQG/model.ckpt.
INFO:tensorflow:loss = 5657.984, step = 1
INFO:tensorflow:global_step/sec: 183.015
INFO:tensorflow:loss = 440.07687, step = 101 (0.548 sec)
INFO:tensorflow:global_step/sec: 231.816
INFO:tensorflow:loss = 368.45178, step = 201 (0.431 sec)
INFO:tensorflow:global_step/sec: 231.714
INFO:tensorflow:loss = 298.5456, step = 301 (0.432 sec)
INFO:tensorflow:global_step/sec: 229.925
INFO:tensorflow:loss = 292.90118, step = 401 (0.435 sec)
・・・・・・・・・・・・・・・・・・・・ 中略 ・・・・・・・・・・・・・・・・・・・・・・・・・
INFO:tensorflow:Saving checkpoints for 10000 into /tmp/tmpX1FRQG/model.ckpt.
INFO:tensorflow:Loss for final step: 190.22992.

推論

推論には入力データのうち1件を返却する推論用の関数eval_input_fnを使います。

def eval_input_fn():
    eval_set = input_data.read_data_sets("data/", one_hot=True)
    return tf.estimator.inputs.numpy_input_fn(
        x={INPUT_TENSOR_NAME:eval_set.validation.images},
        num_epochs=1,
        shuffle=False
    )()

トレーニング済みautoencoderpredict() を呼び出すと推論することができます。

prediction = list(autoencoder.predict(input_fn=eval_input_fn)

試しにminstデータセットの251番目(i=250)を抜き出し、オートエンコーダの前後の画像を比較してみます。

import matplotlib.pyplot as plt
eval_set = input_data.read_data_sets("data/", one_hot=True)
eval_images = eval_set.validation.images 

i = 250

# オリジナル画像
plt.imshow(eval_images[i].reshape(28,28))
plt.gray()
plt.show()

# オートエンコーダの出力結果
plt.imshow(prediction[i].reshape(28,28))
plt.gray()
plt.show()

結果は以下のようになりました。

オリジナル画像

f:id:hasegawa-ma:20190104140432p:plain

オートエンコーダの出力結果

f:id:gnavi_developers:20181221101140p:plain

ここで一度、これまでのソースコードをまとめます。

autoencoder_localy.py

import numpy as np
import tensorflow as tf
import input_data

h_size = 128
INPUT_TENSOR_NAME = 'inputs'
learning_rate = 0.01
imH = 28
imW = 28
imC = 1
flatten_size = imH * imW * imC

def encoder(x, flatten_size):
    w_enc = tf.Variable(tf.random_normal([flatten_size, h_size], mean=0.0, stddev=0.05), dtype=tf.float32, name="w_enc")
    b_enc = tf.Variable(tf.random_normal([h_size], mean=0.0, stddev=0.05), dtype=tf.float32, name="b_enc")
    enc = tf.nn.softsign(tf.matmul(x,w_enc) + b_enc, name="enc")
    return enc

def decoder(enc, flatten_size):
    w_dec = tf.Variable(tf.random_normal([h_size, flatten_size], mean=0.0, stddev=0.05), dtype=tf.float32, name="w_dec")
    b_dec = tf.Variable(tf.random_normal([flatten_size], mean=0.0, stddev=0.05), dtype=tf.float32, name="b_dec")
    dec = tf.nn.relu(tf.matmul(enc, w_dec) + b_dec, name="dec")
    return dec

def l2loss(decoded, x, flatten_size):
    loss = tf.nn.l2_loss(decoded - x)
    return loss

def training(loss):
    optimizer = tf.train.AdamOptimizer(
        learning_rate = learning_rate,
        beta1 = 0.9,
        beta2 = 0.999,
        epsilon = 1e-08,
        use_locking=False,
        name="Adam"
    )
    train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
    return train_op

def model_fn(features, labels, mode, params):
    x = features[INPUT_TENSOR_NAME]
    encoded = encoder(x,flatten_size)
    decoded = decoder(encoded,flatten_size)
    loss = l2loss(decoded,x,flatten_size)
    train_op = training(loss)
    predictions = decoded
    return tf.estimator.EstimatorSpec(
        mode = mode,
        loss = loss,
        train_op = train_op,
        predictions = predictions
    )

def train_input_fn():
    training_set = input_data.read_data_sets("data/", one_hot=True)
    return tf.estimator.inputs.numpy_input_fn(
        x={INPUT_TENSOR_NAME:training_set.train.images},
        num_epochs=None,
        batch_size=100,
        shuffle=True
    )()

def eval_input_fn():
    training_set = input_data.read_data_sets("data/", one_hot=True)
    return tf.estimator.inputs.numpy_input_fn(
        x={INPUT_TENSOR_NAME:training_set.validation.images},
        num_epochs=1,
        shuffle=False
    )()

SageMakerの分散処理を実装する

上記で実装した独自モデルをSageMakerのトレーニングジョブとし、SageMaker上で分散学習を行いたいと思います。

そのためにはsagemaker.tensorflow クラスを使用する必要があります。

sagemaker.tensorflowクラスを使ったトレーニングは以下のように行います。

# モデルを作成
estimator = TensorFlow(
    entry_point="xxxxxxx.py",
    role=role,
    train_instance_type='ml.p2.xlarge'
    framework_version='1.xx',
    train_instance_count=1,
    training_steps=10000
)
  • entory_point ・・・ モデルを記述したスクリプトファイル
  • role ・・・ SageMakerを実行可能なAWSロール
  • train_instance_type ・・・ トレーニングジョブに使用するインスタンスのタイプ
  • framework_version ・・・ モデルが使用するTensorFlowのバージョン
  • train_instance_count ・・・ トレーニングジョブが使用するインスタンス数、2以上で分散学習が可能となる
  • training_steps ・・・ トレーニングジョブの最大ステップ数

今回作成したオートエンコーダをSageMakerのトレーニングジョブでトレーニングさせるには、entry_pointに処理を記述したスクリプトを指定する必要があります。

またその際、entory_pointのスクリプトには以下の関数を実装している必要があります。

  • model_fn
  • train_input_fn
  • eval_input_fn
  • serving_input_fn

model_fn

SageMakerで行う機械学習のモデルを記述します。今回の場合はオートエンコーダ部分がこれに当たります。

model_fnは戻り値としてtf.estimator.EstimatorSpecを返す必要がありますが、ここはtf.estimatormodel_fnがそのまま流用できます。

def model_fn(features, labels, mode, params):
    x = features[INPUT_TENSOR_NAME]
    encoded = encoder(x,flatten_size)
    decoded = decoder(encoded,flatten_size)
    loss = l2loss(decoded,x,flatten_size)
    train_op = training(loss)
    predictions = decoded
    return tf.estimator.EstimatorSpec(
        mode = mode, # estimatorの状態(トレーニングか推論か)
        loss = loss, # バックプロパゲーションのための誤差関数
        train_op = train_op, # モデルを訓練するための最適化関数
        predictions = predictions # モデルが推論を行うための関数
    )

train_input_fn

トレーニング用の入力データをジョブに渡す関数です。事前にS3にトレーニング用データをUPしておき、学習時にパスを指定するとSageMakerがトレーニングジョブ起動時にジョブコンテナの/opt/ml/input/配下にデータを配備します。

先ほどのtrain_input_fn はローカルのダウンロード済みトレーニングデータを読み込む仕様だったので、これをジョブコンテナのパスが渡るように修正します。

INPUT_TENSOR_NAME = 'inputs'
def train_input_fn(training_dir): # training_dirにはS3からコピーされたトレーニング用データのパスが入る
    training_set = input_data.read_data_sets(training_dir,one_hot=True)
    return tf.estimator.inputs.numpy_input_fn(
        x={INPUT_TENSOR_NAME:training_set.train.images},
        num_epochs=None,
        shuffle=False
    )()

eval_input_fn

SageMakerのトレーニングジョブはチェックポイントごとにeval_input_fnで指定したデータセットを使用して検証を行います。

こちらには検証用のデータを返す関数を指定します。

def eval_input_fn(training_dir,params):
    training_set = input_data.read_data_sets(training_dir,one_hot=True)
    return tf.estimator.inputs.numpy_input_fn(
        x={INPUT_TENSOR_NAME:training_set.validation.images},
        num_epochs=1,
        shuffle=False
    )()

serving_input_fn

serving_input_fn は推論エンドポイントを使用する場合に推論を行うために使用します。

model_fn, train_input_fn, eval_input_fnと異なり、serving_input_fnは推論エンドポイントを作成しない場合必須ではありません。

今回は推論まで含めてSageMakerで構築したいのでserving_input_fnも作成します。

def serving_input_fn(params):
    inputs = {INPUT_TENSOR_NAME: tf.placeholder(tf.float32, [None, flatten_size])}
    return tf.estimator.export.ServingInputReceiver(inputs, inputs)

これまでの処理をautoencoder.pyとしてまとめます。

autoencoder.py

import tensorflow as tf
import numpy as np
import os
import input_data
from tensorflow.python.ops import control_flow_ops

h_size = 128
INPUT_TENSOR_NAME = 'inputs'
learning_rate = 0.01
imH = 28
imW = 28
imC = 1
flatten_size = imH * imW * imC

def encoder(x, flatten_size):
    w_enc = tf.Variable(tf.random_normal([flatten_size, h_size], mean=0.0, stddev=0.05), dtype=tf.float32, name="w_enc")
    b_enc = tf.Variable(tf.random_normal([h_size], mean=0.0, stddev=0.05), dtype=tf.float32, name="b_enc")
    enc = tf.nn.softsign(tf.matmul(x,w_enc) + b_enc, name="enc")
    return enc

def decoder(enc, flatten_size):
    w_dec = tf.Variable(tf.random_normal([h_size, flatten_size], mean=0.0, stddev=0.05), dtype=tf.float32, name="w_dec")
    b_dec = tf.Variable(tf.random_normal([flatten_size], mean=0.0, stddev=0.05), dtype=tf.float32, name="b_dec")
    dec = tf.nn.relu(tf.matmul(enc, w_dec) + b_dec, name="dec")
    return dec

def l2loss(decoded, x, flatten_size):
    loss = tf.nn.l2_loss(decoded - x)
    return loss

def training(loss):
    optimizer = tf.train.AdamOptimizer(
        learning_rate = learning_rate,
        beta1 = 0.9,
        beta2 = 0.999,
        epsilon = 1e-08,
        use_locking=False,
        name="Adam"
    )
    train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
    return train_op

def model_fn(features, labels, mode, params):
    x = features[INPUT_TENSOR_NAME]
    encoded = encoder(x,flatten_size)
    decoded = decoder(encoded,flatten_size)
    loss = l2loss(decoded,x,flatten_size)
    train_op = training(loss)
    predictions = decoded
    return tf.estimator.EstimatorSpec(
        mode = mode,
        loss = loss,
        train_op = train_op,
        predictions = predictions
    )

def train_input_fn(training_dir):
    training_set = input_data.read_data_sets(training_dir,one_hot=True)
    return tf.estimator.inputs.numpy_input_fn(
        x={INPUT_TENSOR_NAME:training_set.train.images},
        num_epochs=None,
        shuffle=False
    )()

def eval_input_fn(training_dir,params):
    training_set = input_data.read_data_sets(training_dir,one_hot=True)
    return tf.estimator.inputs.numpy_input_fn(
        x={INPUT_TENSOR_NAME:training_set.validation.images},
        num_epochs=1,
        shuffle=False
    )()

def serving_input_fn(params):
    inputs = {INPUT_TENSOR_NAME: tf.placeholder(tf.float32, [None, flatten_size])}
    return tf.estimator.export.ServingInputReceiver(inputs, inputs)

mnistのデータを取得するのにTensorFlowのmnistチュートリアルのinput_dataを使用するため、これらのファイルと合わせて、autoencoder_mnistというディレクトリに保存します。

sh-4.2$ ls autoencoder_mnist/
autoencoder.py  input_data.py

トレーニングを実行する

ファイルの準備ができたらsagemaker.tensorflowfit()を呼び出して訓練を開始します。

分散学習を行うので、train_instance_countは2以上を指定します。

import boto3
from sagemaker.tensorflow import TensorFlow
from sagemaker import get_execution_role

# SageMakerを実行するためのrole取得
role = get_execution_role()

# AWSのリージョン情報を取得
region = boto3.Session().region_name

# SageMakerのモデルを作成
estimator = TensorFlow(
    entry_point="autoencoder.py",
    source_dir="autoencoder_mnist",
    role=role,
    train_instance_type='ml.m4.2xlarge',
    framework_version='1.11',
    train_instance_count=2,
    training_steps=10000
)

# S3にトレーニング用データをアップロードし、そのパスを指定
data_location = "s3://path/to/mnist/data/".format(region)

# トレーニングジョブを実行
# fitの引数にS3のパスを渡すと、トレーニングジョブ起動時にジョブコンテナの`/opt/ml/input/`配下にデータが配備され
# そのパスがtrain_input_fnに渡る。
estimator.fit(data_location)

正常に実行されるとJupyter上に実行ログが表示されます。

INFO:sagemaker:Creating training-job with name: sagemaker-tensorflow-2018-xx-xx-xx-xx-xx-xxx
Creating tmphdf0psmt_algo-1-VUTMQ_1_3d7a8bccf3f5 ... 
Attaching to tmphdf0psmt_algo-1-VUTMQ_1_87005b07ff9a2mdone
algo-1-VUTMQ_1_87005b07ff9a | 2018-12-07 02:59:30,937 INFO - root - running container entrypoint
algo-1-VUTMQ_1_87005b07ff9a | 2018-12-07 02:59:30,937 INFO - root - starting train task
algo-1-VUTMQ_1_87005b07ff9a | 2018-12-07 02:59:30,952 INFO - container_support.training - Training starting
algo-1-VUTMQ_1_87005b07ff9a | Downloading s3://sagemaker-ap-northeast-1-225919769484/sagemaker-tensorflow-2018-xx-xx-xx-xx-xx-xxx/source/sourcedir.tar.gz to /tmp/script.tar.gz
algo-1-VUTMQ_1_87005b07ff9a | 2018-12-07 02:59:31,948 INFO - tf_container - ----------------------TF_CONFIG--------------------------
algo-1-VUTMQ_1_87005b07ff9a | 2018-12-07 02:59:31,948 INFO - tf_container - {"environment": "cloud", "cluster": {"master": ["algo-1-VUTMQ:2222"]}, "task": {"index": 0, "type": "master"}}
algo-1-VUTMQ_1_87005b07ff9a | 2018-12-07 02:59:31,949 INFO - tf_container - ---------------------------------------------------------
・・・・・・・・・・・・・・・・・・・・ 中略 ・・・・・・・・・・・・・・・・・・・・・・・・・
algo-1-VUTMQ_1_87005b07ff9a | 2018-12-07 02:59:35,647 INFO - tensorflow - loss = 5630.208, step = 1
algo-1-VUTMQ_1_87005b07ff9a | 2018-12-07 02:59:36,142 INFO - tensorflow - global_step/sec: 201.714
algo-1-VUTMQ_1_87005b07ff9a | 2018-12-07 02:59:36,143 INFO - tensorflow - loss = 470.30643, step = 101 (0.496 sec)
algo-1-VUTMQ_1_87005b07ff9a | 2018-12-07 02:59:36,565 INFO - tensorflow - global_step/sec: 236.69
algo-1-VUTMQ_1_87005b07ff9a | 2018-12-07 02:59:36,565 INFO - tensorflow - loss = 359.07727, step = 201 (0.422 sec)

train_instance_countを2以上に指定するとコンテナごとに色付けされたログが出力されます。

f:id:gnavi_developers:20181221102321p:plain

ログの最後にCompleted - Training job completed が表示されたらトレーニング完了となります。

2018-12-10 09:38:09 Uploading - Uploading generated training model
2018-12-10 09:38:09 Completed - Training job completed

デバッグ

train_instance_typelocal(GPUインスタンスの場合はlocal_gpu)を指定すると、jupyterインスタンス内でトレーニングジョブコンテナが起動するので、これを利用してデバッグすることが出来ます。正常に動かなかった場合はこちらを指定してデバッグしてください。

estimator = TensorFlow(
    entry_point="autoencoder.py",
    source_dir="autoencoder_mnist",
    role=role,
    train_instance_type='local', # gpuインスタンスの場合は local_gpu
    framework_version='1.11',
    train_instance_count=1,
    training_steps=10000
)

推論エンドポイントをデプロイ

トレーニングが完了したら、トレーニング済みのカスタムモデルを推論エンドポイントへデプロイします。

predictor = estimator.deploy(initial_instance_count=1,instance_type='ml.t2.medium')

ログにTF Serving model successfully loaded が表示されたら推論エンドポイントの準備完了です。

Attaching to tmpfsuu73pc_algo-1-WY5X9_1_4ae08c6e94ca
algo-1-WY5X9_1_4ae08c6e94ca | 2018-12-05 00:45:35,850 INFO - root - running container entrypoint
algo-1-WY5X9_1_4ae08c6e94ca | 2018-12-05 00:45:35,851 INFO - root - starting serve task
algo-1-WY5X9_1_4ae08c6e94ca | 2018-12-05 00:45:35,851 INFO - container_support.serving - reading config
・・・・・・・・・・・・・・・・・・・・ 中略 ・・・・・・・・・・・・・・・・・・・・・・・・・
algo-1-WY5X9_1_4ae08c6e94ca | 2018-12-05 00:45:38,375 INFO - tf_container - TF Serving model successfully loaded
algo-1-WY5X9_1_4ae08c6e94ca | 2018-12-05 00:45:38,380 INFO - container_support.serving - returning initialized server

推論

推論エンドポイントを使い、オートエンコーダを試してみましょう。

import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt

from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets("/tmp/data/", one_hot=True)

im_idx = 100
data = mnist.test.images[im_idx].tolist()
plt.imshow(np.array(data).reshape(28,28))
plt.gray()
plt.show()

オートエンコード前のim_idxに対応する画像は以下です。

f:id:gnavi_developers:20181221102723p:plain

この画像をインプットとし、推論エンドポイントからのオートエンコード結果を取得します。

tensor_proto = tf.make_tensor_proto(values=np.asarray(data), shape=[1, len(data)], dtype=tf.float32)
predict_response = predictor.predict(tensor_proto)
plt.imshow(np.array(predict_response['outputs']['output']['float_val']).reshape(28,28))
plt.gray()
plt.show()

f:id:gnavi_developers:20181221102842p:plain

他にも数件ランダムにピックアップして、エンコード前後を比較しました。左がオリジナル、右がオートエンコーダ適用後の結果となります。

f:id:hasegawa-ma:20190107115438p:plain

推論エンドポイントの停止

確認が終わったらAWSの管理画面から推論エンドポイントを削除します。

f:id:gnavi_developers:20181221102929p:plain

トレーニング済みモデルのアタッチ

次回以降アクセスする場合はattachを使用するとトレーニング済みのカスタムモデルに接続できます。

estimator = estimator.attach("トレーニング済みカスタムモデルのID")

まとめ

tf.estimator.Estimator はモデルをシンプルに記述することができ、ほぼそのままSageMakerのトレーニングジョブとして利用できます。

SageMakerのトレーニングジョブはこちらが意識することなく分散学習が可能なので、GPUでトレーニング時間のかかるタスクにおいて非常に有用なのではないかと思います。


長谷川
ぐるなびビッグデータを使った商品開発を行っています。 2児の父親で、プライベートでは子供が喜ぶような何かを作ったりしてますが、どれも反応がいまいちなのが悲しいです。