こんにちは。
データインテグレーショングループの長谷川です。
今回はTensorFlowでオートエンコーダを作成し、SageMakerのトレーニングジョブを使って分散学習を行いたいと思います。
- SageMakerとは
- オートエンコーダ
- TensorFlowを使ったオートエンコーダの実装
- tf.estimator
- トレーニング用データ関数
- 推論
- SageMakerの分散処理を実装する
- トレーニングを実行する
- 推論エンドポイントをデプロイ
- 推論
- 推論エンドポイントの停止
- トレーニング済みモデルのアタッチ
- まとめ
SageMakerとは
SageMakerはAWSが提供するフルマネージドな機械学習サービスであり、SageMakerを使うとクリック1つでJupyterノートブックを起動でき、モデルのトレーニングから推論まですべてをAWS上のコンテナで完結できます。
さらにSageMaker上で行うモデルのトレーニングは分散学習に対応しており、SageMaker組み込みモデルの他、TensorFlowやKerasで記述したモデルでもSageMaker上で分散学習を行うことが可能です。
オートエンコーダ
オートエンコーダは自己符号化器とも呼ばれ、ニューラルネットワークによって次元削減(エンコード)した特徴量を復元(デコード)させるモデルを指します。
オートエンコーダは入力と同じ出力を正解として用いる教師あり学習で、オートエンコードモデルを通してデコードされた画像と入力画像との誤差が小さければ小さいほど良く学習されたモデルと言えます。
TensorFlowを使ったオートエンコーダの実装
SageMakerで実行する前に、まずは単一のTensorFlowを使ったソースコードでmnistのオートエンコーダを実装してみたいと思います。
mnistの入力は高さ28、幅28、チャンネル数1のグレースケールのため、入力データ1つ1つのサイズは28 * 28 * 1となり、出力結果も同じサイズとなります。
mnistのデータ取得はTensorFlowのmnistチュートリアルのinput_dataを使用します。
今回はシンプルなオートエンコーダなので、エンコーダ、デコーダ共に1層のニューラルネットワークで重みは128として実装します。
import numpy as np import tensorflow as tf h_size = 128 imH = 28 imW = 28 imC = 1 flatten_size = imH * imW * imC
エンコーダ、デコーダの実装は以下となります。
エンコーダ
def encoder(x, flatten_size): w_enc = tf.Variable(tf.random_normal([flatten_size, h_size], mean=0.0, stddev=0.05), dtype=tf.float32, name="w_enc") b_enc = tf.Variable(tf.random_normal([h_size], mean=0.0, stddev=0.05), dtype=tf.float32, name="b_enc") enc = tf.nn.softsign(tf.matmul(x,w_enc) + b_enc, name="enc") return enc
デコーダ
def decoder(enc, flatten_size): w_dec = tf.Variable(tf.random_normal([h_size, flatten_size], mean=0.0, stddev=0.05), dtype=tf.float32, name="w_dec") b_dec = tf.Variable(tf.random_normal([flatten_size], mean=0.0, stddev=0.05), dtype=tf.float32, name="b_dec") dec = tf.nn.relu(tf.matmul(enc, w_dec) + b_dec, name="dec") return dec
訓練にはAdam最適化を使用し、入力画像とデコード結果の二乗誤差をバックプロパゲーションすることでオートエンコーダのトレーニングを行います。
トレーニングモデル
def training(loss): optimizer = tf.train.AdamOptimizer( learning_rate = learning_rate, beta1 = 0.9, beta2 = 0.999, epsilon = 1e-08, use_locking=False, name="Adam" ) train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step()) return train_op
誤差関数
def l2loss(decoded, x, flatten_size): loss = tf.nn.l2_loss(decoded - x) return loss
エポック数とバッチサイズを適宜設定してトレーニングするスクリプトを記述します(ここではエポック数を1000、バッチサイズを100としています)。
autoencoder.py
mnist = input_data.read_data_sets("data/", one_hot=True) learning_rate = 0.01 training_epochs = 1000 batch_size = 100 display_step = 100 x = tf.placeholder("float", [None, 784]) encoded = encoder(x, flatten_size) decoded = decoder(encoded, flatten_size) loss = l2loss(decoded, x, flatten_size) train_op = training(loss) sess = tf.Session() init_op = tf.global_variables_initializer() sess.run(init_op) for epoch in range(training_epochs): avg_loss = 0. total_batch = int( mnist.train.num_examples / batch_size ) for i in range(total_batch): mbatch_x, mbatch_y = mnist.train.next_batch( batch_size ) _, new_loss = sess.run([train_op,loss], feed_dict={x: mbatch_x} ) avg_loss += new_loss/total_batch if epoch % display_step == 0: print("Epoch:","%04d" % (epoch + 1),"loss = ","{:.9f}".format(avg_loss)) print("Last Loss:",avg_loss)
これでオートエンコーダの実装が一通りできました。
こちらをSageMakerのトレーニングジョブで分散学習できるように書き換えていきたいと思います。
tf.estimator
TensorFlowにはtf.estimator
という学習と推論を簡単に扱うための高次のAPIが用意されています。
たとえば深層学習の実装はtf.estimator
に組み込まれたDNNClassifier
を使うことで以下のように記述できます。
# DNNClassifierを使い、重みが300と100の二つの隠れ層と分類数が10の出力層を作成するサンプル feature_cols= tf.contrib.learn.infer_real_valued_columns_from_input(train_data) dnn_clf = tf.contrib.learn.DNNClassifier(hidden_units=[300,100], n_classes=10, feature_columns=feature_cols)
tf.estimator
は組み込みクラスのほかに独自のモデルも構築することができ、SageMakerのトレーニングジョブでTensorFlowを実行するにはtf.estimator
のAPIが必要となるため、先ほどのソースをtf.estimator
に合わせて修正していきます。
tf.estimator.Estimator
tf.estimator
を使った独自モデル構築を作成する場合、tf.estimator.Estimator
の引数 model_fn
に独自のモデルを渡すことで、tf.estimator
のAPIを使用したトレーニングと推論が可能となります。
今回の場合はオートエンコーダのモデルをmodel_fn
に渡す形となり、またmodel_fn
に渡すモデルは戻り値はtf.estimator.EstimatorSpec()
となります。
my_model = tf.estimator.Estimator(model_fn = XXXXX)
# XXXXは独自モデル。独自モデルはtf.estimator.EstimatorSpec()を返す。
tf.estimator.EstimatorSpec
tf.estimator.EstimatorSpec
は tf.estimator
がモデルを構築するのに必要な情報をパラメタとして持ちます。
具体的にはトレーニングするためのオペレーションや、バックプロパゲーションのための誤差、どのように推論するか、また呼び出し時のモード(トレーニングか推論か)などのパラメタを指定します。
- mode ・・・ 実行するジョブがトレーニングか推論か
- loss ・・・ モデルの誤差関数
- train_op ・・・ モデルのトレーニング関数
- predictions ・・・ モデルの推論
今回のオートエンコーダの場合は以下のように指定します。
# featuresのキーを'inputs'としてるのはSageMakerの推論時に'inputs'としないと値が渡らないケースがあったため。 INPUT_TENSOR_NAME = 'inputs' def model_fn(features, labels, mode, params): x = features[INPUT_TENSOR_NAME] encoded = encoder(x,flatten_size) # エンコーダ decoded = decoder(encoded,flatten_size) # デコーダ loss = l2loss(decoded,x,flatten_size) # train_op = training(loss) predictions = decoded return tf.estimator.EstimatorSpec( mode = mode, # estimatorの状態(トレーニングか推論か) loss = loss, # バックプロパゲーションのための誤差関数 train_op = train_op, # モデルを訓練するための最適化関数 predictions = predictions # モデルが推論を行うための関数 )
tf.estimator.Estimator
のmodel_fn
には、先ほど定義したmodel_fn
を指定します。
autoencoder= tf.estimator.Estimator(model_fn = model_fn)
トレーニング用データ関数
tf.estimator
モデルができたら、モデルが学習するのに必要なトレーニングデータを渡すための関数をtrain_input_fn
という名前で作成します。
関数名は何でもいいのですが、SageMakerでトレーニングを行う際の入力データ関数をtrain_input_fn
という名称で作る必要があるため、組み込みしやすいように事前にこの名前で関数を作成します。
今回はnumpy arrayを使用するため、tf.estimator.inputs.numpy_input_fn
を使用します。
tf.estimator.inputs.numpy_input_fn
に対象となるデータセットとエポック数、バッチサイズ、シャッフルするかどうかを引数として指定するとTensorFlowはその引数に合わせて適切なトレーニングデータを返却するので、バッチサイズを独自で分割したりシャッフルする必要はありません。
def train_input_fn(): training_set = input_data.read_data_sets("data/", one_hot=True) return tf.estimator.inputs.numpy_input_fn( x={INPUT_TENSOR_NAME:training_set.train.images}, num_epochs=None, batch_size=100, shuffle=True )()
あとはtf.estimator.Estimator
で作成したモデルに対し、train()
の引数input_fn
にtrain_input_fn
を渡せば訓練が開始されます。
autoencoder.train(input_fn=train_input_fn, steps=10000)
指定したステップ回数に到達するとトレーニング完了となります(この場合は10000)。
WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmpX1FRQG INFO:tensorflow:Using config: {'_save_checkpoints_secs': 10, '_session_config': None, '_keep_checkpoint_max': 5, '_task_type': 'worker', '_global_id_in_cluster': 0, '_is_chief': True, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f27187db790>, '_evaluation_master': '', '_save_checkpoints_steps': None, '_keep_checkpoint_every_n_hours': 10000, '_service': None, '_num_ps_replicas': 0, '_tf_random_seed': None, '_master': '', '_device_fn': None, '_num_worker_replicas': 1, '_task_id': 0, '_log_step_count_steps': 100, '_model_dir': '/tmp/tmpX1FRQG', '_train_distribute': None, '_save_summary_steps': 1200} WARNING:tensorflow:Estimator's model_fn (<function model_fn at 0x7f275f17de60>) includes params argument, but params are not passed to Estimator. Extracting data/train-images-idx3-ubyte.gz Extracting data/train-labels-idx1-ubyte.gz Extracting data/t10k-images-idx3-ubyte.gz Extracting data/t10k-labels-idx1-ubyte.gz INFO:tensorflow:Calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Create CheckpointSaverHook. INFO:tensorflow:Graph was finalized. INFO:tensorflow:Running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Saving checkpoints for 0 into /tmp/tmpX1FRQG/model.ckpt. INFO:tensorflow:loss = 5657.984, step = 1 INFO:tensorflow:global_step/sec: 183.015 INFO:tensorflow:loss = 440.07687, step = 101 (0.548 sec) INFO:tensorflow:global_step/sec: 231.816 INFO:tensorflow:loss = 368.45178, step = 201 (0.431 sec) INFO:tensorflow:global_step/sec: 231.714 INFO:tensorflow:loss = 298.5456, step = 301 (0.432 sec) INFO:tensorflow:global_step/sec: 229.925 INFO:tensorflow:loss = 292.90118, step = 401 (0.435 sec) ・・・・・・・・・・・・・・・・・・・・ 中略 ・・・・・・・・・・・・・・・・・・・・・・・・・ INFO:tensorflow:Saving checkpoints for 10000 into /tmp/tmpX1FRQG/model.ckpt. INFO:tensorflow:Loss for final step: 190.22992.
推論
推論には入力データのうち1件を返却する推論用の関数eval_input_fn
を使います。
def eval_input_fn(): eval_set = input_data.read_data_sets("data/", one_hot=True) return tf.estimator.inputs.numpy_input_fn( x={INPUT_TENSOR_NAME:eval_set.validation.images}, num_epochs=1, shuffle=False )()
トレーニング済みautoencoder
のpredict()
を呼び出すと推論することができます。
prediction = list(autoencoder.predict(input_fn=eval_input_fn)
試しにminstデータセットの251番目(i=250)を抜き出し、オートエンコーダの前後の画像を比較してみます。
import matplotlib.pyplot as plt eval_set = input_data.read_data_sets("data/", one_hot=True) eval_images = eval_set.validation.images i = 250 # オリジナル画像 plt.imshow(eval_images[i].reshape(28,28)) plt.gray() plt.show() # オートエンコーダの出力結果 plt.imshow(prediction[i].reshape(28,28)) plt.gray() plt.show()
結果は以下のようになりました。
オリジナル画像
オートエンコーダの出力結果
ここで一度、これまでのソースコードをまとめます。
autoencoder_localy.py
import numpy as np import tensorflow as tf import input_data h_size = 128 INPUT_TENSOR_NAME = 'inputs' learning_rate = 0.01 imH = 28 imW = 28 imC = 1 flatten_size = imH * imW * imC def encoder(x, flatten_size): w_enc = tf.Variable(tf.random_normal([flatten_size, h_size], mean=0.0, stddev=0.05), dtype=tf.float32, name="w_enc") b_enc = tf.Variable(tf.random_normal([h_size], mean=0.0, stddev=0.05), dtype=tf.float32, name="b_enc") enc = tf.nn.softsign(tf.matmul(x,w_enc) + b_enc, name="enc") return enc def decoder(enc, flatten_size): w_dec = tf.Variable(tf.random_normal([h_size, flatten_size], mean=0.0, stddev=0.05), dtype=tf.float32, name="w_dec") b_dec = tf.Variable(tf.random_normal([flatten_size], mean=0.0, stddev=0.05), dtype=tf.float32, name="b_dec") dec = tf.nn.relu(tf.matmul(enc, w_dec) + b_dec, name="dec") return dec def l2loss(decoded, x, flatten_size): loss = tf.nn.l2_loss(decoded - x) return loss def training(loss): optimizer = tf.train.AdamOptimizer( learning_rate = learning_rate, beta1 = 0.9, beta2 = 0.999, epsilon = 1e-08, use_locking=False, name="Adam" ) train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step()) return train_op def model_fn(features, labels, mode, params): x = features[INPUT_TENSOR_NAME] encoded = encoder(x,flatten_size) decoded = decoder(encoded,flatten_size) loss = l2loss(decoded,x,flatten_size) train_op = training(loss) predictions = decoded return tf.estimator.EstimatorSpec( mode = mode, loss = loss, train_op = train_op, predictions = predictions ) def train_input_fn(): training_set = input_data.read_data_sets("data/", one_hot=True) return tf.estimator.inputs.numpy_input_fn( x={INPUT_TENSOR_NAME:training_set.train.images}, num_epochs=None, batch_size=100, shuffle=True )() def eval_input_fn(): training_set = input_data.read_data_sets("data/", one_hot=True) return tf.estimator.inputs.numpy_input_fn( x={INPUT_TENSOR_NAME:training_set.validation.images}, num_epochs=1, shuffle=False )()
SageMakerの分散処理を実装する
上記で実装した独自モデルをSageMakerのトレーニングジョブとし、SageMaker上で分散学習を行いたいと思います。
そのためにはsagemaker.tensorflow
クラスを使用する必要があります。
sagemaker.tensorflow
クラスを使ったトレーニングは以下のように行います。
# モデルを作成 estimator = TensorFlow( entry_point="xxxxxxx.py", role=role, train_instance_type='ml.p2.xlarge' framework_version='1.xx', train_instance_count=1, training_steps=10000 )
- entory_point ・・・ モデルを記述したスクリプトファイル
- role ・・・ SageMakerを実行可能なAWSロール
- train_instance_type ・・・ トレーニングジョブに使用するインスタンスのタイプ
- framework_version ・・・ モデルが使用するTensorFlowのバージョン
- train_instance_count ・・・ トレーニングジョブが使用するインスタンス数、2以上で分散学習が可能となる
- training_steps ・・・ トレーニングジョブの最大ステップ数
今回作成したオートエンコーダをSageMakerのトレーニングジョブでトレーニングさせるには、entry_point
に処理を記述したスクリプトを指定する必要があります。
またその際、entory_pointのスクリプトには以下の関数を実装している必要があります。
- model_fn
- train_input_fn
- eval_input_fn
- serving_input_fn
model_fn
SageMakerで行う機械学習のモデルを記述します。今回の場合はオートエンコーダ部分がこれに当たります。
model_fn
は戻り値としてtf.estimator.EstimatorSpec
を返す必要がありますが、ここはtf.estimator
のmodel_fn
がそのまま流用できます。
def model_fn(features, labels, mode, params): x = features[INPUT_TENSOR_NAME] encoded = encoder(x,flatten_size) decoded = decoder(encoded,flatten_size) loss = l2loss(decoded,x,flatten_size) train_op = training(loss) predictions = decoded return tf.estimator.EstimatorSpec( mode = mode, # estimatorの状態(トレーニングか推論か) loss = loss, # バックプロパゲーションのための誤差関数 train_op = train_op, # モデルを訓練するための最適化関数 predictions = predictions # モデルが推論を行うための関数 )
train_input_fn
トレーニング用の入力データをジョブに渡す関数です。事前にS3にトレーニング用データをUPしておき、学習時にパスを指定するとSageMakerがトレーニングジョブ起動時にジョブコンテナの/opt/ml/input/
配下にデータを配備します。
先ほどのtrain_input_fn
はローカルのダウンロード済みトレーニングデータを読み込む仕様だったので、これをジョブコンテナのパスが渡るように修正します。
INPUT_TENSOR_NAME = 'inputs' def train_input_fn(training_dir): # training_dirにはS3からコピーされたトレーニング用データのパスが入る training_set = input_data.read_data_sets(training_dir,one_hot=True) return tf.estimator.inputs.numpy_input_fn( x={INPUT_TENSOR_NAME:training_set.train.images}, num_epochs=None, shuffle=False )()
eval_input_fn
SageMakerのトレーニングジョブはチェックポイントごとにeval_input_fn
で指定したデータセットを使用して検証を行います。
こちらには検証用のデータを返す関数を指定します。
def eval_input_fn(training_dir,params): training_set = input_data.read_data_sets(training_dir,one_hot=True) return tf.estimator.inputs.numpy_input_fn( x={INPUT_TENSOR_NAME:training_set.validation.images}, num_epochs=1, shuffle=False )()
serving_input_fn
serving_input_fn
は推論エンドポイントを使用する場合に推論を行うために使用します。
model_fn
, train_input_fn
, eval_input_fn
と異なり、serving_input_fn
は推論エンドポイントを作成しない場合必須ではありません。
今回は推論まで含めてSageMakerで構築したいのでserving_input_fn
も作成します。
def serving_input_fn(params): inputs = {INPUT_TENSOR_NAME: tf.placeholder(tf.float32, [None, flatten_size])} return tf.estimator.export.ServingInputReceiver(inputs, inputs)
これまでの処理をautoencoder.py
としてまとめます。
autoencoder.py
import tensorflow as tf import numpy as np import os import input_data from tensorflow.python.ops import control_flow_ops h_size = 128 INPUT_TENSOR_NAME = 'inputs' learning_rate = 0.01 imH = 28 imW = 28 imC = 1 flatten_size = imH * imW * imC def encoder(x, flatten_size): w_enc = tf.Variable(tf.random_normal([flatten_size, h_size], mean=0.0, stddev=0.05), dtype=tf.float32, name="w_enc") b_enc = tf.Variable(tf.random_normal([h_size], mean=0.0, stddev=0.05), dtype=tf.float32, name="b_enc") enc = tf.nn.softsign(tf.matmul(x,w_enc) + b_enc, name="enc") return enc def decoder(enc, flatten_size): w_dec = tf.Variable(tf.random_normal([h_size, flatten_size], mean=0.0, stddev=0.05), dtype=tf.float32, name="w_dec") b_dec = tf.Variable(tf.random_normal([flatten_size], mean=0.0, stddev=0.05), dtype=tf.float32, name="b_dec") dec = tf.nn.relu(tf.matmul(enc, w_dec) + b_dec, name="dec") return dec def l2loss(decoded, x, flatten_size): loss = tf.nn.l2_loss(decoded - x) return loss def training(loss): optimizer = tf.train.AdamOptimizer( learning_rate = learning_rate, beta1 = 0.9, beta2 = 0.999, epsilon = 1e-08, use_locking=False, name="Adam" ) train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step()) return train_op def model_fn(features, labels, mode, params): x = features[INPUT_TENSOR_NAME] encoded = encoder(x,flatten_size) decoded = decoder(encoded,flatten_size) loss = l2loss(decoded,x,flatten_size) train_op = training(loss) predictions = decoded return tf.estimator.EstimatorSpec( mode = mode, loss = loss, train_op = train_op, predictions = predictions ) def train_input_fn(training_dir): training_set = input_data.read_data_sets(training_dir,one_hot=True) return tf.estimator.inputs.numpy_input_fn( x={INPUT_TENSOR_NAME:training_set.train.images}, num_epochs=None, shuffle=False )() def eval_input_fn(training_dir,params): training_set = input_data.read_data_sets(training_dir,one_hot=True) return tf.estimator.inputs.numpy_input_fn( x={INPUT_TENSOR_NAME:training_set.validation.images}, num_epochs=1, shuffle=False )() def serving_input_fn(params): inputs = {INPUT_TENSOR_NAME: tf.placeholder(tf.float32, [None, flatten_size])} return tf.estimator.export.ServingInputReceiver(inputs, inputs)
mnistのデータを取得するのにTensorFlowのmnistチュートリアルのinput_dataを使用するため、これらのファイルと合わせて、autoencoder_mnist
というディレクトリに保存します。
sh-4.2$ ls autoencoder_mnist/
autoencoder.py input_data.py
トレーニングを実行する
ファイルの準備ができたらsagemaker.tensorflow
のfit()
を呼び出して訓練を開始します。
分散学習を行うので、train_instance_count
は2以上を指定します。
import boto3 from sagemaker.tensorflow import TensorFlow from sagemaker import get_execution_role # SageMakerを実行するためのrole取得 role = get_execution_role() # AWSのリージョン情報を取得 region = boto3.Session().region_name # SageMakerのモデルを作成 estimator = TensorFlow( entry_point="autoencoder.py", source_dir="autoencoder_mnist", role=role, train_instance_type='ml.m4.2xlarge', framework_version='1.11', train_instance_count=2, training_steps=10000 ) # S3にトレーニング用データをアップロードし、そのパスを指定 data_location = "s3://path/to/mnist/data/".format(region) # トレーニングジョブを実行 # fitの引数にS3のパスを渡すと、トレーニングジョブ起動時にジョブコンテナの`/opt/ml/input/`配下にデータが配備され # そのパスがtrain_input_fnに渡る。 estimator.fit(data_location)
正常に実行されるとJupyter上に実行ログが表示されます。
INFO:sagemaker:Creating training-job with name: sagemaker-tensorflow-2018-xx-xx-xx-xx-xx-xxx Creating tmphdf0psmt_algo-1-VUTMQ_1_3d7a8bccf3f5 ... Attaching to tmphdf0psmt_algo-1-VUTMQ_1_87005b07ff9a2mdone algo-1-VUTMQ_1_87005b07ff9a | 2018-12-07 02:59:30,937 INFO - root - running container entrypoint algo-1-VUTMQ_1_87005b07ff9a | 2018-12-07 02:59:30,937 INFO - root - starting train task algo-1-VUTMQ_1_87005b07ff9a | 2018-12-07 02:59:30,952 INFO - container_support.training - Training starting algo-1-VUTMQ_1_87005b07ff9a | Downloading s3://sagemaker-ap-northeast-1-225919769484/sagemaker-tensorflow-2018-xx-xx-xx-xx-xx-xxx/source/sourcedir.tar.gz to /tmp/script.tar.gz algo-1-VUTMQ_1_87005b07ff9a | 2018-12-07 02:59:31,948 INFO - tf_container - ----------------------TF_CONFIG-------------------------- algo-1-VUTMQ_1_87005b07ff9a | 2018-12-07 02:59:31,948 INFO - tf_container - {"environment": "cloud", "cluster": {"master": ["algo-1-VUTMQ:2222"]}, "task": {"index": 0, "type": "master"}} algo-1-VUTMQ_1_87005b07ff9a | 2018-12-07 02:59:31,949 INFO - tf_container - --------------------------------------------------------- ・・・・・・・・・・・・・・・・・・・・ 中略 ・・・・・・・・・・・・・・・・・・・・・・・・・ algo-1-VUTMQ_1_87005b07ff9a | 2018-12-07 02:59:35,647 INFO - tensorflow - loss = 5630.208, step = 1 algo-1-VUTMQ_1_87005b07ff9a | 2018-12-07 02:59:36,142 INFO - tensorflow - global_step/sec: 201.714 algo-1-VUTMQ_1_87005b07ff9a | 2018-12-07 02:59:36,143 INFO - tensorflow - loss = 470.30643, step = 101 (0.496 sec) algo-1-VUTMQ_1_87005b07ff9a | 2018-12-07 02:59:36,565 INFO - tensorflow - global_step/sec: 236.69 algo-1-VUTMQ_1_87005b07ff9a | 2018-12-07 02:59:36,565 INFO - tensorflow - loss = 359.07727, step = 201 (0.422 sec)
train_instance_count
を2以上に指定するとコンテナごとに色付けされたログが出力されます。
ログの最後にCompleted - Training job completed
が表示されたらトレーニング完了となります。
2018-12-10 09:38:09 Uploading - Uploading generated training model 2018-12-10 09:38:09 Completed - Training job completed
デバッグ
train_instance_type
にlocal
(GPUインスタンスの場合はlocal_gpu
)を指定すると、jupyterインスタンス内でトレーニングジョブコンテナが起動するので、これを利用してデバッグすることが出来ます。正常に動かなかった場合はこちらを指定してデバッグしてください。
estimator = TensorFlow( entry_point="autoencoder.py", source_dir="autoencoder_mnist", role=role, train_instance_type='local', # gpuインスタンスの場合は local_gpu framework_version='1.11', train_instance_count=1, training_steps=10000 )
推論エンドポイントをデプロイ
トレーニングが完了したら、トレーニング済みのカスタムモデルを推論エンドポイントへデプロイします。
predictor = estimator.deploy(initial_instance_count=1,instance_type='ml.t2.medium')
ログにTF Serving model successfully loaded
が表示されたら推論エンドポイントの準備完了です。
Attaching to tmpfsuu73pc_algo-1-WY5X9_1_4ae08c6e94ca algo-1-WY5X9_1_4ae08c6e94ca | 2018-12-05 00:45:35,850 INFO - root - running container entrypoint algo-1-WY5X9_1_4ae08c6e94ca | 2018-12-05 00:45:35,851 INFO - root - starting serve task algo-1-WY5X9_1_4ae08c6e94ca | 2018-12-05 00:45:35,851 INFO - container_support.serving - reading config ・・・・・・・・・・・・・・・・・・・・ 中略 ・・・・・・・・・・・・・・・・・・・・・・・・・ algo-1-WY5X9_1_4ae08c6e94ca | 2018-12-05 00:45:38,375 INFO - tf_container - TF Serving model successfully loaded algo-1-WY5X9_1_4ae08c6e94ca | 2018-12-05 00:45:38,380 INFO - container_support.serving - returning initialized server
推論
推論エンドポイントを使い、オートエンコーダを試してみましょう。
import numpy as np import tensorflow as tf import matplotlib.pyplot as plt from tensorflow.examples.tutorials.mnist import input_data mnist = input_data.read_data_sets("/tmp/data/", one_hot=True) im_idx = 100 data = mnist.test.images[im_idx].tolist() plt.imshow(np.array(data).reshape(28,28)) plt.gray() plt.show()
オートエンコード前のim_idx
に対応する画像は以下です。
この画像をインプットとし、推論エンドポイントからのオートエンコード結果を取得します。
tensor_proto = tf.make_tensor_proto(values=np.asarray(data), shape=[1, len(data)], dtype=tf.float32) predict_response = predictor.predict(tensor_proto) plt.imshow(np.array(predict_response['outputs']['output']['float_val']).reshape(28,28)) plt.gray() plt.show()
他にも数件ランダムにピックアップして、エンコード前後を比較しました。左がオリジナル、右がオートエンコーダ適用後の結果となります。
推論エンドポイントの停止
確認が終わったらAWSの管理画面から推論エンドポイントを削除します。
トレーニング済みモデルのアタッチ
次回以降アクセスする場合はattach
を使用するとトレーニング済みのカスタムモデルに接続できます。
estimator = estimator.attach("トレーニング済みカスタムモデルのID")
まとめ
tf.estimator.Estimator
はモデルをシンプルに記述することができ、ほぼそのままSageMakerのトレーニングジョブとして利用できます。
SageMakerのトレーニングジョブはこちらが意識することなく分散学習が可能なので、GPUでトレーニング時間のかかるタスクにおいて非常に有用なのではないかと思います。