Dataflow
DataflowでNFS上のテキストファイルを読み込む方法です。 はじめに Filestoreの準備 Filestoreインスタンスの作成 GCEからマウント Dataflow実装 Pipeline(Python) Setup.py ジョブ起動スクリプト例 実行してみる はじめに 最初はOSレベルでマウントできれば…
Cloud DataflowのFlexTemplateをインターネット接続無しの閉域ネットワークにて使おうとしたら、少しハマったので書いておきます。 事象 ジョブの定義 発生するエラー 解決策 配布用アーカイブの作成 Dockerfile修正 Beam Pipelineのオプション修正 参考URL …
DataflowでRuntimeValueProviderで渡した値を使って、BigQueryのパーティション分割テーブルを作成しようとしたら、WriteToBigQueryでハマったので書いておきます。 課題 対策 参考URL 課題 日付のパーティション分割テーブル自体をWriteToBigQueryで作成す…
GCPのサポート使ったので、そのメモです。 短期的にはDataflowのRunnerのVersion 2のバグ(というか実装漏れ?)だったので同じ問題で悩んでいる方もググって辿り着けるように残しておきます。 使っているのは$100の開発環境のサポートプランなので、プラン…
GCPのCloud DataflowでのOS系のエラーが出た場合の調査方法の例です。 事象 調査 エラー内容 Dockerイメージを特定する Dockerイメージを起動してみる 修正 まとめ 事象 yomon.hatenablog.com こちらの記事で書いたようにJDBCを使ったジョブの、Apache Beam…
タイトルの通り、*.soなどの形式で提供されるNative Libraryを必要とするjarファイルをDataflowで使おうとして、ハマったので記録しておきます。 問題となっていたエラー 調査 サンプルコード 問題となっていたエラー まず出たエラーはこちら。このエラーの…
小さいハマりなのですが、具体的にはGCP BigQueryのPythonクライアントライブラリである、google-cloud-bigqueryのlist_jobsが手元とDataflowのサーバ側で挙動が違う状態に遭遇しました。 デバッグログを見ながらエラーのポイントは判明しました。list_jobs…
Google ColabでGCP使うには GCPの認証 PyPiモジュールのインストール PubSub側準備 左のブラウザ(Publish側) 右のブラウザ(BeamでSubscribe側) Cloud Dataflowで実行 後片付け PubSub削除 Cloud Dataflowのジョブ停止 最後に やりたいことのGIF動画です。…
最初はシンプルな差分同期方法の実装を書こうと思ったのですが、どうしても固有要件が入ってしまうので同期処理のヒント実装を書くことにしました。 手元で試せるように、Beam以外の部分はダミー関数としています。データソース取ってくるところと、データ書…
年始早々、Dataflowのエラーでかなりハマりました。 エラー自体は些細なことだったのですが、全然原因わからずに1日中悩んでしましました。 備忘も兼ねて、対応メモ書いておきます。 エラーの事象と対応 事象 開発中のジョブで、データ量的に数分で終わる、…
この記事の続きです。 yomon.hatenablog.com 以下にもある通り、今書いている時点ではApache BeamのPython SDKはビルトインでJDBC対応していません。 beam.apache.org PythonでJDBCドライバ使いたかったのはDataflowのPython SDK使ってもJDBC接続使いたかっ…
Cloud Dataflow + Python で作るテンプレートを登録する際に、pipでインストール可能なPyPiなどの外部パッケージをどうやって組み込むか調べました。 requirements.txtでpypi等の外部パッケージ使う方法 設定例 テンプレート作成 setup.pyでローカルパッケー…
Cloud Composer(Airflow)からDataflowTemplateOperatorの使い方がわからなかったので調べました。 Dataflowテンプレート登録 コード作成 コンパイル+アップロード Cloud ComposerのDAG作成 DAG定義スクリプト作成 AirflowのVariables設定 DAGファイルのイン…
Azure Table StorageのデータをCloud Dataflow (Apache Beam)から扱ってみたのでメモ。 対象のAzure Table Storage 対象としたTable Storageの中身です。mytable という名前にしました。このデータをDataflowを使ってBigQueryに挿入してみます。 準備 デー…