Cloud ComposerからDataflowTemplateOperatorでwordcountジョブを実行する

Cloud Composer(Airflow)からDataflowTemplateOperatorの使い方がわからなかったので調べました。

Dataflowテンプレート登録

DataflowTemplateOperatorは名前の如くDataflowのテンプレートという、ジョブの実行情報の定義体みたいなものを指定してジョブを実行します。

以下の公式ドキュメントを参考にDataflowのテンプレートを作成し、GCSにアップします。

https://cloud.google.com/dataflow/docs/guides/templates/creating-templates?hl=ja

コード作成

ここでは例として、以下のwordcountのジョブを参考にしてテンプレート作っています。

beam/wordcount.py at v2.16.0 · apache/beam · GitHub

作成したもはこちらです。今回はDataflowで使うバケットは分かりやすいように、共通で一つのバケット使っています。

from __future__ import absolute_import

import re
import argparse

from past.builtins import unicode

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions, GoogleCloudOptions, StandardOptions


class WordExtractingDoFn(beam.DoFn):
    """Parse each line of input text into words."""
    def __init__(self):
        beam.DoFn.__init__(self)
        self.words_counter = Metrics.counter(self.__class__, 'words')
        self.word_lengths_counter = Metrics.counter(self.__class__,
                                                    'word_lengths')
        self.word_lengths_dist = Metrics.distribution(self.__class__,
                                                      'word_len_dist')
        self.empty_line_counter = Metrics.counter(self.__class__,
                                                  'empty_lines')

    def process(self, element):
        text_line = element.strip()
        if not text_line:
            self.empty_line_counter.inc(1)
        words = re.findall(r'[\w\']+', text_line, re.UNICODE)
        for w in words:
            self.words_counter.inc()
            self.word_lengths_counter.inc(len(w))
            self.word_lengths_dist.update(len(w))
        return words


# PipelineOptionsを継承したCustomOptionsはこの辺りを参考に
# https://github.com/apache/beam/blob/master/sdks/python/apache_beam/options/pipeline_options.py
class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--input',
            default='gs://dataflow-samples/shakespeare/kinglear.txt',
            help='Input file to process.')
        parser.add_value_provider_argument(
            '--output', help='Output file to write results to.')


def count_ones(word_ones):
    (word, ones) = word_ones
    return (word, sum(ones))


def format_result(word_count):
    (word, count) = word_count
    return '%s: %d' % (word, count)


if __name__ == '__main__':
    import sys
    parser = argparse.ArgumentParser()
    parser.add_argument('--project')
    parser.add_argument('--region')
    parser.add_argument('--base-bucket')
    known_args, _ = parser.parse_known_args(sys.argv)

    options = CustomOptions()
    options.view_as(StandardOptions).runner = 'DataflowRunner'
    gcp_options = options.view_as(GoogleCloudOptions)
    gcp_options.project = known_args.project
    gcp_options.region = known_args.region
    gcp_options.template_location = "gs://{}/dataflow/templates/wordcount".format(
        known_args.base_bucket)
    gcp_options.temp_location = "gs://{}/temp".format(known_args.base_bucket)
    gcp_options.staging_location = "gs://{}/dataflow/staging".format(
        known_args.base_bucket)

    p = beam.Pipeline(options=options)

    lines = p | 'read' >> ReadFromText(options.input)

    counts = (lines
              | 'split' >>
              (beam.ParDo(WordExtractingDoFn()).with_output_types(unicode))
              | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
              | 'group' >> beam.GroupByKey()
              | 'count' >> beam.Map(count_ones))

    output = counts | 'format' >> beam.Map(format_result)
    output | 'write' >> WriteToText(options.output)

    result = p.run()
    result.wait_until_finish()

コンパイル+アップロード

上記のコードのこの部分で指定された通り、テンプレートがアップロードされます。

    gcp_options.template_location = "gs://{}/templates/wordcount".format(
        known_args.base_bucket)

今回利用するBucketを作成します。

$ gsutil mb -l asia-northeast1 gs://my-df-bucket
Creating gs://my-df-bucket/...

必要なパッケージをインストールします。

$ pip install apache-beam[gcp]

上記のスクリプトを実行します。

$ python -m wordcount --project your-project-name --region asia-northeast1 --base-bucket my-df-bucket

このようにテンプレートがアップロードされます。

f:id:yomon8:20191214171534p:plain

ここでは載せないですが、中身見ると大きなJSONの定義だということがわかります。

$ gsutil cat gs://my-df-bucket/dataflow/templates/wordcount | jq

Cloud ComposerのDAG作成

DAGから先程作成したテンプレート呼び出します。

DAG定義スクリプト作成

定義はシンプルにしました。projectregion は調整してください。

import airflow
from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator

yesterday = airflow.utils.dates.days_ago(1)

default_args = {
    'owner': 'yusuke.otomo',
    'start_date': yesterday,
    'dataflow_default_options': {
        "project": "your-project",
        "region": "asia-northeast1",
        "stagingLocation":
        'gs://{{ var.value.df_base_bucket }}/airflow/staging/',
        "tempLocation": 'gs://{{ var.value.df_base_bucket }}/airflow/temp/',
    }
}

dag = DAG('my-job-wordcount', default_args=default_args)

df_task = DataflowTemplateOperator(
    task_id='my-wordcount',
    template='gs://{{ var.value.df_base_bucket }}/dataflow/templates/wordcount',
    parameters={
        'output':
        "gs://{{ var.value.df_base_bucket }}/airflow/output/my_output.txt"
    },
    gcp_conn_id='google_cloud_default',
    dag=dag)

AirflowのVariables設定

上記で {{ var.value.df_base_bucket }} のように書いてあるところはマクロでAirflow側に定義した変数を呼び出しています。

airflow.apache.org

変数は以下のメニューから定義できます。

DAGファイルのインポート

以下のコマンドで上記のDAGをCloud Composerにインポートします。

$ gcloud composer environments storage dags import \
  --environment=${YOUR_COMPOSER_ENV} \
  --location asia-northeast1 \
  --source=./dags/dag_wordcount.py 

ジョブが開始されます。 f:id:yomon8:20191214175614p:plain

f:id:yomon8:20191214172016p:plain

f:id:yomon8:20191214172040p:plain

ジョブが完了したら、outputに指定したファイルを確認してみます。

$ gsutil cat gs://my-df-bucket/airflow/output/my_output* | head
gap: 1
appetite: 2
change: 5
bending: 2
advised: 1
moulds: 1
sequent: 1
sounded: 2
knight: 1
gentlewoman: 1

参考URL

Cloud Dataflow(Python)をAppEngineから実行する - Qiita

airflow/dataflow_operator.py at 1.10.3 · apache/airflow · GitHub