아파치 카프카 - 애플리케이션 프로그래밍 (최원영 지음) 4장
4.1 토픽과 파티션
4.1.1 적정 파티션 개수
토픽의 파티션 개수는 성능과 관련이 있다. 그래서 파티션 개수를 정할땐 신중히 정해야 한다. 파티션 개수 고려사항은 아래 3개가 있다.
- 데이터 처리량
- 메시지 키 사용 여부
- 브로커, 컨슈머 영향도
데이터 처리량
파티션은 카프카 병렬처리의 핵심 요소이다. 파티션의 개수가 많아지면 컨슈머와 1:1 매핑되는 개수가 늘어나기 때문이다.
데이터 처리 속도를 올리는 방법은 컨슈머의 처리량을 늘리는 방법과 컨슈머,파티션을 추가해서 병렬처리량을 늘리는 방법이 있다.
컨슈머 처리량을 늘리는 방법은 Scale up, GC 튜닝 등이 있지만 일정 수준 이상 처리량을 올리기 어렵다. 반면 파티션 개수를 늘리는 방법은 데이터 처리 속도를 올릴 수 있는 확실한 방법이다. 프로듀서가 보내는 데이터양과 컨슈머 데이터 처리량을 계산해 파티션 개수를 정하면 된다.
프로듀서 전송 데이터량 < 컨슈머 데이터 처리량 * 파티션 개수
EX) 1000개 < 100개 * 최소10개
컨슈머 데이터 처리량을 구할 때는 운영중인 카프카에서 더미 데이터로 테스트해야 한다. 컨슈머는 다른 시스템과 연동될 수 있기 때문에 로컬 또는 테스트 환경에서 진행하면 운영 환경과 달라 데이터 처리량에 차이가 있을 수 있기 때문이다.
메시지 키 사용 여부
메시지 키 사용 여부는 데이터 처리 순서와 밀접한 연관이 있다. 프로듀서가 기본 파티셔너를 사용한 경우에 메시지 키를 사용하면 프로듀서가 토픽으로 데이터를 보낼 때 메시지 키를 해시 변환하여 메시지 키를 파티션에 매칭시킨다. 만약 파티션 개수가 달라지면 이미 매칭된 파티션과 메시지 키의 매칭이 깨지고 전혀 다른 파티션에 데이터가 할당된다. 그러므로 파티션 개수가 달라지면 컨슈머는 특정 메시지 순서를 더는 보장받지 못하게 된다. 그림으로 보면 아래와 같은 상황이다.
메시지 처리 순서가 보장되어야 한다면 최대한 파티션의 변화가 발생하지 않는 방향으로 운영해야 한다. 만약 파티션 개수가 변해야 하는 경우가 발생한다면 커스텀 파티셔너를 개발해 기존 컨슈머와 파티션의 매칭은 유지하게 해야 한다. 이런 어려움 때문에 메시지 키별로 처리 순서를 보장하기 위해서는 파티션 개수를 프로듀서가 전송하는 데이터양보다 더 넉넉하게 잡고 생성하는 것이 좋다.
브로커, 컨슈머 영향도
카프카에서 파티션은 각 브로커의 파일 시스템을 사용하기 때문에 파티션이 늘어나는 만큼 브로커에서 접근하는 파일 개수가 많아진다. 운영체제에서는 프로세스당 열 수 있는 파일 최대 개수를 제한하고 있다. 그래서 카프카 브로커가 접근하는 파일 개수를 안정적으로 유지하기 위해서는 각 브로커당 파티션 개수를 모니터링해야 하고, 만약 브로커가 관리하는 파티션 개수가 너무 많다면 파티션 개수를 분산하기 위해서 카프카 브로커 개수를 늘리는 방안도 고려해야 한다.
4.1.2 토픽 정리 정책
토픽의 데이터는 시간, 용량에 따라 삭제 규칙을 정할 수 있다. 데이터를 삭제하지 않으면 추후에 데이터가 필요할 때 오프셋을 지정해서 지난 데이터를 가져올 수 있다. 또한 영구 보존할 수도 있지만 저장소 사용량이 계속 늘어나게 된다. 토픽 정리 정책엔 2가지가 있다.
토픽 삭제 정책(delete policy)
토픽의 데이터를 삭제하는 정책이다. 토픽의 데이터를 삭제할 땐 세그먼트 단위로 삭제를 진행한다. 세그먼트는 토픽의 데이터를 저장하는 명시적인 파일 시스템 단위이다. 세그먼트는 파티션마다 별개로 생성되며 세그먼트의 파일 이름은 오프셋 중 가장 작은 값이 된다. segment.bytes 옵션으로 크기를 지정할 수 있고, 지정한 크기보다 커질 경우엔 기존 세그먼트 파일을 닫고 새로운 세그먼트를 열어 데이터를 저장한다. 데이터를 저장하기 위해 사용중인 세그먼트를 액티브 세그먼트라 한다.
삭제정책 실행 시점은 시간 또는 용량이 기준이 된다.
- 시간 : 일정 주기마다 세그먼트 파일의 마지막 수정 시간과 retention.ms 값을 비교하는데, 세그먼트 파일의 마지막 수정 시간이 retention.ms 를 넘어가면 세그먼트 삭제한다
- 용량 : retention.bytes를 넘어간 세그먼트 파일을 삭제한다
토픽 압축 정책(compact policy)
메시지 키별로 해당 메시지 키의 레코드 중 오래된 데이터를 삭제하는 정책이다. 즉 가장 최근의 메시지 키 레코드만 남기고 나머지는 삭제하는 것이다. 토픽 압축 정책은 메시지 키를 기반으로 데이터를 처리할 경우 유용하다. 가장 마지막으로 업데이트된 메시지 키의 데이터가 중요할 경우 가장 최신의 데이터를 제외한 나머지 데이터들을 삭제할 수 있기 때문이다.
압축 정책은 액티브 세그먼트를 제외한 나머지 세그먼트들에 한해서만 데이터를 처리한다.
4.1.3 ISR(In-Sync-Replicas)
ISR 은 리더 파티션과 팔로워 파티션이 모두 싱크가 된 상태를 뜻한다. 리더 파티션에 0~3의 오프셋이 있다고 할때, 팔로워 파티션에 동기화가 완료되려면 0~3의 오프셋이 존재해야 한다. 동기화가 완료됐다는 의미는 리더 파티션의 모든 데이터가 팔로워 파티션에 복제된 상태를 말한다.
프로듀서가 특정 파티션에 데이터를 저장하는 작업은 리더 파티션을 통해 처리한다. 이때 리더 파티션에 새로운 레코드가 추가되어 오프셋이 증가하면, 팔로워 파티션이 위치한 브로커는 리더 파티션의 데이터를 복제한다. 팔로워 파티션이 리더 파티션으로부터 데이터를 복제하는 시간차 때문에 오프셋 차이가 발생할 수 있고, 이런 차이를 모니터링 하기 위해 리터 파티션은 replica.lag.time.max.ms값 만큼의 주기를 가지고 팔로워 파티션이 데이터를 복제하는지 확인한다. 만약 설정한 값보다 긴시간 동안 데이터를 가져가지 않는다면 팔로워 파티션에 문제가 생긴것으로 간주하고 ISR 그룹에서 제외한다.
ISR로 묶인 팔로워 파티션은 리더에 문제가 생기면 리더로 선출되도록 설정할 수 있다. 리더 파티션이 있는 브로커의 문제가 해결될때까지 기다리는 방법도 있지만, 이때 해당 토픽은 사용할 수 없다. 때문에 데이터 유실은 발생하지 않지만 서비스 운영에 문제가 생긴다.
unclean.leader.election.enable을 true 로 설정하면 ISR이 아닌 팔로워 파티션, 즉 동기화가 되지 않는 파티션이 리더로 선출될 수 있다. 이때 서비스 운영에 문제는 안생기지만, 동기화가 완벽히 되지 않은 팔로워 파티션이 리더로 선출되기 때문에 데이터 유실이 발생할 수 있다.
4.2 카프카 프로듀서
4.2.1 acks 옵션
acks 옵션을 통해 프로듀서가 전송한 데이터가 카프카 클러스터에 얼마나 신뢰성 높게 저장할지 지정할 수 있다. 그리고 acks 옵션에 따라 성능이 달라질 수 있으므로 acks 옵션에 따른 카프카의 동작방식을 상세히 알고 설정해야 한다.
acks=0
리더 파티션으로 데이터를 전송했을 때 리더 파티션으로 데이터가 저장되었는지 확인하지 않는 다는 설정이다. 리더 파티션은 데이터가 저장된 이후에 데이터가 몇 번째 오프셋에 저장되었는지 리턴하는데, acks 가 0이면 리턴하지 않는다. 데이터가 잘 전송되었다고 가정하고 바로 다음 데이터를 전송하기 때문에 데이터 전송이 실패한 경우를 알 수 없다. retries 가 2 이상으로 설정되어 있더라도 재시도를 하지 않기 때문에 retries 옵션이 무의미하다. acks 를 0으로 했을 때의 장점은 속도는 빠르다.
acks=1
프로듀서는 리더 파티션에만 정상적으로 저장되었는지 확인한다. 리더에 정상적으로 저장되지 않았다면 재시도 할 수 있다. 팔로워 파티션에 동기화가 되지 않을 경우엔 데이터 유실이 발생한다.
acks=all or acks=-1
리더 파티션과 팔로워 파티션에 모두 저장되었는지 확인한다. 리더와 팔로워에 저장되었는지 확인하기 때문에 속도가 느리다. 하지만 안전하게 데이터를 전송할 수 있다.
acks를 all 로 설정할 경우 토픽 단위로 설정이 가능한 min.insync.replicas 옵션값에 따라 데이터 안정성이 달라진다. all 옵션값은 모든 리더, 팔로워 파티션에 저장을 뜻하는게 아니고 ISR에 포함된 파티션들을 뜻하는 것이다. min.insync.replicas 옵션은 프로듀서가 리더 파티션과 팔로워 파티션에 데이터가 저장되었는지 확인하기 위한 최소 ISR 그룹의 파티션 수이다. min.insync.replicas 옵션값이 1이라면 ISR 중 최소 1개 이상의 파티션에 데이터가 저장되었음을 확인하는 것이다. 이 경우 acks=1 과 동일하게 동작한다.
min.insync.replicas 를 설정할 때는 파티션 복제 개수도 함께 고려해야 한다. 왜냐하면 운영하는 카프카 브로커 개수가 min.insync.replicas의 옵션값보다 작은 경우에는 프로듀서가 더는 데이터를 전송할 수 없기 때문이다.
min.insync.replicas 옵션값을 3, 파티션 복제 개수를 3 으로 주었을때 한 브로커에 이상이 생기면 데이터를 전송할 수 없다. 최소한으로 복제되어야 하는 파티션 개수가 3인데 팔로워 파티션이 위치할 브로커의 개수가 부족하기 때문이다. 이때는 예외가 발생하여 토픽으로 데이터가 전송되지 않는다. 브로커가 3대인 클러스터를 운영한다면 파티션 복제 개수는 3, min.insyc.replicas를 2로 설정하고 프로듀서 acks 는 all 로 설정하는 것을 추천한다.
4.2.2 멱등성(idempotence) 프로듀서
멱등성 : 여러번 연산을 수행하더라도 동일한 결과를 나타내는 것을 뜻한다
기본 카프카 프로듀서 동작 방식은 적어도 한번 전달 (at least once delivery) 을 지원한다. 적어도 한번은 전송을 하여 데이터가 유실되지 않지만, 두번 이상 전송하여 중복이 발생할 수 있다.
중복을 막기 위해 enable.idempotence 옵션을 사용할 수 있다. 기본값은 false 지만 true 로 설정하면 데이터를 전송할때 프로듀서 PID 와 시퀀스 넘버를 함께 전달한다. 브로커는 프로듀서의 PID 와 시퀀스 넘버를 확인하여 동일한 메시지의 저장 요청이 오더라도 단 한번만 데이터를 저장함으로써 중복을 방지한다.
네트워크 문제가 생겨 데이터를 다시 전송하더라도 PID 와 시퀀스 넘버를 이용해 중복을 체크해 데이터가 중복저장될 위험이 없다.
단 멱등성 프로듀서는 동일한 프로세스에서만 가능하다. 만약 프로듀서에 이상이 생겨 어플리케이션을 재실행한다면 PID가 달라질것이고, 동일한 메시지를 전달할때 새로운 PID 를 전달하기 때문에 브로커에서는 다른 데이터가 들어왔다고 생각해 데이터의 중복이 발생할 수 있다. 멱등성 프로듀서는 장애가 발생하지 않을 경우에만 정확히 한번 저장함을 보장한다는 것을 기억해야 한다.
4.2.3 트랜잭션 프로듀서
카프카의 트랜잭션 프로듀서는 다수의 파티션에 데이터를 저장할 경우 모든 데이터에 대해 동일한 원자성을 보장하기 위해 사용된다.
원자성 : All or Nothing (전체 데이터를 처리하거나 전체 데이터를 처리하지 않거나)
트랜잭션 프로듀서를 사용하려면 enable.idempotence=true 로 설정하고 transaction.id 를 임의의 String 값으로 정의한다. 그리고 컨슈머의 isolation.level 을 read_committed로 설정하면 프로듀서와 컨슈머는 트랜잭션으로 처리 완료된 데이터만 쓰고 읽게 된다.
트랜잭션은 파티션의 레코드로 구분한다. 구분하기 위해서 프로듀서는 데이터를 레코드 파티션에 저장할 뿐만 아니라 트랜잭션의 시작과 끝을 표현하기 위해 트랜잭션 레코드를 한개 더 보낸다. 컨슈머는 파티션에 저장된 트랜잭션 레코드를 보고 트랜잭션이 완료(commit) 되었음을 확인하고 데이터를 가져간다.
4.3 카프카 컨슈머
4.3.1 멀티 스레드 컨슈머
카프카는 처리량을 늘리기 위해 파티션과 컨슈머 개수를 늘려서 운영할 수 있다. 파티션을 여러 개로 운영하는 경우 데이터를 병렬처리 하기 위해 파티션 개수와 컨슈머 개수를 동일하게 맞추는게 가장 좋은 방법이다. 파티션 개수가 n개라면 컨슈머 그룹으로 묶인 컨슈머 스레드를 최대 n개 운영할 수 있다. n개의 스레드를 가진 1개의 프로세스를 운영하거나 1개의 스레드를 가진 프로세스를 n개 운영할 수 있다.
카프카 컨슈머 멀티 워커 스레드 전략
브로커로부터 받은 레코드들을 병렬로 처리한다면 1개의 컨슈머 스레드로 받은 데이터를 빠르게 처리할 수 있다. for문으로 순차 처리하게 되면 이전 레코드 처리가 끝날 때까지 다음 레코드는 기다리게 된다. 하지만 멀티스레드를 사용하면 데이터 처리를 동시에 실행할 수 있기 때문에 처리 시간을 줄일 수 있다. Executors 를 사용하여 스레드 풀을 생성 후 poll() 메서드를 통해 받은 레코드들을 처리하는 스레드를 레코드마다 개별 실행하는 방법이 있다.
병렬처리하면 속도는 빠르지만 주의해야할 점이 있다. 나중에 생성된 스레드의 레코드 처리가 더 빠를수도 있다는 것이다. 레코드 처리에 있어 역전현상이 발생할 수 있기 때문에 이러한 현상이 발생해도 되며 빠른 처리 속도가 필요한 데이터 처리시 적합한 전략이다.
카프카 컨슈머 멀티 스레드 전략
하나의 파티션은 동일 컨슈머 그룹내 컨슈머 중 최대 1개까지 할당된다. 그리고 하나의 컨슈머는 여러 파티션에 할당될 수 있다. 이런 특징을 잘 살리는 방법은 1개의 어플리케이션에 토픽의 파티션 개수만큼 컨슈머 스레드 개수를 늘려서 운영하는 것이다. 각 파티션의 레코드들을 병렬처리 할 수 있다.
주의할 점은 구독하고자 하는 토픽의 파티션 개수만큼만 컨슈머 스레드를 운영하는 것이다. 컨슈머 스레드가 파티션 개수보다 많아지면 할당할 파티션 개수가 더는 없으므로 할당 받지 못한 컨슈머 스레드는 아무일도 하지 않는다.
4.3.2 컨슈머 랙
컨슈머 랙(LAG)은 토픽의 최신 오프셋과 컨슈머 오프셋간의 차이를 말한다. 프로듀서가 보내는 데이터양이 컨슈머 데이터 처리량 보다 크다면 컨슈머 랙은 늘어난다. 컨슈머 랙이 0이라면 데이터 처리에 지연이 없다는 것이다.
컨슈머 랙을 모니터링함으로써 컨슈머의 장애를 확인할 수 있고 파티션 개수를 정하는데 참고할 수 있다. 특정 트래픽이 몰리는 날엔 컨슈머 랙이 증가하게 될 것이다. 이때 파티션과 컨슈머 개수를 늘려서 몰리는 트래픽을 병렬로 처리해 문제를 해결할 수 있을 것이다.
💡 파티션 개수는 한번 늘리면 다시 줄일 수 없다
컨슈머 랙은 카프카 명령어, KafkaConsumer의 metrics() 메서드, 외부 모니터링 툴을 사용하여 조회할 수 있다. 주로 외부 모니터링 툴을 사용하는데 버로우(Burrow)를 사용하여 컨슈머 랙을 조회한다. 버로우를 사용하면 컨슈머 그룹별로 랙을 확인할 수 있다. 외부 모니터링 툴을 사용하면 카프카 클러스터에 있는 모든 컨슈머, 토픽들의 랙 정보를 한번에 모니터링 할 수 있고, 이러한 모니터링 툴들은 컨슈머의 데이터 처리와는 별개로 지표를 수집하기 때문에 프로듀서나 컨슈머의 동작에 영향을 미치지 않는다는 장점이 있다.
모니터링 구성
버로우를 통해 컨슈머 랙 모니터링 하기 위해선 별개의 저장소와 대시보드를 사용하는게 효과적이다. 지난 몇시간 또는 며칠간의 컨슈머 랙 추이를 확인하기 위해선 별개의 저장소와 대시보드가 필요하기 때문이다. 구성하는 방법은 다양하겠지만 아래와 같은 툴을 사용하면 무료로 컨슈머 랙 모니터링을 구성할 수 있다.
- 버로우 : REST API를 통해 컨슈머랙 조회
- 텔레그래프 : 데이터 수집 및 전달에 특화된 툴. 버로우를 조회하여 데이터를 전달할 수 있다
- 엘라스틱서치 : 컨슈머 랙 정보를 담는 저장소 (InfluxDB 도 많이 쓰는 방법중 하나다)
- 그라파나 : 엘라스틱서치 정보를 시각화하고 특정 조건에 따라 슬랙 알람을 보낼 수 있는 대시보드 툴
4.3.3 컨슈머 배포 프로세스
짧은 시간 동안 중단이 되어 지연이 발생하더라도 서비스 운영에 지장이 없다면 중단 배포를 사용하고, 중단이 발생했을때 서비스 영향이 클 경우에는 무중단 배포를 수행해야 한다.
중단배포
중단 배포는 기존 어플리케이션을 완전히 중단하고 신규 어플리케이션을 실행한다. 중단배포를 실행할 경우 새로운 로직이 적용된 신규 어플리케이션의 실행 전후를 명확하게 특정 오프셋 지점으로 나눌 수 있다는 점이 장점이다.
무중단배포
무중단 배포 방법은 블루/그린, 롤링, 카나리 배포가 있다.
- 블루/그린
- 이전 버전 어플리케이션과 신규 어플리케이션을 동시에 띄어놓고 트래픽을 전환하는 방법
- 파티션 개수와 컨슈머 개수가 동일할 때 유용하다
- 동일하지 않다면 일부 파티션은 기존 어플리케이션에 할당되고 일부 파티션은 신규 어플리케이션에 할당되어 섞이기 때문에 적합하지 않은 방법이다.
- 신규 버전이 준비되면 기존 어플리케이션을 중단한다. 그러면 리밸런싱이 발생하면서 파티션은 모두 신규 컨슈머와 연동된다.
- 롤링
- 파티션 개수가 인스턴스 개수와 같거나 그보다 많아야 한다
- 리밸런스가 여러번 발생한다
- 컨슈머 스레드 하나씩 신규버전으로 교체하는 방법이라고 생각하면 된다
- 카나리
- 신규 버전 문제가 있는지 사전에 탐지할 수 있다.
- 소수 파티션에 신규 버전 어플리케이션을 할당하면서 사전 테스트를 하는 방법
- 카나리 배포로 사전 테스트가 완료되면 나머지 파티션에 대해 롤링 or 블루/그린 배포를 수행하면 무중반 배포가 가능하다.
4.4 스프링 카프카
스프링 카프카에서는 KafkaTemplate 을 통해 데이터를 전송할 수 있다. KafkaTemplate 은 스프링 카프카에서 제공하는 기본 클래스를 사용하는 방법과 ProducerFactory 로 생성하는 방법이 있다.
4.4.1 스프링 카프카 프로듀서
기본 카프카 템플릿
application.yaml 에 옵션에 입력하고 사용하면 된다.
커스텀 카프카 템플릿
프로듀서 팩토리를 통해 만든 카프카 템플릿 객체를 빈으로 등록하여 사용하는 것이다. 스프링 카프카 어플리케이션 내부에 다양한 종류의 카프카 프로듀서 인스턴스를 생성하고 싶다면 이 방식을 사용하면 된다.
private Map<String, Object> producerConfig() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
configProps.put(ProducerConfig.ACKS_CONFIG, "all");
configProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
configProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 100 * 1000); // DEFAULT 120초. send() 후 성공or실패를 보고하는 시간의 상한
configProps.put(ProducerConfig.LINGER_MS_CONFIG, 0); // DEFAULT 0 초. Batch 처리 될때까지 대기하는 시간. 0초로 하면 즉시전송.
return configProps;
}
private ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfig());
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
위와 같이 KafkaTemplate 빈을 생성하고 사용하면 됩니다. 아래 예시는 ListenableFutureCallback을 사용해 메시지 전송에 성공, 실패했을때 실행되는 콜백 메서드를 구현했습니다.
String message = objectMapper.writeValueAsString(payMessage);
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("pay-test-topic","test-key", message);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) {
ex.printStackTrace();
log.error("Message Exception:{}", ex);
}
@Override
public void onSuccess(SendResult<String, Object> result) {
ProducerRecord<String, Object> producerRecord = result.getProducerRecord();
RecordMetadata recordMetadata = result.getRecordMetadata();
log.info("topic: {},partition: {}, offset: {}",
recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
log.info("topic: {}, header: {}, key: {}, partition: {}",
producerRecord.topic(), producerRecord.headers(), producerRecord.key(), producerRecord.partition());
}
});
4.4.2 스프링 카프카 컨슈머
스프링 카프카 컨슈머는 레코드 리스너(MessageListener)와 배치 리스너(BatchMessageListener)가 있다. 리스너 종류에 따라 한번 호출하는 메서드에서 처리하는 레코드의 개수가 달라진다. 레코드 리스너는 1개를 처리하고, 배치 리스너는 poll() 메서드로 리턴받은 ConsumerRecords 처럼 한번에 여러개 레코드들을 처리할 수 있다.
서로 다른 설정을 가진 2개 이상의 리스너를 구현하거나 리밸런스 리스너를 구현하기 위해선 커스텀 리스너 컨테이너를 구현해야 한다. 카프카 리스너 컨테이너 팩토리를 빈으로 등록하고 KafkaListener 어노테이션에서 커스텀 리스너 컨테이너 팩토리를 등록하면 커스텀 리스너 컨테이너를 사용할 수 있다. 아래는 커스텀 리스너 컨테이너를 만드는 예시다.
/*
MAX_POLL_RECORDS_CONFIG : poll() 을 호출해 Topic 에서 한번에 가져오는 메시지 개수
MAX_POLL_INTERVAL_MS_CONFIG : 컨슈머가 poll 하고 commit 하기까지의 최대 시간
HEARTBEAT_INTERVAL_MS_CONFIG : 컨슈머가 heartbeat 를 보내는 주기. sessionTimeout 보다 작아야하고 일반적으로 1/3으로 설정.
SESSION_TIMEOUT_MS_CONFIG : 컨슈머와 브로커 사이의 세션 유지 시간
REQUEST_TIMEOUT_MS_CONFIG : 요청의 응답을 기다리는 최대 시간
*/
private Map<String, Object> consumerConfig() {
Map<String, Object> configMap = new HashMap<>();
configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
configMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return configMap;
}
private ConsumerFactory<Object, Object> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfig());
}
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
@Qualifier("deadLetterPublishingRecoverer") DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(1);
factory.setBatchListener(false);
factory.setCommonErrorHandler(errorHandler(deadLetterPublishingRecoverer));
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
return factory;
}
private DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
BackOff fixedBackOff = new FixedBackOff(1000L, 3L);
DefaultErrorHandler errorHandler = new DefaultErrorHandler(deadLetterPublishingRecoverer, fixedBackOff);
errorHandler.addRetryableExceptions(SocketTimeoutException.class, MessageRetryHandleException.class);
errorHandler.addNotRetryableExceptions(NullPointerException.class, NotCreateTopicMessageException.class, ValidationException.class);
return errorHandler;
}
위 예에서 보이는 ErrorHandler, DeadLetterPublishingRecoverer 을 사용한 로직은 메시지를 처리하다가 발생한 예외를 처리하는 로직이다. 재시도 정책과 관련된 간단한 구현 예이니 참고하자.
@Configuration
public class MemberUpdateDeadLetterPublishingRecovererConfig {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Bean
public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(KafkaTemplate<String, String> template) {
return new DeadLetterPublishingRecoverer(template,
(consumerRecord, ex) -> {
logger.error("=== consumerRecord: {}", consumerRecord);
return new TopicPartition("DLT토픽이름", consumerRecord.partition());
});
}
}
컨테이너 리스너와 재시도 정책 방법까지 구현했으니 이제 카프카 리스너에 적용해보자
@KafkaListener(
topics = "토픽이름",
groupId = "그룹아이디",
containerFactory = "kafkaListenerContainerFactory"
)
public void onMessage(ConsumerRecord<String, String> data) {
logger.info("offset: {}, message: {}", data.offset(), message);
// process...
}
위 예는 AckMode 를 RECORD 로 했을 때의 예시이다. 레코드 단위로 처리후 커밋을 하는 방식이다. 하지만 Acknowledgement.acknowledge() 메서드 호출을 통해 커밋 하는 타이밍을 직접 제어하고 싶으면 AckMode 를 MANUAL_IMMEDIATE 로 변경하고 아래와 같은 리스너를 구현하면 된다.
// AcknowledgingMessageListener 구현
// onMessage 파라미터에 Acknowledgement 추가
@Component
public class TopicListener implements AcknowledgingMessageListener<String, String> {
@KafkaListener(
topics = "토픽이름",
groupId = "그룹아이디",
containerFactory = "kafkaListenerContainerFactory"
)
public void onMessage(ConsumerRecord<String, String> data, Acknowledgment ack) {
logger.info("offset: {}, message: {}", data.offset(), message);
// process...
ack.acknowledge();
}
}
동기 커밋, 비동기 커밋을 사용하고 싶다면 consumer 인스턴스를 onMessage 파라미터로 추가해서 사용할 수 있다. consumer 인스턴스의 commitSync(), commitAsync() 메서드를 호출하면 사용자가 원하는 타이밍에 커밋할 수 있도록 로직을 추가할 수 있다. 다만, 리스너가 커밋하지 않도록 AckMode 는 MANUAL, MANUAL_IMMEDIATE 로 설정해야 한다.