Production-Ready Event-Driven Architecture Part 3 - CQRS with Separate Read/Write Models
Series Introduction
This series covers how to build a production-ready event-driven architecture.
- Part 1: Event Sourcing Fundamentals
- Part 2: Implementing the Outbox Pattern
- Part 3: CQRS with Separate Read/Write Models (Current)
- Part 4: Saga Pattern for Distributed Transactions
- Part 5: Event Schema Evolution and Versioning
What is CQRS?
CQRS (Command Query Responsibility Segregation) is a pattern that separates read operations (queries) from write operations (commands), so that each side can use its own model.
Why CQRS?
Limitations of the traditional CRUD model:
- Read and write requirements are different
- Multiple table joins needed for complex queries
- Read optimization and write optimization conflict
- Scaling is difficult
Benefits of CQRS:
- Reads and writes can each use a model optimized for their own needs
- Independent scaling
- Separation of complex domain logic and query logic
- Natural combination with Event Sourcing
Architecture Overview
                ┌─────────────────┐
                │     Client      │
                └────────┬────────┘
                         │
       ┌─────────────────┴─────────────────┐
       │                                   │
       ▼                                   ▼
┌─────────────┐                     ┌─────────────┐
│   Command   │                     │    Query    │
│   Service   │                     │   Service   │
└──────┬──────┘                     └──────┬──────┘
       │                                   │
       ▼                                   ▼
┌─────────────┐                    ┌───────────────┐
│    Write    │                    │     Read      │
│    Model    │─────Events────────▶│     Model     │
│ (PostgreSQL)│                    │(Elasticsearch)│
└─────────────┘                    └───────────────┘
Implementation
Command Side
Command Definition
// Commands
/**
 * Common contract for every command accepted by the order write side.
 *
 * Each command addresses exactly one order, identified by [orderId].
 */
sealed interface OrderCommand {
    val orderId: String
}

/**
 * Requests the creation of a new order for [customerId] containing [items].
 *
 * When the caller does not supply an [orderId], a random UUID string is generated.
 */
data class CreateOrderCommand(
    override val orderId: String = UUID.randomUUID().toString(),
    val customerId: String,
    val items: List<OrderItemDto>,
) : OrderCommand

/**
 * Requests that the order [orderId] be shipped to [shippingAddress] under [trackingNumber].
 */
data class ShipOrderCommand(
    override val orderId: String,
    val shippingAddress: String,
    val trackingNumber: String,
) : OrderCommand

/**
 * Requests cancellation of the order [orderId], recording the given [reason].
 */
data class CancelOrderCommand(
    override val orderId: String,
    val reason: String,
) : OrderCommand
Command Handler
/**
 * Application service for the write side: turns order commands into aggregate
 * operations, persists the aggregate, and publishes the domain events it produced.
 *
 * Every `handle` overload runs inside a transaction and follows the same steps:
 * obtain the aggregate, apply the operation, save, publish pending events, clear them.
 *
 * NOTE(review): events are published while the transaction is still open. Listeners
 * that run asynchronously may therefore observe an event before (or even without)
 * the commit — confirm whether an after-commit listener or the outbox is intended.
 */
@Service
class OrderCommandHandler(
    private val orderRepository: OrderRepository,
    private val eventPublisher: ApplicationEventPublisher,
) {
    /**
     * Creates a new order from [command] and returns the id of the created order.
     */
    @Transactional
    fun handle(command: CreateOrderCommand): String {
        val order = Order.create(
            orderId = command.orderId,
            customerId = command.customerId,
            items = command.items.map { it.toOrderItem() },
        )
        saveAndPublish(order)
        return order.id
    }

    /**
     * Marks the order addressed by [command] as shipped.
     *
     * @throws OrderNotFoundException when no order with the given id exists
     */
    @Transactional
    fun handle(command: ShipOrderCommand) {
        val order = loadOrder(command.orderId)
        order.ship(command.shippingAddress, command.trackingNumber)
        saveAndPublish(order)
    }

    /**
     * Cancels the order addressed by [command].
     *
     * @throws OrderNotFoundException when no order with the given id exists
     */
    @Transactional
    fun handle(command: CancelOrderCommand) {
        val order = loadOrder(command.orderId)
        order.cancel(command.reason)
        saveAndPublish(order)
    }

    /** Loads the order with [orderId] or fails with [OrderNotFoundException]. */
    private fun loadOrder(orderId: String): Order =
        orderRepository.findById(orderId) ?: throw OrderNotFoundException(orderId)

    /** Saves [order], publishes each of its pending domain events in order, then clears them. */
    private fun saveAndPublish(order: Order) {
        orderRepository.save(order)
        order.getPendingEvents().forEach { event ->
            eventPublisher.publishEvent(event)
        }
        order.clearPendingEvents()
    }
}
Command Controller
/**
 * HTTP entry point for the write side. Each endpoint translates the request into
 * a command and delegates to [OrderCommandHandler]; no query logic lives here.
 */
@RestController
@RequestMapping("/api/orders")
class OrderCommandController(
    private val commandHandler: OrderCommandHandler,
) {
    /**
     * Creates an order and answers `201 Created` with a `Location` header
     * pointing at the new order and the order id in the body.
     */
    @PostMapping
    fun createOrder(@RequestBody request: CreateOrderRequest): ResponseEntity<OrderIdResponse> {
        val command = CreateOrderCommand(
            customerId = request.customerId,
            items = request.items,
        )
        val orderId = commandHandler.handle(command)
        val location = URI.create("/api/orders/$orderId")
        return ResponseEntity.created(location).body(OrderIdResponse(orderId))
    }

    /** Ships the order identified by the path variable; answers `200 OK` with no body. */
    @PostMapping("/{orderId}/ship")
    fun shipOrder(
        @PathVariable orderId: String,
        @RequestBody request: ShipOrderRequest,
    ): ResponseEntity<Unit> {
        val command = ShipOrderCommand(
            orderId = orderId,
            shippingAddress = request.shippingAddress,
            trackingNumber = request.trackingNumber,
        )
        commandHandler.handle(command)
        return ResponseEntity.ok().build()
    }

    /** Cancels the order identified by the path variable; answers `200 OK` with no body. */
    @PostMapping("/{orderId}/cancel")
    fun cancelOrder(
        @PathVariable orderId: String,
        @RequestBody request: CancelOrderRequest,
    ): ResponseEntity<Unit> {
        val command = CancelOrderCommand(
            orderId = orderId,
            reason = request.reason,
        )
        commandHandler.handle(command)
        return ResponseEntity.ok().build()
    }
}
Query Side
Read Model
/**
 * Denormalized view of an order, stored as a document in the `orders` index.
 *
 * Unlike the write model it carries display data (customer name, product names)
 * and precomputed values ([totalAmount], [itemCount]) so queries need no joins.
 * [shippingAddress] and [trackingNumber] stay `null` until they are filled in.
 */
@Document(indexName = "orders")
data class OrderReadModel(
    @Id
    val orderId: String,
    val customerId: String,
    val customerName: String,
    val status: String,
    val items: List<OrderItemReadModel>,
    val totalAmount: BigDecimal,
    val itemCount: Int,
    val shippingAddress: String?,
    val trackingNumber: String?,
    val createdAt: Instant,
    val updatedAt: Instant,
)
/**
 * One line item of an [OrderReadModel], including the product name so the
 * read side can render it without a product lookup.
 */
data class OrderItemReadModel(
    val productId: String,
    val productName: String,
    val quantity: Int,
    val price: BigDecimal,
)
Event Handler (Projector)
/**
 * Projects order domain events onto the read model ([OrderReadModel]).
 *
 * `OrderCreated` builds a new document, enriched with the customer name and
 * product names fetched from the respective services. `OrderShipped` and
 * `OrderCancelled` update the existing document.
 *
 * NOTE(review): the handlers are `@Async` plain `@EventListener`s, so ordering
 * between events of the same order is not guaranteed by this class, and an
 * update may arrive before the document exists. Such updates are logged and
 * skipped here — confirm whether a retry or ordered delivery is required.
 */
@Component
class OrderProjector(
    private val orderReadModelRepository: OrderReadModelRepository,
    private val customerService: CustomerService,
    private val productService: ProductService,
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    /**
     * Creates the read-model document for a newly created order.
     *
     * @throws IllegalStateException when the product service does not return
     *   one of the products referenced by the event
     */
    @EventListener
    @Async
    fun on(event: OrderCreated) {
        logger.info("Projecting OrderCreated: ${event.aggregateId}")

        val customer = customerService.getCustomer(event.customerId)
        // Index once by id instead of scanning the product list for every item.
        val productsById = productService
            .getProducts(event.items.map { it.productId })
            .associateBy { it.id }

        val itemModels = event.items.map { item ->
            val product = checkNotNull(productsById[item.productId]) {
                "Product ${item.productId} not found while projecting order ${event.aggregateId}"
            }
            OrderItemReadModel(
                productId = item.productId,
                productName = product.name,
                quantity = item.quantity,
                price = item.price,
            )
        }

        val readModel = OrderReadModel(
            orderId = event.aggregateId,
            customerId = event.customerId,
            customerName = customer.name,
            status = "CREATED",
            items = itemModels,
            totalAmount = event.totalAmount,
            itemCount = event.items.size,
            shippingAddress = null,
            trackingNumber = null,
            createdAt = event.occurredAt,
            updatedAt = event.occurredAt,
        )
        orderReadModelRepository.save(readModel)
    }

    /** Marks the order as shipped and records address and tracking number. */
    @EventListener
    @Async
    fun on(event: OrderShipped) {
        logger.info("Projecting OrderShipped: ${event.aggregateId}")
        updateExisting(event.aggregateId, "OrderShipped") { order ->
            order.copy(
                status = "SHIPPED",
                shippingAddress = event.shippingAddress,
                trackingNumber = event.trackingNumber,
                updatedAt = event.occurredAt,
            )
        }
    }

    /** Marks the order as cancelled. */
    @EventListener
    @Async
    fun on(event: OrderCancelled) {
        logger.info("Projecting OrderCancelled: ${event.aggregateId}")
        updateExisting(event.aggregateId, "OrderCancelled") { order ->
            order.copy(
                status = "CANCELLED",
                updatedAt = event.occurredAt,
            )
        }
    }

    /**
     * Applies [change] to the stored document of [orderId] and saves the result.
     * When no document exists the event is skipped, but a warning is logged so
     * the lost update is visible instead of silent.
     */
    private fun updateExisting(
        orderId: String,
        eventName: String,
        change: (OrderReadModel) -> OrderReadModel,
    ) {
        val current = orderReadModelRepository.findById(orderId).orElse(null)
        if (current == null) {
            logger.warn("Skipping $eventName for order $orderId: read model not found")
            return
        }
        orderReadModelRepository.save(change(current))
    }
}
Query Service
/**
 * Read-side service: every method answers from the read model repository only.
 */
@Service
class OrderQueryService(
    private val orderReadModelRepository: OrderReadModelRepository,
) {
    /** Returns the read model of [orderId], or `null` when it is not (yet) projected. */
    fun findById(orderId: String): OrderReadModel? =
        orderReadModelRepository.findById(orderId).orElse(null)

    /** Returns one page of the orders belonging to [customerId]. */
    fun findByCustomerId(customerId: String, pageable: Pageable): Page<OrderReadModel> =
        orderReadModelRepository.findByCustomerId(customerId, pageable)

    /** Returns one page of the orders matching [criteria]. */
    fun searchOrders(criteria: OrderSearchCriteria, pageable: Pageable): Page<OrderReadModel> =
        orderReadModelRepository.search(criteria, pageable)

    /**
     * Aggregates the orders of [customerId] into totals, an average and a
     * per-status count. With no orders, total and average are zero.
     *
     * The average is computed with an explicit scale of at least
     * [AVERAGE_SCALE] decimals (HALF_EVEN). The `/` operator on `BigDecimal`
     * keeps the dividend's scale, which would round the average of whole-number
     * amounts to a whole number.
     */
    fun getOrderStatistics(customerId: String): OrderStatistics {
        val orders = orderReadModelRepository.findByCustomerId(customerId)
        val totalAmount = orders.sumOf { it.totalAmount }

        val averageOrderAmount =
            if (orders.isEmpty()) {
                BigDecimal.ZERO
            } else {
                totalAmount.divide(
                    orders.size.toBigDecimal(),
                    maxOf(totalAmount.scale(), AVERAGE_SCALE),
                    java.math.RoundingMode.HALF_EVEN,
                )
            }

        return OrderStatistics(
            totalOrders = orders.size,
            totalAmount = totalAmount,
            averageOrderAmount = averageOrderAmount,
            ordersByStatus = orders.groupingBy { it.status }.eachCount(),
        )
    }

    private companion object {
        /** Minimum number of decimals kept in the average order amount. */
        const val AVERAGE_SCALE = 2
    }
}
Query Controller
/**
 * HTTP entry point for the read side; every endpoint delegates to [OrderQueryService].
 */
@RestController
@RequestMapping("/api/orders")
class OrderQueryController(
    private val queryService: OrderQueryService,
) {
    /**
     * Returns the read model of a single order.
     *
     * @throws OrderNotFoundException when the query service has no such order
     */
    @GetMapping("/{orderId}")
    fun getOrder(@PathVariable orderId: String): ResponseEntity<OrderReadModel> =
        queryService.findById(orderId)
            ?.let { found -> ResponseEntity.ok(found) }
            ?: throw OrderNotFoundException(orderId)

    /**
     * Searches orders. All filters are optional; the ones supplied are passed
     * to the query service as an [OrderSearchCriteria].
     */
    @GetMapping
    fun searchOrders(
        @RequestParam(required = false) customerId: String?,
        @RequestParam(required = false) status: String?,
        @RequestParam(required = false) fromDate: Instant?,
        @RequestParam(required = false) toDate: Instant?,
        pageable: Pageable,
    ): ResponseEntity<Page<OrderReadModel>> {
        val page = queryService.searchOrders(
            OrderSearchCriteria(
                customerId = customerId,
                status = status,
                fromDate = fromDate,
                toDate = toDate,
            ),
            pageable,
        )
        return ResponseEntity.ok(page)
    }

    /** Returns the aggregated order statistics of one customer. */
    @GetMapping("/customers/{customerId}/statistics")
    fun getCustomerStatistics(
        @PathVariable customerId: String,
    ): ResponseEntity<OrderStatistics> =
        ResponseEntity.ok(queryService.getOrderStatistics(customerId))
}
Elasticsearch Repository
/**
 * Custom repository fragment for queries that cannot be expressed as derived
 * or `@Query` methods. Implemented by [OrderReadModelRepositoryCustomImpl].
 */
interface OrderReadModelRepositoryCustom {
    /** Returns one page of the orders matching every criterion that is set in [criteria]. */
    fun search(criteria: OrderSearchCriteria, pageable: Pageable): Page<OrderReadModel>
}

/**
 * Elasticsearch repository of [OrderReadModel] documents.
 *
 * It extends [OrderReadModelRepositoryCustom] so that `search` is available on
 * the repository itself, which is how the query service calls it.
 */
interface OrderReadModelRepository :
    ElasticsearchRepository<OrderReadModel, String>,
    OrderReadModelRepositoryCustom {

    fun findByCustomerId(customerId: String, pageable: Pageable): Page<OrderReadModel>

    fun findByCustomerId(customerId: String): List<OrderReadModel>

    fun findByStatus(status: String, pageable: Pageable): Page<OrderReadModel>

    /**
     * Orders of one customer created within `[fromDate, toDate]` (both inclusive,
     * per the `gte`/`lte` bounds of the range filter).
     */
    @Query("""
{
"bool": {
"must": [
{"match": {"customerId": "?0"}}
],
"filter": [
{"range": {"createdAt": {"gte": "?1", "lte": "?2"}}}
]
}
}
""")
    fun findByCustomerIdAndDateRange(
        customerId: String,
        fromDate: Instant,
        toDate: Instant,
        pageable: Pageable,
    ): Page<OrderReadModel>
}

/**
 * Implementation of the custom fragment, built on [ElasticsearchOperations].
 */
@Repository
class OrderReadModelRepositoryCustomImpl(
    private val elasticsearchOperations: ElasticsearchOperations,
) : OrderReadModelRepositoryCustom {

    /**
     * Builds a bool query from the criteria that are set: `customerId` and
     * `status` become `must` match clauses, and the date bounds become one
     * `createdAt` range filter. With no criteria the bool query has no clauses.
     *
     * NOTE(review): `match` analyzes its input; if `customerId`/`status` are
     * mapped as analyzed text, ids containing separators may match more
     * documents than intended — confirm the mapping or switch to `term`.
     */
    override fun search(criteria: OrderSearchCriteria, pageable: Pageable): Page<OrderReadModel> {
        val boolQuery = BoolQuery.Builder()

        criteria.customerId?.let { customerId ->
            boolQuery.must(MatchQuery.of { m -> m.field("customerId").query(customerId) }._toQuery())
        }
        criteria.status?.let { status ->
            boolQuery.must(MatchQuery.of { m -> m.field("status").query(status) }._toQuery())
        }

        if (criteria.fromDate != null || criteria.toDate != null) {
            val createdAtRange = RangeQuery.of { r ->
                r.field("createdAt")
                criteria.fromDate?.let { r.gte(JsonData.of(it.toString())) }
                criteria.toDate?.let { r.lte(JsonData.of(it.toString())) }
                r
            }
            boolQuery.filter(createdAtRange._toQuery())
        }

        val searchQuery = NativeQuery.builder()
            .withQuery(boolQuery.build()._toQuery())
            .withPageable(pageable)
            .build()

        val hits = elasticsearchOperations.search(searchQuery, OrderReadModel::class.java)
        return PageImpl(
            hits.searchHits.map { it.content },
            pageable,
            hits.totalHits,
        )
    }
}
Handling Eventual Consistency
In CQRS, the read model is only eventually consistent with the write model: a query issued right after a command may not yet see its effect. Strategies to handle this:
1. Optimistic UI Update
// Frontend: React example
/**
 * Creates an order with an optimistic UI update: a placeholder entry with a
 * temporary id and status 'CREATING' is added to local state immediately,
 * then swapped for the real id once the API responds, or removed (and the
 * error rethrown) when the request fails.
 */
const createOrder = async (orderData: CreateOrderRequest) => {
  const placeholderId = generateTempId();

  // Show the order right away, before the server has confirmed it.
  setOrders(current => [...current, { ...orderData, id: placeholderId, status: 'CREATING' }]);

  try {
    const response = await api.post('/orders', orderData);

    // Promote the placeholder to the confirmed order.
    setOrders(current =>
      current.map(order => {
        if (order.id !== placeholderId) {
          return order;
        }
        return { ...order, id: response.data.orderId, status: 'CREATED' };
      })
    );
  } catch (error) {
    // Roll the optimistic entry back and let the caller handle the failure.
    setOrders(current => current.filter(order => order.id !== placeholderId));
    throw error;
  }
};
2. Polling for Consistency
/**
 * Query controller variant that can wait for the read model to catch up.
 *
 * With `waitForConsistency=true`, the request polls the read model until the
 * order appears or the timeout elapses. The class-level mapping matches the
 * other order controllers, so the endpoint is `/api/orders/{orderId}`.
 */
@RestController
@RequestMapping("/api/orders")
class OrderQueryController(
    private val queryService: OrderQueryService,
) {
    /**
     * Returns the read model of [orderId].
     *
     * @param waitForConsistency when `true`, poll for the order instead of failing at once
     * @param timeoutMs maximum time to poll; values outside `0..MAX_WAIT_MS` are clamped
     * @throws OrderNotFoundException when the order is not found (within the timeout)
     */
    @GetMapping("/{orderId}")
    fun getOrder(
        @PathVariable orderId: String,
        @RequestParam(required = false, defaultValue = "false") waitForConsistency: Boolean,
        @RequestParam(required = false, defaultValue = "5000") timeoutMs: Long,
    ): ResponseEntity<OrderReadModel> {
        if (waitForConsistency) {
            return waitForOrder(orderId, timeoutMs)
        }
        val order = queryService.findById(orderId)
            ?: throw OrderNotFoundException(orderId)
        return ResponseEntity.ok(order)
    }

    /**
     * Polls the query service every [POLL_INTERVAL_MS] ms until the order is
     * found or the (clamped) timeout has elapsed. The read model is always
     * queried at least once, even when the timeout is zero.
     */
    private fun waitForOrder(orderId: String, timeoutMs: Long): ResponseEntity<OrderReadModel> {
        // The timeout is client-supplied: clamp it so one request cannot hold a thread indefinitely.
        val budgetMs = timeoutMs.coerceIn(0L, MAX_WAIT_MS)
        // nanoTime is monotonic, so wall-clock adjustments cannot stretch or cut the wait.
        val deadline = System.nanoTime() + budgetMs * NANOS_PER_MILLI

        while (true) {
            queryService.findById(orderId)?.let {
                return ResponseEntity.ok(it)
            }
            val remainingMs = (deadline - System.nanoTime()) / NANOS_PER_MILLI
            if (remainingMs <= 0) {
                break
            }
            // Never sleep past the deadline.
            Thread.sleep(minOf(POLL_INTERVAL_MS, remainingMs))
        }
        throw OrderNotFoundException(orderId)
    }

    private companion object {
        /** Pause between two polls of the read model. */
        const val POLL_INTERVAL_MS = 100L

        /** Upper bound for the client-supplied timeout. */
        const val MAX_WAIT_MS = 30_000L

        const val NANOS_PER_MILLI = 1_000_000L
    }
}
3. Version-based Consistency Check
/**
 * Read-model document of the "orders" index, extended with a [version] field.
 *
 * This is an abbreviated snippet: the remaining fields are elided.
 * NOTE(review): presumably clients compare [version] with the version they
 * expect in order to tell whether the projection has caught up — confirm.
 */
@Document(indexName = "orders")
data class OrderReadModel(
@Id
val orderId: String,
val version: Long, // Add version field
// ... other fields
)
/**
 * Abbreviated projector variant that maintains the read model's version field.
 *
 * NOTE(review): `orderReadModelRepository` is not declared in this snippet;
 * presumably it is constructor-injected as in the full projector — confirm.
 */
@Component
class OrderProjector {
/** Creates the read model of a new order, starting its version at 1. */
@EventListener
@Async
fun on(event: OrderCreated) {
val readModel = OrderReadModel(
orderId = event.aggregateId,
version = 1,
// ...
)
orderReadModelRepository.save(readModel)
}
/**
 * Marks the stored read model as shipped and bumps its version by one.
 * Does nothing when no read model exists for the order.
 */
@EventListener
@Async
fun on(event: OrderShipped) {
orderReadModelRepository.findById(event.aggregateId).ifPresent { order ->
val updated = order.copy(
status = "SHIPPED",
// NOTE(review): the version is derived from the stored value, so a redelivered
// event would bump it again — confirm whether the event carries its own version.
version = order.version + 1,
// ...
)
orderReadModelRepository.save(updated)
}
}
}
Read Model Rebuild
One advantage of CQRS combined with event sourcing is that you can rebuild the Read Model at any time.
/**
 * Rebuilds the order read models from scratch by replaying the stored events
 * of every `Order` aggregate through [OrderProjector].
 */
@Service
class ReadModelRebuilder(
    private val eventStore: EventStore,
    private val orderReadModelRepository: OrderReadModelRepository,
    private val orderProjector: OrderProjector,
) {
    // The original used `logger` without declaring it; declared as in the projector.
    private val logger = LoggerFactory.getLogger(javaClass)

    /**
     * Deletes all existing read models, then replays each aggregate's events
     * in the order returned by the event store.
     *
     * NOTE(review): the read side is empty between the delete and the end of
     * the replay. Also, if the projector's handlers run asynchronously when
     * called through the bean, events may be applied out of order and the
     * completion log may precede the last projection — confirm.
     */
    @Async
    fun rebuildOrderReadModels() {
        logger.info("Starting read model rebuild...")

        // Clear existing read models
        orderReadModelRepository.deleteAll()

        // Replay all events
        val aggregateIds = eventStore.getAllAggregateIds("Order")
        aggregateIds.forEach { aggregateId ->
            val events = eventStore.getEvents(aggregateId)
            events.forEach { event ->
                when (event) {
                    is OrderCreated -> orderProjector.on(event)
                    is OrderShipped -> orderProjector.on(event)
                    is OrderCancelled -> orderProjector.on(event)
                }
            }
        }

        logger.info("Read model rebuild completed for ${aggregateIds.size} aggregates")
    }
}
Summary
Considerations when applying CQRS:
| Pros | Cons |
|---|---|
| Independent optimization for read/write | Increased complexity |
| Independent scaling | Need to handle eventual consistency |
| Flexible query models | Increased infrastructure costs |
| Natural combination with Event Sourcing | Learning curve |
In the next post, we’ll cover the Saga Pattern for handling distributed transactions.
댓글남기기