-
Uber에서 Kafka consumer 모니터링 하는 방법개발하면서/타인글보면서 2021. 10. 25. 22:46반응형
"Kafka consumer monitoring"하면 어떤 게 떠오르나요?
__consumer_offsets 토픽, lag, autocommit, consumer rebalance, Burrow
※ Burrow에 정리한 dol9 글
https://dol9.tistory.com/272
https://dol9.tistory.com/273현재 우버는 대부분의 서비스에서 Kafka를 사용 중인데 Kafka consumer 모니터링을 위해
자체 개발한 얘기가 소개된게 있어 정리해보았습니다.
※ 규모의 경제로 여차하면 자체 개발하는 것 같다. -_-;;
자체 개발 -> 오픈 소스 -> 기술 회사 창업? ㅋㅋㅋ그리고 정리하다보니 __consumer_offsets 토픽 데이터뿐만 아니라 어떠한 이유로 내려간 consumer의
offset 모니터링도 가능하도록 KafkaAdminClient를 이용하는 부분도 간단히 살펴보았습니다.consumer가 내려가서 __consumer_offsets 토픽에 정보를 업데이트하지 않는 consumer 모니터링을 위해
uGroup에서 보완한 방법이 재미집니다.https://eng.uber.com/introducing-ugroup-ubers-consumer-management-framework/
preface
2015년, region 1개로 작게 사용했던 Kafka는 현재 우버 사업의 엄청난 확장과 비례해서 Kafka 사용도 증가했고
그에 따라 운영 이슈가 생겼다.그중 Kafka Consumer를 효과적으로 모니터링해야 할 필요가 생겨 리서치했는데 자체 개발하기로 했고
이번 포스트에서 회사 내부에서 사용 중인 Kafka Consumer 모니터링 서비스, uGroup을 소개한다.
초기 Kafka Consumer monitoring 한 방법
처음에는 consumer에서 메트릭을 제공했다.
. Uber 내부적으로 만든 Kafka Consumer library를 사용하면 내부적으로 메트릭 제공
. 오픈소스 Kafka Consumer library를 사용하면 담당자가 메트릭 제공 구현 필요
. Flink, Filebeat 같은 framework를 사용하는 경우 프레임웍 자체 메트릭을 사용
consumer에서 제공한 메트릭으로 모니터링을 할 때
메트릭이 100% 정확하지 않고 서비스가 추가될 때마다 중복된 작업이 반복된다는 점,
마지막으로 메트릭이 표준화되지 않아 Uber Kafka team에서 문제를 이해/발견하기가 어려웠다.Kafka 초기버전은 consumer group의 모든 정보를 Zookeeper에 저장했는데 확장성에 문제가 있어
0.9부터는 __consumer_offsets 토픽에 저장하는 걸로 바뀌었다.
__consumer_offsets에 저장되는 정보
. Consumer group 생성
. Consumer group에 consumer가 조인했다는 정보
. 가장 중요!! Consumer group에 있는 consumer들의 offset commit 정보
__consumer_offsets 토픽 데이터를 이용하여 consumer 모니터링 고고
3개 Module
Streaming Job Module: __consumer_offset 토픽에서 데이터를 읽어 필터링 후 출력
Observability Module: Stream Job에서 출력한 정보를 읽어 통계 데이터 생성(Kafka와 M3에 저장 )
※ M3를 간단히 살펴보니 Promethues의 확장 기능 부족을 개선하기 위해 Prometheus 호환을 유지해서 만든 Timeseries DB라고...Processing Module : Stream Job에서 출력한 정보를 읽어 타 서비스 호출(alert, event manager)
Streaming Job Module
- __consumer_offsets 토픽에서 메시지를 읽는다.
- Consumer monitoring에 불필요한 메시지 type은 거른다.
- 메시지를 디코드 해서 그룹의 활동 정보 얻는다.
- 화이트리스트와 블랙리스트 정보를 가지고 모니터링할 consumer group & topic을 필터
- 지정한 time interval 간격으로 메시지를 압축한다.
※ 예를 들어 time interval이 3초라고 한다면 특정 consumer group & topic의 데이터가 3초에 100건이 오든 1000건이 오든
3초의 처음과 끝 정보만 남기고 버리는 걸 말하는 듯
Observability Module
두 개 프로세스가 존재
- 메시지를 분석하고 리포트하는 프로세스
[consumer group & topic] 별 consume 하고 있는 파티션 할당 정보
consumer lag [consumer group & topic & partition]
토픽 별 실행되고 있는 consumer group 개수 - 요약 정보를 Kafka에 produce 하는 프로세스
Lag 계산할 때 datastore로부터 동적으로 읽거나 file로 읽어 watchlist를 만들고 해당 목록만 리포트한다.
보다 정확한 consumer lag 계산을 위해 KafkaAdminClient를 이용한 fetcher가 있는데,
이 fetcher는 토픽 파티션의 start/end offset 조회뿐만 아니라 __consumer_offsets에서 데이터를 읽지 않고
consumer group의 topic + partition offset 정보를 가져온다Fetcher가 필요한 이유는 항상 떠있지 않거나 실수로 종료한 consumer인 경우 __consumer_offset 토픽에 정보가
업데이트되지 않아 최신 offset 정보를 유지해서 정확한 모니터링을 위한 보완책이다.Processing Module
- Consumer lag alert 기능
- 멀티 리전에서 consumer offset 관리를 위해 consumer offset을 Kafka Offset sync service 기록
멀티 리전에서 offset 관리하는 내용 정리한 facebook 글
https://www.facebook.com/groups/kafka.kru/posts/1127515424356446 - A 토픽을 읽고 있는 consumer group은 어떤 게 있냐?라는 질의가 가능한 프로세스
Burrow와 차이점
- Burrow는 Go, uGroup은 Java로 만들어져서 Kafka와 호환성 면에서 좋다.
__consumer_offsets에는 많은 정보들이 binary로 담겨있다.
메시지 디코드 할 때 Burrow는 많은 코드가 필요하지만 uGroup은 한 개 라인으로 가능!! - Event processing framework 기반이어서 확장에 용이하다.
특히 Streaming Job에 있는 task들은 쉽게 추가/수정/삭제가 가능해서 다양한 목적으로 사용할 수 있다.
하지만 Burrow는 Kafka 모니터링 관점으로만 설계되어있어 유연하지 않다. - Observability Module에 fetcher가 있어서 동작을 멈춘 Consumer 도 쉽게 추적할 수 있다.
__consumer_offsets 토픽으로만 모니터링할 경우 동작하지 않는 consumer group 정보는 더 이상 업데이트되지 않는다.
KafkaAdminClient의 listOffsets, listConsumerOffsets을 이용하는데
그렇다고 모든 consumer group & topic 정보를 Broker에 질의해서 가져오는 건 Broker에 부담이 된다.
그래서 consumer가 expire 되고(특정 시간 동안 __consumer_offsets 토픽에 정보가 업데이트되지 않는 걸 말하는 듯) watchlist(Observability Module)에 consumer group & topic 정보가 있는 경우만 Broker에 질의하도록 했다.
※ 내가 생각한 Burrow와의 가장 큰 차이는 KafkaAdminClient를 이용하여 offset 정보를 유지하는 거라고 생각한다.
KafkaAdminClient로 start/end offset 조회와 consumer group의 offset 조회 알아보기
- . topic + partition의 start/end offset 조회
ReplicaManager에서 직접 partition의 offset을 조회한다.
listOffsets 첫 번째 인자가 Map인데 value인 OffsetSpec을 이용하여 start / end offset 조회
https://kafka.apache.org/26/javadoc/org/apache/kafka/clients/admin/KafkaAdminClient.html#listOffsets - consumer group의 offset 조회
1 consumer gorupId를 담당하는 GroupCoordinator를 찾고
2 GroupCoordinator 안에 멤버 변수로 있는 groupMetadataCache(groupId -> GroupMetadata) 에서 groupId 정보 조회
3 2번에서 조회한 데이터를 Map(topic + partition -> offset) 형태로 가공해서 리턴
반응형