회사에서 프로젝트를 하는 도중 카프카의 특정 토픽이 비어있는지 확인하는 메서드를 찾아봤는데, 팀원분께서 작성한 코드를 활용하니 특정 토픽이 비어있는지 확인할 수 있었다.
AdminClient 클래스를 이용하면 작동중인 Consumer나 Producer의 작업에 영향을 미치지 않고 해당 정보를 얻을 수 있는데, 이에대해 코드를 좀 더 다듬어 포스팅 한다.
package com.example;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class TopicChecker
{
private final static Logger logger = LoggerFactory.getLogger(TopicChecker.class);
public boolean isTopicEmpty(String topic, String groupId) throws ExecutionException, InterruptedException
{
boolean isTopicEmpty = true;
// AdminClient Configuration setting
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka:9092");
AdminClient adminClient = null;
try
{
adminClient = AdminClient.create(configs);
// consumer group의 offset 리스트를 가져오기
ListConsumerGroupOffsetsResult result = adminClient.listConsumerGroupOffsets(groupId);
KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> mapKafkaFuture
= result.partitionsToOffsetAndMetadata();
// consumer group의 파티션들에 대해 루프를 돌며 offset정보를 확인.
for (TopicPartition partition : mapKafkaFuture.get().keySet())
{
String topicName = partition.topic();
if (topicName.equals(topic)) // 특정 토픽에 대해서만 offset, endOffset 비교
{
long offset = mapKafkaFuture.get().get(partition).offset();
long endOffset = getEndOffset(adminClient, partition);
if (offset != endOffset)
{
isTopicEmpty = false;
break;
}
}
}
}
finally
{
adminClient.close();
}
return isTopicEmpty;
}
// 특정 파티션의 endOffset을 가져오는 메서드
private long getEndOffset(AdminClient adminClient, TopicPartition partition)
{
long endOffset = 0;
try
{
HashMap<TopicPartition, OffsetSpec> topicPartitionOffsets = new HashMap<>();
topicPartitionOffsets.put(partition, OffsetSpec.latest());
ListOffsetsResult listOffsetsResult = adminClient.listOffsets(topicPartitionOffsets);
endOffset = listOffsetsResult.partitionResult(partition).get().offset();
}
catch (InterruptedException e)
{
e.printStackTrace();
} catch (ExecutionException e)
{
e.printStackTrace();
}
return endOffset;
}
public static void main (String[] args)
{
TopicChecker checker = new TopicChecker();
try
{
boolean isTopicEmpty = checker.isTopicEmpty("test", "angel");
logger.info("{}", isTopicEmpty);
} catch (ExecutionException e)
{
e.printStackTrace();
} catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
'IT > kafka' 카테고리의 다른 글
Kafka 버전확인 명령어 (0) | 2021.05.15 |
---|---|
Apache Kafka(카프카) 주요 용어 (0) | 2021.05.09 |
카프카 컨슈머 주요 옵션 (0) | 2020.10.09 |
카프카 프로듀서 주요 옵션 (0) | 2020.10.09 |
[Kafka에러] java.lang.NoClassDefFoundError: com/fasterxml/jackson/databind/JsonNode (0) | 2020.09.26 |