BigQueryのクエリ実行に関するメモ

BigQueryのクエリ実行に関する公式ドキュメントを読んでいて、気になった箇所を試してみたメモです。

バッチクエリの実行

SQLクエリをfire-and-forgetで実行できる機能。

BigQuery はユーザーに代わって各バッチクエリをキューに格納し、アイドル状態のリソースが BigQuery 共有リソースプールで使用可能になり次第、クエリを開始します。このことは、通常、数分以内に発生します。24 時間以内にクエリが開始されなかった場合、BigQuery はジョブの優先度をインタラクティブに変更します。

バッチクエリの実行

main.py

"""
Python 3.8.10
google-cloud-bigquery 2.30.1
"""
from google.cloud import bigquery

# https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.client.Client.html#google-cloud-bigquery-client-client
client = bigquery.Client()

# バッチクエリ用の定義
# https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.job.QueryJobConfig.html#google.cloud.bigquery.job.QueryJobConfig
job_config = bigquery.QueryJobConfig(
    priority=bigquery.job.QueryPriority.BATCH
)

# 実行クエリを定義
sql = """
    SELECT upper("big") as a;
"""

# バッチクエリの実行
# https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.client.Client.html#google.cloud.bigquery.client.Client.query
query_job = client.query(query=sql, job_config=job_config)

# 実行完了するとTrueを返す
# https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.job.QueryJob.html#google.cloud.bigquery.job.QueryJob.done
while not query_job.done():
    print("実行中")

# クエリ結果のイテレーターで返してくれる
# https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.job.QueryJob.html#google.cloud.bigquery.job.QueryJob.result
rows = query_job.result()
for row in rows:
    print(row.a)
$ python main.py 
BIG

一時テーブルの利用

BigQueryでも一時テーブルを利用でき、他のデータベース同様に、そのセッションの間だけ有効なテーブルを作成できます。

一時テーブル

実行例。一時テーブル作成時に、データセットの指定は不要です。一時テーブルであることを明示的に示す場合、_SESSION修飾子を利用します。なくてもアクセスできます。

CREATE TEMP TABLE temp(
    id int,
    name string
);

INSERT INTO _SESSION.temp (id, name) VALUES (1, "さくらみこ");

SELECT * FROM _SESSION.temp;

CREATE TABLE ステートメント

セッションが閉じられた後も、最長24時間、一時テーブルは別の名前を割り当てられて保存されているそうです。下記、WEBコンソール上の履歴画面より、作成した一時テーブルの情報を確認できます。

f:id:goodbyegangster:20211117015844p:plain

検索対象テーブルをワイルドカードで指定

FROM句で指定するテーブルに、ワイルドカードを利用することができます。

ワイルドカード テーブルを使用した複数テーブルに対するクエリ

テーブルの準備。

CREATE TABLE sample.a_100 AS SELECT "100" AS a;
CREATE TABLE sample.a_150 AS SELECT "150" AS a;
CREATE TABLE sample.a_200 AS SELECT "200" AS a;
CREATE TABLE sample.a_201 AS SELECT "201" AS a;
CREATE TABLE sample.a_202 AS SELECT "202" AS a;

ワイルドカードを利用したいテーブル名は、バッククォートで囲む必要があります。

ワイルドカードを含むテーブル名をバッククォートで囲む

SELECT a FROM `sample.a_1*` ORDER BY a;

結果。

[
  {
    "a": "100"
  },
  {
    "a": "150"
  }
]

_TABLE_SUFFIXを利用して、対象のテーブルをさらに絞り込めます。

選択したテーブルの _TABLE_SUFFIX によるフィルタリング

-- ワイルドカードで a_2?? まで限定したので、_TABLE_SUFFIXで 01 と指定してあげる
SELECT a FROM `sample.a_2*` WHERE _TABLE_SUFFIX = "01";

結果。

[
  {
    "a": "201"
  }
]

現時点での主な制限事項。

  • ビューや外部テーブルでは利用できない
  • リザルトキャッシュは利用できない
  • DML言語のオペレーションでは利用できない
  • customer-managed encryption keys(CMEK)が有効なテーブルでは利用できない

制限事項

Merge文を利用してUPSERTな処理

Merge文を利用することで、UPSERT的な処理を実行できるらしいです。

MERGE ステートメント

テーブルを2つ用意します。

$ bq query 'SELECT * FROM sample.source ORDER BY id;'
Waiting on bqjob_r78907ca1d56fd389_0000017d0a7abee8_1 ... (0s) Current status: DONE
+----+------------+
| id |    name    |
+----+------------+
|  1 | さくらみこ |
|  2 | 兎田ぺこら |
+----+------------+
$ bq query 'SELECT * FROM sample.target ORDER BY id;'
Waiting on bqjob_r9472a1a909162f7_0000017d0a7b5e32_1 ... (0s) Current status: DONE
+----+--------------+
| id |     name     |
+----+--------------+
|  2 | 白上フブキ   |
|  3 | 星街すいせい |
+----+--------------+

マージします。

MERGE sample.target t
USING sample.source s
ON t.id = s.id
WHEN MATCHED THEN
    UPDATE SET name = s.name
WHEN NOT MATCHED THEN
    INSERT (id, name) VALUES (id, name);

確認。

$ bq query 'SELECT * FROM sample.target ORDER BY id;'
Waiting on bqjob_r53e7395dcfacd50d_0000017d0a8040bc_1 ... (0s) Current status: DONE
+----+--------------+
| id |     name     |
+----+--------------+
|  1 | さくらみこ   |
|  2 | 兎田ぺこら   |
|  3 | 星街すいせい |
+----+--------------+

updateやinsertのほかdeleteも使えたり、when句ではbooleanを返す検索条件式も併用できたりと、応用範囲が大きいです。

クエリ結果のデバック

いわゆるASSERT文を実行できます。

デバッグ ステートメント

評価されるクエリは、Booleanを返す必要があります。False、またはNULLを返した時に、エラーが発生します。

ASSERT (
    (SELECT MAX(n) FROM UNNEST([1, 2, 3, 4, 5, 6]) AS n ) > 10
) AS "Maximum value must bigger than 10.";

Maximum value must bigger than 10.

ASSERT
    EXISTS(
        SELECT n FROM UNNEST(GENERATE_ARRAY(1, 10, 2)) AS n
         WHERE n = 4
    )
AS "Column n must contain the value 4";

Column n must contain the value 4

ASSERT (
    SELECT NULLIF(0, 10) IS NULL
);

Assertion failed

クエリ結果を Cloud Storage へエクスポート

クエリ結果を Cloud Storage へエクスポートできます。

EXPORT DATA ステートメント

EXPORT DATA OPTIONS(
    compression="GZIP",
    field_delimiter=",",
    format="CSV",
    header=true,
    overwrite=false,
    uri="gs://pekomikodaisensou/test/pakomiko_*.gz") AS
SELECT GENERATE_UUID() as id, num FROM UNNEST(GENERATE_ARRAY(1, 10, 2)) AS num;

uriのオプションで指定するCloud Storageのパスは、単一のワイルドカード URIで指定する必要があります。

エクスポートされるデータが最大値の 1 GB を超えそうな場合は、単一のワイルドカード URI を使用します。データは、指定したパターンに基づいて複数のファイルに分割されます。エクスポートされたファイルのサイズは一定ではありません。

テーブルデータのエクスポート

クエリ結果のエクスポート処理は、bqコマンドや各種言語のクライアント・ライブラリでも実施できます。それらを使う場合、1GB以下のデータエクスポートであれば、Cloud Storageのパスをワイルドカードを用いず明示的に指定できるようです。SQL単体で実行の場合、1GB以下のデータエクスポートであっても、必ず複数ファイルが作成されています。