BigQuery で使える dbt incremental strategy 完全ガイド
Data Platform チームの @kobori です。 先日 Data Contract の記事を出しましたが、現在も社内のプロダクトのデータと DWH の連携を進めております。 プロダクトの DB 上のデータに対して定めたインターフェースを介して DWH へデータを蓄積し DWH 上でデータの加工をした上で mart を構築していきます。
弊社ではデータ基盤の構築に dbt-core を採用していますが、dbt でプロダクトデータを DWH に連携する際は incremental model や snapshot が用いられるかと思います。 fact (transaction データのようなもの) の蓄積には incremental model を採用し、dimension (master データのようなもの) の蓄積に snapshot を採用する方がよいというのが現時点の私の考えです。
incremental model と snapshot の使い分けも議論の余地を残すところだとは思いつつ、本記事では incremental model の strategy に焦点を当て、そのユースケースについて考察します。 incremental model については @shuhei の記事 でも書かれていますのでご参照ください。
incremental strategy
dbt では、データの蓄積の方法として、複数の方法が用意されています。 これを incremental strategy と呼んでいるのですが、下記の 5 種類が用意されています 。
appendmergedelete+insertinsert_overwritemicrobatch
5 つの strategy が用意されているものの、すべての strategy が利用できるわけではありません。利用している DWH 製品によって使用できる strategy は異なっています。
例えば、弊社では BigQuery を用いているのですが、dbt-bigquery では merge insert_overwrite microbatch の 3 種類しか用意されておりません。
また、delete+insert と insert_overwrite は相互排他的に定義されており、各 DWH 製品において、いずれか一方しか選択できません。
これら 5 つの strategy について、その実装を眺めつつ、ユースケースを考えていきたいと思います。
🔸 append
最もわかりやすく単純な strategy かと思います。
ターゲットのテーブルに対して追記をしていく strategy です。
get_incremental_append_sql という macro で定義されています。
(macro については dbt の document を参照してください。)
dbt では {adaptor}_{macro_name} という形式で、各 adaptor の macro 名が定義されているのですが、今回は default の default__get_incremental_append_sql を追っていきます。
すると、get_insert_into_sql という macro に行き着くのですが、実装を読むとおおよそ下記のような SQL が実行されそうであることがわかります。
insert into TARGET_TABLE (col1, col2, ...)
(
select col1, col2, ...
from SOURCE_TABLE
)
名前の通り単純に追記をしていくようです。 単純ではありますが、残念ながら dbt-bigquery では用いることはできません。 また、ターゲットに unique 制約が定義されている場合は注意が必要そうです。
🔸 merge
ターゲットに対して、更新または追記をしていく strategy になります。
get_incremental_merge_sql に定義されています。
実装を覗くと、 default__get_merge_sql にたどりつきます。
おおよそ下記のようなクエリが実行されそうです。
unique_key の指定がある場合
merge into TARGET_TABLE as DBT_INTERNAL_DEST
using SOURCE_TABLE as DBT_INTERNAL_SOURCE
on
DBT_INTERNAL_DEST.unique_key1 = DBT_INTERNAL_SOURCE.unique_key1
and DBT_INTERNAL_DEST.unique_key2 = DBT_INTERNAL_SOURCE.unique_key2
...
{ and some conditions if exists }
when matched then update set
col1 = DBT_INTERNAL_SOURCE.col1,
col2 = DBT_INTERNAL_SOURCE.col2,
...
when not matched then insert
(col1, col2, ...)
values
(col1, col2, ...)
unique_key の指定がない場合
merge into TARGET_TABLE as DBT_INTERNAL_DEST
using SOURCE_TABLE as DBT_INTERNAL_SOURCE
on FALSE { and some conditions if exists }
when not matched then insert
(col1, col2, ...)
values
(col1, col2, ...)
merge 文を使ったデータの追加・更新が行われます。
unique_key で指定されているレコードがターゲットに既に存在する場合はデータが更新され、存在しない場合はレコードの追加が行われます。
unique_key は複数指定可能のようです。変数名が単数形なので少し紛らわしいですね。
また、ドキュメントに記載のある通り、update 対象のカラムも merge_update_columns や merge_exclude_columns を使うことで update 対象の column の選択ができます。
BigQuery や PostgreSQL の merge 文ではレコードの削除も行うことができますが、 dbt の merge strategy ではレコードの削除は行いません。
unique_key の指定がない場合は on 句に FALSE が指定されており、単純な insert が行われます。
dbt-bigquery では append の定義がありませんが、merge を使うことで append を実現できそうです。
🔸 delete+insert
unique_key を使用し、レコードの delete and insert を行うことでデータの更新・追加を行う方法です。
get_incremental_delete_insert_sql に定義されており、default__get_delete_insert_merge_sql に default の実装を見ることができます。
unique_key の指定がある場合
delete from TARGET_TABLE
where (unique_key1, unique_key2, ...) in (
select distinct unique_key1, unique_key2, ...
from SOURCE_TABLE
) { and some conditions if exists }
;
insert into TARGET_TABLE (col1, col2, ...)
(
select col1, col2, ...
from SOURCE_TABLE
)
unique_key の指定がない場合
insert into TARGET_TABLE (col1, col2, ...)
(
select col1, col2, ...
from SOURCE_TABLE
)
delete 文と insert 文により更新が定義されています。
レコードの delete が行われるものの、削除の対象はあくまで SOURCE_TABLE に unique key が存在しているものに限られるためレコードが減少することはなさそうです。
こちらも dbt-bigquery では実装されていませんが、 merge により同等の処理が可能そうです。
🔸 insert_overwrite
partition ごとの入れ替えや table の replace が想定された strategy です。
get_incremental_insert_overwrite_sql に定義されています。
default__get_insert_overwrite_merge_sql から下記のような SQL が作成されそうです。
merge into TARGET_TABLE as DBT_INTERNAL_DEST
using SOURCE_TABLE as DBT_INTERNAL_SOURCE
on FALSE
when not matched by source [ and partition_column in ( {target partitions} ) ]
then delete
when not matched then insert
(col1, col2, ...)
values
(col1, col2, ...)
condition の部分は
partition_column in (select DISTINCT partition_column from SOURCE_TABLE)
のような形で対象の partition が指定されます。
on 句に FALSE が指定されており、 when 句にも not matched が指定されているため、条件の指定がない場合は TARGET_TABLE 自体の replace が行われるように見えます。
Spark adaptor の document を見ると、partition_by の指定がない場合はテーブル全体を置き換える旨の記載があります。
BigQuery の場合、 partition_by の指定がない場合は insert_overwrite strategy を使用することはできないようです。つまり insert_overwrite strategy の使用は partitioned table に限られることになりそうです。
また、delete の際に条件を指定できそうに見えますが、BigQuery に関しては SOURCE_TABLE が insert される partition を指定する条件が内部で生成されています。
BigQuery では partition ごとの洗い替えを行うような実装となっており、incremental_predicates を用いて指定のレコードを削除することはできないようです。
BigQuery における insert_overwrite strategy では、更に下記の 3 種類の方法から partition の指定方法を選択することができます。
- dynamic partition
- static partition
- copy partition
dynamic partition は default の方法です。
クエリを実行して、partition_by に指定されている column から動的に更新対象の partition を取得します。
static partition では partitions で partition のリストを直接指定します。
partition を静的に指定することで、実行するクエリ数を減らし、実行コストを抑えることができるようです。
copy partition は、BigQuery の copy table API を使い partition ごと洗い替えを行う方法です。 debug の可視性が低くなってしまうようですが、より高速に処理を行うことができるようで、データが大規模な場合は有効な方法となりそうです。
🔸 microbatch
dbt 1.9 から新たに導入された方法です。
get_incremental_microbatch_sql に定義されています。
default__get_incremental_microbatch_sql は実装がなく、adaptor ごとに実装方法が異なります。
大規模な時系列データを取扱やすいよう設計された strategy のようで、大規模なデータを一度に処理せず、より小さいバッチに区切って処理をすることで、パフォーマンスに関する課題を解決する手法のようです。
BigQuery でも実装はされているのですが、自分が見る限りでは単に insert_overwrite が呼び出されているだけのように見えます。
insert_overwrite は、前述の通り partition ごとに replace する手法です。
確かに BigQuery の場合、partition ごとの replace が最も高効率であり、また並列処理も行われているため insert_overwrite が最も高速な方法のように思えます。
逆に partition をより小さい単位で分割してしまうと、パフォーマンスが低下してしまいそうです。
もし認識が間違っておりましたらご指摘いただけますと幸いです。
各 strategy の使いどころ
ここからは dbt-bigquery に焦点を絞り、各 strategy の使いどころを考えたいと思います。
先に述べた通り、 microbatch は insert_overwrite と同一の処理を行うと考えると、merge と insert_overwrite の使い分け、というところに話は収束しそうです。
それぞれの特徴は下記の通りです。
mergeunique_keyの一致を条件に insert or update を行うunique_keyの指定がない場合は insert のみを行う
insert_overwrite- partition ごとに delete and insert で洗い替えを行う
- partition table である必要がある
最も着目すべきは、 insert_overwrite では partition ごとに洗い替えを行う点かと思います。
この挙動により insert_overwrite で merge のように unique 性を保つためには、 partition カラムの値が変わらない、という条件が必要になります。
ソースデータが immutable に設計されている場合でしか有効に使えないように思います。
むしろソースデータが immutable であれば、backfill や過去データの修正など、merge に比べて高効率に行えるでしょう。
一方で unique 性を保ったままレコードの更新を追跡する場合は merge を使用する必要があります。
insert_overwrite では、 partition カラムの値が変化した場合にデータが重複する可能性があるためです。
partition table に対して merge を用いる場合は incremental_predicates で partition の範囲を制限しておけば pruning を効かせ、コストを抑えることができます。
mutable なソーステーブルを snapshot 的に増分更新していく場合はどうでしょうか?
unique_key を指定せずに merge を用いる方が効率はよさそうに思えます。
insert_overwrite では partition カラムの比較が走るのに対して、merge では単純な insert が行われるだけだからです。
ただし実際に検証をしたわけではないため、間違っていたら申し訳ありません。
使いどころのまとめ
| ユースケース | 推奨 strategy | 特徴 |
|---|---|---|
| immutable なテーブルの蓄積 | insert_overwrite | - パーティション単位での効率的な処理 - backfill が容易 - パーティションテーブルのみ対応 |
| mutable なテーブルの変更の追跡 | merge | - unique key による更新管理 - 履歴の追跡が可能 - incremental_predicatesでコスト最適化可能 |
| mutable なテーブルの増分蓄積 | merge | - 単純な追記処理 - unique key なしで高効率 - BigQuery では appendの代替として利用 |
BigQuery だけ strategy の実装が少ないような気がしますが、既に十分な機能を備えているということなのかもしれません。 シャーディングテーブルのとしての蓄積も実装されるとより便利になりそうですね。 改めて BigQuery の機能性に感動し、来月の Google Cloud Next 2025 がより楽しみになりました。
さいごに
dbt の incremental strategy についてまとめてみました。 少しでもご参考になれば幸いです。
弊社ではエンジニアの採用を行なっております。 データプラットフォームチームに興味を持っていただけた方は是非 Entrancebook をご覧ください。