timeout is configurable

This commit is contained in:
Benjamin Sergeant 2019-01-03 18:33:08 -08:00
parent 3021ac4b95
commit bce5ef2dca
9 changed files with 54 additions and 24 deletions

View File

@ -182,6 +182,17 @@ namespace ix
else if (ret < 0 && (getErrno() == EWOULDBLOCK || else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
getErrno() == EAGAIN)) getErrno() == EAGAIN))
{ {
// Wait with a timeout until something is written.
// This way we are not busy looping
fd_set rfds;
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 1 * 1000; // 1ms
FD_ZERO(&rfds);
FD_SET(_sockfd, &rfds);
select(_sockfd + 1, &rfds, nullptr, nullptr, &timeout);
continue; continue;
} }
// There was an error during the read, abort // There was an error during the read, abort

View File

@ -30,11 +30,13 @@ namespace
namespace ix namespace ix
{ {
OnTrafficTrackerCallback WebSocket::_onTrafficTrackerCallback = nullptr; OnTrafficTrackerCallback WebSocket::_onTrafficTrackerCallback = nullptr;
const int WebSocket::kDefaultHandShakeTimeoutSecs(60);
WebSocket::WebSocket() : WebSocket::WebSocket() :
_onMessageCallback(OnMessageCallback()), _onMessageCallback(OnMessageCallback()),
_stop(false), _stop(false),
_automaticReconnection(true) _automaticReconnection(true),
_handshakeTimeoutSecs(kDefaultHandShakeTimeoutSecs)
{ {
_ws.setOnCloseCallback( _ws.setOnCloseCallback(
[this](uint16_t code, const std::string& reason, size_t wireSize) [this](uint16_t code, const std::string& reason, size_t wireSize)
@ -104,14 +106,14 @@ namespace ix
_automaticReconnection = automaticReconnection; _automaticReconnection = automaticReconnection;
} }
WebSocketInitResult WebSocket::connect() WebSocketInitResult WebSocket::connect(int timeoutSecs)
{ {
{ {
std::lock_guard<std::mutex> lock(_configMutex); std::lock_guard<std::mutex> lock(_configMutex);
_ws.configure(_perMessageDeflateOptions); _ws.configure(_perMessageDeflateOptions);
} }
WebSocketInitResult status = _ws.connectToUrl(_url); WebSocketInitResult status = _ws.connectToUrl(_url, timeoutSecs);
if (!status.success) if (!status.success)
{ {
return status; return status;
@ -124,14 +126,14 @@ namespace ix
return status; return status;
} }
WebSocketInitResult WebSocket::connectToSocket(int fd) WebSocketInitResult WebSocket::connectToSocket(int fd, int timeoutSecs)
{ {
{ {
std::lock_guard<std::mutex> lock(_configMutex); std::lock_guard<std::mutex> lock(_configMutex);
_ws.configure(_perMessageDeflateOptions); _ws.configure(_perMessageDeflateOptions);
} }
WebSocketInitResult status = _ws.connectToSocket(fd); WebSocketInitResult status = _ws.connectToSocket(fd, timeoutSecs);
if (!status.success) if (!status.success)
{ {
return status; return status;
@ -174,7 +176,7 @@ namespace ix
break; break;
} }
status = connect(); status = connect(_handshakeTimeoutSecs);
if (!status.success && !_stop) if (!status.success && !_stop)
{ {

View File

@ -86,13 +86,14 @@ namespace ix
void setUrl(const std::string& url); void setUrl(const std::string& url);
void setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions); void setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions);
void setHandshakeTimeout(int _handshakeTimeoutSecs);
// Run asynchronously, by calling start and stop. // Run asynchronously, by calling start and stop.
void start(); void start();
void stop(); void stop();
// Run in blocking mode, by connecting first manually, and then calling run. // Run in blocking mode, by connecting first manually, and then calling run.
WebSocketInitResult connect(); WebSocketInitResult connect(int timeoutSecs);
void run(); void run();
WebSocketSendInfo send(const std::string& text); WebSocketSendInfo send(const std::string& text);
@ -122,7 +123,7 @@ namespace ix
// Server // Server
void setSocketFileDescriptor(int fd); void setSocketFileDescriptor(int fd);
WebSocketInitResult connectToSocket(int fd); WebSocketInitResult connectToSocket(int fd, int timeoutSecs);
WebSocketTransport _ws; WebSocketTransport _ws;
@ -138,6 +139,9 @@ namespace ix
std::thread _thread; std::thread _thread;
std::mutex _writeMutex; std::mutex _writeMutex;
std::atomic<int> _handshakeTimeoutSecs;
static const int kDefaultHandShakeTimeoutSecs;
friend class WebSocketServer; friend class WebSocketServer;
}; };
} }

View File

@ -262,12 +262,13 @@ namespace ix
WebSocketInitResult WebSocketHandshake::clientHandshake(const std::string& url, WebSocketInitResult WebSocketHandshake::clientHandshake(const std::string& url,
const std::string& host, const std::string& host,
const std::string& path, const std::string& path,
int port) int port,
int timeoutSecs)
{ {
_requestInitCancellation = false; _requestInitCancellation = false;
auto isCancellationRequested = auto isCancellationRequested =
makeCancellationRequestWithTimeout(60, _requestInitCancellation); makeCancellationRequestWithTimeout(timeoutSecs, _requestInitCancellation);
std::string errMsg; std::string errMsg;
bool success = _socket->connect(host, port, errMsg, isCancellationRequested); bool success = _socket->connect(host, port, errMsg, isCancellationRequested);
@ -383,16 +384,15 @@ namespace ix
return WebSocketInitResult(true, status, "", headers, path); return WebSocketInitResult(true, status, "", headers, path);
} }
WebSocketInitResult WebSocketHandshake::serverHandshake(int fd) WebSocketInitResult WebSocketHandshake::serverHandshake(int fd, int timeoutSecs)
{ {
_requestInitCancellation = false; _requestInitCancellation = false;
// Set the socket to non blocking mode + other tweaks // Set the socket to non blocking mode + other tweaks
SocketConnect::configure(fd); SocketConnect::configure(fd);
// FIXME: timeout should be configurable
auto isCancellationRequested = auto isCancellationRequested =
makeCancellationRequestWithTimeout(3, _requestInitCancellation); makeCancellationRequestWithTimeout(timeoutSecs, _requestInitCancellation);
std::string remote = std::string("remote fd ") + std::to_string(fd); std::string remote = std::string("remote fd ") + std::to_string(fd);

View File

@ -53,8 +53,11 @@ namespace ix
WebSocketInitResult clientHandshake(const std::string& url, WebSocketInitResult clientHandshake(const std::string& url,
const std::string& host, const std::string& host,
const std::string& path, const std::string& path,
int port); int port,
WebSocketInitResult serverHandshake(int fd); int timeoutSecs);
WebSocketInitResult serverHandshake(int fd,
int timeoutSecs);
static bool parseUrl(const std::string& url, static bool parseUrl(const std::string& url,
std::string& protocol, std::string& protocol,

View File

@ -24,15 +24,18 @@ namespace ix
const std::string WebSocketServer::kDefaultHost("127.0.0.1"); const std::string WebSocketServer::kDefaultHost("127.0.0.1");
const int WebSocketServer::kDefaultTcpBacklog(5); const int WebSocketServer::kDefaultTcpBacklog(5);
const size_t WebSocketServer::kDefaultMaxConnections(32); const size_t WebSocketServer::kDefaultMaxConnections(32);
const int WebSocketServer::kDefaultHandShakeTimeoutSecs(3); // 3 seconds
WebSocketServer::WebSocketServer(int port, WebSocketServer::WebSocketServer(int port,
const std::string& host, const std::string& host,
int backlog, int backlog,
size_t maxConnections) : size_t maxConnections,
int handshakeTimeoutSecs) :
_port(port), _port(port),
_host(host), _host(host),
_backlog(backlog), _backlog(backlog),
_maxConnections(maxConnections), _maxConnections(maxConnections),
_handshakeTimeoutSecs(handshakeTimeoutSecs),
_stop(false) _stop(false)
{ {
@ -236,7 +239,7 @@ namespace ix
_clients.insert(webSocket); _clients.insert(webSocket);
} }
auto status = webSocket->connectToSocket(fd); auto status = webSocket->connectToSocket(fd, _handshakeTimeoutSecs);
if (status.success) if (status.success)
{ {
// Process incoming messages and execute callbacks // Process incoming messages and execute callbacks

View File

@ -26,7 +26,8 @@ namespace ix
WebSocketServer(int port = WebSocketServer::kDefaultPort, WebSocketServer(int port = WebSocketServer::kDefaultPort,
const std::string& host = WebSocketServer::kDefaultHost, const std::string& host = WebSocketServer::kDefaultHost,
int backlog = WebSocketServer::kDefaultTcpBacklog, int backlog = WebSocketServer::kDefaultTcpBacklog,
size_t maxConnections = WebSocketServer::kDefaultMaxConnections); size_t maxConnections = WebSocketServer::kDefaultMaxConnections,
int handshakeTimeoutSecs = WebSocketServer::kDefaultHandShakeTimeoutSecs);
virtual ~WebSocketServer(); virtual ~WebSocketServer();
void setOnConnectionCallback(const OnConnectionCallback& callback); void setOnConnectionCallback(const OnConnectionCallback& callback);
@ -45,6 +46,7 @@ namespace ix
std::string _host; std::string _host;
int _backlog; int _backlog;
size_t _maxConnections; size_t _maxConnections;
int _handshakeTimeoutSecs;
OnConnectionCallback _onConnectionCallback; OnConnectionCallback _onConnectionCallback;
@ -66,6 +68,7 @@ namespace ix
const static std::string kDefaultHost; const static std::string kDefaultHost;
const static int kDefaultTcpBacklog; const static int kDefaultTcpBacklog;
const static size_t kDefaultMaxConnections; const static size_t kDefaultMaxConnections;
const static int kDefaultHandShakeTimeoutSecs;
// Methods // Methods
void run(); void run();

View File

@ -55,7 +55,8 @@ namespace ix
} }
// Client // Client
WebSocketInitResult WebSocketTransport::connectToUrl(const std::string& url) WebSocketInitResult WebSocketTransport::connectToUrl(const std::string& url,
int timeoutSecs)
{ {
std::string protocol, host, path, query; std::string protocol, host, path, query;
int port; int port;
@ -92,7 +93,8 @@ namespace ix
_perMessageDeflateOptions, _perMessageDeflateOptions,
_enablePerMessageDeflate); _enablePerMessageDeflate);
auto result = webSocketHandshake.clientHandshake(url, host, path, port); auto result = webSocketHandshake.clientHandshake(url, host, path, port,
timeoutSecs);
if (result.success) if (result.success)
{ {
setReadyState(OPEN); setReadyState(OPEN);
@ -101,7 +103,7 @@ namespace ix
} }
// Server // Server
WebSocketInitResult WebSocketTransport::connectToSocket(int fd) WebSocketInitResult WebSocketTransport::connectToSocket(int fd, int timeoutSecs)
{ {
_socket.reset(); _socket.reset();
_socket = std::make_shared<Socket>(fd); _socket = std::make_shared<Socket>(fd);
@ -112,7 +114,7 @@ namespace ix
_perMessageDeflateOptions, _perMessageDeflateOptions,
_enablePerMessageDeflate); _enablePerMessageDeflate);
auto result = webSocketHandshake.serverHandshake(fd); auto result = webSocketHandshake.serverHandshake(fd, timeoutSecs);
if (result.success) if (result.success)
{ {
setReadyState(OPEN); setReadyState(OPEN);

View File

@ -59,8 +59,10 @@ namespace ix
void configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions); void configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions);
WebSocketInitResult connectToUrl(const std::string& url); // Client WebSocketInitResult connectToUrl(const std::string& url, // Client
WebSocketInitResult connectToSocket(int fd); // Server int timeoutSecs);
WebSocketInitResult connectToSocket(int fd, // Server
int timeoutSecs);
void poll(); void poll();
WebSocketSendInfo sendBinary(const std::string& message); WebSocketSendInfo sendBinary(const std::string& message);