Cloud DataflowのFlexTemplateを閉域ネットワークで使おうとしたら少しハマった

Cloud DataflowのFlexTemplateをインターネット接続無しの閉域ネットワークにて使おうとしたら、少しハマったので書いておきます。

事象

ジョブの定義

以下のようなシンプルなジョブを作成してみます。

import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import (
    PipelineOptions,
    SetupOptions,
    StandardOptions,
)

def main():
    options = PipelineOptions()
    options.view_as(StandardOptions).runner = "DataflowRunner"

    with beam.Pipeline(options=options) as p:
        (
            p
            | "start" >> beam.Create(range(100, 200))
            | "logging" >> beam.Map(lambda x: logging.warn(x))
        )

if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    main()

DockerfileもFlexTemplateのサンプルから大きく変更はありません。

FROM gcr.io/dataflow-templates-base/python3-template-launcher-base

ARG WORKDIR=/dataflow/template
RUN mkdir -p ${WORKDIR}
WORKDIR ${WORKDIR}

COPY main.py ${WORKDIR}/main.py
COPY requirements.txt ${WORKDIR}/requirements.txt
COPY spec/python_command_spec.json ${WORKDIR}/python_command_spec.json

ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/main.py"
ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
ENV DATAFLOW_PYTHON_COMMAND_SPEC="${WORKDIR}/python_command_spec.json"

RUN pip install --no-cache-dir --upgrade pip setuptools wheel \
    pip install --no-cache-dir -r ${FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE} \
    && pip download --no-cache-dir --dest /tmp/dataflow-requirements-cache -r ${FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE}
ENV PIP_NO_DEPS=True

発生するエラー

これを閉域ネットワークで動かすようにデプロイすると、以下のようなエラーが発生します。

中身見るとわかるのですが、pipapache-beam をインストールしようとして、ネットワークが接続できないエラーとなっています。

Dockerfileの中で pip を使って apache-beam をインストールしても、この事象は変わりません。

2021-08-26 08:35:19.258 JST INFO:apache_beam.runners.portability.stager:Executing command: ['/usr/local/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/tmpdwsjaxq0', 'apache-beam==2.31.0', '--no-deps', '--no-binary', ':all:']
2021-08-26 08:36:19.743 JST WARNING: Retrying (Retry(total=4, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ConnectTimeoutError(<pip._vendor.urllib3.connection.HTTPSConnection object at 0x7f31becc5750>, 'Connection to pypi.org timed out. (connect timeout=15)')': /simple/apache-beam/
2021-08-26 08:41:47.801 JST subprocess.CalledProcessError: Command '['/usr/local/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/tmpdwsjaxq0', 'apache-beam==2.31.0', '--no-deps', '--no-binary', ':all:']' returned non-zero exit status 1.

解決策

事前に配布用のアーカイブ(tar.gz)を作成して、コンテナイメージに含め SetupOptions.sdk_location というパラメータで指定することで解決可能です。

以下に手順を記載しておきます。

配布用アーカイブの作成

Githubから利用したいバージョンのbeamを取得します。

git clone https://github.com/apache/beam.git -b v2.31.0  --depth 1

PythonのSDKディレクトリに移動します。

cd beam/sdks/python/

ビルド用のパッケージを準備します。

python3 -m venv venv 
source venv/bin/activate
pip install -r build-requirements.txt

配布用アーカイブを作成します。

$ python setup.py sdist
# 省略
copying apache_beam/utils/windowed_value_test.py -> apache-beam-2.31.0/apache_beam/utils
Writing apache-beam-2.31.0/setup.cfg
creating dist
Creating tar archive
removing 'apache-beam-2.31.0' (and everything under it)

dist ディレクトリに配布用アーカイブが作成されます。

$ ls dist/ 
apache-beam-2.31.0.tar.gz

Dockerfile修正

Dockefileを修正して作成した配布用アーカイブをイメージに組み込みます。 /tmp/apache-beam.tar.gz という名前に変更していますが、ここは後でコード内で指定しやすいようにバージョン情報外しましたが任意です。ディレクトリも /tmp が必須というわけではありません。

COPY apache-beam-2.31.0.tar.gz /tmp/apache-beam.tar.gz

また、イメージには上記でtarに纏めたBeamと同じバージョンを明示的にインストールしておいてください。

RUN pip install --no-cache-dir apache-beam[gcp]==2.31.0

これを忘れると最新のSDKのバージョンがリリースされたさいに以下のようなエラーが発生します。

Beam SDK base version 2.31.0 does not match Dataflow Python worker version 2.32.0. Please check Dataflow worker startup logs and make sure that correct version of Beam SDK is installed.

Beam Pipelineのオプション修正

以下のように SetupOptions.sdk_location を設定します。

def main():
    options = PipelineOptions()
    options.view_as(SetupOptions).sdk_location = "/tmp/apache-beam.tar.gz"
    options.view_as(StandardOptions).runner = "DataflowRunner"

    with beam.Pipeline(options=options) as p:
        (
            p
            | "start" >> beam.Create(range(100, 200))
            | "logging" >> beam.Map(lambda x: logging.warn(x))
        )

これで閉域ネットワークでもDataflow FlexTemplateの実行ができるようになりました。

参考URL

github.com