티스토리 뷰
어드민 API
AdminClient클래스로 내부 옵션을 설정하거나 조회할 수 있다.
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka:9092");
AdminClient admin = AdminClient.create(configs);
브로커 정보 조회
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka:9092");
AdminClient admin = AdminClient.create(configs);
logger.info("== Get broker information");
for (Node node : admin.describeCluster().nodes().get()) {
logger.info("node : {}", node);
ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, node.idString());
DescribeConfigsResult describeConfigs = admin.describeConfigs(Collections.singleton(cr));
describeConfigs.all().get().forEach((broker, config) -> {
config.entries().forEach(configEntry -> logger.info(configEntry.name() + "= " + configEntry.value()));
});
}
토픽 정보 조회
Map<String, TopicDescription> topicInformation = admin.describeTopics(Collections.singletonList("test")).all().get();
logger.info("{}", topicInformation);
어드민 API는 사용 후 명시적으로 종료 메소드를 호출해 리소스 낭비를 막아야 한다.
admin.close();
카프카 스트림즈
토픽에 적재된 데이터를 실시간으로 변환해 다른 토픽에 적재하는 라이브러리
토폴로지(topology): 2개 이상의 노드들과 선으로 이루어진 집합
- 종류: 링형, 트리형, 성형 등
프로세서: 카프카 스트림즈에서 토폴로지를 이루는 노드
- 소스 프로세서
- 데이터를 처리하기 위해 최초로 선언하는 노드
- 하나 이상의 토픽에서 데이터를 가져오는 역할
- 스트림 프로세서
- 다른 프로세서가 반환한 데이터를 처리하는 역할
- 싱크 프로세서
- 데이터를 특정 카프카 토픽으로 저장하는 역할
스트림: 노드와 노드를 이은 선, 토픽의 데이터
데이터 처리 구현 방법
- 스트림즈 DSL
- 프로세서 API
스트림즈DSL 기본 개념
KStream
- 레코드의 흐름 → 데이터 조회시 토픽에 존재하는 모든 레코드를 출력한다.
- (메시지 키, 메시지 값)
KTable
- 메시지 키를 기준으로 묶어 가장 최신 레코드를 사용한다.
GlobalKTable
- KTable과 동일하게 메시지 키를 기준으로 묶어서 사용한다.
- KTable과 달리 모든 파티션 데이터가 각 태스크에 할당되어 사용된다.
코파티셔닝(co-partitioning)
조인하는 2개 데이터의 파티션 개수가 동일하도록 맞추는 작업
⇒ 코파티셔닝되지 않은 두 개의 토픽을 조인하는 경우 TopologyException이 발생
코파티셔닝되지 않은 KStream과 KTable을 조인하기 위한 두 가지 방법
- 리파티셔닝 과정을 거친다.
- KTable을 GlobalKTable로 선언해 사용한다. (작은 용량일 경우에만 추천)
스트림즈DSL 주요 옵션
필수 옵션
- bootstrap.servers
- application.id: 애플리케이션 고유 아이디
선택 옵션
- default.key.serde : 레코드 메시지 키를 직렬화, 역직렬화하는 클래스 지정*(기본값: 바이트)*
- default.value.serde: 레코드 메시지 값을 직렬화, 역직렬화하는 클래스 지정*(기본값: 바이트)*
- num.stream.threads: 스트림 프로세싱 실행 시 실행될 스레드 개수*(기본값: 1)*
- state.dir: 상태기반 데이터 처리 시 데이터를 저장할 디렉토리*(기본값: /tmp/kafka-streams)*
스트림즈DSL - stream() , to()
특정 토픽의 데이터를 다른 토픽으로 전달할 때 사용한다.
- stream() : 특정 토픽을 KStream 형태로 가져온다.
- to() : KStream의 데이터를 특정 토픽으로 저장한다.
스트림즈DSL - filter
토픽으로 들어온 문자열 데이터 중 길이가 5보다 큰 경우만 필터링하는 스트림즈 애플리케이션을 만들어보자.
스트림즈DSL - join()
KTable과 KStream을 함께 사용한다면 카프카에서는 실시간으로 들어오는 데이터들을 메시지 키를 기준으로 조인할 수 있다. 이를 통해 데이터를 DB에 저장하지 않고도 조인해 이벤트 기반 스트리밍 데이터 파이프라인을 구성할 수 있다. 조인할 때에는 코파티셔닝 여부를 꼭 확인해야 한다.
코파티셔닝이 되어있지 않은 토픽은 어떻게 조인해야 할까?
- 리파티셔닝을 수행하고 조인 처리를 한다.
- KTable 토픽을 GlobalKTable로 선언해 사용한다.
프로세서 API
스트림즈DSL보다 추가적인 상세 로직의 구현이 필요하면 프로세서 API를 활용한다.
토픽의 문자열 길이가 5 이상인 데이터를 필터링해 다른 토픽으로 저장해보자.
카프카 커넥트
- 데이터 파이프라인 생성 시 반복 작업을 줄이고 효율적인 전송을 이루기 위한 애플리케이션
- 특정 작업 형태를 템플릿으로 만든 커넥터를 실행해 반복 작업을 줄인다.
커넥터의 종류
- 소스 커넥터: 프로듀서 역할
- 싱크 커넥터: 컨슈머 역할
ex) MySQL에서 데이터를 보낼 때, 저장할 때 JDBC 커넥터를 사용해 파이프라인 생성
실행 순서
- 사용자가 커넥트에 커넥터 생성을 명령
- 커넥트가 내부에 커넥터와 태스크 생성
- 커넥터는 태스크들을 관리
- 태스크는 실질적인 데이터 처리 수행
커넥트 실행 방법
- 단일 모드 커넥트: 1개의 프로세스만으로 실행 → 단일 장애점이 될 수도
- 분산 모드 커넥트: 2대 이상의 서버에서 클러스터 형태로 운영 → 안전
+ REST API(Port: 8083)를 사용해 현재 실행 중인 커넥트, 태스크 상태 등을 조회할 수 있다.
단일 모드 커넥트
- connect-standalone.properties와 connect-file-source.properties 설정 파일 수정
- 단일 모드 실행
- $ bin/connect-standalone.sh config/connect-standalone.properties \\ config/connect-file-source.properties
분산 모드 커넥트
- connect-distributed.properties 설정 파일 수정
- 분산 모드 실행
- $ bin/connect-distributed.sh config/connect-distributed.properties
소스 커넥터
- 소스 애플리케이션, 파일로부터 데이터를 가져와 토픽으로 넣는 역할
- 오픈소스 커넥터를 사용하거나 직접 개발해 사용한다.
- 직접 개발하는 경우에는 SourceConnector 와 SourceTask 클래스를 사용한다.
- SourceConnector : 태스크 실행 전 커넥터 설정파일을 초기화하고 어떤 태스크 클래스를 사용할 것인지 정의
- SourceTask : 실제로 소스에서 데이터를 가져와 토픽으로 보내는 역할 수행
카프카 커넥터를 직접 개발할 때에는 사용자가 작성한 클래스뿐만 아니라 참조하는 라이브러리도 함께 빌드해 jar로 압축해야 한다!
싱크 커넥터
- 토픽 데이터를 타깃에 저장하는 역할
- SinkConnector와 SinkTask 클래스를 사용해 직접 구현할 수 있다.
- SinkConnector : 태스크를 실행하기 전 설정값 초기화, 사용할 태스크 클래스 정의
- SinkTask : 커넥트에서 컨슈머 역할을 하며 데이터를 저장 (데이터 처리 로직)
카프카 미러메이커2
- 서로 다른 두 개의 카프카 클러스터 간에 토픽을 복제하는 애플리케이션
- 토픽 데이터 뿐만 아니라 설정까지 복제한다.
단방향 토픽 복제
1. connect-mirror-maker.properties 파일 수정
카프카 클러스터 A와 B가 있을 경우를 가정해 수정해보자.
# 복제할 클러스터 닉네임
cluster = A, B
# 클러스터의 접속 정보
A.bootstrap.servers = a-kafka:9092
B.bootstrap.servers = b-kafka:9092
# A->B로 복제를 진행할 것인지, 어떤 토픽을 복제할 것인지 명시
A->B.enabled = true
A->B.topics = test
B->A.enabled = false
B->A.topics = .*
# 신규 생성된 토픽 복제 개수 설정
replication.factor=1
# 내부 토픽의 복제 개수 설정
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
2. 설정 파일과 함께 미러메이커2 실행
$ bin/connect-mirror-maker.sh config/connect-mirror-maker.properties
지리적 복제(Geo-Replication)
미리메이커2는 단방향, 양방향 복제 기능, ACL복제, 새 토픽 자동 감지 등의 기능을 제공한다.
Active-standby 클러스터 운영
- 적용 상황: 서비스용 클러스터 외에 재해 복구를 위한 임시 클러스터를 하나 더 구성할 때
- 액티브 클러스터: 서비스와 직접 통신하는 클러스터
- 스탠바이 클러스터: 나머지 1개의 클러스터
- 복제 랙이 있어 스탠바이 클러스터에 액티브의 모든 정보가 복제되지 않았을 수도 있다.
⇒ 이에 대한 대응 방안을 사전에 정해둘 필요가 있다.
⇒ 장애 복구 훈련을 계획하고 수행하는 것이 매우 중요!
Active-active 클러스터 운영
- 적용 상황: 글로벌 서비스에서 통신 지연을 최소화하기 위해 2개 이상의 클러스터를 둘 때
Hub and spoke 클러스터 운영
- 적용 상황: 각 팀의 소규모 클러스터 데이터를 한 클러스터에 모아 데이터 레이크로 사용하고 싶을 때
- 허브: 중앙에 있는 한 개의 점
- 스포크: 중앙의 점과 다른 점들을 연결한 선
- Total
- Today
- Yesterday
- 버추억박스에러
- linuxawk
- linuxtouch
- Linux
- 백준27211
- 사용자ID
- atq
- OnActivityForResult
- linuxgedit
- 리눅스cron
- cron시스템
- 버추억박스오류
- GithubAPI
- E_FAIL
- 쇼미더코드
- Baekjoon27211
- 리눅스
- 코테
- 백준27219
- awk프로그램
- whatis
- baekjoon
- SELECT #SELECTFROM #WHERE #ORDERBY #GROUPBY #HAVING #EXISTS #NOTEXISTS #UNION #MINUS #INTERSECTION #SQL #SQLPLUS
- cat
- linux파일
- virtualbox
- GitHubAPIforJava
- 백준
- Baekjoon27219
- api문서
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |