Kqueue로 구현해보는 I/O Multiplexing

이벤트 루프를 통해 단일 스레드로 여러 클라이언트 요청 처리하기

2024-11-04


이번 글은 Kqueue로 구현해보는 I/O 멀티플렉싱(I/O Multiplexing)에 대한 글입니다.
일반적으로 소켓 프로그래밍을 통해 서버를 구현한다면 하나의 스레드(Thread)에 하나의 클라이언트의 요청을 처리하는 방식인 Thread Per Request 모델을 사용합니다.
해당 모델은 무거운 자원인 스레드를 많이 다룬다는 점, 블로킹(Blocking) I/O가 발생하면 스레드가 낭비된다는 점에서 굉장히 비효율적인 방법입니다.
그래서 Node.js나 Spring WebFlux 등의 비동기 및 논블로킹(Non-blocking) 기술들이 떠오르기 시작했습니다.
이러한 기술들은 모두 I/O 멀티플렉싱을 기반으로 하고 있습니다.

I/O Multiplexing

I/O 멀티플렉싱이란 하나의 채널로 여러 파일 디스크립터(File Descriptor)를 관리하는 기법입니다.
네트워크 통신에 사용되는 소켓(Socket)도 결국 파일 디스크립터이므로 하나의 스레드에서 여러 소켓을 관리하는 것도 I/O 멀티플렉싱입니다.

Event Driven Architecture

I/O 멀티플렉싱을 구현하기 이전에, 기존의 Thread Per Request 모델을 사용했던 이유를 이해할 필요가 있는데요.
하나의 스레드에 하나의 클라이언트 요청을 처리해야 했던 이유는 서버가 계속해서 클라이언트의 요청을 기다려야 하기 때문입니다.
즉, 하나의 스레드는 하나의 클라이언트의 요청을 기다려야 하므로 다른 작업을 수행할 수 없습니다.

그렇다면 서버가 더 이상 클라이언트의 요청을 기다리지 않는다면 이러한 문제를 해결할 수 있게 될텐데요.
이러한 부분에서 이벤트 지향 아키텍처(Event Driven Architecture)를 적용할 수 있습니다.
클라이언트의 요청을 하나의 이벤트로 보고 이벤트가 발생한 경우에만 해당 요청을 처리하면 됩니다.
이렇게 되면 모든 클라이언트들에 대한 이벤트를 모니터링하는 하나의 스레드만 있으면 논블로킹하게 여러 클라이언트들의 요청을 처리할 수 있습니다.

Event Loop

그렇다면 계속해서 이벤트를 모니터링하고 발생한 모든 이벤트들을 처리해야 하는데요.
이러한 모델을 이벤트 루프 모델이라고 합니다.

이벤트 루프란 I/O 멀티플렉싱을 리액터 패턴(Reactor Pattern)으로 구현한 모델입니다.
리액터 패턴에는 이벤트에 반응하고 핸들러를 매핑시키는 리액터(Reactor)가 있는데요.
이벤트 루프 모델에서는 이벤트 루프가 리액터 역할을 하게 됩니다.
while (true) {
    const std::vector<event> events = get_events();
 
    while (auto &event : events) {
        if (event == READ_EVENT) {
            read_handler(event);
        } else if (event == WRITE_EVENT) {
            write_handler(event);
        }
    }
}
이벤트 루프는 반복문을 순회하면서 발생한 이벤트들을 가져오고 해당 이벤트에 맞는 핸들러를 호출하게 되는데요.
여기서 이벤트를 관리하는 get_events() 부분에서 epoll() 등의 시스템 콜을 사용하게 됩니다.

Kqueue

이벤트 관리 시스템 콜에는 select(), poll(), epoll() 등이 있는데요.
이 시스템 콜들은 여러 파일 디스크립터들의 상태를 모니터링하고 I/O 이벤트가 발생했을때, 애플리케이션에 해당 이벤트를 통지하는 것이 목적입니다.
이 중 select()poll()은 파일 디스크립터들의 상태를 커널(Kernel)이 아닌 유저 스페이스(User Space)에서 관리하므로 커널과 유저 스페이스 간 불필요한 통신이 발생하게 되는데요.
이와 달리, epoll() 등의 시스템 콜은 커널에서 직접 파일 디스크립터들의 상태를 관리하므로 성능 부분에서 효율적이므로 일반적으로 epoll()을 사용해 I/O 멀티플렉싱을 구현하게 됩니다.

저의 경우, MacOS를 사용하고 있으므로 Linux의 epoll()을 사용할 수 없었습니다.
그래서 불가피하게 BSD(Berkeley Software Distribution) 계열 운영체제의 epoll()kqueue()를 통해 이벤트 루프를 구현해 보겠습니다.
event.h
struct kevent {
	uintptr_t       ident;
	int16_t         filter;
	uint16_t        flags;
	uint32_t        fflags;
	intptr_t        data;
	void            *udata;
};
Kqueue에서 이벤트 구조체인 kevent는 파일 디스크립터인 ident, 이벤트를 선별하기 위해 사용하는 filter, 새로 등록할 이벤트에 대해 수행할 작업인 flags 등을 가지고 있습니다.
소켓 프로그래밍에서는 ident를 해당 이벤트가 발생한 파일 디스크립터가 어떤 소켓의 파일 디스크립터인지 구별하기 위해 사용하게 됩니다.
또한 읽기가 준비된 상태인지 쓰기가 준비된 상태인지를 확인하기 위해 filter를 사용합니다.
event.h
int kevent(int kq,
           const struct kevent *changelist,
           int nchanges,
           struct kevent *eventlist,
           int nevents,
           const struct timespec *timeout);
Kqueue에 새로운 이벤트를 등록하거나 이미 등록된 파일 디스크립터들에 대해 발생한 이벤트들을 가져오는 함수는 kevent()입니다.
등록할 임의의 이벤트들은 changelist로 전달하며, 이미 Kqueue에서 관리하는 파일 디스크립터들로부터 발생한 이벤트들은 eventlist에 전달하게 됩니다.
event.h
#define EV_SET(kevp, a, b, c, d, e, f) do {     \
	struct kevent *__kevp__ = (kevp);       \
	__kevp__->ident = (a);                  \
	__kevp__->filter = (b);                 \
	__kevp__->flags = (c);                  \
	__kevp__->fflags = (d);                 \
	__kevp__->data = (e);                   \
	__kevp__->udata = (f);                  \
} while(0)
등록할 임의의 이벤트인 kevent는 직접 구조체를 생성해도 되지만, EV_SET()이라는 매크로 함수를 통해 편리하게 생성할 수 있습니다.

Event Loop 구현

이제 앞서 설명드린 kqueue()를 통해 이벤트 루프를 구현해 보겠습니다.
multiplexing.h
class EventLoop {
    const int &server_socket;
    const sockaddr_in &server_address;
    const socklen_t &address_len;
 
public:
    EventLoop(const int &server_socket, const sockaddr_in &server_address, const socklen_t &address_len): server_socket(server_socket), server_address(server_address), address_len(address_len) {}
 
    int run() const;
};
우선 이벤트 루프에 대한 헤더 파일을 작성하였습니다.
이제 EventLooprun()을 구현해보겠습니다.
EV_SET(&changed_event, server_socket, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, NULL);
change_events.push_back(changed_event);
이벤트 루프의 반복문에 들어가기 전에 서버 소켓을 Kqueue에 등록하기 위해 EV_SET()을 통해 서버 소켓에 대한 읽기 작업을 감지하는 이벤트를 생성해야 하는데요.
해당 이벤트는 kevent()changelist에 전달됩니다.
kevent(kq, change_events.data(), static_cast<int>(change_events.size()), events, MAX_EVENT_COUNT, nullptr)
이제 이벤트 루프의 반복문에 들어가면 먼저 kevent()를 통해 change_events 내의 이벤트들을 Kqueue에 등록하고, 발생한 이벤트들을 events로 전달 받습니다.
이때, kevent()timeout이 없다면 임의의 이벤트가 발생할때까지 무한정 대기하게 됩니다.
대기하는 중에 특정 소켓이 데이터를 받아 읽을 준비가 되면 이벤트가 발생하고 kevent()는 즉시 해당 이벤트를 반환하게 됩니다.
for (const auto &event: events) {
    if (event.flags & EV_ERROR) {
        continue;
    }
 
    if (event.filter == EVFILT_READ) {
        if (event.ident == server_socket) {
            const int client_socket =
                    accept(
                        server_socket,
                        reinterpret_cast<struct sockaddr *>(const_cast<struct sockaddr_in *>(&server_address)),
                        const_cast<socklen_t *>(&address_len)
                    );
 
            if (client_socket < 0) {
                std::cerr << "accept() failed";
                continue;
            }
 
            char client_address[address_len];
            inet_ntop(AF_INET, &server_address.sin_addr, client_address, address_len);
            std::cout << "New connection from " << client_address << ":" << ntohs(server_address.sin_port) << std::endl;
 
            EV_SET(&changed_event, client_socket, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, NULL);
            change_events.push_back(changed_event);
        } else {
            const int client_socket = static_cast<int>(event.ident);
            char buffer[MAX_BUFFER_SIZE];
            const long n = read(client_socket, buffer, MAX_BUFFER_SIZE);
 
            if (n <= 0) {
                std::cout << "Client disconnected, fd: " << client_socket << std::endl;
                close(client_socket);
            } else {
                std::cout << "Received: " << buffer << std::endl;
                write(client_socket, buffer, n);
            }
        }
    }
}
kevent()가 발생한 이벤트들을 반환한 후, 이벤트 루프는 해당 이벤트들에 맞는 처리를 수행합니다.
이때, 이벤트들을 구별하기 위해서 keventidentflags를 사용하는데요.
예를 들어, 해당 이벤트의 ìdent가 서버 소켓의 파일 디스크립터이고 flags가 읽기 작업에 대한 EVFILT_READ라면 서버 소켓이 연결 요청을 받았다는 것을 의미합니다.
그래서 accept()를 호출해 새로운 클라이언트 소켓에 대한 파일 디스크립터를 받고 해당 파일 디스크립터에 대한 이벤트를 생성해 change_events에 저장합니다.
다음 루프에 해당 이벤트가 kevent()changelist로 전달되어 Kqueue에 해당 이벤트에 대한 파일 디스크립터가 저장되게 됩니다.

만약 flagsEVFILT_READ인데 ident가 서버 소켓의 파일 디스크립터가 아니라면 클라이언트 소켓으로부터 데이터를 읽을 준비가 되었다는 것을 의미합니다.
그래서 ident로부터 클라이언트 소켓을 가져오고 해당 소켓으로부터 read()write()를 통해 데이터를 읽고 클라이언트에게 그대로 전송하도록 했습니다.
multiplexing.cpp
int EventLoop::run() const {
    int kq;
    std::vector<struct kevent> change_events;
    struct kevent events[MAX_EVENT_COUNT];
    struct kevent changed_event{};
 
    if (listen(server_socket, 5) < 0) {
        std::cerr << "listen() failed";
 
        return -1;
    }
 
    if ((kq = kqueue()) < 0) {
        std::cerr << "kqueue() failed";
 
        return -1;
    }
 
    EV_SET(&changed_event, server_socket, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, NULL);
    change_events.push_back(changed_event);
 
    std::cout << "Server listening on " << ntohs(server_address.sin_port) << std::endl;
 
    while (true) {
        if (kevent(kq, change_events.data(), static_cast<int>(change_events.size()), events, MAX_EVENT_COUNT, nullptr) == -1) {
            std::cerr << "kevent() error";
            break;
        }
 
        change_events.clear();
 
        for (const auto &event: events) {
            if (event.flags & EV_ERROR) {
                continue;
            }
 
            if (event.filter == EVFILT_READ) {
                if (event.ident == server_socket) {
                    const int client_socket =
                            accept(
                                server_socket,
                                reinterpret_cast<struct sockaddr *>(const_cast<struct sockaddr_in *>(&server_address)),
                                const_cast<socklen_t *>(&address_len)
                            );
 
                    if (client_socket < 0) {
                        std::cerr << "accept() failed";
                        continue;
                    }
 
                    char client_address[address_len];
                    inet_ntop(AF_INET, &server_address.sin_addr, client_address, address_len);
                    std::cout << "New connection from " << client_address << ":" << ntohs(server_address.sin_port) << std::endl;
 
                    EV_SET(&changed_event, client_socket, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, NULL);
                    change_events.push_back(changed_event);
                } else {
                    const int client_socket = static_cast<int>(event.ident);
                    char buffer[MAX_BUFFER_SIZE];
                    const long n = read(client_socket, buffer, MAX_BUFFER_SIZE);
 
                    if (n <= 0) {
                        std::cout << "Client disconnected, fd: " << client_socket << std::endl;
                        close(client_socket);
                    } else {
                        std::cout << "Received: " << buffer << std::endl;
                        write(client_socket, buffer, n);
                    }
                }
            }
        }
    }
 
    return 0;
}
최종적인 코드는 위와 같습니다.

블로킹

지금까지 kqueue()를 통해 이벤트 루프를 구현했는데요.
이벤트 루프가 항상 모든 상황에서 Thread Per Request 모델보다 좋은 성능을 보이지는 않습니다.

이벤트 루프 중간에 지연 시간이 긴 블로킹 I/O나 CPU 집약적인 작업 등의 블로킹 작업이 존재하는 상황에서는 이벤트 루프도 함께 멈추게 되는데요.
스레드가 많은 Thread Per Request 모델은 블로킹 작업 하나로는 전체 서비스에 큰 영향을 주진 못하지만, 이벤트 루프 모델에서는 일반적으로 하나의 이벤트 루프 스레드가 모든 요청을 처리하는 경향이 있어 전체 서비스에 큰 지연을 초래하게 됩니다.
그래서 이벤트 루프 모델에서는 이벤트 루프 중간에 절대 블로킹 작업이 존재하면 안됩니다.
이러한 사실은 이벤트 루프를 사용하는 Node.js나 Spring WebFlux에서도 확인할 수 있는데요.
특히 Node.js는 공식 문서에 'Don't Block the Event Loop'라는 제목의 페이지가 존재할 정도입니다.

만약 불가피하게 블로킹 작업을 포함해야 하는 경우에는 이벤트 루프 스레드가 아닌 다른 스레드에 해당 작업을 배정해야 합니다.