AirflowにはAzureのパッケージが存在しています。
pip install 'apache-airflow[azure]'
その辺りを使って、Azure Blob Storageへオブジェクトがアップロードしたことを検知して、ワークフローを流したいと思いました。
モジュールとしては以下のモジュールが該当するSensorモジュールになります。
https://github.com/apache/airflow/blob/1.10.4/airflow/contrib/sensors/wasb_sensor.py
WasbBlobSensor
と WasbPrefixSensor
がそれにあたり、ドキュメント上も以下などに記載があるのですが、結構わかりにくかったのでメモしておきます。
最新番のAzureのSDKでは動かない
いきなりのつっかかりポイントですが、現時点では最新版のAzure SDKでは上記のモジュールは動きません。
エラーとGithubの変更履歴追って、ここまで遡ってやっと動きました。Azure の SDKはかなり進化早い気がするのと、メジャーバージョンも頻繁に上がるイメージがあります。何より情報が少ない。
最新版では動かないことが、それなりに多いイメージがあります。
azure-storage-blob == 1.4.0
引数を確認
WasbBlobSensor
と WasbPrefixSensor
のシグネチャ見てみます。
class WasbBlobSensor(BaseSensorOperator): """ Waits for a blob to arrive on Azure Blob Storage. :param container_name: Name of the container. :type container_name: str :param blob_name: Name of the blob. :type blob_name: str :param wasb_conn_id: Reference to the wasb connection. :type wasb_conn_id: str :param check_options: Optional keyword arguments that `WasbHook.check_for_blob()` takes. :type check_options: dict """
class WasbPrefixSensor(BaseSensorOperator): """ Waits for blobs matching a prefix to arrive on Azure Blob Storage. :param container_name: Name of the container. :type container_name: str :param prefix: Prefix of the blob. :type prefix: str :param wasb_conn_id: Reference to the wasb connection. :type wasb_conn_id: str :param check_options: Optional keyword arguments that `WasbHook.check_for_prefix()` takes. :type check_options: dict """
container_name
や blob_name
はAzureのBlobの設定そのままですし、 prefix
もBlobのPrefixということでわかりやすいのですが、wasb_conn_id
だけ迷うかもしれないので説明しておきます。
ここにはAirflowのコネクションのIDを入れます。
Airflowのコネクションは以下から設定できます。
設定内容はストレージアカウントの接続文字列から取得します。
DefaultEndpointsProtocol=https;AccountName=youraccountname;AccountKey=xxxxxx==;EndpointSuffix=core.windows.net
AccountName
を login
に、AccountKey
を password
に読み替えて以下のように設定します。
ここで設定されたコネクションのID(ここではmy-azure-blob)を wasb_conn_id
に指定します。
実行してみる
from airflow import DAG from airflow.contrib.sensors.wasb_sensor import WasbBlobSensor,WasbPrefixSensor from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time()) default_args = { 'owner': 'yusuke.otomo', 'start_date': yesterday, } dag = DAG('try_azure_blob_sensor', default_args=default_args) t1 = WasbBlobSensor( task_id='sensor_blob', container_name='test-container', blob_name='my_file.csv', wasb_conn_id='my-azure-blob', dag=dag) t2 = WasbPrefixSensor( task_id='sensor_blob_prefix', container_name='test-container', prefix='myprefix_', wasb_conn_id='my-azure-blob', dag=dag) t3 = BashOperator( task_id='check', bash_command='echo "{{ params.message }}"', params={'message': 'Blob Objects, Found!'}, dag=dag) t3.set_upstream([t1,t2])
最初はWasbBlobSensorで定義したSensorも、WasbPrefixSensorで定義したSensorも、実行状態の黄緑色です。内部的にはポーリングしているようです。
AzureのBlobにWasbBlobSensorで指定した名前でファイルをアップロードしてみます。
WasbBlobSensorのタスクが終わりました。
今度はWasbPrefixSensorで定義したPrefixに合う名前のファイルをアップロードします。
両方の前提が終わり、後続タスクが実行されました。