DataflowでNFS上のテキストファイルを読み込む方法です。
はじめに
最初はOSレベルでマウントできればと思ったのですが上手くいかなかったので、こちらのパッケージを使っています。
Google CloudのマネージドのNFSサーバーであるFilestore上に配置したテキストファイルを読み込んでみます。
Filestoreの準備
Filestoreインスタンスの作成
まずはFilestoreのインスタンスを作成しておきます。ネットワークはDataflow等が動くネットワークからアクセス可能なネットワークを指定します。
https://console.cloud.google.com/filestore/instances
作成後にNFSマウントポイントが確認できるので、メモしておきます。
GCEからマウント
GCEからNFSマウントポイントをマウントしてみます。
$ sudo mkdir -p /mnt/filestore $ sudo mount 10.166.144.2:/share /mnt/filestore
今回読み込みに使うテキストファイルを作成します。
$ sudo mkdir -p /mnt/filestore/data $ cat <<EOF | sudo tee /mnt/filestore/data/sample.txt sample001 sample002 sample003 sample004 sample005 EOF
テキストファイルの中身を確認してみます。こちらをDataflowから読み込みます。
$ cat /mnt/filestore/data/sample.txt sample001 sample002 sample003 sample004 sample005
Dataflow実装
Pipeline(Python)
pipeline.py
というファイル名で以下を作成します。
import logging from contextlib import contextmanager import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions class NFSOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_value_provider_argument( "--input_file", type=str, help="Path of the file to read from", ) parser.add_value_provider_argument( "--nfs_share", type=str, help="Path of the file to read from", ) class NfsFileReader(beam.DoFn): def __init__(self, nfs_share, input_file): self._nfs_share = nfs_share self._input_file = input_file @contextmanager def _open_nfs_file(self): import libnfs nfs = libnfs.NFS(self._nfs_share.get()) f = nfs.open(self._input_file.get()) try: yield f finally: if f != None: f.close() def process(self, element): with self._open_nfs_file() as f: for line in f.read().splitlines(): yield line def main(): options = PipelineOptions() with beam.Pipeline(options=options) as p: nfs_options = options.view_as(NFSOptions) logging.getLogger().setLevel(logging.INFO) ( p | "start" >> beam.Create([None]) | "read text file" >> beam.ParDo( NfsFileReader( nfs_share=nfs_options.nfs_share, input_file=nfs_options.input_file, ) ) | "write log" >> beam.Map(lambda r: logging.info(r)) ) if __name__ == "__main__": main()
Setup.py
NFSへの接続に利用するライブラリを setup.py
にてインストールします。
from distutils.command.build import build as _build # type: ignore import subprocess import logging import setuptools class build(_build): sub_commands = _build.sub_commands + [("CustomCommands", None)] CUSTOM_COMMANDS = [ ["sudo", "apt", "update"], ["sudo", "apt", "install", "-y", "gcc", "libnfs-dev"], ] class CustomCommands(setuptools.Command): def initialize_options(self): pass def finalize_options(self): pass def RunCustomCommand(self, command_list): logging.info("Running command: %s" % command_list) p = subprocess.Popen( command_list, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) stdout_data, _ = p.communicate() logging.info("Command output: %s" % stdout_data) if p.returncode != 0: raise RuntimeError( "Command %s failed: exit code: %s" % (command_list, p.returncode) ) def run(self): for command in CUSTOM_COMMANDS: self.RunCustomCommand(command) REQUIRED_PACKAGES = [ "libnfs==1.0.post4", ] setuptools.setup( name="dataflow-nfs-read-sample", version="0.1", description="read text file from nfs", install_requires=REQUIRED_PACKAGES, packages=setuptools.find_packages(), cmdclass={ "build": build, "CustomCommands": CustomCommands, }, )
OS系のパッケージはこちらの部分でインストール指定しています。
CUSTOM_COMMANDS = [ ["sudo", "apt", "update"], ["sudo", "apt", "install", "-y", "gcc", "libnfs-dev"], ]
Pythonパッケージの libnfs
はこちらで指定してインストールします。
REQUIRED_PACKAGES = [
"libnfs==1.0.post4",
]
ジョブ起動スクリプト例
Dataflowジョブ起動用のスクリプトも用意しました。 Parameters
のセクションを自身の環境に合わせて変更して使います。
#!/bin/bash -eu # -------------------- # Parameters PROJECT=your-project BUCKET=your-bucket NFS_SHARE="10.166.144.2/share" INPUT_FILE="/share/test" SUBNETWORK=your-subnetwork # -------------------- PROJECT=${PROJECT} STAGING_BUCKET=${BUCKET} REGION="asia-northeast1" SETUP_FILE="./setup.py" SUBNETWORK=regions/${REGION}/subnetworks/${SUBNETWORK} STAGING_BUCKET_PATH="gs://${STAGING_BUCKET}" STAGING_LOCATION="$STAGING_BUCKET_PATH/staging" TEMP_LOCATION="$STAGING_BUCKET_PATH/temp" TEMPLATE_LOCATION="${STAGING_BUCKET_PATH}/templates" RUNNER=DataflowRunner python run_pipeline.py \ --project=${PROJECT} \ --runner=${RUNNER} \ --region=${REGION} \ --staging_location=${STAGING_LOCATION} \ --temp_location=${TEMP_LOCATION} \ --template_location=${TEMPLATE_LOCATION} \ --setup_file=${SETUP_FILE} gcloud dataflow jobs run "read-from-nfs-`date +%Y%m%d-%H%M%S`" \ --project=$PROJECT \ --region=$REGION \ --gcs-location=$TEMPLATE_LOCATION \ --staging-location=$STAGING_LOCATION \ --disable-public-ips \ --parameters="nfs_share=nfs://${NFS_SHARE},input_file=${INPUT_FILE}" \ --subnetwork=$SUBNETWORK
実行してみる
作成したジョブ起動用のスクリプトを実行します。
$ ./run.sh # 省略 createTime: '2021-10-13T04:04:28.374985Z' currentStateTime: '1970-01-01T00:00:00Z' id: 2021-10-12_21_04_27-13575594679950095882 location: asia-northeast1 name: read-from-nfs-20211013-130426 projectId: your-project startTime: '2021-10-13T04:04:28.374985Z' type: JOB_TYPE_BATCH
以下のようにジョブが実行されます。
先程書き込んだファイルの内容がログに出力されているのが確認できました。