Production-Ready Event-Driven Architecture Part 2 - Implementing the Outbox Pattern
Series Introduction
This series covers how to build production-ready event-driven architecture.
- Part 1: Event Sourcing Fundamentals
- Part 2: Implementing the Outbox Pattern (Current)
- Part 3: CQRS with Separate Read/Write Models
- Part 4: Saga Pattern for Distributed Transactions
- Part 5: Event Schema Evolution and Versioning
The Problem: Dual Writes
Let’s look at a common problem in microservices:
@Service
class OrderService(
    private val orderRepository: OrderRepository,
    private val kafkaTemplate: KafkaTemplate<String, OrderCreatedEvent>
) {
    @Transactional
    fun createOrder(command: CreateOrderCommand): Order {
        // 1. Save to database (inside the transaction)
        val order = orderRepository.save(Order.create(command))
        // 2. Publish event to Kafka (not covered by the transaction)
        kafkaTemplate.send("order-events", OrderCreatedEvent(order))
        return order
    }
}
Problems with this code:
- DB save succeeds but Kafka send fails → Data inconsistency
- Kafka send succeeds but transaction rolls back → Event published but no data
- There is no transaction spanning both systems, so the two writes cannot be made atomic
Solution: Transactional Outbox Pattern
The Outbox Pattern writes the event to an Outbox table in the same database transaction as the business data; a separate relay process then delivers it to the message broker, giving at-least-once delivery without dual writes.
Architecture
┌─────────────────────────────────────────────────────┐
│ Application │
│ ┌─────────────┐ ┌─────────────────────────────┐ │
│ │ Order │ │ Outbox Table │ │
│ │ Table │ │ (Same Transaction) │ │
│ └─────────────┘ └─────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
│
▼
┌─────────────────┐
│ Outbox Relay │
│ (Polling or │
│ CDC) │
└─────────────────┘
│
▼
┌─────────────────┐
│ Kafka │
└─────────────────┘
Implementation Method 1: Polling Publisher
Outbox Table Design
import jakarta.persistence.*
import java.time.Instant

@Entity
@Table(name = "outbox_events")
class OutboxEvent(
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    val id: Long? = null,

    @Column(nullable = false)
    val aggregateType: String,

    @Column(nullable = false)
    val aggregateId: String,

    @Column(nullable = false)
    val eventType: String,

    @Column(columnDefinition = "TEXT", nullable = false)
    val payload: String,

    @Column(nullable = false)
    val createdAt: Instant = Instant.now(),

    @Enumerated(EnumType.STRING)
    var status: OutboxStatus = OutboxStatus.PENDING,

    var processedAt: Instant? = null
)

enum class OutboxStatus {
    PENDING, PROCESSED, FAILED
}
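The service and schedulers in this post also assume an OutboxRepository that is never shown. A minimal Spring Data sketch, with method names matching how the repository is called below:

import java.time.Instant
import org.springframework.data.domain.Pageable
import org.springframework.data.jpa.repository.JpaRepository

interface OutboxRepository : JpaRepository<OutboxEvent, Long> {

    // Used by the polling publisher: oldest PENDING events first, page-limited
    fun findByStatusOrderByCreatedAtAsc(status: OutboxStatus, pageable: Pageable): List<OutboxEvent>

    // Used by the cleanup job: bulk-deletes old PROCESSED events, returns the count
    fun deleteByStatusAndCreatedAtBefore(status: OutboxStatus, cutoff: Instant): Long
}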
Service Layer Modification
@Service
class OrderService(
    private val orderRepository: OrderRepository,
    private val outboxRepository: OutboxRepository,
    private val objectMapper: ObjectMapper
) {
    @Transactional
    fun createOrder(command: CreateOrderCommand): Order {
        // 1. Save Order
        val order = orderRepository.save(Order.create(command))

        // 2. Save event to Outbox within the same transaction
        val event = OrderCreatedEvent(
            orderId = order.id,
            customerId = order.customerId,
            items = order.items,
            totalAmount = order.totalAmount
        )
        outboxRepository.save(
            OutboxEvent(
                aggregateType = "Order",
                aggregateId = order.id,
                eventType = "OrderCreated",
                payload = objectMapper.writeValueAsString(event)
            )
        )
        return order
    }
}
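The command and event types are likewise assumed rather than shown; here are hypothetical shapes (the real fields depend on your domain model):

import java.math.BigDecimal

data class CreateOrderCommand(
    val customerId: String,
    val items: List<OrderItem>
)

data class OrderItem(
    val productId: String,
    val quantity: Int,
    val unitPrice: BigDecimal
)

data class OrderCreatedEvent(
    val orderId: String,
    val customerId: String,
    val items: List<OrderItem>,
    val totalAmount: BigDecimal
) {
    // Convenience constructor used in the snippets above
    constructor(order: Order) : this(order.id, order.customerId, order.items, order.totalAmount)
}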
Polling Publisher
import java.time.Instant
import org.slf4j.LoggerFactory
import org.springframework.data.domain.PageRequest
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Transactional

@Component
class OutboxPollingPublisher(
    private val outboxRepository: OutboxRepository,
    private val kafkaTemplate: KafkaTemplate<String, String>
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    @Scheduled(fixedDelay = 1000) // Run every 1 second
    @Transactional
    fun publishPendingEvents() {
        val pendingEvents = outboxRepository.findByStatusOrderByCreatedAtAsc(
            OutboxStatus.PENDING,
            PageRequest.of(0, 100)
        )
        pendingEvents.forEach { event ->
            try {
                val topic = "${event.aggregateType.lowercase()}-events"
                kafkaTemplate.send(topic, event.aggregateId, event.payload)
                    .get() // Block until the broker acknowledges the send
                event.status = OutboxStatus.PROCESSED
                event.processedAt = Instant.now()
                outboxRepository.save(event)
                logger.info("Published event: ${event.eventType} for ${event.aggregateId}")
            } catch (e: Exception) {
                logger.error("Failed to publish event: ${event.id}", e)
                // FAILED events need a separate retry or alerting path;
                // without one they sit in the table unpublished
                event.status = OutboxStatus.FAILED
                outboxRepository.save(event)
            }
        }
    }
}
Limitations of the Polling Approach
- Latency: events wait up to one polling interval before they are published
- Load: the publisher queries the database continuously, even when there is nothing to send
- Duplicates: delivery is at-least-once, so duplicate sends are possible in failure scenarios and when multiple publisher instances poll the same table (one mitigation is sketched below)
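One way to mitigate duplicate work across concurrent pollers, assuming PostgreSQL, is to claim rows with FOR UPDATE SKIP LOCKED so that each polling transaction skips rows another instance has already locked. A sketch:

import org.springframework.data.jpa.repository.JpaRepository
import org.springframework.data.jpa.repository.Query
import org.springframework.data.repository.query.Param

interface LockingOutboxRepository : JpaRepository<OutboxEvent, Long> {

    // Native query: each polling transaction claims up to :limit unclaimed rows
    @Query(
        value = """
            SELECT * FROM outbox_events
            WHERE status = 'PENDING'
            ORDER BY created_at
            LIMIT :limit
            FOR UPDATE SKIP LOCKED
        """,
        nativeQuery = true
    )
    fun claimPendingBatch(@Param("limit") limit: Int): List<OutboxEvent>
}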
Implementation Method 2: Debezium CDC (Change Data Capture)
Debezium captures database changes in real time and streams them to Kafka.
Docker Compose Setup
version: '3.8'
services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: orderdb
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical" # Required for CDC
    ports:
      - "5432:5432"
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  connect:
    image: debezium/connect:2.4
    depends_on:
      - kafka
      - postgres
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses
Debezium Connector Configuration
{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "orderdb",
    "topic.prefix": "orderdb",
    "table.include.list": "public.outbox_events",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.by.field": "aggregate_type",
    "transforms.outbox.route.topic.replacement": "${routedByValue}-events",
    "transforms.outbox.table.fields.additional.placement": "event_type:header:eventType,aggregate_type:header:aggregateType"
  }
}

Note that Debezium 2.x requires topic.prefix (the older database.server.name setting was removed), and route.by.field must point at the aggregate_type column because the Event Router's default assumes a column named aggregatetype.
Register Connector
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @outbox-connector.json
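If registration succeeds, a GET request to http://localhost:8083/connectors/outbox-connector/status should report the connector and its task as RUNNING.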
Debezium Outbox Event Router
Debezium’s Outbox Event Router automatically routes outbox table records to appropriate topics.
// Outbox table structure (for Debezium)
import jakarta.persistence.*
import java.util.UUID
import org.hibernate.annotations.JdbcTypeCode
import org.hibernate.type.SqlTypes

@Entity
@Table(name = "outbox_events")
class OutboxEvent(
    @Id
    val id: UUID = UUID.randomUUID(),

    @Column(name = "aggregate_type", nullable = false)
    val aggregateType: String,

    @Column(name = "aggregate_id", nullable = false)
    val aggregateId: String,

    @Column(name = "event_type", nullable = false)
    val eventType: String,

    // Hibernate 6 needs a JSON type hint to write a String into a JSONB column
    @JdbcTypeCode(SqlTypes.JSON)
    @Column(columnDefinition = "JSONB", nullable = false)
    val payload: String
)
Service Layer (CDC Method)
@Service
class OrderService(
    private val orderRepository: OrderRepository,
    private val outboxRepository: OutboxRepository,
    private val objectMapper: ObjectMapper
) {
    @Transactional
    fun createOrder(command: CreateOrderCommand): Order {
        val order = orderRepository.save(Order.create(command))

        // Save to Outbox; Debezium picks up the insert and sends it to Kafka
        outboxRepository.save(
            OutboxEvent(
                aggregateType = "Order",
                aggregateId = order.id,
                eventType = "OrderCreated",
                payload = objectMapper.writeValueAsString(
                    OrderCreatedEvent(order)
                )
            )
        )
        return order
    }
}
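Because Debezium reads these inserts from the write-ahead log rather than from the table itself, the outbox row is no longer needed once the transaction commits. A common refinement is to delete rows immediately after inserting them; the Event Router ignores the resulting delete records, so the table stays effectively empty.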
Ensuring Exactly-Once Semantics
The outbox relay gives at-least-once delivery, so consumers will occasionally see duplicates; deduplicating on the consumer side is what turns this into effectively-exactly-once processing.
Consumer-Side Idempotency
import java.time.Instant
import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.support.KafkaHeaders
import org.springframework.messaging.handler.annotation.Header
import org.springframework.messaging.handler.annotation.Payload
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Transactional

@Component
class OrderEventConsumer(
    private val processedEventRepository: ProcessedEventRepository
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    @KafkaListener(topics = ["order-events"])
    @Transactional
    fun handleOrderEvent(
        @Payload payload: String,
        @Header(KafkaHeaders.RECEIVED_KEY) key: String,
        @Header("id") eventId: String // event id header set by the Outbox Event Router
    ) {
        // Check if event already processed
        if (processedEventRepository.existsById(eventId)) {
            logger.info("Event already processed: $eventId")
            return
        }
        // Process event (domain-specific handling omitted)
        processEvent(payload)
        // Record completion in the same transaction as the processing itself
        processedEventRepository.save(ProcessedEvent(eventId, Instant.now()))
    }

    private fun processEvent(payload: String) {
        // Deserialize the payload and apply domain logic here
    }
}
@Entity
@Table(name = "processed_events")
class ProcessedEvent(
    @Id
    val eventId: String,

    val processedAt: Instant
)
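The repository behind this check can be a plain Spring Data interface; a minimal sketch:

import org.springframework.data.jpa.repository.JpaRepository

interface ProcessedEventRepository : JpaRepository<ProcessedEvent, String>

The primary key on eventId is what makes the check safe under concurrency: if two consumers race on the same event, the second insert violates the key, its transaction rolls back, and on redelivery the event is seen as already processed.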
Outbox Table Cleanup
With the polling approach, PROCESSED rows accumulate, so a scheduled task keeps the Outbox table from growing indefinitely:
@Component
class OutboxCleaner(
    private val outboxRepository: OutboxRepository
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    @Scheduled(cron = "0 0 2 * * *") // Daily at 2 AM
    @Transactional
    fun cleanOldEvents() {
        val cutoffTime = Instant.now().minus(Duration.ofDays(7))
        val deletedCount = outboxRepository.deleteByStatusAndCreatedAtBefore(
            OutboxStatus.PROCESSED,
            cutoffTime
        )
        logger.info("Deleted $deletedCount old outbox events")
    }
}
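For the polling query and this cleanup to stay cheap as volume grows, an index on (status, created_at) is worth adding to the outbox table.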
Performance Optimization
Batch Processing
@Component
class BatchOutboxPublisher(
    private val outboxRepository: OutboxRepository,
    private val kafkaTemplate: KafkaTemplate<String, String>
) {
    @Scheduled(fixedDelay = 500)
    @Transactional
    fun publishBatch() {
        val events = outboxRepository.findPendingEvents(limit = 1000)
        if (events.isEmpty()) return

        // Fire all sends first so the producer can batch them...
        val futures = events.map { event ->
            val topic = "${event.aggregateType.lowercase()}-events"
            kafkaTemplate.send(topic, event.aggregateId, event.payload)
        }
        // ...then wait for every acknowledgement. If any send fails, the
        // exception rolls back the transaction and the whole batch is retried
        // on the next run (consumers handle the resulting duplicates).
        futures.forEach { it.get() }

        // Mark the whole batch processed in one update
        outboxRepository.markAsProcessed(events.map { it.id!! })
    }
}
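findPendingEvents and markAsProcessed are custom methods rather than derived queries; one possible sketch, again assuming PostgreSQL (shown as a separate interface here, though in practice they would live on the OutboxRepository defined earlier):

import org.springframework.data.jpa.repository.JpaRepository
import org.springframework.data.jpa.repository.Modifying
import org.springframework.data.jpa.repository.Query
import org.springframework.data.repository.query.Param

interface BatchOutboxRepository : JpaRepository<OutboxEvent, Long> {

    // Reuses the SKIP LOCKED claim from the polling section so that
    // concurrent publisher instances do not pick up the same rows
    @Query(
        value = """
            SELECT * FROM outbox_events
            WHERE status = 'PENDING'
            ORDER BY created_at
            LIMIT :limit
            FOR UPDATE SKIP LOCKED
        """,
        nativeQuery = true
    )
    fun findPendingEvents(@Param("limit") limit: Int): List<OutboxEvent>

    // Single UPDATE statement instead of one save() per event
    @Modifying
    @Query(
        value = "UPDATE outbox_events SET status = 'PROCESSED', processed_at = now() WHERE id IN (:ids)",
        nativeQuery = true
    )
    fun markAsProcessed(@Param("ids") ids: List<Long>): Int
}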
Summary
The Outbox Pattern removes the dual-write problem under either relay strategy; the two approaches trade off as follows:
| Feature | Polling | CDC (Debezium) |
|---|---|---|
| Latency | Polling interval | Near real-time |
| Implementation Complexity | Low | Medium |
| Infrastructure Requirements | Low | Requires Debezium |
| DB Load | Medium | Low |
| Scalability | Limited | High |
In the next post, we’ll cover how to separate read and write models using the CQRS pattern.