티스토리 뷰

Kafka Streams (window, suppress)

Contents Of Tables

  1. Stateless Transformation
  2. Stateful Transformation
  3. Windowing
  4. Window Final Results

Confluent Streams API 가이드: https://docs.confluent.io/current/streams/developer-guide/dsl-api.html

Stateless Transformation

아래 Java코드를 보자. 아래 코드는 String을 consuming해서, 각 단어가 몇번 나왔는지 counting하는 코드이다.

final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

KStream<String, String> textLines = builder.stream("streams-plaintext-input", Consumed.with(stringSerde, stringSerde));

KTable<String, Long> wordCounts = textLines
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    .groupBy((key, value) -> value)
    .count();
wordCounts.toStream().to("streams-wordcount-output", Produced.with(stringSerde, longSerde));

그런데, 아래 결과를 보면, 뭔가 이상하다. kafka가 total 3번 나왔으므로, kafka 3으로만 출력하지 않고, kafka 1, kafka 2도 같이 출력 되고 있다. 그런데, 왜 이렇게 이상하게 출력할까? Stateless transformation이기 때문이다. 즉 상태(State)를 저장도 안하고 보지 않기 때문이다. 그럼 이제 Stateful transformation을 해볼까?

$ producer --topic streams-plaintext-input
all streams lead to kafka
hello kafka streams
join kafka summit

$ consumer --topic streams-wordcount-output
all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2
join    1
kafka   3
summit  1

Stateful Transformation

stateful trasformation이 뭘까? 간단히 설명하자면 Streams가 작동할때 상태값을 저장한다는 것이다. 즉 n번째 상태값이 n+1번째 결과값에 영향을 미친다는 것이다 그럼 위의 word count앱에 statefull 하게 만들어 보자.

// Use a temporary directory for storing state, which will be automatically removed after the test.
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); 

위와 같이 설정에 한줄만 넣어주면, Statefull transformation이 된다. 그럼 결과값을 보자. kafka가 3으로만 표시된다.

$ consumer --topic streams-wordcount-output
all 1
streams 2
lead 1
to 1
kafka 3
hello 1
summit 1
join 1

Windowing

statefull transformation이 다 좋은데 말이다. 이 statefull은 계속 쭈~~욱 상태를 유지한다. 근데, 난 상태를 1시간만 유지하고 싶다면 어떻게 할까? 예를 들어 에러 갯수를 count하는데, 1시간마다 에러갯수를 counting하고 싶으면 어떻게 할까? 이럴때 쓰는 것이 Windowing이다. windowing은 몇분, 몇시간마다 상태를 초기화 할지 설정하는 방법이다. 아래처럼 windowedBy(1분) 을 쓰면, 1분후에 초기화 된다.

KTable<Windowed<String>, Long> wordCounts = textLines
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    .groupBy((key, value) -> value)
    .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ZERO))
    .count();

final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
wordCounts.toStream().to("streams-wordcount-output", Produced.with(windowedSerde, longSerde));

결과를 보면... 1분안에는 합쳐서 counting되지만, 1분후에는 0으로 초기화 된다. 여기서 중요한 것은 grace(Duration.ZERO) 처럼 0으로 셋팅을 안하면, 응답이 안온다. 1818 grace의 디폴트값이 1시간정도 되서, TEST시 응답이 안온다. 이것땜에 시간 존나많이 날렸다

$ producer --topic streams-plaintext-input "all streams lead to kafka" 
$ producer --topic streams-plaintext-input "all streams lead to kafka" 
...
$ consumer --topic streams-wordcount-output
all 2
streams 2
lead 2
to 2
kafka 2
...
...1분후...
$ producer --topic streams-plaintext-input "all streams lead to kafka" 
$ consumer --topic streams-wordcount-output
all 1
streams 1
lead 1
to 1
kafka 1

Window Final Results

1시간 안에 100개의 record가 발생하는데, 이 record들 comsume되자 마자 produce되지 않고, 1시간후에 Final Results produce하고 싶을때가 있다. 이럴때 쓰는 API가 suppress() 이다.

KTable<Windowed<String>, Long> wordCounts = textLines
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    .groupBy((key, value) -> value)
    .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ZERO))
    .count()
    .suppress(Suppressed.with(Serdes.String, Serdes.Long));

final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
wordCounts.toStream().to("streams-wordcount-output", Produced.with(windowedSerde, longSerde));

아래 결과에서 보는것 처럼, 1분후에야 결과가 나온다.

$ producer --topic streams-plaintext-input "all streams lead to kafka" 
$ producer --topic streams-plaintext-input "all streams lead to kafka" 
...
...1분후
$ consumer --topic streams-wordcount-output
all 2
streams 2
lead 2
to 2
kafka 2

'Kafka & Elasticsearch' 카테고리의 다른 글

Text and Keyword 타입  (0) 2020.04.29
Pandas  (0) 2020.03.05
Elasticsearch scaling down  (0) 2019.08.04
Kafka Fail-over (cluster)  (0) 2019.01.07
토픽 삭제 (Topic delete)  (0) 2019.01.06
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/05   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함