Azure Table StorageのデータをCloud Dataflow (Apache Beam)から扱ってみたのでメモ。
対象のAzure Table Storage
対象としたTable Storageの中身です。mytable
という名前にしました。このデータをDataflowを使ってBigQueryに挿入してみます。
準備
データセットだけ作っておきます。
$ bq mk my_dataset
Azure Table Storageをクエリするのに、 azure-cosmosdb-table
という3rd Partyモジュールが必要なのでsetup.pyを以下のように作成しておきます。
cat <<EOF > setup.py import setuptools setuptools.setup( name='azstoragetable2bigquery', version='0.0.1', install_requires=['azure-cosmosdb-table'], packages=setuptools.find_packages(), ) EOF
コード
Dataflowのコードを書きます。
cat <<EOF > azure2bq.py import argparse import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import SetupOptions from apache_beam.options.pipeline_options import GoogleCloudOptions from azure.cosmosdb.table.tableservice import TableService #----------------------------------- # 変数 #----------------------------------- PROJECT_NAME = "プロジェクト名" JOB_NAME = "azure-test" REGION = 'asia-northeast1' BQ_DATASET = 'my_dataset' BQ_TABLE = 'my_table' AZ_SOURCE_TABLE = "mytable" AZ_TABLE_QUERY = "PartitionKey eq 'pkey1'" #クエリは適当に AZ_CONNECTION_STRING = "DefaultEndpointsProtocol=省略" def read_azure_storage_my_table(filter_query): table_service = TableService(connection_string=AZ_CONNECTION_STRING) return (r for r in table_service.query_entities(AZ_SOURCE_TABLE,filter=filter_query)) class MyTableEntity(beam.DoFn): schema = 'Timestamp:TIMESTAMP, PartitionKey:STRING, RowKey:STRING, Id:STRING, Value:STRING' def process(self, element): return [{ 'Timestamp': str(element.Timestamp), 'PartitionKey': str(element.PartitionKey), 'RowKey': str(element.RowKey), 'Id': str(element.Id), 'Value': str(element.Value), }] def run(argv=None, save_main_session=True): parser = argparse.ArgumentParser() known_args, pipeline_args = parser.parse_known_args(argv) pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = save_main_session pipeline_options.view_as(SetupOptions).setup_file = './setup.py' pipeline_options.view_as(GoogleCloudOptions).project = PROJECT_NAME pipeline_options.view_as(GoogleCloudOptions).job_name = JOB_NAME pipeline_options.view_as(GoogleCloudOptions).region = REGION bq_table_spec = '{}:{}.{}'.format(PROJECT_NAME,BQ_DATASET,BQ_TABLE) data = read_azure_storage_my_table(AZ_TABLE_QUERY) p = beam.Pipeline(options=pipeline_options) ( p | 'Read MyTable' >> beam.Create(data) | 'Parse MyTableEntity' >> beam.ParDo(MyTableEntity()) | 'Write to BigQuery' >> beam.io.WriteToBigQuery(bq_table_spec,schema=MyTableEntity.schema, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED) ) result = p.run() result.wait_until_finish() if __name__ == '__main__': run() EOF
ローカル実行
まずはローカルPCで実行してみます。
$ python azure2bq.py
しっかりBigQuery側にも入っています。
$ bq query "select * from my_dataset.my_table" Waiting on bqjob_r15525cdf33edf5b9_0000016edb850da9_1 ... (0s) Current status: DONE +---------------------+--------------+--------+----+-------+ | Timestamp | PartitionKey | RowKey | Id | Value | +---------------------+--------------+--------+----+-------+ | 2019-12-05 13:16:26 | pkey1 | rkey1 | a | 10 | | 2019-12-05 13:18:04 | pkey1 | rkey2 | b | 20 | | 2019-12-05 13:18:33 | pkey1 | rkey3 | c | 30 | +---------------------+--------------+--------+----+-------+
次はDataflowから実行してみたいので、一旦テーブルを削除します。
$ bq rm my_dataset.my_table
Dataflowで実行
$ python azure2bq.py --runner DataflowRunner \ --temp_location gs://your-bucket \ --staging_location gs://your-bucket
setup.pyに記載しておいた外部モジュールがインストールされてからジョブが実行されます。
Dataflow上でもジョブの状況が確認できます。
ちゃんとデータ入っているようです。
$ bq query "select * from my_dataset.my_table" Waiting on bqjob_r6b0966095b54cb7_0000016edb8e9ccb_1 ... (0s) Current status: DONE +---------------------+--------------+--------+----+-------+ | Timestamp | PartitionKey | RowKey | Id | Value | +---------------------+--------------+--------+----+-------+ | 2019-12-05 13:16:26 | pkey1 | rkey1 | a | 10 | | 2019-12-05 13:18:33 | pkey1 | rkey3 | c | 30 | | 2019-12-05 13:18:04 | pkey1 | rkey2 | b | 20 | +---------------------+--------------+--------+----+-------+