Spring Cloud Stream Kafka Binder 정리
spring cloud stream 3.0 을 기준으로 programming model에 변경이 있었다.
- 3.0 이전 : Imperative(선언적) -> 어노테이션 방식 ex) @StreamListener, @Input, @Output
- 3.0 이후 : Functional style
그러니 functional 형태로 된 예제를 spring cloud stream kafka binder 공식 홈페이지의 것을 참조하여 살펴 볼 것이다. 우선 functional consumer를 만드는 순서는 다음과 같다.
- java.util.function.Consumer를 반환하는 bean 생성
- spring.cloud.stream.functionl.definition 설정
- binding name의 property 설정
1 2 3 4 5 6 7 8 9 10 11 12 | @SpringBootApplication public class SimpleConsumerApplication { @Bean public java.util.function.Consumer<KStream<Object, String>> process() { return input -> input.foreach((key, value) -> { System.out.println("Key: " + key + " Value: " + value); }); } } |
1 | spring.cloud.stream.functionl.definition=process사 |
사용할 bean을 명시 후
1 | spring.cloud.stream.bindings.process-in-0.destination=my_topic
|
binding name을 이용해 destination(=topic)을 설정해 주면 끝이다.
위 설정에서 process-in-0가 binding name인데 binder가 어떤 규칙에 의해 자동으로 생성되는 녀석이다. 규칙은 아래와 같다.
<bean name>-<in | out>-<index>
- bean name는 process
- consumer 임으로 in (producer는 out)
- index는 파라미터 순서를 의미하고 0부터 시작 한다
- 즉, process의 첫번째 파라미터의 값은 my_topic으로부터 온 값이란 의미가 된다
index 이해를 돕기위해 아래 예제를 보면, BiConsumer 임으로 input이 두개가 됨으로 앞에서부터 KStream<String, Long>은 process-in-0가 될것이고 그 뒤 KTable은 process-in-1이 될 것이다.
1 2 3 4 | @Bean public BiConsumer<KStream<String, Long>, KTable<String, String>> process() { return (userClicksStream, userRegionsTable) -> {} } |
binding name의 다른 설정들은 공식 문서를 참고하여 필요한 것들을 찾아 보도록...
multiple output도 아래처럼 가능하며 array 형태가 되고 binding name의 index가 array의 index가 된다. process-out-0, process-out-1, process-out-2 등등.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | @Bean public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() { Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english"); Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french"); Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish"); return input -> input .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) .groupBy((key, value) -> value) .windowedBy(TimeWindows.of(5000)) .count(Materialized.as("WordCounts-branch")) .toStream() .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end())))) .branch(isEnglish, isFrench, isSpanish); } |
댓글
댓글 쓰기