Production-Ready Event-Driven Architecture Part 2 - Implementing the Outbox Pattern
Series Introduction
This series covers how to build production-ready event-driven architecture.
- Part 1: Event Sourcing Fundamentals
- Part 2: Implementing the Outbox Pattern (Current)
- Part 3: CQRS with Separate Read/Write Models
- Part 4: Saga Pattern for Distributed Transactions
- Part 5: Event Schema Evolution and Versioning
Problem: Dual Write Issue
Let’s look at a common problem in microservices:
/**
 * Naive order service used to illustrate the dual-write problem: one method
 * writes to the database and to Kafka, with nothing tying the two writes together.
 */
@Service
class OrderService(
    private val orderRepository: OrderRepository,
    private val kafkaTemplate: KafkaTemplate<String, String>
) {
    /**
     * Persists a new order built from [command] and then publishes an
     * `OrderCreatedEvent` for it on the `order-events` topic.
     *
     * @param command input used to build the order via `Order.create`
     * @return the order as returned by the repository
     */
    @Transactional
    fun createOrder(command: CreateOrderCommand): Order =
        // Step 1: save to the database.
        orderRepository.save(Order.create(command)).also { savedOrder ->
            // Step 2: publish to Kafka. This happens inside the still-open DB
            // transaction, but Kafka does not take part in that transaction.
            kafkaTemplate.send("order-events", OrderCreatedEvent(savedOrder))
        }
}
Problems with this code:
- DB save succeeds but Kafka send fails → Data inconsistency
- Kafka send succeeds but transaction rolls back → Event published but no data
- Atomic writes to both systems are impossible
Solution: Transactional Outbox Pattern
The Outbox Pattern stores events in an Outbox table within the same transaction, and a separate process delivers them to the message broker.
Architecture
┌─────────────────────────────────────────────────────┐
│ Application │
│ ┌─────────────┐ ┌─────────────────────────────┐ │
│ │ Order │ │ Outbox Table │ │
│ │ Table │ │ (Same Transaction) │ │
│ └─────────────┘ └─────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
│
▼
┌─────────────────┐
│ Outbox Relay │
│ (Polling or │
│ CDC) │
└─────────────────┘
│
▼
┌─────────────────┐
│ Kafka │
└─────────────────┘
Implementation Method 1: Polling Publisher
Outbox Table Design
/**
 * JPA entity mapped to the `outbox_events` table (polling-publisher variant).
 *
 * Each instance describes one event waiting to be relayed to the message broker,
 * together with its delivery [status].
 */
@Entity
@Table(name = "outbox_events")
class OutboxEvent(

    /** Primary key, generated with the IDENTITY strategy; defaults to `null` for a new instance. */
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    val id: Long? = null,

    /** Kind of aggregate the event belongs to, e.g. `"Order"`. */
    @Column(nullable = false)
    val aggregateType: String,

    /** Identifier of the aggregate instance the event belongs to. */
    @Column(nullable = false)
    val aggregateId: String,

    /** Event name, e.g. `"OrderCreated"`. */
    @Column(nullable = false)
    val eventType: String,

    /** Serialized event body, stored in a TEXT column. */
    @Column(nullable = false, columnDefinition = "TEXT")
    val payload: String,

    /** Creation time; defaults to the moment the instance is constructed. */
    @Column(nullable = false)
    val createdAt: Instant = Instant.now(),

    /** Delivery state, persisted by enum name; starts as [OutboxStatus.PENDING]. */
    @Enumerated(EnumType.STRING)
    var status: OutboxStatus = OutboxStatus.PENDING,

    /** Time the event was marked as processed; `null` until then. */
    var processedAt: Instant? = null
)
/** Delivery state of an outbox row. */
enum class OutboxStatus {
    /** Not yet relayed to the broker; the default for a new outbox row. */
    PENDING,

    /** Relayed to the broker successfully. */
    PROCESSED,

    /** The relay attempt raised an error. */
    FAILED
}
Service Layer Modification
/**
 * Order service using the transactional outbox: the order row and its
 * `OrderCreated` outbox row are written by the same `@Transactional` method.
 */
@Service
class OrderService(
    private val orderRepository: OrderRepository,
    private val outboxRepository: OutboxRepository,
    private val objectMapper: ObjectMapper
) {
    /**
     * Saves a new order built from [command] and records an `OrderCreated`
     * event for it in the outbox.
     *
     * @param command input used to build the order via `Order.create`
     * @return the order as returned by the repository
     */
    @Transactional
    fun createOrder(command: CreateOrderCommand): Order {
        // 1. Save the order.
        val savedOrder = orderRepository.save(Order.create(command))
        // 2. Save the event to the outbox within the same transaction.
        outboxRepository.save(orderCreatedOutboxEntry(savedOrder))
        return savedOrder
    }

    /** Builds the outbox row carrying the JSON-serialized `OrderCreatedEvent` for [order]. */
    private fun orderCreatedOutboxEntry(order: Order): OutboxEvent {
        val eventBody = OrderCreatedEvent(
            orderId = order.id,
            customerId = order.customerId,
            items = order.items,
            totalAmount = order.totalAmount
        )
        return OutboxEvent(
            aggregateType = "Order",
            aggregateId = order.id,
            eventType = "OrderCreated",
            payload = objectMapper.writeValueAsString(eventBody)
        )
    }
}
Polling Publisher
/**
 * Relays pending outbox rows to Kafka by polling the outbox table.
 *
 * Delivery is at-least-once: an event that was sent but whose status update is
 * not committed will be picked up and sent again on a later run.
 */
@Component
class OutboxPollingPublisher(
    private val outboxRepository: OutboxRepository,
    private val kafkaTemplate: KafkaTemplate<String, String>
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    /**
     * Publishes up to 100 `PENDING` events, oldest first.
     *
     * Each event goes to the topic `<aggregateType lowercased>-events`, keyed by
     * its aggregate id. A successful send marks the event `PROCESSED`; a send
     * error marks it `FAILED` and the loop moves on to the next event. Nothing in
     * this class retries `FAILED` events.
     *
     * If the thread is interrupted while waiting for the broker, the current
     * event is left `PENDING`, the interrupt flag is restored and the run ends,
     * so an interruption is not mistaken for a delivery failure.
     */
    @Scheduled(fixedDelay = 1000) // Next run starts 1 second after the previous run finishes
    @Transactional
    fun publishPendingEvents() {
        val pendingEvents = outboxRepository.findByStatusOrderByCreatedAtAsc(
            OutboxStatus.PENDING,
            PageRequest.of(0, 100)
        )
        for (event in pendingEvents) {
            try {
                val topic = "${event.aggregateType.lowercase()}-events"
                kafkaTemplate.send(topic, event.aggregateId, event.payload)
                    .get() // Synchronous send for certainty
                event.status = OutboxStatus.PROCESSED
                event.processedAt = Instant.now()
                outboxRepository.save(event)
                logger.info("Published event: ${event.eventType} for ${event.aggregateId}")
            } catch (e: InterruptedException) {
                // The send outcome is unknown here, so the event stays PENDING
                // and is retried by a later run.
                Thread.currentThread().interrupt()
                logger.warn("Interrupted while publishing event: ${event.id}", e)
                return
            } catch (e: Exception) {
                logger.error("Failed to publish event: ${event.id}", e)
                event.status = OutboxStatus.FAILED
                outboxRepository.save(event)
            }
        }
    }
}
Limitations of Polling Approach
- Latency: Delay equal to polling interval
- Load: Continuous DB polling causes load
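- Duplicates: Delivery is at-least-once — if the relay fails after sending an event but before marking it processed, the event is sent again, so consumers must be idempotent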
Implementation Method 2: Debezium CDC (Change Data Capture)
Debezium captures database changes in real-time and streams them to Kafka.
Docker Compose Setup
# Local stack for the CDC variant: PostgreSQL with logical decoding enabled,
# ZooKeeper, a single Kafka broker, and Debezium's Kafka Connect image.
version: '3.8'
services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: orderdb
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical" # Required for CDC
    ports:
      - "5432:5432"

  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners: kafka:29092 for other containers, localhost:9092 for the host.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      # Single broker, so the offsets topic cannot be replicated.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  connect:
    image: debezium/connect:2.4
    depends_on:
      - kafka
      - postgres
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses
Debezium Connector Configuration
{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "orderdb",
    "topic.prefix": "orderdb",
    "table.include.list": "public.outbox_events",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.type": "event_type",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.by.field": "aggregate_type",
    "transforms.outbox.route.topic.replacement": "${routedByValue}-events",
    "transforms.outbox.table.fields.additional.placement": "aggregate_type:header:aggregateType"
  }
}
Register Connector
# Register the connector by POSTing outbox-connector.json to the Kafka Connect REST API.
curl --request POST \
  --header "Content-Type: application/json" \
  --data @outbox-connector.json \
  http://localhost:8083/connectors
Debezium Outbox Event Router
Debezium’s Outbox Event Router automatically routes outbox table records to appropriate topics.
/**
 * Outbox table structure (for Debezium).
 *
 * Column names are spelled out in snake_case so they can be referenced by name
 * from the connector configuration.
 */
@Entity
@Table(name = "outbox_events")
class OutboxEvent(

    /** Event identifier; a random UUID is assigned when none is supplied. */
    @Id
    val id: UUID = UUID.randomUUID(),

    /** Kind of aggregate the event belongs to, e.g. `"Order"`. */
    @Column(nullable = false, name = "aggregate_type")
    val aggregateType: String,

    /** Identifier of the aggregate instance the event belongs to. */
    @Column(nullable = false, name = "aggregate_id")
    val aggregateId: String,

    /** Event name, e.g. `"OrderCreated"`. */
    @Column(nullable = false, name = "event_type")
    val eventType: String,

    /**
     * Serialized event body, stored in a JSONB column.
     *
     * NOTE(review): binding a plain `String` to a JSONB column may need extra
     * mapping configuration with some JPA provider / JDBC driver setups — confirm.
     */
    @Column(nullable = false, columnDefinition = "JSONB")
    val payload: String
)
Service Layer (CDC Method)
/**
 * Order service for the CDC variant: it only writes the order and its outbox
 * row; nothing in this class talks to Kafka.
 */
@Service
class OrderService(
    private val orderRepository: OrderRepository,
    private val outboxRepository: OutboxRepository,
    private val objectMapper: ObjectMapper
) {
    /**
     * Saves a new order built from [command] together with an `OrderCreated`
     * outbox row, both inside one transaction.
     *
     * @param command input used to build the order via `Order.create`
     * @return the order as returned by the repository
     */
    @Transactional
    fun createOrder(command: CreateOrderCommand): Order {
        val savedOrder = orderRepository.save(Order.create(command))

        // Save to Outbox, Debezium automatically sends to Kafka.
        // NOTE(review): the aggregate type is presumably substituted verbatim into
        // the routed topic name ("Order" -> "Order-events") — confirm it matches
        // the topic the consumers subscribe to.
        val serializedEvent = objectMapper.writeValueAsString(OrderCreatedEvent(savedOrder))
        val outboxEntry = OutboxEvent(
            aggregateType = "Order",
            aggregateId = savedOrder.id,
            eventType = "OrderCreated",
            payload = serializedEvent
        )
        outboxRepository.save(outboxEntry)

        return savedOrder
    }
}
Ensuring Effectively-Once Processing (the outbox delivers at-least-once, so consumers must be idempotent)
Consumer-Side Idempotency
/**
 * Idempotent consumer for the `order-events` topic: every handled event id is
 * recorded, and an event whose id is already recorded is skipped.
 */
@Component
class OrderEventConsumer(
    private val processedEventRepository: ProcessedEventRepository
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    /**
     * Handles one record from `order-events`.
     *
     * @param payload the record value
     * @param key the record key
     * @param eventId value of the `id` record header, used as the deduplication key
     */
    @KafkaListener(topics = ["order-events"])
    @Transactional
    fun handleOrderEvent(
        @Payload payload: String,
        @Header(KafkaHeaders.RECEIVED_KEY) key: String,
        @Header("id") eventId: String
    ) {
        // Check if event already processed.
        // NOTE(review): this check and the save below are two separate steps; two
        // concurrent deliveries of the same event could both pass the check —
        // confirm the processed_events primary key rejects the second insert.
        if (processedEventRepository.existsById(eventId)) {
            logger.info("Event already processed: $eventId")
            return
        }
        // Process event (processEvent is not shown in this snippet).
        processEvent(payload)
        // Record completion
        processedEventRepository.save(ProcessedEvent(eventId, Instant.now()))
    }
}
/** Row of the `processed_events` table: one entry per event id that has been handled. */
@Entity
@Table(name = "processed_events")
class ProcessedEvent(

    /** Identifier of the handled event; primary key of the table. */
    @Id
    val eventId: String,

    /** Time at which the event was recorded as handled. */
    val processedAt: Instant
)
Outbox Table Cleanup
Cleanup task to prevent the Outbox table from growing indefinitely:
/** Scheduled cleanup that keeps the outbox table from growing indefinitely. */
@Component
class OutboxCleaner(
    private val outboxRepository: OutboxRepository
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    /**
     * Deletes `PROCESSED` outbox events created more than 7 days ago and logs
     * how many were removed. Events in any other status are left untouched.
     */
    @Scheduled(cron = "0 0 2 * * *") // Daily at 2 AM
    @Transactional
    fun cleanOldEvents() {
        val cutoffTime = Instant.now().minus(Duration.ofDays(7))
        val deletedCount = outboxRepository.deleteByStatusAndCreatedAtBefore(
            OutboxStatus.PROCESSED,
            cutoffTime
        )
        logger.info("Deleted $deletedCount old outbox events")
    }
}
Performance Optimization
Batch Processing
/**
 * Batch variant of the outbox relay: it issues all sends first and only then
 * waits for the broker acknowledgements.
 */
@Component
class BatchOutboxPublisher(
    private val outboxRepository: OutboxRepository,
    private val kafkaTemplate: KafkaTemplate<String, String>
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    /**
     * Publishes up to 1000 pending events and marks the ones that were
     * acknowledged as processed.
     *
     * Each event goes to the topic `<aggregateType lowercased>-events`, keyed by
     * its aggregate id. An event whose send fails is logged and left unmarked so
     * a later run picks it up again. One failure no longer discards the
     * bookkeeping for the events that were delivered, which would cause the
     * whole batch to be re-sent.
     */
    @Scheduled(fixedDelay = 500)
    @Transactional
    fun publishBatch() {
        val events = outboxRepository.findPendingEvents(limit = 1000)
        if (events.isEmpty()) return

        // Issue every send before waiting, so the waits overlap.
        val inFlight = events.map { event ->
            val topic = "${event.aggregateType.lowercase()}-events"
            event to kafkaTemplate.send(topic, event.aggregateId, event.payload)
        }

        // Wait for each send and keep only the ids of confirmed events.
        val publishedIds = inFlight.mapNotNull { (event, pendingSend) ->
            try {
                pendingSend.get()
                event.id
            } catch (e: InterruptedException) {
                // Outcome unknown: restore the flag and leave the event unmarked.
                Thread.currentThread().interrupt()
                null
            } catch (e: Exception) {
                logger.error("Failed to publish event: ${event.id}", e)
                null
            }
        }

        // Batch update
        if (publishedIds.isNotEmpty()) {
            outboxRepository.markAsProcessed(publishedIds)
        }
    }
}
Summary
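The Outbox Pattern removes the dual-write problem by making the event part of the same database transaction as the data change. The two ways of relaying the outbox compare as follows: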
| Feature | Polling | CDC (Debezium) |
|---|---|---|
| Latency | Polling interval | Near real-time |
| Implementation Complexity | Low | Medium |
| Infrastructure Requirements | Low | Requires Debezium |
| DB Load | Medium | Low |
| Scalability | Limited | High |
In the next post, we’ll cover how to separate read and write models using the CQRS pattern.
댓글남기기