Series Introduction

This series covers how to build a production-ready event-driven architecture.

  1. Part 1: Event Sourcing Fundamentals
  2. Part 2: Implementing the Outbox Pattern
  3. Part 3: CQRS with Separate Read/Write Models
  4. Part 4: Saga Pattern for Distributed Transactions
  5. Part 5: Event Schema Evolution and Versioning (Current)

The Need for Schema Evolution

The event schemas in an event-driven system inevitably change over time:

  • Adding new fields
  • Removing existing fields
  • Changing field types
  • Renaming fields

These changes should not break existing Consumers.

Types of Compatibility

1. Backward Compatibility

Consumers using the new schema can read data written with the old schema.

Producer (v1) ──▶ [Event v1] ──▶ Consumer (v2) ✓

Allowed changes:

  • Adding new fields with default values
  • Removing optional fields

2. Forward Compatibility

Consumers using the old schema can read data written with the new schema.

Producer (v2) ──▶ [Event v2] ──▶ Consumer (v1) ✓

Allowed changes:

  • Removing fields with default values
  • Adding optional fields

3. Full Compatibility

Both directions compatible

Producer (v1) ◀──▶ Consumer (v2) ✓
Producer (v2) ◀──▶ Consumer (v1) ✓

Safest but most restrictive.
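
These rules are enforced by Avro's schema resolution. The following is a minimal sketch using plain Avro (no Kafka), with trimmed v1/v2 schemas assumed purely for illustration: a record written with v1 is decoded against the v2 reader schema, and the added field falls back to its default.

import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericDatumWriter
import org.apache.avro.io.DecoderFactory
import org.apache.avro.io.EncoderFactory
import java.io.ByteArrayOutputStream

fun main() {
    val v1 = Schema.Parser().parse(
        """{"type":"record","name":"OrderCreated","fields":[
             {"name":"orderId","type":"string"}]}"""
    )
    val v2 = Schema.Parser().parse(
        """{"type":"record","name":"OrderCreated","fields":[
             {"name":"orderId","type":"string"},
             {"name":"currency","type":"string","default":"KRW"}]}"""
    )

    // Write a record with the old (v1) schema
    val record = GenericData.Record(v1).apply { put("orderId", "order-1") }
    val out = ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    GenericDatumWriter<GenericData.Record>(v1).write(record, encoder)
    encoder.flush()

    // Read it with the new (v2) schema: writer schema = v1, reader schema = v2
    val decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null)
    val decoded = GenericDatumReader<GenericData.Record>(v1, v2).read(null, decoder)
    println(decoded.get("currency"))  // prints the default value: KRW
}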

Schema Management with Avro

Avro Schema Definition

{
  "type": "record",
  "name": "OrderCreated",
  "namespace": "com.example.events.order",
  "fields": [
    {
      "name": "orderId",
      "type": "string"
    },
    {
      "name": "customerId",
      "type": "string"
    },
    {
      "name": "items",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "OrderItem",
          "fields": [
            {"name": "productId", "type": "string"},
            {"name": "quantity", "type": "int"},
            {"name": "price", "type": "double"}
          ]
        }
      }
    },
    {
      "name": "totalAmount",
      "type": "double"
    },
    {
      "name": "occurredAt",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    }
  ]
}

Schema Evolution Example

V1 to V2: Adding new fields (backward compatible; the OrderItem record is unchanged and referenced by name for brevity)

{
  "type": "record",
  "name": "OrderCreated",
  "namespace": "com.example.events.order",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "customerId", "type": "string"},
    {"name": "items", "type": {"type": "array", "items": "OrderItem"}},
    {"name": "totalAmount", "type": "double"},
    {"name": "occurredAt", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {
      "name": "currency",
      "type": "string",
      "default": "KRW"
    },
    {
      "name": "metadata",
      "type": ["null", {"type": "map", "values": "string"}],
      "default": null
    }
  ]
}
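
Before registering the new version, the change can be verified locally with Avro's built-in compatibility checker. A sketch (the .avsc file paths are assumptions):

import org.apache.avro.Schema
import org.apache.avro.SchemaCompatibility
import java.io.File

fun main() {
    val v1 = Schema.Parser().parse(File("src/main/avro/order-created-v1.avsc"))
    val v2 = Schema.Parser().parse(File("src/main/avro/order-created-v2.avsc"))

    // Backward compatibility: the new schema (v2) is the reader, old data (v1) is the writer
    val result = SchemaCompatibility.checkReaderWriterCompatibility(v2, v1)
    println(result.type)  // COMPATIBLE, because every added field has a default
}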

Spring Boot + Avro Setup

// build.gradle.kts
plugins {
    id("com.github.davidmc24.gradle.plugin.avro") version "1.9.1"
}

repositories {
    mavenCentral()
    maven("https://packages.confluent.io/maven/")  // Confluent serializers are not on Maven Central
}

dependencies {
    implementation("org.apache.avro:avro:1.11.3")
    implementation("io.confluent:kafka-avro-serializer:7.5.0")
}

avro {
    setCreateSetters(false)
    setFieldVisibility("PRIVATE")
}

# application.yml
spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    properties:
      schema.registry.url: http://localhost:8081
      specific.avro.reader: true

Avro Producer

@Service
class OrderEventProducer(
    private val kafkaTemplate: KafkaTemplate<String, OrderCreated>
) {
    fun publishOrderCreated(order: Order) {
        val event = OrderCreated.newBuilder()
            .setOrderId(order.id)
            .setCustomerId(order.customerId)
            .setItems(order.items.map { item ->
                OrderItem.newBuilder()
                    .setProductId(item.productId)
                    .setQuantity(item.quantity)
                    .setPrice(item.price.toDouble())
                    .build()
            })
            .setTotalAmount(order.totalAmount.toDouble())
            .setOccurredAt(Instant.now())  // timestamp-millis fields are generated as java.time.Instant
            .setCurrency("KRW")  // v2 field
            .setMetadata(null)   // v2 field
            .build()

        kafkaTemplate.send("order-events", order.id, event)
    }
}

Avro Consumer (V1 Consumer handling V2 messages)

@Component
class OrderEventConsumer {

    @KafkaListener(topics = ["order-events"])
    fun handleOrderCreated(event: OrderCreated) {
        // V1 Consumer ignores currency and metadata fields
        // Avro handles this automatically
        processOrder(
            orderId = event.getOrderId(),
            customerId = event.getCustomerId(),
            items = event.getItems(),
            totalAmount = event.getTotalAmount()
        )
    }
}

Schema Management with Protocol Buffers

Proto File Definition

syntax = "proto3";

package com.example.events.order;

option java_multiple_files = true;
option java_package = "com.example.events.order";

import "google/protobuf/timestamp.proto";

message OrderCreated {
  string order_id = 1;
  string customer_id = 2;
  repeated OrderItem items = 3;
  double total_amount = 4;
  google.protobuf.Timestamp occurred_at = 5;

  // V2 additions
  string currency = 6;  // New field (optional)
  map<string, string> metadata = 7;  // New field (optional)
}

message OrderItem {
  string product_id = 1;
  int32 quantity = 2;
  double price = 3;
}
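
Assuming the classes protoc generates from the .proto above, a v2 producer can populate the new fields while a consumer still compiled against the v1 message simply parses field numbers 6 and 7 as unknown fields (preserved since protobuf 3.5) instead of failing. A sketch:

import com.example.events.order.OrderCreated
import com.example.events.order.OrderItem
import com.google.protobuf.Timestamp
import java.time.Instant

fun buildV2Event(): ByteArray {
    val now = Instant.now()
    val event = OrderCreated.newBuilder()
        .setOrderId("order-1")
        .setCustomerId("customer-1")
        .addItems(
            OrderItem.newBuilder()
                .setProductId("product-1")
                .setQuantity(2)
                .setPrice(10000.0)
        )
        .setTotalAmount(20000.0)
        .setOccurredAt(Timestamp.newBuilder().setSeconds(now.epochSecond).setNanos(now.nano))
        .setCurrency("KRW")                // v2 field
        .putMetadata("channel", "mobile")  // v2 field
        .build()
    return event.toByteArray()
}

// A consumer on the v1 message definition deserializes the same bytes unchanged:
// val event = OrderCreated.parseFrom(bytes)  // currency/metadata are simply not visible to v1 code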

Protobuf Evolution Rules

// Safe changes:
// 1. Adding new fields (use unique field numbers)
message OrderCreatedV2 {
  string order_id = 1;
  string customer_id = 2;
  repeated OrderItem items = 3;
  double total_amount = 4;
  google.protobuf.Timestamp occurred_at = 5;
  string currency = 6;          // New field
  string shipping_method = 7;   // New field
}

// 2. Changing a singular field to repeated (safe for string/bytes/message fields,
//    but not for numeric types, which use packed encoding)
// 3. Changing between compatible numeric types (int32, uint32, int64, uint64, bool)

// Dangerous changes (don't do):
// 1. Changing field numbers
// 2. Changing field type to incompatible type
// 3. Adding required fields (proto2)

Spring Boot + Protobuf

// build.gradle.kts
plugins {
    id("com.google.protobuf") version "0.9.4"
}

dependencies {
    implementation("com.google.protobuf:protobuf-java:3.25.1")
    implementation("io.confluent:kafka-protobuf-serializer:7.5.0")
}

protobuf {
    protoc {
        artifact = "com.google.protobuf:protoc:3.25.1"
    }
}
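
The application.yml shown earlier is Avro-specific; for Protobuf the value serializer changes to Confluent's KafkaProtobufSerializer. A programmatic sketch (the broker address and bean wiring are assumptions; the same settings can live in application.yml instead):

import com.example.events.order.OrderCreated
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory

@Configuration
class ProtobufKafkaConfig {

    @Bean
    fun protobufProducerFactory(): ProducerFactory<String, OrderCreated> =
        DefaultKafkaProducerFactory(
            mapOf(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to KafkaProtobufSerializer::class.java,
                "schema.registry.url" to "http://localhost:8081"
            )
        )

    @Bean
    fun protobufKafkaTemplate(factory: ProducerFactory<String, OrderCreated>): KafkaTemplate<String, OrderCreated> =
        KafkaTemplate(factory)
}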

Using Schema Registry

Schema Registry Architecture

┌──────────────┐         ┌──────────────┐         ┌──────────────┐
│   Producer   │────────▶│    Kafka     │────────▶│   Consumer   │
└──────┬───────┘         └──────────────┘         └──────┬───────┘
       │                                                  │
       │  Register Schema                    Get Schema   │
       │                                                  │
       ▼                                                  ▼
┌─────────────────────────────────────────────────────────────────┐
│                      Schema Registry                             │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │  Subject: order-events-value                             │    │
│  │  ├── Version 1: OrderCreated (v1)                       │    │
│  │  ├── Version 2: OrderCreated (v2) + currency            │    │
│  │  └── Version 3: OrderCreated (v3) + metadata            │    │
│  └─────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────┘
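
On the wire, the Confluent serializers prepend a 5-byte header to every message: a magic byte (0) followed by the 4-byte ID of the schema the Producer registered. The Consumer uses that ID to fetch the writer schema from the registry. A small sketch that extracts the ID from a raw payload, e.g. when inspecting DLQ messages:

import java.nio.ByteBuffer

fun extractSchemaId(payload: ByteArray): Int {
    // Confluent wire format: [magic byte 0][4-byte schema id, big-endian][serialized data]
    require(payload.size >= 5 && payload[0].toInt() == 0) { "Not in Confluent wire format" }
    return ByteBuffer.wrap(payload, 1, 4).int
}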

Docker Compose Setup

version: '3.8'
services:
  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.0
    hostname: schema-registry
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

Using Schema Registry API

@Configuration
class SchemaRegistryConfig {

    @Bean
    fun schemaRegistryClient(): SchemaRegistryClient {
        return CachedSchemaRegistryClient(
            "http://localhost:8081",
            100  // max schemas to cache
        )
    }
}

@Service
class SchemaService(
    private val schemaRegistryClient: SchemaRegistryClient
) {
    fun registerSchema(subject: String, schema: Schema): Int {
        // AvroSchema (io.confluent.kafka.schemaregistry.avro.AvroSchema) adapts an Avro Schema
        // to the ParsedSchema type expected by the 7.x client API
        return schemaRegistryClient.register(subject, AvroSchema(schema))
    }

    fun getLatestSchema(subject: String): Schema {
        val metadata = schemaRegistryClient.getLatestSchemaMetadata(subject)
        return Schema.Parser().parse(metadata.schema)
    }

    fun checkCompatibility(subject: String, schema: Schema): Boolean {
        return schemaRegistryClient.testCompatibility(subject, AvroSchema(schema))
    }

    fun getSchemaVersions(subject: String): List<Int> {
        return schemaRegistryClient.getAllVersions(subject)
    }
}

Compatibility Configuration

# Subject-level compatibility configuration
curl -X PUT http://localhost:8081/config/order-events-value \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "BACKWARD"}'

# Available compatibility levels:
# - BACKWARD (default)
# - BACKWARD_TRANSITIVE
# - FORWARD
# - FORWARD_TRANSITIVE
# - FULL
# - FULL_TRANSITIVE
# - NONE
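
The same subject-level compatibility can also be set from code through the SchemaRegistryClient used above. A sketch, assuming the CachedSchemaRegistryClient bean defined earlier:

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import org.springframework.stereotype.Service

@Service
class CompatibilityService(
    private val schemaRegistryClient: SchemaRegistryClient
) {
    // Equivalent to the curl call above
    fun enforceBackwardCompatibility(subject: String) {
        schemaRegistryClient.updateCompatibility(subject, "BACKWARD")
    }
}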

Practical Schema Evolution Strategies

1. Phased Rollout

Day 1: Deploy Consumer V2 (can handle both V1 and V2)
Day 2: Deploy Producer V2 (starts publishing V2 events)
Day 7: After monitoring, remove V1 Consumer

2. Gradual Transition with Feature Flags

@Service
class OrderEventProducer(
    private val kafkaTemplate: KafkaTemplate<String, GenericRecord>,
    private val featureFlagService: FeatureFlagService,
    private val schemaV1: Schema,
    private val schemaV2: Schema
) {
    fun publishOrderCreated(order: Order) {
        val event = if (featureFlagService.isEnabled("use-order-event-v2")) {
            createV2Event(order)
        } else {
            createV1Event(order)
        }

        kafkaTemplate.send("order-events", order.id, event)
    }

    private fun createV1Event(order: Order): GenericRecord {
        // items and occurredAt omitted for brevity; fields without defaults must be set before build()
        return GenericRecordBuilder(schemaV1)
            .set("orderId", order.id)
            .set("customerId", order.customerId)
            .set("totalAmount", order.totalAmount.toDouble())
            .build()
    }

    private fun createV2Event(order: Order): GenericRecord {
        return GenericRecordBuilder(schemaV2)
            .set("orderId", order.id)
            .set("customerId", order.customerId)
            .set("totalAmount", order.totalAmount.toDouble())
            .set("currency", order.currency)
            .set("metadata", order.metadata)
            .build()
    }
}

3. Multi-Version Consumer

@Component
class MultiVersionOrderConsumer {

    // Receiving GenericRecord requires specific.avro.reader=false for this consumer
    @KafkaListener(topics = ["order-events"])
    fun handleOrderEvent(
        @Payload record: GenericRecord,
        @Header(KafkaHeaders.RECEIVED_KEY) key: String
    ) {
        val schemaVersion = getSchemaVersion(record)

        when {
            schemaVersion < 2 -> handleV1(record)
            schemaVersion < 3 -> handleV2(record)
            else -> handleV3(record)  // v3 handler omitted for brevity
        }
    }

    private fun getSchemaVersion(record: GenericRecord): Int {
        // Check the newest marker field first: a v3 record also contains currency
        return when {
            record.schema.getField("metadata") != null -> 3
            record.schema.getField("currency") != null -> 2
            else -> 1
        }
    }

    private fun handleV1(record: GenericRecord) {
        val orderId = record.get("orderId").toString()
        val totalAmount = record.get("totalAmount") as Double
        // V1 processing logic (use default currency)
        processOrder(orderId, totalAmount, "KRW")
    }

    private fun handleV2(record: GenericRecord) {
        val orderId = record.get("orderId").toString()
        val totalAmount = record.get("totalAmount") as Double
        val currency = record.get("currency")?.toString() ?: "KRW"
        processOrder(orderId, totalAmount, currency)
    }
}

4. Dead Letter Queue and Schema Mismatch Handling

@Component
class SchemaAwareErrorHandler(
    // This template must use a ByteArray value serializer, separate from the Avro producer above
    private val deadLetterProducer: KafkaTemplate<String, ByteArray>
) : CommonErrorHandler {

    override fun handleRecord(
        thrownException: Exception,
        record: ConsumerRecord<*, *>,
        consumer: Consumer<*, *>,
        container: MessageListenerContainer
    ) {
        when (thrownException.cause) {
            is SerializationException -> {
                // Schema mismatch - send to DLQ
                sendToDeadLetter(record, thrownException)
            }
            else -> {
                // Handle other errors
                throw thrownException
            }
        }
    }

    private fun sendToDeadLetter(
        record: ConsumerRecord<*, *>,
        exception: Exception
    ) {
        val headers = record.headers().toMutableList()
        headers.add(RecordHeader("error-reason", exception.message?.toByteArray()))
        headers.add(RecordHeader("original-topic", record.topic().toByteArray()))

        deadLetterProducer.send(
            ProducerRecord(
                "${record.topic()}-dlq",
                null,
                record.key() as? String,
                record.value() as? ByteArray,
                headers
            )
        )
    }
}
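
For the error handler to take effect it has to be registered on the listener container factory. A wiring sketch (bean names and generic types are assumptions):

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory

@Configuration
class KafkaListenerConfig {

    @Bean
    fun kafkaListenerContainerFactory(
        consumerFactory: ConsumerFactory<String, Any>,
        errorHandler: SchemaAwareErrorHandler
    ): ConcurrentKafkaListenerContainerFactory<String, Any> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, Any>()
        factory.setConsumerFactory(consumerFactory)
        factory.setCommonErrorHandler(errorHandler)  // schema mismatches are routed to the DLQ
        return factory
    }
}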

Schema Evolution Checklist

Before Change

  • Check compatibility with the current schema
  • Test compatibility against the Schema Registry (see the sketch after this list)
  • Verify that all Consumers can handle the new schema
  • Prepare a rollback plan
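
The registry compatibility test can run in CI before every deployment, reusing the SchemaService defined earlier. A test sketch (JUnit 5 assumed; the schema path and subject name follow the examples above):

import org.apache.avro.Schema
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import java.io.File

@SpringBootTest
class SchemaCompatibilityTest(
    @Autowired private val schemaService: SchemaService
) {
    @Test
    fun `new order schema stays compatible with registered versions`() {
        val candidate = Schema.Parser().parse(File("src/main/avro/order-created-v2.avsc"))

        // Fails the build if the registry would reject this schema under the subject's compatibility level
        assertTrue(schemaService.checkCompatibility("order-events-value", candidate))
    }
}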

During Change

  • Deploy Consumers first
  • Increase monitoring
  • Shift traffic gradually

After Change

  • Monitor DLQ
  • Check Consumer lag
  • Monitor error rates

Summary

Key principles of schema evolution:

  • Always maintain backward compatibility: new Consumers must be able to handle old events
  • Never reuse field numbers or names: treat deleted field numbers as permanently reserved
  • Require default values: new fields must always include a default
  • Deploy gradually: Consumers first, Producers later
  • Use a Schema Registry: centralize schema management

This concludes the Event-Driven Architecture series. By combining the patterns covered in this series, you can build scalable and maintainable microservices.
