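Series Introduction

This series covers how to build a production-ready event-driven architecture.

  1. Part 1: Event Sourcing Fundamentals
  2. Part 2: Implementing the Outbox Pattern
  3. Part 3: CQRS and Separating Read/Write Models
  4. Part 4: Handling Distributed Transactions with the Saga Pattern (this post)
  5. Part 5: Event Schema Evolution and Versioning

The Distributed Transaction Problem

In a microservices environment, a single business transaction often spans multiple services.

Example: order processing flow

Create order → Check inventory → Process payment → Schedule shipping

Each step is handled by a different service, typically against its own database, so one traditional ACID transaction cannot cover the whole flow.

Introduction to the Saga Pattern

A Saga is a sequence of local transactions. Each local transaction commits in its own service and triggers the next one. If a step fails, the Saga runs compensating transactions that undo the steps that already completed.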
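
To make the definition concrete, here is a minimal in-memory sketch of the idea: run the steps in order and, when one fails, run the compensations of the completed steps in reverse. SagaStep and runSaga are hypothetical names used only for illustration; a real Saga would persist its progress and call remote services.

```kotlin
// Hypothetical illustration types; not part of any library.
class SagaStep(
    val name: String,
    val action: () -> Unit,        // the local transaction
    val compensation: () -> Unit   // undoes the local transaction
)

/**
 * Runs the steps in order. If a step throws, the compensations of all
 * previously completed steps run in reverse order and false is returned.
 */
fun runSaga(steps: List<SagaStep>): Boolean {
    val completed = ArrayDeque<SagaStep>()
    for (step in steps) {
        try {
            step.action()
            completed.addFirst(step) // most recent step first
        } catch (e: Exception) {
            completed.forEach { it.compensation() }
            return false
        }
    }
    return true
}

fun main() {
    val log = mutableListOf<String>()
    val steps = listOf(
        SagaStep("reserve-stock", { log += "stock reserved" }, { log += "stock released" }),
        SagaStep("process-payment", { throw IllegalStateException("card declined") }, { log += "payment refunded" }),
        SagaStep("schedule-shipping", { log += "shipping scheduled" }, { log += "shipping cancelled" })
    )
    val ok = runSaga(steps)
    println("success=$ok, log=$log")
    // prints: success=false, log=[stock reserved, stock released]
}
```

The failed payment step is not compensated, because it never completed; only the stock reservation is undone.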

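Two Implementation Approaches

  1. Choreography: each service publishes events and subscribes to the events of other services
  2. Orchestration: a central coordinator manages the Saga

Choreography Approach

Architecture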

┌─────────┐     ┌─────────┐     ┌─────────┐     ┌─────────┐
│  Order  │────▶│Inventory│────▶│ Payment │────▶│Shipping │
│ Service │     │ Service │     │ Service │     │ Service │
└────┬────┘     └────┬────┘     └────┬────┘     └────┬────┘
     │               │               │               │
     │  OrderCreated │ StockReserved │PaymentCompleted│
     └───────────────┴───────────────┴───────────────┘
                    Event Bus (Kafka)

Order Service

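@Service
class OrderService(
    private val orderRepository: OrderRepository,
    private val eventPublisher: OrderEventPublisher
) {
    @Transactional
    fun createOrder(command: CreateOrderCommand): Order {
        val order = Order.create(
            customerId = command.customerId,
            items = command.items
        )

        orderRepository.save(order)

        // Publishing inside the database transaction is a dual write: the event can
        // go out even if the commit later fails. In production, publish through the
        // Outbox Pattern from Part 2 instead.
        eventPublisher.publish(
            OrderCreated(
                orderId = order.id,
                customerId = order.customerId,
                items = order.items.map { OrderItemDto(it) },
                totalAmount = order.totalAmount
            )
        )

        return order
    }

    // PaymentCompleted arrives from the Payment Service over Kafka, so it is consumed
    // with @KafkaListener. @TransactionalEventListener only receives in-process Spring
    // application events.
    @KafkaListener(topics = ["payment-events"])
    @Transactional
    fun onPaymentCompleted(event: PaymentCompleted) {
        // findByIdOrNull is Spring Data's Kotlin extension
        // (org.springframework.data.repository.findByIdOrNull). This assumes
        // OrderRepository is a Spring Data repository, whose findById returns an Optional.
        val order = orderRepository.findByIdOrNull(event.orderId)
            ?: throw OrderNotFoundException(event.orderId)

        order.markAsPaid()
        orderRepository.save(order)
    }

    @KafkaListener(topics = ["inventory-events"])
    @Transactional
    fun onStockReservationFailed(event: StockReservationFailed) {
        val order = orderRepository.findByIdOrNull(event.orderId)
            ?: throw OrderNotFoundException(event.orderId)

        order.cancel("Stock reservation failed: ${event.reason}")
        orderRepository.save(order)

        eventPublisher.publish(OrderCancelled(order.id, event.reason))
    }
}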

Inventory Service

@Service
class InventoryService(
    private val inventoryRepository: InventoryRepository,
    private val stockReservationRepository: StockReservationRepository,
    private val eventPublisher: InventoryEventPublisher
) {
    @KafkaListener(topics = ["order-events"])
    @Transactional
    fun onOrderCreated(event: OrderCreated) {
        try {
            // Validate every item before changing anything. Because the exception is
            // caught below, the transaction still commits, so a failure on a later
            // item must not leave earlier items reserved.
            val checked = event.items.map { item ->
                val inventory = inventoryRepository.findByProductId(item.productId)
                    ?: throw ProductNotFoundException(item.productId)

                if (inventory.availableQuantity < item.quantity) {
                    throw InsufficientStockException(item.productId)
                }

                item to inventory
            }

            val reservations = checked.map { (item, inventory) ->
                inventory.reserve(item.quantity)
                inventoryRepository.save(inventory)

                // Persist the reservation so the compensation in onPaymentFailed can find it.
                stockReservationRepository.save(
                    StockReservation(
                        productId = item.productId,
                        orderId = event.orderId,
                        quantity = item.quantity
                    )
                )
            }

            eventPublisher.publish(
                StockReserved(
                    orderId = event.orderId,
                    reservations = reservations
                )
            )
        } catch (e: Exception) {
            eventPublisher.publish(
                StockReservationFailed(
                    orderId = event.orderId,
                    reason = e.message ?: "Unknown error"
                )
            )
        }
    }

    @KafkaListener(topics = ["payment-events"])
    @Transactional
    fun onPaymentFailed(event: PaymentFailed) {
        // Compensating transaction: release the reserved stock
        val reservations = stockReservationRepository.findByOrderId(event.orderId)

        reservations.forEach { reservation ->
            val inventory = inventoryRepository.findByProductId(reservation.productId)
                ?: throw ProductNotFoundException(reservation.productId)
            inventory.releaseReservation(reservation.quantity)
            inventoryRepository.save(inventory)
        }

        stockReservationRepository.deleteAll(reservations)

        eventPublisher.publish(
            StockReleased(orderId = event.orderId)
        )
    }
}

Payment Service

@Service
class PaymentService(
    private val paymentRepository: PaymentRepository,
    private val paymentGateway: PaymentGateway,
    private val orderClient: OrderClient,
    private val eventPublisher: PaymentEventPublisher
) {
    @KafkaListener(topics = ["inventory-events"])
    @Transactional
    fun onStockReserved(event: StockReserved) {
        // A Kafka listener can receive the same message more than once, so skip
        // orders that were already charged. existsByOrderId is an assumed query
        // method that would have to be declared on PaymentRepository.
        if (paymentRepository.existsByOrderId(event.orderId)) return

        try {
            val order = orderClient.getOrder(event.orderId)

            val paymentResult = paymentGateway.charge(
                customerId = order.customerId,
                amount = order.totalAmount
            )

            val payment = Payment(
                orderId = event.orderId,
                amount = order.totalAmount,
                transactionId = paymentResult.transactionId,
                status = PaymentStatus.COMPLETED
            )
            paymentRepository.save(payment)

            eventPublisher.publish(
                PaymentCompleted(
                    orderId = event.orderId,
                    transactionId = paymentResult.transactionId
                )
            )
        } catch (e: PaymentException) {
            eventPublisher.publish(
                PaymentFailed(
                    orderId = event.orderId,
                    reason = e.message ?: "Payment failed"
                )
            )
        }
    }
}

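Pros and Cons of Choreography

Pros                  | Cons
Loose coupling        | Hard to see the end-to-end flow
Simple implementation | Risk of cyclic dependencies between services
Service autonomy      | Harder debugging
Scalability           | More complex testing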

Orchestration Approach

Architecture

                    ┌─────────────────┐
                    │     Saga        │
                    │  Orchestrator   │
                    └────────┬────────┘
                             │
         ┌───────────────────┼───────────────────┐
         │                   │                   │
         ▼                   ▼                   ▼
    ┌─────────┐        ┌─────────┐        ┌─────────┐
    │  Order  │        │Inventory│        │ Payment │
    │ Service │        │ Service │        │ Service │
    └─────────┘        └─────────┘        └─────────┘

Saga State Machine

enum class OrderSagaState {
    STARTED,
    STOCK_RESERVING,
    STOCK_RESERVED,
    STOCK_RESERVATION_FAILED,
    PAYMENT_PROCESSING,
    PAYMENT_COMPLETED,
    PAYMENT_FAILED,
    SHIPPING_SCHEDULED,
    COMPLETED,
    COMPENSATING,
    COMPENSATED,
    FAILED
}

enum class OrderSagaEvent {
    START,
    STOCK_RESERVE_SUCCESS,
    STOCK_RESERVE_FAIL,
    PAYMENT_SUCCESS,
    PAYMENT_FAIL,
    SHIPPING_SUCCESS,
    SHIPPING_FAIL,
    COMPENSATE_COMPLETE
}

@Entity
@Table(name = "order_saga")
class OrderSaga(
    @Id
    val sagaId: String = UUID.randomUUID().toString(),

    @Column(nullable = false)
    val orderId: String,

    @Enumerated(EnumType.STRING)
    var state: OrderSagaState = OrderSagaState.STARTED,

    @Column(columnDefinition = "TEXT")
    var context: String = "{}",  // Saga context stored as JSON

    val createdAt: Instant = Instant.now(),
    var updatedAt: Instant = Instant.now()
)

Saga Orchestrator

@Service
class OrderSagaOrchestrator(
    private val sagaRepository: OrderSagaRepository,
    private val inventoryClient: InventoryClient,
    private val paymentClient: PaymentClient,
    private val shippingClient: ShippingClient,
    private val orderClient: OrderClient,
    private val objectMapper: ObjectMapper
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    @Transactional
    fun startSaga(orderId: String): OrderSaga {
        val saga = OrderSaga(orderId = orderId)
        sagaRepository.save(saga)

        processNextStep(saga)
        return saga
    }

    @Transactional
    fun handleEvent(sagaId: String, event: OrderSagaEvent, payload: Any? = null) {
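        // findByIdOrNull is Spring Data's Kotlin extension
        // (org.springframework.data.repository.findByIdOrNull). Plain findById
        // returns an Optional, so the `?:` fallback would never fire.
        val saga = sagaRepository.findByIdOrNull(sagaId)
            ?: throw SagaNotFoundException(sagaId)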

        val newState = transition(saga.state, event)
        saga.state = newState
        saga.updatedAt = Instant.now()

        if (payload != null) {
            val context = objectMapper.readTree(saga.context).deepCopy() as ObjectNode
            context.set<ObjectNode>(event.name, objectMapper.valueToTree(payload))
            saga.context = objectMapper.writeValueAsString(context)
        }

        sagaRepository.save(saga)

        processNextStep(saga)
    }

    private fun transition(currentState: OrderSagaState, event: OrderSagaEvent): OrderSagaState {
        return when (currentState to event) {
            OrderSagaState.STARTED to OrderSagaEvent.START ->
                OrderSagaState.STOCK_RESERVING

            OrderSagaState.STOCK_RESERVING to OrderSagaEvent.STOCK_RESERVE_SUCCESS ->
                OrderSagaState.STOCK_RESERVED

            OrderSagaState.STOCK_RESERVING to OrderSagaEvent.STOCK_RESERVE_FAIL ->
                OrderSagaState.STOCK_RESERVATION_FAILED

            OrderSagaState.STOCK_RESERVED to OrderSagaEvent.START ->
                OrderSagaState.PAYMENT_PROCESSING

            OrderSagaState.PAYMENT_PROCESSING to OrderSagaEvent.PAYMENT_SUCCESS ->
                OrderSagaState.PAYMENT_COMPLETED

            OrderSagaState.PAYMENT_PROCESSING to OrderSagaEvent.PAYMENT_FAIL ->
                OrderSagaState.COMPENSATING

            OrderSagaState.PAYMENT_COMPLETED to OrderSagaEvent.SHIPPING_SUCCESS ->
                OrderSagaState.COMPLETED

            OrderSagaState.PAYMENT_COMPLETED to OrderSagaEvent.SHIPPING_FAIL ->
                OrderSagaState.COMPENSATING

            OrderSagaState.COMPENSATING to OrderSagaEvent.COMPENSATE_COMPLETE ->
                OrderSagaState.COMPENSATED

            else -> throw InvalidStateTransitionException(currentState, event)
        }
    }

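    // NOTE: Spring applies @Async and @Transactional through proxies, so the calls
    // below from within the same class bypass them: the step methods run synchronously
    // in the caller's thread and transaction. For truly asynchronous steps, move them
    // to a separate bean or dispatch them through a TaskExecutor.
    private fun processNextStep(saga: OrderSaga) {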
        when (saga.state) {
            OrderSagaState.STARTED -> {
                handleEvent(saga.sagaId, OrderSagaEvent.START)
            }

            OrderSagaState.STOCK_RESERVING -> {
                reserveStock(saga)
            }

            OrderSagaState.STOCK_RESERVED -> {
                handleEvent(saga.sagaId, OrderSagaEvent.START)
            }

            OrderSagaState.PAYMENT_PROCESSING -> {
                processPayment(saga)
            }

            OrderSagaState.PAYMENT_COMPLETED -> {
                scheduleShipping(saga)
            }

            OrderSagaState.COMPENSATING -> {
                compensate(saga)
            }

            OrderSagaState.COMPLETED -> {
                logger.info("Saga completed successfully: ${saga.sagaId}")
            }

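            OrderSagaState.COMPENSATED -> {
                logger.info("Saga compensated: ${saga.sagaId}")
                markOrderAsFailed(saga)
            }

            // Nothing was reserved, so there is nothing to compensate; just fail the order.
            OrderSagaState.STOCK_RESERVATION_FAILED -> {
                logger.info("Stock reservation failed, failing order: ${saga.sagaId}")
                markOrderAsFailed(saga)
            }

            else -> {
                logger.warn("No action for state: ${saga.state}")
            }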
        }
    }

    @Async
    fun reserveStock(saga: OrderSaga) {
        try {
            val order = orderClient.getOrder(saga.orderId)
            val result = inventoryClient.reserveStock(
                ReserveStockRequest(
                    orderId = saga.orderId,
                    items = order.items
                )
            )
            handleEvent(saga.sagaId, OrderSagaEvent.STOCK_RESERVE_SUCCESS, result)
        } catch (e: Exception) {
            logger.error("Stock reservation failed for saga: ${saga.sagaId}", e)
            handleEvent(saga.sagaId, OrderSagaEvent.STOCK_RESERVE_FAIL, e.message)
        }
    }

    @Async
    fun processPayment(saga: OrderSaga) {
        try {
            val order = orderClient.getOrder(saga.orderId)
            val result = paymentClient.processPayment(
                ProcessPaymentRequest(
                    orderId = saga.orderId,
                    customerId = order.customerId,
                    amount = order.totalAmount
                )
            )
            handleEvent(saga.sagaId, OrderSagaEvent.PAYMENT_SUCCESS, result)
        } catch (e: Exception) {
            logger.error("Payment failed for saga: ${saga.sagaId}", e)
            handleEvent(saga.sagaId, OrderSagaEvent.PAYMENT_FAIL, e.message)
        }
    }

    @Async
    fun scheduleShipping(saga: OrderSaga) {
        try {
            val result = shippingClient.scheduleShipping(
                ScheduleShippingRequest(orderId = saga.orderId)
            )
            handleEvent(saga.sagaId, OrderSagaEvent.SHIPPING_SUCCESS, result)
        } catch (e: Exception) {
            logger.error("Shipping scheduling failed for saga: ${saga.sagaId}", e)
            handleEvent(saga.sagaId, OrderSagaEvent.SHIPPING_FAIL, e.message)
        }
    }

    @Async
    fun compensate(saga: OrderSaga) {
        logger.info("Starting compensation for saga: ${saga.sagaId}")

        val context = objectMapper.readTree(saga.context)

        // Run the compensating transactions in reverse order
        try {
            // Compensate the payment (if one was taken)
            if (context.has("PAYMENT_SUCCESS")) {
                val paymentInfo = context.get("PAYMENT_SUCCESS")
                paymentClient.refund(
                    RefundRequest(
                        orderId = saga.orderId,
                        transactionId = paymentInfo.get("transactionId").asText()
                    )
                )
            }

            // Compensate the stock reservation (if one was made)
            if (context.has("STOCK_RESERVE_SUCCESS")) {
                inventoryClient.releaseStock(
                    ReleaseStockRequest(orderId = saga.orderId)
                )
            }

            handleEvent(saga.sagaId, OrderSagaEvent.COMPENSATE_COMPLETE)
        } catch (e: Exception) {
            logger.error("Compensation failed for saga: ${saga.sagaId}", e)
            // A failed compensation needs manual intervention
            markForManualIntervention(saga, e)
        }
    }

    private fun markOrderAsFailed(saga: OrderSaga) {
        orderClient.updateOrderStatus(saga.orderId, "FAILED")
    }

    private fun markForManualIntervention(saga: OrderSaga, error: Exception) {
        saga.state = OrderSagaState.FAILED
        saga.updatedAt = Instant.now()
        sagaRepository.save(saga)
        // Raise an alert or notification here to request manual intervention
    }
}
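
One practical benefit of an explicit state machine is that the transition rules can be checked without Spring or any remote service. The sketch below restates the same transitions as a lookup table and replays event sequences through it. State, Event, transitions, and replay are simplified stand-ins introduced for this example, not the classes above.

```kotlin
enum class State {
    STARTED, STOCK_RESERVING, STOCK_RESERVED, STOCK_RESERVATION_FAILED,
    PAYMENT_PROCESSING, PAYMENT_COMPLETED, COMPLETED, COMPENSATING, COMPENSATED
}

enum class Event {
    START, STOCK_RESERVE_SUCCESS, STOCK_RESERVE_FAIL, PAYMENT_SUCCESS,
    PAYMENT_FAIL, SHIPPING_SUCCESS, SHIPPING_FAIL, COMPENSATE_COMPLETE
}

// The same transitions as the orchestrator's `when`, expressed as a lookup table.
val transitions: Map<Pair<State, Event>, State> = mapOf(
    (State.STARTED to Event.START) to State.STOCK_RESERVING,
    (State.STOCK_RESERVING to Event.STOCK_RESERVE_SUCCESS) to State.STOCK_RESERVED,
    (State.STOCK_RESERVING to Event.STOCK_RESERVE_FAIL) to State.STOCK_RESERVATION_FAILED,
    (State.STOCK_RESERVED to Event.START) to State.PAYMENT_PROCESSING,
    (State.PAYMENT_PROCESSING to Event.PAYMENT_SUCCESS) to State.PAYMENT_COMPLETED,
    (State.PAYMENT_PROCESSING to Event.PAYMENT_FAIL) to State.COMPENSATING,
    (State.PAYMENT_COMPLETED to Event.SHIPPING_SUCCESS) to State.COMPLETED,
    (State.PAYMENT_COMPLETED to Event.SHIPPING_FAIL) to State.COMPENSATING,
    (State.COMPENSATING to Event.COMPENSATE_COMPLETE) to State.COMPENSATED
)

/** Replays events from STARTED and returns the final state; throws on an invalid transition. */
fun replay(events: List<Event>): State =
    events.fold(State.STARTED) { state, event ->
        transitions[state to event] ?: error("Invalid transition: $state on $event")
    }

fun main() {
    val happyPath = listOf(
        Event.START, Event.STOCK_RESERVE_SUCCESS, Event.START,
        Event.PAYMENT_SUCCESS, Event.SHIPPING_SUCCESS
    )
    val paymentFails = listOf(
        Event.START, Event.STOCK_RESERVE_SUCCESS, Event.START,
        Event.PAYMENT_FAIL, Event.COMPENSATE_COMPLETE
    )
    println(replay(happyPath))     // prints: COMPLETED
    println(replay(paymentFails))  // prints: COMPENSATED
}
```

Replaying sequences like these in a unit test is a cheap way to catch a missing or mistyped transition before it shows up as a stuck Saga.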

Saga Step Definition (a more elegant approach)

// SagaDefinition is treated here as an illustrative builder DSL; the post does not
// tie it to a specific library. The clients are injected so the lambdas can use them.
@Configuration
class OrderSagaDefinition(
    private val inventoryClient: InventoryClient,
    private val paymentClient: PaymentClient,
    private val shippingClient: ShippingClient
) {

    @Bean
    fun orderSaga(): SagaDefinition<OrderSagaData> {
        return SagaDefinition.builder<OrderSagaData>()
            .step("reserve-stock")
                .invokeParticipant { data -> inventoryClient.reserveStock(data.orderId, data.items) }
                .withCompensation { data -> inventoryClient.releaseStock(data.orderId) }
            .step("process-payment")
                .invokeParticipant { data -> paymentClient.charge(data.orderId, data.amount) }
                .withCompensation { data -> paymentClient.refund(data.orderId) }
            .step("schedule-shipping")
                .invokeParticipant { data -> shippingClient.schedule(data.orderId) }
                .withCompensation { data -> shippingClient.cancel(data.orderId) }
            .build()
    }
}

data class OrderSagaData(
    val orderId: String,
    val customerId: String,
    val items: List<OrderItem>,
    val amount: BigDecimal
)
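
To show what could sit behind a builder like this, here is a minimal in-memory sketch of such a DSL. It mirrors the method names used above (step, invokeParticipant, withCompensation, build), but the implementation is an assumption made for illustration: it runs synchronously, keeps no persistent state, and does not retry.

```kotlin
// Hypothetical minimal implementation of the builder-style DSL used above.
class Step<T>(val name: String, val action: (T) -> Unit, val compensation: (T) -> Unit)

class SagaDefinition<T> private constructor(private val steps: List<Step<T>>) {

    /**
     * Runs the steps in order. On failure, compensates the completed steps in
     * reverse order and returns the name of the failed step; returns null on success.
     */
    fun execute(data: T): String? {
        val done = mutableListOf<Step<T>>()
        for (step in steps) {
            try {
                step.action(data)
                done += step
            } catch (e: Exception) {
                done.asReversed().forEach { it.compensation(data) }
                return step.name
            }
        }
        return null
    }

    class Builder<T> {
        private val steps = mutableListOf<Step<T>>()
        private var name: String? = null
        private var action: ((T) -> Unit)? = null
        private var compensation: (T) -> Unit = {}

        fun step(stepName: String): Builder<T> { flush(); name = stepName; return this }
        fun invokeParticipant(block: (T) -> Unit): Builder<T> { action = block; return this }
        fun withCompensation(block: (T) -> Unit): Builder<T> { compensation = block; return this }
        fun build(): SagaDefinition<T> { flush(); return SagaDefinition(steps.toList()) }

        // Turns the step being configured (if any) into a Step and resets the builder state.
        private fun flush() {
            val n = name ?: return
            steps += Step(n, requireNotNull(action) { "step '$n' has no action" }, compensation)
            name = null; action = null; compensation = {}
        }
    }

    companion object {
        fun <T> builder(): Builder<T> = Builder()
    }
}

data class OrderData(val orderId: String)

fun main() {
    val log = mutableListOf<String>()
    val saga = SagaDefinition.builder<OrderData>()
        .step("reserve-stock")
            .invokeParticipant { log += "reserve ${it.orderId}" }
            .withCompensation { log += "release ${it.orderId}" }
        .step("process-payment")
            .invokeParticipant { throw IllegalStateException("declined") }
            .withCompensation { log += "refund ${it.orderId}" }
        .build()

    println(saga.execute(OrderData("o-1")))  // prints: process-payment
    println(log)                             // prints: [reserve o-1, release o-1]
}
```

Pairing each action with its compensation in one place is the main appeal of this style: the rollback order falls out of the step order instead of being hand-written in a separate compensate method.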

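Pros and Cons of Orchestration

Pros                           | Cons
End-to-end flow is easy to see | Centralization
Easier debugging               | Single point of failure
Clear responsibilities         | Orchestrator complexity
Easier testing                 | Increased coupling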

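Compensating Transaction Strategies

Semantic Lock

A semantic lock marks a record as tentatively held (here, a RESERVED stock reservation with an expiry time) until the Saga confirms or releases it.

@Entity
class StockReservation(
    @Id
    val id: String = UUID.randomUUID().toString(),
    val productId: String,
    val orderId: String,
    val quantity: Int,
    // var, because the status changes when the reservation is confirmed, released, or expires
    @Enumerated(EnumType.STRING)
    var status: ReservationStatus = ReservationStatus.RESERVED,
    val expiresAt: Instant = Instant.now().plus(Duration.ofMinutes(15))
)

enum class ReservationStatus {
    RESERVED, CONFIRMED, RELEASED, EXPIRED
}

// @Scheduled only works on methods of a Spring bean, so the job lives in a component.
// The repository type name is assumed; releaseReservation is not shown in this post.
@Component
class ReservationExpiryJob(
    private val reservationRepository: StockReservationRepository
) {
    @Scheduled(fixedDelay = 60000)  // runs 60 seconds after the previous run finishes
    fun releaseExpiredReservations() {
        val expired = reservationRepository.findExpiredReservations(Instant.now())
        expired.forEach { reservation ->
            releaseReservation(reservation)
        }
    }
}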
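
The expiry mechanics can be sketched without a database. In the example below, Inventory, Reservation, and Status are hypothetical in-memory stand-ins: reserving subtracts from available stock, and the expiry pass returns stock only for reservations that are still RESERVED and past their deadline. The current time is passed in explicitly so the behavior is easy to test.

```kotlin
import java.time.Duration
import java.time.Instant

enum class Status { RESERVED, CONFIRMED, RELEASED, EXPIRED }

data class Reservation(
    val productId: String,
    val quantity: Int,
    val expiresAt: Instant,
    var status: Status = Status.RESERVED
)

// Hypothetical in-memory inventory used only to illustrate the semantic lock.
class Inventory(private val available: MutableMap<String, Int>) {
    private val reservations = mutableListOf<Reservation>()

    fun availableOf(productId: String): Int = available[productId] ?: 0

    /** Takes stock out of the available pool and holds it until the TTL runs out. */
    fun reserve(
        productId: String,
        quantity: Int,
        now: Instant,
        ttl: Duration = Duration.ofMinutes(15)
    ): Reservation {
        val current = availableOf(productId)
        require(current >= quantity) { "insufficient stock for $productId" }
        available[productId] = current - quantity
        return Reservation(productId, quantity, now.plus(ttl)).also { reservations += it }
    }

    /** Called when the Saga completes: the hold becomes permanent. */
    fun confirm(reservation: Reservation) {
        check(reservation.status == Status.RESERVED) { "reservation is ${reservation.status}" }
        reservation.status = Status.CONFIRMED
    }

    /** Returns stock for overdue RESERVED reservations; returns how many were expired. */
    fun expire(now: Instant): Int {
        val overdue = reservations.filter { it.status == Status.RESERVED && it.expiresAt.isBefore(now) }
        overdue.forEach {
            available[it.productId] = availableOf(it.productId) + it.quantity
            it.status = Status.EXPIRED
        }
        return overdue.size
    }
}

fun main() {
    val t0 = Instant.parse("2024-01-01T00:00:00Z")
    val inventory = Inventory(mutableMapOf("p1" to 10))

    val abandoned = inventory.reserve("p1", 3, t0)
    val paid = inventory.reserve("p1", 2, t0)
    inventory.confirm(paid)

    val expiredCount = inventory.expire(t0.plus(Duration.ofMinutes(16)))
    println("expired=$expiredCount, available=${inventory.availableOf("p1")}, abandoned=${abandoned.status}")
    // prints: expired=1, available=8, abandoned=EXPIRED
}
```

Confirmed reservations are skipped by the expiry pass, so a slow scheduler run cannot hand paid-for stock back to the pool.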

Compensating Transaction Log

@Entity
@Table(name = "compensation_log")
class CompensationLog(
    @Id
    val id: String = UUID.randomUUID().toString(),
    val sagaId: String,
    val stepName: String,
    val compensationData: String,
    var status: CompensationStatus = CompensationStatus.PENDING,
    val createdAt: Instant = Instant.now(),
    var executedAt: Instant? = null
)

@Service
class CompensationExecutor(
    private val compensationLogRepository: CompensationLogRepository
) {
    @Scheduled(fixedDelay = 10000)
    @Transactional
    fun executePendingCompensations() {
        val pending = compensationLogRepository.findByStatus(CompensationStatus.PENDING)

        pending.forEach { log ->
            try {
                executeCompensation(log)
                log.status = CompensationStatus.COMPLETED
                log.executedAt = Instant.now()
            } catch (e: Exception) {
                // FAILED rows are no longer selected by findByStatus(PENDING),
                // so add retry logic or raise an alert here
                log.status = CompensationStatus.FAILED
            }
            compensationLogRepository.save(log)
        }
    }
}
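
One possible shape for the retry logic mentioned in that comment is a bounded attempt counter: a failed compensation stays PENDING until it has used up its attempts, and only then is marked FAILED and escalated. The sketch below is an in-memory illustration; CompensationTask, runPending, and the maxAttempts parameter are assumptions introduced here, not part of the code above.

```kotlin
enum class CompensationStatus { PENDING, COMPLETED, FAILED }

// Hypothetical in-memory stand-in for a compensation log row with an attempt counter.
class CompensationTask(
    val stepName: String,
    var status: CompensationStatus = CompensationStatus.PENDING,
    var attempts: Int = 0
)

/**
 * Runs every PENDING task once. A task that throws stays PENDING for the next
 * run until it reaches maxAttempts; then it is marked FAILED and alert is called.
 */
fun runPending(
    tasks: List<CompensationTask>,
    maxAttempts: Int,
    execute: (CompensationTask) -> Unit,
    alert: (CompensationTask) -> Unit
) {
    tasks.filter { it.status == CompensationStatus.PENDING }.forEach { task ->
        task.attempts++
        try {
            execute(task)
            task.status = CompensationStatus.COMPLETED
        } catch (e: Exception) {
            if (task.attempts >= maxAttempts) {
                task.status = CompensationStatus.FAILED
                alert(task) // hand over to manual intervention
            }
        }
    }
}

fun main() {
    val refund = CompensationTask("refund-payment")
    val alerts = mutableListOf<String>()

    // The refund always fails in this demo, so it is escalated after three runs.
    repeat(4) {
        runPending(
            tasks = listOf(refund),
            maxAttempts = 3,
            execute = { throw IllegalStateException("gateway unavailable") },
            alert = { alerts += it.stepName }
        )
    }
    println("status=${refund.status}, attempts=${refund.attempts}, alerts=$alerts")
    // prints: status=FAILED, attempts=3, alerts=[refund-payment]
}
```

Because compensations may run more than once under this scheme, each one has to be idempotent, for example by keying a refund on the original transaction ID.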

Monitoring and Visibility

Saga Status API

@RestController
@RequestMapping("/api/sagas")
class SagaController(
    private val sagaRepository: OrderSagaRepository
) {
    @GetMapping("/{sagaId}")
    fun getSaga(@PathVariable sagaId: String): ResponseEntity<SagaStatusResponse> {
        // findByIdOrNull: Spring Data's Kotlin extension (plain findById returns an Optional)
        val saga = sagaRepository.findByIdOrNull(sagaId)
            ?: throw SagaNotFoundException(sagaId)

        return ResponseEntity.ok(
            SagaStatusResponse(
                sagaId = saga.sagaId,
                orderId = saga.orderId,
                state = saga.state.name,
                context = saga.context,
                createdAt = saga.createdAt,
                updatedAt = saga.updatedAt
            )
        )
    }

    @GetMapping
    fun listSagas(
        @RequestParam(required = false) state: OrderSagaState?,
        pageable: Pageable
    ): ResponseEntity<Page<SagaStatusResponse>> {
        val sagas = if (state != null) {
            sagaRepository.findByState(state, pageable)
        } else {
            sagaRepository.findAll(pageable)
        }

        return ResponseEntity.ok(sagas.map { SagaStatusResponse(it) })
    }
}

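Summary

Saga Pattern selection guide:

Situation                       | Recommended approach
Simple flow (2-3 services)      | Choreography
Complex flow                    | Orchestration
Service autonomy matters most   | Choreography
Visibility and debugging matter | Orchestration
Frequent changes to the flow    | Orchestration

The next post covers Event Schema evolution and versioning.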
