스터디/[카프카 인 액션] (2024.12)

[카프카인액션] 3장. 카프카 커넥트 | 카프카 소스 커넥터 | 카프카 소스 + 싱크 커넥터 | 카프카 설계시 고려요소 | 카프카 Apache avro

ttoance 2025. 1. 4. 09:36
반응형

3.1 카프카 프로젝트 설계 

3.1.1 기존 데이터 아키텍쳐 인수

- 새로운 가상의 컨설팅 회사는 원격으로 전기 자전거를 관리하는 공장을 재설계하는 계약을 막 따냈다.

- 센서는 모니터링하는 내부 장비 상태와 이 상태 이벤트를 지속적으로 제공하는 자전거 전체에 설치된다.

- 하지만 현재 시스템은 대부분의 메시지를 무시해야 할 정도로 많은 이벤트가 생성되고 있다.

- 현재 데이터는 대용량 클러스터 형태로 구성된 전통저인 관계형 데이터베이스 시스템에 있다.  

 

3.1.2 첫 변경 

- 가장 쉬운 방법은 카프카 커넥트로 시작하는 것

 

3.1.3 내장 기능

- 카프카 커넥트의 용도는 자체 프로듀서와 컨슈머를 작성하지 않고 카프카 안팎으로의 데이터 이동을 돕는 것이다.

- 커넥트가 전형적인 애플리케이션 로그 파일을 가져와 카프카 토픽으로 옮기는 방식

- 로컬 머신에서 커넥트를 실행하고 테스트하기 가장 쉬운 방식은 독립 실행형 방식 standalone 모드 이다. 

 

 

[참고] standalone 모드 vs distributed 모드 

 

[kafka] Kafka Connect High-Level Overview + Cluster and Distributed Architecture

Kafka Connect High-Level Overview  1 Kafka Connector Source (클러스터에서 실행됨)는 외부 소스에서 데이터를 가져옵니다.2 Kafka Connector Source는 수집된 데이터를 Kafka 클러스터로 전송합니다 (이 단계에서

ddoance.tistory.com

 

 

케이스 1: 단일 소스 커넥터 실행

설명: alert.txt 파일의 데이터를 Kafka 토픽(kinaction_alert_connect)으로 전송하는 단일 소스 커넥터 실행 케이스입니다.

데이터를 Kafka로 전송하는 역할을 하는 소스 커넥터(Source Connector)

name=alert-source
connector.class=FileSreamSource # 소스 파일과 상호 작용하는 클래스 명시
tasks.max=1 # 독립 실행형 모드로 테스트하기 위한 셋업
file=alert.txt # 이 파일 변경을 모니터링
topic=kinaction_alert_connect # 데이터가 전송될 토픽 이름

 

소스 커넥터 실행

bin/connect-standalone.sh config/connect-standalone.properties \alert-source.properties

 

카프카 파일 메시지 입력 확인 

bin/kafka-console-consumer.sh 
-- bootstrap-server localhost:9094
topic kinaction_alert_connect --from-beginning

 

 

케이스 2: 소스 + 싱크 커넥터 동시 실행

설명: alert.txt 파일 데이터를 Kafka 토픽(kinaction_alert_connect)으로 전송하고, 이 데이터를 다시 읽어와 sink.txt 파일로 저장하는 파이프라인 구성입니다.

Kafka 토픽에서 데이터를 읽어 지정된 대상에 저장하는 싱크 커넥터(Sink Connector)

name=alert-sink
connector.class=FileSreamSource # 소스 파일과 상호 작용하는 클래스 명시
tasks.max=1 # 독립 실행형 모드로 테스트하기 위한 셋업
file=sink.txt # 카프카 파토픽에 있는 모든 메시지의 대상 페일 
topic=kinaction_alert_connect # 데이터 가져올 토픽

 

소스와 싱크 커넥터 동시 실행

bin/connect-standalone.sh config/connect-standalone.properties \alert-source.properties alert-sink.properties

 

 

3.1.4 주문 송장을 위한 데이터 

- 로컬 데이터베이스로부터 테이블 업데이트를 카프카 토픽에 스트리밍하는 이미 개발되어 있는 소스 커넥터 적용 

conflunet-hub install confluentinc/kafka-connect-jdbc:10.2.0
confluent local services connect start 

##
confluent local services connect connector config jdbc-source
--config etc/kafka-connect-jdbc/kafkatest-sqlite.properties

 

kafkatest-sqlite.properties

name=jdbc-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:sqlite:/path/to/sqlite.db
table.whitelist=#Kafka로 전송할 데이터베이스 테이블 목록.
mode=incrementing #데이터 가져오는 방식 : incrementing: ID 컬럼을 기준으로 데이터베이스의 변경 사항만 읽음.
incrementing.column.name=id
topic.prefix=#Kafka 토픽 이름의 접두사.

 


3.2 센서 이벤트 설계 

3.2.1 기존 문제 : 데이터 사일로 data silo 와 복구 가능성 recoverability 과제 처리 

데이터 사일로 다루기 

- 다른 사람이 그 데이터를 사용하기 위해 원시 소스에 있는 데이터를 누구나 사용할 수 있게 만든다.

- 들어오는 데이터에 접근 가능한 경우, 애플리케이션 API가 데이터를 특정 형식으로 노출하거나 커스텀 변환 후에 노출하는 것에 대해 걱정할 필요 없음 

 

복구 가능성

- 카프카는 분산 시스템으로써 실패를 고려하고 설계되었기 때문에 예상되는 실패는 통제가 가능하다. 

- 인적 요소로 실패했을 경우, --from-beginning 플래그로 간단하게 토픽 처음부터 소비할 수 있다. 

- 큐잉 시스템에의 메시지 구독자가 메시지를 읽은 후 브로커에서 제거되면, 다시 생산 못하는데, 

  - 버전 1.1부터 데이터 리플레이를 할 수 있음 

 

 

3.2.2 카프카가 적합한 이유 

- 데이터베이스가 수직으로 확장하는 데 비용이 많이 든다. 

  - 클러스터를 수평으로 확장할 수 있는 기능을 통해 비용에 대해 이점을 얻을 수 있다. 

- 지속적으로 생상되는 이벤트가 있기 때문에 종료 시간이나 중단 지점이 정의되어 있지 않고, 메시지가 일반적으로 10KB 미만이다. 

- 다만, 데이터 보안에 대한 암호화는 없다. 외부에서 보안이 필요하다. 

 

 

3.2.3 우리 설계에 대한 시작점 (p.98)

- 주의할 점은 카프카 버전에서 사용할 수 있는 기능 

카프카 버전 기능
2.0.0 접두사 지원 및 호스트 이름 확인이 포함된 ACL
1.0.0 자바 9 지원 및 JBOD 디스크 장애 개선
0.11.0.0 관리 API
0.10.2.0 클라이언트 호환성 향상
0.10.1.0 시간 기반 검색
0.10.0.0 카프카 스트림즈, 타임스탬프, 랙 어웨이니스(rack awareness)
0.9.0.0 다양한 보안 기능(ACL, SSL), 카프카 커넥트, 새 컨슈머 클라이언트

- 기능이 개선된 클라이언트의 호환성 

  - 0.10.0 버전 이후의 브로커는 최신 클라이언트 버전에서 작동 

- 아래 질문 목록 리스트도 함께 확인하면 좋다 (p.99)

1) 시스템에서 메시지를 잃어도 괜찮은가 ? 대출금 지급 이벤트 vs RSS피드 게시물 누락

2) 어떤 방식으로든 데이터를 그룹화해야하는가 ? 미리 그룹화하면 애플리케이션이 토픽 읽는 동안 컨슈머 메시지 코디네이트 할 필요 없음  

 

[참고] 메시지 코디네이트

 

[kafka] producer message key = 메시지 코디네이트(Message Coordinate)

메시지 코디네이트(Message Coordinate)란?메시지 코디네이트는 Kafka와 같은 분산 메시징 시스템에서 메시지의 위치와 흐름을 조정 및 관리하는 작업을 의미합니다. 이를 통해 메시지가 올바른 파티

ddoance.tistory.com

 

 

3) 특정 순서로 데이터를 전달해야 하는가 ? 주문 전에 주문 취소 알림을 받는 케이스 vs 비즈니스에 대한 SEO데이터

4) 특정 항목의 마지막 값만 필요한가 ? 아니면 해당 항목의 이력이 중요한가 ?

5) 얼마나 많은 컨슈머를 가질 것인가 ? 

 

 

3.2.4 사용자 데이터 요구사항 

감사 데이터 

- 규정 준수를 위해 센서 자체에 대해 누가 어떤 관리 작업을 수행하는지 알고 싶다.

- 우리의 주요 관심사는 처리할 모든 데이터가 있는지다. 순서는 중요하지 않다. 

얼럿 추세 데이터

- 키를 사용해 이 데이터를 그룹화하는 것이 도움이 된다.

- 센서가 설치된 내부 시스템의 각 스테이지에서 자전거의 부품 ID를 키로 사용할 확률이 높다.  

- 센서가 이틀에 한 번씩 '유지 관리 필요' 메시지를 보낸다면, 이는 장비 오류의 추세를 파악하는데 필요한 정보 유형이다. 

얼럿 데이터 

- 스테이지인 키로 그룹화하려고 한다. 

- 현재 상태인 요구사항에 대해 우리가 관심을 갖고 필요로 한다. 

  - 이때 사용하는 기술이 로그 컴팩션 log compaction 이라는 프로세스를 사용한다.

[참고] log compaction

 

[kafka] log compaction | log tail | log head

log는 head와 tail을 가지고 있다. compacted log의 head는 전통적인 카프카 로그와 같다. 새로운 레코드들이 head의 끝에 append 된다. 모든 log compaction은 log의 tail에서 작동한다. 오직 tail

ddoance.tistory.com

 

- 특정 얼럿 파티션에 할당된 컨슈머의 사용량이 중요하다.

  - 크리티컬 얼럿은 해당 이벤트를 신속하게 처리해야 하는 가동 시간 요구사항이다. 

카프카 기능 감사 데이터 얼럿 추세 데이터 얼럿 데이터
메시지 유실 가능성 아니요 아니요
그룹화 기능 (grouping) 아니요
순서 보장 (ordering) 아니요 아니요 아니요
최종값만 필요 아니요 아니요
독립된 컨슈머 지원 아니요

 

 


3.3 데이터 형식 

3.3.1 데이터를 위한 계획 Apache Avro 

- 카프카는 기본적으로 데이터 유효성 검사를 수행하지 않는다. 

- 그러나 각 애플리케이션은 해당 데이터의 의미와 사용중인 형식을 이해할 필요 있다. 

 

Specification

Introduction This document defines Apache Avro. It is intended to be the authoritative specification. Implementations of Avro must adhere to this document. Schema Declaration A Schema is represented in JSON by one of: A JSON string, naming a defined type.

avro.apache.org

{
  "type": "record",
  "name": "LongList",
  "aliases": ["LinkedLongs"],                      // old name for this
  "fields" : [
    {"name": "value", "type": "long"},             // each element has a long
    {"name": "next", "type": ["null", "LongList"]} // optional next element
  ]
}

 

3.3.2 의존성 설정

https://avro.apache.org/docs/1.12.0/getting-started-java/

 

Getting Started (Java)

This is a short guide for getting started with Apache Avro™ using Java. This guide only covers using Avro for data serialization; see Patrick Hunt’s Avro RPC Quick Start for a good introduction to using Avro for RPC. Download Avro implementations for C

avro.apache.org

 

반응형