最初はシンプルな差分同期方法の実装を書こうと思ったのですが、どうしても固有要件が入ってしまうので同期処理のヒント実装を書くことにしました。
手元で試せるように、Beam以外の部分はダミー関数としています。データソース取ってくるところと、データ書き込むところは別途実装が必要です。並列処理によるパフォーマンス等も別途考慮が必要です。(特に最後とかデータストアによってはシーケンシャルの方が良いかもしれません)
やりたいこと
シンプルな主キーを持ったテーブルの同期をしたいです。
Source:存在
andTarget:存在しない
-> 挿入(Insert)Source:存在
andTarget:存在
-> 更新(Update)Source:存在しない
andTarget:存在
-> 削除(Delete)
これ全部やったら全件更新と同じなので、実際にはロジックを埋め込んだり調整していくことなります。
やり方
Step By Stepで書きます。
モジュールのインポート
ここで書く内容はbeamだけで試すこと可能です。
import apache_beam as beam
データ準備
source_table = [ r for r in zip( range(1,11), # id1 range(11,21), # id2 [chr(i + 97) for i in range(1,11)], # char ) ] target_table = [ r for r in zip( range(5,16), # id1 range(15,26), # id2 [chr(i + 69) for i in range(1,11)], # char ) ] print('==== Source') print(source_table) print('') print('==== Target') print(target_table)
==== Source [ (1, 11, 'b'), (2, 12, 'c'), (3, 13, 'd'), (4, 14, 'e'), (5, 15, 'f'), (6, 16, 'g'), (7, 17, 'h'), (8, 18, 'i'), (9, 19, 'j'), (10, 20, 'k')] ==== Target [ (5, 15, 'F'), (6, 16, 'G'), (7, 17, 'H'), (8, 18, 'I'), (9, 19, 'J'), (10, 20, 'K'), (11, 21, 'L'), (12, 22, 'M'), (13, 23, 'N'), (14, 24, 'O')]
主キーの設定
主キーで判断するため、主キー付きのデータに変換する関数を作ります。((複合主キー),(行データ))
の形式に変換します。
def determin_primary_key(row): # id1とid2で複合主キーとして設定 return ((row[0],row[1]),row)
この関数をデータに適用してみます。一旦わかりやすいようにこの時点でprintしてみます。
with beam.Pipeline() as p: soruce = ( p | 'create source data' >> beam.Create(source_table) | 'detemin pkey for soruce' >> beam.Map(determin_primary_key) ) print("=== src table") soruce | beam.Map(print) with beam.Pipeline() as p: target = ( p | 'create target data' >> beam.Create(target_table) | 'detemin pkey for target' >> beam.Map(determin_primary_key) ) print("=== tgt table") target | beam.Map(print)
以下のように出力されます。
=== src table ((1, 11), (1, 11, 'b')) ((2, 12), (2, 12, 'c')) ((3, 13), (3, 13, 'd')) ((4, 14), (4, 14, 'e')) ((5, 15), (5, 15, 'f')) ((6, 16), (6, 16, 'g')) ((7, 17), (7, 17, 'h')) ((8, 18), (8, 18, 'i')) ((9, 19), (9, 19, 'j')) ((10, 20), (10, 20, 'k')) === tgt table ((5, 15), (5, 15, 'F')) ((6, 16), (6, 16, 'G')) ((7, 17), (7, 17, 'H')) ((8, 18), (8, 18, 'I')) ((9, 19), (9, 19, 'J')) ((10, 20), (10, 20, 'K')) ((11, 21), (11, 21, 'L')) ((12, 22), (12, 22, 'M')) ((13, 23), (13, 23, 'N')) ((14, 24), (14, 24, 'O'))
主キーでJOIN
beam.CoGroupByKey()
を使うと上記で設定した主キーでJoinできます。
with beam.Pipeline() as p: source = ( p | 'create soruce data' >> beam.Create(source_table) | 'detemin pkey for soruce' >> beam.Map(determin_primary_key) ) target = ( p | 'create target data' >> beam.Create(target_table) | 'detemin pkey for target' >> beam.Map(determin_primary_key) ) joined = ( ({'source': source,'target':target}) | 'group_by' >> beam.CoGroupByKey() # 主キーでJOIN ) joined | beam.Map(print)
((複合主キー),{'source': [(Source行データ)],'target': [(Target行データ)],)
の形式で出力されます。
この時点の出力から以下のように区分できます。
# targetに挿入(Insert)するデータ ((1, 11), {'source': [(1, 11, 'b')], 'target': []}) ((2, 12), {'source': [(2, 12, 'c')], 'target': []}) ((3, 13), {'source': [(3, 13, 'd')], 'target': []}) ((4, 14), {'source': [(4, 14, 'e')], 'target': []}) # targetを更新(Update)するデータ ((5, 15), {'source': [(5, 15, 'f')], 'target': [(5, 15, 'F')]}) ((6, 16), {'source': [(6, 16, 'g')], 'target': [(6, 16, 'G')]}) ((7, 17), {'source': [(7, 17, 'h')], 'target': [(7, 17, 'H')]}) ((8, 18), {'source': [(8, 18, 'i')], 'target': [(8, 18, 'I')]}) ((9, 19), {'source': [(9, 19, 'j')], 'target': [(9, 19, 'J')]}) ((10, 20), {'source': [(10, 20, 'k')], 'target': [(10, 20, 'K')]}) # targetから削除(Delete)するデータ ((11, 21), {'source': [], 'target': [(11, 21, 'L')]}) ((12, 22), {'source': [], 'target': [(12, 22, 'M')]}) ((13, 23), {'source': [], 'target': [(13, 23, 'N')]}) ((14, 24), {'source': [], 'target': [(14, 24, 'O')]})
分類して処理実行(ダミー)
ParDoに渡すDoFnにて以下のように書くと、出力を分岐させることができます。あとはそれぞれの処理を行うように実装します。
class EntryClassifyDoFn(beam.DoFn): # 処理内容を分類 def process(self,element): data = element[1] if data['source'] and not data['target']: yield beam.pvalue.TaggedOutput('tobe_inserted',data) elif data['source'] and data['target']: yield beam.pvalue.TaggedOutput('tobe_updated',data) elif not data['source'] and data['target']: yield beam.pvalue.TaggedOutput('tobe_deleted',data) with beam.Pipeline() as p: source = ( p | 'create soruce data' >> beam.Create(source_table) | 'detemin pkey for soruce' >> beam.Map(determin_primary_key) ) target = ( p | 'create target data' >> beam.Create(target_table) | 'detemin pkey for target' >> beam.Map(determin_primary_key) ) joined = ( ({'source': source,'target':target}) | 'group_by' >> beam.CoGroupByKey() # 主キーでJOIN | beam.ParDo(EntryClassifyDoFn()).with_outputs('tobe_inserted','tobe_deleted','tobe_updated') ) joined.tobe_inserted | 'entries to be inserted' >> beam.Map(lambda r: print('Insert:',r['source'])) joined.tobe_updated | 'entries to be updated' >> beam.Map(lambda r: print('Update:',r['target'])) joined.tobe_deleted | 'entries to be deleted' >> beam.Map(lambda r: print('Delete:',r['target']))
出力だけだとこんな感じです。
Update: [(5, 15, 'F')] Update: [(6, 16, 'G')] Update: [(7, 17, 'H')] Update: [(8, 18, 'I')] Update: [(9, 19, 'J')] Update: [(10, 20, 'K')] Delete: [(11, 21, 'L')] Delete: [(12, 22, 'M')] Delete: [(13, 23, 'N')] Delete: [(14, 24, 'O')] Insert: [(1, 11, 'b')] Insert: [(2, 12, 'c')] Insert: [(3, 13, 'd')] Insert: [(4, 14, 'e')]
ちなみにDataflowで実際に実行すると以下のようなフロー図になります。
さいごに
全件削除して全件挿入すれば同じことできますが、発展可能な実装として書きました。 もう少し詰めたかったのですが、それでも自分だったら欲しかった情報なので公開しておきます。