13 분 소요


시리즈 소개

이 시리즈는 프로덕션 환경에서 사용할 수 있는 이벤트 기반 아키텍처를 구축하는 방법을 다룹니다.

  1. Part 1: Event Sourcing 기초
  2. Part 2: Outbox Pattern 구현
  3. Part 3: CQRS와 Read/Write 모델 분리 (현재 글)
  4. Part 4: Saga Pattern으로 분산 트랜잭션 처리
  5. Part 5: Event Schema 진화와 버전 관리

CQRS란?

CQRS(Command Query Responsibility Segregation)는 읽기(Query)와 쓰기(Command)를 분리하는 패턴입니다.

왜 CQRS인가?

전통적인 CRUD 모델의 한계:

  • 읽기와 쓰기의 요구사항이 다름
  • 복잡한 쿼리를 위해 여러 테이블 조인 필요
  • 읽기 최적화와 쓰기 최적화가 충돌
  • 스케일링이 어려움

CQRS의 장점:

  • 읽기/쓰기 각각에 최적화된 모델 사용 가능
  • 독립적인 스케일링
  • 복잡한 도메인 로직과 쿼리 로직 분리
  • Event Sourcing과의 자연스러운 결합

아키텍처 개요

                    ┌─────────────────┐
                    │     Client      │
                    └────────┬────────┘
                             │
           ┌─────────────────┴─────────────────┐
           │                                   │
           ▼                                   ▼
    ┌─────────────┐                    ┌─────────────┐
    │  Command    │                    │   Query     │
    │  Service    │                    │   Service   │
    └──────┬──────┘                    └──────┬──────┘
           │                                   │
           ▼                                   ▼
    ┌─────────────┐                    ┌─────────────┐
    │   Write     │                    │    Read     │
    │   Model     │─────Events────────▶│    Model    │
    │  (PostgreSQL)│                   │(Elasticsearch)│
    └─────────────┘                    └─────────────┘

구현

Command Side

Command 정의

// Commands
sealed interface OrderCommand {
    val orderId: String
}

data class CreateOrderCommand(
    override val orderId: String = UUID.randomUUID().toString(),
    val customerId: String,
    val items: List<OrderItemDto>
) : OrderCommand

data class ShipOrderCommand(
    override val orderId: String,
    val shippingAddress: String,
    val trackingNumber: String
) : OrderCommand

data class CancelOrderCommand(
    override val orderId: String,
    val reason: String
) : OrderCommand

Command Handler

@Service
class OrderCommandHandler(
    private val orderRepository: OrderRepository,
    private val eventPublisher: ApplicationEventPublisher
) {
    @Transactional
    fun handle(command: CreateOrderCommand): String {
        val order = Order.create(
            orderId = command.orderId,
            customerId = command.customerId,
            items = command.items.map { it.toOrderItem() }
        )

        orderRepository.save(order)

        // Publish domain events
        order.getPendingEvents().forEach { event ->
            eventPublisher.publishEvent(event)
        }
        order.clearPendingEvents()

        return order.id
    }

    @Transactional
    fun handle(command: ShipOrderCommand) {
        val order = orderRepository.findById(command.orderId)
            ?: throw OrderNotFoundException(command.orderId)

        order.ship(command.shippingAddress, command.trackingNumber)
        orderRepository.save(order)

        order.getPendingEvents().forEach { event ->
            eventPublisher.publishEvent(event)
        }
        order.clearPendingEvents()
    }

    @Transactional
    fun handle(command: CancelOrderCommand) {
        val order = orderRepository.findById(command.orderId)
            ?: throw OrderNotFoundException(command.orderId)

        order.cancel(command.reason)
        orderRepository.save(order)

        order.getPendingEvents().forEach { event ->
            eventPublisher.publishEvent(event)
        }
        order.clearPendingEvents()
    }
}

Command Controller

@RestController
@RequestMapping("/api/orders")
class OrderCommandController(
    private val commandHandler: OrderCommandHandler
) {
    @PostMapping
    fun createOrder(@RequestBody request: CreateOrderRequest): ResponseEntity<OrderIdResponse> {
        val orderId = commandHandler.handle(
            CreateOrderCommand(
                customerId = request.customerId,
                items = request.items
            )
        )
        return ResponseEntity
            .created(URI.create("/api/orders/$orderId"))
            .body(OrderIdResponse(orderId))
    }

    @PostMapping("/{orderId}/ship")
    fun shipOrder(
        @PathVariable orderId: String,
        @RequestBody request: ShipOrderRequest
    ): ResponseEntity<Unit> {
        commandHandler.handle(
            ShipOrderCommand(
                orderId = orderId,
                shippingAddress = request.shippingAddress,
                trackingNumber = request.trackingNumber
            )
        )
        return ResponseEntity.ok().build()
    }

    @PostMapping("/{orderId}/cancel")
    fun cancelOrder(
        @PathVariable orderId: String,
        @RequestBody request: CancelOrderRequest
    ): ResponseEntity<Unit> {
        commandHandler.handle(
            CancelOrderCommand(
                orderId = orderId,
                reason = request.reason
            )
        )
        return ResponseEntity.ok().build()
    }
}

Query Side

Read Model

@Document(indexName = "orders")
data class OrderReadModel(
    @Id
    val orderId: String,
    val customerId: String,
    val customerName: String,
    val status: String,
    val items: List<OrderItemReadModel>,
    val totalAmount: BigDecimal,
    val itemCount: Int,
    val shippingAddress: String?,
    val trackingNumber: String?,
    val createdAt: Instant,
    val updatedAt: Instant
)

data class OrderItemReadModel(
    val productId: String,
    val productName: String,
    val quantity: Int,
    val price: BigDecimal
)

Event Handler (Projector)

@Component
class OrderProjector(
    private val orderReadModelRepository: OrderReadModelRepository,
    private val customerService: CustomerService,
    private val productService: ProductService
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    @EventListener
    @Async
    fun on(event: OrderCreated) {
        logger.info("Projecting OrderCreated: ${event.aggregateId}")

        val customer = customerService.getCustomer(event.customerId)
        val products = productService.getProducts(event.items.map { it.productId })

        val readModel = OrderReadModel(
            orderId = event.aggregateId,
            customerId = event.customerId,
            customerName = customer.name,
            status = "CREATED",
            items = event.items.map { item ->
                val product = products.find { it.id == item.productId }!!
                OrderItemReadModel(
                    productId = item.productId,
                    productName = product.name,
                    quantity = item.quantity,
                    price = item.price
                )
            },
            totalAmount = event.totalAmount,
            itemCount = event.items.size,
            shippingAddress = null,
            trackingNumber = null,
            createdAt = event.occurredAt,
            updatedAt = event.occurredAt
        )

        orderReadModelRepository.save(readModel)
    }

    @EventListener
    @Async
    fun on(event: OrderShipped) {
        logger.info("Projecting OrderShipped: ${event.aggregateId}")

        orderReadModelRepository.findById(event.aggregateId).ifPresent { order ->
            val updated = order.copy(
                status = "SHIPPED",
                shippingAddress = event.shippingAddress,
                trackingNumber = event.trackingNumber,
                updatedAt = event.occurredAt
            )
            orderReadModelRepository.save(updated)
        }
    }

    @EventListener
    @Async
    fun on(event: OrderCancelled) {
        logger.info("Projecting OrderCancelled: ${event.aggregateId}")

        orderReadModelRepository.findById(event.aggregateId).ifPresent { order ->
            val updated = order.copy(
                status = "CANCELLED",
                updatedAt = event.occurredAt
            )
            orderReadModelRepository.save(updated)
        }
    }
}

Query Service

@Service
class OrderQueryService(
    private val orderReadModelRepository: OrderReadModelRepository
) {
    fun findById(orderId: String): OrderReadModel? {
        return orderReadModelRepository.findById(orderId).orElse(null)
    }

    fun findByCustomerId(customerId: String, pageable: Pageable): Page<OrderReadModel> {
        return orderReadModelRepository.findByCustomerId(customerId, pageable)
    }

    fun searchOrders(criteria: OrderSearchCriteria, pageable: Pageable): Page<OrderReadModel> {
        return orderReadModelRepository.search(criteria, pageable)
    }

    fun getOrderStatistics(customerId: String): OrderStatistics {
        val orders = orderReadModelRepository.findByCustomerId(customerId)
        return OrderStatistics(
            totalOrders = orders.size,
            totalAmount = orders.sumOf { it.totalAmount },
            averageOrderAmount = orders.map { it.totalAmount }
                .takeIf { it.isNotEmpty() }
                ?.let { amounts -> amounts.reduce { a, b -> a + b } / amounts.size.toBigDecimal() }
                ?: BigDecimal.ZERO,
            ordersByStatus = orders.groupBy { it.status }.mapValues { it.value.size }
        )
    }
}

Query Controller

@RestController
@RequestMapping("/api/orders")
class OrderQueryController(
    private val queryService: OrderQueryService
) {
    @GetMapping("/{orderId}")
    fun getOrder(@PathVariable orderId: String): ResponseEntity<OrderReadModel> {
        val order = queryService.findById(orderId)
            ?: throw OrderNotFoundException(orderId)
        return ResponseEntity.ok(order)
    }

    @GetMapping
    fun searchOrders(
        @RequestParam(required = false) customerId: String?,
        @RequestParam(required = false) status: String?,
        @RequestParam(required = false) fromDate: Instant?,
        @RequestParam(required = false) toDate: Instant?,
        pageable: Pageable
    ): ResponseEntity<Page<OrderReadModel>> {
        val criteria = OrderSearchCriteria(
            customerId = customerId,
            status = status,
            fromDate = fromDate,
            toDate = toDate
        )
        return ResponseEntity.ok(queryService.searchOrders(criteria, pageable))
    }

    @GetMapping("/customers/{customerId}/statistics")
    fun getCustomerStatistics(
        @PathVariable customerId: String
    ): ResponseEntity<OrderStatistics> {
        return ResponseEntity.ok(queryService.getOrderStatistics(customerId))
    }
}

Elasticsearch Repository

interface OrderReadModelRepository : ElasticsearchRepository<OrderReadModel, String> {

    fun findByCustomerId(customerId: String, pageable: Pageable): Page<OrderReadModel>

    fun findByCustomerId(customerId: String): List<OrderReadModel>

    fun findByStatus(status: String, pageable: Pageable): Page<OrderReadModel>

    @Query("""
        {
            "bool": {
                "must": [
                    {"match": {"customerId": "?0"}}
                ],
                "filter": [
                    {"range": {"createdAt": {"gte": "?1", "lte": "?2"}}}
                ]
            }
        }
    """)
    fun findByCustomerIdAndDateRange(
        customerId: String,
        fromDate: Instant,
        toDate: Instant,
        pageable: Pageable
    ): Page<OrderReadModel>
}

@Repository
class OrderReadModelRepositoryCustomImpl(
    private val elasticsearchOperations: ElasticsearchOperations
) : OrderReadModelRepositoryCustom {

    override fun search(criteria: OrderSearchCriteria, pageable: Pageable): Page<OrderReadModel> {
        val queryBuilder = BoolQuery.Builder()

        criteria.customerId?.let {
            queryBuilder.must(MatchQuery.of { q -> q.field("customerId").query(it) }._toQuery())
        }

        criteria.status?.let {
            queryBuilder.must(MatchQuery.of { q -> q.field("status").query(it) }._toQuery())
        }

        if (criteria.fromDate != null || criteria.toDate != null) {
            val rangeQuery = RangeQuery.of { r ->
                r.field("createdAt")
                criteria.fromDate?.let { r.gte(JsonData.of(it.toString())) }
                criteria.toDate?.let { r.lte(JsonData.of(it.toString())) }
                r
            }
            queryBuilder.filter(rangeQuery._toQuery())
        }

        val searchQuery = NativeQuery.builder()
            .withQuery(queryBuilder.build()._toQuery())
            .withPageable(pageable)
            .build()

        val hits = elasticsearchOperations.search(searchQuery, OrderReadModel::class.java)

        return PageImpl(
            hits.searchHits.map { it.content },
            pageable,
            hits.totalHits
        )
    }
}

Eventual Consistency 처리

CQRS에서 읽기 모델은 쓰기 모델과 최종적으로 일관성을 가집니다. 이를 처리하는 전략:

1. Optimistic UI Update

// Frontend: React example
const createOrder = async (orderData: CreateOrderRequest) => {
  // Optimistically add to local state
  const tempId = generateTempId();
  setOrders(prev => [...prev, { ...orderData, id: tempId, status: 'CREATING' }]);

  try {
    const response = await api.post('/orders', orderData);
    // Replace temp order with real one
    setOrders(prev => prev.map(o =>
      o.id === tempId ? { ...o, id: response.data.orderId, status: 'CREATED' } : o
    ));
  } catch (error) {
    // Remove temp order on failure
    setOrders(prev => prev.filter(o => o.id !== tempId));
    throw error;
  }
};

2. Polling for Consistency

@RestController
class OrderQueryController(
    private val queryService: OrderQueryService
) {
    @GetMapping("/{orderId}")
    fun getOrder(
        @PathVariable orderId: String,
        @RequestParam(required = false, defaultValue = "false") waitForConsistency: Boolean,
        @RequestParam(required = false, defaultValue = "5000") timeoutMs: Long
    ): ResponseEntity<OrderReadModel> {
        if (waitForConsistency) {
            return waitForOrder(orderId, timeoutMs)
        }

        val order = queryService.findById(orderId)
            ?: throw OrderNotFoundException(orderId)
        return ResponseEntity.ok(order)
    }

    private fun waitForOrder(orderId: String, timeoutMs: Long): ResponseEntity<OrderReadModel> {
        val startTime = System.currentTimeMillis()

        while (System.currentTimeMillis() - startTime < timeoutMs) {
            queryService.findById(orderId)?.let {
                return ResponseEntity.ok(it)
            }
            Thread.sleep(100)
        }

        throw OrderNotFoundException(orderId)
    }
}

3. Version-based Consistency Check

@Document(indexName = "orders")
data class OrderReadModel(
    @Id
    val orderId: String,
    val version: Long,  // Add version field
    // ... other fields
)

@Component
class OrderProjector {
    @EventListener
    @Async
    fun on(event: OrderCreated) {
        val readModel = OrderReadModel(
            orderId = event.aggregateId,
            version = 1,
            // ...
        )
        orderReadModelRepository.save(readModel)
    }

    @EventListener
    @Async
    fun on(event: OrderShipped) {
        orderReadModelRepository.findById(event.aggregateId).ifPresent { order ->
            val updated = order.copy(
                status = "SHIPPED",
                version = order.version + 1,
                // ...
            )
            orderReadModelRepository.save(updated)
        }
    }
}

Read Model Rebuild

이벤트 소싱과 결합된 CQRS의 장점 중 하나는 Read Model을 언제든지 재구축할 수 있다는 것입니다.

@Service
class ReadModelRebuilder(
    private val eventStore: EventStore,
    private val orderReadModelRepository: OrderReadModelRepository,
    private val orderProjector: OrderProjector
) {
    @Async
    fun rebuildOrderReadModels() {
        logger.info("Starting read model rebuild...")

        // Clear existing read models
        orderReadModelRepository.deleteAll()

        // Replay all events
        val aggregateIds = eventStore.getAllAggregateIds("Order")

        aggregateIds.forEach { aggregateId ->
            val events = eventStore.getEvents(aggregateId)
            events.forEach { event ->
                when (event) {
                    is OrderCreated -> orderProjector.on(event)
                    is OrderShipped -> orderProjector.on(event)
                    is OrderCancelled -> orderProjector.on(event)
                }
            }
        }

        logger.info("Read model rebuild completed for ${aggregateIds.size} aggregates")
    }
}

정리

CQRS 적용 시 고려사항:

장점 단점
읽기/쓰기 독립적 최적화 복잡성 증가
독립적 스케일링 최종적 일관성 처리 필요
유연한 쿼리 모델 인프라 비용 증가
Event Sourcing과 자연스러운 결합 러닝 커브

다음 글에서는 분산 트랜잭션을 처리하는 Saga Pattern을 다루겠습니다.

Series Introduction

This series covers how to build production-ready event-driven architecture.

  1. Part 1: Event Sourcing Fundamentals
  2. Part 2: Implementing the Outbox Pattern
  3. Part 3: CQRS with Separate Read/Write Models (Current)
  4. Part 4: Saga Pattern for Distributed Transactions
  5. Part 5: Event Schema Evolution and Versioning

What is CQRS?

CQRS (Command Query Responsibility Segregation) is a pattern that separates reads (Query) and writes (Command).

Why CQRS?

Limitations of traditional CRUD model:

  • Read and write requirements are different
  • Multiple table joins needed for complex queries
  • Read optimization and write optimization conflict
  • Scaling is difficult

Benefits of CQRS:

  • Can use models optimized for each read/write
  • Independent scaling
  • Separation of complex domain logic and query logic
  • Natural combination with Event Sourcing

Architecture Overview

                    ┌─────────────────┐
                    │     Client      │
                    └────────┬────────┘
                             │
           ┌─────────────────┴─────────────────┐
           │                                   │
           ▼                                   ▼
    ┌─────────────┐                    ┌─────────────┐
    │  Command    │                    │   Query     │
    │  Service    │                    │   Service   │
    └──────┬──────┘                    └──────┬──────┘
           │                                   │
           ▼                                   ▼
    ┌─────────────┐                    ┌─────────────┐
    │   Write     │                    │    Read     │
    │   Model     │─────Events────────▶│    Model    │
    │  (PostgreSQL)│                   │(Elasticsearch)│
    └─────────────┘                    └─────────────┘

Implementation

Command Side

Command Definition

// Commands
sealed interface OrderCommand {
    val orderId: String
}

data class CreateOrderCommand(
    override val orderId: String = UUID.randomUUID().toString(),
    val customerId: String,
    val items: List<OrderItemDto>
) : OrderCommand

data class ShipOrderCommand(
    override val orderId: String,
    val shippingAddress: String,
    val trackingNumber: String
) : OrderCommand

data class CancelOrderCommand(
    override val orderId: String,
    val reason: String
) : OrderCommand

Command Handler

@Service
class OrderCommandHandler(
    private val orderRepository: OrderRepository,
    private val eventPublisher: ApplicationEventPublisher
) {
    @Transactional
    fun handle(command: CreateOrderCommand): String {
        val order = Order.create(
            orderId = command.orderId,
            customerId = command.customerId,
            items = command.items.map { it.toOrderItem() }
        )

        orderRepository.save(order)

        // Publish domain events
        order.getPendingEvents().forEach { event ->
            eventPublisher.publishEvent(event)
        }
        order.clearPendingEvents()

        return order.id
    }

    @Transactional
    fun handle(command: ShipOrderCommand) {
        val order = orderRepository.findById(command.orderId)
            ?: throw OrderNotFoundException(command.orderId)

        order.ship(command.shippingAddress, command.trackingNumber)
        orderRepository.save(order)

        order.getPendingEvents().forEach { event ->
            eventPublisher.publishEvent(event)
        }
        order.clearPendingEvents()
    }

    @Transactional
    fun handle(command: CancelOrderCommand) {
        val order = orderRepository.findById(command.orderId)
            ?: throw OrderNotFoundException(command.orderId)

        order.cancel(command.reason)
        orderRepository.save(order)

        order.getPendingEvents().forEach { event ->
            eventPublisher.publishEvent(event)
        }
        order.clearPendingEvents()
    }
}

Command Controller

@RestController
@RequestMapping("/api/orders")
class OrderCommandController(
    private val commandHandler: OrderCommandHandler
) {
    @PostMapping
    fun createOrder(@RequestBody request: CreateOrderRequest): ResponseEntity<OrderIdResponse> {
        val orderId = commandHandler.handle(
            CreateOrderCommand(
                customerId = request.customerId,
                items = request.items
            )
        )
        return ResponseEntity
            .created(URI.create("/api/orders/$orderId"))
            .body(OrderIdResponse(orderId))
    }

    @PostMapping("/{orderId}/ship")
    fun shipOrder(
        @PathVariable orderId: String,
        @RequestBody request: ShipOrderRequest
    ): ResponseEntity<Unit> {
        commandHandler.handle(
            ShipOrderCommand(
                orderId = orderId,
                shippingAddress = request.shippingAddress,
                trackingNumber = request.trackingNumber
            )
        )
        return ResponseEntity.ok().build()
    }

    @PostMapping("/{orderId}/cancel")
    fun cancelOrder(
        @PathVariable orderId: String,
        @RequestBody request: CancelOrderRequest
    ): ResponseEntity<Unit> {
        commandHandler.handle(
            CancelOrderCommand(
                orderId = orderId,
                reason = request.reason
            )
        )
        return ResponseEntity.ok().build()
    }
}

Query Side

Read Model

@Document(indexName = "orders")
data class OrderReadModel(
    @Id
    val orderId: String,
    val customerId: String,
    val customerName: String,
    val status: String,
    val items: List<OrderItemReadModel>,
    val totalAmount: BigDecimal,
    val itemCount: Int,
    val shippingAddress: String?,
    val trackingNumber: String?,
    val createdAt: Instant,
    val updatedAt: Instant
)

data class OrderItemReadModel(
    val productId: String,
    val productName: String,
    val quantity: Int,
    val price: BigDecimal
)

Event Handler (Projector)

@Component
class OrderProjector(
    private val orderReadModelRepository: OrderReadModelRepository,
    private val customerService: CustomerService,
    private val productService: ProductService
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    @EventListener
    @Async
    fun on(event: OrderCreated) {
        logger.info("Projecting OrderCreated: ${event.aggregateId}")

        val customer = customerService.getCustomer(event.customerId)
        val products = productService.getProducts(event.items.map { it.productId })

        val readModel = OrderReadModel(
            orderId = event.aggregateId,
            customerId = event.customerId,
            customerName = customer.name,
            status = "CREATED",
            items = event.items.map { item ->
                val product = products.find { it.id == item.productId }!!
                OrderItemReadModel(
                    productId = item.productId,
                    productName = product.name,
                    quantity = item.quantity,
                    price = item.price
                )
            },
            totalAmount = event.totalAmount,
            itemCount = event.items.size,
            shippingAddress = null,
            trackingNumber = null,
            createdAt = event.occurredAt,
            updatedAt = event.occurredAt
        )

        orderReadModelRepository.save(readModel)
    }

    @EventListener
    @Async
    fun on(event: OrderShipped) {
        logger.info("Projecting OrderShipped: ${event.aggregateId}")

        orderReadModelRepository.findById(event.aggregateId).ifPresent { order ->
            val updated = order.copy(
                status = "SHIPPED",
                shippingAddress = event.shippingAddress,
                trackingNumber = event.trackingNumber,
                updatedAt = event.occurredAt
            )
            orderReadModelRepository.save(updated)
        }
    }

    @EventListener
    @Async
    fun on(event: OrderCancelled) {
        logger.info("Projecting OrderCancelled: ${event.aggregateId}")

        orderReadModelRepository.findById(event.aggregateId).ifPresent { order ->
            val updated = order.copy(
                status = "CANCELLED",
                updatedAt = event.occurredAt
            )
            orderReadModelRepository.save(updated)
        }
    }
}

Query Service

@Service
class OrderQueryService(
    private val orderReadModelRepository: OrderReadModelRepository
) {
    fun findById(orderId: String): OrderReadModel? {
        return orderReadModelRepository.findById(orderId).orElse(null)
    }

    fun findByCustomerId(customerId: String, pageable: Pageable): Page<OrderReadModel> {
        return orderReadModelRepository.findByCustomerId(customerId, pageable)
    }

    fun searchOrders(criteria: OrderSearchCriteria, pageable: Pageable): Page<OrderReadModel> {
        return orderReadModelRepository.search(criteria, pageable)
    }

    fun getOrderStatistics(customerId: String): OrderStatistics {
        val orders = orderReadModelRepository.findByCustomerId(customerId)
        return OrderStatistics(
            totalOrders = orders.size,
            totalAmount = orders.sumOf { it.totalAmount },
            averageOrderAmount = orders.map { it.totalAmount }
                .takeIf { it.isNotEmpty() }
                ?.let { amounts -> amounts.reduce { a, b -> a + b } / amounts.size.toBigDecimal() }
                ?: BigDecimal.ZERO,
            ordersByStatus = orders.groupBy { it.status }.mapValues { it.value.size }
        )
    }
}

Query Controller

@RestController
@RequestMapping("/api/orders")
class OrderQueryController(
    private val queryService: OrderQueryService
) {
    @GetMapping("/{orderId}")
    fun getOrder(@PathVariable orderId: String): ResponseEntity<OrderReadModel> {
        val order = queryService.findById(orderId)
            ?: throw OrderNotFoundException(orderId)
        return ResponseEntity.ok(order)
    }

    @GetMapping
    fun searchOrders(
        @RequestParam(required = false) customerId: String?,
        @RequestParam(required = false) status: String?,
        @RequestParam(required = false) fromDate: Instant?,
        @RequestParam(required = false) toDate: Instant?,
        pageable: Pageable
    ): ResponseEntity<Page<OrderReadModel>> {
        val criteria = OrderSearchCriteria(
            customerId = customerId,
            status = status,
            fromDate = fromDate,
            toDate = toDate
        )
        return ResponseEntity.ok(queryService.searchOrders(criteria, pageable))
    }

    @GetMapping("/customers/{customerId}/statistics")
    fun getCustomerStatistics(
        @PathVariable customerId: String
    ): ResponseEntity<OrderStatistics> {
        return ResponseEntity.ok(queryService.getOrderStatistics(customerId))
    }
}

Handling Eventual Consistency

In CQRS, the read model has eventual consistency with the write model. Strategies to handle this:

1. Optimistic UI Update

// Frontend: React example
const createOrder = async (orderData: CreateOrderRequest) => {
  // Optimistically add to local state
  const tempId = generateTempId();
  setOrders(prev => [...prev, { ...orderData, id: tempId, status: 'CREATING' }]);

  try {
    const response = await api.post('/orders', orderData);
    // Replace temp order with real one
    setOrders(prev => prev.map(o =>
      o.id === tempId ? { ...o, id: response.data.orderId, status: 'CREATED' } : o
    ));
  } catch (error) {
    // Remove temp order on failure
    setOrders(prev => prev.filter(o => o.id !== tempId));
    throw error;
  }
};

2. Polling for Consistency

@RestController
class OrderQueryController(
    private val queryService: OrderQueryService
) {
    @GetMapping("/{orderId}")
    fun getOrder(
        @PathVariable orderId: String,
        @RequestParam(required = false, defaultValue = "false") waitForConsistency: Boolean,
        @RequestParam(required = false, defaultValue = "5000") timeoutMs: Long
    ): ResponseEntity<OrderReadModel> {
        if (waitForConsistency) {
            return waitForOrder(orderId, timeoutMs)
        }

        val order = queryService.findById(orderId)
            ?: throw OrderNotFoundException(orderId)
        return ResponseEntity.ok(order)
    }

    private fun waitForOrder(orderId: String, timeoutMs: Long): ResponseEntity<OrderReadModel> {
        val startTime = System.currentTimeMillis()

        while (System.currentTimeMillis() - startTime < timeoutMs) {
            queryService.findById(orderId)?.let {
                return ResponseEntity.ok(it)
            }
            Thread.sleep(100)
        }

        throw OrderNotFoundException(orderId)
    }
}

3. Version-based Consistency Check

@Document(indexName = "orders")
data class OrderReadModel(
    @Id
    val orderId: String,
    val version: Long,  // Add version field
    // ... other fields
)

@Component
class OrderProjector {
    @EventListener
    @Async
    fun on(event: OrderCreated) {
        val readModel = OrderReadModel(
            orderId = event.aggregateId,
            version = 1,
            // ...
        )
        orderReadModelRepository.save(readModel)
    }

    @EventListener
    @Async
    fun on(event: OrderShipped) {
        orderReadModelRepository.findById(event.aggregateId).ifPresent { order ->
            val updated = order.copy(
                status = "SHIPPED",
                version = order.version + 1,
                // ...
            )
            orderReadModelRepository.save(updated)
        }
    }
}

Read Model Rebuild

One advantage of CQRS combined with event sourcing is that you can rebuild the Read Model at any time.

@Service
class ReadModelRebuilder(
    private val eventStore: EventStore,
    private val orderReadModelRepository: OrderReadModelRepository,
    private val orderProjector: OrderProjector
) {
    @Async
    fun rebuildOrderReadModels() {
        logger.info("Starting read model rebuild...")

        // Clear existing read models
        orderReadModelRepository.deleteAll()

        // Replay all events
        val aggregateIds = eventStore.getAllAggregateIds("Order")

        aggregateIds.forEach { aggregateId ->
            val events = eventStore.getEvents(aggregateId)
            events.forEach { event ->
                when (event) {
                    is OrderCreated -> orderProjector.on(event)
                    is OrderShipped -> orderProjector.on(event)
                    is OrderCancelled -> orderProjector.on(event)
                }
            }
        }

        logger.info("Read model rebuild completed for ${aggregateIds.size} aggregates")
    }
}

Summary

Considerations when applying CQRS:

Pros Cons
Independent optimization for read/write Increased complexity
Independent scaling Need to handle eventual consistency
Flexible query models Increased infrastructure costs
Natural combination with Event Sourcing Learning curve

In the next post, we’ll cover the Saga Pattern for handling distributed transactions.

댓글남기기