ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Kafka Consumer Lag 모니터링, Burrow를 알아보자 (2)
    개발하면서/코드보면서 2019.08.21 01:09

    이전 포스트에서 Burrow의 기본 구조와 Consumer group list 관리가 어떻게 되고 있는지 알아보았습니다.

     

    이번에는 Burrow의 모듈 중 storage, evaluator를 알아보겠습니다.


    storage는 Burrow에서 수집하는 데이터(Topic/Consumer 정보) 저장/조회 역할을 담당하고 있습니다.

    evaluator는 Consumer Offset/Lag을 지켜보면서 Consumer Group의 상태를 판단하는 역할을 합니다.

                         Topic/Partition 상태를 판단해서 가장 안 좋은(?) 상태가 Consumer Group의 상태가 됩니다.

    storage

    storage는 카프카의 topic, offset, consumer offset 정보를 저장/조회 역할을 하는 모듈입니다.

    Burrow 코드에서 helpers.TimeoutSendStorageRequest 검색을 하면 kafka_client.go, kafka_zk_client.go, kafka_cluster.go
    (notifier/coordinator.go 설명 편의를 위해 제외)가 보이는데요.

    즉 3개의 module에서 storage 저장을 위해

    아래와 같은 방식으로 storage module에 전달되고 전달된 데이터는 inmemory.go의 mainLoop에서 처리 합니다.

    client, cluster module이 storage module을 이용하여 데이터 저장하는 과정

    Storage RequestWorker에 보이는 귀여운 고퍼 이미지 출처

    https://medium.com/@trevor4e/learning-gos-concurrency-through-illustrations-8c4aff603b3

     

    storage.inmemory.workers 설정 값만큼 requestworker가 생기고

    storage.inmemory.queue-depth 수만큼 storage requestchannel의 버퍼 갯수가 설정됩니다.

     

    storage에서 사용하는 설정 중 몇가지를 살펴보겠습니다.

    intervals

    (default 10개), 각 토픽 파티션 별 consumer offset을 몇 개 저장할 건지를 정하는 설정입니다.
    https://golang.org/pkg/container/ring/  를 이용해서 깔끔하게 구현했네요.

    expire-group

    초 단위(default 7일), 설정한 시간동안 offset 커밋이 없다면 expire 상태로 인지합니다.
    해당 값을 2시간으로 했다고 해서 2시간 동안 commit이 없는 consumer group은 자동 삭제될 것 같지만 아닙니다.
    자세한 건 https://dol9.tistory.com/272 마지막 부분을 참고해주세요

    min-distance

    초 단위(default 0), 마지막으로 저장된 consumer offset 시간과 추가되는 consumer offset
    시간 간격의 최소 단위 입니다.

    예를 들어 min-distance를 5로 설정했다면 마지막 consumer offset 저장 후 4초 뒤에 새롭게 consumer offset이 추가된다면 추가되는 게 아니라 마지막 consumer offset 저장 위치에

    timestamp만 기존값을 유지한 채 덮어씁니다. 말로 설명하려니 복잡해서 코드 첨부합니다.

    consumerPartitionRing := module.getPartitionRing(consumerMap, request.Topic, request.Partition, partitionCount, requestLogger)
    if consumerPartitionRing.Prev().Value != nil {
    	// 추가될 consumer offset timestamp와 마지막 ring에 저장된 timestamp의 차이가 min-distance보다 작은 경우
    	if (request.Timestamp - consumerPartitionRing.Prev().Value.(*protocol.ConsumerOffset).Timestamp) < (module.minDistance * 1000) {
    		// ring의 위치 rewind
    		consumerPartitionRing = consumerPartitionRing.Prev()
    		consumerMap.topics[request.Topic][request.Partition].offsets = consumerPartitionRing
    
    		// 그리고 추가될 consumer offset의 시간은 마지막 ring에 저장된 timestamp로 교체
    		// 이렇게 하는 이유는 추가될 consumer offset의 timestamp로 할 경우 ring에는 영영 새로운 offset 저장은 못함
    		request.Timestamp = consumerPartitionRing.Value.(*protocol.ConsumerOffset).Timestamp
    	}
    }

     

    evaluator

    주기적으로 가져오는 topic offset 정보와 consumer group이 commit하고 있는 offset 정보를 이용해 상태를 판단하는 모듈입니다.

    /v3/kafka/:cluster/consumer/:consumer/status, /v3/kafka/:cluster/consumer/:consumer/lag 를 호출했을 때

    판단을 시작합니다.

     

    앞에서 본 Storage module의 interval 크기의 window에 저장된 consumer offset, lag, time을 이용하여 상태 판단을 합니다.

    consumer offset은 client module에 가져온 값이고,

    lag은 broker로부터 가져온 topic offset과 consumer offset의 HEAD 차이를 계산한 값입니다.

    broker로부터 가져온 topic offset은 일정 주기로 가져오지만 consumer offset은 commit 될 때마다 가져오므로

    음수가 나올 수 있는데(topic offset 보다 consumer commit된 offset이 더 클 여지가 있음)  이때는 0으로 처리합니다.

     

     

    ※※※ Burrow 공식 위키에 나온 consumer group의 상태를 판단하는 방법을 정리했습니다. (너무 잘 설명되어있음...) ※※※

     

    존재하는 모든 토픽의 파티션 마다 아래 rule을 적용하여 상태를 판단합니다.

    ※ 1~5에서 언급하는 offset은 consumer offse을 의미합니다. topic offset은 별도 표기

     

    1. window 내에 lag이 0인 경우가 있다면 "OK"

    2. offset 변화는 없고 lag이 고정이거나 증가하는 경우 consumer 상태는 "ERROR", partition은 "STALLED"

     

    3. offset은 증가하고 있는데 lag은 고정이거나 증가하는 경우 consumer 상태는 "WARNING", consume 속도가 느려졌다고 판단

     

    4. 현재 시간과 가장 최근의 offset 시간 차이가 가장 최근의 offset 시간과 가장 오래된 offset의 시간 차이보다 큰 경우
    consumer는 "ERROR", partition은 "STOPPED"  하지만 offset과 topic의 offset이 같다면 해당 partition은 잘못됐다고 판단하지 않습니다.

     

    5. lag이 -1은, consumer offset은 존재하지만 topic offset은 존재하지 않는 경우입니다.

     

    interval window 크기만큼 데이터가 수집되지 않은 상태라면 잘못된 판단을 할 수 있습니다.

    그래서 "complete" flag를 두어 window가 꽉 찬 상태에서 판단한 건지 아닌지를 나타냅니다.

    https://godoc.org/github.com/linkedin/Burrow/core/protocol#StatusConstant

    유연한 대응을 위해 complete를 판단하는 기준을 조절할수 있습니다. [minimum-complete]

    https://github.com/linkedin/Burrow/pull/388

    Example (예제 설명할 때 말하는 offset은 consumer offset입니다)

    example 1

    lag이 0인 경우가 있어서 "OK" 상태가 됩니다. Rule#1

    하지만 시간이 지나 W1 ~ W6이 밀려서 lag에 0인 게 window에 존재하지 않는다면 다른 rule이 적용됩니다.

      W1 W2 W3 W4 W5 W6 W7 W8 W9 W10
    Offset 10 20 30 40 50 60 70 80 90 100
    Lag 0 0 0 0 0 0 1 3 5 5
    Timestamp T T+60 T+120 T+180 T+240 T+300 T+360 T+420 T+480 T+540
    func isLagAlwaysNotZero(offsets []*protocol.ConsumerOffset) bool {
    	for _, offset := range offsets {
    		if offset.Lag == 0 {
    			return false
    		}
    	}
    	return true
    }

     

    example 2

    offset은 변하지 않는데 lag이 증가하였습니다. w5까지는 큰 문제는 아니었는데 W6부터 lag이 증가했네요. Rule#2

    offset commit은 시도하지만 실패한다고 판단되어 consumer는 "ERROR", partition는 "STALLED" 상태가 됩니다.

     

    W1 W2 W3 W4 W5 W6 W7 W8 W9 W10
    Offset 10 10 10 10 10 10 10 10 10 10
    Lag 1 1 1 1 1 2 2 2 3 3
    Timestamp T T+60 T+120 T+180 T+240 T+300 T+360 T+420 T+480 T+540
    func checkIfOffsetsStalled(offsets []*protocol.ConsumerOffset) bool {
    	for i := 1; i < len(offsets); i++ {
    		if offsets[i].Offset != offsets[i-1].Offset {
    			return false
    		}
    	}
    	return true
    }

     

    example 3

    offset은 증가하고 lag도 증가합니다. Rule#3

    consume 속도가 느려진다고 판단되어 "WARNING" 상태가 됩니다.

      W1 W2 W3 W4 W5 W6 W7 W8 W9 W10
    Offset 10 20 30 40 50 60 70 80 90 100
    Lag 1 1 1 1 1 2 2 2 3 3
    Timestamp T T+60 T+120 T+180 T+240 T+300 T+360 T+420 T+480

    T+540

     

    func checkIfLagNotDecreasing(offsets []*protocol.ConsumerOffset) bool {
    	for i := 1; i < len(offsets); i++ {
    		if offsets[i].Lag < offsets[i-1].Lag {
    			return false
    		}
    	}
    	return true
    }

     

    example 4

    현재 시간이 T+1200이고 아래 window를 가졌다고 했을 때 partition은 "STOPPED", consumer는 "ERROR" 상태가 됩니다. Rule#4

    window의 최근 offset과 가장 오래된 offset의 시간 차이는 540인데 현재 시간과 최근 offset의 시간 차이는 660입니다.

    consumer가 어떤 이유에선지 더 이상 commit offset을 안 하는 상태라고 판단합니다.

      W1 W2 W3 W4 W5 W6 W7 W8 W9 W10
    Offset 10 20 30 40 50 60 70 80 90 100
    Lag 5 3 5 2 1 1 2 1 4 6
    Timestamp T T+60 T+120 T+180 T+240 T+300 T+360 T+420 T+480 T+540
    func checkIfOffsetsStopped(offsets []*protocol.ConsumerOffset, timeNow int64) bool {
    	firstTimestamp := offsets[0].Timestamp
    	lastTimestamp := offsets[len(offsets)-1].Timestamp
    	return ((timeNow * 1000) - lastTimestamp) > (lastTimestamp - firstTimestamp)
    }

     

    storage에서 expire-group은 짧게 주는 것보다 서비스에 맞게 여유 있게 주는 것을 추천합니다.
    짧게 주면 offset 정보가 날아가기 쉬워서 consumer group 상태를 판단하기에 데이터가 부족할 경우가 많아지기 때문입니다.

     

    주기적으로 Burrow의 정보를 조회해 ES에 넣고 이를 감시하는 방식으로 Kafka monitoring을 해도 좋을 것 같네요.

    댓글 0

Designed by Tistory.