Win wsa select event (#342)

* Fix #323: Missing SelectInterrupt implementation for Windows

Using WSAEventSelect, WSAWaitForMultipleEvents and WSAEnumNetworkEvents to emulate poll() with an interrupt-event.

* Cleanup

* Fixed incomplete comment.

* Switched ifdefs to support other Unixes with pipe file descriptors

* Fixed: SelectInterrupt fallback code for getFd()==-1 && getEvent()==nullptr converted a PollResultType::Timeout into a ReadyForRead causing the HttpClient to fail because it uses a hard-coded "SelectInterrupt" instance that doesn't implement getFd() and getEvent().

* Fixed gcc compile errors

* - HttpClient now uses the SelectInterruptFactory
- Fixed wrong ix::poll result when using Windows WSA functions

* We must deselect the networkevents from the socket event. Otherwise the socket will report states that aren't there.
This commit is contained in:
Andreas Hausladen 2022-01-05 19:21:33 +01:00 committed by GitHub
parent 9f00428d57
commit 1f2895a469
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 400 additions and 70 deletions

View File

@ -41,6 +41,7 @@ set( IXWEBSOCKET_SOURCES
ixwebsocket/IXSelectInterrupt.cpp
ixwebsocket/IXSelectInterruptFactory.cpp
ixwebsocket/IXSelectInterruptPipe.cpp
ixwebsocket/IXSelectInterruptEvent.cpp
ixwebsocket/IXSetThreadName.cpp
ixwebsocket/IXSocket.cpp
ixwebsocket/IXSocketConnect.cpp
@ -80,6 +81,7 @@ set( IXWEBSOCKET_HEADERS
ixwebsocket/IXSelectInterrupt.h
ixwebsocket/IXSelectInterruptFactory.h
ixwebsocket/IXSelectInterruptPipe.h
ixwebsocket/IXSelectInterruptEvent.h
ixwebsocket/IXSetThreadName.h
ixwebsocket/IXSocket.h
ixwebsocket/IXSocketConnect.h

View File

@ -7,6 +7,9 @@
#include "IXNetSystem.h"
#include <cstdint>
#include <cstdio>
#ifdef _WIN32
#include <vector>
#endif
namespace ix
{
@ -37,6 +40,51 @@ namespace ix
#endif
}
#ifdef _WIN32
struct WSAEvent
{
public:
WSAEvent(struct pollfd* fd)
: _fd(fd)
{
_event = WSACreateEvent();
}
WSAEvent(WSAEvent&& source) noexcept
{
_event = source._event;
source._event = WSA_INVALID_EVENT; // invalidate the event in the source
_fd = source._fd;
}
~WSAEvent()
{
if (_event != WSA_INVALID_EVENT)
{
// We must deselect the networkevents from the socket event. Otherwise the
// socket will report states that aren't there.
if (_fd != nullptr && _fd->fd != -1)
WSAEventSelect(_fd->fd, _event, 0);
WSACloseEvent(_event);
}
}
operator HANDLE()
{
return _event;
}
operator struct pollfd*()
{
return _fd;
}
private:
HANDLE _event;
struct pollfd* _fd;
};
#endif
//
// That function could 'return WSAPoll(pfd, nfds, timeout);'
// but WSAPoll is said to have weird behaviors on the internet
@ -44,9 +92,118 @@ namespace ix
//
// So we make it a select wrapper
//
int poll(struct pollfd* fds, nfds_t nfds, int timeout)
// UPDATE: WSAPoll was fixed in Windows 10 Version 2004
//
// The optional "event" is set to nullptr if it wasn't signaled.
int poll(struct pollfd* fds, nfds_t nfds, int timeout, void** event)
{
#ifdef _WIN32
if (event && *event)
{
HANDLE interruptEvent = reinterpret_cast<HANDLE>(*event);
*event = nullptr; // the event wasn't signaled yet
if (nfds < 0 || nfds >= MAXIMUM_WAIT_OBJECTS - 1)
{
WSASetLastError(WSAEINVAL);
return SOCKET_ERROR;
}
std::vector<WSAEvent> socketEvents;
std::vector<HANDLE> handles;
// put the interrupt event as first element, making it highest priority
handles.push_back(interruptEvent);
// create the WSAEvents for the sockets
for (nfds_t i = 0; i < nfds; ++i)
{
struct pollfd* fd = &fds[i];
fd->revents = 0;
if (fd->fd >= 0)
{
// create WSAEvent and add it to the vectors
socketEvents.push_back(std::move(WSAEvent(fd)));
HANDLE handle = socketEvents.back();
if (handle == WSA_INVALID_EVENT)
{
WSASetLastError(WSAENOBUFS);
return SOCKET_ERROR;
}
handles.push_back(handle);
// mapping
long networkEvents = 0;
if (fd->events & (POLLIN )) networkEvents |= FD_READ | FD_ACCEPT;
if (fd->events & (POLLOUT /*| POLLWRNORM | POLLWRBAND*/)) networkEvents |= FD_WRITE | FD_CONNECT;
//if (fd->events & (POLLPRI | POLLRDBAND )) networkEvents |= FD_OOB;
if (WSAEventSelect(fd->fd, handle, networkEvents) != 0)
{
fd->revents = POLLNVAL;
socketEvents.pop_back();
handles.pop_back();
}
}
}
DWORD n = WSAWaitForMultipleEvents(handles.size(), handles.data(), FALSE, timeout != -1 ? static_cast<DWORD>(timeout) : WSA_INFINITE, FALSE);
if (n == WSA_WAIT_FAILED) return SOCKET_ERROR;
if (n == WSA_WAIT_TIMEOUT) return 0;
if (n == WSA_WAIT_EVENT_0)
{
// the interrupt event was signaled
*event = reinterpret_cast<void*>(interruptEvent);
return 1;
}
int handleIndex = n - WSA_WAIT_EVENT_0;
int socketIndex = handleIndex - 1;
WSANETWORKEVENTS netEvents;
int count = 0;
// WSAWaitForMultipleEvents returns the index of the first signaled event. And to emulate WSAPoll()
// all the signaled events must be processed.
while (socketIndex < socketEvents.size())
{
struct pollfd* fd = socketEvents[socketIndex];
memset(&netEvents, 0, sizeof(netEvents));
if (WSAEnumNetworkEvents(fd->fd, socketEvents[socketIndex], &netEvents) != 0)
{
fd->revents = POLLERR;
}
else if (netEvents.lNetworkEvents != 0)
{
// mapping
if (netEvents.lNetworkEvents & (FD_READ | FD_ACCEPT | FD_OOB)) fd->revents |= POLLIN;
if (netEvents.lNetworkEvents & (FD_WRITE | FD_CONNECT )) fd->revents |= POLLOUT;
for (int i = 0; i < FD_MAX_EVENTS; ++i)
{
if (netEvents.iErrorCode[i] != 0)
{
fd->revents |= POLLERR;
break;
}
}
if (fd->revents != 0)
{
// only signaled sockets count
count++;
}
}
socketIndex++;
}
return count;
}
else
{
if (event && *event) *event = nullptr;
socket_t maxfd = 0;
fd_set readfds, writefds, errorfds;
FD_ZERO(&readfds);
@ -104,9 +261,11 @@ namespace ix
fd->revents |= POLLERR;
}
}
return ret;
}
#else
if (event && *event) *event = nullptr;
//
// It was reported that on Android poll can fail and return -1 with
// errno == EINTR, which should be a temp error and should typically

View File

@ -78,7 +78,7 @@ namespace ix
bool initNetSystem();
bool uninitNetSystem();
int poll(struct pollfd* fds, nfds_t nfds, int timeout);
int poll(struct pollfd* fds, nfds_t nfds, int timeout, void** event);
const char* inet_ntop(int af, const void* src, char* dst, socklen_t size);
int inet_pton(int af, const char* src, void* dst);

View File

@ -45,4 +45,9 @@ namespace ix
{
return -1;
}
void* SelectInterrupt::getEvent() const
{
return nullptr;
}
} // namespace ix

View File

@ -24,6 +24,7 @@ namespace ix
virtual bool clear();
virtual uint64_t read();
virtual int getFd() const;
virtual void* getEvent() const;
// Used as special codes for pipe communication
static const uint64_t kSendRequest;

View File

@ -0,0 +1,84 @@
/*
* IXSelectInterruptEvent.cpp
*/
//
// On Windows we use a Windows Event to wake up ix::poll() (WSAWaitForMultipleEvents).
// And on any other platform that doesn't support pipe file descriptors we
// emulate the interrupt event by using a short timeout with ix::poll() and
// read from the SelectInterrupt. (see Socket::poll() "Emulation mode")
//
#include "IXSelectInterruptEvent.h"
namespace ix
{
SelectInterruptEvent::SelectInterruptEvent()
{
#ifdef _WIN32
_event = CreateEvent(NULL, TRUE, FALSE, NULL);
#endif
}
SelectInterruptEvent::~SelectInterruptEvent()
{
#ifdef _WIN32
CloseHandle(_event);
#endif
}
bool SelectInterruptEvent::init(std::string& /*errorMsg*/)
{
return true;
}
bool SelectInterruptEvent::notify(uint64_t value)
{
std::lock_guard<std::mutex> lock(_valuesMutex);
// WebSocket implementation detail: We only need one of the values in the queue
if (std::find(_values.begin(), _values.end(), value) == _values.end())
_values.push_back(value);
#ifdef _WIN32
SetEvent(_event); // wake up
#endif
return true;
}
uint64_t SelectInterruptEvent::read()
{
std::lock_guard<std::mutex> lock(_valuesMutex);
if (_values.size() > 0)
{
uint64_t value = _values.front();
_values.pop_front();
#ifdef _WIN32
// signal the event if there is still data in the queue
if (_values.size() == 0)
ResetEvent(_event);
#endif
return value;
}
return 0;
}
bool SelectInterruptEvent::clear()
{
std::lock_guard<std::mutex> lock(_valuesMutex);
_values.clear();
#ifdef _WIN32
ResetEvent(_event);
#endif
return true;
}
void* SelectInterruptEvent::getEvent() const
{
#ifdef _WIN32
return reinterpret_cast<void*>(_event);
#else
return nullptr;
#endif
}
} // namespace ix

View File

@ -0,0 +1,39 @@
/*
* IXSelectInterruptEvent.h
*/
#pragma once
#include "IXSelectInterrupt.h"
#include <mutex>
#include <stdint.h>
#include <string>
#include <deque>
#ifdef _WIN32
#include <windows.h>
#endif
namespace ix
{
class SelectInterruptEvent final : public SelectInterrupt
{
public:
SelectInterruptEvent();
virtual ~SelectInterruptEvent();
bool init(std::string& /*errorMsg*/) final;
bool notify(uint64_t value) final;
bool clear() final;
uint64_t read() final;
void* getEvent() const final;
private:
// contains every value only once, new values are inserted at the begin, nu
std::deque<uint64_t> _values;
std::mutex _valuesMutex;
#ifdef _WIN32
// Windows Event to wake up the socket poll
HANDLE _event;
#endif
};
} // namespace ix

View File

@ -7,20 +7,20 @@
#include "IXSelectInterruptFactory.h"
#include "IXUniquePtr.h"
#if defined(__linux__) || defined(__APPLE__)
#include "IXSelectInterruptPipe.h"
#if _WIN32
#include "IXSelectInterruptEvent.h"
#else
#include "IXSelectInterrupt.h"
#include "IXSelectInterruptPipe.h"
#endif
namespace ix
{
SelectInterruptPtr createSelectInterrupt()
{
#if defined(__linux__) || defined(__APPLE__)
return ix::make_unique<SelectInterruptPipe>();
#ifdef _WIN32
return ix::make_unique<SelectInterruptEvent>();
#else
return ix::make_unique<SelectInterrupt>();
return ix::make_unique<SelectInterruptPipe>();
#endif
}
} // namespace ix

View File

@ -47,6 +47,8 @@ namespace ix
int sockfd,
const SelectInterruptPtr& selectInterrupt)
{
PollResultType pollResult = PollResultType::ReadyForRead;
//
// We used to use ::select to poll but on Android 9 we get large fds out of
// ::connect which crash in FD_SET as they are larger than FD_SETSIZE. Switching
@ -68,9 +70,11 @@ namespace ix
// File descriptor used to interrupt select when needed
int interruptFd = -1;
void* interruptEvent = nullptr;
if (selectInterrupt)
{
interruptFd = selectInterrupt->getFd();
interruptEvent = selectInterrupt->getEvent();
if (interruptFd != -1)
{
@ -78,11 +82,21 @@ namespace ix
fds[1].fd = interruptFd;
fds[1].events = POLLIN;
}
else if (interruptEvent == nullptr)
{
// Emulation mode: SelectInterrupt neither supports file descriptors nor events
// Check the selectInterrupt for requests before doing the poll().
if (readSelectInterruptRequest(selectInterrupt, &pollResult))
{
return pollResult;
}
}
}
int ret = ix::poll(fds, nfds, timeoutMs);
void* event = interruptEvent; // ix::poll will set event to nullptr if it wasn't signaled
int ret = ix::poll(fds, nfds, timeoutMs, &event);
PollResultType pollResult = PollResultType::ReadyForRead;
if (ret < 0)
{
pollResult = PollResultType::Error;
@ -90,19 +104,18 @@ namespace ix
else if (ret == 0)
{
pollResult = PollResultType::Timeout;
}
else if (interruptFd != -1 && fds[1].revents & POLLIN)
if (selectInterrupt && interruptFd == -1 && interruptEvent == nullptr)
{
uint64_t value = selectInterrupt->read();
// Emulation mode: SelectInterrupt neither supports fd nor events
if (value == SelectInterrupt::kSendRequest)
{
pollResult = PollResultType::SendRequest;
// Check the selectInterrupt for requests
readSelectInterruptRequest(selectInterrupt, &pollResult);
}
else if (value == SelectInterrupt::kCloseRequest)
{
pollResult = PollResultType::CloseRequest;
}
else if ((interruptFd != -1 && fds[1].revents & POLLIN) || (interruptEvent != nullptr && event != nullptr))
{
// The InterruptEvent was signaled
readSelectInterruptRequest(selectInterrupt, &pollResult);
}
else if (sockfd != -1 && readyToRead && fds[0].revents & POLLIN)
{
@ -143,6 +156,25 @@ namespace ix
return pollResult;
}
bool Socket::readSelectInterruptRequest(const SelectInterruptPtr& selectInterrupt,
PollResultType* pollResult)
{
uint64_t value = selectInterrupt->read();
if (value == SelectInterrupt::kSendRequest)
{
*pollResult = PollResultType::SendRequest;
return true;
}
else if (value == SelectInterrupt::kCloseRequest)
{
*pollResult = PollResultType::CloseRequest;
return true;
}
return false;
}
PollResultType Socket::isReadyToRead(int timeoutMs)
{
if (_sockfd == -1)
@ -171,6 +203,11 @@ namespace ix
return _selectInterrupt->notify(wakeUpCode);
}
bool Socket::isWakeUpFromPollSupported()
{
return _selectInterrupt->getFd() != -1 || _selectInterrupt->getEvent() != nullptr;
}
bool Socket::accept(std::string& errMsg)
{
if (_sockfd == -1)

View File

@ -43,6 +43,7 @@ namespace ix
// Functions to check whether there is activity on the socket
PollResultType poll(int timeoutMs = kDefaultPollTimeout);
bool wakeUpFromPoll(uint64_t wakeUpCode);
bool isWakeUpFromPollSupported();
PollResultType isReadyToWrite(int timeoutMs);
PollResultType isReadyToRead(int timeoutMs);
@ -83,6 +84,9 @@ namespace ix
std::atomic<int> _sockfd;
std::mutex _socketMutex;
static bool readSelectInterruptRequest(const SelectInterruptPtr& selectInterrupt,
PollResultType* pollResult);
private:
static const int kDefaultPollTimeout;
static const int kDefaultPollNoTimeout;

View File

@ -20,6 +20,7 @@
#include <linux/in.h>
#include <linux/tcp.h>
#endif
#include <ixwebsocket/IXSelectInterruptFactory.h>
namespace ix
{
@ -66,7 +67,7 @@ namespace ix
int timeoutMs = 10;
bool readyToRead = false;
auto selectInterrupt = ix::make_unique<SelectInterrupt>();
SelectInterruptPtr selectInterrupt = ix::createSelectInterrupt();
PollResultType pollResult = Socket::poll(readyToRead, timeoutMs, fd, selectInterrupt);
if (pollResult == PollResultType::Timeout)

View File

@ -297,13 +297,11 @@ namespace ix
lastingTimeoutDelayInMs = (1000 * _pingIntervalSecs) - timeSinceLastPingMs;
}
#ifdef _WIN32
// Windows does not have select interrupt capabilities, so wait with a small timeout
if (lastingTimeoutDelayInMs <= 0)
// The platform may not have select interrupt capabilities, so wait with a small timeout
if (lastingTimeoutDelayInMs <= 0 && !_socket->isWakeUpFromPollSupported())
{
lastingTimeoutDelayInMs = 20;
}
#endif
// If we are requesting a cancellation, pass in a positive and small timeout
// to never poll forever without a timeout.