프로덕션 레디 이벤트 기반 아키텍처 Part 5 - Event Schema 진화와 버전 관리 Production-Ready Event-Driven Architecture Part 5 - Event Schema Evolution and Versioning
시리즈 소개
이 시리즈는 프로덕션 환경에서 사용할 수 있는 이벤트 기반 아키텍처를 구축하는 방법을 다룹니다.
- Part 1: Event Sourcing 기초
- Part 2: Outbox Pattern 구현
- Part 3: CQRS와 Read/Write 모델 분리
- Part 4: Saga Pattern으로 분산 트랜잭션 처리
- Part 5: Event Schema 진화와 버전 관리 (현재 글)
스키마 진화의 필요성
이벤트 기반 시스템은 시간이 지나면서 변화합니다:
- 새로운 필드 추가
- 기존 필드 제거
- 필드 타입 변경
- 필드 이름 변경
이러한 변경이 기존 Consumer를 깨뜨리지 않도록 해야 합니다.
호환성 유형
1. Backward Compatibility (하위 호환성)
새로운 스키마로 이전 데이터를 읽을 수 있음
Producer (v1) ──▶ [Event v1] ──▶ Consumer (v2) ✓
허용되는 변경:
- 기본값이 있는 새 필드 추가
- 선택적 필드 제거
2. Forward Compatibility (상위 호환성)
이전 스키마로 새로운 데이터를 읽을 수 있음
Producer (v2) ──▶ [Event v2] ──▶ Consumer (v1) ✓
허용되는 변경:
- 기본값이 있는 필드 제거
- 선택적 필드 추가
3. Full Compatibility (완전 호환성)
양방향 모두 호환
Producer (v1) ◀──▶ Consumer (v2) ✓
Producer (v2) ◀──▶ Consumer (v1) ✓
가장 안전하지만 제약이 많음.
Avro를 활용한 스키마 관리
Avro Schema 정의
{
"type": "record",
"name": "OrderCreated",
"namespace": "com.example.events.order",
"fields": [
{
"name": "orderId",
"type": "string"
},
{
"name": "customerId",
"type": "string"
},
{
"name": "items",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "OrderItem",
"fields": [
{"name": "productId", "type": "string"},
{"name": "quantity", "type": "int"},
{"name": "price", "type": "double"}
]
}
}
},
{
"name": "totalAmount",
"type": "double"
},
{
"name": "occurredAt",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
}
]
}
스키마 진화 예제
V1 → V2: 새 필드 추가 (하위 호환)
{
"type": "record",
"name": "OrderCreated",
"namespace": "com.example.events.order",
"fields": [
{"name": "orderId", "type": "string"},
{"name": "customerId", "type": "string"},
{"name": "items", "type": {"type": "array", "items": "OrderItem"}},
{"name": "totalAmount", "type": "double"},
{"name": "occurredAt", "type": {"type": "long", "logicalType": "timestamp-millis"}},
{
"name": "currency",
"type": "string",
"default": "KRW"
},
{
"name": "metadata",
"type": ["null", {"type": "map", "values": "string"}],
"default": null
}
]
}
Spring Boot + Avro 설정
// build.gradle.kts
plugins {
id("com.github.davidmc24.gradle.plugin.avro") version "1.9.1"
}
dependencies {
implementation("org.apache.avro:avro:1.11.3")
implementation("io.confluent:kafka-avro-serializer:7.5.0")
}
avro {
setCreateSetters(false)
setFieldVisibility("PRIVATE")
}
# application.yml
spring:
kafka:
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
properties:
schema.registry.url: http://localhost:8081
specific.avro.reader: true
Avro Producer
@Service
class OrderEventProducer(
private val kafkaTemplate: KafkaTemplate<String, OrderCreated>
) {
fun publishOrderCreated(order: Order) {
val event = OrderCreated.newBuilder()
.setOrderId(order.id)
.setCustomerId(order.customerId)
.setItems(order.items.map { item ->
OrderItem.newBuilder()
.setProductId(item.productId)
.setQuantity(item.quantity)
.setPrice(item.price.toDouble())
.build()
})
.setTotalAmount(order.totalAmount.toDouble())
.setOccurredAt(Instant.now().toEpochMilli())
.setCurrency("KRW") // v2 필드
.setMetadata(null) // v2 필드
.build()
kafkaTemplate.send("order-events", order.id, event)
}
}
Avro Consumer (V1 Consumer가 V2 메시지 처리)
@Component
class OrderEventConsumer {
@KafkaListener(topics = ["order-events"])
fun handleOrderCreated(event: OrderCreated) {
// V1 Consumer는 currency, metadata 필드를 무시
// Avro가 자동으로 처리
processOrder(
orderId = event.getOrderId(),
customerId = event.getCustomerId(),
items = event.getItems(),
totalAmount = event.getTotalAmount()
)
}
}
Protocol Buffers를 활용한 스키마 관리
Proto 파일 정의
syntax = "proto3";
package com.example.events.order;
option java_multiple_files = true;
option java_package = "com.example.events.order";
import "google/protobuf/timestamp.proto";
message OrderCreated {
string order_id = 1;
string customer_id = 2;
repeated OrderItem items = 3;
double total_amount = 4;
google.protobuf.Timestamp occurred_at = 5;
// V2 additions
string currency = 6; // 새 필드 (선택적)
map<string, string> metadata = 7; // 새 필드 (선택적)
}
message OrderItem {
string product_id = 1;
int32 quantity = 2;
double price = 3;
}
Protobuf 진화 규칙
// 안전한 변경들:
// 1. 새 필드 추가 (고유한 필드 번호 사용)
message OrderCreatedV2 {
string order_id = 1;
string customer_id = 2;
repeated OrderItem items = 3;
double total_amount = 4;
google.protobuf.Timestamp occurred_at = 5;
string currency = 6; // 새 필드
string shipping_method = 7; // 새 필드
}
// 2. 필드를 optional에서 repeated로 변경 (scalar types)
// 3. 호환되는 타입 간 변경 (int32 <-> int64)
// 위험한 변경들 (하지 말 것):
// 1. 필드 번호 변경
// 2. 필드 타입을 비호환 타입으로 변경
// 3. required 필드 추가 (proto2)
Spring Boot + Protobuf
// build.gradle.kts
plugins {
id("com.google.protobuf") version "0.9.4"
}
dependencies {
implementation("com.google.protobuf:protobuf-java:3.25.1")
implementation("io.confluent:kafka-protobuf-serializer:7.5.0")
}
protobuf {
protoc {
artifact = "com.google.protobuf:protoc:3.25.1"
}
}
Schema Registry 활용
Schema Registry 아키텍처
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Producer │────────▶│ Kafka │────────▶│ Consumer │
└──────┬───────┘ └──────────────┘ └──────┬───────┘
│ │
│ Register Schema Get Schema │
│ │
▼ ▼
┌─────────────────────────────────────────────────────────────────┐
│ Schema Registry │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Subject: order-events-value │ │
│ │ ├── Version 1: OrderCreated (v1) │ │
│ │ ├── Version 2: OrderCreated (v2) + currency │ │
│ │ └── Version 3: OrderCreated (v3) + metadata │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Docker Compose 설정
version: '3.8'
services:
schema-registry:
image: confluentinc/cp-schema-registry:7.5.0
hostname: schema-registry
depends_on:
- kafka
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
Schema Registry API 활용
@Configuration
class SchemaRegistryConfig {
@Bean
fun schemaRegistryClient(): SchemaRegistryClient {
return CachedSchemaRegistryClient(
"http://localhost:8081",
100 // max schemas to cache
)
}
}
@Service
class SchemaService(
private val schemaRegistryClient: SchemaRegistryClient
) {
fun registerSchema(subject: String, schema: Schema): Int {
return schemaRegistryClient.register(subject, schema)
}
fun getLatestSchema(subject: String): Schema {
val metadata = schemaRegistryClient.getLatestSchemaMetadata(subject)
return Schema.Parser().parse(metadata.schema)
}
fun checkCompatibility(subject: String, schema: Schema): Boolean {
return schemaRegistryClient.testCompatibility(subject, schema)
}
fun getSchemaVersions(subject: String): List<Int> {
return schemaRegistryClient.getAllVersions(subject)
}
}
호환성 설정
# Subject 레벨 호환성 설정
curl -X PUT http://localhost:8081/config/order-events-value \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "BACKWARD"}'
# 가능한 호환성 수준:
# - BACKWARD (기본값)
# - BACKWARD_TRANSITIVE
# - FORWARD
# - FORWARD_TRANSITIVE
# - FULL
# - FULL_TRANSITIVE
# - NONE
실전 스키마 진화 전략
1. 단계별 롤아웃
Day 1: Consumer V2 배포 (V1, V2 모두 처리 가능)
Day 2: Producer V2 배포 (V2 이벤트 발행 시작)
Day 7: 모니터링 후 V1 Consumer 제거
2. Feature Flag를 활용한 점진적 전환
@Service
class OrderEventProducer(
private val kafkaTemplate: KafkaTemplate<String, GenericRecord>,
private val featureFlagService: FeatureFlagService,
private val schemaV1: Schema,
private val schemaV2: Schema
) {
fun publishOrderCreated(order: Order) {
val event = if (featureFlagService.isEnabled("use-order-event-v2")) {
createV2Event(order)
} else {
createV1Event(order)
}
kafkaTemplate.send("order-events", order.id, event)
}
private fun createV1Event(order: Order): GenericRecord {
return GenericRecordBuilder(schemaV1)
.set("orderId", order.id)
.set("customerId", order.customerId)
.set("totalAmount", order.totalAmount.toDouble())
.build()
}
private fun createV2Event(order: Order): GenericRecord {
return GenericRecordBuilder(schemaV2)
.set("orderId", order.id)
.set("customerId", order.customerId)
.set("totalAmount", order.totalAmount.toDouble())
.set("currency", order.currency)
.set("metadata", order.metadata)
.build()
}
}
3. 다중 스키마 Consumer
@Component
class MultiVersionOrderConsumer {
@KafkaListener(topics = ["order-events"])
fun handleOrderEvent(
@Payload record: GenericRecord,
@Header(KafkaHeaders.RECEIVED_KEY) key: String
) {
val schemaVersion = getSchemaVersion(record)
when {
schemaVersion < 2 -> handleV1(record)
schemaVersion < 3 -> handleV2(record)
else -> handleV3(record)
}
}
private fun getSchemaVersion(record: GenericRecord): Int {
return record.schema.getField("currency")?.let { 2 }
?: record.schema.getField("metadata")?.let { 3 }
?: 1
}
private fun handleV1(record: GenericRecord) {
val orderId = record.get("orderId").toString()
val totalAmount = record.get("totalAmount") as Double
// V1 처리 로직 (currency 기본값 사용)
processOrder(orderId, totalAmount, "KRW")
}
private fun handleV2(record: GenericRecord) {
val orderId = record.get("orderId").toString()
val totalAmount = record.get("totalAmount") as Double
val currency = record.get("currency")?.toString() ?: "KRW"
processOrder(orderId, totalAmount, currency)
}
}
4. Dead Letter Queue와 스키마 불일치 처리
@Component
class SchemaAwareErrorHandler(
private val deadLetterProducer: KafkaTemplate<String, ByteArray>
) : CommonErrorHandler {
override fun handleRecord(
thrownException: Exception,
record: ConsumerRecord<*, *>,
consumer: Consumer<*, *>,
container: MessageListenerContainer
) {
when (thrownException.cause) {
is SerializationException -> {
// 스키마 불일치 - DLQ로 전송
sendToDeadLetter(record, thrownException)
}
else -> {
// 다른 에러 처리
throw thrownException
}
}
}
private fun sendToDeadLetter(
record: ConsumerRecord<*, *>,
exception: Exception
) {
val headers = record.headers().toMutableList()
headers.add(RecordHeader("error-reason", exception.message?.toByteArray()))
headers.add(RecordHeader("original-topic", record.topic().toByteArray()))
deadLetterProducer.send(
ProducerRecord(
"${record.topic()}-dlq",
null,
record.key() as? String,
record.value() as? ByteArray,
headers
)
)
}
}
스키마 진화 체크리스트
변경 전
- 현재 스키마와의 호환성 확인
- Schema Registry에서 호환성 테스트
- 모든 Consumer가 새 스키마 처리 가능한지 확인
- 롤백 계획 수립
변경 중
- Consumer 먼저 배포
- 모니터링 강화
- 점진적 트래픽 전환
변경 후
- DLQ 모니터링
- Consumer lag 확인
- 에러율 모니터링
정리
스키마 진화의 핵심 원칙:
| 원칙 | 설명 |
|---|---|
| 항상 하위 호환성 유지 | 새 Consumer가 이전 이벤트를 처리 가능해야 함 |
| 필드 번호/이름 재사용 금지 | 삭제된 필드 번호는 영구적으로 예약 |
| 기본값 필수 | 새 필드는 항상 기본값 포함 |
| 점진적 배포 | Consumer 먼저, Producer 나중에 |
| Schema Registry 활용 | 중앙 집중식 스키마 관리 |
이것으로 Event-Driven Architecture 시리즈를 마칩니다. 이 시리즈에서 다룬 패턴들을 조합하면 확장 가능하고 유지보수 가능한 마이크로서비스를 구축할 수 있습니다.
Series Introduction
This series covers how to build production-ready event-driven architecture.
- Part 1: Event Sourcing Fundamentals
- Part 2: Implementing the Outbox Pattern
- Part 3: CQRS with Separate Read/Write Models
- Part 4: Saga Pattern for Distributed Transactions
- Part 5: Event Schema Evolution and Versioning (Current)
The Need for Schema Evolution
Event-driven systems change over time:
- Adding new fields
- Removing existing fields
- Changing field types
- Renaming fields
These changes should not break existing Consumers.
Types of Compatibility
1. Backward Compatibility
New schema can read old data
Producer (v1) ──▶ [Event v1] ──▶ Consumer (v2) ✓
Allowed changes:
- Adding new fields with default values
- Removing optional fields
2. Forward Compatibility
Old schema can read new data
Producer (v2) ──▶ [Event v2] ──▶ Consumer (v1) ✓
Allowed changes:
- Removing fields with default values
- Adding optional fields
3. Full Compatibility
Both directions compatible
Producer (v1) ◀──▶ Consumer (v2) ✓
Producer (v2) ◀──▶ Consumer (v1) ✓
Safest but most restrictive.
Schema Management with Avro
Avro Schema Definition
{
"type": "record",
"name": "OrderCreated",
"namespace": "com.example.events.order",
"fields": [
{
"name": "orderId",
"type": "string"
},
{
"name": "customerId",
"type": "string"
},
{
"name": "items",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "OrderItem",
"fields": [
{"name": "productId", "type": "string"},
{"name": "quantity", "type": "int"},
{"name": "price", "type": "double"}
]
}
}
},
{
"name": "totalAmount",
"type": "double"
},
{
"name": "occurredAt",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
}
]
}
Schema Evolution Example
V1 → V2: Adding new fields (backward compatible)
{
"type": "record",
"name": "OrderCreated",
"namespace": "com.example.events.order",
"fields": [
{"name": "orderId", "type": "string"},
{"name": "customerId", "type": "string"},
{"name": "items", "type": {"type": "array", "items": "OrderItem"}},
{"name": "totalAmount", "type": "double"},
{"name": "occurredAt", "type": {"type": "long", "logicalType": "timestamp-millis"}},
{
"name": "currency",
"type": "string",
"default": "KRW"
},
{
"name": "metadata",
"type": ["null", {"type": "map", "values": "string"}],
"default": null
}
]
}
Spring Boot + Avro Setup
// build.gradle.kts
plugins {
id("com.github.davidmc24.gradle.plugin.avro") version "1.9.1"
}
dependencies {
implementation("org.apache.avro:avro:1.11.3")
implementation("io.confluent:kafka-avro-serializer:7.5.0")
}
avro {
setCreateSetters(false)
setFieldVisibility("PRIVATE")
}
# application.yml
spring:
kafka:
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
properties:
schema.registry.url: http://localhost:8081
specific.avro.reader: true
Schema Management with Protocol Buffers
Proto File Definition
syntax = "proto3";
package com.example.events.order;
option java_multiple_files = true;
option java_package = "com.example.events.order";
import "google/protobuf/timestamp.proto";
message OrderCreated {
string order_id = 1;
string customer_id = 2;
repeated OrderItem items = 3;
double total_amount = 4;
google.protobuf.Timestamp occurred_at = 5;
// V2 additions
string currency = 6; // New field (optional)
map<string, string> metadata = 7; // New field (optional)
}
message OrderItem {
string product_id = 1;
int32 quantity = 2;
double price = 3;
}
Protobuf Evolution Rules
// Safe changes:
// 1. Adding new fields (use unique field numbers)
// 2. Changing optional to repeated (scalar types)
// 3. Changing between compatible types (int32 <-> int64)
// Dangerous changes (don't do):
// 1. Changing field numbers
// 2. Changing field type to incompatible type
// 3. Adding required fields (proto2)
Using Schema Registry
Schema Registry Architecture
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Producer │────────▶│ Kafka │────────▶│ Consumer │
└──────┬───────┘ └──────────────┘ └──────┬───────┘
│ │
│ Register Schema Get Schema │
│ │
▼ ▼
┌─────────────────────────────────────────────────────────────────┐
│ Schema Registry │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Subject: order-events-value │ │
│ │ ├── Version 1: OrderCreated (v1) │ │
│ │ ├── Version 2: OrderCreated (v2) + currency │ │
│ │ └── Version 3: OrderCreated (v3) + metadata │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Practical Schema Evolution Strategies
1. Phased Rollout
Day 1: Deploy Consumer V2 (can handle both V1 and V2)
Day 2: Deploy Producer V2 (starts publishing V2 events)
Day 7: After monitoring, remove V1 Consumer
2. Gradual Transition with Feature Flags
@Service
class OrderEventProducer(
private val kafkaTemplate: KafkaTemplate<String, GenericRecord>,
private val featureFlagService: FeatureFlagService,
private val schemaV1: Schema,
private val schemaV2: Schema
) {
fun publishOrderCreated(order: Order) {
val event = if (featureFlagService.isEnabled("use-order-event-v2")) {
createV2Event(order)
} else {
createV1Event(order)
}
kafkaTemplate.send("order-events", order.id, event)
}
}
Schema Evolution Checklist
Before Change
- Check compatibility with current schema
- Test compatibility in Schema Registry
- Verify all Consumers can handle new schema
- Prepare rollback plan
During Change
- Deploy Consumers first
- Enhanced monitoring
- Gradual traffic transition
After Change
- Monitor DLQ
- Check Consumer lag
- Monitor error rates
Summary
Key principles of schema evolution:
| Principle | Description |
|---|---|
| Always maintain backward compatibility | New Consumer must handle old events |
| Never reuse field numbers/names | Deleted field numbers are permanently reserved |
| Require default values | New fields must always include defaults |
| Gradual deployment | Consumer first, Producer later |
| Use Schema Registry | Centralized schema management |
This concludes the Event-Driven Architecture series. By combining the patterns covered in this series, you can build scalable and maintainable microservices.
댓글남기기