💡 해당 글은 '카프카 핵심 가이드 2nd Edition'을 읽고 정리한 글입니다.
1️⃣ 컨슈머(Consumer)란 ?
- 프로듀서(Producer)에서 생성한 메세지를 사용하는 주체이다.
- 프로듀서가 같은 토픽에 여러 개의 메세지를 쓰듯이, 여러개의 컨슈머가 같은 토픽으로부터 데이터를 분할해서 읽어올 수 있게 하기 위해 컨슈머 그룹(Consumer Group)으로 동작할 수 있다.
- 단일 컨슈머 구성하는 경우도 있지만, 보통 그룹으로 묶어서 사용한다.
컨슈머와 파티션 관계
컨슈머는 토픽의 파티션에서 메세지를 가져오기 때문에 파티션 수와 밀접한 관계가 있다.
- 컨슈머 그룹에 한개의 컨슈머이 있다면, 해당 컨슈가 토픽의 모든 파티션에서 가져오게 된다.
- 그룹에 한개 이상의 컨슈머가 있다면 파티션 할당 전략에 의해 파티션을 배정받아 각 역할을 수행하게 되고, 한개의 컨슈머가 있는 경우 보다 빠르게 데이터를 읽어올 수 있다.
- 즉, 읽어올 데이터의 수를 늘리고 싶다면 컨슈머 혹은 컨슈머 그룹을 추가하면 된다.
하지만 주의할 것이 있다.
- 그룹의 컨슈머 수가 토픽 파티션의 수보다 많아지면 위 그림처럼 유휴(inactive) 컨슈머가 발생하게 된다.
- 사용 자원량만 늘리는 행위가 되어버리고, 즉 아무 의미가 없어진다.
- "Kafka: 파티션 개수 설정에 대하여"에서 파티션의 개수를 설정시 컨슈머의 개수도 고려해야한다는 것이 이러한 이유 때문이다.
💡 partition 을 여러 개로 운영하는 경우 데이터를 병렬처리하기 위해서 partition 개수와 consumer 개수를 동일하게 맞추는 것이 가장 좋은 방법이다. partition 개수가 n개 라면 동일 consumer group 으로 묶인 consumer thread 를 최대 n 개 운영 할 수 있다. ( ref. 아파치 카프카 애플리케이션 프로그래밍 )
2️⃣ 파티션 리밸런스
컨슈머가 크래시난 경우, 크래시가 난 컨슈머가 담당하던 파티션을 나머지 컨슈머 중 하나가 대신 읽어오게 한다. 이를 파티션을 재할당한다는 의미에서 리밸런스(Rebalance)라고 하며, 컨슈머 그룹 코디네이터(브로커)와 컨슈머 그룹 리더가 수행한다.
1. 조급한 리밸런스 (Eager Rebalance)
- 모든 컨슈머, 정상적으로 동작하던 컨슈머도 모두 중단시킨 후 재할당하는 과정을 거친다.
- 자세한 동작 순서는 아래와 같다.
- 각 컨슈머는 각자 본인이 맡은 모든 파티션을 할당 해제 한다.
- 이후 JoinGroup요청을 보내게 되고, 모든 컨슈머의 `JoinGroup 요청`이 코디네이터에게 도착하면 동기화 베리어를 동작시킨다.
- 파티션 전략과 컨슈머 그룹 리더를 활용해 파티션을 재할당한다.
- 간단한 동작방식이지만, 모든 컨슈머가 재할당 받을 동안 모든 동작이 멈추는(stop the world) 문제가 있다.
2. 협력적 리밸런스 (cooperative Rebalance)
- 조급한 리밸런스의 문제점을 해결한 것으로, 재할당이 필요한 파티션을 가진 컨슈머만 재할당한다.
- 자세한 동작 순서는 아래와 같다.
- 각 컨슈머는 코디네이터에게 본인이 맡은 파티션 정보를 인코딩에서 `JoinGroup요청`에 담아 보낸다.
- 코디네이터는 해당 정보를 모으고 정리하여 그룹 리더에게 보낸다.
- 그룹 리더는 재할당할 파티션을 제외한 파티션 할당 결과를 만들어 컨슈머에게 전송한다.
- 컨슈머는 파티션 할당 결과를 받으면 이전과 차이점이 없는지를 비교한다. 이때 만약 이전에는 할당받았던 파티션이 새로운 파티션 할당 결과에 없다면 해당 파티션을 할당해제 한다.
- 해당 과정을 거치게 되면, 재할당을 해야하는 파티션을 가지지 않은 컨슈머는 아무런 동작을 할 필요가 없다.
- 현재 리밸런스 기본값은 협력적 리밸런스이다.
컨슈머는 브로커에게 하트비트(heartbeat)를 전송함으로써 할당된 파티션에 대한 소유권을 유지한다. 즉, 리밸런스를 동작하는 시점, 컨슈머가 크래시가 났다는 기준을 하트비트 전송 유무를 기준으로 판단한다.
💡초기에는 컨슈머가 poll()을 호출하면 바로 heartbeat를 보내도록 했는데, poll()의 규모가 커짐에 따라 heartbeat을 보내는 간격도 커져 현재는 Heartbeat Thread라는 별도의 스레드에서 전송하고 있다.
➕ 알고보면, 그룹 코디네이터 보다 그룹 리더가 메인?
파티션이 컨슈머에게 할당되는 과정을 살펴보자.
- 카프카 클러스터는 컨슈머의 JoinGroup 요청을 받는다.
- JoinGroup 요청에는 컨슈머의 관심 토픽와 같은 컨슈머 정보가 담겨있다.
- JoinGroup 요청이 들어오면 클러스터는 Group을 생성한다.
- 관심 토픽의 리더 레플리카가 존재하는 브로커가 그룹 코디네이터로 지정된다.
- 코디네이터는 컨슈머 그룹을 생성하고 해당 점보를 그룹 리더에게 전송한다.
- 그룹 리더는 JoinGroup을 첫번째로 보낸 컨슈머가 된다.
- 코디네이터는 파티션의 수를 최대값으로 JoinGroup 요청을 기다리고, 요청이 오는 순서대로 그룹을 형성한다.
- 컨슈버 그룹 정보를 받은 그룹리더는 파티션 할당 전략을 활용해 파티션을 배정한다.
- 파티션 할당 결과는 다시 그룹 코디네이터에게 전달한다.
- 코디네이터는 각 컨슈머에게 각자 맡은 파티션에 대한 정보를 전달한다.
즉 전체적인 흐름을 살펴보면, `컨슈머 > 코디네이터 > 그룹리더(파티션 할당) > 코디네이터 > 컨슈머`의 과정을 거치게 되는데 가장 메인이 되는 파티션할당은 그룹리더가 하는 것을 알 수 있다.
코디네이터는 JoinGroup의 요청을 수집하고, 파티션 정보를 전달하는 서브 역할을 수행한다. 즉 컨슈머와 그룹리더 사이에 중간 매개체 역할이다.
3️⃣ 정적 그룹 멤버십
컨슈머가 컨슈머 그룹을 떠났다가 다시 참여하면, 새로운 멤버 ID가 발급되면서 새로운 파티션을 할당받게 된다. 하지만 이를 유지하고 싶다면 정벅 그룹을 지정해주면된다.
configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
- `group.instance.id`으로 설정값을 넣어주면 된다.
- `session.timeout.ms`를 초과하지 않는 한, 그룹을 나갔다와도 동일한 파티션을 할당 받는다.
- 코디네이터에 파티션 할당에 대한 캐시값이 존재하기 때문에, 리밸런스가 발생하지 않고 이전 파티션을 그대로 할당받는 것이다.
4️⃣ 카프카 컨슈머 생성 및 토픽 구독
카프카 프로듀서 생성과정과 매우 유사하다.
// 설정값 넣기
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "ex_group");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 컨슈머 생성
Consumer<String, String> consumer = new KafkaConsumer<>(configs);
// 토픽 구독
consumer.subscribe(Collections.singleton("ex_topic"));
// 토픽 구독 (정규식 : 모든 ex. ~ 토픽 )
consumer.subscribe(Collections.singleton("ex.*"));
- `bootstrap.servers`, `key.deserializer`, `value.deserializer`은 필수 속성 값이다.
- 컨슈머 생성 후 할 일은 토픽 구독하기로, `subscribe()`에 토픽 이름을 Collection 형식으로 담아 넣어주면 된다.
- String이 아닌 정규표현식으로 토픽을 지정해줄 수 있으며, 서로 다른 유형의 데이터를 처리해야 할때 유용하다.
⚠️ 파티션이 많다면 정규식으로 파티션을 지정하는 것은 자제하자.
토픽의 일부를 정규표현식으로 지정할 경우, 전체 토픽과 파티션에 대한 정보를 브로커에게 요청하고 그 목록에서 관련된 토픽을 검색하게 된다. 해당 과정은 상당한 오버헤드를 발생시킬 뿐더러, 전체 토픽에 대한 조회 권한도 필요로 한다.
5️⃣ 폴링(polling) 루프
컨슈머는 브로커와 폴링 방식으로 데이터를 주고 받는다. 폴링이란 네트워크 용어로 일정한 주기(특정한 시간)을 가지고 서버와 응답을 주고 받는 방식을 말하며, 컨슈머는 브로커에 추가 데이터가 들어왔는지 주기적으로 확인한다.
while (true) {
ConsumerRecords<String, String> records = consumer.poll(timeout);
for (ConsumerRecord<String, String> record : records) {
// record 처리
...
}
}
- `poll()`으로 할당된 파티션에서 레코드를 가져오는 메서드로, 반환값은 `ConsumerRecord` 객체이다.
- `max.poll.interval.ms`동안 폴링을 하지 않으면 컨슈머는 죽은 것으로 판정되기 때문에 무한 루프에서 poll()을 수행한다.
- 매개변수로 컨슈머 버퍼에 데이터가 없는 경우 poll()이 블록될 수 있는 최대 시간을 결정한다.
해당 예시에서는 레코드를 가져오는 것으로면 표현되었지만, 이외에도 많은 중요한 역할을 수행한다.
컨슈머 그룸에 참가하고 파티션을 할당받는 역할 / 리밸런스 관련 예외 등이 poll()와 연관성이 높다.
자세한 내용은 Kafka : 컨슈머 (Consumer) - (2)에서 다룰 예정이다.
하나의 스레드에서 동일한 그룹 내에 여러 개의 컨슈머를 생성할 수 없다. "하나의 스레드당 하나의 컨슈머" 의 원칙으로 여러 개의 컨슈머를 운영하고 싶다면 여러 개의 스레드를 사용해야 한다. 그래서 실행자(ExcutorService)를 사용해 다수의 스레드를 시작시키면 좋다.
// 출처 : https://dzone.com/articles/kafka-consumer-and-multi-threading
public class KafkaProcessor {
private final KafkaConsumer<String, String> myConsumer;
private ExecutorService executor;
private static final Properties configs = new Properties();
static {
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
...
}
public KafkaProcessor() {
this.myConsumer = new KafkaConsumer<>(KAFKA_PROPERTIES);
this.myConsumer.subscribe(Arrays.asList("Topic"));
}
public void init(int numberOfThreads) {
//Create a threadpool
executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
while (true) {
ConsumerRecords<String, String> records = myConsumer.poll(100);
for (final ConsumerRecord<String, String> record : records) {
executor.submit(new KafkaRecordHandler(record)); //
}
}
}
...
}
public class KafkaRecordHandler implements Runnable {
private ConsumerRecord<String, String> record;
public KafkaRecordHandler(ConsumerRecord<String, String> record) {
this.record = record;
}
@Override
public void run() {
// Record 처리
System.out.println("value = "+record.value());
System.out.println("Thread id = "+ Thread.currentThread().getId());
}
}
6️⃣ 컨슈머 설정하기
앞에서 봤던 필수 설정 이외에 다양한 설정들이 있는데 그 중 중요한 설정들만 살펴보자.
1. fetch.min.bytes ( 기본 : 1 byte )
- 컨슈머가 브로커로부터 레코드를 얻어올 때 받는 데이터의 최소량이다.
- 브로커가 해당 설정값보다 작은 메세지를 가지고 있다면, 충분한 메세지를 보낼 수 있을 때까지 기다린다.
- 요청을 하거나 쓸 데이터가 많이 없는 경우 해당 설정값을 높여, 브로커와 컨슈머 사이의 부하를 줄일 수 있다.
2. fetch.max.wait.ms ( 기본 : 500ms )
- `fetch.min.bytes`로 인해 데이터가 쌓이길 브로커가 기다리게 되었을때, 얼마나 기다리는지를 나타내는 것이다.
- 브로커는 `min.bytes` 혹은 `max.wait.ms` 둘 중 하나가 만족되는 대로 리턴한다.
3. fetch.max.bytes ( 기본 : 50MB )
- 컨슈머가 브로커를 폴링할 때 리턴 받는 최대 바이트 수이다.
- 브로커가 컨슈머에게 레코드를 배치 단위로 보내는데, 만약 첫번째 레코드 배치의 크기가 해당 설정값을 넘길 경우, 설정값을 무시하고 해당 배치를 그대로 전송한다. ( 작업 진행 보장 )
💡 브로커 설정에도 최대 읽기 크기를 제한할 수 있다. fetch.max.bytes가 크다면 대량의 디스크(디렉토리) 읽기와 오랜 네트워크 전송 시간으로 브로커 부하를 증가시킬 수 있다. 이런 경우 브로커의 읽기 크기를 줄이는 것도 방법이다.
[ message.max.bytes(브로커 읽기 설정 ) ≤ fetch.max.bytes ]
4. max.poll.records
- poll() 호출할 때마다 리턴되는 최대 레코드 수를 말한다.
- 폴링 루프에서 반환받는 레코드의 수를 제어하고 싶을 때 사용한다.
5. max.partition.fetch.bytes ( 기본 : 1 MB )
- 각 파티션별로 리턴하는 최대 바이트 수를 말한다.
- 총 리턴 받는 최대 바이트 수를 제한할 수는 있지만, 특별한 이유가 아니라면 `fetch.max.bytes`로 제한을 걸자.
- 해당 방법으로 제한을 두려면, 한 응답에 담을 수 있는 파티션 수를 제한할 수 있어야 되는데 우리는 브로커가 보내온 응답에 얼마나 많은 파티션이 포함되어 있는지 결정할 수 있는 방법이 없다.
6. session.timeout.ms ( 기본_과거 : 10초, 기본_현재 : 45초 )
- 브로커에게 heartbeat를 보내지 않고도 살아 있는 것으로 판정된는 최대시간을 말한다.
- 해당 설정시간을 초과하면, 컨슈머는 죽은 것으로 판닥하여 리밸런스를 수행한다.
- 초기에 온프레미스를 기준으로 기본값이 설정되었지만, 순간부하 집중과 네트워크에 약점이 있는 클라우드가 많이 활용되면서 45초로 기본값이 변경되었다.
7. heartbeat.interval.ms ( 기본 : 1초)
- 컨슈머가 그룹 코디네이터에게 얼마나 자주 하트비트를 보내는지를 결정한다.
- `session.timeout.ms`와 연관성이 높으며 대체로 `session.timeout.ms`의 1/3로 설정한다.
- 기본값보다 낮추면, 더 빠르게 죽은 컨슈머를 찾을 수 있지만, 더 잦은 리밸런스를 초래할 수도 있다.
💡 session.timeout.ms가 45초로 변경되면서 "session.timeout.ms의 1/3"라는 규칙도 깨졌다. 하지만 그렇다고 해당값을 규칙에 맞춰서 수정하지 말자. 해당 값은 리밸런스중인 컨슈머를 탐지할때 사용된다.
8. max.poll.interval.ms ( 기본 : 5분 )
- 컨슈머가 폴링을 하지 않고도 죽은 것으로 판정되지 않을 수 있는 최대 시간을 결정한다.
- `session.timeout.ms`와 `heartbeat.interval.ms`로 컨슈머가 죽었는지를 확인할 수 있지만, 하트비트는 별도의 스레드에서 진행된다.
- 메인스레드에 데드락이 생겼는데, 백그라운드 스레드에서 계속 하트비트를 전송하는 경우가 발생할 수 있다.
- 즉, `max.poll.interval.ms`는 메인스레드에서 컨슈머가 여전히 레코드를 처리하고 있는지를 확인하는 용도이다.
💡 만약에 하나의 레코드를 처리하는데에 걸리는 시간이 5분이라고 한다면, 기본 max.poll.interval.ms 값을 사용시 하나의 레코드를 처리만하다가 종료되고 리밸러스가 동작하게 된다. 뒤에서 설명하지만, 불필요한 리밸런스는 최소화하는 것이 중요하기 때문에 T (하나의 레코드르 처리하는데 걸리는 시간) * max.poll.records ≤ max.poll.interval.ms을 해줘야 할 것이다.
9. default.api.timeout.ms ( 기본 : 1분 )
- 명시적으로 타임아웃을 지정하지 않는 한, 거의 모든 컨슈머 API 호출에 적용되는 타임 아웃 값이다.
- API 사용 실패시, 필요한 경우 해당 시간안에 재시도할 수 있다.
- poll()의 경우 매개변수로 타임아웃을 주기 때문에, 적용되지 않는다.
10. request.timeout.ms ( 기본 : 30초 )
- 컨슈머가 브로커로부터의 응답을 기다릴 수 있는 최대 시간이다.
- 해당 시간을 초과할경우, 클라이언트는 브로커가 응답하지 않은 것으로 판단하여 연결을 닫은 뒤 재연결을 시도한다.
- 브로커에게 충분한 처리 시간을 주는 것이 중요하기 때문에, 해당시간은 변경하지 않은 것을 추천한다.
11. auto.offset.reset ( 기본 : lastest )
- 컨슈머가 파티션을 읽기 시작할때 파티션의 오프셋을 읽어오는 방식을 정의한다.
- 기본값은 lastest으로 가장 최근에 적용된 오프셋을 읽어오는 것이다.
- 다른 값으로 earliest가 존재하는데, 이는 유효한 오프셋이 없을 경우 파티션의 처음부터 모든 데이터를 읽어오는 것이다.
- none으로도 설정하지만 유효하지 않은 오프셋을 읽으려는 경우 예외가 발생한다.
- lastest > earliest > none
⚠️ 카프카 컨슈머의 auto.offset.reset 옵션을 반드시 earliest로 변경해야 하는 이유
12. enable.auto.commit
- 컨슈머가 데이터를 성공적으로 읽고나서 마지막에 읽은 데이터의 오프셋을 커밋하는데, 이를 자동으로 커밋할지를 결정하는 것이다.
- true로 설정시, `auto.commit.interval.ms`를 사용해 얼마나 자주 commit할지 제어할 수 있다. ( 기본 5초 )
- false로 설정시, 직접 커밋하는 것으로 중복 최소화하고 유실되는 데이터를 방지할 수 있다.
중복 최소화 및 유실되는 데이터에 관해서는 Kafka : 컨슈머 (Consumer) - (2)에서 다룰 것이다.
13. partition.assignment.strategy
파티션을 컨슈머에게 할당할때 사용되는 할당 전략을 정의하는 것으로 크게 4가지 있다.
( 이미지 출처 : [kafka] 리밸런싱 종류와 컨슈머 파티션 할당 전략 )
- Range
그림과 같이 파티션을 컨슈머의 수만큼을 나눠 균등하게 배분하는 것이다. 파티션의 수와 컨슈머의 수가 깔끔하게 나눠지는 상황이라면 균등하게 배분 가능하지만, 위 사진과 같이 수가 맞아떨어지지 않아 앞선 컨슈머가 더 많이 할당받는 경우도 발생한다. 초기 `partition.assignment.strategy`의 기본값으로 Range였다.
장점 : 컨슈머에게 여러 파티션의 동일한 파티션 넘버를 일관되게 할당할 수 있다.
단점 : 할당된 파티션의 수 불균형, Eager 프로토콜 사용으로 리밸런스시 모든 컨슈머 중단
- Round-Robin
모든 파티션을 기준으로 파티션을 컨슈머에게 순차적으로 하나씩 할당하는 전략이다. 컨슈머 그룹내 모든 컨슈머들이 동일한 토픽을 구독한다면, 위 사진과 같이 균등하게 거의 동일한 수의 파티션을 할당할 수 있다.
장점 : 균등한 파티션 할당
단점 : Eager 프로토콜 사용으로 리밸런스시 모든 컨슈머 중단, 하나의 컨슈머가 중단되어도 균등한 배분을 위해 리밸런스 수행
- Sticky
두 가지 목표를 가지고 진행되는 전략이다. 첫 번째는 균등한 파티션 전략, 두번째는 가능하면 많은 파티션들이 같은 파티션에 할당되어 컨슈머에서 다른 컨슈머로 옮길 때 발생하는 오버헤드를 최소하는 것이다. 그래서 리밸런스 발생시, Round-Robin에 비해 이동하는 파티션의 수는 최소화하고 기존에 할당되었던 파티션을 보장할 수 있다. 다만 우선 순위는 첫번째가 더 높기 때문에, 두번째의 경우 항상 성립되는 것은 아니다.
장점 : 균등한 파티션 배분, 변경되는 파티션의 수 최소화
단점 : Eager 프로토콜 사용으로 리밸런스시 모든 컨슈머 중단
- Cooperative Sticky
Sticky전략과 동작방식이 동일하지만 차이점은 Cooperative 프로토콜을 사용하여 재할당 대상이 아닌 파티션을 가진 컨슈머는 리밸런스 과정에서도 중단되지 않고 계속 동작가능하다. `partition.assignment.strategy`의 기본값은 현재 Cooperative Sticky이다.
14. client.id
- 브로커가 요청을 보낸 클라이언트를 식별하기 위해 사용하는 값니다.
- 로깅, 모니터링 지표에 주로 사용한다.
15. client.rack
- 컨슈머는 기본적으로 일관성을 위해 리더 레플리카로부터 메세지를 읽어온다. 하지만 이때 물리적 혹은 네트워크 적으로 가까운 거리에 있는 레플리카로부터 메세지를 읽어와 성능과 비용을 절약할 수 있다.
- 이러한 가까운 레프리카로부터 읽어올 수 있게 해주는 설정이다. (String)
- `broker.rack`으로 브로커 설정때 지정해주면, client.rack이 매칭되는 broker.rack을 찾아 저장된 레플리카에서 읽어온다.
16. group.instacne.id
- 정적 컨슈머 그룹을 지정할 때 사용한다.
17. receive.buffer.bytes, send.buffer.bytes
- TCP의 수신 및 수신 버퍼의 크기를 제어하는 것이다.
18. offsets.retention.minutes ( 브로커 설정, 기본값 : 7일 )
- 컨슈머 그룹이 각 파티션에 대해 커밋한 마지막 오프셋 값을 브로커가 특정 기간동안 보관하고 있는다.
- 그룹이 비게된다면 오프셋 기록을 삭제하지 않고 해당 값을 보관하는데, 해당 기간을 제어하는 설정이다.
- 컨슈머가 재연결하는 경우도 고려하여 생긴 설정값같다.
'개발 서적 > 카프카 핵심 가이드' 카테고리의 다른 글
Kafka : 카프카 내부 메커니즘 (4) | 2024.12.24 |
---|---|
Kafka : 어드민 클라이언트(Admin Client) (0) | 2024.12.08 |
Kafka : 컨슈머(Consumer) - (2) (0) | 2024.11.24 |
Kafka : 프로듀서 (Producer) - (2) (2) | 2024.11.11 |
Kafka : 프로듀서 (Producer) - (1) (1) | 2024.11.11 |