timeout is configurable

This commit is contained in:
Benjamin Sergeant 2019-01-03 18:33:08 -08:00
parent 332bb87231
commit d75753ec98
9 changed files with 54 additions and 24 deletions

View File

@ -182,6 +182,17 @@ namespace ix
else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
getErrno() == EAGAIN))
{
// Wait with a timeout until something is written.
// This way we are not busy looping
fd_set rfds;
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 1 * 1000; // 1ms
FD_ZERO(&rfds);
FD_SET(_sockfd, &rfds);
select(_sockfd + 1, &rfds, nullptr, nullptr, &timeout);
continue;
}
// There was an error during the read, abort

View File

@ -30,11 +30,13 @@ namespace
namespace ix
{
OnTrafficTrackerCallback WebSocket::_onTrafficTrackerCallback = nullptr;
const int WebSocket::kDefaultHandShakeTimeoutSecs(60);
WebSocket::WebSocket() :
_onMessageCallback(OnMessageCallback()),
_stop(false),
_automaticReconnection(true)
_automaticReconnection(true),
_handshakeTimeoutSecs(kDefaultHandShakeTimeoutSecs)
{
_ws.setOnCloseCallback(
[this](uint16_t code, const std::string& reason, size_t wireSize)
@ -104,14 +106,14 @@ namespace ix
_automaticReconnection = automaticReconnection;
}
WebSocketInitResult WebSocket::connect()
WebSocketInitResult WebSocket::connect(int timeoutSecs)
{
{
std::lock_guard<std::mutex> lock(_configMutex);
_ws.configure(_perMessageDeflateOptions);
}
WebSocketInitResult status = _ws.connectToUrl(_url);
WebSocketInitResult status = _ws.connectToUrl(_url, timeoutSecs);
if (!status.success)
{
return status;
@ -124,14 +126,14 @@ namespace ix
return status;
}
WebSocketInitResult WebSocket::connectToSocket(int fd)
WebSocketInitResult WebSocket::connectToSocket(int fd, int timeoutSecs)
{
{
std::lock_guard<std::mutex> lock(_configMutex);
_ws.configure(_perMessageDeflateOptions);
}
WebSocketInitResult status = _ws.connectToSocket(fd);
WebSocketInitResult status = _ws.connectToSocket(fd, timeoutSecs);
if (!status.success)
{
return status;
@ -174,7 +176,7 @@ namespace ix
break;
}
status = connect();
status = connect(_handshakeTimeoutSecs);
if (!status.success && !_stop)
{

View File

@ -86,13 +86,14 @@ namespace ix
void setUrl(const std::string& url);
void setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions);
void setHandshakeTimeout(int _handshakeTimeoutSecs);
// Run asynchronously, by calling start and stop.
void start();
void stop();
// Run in blocking mode, by connecting first manually, and then calling run.
WebSocketInitResult connect();
WebSocketInitResult connect(int timeoutSecs);
void run();
WebSocketSendInfo send(const std::string& text);
@ -122,7 +123,7 @@ namespace ix
// Server
void setSocketFileDescriptor(int fd);
WebSocketInitResult connectToSocket(int fd);
WebSocketInitResult connectToSocket(int fd, int timeoutSecs);
WebSocketTransport _ws;
@ -138,6 +139,9 @@ namespace ix
std::thread _thread;
std::mutex _writeMutex;
std::atomic<int> _handshakeTimeoutSecs;
static const int kDefaultHandShakeTimeoutSecs;
friend class WebSocketServer;
};
}

View File

@ -262,12 +262,13 @@ namespace ix
WebSocketInitResult WebSocketHandshake::clientHandshake(const std::string& url,
const std::string& host,
const std::string& path,
int port)
int port,
int timeoutSecs)
{
_requestInitCancellation = false;
auto isCancellationRequested =
makeCancellationRequestWithTimeout(60, _requestInitCancellation);
makeCancellationRequestWithTimeout(timeoutSecs, _requestInitCancellation);
std::string errMsg;
bool success = _socket->connect(host, port, errMsg, isCancellationRequested);
@ -383,16 +384,15 @@ namespace ix
return WebSocketInitResult(true, status, "", headers, path);
}
WebSocketInitResult WebSocketHandshake::serverHandshake(int fd)
WebSocketInitResult WebSocketHandshake::serverHandshake(int fd, int timeoutSecs)
{
_requestInitCancellation = false;
// Set the socket to non blocking mode + other tweaks
SocketConnect::configure(fd);
// FIXME: timeout should be configurable
auto isCancellationRequested =
makeCancellationRequestWithTimeout(3, _requestInitCancellation);
makeCancellationRequestWithTimeout(timeoutSecs, _requestInitCancellation);
std::string remote = std::string("remote fd ") + std::to_string(fd);

View File

@ -53,8 +53,11 @@ namespace ix
WebSocketInitResult clientHandshake(const std::string& url,
const std::string& host,
const std::string& path,
int port);
WebSocketInitResult serverHandshake(int fd);
int port,
int timeoutSecs);
WebSocketInitResult serverHandshake(int fd,
int timeoutSecs);
static bool parseUrl(const std::string& url,
std::string& protocol,

View File

@ -24,15 +24,18 @@ namespace ix
const std::string WebSocketServer::kDefaultHost("127.0.0.1");
const int WebSocketServer::kDefaultTcpBacklog(5);
const size_t WebSocketServer::kDefaultMaxConnections(32);
const int WebSocketServer::kDefaultHandShakeTimeoutSecs(3); // 3 seconds
WebSocketServer::WebSocketServer(int port,
const std::string& host,
int backlog,
size_t maxConnections) :
size_t maxConnections,
int handshakeTimeoutSecs) :
_port(port),
_host(host),
_backlog(backlog),
_maxConnections(maxConnections),
_handshakeTimeoutSecs(handshakeTimeoutSecs),
_stop(false)
{
@ -236,7 +239,7 @@ namespace ix
_clients.insert(webSocket);
}
auto status = webSocket->connectToSocket(fd);
auto status = webSocket->connectToSocket(fd, _handshakeTimeoutSecs);
if (status.success)
{
// Process incoming messages and execute callbacks

View File

@ -26,7 +26,8 @@ namespace ix
WebSocketServer(int port = WebSocketServer::kDefaultPort,
const std::string& host = WebSocketServer::kDefaultHost,
int backlog = WebSocketServer::kDefaultTcpBacklog,
size_t maxConnections = WebSocketServer::kDefaultMaxConnections);
size_t maxConnections = WebSocketServer::kDefaultMaxConnections,
int handshakeTimeoutSecs = WebSocketServer::kDefaultHandShakeTimeoutSecs);
virtual ~WebSocketServer();
void setOnConnectionCallback(const OnConnectionCallback& callback);
@ -45,6 +46,7 @@ namespace ix
std::string _host;
int _backlog;
size_t _maxConnections;
int _handshakeTimeoutSecs;
OnConnectionCallback _onConnectionCallback;
@ -66,6 +68,7 @@ namespace ix
const static std::string kDefaultHost;
const static int kDefaultTcpBacklog;
const static size_t kDefaultMaxConnections;
const static int kDefaultHandShakeTimeoutSecs;
// Methods
void run();

View File

@ -55,7 +55,8 @@ namespace ix
}
// Client
WebSocketInitResult WebSocketTransport::connectToUrl(const std::string& url)
WebSocketInitResult WebSocketTransport::connectToUrl(const std::string& url,
int timeoutSecs)
{
std::string protocol, host, path, query;
int port;
@ -92,7 +93,8 @@ namespace ix
_perMessageDeflateOptions,
_enablePerMessageDeflate);
auto result = webSocketHandshake.clientHandshake(url, host, path, port);
auto result = webSocketHandshake.clientHandshake(url, host, path, port,
timeoutSecs);
if (result.success)
{
setReadyState(OPEN);
@ -101,7 +103,7 @@ namespace ix
}
// Server
WebSocketInitResult WebSocketTransport::connectToSocket(int fd)
WebSocketInitResult WebSocketTransport::connectToSocket(int fd, int timeoutSecs)
{
_socket.reset();
_socket = std::make_shared<Socket>(fd);
@ -112,7 +114,7 @@ namespace ix
_perMessageDeflateOptions,
_enablePerMessageDeflate);
auto result = webSocketHandshake.serverHandshake(fd);
auto result = webSocketHandshake.serverHandshake(fd, timeoutSecs);
if (result.success)
{
setReadyState(OPEN);

View File

@ -59,8 +59,10 @@ namespace ix
void configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions);
WebSocketInitResult connectToUrl(const std::string& url); // Client
WebSocketInitResult connectToSocket(int fd); // Server
WebSocketInitResult connectToUrl(const std::string& url, // Client
int timeoutSecs);
WebSocketInitResult connectToSocket(int fd, // Server
int timeoutSecs);
void poll();
WebSocketSendInfo sendBinary(const std::string& message);