Event Sourcing 実践編

2023-05-26

はじめに

こんにちは。Belong Inc. で Backend Engineer を担当している niwa です。
今年は東京の Summer Sonic に二日間とも行くぞ、という心持ちだったのですが、本記事執筆時点で全てチケットが完売しており深い悲しみに包まれています。

前回の記事では、Event Sourcing の概念について解説しました。 今回は実践編と題して、go のコードを使って書くとどのような感じになるのかを紹介したいと思います。

前提条件

今回の実装にあたっては、proto.actor というライブラリを利用します。1
proto.actor は、Akka や Erlang に代表される Actor Model を実現するためのライブラリです。
本記事では Actor Model についての説明は省略しますが、こちら に詳細な説明がありますので、興味のある方は参照していただけると嬉しいです。

実装

protobuf の定義

早速実装に入っていきましょう!とはいいつつ、まず一番最初に必要になるものはモデルです。
今回はモデルとして、前回の記事で紹介した 財布 (= Wallet ) を採用します。

proto.actor はその名の通り、モデル構造やメッセージングなど、強く protobuf に依存した構造になっています。
そのため、今回の実装でもそれに則り、基本的なデータモデルについては protobuf を使用して定義していきます。

syntax = "proto3";

package wallet;

message Wallet {
  int32 deposit = 1;
}

message ReceiveMoney {
  int32 amount = 1;
}

message PaymentMoney {
  int32  amount = 1;
}

まずは 財布のデータモデルを示す message Wallet を定義しました。
Wallet は deposit というフィールドを持ち、これは財布の中に入っているお金の額を表します。

また、財布に対してお金を受け取る ReceiveMoney と、お金を支払う PaymentMoney という message を定義しました。 これらの 財布に対しての状態変更 を示す message が、今回作成する Event Sourcing の Event に相当します。

この protobuf を元に、 buf などのツールを使用して wallet.pb.go を生成しておきます。

Actor の定義

上記で、簡単ではありますがデータモデルと、モデルに対する操作の定義が終わりました。
続いて、最低限 proto.actor を使用したプログラムが形になるところまで実装します。

ディレクトリ構成

ファイルの構成は、下記のようなイメージになります。

├── go.mod
├── go.sum
├── main.go
├── models
│   ├── proto
│   │   └── wallet.pb.go
│   ├── wallet.go
│   └── wallet_provider.go
└── proto
    └── wallet.proto

models/wallet.go

まずは、Wallet の振る舞いを定義していきます。
proto.actor のしきたりに乗っ取り、Wallet は actor.Actor インターフェースを実装する必要があります。

actor.Actor のインターフェースはシンプルで、実装が必要なメソッドは Receive(actor.Context) のみです。
proto.actor 上にて送信されるメッセージは全て actor.Context を介して受け取ることになります。
actor.ContextMessage の型に応じて処理を分岐させることで、メッセージに応じた振る舞いを実現します。

ここでは、先程 proto で定義した ReceiveMoneyPaymentMoney を受け取った場合に、Wallet がもつ残高を加算・減算するようにします。

また、proto.actor の永続化機構を利用するために、persistence.Mixin を埋め込んでいます。

package models

import (
	"fmt"

	"money-actor/models/proto"

	"github.com/asynkron/protoactor-go/actor"
	"github.com/asynkron/protoactor-go/persistence"
)

type WalletActor struct {
	proto.Wallet
	persistence.Mixin
}

func (w *WalletActor) Receive(c actor.Context) {
	switch msg := c.Message().(type) {
	case *proto.ReceiveMoney:
		{
			w.Deposit += int(msg.Amount)
			if !w.Recovering() {
				w.PersistReceive(msg)
			}
			fmt.Printf("current deposit: %d\n", w.Deposit)
		}
	case *proto.PaymentMoney:
		{
			w.Deposit -= int(msg.Amount)
			if !w.Recovering() {
				w.PersistReceive(msg)
			}
			fmt.Printf("current deposit: %d\n", w.Deposit)
		}
	}
}

var _ actor.Actor = (*WalletActor)(nil)

models/wallet_provider.go

永続化手法の機構を決定するための provider です。
persistence.Provider は、Actor の永続化に関する機構を提供するためのインターフェースです。

ここについては後ほど詳しく触れますが、 まずは動かすことを優先するため、 一旦組み込みの InMemoryProvider を使用します。
InMemoryProvider はその名の通り、メモリ上にデータを保存するため、プログラムが終了するとデータは揮発してしまいます。

package models

import "github.com/asynkron/protoactor-go/persistence"

type WalletProvider struct {
}

func (w WalletProvider) GetState() persistence.ProviderState {
	return persistence.NewInMemoryProvider(5)
}

var _ persistence.Provider = (*WalletProvider)(nil)

main.go

ここまで来れば、あとは main.go を実装するのみです。

  • WalletActor の永続化機構として WalletProviderを指定した上で、Actor を作成
  • 作成した Actor に対して、入金 / 出金のイベントである ReceiveMoney / PaymentMoney を送信

という一連のシナリオを記述したプログラムを実装します。

package main

import (
	"money-actor/models"
	"money-actor/models/proto"

	console "github.com/asynkron/goconsole"
	"github.com/asynkron/protoactor-go/actor"
	"github.com/asynkron/protoactor-go/persistence"
)

func main() {
	sys := actor.NewActorSystem()

	props := actor.PropsFromProducer(func() actor.Actor {
		// props が spawn されたら、新規の models.WalletActor が生成される
		return &models.WalletActor{}
	}, actor.WithReceiverMiddleware(
		// models.WalletProvider を永続化機構として選択する
		persistence.Using(
			&models.WalletProvider{},
		),
	))

	pid := sys.Root.Spawn(props)

	// 12000円の入金
	sys.Root.Send(pid, &proto.ReceiveMoney{Amount: 12000})
	// 4000円の入金
	sys.Root.Send(pid, &proto.ReceiveMoney{Amount: 4000})
	// 3000円の入金
	sys.Root.Send(pid, &proto.ReceiveMoney{Amount: 3000})
	// 8200円の出金
	sys.Root.Send(pid, &proto.PaymentMoney{Amount: 8200})

	_, _ = console.ReadLine()
}

実行

この状態で go run main.go を実行すると、下記のような結果が得られます。

$ go run main.go
current deposit: 12000
current deposit: 16000
current deposit: 19000
current deposit: 10800

Actor モデルに馴染みのない方は戸惑いのあるプログラム構造だったとは思われますが、 実際の動作としては、受け取ったイベントに応じて、deposit の値を更新し、それをコンソールに出力しているだけ、という雰囲気を感じ取ってもらえたと思います。

永続化の実装

ここまでで、実際に Actor Model を利用したシンプルなプログラムを実装しました。
ただ、この状態では永続化層として InMemoryProvider を利用しているため、プログラムを再起動すると、deposit の値が失われてしまいます。
そこで、永続化層として SQLite を利用するように変更していきましょう。

SQLite の準備

下記のテーブル構造をもつ SQLite Database を作成します。

# Wallet に対する Event を格納するテーブル
CREATE TABLE wallet_event (
    user_id TEXT NOT NULL,
    version INT NOT NULL,
    event_name TEXT NOT NULL,
    event_value INT NOT NULL,
    timestamp DATETIME NOT NULL,
    primary key (user_id, version)
);

# Wallet の Snapshot を格納するテーブル
create table wallet_snapshot (
    user_id TEXT PRIMARY KEY NOT NULL,
    saved_version INT NOT NULL,
    deposit INT NOT NULL
);

現在のところ Event として定義している ReceiveMoney / PaymentMoney は、どちらも amount という数値のみを値として持っています。 そのため、event_value を int 型で定義し、event_name field で両者の区分けが付くように定義します。

wallet_snapshot はその名の通り、あるバージョン時点での wallet の状態を保存するためのテーブルです。 (前回記事の Snapshot Approach に相当するものです)
こちらは素直に 特定時点での deposit の値を示す deposit field と、その時点でのバージョンを示す saved_version field を定義します。

永続化層の実装

models/sqlite_provider.go

永続化層として SQLite を利用するために、まずは persistence.Provider を実装する SQLiteProvider を定義します。 実装にあたっては、ORM として sqlboiler を利用します。

package models

import (
	"context"
	"database/sql"
	"time"

	"google.golang.org/protobuf/proto"
	mproto "money-actor/models/proto"
	"money-actor/sqlite3"

	_ "github.com/mattn/go-sqlite3"

	"github.com/asynkron/protoactor-go/persistence"
	"github.com/volatiletech/sqlboiler/v4/boil"
	"github.com/volatiletech/sqlboiler/v4/queries/qm"
)

type SQLiteProvider struct {
	ctx      context.Context
	conn     boil.ContextExecutor
	interval int
	userID   string
}

func NewSQLiteProvider(userID string) *SQLiteProvider {
	db, err := sql.Open("sqlite3", "./db.sqlite3")
	if err != nil {
		panic(err)
	}
	return &SQLiteProvider{
		ctx:      context.Background(),
		interval: 5,
		conn:     db,
		userID:   userID,
	}
}

func (s *SQLiteProvider) GetSnapshot(actorName string) (snapshot interface{}, eventIndex int, ok bool) {
	snap := &mproto.Wallet{
		Deposit: 0,
	}

	wallet, err := sqlite3.WalletSnapshots(sqlite3.WalletSnapshotWhere.UserID.EQ(s.userID)).One(s.ctx, s.conn)
	if err != nil {
		// Snapshot を DB から取得できなかった
		return snap, 1, true
	}

	// DB の Snapshot の値を使用する
	snap.Deposit = int32(wallet.Deposit)

	return snap, int(wallet.SavedVersion + 1), true
}

func (s *SQLiteProvider) PersistSnapshot(actorName string, snapshotIndex int, snapshot proto.Message) {
	wallet := snapshot.(*mproto.Wallet)
	snap := &sqlite3.WalletSnapshot{
		UserID:       s.userID,
		SavedVersion: int64(snapshotIndex),
		Deposit:       int64(wallet.Deposit),
	}
	_ = snap.Upsert(s.ctx, s.conn, true, []string{"user_id"}, boil.Infer(), boil.Infer())
}

func (s *SQLiteProvider) GetEvents(actorName string, eventIndexStart int, eventIndexEnd int, callback func(e interface{})) {
	conditions := []qm.QueryMod{
		sqlite3.WalletEventWhere.UserID.EQ(s.userID),
	}
	if eventIndexStart != 0 {
		conditions = append(conditions, sqlite3.WalletEventWhere.Version.GTE(int64(eventIndexStart)))
	}
	if eventIndexEnd != 0 {
		conditions = append(conditions, sqlite3.WalletEventWhere.Version.LT(int64(eventIndexEnd)))
	}
	events, err := sqlite3.WalletEvents(conditions...).All(s.ctx, s.conn)
	if err != nil {
		return
	}

	for _, event := range events {
		switch event.EventName {
		case "receive_money":
			callback(&mproto.ReceiveMoney{Amount: int32(event.EventValue)})
		case "payment_money":
			callback(&mproto.PaymentMoney{Amount: int32(event.EventValue)})
		}
	}
}

func (s *SQLiteProvider) PersistEvent(actorName string, eventIndex int, event proto.Message) {
	wallet := &sqlite3.WalletEvent{
		UserID:    s.userID,
		Version:   int64(eventIndex),
		Timestamp: time.Now(),
	}
	switch event.(type) {
	case *mproto.ReceiveMoney:
		rcv := event.(*mproto.ReceiveMoney)
		wallet.EventName = "receive_money"
		wallet.EventValue = int64(rcv.Amount)
	case *mproto.PaymentMoney:
		pay := event.(*mproto.PaymentMoney)
		wallet.EventName = "payment_money"
		wallet.EventValue = int64(pay.Amount)
	}

	_ = wallet.Insert(s.ctx, s.conn, boil.Infer())
}

func (s *SQLiteProvider) GetSnapshotInterval() int {
	return s.interval
}

// 以下は実装しない
func (s *SQLiteProvider) DeleteSnapshots(actorName string, inclusiveToIndex int) {
}

func (s *SQLiteProvider) DeleteEvents(actorName string, inclusiveToIndex int) {
}

func (s *SQLiteProvider) Restart() {
}

var _ persistence.ProviderState = (*SQLiteProvider)(nil)

長くなってしまいましたが、やっていることとしてはシンプルです。実際には以下のような処理を行っています。

  • GetSnapshot ( Snapshot を DB から復元 )
    • Snapshot を DB から取得し、proto.Wallet に変換します。Snapshot が存在しない場合は、初期値の proto.Wallet を返します。
  • PersistSnapshot ( 現在時点の Snapshot を DB に保存 )
    • 受け取った proto.Wallet を元に、Snapshot を DB に保存します。
  • GetEvents ( Event 履歴を DB から復元 )
    • eventIndex を元に、特定範囲のイベントを DB より取得します。 wallet_eventevent_name の値に応じて、作成する proto の型を変更します。
  • PersistEvent ( 発生した Event を DB に保存 )
    • 受け取ったイベントを DB に永続化します。 proto の型に応じて、 wallet_eventevent_name の値を変更します。

また、GetSnapshotInterval は、Snapshot が保存される間隔を返すメソッドです。 今回は 5 件イベントが発行されるごとに Snapshot を保存するようにしています。

models/wallet.go

上記にて、Event / Snapshot と DB との関連付けの実装が完了しました。
続いて、WalletActor に対し、Snapshot からの状態復帰ができるように実装を修正します。

package models

import (
    "fmt"

    "money-actor/models/proto"

    "github.com/asynkron/protoactor-go/actor"
    "github.com/asynkron/protoactor-go/persistence"
)

type WalletActor struct {
	proto.Wallet
	persistence.Mixin
}

func (w *WalletActor) Receive(c actor.Context) {
	switch msg := c.Message().(type) {
	case *proto.ReceiveMoney:
		{
			w.Deposit += msg.Amount
			if w.Recovering() {
				fmt.Printf("replay wallet(receive): %v\n", msg)
			} else {
				w.PersistReceive(msg)
			}
			fmt.Printf("current deposit: %d\n", w.Deposit)
		}
	case *proto.PaymentMoney:
		{
			w.Deposit -= msg.Amount
			if w.Recovering() {
				fmt.Printf("replay wallet(payment): %v\n", msg)
			} else {
				w.PersistReceive(msg)
			}
			fmt.Printf("current deposit: %d\n", w.Deposit)
		}
	case *proto.Wallet:
		{
			fmt.Printf("recovering wallet from snapshot: %v\n", msg)
			w.Deposit = msg.Deposit
		}
	case *persistence.RequestSnapshot:
		{
			fmt.Printf("snapshot requested, current deposit: %d\n", w.Deposit)
			w.PersistSnapshot(&w.Wallet)
		}
	case *persistence.ReplayComplete:
		{
			fmt.Printf("replay complete, current deposit: %d\n", w.Deposit)
		}
	}
}

var _ actor.Actor = (*WalletActor)(nil)

先程の SQLiteProvider で実装した GetSnapshot では、戻り値として *proto.Wallet を設定していました。そのため、Snapshot からの復元を定義するために、*proto.Walletとの型マッチが必要になっています。

*persistence.RequestSnapshot は、Snapshot の作成を要求するメッセージです。このメッセージを受け取ったタイミングで、PersistSnapshot を呼び出すことで Snapshot の保存が可能になります。 *persistence.ReplayComplete は、Snapshot および Event の再適用が完了したことを示すメッセージです。今回は再適用が完了したタイミングで、ログを流すようにしています。

models/wallet_provider.go

上記が完了したら、wallet_provider.goの実装を InMemoryProvider から SQLiteProvider に変更します。

package models

import "github.com/asynkron/protoactor-go/persistence"

type WalletProvider struct {
}

func (w WalletProvider) GetState() persistence.ProviderState {
	// return persistence.NewInMemoryProvider(5)
	return NewSQLiteProvider("user_id")
}

var _ persistence.Provider = (*WalletProvider)(nil)

再実行

上記で SQLite を利用した永続化が実装できました。
早速、実行して動作を確認してみましょう。

1 回目

$ go run main.go
recovering wallet from snapshot:
replay complete, current deposit: 0
-----
current deposit: 12000
current deposit: 16000
current deposit: 19000
current deposit: 10800

ここまでは、InMemoryProvider を使用していた時と同じ結果です。

2 回目

$ go run main.go
recovering wallet from snapshot:
replay wallet(receive): amount:12000
current deposit: 12000
replay wallet(receive): amount:4000
current deposit: 16000
replay wallet(receive): amount:3000
current deposit: 19000
replay wallet(payment): amount:8200
current deposit: 10800
replay complete, current deposit: 10800
-----
snapshot requested, current deposit: 22800
current deposit: 22800
current deposit: 26800
current deposit: 29800
current deposit: 21600

前回はなかった、replay というログが出力されていますね。これは、DB に保存された Event が状態導出のため、再適用されていることを示しています。

2 回目の実行では、

  • Snapshot は存在しない
  • 過去のイベントとして、4 件の入出金がある

状態となっています。 そのため、

  • 4 件のイベントの再適用が行われ、前回の deposit の値である 10800 が導出された
  • その後、再度 main.go に定義している 4 件の入出金が行われた
  • その結果、最新の deposit の値として 21600 が導出された

という流れになります。また、5 件目のイベントが発生したタイミングで、Snapshot も作成されています。

3 回目

$ go run main.go
recovering wallet from snapshot: deposit:22800
replay wallet(receive): amount:4000
current deposit: 26800
replay wallet(receive): amount:3000
current deposit: 29800
replay wallet(payment): amount:8200
current deposit: 21600
replay complete, current deposit: 21600
-----
current deposit: 33600
snapshot requested, current deposit: 37600
current deposit: 37600
current deposit: 40600
current deposit: 32400

最後に、もう一度だけ実行してみましょう。  3 回目の実行時においては、

  • deposit が 22800 の時点での Snapshot が存在する
  • Snapshot を保存したタイミングより後に、3 件の入出金が行われている

状態になっています。

そのため、

  • Snapshot から deposit: 22800 の状態が復元された
  • Snapshot 取得後に発生した 3 件のイベントを再適用することで、前回の deposit である 21600 が導出された
  • その後、再度 main.go に定義している 4 件の入出金が行われた
  • その結果、最新の deposit の値として 32400 が導出された

ことがわかると思います。
Event Sourcing パターンではイベント量が増えるに伴って最新の状態を導出するためのコストは高まっていきますが、Snapshot Approach を取ることによって、最新状態導出のためのコストを抑えることができています。

まとめ

本記事では、Event Sourcing を適用したプログラムについて、golang / proto.actor を利用した実装例を示しました。
実際に Snapshot Approach がどのような振る舞いをするのかというところも合わせて紹介を行いました。
Event Sourcing を用いた実装のイメージとして、参考にしていただければ幸いです。

最後に、Belong Inc. では我々と一緒にサービスの成長にコミットしてくれるメンバーを募集中です!
ぜひ エンジニアリングチーム紹介ページ をご覧いただけたら幸いです。

Footnotes

  1. Actor Model でのプログラミングは Event Sourcing 実現にあたって必須ではないが、ライブラリ的にサポートされているため、シンプルに実装できる