Event Sourcing & CQRS trong Laravel: Giới Thiệu Thực Hành

· 10 min read

CRUD truyền thống ghi đè dữ liệu. Khi bạn update email user, email cũ biến mất mãi mãi. Event Sourcing đảo ngược: thay vì lưu trạng thái hiện tại, bạn lưu mọi thay đổi đã xảy ra. Trạng thái hiện tại được tính bằng cách replay tất cả events.

CRUD vs Event Sourcing

CRUD

UPDATE users SET email = 'new@example.com' WHERE id = 1;
-- Email cũ? Mất vĩnh viễn. Tại sao thay đổi? Không biết.

Event Sourcing

Event 1: UserRegistered { email: 'old@example.com', name: 'John' }
Event 2: EmailChanged   { old: 'old@example.com', new: 'new@example.com', reason: 'typo' }
Event 3: NameChanged     { old: 'John', new: 'John Doe' }

Trạng thái hiện tại = replay tất cả → { email: 'new@example.com', name: 'John Doe' }

Khi Nào Nên Dùng

Phù hợp Không phù hợp
Giao dịch tài chính CRUD đơn giản (blog posts)
Hệ thống đặt hàng/fulfillment User preferences
Domain cần audit (y tế, pháp lý) Content management
Workflow business phức tạp Prototype/MVP

Thiết Lập với Spatie Event Sourcing

composer require spatie/laravel-event-sourcing
php artisan vendor:publish --provider="Spatie\EventSourcing\EventSourcingServiceProvider" --tag="event-sourcing-migrations"
php artisan migrate

Core Concepts

1. Events — Điều Gì Đã Xảy Ra

namespace App\Domain\Account\Events;

use Spatie\EventSourcing\StoredEvents\ShouldBeStored;

class AccountCreated extends ShouldBeStored
{
    public function __construct(
        public readonly string $name,
        public readonly string $userId,
        public readonly string $currency = 'USD',
    ) {}
}

class MoneyDeposited extends ShouldBeStored
{
    public function __construct(
        public readonly int $amountInCents,
        public readonly string $description,
    ) {}
}

class MoneyWithdrawn extends ShouldBeStored
{
    public function __construct(
        public readonly int $amountInCents,
        public readonly string $description,
    ) {}
}

2. Aggregates — Business Rules

Aggregates enforce business rules trước khi ghi events:

namespace App\Domain\Account;

use Spatie\EventSourcing\AggregateRoots\AggregateRoot;

class AccountAggregate extends AggregateRoot
{
    private int $balanceInCents = 0;

    public function applyMoneyDeposited(MoneyDeposited $event): void
    {
        $this->balanceInCents += $event->amountInCents;
    }

    public function applyMoneyWithdrawn(MoneyWithdrawn $event): void
    {
        $this->balanceInCents -= $event->amountInCents;
    }

    public function deposit(int $amountInCents, string $description): self
    {
        if ($amountInCents <= 0) {
            throw new \InvalidArgumentException('Số tiền nạp phải dương');
        }

        $this->recordThat(new MoneyDeposited(
            amountInCents: $amountInCents,
            description: $description,
        ));

        return $this;
    }

    public function withdraw(int $amountInCents, string $description): self
    {
        if ($this->balanceInCents - $amountInCents < 0) {
            throw new \DomainException("Số dư không đủ. Hiện tại: {$this->balanceInCents}");
        }

        $this->recordThat(new MoneyWithdrawn(
            amountInCents: $amountInCents,
            description: $description,
        ));

        return $this;
    }
}

3. Projectors — Read Models

Projectors lắng nghe events và xây dựng read models:

namespace App\Domain\Account\Projectors;

use Spatie\EventSourcing\EventHandlers\Projectors\Projector;

class AccountProjector extends Projector
{
    public function onAccountCreated(AccountCreated $event): void
    {
        Account::create([
            'uuid' => $event->aggregateRootUuid(),
            'name' => $event->name,
            'balance_in_cents' => 0,
        ]);
    }

    public function onMoneyDeposited(MoneyDeposited $event): void
    {
        Account::where('uuid', $event->aggregateRootUuid())
            ->increment('balance_in_cents', $event->amountInCents);
    }
}

4. Reactors — Side Effects

Reactors xử lý side effects (emails, notifications) KHÔNG được replay:

class SendLowBalanceAlert extends Reactor
{
    public function onMoneyWithdrawn(MoneyWithdrawn $event): void
    {
        $account = Account::where('uuid', $event->aggregateRootUuid())->first();

        if ($account->balance_in_cents < 1000) {
            $account->user->notify(new LowBalanceNotification($account));
        }
    }
}

CQRS: Tách Read và Write

CQRS (Command Query Responsibility Segregation) tách biệt hoàn toàn write model (Aggregates + Events) và read model (Projections). Đây là partner tự nhiên của Event Sourcing.

Write Path:                          Read Path:
Controller → Aggregate → Events  →  Projector → Read Model (DB table)
                                                      ↓
                                              Controller → API Response

Tại sao tách? Write model tối ưu cho business rules (invariants, validation). Read model tối ưu cho queries (denormalized, fast). Bạn có thể có nhiều read models khác nhau cho cùng events.

Ví dụ: Nhiều Projections Từ Cùng Events

// Projection 1: Account balance (cho trang account detail)
class AccountProjector extends Projector
{
    public function onMoneyDeposited(MoneyDeposited $event): void
    {
        Account::where('uuid', $event->aggregateRootUuid())
            ->increment('balance_in_cents', $event->amountInCents);
    }
}

// Projection 2: Transaction history (cho trang lịch sử giao dịch)
class TransactionHistoryProjector extends Projector
{
    public function onMoneyDeposited(MoneyDeposited $event): void
    {
        TransactionLog::create([
            'account_uuid' => $event->aggregateRootUuid(),
            'type' => 'deposit',
            'amount_in_cents' => $event->amountInCents,
            'description' => $event->description,
            'created_at' => $event->createdAt(),
        ]);
    }

    public function onMoneyWithdrawn(MoneyWithdrawn $event): void
    {
        TransactionLog::create([
            'account_uuid' => $event->aggregateRootUuid(),
            'type' => 'withdrawal',
            'amount_in_cents' => $event->amountInCents,
            'description' => $event->description,
            'created_at' => $event->createdAt(),
        ]);
    }
}

// Projection 3: Daily summary (cho report dashboard)
class DailySummaryProjector extends Projector
{
    public function onMoneyDeposited(MoneyDeposited $event): void
    {
        DailySummary::updateOrCreate(
            ['date' => $event->createdAt()->toDateString(), 'type' => 'deposit'],
            ['total_amount' => DB::raw("total_amount + {$event->amountInCents}"),
             'count' => DB::raw('count + 1')],
        );
    }
}

Ba projections khác nhau, tối ưu cho ba use cases khác nhau, tất cả từ cùng events. Đây là sức mạnh thực sự của CQRS — bạn không cần compromise giữa write performance và read performance.

Sử Dụng Trong Controller

class AccountController extends Controller
{
    public function deposit(Request $request, string $uuid)
    {
        $request->validate([
            'amount' => 'required|numeric|min:0.01',
            'description' => 'required|string|max:255',
        ]);

        AccountAggregate::retrieve($uuid)
            ->deposit(
                amountInCents: (int) ($request->amount * 100),
                description: $request->description,
            )
            ->persist();

        return back()->with('success', 'Nạp tiền thành công');
    }
}

Giải thích flow:

  1. AccountAggregate::retrieve($uuid) — load tất cả events cho account này, replay để rebuild state hiện tại
  2. ->deposit(...) — validate business rules (số tiền > 0) rồi record event MoneyDeposited
  3. ->persist() — lưu events vào stored_events table. Projectors tự động chạy sau khi persist.

Spatie Event Sourcing lưu events trong bảng stored_events (JSON-serialized). Đây là source of truth — bảng accounts chỉ là read model.

Snapshots: Tối Ưu Performance

Khi account có hàng nghìn events, replay tất cả rất chậm. Snapshots lưu state tại một điểm, chỉ replay events sau snapshot:

// config/event-sourcing.php
'snapshot_interval' => 100, // Tạo snapshot mỗi 100 events

// Hoặc trong Aggregate
class AccountAggregate extends AggregateRoot
{
    // Snapshot được tạo tự động mỗi khi aggregate có 100+ events chưa snapshot
}
# Tạo snapshot thủ công
php artisan event-sourcing:create-snapshot App\\Domain\\Account\\AccountAggregate --uuid=account-uuid

Khi nào cần snapshots: Khi aggregate có nhiều events (>100). Cho account ít giao dịch, không cần — overhead không đáng.

Event Versioning

Events là immutable — một khi lưu, không thay đổi. Nhưng business cần thay đổi schema. Giải pháp: versioning.

// Version 1: Ban đầu
class MoneyDeposited extends ShouldBeStored
{
    public function __construct(
        public readonly int $amountInCents,
        public readonly string $description,
    ) {}
}

// Version 2: Thêm field currency (v2 events sẽ có currency, v1 thì không)
class MoneyDeposited extends ShouldBeStored
{
    public function __construct(
        public readonly int $amountInCents,
        public readonly string $description,
        public readonly string $currency = 'USD', // Default cho events cũ
    ) {}
}

Tip: Luôn dùng default values cho fields mới. Events cũ trong database không có field mới → PHP dùng default khi deserialize.

Cho thay đổi lớn hơn (đổi tên field, xóa field), dùng event upcasters:

// config/event-sourcing.php
'event_class_map' => [
    'money_deposited_v1' => MoneyDeposited::class,
],

Replay Events

Khi bạn thay đổi projector logic hoặc thêm projector mới, replay events để rebuild read models:

# Replay tất cả events qua một projector cụ thể
php artisan event-sourcing:replay App\\Domain\\Account\\Projectors\\AccountProjector

# Replay tất cả projectors
php artisan event-sourcing:replay

Đây là siêu năng lực của Event Sourcing: Bạn thêm projector mới cho dashboard analytics. Replay tất cả events từ đầu → dashboard có dữ liệu lịch sử đầy đủ mà không cần migration hay backfill phức tạp.

Lưu ý quan trọng: Projectors replay được, Reactors thì KHÔNG. Vì Reactors gửi emails, notifications — bạn không muốn gửi lại hàng nghìn email khi replay.

Cấu Trúc Thư Mục Khuyến Nghị

app/Domain/
├── Account/
│   ├── Events/
│   │   ├── AccountCreated.php
│   │   ├── MoneyDeposited.php
│   │   └── MoneyWithdrawn.php
│   ├── Projectors/
│   │   ├── AccountProjector.php
│   │   └── TransactionHistoryProjector.php
│   ├── Reactors/
│   │   └── SendLowBalanceAlert.php
│   ├── AccountAggregate.php
│   └── Exceptions/
│       └── InsufficientFundsException.php
├── Order/
│   ├── Events/
│   ├── Projectors/
│   └── OrderAggregate.php

Tổ chức theo domain (Account, Order), không theo kiểu (Events, Projectors). Mỗi domain chứa đủ components của nó.

Testing

Spatie cung cấp API testing rất elegant cho aggregates:

// Test business rules
public function test_cannot_withdraw_more_than_balance(): void
{
    AccountAggregate::fake()
        ->given([
            new AccountCreated(name: 'Test', userId: '1'),
            new MoneyDeposited(amountInCents: 5000, description: 'Initial'),
        ])
        ->when(function (AccountAggregate $aggregate) {
            $aggregate->withdraw(10000, 'Quá nhiều');
        })
        ->assertNotRecorded(MoneyWithdrawn::class);
}

// Test event đúng được ghi
public function test_deposit_records_event(): void
{
    AccountAggregate::fake()
        ->given([
            new AccountCreated(name: 'Test', userId: '1'),
        ])
        ->when(function (AccountAggregate $aggregate) {
            $aggregate->deposit(5000, 'Lương tháng 4');
        })
        ->assertRecorded([
            new MoneyDeposited(amountInCents: 5000, description: 'Lương tháng 4'),
        ]);
}

// Test projector
public function test_projector_creates_account(): void
{
    event(new AccountCreated(
        name: 'Test Account',
        userId: '1',
        aggregateRootUuid: $uuid = Str::uuid()->toString(),
    ));

    $this->assertDatabaseHas('accounts', [
        'uuid' => $uuid,
        'name' => 'Test Account',
        'balance_in_cents' => 0,
    ]);
}

Giải thích pattern given/when/then:

  • given() — setup state bằng cách apply events. Không chạy projectors.
  • when() — thực hiện action cần test
  • assertRecorded() — kiểm tra event nào được ghi
  • assertNotRecorded() — kiểm tra event KHÔNG được ghi

Gotchas & Bài Học

1. Đừng Query Trong Aggregates

// ❌ Sai — aggregate không nên query database
public function deposit(int $amount): self
{
    $user = User::find($this->userId); // KHÔNG!
    // ...
}

// ✅ Đúng — truyền dữ liệu cần thiết qua parameters
public function deposit(int $amount, string $description): self
{
    // Chỉ dùng internal state
}

Aggregates là pure — chỉ dùng state từ events, không query bên ngoài.

2. Events Nên Nhỏ và Cụ Thể

// ❌ Quá chung chung
class AccountUpdated extends ShouldBeStored {
    public function __construct(public array $changes) {}
}

// ✅ Cụ thể — mỗi event một ý nghĩa
class MoneyDeposited extends ShouldBeStored { ... }
class MoneyWithdrawn extends ShouldBeStored { ... }
class AccountNameChanged extends ShouldBeStored { ... }

3. Eventual Consistency

Projectors chạy async (nếu cấu hình queue). Read model có thể trễ vài milliseconds. UI cần handle điều này — hiển thị optimistic update hoặc polling.

Kết Luận

Event Sourcing mạnh mẽ nhưng không miễn phí. Nó thêm complexity đáng kể — event versioning, eventual consistency, replay management. Chỉ dùng khi bạn thực sự cần:

  1. Audit trail đầy đủ — mọi thay đổi được ghi lại vĩnh viễn
  2. Replay/rebuild state — thêm projections mới, debug bằng cách replay
  3. Business rules phức tạp — aggregates enforce invariants rõ ràng
  4. Temporal queries — "state tại thời điểm X là gì?"

Cho hầu hết CRUD apps, Eloquent truyền thống là đủ. Event Sourcing phù hợp nhất cho domain tài chính, e-commerce, và hệ thống cần compliance.

Bắt đầu nhỏ: Event source một aggregate (Order, Account), giữ phần còn lại CRUD. Không cần all-or-nothing.

Bình luận