Production-Ready Event-Driven Architecture Part 4 - Saga Pattern for Distributed Transactions
Series Introduction
This series covers how to build production-ready event-driven architecture.
- Part 1: Event Sourcing Fundamentals
- Part 2: Implementing the Outbox Pattern
- Part 3: CQRS with Separate Read/Write Models
- Part 4: Saga Pattern for Distributed Transactions (Current)
- Part 5: Event Schema Evolution and Versioning
The Distributed Transaction Problem
In a microservices environment, a single business transaction spans multiple services.
Example: Order Processing Flow
Create Order → Check Inventory → Process Payment → Schedule Shipping
Each step is handled by a different service (typically with its own database), so a single traditional ACID transaction cannot span the whole flow.
Introduction to Saga Pattern
A Saga is a sequence of local transactions, each owned by a single service. Each local transaction commits its own changes and triggers the next step; if a step fails, the saga executes compensating transactions that semantically undo the steps that have already completed.
Two Implementation Approaches
- Choreography: Each service publishes and subscribes to events
- Orchestration: A central coordinator manages the Saga
Choreography Approach
Architecture
┌─────────┐      ┌─────────┐      ┌─────────┐      ┌─────────┐
│  Order  │─────▶│Inventory│─────▶│ Payment │─────▶│Shipping │
│ Service │      │ Service │      │ Service │      │ Service │
└────┬────┘      └────┬────┘      └────┬────┘      └────┬────┘
     │                │                │                │
     │ OrderCreated   │ StockReserved  │PaymentCompleted│
     └────────────────┴────────────────┴────────────────┘
                      Event Bus (Kafka)
Order Service
/**
 * Order service side of the choreography saga.
 *
 * Creates orders and reacts to the outcome events published by the inventory and payment
 * services. Those events arrive over the event bus (Kafka), so the handlers are Kafka
 * listeners like the ones in the other services. A `@TransactionalEventListener` only
 * receives Spring application events published inside this same process, so it would never
 * see events coming from another service.
 *
 * NOTE(review): two listener methods here subscribe to `payment-events`. If they share one
 * consumer group, Kafka splits the topic's partitions between them and each record reaches
 * only one method. A class-level `@KafkaListener` with `@KafkaHandler` methods (dispatch by
 * payload type) is the usual way to consume several event types from one topic. Confirm
 * against the consumer configuration.
 */
@Service
class OrderService(
    private val orderRepository: OrderRepository,
    private val eventPublisher: OrderEventPublisher
) {
    /**
     * Creates and saves an order, then publishes [OrderCreated], which starts the saga.
     *
     * NOTE(review): publishing directly from inside the DB transaction is a dual write. The
     * event may be published even if the transaction later rolls back. The outbox pattern
     * avoids this.
     */
    @Transactional
    fun createOrder(command: CreateOrderCommand): Order {
        val order = Order.create(
            customerId = command.customerId,
            items = command.items
        )
        orderRepository.save(order)
        eventPublisher.publish(
            OrderCreated(
                orderId = order.id,
                customerId = order.customerId,
                items = order.items.map { OrderItemDto(it) },
                totalAmount = order.totalAmount
            )
        )
        return order
    }

    /**
     * Marks the order as paid once the payment service reports success.
     *
     * @throws OrderNotFoundException if the order does not exist.
     */
    @KafkaListener(topics = ["payment-events"])
    @Transactional
    fun onPaymentCompleted(event: PaymentCompleted) {
        val order = orderRepository.findById(event.orderId)
            ?: throw OrderNotFoundException(event.orderId)
        order.markAsPaid()
        orderRepository.save(order)
    }

    /**
     * Cancels the order when the payment service reports failure and publishes
     * [OrderCancelled].
     *
     * Without this handler, an order whose payment failed would never reach a terminal state.
     * Meanwhile, the inventory service releases the stock on the same event.
     *
     * @throws OrderNotFoundException if the order does not exist.
     */
    @KafkaListener(topics = ["payment-events"])
    @Transactional
    fun onPaymentFailed(event: PaymentFailed) {
        val order = orderRepository.findById(event.orderId)
            ?: throw OrderNotFoundException(event.orderId)
        order.cancel("Payment failed: ${event.reason}")
        orderRepository.save(order)
        eventPublisher.publish(OrderCancelled(order.id, event.reason))
    }

    /**
     * Cancels the order when stock could not be reserved and publishes [OrderCancelled].
     *
     * @throws OrderNotFoundException if the order does not exist.
     */
    @KafkaListener(topics = ["inventory-events"])
    @Transactional
    fun onStockReservationFailed(event: StockReservationFailed) {
        val order = orderRepository.findById(event.orderId)
            ?: throw OrderNotFoundException(event.orderId)
        order.cancel("Stock reservation failed: ${event.reason}")
        orderRepository.save(order)
        eventPublisher.publish(OrderCancelled(order.id, event.reason))
    }
}
Inventory Service
/**
 * Inventory service side of the choreography saga.
 *
 * Reserves stock when an order is created, and releases it again (compensation) when the
 * payment for that order fails.
 */
@Service
class InventoryService(
    private val inventoryRepository: InventoryRepository,
    private val eventPublisher: InventoryEventPublisher,
    // Persists per-order reservations so onPaymentFailed can find what to release.
    private val stockReservationRepository: StockReservationRepository
) {
    /**
     * Reserves stock for every line of a newly created order.
     *
     * Runs in two phases so that a business failure never commits a partial reservation:
     * 1. Validate: look up every product and check that its available quantity covers the
     *    total requested for it across all order lines (so duplicate lines for one product
     *    are not validated independently). Any failure here publishes
     *    [StockReservationFailed] and returns before any inventory is modified.
     * 2. Reserve: reserve and persist one [StockReservation] per order line, save the
     *    inventories, then publish [StockReserved]. An exception in this phase is
     *    deliberately not caught. It propagates out of the listener instead of being reported
     *    as a clean failure after part of the stock was already reserved.
     */
    @KafkaListener(topics = ["order-events"])
    @Transactional
    fun onOrderCreated(event: OrderCreated) {
        val inventoryByProduct = try {
            event.items
                .groupingBy { it.productId }
                .fold(0) { total, item -> total + item.quantity }
                .mapValues { (productId, requested) ->
                    val inventory = inventoryRepository.findByProductId(productId)
                        ?: throw ProductNotFoundException(productId)
                    if (inventory.availableQuantity < requested) {
                        throw InsufficientStockException(productId)
                    }
                    inventory
                }
        } catch (e: Exception) {
            eventPublisher.publish(
                StockReservationFailed(
                    orderId = event.orderId,
                    reason = e.message ?: "Unknown error"
                )
            )
            return
        }

        val reservations = event.items.map { item ->
            inventoryByProduct.getValue(item.productId).reserve(item.quantity)
            StockReservation(
                productId = item.productId,
                orderId = event.orderId,
                quantity = item.quantity
            ).also { stockReservationRepository.save(it) }
        }
        inventoryByProduct.values.forEach { inventoryRepository.save(it) }

        eventPublisher.publish(
            StockReserved(
                orderId = event.orderId,
                reservations = reservations
            )
        )
    }

    /**
     * Compensating transaction for a failed payment.
     *
     * Releases the stock held by each reservation of the order, deletes those reservation
     * records, and publishes [StockReleased]. [StockReleased] is published even when no
     * reservations are found.
     *
     * NOTE(review): `payment-events` also carries other payment event types. See the
     * shared-topic caveat about type-based dispatch (class-level `@KafkaListener` +
     * `@KafkaHandler`).
     *
     * @throws ProductNotFoundException if a reserved product no longer exists.
     */
    @KafkaListener(topics = ["payment-events"])
    @Transactional
    fun onPaymentFailed(event: PaymentFailed) {
        val reservations = stockReservationRepository.findByOrderId(event.orderId)
        reservations.forEach { reservation ->
            val inventory = inventoryRepository.findByProductId(reservation.productId)
                ?: throw ProductNotFoundException(reservation.productId)
            inventory.releaseReservation(reservation.quantity)
            inventoryRepository.save(inventory)
        }
        stockReservationRepository.deleteAll(reservations)
        eventPublisher.publish(
            StockReleased(orderId = event.orderId)
        )
    }
}
Payment Service
/**
 * Payment service side of the choreography saga.
 *
 * Charges the customer once stock has been reserved and publishes [PaymentCompleted] or
 * [PaymentFailed].
 */
@Service
class PaymentService(
    private val paymentRepository: PaymentRepository,
    private val paymentGateway: PaymentGateway,
    private val eventPublisher: PaymentEventPublisher,
    // Used to look up the customer and amount to charge for an order.
    private val orderClient: OrderClient
) {
    /**
     * Charges the order's total amount, records the payment and publishes [PaymentCompleted].
     *
     * Only [PaymentException] is translated into [PaymentFailed]. Any other exception (e.g.
     * from the order lookup) propagates out of the listener.
     *
     * NOTE(review): Kafka delivers at least once, so a redelivered [StockReserved] would charge
     * the customer again. An idempotency check (an existing payment for the order, or an
     * idempotency key on the gateway call) is advisable. Also, the gateway charge is not
     * undone if saving the payment record fails afterwards.
     */
    @KafkaListener(topics = ["inventory-events"])
    @Transactional
    fun onStockReserved(event: StockReserved) {
        try {
            val order = orderClient.getOrder(event.orderId)
            val paymentResult = paymentGateway.charge(
                customerId = order.customerId,
                amount = order.totalAmount
            )
            val payment = Payment(
                orderId = event.orderId,
                amount = order.totalAmount,
                transactionId = paymentResult.transactionId,
                status = PaymentStatus.COMPLETED
            )
            paymentRepository.save(payment)
            eventPublisher.publish(
                PaymentCompleted(
                    orderId = event.orderId,
                    transactionId = paymentResult.transactionId
                )
            )
        } catch (e: PaymentException) {
            eventPublisher.publish(
                PaymentFailed(
                    orderId = event.orderId,
                    reason = e.message ?: "Payment failed"
                )
            )
        }
    }
}
Pros and Cons of Choreography
| Pros | Cons |
|---|---|
| Loose coupling | Difficult to understand full flow |
| Simple implementation | Risk of circular dependencies |
| Service autonomy | Difficult debugging |
| Scalability | Complex testing |
Orchestration Approach
Architecture
                ┌─────────────────┐
                │      Saga       │
                │  Orchestrator   │
                └────────┬────────┘
                         │
     ┌───────────────────┼───────────────────┐
     │                   │                   │
     ▼                   ▼                   ▼
┌─────────┐         ┌─────────┐         ┌─────────┐
│  Order  │         │Inventory│         │ Payment │
│ Service │         │ Service │         │ Service │
└─────────┘         └─────────┘         └─────────┘
Saga State Machine
/**
 * States of the orchestrated order saga.
 *
 * `OrderSaga.state` is mapped with `EnumType.STRING`, so entry names are what gets stored;
 * renaming an entry changes persisted data.
 */
enum class OrderSagaState {
    /** Initial state of a new saga. */
    STARTED,

    /** Stock reservation has been requested. */
    STOCK_RESERVING,

    /** Stock reservation succeeded. */
    STOCK_RESERVED,

    /** Stock reservation failed. */
    STOCK_RESERVATION_FAILED,

    /** Payment has been requested. */
    PAYMENT_PROCESSING,

    /** Payment succeeded. */
    PAYMENT_COMPLETED,

    /** Not produced by the orchestrator's transition table (a payment failure goes to COMPENSATING). */
    PAYMENT_FAILED,

    /** Not produced by the orchestrator's transition table. */
    SHIPPING_SCHEDULED,

    /** Terminal: every step succeeded. */
    COMPLETED,

    /** Compensating transactions are being executed. */
    COMPENSATING,

    /** Terminal: compensation finished. */
    COMPENSATED,

    /** Terminal: compensation itself failed and needs manual intervention. */
    FAILED
}
/**
 * Events that drive the order saga's state machine.
 *
 * The orchestrator stores each event's payload in the saga context under the event's
 * `name`, and reads `PAYMENT_SUCCESS` / `STOCK_RESERVE_SUCCESS` back during compensation.
 * Renaming an entry therefore breaks both stored contexts and the compensation lookup.
 */
enum class OrderSagaEvent {
    /** Starts the saga, and is also reused to advance from STOCK_RESERVED to payment. */
    START,

    // Stock reservation outcomes
    STOCK_RESERVE_SUCCESS,
    STOCK_RESERVE_FAIL,

    // Payment outcomes
    PAYMENT_SUCCESS,
    PAYMENT_FAIL,

    // Shipping outcomes
    SHIPPING_SUCCESS,
    SHIPPING_FAIL,

    /** All compensating transactions finished. */
    COMPENSATE_COMPLETE
}
/**
 * Persistent record of one order saga run by the orchestrator.
 *
 * [context] holds a JSON object (initially `{}`) in which the orchestrator accumulates step
 * payloads keyed by event name. [state] is stored by enum name.
 */
@Entity
@Table(name = "order_saga")
class OrderSaga(
    @Id val sagaId: String = UUID.randomUUID().toString(),
    @Column(nullable = false) val orderId: String,
    @Enumerated(EnumType.STRING) var state: OrderSagaState = OrderSagaState.STARTED,
    // JSON document kept in an unbounded TEXT column
    @Column(columnDefinition = "TEXT") var context: String = "{}",
    val createdAt: Instant = Instant.now(),
    var updatedAt: Instant = Instant.now()
)
Saga Orchestrator
/**
 * Central coordinator for the order saga.
 *
 * Each saga is persisted as an [OrderSaga] row:
 * - [handleEvent] applies an event to the state machine ([transition]) and stores the
 *   event's payload in the saga's JSON context under the event name.
 * - [processNextStep] then performs the action for the new state.
 * - Each step reports its outcome by calling [handleEvent] again, so one saga is driven by
 *   that call chain.
 *
 * NOTE(review): with Spring's default proxy-based AOP, `@Async` and `@Transactional` are not
 * applied to calls made from inside this class, because self-invocation bypasses the proxy.
 * As written, every step runs synchronously on the caller's thread, inside the transaction
 * opened by the outermost [startSaga]/[handleEvent] call. Real asynchrony needs the steps in
 * a separate bean, or steps driven by reply messages.
 *
 * NOTE(review): if [OrderSagaRepository] is a Spring Data `CrudRepository`, `findById`
 * returns `Optional` and the `?:` fallback below never fires; `findByIdOrNull` would be
 * needed. Confirm the repository type.
 */
@Service
class OrderSagaOrchestrator(
    private val sagaRepository: OrderSagaRepository,
    private val inventoryClient: InventoryClient,
    private val paymentClient: PaymentClient,
    private val shippingClient: ShippingClient,
    private val orderClient: OrderClient,
    private val objectMapper: ObjectMapper
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    /**
     * Creates and persists a saga for [orderId] in [OrderSagaState.STARTED], drives it
     * forward, and returns the saga entity.
     */
    @Transactional
    fun startSaga(orderId: String): OrderSaga {
        val saga = OrderSaga(orderId = orderId)
        sagaRepository.save(saga)
        processNextStep(saga)
        return saga
    }

    /**
     * Applies [event] to the saga identified by [sagaId], then runs the action for the new
     * state.
     *
     * A non-null [payload] is serialized into the saga context under `event.name`.
     * [compensate] later reads the `PAYMENT_SUCCESS` and `STOCK_RESERVE_SUCCESS` entries.
     *
     * @throws SagaNotFoundException if no saga exists for [sagaId].
     * @throws InvalidStateTransitionException if [event] is not valid in the current state.
     */
    @Transactional
    fun handleEvent(sagaId: String, event: OrderSagaEvent, payload: Any? = null) {
        val saga = sagaRepository.findById(sagaId)
            ?: throw SagaNotFoundException(sagaId)
        saga.state = transition(saga.state, event)
        saga.updatedAt = Instant.now()
        if (payload != null) {
            // readTree parses a fresh tree, so it can be mutated without copying
            val context = objectMapper.readTree(saga.context) as ObjectNode
            context.set<ObjectNode>(event.name, objectMapper.valueToTree(payload))
            saga.context = objectMapper.writeValueAsString(context)
        }
        sagaRepository.save(saga)
        processNextStep(saga)
    }

    /**
     * State-transition table of the saga.
     *
     * `START` is reused to move from STOCK_RESERVED to PAYMENT_PROCESSING. Any
     * (state, event) pair not listed throws [InvalidStateTransitionException].
     */
    private fun transition(currentState: OrderSagaState, event: OrderSagaEvent): OrderSagaState {
        return when (currentState to event) {
            OrderSagaState.STARTED to OrderSagaEvent.START ->
                OrderSagaState.STOCK_RESERVING
            OrderSagaState.STOCK_RESERVING to OrderSagaEvent.STOCK_RESERVE_SUCCESS ->
                OrderSagaState.STOCK_RESERVED
            OrderSagaState.STOCK_RESERVING to OrderSagaEvent.STOCK_RESERVE_FAIL ->
                OrderSagaState.STOCK_RESERVATION_FAILED
            OrderSagaState.STOCK_RESERVED to OrderSagaEvent.START ->
                OrderSagaState.PAYMENT_PROCESSING
            OrderSagaState.PAYMENT_PROCESSING to OrderSagaEvent.PAYMENT_SUCCESS ->
                OrderSagaState.PAYMENT_COMPLETED
            OrderSagaState.PAYMENT_PROCESSING to OrderSagaEvent.PAYMENT_FAIL ->
                OrderSagaState.COMPENSATING
            OrderSagaState.PAYMENT_COMPLETED to OrderSagaEvent.SHIPPING_SUCCESS ->
                OrderSagaState.COMPLETED
            OrderSagaState.PAYMENT_COMPLETED to OrderSagaEvent.SHIPPING_FAIL ->
                OrderSagaState.COMPENSATING
            OrderSagaState.COMPENSATING to OrderSagaEvent.COMPENSATE_COMPLETE ->
                OrderSagaState.COMPENSATED
            else -> throw InvalidStateTransitionException(currentState, event)
        }
    }

    /**
     * Performs the action associated with the saga's current state.
     *
     * Every state is listed explicitly, so a new state added to [OrderSagaState] must be
     * handled here.
     */
    private fun processNextStep(saga: OrderSaga) {
        when (saga.state) {
            OrderSagaState.STARTED -> handleEvent(saga.sagaId, OrderSagaEvent.START)
            OrderSagaState.STOCK_RESERVING -> reserveStock(saga)
            OrderSagaState.STOCK_RESERVED -> handleEvent(saga.sagaId, OrderSagaEvent.START)
            OrderSagaState.STOCK_RESERVATION_FAILED -> {
                // The reserve call did not succeed and no payment was taken, so there is no
                // recorded step to compensate; fail the order instead of leaving the saga stuck.
                // NOTE(review): if the reserve call failed on a timeout, stock may still have
                // been reserved remotely. This assumes such reservations expire on their own.
                logger.info("Saga failed at stock reservation: ${saga.sagaId}")
                markOrderAsFailed(saga)
            }
            OrderSagaState.PAYMENT_PROCESSING -> processPayment(saga)
            OrderSagaState.PAYMENT_COMPLETED -> scheduleShipping(saga)
            OrderSagaState.COMPENSATING -> compensate(saga)
            OrderSagaState.COMPLETED -> logger.info("Saga completed successfully: ${saga.sagaId}")
            OrderSagaState.COMPENSATED -> {
                logger.info("Saga compensated: ${saga.sagaId}")
                markOrderAsFailed(saga)
            }
            OrderSagaState.PAYMENT_FAILED,
            OrderSagaState.SHIPPING_SCHEDULED,
            OrderSagaState.FAILED -> logger.warn("No action for state: ${saga.state}")
        }
    }

    /**
     * Reserves stock for the saga's order and reports STOCK_RESERVE_SUCCESS or
     * STOCK_RESERVE_FAIL.
     *
     * The success event is raised outside the `try`, because it synchronously drives all
     * later steps. A failure in a later step must not be caught here and misreported as a
     * stock reservation failure.
     */
    @Async
    fun reserveStock(saga: OrderSaga) {
        val result = try {
            val order = orderClient.getOrder(saga.orderId)
            inventoryClient.reserveStock(
                ReserveStockRequest(
                    orderId = saga.orderId,
                    items = order.items
                )
            )
        } catch (e: Exception) {
            logger.error("Stock reservation failed for saga: ${saga.sagaId}", e)
            handleEvent(saga.sagaId, OrderSagaEvent.STOCK_RESERVE_FAIL, e.message)
            return
        }
        handleEvent(saga.sagaId, OrderSagaEvent.STOCK_RESERVE_SUCCESS, result)
    }

    /**
     * Charges the order total and reports PAYMENT_SUCCESS or PAYMENT_FAIL. As in
     * [reserveStock], the success event is raised outside the `try`.
     */
    @Async
    fun processPayment(saga: OrderSaga) {
        val result = try {
            val order = orderClient.getOrder(saga.orderId)
            paymentClient.processPayment(
                ProcessPaymentRequest(
                    orderId = saga.orderId,
                    customerId = order.customerId,
                    amount = order.totalAmount
                )
            )
        } catch (e: Exception) {
            logger.error("Payment failed for saga: ${saga.sagaId}", e)
            handleEvent(saga.sagaId, OrderSagaEvent.PAYMENT_FAIL, e.message)
            return
        }
        handleEvent(saga.sagaId, OrderSagaEvent.PAYMENT_SUCCESS, result)
    }

    /**
     * Schedules shipping and reports SHIPPING_SUCCESS or SHIPPING_FAIL. As in
     * [reserveStock], the success event is raised outside the `try`.
     */
    @Async
    fun scheduleShipping(saga: OrderSaga) {
        val result = try {
            shippingClient.scheduleShipping(
                ScheduleShippingRequest(orderId = saga.orderId)
            )
        } catch (e: Exception) {
            logger.error("Shipping scheduling failed for saga: ${saga.sagaId}", e)
            handleEvent(saga.sagaId, OrderSagaEvent.SHIPPING_FAIL, e.message)
            return
        }
        handleEvent(saga.sagaId, OrderSagaEvent.SHIPPING_SUCCESS, result)
    }

    /**
     * Runs the compensating transactions in reverse order of the forward steps, then reports
     * COMPENSATE_COMPLETE.
     *
     * - The payment is refunded only if the context holds a PAYMENT_SUCCESS payload.
     * - Stock is released only if the context holds a STOCK_RESERVE_SUCCESS payload.
     * - If a compensation call fails, the saga is marked for manual intervention instead.
     *
     * COMPENSATE_COMPLETE is raised outside the `try`, so a failure while finishing the saga
     * is not mistaken for a failed compensation.
     */
    @Async
    fun compensate(saga: OrderSaga) {
        logger.info("Starting compensation for saga: ${saga.sagaId}")
        val context = objectMapper.readTree(saga.context)
        try {
            // Compensate Payment (if it was taken)
            if (context.has("PAYMENT_SUCCESS")) {
                val transactionId = context.get("PAYMENT_SUCCESS").get("transactionId")?.asText()
                    ?: error("No transactionId recorded in PAYMENT_SUCCESS for saga: ${saga.sagaId}")
                paymentClient.refund(
                    RefundRequest(
                        orderId = saga.orderId,
                        transactionId = transactionId
                    )
                )
            }
            // Compensate Stock (if it was reserved)
            if (context.has("STOCK_RESERVE_SUCCESS")) {
                inventoryClient.releaseStock(
                    ReleaseStockRequest(orderId = saga.orderId)
                )
            }
        } catch (e: Exception) {
            logger.error("Compensation failed for saga: ${saga.sagaId}", e)
            // Compensation failure requires manual intervention
            markForManualIntervention(saga, e)
            return
        }
        handleEvent(saga.sagaId, OrderSagaEvent.COMPENSATE_COMPLETE)
    }

    /** Tells the order service that the order failed. */
    private fun markOrderAsFailed(saga: OrderSaga) {
        orderClient.updateOrderStatus(saga.orderId, "FAILED")
    }

    /**
     * Moves the saga to [OrderSagaState.FAILED] and records [error]'s message in the context
     * under `COMPENSATION_ERROR`, so the status API shows why intervention is needed.
     */
    private fun markForManualIntervention(saga: OrderSaga, error: Exception) {
        saga.state = OrderSagaState.FAILED
        saga.updatedAt = Instant.now()
        val context = objectMapper.readTree(saga.context) as ObjectNode
        context.put("COMPENSATION_ERROR", error.message ?: error.javaClass.name)
        saga.context = objectMapper.writeValueAsString(context)
        sagaRepository.save(saga)
        // Request manual intervention via alerts, notifications, etc.
    }
}
Saga Step Definition (A More Elegant Approach)
/**
 * Declarative version of the order saga. Each step pairs a participant call with the
 * compensation to run for it.
 *
 * The participant clients are injected through the constructor so the step lambdas can
 * capture them.
 *
 * NOTE(review): `SagaDefinition` is not defined in this file. It stands for a saga
 * framework's builder API (Eventuate Tram Sagas offers a similar step DSL). Check the exact
 * method names, and when compensations run, against the framework actually used.
 */
@Configuration
class OrderSagaDefinition(
    private val inventoryClient: InventoryClient,
    private val paymentClient: PaymentClient,
    private val shippingClient: ShippingClient
) {
    /** Saga: reserve stock → charge payment → schedule shipping, each with its undo action. */
    @Bean
    fun orderSaga(): SagaDefinition<OrderSagaData> {
        return SagaDefinition.builder<OrderSagaData>()
            .step("reserve-stock")
            .invokeParticipant { data -> inventoryClient.reserveStock(data.orderId, data.items) }
            .withCompensation { data -> inventoryClient.releaseStock(data.orderId) }
            .step("process-payment")
            .invokeParticipant { data -> paymentClient.charge(data.orderId, data.amount) }
            .withCompensation { data -> paymentClient.refund(data.orderId) }
            .step("schedule-shipping")
            .invokeParticipant { data -> shippingClient.schedule(data.orderId) }
            .withCompensation { data -> shippingClient.cancel(data.orderId) }
            .build()
    }
}
/**
 * Data carried through the declarative order saga.
 *
 * Every step uses [orderId]. Only the stock step uses [items], and only the payment step
 * uses [amount]. [customerId] is not read by the steps defined in this file.
 */
data class OrderSagaData(
    val orderId: String,
    val customerId: String,
    val items: List<OrderItem>,
    val amount: BigDecimal
)
Pros and Cons of Orchestration
| Pros | Cons |
|---|---|
| Easy to understand full flow | Centralization |
| Easy debugging | Single point of failure |
| Clear responsibilities | Orchestrator complexity |
| Easy testing | Increased coupling |
Compensating Transaction Strategies
Semantic Lock
/**
 * Semantic lock on stock: a row recording that [quantity] units of [productId] are held for
 * [orderId].
 *
 * [status] is mutable because the lock has to move through [ReservationStatus] over its
 * lifetime. As a `val`, a reservation could never leave RESERVED. It is stored by enum name
 * (as `OrderSaga.state` is) rather than JPA's default ordinal, so reordering the enum cannot
 * silently remap existing rows.
 */
@Entity
class StockReservation(
    @Id
    val id: String = UUID.randomUUID().toString(),
    val productId: String,
    val orderId: String,
    val quantity: Int,
    @Enumerated(EnumType.STRING)
    var status: ReservationStatus = ReservationStatus.RESERVED,
    // 15 minutes after creation; presumably the scheduled expiry job releases it after this
    val expiresAt: Instant = Instant.now().plus(Duration.ofMinutes(15))
)
/** Lifecycle status of a [StockReservation]. */
enum class ReservationStatus {
    /** Default status of a newly created reservation. */
    RESERVED,

    CONFIRMED,

    RELEASED,

    /** Presumably set once `expiresAt` has passed — confirm against the expiry job. */
    EXPIRED
}
/**
 * Releases stock reservations whose expiry time has passed. Runs with a 60 s fixed delay
 * between executions.
 *
 * Every expired reservation is attempted even if some fail. Otherwise one reservation that
 * keeps failing would abort the loop, and block the ones after it, on every run. If any
 * release failed, an [IllegalStateException] is thrown afterwards. It carries the first
 * failure as its cause and the rest as suppressed exceptions, so the failure reaches the
 * scheduler's error handling.
 *
 * NOTE(review): `reservationRepository` and `releaseReservation` are not shown here. This
 * assumes `findExpiredReservations` returns only reservations still in RESERVED status, and
 * that `releaseReservation` moves them out of it. Otherwise the same rows are re-released on
 * every run. Confirm.
 */
@Scheduled(fixedDelay = 60000)
fun releaseExpiredReservations() {
    val expired = reservationRepository.findExpiredReservations(Instant.now())
    val failures = mutableListOf<Exception>()
    expired.forEach { reservation ->
        try {
            releaseReservation(reservation)
        } catch (e: Exception) {
            failures += e
        }
    }
    if (failures.isNotEmpty()) {
        throw IllegalStateException(
            "Failed to release ${failures.size} expired reservation(s)",
            failures.first()
        ).apply { failures.drop(1).forEach { addSuppressed(it) } }
    }
}
Compensating Transaction Log
/**
 * Durable record of one compensation to execute for a saga step.
 *
 * Mapped consistently with `OrderSaga`:
 * - [status] is stored by enum name rather than JPA's default ordinal, so reordering
 *   `CompensationStatus` cannot remap existing rows.
 * - [compensationData] uses an unbounded TEXT column so a serialized payload is not limited
 *   by the default column length.
 *
 * [executedAt] stays null until the compensation has been executed successfully.
 */
@Entity
@Table(name = "compensation_log")
class CompensationLog(
    @Id
    val id: String = UUID.randomUUID().toString(),
    val sagaId: String,
    val stepName: String,
    @Column(columnDefinition = "TEXT")
    val compensationData: String,
    @Enumerated(EnumType.STRING)
    var status: CompensationStatus = CompensationStatus.PENDING,
    val createdAt: Instant = Instant.now(),
    var executedAt: Instant? = null
)
/**
 * Background job that executes pending compensations recorded in [CompensationLog].
 *
 * NOTE(review) — caveats of this design:
 * - All entries of one run share a single transaction. If the final commit fails, their
 *   statuses stay PENDING and the compensations run again next time, so
 *   `executeCompensation` (not shown here) must be idempotent.
 * - With several instances, two schedulers can pick up the same PENDING row; a claim step
 *   or row locking is needed.
 * - FAILED entries are not retried by this job.
 */
@Service
class CompensationExecutor(
    private val compensationLogRepository: CompensationLogRepository
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    /**
     * Executes every PENDING compensation. Entries that succeed become COMPLETED with
     * `executedAt` set; entries that fail become FAILED and the failure is logged, instead of
     * being silently discarded.
     */
    @Scheduled(fixedDelay = 10000)
    @Transactional
    fun executePendingCompensations() {
        val pending = compensationLogRepository.findByStatus(CompensationStatus.PENDING)
        pending.forEach { entry ->
            try {
                executeCompensation(entry)
                entry.status = CompensationStatus.COMPLETED
                entry.executedAt = Instant.now()
            } catch (e: Exception) {
                logger.error(
                    "Compensation failed: id=${entry.id}, sagaId=${entry.sagaId}, step=${entry.stepName}",
                    e
                )
                entry.status = CompensationStatus.FAILED
                // Retry logic or notification
            }
            compensationLogRepository.save(entry)
        }
    }
}
Monitoring and Observability
Saga Status Query API
/**
 * Read-only REST API for inspecting order sagas.
 */
@RestController
@RequestMapping("/api/sagas")
class SagaController(
    private val sagaRepository: OrderSagaRepository
) {
    /**
     * Returns the current status of one saga.
     *
     * @throws SagaNotFoundException if no saga exists for [sagaId].
     */
    @GetMapping("/{sagaId}")
    fun getSaga(@PathVariable sagaId: String): ResponseEntity<SagaStatusResponse> {
        // NOTE(review): with a Spring Data CrudRepository, findById returns Optional and this
        // elvis never fires; findByIdOrNull would be needed. Confirm the repository type.
        val saga = sagaRepository.findById(sagaId)
            ?: throw SagaNotFoundException(sagaId)
        return ResponseEntity.ok(toResponse(saga))
    }

    /**
     * Returns one page of sagas, optionally filtered by [state].
     */
    @GetMapping
    fun listSagas(
        @RequestParam(required = false) state: OrderSagaState?,
        pageable: Pageable
    ): ResponseEntity<Page<SagaStatusResponse>> {
        val sagas = if (state != null) {
            sagaRepository.findByState(state, pageable)
        } else {
            sagaRepository.findAll(pageable)
        }
        return ResponseEntity.ok(sagas.map { toResponse(it) })
    }

    /**
     * Maps a saga entity to its API representation. Both endpoints use this, so they always
     * return the same fields.
     *
     * NOTE(review): `context` carries raw step payloads (e.g. payment results). Confirm it is
     * safe to expose to API callers.
     */
    private fun toResponse(saga: OrderSaga): SagaStatusResponse =
        SagaStatusResponse(
            sagaId = saga.sagaId,
            orderId = saga.orderId,
            state = saga.state.name,
            context = saga.context,
            createdAt = saga.createdAt,
            updatedAt = saga.updatedAt
        )
}
Summary
Saga Pattern selection guide:
| Situation | Recommended Approach |
|---|---|
| Simple flow (2-3 services) | Choreography |
| Complex flow | Orchestration |
| Service autonomy important | Choreography |
| Visibility/debugging important | Orchestration |
| Frequent flow changes | Orchestration |
In the next post, we will cover Event Schema evolution and versioning.
Leave a comment