Production-Ready Event-Driven Architecture Part 5 - Event Schema Evolution and Versioning
Series Introduction
This series covers how to build production-ready event-driven architecture.
- Part 1: Event Sourcing Fundamentals
- Part 2: Implementing the Outbox Pattern
- Part 3: CQRS with Separate Read/Write Models
- Part 4: Saga Pattern for Distributed Transactions
- Part 5: Event Schema Evolution and Versioning (Current)
The Need for Schema Evolution
Event-driven systems change over time:
- Adding new fields
- Removing existing fields
- Changing field types
- Renaming fields
These changes should not break existing Consumers.
Types of Compatibility
1. Backward Compatibility
New schema can read old data
Producer (v1) ──▶ [Event v1] ──▶ Consumer (v2) ✓
Allowed changes:
- Adding new fields with default values
- Removing fields
2. Forward Compatibility
Old schema can read new data
Producer (v2) ──▶ [Event v2] ──▶ Consumer (v1) ✓
Allowed changes:
- Removing fields with default values
- Adding new fields
3. Full Compatibility
Both directions compatible
Producer (v1) ◀──▶ Consumer (v2) ✓
Producer (v2) ◀──▶ Consumer (v1) ✓
Safest but most restrictive.
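These rules can be checked mechanically. Jumping ahead to Avro (introduced in the next section), its SchemaCompatibility helper expresses the backward and forward checks as reader/writer pairs. A minimal sketch with two illustrative inline schemas:
// Compatibility check sketch using Avro's SchemaCompatibility helper
import org.apache.avro.Schema
import org.apache.avro.SchemaCompatibility

fun main() {
    // V1 has only orderId; V2 adds currency with a default value
    val v1 = Schema.Parser().parse(
        """{"type":"record","name":"OrderCreated","fields":[
              {"name":"orderId","type":"string"}]}"""
    )
    val v2 = Schema.Parser().parse(
        """{"type":"record","name":"OrderCreated","fields":[
              {"name":"orderId","type":"string"},
              {"name":"currency","type":"string","default":"KRW"}]}"""
    )

    // Backward: can a V2 reader decode data written with V1?
    val backward = SchemaCompatibility.checkReaderWriterCompatibility(v2, v1)
    // Forward: can a V1 reader decode data written with V2?
    val forward = SchemaCompatibility.checkReaderWriterCompatibility(v1, v2)

    println("backward: ${backward.type}") // COMPATIBLE - the new field has a default
    println("forward:  ${forward.type}")  // COMPATIBLE - the old reader ignores the new field
}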
Schema Management with Avro
Avro Schema Definition
{
"type": "record",
"name": "OrderCreated",
"namespace": "com.example.events.order",
"fields": [
{
"name": "orderId",
"type": "string"
},
{
"name": "customerId",
"type": "string"
},
{
"name": "items",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "OrderItem",
"fields": [
{"name": "productId", "type": "string"},
{"name": "quantity", "type": "int"},
{"name": "price", "type": "double"}
]
}
}
},
{
"name": "totalAmount",
"type": "double"
},
{
"name": "occurredAt",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
}
]
}
Schema Evolution Example
V1 to V2: Adding new fields (backward compatible)
{
"type": "record",
"name": "OrderCreated",
"namespace": "com.example.events.order",
"fields": [
{"name": "orderId", "type": "string"},
{"name": "customerId", "type": "string"},
{"name": "items", "type": {"type": "array", "items": "OrderItem"}},
{"name": "totalAmount", "type": "double"},
{"name": "occurredAt", "type": {"type": "long", "logicalType": "timestamp-millis"}},
{
"name": "currency",
"type": "string",
"default": "KRW"
},
{
"name": "metadata",
"type": ["null", {"type": "map", "values": "string"}],
"default": null
}
]
}
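Why the default matters becomes concrete if you run Avro's schema resolution by hand: write a record with the V1 schema, read it back with the V2 schema, and currency comes back as the declared default. A minimal sketch using the generic API; the .avsc file names are illustrative, and a standalone V2 file must spell out the full OrderItem definition (the bare "OrderItem" reference above is shorthand):
// Schema resolution sketch: V1 writer schema, V2 reader schema
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericDatumWriter
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.DecoderFactory
import org.apache.avro.io.EncoderFactory
import java.io.ByteArrayOutputStream
import java.io.File

fun main() {
    val v1: Schema = Schema.Parser().parse(File("order-created-v1.avsc"))
    val v2: Schema = Schema.Parser().parse(File("order-created-v2.avsc"))

    // Encode a record with the old (V1) schema
    val record = GenericData.Record(v1).apply {
        put("orderId", "order-1")
        put("customerId", "customer-1")
        put("items", emptyList<Any>())
        put("totalAmount", 42000.0)
        put("occurredAt", System.currentTimeMillis())
    }
    val out = ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    GenericDatumWriter<GenericRecord>(v1).write(record, encoder)
    encoder.flush()

    // Decode with writer schema = V1 and reader schema = V2
    val decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null)
    val projected = GenericDatumReader<GenericRecord>(v1, v2).read(null, decoder)

    println(projected.get("currency")) // KRW - filled in from the V2 default
}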
Spring Boot + Avro Setup
// build.gradle.kts
plugins {
id("com.github.davidmc24.gradle.plugin.avro") version "1.9.1"
}
dependencies {
implementation("org.apache.avro:avro:1.11.3")
implementation("io.confluent:kafka-avro-serializer:7.5.0")
}
avro {
setCreateSetters(false)
setFieldVisibility("PRIVATE")
}
# application.yml
spring:
kafka:
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
properties:
schema.registry.url: http://localhost:8081
specific.avro.reader: true
Avro Producer
@Service
class OrderEventProducer(
private val kafkaTemplate: KafkaTemplate<String, OrderCreated>
) {
fun publishOrderCreated(order: Order) {
val event = OrderCreated.newBuilder()
.setOrderId(order.id)
.setCustomerId(order.customerId)
.setItems(order.items.map { item ->
OrderItem.newBuilder()
.setProductId(item.productId)
.setQuantity(item.quantity)
.setPrice(item.price.toDouble())
.build()
})
.setTotalAmount(order.totalAmount.toDouble())
.setOccurredAt(Instant.now().toEpochMilli())
.setCurrency("KRW") // v2 field
.setMetadata(null) // v2 field
.build()
kafkaTemplate.send("order-events", order.id, event)
}
}
Avro Consumer (V1 Consumer handling V2 messages)
@Component
class OrderEventConsumer {
@KafkaListener(topics = ["order-events"])
fun handleOrderCreated(event: OrderCreated) {
// V1 Consumer ignores currency and metadata fields
        // Avro's schema resolution (writer schema from the registry, reader schema = V1) drops them automatically
processOrder(
orderId = event.getOrderId(),
customerId = event.getCustomerId(),
items = event.getItems(),
totalAmount = event.getTotalAmount()
)
}
}
Schema Management with Protocol Buffers
Proto File Definition
syntax = "proto3";
package com.example.events.order;
option java_multiple_files = true;
option java_package = "com.example.events.order";
import "google/protobuf/timestamp.proto";
message OrderCreated {
string order_id = 1;
string customer_id = 2;
repeated OrderItem items = 3;
double total_amount = 4;
google.protobuf.Timestamp occurred_at = 5;
// V2 additions
string currency = 6; // New field (optional)
map<string, string> metadata = 7; // New field (optional)
}
message OrderItem {
string product_id = 1;
int32 quantity = 2;
double price = 3;
}
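Protobuf gives similar tolerance at the wire level: a consumer compiled against the V1 message simply skips field numbers it does not know. A minimal sketch; the v1/v2 packages are hypothetical (in reality the two versions live in different deployments, not on one classpath):
// Hypothetical packages: in practice the V1 and V2 classes belong to different
// deployments of the same service, not to one classpath.
import com.example.events.order.v1.OrderCreated as OrderCreatedV1
import com.example.events.order.v2.OrderCreated as OrderCreatedV2

fun main() {
    // A V2 producer serializes the new fields...
    val v2Bytes = OrderCreatedV2.newBuilder()
        .setOrderId("order-1")
        .setCustomerId("customer-1")
        .setTotalAmount(42000.0)
        .setCurrency("KRW")           // field 6, unknown to V1
        .putMetadata("source", "web") // field 7, unknown to V1
        .build()
        .toByteArray()

    // ...and a consumer compiled against V1 still parses the payload;
    // fields 6 and 7 are kept as unknown fields and simply ignored by V1 code.
    val asV1 = OrderCreatedV1.parseFrom(v2Bytes)
    println(asV1.orderId)     // order-1
    println(asV1.totalAmount) // 42000.0
}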
Protobuf Evolution Rules
// Safe changes:
// 1. Adding new fields (use unique field numbers)
message OrderCreatedV2 {
string order_id = 1;
string customer_id = 2;
repeated OrderItem items = 3;
double total_amount = 4;
google.protobuf.Timestamp occurred_at = 5;
string currency = 6; // New field
string shipping_method = 7; // New field
}
// 2. Changing optional to repeated (scalar types)
// 3. Changing between compatible types (int32 <-> int64)
// Dangerous changes (don't do):
// 1. Changing field numbers
// 2. Changing field type to incompatible type
// 3. Adding required fields (proto2)
Spring Boot + Protobuf
// build.gradle.kts
plugins {
id("com.google.protobuf") version "0.9.4"
}
dependencies {
implementation("com.google.protobuf:protobuf-java:3.25.1")
implementation("io.confluent:kafka-protobuf-serializer:7.5.0")
}
protobuf {
protoc {
artifact = "com.google.protobuf:protoc:3.25.1"
}
}
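With the generated Java classes in place, publishing looks much like the Avro producer. A minimal sketch, assuming KafkaProtobufSerializer is configured as the value serializer (mirroring the Avro application.yml above) and reusing the same Order domain type:
// Protobuf producer sketch (KafkaProtobufSerializer assumed as value serializer)
import com.example.events.order.OrderCreated
import com.example.events.order.OrderItem
import com.google.protobuf.Timestamp
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Service
import java.time.Instant

@Service
class OrderProtoEventProducer(
    private val kafkaTemplate: KafkaTemplate<String, OrderCreated>
) {
    fun publishOrderCreated(order: Order) {
        val now = Instant.now()
        val event = OrderCreated.newBuilder()
            .setOrderId(order.id)
            .setCustomerId(order.customerId)
            .addAllItems(order.items.map { item ->
                OrderItem.newBuilder()
                    .setProductId(item.productId)
                    .setQuantity(item.quantity)
                    .setPrice(item.price.toDouble())
                    .build()
            })
            .setTotalAmount(order.totalAmount.toDouble())
            .setOccurredAt(
                Timestamp.newBuilder()
                    .setSeconds(now.epochSecond)
                    .setNanos(now.nano)
                    .build()
            )
            .setCurrency("KRW")
            .build()

        kafkaTemplate.send("order-events", order.id, event)
    }
}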
Using Schema Registry
Schema Registry Architecture
┌──────────────┐         ┌──────────────┐         ┌──────────────┐
│   Producer   │────────▶│    Kafka     │────────▶│   Consumer   │
└──────┬───────┘         └──────────────┘         └──────┬───────┘
       │                                                 │
       │ Register Schema                      Get Schema │
       │                                                 │
       ▼                                                 ▼
┌────────────────────────────────────────────────────────────────┐
│                        Schema Registry                         │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │  Subject: order-events-value                            │   │
│  │  ├── Version 1: OrderCreated (v1)                       │   │
│  │  ├── Version 2: OrderCreated (v2) + currency            │   │
│  │  └── Version 3: OrderCreated (v3) + metadata            │   │
│  └─────────────────────────────────────────────────────────┘   │
└────────────────────────────────────────────────────────────────┘
Docker Compose Setup
version: '3.8'
services:
schema-registry:
image: confluentinc/cp-schema-registry:7.5.0
hostname: schema-registry
depends_on:
- kafka
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
Using Schema Registry API
@Configuration
class SchemaRegistryConfig {
@Bean
fun schemaRegistryClient(): SchemaRegistryClient {
return CachedSchemaRegistryClient(
"http://localhost:8081",
100 // max schemas to cache
)
}
}
@Service
class SchemaService(
private val schemaRegistryClient: SchemaRegistryClient
) {
fun registerSchema(subject: String, schema: Schema): Int {
return schemaRegistryClient.register(subject, schema)
}
fun getLatestSchema(subject: String): Schema {
val metadata = schemaRegistryClient.getLatestSchemaMetadata(subject)
return Schema.Parser().parse(metadata.schema)
}
fun checkCompatibility(subject: String, schema: Schema): Boolean {
return schemaRegistryClient.testCompatibility(subject, schema)
}
fun getSchemaVersions(subject: String): List<Int> {
return schemaRegistryClient.getAllVersions(subject)
}
}
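A typical use of this service is gating registration on a compatibility check, for example as a step in a deployment pipeline. A minimal sketch; the subject name and schema path are illustrative:
// Pipeline-style usage: only register the new schema if it is compatible
import org.apache.avro.Schema
import java.io.File

fun registerIfCompatible(schemaService: SchemaService) {
    val subject = "order-events-value"
    val candidate: Schema = Schema.Parser().parse(File("src/main/avro/OrderCreated.avsc"))

    check(schemaService.checkCompatibility(subject, candidate)) {
        "Schema change for $subject is incompatible with registered versions"
    }
    val id = schemaService.registerSchema(subject, candidate)
    println("Registered $subject as schema id $id (versions: ${schemaService.getSchemaVersions(subject)})")
}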
Compatibility Configuration
# Subject-level compatibility configuration
curl -X PUT http://localhost:8081/config/order-events-value \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "BACKWARD"}'
# Available compatibility levels:
# - BACKWARD (default)
# - BACKWARD_TRANSITIVE
# - FORWARD
# - FORWARD_TRANSITIVE
# - FULL
# - FULL_TRANSITIVE
# - NONE
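The same setting can also be applied from code, which is handy when compatibility levels are managed alongside application configuration. A minimal sketch using the SchemaRegistryClient bean defined earlier:
// Programmatic equivalent of the curl call above
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient

fun enforceBackwardCompatibility(client: SchemaRegistryClient) {
    // Applies to the order-events value subject only; the global default is
    // configured via the /config endpoint without a subject.
    val applied = client.updateCompatibility("order-events-value", "BACKWARD")
    println("Compatibility for order-events-value is now $applied")
}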
Practical Schema Evolution Strategies
1. Phased Rollout
Day 1: Deploy Consumer V2 (can handle both V1 and V2)
Day 2: Deploy Producer V2 (starts publishing V2 events)
Day 7: After monitoring, remove V1 Consumer
2. Gradual Transition with Feature Flags
@Service
class OrderEventProducer(
private val kafkaTemplate: KafkaTemplate<String, GenericRecord>,
private val featureFlagService: FeatureFlagService,
private val schemaV1: Schema,
private val schemaV2: Schema
) {
fun publishOrderCreated(order: Order) {
val event = if (featureFlagService.isEnabled("use-order-event-v2")) {
createV2Event(order)
} else {
createV1Event(order)
}
kafkaTemplate.send("order-events", order.id, event)
}
private fun createV1Event(order: Order): GenericRecord {
return GenericRecordBuilder(schemaV1)
.set("orderId", order.id)
.set("customerId", order.customerId)
.set("totalAmount", order.totalAmount.toDouble())
.build()
}
private fun createV2Event(order: Order): GenericRecord {
return GenericRecordBuilder(schemaV2)
.set("orderId", order.id)
.set("customerId", order.customerId)
.set("totalAmount", order.totalAmount.toDouble())
.set("currency", order.currency)
.set("metadata", order.metadata)
.build()
}
}
3. Multi-Version Consumer
@Component
class MultiVersionOrderConsumer {
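    // Note: receiving GenericRecord (rather than a generated class) assumes this
    // consumer runs with specific.avro.reader=false, unlike the specific-record
    // consumer shown earlier.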
@KafkaListener(topics = ["order-events"])
fun handleOrderEvent(
@Payload record: GenericRecord,
@Header(KafkaHeaders.RECEIVED_KEY) key: String
) {
val schemaVersion = getSchemaVersion(record)
when {
schemaVersion < 2 -> handleV1(record)
schemaVersion < 3 -> handleV2(record)
else -> handleV3(record)
}
}
    private fun getSchemaVersion(record: GenericRecord): Int {
        // Check the newest marker field first: V3 adds metadata, V2 adds currency.
        // Checking currency first would misclassify V3 records as V2.
        return when {
            record.schema.getField("metadata") != null -> 3
            record.schema.getField("currency") != null -> 2
            else -> 1
        }
    }
private fun handleV1(record: GenericRecord) {
val orderId = record.get("orderId").toString()
val totalAmount = record.get("totalAmount") as Double
// V1 processing logic (use default currency)
processOrder(orderId, totalAmount, "KRW")
}
private fun handleV2(record: GenericRecord) {
val orderId = record.get("orderId").toString()
val totalAmount = record.get("totalAmount") as Double
val currency = record.get("currency")?.toString() ?: "KRW"
processOrder(orderId, totalAmount, currency)
}
}
4. Dead Letter Queue and Schema Mismatch Handling
@Component
class SchemaAwareErrorHandler(
private val deadLetterProducer: KafkaTemplate<String, ByteArray>
) : CommonErrorHandler {
override fun handleRecord(
thrownException: Exception,
record: ConsumerRecord<*, *>,
consumer: Consumer<*, *>,
container: MessageListenerContainer
) {
when (thrownException.cause) {
is SerializationException -> {
// Schema mismatch - send to DLQ
sendToDeadLetter(record, thrownException)
}
else -> {
// Handle other errors
throw thrownException
}
}
}
private fun sendToDeadLetter(
record: ConsumerRecord<*, *>,
exception: Exception
) {
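        // Assumes the raw payload is still available as bytes on the failed record;
        // if the value was already deserialized, the ByteArray cast below yields null.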
val headers = record.headers().toMutableList()
headers.add(RecordHeader("error-reason", exception.message?.toByteArray()))
headers.add(RecordHeader("original-topic", record.topic().toByteArray()))
deadLetterProducer.send(
ProducerRecord(
"${record.topic()}-dlq",
null,
record.key() as? String,
record.value() as? ByteArray,
headers
)
)
}
}
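For the handler to take effect it must be attached to the listener container factory. A minimal sketch of that wiring; the consumer factory bean is assumed to exist elsewhere in the configuration:
// Wiring the error handler into the listener container factory
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory

@Configuration
class KafkaConsumerConfig {

    @Bean
    fun kafkaListenerContainerFactory(
        consumerFactory: ConsumerFactory<String, Any>,
        schemaAwareErrorHandler: SchemaAwareErrorHandler
    ): ConcurrentKafkaListenerContainerFactory<String, Any> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, Any>()
        factory.setConsumerFactory(consumerFactory)
        // Route schema-mismatch failures through the DLQ-aware handler defined above
        factory.setCommonErrorHandler(schemaAwareErrorHandler)
        return factory
    }
}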
Schema Evolution Checklist
Before Change
- Check compatibility with current schema
- Test compatibility in Schema Registry
- Verify all Consumers can handle new schema
- Prepare rollback plan
During Change
- Deploy Consumers first
- Increase monitoring during the rollout
- Shift traffic gradually
After Change
- Monitor DLQ
- Check Consumer lag
- Monitor error rates
Summary
Key principles of schema evolution:
| Principle | Description |
|---|---|
| Always maintain backward compatibility | New Consumer must handle old events |
| Never reuse field numbers/names | Treat deleted field numbers and names as permanently reserved |
| Require default values | New fields must always include defaults |
| Gradual deployment | Consumer first, Producer later |
| Use Schema Registry | Centralized schema management |
This concludes the Event-Driven Architecture series. By combining the patterns covered in this series, you can build scalable and maintainable microservices.