GCP Cloud Composerの動きを一通り確認するために以下のAirflow向けのを参考にワークフロー作成しました。内容をシンプルにするためいくらか修正しています。
- やること
- Cloud Composer作成
- BigQueryデータセット・テーブル作成
- AirflowのDAG定義を作成
- Airflowの定義をCloud Composerにアップロード
- Cloud ComposerのDAGの実行状況確認
やること
BigQueryでGH Archiveに入っているデータを集計します。参考にしている記事の内容よりシンプルにしたため、3本のSQLで以下を実施します。
最終的には以下のように、リポジトリ毎の1週間分のStarとForkが確認できます。
Cloud Composer作成
まずはCloud Composerの環境を作成します。以下の変数はテスト用の環境なので最小限の構成としています。
最低限 PROJECT
NETWORK
SUBNETWORK
を変更して実行します。構築には10分以上かかるので待ちます。
PROJECT=YOUR_PROJECT_NAME ENVIRONMENT_NAME=yo-composer-test NODE_COUNT=3 LOCATION=asia-northeast1 ZONE=asia-northeast1-a DISK_SIZE=50GB MACHINE_TYPE=n1-standard-1 IMAGE_VERSION=composer-latest-airflow-1.10.2 PYTHON_VERSION=3 NETWORK=YOUR_NETWORK SUBNETWORK=YOUR_SUBNET gcloud composer environments create ${ENVIRONMENT_NAME} \ --node-count=${NODE_COUNT} \ --location=${LOCATION} \ --zone=${ZONE} \ --disk-size=${DISK_SIZE} \ --machine-type=${MACHINE_TYPE} \ --image-version=${IMAGE_VERSION} \ --python-version=${PYTHON_VERSION} \ --network=${NETWORK} \ --subnetwork=${SUBNETWORK} \ --project=${PROJECT} \ --async
BigQueryデータセット・テーブル作成
待っている間にBigQueryのデータセットとテーブルを作成しておきます。
PROJECT=YOUR_PROJECT_NAME DATASET=github_trends bq --project_id ${PROJECT} mk ${DATASET} bq --project_id ${PROJECT} mk --time_partitioning_type=DAY ${DATASET}.github_daily_metrics bq --project_id ${PROJECT} mk --time_partitioning_type=DAY ${DATASET}.github_agg
当然、この時点では何もデータ入っていません。
$ bq --project ${PROJECT} show github_trends.github_daily_metrics Table YOUR_PROJECT_NAME:github_trends.github_daily_metrics Last modified Schema Total Rows Total Bytes Expiration Time Partitioning Clustered Fields Labels ----------------- -------- ------------ ------------- ------------ ------------------- ------------------ -------- 20 Oct 16:51:49 0 0 DAY
$ bq --project ${PROJECT} show github_trends.github_agg Table YOUR_PROJECT_NAME:github_trends.github_agg Last modified Schema Total Rows Total Bytes Expiration Time Partitioning Clustered Fields Labels ----------------- -------- ------------ ------------- ------------ ------------------- ------------------ -------- 20 Oct 17:36:01 0 0 DAY
AirflowのDAG定義を作成
Airflowの定義を作成します。以下を github_trends.py
という名前で保存します。
以下の部分だけは自身のプロジェクトIDに変更してください。
PROJECT_ID = "YOUR_PROJECT_NAME"
ちなみに {{ yesterday_ds_nodash }}
みたいなのは、Airflowのマクロになります。
airflow.macros — Airflow Documentation
from datetime import timedelta, datetime from airflow import DAG from airflow.contrib.operators.bigquery_operator import BigQueryOperator from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator PROJECT_ID = "YOUR_PROJECT_NAME" default_args = { 'owner': 'airflow', 'depends_on_past': True, 'start_date': datetime.today() - timedelta(days=8), 'retries': 5, 'retry_delay': timedelta(minutes=5), } schedule_interval = "00 21 * * *" dag = DAG('github_trends', default_args=default_args, schedule_interval=schedule_interval) # Task 1 t1 = BigQueryCheckOperator( task_id='t1_bq_check_githubarchive_day', use_legacy_sql=False, sql=''' SELECT table_id FROM `githubarchive.day.__TABLES__` WHERE table_id = "{{ yesterday_ds_nodash }}" ''', dag=dag) # Task 2 t2 = BigQueryOperator( task_id='t2_bq_write_to_github_daily_metrics', use_legacy_sql=False, write_disposition='WRITE_TRUNCATE', allow_large_results=True, bql=''' SELECT date, repo, SUM(IF(type='WatchEvent', 1, NULL)) AS stars, SUM(IF(type='ForkEvent', 1, NULL)) AS forks FROM ( SELECT FORMAT_TIMESTAMP("%Y%m%d", created_at) AS date, actor.id as actor_id, repo.name as repo, type FROM `githubarchive.day.{{ yesterday_ds_nodash }}` WHERE type IN ('WatchEvent','ForkEvent') ) GROUP BY date, repo ''', destination_dataset_table='_PROJECT_ID_.github_trends.github_daily_metrics${{ yesterday_ds_nodash }}'.replace( '_PROJECT_ID_', PROJECT_ID), dag=dag) # Task 3 t3 = BigQueryOperator( task_id='t3_bq_write_to_github_agg', use_legacy_sql=False, write_disposition='WRITE_TRUNCATE', allow_large_results=True, bql=''' #standardSQL SELECT "{{ yesterday_ds_nodash }}" as date, repo, SUM(stars) as stars_last_7_days, SUM(IF(_PARTITIONTIME BETWEEN TIMESTAMP("{{ yesterday_ds }}") AND TIMESTAMP("{{ yesterday_ds }}") , stars, null)) as stars_last_1_day, SUM(forks) as forks_last_7_days, SUM(IF(_PARTITIONTIME BETWEEN TIMESTAMP("{{ yesterday_ds }}") AND TIMESTAMP("{{ yesterday_ds }}") , forks, null)) as forks_last_1_day FROM `_PROJECT_ID_.github_trends.github_daily_metrics` WHERE _PARTITIONTIME BETWEEN TIMESTAMP("{{ macros.ds_add(ds, -6) }}") AND TIMESTAMP("{{ yesterday_ds }}") GROUP BY date, repo '''.replace('_PROJECT_ID_', PROJECT_ID), destination_dataset_table='_PROJECT_ID_.github_trends.github_agg${{ yesterday_ds_nodash }}'.replace( '_PROJECT_ID_', PROJECT_ID), dag=dag) # タスクの依存関係を設定 t2.set_upstream(t1) t3.set_upstream(t2)
Airflowの定義をCloud Composerにアップロード
上記で作成したPythonファイルをアップロードします
gcloud composer environments storage dags import \ --environment=${ENVIRONMENT_NAME} \ --location=${LOCATION} --source=./github_trends.py
アップロードできていることを確認します
gcloud composer environments storage dags list \ --environment=${ENVIRONMENT_NAME} --location=${LOCATION} NAME dags/ dags/airflow_monitoring.py dags/github_trends.py
実際にはGCSにdagsフォルダができていて、そこにアップロードされています。
Cloud ComposerのDAGの実行状況確認
アップロード後しばらくすると、Cloud Composerの画面から飛べる、Airflowの画面にアップロードしたDAGが表示されます。
クリックすると、徐々にタスクが実行されていく様子がわかります。
全部のタスクが完了しました。
色々な情報を見ることができます。
最後にクエリ打ってみて問題無さそうです。
ちなみに 'start_date': datetime.today() - timedelta(days=8),
としてあるので、8日前のデータからしか集計していないので、直近の日付以外は stars_last_7_days
と forks_last_7_days
はデータが足りておらず正しい数字ではありません。