티스토리 뷰
CHAPTER 3. 카프카 기본 개념
카프카 브로커, 클러스터, 주키퍼
브로커
- 카프카 클라이언트와 데이터를 주고받기 위해 사용하는 주체
- 하나의 서버에 하나의 카프카 브로커 프로세스를 실행
- 안전한 데이터 보관을 위해 3대 이상의 브로커 서버를 1개의 클러스터로 묶어 운영
브로커가 하는 일
1. 데이터 저장, 전송
- 파일 시스템에 데이터를 저장한다.
- ‘페이지 캐시’를 이용해 높은 처리 속도를 보장한다.
2. 데이터 복제, 싱크
- 파티션 단위로 데이터를 복제하며 원본을 리더, 복사본을 팔로워라고 부른다.
- 리더 브로커가 다운되면 팔로워 중 하나가 리더 지위를 넘겨받는다.
3. 컨트롤러
- 하나의 브로커가 컨트롤러 역할을 한다.
- 다른 브로커들의 상태를 체크하며 리더 파티션을 재분배한다.
4. 데이터 삭제
- 데이터 삭제는 오직 브로커만 가능하다.
- 로그 세그먼트: 삭제되는 파일 단위
5. 컨슈머 오프셋 저장
- 컨슈머 그룹: 토픽이 어떤 파티션에서 어느 레코드까지 가져갔는지 확인하기 위해 오프셋을 커밋
- offsets 토픽에 저장된 오프셋을 토대로 컨슈머 그룹이 다음 레코드를 가져간다.
6. 코디네이터
- 하나의 브로커가 코디네이터 역할을 한다.
- 컨슈머 그룹의 상태를 체크하고 파티션을 컨슈머와 매칭되도록 분배한다.
- 리밸런스: 파티션을 컨슈머로 재할당하는 과정
주키퍼
- 카프카의 메타데이터 관리 역할
토픽과 파티션
토픽
- 데이터 구분 단위
- 1개 이상의 파티션 소유
- 레코드: 프로듀서가 보내 파티션에 저장되는 데이터
- 카프카 병렬처리의 핵심
- FIFO 구조: 컨슈머는 먼저 들어간 레코드를 먼저 가져간다. ↔ 가져간 레코드는 삭제되지 않는다.
토픽 이름 작명 방법
- 어떤 개발환경에서 사용되는 것인지 판단 가능해야 한다.
- 어떤 애플리케이션에서 어떤 데이터 타입으로 사용되는지 유추할 수 있어야 한다.
- 카멜케이스보다는 케밥케이스, 스네이크 표기법이 어울린다.
예시)
- prd.marketing-team.sms-platform.json
- commerce.payment.prd.notification
- dev.email-sender.jira-1234.email-vo-custom
- aws-kafka.live.marketing-platform.json
레코드
- 타임스탬프 + 메시지 키 + 메시지 값 + 오프셋
- 브로커에 적재되면 수정할 수 없고 로그 리텐션 기간 또는 용량에 따라서만 삭제된다.
- 동일한 메시지 키는 동일 파티션에 들어간다.
- 키가 null인 레코드는 프로듀서 기본 설정 파티셔너에 따라 분배된다.
- 메시지 키, 값은 직렬화되어 브로커에 전송되므로, 컨슈머가 이용할 때에는 동일한 형태로 역직렬화를 해야 한다.
카프카 클라이언트
프로듀서 API
자바 애플리케이션과 카프카 라이브러리를 통해 프로듀서를 구현할 수 있다.
프로듀서는 데이터 전송 시 내부적으로 파티셔너, 배치 생성 단계를 거친다.
프로듀서 주요 옵션
- 필수 옵션
- bootstrap.servers : 전송할 호스트이름:포트
- key.serializer : 키를 직렬화하는 클래스
- value.serializer : 값을 직렬화하는 클래스
- 선택 옵션
- acks : 전송 성공 여부 확인 옵션
- == 1(default) : 데이터 저장 시 성공으로 판단
- == 0 : 데이터 전송 즉시 성공으로 판단
- == -1(all) : min.insync.replicas 개수에 해당하는 리더/팔로워 파티션에 저장 시 성공으로 판단
- buffer.memory : 배치 버퍼 메모리 양 (default: 33554432(32MB))
- retries : 에러 이후 재전송 시도 횟수 (default: 2147483647)
- batch.size : 배치로 전송할 최대 레코드 용량 (default: 16384)
- linger.ms : 배치 전송 전 기다리는 최소 시간 (default: 0)
- partitioner.class : 파티셔너 클래스 지정
- enable.idempotence : 멱등성 프로듀서 동작 여부 설정 (default: false)
- transactional.id : 레코드 전송 시 트랜잭션 단위로 묶을지 여부 설정(default: null)
- acks : 전송 성공 여부 확인 옵션
커스텀 파티셔너를 만들어보자.
메시지 키에 따라 파티션을 다르게 지정하고 싶을 때에는 직접 파티셔너를 만들어야 한다.
브로커 정상 전송 여부 확인하기
send() 메서드가 반환하는 Future 객체는 비동기 결과를 표현하는 것으로 정상적으로 적재되었는지 알려준다.
사용자 정의 Callback 클래스를 생성해 비동기로 결과를 확인할 수 있다.
컨슈머 API
컨슈머는 적재된 데이터를 가져와 필요한 처리를 한다.
ex) 토픽에서 고객 데이터를 가져와 마케팅 문자를 고객에게 보낸다.
컨슈머 중요 개념
컨슈머를 운영하는 방법
- 컨슈머 그룹을 운영한다.
- 토픽의 특정 파티션만 구독하는 컨슈머를 운영한다.
컨슈머에 장애가 발생한다면?
리밸런싱 컨슈머 크룹의 일부 컨슈머에 장애가 발생하면, 장애 발생 컨슈머에 할당된 파티션은 장애가 발생하지 않은 컨슈머에 소유권이 넘어간다. 그리고 이슈 발생 컨슈머를 그룹에서 제외해 가용성을 높인다.
- 리밸런싱은 자주 발생하면 안된다.
- 그룹 조정자가 컨슈머 그룹에 컨슈머 추가 및 삭제 시 리밸런싱을 발동시킨다.
오프셋 커밋
데이터 중복 처리를 방지하기 위해 컨슈머 애플리케이션이 오프셋 커밋을 정상 처리했는지 검증 과정이 필요하다.
- 비명시 오프셋 커밋: 일정 간격마다 자동으로 커밋
- 리밸런싱 또는 컨슈머 강제종료 시에 처리 중이던 데이터가 중복 또는 유실될 수 있다.
- 명시 오프셋 커밋: poll() → commitSync() 호출
컨슈머 주요 옵션
필수 옵션
- bootstrap.servers
- key.deserializer
- value.deserializer
선택 옵션
- group.id : 컨슈머 그룹 ID(default: null)
- auto.offset.reset : 특정 파티션을 읽을 때 저장된 컨슈머 오프셋이 없는 경우 어디부터 읽을지 선택
- == latest: 가장 높은 오프셋부터 읽음(default)
- == earliest: 가장 낮은 오프셋부터 읽음
- == none : 컨슈머 그룹 커밋 기록 확인
- enable.auto.commit : 자동 커밋 여부(default: true)
- auto.commit.interval.ms : 자동 커밋일 경우 오프셋 커밋 간격(default: 5000)
- max.poll.records : poll()로 반환되는 레코드 개수(default: 500)
- session.timeout.ms : 컨슈머가 브로커와 연결이 끊기는 최대 시간
- 이 시간을 넘기면 이슈라고 판단하고 리밸런싱 진행
- 보통 hearbeat 시간 * 3배로 설정(default: 10000)
- hearbeat.interval.ms : hearbeat 전송 시간 간격(default: 3000)
- max.poll.interval.ms : poll() 메서드 호출 간격의 최대 시간
- 데이터 처리 시간이 최대 시간을 넘으면 리밸런싱 진행(default: 3000000)
- isolation.level : 레코드를 트랜잭션 단위로 보낼 경우 사용
- == read_committed(default)
- == read_uncommitted
비동기로 오프셋을 커밋하고 싶다면 ?
commitAsync()을 사용하자.
리밸런스 리스너, ConsumerRebalanceListener
리밸런스 발생 시 데이터를 중복 처리하지 않으려면 리밸런스 발생 시 처리한 데이터를 기준으로 커밋을 해야 한다. 리밸런스 발생을 감지하기 위해 카프카 라이브러리는 ConsumerRebalanceListener 인터페이스를 지원한다.
파티션 할당 컨슈머: assign()
- 파티션을 컨슈머에 명시적으로 할당해 운영한다.
- 직접 특정 토픽, 파티션에 할당하므로 리밸런싱이 없다.
컨슈머에 할당된 파티션 확인하기: assignment()
컨슈머 안전 종료: wakeup(), close()
- wakeup() : 컨슈머 인스턴스 안전 종료
- close() : 컨슈머 종료 여부를 명시적으로 알려줌
wakeup() 메서드는 셧다운 훅을 사용해 호출한다.
- 셧다운 훅이란 사용자 또는 OS로부터 종료 요청을 받으면 실행하는 스레드를 말한다.
- Total
- Today
- Yesterday
- linuxawk
- awk프로그램
- 쇼미더코드
- 백준
- whatis
- api문서
- virtualbox
- 버추억박스에러
- linuxtouch
- cat
- OnActivityForResult
- 리눅스cron
- atq
- 백준27211
- linux파일
- 버추억박스오류
- GithubAPI
- Baekjoon27211
- Baekjoon27219
- E_FAIL
- cron시스템
- 코테
- 사용자ID
- GitHubAPIforJava
- linuxgedit
- 리눅스
- baekjoon
- 백준27219
- SELECT #SELECTFROM #WHERE #ORDERBY #GROUPBY #HAVING #EXISTS #NOTEXISTS #UNION #MINUS #INTERSECTION #SQL #SQLPLUS
- Linux
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |