Series Introduction

This series covers how to build a production-ready event-driven architecture.

  1. Part 1: Event Sourcing Fundamentals
  2. Part 2: Implementing the Outbox Pattern (Current)
  3. Part 3: CQRS with Separate Read/Write Models
  4. Part 4: Saga Pattern for Distributed Transactions
  5. Part 5: Event Schema Evolution and Versioning

Problem: Dual Write Issue

Let’s look at a common problem in microservices:

@Service
class OrderService(
    private val orderRepository: OrderRepository,
    private val kafkaTemplate: KafkaTemplate<String, String>,
    private val objectMapper: ObjectMapper
) {
    @Transactional
    fun createOrder(command: CreateOrderCommand): Order {
        // 1. Save to database
        val order = orderRepository.save(Order.create(command))

        // 2. Publish event to Kafka (serialized, since the template is typed <String, String>)
        kafkaTemplate.send("order-events", objectMapper.writeValueAsString(OrderCreatedEvent(order)))

        return order
    }
}

Problems with this code:

  • DB save succeeds but Kafka send fails → Data inconsistency
  • Kafka send succeeds but transaction rolls back → Event published but no data
  • Atomic writes across the two systems are impossible; even the usual workaround of publishing after the commit (sketched below) still loses events if the process crashes first
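
A tempting fix is to defer the publish until the transaction has committed, using Spring's transaction synchronization callbacks. A minimal sketch of that workaround (the class name is illustrative):

import org.springframework.transaction.support.TransactionSynchronization
import org.springframework.transaction.support.TransactionSynchronizationManager

@Service
class AfterCommitOrderService(
    private val orderRepository: OrderRepository,
    private val kafkaTemplate: KafkaTemplate<String, String>,
    private val objectMapper: ObjectMapper
) {
    @Transactional
    fun createOrder(command: CreateOrderCommand): Order {
        val order = orderRepository.save(Order.create(command))
        val payload = objectMapper.writeValueAsString(OrderCreatedEvent(order))

        // Publish only after a successful commit. This fixes the
        // "event published but transaction rolled back" case...
        TransactionSynchronizationManager.registerSynchronization(
            object : TransactionSynchronization {
                override fun afterCommit() {
                    // ...but if the process crashes between the commit and
                    // this callback, the event is lost with no trace.
                    kafkaTemplate.send("order-events", payload)
                }
            }
        )

        return order
    }
}

The window between the commit and the callback is exactly what the Outbox Pattern closes.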

Solution: Transactional Outbox Pattern

The Outbox Pattern writes the event to an Outbox table in the same database transaction as the business data; a separate relay process then delivers it to the message broker. Because the commit is atomic, the event exists if and only if the state change does.

Architecture

┌─────────────────────────────────────────────────────┐
│                   Application                        │
│  ┌─────────────┐    ┌─────────────────────────────┐ │
│  │   Order     │    │        Outbox Table         │ │
│  │   Table     │    │  (Same Transaction)         │ │
│  └─────────────┘    └─────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
                              │
                              ▼
                    ┌─────────────────┐
                    │  Outbox Relay   │
                    │  (Polling or    │
                    │   CDC)          │
                    └─────────────────┘
                              │
                              ▼
                    ┌─────────────────┐
                    │     Kafka       │
                    └─────────────────┘

Implementation Method 1: Polling Publisher

Outbox Table Design

@Entity
@Table(name = "outbox_events")
class OutboxEvent(
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    val id: Long? = null,

    @Column(nullable = false)
    val aggregateType: String,

    @Column(nullable = false)
    val aggregateId: String,

    @Column(nullable = false)
    val eventType: String,

    @Column(columnDefinition = "TEXT", nullable = false)
    val payload: String,

    @Column(nullable = false)
    val createdAt: Instant = Instant.now(),

    @Enumerated(EnumType.STRING)
    var status: OutboxStatus = OutboxStatus.PENDING,

    var processedAt: Instant? = null
)

enum class OutboxStatus {
    PENDING, PROCESSED, FAILED
}
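
The OutboxRepository injected throughout this post is never shown. A minimal sketch with the derived queries the polling and cleanup code below rely on (the batch variant for the performance section appears later):

import java.time.Instant
import org.springframework.data.domain.Pageable
import org.springframework.data.jpa.repository.JpaRepository

interface OutboxRepository : JpaRepository<OutboxEvent, Long> {
    // Oldest-first page of unpublished events, used by the polling publisher
    fun findByStatusOrderByCreatedAtAsc(status: OutboxStatus, pageable: Pageable): List<OutboxEvent>

    // Returns the number of rows deleted; used by the cleanup job
    fun deleteByStatusAndCreatedAtBefore(status: OutboxStatus, cutoff: Instant): Long
}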

Service Layer Modification

@Service
class OrderService(
    private val orderRepository: OrderRepository,
    private val outboxRepository: OutboxRepository,
    private val objectMapper: ObjectMapper
) {
    @Transactional
    fun createOrder(command: CreateOrderCommand): Order {
        // 1. Save Order
        val order = orderRepository.save(Order.create(command))

        // 2. Save event to Outbox within same transaction
        val event = OrderCreatedEvent(
            orderId = order.id,
            customerId = order.customerId,
            items = order.items,
            totalAmount = order.totalAmount
        )

        outboxRepository.save(
            OutboxEvent(
                aggregateType = "Order",
                aggregateId = order.id.toString(), // the aggregateId column is a String
                eventType = "OrderCreated",
                payload = objectMapper.writeValueAsString(event)
            )
        )

        return order
    }
}

Polling Publisher

@Component
class OutboxPollingPublisher(
    private val outboxRepository: OutboxRepository,
    private val kafkaTemplate: KafkaTemplate<String, String>
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    @Scheduled(fixedDelay = 1000) // Run every 1 second
    @Transactional
    fun publishPendingEvents() {
        val pendingEvents = outboxRepository.findByStatusOrderByCreatedAtAsc(
            OutboxStatus.PENDING,
            PageRequest.of(0, 100)
        )

        pendingEvents.forEach { event ->
            try {
                val topic = "${event.aggregateType.lowercase()}-events"

                kafkaTemplate.send(topic, event.aggregateId, event.payload)
                    .get() // block until the broker acknowledges the send

                event.status = OutboxStatus.PROCESSED
                event.processedAt = Instant.now()
                outboxRepository.save(event)

                logger.info("Published event: ${event.eventType} for ${event.aggregateId}")
            } catch (e: Exception) {
                logger.error("Failed to publish event: ${event.id}", e)
                // NOTE: FAILED rows are never picked up by the PENDING query
                // above; they need a separate retry or alerting path.
                event.status = OutboxStatus.FAILED
                outboxRepository.save(event)
            }
        }
    }
}

Limitations of Polling Approach

  • Latency: events wait up to one polling interval before being published
  • Load: continuous polling puts steady read load on the database
  • Duplicates: duplicate sends are possible in failure scenarios, and also when multiple app instances poll concurrently (one mitigation is sketched below)
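
For the multi-instance case, one option is a distributed scheduler lock such as ShedLock, so only one instance polls per run. A sketch, assuming the ShedLock dependency (net.javacrumbs.shedlock), its @EnableSchedulerLock configuration, and a lock store are already set up:

import net.javacrumbs.shedlock.spring.annotation.SchedulerLock

@Component
class LockedOutboxPublisher(
    private val outboxRepository: OutboxRepository,
    private val kafkaTemplate: KafkaTemplate<String, String>
) {
    // Only the instance holding the "outbox-publisher" lock executes;
    // the others skip this run, so no two instances publish the same rows.
    @Scheduled(fixedDelay = 1000)
    @SchedulerLock(name = "outbox-publisher", lockAtMostFor = "PT30S")
    @Transactional
    fun publishPendingEvents() {
        // identical body to OutboxPollingPublisher.publishPendingEvents() above
    }
}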

Implementation Method 2: Debezium CDC (Change Data Capture)

Debezium tails the database's write-ahead log (WAL), capturing row-level changes in real time and streaming them to Kafka without any polling queries.

Docker Compose Setup

version: '3.8'
services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: orderdb
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"  # Required for CDC
    ports:
      - "5432:5432"

  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  connect:
    image: debezium/connect:2.4
    depends_on:
      - kafka
      - postgres
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses

Debezium Connector Configuration

{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "orderdb",
    "topic.prefix": "orderdb",
    "plugin.name": "pgoutput",
    "table.include.list": "public.outbox_events",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.route.by.field": "aggregate_type",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.type": "event_type",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.topic.replacement": "${routedByValue}-events",
    "transforms.outbox.table.fields.additional.placement": "aggregate_type:header:aggregateType"
  }
}

Three details matter on Debezium 2.x: topic.prefix replaces the older database.server.name, plugin.name must be pgoutput for a vanilla PostgreSQL image, and route.by.field (which defaults to aggregatetype) has to point at the snake_case aggregate_type column. Since the routed topic is "${routedByValue}-events", an aggregateType of "Order" produces the topic Order-events; store a lowercase aggregate type or adjust the consumer's topic to match order-events.

Register Connector

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @outbox-connector.json

Debezium Outbox Event Router

Debezium’s Outbox Event Router automatically routes outbox table records to appropriate topics.

// Outbox table structure (for Debezium)
@Entity
@Table(name = "outbox_events")
class OutboxEvent(
    @Id
    val id: UUID = UUID.randomUUID(),

    @Column(name = "aggregate_type", nullable = false)
    val aggregateType: String,

    @Column(name = "aggregate_id", nullable = false)
    val aggregateId: String,

    @Column(name = "event_type", nullable = false)
    val eventType: String,

    // Hibernate 6: without @JdbcTypeCode(SqlTypes.JSON), binding a String
    // property to a PostgreSQL jsonb column fails with a type mismatch
    @JdbcTypeCode(SqlTypes.JSON)
    @Column(columnDefinition = "jsonb", nullable = false)
    val payload: String
)

Service Layer (CDC Method)

@Service
class OrderService(
    private val orderRepository: OrderRepository,
    private val outboxRepository: OutboxRepository,
    private val objectMapper: ObjectMapper
) {
    @Transactional
    fun createOrder(command: CreateOrderCommand): Order {
        val order = orderRepository.save(Order.create(command))

        // Save to Outbox, Debezium automatically sends to Kafka
        outboxRepository.save(
            OutboxEvent(
                aggregateType = "Order",
                aggregateId = order.id.toString(),
                eventType = "OrderCreated",
                payload = objectMapper.writeValueAsString(
                    OrderCreatedEvent(order)
                )
            )
        )

        return order
    }
}
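
Because Debezium reads committed changes from the WAL rather than querying the table, the Debezium documentation suggests a useful trick: delete the outbox row in the same transaction that inserts it. The INSERT still appears in the change stream, the table never grows, and the cleanup job shown later becomes unnecessary for the CDC variant. A sketch of the adjusted method (inside the same OrderService):

    @Transactional
    fun createOrder(command: CreateOrderCommand): Order {
        val order = orderRepository.save(Order.create(command))

        val event = outboxRepository.save(
            OutboxEvent(
                aggregateType = "Order",
                aggregateId = order.id.toString(),
                eventType = "OrderCreated",
                payload = objectMapper.writeValueAsString(OrderCreatedEvent(order))
            )
        )

        // The INSERT is already in the WAL, which is all Debezium tails;
        // deleting the row right away keeps the outbox table empty.
        // (The EventRouter ignores DELETE operations by default.)
        outboxRepository.delete(event)

        return order
    }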

Ensuring Exactly-Once Semantics

The outbox relay guarantees at-least-once delivery: in failure scenarios the same event can reach Kafka more than once. Exactly-once semantics are therefore achieved on the consumer side, by making event handling idempotent.

Consumer-Side Idempotency

@Component
class OrderEventConsumer(
    private val processedEventRepository: ProcessedEventRepository
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    @KafkaListener(topics = ["order-events"])
    @Transactional
    fun handleOrderEvent(
        @Payload payload: String,
        @Header(KafkaHeaders.RECEIVED_KEY) key: String,
        @Header("id") eventId: String // set by Debezium's EventRouter; a polling publisher must add this header itself
    ) {
        // Skip events that were already processed. If two consumers race
        // past this check, the primary key on processed_events fails the
        // second insert and rolls that transaction back.
        if (processedEventRepository.existsById(eventId)) {
            logger.info("Event already processed: $eventId")
            return
        }

        // Process event (domain-specific handling)
        processEvent(payload)

        // Record completion
        processedEventRepository.save(ProcessedEvent(eventId, Instant.now()))
    }
}

@Entity
@Table(name = "processed_events")
class ProcessedEvent(
    @Id
    val eventId: String,
    val processedAt: Instant
)

Outbox Table Cleanup

Cleanup task to prevent the Outbox table from growing indefinitely:

@Component
class OutboxCleaner(
    private val outboxRepository: OutboxRepository
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    @Scheduled(cron = "0 0 2 * * *") // Daily at 2 AM
    @Transactional
    fun cleanOldEvents() {
        val cutoffTime = Instant.now().minus(Duration.ofDays(7))
        val deletedCount = outboxRepository.deleteByStatusAndCreatedAtBefore(
            OutboxStatus.PROCESSED,
            cutoffTime
        )
        logger.info("Deleted $deletedCount old outbox events")
    }
}

Performance Optimization

Batch Processing

@Component
class BatchOutboxPublisher(
    private val outboxRepository: OutboxRepository,
    private val kafkaTemplate: KafkaTemplate<String, String>
) {
    @Scheduled(fixedDelay = 500)
    @Transactional
    fun publishBatch() {
        val events = outboxRepository.findPendingEvents(limit = 1000)

        if (events.isEmpty()) return

        val futures = events.map { event ->
            val topic = "${event.aggregateType.lowercase()}-events"
            kafkaTemplate.send(topic, event.aggregateId, event.payload)
        }

        // Wait for every send to be acknowledged. If any send fails, the
        // exception rolls back this transaction, the rows stay PENDING, and
        // the whole batch is retried; already-sent messages go out twice,
        // which the idempotent consumer above absorbs.
        futures.forEach { it.get() }

        // One bulk status update instead of row-by-row saves
        outboxRepository.markAsProcessed(events.map { it.id!! })
    }
}
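
findPendingEvents and markAsProcessed are custom repository methods this post doesn't define. One plausible implementation, assuming PostgreSQL, extends the OutboxRepository sketched earlier: FOR UPDATE SKIP LOCKED lets concurrent publisher instances claim disjoint batches, and a single bulk UPDATE replaces per-row saves.

import org.springframework.data.jpa.repository.JpaRepository
import org.springframework.data.jpa.repository.Modifying
import org.springframework.data.jpa.repository.Query
import org.springframework.data.repository.query.Param

interface OutboxRepository : JpaRepository<OutboxEvent, Long> {

    // (derived queries from the polling section omitted)

    // Rows locked by another publisher instance are skipped, so each
    // instance claims its own batch (PostgreSQL-specific syntax).
    @Query(
        value = """
            SELECT * FROM outbox_events
            WHERE status = 'PENDING'
            ORDER BY created_at
            LIMIT :limit
            FOR UPDATE SKIP LOCKED
            """,
        nativeQuery = true
    )
    fun findPendingEvents(@Param("limit") limit: Int): List<OutboxEvent>

    // One UPDATE for the whole batch
    @Modifying
    @Query(
        value = "UPDATE outbox_events SET status = 'PROCESSED', processed_at = now() WHERE id IN (:ids)",
        nativeQuery = true
    )
    fun markAsProcessed(@Param("ids") ids: List<Long>): Int
}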

Summary

The Outbox Pattern makes the state change and its event atomic by committing them in a single transaction. The two relay implementations trade off as follows:

Feature                     | Polling          | CDC (Debezium)
----------------------------|------------------|-------------------
Latency                     | Polling interval | Near real-time
Implementation complexity   | Low              | Medium
Infrastructure requirements | Low              | Requires Debezium
DB load                     | Medium           | Low
Scalability                 | Limited          | High

In the next post, we’ll cover how to separate read and write models using the CQRS pattern.
