This commit is contained in:
Benjamin Sergeant 2019-01-24 12:42:49 -08:00
parent 121c84a2d1
commit a8b6573f96
7 changed files with 85 additions and 21 deletions

View File

@ -21,6 +21,9 @@
namespace ix namespace ix
{ {
const int Socket::kDefaultPollNoTimeout = -1; // No poll timeout by default
const int Socket::kDefaultPollTimeout = kDefaultPollNoTimeout;
Socket::Socket(int fd) : Socket::Socket(int fd) :
_sockfd(fd) _sockfd(fd)
{ {
@ -32,14 +35,8 @@ namespace ix
close(); close();
} }
void Socket::poll(const OnPollCallback& onPollCallback) void Socket::poll(const OnPollCallback& onPollCallback, int timeoutSecs)
{ {
if (_sockfd == -1)
{
onPollCallback();
return;
}
fd_set rfds; fd_set rfds;
FD_ZERO(&rfds); FD_ZERO(&rfds);
FD_SET(_sockfd, &rfds); FD_SET(_sockfd, &rfds);
@ -48,11 +45,26 @@ namespace ix
FD_SET(_eventfd.getFd(), &rfds); FD_SET(_eventfd.getFd(), &rfds);
#endif #endif
struct timeval timeout;
timeout.tv_sec = timeoutSecs;
timeout.tv_usec = 0;
int sockfd = _sockfd; int sockfd = _sockfd;
int nfds = (std::max)(sockfd, _eventfd.getFd()); int nfds = (std::max)(sockfd, _eventfd.getFd());
select(nfds + 1, &rfds, nullptr, nullptr, nullptr); int ret = select(nfds + 1, &rfds, nullptr, nullptr,
(timeoutSecs == kDefaultPollNoTimeout) ? nullptr : &timeout);
onPollCallback(); PollResultType pollResult = PollResultType_ReadyForRead;
if (ret < 0)
{
pollResult = PollResultType_Error;
}
else if (ret == 0)
{
pollResult = PollResultType_Timeout;
}
onPollCallback(pollResult);
} }
void Socket::wakeUpFromPoll() void Socket::wakeUpFromPoll()

View File

@ -21,16 +21,24 @@ typedef SSIZE_T ssize_t;
namespace ix namespace ix
{ {
enum PollResultType
{
PollResultType_ReadyForRead = 0,
PollResultType_Timeout = 1,
PollResultType_Error = 2
};
class Socket { class Socket {
public: public:
using OnPollCallback = std::function<void()>; using OnPollCallback = std::function<void(PollResultType)>;
Socket(int fd = -1); Socket(int fd = -1);
virtual ~Socket(); virtual ~Socket();
void configure(); void configure();
virtual void poll(const OnPollCallback& onPollCallback); virtual void poll(const OnPollCallback& onPollCallback,
int timeoutSecs = kDefaultPollTimeout);
virtual void wakeUpFromPoll(); virtual void wakeUpFromPoll();
// Virtual methods // Virtual methods
@ -62,5 +70,9 @@ namespace ix
std::atomic<int> _sockfd; std::atomic<int> _sockfd;
std::mutex _socketMutex; std::mutex _socketMutex;
EventFd _eventfd; EventFd _eventfd;
private:
static const int kDefaultPollTimeout;
static const int kDefaultPollNoTimeout;
}; };
} }

View File

@ -31,12 +31,14 @@ namespace ix
{ {
OnTrafficTrackerCallback WebSocket::_onTrafficTrackerCallback = nullptr; OnTrafficTrackerCallback WebSocket::_onTrafficTrackerCallback = nullptr;
const int WebSocket::kDefaultHandShakeTimeoutSecs(60); const int WebSocket::kDefaultHandShakeTimeoutSecs(60);
const int WebSocket::kDefaultHeartBeatPeriod(-1);
WebSocket::WebSocket() : WebSocket::WebSocket() :
_onMessageCallback(OnMessageCallback()), _onMessageCallback(OnMessageCallback()),
_stop(false), _stop(false),
_automaticReconnection(true), _automaticReconnection(true),
_handshakeTimeoutSecs(kDefaultHandShakeTimeoutSecs) _handshakeTimeoutSecs(kDefaultHandShakeTimeoutSecs),
_heartBeatPeriod(kDefaultHeartBeatPeriod)
{ {
_ws.setOnCloseCallback( _ws.setOnCloseCallback(
[this](uint16_t code, const std::string& reason, size_t wireSize) [this](uint16_t code, const std::string& reason, size_t wireSize)
@ -77,6 +79,18 @@ namespace ix
return _perMessageDeflateOptions; return _perMessageDeflateOptions;
} }
void WebSocket::setHeartBeatPeriod(int hearBeatPeriod)
{
std::lock_guard<std::mutex> lock(_configMutex);
_heartBeatPeriod = hearBeatPeriod;
}
int WebSocket::getHeartBeatPeriod() const
{
std::lock_guard<std::mutex> lock(_configMutex);
return _heartBeatPeriod;
}
void WebSocket::start() void WebSocket::start()
{ {
if (_thread.joinable()) return; // we've already been started if (_thread.joinable()) return; // we've already been started
@ -110,7 +124,8 @@ namespace ix
{ {
{ {
std::lock_guard<std::mutex> lock(_configMutex); std::lock_guard<std::mutex> lock(_configMutex);
_ws.configure(_perMessageDeflateOptions); _ws.configure(_perMessageDeflateOptions,
_heartBeatPeriod);
} }
WebSocketInitResult status = _ws.connectToUrl(_url, timeoutSecs); WebSocketInitResult status = _ws.connectToUrl(_url, timeoutSecs);
@ -130,7 +145,7 @@ namespace ix
{ {
{ {
std::lock_guard<std::mutex> lock(_configMutex); std::lock_guard<std::mutex> lock(_configMutex);
_ws.configure(_perMessageDeflateOptions); _ws.configure(_perMessageDeflateOptions, _heartBeatPeriod);
} }
WebSocketInitResult status = _ws.connectToSocket(fd, timeoutSecs); WebSocketInitResult status = _ws.connectToSocket(fd, timeoutSecs);

View File

@ -86,7 +86,8 @@ namespace ix
void setUrl(const std::string& url); void setUrl(const std::string& url);
void setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions); void setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions);
void setHandshakeTimeout(int _handshakeTimeoutSecs); void setHandshakeTimeout(int handshakeTimeoutSecs);
void setHeartBeatPeriod(int hearBeatPeriod);
// Run asynchronously, by calling start and stop. // Run asynchronously, by calling start and stop.
void start(); void start();
@ -107,6 +108,7 @@ namespace ix
ReadyState getReadyState() const; ReadyState getReadyState() const;
const std::string& getUrl() const; const std::string& getUrl() const;
const WebSocketPerMessageDeflateOptions& getPerMessageDeflateOptions() const; const WebSocketPerMessageDeflateOptions& getPerMessageDeflateOptions() const;
int getHeartBeatPeriod() const;
void enableAutomaticReconnection(); void enableAutomaticReconnection();
void disableAutomaticReconnection(); void disableAutomaticReconnection();
@ -142,6 +144,10 @@ namespace ix
std::atomic<int> _handshakeTimeoutSecs; std::atomic<int> _handshakeTimeoutSecs;
static const int kDefaultHandShakeTimeoutSecs; static const int kDefaultHandShakeTimeoutSecs;
// Optional Heartbeat
int _heartBeatPeriod;
static const int kDefaultHeartBeatPeriod;
friend class WebSocketServer; friend class WebSocketServer;
}; };
} }

View File

@ -33,12 +33,16 @@
namespace ix namespace ix
{ {
const std::string WebSocketTransport::kHeartBeatPingMessage("ixwebsocket::hearbeat");
const int WebSocketTransport::kDefaultHeartBeatPeriod(-1);
WebSocketTransport::WebSocketTransport() : WebSocketTransport::WebSocketTransport() :
_readyState(CLOSED), _readyState(CLOSED),
_closeCode(0), _closeCode(0),
_closeWireSize(0), _closeWireSize(0),
_enablePerMessageDeflate(false), _enablePerMessageDeflate(false),
_requestInitCancellation(false) _requestInitCancellation(false),
_heartBeatPeriod(kDefaultHeartBeatPeriod)
{ {
} }
@ -48,10 +52,12 @@ namespace ix
; ;
} }
void WebSocketTransport::configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions) void WebSocketTransport::configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions,
int hearBeatPeriod)
{ {
_perMessageDeflateOptions = perMessageDeflateOptions; _perMessageDeflateOptions = perMessageDeflateOptions;
_enablePerMessageDeflate = _perMessageDeflateOptions.enabled(); _enablePerMessageDeflate = _perMessageDeflateOptions.enabled();
_heartBeatPeriod = hearBeatPeriod;
} }
// Client // Client
@ -152,8 +158,14 @@ namespace ix
void WebSocketTransport::poll() void WebSocketTransport::poll()
{ {
_socket->poll( _socket->poll(
[this]() [this](PollResultType pollResult)
{ {
if (pollResult == PollResultType_Timeout)
{
sendPing(kHeartBeatPingMessage);
return;
}
while (true) while (true)
{ {
int N = (int) _rxbuf.size(); int N = (int) _rxbuf.size();
@ -185,7 +197,8 @@ namespace ix
_socket->close(); _socket->close();
setReadyState(CLOSED); setReadyState(CLOSED);
} }
}); },
_heartBeatPeriod);
} }
bool WebSocketTransport::isSendBufferEmpty() const bool WebSocketTransport::isSendBufferEmpty() const

View File

@ -57,7 +57,8 @@ namespace ix
WebSocketTransport(); WebSocketTransport();
~WebSocketTransport(); ~WebSocketTransport();
void configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions); void configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions,
int hearBeatPeriod);
WebSocketInitResult connectToUrl(const std::string& url, // Client WebSocketInitResult connectToUrl(const std::string& url, // Client
int timeoutSecs); int timeoutSecs);
@ -117,6 +118,11 @@ namespace ix
// Used to cancel dns lookup + socket connect + http upgrade // Used to cancel dns lookup + socket connect + http upgrade
std::atomic<bool> _requestInitCancellation; std::atomic<bool> _requestInitCancellation;
// Optional Heartbeat
int _heartBeatPeriod;
static const int kDefaultHeartBeatPeriod;
const static std::string kHeartBeatPingMessage;
void sendOnSocket(); void sendOnSocket();
WebSocketSendInfo sendData(wsheader_type::opcode_type type, WebSocketSendInfo sendData(wsheader_type::opcode_type type,
const std::string& message, const std::string& message,

View File

@ -19,7 +19,7 @@ if osName == 'Windows':
testBinary ='ixwebsocket_unittest.exe' testBinary ='ixwebsocket_unittest.exe'
else: else:
generator = '' generator = ''
make = 'make' make = 'make -j6'
testBinary ='./ixwebsocket_unittest' testBinary ='./ixwebsocket_unittest'
sanitizersFlags = { sanitizersFlags = {