select interrupt cleanup

This commit is contained in:
Benjamin Sergeant
2019-03-14 18:37:38 -07:00
parent b462b5a5c8
commit b91dc77d6f
22 changed files with 411 additions and 150 deletions

View File

@ -0,0 +1,46 @@
/*
* IXSelectInterrupt.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXSelectInterrupt.h"
namespace ix
{
SelectInterrupt::SelectInterrupt()
{
;
}
SelectInterrupt::~SelectInterrupt()
{
;
}
bool SelectInterrupt::init(std::string& /*errorMsg*/)
{
return true;
}
bool SelectInterrupt::notify(uint64_t /*value*/)
{
return true;
}
uint64_t SelectInterrupt::read()
{
return 0;
}
bool SelectInterrupt::clear()
{
return true;
}
int SelectInterrupt::getFd()
{
return -1;
}
}

View File

@ -0,0 +1,28 @@
/*
* IXSelectInterrupt.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <stdint.h>
#include <string>
namespace ix
{
class SelectInterrupt {
public:
SelectInterrupt();
virtual ~SelectInterrupt();
virtual bool init(std::string& errorMsg);
virtual bool notify(uint64_t value);
virtual bool clear();
virtual uint64_t read();
virtual int getFd();
};
}

View File

@ -1,9 +1,13 @@
/*
* IXEventFd.cpp
* IXSelectInterruptEventFd.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
* Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved.
*/
//
// On Linux we use eventd to wake up select.
//
//
// Linux/Android has a special type of virtual files. select(2) will react
// when reading/writing to those files, unlike closing sockets.
@ -22,57 +26,59 @@
#include "IXEventFd.h"
#ifdef __linux__
# include <sys/eventfd.h>
#endif
#include <sys/eventfd.h>
#include <unistd.h> // for write
#include <string.h> // for strerror
#include <fcntl.h>
#include <errno.h>
#include <sstream>
namespace ix
{
// File descriptor at index 0 in _fildes is the read end of the pipe
// File descriptor at index 1 in _fildes is the write end of the pipe
const int EventFd::kPipeReadIndex = 0;
const int EventFd::kPipeWriteIndex = 1;
EventFd::EventFd()
SelectInterruptEventFd::SelectInterruptEventFd()
{
#ifdef __linux__
_eventfd = -1;
_eventfd = eventfd(0, 0);
fcntl(_eventfd, F_SETFL, O_NONBLOCK);
#else
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
pipe(_fildes);
fcntl(_fildes[kPipeReadIndex], F_SETFL, O_NONBLOCK);
fcntl(_fildes[kPipeWriteIndex], F_SETFL, O_NONBLOCK);
#endif
;
}
EventFd::~EventFd()
SelectInterruptEventFd::~SelectInterruptEventFd()
{
#ifdef __linux__
::close(_eventfd);
#else
::close(_fildes[kPipeReadIndex]);
::close(_fildes[kPipeWriteIndex]);
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
#endif
}
bool EventFd::notify(uint64_t value)
bool SelectInterruptEventFd::init(std::string& errorMsg)
{
int fd;
_eventfd = -1;
#if defined(__linux__)
fd = _eventfd;
#else
fd = _fildes[kPipeWriteIndex];
#endif
_eventfd = eventfd(0, 0);
if (_eventfd < 0)
{
std::stringstream ss;
ss << "SelectInterruptEventFd::init() failed in eventfd()"
<< " : " << strerror(errno);
errorMsg = ss.str();
_eventfd = -1;
return false;
}
if (fcntl(_eventfd, F_SETFL, O_NONBLOCK) == -1)
{
std::stringstream ss;
ss << "SelectInterruptEventFd::init() failed in fcntl() call"
<< " : " << strerror(errno);
errorMsg = ss.str();
_eventfd = -1;
return false;
}
return true;
}
bool SelectInterruptEventFd::notify(uint64_t value)
{
int fd = _eventfd;
if (fd == -1) return false;
@ -81,23 +87,17 @@ namespace ix
}
// TODO: return max uint64_t for errors ?
uint64_t EventFd::read()
uint64_t SelectInterruptEventFd::read()
{
int fd;
int fd = _eventfd;
#if defined(__linux__)
fd = _eventfd;
#else
fd = _fildes[kPipeReadIndex];
#endif
uint64_t value = 0;
::read(fd, &value, sizeof(value));
return value;
}
bool EventFd::clear()
bool SelectInterruptEventFd::clear()
{
#if defined(__linux__)
if (_eventfd == -1) return false;
// 0 is a special value ; select will not wake up
@ -105,17 +105,10 @@ namespace ix
// we should write 8 bytes for an uint64_t
return write(_eventfd, &value, sizeof(value)) == 8;
#else
return true;
#endif
}
int EventFd::getFd()
int SelectInterruptEventFd::getFd()
{
#if defined(__linux__)
return _eventfd;
#else
return _fildes[kPipeReadIndex];
#endif
}
}

View File

@ -0,0 +1,32 @@
/*
* IXSelectInterruptEventFd.h
* Author: Benjamin Sergeant
* Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include "IXSelectInterrupt.h"
#include <stdint.h>
#include <string>
namespace ix
{
class SelectInterruptEventFd : public SelectInterrupt {
public:
SelectInterruptEventFd();
virtual ~SelectInterruptEventFd();
bool init(std::string& errorMsg) final;
bool notify(uint64_t value) final;
bool clear() final;
uint64_t read() final;
int getFd() final;
private:
int _eventfd;
};
}

View File

@ -0,0 +1,25 @@
/*
* IXSelectInterruptFactory.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXSelectInterruptFactory.h"
#if defined(__linux__)
# include <ixwebsocket/IXSelectInterruptEventFd.h>
#else
# include <ixwebsocket/IXSelectInterruptPipe.h>
#endif
namespace ix
{
std::shared_ptr<SelectInterrupt> createSelectInterrupt()
{
#if defined(__linux__)
return std::make_shared<SelectInterruptEventFd>();
#else
return std::make_shared<SelectInterruptPipe>();
#endif
}
}

View File

@ -0,0 +1,15 @@
/*
* IXSelectInterruptFactory.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <memory>
namespace ix
{
class SelectInterrupt;
std::shared_ptr<SelectInterrupt> createSelectInterrupt();
}

View File

@ -0,0 +1,108 @@
/*
* IXEventFd.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved.
*/
//
// On macOS we use UNIX pipes to wake up select.
//
#include "IXSelectInterruptPipe.h"
#include <unistd.h> // for write
#include <string.h> // for strerror
#include <fcntl.h>
#include <errno.h>
#include <sstream>
namespace ix
{
// File descriptor at index 0 in _fildes is the read end of the pipe
// File descriptor at index 1 in _fildes is the write end of the pipe
const int SelectInterruptPipe::kPipeReadIndex = 0;
const int SelectInterruptPipe::kPipeWriteIndex = 1;
SelectInterruptPipe::SelectInterruptPipe()
{
;
}
SelectInterruptPipe::~SelectInterruptPipe()
{
::close(_fildes[kPipeReadIndex]);
::close(_fildes[kPipeWriteIndex]);
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
}
bool SelectInterruptPipe::init(std::string& errorMsg)
{
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
if (pipe(_fildes) < 0)
{
std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in pipe() call"
<< " : " << strerror(errno);
errorMsg = ss.str();
return false;
}
if (fcntl(_fildes[kPipeReadIndex], F_SETFL, O_NONBLOCK) == -1)
{
std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in fcntl() call"
<< " : " << strerror(errno);
errorMsg = ss.str();
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
return false;
}
if (fcntl(_fildes[kPipeWriteIndex], F_SETFL, O_NONBLOCK) == -1)
{
std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in fcntl() call"
<< " : " << strerror(errno);
errorMsg = ss.str();
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
return false;
}
return true;
}
bool SelectInterruptPipe::notify(uint64_t value)
{
int fd = _fildes[kPipeWriteIndex];
if (fd == -1) return false;
// we should write 8 bytes for an uint64_t
return write(fd, &value, sizeof(value)) == 8;
}
// TODO: return max uint64_t for errors ?
uint64_t SelectInterruptPipe::read()
{
int fd = _fildes[kPipeReadIndex];
uint64_t value = 0;
::read(fd, &value, sizeof(value));
return value;
}
bool SelectInterruptPipe::clear()
{
return true;
}
int SelectInterruptPipe::getFd()
{
return _fildes[kPipeReadIndex];
}
}

View File

@ -1,37 +1,39 @@
/*
* IXEventFd.h
* IXSelectInterruptPipe.h
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
* Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include "IXSelectInterrupt.h"
#include <stdint.h>
#include <string>
namespace ix
{
class EventFd {
class SelectInterruptPipe : public SelectInterrupt {
public:
EventFd();
virtual ~EventFd();
SelectInterruptPipe();
virtual ~SelectInterruptPipe();
bool notify(uint64_t value);
bool clear();
uint64_t read();
int getFd();
bool init(std::string& errorMsg) final;
bool notify(uint64_t value) final;
bool clear() final;
uint64_t read() final;
int getFd() final;
private:
#if defined(__linux__)
int _eventfd;
#else
// Store file descriptors used by the communication pipe. Communication
// happens between a control thread and a background thread, which is
// blocked on select.
int _fildes[2];
#endif
// Used to identify the read/write idx
static const int kPipeReadIndex;
static const int kPipeWriteIndex;
};
}

View File

@ -7,6 +7,8 @@
#include "IXSocket.h"
#include "IXSocketConnect.h"
#include "IXNetSystem.h"
#include "IXSelectInterrupt.h"
#include "IXSelectInterruptFactory.h"
#include <stdio.h>
#include <stdlib.h>
@ -28,7 +30,8 @@ namespace ix
constexpr size_t Socket::kChunkSize;
Socket::Socket(int fd) :
_sockfd(fd)
_sockfd(fd),
_selectInterrupt(createSelectInterrupt())
{
;
}
@ -57,11 +60,11 @@ namespace ix
FD_ZERO(&rfds);
FD_SET(_sockfd, &rfds);
// File descriptor at index 0 in _fildes is the read end of the pipe
int eventfd = _eventfd.getFd();
if (eventfd != -1)
// File descriptor used to interrupt select when needed
int interruptFd = _selectInterrupt->getFd();
if (interruptFd != -1)
{
FD_SET(eventfd, &rfds);
FD_SET(interruptFd, &rfds);
}
struct timeval timeout;
@ -70,7 +73,7 @@ namespace ix
// Compute the highest fd.
int sockfd = _sockfd;
int nfds = (std::max)(sockfd, eventfd);
int nfds = (std::max)(sockfd, interruptFd);
int ret = ::select(nfds + 1, &rfds, nullptr, nullptr,
(timeoutSecs < 0) ? nullptr : &timeout);
@ -84,9 +87,9 @@ namespace ix
{
pollResult = PollResultType_Timeout;
}
else if (eventfd != -1 && FD_ISSET(eventfd, &rfds))
else if (interruptFd != -1 && FD_ISSET(interruptFd, &rfds))
{
uint64_t value = _eventfd.read();
uint64_t value = _selectInterrupt->read();
if (value == kSendRequest)
{
@ -104,7 +107,7 @@ namespace ix
// Wake up from poll/select by writing to the pipe which is watched by select
bool Socket::wakeUpFromPoll(uint8_t wakeUpCode)
{
return _eventfd.notify(wakeUpCode);
return _selectInterrupt->notify(wakeUpCode);
}
bool Socket::connect(const std::string& host,
@ -114,7 +117,7 @@ namespace ix
{
std::lock_guard<std::mutex> lock(_socketMutex);
if (!_eventfd.clear()) return false;
if (!_selectInterrupt->clear()) return false;
_sockfd = SocketConnect::connect(host, port, errMsg, isCancellationRequested);
return _sockfd != -1;
@ -173,24 +176,9 @@ namespace ix
#endif
}
bool Socket::init()
bool Socket::init(std::string& errorMsg)
{
#ifdef _WIN32
INT rc;
WSADATA wsaData;
rc = WSAStartup(MAKEWORD(2, 2), &wsaData);
return rc != 0;
#else
return true;
#endif
}
void Socket::cleanup()
{
#ifdef _WIN32
WSACleanup();
#endif
return _selectInterrupt->init(errorMsg);
}
bool Socket::writeBytes(const std::string& str,

View File

@ -23,6 +23,8 @@ typedef SSIZE_T ssize_t;
namespace ix
{
class SelectInterrupt;
enum PollResultType
{
PollResultType_ReadyForRead = 0,
@ -38,6 +40,7 @@ namespace ix
Socket(int fd = -1);
virtual ~Socket();
bool init(std::string& errorMsg);
void configure();
@ -72,8 +75,6 @@ namespace ix
const CancellationRequest& isCancellationRequested);
static int getErrno();
static bool init(); // Required on Windows to initialize WinSocket
static void cleanup(); // Required on Windows to cleanup WinSocket
// Used as special codes for pipe communication
static const uint64_t kSendRequest;
@ -93,6 +94,6 @@ namespace ix
std::vector<uint8_t> _readBuffer;
static constexpr size_t kChunkSize = 1 << 15;
EventFd _eventfd;
std::shared_ptr<SelectInterrupt> _selectInterrupt;
};
}

View File

@ -20,23 +20,45 @@ namespace ix
std::string& errorMsg)
{
errorMsg.clear();
std::shared_ptr<Socket> socket;
if (!tls)
{
return std::make_shared<Socket>();
socket = std::make_shared<Socket>();
}
else
{
#ifdef IXWEBSOCKET_USE_TLS
# ifdef __APPLE__
return std::make_shared<SocketAppleSSL>();
socket = std::make_shared<SocketAppleSSL>();
# else
return std::make_shared<SocketOpenSSL>();
socket = std::make_shared<SocketOpenSSL>();
# endif
#else
errorMsg = "TLS support is not enabled on this platform.";
return nullptr;
#endif
}
if (!socket->init(errorMsg))
{
socket.reset();
}
return socket;
}
std::shared_ptr<Socket> createSocket(int fd,
std::string& errorMsg)
{
errorMsg.clear();
std::shared_ptr<Socket> socket = std::make_shared<Socket>(fd);
if (!socket->init(errorMsg))
{
socket.reset();
}
return socket;
}
}

View File

@ -14,4 +14,7 @@ namespace ix
class Socket;
std::shared_ptr<Socket> createSocket(bool tls,
std::string& errorMsg);
std::shared_ptr<Socket> createSocket(int fd,
std::string& errorMsg);
}

View File

@ -123,8 +123,13 @@ namespace ix
// Server
WebSocketInitResult WebSocketTransport::connectToSocket(int fd, int timeoutSecs)
{
_socket.reset();
_socket = std::make_shared<Socket>(fd);
std::string errorMsg;
_socket = createSocket(fd, errorMsg);
if (!_socket)
{
return WebSocketInitResult(false, 0, errorMsg);
}
WebSocketHandshake webSocketHandshake(_requestInitCancellation,
_socket,