Azure Table StorageのデータをCloud Dataflowを使ってBigQueryに挿入する

Azure Table StorageのデータをCloud Dataflow (Apache Beam)から扱ってみたのでメモ。

対象のAzure Table Storage

対象としたTable Storageの中身です。mytable という名前にしました。このデータをDataflowを使ってBigQueryに挿入してみます。

f:id:yomon8:20191205221950p:plain

準備

データセットだけ作っておきます。

$ bq mk my_dataset

Azure Table Storageをクエリするのに、 azure-cosmosdb-tableという3rd Partyモジュールが必要なのでsetup.pyを以下のように作成しておきます。

cat <<EOF > setup.py
import setuptools

setuptools.setup(
    name='azstoragetable2bigquery',
    version='0.0.1',
    install_requires=['azure-cosmosdb-table'],
    packages=setuptools.find_packages(),
 )
EOF

コード

Dataflowのコードを書きます。

cat <<EOF > azure2bq.py
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions

from azure.cosmosdb.table.tableservice import TableService

#-----------------------------------
# 変数
#-----------------------------------
PROJECT_NAME = "プロジェクト名"
JOB_NAME = "azure-test"
REGION = 'asia-northeast1'
BQ_DATASET = 'my_dataset'
BQ_TABLE = 'my_table'

AZ_SOURCE_TABLE = "mytable"
AZ_TABLE_QUERY =  "PartitionKey eq 'pkey1'" #クエリは適当に
AZ_CONNECTION_STRING = "DefaultEndpointsProtocol=省略"




def read_azure_storage_my_table(filter_query):
  table_service = TableService(connection_string=AZ_CONNECTION_STRING)
  return (r for r in table_service.query_entities(AZ_SOURCE_TABLE,filter=filter_query))

class MyTableEntity(beam.DoFn):
    schema = 'Timestamp:TIMESTAMP, PartitionKey:STRING, RowKey:STRING, Id:STRING, Value:STRING'
    def process(self, element):
        return [{
            'Timestamp': str(element.Timestamp),
            'PartitionKey': str(element.PartitionKey),
            'RowKey': str(element.RowKey),
            'Id': str(element.Id),
            'Value': str(element.Value),
        }]

def run(argv=None, save_main_session=True):
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)

  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
  pipeline_options.view_as(SetupOptions).setup_file = './setup.py'
  pipeline_options.view_as(GoogleCloudOptions).project = PROJECT_NAME
  pipeline_options.view_as(GoogleCloudOptions).job_name = JOB_NAME
  pipeline_options.view_as(GoogleCloudOptions).region = REGION

  bq_table_spec = '{}:{}.{}'.format(PROJECT_NAME,BQ_DATASET,BQ_TABLE)

  data = read_azure_storage_my_table(AZ_TABLE_QUERY)

  p = beam.Pipeline(options=pipeline_options)
  (
    p
     | 'Read MyTable' >> beam.Create(data)
     | 'Parse MyTableEntity' >> beam.ParDo(MyTableEntity()) 
     | 'Write to BigQuery' >> beam.io.WriteToBigQuery(bq_table_spec,schema=MyTableEntity.schema, 
                                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
  )

  result = p.run()
  result.wait_until_finish()

if __name__ == '__main__':
  run()
EOF

ローカル実行

まずはローカルPCで実行してみます。

$ python azure2bq.py

しっかりBigQuery側にも入っています。

$ bq query "select * from my_dataset.my_table"
Waiting on bqjob_r15525cdf33edf5b9_0000016edb850da9_1 ... (0s) Current status: DONE   
+---------------------+--------------+--------+----+-------+
|      Timestamp      | PartitionKey | RowKey | Id | Value |
+---------------------+--------------+--------+----+-------+
| 2019-12-05 13:16:26 | pkey1        | rkey1  | a  | 10    |
| 2019-12-05 13:18:04 | pkey1        | rkey2  | b  | 20    |
| 2019-12-05 13:18:33 | pkey1        | rkey3  | c  | 30    |
+---------------------+--------------+--------+----+-------+

次はDataflowから実行してみたいので、一旦テーブルを削除します。

$ bq rm my_dataset.my_table

Dataflowで実行

$ python azure2bq.py --runner DataflowRunner \
  --temp_location gs://your-bucket \
  --staging_location gs://your-bucket 

setup.pyに記載しておいた外部モジュールがインストールされてからジョブが実行されます。

Dataflow上でもジョブの状況が確認できます。

f:id:yomon8:20191206231248p:plain

ちゃんとデータ入っているようです。

$ bq query "select * from my_dataset.my_table"
Waiting on bqjob_r6b0966095b54cb7_0000016edb8e9ccb_1 ... (0s) Current status: DONE   
+---------------------+--------------+--------+----+-------+
|      Timestamp      | PartitionKey | RowKey | Id | Value |
+---------------------+--------------+--------+----+-------+
| 2019-12-05 13:16:26 | pkey1        | rkey1  | a  | 10    |
| 2019-12-05 13:18:33 | pkey1        | rkey3  | c  | 30    |
| 2019-12-05 13:18:04 | pkey1        | rkey2  | b  | 20    |
+---------------------+--------------+--------+----+-------+