Dataflow+PythonでNFS接続でFilestoreのファイルを読み込む

DataflowでNFS上のテキストファイルを読み込む方法です。

f:id:yomon8:20211013130658p:plain

はじめに

最初はOSレベルでマウントできればと思ったのですが上手くいかなかったので、こちらのパッケージを使っています。

pypi.org

Google CloudのマネージドのNFSサーバーであるFilestore上に配置したテキストファイルを読み込んでみます。

Filestoreの準備

Filestoreインスタンスの作成

まずはFilestoreのインスタンスを作成しておきます。ネットワークはDataflow等が動くネットワークからアクセス可能なネットワークを指定します。

https://console.cloud.google.com/filestore/instances

f:id:yomon8:20211013091332p:plain:w500

作成後にNFSマウントポイントが確認できるので、メモしておきます。

f:id:yomon8:20211013091441p:plain:w500

GCEからマウント

GCEからNFSマウントポイントをマウントしてみます。

$ sudo mkdir -p /mnt/filestore
$ sudo mount 10.166.144.2:/share /mnt/filestore 

今回読み込みに使うテキストファイルを作成します。

$ sudo mkdir -p /mnt/filestore/data
$ cat <<EOF | sudo tee /mnt/filestore/data/sample.txt
sample001
sample002
sample003
sample004
sample005
EOF

テキストファイルの中身を確認してみます。こちらをDataflowから読み込みます。

$ cat /mnt/filestore/data/sample.txt
sample001
sample002
sample003
sample004
sample005

Dataflow実装

Pipeline(Python)

pipeline.py というファイル名で以下を作成します。

import logging
from contextlib import contextmanager

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class NFSOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            "--input_file",
            type=str,
            help="Path of the file to read from",
        )
        parser.add_value_provider_argument(
            "--nfs_share",
            type=str,
            help="Path of the file to read from",
        )


class NfsFileReader(beam.DoFn):
    def __init__(self, nfs_share, input_file):
        self._nfs_share = nfs_share
        self._input_file = input_file

    @contextmanager
    def _open_nfs_file(self):
        import libnfs

        nfs = libnfs.NFS(self._nfs_share.get())
        f = nfs.open(self._input_file.get())
        try:
            yield f
        finally:
            if f != None:
                f.close()

    def process(self, element):
        with self._open_nfs_file() as f:
            for line in f.read().splitlines():
                yield line


def main():
    options = PipelineOptions()

    with beam.Pipeline(options=options) as p:

        nfs_options = options.view_as(NFSOptions)
        logging.getLogger().setLevel(logging.INFO)

        (
            p
            | "start" >> beam.Create([None])
            | "read text file"
            >> beam.ParDo(
                NfsFileReader(
                    nfs_share=nfs_options.nfs_share,
                    input_file=nfs_options.input_file,
                )
            )
            | "write log" >> beam.Map(lambda r: logging.info(r))
        )


if __name__ == "__main__":
    main()

Setup.py

NFSへの接続に利用するライブラリを setup.py にてインストールします。

from distutils.command.build import build as _build  # type: ignore
import subprocess
import logging

import setuptools


class build(_build):
    sub_commands = _build.sub_commands + [("CustomCommands", None)]


CUSTOM_COMMANDS = [
    ["sudo", "apt", "update"],
    ["sudo", "apt", "install", "-y", "gcc", "libnfs-dev"],
]


class CustomCommands(setuptools.Command):
    def initialize_options(self):
        pass

    def finalize_options(self):
        pass

    def RunCustomCommand(self, command_list):
        logging.info("Running command: %s" % command_list)
        p = subprocess.Popen(
            command_list,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
        stdout_data, _ = p.communicate()
        logging.info("Command output: %s" % stdout_data)
        if p.returncode != 0:
            raise RuntimeError(
                "Command %s failed: exit code: %s" % (command_list, p.returncode)
            )

    def run(self):
        for command in CUSTOM_COMMANDS:
            self.RunCustomCommand(command)


REQUIRED_PACKAGES = [
    "libnfs==1.0.post4",
]

setuptools.setup(
    name="dataflow-nfs-read-sample",
    version="0.1",
    description="read text file from nfs",
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    cmdclass={
        "build": build,
        "CustomCommands": CustomCommands,
    },
)

OS系のパッケージはこちらの部分でインストール指定しています。

CUSTOM_COMMANDS = [
    ["sudo", "apt", "update"],
    ["sudo", "apt", "install", "-y", "gcc", "libnfs-dev"],
]

Pythonパッケージの libnfs はこちらで指定してインストールします。

REQUIRED_PACKAGES = [
    "libnfs==1.0.post4",
]

ジョブ起動スクリプト例

Dataflowジョブ起動用のスクリプトも用意しました。 Parameters のセクションを自身の環境に合わせて変更して使います。

#!/bin/bash -eu
# --------------------
# Parameters
PROJECT=your-project
BUCKET=your-bucket
NFS_SHARE="10.166.144.2/share"
INPUT_FILE="/share/test"
SUBNETWORK=your-subnetwork
# --------------------

PROJECT=${PROJECT}
STAGING_BUCKET=${BUCKET}
REGION="asia-northeast1"
SETUP_FILE="./setup.py"
SUBNETWORK=regions/${REGION}/subnetworks/${SUBNETWORK}
STAGING_BUCKET_PATH="gs://${STAGING_BUCKET}"
STAGING_LOCATION="$STAGING_BUCKET_PATH/staging"
TEMP_LOCATION="$STAGING_BUCKET_PATH/temp"
TEMPLATE_LOCATION="${STAGING_BUCKET_PATH}/templates"
RUNNER=DataflowRunner

python run_pipeline.py \
  --project=${PROJECT} \
  --runner=${RUNNER} \
  --region=${REGION} \
  --staging_location=${STAGING_LOCATION} \
  --temp_location=${TEMP_LOCATION} \
  --template_location=${TEMPLATE_LOCATION} \
  --setup_file=${SETUP_FILE}

 gcloud dataflow jobs run "read-from-nfs-`date +%Y%m%d-%H%M%S`" \
  --project=$PROJECT \
  --region=$REGION \
  --gcs-location=$TEMPLATE_LOCATION \
  --staging-location=$STAGING_LOCATION \
  --disable-public-ips \
  --parameters="nfs_share=nfs://${NFS_SHARE},input_file=${INPUT_FILE}" \
  --subnetwork=$SUBNETWORK

実行してみる

作成したジョブ起動用のスクリプトを実行します。

$ ./run.sh
# 省略
createTime: '2021-10-13T04:04:28.374985Z'
currentStateTime: '1970-01-01T00:00:00Z'
id: 2021-10-12_21_04_27-13575594679950095882
location: asia-northeast1
name: read-from-nfs-20211013-130426
projectId: your-project
startTime: '2021-10-13T04:04:28.374985Z'
type: JOB_TYPE_BATCH

以下のようにジョブが実行されます。

f:id:yomon8:20211013131154p:plain

先程書き込んだファイルの内容がログに出力されているのが確認できました。

f:id:yomon8:20211013131221p:plain