7 분 소요


Series Introduction

This series covers how to build production-ready event-driven architecture.

  1. Part 1: Event Sourcing Fundamentals
  2. Part 2: Implementing the Outbox Pattern
  3. Part 3: CQRS with Separate Read/Write Models
  4. Part 4: Saga Pattern for Distributed Transactions (Current)
  5. Part 5: Event Schema Evolution and Versioning

The Distributed Transaction Problem

In a microservices environment, a single business transaction spans multiple services.

Example: Order Processing Flow

Create Order → Check Inventory → Process Payment → Schedule Shipping

Each step is handled by a different service, and traditional ACID transactions cannot be used.

Introduction to Saga Pattern

A Saga is a sequence of local transactions, one per participating service. Each local transaction, once committed, triggers the next one. If a step fails, the Saga runs compensating transactions that undo the steps that have already completed.

Two Implementation Approaches

  1. Choreography: Each service publishes and subscribes to events
  2. Orchestration: A central coordinator manages the Saga

Choreography Approach

Architecture

┌─────────┐     ┌─────────┐     ┌─────────┐     ┌─────────┐
│  Order  │────▶│Inventory│────▶│ Payment │────▶│Shipping │
│ Service │     │ Service │     │ Service │     │ Service │
└────┬────┘     └────┬────┘     └────┬────┘     └────┬────┘
     │               │               │               │
     │  OrderCreated │ StockReserved │PaymentCompleted│
     └───────────────┴───────────────┴───────────────┘
                    Event Bus (Kafka)

Order Service

/**
 * Order-side participant of the choreography saga.
 *
 * Creates orders and reacts to the outcome events published by the other
 * participants (payment completed, stock reservation failed).
 */
@Service
class OrderService(
    private val orderRepository: OrderRepository,
    private val eventPublisher: OrderEventPublisher
) {
    /**
     * Saves a new order built from [command] and publishes [OrderCreated] for it.
     *
     * NOTE(review): the publish happens inside the transaction; whether it is
     * atomic with the save depends on how OrderEventPublisher is implemented
     * (e.g. an outbox table) — confirm.
     *
     * @return the saved order
     */
    @Transactional
    fun createOrder(command: CreateOrderCommand): Order {
        val order = Order.create(
            customerId = command.customerId,
            items = command.items
        )

        orderRepository.save(order)

        eventPublisher.publish(
            OrderCreated(
                orderId = order.id,
                customerId = order.customerId,
                items = order.items.map { OrderItemDto(it) },
                totalAmount = order.totalAmount
            )
        )

        return order
    }

    /**
     * Marks the order as paid once the payment participant reports success.
     *
     * Consumed from Kafka inside its own transaction, like the listeners of the
     * other saga participants. @TransactionalEventListener only handles events
     * published in-process through Spring's ApplicationEventPublisher and, by
     * default, runs after the publishing transaction has committed, so it is
     * not suitable for events arriving from another service.
     *
     * NOTE(review): topic name taken from the "payment-events" topic the
     * inventory service listens on — confirm.
     *
     * @throws OrderNotFoundException if no order exists for the event's order id
     */
    @KafkaListener(topics = ["payment-events"])
    @Transactional
    fun onPaymentCompleted(event: PaymentCompleted) {
        val order = orderRepository.findById(event.orderId)
            ?: throw OrderNotFoundException(event.orderId)

        order.markAsPaid()
        orderRepository.save(order)
    }

    /**
     * Compensating step: cancels the order when stock could not be reserved
     * and publishes [OrderCancelled].
     *
     * NOTE(review): topic name taken from the "inventory-events" topic the
     * payment service listens on — confirm.
     *
     * @throws OrderNotFoundException if no order exists for the event's order id
     */
    @KafkaListener(topics = ["inventory-events"])
    @Transactional
    fun onStockReservationFailed(event: StockReservationFailed) {
        val order = orderRepository.findById(event.orderId)
            ?: throw OrderNotFoundException(event.orderId)

        order.cancel("Stock reservation failed: ${event.reason}")
        orderRepository.save(order)

        eventPublisher.publish(OrderCancelled(order.id, event.reason))
    }
}

Inventory Service

/**
 * Inventory-side participant of the choreography saga.
 *
 * Reserves stock when an order is created and releases it again when the
 * payment step fails.
 */
@Service
class InventoryService(
    private val inventoryRepository: InventoryRepository,
    // Used by onPaymentFailed; it was referenced without being injected.
    private val stockReservationRepository: StockReservationRepository,
    private val eventPublisher: InventoryEventPublisher
) {
    /**
     * Reserves stock for every item of the order and publishes [StockReserved],
     * or publishes [StockReservationFailed] if any item cannot be reserved.
     *
     * Validation runs for all items before any inventory is modified. The
     * catch block below swallows the exception, so the surrounding transaction
     * is not rolled back by it; validating first avoids leaving reservations
     * for the items that preceded the failing one.
     *
     * NOTE(review): onPaymentFailed looks reservations up through
     * stockReservationRepository, but nothing in this handler stores them —
     * confirm where they are persisted.
     */
    @KafkaListener(topics = ["order-events"])
    @Transactional
    fun onOrderCreated(event: OrderCreated) {
        try {
            // Pass 1: resolve and validate per product. Quantities of repeated
            // products are summed so the check covers the whole order.
            val inventoryByProduct = event.items
                .groupBy { it.productId }
                .mapValues { (productId, lines) ->
                    val inventory = inventoryRepository.findByProductId(productId)
                        ?: throw ProductNotFoundException(productId)

                    if (inventory.availableQuantity < lines.sumOf { it.quantity }) {
                        throw InsufficientStockException(productId)
                    }

                    inventory
                }

            // Pass 2: every item passed validation, so apply the reservations.
            val reservations = event.items.map { item ->
                val inventory = inventoryByProduct.getValue(item.productId)
                inventory.reserve(item.quantity)
                inventoryRepository.save(inventory)

                StockReservation(
                    productId = item.productId,
                    quantity = item.quantity,
                    reservationId = UUID.randomUUID().toString()
                )
            }

            eventPublisher.publish(
                StockReserved(
                    orderId = event.orderId,
                    reservations = reservations
                )
            )
        } catch (e: Exception) {
            eventPublisher.publish(
                StockReservationFailed(
                    orderId = event.orderId,
                    reason = e.message ?: "Unknown error"
                )
            )
        }
    }

    /**
     * Compensating transaction: releases the stock reserved for the order,
     * deletes the reservation records and publishes [StockReleased].
     *
     * @throws ProductNotFoundException if a reservation refers to a product
     *   that has no inventory record
     */
    @KafkaListener(topics = ["payment-events"])
    @Transactional
    fun onPaymentFailed(event: PaymentFailed) {
        val reservations = stockReservationRepository.findByOrderId(event.orderId)

        reservations.forEach { reservation ->
            // Fail with a descriptive exception instead of a bare NPE from `!!`.
            val inventory = inventoryRepository.findByProductId(reservation.productId)
                ?: throw ProductNotFoundException(reservation.productId)
            inventory.releaseReservation(reservation.quantity)
            inventoryRepository.save(inventory)
        }

        stockReservationRepository.deleteAll(reservations)

        eventPublisher.publish(
            StockReleased(orderId = event.orderId)
        )
    }
}

Payment Service

/**
 * Payment-side participant of the choreography saga.
 *
 * Charges the customer once stock has been reserved and publishes the
 * outcome as [PaymentCompleted] or [PaymentFailed].
 */
@Service
class PaymentService(
    private val paymentRepository: PaymentRepository,
    private val paymentGateway: PaymentGateway,
    // Used by onStockReserved; it was referenced without being injected.
    private val orderClient: OrderClient,
    private val eventPublisher: PaymentEventPublisher
) {
    /**
     * Charges the order's total amount, records the payment and publishes
     * [PaymentCompleted]. A [PaymentException] is translated into a
     * [PaymentFailed] event; any other exception propagates to the caller.
     *
     * NOTE(review): a redelivered StockReserved event would call
     * paymentGateway.charge again — confirm that the gateway or a
     * de-duplication check makes this step idempotent.
     */
    @KafkaListener(topics = ["inventory-events"])
    @Transactional
    fun onStockReserved(event: StockReserved) {
        try {
            // The event carries no amount, so the order is fetched for it.
            val order = orderClient.getOrder(event.orderId)

            val paymentResult = paymentGateway.charge(
                customerId = order.customerId,
                amount = order.totalAmount
            )

            val payment = Payment(
                orderId = event.orderId,
                amount = order.totalAmount,
                transactionId = paymentResult.transactionId,
                status = PaymentStatus.COMPLETED
            )
            paymentRepository.save(payment)

            eventPublisher.publish(
                PaymentCompleted(
                    orderId = event.orderId,
                    transactionId = paymentResult.transactionId
                )
            )
        } catch (e: PaymentException) {
            eventPublisher.publish(
                PaymentFailed(
                    orderId = event.orderId,
                    reason = e.message ?: "Payment failed"
                )
            )
        }
    }
}

Pros and Cons of Choreography

| Pros | Cons |
| --- | --- |
| Loose coupling | Difficult to understand full flow |
| Simple implementation | Risk of circular dependencies |
| Service autonomy | Difficult debugging |
| Scalability | Complex testing |

Orchestration Approach

Architecture

                    ┌─────────────────┐
                    │     Saga        │
                    │  Orchestrator   │
                    └────────┬────────┘
                             │
         ┌───────────────────┼───────────────────┐
         │                   │                   │
         ▼                   ▼                   ▼
    ┌─────────┐        ┌─────────┐        ┌─────────┐
    │  Order  │        │Inventory│        │ Payment │
    │ Service │        │ Service │        │ Service │
    └─────────┘        └─────────┘        └─────────┘

Saga State Machine

/**
 * States an order saga can be in.
 *
 * Constant order is significant for anything that relies on ordinals, so new
 * states should be appended rather than inserted.
 */
enum class OrderSagaState {
    /** Initial state of a newly created saga. */
    STARTED,

    /** Stock reservation is in progress. */
    STOCK_RESERVING,

    /** Stock reservation succeeded. */
    STOCK_RESERVED,

    /** Stock reservation failed. */
    STOCK_RESERVATION_FAILED,

    /** Payment is in progress. */
    PAYMENT_PROCESSING,

    /** Payment succeeded. */
    PAYMENT_COMPLETED,

    /** Payment failed. */
    PAYMENT_FAILED,

    /** Shipping has been scheduled. */
    SHIPPING_SCHEDULED,

    /** The saga finished successfully. */
    COMPLETED,

    /** Compensating transactions are in progress. */
    COMPENSATING,

    /** Compensating transactions finished. */
    COMPENSATED,

    /** The saga failed. */
    FAILED,
}

/**
 * Events that drive transitions between order saga states.
 */
enum class OrderSagaEvent {
    /** Starts (or advances) the saga. */
    START,

    /** Stock reservation succeeded. */
    STOCK_RESERVE_SUCCESS,

    /** Stock reservation failed. */
    STOCK_RESERVE_FAIL,

    /** Payment succeeded. */
    PAYMENT_SUCCESS,

    /** Payment failed. */
    PAYMENT_FAIL,

    /** Shipping was scheduled successfully. */
    SHIPPING_SUCCESS,

    /** Shipping could not be scheduled. */
    SHIPPING_FAIL,

    /** All compensating transactions finished. */
    COMPENSATE_COMPLETE,
}

/**
 * Persistent record of one order saga, mapped to the `order_saga` table.
 *
 * Holds the saga's current state plus a JSON-encoded context string.
 */
@Entity
@Table(name = "order_saga")
class OrderSaga(
    // Primary key; defaults to a freshly generated random UUID string.
    @Id
    val sagaId: String = UUID.randomUUID().toString(),

    // Identifier of the order this saga belongs to; the column is NOT NULL.
    @Column(nullable = false)
    val orderId: String,

    // Stored by enum name rather than ordinal.
    @Enumerated(EnumType.STRING)
    var state: OrderSagaState = OrderSagaState.STARTED,

    // Defaults to an empty JSON object.
    @Column(columnDefinition = "TEXT")
    var context: String = "{}",  // Store context as JSON

    // Both timestamps default to the moment the instance is constructed.
    val createdAt: Instant = Instant.now(),
    var updatedAt: Instant = Instant.now()
)

Saga Orchestrator

/**
 * Central coordinator of the order saga.
 *
 * Persists the saga state, advances it through [transition] as events arrive,
 * and calls the participant services (inventory, payment, shipping) for each
 * step. When a step fails after earlier steps succeeded, the completed steps
 * are compensated in reverse order.
 *
 * NOTE(review): the @Async step methods and the @Transactional [handleEvent]
 * are invoked on `this`. With Spring's proxy-based interception, annotations
 * are not applied to self-invocations, so as written every step runs
 * synchronously inside the caller's transaction — confirm whether the steps
 * are meant to live in a separate bean.
 */
@Service
class OrderSagaOrchestrator(
    private val sagaRepository: OrderSagaRepository,
    private val inventoryClient: InventoryClient,
    private val paymentClient: PaymentClient,
    private val shippingClient: ShippingClient,
    private val orderClient: OrderClient,
    private val objectMapper: ObjectMapper
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    /**
     * Creates and saves a saga for [orderId], then kicks off its first step.
     *
     * @return the saga instance that was created
     */
    @Transactional
    fun startSaga(orderId: String): OrderSaga {
        val saga = OrderSaga(orderId = orderId)
        sagaRepository.save(saga)

        processNextStep(saga)
        return saga
    }

    /**
     * Applies [event] to the saga identified by [sagaId].
     *
     * Validates the transition, stores [payload] (when non-null) in the saga's
     * JSON context under the event's name, saves the saga and runs the action
     * for the new state.
     *
     * @throws SagaNotFoundException if no saga exists for [sagaId]
     * @throws InvalidStateTransitionException if [event] is not allowed in the
     *   saga's current state
     */
    @Transactional
    fun handleEvent(sagaId: String, event: OrderSagaEvent, payload: Any? = null) {
        val saga = sagaRepository.findById(sagaId)
            ?: throw SagaNotFoundException(sagaId)

        val newState = transition(saga.state, event)
        saga.state = newState
        saga.updatedAt = Instant.now()

        if (payload != null) {
            val context = objectMapper.readTree(saga.context).deepCopy() as ObjectNode
            context.set<ObjectNode>(event.name, objectMapper.valueToTree(payload))
            saga.context = objectMapper.writeValueAsString(context)
        }

        sagaRepository.save(saga)

        processNextStep(saga)
    }

    /**
     * Transition table of the saga state machine.
     *
     * @throws InvalidStateTransitionException for any (state, event) pair that
     *   is not listed
     */
    private fun transition(currentState: OrderSagaState, event: OrderSagaEvent): OrderSagaState {
        return when (currentState to event) {
            OrderSagaState.STARTED to OrderSagaEvent.START ->
                OrderSagaState.STOCK_RESERVING

            OrderSagaState.STOCK_RESERVING to OrderSagaEvent.STOCK_RESERVE_SUCCESS ->
                OrderSagaState.STOCK_RESERVED

            OrderSagaState.STOCK_RESERVING to OrderSagaEvent.STOCK_RESERVE_FAIL ->
                OrderSagaState.STOCK_RESERVATION_FAILED

            OrderSagaState.STOCK_RESERVED to OrderSagaEvent.START ->
                OrderSagaState.PAYMENT_PROCESSING

            OrderSagaState.PAYMENT_PROCESSING to OrderSagaEvent.PAYMENT_SUCCESS ->
                OrderSagaState.PAYMENT_COMPLETED

            OrderSagaState.PAYMENT_PROCESSING to OrderSagaEvent.PAYMENT_FAIL ->
                OrderSagaState.COMPENSATING

            OrderSagaState.PAYMENT_COMPLETED to OrderSagaEvent.SHIPPING_SUCCESS ->
                OrderSagaState.COMPLETED

            OrderSagaState.PAYMENT_COMPLETED to OrderSagaEvent.SHIPPING_FAIL ->
                OrderSagaState.COMPENSATING

            OrderSagaState.COMPENSATING to OrderSagaEvent.COMPENSATE_COMPLETE ->
                OrderSagaState.COMPENSATED

            else -> throw InvalidStateTransitionException(currentState, event)
        }
    }

    /**
     * Runs the action associated with the saga's current state.
     *
     * The `when` lists every state explicitly (no `else`) so that adding a new
     * state forces a decision here at compile time.
     */
    private fun processNextStep(saga: OrderSaga) {
        when (saga.state) {
            OrderSagaState.STARTED -> {
                handleEvent(saga.sagaId, OrderSagaEvent.START)
            }

            OrderSagaState.STOCK_RESERVING -> {
                reserveStock(saga)
            }

            OrderSagaState.STOCK_RESERVED -> {
                handleEvent(saga.sagaId, OrderSagaEvent.START)
            }

            OrderSagaState.STOCK_RESERVATION_FAILED -> {
                // The first step failed, so nothing has to be compensated, but
                // the order must still be marked failed; otherwise it would be
                // left in its initial status indefinitely.
                logger.info("Stock reservation failed, nothing to compensate: ${saga.sagaId}")
                markOrderAsFailed(saga)
            }

            OrderSagaState.PAYMENT_PROCESSING -> {
                processPayment(saga)
            }

            OrderSagaState.PAYMENT_COMPLETED -> {
                scheduleShipping(saga)
            }

            OrderSagaState.COMPENSATING -> {
                compensate(saga)
            }

            OrderSagaState.COMPLETED -> {
                logger.info("Saga completed successfully: ${saga.sagaId}")
            }

            OrderSagaState.COMPENSATED -> {
                logger.info("Saga compensated: ${saga.sagaId}")
                markOrderAsFailed(saga)
            }

            OrderSagaState.PAYMENT_FAILED,
            OrderSagaState.SHIPPING_SCHEDULED,
            OrderSagaState.FAILED -> {
                logger.warn("No action for state: ${saga.state}")
            }
        }
    }

    /**
     * Runs one participant call and reports its outcome to [handleEvent].
     *
     * Only [request] is guarded by the try block. The success event is
     * dispatched outside of it, so an exception raised further down the chain
     * (a later step, an invalid transition) is not misreported as a failure of
     * this step.
     */
    private fun runStep(
        saga: OrderSaga,
        failureLogPrefix: String,
        successEvent: OrderSagaEvent,
        failureEvent: OrderSagaEvent,
        request: () -> Any?
    ) {
        val result = try {
            request()
        } catch (e: Exception) {
            logger.error("$failureLogPrefix: ${saga.sagaId}", e)
            handleEvent(saga.sagaId, failureEvent, e.message)
            return
        }
        handleEvent(saga.sagaId, successEvent, result)
    }

    /** Asks the inventory service to reserve stock for the saga's order. */
    @Async
    fun reserveStock(saga: OrderSaga) {
        runStep(
            saga = saga,
            failureLogPrefix = "Stock reservation failed for saga",
            successEvent = OrderSagaEvent.STOCK_RESERVE_SUCCESS,
            failureEvent = OrderSagaEvent.STOCK_RESERVE_FAIL
        ) {
            val order = orderClient.getOrder(saga.orderId)
            inventoryClient.reserveStock(
                ReserveStockRequest(
                    orderId = saga.orderId,
                    items = order.items
                )
            )
        }
    }

    /** Asks the payment service to charge the saga's order. */
    @Async
    fun processPayment(saga: OrderSaga) {
        runStep(
            saga = saga,
            failureLogPrefix = "Payment failed for saga",
            successEvent = OrderSagaEvent.PAYMENT_SUCCESS,
            failureEvent = OrderSagaEvent.PAYMENT_FAIL
        ) {
            val order = orderClient.getOrder(saga.orderId)
            paymentClient.processPayment(
                ProcessPaymentRequest(
                    orderId = saga.orderId,
                    customerId = order.customerId,
                    amount = order.totalAmount
                )
            )
        }
    }

    /** Asks the shipping service to schedule shipping for the saga's order. */
    @Async
    fun scheduleShipping(saga: OrderSaga) {
        runStep(
            saga = saga,
            failureLogPrefix = "Shipping scheduling failed for saga",
            successEvent = OrderSagaEvent.SHIPPING_SUCCESS,
            failureEvent = OrderSagaEvent.SHIPPING_FAIL
        ) {
            shippingClient.scheduleShipping(
                ScheduleShippingRequest(orderId = saga.orderId)
            )
        }
    }

    /**
     * Undoes the steps recorded as successful in the saga context, in reverse
     * order (payment refund first, then stock release).
     *
     * If a compensating call fails, the saga is flagged for manual
     * intervention. COMPENSATE_COMPLETE is dispatched outside the try block so
     * that a failure while handling it is not mistaken for a failed
     * compensation.
     */
    @Async
    fun compensate(saga: OrderSaga) {
        logger.info("Starting compensation for saga: ${saga.sagaId}")

        val context = objectMapper.readTree(saga.context)

        try {
            // Compensate payment (only if the payment step succeeded)
            if (context.has("PAYMENT_SUCCESS")) {
                val paymentInfo = context.get("PAYMENT_SUCCESS")
                paymentClient.refund(
                    RefundRequest(
                        orderId = saga.orderId,
                        transactionId = paymentInfo.get("transactionId").asText()
                    )
                )
            }

            // Compensate stock (only if the reservation step succeeded)
            if (context.has("STOCK_RESERVE_SUCCESS")) {
                inventoryClient.releaseStock(
                    ReleaseStockRequest(orderId = saga.orderId)
                )
            }
        } catch (e: Exception) {
            logger.error("Compensation failed for saga: ${saga.sagaId}", e)
            // Compensation failure requires manual intervention
            markForManualIntervention(saga, e)
            return
        }

        handleEvent(saga.sagaId, OrderSagaEvent.COMPENSATE_COMPLETE)
    }

    /** Sets the order's status to "FAILED" through the order service. */
    private fun markOrderAsFailed(saga: OrderSaga) {
        orderClient.updateOrderStatus(saga.orderId, "FAILED")
    }

    /**
     * Moves the saga to FAILED and saves it so that it can be picked up for
     * manual handling.
     */
    private fun markForManualIntervention(saga: OrderSaga, error: Exception) {
        saga.state = OrderSagaState.FAILED
        // Keep the timestamp in step with the state change, as handleEvent does.
        saga.updatedAt = Instant.now()
        sagaRepository.save(saga)
        logger.warn("Saga ${saga.sagaId} requires manual intervention: ${error.message}")
        // Request manual intervention via alerts, notifications, etc.
    }
}

Saga Step Definition (A More Elegant Approach)

/**
 * Declares the order saga as a list of steps, each pairing a participant
 * call with the compensation that undoes it.
 *
 * The three clients are constructor-injected; the step lambdas referenced
 * them without any declaration in scope.
 */
@Configuration
class OrderSagaDefinition(
    private val inventoryClient: InventoryClient,
    private val paymentClient: PaymentClient,
    private val shippingClient: ShippingClient
) {

    /**
     * Builds the saga definition: reserve stock, process payment, schedule
     * shipping — in that order.
     */
    @Bean
    fun orderSaga(): SagaDefinition<OrderSagaData> {
        return SagaDefinition.builder<OrderSagaData>()
            .step("reserve-stock")
                .invokeParticipant { data -> inventoryClient.reserveStock(data.orderId, data.items) }
                .withCompensation { data -> inventoryClient.releaseStock(data.orderId) }
            .step("process-payment")
                .invokeParticipant { data -> paymentClient.charge(data.orderId, data.amount) }
                .withCompensation { data -> paymentClient.refund(data.orderId) }
            .step("schedule-shipping")
                .invokeParticipant { data -> shippingClient.schedule(data.orderId) }
                .withCompensation { data -> shippingClient.cancel(data.orderId) }
            .build()
    }
}

/**
 * Immutable data passed to each step of the declarative order saga.
 *
 * @property orderId identifier of the order the saga operates on
 * @property customerId identifier of the customer who placed the order
 * @property items line items of the order
 * @property amount monetary amount of the order
 */
data class OrderSagaData(
    val orderId: String,
    val customerId: String,
    val items: List<OrderItem>,
    val amount: BigDecimal
)

Pros and Cons of Orchestration

| Pros | Cons |
| --- | --- |
| Easy to understand full flow | Centralization |
| Easy debugging | Single point of failure |
| Clear responsibilities | Orchestrator complexity |
| Easy testing | Increased coupling |

Compensating Transaction Strategies

Semantic Lock

/**
 * A time-limited hold on stock for one product of one order (semantic lock).
 *
 * @property id primary key; defaults to a freshly generated random UUID string
 * @property productId product whose stock is held
 * @property orderId order the hold belongs to
 * @property quantity number of units held
 * @property status lifecycle state; starts as RESERVED. Mutable so that the
 *   reservation can move on to the other [ReservationStatus] values — as a
 *   `val` it could never leave RESERVED.
 * @property expiresAt defaults to 15 minutes after the instance is constructed
 */
@Entity
class StockReservation(
    @Id
    val id: String = UUID.randomUUID().toString(),
    val productId: String,
    val orderId: String,
    val quantity: Int,
    // Stored by name, like OrderSaga.state, so that reordering or extending
    // the enum does not change the meaning of existing rows.
    @Enumerated(EnumType.STRING)
    var status: ReservationStatus = ReservationStatus.RESERVED,
    val expiresAt: Instant = Instant.now().plus(Duration.ofMinutes(15))
)

/**
 * Lifecycle states of a stock reservation.
 */
enum class ReservationStatus {
    /** Stock is currently held. */
    RESERVED,

    /** The reservation was confirmed. */
    CONFIRMED,

    /** The reservation was released. */
    RELEASED,

    /** The reservation expired. */
    EXPIRED,
}

/**
 * Scheduled sweep that releases every reservation the repository reports as
 * expired at the current instant. Runs with a fixed delay of 60000 ms.
 */
@Scheduled(fixedDelay = 60000)
fun releaseExpiredReservations() {
    val now = Instant.now()
    for (stale in reservationRepository.findExpiredReservations(now)) {
        releaseReservation(stale)
    }
}

Compensating Transaction Log

/**
 * Record of one compensating transaction, mapped to the `compensation_log`
 * table.
 *
 * @property id primary key; defaults to a freshly generated random UUID string
 * @property sagaId saga the compensation belongs to
 * @property stepName name of the saga step being compensated
 * @property compensationData data needed to run the compensation
 *   (NOTE(review): format is not visible here — presumably serialized JSON; confirm)
 * @property status processing state; starts as PENDING
 * @property createdAt defaults to the moment the instance is constructed
 * @property executedAt set once the compensation has run; null until then
 */
@Entity
@Table(name = "compensation_log")
class CompensationLog(
    @Id
    val id: String = UUID.randomUUID().toString(),
    val sagaId: String,
    val stepName: String,
    val compensationData: String,
    // Stored by name, like OrderSaga.state, instead of the default ordinal
    // mapping, so that reordering or extending the enum does not change the
    // meaning of existing rows.
    @Enumerated(EnumType.STRING)
    var status: CompensationStatus = CompensationStatus.PENDING,
    val createdAt: Instant = Instant.now(),
    var executedAt: Instant? = null
)

/**
 * Background worker that executes compensations recorded in the
 * compensation log.
 */
@Service
class CompensationExecutor(
    private val compensationLogRepository: CompensationLogRepository
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    /**
     * Runs every PENDING compensation and records the outcome on its log
     * entry. Scheduled with a fixed delay of 10000 ms.
     *
     * A failing entry is marked FAILED and logged, and processing continues
     * with the next entry. Only PENDING entries are selected, so a FAILED
     * entry is not retried by this job.
     */
    @Scheduled(fixedDelay = 10000)
    @Transactional
    fun executePendingCompensations() {
        val pending = compensationLogRepository.findByStatus(CompensationStatus.PENDING)

        pending.forEach { log ->
            try {
                executeCompensation(log)
                log.status = CompensationStatus.COMPLETED
                log.executedAt = Instant.now()
            } catch (e: Exception) {
                log.status = CompensationStatus.FAILED
                // Record the cause instead of swallowing it; without this a
                // failed compensation leaves no trace of why it failed.
                logger.error(
                    "Compensation ${log.id} failed for saga ${log.sagaId}, step ${log.stepName}",
                    e
                )
                // Retry logic or notification
            }
            compensationLogRepository.save(log)
        }
    }
}

Monitoring and Observability

Saga Status Query API

/**
 * Read-only HTTP API under `/api/sagas` for inspecting saga state.
 */
@RestController
@RequestMapping("/api/sagas")
class SagaController(
    private val sagaRepository: OrderSagaRepository
) {
    /**
     * Returns the status of a single saga.
     *
     * @throws SagaNotFoundException if no saga exists for [sagaId]
     */
    @GetMapping("/{sagaId}")
    fun getSaga(@PathVariable sagaId: String): ResponseEntity<SagaStatusResponse> {
        val found = sagaRepository.findById(sagaId) ?: throw SagaNotFoundException(sagaId)

        val body = SagaStatusResponse(
            sagaId = found.sagaId,
            orderId = found.orderId,
            state = found.state.name,
            context = found.context,
            createdAt = found.createdAt,
            updatedAt = found.updatedAt
        )
        return ResponseEntity.ok(body)
    }

    /**
     * Returns one page of sagas, filtered by [state] when it is supplied and
     * unfiltered otherwise.
     */
    @GetMapping
    fun listSagas(
        @RequestParam(required = false) state: OrderSagaState?,
        pageable: Pageable
    ): ResponseEntity<Page<SagaStatusResponse>> {
        val page = when (state) {
            null -> sagaRepository.findAll(pageable)
            else -> sagaRepository.findByState(state, pageable)
        }

        val responses = page.map { saga -> SagaStatusResponse(saga) }
        return ResponseEntity.ok(responses)
    }
}

Summary

Saga Pattern selection guide:

| Situation | Recommended Approach |
| --- | --- |
| Simple flow (2-3 services) | Choreography |
| Complex flow | Orchestration |
| Service autonomy important | Choreography |
| Visibility/debugging important | Orchestration |
| Frequent flow changes | Orchestration |

In the next post, we will cover Event Schema evolution and versioning.

댓글남기기