Compare commits

..

14 Commits

Author SHA1 Message Date
Benjamin Sergeant
c36dc0e16a fix unittest 2019-03-20 18:50:56 -07:00
Benjamin Sergeant
7e45659377 update README 2019-03-20 18:33:55 -07:00
Benjamin Sergeant
788c92487c New connection state for server code + fix OpenSSL double init bug 2019-03-20 18:26:29 -07:00
Benjamin Sergeant
0999073435 fix typo 2019-03-20 18:25:28 -07:00
Benjamin Sergeant
2cae6f4cf8 (cmake) add a warning about 32/64 conversion problems. 2019-03-20 16:16:54 -07:00
Benjamin Sergeant
e77b9176f3 Feature/redis (#23)
* Fix warning

* (cmake) add a warning about 32/64 conversion problems.

* simple redis clients

* can publish to redis

* redis subscribe

* display messages received per second

* verbose flag

* (cmake) use clang only compile option -Wshorten-64-to-32 when compiling with clang
2019-03-20 14:29:02 -07:00
Michael Lu
afe8b966ad Fixed heartbeat typos (#22) 2019-03-19 21:31:43 -07:00
Benjamin Sergeant
310724c961 make PollResultType an enum class 2019-03-19 09:29:57 -07:00
Benjamin Sergeant
ceba8ae620 fix bug with isReadyToWrite 2019-03-18 22:05:04 -07:00
Benjamin Sergeant
fead661ab7 workaround bug in Socket::isReadyToWrite 2019-03-18 20:37:33 -07:00
Benjamin Sergeant
9c8c17f577 use milliseconds 2019-03-18 20:17:44 -07:00
Benjamin Sergeant
a04f83930f ws / log subcommand name 2019-03-18 17:54:06 -07:00
Benjamin Sergeant
c421d19800 disable sigpipe on osx when writing/reading into a dead pipe 2019-03-18 17:52:01 -07:00
Benjamin Sergeant
521f02c90e edit homebrew install steps 2019-03-18 15:45:33 -07:00
42 changed files with 672 additions and 124 deletions

View File

@@ -15,6 +15,10 @@ if (NOT WIN32)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic")
endif()
if ("${CMAKE_CXX_COMPILER_ID}" MATCHES "Clang")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wshorten-64-to-32")
endif()
set( IXWEBSOCKET_SOURCES
ixwebsocket/IXSocket.cpp
ixwebsocket/IXSocketServer.cpp
@@ -35,6 +39,7 @@ set( IXWEBSOCKET_SOURCES
ixwebsocket/IXSelectInterrupt.cpp
ixwebsocket/IXSelectInterruptPipe.cpp
ixwebsocket/IXSelectInterruptFactory.cpp
ixwebsocket/IXConnectionState.cpp
)
set( IXWEBSOCKET_HEADERS
@@ -62,6 +67,7 @@ set( IXWEBSOCKET_HEADERS
ixwebsocket/IXSelectInterrupt.h
ixwebsocket/IXSelectInterruptPipe.h
ixwebsocket/IXSelectInterruptFactory.h
ixwebsocket/IXConnectionState.h
)
# Platform specific code

View File

@@ -10,7 +10,7 @@ communication channels over a single TCP connection. *IXWebSocket* is a C++ libr
* macOS
* iOS
* Linux
* Android
* Android
## Examples
@@ -63,10 +63,11 @@ Here is what the server API looks like. Note that server support is very recent
ix::WebSocketServer server(port);
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket)
[&server](std::shared_ptr<WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType,
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
@@ -77,6 +78,12 @@ server.setOnConnectionCallback(
{
std::cerr << "New connection" << std::endl;
// A connection state object is available, and has a default id
// You can subclass ConnectionState and pass an alternate factory
// to override it. It is useful if you want to store custom
// attributes per connection (authenticated bool flag, attributes, etc...)
std::cerr << "id: " << connectionState->getId() << std::endl;
// The uri the client did connect to.
std::cerr << "Uri: " << openInfo.uri << std::endl;
@@ -183,7 +190,7 @@ There is a Dockerfile for running some code on Linux, and a unittest which can b
You can build and install the ws command line tool with Homebrew.
```
brew create --cmake https://github.com/machinezone/IXWebSocket/archive/v1.1.0.tar.gz
brew tap bsergean/IXWebSocket
brew install IXWebSocket
```
@@ -223,13 +230,13 @@ Here is a simplistic diagram which explains how the code is structured in term o
+-----------------------+ --- Public
| | Start the receiving Background thread. Auto reconnection. Simple websocket Ping.
| IXWebSocket | Interface used by C++ test clients. No IX dependencies.
| |
| |
+-----------------------+
| |
| IXWebSocketServer | Run a server and give each connections its own WebSocket object.
| | Each connection is handled in a new OS thread.
| |
+-----------------------+ --- Private
+-----------------------+ --- Private
| |
| IXWebSocketTransport | Low level websocket code, framing, managing raw socket. Adapted from easywsclient.
| |

View File

@@ -0,0 +1,37 @@
/*
* IXConnectionState.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXConnectionState.h"
#include <sstream>
namespace ix
{
std::atomic<uint64_t> ConnectionState::_globalId(0);
ConnectionState::ConnectionState()
{
computeId();
}
void ConnectionState::computeId()
{
std::stringstream ss;
ss << _globalId++;
_id = ss.str();
}
const std::string& ConnectionState::getId() const
{
return _id;
}
std::shared_ptr<ConnectionState> ConnectionState::createConnectionState()
{
return std::make_shared<ConnectionState>();
}
}

View File

@@ -0,0 +1,33 @@
/*
* IXConnectionState.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <stdint.h>
#include <string>
#include <atomic>
#include <memory>
namespace ix
{
class ConnectionState {
public:
ConnectionState();
virtual ~ConnectionState() = default;
virtual void computeId();
virtual const std::string& getId() const;
static std::shared_ptr<ConnectionState> createConnectionState();
protected:
std::string _id;
static std::atomic<uint64_t> _globalId;
};
}

View File

@@ -73,7 +73,7 @@ namespace ix
errMsg = "no error";
// Maybe a cancellation request got in before the background thread terminated ?
if (isCancellationRequested())
if (isCancellationRequested && isCancellationRequested())
{
errMsg = "cancellation requested";
return nullptr;
@@ -121,7 +121,7 @@ namespace ix
}
// Were we cancelled ?
if (isCancellationRequested())
if (isCancellationRequested && isCancellationRequested())
{
errMsg = "cancellation requested";
return nullptr;
@@ -129,7 +129,7 @@ namespace ix
}
// Maybe a cancellation request got in before the bg terminated ?
if (isCancellationRequested())
if (isCancellationRequested && isCancellationRequested())
{
errMsg = "cancellation requested";
return nullptr;

View File

@@ -56,7 +56,7 @@ namespace ix
if (fcntl(_fildes[kPipeReadIndex], F_SETFL, O_NONBLOCK) == -1)
{
std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in fcntl() call"
ss << "SelectInterruptPipe::init() failed in fcntl(..., O_NONBLOCK) call"
<< " : " << strerror(errno);
errorMsg = ss.str();
@@ -68,7 +68,7 @@ namespace ix
if (fcntl(_fildes[kPipeWriteIndex], F_SETFL, O_NONBLOCK) == -1)
{
std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in fcntl() call"
ss << "SelectInterruptPipe::init() failed in fcntl(..., O_NONBLOCK) call"
<< " : " << strerror(errno);
errorMsg = ss.str();
@@ -77,13 +77,31 @@ namespace ix
return false;
}
//
// FIXME: on macOS we should configure the pipe to not trigger SIGPIPE
// on reads/writes to a closed fd
//
// The generation of the SIGPIPE signal can be suppressed using the
// F_SETNOSIGPIPE fcntl command.
//
#ifdef F_SETNOSIGPIPE
if (fcntl(_fildes[kPipeWriteIndex], F_SETNOSIGPIPE, 1) == -1)
{
std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in fcntl(.... F_SETNOSIGPIPE) call"
<< " : " << strerror(errno);
errorMsg = ss.str();
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
return false;
}
if (fcntl(_fildes[kPipeWriteIndex], F_SETNOSIGPIPE, 1) == -1)
{
std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in fcntl(..., F_SETNOSIGPIPE) call"
<< " : " << strerror(errno);
errorMsg = ss.str();
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
return false;
}
#endif
return true;
}
@@ -104,6 +122,7 @@ namespace ix
uint64_t value = 0;
::read(fd, &value, sizeof(value));
return value;
}

View File

@@ -45,16 +45,16 @@ namespace ix
{
if (_sockfd == -1)
{
if (onPollCallback) onPollCallback(PollResultType_Error);
if (onPollCallback) onPollCallback(PollResultType::Error);
return;
}
PollResultType pollResult = isReadyToRead(timeoutSecs, 0);
PollResultType pollResult = isReadyToRead(1000 * timeoutSecs);
if (onPollCallback) onPollCallback(pollResult);
}
PollResultType Socket::select(bool readyToRead, int timeoutSecs, int timeoutMs)
PollResultType Socket::select(bool readyToRead, int timeoutMs)
{
fd_set rfds;
fd_set wfds;
@@ -62,7 +62,6 @@ namespace ix
FD_ZERO(&wfds);
fd_set* fds = (readyToRead) ? &rfds : & wfds;
FD_SET(_sockfd, fds);
// File descriptor used to interrupt select when needed
@@ -73,64 +72,60 @@ namespace ix
}
struct timeval timeout;
timeout.tv_sec = timeoutSecs;
timeout.tv_usec = 1000 * timeoutMs;
timeout.tv_sec = timeoutMs / 1000;
timeout.tv_usec = (timeoutMs < 1000) ? 0 : 1000 * (timeoutMs % 1000);
// Compute the highest fd.
int sockfd = _sockfd;
int nfds = (std::max)(sockfd, interruptFd);
int ret = ::select(nfds + 1, &rfds, &wfds, nullptr,
(timeoutSecs < 0) ? nullptr : &timeout);
(timeoutMs < 0) ? nullptr : &timeout);
PollResultType pollResult = PollResultType_ReadyForRead;
PollResultType pollResult = PollResultType::ReadyForRead;
if (ret < 0)
{
pollResult = PollResultType_Error;
pollResult = PollResultType::Error;
}
else if (ret == 0)
{
pollResult = PollResultType_Timeout;
pollResult = PollResultType::Timeout;
}
else if (interruptFd != -1 && FD_ISSET(interruptFd, fds))
else if (interruptFd != -1 && FD_ISSET(interruptFd, &rfds))
{
uint64_t value = _selectInterrupt->read();
if (value == kSendRequest)
{
pollResult = PollResultType_SendRequest;
pollResult = PollResultType::SendRequest;
}
else if (value == kCloseRequest)
{
pollResult = PollResultType_CloseRequest;
pollResult = PollResultType::CloseRequest;
}
}
else if (sockfd != -1 && FD_ISSET(sockfd, fds))
else if (sockfd != -1 && readyToRead && FD_ISSET(sockfd, &rfds))
{
if (readyToRead)
{
pollResult = PollResultType_ReadyForRead;
}
else
{
pollResult = PollResultType_ReadyForWrite;
}
pollResult = PollResultType::ReadyForRead;
}
else if (sockfd != -1 && !readyToRead && FD_ISSET(sockfd, &wfds))
{
pollResult = PollResultType::ReadyForWrite;
}
return pollResult;
}
PollResultType Socket::isReadyToRead(int timeoutSecs, int timeoutMs)
PollResultType Socket::isReadyToRead(int timeoutMs)
{
bool readyToRead = true;
return select(readyToRead, timeoutSecs, timeoutMs);
return select(readyToRead, timeoutMs);
}
PollResultType Socket::isReadyToWrite(int timeoutSecs, int timeoutMs)
PollResultType Socket::isReadyToWrite(int timeoutMs)
{
bool readyToRead = false;
return select(readyToRead, timeoutSecs, timeoutMs);
return select(readyToRead, timeoutMs);
}
// Wake up from poll/select by writing to the pipe which is watched by select
@@ -215,7 +210,7 @@ namespace ix
{
while (true)
{
if (isCancellationRequested()) return false;
if (isCancellationRequested && isCancellationRequested()) return false;
char* buffer = const_cast<char*>(str.c_str());
int len = (int) str.size();
@@ -227,7 +222,7 @@ namespace ix
{
return ret == len;
}
// There is possibly something to be write, try again
// There is possibly something to be writen, try again
else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
getErrno() == EAGAIN))
{
@@ -246,7 +241,7 @@ namespace ix
{
while (true)
{
if (isCancellationRequested()) return false;
if (isCancellationRequested && isCancellationRequested()) return false;
ssize_t ret;
ret = recv(buffer, 1);
@@ -262,7 +257,7 @@ namespace ix
{
// Wait with a 1ms timeout until the socket is ready to read.
// This way we are not busy looping
if (isReadyToRead(0, 1) == PollResultType_Error)
if (isReadyToRead(1) == PollResultType::Error)
{
return false;
}
@@ -309,9 +304,12 @@ namespace ix
std::vector<uint8_t> output;
while (output.size() != length)
{
if (isCancellationRequested()) return std::make_pair(false, std::string());
if (isCancellationRequested && isCancellationRequested())
{
return std::make_pair(false, std::string());
}
int size = std::min(kChunkSize, length - output.size());
size_t size = std::min(kChunkSize, length - output.size());
ssize_t ret = recv((char*)&_readBuffer[0], size);
if (ret <= 0 && (getErrno() != EWOULDBLOCK &&
@@ -331,7 +329,7 @@ namespace ix
// Wait with a 1ms timeout until the socket is ready to read.
// This way we are not busy looping
if (isReadyToRead(0, 1) == PollResultType_Error)
if (isReadyToRead(1) == PollResultType::Error)
{
return std::make_pair(false, std::string());
}

View File

@@ -25,14 +25,14 @@ namespace ix
{
class SelectInterrupt;
enum PollResultType
enum class PollResultType
{
PollResultType_ReadyForRead = 0,
PollResultType_ReadyForWrite = 1,
PollResultType_Timeout = 2,
PollResultType_Error = 3,
PollResultType_SendRequest = 4,
PollResultType_CloseRequest = 5
ReadyForRead = 0,
ReadyForWrite = 1,
Timeout = 2,
Error = 3,
SendRequest = 4,
CloseRequest = 5
};
class Socket {
@@ -50,9 +50,8 @@ namespace ix
int timeoutSecs = kDefaultPollTimeout);
bool wakeUpFromPoll(uint8_t wakeUpCode);
PollResultType select(bool readyToRead, int timeoutSecs, int timeoutMs);
PollResultType isReadyToWrite(int timeoutSecs, int timeoutMs);
PollResultType isReadyToRead(int timeoutSecs, int timeoutMs);
PollResultType isReadyToWrite(int timeoutMs);
PollResultType isReadyToRead(int timeoutMs);
// Virtual methods
virtual bool connect(const std::string& url,
@@ -92,6 +91,8 @@ namespace ix
std::mutex _socketMutex;
private:
PollResultType select(bool readyToRead, int timeoutMs);
static const int kDefaultPollTimeout;
static const int kDefaultPollNoTimeout;

View File

@@ -66,7 +66,7 @@ namespace ix
for (;;)
{
if (isCancellationRequested()) // Must handle timeout as well
if (isCancellationRequested && isCancellationRequested()) // Must handle timeout as well
{
closeSocket(fd);
errMsg = "Cancelled";

View File

@@ -21,6 +21,7 @@
namespace ix
{
std::atomic<bool> SocketOpenSSL::_openSSLInitializationSuccessful(false);
std::once_flag SocketOpenSSL::_openSSLInitFlag;
SocketOpenSSL::SocketOpenSSL(int fd) : Socket(fd),
_ssl_connection(nullptr),

View File

@@ -50,7 +50,7 @@ namespace ix
const SSL_METHOD* _ssl_method;
mutable std::mutex _mutex; // OpenSSL routines are not thread-safe
std::once_flag _openSSLInitFlag;
static std::once_flag _openSSLInitFlag;
static std::atomic<bool> _openSSLInitializationSuccessful;
};

View File

@@ -29,7 +29,8 @@ namespace ix
_host(host),
_backlog(backlog),
_maxConnections(maxConnections),
_stop(false)
_stop(false),
_connectionStateFactory(&ConnectionState::createConnectionState)
{
}
@@ -145,6 +146,12 @@ namespace ix
::close(_serverFd);
}
void SocketServer::setConnectionStateFactory(
const ConnectionStateFactory& connectionStateFactory)
{
_connectionStateFactory = connectionStateFactory;
}
void SocketServer::run()
{
// Set the socket to non blocking mode, so that accept calls are not blocking
@@ -214,6 +221,12 @@ namespace ix
continue;
}
std::shared_ptr<ConnectionState> connectionState;
if (_connectionStateFactory)
{
connectionState = _connectionStateFactory();
}
// Launch the handleConnection work asynchronously in its own thread.
//
// the destructor of a future returned by std::async blocks,
@@ -221,7 +234,8 @@ namespace ix
f = std::async(std::launch::async,
&SocketServer::handleConnection,
this,
clientFd);
clientFd,
connectionState);
}
}
}

View File

@@ -6,6 +6,8 @@
#pragma once
#include "IXConnectionState.h"
#include <utility> // pair
#include <string>
#include <set>
@@ -20,6 +22,8 @@ namespace ix
{
class SocketServer {
public:
using ConnectionStateFactory = std::function<std::shared_ptr<ConnectionState>()>;
SocketServer(int port = SocketServer::kDefaultPort,
const std::string& host = SocketServer::kDefaultHost,
int backlog = SocketServer::kDefaultTcpBacklog,
@@ -27,6 +31,8 @@ namespace ix
virtual ~SocketServer();
virtual void stop();
void setConnectionStateFactory(const ConnectionStateFactory& connectionStateFactory);
const static int kDefaultPort;
const static std::string kDefaultHost;
const static int kDefaultTcpBacklog;
@@ -60,9 +66,13 @@ namespace ix
std::condition_variable _conditionVariable;
std::mutex _conditionVariableMutex;
//
ConnectionStateFactory _connectionStateFactory;
// Methods
void run();
virtual void handleConnection(int fd) = 0;
virtual void handleConnection(int fd,
std::shared_ptr<ConnectionState> connectionState) = 0;
virtual size_t getConnectedClientsCount() = 0;
};
}

View File

@@ -79,10 +79,10 @@ namespace ix
return _perMessageDeflateOptions;
}
void WebSocket::setHeartBeatPeriod(int hearBeatPeriod)
void WebSocket::setHeartBeatPeriod(int heartBeatPeriod)
{
std::lock_guard<std::mutex> lock(_configMutex);
_heartBeatPeriod = hearBeatPeriod;
_heartBeatPeriod = heartBeatPeriod;
}
int WebSocket::getHeartBeatPeriod() const

View File

@@ -89,7 +89,7 @@ namespace ix
void setUrl(const std::string& url);
void setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions);
void setHandshakeTimeout(int handshakeTimeoutSecs);
void setHeartBeatPeriod(int hearBeatPeriod);
void setHeartBeatPeriod(int heartBeatPeriod);
// Run asynchronously, by calling start and stop.
void start();

View File

@@ -49,10 +49,12 @@ namespace ix
_onConnectionCallback = callback;
}
void WebSocketServer::handleConnection(int fd)
void WebSocketServer::handleConnection(
int fd,
std::shared_ptr<ConnectionState> connectionState)
{
auto webSocket = std::make_shared<WebSocket>();
_onConnectionCallback(webSocket);
_onConnectionCallback(webSocket, connectionState);
webSocket->disableAutomaticReconnection();

View File

@@ -20,7 +20,8 @@
namespace ix
{
using OnConnectionCallback = std::function<void(std::shared_ptr<WebSocket>)>;
using OnConnectionCallback = std::function<void(std::shared_ptr<WebSocket>,
std::shared_ptr<ConnectionState>)>;
class WebSocketServer : public SocketServer {
public:
@@ -49,7 +50,8 @@ namespace ix
const static int kDefaultHandShakeTimeoutSecs;
// Methods
virtual void handleConnection(int fd) final;
virtual void handleConnection(int fd,
std::shared_ptr<ConnectionState> connectionState) final;
virtual size_t getConnectedClientsCount() final;
};
}

View File

@@ -53,7 +53,7 @@
namespace ix
{
const std::string WebSocketTransport::kHeartBeatPingMessage("ixwebsocket::hearbeat");
const std::string WebSocketTransport::kHeartBeatPingMessage("ixwebsocket::heartbeat");
const int WebSocketTransport::kDefaultHeartBeatPeriod(-1);
constexpr size_t WebSocketTransport::kChunkSize;
@@ -75,11 +75,11 @@ namespace ix
}
void WebSocketTransport::configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions,
int hearBeatPeriod)
int heartBeatPeriod)
{
_perMessageDeflateOptions = perMessageDeflateOptions;
_enablePerMessageDeflate = _perMessageDeflateOptions.enabled();
_heartBeatPeriod = hearBeatPeriod;
_heartBeatPeriod = heartBeatPeriod;
}
// Client
@@ -189,7 +189,7 @@ namespace ix
// If (1) heartbeat is enabled, and (2) no data was received or
// send for a duration exceeding our heart-beat period, send a
// ping to the server.
if (pollResult == PollResultType_Timeout &&
if (pollResult == PollResultType::Timeout &&
heartBeatPeriodExceeded())
{
std::stringstream ss;
@@ -198,26 +198,27 @@ namespace ix
}
// Make sure we send all the buffered data
// there can be a lot of it for large messages.
else if (pollResult == PollResultType_SendRequest)
else if (pollResult == PollResultType::SendRequest)
{
while (!isSendBufferEmpty() && !_requestInitCancellation)
{
// Wait with a 10ms timeout until the socket is ready to write.
// This way we are not busy looping
PollResultType result = _socket->isReadyToWrite(0, 10);
if (result == PollResultType_Error)
PollResultType result = _socket->isReadyToWrite(10);
if (result == PollResultType::Error)
{
_socket->close();
setReadyState(CLOSED);
break;
}
else if (result == PollResultType_ReadyForWrite)
else if (result == PollResultType::ReadyForWrite)
{
sendOnSocket();
}
}
}
else if (pollResult == PollResultType_ReadyForRead)
else if (pollResult == PollResultType::ReadyForRead)
{
while (true)
{
@@ -243,11 +244,11 @@ namespace ix
}
}
}
else if (pollResult == PollResultType_Error)
else if (pollResult == PollResultType::Error)
{
_socket->close();
}
else if (pollResult == PollResultType_CloseRequest)
else if (pollResult == PollResultType::CloseRequest)
{
_socket->close();
}

View File

@@ -61,7 +61,7 @@ namespace ix
~WebSocketTransport();
void configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions,
int hearBeatPeriod);
int heartBeatPeriod);
WebSocketInitResult connectToUrl(const std::string& url, // Client
int timeoutSecs);
@@ -148,7 +148,7 @@ namespace ix
mutable std::mutex _lastSendTimePointMutex;
std::chrono::time_point<std::chrono::steady_clock> _lastSendTimePoint;
// No data was send through the socket for longer that the hearbeat period
// No data was send through the socket for longer than the heartbeat period
bool heartBeatPeriodExceeded();
void sendOnSocket();

View File

@@ -39,7 +39,7 @@ test:
python test/run.py
ws_test: all
(cd ws ; sh test_ws.sh)
(cd ws ; bash test_ws.sh)
# For the fork that is configured with appveyor
rebase_upstream:

View File

@@ -65,7 +65,7 @@ namespace
_webSocket.setUrl(url);
// The important bit for this test.
// Set a 1 second hearbeat ; if no traffic is present on the connection for 1 second
// Set a 1 second heartbeat ; if no traffic is present on the connection for 1 second
// a ping message will be sent by the client.
_webSocket.setHeartBeatPeriod(1);
@@ -128,10 +128,11 @@ namespace
{
// A dev/null server
server.setOnConnectionCallback(
[&server, &receivedPingMessages](std::shared_ptr<ix::WebSocket> webSocket)
[&server, &receivedPingMessages](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, &server, &receivedPingMessages](ix::WebSocketMessageType messageType,
[webSocket, connectionState, &server, &receivedPingMessages](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
@@ -141,6 +142,7 @@ namespace
if (messageType == ix::WebSocket_MessageType_Open)
{
Logger() << "New server connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:";
for (auto it : openInfo.headers)

View File

@@ -18,13 +18,32 @@ using namespace ix;
namespace ix
{
bool startServer(ix::WebSocketServer& server)
// Test that we can override the connectionState impl to provide our own
class ConnectionStateCustom : public ConnectionState
{
void computeId()
{
// a very boring invariant id that we can test against in the unittest
_id = "foobarConnectionId";
}
};
bool startServer(ix::WebSocketServer& server,
std::string& connectionId)
{
auto factory = []() -> std::shared_ptr<ConnectionState>
{
return std::make_shared<ConnectionStateCustom>();
};
server.setConnectionStateFactory(factory);
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket)
[&server, &connectionId](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType,
[webSocket, connectionState,
&connectionId, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
@@ -33,13 +52,18 @@ namespace ix
{
if (messageType == ix::WebSocket_MessageType_Open)
{
connectionState->computeId();
Logger() << "New connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:";
for (auto it : openInfo.headers)
{
Logger() << it.first << ": " << it.second;
}
connectionId = connectionState->getId();
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
@@ -78,7 +102,8 @@ TEST_CASE("Websocket_server", "[websocket_server]")
{
int port = getFreePort();
ix::WebSocketServer server(port);
REQUIRE(startServer(server));
std::string connectionId;
REQUIRE(startServer(server, connectionId));
std::string errMsg;
bool tls = false;
@@ -111,7 +136,8 @@ TEST_CASE("Websocket_server", "[websocket_server]")
{
int port = getFreePort();
ix::WebSocketServer server(port);
REQUIRE(startServer(server));
std::string connectionId;
REQUIRE(startServer(server, connectionId));
std::string errMsg;
bool tls = false;
@@ -147,7 +173,8 @@ TEST_CASE("Websocket_server", "[websocket_server]")
{
int port = getFreePort();
ix::WebSocketServer server(port);
REQUIRE(startServer(server));
std::string connectionId;
REQUIRE(startServer(server, connectionId));
std::string errMsg;
bool tls = false;
@@ -178,6 +205,8 @@ TEST_CASE("Websocket_server", "[websocket_server]")
// Give us 500ms for the server to notice that clients went away
ix::msleep(500);
REQUIRE(connectionId == "foobarConnectionId");
server.stop();
REQUIRE(server.getClients().size() == 0);
}

View File

@@ -217,10 +217,11 @@ namespace
bool startServer(ix::WebSocketServer& server)
{
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket)
[&server](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType,
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
@@ -230,6 +231,7 @@ namespace
if (messageType == ix::WebSocket_MessageType_Open)
{
Logger() << "New connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:";
for (auto it : openInfo.headers)

View File

@@ -27,7 +27,7 @@ class Command(object):
thread.join(timeout)
if thread.is_alive():
print 'Command timeout, kill it: ' + self.cmd
print('Command timeout, kill it: ' + self.cmd)
self.process.terminate()
thread.join()
return False, 255

View File

@@ -1,2 +1,3 @@
find . -type f -name '*.cpp' -exec sed -i '' 's/[[:space:]]*$//' {} \+
find . -type f -name '*.h' -exec sed -i '' 's/[[:space:]]*$//' {} \+
find . -type f -name '*.md' -exec sed -i '' 's/[[:space:]]*$//' {} \+

1
ws/.gitignore vendored
View File

@@ -1 +1,2 @@
build
node_modules

View File

@@ -23,6 +23,8 @@ add_executable(ws
ixcrypto/IXHash.cpp
ixcrypto/IXUuid.cpp
IXRedisClient.cpp
ws_http_client.cpp
ws_ping_pong.cpp
ws_broadcast_server.cpp
@@ -32,6 +34,8 @@ add_executable(ws
ws_transfer.cpp
ws_send.cpp
ws_receive.cpp
ws_redis_publish.cpp
ws_redis_subscribe.cpp
ws.cpp)
if (APPLE AND USE_TLS)

166
ws/IXRedisClient.cpp Normal file
View File

@@ -0,0 +1,166 @@
/*
* IXRedisClient.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXRedisClient.h"
#include <ixwebsocket/IXSocketFactory.h>
#include <ixwebsocket/IXSocket.h>
#include <sstream>
#include <iomanip>
#include <vector>
#include <cstring>
namespace ix
{
bool RedisClient::connect(const std::string& hostname, int port)
{
bool tls = false;
std::string errorMsg;
_socket = createSocket(tls, errorMsg);
if (!_socket)
{
return false;
}
std::string errMsg;
return _socket->connect(hostname, port, errMsg, nullptr);
}
bool RedisClient::publish(const std::string& channel,
const std::string& message)
{
if (!_socket) return false;
std::stringstream ss;
ss << "PUBLISH ";
ss << channel;
ss << " ";
ss << message;
ss << "\r\n";
bool sent = _socket->writeBytes(ss.str(), nullptr);
if (!sent)
{
return false;
}
auto pollResult = _socket->isReadyToRead(-1);
if (pollResult == PollResultType::Error)
{
return false;
}
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
return lineValid;
}
//
// FIXME: we assume that redis never return errors...
//
bool RedisClient::subscribe(const std::string& channel,
const OnRedisSubscribeCallback& callback)
{
if (!_socket) return false;
std::stringstream ss;
ss << "SUBSCRIBE ";
ss << channel;
ss << "\r\n";
bool sent = _socket->writeBytes(ss.str(), nullptr);
if (!sent)
{
return false;
}
// Wait 1s for the response
auto pollResult = _socket->isReadyToRead(-1);
if (pollResult == PollResultType::Error)
{
return false;
}
// Read the first line of the response
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
if (!lineValid) return false;
// There are 5 items for the subscribe repply
for (int i = 0; i < 5; ++i)
{
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
if (!lineValid) return false;
}
// Wait indefinitely for new messages
while (true)
{
// Wait until something is ready to read
auto pollResult = _socket->isReadyToRead(-1);
if (pollResult == PollResultType::Error)
{
return false;
}
// The first line of the response describe the return type,
// => *3 (an array of 3 elements)
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
if (!lineValid) return false;
int arraySize;
{
std::stringstream ss;
ss << line.substr(1, line.size()-1);
ss >> arraySize;
}
// There are 6 items for each received message
for (int i = 0; i < arraySize; ++i)
{
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
if (!lineValid) return false;
// Messages are string, which start with a string size
// => $7 (7 bytes)
int stringSize;
std::stringstream ss;
ss << line.substr(1, line.size()-1);
ss >> stringSize;
auto readResult = _socket->readBytes(stringSize, nullptr, nullptr);
if (!readResult.first) return false;
if (i == 2)
{
// The message is the 3rd element.
callback(readResult.second);
}
// read last 2 bytes (\r\n)
char c;
_socket->readByte(&c, nullptr);
_socket->readByte(&c, nullptr);
}
}
return true;
}
}

36
ws/IXRedisClient.h Normal file
View File

@@ -0,0 +1,36 @@
/*
* IXRedisClient.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <memory>
#include <functional>
namespace ix
{
class Socket;
class RedisClient {
public:
using OnRedisSubscribeCallback = std::function<void(const std::string&)>;
RedisClient() = default;
~RedisClient() = default;
bool connect(const std::string& hostname,
int port);
bool publish(const std::string& channel,
const std::string& message);
bool subscribe(const std::string& channel,
const OnRedisSubscribeCallback& callback);
private:
std::shared_ptr<Socket> _socket;
};
}

View File

@@ -29,7 +29,7 @@ Subcommands:
ws transfer # running on port 8080.
# Start receiver first
ws receive ws://localhost:8080
ws receive ws://localhost:8080
# Then send a file. File will be received and written to disk by the receiver process
ws send ws://localhost:8080 /file/to/path

19
ws/package-lock.json generated Normal file
View File

@@ -0,0 +1,19 @@
{
"requires": true,
"lockfileVersion": 1,
"dependencies": {
"async-limiter": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.0.tgz",
"integrity": "sha512-jp/uFnooOiO+L211eZOoSyzpOITMXx1rBITauYykG3BRYPu8h0UcxsPNB04RR5vo4Tyz3+ay17tR6JVf9qzYWg=="
},
"ws": {
"version": "6.2.0",
"resolved": "https://registry.npmjs.org/ws/-/ws-6.2.0.tgz",
"integrity": "sha512-deZYUNlt2O4buFCa3t5bKLf8A7FPP/TVjwOeVNpw818Ma5nk4MLXls2eoEGS39o8119QIYxTrTDoPQ5B/gTD6w==",
"requires": {
"async-limiter": "1.0.0"
}
}
}
}

View File

@@ -1,11 +1,21 @@
#!/bin/sh
# Handle Ctrl-C by killing all sub-processing AND exiting
trap cleanup INT
function cleanup {
kill `cat /tmp/ws_test/pidfile.transfer`
kill `cat /tmp/ws_test/pidfile.receive`
kill `cat /tmp/ws_test/pidfile.send`
exit 1
}
rm -rf /tmp/ws_test
mkdir -p /tmp/ws_test
# Start a transport server
cd /tmp/ws_test
ws transfer --port 8090 --pidfile /tmp/ws_test/pidfile &
ws transfer --port 8090 --pidfile /tmp/ws_test/pidfile.transfer &
# Wait until the transfer server is up
while true
@@ -14,21 +24,21 @@ do
echo "Transfer server up and running"
break
}
echo "sleep ..."
echo "sleep ... wait for transfer server"
sleep 0.1
done
# Start a receiver
mkdir -p /tmp/ws_test/receive
cd /tmp/ws_test/receive
ws receive --delay 5 ws://127.0.0.1:8090 &
ws receive --delay 10 ws://127.0.0.1:8090 --pidfile /tmp/ws_test/pidfile.receive &
mkdir /tmp/ws_test/send
cd /tmp/ws_test/send
dd if=/dev/urandom of=20M_file count=20000 bs=1024
# Start the sender job
ws send ws://127.0.0.1:8090 20M_file
ws send --pidfile /tmp/ws_test/pidfile.send ws://127.0.0.1:8090 20M_file
# Wait until the file has been written to disk
while true
@@ -37,7 +47,7 @@ do
echo "Received file does exists, exiting loop"
break
fi
echo "sleep ..."
echo "sleep ... wait for output file"
sleep 0.1
done
@@ -48,4 +58,7 @@ cksum /tmp/ws_test/receive/20M_file
sleep 2
# Cleanup
kill `cat /tmp/ws_test/pidfile`
kill `cat /tmp/ws_test/pidfile.transfer`
kill `cat /tmp/ws_test/pidfile.receive`
kill `cat /tmp/ws_test/pidfile.send`

View File

@@ -35,12 +35,15 @@ int main(int argc, char** argv)
std::string output;
std::string hostname("127.0.0.1");
std::string pidfile;
std::string channel;
std::string message;
bool headersOnly = false;
bool followRedirects = false;
bool verbose = false;
bool save = false;
bool compress = false;
int port = 8080;
int redisPort = 6379;
int connectTimeOut = 60;
int transferTimeout = 1800;
int maxRedirects = 5;
@@ -50,11 +53,13 @@ int main(int argc, char** argv)
sendApp->add_option("url", url, "Connection url")->required();
sendApp->add_option("path", path, "Path to the file to send")
->required()->check(CLI::ExistingPath);
sendApp->add_option("--pidfile", pidfile, "Pid file");
CLI::App* receiveApp = app.add_subcommand("receive", "Receive a file");
receiveApp->add_option("url", url, "Connection url")->required();
receiveApp->add_option("--delay", delayMs, "Delay (ms) to wait after receiving a fragment"
" to artificially slow down the receiver");
receiveApp->add_option("--pidfile", pidfile, "Pid file");
CLI::App* transferApp = app.add_subcommand("transfer", "Broadcasting server");
transferApp->add_option("--port", port, "Connection url");
@@ -94,21 +99,33 @@ int main(int argc, char** argv)
httpClientApp->add_option("--connect-timeout", connectTimeOut, "Connection timeout");
httpClientApp->add_option("--transfer-timeout", transferTimeout, "Transfer timeout");
CLI::App* redisPublishApp = app.add_subcommand("redis_publish", "Redis publisher");
redisPublishApp->add_option("--port", redisPort, "Port");
redisPublishApp->add_option("--host", hostname, "Hostname");
redisPublishApp->add_option("channel", channel, "Channel")->required();
redisPublishApp->add_option("message", message, "Message")->required();
CLI::App* redisSubscribeApp = app.add_subcommand("redis_subscribe", "Redis subscriber");
redisSubscribeApp->add_option("--port", redisPort, "Port");
redisSubscribeApp->add_option("--host", hostname, "Hostname");
redisSubscribeApp->add_option("channel", channel, "Channel")->required();
redisSubscribeApp->add_flag("-v", verbose, "Verbose");
CLI11_PARSE(app, argc, argv);
// pid file handling
if (!pidfile.empty())
{
unlink(pidfile.c_str());
std::ofstream f;
f.open(pidfile);
f << getpid();
f.close();
}
if (app.got_subcommand("transfer"))
{
// pid file handling
if (!pidfile.empty())
{
unlink(pidfile.c_str());
std::ofstream f;
f.open(pidfile);
f << getpid();
f.close();
}
return ix::ws_transfer_main(port, hostname);
}
else if (app.got_subcommand("send"))
@@ -147,6 +164,14 @@ int main(int argc, char** argv)
followRedirects, maxRedirects, verbose,
save, output, compress);
}
else if (app.got_subcommand("redis_publish"))
{
return ix::ws_redis_publish_main(hostname, redisPort, channel, message);
}
else if (app.got_subcommand("redis_subscribe"))
{
return ix::ws_redis_subscribe_main(hostname, redisPort, channel, verbose);
}
return 1;
}

10
ws/ws.h
View File

@@ -39,4 +39,14 @@ namespace ix
int ws_send_main(const std::string& url,
const std::string& path);
int ws_redis_publish_main(const std::string& hostname,
int port,
const std::string& channel,
const std::string& message);
int ws_redis_subscribe_main(const std::string& hostname,
int port,
const std::string& channel,
bool verbose);
}

View File

@@ -17,10 +17,11 @@ namespace ix
ix::WebSocketServer server(port, hostname);
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket)
[&server](std::shared_ptr<WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType,
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
@@ -30,6 +31,7 @@ namespace ix
if (messageType == ix::WebSocket_MessageType_Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)

View File

@@ -17,10 +17,11 @@ namespace ix
ix::WebSocketServer server(port, hostname);
server.setOnConnectionCallback(
[](std::shared_ptr<ix::WebSocket> webSocket)
[](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket](ix::WebSocketMessageType messageType,
[webSocket, connectionState](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
@@ -30,6 +31,7 @@ namespace ix
if (messageType == ix::WebSocket_MessageType_Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)

View File

@@ -231,6 +231,7 @@ namespace ix
}
else if (messageType == ix::WebSocket_MessageType_Error)
{
ss << "ws_receive ";
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;

35
ws/ws_redis_publish.cpp Normal file
View File

@@ -0,0 +1,35 @@
/*
* ws_redis_publish.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <sstream>
#include "IXRedisClient.h"
namespace ix
{
int ws_redis_publish_main(const std::string& hostname,
int port,
const std::string& channel,
const std::string& message)
{
RedisClient redisClient;
if (!redisClient.connect(hostname, port))
{
std::cerr << "Cannot connect to redis host" << std::endl;
return 1;
}
std::cerr << "Publishing message " << message
<< " to " << channel << "..." << std::endl;
if (!redisClient.publish(channel, message))
{
std::cerr << "Error publishing to channel " << channel << std::endl;
return 1;
}
return 0;
}
}

66
ws/ws_redis_subscribe.cpp Normal file
View File

@@ -0,0 +1,66 @@
/*
* ws_redis_subscribe.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <sstream>
#include <chrono>
#include "IXRedisClient.h"
namespace ix
{
int ws_redis_subscribe_main(const std::string& hostname,
int port,
const std::string& channel,
bool verbose)
{
RedisClient redisClient;
if (!redisClient.connect(hostname, port))
{
std::cerr << "Cannot connect to redis host" << std::endl;
return 1;
}
std::chrono::time_point<std::chrono::steady_clock> lastTimePoint;
int msgPerSeconds = 0;
int msgCount = 0;
auto callback = [&lastTimePoint, &msgPerSeconds, &msgCount, verbose]
(const std::string& message)
{
if (verbose)
{
std::cout << message << std::endl;
}
msgPerSeconds++;
auto now = std::chrono::steady_clock::now();
if (now - lastTimePoint > std::chrono::seconds(1))
{
lastTimePoint = std::chrono::steady_clock::now();
msgCount += msgPerSeconds;
// #messages 901 msg/s 150
std::cout << "#messages " << msgCount << " "
<< "msg/s " << msgPerSeconds
<< std::endl;
msgPerSeconds = 0;
}
};
std::cerr << "Subscribing to " << channel << "..." << std::endl;
if (!redisClient.subscribe(channel, callback))
{
std::cerr << "Error subscribing to channel " << channel << std::endl;
return 1;
}
return 0;
}
}

View File

@@ -162,6 +162,7 @@ namespace ix
}
else if (messageType == ix::WebSocket_MessageType_Error)
{
ss << "ws_send ";
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;

View File

@@ -17,10 +17,11 @@ namespace ix
ix::WebSocketServer server(port, hostname);
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket)
[&server](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType,
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
@@ -30,6 +31,7 @@ namespace ix
if (messageType == ix::WebSocket_MessageType_Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)