Event Sourcing 実践編
はじめに
こんにちは。Belong Inc. で Backend Engineer を担当している niwa です。
今年は東京の Summer Sonic に二日間とも行くぞ、という心持ちだったのですが、本記事執筆時点で全てチケットが完売しており深い悲しみに包まれています。
前回の記事では、Event Sourcing の概念について解説しました。 今回は実践編と題して、go のコードを使って書くとどのような感じになるのかを紹介したいと思います。
前提条件
今回の実装にあたっては、proto.actor というライブラリを利用します。1
proto.actor は、Akka や Erlang に代表される Actor Model を実現するためのライブラリです。
本記事では Actor Model についての説明は省略しますが、こちら に詳細な説明がありますので、興味のある方は参照していただけると嬉しいです。
実装
protobuf の定義
早速実装に入っていきましょう!とはいいつつ、まず一番最初に必要になるものはモデルです。
今回はモデルとして、前回の記事で紹介した 財布 (= Wallet
) を採用します。
proto.actor
はその名の通り、モデル構造やメッセージングなど、強く protobuf に依存した構造になっています。
そのため、今回の実装でもそれに則り、基本的なデータモデルについては protobuf を使用して定義していきます。
syntax = "proto3";
package wallet;
message Wallet {
int32 deposit = 1;
}
message ReceiveMoney {
int32 amount = 1;
}
message PaymentMoney {
int32 amount = 1;
}
まずは 財布のデータモデルを示す message Wallet
を定義しました。
Wallet は deposit
というフィールドを持ち、これは財布の中に入っているお金の額を表します。
また、財布に対してお金を受け取る ReceiveMoney
と、お金を支払う PaymentMoney
という message を定義しました。
これらの 財布に対しての状態変更 を示す message が、今回作成する Event Sourcing の Event に相当します。
この protobuf を元に、 buf などのツールを使用して wallet.pb.go
を生成しておきます。
Actor の定義
上記で、簡単ではありますがデータモデルと、モデルに対する操作の定義が終わりました。
続いて、最低限 proto.actor
を使用したプログラムが形になるところまで実装します。
ディレクトリ構成
ファイルの構成は、下記のようなイメージになります。
├── go.mod
├── go.sum
├── main.go
├── models
│ ├── proto
│ │ └── wallet.pb.go
│ ├── wallet.go
│ └── wallet_provider.go
└── proto
└── wallet.proto
models/wallet.go
まずは、Wallet の振る舞いを定義していきます。
proto.actor
のしきたりに乗っ取り、Wallet は actor.Actor
インターフェースを実装する必要があります。
actor.Actor
のインターフェースはシンプルで、実装が必要なメソッドは Receive(actor.Context)
のみです。
proto.actor
上にて送信されるメッセージは全て actor.Context
を介して受け取ることになります。
actor.Context
の Message
の型に応じて処理を分岐させることで、メッセージに応じた振る舞いを実現します。
ここでは、先程 proto で定義した ReceiveMoney
と PaymentMoney
を受け取った場合に、Wallet がもつ残高を加算・減算するようにします。
また、proto.actor の永続化機構を利用するために、persistence.Mixin
を埋め込んでいます。
package models
import (
"fmt"
"money-actor/models/proto"
"github.com/asynkron/protoactor-go/actor"
"github.com/asynkron/protoactor-go/persistence"
)
type WalletActor struct {
proto.Wallet
persistence.Mixin
}
func (w *WalletActor) Receive(c actor.Context) {
switch msg := c.Message().(type) {
case *proto.ReceiveMoney:
{
w.Deposit += int(msg.Amount)
if !w.Recovering() {
w.PersistReceive(msg)
}
fmt.Printf("current deposit: %d\n", w.Deposit)
}
case *proto.PaymentMoney:
{
w.Deposit -= int(msg.Amount)
if !w.Recovering() {
w.PersistReceive(msg)
}
fmt.Printf("current deposit: %d\n", w.Deposit)
}
}
}
var _ actor.Actor = (*WalletActor)(nil)
models/wallet_provider.go
永続化手法の機構を決定するための provider です。
persistence.Provider
は、Actor の永続化に関する機構を提供するためのインターフェースです。
ここについては後ほど詳しく触れますが、 まずは動かすことを優先するため、 一旦組み込みの InMemoryProvider
を使用します。
InMemoryProvider
はその名の通り、メモリ上にデータを保存するため、プログラムが終了するとデータは揮発してしまいます。
package models
import "github.com/asynkron/protoactor-go/persistence"
type WalletProvider struct {
}
func (w WalletProvider) GetState() persistence.ProviderState {
return persistence.NewInMemoryProvider(5)
}
var _ persistence.Provider = (*WalletProvider)(nil)
main.go
ここまで来れば、あとは main.go
を実装するのみです。
WalletActor
の永続化機構としてWalletProvider
を指定した上で、Actor を作成- 作成した Actor に対して、入金 / 出金のイベントである
ReceiveMoney
/PaymentMoney
を送信
という一連のシナリオを記述したプログラムを実装します。
package main
import (
"money-actor/models"
"money-actor/models/proto"
console "github.com/asynkron/goconsole"
"github.com/asynkron/protoactor-go/actor"
"github.com/asynkron/protoactor-go/persistence"
)
func main() {
sys := actor.NewActorSystem()
props := actor.PropsFromProducer(func() actor.Actor {
// props が spawn されたら、新規の models.WalletActor が生成される
return &models.WalletActor{}
}, actor.WithReceiverMiddleware(
// models.WalletProvider を永続化機構として選択する
persistence.Using(
&models.WalletProvider{},
),
))
pid := sys.Root.Spawn(props)
// 12000円の入金
sys.Root.Send(pid, &proto.ReceiveMoney{Amount: 12000})
// 4000円の入金
sys.Root.Send(pid, &proto.ReceiveMoney{Amount: 4000})
// 3000円の入金
sys.Root.Send(pid, &proto.ReceiveMoney{Amount: 3000})
// 8200円の出金
sys.Root.Send(pid, &proto.PaymentMoney{Amount: 8200})
_, _ = console.ReadLine()
}
実行
この状態で go run main.go
を実行すると、下記のような結果が得られます。
$ go run main.go
current deposit: 12000
current deposit: 16000
current deposit: 19000
current deposit: 10800
Actor モデルに馴染みのない方は戸惑いのあるプログラム構造だったとは思われますが、
実際の動作としては、受け取ったイベントに応じて、deposit
の値を更新し、それをコンソールに出力しているだけ、という雰囲気を感じ取ってもらえたと思います。
永続化の実装
ここまでで、実際に Actor Model を利用したシンプルなプログラムを実装しました。
ただ、この状態では永続化層として InMemoryProvider
を利用しているため、プログラムを再起動すると、deposit
の値が失われてしまいます。
そこで、永続化層として SQLite を利用するように変更していきましょう。
SQLite の準備
下記のテーブル構造をもつ SQLite Database を作成します。
# Wallet に対する Event を格納するテーブル
CREATE TABLE wallet_event (
user_id TEXT NOT NULL,
version INT NOT NULL,
event_name TEXT NOT NULL,
event_value INT NOT NULL,
timestamp DATETIME NOT NULL,
primary key (user_id, version)
);
# Wallet の Snapshot を格納するテーブル
create table wallet_snapshot (
user_id TEXT PRIMARY KEY NOT NULL,
saved_version INT NOT NULL,
deposit INT NOT NULL
);
現在のところ Event として定義している ReceiveMoney
/ PaymentMoney
は、どちらも amount
という数値のみを値として持っています。
そのため、event_value
を int 型で定義し、event_name
field で両者の区分けが付くように定義します。
wallet_snapshot
はその名の通り、あるバージョン時点での wallet の状態を保存するためのテーブルです。 (前回記事の Snapshot Approach
に相当するものです)
こちらは素直に 特定時点での deposit
の値を示す deposit
field と、その時点でのバージョンを示す saved_version
field を定義します。
永続化層の実装
models/sqlite_provider.go
永続化層として SQLite を利用するために、まずは persistence.Provider
を実装する SQLiteProvider
を定義します。
実装にあたっては、ORM として sqlboiler
を利用します。
package models
import (
"context"
"database/sql"
"time"
"google.golang.org/protobuf/proto"
mproto "money-actor/models/proto"
"money-actor/sqlite3"
_ "github.com/mattn/go-sqlite3"
"github.com/asynkron/protoactor-go/persistence"
"github.com/volatiletech/sqlboiler/v4/boil"
"github.com/volatiletech/sqlboiler/v4/queries/qm"
)
type SQLiteProvider struct {
ctx context.Context
conn boil.ContextExecutor
interval int
userID string
}
func NewSQLiteProvider(userID string) *SQLiteProvider {
db, err := sql.Open("sqlite3", "./db.sqlite3")
if err != nil {
panic(err)
}
return &SQLiteProvider{
ctx: context.Background(),
interval: 5,
conn: db,
userID: userID,
}
}
func (s *SQLiteProvider) GetSnapshot(actorName string) (snapshot interface{}, eventIndex int, ok bool) {
snap := &mproto.Wallet{
Deposit: 0,
}
wallet, err := sqlite3.WalletSnapshots(sqlite3.WalletSnapshotWhere.UserID.EQ(s.userID)).One(s.ctx, s.conn)
if err != nil {
// Snapshot を DB から取得できなかった
return snap, 1, true
}
// DB の Snapshot の値を使用する
snap.Deposit = int32(wallet.Deposit)
return snap, int(wallet.SavedVersion + 1), true
}
func (s *SQLiteProvider) PersistSnapshot(actorName string, snapshotIndex int, snapshot proto.Message) {
wallet := snapshot.(*mproto.Wallet)
snap := &sqlite3.WalletSnapshot{
UserID: s.userID,
SavedVersion: int64(snapshotIndex),
Deposit: int64(wallet.Deposit),
}
_ = snap.Upsert(s.ctx, s.conn, true, []string{"user_id"}, boil.Infer(), boil.Infer())
}
func (s *SQLiteProvider) GetEvents(actorName string, eventIndexStart int, eventIndexEnd int, callback func(e interface{})) {
conditions := []qm.QueryMod{
sqlite3.WalletEventWhere.UserID.EQ(s.userID),
}
if eventIndexStart != 0 {
conditions = append(conditions, sqlite3.WalletEventWhere.Version.GTE(int64(eventIndexStart)))
}
if eventIndexEnd != 0 {
conditions = append(conditions, sqlite3.WalletEventWhere.Version.LT(int64(eventIndexEnd)))
}
events, err := sqlite3.WalletEvents(conditions...).All(s.ctx, s.conn)
if err != nil {
return
}
for _, event := range events {
switch event.EventName {
case "receive_money":
callback(&mproto.ReceiveMoney{Amount: int32(event.EventValue)})
case "payment_money":
callback(&mproto.PaymentMoney{Amount: int32(event.EventValue)})
}
}
}
func (s *SQLiteProvider) PersistEvent(actorName string, eventIndex int, event proto.Message) {
wallet := &sqlite3.WalletEvent{
UserID: s.userID,
Version: int64(eventIndex),
Timestamp: time.Now(),
}
switch event.(type) {
case *mproto.ReceiveMoney:
rcv := event.(*mproto.ReceiveMoney)
wallet.EventName = "receive_money"
wallet.EventValue = int64(rcv.Amount)
case *mproto.PaymentMoney:
pay := event.(*mproto.PaymentMoney)
wallet.EventName = "payment_money"
wallet.EventValue = int64(pay.Amount)
}
_ = wallet.Insert(s.ctx, s.conn, boil.Infer())
}
func (s *SQLiteProvider) GetSnapshotInterval() int {
return s.interval
}
// 以下は実装しない
func (s *SQLiteProvider) DeleteSnapshots(actorName string, inclusiveToIndex int) {
}
func (s *SQLiteProvider) DeleteEvents(actorName string, inclusiveToIndex int) {
}
func (s *SQLiteProvider) Restart() {
}
var _ persistence.ProviderState = (*SQLiteProvider)(nil)
長くなってしまいましたが、やっていることとしてはシンプルです。実際には以下のような処理を行っています。
GetSnapshot
( Snapshot を DB から復元 )- Snapshot を DB から取得し、
proto.Wallet
に変換します。Snapshot が存在しない場合は、初期値のproto.Wallet
を返します。
- Snapshot を DB から取得し、
PersistSnapshot
( 現在時点の Snapshot を DB に保存 )- 受け取った
proto.Wallet
を元に、Snapshot を DB に保存します。
- 受け取った
GetEvents
( Event 履歴を DB から復元 )- eventIndex を元に、特定範囲のイベントを DB より取得します。
wallet_event
のevent_name
の値に応じて、作成する proto の型を変更します。
- eventIndex を元に、特定範囲のイベントを DB より取得します。
PersistEvent
( 発生した Event を DB に保存 )- 受け取ったイベントを DB に永続化します。 proto の型に応じて、
wallet_event
のevent_name
の値を変更します。
- 受け取ったイベントを DB に永続化します。 proto の型に応じて、
また、GetSnapshotInterval
は、Snapshot が保存される間隔を返すメソッドです。
今回は 5 件イベントが発行されるごとに Snapshot を保存するようにしています。
models/wallet.go
上記にて、Event / Snapshot と DB との関連付けの実装が完了しました。
続いて、WalletActor
に対し、Snapshot からの状態復帰ができるように実装を修正します。
package models
import (
"fmt"
"money-actor/models/proto"
"github.com/asynkron/protoactor-go/actor"
"github.com/asynkron/protoactor-go/persistence"
)
type WalletActor struct {
proto.Wallet
persistence.Mixin
}
func (w *WalletActor) Receive(c actor.Context) {
switch msg := c.Message().(type) {
case *proto.ReceiveMoney:
{
w.Deposit += msg.Amount
if w.Recovering() {
fmt.Printf("replay wallet(receive): %v\n", msg)
} else {
w.PersistReceive(msg)
}
fmt.Printf("current deposit: %d\n", w.Deposit)
}
case *proto.PaymentMoney:
{
w.Deposit -= msg.Amount
if w.Recovering() {
fmt.Printf("replay wallet(payment): %v\n", msg)
} else {
w.PersistReceive(msg)
}
fmt.Printf("current deposit: %d\n", w.Deposit)
}
case *proto.Wallet:
{
fmt.Printf("recovering wallet from snapshot: %v\n", msg)
w.Deposit = msg.Deposit
}
case *persistence.RequestSnapshot:
{
fmt.Printf("snapshot requested, current deposit: %d\n", w.Deposit)
w.PersistSnapshot(&w.Wallet)
}
case *persistence.ReplayComplete:
{
fmt.Printf("replay complete, current deposit: %d\n", w.Deposit)
}
}
}
var _ actor.Actor = (*WalletActor)(nil)
先程の SQLiteProvider
で実装した GetSnapshot
では、戻り値として *proto.Wallet
を設定していました。そのため、Snapshot からの復元を定義するために、*proto.Wallet
との型マッチが必要になっています。
*persistence.RequestSnapshot
は、Snapshot の作成を要求するメッセージです。このメッセージを受け取ったタイミングで、PersistSnapshot
を呼び出すことで Snapshot の保存が可能になります。
*persistence.ReplayComplete
は、Snapshot および Event の再適用が完了したことを示すメッセージです。今回は再適用が完了したタイミングで、ログを流すようにしています。
models/wallet_provider.go
上記が完了したら、wallet_provider.go
の実装を InMemoryProvider
から SQLiteProvider
に変更します。
package models
import "github.com/asynkron/protoactor-go/persistence"
type WalletProvider struct {
}
func (w WalletProvider) GetState() persistence.ProviderState {
// return persistence.NewInMemoryProvider(5)
return NewSQLiteProvider("user_id")
}
var _ persistence.Provider = (*WalletProvider)(nil)
再実行
上記で SQLite を利用した永続化が実装できました。
早速、実行して動作を確認してみましょう。
1 回目
$ go run main.go
recovering wallet from snapshot:
replay complete, current deposit: 0
-----
current deposit: 12000
current deposit: 16000
current deposit: 19000
current deposit: 10800
ここまでは、InMemoryProvider
を使用していた時と同じ結果です。
2 回目
$ go run main.go
recovering wallet from snapshot:
replay wallet(receive): amount:12000
current deposit: 12000
replay wallet(receive): amount:4000
current deposit: 16000
replay wallet(receive): amount:3000
current deposit: 19000
replay wallet(payment): amount:8200
current deposit: 10800
replay complete, current deposit: 10800
-----
snapshot requested, current deposit: 22800
current deposit: 22800
current deposit: 26800
current deposit: 29800
current deposit: 21600
前回はなかった、replay
というログが出力されていますね。これは、DB に保存された Event が状態導出のため、再適用されていることを示しています。
2 回目の実行では、
- Snapshot は存在しない
- 過去のイベントとして、4 件の入出金がある
状態となっています。 そのため、
- 4 件のイベントの再適用が行われ、前回の deposit の値である
10800
が導出された - その後、再度 main.go に定義している 4 件の入出金が行われた
- その結果、最新の deposit の値として
21600
が導出された
という流れになります。また、5 件目のイベントが発生したタイミングで、Snapshot も作成されています。
3 回目
$ go run main.go
recovering wallet from snapshot: deposit:22800
replay wallet(receive): amount:4000
current deposit: 26800
replay wallet(receive): amount:3000
current deposit: 29800
replay wallet(payment): amount:8200
current deposit: 21600
replay complete, current deposit: 21600
-----
current deposit: 33600
snapshot requested, current deposit: 37600
current deposit: 37600
current deposit: 40600
current deposit: 32400
最後に、もう一度だけ実行してみましょう。 3 回目の実行時においては、
- deposit が
22800
の時点での Snapshot が存在する - Snapshot を保存したタイミングより後に、3 件の入出金が行われている
状態になっています。
そのため、
- Snapshot から deposit:
22800
の状態が復元された - Snapshot 取得後に発生した 3 件のイベントを再適用することで、前回の deposit である
21600
が導出された - その後、再度 main.go に定義している 4 件の入出金が行われた
- その結果、最新の deposit の値として
32400
が導出された
ことがわかると思います。
Event Sourcing パターンではイベント量が増えるに伴って最新の状態を導出するためのコストは高まっていきますが、Snapshot Approach を取ることによって、最新状態導出のためのコストを抑えることができています。
まとめ
本記事では、Event Sourcing を適用したプログラムについて、golang / proto.actor を利用した実装例を示しました。
実際に Snapshot Approach がどのような振る舞いをするのかというところも合わせて紹介を行いました。
Event Sourcing を用いた実装のイメージとして、参考にしていただければ幸いです。
最後に、Belong Inc. では我々と一緒にサービスの成長にコミットしてくれるメンバーを募集中です!
ぜひ エンジニアリングチーム紹介ページ をご覧いただけたら幸いです。
Footnotes
-
Actor Model でのプログラミングは Event Sourcing 実現にあたって必須ではないが、ライブラリ的にサポートされているため、シンプルに実装できる ↩