As the title says: I convert a pandas DataFrame to Parquet with pyarrow and upload it straight to GCS.
Script
It can be run as shown below. Instead of going through a local file, the data is uploaded directly from an in-memory buffer.
import datetime

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from google.cloud import storage as gcs

# Create a DataFrame filled with dummy data
row_num = 100000
string_values = ['Python', 'Ruby', 'Java', 'JavaScript', 'PHP', 'Golang']
df = pd.DataFrame({
    'datetime': np.datetime64(datetime.datetime.now())
                + np.random.choice(np.arange(0, 1000000000), row_num),
    'string': np.random.choice(string_values, row_num),
    'int': np.random.randint(0, 100, row_num),
    'double': np.random.rand(row_num),
})

# Convert to a pyarrow Table
table = pa.Table.from_pandas(df)

# Write the table to an in-memory buffer
buf = pa.BufferOutputStream()
pq.write_table(table, buf, compression=None)

# Upload the buffer contents to GCS
PROJECT_NAME = 'YOUR_PROJECT_NAME'
BUCKET_NAME = 'YOUR_BUCKET_NAME'
KEY_NAME = 'mydata.snappy.parquet'

client = gcs.Client(PROJECT_NAME)
bucket = client.get_bucket(BUCKET_NAME)
blob = gcs.Blob(KEY_NAME, bucket)
blob.upload_from_string(data=buf.getvalue().to_pybytes())
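One caveat: the object name ends in .snappy.parquet, but compression=None writes the columns uncompressed, as the parquet-tools meta output below confirms. If you want the name to match the codec, a minimal variant is to request snappy explicitly:

# Variant: actually compress with snappy so the .snappy.parquet name is accurate
buf = pa.BufferOutputStream()
pq.write_table(table, buf, compression='snappy')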
Downloading the file
Let's download the file and check its contents.
$ gsutil cp gs://YOUR_BUCKET_NAME/mydata.snappy.parquet .
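As a side note, pandas can also read the object back without downloading it first. This is a sketch assuming the gcsfs package is installed, which lets pandas resolve gs:// URLs:

import pandas as pd

# Read the uploaded object straight from GCS (assumes gcsfs is installed)
df2 = pd.read_parquet('gs://YOUR_BUCKET_NAME/mydata.snappy.parquet')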
Checking the contents with parquet-tools
The data has indeed been converted to Parquet format.
$ docker run --rm -v $(pwd):/parquet-mr/parquet-tools nathanhowell/parquet-tools head /parquet-mr/parquet-tools/mydata.snappy.parquet
datetime = 1575976364091078
string = Ruby
int = 19
double = 0.7995554002298092

datetime = 1575975726216655
string = Java
int = 67
double = 0.29779047397368064

datetime = 1575976181882283
string = PHP
int = 37
double = 0.4448431019047402

datetime = 1575975838161617
string = Java
int = 53
double = 0.16337570068831164

datetime = 1575975730325159
string = Golang
int = 81
double = 0.014772492141116422
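The same spot check can be done in Python against the downloaded file; a minimal sketch (here the datetime column comes back as proper timestamps rather than the raw TIMESTAMP_MICROS integers shown above):

import pyarrow.parquet as pq

# Read the downloaded file and print the first rows
print(pq.read_table('mydata.snappy.parquet').to_pandas().head())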
The metadata also looks right: the row count and schema match what the script produced.
$ docker run --rm -v $(pwd):/parquet-mr/parquet-tools nathanhowell/parquet-tools meta /parquet-mr/parquet-tools/mydata.snappy.parquet
file:         file:/parquet-mr/parquet-tools/mydata.snappy.parquet
creator:      parquet-cpp version 1.5.1-SNAPSHOT
extra:        pandas = {"index_columns": [{"kind": "range", "name": null, "start": 0, "stop": 100000, "step": 1}], "column_indexes": [{"name": null, "field_name": null, "pandas_type": "unicode", "numpy_type": "object", "metadata": {"encoding": "UTF-8"}}], "columns": [{"name": "datetime", "field_name": "datetime", "pandas_type": "datetime", "numpy_type": "datetime64[ns]", "metadata": null}, {"name": "string", "field_name": "string", "pandas_type": "unicode", "numpy_type": "object", "metadata": null}, {"name": "int", "field_name": "int", "pandas_type": "int64", "numpy_type": "int64", "metadata": null}, {"name": "double", "field_name": "double", "pandas_type": "float64", "numpy_type": "float64", "metadata": null}], "creator": {"library": "pyarrow", "version": "0.14.1"}, "pandas_version": "0.25.3"}

file schema:  schema
--------------------------------------------------------------------------------
datetime:     OPTIONAL INT64 O:TIMESTAMP_MICROS R:0 D:1
string:       OPTIONAL BINARY O:UTF8 R:0 D:1
int:          OPTIONAL INT64 R:0 D:1
double:       OPTIONAL DOUBLE R:0 D:1

row group 1:  RC:100000 TS:2151954 OFFSET:4
--------------------------------------------------------------------------------
datetime:     INT64 UNCOMPRESSED DO:4 FPO:799976 SZ:1012747/1012747/1.00 VC:100000 ENC:PLAIN,PLAIN_DICTIONARY,RLE
string:       BINARY UNCOMPRESSED DO:1012840 FPO:1012911 SZ:37820/37820/1.00 VC:100000 ENC:PLAIN,PLAIN_DICTIONARY,RLE
int:          INT64 UNCOMPRESSED DO:1050724 FPO:1051541 SZ:88592/88592/1.00 VC:100000 ENC:PLAIN,PLAIN_DICTIONARY,RLE
double:       DOUBLE UNCOMPRESSED DO:1139405 FPO:1939425 SZ:1012795/1012795/1.00 VC:100000 ENC:PLAIN,PLAIN_DICTIONARY,RLE
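The same metadata is also available from Python via pyarrow's read_metadata, which avoids the docker round trip; a small sketch against the downloaded file:

import pyarrow.parquet as pq

# Inspect the row count and schema of the downloaded file
meta = pq.read_metadata('mydata.snappy.parquet')
print(meta.num_rows)   # 100000
print(meta.schema)     # physical/logical types for each column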