Spring Cloud Stream Kafka Binder 정리

 spring cloud stream 3.0 을 기준으로 programming model에 변경이 있었다.

  • 3.0 이전 : Imperative(선언적) -> 어노테이션 방식 ex) @StreamListener, @Input, @Output
  • 3.0 이후 : Functional style

그러니 functional 형태로 된 예제를 spring cloud stream kafka binder 공식 홈페이지의 것을 참조하여 살펴 볼 것이다. 우선 functional consumer를 만드는 순서는 다음과 같다.

  1. java.util.function.Consumer를 반환하는 bean 생성
  2. spring.cloud.stream.functionl.definition 설정
  3. binding name의 property 설정

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
@SpringBootApplication
public class SimpleConsumerApplication {

    @Bean
    public java.util.function.Consumer<KStream<Object, String>> process() {

        return input ->
                input.foreach((key, value) -> {
                    System.out.println("Key: " + key + " Value: " + value);
                });
    }
}
위처럼 bean 생성하고
1
spring.cloud.stream.functionl.definition=process사
사용할 bean을 명시 후
1
spring.cloud.stream.bindings.process-in-0.destination=my_topic
binding name을 이용해 destination(=topic)을 설정해 주면 끝이다.

위 설정에서 process-in-0가 binding name인데 binder가 어떤 규칙에 의해 자동으로 생성되는 녀석이다. 규칙은 아래와 같다.
<bean name>-<in | out>-<index>
  • bean name는 process
  • consumer 임으로 in (producer는 out)
  • index는 파라미터 순서를 의미하고 0부터 시작 한다
  • 즉, process의 첫번째 파라미터의 값은 my_topic으로부터 온 값이란 의미가 된다

index 이해를 돕기위해 아래 예제를 보면,  BiConsumer 임으로 input이 두개가 됨으로 앞에서부터 KStream<String, Long>은 process-in-0가 될것이고 그 뒤 KTable은 process-in-1이 될 것이다.
1
2
3
4
@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
    return (userClicksStream, userRegionsTable) -> {}
}
binding name의 다른 설정들은 공식 문서를 참고하여 필요한 것들을 찾아 보도록...

multiple output도 아래처럼 가능하며 array 형태가 되고 binding name의 index가 array의 index가 된다. process-out-0, process-out-1, process-out-2 등등.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {

    Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
    Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
    Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

    return input -> input
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
            .groupBy((key, value) -> value)
            .windowedBy(TimeWindows.of(5000))
            .count(Materialized.as("WordCounts-branch"))
            .toStream()
            .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
                    new Date(key.window().start()), new Date(key.window().end()))))
            .branch(isEnglish, isFrench, isSpanish);
}


댓글

이 블로그의 인기 게시물

[Protocol] WIEGAND 통신

Orange for Oracle에서 한글 깨짐 해결책

[URL] 대소문자를 구분하나?