ABOUT ME

-

인기 태그


kafka elastic_search redis
Today
-
Yesterday
-
Total
-
  • Kafka purgatory (1)
    개발하면서/타인글보면서 2020. 6. 29. 00:18

    4. 그래서 Kafka에서는 어떻게 사용되고 있냐?

    우선 토픽이 생성되면 다음과 같이 topic/partition 별로 DelayedOperations가 생긴다.

    val delayedOperations = new DelayedOperations(
      topicPartition,
      replicaManager.delayedProducePurgatory,
      replicaManager.delayedFetchPurgatory,
      replicaManager.delayedDeleteRecordsPurgatory)

     

    Kafka purgatory

    xxxPurgatory의 전체 모습은 위 그림과 같다.
    Expire 판단을 위한 1개 이상의 Timing Wheel과 성공 여부를 지속적으로 판단하기 위한 Watcher로 이루어져 있다.

    조금 더 자세히 살펴보면

    TimerTaskEntry와 TimerTask

    하나의 요청은 TimerTask로 볼 수 있고 이를 Timing Wheel bucket에서 리스트로 연결시키기 위해 TimerTaskEntry로 감쌌다.

    TimerTask는 Watcher, TimerTaskEntry는 Timing Wheel에서 사용한다고 보면 될 것 같다.

    즉 Watcher의 TimerTask와 Timing Wheel bucket에 있는 TimerTaskEntry는 동일한 요청이라고 보면 되고
    Watcher 혹은 Timing Wheel 중 반드시 한 곳에서요청을 처리해야 한다.

     

     

    extends TimerTask

    TimerTask를 extends 한 DelayedOperation을 구현한 클래스 목록.
    토픽/파티션 관련 operation은 DelayedFetch, DelayedProduce, DelayedDeleteRecords이다.

    Timing Wheel

    앞에서 보았던 circular buffer와 하나의 bucket(slot)마다 해당 시간에 expire 될 task들이 리스트로 연결되어있다.

    watcher

    배열로 되어있는 WatcherLists와 Map으로 되어있는 WatcherByKey 그리고 WatcherByKey의 value로
    ConcurrentLinkedQueue가 있어서 task들이 연결되어있다.
    [Topic name]-[Partion Number]가 키가 되어 hash() % WatcherLists.size() 값으로 WatcherLists의 인덱스를 선택
    키 값 그대로 WatcherByKey의 키가 되고 값에는 TiimerTask가 추가된다.

     

    대강의 모습은 알겠는데 몇 가지 궁금한 점이 생겼다.
    궁금한 것과 내부적으로는 어떻게 처리하고 있는지 알아본다. ※ consumer fetch를 예로 진행

     

    Q1: fetch.max.wait.ms를 2000으로 하면 Timing Wheel은 겁나 많이 만들어지겠네?

    A: 여기서 Hierarchical이 빛을 발한다.

    처음 Timing Wheel이 만들어질 때 wheelSize(bucket size)는 20, tickMs 1ms, interval은 20ms의 Wheel이
    하나 만들어진다. 이때 fetch.max.wait.ms: 1000 요청이 들어온다면 해당 wheel은 감당할 수 없어
    추가 Timing Wheel을 만든다. 바로 아래처럼

      private[this] val interval = tickMs * wheelSize
      @volatile private[this] var overflowWheel: TimingWheel = null
      ...
      private[this] def addOverflowWheel(): Unit = {
        synchronized {
          if (overflowWheel == null) {
            overflowWheel = new TimingWheel(
              tickMs = interval,
              wheelSize = wheelSize,
              startMs = currentTime,
              taskCounter = taskCounter,
              queue
            )
          }
        }
      }
      

    새로 생성되는 Timing Wheel의 tickMs는 이전 interval을 설정하면서 
    wheelSize 20, tickMs 20ms, interval 20 * 20ms = 400ms로 생성된다.
    아직도 fetch.max.wait.ms 1000 요청은 감당할 수 없으니 추가 Timing Wheel을 만드는데
    예상한 것처럼 wheelSize 20, tickMs 400ms, interval 20 * 400ms = 8000ms로 생성이 되고 요청은
    8000ms interval Timing Wheel에 저장된다.

     

    ※ 이미 위에 그려버렸네...ㅎㅎㅎㅎ

     

    Q2: consumer fetch.max.wait.ms를 2분, fetch.min.bytes를 3으로 설정했을 때 첫 요청 시
    새로운 record가 없어서 purgatory에 대기하는 것까지는 이해했다.
    그럼 2분 동안 마냥 기다리고 있다가 2분 되면 새로운 record 확인을 하는 건가?
    아니면 watcher를 주기적으로 살펴보는 별도의 thread가 있는 건가?

    A: 생각보다 굉장히 간단하게 구현되어있다. 리더 파티션에 record가 기록되면 토픽/파티션에 DelayedOperations의
    purgatory 작업들을 한 바퀴 쭉~~ㅋ 돌면서 실행해본다.

      // appendRecordsToLeader method 마지막에 tryCompleteDelayedRequests 호출
    
      private def tryCompleteDelayedRequests(): Unit = delayedOperations.checkAndCompleteAll()
    
      def checkAndCompleteAll(): Unit = {
        val requestKey = TopicPartitionOperationKey(topicPartition)
        fetch.checkAndComplete(requestKey)
        produce.checkAndComplete(requestKey)
        deleteRecords.checkAndComplete(requestKey)
      }

     

    Q3. Watcher의 operation이나 Timing Wheel의 Timer Task 정리가 중요해 보인다.

    Watcher에서 완료했다면 Timing Wheel은 tick이 돌면서 해당 bucket의 task를 정리하는 것 같은데

    Timing Wheel에서 완료했을 때 Watcher의 정리는 어떻게 하나?

    A: 별도의 Expire Reaper 스레드가 돌면서 DelayQueue에서 200ms 내의 task를 가져와 실행하고 watcher를
    순회하면서 완료된 테스트를 삭제한다. 기존과 다른 게 뭐야? 할 수도 있지만 기존 Design에선 task당
    하나의 DelayQueue가 할당됐지만 이젠 Timing Wheel당 비슷한 expire의 task는 bucket으로 관리되면서
    하나의 Delay Queue가 할당됐다.

     

    Q4. consumer에서 fetch 할 때 대기 중인(purgatory에 머물러있는) request는 consumer에서
    다시 요청하면 안 될 텐데 어떻게 구현했을까?

    A: consumer가 broker에게 fetch 요청을 보낼 때 실행되는 함수 sendFetches.
    fetch request를 만들기 전에 prepareFetchRequest라는 method로 fetch 요청 보낼 Broker Node_Id -> 토픽/파티션 정보를 조회한다.

    public synchronized int sendFetches() {
      Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();
      ...
      RequestFuture<ClientResponse> future = client.send(fetchTarget, request);
      // fetch 요청을 보낼 broker node id
      this.nodesWithPendingFetchRequests.add(entry.getKey().id());
    
      future.addListener(new RequestFutureListener<ClientResponse>() {
        @Override
        public void onSuccess(ClientResponse resp) {               
          // 성공해도 nodesWithPendingFetchRequests에서 제거
          nodesWithPendingFetchRequests.remove(fetchTarget.id());
        }
        @Override
        public void onFailure(RuntimeException e) {
          ...
          // 실패해도 nodesWithPendingFetchRequests에서 제거
          nodesWithPendingFetchRequests.remove(fetchTarget.id());
        }
    }
    

     

    그리고 요청을 보낸 Broker Node_Id를 nodesWithPendingFetchRequests에 저장한다.
    다음 sendFetches 할 때 prepareFetchRequests()에서 nodesWithPendingFetchRequests에 있는 Broker에는 요청을 skip 하는
    것으로 KafkaConsumer가 구현되어있다.

    그림으로 간단하게 풀어보면

     

    Consumer 하나가 3개 파티션이 있는 TopicA를 subscribe 하고 있고 파티션은 브로커 3개에 골고루 퍼져있다고 가정하면
    위에처럼 nodesWithPendingFetchRequests에 1, 2, 3이 저장되고 3개 노드에 요청을 보낸다.

    두 개의 Broker는 조건이 만족되어 response가 왔지만 Broker 2는 조건이 만족되지 않아 purgatory에 있는 상태

    다음번 fetch 요청할 때는 Broker 2는 건너뛰고 요청을 보낸다.
    nodesWithPendingFetchRequests를 List 같이 그렸지만 Set이다.

     

    5. 내 생각

    올해 4월부터 서비스 백엔드 개발을 하기 시작하면서(아무도 안 궁금한 근황 토크) 스케줄러 프로젝트를 보게 됐고,
    이때 에버노트 깊숙한 곳에 있던 Kafka purgatory가 생각났다.

    초년생 때는 이 오픈소스를 싹 다 뜯어봐서 한 분야에 최고가 돼야지!!(지금 생각해보니 뜯어보기만 한다고 최고는 아닌데...;;)
    라는 생각이었다면 요새는 지금 맡은 일하면서 마주한 문제들을 오픈소스 플젝에서는 어떻게 해결하고 있는지 파악하자!!
    에 무게를 두고 있다.

     

    물론 이 생각이 내가 개발에 대한 열정이 줄어서 투자하는 절대 시간이 준 것을 방어하기 위한 핑계일 수도 있지만... 쨋든

     

    Kafka purgatory를 나름 정리해보면

    1. Hierarchical Timing Wheel의 bucket으로 task 관리해서 DelayQueue의 개수가 확연히 줄어듦

    2. Watcher 체크하는 타이밍이 기가 막히고 (특정 토픽/파티션에 새로운 record가 들어올 때 해당 Watcher만 체크!!)

    3. broker에서 대기 중인 요청에 대한 관리도 클라에서 적절하게 잘해주고 있다.

     

    아직 완벽하게 이해하지 못했고 그대로 실무에 적용할 수는 없겠지만 손에 익혀두면 써먹을 데가 있을 것 같다.

    TAG

    댓글 0

Designed by Tistory.