5 min read


Series Introduction

This series covers how to build a production-ready event-driven architecture.

  1. Part 1: Event Sourcing Fundamentals
  2. Part 2: Implementing the Outbox Pattern (Current)
  3. Part 3: CQRS with Separate Read/Write Models
  4. Part 4: Saga Pattern for Distributed Transactions
  5. Part 5: Event Schema Evolution and Versioning

Problem: Dual Write Issue

Let’s look at a common problem in microservices:

/**
 * Counter-example: the naive "dual write" that the outbox pattern replaces.
 *
 * The order row is written inside the database transaction opened by @Transactional, while the
 * Kafka send is not part of that transaction, so the two writes cannot commit or roll back together.
 */
@Service
class OrderService(
    private val orderRepository: OrderRepository,
    // NOTE(review): the template's value type is String, but createOrder passes an OrderCreatedEvent
    // object to send(); as written this would not compile — the event must be serialized first.
    private val kafkaTemplate: KafkaTemplate<String, String>
) {
    @Transactional
    fun createOrder(command: CreateOrderCommand): Order {
        // 1. Save to database
        // (the insert is only committed when the transaction ends, after this method returns)
        val order = orderRepository.save(Order.create(command))

        // 2. Publish event to Kafka
        // (sent before the commit: if the commit later fails, the event is already out; if this send
        // fails, the order may still be committed without an event)
        kafkaTemplate.send("order-events", OrderCreatedEvent(order))

        return order
    }
}

Problems with this code:

  • DB save succeeds but Kafka send fails → Data inconsistency
  • Kafka send succeeds but transaction rolls back → Event published but no data
  • The two systems cannot be written atomically: the Kafka send is not part of the database transaction

Solution: Transactional Outbox Pattern

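The Outbox Pattern writes events to an outbox table in the same database transaction as the business data, and a separate process (the relay) later delivers them to the message broker. Because both rows are written in one transaction, either the order and its event are both stored or neither is.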

Architecture

┌─────────────────────────────────────────────────────┐
│                   Application                        │
│  ┌─────────────┐    ┌─────────────────────────────┐ │
│  │   Order     │    │        Outbox Table         │ │
│  │   Table     │    │  (Same Transaction)         │ │
│  └─────────────┘    └─────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
                              │
                              ▼
                    ┌─────────────────┐
                    │  Outbox Relay   │
                    │  (Polling or    │
                    │   CDC)          │
                    └─────────────────┘
                              │
                              ▼
                    ┌─────────────────┐
                    │     Kafka       │
                    └─────────────────┘

Implementation Method 1: Polling Publisher

Outbox Table Design

/**
 * Outbox row for the polling approach, written in the same transaction as the business change and
 * later published to Kafka by a relay.
 *
 * No @Column(name = ...) is given, so physical column names come from the JPA naming strategy in use.
 * NOTE(review): the relay queries by status ordered by createdAt; an index on those two columns is
 * probably worthwhile — confirm against real query plans.
 */
@Entity
@Table(name = "outbox_events")
class OutboxEvent(
    // Database-generated identity value; null until the row has been inserted.
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    val id: Long? = null,

    // Aggregate kind, e.g. "Order"; the polling relay derives the topic "<lowercased>-events" from it.
    @Column(nullable = false)
    val aggregateType: String,

    // Used by the relay as the Kafka record key.
    @Column(nullable = false)
    val aggregateId: String,

    // Event name, e.g. "OrderCreated".
    @Column(nullable = false)
    val eventType: String,

    // Serialized event body (JSON produced by ObjectMapper in the service); sent as the record value.
    @Column(columnDefinition = "TEXT", nullable = false)
    val payload: String,

    // Captured when the object is constructed; the relay publishes in createdAt order.
    @Column(nullable = false)
    val createdAt: Instant = Instant.now(),

    // Stored as the enum name; mutable because the relay updates it after a publish attempt.
    @Enumerated(EnumType.STRING)
    var status: OutboxStatus = OutboxStatus.PENDING,

    // Set by the relay when the event has been published.
    var processedAt: Instant? = null
)

/** Delivery state of a row in the `outbox_events` table. */
enum class OutboxStatus {
    /** Stored in the outbox, not yet delivered to the broker. */
    PENDING,

    /** Delivered to the broker. */
    PROCESSED,

    /** Marks an event whose delivery failed. */
    FAILED
}

Service Layer Modification

/**
 * Creates orders and records an `OrderCreated` outbox entry in the same database transaction, so the
 * order row and its event are committed (or rolled back) together. A relay publishes the entry later.
 */
@Service
class OrderService(
    private val orderRepository: OrderRepository,
    private val outboxRepository: OutboxRepository,
    private val objectMapper: ObjectMapper
) {
    @Transactional
    fun createOrder(command: CreateOrderCommand): Order {
        val savedOrder = orderRepository.save(Order.create(command))

        // Snapshot of the order as the event body.
        val createdEvent = OrderCreatedEvent(
            orderId = savedOrder.id,
            customerId = savedOrder.customerId,
            items = savedOrder.items,
            totalAmount = savedOrder.totalAmount
        )
        val serializedEvent = objectMapper.writeValueAsString(createdEvent)

        // Same transaction as the order insert: no event without an order and no order without an event.
        val outboxEntry = OutboxEvent(
            aggregateType = "Order",
            aggregateId = savedOrder.id,
            eventType = "OrderCreated",
            payload = serializedEvent
        )
        outboxRepository.save(outboxEntry)

        return savedOrder
    }
}

Polling Publisher

/**
 * Relay for the polling approach: periodically reads PENDING outbox rows and publishes them to Kafka.
 *
 * Each event goes to the topic `"<aggregateType lowercased>-events"` with the aggregate id as the record
 * key, and is marked PROCESSED only after the broker has acknowledged the send.
 *
 * Delivery is at-least-once: if the process dies after the broker acknowledges a send but before this
 * transaction commits, the row is still PENDING and the event is sent again on the next run, so
 * consumers must be idempotent.
 *
 * NOTE(review): nothing here locks the selected rows; if several application instances run this
 * scheduler they may publish the same rows concurrently unless the repository query locks them
 * (e.g. `SELECT ... FOR UPDATE SKIP LOCKED`) — confirm how `findByStatusOrderByCreatedAtAsc` is declared.
 */
@Component
class OutboxPollingPublisher(
    private val outboxRepository: OutboxRepository,
    private val kafkaTemplate: KafkaTemplate<String, String>
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    /**
     * Publishes up to 100 PENDING events, oldest first.
     *
     * A failed send leaves its event PENDING so the next run retries it. Marking it FAILED instead would
     * mean it is never selected again by the PENDING-only query, and the event would never be published.
     * Later events of the same aggregate in this batch are skipped for this run so they are not
     * published ahead of the failed one. A permanently failing event therefore blocks its aggregate
     * (and is logged on every run) until it is fixed or removed by hand.
     */
    @Scheduled(fixedDelay = 1000) // Next run starts 1 s after the previous one finishes
    @Transactional
    fun publishPendingEvents() {
        val pendingEvents = outboxRepository.findByStatusOrderByCreatedAtAsc(
            OutboxStatus.PENDING,
            PageRequest.of(0, 100)
        )

        // Aggregates with a failed send in this run; their later events must wait for the retry.
        val blockedAggregateIds = mutableSetOf<String>()

        for (event in pendingEvents) {
            if (event.aggregateId in blockedAggregateIds) continue

            try {
                val topic = "${event.aggregateType.lowercase()}-events"

                kafkaTemplate.send(topic, event.aggregateId, event.payload)
                    .get() // Block until the broker acknowledges (or rejects) the record

                event.status = OutboxStatus.PROCESSED
                event.processedAt = Instant.now()
                outboxRepository.save(event)

                logger.info("Published event: ${event.eventType} for ${event.aggregateId}")
            } catch (e: InterruptedException) {
                // Restore the interrupt flag and stop; events not yet sent stay PENDING.
                Thread.currentThread().interrupt()
                logger.warn("Interrupted while publishing event: ${event.id}", e)
                break
            } catch (e: Exception) {
                // Leave the event PENDING for the next run and hold back the rest of its aggregate.
                logger.error("Failed to publish event: ${event.id}", e)
                blockedAggregateIds += event.aggregateId
            }
        }
    }
}

Limitations of Polling Approach

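  • Latency: An event can wait up to one polling interval before it is published
  • Load: The table is queried continuously, even when there are no new events
  • Duplicates: An event can be sent more than once — for example, if the relay crashes after Kafka acknowledges a send but before the status update commits, or if several application instances poll the same rows without locking them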

Implementation Method 2: Debezium CDC (Change Data Capture)

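Debezium reads committed changes from the database's transaction log (for PostgreSQL, the write-ahead log via logical decoding) in near real time and streams them to Kafka. The application itself never talks to Kafka.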

Docker Compose Setup

# Local stack for the outbox examples: PostgreSQL, a single-node Kafka broker (ZooKeeper mode)
# and a Kafka Connect worker from the Debezium image.
version: '3.8'
services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: orderdb
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    # NOTE(review): the official postgres image is not known to ship the decoderbufs plugin; the
    # Debezium connector should use plugin.name=pgoutput (built into PostgreSQL 10+).
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"  # Required for CDC: Debezium reads changes through logical decoding
    ports:
      - "5432:5432"

  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    # depends_on only orders container start-up; it does not wait for ZooKeeper to be ready.
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # PLAINTEXT (kafka:29092) is the address for other containers such as Connect;
      # PLAINTEXT_HOST (localhost:9092) is the address for clients running on the host.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      # Only one broker, so the consumer-offsets topic cannot be replicated.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  connect:
    image: debezium/connect:2.4
    depends_on:
      - kafka
      - postgres
    # Kafka Connect REST API, used to register the connector.
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:29092  # internal listener, not localhost:9092
      GROUP_ID: 1
      # Kafka topics where the Connect worker stores connector configs, source offsets and status.
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses

Debezium Connector Configuration

{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "orderdb",
    "topic.prefix": "orderdb",
    "table.include.list": "public.outbox_events",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.type": "event_type",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.by.field": "aggregate_type",
    "transforms.outbox.route.topic.replacement": "${routedByValue}-events",
    "transforms.outbox.table.fields.additional.placement": "aggregate_type:header:aggregateType"
  }
}

Register Connector

curl --request POST \
  --header "Content-Type: application/json" \
  --data @outbox-connector.json \
  http://localhost:8083/connectors

Debezium Outbox Event Router

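Debezium’s Outbox Event Router turns each inserted outbox row into a Kafka record and routes it to a topic. The `${routedByValue}` placeholder in `route.topic.replacement` is replaced with the row's value in the routing column, which is set by `route.by.field` (default `aggregatetype`). Kafka topic names are case-sensitive, so a stored value of `Order` routes to `Order-events`, not `order-events`.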

// Outbox table structure (for Debezium)
/**
 * Outbox row captured by Debezium and turned into a Kafka record by the connector's EventRouter
 * transform. The explicit column names are the ones referenced in the connector configuration.
 *
 * There is no status column: the application only inserts rows and never updates them afterwards.
 */
@Entity
@Table(name = "outbox_events")
class OutboxEvent(
    // Generated in the application; referenced by the connector as the event id column.
    @Id
    val id: UUID = UUID.randomUUID(),

    @Column(name = "aggregate_type", nullable = false)
    val aggregateType: String,

    // Referenced by the connector as the event key column (becomes the Kafka record key).
    @Column(name = "aggregate_id", nullable = false)
    val aggregateId: String,

    @Column(name = "event_type", nullable = false)
    val eventType: String,

    // TEXT rather than JSONB. A Kotlin String is bound as a varchar parameter, and PostgreSQL rejects
    // storing varchar into a jsonb column without an explicit cast. To keep JSONB, map this field with a
    // JSON JDBC type (e.g. Hibernate 6 `@JdbcTypeCode(SqlTypes.JSON)`) or add `stringtype=unspecified`
    // to the JDBC URL. TEXT also matches the polling variant's payload column.
    @Column(columnDefinition = "TEXT", nullable = false)
    val payload: String
)

Service Layer (CDC Method)

@Service
class OrderService(
    private val orderRepository: OrderRepository,
    private val outboxRepository: OutboxRepository,
    private val objectMapper: ObjectMapper
) {
    /**
     * Saves the order plus an `OrderCreated` outbox row in one transaction; Debezium publishes the row.
     *
     * The connector builds the topic name from the aggregate type column through
     * `route.topic.replacement = "${routedByValue}-events"`, using the stored value verbatim. Kafka topic
     * names are case-sensitive, so storing "Order" would route to `Order-events` while the consumer
     * listens on `order-events`. The aggregate type is therefore stored in lowercase, which yields the
     * same topic the polling relay derives by lowercasing.
     */
    @Transactional
    fun createOrder(command: CreateOrderCommand): Order {
        val order = orderRepository.save(Order.create(command))

        // Save to Outbox; Debezium picks up the committed insert from the WAL and sends it to Kafka
        outboxRepository.save(
            OutboxEvent(
                aggregateType = "order",
                aggregateId = order.id,
                eventType = "OrderCreated",
                payload = objectMapper.writeValueAsString(
                    OrderCreatedEvent(order)
                )
            )
        )

        return order
    }
}

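Ensuring Effectively-Once Processing

Both relay approaches deliver events at least once, so the same event can reach a consumer more than once. Making the consumer idempotent turns at-least-once delivery into effectively-once processing.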

Consumer-Side Idempotency

/**
 * Consumes `order-events` idempotently.
 *
 * Outbox relays deliver at least once, so the same event can arrive more than once. Each handled event
 * id is recorded in `processed_events` within the same transaction as the processing, and ids that are
 * already recorded are skipped.
 */
@Component
class OrderEventConsumer(
    private val processedEventRepository: ProcessedEventRepository
) {
    // Used below but previously never declared (a compile error); declared like the relay's logger.
    private val logger = LoggerFactory.getLogger(javaClass)

    /**
     * Handles one record unless its event id has already been processed.
     *
     * @param payload the record value (the outbox `payload` column)
     * @param key the record key (the aggregate id); not used here
     * @param eventId the event id read from the `id` record header
     */
    @KafkaListener(topics = ["order-events"])
    @Transactional
    fun handleOrderEvent(
        @Payload payload: String,
        @Header(KafkaHeaders.RECEIVED_KEY) key: String,
        // NOTE(review): Debezium's EventRouter is presumably what sets the `id` header. The polling relay
        // sends records without headers, so this header would be missing for its records. `id` is also
        // Spring Messaging's reserved MessageHeaders.ID name, and raw Kafka header values may arrive as
        // byte[]. Confirm the header is mapped through and converted to String.
        @Header("id") eventId: String
    ) {
        // Already handled (redelivery or duplicate send): return without reprocessing.
        if (processedEventRepository.existsById(eventId)) {
            logger.info("Event already processed: $eventId")
            return
        }

        // Process event
        // NOTE(review): processEvent is not defined in this snippet; supply it in the real class.
        processEvent(payload)

        // Record completion in the same transaction. If two deliveries of the same id race past the
        // check above, the primary key on processed_events rejects the second insert and that
        // transaction rolls back (only database side effects of processEvent are undone).
        processedEventRepository.save(ProcessedEvent(eventId, Instant.now()))
    }
}

/**
 * Marker row recording that the consumer has handled the event with [eventId].
 *
 * NOTE(review): nothing in this snippet prunes this table; consider a retention job like the outbox cleanup.
 */
@Entity
@Table(name = "processed_events")
class ProcessedEvent(
    // Primary key: at most one row per event id.
    @Id
    val eventId: String,
    // When the event was handled; supplied by the caller.
    val processedAt: Instant
)

Outbox Table Cleanup

A scheduled cleanup task keeps the outbox table from growing indefinitely. It applies to the polling table, which has `status` and `createdAt` columns:

/**
 * Scheduled job that keeps the polling outbox table from growing without bound.
 */
@Component
class OutboxCleaner(
    private val outboxRepository: OutboxRepository
) {
    // Used below but previously never declared (a compile error); declared like the relay's logger.
    private val logger = LoggerFactory.getLogger(javaClass)

    /**
     * Deletes PROCESSED rows whose createdAt is more than 7 days in the past; rows in any other status
     * are kept.
     *
     * NOTE(review): if deleteByStatusAndCreatedAtBefore is a Spring Data derived delete query, it loads
     * the matching entities and removes them one by one. For large tables a bulk
     * `@Modifying @Query("delete from OutboxEvent e where ...")` is much cheaper — confirm the repository.
     */
    @Scheduled(cron = "0 0 2 * * *") // sec min hour day month weekday: daily at 02:00:00, server time zone
    @Transactional
    fun cleanOldEvents() {
        val cutoffTime = Instant.now().minus(Duration.ofDays(7))
        val deletedCount = outboxRepository.deleteByStatusAndCreatedAtBefore(
            OutboxStatus.PROCESSED,
            cutoffTime
        )
        logger.info("Deleted $deletedCount old outbox events")
    }
}

Performance Optimization

Batch Processing

/**
 * Batched variant of the polling relay. It sends up to 1000 pending events without waiting between
 * sends, then waits for every acknowledgement before marking the whole batch processed.
 *
 * If any send fails, `get()` throws before `markAsProcessed` is reached, so no event of the batch is
 * marked and the next run resends the entire batch (consumers must deduplicate).
 */
@Component
class BatchOutboxPublisher(
    private val outboxRepository: OutboxRepository,
    private val kafkaTemplate: KafkaTemplate<String, String>
) {
    @Scheduled(fixedDelay = 500)
    @Transactional
    fun publishBatch() {
        val batch = outboxRepository.findPendingEvents(limit = 1000)

        if (batch.isNotEmpty()) {
            // Start every send first so they are in flight together, then block on each result.
            val acknowledgements = batch.map { pending ->
                val destination = "${pending.aggregateType.lowercase()}-events"
                kafkaTemplate.send(destination, pending.aggregateId, pending.payload)
            }
            for (ack in acknowledgements) {
                ack.get()
            }

            // Rows loaded from the table presumably carry their database-generated id, hence `!!`.
            val publishedIds = batch.map { pending -> pending.id!! }
            outboxRepository.markAsProcessed(publishedIds)
        }
    }
}

Summary

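The Outbox Pattern makes saving the business data and recording its event a single atomic database write. The two relay implementations compare as follows: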

| Feature | Polling | CDC (Debezium) |
|---|---|---|
| Latency | Polling interval | Near real-time |
| Implementation Complexity | Low | Medium |
| Infrastructure Requirements | Low | Requires Debezium |
| DB Load | Medium | Low |
| Scalability | Limited | High |

In the next post, we’ll cover how to separate read and write models using the CQRS pattern.

Leave a comment