스터디/[카프카 인 액션] (2024.12)

[카프카인액션] 4장. 카프카 프로듀서 | 옵션 (acks, bootstrap.servers, value.serializer, key.serializer) | acks = all, -1, 1, 0 | 타임스탬프

ttoance 2025. 1. 11. 00:17
반응형

 

 

 

4.1 예제

  • 웹사이트가 고객을 위해 어떻게 작동하는지에 대한 사용자 피드백을 받는 애플리케이션을 가정한다.
    • 사용자는 지원 계정이나 챗봇에 이메일 생성하는 양식을 웹사이트에 제출
    • 지원 담당자가 받은 편지함을 열어 확인한다. 

 

  • 카프카 프로듀서를 도입하고 나면 
    • 이메일 대신 메시지를 카프카 토픽에 보내서 데이터에 더 쉽게 접근하게 하려고 한다.
    • 데이터를 필요한 형식으로 추출할 수 있다.

 

 

4.1.1 프로듀서 설명 

📌 프로듀서 작업과 메타데이터

  • 프로듀서의 역할: 메시지를 전송하는 역할을 담당.
  • 메타데이터: 프로듀서가 클러스터에 접근하려면 토픽 이름 외에도 각 파티션의 리더 레플리카 정보가 필요한데, 프로듀서에서 메타데이터에 대한 정보 가져오기가 포함됨 

 

📌 최종 사용자의 브로커 연결

  • 사용자(또는 애플리케이션)는 적어도 하나 이상의 브로커에 연결해야 함
  • 나머지 연결 관리(리더 레플리카 파악 등)는 카프카 클라이언트 라이브러리에서 처리 

 

📌 분산 시스템의 오류 처리

  • 카프카는 순간적인 네트워크 오류(네트워크 블립 network blip)를 감안해 설계.
    • 자동으로 재시도(retry) 로직이 적용.

🛠️ 네트워크 블립 ▶  https://ddoance.tistory.com/306

 

network blip

네트워크 블립(Network Blip): 짧은 네트워크 장애를 설명하는 "공식적인" 표현으로, 원인을 조사하기 귀찮을 때 사용하는 표현입니다."네트워크 접근이 복구되었습니다. 잠깐의 네트워크 블립이 있

ddoance.tistory.com

 

  • 만약 메시지 순서 보장하려면 아래 옵션을 설정하면 가능하다. 
    • retries=3 (재시도 3번)
      • 네트워크 오류나 임시 장애 발생 시, 프로듀서는 메시지를 최대 3번까지 재시도
      • 이를 통해 전송 실패 시에도 데이터 손실을 방지
    • max.in.flight.requests.per.connection=1 (한 번에 하나의 요청만 전송)
      • 한 번에 한 요청만 전송하도록 제한
      • 동시에 여러 요청이 처리될 경우, 재시도 시 요청 순서가 뒤바뀔 위험이 있음
      • 이 설정은 메시지 순서를 엄격히 보장하는 데 필수적
    • acks=all (모든 리플리카에서 데이터 저장 확인)
      • 모든 리플리카(파티션 리더 및 팔로워)에 데이터가 저장되었음을 확인한 후에만 성공 응답을 반환
      • 리더 장애 시 데이터 손실을 방지

 

  •  

📌 스레드 안전성 (내장된 특성) 

  • 브로커는 메시지를 덮어쓰지 않고 추가만 하기 때문에 프로듀서 간 충돌 걱정이 없음

 

📌 멱등성 프로듀서

활성화 (enable.idempotence=true)

  • 동일한 메시지를 여러 번 전송해도 브로커에 한 번만 저장
  • 프로듀서가 실패하거나 재시도하는 상황에서도 데이터 중복이 발생하지 않도록 보장
  • 자동으로 다음 설정이 활성화:
    • acks=all (모든 리플리카에 저장되었는지 확인)
    • retries=MAX_INT (재시도를 무제한으로 허용)
    • max.in.flight.requests.per.connection=5 (동시에 처리 가능한 요청 개수는 기본 5로 설정)

비활성화 (enable.idempotence=false)

  • 동일한 메시지가 여러 번 전송될 경우 중복 저장이 발생할 수 있음
  • 네트워크 오류나 재시도 로직이 동작할 때 데이터 불일치 위험 존재.
  • 설정이 유연하지만, 안정성이 떨어짐
    • acks 설정에 따라 데이터 유실 가능 (acks=1이나 acks=0 사용 시 특히 위험)
    • retries가 제한적이므로 전송 실패 가능성 증가.

 

 

4.2 프로듀서 옵션 

용도
acks 메시지 전달이 성공하기 위한 프로듀서가 요구하는 복제 확인 (acknowledgement)의 수
bootstrap.servers 시작 시 연결할 하나 이상의 카프카 브로커
value.serializer 값의 직렬화에 사용되는 클래스
key.serializer 키의 직렬화에 사용되는 클래스

 

모든 프로듀서 구성 키 이름 확인하는 방법 ▶  

https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/producer/ProducerConfig.html

 

ProducerConfig (kafka 2.7.0 API)

 

kafka.apache.org

 

 

4.2.1 브로커 목록 구성 

 

  • 프로듀서는 부트스트랩 서버와 연결
  • 부트스트랩 서버를 통해 클러스터의 전체 브로커 정보를 받아옴
  • 이후 프로듀서는 필요한 브로커(리더 브로커)에 직접 연결하여 작업을 수행

 

4.2.2 더 빨리(또는 안전하게) 처리하기 

  • 카프카의 비동기 메시지 패턴
    • 프로듀서 전송 요청의 결과를 코드에서 기다리지 않고
    • 콜백 또는 Future 객체를 사용해 비동기적으로 성공 또는 실패 처리
  • acknowledgement = ack 
    • 프로듀서가 완료된 요청을 반환하기 전에 파티션 리더의 팔로워로부터 얼마나 많은 수신확인을 받아야 하는지를 제어
    • 유효한 값으로는 all, -1, 1, 0 이 있음 

 

📌  acks = 0

  • 동작방식
    • 프로듀서는 메시지를 브로커에 전송
    • 브로커는 메시지를 수신하지만, 확인 응답(ACK)을 프로듀서에게 보내지 않음
    • 프로듀서는 브로커로부터 응답을 기다리지 않으므로, 메시지가 브로커에 제대로 기록되었는지 알 수 없음
  • 가장 낮은 대기 시간을 얻을 수 있지만 메시지의 안전한 배달은 희생해야 함. 
  • 실패 감지 불가 
    • 프로듀서는 브로커로부터 응답을 기다리지 않으므로 전송 실패(예: 네트워크 장애, 브로커 장애)를 감지할 방법이 없음.
    • 실패를 감지하지 못하면 재시도 로직을 실행할 수 없음 
  • ex, 페이지 클릭 수집하고 이벤트를 카프카로 보내는 웹 추적 플랫폼이 있다고 가정했을 시, 단일 링크 누르기 또는 hover 이벤트를 잃어도 비즈니스 영향은 없음 
  • 프로듀서로부터 전송되면 잊혀진다 ▶ 메시지가 파티션에 도달하지 않았을 수 있음
    • 리더 레플리카에 도달한 경우에도 프로듀서는 팔로워 레플리카가에 복사 성공했는지 여부를 알 수 없음. 

 

📌  acks = all 또는 -1 

 

  • 배달 보장에 있어서 가장 강력한 옵션 
  • all값은 파티션 리더의 레플리카가 ISR in-sync-replica (동기화 상태의 레플리카)의 전체 목록에 대한 복제 완료 확인하기를 기다림 
    • 프로듀서는 파티션에 대한 모든 레플리카가 성공하기 전까지는 성공 확인을 받지 못함. 

 

📌  acks = 1

리더 레플리카가 메시지를 확인하고 프로듀서에게 전송하는 동안 메시지 복사본을 만드는데 실패한 레플리카가 클러스터에서 메시지가 전달되지 않은 것처럼 나타남

  • 수신 확인에는 메시지 수신 (특정 파티션의 리더 레플리카)가 확인을 프로듀서에게 다시 보내는 과정이 포함
  • 프로듀서 클라이언트는 이 수신확인을 기다림
  • 그러나 실패로 인해 리더가 다운되기 전에 팔로워가 메시지를 복사하지 않았을 경우, 해당 파티션의 팔로워 레플리카에는 메시지가 나타내지 않음

 

4.2.3 타임스탬프

  • 최신 버전의 프로듀서 레코드에는 타임스탬프가 포함되어 있다.
  • message.timestamp.type 토픽 구성을 CreateTime으로 설정하면 클라이언트에서 설정한 시간이 사용되는 반면, LogAppendTime으로 설정하면 브로커 시간이 사용된다.
    • ex, 결제 시간이 브로커에게 전달되는 시간이 아니라 발생하는 시간을 얻기 위해 사용

메시지가 순서가 맞지 않을 수 있는 경우의 예시

  • 프로듀서를 사용할 때의 또 다른 옵션은 프로듀서 인터셉터를 만들어서 측정과 모니터링을 지원함
    • 필수적인 것이 아니다 

 

4.3 코드 

https://github.com/Kafka-In-Action-Book/Kafka-In-Action-Source-Code/tree/master/KafkaInAction_Chapter4

 

Kafka-In-Action-Source-Code/KafkaInAction_Chapter4 at master · Kafka-In-Action-Book/Kafka-In-Action-Source-Code

Source Code for the book Kafka in Action. Contribute to Kafka-In-Action-Book/Kafka-In-Action-Source-Code development by creating an account on GitHub.

github.com

public class AlertProducer {

  public static void main(String[] args) {

    Properties kaProperties = new Properties();
    kaProperties.put("bootstrap.servers", "localhost:9092,localhost:9093");
    
    kaProperties.put("key.serializer", AlertKeySerde.class.getName());   //<1>
    kaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    /** Use {@link org.kafkainaction.partitioner.AlertLevelPartitioner} to determine partition */
    kaProperties.put("partitioner.class", AlertLevelPartitioner.class.getName());    //<2>

    try (Producer<Alert, String> producer = new KafkaProducer<>(kaProperties)) {
      Alert alert = new Alert(1, "Stage 1", "CRITICAL", "Stage 1 stopped");
      ProducerRecord<Alert, String>
          producerRecord = new ProducerRecord<>("kinaction_alert", alert, alert.getAlertMessage());   //<3>

      producer.send(producerRecord, new AlertCallback());
    }
  }

}

 

  • 직렬화: 사용자 정의 키 직렬화(AlertKeySerde)와 기본 값 직렬화(StringSerializer)를 활용
  • 파티셔닝: AlertLevelPartitioner를 통해 특정 조건(예: 심각도)에 따라 메시지를 적절한 파티션에 저장
  • 비동기 전송: send 메서드와 콜백(AlertCallback)을 사용해 비동기 메시지 전송과 에러 처리의 유연성을 제공

 

 

public class AlertLevelPartitioner implements Partitioner {   //<1>

  public int partition(final String topic,
                       final Object objectKey,
                       final byte[] keyBytes,
                       final Object value,
                       final byte[] valueBytes,
                       final Cluster cluster) {
    
    
    int criticalLevelPartition = findCriticalPartitionNumber(cluster, topic);
    
    return isCriticalLevel(((Alert) objectKey).getAlertLevel()) ?
        criticalLevelPartition :
        findRandomPartition(cluster, topic, objectKey);
  }
  
  public int findCriticalPartitionNumber(Cluster cluster, String topic) {
    //not using parameters but could if needed for your logic
   return 0; 
  }
  
  public int findRandomPartition(Cluster cluster, String topic, Object objectKey) {
    //not using parameter objectKey but could if needed for your logic
    List<PartitionInfo> partitionMetaList =
        cluster.availablePartitionsForTopic(topic);
    
      Random randomPart = new Random(); 
      return randomPart.nextInt(partitionMetaList.size());
  }
  
  public boolean isCriticalLevel(String level) {
    if (level.toUpperCase().contains("CRITICAL")) {
      return true;
    } else {
      return false;
    }
  }

  @Override
  public void close() {
    
  }

  @Override
  public void configure(final Map<String, ?> map) {

  }
}
  • "CRITICAL" 레벨 메시지: 항상 파티션 0에 저장.
  • 그 외 메시지: 랜덤 파티션에 저장
    • 이 로직은 특정 중요도가 높은 메시지를 한 곳에서 처리하게함(예: 시스템 경고, 긴급 알림).

 

 

public class AlertTrendingProducer {

  private static final Logger log =
      LoggerFactory.getLogger(AlertTrendingProducer.class);

  public static void main(String[] args)
      throws InterruptedException, ExecutionException {

    Properties kaProperties = new Properties();
    kaProperties.put("bootstrap.servers",
                           "localhost:9092,localhost:9093,localhost:9094");
    kaProperties.put("key.serializer",
                          AlertKeySerde.class.getName());   //<1>
    kaProperties.put("value.serializer",
                           "org.apache.kafka.common.serialization.StringSerializer");

    try (Producer<Alert, String> producer = new KafkaProducer<>(kaProperties)) {
      Alert alert = new Alert(0, "Stage 0", "CRITICAL", "Stage 0 stopped");
      ProducerRecord<Alert, String> producerRecord =
          new ProducerRecord<>("kinaction_alerttrend", alert, alert.getAlertMessage());    //<2>

      RecordMetadata result = producer.send(producerRecord).get();
      log.info("kinaction_info offset = {}, topic = {}, timestamp = {}",
               result.offset(), result.topic(), result.timestamp());
    }
  }
}
  • Kafka 프로듀서를 생성
  • Alert 객체를 생성하고, 메시지 전송을 위해 ProducerRecord를 구성
  • 메시지를 전송하고, RecordMetadata를 통해 전송 결과를 확인
  • 로그에 전송 결과(오프셋, 토픽 이름, 타임스탬프) 출력

 

public class AuditProducer {

  private static final Logger log = LoggerFactory.getLogger(AuditProducer.class);

  public static void main(String[] args) throws InterruptedException, ExecutionException {

    Properties kaProperties = new Properties();    //<1>
    kaProperties.put("bootstrap.servers", "localhost:9092,localhost:9093");
    kaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    kaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    kaProperties.put("acks", "all");   //<2>
    kaProperties.put("retries", "3");    //<3>
    kaProperties.put("max.in.flight.requests.per.connection", "1");

    try (Producer<String, String> producer = new KafkaProducer<>(kaProperties)) {
      ProducerRecord<String, String> producerRecord = new ProducerRecord<>("kinaction_audit", null,
                                                                           "audit event");

      RecordMetadata result = producer.send(producerRecord).get();
      log.info("kinaction_info offset = {}, topic = {}, timestamp = {}", result.offset(), result.topic(), result.timestamp());

    }
  }

}

신뢰성 보장을 위한 주요 설정 요약

  1. acks=all
    • 모든 리플리카가 메시지를 저장했음을 보장하므로, 데이터 유실 가능성을 최소화
  2. retries=3
    • 전송 실패 시 최대 3번까지 재시도해 네트워크 오류나 브로커 문제를 해결
  3. max.in.flight.requests.per.connection=1
    • 메시지가 순서대로 전송되도록 보장



4.3.1 클라이언트와 브로커 버전 

 

KIP-97: Improved Kafka Client RPC Compatibility Policy - Apache Kafka - Apache Software Foundation

Status Current state: Implemented Discussion thread: Here JIRA: KAFKA-4462 Motivation Currently, we have a “one-way” backwards compatibility policy.  New brokers support older clients, but new clients do not support older broker versions.  This polic

cwiki.apache.org

 

 

 

https://www.youtube.com/watch?v=geMtm17ofPY

 

반응형