DataflowのWriteToBigQueryでRuntimeValueProviderで渡した日付のパーティション分割テーブルを作る

DataflowでRuntimeValueProviderで渡した値を使って、BigQueryのパーティション分割テーブルを作成しようとしたら、WriteToBigQueryでハマったので書いておきます。

課題

日付のパーティション分割テーブル自体をWriteToBigQueryで作成する場合、以下のように table="mytable$20180201" と指定すれば良いだけなのです。RuntimeValueProviderから 20180201 の部分だけを変数で渡そうとしたら少しハマりました。

beam.io.WriteToBigQuery(
     project=get_gcp_project(),
     dataset=get_dataset_name(),
     table="mytable$20180201",
     schema=get_schema(),
     create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
     write_disposition=BigQueryDisposition.WRITE_TRUNCATE,
     additional_bq_parameters={"timePartitioning": {"type": "DAY"}},
 )

以下のようなオプション作れば --date_key 20180201 と渡すことでRuntimeValueProviderに値を渡せるようになります。

class MyOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser)
        parser.add_value_provider_argument(
            "--date_key",
            type=str,
            help="DateKey",
        )

対策

参考URLに張ったコード内に書いてありましたが、lambdaで渡せばいけました。その際に引数を一つ準備してあげる必要があります。

この引数にはelementが入ってきますが、今回はRuntimeValueProviderを使うので引数で受けて捨てています。

my_optoins = options.view_as(MyOptions)
beam.io.WriteToBigQuery(
     project=get_gcp_project(),
     dataset=get_dataset_name(),
     table=lambda e: f"{get_dataset_name()}.{get_target_table_name()}${my_optoins.date_key.get()}",
     schema=get_schema(),
     create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
     write_disposition=BigQueryDisposition.WRITE_TRUNCATE,
     additional_bq_parameters={"timePartitioning": {"type": "DAY"}},
 )

element内の項目使ってパーティション分割する場合は以下のようにします。

my_optoins = options.view_as(MyOptions)
beam.io.WriteToBigQuery(
     project=get_gcp_project(),
     dataset=get_dataset_name(),
     table=lambda e: f"{get_dataset_name()}.{get_target_table_name()}${e['my_date']}",
     schema=get_schema(),
     create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
     write_disposition=BigQueryDisposition.WRITE_TRUNCATE,
     additional_bq_parameters={"timePartitioning": {"type": "DAY"}},
 )

参考URL

https://github.com/apache/beam/blob/v2.25.0/sdks/python/apache_beam/io/gcp/bigquery.py#L1498-L1508