Compare commits

..

14 Commits

Author SHA1 Message Date
Benjamin Sergeant
c36dc0e16a fix unittest 2019-03-20 18:50:56 -07:00
Benjamin Sergeant
7e45659377 update README 2019-03-20 18:33:55 -07:00
Benjamin Sergeant
788c92487c New connection state for server code + fix OpenSSL double init bug 2019-03-20 18:26:29 -07:00
Benjamin Sergeant
0999073435 fix typo 2019-03-20 18:25:28 -07:00
Benjamin Sergeant
2cae6f4cf8 (cmake) add a warning about 32/64 conversion problems. 2019-03-20 16:16:54 -07:00
Benjamin Sergeant
e77b9176f3 Feature/redis (#23)
* Fix warning

* (cmake) add a warning about 32/64 conversion problems.

* simple redis clients

* can publish to redis

* redis subscribe

* display messages received per second

* verbose flag

* (cmake) use clang only compile option -Wshorten-64-to-32 when compiling with clang
2019-03-20 14:29:02 -07:00
Michael Lu
afe8b966ad Fixed heartbeat typos (#22) 2019-03-19 21:31:43 -07:00
Benjamin Sergeant
310724c961 make PollResultType an enum class 2019-03-19 09:29:57 -07:00
Benjamin Sergeant
ceba8ae620 fix bug with isReadyToWrite 2019-03-18 22:05:04 -07:00
Benjamin Sergeant
fead661ab7 workaround bug in Socket::isReadyToWrite 2019-03-18 20:37:33 -07:00
Benjamin Sergeant
9c8c17f577 use milliseconds 2019-03-18 20:17:44 -07:00
Benjamin Sergeant
a04f83930f ws / log subcommand name 2019-03-18 17:54:06 -07:00
Benjamin Sergeant
c421d19800 disable sigpipe on osx when writing/reading into a dead pipe 2019-03-18 17:52:01 -07:00
Benjamin Sergeant
521f02c90e edit homebrew install steps 2019-03-18 15:45:33 -07:00
42 changed files with 672 additions and 124 deletions

View File

@@ -15,6 +15,10 @@ if (NOT WIN32)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic")
endif() endif()
if ("${CMAKE_CXX_COMPILER_ID}" MATCHES "Clang")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wshorten-64-to-32")
endif()
set( IXWEBSOCKET_SOURCES set( IXWEBSOCKET_SOURCES
ixwebsocket/IXSocket.cpp ixwebsocket/IXSocket.cpp
ixwebsocket/IXSocketServer.cpp ixwebsocket/IXSocketServer.cpp
@@ -35,6 +39,7 @@ set( IXWEBSOCKET_SOURCES
ixwebsocket/IXSelectInterrupt.cpp ixwebsocket/IXSelectInterrupt.cpp
ixwebsocket/IXSelectInterruptPipe.cpp ixwebsocket/IXSelectInterruptPipe.cpp
ixwebsocket/IXSelectInterruptFactory.cpp ixwebsocket/IXSelectInterruptFactory.cpp
ixwebsocket/IXConnectionState.cpp
) )
set( IXWEBSOCKET_HEADERS set( IXWEBSOCKET_HEADERS
@@ -62,6 +67,7 @@ set( IXWEBSOCKET_HEADERS
ixwebsocket/IXSelectInterrupt.h ixwebsocket/IXSelectInterrupt.h
ixwebsocket/IXSelectInterruptPipe.h ixwebsocket/IXSelectInterruptPipe.h
ixwebsocket/IXSelectInterruptFactory.h ixwebsocket/IXSelectInterruptFactory.h
ixwebsocket/IXConnectionState.h
) )
# Platform specific code # Platform specific code

View File

@@ -10,7 +10,7 @@ communication channels over a single TCP connection. *IXWebSocket* is a C++ libr
* macOS * macOS
* iOS * iOS
* Linux * Linux
* Android * Android
## Examples ## Examples
@@ -63,10 +63,11 @@ Here is what the server API looks like. Note that server support is very recent
ix::WebSocketServer server(port); ix::WebSocketServer server(port);
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket) [&server](std::shared_ptr<WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str, const std::string& str,
size_t wireSize, size_t wireSize,
const ix::WebSocketErrorInfo& error, const ix::WebSocketErrorInfo& error,
@@ -77,6 +78,12 @@ server.setOnConnectionCallback(
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
// A connection state object is available, and has a default id
// You can subclass ConnectionState and pass an alternate factory
// to override it. It is useful if you want to store custom
// attributes per connection (authenticated bool flag, attributes, etc...)
std::cerr << "id: " << connectionState->getId() << std::endl;
// The uri the client did connect to. // The uri the client did connect to.
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << openInfo.uri << std::endl;
@@ -183,7 +190,7 @@ There is a Dockerfile for running some code on Linux, and a unittest which can b
You can build and install the ws command line tool with Homebrew. You can build and install the ws command line tool with Homebrew.
``` ```
brew create --cmake https://github.com/machinezone/IXWebSocket/archive/v1.1.0.tar.gz brew tap bsergean/IXWebSocket
brew install IXWebSocket brew install IXWebSocket
``` ```
@@ -223,13 +230,13 @@ Here is a simplistic diagram which explains how the code is structured in term o
+-----------------------+ --- Public +-----------------------+ --- Public
| | Start the receiving Background thread. Auto reconnection. Simple websocket Ping. | | Start the receiving Background thread. Auto reconnection. Simple websocket Ping.
| IXWebSocket | Interface used by C++ test clients. No IX dependencies. | IXWebSocket | Interface used by C++ test clients. No IX dependencies.
| | | |
+-----------------------+ +-----------------------+
| | | |
| IXWebSocketServer | Run a server and give each connections its own WebSocket object. | IXWebSocketServer | Run a server and give each connections its own WebSocket object.
| | Each connection is handled in a new OS thread. | | Each connection is handled in a new OS thread.
| | | |
+-----------------------+ --- Private +-----------------------+ --- Private
| | | |
| IXWebSocketTransport | Low level websocket code, framing, managing raw socket. Adapted from easywsclient. | IXWebSocketTransport | Low level websocket code, framing, managing raw socket. Adapted from easywsclient.
| | | |

View File

@@ -0,0 +1,37 @@
/*
* IXConnectionState.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXConnectionState.h"
#include <sstream>
namespace ix
{
std::atomic<uint64_t> ConnectionState::_globalId(0);
ConnectionState::ConnectionState()
{
computeId();
}
void ConnectionState::computeId()
{
std::stringstream ss;
ss << _globalId++;
_id = ss.str();
}
const std::string& ConnectionState::getId() const
{
return _id;
}
std::shared_ptr<ConnectionState> ConnectionState::createConnectionState()
{
return std::make_shared<ConnectionState>();
}
}

View File

@@ -0,0 +1,33 @@
/*
* IXConnectionState.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <stdint.h>
#include <string>
#include <atomic>
#include <memory>
namespace ix
{
class ConnectionState {
public:
ConnectionState();
virtual ~ConnectionState() = default;
virtual void computeId();
virtual const std::string& getId() const;
static std::shared_ptr<ConnectionState> createConnectionState();
protected:
std::string _id;
static std::atomic<uint64_t> _globalId;
};
}

View File

@@ -73,7 +73,7 @@ namespace ix
errMsg = "no error"; errMsg = "no error";
// Maybe a cancellation request got in before the background thread terminated ? // Maybe a cancellation request got in before the background thread terminated ?
if (isCancellationRequested()) if (isCancellationRequested && isCancellationRequested())
{ {
errMsg = "cancellation requested"; errMsg = "cancellation requested";
return nullptr; return nullptr;
@@ -121,7 +121,7 @@ namespace ix
} }
// Were we cancelled ? // Were we cancelled ?
if (isCancellationRequested()) if (isCancellationRequested && isCancellationRequested())
{ {
errMsg = "cancellation requested"; errMsg = "cancellation requested";
return nullptr; return nullptr;
@@ -129,7 +129,7 @@ namespace ix
} }
// Maybe a cancellation request got in before the bg terminated ? // Maybe a cancellation request got in before the bg terminated ?
if (isCancellationRequested()) if (isCancellationRequested && isCancellationRequested())
{ {
errMsg = "cancellation requested"; errMsg = "cancellation requested";
return nullptr; return nullptr;

View File

@@ -56,7 +56,7 @@ namespace ix
if (fcntl(_fildes[kPipeReadIndex], F_SETFL, O_NONBLOCK) == -1) if (fcntl(_fildes[kPipeReadIndex], F_SETFL, O_NONBLOCK) == -1)
{ {
std::stringstream ss; std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in fcntl() call" ss << "SelectInterruptPipe::init() failed in fcntl(..., O_NONBLOCK) call"
<< " : " << strerror(errno); << " : " << strerror(errno);
errorMsg = ss.str(); errorMsg = ss.str();
@@ -68,7 +68,7 @@ namespace ix
if (fcntl(_fildes[kPipeWriteIndex], F_SETFL, O_NONBLOCK) == -1) if (fcntl(_fildes[kPipeWriteIndex], F_SETFL, O_NONBLOCK) == -1)
{ {
std::stringstream ss; std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in fcntl() call" ss << "SelectInterruptPipe::init() failed in fcntl(..., O_NONBLOCK) call"
<< " : " << strerror(errno); << " : " << strerror(errno);
errorMsg = ss.str(); errorMsg = ss.str();
@@ -77,13 +77,31 @@ namespace ix
return false; return false;
} }
// #ifdef F_SETNOSIGPIPE
// FIXME: on macOS we should configure the pipe to not trigger SIGPIPE if (fcntl(_fildes[kPipeWriteIndex], F_SETNOSIGPIPE, 1) == -1)
// on reads/writes to a closed fd {
// std::stringstream ss;
// The generation of the SIGPIPE signal can be suppressed using the ss << "SelectInterruptPipe::init() failed in fcntl(.... F_SETNOSIGPIPE) call"
// F_SETNOSIGPIPE fcntl command. << " : " << strerror(errno);
// errorMsg = ss.str();
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
return false;
}
if (fcntl(_fildes[kPipeWriteIndex], F_SETNOSIGPIPE, 1) == -1)
{
std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in fcntl(..., F_SETNOSIGPIPE) call"
<< " : " << strerror(errno);
errorMsg = ss.str();
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
return false;
}
#endif
return true; return true;
} }
@@ -104,6 +122,7 @@ namespace ix
uint64_t value = 0; uint64_t value = 0;
::read(fd, &value, sizeof(value)); ::read(fd, &value, sizeof(value));
return value; return value;
} }

View File

@@ -45,16 +45,16 @@ namespace ix
{ {
if (_sockfd == -1) if (_sockfd == -1)
{ {
if (onPollCallback) onPollCallback(PollResultType_Error); if (onPollCallback) onPollCallback(PollResultType::Error);
return; return;
} }
PollResultType pollResult = isReadyToRead(timeoutSecs, 0); PollResultType pollResult = isReadyToRead(1000 * timeoutSecs);
if (onPollCallback) onPollCallback(pollResult); if (onPollCallback) onPollCallback(pollResult);
} }
PollResultType Socket::select(bool readyToRead, int timeoutSecs, int timeoutMs) PollResultType Socket::select(bool readyToRead, int timeoutMs)
{ {
fd_set rfds; fd_set rfds;
fd_set wfds; fd_set wfds;
@@ -62,7 +62,6 @@ namespace ix
FD_ZERO(&wfds); FD_ZERO(&wfds);
fd_set* fds = (readyToRead) ? &rfds : & wfds; fd_set* fds = (readyToRead) ? &rfds : & wfds;
FD_SET(_sockfd, fds); FD_SET(_sockfd, fds);
// File descriptor used to interrupt select when needed // File descriptor used to interrupt select when needed
@@ -73,64 +72,60 @@ namespace ix
} }
struct timeval timeout; struct timeval timeout;
timeout.tv_sec = timeoutSecs; timeout.tv_sec = timeoutMs / 1000;
timeout.tv_usec = 1000 * timeoutMs; timeout.tv_usec = (timeoutMs < 1000) ? 0 : 1000 * (timeoutMs % 1000);
// Compute the highest fd. // Compute the highest fd.
int sockfd = _sockfd; int sockfd = _sockfd;
int nfds = (std::max)(sockfd, interruptFd); int nfds = (std::max)(sockfd, interruptFd);
int ret = ::select(nfds + 1, &rfds, &wfds, nullptr, int ret = ::select(nfds + 1, &rfds, &wfds, nullptr,
(timeoutSecs < 0) ? nullptr : &timeout); (timeoutMs < 0) ? nullptr : &timeout);
PollResultType pollResult = PollResultType_ReadyForRead; PollResultType pollResult = PollResultType::ReadyForRead;
if (ret < 0) if (ret < 0)
{ {
pollResult = PollResultType_Error; pollResult = PollResultType::Error;
} }
else if (ret == 0) else if (ret == 0)
{ {
pollResult = PollResultType_Timeout; pollResult = PollResultType::Timeout;
} }
else if (interruptFd != -1 && FD_ISSET(interruptFd, fds)) else if (interruptFd != -1 && FD_ISSET(interruptFd, &rfds))
{ {
uint64_t value = _selectInterrupt->read(); uint64_t value = _selectInterrupt->read();
if (value == kSendRequest) if (value == kSendRequest)
{ {
pollResult = PollResultType_SendRequest; pollResult = PollResultType::SendRequest;
} }
else if (value == kCloseRequest) else if (value == kCloseRequest)
{ {
pollResult = PollResultType_CloseRequest; pollResult = PollResultType::CloseRequest;
} }
} }
else if (sockfd != -1 && FD_ISSET(sockfd, fds)) else if (sockfd != -1 && readyToRead && FD_ISSET(sockfd, &rfds))
{ {
if (readyToRead) pollResult = PollResultType::ReadyForRead;
{ }
pollResult = PollResultType_ReadyForRead; else if (sockfd != -1 && !readyToRead && FD_ISSET(sockfd, &wfds))
} {
else pollResult = PollResultType::ReadyForWrite;
{
pollResult = PollResultType_ReadyForWrite;
}
} }
return pollResult; return pollResult;
} }
PollResultType Socket::isReadyToRead(int timeoutSecs, int timeoutMs) PollResultType Socket::isReadyToRead(int timeoutMs)
{ {
bool readyToRead = true; bool readyToRead = true;
return select(readyToRead, timeoutSecs, timeoutMs); return select(readyToRead, timeoutMs);
} }
PollResultType Socket::isReadyToWrite(int timeoutSecs, int timeoutMs) PollResultType Socket::isReadyToWrite(int timeoutMs)
{ {
bool readyToRead = false; bool readyToRead = false;
return select(readyToRead, timeoutSecs, timeoutMs); return select(readyToRead, timeoutMs);
} }
// Wake up from poll/select by writing to the pipe which is watched by select // Wake up from poll/select by writing to the pipe which is watched by select
@@ -215,7 +210,7 @@ namespace ix
{ {
while (true) while (true)
{ {
if (isCancellationRequested()) return false; if (isCancellationRequested && isCancellationRequested()) return false;
char* buffer = const_cast<char*>(str.c_str()); char* buffer = const_cast<char*>(str.c_str());
int len = (int) str.size(); int len = (int) str.size();
@@ -227,7 +222,7 @@ namespace ix
{ {
return ret == len; return ret == len;
} }
// There is possibly something to be write, try again // There is possibly something to be writen, try again
else if (ret < 0 && (getErrno() == EWOULDBLOCK || else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
getErrno() == EAGAIN)) getErrno() == EAGAIN))
{ {
@@ -246,7 +241,7 @@ namespace ix
{ {
while (true) while (true)
{ {
if (isCancellationRequested()) return false; if (isCancellationRequested && isCancellationRequested()) return false;
ssize_t ret; ssize_t ret;
ret = recv(buffer, 1); ret = recv(buffer, 1);
@@ -262,7 +257,7 @@ namespace ix
{ {
// Wait with a 1ms timeout until the socket is ready to read. // Wait with a 1ms timeout until the socket is ready to read.
// This way we are not busy looping // This way we are not busy looping
if (isReadyToRead(0, 1) == PollResultType_Error) if (isReadyToRead(1) == PollResultType::Error)
{ {
return false; return false;
} }
@@ -309,9 +304,12 @@ namespace ix
std::vector<uint8_t> output; std::vector<uint8_t> output;
while (output.size() != length) while (output.size() != length)
{ {
if (isCancellationRequested()) return std::make_pair(false, std::string()); if (isCancellationRequested && isCancellationRequested())
{
return std::make_pair(false, std::string());
}
int size = std::min(kChunkSize, length - output.size()); size_t size = std::min(kChunkSize, length - output.size());
ssize_t ret = recv((char*)&_readBuffer[0], size); ssize_t ret = recv((char*)&_readBuffer[0], size);
if (ret <= 0 && (getErrno() != EWOULDBLOCK && if (ret <= 0 && (getErrno() != EWOULDBLOCK &&
@@ -331,7 +329,7 @@ namespace ix
// Wait with a 1ms timeout until the socket is ready to read. // Wait with a 1ms timeout until the socket is ready to read.
// This way we are not busy looping // This way we are not busy looping
if (isReadyToRead(0, 1) == PollResultType_Error) if (isReadyToRead(1) == PollResultType::Error)
{ {
return std::make_pair(false, std::string()); return std::make_pair(false, std::string());
} }

View File

@@ -25,14 +25,14 @@ namespace ix
{ {
class SelectInterrupt; class SelectInterrupt;
enum PollResultType enum class PollResultType
{ {
PollResultType_ReadyForRead = 0, ReadyForRead = 0,
PollResultType_ReadyForWrite = 1, ReadyForWrite = 1,
PollResultType_Timeout = 2, Timeout = 2,
PollResultType_Error = 3, Error = 3,
PollResultType_SendRequest = 4, SendRequest = 4,
PollResultType_CloseRequest = 5 CloseRequest = 5
}; };
class Socket { class Socket {
@@ -50,9 +50,8 @@ namespace ix
int timeoutSecs = kDefaultPollTimeout); int timeoutSecs = kDefaultPollTimeout);
bool wakeUpFromPoll(uint8_t wakeUpCode); bool wakeUpFromPoll(uint8_t wakeUpCode);
PollResultType select(bool readyToRead, int timeoutSecs, int timeoutMs); PollResultType isReadyToWrite(int timeoutMs);
PollResultType isReadyToWrite(int timeoutSecs, int timeoutMs); PollResultType isReadyToRead(int timeoutMs);
PollResultType isReadyToRead(int timeoutSecs, int timeoutMs);
// Virtual methods // Virtual methods
virtual bool connect(const std::string& url, virtual bool connect(const std::string& url,
@@ -92,6 +91,8 @@ namespace ix
std::mutex _socketMutex; std::mutex _socketMutex;
private: private:
PollResultType select(bool readyToRead, int timeoutMs);
static const int kDefaultPollTimeout; static const int kDefaultPollTimeout;
static const int kDefaultPollNoTimeout; static const int kDefaultPollNoTimeout;

View File

@@ -66,7 +66,7 @@ namespace ix
for (;;) for (;;)
{ {
if (isCancellationRequested()) // Must handle timeout as well if (isCancellationRequested && isCancellationRequested()) // Must handle timeout as well
{ {
closeSocket(fd); closeSocket(fd);
errMsg = "Cancelled"; errMsg = "Cancelled";

View File

@@ -21,6 +21,7 @@
namespace ix namespace ix
{ {
std::atomic<bool> SocketOpenSSL::_openSSLInitializationSuccessful(false); std::atomic<bool> SocketOpenSSL::_openSSLInitializationSuccessful(false);
std::once_flag SocketOpenSSL::_openSSLInitFlag;
SocketOpenSSL::SocketOpenSSL(int fd) : Socket(fd), SocketOpenSSL::SocketOpenSSL(int fd) : Socket(fd),
_ssl_connection(nullptr), _ssl_connection(nullptr),

View File

@@ -50,7 +50,7 @@ namespace ix
const SSL_METHOD* _ssl_method; const SSL_METHOD* _ssl_method;
mutable std::mutex _mutex; // OpenSSL routines are not thread-safe mutable std::mutex _mutex; // OpenSSL routines are not thread-safe
std::once_flag _openSSLInitFlag; static std::once_flag _openSSLInitFlag;
static std::atomic<bool> _openSSLInitializationSuccessful; static std::atomic<bool> _openSSLInitializationSuccessful;
}; };

View File

@@ -29,7 +29,8 @@ namespace ix
_host(host), _host(host),
_backlog(backlog), _backlog(backlog),
_maxConnections(maxConnections), _maxConnections(maxConnections),
_stop(false) _stop(false),
_connectionStateFactory(&ConnectionState::createConnectionState)
{ {
} }
@@ -145,6 +146,12 @@ namespace ix
::close(_serverFd); ::close(_serverFd);
} }
void SocketServer::setConnectionStateFactory(
const ConnectionStateFactory& connectionStateFactory)
{
_connectionStateFactory = connectionStateFactory;
}
void SocketServer::run() void SocketServer::run()
{ {
// Set the socket to non blocking mode, so that accept calls are not blocking // Set the socket to non blocking mode, so that accept calls are not blocking
@@ -214,6 +221,12 @@ namespace ix
continue; continue;
} }
std::shared_ptr<ConnectionState> connectionState;
if (_connectionStateFactory)
{
connectionState = _connectionStateFactory();
}
// Launch the handleConnection work asynchronously in its own thread. // Launch the handleConnection work asynchronously in its own thread.
// //
// the destructor of a future returned by std::async blocks, // the destructor of a future returned by std::async blocks,
@@ -221,7 +234,8 @@ namespace ix
f = std::async(std::launch::async, f = std::async(std::launch::async,
&SocketServer::handleConnection, &SocketServer::handleConnection,
this, this,
clientFd); clientFd,
connectionState);
} }
} }
} }

View File

@@ -6,6 +6,8 @@
#pragma once #pragma once
#include "IXConnectionState.h"
#include <utility> // pair #include <utility> // pair
#include <string> #include <string>
#include <set> #include <set>
@@ -20,6 +22,8 @@ namespace ix
{ {
class SocketServer { class SocketServer {
public: public:
using ConnectionStateFactory = std::function<std::shared_ptr<ConnectionState>()>;
SocketServer(int port = SocketServer::kDefaultPort, SocketServer(int port = SocketServer::kDefaultPort,
const std::string& host = SocketServer::kDefaultHost, const std::string& host = SocketServer::kDefaultHost,
int backlog = SocketServer::kDefaultTcpBacklog, int backlog = SocketServer::kDefaultTcpBacklog,
@@ -27,6 +31,8 @@ namespace ix
virtual ~SocketServer(); virtual ~SocketServer();
virtual void stop(); virtual void stop();
void setConnectionStateFactory(const ConnectionStateFactory& connectionStateFactory);
const static int kDefaultPort; const static int kDefaultPort;
const static std::string kDefaultHost; const static std::string kDefaultHost;
const static int kDefaultTcpBacklog; const static int kDefaultTcpBacklog;
@@ -60,9 +66,13 @@ namespace ix
std::condition_variable _conditionVariable; std::condition_variable _conditionVariable;
std::mutex _conditionVariableMutex; std::mutex _conditionVariableMutex;
//
ConnectionStateFactory _connectionStateFactory;
// Methods // Methods
void run(); void run();
virtual void handleConnection(int fd) = 0; virtual void handleConnection(int fd,
std::shared_ptr<ConnectionState> connectionState) = 0;
virtual size_t getConnectedClientsCount() = 0; virtual size_t getConnectedClientsCount() = 0;
}; };
} }

View File

@@ -79,10 +79,10 @@ namespace ix
return _perMessageDeflateOptions; return _perMessageDeflateOptions;
} }
void WebSocket::setHeartBeatPeriod(int hearBeatPeriod) void WebSocket::setHeartBeatPeriod(int heartBeatPeriod)
{ {
std::lock_guard<std::mutex> lock(_configMutex); std::lock_guard<std::mutex> lock(_configMutex);
_heartBeatPeriod = hearBeatPeriod; _heartBeatPeriod = heartBeatPeriod;
} }
int WebSocket::getHeartBeatPeriod() const int WebSocket::getHeartBeatPeriod() const

View File

@@ -89,7 +89,7 @@ namespace ix
void setUrl(const std::string& url); void setUrl(const std::string& url);
void setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions); void setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions);
void setHandshakeTimeout(int handshakeTimeoutSecs); void setHandshakeTimeout(int handshakeTimeoutSecs);
void setHeartBeatPeriod(int hearBeatPeriod); void setHeartBeatPeriod(int heartBeatPeriod);
// Run asynchronously, by calling start and stop. // Run asynchronously, by calling start and stop.
void start(); void start();

View File

@@ -49,10 +49,12 @@ namespace ix
_onConnectionCallback = callback; _onConnectionCallback = callback;
} }
void WebSocketServer::handleConnection(int fd) void WebSocketServer::handleConnection(
int fd,
std::shared_ptr<ConnectionState> connectionState)
{ {
auto webSocket = std::make_shared<WebSocket>(); auto webSocket = std::make_shared<WebSocket>();
_onConnectionCallback(webSocket); _onConnectionCallback(webSocket, connectionState);
webSocket->disableAutomaticReconnection(); webSocket->disableAutomaticReconnection();

View File

@@ -20,7 +20,8 @@
namespace ix namespace ix
{ {
using OnConnectionCallback = std::function<void(std::shared_ptr<WebSocket>)>; using OnConnectionCallback = std::function<void(std::shared_ptr<WebSocket>,
std::shared_ptr<ConnectionState>)>;
class WebSocketServer : public SocketServer { class WebSocketServer : public SocketServer {
public: public:
@@ -49,7 +50,8 @@ namespace ix
const static int kDefaultHandShakeTimeoutSecs; const static int kDefaultHandShakeTimeoutSecs;
// Methods // Methods
virtual void handleConnection(int fd) final; virtual void handleConnection(int fd,
std::shared_ptr<ConnectionState> connectionState) final;
virtual size_t getConnectedClientsCount() final; virtual size_t getConnectedClientsCount() final;
}; };
} }

View File

@@ -53,7 +53,7 @@
namespace ix namespace ix
{ {
const std::string WebSocketTransport::kHeartBeatPingMessage("ixwebsocket::hearbeat"); const std::string WebSocketTransport::kHeartBeatPingMessage("ixwebsocket::heartbeat");
const int WebSocketTransport::kDefaultHeartBeatPeriod(-1); const int WebSocketTransport::kDefaultHeartBeatPeriod(-1);
constexpr size_t WebSocketTransport::kChunkSize; constexpr size_t WebSocketTransport::kChunkSize;
@@ -75,11 +75,11 @@ namespace ix
} }
void WebSocketTransport::configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions, void WebSocketTransport::configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions,
int hearBeatPeriod) int heartBeatPeriod)
{ {
_perMessageDeflateOptions = perMessageDeflateOptions; _perMessageDeflateOptions = perMessageDeflateOptions;
_enablePerMessageDeflate = _perMessageDeflateOptions.enabled(); _enablePerMessageDeflate = _perMessageDeflateOptions.enabled();
_heartBeatPeriod = hearBeatPeriod; _heartBeatPeriod = heartBeatPeriod;
} }
// Client // Client
@@ -189,7 +189,7 @@ namespace ix
// If (1) heartbeat is enabled, and (2) no data was received or // If (1) heartbeat is enabled, and (2) no data was received or
// send for a duration exceeding our heart-beat period, send a // send for a duration exceeding our heart-beat period, send a
// ping to the server. // ping to the server.
if (pollResult == PollResultType_Timeout && if (pollResult == PollResultType::Timeout &&
heartBeatPeriodExceeded()) heartBeatPeriodExceeded())
{ {
std::stringstream ss; std::stringstream ss;
@@ -198,26 +198,27 @@ namespace ix
} }
// Make sure we send all the buffered data // Make sure we send all the buffered data
// there can be a lot of it for large messages. // there can be a lot of it for large messages.
else if (pollResult == PollResultType_SendRequest) else if (pollResult == PollResultType::SendRequest)
{ {
while (!isSendBufferEmpty() && !_requestInitCancellation) while (!isSendBufferEmpty() && !_requestInitCancellation)
{ {
// Wait with a 10ms timeout until the socket is ready to write. // Wait with a 10ms timeout until the socket is ready to write.
// This way we are not busy looping // This way we are not busy looping
PollResultType result = _socket->isReadyToWrite(0, 10); PollResultType result = _socket->isReadyToWrite(10);
if (result == PollResultType_Error)
if (result == PollResultType::Error)
{ {
_socket->close(); _socket->close();
setReadyState(CLOSED); setReadyState(CLOSED);
break; break;
} }
else if (result == PollResultType_ReadyForWrite) else if (result == PollResultType::ReadyForWrite)
{ {
sendOnSocket(); sendOnSocket();
} }
} }
} }
else if (pollResult == PollResultType_ReadyForRead) else if (pollResult == PollResultType::ReadyForRead)
{ {
while (true) while (true)
{ {
@@ -243,11 +244,11 @@ namespace ix
} }
} }
} }
else if (pollResult == PollResultType_Error) else if (pollResult == PollResultType::Error)
{ {
_socket->close(); _socket->close();
} }
else if (pollResult == PollResultType_CloseRequest) else if (pollResult == PollResultType::CloseRequest)
{ {
_socket->close(); _socket->close();
} }

View File

@@ -61,7 +61,7 @@ namespace ix
~WebSocketTransport(); ~WebSocketTransport();
void configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions, void configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions,
int hearBeatPeriod); int heartBeatPeriod);
WebSocketInitResult connectToUrl(const std::string& url, // Client WebSocketInitResult connectToUrl(const std::string& url, // Client
int timeoutSecs); int timeoutSecs);
@@ -148,7 +148,7 @@ namespace ix
mutable std::mutex _lastSendTimePointMutex; mutable std::mutex _lastSendTimePointMutex;
std::chrono::time_point<std::chrono::steady_clock> _lastSendTimePoint; std::chrono::time_point<std::chrono::steady_clock> _lastSendTimePoint;
// No data was send through the socket for longer that the hearbeat period // No data was send through the socket for longer than the heartbeat period
bool heartBeatPeriodExceeded(); bool heartBeatPeriodExceeded();
void sendOnSocket(); void sendOnSocket();

View File

@@ -39,7 +39,7 @@ test:
python test/run.py python test/run.py
ws_test: all ws_test: all
(cd ws ; sh test_ws.sh) (cd ws ; bash test_ws.sh)
# For the fork that is configured with appveyor # For the fork that is configured with appveyor
rebase_upstream: rebase_upstream:

View File

@@ -65,7 +65,7 @@ namespace
_webSocket.setUrl(url); _webSocket.setUrl(url);
// The important bit for this test. // The important bit for this test.
// Set a 1 second hearbeat ; if no traffic is present on the connection for 1 second // Set a 1 second heartbeat ; if no traffic is present on the connection for 1 second
// a ping message will be sent by the client. // a ping message will be sent by the client.
_webSocket.setHeartBeatPeriod(1); _webSocket.setHeartBeatPeriod(1);
@@ -128,10 +128,11 @@ namespace
{ {
// A dev/null server // A dev/null server
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server, &receivedPingMessages](std::shared_ptr<ix::WebSocket> webSocket) [&server, &receivedPingMessages](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, &server, &receivedPingMessages](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server, &receivedPingMessages](ix::WebSocketMessageType messageType,
const std::string& str, const std::string& str,
size_t wireSize, size_t wireSize,
const ix::WebSocketErrorInfo& error, const ix::WebSocketErrorInfo& error,
@@ -141,6 +142,7 @@ namespace
if (messageType == ix::WebSocket_MessageType_Open) if (messageType == ix::WebSocket_MessageType_Open)
{ {
Logger() << "New server connection"; Logger() << "New server connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri; Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:"; Logger() << "Headers:";
for (auto it : openInfo.headers) for (auto it : openInfo.headers)

View File

@@ -18,13 +18,32 @@ using namespace ix;
namespace ix namespace ix
{ {
bool startServer(ix::WebSocketServer& server) // Test that we can override the connectionState impl to provide our own
class ConnectionStateCustom : public ConnectionState
{ {
void computeId()
{
// a very boring invariant id that we can test against in the unittest
_id = "foobarConnectionId";
}
};
bool startServer(ix::WebSocketServer& server,
std::string& connectionId)
{
auto factory = []() -> std::shared_ptr<ConnectionState>
{
return std::make_shared<ConnectionStateCustom>();
};
server.setConnectionStateFactory(factory);
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket) [&server, &connectionId](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType, [webSocket, connectionState,
&connectionId, &server](ix::WebSocketMessageType messageType,
const std::string& str, const std::string& str,
size_t wireSize, size_t wireSize,
const ix::WebSocketErrorInfo& error, const ix::WebSocketErrorInfo& error,
@@ -33,13 +52,18 @@ namespace ix
{ {
if (messageType == ix::WebSocket_MessageType_Open) if (messageType == ix::WebSocket_MessageType_Open)
{ {
connectionState->computeId();
Logger() << "New connection"; Logger() << "New connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri; Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:"; Logger() << "Headers:";
for (auto it : openInfo.headers) for (auto it : openInfo.headers)
{ {
Logger() << it.first << ": " << it.second; Logger() << it.first << ": " << it.second;
} }
connectionId = connectionState->getId();
} }
else if (messageType == ix::WebSocket_MessageType_Close) else if (messageType == ix::WebSocket_MessageType_Close)
{ {
@@ -78,7 +102,8 @@ TEST_CASE("Websocket_server", "[websocket_server]")
{ {
int port = getFreePort(); int port = getFreePort();
ix::WebSocketServer server(port); ix::WebSocketServer server(port);
REQUIRE(startServer(server)); std::string connectionId;
REQUIRE(startServer(server, connectionId));
std::string errMsg; std::string errMsg;
bool tls = false; bool tls = false;
@@ -111,7 +136,8 @@ TEST_CASE("Websocket_server", "[websocket_server]")
{ {
int port = getFreePort(); int port = getFreePort();
ix::WebSocketServer server(port); ix::WebSocketServer server(port);
REQUIRE(startServer(server)); std::string connectionId;
REQUIRE(startServer(server, connectionId));
std::string errMsg; std::string errMsg;
bool tls = false; bool tls = false;
@@ -147,7 +173,8 @@ TEST_CASE("Websocket_server", "[websocket_server]")
{ {
int port = getFreePort(); int port = getFreePort();
ix::WebSocketServer server(port); ix::WebSocketServer server(port);
REQUIRE(startServer(server)); std::string connectionId;
REQUIRE(startServer(server, connectionId));
std::string errMsg; std::string errMsg;
bool tls = false; bool tls = false;
@@ -178,6 +205,8 @@ TEST_CASE("Websocket_server", "[websocket_server]")
// Give us 500ms for the server to notice that clients went away // Give us 500ms for the server to notice that clients went away
ix::msleep(500); ix::msleep(500);
REQUIRE(connectionId == "foobarConnectionId");
server.stop(); server.stop();
REQUIRE(server.getClients().size() == 0); REQUIRE(server.getClients().size() == 0);
} }

View File

@@ -217,10 +217,11 @@ namespace
bool startServer(ix::WebSocketServer& server) bool startServer(ix::WebSocketServer& server)
{ {
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket) [&server](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str, const std::string& str,
size_t wireSize, size_t wireSize,
const ix::WebSocketErrorInfo& error, const ix::WebSocketErrorInfo& error,
@@ -230,6 +231,7 @@ namespace
if (messageType == ix::WebSocket_MessageType_Open) if (messageType == ix::WebSocket_MessageType_Open)
{ {
Logger() << "New connection"; Logger() << "New connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri; Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:"; Logger() << "Headers:";
for (auto it : openInfo.headers) for (auto it : openInfo.headers)

View File

@@ -27,7 +27,7 @@ class Command(object):
thread.join(timeout) thread.join(timeout)
if thread.is_alive(): if thread.is_alive():
print 'Command timeout, kill it: ' + self.cmd print('Command timeout, kill it: ' + self.cmd)
self.process.terminate() self.process.terminate()
thread.join() thread.join()
return False, 255 return False, 255

View File

@@ -1,2 +1,3 @@
find . -type f -name '*.cpp' -exec sed -i '' 's/[[:space:]]*$//' {} \+ find . -type f -name '*.cpp' -exec sed -i '' 's/[[:space:]]*$//' {} \+
find . -type f -name '*.h' -exec sed -i '' 's/[[:space:]]*$//' {} \+ find . -type f -name '*.h' -exec sed -i '' 's/[[:space:]]*$//' {} \+
find . -type f -name '*.md' -exec sed -i '' 's/[[:space:]]*$//' {} \+

1
ws/.gitignore vendored
View File

@@ -1 +1,2 @@
build build
node_modules

View File

@@ -23,6 +23,8 @@ add_executable(ws
ixcrypto/IXHash.cpp ixcrypto/IXHash.cpp
ixcrypto/IXUuid.cpp ixcrypto/IXUuid.cpp
IXRedisClient.cpp
ws_http_client.cpp ws_http_client.cpp
ws_ping_pong.cpp ws_ping_pong.cpp
ws_broadcast_server.cpp ws_broadcast_server.cpp
@@ -32,6 +34,8 @@ add_executable(ws
ws_transfer.cpp ws_transfer.cpp
ws_send.cpp ws_send.cpp
ws_receive.cpp ws_receive.cpp
ws_redis_publish.cpp
ws_redis_subscribe.cpp
ws.cpp) ws.cpp)
if (APPLE AND USE_TLS) if (APPLE AND USE_TLS)

166
ws/IXRedisClient.cpp Normal file
View File

@@ -0,0 +1,166 @@
/*
* IXRedisClient.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXRedisClient.h"
#include <ixwebsocket/IXSocketFactory.h>
#include <ixwebsocket/IXSocket.h>
#include <sstream>
#include <iomanip>
#include <vector>
#include <cstring>
namespace ix
{
bool RedisClient::connect(const std::string& hostname, int port)
{
bool tls = false;
std::string errorMsg;
_socket = createSocket(tls, errorMsg);
if (!_socket)
{
return false;
}
std::string errMsg;
return _socket->connect(hostname, port, errMsg, nullptr);
}
bool RedisClient::publish(const std::string& channel,
const std::string& message)
{
if (!_socket) return false;
std::stringstream ss;
ss << "PUBLISH ";
ss << channel;
ss << " ";
ss << message;
ss << "\r\n";
bool sent = _socket->writeBytes(ss.str(), nullptr);
if (!sent)
{
return false;
}
auto pollResult = _socket->isReadyToRead(-1);
if (pollResult == PollResultType::Error)
{
return false;
}
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
return lineValid;
}
//
// FIXME: we assume that redis never return errors...
//
bool RedisClient::subscribe(const std::string& channel,
const OnRedisSubscribeCallback& callback)
{
if (!_socket) return false;
std::stringstream ss;
ss << "SUBSCRIBE ";
ss << channel;
ss << "\r\n";
bool sent = _socket->writeBytes(ss.str(), nullptr);
if (!sent)
{
return false;
}
// Wait 1s for the response
auto pollResult = _socket->isReadyToRead(-1);
if (pollResult == PollResultType::Error)
{
return false;
}
// Read the first line of the response
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
if (!lineValid) return false;
// There are 5 items for the subscribe repply
for (int i = 0; i < 5; ++i)
{
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
if (!lineValid) return false;
}
// Wait indefinitely for new messages
while (true)
{
// Wait until something is ready to read
auto pollResult = _socket->isReadyToRead(-1);
if (pollResult == PollResultType::Error)
{
return false;
}
// The first line of the response describe the return type,
// => *3 (an array of 3 elements)
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
if (!lineValid) return false;
int arraySize;
{
std::stringstream ss;
ss << line.substr(1, line.size()-1);
ss >> arraySize;
}
// There are 6 items for each received message
for (int i = 0; i < arraySize; ++i)
{
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
if (!lineValid) return false;
// Messages are string, which start with a string size
// => $7 (7 bytes)
int stringSize;
std::stringstream ss;
ss << line.substr(1, line.size()-1);
ss >> stringSize;
auto readResult = _socket->readBytes(stringSize, nullptr, nullptr);
if (!readResult.first) return false;
if (i == 2)
{
// The message is the 3rd element.
callback(readResult.second);
}
// read last 2 bytes (\r\n)
char c;
_socket->readByte(&c, nullptr);
_socket->readByte(&c, nullptr);
}
}
return true;
}
}

36
ws/IXRedisClient.h Normal file
View File

@@ -0,0 +1,36 @@
/*
* IXRedisClient.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <memory>
#include <functional>
namespace ix
{
class Socket;
class RedisClient {
public:
using OnRedisSubscribeCallback = std::function<void(const std::string&)>;
RedisClient() = default;
~RedisClient() = default;
bool connect(const std::string& hostname,
int port);
bool publish(const std::string& channel,
const std::string& message);
bool subscribe(const std::string& channel,
const OnRedisSubscribeCallback& callback);
private:
std::shared_ptr<Socket> _socket;
};
}

View File

@@ -29,7 +29,7 @@ Subcommands:
ws transfer # running on port 8080. ws transfer # running on port 8080.
# Start receiver first # Start receiver first
ws receive ws://localhost:8080 ws receive ws://localhost:8080
# Then send a file. File will be received and written to disk by the receiver process # Then send a file. File will be received and written to disk by the receiver process
ws send ws://localhost:8080 /file/to/path ws send ws://localhost:8080 /file/to/path

19
ws/package-lock.json generated Normal file
View File

@@ -0,0 +1,19 @@
{
"requires": true,
"lockfileVersion": 1,
"dependencies": {
"async-limiter": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.0.tgz",
"integrity": "sha512-jp/uFnooOiO+L211eZOoSyzpOITMXx1rBITauYykG3BRYPu8h0UcxsPNB04RR5vo4Tyz3+ay17tR6JVf9qzYWg=="
},
"ws": {
"version": "6.2.0",
"resolved": "https://registry.npmjs.org/ws/-/ws-6.2.0.tgz",
"integrity": "sha512-deZYUNlt2O4buFCa3t5bKLf8A7FPP/TVjwOeVNpw818Ma5nk4MLXls2eoEGS39o8119QIYxTrTDoPQ5B/gTD6w==",
"requires": {
"async-limiter": "1.0.0"
}
}
}
}

View File

@@ -1,11 +1,21 @@
#!/bin/sh #!/bin/sh
# Handle Ctrl-C by killing all sub-processing AND exiting
trap cleanup INT
function cleanup {
kill `cat /tmp/ws_test/pidfile.transfer`
kill `cat /tmp/ws_test/pidfile.receive`
kill `cat /tmp/ws_test/pidfile.send`
exit 1
}
rm -rf /tmp/ws_test rm -rf /tmp/ws_test
mkdir -p /tmp/ws_test mkdir -p /tmp/ws_test
# Start a transport server # Start a transport server
cd /tmp/ws_test cd /tmp/ws_test
ws transfer --port 8090 --pidfile /tmp/ws_test/pidfile & ws transfer --port 8090 --pidfile /tmp/ws_test/pidfile.transfer &
# Wait until the transfer server is up # Wait until the transfer server is up
while true while true
@@ -14,21 +24,21 @@ do
echo "Transfer server up and running" echo "Transfer server up and running"
break break
} }
echo "sleep ..." echo "sleep ... wait for transfer server"
sleep 0.1 sleep 0.1
done done
# Start a receiver # Start a receiver
mkdir -p /tmp/ws_test/receive mkdir -p /tmp/ws_test/receive
cd /tmp/ws_test/receive cd /tmp/ws_test/receive
ws receive --delay 5 ws://127.0.0.1:8090 & ws receive --delay 10 ws://127.0.0.1:8090 --pidfile /tmp/ws_test/pidfile.receive &
mkdir /tmp/ws_test/send mkdir /tmp/ws_test/send
cd /tmp/ws_test/send cd /tmp/ws_test/send
dd if=/dev/urandom of=20M_file count=20000 bs=1024 dd if=/dev/urandom of=20M_file count=20000 bs=1024
# Start the sender job # Start the sender job
ws send ws://127.0.0.1:8090 20M_file ws send --pidfile /tmp/ws_test/pidfile.send ws://127.0.0.1:8090 20M_file
# Wait until the file has been written to disk # Wait until the file has been written to disk
while true while true
@@ -37,7 +47,7 @@ do
echo "Received file does exists, exiting loop" echo "Received file does exists, exiting loop"
break break
fi fi
echo "sleep ..." echo "sleep ... wait for output file"
sleep 0.1 sleep 0.1
done done
@@ -48,4 +58,7 @@ cksum /tmp/ws_test/receive/20M_file
sleep 2 sleep 2
# Cleanup # Cleanup
kill `cat /tmp/ws_test/pidfile` kill `cat /tmp/ws_test/pidfile.transfer`
kill `cat /tmp/ws_test/pidfile.receive`
kill `cat /tmp/ws_test/pidfile.send`

View File

@@ -35,12 +35,15 @@ int main(int argc, char** argv)
std::string output; std::string output;
std::string hostname("127.0.0.1"); std::string hostname("127.0.0.1");
std::string pidfile; std::string pidfile;
std::string channel;
std::string message;
bool headersOnly = false; bool headersOnly = false;
bool followRedirects = false; bool followRedirects = false;
bool verbose = false; bool verbose = false;
bool save = false; bool save = false;
bool compress = false; bool compress = false;
int port = 8080; int port = 8080;
int redisPort = 6379;
int connectTimeOut = 60; int connectTimeOut = 60;
int transferTimeout = 1800; int transferTimeout = 1800;
int maxRedirects = 5; int maxRedirects = 5;
@@ -50,11 +53,13 @@ int main(int argc, char** argv)
sendApp->add_option("url", url, "Connection url")->required(); sendApp->add_option("url", url, "Connection url")->required();
sendApp->add_option("path", path, "Path to the file to send") sendApp->add_option("path", path, "Path to the file to send")
->required()->check(CLI::ExistingPath); ->required()->check(CLI::ExistingPath);
sendApp->add_option("--pidfile", pidfile, "Pid file");
CLI::App* receiveApp = app.add_subcommand("receive", "Receive a file"); CLI::App* receiveApp = app.add_subcommand("receive", "Receive a file");
receiveApp->add_option("url", url, "Connection url")->required(); receiveApp->add_option("url", url, "Connection url")->required();
receiveApp->add_option("--delay", delayMs, "Delay (ms) to wait after receiving a fragment" receiveApp->add_option("--delay", delayMs, "Delay (ms) to wait after receiving a fragment"
" to artificially slow down the receiver"); " to artificially slow down the receiver");
receiveApp->add_option("--pidfile", pidfile, "Pid file");
CLI::App* transferApp = app.add_subcommand("transfer", "Broadcasting server"); CLI::App* transferApp = app.add_subcommand("transfer", "Broadcasting server");
transferApp->add_option("--port", port, "Connection url"); transferApp->add_option("--port", port, "Connection url");
@@ -94,21 +99,33 @@ int main(int argc, char** argv)
httpClientApp->add_option("--connect-timeout", connectTimeOut, "Connection timeout"); httpClientApp->add_option("--connect-timeout", connectTimeOut, "Connection timeout");
httpClientApp->add_option("--transfer-timeout", transferTimeout, "Transfer timeout"); httpClientApp->add_option("--transfer-timeout", transferTimeout, "Transfer timeout");
CLI::App* redisPublishApp = app.add_subcommand("redis_publish", "Redis publisher");
redisPublishApp->add_option("--port", redisPort, "Port");
redisPublishApp->add_option("--host", hostname, "Hostname");
redisPublishApp->add_option("channel", channel, "Channel")->required();
redisPublishApp->add_option("message", message, "Message")->required();
CLI::App* redisSubscribeApp = app.add_subcommand("redis_subscribe", "Redis subscriber");
redisSubscribeApp->add_option("--port", redisPort, "Port");
redisSubscribeApp->add_option("--host", hostname, "Hostname");
redisSubscribeApp->add_option("channel", channel, "Channel")->required();
redisSubscribeApp->add_flag("-v", verbose, "Verbose");
CLI11_PARSE(app, argc, argv); CLI11_PARSE(app, argc, argv);
// pid file handling
if (!pidfile.empty())
{
unlink(pidfile.c_str());
std::ofstream f;
f.open(pidfile);
f << getpid();
f.close();
}
if (app.got_subcommand("transfer")) if (app.got_subcommand("transfer"))
{ {
// pid file handling
if (!pidfile.empty())
{
unlink(pidfile.c_str());
std::ofstream f;
f.open(pidfile);
f << getpid();
f.close();
}
return ix::ws_transfer_main(port, hostname); return ix::ws_transfer_main(port, hostname);
} }
else if (app.got_subcommand("send")) else if (app.got_subcommand("send"))
@@ -147,6 +164,14 @@ int main(int argc, char** argv)
followRedirects, maxRedirects, verbose, followRedirects, maxRedirects, verbose,
save, output, compress); save, output, compress);
} }
else if (app.got_subcommand("redis_publish"))
{
return ix::ws_redis_publish_main(hostname, redisPort, channel, message);
}
else if (app.got_subcommand("redis_subscribe"))
{
return ix::ws_redis_subscribe_main(hostname, redisPort, channel, verbose);
}
return 1; return 1;
} }

10
ws/ws.h
View File

@@ -39,4 +39,14 @@ namespace ix
int ws_send_main(const std::string& url, int ws_send_main(const std::string& url,
const std::string& path); const std::string& path);
int ws_redis_publish_main(const std::string& hostname,
int port,
const std::string& channel,
const std::string& message);
int ws_redis_subscribe_main(const std::string& hostname,
int port,
const std::string& channel,
bool verbose);
} }

View File

@@ -17,10 +17,11 @@ namespace ix
ix::WebSocketServer server(port, hostname); ix::WebSocketServer server(port, hostname);
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket) [&server](std::shared_ptr<WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str, const std::string& str,
size_t wireSize, size_t wireSize,
const ix::WebSocketErrorInfo& error, const ix::WebSocketErrorInfo& error,
@@ -30,6 +31,7 @@ namespace ix
if (messageType == ix::WebSocket_MessageType_Open) if (messageType == ix::WebSocket_MessageType_Open)
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl; std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : openInfo.headers)

View File

@@ -17,10 +17,11 @@ namespace ix
ix::WebSocketServer server(port, hostname); ix::WebSocketServer server(port, hostname);
server.setOnConnectionCallback( server.setOnConnectionCallback(
[](std::shared_ptr<ix::WebSocket> webSocket) [](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket](ix::WebSocketMessageType messageType, [webSocket, connectionState](ix::WebSocketMessageType messageType,
const std::string& str, const std::string& str,
size_t wireSize, size_t wireSize,
const ix::WebSocketErrorInfo& error, const ix::WebSocketErrorInfo& error,
@@ -30,6 +31,7 @@ namespace ix
if (messageType == ix::WebSocket_MessageType_Open) if (messageType == ix::WebSocket_MessageType_Open)
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl; std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : openInfo.headers)

View File

@@ -231,6 +231,7 @@ namespace ix
} }
else if (messageType == ix::WebSocket_MessageType_Error) else if (messageType == ix::WebSocket_MessageType_Error)
{ {
ss << "ws_receive ";
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << error.wait_time << std::endl;

35
ws/ws_redis_publish.cpp Normal file
View File

@@ -0,0 +1,35 @@
/*
* ws_redis_publish.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <sstream>
#include "IXRedisClient.h"
namespace ix
{
int ws_redis_publish_main(const std::string& hostname,
int port,
const std::string& channel,
const std::string& message)
{
RedisClient redisClient;
if (!redisClient.connect(hostname, port))
{
std::cerr << "Cannot connect to redis host" << std::endl;
return 1;
}
std::cerr << "Publishing message " << message
<< " to " << channel << "..." << std::endl;
if (!redisClient.publish(channel, message))
{
std::cerr << "Error publishing to channel " << channel << std::endl;
return 1;
}
return 0;
}
}

66
ws/ws_redis_subscribe.cpp Normal file
View File

@@ -0,0 +1,66 @@
/*
* ws_redis_subscribe.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <sstream>
#include <chrono>
#include "IXRedisClient.h"
namespace ix
{
int ws_redis_subscribe_main(const std::string& hostname,
int port,
const std::string& channel,
bool verbose)
{
RedisClient redisClient;
if (!redisClient.connect(hostname, port))
{
std::cerr << "Cannot connect to redis host" << std::endl;
return 1;
}
std::chrono::time_point<std::chrono::steady_clock> lastTimePoint;
int msgPerSeconds = 0;
int msgCount = 0;
auto callback = [&lastTimePoint, &msgPerSeconds, &msgCount, verbose]
(const std::string& message)
{
if (verbose)
{
std::cout << message << std::endl;
}
msgPerSeconds++;
auto now = std::chrono::steady_clock::now();
if (now - lastTimePoint > std::chrono::seconds(1))
{
lastTimePoint = std::chrono::steady_clock::now();
msgCount += msgPerSeconds;
// #messages 901 msg/s 150
std::cout << "#messages " << msgCount << " "
<< "msg/s " << msgPerSeconds
<< std::endl;
msgPerSeconds = 0;
}
};
std::cerr << "Subscribing to " << channel << "..." << std::endl;
if (!redisClient.subscribe(channel, callback))
{
std::cerr << "Error subscribing to channel " << channel << std::endl;
return 1;
}
return 0;
}
}

View File

@@ -162,6 +162,7 @@ namespace ix
} }
else if (messageType == ix::WebSocket_MessageType_Error) else if (messageType == ix::WebSocket_MessageType_Error)
{ {
ss << "ws_send ";
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << error.wait_time << std::endl;

View File

@@ -17,10 +17,11 @@ namespace ix
ix::WebSocketServer server(port, hostname); ix::WebSocketServer server(port, hostname);
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket) [&server](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str, const std::string& str,
size_t wireSize, size_t wireSize,
const ix::WebSocketErrorInfo& error, const ix::WebSocketErrorInfo& error,
@@ -30,6 +31,7 @@ namespace ix
if (messageType == ix::WebSocket_MessageType_Open) if (messageType == ix::WebSocket_MessageType_Open)
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl; std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : openInfo.headers)