AirflowのBigQueryValueCheckOperatorがHttpError 404でジョブが見つけられずに止まる

Cloud Composerでトラブルシュートしたので少しマニアックですが残しておきます。

事象

Airflowの BigQueryValueCheckOperator を実行すると以下のエラーとなりました。404なのでBigQueryのジョブが見つからないみたいなエラーなのですが、実際に管理画面から見ると該当のIDのジョブは正常終了しています。

[2021-10-22 13:12:46,591] {taskinstance.py:1147} ERROR - <HttpError 404 when requesting https://bigquery.googleapis.com/bigquery/v2/projects/my-pj/queries/job_xxxxxxxxxxxxxx?alt=json returned "Not found: Job my-pj:job_xxxxxxxxxxxxxx">
Traceback (most recent call last)
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 984, in _run_raw_tas
    result = task_copy.execute(context=context
  File "/usr/local/lib/airflow/airflow/operators/check_operator.py", line 152, in execut
    records = self.get_db_hook().get_first(self.sql
  File "/usr/local/lib/airflow/airflow/hooks/dbapi_hook.py", line 142, in get_firs
    return cur.fetchone(
  File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 2039, in fetchon
    return self.next(
  File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 2058, in nex
    pageToken=self.page_token).execute(num_retries=self.num_retries)
  File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/_helpers.py", line 134, in positional_wrappe
    return wrapped(*args, **kwargs
  File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/http.py", line 898, in execut
    raise HttpError(resp, content, uri=self.uri
googleapiclient.errors.HttpError: <HttpError 404 when requesting https://bigquery.googleapis.com/bigquery/v2/projects/my-pj/queries/job_xxxxxxxxxxxxxx?alt=json returned "Not found: Job my-pj:job_xxxxxxxxxxxxxx"

原因

内部で使われている BigQueryHook への location 変数の連携が上手くいってないのが原因でした。

get_db_hooklocation 引数を指定してないので **kwargs等を経由しても渡せません。

class BigQueryValueCheckOperator(SQLValueCheckOperator):
# 省略

    @apply_defaults
    def __init__(self, sql,
                 pass_value,
                 tolerance=None,
                 bigquery_conn_id='bigquery_default',
                 use_legacy_sql=True,
                 *args, **kwargs):

# 省略

    def get_db_hook(self):
        return BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                            use_legacy_sql=self.use_legacy_sql)

https://github.com/apache/airflow/blob/1.10.15/airflow/contrib/operators/bigquery_check_operator.py#L82-L111

解決策

V2に上げるのが一つの方法です。そちらではコード読む限り location 引数が引き渡せる仕様になってそうです。(すみません試せてないです)

https://github.com/apache/airflow/blob/2.2.0/airflow/providers/google/cloud/operators/bigquery.py#L197

V1でなら以下のように BigQueryValueCheckOperator を継承して、get_db_hook をオーバーライドしてlocation を引き渡せばとりあえず動かすこと可能です。

class MyBigQueryValueCheckOperator(BigQueryValueCheckOperator):
    def __init__(
        self,
        sql,
        pass_value,
        location,
        tolerance=None,
        bigquery_conn_id="bigquery_default",
        use_legacy_sql=True,
        *args,
        **kwargs
    ):
        super().__init__(
            sql=sql,
            pass_value=pass_value,
            tolerance=tolerance,
            bigquery_conn_id=bigquery_conn_id,
            use_legacy_sql=use_legacy_sql,
            *args,
            **kwargs
        )
        self.location = location

    def get_db_hook(self):
        return BigQueryHook(
            bigquery_conn_id=self.bigquery_conn_id,
            use_legacy_sql=self.use_legacy_sql,
            location=self.location,
        )

参考URL

[jira] [Updated] (AIRFLOW-5249) BigQueryCheckOperator fails for datasets outside of 'US' region