Apache Kafka Producer 동작 방식

Kafka Producer

  • Producer Application은 Kafka Broker에 전송할 데이터를 선언하고, Broker의 Topic내 특정 Partition에 Data를 전송한다.

  • Producer는 Leader Partition을 가지는 Broker와 직접 통신하며, Follower Partition으로 데이터 복제는 이 Leader Partition으로부터 데이터가 복제된다.

  • Kafka Producer 객체가 send() method를 호출하면 , Partitioner가 Topic의 어떤 Partition으로 데이터를 전송할지 결정한다. Partitioner를 별도로 설정하지 않으면 DefaultPartitioner가 동작한다.

  • Partitioner에 의해 Topic의 어떤 Partition으로 보낼지가 결정되면 Accumulator에 데이터를 Buffer에 쌓아놓고 배치 전송한다.

Partitioner 종류

UniformStickyPartitioner , RoundRobinPartitioner가 있는데 둘 다 KEY가 존재하는 경우에는 KEY의 해시값을 통해 partition을 매칭시킨다. KEY가 존재하지 않을떄 동작이 다르다.

  • UniformStickyPartitioner : Accumulator에서 데이터가 배치로 모두 묶일떄까지 대기하였다가, 모두 동일한 파티션에 전송
  • RoundRobinPartitioner : 데이터가 들어오는 대로 Partition을 Round-Robin 방식으로 순회하며 전송한다.

성능적인 측면에서는 UniformStickyPartitioner가 낮은 리소스 사용률,높은 처리량을 가진다.

Kafka Producer Application 예시

먼저 kafka-client dependency를 추가한다. 여기 예제에서는 빌드 및 의존성 관리 도구로서 Gradle을 사용하였다.

1
2
3
4
5
// build.gradle 
dependencies {
implementation 'org.apache.kafka:kafka-clients:2.5.0'
implementation 'org.slf4j:slf4j-simple:1.7.30' // for logging
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class SimpleProducerApp {

private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);

// Broker 의 토픽이름
private final static String TOPIC_NAME = "test";

// Broker Ip 주소와 Port 번호
private final static String SERVER = "BROKER_PUBLIC_IP_ADDR:9092";


public static void main(String[] args) {

// Kafka Producer 객체를 생성하기 위한 필수 옵션
Properties config = new Properties();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,SERVER);
// Message 키와 값을 직렬화하기 위한 직렬화 클래스
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG , StringSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Producer 객체 생성
KafkaProducer<String,String> producer = new KafkaProducer<String, String>(config);

String testMessage = "testMessage";
// Broker로 보낼 데이터 생성 . 키를 별도로 지정하지 않으면 null이 들어가며, Key와 Value는 당연히 직렬화 클래스와 타입이 동일해야 한다.
ProducerRecord<String,String> record = new ProducerRecord<>(TOPIC_NAME,testMessage);
// 배치 전송 (즉각적인 전송이 아니라, 배치단위로 묶어서 전송한다. )
producer.send(record);
logger.info("record : {}",record);
// producer 내부 버퍼의 레코드 배치를 Broker로 전송한다.
producer.flush();
producer.close();
}
}

실제로 로그에 찍힌 내용을 보면 정상적으로 Broker에 Record가 송신된것을 확인할 수 있다.

추가로 send method의 return 값은 Future 객체로 원한다면 동기적으로 record를 보낸 데이터를 가져올 수 있다.

1
2
3
4
ProducerRecord<String,String> record = new ProducerRecord<>(TOPIC_NAME,testKey,testMessage);
Future<RecordMetadata> result = producer.send(record);
RecordMetadata recordMetadata = result.get();
logger.info("recordMetadata : {}",recordMetadata); // recordMetadata : test-0@4 0번 파티션의 4번 오프셋에 저장됨

동기로 데이터를 확인할 경우, Producer 서버는 데이터 응답 전까지 대기하는 단점이 있다. 이를 커버하기 위해서 CallBack 인터페이스도 제공된다.

1
2
3
4
5
6
7
8
9
10
11
12
public class ProducerCallBack implements Callback {

private static final Logger logger = LoggerFactory.getLogger(ProducerCallBack.class);
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null){
logger.error(exception.getMessage());
return;
}
logger.info(metadata.toString());
}
}

CallBack 인터페이스 구현체를 하기와 같이 등록해서 비동기로 결과를 받아 처리할 수 있다. 다만 비동기로 받아올 경우에는 당연히 데이터 순서는 보장되지 않는다, 따라서 데이터 순서가 중요한 경우에는 동기로 처리해야 한다.

1
producer.send(record,new ProducerCallBack()); // INFO ProducerCallBack - test-0@5

Producer 필수 옵션과 선택옵션

필수 옵션

  1. Bootstrap.servers : BrokerIp주소:포트번호 (2개이상 입력가능하다.)
  2. key.serializer : record 키 직렬화시 사용하는 클래스
  3. value.serializer : record 값 직렬화시 사용하는 클래스

선택옵션

  1. acks : producer가 전송한 데이터가 broker에 정상적으로 도착했는지 성공 여부를 확인하는 데 사용되는 옵션
  • 0:무조건 성공으로 판단
  • 1:리더 파티션에 저장되야만 성공으로 판단 (기본 값)
  • -1:팔로워 파티션까지 모두 저장되면 성공으로 판단
  1. buffer.memory
  2. retries
  3. batch.size : 배치로 전송할 레코드 최대 용량
  4. linger.ms : 배치 전송하기 전까지 기다리는 최소 시간
  5. partitioner.class : partitioner class 지정 (default는 org.apache.clients.producer.internals.DefaultPartitioner)

Custom Partitioner 구현

특정 데이터를 특정 파티션으로 보내고 싶을떄는 Partitioner를 구현해서 등록하면 된다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (keyBytes == null){
throw new InvalidRecordException("메시지키 필수");
}
// 특정 키값일떄는 0번 파티션에만 적재
if (((String) key).equals("ZERO_PARTITION_KEY")){
return 0;
}
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
// 해시값에 의해 파티션 매치
return Utils.toPositive(Utils.murmur2(keyBytes)) % partitions.size();
}

@Override
public void close() {

}

@Override
public void configure(Map<String, ?> configs) {

}
}
1
config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());

Comments