Notificações de estado do soquete Winsock
Introdução
As APIs de notificações de estado de soquete na tabela abaixo fornecem uma maneira escalonável e eficiente de obter notificações sobre alterações de estado de soquete (eficientes em termos de CPU e memória). Isso inclui notificações sobre itens como leitura sem bloqueio, gravação sem bloqueio, condições de erro e outras informações.
API | Descrição |
---|---|
Função ProcessSocketNotifications | Associa um conjunto de soquetes a uma porta de conclusão e recupera todas as notificações que já estão pendentes nessa porta. Uma vez associada, a porta de conclusão recebe as notificações de estado do soquete que foram especificadas. |
estrutura SOCK_NOTIFY_REGISTRATION | Representa as informações fornecidas para a função ProcessSocketNotifications . |
Função SocketNotificationRetrieveEvents | Essa função auxiliar embutida é fornecida como uma conveniência para recuperar a máscara de eventos de um OVERLAPPED_ENTRY. |
O fluxo de trabalho começa com você associando soquetes a uma porta de conclusão de E/S (ProcessSocketNotifications e SOCK_NOTIFY_REGISTRATION). Depois disso, a porta fornece informações sobre alterações de estado de soquete usando os métodos de consulta de porta de conclusão de E/S usuais.
Essas APIs permitem a fácil construção de abstrações independentes de plataforma. Dessa forma, há suporte para sinalizadores persistentes e únicos e disparados por nível e borda. Por exemplo, registros disparados em nível único são o padrão recomendado para servidores multi-threaded.
Recomendações
Essas APIs fornecem uma alternativa escalonável para o WSAPoll e selecionam APIs.
Eles são uma alternativa à E/S de soquete sobreposta usada com portas de conclusão de E/S e evitam a necessidade de buffers de E/S permanentes por soquete. Mas em um cenário em que buffers de E/S por soquete não são uma consideração importante (o número de soquetes é relativamente baixo ou são usados constantemente), a E/S de soquete sobreposta pode ter menos sobrecarga devido a um número menor de transições de kernel, bem como um modelo mais simples.
Um soquete pode ser associado a apenas uma única porta de conclusão de E/S. Um soquete pode ser registrado com uma porta de conclusão de E/S apenas uma vez. Para alterar as chaves de conclusão, registre a notificação, aguarde a mensagem de SOCK_NOTIFY_EVENT_REMOVE (consulte os tópicos ProcessSocketNotifications e SocketNotificationRetrieveEvents) e registre novamente o soquete.
Para evitar a liberação de memória que ainda está em uso, você deve liberar as estruturas de dados associadas a um registro somente depois de receber a notificação de SOCK_NOTIFY_EVENT_REMOVE para o registro. Quando o descritor de soquete usado para registrar notificações é fechado usando a função closesocket , suas notificações são automaticamente desregistrados. No entanto, as notificações já enfileiradas ainda podem ser entregues. Um cancelamento de registro automático por meio de closesocket não gerará uma notificação de SOCK_NOTIFY_EVENT_REMOVE .
Se você quiser processamento multi-threaded, deverá usar uma única porta de conclusão de E/S com várias notificações de processamento de threads. Isso permite que a porta de conclusão de E/S escale horizontalmente o trabalho em vários threads, conforme necessário. Evite ter várias portas de conclusão de E/S (por exemplo, uma por thread), pois esse design é vulnerável a gargalos em um único thread enquanto outras estão ociosas.
Se vários threads estiverem desativando pacotes de notificação com notificações disparadas por nível, SOCK_NOTIFY_TRIGGER_ONESHOT deverá ser fornecido para evitar que vários threads recebam notificações para uma alteração de estado. Depois que a notificação de soquete tiver sido processada, a notificação deverá ser registrada novamente.
Se vários threads estiverem desativando pacotes de notificação em uma conexão orientada a fluxo em que as mensagens individuais precisam ser processadas em um único thread, considere usar notificações de disparo único disparadas em nível. Isso reduz a probabilidade de que vários threads recebam fragmentos de mensagem que precisam ser montados novamente entre threads.
Se você estiver usando notificações disparadas por borda, não recomendamos notificações pontuais porque o soquete precisa ser esvaziado depois de habilitar os registros. Esse é um padrão mais complicado de implementar e é mais caro porque sempre requer uma chamada que retorna WSAEWOULDBLOCK.
Se você quiser expandir a aceitação da conexão em um único soquete de escuta, os servidores deverão usar a função AcceptEx em vez de assinar notificações para solicitações de conexão. Aceitar conexões em resposta a notificações limita implicitamente a taxa de aceitação da conexão em relação ao processamento de solicitações para conexões existentes.
Abaixo estão exemplos de código que ilustram alguns cenários de notificação de estado de soquete. Parte do código contém para fazer itens para seus próprios aplicativos.
Código comum
Primeiro, aqui está uma listagem de código que contém algumas definições e funções comuns que são usadas pelos cenários a seguir.
#include "pch.h"
#include <winsock2.h>
#pragma comment(lib, "Ws2_32")
#define SERVER_ADDRESS 0x0100007f // localhost
#define SERVER_PORT 0xffff // TODO: select an actual valid port
#define MAX_TIMEOUT 1000
#define CLIENT_LOOP_COUNT 10
typedef struct SERVER_CONTEXT {
HANDLE ioCompletionPort;
SOCKET listenerSocket;
} SERVER_CONTEXT;
typedef struct CLIENT_CONTEXT {
UINT32 transmitCount;
} CLIENT_CONTEXT;
SRWLOCK g_printLock = SRWLOCK_INIT;
VOID DestroyServerContext(_Inout_ _Post_invalid_ SERVER_CONTEXT* serverContext) {
if (serverContext->listenerSocket != INVALID_SOCKET) {
closesocket(serverContext->listenerSocket);
}
if (serverContext->ioCompletionPort != NULL) {
CloseHandle(serverContext->ioCompletionPort);
}
free(serverContext);
}
DWORD CreateServerContext(_Outptr_ SERVER_CONTEXT** serverContext) {
DWORD errorCode;
SERVER_CONTEXT* localContext = NULL;
sockaddr_in serverAddress = { };
localContext = (SERVER_CONTEXT*)malloc(sizeof(*localContext));
if (localContext == NULL) {
errorCode = ERROR_NOT_ENOUGH_MEMORY;
goto Exit;
}
ZeroMemory(localContext, sizeof(*localContext));
localContext->listenerSocket = INVALID_SOCKET;
localContext->ioCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (localContext->ioCompletionPort == NULL) {
errorCode = GetLastError();
goto Exit;
}
localContext->listenerSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (localContext->listenerSocket == INVALID_SOCKET) {
errorCode = GetLastError();
goto Exit;
}
serverAddress.sin_family = AF_INET;
serverAddress.sin_addr.s_addr = SERVER_ADDRESS;
serverAddress.sin_port = SERVER_PORT;
if (bind(localContext->listenerSocket, (sockaddr*)&serverAddress, sizeof(serverAddress)) != 0) {
errorCode = GetLastError();
goto Exit;
}
if (listen(localContext->listenerSocket, 0) != 0) {
errorCode = GetLastError();
goto Exit;
}
*serverContext = localContext;
localContext = NULL;
errorCode = ERROR_SUCCESS;
Exit:
if (localContext != NULL) {
DestroyServerContext(localContext);
}
return errorCode;
}
// Create a socket, connect to the server, send transmitCount copies of the
// payload, then disconnect.
DWORD
WINAPI
ClientThreadRoutine(_In_ PVOID clientContextPointer) {
const UINT32 payload = 0xdeadbeef;
CLIENT_CONTEXT* clientContext = (CLIENT_CONTEXT*)clientContextPointer;
sockaddr_in serverAddress = {};
SOCKET clientSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (clientSocket == INVALID_SOCKET) {
goto Exit;
}
serverAddress.sin_family = AF_INET;
serverAddress.sin_addr.s_addr = SERVER_ADDRESS;
serverAddress.sin_port = SERVER_PORT;
if (connect(clientSocket, (sockaddr*)&serverAddress, sizeof(serverAddress)) != 0) {
goto Exit;
}
for (UINT32 Index = 0; Index < clientContext->transmitCount; Index += 1) {
if (send(clientSocket, (const char*)&payload, sizeof(payload), 0) < 0) {
goto Exit;
}
}
if (shutdown(clientSocket, SD_BOTH) != 0) {
goto Exit;
}
Exit:
if (clientSocket != INVALID_SOCKET) {
closesocket(INVALID_SOCKET);
}
free(clientContext);
return 0;
}
DWORD CreateClientThread(_In_ UINT32 transmitCount) {
DWORD errorCode = ERROR_SUCCESS;
CLIENT_CONTEXT* clientContext = NULL;
HANDLE clientThread = NULL;
clientContext = (CLIENT_CONTEXT*)malloc(sizeof(*clientContext));
if (clientContext == NULL) {
errorCode = ERROR_NOT_ENOUGH_MEMORY;
goto Exit;
}
ZeroMemory(clientContext, sizeof(*clientContext));
clientContext->transmitCount = transmitCount;
clientThread = CreateThread(NULL, 0, ClientThreadRoutine, clientContext, 0, NULL);
if (clientThread == NULL) {
errorCode = GetLastError();
goto Exit;
}
clientContext = NULL;
Exit:
if (clientContext != NULL) {
free(clientContext);
}
if (clientThread != NULL) {
CloseHandle(clientThread);
}
return errorCode;
}
VOID PrintError(DWORD errorCode) {
AcquireSRWLockExclusive(&g_printLock);
wprintf_s(L"Server thread %d encountered an error %d.", GetCurrentThreadId(), errorCode);
WCHAR errorString[512];
if (FormatMessageW(FORMAT_MESSAGE_FROM_SYSTEM,
NULL,
errorCode,
0,
errorString,
RTL_NUMBER_OF(errorString),
NULL) != 0)
{
wprintf_s(L"%s", errorString);
}
ReleaseSRWLockExclusive(&g_printLock);
}
// This routine must be used only if a single socket is registered.
DWORD DeregisterAndWait(_In_ HANDLE ioCompletionPort, _In_ SOCKET socket) {
DWORD errorCode;
SOCK_NOTIFY_REGISTRATION registration = {};
OVERLAPPED_ENTRY notification;
UINT32 notificationCount;
// Keep looping until the registration is removed, or a timeout is hit.
while (TRUE) {
registration.operation = SOCK_NOTIFY_OP_REMOVE;
registration.socket = socket;
errorCode = ProcessSocketNotifications(ioCompletionPort,
1,
®istration,
MAX_TIMEOUT,
1,
¬ification,
¬ificationCount);
if (errorCode != ERROR_SUCCESS) {
goto Exit;
}
if (registration.registrationResult != ERROR_SUCCESS) {
errorCode = registration.registrationResult;
goto Exit;
}
// Drops all non-removal notifications. Must be used only
// if a single socket is registered.
if (SocketNotificationRetrieveEvents(¬ification) & SOCK_NOTIFY_EVENT_REMOVE) {
break;
}
}
Exit:
return errorCode;
}
Substituição simples para sondagem
Esse cenário demonstra uma substituição de lista suspensa para aplicativos que usam a sondagem (WSAPoll) ou APIs semelhantes. Ele é de thread único e usa registros persistentes (não com uma única captura). Como o registro não precisa ser registrado novamente, ele usa GetQueuedCompletionStatusEx para remover notificações da fila.
VOID SimplePollReplacement() {
DWORD errorCode;
WSADATA wsaData;
SERVER_CONTEXT* serverContext = NULL;
SOCKET tcpAcceptSocket = INVALID_SOCKET;
u_long nonBlocking = 1;
SOCKET currentSocket;
SOCK_NOTIFY_REGISTRATION registration = {};
OVERLAPPED_ENTRY notification;
ULONG notificationCount;
UINT32 events;
CHAR dataBuffer[512];
if (WSAStartup(WINSOCK_VERSION, &wsaData) != 0) {
errorCode = GetLastError();
PrintError(errorCode);
return;
}
errorCode = CreateServerContext(&serverContext);
if (errorCode != ERROR_SUCCESS) {
goto Exit;
}
errorCode = CreateClientThread(CLIENT_LOOP_COUNT);
if (errorCode != ERROR_SUCCESS) {
goto Exit;
}
tcpAcceptSocket = accept(serverContext->listenerSocket, NULL, NULL);
if (tcpAcceptSocket == INVALID_SOCKET) {
errorCode = GetLastError();
goto Exit;
}
if (ioctlsocket(tcpAcceptSocket, FIONBIO, &nonBlocking) != 0) {
errorCode = GetLastError();
goto Exit;
}
// Register the accepted connection.
registration.completionKey = (PVOID)tcpAcceptSocket;
registration.eventFilter = SOCK_NOTIFY_REGISTER_EVENT_IN | SOCK_NOTIFY_REGISTER_EVENT_HANGUP;
registration.operation = SOCK_NOTIFY_OP_ENABLE;
registration.triggerFlags = SOCK_NOTIFY_TRIGGER_LEVEL;
registration.socket = tcpAcceptSocket;
errorCode = ProcessSocketNotifications(serverContext->ioCompletionPort,
1,
®istration,
0,
0,
NULL,
NULL);
// Make sure all registrations were processed.
if (errorCode != ERROR_SUCCESS) {
goto Exit;
}
// Make sure each registration was successful.
if (registration.registrationResult != ERROR_SUCCESS) {
errorCode = registration.registrationResult;
goto Exit;
}
// Keep receiving data until the client disconnects.
while (TRUE) {
wprintf_s(L"Waiting for client action...\r\n");
if (!GetQueuedCompletionStatusEx(serverContext->ioCompletionPort,
¬ification,
1,
¬ificationCount,
MAX_TIMEOUT,
FALSE))
{
errorCode = GetLastError();
goto Exit;
}
// The completion key is the socket we supplied above.
//
// This is true only because the registration supplied the socket as the completion
// key. A more typical pattern is to supply a context pointer. This example supplies
// the socket directly, for simplicity.
//
// The events are stored in the number-of-bytes-received field.
events = SocketNotificationRetrieveEvents(¬ification);
currentSocket = (SOCKET)notification.lpCompletionKey;
if (events & SOCK_NOTIFY_EVENT_IN) {
// We don't check for a 0-size receive because we subscribed to hang-up notifications.
if (recv(currentSocket, dataBuffer, sizeof(dataBuffer), 0) < 0) {
errorCode = GetLastError();
goto Exit;
}
wprintf_s(L"Received client data.\r\n");
}
if (events & SOCK_NOTIFY_EVENT_HANGUP) {
wprintf_s(L"Client hung up. Exiting. \r\n");
break;
}
if (events & SOCK_NOTIFY_EVENT_ERR) {
wprintf_s(L"The socket was ungracefully reset or another error occurred. Exiting.\r\n");
// Obtain a more detailed error code by issuing a non-blocking receive.
recv(currentSocket, dataBuffer, sizeof(dataBuffer), 0);
errorCode = GetLastError();
goto Exit;
}
}
errorCode = ERROR_SUCCESS;
Exit:
if (errorCode != ERROR_SUCCESS) {
PrintError(errorCode);
}
if (serverContext != NULL) {
if (tcpAcceptSocket != INVALID_SOCKET) {
DeregisterAndWait(serverContext->ioCompletionPort, tcpAcceptSocket);
}
DestroyServerContext(serverContext);
}
if (tcpAcceptSocket != INVALID_SOCKET) {
closesocket(tcpAcceptSocket);
}
WSACleanup();
}
Servidor UDP disparado pelo Edge
Esta é uma ilustração simples de como usar as APIs com gatilhos de borda.
Importante
O servidor deve continuar recebendo até receber um WSAEWOULDBLOCK. Caso contrário, não pode ter certeza de que uma borda crescente será observada. Dessa forma, o soquete do servidor também deve ser sem bloqueio.
Este exemplo usa UDP para demonstrar a falta de uma notificação HANGUP . É preciso algumas liberdades para assumir que os auxiliares comuns criem soquetes UDP, se necessário.
// This example assumes that substantially similar helpers are available for UDP sockets.
VOID SimpleEdgeTriggeredSample() {
DWORD errorCode;
WSADATA wsaData;
SOCKET serverSocket = INVALID_SOCKET;
SOCKET currentSocket;
HANDLE ioCompletionPort = NULL;
sockaddr_in serverAddress = { };
u_long nonBlocking = 1;
SOCK_NOTIFY_REGISTRATION registration = {};
OVERLAPPED_ENTRY notification;
ULONG notificationCount;
UINT32 events;
CHAR dataBuffer[512];
UINT32 datagramCount;
int receiveResult;
if (WSAStartup(WINSOCK_VERSION, &wsaData) != 0) {
errorCode = GetLastError();
PrintError(errorCode);
return;
}
ioCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (ioCompletionPort == NULL) {
errorCode = GetLastError();
goto Exit;
}
serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (serverSocket == INVALID_SOCKET) {
errorCode = GetLastError();
goto Exit;
}
// Register the server UDP socket before binding to a port to ensure data doesn't become
// present before the registration. Otherwise, the server could miss the notification and
// hang.
//
// Edge-triggered is not recommended with one-shot due to the difficulty in re-registering.
registration.completionKey = (PVOID)serverSocket;
registration.eventFilter = SOCK_NOTIFY_EVENT_IN;
registration.operation = SOCK_NOTIFY_OP_ENABLE;
registration.triggerFlags = SOCK_NOTIFY_TRIGGER_EDGE;
registration.socket = serverSocket;
errorCode = ProcessSocketNotifications(ioCompletionPort, 1, ®istration, 0, 0, NULL, NULL);
if (errorCode != ERROR_SUCCESS) {
goto Exit;
}
if (registration.registrationResult != ERROR_SUCCESS) {
errorCode = registration.registrationResult;
goto Exit;
}
// Use non-blocking sockets with edge-triggered notifications, since the data must be
// drained before a rising edge can be observed again.
errorCode = ioctlsocket(serverSocket, FIONBIO, &nonBlocking);
if (errorCode != ERROR_SUCCESS) {
goto Exit;
}
serverAddress.sin_family = AF_INET;
serverAddress.sin_addr.s_addr = SERVER_ADDRESS;
serverAddress.sin_port = SERVER_PORT;
if (bind(serverSocket, (sockaddr*)&serverAddress, sizeof(serverAddress)) != 0) {
errorCode = GetLastError();
goto Exit;
}
// Create the client.
// While CreateClientThread connects to a TCP socket and sends data over it, for this example
// assume that CreateClientThread creates a UDP socket instead, and sends data over it.
errorCode = CreateClientThread(CLIENT_LOOP_COUNT);
if (errorCode != ERROR_SUCCESS) {
goto Exit;
}
// Receive the packets.
datagramCount = 0;
while (datagramCount < CLIENT_LOOP_COUNT) {
wprintf_s(L"Waiting for client action...\r\n");
if (!GetQueuedCompletionStatusEx(ioCompletionPort,
¬ification,
1,
¬ificationCount,
MAX_TIMEOUT,
FALSE))
{
errorCode = GetLastError();
goto Exit;
}
// The completion key is the socket we supplied above.
//
// This is true only because the registration supplied the socket as the completion
// key. A more typical pattern is to supply a context pointer. This example supplies
// the socket directly, for simplicity.
//
// The events are the integer value of the overlapped pointer.
events = SocketNotificationRetrieveEvents(¬ification);
currentSocket = (SOCKET)notification.lpCompletionKey;
if (events & SOCK_NOTIFY_EVENT_ERR) {
// Obtain a more detailed error code by issuing a non-blocking receive.
recv(currentSocket, dataBuffer, sizeof(dataBuffer), 0);
errorCode = GetLastError();
goto Exit;
}
if ((events & SOCK_NOTIFY_EVENT_IN) == 0) {
continue;
}
// Keep looping receiving data until the read would block, otherwise the edge may not
// have been reset.
while (TRUE) {
receiveResult = recv(currentSocket, dataBuffer, sizeof(dataBuffer), 0);
if (receiveResult < 0) {
errorCode = GetLastError();
if (errorCode != WSAEWOULDBLOCK) {
goto Exit;
}
break;
}
datagramCount += 1;
wprintf_s(L"Received client data.\r\n");
}
}
wprintf_s(L"Received all data. Exiting... \r\n");
errorCode = ERROR_SUCCESS;
Exit:
if (errorCode != ERROR_SUCCESS) {
PrintError(errorCode);
}
if (serverSocket != INVALID_SOCKET) {
if (ioCompletionPort != NULL) {
DeregisterAndWait(ioCompletionPort, serverSocket);
}
closesocket(serverSocket);
}
if (ioCompletionPort != NULL) {
CloseHandle(ioCompletionPort);
}
WSACleanup();
}
Servidor multi-threaded
Este exemplo demonstra um padrão de uso multithread mais realista que usa os recursos de expansão da porta de conclusão de E/S para distribuir o trabalho entre vários threads de servidor. O servidor usa o gatilho de nível único para evitar que vários threads selecionem notificações para o mesmo soquete e permitir que cada thread drene os dados recebidos uma parte de cada vez.
Ele também demonstra alguns padrões comuns usados com a porta de conclusão. A chave de conclusão é usada para fornecer um ponteiro de contexto por soquete. O ponteiro de contexto tem um cabeçalho que descreve o tipo de soquete que está sendo usado, para que vários tipos de soquetes possam ser usados em uma única porta de conclusão. Os comentários no exemplo destacam que as conclusões arbitrárias podem ser removidas da fila (assim como acontece com a função GetQueuedCompletionStatusEx ), não apenas as notificações de soquete. A API PostQueuedCompletionStatus é usada para postar mensagens em threads e acordá-las sem precisar aguardar a chegada de uma notificação de soquete.
Por fim, o exemplo demonstra algumas das complexidades de desregistrar e limpar corretamente contextos de soquete em uma carga de trabalho encadeada. Neste exemplo, o contexto de soquete pertence implicitamente ao thread que recebe a notificação. O thread manterá a propriedade se não registrar a notificação.
#define CLIENT_THREAD_COUNT 100
// The I/O completion port infrastructure ensures that the system isn't over-subscribed by
// ensuring server-side threads block if they exceed the number of logical processors. If the
// machine has more than 16 logical processors, then this can be observed by increasing this number.
#define SERVER_THREAD_COUNT 16
#define SERVER_DEQUEUE_COUNT 3
#define SERVER_EXIT_KEY ((ULONG_PTR)-1)
typedef struct SERVER_THREAD_CONTEXT {
SERVER_CONTEXT* commonContext;
SRWLOCK stateLock;
_Guarded_by_(stateLock) UINT32 deregisterCount;
_Guarded_by_(stateLock) BOOLEAN shouldExit;
} SERVER_THREAD_CONTEXT;
typedef enum SOCKET_TYPE {
SOCKET_TYPE_LISTENER,
SOCKET_TYPE_ACCEPT
} SOCKET_TYPE;
typedef struct SOCKET_CONTEXT {
SOCKET_TYPE socketType;
SOCKET socket;
} SOCKET_CONTEXT;
VOID CancelServerThreadsAsync(_Inout_ SERVER_THREAD_CONTEXT* serverThreadContext) {
AcquireSRWLockExclusive(&serverThreadContext->stateLock);
serverThreadContext->shouldExit = TRUE;
ReleaseSRWLockExclusive(&serverThreadContext->stateLock);
}
VOID IndicateServerThreadExit(_In_ HANDLE ioCompletionPort) {
// Notify a server thread that it needs to exit. It can then notify the other threads when it
// exits.
//
// If this fails, then server threads may hang, and this program will never terminate. That
// is an unrecoverable error.
if (!PostQueuedCompletionStatus(ioCompletionPort, 0, SERVER_EXIT_KEY, NULL)) {
RaiseFailFastException(NULL, NULL, 0);
}
}
VOID DestroySocketContext(_Inout_ _Post_invalid_ SOCKET_CONTEXT* socketContext) {
if (socketContext->socket != INVALID_SOCKET) {
closesocket(socketContext->socket);
}
free(socketContext);
}
DWORD AcceptConnection(_In_ SOCKET listenSocket, _Outptr_ SOCKET_CONTEXT** socketContextOut) {
DWORD errorCode;
SOCKET_CONTEXT* socketContext = NULL;
socketContext = (SOCKET_CONTEXT*)malloc(sizeof(*socketContext));
if (socketContext == NULL) {
errorCode = ERROR_NOT_ENOUGH_MEMORY;
goto Exit;
}
ZeroMemory(socketContext, sizeof(*socketContext));
socketContext->socketType = SOCKET_TYPE_ACCEPT;
socketContext->socket = accept(listenSocket, NULL, NULL);
if (socketContext->socket == INVALID_SOCKET) {
errorCode = GetLastError();
goto Exit;
}
*socketContextOut = socketContext;
socketContext = NULL;
Exit:
if (socketContext != NULL) {
_ASSERT(errorCode != ERROR_SUCCESS);
DestroySocketContext(socketContext);
}
return errorCode;
}
DWORD
WINAPI
ServerThreadRoutine(_In_ PVOID serverThreadContextPointer) {
DWORD errorCode;
SERVER_THREAD_CONTEXT* serverThreadContext;
HANDLE ioCompletionPort;
// Accepting a connection requires two registrations: one to re-enable the listening socket
// notification, and one to register the newly-accepted connection.
SOCK_NOTIFY_REGISTRATION registrationBuffer[SERVER_DEQUEUE_COUNT * 2];
UINT32 registrationCount;
SOCK_NOTIFY_REGISTRATION* registration;
OVERLAPPED_ENTRY notifications[SERVER_DEQUEUE_COUNT];
UINT32 notificationCount;
UINT32 events;
SOCKET_CONTEXT* socketContext;
SOCKET_CONTEXT* acceptedContext;
BOOLEAN shouldExit;
CHAR dataBuffer[512];
serverThreadContext = (SERVER_THREAD_CONTEXT*)serverThreadContextPointer;
ioCompletionPort = serverThreadContext->commonContext->ioCompletionPort;
// Boot-strap the loop process.
registrationCount = 0;
// Keep looping, processing notifications until exit has been requested.
while (TRUE) {
AcquireSRWLockExclusive(&serverThreadContext->stateLock);
shouldExit = serverThreadContext->shouldExit;
ReleaseSRWLockExclusive(&serverThreadContext->stateLock);
if (shouldExit) {
goto Exit;
}
AcquireSRWLockExclusive(&g_printLock);
wprintf_s(L"Server thread %d waiting for client action...\r\n", GetCurrentThreadId());
ReleaseSRWLockExclusive(&g_printLock);
// Process notifications and re-register one-shot notifications that were processed on a
// previous iteration.
errorCode = ProcessSocketNotifications(ioCompletionPort,
registrationCount,
(registrationCount == 0) ? NULL : registrationBuffer,
MAX_TIMEOUT,
RTL_NUMBER_OF(notifications),
notifications,
¬ificationCount);
// TODO: Production code should handle failure better. This can fail due to transient memory conditions, or due to
// invalid input such as a bad handle. Retrying in case the memory conditions abate is
// a reasonable strategy.
if (errorCode != ERROR_SUCCESS) {
goto Exit;
}
// Check whether any registrations failed, and attempt to clean up if they did.
errorCode = ERROR_SUCCESS;
for (UINT32 i = 0; i < registrationCount; i += 1) {
registration = ®istrationBuffer[i];
if (registration->registrationResult == ERROR_SUCCESS) {
continue;
}
// Preserve the first failure code.
if (errorCode == ERROR_SUCCESS) {
errorCode = registration->registrationResult;
}
// All the registrations are oneshot, so if the registration failed, then only this thread
// has access to the context. Attempt to clean up fully:
// - The listening socket is owned by the main thread, so ignore that.
// - If the socket hasn't been registered, just free its memory.
// - Otherwise, attempt to deregister it.
socketContext = (SOCKET_CONTEXT*)registration->completionKey;
if (socketContext->socketType == SOCKET_TYPE_LISTENER) {
continue;
}
// Best-effort de-registration. In case of failure, simply get rid of the socket and
// context. This is safe to do because the notification for the socket can't be enabled.
// Either it was never registered in the first place, or re-registration failed, and it
// was previously disabled by nature of being a one-shot registration.
registration->operation = SOCK_NOTIFY_OP_REMOVE;
errorCode = ProcessSocketNotifications(ioCompletionPort,
1,
registration,
0,
0,
NULL,
NULL);
if ((errorCode != ERROR_SUCCESS) ||
(registration->registrationResult != ERROR_SUCCESS)) {
DestroySocketContext(socketContext);
}
}
// Process the notifications. Many will need to be re-enabled because they are one-shot,
// so ensure that we can build that incrementally.
registrationCount = 0;
ZeroMemory(registrationBuffer, sizeof(registrationBuffer));
for (UINT32 i = 0; i < notificationCount; i += 1) {
if (notifications[i].lpCompletionKey == SERVER_EXIT_KEY) {
_ASSERT(serverThreadContext->shouldExit);
// On exit, this thread will post the next exit message.
errorCode = ERROR_SUCCESS;
goto Exit;
}
socketContext = (SOCKET_CONTEXT*)notifications[i].lpCompletionKey;
events = SocketNotificationRetrieveEvents(¬ifications[i]);
// Process the socket notification, taking socket-specific actions.
switch (socketContext->socketType) {
case SOCKET_TYPE_LISTENER:
// Accepting connections in response to notifications implicitly throttles
// the rate at which incoming connections are accepted, and limits scale-out for
// new connection acceptance. Consider using AcceptEx if greater scaling of
//connection acceptance is desired.
// Perform an accept regardless of the notification. The only possible notifications
// are for available connections or error conditions. Any possible error conditions
// will be processed as part of the accept.
errorCode = AcceptConnection(socketContext->socket, &acceptedContext);
if (errorCode == ERROR_SUCCESS) {
// Register the accepted connection.
registration = ®istrationBuffer[registrationCount];
registration->socket = acceptedContext->socket;
registration->completionKey = acceptedContext;
registration->eventFilter = SOCK_NOTIFY_EVENT_IN | SOCK_NOTIFY_EVENT_HANGUP;
registration->operation =
SOCK_NOTIFY_OP_ENABLE;
registration->triggerFlags = SOCK_NOTIFY_TRIGGER_ONESHOT | SOCK_NOTIFY_TRIGGER_LEVEL;
registrationCount += 1;
}
// Re-arm the existing listening socket registration.
registration = ®istrationBuffer[registrationCount];
registration->socket = socketContext->socket;
registration->completionKey = socketContext;
registration->eventFilter = SOCK_NOTIFY_EVENT_IN;
registration->operation =
SOCK_NOTIFY_OP_ENABLE;
registration->triggerFlags = SOCK_NOTIFY_TRIGGER_ONESHOT | SOCK_NOTIFY_TRIGGER_LEVEL;
registrationCount += 1;
break;
case SOCKET_TYPE_ACCEPT:
// The registration was removed. Clean up the context.
if (events & SOCK_NOTIFY_EVENT_REMOVE) {
AcquireSRWLockExclusive(&serverThreadContext->stateLock);
serverThreadContext->deregisterCount += 1;
if (serverThreadContext->deregisterCount >= CLIENT_THREAD_COUNT) {
serverThreadContext->shouldExit = TRUE;
}
ReleaseSRWLockExclusive(&serverThreadContext->stateLock);
DestroySocketContext(socketContext);
continue;
}
registration = ®istrationBuffer[registrationCount];
// If a hangup occurred, then remove the registration.
if (events & SOCK_NOTIFY_EVENT_HANGUP) {
registration->eventFilter = 0;
registration->operation = SOCK_NOTIFY_OP_REMOVE;
}
// Receive data.
if (events & (SOCK_NOTIFY_EVENT_IN | SOCK_NOTIFY_EVENT_ERR)) {
// TODO: Handle errors (for example, due to connection reset). The error from recv can
// be used to retrieve the underlying socket for a SOCK_NOTIFY_EVENT_ERR.
if (recv(socketContext->socket, dataBuffer, sizeof(dataBuffer), 0) < 0) {
registration->operation = SOCK_NOTIFY_OP_REMOVE;
registration->eventFilter = 0;
}
else {
registration->operation |=
SOCK_NOTIFY_OP_ENABLE;
registration->triggerFlags =
SOCK_NOTIFY_TRIGGER_ONESHOT | SOCK_NOTIFY_TRIGGER_LEVEL;
registration->eventFilter = SOCK_NOTIFY_EVENT_IN | SOCK_NOTIFY_EVENT_HANGUP;
}
}
registration->socket = socketContext->socket;
registration->completionKey = socketContext;
registrationCount += 1;
break;
// TODO:
//
// Other (potentially non-socket) I/O completion can be processed here. For instance,
// this could also be processing disk I/O. The contexts will need to have a common
// header that can be used to differentiate between the different context types,
// similar to how the listening and accepted sockets are differentiated.
//
// case ... :
default:
_ASSERT(!"Unexpected socket type!");
errorCode = ERROR_UNIDENTIFIED_ERROR;
goto Exit;
}
}
}
errorCode = ERROR_SUCCESS;
Exit:
// If an error occurred, then ensure the other threads know they should exit.
// TODO: use an error handling strategy that isn't just exiting.
if (errorCode != ERROR_SUCCESS) {
PrintError(errorCode);
CancelServerThreadsAsync(serverThreadContext);
}
// Wake a remaining server thread.
IndicateServerThreadExit(ioCompletionPort);
AcquireSRWLockExclusive(&g_printLock);
wprintf_s(L"Server thread %d exited\r\n", GetCurrentThreadId());
ReleaseSRWLockExclusive(&g_printLock);
return errorCode;
}
VOID MultiThreadedTcpServer() {
DWORD errorCode;
WSADATA wsaData;
SERVER_THREAD_CONTEXT serverContext = { NULL, SRWLOCK_INIT, 0, FALSE };
SOCKET_CONTEXT listenContext = {};
SOCK_NOTIFY_REGISTRATION registration = {};
HANDLE serverThreads[SERVER_THREAD_COUNT] = {};
UINT32 serverThreadCount = 0;
if (WSAStartup(WINSOCK_VERSION, &wsaData) != 0) {
errorCode = GetLastError();
PrintError(errorCode);
return;
}
listenContext.socket = INVALID_SOCKET;
listenContext.socketType = SOCKET_TYPE_LISTENER;
errorCode = CreateServerContext(&serverContext.commonContext);
if (errorCode != ERROR_SUCCESS) {
goto Exit;
}
// Register the listening socket with the I/O completion port so the server threads are notified
// of incoming connections.
listenContext.socket = serverContext.commonContext->listenerSocket;
registration.completionKey = &listenContext;
registration.eventFilter = SOCK_NOTIFY_EVENT_IN;
registration.operation = SOCK_NOTIFY_OP_ENABLE;
registration.triggerFlags = SOCK_NOTIFY_TRIGGER_LEVEL | SOCK_NOTIFY_TRIGGER_PERSISTENT;
registration.socket = listenContext.socket;
errorCode = ProcessSocketNotifications(serverContext.commonContext->ioCompletionPort,
1,
®istration,
0,
0,
NULL,
NULL);
if (errorCode != ERROR_SUCCESS) {
goto Exit;
}
// Create the server threads. These are likely over-subscribed, but the I/O completion port
// ensures that they scale appropriately.
while (serverThreadCount < RTL_NUMBER_OF(serverThreads)) {
serverThreads[serverThreadCount] =
CreateThread(NULL, 0, ServerThreadRoutine, &serverContext, 0, NULL);
if (serverThreads[serverThreadCount] == NULL) {
errorCode = GetLastError();
goto Exit;
}
}
// Create the client threads, which are badly over-subscribed.
for (UINT32 i = 0; i < CLIENT_THREAD_COUNT; i += 1) {
errorCode = CreateClientThread(CLIENT_LOOP_COUNT);
if (errorCode != ERROR_SUCCESS) {
goto Exit;
}
}
errorCode = ERROR_SUCCESS;
Exit:
if (errorCode != ERROR_SUCCESS) {
PrintError(errorCode);
// In case of error, ensure that all server threads know to exit.
if (serverContext.commonContext != NULL) {
CancelServerThreadsAsync(&serverContext);
IndicateServerThreadExit(serverContext.commonContext->ioCompletionPort);
}
}
if (serverThreadCount > 0) {
wprintf_s(L"Waiting for %d server threads to exit...\r\n", serverThreadCount);
errorCode = WaitForMultipleObjects(serverThreadCount, serverThreads, TRUE, INFINITE);
_ASSERT(errorCode == ERROR_SUCCESS);
}
// TODO: In case of failure, clean up remaining state. For example, Accepted connections can be kept in
// a global list, which can be closed from this thread.
for (UINT32 i = 0; i < serverThreadCount; i += 1) {
CloseHandle(serverThreads[i]);
}
DestroyServerContext(serverContext.commonContext);
WSACleanup();
}