스터디/[카프카 인 액션] (2024.12)

[카프카인액션] 7장. 토픽과 파티션 | 카프카 로그 | 토픽 컴패션

ttoance 2025. 2. 8. 11:29
반응형

7.1 토픽

  • 사용자 작업 이벤트를 카프카 클러스터로 보내는 웹 기반 애플리케이션을 사용해 교육 수업을 위한 자리를 판매한다고 가정해보자.
    • 위치에 대한 초기 검색 이벤트, 고객이 선택한 특정 교육에 대한 이벤트, 확인된 클래스에 대한 세 번째 이벤트가 있을 수 있다.
    • 이벤트를 생성하는 애플리케이션이 모든 데이터를 단일 토픽으로 보내야 할까? 아니면 여러 토픽으로 나눠 보내야 할까?
    • 각 메시지는 특정 유형의 이벤트인가? 각각 다른 토픽으로 분리되어 있어야 하는가 ?

 

  • 토픽 설계는 2단계 프로세스로 본다.
    • 첫번재는 우리가 가진 이벤트를 본다. 하나의 토픽에 속해 있는가 ? 둘 이상에 속해 있는가?
    • 두번째는 각 토픽을 고려한다. 사용해야 하는 파티션의 수는 얼마인가? 
      • 가장 중요한 점은 파티션이 토픽별 설계에 대한 질문이지 클러스터 전체의 제한이나 요구가 아니라는 것이다.
      • 토픽 생성을 위한 기본 파티션 수를 설정할 수 있지만, 대부분의 경우 토픽이 사용되는 방식과 보유할 데이터를 고려해야 한다.
  • 일반적으로 고려해야 할 항목은 다음 3가지이다.
    • 데이터 정확성
      • 순서를 지정해야 하는 이벤트가 동일한 토픽에 있어야 한다
      • 타임스탬프를 기반으로 이벤트를 배치할 수 있다 ▶ 교차 토픽 이벤트를 조정할 수 있다.
      • keyed message를 사용해야 한다 ▶ 파티션에 대한 향후 변경사항을 주의해야 한다.
      • 이론상의 시스템에서 이벤트를 실제 예약 및 확인/청구서 이벤트에 대한 해결책은 메시지 키(학생 ID 포함)가 있는 이벤트를 실제 예약 및 확인/청구서 이벤트에 대한 2개의 토픽에 배치하는것 
    • 컨슈머당 메시지의 양
      • 이론상의 교육 시스템에서 토픽 배치를 고려할 때 이벤트 수를 살펴봐야 한다. 
        • 검색 이벤트 자체는 다른 이벤트보다 많다. 
        • 50,000개의 검색 이벤트와 100개 미만의 예약된 교육 이벤트를 생산한다 했을때, 전체 메시지의 1%미만을 갖는 토픽을 구독하는 것에 대한 고려가 필요하다. 
    • 가지고 있거나 처리해야 할 데이터의 양 
      • 애플리케이션에서 요구하는 제한된 시간 내에 처리하기 위해 메시지 수에 따라 여러 컨슈머에서 실행되고 있을 수 있다. 
      • 복수 개의 컨슈머가 설정되어 있다면 토픽의 파티션에 의해 어떻게 제한되는지 알고 있어야 한다. 

∨ 토픽에 대해 파티션 결정할 때 고려해야 할 사항은 파티션 수를 줄이는 기능이 현재 지원되지 않는다는 점이다.

  • 컨슈머가 재할당된 파티션에서 읽기 시작하는 경우 파티션을 제거하면 현재 위치를 잃을 수 있다.
  • 여기에서 keyed 메시지와 컨슈머 클라이언트가 브로커 수준에서 변경한 사항을 따를 수 있는지 확인해야 한다. 

https://www.confluent.io/blog/how-choose-number-topics-partitions-kafka-cluster/

 

How to Choose the Number of Topics/Partitions in a Kafka Cluster? | Confluent

Confluent is building the foundational platform for data in motion so any organization can innovate and win in a digital-first world.

www.confluent.io

 

7.1.1 토픽 생성 옵션 

1️⃣ 옵션 (Option)

  • delete.topic.enable
    • 토픽을 삭제해야 하는 지 여부를 결정한다.
    • 기본값: true (Kafka 2.0 이후)
    • false로 설정하면 kafka-topics.sh --delete 명령어로 토픽을 삭제할 수 없음
  • auto.create.topics.enable
    • 기본적으로 auto.create.topics.enable=true로 설정되어 있으며,
      이 경우 생성되지 않은 토픽에 데이터를 전송하거나 소비하려 하면 자동으로 해당 토픽이 생성
    • auto.create.topics.enable=false로 설정하면 자동으로 토픽이 생성되지 않는다.

 

2️⃣ 명령어 (Command)

  • --create 매개변수로 토픽 옵션을 조회할 수 있다.
bin/kafka-topics.sh -create

 

  • --describe 명령으로 토픽을 생성한 후에는 설정이 올바른지 확인할 수 있다.
kafka-topics.sh --bootstrap-server localhost:9092 --topic my_topic --describe

 

7.1.2 복제 팩터

  • 실질적으로 운영되기 위해서는 총 레플리카 수가 브로커 수보다 적거나 같도록 계획해야 한다.
    • 총 브로커 수보다 많은 레플리카 수로 토픽 생성하려고 하면 InvalidReplicationFactorException 오류가 발생한다. 
    • 2개의 브로커만 있고 3개의 파티션 레플리카가 있는 경우
      • 2개의 레플리카를 호스팅하는 브로커를 잃어버리면 데이터 사본 하나만 남게 된다.
      • 여러 레플리카를 잃어버리는 것은 장애 발생 시 복구를 제공하는 이상적인 방법이 아니다.

 

7.2 파티션

  • 컨슈머 관점에서 각 파티션은 변경할 수 없는 메시지 로그다.
    • 데이터를 수정하는 것이 아니라 추가하는 것으로 유지 관리한다.
    • 또한 컨슈머 클라이언트는 메시지를 직접 삭제할 수 없다. 

 

7.2.1 파티션 위치 

  • 로그 파일 디렉터리 조회 시 
    • .log인 파일은 데이터 페이로드가 저장되는 위치이다.
    • .index와 .tieindex 파일을 사용해 논리적 메시지 오프셋과 인덱스 파일 내의 물리적 위치 간의 매핑을 저장한다.
ls -lh /var/lib/kafka-logs/my_topic-0
total 100M
-rw-r--r-- 1 kafka kafka  50M Feb 5 12:34 00000000000000000000.log
-rw-r--r-- 1 kafka kafka 1.5M Feb 5 12:34 00000000000000000000.index
-rw-r--r-- 1 kafka kafka  500K Feb 5 12:34 00000000000000000000.timeindex

 

  • 파티션은 많은 파일(여러 세그먼트)로 구성된다.
    • 활성 세그먼트는 현재 새 메시지가 기록되고 있는 파일이다.

 

7.2.2 로그 보기 

kinaction_topicandpart 토픽의 파티션 1에 대한 세그먼트 로그 파일 보기 위한 명령어

bin/kafka-dump-log.sh --print-data-log \
--files /tmp/kafkalinaction/kafka-logs-0/kinaction_topicandpart-1/*.log \
| awk -F: '{print $NF}' | grep kinaction

 

  • kafka-dump-log.sh --print-data-log
    • Kafka의 .log 파일을 분석하여 메시지와 메타데이터를 출력
  • --files /tmp/kafkalinaction/kafka-logs-0/kinaction_topicandpart-1/*.log
    • /tmp/kafkalinaction/kafka-logs-0/kinaction_topicandpart-1/ 경로의 모든 .log 파일을 대상으로 분석
  • awk -F: '{print $NF}'
    • 콜론(:)을 구분자로 사용하여 마지막 필드($NF, 즉 payload 부분)만 출력
  • grep kinaction
    • 출력된 내용 중에서 "kinaction"이라는 단어가 포함된 메시지만 필터링
offset: 0  keySize: -1 valueSize: 10 timestamp: 1712256000000 crc: 123456789 magic: 2
  payload: "test_message"
offset: 1  keySize: -1 valueSize: 12 timestamp: 1712256001000 crc: 123456790 magic: 2
  payload: "kinaction_test_1"
offset: 2  keySize: -1 valueSize: 15 timestamp: 1712256002000 crc: 123456791 magic: 2
  payload: "kinaction_important_message"

 

  "kinaction_test_1"
  "kinaction_important_message"

 

 

 

7.3 EmbeddedKafkaCluster를 사용한 테스트 

  • Kafka Streams는 모의 객체와 본격적인 클러스터 사이의 중간 지점 역할을 하는 EmbeddedKafkaCluster라는 통합 유틸리티 클래스를 제공 
package org.kafkainaction.producer;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.kafkainaction.consumer.AlertConsumer;
import org.kafkainaction.model.Alert;
import org.kafkainaction.serde.AlertKeySerde;


public class EmbeddedKafkaClusterTest {

	private static final String TOPIC = "kinaction_alert";
	private static final int PARTITION_NUMER = 3;
	private static final int REPLICATION_NUMBER = 3;
	private static final int BROKER_NUMBER = 3;
	
	@ClassRule
	public static final EmbeddedKafkaCluster embeddedKafkaCluster 
		= new EmbeddedKafkaCluster(BROKER_NUMBER);
	
    private Properties kaProducerProperties;
    private Properties kaConsumerProperties;

	@Before
	public void setUpBeforeClass() throws Exception {
		embeddedKafkaCluster.createTopic(TOPIC, PARTITION_NUMER, REPLICATION_NUMBER);
        kaProducerProperties = TestUtils.producerConfig(embeddedKafkaCluster.bootstrapServers(),
        		AlertKeySerde.class,
                StringSerializer.class);

        kaConsumerProperties = TestUtils.consumerConfig(embeddedKafkaCluster.bootstrapServers(),
                AlertKeySerde.class,
                StringDeserializer.class);
	}

	@Test
	public void testAlertPartitioner() throws InterruptedException {
		AlertProducer alertProducer =  new AlertProducer();
		try {
			alertProducer.sendMessage(kaProducerProperties);
		} catch (Exception ex) {
			fail("kinaction_error EmbeddedKafkaCluster exception" + ex.getMessage());
		}
		
		AlertConsumer alertConsumer = new AlertConsumer();
		ConsumerRecords<Alert, String> records = alertConsumer.getAlertMessages(kaConsumerProperties);
		TopicPartition partition = new TopicPartition(TOPIC, 0);
		List<ConsumerRecord<Alert, String>> results = records.records(partition);
		assertEquals(0, results.get(0).partition());
	}

}

 

7.4 토픽 컴패션 

  • 컴팩션의 목표는 메시지를 만료시키는 것이 아니라 키의 최신 값이 존재하는지 확인하고 이전 상태를 새 상태로 교체하는 것이다.
    • 메시지의 일부인 키와 해당 키가 null이 아닌 경우에 따라 달라진다.
  • 컴팩션된 토픽을 만드는데 사용한 구성 옵션은 cleanup.policy = compact 이다. 
  • 가장 최근의 멤버십 수준만 필요한 케이스를 살펴보자. => 컴팩션된 토픽 

 

 

 

  • 이전 세그먼트의 경우 컴팩션이 완료되면 각 키에 대한 중복값을 하나의 값으로 줄여야 한다.
  • 활성 세그먼트는 아직 컴팩션을 거치지 않았다. 
    • 모든 메시지가 정리될 때까지 특정 키에 대한 메시지에 대해 여러 값이 존재할 수 있다.

 

마무리

  • 토픽은 물리적인 구조라기보다는 추상적 개념이다. 토픽의 동작을 이해하려면 해당 토픽의 컨슈머는 파티션 수와 동작 중인 복제 팩터에 대해 알아야 한다.
  • 파티션은 토픽을 구성하며 토픽 내 데이터의 병렬 처리를 위한 기본 단위다.
  • 로그 파일 세그먼트는 파티션 디렉터리에 기록되며 브로커에서 관리된다.
  • 테스트는 파티션 로직의 유효성을 검사하는 데 사용할 수 있으며 메모리 내 클러스터를 사용할 수 있다.
  • 토픽 컴팩션은 특정 레코드의 최신 값 보기를 제공하는 방법이다.

 

 

 

반응형