Apache Kafka Producer 동작 방식
Kafka Producer
Producer Application은 Kafka Broker에 전송할 데이터를 선언하고, Broker의 Topic내 특정 Partition에 Data를 전송한다.
Producer는 Leader Partition을 가지는 Broker와 직접 통신하며, Follower Partition으로 데이터 복제는 이 Leader Partition으로부터 데이터가 복제된다.
Kafka Producer 객체가 send() method를 호출하면 , Partitioner가 Topic의 어떤 Partition으로 데이터를 전송할지 결정한다. Partitioner를 별도로 설정하지 않으면 DefaultPartitioner가 동작한다.
- Partitioner에 의해 Topic의 어떤 Partition으로 보낼지가 결정되면 Accumulator에 데이터를 Buffer에 쌓아놓고 배치 전송한다.
Partitioner 종류
UniformStickyPartitioner , RoundRobinPartitioner가 있는데 둘 다 KEY가 존재하는 경우에는 KEY의 해시값을 통해 partition을 매칭시킨다. KEY가 존재하지 않을떄 동작이 다르다.
- UniformStickyPartitioner : Accumulator에서 데이터가 배치로 모두 묶일떄까지 대기하였다가, 모두 동일한 파티션에 전송
- RoundRobinPartitioner : 데이터가 들어오는 대로 Partition을 Round-Robin 방식으로 순회하며 전송한다.
성능적인 측면에서는 UniformStickyPartitioner가 낮은 리소스 사용률,높은 처리량을 가진다.
Kafka Producer Application 예시
먼저 kafka-client dependency를 추가한다. 여기 예제에서는 빌드 및 의존성 관리 도구로서 Gradle을 사용하였다.
1 | // build.gradle |
1 | public class SimpleProducerApp { |
실제로 로그에 찍힌 내용을 보면 정상적으로 Broker에 Record가 송신된것을 확인할 수 있다.
추가로 send method의 return 값은 Future 객체로 원한다면 동기적으로 record를 보낸 데이터를 가져올 수 있다.
1 | ProducerRecord<String,String> record = new ProducerRecord<>(TOPIC_NAME,testKey,testMessage); |
동기로 데이터를 확인할 경우, Producer 서버는 데이터 응답 전까지 대기하는 단점이 있다. 이를 커버하기 위해서 CallBack 인터페이스도 제공된다.
1 | public class ProducerCallBack implements Callback { |
CallBack 인터페이스 구현체를 하기와 같이 등록해서 비동기로 결과를 받아 처리할 수 있다. 다만 비동기로 받아올 경우에는 당연히 데이터 순서는 보장되지 않는다, 따라서 데이터 순서가 중요한 경우에는 동기로 처리해야 한다.
1 | producer.send(record,new ProducerCallBack()); // INFO ProducerCallBack - test-0@5 |
Producer 필수 옵션과 선택옵션
필수 옵션
- Bootstrap.servers : BrokerIp주소:포트번호 (2개이상 입력가능하다.)
- key.serializer : record 키 직렬화시 사용하는 클래스
- value.serializer : record 값 직렬화시 사용하는 클래스
선택옵션
- acks : producer가 전송한 데이터가 broker에 정상적으로 도착했는지 성공 여부를 확인하는 데 사용되는 옵션
- 0:무조건 성공으로 판단
- 1:리더 파티션에 저장되야만 성공으로 판단 (기본 값)
- -1:팔로워 파티션까지 모두 저장되면 성공으로 판단
- buffer.memory
- retries
- batch.size : 배치로 전송할 레코드 최대 용량
- linger.ms : 배치 전송하기 전까지 기다리는 최소 시간
- partitioner.class : partitioner class 지정 (default는 org.apache.clients.producer.internals.DefaultPartitioner)
Custom Partitioner 구현
특정 데이터를 특정 파티션으로 보내고 싶을떄는 Partitioner를 구현해서 등록하면 된다.
1 | public class CustomPartitioner implements Partitioner { |
1 | config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName()); |