Compare commits

...

10 Commits

41 changed files with 626 additions and 521 deletions

26
CHANGELOG.md Normal file
View File

@ -0,0 +1,26 @@
# Changelog
All notable changes to this project will be documented in this file.
## [4.0.0] - 2019-06-09
### Changed
- WebSocket::send() sends message in TEXT mode by default
- WebSocketMessage sets a new binary field, which tells whether the received incoming message is binary or text
- WebSocket::send takes a third arg, binary which default to true (can be text too)
- WebSocket callback only take one object, a const ix::WebSocketMessagePtr& msg
- Add explicite WebSocket::sendBinary
- New headers + WebSocketMessage class to hold message data, still not used across the board
- Add test/compatibility folder with small servers and clients written in different languages and different libraries to test compatibility.
- ws echo_server has a -g option to print a greeting message on connect
- IXSocketMbedTLS: better error handling in close and connect
## [3.1.2] - 2019-06-06
### Added
- ws connect has a -x option to disable per message deflate
- Add WebSocket::disablePerMessageDeflate() option.
## [3.0.0] - 2019-06-xx
### Changed
- TLS, aka SSL works on Windows (websocket and http clients)
- ws command line tool build on Windows
- Async API for HttpClient
- HttpClient API changed to use shared_ptr for response and request

View File

@ -61,6 +61,10 @@ set( IXWEBSOCKET_HEADERS
ixwebsocket/IXWebSocketHandshake.h ixwebsocket/IXWebSocketHandshake.h
ixwebsocket/IXWebSocketSendInfo.h ixwebsocket/IXWebSocketSendInfo.h
ixwebsocket/IXWebSocketErrorInfo.h ixwebsocket/IXWebSocketErrorInfo.h
ixwebsocket/IXWebSocketCloseInfo.h
ixwebsocket/IXWebSocketOpenInfo.h
ixwebsocket/IXWebSocketMessageType.h
ixwebsocket/IXWebSocketMessage.h
ixwebsocket/IXWebSocketPerMessageDeflate.h ixwebsocket/IXWebSocketPerMessageDeflate.h
ixwebsocket/IXWebSocketPerMessageDeflateCodec.h ixwebsocket/IXWebSocketPerMessageDeflateCodec.h
ixwebsocket/IXWebSocketPerMessageDeflateOptions.h ixwebsocket/IXWebSocketPerMessageDeflateOptions.h

View File

@ -1 +1 @@
2.2.1 4.0.0

View File

@ -28,6 +28,9 @@ webSocket.setUrl(url);
// to make sure that load balancers do not kill an idle connection. // to make sure that load balancers do not kill an idle connection.
webSocket.setHeartBeatPeriod(45); webSocket.setHeartBeatPeriod(45);
// Per message deflate connection is enabled by default. You can tweak its parameters or disable it
webSocket.disablePerMessageDeflate();
// Setup a callback to be fired when a message or an event (open, close, error) is received // Setup a callback to be fired when a message or an event (open, close, error) is received
webSocket.setOnMessageCallback( webSocket.setOnMessageCallback(
[](ix::WebSocketMessageType messageType, [](ix::WebSocketMessageType messageType,
@ -255,7 +258,7 @@ No manual polling to fetch data is required. Data is sent and received instantly
### Automatic reconnection ### Automatic reconnection
If the remote end (server) breaks the connection, the code will try to perpetually reconnect, by using an exponential backoff strategy, capped at one retry every 10 seconds. If the remote end (server) breaks the connection, the code will try to perpetually reconnect, by using an exponential backoff strategy, capped at one retry every 10 seconds. This behavior can be disabled.
### Large messages ### Large messages

View File

@ -24,6 +24,8 @@ namespace ix
bool SocketMbedTLS::init(const std::string& host, std::string& errMsg) bool SocketMbedTLS::init(const std::string& host, std::string& errMsg)
{ {
std::lock_guard<std::mutex> lock(_mutex);
mbedtls_ssl_init(&_ssl); mbedtls_ssl_init(&_ssl);
mbedtls_ssl_config_init(&_conf); mbedtls_ssl_config_init(&_conf);
mbedtls_ctr_drbg_init(&_ctr_drbg); mbedtls_ctr_drbg_init(&_ctr_drbg);
@ -75,15 +77,24 @@ namespace ix
std::string& errMsg, std::string& errMsg,
const CancellationRequest& isCancellationRequested) const CancellationRequest& isCancellationRequested)
{ {
_sockfd = SocketConnect::connect(host, port, errMsg, isCancellationRequested); {
if (_sockfd == -1) return false; std::lock_guard<std::mutex> lock(_mutex);
if (!init(host, errMsg)) return false; _sockfd = SocketConnect::connect(host, port, errMsg, isCancellationRequested);
if (_sockfd == -1) return false;
}
if (!init(host, errMsg))
{
close();
return false;
}
mbedtls_ssl_set_bio(&_ssl, &_sockfd, mbedtls_net_send, mbedtls_net_recv, NULL); mbedtls_ssl_set_bio(&_ssl, &_sockfd, mbedtls_net_send, mbedtls_net_recv, NULL);
int res; int res;
do do
{ {
std::lock_guard<std::mutex> lock(_mutex);
res = mbedtls_ssl_handshake(&_ssl); res = mbedtls_ssl_handshake(&_ssl);
} }
while (res == MBEDTLS_ERR_SSL_WANT_READ || res == MBEDTLS_ERR_SSL_WANT_WRITE); while (res == MBEDTLS_ERR_SSL_WANT_READ || res == MBEDTLS_ERR_SSL_WANT_WRITE);
@ -95,6 +106,8 @@ namespace ix
errMsg = "error in handshake : "; errMsg = "error in handshake : ";
errMsg += buf; errMsg += buf;
close();
return false; return false;
} }
@ -103,10 +116,14 @@ namespace ix
void SocketMbedTLS::close() void SocketMbedTLS::close()
{ {
std::lock_guard<std::mutex> lock(_mutex);
mbedtls_ssl_free(&_ssl); mbedtls_ssl_free(&_ssl);
mbedtls_ssl_config_free(&_conf); mbedtls_ssl_config_free(&_conf);
mbedtls_ctr_drbg_free(&_ctr_drbg); mbedtls_ctr_drbg_free(&_ctr_drbg);
mbedtls_entropy_free(&_entropy); mbedtls_entropy_free(&_entropy);
Socket::close();
} }
ssize_t SocketMbedTLS::send(char* buf, size_t nbyte) ssize_t SocketMbedTLS::send(char* buf, size_t nbyte)

View File

@ -51,9 +51,11 @@ namespace ix
_ws.setOnCloseCallback( _ws.setOnCloseCallback(
[this](uint16_t code, const std::string& reason, size_t wireSize, bool remote) [this](uint16_t code, const std::string& reason, size_t wireSize, bool remote)
{ {
_onMessageCallback(WebSocketMessageType::Close, "", wireSize, _onMessageCallback(
WebSocketErrorInfo(), WebSocketOpenInfo(), std::make_shared<WebSocketMessage>(
WebSocketCloseInfo(code, reason, remote)); WebSocketMessageType::Close, "", wireSize,
WebSocketErrorInfo(), WebSocketOpenInfo(),
WebSocketCloseInfo(code, reason, remote)));
} }
); );
} }
@ -135,6 +137,13 @@ namespace ix
_enablePong = false; _enablePong = false;
} }
void WebSocket::disablePerMessageDeflate()
{
std::lock_guard<std::mutex> lock(_configMutex);
WebSocketPerMessageDeflateOptions perMessageDeflateOptions(false);
_perMessageDeflateOptions = perMessageDeflateOptions;
}
void WebSocket::start() void WebSocket::start()
{ {
if (_thread.joinable()) return; // we've already been started if (_thread.joinable()) return; // we've already been started
@ -173,10 +182,12 @@ namespace ix
return status; return status;
} }
_onMessageCallback(WebSocketMessageType::Open, "", 0, _onMessageCallback(
WebSocketErrorInfo(), std::make_shared<WebSocketMessage>(
WebSocketOpenInfo(status.uri, status.headers), WebSocketMessageType::Open, "", 0,
WebSocketCloseInfo()); WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers),
WebSocketCloseInfo()));
return status; return status;
} }
@ -196,10 +207,12 @@ namespace ix
return status; return status;
} }
_onMessageCallback(WebSocketMessageType::Open, "", 0, _onMessageCallback(
WebSocketErrorInfo(), std::make_shared<WebSocketMessage>(
WebSocketOpenInfo(status.uri, status.headers), WebSocketMessageType::Open, "", 0,
WebSocketCloseInfo()); WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers),
WebSocketCloseInfo()));
return status; return status;
} }
@ -267,9 +280,11 @@ namespace ix
connectErr.reason = status.errorStr; connectErr.reason = status.errorStr;
connectErr.http_status = status.http_status; connectErr.http_status = status.http_status;
_onMessageCallback(WebSocketMessageType::Error, "", 0, _onMessageCallback(
connectErr, WebSocketOpenInfo(), std::make_shared<WebSocketMessage>(
WebSocketCloseInfo()); WebSocketMessageType::Error, "", 0,
connectErr, WebSocketOpenInfo(),
WebSocketCloseInfo()));
} }
} }
} }
@ -310,8 +325,8 @@ namespace ix
WebSocketMessageType webSocketMessageType; WebSocketMessageType webSocketMessageType;
switch (messageKind) switch (messageKind)
{ {
default: case WebSocketTransport::MessageKind::MSG_TEXT:
case WebSocketTransport::MessageKind::MSG: case WebSocketTransport::MessageKind::MSG_BINARY:
{ {
webSocketMessageType = WebSocketMessageType::Message; webSocketMessageType = WebSocketMessageType::Message;
} break; } break;
@ -335,9 +350,13 @@ namespace ix
WebSocketErrorInfo webSocketErrorInfo; WebSocketErrorInfo webSocketErrorInfo;
webSocketErrorInfo.decompressionError = decompressionError; webSocketErrorInfo.decompressionError = decompressionError;
_onMessageCallback(webSocketMessageType, msg, wireSize, bool binary = messageKind == WebSocketTransport::MessageKind::MSG_BINARY;
webSocketErrorInfo, WebSocketOpenInfo(),
WebSocketCloseInfo()); _onMessageCallback(
std::make_shared<WebSocketMessage>(
webSocketMessageType, msg, wireSize,
webSocketErrorInfo, WebSocketOpenInfo(),
WebSocketCloseInfo(), binary));
WebSocket::invokeTrafficTrackerCallback(msg.size(), true); WebSocket::invokeTrafficTrackerCallback(msg.size(), true);
}); });
@ -368,9 +387,18 @@ namespace ix
} }
WebSocketSendInfo WebSocket::send(const std::string& data, WebSocketSendInfo WebSocket::send(const std::string& data,
bool binary,
const OnProgressCallback& onProgressCallback) const OnProgressCallback& onProgressCallback)
{ {
return sendMessage(data, SendMessageKind::Binary, onProgressCallback); return sendMessage(data,
(binary) ? SendMessageKind::Binary: SendMessageKind::Text,
onProgressCallback);
}
WebSocketSendInfo WebSocket::sendBinary(const std::string& text,
const OnProgressCallback& onProgressCallback)
{
return sendMessage(text, SendMessageKind::Binary, onProgressCallback);
} }
WebSocketSendInfo WebSocket::sendText(const std::string& text, WebSocketSendInfo WebSocket::sendText(const std::string& text,

View File

@ -13,6 +13,7 @@
#include "IXWebSocketCloseConstants.h" #include "IXWebSocketCloseConstants.h"
#include "IXWebSocketErrorInfo.h" #include "IXWebSocketErrorInfo.h"
#include "IXWebSocketHttpHeaders.h" #include "IXWebSocketHttpHeaders.h"
#include "IXWebSocketMessage.h"
#include "IXWebSocketPerMessageDeflateOptions.h" #include "IXWebSocketPerMessageDeflateOptions.h"
#include "IXWebSocketSendInfo.h" #include "IXWebSocketSendInfo.h"
#include "IXWebSocketTransport.h" #include "IXWebSocketTransport.h"
@ -32,52 +33,7 @@ namespace ix
Closed = 3 Closed = 3
}; };
enum class WebSocketMessageType using OnMessageCallback = std::function<void(const WebSocketMessagePtr&)>;
{
Message = 0,
Open = 1,
Close = 2,
Error = 3,
Ping = 4,
Pong = 5,
Fragment = 6
};
struct WebSocketOpenInfo
{
std::string uri;
WebSocketHttpHeaders headers;
WebSocketOpenInfo(const std::string& u = std::string(),
const WebSocketHttpHeaders& h = WebSocketHttpHeaders())
: uri(u)
, headers(h)
{
;
}
};
struct WebSocketCloseInfo
{
uint16_t code;
std::string reason;
bool remote;
WebSocketCloseInfo(uint16_t c = 0, const std::string& r = std::string(), bool rem = false)
: code(c)
, reason(r)
, remote(rem)
{
;
}
};
using OnMessageCallback = std::function<void(WebSocketMessageType,
const std::string&,
size_t wireSize,
const WebSocketErrorInfo&,
const WebSocketOpenInfo&,
const WebSocketCloseInfo&)>;
using OnTrafficTrackerCallback = std::function<void(size_t size, bool incoming)>; using OnTrafficTrackerCallback = std::function<void(size_t size, bool incoming)>;
@ -95,6 +51,7 @@ namespace ix
void setPingTimeout(int pingTimeoutSecs); void setPingTimeout(int pingTimeoutSecs);
void enablePong(); void enablePong();
void disablePong(); void disablePong();
void disablePerMessageDeflate();
// Run asynchronously, by calling start and stop. // Run asynchronously, by calling start and stop.
void start(); void start();
@ -107,14 +64,18 @@ namespace ix
WebSocketInitResult connect(int timeoutSecs); WebSocketInitResult connect(int timeoutSecs);
void run(); void run();
// send binary data // send is in binary mode by default
WebSocketSendInfo send(const std::string& data, WebSocketSendInfo send(const std::string& data,
bool binary = false,
const OnProgressCallback& onProgressCallback = nullptr); const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo sendBinary(const std::string& text,
const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo sendText(const std::string& text, WebSocketSendInfo sendText(const std::string& text,
const OnProgressCallback& onProgressCallback = nullptr); const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo ping(const std::string& text); WebSocketSendInfo ping(const std::string& text);
void close(uint16_t code = 1000, const std::string& reason = "Normal closure"); void close(uint16_t code = WebSocketCloseConstants::kNormalClosureCode,
const std::string& reason = WebSocketCloseConstants::kNormalClosureMessage);
void setOnMessageCallback(const OnMessageCallback& callback); void setOnMessageCallback(const OnMessageCallback& callback);
static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback); static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback);
@ -168,7 +129,7 @@ namespace ix
bool _enablePong; bool _enablePong;
static const bool kDefaultEnablePong; static const bool kDefaultEnablePong;
// Optional ping and ping timeout // Optional ping and pong timeout
int _pingIntervalSecs; int _pingIntervalSecs;
int _pingTimeoutSecs; int _pingTimeoutSecs;
static const int kDefaultPingIntervalSecs; static const int kDefaultPingIntervalSecs;

View File

@ -0,0 +1,25 @@
/*
* IXWebSocketCloseInfo.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
namespace ix
{
struct WebSocketCloseInfo
{
uint16_t code;
std::string reason;
bool remote;
WebSocketCloseInfo(uint16_t c = 0, const std::string& r = std::string(), bool rem = false)
: code(c)
, reason(r)
, remote(rem)
{
;
}
};
} // namespace ix

View File

@ -0,0 +1,49 @@
/*
* IXWebSocketMessage.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include "IXWebSocketCloseInfo.h"
#include "IXWebSocketErrorInfo.h"
#include "IXWebSocketMessageType.h"
#include "IXWebSocketOpenInfo.h"
#include <memory>
#include <string>
#include <thread>
namespace ix
{
struct WebSocketMessage
{
WebSocketMessageType type;
std::string str;
size_t wireSize;
WebSocketErrorInfo errorInfo;
WebSocketOpenInfo openInfo;
WebSocketCloseInfo closeInfo;
bool binary;
WebSocketMessage(WebSocketMessageType t,
const std::string& s,
size_t w,
WebSocketErrorInfo e,
WebSocketOpenInfo o,
WebSocketCloseInfo c,
bool b = false)
: type(t)
, str(std::move(s))
, wireSize(w)
, errorInfo(e)
, openInfo(o)
, closeInfo(c)
, binary(b)
{
;
}
};
using WebSocketMessagePtr = std::shared_ptr<WebSocketMessage>;
} // namespace ix

View File

@ -32,14 +32,7 @@ namespace ix
if (_websocket) if (_websocket)
{ {
// set dummy callback just to avoid crash // set dummy callback just to avoid crash
_websocket->setOnMessageCallback([]( _websocket->setOnMessageCallback([](const WebSocketMessagePtr&) {});
WebSocketMessageType,
const std::string&,
size_t,
const WebSocketErrorInfo&,
const WebSocketOpenInfo&,
const WebSocketCloseInfo&)
{});
} }
_websocket = websocket; _websocket = websocket;
@ -47,27 +40,10 @@ namespace ix
// bind new // bind new
if (_websocket) if (_websocket)
{ {
_websocket->setOnMessageCallback([this]( _websocket->setOnMessageCallback([this](const WebSocketMessagePtr& msg)
WebSocketMessageType type,
const std::string& str,
size_t wireSize,
const WebSocketErrorInfo& errorInfo,
const WebSocketOpenInfo& openInfo,
const WebSocketCloseInfo& closeInfo)
{ {
MessagePtr message(new Message()); std::lock_guard<std::mutex> lock(_messagesMutex);
_messages.emplace_back(std::move(msg));
message->type = type;
message->str = str;
message->wireSize = wireSize;
message->errorInfo = errorInfo;
message->openInfo = openInfo;
message->closeInfo = closeInfo;
{
std::lock_guard<std::mutex> lock(_messagesMutex);
_messages.emplace_back(std::move(message));
}
}); });
} }
} }
@ -82,9 +58,9 @@ namespace ix
_onMessageUserCallback = std::move(callback); _onMessageUserCallback = std::move(callback);
} }
WebSocketMessageQueue::MessagePtr WebSocketMessageQueue::popMessage() WebSocketMessagePtr WebSocketMessageQueue::popMessage()
{ {
MessagePtr message; WebSocketMessagePtr message;
std::lock_guard<std::mutex> lock(_messagesMutex); std::lock_guard<std::mutex> lock(_messagesMutex);
if (!_messages.empty()) if (!_messages.empty())
@ -101,19 +77,11 @@ namespace ix
if (!_onMessageUserCallback) if (!_onMessageUserCallback)
return; return;
MessagePtr message; WebSocketMessagePtr message;
while (count > 0 && (message = popMessage())) while (count > 0 && (message = popMessage()))
{ {
_onMessageUserCallback( _onMessageUserCallback(message);
message->type,
message->str,
message->wireSize,
message->errorInfo,
message->openInfo,
message->closeInfo
);
--count; --count;
} }
} }

View File

@ -30,24 +30,12 @@ namespace ix
void poll(int count = 512); void poll(int count = 512);
protected: protected:
struct Message WebSocketMessagePtr popMessage();
{
WebSocketMessageType type;
std::string str;
size_t wireSize;
WebSocketErrorInfo errorInfo;
WebSocketOpenInfo openInfo;
WebSocketCloseInfo closeInfo;
};
using MessagePtr = std::shared_ptr<Message>;
MessagePtr popMessage();
private: private:
WebSocket* _websocket = nullptr; WebSocket* _websocket = nullptr;
OnMessageCallback _onMessageUserCallback; OnMessageCallback _onMessageUserCallback;
std::mutex _messagesMutex; std::mutex _messagesMutex;
std::list<MessagePtr> _messages; std::list<WebSocketMessagePtr> _messages;
}; };
} // namespace ix } // namespace ix

View File

@ -0,0 +1,21 @@
/*
* IXWebSocketMessageType.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
namespace ix
{
enum class WebSocketMessageType
{
Message = 0,
Open = 1,
Close = 2,
Error = 3,
Ping = 4,
Pong = 5,
Fragment = 6
};
}

View File

@ -0,0 +1,24 @@
/*
* IXWebSocketOpenInfo.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
namespace ix
{
struct WebSocketOpenInfo
{
std::string uri;
WebSocketHttpHeaders headers;
WebSocketOpenInfo(const std::string& u = std::string(),
const WebSocketHttpHeaders& h = WebSocketHttpHeaders())
: uri(u)
, headers(h)
{
;
}
};
} // namespace ix

View File

@ -542,12 +542,17 @@ namespace ix
) { ) {
unmaskReceiveBuffer(ws); unmaskReceiveBuffer(ws);
MessageKind messageKind =
(ws.opcode == wsheader_type::TEXT_FRAME)
? MessageKind::MSG_TEXT
: MessageKind::MSG_BINARY;
// //
// Usual case. Small unfragmented messages // Usual case. Small unfragmented messages
// //
if (ws.fin && _chunks.empty()) if (ws.fin && _chunks.empty())
{ {
emitMessage(MessageKind::MSG, emitMessage(messageKind,
std::string(_rxbuf.begin()+ws.header_size, std::string(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t) ws.N), _rxbuf.begin()+ws.header_size+(size_t) ws.N),
ws, ws,
@ -567,7 +572,7 @@ namespace ix
_rxbuf.begin()+ws.header_size+(size_t)ws.N)); _rxbuf.begin()+ws.header_size+(size_t)ws.N));
if (ws.fin) if (ws.fin)
{ {
emitMessage(MessageKind::MSG, getMergedChunks(), ws, onMessageCallback); emitMessage(messageKind, getMergedChunks(), ws, onMessageCallback);
_chunks.clear(); _chunks.clear();
} }
else else

View File

@ -50,7 +50,8 @@ namespace ix
enum class MessageKind enum class MessageKind
{ {
MSG, MSG_TEXT,
MSG_BINARY,
PING, PING,
PONG, PONG,
FRAGMENT FRAGMENT

View File

@ -9,7 +9,7 @@ install: brew
# on osx it is good practice to make /usr/local user writable # on osx it is good practice to make /usr/local user writable
# sudo chown -R `whoami`/staff /usr/local # sudo chown -R `whoami`/staff /usr/local
brew: brew:
mkdir -p build && (cd build ; cmake -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .. ; make -j install) mkdir -p build && (cd build ; cmake -DUSE_TLS=1 -DUSE_WS=1 .. ; make -j install)
ws: ws:
mkdir -p build && (cd build ; cmake -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .. ; make -j) mkdir -p build && (cd build ; cmake -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .. ; make -j)
@ -44,7 +44,7 @@ trail:
sh third_party/remote_trailing_whitespaces.sh sh third_party/remote_trailing_whitespaces.sh
format: format:
find ixwebsocket ws -name '*.cpp' -o -name '*.h' -exec clang-format -i {} \; find test ixwebsocket ws -name '*.cpp' -o -name '*.h' -exec clang-format -i {} \;
# That target is used to start a node server, but isn't required as we have # That target is used to start a node server, but isn't required as we have
# a builtin C++ server started in the unittest now # a builtin C++ server started in the unittest now

View File

@ -178,34 +178,29 @@ namespace ix
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
Logger() << "New connection"; Logger() << "New connection";
Logger() << "Uri: " << openInfo.uri; Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:"; Logger() << "Headers:";
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
Logger() << it.first << ": " << it.second; Logger() << it.first << ": " << it.second;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
Logger() << "Closed connection"; Logger() << "Closed connection";
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client != webSocket)
{ {
client->send(str); client->send(msg->str);
} }
} }
} }

View File

@ -6,13 +6,13 @@
#pragma once #pragma once
#include <string>
#include <vector>
#include <sstream>
#include <iostream> #include <iostream>
#include <ixwebsocket/IXWebSocketServer.h>
#include <mutex> #include <mutex>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <ixwebsocket/IXWebSocketServer.h> #include <sstream>
#include <string>
#include <vector>
namespace ix namespace ix
{ {
@ -28,20 +28,20 @@ namespace ix
struct Logger struct Logger
{ {
public: public:
template <typename T> template<typename T>
Logger& operator<<(T const& obj) Logger& operator<<(T const& obj)
{ {
std::lock_guard<std::mutex> lock(_mutex); std::lock_guard<std::mutex> lock(_mutex);
std::stringstream ss; std::stringstream ss;
ss << obj; ss << obj;
spdlog::info(ss.str()); spdlog::info(ss.str());
return *this; return *this;
} }
private: private:
static std::mutex _mutex; static std::mutex _mutex;
}; };
void log(const std::string& msg); void log(const std::string& msg);
@ -49,4 +49,4 @@ namespace ix
int getFreePort(); int getFreePort();
bool startWebSocketEchoServer(ix::WebSocketServer& server); bool startWebSocketEchoServer(ix::WebSocketServer& server);
} } // namespace ix

View File

@ -108,52 +108,47 @@ namespace
log(std::string("Connecting to url: ") + url); log(std::string("Connecting to url: ") + url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
log("client connected"); log("client connected");
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::stringstream ss; std::stringstream ss;
ss << "client disconnected(" ss << "client disconnected("
<< closeInfo.code << msg->closeInfo.code
<< "," << ","
<< closeInfo.reason << msg->closeInfo.reason
<< ")"; << ")";
log(ss.str()); log(ss.str());
std::lock_guard<std::mutex> lck(_mutexCloseData); std::lock_guard<std::mutex> lck(_mutexCloseData);
_closeCode = closeInfo.code; _closeCode = msg->closeInfo.code;
_closeReason = std::string(closeInfo.reason); _closeReason = std::string(msg->closeInfo.reason);
_closeRemote = closeInfo.remote; _closeRemote = msg->closeInfo.remote;
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "Error ! " << error.reason; ss << "Error ! " << msg->errorInfo.reason;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Pong) else if (msg->type == ix::WebSocketMessageType::Pong)
{ {
ss << "Received pong message " << str; ss << "Received pong message " << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Ping) else if (msg->type == ix::WebSocketMessageType::Ping)
{ {
ss << "Received ping message " << str; ss << "Received ping message " << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
ss << "Received message " << str; ss << "Received message " << msg->str;
log(ss.str()); log(ss.str());
} }
else else
@ -183,39 +178,34 @@ namespace
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, &server, &receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server, &receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
Logger() << "New server connection"; Logger() << "New server connection";
Logger() << "id: " << connectionState->getId(); Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri; Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:"; Logger() << "Headers:";
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
Logger() << it.first << ": " << it.second; Logger() << it.first << ": " << it.second;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Server closed connection(" ss << "Server closed connection("
<< closeInfo.code << msg->closeInfo.code
<< "," << ","
<< closeInfo.reason << msg->closeInfo.reason
<< ")"; << ")";
log(ss.str()); log(ss.str());
std::lock_guard<std::mutex> lck(mutexWrite); std::lock_guard<std::mutex> lck(mutexWrite);
receivedCloseCode = closeInfo.code; receivedCloseCode = msg->closeInfo.code;
receivedCloseReason = std::string(closeInfo.reason); receivedCloseReason = std::string(msg->closeInfo.reason);
receivedCloseRemote = closeInfo.remote; receivedCloseRemote = msg->closeInfo.remote;
} }
} }
); );

View File

@ -20,33 +20,28 @@ namespace
{ {
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket, [&server](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[connectionState, &server](ix::WebSocketMessageType messageType, [connectionState, &server](const WebSocketMessagePtr& msg)
const std::string & str,
size_t wireSize,
const ix::WebSocketErrorInfo & error,
const ix::WebSocketOpenInfo & openInfo,
const ix::WebSocketCloseInfo & closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
Logger() << "New connection"; Logger() << "New connection";
connectionState->computeId(); connectionState->computeId();
Logger() << "id: " << connectionState->getId(); Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri; Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:"; Logger() << "Headers:";
for (auto it : openInfo.headers) for (auto&& it : msg->openInfo.headers)
{ {
Logger() << it.first << ": " << it.second; Logger() << it.first << ": " << it.second;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
Logger() << "Closed connection"; Logger() << "Closed connection";
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
Logger() << "Message received: " << str; Logger() << "Message received: " << str;
@ -78,42 +73,37 @@ namespace
{ {
msgQ.bindWebsocket(&ws); msgQ.bindWebsocket(&ws);
msgQ.setOnMessageCallback([this](WebSocketMessageType messageType, msgQ.setOnMessageCallback([this](const WebSocketMessagePtr& msg)
const std::string & str,
size_t wireSize,
const WebSocketErrorInfo & error,
const WebSocketOpenInfo & openInfo,
const WebSocketCloseInfo & closeInfo)
{ {
REQUIRE(mainThreadId == std::this_thread::get_id()); REQUIRE(mainThreadId == std::this_thread::get_id());
std::stringstream ss; std::stringstream ss;
if (messageType == WebSocketMessageType::Open) if (msg->type == WebSocketMessageType::Open)
{ {
log("client connected"); log("client connected");
sendNextMessage(); sendNextMessage();
} }
else if (messageType == WebSocketMessageType::Close) else if (msg->type == WebSocketMessageType::Close)
{ {
log("client disconnected"); log("client disconnected");
} }
else if (messageType == WebSocketMessageType::Error) else if (msg->type == WebSocketMessageType::Error)
{ {
ss << "Error ! " << error.reason; ss << "Error ! " << error.reason;
log(ss.str()); log(ss.str());
testDone = true; testDone = true;
} }
else if (messageType == WebSocketMessageType::Pong) else if (msg->type == WebSocketMessageType::Pong)
{ {
ss << "Received pong message " << str; ss << "Received pong message " << str;
log(ss.str()); log(ss.str());
} }
else if (messageType == WebSocketMessageType::Ping) else if (msg->type == WebSocketMessageType::Ping)
{ {
ss << "Received ping message " << str; ss << "Received ping message " << str;
log(ss.str()); log(ss.str());
} }
else if (messageType == WebSocketMessageType::Message) else if (msg->type == WebSocketMessageType::Message)
{ {
REQUIRE(str.compare("Hey dude!") == 0); REQUIRE(str.compare("Hey dude!") == 0);
++receivedCount; ++receivedCount;
@ -189,5 +179,4 @@ TEST_CASE("Websocket_message_queue", "[websocket_message_q]")
server.stop(); server.stop();
} }
} }

View File

@ -106,7 +106,7 @@ namespace
{ {
log("client disconnected"); log("client disconnected");
if (closeInfo.code == 1011) if (msg->closeInfo.code == 1011)
{ {
_closedDueToPingTimeout = true; _closedDueToPingTimeout = true;
} }

View File

@ -39,42 +39,37 @@ namespace ix
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server, &connectionId](std::shared_ptr<ix::WebSocket> webSocket, [&server, &connectionId](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, [webSocket, connectionState,
&connectionId, &server](ix::WebSocketMessageType messageType, &connectionId, &server](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
Logger() << "New connection"; Logger() << "New connection";
connectionState->computeId(); connectionState->computeId();
Logger() << "id: " << connectionState->getId(); Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri; Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:"; Logger() << "Headers:";
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
Logger() << it.first << ": " << it.second; Logger() << it.first << ": " << it.second;
} }
connectionId = connectionState->getId(); connectionId = connectionState->getId();
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
Logger() << "Closed connection"; Logger() << "Closed connection";
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client != webSocket)
{ {
client->send(str); client->send(msg->str);
} }
} }
} }

View File

@ -52,41 +52,36 @@ namespace
log(std::string("Connecting to url: ") + url); log(std::string("Connecting to url: ") + url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[](ix::WebSocketMessageType messageType, [](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
log("TestConnectionDisconnection: connected !"); log("TestConnectionDisconnection: connected !");
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
log("TestConnectionDisconnection: disconnected !"); log("TestConnectionDisconnection: disconnected !");
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "TestConnectionDisconnection: Error! "; ss << "TestConnectionDisconnection: Error! ";
ss << error.reason; ss << msg->errorInfo.reason;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
log("TestConnectionDisconnection: received message.!"); log("TestConnectionDisconnection: received message.!");
} }
else if (messageType == ix::WebSocketMessageType::Ping) else if (msg->type == ix::WebSocketMessageType::Ping)
{ {
log("TestConnectionDisconnection: received ping message.!"); log("TestConnectionDisconnection: received ping message.!");
} }
else if (messageType == ix::WebSocketMessageType::Pong) else if (msg->type == ix::WebSocketMessageType::Pong)
{ {
log("TestConnectionDisconnection: received pong message.!"); log("TestConnectionDisconnection: received pong message.!");
} }
else if (messageType == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
log("TestConnectionDisconnection: received fragment.!"); log("TestConnectionDisconnection: received fragment.!");
} }

View File

@ -114,31 +114,26 @@ namespace
log(std::string("Connecting to url: ") + url); log(std::string("Connecting to url: ") + url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
ss << "cmd_websocket_chat: user " ss << "cmd_websocket_chat: user "
<< _user << _user
<< " Connected !"; << " Connected !";
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
ss << "cmd_websocket_chat: user " ss << "cmd_websocket_chat: user "
<< _user << _user
<< " disconnected !"; << " disconnected !";
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
auto result = decodeMessage(str); auto result = decodeMessage(msg->str);
// Our "chat" / "broacast" node.js server does not send us // Our "chat" / "broacast" node.js server does not send us
// the messages we send, so we don't need to have a msg_user != user // the messages we send, so we don't need to have a msg_user != user
@ -159,20 +154,20 @@ namespace
<< _user << " > "; << _user << " > ";
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "cmd_websocket_chat: Error ! " << error.reason; ss << "cmd_websocket_chat: Error ! " << msg->errorInfo.reason;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Ping) else if (msg->type == ix::WebSocketMessageType::Ping)
{ {
log("cmd_websocket_chat: received ping message"); log("cmd_websocket_chat: received ping message");
} }
else if (messageType == ix::WebSocketMessageType::Pong) else if (msg->type == ix::WebSocketMessageType::Pong)
{ {
log("cmd_websocket_chat: received pong message"); log("cmd_websocket_chat: received pong message");
} }
else if (messageType == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
log("cmd_websocket_chat: received message fragment"); log("cmd_websocket_chat: received message fragment");
} }
@ -221,35 +216,30 @@ namespace
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
Logger() << "New connection"; Logger() << "New connection";
Logger() << "id: " << connectionState->getId(); Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri; Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:"; Logger() << "Headers:";
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
Logger() << it.first << ": " << it.second; Logger() << it.first << ": " << it.second;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
log("Closed connection"); log("Closed connection");
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client != webSocket)
{ {
client->send(str); client->send(msg->str);
} }
} }
} }

View File

@ -0,0 +1,30 @@
# Clients
## ws
```
$ ws connect ws://127.0.0.1:8765
Type Ctrl-D to exit prompt...
Connecting to url: ws://127.0.0.1:8765
> ws_connect: connected
Uri: /
Handshake Headers:
Connection: Upgrade
Date: Sat, 08 Jun 2019 16:43:29 GMT
Sec-WebSocket-Accept: kPCNwGa97y+7NWdAvHi/7/rA8AE=
Sec-WebSocket-Extensions: permessage-deflate; server_max_window_bits=15; client_max_window_bits=15
Server: Python/3.7 websockets/7.0
Upgrade: websocket
Received 13 bytes
ws_connect: received message: > Welcome !
ws_connect: connection closed: code 1006 reason Abnormal closure
```
## wscat
```
$ ./node_modules/.bin/wscat -c ws://127.0.0.1:8765
connected (press CTRL+C to quit)
< > Welcome !
disconnected (code: 1006)
```

View File

@ -0,0 +1,22 @@
#!/usr/bin/env python
# WS server example
import asyncio
import websockets
async def hello(websocket, path):
await websocket.send(f"> Welcome !")
name = await websocket.recv()
print(f"< {name}")
greeting = f"Hello {name}!"
await websocket.send(greeting)
print(f"> {greeting}")
start_server = websockets.serve(hello, 'localhost', 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

View File

@ -6,10 +6,10 @@
#pragma once #pragma once
#include <algorithm>
#include <ixwebsocket/IXHttpClient.h> #include <ixwebsocket/IXHttpClient.h>
#include <jsoncpp/json/json.h> #include <jsoncpp/json/json.h>
#include <regex> #include <regex>
#include <algorithm>
namespace ix namespace ix
{ {

View File

@ -90,46 +90,41 @@ namespace ix
void CobraConnection::initWebSocketOnMessageCallback() void CobraConnection::initWebSocketOnMessageCallback()
{ {
_webSocket->setOnMessageCallback( _webSocket->setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
CobraConnection::invokeTrafficTrackerCallback(wireSize, true); CobraConnection::invokeTrafficTrackerCallback(msg->wireSize, true);
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
invokeEventCallback(ix::CobraConnection_EventType_Open, invokeEventCallback(ix::CobraConnection_EventType_Open,
std::string(), std::string(),
openInfo.headers); msg->openInfo.headers);
sendHandshakeMessage(); sendHandshakeMessage();
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
_authenticated = false; _authenticated = false;
std::stringstream ss; std::stringstream ss;
ss << "Close code " << closeInfo.code; ss << "Close code " << msg->closeInfo.code;
ss << " reason " << closeInfo.reason; ss << " reason " << msg->closeInfo.reason;
invokeEventCallback(ix::CobraConnection_EventType_Closed, invokeEventCallback(ix::CobraConnection_EventType_Closed,
ss.str()); ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
Json::Value data; Json::Value data;
Json::Reader reader; Json::Reader reader;
if (!reader.parse(str, data)) if (!reader.parse(msg->str, data))
{ {
invokeErrorCallback("Invalid json", str); invokeErrorCallback("Invalid json", msg->str);
return; return;
} }
if (!data.isMember("action")) if (!data.isMember("action"))
{ {
invokeErrorCallback("Missing action", str); invokeErrorCallback("Missing action", msg->str);
return; return;
} }
@ -139,12 +134,12 @@ namespace ix
{ {
if (!handleHandshakeResponse(data)) if (!handleHandshakeResponse(data))
{ {
invokeErrorCallback("Error extracting nonce from handshake response", str); invokeErrorCallback("Error extracting nonce from handshake response", msg->str);
} }
} }
else if (action == "auth/handshake/error") else if (action == "auth/handshake/error")
{ {
invokeErrorCallback("Handshake error", str); invokeErrorCallback("Handshake error", msg->str);
} }
else if (action == "auth/authenticate/ok") else if (action == "auth/authenticate/ok")
{ {
@ -154,7 +149,7 @@ namespace ix
} }
else if (action == "auth/authenticate/error") else if (action == "auth/authenticate/error")
{ {
invokeErrorCallback("Authentication error", str); invokeErrorCallback("Authentication error", msg->str);
} }
else if (action == "rtm/subscription/data") else if (action == "rtm/subscription/data")
{ {
@ -164,36 +159,36 @@ namespace ix
{ {
if (!handleSubscriptionResponse(data)) if (!handleSubscriptionResponse(data))
{ {
invokeErrorCallback("Error processing subscribe response", str); invokeErrorCallback("Error processing subscribe response", msg->str);
} }
} }
else if (action == "rtm/subscribe/error") else if (action == "rtm/subscribe/error")
{ {
invokeErrorCallback("Subscription error", str); invokeErrorCallback("Subscription error", msg->str);
} }
else if (action == "rtm/unsubscribe/ok") else if (action == "rtm/unsubscribe/ok")
{ {
if (!handleUnsubscriptionResponse(data)) if (!handleUnsubscriptionResponse(data))
{ {
invokeErrorCallback("Error processing subscribe response", str); invokeErrorCallback("Error processing subscribe response", msg->str);
} }
} }
else if (action == "rtm/unsubscribe/error") else if (action == "rtm/unsubscribe/error")
{ {
invokeErrorCallback("Unsubscription error", str); invokeErrorCallback("Unsubscription error", msg->str);
} }
else else
{ {
invokeErrorCallback("Un-handled message type", str); invokeErrorCallback("Un-handled message type", msg->str);
} }
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
invokeErrorCallback(ss.str(), std::string()); invokeErrorCallback(ss.str(), std::string());
} }
}); });

View File

@ -58,25 +58,20 @@ namespace snake
auto state = std::dynamic_pointer_cast<SnakeConnectionState>(connectionState); auto state = std::dynamic_pointer_cast<SnakeConnectionState>(connectionState);
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[this, webSocket, state](ix::WebSocketMessageType messageType, [this, webSocket, state](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
std::cerr << "id: " << state->getId() << std::endl; std::cerr << "id: " << state->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl; std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; std::cerr << it.first << ": " << it.second << std::endl;
} }
std::string appkey = parseAppKey(openInfo.uri); std::string appkey = parseAppKey(msg->openInfo.uri);
state->setAppkey(appkey); state->setAppkey(appkey);
// Connect to redis first // Connect to redis first
@ -86,29 +81,29 @@ namespace snake
std::cerr << "Cannot connect to redis host" << std::endl; std::cerr << "Cannot connect to redis host" << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "Closed connection" std::cerr << "Closed connection"
<< " code " << closeInfo.code << " code " << msg->closeInfo.code
<< " reason " << closeInfo.reason << std::endl; << " reason " << msg->closeInfo.reason << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str(); std::cerr << ss.str();
} }
else if (messageType == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
std::cerr << "Received message fragment" << std::endl; std::cerr << "Received message fragment" << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " << wireSize << " bytes" << std::endl; std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
processCobraMessage(state, webSocket, _appConfig, str); processCobraMessage(state, webSocket, _appConfig, msg->str);
} }
} }
); );

View File

@ -80,6 +80,8 @@ int main(int argc, char** argv)
bool strict = false; bool strict = false;
bool stress = false; bool stress = false;
bool disableAutomaticReconnection = false; bool disableAutomaticReconnection = false;
bool disablePerMessageDeflate = false;
bool greetings = false;
int port = 8008; int port = 8008;
int redisPort = 6379; int redisPort = 6379;
int statsdPort = 8125; int statsdPort = 8125;
@ -110,6 +112,7 @@ int main(int argc, char** argv)
CLI::App* connectApp = app.add_subcommand("connect", "Connect to a remote server"); CLI::App* connectApp = app.add_subcommand("connect", "Connect to a remote server");
connectApp->add_option("url", url, "Connection url")->required(); connectApp->add_option("url", url, "Connection url")->required();
connectApp->add_flag("-d", disableAutomaticReconnection, "Disable Automatic Reconnection"); connectApp->add_flag("-d", disableAutomaticReconnection, "Disable Automatic Reconnection");
connectApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate");
CLI::App* chatApp = app.add_subcommand("chat", "Group chat"); CLI::App* chatApp = app.add_subcommand("chat", "Group chat");
chatApp->add_option("url", url, "Connection url")->required(); chatApp->add_option("url", url, "Connection url")->required();
@ -118,6 +121,7 @@ int main(int argc, char** argv)
CLI::App* echoServerApp = app.add_subcommand("echo_server", "Echo server"); CLI::App* echoServerApp = app.add_subcommand("echo_server", "Echo server");
echoServerApp->add_option("--port", port, "Port"); echoServerApp->add_option("--port", port, "Port");
echoServerApp->add_option("--host", hostname, "Hostname"); echoServerApp->add_option("--host", hostname, "Hostname");
echoServerApp->add_flag("-g", greetings, "Verbose");
CLI::App* broadcastServerApp = app.add_subcommand("broadcast_server", "Broadcasting server"); CLI::App* broadcastServerApp = app.add_subcommand("broadcast_server", "Broadcasting server");
broadcastServerApp->add_option("--port", port, "Port"); broadcastServerApp->add_option("--port", port, "Port");
@ -241,7 +245,8 @@ int main(int argc, char** argv)
} }
else if (app.got_subcommand("connect")) else if (app.got_subcommand("connect"))
{ {
ret = ix::ws_connect_main(url, disableAutomaticReconnection); ret = ix::ws_connect_main(url, disableAutomaticReconnection,
disablePerMessageDeflate);
} }
else if (app.got_subcommand("chat")) else if (app.got_subcommand("chat"))
{ {
@ -249,7 +254,7 @@ int main(int argc, char** argv)
} }
else if (app.got_subcommand("echo_server")) else if (app.got_subcommand("echo_server"))
{ {
ret = ix::ws_echo_server_main(port, hostname); ret = ix::ws_echo_server_main(port, greetings, hostname);
} }
else if (app.got_subcommand("broadcast_server")) else if (app.got_subcommand("broadcast_server"))
{ {

View File

@ -24,13 +24,15 @@ namespace ix
int ws_ping_pong_main(const std::string& url); int ws_ping_pong_main(const std::string& url);
int ws_echo_server_main(int port, const std::string& hostname); int ws_echo_server_main(int port, bool greetings, const std::string& hostname);
int ws_broadcast_server_main(int port, const std::string& hostname); int ws_broadcast_server_main(int port, const std::string& hostname);
int ws_transfer_main(int port, const std::string& hostname); int ws_transfer_main(int port, const std::string& hostname);
int ws_chat_main(const std::string& url, const std::string& user); int ws_chat_main(const std::string& url, const std::string& user);
int ws_connect_main(const std::string& url, bool disableAutomaticReconnection); int ws_connect_main(const std::string& url,
bool disableAutomaticReconnection,
bool disablePerMessageDeflate);
int ws_receive_main(const std::string& url, bool enablePerMessageDeflate, int delayMs); int ws_receive_main(const std::string& url, bool enablePerMessageDeflate, int delayMs);

View File

@ -21,52 +21,48 @@ namespace ix
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server](const WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl; std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl; std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; std::cerr << it.first << ": " << it.second << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "Closed connection" std::cerr << "Closed connection"
<< " code " << closeInfo.code << " code " << msg->closeInfo.code
<< " reason " << closeInfo.reason << std::endl; << " reason " << msg->closeInfo.reason << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str(); std::cerr << ss.str();
} }
else if (messageType == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
std::cerr << "Received message fragment" << std::endl; std::cerr << "Received message fragment" << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " << wireSize << " bytes" << std::endl; std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client != webSocket)
{ {
client->send(str, client->send(msg->str,
msg->binary,
[](int current, int total) -> bool [](int current, int total) -> bool
{ {
std::cerr << "Step " << current std::cerr << "Step " << current

View File

@ -84,20 +84,15 @@ namespace ix
log(std::string("Connecting to url: ") + _url); log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
log("ws chat: connected"); log("ws chat: connected");
std::cout << "Uri: " << openInfo.uri << std::endl; std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl; std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; std::cout << it.first << ": " << it.second << std::endl;
} }
@ -107,18 +102,18 @@ namespace ix
<< " Connected !"; << " Connected !";
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
ss << "ws chat: user " ss << "ws chat: user "
<< _user << _user
<< " disconnected !" << " disconnected !"
<< " code " << closeInfo.code << " code " << msg->closeInfo.code
<< " reason " << closeInfo.reason; << " reason " << msg->closeInfo.reason;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
auto result = decodeMessage(str); auto result = decodeMessage(msg->str);
// Our "chat" / "broacast" node.js server does not send us // Our "chat" / "broacast" node.js server does not send us
// the messages we send, so we don't have to filter it out. // the messages we send, so we don't have to filter it out.
@ -127,17 +122,17 @@ namespace ix
_receivedQueue.push(result.second); _receivedQueue.push(result.second);
ss << std::endl ss << std::endl
<< result.first << "(" << wireSize << " bytes)" << " > " << result.second << result.first << "(" << msg->wireSize << " bytes)" << " > " << result.second
<< std::endl << std::endl
<< _user << " > "; << _user << " > ";
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str()); log(ss.str());
} }
else else
@ -172,7 +167,7 @@ namespace ix
void WebSocketChat::sendMessage(const std::string& text) void WebSocketChat::sendMessage(const std::string& text)
{ {
_webSocket.send(encodeMessage(text)); _webSocket.sendText(encodeMessage(text));
} }
int ws_chat_main(const std::string& url, int ws_chat_main(const std::string& url,

View File

@ -15,7 +15,8 @@ namespace ix
{ {
public: public:
WebSocketConnect(const std::string& _url, WebSocketConnect(const std::string& _url,
bool disableAutomaticReconnection); bool disableAutomaticReconnection,
bool disablePerMessageDeflate);
void subscribe(const std::string& channel); void subscribe(const std::string& channel);
void start(); void start();
@ -26,13 +27,16 @@ namespace ix
private: private:
std::string _url; std::string _url;
ix::WebSocket _webSocket; ix::WebSocket _webSocket;
bool _disablePerMessageDeflate;
void log(const std::string& msg); void log(const std::string& msg);
}; };
WebSocketConnect::WebSocketConnect(const std::string& url, WebSocketConnect::WebSocketConnect(const std::string& url,
bool disableAutomaticReconnection) : bool disableAutomaticReconnection,
_url(url) bool disablePerMessageDeflate) :
_url(url),
_disablePerMessageDeflate(disablePerMessageDeflate)
{ {
if (disableAutomaticReconnection) if (disableAutomaticReconnection)
{ {
@ -54,64 +58,66 @@ namespace ix
{ {
_webSocket.setUrl(_url); _webSocket.setUrl(_url);
ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions( if (_disablePerMessageDeflate)
true, false, false, 15, 15); {
_webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions); _webSocket.disablePerMessageDeflate();
}
else
{
ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(
true, false, false, 15, 15);
_webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions);
}
std::stringstream ss; std::stringstream ss;
log(std::string("Connecting to url: ") + _url); log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
log("ws_connect: connected"); log("ws_connect: connected");
std::cout << "Uri: " << openInfo.uri << std::endl; std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl; std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; std::cout << it.first << ": " << it.second << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
ss << "ws_connect: connection closed:"; ss << "ws_connect: connection closed:";
ss << " code " << closeInfo.code; ss << " code " << msg->closeInfo.code;
ss << " reason " << closeInfo.reason << std::endl; ss << " reason " << msg->closeInfo.reason << std::endl;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " << wireSize << " bytes" << std::endl; std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
ss << "ws_connect: received message: " ss << "ws_connect: received message: "
<< str; << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
std::cerr << "Received message fragment" << std::endl; std::cerr << "Received message fragment" << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Ping) else if (msg->type == ix::WebSocketMessageType::Ping)
{ {
std::cerr << "Received ping" << std::endl; std::cerr << "Received ping" << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Pong) else if (msg->type == ix::WebSocketMessageType::Pong)
{ {
std::cerr << "Received pong" << std::endl; std::cerr << "Received pong" << std::endl;
} }
@ -127,13 +133,17 @@ namespace ix
void WebSocketConnect::sendMessage(const std::string& text) void WebSocketConnect::sendMessage(const std::string& text)
{ {
_webSocket.send(text); _webSocket.sendText(text);
} }
int ws_connect_main(const std::string& url, bool disableAutomaticReconnection) int ws_connect_main(const std::string& url,
bool disableAutomaticReconnection,
bool disablePerMessageDeflate)
{ {
std::cout << "Type Ctrl-D to exit prompt..." << std::endl; std::cout << "Type Ctrl-D to exit prompt..." << std::endl;
WebSocketConnect webSocketChat(url, disableAutomaticReconnection); WebSocketConnect webSocketChat(url,
disableAutomaticReconnection,
disablePerMessageDeflate);
webSocketChat.start(); webSocketChat.start();
while (true) while (true)

View File

@ -10,56 +10,56 @@
namespace ix namespace ix
{ {
int ws_echo_server_main(int port, const std::string& hostname) int ws_echo_server_main(int port, bool greetings, const std::string& hostname)
{ {
std::cout << "Listening on " << hostname << ":" << port << std::endl; std::cout << "Listening on " << hostname << ":" << port << std::endl;
ix::WebSocketServer server(port, hostname); ix::WebSocketServer server(port, hostname);
server.setOnConnectionCallback( server.setOnConnectionCallback(
[](std::shared_ptr<ix::WebSocket> webSocket, [greetings](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState](ix::WebSocketMessageType messageType, [webSocket, connectionState, greetings](const WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl; std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl; std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; std::cerr << it.first << ": " << it.second << std::endl;
} }
if (greetings)
{
webSocket->sendText("Welcome !");
}
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "Closed connection" std::cerr << "Closed connection"
<< " code " << closeInfo.code << " code " << msg->closeInfo.code
<< " reason " << closeInfo.reason << std::endl; << " reason " << msg->closeInfo.reason << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str(); std::cerr << ss.str();
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " std::cerr << "Received "
<< wireSize << " bytes" << msg->wireSize << " bytes"
<< std::endl; << std::endl;
webSocket->send(str); webSocket->send(msg->str, msg->binary);
} }
} }
); );

View File

@ -54,59 +54,54 @@ namespace ix
log(std::string("Connecting to url: ") + _url); log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::cerr << "Received " << wireSize << " bytes" << std::endl; std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
log("ping_pong: connected"); log("ping_pong: connected");
std::cout << "Uri: " << openInfo.uri << std::endl; std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl; std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; std::cout << it.first << ": " << it.second << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
ss << "ping_pong: disconnected:" ss << "ping_pong: disconnected:"
<< " code " << closeInfo.code << " code " << msg->closeInfo.code
<< " reason " << closeInfo.reason << " reason " << msg->closeInfo.reason
<< str; << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
ss << "ping_pong: received message: " ss << "ping_pong: received message: "
<< str; << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Ping) else if (msg->type == ix::WebSocketMessageType::Ping)
{ {
ss << "ping_pong: received ping message: " ss << "ping_pong: received ping message: "
<< str; << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Pong) else if (msg->type == ix::WebSocketMessageType::Pong)
{ {
ss << "ping_pong: received pong message: " ss << "ping_pong: received pong message: "
<< str; << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str()); log(ss.str());
} }
else else

View File

@ -183,41 +183,36 @@ namespace ix
log(std::string("Connecting to url: ") + _url); log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
_condition.notify_one(); _condition.notify_one();
log("ws_receive: connected"); log("ws_receive: connected");
std::cout << "Uri: " << openInfo.uri << std::endl; std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl; std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; std::cout << it.first << ": " << it.second << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
ss << "ws_receive: connection closed:"; ss << "ws_receive: connection closed:";
ss << " code " << closeInfo.code; ss << " code " << msg->closeInfo.code;
ss << " reason " << closeInfo.reason << std::endl; ss << " reason " << msg->closeInfo.reason << std::endl;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
ss << "ws_receive: transfered " << wireSize << " bytes"; ss << "ws_receive: transfered " << msg->wireSize << " bytes";
log(ss.str()); log(ss.str());
handleMessage(str); handleMessage(msg->str);
_condition.notify_one(); _condition.notify_one();
} }
else if (messageType == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
ss << "ws_receive: received fragment " << _receivedFragmentCounter++; ss << "ws_receive: received fragment " << _receivedFragmentCounter++;
log(ss.str()); log(ss.str());
@ -229,13 +224,13 @@ namespace ix
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
} }
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "ws_receive "; ss << "ws_receive ";
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str()); log(ss.str());
} }
else else

View File

@ -112,42 +112,37 @@ namespace ix
log(std::string("Connecting to url: ") + _url); log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
_condition.notify_one(); _condition.notify_one();
log("ws_send: connected"); log("ws_send: connected");
std::cout << "Uri: " << openInfo.uri << std::endl; std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl; std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; std::cout << it.first << ": " << it.second << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
ss << "ws_send: connection closed:"; ss << "ws_send: connection closed:";
ss << " code " << closeInfo.code; ss << " code " << msg->closeInfo.code;
ss << " reason " << closeInfo.reason << std::endl; ss << " reason " << msg->closeInfo.reason << std::endl;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
_condition.notify_one(); _condition.notify_one();
ss << "ws_send: received message (" << wireSize << " bytes)"; ss << "ws_send: received message (" << msg->wireSize << " bytes)";
log(ss.str()); log(ss.str());
std::string errMsg; std::string errMsg;
MsgPack data = MsgPack::parse(str, errMsg); MsgPack data = MsgPack::parse(msg->str, errMsg);
if (!errMsg.empty()) if (!errMsg.empty())
{ {
std::cerr << "Invalid MsgPack response" << std::endl; std::cerr << "Invalid MsgPack response" << std::endl;
@ -160,13 +155,13 @@ namespace ix
std::cerr << "Invalid id" << std::endl; std::cerr << "Invalid id" << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "ws_send "; ss << "ws_send ";
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str()); log(ss.str());
} }
else else
@ -244,8 +239,8 @@ namespace ix
MsgPack msg(pdu); MsgPack msg(pdu);
Bench bench("Sending file through websocket"); Bench bench("Sending file through websocket");
_webSocket.send(msg.dump(), _webSocket.sendBinary(msg.dump(),
[throttle](int current, int total) -> bool [throttle](int current, int total) -> bool
{ {
std::cout << "ws_send: Step " << current << " out of " << total << std::endl; std::cout << "ws_send: Step " << current << " out of " << total << std::endl;

View File

@ -21,52 +21,48 @@ namespace ix
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server](const WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl; std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl; std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; std::cerr << it.first << ": " << it.second << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "Closed connection" std::cerr << "Closed connection"
<< " code " << closeInfo.code << " code " << msg->closeInfo.code
<< " reason " << closeInfo.reason << std::endl; << " reason " << msg->closeInfo.reason << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str(); std::cerr << ss.str();
} }
else if (messageType == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
std::cerr << "Received message fragment " std::cerr << "Received message fragment "
<< std::endl; << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " << wireSize << " bytes" << std::endl; std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client != webSocket)
{ {
client->send(str, client->send(msg->str,
msg->binary,
[](int current, int total) -> bool [](int current, int total) -> bool
{ {
std::cerr << "ws_transfer: Step " << current std::cerr << "ws_transfer: Step " << current