Apache AirflowでAzure BLOB Storageのオブジェクトを監視してワークフローをトリガする

AirflowにはAzureのパッケージが存在しています。

pip install 'apache-airflow[azure]'

その辺りを使って、Azure Blob Storageへオブジェクトがアップロードしたことを検知して、ワークフローを流したいと思いました。

モジュールとしては以下のモジュールが該当するSensorモジュールになります。

https://github.com/apache/airflow/blob/1.10.4/airflow/contrib/sensors/wasb_sensor.py

WasbBlobSensorWasbPrefixSensor がそれにあたり、ドキュメント上も以下などに記載があるのですが、結構わかりにくかったのでメモしておきます。

airflow.apache.org

最新番のAzureのSDKでは動かない

いきなりのつっかかりポイントですが、現時点では最新版のAzure SDKでは上記のモジュールは動きません。

エラーとGithubの変更履歴追って、ここまで遡ってやっと動きました。Azure の SDKはかなり進化早い気がするのと、メジャーバージョンも頻繁に上がるイメージがあります。何より情報が少ない。

最新版では動かないことが、それなりに多いイメージがあります。

azure-storage-blob == 1.4.0

引数を確認

WasbBlobSensorWasbPrefixSensor のシグネチャ見てみます。

class WasbBlobSensor(BaseSensorOperator):
    """
    Waits for a blob to arrive on Azure Blob Storage.
    :param container_name: Name of the container.
    :type container_name: str
    :param blob_name: Name of the blob.
    :type blob_name: str
    :param wasb_conn_id: Reference to the wasb connection.
    :type wasb_conn_id: str
    :param check_options: Optional keyword arguments that
        `WasbHook.check_for_blob()` takes.
    :type check_options: dict
    """
class WasbPrefixSensor(BaseSensorOperator):
    """
    Waits for blobs matching a prefix to arrive on Azure Blob Storage.
    :param container_name: Name of the container.
    :type container_name: str
    :param prefix: Prefix of the blob.
    :type prefix: str
    :param wasb_conn_id: Reference to the wasb connection.
    :type wasb_conn_id: str
    :param check_options: Optional keyword arguments that
        `WasbHook.check_for_prefix()` takes.
    :type check_options: dict
    """

container_nameblob_name はAzureのBlobの設定そのままですし、 prefix もBlobのPrefixということでわかりやすいのですが、wasb_conn_id だけ迷うかもしれないので説明しておきます。

ここにはAirflowのコネクションのIDを入れます。

Airflowのコネクションは以下から設定できます。

f:id:yomon8:20191201234228p:plain

設定内容はストレージアカウントの接続文字列から取得します。

DefaultEndpointsProtocol=https;AccountName=youraccountname;AccountKey=xxxxxx==;EndpointSuffix=core.windows.net

AccountNamelogin に、AccountKeypassword に読み替えて以下のように設定します。

f:id:yomon8:20191201234531p:plain

ここで設定されたコネクションのID(ここではmy-azure-blob)を wasb_conn_id に指定します。 f:id:yomon8:20191201234704p:plain

実行してみる

from airflow import DAG
from airflow.contrib.sensors.wasb_sensor import WasbBlobSensor,WasbPrefixSensor
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

yesterday = datetime.combine(datetime.today() - timedelta(1),
                                  datetime.min.time())

default_args = {
    'owner': 'yusuke.otomo',
    'start_date': yesterday,
}

dag = DAG('try_azure_blob_sensor', default_args=default_args)


t1 = WasbBlobSensor(
    task_id='sensor_blob',
    container_name='test-container',
    blob_name='my_file.csv',
    wasb_conn_id='my-azure-blob',
    dag=dag)

t2 = WasbPrefixSensor(
    task_id='sensor_blob_prefix',
    container_name='test-container',
    prefix='myprefix_',
    wasb_conn_id='my-azure-blob',
    dag=dag)

t3 = BashOperator(
    task_id='check',
    bash_command='echo "{{ params.message }}"',
    params={'message': 'Blob Objects, Found!'},
    dag=dag)

t3.set_upstream([t1,t2])

最初はWasbBlobSensorで定義したSensorも、WasbPrefixSensorで定義したSensorも、実行状態の黄緑色です。内部的にはポーリングしているようです。

f:id:yomon8:20191202000346p:plain

AzureのBlobにWasbBlobSensorで指定した名前でファイルをアップロードしてみます。

f:id:yomon8:20191202000452p:plain

WasbBlobSensorのタスクが終わりました。

f:id:yomon8:20191202000544p:plain

今度はWasbPrefixSensorで定義したPrefixに合う名前のファイルをアップロードします。 f:id:yomon8:20191202000624p:plain

両方の前提が終わり、後続タスクが実行されました。 f:id:yomon8:20191202000657p:plain