Compare commits

..

4 Commits

16 changed files with 76 additions and 93 deletions

View File

@ -10,7 +10,6 @@
* iOS * iOS
* Linux * Linux
* Android * Android
* Windows (no TLS)
The code was made to compile once on Windows but support is currently broken on this platform. The code was made to compile once on Windows but support is currently broken on this platform.
@ -200,8 +199,6 @@ make install # will install to /usr/local on Unix, on macOS it is a good idea to
Headers and a static library will be installed to the target dir. Headers and a static library will be installed to the target dir.
A [conan](https://conan.io/) file is available at [conan-IXWebSocket](https://github.com/Zinnion/conan-IXWebSocket).
There is a unittest which can be executed by typing `make test`. There is a unittest which can be executed by typing `make test`.
There is a Dockerfile for running some code on Linux. To use docker-compose you must make a docker container first. There is a Dockerfile for running some code on Linux. To use docker-compose you must make a docker container first.

View File

@ -35,6 +35,7 @@ namespace ix
_activeJobs.erase(_id); _activeJobs.erase(_id);
} }
// we want hostname to be copied, not passed as a const reference
struct addrinfo* DNSLookup::getAddrInfo(const std::string& hostname, struct addrinfo* DNSLookup::getAddrInfo(const std::string& hostname,
int port, int port,
std::string& errMsg) std::string& errMsg)

View File

@ -189,27 +189,11 @@ namespace ix
int Socket::getErrno() int Socket::getErrno()
{ {
int err;
#ifdef _WIN32 #ifdef _WIN32
err = WSAGetLastError(); return WSAGetLastError();
#else #else
err = errno; return errno;
#endif #endif
return err;
}
bool Socket::isWaitNeeded()
{
int err = getErrno();
if (err == EWOULDBLOCK || err == EAGAIN || err == EINPROGRESS)
{
return true;
}
return false;
} }
void Socket::closeSocket(int fd) void Socket::closeSocket(int fd)
@ -244,7 +228,8 @@ namespace ix
return ret == len; return ret == len;
} }
// There is possibly something to be writen, try again // There is possibly something to be writen, try again
else if (ret < 0 && Socket::isWaitNeeded()) else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
getErrno() == EAGAIN))
{ {
continue; continue;
} }
@ -272,7 +257,8 @@ namespace ix
return true; return true;
} }
// There is possibly something to be read, try again // There is possibly something to be read, try again
else if (ret < 0 && Socket::isWaitNeeded()) else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
getErrno() == EAGAIN))
{ {
// Wait with a 1ms timeout until the socket is ready to read. // Wait with a 1ms timeout until the socket is ready to read.
// This way we are not busy looping // This way we are not busy looping
@ -331,12 +317,13 @@ namespace ix
size_t size = std::min(kChunkSize, length - output.size()); size_t size = std::min(kChunkSize, length - output.size());
ssize_t ret = recv((char*)&_readBuffer[0], size); ssize_t ret = recv((char*)&_readBuffer[0], size);
if (ret <= 0 && !Socket::isWaitNeeded()) if (ret <= 0 && (getErrno() != EWOULDBLOCK &&
getErrno() != EAGAIN))
{ {
// Error // Error
return std::make_pair(false, std::string()); return std::make_pair(false, std::string());
} }
else else if (ret > 0)
{ {
output.insert(output.end(), output.insert(output.end(),
_readBuffer.begin(), _readBuffer.begin(),

View File

@ -16,20 +16,6 @@
#ifdef _WIN32 #ifdef _WIN32
#include <BaseTsd.h> #include <BaseTsd.h>
typedef SSIZE_T ssize_t; typedef SSIZE_T ssize_t;
#undef EWOULDBLOCK
#undef EAGAIN
#undef EINPROGRESS
#undef EBADF
#undef EINVAL
// map to WSA error codes
#define EWOULDBLOCK WSAEWOULDBLOCK
#define EAGAIN WSATRY_AGAIN
#define EINPROGRESS WSAEINPROGRESS
#define EBADF WSAEBADF
#define EINVAL WSAEINVAL
#endif #endif
#include "IXCancellationRequest.h" #include "IXCancellationRequest.h"
@ -55,6 +41,8 @@ namespace ix
virtual ~Socket(); virtual ~Socket();
bool init(std::string& errorMsg); bool init(std::string& errorMsg);
void configure();
// Functions to check whether there is activity on the socket // Functions to check whether there is activity on the socket
PollResultType poll(int timeoutSecs = kDefaultPollTimeout); PollResultType poll(int timeoutSecs = kDefaultPollTimeout);
bool wakeUpFromPoll(uint8_t wakeUpCode); bool wakeUpFromPoll(uint8_t wakeUpCode);
@ -88,14 +76,14 @@ namespace ix
const CancellationRequest& isCancellationRequested); const CancellationRequest& isCancellationRequested);
static int getErrno(); static int getErrno();
static bool isWaitNeeded();
static void closeSocket(int fd);
// Used as special codes for pipe communication // Used as special codes for pipe communication
static const uint64_t kSendRequest; static const uint64_t kSendRequest;
static const uint64_t kCloseRequest; static const uint64_t kCloseRequest;
protected: protected:
void closeSocket(int fd);
std::atomic<int> _sockfd; std::atomic<int> _sockfd;
std::mutex _socketMutex; std::mutex _socketMutex;

View File

@ -7,7 +7,6 @@
#include "IXSocketConnect.h" #include "IXSocketConnect.h"
#include "IXDNSLookup.h" #include "IXDNSLookup.h"
#include "IXNetSystem.h" #include "IXNetSystem.h"
#include "IXSocket.h"
#include <string.h> #include <string.h>
#include <fcntl.h> #include <fcntl.h>
@ -19,6 +18,18 @@
# include <linux/tcp.h> # include <linux/tcp.h>
#endif #endif
namespace
{
void closeSocket(int fd)
{
#ifdef _WIN32
closesocket(fd);
#else
::close(fd);
#endif
}
}
namespace ix namespace ix
{ {
// //
@ -45,12 +56,11 @@ namespace ix
// block us for too long // block us for too long
SocketConnect::configure(fd); SocketConnect::configure(fd);
int res = ::connect(fd, address->ai_addr, address->ai_addrlen); if (::connect(fd, address->ai_addr, address->ai_addrlen) == -1
&& errno != EINPROGRESS && errno != 0)
if (res == -1 && !Socket::isWaitNeeded())
{ {
errMsg = strerror(Socket::getErrno()); errMsg = strerror(errno);
Socket::closeSocket(fd); closeSocket(fd);
return -1; return -1;
} }
@ -58,17 +68,15 @@ namespace ix
{ {
if (isCancellationRequested && isCancellationRequested()) // Must handle timeout as well if (isCancellationRequested && isCancellationRequested()) // Must handle timeout as well
{ {
Socket::closeSocket(fd); closeSocket(fd);
errMsg = "Cancelled"; errMsg = "Cancelled";
return -1; return -1;
} }
// On Linux the timeout needs to be re-initialized everytime // Use select to check the status of the new connection
// http://man7.org/linux/man-pages/man2/select.2.html
struct timeval timeout; struct timeval timeout;
timeout.tv_sec = 0; timeout.tv_sec = 0;
timeout.tv_usec = 10 * 1000; // 10ms timeout timeout.tv_usec = 10 * 1000; // 10ms timeout
fd_set wfds; fd_set wfds;
fd_set efds; fd_set efds;
@ -77,13 +85,11 @@ namespace ix
FD_ZERO(&efds); FD_ZERO(&efds);
FD_SET(fd, &efds); FD_SET(fd, &efds);
// Use select to check the status of the new connection if (select(fd + 1, nullptr, &wfds, &efds, &timeout) < 0 &&
res = select(fd + 1, nullptr, &wfds, &efds, &timeout); (errno == EBADF || errno == EINVAL))
if (res < 0 && (Socket::getErrno() == EBADF || Socket::getErrno() == EINVAL))
{ {
Socket::closeSocket(fd); closeSocket(fd);
errMsg = std::string("Connect error, select error: ") + strerror(Socket::getErrno()); errMsg = std::string("Connect error, select error: ") + strerror(errno);
return -1; return -1;
} }
@ -104,7 +110,7 @@ namespace ix
optval != 0) optval != 0)
#endif #endif
{ {
Socket::closeSocket(fd); closeSocket(fd);
errMsg = strerror(optval); errMsg = strerror(optval);
return -1; return -1;
} }
@ -115,7 +121,7 @@ namespace ix
} }
} }
Socket::closeSocket(fd); closeSocket(fd);
errMsg = "connect timed out after 60 seconds"; errMsg = "connect timed out after 60 seconds";
return -1; return -1;
} }

View File

@ -77,7 +77,7 @@ namespace ix
<< "at address " << _host << ":" << _port << "at address " << _host << ":" << _port
<< " : " << strerror(Socket::getErrno()); << " : " << strerror(Socket::getErrno());
Socket::closeSocket(_serverFd); ::close(_serverFd);
return std::make_pair(false, ss.str()); return std::make_pair(false, ss.str());
} }
@ -101,7 +101,7 @@ namespace ix
<< "at address " << _host << ":" << _port << "at address " << _host << ":" << _port
<< " : " << strerror(Socket::getErrno()); << " : " << strerror(Socket::getErrno());
Socket::closeSocket(_serverFd); ::close(_serverFd);
return std::make_pair(false, ss.str()); return std::make_pair(false, ss.str());
} }
@ -115,7 +115,7 @@ namespace ix
<< "at address " << _host << ":" << _port << "at address " << _host << ":" << _port
<< " : " << strerror(Socket::getErrno()); << " : " << strerror(Socket::getErrno());
Socket::closeSocket(_serverFd); ::close(_serverFd);
return std::make_pair(false, ss.str()); return std::make_pair(false, ss.str());
} }
@ -159,7 +159,7 @@ namespace ix
_stop = false; _stop = false;
_conditionVariable.notify_one(); _conditionVariable.notify_one();
Socket::closeSocket(_serverFd); ::close(_serverFd);
} }
void SocketServer::setConnectionStateFactory( void SocketServer::setConnectionStateFactory(
@ -242,18 +242,17 @@ namespace ix
// Accept a connection. // Accept a connection.
struct sockaddr_in client; // client address information struct sockaddr_in client; // client address information
int clientFd; // socket connected to client int clientFd; // socket connected to client
socklen_t addressLen = sizeof(client); socklen_t addressLen = sizeof(socklen_t);
memset(&client, 0, sizeof(client)); memset(&client, 0, sizeof(client));
if ((clientFd = accept(_serverFd, (struct sockaddr *)&client, &addressLen)) < 0) if ((clientFd = accept(_serverFd, (struct sockaddr *)&client, &addressLen)) < 0)
{ {
if (!Socket::isWaitNeeded()) if (Socket::getErrno() != EWOULDBLOCK)
{ {
// FIXME: that error should be propagated // FIXME: that error should be propagated
int err = Socket::getErrno();
std::stringstream ss; std::stringstream ss;
ss << "SocketServer::run() error accepting connection: " ss << "SocketServer::run() error accepting connection: "
<< err << ", " << strerror(err); << strerror(Socket::getErrno());
logError(ss.str()); logError(ss.str());
} }
continue; continue;
@ -267,7 +266,7 @@ namespace ix
<< "Not accepting connection"; << "Not accepting connection";
logError(ss.str()); logError(ss.str());
Socket::closeSocket(clientFd); ::close(clientFd);
continue; continue;
} }
@ -281,7 +280,7 @@ namespace ix
if (_stop) return; if (_stop) return;
// Launch the handleConnection work asynchronously in its own thread. // Launch the handleConnection work asynchronously in its own thread.
std::lock_guard<std::mutex> lock(_connectionsThreadsMutex); std::lock_guard<std::mutex> lock(_conditionVariableMutex);
_connectionsThreads.push_back(std::make_pair( _connectionsThreads.push_back(std::make_pair(
connectionState, connectionState,
std::thread(&SocketServer::handleConnection, std::thread(&SocketServer::handleConnection,

View File

@ -340,7 +340,9 @@ namespace ix
WebSocket::invokeTrafficTrackerCallback(msg.size(), true); WebSocket::invokeTrafficTrackerCallback(msg.size(), true);
}); });
// If we aren't trying to reconnect automatically, exit if we aren't connected // 4. In blocking mode, getting out of this function is triggered by
// an explicit disconnection from the callback, or by the remote end
// closing the connection, ie isConnected() == false.
if (!isConnected() && !_automaticReconnection) return; if (!isConnected() && !_automaticReconnection) return;
} }
} }

View File

@ -154,6 +154,7 @@ namespace ix
static OnTrafficTrackerCallback _onTrafficTrackerCallback; static OnTrafficTrackerCallback _onTrafficTrackerCallback;
std::atomic<bool> _stop; std::atomic<bool> _stop;
std::atomic<bool> _backgroundThreadRunning;
std::atomic<bool> _automaticReconnection; std::atomic<bool> _automaticReconnection;
std::thread _thread; std::thread _thread;
std::mutex _writeMutex; std::mutex _writeMutex;

View File

@ -296,7 +296,8 @@ namespace ix
{ {
ssize_t ret = _socket->recv((char*)&_readbuf[0], _readbuf.size()); ssize_t ret = _socket->recv((char*)&_readbuf[0], _readbuf.size());
if (ret < 0 && Socket::isWaitNeeded()) if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
_socket->getErrno() == EAGAIN))
{ {
break; break;
} }
@ -843,7 +844,8 @@ namespace ix
{ {
ssize_t ret = _socket->send((char*)&_txbuf[0], _txbuf.size()); ssize_t ret = _socket->send((char*)&_txbuf[0], _txbuf.size());
if (ret < 0 && Socket::isWaitNeeded()) if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
_socket->getErrno() == EAGAIN))
{ {
break; break;
} }

View File

@ -8,11 +8,12 @@ project (ixwebsocket_unittest)
set(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/../third_party/sanitizers-cmake/cmake" ${CMAKE_MODULE_PATH}) set(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/../third_party/sanitizers-cmake/cmake" ${CMAKE_MODULE_PATH})
find_package(Sanitizers) find_package(Sanitizers)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread")
set(CMAKE_LD_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread")
set (CMAKE_CXX_STANDARD 14) set (CMAKE_CXX_STANDARD 14)
if (NOT WIN32) if (NOT WIN32)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread")
set(CMAKE_LD_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread")
option(USE_TLS "Add TLS support" ON) option(USE_TLS "Add TLS support" ON)
endif() endif()
@ -32,16 +33,16 @@ set (SOURCES
IXDNSLookupTest.cpp IXDNSLookupTest.cpp
IXSocketTest.cpp IXSocketTest.cpp
IXSocketConnectTest.cpp IXSocketConnectTest.cpp
IXWebSocketServerTest.cpp
IXWebSocketPingTest.cpp
IXWebSocketTestConnectionDisconnection.cpp
) )
# Some unittest don't work on windows yet # Some unittest don't work on windows yet
if (NOT WIN32) if (NOT WIN32)
list(APPEND SOURCES list(APPEND SOURCES
IXWebSocketServerTest.cpp
IXWebSocketPingTest.cpp
IXWebSocketPingTimeoutTest.cpp IXWebSocketPingTimeoutTest.cpp
cmd_websocket_chat.cpp cmd_websocket_chat.cpp
IXWebSocketTestConnectionDisconnection.cpp
) )
endif() endif()

View File

@ -108,7 +108,7 @@ namespace ix
{ {
log("Cannot compute a free port. bind error."); log("Cannot compute a free port. bind error.");
Socket::closeSocket(sockfd); ::close(sockfd);
return getAnyFreePortRandom(); return getAnyFreePortRandom();
} }
@ -118,12 +118,12 @@ namespace ix
{ {
log("Cannot compute a free port. getsockname error."); log("Cannot compute a free port. getsockname error.");
Socket::closeSocket(sockfd); ::close(sockfd);
return getAnyFreePortRandom(); return getAnyFreePortRandom();
} }
int port = ntohs(sa.sin_port); int port = ntohs(sa.sin_port);
Socket::closeSocket(sockfd); ::close(sockfd);
return port; return port;
} }

View File

@ -23,6 +23,7 @@ namespace
public: public:
WebSocketClient(int port, bool useHeartBeatMethod); WebSocketClient(int port, bool useHeartBeatMethod);
void subscribe(const std::string& channel);
void start(); void start();
void stop(); void stop();
bool isReady() const; bool isReady() const;
@ -56,7 +57,7 @@ namespace
std::string url; std::string url;
{ {
std::stringstream ss; std::stringstream ss;
ss << "ws://127.0.0.1:" ss << "ws://localhost:"
<< _port << _port
<< "/"; << "/";
@ -346,9 +347,6 @@ TEST_CASE("Websocket_ping_data_sent_setPingInterval", "[setPingInterval]")
webSocketClient.stop(); webSocketClient.stop();
// without this sleep test fails on Windows
ix::msleep(100);
// Here we test ping interval // Here we test ping interval
// client has sent data, but ping should have been sent no matter what // client has sent data, but ping should have been sent no matter what
// -> expected ping messages == 3 as 900+900+1300 = 3100 seconds, 1 ping sent every second // -> expected ping messages == 3 as 900+900+1300 = 3100 seconds, 1 ping sent every second

View File

@ -23,6 +23,7 @@ namespace
public: public:
WebSocketClient(int port, int pingInterval, int pingTimeout); WebSocketClient(int port, int pingInterval, int pingTimeout);
void subscribe(const std::string& channel);
void start(); void start();
void stop(); void stop();
bool isReady() const; bool isReady() const;
@ -70,7 +71,7 @@ namespace
std::string url; std::string url;
{ {
std::stringstream ss; std::stringstream ss;
ss << "ws://127.0.0.1:" ss << "ws://localhost:"
<< _port << _port
<< "/"; << "/";

View File

@ -107,7 +107,7 @@ TEST_CASE("Websocket_server", "[websocket_server]")
std::string errMsg; std::string errMsg;
bool tls = false; bool tls = false;
std::shared_ptr<Socket> socket = createSocket(tls, errMsg); std::shared_ptr<Socket> socket = createSocket(tls, errMsg);
std::string host("127.0.0.1"); std::string host("localhost");
auto isCancellationRequested = []() -> bool auto isCancellationRequested = []() -> bool
{ {
return false; return false;
@ -141,7 +141,7 @@ TEST_CASE("Websocket_server", "[websocket_server]")
std::string errMsg; std::string errMsg;
bool tls = false; bool tls = false;
std::shared_ptr<Socket> socket = createSocket(tls, errMsg); std::shared_ptr<Socket> socket = createSocket(tls, errMsg);
std::string host("127.0.0.1"); std::string host("localhost");
auto isCancellationRequested = []() -> bool auto isCancellationRequested = []() -> bool
{ {
return false; return false;
@ -178,7 +178,7 @@ TEST_CASE("Websocket_server", "[websocket_server]")
std::string errMsg; std::string errMsg;
bool tls = false; bool tls = false;
std::shared_ptr<Socket> socket = createSocket(tls, errMsg); std::shared_ptr<Socket> socket = createSocket(tls, errMsg);
std::string host("127.0.0.1"); std::string host("localhost");
auto isCancellationRequested = []() -> bool auto isCancellationRequested = []() -> bool
{ {
return false; return false;

View File

@ -100,7 +100,7 @@ namespace
std::string url; std::string url;
{ {
std::stringstream ss; std::stringstream ss;
ss << "ws://127.0.0.1:" ss << "ws://localhost:"
<< _port << _port
<< "/" << "/"
<< _user; << _user;