simplestarの技術ブログ

目的を書いて、思想と試行、結果と考察、そして具体的な手段を記録します。

Distributed TensorFlow with Cloud ML Engine (Implementation Details)

■前置き
今現在、世界中の天才と呼ばれる人類層が、必死になって神秘のヴェールをはがしている領域で
ごく最近使われるようになってきた単語を使って説明するので、かなり知的負荷が高いこと書きます。
筆者もいっぱいいっぱいで、わからなくても気にせず、読み流してください。

■本記事で伝えたいこと
Deep Learning という言葉が流行り始めたのが 2012 年の ImageNet(別名Large Scale Visual Recognition Challenge)にてトロント大学チームがこれまでの画像認識率に大差をつけて勝利したころから。
世界中で人工知能 (AI) ブームが巻き起こり、Deep Learning ライブラリの群雄割拠時代が始まり、ようやく研究者の間で一つのライブラリに収束してきたこの頃です。
GoogleDeep Learning 用のライブラリ Tensor Flow と、膨大な計算量を簡易にスケールアウトできる Cloud ML サービスを提供しています。
Cloud ML は複数の計算マシンを活用して、本来自宅PCだと数週間かかるようなAIの計算を、その数百分の1の数時間で完了させる道具と認識できます。
世界で誰も見たことが無いような予測結果を描く Deep Learning を中心としたAI技術に携わる方は、Google の Cloud ML を最大限に活用しなければならないと言っても過言ではありません。(むしろここがスタートラインです)

Cloud ML と Tensor Flow の使い方ドキュメント
Introduction  |  TensorFlow
Distributed TensorFlow  |  TensorFlow
を順番に読んでいけば、Tensor Flow の Core API を利用した場合は、 あなたの実装コードに cluster (複数マシンの役割設定)定義を書く必要があることに気づくでしょう。
その具体的な記述方法と、なぜこんな書き方になるのか理解しなければならない概念イメージの中で特に集中して見えていなければならない点を強調します。
先ほどのリンク先を読み進めて、疑問点が晴れるのが一番良いので、深く理解できた方はこの先を読む必要はありません。
ここから先は、実装を試しながら深く読み進めた人間が、結局、少数の何を覚えておけばよいかという点を示していきます。

■具体的な記述方法

前提知識、技術水準はかなり高いかもしれません。
以下の項目ができるようになってから続きを読んでください。(ちょっと偉そうですね、すみません)

  1. Tensor Flow の Core API を使ってデータフローグラフのメンタルモデルを構築できる(要はコード見ただけで計算内容がイメージでき、図で他人に説明できる、別に他人は理解できなくて良い)
  2. 登場する Python 書式で不明点がない (または、不明書式が出てきてもすぐに調べて解決できるくらいのコンピュータ知識とプログラミング経験)

前提を揃えたら、Tensor Flow の Cloud ML 実行のチュートリアルが以下のリンクに示されているので、なぞります。
Using Distributed TensorFlow with Cloud ML Engine and Cloud Datalab  |  Cloud ML Engine for TensorFlow  |  Google Cloud
2018年7月に私が試した限りでは、問題なくすべてのチュートリアルが完了することを確認できました。

ここまでやった人なら気づきますが、コードレベルの詳細説明は載っていないので、このサンプルをベースに活用するには、利用した Python コードを読んで理解する必要があります。
コードをローカルに引きます。まずは次の git clone コマンドを成功させてください。

git clone https://github.com/GoogleCloudPlatform/cloudml-dist-mnist-example

PythonIDE として今おすすめなのは VSCode でしょうか、プラグインを入れて Python 実装を見やすくしておいてください。
キーワードとして cluster を検索します。
一つも引っかかりません。

ということで、これ Custom Estimator という Core API 使わない Tensor Flow のライトユーザー向けサンプルですね。

Tensor Flow 利用者は、新しく理論を作り出すような玄人向けの Core API を活用する人と、出来合いの Estimator をいじるライトユーザーの二つのユーザー層に分かれています。
今回のサンプルはライトユーザー層向けのものでした。

cluster の書き方は、結局
こちらDistributed TensorFlow  |  TensorFlow
を参照して、頑張って API ドキュメントの知識を繋ぎながら確信あるサンプルコードを自力で作る必要があります。

ということで、私が自力で作ってみるので、少々お待ちください。

追記:
とても参考になる実装を見つけました。
2017年10月と、半年以上古いですが Google の中の人のサンプルです。
github.com

こちらがグラフ作成時に気を付けることです。
あらかじめ cluster を作っておいたら、その情報を使って現在のマシンに関する文字列を作り、device_fn を作ります。
それを tf.device に渡して、その後のスコープでグラフを作成します。
これによって、グラフは指定されたマシンでのみ実行されることになります。

    if self.cluster:
      logging.info('Starting %s/%d', self.task.type, self.task.index)
      server = start_server(self.cluster, self.task)
      target = server.target
      device_fn = tf.train.replica_device_setter(
          ps_device='/job:ps',
          worker_device='/job:%s/task:%d' % (self.task.type, self.task.index),
          cluster=self.cluster)
      # We use a device_filter to limit the communication between this job
      # and the parameter servers, i.e., there is no need to directly
      # communicate with the other workers; attempting to do so can result
      # in reliability problems.
      device_filters = [
          '/job:ps',
          '/job:%s/task:%d' % (self.task.type, self.task.index)
      ]
      config = tf.ConfigProto(device_filters=device_filters)
    else:
      target = ''
      device_fn = ''
      config = None

    with tf.Graph().as_default() as graph:
      with tf.device(device_fn):
        # Build the training graph.
        self.tensors = self.model.build_train_graph(self.args.data_dir, self.args.batch_size)

気になる cluster の作成方法ですが、以下のように環境変数 TF_CONFIG から情報を取得して、マシン情報を取得し、処理を分岐します。

  env = json.loads(os.environ.get('TF_CONFIG', '{}'))

  # Print the job data as provided by the service.
  logging.info('Original job data: %s', env.get('job', {}))

  # First find out if there's a task value on the environment variable.
  # If there is none or it is empty define a default one.
  task_data = env.get('task', None) or {'type': 'master', 'index': 0}
  task = type('TaskSpec', (object,), task_data)

  cluster_data = env.get('cluster', None)
  cluster = tf.train.ClusterSpec(cluster_data) if cluster_data else None

クラウドで準備したマシンの環境変数 TF_CONFIG から、このように情報を取得しなさいというドキュメントは、こちらに書かれています。
Using TF_CONFIG for Distributed Training Details  |  Cloud ML Engine for TensorFlow  |  Google Cloud

最後に、ps ことパラメータサーバーだった場合は、グラフ構築を行う前に server.join を行って、他の master や worker からのリクエストを待つために join します。

def dispatch(args, model, cluster, task):
  if not cluster or not task or task.type == 'master':
    # Run locally.
    Trainer(args, model, cluster, task).run_training()
  elif task.type == 'ps':
    run_parameter_server(cluster, task)
  elif task.type == 'worker':
    Trainer(args, model, cluster, task).run_training()
  else:
    raise ValueError('invalid task_type %s' % (task.type,))


def run_parameter_server(cluster, task):
  logging.info('Starting parameter server %d', task.index)
  server = start_server(cluster, task)
  server.join()

def start_server(cluster, task):
  if not task.type:
    raise ValueError('--task_type must be specified.')
  if task.index is None:
    raise ValueError('--task_index must be specified.')

  # Create and start a server.
  return tf.train.Server(
      tf.train.ClusterSpec(cluster),
      protocol='grpc',
      job_name=task.type,
      task_index=task.index)

パラメーターサーバーは join するとして、他の master と worker は?
と思いますが、以下のように session を作る際に target として server を指定します。

server = start_server(self.cluster, self.task)
target = server.target
with self.sv.managed_session(target, config=config) as session:

このセッションを run すると、それぞれのマシンで計算が行われた時に、join しているパラメータサーバーが必要な変数情報を渡してくれるので、worker は単に計算を頑張り、結果をパラメータサーバーに返します。
以下の説明によれば、バッチ処理など、入力情報が異なるものについて計算を行い、変数の変更内容を互いにマージするように取り入れることによって、並列化による計算速度の向上をする仕組みとなっています。
Using Distributed TensorFlow with Cloud ML Engine and Cloud Datalab  |  Cloud ML Engine for TensorFlow  |  Google Cloud

Tensor Flow API についての知識をあらかたそろえてから Cloud ML のドキュメントを読み進めると、こうした Tensor Flow の記述についてわかってくるということでした。
Cloud ML は日本語訳が充実しているので、英語が苦手でも素早く読めます。

Concepts  |  Cloud ML Engine for TensorFlow  |  Google Cloud