느린 것을 걱정하지 말고, 멈춰서는 것을 걱정하라

회사에서 프로젝트를 하는 도중 카프카의 특정 토픽이 비어있는지 확인하는 메서드를 찾아봤는데, 팀원분께서 작성한 코드를 활용하니 특정 토픽이 비어있는지 확인할 수 있었다. 

 

AdminClient 클래스를 이용하면 작동중인 Consumer나 Producer의 작업에 영향을 미치지 않고 해당 정보를 얻을 수 있는데, 이에대해 코드를 좀 더 다듬어 포스팅 한다. 

package com.example;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class TopicChecker
{
    private final static Logger logger = LoggerFactory.getLogger(TopicChecker.class);

    public boolean isTopicEmpty(String topic, String groupId) throws ExecutionException, InterruptedException
    {
        boolean isTopicEmpty = true;

        // AdminClient Configuration setting
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka:9092");
        AdminClient adminClient = null;
        try
        {
            adminClient = AdminClient.create(configs);

            // consumer group의 offset 리스트를 가져오기
            ListConsumerGroupOffsetsResult result = adminClient.listConsumerGroupOffsets(groupId);
            KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> mapKafkaFuture
                    = result.partitionsToOffsetAndMetadata();

            // consumer group의 파티션들에 대해 루프를 돌며 offset정보를 확인.
            for (TopicPartition partition : mapKafkaFuture.get().keySet())
            {


                String topicName = partition.topic();
                if (topicName.equals(topic)) // 특정 토픽에 대해서만 offset, endOffset 비교
                {
                    long offset = mapKafkaFuture.get().get(partition).offset();
                    long endOffset = getEndOffset(adminClient, partition);

                    if (offset != endOffset)
                    {
                        isTopicEmpty = false;
                        break;
                    }
                }
            }
        }
        finally
        {
            adminClient.close();
        }

        return isTopicEmpty;
    }


    // 특정 파티션의 endOffset을 가져오는 메서드
    private long getEndOffset(AdminClient adminClient, TopicPartition partition)
    {
        long endOffset = 0;
        try
        {
            HashMap<TopicPartition, OffsetSpec> topicPartitionOffsets = new HashMap<>();
            topicPartitionOffsets.put(partition, OffsetSpec.latest());
            ListOffsetsResult listOffsetsResult = adminClient.listOffsets(topicPartitionOffsets);
            endOffset = listOffsetsResult.partitionResult(partition).get().offset();
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        } catch (ExecutionException e)
        {
            e.printStackTrace();
        }

        return endOffset;
    }

    public static void main (String[] args)
    {
        TopicChecker checker = new TopicChecker();
        try
        {
            boolean isTopicEmpty = checker.isTopicEmpty("test", "angel");
            logger.info("{}", isTopicEmpty);
        } catch (ExecutionException e)
        {
            e.printStackTrace();
        } catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}
profile

느린 것을 걱정하지 말고, 멈춰서는 것을 걱정하라

@주현태

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!