ABOUT ME

-

인기 태그


kafka elastic_search redis
Today
-
Yesterday
-
Total
-
  • Kafka purgatory
    개발하면서/타인글보면서 2020. 6. 20. 19:31

    request를 보내면 X초 이후에 실행되는 scheduling 프로젝트를 살펴보다가 Kafka purgatory는 어떻게 구현했을까(응?)...
    하는 생각이 들어 알아보게 되었다.

    confluent 블로그 글과 hierarchical timing wheels 논문을 블로그에 쉽게 소개한 두 개의 글을 보며 내 나름대로 정리하려고 한다.
    https://www.confluent.io/blog/apache-kafka-purgatory-hierarchical-timing-wheels/

    1. Purgatory란 무엇이고 Kafka 어디에 사용되나?

    네이버 사전에 검색하면 연옥이라고 나오고 의미는 아래와 같다.

    가톨릭 교리에서 연옥은 천국으로 가기에는 자격이 부족하지만
    지옥으로 갈 정도의 큰 죄를 짓지 않은 죽은 자들의 영혼이 머무르는 곳이다

    개발로 치면 서버 <-> 클라 구조에서 클라에서 서버로 전송한 요청이 성공도 아니고 실패도 아닌 중간 상황을 의미한다.
    Kafka를 예로 들면 producer 설정에서 ack=all로 한 경우 전송한 메시지가 모든 broker에 전달될 때까지 producer의 send 요청이
    broker에 머물러 있는것을 purgatory라고 한다.  ※ 서버(borker) <-> 클라(producer)

    또 하나는 consumer에서 min.bytes=1인 경우 consumer가 담당하는 파티션에 새로운 데이터가 올때까지 consumer의
    fetch 요청은 broker에 머물러 있는것도 purgatory다. ※ 서버(broker) <-> 클라(consumer)

    purgatory에서 벗어나는 방법은(?) (1) 요청한 기준이 달성되거나, (2) 시간이 흘러 timeout이 발생하는 것이다.

    2. 기존 Kafka purgatory 디자인

    요청이 오면 timeout을 체크하는 timer(a)와 purgatory에 있는 요청들을 관리하는 watcher list(hash map)이 존재한다.
    기존에는 (a)를 Java의 DelayQueue로 구현하였다.
    요청이 완료 됐을때 timer나 watcher list에서 바로 지우는 대신 일정 주기로 timer와 watcher list의 조건을 비교해서
    지우는데 만약 이런 삭제 연산이 정상적으로 이루어지지 않으면 OOM의 원인이 된다.

    이를 방지하기 위해 reaper thread라 불리는 별도의 thread를 만들어 주기적으로 timer와 watcher list 전체를 순회하면서
    완료된 요청들을 삭제하도록 했다. reaper thread가 순회하는 주기를 짧게 가져가면 메모리 관리는 되겠지만 전체 데이터를
    순회하고 판단하는 연산이 증가돼 성능 저하에 영향을 끼친다.

    3. Timer Facility

    Timer를 개선한 논문 내용이다. Timer를 구현하는 7가지 방법을 소개하고 각각의 시간 복잡도를 설명한다.
    친절하게도 논문 내용을 쉽게 써준 두 개 포스트가 있어 이해한 내용을 동일한 순서로 정리했다.

    blog.acolyer.org/2015/11/23/hashed-and-hierarchical-timing-wheels/

    paulcavallaro.com/blog/hashed-and-hierarchical-timing-wheels/


    Timer에는 4가지 연산이 있는데 외부에서 사용 가능한 Start, Stop연산과
    내부에서 동작하는 Per-tick bookkeeping, Expiry processing연산 두 개가 있다.

    * Start - client에서 timeout 시간(interval)과 expire 됐을 때 실행할 callback 함수, Request ID를 인자로 전달한다. 그럼 내부적으로 Time Unit(시, 분, 초 등) 단위의 tick을 가진 timer가 실행된다.
    * Stop - client에서 Request ID를 전달하면 해당 timer를 찾고 삭제한다.
    * Per-tick bookkeeping - Time Unit T가 있을 때(ex: 1초, 5초 등). 매 T마다 bookkeeping이 발생하는데
    이때 미해결 된(완료되지도 않았고 timeout이 되지도 않은) timer를 순회하면서 expired 된 게 있는지 검사한다.
    expire 된 timer는 제거하고 expiry processing을 실행한다.

    * Expiry processing - Per-tick bookkeeping에서 expire 된 timer가 발견되면 실행되는 것으로 Start에서 client가 전달한
    callback 실행을 담당한다. (혹은 callback이 아닌 사용자가 정의한 어떠한 행동)

    timer 구현하는 7가지 방법

    scheme 1. Unordered Timer List

    timer 등록할 때 남은 시간 정보와 함께 저장한다.
    Per-tick bookkeeping이 timer 전체를 돌며 interval 1을 감소시키고 0이 된 timer는 Expiry processing을 실행시킨다.
    ~ Start O(1), Stop O(1), Per-tick bookkeeping O(n) n은 미해결 된 timer 개수

    j

    scheme 2. Ordered Timer List

    timer의 expire 되는 남은 기간이 아닌 expire 되는 절대 시간을 함께 저장한다. 
    ※ 현재 시간이 2020/06/20 10:00:00이고 expire는 2020/06/20 10:30:00라고 했을 때 scheme는 1800(30*60)을
    저장하지만 scheme 2는 2020/06/20 10:30:00을 저장한다.

    그리고 저장할 때 Ordered List에 저장해서 Per-tick bookkeeping이 timer 전체를 순회하지 않고
    head부터 expire가 아닌 timer 만날때까지만 순회하도록 구조를 변경했다.
    ~ Start O(n), Stop O(1), Per-tick bookeeping O(1)   저장 시 insertion sort 이용

    scheme 3. Tree-based Timer

    scheme 2에서 저장할 때 자료구조를 balanced binary tree로 변경
    ~ Start O(log(n)), Stop O(1), Per-tick bookeeping O(1)  heap 생각하면 될듯

    Timing Wheels

    기본 아이디어는 다음과 같다.

    크기가 N 인 배열이 있고 현재시간이 t라고 했을 때 expire time이 t + i * timeunit 인 i를 찾아 timer를 추가해주는 방식이다.
    배열을 이용해 최대 N * time unit 까지의 이벤트를 등록할 수(scheme 4) 있고
    t + (N-1) * timeunit 이후의 expire time도 등록할 수(schema 5, 6, 7) 있다.

    scheme 4. Simple Timing Wheels

    등록할수 있는 timer의 interval 최대 크기는 배열의 크기 * timeunit로 제한된다.
    크기가 N인 circular buffer를 이용하여 적절한 배열 인덱스를 찾아 timer를 등록한다.
    MaxInterval의 제한이 있다는 점과 등록할수 있는 interval이 커지면 비례해서 메모리 사용량도 증가한다는 단점이 있다.
    ~ Start, Stop, Per-tick bookkeeping 모두 O(1)

    5

    scheme 5. Hashing Wheel with Ordered Timer List

    2의 제곱수의 크기로 circular buffer를 생성하고 expire time 32bit를 이용한다.
    slot 하나는 scheme 2와 동일한 방식으로 정렬된 timer가 저장된다.

    예를 들어 2^8, 256 크기의 circular buffer가 있다고 하면 slot 인덱스는 expire time % 256로 정해지고,
    timer 등록할 때는 expire time / 256의 값을 적절한 위치에 저장한다.
    ~ Start 최악 O(n)/평균 O(1), Stop O(1), Per-tick bookkeeping O(1)
    Start 평균이 나오려면 n < buffer크기이고 timer가 "잘" 분산되어야 한다. 즉 circular buffer의 크기를 잘 골라야 한다는 것!!

    scheme 6. Hashing Wheel with Unordered Timer List

    scheme 5와 비슷하지만 timer 저장을 Unordered List에 한다.
    ~ Start O(1), Stop O(1), Per-tick bookkeeping 최악 O(n)/평균 O(1)

    scheme 7. Hierarchical Timing Wheel

    time unit이 다른 timing wheel을 여러 개 두어 timer 저장 공간을 아껴보자는 게 주요 내용이다.
    expire time이 100일 뒤인 timer의 경우 schema 4는 100 * 24 * 60 * 60개의 timer array가 필요하지만
    schema 7은 100 + 24 + 60 + 60개만 필요하다.

    초단위, 분단위, 시간 단위의 timing wheel 3개가 있다고 가정해보자.
    초단위는 60개의 slot이 있는데 한 바퀴 돌면 분단위의 timing wheel의 tick 한번 실행,  분단위 timing wheel 한 바퀴가 돌면
    시간 단위 timing wheel의 tick이 한번 실행한다. 각각의 timing wheel은 schema 4처럼 등록 가능한 최대 interval이 있다.
    실제 Expiry Processing은 가장 time unit이 작은 timing wheel에서만 실행하고(위예에서는 초단위)
    다른 timing wheel은 아랫단계의 timing wheel로 보내기만 한다.

    https://www.slideshare.net/supperniu/timing-wheels page 9, 10 꼭 보세요~~
    그림 그리는 게 귀찮아서 그러는 거 아님

    모든 timing wheel에 timer를 저장하지 않고 time interval에 적합한 최상위 timing wheel 하나에 저장한다.
    예로 1시 30분 40초 뒤에 expire 될 timer가 등록되면 시간 단위의 timing wheel에 알맞은 (a) slot에 30분 40초 정보와 함께 저장.
    시간단위의 timing wheel의 tick이 실행되어 Per-tick bookkeping이 (a)를 검사하게 되면 해당 timer는
    분단위의 timing wheel에 알맞은 slot을 계산하고 40초 정보와 함께 저장. timer 이동하는걸 migrate라고 한다.

    ~ Start O(m) m은 timing wheel 개수, Stop O(1), Per-tick bookkeeping O(1)

    scheme 6, 7을 선택할 때 고려할 점

    * n, timer 개수
    * M, slot 총 개수
    * m, level 개수 (scheme 6은 1개, scheme 7은 1개 이상)
    * T, 평균 timer interval (위에서 본 Start 할 때 인자로 전달되는 interval의 평균)
    * cost(index), scheme 6에서 해싱과 index 계산하는 비용
    * cost(migrate), scheme 7에서 task가 다음 wheel로 timer를 전달하는 비용

    scheme 6의 비용 계산은 n * cost(index) / M
    scheme 7의 비용 계산은 n * m * cost(migrate) / T

    cost(index), cost(migrate)의 비용은 크게 다르지 않다. 때문에 T가 작고 M이 크다면 scheme 6을
    T가 크고 M이 작다면 scheme 7을 선택하는 것을 권장한다.


    average와 worst case 그리고 amortized analysis

    hierarchical timing wheel을 단계별 그림으로 잘 설명한 포스트

    쓰다 보니 길어져서 2부에 계속...

    TAG

    댓글 0

Designed by Tistory.