Apache Kafka Consumer 동작 방식

Consumer

  • producer가 전송한 데이터는 kafka broker에 적재되며, consumer가 하는 역할은 broker로부터 data를 가져와 필요한 처리를 수행한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class SimpleConsumer {

private final static Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);

private final static String TOPIC_NAME = "test";
// Consumer Group
private final static String GROUP_ID = "test-group";

public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG ,KafkaConst.SERVER_ADDR);
configs.put(ConsumerConfig.GROUP_ID_CONFIG,GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));

while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
logger.info("record :{}" , records);
for (ConsumerRecord<String, String> record : records) {
logger.info("{}",record);
}
}
}
}
  • Consumer Group 을 통해 Consumer의 목적을 구분한다. 동일 기능을 수행하는 app인 경우 하나의 Group으로 묶어서 관리하며, Consumer Group을 기준으로 Consumer Offset을 관리한다. 따라서 subscribe method를 사용해 특정 토픽을 구독하는 경우에 Consumer Group을 선언해야 한다.
  • Consumer Group을 선언하지 않으면 어떤 그룹에도 속하지 않는 Consumer로 동작하게 된다.
  • Consumer는 poll method를 호출하여 데이터를 가져와 처리한다. 이떄 Duration 타입의 인자값을 받는데, 이 값은 Consumer 버퍼에 데이터를 가져오는 타임아웃 간격을 뜻한다.

파티션 할당 Consumer

  • Consumer 운영시 위와 같이 subscribe 형태로 특정 토픽을 구독하는 형태로 사용하는것 외에도 파티션까지 명시적으로 선언할 수 있다.
    1
    consumer.assign(TOPIC_NAME,PARITITON_NUM);
    이때는 Consumer가 특정 Topic의 특정 파티션에 직접적으로 할당됨으로 rebalancing 과정이 안일어나는데 rebalancing에 대한 설명은 아래에 정리하였다.

Consumer 운영 방식

  1. 1개 이상의 Consumer로 이루어진 Consumer Group을 운영한다. 이떄 Consumer들은 Topic의 1개 이상의 파티션들에 할당되어 데이터를 가져갈 수 있다.

이떄 1개의 파티션은 최대 1개의 Consumer에 할당가능한 반면 1개의 Consumer는 여러 개의 Partition에 할당될 수 있다. 따라서 Consumer Group의 Consumer개수는 가져가고자 하는 Topic의 파티션 개수보다 같거나 작아야 한다.

물론 Consumer개수가 파티션의 개수보다 많게 설정할 수는 있겠지만, 파티션은 1개의 Consumer까지만 가질 수 있기 때문에 놀고 있는 Consumer는 Thread만 차지하고 아무 데이터도 처리하지 않음으로 불필요하다.

Consumer Group은 다른 Consumer Group과 격리되는 특징을 가지고 있다. 따라서 Kafka Producer가 보낸 데이터를 각기 다른 역할을 하는 Consumer Group끼리 영향을 받지 않게 처리할 수 있다는 장점을 가진다.

Rebalancing

  • Rebalancing : Consumer Group으로 이루어진 Consumer 들 중 일부 Consumer에 장애가 발생하면 , 장애가 발생한 Consumer에 할당된 partition은 장애가 발생하지 않은 Consumer에 소유권이 넘어가는 것

  • Consumer가 추가되거나 장애발생으로 제외되는 상황에서 발생한다.

  • Rebalancing 발생 시 Consumer들이 Topic의 데이터를 읽을 수 없기 떄문에 빈번히 Rebalancing이 일어나는 상황은 피해야 한다.

  • Broker중 한 대가 Group Coordinator(그룹 조정자)로서, Rebalancing을 발동시키는 역할을 한다.

  • Rebalancing 직전에 데이터를 커밋하지 않아서, consumer가 처리했던 데이터의 오프셋이 기록되지 않고, 또 다시 데이터를 중복처리하는 경우가 생길수도 있다. org.apache.kafka.clients.consumer.ConsumerRebalanceListener 는 rebalancing 직후 , 직전에 호출되는 method를 가지고 있다.

1
2
3
4
5
6
7
8
9
10
public class RebalanceListener implements ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// partition 할당 완료시 호출되는 메소드
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// partition 할당이전에 호출되는 메소드로 offset commit 하는 로직이 주로 들어간다.
}
}

Commit

  • Consumer는 Broker로부터 Data를 어디까지 가져갔는지 Commit을 통해 기록한다.

  • 특정 Topic의 Partition을 어떤 Consumer Group 이 몇 번쨰까지 가져갔는지 Broker 내부에서 사용되는 내부 토픽에 기록된다.

  • offset commit은 Consumer Application에서 명시적/비명시적으로 수행가능한데, default는 poll method가 수행될때 일정간격마다 offset을 commit하도록 설정되어 있다. 이를 비명시 오프셋 커밋이라고 부른다.
1
2
enable.auto.commit=true
auto.commit.interval.ms=설정된시간값 // 설정시간값이후에 그 시점까지 읽은 레코드의 오프셋을 커밋한다.
  • 비명시 오프셋 커밋의 장점은 poll method 실행시 auto.commit.interval.ms 설정값 이후면 오프셋을 자동으로 커밋해주어, 코드상 수정이 필요없지만 반대로 단점은 poll method 실행 이후에 rebalancing이나 consumer 장애발생시에 오프셋이 커밋되지 않아, 데이터 중복이나 유실이 일어날 수 있는 가능성이 있다.

  • 명시적으로 오프셋 커밋을 수행하려면 commitSync method를 통해 poll method를 통해 반환된 record의 가장 마지막 오프셋을 기준으로 커밋을 수행하면 된다. commitSync method는 동기적으로 broker에게 응답을 기다리지만 이를 비동기적으로 수행하고 싶다면 commitAsync method를 실행하면 된다.

동기 오프셋 커밋

1
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
  • Broker로 Commit을 요청한 이후에 Commit이 완료될떄까지 기다린다. 따라서 비동기 오프셋 커밋보다 데이터 처리량은 떨어진다.

가장 최근에 받아온 레코드의 오프셋을 커밋하는 경우

1
2
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
consumer.commitSync(); // 동기 오프셋커밋 (가장 마지막 레코드의 오프셋을 커밋함)

개별 레코드 단위로 커밋하는 경우로, topic,partition,offset등의 정보를 담은 map을 파라미터로 넘겨준다. 이때 offset은 넘겨준값부터 레코드를 넘겨주기 떄문에 + 1 한 값을 넣는다.

1
2
3
4
5
6
7
8
9
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();

for (ConsumerRecord<String, String> record : records) {
currentOffset.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1,null));
consumer.commitSync(currentOffset);
}

비동기 오프셋 커밋

  • Broker로 Commit을 요청한 이후에 응답을 기다리지 않고, 데이터를 처리한다.
1
consumer.commitAsync();
  • consumer.commitAsync method는 콜백 인터페이스를 제공하며 , 비동기요청이 완료되었을때 수행할 행동을 지정할 수 있다.
1
2
3
4
5
6
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
// commitAsync 응답을 받아서 처리하는 callback method
}
});

Fetcher instance

1
consumer.poll(Durtaion.ofSeconds(1));

consumer는 poll method를 통해 record를 반환받지만, 실제로 이 method가 실행될떄 Broker cluser로부터 데이터를 가져오는 것이 아니라, Consumer Application 실행시점에 내부에서 미리 Fetcher Instance가 생성되어 poll method 호출전에 record를 미리 내부 Queue로 가져온다.

따라서 poll method는 Fetcher instace의 queue에 있는 record를 반환받는 것이다.

Read more

Apache Kafka Producer 동작 방식

Kafka Producer

  • Producer Application은 Kafka Broker에 전송할 데이터를 선언하고, Broker의 Topic내 특정 Partition에 Data를 전송한다.

  • Producer는 Leader Partition을 가지는 Broker와 직접 통신하며, Follower Partition으로 데이터 복제는 이 Leader Partition으로부터 데이터가 복제된다.

  • Kafka Producer 객체가 send() method를 호출하면 , Partitioner가 Topic의 어떤 Partition으로 데이터를 전송할지 결정한다. Partitioner를 별도로 설정하지 않으면 DefaultPartitioner가 동작한다.

  • Partitioner에 의해 Topic의 어떤 Partition으로 보낼지가 결정되면 Accumulator에 데이터를 Buffer에 쌓아놓고 배치 전송한다.

Partitioner 종류

UniformStickyPartitioner , RoundRobinPartitioner가 있는데 둘 다 KEY가 존재하는 경우에는 KEY의 해시값을 통해 partition을 매칭시킨다. KEY가 존재하지 않을떄 동작이 다르다.

  • UniformStickyPartitioner : Accumulator에서 데이터가 배치로 모두 묶일떄까지 대기하였다가, 모두 동일한 파티션에 전송
  • RoundRobinPartitioner : 데이터가 들어오는 대로 Partition을 Round-Robin 방식으로 순회하며 전송한다.

성능적인 측면에서는 UniformStickyPartitioner가 낮은 리소스 사용률,높은 처리량을 가진다.

Kafka Producer Application 예시

먼저 kafka-client dependency를 추가한다. 여기 예제에서는 빌드 및 의존성 관리 도구로서 Gradle을 사용하였다.

1
2
3
4
5
// build.gradle 
dependencies {
implementation 'org.apache.kafka:kafka-clients:2.5.0'
implementation 'org.slf4j:slf4j-simple:1.7.30' // for logging
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class SimpleProducerApp {

private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);

// Broker 의 토픽이름
private final static String TOPIC_NAME = "test";

// Broker Ip 주소와 Port 번호
private final static String SERVER = "BROKER_PUBLIC_IP_ADDR:9092";


public static void main(String[] args) {

// Kafka Producer 객체를 생성하기 위한 필수 옵션
Properties config = new Properties();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,SERVER);
// Message 키와 값을 직렬화하기 위한 직렬화 클래스
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG , StringSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Producer 객체 생성
KafkaProducer<String,String> producer = new KafkaProducer<String, String>(config);

String testMessage = "testMessage";
// Broker로 보낼 데이터 생성 . 키를 별도로 지정하지 않으면 null이 들어가며, Key와 Value는 당연히 직렬화 클래스와 타입이 동일해야 한다.
ProducerRecord<String,String> record = new ProducerRecord<>(TOPIC_NAME,testMessage);
// 배치 전송 (즉각적인 전송이 아니라, 배치단위로 묶어서 전송한다. )
producer.send(record);
logger.info("record : {}",record);
// producer 내부 버퍼의 레코드 배치를 Broker로 전송한다.
producer.flush();
producer.close();
}
}

실제로 로그에 찍힌 내용을 보면 정상적으로 Broker에 Record가 송신된것을 확인할 수 있다.

추가로 send method의 return 값은 Future 객체로 원한다면 동기적으로 record를 보낸 데이터를 가져올 수 있다.

1
2
3
4
ProducerRecord<String,String> record = new ProducerRecord<>(TOPIC_NAME,testKey,testMessage);
Future<RecordMetadata> result = producer.send(record);
RecordMetadata recordMetadata = result.get();
logger.info("recordMetadata : {}",recordMetadata); // recordMetadata : test-0@4 0번 파티션의 4번 오프셋에 저장됨

동기로 데이터를 확인할 경우, Producer 서버는 데이터 응답 전까지 대기하는 단점이 있다. 이를 커버하기 위해서 CallBack 인터페이스도 제공된다.

1
2
3
4
5
6
7
8
9
10
11
12
public class ProducerCallBack implements Callback {

private static final Logger logger = LoggerFactory.getLogger(ProducerCallBack.class);
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null){
logger.error(exception.getMessage());
return;
}
logger.info(metadata.toString());
}
}

CallBack 인터페이스 구현체를 하기와 같이 등록해서 비동기로 결과를 받아 처리할 수 있다. 다만 비동기로 받아올 경우에는 당연히 데이터 순서는 보장되지 않는다, 따라서 데이터 순서가 중요한 경우에는 동기로 처리해야 한다.

1
producer.send(record,new ProducerCallBack()); // INFO ProducerCallBack - test-0@5
Read more

Apache Kafka

Kafka 의 역할

  • 각각의 application끼리 연결하여 데이터 처리하는 방식이 아니라 중앙에서 처리할 수 있도록 데이터 스트림을 관리해주는 프레임워크

  • application끼리 직접적으로 데이터를 처리하는 방식이 아니라, 카프카가 중앙에 배치됨으로써, 소스 application과 타켓 application 간의 의존도를 약화시킨다. 어떤 타켓 application으로 데이터를 보내든 kafka를 통해 송신한다.

  • 데이터를 보내는 application을 Producer , 데이터를 받는 application을 Consumer 라고 한다.

  • Kafka는 대개 3대 이상의 서버 (Kafka가 운영중인 서버를 broker라고도 부른다.)에서 분산 운영해서 데이터를 중복 저장함으로 고가용성을 제공한다.

Kafka의 장점

  • Producer가 Broker로 데이터를 송신할때, Consumer가 Broker로부터 데이터를 수신할때, 묶어서 전송한다. 즉 네트워크 통신 횟수를 최소화 한다.

  • 동일 데이터를 Kafka Broker에 여러개를 분산 저장하고, 병렬 처리도 가능하다.

  • 들어오는 데이터에 따라 Scale-In 또는 Scale-Out이 가능하다.

  • 영속성 : 데이터를 메모리에 저장하지 않고, Broker내 File System에 저장한다. 따라서 FailOver되더라도 데이터가 소실되지 않는다.

  • 고가용성 : 3개 이상의 서버로 운영되는 Kafka cluster를 통해서 일부 서버에 장얘가 생기더라도 무중단 서비스 제공이 가능하다.

Read more