Converting a Pandas DataFrame to Parquet with pyarrow and uploading it to GCS (Google Cloud Storage)

As the title says: convert a Pandas DataFrame to Parquet with pyarrow and upload it to GCS as-is.

Script

It can be run as shown below. The data is uploaded straight from an in-memory buffer, without going through a file on disk.

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import numpy as np
import datetime
from google.cloud import storage as gcs


# Build a DataFrame from dummy data
row_num = 100000
string_values = ['Python', 'Ruby', 'Java', 'JavaScript', 'PHP', 'Golang']
df = pd.DataFrame({
    # now() plus random offsets of up to 1,000 seconds (in microseconds);
    # randint avoids materializing a billion-element arange just to sample from it
    'datetime': np.datetime64(datetime.datetime.now()) + np.random.randint(0, 1000000000, row_num),
    'string': np.random.choice(string_values, row_num),
    'int': np.random.randint(0, 100, row_num),
    'double': np.random.rand(row_num),
})

# Convert to a pyarrow Table
table = pa.Table.from_pandas(df)

# Write the Table into an in-memory buffer
# (compression=None, so despite the .snappy suffix in the key name below the file
# is actually uncompressed, as the parquet-tools output later confirms)
buf = pa.BufferOutputStream()
pq.write_table(table, buf, compression=None)

# Upload to GCS
PROJECT_NAME = 'YOUR_PROJECT_NAME'
BUCKET_NAME = 'YOUR_BUCKET_NAME'
KEY_NAME = 'mydata.snappy.parquet'
client = gcs.Client(PROJECT_NAME)
bucket = client.get_bucket(BUCKET_NAME)
blob = gcs.Blob(KEY_NAME, bucket)
blob.upload_from_string(data=buf.getvalue().to_pybytes())
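
As a quick sanity check, the uploaded object can also be read straight back into memory without touching disk. A minimal sketch, reusing the bucket and KEY_NAME from the script above:

# Download the object as bytes and read it back with pyarrow
data = bucket.blob(KEY_NAME).download_as_string()
table_read = pq.read_table(pa.BufferReader(data))
print(table_read.to_pandas().head())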

Downloading the file

Download it and check the contents.

$ gsutil cp gs://YOUR_BUCKET_NAME/mydata.snappy.parquet .
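
The same download can be done from Python with the storage client instead of gsutil; a minimal sketch, again reusing bucket and KEY_NAME from the script above:

# Fetch the object to a local file with the client library
bucket.blob(KEY_NAME).download_to_filename('mydata.snappy.parquet')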

Checking the contents with parquet-tools

The data has been converted to Parquet format as expected.

$ docker run --rm  -v $(pwd):/parquet-mr/parquet-tools nathanhowell/parquet-tools head /parquet-mr/parquet-tools/mydata.snappy.parquet
datetime = 1575976364091078
string = Ruby
int = 19
double = 0.7995554002298092

datetime = 1575975726216655
string = Java
int = 67
double = 0.29779047397368064

datetime = 1575976181882283
string = PHP
int = 37
double = 0.4448431019047402

datetime = 1575975838161617
string = Java
int = 53
double = 0.16337570068831164

datetime = 1575975730325159
string = Golang
int = 81
double = 0.014772492141116422

The row count and schema in the metadata also look right.

$ docker run --rm  -v $(pwd):/parquet-mr/parquet-tools nathanhowell/parquet-tools meta /parquet-mr/parquet-tools/mydata.snappy.parquet
file:        file:/parquet-mr/parquet-tools/mydata.snappy.parquet
creator:     parquet-cpp version 1.5.1-SNAPSHOT
extra:       pandas = {"index_columns": [{"kind": "range", "name": null, "start": 0, "stop": 100000, "step": 1}], "column_indexes": [{"name": null, "field_name": null, "pandas_type": "unicode", "numpy_type": "object", "metadata": {"encoding": "UTF-8"}}], "columns": [{"name": "datetime", "field_name": "datetime", "pandas_type": "datetime", "numpy_type": "datetime64[ns]", "metadata": null}, {"name": "string", "field_name": "string", "pandas_type": "unicode", "numpy_type": "object", "metadata": null}, {"name": "int", "field_name": "int", "pandas_type": "int64", "numpy_type": "int64", "metadata": null}, {"name": "double", "field_name": "double", "pandas_type": "float64", "numpy_type": "float64", "metadata": null}], "creator": {"library": "pyarrow", "version": "0.14.1"}, "pandas_version": "0.25.3"}

file schema: schema
--------------------------------------------------------------------------------
datetime:    OPTIONAL INT64 O:TIMESTAMP_MICROS R:0 D:1
string:      OPTIONAL BINARY O:UTF8 R:0 D:1
int:         OPTIONAL INT64 R:0 D:1
double:      OPTIONAL DOUBLE R:0 D:1

row group 1: RC:100000 TS:2151954 OFFSET:4
--------------------------------------------------------------------------------
datetime:     INT64 UNCOMPRESSED DO:4 FPO:799976 SZ:1012747/1012747/1.00 VC:100000 ENC:PLAIN,PLAIN_DICTIONARY,RLE
string:       BINARY UNCOMPRESSED DO:1012840 FPO:1012911 SZ:37820/37820/1.00 VC:100000 ENC:PLAIN,PLAIN_DICTIONARY,RLE
int:          INT64 UNCOMPRESSED DO:1050724 FPO:1051541 SZ:88592/88592/1.00 VC:100000 ENC:PLAIN,PLAIN_DICTIONARY,RLE
double:       DOUBLE UNCOMPRESSED DO:1139405 FPO:1939425 SZ:1012795/1012795/1.00 VC:100000 ENC:PLAIN,PLAIN_DICTIONARY,RLE
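
As a final check, the downloaded file can be read back into pandas, with pyarrow serving as the engine; a minimal sketch:

import pandas as pd

# Read the downloaded Parquet file back into a DataFrame
df = pd.read_parquet('mydata.snappy.parquet', engine='pyarrow')
print(df.dtypes)
print(len(df))  # should print 100000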