-
Valkey 8에 반영된 개선사항 - multi thread개발하면서/코드보면서 2025. 5. 18. 17:29반응형
지난 글에서 new hash table을 보면서 많은 수정이 있다고 느꼈는데 이번에 볼 multi thread도 많은 수정이 있다.
Valkey 8에 반영된 개선사항 - new hash table
작년 3월, Redis는 기존 BSD 라이선스에서 Redis Source Available License (RSALv2)과Server Side Public License (SSPLv1) 듀얼 라이센스를 적용했다.https://redis.io/blog/redis-adopts-dual-source-available-licensing/ 개인 사용자나 Re
dol9.tistory.com
Redis는 single thread다 -> single thread니까 분산락하면 Redis지!! 라는 생각의 흐름이였는데 multi thread??
성능 개선의 비교군인 Valkey 7.2에서는 어떻게 처리했는지 알아보고 Valkey 8에선 multi thread를 어떻게 적용했는지 알아본다.
7.2 이해하는데 시간이 꽤 오래걸렸지만 나름 파악이 된 상태에서 8을 보니 금방 이해가 됐다.
※ Valkey 7.2에 설명 중 틀리거나 보완이 필요한거 알려주시면 감사하겠습니다.
Valkey 7.2 single thread
rdb, aof 같은 별도의 스레드는 차치하더라도
알아보니 클라이언트의 모든 요청을 정말 single thread로 처리하는 게 아니라
node.js 처럼 IO Multiplexing 기술을 이용해서 여러 IO 연산을 single thread로 동시에 처리하는 거였다.
※ 지금까지 잘못 알고 있었네... (코 쓱..)
Valkey 7.2 코드를 보기 전에 Multiplexing 개념을 알면 좋을 것 같다.. 찍먹 해보자
IO Multiplexing
아래 두 글에서 Multiplexing 개념과 실제 코드까지 설명이 너무 잘 됐는데 내가 부족해서 온전히 이해를 못 하겠다. ㅜ,ㅜ
https://blog.naver.com/n_cloudplatform/222189669084
https://blog.naver.com/n_cloudplatform/222255261317
[네이버클라우드 기술&경험] IO Multiplexing (IO 멀티플렉싱) 기본 개념부터 심화까지 -2부-
I/O Multiplexing (IO 멀티플렉싱) *톺아보기 포스팅! *톺아보다 : 샅샅이 더듬어 뒤지면서 찾아보다....
blog.naver.com
대강 느낌적으로 정리해 보면
Everything is a File
OS 입장에서는 일반 파일뿐 아니라 socket, 터미널, 파이프, 디바이스 모두 File Descriptor(이하 fd) 중 하나로 취급한다.
fd는 표준입력(0), 표준 출력(1), 표준 에러(2) 총 3가지 종류가 있다.
Multiplexing은 여러 fd를 하나의 프로세스가 관리하는 기술이고
이 fd를 어떤 상태로 대기하냐에 따라서
리눅스는 select, poll, epoll. 맥은 kqueue, 윈도우는 iocp 등 다양한 기법이 있다.Multiplexing은 Asynchronous Blocking I/O라고 한다.
Application에서 Kernel로 System call 하고 제어권은 Application에서 유지한 채 다른 작업을 한다.
이후 Kernel에서 작업이 완료되어 데이터가 준비되면 Kernel에서 Application으로 콜백 신호를 보내고
Application은 Kernel에 있던 데이터를 user 영역의 buffer로 복사하는 흐름이다.
https://developer.ibm.com/articles/l-async/ 그래서 Valkey 7.2에서는 어떻게 동작한다고?
Valkey 7.2에서 클라이언트 요청을 어떻게 처리했는지 알아보자.
1. 서버가 시작되면 Server와 Listener가 초기화된다.
Server 초기화할 때는 File Event와 Timer Event를 만든다.
Listner 초기화할 때는 server listner에 accept 했을 때 실행할 AcceptHandler를 등록한다.int main(int argc, char **argv) { ... initServer(); ... initListeners(); ... aeMain(server.el); } void initServer(void) { ... // event loop fd 생성 server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); ... // 앞에서 만들어진 event loop fd에 timer event 추가 if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { serverPanic("Can't create event loop timers."); exit(1); } ... } void initListeners(void) { /* Setup listeners from server config for TCP/TLS/Unix */ int conn_index; connListener *listener; if (server.port != 0) { conn_index = connectionIndexByType(CONN_TYPE_SOCKET); if (conn_index < 0) serverPanic("Failed finding connection listener of %s", CONN_TYPE_SOCKET); listener = &server.listeners[conn_index]; ... listener->port = server.port; } ... /* create all the configured listener, and add handler to start to accept */ int listen_fds = 0; for (int j = 0; j < CONN_TYPE_MAX; j++) { ... if (createSocketAcceptHandler(listener, connAcceptHandler(listener->ct)) != C_OK) serverPanic("Unrecoverable error creating %s listener accept handler.", listener->ct->get_type(NULL)); listen_fds += listener->count; } }
우선 initServer 함수에서 생성되는 이벤트만 알아보자. (File Event, Fired Event, Time Event)
읽기와 쓰기 그리고 BARRIER 중 하나로 마스킹 된 server.maxclients+CONFIG_FDSET_INCR 개의 File Event 풀을 만든다.
epoll 기준, epoll_wait로 처리할 이벤트를 받아 실행할 File Event 정보를 마스킹과 fd를 보관하는 Fired Event 풀을 만든다.
마지막으로 Time Event는 양방향 리스트로 만들어져 있고 id와 시간을 갖고 있고 trigger 될 때 실행할 timeProc가 있다.
2. initListners에서 AcceptHandler는 어떻게 처리될까?
아래 글에서 소켓 리스너부터 EventLoop가 어떻게 처리되는지 잘 설명되어 있다.
Redis가 싱글 스레드 모델임에도 높은 성능을 보장하는 이유
Redis가 싱글 스레드 모델임에도 높은 성능을 보장하는 이유 (I/O Multiplexing)
틀린 내용이 있을 수 있어 피드백 해주시면 감사하겠습니다 😀Redis는 Remote Dictionary Server이며, Dictionary, Set 등과 같은 메모리 내 자료구조를 제공하는 TCP 서버이다. Redis는 캐싱, 세션 저장소, 실
velog.io
1번에서 봤듯이 initListners 함수 안에서 클라이언트가 접속하면 실행할 acceptHandler 함수를 붙인다.
if (createSocketAcceptHandler(listener, connAcceptHandler(listener->ct)) != C_OK)
createSocketAcceptHandler 함수는 listener fd를 read 형태의 File Event로 Multiplexing에 등록한다.
int createSocketAcceptHandler(connListener *sfd, aeFileProc *accept_handler) { int j; for (j = 0; j < sfd->count; j++) { if (aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,sfd) == AE_ERR) { /* Rollback */ ... } } return C_OK; }
클라이언트가 접속하면 아래 흐름으로 accept_handler가 실행된다.
1. 서버 main 함수에서 while loop를 돌면서 aeProcessEvents를 실행하는 aeMain을 실행한다.
2. aeProcessEvents함수는 aeApiPoll를 실행해 발생한 이벤트 목록을 받는다.
2-1. aeApiPoll이 하는 일은 epoll, kqueue 등 기술에 맞게(?) waiting 후 이벤트 목록을 받으면
Fired Event에 마스킹 정보와 fd 정보를 저장한다.
3. 마스킹이 read인지 write인지에 따라 분기가 있어 알맞은 함수를 실행한다.
accept_handler을 타고 가다 보면 networking.c -> acceptCommonHandler에서
createClient 함수를 호출해 클라이언트 구조체를 초기화한다.
이때 클라이언트의 요청받는 handler를 설정해 준다.
※ 응답 쓰는 건 writeToClient를 직접 호출한다.
왼쪽 위부터 시계방향으로 진행
1. connection을 맺으면 reader handler를 설정해 주는데 readQueryFromClient 함수다.
2. connection type에 reader_handler 함수를 실행한다.
3. connection fd를 aeCreateFileEvent에 등록하는데 이벤트가 발생하면 ae_handler를 실행하도록 설정했다.
4. ae_handler는 connSocketEventHandler 함수인데 결국 마스킹값에 따라 분기처리된다.
5. readQueryFromClient를 실행하는데 이 함수는 클라이언트로부터 명령어를 읽어 실행하는 역할을 한다.3. 클라이언트 요청은 이것이 끝이 아니었다.
데이터 구조와 ttl 정도만 살펴봤지 이 정도로 클라이언트 처리 과정을 본 건 처음이었다.
2번 정도 했으면 다 봤겠지 했는데 웬걸... 별도의 pthread가 돌고 있었다. :wow:
앞에서 살펴본 aeProcessEvents에서 aeApiPoll을 실행하기 전에 beforeSleep 함수를 실행하는데
이때 지연된 read, write 요청을 IO 스레드를 이용해서 처리하는 로직이 있다. networking.c -> IOThreadMainfan-out -> fan-in을 이용해 스레드 세이프하게 처리한다.
Main Thread가 0보다 큰 값으로 setIOPendingCount를 호출하면서 IO Thread fan-out
Main Thread가 getIOPendingCount를 주기적으로 호출하면서 0이 될 때까지 기다리다가 0이 되면 Main Thread fan-in
3. File Event & Filred Event
위에서 클라이언트 흐름을 설명하면서 File Event 처리도 함께 설명했지만 정리하면 다음과 같다.
굉장히 단순화했지만 뭐.. 느낌 있으니까..ㅋㅋㅋ
4. Time Event
initServer 함수에서 Time Event 초기화 부분을 알아보자.
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { serverPanic("Can't create event loop timers."); exit(1); } // eventLoop에 Time Event head세팅! 실행하는 함수는 serverCron long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc) { long long id = eventLoop->timeEventNextId++; aeTimeEvent *te; te = zmalloc(sizeof(*te)); if (te == NULL) return AE_ERR; te->id = id; te->when = getMonotonicUs() + milliseconds * 1000; te->timeProc = proc; te->finalizerProc = finalizerProc; te->clientData = clientData; te->prev = NULL; te->next = eventLoop->timeEventHead; te->refcount = 0; if (te->next) te->next->prev = te; eventLoop->timeEventHead = te; return id; }
aeProcessEvents 마지막 즈음에 Time Event 실행하는 함수가 있다.
Time Event에 설정한 시간보다 현재시간이 과거면 timeProc(serverCron)를 실행한다.
AE_NOMORE은 -1인데 serverCron은 `1000/server.hz;` 를 리턴하기 때문에 -1이 나올 수 없다.
즉 serverCron 실행 후 삭제하지 않고 when 값을 now + retval*1000로 수정한다.
이런 느낌
7.2에서 File Event와 Time Event가 어떻게 실행되는지 알아봤고
File Event를 이용해서 Valkey가 listner와 client event가 어떻게 실행되는지 알아봤다.pending 요청을 별도의 thread가 처리하긴 하지만 대부분의 일을 Main Thread가 한다.
Redis EventLoop에 디버깅으로 자세하게 소개한 글
Valkey 8에 적용된 multi thread
Valkey 블로그에 소개된 multi thread 그림이다.
7.2에서 pending read/write 요청을 fan-out, fan-in 방식으로 IO Thread로 처리하는 건 알았는데
아래 그림은 뭔가 peding 구분 없이 IO Thread에서 많은것들을 처리하는 걸로 보인다.
먼저 블로그에서 설명한 글을 정리하고 코드를 살펴보자.
https://valkey.io/blog/unlock-one-million-rps/ Valkey 블로그 글 정리
IO Thread가 하는 작업에는
1. 클라이언트의 명령을 읽어 구문 분석하거나
2. 클라이언트에게 응답을 다시 작성하거나
3. TCP 연결에서 I/O 이벤트를 폴링 하거나
4. 메모리 할당을 해제하는 작업이 포함된다.
IO Thread는 I/O 처리하느라 바쁘지만 Main Thread는 실제 명령을 실행하는 데 더 많은 시간을 할애한다.
코드 살펴보기
1. IO Job을 ring buffer 형태로 만들어 data와 함께 job_handler 실행하도록 만들었다.
io_jobs 256개를 만들고
io_jobs 하나당 2048개의 ringbuffer를 만들었다. 와우!!
총 256*2048= 524,288 개
#define IO_THREADS_MAX_NUM 256 static pthread_t io_threads[IO_THREADS_MAX_NUM] = {0}; static pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM]; /* IO jobs queue functions - Used to send jobs from the main-thread to the IO thread. */ typedef void (*job_handler)(void *); typedef struct iojob { job_handler handler; void *data; } iojob; typedef struct IOJobQueue { iojob *ring_buffer; size_t size; _Atomic size_t head __attribute__((aligned(CACHE_LINE_SIZE))); /* Next write index for producer (main-thread) */ _Atomic size_t tail __attribute__((aligned(CACHE_LINE_SIZE))); /* Next read index for consumer (IO-thread) */ } IOJobQueue; IOJobQueue io_jobs[IO_THREADS_MAX_NUM] = {0};
IOJobQueue에 Job을 추가하는 `IOJobQueue_push` 함수를 어디서 호출하는지 알면 파악이 될 것 같다.
2. `IOJobQueue_push` 함수를 실행하는 곳을 찾아서..
io_threads.c에서 IOJobQueue_push 호출하는 곳을 찾아봤다.
1. client의 요청 읽을 때
2. client에 응답 쓸 때
3. client 해제할 때
4. 이미 존재하는 key의 value를 덮어쓰는 경우 기존 value 해제할 때
5. beforeSleep 할 때 이벤트 목록을 가져오는 aePollApi 호출
6. TLS 한정 Accept도 IO thread로 처리
1. client의 요청 읽을 때
void readQueryFromClient(connection *conn) { ... if (postponeClientRead(c)) return; } // Valkey 7.2 int postponeClientRead(client *c) { if (server.io_threads_active && server.io_threads_do_reads && !ProcessingEventsWhileBlocked && !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_BLOCKED)) && io_threads_op == IO_THREADS_OP_IDLE) { listAddNodeHead(server.clients_pending_read,c); c->pending_read_list_node = listFirst(server.clients_pending_read); return 1; } else { return 0; } } // Valkey 8.1 int postponeClientRead(client *c) { if (ProcessingEventsWhileBlocked) return 0; // IOJobQueue에 push return (trySendReadToIOThreads(c) == C_OK); }
2. client에 응답 쓸 때
// Valkey 7.2 void sendReplyToClient(connection *conn) { client *c = connGetPrivateData(conn); writeToClient(c,1); } // Valkey 8.1 void sendReplyToClient(connection *conn) { client *c = connGetPrivateData(conn); if (trySendWriteToIOThreads(c) == C_OK) return; writeToClient(c); }
3. client 해제할 때
// Valkey 7.2 void freeClientArgv(client *c) { int j; for (j = 0; j < c->argc; j++) decrRefCount(c->argv[j]); c->argc = 0; c->cmd = NULL; c->argv_len_sum = 0; c->argv_len = 0; zfree(c->argv); c->argv = NULL; } // Valkey 8.1 void freeClientArgv(client *c) { /* If original_argv exists, 'c->argv' was allocated by the main thread, * so it's more efficient to free it directly here rather than offloading to IO threads */ if (c->original_argv || tryOffloadFreeArgvToIOThreads(c, c->argc, c->argv) == C_ERR) { for (int j = 0; j < c->argc; j++) decrRefCount(c->argv[j]); zfree(c->argv); } c->argc = 0; c->cmd = NULL; c->io_parsed_cmd = NULL; c->argv_len_sum = 0; c->argv_len = 0; c->argv = NULL; }
4. 이미 존재하는 key의 value를 덮어쓰는 경우 기존 value 해제할 때
// Valkey 7.2 static void dbSetValue(...) { if (server.lazyfree_lazy_server_del) { freeObjAsync(key,old,db->id); } else { /* This is just decrRefCount(old); */ db->dict->type->valDestructor(db->dict, old); } } // Valkey 8.1 static void dbSetValue(...) { /* For efficiency, let the I/O thread that allocated an object also deallocate it. */ if (tryOffloadFreeObjToIOThreads(old) == C_OK) { /* OK */ } else if (server.lazyfree_lazy_server_del) { freeObjAsync(key, old, db->id); } else { decrRefCount(old); } *valref = new; }
5. beforeSleep 할 때 이벤트 목록을 가져오는 aePollApi 호출
// Valkey 8에 추가 void beforeSleep(struct aeEventLoop *eventLoop) { /* When I/O threads are enabled and there are pending I/O jobs, the poll is offloaded to one of the I/O threads. */ trySendPollJobToIOThreads(); ... } void trySendPollJobToIOThreads(void) { ... IOJobQueue_push(jq, IOThreadPoll, server.el); } void IOThreadPoll(void *data) { aeEventLoop *el = (aeEventLoop *)data; struct timeval tvp = {0, 0}; int num_events = aePoll(el, &tvp); server.io_ae_fired_events = num_events; atomic_store_explicit(&server.io_poll_state, AE_IO_STATE_DONE, memory_order_release); } int aePoll(aeEventLoop *eventLoop, struct timeval *tvp) { AE_LOCK(eventLoop); int ret = aeApiPoll(eventLoop, tvp); AE_UNLOCK(eventLoop); return ret; }
6. TLS 한정 Accept도 IO thread로 처리
// Valkey 8.1 static int connTLSAccept(connection *_conn, ConnectionCallbackFunc accept_handler) { ... /* Try to offload accept to IO threads */ if (trySendAcceptToIOThreads(_conn) == C_OK) return C_OK; } int trySendAcceptToIOThreads(connection *conn) { ... IOJobQueue_push(job_queue, ioThreadAccept, c); } static void ioThreadAccept(void *data) { client *c = (client *)data; connAccept(c->conn, NULL); c->io_read_state = CLIENT_COMPLETED_IO; }
multi thread라고 했지만 실제 명령어 실행은 Main Thread에서 진행하고
IO 작업이나 epoll_wait 같은 대기작업들을 IO Thread가 처리하면서 Transaction 기능은 온전히 유지되는 것 같다.
자료 찾다 보니 Redis 8도 보게 되었는데 뭔가 비슷한 느낌이 물씬 난다.
어라? Redis 8도 비슷하게 적용했나 보네..
https://riferrei.com/the-engineering-wisdom-behind-rediss-single-threaded-design/
The Engineering Wisdom Behind Redis’s Single-Threaded Design – Ricardo Ferreira
In the relentless pursuit of performance, our industry often gravitates toward seemingly obvious solutions: more cores, more threads, more concurrency. Yet Redis—one of the most performant databases in the world—has maintained its commitment to a prima
riferrei.com
앞으로 어떻게 될진 모르겠지만 선의의(?) 경쟁은 좋다.
반응형