티스토리 뷰

반응형

 

 

아파치 카프카 애플리케이션 프로그래밍 with 자바 - YES24

아파치 카프카 애플리케이션 개발을 위한 「실전 가이드」아파치 카프카란 무엇일까? 카프카 애플리케이션은 어떻게 만들까? 데이터 파이프라인을 만들기 위해 어떤 카프카 라이브러리를 사용

www.yes24.com


 

어드민 API

AdminClient클래스로 내부 옵션을 설정하거나 조회할 수 있다.

Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka:9092");
AdminClient admin = AdminClient.create(configs);

 

브로커 정보 조회

Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka:9092");

AdminClient admin = AdminClient.create(configs);

logger.info("== Get broker information");
for (Node node : admin.describeCluster().nodes().get()) {
	logger.info("node : {}", node);
	ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, node.idString());
	DescribeConfigsResult describeConfigs = admin.describeConfigs(Collections.singleton(cr));
	describeConfigs.all().get().forEach((broker, config) -> {
		config.entries().forEach(configEntry -> logger.info(configEntry.name() + "= " + configEntry.value()));
	});
}
	

 

토픽 정보 조회

Map<String, TopicDescription> topicInformation = admin.describeTopics(Collections.singletonList("test")).all().get();
logger.info("{}", topicInformation);

 

어드민 API는 사용 후 명시적으로 종료 메소드를 호출해 리소스 낭비를 막아야 한다.

admin.close();

 

 

카프카 스트림즈

토픽에 적재된 데이터를 실시간으로 변환해 다른 토픽에 적재하는 라이브러리

 

토폴로지(topology): 2개 이상의 노드들과 선으로 이루어진 집합

  • 종류: 링형, 트리형, 성형 등

 

프로세서: 카프카 스트림즈에서 토폴로지를 이루는 노드

  • 소스 프로세서
    • 데이터를 처리하기 위해 최초로 선언하는 노드
    • 하나 이상의 토픽에서 데이터를 가져오는 역할
  • 스트림 프로세서
    • 다른 프로세서가 반환한 데이터를 처리하는 역할
  • 싱크 프로세서
    • 데이터를 특정 카프카 토픽으로 저장하는 역할

 

스트림: 노드와 노드를 이은 선, 토픽의 데이터

 

 

데이터 처리 구현 방법

  1. 스트림즈 DSL
  2. 프로세서 API

 

스트림즈DSL 기본 개념

KStream

  • 레코드의 흐름 → 데이터 조회시 토픽에 존재하는 모든 레코드를 출력한다.
  • (메시지 키, 메시지 값)

KTable

  • 메시지 키를 기준으로 묶어 가장 최신 레코드를 사용한다.

GlobalKTable

  • KTable과 동일하게 메시지 키를 기준으로 묶어서 사용한다.
  • KTable과 달리 모든 파티션 데이터가 각 태스크에 할당되어 사용된다.

 

코파티셔닝(co-partitioning)

조인하는 2개 데이터의 파티션 개수가 동일하도록 맞추는 작업

⇒ 코파티셔닝되지 않은 두 개의 토픽을 조인하는 경우 TopologyException이 발생

 

 

코파티셔닝되지 않은 KStream과 KTable을 조인하기 위한 두 가지 방법

  1. 리파티셔닝 과정을 거친다.
  2. KTable을 GlobalKTable로 선언해 사용한다. (작은 용량일 경우에만 추천)

 

 

스트림즈DSL 주요 옵션

필수 옵션

  • bootstrap.servers
  • application.id: 애플리케이션 고유 아이디

선택 옵션

  • default.key.serde : 레코드 메시지 키를 직렬화, 역직렬화하는 클래스 지정*(기본값: 바이트)*
  • default.value.serde: 레코드 메시지 값을 직렬화, 역직렬화하는 클래스 지정*(기본값: 바이트)*
  • num.stream.threads: 스트림 프로세싱 실행 시 실행될 스레드 개수*(기본값: 1)*
  • state.dir: 상태기반 데이터 처리 시 데이터를 저장할 디렉토리*(기본값: /tmp/kafka-streams)*

 

스트림즈DSL - stream() , to()

특정 토픽의 데이터를 다른 토픽으로 전달할 때 사용한다.

  • stream() : 특정 토픽을 KStream 형태로 가져온다.
  • to() : KStream의 데이터를 특정 토픽으로 저장한다.

 

 

스트림즈DSL - filter

토픽으로 들어온 문자열 데이터 중 길이가 5보다 큰 경우만 필터링하는 스트림즈 애플리케이션을 만들어보자.

 

 

스트림즈DSL - join()

KTable과 KStream을 함께 사용한다면 카프카에서는 실시간으로 들어오는 데이터들을 메시지 키를 기준으로 조인할 수 있다. 이를 통해 데이터를 DB에 저장하지 않고도 조인해 이벤트 기반 스트리밍 데이터 파이프라인을 구성할 수 있다. 조인할 때에는 코파티셔닝 여부를 꼭 확인해야 한다.

 

 

코파티셔닝이 되어있지 않은 토픽은 어떻게 조인해야 할까?

  1. 리파티셔닝을 수행하고 조인 처리를 한다.
  2. KTable 토픽을 GlobalKTable로 선언해 사용한다.

 

프로세서 API

스트림즈DSL보다 추가적인 상세 로직의 구현이 필요하면 프로세서 API를 활용한다.

토픽의 문자열 길이가 5 이상인 데이터를 필터링해 다른 토픽으로 저장해보자.

 

 

카프카 커넥트

  • 데이터 파이프라인 생성 시 반복 작업을 줄이고 효율적인 전송을 이루기 위한 애플리케이션
  • 특정 작업 형태를 템플릿으로 만든 커넥터를 실행해 반복 작업을 줄인다.

 

커넥터의 종류

  • 소스 커넥터: 프로듀서 역할
  • 싱크 커넥터: 컨슈머 역할

ex) MySQL에서 데이터를 보낼 때, 저장할 때 JDBC 커넥터를 사용해 파이프라인 생성

 

 

실행 순서

  1. 사용자가 커넥트에 커넥터 생성을 명령
  2. 커넥트가 내부에 커넥터와 태스크 생성
  3. 커넥터는 태스크들을 관리
  4. 태스크는 실질적인 데이터 처리 수행

 

커넥트 실행 방법

  1. 단일 모드 커넥트: 1개의 프로세스만으로 실행 → 단일 장애점이 될 수도
  2. 분산 모드 커넥트: 2대 이상의 서버에서 클러스터 형태로 운영 → 안전

+ REST API(Port: 8083)를 사용해 현재 실행 중인 커넥트, 태스크 상태 등을 조회할 수 있다.

 

 

단일 모드 커넥트

  1. connect-standalone.properties와 connect-file-source.properties 설정 파일 수정
  2. 단일 모드 실행
  3. $ bin/connect-standalone.sh config/connect-standalone.properties \\ config/connect-file-source.properties

 

분산 모드 커넥트

  1. connect-distributed.properties 설정 파일 수정
  2. 분산 모드 실행
  3. $ bin/connect-distributed.sh config/connect-distributed.properties

 

 

소스 커넥터

  • 소스 애플리케이션, 파일로부터 데이터를 가져와 토픽으로 넣는 역할
  • 오픈소스 커넥터를 사용하거나 직접 개발해 사용한다.
  • 직접 개발하는 경우에는 SourceConnector 와 SourceTask 클래스를 사용한다.
    • SourceConnector : 태스크 실행 전 커넥터 설정파일을 초기화하고 어떤 태스크 클래스를 사용할 것인지 정의
    • SourceTask : 실제로 소스에서 데이터를 가져와 토픽으로 보내는 역할 수행

 

카프카 커넥터를 직접 개발할 때에는 사용자가 작성한 클래스뿐만 아니라 참조하는 라이브러리도 함께 빌드해 jar로 압축해야 한다!

 

 

싱크 커넥터

  • 토픽 데이터를 타깃에 저장하는 역할
  • SinkConnector와 SinkTask 클래스를 사용해 직접 구현할 수 있다.
  • SinkConnector : 태스크를 실행하기 전 설정값 초기화, 사용할 태스크 클래스 정의
  • SinkTask : 커넥트에서 컨슈머 역할을 하며 데이터를 저장 (데이터 처리 로직)

 

 

카프카 미러메이커2

  • 서로 다른 두 개의 카프카 클러스터 간에 토픽을 복제하는 애플리케이션
  • 토픽 데이터 뿐만 아니라 설정까지 복제한다.

 

 

단방향 토픽 복제

 

1. connect-mirror-maker.properties 파일 수정

카프카 클러스터 A와 B가 있을 경우를 가정해 수정해보자.

# 복제할 클러스터 닉네임
cluster = A, B

# 클러스터의 접속 정보
A.bootstrap.servers = a-kafka:9092
B.bootstrap.servers = b-kafka:9092

# A->B로 복제를 진행할 것인지, 어떤 토픽을 복제할 것인지 명시
A->B.enabled = true
A->B.topics = test

B->A.enabled = false
B->A.topics = .*

# 신규 생성된 토픽 복제 개수 설정
replication.factor=1

# 내부 토픽의 복제 개수 설정
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1

 

2. 설정 파일과 함께 미러메이커2 실행

$ bin/connect-mirror-maker.sh config/connect-mirror-maker.properties

 

 

지리적 복제(Geo-Replication)

미리메이커2는 단방향, 양방향 복제 기능, ACL복제, 새 토픽 자동 감지 등의 기능을 제공한다.

 

 

Active-standby 클러스터 운영

  • 적용 상황: 서비스용 클러스터 외에 재해 복구를 위한 임시 클러스터를 하나 더 구성할 때
  • 액티브 클러스터: 서비스와 직접 통신하는 클러스터
  • 스탠바이 클러스터: 나머지 1개의 클러스터
  • 복제 랙이 있어 스탠바이 클러스터에 액티브의 모든 정보가 복제되지 않았을 수도 있다.

⇒ 이에 대한 대응 방안을 사전에 정해둘 필요가 있다.

⇒ 장애 복구 훈련을 계획하고 수행하는 것이 매우 중요!

 

 

Active-active 클러스터 운영

  • 적용 상황: 글로벌 서비스에서 통신 지연을 최소화하기 위해 2개 이상의 클러스터를 둘 때

 

 

Hub and spoke 클러스터 운영

  • 적용 상황: 각 팀의 소규모 클러스터 데이터를 한 클러스터에 모아 데이터 레이크로 사용하고 싶을 때
  • 허브: 중앙에 있는 한 개의 점
  • 스포크: 중앙의 점과 다른 점들을 연결한 선
반응형