ProducerInterceptor는 Kafka 클러스터에 메시지가 발행되기 전 프로듀서에서 수신한 레코드를 가로채거나 변경(mutate)할 수 있도록 해주는 플러그인 인터페이스입니다.
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {
private final SomeBean bean;
public MyProducerInterceptor(SomeBean bean) {
this.bean = bean;
}
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
this.bean.someMethod("producer interceptor");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
}
ProducerInterceptor의 동작
- onSend 메서드 호출
- 메시지가 클러스터로 전송되기 직전에 onSend 메서드가 호출됩
- 이 메서드는 메시지를 가로채거나 수정할 수 있는 기회를 제공
- onAcknowledgement 메서드 호출
- Kafka 서버가 메시지 발행에 대해 확인 응답(Acknowledgement)을 보낸 후, onAcknowledgement 메서드가 호출.
- 이 메서드는 프로듀서가 사용자 정의 콜백을 실행하기 바로 전에 호출
ProducerInterceptor (clients 7.8.0-ce API)
This is called from KafkaProducer.send(ProducerRecord) and KafkaProducer.send(ProducerRecord, Callback) methods, before key and value get serialized and partition is assigned (if partition is not specified in ProducerRecord). This method is allowed to modi
docs.confluent.io
https://docs.spring.io/spring-kafka/reference/kafka/producer-interceptor-managed-in-spring.html
Producer Interceptor Managed in Spring :: Spring Kafka
Starting with version 3.0.0, when it comes to a producer interceptor, you can let Spring manage it directly as a bean instead of providing the class name of the interceptor to the Apache Kafka producer configuration. If you go with this approach, then you
docs.spring.io
'개발 > kafka' 카테고리의 다른 글
[kafka] rack awareness | rackAssignment 옵션 (+네이버의 파티션 할당 전략: RackAwareRangeAssignor) (0) | 2025.02.01 |
---|---|
역압(back pressure)을 처리하는 애플리케이션 (kafka/spark) (0) | 2025.01.18 |
[kafka] 소비자 그룹 (Consumer Group) (0) | 2025.01.05 |
[kafka] log compaction | log tail | log head (1) | 2025.01.04 |
[kafka] producer message key = 메시지 코디네이트(Message Coordinate) (1) | 2025.01.04 |