7 분 소요


Series Introduction

This series covers how to build production-ready event-driven architecture.

  1. Part 1: Event Sourcing Fundamentals
  2. Part 2: Implementing the Outbox Pattern
  3. Part 3: CQRS with Separate Read/Write Models (Current)
  4. Part 4: Saga Pattern for Distributed Transactions
  5. Part 5: Event Schema Evolution and Versioning

What is CQRS?

CQRS (Command Query Responsibility Segregation) is a pattern that separates reads (Query) and writes (Command).

Why CQRS?

Limitations of traditional CRUD model:

  • Read and write requirements are different
  • Multiple table joins needed for complex queries
  • Read optimization and write optimization conflict
  • Scaling is difficult

Benefits of CQRS:

  • Can use models optimized for each read/write
  • Independent scaling
  • Separation of complex domain logic and query logic
  • Natural combination with Event Sourcing

Architecture Overview

                    ┌─────────────────┐
                    │     Client      │
                    └────────┬────────┘
                             │
           ┌─────────────────┴─────────────────┐
           │                                   │
           ▼                                   ▼
    ┌─────────────┐                    ┌─────────────┐
    │  Command    │                    │   Query     │
    │  Service    │                    │   Service   │
    └──────┬──────┘                    └──────┬──────┘
           │                                   │
           ▼                                   ▼
    ┌─────────────┐                    ┌─────────────┐
    │   Write     │                    │    Read     │
    │   Model     │─────Events────────▶│    Model    │
    │  (PostgreSQL)│                   │(Elasticsearch)│
    └─────────────┘                    └─────────────┘

Implementation

Command Side

Command Definition

// Commands
sealed interface OrderCommand {
    val orderId: String
}

data class CreateOrderCommand(
    override val orderId: String = UUID.randomUUID().toString(),
    val customerId: String,
    val items: List<OrderItemDto>
) : OrderCommand

data class ShipOrderCommand(
    override val orderId: String,
    val shippingAddress: String,
    val trackingNumber: String
) : OrderCommand

data class CancelOrderCommand(
    override val orderId: String,
    val reason: String
) : OrderCommand

Command Handler

@Service
class OrderCommandHandler(
    private val orderRepository: OrderRepository,
    private val eventPublisher: ApplicationEventPublisher
) {
    @Transactional
    fun handle(command: CreateOrderCommand): String {
        val order = Order.create(
            orderId = command.orderId,
            customerId = command.customerId,
            items = command.items.map { it.toOrderItem() }
        )

        orderRepository.save(order)

        // Publish domain events
        order.getPendingEvents().forEach { event ->
            eventPublisher.publishEvent(event)
        }
        order.clearPendingEvents()

        return order.id
    }

    @Transactional
    fun handle(command: ShipOrderCommand) {
        val order = orderRepository.findById(command.orderId)
            ?: throw OrderNotFoundException(command.orderId)

        order.ship(command.shippingAddress, command.trackingNumber)
        orderRepository.save(order)

        order.getPendingEvents().forEach { event ->
            eventPublisher.publishEvent(event)
        }
        order.clearPendingEvents()
    }

    @Transactional
    fun handle(command: CancelOrderCommand) {
        val order = orderRepository.findById(command.orderId)
            ?: throw OrderNotFoundException(command.orderId)

        order.cancel(command.reason)
        orderRepository.save(order)

        order.getPendingEvents().forEach { event ->
            eventPublisher.publishEvent(event)
        }
        order.clearPendingEvents()
    }
}

Command Controller

@RestController
@RequestMapping("/api/orders")
class OrderCommandController(
    private val commandHandler: OrderCommandHandler
) {
    @PostMapping
    fun createOrder(@RequestBody request: CreateOrderRequest): ResponseEntity<OrderIdResponse> {
        val orderId = commandHandler.handle(
            CreateOrderCommand(
                customerId = request.customerId,
                items = request.items
            )
        )
        return ResponseEntity
            .created(URI.create("/api/orders/$orderId"))
            .body(OrderIdResponse(orderId))
    }

    @PostMapping("/{orderId}/ship")
    fun shipOrder(
        @PathVariable orderId: String,
        @RequestBody request: ShipOrderRequest
    ): ResponseEntity<Unit> {
        commandHandler.handle(
            ShipOrderCommand(
                orderId = orderId,
                shippingAddress = request.shippingAddress,
                trackingNumber = request.trackingNumber
            )
        )
        return ResponseEntity.ok().build()
    }

    @PostMapping("/{orderId}/cancel")
    fun cancelOrder(
        @PathVariable orderId: String,
        @RequestBody request: CancelOrderRequest
    ): ResponseEntity<Unit> {
        commandHandler.handle(
            CancelOrderCommand(
                orderId = orderId,
                reason = request.reason
            )
        )
        return ResponseEntity.ok().build()
    }
}

Query Side

Read Model

@Document(indexName = "orders")
data class OrderReadModel(
    @Id
    val orderId: String,
    val customerId: String,
    val customerName: String,
    val status: String,
    val items: List<OrderItemReadModel>,
    val totalAmount: BigDecimal,
    val itemCount: Int,
    val shippingAddress: String?,
    val trackingNumber: String?,
    val createdAt: Instant,
    val updatedAt: Instant
)

data class OrderItemReadModel(
    val productId: String,
    val productName: String,
    val quantity: Int,
    val price: BigDecimal
)

Event Handler (Projector)

@Component
class OrderProjector(
    private val orderReadModelRepository: OrderReadModelRepository,
    private val customerService: CustomerService,
    private val productService: ProductService
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    @EventListener
    @Async
    fun on(event: OrderCreated) {
        logger.info("Projecting OrderCreated: ${event.aggregateId}")

        val customer = customerService.getCustomer(event.customerId)
        val products = productService.getProducts(event.items.map { it.productId })

        val readModel = OrderReadModel(
            orderId = event.aggregateId,
            customerId = event.customerId,
            customerName = customer.name,
            status = "CREATED",
            items = event.items.map { item ->
                val product = products.find { it.id == item.productId }!!
                OrderItemReadModel(
                    productId = item.productId,
                    productName = product.name,
                    quantity = item.quantity,
                    price = item.price
                )
            },
            totalAmount = event.totalAmount,
            itemCount = event.items.size,
            shippingAddress = null,
            trackingNumber = null,
            createdAt = event.occurredAt,
            updatedAt = event.occurredAt
        )

        orderReadModelRepository.save(readModel)
    }

    @EventListener
    @Async
    fun on(event: OrderShipped) {
        logger.info("Projecting OrderShipped: ${event.aggregateId}")

        orderReadModelRepository.findById(event.aggregateId).ifPresent { order ->
            val updated = order.copy(
                status = "SHIPPED",
                shippingAddress = event.shippingAddress,
                trackingNumber = event.trackingNumber,
                updatedAt = event.occurredAt
            )
            orderReadModelRepository.save(updated)
        }
    }

    @EventListener
    @Async
    fun on(event: OrderCancelled) {
        logger.info("Projecting OrderCancelled: ${event.aggregateId}")

        orderReadModelRepository.findById(event.aggregateId).ifPresent { order ->
            val updated = order.copy(
                status = "CANCELLED",
                updatedAt = event.occurredAt
            )
            orderReadModelRepository.save(updated)
        }
    }
}

Query Service

@Service
class OrderQueryService(
    private val orderReadModelRepository: OrderReadModelRepository
) {
    fun findById(orderId: String): OrderReadModel? {
        return orderReadModelRepository.findById(orderId).orElse(null)
    }

    fun findByCustomerId(customerId: String, pageable: Pageable): Page<OrderReadModel> {
        return orderReadModelRepository.findByCustomerId(customerId, pageable)
    }

    fun searchOrders(criteria: OrderSearchCriteria, pageable: Pageable): Page<OrderReadModel> {
        return orderReadModelRepository.search(criteria, pageable)
    }

    fun getOrderStatistics(customerId: String): OrderStatistics {
        val orders = orderReadModelRepository.findByCustomerId(customerId)
        return OrderStatistics(
            totalOrders = orders.size,
            totalAmount = orders.sumOf { it.totalAmount },
            averageOrderAmount = orders.map { it.totalAmount }
                .takeIf { it.isNotEmpty() }
                ?.let { amounts -> amounts.reduce { a, b -> a + b } / amounts.size.toBigDecimal() }
                ?: BigDecimal.ZERO,
            ordersByStatus = orders.groupBy { it.status }.mapValues { it.value.size }
        )
    }
}

Query Controller

@RestController
@RequestMapping("/api/orders")
class OrderQueryController(
    private val queryService: OrderQueryService
) {
    @GetMapping("/{orderId}")
    fun getOrder(@PathVariable orderId: String): ResponseEntity<OrderReadModel> {
        val order = queryService.findById(orderId)
            ?: throw OrderNotFoundException(orderId)
        return ResponseEntity.ok(order)
    }

    @GetMapping
    fun searchOrders(
        @RequestParam(required = false) customerId: String?,
        @RequestParam(required = false) status: String?,
        @RequestParam(required = false) fromDate: Instant?,
        @RequestParam(required = false) toDate: Instant?,
        pageable: Pageable
    ): ResponseEntity<Page<OrderReadModel>> {
        val criteria = OrderSearchCriteria(
            customerId = customerId,
            status = status,
            fromDate = fromDate,
            toDate = toDate
        )
        return ResponseEntity.ok(queryService.searchOrders(criteria, pageable))
    }

    @GetMapping("/customers/{customerId}/statistics")
    fun getCustomerStatistics(
        @PathVariable customerId: String
    ): ResponseEntity<OrderStatistics> {
        return ResponseEntity.ok(queryService.getOrderStatistics(customerId))
    }
}

Elasticsearch Repository

interface OrderReadModelRepository : ElasticsearchRepository<OrderReadModel, String> {

    fun findByCustomerId(customerId: String, pageable: Pageable): Page<OrderReadModel>

    fun findByCustomerId(customerId: String): List<OrderReadModel>

    fun findByStatus(status: String, pageable: Pageable): Page<OrderReadModel>

    @Query("""
        {
            "bool": {
                "must": [
                    {"match": {"customerId": "?0"}}
                ],
                "filter": [
                    {"range": {"createdAt": {"gte": "?1", "lte": "?2"}}}
                ]
            }
        }
    """)
    fun findByCustomerIdAndDateRange(
        customerId: String,
        fromDate: Instant,
        toDate: Instant,
        pageable: Pageable
    ): Page<OrderReadModel>
}

@Repository
class OrderReadModelRepositoryCustomImpl(
    private val elasticsearchOperations: ElasticsearchOperations
) : OrderReadModelRepositoryCustom {

    override fun search(criteria: OrderSearchCriteria, pageable: Pageable): Page<OrderReadModel> {
        val queryBuilder = BoolQuery.Builder()

        criteria.customerId?.let {
            queryBuilder.must(MatchQuery.of { q -> q.field("customerId").query(it) }._toQuery())
        }

        criteria.status?.let {
            queryBuilder.must(MatchQuery.of { q -> q.field("status").query(it) }._toQuery())
        }

        if (criteria.fromDate != null || criteria.toDate != null) {
            val rangeQuery = RangeQuery.of { r ->
                r.field("createdAt")
                criteria.fromDate?.let { r.gte(JsonData.of(it.toString())) }
                criteria.toDate?.let { r.lte(JsonData.of(it.toString())) }
                r
            }
            queryBuilder.filter(rangeQuery._toQuery())
        }

        val searchQuery = NativeQuery.builder()
            .withQuery(queryBuilder.build()._toQuery())
            .withPageable(pageable)
            .build()

        val hits = elasticsearchOperations.search(searchQuery, OrderReadModel::class.java)

        return PageImpl(
            hits.searchHits.map { it.content },
            pageable,
            hits.totalHits
        )
    }
}

Handling Eventual Consistency

In CQRS, the read model has eventual consistency with the write model. Strategies to handle this:

1. Optimistic UI Update

// Frontend: React example
const createOrder = async (orderData: CreateOrderRequest) => {
  // Optimistically add to local state
  const tempId = generateTempId();
  setOrders(prev => [...prev, { ...orderData, id: tempId, status: 'CREATING' }]);

  try {
    const response = await api.post('/orders', orderData);
    // Replace temp order with real one
    setOrders(prev => prev.map(o =>
      o.id === tempId ? { ...o, id: response.data.orderId, status: 'CREATED' } : o
    ));
  } catch (error) {
    // Remove temp order on failure
    setOrders(prev => prev.filter(o => o.id !== tempId));
    throw error;
  }
};

2. Polling for Consistency

@RestController
class OrderQueryController(
    private val queryService: OrderQueryService
) {
    @GetMapping("/{orderId}")
    fun getOrder(
        @PathVariable orderId: String,
        @RequestParam(required = false, defaultValue = "false") waitForConsistency: Boolean,
        @RequestParam(required = false, defaultValue = "5000") timeoutMs: Long
    ): ResponseEntity<OrderReadModel> {
        if (waitForConsistency) {
            return waitForOrder(orderId, timeoutMs)
        }

        val order = queryService.findById(orderId)
            ?: throw OrderNotFoundException(orderId)
        return ResponseEntity.ok(order)
    }

    private fun waitForOrder(orderId: String, timeoutMs: Long): ResponseEntity<OrderReadModel> {
        val startTime = System.currentTimeMillis()

        while (System.currentTimeMillis() - startTime < timeoutMs) {
            queryService.findById(orderId)?.let {
                return ResponseEntity.ok(it)
            }
            Thread.sleep(100)
        }

        throw OrderNotFoundException(orderId)
    }
}

3. Version-based Consistency Check

@Document(indexName = "orders")
data class OrderReadModel(
    @Id
    val orderId: String,
    val version: Long,  // Add version field
    // ... other fields
)

@Component
class OrderProjector {
    @EventListener
    @Async
    fun on(event: OrderCreated) {
        val readModel = OrderReadModel(
            orderId = event.aggregateId,
            version = 1,
            // ...
        )
        orderReadModelRepository.save(readModel)
    }

    @EventListener
    @Async
    fun on(event: OrderShipped) {
        orderReadModelRepository.findById(event.aggregateId).ifPresent { order ->
            val updated = order.copy(
                status = "SHIPPED",
                version = order.version + 1,
                // ...
            )
            orderReadModelRepository.save(updated)
        }
    }
}

Read Model Rebuild

One advantage of CQRS combined with event sourcing is that you can rebuild the Read Model at any time.

@Service
class ReadModelRebuilder(
    private val eventStore: EventStore,
    private val orderReadModelRepository: OrderReadModelRepository,
    private val orderProjector: OrderProjector
) {
    @Async
    fun rebuildOrderReadModels() {
        logger.info("Starting read model rebuild...")

        // Clear existing read models
        orderReadModelRepository.deleteAll()

        // Replay all events
        val aggregateIds = eventStore.getAllAggregateIds("Order")

        aggregateIds.forEach { aggregateId ->
            val events = eventStore.getEvents(aggregateId)
            events.forEach { event ->
                when (event) {
                    is OrderCreated -> orderProjector.on(event)
                    is OrderShipped -> orderProjector.on(event)
                    is OrderCancelled -> orderProjector.on(event)
                }
            }
        }

        logger.info("Read model rebuild completed for ${aggregateIds.size} aggregates")
    }
}

Summary

Considerations when applying CQRS:

Pros Cons
Independent optimization for read/write Increased complexity
Independent scaling Need to handle eventual consistency
Flexible query models Increased infrastructure costs
Natural combination with Event Sourcing Learning curve

In the next post, we’ll cover the Saga Pattern for handling distributed transactions.

댓글남기기