Cloud Composerでトラブルシュートしたので少しマニアックですが残しておきます。
事象
Airflowの BigQueryValueCheckOperator
を実行すると以下のエラーとなりました。404なのでBigQueryのジョブが見つからないみたいなエラーなのですが、実際に管理画面から見ると該当のIDのジョブは正常終了しています。
[2021-10-22 13:12:46,591] {taskinstance.py:1147} ERROR - <HttpError 404 when requesting https://bigquery.googleapis.com/bigquery/v2/projects/my-pj/queries/job_xxxxxxxxxxxxxx?alt=json returned "Not found: Job my-pj:job_xxxxxxxxxxxxxx"> Traceback (most recent call last) File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 984, in _run_raw_tas result = task_copy.execute(context=context File "/usr/local/lib/airflow/airflow/operators/check_operator.py", line 152, in execut records = self.get_db_hook().get_first(self.sql File "/usr/local/lib/airflow/airflow/hooks/dbapi_hook.py", line 142, in get_firs return cur.fetchone( File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 2039, in fetchon return self.next( File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 2058, in nex pageToken=self.page_token).execute(num_retries=self.num_retries) File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/_helpers.py", line 134, in positional_wrappe return wrapped(*args, **kwargs File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/http.py", line 898, in execut raise HttpError(resp, content, uri=self.uri googleapiclient.errors.HttpError: <HttpError 404 when requesting https://bigquery.googleapis.com/bigquery/v2/projects/my-pj/queries/job_xxxxxxxxxxxxxx?alt=json returned "Not found: Job my-pj:job_xxxxxxxxxxxxxx"
原因
内部で使われている BigQueryHook
への location
変数の連携が上手くいってないのが原因でした。
get_db_hook
で location
引数を指定してないので **kwargs等を経由しても渡せません。
class BigQueryValueCheckOperator(SQLValueCheckOperator): # 省略 @apply_defaults def __init__(self, sql, pass_value, tolerance=None, bigquery_conn_id='bigquery_default', use_legacy_sql=True, *args, **kwargs): # 省略 def get_db_hook(self): return BigQueryHook(bigquery_conn_id=self.bigquery_conn_id, use_legacy_sql=self.use_legacy_sql)
解決策
V2に上げるのが一つの方法です。そちらではコード読む限り location
引数が引き渡せる仕様になってそうです。(すみません試せてないです)
V1でなら以下のように BigQueryValueCheckOperator
を継承して、get_db_hook
をオーバーライドしてlocation
を引き渡せばとりあえず動かすこと可能です。
class MyBigQueryValueCheckOperator(BigQueryValueCheckOperator): def __init__( self, sql, pass_value, location, tolerance=None, bigquery_conn_id="bigquery_default", use_legacy_sql=True, *args, **kwargs ): super().__init__( sql=sql, pass_value=pass_value, tolerance=tolerance, bigquery_conn_id=bigquery_conn_id, use_legacy_sql=use_legacy_sql, *args, **kwargs ) self.location = location def get_db_hook(self): return BigQueryHook( bigquery_conn_id=self.bigquery_conn_id, use_legacy_sql=self.use_legacy_sql, location=self.location, )
参考URL
[jira] [Updated] (AIRFLOW-5249) BigQueryCheckOperator fails for datasets outside of 'US' region