BigQuery で使える dbt incremental strategy 完全ガイド

2025-03-13

Data Platform チームの @kobori です。 先日 Data Contract の記事を出しましたが、現在も社内のプロダクトのデータと DWH の連携を進めております。 プロダクトの DB 上のデータに対して定めたインターフェースを介して DWH へデータを蓄積し DWH 上でデータの加工をした上で mart を構築していきます。

弊社ではデータ基盤の構築に dbt-core を採用していますが、dbt でプロダクトデータを DWH に連携する際は incremental model や snapshot が用いられるかと思います。 fact (transaction データのようなもの) の蓄積には incremental model を採用し、dimension (master データのようなもの) の蓄積に snapshot を採用する方がよいというのが現時点の私の考えです。

incremental model と snapshot の使い分けも議論の余地を残すところだとは思いつつ、本記事では incremental model の strategy に焦点を当て、そのユースケースについて考察します。 incremental model については @shuhei の記事 でも書かれていますのでご参照ください。

incremental strategy

dbt では、データの蓄積の方法として、複数の方法が用意されています。 これを incremental strategy と呼んでいるのですが、下記の 5 種類が用意されています 。

  1. append
  2. merge
  3. delete+insert
  4. insert_overwrite
  5. microbatch

5 つの strategy が用意されているものの、すべての strategy が利用できるわけではありません。利用している DWH 製品によって使用できる strategy は異なっています。 例えば、弊社では BigQuery を用いているのですが、dbt-bigquery では merge insert_overwrite microbatch の 3 種類しか用意されておりません。 また、delete+insertinsert_overwrite は相互排他的に定義されており、各 DWH 製品において、いずれか一方しか選択できません。

これら 5 つの strategy について、その実装を眺めつつ、ユースケースを考えていきたいと思います。

🔸 append

最もわかりやすく単純な strategy かと思います。 ターゲットのテーブルに対して追記をしていく strategy です。 get_incremental_append_sql という macro で定義されています。 (macro については dbt の document を参照してください。)

dbt では {adaptor}_{macro_name} という形式で、各 adaptor の macro 名が定義されているのですが、今回は default の default__get_incremental_append_sql を追っていきます。 すると、get_insert_into_sql という macro に行き着くのですが、実装を読むとおおよそ下記のような SQL が実行されそうであることがわかります。

insert into TARGET_TABLE (col1, col2, ...)
(
    select col1, col2, ...
    from SOURCE_TABLE
)

名前の通り単純に追記をしていくようです。 単純ではありますが、残念ながら dbt-bigquery では用いることはできません。 また、ターゲットに unique 制約が定義されている場合は注意が必要そうです。

🔸 merge

ターゲットに対して、更新または追記をしていく strategy になります。

get_incremental_merge_sql に定義されています。

実装を覗くと、 default__get_merge_sql にたどりつきます。 おおよそ下記のようなクエリが実行されそうです。

unique_key の指定がある場合

merge into TARGET_TABLE as DBT_INTERNAL_DEST
    using SOURCE_TABLE as DBT_INTERNAL_SOURCE
    on
      DBT_INTERNAL_DEST.unique_key1 = DBT_INTERNAL_SOURCE.unique_key1
      and DBT_INTERNAL_DEST.unique_key2 = DBT_INTERNAL_SOURCE.unique_key2
      ...
      { and some conditions if exists }
when matched then update set
    col1 = DBT_INTERNAL_SOURCE.col1,
    col2 = DBT_INTERNAL_SOURCE.col2,
    ...
when not matched then insert
    (col1, col2, ...)
values
    (col1, col2, ...)

unique_key の指定がない場合

merge into TARGET_TABLE as DBT_INTERNAL_DEST
    using SOURCE_TABLE as DBT_INTERNAL_SOURCE
    on FALSE { and some conditions if exists }
when not matched then insert
    (col1, col2, ...)
values
    (col1, col2, ...)

merge 文を使ったデータの追加・更新が行われます。 unique_key で指定されているレコードがターゲットに既に存在する場合はデータが更新され、存在しない場合はレコードの追加が行われます。 unique_key は複数指定可能のようです。変数名が単数形なので少し紛らわしいですね。 また、ドキュメントに記載のある通り、update 対象のカラムも merge_update_columnsmerge_exclude_columns を使うことで update 対象の column の選択ができます。

BigQuery や PostgreSQL の merge 文ではレコードの削除も行うことができますが、 dbt の merge strategy ではレコードの削除は行いません。

unique_key の指定がない場合は on 句に FALSE が指定されており、単純な insert が行われます。 dbt-bigquery では append の定義がありませんが、merge を使うことで append を実現できそうです。

🔸 delete+insert

unique_key を使用し、レコードの delete and insert を行うことでデータの更新・追加を行う方法です。 get_incremental_delete_insert_sql に定義されており、default__get_delete_insert_merge_sql に default の実装を見ることができます。

unique_key の指定がある場合

delete from TARGET_TABLE
where (unique_key1, unique_key2, ...) in (
    select distinct unique_key1, unique_key2, ...
    from SOURCE_TABLE
) { and some conditions if exists }
;

insert into TARGET_TABLE (col1, col2, ...)
(
    select col1, col2, ...
    from SOURCE_TABLE
)

unique_key の指定がない場合

insert into TARGET_TABLE (col1, col2, ...)
(
    select col1, col2, ...
    from SOURCE_TABLE
)

delete 文と insert 文により更新が定義されています。 レコードの delete が行われるものの、削除の対象はあくまで SOURCE_TABLE に unique key が存在しているものに限られるためレコードが減少することはなさそうです。

こちらも dbt-bigquery では実装されていませんが、 merge により同等の処理が可能そうです。

🔸 insert_overwrite

partition ごとの入れ替えや table の replace が想定された strategy です。 get_incremental_insert_overwrite_sql に定義されています。

default__get_insert_overwrite_merge_sql から下記のような SQL が作成されそうです。

merge into TARGET_TABLE as DBT_INTERNAL_DEST
    using SOURCE_TABLE as DBT_INTERNAL_SOURCE
    on FALSE

when not matched by source [ and partition_column in ( {target partitions} ) ]
    then delete

when not matched then insert
    (col1, col2, ...)
values
    (col1, col2, ...)

condition の部分は

partition_column in (select DISTINCT partition_column from SOURCE_TABLE)

のような形で対象の partition が指定されます。

on 句に FALSE が指定されており、 when 句にも not matched が指定されているため、条件の指定がない場合は TARGET_TABLE 自体の replace が行われるように見えます。 Spark adaptor の document を見ると、partition_by の指定がない場合はテーブル全体を置き換える旨の記載があります。

BigQuery の場合、 partition_by の指定がない場合は insert_overwrite strategy を使用することはできないようです。つまり insert_overwrite strategy の使用は partitioned table に限られることになりそうです。 また、delete の際に条件を指定できそうに見えますが、BigQuery に関しては SOURCE_TABLE が insert される partition を指定する条件が内部で生成されています。 BigQuery では partition ごとの洗い替えを行うような実装となっており、incremental_predicates を用いて指定のレコードを削除することはできないようです。

BigQuery における insert_overwrite strategy では、更に下記の 3 種類の方法から partition の指定方法を選択することができます。

  1. dynamic partition
  2. static partition
  3. copy partition

dynamic partition は default の方法です。 クエリを実行して、partition_by に指定されている column から動的に更新対象の partition を取得します。

static partition では partitions で partition のリストを直接指定します。 partition を静的に指定することで、実行するクエリ数を減らし、実行コストを抑えることができるようです。

copy partition は、BigQuery の copy table API を使い partition ごと洗い替えを行う方法です。 debug の可視性が低くなってしまうようですが、より高速に処理を行うことができるようで、データが大規模な場合は有効な方法となりそうです。

🔸 microbatch

dbt 1.9 から新たに導入された方法です。 get_incremental_microbatch_sql に定義されています。

default__get_incremental_microbatch_sql は実装がなく、adaptor ごとに実装方法が異なります。 大規模な時系列データを取扱やすいよう設計された strategy のようで、大規模なデータを一度に処理せず、より小さいバッチに区切って処理をすることで、パフォーマンスに関する課題を解決する手法のようです。 BigQuery でも実装はされているのですが、自分が見る限りでは単に insert_overwrite が呼び出されているだけのように見えます。

insert_overwrite は、前述の通り partition ごとに replace する手法です。 確かに BigQuery の場合、partition ごとの replace が最も高効率であり、また並列処理も行われているため insert_overwrite が最も高速な方法のように思えます。 逆に partition をより小さい単位で分割してしまうと、パフォーマンスが低下してしまいそうです。

もし認識が間違っておりましたらご指摘いただけますと幸いです。

各 strategy の使いどころ

ここからは dbt-bigquery に焦点を絞り、各 strategy の使いどころを考えたいと思います。 先に述べた通り、 microbatchinsert_overwrite と同一の処理を行うと考えると、mergeinsert_overwrite の使い分け、というところに話は収束しそうです。

それぞれの特徴は下記の通りです。

  • merge
    • unique_key の一致を条件に insert or update を行う
    • unique_key の指定がない場合は insert のみを行う
  • insert_overwrite
    • partition ごとに delete and insert で洗い替えを行う
    • partition table である必要がある

最も着目すべきは、 insert_overwrite では partition ごとに洗い替えを行う点かと思います。 この挙動により insert_overwritemerge のように unique 性を保つためには、 partition カラムの値が変わらない、という条件が必要になります。 ソースデータが immutable に設計されている場合でしか有効に使えないように思います。 むしろソースデータが immutable であれば、backfill や過去データの修正など、merge に比べて高効率に行えるでしょう。

一方で unique 性を保ったままレコードの更新を追跡する場合は merge を使用する必要があります。 insert_overwrite では、 partition カラムの値が変化した場合にデータが重複する可能性があるためです。 partition table に対して merge を用いる場合は incremental_predicates で partition の範囲を制限しておけば pruning を効かせ、コストを抑えることができます。

mutable なソーステーブルを snapshot 的に増分更新していく場合はどうでしょうか? unique_key を指定せずに merge を用いる方が効率はよさそうに思えます。 insert_overwrite では partition カラムの比較が走るのに対して、merge では単純な insert が行われるだけだからです。 ただし実際に検証をしたわけではないため、間違っていたら申し訳ありません。

使いどころのまとめ

ユースケース推奨 strategy特徴
immutable なテーブルの蓄積insert_overwrite- パーティション単位での効率的な処理
- backfill が容易
- パーティションテーブルのみ対応
mutable なテーブルの変更の追跡merge- unique key による更新管理
- 履歴の追跡が可能
- incremental_predicatesでコスト最適化可能
mutable なテーブルの増分蓄積merge- 単純な追記処理
- unique key なしで高効率
- BigQuery ではappendの代替として利用

BigQuery だけ strategy の実装が少ないような気がしますが、既に十分な機能を備えているということなのかもしれません。 シャーディングテーブルのとしての蓄積も実装されるとより便利になりそうですね。 改めて BigQuery の機能性に感動し、来月の Google Cloud Next 2025 がより楽しみになりました。

さいごに

dbt の incremental strategy についてまとめてみました。 少しでもご参考になれば幸いです。

弊社ではエンジニアの採用を行なっております。 データプラットフォームチームに興味を持っていただけた方は是非 Entrancebook をご覧ください。