shuheishuhei

dbt incremental model の挙動について調べてみました

2024-10-02

こんにちは。株式会社 Belong の DataPlatform チームに所属する Shuhei です。 Belong に入社して 3 ヶ月が経ちました。毎日が風のように走っていきます。日々学びを噛み締めています。

Belong の DataPlatform チームでは、dbt-core を用いて、BigQuery 上にあるデータ分析基盤を構築しています。現在、私は絶賛 dbt お勉強中です 💪

先日、dbt の incremental model にふれる機会があり、とてもおもしろい機能だなと思う反面、難しさも感じました。今回の記事は、自分の学びを深めるためにも、そんな incremental model について記していきます。

incremental model とは

incremental model とは、dbt の materializaitons のうちの 1 つです。incremental model の特徴は、既存のデータを再計算するのではなく、新しいデータや変更されたデータのみを追加・更新する点にあります。

Materialization説明
tableデータベースにデータが保存されず、その都度クエリを実行します。
viewデータを物理的にテーブルとして保存します。
ephemeral中間テーブルなどの用途として一時的に使用されるテーブルで、テーブルとして保存されません。
materialized viewSQL の実行結果を保存しておき、結果に差分があるときには差分のみを読み込む
incremental新しいデータや変更されたデータのみを追加・更新します。

incremental model には、次のメリットがあります。

  • パフォーマンスの向上
    • 新しいデータや変更されたデータのみを処理するため、クエリの実行時間が短縮されます。
  • コスト削減
    • 処理対象を最小限にすることで、データウェアハウスの使用コストを抑えることができます。

一方で、公式ドキュメントからは、view, table, incremental model の順に複雑性が増していくような印象を受けました。view, table, incremental model のそれぞれの特徴を踏まえたうえで、materialization を選択したいものです。

We’ll explore this in-depth throughout, but the basic guideline is start as simple as possible. We’ll follow a tiered approached, only moving up a tier when it’s necessary.

🔍 Start with a view. When the view gets too long to query for end users,

⚒️ Make it a table. When the table gets too long to build in your dbt Jobs,

📚 Build it incrementally. That is, layer the data on in chunks as it comes in. ※ 引用:https://docs.getdbt.com/best-practices/materializations/1-guide-overview#guiding-principle

このように、incremental model はいい感じの挙動をしてくれそう!と思う一方で、用法の難易度は高そうだなと思いました。

incremental model を configurations から理解してみる

どのようなところに難しさがあるのか、実際に体感してみるために incremental model を触ってみました。公式ドキュメントを読みながら、いくつかの configurations を通して、少しずつ理解してみます。

incremental model の挙動の特徴は、初回実行では create table、 2 回目以降の実行では merge を実行する点にもあると思います。そのそれぞれにも注目していきながら、挙動を確認していきます。

is_incremental() macro

まず、 is_incremental() を使用することで、どのレコードを増分対象にするかフィルタする ことができます。 is_incremental()True, False を返す macro です。基本的には incremental model の初回実行では False となり、それ以降の実行時は True となります。

次に is_incremental() を使用する場合と、使用しない場合の例を見ていきましょう。

今回は、下記のような簡単な incremental model を作成してみることを考えてみます。ソースとなるテーブルとして、id, name, address, created_at, updated_atのカラムをもつテーブルを作成し、50 件ほどのテストデータを用意してみました。

is_incremental() を使用しないとき

is_incremental() を使用しない例として、下記のようなクエリを用意しました。

{{
  config(
      materialized = 'incremental'
    )
}}

select
    id,
    name,
    address,
    created_at,
    updated_at
from [ source table name ]

この状態で dbt run すると 、下記のようなクエリにコンパイルされ実行されます。

初回実行時

select 文の結果を保持するテーブルを作成する create table 文が生成され、実行されます。

CREATE OR REPLACE TABLE
  [ incremental model name ] OPTIONS() AS (
  SELECT
    id,
    name,
    address,
    created_at,
    updated_at
  FROM
    [ source table name] );

2 回目以降の実行時

MERGE 文が生成され、実行されます。少し中身を見てみましょう。

DBT_INTERNAL_SOURCE のデータを DBT_INTERNAL_DEST に merge し、マッチしないレコードを insert していることがわかります。 一方で on 句には FALSE という条件が指定されているため、実際にはマッチング条件が存在せず、すべての行が新規行として扱われることを意味します。

MERGE INTO
  [ incremental model name ] AS DBT_INTERNAL_DEST
USING
  (
  SELECT
    id,
    name,
    address,
    created_at,
    updated_at
  FROM
    [ source table name ] ) AS DBT_INTERNAL_SOURCE
ON
  (FALSE)
  WHEN NOT MATCHED
  THEN
INSERT
  (`id`,
    `name`,
    `address`,
    `created_at`,
    `updated_at`)
VALUES
  (`id`, `name`, `address`, `created_at`, `updated_at`)

このクエリを実行したとき、作成した incremental model においては重複が発生してしまいました。

select
  name,
  count(*) as cnt
from [ incremental model name ]
group by id, name, address, created_at, updated_at
having cnt >= 2
order by name

これではやや使いづらいと感じてしまうかもしれません。なにか増分対象を制御する方法があればよいのですが...

is_incremental() を使用するとき

それでは、is_incremental() macro を使用してみましょう!

is_incremental() を使用する例として、下記のような SQL を用意しました。

{{
  config(
      materialized = 'incremental'
    )
}}

select
    id,
    name,
    address,
    created_at,
    updated_at
from [ source table name ]
{% if is_incremental() %}
where updated_at > (select max(updated_at) from {{ this }} )
{% endif %}

where 句の部分で is_incremental() を用いています。True となるときには、this (現在の incremental model) の updated_at よりも新しい updated_at のみを select するように、レコードを絞り込んでいます。

この状態で dbt run すると 、下記のようなクエリにコンパイルされ実行されます。

初回実行時

select 文の結果を保持するテーブルを作成する create table 文が生成され、実行されます。これは is_incremental() を使用しないときと同様ですね。

CREATE OR REPLACE TABLE
  [ incremental model name ] OPTIONS() AS (
  SELECT
    id,
    name,
    address,
    created_at,
    updated_at
  FROM
    [ source table name] );

2 回目の実行時

is_incremental() を使用しないときと比較すると、where 句の部分で差異があります。is_incremental()True となり、where 句が効いていることがわかりました。

MERGE INTO
  [ incremental model name ] AS DBT_INTERNAL_DEST
USING
  (
  SELECT
    id,
    name,
    address,
    created_at,
    updated_at
  FROM
    [ source table name ]
  WHERE
    updated_at > (
    SELECT
      MAX(updated_at)
    FROM
      [ incremental model name ] ) ) AS DBT_INTERNAL_SOURCE
ON
  (FALSE)
  WHEN NOT MATCHED
  THEN
INSERT
  (`id`,
    `name`,
    `address`,
    `created_at`,
    `updated_at`)
VALUES
  (`id`, `name`, `address`, `created_at`, `updated_at`)

この where 句のおかげで、現在の updated_at よりも新しい updated_at を持つレコードのみが insert されます。 したがって、is_incremental() を使用しないときに見られた、レコードの重複は見られません。

select
  name,
  count(*) as cnt
from [ incremental model name ]
group by id, name, address, created_at, updated_at
having cnt >= 2
order by name

つまり、is_incremental() を使用することで、ソーステーブルのどのレコードを増分とするかをフィルタすることができ、適切な増分更新を実現することができそうです!

unique key

unique key を指定することで、増分のデータを insert するか、update するかを制御することができます。

incremental model のデフォルトの設定では、増分のデータは insert されます。しかし、新規のレコードは insert を、更新されたレコードは update をしたいケースもきっとあることでしょう。

今回は下記のシナリオを考えてみます。

シナリオ: ユーザー名 "User1" を "UserX" に変更

1. 初期状態:

  • レコード: User1, 過去時刻の updated_at

2. 更新後:

  • レコード: UserX, 現在時刻の updated_at

では、このようにユーザー名が変更になったケースを想定して、nameUser1 のレコードについて、nameUserX に、updated_at を現在時刻のタイムスタンプに更新してみます。下記の SQL を実行して準備を整えます。

update [ source table name ] set name = 'UserX', updated_at = current_timestamp where name = 'User1'

unique key を指定しないとき

それではまず、unique key を指定しないケースを考えてみます。

前述の例で用いた SQL をベースに考えてみましょう。

{{
  config(
      materialized = 'incremental'
    )
}}

select
    id,
    name,
    address,
    created_at,
    updated_at
from [ source table name ]
{% if is_incremental() %}
where updated_at > (select max(updated_at) from {{ this }} )
{% endif %}

この状態で dbt run してみます。 SQL は前述の例と同じなので、実行されるクエリの詳細は割愛し、結果にのみ注目してみます。

この結果の図から、同一 id で重複が発生していることがわかります。name, updated_at を更新したレコードが増分データとして insert されるといった挙動ですね。

unique key を指定しないとき

それでは、unique key を指定して、どのように挙動がかわるか見ていきましょう。

このような SQL としてみました。unique key として id を指定しています。

{{
  config(
      materialized = 'incremental'
      unique_key = 'id'
    )
}}

select
    id,
    name,
    address,
    created_at,
    updated_at
from {{ source('ext', 'src_user_address') }}
{% if is_incremental() %}
where updated_at > (select max(updated_at) from {{ this }} )
{% endif %}

この状態で dbt run すると、下記のようなクエリにコンパイルされ実行されます。

MERGE INTO
  [ incremental model name ] AS DBT_INTERNAL_DEST
USING
  (
  SELECT
    id,
    name,
    address,
    created_at,
    updated_at
  FROM
    [ source table name ]
  WHERE
    updated_at > (
    SELECT
      MAX(updated_at)
    FROM
      [ incremental model name ] ) ) AS DBT_INTERNAL_SOURCE
ON
  ( DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id )
  WHEN MATCHED THEN UPDATE SET `id` = DBT_INTERNAL_SOURCE.`id`, `name` = DBT_INTERNAL_SOURCE.`name`, `address` = DBT_INTERNAL_SOURCE.`address`, `created_at` = DBT_INTERNAL_SOURCE.`created_at`, `updated_at` = DBT_INTERNAL_SOURCE.`updated_at`
  WHEN NOT MATCHED
  THEN
INSERT
  (`id`,
    `name`,
    `address`,
    `created_at`,
    `updated_at`)
VALUES
  (`id`, `name`, `address`, `created_at`, `updated_at`)

unique_key を指定しないときのクエリでは、 on 句に FALSE という条件が指定されているため、実際にはマッチング条件が存在せず、すべての行が新規行として扱われていました。

その一方で、unique_key = id を指定すると、on 句に DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id という条件が指定されました。ソースとなるデータとターゲットとなるデータの id が一致するとき、update されるようになっています!

実際のデータも見てみましょう。

この結果の図から、name が User1 から userX に更新されており、同一 id で重複する行は存在していないことがわかりました。unique_key を指定したことにより、増分データが insert ではなく update されました!

まとめ

以上、is_incremental()unique_key の configurations を通して、incremental model について知ることができました。とりあえず materializations を incremental model にすれば増分データの更新が気軽にできる、というよりは configuration を適切に指定しないと全く使えないデータにもなりかねない難しさが印象的でした。

特に、unique_key の指定にあたっては、ソースデータ側にも unique であることを保証するための dbt test を追加するなどの工夫が必要そうだなと思いました。

今回紹介した configurations の他に、例えば strategy という重要な configrations もありますが、これはまた今度触れてみます!

Belong ではこのように dbt を用いた DWH の構築を積極的に実施しています!dbt がお好きな方、少しでも気になるよというははぜひ Belong Entrance Book for Engineer をご覧ください!

No table of contents available for this content