Kafka : 프로듀서 (Producer) - (2)
💡 해당 글은 '카프카 핵심 가이드 2nd Edition'을 읽고 정리한 글입니다.
1️⃣ 시리얼라이저( Serializer )
앞에서 `org.apache.kafka.common.serialization`를 통해 기본 시리얼라이저를 사용해보았다.
그런데 이 기본 시리얼라이저로는 모든 타입의 데이터를 직렬화할 수 없다.
즉, 더 일반적인 시리얼라이저가 필요하다.
2가지의 방법이 존재한다.
1. 커스텀 시리얼라이저 만들기
코드를 자세하게 볼 필요없다.
public class CustomerSerializer implements Serializer<Customer> {
...
@Override
public byte[] serialize(String topic, Customer data) {
try {
byte[] serializedName;
int stringSize;
if (data == null)
return null;
else {
if (data.getName() != null) {
serializedName = data.getName().getBytes("UTF-8"); //필드 접근 메서드
stringSize = serializedName.length;
} else {
serializedName = new byte[0];
stringSize = 0;
}
}
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
buffer.putInt(data.getId()); //필드 접근 메서드
buffer.putInt(stringSize);
buffer.put(serializedName);
return buffer.array();
} catch (Exception e) {
throw new SerializationException("Error when serializing Customer to byte[] " + e);
}
}
...
}
간략하게 보면
- `interface Serializer<T>`를 활용해 직렬화할 객체를 매개변수로 가지는 시리얼라이저를 정의한다.
- 이후 객체 필드 접근 메서드를 통해 필드들의 값을 직접 직렬화한다.
해당 방법에는 큰 취약점이 존재한다. 객체의 필드에 변화가 생기면 호환성에 큰 문제를 일으킨다.
그래서 에이브(Avro), 스리프트(Thrifht), 프로토버프(Protobuf) 등의 범용 직렬화 라이브러리를 권장한다.
그 중 아파치 에이브로에 대해 알아보자.
2. 아파치 에이브로 ( 범용 직렬화 라이브러리 )
동작 방식은 간단하다.
직렬화하려는 데이터의 전체 스키마를 저장하여 업데이트 전과 후 버전 사이의 호환성을 유지하는 것이다.
예를 들어 아래와 같이 스키마가 변경되었다고 해보자.
// 업데이트 전
{
"namespace": "ex.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "phoneNumber", "type": ["String", "null"]}
]
}
// 업데이트 후
{
"namespace": "ex.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "email", "type": ["String", "null"]} // 변경 부분
]
}
- 위와 같이 변경하게 되면, 변경 전인 `phoneNumber`와 변경 후인 `email`을 모두 저장하는 것이다.
- 만약 구버전의 어플리케이션이 신 버전의 스키마가 사용된 메세지를 읽는다면?
- 동일한 필드는 그대로 반환되고 phoneNumber는 null로 반환된다.
- 만약 신버전의 어플리케이션이 구 버전의 스키마가 사용된 메세지를 읽는다면?
- 동일한 필드는 그대로 반환되고 email은 null로 반환된다.
아파치 에이브로의 주의점은 2가지이다.
- 데이터를 쓸때 사용하는 스키마와 읽을 때 기대하는 스키마가 호환되어야 한다. ( 호환 규칙 )
- 역직렬화 할때 데이터를 쓸 때 사용했던 스키마에 접근 가능해야 한다.
여기서 아래와 같은 의문이 생길 수 있다.
모든 스키마를 저장하면 그만큼의 저장공간도 필요할텐데..?
맞는 말이다. 전체 스키마를 저장해야 한다면, 전체 레코드 사이즈의 2배 이상이 필요할 수도 있다.
게다가 주의점 2번째에서 봤듯이 역직렬화를 위해 스키마에 접근하는 방식도 필요하다.
그래서 이를 해결하기 위해 보통 스키마 레지스토리와 함께 사용한다.
스키마 레지스토리( Schema Registry )
아키텍쳐 패턴으로 카프카의 일부가 아닌 오픈소스 구현체를 하나 골라 사용하면 된다.
- 그렇게 되면 스키마 자체는 해당 레지스토리에 저장하고
카프카의 경우 스키마에 해당하는 고유 식별자만 가지고 있으면 된다.
에이브로와 레지스토리 사용을 전체적인 코드로 보면 아래와 같다.
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092", "broker2:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
configs.put("schema.registry.url", schemaUrl);
Producer<String, User> producer = new KafkaProducer<>(configs);
while (true) {
User user = UserGenerator.getNext();
ProducerRecord<String, User> record = new ProducerRecord<>("Topic", user.getName(), user);
producer.send(record)
}
- `io.confluent.kafka.serializers.KafkaAvroSerializer`로 시리얼라이저를 설정해준다.
- `props.put("schema.registry.url", schemaUrl);`로 시리얼라이즈에게 스키마가 저장된 위치를 전달한다.
- 객체를 생성할때 자바가 아닌 에이브로의 코드 생성 기능을 사용해 에이브로 특화 객체를 생성한다.
2️⃣ 파티션( Partition )
필수는 아니지만 특정 파티션에 레코드를 전달하고 싶다면, 특정 키값을 넣어 파티션을 지정할 수 있다.
// Key가 없는 경우 (null)
ProducerRecord<String, String> record = new ProducerRecord<>("Topic", "Value");
// Key가 있는 경우
ProducerRecord<String, String> record = new ProducerRecord<>("Topic", "Key", "Value");
기본 파티셔너는 아래와 같이 동작한다.
키 값이 null인 경우
- 접착성 처리를 위해 다음 배치로 넘어가기 전에 이전 배치에 먼저 채우는 라운드 로빈 알고리즘을 사용한다.
💡 집착성 처리 비교
레코드가 아래와 같이 있고 모두 키값이 지정되지 않았으며 파티션의 크기를 4이라고 가정해보자.
레코드 번호 : 0 1 2 3 4 5 6 7 9
집착성 처리를 하지 않으면 보내야하는 요청의 수가 4인 반면, 집착성 처리를 한다면 요청의 수를 3으로 줄일 수 있다.
집착성 처리 X 집착성 처리 O 파티션 1 0 4 8 파티션 1 0 1 2 3 파티션 2 1 5 9 파티션 2 4 5 6 7 파티션 3 2 6 파티션 3 8 9 파티션 4 3 7 파티션 4
키 값이 있는 경우
- 해당 키값을 해시한 결과를 기준으로 파티션을 진행한다.
- 동일한 키값은 항상 동일한 파티션에 저장되는 것이 원칙이기 때문에 토픽의 모든 파티션을 기준으로 count한다.
기본 파티셔너 이외에도 카프카는 `RoundRobinPartitioner`, `UniformStickyPartitioner`를 가지고 있다.
- 이 파티셔너는 키값을 포함하고 있어도 랜텀 파티션, 접착성 파티션을 수행한다.
주의사항
동작 도중 파티션의 수를 조정하게 된다면, 동일한 키값은 동일한 파티션에 저장되다는 원칙이 유효하지 않게 된다.
- 그래서 파티션을 결정하는 키값이 컨슈머 입장에서 매우 중요하다면,
토픽을 생성할 때부터 충분한 수의 파티션 수를 정의하자.
이외에도 상황에 따라 `inferface Partitioner`를 이용해 커스텀 파트셔너도 만들 수 있으니 참고하자.
➕ 적절한 파티션의 수는 얼마인가?

아래 글의 핵심을 얘기해보면
- 도중 파티션의 수를 변경하면 레코드 전달이 원활히 진행되지 않을 수 있다. 그러므로 현재 필요한 양보다 훨씨 많은 양으로 지정하여 파티션 수 조정을 최소화하라.
- 레이턴시가 신경쓰인다면, 100 * 브로커 수 * 파티션 리플렉션 수로 정의해라.
관련 정리 글 : How to Choose the Number of Topics/Partitions in a Kafka Cluster?
3️⃣ 헤더 ( Header )
레코드에 Topic, Value, Key 이외에도 헤더를 포함 할수 있다.
- 주 용도는 라우팅, 모니터링을 위함이며, 메세지 전달 내역을 기록한다.
- Key/Value 집합으로 구현되어 있으며 키값은 항상 String 타입, Value는 직렬화가 가능한 객체를 담아야 한다.
ProducerRecord<String, String> record = new ProducerRecord<>("Topic", "Value");
record.headers().add(new RecordHeader("message", "test".getBytes(StandardCharsets.UTF_8)));
4️⃣ 인터셉터 ( Interceptor )
모든 어플리케이션에 동일한 동작을 추가해야 하거나, 원래 코드를 수정하지 않고 변경사항을 넣아야 하는 경우있다.
이때 인터셉터를 사용하면 레코드를 send()하기 전에 변경사항을 추가할 수 있다.
public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable {
ProducerRecord<K, V> onSend(ProducerRecord<K, V> var1);
void onAcknowledgement(RecordMetadata var1, Exception var2);
void close();
}
- onSend() : 레코드를 브로커로 보내기 전, 직렬화되기 전에 호출되어 레코 정보를 확인하거나, 수정할 수 있다.
- onAcknowledgement() : 브로커가 보낸 응답을 확인할 수 있다. ( 수정 X )
일반적으로 모니터링, 정보 추적, 헤더 삽입 등에 사용된다.
5️⃣ 쿼터(Quota), 스로틀링(Throttling)
네트워크 대역폭이 제한되어있거나 디스크,메모리 와 같은 외부적인 환경요소에 제한이 있다면,
카프카 브로커는 쓰기/읽기 속도에 한도(Quota)를 설정할 수 있다.
- 쓰기/ 읽기 쿼터 : 클라이언트가 데이터를 받거나 전송하는 속도 제한
- 요청 쿼터 : 브로커가 요청을 처리하는 시간 비율 단위 제한
해당 한도는 기본값으로 설정(전체 적용)하거나, 인증된 특정 클라이언트를 지정할 수 있다.
`quota.producer.default = 2M` 혹은 `client.quota.callback.class` 등 정적으로 제한을 걸어둘 수 도 있지만,
일반적으로 config.sh 또는 AdminClient API를 통해 동적으로 설정한다.
// 1번
bin/kafka-configs --bootstrap-server localhost:9092 --alter \
--add-config 'producer_byte_rate=1024' --entity-name clientC \
-- entity-type clients
// 2번
bin/kafka-configs --bootstrap-server localhost:9092 --alter \
--add-config 'producer_byte_rate 1024,consumer_byte_rate=2048' \
--entity-name user1 --entity-type users
- 1번 : clientC(client-id)의 쓰기 속도를 1024바이트, 읽기 속도는 2048바이트로 제한한다.
- 2번 : user1(인증 주체)의 쓰기 속도를 1024바이트, 읽기 속도는 2048바이트로 제한한다.
이렇게 지정된 제한이 모두 채워졌다면,
그때부터 브로커는 클라이언트에게 요청에 대한 늦게 보낸다. (Throttling)
- 클라이언트마다의 응답을 대기하는 시간이 제한되어 있기 때문에
자연스럽게 요청속도는 줄어들면서 메세지 사용량 또한 줄어드는 원리이다. - 클라이언트의 오작동으로 인해 스로틀링 이후에 요청이 오는 경우 일시적으로 무시한다.