5 분 소요


  • 스트림모듈은 아직까지는 JPA나 module-core의 엔티티를 사용 할 일이 없었다.
  • 스트림 모듈을 세팅하면서 Kafka와 어떻게 SpringBoot가 커넥션을 맺고, Consumer와 Producer가 어떻게 세팅되는지 정리해보려고한다.
  • Kafka는 로컬 카프카도 괜찮고, 서버에 띄워진 Kafka, 혹은 Docker, Confluent등등 Kafka를 지원하는 솔루션이면 모두 사용 할 수 있다.
  • 본문은 Kafka가 모두 준비되어있다는 가정하에 작성하려고한다.

module-stream 만들기

  • 이전시간에 만들었었던 module-stream에서 이어서 작업하려고 한다.
  • Kafka는 module-stream어플리케이션에서만 사용할 것 이기 때문에 module-stream에만 의존성을 추가하면 된다.
  • SpringBoot Application 내에서 Consumer, Producer를 만들자

module-stream 설정하기

build.gradle.kts 설정

  • Kafka는 module-stream어플리케이션에서만 사용할 것 이기 때문에 module-streambuild.gradle.kts에 의존성을 추가 해 준다

    plugins{
    
    }
    
    dependencies{
        implementation("org.springframework.kafka:spring-kafka")
    }
    

Springboot Application Class 작성

  • Springboot에서 사용 할 것 이기 때문에 @SpringBootApplication를 적용할 클래스를 만들어 준다
  • 마찬가지로 패키지이름을 com.wool 로 작성하고, 하위에 ModuleStreamApplication.kt 를 생성하여 아래의 소스코드를 적어준다

    package com.wool
    
    import org.springframework.boot.autoconfigure.SpringBootApplication
    import org.springframework.boot.runApplication
    
    @SpringBootApplication
    class ModuleStreamApplication
    
    fun main(args: Array<String>) {
        runApplication<ModuleStreamApplication>(*args)
    }
    

application.yml 작성

  • Springboot에서 사용 할 것 이기 때문에 application.yml을 작성해 준다
  • resources 패키지 하위에 application.yml를 작성한 후 카프카 설정값들을 적어준다
  • 우선은 Produce, Consume테스트만 진행 할 것이기 때문에 모듈작업과는 조금은 달라보일 수 있다

    spring:
      jackson:
        serialization:
          fail-on-empty-beans: false
    
      kafka:
        properties:
          session:
            timeout:
              ms: 45000
          sasl:
            mechanism: PLAIN
            jaas:
              config: {}
          security:
            protocol: SASL_SSL
          bootstrap:
            servers: {}
    

Kafka 연결하기

  • order와 관련된 데이터를 구독하는 Consumer Class를 만들어 보자
  • application.yml에 설정값을 잘 넣었다면 Spring이 자동으로 카프카와 연동시켜준다

OrderConsumer Class 작성

  • com.wool패키지 하위에 consumer패키지를 만들어 OrderConsumer 클래스를 생성 해 준다

    package com.wool.consumer
    
    import org.springframework.kafka.annotation.KafkaListener
    import org.springframework.stereotype.Service
    
    
    @Service
    class OrderConsumer {
    
        @KafkaListener(topics = ["order"], groupId = "order-consumer")
        fun consume(message: String){
            println("###########################")
            println(message)
            println("###########################")
        }
    }
    
    • @KafkaListener를 사용하여 order라는 토픽을 구독하고, order-consumer라는 그룹아이디를 가진 Consumer를 만들었다
    • @Service어노테이션을 붙여 SpringBoot에서 Bean으로 등록하도록 한다

OrderProducer Class 작성

  • com.wool패키지 하위에 producer패키지를 만들어 OrderProducer 클래스를 생성 해 준다

    package com.wool.producer
    
    import com.fasterxml.jackson.databind.ObjectMapper
    import com.wool.controller.dto.OrderProduceDto
    import org.springframework.kafka.core.KafkaTemplate
    import org.springframework.stereotype.Service
    
    @Service
    class OrderProducer(
    private val kafkaTemplate: KafkaTemplate<String, String>
    ) {
    
        final val KAFKA_ORDER_TOPIC: String = "order"
    
        fun sendOrderMessage(message: OrderProduceDto){
            // OrderProduceDto를 json serialize
            val obm:ObjectMapper = ObjectMapper()
            val jsomMessage = obm.writeValueAsString(message)
    
            kafkaTemplate.send(KAFKA_ORDER_TOPIC, jsomMessage)
        }
    
    }
    
    • @Service어노테이션을 붙여 SpringBoot에서 Bean으로 등록하도록 한다
    • KafkaTemplate을 주입받아 produce메소드를 만들었다
    • kafkaTemplate.send(KAFKA_ORDER_TOPIC, jsomMessage)를 통해 order라는 토픽에 메시지를 보낼 수 있다

ProducerController 작성

  • Stream Application 이 실행 될 때 마다 주기적으로 message를 Produce 해 주면 좋지만, 주문을 받는 일을 한 후 나온 데이터 이기 때문에 Producer 를 호출 해 주는 Controller를 만들어주자
  • Controller 를 하나 만들고, api가 호출 될 때 OrderProducer를 호출하도록 작성하자
  • com.wool.controller 패키지에 dto와 controller를 만들어주자

OrderProduceDto 작성

package com.wool.controller.dto

data class OrderProduceDto(
    val orderStoreName: String,
    val orderStoreAddress: String,
    val orderItem: String,
    val orderPrice: String,
    val customerId: Int,
)
  • produce 시에 “앱” 혹은 “웹” 에서 사용자가 이미 로그인 되어있고, 자신의 id 값을 보내 줄 수 있다는 가정 하에 만들었다

ProducerController 작성

package com.wool.controller

import com.wool.controller.dto.OrderProduceDto
import com.wool.producer.OrderProducer
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RestController


@RestController
class ProducerController(
    private val orderProducer: OrderProducer
) {

    @PostMapping("/order-produce")
    fun produceOrder(@RequestBody orderDto: OrderProduceDto) {
        orderProducer.sendOrderMessage(orderDto)
    }

}

마무리

  • Springboot Application 을 실행하면, Consumer가 kafka를 바라보고, order 토픽을 구독한 채로 있다
  • http://localhost:8080/order-producePOST요청을 보내면, OrderProducerorder토픽에 메시지를 보내고, Consumer가 메시지를 받아서 출력한다
  • multimodule과 크게 상관없는 내용들이었지만, 다음 포스트에서 멀티모듈의 이점을 살리기 위해 api 주문요청 -> 주문 produce -> 주문 consume -> 주문 저장 -> api조회 의 흐름으로 가보자
  • 굳이 안태워도 될 것 같은 흐름이지만, 멀티모듈을 사용한다…. 생각하고 만들어보려한다
  • The stream module hasn’t had a need to use JPA or module-core entities yet.
  • While setting up the stream module, I’m going to document how SpringBoot connects with Kafka and how Consumer and Producer are configured.
  • For Kafka, you can use local Kafka, server-deployed Kafka, Docker, Confluent, or any solution that supports Kafka.
  • This post is written assuming Kafka is already prepared.

Creating module-stream

  • We’ll continue working from the module-stream we created last time.
  • Since Kafka will only be used in the module-stream application, we only need to add dependencies to module-stream.
  • Let’s create Consumer and Producer within the SpringBoot Application

Configuring module-stream

build.gradle.kts Configuration

  • Since Kafka will only be used in the module-stream application, add dependencies to module-stream’s build.gradle.kts

    plugins{
    
    }
    
    dependencies{
        implementation("org.springframework.kafka:spring-kafka")
    }
    

Writing Springboot Application Class

  • Since we’ll use it in Springboot, create a class to apply @SpringBootApplication
  • Similarly, name the package com.wool, and create ModuleStreamApplication.kt underneath with the following source code

    package com.wool
    
    import org.springframework.boot.autoconfigure.SpringBootApplication
    import org.springframework.boot.runApplication
    
    @SpringBootApplication
    class ModuleStreamApplication
    
    fun main(args: Array<String>) {
        runApplication<ModuleStreamApplication>(*args)
    }
    

Writing application.yml

  • Since we’ll use it in Springboot, write application.yml
  • Write application.yml under the resources package and add Kafka configuration values
  • For now, since we’re only doing Produce and Consume testing, it may look a bit different from module work

    spring:
      jackson:
        serialization:
          fail-on-empty-beans: false
    
      kafka:
        properties:
          session:
            timeout:
              ms: 45000
          sasl:
            mechanism: PLAIN
            jaas:
              config: {}
          security:
            protocol: SASL_SSL
          bootstrap:
            servers: {}
    

Connecting to Kafka

  • Let’s create a Consumer Class that subscribes to order-related data
  • If you’ve properly added configuration values in application.yml, Spring will automatically connect to Kafka

Writing OrderConsumer Class

  • Create a consumer package under the com.wool package and create the OrderConsumer class

    package com.wool.consumer
    
    import org.springframework.kafka.annotation.KafkaListener
    import org.springframework.stereotype.Service
    
    
    @Service
    class OrderConsumer {
    
        @KafkaListener(topics = ["order"], groupId = "order-consumer")
        fun consume(message: String){
            println("###########################")
            println(message)
            println("###########################")
        }
    }
    
    • Using @KafkaListener, we created a Consumer that subscribes to the order topic with group ID order-consumer
    • Added the @Service annotation to register it as a Bean in SpringBoot

Writing OrderProducer Class

  • Create a producer package under the com.wool package and create the OrderProducer class

    package com.wool.producer
    
    import com.fasterxml.jackson.databind.ObjectMapper
    import com.wool.controller.dto.OrderProduceDto
    import org.springframework.kafka.core.KafkaTemplate
    import org.springframework.stereotype.Service
    
    @Service
    class OrderProducer(
    private val kafkaTemplate: KafkaTemplate<String, String>
    ) {
    
        final val KAFKA_ORDER_TOPIC: String = "order"
    
        fun sendOrderMessage(message: OrderProduceDto){
            // JSON serialize OrderProduceDto
            val obm:ObjectMapper = ObjectMapper()
            val jsomMessage = obm.writeValueAsString(message)
    
            kafkaTemplate.send(KAFKA_ORDER_TOPIC, jsomMessage)
        }
    
    }
    
    • Added the @Service annotation to register it as a Bean in SpringBoot
    • Injected KafkaTemplate and created a produce method
    • Through kafkaTemplate.send(KAFKA_ORDER_TOPIC, jsomMessage), we can send messages to the order topic

Writing ProducerController

  • It would be nice to produce messages periodically whenever the Stream Application runs, but since the data comes after receiving orders, let’s create a Controller that calls the Producer
  • Let’s create a Controller and write it to call OrderProducer when the API is called
  • Create dto and controller in the com.wool.controller package

Writing OrderProduceDto

package com.wool.controller.dto

data class OrderProduceDto(
    val orderStoreName: String,
    val orderStoreAddress: String,
    val orderItem: String,
    val orderPrice: String,
    val customerId: Int,
)
  • This was created assuming that during produce, the user is already logged in from the “app” or “web” and can send their id value

Writing ProducerController

package com.wool.controller

import com.wool.controller.dto.OrderProduceDto
import com.wool.producer.OrderProducer
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RestController


@RestController
class ProducerController(
    private val orderProducer: OrderProducer
) {

    @PostMapping("/order-produce")
    fun produceOrder(@RequestBody orderDto: OrderProduceDto) {
        orderProducer.sendOrderMessage(orderDto)
    }

}

Wrapping Up

  • When you run the Springboot Application, the Consumer watches Kafka and remains subscribed to the order topic
  • When you send a POST request to http://localhost:8080/order-produce, OrderProducer sends a message to the order topic, and the Consumer receives and prints the message
  • Although this content wasn’t directly related to multi-module, in the next post let’s follow the flow of api order request -> order produce -> order consume -> save order -> api query to take advantage of multi-module benefits
  • It’s a flow that may not seem necessary, but let’s try building it with multi-module in mind

댓글남기기