Compare commits

...

7 Commits

Author SHA1 Message Date
205c8c15bd socket server / used wrong mutex to protect _connectionsThreads 2019-05-06 12:24:20 -07:00
78198a0147 Fix windows (#51)
* More fixes for Windows

* fix tests for windows

* qf for linux

* clean up
2019-05-06 12:22:57 -07:00
d561e1141e Update README.md 2019-05-06 09:22:52 -07:00
753fc845ac Fix for windows (#50) 2019-05-06 09:13:42 -07:00
5dbc00bbfe doc: add reference to the conan file built at https://github.com/Zinnion/conan-IXWebSocket 2019-05-01 21:31:32 -07:00
14ec8522ef remove un-needed _backgroundThreadRunning variable 2019-05-01 11:09:25 -07:00
0c2d1c22bc Make AutomaticReconnection optional (#47)
* unittest pass + commands behave as expected

* cleanup
2019-04-29 21:12:34 -07:00
17 changed files with 128 additions and 88 deletions

View File

@ -10,6 +10,7 @@
* iOS
* Linux
* Android
* Windows (no TLS)
The code was made to compile once on Windows but support is currently broken on this platform.
@ -199,6 +200,8 @@ make install # will install to /usr/local on Unix, on macOS it is a good idea to
Headers and a static library will be installed to the target dir.
A [conan](https://conan.io/) file is available at [conan-IXWebSocket](https://github.com/Zinnion/conan-IXWebSocket).
There is a unittest which can be executed by typing `make test`.
There is a Dockerfile for running some code on Linux. To use docker-compose you must make a docker container first.

View File

@ -35,7 +35,6 @@ namespace ix
_activeJobs.erase(_id);
}
// we want hostname to be copied, not passed as a const reference
struct addrinfo* DNSLookup::getAddrInfo(const std::string& hostname,
int port,
std::string& errMsg)

View File

@ -189,11 +189,27 @@ namespace ix
int Socket::getErrno()
{
int err;
#ifdef _WIN32
return WSAGetLastError();
err = WSAGetLastError();
#else
return errno;
err = errno;
#endif
return err;
}
bool Socket::isWaitNeeded()
{
int err = getErrno();
if (err == EWOULDBLOCK || err == EAGAIN || err == EINPROGRESS)
{
return true;
}
return false;
}
void Socket::closeSocket(int fd)
@ -228,8 +244,7 @@ namespace ix
return ret == len;
}
// There is possibly something to be writen, try again
else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
getErrno() == EAGAIN))
else if (ret < 0 && Socket::isWaitNeeded())
{
continue;
}
@ -257,8 +272,7 @@ namespace ix
return true;
}
// There is possibly something to be read, try again
else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
getErrno() == EAGAIN))
else if (ret < 0 && Socket::isWaitNeeded())
{
// Wait with a 1ms timeout until the socket is ready to read.
// This way we are not busy looping
@ -317,13 +331,12 @@ namespace ix
size_t size = std::min(kChunkSize, length - output.size());
ssize_t ret = recv((char*)&_readBuffer[0], size);
if (ret <= 0 && (getErrno() != EWOULDBLOCK &&
getErrno() != EAGAIN))
if (ret <= 0 && !Socket::isWaitNeeded())
{
// Error
return std::make_pair(false, std::string());
}
else if (ret > 0)
else
{
output.insert(output.end(),
_readBuffer.begin(),

View File

@ -16,6 +16,20 @@
#ifdef _WIN32
#include <BaseTsd.h>
typedef SSIZE_T ssize_t;
#undef EWOULDBLOCK
#undef EAGAIN
#undef EINPROGRESS
#undef EBADF
#undef EINVAL
// map to WSA error codes
#define EWOULDBLOCK WSAEWOULDBLOCK
#define EAGAIN WSATRY_AGAIN
#define EINPROGRESS WSAEINPROGRESS
#define EBADF WSAEBADF
#define EINVAL WSAEINVAL
#endif
#include "IXCancellationRequest.h"
@ -41,8 +55,6 @@ namespace ix
virtual ~Socket();
bool init(std::string& errorMsg);
void configure();
// Functions to check whether there is activity on the socket
PollResultType poll(int timeoutSecs = kDefaultPollTimeout);
bool wakeUpFromPoll(uint8_t wakeUpCode);
@ -76,14 +88,14 @@ namespace ix
const CancellationRequest& isCancellationRequested);
static int getErrno();
static bool isWaitNeeded();
static void closeSocket(int fd);
// Used as special codes for pipe communication
static const uint64_t kSendRequest;
static const uint64_t kCloseRequest;
protected:
void closeSocket(int fd);
std::atomic<int> _sockfd;
std::mutex _socketMutex;

View File

@ -7,6 +7,7 @@
#include "IXSocketConnect.h"
#include "IXDNSLookup.h"
#include "IXNetSystem.h"
#include "IXSocket.h"
#include <string.h>
#include <fcntl.h>
@ -18,18 +19,6 @@
# include <linux/tcp.h>
#endif
namespace
{
void closeSocket(int fd)
{
#ifdef _WIN32
closesocket(fd);
#else
::close(fd);
#endif
}
}
namespace ix
{
//
@ -56,11 +45,12 @@ namespace ix
// block us for too long
SocketConnect::configure(fd);
if (::connect(fd, address->ai_addr, address->ai_addrlen) == -1
&& errno != EINPROGRESS && errno != 0)
int res = ::connect(fd, address->ai_addr, address->ai_addrlen);
if (res == -1 && !Socket::isWaitNeeded())
{
errMsg = strerror(errno);
closeSocket(fd);
errMsg = strerror(Socket::getErrno());
Socket::closeSocket(fd);
return -1;
}
@ -68,15 +58,17 @@ namespace ix
{
if (isCancellationRequested && isCancellationRequested()) // Must handle timeout as well
{
closeSocket(fd);
Socket::closeSocket(fd);
errMsg = "Cancelled";
return -1;
}
// Use select to check the status of the new connection
// On Linux the timeout needs to be re-initialized everytime
// http://man7.org/linux/man-pages/man2/select.2.html
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 10 * 1000; // 10ms timeout
fd_set wfds;
fd_set efds;
@ -85,11 +77,13 @@ namespace ix
FD_ZERO(&efds);
FD_SET(fd, &efds);
if (select(fd + 1, nullptr, &wfds, &efds, &timeout) < 0 &&
(errno == EBADF || errno == EINVAL))
// Use select to check the status of the new connection
res = select(fd + 1, nullptr, &wfds, &efds, &timeout);
if (res < 0 && (Socket::getErrno() == EBADF || Socket::getErrno() == EINVAL))
{
closeSocket(fd);
errMsg = std::string("Connect error, select error: ") + strerror(errno);
Socket::closeSocket(fd);
errMsg = std::string("Connect error, select error: ") + strerror(Socket::getErrno());
return -1;
}
@ -110,7 +104,7 @@ namespace ix
optval != 0)
#endif
{
closeSocket(fd);
Socket::closeSocket(fd);
errMsg = strerror(optval);
return -1;
}
@ -121,7 +115,7 @@ namespace ix
}
}
closeSocket(fd);
Socket::closeSocket(fd);
errMsg = "connect timed out after 60 seconds";
return -1;
}

View File

@ -77,7 +77,7 @@ namespace ix
<< "at address " << _host << ":" << _port
<< " : " << strerror(Socket::getErrno());
::close(_serverFd);
Socket::closeSocket(_serverFd);
return std::make_pair(false, ss.str());
}
@ -101,7 +101,7 @@ namespace ix
<< "at address " << _host << ":" << _port
<< " : " << strerror(Socket::getErrno());
::close(_serverFd);
Socket::closeSocket(_serverFd);
return std::make_pair(false, ss.str());
}
@ -115,7 +115,7 @@ namespace ix
<< "at address " << _host << ":" << _port
<< " : " << strerror(Socket::getErrno());
::close(_serverFd);
Socket::closeSocket(_serverFd);
return std::make_pair(false, ss.str());
}
@ -159,7 +159,7 @@ namespace ix
_stop = false;
_conditionVariable.notify_one();
::close(_serverFd);
Socket::closeSocket(_serverFd);
}
void SocketServer::setConnectionStateFactory(
@ -242,17 +242,18 @@ namespace ix
// Accept a connection.
struct sockaddr_in client; // client address information
int clientFd; // socket connected to client
socklen_t addressLen = sizeof(socklen_t);
socklen_t addressLen = sizeof(client);
memset(&client, 0, sizeof(client));
if ((clientFd = accept(_serverFd, (struct sockaddr *)&client, &addressLen)) < 0)
{
if (Socket::getErrno() != EWOULDBLOCK)
if (!Socket::isWaitNeeded())
{
// FIXME: that error should be propagated
int err = Socket::getErrno();
std::stringstream ss;
ss << "SocketServer::run() error accepting connection: "
<< strerror(Socket::getErrno());
<< err << ", " << strerror(err);
logError(ss.str());
}
continue;
@ -266,7 +267,7 @@ namespace ix
<< "Not accepting connection";
logError(ss.str());
::close(clientFd);
Socket::closeSocket(clientFd);
continue;
}
@ -280,7 +281,7 @@ namespace ix
if (_stop) return;
// Launch the handleConnection work asynchronously in its own thread.
std::lock_guard<std::mutex> lock(_conditionVariableMutex);
std::lock_guard<std::mutex> lock(_connectionsThreadsMutex);
_connectionsThreads.push_back(std::make_pair(
connectionState,
std::thread(&SocketServer::handleConnection,

View File

@ -38,7 +38,6 @@ namespace ix
WebSocket::WebSocket() :
_onMessageCallback(OnMessageCallback()),
_stop(false),
_backgroundThreadRunning(false),
_automaticReconnection(true),
_handshakeTimeoutSecs(kDefaultHandShakeTimeoutSecs),
_enablePong(kDefaultEnablePong),
@ -136,7 +135,6 @@ namespace ix
{
if (_thread.joinable()) return; // we've already been started
_backgroundThreadRunning = true;
_thread = std::thread(&WebSocket::run, this);
}
@ -157,7 +155,6 @@ namespace ix
_stop = true;
_thread.join();
_backgroundThreadRunning = false;
_stop = false;
_automaticReconnection = automaticReconnection;
@ -232,16 +229,12 @@ namespace ix
using millis = std::chrono::duration<double, std::milli>;
millis duration;
while (true)
// Try to connect only once when we don't have automaticReconnection setup
if (!isConnected() && !isClosing() && !_stop && !_automaticReconnection)
{
if (isConnected() || isClosing() || _stop || !_automaticReconnection)
{
break;
}
status = connect(_handshakeTimeoutSecs);
if (!status.success && !_stop)
if (!status.success)
{
duration = millis(calculateRetryWaitMilliseconds(retries++));
@ -252,8 +245,38 @@ namespace ix
_onMessageCallback(WebSocket_MessageType_Error, "", 0,
connectErr, WebSocketOpenInfo(),
WebSocketCloseInfo());
}
}
else
{
// Otherwise try to reconnect perpertually
while (true)
{
if (isConnected() || isClosing() || _stop || !_automaticReconnection)
{
break;
}
std::this_thread::sleep_for(duration);
status = connect(_handshakeTimeoutSecs);
if (!status.success)
{
duration = millis(calculateRetryWaitMilliseconds(retries++));
connectErr.retries = retries;
connectErr.wait_time = duration.count();
connectErr.reason = status.errorStr;
connectErr.http_status = status.http_status;
_onMessageCallback(WebSocket_MessageType_Error, "", 0,
connectErr, WebSocketOpenInfo(),
WebSocketCloseInfo());
// Only sleep if we aren't in the middle of stopping
if (!_stop)
{
std::this_thread::sleep_for(duration);
}
}
}
}
}
@ -317,11 +340,8 @@ namespace ix
WebSocket::invokeTrafficTrackerCallback(msg.size(), true);
});
// 4. In blocking mode, getting out of this function is triggered by
// an explicit disconnection from the callback, or by the remote end
// closing the connection, ie isConnected() == false.
// closing the connection, ie isConnectedOrClosing() == false.
if (!_backgroundThreadRunning && !isConnected() && !_automaticReconnection) return;
// If we aren't trying to reconnect automatically, exit if we aren't connected
if (!isConnected() && !_automaticReconnection) return;
}
}

View File

@ -154,7 +154,6 @@ namespace ix
static OnTrafficTrackerCallback _onTrafficTrackerCallback;
std::atomic<bool> _stop;
std::atomic<bool> _backgroundThreadRunning;
std::atomic<bool> _automaticReconnection;
std::thread _thread;
std::mutex _writeMutex;

View File

@ -296,8 +296,7 @@ namespace ix
{
ssize_t ret = _socket->recv((char*)&_readbuf[0], _readbuf.size());
if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
_socket->getErrno() == EAGAIN))
if (ret < 0 && Socket::isWaitNeeded())
{
break;
}
@ -553,7 +552,7 @@ namespace ix
// Get the reason.
std::string reason(_rxbuf.begin()+ws.header_size + 2,
_rxbuf.begin()+ws.header_size + (size_t) ws.N);
bool remote = true;
close(code, reason, _rxbuf.size(), remote);
@ -844,8 +843,7 @@ namespace ix
{
ssize_t ret = _socket->send((char*)&_txbuf[0], _txbuf.size());
if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
_socket->getErrno() == EAGAIN))
if (ret < 0 && Socket::isWaitNeeded())
{
break;
}

View File

@ -8,12 +8,11 @@ project (ixwebsocket_unittest)
set(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/../third_party/sanitizers-cmake/cmake" ${CMAKE_MODULE_PATH})
find_package(Sanitizers)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread")
set(CMAKE_LD_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread")
set (CMAKE_CXX_STANDARD 14)
if (NOT WIN32)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread")
set(CMAKE_LD_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread")
option(USE_TLS "Add TLS support" ON)
endif()
@ -33,16 +32,16 @@ set (SOURCES
IXDNSLookupTest.cpp
IXSocketTest.cpp
IXSocketConnectTest.cpp
IXWebSocketServerTest.cpp
IXWebSocketPingTest.cpp
IXWebSocketTestConnectionDisconnection.cpp
)
# Some unittest don't work on windows yet
if (NOT WIN32)
list(APPEND SOURCES
IXWebSocketServerTest.cpp
IXWebSocketPingTest.cpp
list(APPEND SOURCES
IXWebSocketPingTimeoutTest.cpp
cmd_websocket_chat.cpp
IXWebSocketTestConnectionDisconnection.cpp
)
endif()

View File

@ -108,7 +108,7 @@ namespace ix
{
log("Cannot compute a free port. bind error.");
::close(sockfd);
Socket::closeSocket(sockfd);
return getAnyFreePortRandom();
}
@ -118,12 +118,12 @@ namespace ix
{
log("Cannot compute a free port. getsockname error.");
::close(sockfd);
Socket::closeSocket(sockfd);
return getAnyFreePortRandom();
}
int port = ntohs(sa.sin_port);
::close(sockfd);
Socket::closeSocket(sockfd);
return port;
}

View File

@ -23,7 +23,6 @@ namespace
public:
WebSocketClient(int port, bool useHeartBeatMethod);
void subscribe(const std::string& channel);
void start();
void stop();
bool isReady() const;
@ -57,7 +56,7 @@ namespace
std::string url;
{
std::stringstream ss;
ss << "ws://localhost:"
ss << "ws://127.0.0.1:"
<< _port
<< "/";
@ -347,6 +346,9 @@ TEST_CASE("Websocket_ping_data_sent_setPingInterval", "[setPingInterval]")
webSocketClient.stop();
// without this sleep test fails on Windows
ix::msleep(100);
// Here we test ping interval
// client has sent data, but ping should have been sent no matter what
// -> expected ping messages == 3 as 900+900+1300 = 3100 seconds, 1 ping sent every second

View File

@ -23,7 +23,6 @@ namespace
public:
WebSocketClient(int port, int pingInterval, int pingTimeout);
void subscribe(const std::string& channel);
void start();
void stop();
bool isReady() const;
@ -71,7 +70,7 @@ namespace
std::string url;
{
std::stringstream ss;
ss << "ws://localhost:"
ss << "ws://127.0.0.1:"
<< _port
<< "/";

View File

@ -107,7 +107,7 @@ TEST_CASE("Websocket_server", "[websocket_server]")
std::string errMsg;
bool tls = false;
std::shared_ptr<Socket> socket = createSocket(tls, errMsg);
std::string host("localhost");
std::string host("127.0.0.1");
auto isCancellationRequested = []() -> bool
{
return false;
@ -141,7 +141,7 @@ TEST_CASE("Websocket_server", "[websocket_server]")
std::string errMsg;
bool tls = false;
std::shared_ptr<Socket> socket = createSocket(tls, errMsg);
std::string host("localhost");
std::string host("127.0.0.1");
auto isCancellationRequested = []() -> bool
{
return false;
@ -178,7 +178,7 @@ TEST_CASE("Websocket_server", "[websocket_server]")
std::string errMsg;
bool tls = false;
std::shared_ptr<Socket> socket = createSocket(tls, errMsg);
std::string host("localhost");
std::string host("127.0.0.1");
auto isCancellationRequested = []() -> bool
{
return false;

View File

@ -100,7 +100,7 @@ namespace
std::string url;
{
std::stringstream ss;
ss << "ws://localhost:"
ss << "ws://127.0.0.1:"
<< _port
<< "/"
<< _user;

View File

@ -39,6 +39,7 @@ namespace ix
std::cout << "Disabling automatic reconnection with "
"_webSocket.disableAutomaticReconnection()"
" not supported yet" << std::endl;
_webSocket.disableAutomaticReconnection();
}
}

View File

@ -41,7 +41,7 @@ namespace ix
{
if (!redisClient.publish(channel, message, errMsg))
{
std::cerr << "Error publishing to channel " << channel
std::cerr << "Error publishing to channel " << channel
<< "error: " << errMsg
<< std::endl;
return 1;