Compare commits

..

4 Commits

16 changed files with 76 additions and 93 deletions

View File

@ -10,7 +10,6 @@
* iOS
* Linux
* Android
* Windows (no TLS)
The code was made to compile once on Windows but support is currently broken on this platform.
@ -200,8 +199,6 @@ make install # will install to /usr/local on Unix, on macOS it is a good idea to
Headers and a static library will be installed to the target dir.
A [conan](https://conan.io/) file is available at [conan-IXWebSocket](https://github.com/Zinnion/conan-IXWebSocket).
There is a unittest which can be executed by typing `make test`.
There is a Dockerfile for running some code on Linux. To use docker-compose you must make a docker container first.

View File

@ -35,6 +35,7 @@ namespace ix
_activeJobs.erase(_id);
}
// we want hostname to be copied, not passed as a const reference
struct addrinfo* DNSLookup::getAddrInfo(const std::string& hostname,
int port,
std::string& errMsg)

View File

@ -189,27 +189,11 @@ namespace ix
int Socket::getErrno()
{
int err;
#ifdef _WIN32
err = WSAGetLastError();
return WSAGetLastError();
#else
err = errno;
return errno;
#endif
return err;
}
bool Socket::isWaitNeeded()
{
int err = getErrno();
if (err == EWOULDBLOCK || err == EAGAIN || err == EINPROGRESS)
{
return true;
}
return false;
}
void Socket::closeSocket(int fd)
@ -244,7 +228,8 @@ namespace ix
return ret == len;
}
// There is possibly something to be writen, try again
else if (ret < 0 && Socket::isWaitNeeded())
else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
getErrno() == EAGAIN))
{
continue;
}
@ -272,7 +257,8 @@ namespace ix
return true;
}
// There is possibly something to be read, try again
else if (ret < 0 && Socket::isWaitNeeded())
else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
getErrno() == EAGAIN))
{
// Wait with a 1ms timeout until the socket is ready to read.
// This way we are not busy looping
@ -331,12 +317,13 @@ namespace ix
size_t size = std::min(kChunkSize, length - output.size());
ssize_t ret = recv((char*)&_readBuffer[0], size);
if (ret <= 0 && !Socket::isWaitNeeded())
if (ret <= 0 && (getErrno() != EWOULDBLOCK &&
getErrno() != EAGAIN))
{
// Error
return std::make_pair(false, std::string());
}
else
else if (ret > 0)
{
output.insert(output.end(),
_readBuffer.begin(),

View File

@ -16,20 +16,6 @@
#ifdef _WIN32
#include <BaseTsd.h>
typedef SSIZE_T ssize_t;
#undef EWOULDBLOCK
#undef EAGAIN
#undef EINPROGRESS
#undef EBADF
#undef EINVAL
// map to WSA error codes
#define EWOULDBLOCK WSAEWOULDBLOCK
#define EAGAIN WSATRY_AGAIN
#define EINPROGRESS WSAEINPROGRESS
#define EBADF WSAEBADF
#define EINVAL WSAEINVAL
#endif
#include "IXCancellationRequest.h"
@ -55,6 +41,8 @@ namespace ix
virtual ~Socket();
bool init(std::string& errorMsg);
void configure();
// Functions to check whether there is activity on the socket
PollResultType poll(int timeoutSecs = kDefaultPollTimeout);
bool wakeUpFromPoll(uint8_t wakeUpCode);
@ -88,14 +76,14 @@ namespace ix
const CancellationRequest& isCancellationRequested);
static int getErrno();
static bool isWaitNeeded();
static void closeSocket(int fd);
// Used as special codes for pipe communication
static const uint64_t kSendRequest;
static const uint64_t kCloseRequest;
protected:
void closeSocket(int fd);
std::atomic<int> _sockfd;
std::mutex _socketMutex;

View File

@ -7,7 +7,6 @@
#include "IXSocketConnect.h"
#include "IXDNSLookup.h"
#include "IXNetSystem.h"
#include "IXSocket.h"
#include <string.h>
#include <fcntl.h>
@ -19,6 +18,18 @@
# include <linux/tcp.h>
#endif
namespace
{
void closeSocket(int fd)
{
#ifdef _WIN32
closesocket(fd);
#else
::close(fd);
#endif
}
}
namespace ix
{
//
@ -45,12 +56,11 @@ namespace ix
// block us for too long
SocketConnect::configure(fd);
int res = ::connect(fd, address->ai_addr, address->ai_addrlen);
if (res == -1 && !Socket::isWaitNeeded())
if (::connect(fd, address->ai_addr, address->ai_addrlen) == -1
&& errno != EINPROGRESS && errno != 0)
{
errMsg = strerror(Socket::getErrno());
Socket::closeSocket(fd);
errMsg = strerror(errno);
closeSocket(fd);
return -1;
}
@ -58,17 +68,15 @@ namespace ix
{
if (isCancellationRequested && isCancellationRequested()) // Must handle timeout as well
{
Socket::closeSocket(fd);
closeSocket(fd);
errMsg = "Cancelled";
return -1;
}
// On Linux the timeout needs to be re-initialized everytime
// http://man7.org/linux/man-pages/man2/select.2.html
// Use select to check the status of the new connection
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 10 * 1000; // 10ms timeout
fd_set wfds;
fd_set efds;
@ -77,13 +85,11 @@ namespace ix
FD_ZERO(&efds);
FD_SET(fd, &efds);
// Use select to check the status of the new connection
res = select(fd + 1, nullptr, &wfds, &efds, &timeout);
if (res < 0 && (Socket::getErrno() == EBADF || Socket::getErrno() == EINVAL))
if (select(fd + 1, nullptr, &wfds, &efds, &timeout) < 0 &&
(errno == EBADF || errno == EINVAL))
{
Socket::closeSocket(fd);
errMsg = std::string("Connect error, select error: ") + strerror(Socket::getErrno());
closeSocket(fd);
errMsg = std::string("Connect error, select error: ") + strerror(errno);
return -1;
}
@ -104,7 +110,7 @@ namespace ix
optval != 0)
#endif
{
Socket::closeSocket(fd);
closeSocket(fd);
errMsg = strerror(optval);
return -1;
}
@ -115,7 +121,7 @@ namespace ix
}
}
Socket::closeSocket(fd);
closeSocket(fd);
errMsg = "connect timed out after 60 seconds";
return -1;
}

View File

@ -77,7 +77,7 @@ namespace ix
<< "at address " << _host << ":" << _port
<< " : " << strerror(Socket::getErrno());
Socket::closeSocket(_serverFd);
::close(_serverFd);
return std::make_pair(false, ss.str());
}
@ -101,7 +101,7 @@ namespace ix
<< "at address " << _host << ":" << _port
<< " : " << strerror(Socket::getErrno());
Socket::closeSocket(_serverFd);
::close(_serverFd);
return std::make_pair(false, ss.str());
}
@ -115,7 +115,7 @@ namespace ix
<< "at address " << _host << ":" << _port
<< " : " << strerror(Socket::getErrno());
Socket::closeSocket(_serverFd);
::close(_serverFd);
return std::make_pair(false, ss.str());
}
@ -159,7 +159,7 @@ namespace ix
_stop = false;
_conditionVariable.notify_one();
Socket::closeSocket(_serverFd);
::close(_serverFd);
}
void SocketServer::setConnectionStateFactory(
@ -242,18 +242,17 @@ namespace ix
// Accept a connection.
struct sockaddr_in client; // client address information
int clientFd; // socket connected to client
socklen_t addressLen = sizeof(client);
socklen_t addressLen = sizeof(socklen_t);
memset(&client, 0, sizeof(client));
if ((clientFd = accept(_serverFd, (struct sockaddr *)&client, &addressLen)) < 0)
{
if (!Socket::isWaitNeeded())
if (Socket::getErrno() != EWOULDBLOCK)
{
// FIXME: that error should be propagated
int err = Socket::getErrno();
std::stringstream ss;
ss << "SocketServer::run() error accepting connection: "
<< err << ", " << strerror(err);
<< strerror(Socket::getErrno());
logError(ss.str());
}
continue;
@ -267,7 +266,7 @@ namespace ix
<< "Not accepting connection";
logError(ss.str());
Socket::closeSocket(clientFd);
::close(clientFd);
continue;
}
@ -281,7 +280,7 @@ namespace ix
if (_stop) return;
// Launch the handleConnection work asynchronously in its own thread.
std::lock_guard<std::mutex> lock(_connectionsThreadsMutex);
std::lock_guard<std::mutex> lock(_conditionVariableMutex);
_connectionsThreads.push_back(std::make_pair(
connectionState,
std::thread(&SocketServer::handleConnection,

View File

@ -270,7 +270,7 @@ namespace ix
_onMessageCallback(WebSocket_MessageType_Error, "", 0,
connectErr, WebSocketOpenInfo(),
WebSocketCloseInfo());
// Only sleep if we aren't in the middle of stopping
if (!_stop)
{
@ -340,7 +340,9 @@ namespace ix
WebSocket::invokeTrafficTrackerCallback(msg.size(), true);
});
// If we aren't trying to reconnect automatically, exit if we aren't connected
// 4. In blocking mode, getting out of this function is triggered by
// an explicit disconnection from the callback, or by the remote end
// closing the connection, ie isConnected() == false.
if (!isConnected() && !_automaticReconnection) return;
}
}

View File

@ -154,6 +154,7 @@ namespace ix
static OnTrafficTrackerCallback _onTrafficTrackerCallback;
std::atomic<bool> _stop;
std::atomic<bool> _backgroundThreadRunning;
std::atomic<bool> _automaticReconnection;
std::thread _thread;
std::mutex _writeMutex;

View File

@ -296,7 +296,8 @@ namespace ix
{
ssize_t ret = _socket->recv((char*)&_readbuf[0], _readbuf.size());
if (ret < 0 && Socket::isWaitNeeded())
if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
_socket->getErrno() == EAGAIN))
{
break;
}
@ -552,7 +553,7 @@ namespace ix
// Get the reason.
std::string reason(_rxbuf.begin()+ws.header_size + 2,
_rxbuf.begin()+ws.header_size + (size_t) ws.N);
bool remote = true;
close(code, reason, _rxbuf.size(), remote);
@ -843,7 +844,8 @@ namespace ix
{
ssize_t ret = _socket->send((char*)&_txbuf[0], _txbuf.size());
if (ret < 0 && Socket::isWaitNeeded())
if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
_socket->getErrno() == EAGAIN))
{
break;
}

View File

@ -8,11 +8,12 @@ project (ixwebsocket_unittest)
set(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/../third_party/sanitizers-cmake/cmake" ${CMAKE_MODULE_PATH})
find_package(Sanitizers)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread")
set(CMAKE_LD_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread")
set (CMAKE_CXX_STANDARD 14)
if (NOT WIN32)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread")
set(CMAKE_LD_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread")
option(USE_TLS "Add TLS support" ON)
endif()
@ -32,16 +33,16 @@ set (SOURCES
IXDNSLookupTest.cpp
IXSocketTest.cpp
IXSocketConnectTest.cpp
IXWebSocketServerTest.cpp
IXWebSocketPingTest.cpp
IXWebSocketTestConnectionDisconnection.cpp
)
# Some unittest don't work on windows yet
if (NOT WIN32)
list(APPEND SOURCES
list(APPEND SOURCES
IXWebSocketServerTest.cpp
IXWebSocketPingTest.cpp
IXWebSocketPingTimeoutTest.cpp
cmd_websocket_chat.cpp
IXWebSocketTestConnectionDisconnection.cpp
)
endif()

View File

@ -108,7 +108,7 @@ namespace ix
{
log("Cannot compute a free port. bind error.");
Socket::closeSocket(sockfd);
::close(sockfd);
return getAnyFreePortRandom();
}
@ -118,12 +118,12 @@ namespace ix
{
log("Cannot compute a free port. getsockname error.");
Socket::closeSocket(sockfd);
::close(sockfd);
return getAnyFreePortRandom();
}
int port = ntohs(sa.sin_port);
Socket::closeSocket(sockfd);
::close(sockfd);
return port;
}

View File

@ -23,6 +23,7 @@ namespace
public:
WebSocketClient(int port, bool useHeartBeatMethod);
void subscribe(const std::string& channel);
void start();
void stop();
bool isReady() const;
@ -56,7 +57,7 @@ namespace
std::string url;
{
std::stringstream ss;
ss << "ws://127.0.0.1:"
ss << "ws://localhost:"
<< _port
<< "/";
@ -346,9 +347,6 @@ TEST_CASE("Websocket_ping_data_sent_setPingInterval", "[setPingInterval]")
webSocketClient.stop();
// without this sleep test fails on Windows
ix::msleep(100);
// Here we test ping interval
// client has sent data, but ping should have been sent no matter what
// -> expected ping messages == 3 as 900+900+1300 = 3100 seconds, 1 ping sent every second

View File

@ -23,6 +23,7 @@ namespace
public:
WebSocketClient(int port, int pingInterval, int pingTimeout);
void subscribe(const std::string& channel);
void start();
void stop();
bool isReady() const;
@ -70,7 +71,7 @@ namespace
std::string url;
{
std::stringstream ss;
ss << "ws://127.0.0.1:"
ss << "ws://localhost:"
<< _port
<< "/";

View File

@ -107,7 +107,7 @@ TEST_CASE("Websocket_server", "[websocket_server]")
std::string errMsg;
bool tls = false;
std::shared_ptr<Socket> socket = createSocket(tls, errMsg);
std::string host("127.0.0.1");
std::string host("localhost");
auto isCancellationRequested = []() -> bool
{
return false;
@ -141,7 +141,7 @@ TEST_CASE("Websocket_server", "[websocket_server]")
std::string errMsg;
bool tls = false;
std::shared_ptr<Socket> socket = createSocket(tls, errMsg);
std::string host("127.0.0.1");
std::string host("localhost");
auto isCancellationRequested = []() -> bool
{
return false;
@ -178,7 +178,7 @@ TEST_CASE("Websocket_server", "[websocket_server]")
std::string errMsg;
bool tls = false;
std::shared_ptr<Socket> socket = createSocket(tls, errMsg);
std::string host("127.0.0.1");
std::string host("localhost");
auto isCancellationRequested = []() -> bool
{
return false;

View File

@ -100,7 +100,7 @@ namespace
std::string url;
{
std::stringstream ss;
ss << "ws://127.0.0.1:"
ss << "ws://localhost:"
<< _port
<< "/"
<< _user;

View File

@ -41,7 +41,7 @@ namespace ix
{
if (!redisClient.publish(channel, message, errMsg))
{
std::cerr << "Error publishing to channel " << channel
std::cerr << "Error publishing to channel " << channel
<< "error: " << errMsg
<< std::endl;
return 1;