Python+Cloud DataflowのPubSubストリーミングをGoogle Colaboratory使って試す

やりたいことのGIF動画です。こちら実際にBeamのストリーミングで、PubSubと同期とっているところです。

f:id:yomon8:20200111212356g:plain

ざっくり構成図はこんな感じです。

f:id:yomon8:20200111214100p:plain

ここから簡単に手順を書いてみます。

Google ColabでGCP使うには

GCPの認証

以下のコマンドを実行するとURLが発行されるので、それでブラウザから認証するだけです。本当に使いやすいです。

from google.colab import auth
auth.authenticate_user()

PyPiモジュールのインストール

Google Colaboratoryの良いところとして、Pythonモジュールは当然のこと、apt使ってソフトウェアインストールしたり、OS側の操作もできることです。

以下のように ! マークつけてコマンド実行します。

!pip install --quiet google-cloud-pubsub

しかも、その環境構築の手順もノートとして残るので、後で忘れた頃にとても重宝します。

PubSub側準備

以下のTopicとSubscription作ります。これを後で使います。

$ gcloud pubsub topics create my-df-topic
$ gcloud pubsub subscriptions create --topic my-df-topic my-df-sub

左のブラウザ(Publish側)

PubSubにメッセージPublishするためのコード書きます。PubSubのクライアントモジュールだけインストールしておきます。

!pip install --quiet google-cloud-pubsub

0.5秒毎にPubSubのTopicにPublishするだけのコードです。 project 変数だけ自身のプロジェクト名に置き換えます。

import time
from datetime import datetime
from google.cloud import pubsub_v1

project = 'YOUR_PROJECT'
topic = 'my-df-topic'

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, topic)

for i in range(100): 
  data = str({
    'num': i,
    'now': datetime.now()
  }) 
  future = publisher.publish(topic_path, data=data.encode("utf-8")) 
  print('Pub ->{}'.format(data))
  time.sleep(0.5)

右のブラウザ(BeamでSubscribe側)

Google ColabにPubSubに加えて、BeamとBeamのGCPプラグインをインストールします。

!pip install --quiet google-cloud-pubsub
!pip install --quiet apache-beam 
!pip install --quiet apache-beam[gcp]

こちらもコードはとてもシンプルです。Subscribeしたデータを出力するだけのコードです。

import os

import apache_beam as beam
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.io.gcp.pubsub import ReadFromPubSub

project = 'YOUR_PROJECT_NAME'
os.environ['GOOGLE_CLOUD_PROJECT'] = project
subscription = 'my-df-sub'  # 先程作成したSubscription

options = StandardOptions()
options.streaming = True  # BeamのStreamingを有効化

subscription = 'projects/{PROJECT}/subscriptions/{SUBSCRIPTION}'.format(
    PROJECT=project, SUBSCRIPTION=subscription)

pubsub_reader = ReadFromPubSub(subscription=subscription)

with beam.Pipeline(options=options) as p:
  (
      p 
      | 'subscribe' >> pubsub_reader
      | 'print' >> beam.Map(lambda x: print('Sub -> {}'.format(x)))
   )

後は最初のGIFのように実行すれば動くはずです。

Cloud Dataflowで実行

Google Colabで作成したコードの、BeamのオプションにGCP実行用のパラメータを以下のように加えて実行するとDataflowでジョブを実行できます。

options = StandardOptions()
options.streaming = True     # BeamのStreamingを有効化

#↓追加分のoptions

options.runner = 'DataflowRunner'
gcp_options = options.view_as(GoogleCloudOptions)
gcp_options.project = project
gcp_options.region = 'asia-northeast1'
gcp_options.staging_location = 'gs://YOUR_BUCKET/staging'
gcp_options.temp_location = 'gs://YOUR_BUCKET/temp'

beam側もwith句で囲ってしまうと、終了を待ってしまうので、非同期で実行命令だけ出す形に書き換えます。

p = beam.Pipeline(options=options)
(
  p 
 | 'subscribe' >> pubsub_reader 
 | 'print' >> beam.Map(logging.info)
)
p.run()

Cloud Dataflowの画面にて以下のようにジョブが実行されるのを確認できるはずです。

f:id:yomon8:20200111220025p:plain

画面下の方にあるStackdriverのリンク飛ぶと、先程と同様にPublishしたデータをSubscribe -> Printしていることが確認できます。

f:id:yomon8:20200111215958p:plain

後片付け

後片付けします。特にDataflowのStreamingは動かしっぱなしだと気づかずにお金かかってしまうので気をつけてください。

PubSub削除

$ gcloud pubsub subscriptions delete my-df-sub
$ gcloud pubsub topics delete my-df-topic

Cloud Dataflowのジョブ停止

以下のボタンでジョブを停止します。

f:id:yomon8:20200111215847p:plain

最後に

Cloud DataflowはApache Beam使いますが、PythonかJavaで開発できます。

Javaの方が機能が多く、Pythonの場合は自分で実装しないといけない部分も多くあります。ただ良い部分もあって開発そのものはとてもやりやすいです。

その一つはGoogle Colaboratoryの存在です。ローカルのJupyterも使っているのですが、Google Colaboratoryで開発しておくと探しやすく、自分の環境も整理しないで良いところが素晴らしいです。