Production-Ready Event-Driven Architecture Part 1 - Event Sourcing Fundamentals
Series Introduction
This series covers how to build a production-ready event-driven architecture.
- Part 1: Event Sourcing Fundamentals (this post)
- Part 2: Implementing the Outbox Pattern
- Part 3: CQRS with Separate Read/Write Models
- Part 4: Saga Pattern for Distributed Transactions
- Part 5: Event Schema Evolution and Versioning
What is Event Sourcing?
Event Sourcing is a pattern that stores application state as a sequence of events. Instead of storing the current state directly, it stores, in order, every event that changed the state; the current state is derived by replaying those events.
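To make "state as a sequence of events" concrete, here is a minimal sketch in plain Kotlin. The event and state types (`CartEvent`, `CartOpened`, `SkuAdded`, `SkuRemoved`, `CartState`) are hypothetical and much simpler than the order events used later in this post; the point is only that the state is computed by folding over the history rather than read from a row.

```kotlin
// Hypothetical, simplified events; not the event classes used later in this post.
sealed interface CartEvent
data class CartOpened(val orderId: String) : CartEvent
data class SkuAdded(val sku: String, val quantity: Int) : CartEvent
data class SkuRemoved(val sku: String) : CartEvent

// The derived state. It is never stored; it is recomputed from the events.
data class CartState(
    val orderId: String? = null,
    val items: Map<String, Int> = emptyMap()
)

// Replays the history from an empty state, applying one event at a time.
fun replay(events: List<CartEvent>): CartState =
    events.fold(CartState()) { state, event ->
        when (event) {
            is CartOpened -> state.copy(orderId = event.orderId)
            is SkuAdded -> {
                val newQuantity = (state.items[event.sku] ?: 0) + event.quantity
                state.copy(items = state.items + (event.sku to newQuantity))
            }
            is SkuRemoved -> state.copy(items = state.items - event.sku)
        }
    }
```

Replaying the same list always yields the same state, which is what makes recovery and debugging by replay possible.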
Why Event Sourcing?
Problems with the traditional CRUD approach:
- Only the current state is visible, not how the system reached it
- Audit logs must be maintained separately
- Business insights are hard to extract because the history of changes is lost
Benefits of Event Sourcing:
- A complete audit trail
- Time-travel debugging: the state at any past point can be reconstructed
- State recovery through event replay
- Analytics based on business events
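The "time travel" benefit listed above can be sketched as a replay that stops at a chosen instant. The `StatusChanged` type and the `statusAsOf` function are hypothetical names for illustration, and the sketch assumes the history is already sorted by time.

```kotlin
import java.time.Instant

// Hypothetical, simplified event: only the resulting status and its timestamp.
data class StatusChanged(val status: String, val occurredAt: Instant)

// Replays the time-ordered history up to and including `pointInTime`.
// Returns null when nothing had happened yet at that moment.
fun statusAsOf(history: List<StatusChanged>, pointInTime: Instant): String? =
    history
        .takeWhile { !it.occurredAt.isAfter(pointInTime) }
        .fold(null as String?) { _, event -> event.status }
```

With a CRUD table, the same question ("what was the status an hour ago?") could only be answered from a separately maintained audit log.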
Core Concepts
1. Event Store
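The event store is the repository that holds events. Events are immutable and are stored append-only: rows are inserted, never updated or deleted.
// jakarta.persistence applies to Spring Boot 3+; older versions use javax.persistence.
import jakarta.persistence.*
import java.time.Instant

@Entity
@Table(name = "event_store")
class StoredEvent(
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    val id: Long? = null,

    @Column(nullable = false)
    val aggregateId: String,

    @Column(nullable = false)
    val aggregateType: String,

    @Column(nullable = false)
    val eventType: String,

    // The serialized event body (JSON in this post).
    @Column(columnDefinition = "TEXT", nullable = false)
    val payload: String,

    // Position of the event within its aggregate's stream, starting at 1.
    @Column(nullable = false)
    val version: Long,

    @Column(nullable = false)
    val occurredAt: Instant = Instant.now()
)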
2. Aggregate Root
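The aggregate root is the entry point to the domain model. It raises new events and applies them to its own state.
abstract class AggregateRoot {
    // Events raised since the aggregate was loaded and not yet persisted.
    @Transient
    private val pendingEvents = mutableListOf<DomainEvent>()

    // Version of the last persisted event; used as the expected version on save.
    var version: Long = 0
        protected set

    // Applies a new event to the state and queues it for persistence.
    protected fun applyChange(event: DomainEvent) {
        applyEvent(event)
        pendingEvents.add(event)
    }

    protected abstract fun applyEvent(event: DomainEvent)

    fun getPendingEvents(): List<DomainEvent> = pendingEvents.toList()

    // Call after the pending events have been stored. Advancing the version here
    // keeps a second save of the same instance from failing the version check.
    fun clearPendingEvents() {
        version += pendingEvents.size
        pendingEvents.clear()
    }

    // Rebuilds state from stored events without queuing them again.
    fun loadFromHistory(events: List<DomainEvent>) {
        events.forEach { event ->
            applyEvent(event)
            version++
        }
    }
}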
3. Domain Event
A domain event is an immutable object that records something that happened in the business.
import java.math.BigDecimal
import java.time.Instant

interface DomainEvent {
    val aggregateId: String
    val occurredAt: Instant
}

// OrderItem is not defined in this post; the Order aggregate assumes it exposes
// a price (BigDecimal) and a quantity (Int).
data class OrderCreated(
    override val aggregateId: String,
    val customerId: String,
    val items: List<OrderItem>,
    val totalAmount: BigDecimal,
    override val occurredAt: Instant = Instant.now()
) : DomainEvent

data class OrderShipped(
    override val aggregateId: String,
    val shippingAddress: String,
    val trackingNumber: String,
    override val occurredAt: Instant = Instant.now()
) : DomainEvent
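One optional refinement, not used in the code of this post, is to declare the event hierarchy as `sealed`. The sketch below uses hypothetical names (`ShipmentEvent`, `ShipmentRequested`, `ShipmentDispatched`, `statusAfter`) to show the effect: when `when` is used as an expression over a sealed type, the compiler rejects the code if a subtype has no branch, so a newly added event cannot be silently ignored.

```kotlin
import java.time.Instant

// Hypothetical variation on the post's DomainEvent; names are illustrative only.
sealed interface ShipmentEvent {
    val aggregateId: String
    val occurredAt: Instant
}

data class ShipmentRequested(
    override val aggregateId: String,
    override val occurredAt: Instant
) : ShipmentEvent

data class ShipmentDispatched(
    override val aggregateId: String,
    val trackingNumber: String,
    override val occurredAt: Instant
) : ShipmentEvent

// `when` is used as an expression, so the compiler requires a branch for
// every subtype of the sealed interface; no `else` is needed.
fun statusAfter(event: ShipmentEvent): String = when (event) {
    is ShipmentRequested -> "REQUESTED"
    is ShipmentDispatched -> "DISPATCHED"
}
```

The trade-off is that all subtypes of a sealed interface must live in the same module and package, which may not suit events shared across services.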
Practical Implementation Example: Order System
Order Aggregate
class Order private constructor() : AggregateRoot() {
    lateinit var id: String
        private set
    lateinit var customerId: String
        private set
    var status: OrderStatus = OrderStatus.CREATED
        private set
    var items: List<OrderItem> = emptyList()
        private set
    var totalAmount: BigDecimal = BigDecimal.ZERO
        private set

    companion object {
        fun create(
            orderId: String,
            customerId: String,
            items: List<OrderItem>
        ): Order {
            val order = Order()
            val totalAmount = items.sumOf { it.price * it.quantity.toBigDecimal() }
            order.applyChange(
                OrderCreated(
                    aggregateId = orderId,
                    customerId = customerId,
                    items = items,
                    totalAmount = totalAmount
                )
            )
            return order
        }

        fun fromHistory(events: List<DomainEvent>): Order {
            val order = Order()
            order.loadFromHistory(events)
            return order
        }
    }

    fun ship(shippingAddress: String, trackingNumber: String) {
        require(status == OrderStatus.CREATED) {
            "Order must be in CREATED status to ship"
        }
        applyChange(
            OrderShipped(
                aggregateId = id,
                shippingAddress = shippingAddress,
                trackingNumber = trackingNumber
            )
        )
    }

    override fun applyEvent(event: DomainEvent) {
        when (event) {
            is OrderCreated -> {
                id = event.aggregateId
                customerId = event.customerId
                items = event.items
                totalAmount = event.totalAmount
                status = OrderStatus.CREATED
            }
            is OrderShipped -> {
                status = OrderStatus.SHIPPED
            }
        }
    }
}

enum class OrderStatus {
    CREATED, SHIPPED, DELIVERED, CANCELLED
}
Event Store Repository
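import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.data.jpa.repository.JpaRepository
import org.springframework.data.jpa.repository.Query
import org.springframework.data.repository.query.Param
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional

interface EventStoreRepository : JpaRepository<StoredEvent, Long> {
    fun findByAggregateIdOrderByVersionAsc(aggregateId: String): List<StoredEvent>

    // Returns null when the aggregate has no stored events yet.
    @Query("SELECT MAX(e.version) FROM StoredEvent e WHERE e.aggregateId = :aggregateId")
    fun findLatestVersion(@Param("aggregateId") aggregateId: String): Long?
}

// Not shown in the original post; assumed here to be a plain runtime exception.
class OptimisticLockingException(message: String) : RuntimeException(message)

@Service
class EventStore(
    private val repository: EventStoreRepository,
    private val objectMapper: ObjectMapper
) {
    // Runs in one transaction so that a batch of events is stored all-or-nothing.
    @Transactional
    fun saveEvents(
        aggregateId: String,
        aggregateType: String,
        events: List<DomainEvent>,
        expectedVersion: Long
    ) {
        // An aggregate with no stored events is at version 0.
        val currentVersion = repository.findLatestVersion(aggregateId) ?: 0L
        // Note: two concurrent writers can both pass this check. A unique constraint
        // on (aggregate id, version) in the event_store table is needed to reject the loser.
        if (currentVersion != expectedVersion) {
            throw OptimisticLockingException(
                "Expected version $expectedVersion but found $currentVersion"
            )
        }
        events.forEachIndexed { index, event ->
            val storedEvent = StoredEvent(
                aggregateId = aggregateId,
                aggregateType = aggregateType,
                eventType = event::class.simpleName!!,
                payload = objectMapper.writeValueAsString(event),
                version = expectedVersion + index + 1,
                // Keep the domain timestamp rather than the insert time.
                occurredAt = event.occurredAt
            )
            repository.save(storedEvent)
        }
    }

    fun getEvents(aggregateId: String): List<DomainEvent> {
        return repository.findByAggregateIdOrderByVersionAsc(aggregateId)
            .map { deserializeEvent(it) }
    }

    // Maps the stored type name back to a class; every new event type must be added here.
    private fun deserializeEvent(storedEvent: StoredEvent): DomainEvent {
        val eventClass = when (storedEvent.eventType) {
            "OrderCreated" -> OrderCreated::class.java
            "OrderShipped" -> OrderShipped::class.java
            else -> throw IllegalArgumentException("Unknown event type: ${storedEvent.eventType}")
        }
        return objectMapper.readValue(storedEvent.payload, eventClass)
    }
}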
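The expected-version check in `saveEvents` is easier to see without JPA in the way. The following is a minimal in-memory sketch, not the post's implementation: `InMemoryEventStore`, `VersionConflictException`, `append`, and `read` are hypothetical names, events are reduced to strings, and `@Synchronized` stands in for the database-level guarantee a real store would need.

```kotlin
// Hypothetical in-memory stand-in for the JPA-backed EventStore; names are illustrative.
class VersionConflictException(message: String) : RuntimeException(message)

class InMemoryEventStore {
    // One append-only list per aggregate; the list size is the current version.
    private val streams = mutableMapOf<String, MutableList<String>>()

    // Appends events only if the stream is still at the version the caller loaded.
    @Synchronized
    fun append(aggregateId: String, events: List<String>, expectedVersion: Long) {
        val stream = streams.getOrPut(aggregateId) { mutableListOf() }
        val currentVersion = stream.size.toLong()
        if (currentVersion != expectedVersion) {
            throw VersionConflictException(
                "Expected version $expectedVersion but found $currentVersion"
            )
        }
        stream.addAll(events)
    }

    // Returns a copy so callers cannot modify the stored history.
    @Synchronized
    fun read(aggregateId: String): List<String> =
        streams[aggregateId]?.toList() ?: emptyList()
}
```

A writer that loaded the aggregate at version 1 and tries to append after another writer has already moved it to version 2 gets a `VersionConflictException` and has to reload and retry, which is the behavior the JPA version aims for.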
Order Repository (Event Sourced)
@Repository
class OrderRepository(private val eventStore: EventStore) {
    fun save(order: Order) {
        val events = order.getPendingEvents()
        if (events.isEmpty()) return

        eventStore.saveEvents(
            aggregateId = order.id,
            aggregateType = "Order",
            events = events,
            expectedVersion = order.version
        )
        order.clearPendingEvents()
    }

    fun findById(orderId: String): Order? {
        val events = eventStore.getEvents(orderId)
        if (events.isEmpty()) return null
        return Order.fromHistory(events)
    }
}
Snapshotting
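As events accumulate, restoring an aggregate takes longer because every event must be replayed. Snapshotting stores the aggregate's state at a given version, so that a later load can start from the snapshot and replay only the events with a higher version.
@Entity
@Table(name = "snapshots")
class Snapshot(
    // One row per aggregate: saving a new snapshot replaces the previous one.
    @Id
    val aggregateId: String,

    @Column(nullable = false)
    val aggregateType: String,

    @Column(columnDefinition = "TEXT", nullable = false)
    val payload: String,

    // Version of the last event included in this snapshot.
    @Column(nullable = false)
    val version: Long,

    @Column(nullable = false)
    val createdAt: Instant = Instant.now()
)

// Not shown in the original post; assumed to be a standard Spring Data repository.
interface SnapshotRepository : JpaRepository<Snapshot, String>

@Service
class SnapshotStore(
    private val snapshotRepository: SnapshotRepository,
    private val objectMapper: ObjectMapper
) {
    private val snapshotFrequency = 10 // Create a snapshot every 10 events

    fun shouldCreateSnapshot(version: Long): Boolean {
        return version % snapshotFrequency == 0L
    }

    // Serializing the aggregate directly ties the snapshot format to the class's fields.
    fun saveSnapshot(aggregateId: String, aggregate: AggregateRoot) {
        val snapshot = Snapshot(
            aggregateId = aggregateId,
            aggregateType = aggregate::class.simpleName!!,
            payload = objectMapper.writeValueAsString(aggregate),
            version = aggregate.version
        )
        snapshotRepository.save(snapshot)
    }

    // Only handles Order. Jackson must be able to construct Order (private constructor,
    // private setters), which may require extra mapper configuration.
    fun getLatestSnapshot(aggregateId: String): Pair<Order, Long>? {
        return snapshotRepository.findById(aggregateId)
            .map {
                val order = objectMapper.readValue(it.payload, Order::class.java)
                order to it.version
            }
            .orElse(null)
    }
}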
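The code above stores and loads snapshots but does not show how a snapshot combines with the event stream on load. A minimal sketch of that step, using hypothetical simplified types (`OrderSnapshotState`, `QuantityAdded`) and a hypothetical `restore` function, might look like this: start from the snapshot if there is one, then apply only the events that are newer than it.

```kotlin
// Hypothetical, simplified state: an item count plus the version it reflects.
data class OrderSnapshotState(val itemCount: Int, val version: Long)

// Hypothetical event carrying its position in the stream.
data class QuantityAdded(val quantity: Int, val version: Long)

// Starts from the snapshot (or an empty state) and replays only newer events.
fun restore(snapshot: OrderSnapshotState?, events: List<QuantityAdded>): OrderSnapshotState {
    val start = snapshot ?: OrderSnapshotState(itemCount = 0, version = 0)
    return events
        .filter { it.version > start.version } // skip events the snapshot already covers
        .fold(start) { state, event ->
            OrderSnapshotState(state.itemCount + event.quantity, event.version)
        }
}
```

Restoring with or without the snapshot must produce the same state; the snapshot only changes how many events are replayed. Against a real database, the filter would normally be part of the query (events with a version greater than the snapshot's) rather than applied in memory.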
Projection
A projection is a read-optimized view built from events.
@Entity
@Table(name = "order_summary")
class OrderSummary(
    @Id
    val orderId: String,
    val customerId: String,
    // Mutable because the projection updates it when later events arrive.
    var status: String,
    val totalAmount: BigDecimal,
    val itemCount: Int,
    val createdAt: Instant,
    var lastModifiedAt: Instant
)

// Not shown in the original post; assumed to be a standard Spring Data repository.
interface OrderSummaryRepository : JpaRepository<OrderSummary, String>

// @EventHandler is not a Spring annotation. It is assumed here to come from a framework
// such as Axon or from custom dispatch code; Spring's own equivalent is @EventListener.
@Component
class OrderProjection(
    private val orderSummaryRepository: OrderSummaryRepository
) {
    @EventHandler
    fun on(event: OrderCreated) {
        val summary = OrderSummary(
            orderId = event.aggregateId,
            customerId = event.customerId,
            status = "CREATED",
            totalAmount = event.totalAmount,
            itemCount = event.items.size,
            createdAt = event.occurredAt,
            lastModifiedAt = event.occurredAt
        )
        orderSummaryRepository.save(summary)
    }

    @EventHandler
    fun on(event: OrderShipped) {
        orderSummaryRepository.findById(event.aggregateId)
            .ifPresent { summary ->
                summary.status = "SHIPPED"
                summary.lastModifiedAt = event.occurredAt
                orderSummaryRepository.save(summary)
            }
    }
}
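Because a projection is derived entirely from events, it can be thrown away and rebuilt by replaying the history. The sketch below illustrates this with an in-memory map instead of a JPA table; `OrderFact`, `OrderWasCreated`, `OrderWasShipped`, `SummaryRow`, and `InMemoryOrderProjection` are hypothetical, trimmed-down stand-ins for the types in this post.

```kotlin
import java.math.BigDecimal

// Hypothetical, trimmed-down events; field names loosely mirror the post's events.
sealed interface OrderFact { val orderId: String }
data class OrderWasCreated(override val orderId: String, val totalAmount: BigDecimal) : OrderFact
data class OrderWasShipped(override val orderId: String) : OrderFact

// Hypothetical read model row, a subset of the post's OrderSummary.
data class SummaryRow(val orderId: String, val status: String, val totalAmount: BigDecimal)

class InMemoryOrderProjection {
    private val rows = mutableMapOf<String, SummaryRow>()

    // Updates the read model for a single event.
    fun on(event: OrderFact) {
        when (event) {
            is OrderWasCreated ->
                rows[event.orderId] = SummaryRow(event.orderId, "CREATED", event.totalAmount)
            is OrderWasShipped ->
                // Ignored when the order is unknown, like ifPresent in the JPA version.
                rows[event.orderId]?.let { rows[event.orderId] = it.copy(status = "SHIPPED") }
        }
    }

    // Discards the current view and recomputes it from the full history.
    fun rebuild(history: List<OrderFact>) {
        rows.clear()
        history.forEach { on(it) }
    }

    fun find(orderId: String): SummaryRow? = rows[orderId]
}
```

The same replay can be used to add a new read model long after the events were recorded, since the event store remains the source of truth.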
Summary
Event Sourcing is particularly useful in the following situations:
- Financial and healthcare systems where audit trails are important
- When event-based analytics are needed
- Domains with complex business logic
- When tracking state changes over time is required
In the next post, we’ll cover the Outbox Pattern, which is often used together with Event Sourcing.