Glueで変更のあったパーティション配下だけS3を部分更新したい

やりたいこと

以下のようにパーティションを切っているS3のバケットから、S3のバケットにGlueのETLジョブにて変換処理をかけています。

f:id:yomon8:20190405234752p:plain

一部のパーティションのデータが間違っていた場合に、パーティションごとデータを部分更新したい場合がありました。

f:id:yomon8:20190405234719p:plain

準備

テストデータ生成

英語の小文字、大文字とASCIIコードをデータとして持つJSONを生成してみます。

import json
from collections import namedtuple

def output_letters(name,letters):
    output_file = './data_{}.json'.format(name)
    with open(output_file,'w') as f:
        for l in letters:
            json.dump(l._asdict(), f)
            f.write('\n')

Letter = namedtuple('letter',('letter','code'))

capitals= [Letter(chr(s),s) for s in range(ord('A'),ord('A')+26)]
output_letters("capital",capitals)

smalls = [Letter(chr(s),s) for s in range(ord('a'),ord('a')+26)]
output_letters("small",smalls)

symbols = [Letter(chr(s),s) for s in range(91,96)]
output_letters("symbol",symbols)

このような3つのJSONL形式のデータができます。

  • data_capital.json
{"letter": "A", "code": 65}
{"letter": "B", "code": 66}
---省略---
{"letter": "Y", "code": 89}
{"letter": "Z", "code": 90}
  • data_small.json
{"letter": "a", "code": 97}
{"letter": "b", "code": 98}
---省略---
{"letter": "y", "code": 121}
{"letter": "z", "code": 122}
  • data_symbol.json
{"letter": "[", "code": 91}
{"letter": "\\", "code": 92}
{"letter": "]", "code": 93}
{"letter": "^", "code": 94}
{"letter": "_", "code": 95}

S3にアップロード

以下のようにアップロードします。

part=capital/
- data_capital.json
- data_symbol.json

part=small/
- data_small.json

Glueでクローリングすると以下のようなスキーマになります。

f:id:yomon8:20190407222511p:plain

ETLジョブ

Glueのウィザードを流せば以下のようなParquetに変換するETLジョブが簡単に生成できます。

f:id:yomon8:20190409094125p:plain

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "letter", table_name = "src", transformation_ctx = "datasource0")

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("letter", "string", "letter", "string"), ("code", "int", "code", "int"), ("part", "string", "part", "string")], transformation_ctx = "applymapping1")

resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")

dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://mybucket/path/to/target","partitionKeys":["part"]}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

部分更新のケースを試してみる

例えば、特殊文字が入っている data_special.jsonpart=capital から抜くことにしました。つまりは以下の図のように part=capital パーティションだけ部分更新したいです。

f:id:yomon8:20190409104711p:plain

まずソース側のS3から記号の入った data_symbol.json を削除します。

$ aws s3 rm s3://mybucket/path/src/part=capital/data_symbol.json
delete: s3://mybucket/path/src/part=capital/data_symbol.json
part=capital/
- data_capital.json // 大文字
☓- data_symbol.json // 削除

そこでもう一度ETLジョブを実行すれば part=capitalパーティションが更新できるかというと、そう上手くはいきません。

課題がいくつかあります。

課題① ジョブブックマーク有効化時の再実行

Glueのジョブブックマークの機能は便利なのですが、挙動を知らないとうまく動かせません。

今回の場合、ジョブブックマーク有効な場合、ただ data_symbol.json 削除後にETLジョブを実行してもデータは何も更新されません。

これは残っているデータ data_capital.json が既にETLジョブ実行済みとしてジョブブックマークにマーキング されているからです。

ジョブのブックマークを使用した処理済みデータの追跡 - AWS Glue

課題① の対策

S3のオブジェクトのタイムスタンプを更新すると再度ジョブが実行されます。

Linuxのtouchのようなことをするのですが、S3の場合は同じファイルを同じ場所にコピーすることで似たようなことが可能です。

以下の同一ファイルにコピーするとタイムスタンプが更新されます。

$ aws s3 cp s3://mybucket/path/src/part=capital/data_capital.json s3://mybucket/path/src/part=capital/data_capital.json
copy: s3://mybucket/path/src/part=capital/data_capital.json to s3://mybucket/path/src/part=capital/data_capital.json

タイムスタンプが更新された上でジョブを実行すると、再度処理対象として処理されます。が、ここで課題②が出てきます。

課題 ② DynamicFrameはAppendしかできない

ジョブブックマークの問題を超えて、ETLジョブを実行してみたところ、data_capital.json がTarget側で2重に取り込まれてしまっています。

そして、記号もTarget側では削除されていません。

部分更新ではなく、単純に追記されたことになります。

SELECT letter,
        count(*) AS cnt
FROM "target"
WHERE part = 'capital'
GROUP BY  letter
ORDER BY  cnt;

上記SQLの結果です。

f:id:yomon8:20190409105559p:plain

サポートにも問合せたのですが、GlueのDynamic Frameの書き出し部分、上記のスクリプトで言えば、以下の1行は2019/04/09時点ではAppendのみでOverwriteモードはサポートしていません。

課題②への対策

課題②への対策1 直接パーティションだけを操作する

上に書いたETLスクリプトを以下のように書き換えます。

#-- 省略
# 特定パーティションを設定無しで直接パーティションのロケーションに書き込み
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database = "letter", 
    table_name = "src", 
    push_down_predicate = "part='capital'", 
    transformation_ctx = "datasource0")
#-- 省略
# パーティショニング設定無しで直接パーティションのロケーションに書き込み
datasink4 =dropnullfields3.toDF().write.mode("overwrite").format("parquet").save("s3://mybucket/path/src/target/part=capital")
job.commit()

ポイントは2つです。 ① push_down_predicate を指定して部分更新したいパーティションのみSparkに読み込む ② DataFrameに変換してOverwriteモードで書き込む。この際にパスとしてPartition(例: part=capital )を直接指定する。

課題②への対策2 Spark SQLでゴリゴリ書く

書き込みのところはSpark SQLでやってもできます。

temp_view = "tempview"
dropnullfields3.toDF().createOrReplaceTempView(temp_view)
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
query = """
INSERT OVERWRITE TABLE {tgt_table} PARTITION ({partition_key})
                   SELECT *
                   FROM {src_view}
                   WHERE {partition_key} = '{partition_val}'
""".format(
    src_view=temp_view,
    tgt_table="letter.target",
    partition_key="part",
    partition_val="capital")
datasink4 = spark.sql(query)

他にも色々方法あって、対象のデータ種類が少ないなら書けば何とでもできそうです。

ただ、3桁、4桁以上のデータ種別を扱う場合は、個別対応厳しいので、汎用化させるとなると少しめんどくさいです。

単純に別のディレクトリに書き込んで、後でコピーするという力技だけど一番良いのではと思う方法書かれています。

stackoverflow.com

課題②への対策3 対応を待つ

開発しているという話あったりするし、Dynamic Frameもいつか対応してくれるかもしれません。

amazon web services - Overwrite parquet files from dynamic frame in AWS Glue - Stack Overflow

または、GlueのSparkバージョンが2.3.0になれば(現状は2.2.1)、この方法も使えるようになるので、少しシンプルに書けるようになります。

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.write.mode("overwrite").insertInto("partitioned_table")

[SPARK-20236] Overwrite a partitioned data source table should only overwrite related partitions - ASF JIRA

参考

amazon web services - Overwrite parquet files from dynamic frame in AWS Glue - Stack Overflow

metadata - Is there a way to touch() file in Amazon S3? - Stack Overflow