Kafka : 어드민 클라이언트(Admin Client)
💡 해당 글은 '카프카 핵심 가이드 2nd Edition'을 읽고 정리한 글입니다.
1️⃣ 어드민 클라이언트 (Admin Client)
토픽 목록 조회, 생성, 삭제, 클러스터 상세 정보 등의 프로그램적 관리를 위해 제공되는 API를 말한다.
명령줄로도 특정 정보는 확인이 가능하지만, 사용자 입력에 기반한 관리를 위해서는 해당 API가 필요하다.
특징
- 비동기
- 클러스터 컨트롤러 API 요청을 보내면, 1개 이상의 Future 객체를 리턴한다.
- Future는 비동기적 연산 결과를 표현하는 인터페이스이다.
- 이 객체들은 컨트롤러의 상태가 완전히 업데이트된 시점에서 완료된 것으로 간주한다.
- Future 객체를 감싼 Result 객체를 반환한다. Result 객체는 작업이 끝날때까지 대기하거나 적업 결과에 대해 일반적으로 뒤이어 쓰이는 작업을 수행하는 헬퍼 메서드를 가지고 있다.
public class CreateTopicsResult {
private final Map<String, KafkaFuture<TopicMetadataAndConfig>> futures;
protected CreateTopicsResult(Map<String, KafkaFuture<TopicMetadataAndConfig>> futures) {
this.futures = futures;
}
...
}
- 최종적 일관성 (eventual consistency)
- 분산시스템의 일관성을 나타내는 정도로, 시스템의 모든 복제본이 결국 최종적으로 같은 값을 가지게 되는 모델을 뜻합니다.
- 해당 뜻은, 최종적으로 같은 값을 가지기 까지 시간이 얼마나 걸리는지는 알 수 없음을 의미하고도 한다.
- 일시적인 불일치를 허용하고, 가용성에 집중한 일관성 모델이다.
- 옵션 (Options)
- AdminClient 메서드들은 각 메서드에 맞는 Options 객체를 인수로 받는다.
- 해당 객체는 요청 받은 브로커가 요청을 어떻게 처리할지에 대한 설정을 담고 있다.
- 공통적으로 모든 Options 객체는 timeoutMs 메서드를 가지고 있으며, 클러스터로부터의 응답을 기다리는 시간을 설정한다.
- 그외에도, 각각 Options 객체마다 요청을 어떻게 처리할지에 대한 설정을 할 수 있는 메서드들이 있다. ( `ListTopicsOptions`는 `listInternal(boolean)`로 내부 토픽을 리턴할지 결정하고, `DescribeClusterOptions`는 `includeAuthorizedOperations(boolean)`로 권한을 가진 직업을 리턴할지를 결정한다.)
// Options 객체
package org.apache.kafka.clients.admin;
public abstract class AbstractOptions<T extends AbstractOptions> {
protected Integer timeoutMs = null;
public AbstractOptions() {
}
public T timeoutMs(Integer timeoutMs) {
this.timeoutMs = timeoutMs;
return this;
}
public Integer timeoutMs() {
return this.timeoutMs;
}
}
- 수평구조
- 어떠한 의존관계나 네임스페이스를 가지지 않는다. 실제로 코드를 살펴보면 하나의 `KafkaAdminClient`객체 안에 모든 객체 및 메서드가 구현되어 있다.
💡 어드민 클라이언트가 클러스터에게 create, delete, alter 요청을 보내게되면 해당 작업은 컨트롤러에 의해 수행된다. 반면, 단순 일기만 하는 작업인 list, describe는 부하가 작은 브로커에게 전달되게 된다. 그래서 해당 브로커가 최신의 변경사항(예를 들어 새로운 토픽이 추가되는 등)을 알고 있지 않다면 최신 내용은 반환하지 않는다는 점을 유의하자.
이것이 최종적 일관성 속성을 가지는 이유이기도 하다.
💡 어드민 작업은 크게 AdminClient를 통해 진행하거나, 주키퍼에 저장되어 있는 메타데이터를 직접 수정하는 방식으로 진행할 수 있다. 하지만 책은 주키퍼로 진행하는 방식은 사용하지 말라고 강력하게 말하고 있다. 이유는 아래와 같다.
1. 메타데이터를 아파치 카프카와 주키퍼에 저장하게 되면서 중복의 문제 및 데이터 일관성의 문제가 있다.
2. 외부에서 메타데이터를 관리하는 것은 효율성이 떨어진다.
3. 카프카의 확장성이 제한된다.
Apache ZooKeeper의 문제가 아닌 외부 메타데이터 관리 방식의 문제점임을 유의하자.
링크 : Apache Kafka Needs No Keeper: Removing the Apache ZooKeeper Dependency
2️⃣ 어드민 클라언트 : 생성 및 닫기 , 설정
생성 및 닫기
어드민 클라이언트 생성 및 닫는 예시 코드
Properties configs = new Properties();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient admin = AdminClient.create(configs);
admin.close(Duration.ofSeconds(30));
생성하기는 `create()`를 통해 수행할 수 있다.
- 설정값을 담고 있는 `Properties`객체를 인수로 받는다.
- 필수로 담아야 하는 설정은 클러스터 URI이다.
- 브로커 중 하나에 장애가 발생할 경우에 대비해서 최소 3개 이상의 브로커를 지정하는 것이 보통이다.
어드민 클라이언트를 사용완료했다면, `close()`를 통해 닫아줘야 한다.
- 시간을 입력을 받는데, 해당 시간만큼 응답을 기다리는 것을 의미한다.
- close()가 실행된 시점에 아직 진행중인 작업이 있을 수 있는데, 해당 작업을 위해 응답을 기다리는 시간을 가지는 것이다.
- 만약 타임아웃이 된다면, 모든 작동을 멈추고 자원을 해제한다.
- 시간 값을 넣어주지 않는다면, 모든 진행 중이 작업이 완료될때 까지 기다린다는 것을 의미한다.
설정
1. client.dns.lookup ( 기본값 : default )
기본적으로 카프카는 부트스트랩 서버 설정에 포함된 호스트명을 기준으로 연결을 시도한다. (default)
웬만한 경우 정상적으로 동작하지만, DNS를 사용하는 상황에서는 다른 방법이 필요했다.
그래서 해당 설정값이 등장했고, DNS를 사용하는 상황도 2가지로 나뉘기에 각 상황에 따른 설정하는 방벙에 대해 알아보자.
DNS 별칭을 사용하는 경우
- 다양한 브로커가 존재하는 상황에서 모든 브로커를 부트스트랩 설정에 일일이 지정하는 것은 유지관리에 취약하다. 그때 하나의 DNS 별칭으로 전체의 브로커는 가리킬 수 있다.
- 그런데, 해당 경우 `SASL`을 사용하면 인증을 하려고 할때 문제 발생한다.
- 부트스트랩에 설정되어 있는 DNS 별칭으로 인증을 시도하려고 하는데, SASL은 서버의 보안 주체와 호스트명이 일치하지 않으면 인증을 거부하게된다.
- 그때 `client.dns.lookup = resolve_cannonical_bootstrap_servers_only`로 설정해주자.
- 해당 설정은 DNS 별칭을 펼치게 도아주면, 모든 브로커의 이름을 일일이 부트스트램 서버 목록에 넣어주는 것과 동일한 역할을 수행한다.
💡 카프카 인증 : SSL 과 SASL
카프카에서 인증을 처리하는 방식은 크게 SSL과 SASL이 있다.
SSL은 대칭키와 공개키를 활용한 암호화 통신 방식으로, 클라이언트에게 메세지가 암호화 되어 전달된다. 즉, 해당 메세지를 복호화하는 작업이 필요로 하고, 그만큰 CPU 부하가 발생한다.
SASL은 프레임워크로 인증 처리 방식을 plug-in 형태로 요구사항에 맞게 변경하여 사용하는 것이 특징이다. 문자열 아이디, 패스워드를 사용하는 인증(PLAIN), SCRAM 방식을 이용한 인증, OAuth2 방식을 이용한 인증(OAUTHBEARER), Kerberos를 이용한 인증(GSSAPI)이 있다.
다수의 IP주소로 연결되는 DNS 이름을 사용하는 경우
- 로드 밸런서 뒤에 모든 브로커를 숨기는 경우, 로드 밸런서가 단일 장애점이 되는 것을 방지하기 위해 브로커를 여러개의 IP 주소로 연결한다.
- 카프카 클라리언트는 첫번째 호스트명으로 연결시도하게 되고, 만약 해석된 IP가 불능 상태라면 다른 IP가 정상적으로 동작하더라도 연결 실패를 띄운다.
- 이때 `client.dns.lookup = use_all_dns_ips`로 설정해주자.
나중에 참고할 글 : Kafka-client client.dns.lookup 옵션 정리
2. request. timeout.ms ( 기본값 : 120초 )
어드민 클라이언트의 응답을 기다릴 수 있는 시간의 최대값을 나타낸다.
- 클라이언트가 재시도가 가능한 에러를 받고 재시도하는 시간도 포함되어 있다.
이외에 다양한 설정값은 이 글을 참고하자.
3️⃣ 토픽 관리 기능
토픽 목록 조회
public static void main(String[] args) throws ExecutionException, InterruptedException {
...
// 전체 토픽 목록 조회
ListTopicsResult topics = adminClient.listTopics();
topics.names().get().forEach(System.out::println);
...
// 특정 토픽 조회
DescribeTopicsResult sampleTopic = adminClient.describeTopics(TOPIC_LIST);
try {
topicDescription = sampleTopic.topicNameValues().get(TOPIC_NAME).get();
} catch (ExecutionException e) {
if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
log.error(e.getMessage());
throw e;
}
}
...
}
`listTopics()`를 사용하면 토픽 목록을 가져올 수 있다.
- ListTopicsResult 객체를 반환하며, names()을 수행하면 감싸고 있는 Future 객체를 반환한다.
- get()은 서버가 토픽 이름 집합을 모두 리턴할 때까지 기다리게 된다.
만약 특정 토픽을 가져오고 싶다면, `describeTopics()`를 사용하면 된다.
- DescribeTopicsResult 객체를 반환하며, 입력받은 리스트에 해당하는 토픽을 반환한다.
- topicIdValues()를 사용하면 Map<Uuid, Future> 형태로, topicNameValues()를 사용하면 Map<String, Future> 형태로 반환된다. ( all() 와 values()는 Deprecated 되었다. )
- Future 객체에는 토픽에 속한 모든 파티션의 목록이 담겨있다.. (TopicDescription)
서버에서 에러를 리턴하면, Future 객체는 `ExecutionException`을 발생시키게 된다. 해당 예외의 cause 안에 서버에서 리턴한 실제 예외가 있기 때문에 예외처리시 getCause()를 사용하자.
토픽 생성하기
public static void main(String[] args) throws ExecutionException, InterruptedException {
...
CreateTopicsResult newTopic = adminClient.createTopics(
List.of(new NewTopic(TOPIC_NAME, NUM_PARTITIONS, REPLICATION_FACTOR)));
...
}
`createTopices`를 사용하면 토픽을 생성할 수 있다.
- 토픽의 이름, 파티션의 수, 레플리카를 설정해줄 수 있으며 토픽의 이름만 설정한다면 나머지는 기본값으로 설정된다.
토픽 삭제하기
public static void main(String[] args) throws ExecutionException, InterruptedException {
...
adminClient.deleteTopics(TOPIC_LIST).all().get();
try {
sampleTopic.topicNameValues().get(TOPIC_NAME).get();
} catch (ExecutionException e) {
...
}
...
}
`deleteTopics()`를 사용하면 토픽을 삭제할 수 있다.
- DeleteTopicsResult 객체를 반환하며, Future 객체는 void를 가지고 있다.
- 하지만 토픽 삭제는 돌이킬 수 없음을 명시하자.
지금까지는 Future 객체의 get()을 통한 블로킹 방식을 이용했다.
그런데 만약 서버 스레드가 블록되는 것을 원하지 않는다면 아래와 같은 방식을 사용하면 된다.
Non blocking 방식
public static void main(String[] args) throws ExecutionException, InterruptedException {
...
demoTopic.topicNameValues().get(topic)
.whenComplete( //get() 대신에 Future 객체 작업이 완료되면 호출된 메서드 정의
(final TopicDescription topicDescription, final Throwable throwable) -> {
if(throwable != null){
// 예외가 발생하면, 예외처리
}else{
// 성공한다면, 성공처리
}
});
...
}
- get() 대신에 whenComplete()를 사용하여 Future 객체 반환 완료시 수행할 메서드를 정의해주면 된다.
4️⃣ 설정 관리
브로커, 브로커 로그, 토픽의 설정을 관리할 수 있으며 ConfigResource 객체를 사용한다.
ConfigResource의 종류는 다양하며, 하나의 요청에 여러가지 서로 다른 타입의 자원을 지정할 수 있다.
예시
public static void main(String[] args) throws ExecutionException, InterruptedException {
...
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
DescribeConfigsResult configsResult = adminClient.describeConfigs(List.of(configResource));
Config configs = configsResult.all().get().get(configResource);
configs.entries().stream()
.filter(entry -> !entry.isDefault())
.forEach(System.out::println);
ConfigEntry compaction = new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT); // cleanup.policy, compact
if (!configs.entries().contains(compaction)) {
Collection<AlterConfigOp> configOp = new ArrayList<>();
configOp.add(new AlterConfigOp(compaction, AlterConfigOp.OpType.SET));
Map<ConfigResource, Collection<AlterConfigOp>> alterConfigs = new HashMap<>();
alterConfigs.put(configResource, configOp);
adminClient.incrementalAlterConfigs(alterConfigs).all().get();
} else {
log.info("Topic {} is compacted topic.", TOPIC_NAME);
}
...
}
- ConfigResource 객체로, 설정을 변경할 타입과 그 타입에 해당하는 이름을 정해준다.
- describConfigs()은 `Map<ConfigResource, KafkaFuture<Config>>`형태를 반환하며, 설정값 모음을 담고 있다.
- isDefault() 메서드는 해당 값이 기본값인지 확인해준다.
- ConfigEntry객체는 특정 설정을 만들때 사용하며, 설정 타입과 그에 해당하는 값을 넣어주면 된다.
- 설정값을 변경하고 싶다면 `Map<configresource, Collection<AlterConfigOp>>` 형태로 만들어 incrementalAlterConfigs를 수행해주면 된다.
- AlterConfigOp는 조작할 설정값을 담는 객체로, ConfigEntry객체와 설정 처리할 방식을 가진다.
- 방식으로는 설정값을 잡아주는 SET , 현재 설정값을 삭제하고 기본값으로 되돌리는 DELETE, 추가하는 APPEND, 제거하는 SUBSTRACT가 있다.
💡 여기서 compact란 토픽 정리 정책 중 한가지이다. 세그먼트 단위로 토픽을 삭제할 수 있으며 토픽 정리 정책은 총 2가지가 있다. 데이터를 완전히 삭제하는 DELETE, 오프셋을 기준으로 동일한 메세지 키의 가장 오랜된 데이터를 삭제하는 COMPACT가 있다.
5️⃣ 컨슈머 그룹 관리
컨슈머 그룹 목록 보기
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 컨슈머 그룹 목록 조회
adminClient.listConsumerGroups().valid().get().forEach(System.out::println);
// 특정 컨슈머 그룹 조회
ConsumerGroupDescription groupDescription =
adminClient
.describeConsumerGroups(CONSUMER_GROUP_LIST)
.describedGroups().get(CONSUMER_GROUP).get();
}
`listConsumerGroups()`를 사용하면 전체 컨슈머 그룹을 볼 수 있다.
- ListConsumerGroupsResult를 반환하며 ConsumerGroupListing을 가진 Future객체를 통해 목록을 확인할 수 있다.
- valid()와 all() 메서드 모두 목록을 가져올 수 있는데, 차이점은 아래와 같다.
- valid() : 예외가 발생한 경우, 클러스터가 예외 없이 리턴한 컨슈머 그룹을 가져온다. 예외는 무시된다.
- all() : 에러가 발생한 경우, 컨슈머 그룹 없이 예외 중 첫번째 예외만 반환한다.
만약 특정 컨슈머 그룹을 가져오고 싶다면, `describeConsumerGroups()`를 사용하면 된다.
- describedGroups()는 `Map<String, KafkaFuture<ConsumerGroupDescription>>` 를 반환한다.
- ConsumerGroupDescription에는 그룹 멤버, 호스트명, 멤버별 할당된 파티션, 할당 알고리즘 등 컨슈머 그룹에 관한 상세한 정보가 담겨있다.
- 하지만 이곳에는 컨슈머 그룹이 읽고 있는 파티션의 마지막 커밋 오프셋 정보는 없다.
마지막 커밋 오프셋 정보 보기
public static void main(String[] args) throws ExecutionException, InterruptedException {
Map<TopicPartition, OffsetAndMetadata> offsets =
adminClient.listConsumerGroupOffsets(CONSUMER_GROUP).partitionsToOffsetAndMetadata().get();
Map<TopicPartition, OffsetSpec> reqLatestOffsets = new HashMap<>();
for (TopicPartition tp : offsets.keySet()) {
reqLatestOffsets.put(tp, OffsetSpec.latest());
}
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestOffsets =
adminClient.listOffsets(reqLatestOffsets).all().get();
}
`listConsumerGroupOffsets()`를 사용하면 컨슈머 그룹의 오프셋 정보를 가져올 수 있다.
- 컨슈머 그룹의 모음이 아닌 하나의 컨슈머 그룹을 받아온다는 점을 주의하자.
- OffsetSpec는 원하는 오프셋을 지정하는 것으로 첫번째 오프셋은 earliest(), 마지막 오프셋은 latest(), 특정 시간대의 오프셋은 forTimestamp()를 사용하면된다.
`listOffsets`은 특정 파티션의 오프셋 정보를 가져올 수 있다.
컨슈머 그룹 수정하기 (오프셋 변경)
예제를 보기 전, 주의점이 2가지가 있다.
첫번째, 오프셋 변경을 시도했다면, 오프셋이 변경되었다는 사실은 컨슈머 그룹에게 전달되지 않는다.
- 이를 방지하기 위해, 카프카에서는 현재 작업 중인 컨슈머 그룹에 대한 오프셋 수정은 막고 있다.
두번째, 상태를 가지고 있는 컨슈머 그룹에 오프셋 변경은 일관성을 깨트릴 수 있다.
- 그래서 상태를 가지고 있는 컨슈머 그룹의 오프셋을 변경하려면, 먼저 해당 상태를 초기화 혹은 적절히 변경해 줄 필요가 있다.
public static void main(String[] args) throws ExecutionException, InterruptedException {
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> earliestOffsets =
adminClient.listOffsets(reqEarliestOffsets).all().get();
Map<TopicPartition, OffsetAndMetadata> resetOffsets = new HashMap<>();
for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> e : earliestOffsets.entrySet()) {
resetOffsets.put(e.getKey(), new OffsetAndMetadata(e.getValue().offset()));
}
try {
adminClient.alterConsumerGroupOffsets(CONSUMER_GROUP, resetOffsets).all().get();
} catch (ExecutionException e) {
if (e.getCause() instanceof UnknownMemberIdException) {
log.error("Check if consumer group is still active.");
}
}
}
- alterConsumerGroupOffsets()를 통해 오프셋을 변경하는데, 해당 메서드는 파티션을 key로, OffsetAndMetadata를 value로 한 맵을 필요로 한다.
- 오프셋 변경시, 컨슈머 그룹을 미리 정지시켜 놓지 않는다면 UnknownMemberIdException가 발생한다.
6️⃣ 클러스터 메타 데이터
클러스터의 메타 데이터를 명시적으로 읽어야 하는 경우는 매우 드물다. 클러스터에 대한 정보 없이 메세지를 읽고 쓰기가 가능하기 때문이다. 그래도 정확한 클러스터에 연결되었는지 확인하는 용도로는 유용하니 참고하자.
public static void main(String[] args) throws ExecutionException, InterruptedException {
DescribeClusterResult cluster = adminClient.describeCluster();
cluster.nodes().get().forEach(node -> log.info(" * {}", node));
}
7️⃣ 고급 어드민 작업
토픽에 파티션 추가하기
public static void main(String[] args) throws ExecutionException, InterruptedException {
Map<String, NewPartitions> newPartitions = new HashMap<>();
newPartitions.put(TOPIC_NAME, NewPartitions.increaseTo(NUM_PARTITIONS+2));
adminClient.createPartitions(newPartitions).all().get();
}
public class NewPartitions {
private int totalCount; // 파티션 수
private List<List<Integer>> newAssignments; // 새로운 파티션에 대한 복제 할당
}
`NewPartitions.increaseTo()`를 사용하면 파티션을 추가할 수 있다.
- NewPartitions 객체는 파티션의 수인 totalCount 와 레플리카 할당에 관한 newAssignments를 가지고 있다.
- increaseTo()는 해당 객체를 생성하는 생성자 역할을 하며, 기존 파티션수 + 추가할 파티션 수를 넣어주면 된다.
- createPartitions() 사용해 최종적으로 지정된 토픽에 파티션을 추가한다.
- 여러 토픽을 한번에 확장할 경우 일부는 성공하고, 나머지는 실패할 수 도 있으니 주의하자.
위에서 봤듯이 해당 방법을 사용하기 위해서는 토픽 상세 정보를 통해 기존의 파티션의 수를 알아야 한다.
⚠️ 파티션의 수는 키를 활용한 메세지 처리에서 중요하기 때문에 만약 토픽에 파티션을 추가해야 한다면, 이것 때문에 토픽을 읽고 있는 어플리케이션이 깨지지 않는 지 확인할 필요가 있다.
토픽에서 레코드 삭제하기
토픽에 30일간의 보존 기한이 설정되어 있다 하더라도 파티션별로 모든 데이터가 하나의 세그먼트에 저장되어 있다면 보존 기한을 넘긴 데이터라 한들 삭제되지 않을 수도 있다. 이때는 특정 시간 이전의 쓰여진 레코드를 지울 필요가 있다.
public static void main(String[] args) throws ExecutionException, InterruptedException {
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> olderOffsets =
adminClient.listOffsets(reqOrderOffsets).all().get();
Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> e : olderOffsets.entrySet()) {
recordsToDelete.put(e.getKey(), RecordsToDelete.beforeOffset(e.getValue().offset()));
}
adminClient.deleteRecords(recordsToDelete).all().get();
}
- deleteRecords()는 지정된 오프셋보다 더 오래된 모든 레코드에 삭제 표시를 함으로써, 컨슈머가 접근할 수 없도록 한다.
- 반환값으로 삭제한 오프셋 중 가장 큰 값을 리턴한다.
- 삭제 표시된 레코드는 디스크에서 비동기적으로 일어난다.
세그먼트란?

브로커로 전송된 메세지는 토픽의 파티션에 저장되는데, 이때 각 메세지들은 세그먼트라는 로그 파일의 형태로 브로커의 로컬 디스크에 저장된다. 즉 세그먼트는 로그 파일의 가장 작은 단위(디스크 상의 영속적으로 저장되는 단위)를 말한다.
- 효율적인 데이터 관리 및 로그 컴팩션(중복 제거) 및 삭제에 유용한 역할을 수행한다.
- 일정 크기나 일정 시간 등으로 새로 생성되며, 새로운 메세지가 추가될 때마다 세그먼에 기록된다. (최대 크기 1GB)
리더 선출하기
리더 선출에는 아래와 같이 2가지 종류가 있다.
- 선호 리더 선출(preferred leader election)
- 선호 리더란, 토픽이 처음 생성 되었을 때, 리더 레플리카였던 레플리카를 말한다.
- 파티션이 처음 생성되던 시점에서는 리더 레플리카가 모든 브로커에 걸쳐 균등하게 분포되기 때문에 '선호'라는 말이 붙었다.
- 카프카는 5분마다 선호 리더 레플리카가 리더를 맡았는지 확인하고, 맡을 수 있음에도 맡고 있지 않는다면 해당 레플리카를 리더로 선정한다.
- 언클린 리더 선출(unclean leader election)
- 리더가 될 수 없는 레플리카를 리더롤 선정하는 것이다.
- 리더 레플리카가 불능상태에서 다른 레플리카에 데이터가 없는 경우에 파티션 사용 불능상태로 만들 수 없기 떄문에 사용된다.
public static void main(String[] args) throws ExecutionException, InterruptedException {
Set<TopicPartition> electableTopics = new HashSet<>();
electableTopics.add(new TopicPartition(TOPIC_NAME, 0));
try {
adminClient.electLeaders(ElectionType.PREFERRED, electableTopics).all().get();
} catch (ExecutionException e) {
if (e.getCause() instanceof ElectionNotNeededException) {
log.error("All leaders are preferred leaders, no need to do anything.");
}
}
}
- electLeaders()를 통해 특정 파티션에 대한 리더를 선출 타입에 따라 선출한다.
- 해당 메서드는 성공적으로 완료된 이후에도, 리더 변경 사실이 모든 브로커에게 전달되기까지 시간이 걸린다. 그래서 describeTopics()를 호출하면 일관적이지 않은 결과를 받을 수 있다.
- 해당 리더 선출은 선호 리더가 아닌 레플리카가 현재 리더를 맡고 있는 경우에만 동작한다.
레플리카 재할당
브로커에 너무 많은 레플리카가 있어 옮기고 싶거나, 다른 장비로 레플리카를 보내야 하거나 혹은 추가하고 싶은 경우 사용할 수 있다.
주의점은 레프리카를 하나의 브로커에서 브로커로 재할당하는 일은 대량 데이터 복제를 초래한다는 점이다. 그래서 해당 작업을 수행하기 전에 네트워크 대역폭을 확인하고 필요하다면 쿼터를 설정하자.
아래 예시는 ID가 0인 브로커 하나를 가지고 있는 상황에서 새로운 브로커를 추가한 뒤,
새 브로커에 기존 브로커 속 레플리카를 저장하는 과정이다.
public static void main(String[] args) throws ExecutionException, InterruptedException {
Map<TopicPartition, Optional<NewPartitionReassignment>> reassignment = new HashMap<>();
reassignment.put(new TopicPartition(TOPIC_NAME, 0),
Optional.of(new NewPartitionReassignment(Arrays.asList(0, 1)))); // (1)
reassignment.put(new TopicPartition(TOPIC_NAME, 1),
Optional.of(new NewPartitionReassignment(Arrays.asList(0)))); // (2)
reassignment.put(new TopicPartition(TOPIC_NAME, 2),
Optional.of(new NewPartitionReassignment(Arrays.asList(1, 0)))); // (3)
reassignment.put(new TopicPartition(TOPIC_NAME, 3), Optional.empty()); // (4)
adminClient.alterPartitionReassignments(reassignment).all().get();
}
- 각 번호는 아래와 같이 파티션을 배치한다.
- 파티션 0번에 새 레플리카를 새 브로커(ID = 1)에 추가한다. 단 리더는 변경하지 않는다.
- 파티션 1번에는 레플리카 추가 없이 기존에 있던 레플리카를 가져온다. 레플리카가 하나뿐인 만큼 이것이 리더가 된다.
- 파티션 2번에 새로운 레플리카를 추가하고, 이것을 선호 리더로 선정한다.
- 파티션 3번에는 재할당을 하지 않는다.
- 이렇게 만든 파티션 추가 할당 전략을 alterPartitionReassignments()를 통해 적용한다.
8️⃣ 테스트 하기
AdminClient를 목업(mock-up)한 MockAdminClient클래스는 실제 어드민 작업을 수행할 필요 없이 어플리케이션이 제대도 동작하는지 확인할때 사용한다. API의 일부가 아닌 만큼 언제나 경고 없이 변경될 수 있지만, 공개된 메서드에 대한 목업이기에 메서드 시그니처는 유지된다.
모든 메서드에 대한 목업이 존재하는 것이 아니며, 변경으로 인해 언제든 테스트가 깨질 수 있다는 점을 유의하자.
예시
class Test {
private AdminClient adminClient;
@BeforeEach
void setUp() {
Node broker = new Node(0, "localhost", 9092);
this.adminClient = spy(new MockAdminClient(List.of(broker), broker));
AlterConfigsResult emptyResult = mock(AlterConfigsResult.class);
doReturn(KafkaFuture.completedFuture(null)).when(emptyResult).all();
doReturn(emptyResult).when(adminClient).incrementalAlterConfigs(any());
}
@Test
public void testCreateTopic() throws ExecutionException, InterruptedException {
TopicCreator tc = new TopicCreator(adminClient);
tc.maybeCreateTopic("test.is.a.test.topic");
verify(adminClient, times(1)).createTopics(any());
}
@Test
public void tetNotTopic() throws ExecutionException, InterruptedException {
TopicCreator tc = new TopicCreator(adminClient);
tc.maybeCreateTopic("not.a.test");
verify(adminClient, never()).createTopics(any());
}
}
'test'라는 이름을 받을 경우, 토픽을 생성하는 `TopicCreator` 클래스를 테스트하는 예시이다. 핵심만 살펴보자.
- 어드민 클라이언트 생성시, `new MockAdminClient()`로 목업 어드민 클라이언트를 할당해주고 있다.
- TopicCreator클래서에서 토픽 설정을 변경하는 과정이 있는데, 목업 클라이언트에 해당 메서드가 구현이 안되어 있어서incrementalAlterConfigs을 스터빙(stubbing)을 통해 해결하고 있다. 이렇게 목업이 되어있지 않은 메서드의 경우, Mockito와 같은 테스트 프레임워크를 사용하면 된다.
MockAdminClient를 test jar에 담아서 공개하기 때문에 pom.mxl 의존성을 추가하는 것을 잊지말자.