반응형
4.1 예제
- 웹사이트가 고객을 위해 어떻게 작동하는지에 대한 사용자 피드백을 받는 애플리케이션을 가정한다.
- 사용자는 지원 계정이나 챗봇에 이메일 생성하는 양식을 웹사이트에 제출
- 지원 담당자가 받은 편지함을 열어 확인한다.
- 카프카 프로듀서를 도입하고 나면
- 이메일 대신 메시지를 카프카 토픽에 보내서 데이터에 더 쉽게 접근하게 하려고 한다.
- 데이터를 필요한 형식으로 추출할 수 있다.
4.1.1 프로듀서 설명
📌 프로듀서 작업과 메타데이터
- 프로듀서의 역할: 메시지를 전송하는 역할을 담당.
- 메타데이터: 프로듀서가 클러스터에 접근하려면 토픽 이름 외에도 각 파티션의 리더 레플리카 정보가 필요한데, 프로듀서에서 메타데이터에 대한 정보 가져오기가 포함됨
📌 최종 사용자의 브로커 연결
- 사용자(또는 애플리케이션)는 적어도 하나 이상의 브로커에 연결해야 함
- 나머지 연결 관리(리더 레플리카 파악 등)는 카프카 클라이언트 라이브러리에서 처리
📌 분산 시스템의 오류 처리
- 카프카는 순간적인 네트워크 오류(네트워크 블립 network blip)를 감안해 설계.
- 자동으로 재시도(retry) 로직이 적용.
🛠️ 네트워크 블립 ▶ https://ddoance.tistory.com/306
- 만약 메시지 순서 보장하려면 아래 옵션을 설정하면 가능하다.
- retries=3 (재시도 3번)
- 네트워크 오류나 임시 장애 발생 시, 프로듀서는 메시지를 최대 3번까지 재시도
- 이를 통해 전송 실패 시에도 데이터 손실을 방지
- max.in.flight.requests.per.connection=1 (한 번에 하나의 요청만 전송)
- 한 번에 한 요청만 전송하도록 제한
- 동시에 여러 요청이 처리될 경우, 재시도 시 요청 순서가 뒤바뀔 위험이 있음
- 이 설정은 메시지 순서를 엄격히 보장하는 데 필수적
- acks=all (모든 리플리카에서 데이터 저장 확인)
- 모든 리플리카(파티션 리더 및 팔로워)에 데이터가 저장되었음을 확인한 후에만 성공 응답을 반환
- 리더 장애 시 데이터 손실을 방지
- retries=3 (재시도 3번)
📌 스레드 안전성 (내장된 특성)
- 브로커는 메시지를 덮어쓰지 않고 추가만 하기 때문에 프로듀서 간 충돌 걱정이 없음
📌 멱등성 프로듀서
활성화 (enable.idempotence=true)
- 동일한 메시지를 여러 번 전송해도 브로커에 한 번만 저장
- 프로듀서가 실패하거나 재시도하는 상황에서도 데이터 중복이 발생하지 않도록 보장
- 자동으로 다음 설정이 활성화:
- acks=all (모든 리플리카에 저장되었는지 확인)
- retries=MAX_INT (재시도를 무제한으로 허용)
- max.in.flight.requests.per.connection=5 (동시에 처리 가능한 요청 개수는 기본 5로 설정)
비활성화 (enable.idempotence=false)
- 동일한 메시지가 여러 번 전송될 경우 중복 저장이 발생할 수 있음
- 네트워크 오류나 재시도 로직이 동작할 때 데이터 불일치 위험 존재.
- 설정이 유연하지만, 안정성이 떨어짐
- acks 설정에 따라 데이터 유실 가능 (acks=1이나 acks=0 사용 시 특히 위험)
- retries가 제한적이므로 전송 실패 가능성 증가.
4.2 프로듀서 옵션
키 | 용도 |
acks | 메시지 전달이 성공하기 위한 프로듀서가 요구하는 복제 확인 (acknowledgement)의 수 |
bootstrap.servers | 시작 시 연결할 하나 이상의 카프카 브로커 |
value.serializer | 값의 직렬화에 사용되는 클래스 |
key.serializer | 키의 직렬화에 사용되는 클래스 |
모든 프로듀서 구성 키 이름 확인하는 방법 ▶
https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/producer/ProducerConfig.html
4.2.1 브로커 목록 구성
- 프로듀서는 부트스트랩 서버와 연결
- 부트스트랩 서버를 통해 클러스터의 전체 브로커 정보를 받아옴
- 이후 프로듀서는 필요한 브로커(리더 브로커)에 직접 연결하여 작업을 수행
4.2.2 더 빨리(또는 안전하게) 처리하기
- 카프카의 비동기 메시지 패턴
- 프로듀서 전송 요청의 결과를 코드에서 기다리지 않고
- 콜백 또는 Future 객체를 사용해 비동기적으로 성공 또는 실패 처리
- acknowledgement = ack
- 프로듀서가 완료된 요청을 반환하기 전에 파티션 리더의 팔로워로부터 얼마나 많은 수신확인을 받아야 하는지를 제어
- 유효한 값으로는 all, -1, 1, 0 이 있음
📌 acks = 0
- 동작방식
- 프로듀서는 메시지를 브로커에 전송
- 브로커는 메시지를 수신하지만, 확인 응답(ACK)을 프로듀서에게 보내지 않음
- 프로듀서는 브로커로부터 응답을 기다리지 않으므로, 메시지가 브로커에 제대로 기록되었는지 알 수 없음
- 가장 낮은 대기 시간을 얻을 수 있지만 메시지의 안전한 배달은 희생해야 함.
- 실패 감지 불가
- 프로듀서는 브로커로부터 응답을 기다리지 않으므로 전송 실패(예: 네트워크 장애, 브로커 장애)를 감지할 방법이 없음.
- 실패를 감지하지 못하면 재시도 로직을 실행할 수 없음
- ex, 페이지 클릭 수집하고 이벤트를 카프카로 보내는 웹 추적 플랫폼이 있다고 가정했을 시, 단일 링크 누르기 또는 hover 이벤트를 잃어도 비즈니스 영향은 없음
- 프로듀서로부터 전송되면 잊혀진다 ▶ 메시지가 파티션에 도달하지 않았을 수 있음
- 리더 레플리카에 도달한 경우에도 프로듀서는 팔로워 레플리카가에 복사 성공했는지 여부를 알 수 없음.
📌 acks = all 또는 -1
- 배달 보장에 있어서 가장 강력한 옵션
- all값은 파티션 리더의 레플리카가 ISR in-sync-replica (동기화 상태의 레플리카)의 전체 목록에 대한 복제 완료 확인하기를 기다림
- 프로듀서는 파티션에 대한 모든 레플리카가 성공하기 전까지는 성공 확인을 받지 못함.
📌 acks = 1
- 수신 확인에는 메시지 수신 (특정 파티션의 리더 레플리카)가 확인을 프로듀서에게 다시 보내는 과정이 포함
- 프로듀서 클라이언트는 이 수신확인을 기다림
- 그러나 실패로 인해 리더가 다운되기 전에 팔로워가 메시지를 복사하지 않았을 경우, 해당 파티션의 팔로워 레플리카에는 메시지가 나타내지 않음
4.2.3 타임스탬프
- 최신 버전의 프로듀서 레코드에는 타임스탬프가 포함되어 있다.
- message.timestamp.type 토픽 구성을 CreateTime으로 설정하면 클라이언트에서 설정한 시간이 사용되는 반면, LogAppendTime으로 설정하면 브로커 시간이 사용된다.
- ex, 결제 시간이 브로커에게 전달되는 시간이 아니라 발생하는 시간을 얻기 위해 사용
- 프로듀서를 사용할 때의 또 다른 옵션은 프로듀서 인터셉터를 만들어서 측정과 모니터링을 지원함
- 필수적인 것이 아니다
4.3 코드
public class AlertProducer {
public static void main(String[] args) {
Properties kaProperties = new Properties();
kaProperties.put("bootstrap.servers", "localhost:9092,localhost:9093");
kaProperties.put("key.serializer", AlertKeySerde.class.getName()); //<1>
kaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
/** Use {@link org.kafkainaction.partitioner.AlertLevelPartitioner} to determine partition */
kaProperties.put("partitioner.class", AlertLevelPartitioner.class.getName()); //<2>
try (Producer<Alert, String> producer = new KafkaProducer<>(kaProperties)) {
Alert alert = new Alert(1, "Stage 1", "CRITICAL", "Stage 1 stopped");
ProducerRecord<Alert, String>
producerRecord = new ProducerRecord<>("kinaction_alert", alert, alert.getAlertMessage()); //<3>
producer.send(producerRecord, new AlertCallback());
}
}
}
- 직렬화: 사용자 정의 키 직렬화(AlertKeySerde)와 기본 값 직렬화(StringSerializer)를 활용
- 파티셔닝: AlertLevelPartitioner를 통해 특정 조건(예: 심각도)에 따라 메시지를 적절한 파티션에 저장
- 비동기 전송: send 메서드와 콜백(AlertCallback)을 사용해 비동기 메시지 전송과 에러 처리의 유연성을 제공
public class AlertLevelPartitioner implements Partitioner { //<1>
public int partition(final String topic,
final Object objectKey,
final byte[] keyBytes,
final Object value,
final byte[] valueBytes,
final Cluster cluster) {
int criticalLevelPartition = findCriticalPartitionNumber(cluster, topic);
return isCriticalLevel(((Alert) objectKey).getAlertLevel()) ?
criticalLevelPartition :
findRandomPartition(cluster, topic, objectKey);
}
public int findCriticalPartitionNumber(Cluster cluster, String topic) {
//not using parameters but could if needed for your logic
return 0;
}
public int findRandomPartition(Cluster cluster, String topic, Object objectKey) {
//not using parameter objectKey but could if needed for your logic
List<PartitionInfo> partitionMetaList =
cluster.availablePartitionsForTopic(topic);
Random randomPart = new Random();
return randomPart.nextInt(partitionMetaList.size());
}
public boolean isCriticalLevel(String level) {
if (level.toUpperCase().contains("CRITICAL")) {
return true;
} else {
return false;
}
}
@Override
public void close() {
}
@Override
public void configure(final Map<String, ?> map) {
}
}
- "CRITICAL" 레벨 메시지: 항상 파티션 0에 저장.
- 그 외 메시지: 랜덤 파티션에 저장
- 이 로직은 특정 중요도가 높은 메시지를 한 곳에서 처리하게함(예: 시스템 경고, 긴급 알림).
public class AlertTrendingProducer {
private static final Logger log =
LoggerFactory.getLogger(AlertTrendingProducer.class);
public static void main(String[] args)
throws InterruptedException, ExecutionException {
Properties kaProperties = new Properties();
kaProperties.put("bootstrap.servers",
"localhost:9092,localhost:9093,localhost:9094");
kaProperties.put("key.serializer",
AlertKeySerde.class.getName()); //<1>
kaProperties.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
try (Producer<Alert, String> producer = new KafkaProducer<>(kaProperties)) {
Alert alert = new Alert(0, "Stage 0", "CRITICAL", "Stage 0 stopped");
ProducerRecord<Alert, String> producerRecord =
new ProducerRecord<>("kinaction_alerttrend", alert, alert.getAlertMessage()); //<2>
RecordMetadata result = producer.send(producerRecord).get();
log.info("kinaction_info offset = {}, topic = {}, timestamp = {}",
result.offset(), result.topic(), result.timestamp());
}
}
}
- Kafka 프로듀서를 생성
- Alert 객체를 생성하고, 메시지 전송을 위해 ProducerRecord를 구성
- 메시지를 전송하고, RecordMetadata를 통해 전송 결과를 확인
- 로그에 전송 결과(오프셋, 토픽 이름, 타임스탬프)를 출력
public class AuditProducer {
private static final Logger log = LoggerFactory.getLogger(AuditProducer.class);
public static void main(String[] args) throws InterruptedException, ExecutionException {
Properties kaProperties = new Properties(); //<1>
kaProperties.put("bootstrap.servers", "localhost:9092,localhost:9093");
kaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kaProperties.put("acks", "all"); //<2>
kaProperties.put("retries", "3"); //<3>
kaProperties.put("max.in.flight.requests.per.connection", "1");
try (Producer<String, String> producer = new KafkaProducer<>(kaProperties)) {
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("kinaction_audit", null,
"audit event");
RecordMetadata result = producer.send(producerRecord).get();
log.info("kinaction_info offset = {}, topic = {}, timestamp = {}", result.offset(), result.topic(), result.timestamp());
}
}
}
신뢰성 보장을 위한 주요 설정 요약
- acks=all
- 모든 리플리카가 메시지를 저장했음을 보장하므로, 데이터 유실 가능성을 최소화
- retries=3
- 전송 실패 시 최대 3번까지 재시도해 네트워크 오류나 브로커 문제를 해결
- max.in.flight.requests.per.connection=1
- 메시지가 순서대로 전송되도록 보장
4.3.1 클라이언트와 브로커 버전
- 카프카 브로커와 클라이언트 버전이 일치할 필요는 없음
https://www.youtube.com/watch?v=geMtm17ofPY
반응형