개발 서적/카프카 핵심 가이드

Kafka : 프로듀서 (Producer) - (1)

H_JU_0527 2024. 11. 11. 10:38
💡 해당 글은 '카프카 핵심 가이드 2nd Edition'을 읽고 정리한 글입니다.

 

1️⃣ 프로듀서(Producer)란?

번역 그대로 생산자로 이해하면 좋다.

  • 카프카를 큐, 메세지 버스, 데이터 저장 플랫폼 등 다양한 형식으로 사용 가능한데,
         어떤 식으로 사용하든 간에 컨슈머(Consumer)에게 줄 데이터를 만들어내는 역할이 필요하다. 
  • 그 데이터를 만들어내는 역할을 수행하는 것이 바로 프로듀서(Producer)이다.

그렇다면 어떤 식으로 프로듀서는 동작할까?

카프카 프로듀서 요소 개괄

  1. 컨슈머가 사용할 데이터인 `Producer Record`객체를 만든다. 구성은 아래와 같이 구성되어 있으며,
         주제(Topic)와 값(Value)은 필수이고 파티션(Partition)과 키(Key)는 선택사항이다.
    • Topic : 카프카에 저장되는 여러 메세지를 구분하는 용도로 메세지를 구분하는 주제
    • Value : 해당 Topic에 저장할 데이터
    • Partition : 같은 Topic에 많은 메세지가 들어왔을 때, 병렬처리하기 위해 존재하는 것으로 Topic은
           여러 개의 파티션으로 나뉜다. 해당 값이 존재하는 경우, 특정 파티션에 메세지를 보낼 수 있다.
    • Key : Partition과 유사하게 특정 파티션을 지정하기 위한 용도
  2. 객체를 만들었다면, 해당 객체가 전송될 수 있도록 Serializer를 통해 직렬화한다.
  3. 파티셔너(Partitioner)에게 전달되고, 객체의 파티션, 키값의 유무에 따라 Record Batch에 분류된다.
  4. 그렇게 되면 스레드가 해당 배치를 카프카 브로커에게 전송한다.
    • 카프카 브로커 : 프로듀서와 컨슈머 사이에 존재하는 것으로 카프카 시스템 자체를 의미한다.
  5. 브로커에게 성공적으로 전달이 되었다면, 성공 응답으로 `RecordMetadata`객체를 받는다.
        해당 객체는 성공적으로 보낸 레코드의 오프셋 정보를 담고 있다. 
  6. 만약 실패했다면, 재전송할 수 있으며 몇 번의 재전송 시도 후에도 변화가 없다면 에러를 리턴한다.

2️⃣ 프로듀서 생성하기

이제 직접 코드로 프로듀서를 생성해 보자.


프로듀서 생성 시 필수 속성 3가지가 존재하는데 이를 하나씩 살펴보자.

💡 카프카 2.0 이후부터 설정값 오타를 방지하기 위해, 설정값을 정적 상수로 지정해 두었다.
      아래 Config 클래스를 참고하여 속성값을 지정하자. 

      프로듀서 : `org.apache.kafka.clients.producer.ProducerConfig`
      컨슈머 : `org.apache.kafka.clients.consumer.ConsumerConfig`

1. bootstrap.servers

Properties configs = new Properties();
// BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092", "broker2:9092");
  • 객체를 전송할 카프카 브로커의 `host:port`를 지정한다.
  • 지정한 브로커가 동작을 멈춘 경우 다른 브로커에게 전송하기 위해, 기본적으로 2개는 설정하는 것을 권장한다.

2. key.serializer

import org.apache.kafka.common.serialization.StringSerializer;

Properties configs = new Properties();
// KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  • 키 값을 직렬화하는 시리얼라이저(serializer)를 지정한다.
  • 코드의 가독성을 위해 `interface Producer<K,V>`는 자바 객체도 키 혹은 값으로 전송할 수 있도록 되어있다.
  • 이를 어떤 바이트 배열 형식으로 바꿔야 하는지 프로듀서에게 알려주는 것이다.
  • `kafka,common.serialization`에 자주 사용되는 타입의 시리얼라이저가 존재하여 사용하면 된다.

3.  value.serializer

import org.apache.kafka.common.serialization.IntegerSerializer;

Properties configs = new Properties();
// VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer"
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
  • 레코드의 데이터 값(Value)을 직렬화하는 시리얼라이저(serializer)를 지정한다.

이렇게 필수적인 3가지 속성값을 알아보았다. 이 3가지 이외에도 수많은 설정값들이 있는데,

대부분 합리적인 기본값으로 설정되어 있기 때문에 일일이 잡아줄 필요는 없다고 한다.

 

하지만 상황에 맞춰 이러한 설정값을 조정할 시 최적화 이점을 얻을 수 있기 때문에

다른 설정값에 대해 알고 있는 것도 중요하다. 나머지 설정값은 뒤에서 알아보자.

3️⃣ 프로듀서로 메세지 전송하기

이제 생성한 프로듀서를 이용해 메세지를 전송해보자.

 

크게 3가지 전송 방법이 있다.

 

1. 파이어 앤 포켓 (Fire and forget)

Producer<String, String> producer = new KafkaProducer<String, String>(configs);
try {
    producer.send(new ProducerRecord<String, String>("Topic", "Value"));
} catch (Exception e) {
    e.printStackTrace();
} finally {
    producer.close();
}

가장 기본이 되는 전송방법이며 제일 간단하다.

  • 메세지를 전송한 후, 성공 혹은 실패 여부를 신경쓰지 않은 것을 파이어 앤 포켓 방식이라고 한다.
  • 프로듀서와 Record 객체를 생성후 `producer.send()`를 통해 브로커에게 Record 객체를 전달한다.
  • `send()`는 반환값으로 `Future`객체를 반환하는데 위 코드에서는 해당 객체에 대한 처리를 전혀하고 있지 않다.

이 방식은 메세지가 누락되어도 아무런 조치를 취하고 있지 않기 때문에,

일반적으로 해당방법은 사용하지 않는다.

 

💡Send()메서드와 Future에 대해 알아보기

더보기

🧑‍💻 Send()

 public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return this.doSend(interceptedRecord, callback);
}
  • Send() 메서드를 살펴보면, 레코드를 보내기전에 인터셉터(Interceptor)의 `onSend`으로 혹시 모를 재정의 되어있는 Record 객체를 가져온 후 doSend()를 수행한다.
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    ...
    
    // Key 직렬화
    byte[] serializedKey;
    try {
    	serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());
    } catch (ClassCastException var21) {
    	ClassCastException cce = var21;
    	throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + this.producerConfig.getClass("key.serializer").getName() + " specified in key.serializer", cce);
    }
    // Value 직렬화
    byte[] serializedValue;
    try {
    	serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());
    } catch (ClassCastException var20) {
    	ClassCastException cce = var20;
    	throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer", cce);
    }
	
    ...
      
    // 파티션 결정 및 
    if (result.abortForNewBatch) {
    	int prevPartition = partition;
    	this.onNewBatch(record.topic(), cluster, prevPartition);
    	partition = this.partition(record, serializedKey, serializedValue, cluster);
        result = this.accumulator.append(record.topic(), partition, timestamp, serializedKey, serializedValue, headers, appendCallbacks, remainingWaitMs, false, nowMs, cluster);
    }
    
    ...

    return result.future;
}
  • doSend()는 긴 메서드이지만 핵심만 살펴보면 프로듀서 동작 과정에서 설명했던 것을 순서대로 수행하고 있다.
  • 지정해 놓은 시리얼라이저로 Key와 Value를 바이트 배열(byte[ ])로 직렬화한다.
  • 이후 파티션을 결정한 후, `RecordAccumulator`에 Record를 저장한다.
  • `Sender`가 Thread를 생성한 후 `RecordAccumulator`에 저장되어있는 Record를 전송한다.
  • 반환값은 Future 객체를 반환한다.

🧑‍💻 Future

public interface Future<V> {
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
    boolean isCancelled();
    boolean isDone();
    boolean cancel(boolean mayInterruptIfRunning)
    ...
}
  • 비동기적 연산 처리의 결과를 표현하는 인터페이스이다.
  • 비동기 연산이 처리되었는지, 진행중이라면 기다린 후 연산 결과를 반환하는 등 다양한 메서드를 가지고 있다.
    • get() : 연산 결과를 가져오는 메서드로, 진행중이라면 결과값이 나올 때까지 기다린다. ( block )
    • get(long timeout, TimeUnit unit) : get()과 동일한 역할을 수행하면서 응답을 기다리는 Timeout을 가진다.
    • isDone() : 연산이 완료되었는지 결과를 반환한다.
    • isCancelled() : 연산이 취소되었는지를 반환한다.
    • cancel() : 현재 진행중인 연산을 중단한다. 반환값으로 취소되었다면 true를 취소하지 못했다면 false를 반환한다. 

2. 동기 메세지 전송

Producer<String, String> producer = new KafkaProducer<String, String>(configs);
try {
    RecordMetadata metadata = producer.send(new ProducerRecord<String, String>("Topic", "Value")).get();
} catch (Exception e) {
    e.printStackTrace();
} finally {
    producer.close();
}
  • `producer.send()`가 반환하는 Future 객체의 get()을 통해 연산이 완료될때까지 대기한다.
  • 만약 전송이 성공적으로 되지 않았다면 예외를 발생시키고, 성공했다면 Metadata 객체를 반환한다.

해당 전송 방식에서 메세지를 전송하는 스레드는 그 시간 동안 아무것도 하지 못하고 대기해야 한다.

즉, 카프카 클러스터에 작업이 몰리게 되면 완료되기까지 지연이 발생할 수 있다는 뜻이기도 하다.

 

그래서 해당 방식 또한 잘 사용되지는 않는다.

3. 비동기 메세지 전송

그런데 이런 의문이 들 수 있다.

전송이 완료되고 받는 Metadata 객체는 정말 필요할까?

 

Metadata에는 해당 레코드의 토픽, 파티션, 오프셋 등을 담고 있는데

실제로 이러한 데이터는 전송을 성공하고 난 후 이기 떄문에 필요 없는 경우가 많다. 

 

즉, 굳이 send()의 응답을 기다리지 않아도 된다.

그래서 이를 비동기로 처리하면 훨씬 빠르게 전송할 수 있다.

 

그렇다면 예외가 발생하는 경우는 어떻게 해야 할까?

이는 콜백을 사용하면 예외에 대한 처리도 해줄 수 있다.

public interface Callback {
  void onCompletion(RecordMetadata var1, Exception var2);
}

class TestCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (metadata != null) {
            // metadata 처리가 필요하다면 
        } else {
            // Exception 처리
            e.printStackTrace();
        }
    }
}

Producer<String, String> producer = new KafkaProducer<String, String>(configs);
try {
    producer.send(new ProducerRecord<String, String>("Topic", "Value"), new TestCallback());
} catch (Exception e) {
    e.printStackTrace();
} finally {
    producer.close();
}
  • `Callback`인터페이스를 통해 콜백을 구현한다.
  • 해당 인터페이스에는 `onCompletion()`만 가지고 있으며, RecordMetadata와 Exception을 받는다.
  • 카프카에서 전송을 성공하면 해당 메서드는 Metadata객체를, 실패하면 Exception객체를 받게 된다.
  • 그래서 해당 메서드에서 상황에 따른 후처리를 진행해주면 된다.
  • 해당 콜백은 send()에 Record 객체과 함께 담아서 사용한다.

4️⃣ 다양한 프로듀서 설정값

앞에서 말했듯이 다양한 프로듀서 설정값이 있다.

그 중에서 성능 및 신뢰성 등에 상당한 영향을 미치는 설정값만 살펴보자.

1. client.id

브로커가 프로듀서가 보내온 메세지를 서로 구분할때 사용한다.

그래서 해당 값을 지정하면 특정 클라이언트에 대한 처리가 쉬워지고, 트러블 슈팅에도 용이하다.

 

상세한 로그 메세지를 출력하거나 클라이언트 별로 사용량을 조절해야하는 경우 등에 사용된다.

2. acks

acks 매개변수는 프로듀서가 쓰기 작업을 성공했다는 것을 어떻게 판별할지를 결정한다.

acks 값마다 성공 판별 기준이 다르다보니 해당 값은 메세지 유실 가능성과 매우 연관이 깊다.

 

설정 가능한 3가지 값을 알아보기 전에 사전 지식에 대해 먼저 짚고 넘어가자.

 

💡 파티션 복제 ( Partition Replication ) 

가용성을 위해서 카프카에서는 각 Topic의 파티션들을 복제하여 다른 브로커들에게 넣어둔다.
예시는 아래와 같다. (복제 수는 브로커의 수를 넘지 못한다)

브로커 1 - 파티션 1 | 파티션 2 - 복제 | 파티션 3 - 복제
브로커 2 - 파티션 2 | 파티션 1 - 복제 | 파티션 3 - 복제
브로커 3 - 파티션 3 | 파티션 1 - 복제 | 파티션 2 - 복제

Topic 생성시 `replication-factor` 값으로 복제 수를 지정할 수 있다.
이렇게 파티션 복제를 하게 되면 다른 브로커에도 복제 파티션이 존재하기 때문에 가용성이 높아지지만 복제하는 만큼 디스크 사용량 또한 증가한다는 단점도 존재한다.
그래서 보통은 `replication-factor` 값 3으로 설정하는 것이 안정적이다.
💡 Leader와 Follower 그리고 ISR(in-sync replica)

파티션 복제는 원본에서 복제본 방향으로 비동기 처리되고,

자연스럽게 원본은 Leader, 복제본은 Follower라고 불린다.

일관성을 위해 항상 원본(Leader)을 기준으로 동작하며,
만약 Leader가 중단되었을때는 Follower가 새로운 Leader로 선정된다.

그리고 정상적인 복제를 보장하는 Follower 그룹을 ISR(in-sync replica)라고 한다.

 

이제 ack값에 따른 판단 방법에 대해 알아보자.

acks = 0일때

  • 메세지를 보내면 브로커의 응답을 기다리지 않는다.  보내는 즉시 성공으로 인식한다.
  • 그래서 브로커가 메세지를 받지 못한 경우를 프로듀서는 인식할 수 없게 되고 메세지는 유실된다.
  • 하지만 속도는 빠르기 때문에, 높은 처리량이 필요할때 사용한다.

acks = 1일때

  • 리더(Leader)가 메세지를 받고 브로커가 이에 대한 성공 응답을 기준으로 판별한다.
  • acks =  0보다는 메세지 유실 확률은 낮아졌지면 여전히 유실 가능성은 존재한다.
  • 리더가 중지되었고 새로운 리더가 선정되지 않은 상태에서 받게 된다면 해당 메세지는 유실된다.

acks = all 일때

  • 메세지가 모든 ISR(in-sync replica)에 성공적으로 전달되어야 성공한 것으로 판별한다.
  • 리더가 중지되어도 최소 다른 팔로워가 가지고 있기 때문에 가장 안전한 형태이다.
  • 하지만 acks = 1 인 경우와 비교하면, 더 긴 지연시간을 가지게 된다.
💡 주의 사항 
acks = all 의 경우 `replication-factor`만큼 전달됬는지 확인하는 것이 아니라 `min.insync.replicas`만큼 분산되어 있는지 확인한다.
➕ 카프카 3.0부터 acks = all 이 기본값이 되었다. 
관련 글 : https://blog.voidmainvoid.net/507

3. 메세지 전달 시간

메세지 전달 시간은 send()를 호출한 시점부터 성공 혹은 실패 응답을 받을때 까지 시간을 말한다.

해당 시간을 카프카 2.1부터 2구간으로 나눠서 처리할 수 있도록 했다.

 

1구간 : send() 비동기 호출이 이루어진 시각부터 그에 대한 결과를 리턴할때 까지의 시간

  • 해당 구간에서는 send()를 호출한 스레드는 block된다.

2구간 : 결과 리턴한 시간부터 콜백이 호출될 때까지 걸리는 시간

  • Record 객체가 배치에 추가된 시점부터 성공, 실패, 재전송, 예외 등등 모든 과정이 완료된 시점까지

메세지 전달 시간 그래프

 

메세지 전달 시간을 구간별로 표현하면 위와 같고, 해당 시간에 영향을 주는 설정 매개변수를 알아보자.

I. max.block.ms

  • send() 호출 이후 해당 스레드가 얼마나 블록되는지 결정한다.

II. delivery.timeout.ms

  • 레코드 전송 준비가 완료된 시점부터 브로커의 응답을 받거나 포기하게되는 시점까지의 시간을 결정한다.
  • 즉, 최종 전송에 대한 결과값을 받기까지의 새로운 제한시간이다.
  • 만약 재전송 시도중에 timeout이 발생하면, 마지막 재시도 하기전의 브로커 예외 응답이 전달된다.
  • 재시도 관련 설정해야 한다면 delivery.timeout.ms를 가능한 최대로 설정하는 것이 합리적이다.

III. request.timeout.ms

  • 프로듀서가 데이터를 전송하고 서버로부터 응답을 얼마나 기다릴 것인지 결정한다.
  • 쓰기 요청후 전송을 포기하기까지의 대기하는 시간임을 주의하자. ( 재시도 시간 포함 X )
  • 응답이 없이 타임아웃이 발생하는 경우, 재전송을 시도하거나 예외를 던진다.

IV. retries, retry.backoff.ms

  • retries는 프로듀서다 전송을 포기하거나 예외가 발생할때 까지 재전송을 시도하는 횟수를 결정한다.
  • 이러한 재시도마다 프로듀서는 대기하는데, 이 시간을 retry.backoff.ms로 조정할 수 있다.
  • 하지만 이 값들을 변경하는 것은 현재 버전에서 권장하고 있지 않다.
  • 해당 값을 변경하려면, 크래시 난 브로커가 복구되기까지의 시간을 계산한 후
         재전송 시간을 앞의 시간보다 길게 잡아
    메세지 재전송을 너무 일찍 포기하기 않도록 해야한다.
  • 그래서 delivery.timeout.ms를 늘리는 것을 권장한다.

4. linger.ms

  • KafkaProducer는 배치가 가득차거나 linger.ms 제한시간이 되었을때 배치를 전송한다.
  • linger.ms는 현재 배치를 브로커에 전송하기 전까지 대기하는 시간을 결정한다.
  • 몇 ms 가량 더 기다리게 되고, 이는 지연시간을 조금 증가시키는 대신 처리량을 증대시킨다.

5. buffer.memory

  • 메세지를 대기시키는 버퍼의 크기를 결정한다.
  • 해당 버퍼에 가득 찼다면, 메모리 상에서 공간이 생겨 대기하게 되고 이마저도 다 찼다면 예외를 던진다.
  • 해당 타임 아웃은 send() 메서드에서 발생하지,
         send() 메서드가 리턴하는 Future 객체에서 발생하지 않는다.

6. compression.type

  • 기본적으로 메세지는 압축되지 않은 상태로 전송된다.
  • 하지만 네트워크 대역폭이 제한적인 상황에서 활성화하면 네트워크 사용량과 저장 공간을 절약할 수 있다.
  • `snappy`, `gzip`, `lz4`, `zstd` 중 한가지를 선택하여 사용하면 된다.

7. batch.size

  • 파티션에 레코드가 모일 경우, 배치 단위로 모아서 전송하는데 이때 사용되는 배치의 크기를 결정한다.
  • 여기서 size는 개수가 아닌 바이트 크기임을 주의하자.
  • 배치가 가득 차게 되면 프로듀서는 모든 메세지를 한번에 전송하게 되는데,
    이는 배치가 가득 찰때까지 기다린다는 뜻은 아니다. 배치에 하나의 메세지만 있어도 전송한다.
  • 배치 사이즈로 키워도 전송지연은 발생하지 않는다.

8. max.in.flight.requests.per.connection

  • 프로듀서가 서버로부터 응답을 받지 못한 상태에서 전송할 수 있는 최대 메세지의 수를 결정한다.
  • 메모리 사용량이 증가하지만 처리량도 증가한다. 
  • 기본값 : 5
💡 순서 보장

카프카 파티션 내에서 메세지의 순서를 보존하게 된다.
즉, 프로듀서가 특정 순서로 메세지를 보내게 되면 컨슈머 또한 해당 순서대로 메세지를 받게된다.

그런데 만약에 retries > 0 인 상태에서 max.in.flight.requests.per.connection ≥ 1 로 잡아주면
순서가 뒤집힐 가능성이 존재한다.

ex)
time = 1  : 첫번째 배치 전송 실패 후 재전송 준비중

time = 2  : 두번쨰 배치 전송(in-flight) 중 첫번째 배치 재전송 시도
위와 같은 상황이면 두번째 배치가 전송된 후 첫번째 배치가 전송되게 된다.

하지만 성능상의 고려 때문에 in-flight 요청은 최소 2이상이 되어야 하고 신뢰성을 위해 재시도 횟수도 높아야 하기 때문에 이떄는 `enable.idempotence = true`로 설정하는 것이 좋다.

해당 설정은 최대 5개의 in-flight요청을 허용하면서 순서를 보장하고, 재전송에 대한 중복도 방지해준다.

9. max.request.size , message.max.bytes

  • max.request.size : 프로듀서의 쓰기 요청의 크기를 결정한다. (기본값 : 1MB)
  • message.max.bytes : 브로커가 받아들일 수 있는 최대 메세지의 크기 ( 브로커 설정 )
  • 이 2가지를 동일하게 맞춤으로써 브로커가 받아들이지 못하는 메세지를 보내는 것을 방지할 수 있다.

10. receive.buffer.bytes, send.buffer.bytes

  • 데이터를 읽거나 쓸 때 소켓이 사용되는 TCP 송수신 버퍼의 크기를 결정한다.
  • 해당 값을 -1로 설정시 운영체제의 기본값이 사용된다.
  • 다른 데이터 센터에 위치한 브로커와 통신할 경우, 이 값을 올려잡아 주는 것이 좋다.

11. enable.idempotence

이런 경우를 생각해보자.

 

acks = all인 상태로 특정 파티션의 메세지가 다른 브로커에게 잘 복제되어있는 상태이다.

이때 리더가 크래시되어 다른 팔로워가 리더가 되었다고 가정해보자.

 

그러면 기존 리더의 메세지를 새리더 브로커에게 전달하게 되는데
acks = all로 인해서 이미 가지고 있던 메세지를 새리더는 중복으로 가지게 된다.

 

이러한 사태를 방지해주는 것이 해당 설정 값이다.

  • `enable.idempotence = true`하게 되면 멱등적 프로듀서 기능이 활성화된다.
  • 레코드를 보낼때마다 순차적인 번호를 부여하여 전송하게 되고 해당 번호로 중복 여부를 확인한다.

그래서 멱등적 프로듀서 기능을 활성하기 위해 아래 3가지를 충족시키면 된다.

  1. retries  ≥ 1
  2. max.in.flight.requests.per.connection ≤ 5
  3. acks = all