BigQuery で使える dbt incremental strategy 完全ガイド
Data Platform チームの @kobori です。 先日 Data Contract の記事を出しましたが、現在も社内のプロダクトのデータと DWH の連携を進めております。 プロダクトの DB 上のデータに対して定めたインターフェースを介して DWH へデータを蓄積し DWH 上でデータの加工をした上で mart を構築していきます。
弊社ではデータ基盤の構築に dbt-core を採用していますが、dbt でプロダクトデータを DWH に連携する際は incremental model や snapshot が用いられるかと思います。 fact (transaction データのようなもの) の蓄積には incremental model を採用し、dimension (master データのようなもの) の蓄積に snapshot を採用する方がよいというのが現時点の私の考えです。
incremental model と snapshot の使い分けも議論の余地を残すところだとは思いつつ、本記事では incremental model の strategy に焦点を当て、そのユースケースについて考察します。 incremental model については @shuhei の記事 でも書かれていますのでご参照ください。
incremental strategy
dbt では、データの蓄積の方法として、複数の方法が用意されています。 これを incremental strategy と呼んでいるのですが、下記の 5 種類が用意されています 。
append
merge
delete+insert
insert_overwrite
microbatch
5 つの strategy が用意されているものの、すべての strategy が利用できるわけではありません。利用している DWH 製品によって使用できる strategy は異なっています。
例えば、弊社では BigQuery を用いているのですが、dbt-bigquery では merge
insert_overwrite
microbatch
の 3 種類しか用意されておりません。
また、delete+insert
と insert_overwrite
は相互排他的に定義されており、各 DWH 製品において、いずれか一方しか選択できません。
これら 5 つの strategy について、その実装を眺めつつ、ユースケースを考えていきたいと思います。
🔸 append
最もわかりやすく単純な strategy かと思います。
ターゲットのテーブルに対して追記をしていく strategy です。
get_incremental_append_sql
という macro で定義されています。
(macro については dbt の document を参照してください。)
dbt では {adaptor}_{macro_name}
という形式で、各 adaptor の macro 名が定義されているのですが、今回は default の default__get_incremental_append_sql
を追っていきます。
すると、get_insert_into_sql
という macro に行き着くのですが、実装を読むとおおよそ下記のような SQL が実行されそうであることがわかります。
insert into TARGET_TABLE (col1, col2, ...)
(
select col1, col2, ...
from SOURCE_TABLE
)
名前の通り単純に追記をしていくようです。 単純ではありますが、残念ながら dbt-bigquery では用いることはできません。 また、ターゲットに unique 制約が定義されている場合は注意が必要そうです。
🔸 merge
ターゲットに対して、更新または追記をしていく strategy になります。
get_incremental_merge_sql
に定義されています。
実装を覗くと、 default__get_merge_sql
にたどりつきます。
おおよそ下記のようなクエリが実行されそうです。
unique_key
の指定がある場合
merge into TARGET_TABLE as DBT_INTERNAL_DEST
using SOURCE_TABLE as DBT_INTERNAL_SOURCE
on
DBT_INTERNAL_DEST.unique_key1 = DBT_INTERNAL_SOURCE.unique_key1
and DBT_INTERNAL_DEST.unique_key2 = DBT_INTERNAL_SOURCE.unique_key2
...
{ and some conditions if exists }
when matched then update set
col1 = DBT_INTERNAL_SOURCE.col1,
col2 = DBT_INTERNAL_SOURCE.col2,
...
when not matched then insert
(col1, col2, ...)
values
(col1, col2, ...)
unique_key
の指定がない場合
merge into TARGET_TABLE as DBT_INTERNAL_DEST
using SOURCE_TABLE as DBT_INTERNAL_SOURCE
on FALSE { and some conditions if exists }
when not matched then insert
(col1, col2, ...)
values
(col1, col2, ...)
merge
文を使ったデータの追加・更新が行われます。
unique_key
で指定されているレコードがターゲットに既に存在する場合はデータが更新され、存在しない場合はレコードの追加が行われます。
unique_key
は複数指定可能のようです。変数名が単数形なので少し紛らわしいですね。
また、ドキュメントに記載のある通り、update 対象のカラムも merge_update_columns
や merge_exclude_columns
を使うことで update 対象の column の選択ができます。
BigQuery や PostgreSQL の merge
文ではレコードの削除も行うことができますが、 dbt の merge
strategy ではレコードの削除は行いません。
unique_key
の指定がない場合は on
句に FALSE
が指定されており、単純な insert が行われます。
dbt-bigquery では append
の定義がありませんが、merge
を使うことで append
を実現できそうです。
🔸 delete+insert
unique_key
を使用し、レコードの delete and insert を行うことでデータの更新・追加を行う方法です。
get_incremental_delete_insert_sql
に定義されており、default__get_delete_insert_merge_sql
に default の実装を見ることができます。
unique_key
の指定がある場合
delete from TARGET_TABLE
where (unique_key1, unique_key2, ...) in (
select distinct unique_key1, unique_key2, ...
from SOURCE_TABLE
) { and some conditions if exists }
;
insert into TARGET_TABLE (col1, col2, ...)
(
select col1, col2, ...
from SOURCE_TABLE
)
unique_key
の指定がない場合
insert into TARGET_TABLE (col1, col2, ...)
(
select col1, col2, ...
from SOURCE_TABLE
)
delete
文と insert
文により更新が定義されています。
レコードの delete
が行われるものの、削除の対象はあくまで SOURCE_TABLE
に unique key が存在しているものに限られるためレコードが減少することはなさそうです。
こちらも dbt-bigquery では実装されていませんが、 merge
により同等の処理が可能そうです。
🔸 insert_overwrite
partition ごとの入れ替えや table の replace が想定された strategy です。
get_incremental_insert_overwrite_sql
に定義されています。
default__get_insert_overwrite_merge_sql
から下記のような SQL が作成されそうです。
merge into TARGET_TABLE as DBT_INTERNAL_DEST
using SOURCE_TABLE as DBT_INTERNAL_SOURCE
on FALSE
when not matched by source [ and partition_column in ( {target partitions} ) ]
then delete
when not matched then insert
(col1, col2, ...)
values
(col1, col2, ...)
condition
の部分は
partition_column in (select DISTINCT partition_column from SOURCE_TABLE)
のような形で対象の partition が指定されます。
on
句に FALSE
が指定されており、 when
句にも not matched
が指定されているため、条件の指定がない場合は TARGET_TABLE
自体の replace が行われるように見えます。
Spark adaptor の document を見ると、partition_by
の指定がない場合はテーブル全体を置き換える旨の記載があります。
BigQuery の場合、 partition_by
の指定がない場合は insert_overwrite
strategy を使用することはできないようです。つまり insert_overwrite
strategy の使用は partitioned table に限られることになりそうです。
また、delete
の際に条件を指定できそうに見えますが、BigQuery に関しては SOURCE_TABLE
が insert される partition を指定する条件が内部で生成されています。
BigQuery では partition ごとの洗い替えを行うような実装となっており、incremental_predicates
を用いて指定のレコードを削除することはできないようです。
BigQuery における insert_overwrite
strategy では、更に下記の 3 種類の方法から partition の指定方法を選択することができます。
- dynamic partition
- static partition
- copy partition
dynamic partition は default の方法です。
クエリを実行して、partition_by
に指定されている column から動的に更新対象の partition を取得します。
static partition では partitions
で partition のリストを直接指定します。
partition を静的に指定することで、実行するクエリ数を減らし、実行コストを抑えることができるようです。
copy partition は、BigQuery の copy table API を使い partition ごと洗い替えを行う方法です。 debug の可視性が低くなってしまうようですが、より高速に処理を行うことができるようで、データが大規模な場合は有効な方法となりそうです。
🔸 microbatch
dbt 1.9 から新たに導入された方法です。
get_incremental_microbatch_sql
に定義されています。
default__get_incremental_microbatch_sql
は実装がなく、adaptor ごとに実装方法が異なります。
大規模な時系列データを取扱やすいよう設計された strategy のようで、大規模なデータを一度に処理せず、より小さいバッチに区切って処理をすることで、パフォーマンスに関する課題を解決する手法のようです。
BigQuery でも実装はされているのですが、自分が見る限りでは単に insert_overwrite
が呼び出されているだけのように見えます。
insert_overwrite
は、前述の通り partition ごとに replace する手法です。
確かに BigQuery の場合、partition ごとの replace が最も高効率であり、また並列処理も行われているため insert_overwrite
が最も高速な方法のように思えます。
逆に partition をより小さい単位で分割してしまうと、パフォーマンスが低下してしまいそうです。
もし認識が間違っておりましたらご指摘いただけますと幸いです。
各 strategy の使いどころ
ここからは dbt-bigquery に焦点を絞り、各 strategy の使いどころを考えたいと思います。
先に述べた通り、 microbatch
は insert_overwrite
と同一の処理を行うと考えると、merge
と insert_overwrite
の使い分け、というところに話は収束しそうです。
それぞれの特徴は下記の通りです。
merge
unique_key
の一致を条件に insert or update を行うunique_key
の指定がない場合は insert のみを行う
insert_overwrite
- partition ごとに delete and insert で洗い替えを行う
- partition table である必要がある
最も着目すべきは、 insert_overwrite
では partition ごとに洗い替えを行う点かと思います。
この挙動により insert_overwrite
で merge
のように unique 性を保つためには、 partition カラムの値が変わらない、という条件が必要になります。
ソースデータが immutable に設計されている場合でしか有効に使えないように思います。
むしろソースデータが immutable であれば、backfill や過去データの修正など、merge
に比べて高効率に行えるでしょう。
一方で unique 性を保ったままレコードの更新を追跡する場合は merge
を使用する必要があります。
insert_overwrite
では、 partition カラムの値が変化した場合にデータが重複する可能性があるためです。
partition table に対して merge
を用いる場合は incremental_predicates
で partition の範囲を制限しておけば pruning を効かせ、コストを抑えることができます。
mutable なソーステーブルを snapshot 的に増分更新していく場合はどうでしょうか?
unique_key
を指定せずに merge
を用いる方が効率はよさそうに思えます。
insert_overwrite
では partition カラムの比較が走るのに対して、merge
では単純な insert が行われるだけだからです。
ただし実際に検証をしたわけではないため、間違っていたら申し訳ありません。
使いどころのまとめ
ユースケース | 推奨 strategy | 特徴 |
---|---|---|
immutable なテーブルの蓄積 | insert_overwrite | - パーティション単位での効率的な処理 - backfill が容易 - パーティションテーブルのみ対応 |
mutable なテーブルの変更の追跡 | merge | - unique key による更新管理 - 履歴の追跡が可能 - incremental_predicates でコスト最適化可能 |
mutable なテーブルの増分蓄積 | merge | - 単純な追記処理 - unique key なしで高効率 - BigQuery では append の代替として利用 |
BigQuery だけ strategy の実装が少ないような気がしますが、既に十分な機能を備えているということなのかもしれません。 シャーディングテーブルのとしての蓄積も実装されるとより便利になりそうですね。 改めて BigQuery の機能性に感動し、来月の Google Cloud Next 2025 がより楽しみになりました。
さいごに
dbt の incremental strategy についてまとめてみました。 少しでもご参考になれば幸いです。
弊社ではエンジニアの採用を行なっております。 データプラットフォームチームに興味を持っていただけた方は是非 Entrancebook をご覧ください。