WebSocket callback only take one object, a const ix::WebSocketMessagePtr& msg

This commit is contained in:
Benjamin Sergeant 2019-06-09 11:33:17 -07:00
parent 15369e1ae9
commit de0bf5ebcd
24 changed files with 311 additions and 416 deletions

View File

@ -3,6 +3,7 @@ All notable changes to this project will be documented in this file.
## [Unreleased] - 2019-06-xx ## [Unreleased] - 2019-06-xx
### Changed ### Changed
- WebSocket callback only take one object, a const ix::WebSocketMessagePtr& msg
- Add explicite WebSocket::sendBinary - Add explicite WebSocket::sendBinary
- New headers + WebSocketMessage class to hold message data, still not used across the board - New headers + WebSocketMessage class to hold message data, still not used across the board
- Add test/compatibility folder with small servers and clients written in different languages and different libraries to test compatibility. - Add test/compatibility folder with small servers and clients written in different languages and different libraries to test compatibility.

View File

@ -51,9 +51,11 @@ namespace ix
_ws.setOnCloseCallback( _ws.setOnCloseCallback(
[this](uint16_t code, const std::string& reason, size_t wireSize, bool remote) [this](uint16_t code, const std::string& reason, size_t wireSize, bool remote)
{ {
_onMessageCallback(WebSocketMessageType::Close, "", wireSize, _onMessageCallback(
WebSocketErrorInfo(), WebSocketOpenInfo(), std::make_shared<WebSocketMessage>(
WebSocketCloseInfo(code, reason, remote)); WebSocketMessageType::Close, "", wireSize,
WebSocketErrorInfo(), WebSocketOpenInfo(),
WebSocketCloseInfo(code, reason, remote)));
} }
); );
} }
@ -180,10 +182,12 @@ namespace ix
return status; return status;
} }
_onMessageCallback(WebSocketMessageType::Open, "", 0, _onMessageCallback(
WebSocketErrorInfo(), std::make_shared<WebSocketMessage>(
WebSocketOpenInfo(status.uri, status.headers), WebSocketMessageType::Open, "", 0,
WebSocketCloseInfo()); WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers),
WebSocketCloseInfo()));
return status; return status;
} }
@ -203,10 +207,12 @@ namespace ix
return status; return status;
} }
_onMessageCallback(WebSocketMessageType::Open, "", 0, _onMessageCallback(
WebSocketErrorInfo(), std::make_shared<WebSocketMessage>(
WebSocketOpenInfo(status.uri, status.headers), WebSocketMessageType::Open, "", 0,
WebSocketCloseInfo()); WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers),
WebSocketCloseInfo()));
return status; return status;
} }
@ -274,9 +280,11 @@ namespace ix
connectErr.reason = status.errorStr; connectErr.reason = status.errorStr;
connectErr.http_status = status.http_status; connectErr.http_status = status.http_status;
_onMessageCallback(WebSocketMessageType::Error, "", 0, _onMessageCallback(
connectErr, WebSocketOpenInfo(), std::make_shared<WebSocketMessage>(
WebSocketCloseInfo()); WebSocketMessageType::Error, "", 0,
connectErr, WebSocketOpenInfo(),
WebSocketCloseInfo()));
} }
} }
} }
@ -342,9 +350,11 @@ namespace ix
WebSocketErrorInfo webSocketErrorInfo; WebSocketErrorInfo webSocketErrorInfo;
webSocketErrorInfo.decompressionError = decompressionError; webSocketErrorInfo.decompressionError = decompressionError;
_onMessageCallback(webSocketMessageType, msg, wireSize, _onMessageCallback(
webSocketErrorInfo, WebSocketOpenInfo(), std::make_shared<WebSocketMessage>(
WebSocketCloseInfo()); webSocketMessageType, msg, wireSize,
webSocketErrorInfo, WebSocketOpenInfo(),
WebSocketCloseInfo()));
WebSocket::invokeTrafficTrackerCallback(msg.size(), true); WebSocket::invokeTrafficTrackerCallback(msg.size(), true);
}); });

View File

@ -13,10 +13,10 @@
#include "IXWebSocketCloseConstants.h" #include "IXWebSocketCloseConstants.h"
#include "IXWebSocketErrorInfo.h" #include "IXWebSocketErrorInfo.h"
#include "IXWebSocketHttpHeaders.h" #include "IXWebSocketHttpHeaders.h"
#include "IXWebSocketMessage.h"
#include "IXWebSocketPerMessageDeflateOptions.h" #include "IXWebSocketPerMessageDeflateOptions.h"
#include "IXWebSocketSendInfo.h" #include "IXWebSocketSendInfo.h"
#include "IXWebSocketTransport.h" #include "IXWebSocketTransport.h"
#include "IXWebSocketMessage.h"
#include <atomic> #include <atomic>
#include <mutex> #include <mutex>
#include <string> #include <string>
@ -33,12 +33,7 @@ namespace ix
Closed = 3 Closed = 3
}; };
using OnMessageCallback = std::function<void(WebSocketMessageType, using OnMessageCallback = std::function<void(const WebSocketMessagePtr&)>;
const std::string&,
size_t wireSize,
const WebSocketErrorInfo&,
const WebSocketOpenInfo&,
const WebSocketCloseInfo&)>;
using OnTrafficTrackerCallback = std::function<void(size_t size, bool incoming)>; using OnTrafficTrackerCallback = std::function<void(size_t size, bool incoming)>;
@ -78,7 +73,8 @@ namespace ix
const OnProgressCallback& onProgressCallback = nullptr); const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo ping(const std::string& text); WebSocketSendInfo ping(const std::string& text);
void close(uint16_t code = 1000, const std::string& reason = "Normal closure"); void close(uint16_t code = WebSocketCloseConstants::kNormalClosureCode,
const std::string& reason = WebSocketCloseConstants::kNormalClosureMessage);
void setOnMessageCallback(const OnMessageCallback& callback); void setOnMessageCallback(const OnMessageCallback& callback);
static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback); static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback);

View File

@ -22,4 +22,4 @@ namespace ix
; ;
} }
}; };
} } // namespace ix

View File

@ -6,12 +6,12 @@
#pragma once #pragma once
#include "IXWebSocketMessageType.h"
#include "IXWebSocketErrorInfo.h"
#include "IXWebSocketOpenInfo.h"
#include "IXWebSocketCloseInfo.h" #include "IXWebSocketCloseInfo.h"
#include <string> #include "IXWebSocketErrorInfo.h"
#include "IXWebSocketMessageType.h"
#include "IXWebSocketOpenInfo.h"
#include <memory> #include <memory>
#include <string>
#include <thread> #include <thread>
namespace ix namespace ix
@ -25,6 +25,22 @@ namespace ix
WebSocketOpenInfo openInfo; WebSocketOpenInfo openInfo;
WebSocketCloseInfo closeInfo; WebSocketCloseInfo closeInfo;
bool binary; bool binary;
WebSocketMessage(WebSocketMessageType t,
const std::string& s,
size_t w,
WebSocketErrorInfo e,
WebSocketOpenInfo o,
WebSocketCloseInfo c)
: type(t)
, str(std::move(s))
, wireSize(w)
, errorInfo(e)
, openInfo(o)
, closeInfo(c)
{
;
}
}; };
using WebSocketMessagePtr = std::shared_ptr<WebSocketMessage>; using WebSocketMessagePtr = std::shared_ptr<WebSocketMessage>;

View File

@ -32,14 +32,7 @@ namespace ix
if (_websocket) if (_websocket)
{ {
// set dummy callback just to avoid crash // set dummy callback just to avoid crash
_websocket->setOnMessageCallback([]( _websocket->setOnMessageCallback([](const WebSocketMessagePtr&) {});
WebSocketMessageType,
const std::string&,
size_t,
const WebSocketErrorInfo&,
const WebSocketOpenInfo&,
const WebSocketCloseInfo&)
{});
} }
_websocket = websocket; _websocket = websocket;
@ -47,27 +40,10 @@ namespace ix
// bind new // bind new
if (_websocket) if (_websocket)
{ {
_websocket->setOnMessageCallback([this]( _websocket->setOnMessageCallback([this](const WebSocketMessagePtr& msg)
WebSocketMessageType type,
const std::string& str,
size_t wireSize,
const WebSocketErrorInfo& errorInfo,
const WebSocketOpenInfo& openInfo,
const WebSocketCloseInfo& closeInfo)
{ {
auto message = std::make_shared<WebSocketMessage>(); std::lock_guard<std::mutex> lock(_messagesMutex);
_messages.emplace_back(std::move(msg));
message->type = type;
message->str = str;
message->wireSize = wireSize;
message->errorInfo = errorInfo;
message->openInfo = openInfo;
message->closeInfo = closeInfo;
{
std::lock_guard<std::mutex> lock(_messagesMutex);
_messages.emplace_back(std::move(message));
}
}); });
} }
} }
@ -105,15 +81,7 @@ namespace ix
while (count > 0 && (message = popMessage())) while (count > 0 && (message = popMessage()))
{ {
_onMessageUserCallback( _onMessageUserCallback(message);
message->type,
message->str,
message->wireSize,
message->errorInfo,
message->openInfo,
message->closeInfo
);
--count; --count;
} }
} }

View File

@ -21,4 +21,4 @@ namespace ix
; ;
} }
}; };
} } // namespace ix

View File

@ -178,34 +178,29 @@ namespace ix
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
Logger() << "New connection"; Logger() << "New connection";
Logger() << "Uri: " << openInfo.uri; Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:"; Logger() << "Headers:";
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
Logger() << it.first << ": " << it.second; Logger() << it.first << ": " << it.second;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
Logger() << "Closed connection"; Logger() << "Closed connection";
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client != webSocket)
{ {
client->send(str); client->send(msg->str);
} }
} }
} }

View File

@ -108,52 +108,47 @@ namespace
log(std::string("Connecting to url: ") + url); log(std::string("Connecting to url: ") + url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
log("client connected"); log("client connected");
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::stringstream ss; std::stringstream ss;
ss << "client disconnected(" ss << "client disconnected("
<< closeInfo.code << msg->closeInfo.code
<< "," << ","
<< closeInfo.reason << msg->closeInfo.reason
<< ")"; << ")";
log(ss.str()); log(ss.str());
std::lock_guard<std::mutex> lck(_mutexCloseData); std::lock_guard<std::mutex> lck(_mutexCloseData);
_closeCode = closeInfo.code; _closeCode = msg->closeInfo.code;
_closeReason = std::string(closeInfo.reason); _closeReason = std::string(msg->closeInfo.reason);
_closeRemote = closeInfo.remote; _closeRemote = msg->closeInfo.remote;
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "Error ! " << error.reason; ss << "Error ! " << msg->errorInfo.reason;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Pong) else if (msg->type == ix::WebSocketMessageType::Pong)
{ {
ss << "Received pong message " << str; ss << "Received pong message " << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Ping) else if (msg->type == ix::WebSocketMessageType::Ping)
{ {
ss << "Received ping message " << str; ss << "Received ping message " << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
ss << "Received message " << str; ss << "Received message " << msg->str;
log(ss.str()); log(ss.str());
} }
else else
@ -183,39 +178,34 @@ namespace
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, &server, &receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server, &receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
Logger() << "New server connection"; Logger() << "New server connection";
Logger() << "id: " << connectionState->getId(); Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri; Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:"; Logger() << "Headers:";
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
Logger() << it.first << ": " << it.second; Logger() << it.first << ": " << it.second;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Server closed connection(" ss << "Server closed connection("
<< closeInfo.code << msg->closeInfo.code
<< "," << ","
<< closeInfo.reason << msg->closeInfo.reason
<< ")"; << ")";
log(ss.str()); log(ss.str());
std::lock_guard<std::mutex> lck(mutexWrite); std::lock_guard<std::mutex> lck(mutexWrite);
receivedCloseCode = closeInfo.code; receivedCloseCode = msg->closeInfo.code;
receivedCloseReason = std::string(closeInfo.reason); receivedCloseReason = std::string(msg->closeInfo.reason);
receivedCloseRemote = closeInfo.remote; receivedCloseRemote = msg->closeInfo.remote;
} }
} }
); );

View File

@ -23,30 +23,25 @@ namespace
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[connectionState, &server](ix::WebSocketMessageType messageType, [connectionState, &server](const WebSocketMessagePtr& msg)
const std::string & str,
size_t wireSize,
const ix::WebSocketErrorInfo & error,
const ix::WebSocketOpenInfo & openInfo,
const ix::WebSocketCloseInfo & closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
Logger() << "New connection"; Logger() << "New connection";
connectionState->computeId(); connectionState->computeId();
Logger() << "id: " << connectionState->getId(); Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri; Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:"; Logger() << "Headers:";
for (auto it : openInfo.headers) for (auto&& it : msg->openInfo.headers)
{ {
Logger() << it.first << ": " << it.second; Logger() << it.first << ": " << it.second;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
Logger() << "Closed connection"; Logger() << "Closed connection";
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
Logger() << "Message received: " << str; Logger() << "Message received: " << str;
@ -78,42 +73,37 @@ namespace
{ {
msgQ.bindWebsocket(&ws); msgQ.bindWebsocket(&ws);
msgQ.setOnMessageCallback([this](WebSocketMessageType messageType, msgQ.setOnMessageCallback([this](const WebSocketMessagePtr& msg)
const std::string & str,
size_t wireSize,
const WebSocketErrorInfo & error,
const WebSocketOpenInfo & openInfo,
const WebSocketCloseInfo & closeInfo)
{ {
REQUIRE(mainThreadId == std::this_thread::get_id()); REQUIRE(mainThreadId == std::this_thread::get_id());
std::stringstream ss; std::stringstream ss;
if (messageType == WebSocketMessageType::Open) if (msg->type == WebSocketMessageType::Open)
{ {
log("client connected"); log("client connected");
sendNextMessage(); sendNextMessage();
} }
else if (messageType == WebSocketMessageType::Close) else if (msg->type == WebSocketMessageType::Close)
{ {
log("client disconnected"); log("client disconnected");
} }
else if (messageType == WebSocketMessageType::Error) else if (msg->type == WebSocketMessageType::Error)
{ {
ss << "Error ! " << error.reason; ss << "Error ! " << error.reason;
log(ss.str()); log(ss.str());
testDone = true; testDone = true;
} }
else if (messageType == WebSocketMessageType::Pong) else if (msg->type == WebSocketMessageType::Pong)
{ {
ss << "Received pong message " << str; ss << "Received pong message " << str;
log(ss.str()); log(ss.str());
} }
else if (messageType == WebSocketMessageType::Ping) else if (msg->type == WebSocketMessageType::Ping)
{ {
ss << "Received ping message " << str; ss << "Received ping message " << str;
log(ss.str()); log(ss.str());
} }
else if (messageType == WebSocketMessageType::Message) else if (msg->type == WebSocketMessageType::Message)
{ {
REQUIRE(str.compare("Hey dude!") == 0); REQUIRE(str.compare("Hey dude!") == 0);
++receivedCount; ++receivedCount;
@ -189,5 +179,4 @@ TEST_CASE("Websocket_message_queue", "[websocket_message_q]")
server.stop(); server.stop();
} }
} }

View File

@ -106,7 +106,7 @@ namespace
{ {
log("client disconnected"); log("client disconnected");
if (closeInfo.code == 1011) if (msg->closeInfo.code == 1011)
{ {
_closedDueToPingTimeout = true; _closedDueToPingTimeout = true;
} }

View File

@ -39,42 +39,37 @@ namespace ix
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server, &connectionId](std::shared_ptr<ix::WebSocket> webSocket, [&server, &connectionId](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, [webSocket, connectionState,
&connectionId, &server](ix::WebSocketMessageType messageType, &connectionId, &server](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
Logger() << "New connection"; Logger() << "New connection";
connectionState->computeId(); connectionState->computeId();
Logger() << "id: " << connectionState->getId(); Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri; Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:"; Logger() << "Headers:";
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
Logger() << it.first << ": " << it.second; Logger() << it.first << ": " << it.second;
} }
connectionId = connectionState->getId(); connectionId = connectionState->getId();
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
Logger() << "Closed connection"; Logger() << "Closed connection";
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client != webSocket)
{ {
client->send(str); client->send(msg->str);
} }
} }
} }

View File

@ -52,41 +52,36 @@ namespace
log(std::string("Connecting to url: ") + url); log(std::string("Connecting to url: ") + url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[](ix::WebSocketMessageType messageType, [](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
log("TestConnectionDisconnection: connected !"); log("TestConnectionDisconnection: connected !");
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
log("TestConnectionDisconnection: disconnected !"); log("TestConnectionDisconnection: disconnected !");
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "TestConnectionDisconnection: Error! "; ss << "TestConnectionDisconnection: Error! ";
ss << error.reason; ss << msg->errorInfo.reason;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
log("TestConnectionDisconnection: received message.!"); log("TestConnectionDisconnection: received message.!");
} }
else if (messageType == ix::WebSocketMessageType::Ping) else if (msg->type == ix::WebSocketMessageType::Ping)
{ {
log("TestConnectionDisconnection: received ping message.!"); log("TestConnectionDisconnection: received ping message.!");
} }
else if (messageType == ix::WebSocketMessageType::Pong) else if (msg->type == ix::WebSocketMessageType::Pong)
{ {
log("TestConnectionDisconnection: received pong message.!"); log("TestConnectionDisconnection: received pong message.!");
} }
else if (messageType == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
log("TestConnectionDisconnection: received fragment.!"); log("TestConnectionDisconnection: received fragment.!");
} }

View File

@ -114,31 +114,26 @@ namespace
log(std::string("Connecting to url: ") + url); log(std::string("Connecting to url: ") + url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
ss << "cmd_websocket_chat: user " ss << "cmd_websocket_chat: user "
<< _user << _user
<< " Connected !"; << " Connected !";
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
ss << "cmd_websocket_chat: user " ss << "cmd_websocket_chat: user "
<< _user << _user
<< " disconnected !"; << " disconnected !";
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
auto result = decodeMessage(str); auto result = decodeMessage(msg->str);
// Our "chat" / "broacast" node.js server does not send us // Our "chat" / "broacast" node.js server does not send us
// the messages we send, so we don't need to have a msg_user != user // the messages we send, so we don't need to have a msg_user != user
@ -159,20 +154,20 @@ namespace
<< _user << " > "; << _user << " > ";
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "cmd_websocket_chat: Error ! " << error.reason; ss << "cmd_websocket_chat: Error ! " << msg->errorInfo.reason;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Ping) else if (msg->type == ix::WebSocketMessageType::Ping)
{ {
log("cmd_websocket_chat: received ping message"); log("cmd_websocket_chat: received ping message");
} }
else if (messageType == ix::WebSocketMessageType::Pong) else if (msg->type == ix::WebSocketMessageType::Pong)
{ {
log("cmd_websocket_chat: received pong message"); log("cmd_websocket_chat: received pong message");
} }
else if (messageType == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
log("cmd_websocket_chat: received message fragment"); log("cmd_websocket_chat: received message fragment");
} }
@ -221,35 +216,30 @@ namespace
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
Logger() << "New connection"; Logger() << "New connection";
Logger() << "id: " << connectionState->getId(); Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri; Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:"; Logger() << "Headers:";
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
Logger() << it.first << ": " << it.second; Logger() << it.first << ": " << it.second;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
log("Closed connection"); log("Closed connection");
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client != webSocket)
{ {
client->send(str); client->send(msg->str);
} }
} }
} }

View File

@ -90,46 +90,41 @@ namespace ix
void CobraConnection::initWebSocketOnMessageCallback() void CobraConnection::initWebSocketOnMessageCallback()
{ {
_webSocket->setOnMessageCallback( _webSocket->setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
CobraConnection::invokeTrafficTrackerCallback(wireSize, true); CobraConnection::invokeTrafficTrackerCallback(msg->wireSize, true);
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
invokeEventCallback(ix::CobraConnection_EventType_Open, invokeEventCallback(ix::CobraConnection_EventType_Open,
std::string(), std::string(),
openInfo.headers); msg->openInfo.headers);
sendHandshakeMessage(); sendHandshakeMessage();
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
_authenticated = false; _authenticated = false;
std::stringstream ss; std::stringstream ss;
ss << "Close code " << closeInfo.code; ss << "Close code " << msg->closeInfo.code;
ss << " reason " << closeInfo.reason; ss << " reason " << msg->closeInfo.reason;
invokeEventCallback(ix::CobraConnection_EventType_Closed, invokeEventCallback(ix::CobraConnection_EventType_Closed,
ss.str()); ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
Json::Value data; Json::Value data;
Json::Reader reader; Json::Reader reader;
if (!reader.parse(str, data)) if (!reader.parse(msg->str, data))
{ {
invokeErrorCallback("Invalid json", str); invokeErrorCallback("Invalid json", msg->str);
return; return;
} }
if (!data.isMember("action")) if (!data.isMember("action"))
{ {
invokeErrorCallback("Missing action", str); invokeErrorCallback("Missing action", msg->str);
return; return;
} }
@ -139,12 +134,12 @@ namespace ix
{ {
if (!handleHandshakeResponse(data)) if (!handleHandshakeResponse(data))
{ {
invokeErrorCallback("Error extracting nonce from handshake response", str); invokeErrorCallback("Error extracting nonce from handshake response", msg->str);
} }
} }
else if (action == "auth/handshake/error") else if (action == "auth/handshake/error")
{ {
invokeErrorCallback("Handshake error", str); invokeErrorCallback("Handshake error", msg->str);
} }
else if (action == "auth/authenticate/ok") else if (action == "auth/authenticate/ok")
{ {
@ -154,7 +149,7 @@ namespace ix
} }
else if (action == "auth/authenticate/error") else if (action == "auth/authenticate/error")
{ {
invokeErrorCallback("Authentication error", str); invokeErrorCallback("Authentication error", msg->str);
} }
else if (action == "rtm/subscription/data") else if (action == "rtm/subscription/data")
{ {
@ -164,36 +159,36 @@ namespace ix
{ {
if (!handleSubscriptionResponse(data)) if (!handleSubscriptionResponse(data))
{ {
invokeErrorCallback("Error processing subscribe response", str); invokeErrorCallback("Error processing subscribe response", msg->str);
} }
} }
else if (action == "rtm/subscribe/error") else if (action == "rtm/subscribe/error")
{ {
invokeErrorCallback("Subscription error", str); invokeErrorCallback("Subscription error", msg->str);
} }
else if (action == "rtm/unsubscribe/ok") else if (action == "rtm/unsubscribe/ok")
{ {
if (!handleUnsubscriptionResponse(data)) if (!handleUnsubscriptionResponse(data))
{ {
invokeErrorCallback("Error processing subscribe response", str); invokeErrorCallback("Error processing subscribe response", msg->str);
} }
} }
else if (action == "rtm/unsubscribe/error") else if (action == "rtm/unsubscribe/error")
{ {
invokeErrorCallback("Unsubscription error", str); invokeErrorCallback("Unsubscription error", msg->str);
} }
else else
{ {
invokeErrorCallback("Un-handled message type", str); invokeErrorCallback("Un-handled message type", msg->str);
} }
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
invokeErrorCallback(ss.str(), std::string()); invokeErrorCallback(ss.str(), std::string());
} }
}); });

View File

@ -58,25 +58,20 @@ namespace snake
auto state = std::dynamic_pointer_cast<SnakeConnectionState>(connectionState); auto state = std::dynamic_pointer_cast<SnakeConnectionState>(connectionState);
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[this, webSocket, state](ix::WebSocketMessageType messageType, [this, webSocket, state](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
std::cerr << "id: " << state->getId() << std::endl; std::cerr << "id: " << state->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl; std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; std::cerr << it.first << ": " << it.second << std::endl;
} }
std::string appkey = parseAppKey(openInfo.uri); std::string appkey = parseAppKey(msg->openInfo.uri);
state->setAppkey(appkey); state->setAppkey(appkey);
// Connect to redis first // Connect to redis first
@ -86,29 +81,29 @@ namespace snake
std::cerr << "Cannot connect to redis host" << std::endl; std::cerr << "Cannot connect to redis host" << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "Closed connection" std::cerr << "Closed connection"
<< " code " << closeInfo.code << " code " << msg->closeInfo.code
<< " reason " << closeInfo.reason << std::endl; << " reason " << msg->closeInfo.reason << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str(); std::cerr << ss.str();
} }
else if (messageType == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
std::cerr << "Received message fragment" << std::endl; std::cerr << "Received message fragment" << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " << wireSize << " bytes" << std::endl; std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
processCobraMessage(state, webSocket, _appConfig, str); processCobraMessage(state, webSocket, _appConfig, msg->str);
} }
} }
); );

View File

@ -21,52 +21,47 @@ namespace ix
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server](const WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl; std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl; std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; std::cerr << it.first << ": " << it.second << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "Closed connection" std::cerr << "Closed connection"
<< " code " << closeInfo.code << " code " << msg->closeInfo.code
<< " reason " << closeInfo.reason << std::endl; << " reason " << msg->closeInfo.reason << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str(); std::cerr << ss.str();
} }
else if (messageType == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
std::cerr << "Received message fragment" << std::endl; std::cerr << "Received message fragment" << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " << wireSize << " bytes" << std::endl; std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client != webSocket)
{ {
client->send(str, client->send(msg->str,
[](int current, int total) -> bool [](int current, int total) -> bool
{ {
std::cerr << "Step " << current std::cerr << "Step " << current

View File

@ -84,20 +84,15 @@ namespace ix
log(std::string("Connecting to url: ") + _url); log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
log("ws chat: connected"); log("ws chat: connected");
std::cout << "Uri: " << openInfo.uri << std::endl; std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl; std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; std::cout << it.first << ": " << it.second << std::endl;
} }
@ -107,18 +102,18 @@ namespace ix
<< " Connected !"; << " Connected !";
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
ss << "ws chat: user " ss << "ws chat: user "
<< _user << _user
<< " disconnected !" << " disconnected !"
<< " code " << closeInfo.code << " code " << msg->closeInfo.code
<< " reason " << closeInfo.reason; << " reason " << msg->closeInfo.reason;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
auto result = decodeMessage(str); auto result = decodeMessage(msg->str);
// Our "chat" / "broacast" node.js server does not send us // Our "chat" / "broacast" node.js server does not send us
// the messages we send, so we don't have to filter it out. // the messages we send, so we don't have to filter it out.
@ -127,17 +122,17 @@ namespace ix
_receivedQueue.push(result.second); _receivedQueue.push(result.second);
ss << std::endl ss << std::endl
<< result.first << "(" << wireSize << " bytes)" << " > " << result.second << result.first << "(" << msg->wireSize << " bytes)" << " > " << result.second
<< std::endl << std::endl
<< _user << " > "; << _user << " > ";
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str()); log(ss.str());
} }
else else

View File

@ -73,56 +73,51 @@ namespace ix
log(std::string("Connecting to url: ") + _url); log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
log("ws_connect: connected"); log("ws_connect: connected");
std::cout << "Uri: " << openInfo.uri << std::endl; std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl; std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; std::cout << it.first << ": " << it.second << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
ss << "ws_connect: connection closed:"; ss << "ws_connect: connection closed:";
ss << " code " << closeInfo.code; ss << " code " << msg->closeInfo.code;
ss << " reason " << closeInfo.reason << std::endl; ss << " reason " << msg->closeInfo.reason << std::endl;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " << wireSize << " bytes" << std::endl; std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
ss << "ws_connect: received message: " ss << "ws_connect: received message: "
<< str; << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
std::cerr << "Received message fragment" << std::endl; std::cerr << "Received message fragment" << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Ping) else if (msg->type == ix::WebSocketMessageType::Ping)
{ {
std::cerr << "Received ping" << std::endl; std::cerr << "Received ping" << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Pong) else if (msg->type == ix::WebSocketMessageType::Pong)
{ {
std::cerr << "Received pong" << std::endl; std::cerr << "Received pong" << std::endl;
} }

View File

@ -21,20 +21,15 @@ namespace ix
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, greetings](ix::WebSocketMessageType messageType, [webSocket, connectionState, greetings](const WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl; std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl; std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; std::cerr << it.first << ": " << it.second << std::endl;
} }
@ -44,27 +39,27 @@ namespace ix
webSocket->sendText("Welcome !"); webSocket->sendText("Welcome !");
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "Closed connection" std::cerr << "Closed connection"
<< " code " << closeInfo.code << " code " << msg->closeInfo.code
<< " reason " << closeInfo.reason << std::endl; << " reason " << msg->closeInfo.reason << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str(); std::cerr << ss.str();
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " std::cerr << "Received "
<< wireSize << " bytes" << msg->wireSize << " bytes"
<< std::endl; << std::endl;
webSocket->send(str); webSocket->send(msg->str);
} }
} }
); );

View File

@ -54,59 +54,54 @@ namespace ix
log(std::string("Connecting to url: ") + _url); log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::cerr << "Received " << wireSize << " bytes" << std::endl; std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
log("ping_pong: connected"); log("ping_pong: connected");
std::cout << "Uri: " << openInfo.uri << std::endl; std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl; std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; std::cout << it.first << ": " << it.second << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
ss << "ping_pong: disconnected:" ss << "ping_pong: disconnected:"
<< " code " << closeInfo.code << " code " << msg->closeInfo.code
<< " reason " << closeInfo.reason << " reason " << msg->closeInfo.reason
<< str; << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
ss << "ping_pong: received message: " ss << "ping_pong: received message: "
<< str; << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Ping) else if (msg->type == ix::WebSocketMessageType::Ping)
{ {
ss << "ping_pong: received ping message: " ss << "ping_pong: received ping message: "
<< str; << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Pong) else if (msg->type == ix::WebSocketMessageType::Pong)
{ {
ss << "ping_pong: received pong message: " ss << "ping_pong: received pong message: "
<< str; << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str()); log(ss.str());
} }
else else

View File

@ -183,41 +183,36 @@ namespace ix
log(std::string("Connecting to url: ") + _url); log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
_condition.notify_one(); _condition.notify_one();
log("ws_receive: connected"); log("ws_receive: connected");
std::cout << "Uri: " << openInfo.uri << std::endl; std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl; std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; std::cout << it.first << ": " << it.second << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
ss << "ws_receive: connection closed:"; ss << "ws_receive: connection closed:";
ss << " code " << closeInfo.code; ss << " code " << msg->closeInfo.code;
ss << " reason " << closeInfo.reason << std::endl; ss << " reason " << msg->closeInfo.reason << std::endl;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
ss << "ws_receive: transfered " << wireSize << " bytes"; ss << "ws_receive: transfered " << msg->wireSize << " bytes";
log(ss.str()); log(ss.str());
handleMessage(str); handleMessage(msg->str);
_condition.notify_one(); _condition.notify_one();
} }
else if (messageType == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
ss << "ws_receive: received fragment " << _receivedFragmentCounter++; ss << "ws_receive: received fragment " << _receivedFragmentCounter++;
log(ss.str()); log(ss.str());
@ -229,13 +224,13 @@ namespace ix
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
} }
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "ws_receive "; ss << "ws_receive ";
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str()); log(ss.str());
} }
else else

View File

@ -112,42 +112,37 @@ namespace ix
log(std::string("Connecting to url: ") + _url); log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
_condition.notify_one(); _condition.notify_one();
log("ws_send: connected"); log("ws_send: connected");
std::cout << "Uri: " << openInfo.uri << std::endl; std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl; std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; std::cout << it.first << ": " << it.second << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
ss << "ws_send: connection closed:"; ss << "ws_send: connection closed:";
ss << " code " << closeInfo.code; ss << " code " << msg->closeInfo.code;
ss << " reason " << closeInfo.reason << std::endl; ss << " reason " << msg->closeInfo.reason << std::endl;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
_condition.notify_one(); _condition.notify_one();
ss << "ws_send: received message (" << wireSize << " bytes)"; ss << "ws_send: received message (" << msg->wireSize << " bytes)";
log(ss.str()); log(ss.str());
std::string errMsg; std::string errMsg;
MsgPack data = MsgPack::parse(str, errMsg); MsgPack data = MsgPack::parse(msg->str, errMsg);
if (!errMsg.empty()) if (!errMsg.empty())
{ {
std::cerr << "Invalid MsgPack response" << std::endl; std::cerr << "Invalid MsgPack response" << std::endl;
@ -160,13 +155,13 @@ namespace ix
std::cerr << "Invalid id" << std::endl; std::cerr << "Invalid id" << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "ws_send "; ss << "ws_send ";
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str()); log(ss.str());
} }
else else

View File

@ -21,52 +21,47 @@ namespace ix
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server](const WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl; std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl; std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; std::cerr << it.first << ": " << it.second << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "Closed connection" std::cerr << "Closed connection"
<< " code " << closeInfo.code << " code " << msg->closeInfo.code
<< " reason " << closeInfo.reason << std::endl; << " reason " << msg->closeInfo.reason << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str(); std::cerr << ss.str();
} }
else if (messageType == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
std::cerr << "Received message fragment " std::cerr << "Received message fragment "
<< std::endl; << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " << wireSize << " bytes" << std::endl; std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client != webSocket)
{ {
client->send(str, client->send(msg->str,
[](int current, int total) -> bool [](int current, int total) -> bool
{ {
std::cerr << "ws_transfer: Step " << current std::cerr << "ws_transfer: Step " << current