この記事の続きです。
以下にもある通り、今書いている時点ではApache BeamのPython SDKはビルトインでJDBC対応していません。
PythonでJDBCドライバ使いたかったのはDataflowのPython SDK使ってもJDBC接続使いたかったからです。
上記の記事でJDBCをPythonから使えるところは確認できているので、今度はDataflowにテンプレート登録してみます。
Pythonコード準備
requirements.txt
を準備します。
※ 記事書いている時点のJayDeBeApiのPyPi上のバージョンだとJPype1==0.7では動かないが修正される模様
JPype1==0.6.3 JayDeBeApi
最初の参照記事に沿って、 mssql-jdbc-7.4.1.jre8.jar
も準備しておきます。
最初に参照した記事と同じくSQL Serverに接続するスクリプトです。
ポイントは以下のオプションでローカルPC上のJARを指定すると、Dataflowのテンプレートに取り込んでくれることです。
worker_options.dataflow_worker_jar = JDBC_JAR_FILE
from __future__ import absolute_import import argparse from contextlib import contextmanager from past.builtins import unicode import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions, GoogleCloudOptions, StandardOptions, WorkerOptions import jaydebeapi import logging ########################### # DB接続関連 ########################### CLASS_NAME = 'com.microsoft.sqlserver.jdbc.SQLServerDriver' USER_NAME = 'DBユーザ名' PASSWORD = 'DBユーザパスワード' DATABASE = 'データベース名' HOST = 'DBホスト名' # JDBCのSQL Server用の書式です JDBC_URL = 'jdbc:sqlserver://{}:1433;database={};encrypt=true;'.format( HOST, DATABASE) SQL_QUERY = 'SELECT TOP(100) * FROM dbo.TableA' # ローカルPCにあるJDBCドライバのJARファイルを指定 JDBC_JAR_FILE = './mssql-jdbc-7.4.1.jre8.jar' ########################### # Dataflow関連 ########################### JOB_TEMPLATE_NAME = 'azuredatabasejdbc' REGION = 'asia-northeast1' VPC_NAME = 'GCPのVPC名' SUBNET = 'GCPのサブネット名' # ローカルPCに準備しておいたrequirements.txtを指定 REQUIREMENTS_FILE = './requirements.txt' def logging_row(row): logging.info(row) def parse_jdbc_entry(table_data): for r in table_data: yield [c.value if hasattr(c, 'value') else c for c in r] @contextmanager def open_db_connection(class_name, jdbc_url, user, password): conn = jaydebeapi.connect(class_name, jdbc_url, driver_args=[user, password]) try: yield conn finally: conn.close() if __name__ == '__main__': import sys parser = argparse.ArgumentParser() parser.add_argument('--project') parser.add_argument('--region') parser.add_argument('--base-bucket') known_args, _ = parser.parse_known_args(sys.argv) options = PipelineOptions() options.view_as(StandardOptions).runner = 'DataflowRunner' worker_options = options.view_as(WorkerOptions) worker_options.dataflow_worker_jar = JDBC_JAR_FILE worker_options.network = VPC_NAME # サブネットはregions/REGION/subnetworks/SUBNETWORK の形式で指定 worker_options.subnetwork = 'regions/{}/subnetworks/{}'.format( REGION, SUBNET) setup_option = options.view_as(SetupOptions) setup_option.requirements_file = REQUIREMENTS_FILE gcp_options = options.view_as(GoogleCloudOptions) gcp_options.project = known_args.project gcp_options.region = known_args.region gcp_options.template_location = 'gs://{}/dataflow/templates/{}'.format( known_args.base_bucket, JOB_TEMPLATE_NAME) gcp_options.temp_location = 'gs://{}/temp/{}/'.format( known_args.base_bucket, JOB_TEMPLATE_NAME) gcp_options.staging_location = 'gs://{}/dataflow/staging/{}/'.format( known_args.base_bucket, JOB_TEMPLATE_NAME) with open_db_connection(CLASS_NAME, JDBC_URL, USER_NAME, PASSWORD) as conn: cur = conn.cursor() cur.execute(SQL_QUERY) p = beam.Pipeline(options=options) (p | 'Select Table' >> beam.Create(parse_jdbc_entry(cur.fetchall())) | 'Logging Rows' >> beam.Map(logging_row)) result = p.run() result.wait_until_finish()
作業用GCSバケット作成
作業用のGCSバケットを作成します。
$ gsutil mb -l asia-northeast1 gs://my-df-bucket-1220
Dataflowテンプレート登録
Pythonスクリプトに引数を渡していくのですが、JayDeBeApiが内部でJDBCのJarを探せるようにCLASSPATHを設定してから実行します。
$ export CLASSPATH=./mssql-jdbc-7.4.1.jre8.jar $ python -m azure_database --project your-project --region asia-northeast1 --base-bucket my-df-bucket-1220
テンプレートが登録されると、指定したJDBCのJARファイルは、 staging_location
として設定したGCSの場所に dataflow-worker.jar
として纏められて配置されます。
Dataflowテンプレートの実行
テンプレートからジョブを実行で、先ほど登録したテンプレートを実行してみます。
以下の定数で設定してあったSELECT文の実行結果が表示されました。
SQL_QUERY = 'SELECT TOP(100) * FROM dbo.TableA'
ParDoで使う
ParDoで使って、SQLをパラメータ化したい場合などは以下のように書くことも可能です。
from apache_beam.options.value_provider import check_accessible class SqlDatabaseOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_value_provider_argument( '--sql_query', type=str, help='SELECT statement for Azure SQL Database', ) class ReadJdbcSourceDoFn(beam.DoFn): def __init__(self, url, user, password, sql): self._url = url self._user = user self._password = password self._sql = sql @contextmanager def _open_sqlserver_connection(self, url, user, password): import os import jaydebeapi # Dataflow V2の場合はCLASSPATHが '/var/opt/google/staged/dataflow-worker.jar' となります os.environ['CLASSPATH'] = '/var/opt/google/dataflow/dataflow-worker.jar' class_name = 'com.microsoft.sqlserver.jdbc.SQLServerDriver' conn = jaydebeapi.connect( class_name, url, driver_args=[user, password], ) try: yield conn finally: if conn != None: conn.close() @check_accessible(['_sql']) def process(self, element): with self._open_sqlserver_connection(self._url, self._user, self._password) as conn: cur = conn.cursor() cur.execute(self._sql.get()) while True: row = cur.fetchone() if row == None: break yield [col.value if hasattr(col, 'value') else col for col in row]
ただ、この場合、worker側にJavaをインストールしないといけないので、--setup_file
オプションに以下を参考に setup.pyを渡してあげる必要があります。
beam/setup.py at v2.16.0 · apache/beam · GitHub
例えば、CUSTOM_COMMANDSを以下のようにすると、workerにJavaをインストールすることができます。
CUSTOM_COMMANDS = [ ['sudo', 'apt-get', 'update'], ['sudo', 'apt-get', 'install', '--assume-yes', 'openjdk-8-jre'], ]
2020/10/02追記
最新のBeam 2.24.0
を使う場合はOSイメージが異なるので、Java 11を利用します。
CUSTOM_COMMANDS = [ ["sudo", "mkdir", "-p", "/usr/share/man/man1"], ["sudo", "apt", "update"], ["sudo", "apt", "install", "-y", "openjdk-11-jre"], ]