Compare commits

...

8 Commits

Author SHA1 Message Date
Benjamin Sergeant
c36dc0e16a fix unittest 2019-03-20 18:50:56 -07:00
Benjamin Sergeant
7e45659377 update README 2019-03-20 18:33:55 -07:00
Benjamin Sergeant
788c92487c New connection state for server code + fix OpenSSL double init bug 2019-03-20 18:26:29 -07:00
Benjamin Sergeant
0999073435 fix typo 2019-03-20 18:25:28 -07:00
Benjamin Sergeant
2cae6f4cf8 (cmake) add a warning about 32/64 conversion problems. 2019-03-20 16:16:54 -07:00
Benjamin Sergeant
e77b9176f3 Feature/redis (#23)
* Fix warning

* (cmake) add a warning about 32/64 conversion problems.

* simple redis clients

* can publish to redis

* redis subscribe

* display messages received per second

* verbose flag

* (cmake) use clang only compile option -Wshorten-64-to-32 when compiling with clang
2019-03-20 14:29:02 -07:00
Michael Lu
afe8b966ad Fixed heartbeat typos (#22) 2019-03-19 21:31:43 -07:00
Benjamin Sergeant
310724c961 make PollResultType an enum class 2019-03-19 09:29:57 -07:00
36 changed files with 569 additions and 74 deletions

View File

@@ -15,6 +15,10 @@ if (NOT WIN32)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic")
endif() endif()
if ("${CMAKE_CXX_COMPILER_ID}" MATCHES "Clang")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wshorten-64-to-32")
endif()
set( IXWEBSOCKET_SOURCES set( IXWEBSOCKET_SOURCES
ixwebsocket/IXSocket.cpp ixwebsocket/IXSocket.cpp
ixwebsocket/IXSocketServer.cpp ixwebsocket/IXSocketServer.cpp
@@ -35,6 +39,7 @@ set( IXWEBSOCKET_SOURCES
ixwebsocket/IXSelectInterrupt.cpp ixwebsocket/IXSelectInterrupt.cpp
ixwebsocket/IXSelectInterruptPipe.cpp ixwebsocket/IXSelectInterruptPipe.cpp
ixwebsocket/IXSelectInterruptFactory.cpp ixwebsocket/IXSelectInterruptFactory.cpp
ixwebsocket/IXConnectionState.cpp
) )
set( IXWEBSOCKET_HEADERS set( IXWEBSOCKET_HEADERS
@@ -62,6 +67,7 @@ set( IXWEBSOCKET_HEADERS
ixwebsocket/IXSelectInterrupt.h ixwebsocket/IXSelectInterrupt.h
ixwebsocket/IXSelectInterruptPipe.h ixwebsocket/IXSelectInterruptPipe.h
ixwebsocket/IXSelectInterruptFactory.h ixwebsocket/IXSelectInterruptFactory.h
ixwebsocket/IXConnectionState.h
) )
# Platform specific code # Platform specific code

View File

@@ -63,10 +63,11 @@ Here is what the server API looks like. Note that server support is very recent
ix::WebSocketServer server(port); ix::WebSocketServer server(port);
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket) [&server](std::shared_ptr<WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str, const std::string& str,
size_t wireSize, size_t wireSize,
const ix::WebSocketErrorInfo& error, const ix::WebSocketErrorInfo& error,
@@ -77,6 +78,12 @@ server.setOnConnectionCallback(
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
// A connection state object is available, and has a default id
// You can subclass ConnectionState and pass an alternate factory
// to override it. It is useful if you want to store custom
// attributes per connection (authenticated bool flag, attributes, etc...)
std::cerr << "id: " << connectionState->getId() << std::endl;
// The uri the client did connect to. // The uri the client did connect to.
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << openInfo.uri << std::endl;

View File

@@ -0,0 +1,37 @@
/*
* IXConnectionState.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXConnectionState.h"
#include <sstream>
namespace ix
{
std::atomic<uint64_t> ConnectionState::_globalId(0);
ConnectionState::ConnectionState()
{
computeId();
}
void ConnectionState::computeId()
{
std::stringstream ss;
ss << _globalId++;
_id = ss.str();
}
const std::string& ConnectionState::getId() const
{
return _id;
}
std::shared_ptr<ConnectionState> ConnectionState::createConnectionState()
{
return std::make_shared<ConnectionState>();
}
}

View File

@@ -0,0 +1,33 @@
/*
* IXConnectionState.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <stdint.h>
#include <string>
#include <atomic>
#include <memory>
namespace ix
{
class ConnectionState {
public:
ConnectionState();
virtual ~ConnectionState() = default;
virtual void computeId();
virtual const std::string& getId() const;
static std::shared_ptr<ConnectionState> createConnectionState();
protected:
std::string _id;
static std::atomic<uint64_t> _globalId;
};
}

View File

@@ -73,7 +73,7 @@ namespace ix
errMsg = "no error"; errMsg = "no error";
// Maybe a cancellation request got in before the background thread terminated ? // Maybe a cancellation request got in before the background thread terminated ?
if (isCancellationRequested()) if (isCancellationRequested && isCancellationRequested())
{ {
errMsg = "cancellation requested"; errMsg = "cancellation requested";
return nullptr; return nullptr;
@@ -121,7 +121,7 @@ namespace ix
} }
// Were we cancelled ? // Were we cancelled ?
if (isCancellationRequested()) if (isCancellationRequested && isCancellationRequested())
{ {
errMsg = "cancellation requested"; errMsg = "cancellation requested";
return nullptr; return nullptr;
@@ -129,7 +129,7 @@ namespace ix
} }
// Maybe a cancellation request got in before the bg terminated ? // Maybe a cancellation request got in before the bg terminated ?
if (isCancellationRequested()) if (isCancellationRequested && isCancellationRequested())
{ {
errMsg = "cancellation requested"; errMsg = "cancellation requested";
return nullptr; return nullptr;

View File

@@ -45,7 +45,7 @@ namespace ix
{ {
if (_sockfd == -1) if (_sockfd == -1)
{ {
if (onPollCallback) onPollCallback(PollResultType_Error); if (onPollCallback) onPollCallback(PollResultType::Error);
return; return;
} }
@@ -82,14 +82,14 @@ namespace ix
int ret = ::select(nfds + 1, &rfds, &wfds, nullptr, int ret = ::select(nfds + 1, &rfds, &wfds, nullptr,
(timeoutMs < 0) ? nullptr : &timeout); (timeoutMs < 0) ? nullptr : &timeout);
PollResultType pollResult = PollResultType_ReadyForRead; PollResultType pollResult = PollResultType::ReadyForRead;
if (ret < 0) if (ret < 0)
{ {
pollResult = PollResultType_Error; pollResult = PollResultType::Error;
} }
else if (ret == 0) else if (ret == 0)
{ {
pollResult = PollResultType_Timeout; pollResult = PollResultType::Timeout;
} }
else if (interruptFd != -1 && FD_ISSET(interruptFd, &rfds)) else if (interruptFd != -1 && FD_ISSET(interruptFd, &rfds))
{ {
@@ -97,20 +97,20 @@ namespace ix
if (value == kSendRequest) if (value == kSendRequest)
{ {
pollResult = PollResultType_SendRequest; pollResult = PollResultType::SendRequest;
} }
else if (value == kCloseRequest) else if (value == kCloseRequest)
{ {
pollResult = PollResultType_CloseRequest; pollResult = PollResultType::CloseRequest;
} }
} }
else if (sockfd != -1 && readyToRead && FD_ISSET(sockfd, &rfds)) else if (sockfd != -1 && readyToRead && FD_ISSET(sockfd, &rfds))
{ {
pollResult = PollResultType_ReadyForRead; pollResult = PollResultType::ReadyForRead;
} }
else if (sockfd != -1 && !readyToRead && FD_ISSET(sockfd, &wfds)) else if (sockfd != -1 && !readyToRead && FD_ISSET(sockfd, &wfds))
{ {
pollResult = PollResultType_ReadyForWrite; pollResult = PollResultType::ReadyForWrite;
} }
return pollResult; return pollResult;
@@ -210,7 +210,7 @@ namespace ix
{ {
while (true) while (true)
{ {
if (isCancellationRequested()) return false; if (isCancellationRequested && isCancellationRequested()) return false;
char* buffer = const_cast<char*>(str.c_str()); char* buffer = const_cast<char*>(str.c_str());
int len = (int) str.size(); int len = (int) str.size();
@@ -222,7 +222,7 @@ namespace ix
{ {
return ret == len; return ret == len;
} }
// There is possibly something to be write, try again // There is possibly something to be writen, try again
else if (ret < 0 && (getErrno() == EWOULDBLOCK || else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
getErrno() == EAGAIN)) getErrno() == EAGAIN))
{ {
@@ -241,7 +241,7 @@ namespace ix
{ {
while (true) while (true)
{ {
if (isCancellationRequested()) return false; if (isCancellationRequested && isCancellationRequested()) return false;
ssize_t ret; ssize_t ret;
ret = recv(buffer, 1); ret = recv(buffer, 1);
@@ -257,7 +257,7 @@ namespace ix
{ {
// Wait with a 1ms timeout until the socket is ready to read. // Wait with a 1ms timeout until the socket is ready to read.
// This way we are not busy looping // This way we are not busy looping
if (isReadyToRead(1) == PollResultType_Error) if (isReadyToRead(1) == PollResultType::Error)
{ {
return false; return false;
} }
@@ -304,9 +304,12 @@ namespace ix
std::vector<uint8_t> output; std::vector<uint8_t> output;
while (output.size() != length) while (output.size() != length)
{ {
if (isCancellationRequested()) return std::make_pair(false, std::string()); if (isCancellationRequested && isCancellationRequested())
{
return std::make_pair(false, std::string());
}
int size = std::min(kChunkSize, length - output.size()); size_t size = std::min(kChunkSize, length - output.size());
ssize_t ret = recv((char*)&_readBuffer[0], size); ssize_t ret = recv((char*)&_readBuffer[0], size);
if (ret <= 0 && (getErrno() != EWOULDBLOCK && if (ret <= 0 && (getErrno() != EWOULDBLOCK &&
@@ -326,7 +329,7 @@ namespace ix
// Wait with a 1ms timeout until the socket is ready to read. // Wait with a 1ms timeout until the socket is ready to read.
// This way we are not busy looping // This way we are not busy looping
if (isReadyToRead(1) == PollResultType_Error) if (isReadyToRead(1) == PollResultType::Error)
{ {
return std::make_pair(false, std::string()); return std::make_pair(false, std::string());
} }

View File

@@ -25,14 +25,14 @@ namespace ix
{ {
class SelectInterrupt; class SelectInterrupt;
enum PollResultType enum class PollResultType
{ {
PollResultType_ReadyForRead = 0, ReadyForRead = 0,
PollResultType_ReadyForWrite = 1, ReadyForWrite = 1,
PollResultType_Timeout = 2, Timeout = 2,
PollResultType_Error = 3, Error = 3,
PollResultType_SendRequest = 4, SendRequest = 4,
PollResultType_CloseRequest = 5 CloseRequest = 5
}; };
class Socket { class Socket {

View File

@@ -66,7 +66,7 @@ namespace ix
for (;;) for (;;)
{ {
if (isCancellationRequested()) // Must handle timeout as well if (isCancellationRequested && isCancellationRequested()) // Must handle timeout as well
{ {
closeSocket(fd); closeSocket(fd);
errMsg = "Cancelled"; errMsg = "Cancelled";

View File

@@ -21,6 +21,7 @@
namespace ix namespace ix
{ {
std::atomic<bool> SocketOpenSSL::_openSSLInitializationSuccessful(false); std::atomic<bool> SocketOpenSSL::_openSSLInitializationSuccessful(false);
std::once_flag SocketOpenSSL::_openSSLInitFlag;
SocketOpenSSL::SocketOpenSSL(int fd) : Socket(fd), SocketOpenSSL::SocketOpenSSL(int fd) : Socket(fd),
_ssl_connection(nullptr), _ssl_connection(nullptr),

View File

@@ -50,7 +50,7 @@ namespace ix
const SSL_METHOD* _ssl_method; const SSL_METHOD* _ssl_method;
mutable std::mutex _mutex; // OpenSSL routines are not thread-safe mutable std::mutex _mutex; // OpenSSL routines are not thread-safe
std::once_flag _openSSLInitFlag; static std::once_flag _openSSLInitFlag;
static std::atomic<bool> _openSSLInitializationSuccessful; static std::atomic<bool> _openSSLInitializationSuccessful;
}; };

View File

@@ -29,7 +29,8 @@ namespace ix
_host(host), _host(host),
_backlog(backlog), _backlog(backlog),
_maxConnections(maxConnections), _maxConnections(maxConnections),
_stop(false) _stop(false),
_connectionStateFactory(&ConnectionState::createConnectionState)
{ {
} }
@@ -145,6 +146,12 @@ namespace ix
::close(_serverFd); ::close(_serverFd);
} }
void SocketServer::setConnectionStateFactory(
const ConnectionStateFactory& connectionStateFactory)
{
_connectionStateFactory = connectionStateFactory;
}
void SocketServer::run() void SocketServer::run()
{ {
// Set the socket to non blocking mode, so that accept calls are not blocking // Set the socket to non blocking mode, so that accept calls are not blocking
@@ -214,6 +221,12 @@ namespace ix
continue; continue;
} }
std::shared_ptr<ConnectionState> connectionState;
if (_connectionStateFactory)
{
connectionState = _connectionStateFactory();
}
// Launch the handleConnection work asynchronously in its own thread. // Launch the handleConnection work asynchronously in its own thread.
// //
// the destructor of a future returned by std::async blocks, // the destructor of a future returned by std::async blocks,
@@ -221,7 +234,8 @@ namespace ix
f = std::async(std::launch::async, f = std::async(std::launch::async,
&SocketServer::handleConnection, &SocketServer::handleConnection,
this, this,
clientFd); clientFd,
connectionState);
} }
} }
} }

View File

@@ -6,6 +6,8 @@
#pragma once #pragma once
#include "IXConnectionState.h"
#include <utility> // pair #include <utility> // pair
#include <string> #include <string>
#include <set> #include <set>
@@ -20,6 +22,8 @@ namespace ix
{ {
class SocketServer { class SocketServer {
public: public:
using ConnectionStateFactory = std::function<std::shared_ptr<ConnectionState>()>;
SocketServer(int port = SocketServer::kDefaultPort, SocketServer(int port = SocketServer::kDefaultPort,
const std::string& host = SocketServer::kDefaultHost, const std::string& host = SocketServer::kDefaultHost,
int backlog = SocketServer::kDefaultTcpBacklog, int backlog = SocketServer::kDefaultTcpBacklog,
@@ -27,6 +31,8 @@ namespace ix
virtual ~SocketServer(); virtual ~SocketServer();
virtual void stop(); virtual void stop();
void setConnectionStateFactory(const ConnectionStateFactory& connectionStateFactory);
const static int kDefaultPort; const static int kDefaultPort;
const static std::string kDefaultHost; const static std::string kDefaultHost;
const static int kDefaultTcpBacklog; const static int kDefaultTcpBacklog;
@@ -60,9 +66,13 @@ namespace ix
std::condition_variable _conditionVariable; std::condition_variable _conditionVariable;
std::mutex _conditionVariableMutex; std::mutex _conditionVariableMutex;
//
ConnectionStateFactory _connectionStateFactory;
// Methods // Methods
void run(); void run();
virtual void handleConnection(int fd) = 0; virtual void handleConnection(int fd,
std::shared_ptr<ConnectionState> connectionState) = 0;
virtual size_t getConnectedClientsCount() = 0; virtual size_t getConnectedClientsCount() = 0;
}; };
} }

View File

@@ -79,10 +79,10 @@ namespace ix
return _perMessageDeflateOptions; return _perMessageDeflateOptions;
} }
void WebSocket::setHeartBeatPeriod(int hearBeatPeriod) void WebSocket::setHeartBeatPeriod(int heartBeatPeriod)
{ {
std::lock_guard<std::mutex> lock(_configMutex); std::lock_guard<std::mutex> lock(_configMutex);
_heartBeatPeriod = hearBeatPeriod; _heartBeatPeriod = heartBeatPeriod;
} }
int WebSocket::getHeartBeatPeriod() const int WebSocket::getHeartBeatPeriod() const

View File

@@ -89,7 +89,7 @@ namespace ix
void setUrl(const std::string& url); void setUrl(const std::string& url);
void setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions); void setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions);
void setHandshakeTimeout(int handshakeTimeoutSecs); void setHandshakeTimeout(int handshakeTimeoutSecs);
void setHeartBeatPeriod(int hearBeatPeriod); void setHeartBeatPeriod(int heartBeatPeriod);
// Run asynchronously, by calling start and stop. // Run asynchronously, by calling start and stop.
void start(); void start();

View File

@@ -49,10 +49,12 @@ namespace ix
_onConnectionCallback = callback; _onConnectionCallback = callback;
} }
void WebSocketServer::handleConnection(int fd) void WebSocketServer::handleConnection(
int fd,
std::shared_ptr<ConnectionState> connectionState)
{ {
auto webSocket = std::make_shared<WebSocket>(); auto webSocket = std::make_shared<WebSocket>();
_onConnectionCallback(webSocket); _onConnectionCallback(webSocket, connectionState);
webSocket->disableAutomaticReconnection(); webSocket->disableAutomaticReconnection();

View File

@@ -20,7 +20,8 @@
namespace ix namespace ix
{ {
using OnConnectionCallback = std::function<void(std::shared_ptr<WebSocket>)>; using OnConnectionCallback = std::function<void(std::shared_ptr<WebSocket>,
std::shared_ptr<ConnectionState>)>;
class WebSocketServer : public SocketServer { class WebSocketServer : public SocketServer {
public: public:
@@ -49,7 +50,8 @@ namespace ix
const static int kDefaultHandShakeTimeoutSecs; const static int kDefaultHandShakeTimeoutSecs;
// Methods // Methods
virtual void handleConnection(int fd) final; virtual void handleConnection(int fd,
std::shared_ptr<ConnectionState> connectionState) final;
virtual size_t getConnectedClientsCount() final; virtual size_t getConnectedClientsCount() final;
}; };
} }

View File

@@ -53,7 +53,7 @@
namespace ix namespace ix
{ {
const std::string WebSocketTransport::kHeartBeatPingMessage("ixwebsocket::hearbeat"); const std::string WebSocketTransport::kHeartBeatPingMessage("ixwebsocket::heartbeat");
const int WebSocketTransport::kDefaultHeartBeatPeriod(-1); const int WebSocketTransport::kDefaultHeartBeatPeriod(-1);
constexpr size_t WebSocketTransport::kChunkSize; constexpr size_t WebSocketTransport::kChunkSize;
@@ -75,11 +75,11 @@ namespace ix
} }
void WebSocketTransport::configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions, void WebSocketTransport::configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions,
int hearBeatPeriod) int heartBeatPeriod)
{ {
_perMessageDeflateOptions = perMessageDeflateOptions; _perMessageDeflateOptions = perMessageDeflateOptions;
_enablePerMessageDeflate = _perMessageDeflateOptions.enabled(); _enablePerMessageDeflate = _perMessageDeflateOptions.enabled();
_heartBeatPeriod = hearBeatPeriod; _heartBeatPeriod = heartBeatPeriod;
} }
// Client // Client
@@ -189,7 +189,7 @@ namespace ix
// If (1) heartbeat is enabled, and (2) no data was received or // If (1) heartbeat is enabled, and (2) no data was received or
// send for a duration exceeding our heart-beat period, send a // send for a duration exceeding our heart-beat period, send a
// ping to the server. // ping to the server.
if (pollResult == PollResultType_Timeout && if (pollResult == PollResultType::Timeout &&
heartBeatPeriodExceeded()) heartBeatPeriodExceeded())
{ {
std::stringstream ss; std::stringstream ss;
@@ -198,7 +198,7 @@ namespace ix
} }
// Make sure we send all the buffered data // Make sure we send all the buffered data
// there can be a lot of it for large messages. // there can be a lot of it for large messages.
else if (pollResult == PollResultType_SendRequest) else if (pollResult == PollResultType::SendRequest)
{ {
while (!isSendBufferEmpty() && !_requestInitCancellation) while (!isSendBufferEmpty() && !_requestInitCancellation)
{ {
@@ -206,19 +206,19 @@ namespace ix
// This way we are not busy looping // This way we are not busy looping
PollResultType result = _socket->isReadyToWrite(10); PollResultType result = _socket->isReadyToWrite(10);
if (result == PollResultType_Error) if (result == PollResultType::Error)
{ {
_socket->close(); _socket->close();
setReadyState(CLOSED); setReadyState(CLOSED);
break; break;
} }
else if (result == PollResultType_ReadyForWrite) else if (result == PollResultType::ReadyForWrite)
{ {
sendOnSocket(); sendOnSocket();
} }
} }
} }
else if (pollResult == PollResultType_ReadyForRead) else if (pollResult == PollResultType::ReadyForRead)
{ {
while (true) while (true)
{ {
@@ -244,11 +244,11 @@ namespace ix
} }
} }
} }
else if (pollResult == PollResultType_Error) else if (pollResult == PollResultType::Error)
{ {
_socket->close(); _socket->close();
} }
else if (pollResult == PollResultType_CloseRequest) else if (pollResult == PollResultType::CloseRequest)
{ {
_socket->close(); _socket->close();
} }

View File

@@ -61,7 +61,7 @@ namespace ix
~WebSocketTransport(); ~WebSocketTransport();
void configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions, void configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions,
int hearBeatPeriod); int heartBeatPeriod);
WebSocketInitResult connectToUrl(const std::string& url, // Client WebSocketInitResult connectToUrl(const std::string& url, // Client
int timeoutSecs); int timeoutSecs);
@@ -148,7 +148,7 @@ namespace ix
mutable std::mutex _lastSendTimePointMutex; mutable std::mutex _lastSendTimePointMutex;
std::chrono::time_point<std::chrono::steady_clock> _lastSendTimePoint; std::chrono::time_point<std::chrono::steady_clock> _lastSendTimePoint;
// No data was send through the socket for longer that the hearbeat period // No data was send through the socket for longer than the heartbeat period
bool heartBeatPeriodExceeded(); bool heartBeatPeriodExceeded();
void sendOnSocket(); void sendOnSocket();

View File

@@ -39,7 +39,7 @@ test:
python test/run.py python test/run.py
ws_test: all ws_test: all
(cd ws ; sh test_ws.sh) (cd ws ; bash test_ws.sh)
# For the fork that is configured with appveyor # For the fork that is configured with appveyor
rebase_upstream: rebase_upstream:

View File

@@ -65,7 +65,7 @@ namespace
_webSocket.setUrl(url); _webSocket.setUrl(url);
// The important bit for this test. // The important bit for this test.
// Set a 1 second hearbeat ; if no traffic is present on the connection for 1 second // Set a 1 second heartbeat ; if no traffic is present on the connection for 1 second
// a ping message will be sent by the client. // a ping message will be sent by the client.
_webSocket.setHeartBeatPeriod(1); _webSocket.setHeartBeatPeriod(1);
@@ -128,10 +128,11 @@ namespace
{ {
// A dev/null server // A dev/null server
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server, &receivedPingMessages](std::shared_ptr<ix::WebSocket> webSocket) [&server, &receivedPingMessages](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, &server, &receivedPingMessages](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server, &receivedPingMessages](ix::WebSocketMessageType messageType,
const std::string& str, const std::string& str,
size_t wireSize, size_t wireSize,
const ix::WebSocketErrorInfo& error, const ix::WebSocketErrorInfo& error,
@@ -141,6 +142,7 @@ namespace
if (messageType == ix::WebSocket_MessageType_Open) if (messageType == ix::WebSocket_MessageType_Open)
{ {
Logger() << "New server connection"; Logger() << "New server connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri; Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:"; Logger() << "Headers:";
for (auto it : openInfo.headers) for (auto it : openInfo.headers)

View File

@@ -18,13 +18,32 @@ using namespace ix;
namespace ix namespace ix
{ {
bool startServer(ix::WebSocketServer& server) // Test that we can override the connectionState impl to provide our own
class ConnectionStateCustom : public ConnectionState
{ {
void computeId()
{
// a very boring invariant id that we can test against in the unittest
_id = "foobarConnectionId";
}
};
bool startServer(ix::WebSocketServer& server,
std::string& connectionId)
{
auto factory = []() -> std::shared_ptr<ConnectionState>
{
return std::make_shared<ConnectionStateCustom>();
};
server.setConnectionStateFactory(factory);
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket) [&server, &connectionId](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType, [webSocket, connectionState,
&connectionId, &server](ix::WebSocketMessageType messageType,
const std::string& str, const std::string& str,
size_t wireSize, size_t wireSize,
const ix::WebSocketErrorInfo& error, const ix::WebSocketErrorInfo& error,
@@ -33,13 +52,18 @@ namespace ix
{ {
if (messageType == ix::WebSocket_MessageType_Open) if (messageType == ix::WebSocket_MessageType_Open)
{ {
connectionState->computeId();
Logger() << "New connection"; Logger() << "New connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri; Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:"; Logger() << "Headers:";
for (auto it : openInfo.headers) for (auto it : openInfo.headers)
{ {
Logger() << it.first << ": " << it.second; Logger() << it.first << ": " << it.second;
} }
connectionId = connectionState->getId();
} }
else if (messageType == ix::WebSocket_MessageType_Close) else if (messageType == ix::WebSocket_MessageType_Close)
{ {
@@ -78,7 +102,8 @@ TEST_CASE("Websocket_server", "[websocket_server]")
{ {
int port = getFreePort(); int port = getFreePort();
ix::WebSocketServer server(port); ix::WebSocketServer server(port);
REQUIRE(startServer(server)); std::string connectionId;
REQUIRE(startServer(server, connectionId));
std::string errMsg; std::string errMsg;
bool tls = false; bool tls = false;
@@ -111,7 +136,8 @@ TEST_CASE("Websocket_server", "[websocket_server]")
{ {
int port = getFreePort(); int port = getFreePort();
ix::WebSocketServer server(port); ix::WebSocketServer server(port);
REQUIRE(startServer(server)); std::string connectionId;
REQUIRE(startServer(server, connectionId));
std::string errMsg; std::string errMsg;
bool tls = false; bool tls = false;
@@ -147,7 +173,8 @@ TEST_CASE("Websocket_server", "[websocket_server]")
{ {
int port = getFreePort(); int port = getFreePort();
ix::WebSocketServer server(port); ix::WebSocketServer server(port);
REQUIRE(startServer(server)); std::string connectionId;
REQUIRE(startServer(server, connectionId));
std::string errMsg; std::string errMsg;
bool tls = false; bool tls = false;
@@ -178,6 +205,8 @@ TEST_CASE("Websocket_server", "[websocket_server]")
// Give us 500ms for the server to notice that clients went away // Give us 500ms for the server to notice that clients went away
ix::msleep(500); ix::msleep(500);
REQUIRE(connectionId == "foobarConnectionId");
server.stop(); server.stop();
REQUIRE(server.getClients().size() == 0); REQUIRE(server.getClients().size() == 0);
} }

View File

@@ -217,10 +217,11 @@ namespace
bool startServer(ix::WebSocketServer& server) bool startServer(ix::WebSocketServer& server)
{ {
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket) [&server](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str, const std::string& str,
size_t wireSize, size_t wireSize,
const ix::WebSocketErrorInfo& error, const ix::WebSocketErrorInfo& error,
@@ -230,6 +231,7 @@ namespace
if (messageType == ix::WebSocket_MessageType_Open) if (messageType == ix::WebSocket_MessageType_Open)
{ {
Logger() << "New connection"; Logger() << "New connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri; Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:"; Logger() << "Headers:";
for (auto it : openInfo.headers) for (auto it : openInfo.headers)

View File

@@ -27,7 +27,7 @@ class Command(object):
thread.join(timeout) thread.join(timeout)
if thread.is_alive(): if thread.is_alive():
print 'Command timeout, kill it: ' + self.cmd print('Command timeout, kill it: ' + self.cmd)
self.process.terminate() self.process.terminate()
thread.join() thread.join()
return False, 255 return False, 255

View File

@@ -1,2 +1,3 @@
find . -type f -name '*.cpp' -exec sed -i '' 's/[[:space:]]*$//' {} \+ find . -type f -name '*.cpp' -exec sed -i '' 's/[[:space:]]*$//' {} \+
find . -type f -name '*.h' -exec sed -i '' 's/[[:space:]]*$//' {} \+ find . -type f -name '*.h' -exec sed -i '' 's/[[:space:]]*$//' {} \+
find . -type f -name '*.md' -exec sed -i '' 's/[[:space:]]*$//' {} \+

View File

@@ -23,6 +23,8 @@ add_executable(ws
ixcrypto/IXHash.cpp ixcrypto/IXHash.cpp
ixcrypto/IXUuid.cpp ixcrypto/IXUuid.cpp
IXRedisClient.cpp
ws_http_client.cpp ws_http_client.cpp
ws_ping_pong.cpp ws_ping_pong.cpp
ws_broadcast_server.cpp ws_broadcast_server.cpp
@@ -32,6 +34,8 @@ add_executable(ws
ws_transfer.cpp ws_transfer.cpp
ws_send.cpp ws_send.cpp
ws_receive.cpp ws_receive.cpp
ws_redis_publish.cpp
ws_redis_subscribe.cpp
ws.cpp) ws.cpp)
if (APPLE AND USE_TLS) if (APPLE AND USE_TLS)

166
ws/IXRedisClient.cpp Normal file
View File

@@ -0,0 +1,166 @@
/*
* IXRedisClient.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXRedisClient.h"
#include <ixwebsocket/IXSocketFactory.h>
#include <ixwebsocket/IXSocket.h>
#include <sstream>
#include <iomanip>
#include <vector>
#include <cstring>
namespace ix
{
bool RedisClient::connect(const std::string& hostname, int port)
{
bool tls = false;
std::string errorMsg;
_socket = createSocket(tls, errorMsg);
if (!_socket)
{
return false;
}
std::string errMsg;
return _socket->connect(hostname, port, errMsg, nullptr);
}
bool RedisClient::publish(const std::string& channel,
const std::string& message)
{
if (!_socket) return false;
std::stringstream ss;
ss << "PUBLISH ";
ss << channel;
ss << " ";
ss << message;
ss << "\r\n";
bool sent = _socket->writeBytes(ss.str(), nullptr);
if (!sent)
{
return false;
}
auto pollResult = _socket->isReadyToRead(-1);
if (pollResult == PollResultType::Error)
{
return false;
}
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
return lineValid;
}
//
// FIXME: we assume that redis never return errors...
//
bool RedisClient::subscribe(const std::string& channel,
const OnRedisSubscribeCallback& callback)
{
if (!_socket) return false;
std::stringstream ss;
ss << "SUBSCRIBE ";
ss << channel;
ss << "\r\n";
bool sent = _socket->writeBytes(ss.str(), nullptr);
if (!sent)
{
return false;
}
// Wait 1s for the response
auto pollResult = _socket->isReadyToRead(-1);
if (pollResult == PollResultType::Error)
{
return false;
}
// Read the first line of the response
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
if (!lineValid) return false;
// There are 5 items for the subscribe repply
for (int i = 0; i < 5; ++i)
{
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
if (!lineValid) return false;
}
// Wait indefinitely for new messages
while (true)
{
// Wait until something is ready to read
auto pollResult = _socket->isReadyToRead(-1);
if (pollResult == PollResultType::Error)
{
return false;
}
// The first line of the response describe the return type,
// => *3 (an array of 3 elements)
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
if (!lineValid) return false;
int arraySize;
{
std::stringstream ss;
ss << line.substr(1, line.size()-1);
ss >> arraySize;
}
// There are 6 items for each received message
for (int i = 0; i < arraySize; ++i)
{
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
if (!lineValid) return false;
// Messages are string, which start with a string size
// => $7 (7 bytes)
int stringSize;
std::stringstream ss;
ss << line.substr(1, line.size()-1);
ss >> stringSize;
auto readResult = _socket->readBytes(stringSize, nullptr, nullptr);
if (!readResult.first) return false;
if (i == 2)
{
// The message is the 3rd element.
callback(readResult.second);
}
// read last 2 bytes (\r\n)
char c;
_socket->readByte(&c, nullptr);
_socket->readByte(&c, nullptr);
}
}
return true;
}
}

36
ws/IXRedisClient.h Normal file
View File

@@ -0,0 +1,36 @@
/*
* IXRedisClient.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <memory>
#include <functional>
namespace ix
{
class Socket;
class RedisClient {
public:
using OnRedisSubscribeCallback = std::function<void(const std::string&)>;
RedisClient() = default;
~RedisClient() = default;
bool connect(const std::string& hostname,
int port);
bool publish(const std::string& channel,
const std::string& message);
bool subscribe(const std::string& channel,
const OnRedisSubscribeCallback& callback);
private:
std::shared_ptr<Socket> _socket;
};
}

View File

@@ -35,12 +35,15 @@ int main(int argc, char** argv)
std::string output; std::string output;
std::string hostname("127.0.0.1"); std::string hostname("127.0.0.1");
std::string pidfile; std::string pidfile;
std::string channel;
std::string message;
bool headersOnly = false; bool headersOnly = false;
bool followRedirects = false; bool followRedirects = false;
bool verbose = false; bool verbose = false;
bool save = false; bool save = false;
bool compress = false; bool compress = false;
int port = 8080; int port = 8080;
int redisPort = 6379;
int connectTimeOut = 60; int connectTimeOut = 60;
int transferTimeout = 1800; int transferTimeout = 1800;
int maxRedirects = 5; int maxRedirects = 5;
@@ -96,6 +99,18 @@ int main(int argc, char** argv)
httpClientApp->add_option("--connect-timeout", connectTimeOut, "Connection timeout"); httpClientApp->add_option("--connect-timeout", connectTimeOut, "Connection timeout");
httpClientApp->add_option("--transfer-timeout", transferTimeout, "Transfer timeout"); httpClientApp->add_option("--transfer-timeout", transferTimeout, "Transfer timeout");
CLI::App* redisPublishApp = app.add_subcommand("redis_publish", "Redis publisher");
redisPublishApp->add_option("--port", redisPort, "Port");
redisPublishApp->add_option("--host", hostname, "Hostname");
redisPublishApp->add_option("channel", channel, "Channel")->required();
redisPublishApp->add_option("message", message, "Message")->required();
CLI::App* redisSubscribeApp = app.add_subcommand("redis_subscribe", "Redis subscriber");
redisSubscribeApp->add_option("--port", redisPort, "Port");
redisSubscribeApp->add_option("--host", hostname, "Hostname");
redisSubscribeApp->add_option("channel", channel, "Channel")->required();
redisSubscribeApp->add_flag("-v", verbose, "Verbose");
CLI11_PARSE(app, argc, argv); CLI11_PARSE(app, argc, argv);
// pid file handling // pid file handling
@@ -149,6 +164,14 @@ int main(int argc, char** argv)
followRedirects, maxRedirects, verbose, followRedirects, maxRedirects, verbose,
save, output, compress); save, output, compress);
} }
else if (app.got_subcommand("redis_publish"))
{
return ix::ws_redis_publish_main(hostname, redisPort, channel, message);
}
else if (app.got_subcommand("redis_subscribe"))
{
return ix::ws_redis_subscribe_main(hostname, redisPort, channel, verbose);
}
return 1; return 1;
} }

10
ws/ws.h
View File

@@ -39,4 +39,14 @@ namespace ix
int ws_send_main(const std::string& url, int ws_send_main(const std::string& url,
const std::string& path); const std::string& path);
int ws_redis_publish_main(const std::string& hostname,
int port,
const std::string& channel,
const std::string& message);
int ws_redis_subscribe_main(const std::string& hostname,
int port,
const std::string& channel,
bool verbose);
} }

View File

@@ -17,10 +17,11 @@ namespace ix
ix::WebSocketServer server(port, hostname); ix::WebSocketServer server(port, hostname);
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket) [&server](std::shared_ptr<WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str, const std::string& str,
size_t wireSize, size_t wireSize,
const ix::WebSocketErrorInfo& error, const ix::WebSocketErrorInfo& error,
@@ -30,6 +31,7 @@ namespace ix
if (messageType == ix::WebSocket_MessageType_Open) if (messageType == ix::WebSocket_MessageType_Open)
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl; std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : openInfo.headers)

View File

@@ -17,10 +17,11 @@ namespace ix
ix::WebSocketServer server(port, hostname); ix::WebSocketServer server(port, hostname);
server.setOnConnectionCallback( server.setOnConnectionCallback(
[](std::shared_ptr<ix::WebSocket> webSocket) [](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket](ix::WebSocketMessageType messageType, [webSocket, connectionState](ix::WebSocketMessageType messageType,
const std::string& str, const std::string& str,
size_t wireSize, size_t wireSize,
const ix::WebSocketErrorInfo& error, const ix::WebSocketErrorInfo& error,
@@ -30,6 +31,7 @@ namespace ix
if (messageType == ix::WebSocket_MessageType_Open) if (messageType == ix::WebSocket_MessageType_Open)
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl; std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : openInfo.headers)

35
ws/ws_redis_publish.cpp Normal file
View File

@@ -0,0 +1,35 @@
/*
* ws_redis_publish.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <sstream>
#include "IXRedisClient.h"
namespace ix
{
int ws_redis_publish_main(const std::string& hostname,
int port,
const std::string& channel,
const std::string& message)
{
RedisClient redisClient;
if (!redisClient.connect(hostname, port))
{
std::cerr << "Cannot connect to redis host" << std::endl;
return 1;
}
std::cerr << "Publishing message " << message
<< " to " << channel << "..." << std::endl;
if (!redisClient.publish(channel, message))
{
std::cerr << "Error publishing to channel " << channel << std::endl;
return 1;
}
return 0;
}
}

66
ws/ws_redis_subscribe.cpp Normal file
View File

@@ -0,0 +1,66 @@
/*
* ws_redis_subscribe.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <sstream>
#include <chrono>
#include "IXRedisClient.h"
namespace ix
{
int ws_redis_subscribe_main(const std::string& hostname,
int port,
const std::string& channel,
bool verbose)
{
RedisClient redisClient;
if (!redisClient.connect(hostname, port))
{
std::cerr << "Cannot connect to redis host" << std::endl;
return 1;
}
std::chrono::time_point<std::chrono::steady_clock> lastTimePoint;
int msgPerSeconds = 0;
int msgCount = 0;
auto callback = [&lastTimePoint, &msgPerSeconds, &msgCount, verbose]
(const std::string& message)
{
if (verbose)
{
std::cout << message << std::endl;
}
msgPerSeconds++;
auto now = std::chrono::steady_clock::now();
if (now - lastTimePoint > std::chrono::seconds(1))
{
lastTimePoint = std::chrono::steady_clock::now();
msgCount += msgPerSeconds;
// #messages 901 msg/s 150
std::cout << "#messages " << msgCount << " "
<< "msg/s " << msgPerSeconds
<< std::endl;
msgPerSeconds = 0;
}
};
std::cerr << "Subscribing to " << channel << "..." << std::endl;
if (!redisClient.subscribe(channel, callback))
{
std::cerr << "Error subscribing to channel " << channel << std::endl;
return 1;
}
return 0;
}
}

View File

@@ -17,10 +17,11 @@ namespace ix
ix::WebSocketServer server(port, hostname); ix::WebSocketServer server(port, hostname);
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket) [&server](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str, const std::string& str,
size_t wireSize, size_t wireSize,
const ix::WebSocketErrorInfo& error, const ix::WebSocketErrorInfo& error,
@@ -30,6 +31,7 @@ namespace ix
if (messageType == ix::WebSocket_MessageType_Open) if (messageType == ix::WebSocket_MessageType_Open)
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl; std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : openInfo.headers)