追記
今は INFORMATION_SCHEMA
を使うことで同じ情報が簡単に取得できます。
SELECT * FROM `region-asia-northeast1`.INFORMATION_SCHEMA.JOBS_TIMELINE_BY_PROJECT
クエリの履歴情報等を取得したい時はBigQueryのジョブ履歴から情報拾えます。課金されるバイト数などもあり、後からレポートしたいデータも入っています。
bqコマンドなら bq ls -j -a
で一覧して bq --format=prettyjson show -j <jobid>
すればJSON形式で取得できますが、どうもこちらの情報は、システムテーブルなどで持っているわけでは無く、APIから取得が必要そうです。
なので、BigQueryに取り込む方法を作ってみました。
Load処理
from datetime import datetime from collections import namedtuple import pyarrow as pa import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions from google.cloud import bigquery # 変数 project = 'your-gcp-project-id' # 2020/01/29のジョブ履歴を取得する設定 min_creation_time = datetime(2020, 1, 29) max_creation_time = datetime(2020, 1, 30) # Parquet出力先のGCSのパス output_file_prefix = 'gs://otomo-bq-history/data/{}/bq_history_'.format(min_creation_time.strftime('%Y%m%d')) # -> gs://otomo-bq-history/data/20200129/bq-history # Parquet出力時のスキーマ定義 Field = namedtuple('Field', 'name type') schema = [ Field('project', pa.string()), Field('location', pa.string()), Field('job_id', pa.string()), Field('job_type', pa.string()), Field('state', pa.string()), Field('user_email', pa.string()), Field('created', pa.date64()), Field('started', pa.date64()), Field('ended', pa.date64()), Field('total_bytes_billed', pa.int64()), Field('total_bytes_processed', pa.int64()), Field('query', pa.string()), Field('use_query_cache', pa.bool_()), ] columns = [f.name for f in schema] pa_schema = [(f.name, f.type) for f in schema] # BigQueryの履歴情報を取得 class BqQueryJobLoadDoFn(beam.DoFn): def __init__(self, project, min_creation_time, max_creation_time, columns, all_users=True, max_results=100): self.project = project self.min_creation_time = min_creation_time self.max_creation_time = max_creation_time self.columns = columns self.all_users = all_users self.max_results = max_results def process(self, element): bqclient = bigquery.Client(project=self.project) jobs = bqclient.list_jobs( min_creation_time=self.min_creation_time, max_creation_time=self.max_creation_time, all_users=self.all_users, max_results=self.max_results, ) for j in jobs: if not isinstance(j, bigquery.job.QueryJob): continue entries = {} for c in self.columns: if hasattr(j, c): entries.update({c: getattr(j, c)}) else: entries.update({c: None}) yield entries # Apache Beam処理の定義 p = beam.Pipeline() loader = BqQueryJobLoadDoFn( project=project, columns=columns, min_creation_time=min_creation_time, max_creation_time=max_creation_time, ) (p | 'Start' >> beam.Create([None]) | 'Get BigQuery Job History' >> beam.ParDo(loader) | 'Write to Parquet' >> beam.io.WriteToParquet( file_path_prefix=output_file_prefix, schema=pa.schema(pa_schema), file_name_suffix='.parquet', )) result = p.run() result.wait_until_finish()
実行すると、Parquetファイルが指定したディレクトリに出力されました。
BigQueryでデータセットを作成
$ bq mk --location asia-northeast1 bq_history Dataset 'your-project:bq_history' successfully created.
Parquetファイルのロード
日付毎の分割テーブルとして取り込みます。
bq load --source_format=PARQUET \ --time_partitioning_type=DAY \ 'bq_history.query_history$20200129' \ "gs://otomo-bq-history/data/20200129/bq-history*.parquet"
BigQueryでクエリ
BigQueryでSELECTできるようになりました。
Dataflowで動かす場合には権限に注意してください
参考URL
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/list
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job