Пролог
Недавно я столкнулся с необходимостью эффективной работы с сокетами в Windows приложении. Задача типичная для нагруженного сервера. Нетипичным тут будет казаться только язык реализации — Delphi.
Я хочу описать способ массовой асинхронной работы с большим количеством сокетов с использованием I/O Completion Ports. Microsoft в своей литературе рекомендует использовать именно эту технологию. Я думаю, многие с ней знакомы, но на всякий случай укажу ссылку на MSDN. Суть технологии в том, что система организует высокоэффективную очередь событий, а программа обрабатывает её из тред-пула, размер которого подобран по количеству вычислительных ядер. Данный подход имеет преимущества при большом количестве одновременно производимых асинхронных операций ввода вывода для разных конечных точек. Готовый исходник можно (лучше) сразу гляуть здесь. Не всё идеально, но для эксперементов сойдёт.
Roadmap
Я, в некотором смысле, буду придерживаться идеологии Node.Js во всём, что касается организации объектов и операций ввода вывода.
В случае с серверной частью понадобиться реализовать следующее:
- Прослушивание сокета. Принятием или отклонением новых соединений.
- Отслеживание сигнала закрытия клиентских сокетов.
Для клиента первый пункт этого списка не актуален, но необходимо реализовать асинхронное подключение к серверу. В обоих классах будет возможность одновременного и чтения и записи на одну конечную точку.
Все созданные экземпляры клиентских и серверных сокетов будут использовать одну общую очередь сообщений и один тред-пул. Это нужно для возможности использовать оба типа сокетов в одном приложении оптимальным образом.
Реализация
Приступим. Для начала отмечу, что в связи с абсолютно асинхронной событийной моделью построения я буду реализовывать не классы а интерфейсы. Это очень удобно в данном случае, так как с конечного программиста снимается ответственность за выделенную память. Да и вообще, отследить тут её использование другим способом либо очень затратно либо вовсе невозможно. Очень много работы должно происходить при инициализации модуля.
- Создание списков сокетов разных типов.
- Инициализация подсистемы сокетов.
- Создание очереди сообщений.
- Создание пула для обработки очереди.
- Создание событий для сокетов.
- Создание потоков отслеживающих сокетные события( например подключение нового клиента).
И так, секция инициализации содержит следующую процедуру, которая реализует список пункт за пунктом.
procedure Init;
var
WSAData: TWsaData;
i: Integer;
begin
gClients := TProtoStore.Create;
gListeners := TProtoStore.Create;
gServerClients := TProtoStore.Create;
if WSAStartup(MAKEWORD(2, 2), WSAData) <> 0 then
raise IOCPClientException.Create(sErrorInit_WSAtartup);
gIOCP := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, CPUCount * 2);
if gIOCP = INVALID_HANDLE_VALUE then
raise IOCPClientException.Create(sErrorInit_CreateIoCompletionPort);
for i := 1 to CPUCount * 2 do
begin
SetLength(gWorkers, Length(gWorkers) + 1);
gWorkers[Length(gWorkers) - 1] := TWorkerThread.Create();
end;
gListenerAcceptEvent := WSACreateEvent;
if gListenerAcceptEvent = WSA_INVALID_EVENT then
raise IOCPClientException.Create(sErrorInit_WSACreateEvent);
gServerClientsCloseEvent := WSACreateEvent;
if gServerClientsCloseEvent = WSA_INVALID_EVENT then
raise IOCPClientException.Create(sErrorInit_WSACreateEvent);
gClisentsConnectAndCloseEvents := WSACreateEvent;
if gClisentsConnectAndCloseEvents = WSA_INVALID_EVENT then
raise IOCPClientException.Create(sErrorInit_WSACreateEvent);
gClientSocketEventThread := TSocketEventThread.Create
(gClisentsConnectAndCloseEvents, gClients, ET_EVENT_SIGNALED);
gClientSocketEventThread.Start;
gServerClientsSocketEventThread := TSocketEventThread.Create
(gServerClientsCloseEvent, gServerClients, ET_EVENT_SIGNALED);
gServerClientsSocketEventThread.Start;
gServerSocketEventThread := TSocketEventThread.Create(gListenerAcceptEvent,
gListeners, ET_EVENT_SIGNALED);
gServerSocketEventThread.Start;
end;
Функция CreateIoCompletionPort в данном случае выполняет создание специальной очереди сообщений.
Видно, что для отслеживания событий на сокетах с разным назначением используется один и тот же класс потока TSocketEventThread. Потоки этого класса выполняют процедуру, которая ожидает сокетные события, и сразу же ставят в очередь сообщения (для каждого сокета относящегося к типу, который облуживает этот поток) о том что произошло какое-то событие.
procedure TSocketEventThread.WaitForClientsEvents;
var
WaitResult: DWORD;
const
TimeOut: DWORD = 100;
begin
WaitResult := WSAWaitForMultipleEvents(1, <hh user=fEvent>, FALSE, TimeOut, FALSE);
if WaitResult = WSA_WAIT_FAILED then
raise IOCPClientException.Create
(sErrorWaitForClientsEvents_WSAWaitForMultipleEvents);
if WaitResult = WSA_WAIT_EVENT_0 then
begin
if not WSAResetEvent(fEvent) then
raise IOCPClientException.Create
(sErrorWaitForClientsEvents_WSAResetEvent);
fStore.Post(fKey);
end;
end;
Тут метод fStore.Post(fKey); как раз и выполняет отправку сообщений в очередь.
procedure TProtoStore.Post(CompletionKey: DWORD);
var
i: Integer;
begin
fLock.Enter;
try
for i := 0 to Length(ProtoArray) - 1 do
begin
ProtoArray[i]._AddRef;
if not PostQueuedCompletionStatus(gIOCP, 0, CompletionKey,
POverlapped(ProtoArray[i])) then
begin
ProtoArray[i]._Release;
raise IOCPClientException.Create(sErrorPost_PostQueuedCompletionStatus);
end;
end;
finally
fLock.Leave;
end;
end;
Особое внимание заслуживает тут использование объектов с интерфейсами.
Метод _AddRef используется для того, чтобы обозначить тот факт, что объект «находится в очереди» и его не следует уничтожать. (Позже после обработки будет вызван _Release). Процедура PostQueuedCompletionStatus непосредственно выполняет постановку сообщения в очередь.
Пул обработает каждое сообщение в асинхронном режиме.
Для этого он выполняет следующую процедуру.
procedure TWorkerThread.ProcessIOCP;
var
NumberOfBytes: DWORD;
CompletionKey: NativeUInt;
Overlapped: POverlapped;
Proto: TIOCPSocketProto;
begin
if not((not GetQueuedCompletionStatus(gIOCP, NumberOfBytes, CompletionKey,
Overlapped, INFINITE)) and (Overlapped = nil)) then
begin
if CompletionKey = ET_EVENT_SIGNALED then
begin
Proto := TIOCPSocketProto(Overlapped);
with Proto do
begin
IOCPProcessEventsProc();
_Release;
end
end
else if CompletionKey <> 0 then
begin
Proto := TIOCPSocketProto(CompletionKey);
if Proto.IOCPProcessIOProc(NumberOfBytes, Overlapped) then
Proto._Release;
end;
end
end;
Процедура GetQueuedCompletionStatus служит для получения сообщения из очереди. Далее определяется является ли это сообщение сообщением о завершенном вводе/выводе или это сообщение о произошедшем событии. Тут продемонстрированы два способа передать через очередь какую-то информацию, в данном случае это ссылка на конкретный экземпляр класса сокетов.
Обработка ведётся унифицировано для всех типов сокетов, это достигнуто с помощью наследования от общего предка который содержит общие обработчики, допускается их переопределение.
Рассмотрим механизм обработки сокетных событий.
procedure TIOCPSocketProto.IOCPProcessEventsProc();
var
WSAEvents: TWsaNetworkEvents;
AcceptedSocket: TSocket;
RemoteAddress: string;
begin
if fStateLock <> CLI_SOCKET_LOCK_CLOSED then
begin
fClosingLock.BeginRead;
try
if (fStateLock <> CLI_SOCKET_LOCK_CLOSED) then
if WSAEnumNetworkEvents(fSocket, 0, WSAEvents) <> SOCKET_ERROR then
begin
if ((WSAEvents.lNetworkEvents and FD_CONNECT) <> 0) then
begin
if 0 = WSAEvents.iErrorCode[FD_CONNECT_BIT] then
InterlockedExchange(fStateLock, CLI_SOCKET_LOCK_CONNECTED);
CallOnConnect;
end;
if ((WSAEvents.lNetworkEvents and FD_CLOSE) <> 0) and
(0 = WSAEvents.iErrorCode[FD_CLOSE_BIT]) then
CallOnClose;
if ((WSAEvents.lNetworkEvents and FD_ACCEPT) <> 0) and
(0 = WSAEvents.iErrorCode[FD_ACCEPT_BIT]) then
begin
AcceptedSocket := DoAccept(RemoteAddress);
if AcceptedSocket <> INVALID_SOCKET then
begin
fClientClass.Create(AcceptedSocket, fOnConnect, fOnClose,
RemoteAddress).Prepare;
end;
end;
end
finally
fClosingLock.EndRead;
end;
end;
end;
Здесь интересно применён класс TMultiReadExclusiveWriteSynchronizer. Он используется для предотвращения попытки закрыть сокет и уничтожить объект из другой нити пула (fClosingLock.BeginRead). Все операции с сокетом проходят как операции чтения для этого объекта синхронизации, кроме операции создания и операции закрытия сокета — они являются операциями записи и потому могут выполняться только при монопольном владении ресурсом.
Во всём же остальном работа с сокетами в данной процедуре совершенно обыкновенная.
Единственное что в этой процедуре стоит рассмотреть дополнительно — это подключение нового клиента к серверу, метод DoAccept.
function TIOCPSocketProto.DoAccept(var RemoteAddress: string): TSocket;
var
addr: TSockAddr;
addrlen: Integer;
dwCallbackData: NativeUInt;
RemoteAddrLen: DWORD;
begin
dwCallbackData := NativeUInt(self);
addrlen := SizeOf(addr);
Result := WSAAccept(fSocket, <hh user=addr>, <hh user=addrlen>, ServerAcceptCallBack,
dwCallbackData);
if Result <> INVALID_SOCKET then
begin
SetLength(RemoteAddress, 255);
RemoteAddrLen := Length(RemoteAddress);
if WSAAddressToString(addr, addrlen, nil, PChar(<hh user=RemoteAddress>[1]),
RemoteAddrLen) = SOCKET_ERROR then
raise IOCPClientException.Create(sErrorAccept_WSAAddressToString);
SetLength(RemoteAddress, RemoteAddrLen - 1)
end
end;
Здесь ключевым моментом является использование WSAAccept. Эта функция позволяет отклонять подключение клиентов таким образом, что клиент на самом деле получает событие FD_CONNECT.
Это предочтительный путь для организации так называемых чёрных списков.
Идём далее. Расмотрим организацию ввода вывода. Сделаем это на примере операции чтения.
procedure TIOCPSocketProto.Read(Length: DWORD;
OnRead, OnReadProcess: TOnReadEvent);
var
Bytes, Flags: DWORD;
WsaBuf: TWsaBuf;
begin
fClosingLock.BeginRead;
try
if fStateLock = CLI_SOCKET_LOCK_CONNECTED then
begin
if InterlockedCompareExchange(fReadLock, IO_PROCESS, IO_IDLE) = IO_IDLE
then
begin
fOnRead := OnRead;
fOnReadProcess := OnReadProcess;
fReaded := 0;
fReadBufLength := Length;
fReadBuffer := nil;
GetMem(fReadBuffer, Length);
if fReadBuffer <> nil then
begin
Bytes := 0;
FillChar(fOverlappedRead, SizeOf(fOverlappedRead), 0);
WsaBuf.buf := fReadBuffer;
WsaBuf.len := fReadBufLength;
Flags := 0;
Bytes := 0;
_AddRef;
if (WSARecv(fSocket, <hh user=WsaBuf>, 1, Bytes, Flags, <hh user=fOverlappedRead>, nil)
= SOCKET_ERROR) and (WSAGetLastError <> WSA_IO_PENDING) then
begin
FreeMem(fReadBuffer, Length);
InterlockedExchange(fReadLock, IO_IDLE);
_Release;
raise IOCPClientException.Create(sErrorRead_WSARecv);
end;
end
else
raise IOCPClientException.Create(sErrorRead_GetMem);
end
else
raise IOCPClientException.Create(sErrorRead_InProcess);
end
else
raise IOCPClientException.Create(sErrorRead_NotConnected);
finally
fClosingLock.EndRead;
end;
end;
Здесь пришлось использовать интерлокед блокировку, т.к. она очень быстрая и удовлетворяет потребность в отсечении попытки повторного вызова опрации ввода/вывода. Память выделяется под буфер единажды в каждой операции. Далее вызывается чтение из сокета в асинхронном режиме. Объект также «помечается» с помощью AddRef, для невозможности его удаления во время нахождения в очереди. По завершении вычитывания пакета сообщения об этом автоматически выставляется в очередь.
Рассмотрим, что происходит при выборке из очереди сообщения о завершенном вводе/выводе.
function TIOCPSocketProto.IOCPProcessIOProc(NumberOfBytes: DWORD;
Overlapped: POverlapped): Boolean;
var
Bytes, Flags: DWORD;
WsaBuf: TWsaBuf;
begin
Result := FALSE;
fClosingLock.BeginRead;
try
if Overlapped = <hh user=fOverlappedRead> then
begin
if NumberOfBytes <> 0 then
begin
if fReadLock = IO_PROCESS then
begin
inc(fReaded, NumberOfBytes);
if fReaded < fReadBufLength then
begin
CallOnReadProcess;
WsaBuf.buf := fReadBuffer;
inc(WsaBuf.buf, fReaded);
WsaBuf.len := fReadBufLength;
dec(WsaBuf.len, fReaded);
Flags := 0;
Bytes := 0;
if (WSARecv(fSocket, <hh user=WsaBuf>, 1, Bytes, Flags, <hh user=fOverlappedRead>,
nil) = SOCKET_ERROR) and (WSAGetLastError <> WSA_IO_PENDING) then
begin
CallOnRead;
Result := True;
end
end
else
begin
CallOnReadProcess;
CallOnRead;
Result := True;
end;
end
end
else
begin
CallOnRead;
Result := True;
end;
end
else if Overlapped = <hh user=fOverlappedWrite> then
begin
if NumberOfBytes <> 0 then
begin
if fWriteLock = IO_PROCESS then
begin
inc(fWrited, NumberOfBytes);
if fWrited < fWriteBufLength then
begin
CallOnWriteProcess;
WsaBuf.buf := fWriteBuffer;
inc(WsaBuf.buf, fWrited);
WsaBuf.len := fWriteBufLength;
dec(WsaBuf.len, fWrited);
Flags := 0;
Bytes := 0;
if (WSASend(fSocket, <hh user=WsaBuf>, 1, Bytes, Flags, <hh user=fOverlappedWrite>,
nil) = SOCKET_ERROR) and (WSAGetLastError <> WSA_IO_PENDING) then
begin
CallOnWrite;
Result := True;
end
end
else
begin
CallOnWriteProcess;
CallOnWrite;
Result := True;
end;
end
end
else
begin
CallOnWrite;
Result := True;
end;
end
finally
fClosingLock.EndRead;
end;
end;
Суть этой процедуры в том, что она вызывает чтение или запись в сокет до того момента, когда выделенный буфер не окажется заполненным. Интересный момент в данном случае, это определение типа операции по ссылке на оверлапед структуру. Эту ссылку предоставляет очередь и необходимо лишь сравнить её с соответсвующими полями класса, в которых храняться структуры для чтения и записи.
Так же примечательно, то что если операция чтения/записи выполнилась мгновенно, то она всё равно попадает в очередь, однако это можно настроить через апи.
Стоит так же рассмотреть создание класса сокета и внедрение в очередь.
constructor TIOCPClientSocket.Create(RemoteAddress: string;
OnConnect, OnClose: TOnSimpleSocketEvenet);
var
lRemoteAddress: TSockAddr;
lRemoteAddressLength: Integer;
begin
inherited Create();
fStore := gClients;
fOnConnect := OnConnect;
fOnClose := OnClose;
fStateLock := 0;
fRemoteAddressStr := RemoteAddress;
fSocket := WSASocket(AF_INET, SOCK_STREAM, 0, nil, 0, WSA_FLAG_OVERLAPPED);
if fSocket = INVALID_SOCKET then
raise IOCPClientException.Create(sErrorTIOCPClientSocket_WSASocket);
if (WSAEventSelect(fSocket, gClisentsConnectAndCloseEvents,
FD_CONNECT or FD_CLOSE) = SOCKET_ERROR) then
raise IOCPClientException.Create(sErrorTIOCPClientSocket_WSAEventSelect);
if CreateIoCompletionPort(fSocket, gIOCP, NativeUInt(self), 0) = 0 then
raise IOCPClientException.Create
(sErrorTIOCPClientSocket_CreateIoCompletionPort);
fStateLock := CLI_SOCKET_LOCK_CREATED;
fStore.Add(self);
lRemoteAddressLength := SizeOf(lRemoteAddress);
lRemoteAddress.sa_family := AF_INET;
if WSAStringToAddress(PChar(<hh user=fRemoteAddressStr>[1]), AF_INET, nil,
lRemoteAddress, lRemoteAddressLength) = SOCKET_ERROR then
raise IOCPClientException.Create
(sErrorTIOCPClientSocket_WSAStringToAddress);
if (WSAConnect(fSocket, lRemoteAddress, lRemoteAddressLength, nil, nil, nil,
nil) = SOCKET_ERROR) and (WSAGetLastError <> WSAEWOULDBLOCK) then
raise IOCPClientException.Create(sErrorTIOCPClientSocket_WSAConnect);
end;
В конструкторе клиентского сокета создаётся непосрественно сокет (WSASocket), регестрируется в очереди (CreateIoCompletionPort), асоциируется с событием и вызывает асинхронную функцию подключения(WSAConnect). Сам факт подключения ожидается в потоке который был рассмотрен первым(поток ожидания событий в сокетах). Тот в свою очередь поставит это событие в очередь.
Эпилог
В данной статье кратко рассмотрены, на мой взгляд, удачные приёмы создания классов для событийного программирования.
Удалось создать класс для выскопроизводительной работы с сокетами для Delphi. Тема эта освещена в целом крайне слабо и я планирую продолжить эту публикацию ещё 2 — 3 постами по темам контекстов сокетов при использовании интерфейсов и создание защищённых соединений при использовании IOCP (криптопровайдеры и Winsock Secure Socket Extensions). Полный код примера здесь.
Автор: Mr_Developer