Production-Ready Event-Driven Architecture Part 1 - Event Sourcing Fundamentals
Series Introduction
This series covers how to build production-ready event-driven architecture.
- Part 1: Event Sourcing Fundamentals (Current)
- Part 2: Implementing the Outbox Pattern
- Part 3: CQRS with Separate Read/Write Models
- Part 4: Saga Pattern for Distributed Transactions
- Part 5: Event Schema Evolution and Versioning
What is Event Sourcing?
Event Sourcing is a pattern that stores the state of an application as a sequence of events. Instead of storing the current state directly, it stores all events that caused state changes in order.
Why Event Sourcing?
Problems with traditional CRUD approach:
- Can only see current state, not how it got there
- Audit logs must be managed separately
- Difficult to gain business insights
Benefits of Event Sourcing:
- Complete audit trail
- Time-travel debugging possible
- State recovery through event replay
- Business event-based analytics possible
Core Concepts
1. Event Store
A repository that stores events. Events are immutable and stored in an append-only manner.
@Entity
@Table(name = "event_store")
class StoredEvent(
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
val id: Long? = null,
@Column(nullable = false)
val aggregateId: String,
@Column(nullable = false)
val aggregateType: String,
@Column(nullable = false)
val eventType: String,
@Column(columnDefinition = "TEXT", nullable = false)
val payload: String,
@Column(nullable = false)
val version: Long,
@Column(nullable = false)
val occurredAt: Instant = Instant.now()
)
2. Aggregate Root
The entry point of the domain model, responsible for generating and applying events.
abstract class AggregateRoot {
@Transient
private val pendingEvents = mutableListOf<DomainEvent>()
var version: Long = 0
protected set
protected fun applyChange(event: DomainEvent) {
applyEvent(event)
pendingEvents.add(event)
}
protected abstract fun applyEvent(event: DomainEvent)
fun getPendingEvents(): List<DomainEvent> = pendingEvents.toList()
fun clearPendingEvents() {
pendingEvents.clear()
}
fun loadFromHistory(events: List<DomainEvent>) {
events.forEach { event ->
applyEvent(event)
version++
}
}
}
3. Domain Event
An immutable object representing a fact that occurred in the business.
interface DomainEvent {
val aggregateId: String
val occurredAt: Instant
}
data class OrderCreated(
override val aggregateId: String,
val customerId: String,
val items: List<OrderItem>,
val totalAmount: BigDecimal,
override val occurredAt: Instant = Instant.now()
) : DomainEvent
data class OrderShipped(
override val aggregateId: String,
val shippingAddress: String,
val trackingNumber: String,
override val occurredAt: Instant = Instant.now()
) : DomainEvent
Practical Implementation Example: Order System
Order Aggregate
class Order private constructor() : AggregateRoot() {
lateinit var id: String
private set
lateinit var customerId: String
private set
var status: OrderStatus = OrderStatus.CREATED
private set
var items: List<OrderItem> = emptyList()
private set
var totalAmount: BigDecimal = BigDecimal.ZERO
private set
companion object {
fun create(
orderId: String,
customerId: String,
items: List<OrderItem>
): Order {
val order = Order()
val totalAmount = items.sumOf { it.price * it.quantity.toBigDecimal() }
order.applyChange(
OrderCreated(
aggregateId = orderId,
customerId = customerId,
items = items,
totalAmount = totalAmount
)
)
return order
}
fun fromHistory(events: List<DomainEvent>): Order {
val order = Order()
order.loadFromHistory(events)
return order
}
}
fun ship(shippingAddress: String, trackingNumber: String) {
require(status == OrderStatus.CREATED) {
"Order must be in CREATED status to ship"
}
applyChange(
OrderShipped(
aggregateId = id,
shippingAddress = shippingAddress,
trackingNumber = trackingNumber
)
)
}
override fun applyEvent(event: DomainEvent) {
when (event) {
is OrderCreated -> {
id = event.aggregateId
customerId = event.customerId
items = event.items
totalAmount = event.totalAmount
status = OrderStatus.CREATED
}
is OrderShipped -> {
status = OrderStatus.SHIPPED
}
}
}
}
enum class OrderStatus {
CREATED, SHIPPED, DELIVERED, CANCELLED
}
Event Store Repository
interface EventStoreRepository : JpaRepository<StoredEvent, Long> {
fun findByAggregateIdOrderByVersionAsc(aggregateId: String): List<StoredEvent>
@Query("SELECT MAX(e.version) FROM StoredEvent e WHERE e.aggregateId = :aggregateId")
fun findLatestVersion(aggregateId: String): Long?
}
@Service
class EventStore(
private val repository: EventStoreRepository,
private val objectMapper: ObjectMapper
) {
fun saveEvents(
aggregateId: String,
aggregateType: String,
events: List<DomainEvent>,
expectedVersion: Long
) {
val currentVersion = repository.findLatestVersion(aggregateId) ?: 0
if (currentVersion != expectedVersion) {
throw OptimisticLockingException(
"Expected version $expectedVersion but found $currentVersion"
)
}
events.forEachIndexed { index, event ->
val storedEvent = StoredEvent(
aggregateId = aggregateId,
aggregateType = aggregateType,
eventType = event::class.simpleName!!,
payload = objectMapper.writeValueAsString(event),
version = expectedVersion + index + 1
)
repository.save(storedEvent)
}
}
fun getEvents(aggregateId: String): List<DomainEvent> {
return repository.findByAggregateIdOrderByVersionAsc(aggregateId)
.map { deserializeEvent(it) }
}
private fun deserializeEvent(storedEvent: StoredEvent): DomainEvent {
val eventClass = when (storedEvent.eventType) {
"OrderCreated" -> OrderCreated::class.java
"OrderShipped" -> OrderShipped::class.java
else -> throw IllegalArgumentException("Unknown event type: ${storedEvent.eventType}")
}
return objectMapper.readValue(storedEvent.payload, eventClass)
}
}
Order Repository (Event Sourced)
@Repository
class OrderRepository(private val eventStore: EventStore) {
fun save(order: Order) {
val events = order.getPendingEvents()
if (events.isEmpty()) return
eventStore.saveEvents(
aggregateId = order.id,
aggregateType = "Order",
events = events,
expectedVersion = order.version
)
order.clearPendingEvents()
}
fun findById(orderId: String): Order? {
val events = eventStore.getEvents(orderId)
if (events.isEmpty()) return null
return Order.fromHistory(events)
}
}
Snapshotting
As events accumulate, restoring an Aggregate can take a long time. Snapshotting saves the state at specific points to reduce restoration time.
@Entity
@Table(name = "snapshots")
class Snapshot(
@Id
val aggregateId: String,
@Column(nullable = false)
val aggregateType: String,
@Column(columnDefinition = "TEXT", nullable = false)
val payload: String,
@Column(nullable = false)
val version: Long,
@Column(nullable = false)
val createdAt: Instant = Instant.now()
)
@Service
class SnapshotStore(
private val snapshotRepository: SnapshotRepository,
private val objectMapper: ObjectMapper
) {
private val snapshotFrequency = 10 // Create snapshot every 10 events
fun shouldCreateSnapshot(version: Long): Boolean {
return version % snapshotFrequency == 0L
}
fun saveSnapshot(aggregateId: String, aggregate: AggregateRoot) {
val snapshot = Snapshot(
aggregateId = aggregateId,
aggregateType = aggregate::class.simpleName!!,
payload = objectMapper.writeValueAsString(aggregate),
version = aggregate.version
)
snapshotRepository.save(snapshot)
}
fun getLatestSnapshot(aggregateId: String): Pair<Order, Long>? {
return snapshotRepository.findById(aggregateId)
.map {
val order = objectMapper.readValue(it.payload, Order::class.java)
order to it.version
}
.orElse(null)
}
}
Projection
Creating read-optimized views based on events is called Projection.
@Entity
@Table(name = "order_summary")
class OrderSummary(
@Id
val orderId: String,
val customerId: String,
val status: String,
val totalAmount: BigDecimal,
val itemCount: Int,
val createdAt: Instant,
var lastModifiedAt: Instant
)
@Component
class OrderProjection(
private val orderSummaryRepository: OrderSummaryRepository
) {
@EventHandler
fun on(event: OrderCreated) {
val summary = OrderSummary(
orderId = event.aggregateId,
customerId = event.customerId,
status = "CREATED",
totalAmount = event.totalAmount,
itemCount = event.items.size,
createdAt = event.occurredAt,
lastModifiedAt = event.occurredAt
)
orderSummaryRepository.save(summary)
}
@EventHandler
fun on(event: OrderShipped) {
orderSummaryRepository.findById(event.aggregateId)
.ifPresent { summary ->
summary.status = "SHIPPED"
summary.lastModifiedAt = event.occurredAt
orderSummaryRepository.save(summary)
}
}
}
Summary
Event Sourcing is particularly useful in the following situations:
- Financial and healthcare systems where audit trails are important
- When event-based analytics are needed
- Domains with complex business logic
- When tracking state changes over time is required
In the next post, we’ll cover the Outbox Pattern, which is often used together with Event Sourcing.
댓글남기기