GCP Cloud ComposerでBigQueryのテーブルを操作するワークフローを作る手順

GCP Cloud Composerの動きを一通り確認するために以下のAirflow向けのを参考にワークフロー作成しました。内容をシンプルにするためいくらか修正しています。

cloud.google.com

やること

BigQueryでGH Archiveに入っているデータを集計します。参考にしている記事の内容よりシンプルにしたため、3本のSQLで以下を実施します。

f:id:yomon8:20191020191108p:plain

最終的には以下のように、リポジトリ毎の1週間分のStarとForkが確認できます。

f:id:yomon8:20191020190020p:plain

Cloud Composer作成

まずはCloud Composerの環境を作成します。以下の変数はテスト用の環境なので最小限の構成としています。

最低限 PROJECT NETWORK SUBNETWORK を変更して実行します。構築には10分以上かかるので待ちます。

PROJECT=YOUR_PROJECT_NAME
ENVIRONMENT_NAME=yo-composer-test
NODE_COUNT=3
LOCATION=asia-northeast1
ZONE=asia-northeast1-a
DISK_SIZE=50GB
MACHINE_TYPE=n1-standard-1
IMAGE_VERSION=composer-latest-airflow-1.10.2
PYTHON_VERSION=3
NETWORK=YOUR_NETWORK
SUBNETWORK=YOUR_SUBNET

gcloud composer environments create ${ENVIRONMENT_NAME} \
  --node-count=${NODE_COUNT} \
  --location=${LOCATION} \
  --zone=${ZONE} \
  --disk-size=${DISK_SIZE} \
  --machine-type=${MACHINE_TYPE} \
  --image-version=${IMAGE_VERSION} \
  --python-version=${PYTHON_VERSION} \
  --network=${NETWORK} \
  --subnetwork=${SUBNETWORK} \
  --project=${PROJECT} \
  --async

BigQueryデータセット・テーブル作成

待っている間にBigQueryのデータセットとテーブルを作成しておきます。

PROJECT=YOUR_PROJECT_NAME
DATASET=github_trends
bq --project_id ${PROJECT} mk ${DATASET}
bq --project_id ${PROJECT} mk --time_partitioning_type=DAY ${DATASET}.github_daily_metrics
bq --project_id ${PROJECT} mk --time_partitioning_type=DAY ${DATASET}.github_agg

当然、この時点では何もデータ入っていません。

$ bq --project ${PROJECT} show github_trends.github_daily_metrics
Table YOUR_PROJECT_NAME:github_trends.github_daily_metrics
   Last modified    Schema   Total Rows   Total Bytes   Expiration   Time Partitioning   Clustered Fields   Labels
 ----------------- -------- ------------ ------------- ------------ ------------------- ------------------ --------
  20 Oct 16:51:49            0            0                          DAY
$ bq --project ${PROJECT} show github_trends.github_agg
Table YOUR_PROJECT_NAME:github_trends.github_agg
   Last modified    Schema   Total Rows   Total Bytes   Expiration   Time Partitioning   Clustered Fields   Labels
 ----------------- -------- ------------ ------------- ------------ ------------------- ------------------ -------- 
  20 Oct 17:36:01            0            0                          DAY

AirflowのDAG定義を作成

Airflowの定義を作成します。以下を github_trends.py という名前で保存します。

以下の部分だけは自身のプロジェクトIDに変更してください。 PROJECT_ID = "YOUR_PROJECT_NAME"

ちなみに {{ yesterday_ds_nodash }} みたいなのは、Airflowのマクロになります。

airflow.macros — Airflow Documentation

from datetime import timedelta, datetime

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator

PROJECT_ID = "YOUR_PROJECT_NAME"

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': datetime.today() - timedelta(days=8),
    'retries': 5,
    'retry_delay': timedelta(minutes=5),
}

schedule_interval = "00 21 * * *"

dag = DAG('github_trends', default_args=default_args,
          schedule_interval=schedule_interval)

# Task 1
t1 = BigQueryCheckOperator(
    task_id='t1_bq_check_githubarchive_day',
    use_legacy_sql=False,
    sql='''
    SELECT table_id 
    FROM `githubarchive.day.__TABLES__`
    WHERE table_id = "{{ yesterday_ds_nodash }}"
    ''',
    dag=dag)

# Task 2
t2 = BigQueryOperator(
    task_id='t2_bq_write_to_github_daily_metrics',
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True,
    bql='''
    SELECT
      date,
      repo,
      SUM(IF(type='WatchEvent', 1, NULL)) AS stars,
      SUM(IF(type='ForkEvent',  1, NULL)) AS forks
    FROM (
      SELECT
        FORMAT_TIMESTAMP("%Y%m%d", created_at) AS date,
        actor.id as actor_id,
        repo.name as repo,
        type
      FROM
        `githubarchive.day.{{ yesterday_ds_nodash }}`
      WHERE type IN ('WatchEvent','ForkEvent')
    )
    GROUP BY
      date,
      repo
    ''',
    destination_dataset_table='_PROJECT_ID_.github_trends.github_daily_metrics${{ yesterday_ds_nodash }}'.replace(
        '_PROJECT_ID_', PROJECT_ID),
    dag=dag)


# Task 3
t3 = BigQueryOperator(
    task_id='t3_bq_write_to_github_agg',
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True,
    bql='''
    #standardSQL
    SELECT
      "{{ yesterday_ds_nodash }}" as date,
      repo,
      SUM(stars) as stars_last_7_days,
      SUM(IF(_PARTITIONTIME BETWEEN TIMESTAMP("{{ yesterday_ds }}") 
        AND TIMESTAMP("{{ yesterday_ds }}") , 
        stars, null)) as stars_last_1_day,
      SUM(forks) as forks_last_7_days,
      SUM(IF(_PARTITIONTIME BETWEEN TIMESTAMP("{{ yesterday_ds }}") 
        AND TIMESTAMP("{{ yesterday_ds }}") , 
        forks, null)) as forks_last_1_day
    FROM
      `_PROJECT_ID_.github_trends.github_daily_metrics`
    WHERE _PARTITIONTIME BETWEEN TIMESTAMP("{{ macros.ds_add(ds, -6) }}") 
    AND TIMESTAMP("{{ yesterday_ds }}") 
    GROUP BY
      date,
      repo
    '''.replace('_PROJECT_ID_', PROJECT_ID),
    destination_dataset_table='_PROJECT_ID_.github_trends.github_agg${{ yesterday_ds_nodash }}'.replace(
        '_PROJECT_ID_', PROJECT_ID),
    dag=dag)

# タスクの依存関係を設定
t2.set_upstream(t1)
t3.set_upstream(t2)

Airflowの定義をCloud Composerにアップロード

上記で作成したPythonファイルをアップロードします

gcloud composer environments storage dags import \
  --environment=${ENVIRONMENT_NAME} \
  --location=${LOCATION} --source=./github_trends.py

アップロードできていることを確認します

gcloud composer environments storage dags list \
    --environment=${ENVIRONMENT_NAME} --location=${LOCATION} 
NAME
dags/
dags/airflow_monitoring.py
dags/github_trends.py

実際にはGCSにdagsフォルダができていて、そこにアップロードされています。

f:id:yomon8:20191020192510p:plain

Cloud ComposerのDAGの実行状況確認

アップロード後しばらくすると、Cloud Composerの画面から飛べる、Airflowの画面にアップロードしたDAGが表示されます。

f:id:yomon8:20191020194114p:plain

f:id:yomon8:20191020193101p:plain

クリックすると、徐々にタスクが実行されていく様子がわかります。

f:id:yomon8:20191020193123p:plain

全部のタスクが完了しました。 f:id:yomon8:20191020193135p:plain

色々な情報を見ることができます。

f:id:yomon8:20191020193218p:plain

最後にクエリ打ってみて問題無さそうです。

ちなみに 'start_date': datetime.today() - timedelta(days=8), としてあるので、8日前のデータからしか集計していないので、直近の日付以外は stars_last_7_daysforks_last_7_days はデータが足りておらず正しい数字ではありません。

f:id:yomon8:20191020193201p:plain