본문 바로가기

Tech/Kafka

[Kafka] 카프카 컨슈머 그룹 삭제하기

회사에서 다른 파트의 동료로 부터 “Kafka 컨슈머 그룹을 삭제하고 싶어요.”라는 요청이 왔었다.

 

카프카 컨슈머 그룹 삭제?

그거 그냥 해당 그룹에 속한 컨슈머 커넥션이 계속 없으면 알아서 없어지는거 아닌가?라고 알고있었지만,

확실하게 답해주기 위해서 정보를 찾아봤다.

찾아본 결과는 이렇다.

 

1. server.properties에 offsets.retention.minutes 속성을 통해 오프셋 초기화 주기를 설정한다. default는 7분이라고 한다.

즉, 컨슈머 그룹에 해당하는 컨슈머가 일정 기간동안 컨슈밍을 하지 않으면 해당 컨슈머 그룹의 오프셋이 0으로 초기화 된다는 것이다.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-186%3A+Increase+offsets+retention+default+to+7+days

 

2. kafka-consumer-groups.sh 관리 스크립트를 통해 직접적으로 컨슈머 그룹을 삭제하는 방법이다.

https://support.huaweicloud.com/intl/en-us/kafka_faq/kafka-faq-200426034.html

 

위 두가지 방식이 컨슈머 그룹을 삭제하는 행위에 해당된다.

추가적으로, CMAK을 사용하는 중에, 컨슈머 그룹을 삭제해도 CMAK에서는 삭제한 컨슈머 그룹이 계속해서 보여진다.

구글링을 해보니, 해당 이슈로 많은 글이 있었고, CMAK을 재시작하면 사라진다고 한다.

 

찾아본 내용은 여기까지고,

실제로 간단하게 구축해서 2번의 경우를 테스트 해보자.

 

먼저 로컬에서 카프카를 띄우자

아래 docker-compose 출처: https://akageun.github.io/2020/05/01/docker-compose-kafka-cluster-manager.html

version: '3.6'

services:
    zookeeper:
        container_name: zookeeper
        image: wurstmeister/zookeeper:3.4.6
        volumes:
            - "./zookeeper/data:/data"
            - "./zookeeper/logs:/datalog"
        ports:
            - "2181:2181"

    kafka1:
        container_name: kafka1
        image: wurstmeister/kafka:2.12-2.3.0
        restart: on-failure
        ports:
            - "9095:9092"
        volumes:
            - /var/run/docker.sock:/var/run/docker.sock
        environment:
            KAFKA_BROKER_ID: 1
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            JMX_PORT: 9093
            KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=${EXPOSED_HOSTNAME} -Dcom.sun.management.jmxremote.rmi.port=9393
            KAFKA_ADVERTISED_HOST_NAME: ${EXPOSED_HOSTNAME}
            KAFKA_ADVERTISED_PORT: 9092
            KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://${EXPOSED_HOSTNAME}:9095
            KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
        depends_on:
            - zookeeper
    kafka2:
        container_name: kafka2
        image: wurstmeister/kafka:2.12-2.3.0
        restart: on-failure
        ports:
            - "9096:9092"
        volumes:
            - /var/run/docker.sock:/var/run/docker.sock
        environment:
            KAFKA_BROKER_ID: 2
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            JMX_PORT: 9093
            KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=${EXPOSED_HOSTNAME} -Dcom.sun.management.jmxremote.rmi.port=9393
            KAFKA_ADVERTISED_HOST_NAME: ${EXPOSED_HOSTNAME}
            KAFKA_ADVERTISED_PORT: 9092
            KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://${EXPOSED_HOSTNAME}:9096
            KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
        depends_on:
            - zookeeper
    kafka3:
        container_name: kafka3
        image: wurstmeister/kafka:2.12-2.3.0
        restart: on-failure
        ports:
            - "9097:9092"
        volumes:
            - /var/run/docker.sock:/var/run/docker.sock
        environment:
            KAFKA_BROKER_ID: 3
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            JMX_PORT: 9093
            KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=${EXPOSED_HOSTNAME} -Dcom.sun.management.jmxremote.rmi.port=9393
            KAFKA_ADVERTISED_HOST_NAME: ${EXPOSED_HOSTNAME}
            KAFKA_ADVERTISED_PORT: 9092
            KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://${EXPOSED_HOSTNAME}:9097
            KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
        depends_on:
            - zookeeper

    # <https://github.com/hleb-albau/kafka-manager-docker>
    kafka-manager:
        container_name: kafka-manager
        image: hlebalbau/kafka-manager:2.0.0.2
        restart: on-failure
        depends_on:
            - kafka1
            - kafka2
            - kafka3
            - zookeeper
        environment:
            ZK_HOSTS: zookeeper:2181
            APPLICATION_SECRET: "random-secret"
            KM_ARGS: -Djava.net.preferIPv4Stack=true
        ports:
            - "9000:9000"

 

docker-compose를 실행 시키고, CMAK(127.0.0.1:9000)에 접속하여 테스트 용도의 Cluster, Topic을 생성하자.

[클러스터 생성]

 

[토픽 생성]

 

 

카프카를 띄우고 토픽까지 생성했으니,

이제 test_topic 토픽에 컨슈머를 붙여보자.

# docker ps -a
> kafka Container ID를 알아낸 후,

# docker exec -ti <container ID> bash

# cd /opt/kafka_2.12-2.0.1/bin

# kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --group test_consume_group --topic test_topic

이슈:

카프카 브로커에서 kafka-console-consumer.sh을 실행하려하면 PORT가 이미 사용중이라고 실행이 되지 않는다..

컨테이너에 접속하여 /opt/kafka_2.12-2.0.1/bin 이동 후,

kafka-run-class.sh 파일을 수정한다.

# 변경 전

if [  $JMX_PORT ]; then
  KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT "
fi
# 변경 후

ISKAFKASERVER="false"
if [[ "$*" =~ "kafka.Kafka" ]]; then
    ISKAFKASERVER="true"
fi
if [  $JMX_PORT ] && [ -z "$ISKAFKASERVER" ]; then
  KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT "
fi

 

CMAK을 통해 컨슈머 그룹이 test_topic에 붙은 것을 확인할 수 있다.

 

GUI로는 확인했으니 CLI로 컨슈머 그룹이 잘 붙었는지 한번 더 확인해보자.

새로 터미널을 띄우고 위에서 접속한 동일한 컨테이너에 다시 접속 후, 아래 명령어를 실행한다.

# kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

 

test_consume_group을 확인 할 수 있다.

이제 지워보자.

# kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group test_consume_group

 

컨슈머가 토픽에 붙어서 컨슈밍을 하고 있는 상태라면 아래와 같이 그룹이 비어있지 않다고 에러가 발생한다.

 

컨슈밍을 하고있던 터미널에서 실행중인 프로세스를 종료하고 다시 삭제 명령어를 해보면

​삭제가 된 것을 확인할 수있다.

 

삭제가 완료된 후, CMAK을 통해 다시 확인해보면

consumer group이 아직까지 보일 것이다.

 

CMAK 컨테이너를 restart해준 후, 보면 없어진 것을 확인 할 수 있다.

반응형