やりたいことのGIF動画です。こちら実際にBeamのストリーミングで、PubSubと同期とっているところです。
ざっくり構成図はこんな感じです。
ここから簡単に手順を書いてみます。
Google ColabでGCP使うには
GCPの認証
以下のコマンドを実行するとURLが発行されるので、それでブラウザから認証するだけです。本当に使いやすいです。
from google.colab import auth auth.authenticate_user()
PyPiモジュールのインストール
Google Colaboratoryの良いところとして、Pythonモジュールは当然のこと、apt使ってソフトウェアインストールしたり、OS側の操作もできることです。
以下のように !
マークつけてコマンド実行します。
!pip install --quiet google-cloud-pubsub
しかも、その環境構築の手順もノートとして残るので、後で忘れた頃にとても重宝します。
PubSub側準備
以下のTopicとSubscription作ります。これを後で使います。
$ gcloud pubsub topics create my-df-topic
$ gcloud pubsub subscriptions create --topic my-df-topic my-df-sub
左のブラウザ(Publish側)
PubSubにメッセージPublishするためのコード書きます。PubSubのクライアントモジュールだけインストールしておきます。
!pip install --quiet google-cloud-pubsub
0.5秒毎にPubSubのTopicにPublishするだけのコードです。 project
変数だけ自身のプロジェクト名に置き換えます。
import time from datetime import datetime from google.cloud import pubsub_v1 project = 'YOUR_PROJECT' topic = 'my-df-topic' publisher = pubsub_v1.PublisherClient() topic_path = publisher.topic_path(project, topic) for i in range(100): data = str({ 'num': i, 'now': datetime.now() }) future = publisher.publish(topic_path, data=data.encode("utf-8")) print('Pub ->{}'.format(data)) time.sleep(0.5)
右のブラウザ(BeamでSubscribe側)
Google ColabにPubSubに加えて、BeamとBeamのGCPプラグインをインストールします。
!pip install --quiet google-cloud-pubsub !pip install --quiet apache-beam !pip install --quiet apache-beam[gcp]
こちらもコードはとてもシンプルです。Subscribeしたデータを出力するだけのコードです。
import os import apache_beam as beam from apache_beam.options.pipeline_options import StandardOptions from apache_beam.io.gcp.pubsub import ReadFromPubSub project = 'YOUR_PROJECT_NAME' os.environ['GOOGLE_CLOUD_PROJECT'] = project subscription = 'my-df-sub' # 先程作成したSubscription options = StandardOptions() options.streaming = True # BeamのStreamingを有効化 subscription = 'projects/{PROJECT}/subscriptions/{SUBSCRIPTION}'.format( PROJECT=project, SUBSCRIPTION=subscription) pubsub_reader = ReadFromPubSub(subscription=subscription) with beam.Pipeline(options=options) as p: ( p | 'subscribe' >> pubsub_reader | 'print' >> beam.Map(lambda x: print('Sub -> {}'.format(x))) )
後は最初のGIFのように実行すれば動くはずです。
Cloud Dataflowで実行
Google Colabで作成したコードの、BeamのオプションにGCP実行用のパラメータを以下のように加えて実行するとDataflowでジョブを実行できます。
options = StandardOptions() options.streaming = True # BeamのStreamingを有効化 #↓追加分のoptions options.runner = 'DataflowRunner' gcp_options = options.view_as(GoogleCloudOptions) gcp_options.project = project gcp_options.region = 'asia-northeast1' gcp_options.staging_location = 'gs://YOUR_BUCKET/staging' gcp_options.temp_location = 'gs://YOUR_BUCKET/temp'
beam側もwith句で囲ってしまうと、終了を待ってしまうので、非同期で実行命令だけ出す形に書き換えます。
p = beam.Pipeline(options=options) ( p | 'subscribe' >> pubsub_reader | 'print' >> beam.Map(logging.info) ) p.run()
Cloud Dataflowの画面にて以下のようにジョブが実行されるのを確認できるはずです。
画面下の方にあるStackdriverのリンク飛ぶと、先程と同様にPublishしたデータをSubscribe -> Printしていることが確認できます。
後片付け
後片付けします。特にDataflowのStreamingは動かしっぱなしだと気づかずにお金かかってしまうので気をつけてください。
PubSub削除
$ gcloud pubsub subscriptions delete my-df-sub $ gcloud pubsub topics delete my-df-topic
Cloud Dataflowのジョブ停止
以下のボタンでジョブを停止します。
最後に
Cloud DataflowはApache Beam使いますが、PythonかJavaで開発できます。
Javaの方が機能が多く、Pythonの場合は自分で実装しないといけない部分も多くあります。ただ良い部分もあって開発そのものはとてもやりやすいです。
その一つはGoogle Colaboratoryの存在です。ローカルのJupyterも使っているのですが、Google Colaboratoryで開発しておくと探しやすく、自分の環境も整理しないで良いところが素晴らしいです。