Dataflow

Dataflow+PythonでNFS接続でFilestoreのファイルを読み込む

DataflowでNFS上のテキストファイルを読み込む方法です。 はじめに Filestoreの準備 Filestoreインスタンスの作成 GCEからマウント Dataflow実装 Pipeline(Python) Setup.py ジョブ起動スクリプト例 実行してみる はじめに 最初はOSレベルでマウントできれば…

Cloud DataflowのFlexTemplateを閉域ネットワークで使おうとしたら少しハマった

Cloud DataflowのFlexTemplateをインターネット接続無しの閉域ネットワークにて使おうとしたら、少しハマったので書いておきます。 事象 ジョブの定義 発生するエラー 解決策 配布用アーカイブの作成 Dockerfile修正 Beam Pipelineのオプション修正 参考URL …

DataflowのWriteToBigQueryでRuntimeValueProviderで渡した日付のパーティション分割テーブルを作る

DataflowでRuntimeValueProviderで渡した値を使って、BigQueryのパーティション分割テーブルを作成しようとしたら、WriteToBigQueryでハマったので書いておきます。 課題 対策 参考URL 課題 日付のパーティション分割テーブル自体をWriteToBigQueryで作成す…

DataflowのRunnber Version2でRuntimeValueProviderErrorが発生したのでGCPのサポートに問い合わせてみた

GCPのサポート使ったので、そのメモです。 短期的にはDataflowのRunnerのVersion 2のバグ(というか実装漏れ?)だったので同じ問題で悩んでいる方もググって辿り着けるように残しておきます。 使っているのは$100の開発環境のサポートプランなので、プラン…

GCP Dataflowでapt installが上手くいかない場合のトラブルシュート方法

GCPのCloud DataflowでのOS系のエラーが出た場合の調査方法の例です。 事象 調査 エラー内容 Dockerイメージを特定する Dockerイメージを起動してみる 修正 まとめ 事象 yomon.hatenablog.com こちらの記事で書いたようにJDBCを使ったジョブの、Apache Beam…

Google Cloud DataflowでNative Library(*.so)が必要なjarを利用する方法

タイトルの通り、*.soなどの形式で提供されるNative Libraryを必要とするjarファイルをDataflowで使おうとして、ハマったので記録しておきます。 問題となっていたエラー 調査 サンプルコード 問題となっていたエラー まず出たエラーはこちら。このエラーの…

google-cloud-bigqueryのlist_jobsがUnknownJobを返してきてハマってしまった

小さいハマりなのですが、具体的にはGCP BigQueryのPythonクライアントライブラリである、google-cloud-bigqueryのlist_jobsが手元とDataflowのサーバ側で挙動が違う状態に遭遇しました。 デバッグログを見ながらエラーのポイントは判明しました。list_jobs…

Python+Cloud DataflowのPubSubストリーミングをGoogle Colaboratory使って試す

Google ColabでGCP使うには GCPの認証 PyPiモジュールのインストール PubSub側準備 左のブラウザ(Publish側) 右のブラウザ(BeamでSubscribe側) Cloud Dataflowで実行 後片付け PubSub削除 Cloud Dataflowのジョブ停止 最後に やりたいことのGIF動画です。…

Cloud Dataflowでシンプルな同期処理のヒント実装

最初はシンプルな差分同期方法の実装を書こうと思ったのですが、どうしても固有要件が入ってしまうので同期処理のヒント実装を書くことにしました。 手元で試せるように、Beam以外の部分はダミー関数としています。データソース取ってくるところと、データ書…

Cloud Dataflowのトラブルシューティングで調べること備忘録

年始早々、Dataflowのエラーでかなりハマりました。 エラー自体は些細なことだったのですが、全然原因わからずに1日中悩んでしましました。 備忘も兼ねて、対応メモ書いておきます。 エラーの事象と対応 事象 開発中のジョブで、データ量的に数分で終わる、…

Pythonで書くDataflowテンプレートでサードパーティ製JDBCドライバを使う

この記事の続きです。 yomon.hatenablog.com 以下にもある通り、今書いている時点ではApache BeamのPython SDKはビルトインでJDBC対応していません。 beam.apache.org PythonでJDBCドライバ使いたかったのはDataflowのPython SDK使ってもJDBC接続使いたかっ…

Cloud DataflowのテンプレートにPythonの外部パッケージを利用する

Cloud Dataflow + Python で作るテンプレートを登録する際に、pipでインストール可能なPyPiなどの外部パッケージをどうやって組み込むか調べました。 requirements.txtでpypi等の外部パッケージ使う方法 設定例 テンプレート作成 setup.pyでローカルパッケー…

Cloud ComposerからDataflowTemplateOperatorでwordcountジョブを実行する

Cloud Composer(Airflow)からDataflowTemplateOperatorの使い方がわからなかったので調べました。 Dataflowテンプレート登録 コード作成 コンパイル+アップロード Cloud ComposerのDAG作成 DAG定義スクリプト作成 AirflowのVariables設定 DAGファイルのイン…

Azure Table StorageのデータをCloud Dataflowを使ってBigQueryに挿入する

Azure Table StorageのデータをCloud Dataflow (Apache Beam)から扱ってみたのでメモ。 対象のAzure Table Storage 対象としたTable Storageの中身です。mytable という名前にしました。このデータをDataflowを使ってBigQueryに挿入してみます。 準備 デー…