From 1f2895a4693b11176df1c410c3b9c75208f9987e Mon Sep 17 00:00:00 2001 From: Andreas Hausladen Date: Wed, 5 Jan 2022 19:21:33 +0100 Subject: [PATCH] Win wsa select event (#342) * Fix #323: Missing SelectInterrupt implementation for Windows Using WSAEventSelect, WSAWaitForMultipleEvents and WSAEnumNetworkEvents to emulate poll() with an interrupt-event. * Cleanup * Fixed incomplete comment. * Switched ifdefs to support other Unixes with pipe file descriptors * Fixed: SelectInterrupt fallback code for getFd()==-1 && getEvent()==nullptr converted a PollResultType::Timeout into a ReadyForRead causing the HttpClient to fail because it uses a hard-coded "SelectInterrupt" instance that doesn't implement getFd() and getEvent(). * Fixed gcc compile errors * - HttpClient now uses the SelectInterruptFactory - Fixed wrong ix::poll result when using Windows WSA functions * We must deselect the networkevents from the socket event. Otherwise the socket will report states that aren't there. --- CMakeLists.txt | 2 + ixwebsocket/IXNetSystem.cpp | 249 +++++++++++++++++++---- ixwebsocket/IXNetSystem.h | 2 +- ixwebsocket/IXSelectInterrupt.cpp | 5 + ixwebsocket/IXSelectInterrupt.h | 1 + ixwebsocket/IXSelectInterruptEvent.cpp | 84 ++++++++ ixwebsocket/IXSelectInterruptEvent.h | 39 ++++ ixwebsocket/IXSelectInterruptFactory.cpp | 12 +- ixwebsocket/IXSocket.cpp | 63 ++++-- ixwebsocket/IXSocket.h | 4 + ixwebsocket/IXSocketConnect.cpp | 3 +- ixwebsocket/IXWebSocketTransport.cpp | 6 +- 12 files changed, 400 insertions(+), 70 deletions(-) create mode 100644 ixwebsocket/IXSelectInterruptEvent.cpp create mode 100644 ixwebsocket/IXSelectInterruptEvent.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 8d8b89d1..0c4fb61a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -41,6 +41,7 @@ set( IXWEBSOCKET_SOURCES ixwebsocket/IXSelectInterrupt.cpp ixwebsocket/IXSelectInterruptFactory.cpp ixwebsocket/IXSelectInterruptPipe.cpp + ixwebsocket/IXSelectInterruptEvent.cpp ixwebsocket/IXSetThreadName.cpp ixwebsocket/IXSocket.cpp ixwebsocket/IXSocketConnect.cpp @@ -80,6 +81,7 @@ set( IXWEBSOCKET_HEADERS ixwebsocket/IXSelectInterrupt.h ixwebsocket/IXSelectInterruptFactory.h ixwebsocket/IXSelectInterruptPipe.h + ixwebsocket/IXSelectInterruptEvent.h ixwebsocket/IXSetThreadName.h ixwebsocket/IXSocket.h ixwebsocket/IXSocketConnect.h diff --git a/ixwebsocket/IXNetSystem.cpp b/ixwebsocket/IXNetSystem.cpp index ab7b2a38..7b2a8541 100644 --- a/ixwebsocket/IXNetSystem.cpp +++ b/ixwebsocket/IXNetSystem.cpp @@ -7,6 +7,9 @@ #include "IXNetSystem.h" #include #include +#ifdef _WIN32 +#include +#endif namespace ix { @@ -37,6 +40,51 @@ namespace ix #endif } +#ifdef _WIN32 + struct WSAEvent + { + public: + WSAEvent(struct pollfd* fd) + : _fd(fd) + { + _event = WSACreateEvent(); + } + + WSAEvent(WSAEvent&& source) noexcept + { + _event = source._event; + source._event = WSA_INVALID_EVENT; // invalidate the event in the source + _fd = source._fd; + } + + ~WSAEvent() + { + if (_event != WSA_INVALID_EVENT) + { + // We must deselect the networkevents from the socket event. Otherwise the + // socket will report states that aren't there. + if (_fd != nullptr && _fd->fd != -1) + WSAEventSelect(_fd->fd, _event, 0); + WSACloseEvent(_event); + } + } + + operator HANDLE() + { + return _event; + } + + operator struct pollfd*() + { + return _fd; + } + + private: + HANDLE _event; + struct pollfd* _fd; + }; +#endif + // // That function could 'return WSAPoll(pfd, nfds, timeout);' // but WSAPoll is said to have weird behaviors on the internet @@ -44,69 +92,180 @@ namespace ix // // So we make it a select wrapper // - int poll(struct pollfd* fds, nfds_t nfds, int timeout) + // UPDATE: WSAPoll was fixed in Windows 10 Version 2004 + // + // The optional "event" is set to nullptr if it wasn't signaled. + int poll(struct pollfd* fds, nfds_t nfds, int timeout, void** event) { #ifdef _WIN32 - socket_t maxfd = 0; - fd_set readfds, writefds, errorfds; - FD_ZERO(&readfds); - FD_ZERO(&writefds); - FD_ZERO(&errorfds); - for (nfds_t i = 0; i < nfds; ++i) + if (event && *event) { - struct pollfd* fd = &fds[i]; + HANDLE interruptEvent = reinterpret_cast(*event); + *event = nullptr; // the event wasn't signaled yet - if (fd->fd > maxfd) + if (nfds < 0 || nfds >= MAXIMUM_WAIT_OBJECTS - 1) { - maxfd = fd->fd; + WSASetLastError(WSAEINVAL); + return SOCKET_ERROR; } - if ((fd->events & POLLIN)) + + std::vector socketEvents; + std::vector handles; + // put the interrupt event as first element, making it highest priority + handles.push_back(interruptEvent); + + // create the WSAEvents for the sockets + for (nfds_t i = 0; i < nfds; ++i) { - FD_SET(fd->fd, &readfds); + struct pollfd* fd = &fds[i]; + fd->revents = 0; + if (fd->fd >= 0) + { + // create WSAEvent and add it to the vectors + socketEvents.push_back(std::move(WSAEvent(fd))); + HANDLE handle = socketEvents.back(); + if (handle == WSA_INVALID_EVENT) + { + WSASetLastError(WSAENOBUFS); + return SOCKET_ERROR; + } + handles.push_back(handle); + + // mapping + long networkEvents = 0; + if (fd->events & (POLLIN )) networkEvents |= FD_READ | FD_ACCEPT; + if (fd->events & (POLLOUT /*| POLLWRNORM | POLLWRBAND*/)) networkEvents |= FD_WRITE | FD_CONNECT; + //if (fd->events & (POLLPRI | POLLRDBAND )) networkEvents |= FD_OOB; + + if (WSAEventSelect(fd->fd, handle, networkEvents) != 0) + { + fd->revents = POLLNVAL; + socketEvents.pop_back(); + handles.pop_back(); + } + } } - if ((fd->events & POLLOUT)) + + DWORD n = WSAWaitForMultipleEvents(handles.size(), handles.data(), FALSE, timeout != -1 ? static_cast(timeout) : WSA_INFINITE, FALSE); + + if (n == WSA_WAIT_FAILED) return SOCKET_ERROR; + if (n == WSA_WAIT_TIMEOUT) return 0; + if (n == WSA_WAIT_EVENT_0) { - FD_SET(fd->fd, &writefds); + // the interrupt event was signaled + *event = reinterpret_cast(interruptEvent); + return 1; } - if ((fd->events & POLLERR)) + + int handleIndex = n - WSA_WAIT_EVENT_0; + int socketIndex = handleIndex - 1; + + WSANETWORKEVENTS netEvents; + int count = 0; + // WSAWaitForMultipleEvents returns the index of the first signaled event. And to emulate WSAPoll() + // all the signaled events must be processed. + while (socketIndex < socketEvents.size()) { - FD_SET(fd->fd, &errorfds); + struct pollfd* fd = socketEvents[socketIndex]; + + memset(&netEvents, 0, sizeof(netEvents)); + if (WSAEnumNetworkEvents(fd->fd, socketEvents[socketIndex], &netEvents) != 0) + { + fd->revents = POLLERR; + } + else if (netEvents.lNetworkEvents != 0) + { + // mapping + if (netEvents.lNetworkEvents & (FD_READ | FD_ACCEPT | FD_OOB)) fd->revents |= POLLIN; + if (netEvents.lNetworkEvents & (FD_WRITE | FD_CONNECT )) fd->revents |= POLLOUT; + + for (int i = 0; i < FD_MAX_EVENTS; ++i) + { + if (netEvents.iErrorCode[i] != 0) + { + fd->revents |= POLLERR; + break; + } + } + + if (fd->revents != 0) + { + // only signaled sockets count + count++; + } + } + socketIndex++; } + + return count; } - - struct timeval tv; - tv.tv_sec = timeout / 1000; - tv.tv_usec = (timeout % 1000) * 1000; - - int ret = select(maxfd + 1, &readfds, &writefds, &errorfds, timeout != -1 ? &tv : NULL); - - if (ret < 0) + else { + if (event && *event) *event = nullptr; + + socket_t maxfd = 0; + fd_set readfds, writefds, errorfds; + FD_ZERO(&readfds); + FD_ZERO(&writefds); + FD_ZERO(&errorfds); + + for (nfds_t i = 0; i < nfds; ++i) + { + struct pollfd* fd = &fds[i]; + + if (fd->fd > maxfd) + { + maxfd = fd->fd; + } + if ((fd->events & POLLIN)) + { + FD_SET(fd->fd, &readfds); + } + if ((fd->events & POLLOUT)) + { + FD_SET(fd->fd, &writefds); + } + if ((fd->events & POLLERR)) + { + FD_SET(fd->fd, &errorfds); + } + } + + struct timeval tv; + tv.tv_sec = timeout / 1000; + tv.tv_usec = (timeout % 1000) * 1000; + + int ret = select(maxfd + 1, &readfds, &writefds, &errorfds, timeout != -1 ? &tv : NULL); + + if (ret < 0) + { + return ret; + } + + for (nfds_t i = 0; i < nfds; ++i) + { + struct pollfd* fd = &fds[i]; + fd->revents = 0; + + if (FD_ISSET(fd->fd, &readfds)) + { + fd->revents |= POLLIN; + } + if (FD_ISSET(fd->fd, &writefds)) + { + fd->revents |= POLLOUT; + } + if (FD_ISSET(fd->fd, &errorfds)) + { + fd->revents |= POLLERR; + } + } return ret; } - - for (nfds_t i = 0; i < nfds; ++i) - { - struct pollfd* fd = &fds[i]; - fd->revents = 0; - - if (FD_ISSET(fd->fd, &readfds)) - { - fd->revents |= POLLIN; - } - if (FD_ISSET(fd->fd, &writefds)) - { - fd->revents |= POLLOUT; - } - if (FD_ISSET(fd->fd, &errorfds)) - { - fd->revents |= POLLERR; - } - } - - return ret; #else + if (event && *event) *event = nullptr; + // // It was reported that on Android poll can fail and return -1 with // errno == EINTR, which should be a temp error and should typically diff --git a/ixwebsocket/IXNetSystem.h b/ixwebsocket/IXNetSystem.h index 7ca58f49..96395443 100644 --- a/ixwebsocket/IXNetSystem.h +++ b/ixwebsocket/IXNetSystem.h @@ -78,7 +78,7 @@ namespace ix bool initNetSystem(); bool uninitNetSystem(); - int poll(struct pollfd* fds, nfds_t nfds, int timeout); + int poll(struct pollfd* fds, nfds_t nfds, int timeout, void** event); const char* inet_ntop(int af, const void* src, char* dst, socklen_t size); int inet_pton(int af, const char* src, void* dst); diff --git a/ixwebsocket/IXSelectInterrupt.cpp b/ixwebsocket/IXSelectInterrupt.cpp index 36dc66c9..df38bc50 100644 --- a/ixwebsocket/IXSelectInterrupt.cpp +++ b/ixwebsocket/IXSelectInterrupt.cpp @@ -45,4 +45,9 @@ namespace ix { return -1; } + + void* SelectInterrupt::getEvent() const + { + return nullptr; + } } // namespace ix diff --git a/ixwebsocket/IXSelectInterrupt.h b/ixwebsocket/IXSelectInterrupt.h index c3bb7f3f..de6db127 100644 --- a/ixwebsocket/IXSelectInterrupt.h +++ b/ixwebsocket/IXSelectInterrupt.h @@ -24,6 +24,7 @@ namespace ix virtual bool clear(); virtual uint64_t read(); virtual int getFd() const; + virtual void* getEvent() const; // Used as special codes for pipe communication static const uint64_t kSendRequest; diff --git a/ixwebsocket/IXSelectInterruptEvent.cpp b/ixwebsocket/IXSelectInterruptEvent.cpp new file mode 100644 index 00000000..d23e64ee --- /dev/null +++ b/ixwebsocket/IXSelectInterruptEvent.cpp @@ -0,0 +1,84 @@ +/* + * IXSelectInterruptEvent.cpp + */ + +// +// On Windows we use a Windows Event to wake up ix::poll() (WSAWaitForMultipleEvents). +// And on any other platform that doesn't support pipe file descriptors we +// emulate the interrupt event by using a short timeout with ix::poll() and +// read from the SelectInterrupt. (see Socket::poll() "Emulation mode") +// +#include "IXSelectInterruptEvent.h" + +namespace ix +{ + SelectInterruptEvent::SelectInterruptEvent() + { +#ifdef _WIN32 + _event = CreateEvent(NULL, TRUE, FALSE, NULL); +#endif + } + + SelectInterruptEvent::~SelectInterruptEvent() + { +#ifdef _WIN32 + CloseHandle(_event); +#endif + } + + bool SelectInterruptEvent::init(std::string& /*errorMsg*/) + { + return true; + } + + bool SelectInterruptEvent::notify(uint64_t value) + { + std::lock_guard lock(_valuesMutex); + + // WebSocket implementation detail: We only need one of the values in the queue + if (std::find(_values.begin(), _values.end(), value) == _values.end()) + _values.push_back(value); +#ifdef _WIN32 + SetEvent(_event); // wake up +#endif + return true; + } + + uint64_t SelectInterruptEvent::read() + { + std::lock_guard lock(_valuesMutex); + + if (_values.size() > 0) + { + uint64_t value = _values.front(); + _values.pop_front(); +#ifdef _WIN32 + // signal the event if there is still data in the queue + if (_values.size() == 0) + ResetEvent(_event); +#endif + return value; + } + return 0; + } + + bool SelectInterruptEvent::clear() + { + std::lock_guard lock(_valuesMutex); + _values.clear(); +#ifdef _WIN32 + ResetEvent(_event); +#endif + return true; + } + + void* SelectInterruptEvent::getEvent() const + { +#ifdef _WIN32 + return reinterpret_cast(_event); +#else + return nullptr; +#endif + } + +} // namespace ix diff --git a/ixwebsocket/IXSelectInterruptEvent.h b/ixwebsocket/IXSelectInterruptEvent.h new file mode 100644 index 00000000..d965661d --- /dev/null +++ b/ixwebsocket/IXSelectInterruptEvent.h @@ -0,0 +1,39 @@ +/* + * IXSelectInterruptEvent.h + */ + +#pragma once + +#include "IXSelectInterrupt.h" +#include +#include +#include +#include +#ifdef _WIN32 +#include +#endif + +namespace ix +{ + class SelectInterruptEvent final : public SelectInterrupt + { + public: + SelectInterruptEvent(); + virtual ~SelectInterruptEvent(); + + bool init(std::string& /*errorMsg*/) final; + + bool notify(uint64_t value) final; + bool clear() final; + uint64_t read() final; + void* getEvent() const final; + private: + // contains every value only once, new values are inserted at the begin, nu + std::deque _values; + std::mutex _valuesMutex; +#ifdef _WIN32 + // Windows Event to wake up the socket poll + HANDLE _event; +#endif + }; +} // namespace ix diff --git a/ixwebsocket/IXSelectInterruptFactory.cpp b/ixwebsocket/IXSelectInterruptFactory.cpp index 9018810d..c66c14c7 100644 --- a/ixwebsocket/IXSelectInterruptFactory.cpp +++ b/ixwebsocket/IXSelectInterruptFactory.cpp @@ -7,20 +7,20 @@ #include "IXSelectInterruptFactory.h" #include "IXUniquePtr.h" -#if defined(__linux__) || defined(__APPLE__) -#include "IXSelectInterruptPipe.h" +#if _WIN32 +#include "IXSelectInterruptEvent.h" #else -#include "IXSelectInterrupt.h" +#include "IXSelectInterruptPipe.h" #endif namespace ix { SelectInterruptPtr createSelectInterrupt() { -#if defined(__linux__) || defined(__APPLE__) - return ix::make_unique(); +#ifdef _WIN32 + return ix::make_unique(); #else - return ix::make_unique(); + return ix::make_unique(); #endif } } // namespace ix diff --git a/ixwebsocket/IXSocket.cpp b/ixwebsocket/IXSocket.cpp index bccfe7d9..d3469125 100644 --- a/ixwebsocket/IXSocket.cpp +++ b/ixwebsocket/IXSocket.cpp @@ -47,6 +47,8 @@ namespace ix int sockfd, const SelectInterruptPtr& selectInterrupt) { + PollResultType pollResult = PollResultType::ReadyForRead; + // // We used to use ::select to poll but on Android 9 we get large fds out of // ::connect which crash in FD_SET as they are larger than FD_SETSIZE. Switching @@ -68,9 +70,11 @@ namespace ix // File descriptor used to interrupt select when needed int interruptFd = -1; + void* interruptEvent = nullptr; if (selectInterrupt) { interruptFd = selectInterrupt->getFd(); + interruptEvent = selectInterrupt->getEvent(); if (interruptFd != -1) { @@ -78,11 +82,21 @@ namespace ix fds[1].fd = interruptFd; fds[1].events = POLLIN; } + else if (interruptEvent == nullptr) + { + // Emulation mode: SelectInterrupt neither supports file descriptors nor events + + // Check the selectInterrupt for requests before doing the poll(). + if (readSelectInterruptRequest(selectInterrupt, &pollResult)) + { + return pollResult; + } + } } - int ret = ix::poll(fds, nfds, timeoutMs); + void* event = interruptEvent; // ix::poll will set event to nullptr if it wasn't signaled + int ret = ix::poll(fds, nfds, timeoutMs, &event); - PollResultType pollResult = PollResultType::ReadyForRead; if (ret < 0) { pollResult = PollResultType::Error; @@ -90,20 +104,19 @@ namespace ix else if (ret == 0) { pollResult = PollResultType::Timeout; - } - else if (interruptFd != -1 && fds[1].revents & POLLIN) - { - uint64_t value = selectInterrupt->read(); + if (selectInterrupt && interruptFd == -1 && interruptEvent == nullptr) + { + // Emulation mode: SelectInterrupt neither supports fd nor events - if (value == SelectInterrupt::kSendRequest) - { - pollResult = PollResultType::SendRequest; - } - else if (value == SelectInterrupt::kCloseRequest) - { - pollResult = PollResultType::CloseRequest; + // Check the selectInterrupt for requests + readSelectInterruptRequest(selectInterrupt, &pollResult); } } + else if ((interruptFd != -1 && fds[1].revents & POLLIN) || (interruptEvent != nullptr && event != nullptr)) + { + // The InterruptEvent was signaled + readSelectInterruptRequest(selectInterrupt, &pollResult); + } else if (sockfd != -1 && readyToRead && fds[0].revents & POLLIN) { pollResult = PollResultType::ReadyForRead; @@ -143,6 +156,25 @@ namespace ix return pollResult; } + bool Socket::readSelectInterruptRequest(const SelectInterruptPtr& selectInterrupt, + PollResultType* pollResult) + { + uint64_t value = selectInterrupt->read(); + + if (value == SelectInterrupt::kSendRequest) + { + *pollResult = PollResultType::SendRequest; + return true; + } + else if (value == SelectInterrupt::kCloseRequest) + { + *pollResult = PollResultType::CloseRequest; + return true; + } + + return false; + } + PollResultType Socket::isReadyToRead(int timeoutMs) { if (_sockfd == -1) @@ -171,6 +203,11 @@ namespace ix return _selectInterrupt->notify(wakeUpCode); } + bool Socket::isWakeUpFromPollSupported() + { + return _selectInterrupt->getFd() != -1 || _selectInterrupt->getEvent() != nullptr; + } + bool Socket::accept(std::string& errMsg) { if (_sockfd == -1) diff --git a/ixwebsocket/IXSocket.h b/ixwebsocket/IXSocket.h index 393d5bc4..ae5a560e 100644 --- a/ixwebsocket/IXSocket.h +++ b/ixwebsocket/IXSocket.h @@ -43,6 +43,7 @@ namespace ix // Functions to check whether there is activity on the socket PollResultType poll(int timeoutMs = kDefaultPollTimeout); bool wakeUpFromPoll(uint64_t wakeUpCode); + bool isWakeUpFromPollSupported(); PollResultType isReadyToWrite(int timeoutMs); PollResultType isReadyToRead(int timeoutMs); @@ -83,6 +84,9 @@ namespace ix std::atomic _sockfd; std::mutex _socketMutex; + static bool readSelectInterruptRequest(const SelectInterruptPtr& selectInterrupt, + PollResultType* pollResult); + private: static const int kDefaultPollTimeout; static const int kDefaultPollNoTimeout; diff --git a/ixwebsocket/IXSocketConnect.cpp b/ixwebsocket/IXSocketConnect.cpp index 94ebc406..f191eae8 100644 --- a/ixwebsocket/IXSocketConnect.cpp +++ b/ixwebsocket/IXSocketConnect.cpp @@ -20,6 +20,7 @@ #include #include #endif +#include namespace ix { @@ -66,7 +67,7 @@ namespace ix int timeoutMs = 10; bool readyToRead = false; - auto selectInterrupt = ix::make_unique(); + SelectInterruptPtr selectInterrupt = ix::createSelectInterrupt(); PollResultType pollResult = Socket::poll(readyToRead, timeoutMs, fd, selectInterrupt); if (pollResult == PollResultType::Timeout) diff --git a/ixwebsocket/IXWebSocketTransport.cpp b/ixwebsocket/IXWebSocketTransport.cpp index 8537a95e..1fcdfdeb 100644 --- a/ixwebsocket/IXWebSocketTransport.cpp +++ b/ixwebsocket/IXWebSocketTransport.cpp @@ -297,13 +297,11 @@ namespace ix lastingTimeoutDelayInMs = (1000 * _pingIntervalSecs) - timeSinceLastPingMs; } -#ifdef _WIN32 - // Windows does not have select interrupt capabilities, so wait with a small timeout - if (lastingTimeoutDelayInMs <= 0) + // The platform may not have select interrupt capabilities, so wait with a small timeout + if (lastingTimeoutDelayInMs <= 0 && !_socket->isWakeUpFromPollSupported()) { lastingTimeoutDelayInMs = 20; } -#endif // If we are requesting a cancellation, pass in a positive and small timeout // to never poll forever without a timeout.