Pythonで書くDataflowテンプレートでサードパーティ製JDBCドライバを使う

この記事の続きです。

yomon.hatenablog.com

以下にもある通り、今書いている時点ではApache BeamのPython SDKはビルトインでJDBC対応していません。

beam.apache.org

PythonでJDBCドライバ使いたかったのはDataflowのPython SDK使ってもJDBC接続使いたかったからです。

上記の記事でJDBCをPythonから使えるところは確認できているので、今度はDataflowにテンプレート登録してみます。

Pythonコード準備

requirements.txt を準備します。

※ 記事書いている時点のJayDeBeApiのPyPi上のバージョンだとJPype1==0.7では動かないが修正される模様

JPype1==0.6.3
JayDeBeApi

最初の参照記事に沿って、 mssql-jdbc-7.4.1.jre8.jar も準備しておきます。

最初に参照した記事と同じくSQL Serverに接続するスクリプトです。

ポイントは以下のオプションでローカルPC上のJARを指定すると、Dataflowのテンプレートに取り込んでくれることです。

 worker_options.dataflow_worker_jar = JDBC_JAR_FILE 
from __future__ import absolute_import
import argparse
from contextlib import contextmanager

from past.builtins import unicode

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions, GoogleCloudOptions, StandardOptions, WorkerOptions
import jaydebeapi
import logging

###########################
# DB接続関連
###########################
CLASS_NAME = 'com.microsoft.sqlserver.jdbc.SQLServerDriver'
USER_NAME = 'DBユーザ名'
PASSWORD = 'DBユーザパスワード'
DATABASE = 'データベース名'
HOST = 'DBホスト名'
# JDBCのSQL Server用の書式です
JDBC_URL = 'jdbc:sqlserver://{}:1433;database={};encrypt=true;'.format(
    HOST, DATABASE)
SQL_QUERY = 'SELECT TOP(100) * FROM dbo.TableA'
# ローカルPCにあるJDBCドライバのJARファイルを指定
JDBC_JAR_FILE = './mssql-jdbc-7.4.1.jre8.jar'

###########################
# Dataflow関連
###########################
JOB_TEMPLATE_NAME = 'azuredatabasejdbc'
REGION = 'asia-northeast1'
VPC_NAME = 'GCPのVPC名'
SUBNET = 'GCPのサブネット名'
# ローカルPCに準備しておいたrequirements.txtを指定
REQUIREMENTS_FILE = './requirements.txt'



def logging_row(row):
    logging.info(row)


def parse_jdbc_entry(table_data):
    for r in table_data:
        yield [c.value if hasattr(c, 'value') else c for c in r]


@contextmanager
def open_db_connection(class_name, jdbc_url, user, password):
    conn = jaydebeapi.connect(class_name,
                              jdbc_url,
                              driver_args=[user, password])
    try:
        yield conn
    finally:
        conn.close()


if __name__ == '__main__':
    import sys
    parser = argparse.ArgumentParser()
    parser.add_argument('--project')
    parser.add_argument('--region')
    parser.add_argument('--base-bucket')
    known_args, _ = parser.parse_known_args(sys.argv)

    options = PipelineOptions()
    options.view_as(StandardOptions).runner = 'DataflowRunner'

    worker_options = options.view_as(WorkerOptions)

    worker_options.dataflow_worker_jar = JDBC_JAR_FILE 
    worker_options.network = VPC_NAME
    # サブネットはregions/REGION/subnetworks/SUBNETWORK の形式で指定
    worker_options.subnetwork = 'regions/{}/subnetworks/{}'.format(
        REGION, SUBNET)

    setup_option = options.view_as(SetupOptions)
    setup_option.requirements_file = REQUIREMENTS_FILE

    gcp_options = options.view_as(GoogleCloudOptions)
    gcp_options.project = known_args.project
    gcp_options.region = known_args.region
    gcp_options.template_location = 'gs://{}/dataflow/templates/{}'.format(
        known_args.base_bucket, JOB_TEMPLATE_NAME)
    gcp_options.temp_location = 'gs://{}/temp/{}/'.format(
        known_args.base_bucket, JOB_TEMPLATE_NAME)
    gcp_options.staging_location = 'gs://{}/dataflow/staging/{}/'.format(
        known_args.base_bucket, JOB_TEMPLATE_NAME)

    with open_db_connection(CLASS_NAME, JDBC_URL, USER_NAME, PASSWORD) as conn:
        cur = conn.cursor()
        cur.execute(SQL_QUERY)
        p = beam.Pipeline(options=options)
        (p
         | 'Select Table' >> beam.Create(parse_jdbc_entry(cur.fetchall()))
         | 'Logging Rows' >> beam.Map(logging_row))
        result = p.run()
        result.wait_until_finish()

作業用GCSバケット作成

作業用のGCSバケットを作成します。

$ gsutil mb -l asia-northeast1 gs://my-df-bucket-1220

Dataflowテンプレート登録

Pythonスクリプトに引数を渡していくのですが、JayDeBeApiが内部でJDBCのJarを探せるようにCLASSPATHを設定してから実行します。

$ export CLASSPATH=./mssql-jdbc-7.4.1.jre8.jar
$ python -m azure_database --project your-project --region asia-northeast1 --base-bucket my-df-bucket-1220

テンプレートが登録されると、指定したJDBCのJARファイルは、 staging_location として設定したGCSの場所に dataflow-worker.jar として纏められて配置されます。

f:id:yomon8:20191220101858p:plain

Dataflowテンプレートの実行

テンプレートからジョブを実行で、先ほど登録したテンプレートを実行してみます。

f:id:yomon8:20191220102222p:plain

以下の定数で設定してあったSELECT文の実行結果が表示されました。

SQL_QUERY = 'SELECT TOP(100) * FROM dbo.TableA'

f:id:yomon8:20191220102825p:plain

ParDoで使う

ParDoで使って、SQLをパラメータ化したい場合などは以下のように書くことも可能です。

from apache_beam.options.value_provider import check_accessible


class SqlDatabaseOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--sql_query',
            type=str,
            help='SELECT statement for Azure SQL Database',
        )


class ReadJdbcSourceDoFn(beam.DoFn):
    def __init__(self, url, user, password, sql):
        self._url = url
        self._user = user
        self._password = password
        self._sql = sql

    @contextmanager
    def _open_sqlserver_connection(self, url, user, password):
        import os
        import jaydebeapi
        # Dataflow V2の場合はCLASSPATHが '/var/opt/google/staged/dataflow-worker.jar' となります
        os.environ['CLASSPATH'] = '/var/opt/google/dataflow/dataflow-worker.jar'
        class_name = 'com.microsoft.sqlserver.jdbc.SQLServerDriver'
        conn = jaydebeapi.connect(
            class_name,
            url,
            driver_args=[user, password],
        )
        try:
            yield conn
        finally:
            if conn != None:
                conn.close()

    @check_accessible(['_sql'])
    def process(self, element):
        with self._open_sqlserver_connection(self._url, self._user, self._password) as conn:
            cur = conn.cursor()
            cur.execute(self._sql.get())
            while True:
                row = cur.fetchone()
                if row == None:
                    break
                yield [col.value if hasattr(col, 'value') else col for col in row]

ただ、この場合、worker側にJavaをインストールしないといけないので、--setup_file オプションに以下を参考に setup.pyを渡してあげる必要があります。

beam/setup.py at v2.16.0 · apache/beam · GitHub

例えば、CUSTOM_COMMANDSを以下のようにすると、workerにJavaをインストールすることができます。

CUSTOM_COMMANDS = [
    ['sudo', 'apt-get', 'update'],
    ['sudo', 'apt-get', 'install', '--assume-yes', 'openjdk-8-jre'],
]
2020/10/02追記

最新のBeam 2.24.0 を使う場合はOSイメージが異なるので、Java 11を利用します。

CUSTOM_COMMANDS = [
    ["sudo", "mkdir", "-p", "/usr/share/man/man1"],
    ["sudo", "apt", "update"],
    ["sudo", "apt", "install", "-y", "openjdk-11-jre"],
]

yomon.hatenablog.com