Cloud DataflowのテンプレートにPythonの外部パッケージを利用する

Cloud Dataflow + Python で作るテンプレートを登録する際に、pipでインストール可能なPyPiなどの外部パッケージをどうやって組み込むか調べました。

requirements.txtでpypi等の外部パッケージ使う方法

結局ドキュメントは見つからなかったのですが、ソースコード読んでいたら以下のオプションを見つけました。

class SetupOptions(PipelineOptions):

  @classmethod
  def _add_argparse_args(cls, parser):
    # Options for installing dependencies in the worker.
    parser.add_argument(
        '--requirements_file',
        default=None,
        help=
        ('Path to a requirements file containing package dependencies. '
         'Typically it is produced by a pip freeze command. More details: '
         'https://pip.pypa.io/en/latest/reference/pip_freeze.html. '
         'If used, all the packages specified will be downloaded, '
         'cached (use --requirements_cache to change default location), '
         'and then staged so that they can be automatically installed in '
         'workers during startup. The cache is refreshed as needed '
         'avoiding extra downloads for existing packages. Typically the '
         'file is named requirements.txt.'))

引用元:https://github.com/apache/beam/blob/v2.16.0/sdks/python/apache_beam/options/pipeline_options.py#L721-L738

設定例

以下のようにPipeline OptionのSetupOptoinsに requirements_fileとして設定します。

options = PipelineOptions()
options.view_as(StandardOptions).runner = 'DataflowRunner'
setup_option = options.view_as(SetupOptions)
setup_option.requirements_file = "./requirements.txt"

# 省略

p = beam.Pipeline(options=options)

テンプレート作成

テンプレートを作成してみると、staging_locationに指定したバケットに requirements.txt がアップロードされます。

f:id:yomon8:20191216224005p:plain

テンプレート作成が完了してから見てみると、requirements.txtに記載したパッケージがアップロードされています。後はテンプレート実行すれば、パッケージを利用した処理が可能です。

f:id:yomon8:20191216224240p:plain

setup.pyでローカルパッケージも使う方法

上記のrequirements.txtでは外部パッケージは取り込めますが、以下のようにローカルのパッケージを組み込むには setup.py を使います。

.
├── mylibs
│   ├── __init__.py
│   └── func.py
├── job_wordcount.py
from mylibs.func import my_func

こちらのオプションを利用します。

    parser.add_argument(
        '--setup_file',
        default=None,
        help=
        ('Path to a setup Python file containing package dependencies. If '
         'specified, the file\'s containing folder is assumed to have the '
         'structure required for a setuptools setup package. The file must be '
         'named setup.py. More details: '
         'https://pythonhosted.org/an_example_pypi_project/setuptools.html '
         'During job submission a source distribution will be built and the '
         'worker will install the resulting package before running any custom '
         'code.'))

引用元:https://github.com/apache/beam/blob/v2.16.0/sdks/python/apache_beam/options/pipeline_options.py#L745-L756

設定例

設定方法は、requirements.txtの時と同じです。

setup.pyにローカルのパッケージを追加します。

from distutils.core import setup

setup(name='wordcount',
      version='1.0',
      description='wordcount setup',
      author='yusuke.otomo',
      author_email='yomon8@outlook.jp',
      url='https://yomon.hatenablog.com',
      packages=['mylibs'])

その上で SetupOptionsetup_file を設定すればOKです。

options = PipelineOptions()
options.view_as(StandardOptions).runner = 'DataflowRunner'
setup_option = options.view_as(SetupOptions)
setup_option.view_as(SetupOptions).setup_file = './setup.py'

# 省略

p = beam.Pipeline(options=options)

実行すると以下のコードにあるように workflow.tar.gz として、GCS上のstagingの場所にtar ballに纏められて出力されます。

https://github.com/apache/beam/blob/v2.16.0/sdks/python/apache_beam/runners/portability/stager.py#L74

関連

この辺りの処理は、以下のコードを見ると動きわかります。

https://github.com/apache/beam/blob/v2.16.0/sdks/python/apache_beam/runners/portability/stager.py

Cloud ComposerからDataflowTemplateOperatorでwordcountジョブを実行する - YOMON8.NET