BigQueryの課金情報が含まれるクエリ履歴をテーブルにロードしてSQLで分析できるようにする

クエリの履歴情報等を取得したい時はBigQueryのジョブ履歴から情報拾えます。課金されるバイト数などもあり、後からレポートしたいデータも入っています。

f:id:yomon8:20200131102918p:plain

bqコマンドなら bq ls -j -a で一覧して bq --format=prettyjson show -j <jobid> すればJSON形式で取得できますが、どうもこちらの情報は、システムテーブルなどで持っているわけでは無く、APIから取得が必要そうです。

なので、BigQueryに取り込む方法を作ってみました。

Load処理

from datetime import datetime
from collections import namedtuple

import pyarrow as pa
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

from google.cloud import bigquery

# 変数
project = 'your-gcp-project-id'

# 2020/01/29のジョブ履歴を取得する設定
min_creation_time = datetime(2020, 1, 29) 
max_creation_time = datetime(2020, 1, 30)

# Parquet出力先のGCSのパス
output_file_prefix = 'gs://otomo-bq-history/data/{}/bq_history_'.format(min_creation_time.strftime('%Y%m%d'))  # -> gs://otomo-bq-history/data/20200129/bq-history

# Parquet出力時のスキーマ定義
Field = namedtuple('Field', 'name type')
schema = [
    Field('project', pa.string()),
    Field('location', pa.string()),
    Field('job_id', pa.string()),
    Field('job_type', pa.string()),
    Field('state', pa.string()),
    Field('user_email', pa.string()),
    Field('created', pa.date64()),
    Field('started', pa.date64()),
    Field('ended', pa.date64()),
    Field('total_bytes_billed', pa.int64()),
    Field('total_bytes_processed', pa.int64()),
    Field('query', pa.string()),
    Field('use_query_cache', pa.bool_()),
]
columns = [f.name for f in schema]
pa_schema = [(f.name, f.type) for f in schema]


# BigQueryの履歴情報を取得
class BqQueryJobLoadDoFn(beam.DoFn):
    def __init__(self, project, min_creation_time, max_creation_time, columns, all_users=True, max_results=100):
        self.project = project
        self.min_creation_time = min_creation_time
        self.max_creation_time = max_creation_time
        self.columns = columns
        self.all_users = all_users
        self.max_results = max_results

    def process(self, element):
        bqclient = bigquery.Client(project=self.project)
        jobs = bqclient.list_jobs(
            min_creation_time=self.min_creation_time,
            max_creation_time=self.max_creation_time,
            all_users=self.all_users,
            max_results=self.max_results,
        )
        for j in jobs:
            if not isinstance(j, bigquery.job.QueryJob):
                continue
            entries = {}
            for c in self.columns:
                if hasattr(j, c):
                    entries.update({c: getattr(j, c)})
                else:
                    entries.update({c: None})
            yield entries

# Apache Beam処理の定義
p = beam.Pipeline()
loader = BqQueryJobLoadDoFn(
    project=project,
    columns=columns,
    min_creation_time=min_creation_time,
    max_creation_time=max_creation_time,
)

(p
 | 'Start' >> beam.Create([None])
 | 'Get BigQuery Job History' >> beam.ParDo(loader)
 | 'Write to Parquet' >> beam.io.WriteToParquet(
     file_path_prefix=output_file_prefix,
     schema=pa.schema(pa_schema),
     file_name_suffix='.parquet',
 ))

result = p.run()
result.wait_until_finish()

実行すると、Parquetファイルが指定したディレクトリに出力されました。

f:id:yomon8:20200131102243p:plain

BigQueryでデータセットを作成

$ bq mk --location asia-northeast1 bq_history
Dataset 'your-project:bq_history' successfully created.

Parquetファイルのロード

日付毎の分割テーブルとして取り込みます。

bq load --source_format=PARQUET \
  --time_partitioning_type=DAY \
  'bq_history.query_history$20200129' \
  "gs://otomo-bq-history/data/20200129/bq-history*.parquet"

BigQueryでクエリ

BigQueryでSELECTできるようになりました。 f:id:yomon8:20200130185025p:plain

Dataflowで動かす場合には権限に注意してください

yomon.hatenablog.com

参考URL

https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/list

https://cloud.google.com/bigquery/docs/reference/rest/v2/Job