Cloud Dataflow + Python で作るテンプレートを登録する際に、pipでインストール可能なPyPiなどの外部パッケージをどうやって組み込むか調べました。
requirements.txtでpypi等の外部パッケージ使う方法
結局ドキュメントは見つからなかったのですが、ソースコード読んでいたら以下のオプションを見つけました。
class SetupOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): # Options for installing dependencies in the worker. parser.add_argument( '--requirements_file', default=None, help= ('Path to a requirements file containing package dependencies. ' 'Typically it is produced by a pip freeze command. More details: ' 'https://pip.pypa.io/en/latest/reference/pip_freeze.html. ' 'If used, all the packages specified will be downloaded, ' 'cached (use --requirements_cache to change default location), ' 'and then staged so that they can be automatically installed in ' 'workers during startup. The cache is refreshed as needed ' 'avoiding extra downloads for existing packages. Typically the ' 'file is named requirements.txt.'))
設定例
以下のようにPipeline OptionのSetupOptoinsに requirements_fileとして設定します。
options = PipelineOptions() options.view_as(StandardOptions).runner = 'DataflowRunner' setup_option = options.view_as(SetupOptions) setup_option.requirements_file = "./requirements.txt" # 省略 p = beam.Pipeline(options=options)
テンプレート作成
テンプレートを作成してみると、staging_locationに指定したバケットに requirements.txt がアップロードされます。
テンプレート作成が完了してから見てみると、requirements.txtに記載したパッケージがアップロードされています。後はテンプレート実行すれば、パッケージを利用した処理が可能です。
setup.pyでローカルパッケージも使う方法
上記のrequirements.txtでは外部パッケージは取り込めますが、以下のようにローカルのパッケージを組み込むには setup.py
を使います。
. ├── mylibs │ ├── __init__.py │ └── func.py ├── job_wordcount.py
from mylibs.func import my_func
こちらのオプションを利用します。
parser.add_argument( '--setup_file', default=None, help= ('Path to a setup Python file containing package dependencies. If ' 'specified, the file\'s containing folder is assumed to have the ' 'structure required for a setuptools setup package. The file must be ' 'named setup.py. More details: ' 'https://pythonhosted.org/an_example_pypi_project/setuptools.html ' 'During job submission a source distribution will be built and the ' 'worker will install the resulting package before running any custom ' 'code.'))
設定例
設定方法は、requirements.txtの時と同じです。
setup.pyにローカルのパッケージを追加します。
from distutils.core import setup setup(name='wordcount', version='1.0', description='wordcount setup', author='yusuke.otomo', author_email='yomon8@outlook.jp', url='https://yomon.hatenablog.com', packages=['mylibs'])
その上で SetupOption
の setup_file
を設定すればOKです。
options = PipelineOptions() options.view_as(StandardOptions).runner = 'DataflowRunner' setup_option = options.view_as(SetupOptions) setup_option.view_as(SetupOptions).setup_file = './setup.py' # 省略 p = beam.Pipeline(options=options)
実行すると以下のコードにあるように workflow.tar.gz
として、GCS上のstagingの場所にtar ballに纏められて出力されます。
関連
この辺りの処理は、以下のコードを見ると動きわかります。
https://github.com/apache/beam/blob/v2.16.0/sdks/python/apache_beam/runners/portability/stager.py
Cloud ComposerからDataflowTemplateOperatorでwordcountジョブを実行する - YOMON8.NET