ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Kafka purgatory
    개발하면서/타인글보면서 2020. 6. 20. 19:31
    반응형

    업무 중에 request를 보내면 X초 이후에 실행되는 scheduling repository를 살펴보다가 Kafka purgatory는
    어떻게 구현했을지 호기심이 생겨 알아보았다.

    Confluent 블로그와 hierarchical timing wheels 논문을 쉽게 소개한 블로그, 두 개를 보며 나름대로 정리했다.
    https://www.confluent.io/blog/apache-kafka-purgatory-hierarchical-timing-wheels/

    1. Purgatory란 무엇이고 Kafka 어디에 사용되나?

    네이버 사전에 Purgatory를 검색하면 연옥이라고 나오고 의미는 아래와 같다.

    가톨릭 교리에서 연옥은 천국으로 가기에는 자격이 부족하지만
    지옥으로 갈 정도의 큰 죄를 짓지 않은 죽은 자들의 천국 가기전 영혼을 정화하기 위해 머무는 곳이다

    개발로 치면 서버 <-> 클라 구조에서 클라에서 서버로 전송한 요청이 성공도 아니고 실패도 아닌 중간 상황을 의미한다.

    Kafka를 예로 들면 producer에서 ack=all로 한 경우 전송한 메시지가 모든 broker에 복사될 때까지 producer의 send 요청이
    broker에 머물러 있는걸 purgatory라고 한다.  ※ 서버(borker) <-> 클라(producer)

    또 다른 예는 consumer에서 fetch.min.bytes=1인 경우 consumer가 fetch 할 파티션에 새로운 데이터가 올 때까지
    consumer의 fetch 요청은 broker에 머물러 있는데 이것도 purgatory라고 한다. ※ 서버(broker) <-> 클라(consumer)

    purgatory에서 벗어나는 방법은 (1) 요청 조건을 만족하거나, (2) 지정한 시간이 흘러 timeout이 발생(Expire)하는 것이다.

    ※ 앞으로 요청을 만족하거나 Timeout이 지정된 요청을 "Expire task"라고 부르겠습니다.

    2. 기존 Kafka purgatory 디자인

    timeout을 체크하는 Timer와 purgatory에 있는 요청이 조건을 만족하는지 관리하는 watcher가 존재한다.
    기존에는 Timer를 Java의 DelayQueue로 구현하였다.
    Expire task 요청이 오면 Timer와 Watcher에 등록이 되고 파티션에 메시지가 추가될 때마다 Watcher를 이용하여
    조건을 만족하는지 판단하고 Timer에서는 주기적으로 peak에서 Expire 할 task인지 판단한다.
    Timer에서 Expire 해서 지웠다고 바로 Watcher에서 지우지 않고 반대로 Watcher에서 조건을 만족해서 완료했다고
    Timer에서 바로 지우지 않는다.
    대신 Reaper thread가 일정 주기로 전체를 순회하면서 완료된 task를 지워준다.

    Reaper thread가 순회하는 주기를 짧게 가져가면 메모리 관리는 되겠지만 전체 데이터를 순회하고 판단하는 연산이 증가돼
    성능 저하에 영향을 끼치고 주기를 길게 하면 메모리 관리가 잘 되지 않아 OOM의 원인이 될 수 있다.
    (Expire task가 갑자기 엄청 추가됐는데 바로 조건이 만족된 경우, 
    또한 Expire task 추가/삭제는 빈번한데 Delay Queue(Heap)의 시간 복잡도는 O(log n)인 것도 단점 중 하나이다.

    Kafka 0.8.2 Purgatory

    개선된 Purgatory는 Timer에 task를 추가/삭제를 O(1) 하려는 목표,
    그리고 Watcher의 task와 Timer에 추가되는 task가 연결되어 있어 Watcher에서 조건 만족하여 삭제되면
    Timer에서도 바로 삭제할 수 있는 걸 목표로 했다.   개선된 Purgatory를 보기 전에 Timer에 대해 조금 더 알아보자

     

    3. Timer Facility

    Timer를 개선한 논문 내용이다. Timer를 구현하는 7가지 방법을 소개하고 각각의 시간 복잡도를 설명한다.
    친절하게도 논문 내용을 쉽게 써준 두 개 포스트가 있어 이해한 내용을 동일한 순서로 정리했다.

    blog.acolyer.org/2015/11/23/hashed-and-hierarchical-timing-wheels/

    paulcavallaro.com/blog/hashed-and-hierarchical-timing-wheels/


    Timer에는 4가지 연산이 있는데 API로 제공되는 Start, Stop연산과
    내부에서 동작하는 Per-tick bookkeeping, Expiry processing 연산 두 개가 있다.

    * Start - client에서 timeout 시간(interval)과 expire 됐을 때 실행할 callback 함수, Request ID를 인자로 전달한다.
       그럼 내부적으로 Time Unit(시, 분, 초 등) 단위의 tick을 가진 timer가 실행된다.

    * Stop - client에서 Request ID를 전달하면 해당 timer를 찾고 삭제한다.
    * Per-tick bookkeeping - Time Unit T가 있을 때(ex: 1초, 5초 등). 매 T마다 bookkeeping이 발생하는데
       이때 미해결 된(완료되지도 않았고 timeout이 되지도 않은) timer를 순회하면서 expired 된 게 있는지 검사한다.
       expire 된 timer는 제거하고 expiry processing을 실행한다.

    * Expiry processing - Per-tick bookkeeping에서 expire 된 timer가 발견되면 실행되는 것으로
    Start에서 client가 전달한 callback 실행을 한다. (또는 callback이 아니라 미리 정의된 로직)

    timer 구현하는 7가지 방법

    scheme 1. Unordered Timer List

    timer 등록할 때 남은 시간 정보와 함께 저장한다.
    Per-tick이 timer 전체를 돌며 interval 1을 감소시키고 0이 된 timer는 Expiry processing을 실행시킨다.
    ~ Start O(1), Stop O(1), Per-tick O(n).  n은 미해결 된 timer 개수

    scheme1

    scheme 2. Ordered Timer List

    timer에 등록할 때 expire 되는 절대 시간을 함께 저장한다. 
    ※ 현재 시간이 2020/06/20 10:00:00이고 expire는 2020/06/20 10:30:00라고 했을 때 scheme1은 1800(30*60)을
    저장하지만 scheme 2는 2020/06/20 10:30:00을 저장한다.

    그리고 저장할 때 Ordered List에 저장해서 Per-tick bookkeeping이 timer 전체를 순회하지 않고
    head부터 expire가 아닌 timer 만날 때까지만 순회하도록 구조를 변경했다.
    ~ Start O(n), Stop O(1), Per-tick O(1).   저장 시 전체를 순회하면 expire time에 맞는 위치에 저장

    scheme 3. Tree-based Timer

    저장하는 자료구조를 balanced binary tree로 변경
    ~ Start O(log(n)), Stop O(1), Per-tick O(1). heap 생각하면 될 듯

    Timing Wheels

    기본 아이디어는 다음과 같다.

    크기가 N이고 time unit이 1초인 배열이 있고 현재시간이 t라고 했을 때
    expire time이 (t + i) * timeunit 인 i를 찾아 timer를 추가해주는 방식이다.
    배열을 이용해 최대 N * time unit까지의 이벤트를 등록할 수(scheme 4) 있고
    N * time unit 이후의 expire time도 등록할 수(schema 5, 6, 7) 있다.

    scheme 4. Simple Timing Wheels

    등록할 수 있는 timer의 interval 최대 크기는 배열의 크기 * time unit로 제한된다.
    크기가 N인 circular buffer를 이용하여 적절한 배열 인덱스를 찾아 timer를 등록한다.
    MaxInterval의 제한이 있다는 점과 등록할수 있는 interval이 커지는것과 비례해서 메모리 사용량도 증가하는 단점이 있다.
    ~ Start, Stop, Per-tick 모두 O(1)

    5

    scheme 5. Hashing Wheel with Ordered Timer List

    2의 제곱수의 크기로 circular buffer를 생성하고 expire time 32bit를 이용한다.
    slot 하나는 scheme 2와 동일한 방식으로 정렬된 timer가 저장된다.

    예를 들어 2^8, 256 크기의 circular buffer가 있다고 하면 slot 인덱스는 expire time % 256로 정해지고,
    timer 등록할 때는 expire time / 256의 값을 Ordered List에 적절한 위치를 찾아서 저장한다.
    ~ Start 최악 O(n)/평균 O(1), Stop O(1), Per-tick O(1)
    Start 평균이 나오려면 n < buffer크기이고 timer가 "잘" 분산되어야 한다. 즉 circular buffer의 크기를 잘 골라야 한다!!

    scheme 6. Hashing Wheel with Unordered Timer List

    scheme 5와 비슷하지만 timer 저장을 Unordered List에 한다.
    ~ Start O(1), Stop O(1), Per-tick 최악 O(n)/평균 O(1)

    scheme 7. Hierarchical Timing Wheel

    time unit이 서로 다른 timing wheel을 여러 개 두어 timer 저장 공간을 아껴보자는 게 주 내용이다.
    expire time이 100일 뒤인 timer의 경우 schema 4는 8,640,000(100 * 24 * 60 * 60)개의 timer array가 필요하지만
    schema 7은 244(100 + 24 + 60 + 60)개만 필요하다.

    초단위, 분단위, 시간 단위의 timing wheel 3개가 있다고 가정해보자.
    초단위는 60개의 slot이 있는데 한 바퀴 돌면 분단위의 timing wheel의 tick 한번 실행,
    분단위 timing wheel 한 바퀴가 돌면 시간 단위 timing wheel의 tick이 한번 실행한다.
    각각의 timing wheel은 schema 4처럼 등록 가능한 최대 interval이 있다.
    실제 Expiry Processing은 가장 time unit이 작은 timing wheel에서만 실행하고(예제 기준 초단위 timing wheel)
    다른 timing wheel은 아랫단계의 timing wheel로 보내기만 한다.(아랫 단계로 보내는 연산을 migrate라고 부름)

    https://www.slideshare.net/supperniu/timing-wheels page 9, 10 꼭 보세요~~
    그림 그리는 게 귀찮아서 그러는 거 아님

    모든 timing wheel에 timer를 저장하지 않고 time interval에 적합한 최상위 timing wheel 하나에 저장한다.
    예로 1시 30분 40초 뒤에 expire 될 timer가 등록되면 time interval이 한 시간 단위인 timing wheel의 1시간 slot에
    30분 40초의 timer 저장.
    한 시간 단위 timing wheel의 tick이 실행되어 Per-tick이 위에서 등록한 1시간 slot을 체크하면 30분 40초 timer는
    1 분단위의 timing wheel의 알맞은 slot에 40초 timer 저장.

    ~ Start O(m) m은 timing wheel 개수, Stop O(1), Per-tick O(1)

    scheme 6, 7을 선택할 때 고려할 점

    * n, timer 개수
    * M, slot 총 개수
    * m, level 개수 (scheme 6은 1개, scheme 7은 1개 이상)
    * T, 평균 timer interval (위에서 본 Start 할 때 인자로 전달되는 interval의 평균)
    * cost(index), scheme 6에서 해싱과 index 계산하는 비용
    * cost(migrate), scheme 7에서 task가 다음 wheel로 timer를 전달하는 비용

    scheme 6의 비용 계산은 n * cost(index) / M
    scheme 7의 비용 계산은 n * m * cost(migrate) / T

    cost(index), cost(migrate)의 비용은 크게 다르지 않다.
    때문에 T가 작고 M이 크다면 scheme 6을 T가 크고 M이 작다면 scheme 7을 선택하는 것을 권장한다.


    average와 worst case 그리고 amortized analysis

    hierarchical timing wheel을 단계별 그림으로 잘 설명한 포스트

    쓰다 보니 길어져서 2부에 계속...

    반응형

    댓글

Designed by Tistory.