小さなファイルのETLにGlueを使うのがもったいなかったので、Pandasやpyarrowで実装しました。
Lambda Layerにpandasとpyarrowを追加
Layerに登録するパッケージを作成
今回利用するのはpandasとpyarrow、s3fsなのですが少し工夫が必要でした。
3つを全てを一つのZIPに纏めるとLambda Layerの50MBの制限にかかってしまいます。
3つにZIPを分割するとLambdaにレイヤー追加する時の制限にかかってしまいます。
Layers consume more than the available size of 262144000 bytes
大きなnumpyなどを共有しているpandasとpyarrowを一つのZIPに纏めてアップロードします。
パッケージの準備は環境構築面倒なのでDockerで。
バージョンはLambdaで見たら sys.version_info(major=3, minor=6, micro=8, releaselevel='final', serial=0)
となっていたので、今回はPython3.6.8で準備します。
適当なディレクトリで以下の作業を行います。
$ mkdir python $ docker run -it --rm -v /path/to/your/workdir/python:/python python:3.6.8 pip install -t /python pandas pyarrow $ zip -r pandas_pyarrow.zip python $ rm -fR python/* $ docker run -it --rm -v /path/to/your/workdir/python:/python python:3.6.8 pip install -t /python s3fs $ zip -r s3fs.zip python
※WSLなので /path/to/your/workdir
にしていますが、ちゃんとしたLinux環境なら $(pwd)
で大丈夫です。
パッケージをアップロード
Lambda Layerにアップロードします。
参照されるレイヤーに2つを設定します。
とりあえずインポートできること確認できればOKです。
Lambdaのコード
import pandas as pd import pyarrow as pa import pyarrow.parquet as pq from s3fs import S3FileSystem # 出力先の情報 OUT_BUCKET_NAME = 'your-output-bucket-name' OUT_KEY_PREFIX = 'path/to/db' OUT_DB_NAME = 'db_name' OUT_TABLE_NAME = 'tab_name' def etl_s3_object(bucket,key): s3object = 's3a://{0}/{1}'.format(bucket, key) # Gzip圧縮のJSONLを読み込み df = pd.read_json( s3object, compression='gzip', lines=True, convert_dates = ['timestamp'], date_unit='ns' ) print('DataFrameShape ->',df.shape) print('Sapmles ->',df.sample(5)) # パーティション出力ように日付情報を抽出 df['year'] = df['timestamp'].map(lambda x: x.year) df['month'] = df['timestamp'].map(lambda x: x.month) df['day'] = df['timestamp'].map(lambda x: x.day) print('Sapmles transformed ->',df.sample(5)) # pyarrowのTableに変換 table = pa.Table.from_pandas(df) # 出力先のS3にアップロード output_file = f"s3://{OUT_BUCKET_NAME}/{OUT_KEY_PREFIX}/{OUT_DB_NAME}/{OUT_TABLE_NAME}" pq.write_to_dataset( table=table, root_path=output_file, filesystem=S3FileSystem(), # S3をファイルシステムとして利用 partition_cols=['year','month','day'], # Hive形式でパーティショニングして出力 use_deprecated_int96_timestamps=True # Nano秒→Micro秒への情報欠落を許容 ) # Hive形式のパーティションにある=とかを解決 def decode_s3_key(key): import urllib.parse return urllib.parse.unquote_to_bytes(key).decode() def lambda_handler(event, context): print('start lambda') for e in event['Records']: bucket = e['s3']['bucket']['name'] key = decode_s3_key(e['s3']['object']['key']) print('Start for',bucket,'and',key) etl_s3_object(bucket,key) print('Completed for',bucket,'and',key)
Lambda初期の128MBは流石に小さいので、ある程度のメモリを割り当てて上げないと大きめのファイルは動かないです。
S3のPUTイベントでトリガーするように設定すれば、S3へのPUTでParquetへの変換が動き出しましす。
このような感じでパーティショニングされてS3にParquetが出力できます。
エラー対応
s3fsのS3FileSystemにプロファイル指定のSessionを渡そうとすると以下のようなエラーが出る場合があります。
AttributeError: 'Session' object has no attribute 'create_client'
'S3' object has no attribute '__aenter__'
Sessionとして渡すのは boto3.session.Session
でも botocore.session.Session
でもなく、 session:aiobotocore AioSession
なので気をつけてください。
参考
arrow/parquet.py at master · apache/arrow · GitHub
https://dev.classmethod.jp/cloud/aws/lambda-layer-first-action/