Program/C & C++

소켓 입출력 모델 - Completion Port [IOCP] 모델

너구리V 2012. 9. 19. 14:59


 

* 입출력 완료 포트(I/O completion port)

- 비동기 입출력 결과와 이 결과를 처리할 스레드에 대한 정보를 담고 있는 구조로 Overlapped 모델(II)에서 나오는 APC 큐와 비슷한 개념

 

* 입출력 완료 포트 vs. APC 큐의 차이점

- 생성과 파괴

   APC 큐는 각 스레드마다 자동으로 생성되고 파괴. 입출력 완료 포트는 CreateIoCompletionPort() 함수로 생성하고

   CloseHandle() 함수를 호출하여 파괴한다.

- 접근 제약

   APC 큐에 저장된 결과는 APC 큐를 소유한 스레드만 확인할 수 있지만 입출력 완료 포트에는 이러한 제약이 없다.

   대게 입출력 완료 포트를 접근하는 작업 스레드를 별도로 도는데 이상적인 작업자 스레드 개수는 CPU 개수와 같게 하지만

   몇가지 이유로 인해 CPU 개수 * n개를 생성한다. (n 의 최소값은 1)

- 비동기 입출력 처리 방법

   APC 큐에 저장된 결과를 처리하려면 해다 스레드는 Alertable wait 상태에 진입해야 한다.

   입출력 완료 포트에 저장된 결과를 처리하려면 작업자 스래드는 GetQueuedCompletionStatus() 함수를 호출해야 한다.

 

 

[동작 원리]

 

1. 애플리케이션을 구성하는 임의의 스레드에서 비동기 입출력 함수를 호출함으로써 운영체제에서 입출력 작업을 요청한다.

2. 모든 작업자 스레드는 GetQueuedCompletionStatus() 함수를 호출하여 입출력 완료 포트를 감시한다.

   완료한 비동기 입출력 작업이 아직 없다면 모든 작업자 스레드는 대기상태가 된다.

   이때 대기중인 작업자 스레드 목록은 입출력 완료 포트 내부에 저장.

3. 비동기 입출력 작업이 완료되면 운영체제는 입출력 완료 포트에 결과를 저장.

    이때 저장되는 정보를 입출력 완료 패킷(I/O completion packet) 이라 부른다.

4. 운영체제는 입추력 완료 포트에 저장된 작업자 스레드 목록에서 하나를 선택하여 깨운다.

   대기 상태에서 개어난 작업자 스레드는 비동기 입출력 결과를 처리한다.

   이후 작업자 스레드는 필요에 따라 다시 비동기 입출력 함수를 호출할 수 있다.

 

 

[Completion Port 모델을 이용한 소켓 입출력 절차]

 

1. CreateIoCompletionPort() 함수를 호출하여 입출력 완료 포트를 생성한다.

2. CPU 개수에 비례하여 작업자 스레드를 생성. 모든 작업자 스레드는 GetQueuedCompletionStatus() 함수를 호출하여 대기상태가 됨.

3. 비동기 입출력을 지원하는 소켓을 생성. 이 소켓에 대한 비동기 입출력 결과가 입출력 완료 포트에 저장되려면

   CreateIoCompletionPort() 함수를 호출하여 소켓과 입출력 완료 포트를 연결시켜야 한다.

4. 비동기 입출력 함수를 호출한다. 비동기 입출력 작업이 곧바로 완료되지 않으면, 소켓 함수는 오류를 리턴하고,

   오류 코드는 WSA_IO_PENDING 으로 설정된다.

5. 비동기 입출력 작업이 완료되면 운영체제는 입출력 완료 포트에 결과를 저장하고 대기중인 스레드 하나를 깨운다.

   대기 상테에서 깨어난 작업자 스레드는 비동기 입출력 결과를 처리한다.

6. 새로운 소켓을 생성하면 3 ~ 5 를 그렇지 않으면 4 ~ 5를 반복한다.

 

 

* 출력 완료 포트 생성하거나 소켓과 입출력 완료 포트를 연결

 

HANDLE CreateIoCompletionPort(

    HANDLE FileHandle,

    HANDLE ExistingCompletionPort,

    ULONG CompletionKey,

    DWORD NumberOfConcurrentThreads

);    성공 : 입출력 완료 포트 핸들.    실패 : NULL

 

FileHandle : 입출력 완료 포트와 연결한 파일 핸들.  소켓 프로그래밍에서는 소켓 디스크립터를 넣어주면 된다.

                새로운 입출력 완료 포트를 생성할때는 INVALID_HANDLE_VALUE 값을 사용해도 된다.

ExistingCompletionPort : 파일 또는 소켓과 연결할 입출력 완료 포트 핸들. NULL 이면 새로운 입출력 완료 포트를 생성.

CompletionKey : 입출력 완료 패킷에 들어갈 부가적인 정보로 32비트 값을 줄수 있다.

                      입출력 완료 패킷은 비동기 입출력 작업이 완료할 때마다 생성되어 입출력 완료 포트에 저장된다.

NumberOfConcurrentThreads : 동시에 실행할 수 있는 작업자 스레드의 개수. 0 을 사용하면 자동으로 CPU 개수와 같은수로 설정

 

 

* 비동기 입출력 결과 확인

 

입출력 완료 패킷이 들어올때 까지 대기하다 입출력 완료 패킷이 입출력 완료 포트에 들어오면 운영체제는 실행중인 작업자 쓰레드를 체크하여

CreateCompletionPort() 에서 설정한 값보다 작다면 대기 상태인 작업자 스레드를 깨워서 입출력 완료 패킷을 처리하도록 한다.

 

BOOL GetQueuedCompletionStatus(

    HANDLE CompletionPort,

    LPDWORD lpNumberOfBytes,

    LPDWORD lpCompletionKey,

    LPOVERLAPPED* lpOverlapped,

    DWORD dwMilliseconds

);    성공 : 0 이 아닌 값.    실패 : 0

 

CompletionPort : 입출력 완료포트 핸들

lpNumberOfBytes : 비동기 입출력 작업으로 전송된 바이트 수가 여기에 저장된다.

lpCompletionKey : CreateCompletionPort() 함수 호출시 전달한 세 번째 인자(32비트)가 여기에 저장.

lpOverlapped : 비동기 입출력 함수 호출시 전달한 OVERLAPPED 구조체의 주소값이 여기에 저장된다.

dwMilliseconds : 작업자 스레드가 대기할 시간을 설정한다. INFINITE 값을 사용하면 입출력 완료 패킷이 생성되어

                      체제가 자신을 깨울때까지 무한대기.

 

 

[Completion Port(IOCP) 모델을 이용한 에코서버]

 

#include <winsock2.h>
#include <stdlib.h>
#include <stdio.h>

 

#define BUFSIZE 512

 

// 소켓 정보 저장을 위한 구조체
struct SOCKETINFO
{
    OVERLAPPED overlapped;
    SOCKET sock;
    char buf[BUFSIZE+1];
    int recvbytes;
    int sendbytes;
    WSABUF wsabuf;
};

 

// 소켓 입출력 함수
DWORD WINAPI WorkerThread(LPVOID arg);

// 오류 출력 함수
void err_quit(char *msg);
void err_display(char *msg);

 

int main(int argc, char* argv[])
{
    int retval;
 
    // 윈속 초기화
    WSADATA wsa;
    if(WSAStartup(MAKEWORD(2,2), &wsa) != 0)
        return -1;
 
    // 입출력 완료 포트 생성
    HANDLE hcp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    if(hcp == NULL) return -1;
 
    // CPU 개수 확인
    SYSTEM_INFO si;
    GetSystemInfo(&si);
 
    // (CPU 개수 * 2)개의 작업자 스레드 생성
    HANDLE hThread;
    DWORD ThreadId;
    for(int i=0; i<(int)si.dwNumberOfProcessors*2; i++) {
        hThread = CreateThread(NULL, 0, WorkerThread, hcp, 0, &ThreadId);
        if(hThread == NULL) return -1;
        CloseHandle(hThread);
    }
 
    // socket()
    SOCKET listen_sock = socket(AF_INET, SOCK_STREAM, 0);
    if(listen_sock == INVALID_SOCKET) err_quit("socket()");
 

    // bind()
    SOCKADDR_IN serveraddr;
    ZeroMemory(&serveraddr, sizeof(serveraddr));
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_port = htons(9000);
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
    retval = bind(listen_sock, (SOCKADDR *)&serveraddr, sizeof(serveraddr));
    if(retval == SOCKET_ERROR) err_quit("bind()");
 
    // listen()
    retval = listen(listen_sock, SOMAXCONN);
    if(retval == SOCKET_ERROR) err_quit("listen()");
 
    while(1)

    {
        // accept()
        SOCKADDR_IN clientaddr;
        int addrlen = sizeof(clientaddr);
        SOCKET client_sock = accept(listen_sock, (SOCKADDR *)&clientaddr, &addrlen);
        if(client_sock == INVALID_SOCKET) {
            err_display("accept()");
            continue;
        }
        printf("[TCP 서버] 클라이언트 접속: IP 주소=%s, 포트 번호=%d\n", 
            inet_ntoa(clientaddr.sin_addr), ntohs(clientaddr.sin_port));
  
        // 소켓과 입출력 완료 포트 연결
        HANDLE hResult = CreateIoCompletionPort((HANDLE)client_sock, hcp, (DWORD)client_sock, 0);
        if(hResult == NULL) return -1;
  
        // 소켓 정보 구조체 할당
        SOCKETINFO *ptr = new SOCKETINFO;
        if(ptr == NULL) {
            printf("[오류] 메모리가 부족합니다!\n");
            break;
        }
        ZeroMemory(&(ptr->overlapped), sizeof(ptr->overlapped));
        ptr->sock = client_sock;
        ptr->recvbytes = 0;
        ptr->sendbytes = 0;
        ptr->wsabuf.buf = ptr->buf;
        ptr->wsabuf.len = BUFSIZE;
  

        // 비동기 입출력 시작
        DWORD recvbytes;
        DWORD flags = 0;
        retval = WSARecv(client_sock, &(ptr->wsabuf), 1, &recvbytes, &flags, &(ptr->overlapped), NULL);
        if(retval == SOCKET_ERROR) {
            if(WSAGetLastError() != ERROR_IO_PENDING) {
                err_display("WSARecv()");
            }
            continue;
        }
    }
    // 윈속 종료
    WSACleanup();
    return 0;
}


DWORD WINAPI WorkerThread(LPVOID arg)
{
    HANDLE hcp = (HANDLE)arg;
    int retval;
 
    while(1)

    {
        // 비동기 입출력 완료 기다리기
        DWORD cbTransferred;
        SOCKET client_sock;
        SOCKETINFO *ptr;
        retval = GetQueuedCompletionStatus(hcp, &cbTransferred,
            (LPDWORD)&client_sock, (LPOVERLAPPED *)&ptr, INFINITE);
  
        // 클라이언트 정보 얻기
        SOCKADDR_IN clientaddr;
        int addrlen = sizeof(clientaddr);
        getpeername(ptr->sock, (SOCKADDR *)&clientaddr, &addrlen);
  
        // 비동기 입출력 결과 확인
        if(retval == 0 || cbTransferred == 0) {
            if(retval == 0) {
                DWORD temp1, temp2;
                WSAGetOverlappedResult(ptr->sock, &(ptr->overlapped), &temp1, FALSE, &temp2);
                err_display("WSAGetOverlappedResult()");
            }
            closesocket(ptr->sock);
            printf("[TCP 서버] 클라이언트 종료: IP 주소=%s, 포트 번호=%d\n", 
                inet_ntoa(clientaddr.sin_addr), ntohs(clientaddr.sin_port));
            delete ptr;
            continue;
        }
  
        // 데이터 전송량 갱신
        if(ptr->recvbytes == 0) {
            ptr->recvbytes = cbTransferred;
            ptr->sendbytes = 0;
            // 받은 데이터 출력
            ptr->buf[ptr->recvbytes] = '\0';
            printf("[TCP/%s:%d] %s\n", inet_ntoa(clientaddr.sin_addr), ntohs(clientaddr.sin_port), ptr->buf);
        }
        else {
            ptr->sendbytes += cbTransferred;
        }
  
        if(ptr->recvbytes > ptr->sendbytes) {
            // 데이터 보내기
            ZeroMemory(&(ptr->overlapped), sizeof(ptr->overlapped));
            ptr->wsabuf.buf = ptr->buf + ptr->sendbytes;
            ptr->wsabuf.len = ptr->recvbytes - ptr->sendbytes;
   
            DWORD sendbytes;
            retval = WSASend(ptr->sock, &(ptr->wsabuf), 1, &sendbytes, 0, &(ptr->overlapped), NULL);
            if(retval == SOCKET_ERROR) {
                if(WSAGetLastError() != WSA_IO_PENDING) {
                    err_display("WSASend()");
                }
                continue;
            }   
        }
        else {
            ptr->recvbytes = 0;
   

            // 데이터 받기
            ZeroMemory(&(ptr->overlapped), sizeof(ptr->overlapped));
            ptr->wsabuf.buf = ptr->buf;
            ptr->wsabuf.len = BUFSIZE;
            DWORD recvbytes;
            DWORD flags = 0;
            retval = WSARecv(ptr->sock, &(ptr->wsabuf), 1, &recvbytes, &flags, &(ptr->overlapped), NULL);
            if(retval == SOCKET_ERROR) {
                if(WSAGetLastError() != WSA_IO_PENDING) {
                    err_display("WSARecv()");
                }
                continue;
            }
        }
    }
    return 0;
}

 

// 소켓 함수 오류 출력 후 종료
void err_quit(char *msg)
{
    LPVOID lpMsgBuf;
    FormatMessage( 
        FORMAT_MESSAGE_ALLOCATE_BUFFER|
        FORMAT_MESSAGE_FROM_SYSTEM,
        NULL, WSAGetLastError(),
        MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
        (LPTSTR)&lpMsgBuf, 0, NULL);
        MessageBox(NULL, (LPCTSTR)lpMsgBuf, msg, MB_ICONERROR);
    LocalFree(lpMsgBuf);
    exit(-1);
}


// 소켓 함수 오류 출력
void err_display(char *msg)
{
    LPVOID lpMsgBuf;
    FormatMessage( 
        FORMAT_MESSAGE_ALLOCATE_BUFFER|
        FORMAT_MESSAGE_FROM_SYSTEM,
        NULL, WSAGetLastError(),
        MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
        (LPTSTR)&lpMsgBuf, 0, NULL);
    printf("[%s] %s", msg, (LPCTSTR)lpMsgBuf);
    LocalFree(lpMsgBuf);
}

반응형

'Program > C & C++' 카테고리의 다른 글

db관련  (0) 2012.12.11
odbc 샘플  (0) 2012.11.22
c++ flex 보안  (0) 2012.09.17
Flex3 Socket 통신시 Policy Request 처리하기 ( 소켓 인증 권한 획득하기 )  (0) 2012.09.17
Multi-thread programming(멀티 쓰레딩)  (0) 2011.07.30