Azure Table Storageをpandas+pyarrowでParquet変換してBLOB側にアップロード

BeeX Advent Calendar 2019の12/6の記事です。空いてたので滑り込みです。

Azure Table Storageですが、対応しているツールも少なく、なかなか扱いずらいので、Pandasに読み込んで、Parquet形式に変更する方法を調べました。

対象

対象としたTable Storageの中身です。これをParquet形式に変換して、Azure Blob Storageの方に移動します。

f:id:yomon8:20191205221950p:plain

必要なモジュールインストール

今回利用するPythonモジュールです。

pip install azure-cosmosdb-table  # Table Storageの操作のために利用
pip install azure-storage-blob
pip install pytz

ソースコード

以下のように書きました。

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pytz
from azure.cosmosdb.table.tableservice import TableService
from azure.storage.blob import BlobServiceClient


# 変数
CONNECTION_STRING = "DefaultEndpointsProtocol=https  ----------対象のストレージアカウントの接続文字列--------------"
SOURCE_TABLE = "mytable"
UPLOAD_TARGET_CONTAINER = "mycontainer"
UPLOAD_TARGET_OBJECT = "mytable.parquet"

# Table Storageへのクエリ
filter_query = "PartitionKey eq 'pkey1'"

# Tableをクエリして、結果をPandas Dataframeに変換
table_service = TableService(connection_string=CONNECTION_STRING)
df = pd.DataFrame((r for r in table_service.query_entities(SOURCE_TABLE,filter=filter_query)))

# UTC変換
df['Timestamp'] = df.Timestamp.dt.tz_convert(pytz.utc)

# テーブルと関係無い情報を削除
df = df.drop(columns = ['etag'],axis=0)

# pyarrowのTableに変換
table = pa.Table.from_pandas(df)

# BufferにParquet形式でテーブルを書き込み
buf = pa.BufferOutputStream()
pq.write_table(table, buf)

# BufferをBLOBにアップロード
blob_service_client = BlobServiceClient.from_connection_string(CONNECTION_STRING)
blob_client = blob_service_client.get_blob_client(container=UPLOAD_TARGET_CONTAINER, blob=UPLOAD_TARGET_OBJECT)
blob_client.upload_blob(data=buf.getvalue().to_pybytes(),overwrite=True)

Azureポータル上からも、アップロードされたことが確認できます。

f:id:yomon8:20191205233802p:plain

ダウンロードしてApache Parquet Viewerで確認したところです。

クエリに設定した filter_query = "PartitionKey eq 'pkey1'" が効いてることがわかります。

f:id:yomon8:20191205233900p:plain

参考URL

How to store pandas dataframe data to azure blobs using python? - Stack Overflow

Python を使用して Azure Table Storage と Azure Cosmos DB Table API を使用する | Microsoft Docs

Azure Table Storage Pandas Dataframe · GitHub