-
Tapjoy에서 실시간으로 중복 제거하기개발하면서/타인글보면서 2017. 8. 2. 21:29반응형
메시지 전송은 3가지 타입이 존재한다.
- at-most-once
- at-least-once
- at-exactly-once
at-most-once은 유실이 있어도 괜찮을 때 사용하고 대부분 at-least-once(유실은 없지만 중복 존재)를 사용한다.
하지만 메시지 처리된 결과가 고객의 돈과 관련된 경우(금융, 광고 집행비 등) at-least-once로 메시지 전송이 이루어진다면
회사 운영에 큰 어려움이 생길 것이다.개발자 유우머에도 있는 메시지 전송 타입
https://twitter.com/mathiasverraes/status/632260618599403520
모바일 광고 및 앱 수익 창출 플랫폼 Tapjoy에서는 1분에 2백만 개의 광고 관련 이벤트가 생성되고
이를 집계해서 광고주에게 노출이나 클릭률을 보여준다고 한다.
Kafka로 적재하고 Spark로 집계 처리, 집계된 데이터는 Postgres에 저장한다.Tapjoy에서 중복 제거를 위한 요구사항과 메모리 절약을 위한 과정을 기록한 글이 있어 정리했다.
원글: http://eng.tapjoy.com/blog-list/real-time-deduping-at-scale
중복 제거 기능의 요구사항은 다음과 같다.
- 확장성 - 서비스가 잘 돼서 메시지 양이 많아져도 그에 맞추어 중복제거 처리량도 유지
- 고가용성
- 지속성 - Spark가 처리하는 동안 중복 제거에 사용되는 데이터는 유지되어야 한다. Spark가 재시작될 때마다 reset 된다면
Spark가 재시작될 때마다 매번 중복이 발생할 것이다. - 속도와 가성비 - 1분에 2백만 메시지 유입되는데 중복 제거한다고 처리량이 떨어지면 안 되고
중복제거를 위해 많은 비용이 안 들었으면 한다. - 효과적인 저장소 사용 - 중복제거에 사용되는 데이터는 12~24시간 유지해서 계속 증가하지 않도록 한다.
Tapjoy가 선택한 중복제거 방법
- 메시지 별 transaction id를 만든다.
- "set if not already set" 명령을 실행해서 저장소에 저장한다.
- 정상적으로 저장됐다면 중복이 아니므로 처리하고, 실패하면 중복이므로 무시한다.
하지만 처리 단계인 Spark job이 여러 단계로 실행되는데 어떠한 이유로 특정 단계가 재시작될 수 있다.
그럴 때 위 설명대로 처리가 되면 메시지 누락이 발생할 수도 있다.
중복제거도 해야 되지만 재처리위해 다음과 같이 진행한다.- 메시지 별 transaction id를 만든다.
- "set if not already set" 명령을 실행하는데 key는 transaction id이고, value는 Kafka partition + offset으로 한다.
- 정상적으로 저장됐다면 처리하고,
key가 존재하고 value가 동일하다면 실패해서 다시 처리하는 걸로 간주하고 처리한다. 그 외는 실패이므로 무시한다.
Kafka partition + offset 값을 통해 여러 번 처리도 가능해졌다.
Redis로 우선 시작!!
2백만/min 메시지 유입과 중복 제거를 위한 데이터를 24시간 유지를 한다고 했을 때 Redis 메모리 사용량을 계산해보자.
메시지의 transaction id는 36 bytes UUID(32 문자와 4개 dash)이고
콤마로 구분한 kafka partition + offset 데이터는 13 bytes이다.
마지막으로 Redis key-value 저장할 때 드는 오버헤드 64 bytes를 포함한다면
(오버헤드 + UUID(key) + [partition + offset](value)) * 2백만/m + 60 * 24
(64 + 36 + 13) * 2,000,000 * 60 * 24 = 300GB300GB면 Spark job이 몇 개야... 개선할 방법을 알아보자.
오버헤드가 왜 64 bytes인지 궁금해서 찾아보았다. 다음을 실행하면 결과는 몇일까?
$> set "" ""
$> memory usage ""
놀랍게도 58이다. 디버깅으로 봤더니
value 껍데기(?) 18 bytes
key 껍데기(?) 16 bytes
dict 한 개 정보를 담당하는 dictEntry 24 bytes이다.
계산 편의를 위해 64로 한듯하다.dictEntry? http://redisgate.kr/redis/configuration/internal_key_hashtable.php
저장되는 값 크기를 줄이기
UUID를 문자열 그대로 저장하지 않고 binary로 변환해서 저장한다. 36 bytes -> 16 bytes
Kafka partition의 경우 내부 Kafka 클러스터에서 최대 파티션 개수가 32767이라고 가정해서 2 bytes로 제한했고
Kafka offset의 경우 Kafka 코드는 8 bytes로 되어있지만 우리는 24시간 동안 Offset 유지해도 충분해서 계산해보니
6 bytes -> 2 ^ 47 - 1 -> 281,474,976,710,656 가 나왔다. 2백만/m 메시지를 24시간 유지하기에 충분하다.(오버헤드 + UUID(key) + [partition + offset](value)) * 2백만/m + 60 * 24
(64 + 16 + 2 + 6) * 2,000,000 * 60 * 24 = 230GB이전 과정보다 70GB 줄었지만 아직 부족하다.
key-value로 저장하지 않고 Hash(ziplist)를 이용하여 저장
Hash type으로 저장해서 Redis key 개수를 줄였다는 instagram 엔지니어 블로그 글에서 영감을 받아
메시지를 Hash type, 그중에서도 ziplist를 사용하기로 했다.
(ziplist는 메모리 절약을 위한 Hash라고 생각하면 어느 정도 맞다.)내부 테스트와 instagram 블로그 글을 통해 bucket에 키 1000개가 넘으면 효율이 떨어지는 것을 알았다.
그리고 다양한 테스트를 해보니 하나의 bucket에 100개에서 1000개 사이의 키가 저장되는 게 이상적이었다.
만약 bucket 한 개에 100개 키를 저장한다고 하면
1분에 생성되는 bucket 개수 = 2백만 / 100 = 20,000((key overhead + UUID + [partition + offset]) * 2백만) * 60 * 24
+ (bucket overhead + bucket key) * 1분에 생성되는 bucket 개수 * 60 * 24 = 69GBbucket overhead는 이전 과정에서 진행한 key overhead와 동일하다.
key overhead에 있는 1 byte는 Hash data type 정보를 저장하는 용도이다.bucket key는 timestamp와 나중에 나올 샤딩키 정보가 담겨있는 14 bytes이다.
이전 과정 대비 70% 줄였다!!
※ 이하는 파이프라인과 Lua script, Sharding 얘기인데 속도 개선 얘기여서 패스!!
Redis에서 제공하는 자료구조 중 hash로 전환하면서 메모리 절약하는 방법을 알아보았다.
여기서 얘기하는 건 Hash가 좋아요!! key-value 쓰지 마요!! 가 아니다.
TTL이 key마다 다르거나 데이터 양이 적어 메모리 사용량이 부담되지 않는다면 key-value로 안 할 이유 없다.제목은 중복 제거였지만 결국 메시지 고유 id를 만들어서 Redis에 24시간 저장했다가 처리할 때 쓴다는 거였고
기능뿐만 아니라 가성비를 만족하기 위한 과정을 알아보았다.
돈도 시간도 모두 비용이다!!반응형