기본 콘텐츠로 건너뛰기

[번역] 카프카 컨슈머 소개: 새 아파치 카프카 0.9 컨슈머 클라이언트 시작하기

 카프카 컨슈머 클라이언트 0.9.0 에 대한 글이 있어서 학습차 요약정리 해두기로 했음. 발번역이고 의역과 생략된 내용 있으니,,, ((((( ' ')

원문: http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0.9-consumer-client

원작자: Jason Gustafson


 애초에 카프카는 스칼라로 만들어진 프로듀서와 컨슈머 클라이언트를 제공했다. 시간이 지나면서 이 API에 많은 제약이 있음을 깨닫게 되었다. 예를 들어, 컨슈머 그룹을 지원하고 Failover 처리하는 ‘high-level’ 컨슈머 API가 있지만 더 복잡한 시나리오를 지원하지 못했다. 그리고 풀 컨트롤을 제공하는 “simple” 컨슈머 클라이언트가 있지만 사용자가 Failover와 에러 처리를 직접해야 했다. 그래서 다양한 사례를 처리하기 위해 클라이언트를 다시 디자인하기 시작했다.

 첫 단계로 0.8.1에 Producer API를 다시 작성하고, 두 번째 단계로 새(new) 컨슈머 API 소개하는 0.9 배포가 최근에 완료되었다. 카프카에서 제공하는 새(new) 그룹 코디네이션 프로토콜 기반으로 새 컨슈머를 개발하면 아래와 같은 이점을 가질 수 있다.

  • 깔끔하게 통합된 API: 새 컨슈머는 예전의 “심플” 하고 “고수준" 컨슈머 클라이언트의 두가지 능력을 결함하고,  자신만의 소비 전략을 만들기 위해 그룹 코디네이션과  저수준의 접근성 두가지를 결합한다.
  • 의존성 감소: 새 컨슈머는 순수 자바로 작성되었다. 스칼라 런타임이나 주키퍼에 의존성이 없어서 프로젝트에 포함 시킬 수 있는 더 가벼운 라이브러리를 만들 수 있다.
  • 향상된 보안: 카프카 0.9에 구현된 보안확장은 새 컨슈머에만 지원된다.
  • 또한, 새 컨슈머는 컨슈머 프로세스 그룹의 Fault-Tolerant를 관리하기 위해 프로토콜 세트(set)를 제공한다. 이전에 이 기능은 주키퍼와 통신하는 무거운 구현었다. 로직이 복잡해 다른 언어에서 전체 기능구현이 어려웠으나, 새 프로토콜의 소개로 현재는 구현이 쉬워졌다. 사실, 이미 C 클라이언트는 이 프로토콜로 (구현을) 옮겼다.

 새 컨슈머는 새로 디자인된 API를 사용하고 새 코디네이션 프로토콜을 사용하지만, 콘셉트는 기본적으로 다르지 않아서 이전(0.9 이전) 컨슈머에 익숙한 사용자가 이해하는데 문제가 없다. 그러나 그룹 관리와 스레딩 모델에 미묘한 디테일이 있어 주의가 필요하다. 이 튜토리얼의 목적은 새 컨슈머의 기본 사용법을 다루고 이 사용법들의 세부적인 내용을 설명하고자 함이다.

유의할 점: 글을 쓰는 시점에 새 컨슈머는 안전성이란 용어에서 여전히 “베타”이고 몇 가지 중요한 버그가 0.9.0 브랜치에서 수정 되었다. 0.9.0 브랜치에서 테스트할 것을 권고하고 여전히 문제가 있으면 메일 리스트나 지라로 리포트를..


Getting Started


 코드를 보기 전에 기본 콘셉트를 살펴보면, 카프카에서 각 토픽은 파티션으로 나누어진다. 그리고 파티션은 로그(log)의 집합이다. 프로듀서는 이 로그들 끝에 쓰고 컨슈머는 자기 페이스에 따라 로그를 읽는다. 카프카는 컨슈머 그룹 간에 파티션을 분산해서 토픽 소비(consumption)를 스케일(scale)한다. 컨슈머 그룹은 공통 그룹 식별자(identifier)를 공유하는 세트이다. 아래 그림은 세 개의 파티션을 가진 단일 토픽과 두 멤버를 가진 컨슈머 그룹을 보여준다. 토픽의 각 파티션은 정확히 그룹 내의 한 멤버에게 할당된다.



 옛날 컨슈머는 그룹 관리를 위해 주키퍼에 의존하는 반면, 새 컨슈머는 카프카 자체로 만든 그룹 코디네이션 프로토콜을 사용한다. 각 그룹의 브로커들 중에 하나가 코디네이터로 선택된다. 코디네이터는 그룹의 상태를 관리할 책임이 있고, 코디네이터의 주된 일은 새 멤버가 오고 갈 때, 그리고 토픽 메타데이터가 변할 때 파티션 할당을 중재 하는것이다. 파티션 재할당 동작은 그룹 재균형(rebalance)으로 알려져 있다.

 그룹이 처음 초기화될 때, 컨슈머는 보통 각 파티션의 가장 처음 또는 가장 마지막 오프셋 둘 중 하나에서 읽기 시작한다. 그런 후 각 파티션 로그에서 메시지가 순차적으로 읽힌다. 컨슈머는 성공적으로 처리된 메시지의 오프셋을 커밋(commit) 하는데, 예를 들어, 아래 그림에서 컨슈머의 위치는 오프셋이 6이고 마지막 커밋된 오프셋은 1이된다.


 파티션이 그룹의 다른 컨슈머에 재할당 될 때, 초기 위치는 마지막에 커밋된 오프셋으로 세팅된다. 위 예제에서 컨슈머가 갑자기 크래시(crash) 나면, 파티션을 인수한 그룹 멤버는 오프셋 1부터 소모를 시작한다. 이 경우, 크래시가 생긴 컨슈머의 위치인 6까지 메시지를 재처리 해야한다.

 또한, 이 다이어 그램은 로그에서 두가지 다른 의미 있는 위치를 보여준다. "Log End Offset"은 로그에 마지막으로 쓰인 오프셋이고, "High Watermark"는 모든 복제 로그에 성공적으로 카피된 최종 메시지의 오프셋이다. 컨슈머의 관점에서 알아야 할것은 단지 "High Watermark"까지만 읽어 올라갈 수 있다는 것이다. 이것은 컨슈머가 복제되지 않은 메시지가 읽히는 것을 방지한다. 복제되지 않은 메시지는 소실될 수 있다.

Configuration and Initialization


 컨슈머를 시작하려면, kafka-client 의존성을 프로젝트에 추가해야한다.

<dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>0.9.0.0-cp1</version>
</dependency>

 컨슈머는 다른 카프카 클라이언트와 마찬가지로 프로퍼티를 사용해서 생성된다. 아래 예에서는 컨슈머 그룹을 사용하기 위해 필요한 최소 설정을 제공하고 있다.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

 이전 컨슈머와 프로듀서처럼, 컨슈머가 나머지 클러스터를 찾을 수 있게 초기 브로커 리스트를 설정할 필요가 있지만, 클러스터 내 모든 서버를 필요로 하지 않는다. – 클라이언트는 이 리스트에서 살아있는 전체 브로커 세트를 결정한다. 여기에서는 브로커가 로컬(localhost)에서 동작하는 것으로 가정했고, 컨슈머가 어떻게 메시지 키와 값을 역직렬화 하는지 이야기할 필요가 있다. 마지막으로, 컨슈머 그룹에 가입하려면 그룹 ID를 설정해야 한다. 이 튜토리얼을 진행하면서 더 많은 설정을 소개 할 것이다.


Topic Subscription


소비를 시작하려면, 처음에 어플리케이션이 읽으려고 하는 토픽을 구독해야한다. 아래 예에서는 “foo”와 “bar” 토픽을 구독하고 있다.

consumer.subscribe(Arrays.asList("foo", "bar"));

 구독 후에, 컨슈머는 파티션을 할당하기 위해 나머지 그룹을 조정 할 수 있다. 이 과정은 데이터를 소모하기 시작할 때 모두 자동으로 된다. 뒷부분에서 어떻게 assign API를 사용해서 수동으로 파티션을 할 수 있는지 보여 주겠지만, 자동과 수동 할당을 섞어 사용하는 것은 불가능함을 염두해야 한다.

 구독 방법은 증분(incremental)이 아니다 (치환이 되지 누적되어 쌓이지 않는다): 반드시 소비하려는 전체 토픽 리스트를 포함해야 한다. 언제든 구독했던 토픽 세트를 변경할 수 있다. – 이전에 구독했던 토픽은 subscribe를 호출할 때 새 리스트로 치환된다.


Basic Poll Loop


 컨슈머는 잠재적으로 여러 브로커에 걸쳐 흩어진 여러 토픽의 파티션에서 데이터를 병렬로 패치할 필요가 있어서, 유닉스의 select나 poll과 유사한 API 스타일을 사용한다: 일단 토픽이 등록되면, 모든 future 코디네이션, 재균형, 그리고 데이터 패치가 이벤트 루프에서 호출한 단일 poll 호출을 통해 떨어진다. 이 스타일은 싱글 스레드에서 IO를 처리할 수 있는 간단하고 효과적인 구현이 되게 한다.

 토픽 구독을 시작한 후, 파티션을 할당 받고 데이터 패치를 하려면 이벤트 루프를 실행해야 한다. 복잡하게 들리지만, 필요한 것은 루프로 poll을 호출 하고 컨슈머는 나머지를 처리하게 된다. 각 poll 호출은 파티션에 할당된 메시지 세트를 리턴한다. 예를 들어, 아래는 메세지가 오는 대로 오프셋과 패치된 레코드값을 출력하는 기본 poll 루프다.

try {
 while (running) {
  ConsumerRecords<String, String> records = consumer.poll(1000);
  for (ConsumerRecord<String, String> record : records)
   System.out.println(record.offset() + ": " + record.value());
 }
} finally {
 consumer.close();
}

 poll API는 현재 포지션에 기반해 패치한 레코드를 반환한다. 그룹이 처음 생성될 때, reset 정책에 따라 포지션이 세팅된다 (보통 각 파티션의 최초 또는 마지막의 오프셋임). 한번 컨슈머가 오프셋을 커밋하기 시작하면, 포지션은 나중에 재균형 작업으로 인해 마지막 커밋된 오프셋으로 리셋된다. Poll 할 때 전달된 파라메터는 (컨슈머가 현재 포지션에 있는) 레코드를 기다릴 때 사용하는 최대 블럭 시간이다. 컨슈머는 레코드가 가용하다면 즉각 리턴을 하지만, 가용한 레코드가 없다면 리턴전에 정해진 타임아웃 동안 기다린다.

 컨슈머는 컨슈머 자신의 스레드에서 동작하도록 디자인되었고, 스레드 세이프(thread safe) 하지 않다. 이 예제에서는, 사용되는 플래그는 애플리케이션이 셧다운 되어 이벤트 루프를 빠져나올 때 사용된다. 플래그가 다른 스레드에서 false로 바뀌면, 루프는 poll 리턴을 하자마자 끝날것이고 애플리케이션은 모든 레코드가 반환되면 처리를 마친다.

 컨슈머는 작업이 끝나면 항상 close 돼야 한다. 사용한 소켓을 정리하는 것뿐만 아니라, 컨슈머가 코디네이터에게 그룹에서 떠난다는 것을 알린다.

 이 예제는 컨슈머가 셧다운 할 때 너무 오래 지연되지 않게 상대적으로 짧은 타임아웃을 사용했다. 대안으로, 긴 타임아웃을 사용하고 wakeup API를 사용해 루프를 탈출할 수도 있다.

try {
 while (true) {
  ConsumerRecords<String, String> records 
   = consumer.poll(Long.MAX_VALUE);
  
  for (ConsumerRecord<String, String> record : records)
   System.out.println(
    record.offset() + :  + 
    record.value()
   );
  }
} catch (WakeupException e) {
 // ignore for shutdown
} finally {
 consumer.close();
}

 타임아웃을 Long.MAX_VALUE로 변경했다. 기본적으로 컨슈머는 다음 레코드가 반환 될 때 까지 무기한 블록(block) 됨을 의미한다. 이전 예제같이 플래그를 세팅하는 대신, 스레드 셧다운 트리거가 동작 중인 poll을 인트럽트 하는 consumer.wakeup() 호출해서 WakeupException을 발생시킬 수 있다. 이 wakeup API는 다른 스레드에서 사용하기 안전하다 (스레드 세이프 하다). 주의할 것은 동작 중인 poll이 없다면, 다음 호출에서 exception이 발생한다. 이 예제에서는, 예외가 전파되는 것을 방지하기 위해 catch 하고 있다.

Putting it all Together


 다음 예제는, 컨슈머를 초기화, 토픽 리스트를 구독, 그리고 외부에서 셧다운 까지 poll 루프로 무한 실행하는 간단한 Runnable 태스크다.

public class ConsumerLoop implements Runnable {
 private final KafkaConsumer<String, String> consumer;
 private final List<String> topics;
 private final int id;

 public ConsumerLoop(int id,
                     String groupId,
                     List<String> topics) {
  this.id = id;
  this.topics = topics;
  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092");
  props.put(group.id, groupId);
  props.put(key.deserializer, StringDeserializer.class.getName());
  props.put(value.deserializer, StringDeserializer.class.getName());
  this.consumer = new KafkaConsumer<>(props);
 }

 @Override
 public void run() {
  try {
   consumer.subscribe(topics);

   while (true) {
    ConsumerRecords<String, String> records 
     = consumer.poll(Long.MAX_VALUE);
    for (ConsumerRecord<String, String> record : records) {
     Map<String, Object> data = new HashMap<>();
     data.put("partition", record.partition());
     data.put("offset", record.offset());
     data.put("value", record.value());
     System.out.println(this.id + ": " + data);
    }
   }
  } catch (WakeupException e) {
   // ignore for shutdown
  } finally {
   consumer.close();
  }
 }

 public void shutdown() {
  consumer.wakeup();
 }
}

 이 예제를 테스트 하려면, 카프카 브로커0.9.0.0 릴리즈와 소비할 수 있는 문자열 데이터가 있는 토픽이 필요하다. 문자열 데이터를 만드는 가장 쉬운 방법은 kafka-verifiable-producer.sh 스크립트를 사용하면 된다. 로컬 호스트에서 동작하는 싱글 카프카 브로커와 주키퍼를 두고, 배포된 카프카 루트에서 아래를 실행하면 된다.

bin/kafka-topics.sh --create --topic consumer-tutorial --replication-factor 1 --partitions 3 --zookeeper localhost:2181

bin/kafka-verifiable-producer.sh --topic consumer-tutorial --max-messages 200000 --broker-list localhost:9092

 그런 후, 세 멤버로 컨슈머 그룹을 셋업하는 작은 드라이버를 생성하고, 모두 금방 생성한 같은 토픽을 구독하게 한다.

public static void main(String[] args) { 
 int numConsumers = 3;
 String groupId = "consumer-tutorial-group"
 List<String> topics = Arrays.asList("consumer-tutorial");
 ExecutorService executor = Executors.newFixedThreadPool(numConsumers);

 final List<ConsumerLoop> consumers = new ArrayList<>();
 for (int i = 0; i < numConsumers; i++) {
  ConsumerLoop consumer = new ConsumerLoop(i, groupId, topics);
  consumers.add(consumer);
  executor.submit(consumer);
 }

 Runtime.getRuntime().addShutdownHook(new Thread() {
   @Override
   public void run() {
    for (ConsumerLoop consumer : consumers) {
     consumer.shutdown();
    } 
    executor.shutdown();
    try {
     executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
     e.printStackTrace;
    }
   }
 });
}

 이 예는 executor에 세 개의 runnable 컨슈머를 submit 한다. 각 스레드에는 데이터를 받는 것을 (구분해서) 볼 수 있게 독립된 아이디가 주어진다. 셧다운 훅은 프로세스가 멈출 때 호출되며, wakeup을 사용하는 세 개의 스레드를 멈추고 스레드가 셧다운 될 때까지 기다린다. 실행해 보면, 모든 스레드에서 데이터를 볼 수 있다.

2: {partition=0, offset=928, value=2786}
2: {partition=0, offset=929, value=2789}
1: {partition=2, offset=297, value=891}
2: {partition=0, offset=930, value=2792}
1: {partition=2, offset=298, value=894}
2: {partition=0, offset=931, value=2795}
0: {partition=1, offset=278, value=835}
2: {partition=0, offset=932, value=2798}
0: {partition=1, offset=279, value=838}
1: {partition=2, offset=299, value=897}
1: {partition=2, offset=300, value=900}
1: {partition=2, offset=301, value=903}
1: {partition=2, offset=302, value=906}
1: {partition=2, offset=303, value=909}
1: {partition=2, offset=304, value=912}
0: {partition=1, offset=280, value=841}
2: {partition=0, offset=933, value=2801}

 아웃풋은 세 개의 파티션에서 소비한 메시지를 보여준다. 각 파티션은 스레드 중 하나에 할당되어 각 파티션 내에서 기대한 대로 증가하는 오프셋을 볼 수 있다. 커멘드 라인이나 IDE에서 Ctrl-C로 프로세스를 셧다운 가능하다.

Consumer Liveness


 컨슈머가 컨슈머 그룹멤버가 될때, 컨슈머는 구독하는 토픽의 파티션에 부분집합(subset)으로 할당된다. 기본적으로 이 파티션에서는 그룹 잠금(lock)이 적용되어 잠금이 유지되는 한, 그룹의 다른 멤버는 읽기를 할 수 없다. 컨슈머가 살아 있을 때가 중복 소비를 피할 수 있는 유일한 방법이다(이것이 정확히 원하는 상황임). 그렇지만 컨슈머가 기계적인 이슈나 애플리케이션의 실패로 죽는다면, 살아있는 멤버에게 다시 파티션이 할당되게 잠금을 풀어야 한다.

 카프카의 그룹 코디네이션 프로토콜은 하트빗(heartbeat) 메커니즘으로 이 문제를 다룬다. 모든 재균형 후에는 현세대의 모든 멤버는 그룹 코디네이터에게 주기적인 하트빗을 보내기 시작한다. 코디네이터가 하트빗을 지속적으로 받는 한, 멤버는 살아 있다고 가정되며, 하트빗을 받을 때 코디네이터는 타이머를 시작하거나 초기화한다. 만약 시간이 경과 되고 더이상 하트빗이 없다면, 코디네이터는 멤버를 죽은 것으로 마킹한다. 그리고 파티션을 재할당하기 위해 그룹에 죽은 멤버외 나머지 멤버에게 다시 조인해야 한다는 신호를 보낸다. 타이머의 지속 기한은 세션 타임아웃으로 알려져 있다. 그리고 클라이언트에 session.timeout.ms로 설정할 수 있다.

props.put("session.timeout.ms", "60000");

 머신이나 애플리케이션이 크래시 되거나, 컨슈머와 코디네이터가 네트워크 파티션 때문에 서로 고립된다면, 세션 타임아웃으로 (=세션 타임아웃이 지나면) 잠금을 풀리게 한다. 그러나, 애플리케이션 실패를 일반적으로 처리 하기엔 문제가 있다. 단순히, 컨슈머가 계속해서 코디네이터에게 하트빗을 보내는 것이 애플리케이션이 살아있음을 의미하는 필수조건은 아니기 때문이다.

 컨슈머의 poll 루프는 이 문제를 처리하기 위해 디자인되었다. 모든 네트워크 IO는 poll을 호출할 때나 다른 블록킹 API가 호출될 때 포어그라운드(foreground)에서 처리된다. 컨슈머는 어떠한 백그라운드 스레드도 사용하지 않는다. 이것은 하트빗은 poll 호출 때만 코디네이트에게 보내 진다는 의미다. 만약 애플리케이션이 poll을 멈춘다면 (코드 처리가 예외를 던지던지, 다운스트림 시스템이 크래시 나든지 등), 보내는 하트빗이 없어지게 되고 세션 타임아웃도 경과되어 그룹은 재균형된다.

 이 방법의 유일한 문제는 컨슈머의 메시지 처리가 세션 타임아웃보다 더 오래 걸릴때, 비논리적인 재균형이 발생할 수 있다는 것이다. 그래서 세션 타임아웃을 충분히 크게 설정해야 한다. 기본값은 30초다. 그러나 분단위로 높게 설정하는 것은 합리적이지 않다. 긴 세션 타임아웃의 불리한 점은 진짜 컨슈머 크래시를 발견해서 코디네이트 하기까지 시간이 오래 걸린다는 것이다.

Delivery Semantics


 컨슈머 그룹이 처음 생성될 때, 초기 오프셋은 auto.offset.reset 설정에 정의된 정책에 따라 세팅된다. 한번 컨슈머가 처리를 하기 시작하면, 애플리케이션은 필요에 따라 오프셋을 규칙적으로 커밋한다. 차후 모든 재균형 후에, 포지션은 그룹 내 파티션에서 마지막으로 커밋된 오프셋으로 세팅된다. 컨슈머가 성공적으로 처리한 메시지 오프셋을 커밋하기 전에 크래시 되면, 다른 컨슈머가 다시 작업을 한다. 오프셋을 좀 더 빈번하게 커밋하면 크래시가 생겨도 중복을 적게 보게 된다.

 이제까지 예제에서는, 자동 커밋 정책이 켜져 있는 걸로 가정했다. enable.auto.commit 세팅이 true(기본 설정)일때, 컨슈머는 auto.commit.interval.ms 값에 따라 자동으로 오프셋 커밋을 주기적으로 발생시킨다. 커밋 주기를 줄여서, 컨슈머가 크래시 이벤트에서 반드시 하게 되는 재처리(re-processing) 양을 제한할 수 있다.

 컨슈머의 커밋 API를 사용 하려면 먼저, 자동 커밋을 꺼야한다. 컨슈머 설정에서 enable.auto.commit을 false로 두면 된다.

props.put("enable.auto.commit", "false");

 커밋 API 자체는 사용이 미미하지만, 중요한 것은 poll 루프에서 어떻게 사용하느냐는 것이다. 커밋을 수동으로 하는 가장 쉬운 방법은 동기 커밋 API 사용이다.

try {
 while (running) {
  ConsumerRecords<String, String> records 
   = consumer.poll(1000);
  for (ConsumerRecord<String, String> record : records)
   System.out.println(record.offset() + ": " + record.value());

  try {
   consumer.commitSync();
  } catch (CommitFailedException e) {
   // application specific failure handling
  }
 }
} finally {
 consumer.close();
}

 파라미터 없이 commitSync API를 사용하면 마지막 poll 호출에서 리턴된 오프셋을 커밋한다. 이 호출은 커밋이 성공하거나 복구 불가능한 에러로 실패할 때 까지 무한 블럭 된다. 신경 써야 할 주된 에러는 세션 타임아웃보다 더 오래 걸리는 메시지 처리이다. 이런 상황이 생기면, 코디네이터는 컨슈머를 그룹에서 강제로 빼버리고, 그 결과로 CommitFailedException 이 생기게 된다. 애플리케이션은 이 에러가 생기면, 마지막으로 성공한 커밋 오프셋 이후에 소비한 메시지로 발생된 변경은 롤백 처리해야 한다.

 보통 오프셋은 메시지가 성공적으로 처리된 후 커밋되게 보장된다. 컨슈머가 커밋을 보내기 전에 크래시가 나면, 메시지는 다시 처리 된다. 마지막에 커밋된 오프셋은 절대 현재 포지션을 앞지르지 않는다는 커밋 정책이 보장 되면, “at least once” 전달이라는 의미가 된다.


Figure 3: The committed offset is ahead of the current position

현재 포지션이 마지막 커밋된 오프셋을 초과하지 않는다는 커밋 정책이 보장 되지 않으면, 위 다이어그램처럼, “at most once” 전달이라는 의미가 된다. 만약 컨슈머가 마지막 커밋된 오프셋까지 가기 전에 크래시되면, 그 사이에 있던 모든 메시지는 소실 되지만, 어떤 메시지도 몇번씩 처리 되지는 않는다. 이 정책을 구현하려면, 단지 커밋과 메시지의 처리 순서를 변경하기만 하면 된다.

try {
 while (running) {
  ConsumerRecords<String, String> records = consumer.poll(1000);

  try {
   consumer.commitSync();
   for (ConsumerRecord<String, String> record : records)
    System.out.println(record.offset() + ": " + record.value());
   } catch (CommitFailedException e) {
    // application specific failure handling
   }
 }
} finally {
 consumer.close();
}

 주목할 것은 컨슈머는 애플리케이션이 리턴한 메시지의 오프셋만 커밋하므로 자동 커밋을 사용하면 “at least once” 처리가 된다는 것이다. 최악의 경우로 재처리해야 한다면, 재처리 메시지 수는 애플리케이션이 커밋 간격(commit interval) 동안에 처리할 수 있는 메시지 수에 종속된다. (auto.commit.interval.ms로 설정함)

 커밋 API를 사용하면 많은 중복 처리를 세밀하게 통제 해야 한다. 극단적인 경우, 아래와 같이 메시지 처리 후 매번 오프셋이 커밋 될 수 있다.

try {
 while (running) {
  ConsumerRecords<String, String> records = consumer.poll(1000);

  try {
   for (ConsumerRecord<String, String> record : records) {
    System.out.println(record.offset() + ": " + record.value());
    consumer.commitSync( //-- # --//
      Collections.singletonMap(record.partition(), 
      new OffsetAndMetadata(record.offset() + 1))
    );
   }
  } catch (CommitFailedException e) {
   // application specific failure handling
  }
 }
} finally {
 consumer.close();
} 

 이 예제에서, 커밋하고자 하는 명확한 오프셋이commitSync 호출에 전달된다. 커밋될 오프셋은 항상 애플리케이션이 읽을 다음 메시지의 오프셋이 되어야 한다. 파라미터 없이commitSync가 호출 될 때는, 컨슈머는 애플리케이션에 리턴된 마지막 오프셋(오프셋 +1 )을 커밋하지만, 실제 진척된 것 보다 초과한 커밋 포지션이 허용되기 때문에 사용할 수 없다.

 분명히 메시지 (처리) 이후에 커밋하는 것은 대부분의 경우 좋은 생각이 아니다. 왜나하면 커밋을 처리하는 스레드가 서버에서 응답을 리턴을 받기 위해 각 커밋 요청을 블록(block) 하므로 처리량이 희생되기 때문이다. 좀 더 합리적인 접근은 N개의 메시지 (처리) 이후 커밋을 하는 것이다. N은 성능에 따라 조절 될 수 있다.

 이 예에서 commitSync의 파라미터는 토픽 파티션에서 OffsetAndMetadata 인스턴스를 매핑한 것이다. 커밋 API는 각 커밋 마다 부가적인 몇몇 메타를 포함할 수 있다. 부가적인 메타 정보는 호스트가 보낸 커밋 시간을 레코딩 하는 데 사용 되거나, 애플리케이션에 필요로 하는 정보를 레코딩 하는 데 사용 될 수 있다.

 받은 메시지마다 매번 커밋하는 대신, 더 합리적인 방법은 각 파티션에서 메시지 처리를 한 만큼 오프셋을 커밋 하는 것이다. ConsumerRecords 컬렉션으로 파티션 세트와 각 파티션의 메세지에 접근 할 수 있다.

try {
 while (running) {
  ConsumerRecords<String, String> records 
   = consumer.poll(Long.MAX_VALUE);

  for (TopicPartition partition : records.partitions()) {
   List<ConsumerRecord<String, String>> partitionRecords 
    = records.records(partition);

   for (ConsumerRecord<String, String> record : partitionRecords)
    System.out.println(
     record.offset() + ": " + record.value()
    );

   long lastoffset 
    = partitionRecords.get(partitionRecords.size() - 1).offset();

   consumer.commitSync(
    Collections.singletonMap(
     partition, 
     new OffsetAndMetadata(lastoffset + 1)
    )
   );
  }
 }
} finally {
  consumer.close();
}

 이 예제는 동기화된 커밋 API에 더 중점을 맞췄지만, 컨슈머는 비동기 API인 commitAsync도 제공하고 있다. 비동기 커밋은 애플리케이션이 커밋을 리턴하기 전에 다음 배치를 처리를 시작할 수 있기 때문에, 일반적으로 높은 처리량을 제공한다. 실패한 커밋을 찾을 수 없다는 게 트레이드 오프. 아래 예는 기본 사용법.

try {
 while (running) {
  ConsumerRecords<String, String> records = consumer.poll(1000);
  for (ConsumerRecord<String, String> record : records)
   System.out.println(record.offset() + ": " + record.value());

  consumer.commitAsync(new OffsetCommitCallback() {
   @Override
   public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, 
                          Exception exception) {
     if (exception != null) {
      // application specific failure handling
     }
   }
  });
 }
} finally {
 consumer.close();
} 

 주의할것은 commitAsyc에 콜백을 제공한 것인데, 콜벡은 컨슈머가 커밋이 끝나면 호출된다 (성공 여부와 관련 없음). 콜벡이 필요 없다면 commitAsync를 파라미터 없이 호출할 수 있다.


Consumer Group Inspection


 컨슈머 그룹이 동작 중일때, 파티션 할당과 (메시지의) 소모된 진도를 consumer-groups.sh 스크립트로 점검(inspect) 할 수 있다. 이 스크립트는 bin 디렉터리에 있다.


bin/kafka-consumer-groups.sh --new-consumer --describe --group consumer-tutorial-group --bootstrap-server localhost:9092

아웃풋 결과는 아래:

GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
consumer-tutorial-group, consumer-tutorial, 0, 6667, 6667, 0, consumer-1_/127.0.0.1
consumer-tutorial-group, consumer-tutorial, 1, 6667, 6667, 0, consumer-2_/127.0.0.1
consumer-tutorial-group, consumer-tutorial, 2, 6666, 6666, 0, consumer-3_/127.0.0.1

 이 결과는 컨슈머 그룹 내 할당된 모든 파티션을 보여준다. 어떤 컨슈머가 파티션을 소유하고 있는지, 그리고 마지막 커밋된 오프셋이 무엇인지(여기서는 “current offset”). 파티션의 “lag”는 로그의 끝 오프셋과 마지막 커밋된 오프셋의 차이이다. 관리자는 컨슈머 그룹이 프로듀서와 잘 통신하는지 모니터링 할 수 있다.


Using Manual Assignment


 이 튜토리얼 시작에서 언급한 것으로, 새 컨슈머는 컨슈머 그룹이 필요없는 경우는 저수준 접근을 구현하고 있다. 예전 “simple” 컨슈머도 제공하지만, 많은 에러처리가 필요하다. 새 컨슈머는 단지 읽을려는 파티션을 할당하고 폴링을 시작하기만 하면 된다.

아래는 토픽에서 어떻게partitionFor API로 모든 파티션을 할당하는지를 보여준다.

List<TopicPartition> partitions = new ArrayList<>();
for (PartitionInfo partition : consumer.partitionsFor(topic))
  partitions.add(new TopicPartition(topic, partition.partition()));
consumer.assign(partitions);

 subscribe와 유사하게 assign호출은 읽기를 원하는 파티션 전체 리스트를 전달해야 한다. 한번 파티션이 할당 되면, poll 루프는 전과 동일하게 동작된다.

 한가지 주의할 것은, 그룹 코디네이터를 통하는 모든 오프셋 커밋은 이 커밋이 simple 컨슈머인지 컨슈머 그룹인지 개의치 않는다. 그러므로 오프셋을 커밋 해야 한다면, 여전히 다른 컨슈머와 충돌을 방지하기 위해 group.id를 세팅해야 한다. Simple 컨슈머가 동작중인 컨슈머 그룹과 매치되는 그룹 ID로 오프셋을 커밋 하려고 하면, 코디네이터는 커밋을 거부한다(CommitFailedException이 발생).


Conclusion


 새 컨슈머는 명확한 API, 더 좋아진 보안, 줄어든 종속성을 포함해서 카프카 커뮤니티에 여러 이점을 제공하고 있다. 이 튜토리얼은 poll semantics에 중점을 두고 기본 사용법을 소개했고 delivery semantics를 제어하기 위해 commit API 사용을 소개했다. 더 다뤄야 할 것들이 많지만 시작하기엔 이 정도로 충분하고, 이미 컨슈머를 사용하고 있더라도 새 컨슈머로 시도해 볼 것을 권장한다.

댓글

댓글 쓰기