기본 콘텐츠로 건너뛰기

카프카 프로듀서(Producer) 0.9.0 설정

프로듀서 설정은 config/producer.properties 파일에서..

bootstrap.servers 

호스트/포트 쌍으로 된 리스트로 카프카 초기에 클러스터에 연결할 때 사용한다. 이 리스트는 단순히 처음에 전체 서버를 찾는데 사용되는 호스트 리스트다 - host1:post1,host2:port2.. 형식 전체 클러스터 멤버(클러스터 멤버는 동적으로 바뀐다)를 찾는 초기 커넥션으로 사용하기 때문에, 모든 서버 리스트를 포함할 필요는 없다. (서버가 다운되었을 경우를 대비해 서버를 하나이상 적으면 된다)

key.serializer, value.serializer

기본 org.apache.kafka.common.serialization.ByteArraySerializer 전달받은 byte[]를 그대로 리턴한다.

buffer.memory

 프로듀서가 서버로 보낼 레코드를 버퍼링 할 때 사용할 수 있는 전체 메모리의 바이트수. 레코드가 서버로 전달 될 수 있는 것 보다 더 빨리 보내지면 버퍼는 소진되고 프로듀서는 max.block.ms 동안 레코드를 보내지 않고 블럭 한다. 블럭을 피하려면 block.on.buffer.full(아래참고)을 false로 두면 된다.
 이 세팅은 대략 프로듀서가 사용할 메모리 이지만, 프로듀서가 사용하는 메모리가 버퍼링만 있는게 아니기 때문에 설정에 딱떨어 지지는 않는다. 처리중인 요청 때문에 유지 해야하는 메모리 뿐만 아니라 압축 설정이 켜있으면 압축하는데 부가적인 메모리가 사용된다.

기본값은 약 33MB

acks

  •  acks=0: acks를 0으로 설정하면, 서버에서 수신확인(acknowledment)이 오길 기다리지 않는다. 소켓 버퍼에 레코드가 즉시 추가되고 보내진것으로 간주된다. 이 경우에는 서버가 레코드를 수신했다는 보장이 없고, retries 설정도 먹지 않는다 (클라이언트는 통상 실패를 알지 못하기 때문에 retry를 할 수 없.. ). 각 레코드에 대해 되돌려 받은 오프셋은 항상 -1로 세팅 된다.
  • acks=1: 리더가 레코드를 자신의 로컬 로그에 쓰고 모든 팔로워로 부터 수신확인(acknowledgement)을 기다리지 않고 응답한다. 이 경우 레코드를 수신확인 알림 후에 리더가 즉시 실패 할 수 있지만 팔로워가 레코드를 복제 하기전 이면 레코드는 소실 될 수 있다.
  • akcs=all: 리더는 레코드가 동기로 복제되고 알림이 오기를 기다린다. 최소한 하나의 복제가 살아 있는 한 레코드가 소실되지 않는 것을 보장한다. 
기본값은 1이다

compression.type

기본으로 압출을 하지 않는다.  gzip, snappy, lz4 를 지원한다. 보통 압축된 포멧으로 메세지를 보내는게 좋다.

retries

프로듀서가 레코드를 보낼 때 에러가 나면 재시도 횟수를 말한다. 잠재적으로 레코드 순서가 바뀔 수 있다. 예를들어, 하나의 파티션에 두개의 레코드가 보내졌을때, 첫번째는 실패하고 재시도 되고 두번째는 성공했을 경우 두번째 레코드가 먼저 소모된다.

batch.size

동일 파티션에 여러개의 레코드가 보내질때 지정된 크기 보다 작으면 레코드를 모아서 보내게 된다. 지정된 크기보다 더 많은 레코드를 보내는 시도는 처리 되지 않는다. 브로커에 보내지는 요청은 여러개의 배치가 포함되고, 각 파티션에 나누어 데이터가 보내진다. 배치크기와 throughput 관계는 아래 linger.ms 부분 참고.

max.request.size

요청 보낼 수 있는 최대 바이트수로, 레코드 배치수를 제한하게 된다. 단일 요청에 너무 큰 요청을 보내는 것을 피할 수 있다. 최대로 보낼 수 있는 레코드 크기를 제한 하는데 효과적이지만, 서버도 별도로 레코드 크기를 제한 하는 설정이 있어서 서로 값이 다를 수 있으니 주의 해야 한다.

connections.max.idle.ms

지정시간 후 놀고있는 물리적인 소켓 채널을 닫는다. Selector에 전달되는 connectionMaxIdleMs 시간이 해당 값이다. Selector.java#L94 Selector:poll() 호출 마다 maybeCloseOldestConnection()에서 체크해서 처리한다. Selector.java#L377

linger.ms

Nagle 알고리즘 처럼 레코드를 특정시간 동안 버퍼링해서 묶어 하나의 배치 요청으로 보내는 것을 말한다. 지정된 값 만큼 레코드 전송을 지연을 하게 되지만, batch.size 에 다다르면 즉시 메시지를 보낸다.

latency와 throughput, 그리고 batch.size와 linger.ms 관련해, "프로듀서 성능개선" 글 참고

샘플코드


max.block.ms


참고: https://issues.apache.org/jira/browse/KAFKA-3236

  • KafkaProducer.send(): BufferPool이 가득 차거나 메타데이터를 사용 할 수 없을때. 
  • KafkaProducer.partitionFor(): 메타데이타를 사용할 수 없을 때.
위 두경우 max.block.ms가 초과되면 예외를 던진다.

BufferPool 크기는 batch.size가 기준이 된다. BufferPool.allocate(size, ..) 할때 할당 크기가 batch.size보다 크면 예외를 발생시킨다.

KafkaProducer.java#L261
RecordAccumulator.java#L108
RecordAccumulator.java#L177
BufferPool.java#L67
BufferPool.java#L129

size는 레코드 크기와 오프셋을 고려한 크기와 batch.size 중 큰값이다.

Records.java#L27
RecordAccumulator.java#L175

max.request.size

요청 보낼 수 있는 최대 바이트수로, 레코드 배치수를 제한하게 된다. 단일 요청에 너무 큰 요청을 보내는 것을 피할 수 있다. 최대로 보낼 수 있는 레코드 크기를 제한 하는데 효과적이지만, 서버는 별도로 레코드 크기를 제한 하는 설정이 있어서 서로 다를 수 있으니 주의 해야 한다.

partitioner.class

서브토픽간 메시지 파티셔닝하는 기본 값은 key의 해시값. 
partitioner.class=kafka.producer.DefaultPartitioner

receive.buffer.bytes

데이터를 읽을때 사용하는 TCP 소켓 버퍼 크기

request.timeout.ms

클라이언트에 에러를 되돌려 보내기 전에 request.required.ack 요건을 충족시키려 브로커가 기다릴 시간을 설정. 타임아웃전에 응답이 없으면 프로듀서는 재요청을 보내거나 요청은 실패한다.

producer.type

sync와 async 두가지 값을 설정 할 수 있다. async모드는 프로듀서가 브로커에 백그라운드 스레드로 데이터를 보낸다.  이 스레드는 배치로 요청을 보내 수 있지만, 클라이언트가 실패하면 데이터 유실이 생긴다.

serializer.class

serializer 클래스를 선언하는 설정. 이 serializer는 메시지를 직렬화하고 나중에 검색되기 쉽게 적당한 형식으로 저장한다.  기본 인코더는 byte 배열을 받아 그대로 반환한다.
serializer.class=kafka.serializer.DefaultEncoder

block.on.buffer.full

메모리 버퍼가 소진되면 새로운 레코드를 받지 않고 블럭하거나 에러를 던진다. 기본 false 이고 프로듀서는 BufferExhaustException를 던지지 않지만 max.block.ms 값을 기준으로 요청을 블럭한다. 블럭시간이 지나면 TimeoutException을 던진다. true로 값을 설정하면 max.block.ms는 Long.MAX_VALUE값이 되고, metadata.fetch.timeout.ms 설정도 따르지 않는다. Deprecated 된 설정이기 때문에, max.block.ms 설정으로 사용해야 한다. 

interceptor.classes

인터셉터 클래스리스트를 지정 할 수 있다. 카프카 클러스터에 보내기전 메시지를 프로듀서가 받은 레코드를 인터셉트 할 수 있다.

max.in.flight.requests.per.connection

클라이언트가 블로킹 되기전에 단일 커넥션에 보낼 답을 받지못한(unacknowledged) 최대 요청수. 1보다 큰값이고 실패한 보내기가 있다면, 재시도로 때문에 메시지 재정렬 이슈가 있다.

retry.backoff.ms

리더를 선출하는 데는 시간이 걸린다. 이 시간 동안 프로듀서는 메타데이터를 갱신하지 않는다. 데이터를 보내는 동안 발생한 에러는 메타데이터를 갱신해야 함을 의미한다. 이 속성은 프로듀서가 재시도 전에 기다릴 시간을 지정한다. 기본 100.


(Deprecated, 0.10.0) metadata.broker.list

토픽, 파티션 그리고 복제(replicas) 와 같은 메타데이터를 얻는 데 사용되는 가장 중요한 세팅이다. 이 정보는 데이터를 생성하는 커넥션 셋업에 사용된다. host1:port,host2:port2와 같은 형식이다. 전체 카프카 브로커를 나열할 필요는 없지만 브로커의 서브셋이나 VIP는 브로커의 서브셋을 가리켜야 한다.

프로듀서가 각 토픽의 리더를 결정하기 위해 하나 이상의 브로커를 찾을 수 있다. 어떤 브로커가 토픽(또는 파티션)의 리더인지를 알아내야 할지 걱정할  필요는 없다. 프로듀서는 브로커에 어떻게 접속할지 알고 있고 메타데이터를 요청하고 올바른 브로커에 연결한다. 

(Deprecated, 0.10.0) request.required.acks

프로듀서가 생성한(produced) 메시지를 언제 완료로 할지 결정한다. 메세지를 받았음을 응답(acknowledge)하기 전에 로그에 얼마나 많은 브로커가 커밋을 할지 정할 필요가 있다. 기본값은 0.

  • request.required.acks=-1 : 리더는 응답을 쓰기 전에 모든 동기화된 복제에서 응답이 있을 때 까지 기다린다. -1 값은 분명히 데이터를 유지하는 가장 내구성이 좋은 방법이지만, 가장 느린 방법이다.
  • request.required.acks=0 : 프로듀서는 리더에서 어떤 응답(ack)이 있을지 기다리지 않는다. 보내고 잊어버리는 것을 의미.
  • request.required.acks=1 : 프로듀서는 리더가 메시지를 받을 때 까지 기다리고, 리더는 로컬 로그에 메시지를 쓰고 즉시 응답한다.


참고: https://cwiki.apache.org/confluence/display/KAFKA/KIP-1+-+Remove+support+of+request.required.acks

=> 0.8.3 이상에서는 request.required.acks > 1 인경우 InvalidRequiredAcksException이 발생한다.

댓글