4 분 소요


시리즈 소개

이 시리즈는 프로덕션 환경에서 사용할 수 있는 이벤트 기반 아키텍처를 구축하는 방법을 다룹니다.

  1. Part 1: Event Sourcing 기초
  2. Part 2: Outbox Pattern 구현 (현재 글)
  3. Part 3: CQRS와 Read/Write 모델 분리
  4. Part 4: Saga Pattern으로 분산 트랜잭션 처리
  5. Part 5: Event Schema 진화와 버전 관리

문제: 이중 쓰기(Dual Write) 문제

마이크로서비스에서 자주 발생하는 문제를 살펴보겠습니다:

@Service
class OrderService(
    private val orderRepository: OrderRepository,
    private val kafkaTemplate: KafkaTemplate<String, String>
) {
    @Transactional
    fun createOrder(command: CreateOrderCommand): Order {
        // 1. 데이터베이스에 저장
        val order = orderRepository.save(Order.create(command))

        // 2. Kafka에 이벤트 발행
        kafkaTemplate.send("order-events", OrderCreatedEvent(order))

        return order
    }
}

이 코드의 문제점:

  • DB 저장 성공 후 Kafka 전송 실패 → 데이터 불일치
  • Kafka 전송 성공 후 트랜잭션 롤백 → 이벤트는 발행되었으나 데이터 없음
  • 두 시스템에 대한 원자적 쓰기가 불가능

해결책: Transactional Outbox Pattern

Outbox Pattern은 이벤트를 같은 트랜잭션 내에서 데이터베이스의 Outbox 테이블에 저장하고, 별도의 프로세스가 이를 메시지 브로커로 전달합니다.

아키텍처

┌─────────────────────────────────────────────────────┐
│                   Application                        │
│  ┌─────────────┐    ┌─────────────────────────────┐ │
│  │   Order     │    │        Outbox Table         │ │
│  │   Table     │    │  (Same Transaction)         │ │
│  └─────────────┘    └─────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
                              │
                              ▼
                    ┌─────────────────┐
                    │  Outbox Relay   │
                    │  (Polling or    │
                    │   CDC)          │
                    └─────────────────┘
                              │
                              ▼
                    ┌─────────────────┐
                    │     Kafka       │
                    └─────────────────┘

구현 방법 1: Polling Publisher

Outbox 테이블 설계

@Entity
@Table(name = "outbox_events")
class OutboxEvent(
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    val id: Long? = null,

    @Column(nullable = false)
    val aggregateType: String,

    @Column(nullable = false)
    val aggregateId: String,

    @Column(nullable = false)
    val eventType: String,

    @Column(columnDefinition = "TEXT", nullable = false)
    val payload: String,

    @Column(nullable = false)
    val createdAt: Instant = Instant.now(),

    @Enumerated(EnumType.STRING)
    var status: OutboxStatus = OutboxStatus.PENDING,

    var processedAt: Instant? = null
)

enum class OutboxStatus {
    PENDING, PROCESSED, FAILED
}

Service Layer 수정

@Service
class OrderService(
    private val orderRepository: OrderRepository,
    private val outboxRepository: OutboxRepository,
    private val objectMapper: ObjectMapper
) {
    @Transactional
    fun createOrder(command: CreateOrderCommand): Order {
        // 1. Order 저장
        val order = orderRepository.save(Order.create(command))

        // 2. 같은 트랜잭션 내에서 Outbox에 이벤트 저장
        val event = OrderCreatedEvent(
            orderId = order.id,
            customerId = order.customerId,
            items = order.items,
            totalAmount = order.totalAmount
        )

        outboxRepository.save(
            OutboxEvent(
                aggregateType = "Order",
                aggregateId = order.id,
                eventType = "OrderCreated",
                payload = objectMapper.writeValueAsString(event)
            )
        )

        return order
    }
}

Polling Publisher

@Component
class OutboxPollingPublisher(
    private val outboxRepository: OutboxRepository,
    private val kafkaTemplate: KafkaTemplate<String, String>
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    @Scheduled(fixedDelay = 1000) // 1초마다 실행
    @Transactional
    fun publishPendingEvents() {
        val pendingEvents = outboxRepository.findByStatusOrderByCreatedAtAsc(
            OutboxStatus.PENDING,
            PageRequest.of(0, 100)
        )

        pendingEvents.forEach { event ->
            try {
                val topic = "${event.aggregateType.lowercase()}-events"

                kafkaTemplate.send(topic, event.aggregateId, event.payload)
                    .get() // 동기 전송으로 확실히 전송

                event.status = OutboxStatus.PROCESSED
                event.processedAt = Instant.now()
                outboxRepository.save(event)

                logger.info("Published event: ${event.eventType} for ${event.aggregateId}")
            } catch (e: Exception) {
                logger.error("Failed to publish event: ${event.id}", e)
                event.status = OutboxStatus.FAILED
                outboxRepository.save(event)
            }
        }
    }
}

Polling 방식의 한계

  • 지연 시간: 폴링 간격만큼의 지연 발생
  • 부하: 지속적인 DB 폴링으로 인한 부하
  • 중복: 장애 상황에서 중복 전송 가능

구현 방법 2: Debezium CDC (Change Data Capture)

Debezium은 데이터베이스의 변경 사항을 실시간으로 캡처하여 Kafka로 스트리밍합니다.

Docker Compose 설정

version: '3.8'
services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: orderdb
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"  # CDC를 위해 필수
    ports:
      - "5432:5432"

  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  connect:
    image: debezium/connect:2.4
    depends_on:
      - kafka
      - postgres
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses

Debezium Connector 설정

{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "orderdb",
    "database.server.name": "orderdb",
    "table.include.list": "public.outbox_events",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.type": "event_type",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.topic.replacement": "${routedByValue}-events",
    "transforms.outbox.table.fields.additional.placement": "aggregate_type:header:aggregateType"
  }
}

Connector 등록

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @outbox-connector.json

Debezium Outbox Event Router

Debezium의 Outbox Event Router는 outbox 테이블의 레코드를 자동으로 적절한 토픽으로 라우팅합니다.

// Outbox 테이블 구조 (Debezium용)
@Entity
@Table(name = "outbox_events")
class OutboxEvent(
    @Id
    val id: UUID = UUID.randomUUID(),

    @Column(name = "aggregate_type", nullable = false)
    val aggregateType: String,

    @Column(name = "aggregate_id", nullable = false)
    val aggregateId: String,

    @Column(name = "event_type", nullable = false)
    val eventType: String,

    @Column(columnDefinition = "JSONB", nullable = false)
    val payload: String
)

Service Layer (CDC 방식)

@Service
class OrderService(
    private val orderRepository: OrderRepository,
    private val outboxRepository: OutboxRepository,
    private val objectMapper: ObjectMapper
) {
    @Transactional
    fun createOrder(command: CreateOrderCommand): Order {
        val order = orderRepository.save(Order.create(command))

        // Outbox에 저장하면 Debezium이 자동으로 Kafka로 전송
        outboxRepository.save(
            OutboxEvent(
                aggregateType = "Order",
                aggregateId = order.id,
                eventType = "OrderCreated",
                payload = objectMapper.writeValueAsString(
                    OrderCreatedEvent(order)
                )
            )
        )

        return order
    }
}

Exactly-Once Semantics 보장

Consumer 측 Idempotency

@Component
class OrderEventConsumer(
    private val processedEventRepository: ProcessedEventRepository
) {
    @KafkaListener(topics = ["order-events"])
    @Transactional
    fun handleOrderEvent(
        @Payload payload: String,
        @Header(KafkaHeaders.RECEIVED_KEY) key: String,
        @Header("id") eventId: String
    ) {
        // 이미 처리된 이벤트인지 확인
        if (processedEventRepository.existsById(eventId)) {
            logger.info("Event already processed: $eventId")
            return
        }

        // 이벤트 처리
        processEvent(payload)

        // 처리 완료 기록
        processedEventRepository.save(ProcessedEvent(eventId, Instant.now()))
    }
}

@Entity
@Table(name = "processed_events")
class ProcessedEvent(
    @Id
    val eventId: String,
    val processedAt: Instant
)

Outbox 테이블 정리

Outbox 테이블이 무한히 커지는 것을 방지하기 위한 정리 작업:

@Component
class OutboxCleaner(
    private val outboxRepository: OutboxRepository
) {
    @Scheduled(cron = "0 0 2 * * *") // 매일 새벽 2시
    @Transactional
    fun cleanOldEvents() {
        val cutoffTime = Instant.now().minus(Duration.ofDays(7))
        val deletedCount = outboxRepository.deleteByStatusAndCreatedAtBefore(
            OutboxStatus.PROCESSED,
            cutoffTime
        )
        logger.info("Deleted $deletedCount old outbox events")
    }
}

성능 최적화

Batch Processing

@Component
class BatchOutboxPublisher(
    private val outboxRepository: OutboxRepository,
    private val kafkaTemplate: KafkaTemplate<String, String>
) {
    @Scheduled(fixedDelay = 500)
    @Transactional
    fun publishBatch() {
        val events = outboxRepository.findPendingEvents(limit = 1000)

        if (events.isEmpty()) return

        val futures = events.map { event ->
            val topic = "${event.aggregateType.lowercase()}-events"
            kafkaTemplate.send(topic, event.aggregateId, event.payload)
        }

        // 모든 전송 완료 대기
        futures.forEach { it.get() }

        // Batch update
        outboxRepository.markAsProcessed(events.map { it.id!! })
    }
}

정리

Outbox Pattern은 다음과 같은 이점을 제공합니다:

특성 Polling CDC (Debezium)
지연 시간 폴링 간격 거의 실시간
구현 복잡도 낮음 중간
인프라 요구사항 낮음 Debezium 필요
DB 부하 중간 낮음
확장성 제한적 높음

다음 글에서는 CQRS 패턴을 통해 읽기와 쓰기 모델을 분리하는 방법을 다루겠습니다.

댓글남기기