10 분 소요


시리즈 소개

이 시리즈는 프로덕션 환경에서 사용할 수 있는 이벤트 기반 아키텍처를 구축하는 방법을 다룹니다.

  1. Part 1: Event Sourcing 기초
  2. Part 2: Outbox Pattern 구현
  3. Part 3: CQRS와 Read/Write 모델 분리
  4. Part 4: Saga Pattern으로 분산 트랜잭션 처리
  5. Part 5: Event Schema 진화와 버전 관리 (현재 글)

스키마 진화의 필요성

이벤트 기반 시스템은 시간이 지나면서 변화합니다:

  • 새로운 필드 추가
  • 기존 필드 제거
  • 필드 타입 변경
  • 필드 이름 변경

이러한 변경이 기존 Consumer를 깨뜨리지 않도록 해야 합니다.

호환성 유형

1. Backward Compatibility (하위 호환성)

새로운 스키마로 이전 데이터를 읽을 수 있음

Producer (v1) ──▶ [Event v1] ──▶ Consumer (v2) ✓

허용되는 변경:

  • 기본값이 있는 새 필드 추가
  • 선택적 필드 제거

2. Forward Compatibility (상위 호환성)

이전 스키마로 새로운 데이터를 읽을 수 있음

Producer (v2) ──▶ [Event v2] ──▶ Consumer (v1) ✓

허용되는 변경:

  • 기본값이 있는 필드 제거
  • 선택적 필드 추가

3. Full Compatibility (완전 호환성)

양방향 모두 호환

Producer (v1) ◀──▶ Consumer (v2) ✓
Producer (v2) ◀──▶ Consumer (v1) ✓

가장 안전하지만 제약이 많음.

Avro를 활용한 스키마 관리

Avro Schema 정의

{
  "type": "record",
  "name": "OrderCreated",
  "namespace": "com.example.events.order",
  "fields": [
    {
      "name": "orderId",
      "type": "string"
    },
    {
      "name": "customerId",
      "type": "string"
    },
    {
      "name": "items",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "OrderItem",
          "fields": [
            {"name": "productId", "type": "string"},
            {"name": "quantity", "type": "int"},
            {"name": "price", "type": "double"}
          ]
        }
      }
    },
    {
      "name": "totalAmount",
      "type": "double"
    },
    {
      "name": "occurredAt",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    }
  ]
}

스키마 진화 예제

V1 → V2: 새 필드 추가 (하위 호환)

{
  "type": "record",
  "name": "OrderCreated",
  "namespace": "com.example.events.order",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "customerId", "type": "string"},
    {"name": "items", "type": {"type": "array", "items": "OrderItem"}},
    {"name": "totalAmount", "type": "double"},
    {"name": "occurredAt", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {
      "name": "currency",
      "type": "string",
      "default": "KRW"
    },
    {
      "name": "metadata",
      "type": ["null", {"type": "map", "values": "string"}],
      "default": null
    }
  ]
}

Spring Boot + Avro 설정

// build.gradle.kts
plugins {
    id("com.github.davidmc24.gradle.plugin.avro") version "1.9.1"
}

dependencies {
    implementation("org.apache.avro:avro:1.11.3")
    implementation("io.confluent:kafka-avro-serializer:7.5.0")
}

avro {
    setCreateSetters(false)
    setFieldVisibility("PRIVATE")
}
# application.yml
spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    properties:
      schema.registry.url: http://localhost:8081
      specific.avro.reader: true

Avro Producer

@Service
class OrderEventProducer(
    private val kafkaTemplate: KafkaTemplate<String, OrderCreated>
) {
    fun publishOrderCreated(order: Order) {
        val event = OrderCreated.newBuilder()
            .setOrderId(order.id)
            .setCustomerId(order.customerId)
            .setItems(order.items.map { item ->
                OrderItem.newBuilder()
                    .setProductId(item.productId)
                    .setQuantity(item.quantity)
                    .setPrice(item.price.toDouble())
                    .build()
            })
            .setTotalAmount(order.totalAmount.toDouble())
            .setOccurredAt(Instant.now().toEpochMilli())
            .setCurrency("KRW")  // v2 필드
            .setMetadata(null)   // v2 필드
            .build()

        kafkaTemplate.send("order-events", order.id, event)
    }
}

Avro Consumer (V1 Consumer가 V2 메시지 처리)

@Component
class OrderEventConsumer {

    @KafkaListener(topics = ["order-events"])
    fun handleOrderCreated(event: OrderCreated) {
        // V1 Consumer는 currency, metadata 필드를 무시
        // Avro가 자동으로 처리
        processOrder(
            orderId = event.getOrderId(),
            customerId = event.getCustomerId(),
            items = event.getItems(),
            totalAmount = event.getTotalAmount()
        )
    }
}

Protocol Buffers를 활용한 스키마 관리

Proto 파일 정의

syntax = "proto3";

package com.example.events.order;

option java_multiple_files = true;
option java_package = "com.example.events.order";

import "google/protobuf/timestamp.proto";

message OrderCreated {
  string order_id = 1;
  string customer_id = 2;
  repeated OrderItem items = 3;
  double total_amount = 4;
  google.protobuf.Timestamp occurred_at = 5;

  // V2 additions
  string currency = 6;  // 새 필드 (선택적)
  map<string, string> metadata = 7;  // 새 필드 (선택적)
}

message OrderItem {
  string product_id = 1;
  int32 quantity = 2;
  double price = 3;
}

Protobuf 진화 규칙

// 안전한 변경들:
// 1. 새 필드 추가 (고유한 필드 번호 사용)
message OrderCreatedV2 {
  string order_id = 1;
  string customer_id = 2;
  repeated OrderItem items = 3;
  double total_amount = 4;
  google.protobuf.Timestamp occurred_at = 5;
  string currency = 6;          // 새 필드
  string shipping_method = 7;   // 새 필드
}

// 2. 필드를 optional에서 repeated로 변경 (scalar types)
// 3. 호환되는 타입 간 변경 (int32 <-> int64)

// 위험한 변경들 (하지 말 것):
// 1. 필드 번호 변경
// 2. 필드 타입을 비호환 타입으로 변경
// 3. required 필드 추가 (proto2)

Spring Boot + Protobuf

// build.gradle.kts
plugins {
    id("com.google.protobuf") version "0.9.4"
}

dependencies {
    implementation("com.google.protobuf:protobuf-java:3.25.1")
    implementation("io.confluent:kafka-protobuf-serializer:7.5.0")
}

protobuf {
    protoc {
        artifact = "com.google.protobuf:protoc:3.25.1"
    }
}

Schema Registry 활용

Schema Registry 아키텍처

┌──────────────┐         ┌──────────────┐         ┌──────────────┐
│   Producer   │────────▶│    Kafka     │────────▶│   Consumer   │
└──────┬───────┘         └──────────────┘         └──────┬───────┘
       │                                                  │
       │  Register Schema                    Get Schema   │
       │                                                  │
       ▼                                                  ▼
┌─────────────────────────────────────────────────────────────────┐
│                      Schema Registry                             │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │  Subject: order-events-value                             │    │
│  │  ├── Version 1: OrderCreated (v1)                       │    │
│  │  ├── Version 2: OrderCreated (v2) + currency            │    │
│  │  └── Version 3: OrderCreated (v3) + metadata            │    │
│  └─────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────┘

Docker Compose 설정

version: '3.8'
services:
  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.0
    hostname: schema-registry
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

Schema Registry API 활용

@Configuration
class SchemaRegistryConfig {

    @Bean
    fun schemaRegistryClient(): SchemaRegistryClient {
        return CachedSchemaRegistryClient(
            "http://localhost:8081",
            100  // max schemas to cache
        )
    }
}

@Service
class SchemaService(
    private val schemaRegistryClient: SchemaRegistryClient
) {
    fun registerSchema(subject: String, schema: Schema): Int {
        return schemaRegistryClient.register(subject, schema)
    }

    fun getLatestSchema(subject: String): Schema {
        val metadata = schemaRegistryClient.getLatestSchemaMetadata(subject)
        return Schema.Parser().parse(metadata.schema)
    }

    fun checkCompatibility(subject: String, schema: Schema): Boolean {
        return schemaRegistryClient.testCompatibility(subject, schema)
    }

    fun getSchemaVersions(subject: String): List<Int> {
        return schemaRegistryClient.getAllVersions(subject)
    }
}

호환성 설정

# Subject 레벨 호환성 설정
curl -X PUT http://localhost:8081/config/order-events-value \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "BACKWARD"}'

# 가능한 호환성 수준:
# - BACKWARD (기본값)
# - BACKWARD_TRANSITIVE
# - FORWARD
# - FORWARD_TRANSITIVE
# - FULL
# - FULL_TRANSITIVE
# - NONE

실전 스키마 진화 전략

1. 단계별 롤아웃

Day 1: Consumer V2 배포 (V1, V2 모두 처리 가능)
Day 2: Producer V2 배포 (V2 이벤트 발행 시작)
Day 7: 모니터링 후 V1 Consumer 제거

2. Feature Flag를 활용한 점진적 전환

@Service
class OrderEventProducer(
    private val kafkaTemplate: KafkaTemplate<String, GenericRecord>,
    private val featureFlagService: FeatureFlagService,
    private val schemaV1: Schema,
    private val schemaV2: Schema
) {
    fun publishOrderCreated(order: Order) {
        val event = if (featureFlagService.isEnabled("use-order-event-v2")) {
            createV2Event(order)
        } else {
            createV1Event(order)
        }

        kafkaTemplate.send("order-events", order.id, event)
    }

    private fun createV1Event(order: Order): GenericRecord {
        return GenericRecordBuilder(schemaV1)
            .set("orderId", order.id)
            .set("customerId", order.customerId)
            .set("totalAmount", order.totalAmount.toDouble())
            .build()
    }

    private fun createV2Event(order: Order): GenericRecord {
        return GenericRecordBuilder(schemaV2)
            .set("orderId", order.id)
            .set("customerId", order.customerId)
            .set("totalAmount", order.totalAmount.toDouble())
            .set("currency", order.currency)
            .set("metadata", order.metadata)
            .build()
    }
}

3. 다중 스키마 Consumer

@Component
class MultiVersionOrderConsumer {

    @KafkaListener(topics = ["order-events"])
    fun handleOrderEvent(
        @Payload record: GenericRecord,
        @Header(KafkaHeaders.RECEIVED_KEY) key: String
    ) {
        val schemaVersion = getSchemaVersion(record)

        when {
            schemaVersion < 2 -> handleV1(record)
            schemaVersion < 3 -> handleV2(record)
            else -> handleV3(record)
        }
    }

    private fun getSchemaVersion(record: GenericRecord): Int {
        return record.schema.getField("currency")?.let { 2 }
            ?: record.schema.getField("metadata")?.let { 3 }
            ?: 1
    }

    private fun handleV1(record: GenericRecord) {
        val orderId = record.get("orderId").toString()
        val totalAmount = record.get("totalAmount") as Double
        // V1 처리 로직 (currency 기본값 사용)
        processOrder(orderId, totalAmount, "KRW")
    }

    private fun handleV2(record: GenericRecord) {
        val orderId = record.get("orderId").toString()
        val totalAmount = record.get("totalAmount") as Double
        val currency = record.get("currency")?.toString() ?: "KRW"
        processOrder(orderId, totalAmount, currency)
    }
}

4. Dead Letter Queue와 스키마 불일치 처리

@Component
class SchemaAwareErrorHandler(
    private val deadLetterProducer: KafkaTemplate<String, ByteArray>
) : CommonErrorHandler {

    override fun handleRecord(
        thrownException: Exception,
        record: ConsumerRecord<*, *>,
        consumer: Consumer<*, *>,
        container: MessageListenerContainer
    ) {
        when (thrownException.cause) {
            is SerializationException -> {
                // 스키마 불일치 - DLQ로 전송
                sendToDeadLetter(record, thrownException)
            }
            else -> {
                // 다른 에러 처리
                throw thrownException
            }
        }
    }

    private fun sendToDeadLetter(
        record: ConsumerRecord<*, *>,
        exception: Exception
    ) {
        val headers = record.headers().toMutableList()
        headers.add(RecordHeader("error-reason", exception.message?.toByteArray()))
        headers.add(RecordHeader("original-topic", record.topic().toByteArray()))

        deadLetterProducer.send(
            ProducerRecord(
                "${record.topic()}-dlq",
                null,
                record.key() as? String,
                record.value() as? ByteArray,
                headers
            )
        )
    }
}

스키마 진화 체크리스트

변경 전

  • 현재 스키마와의 호환성 확인
  • Schema Registry에서 호환성 테스트
  • 모든 Consumer가 새 스키마 처리 가능한지 확인
  • 롤백 계획 수립

변경 중

  • Consumer 먼저 배포
  • 모니터링 강화
  • 점진적 트래픽 전환

변경 후

  • DLQ 모니터링
  • Consumer lag 확인
  • 에러율 모니터링

정리

스키마 진화의 핵심 원칙:

원칙 설명
항상 하위 호환성 유지 새 Consumer가 이전 이벤트를 처리 가능해야 함
필드 번호/이름 재사용 금지 삭제된 필드 번호는 영구적으로 예약
기본값 필수 새 필드는 항상 기본값 포함
점진적 배포 Consumer 먼저, Producer 나중에
Schema Registry 활용 중앙 집중식 스키마 관리

이것으로 Event-Driven Architecture 시리즈를 마칩니다. 이 시리즈에서 다룬 패턴들을 조합하면 확장 가능하고 유지보수 가능한 마이크로서비스를 구축할 수 있습니다.

Series Introduction

This series covers how to build production-ready event-driven architecture.

  1. Part 1: Event Sourcing Fundamentals
  2. Part 2: Implementing the Outbox Pattern
  3. Part 3: CQRS with Separate Read/Write Models
  4. Part 4: Saga Pattern for Distributed Transactions
  5. Part 5: Event Schema Evolution and Versioning (Current)

The Need for Schema Evolution

Event-driven systems change over time:

  • Adding new fields
  • Removing existing fields
  • Changing field types
  • Renaming fields

These changes should not break existing Consumers.

Types of Compatibility

1. Backward Compatibility

New schema can read old data

Producer (v1) ──▶ [Event v1] ──▶ Consumer (v2) ✓

Allowed changes:

  • Adding new fields with default values
  • Removing optional fields

2. Forward Compatibility

Old schema can read new data

Producer (v2) ──▶ [Event v2] ──▶ Consumer (v1) ✓

Allowed changes:

  • Removing fields with default values
  • Adding optional fields

3. Full Compatibility

Both directions compatible

Producer (v1) ◀──▶ Consumer (v2) ✓
Producer (v2) ◀──▶ Consumer (v1) ✓

Safest but most restrictive.

Schema Management with Avro

Avro Schema Definition

{
  "type": "record",
  "name": "OrderCreated",
  "namespace": "com.example.events.order",
  "fields": [
    {
      "name": "orderId",
      "type": "string"
    },
    {
      "name": "customerId",
      "type": "string"
    },
    {
      "name": "items",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "OrderItem",
          "fields": [
            {"name": "productId", "type": "string"},
            {"name": "quantity", "type": "int"},
            {"name": "price", "type": "double"}
          ]
        }
      }
    },
    {
      "name": "totalAmount",
      "type": "double"
    },
    {
      "name": "occurredAt",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    }
  ]
}

Schema Evolution Example

V1 → V2: Adding new fields (backward compatible)

{
  "type": "record",
  "name": "OrderCreated",
  "namespace": "com.example.events.order",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "customerId", "type": "string"},
    {"name": "items", "type": {"type": "array", "items": "OrderItem"}},
    {"name": "totalAmount", "type": "double"},
    {"name": "occurredAt", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {
      "name": "currency",
      "type": "string",
      "default": "KRW"
    },
    {
      "name": "metadata",
      "type": ["null", {"type": "map", "values": "string"}],
      "default": null
    }
  ]
}

Spring Boot + Avro Setup

// build.gradle.kts
plugins {
    id("com.github.davidmc24.gradle.plugin.avro") version "1.9.1"
}

dependencies {
    implementation("org.apache.avro:avro:1.11.3")
    implementation("io.confluent:kafka-avro-serializer:7.5.0")
}

avro {
    setCreateSetters(false)
    setFieldVisibility("PRIVATE")
}
# application.yml
spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    properties:
      schema.registry.url: http://localhost:8081
      specific.avro.reader: true

Schema Management with Protocol Buffers

Proto File Definition

syntax = "proto3";

package com.example.events.order;

option java_multiple_files = true;
option java_package = "com.example.events.order";

import "google/protobuf/timestamp.proto";

message OrderCreated {
  string order_id = 1;
  string customer_id = 2;
  repeated OrderItem items = 3;
  double total_amount = 4;
  google.protobuf.Timestamp occurred_at = 5;

  // V2 additions
  string currency = 6;  // New field (optional)
  map<string, string> metadata = 7;  // New field (optional)
}

message OrderItem {
  string product_id = 1;
  int32 quantity = 2;
  double price = 3;
}

Protobuf Evolution Rules

// Safe changes:
// 1. Adding new fields (use unique field numbers)
// 2. Changing optional to repeated (scalar types)
// 3. Changing between compatible types (int32 <-> int64)

// Dangerous changes (don't do):
// 1. Changing field numbers
// 2. Changing field type to incompatible type
// 3. Adding required fields (proto2)

Using Schema Registry

Schema Registry Architecture

┌──────────────┐         ┌──────────────┐         ┌──────────────┐
│   Producer   │────────▶│    Kafka     │────────▶│   Consumer   │
└──────┬───────┘         └──────────────┘         └──────┬───────┘
       │                                                  │
       │  Register Schema                    Get Schema   │
       │                                                  │
       ▼                                                  ▼
┌─────────────────────────────────────────────────────────────────┐
│                      Schema Registry                             │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │  Subject: order-events-value                             │    │
│  │  ├── Version 1: OrderCreated (v1)                       │    │
│  │  ├── Version 2: OrderCreated (v2) + currency            │    │
│  │  └── Version 3: OrderCreated (v3) + metadata            │    │
│  └─────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────┘

Practical Schema Evolution Strategies

1. Phased Rollout

Day 1: Deploy Consumer V2 (can handle both V1 and V2)
Day 2: Deploy Producer V2 (starts publishing V2 events)
Day 7: After monitoring, remove V1 Consumer

2. Gradual Transition with Feature Flags

@Service
class OrderEventProducer(
    private val kafkaTemplate: KafkaTemplate<String, GenericRecord>,
    private val featureFlagService: FeatureFlagService,
    private val schemaV1: Schema,
    private val schemaV2: Schema
) {
    fun publishOrderCreated(order: Order) {
        val event = if (featureFlagService.isEnabled("use-order-event-v2")) {
            createV2Event(order)
        } else {
            createV1Event(order)
        }

        kafkaTemplate.send("order-events", order.id, event)
    }
}

Schema Evolution Checklist

Before Change

  • Check compatibility with current schema
  • Test compatibility in Schema Registry
  • Verify all Consumers can handle new schema
  • Prepare rollback plan

During Change

  • Deploy Consumers first
  • Enhanced monitoring
  • Gradual traffic transition

After Change

  • Monitor DLQ
  • Check Consumer lag
  • Monitor error rates

Summary

Key principles of schema evolution:

Principle Description
Always maintain backward compatibility New Consumer must handle old events
Never reuse field numbers/names Deleted field numbers are permanently reserved
Require default values New fields must always include defaults
Gradual deployment Consumer first, Producer later
Use Schema Registry Centralized schema management

This concludes the Event-Driven Architecture series. By combining the patterns covered in this series, you can build scalable and maintainable microservices.

댓글남기기