💡 해당 글은 '카프카 핵심 가이드 2nd Edition'을 읽고 정리한 글입니다.
1️⃣ 오프셋과 커밋 ( Offset & commit )
카프카에서는 파티션에서의 현재 위치를 업데이트 하는 작업을 오프셋 커밋(offset commit)이라고 한다.
- 컨슈머는 커밋을 레코드마다 진행하지 않고, 파티션에서 성공적으로 처리해낸 마지막 메세지만 커밋을 한다.
- 정확하게는 카프카 특수 톡픽인 `__consumer_offsets`에 정상적으로 처리 완료한 오프셋 + 1을 커밋한다.
불필요한 커밋 횟수를 줄이는 좋은 방식이긴하지만, 마지막 메세지에 대한 커밋만 진행하다보니 레코드 처리 중 리밸런스가 발생하면 메세지 중복 및 누락 문제가 발생한다.
메세지 중복 발생 경우
- Offset 3에서부터 시작해서 폴링 반복문을 통해 Offset 11까지 받은 상태에서 Offset 10번을 진행하고 있다.
- 그런데 리밸런스가 발생하게 되면, 처리한 3번 ~ 10번까지는 커밋되지 않은 상태로 리밸런스에 들어가게 된다.
- 리밸런스가 완료된 이후 poll()을 수행하면 처음 받았던 Offset 3을 받게 되고, 3번 ~ 10번을 중복으로 읽게된다.
메세지 누락 발생 경우 ( 자동 커밋 사용시 )
- 자동 커밋을 사용시, `auto.commit.interval.ms`마다 커밋을 보내게 된다.
- poll()로 Offset 4번 ~ 11번을 받으면, auto.commit.interval.ms마다 poll()을 12번을 커밋한다.
- 이후 11번까지 처리가 완료되지 않은 상태에서 리밸런스 발생하면 중간에 멈춘 5번부터 11번까지는 누락된다.
🤔 두가지 현상이 발생하는 주 요인은 바로 리밸런스이다. 정상적으로 동작하는 경우에는 거의 중복과 누락은 발생하지 않으니 리밸런스를 최소화하는 방향으로 카프카를 설정하는 것이 좋은 것 같다.
🤔 Outbox Pattern은 트랜잭션의 일부로 데이터베이스(Redis...)에 메시지를 저장하고 별도의 프로세스가 저장된 이벤트를 읽어 메시지 브로커에 전송하는 방식을 말한다. 이를 적용하면 중복과 유실을 해결할 수 있지 않을까?
실제 올리브영이 카프카 메세지 중복과 유실을 어떻게 다루었는지도 살펴보면 좋을 것 같다.
2️⃣ 다양한 오프셋 처리 방식
오프셋을 처리하는 방식 5가지를 살펴보자.
1. 자동 커밋
- `enable.auto.commit`을 true로 잡아주면 기본적으로 `auto.commit.interval.ms`마다 커밋을 하게된다.
- 자동 커밋은 폴링 루프에 의해 실행되며, 커밋해야 하는지를 확인한 뒤 그러할 경우에는 마지막 poll()호출에 리턴된 오프셋을 커밋한다.
편리하긴 하지만, 앞에서 봤던 것처럼 중복과 누락의 위험이 존재한다. 만약 편하게 자동 커밋을 사용한다고 하면 폴링으로 가져오는 데이터의 크기를 줄여 누락정도는 막을 수 있지 않을까 싶다. 하지만 자동 커밋으로 중복은 방지하게은 어려움이 있다.
2. 현재 오프셋 커밋하기
메세지 유실의 가능성을 제거하고, 동시에 리밸런스 발생시 중복되는 메세지의 수를 줄이기 위해서 개발자가 원하는 시간에 현재의 오프셋을 직접 커밋하는 방식도 있다. `enable.auto.commit`을 false로 설정한 후 `commitSync()`를 활용해 커밋을 하면 된다.
Duration timeout = Duration.ofMillis(100);
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(timeout);
for (ConsumerRecord<String, String> record : records) {
//record 처리
}
try {
consumer.commitSync(); //추가 메세지를 폴링하기 전에 커밋
} catch (CommitFailedException e) {
...
}
}
}
- `commitSync()`는 `poll()`에 의해 리턴된 마지막 오프셋을 커밋한다.
- 해결할 수 없는 에러가 발생하지 않는 한, commitSync는 커밋을 재시도 한다.
- 다만 브로커의 커밋 요청에 대한 응답이 올때까지 블록된다.
- 위 예시처럼 `commitSync()`를 record처리가 모두 완료된 이후 해준다면 메세지 누락을 방지할 수 있다.
3. 비동기 커밋
커밋에 대한 응답이 올때까지 기다린다는 수동 커밋의 단점이 걸린다면, 비동기 커밋으로 커밋 요청만 보내고 처리를 계속할 수 도 있다.
Duration timeout = Duration.ofMillis(100);
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(timeout);
for (ConsumerRecord<String, String> record : records) {
//record 처리
}
consumer.commitAsync(); // 비동기 커밋
}
- `commitAsync()`은 요청만하고 성공, 실패의 유무를 따지지 않며 블록되지도 않는다.
- 하지만 `commitSync()`와 달리 재시도를 하지 않는다.
- 오프셋 2000이 실패하고 이후에 오프셋 3000을 성공적으로 커밋했다고 가정해보자. 이 상태에서 오프셋 2000에 대한 커밋을 재시도하여 성공하게 된다면 올바른 오프셋 커밋 순서를 보장하지 못하기 때문이다.
그래서 만약에 비동기 커밋을 사용하는 상태에서 재시도를 넣고 싶다면, `commitAsync`의 콜백함수와 count를 활용하면 된다.
// 대략적인 예시입니다.
long currentSequence = sequenceNumber.incrementAndGet(); // 커밋 시도 번호 증 (AtomicLong 클래스)
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> committedOffsets, Exception exception) {
if (exception == null) {
// 예외가 없다면 sequence number 증가
lastCommittedSequence.set(currentSequence);
} else {
// 커밋 실패: 재시도 로직 실행
// 만약 현재 currentSequence가 lastCommittedSequence보다 크다면
// 새로 커밋된 것이 없다는 의미로 재시도
if(currentSequence > lastCommittedSequence.get()){
consumer.commitAsync();
}
// 그렇지 않다면 재시도 X
}
}
});
- 커밋할 때마다 커밋을 시도하는 횟수를 기록한다.( sequenceNumber )
- 이후 커밋이 성공했다면 sequenceNumber + 1 을 하여 가장 최근에 커밋된 오프셋 넘버를 기록한다. (lastCommittedSequence)
- 만약에 실패했다면 lastCommittedSequence와 currentSequence를 비교하여 재시도 가능 유무를 따진다.
- 만약 가장 최근에 커밋된 오프셋 넘버보다 현재 시퀀스 넘버가 크다면 재시도 가능
- 반대라면, 이미 업데이트된 오프셋 넘버가 있다는 얘기임으로 재시도 불가능
4. 동기 & 비동기 커밋 같이 사용하기
사실 재시도 없는 커밋이 큰 문제가 되지 않는다. 뒤이은 커밋이 성공한다면 마지막 오프셋 번호 저장으로 커밋되지 않은 데이터도 받은것으로 인식할 수 있기 떄문이다. 하지만 컨슈머의 닫기 직전 혹은 리밸런스 전 마지막 커밋이라면 커밋이 성공했는지 따질 필요가 있다.
이 경우 동기와 비동기 커밋을 같이 사용하는 방법도 있다.
try{
boolean closing = false;
while (!closing) {
ConsumerRecords<String, String> records = consumer.poll(timeout);
for (ConsumerRecord<String, String> record : records) {
// record 처리
}
consumer.commitAsync(); // 비동기로 커밋
}
consumer.commitSync(); // 컨슈머를 닫기 직전에 마지막으로 재시도를 하는 동기 커밋 수행
} catch (Exception e) {
// 예외처리
}finally{
consumer.close();
}
- 정상적인 상황에서는 `commitAsync()`가 빠르며, 실패하더라도 다음 커밋 요청이 재시도 역할을 수행한다.
- 컨슈머를 종료하기 직전에는 다음 커밋이 존재하지 않기 때문에 `commitSync()`로 성공 혹은 회복 불가능한 에러가 발생할때까지 재시도한다.
💡 리밸런스 전 마지막 커밋이 성공했는지 확인하는 것은 뒤에서 나오는 리밸런스 리스너를 이용하면 해결이 가능하니 뒤에서 살펴보자.
5. 특정 오프셋 커밋하기
하지만 중간에 특정 오프셋을 커밋하고 싶을때는 어떻게 해야할까?
이때는 commit메서드의 매개변수로 커밋할 오프셋을 전달할 수 있으며, 형식은 Key : 파티션, Value : 오프셋 맵형식이다.
public void commitAsync(java.util.Map<TopicPartition,
OffsetAndMetadata> offsets,
OffsetCommitCallback callback)
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(timeout);
for (ConsumerRecord<String, String> record : records) {
// record 처리
...
currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1, "no metadata"));
if (count % 10 == 0) {
consumer.commitAsync(currentOffsets, null); // 콜백은 null 처리해줘야 한다.
}
count++;
}
}
- `Map<TopicPartition, OffsetAndMetadata>`으로 맵을 만들어 처리 완료한 파티션과 오프셋 번호를 전달해주면 된다.
- 하지만 해당 방식은 켠슈머가 2개 이상의 파티션으로부터 레코드를 받고 있다면 코드는 더욱 복잡해지니 주의하자.
🤔 그래서 가장 많이 사용되는 커밋 방식은 무엇일까?
: 일반적으로는 한 번에 한 개의 메시지를 처리하며 auto 커밋 방식을 많이 사용한다. 한 번에 한 개씩 처리하지 않고 여러 개의 메시지를 처리한 후 마지막 오프셋을 커밋할 수도 있다. 이때 커밋은 수동으로 직접 수행해야 한다. (네이버 D2 블로그)
3️⃣ 리밸런스 리스너 ( Rebalance Listener )
컨슈머를 종료하거나 리밸런스 수행시, 지금까지 처리했던 파티션 해제, 오프셋 업데이트 등 정리 작업이 필요하다. 이러한 정리 작업을 위해서 카프카에서 파티션 할당 해제 혹은 컨슈머 종료 직전에 사용자의 코드를 실행하는 메커니즘을 제공한다.
이를 리밸런스 리스너(RebalanceListener)라고 한다.
public interface ConsumerRebalanceListener {
void onPartitionsRevoked(Collection<TopicPartition> var1);
void onPartitionsAssigned(Collection<TopicPartition> var1);
default void onPartitionsLost(Collection<TopicPartition> partitions) {
this.onPartitionsRevoked(partitions);
}
}
- `onPartitionsAssigned` : 파티션이 컨슈머에게 재할당된 후에, 컨슈머가 메세지를 읽기 전에 호출된다. 필요한 오프셋을 탐색하는 등의 준비 작업을 해당 메서드에 작성한다. poll()을 보냄으로써 컨슈머가 살아있음을 알려야 하기 때문에 `max.poll.timeout.ms`안에 완료해야 함을 기억하자.
- `onPartitionsRevoked` : 파티션이 할당 해제될 때 호출된다. 조급한 리밸런스의 경우 컨슈머가 메세지 읽기를 멈춘뒤 리밸런스가 시작되기 전에, 협력적 리밸런스의 경우 리밸런스가 완료될 때(그때 컨슈머가 할당된 파티션을 끊기 때문에 할당해제 되어야 할 파티션들에 대해서만 호출된다.
- `onPartitionsLost` : 리밸런스 알고리즘에 의해 해제 되기 전에 다른 컨슈머에 먼저 할당되는 예외적 상황에서만 호출된다. 예외적 상황에서 호출되는 만큼 상태나 자원을 정리해주는 메서드가 들어간다. 만약 정의하지 않았다면 `onPartitionsRevoked`가 호출된다.
정리해보면
Eager | Cooperative | |
onPartitionsAssigned | 재할당 이후 컨슈머가 읽기 시작할때 ( 전체 컨슈머 ) |
재할당 이후 컨슈머가 읽기 시작할때 ( 일부 컨슈머 ) |
onPartitionsRevoked | 메세지 읽기를 멈춘 뒤 리밸런스 시작 전 | 리밸런스가 완료될 때 ( 일부 컨슈머 ) |
onPartitionsLost | - | 할당된 파티션이 해제되기 전에 다른 컨슈머에 먼저 할당된 예외적인 상황 |
private class HandleRebalance implements ConsumerRebalanceListener {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) { ... }
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) { ... }
}
consumer.subscribe(Collections.singletonList("Topic"), new HandleRebalance());
- 위와 같이 `ConsumerRebalanceListener`을 인터페이스로 리밸런스 리스너를 만든 후, `subscribe()`을 호출할때 전달해주면 된다.
4️⃣ 특정 오프셋의 레코드 읽어오기
빠르게 특정 시간대의 레코드를 건너띄어서 받고 싶거나, 데이터 복구를 위해 과거 시점의 특정 오프셋 레코드를 받고 싶은 경우에는 seek()메서드를 활용하여 다음 poll()에 특정 레코드를 가져오게 할 수 있다.
public class KafkaConsumer<K, V> implements Consumer<K, V> {
...
public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata) {
this.delegate.seek(partition, offsetAndMetadata);
}
public void seekToBeginning(Collection<TopicPartition> partitions) {
this.delegate.seekToBeginning(partitions);
}
public void seekToEnd(Collection<TopicPartition> partitions) {
this.delegate.seekToEnd(partitions);
}
...
}
- `seek()` : 파티션과 타겟으로하는 오프셋 넘버를 지정하여, 특정한 레코드를 탐색할 수 있다.
- `seekToBeginning()` : 파티션의 맨 앞 메세지부터 가져온다.
- `seekToEnd()` : 앞의 메세지는 전부 건너뛰고 파티션에 새로 들어온 메시지부터 가져온다.
5️⃣ 폴링 루프를 벗어나는 방법
폴링루프를 깔끔하게 벗어나는 방벙이 필요한 이유는 하트비트와 연관이 있다. 컨슈머가 깔끔하게 종료되기 위해서는 그룹 코디네이터에게 `LeaveGroup`이라는 메세지를 보내야 한다. 그래야 그룹 코디네이터는 컨슈머의 하트비트를 기다리지 않고 곧바로 리밸런스를 수행한다.
즉 우리가 폴링 루프를 깔끔하게 정리하지 않는다면 코디네이터는 `session.timeout.ms`만큼 하트비트를 기다리게 되는 것이다.
consumer.wakeup()
- `wakeup()`은 `WakeupException`을 던져 폴링 루프를 중단시키게 한다.
- 메인스레드가 리밸런스 중일 수 있기 때문에 안전한 종료를 위해서는 다른 스레드에서 인터럽트를 해당 메서드를 호출하는 것이 가장 좋다.
💡 만약 메인 쓰레드 아래에서 동작하고 있다면 `ShutDownHook`을, ExecutorService와 같은 멀티 쓰레드 환경을 활용하고 있다면 ShutdownThread를 통해 각 스레드에 접근하여 shutdown()하는 것이 좋다.
6️⃣ 디시리얼라이저 ( Deserializer )
프로듀서가 생성한 데이터가 시리얼라이저를 이용해 직렬화 되서 오기 때문에, 컨슈머는 디시리얼라이저를 통해 해당 데이터를 역직렬할 필요가 있다. 그래서 컨슈머 생성시 디시리얼라이저를 지정해 줘야하는데 주의점이 한가지 있다.
이벤트를 쓰기 위해 사용되는 시리얼라이저와
이벤트를 읽어올때 사용되는 디시리얼라이저가 서로 맞아야 한다.
이렇듯 Kafka: 프로듀서(Producer) - (2)에서 설명한 아파치 에이브로 ( 범용 직렬화 라이브러리 )를 우리가 사용해야 하는 이유이기도 하다.
'개발 서적 > 카프카 핵심 가이드' 카테고리의 다른 글
Kafka : 카프카 내부 메커니즘 (4) | 2024.12.24 |
---|---|
Kafka : 어드민 클라이언트(Admin Client) (0) | 2024.12.08 |
Kafka : 컨슈머 (Consumer) - (1) (0) | 2024.11.24 |
Kafka : 프로듀서 (Producer) - (2) (2) | 2024.11.11 |
Kafka : 프로듀서 (Producer) - (1) (1) | 2024.11.11 |