dbt incremental model の挙動について調べてみました

2024-10-02

こんにちは。株式会社 Belong の DataPlatform チームに所属する Shuhei です。 Belong に入社して 3 ヶ月が経ちました。毎日が風のように走っていきます。日々学びを噛み締めています。

Belong の DataPlatform チームでは、dbt-core を用いて、BigQuery 上にあるデータ分析基盤を構築しています。現在、私は絶賛 dbt お勉強中です 💪

先日、dbt の incremental model にふれる機会があり、とてもおもしろい機能だなと思う反面、難しさも感じました。今回の記事は、自分の学びを深めるためにも、そんな incremental model について記していきます。

incremental model とは

incremental model とは、dbt の materializaitons のうちの 1 つです。incremental model の特徴は、既存のデータを再計算するのではなく、新しいデータや変更されたデータのみを追加・更新する点にあります。

Materialization説明
tableデータベースにデータが保存されず、その都度クエリを実行します。
viewデータを物理的にテーブルとして保存します。
ephemeral中間テーブルなどの用途として一時的に使用されるテーブルで、テーブルとして保存されません。
materialized viewSQL の実行結果を保存しておき、結果に差分があるときには差分のみを読み込む
incremental新しいデータや変更されたデータのみを追加・更新します。

incremental model には、次のメリットがあります。

  • パフォーマンスの向上
    • 新しいデータや変更されたデータのみを処理するため、クエリの実行時間が短縮されます。
  • コスト削減
    • 処理対象を最小限にすることで、データウェアハウスの使用コストを抑えることができます。

一方で、公式ドキュメントからは、view, table, incremental model の順に複雑性が増していくような印象を受けました。view, table, incremental model のそれぞれの特徴を踏まえたうえで、materialization を選択したいものです。

We’ll explore this in-depth throughout, but the basic guideline is start as simple as possible. We’ll follow a tiered approached, only moving up a tier when it’s necessary.

🔍 Start with a view. When the view gets too long to query for end users,

⚒️ Make it a table. When the table gets too long to build in your dbt Jobs,

📚 Build it incrementally. That is, layer the data on in chunks as it comes in. ※ 引用:https://docs.getdbt.com/best-practices/materializations/1-guide-overview#guiding-principle

このように、incremental model はいい感じの挙動をしてくれそう!と思う一方で、用法の難易度は高そうだなと思いました。

incremental model を configurations から理解してみる

どのようなところに難しさがあるのか、実際に体感してみるために incremental model を触ってみました。公式ドキュメントを読みながら、いくつかの configurations を通して、少しずつ理解してみます。

incremental model の挙動の特徴は、初回実行では create table、 2 回目以降の実行では merge を実行する点にもあると思います。そのそれぞれにも注目していきながら、挙動を確認していきます。

is_incremental() macro

まず、 is_incremental() を使用することで、どのレコードを増分対象にするかフィルタする ことができます。 is_incremental()True, False を返す macro です。基本的には incremental model の初回実行では False となり、それ以降の実行時は True となります。

次に is_incremental() を使用する場合と、使用しない場合の例を見ていきましょう。

今回は、下記のような簡単な incremental model を作成してみることを考えてみます。ソースとなるテーブルとして、id, name, address, created_at, updated_atのカラムをもつテーブルを作成し、50 件ほどのテストデータを用意してみました。

is_incremental() を使用しないとき

is_incremental() を使用しない例として、下記のようなクエリを用意しました。

{{
  config(
      materialized = 'incremental'
    )
}}

select
    id,
    name,
    address,
    created_at,
    updated_at
from [ source table name ]

この状態で dbt run すると 、下記のようなクエリにコンパイルされ実行されます。

初回実行時

select 文の結果を保持するテーブルを作成する create table 文が生成され、実行されます。

CREATE OR REPLACE TABLE
  [ incremental model name ] OPTIONS() AS (
  SELECT
    id,
    name,
    address,
    created_at,
    updated_at
  FROM
    [ source table name] );

2 回目以降の実行時

MERGE 文が生成され、実行されます。少し中身を見てみましょう。

DBT_INTERNAL_SOURCE のデータを DBT_INTERNAL_DEST に merge し、マッチしないレコードを insert していることがわかります。 一方で on 句には FALSE という条件が指定されているため、実際にはマッチング条件が存在せず、すべての行が新規行として扱われることを意味します。

MERGE INTO
  [ incremental model name ] AS DBT_INTERNAL_DEST
USING
  (
  SELECT
    id,
    name,
    address,
    created_at,
    updated_at
  FROM
    [ source table name ] ) AS DBT_INTERNAL_SOURCE
ON
  (FALSE)
  WHEN NOT MATCHED
  THEN
INSERT
  (`id`,
    `name`,
    `address`,
    `created_at`,
    `updated_at`)
VALUES
  (`id`, `name`, `address`, `created_at`, `updated_at`)

このクエリを実行したとき、作成した incremental model においては重複が発生してしまいました。

select
  name,
  count(*) as cnt
from [ incremental model name ]
group by id, name, address, created_at, updated_at
having cnt >= 2
order by name

これではやや使いづらいと感じてしまうかもしれません。なにか増分対象を制御する方法があればよいのですが...

is_incremental() を使用するとき

それでは、is_incremental() macro を使用してみましょう!

is_incremental() を使用する例として、下記のような SQL を用意しました。

{{
  config(
      materialized = 'incremental'
    )
}}

select
    id,
    name,
    address,
    created_at,
    updated_at
from [ source table name ]
{% if is_incremental() %}
where updated_at > (select max(updated_at) from {{ this }} )
{% endif %}

where 句の部分で is_incremental() を用いています。True となるときには、this (現在の incremental model) の updated_at よりも新しい updated_at のみを select するように、レコードを絞り込んでいます。

この状態で dbt run すると 、下記のようなクエリにコンパイルされ実行されます。

初回実行時

select 文の結果を保持するテーブルを作成する create table 文が生成され、実行されます。これは is_incremental() を使用しないときと同様ですね。

CREATE OR REPLACE TABLE
  [ incremental model name ] OPTIONS() AS (
  SELECT
    id,
    name,
    address,
    created_at,
    updated_at
  FROM
    [ source table name] );

2 回目の実行時

is_incremental() を使用しないときと比較すると、where 句の部分で差異があります。is_incremental()True となり、where 句が効いていることがわかりました。

MERGE INTO
  [ incremental model name ] AS DBT_INTERNAL_DEST
USING
  (
  SELECT
    id,
    name,
    address,
    created_at,
    updated_at
  FROM
    [ source table name ]
  WHERE
    updated_at > (
    SELECT
      MAX(updated_at)
    FROM
      [ incremental model name ] ) ) AS DBT_INTERNAL_SOURCE
ON
  (FALSE)
  WHEN NOT MATCHED
  THEN
INSERT
  (`id`,
    `name`,
    `address`,
    `created_at`,
    `updated_at`)
VALUES
  (`id`, `name`, `address`, `created_at`, `updated_at`)

この where 句のおかげで、現在の updated_at よりも新しい updated_at を持つレコードのみが insert されます。 したがって、is_incremental() を使用しないときに見られた、レコードの重複は見られません。

select
  name,
  count(*) as cnt
from [ incremental model name ]
group by id, name, address, created_at, updated_at
having cnt >= 2
order by name

つまり、is_incremental() を使用することで、ソーステーブルのどのレコードを増分とするかをフィルタすることができ、適切な増分更新を実現することができそうです!

unique key

unique key を指定することで、増分のデータを insert するか、update するかを制御することができます。

incremental model のデフォルトの設定では、増分のデータは insert されます。しかし、新規のレコードは insert を、更新されたレコードは update をしたいケースもきっとあることでしょう。

今回は下記のシナリオを考えてみます。

シナリオ: ユーザー名 "User1" を "UserX" に変更

1. 初期状態:

  • レコード: User1, 過去時刻の updated_at

2. 更新後:

  • レコード: UserX, 現在時刻の updated_at

では、このようにユーザー名が変更になったケースを想定して、nameUser1 のレコードについて、nameUserX に、updated_at を現在時刻のタイムスタンプに更新してみます。下記の SQL を実行して準備を整えます。

update [ source table name ] set name = 'UserX', updated_at = current_timestamp where name = 'User1'

unique key を指定しないとき

それではまず、unique key を指定しないケースを考えてみます。

前述の例で用いた SQL をベースに考えてみましょう。

{{
  config(
      materialized = 'incremental'
    )
}}

select
    id,
    name,
    address,
    created_at,
    updated_at
from [ source table name ]
{% if is_incremental() %}
where updated_at > (select max(updated_at) from {{ this }} )
{% endif %}

この状態で dbt run してみます。 SQL は前述の例と同じなので、実行されるクエリの詳細は割愛し、結果にのみ注目してみます。

この結果の図から、同一 id で重複が発生していることがわかります。name, updated_at を更新したレコードが増分データとして insert されるといった挙動ですね。

unique key を指定しないとき

それでは、unique key を指定して、どのように挙動がかわるか見ていきましょう。

このような SQL としてみました。unique key として id を指定しています。

{{
  config(
      materialized = 'incremental'
      unique_key = 'id'
    )
}}

select
    id,
    name,
    address,
    created_at,
    updated_at
from {{ source('ext', 'src_user_address') }}
{% if is_incremental() %}
where updated_at > (select max(updated_at) from {{ this }} )
{% endif %}

この状態で dbt run すると、下記のようなクエリにコンパイルされ実行されます。

MERGE INTO
  [ incremental model name ] AS DBT_INTERNAL_DEST
USING
  (
  SELECT
    id,
    name,
    address,
    created_at,
    updated_at
  FROM
    [ source table name ]
  WHERE
    updated_at > (
    SELECT
      MAX(updated_at)
    FROM
      [ incremental model name ] ) ) AS DBT_INTERNAL_SOURCE
ON
  ( DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id )
  WHEN MATCHED THEN UPDATE SET `id` = DBT_INTERNAL_SOURCE.`id`, `name` = DBT_INTERNAL_SOURCE.`name`, `address` = DBT_INTERNAL_SOURCE.`address`, `created_at` = DBT_INTERNAL_SOURCE.`created_at`, `updated_at` = DBT_INTERNAL_SOURCE.`updated_at`
  WHEN NOT MATCHED
  THEN
INSERT
  (`id`,
    `name`,
    `address`,
    `created_at`,
    `updated_at`)
VALUES
  (`id`, `name`, `address`, `created_at`, `updated_at`)

unique_key を指定しないときのクエリでは、 on 句に FALSE という条件が指定されているため、実際にはマッチング条件が存在せず、すべての行が新規行として扱われていました。

その一方で、unique_key = id を指定すると、on 句に DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id という条件が指定されました。ソースとなるデータとターゲットとなるデータの id が一致するとき、update されるようになっています!

実際のデータも見てみましょう。

この結果の図から、name が User1 から userX に更新されており、同一 id で重複する行は存在していないことがわかりました。unique_key を指定したことにより、増分データが insert ではなく update されました!

まとめ

以上、is_incremental()unique_key の configurations を通して、incremental model について知ることができました。とりあえず materializations を incremental model にすれば増分データの更新が気軽にできる、というよりは configuration を適切に指定しないと全く使えないデータにもなりかねない難しさが印象的でした。

特に、unique_key の指定にあたっては、ソースデータ側にも unique であることを保証するための dbt test を追加するなどの工夫が必要そうだなと思いました。

今回紹介した configurations の他に、例えば strategy という重要な configrations もありますが、これはまた今度触れてみます!

Belong ではこのように dbt を用いた DWH の構築を積極的に実施しています!dbt がお好きな方、少しでも気になるよというははぜひ Belong Entrance Book for Engineer をご覧ください!