BigQueryのストリーミング挿入は注意点が多い!
千里の道(データ利活用)も一歩(データのロード)から
皆様、あけましておめでとうございます!
新年早々、タイトルを去年の流行語にしているのこそなぁぜなぁぜという感じですが今年もよろしくお願いします。
今年もやはり目が離せないのは生成AI関連かと思いますが、もはやトレンドはグラウンディング、自社データとの融合、ということを考えるとそのために使えるデータ資産の管理が重要だと思います。
頑強なデータ基盤があってこそのデータ利活用です。
ということで今年もやはりお世話になるのがBigQueryです。
昨年は ML.GENERATE_TEXT
のような推論関数で生成AIも活用できるようにもなりました。今年もどんなアップデートがあるのか目が離せません。
そんなBigQueryによるデータ基盤の構築ですが、どんな発展的なことをするにもやはり最初の一歩は「データのロード」です。
今回はその中でもPython等のBigQueryクライアントでLLMとかに頼りながらコードを書いていると簡単に提案されてしまう insert_rows_json
、ひいてはBigQueryのストリーミング挿入の罠について解説します。
insert_rows_jsonとは?
insert_rows_json
は、DictのListとしてインサート対象のデータが変数に格納されているとき、それをそのまま放り込んでくれるとても便利なメソッドとしてPythonの google-cloud-bigquery
のライブラリに入っています。
サンプルは BigQueryのドキュメントにあるサンプル「ストリーミング挿入」のページで確認出来ます。
テーブルのIDと、インサートしたいデータのリストを用意した上で、
errors = client.insert_rows_json(table_id, rows_to_insert)
のようにして挿入できるということです。これは単純明快ですね。
(返り値がエラー内容のリストになっているのが少しトリッキーな気もしますが)
insert_rows_jsonの罠
前置きが長くなりましたが、ここからが問題です。
よし、じゃあ使ってみよう…ということで、こんなコードを書いたとします。
from google.cloud import bigquery # 挿入したいデータ rows_to_insert = [{"foo": 1, "bar": 2}] # 挿入先のテーブルのスキーマ schema = [ bigquery.SchemaField("foo", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("bar", "INTEGER", mode="REQUIRED") ] # 挿入先のテーブルのID table_id = "(テーブルID)" bigquery_client = bigquery.Client() table = bigquery.Table(table_id, schema=schema) # 一度消してからテーブルを作る bigquery_client.delete_table(table, not_found_ok=True) bigquery_client.create_table(table, exists_ok=True) # レコードを挿入する errors = bigquery_client.insert_rows_json(table_id, rows_to_insert)
よし、これで1レコードのテーブルが出来る!と思いきや…何度か実行していると…
404 POST https://bigquery.googleapis.com/bigquery/v2/projects/[Project ID]/datasets/abcd_crm/tables/VisitHistory__c/insertAll?prettyPrint=false: Table [Project Number]:[Dataset ID].[Table Name] not found.
あれ?上記のエラーが…
テーブルを作ったあとのはずなのに「404」が返ってくるのなぁぜなぁぜ?
となるわけです。BigQueryのプロの皆さんなら即答の問題かと思いますがどうでしょうか。
なぜ404が返ってくるのか
この答えはBigQueryのドキュメントの「ストリーミング挿入に関するトラブルシューティング」のページに載っています。
その中のストリーミング挿入のメタデータエラーの項目の中の「テーブルの作成/削除」のシナリオに
存在しないテーブルへのストリーミングによって、
notFound
レスポンスのバリエーションが返されます。レスポンスで作成されたテーブルは、後続のストリーミング挿入ですぐには認識されない可能性があります。同様に、テーブルを削除または、再作成すると、古いテーブルにストリーミング挿入が実質的に配信される期間が生じることがあります。このストリーミング挿入は新しいテーブルには存在しない可能性があります。
と書いてあります。
はい、これが丸々答えです。
テーブル削除・作成からの即時でのストリーミング挿入の相性が悪いことを覚えておくと良いと思います。
具体的な解決策
作成処理とデータ挿入を即時に行いたい場合の解決策としては、シンプルに load_table_from_file
を使うなどしましょう。
要するに、バッチ読み込みをしましょう。
その際は、データのバッチ読み込みのページが参考になります。
下記のように、 job.result()
を入れることでjobの完了まで待機できることもポイントです。
with open(file_path, "rb") as source_file: job = client.load_table_from_file(source_file, table_id, job_config=job_config) job.result()
まとめ
新年一発目はBigQueryでのデータのロードに関するハマりポイントの記事でした。
実は年末に一回コレにやられたのですが、意外とドンピシャな記事が見つからず…ドキュメントを探ったら「まさに!」な箇所があったのでまとめなおした次第です。
LLM、生成AIといった領域のほうが派手さはありますが、こういった地道な作業が良質なアウトプットを支えるものだと思います。
2024年もガバナンスの効いた頑強なデータ基盤を構築することは忘れずに、その上での活用を目指していきたいですね。