Compare commits

..

27 Commits

Author SHA1 Message Date
d35818b688 bump timeout 2019-04-26 11:53:50 -07:00
9936260711 initialize netSystem (aka winsock on windows) explicitely 2019-04-26 11:52:47 -07:00
22fcdc0e2e Fixes for windows (#45)
* init Net system on Windows

* propagate DNS error

* Add zlib 1.2.11 sources

* link zlib statically for windows

* remove not implemented function declaration

* fix connect on Windows
2019-04-26 11:52:47 -07:00
561eac816b fix indentation of greatestCommonDivisor 2019-04-26 11:52:47 -07:00
7256b3df65 Remove commented code 2019-04-26 11:52:47 -07:00
f4c771b745 Fix data race in WebSocket where _url is accessed without protection in setThreadName
Also fix with url usage + docker container uses fedora and works with tsan
2019-04-26 11:52:47 -07:00
73ee18b093 disable failing unittest temporarily 2019-04-26 11:52:47 -07:00
f502d3ca35 Speedup build for Windows (#43)
* Speedup build for Windows

* add space :)
2019-04-26 11:52:47 -07:00
9703f76386 fix disconnection after own close 2019-04-26 15:17:37 +02:00
3ea7dbb637 consider socket close as local, remote only when receiving reason from remote 2019-04-26 11:26:54 +02:00
6beecc0aa8 fixes, renaming, spaces, changed close timeout to 200ms 2019-04-26 11:24:34 +02:00
eee99ecfc9 change close timeout from 500 to 100ms 2019-04-25 10:30:25 +02:00
ed4063bd6a remove line 2019-04-25 10:13:24 +02:00
3a9fe7c480 final fixes, with timeout 2019-04-25 10:01:45 +02:00
2dfd141897 Merge branch 'master' into add-close-code-to-websocket-and-fixes 2019-04-25 08:39:46 +02:00
f9abf3908f Merge branch 'master' into add-close-code-to-websocket-and-fixes 2019-04-24 18:49:00 +02:00
679791dd63 remove unused var and method 2019-04-24 17:56:37 +02:00
2b9b31ef4c fixes 2019-04-24 17:53:11 +02:00
1f518aa95d wip 2019-04-24 16:43:22 +02:00
ec3896e61b Merge branch 'master' into add-close-code-to-websocket-and-fixes 2019-04-24 09:06:34 +02:00
503826a762 remove param remote for public close method, only left in internalClose 2019-04-23 15:53:45 +02:00
2eb3085d30 test value 2019-04-23 15:24:35 +02:00
3800978b3c fix test 2019-04-23 14:59:03 +02:00
37c639387f merge and remove useless comments 2019-04-23 14:37:55 +02:00
d4cdbe6141 add comments 2019-04-23 14:30:35 +02:00
776227edcb add close params, fix close wrong code in callback after close() sent on remote side 2019-04-23 14:09:12 +02:00
23384dcd6e add boolean and add missing protocol error close constant 2019-04-23 08:55:06 +02:00
27 changed files with 704 additions and 372 deletions

View File

@ -1 +0,0 @@
docker/Dockerfile.fedora

42
Dockerfile Normal file
View File

@ -0,0 +1,42 @@
FROM fedora:30 as build
RUN yum install -y gcc-g++
RUN yum install -y cmake
RUN yum install -y make
RUN yum install -y openssl-devel
RUN yum install -y wget
RUN mkdir -p /tmp/cmake
WORKDIR /tmp/cmake
RUN wget https://github.com/Kitware/CMake/releases/download/v3.14.0/cmake-3.14.0-Linux-x86_64.tar.gz
RUN tar zxf cmake-3.14.0-Linux-x86_64.tar.gz
ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-Linux-x86_64/bin
ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
RUN yum install -y python
RUN yum install -y libtsan
COPY . .
# RUN ["make", "test"]
RUN ["make"]
# Runtime
FROM fedora:30 as runtime
RUN yum install -y libtsan
RUN groupadd app && useradd -g app app
COPY --chown=app:app --from=build /usr/local/bin/ws /usr/local/bin/ws
RUN chmod +x /usr/local/bin/ws
RUN ldd /usr/local/bin/ws
# Now run in usermode
USER app
WORKDIR /home/app
COPY --chown=app:app ws/snake/appsConfig.json .
COPY --chown=app:app ws/cobraMetricsSample.json .
ENTRYPOINT ["ws"]
CMD ["--help"]

View File

@ -10,7 +10,6 @@
* iOS * iOS
* Linux * Linux
* Android * Android
* Windows (no TLS)
The code was made to compile once on Windows but support is currently broken on this platform. The code was made to compile once on Windows but support is currently broken on this platform.
@ -200,8 +199,6 @@ make install # will install to /usr/local on Unix, on macOS it is a good idea to
Headers and a static library will be installed to the target dir. Headers and a static library will be installed to the target dir.
A [conan](https://conan.io/) file is available at [conan-IXWebSocket](https://github.com/Zinnion/conan-IXWebSocket).
There is a unittest which can be executed by typing `make test`. There is a unittest which can be executed by typing `make test`.
There is a Dockerfile for running some code on Linux. To use docker-compose you must make a docker container first. There is a Dockerfile for running some code on Linux. To use docker-compose you must make a docker container first.

View File

@ -1,52 +0,0 @@
# Build time
FROM debian:buster as build
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install wget
RUN mkdir -p /tmp/cmake
WORKDIR /tmp/cmake
RUN wget https://github.com/Kitware/CMake/releases/download/v3.14.0/cmake-3.14.0-Linux-x86_64.tar.gz
RUN tar zxf cmake-3.14.0-Linux-x86_64.tar.gz
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install libz-dev
RUN apt-get -y install make
COPY . .
ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-Linux-x86_64/bin
ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
RUN ["make"]
# Runtime
FROM debian:buster as runtime
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
# Runtime
RUN apt-get install -y libssl1.1
RUN apt-get install -y ca-certificates
RUN ["update-ca-certificates"]
# Debugging
RUN apt-get install -y strace
RUN apt-get install -y procps
RUN apt-get install -y htop
RUN adduser --disabled-password --gecos '' app
COPY --chown=app:app --from=build /usr/local/bin/ws /usr/local/bin/ws
RUN chmod +x /usr/local/bin/ws
RUN ldd /usr/local/bin/ws
# Now run in usermode
USER app
WORKDIR /home/app
COPY --chown=app:app ws/snake/appsConfig.json .
COPY --chown=app:app ws/cobraMetricsSample.json .
ENTRYPOINT ["ws"]
CMD ["--help"]

View File

@ -1,42 +0,0 @@
FROM fedora:30 as build
RUN yum install -y gcc-g++
RUN yum install -y cmake
RUN yum install -y make
RUN yum install -y openssl-devel
RUN yum install -y wget
RUN mkdir -p /tmp/cmake
WORKDIR /tmp/cmake
RUN wget https://github.com/Kitware/CMake/releases/download/v3.14.0/cmake-3.14.0-Linux-x86_64.tar.gz
RUN tar zxf cmake-3.14.0-Linux-x86_64.tar.gz
ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-Linux-x86_64/bin
ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
RUN yum install -y python
RUN yum install -y libtsan
COPY . .
# RUN ["make", "test"]
RUN ["make"]
# Runtime
FROM fedora:30 as runtime
RUN yum install -y libtsan
RUN groupadd app && useradd -g app app
COPY --chown=app:app --from=build /usr/local/bin/ws /usr/local/bin/ws
RUN chmod +x /usr/local/bin/ws
RUN ldd /usr/local/bin/ws
# Now run in usermode
USER app
WORKDIR /home/app
COPY --chown=app:app ws/snake/appsConfig.json .
COPY --chown=app:app ws/cobraMetricsSample.json .
ENTRYPOINT ["ws"]
CMD ["--help"]

View File

@ -1,24 +0,0 @@
# Build time
FROM ubuntu:xenial as build
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install wget
RUN mkdir -p /tmp/cmake
WORKDIR /tmp/cmake
RUN wget https://github.com/Kitware/CMake/releases/download/v3.14.0/cmake-3.14.0-Linux-x86_64.tar.gz
RUN tar zxf cmake-3.14.0-Linux-x86_64.tar.gz
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install libz-dev
RUN apt-get -y install make
RUN apt-get -y install python
COPY . .
ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-Linux-x86_64/bin
ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
# RUN ["make"]
RUN ["make", "test"]

View File

@ -17,25 +17,28 @@ namespace ix
std::atomic<uint64_t> DNSLookup::_nextId(0); std::atomic<uint64_t> DNSLookup::_nextId(0);
std::set<uint64_t> DNSLookup::_activeJobs; std::set<uint64_t> DNSLookup::_activeJobs;
std::mutex DNSLookup::_activeJobsMutex; std::mutex DNSLookup::_activeJobsMutex;
std::mutex DNSLookup::_resMutex;
DNSLookup::DNSLookup(const std::string& hostname, int port, int64_t wait) : DNSLookup::DNSLookup(const std::string& hostname, int port, int64_t wait) :
_hostname(hostname),
_port(port), _port(port),
_wait(wait), _wait(wait),
_res(nullptr), _res(nullptr),
_done(false), _done(false),
_id(_nextId++) _id(_nextId++)
{ {
setHostname(hostname); ;
} }
DNSLookup::~DNSLookup() DNSLookup::~DNSLookup()
{ {
// Remove this job from the active jobs list // Remove this job from the active jobs list
std::lock_guard<std::mutex> lock(_activeJobsMutex); std::unique_lock<std::mutex> lock(_activeJobsMutex);
_activeJobs.erase(_id); _activeJobs.erase(_id);
} }
struct addrinfo* DNSLookup::getAddrInfo(const std::string& hostname, // we want hostname to be copied, not passed as a const reference
struct addrinfo* DNSLookup::getAddrInfo(std::string hostname,
int port, int port,
std::string& errMsg) std::string& errMsg)
{ {
@ -78,7 +81,7 @@ namespace ix
return nullptr; return nullptr;
} }
return getAddrInfo(getHostname(), _port, errMsg); return getAddrInfo(_hostname, _port, errMsg);
} }
struct addrinfo* DNSLookup::resolveAsync(std::string& errMsg, struct addrinfo* DNSLookup::resolveAsync(std::string& errMsg,
@ -96,7 +99,7 @@ namespace ix
// Record job in the active Job set // Record job in the active Job set
{ {
std::lock_guard<std::mutex> lock(_activeJobsMutex); std::unique_lock<std::mutex> lock(_activeJobsMutex);
_activeJobs.insert(_id); _activeJobs.insert(_id);
} }
@ -104,7 +107,7 @@ namespace ix
// Good resource on thread forced termination // Good resource on thread forced termination
// https://www.bo-yang.net/2017/11/19/cpp-kill-detached-thread // https://www.bo-yang.net/2017/11/19/cpp-kill-detached-thread
// //
_thread = std::thread(&DNSLookup::run, this, _id, getHostname(), _port); _thread = std::thread(&DNSLookup::run, this, _id, _hostname, _port);
_thread.detach(); _thread.detach();
std::unique_lock<std::mutex> lock(_conditionVariableMutex); std::unique_lock<std::mutex> lock(_conditionVariableMutex);
@ -134,8 +137,13 @@ namespace ix
return nullptr; return nullptr;
} }
errMsg = getErrMsg(); if (!_errMsg.empty())
return getRes(); {
errMsg = _errMsg;
}
std::unique_lock<std::mutex> rlock(_resMutex);
return _res;
} }
void DNSLookup::run(uint64_t id, const std::string& hostname, int port) // thread runner void DNSLookup::run(uint64_t id, const std::string& hostname, int port) // thread runner
@ -147,55 +155,21 @@ namespace ix
struct addrinfo* res = getAddrInfo(hostname, port, errMsg); struct addrinfo* res = getAddrInfo(hostname, port, errMsg);
// if this isn't an active job, and the control thread is gone // if this isn't an active job, and the control thread is gone
// there is nothing to do, and we don't want to touch the defunct // there is not thing to do, and we don't want to touch the defunct
// object data structure such as _errMsg or _condition // object data structure such as _errMsg or _condition
std::lock_guard<std::mutex> lock(_activeJobsMutex); std::unique_lock<std::mutex> lock(_activeJobsMutex);
if (_activeJobs.count(id) == 0) if (_activeJobs.count(id) == 0)
{ {
return; return;
} }
// Copy result into the member variables // Copy result into the member variables
setRes(res); {
setErrMsg(errMsg); std::unique_lock<std::mutex> rlock(_resMutex);
_res = res;
}
_errMsg = errMsg;
_condition.notify_one(); _condition.notify_one();
_done = true; _done = true;
} }
void DNSLookup::setHostname(const std::string& hostname)
{
std::lock_guard<std::mutex> lock(_hostnameMutex);
_hostname = hostname;
}
const std::string& DNSLookup::getHostname()
{
std::lock_guard<std::mutex> lock(_hostnameMutex);
return _hostname;
}
void DNSLookup::setErrMsg(const std::string& errMsg)
{
std::lock_guard<std::mutex> lock(_errMsgMutex);
_errMsg = errMsg;
}
const std::string& DNSLookup::getErrMsg()
{
std::lock_guard<std::mutex> lock(_errMsgMutex);
return _errMsg;
}
void DNSLookup::setRes(struct addrinfo* addr)
{
std::lock_guard<std::mutex> lock(_resMutex);
_res = addr;
}
struct addrinfo* DNSLookup::getRes()
{
std::lock_guard<std::mutex> lock(_resMutex);
return _res;
}
} }

View File

@ -39,32 +39,18 @@ namespace ix
struct addrinfo* resolveBlocking(std::string& errMsg, struct addrinfo* resolveBlocking(std::string& errMsg,
const CancellationRequest& isCancellationRequested); const CancellationRequest& isCancellationRequested);
static struct addrinfo* getAddrInfo(const std::string& hostname, static struct addrinfo* getAddrInfo(std::string hostname,
int port, int port,
std::string& errMsg); std::string& errMsg);
void run(uint64_t id, const std::string& hostname, int port); // thread runner void run(uint64_t id, const std::string& hostname, int port); // thread runner
void setHostname(const std::string& hostname);
const std::string& getHostname();
void setErrMsg(const std::string& errMsg);
const std::string& getErrMsg();
void setRes(struct addrinfo* addr);
struct addrinfo* getRes();
std::string _hostname; std::string _hostname;
std::mutex _hostnameMutex;
int _port; int _port;
int64_t _wait; int64_t _wait;
struct addrinfo* _res;
std::mutex _resMutex;
std::string _errMsg; std::string _errMsg;
std::mutex _errMsgMutex; struct addrinfo* _res;
static std::mutex _resMutex;
std::atomic<bool> _done; std::atomic<bool> _done;
std::thread _thread; std::thread _thread;

View File

@ -189,27 +189,11 @@ namespace ix
int Socket::getErrno() int Socket::getErrno()
{ {
int err;
#ifdef _WIN32 #ifdef _WIN32
err = WSAGetLastError(); return WSAGetLastError();
#else #else
err = errno; return errno;
#endif #endif
return err;
}
bool Socket::isWaitNeeded()
{
int err = getErrno();
if (err == EWOULDBLOCK || err == EAGAIN || err == EINPROGRESS)
{
return true;
}
return false;
} }
void Socket::closeSocket(int fd) void Socket::closeSocket(int fd)
@ -244,7 +228,8 @@ namespace ix
return ret == len; return ret == len;
} }
// There is possibly something to be writen, try again // There is possibly something to be writen, try again
else if (ret < 0 && Socket::isWaitNeeded()) else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
getErrno() == EAGAIN))
{ {
continue; continue;
} }
@ -272,7 +257,8 @@ namespace ix
return true; return true;
} }
// There is possibly something to be read, try again // There is possibly something to be read, try again
else if (ret < 0 && Socket::isWaitNeeded()) else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
getErrno() == EAGAIN))
{ {
// Wait with a 1ms timeout until the socket is ready to read. // Wait with a 1ms timeout until the socket is ready to read.
// This way we are not busy looping // This way we are not busy looping
@ -331,12 +317,13 @@ namespace ix
size_t size = std::min(kChunkSize, length - output.size()); size_t size = std::min(kChunkSize, length - output.size());
ssize_t ret = recv((char*)&_readBuffer[0], size); ssize_t ret = recv((char*)&_readBuffer[0], size);
if (ret <= 0 && !Socket::isWaitNeeded()) if (ret <= 0 && (getErrno() != EWOULDBLOCK &&
getErrno() != EAGAIN))
{ {
// Error // Error
return std::make_pair(false, std::string()); return std::make_pair(false, std::string());
} }
else else if (ret > 0)
{ {
output.insert(output.end(), output.insert(output.end(),
_readBuffer.begin(), _readBuffer.begin(),

View File

@ -16,20 +16,6 @@
#ifdef _WIN32 #ifdef _WIN32
#include <BaseTsd.h> #include <BaseTsd.h>
typedef SSIZE_T ssize_t; typedef SSIZE_T ssize_t;
#undef EWOULDBLOCK
#undef EAGAIN
#undef EINPROGRESS
#undef EBADF
#undef EINVAL
// map to WSA error codes
#define EWOULDBLOCK WSAEWOULDBLOCK
#define EAGAIN WSATRY_AGAIN
#define EINPROGRESS WSAEINPROGRESS
#define EBADF WSAEBADF
#define EINVAL WSAEINVAL
#endif #endif
#include "IXCancellationRequest.h" #include "IXCancellationRequest.h"
@ -55,6 +41,8 @@ namespace ix
virtual ~Socket(); virtual ~Socket();
bool init(std::string& errorMsg); bool init(std::string& errorMsg);
void configure();
// Functions to check whether there is activity on the socket // Functions to check whether there is activity on the socket
PollResultType poll(int timeoutSecs = kDefaultPollTimeout); PollResultType poll(int timeoutSecs = kDefaultPollTimeout);
bool wakeUpFromPoll(uint8_t wakeUpCode); bool wakeUpFromPoll(uint8_t wakeUpCode);
@ -88,14 +76,14 @@ namespace ix
const CancellationRequest& isCancellationRequested); const CancellationRequest& isCancellationRequested);
static int getErrno(); static int getErrno();
static bool isWaitNeeded();
static void closeSocket(int fd);
// Used as special codes for pipe communication // Used as special codes for pipe communication
static const uint64_t kSendRequest; static const uint64_t kSendRequest;
static const uint64_t kCloseRequest; static const uint64_t kCloseRequest;
protected: protected:
void closeSocket(int fd);
std::atomic<int> _sockfd; std::atomic<int> _sockfd;
std::mutex _socketMutex; std::mutex _socketMutex;

View File

@ -7,7 +7,6 @@
#include "IXSocketConnect.h" #include "IXSocketConnect.h"
#include "IXDNSLookup.h" #include "IXDNSLookup.h"
#include "IXNetSystem.h" #include "IXNetSystem.h"
#include "IXSocket.h"
#include <string.h> #include <string.h>
#include <fcntl.h> #include <fcntl.h>
@ -19,6 +18,18 @@
# include <linux/tcp.h> # include <linux/tcp.h>
#endif #endif
namespace
{
void closeSocket(int fd)
{
#ifdef _WIN32
closesocket(fd);
#else
::close(fd);
#endif
}
}
namespace ix namespace ix
{ {
// //
@ -45,12 +56,11 @@ namespace ix
// block us for too long // block us for too long
SocketConnect::configure(fd); SocketConnect::configure(fd);
int res = ::connect(fd, address->ai_addr, address->ai_addrlen); if (::connect(fd, address->ai_addr, address->ai_addrlen) == -1
&& errno != EINPROGRESS && errno != 0)
if (res == -1 && !Socket::isWaitNeeded())
{ {
errMsg = strerror(Socket::getErrno()); errMsg = strerror(errno);
Socket::closeSocket(fd); closeSocket(fd);
return -1; return -1;
} }
@ -58,17 +68,15 @@ namespace ix
{ {
if (isCancellationRequested && isCancellationRequested()) // Must handle timeout as well if (isCancellationRequested && isCancellationRequested()) // Must handle timeout as well
{ {
Socket::closeSocket(fd); closeSocket(fd);
errMsg = "Cancelled"; errMsg = "Cancelled";
return -1; return -1;
} }
// On Linux the timeout needs to be re-initialized everytime // Use select to check the status of the new connection
// http://man7.org/linux/man-pages/man2/select.2.html
struct timeval timeout; struct timeval timeout;
timeout.tv_sec = 0; timeout.tv_sec = 0;
timeout.tv_usec = 10 * 1000; // 10ms timeout timeout.tv_usec = 10 * 1000; // 10ms timeout
fd_set wfds; fd_set wfds;
fd_set efds; fd_set efds;
@ -77,13 +85,11 @@ namespace ix
FD_ZERO(&efds); FD_ZERO(&efds);
FD_SET(fd, &efds); FD_SET(fd, &efds);
// Use select to check the status of the new connection if (select(fd + 1, nullptr, &wfds, &efds, &timeout) < 0 &&
res = select(fd + 1, nullptr, &wfds, &efds, &timeout); (errno == EBADF || errno == EINVAL))
if (res < 0 && (Socket::getErrno() == EBADF || Socket::getErrno() == EINVAL))
{ {
Socket::closeSocket(fd); closeSocket(fd);
errMsg = std::string("Connect error, select error: ") + strerror(Socket::getErrno()); errMsg = std::string("Connect error, select error: ") + strerror(errno);
return -1; return -1;
} }
@ -104,7 +110,7 @@ namespace ix
optval != 0) optval != 0)
#endif #endif
{ {
Socket::closeSocket(fd); closeSocket(fd);
errMsg = strerror(optval); errMsg = strerror(optval);
return -1; return -1;
} }
@ -115,7 +121,7 @@ namespace ix
} }
} }
Socket::closeSocket(fd); closeSocket(fd);
errMsg = "connect timed out after 60 seconds"; errMsg = "connect timed out after 60 seconds";
return -1; return -1;
} }

View File

@ -77,7 +77,7 @@ namespace ix
<< "at address " << _host << ":" << _port << "at address " << _host << ":" << _port
<< " : " << strerror(Socket::getErrno()); << " : " << strerror(Socket::getErrno());
Socket::closeSocket(_serverFd); ::close(_serverFd);
return std::make_pair(false, ss.str()); return std::make_pair(false, ss.str());
} }
@ -101,7 +101,7 @@ namespace ix
<< "at address " << _host << ":" << _port << "at address " << _host << ":" << _port
<< " : " << strerror(Socket::getErrno()); << " : " << strerror(Socket::getErrno());
Socket::closeSocket(_serverFd); ::close(_serverFd);
return std::make_pair(false, ss.str()); return std::make_pair(false, ss.str());
} }
@ -115,7 +115,7 @@ namespace ix
<< "at address " << _host << ":" << _port << "at address " << _host << ":" << _port
<< " : " << strerror(Socket::getErrno()); << " : " << strerror(Socket::getErrno());
Socket::closeSocket(_serverFd); ::close(_serverFd);
return std::make_pair(false, ss.str()); return std::make_pair(false, ss.str());
} }
@ -159,7 +159,7 @@ namespace ix
_stop = false; _stop = false;
_conditionVariable.notify_one(); _conditionVariable.notify_one();
Socket::closeSocket(_serverFd); ::close(_serverFd);
} }
void SocketServer::setConnectionStateFactory( void SocketServer::setConnectionStateFactory(
@ -242,18 +242,17 @@ namespace ix
// Accept a connection. // Accept a connection.
struct sockaddr_in client; // client address information struct sockaddr_in client; // client address information
int clientFd; // socket connected to client int clientFd; // socket connected to client
socklen_t addressLen = sizeof(client); socklen_t addressLen = sizeof(socklen_t);
memset(&client, 0, sizeof(client)); memset(&client, 0, sizeof(client));
if ((clientFd = accept(_serverFd, (struct sockaddr *)&client, &addressLen)) < 0) if ((clientFd = accept(_serverFd, (struct sockaddr *)&client, &addressLen)) < 0)
{ {
if (!Socket::isWaitNeeded()) if (Socket::getErrno() != EWOULDBLOCK)
{ {
// FIXME: that error should be propagated // FIXME: that error should be propagated
int err = Socket::getErrno();
std::stringstream ss; std::stringstream ss;
ss << "SocketServer::run() error accepting connection: " ss << "SocketServer::run() error accepting connection: "
<< err << ", " << strerror(err); << strerror(Socket::getErrno());
logError(ss.str()); logError(ss.str());
} }
continue; continue;
@ -267,7 +266,7 @@ namespace ix
<< "Not accepting connection"; << "Not accepting connection";
logError(ss.str()); logError(ss.str());
Socket::closeSocket(clientFd); ::close(clientFd);
continue; continue;
} }
@ -281,7 +280,7 @@ namespace ix
if (_stop) return; if (_stop) return;
// Launch the handleConnection work asynchronously in its own thread. // Launch the handleConnection work asynchronously in its own thread.
std::lock_guard<std::mutex> lock(_connectionsThreadsMutex); std::lock_guard<std::mutex> lock(_conditionVariableMutex);
_connectionsThreads.push_back(std::make_pair( _connectionsThreads.push_back(std::make_pair(
connectionState, connectionState,
std::thread(&SocketServer::handleConnection, std::thread(&SocketServer::handleConnection,

View File

@ -216,9 +216,14 @@ namespace ix
return getReadyState() == WebSocket_ReadyState_Closing; return getReadyState() == WebSocket_ReadyState_Closing;
} }
void WebSocket::close() bool WebSocket::isConnectedOrClosing() const
{ {
_ws.close(); return isConnected() || isClosing();
}
void WebSocket::close(uint16_t code, const std::string& reason)
{
_ws.close(code, reason);
} }
void WebSocket::reconnectPerpetuallyIfDisconnected() void WebSocket::reconnectPerpetuallyIfDisconnected()
@ -229,27 +234,6 @@ namespace ix
using millis = std::chrono::duration<double, std::milli>; using millis = std::chrono::duration<double, std::milli>;
millis duration; millis duration;
// Try to connect only once when we don't have automaticReconnection setup
if (!isConnected() && !isClosing() && !_stop && !_automaticReconnection)
{
status = connect(_handshakeTimeoutSecs);
if (!status.success)
{
duration = millis(calculateRetryWaitMilliseconds(retries++));
connectErr.retries = retries;
connectErr.wait_time = duration.count();
connectErr.reason = status.errorStr;
connectErr.http_status = status.http_status;
_onMessageCallback(WebSocket_MessageType_Error, "", 0,
connectErr, WebSocketOpenInfo(),
WebSocketCloseInfo());
}
}
else
{
// Otherwise try to reconnect perpertually
while (true) while (true)
{ {
if (isConnected() || isClosing() || _stop || !_automaticReconnection) if (isConnected() || isClosing() || _stop || !_automaticReconnection)
@ -259,7 +243,7 @@ namespace ix
status = connect(_handshakeTimeoutSecs); status = connect(_handshakeTimeoutSecs);
if (!status.success) if (!status.success && !_stop)
{ {
duration = millis(calculateRetryWaitMilliseconds(retries++)); duration = millis(calculateRetryWaitMilliseconds(retries++));
@ -271,15 +255,10 @@ namespace ix
connectErr, WebSocketOpenInfo(), connectErr, WebSocketOpenInfo(),
WebSocketCloseInfo()); WebSocketCloseInfo());
// Only sleep if we aren't in the middle of stopping
if (!_stop)
{
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
} }
} }
} }
}
}
void WebSocket::run() void WebSocket::run()
{ {
@ -287,20 +266,17 @@ namespace ix
while (true) while (true)
{ {
if (_stop) return; if (_stop && !isClosing()) return;
// 1. Make sure we are always connected // 1. Make sure we are always connected
reconnectPerpetuallyIfDisconnected(); reconnectPerpetuallyIfDisconnected();
if (_stop) return;
// 2. Poll to see if there's any new data available // 2. Poll to see if there's any new data available
_ws.poll(); WebSocketTransport::PollPostTreatment pollPostTreatment = _ws.poll();
if (_stop) return;
// 3. Dispatch the incoming messages // 3. Dispatch the incoming messages
_ws.dispatch( _ws.dispatch(
pollPostTreatment,
[this](const std::string& msg, [this](const std::string& msg,
size_t wireSize, size_t wireSize,
bool decompressionError, bool decompressionError,
@ -340,8 +316,10 @@ namespace ix
WebSocket::invokeTrafficTrackerCallback(msg.size(), true); WebSocket::invokeTrafficTrackerCallback(msg.size(), true);
}); });
// If we aren't trying to reconnect automatically, exit if we aren't connected // 4. In blocking mode, getting out of this function is triggered by
if (!isConnected() && !_automaticReconnection) return; // an explicit disconnection from the callback, or by the remote end
// closing the connection, ie isConnectedOrClosing() == false.
if (!_thread.joinable() && !isConnectedOrClosing() && !_automaticReconnection) return;
} }
} }

View File

@ -111,7 +111,8 @@ namespace ix
WebSocketSendInfo sendText(const std::string& text, WebSocketSendInfo sendText(const std::string& text,
const OnProgressCallback& onProgressCallback = nullptr); const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo ping(const std::string& text); WebSocketSendInfo ping(const std::string& text);
void close(); void close(uint16_t code = 1000,
const std::string& reason = "Normal closure");
void setOnMessageCallback(const OnMessageCallback& callback); void setOnMessageCallback(const OnMessageCallback& callback);
static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback); static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback);
@ -136,6 +137,7 @@ namespace ix
bool isConnected() const; bool isConnected() const;
bool isClosing() const; bool isClosing() const;
bool isConnectedOrClosing() const;
void reconnectPerpetuallyIfDisconnected(); void reconnectPerpetuallyIfDisconnected();
std::string readyStateToString(ReadyState readyState); std::string readyStateToString(ReadyState readyState);
static void invokeTrafficTrackerCallback(size_t size, bool incoming); static void invokeTrafficTrackerCallback(size_t size, bool incoming);

View File

@ -72,7 +72,9 @@ namespace ix
const int WebSocketTransport::kDefaultPingIntervalSecs(-1); const int WebSocketTransport::kDefaultPingIntervalSecs(-1);
const int WebSocketTransport::kDefaultPingTimeoutSecs(-1); const int WebSocketTransport::kDefaultPingTimeoutSecs(-1);
const bool WebSocketTransport::kDefaultEnablePong(true); const bool WebSocketTransport::kDefaultEnablePong(true);
const int WebSocketTransport::kClosingMaximumWaitingDelayInMs(200);
constexpr size_t WebSocketTransport::kChunkSize; constexpr size_t WebSocketTransport::kChunkSize;
const uint16_t WebSocketTransport::kInternalErrorCode(1011); const uint16_t WebSocketTransport::kInternalErrorCode(1011);
const uint16_t WebSocketTransport::kAbnormalCloseCode(1006); const uint16_t WebSocketTransport::kAbnormalCloseCode(1006);
const uint16_t WebSocketTransport::kProtocolErrorCode(1002); const uint16_t WebSocketTransport::kProtocolErrorCode(1002);
@ -90,6 +92,7 @@ namespace ix
_closeRemote(false), _closeRemote(false),
_enablePerMessageDeflate(false), _enablePerMessageDeflate(false),
_requestInitCancellation(false), _requestInitCancellation(false),
_closingTimePoint(std::chrono::steady_clock::now()),
_enablePong(kDefaultEnablePong), _enablePong(kDefaultEnablePong),
_pingIntervalSecs(kDefaultPingIntervalSecs), _pingIntervalSecs(kDefaultPingIntervalSecs),
_pingTimeoutSecs(kDefaultPingTimeoutSecs), _pingTimeoutSecs(kDefaultPingTimeoutSecs),
@ -246,9 +249,19 @@ namespace ix
return now - _lastReceivePongTimePoint > std::chrono::seconds(_pingTimeoutSecs); return now - _lastReceivePongTimePoint > std::chrono::seconds(_pingTimeoutSecs);
} }
void WebSocketTransport::poll() bool WebSocketTransport::closingDelayExceeded()
{ {
PollResultType pollResult = _socket->poll(_pingIntervalOrTimeoutGCDSecs); std::lock_guard<std::mutex> lock(_closingTimePointMutex);
auto now = std::chrono::steady_clock::now();
return now - _closingTimePoint > std::chrono::milliseconds(kClosingMaximumWaitingDelayInMs);
}
WebSocketTransport::PollPostTreatment WebSocketTransport::poll()
{
// we need to have no timeout if state is CLOSING
int timeoutDelaySecs = (_readyState == CLOSING) ? 0 : _pingIntervalOrTimeoutGCDSecs;
PollResultType pollResult = _socket->poll(timeoutDelaySecs);
if (_readyState == OPEN) if (_readyState == OPEN)
{ {
@ -296,23 +309,19 @@ namespace ix
{ {
ssize_t ret = _socket->recv((char*)&_readbuf[0], _readbuf.size()); ssize_t ret = _socket->recv((char*)&_readbuf[0], _readbuf.size());
if (ret < 0 && Socket::isWaitNeeded()) if (ret < 0 && _readyState != CLOSING && (_socket->getErrno() == EWOULDBLOCK ||
_socket->getErrno() == EAGAIN))
{ {
break; break;
} }
else if (ret <= 0) else if (ret <= 0)
{ {
_rxbuf.clear(); // if there are received data pending to be processed, then delay the abnormal closure
// to after dispatch (other close code/reason could be read from the buffer)
_socket->close(); _socket->close();
{
std::lock_guard<std::mutex> lock(_closeDataMutex); return CHECK_OR_RAISE_ABNORMAL_CLOSE_AFTER_DISPATCH;
_closeCode = kAbnormalCloseCode;
_closeReason = kAbnormalCloseMessage;
_closeWireSize = 0;
_closeRemote = true;
}
setReadyState(CLOSED);
break;
} }
else else
{ {
@ -331,12 +340,15 @@ namespace ix
_socket->close(); _socket->close();
} }
// Avoid a race condition where we get stuck in select if (_readyState == CLOSING && closingDelayExceeded())
// while closing.
if (_readyState == CLOSING)
{ {
_rxbuf.clear();
// close code and reason were set when calling close()
_socket->close(); _socket->close();
setReadyState(CLOSED);
} }
return NONE;
} }
bool WebSocketTransport::isSendBufferEmpty() const bool WebSocketTransport::isSendBufferEmpty() const
@ -398,12 +410,13 @@ namespace ix
// | Payload Data continued ... | // | Payload Data continued ... |
// +---------------------------------------------------------------+ // +---------------------------------------------------------------+
// //
void WebSocketTransport::dispatch(const OnMessageCallback& onMessageCallback) void WebSocketTransport::dispatch(WebSocketTransport::PollPostTreatment pollPostTreatment,
const OnMessageCallback& onMessageCallback)
{ {
while (true) while (true)
{ {
wsheader_type ws; wsheader_type ws;
if (_rxbuf.size() < 2) return; /* Need at least 2 */ if (_rxbuf.size() < 2) break; /* Need at least 2 */
const uint8_t * data = (uint8_t *) &_rxbuf[0]; // peek, but don't consume const uint8_t * data = (uint8_t *) &_rxbuf[0]; // peek, but don't consume
ws.fin = (data[0] & 0x80) == 0x80; ws.fin = (data[0] & 0x80) == 0x80;
ws.rsv1 = (data[0] & 0x40) == 0x40; ws.rsv1 = (data[0] & 0x40) == 0x40;
@ -411,7 +424,7 @@ namespace ix
ws.mask = (data[1] & 0x80) == 0x80; ws.mask = (data[1] & 0x80) == 0x80;
ws.N0 = (data[1] & 0x7f); ws.N0 = (data[1] & 0x7f);
ws.header_size = 2 + (ws.N0 == 126? 2 : 0) + (ws.N0 == 127? 8 : 0) + (ws.mask? 4 : 0); ws.header_size = 2 + (ws.N0 == 126? 2 : 0) + (ws.N0 == 127? 8 : 0) + (ws.mask? 4 : 0);
if (_rxbuf.size() < ws.header_size) return; /* Need: ws.header_size - _rxbuf.size() */ if (_rxbuf.size() < ws.header_size) break; /* Need: ws.header_size - _rxbuf.size() */
// //
// Calculate payload length: // Calculate payload length:
@ -553,9 +566,25 @@ namespace ix
std::string reason(_rxbuf.begin()+ws.header_size + 2, std::string reason(_rxbuf.begin()+ws.header_size + 2,
_rxbuf.begin()+ws.header_size + (size_t) ws.N); _rxbuf.begin()+ws.header_size + (size_t) ws.N);
bool remote = true;
close(code, reason, _rxbuf.size(), remote); // We receive a CLOSE frame from remote and are NOT the ones who triggered the close
if (_readyState != CLOSING)
{
// send back the CLOSE frame
sendCloseFrame(code, reason);
_socket->wakeUpFromPoll(Socket::kCloseRequest);
bool remote = true;
closeSocketAndSwitchToClosedState(code, reason, _rxbuf.size(), remote);
}
// we got the CLOSE frame answer from our close, so we can close the connection if
// the code/reason are the same
else if (_closeCode == code && _closeReason == reason)
{
bool remote = false;
closeSocketAndSwitchToClosedState(code, reason, _rxbuf.size(), remote);
}
} }
else else
{ {
@ -568,6 +597,25 @@ namespace ix
_rxbuf.erase(_rxbuf.begin(), _rxbuf.erase(_rxbuf.begin(),
_rxbuf.begin() + ws.header_size + (size_t) ws.N); _rxbuf.begin() + ws.header_size + (size_t) ws.N);
} }
// if an abnormal closure was raised in poll, and nothing else triggered a CLOSED state in
// the received and processed data then close the connection
if (pollPostTreatment == CHECK_OR_RAISE_ABNORMAL_CLOSE_AFTER_DISPATCH)
{
_rxbuf.clear();
// if we previously closed the connection (CLOSING state), then set state to CLOSED (code/reason were set before)
if (_readyState == CLOSING)
{
_socket->close();
setReadyState(CLOSED);
}
// if we weren't closing, then close using abnormal close code and message
else if (_readyState != CLOSED)
{
closeSocketAndSwitchToClosedState(kAbnormalCloseCode, kAbnormalCloseMessage, 0, false);
}
}
} }
std::string WebSocketTransport::getMergedChunks() const std::string WebSocketTransport::getMergedChunks() const
@ -843,7 +891,8 @@ namespace ix
{ {
ssize_t ret = _socket->send((char*)&_txbuf[0], _txbuf.size()); ssize_t ret = _socket->send((char*)&_txbuf[0], _txbuf.size());
if (ret < 0 && Socket::isWaitNeeded()) if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
_socket->getErrno() == EAGAIN))
{ {
break; break;
} }
@ -861,12 +910,9 @@ namespace ix
} }
} }
void WebSocketTransport::close(uint16_t code, const std::string& reason, size_t closeWireSize, bool remote)
void WebSocketTransport::sendCloseFrame(uint16_t code, const std::string& reason)
{ {
_requestInitCancellation = true;
if (_readyState == CLOSING || _readyState == CLOSED) return;
// See list of close events here: // See list of close events here:
// https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent // https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent
@ -879,11 +925,11 @@ namespace ix
bool compress = false; bool compress = false;
sendData(wsheader_type::CLOSE, closure, compress); sendData(wsheader_type::CLOSE, closure, compress);
setReadyState(CLOSING); }
_socket->wakeUpFromPoll(Socket::kCloseRequest); void WebSocketTransport::closeSocketAndSwitchToClosedState(uint16_t code, const std::string& reason, size_t closeWireSize, bool remote)
{
_socket->close(); _socket->close();
{ {
std::lock_guard<std::mutex> lock(_closeDataMutex); std::lock_guard<std::mutex> lock(_closeDataMutex);
_closeCode = code; _closeCode = code;
@ -891,10 +937,33 @@ namespace ix
_closeWireSize = closeWireSize; _closeWireSize = closeWireSize;
_closeRemote = remote; _closeRemote = remote;
} }
setReadyState(CLOSED); setReadyState(CLOSED);
} }
void WebSocketTransport::close(uint16_t code, const std::string& reason, size_t closeWireSize, bool remote)
{
_requestInitCancellation = true;
if (_readyState == CLOSING || _readyState == CLOSED) return;
sendCloseFrame(code, reason);
{
std::lock_guard<std::mutex> lock(_closeDataMutex);
_closeCode = code;
_closeReason = reason;
_closeWireSize = closeWireSize;
_closeRemote = remote;
}
{
std::lock_guard<std::mutex> lock(_closingTimePointMutex);
_closingTimePoint = std::chrono::steady_clock::now();
}
setReadyState(CLOSING);
// wake up the poll, but do not close yet
_socket->wakeUpFromPoll(Socket::kSendRequest);
}
size_t WebSocketTransport::bufferedAmount() const size_t WebSocketTransport::bufferedAmount() const
{ {
std::lock_guard<std::mutex> lock(_txbufMutex); std::lock_guard<std::mutex> lock(_txbufMutex);

View File

@ -56,6 +56,12 @@ namespace ix
FRAGMENT FRAGMENT
}; };
enum PollPostTreatment
{
NONE,
CHECK_OR_RAISE_ABNORMAL_CLOSE_AFTER_DISPATCH
};
using OnMessageCallback = std::function<void(const std::string&, using OnMessageCallback = std::function<void(const std::string&,
size_t, size_t,
bool, bool,
@ -78,7 +84,7 @@ namespace ix
WebSocketInitResult connectToSocket(int fd, // Server WebSocketInitResult connectToSocket(int fd, // Server
int timeoutSecs); int timeoutSecs);
void poll(); PollPostTreatment poll();
WebSocketSendInfo sendBinary(const std::string& message, WebSocketSendInfo sendBinary(const std::string& message,
const OnProgressCallback& onProgressCallback); const OnProgressCallback& onProgressCallback);
WebSocketSendInfo sendText(const std::string& message, WebSocketSendInfo sendText(const std::string& message,
@ -93,7 +99,8 @@ namespace ix
ReadyStateValues getReadyState() const; ReadyStateValues getReadyState() const;
void setReadyState(ReadyStateValues readyStateValue); void setReadyState(ReadyStateValues readyStateValue);
void setOnCloseCallback(const OnCloseCallback& onCloseCallback); void setOnCloseCallback(const OnCloseCallback& onCloseCallback);
void dispatch(const OnMessageCallback& onMessageCallback); void dispatch(PollPostTreatment pollPostTreatment,
const OnMessageCallback& onMessageCallback);
size_t bufferedAmount() const; size_t bufferedAmount() const;
private: private:
@ -119,7 +126,7 @@ namespace ix
// Tells whether we should mask the data we send. // Tells whether we should mask the data we send.
// client should mask but server should not // client should mask but server should not
std::atomic<bool> _useMask; bool _useMask;
// Buffer for reading from our socket. That buffer is never resized. // Buffer for reading from our socket. That buffer is never resized.
std::vector<uint8_t> _readbuf; std::vector<uint8_t> _readbuf;
@ -163,6 +170,10 @@ namespace ix
// Used to cancel dns lookup + socket connect + http upgrade // Used to cancel dns lookup + socket connect + http upgrade
std::atomic<bool> _requestInitCancellation; std::atomic<bool> _requestInitCancellation;
mutable std::mutex _closingTimePointMutex;
std::chrono::time_point<std::chrono::steady_clock>_closingTimePoint;
static const int kClosingMaximumWaitingDelayInMs;
// Constants for dealing with closing conneections // Constants for dealing with closing conneections
static const uint16_t kInternalErrorCode; static const uint16_t kInternalErrorCode;
static const uint16_t kAbnormalCloseCode; static const uint16_t kAbnormalCloseCode;
@ -201,6 +212,16 @@ namespace ix
// No PONG data was received through the socket for longer than ping timeout delay // No PONG data was received through the socket for longer than ping timeout delay
bool pingTimeoutExceeded(); bool pingTimeoutExceeded();
// after calling close(), if no CLOSE frame answer is received back from the remote, we should close the connexion
bool closingDelayExceeded();
void sendCloseFrame(uint16_t code, const std::string& reason);
void closeSocketAndSwitchToClosedState(uint16_t code,
const std::string& reason,
size_t closeWireSize,
bool remote);
void sendOnSocket(); void sendOnSocket();
WebSocketSendInfo sendData(wsheader_type::opcode_type type, WebSocketSendInfo sendData(wsheader_type::opcode_type type,
const std::string& message, const std::string& message,

View File

@ -8,11 +8,12 @@ project (ixwebsocket_unittest)
set(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/../third_party/sanitizers-cmake/cmake" ${CMAKE_MODULE_PATH}) set(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/../third_party/sanitizers-cmake/cmake" ${CMAKE_MODULE_PATH})
find_package(Sanitizers) find_package(Sanitizers)
# set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread")
# set(CMAKE_LD_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread")
set (CMAKE_CXX_STANDARD 14) set (CMAKE_CXX_STANDARD 14)
if (NOT WIN32) if (NOT WIN32)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread")
set(CMAKE_LD_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread")
option(USE_TLS "Add TLS support" ON) option(USE_TLS "Add TLS support" ON)
endif() endif()
@ -32,16 +33,17 @@ set (SOURCES
IXDNSLookupTest.cpp IXDNSLookupTest.cpp
IXSocketTest.cpp IXSocketTest.cpp
IXSocketConnectTest.cpp IXSocketConnectTest.cpp
IXWebSocketServerTest.cpp
IXWebSocketPingTest.cpp
IXWebSocketTestConnectionDisconnection.cpp
) )
# Some unittest don't work on windows yet # Some unittest don't work on windows yet
if (NOT WIN32) if (NOT WIN32)
list(APPEND SOURCES list(APPEND SOURCES
IXWebSocketCloseTest.cpp
IXWebSocketServerTest.cpp
IXWebSocketPingTest.cpp
IXWebSocketPingTimeoutTest.cpp IXWebSocketPingTimeoutTest.cpp
cmd_websocket_chat.cpp cmd_websocket_chat.cpp
IXWebSocketTestConnectionDisconnection.cpp
) )
endif() endif()

View File

@ -108,7 +108,7 @@ namespace ix
{ {
log("Cannot compute a free port. bind error."); log("Cannot compute a free port. bind error.");
Socket::closeSocket(sockfd); ::close(sockfd);
return getAnyFreePortRandom(); return getAnyFreePortRandom();
} }
@ -118,12 +118,12 @@ namespace ix
{ {
log("Cannot compute a free port. getsockname error."); log("Cannot compute a free port. getsockname error.");
Socket::closeSocket(sockfd); ::close(sockfd);
return getAnyFreePortRandom(); return getAnyFreePortRandom();
} }
int port = ntohs(sa.sin_port); int port = ntohs(sa.sin_port);
Socket::closeSocket(sockfd); ::close(sockfd);
return port; return port;
} }

View File

@ -0,0 +1,407 @@
/*
* IXWebSocketCloseTest.cpp
* Author: Alexandre Konieczny
* Copyright (c) 2019 Machine Zone. All rights reserved.
*/
#include <iostream>
#include <sstream>
#include <queue>
#include <ixwebsocket/IXWebSocket.h>
#include <ixwebsocket/IXWebSocketServer.h>
#include "IXTest.h"
#include "catch.hpp"
using namespace ix;
namespace
{
class WebSocketClient
{
public:
WebSocketClient(int port);
void subscribe(const std::string& channel);
void start();
void stop();
void stop(uint16_t code, const std::string& reason);
bool isReady() const;
void sendMessage(const std::string& text);
uint16_t getCloseCode();
const std::string& getCloseReason();
bool getCloseRemote();
private:
ix::WebSocket _webSocket;
int _port;
mutable std::mutex _mutexCloseData;
uint16_t _closeCode;
std::string _closeReason;
bool _closeRemote;
};
WebSocketClient::WebSocketClient(int port)
: _port(port)
, _closeCode(0)
, _closeReason(std::string(""))
, _closeRemote(false)
{
;
}
bool WebSocketClient::isReady() const
{
return _webSocket.getReadyState() == ix::WebSocket_ReadyState_Open;
}
uint16_t WebSocketClient::getCloseCode()
{
std::lock_guard<std::mutex> lck(_mutexCloseData);
return _closeCode;
}
const std::string& WebSocketClient::getCloseReason()
{
std::lock_guard<std::mutex> lck(_mutexCloseData);
return _closeReason;
}
bool WebSocketClient::getCloseRemote()
{
std::lock_guard<std::mutex> lck(_mutexCloseData);
return _closeRemote;
}
void WebSocketClient::stop()
{
_webSocket.stop();
}
void WebSocketClient::stop(uint16_t code, const std::string& reason)
{
_webSocket.close(code, reason);
_webSocket.stop();
}
void WebSocketClient::start()
{
std::string url;
{
std::stringstream ss;
ss << "ws://localhost:"
<< _port
<< "/";
url = ss.str();
}
_webSocket.setUrl(url);
std::stringstream ss;
log(std::string("Connecting to url: ") + url);
_webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
std::stringstream ss;
if (messageType == ix::WebSocket_MessageType_Open)
{
log("client connected");
_webSocket.disableAutomaticReconnection();
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
log("client disconnected");
std::lock_guard<std::mutex> lck(_mutexCloseData);
_closeCode = closeInfo.code;
_closeReason = std::string(closeInfo.reason);
_closeRemote = closeInfo.remote;
_webSocket.disableAutomaticReconnection();
}
else if (messageType == ix::WebSocket_MessageType_Error)
{
ss << "Error ! " << error.reason;
log(ss.str());
_webSocket.disableAutomaticReconnection();
}
else if (messageType == ix::WebSocket_MessageType_Pong)
{
ss << "Received pong message " << str;
log(ss.str());
}
else if (messageType == ix::WebSocket_MessageType_Ping)
{
ss << "Received ping message " << str;
log(ss.str());
}
else if (messageType == ix::WebSocket_MessageType_Message)
{
ss << "Received message " << str;
log(ss.str());
}
else
{
ss << "Invalid ix::WebSocketMessageType";
log(ss.str());
}
});
_webSocket.start();
}
void WebSocketClient::sendMessage(const std::string& text)
{
_webSocket.send(text);
}
bool startServer(ix::WebSocketServer& server,
uint16_t& receivedCloseCode,
std::string& receivedCloseReason,
bool& receivedCloseRemote,
std::mutex& mutexWrite)
{
// A dev/null server
server.setOnConnectionCallback(
[&server, &receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, connectionState, &server, &receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
if (messageType == ix::WebSocket_MessageType_Open)
{
Logger() << "New server connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:";
for (auto it : openInfo.headers)
{
Logger() << it.first << ": " << it.second;
}
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
log("Server closed connection");
//Logger() << closeInfo.code;
//Logger() << closeInfo.reason;
//Logger() << closeInfo.remote;
std::lock_guard<std::mutex> lck(mutexWrite);
receivedCloseCode = closeInfo.code;
receivedCloseReason = std::string(closeInfo.reason);
receivedCloseRemote = closeInfo.remote;
}
}
);
}
);
auto res = server.listen();
if (!res.first)
{
log(res.second);
return false;
}
server.start();
return true;
}
}
TEST_CASE("Websocket_client_close_default", "[close]")
{
SECTION("Make sure that close code and reason was used and sent to server.")
{
ix::setupWebSocketTrafficTrackerCallback();
int port = getFreePort();
ix::WebSocketServer server(port);
uint16_t serverReceivedCloseCode(0);
bool serverReceivedCloseRemote(false);
std::string serverReceivedCloseReason("");
std::mutex mutexWrite;
REQUIRE(startServer(server, serverReceivedCloseCode, serverReceivedCloseReason, serverReceivedCloseRemote, mutexWrite));
std::string session = ix::generateSessionId();
WebSocketClient webSocketClient(port);
webSocketClient.start();
// Wait for all chat instance to be ready
while (true)
{
if (webSocketClient.isReady()) break;
ix::msleep(10);
}
REQUIRE(server.getClients().size() == 1);
ix::msleep(100);
webSocketClient.stop();
ix::msleep(200);
// ensure client close is the same as values given
REQUIRE(webSocketClient.getCloseCode() == 1000);
REQUIRE(webSocketClient.getCloseReason() == "Normal closure");
REQUIRE(webSocketClient.getCloseRemote() == false);
{
std::lock_guard<std::mutex> lck(mutexWrite);
// Here we read the code/reason received by the server, and ensure that remote is true
REQUIRE(serverReceivedCloseCode == 1000);
REQUIRE(serverReceivedCloseReason == "Normal closure");
REQUIRE(serverReceivedCloseRemote == true);
}
// Give us 1000ms for the server to notice that clients went away
ix::msleep(1000);
REQUIRE(server.getClients().size() == 0);
ix::reportWebSocketTraffic();
}
}
TEST_CASE("Websocket_client_close_params_given", "[close]")
{
SECTION("Make sure that close code and reason was used and sent to server.")
{
ix::setupWebSocketTrafficTrackerCallback();
int port = getFreePort();
ix::WebSocketServer server(port);
uint16_t serverReceivedCloseCode(0);
bool serverReceivedCloseRemote(false);
std::string serverReceivedCloseReason("");
std::mutex mutexWrite;
REQUIRE(startServer(server, serverReceivedCloseCode, serverReceivedCloseReason, serverReceivedCloseRemote, mutexWrite));
std::string session = ix::generateSessionId();
WebSocketClient webSocketClient(port);
webSocketClient.start();
// Wait for all chat instance to be ready
while (true)
{
if (webSocketClient.isReady()) break;
ix::msleep(10);
}
REQUIRE(server.getClients().size() == 1);
ix::msleep(100);
webSocketClient.stop(4000, "My reason");
ix::msleep(500);
// ensure client close is the same as values given
REQUIRE(webSocketClient.getCloseCode() == 4000);
REQUIRE(webSocketClient.getCloseReason() == "My reason");
REQUIRE(webSocketClient.getCloseRemote() == false);
{
std::lock_guard<std::mutex> lck(mutexWrite);
// Here we read the code/reason received by the server, and ensure that remote is true
REQUIRE(serverReceivedCloseCode == 4000);
REQUIRE(serverReceivedCloseReason == "My reason");
REQUIRE(serverReceivedCloseRemote == true);
}
// Give us 1000ms for the server to notice that clients went away
ix::msleep(1000);
REQUIRE(server.getClients().size() == 0);
ix::reportWebSocketTraffic();
}
}
TEST_CASE("Websocket_server_close", "[close]")
{
SECTION("Make sure that close code and reason was read from server.")
{
ix::setupWebSocketTrafficTrackerCallback();
int port = getFreePort();
ix::WebSocketServer server(port);
uint16_t serverReceivedCloseCode(0);
bool serverReceivedCloseRemote(false);
std::string serverReceivedCloseReason("");
std::mutex mutexWrite;
REQUIRE(startServer(server, serverReceivedCloseCode, serverReceivedCloseReason, serverReceivedCloseRemote, mutexWrite));
std::string session = ix::generateSessionId();
WebSocketClient webSocketClient(port);
webSocketClient.start();
// Wait for all chat instance to be ready
while (true)
{
if (webSocketClient.isReady()) break;
ix::msleep(10);
}
REQUIRE(server.getClients().size() == 1);
ix::msleep(200);
server.stop();
ix::msleep(500);
// ensure client close is the same as values given
REQUIRE(webSocketClient.getCloseCode() == 1000);
REQUIRE(webSocketClient.getCloseReason() == "Normal closure");
REQUIRE(webSocketClient.getCloseRemote() == true);
{
std::lock_guard<std::mutex> lck(mutexWrite);
// Here we read the code/reason received by the server, and ensure that remote is true
REQUIRE(serverReceivedCloseCode == 1000);
REQUIRE(serverReceivedCloseReason == "Normal closure");
REQUIRE(serverReceivedCloseRemote == false);
}
// Give us 1000ms for the server to notice that clients went away
ix::msleep(1000);
REQUIRE(server.getClients().size() == 0);
ix::reportWebSocketTraffic();
}
}

View File

@ -23,6 +23,7 @@ namespace
public: public:
WebSocketClient(int port, bool useHeartBeatMethod); WebSocketClient(int port, bool useHeartBeatMethod);
void subscribe(const std::string& channel);
void start(); void start();
void stop(); void stop();
bool isReady() const; bool isReady() const;
@ -56,7 +57,7 @@ namespace
std::string url; std::string url;
{ {
std::stringstream ss; std::stringstream ss;
ss << "ws://127.0.0.1:" ss << "ws://localhost:"
<< _port << _port
<< "/"; << "/";
@ -346,9 +347,6 @@ TEST_CASE("Websocket_ping_data_sent_setPingInterval", "[setPingInterval]")
webSocketClient.stop(); webSocketClient.stop();
// without this sleep test fails on Windows
ix::msleep(100);
// Here we test ping interval // Here we test ping interval
// client has sent data, but ping should have been sent no matter what // client has sent data, but ping should have been sent no matter what
// -> expected ping messages == 3 as 900+900+1300 = 3100 seconds, 1 ping sent every second // -> expected ping messages == 3 as 900+900+1300 = 3100 seconds, 1 ping sent every second

View File

@ -23,6 +23,7 @@ namespace
public: public:
WebSocketClient(int port, int pingInterval, int pingTimeout); WebSocketClient(int port, int pingInterval, int pingTimeout);
void subscribe(const std::string& channel);
void start(); void start();
void stop(); void stop();
bool isReady() const; bool isReady() const;
@ -70,7 +71,7 @@ namespace
std::string url; std::string url;
{ {
std::stringstream ss; std::stringstream ss;
ss << "ws://127.0.0.1:" ss << "ws://localhost:"
<< _port << _port
<< "/"; << "/";

View File

@ -107,7 +107,7 @@ TEST_CASE("Websocket_server", "[websocket_server]")
std::string errMsg; std::string errMsg;
bool tls = false; bool tls = false;
std::shared_ptr<Socket> socket = createSocket(tls, errMsg); std::shared_ptr<Socket> socket = createSocket(tls, errMsg);
std::string host("127.0.0.1"); std::string host("localhost");
auto isCancellationRequested = []() -> bool auto isCancellationRequested = []() -> bool
{ {
return false; return false;
@ -141,7 +141,7 @@ TEST_CASE("Websocket_server", "[websocket_server]")
std::string errMsg; std::string errMsg;
bool tls = false; bool tls = false;
std::shared_ptr<Socket> socket = createSocket(tls, errMsg); std::shared_ptr<Socket> socket = createSocket(tls, errMsg);
std::string host("127.0.0.1"); std::string host("localhost");
auto isCancellationRequested = []() -> bool auto isCancellationRequested = []() -> bool
{ {
return false; return false;
@ -178,7 +178,7 @@ TEST_CASE("Websocket_server", "[websocket_server]")
std::string errMsg; std::string errMsg;
bool tls = false; bool tls = false;
std::shared_ptr<Socket> socket = createSocket(tls, errMsg); std::shared_ptr<Socket> socket = createSocket(tls, errMsg);
std::string host("127.0.0.1"); std::string host("localhost");
auto isCancellationRequested = []() -> bool auto isCancellationRequested = []() -> bool
{ {
return false; return false;

View File

@ -100,7 +100,7 @@ namespace
std::string url; std::string url;
{ {
std::stringstream ss; std::stringstream ss;
ss << "ws://127.0.0.1:" ss << "ws://localhost:"
<< _port << _port
<< "/" << "/"
<< _user; << _user;

View File

@ -251,8 +251,8 @@ def executeJob(job):
sys.stderr.write('.') sys.stderr.write('.')
# print('Executing ' + job['cmd'] + '...') # print('Executing ' + job['cmd'] + '...')
# 2 minutes of timeout for a single test # 10 minutes of timeout for a single test, cf PR #42
timeout = 2 * 60 timeout = 10 * 60
command = Command(job['cmd']) command = Command(job['cmd'])
timedout, ret = command.run(timeout) timedout, ret = command.run(timeout)

View File

@ -58,7 +58,6 @@ int main(int argc, char** argv)
bool compress = false; bool compress = false;
bool strict = false; bool strict = false;
bool stress = false; bool stress = false;
bool disableAutomaticReconnection = false;
int port = 8080; int port = 8080;
int redisPort = 6379; int redisPort = 6379;
int statsdPort = 8125; int statsdPort = 8125;
@ -88,7 +87,6 @@ int main(int argc, char** argv)
CLI::App* connectApp = app.add_subcommand("connect", "Connect to a remote server"); CLI::App* connectApp = app.add_subcommand("connect", "Connect to a remote server");
connectApp->add_option("url", url, "Connection url")->required(); connectApp->add_option("url", url, "Connection url")->required();
connectApp->add_flag("-d", disableAutomaticReconnection, "Disable Automatic Reconnection");
CLI::App* chatApp = app.add_subcommand("chat", "Group chat"); CLI::App* chatApp = app.add_subcommand("chat", "Group chat");
chatApp->add_option("url", url, "Connection url")->required(); chatApp->add_option("url", url, "Connection url")->required();
@ -220,7 +218,7 @@ int main(int argc, char** argv)
} }
else if (app.got_subcommand("connect")) else if (app.got_subcommand("connect"))
{ {
ret = ix::ws_connect_main(url, disableAutomaticReconnection); ret = ix::ws_connect_main(url);
} }
else if (app.got_subcommand("chat")) else if (app.got_subcommand("chat"))
{ {

View File

@ -31,7 +31,7 @@ namespace ix
int ws_chat_main(const std::string& url, int ws_chat_main(const std::string& url,
const std::string& user); const std::string& user);
int ws_connect_main(const std::string& url, bool disableAutomaticReconnection); int ws_connect_main(const std::string& url);
int ws_receive_main(const std::string& url, int ws_receive_main(const std::string& url,
bool enablePerMessageDeflate, bool enablePerMessageDeflate,

View File

@ -14,8 +14,7 @@ namespace ix
class WebSocketConnect class WebSocketConnect
{ {
public: public:
WebSocketConnect(const std::string& _url, WebSocketConnect(const std::string& _url);
bool disableAutomaticReconnection);
void subscribe(const std::string& channel); void subscribe(const std::string& channel);
void start(); void start();
@ -30,17 +29,10 @@ namespace ix
void log(const std::string& msg); void log(const std::string& msg);
}; };
WebSocketConnect::WebSocketConnect(const std::string& url, WebSocketConnect::WebSocketConnect(const std::string& url) :
bool disableAutomaticReconnection) :
_url(url) _url(url)
{ {
if (disableAutomaticReconnection) ;
{
std::cout << "Disabling automatic reconnection with "
"_webSocket.disableAutomaticReconnection()"
" not supported yet" << std::endl;
_webSocket.disableAutomaticReconnection();
}
} }
void WebSocketConnect::log(const std::string& msg) void WebSocketConnect::log(const std::string& msg)
@ -121,10 +113,10 @@ namespace ix
_webSocket.send(text); _webSocket.send(text);
} }
int ws_connect_main(const std::string& url, bool disableAutomaticReconnection) void interactiveMain(const std::string& url)
{ {
std::cout << "Type Ctrl-D to exit prompt..." << std::endl; std::cout << "Type Ctrl-D to exit prompt..." << std::endl;
WebSocketConnect webSocketChat(url, disableAutomaticReconnection); WebSocketConnect webSocketChat(url);
webSocketChat.start(); webSocketChat.start();
while (true) while (true)
@ -157,7 +149,11 @@ namespace ix
std::cout << std::endl; std::cout << std::endl;
webSocketChat.stop(); webSocketChat.stop();
}
int ws_connect_main(const std::string& url)
{
interactiveMain(url);
return 0; return 0;
} }
} }