Series Introduction

This series covers how to build a production-ready event-driven architecture.

  1. Part 1: Event Sourcing Fundamentals (this post)
  2. Part 2: Implementing the Outbox Pattern
  3. Part 3: CQRS with Separate Read/Write Models
  4. Part 4: Saga Pattern for Distributed Transactions
  5. Part 5: Event Schema Evolution and Versioning

What is Event Sourcing?

Event Sourcing is a pattern that stores application state as a sequence of events. Instead of storing the current state directly, it stores every event that changed the state, in order, and derives the current state by replaying them.
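To make the idea concrete, here is a minimal sketch that is independent of the order example used later in this post. The names `AccountEvent`, `Deposited`, `Withdrawn`, and `currentBalance` are hypothetical and exist only for illustration. The balance is never stored; it is derived by folding over the event list.

```kotlin
import java.math.BigDecimal

// Hypothetical events for illustration; not part of the order example below.
sealed interface AccountEvent
data class Deposited(val amount: BigDecimal) : AccountEvent
data class Withdrawn(val amount: BigDecimal) : AccountEvent

// The current state is derived by replaying every event in order.
fun currentBalance(events: List<AccountEvent>): BigDecimal =
    events.fold(BigDecimal.ZERO) { balance, event ->
        when (event) {
            is Deposited -> balance + event.amount
            is Withdrawn -> balance - event.amount
        }
    }
```

Calling `currentBalance` with a deposit of 100 followed by a withdrawal of 30 yields 70, and the two events remain available as the record of how that number came about.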

Why Event Sourcing?

Problems with the traditional CRUD approach:

  • Only the current state is visible, not how the system arrived at it
  • Audit logs must be maintained separately
  • Business insights are hard to extract because intermediate states are overwritten

Benefits of Event Sourcing:

  • Complete audit trail
  • Time-travel debugging: the state at any past point can be reconstructed
  • State recovery through event replay
  • Analytics based on business events
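The time-travel benefit can be sketched in a few lines. This is an illustration under simplified assumptions, not code from the order system: `StatusChanged` and `statusAsOf` are hypothetical names. The function replays only the events that had occurred by the requested instant.

```kotlin
import java.time.Instant

// Hypothetical event type for illustration only.
data class StatusChanged(val newStatus: String, val occurredAt: Instant)

// Rebuilds the status as it was at `pointInTime` by considering only the
// events that had occurred by then. Returns null if nothing had happened yet.
fun statusAsOf(events: List<StatusChanged>, pointInTime: Instant): String? =
    events
        .sortedBy { it.occurredAt }
        .takeWhile { !it.occurredAt.isAfter(pointInTime) }
        .lastOrNull()
        ?.newStatus
```

With a CRUD table that stores only the latest status, the same question ("what was the status last Tuesday?") cannot be answered without a separate history table.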

Core Concepts

1. Event Store

The event store is the repository that persists events. Events are immutable and are only ever appended; existing events are never updated or deleted.

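import jakarta.persistence.* // javax.persistence.* on Spring Boot 2.x
import java.time.Instant

@Entity
@Table(
    name = "event_store",
    // One row per (aggregate, version), so a concurrent writer fails on insert
    // instead of forking the stream. The column names are an assumption based on
    // Spring Boot's default snake_case naming strategy.
    uniqueConstraints = [UniqueConstraint(columnNames = ["aggregate_id", "version"])]
)
class StoredEvent(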
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    val id: Long? = null,

    @Column(nullable = false)
    val aggregateId: String,

    @Column(nullable = false)
    val aggregateType: String,

    @Column(nullable = false)
    val eventType: String,

    @Column(columnDefinition = "TEXT", nullable = false)
    val payload: String,

    @Column(nullable = false)
    val version: Long,

    @Column(nullable = false)
    val occurredAt: Instant = Instant.now()
)

2. Aggregate Root

The aggregate root is the entry point of the domain model. It raises new events and applies them to its own state.

abstract class AggregateRoot {

    @Transient
    private val pendingEvents = mutableListOf<DomainEvent>()

    var version: Long = 0
        protected set

    protected fun applyChange(event: DomainEvent) {
        applyEvent(event)
        pendingEvents.add(event)
    }

    protected abstract fun applyEvent(event: DomainEvent)

    fun getPendingEvents(): List<DomainEvent> = pendingEvents.toList()

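    // Call this after the pending events have been persisted. Advancing the
    // version here keeps it in step with the store, so a second save on the
    // same instance still passes the expected-version check.
    fun clearPendingEvents() {
        version += pendingEvents.size
        pendingEvents.clear()
    }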

    fun loadFromHistory(events: List<DomainEvent>) {
        events.forEach { event ->
            applyEvent(event)
            version++
        }
    }
}
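The difference between raising a new event and replaying history is easy to miss, so here is a trimmed-down, self-contained sketch of it. `Cart` and `ItemAdded` are hypothetical names used only for this illustration and do not appear elsewhere in the post. New behaviour changes state and queues the event; replay changes state only, because those events are already stored.

```kotlin
// Hypothetical, trimmed-down aggregate for illustration only.
data class ItemAdded(val name: String)

class Cart {
    private val pending = mutableListOf<ItemAdded>()
    val items = mutableListOf<String>()
    var version: Long = 0
        private set

    // New behaviour: mutate state AND queue the event for persistence.
    fun addItem(name: String) {
        val event = ItemAdded(name)
        applyEvent(event)
        pending.add(event)
    }

    // Replay: mutate state only; these events are already persisted.
    fun loadFromHistory(events: List<ItemAdded>) {
        events.forEach { event ->
            applyEvent(event)
            version++
        }
    }

    fun pendingEvents(): List<ItemAdded> = pending.toList()

    private fun applyEvent(event: ItemAdded) {
        items.add(event.name)
    }
}
```

After `loadFromHistory`, the cart has state and a version but no pending events. After a subsequent `addItem`, exactly one event is waiting to be saved.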

3. Domain Event

A domain event is an immutable object that represents a fact that has occurred in the business domain.

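import java.math.BigDecimal
import java.time.Instant

interface DomainEvent {
    val aggregateId: String
    val occurredAt: Instant
}

// OrderItem is not defined in the original post; this shape is an assumption
// inferred from how Order.create uses it (price * quantity).
data class OrderItem(
    val productId: String,
    val price: BigDecimal,
    val quantity: Int
)

data class OrderCreated(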
    override val aggregateId: String,
    val customerId: String,
    val items: List<OrderItem>,
    val totalAmount: BigDecimal,
    override val occurredAt: Instant = Instant.now()
) : DomainEvent

data class OrderShipped(
    override val aggregateId: String,
    val shippingAddress: String,
    val trackingNumber: String,
    override val occurredAt: Instant = Instant.now()
) : DomainEvent

Practical Implementation Example: Order System

Order Aggregate

class Order private constructor() : AggregateRoot() {

    lateinit var id: String
        private set
    lateinit var customerId: String
        private set
    var status: OrderStatus = OrderStatus.CREATED
        private set
    var items: List<OrderItem> = emptyList()
        private set
    var totalAmount: BigDecimal = BigDecimal.ZERO
        private set

    companion object {
        fun create(
            orderId: String,
            customerId: String,
            items: List<OrderItem>
        ): Order {
            val order = Order()
            val totalAmount = items.sumOf { it.price * it.quantity.toBigDecimal() }

            order.applyChange(
                OrderCreated(
                    aggregateId = orderId,
                    customerId = customerId,
                    items = items,
                    totalAmount = totalAmount
                )
            )
            return order
        }

        fun fromHistory(events: List<DomainEvent>): Order {
            val order = Order()
            order.loadFromHistory(events)
            return order
        }
    }

    fun ship(shippingAddress: String, trackingNumber: String) {
        require(status == OrderStatus.CREATED) {
            "Order must be in CREATED status to ship"
        }
        applyChange(
            OrderShipped(
                aggregateId = id,
                shippingAddress = shippingAddress,
                trackingNumber = trackingNumber
            )
        )
    }

    override fun applyEvent(event: DomainEvent) {
        when (event) {
            is OrderCreated -> {
                id = event.aggregateId
                customerId = event.customerId
                items = event.items
                totalAmount = event.totalAmount
                status = OrderStatus.CREATED
            }
            is OrderShipped -> {
                status = OrderStatus.SHIPPED
            }
        }
    }
}

enum class OrderStatus {
    CREATED, SHIPPED, DELIVERED, CANCELLED
}

Event Store Repository

interface EventStoreRepository : JpaRepository<StoredEvent, Long> {

    fun findByAggregateIdOrderByVersionAsc(aggregateId: String): List<StoredEvent>

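    // @Param binds the named query parameter explicitly, so the query does not
    // depend on parameter names being retained at compile time.
    @Query("SELECT MAX(e.version) FROM StoredEvent e WHERE e.aggregateId = :aggregateId")
    fun findLatestVersion(@Param("aggregateId") aggregateId: String): Long?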
}

@Service
class EventStore(
    private val repository: EventStoreRepository,
    private val objectMapper: ObjectMapper
) {

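    // Write all events of one save in a single transaction so that a failure
    // midway cannot leave a partially appended batch.
    @Transactional
    fun saveEvents(
        aggregateId: String,
        aggregateType: String,
        events: List<DomainEvent>,
        expectedVersion: Long
    ) {
        // Versions start at 1, so 0 means the aggregate has no events yet.
        val currentVersion = repository.findLatestVersion(aggregateId) ?: 0L

        // This read-then-write check is not atomic by itself: two concurrent writers
        // can both pass it. Pair it with a unique constraint on (aggregate_id, version)
        // so that the database rejects the second writer.
        if (currentVersion != expectedVersion) {
            throw OptimisticLockingException(
                "Expected version $expectedVersion but found $currentVersion"
            )
        }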

        events.forEachIndexed { index, event ->
            val storedEvent = StoredEvent(
                aggregateId = aggregateId,
                aggregateType = aggregateType,
                eventType = event::class.simpleName!!,
                payload = objectMapper.writeValueAsString(event),
                version = expectedVersion + index + 1
            )
            repository.save(storedEvent)
        }
    }

    fun getEvents(aggregateId: String): List<DomainEvent> {
        return repository.findByAggregateIdOrderByVersionAsc(aggregateId)
            .map { deserializeEvent(it) }
    }

    private fun deserializeEvent(storedEvent: StoredEvent): DomainEvent {
        val eventClass = when (storedEvent.eventType) {
            "OrderCreated" -> OrderCreated::class.java
            "OrderShipped" -> OrderShipped::class.java
            else -> throw IllegalArgumentException("Unknown event type: ${storedEvent.eventType}")
        }
        return objectMapper.readValue(storedEvent.payload, eventClass)
    }
}
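The expected-version check is the core of optimistic concurrency here, and it can be tried without a database. The following is a sketch under stated assumptions: `InMemoryEventStore` and `VersionConflictException` are hypothetical stand-ins for the JPA-backed store above, and event payloads are reduced to plain strings.

```kotlin
// Hypothetical in-memory stand-in for the JPA-backed store; illustration only.
class VersionConflictException(message: String) : RuntimeException(message)

class InMemoryEventStore {
    // aggregateId -> ordered payloads; list index + 1 is the event version
    private val streams = mutableMapOf<String, MutableList<String>>()

    // Appends only if the caller's expected version matches the stored one.
    // @Synchronized makes the check and the write one atomic step.
    @Synchronized
    fun append(aggregateId: String, expectedVersion: Long, payloads: List<String>): Long {
        val stream = streams.getOrPut(aggregateId) { mutableListOf() }
        val currentVersion = stream.size.toLong()
        if (currentVersion != expectedVersion) {
            throw VersionConflictException(
                "Expected version $expectedVersion but found $currentVersion"
            )
        }
        stream.addAll(payloads)
        return stream.size.toLong() // new version after the append
    }

    @Synchronized
    fun read(aggregateId: String): List<String> =
        streams[aggregateId]?.toList() ?: emptyList()
}
```

A writer that loaded the aggregate at version 0 and tries to append after someone else has already written version 1 gets a `VersionConflictException` and has to reload and retry. In the database-backed version, the lock is replaced by the transaction plus a uniqueness guarantee on the aggregate ID and version.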

Order Repository (Event Sourced)

@Repository
class OrderRepository(private val eventStore: EventStore) {

    fun save(order: Order) {
        val events = order.getPendingEvents()
        if (events.isEmpty()) return

        eventStore.saveEvents(
            aggregateId = order.id,
            aggregateType = "Order",
            events = events,
            expectedVersion = order.version
        )
        order.clearPendingEvents()
    }

    fun findById(orderId: String): Order? {
        val events = eventStore.getEvents(orderId)
        if (events.isEmpty()) return null

        return Order.fromHistory(events)
    }
}

Snapshotting

As events accumulate, rebuilding an aggregate takes longer because every event must be replayed. Snapshotting periodically stores the aggregate's state so that only the events recorded after the snapshot need to be replayed.

@Entity
@Table(name = "snapshots")
class Snapshot(
    @Id
    val aggregateId: String,

    @Column(nullable = false)
    val aggregateType: String,

    @Column(columnDefinition = "TEXT", nullable = false)
    val payload: String,

    @Column(nullable = false)
    val version: Long,

    @Column(nullable = false)
    val createdAt: Instant = Instant.now()
)

@Service
class SnapshotStore(
    private val snapshotRepository: SnapshotRepository,
    private val objectMapper: ObjectMapper
) {

    private val snapshotFrequency = 10 // Create a snapshot every 10 events

    fun shouldCreateSnapshot(version: Long): Boolean {
        // Skip version 0: an aggregate with no events has nothing to snapshot.
        return version > 0 && version % snapshotFrequency == 0L
    }

    fun saveSnapshot(aggregateId: String, aggregate: AggregateRoot) {
        val snapshot = Snapshot(
            aggregateId = aggregateId,
            aggregateType = aggregate::class.simpleName!!,
            payload = objectMapper.writeValueAsString(aggregate),
            version = aggregate.version
        )
        snapshotRepository.save(snapshot)
    }

    fun getLatestSnapshot(aggregateId: String): Pair<Order, Long>? {
        return snapshotRepository.findById(aggregateId)
            .map {
                val order = objectMapper.readValue(it.payload, Order::class.java)
                order to it.version
            }
            .orElse(null)
    }
}
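The code above stores and loads snapshots but does not show how a snapshot is combined with later events. One way to do that is sketched below with deliberately simplified, hypothetical types (`CounterState`, `Incremented`, `restore`); the real `Order` aggregate would need the same two steps: start from the snapshot, then replay only the newer events.

```kotlin
// Hypothetical, simplified types for illustration; not the article's entities.
data class CounterState(val total: Long, val version: Long)
data class Incremented(val amount: Long, val version: Long)

// Starts from the snapshot (if any) and replays only the events recorded
// after the snapshot's version, instead of the full history.
fun restore(snapshot: CounterState?, events: List<Incremented>): CounterState {
    val start = snapshot ?: CounterState(total = 0, version = 0)
    return events
        .filter { it.version > start.version }
        .sortedBy { it.version }
        .fold(start) { state, event ->
            CounterState(total = state.total + event.amount, version = event.version)
        }
}
```

In practice the event query itself would be restricted to versions greater than the snapshot's version, so that older events are never loaded at all. The filter here only keeps the sketch self-contained.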

Projection

A projection is a read-optimized view built from events.

@Entity
@Table(name = "order_summary")
class OrderSummary(
    @Id
    val orderId: String,
    val customerId: String,
    var status: String, // mutable: OrderProjection updates it when the order ships
    val totalAmount: BigDecimal,
    val itemCount: Int,
    val createdAt: Instant,
    var lastModifiedAt: Instant
)

@Component
class OrderProjection(
    private val orderSummaryRepository: OrderSummaryRepository
) {

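    // @EventHandler is not a Spring annotation. It is assumed here to come from
    // an event framework such as Axon, or to be a custom annotation; with plain
    // Spring application events the equivalent is @EventListener.
    @EventHandler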
    fun on(event: OrderCreated) {
        val summary = OrderSummary(
            orderId = event.aggregateId,
            customerId = event.customerId,
            status = "CREATED",
            totalAmount = event.totalAmount,
            itemCount = event.items.size,
            createdAt = event.occurredAt,
            lastModifiedAt = event.occurredAt
        )
        orderSummaryRepository.save(summary)
    }

    @EventHandler
    fun on(event: OrderShipped) {
        orderSummaryRepository.findById(event.aggregateId)
            .ifPresent { summary ->
                summary.status = "SHIPPED"
                summary.lastModifiedAt = event.occurredAt
                orderSummaryRepository.save(summary)
            }
    }
}
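Because a projection is derived entirely from events, it can be dropped and rebuilt at any time by replaying the stream. The sketch below shows that idea without JPA. All names in it (`ShopEvent`, `Created`, `Shipped`, `SummaryRow`, `project`) are hypothetical simplifications of the classes above.

```kotlin
// Hypothetical, simplified events and read model for illustration only.
sealed interface ShopEvent { val orderId: String }
data class Created(override val orderId: String, val itemCount: Int) : ShopEvent
data class Shipped(override val orderId: String) : ShopEvent

data class SummaryRow(val orderId: String, val status: String, val itemCount: Int)

// Builds the read model from scratch by applying each event in order.
fun project(events: List<ShopEvent>): Map<String, SummaryRow> {
    val view = mutableMapOf<String, SummaryRow>()
    for (event in events) {
        when (event) {
            is Created ->
                view[event.orderId] = SummaryRow(event.orderId, "CREATED", event.itemCount)
            is Shipped -> {
                // Ignore shipments for orders the view has never seen.
                val row = view[event.orderId]
                if (row != null) view[event.orderId] = row.copy(status = "SHIPPED")
            }
        }
    }
    return view
}
```

The same event stream can feed several projections with different shapes, for example one keyed by customer and another keyed by status, without changing how events are written.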

Summary

Event Sourcing is particularly useful in the following situations:

  • Financial and healthcare systems where audit trails are important
  • When event-based analytics are needed
  • Domains with complex business logic
  • When tracking state changes over time is required

In the next post, we’ll cover the Outbox Pattern, which is often used together with Event Sourcing.
