LambdaのLayer機能活用してpandas,pyarrow,s3fs使ってParquet変換する簡易ETL処理を実装する

小さなファイルのETLにGlueを使うのがもったいなかったので、Pandasやpyarrowで実装しました。

Lambda Layerにpandasとpyarrowを追加

Layerに登録するパッケージを作成

今回利用するのはpandasとpyarrow、s3fsなのですが少し工夫が必要でした。

3つを全てを一つのZIPに纏めるとLambda Layerの50MBの制限にかかってしまいます。

3つにZIPを分割するとLambdaにレイヤー追加する時の制限にかかってしまいます。

Layers consume more than the available size of 262144000 bytes

大きなnumpyなどを共有しているpandasとpyarrowを一つのZIPに纏めてアップロードします。

パッケージの準備は環境構築面倒なのでDockerで。

バージョンはLambdaで見たら sys.version_info(major=3, minor=6, micro=8, releaselevel='final', serial=0) となっていたので、今回はPython3.6.8で準備します。

適当なディレクトリで以下の作業を行います。

$ mkdir python
$ docker run -it --rm -v /path/to/your/workdir/python:/python python:3.6.8 pip install -t /python pandas pyarrow
$ zip -r pandas_pyarrow.zip python

$ rm -fR python/*
$ docker run -it --rm -v /path/to/your/workdir/python:/python python:3.6.8 pip install -t /python s3fs
$ zip -r s3fs.zip python

※WSLなので /path/to/your/workdir にしていますが、ちゃんとしたLinux環境なら $(pwd) で大丈夫です。

パッケージをアップロード

Lambda Layerにアップロードします。

f:id:yomon8:20190302170151p:plain

参照されるレイヤーに2つを設定します。 f:id:yomon8:20190302174422p:plain

とりあえずインポートできること確認できればOKです。 f:id:yomon8:20190302170608p:plain

Lambdaのコード

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from s3fs import S3FileSystem

# 出力先の情報
OUT_BUCKET_NAME = 'your-output-bucket-name'
OUT_KEY_PREFIX = 'path/to/db'
OUT_DB_NAME = 'db_name'
OUT_TABLE_NAME = 'tab_name'

def etl_s3_object(bucket,key):
    s3object = 's3a://{0}/{1}'.format(bucket, key)

    # Gzip圧縮のJSONLを読み込み
    df = pd.read_json(
        s3object, compression='gzip', 
        lines=True, 
        convert_dates = ['timestamp'],
        date_unit='ns'
    )
    print('DataFrameShape ->',df.shape)
    print('Sapmles ->',df.sample(5))
    
    
    # パーティション出力ように日付情報を抽出
    df['year'] = df['timestamp'].map(lambda x: x.year)
    df['month'] = df['timestamp'].map(lambda x: x.month)
    df['day'] = df['timestamp'].map(lambda x: x.day)
    print('Sapmles transformed ->',df.sample(5))
    
    # pyarrowのTableに変換
    table = pa.Table.from_pandas(df)
    
    # 出力先のS3にアップロード
    output_file = f"s3://{OUT_BUCKET_NAME}/{OUT_KEY_PREFIX}/{OUT_DB_NAME}/{OUT_TABLE_NAME}"
    pq.write_to_dataset(
        table=table, root_path=output_file,     
        filesystem=S3FileSystem(),              # S3をファイルシステムとして利用
        partition_cols=['year','month','day'],  # Hive形式でパーティショニングして出力
        use_deprecated_int96_timestamps=True    # Nano秒→Micro秒への情報欠落を許容
    )
    
# Hive形式のパーティションにある=とかを解決
def decode_s3_key(key):
    import urllib.parse
    return urllib.parse.unquote_to_bytes(key).decode()


def lambda_handler(event, context):
    print('start lambda')
    for e in event['Records']:
        bucket = e['s3']['bucket']['name']
        key = decode_s3_key(e['s3']['object']['key'])
        print('Start for',bucket,'and',key)
        etl_s3_object(bucket,key)
        print('Completed for',bucket,'and',key)

Lambda初期の128MBは流石に小さいので、ある程度のメモリを割り当てて上げないと大きめのファイルは動かないです。

S3のPUTイベントでトリガーするように設定すれば、S3へのPUTでParquetへの変換が動き出しましす。

f:id:yomon8:20190302204432p:plain

このような感じでパーティショニングされてS3にParquetが出力できます。

f:id:yomon8:20190302222335p:plain

エラー対応

s3fsのS3FileSystemにプロファイル指定のSessionを渡そうとすると以下のようなエラーが出る場合があります。

AttributeError: 'Session' object has no attribute 'create_client'
'S3' object has no attribute '__aenter__'

Sessionとして渡すのは boto3.session.Session でも botocore.session.Session でもなく、 session:aiobotocore AioSession なので気をつけてください。

s3fs.readthedocs.io

参考

pandas.pydata.org

arrow/parquet.py at master · apache/arrow · GitHub

https://dev.classmethod.jp/cloud/aws/lambda-layer-first-action/