Cloud Composer(Airflow)からDataflowTemplateOperatorの使い方がわからなかったので調べました。
Dataflowテンプレート登録
DataflowTemplateOperatorは名前の如くDataflowのテンプレートという、ジョブの実行情報の定義体みたいなものを指定してジョブを実行します。
以下の公式ドキュメントを参考にDataflowのテンプレートを作成し、GCSにアップします。
https://cloud.google.com/dataflow/docs/guides/templates/creating-templates?hl=ja
コード作成
ここでは例として、以下のwordcountのジョブを参考にしてテンプレート作っています。
beam/wordcount.py at v2.16.0 · apache/beam · GitHub
作成したもはこちらです。今回はDataflowで使うバケットは分かりやすいように、共通で一つのバケット使っています。
from __future__ import absolute_import import re import argparse from past.builtins import unicode import apache_beam as beam from apache_beam.io import ReadFromText from apache_beam.io import WriteToText from apache_beam.metrics import Metrics from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions, GoogleCloudOptions, StandardOptions class WordExtractingDoFn(beam.DoFn): """Parse each line of input text into words.""" def __init__(self): beam.DoFn.__init__(self) self.words_counter = Metrics.counter(self.__class__, 'words') self.word_lengths_counter = Metrics.counter(self.__class__, 'word_lengths') self.word_lengths_dist = Metrics.distribution(self.__class__, 'word_len_dist') self.empty_line_counter = Metrics.counter(self.__class__, 'empty_lines') def process(self, element): text_line = element.strip() if not text_line: self.empty_line_counter.inc(1) words = re.findall(r'[\w\']+', text_line, re.UNICODE) for w in words: self.words_counter.inc() self.word_lengths_counter.inc(len(w)) self.word_lengths_dist.update(len(w)) return words # PipelineOptionsを継承したCustomOptionsはこの辺りを参考に # https://github.com/apache/beam/blob/master/sdks/python/apache_beam/options/pipeline_options.py class CustomOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_value_provider_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Input file to process.') parser.add_value_provider_argument( '--output', help='Output file to write results to.') def count_ones(word_ones): (word, ones) = word_ones return (word, sum(ones)) def format_result(word_count): (word, count) = word_count return '%s: %d' % (word, count) if __name__ == '__main__': import sys parser = argparse.ArgumentParser() parser.add_argument('--project') parser.add_argument('--region') parser.add_argument('--base-bucket') known_args, _ = parser.parse_known_args(sys.argv) options = CustomOptions() options.view_as(StandardOptions).runner = 'DataflowRunner' gcp_options = options.view_as(GoogleCloudOptions) gcp_options.project = known_args.project gcp_options.region = known_args.region gcp_options.template_location = "gs://{}/dataflow/templates/wordcount".format( known_args.base_bucket) gcp_options.temp_location = "gs://{}/temp".format(known_args.base_bucket) gcp_options.staging_location = "gs://{}/dataflow/staging".format( known_args.base_bucket) p = beam.Pipeline(options=options) lines = p | 'read' >> ReadFromText(options.input) counts = (lines | 'split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(unicode)) | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) | 'group' >> beam.GroupByKey() | 'count' >> beam.Map(count_ones)) output = counts | 'format' >> beam.Map(format_result) output | 'write' >> WriteToText(options.output) result = p.run() result.wait_until_finish()
コンパイル+アップロード
上記のコードのこの部分で指定された通り、テンプレートがアップロードされます。
gcp_options.template_location = "gs://{}/templates/wordcount".format(
known_args.base_bucket)
今回利用するBucketを作成します。
$ gsutil mb -l asia-northeast1 gs://my-df-bucket
Creating gs://my-df-bucket/...
必要なパッケージをインストールします。
$ pip install apache-beam[gcp]
上記のスクリプトを実行します。
$ python -m wordcount --project your-project-name --region asia-northeast1 --base-bucket my-df-bucket
このようにテンプレートがアップロードされます。
ここでは載せないですが、中身見ると大きなJSONの定義だということがわかります。
$ gsutil cat gs://my-df-bucket/dataflow/templates/wordcount | jq
Cloud ComposerのDAG作成
DAGから先程作成したテンプレート呼び出します。
DAG定義スクリプト作成
定義はシンプルにしました。project
や region
は調整してください。
import airflow from airflow import DAG from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator yesterday = airflow.utils.dates.days_ago(1) default_args = { 'owner': 'yusuke.otomo', 'start_date': yesterday, 'dataflow_default_options': { "project": "your-project", "region": "asia-northeast1", "stagingLocation": 'gs://{{ var.value.df_base_bucket }}/airflow/staging/', "tempLocation": 'gs://{{ var.value.df_base_bucket }}/airflow/temp/', } } dag = DAG('my-job-wordcount', default_args=default_args) df_task = DataflowTemplateOperator( task_id='my-wordcount', template='gs://{{ var.value.df_base_bucket }}/dataflow/templates/wordcount', parameters={ 'output': "gs://{{ var.value.df_base_bucket }}/airflow/output/my_output.txt" }, gcp_conn_id='google_cloud_default', dag=dag)
AirflowのVariables設定
上記で {{ var.value.df_base_bucket }}
のように書いてあるところはマクロでAirflow側に定義した変数を呼び出しています。
変数は以下のメニューから定義できます。
DAGファイルのインポート
以下のコマンドで上記のDAGをCloud Composerにインポートします。
$ gcloud composer environments storage dags import \ --environment=${YOUR_COMPOSER_ENV} \ --location asia-northeast1 \ --source=./dags/dag_wordcount.py
ジョブが開始されます。
ジョブが完了したら、outputに指定したファイルを確認してみます。
$ gsutil cat gs://my-df-bucket/airflow/output/my_output* | head gap: 1 appetite: 2 change: 5 bending: 2 advised: 1 moulds: 1 sequent: 1 sounded: 2 knight: 1 gentlewoman: 1
参考URL
Cloud Dataflow(Python)をAppEngineから実行する - Qiita
airflow/dataflow_operator.py at 1.10.3 · apache/airflow · GitHub