DataflowでRuntimeValueProviderで渡した値を使って、BigQueryのパーティション分割テーブルを作成しようとしたら、WriteToBigQueryでハマったので書いておきます。
課題
日付のパーティション分割テーブル自体をWriteToBigQueryで作成する場合、以下のように table="mytable$20180201"
と指定すれば良いだけなのです。RuntimeValueProviderから 20180201
の部分だけを変数で渡そうとしたら少しハマりました。
beam.io.WriteToBigQuery( project=get_gcp_project(), dataset=get_dataset_name(), table="mytable$20180201", schema=get_schema(), create_disposition=BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=BigQueryDisposition.WRITE_TRUNCATE, additional_bq_parameters={"timePartitioning": {"type": "DAY"}}, )
以下のようなオプション作れば --date_key 20180201
と渡すことでRuntimeValueProviderに値を渡せるようになります。
class MyOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser) parser.add_value_provider_argument( "--date_key", type=str, help="DateKey", )
対策
参考URLに張ったコード内に書いてありましたが、lambdaで渡せばいけました。その際に引数を一つ準備してあげる必要があります。
この引数にはelementが入ってきますが、今回はRuntimeValueProviderを使うので引数で受けて捨てています。
my_optoins = options.view_as(MyOptions) beam.io.WriteToBigQuery( project=get_gcp_project(), dataset=get_dataset_name(), table=lambda e: f"{get_dataset_name()}.{get_target_table_name()}${my_optoins.date_key.get()}", schema=get_schema(), create_disposition=BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=BigQueryDisposition.WRITE_TRUNCATE, additional_bq_parameters={"timePartitioning": {"type": "DAY"}}, )
element内の項目使ってパーティション分割する場合は以下のようにします。
my_optoins = options.view_as(MyOptions) beam.io.WriteToBigQuery( project=get_gcp_project(), dataset=get_dataset_name(), table=lambda e: f"{get_dataset_name()}.{get_target_table_name()}${e['my_date']}", schema=get_schema(), create_disposition=BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=BigQueryDisposition.WRITE_TRUNCATE, additional_bq_parameters={"timePartitioning": {"type": "DAY"}}, )
参考URL
https://github.com/apache/beam/blob/v2.25.0/sdks/python/apache_beam/io/gcp/bigquery.py#L1498-L1508