개발/kafka

[kafka] producer interceptor

ttoance 2025. 1. 12. 07:35
반응형

 

 

ProducerInterceptorKafka 클러스터에 메시지가 발행되기 전 프로듀서에서 수신한 레코드를 가로채거나 변경(mutate)할 수 있도록 해주는 플러그인 인터페이스입니다.

 

 

public class MyProducerInterceptor implements ProducerInterceptor<String, String> {

    private final SomeBean bean;

    public MyProducerInterceptor(SomeBean bean) {
        this.bean = bean;
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        this.bean.someMethod("producer interceptor");
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    @Override
    public void close() {
    }

}

 

ProducerInterceptor의 동작

  1. onSend 메서드 호출
    • 메시지가 클러스터로 전송되기 직전에 onSend 메서드가 호출됩
    • 이 메서드는 메시지를 가로채거나 수정할 수 있는 기회를 제공
  2. onAcknowledgement 메서드 호출
    • Kafka 서버가 메시지 발행에 대해 확인 응답(Acknowledgement)을 보낸 후, onAcknowledgement 메서드가 호출.
    • 이 메서드는 프로듀서가 사용자 정의 콜백을 실행하기 바로 전에 호출

 

 

 

 

https://docs.confluent.io/platform/current/clients/javadocs/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html

 

ProducerInterceptor (clients 7.8.0-ce API)

This is called from KafkaProducer.send(ProducerRecord) and KafkaProducer.send(ProducerRecord, Callback) methods, before key and value get serialized and partition is assigned (if partition is not specified in ProducerRecord). This method is allowed to modi

docs.confluent.io

 

https://docs.spring.io/spring-kafka/reference/kafka/producer-interceptor-managed-in-spring.html

 

Producer Interceptor Managed in Spring :: Spring Kafka

Starting with version 3.0.0, when it comes to a producer interceptor, you can let Spring manage it directly as a bean instead of providing the class name of the interceptor to the Apache Kafka producer configuration. If you go with this approach, then you

docs.spring.io

 

반응형