From de0bf5ebcda60457734b18ea91482de18cbafa9f Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Sun, 9 Jun 2019 11:33:17 -0700 Subject: [PATCH] WebSocket callback only take one object, a const ix::WebSocketMessagePtr& msg --- CHANGELOG.md | 1 + ixwebsocket/IXWebSocket.cpp | 44 ++++++++----- ixwebsocket/IXWebSocket.h | 12 ++-- ixwebsocket/IXWebSocketCloseInfo.h | 2 +- ixwebsocket/IXWebSocketMessage.h | 24 +++++-- ixwebsocket/IXWebSocketMessageQueue.cpp | 42 ++----------- ixwebsocket/IXWebSocketOpenInfo.h | 2 +- test/IXTest.cpp | 19 +++--- test/IXWebSocketCloseTest.cpp | 62 ++++++++----------- test/IXWebSocketMessageQTest.cpp | 35 ++++------- test/IXWebSocketPingTimeoutTest.cpp | 2 +- test/IXWebSocketServerTest.cpp | 21 +++---- ...IXWebSocketTestConnectionDisconnection.cpp | 23 +++---- test/cmd_websocket_chat.cpp | 44 +++++-------- ws/ixcobra/IXCobraConnection.cpp | 53 +++++++--------- ws/snake/IXSnakeServer.cpp | 39 +++++------- ws/ws_broadcast_server.cpp | 37 +++++------ ws/ws_chat.cpp | 35 +++++------ ws/ws_connect.cpp | 41 ++++++------ ws/ws_echo_server.cpp | 35 +++++------ ws/ws_ping_pong.cpp | 45 ++++++-------- ws/ws_receive.cpp | 37 +++++------ ws/ws_send.cpp | 35 +++++------ ws/ws_transfer.cpp | 37 +++++------ 24 files changed, 311 insertions(+), 416 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c3f2d7e..184fb960 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ All notable changes to this project will be documented in this file. ## [Unreleased] - 2019-06-xx ### Changed +- WebSocket callback only take one object, a const ix::WebSocketMessagePtr& msg - Add explicite WebSocket::sendBinary - New headers + WebSocketMessage class to hold message data, still not used across the board - Add test/compatibility folder with small servers and clients written in different languages and different libraries to test compatibility. diff --git a/ixwebsocket/IXWebSocket.cpp b/ixwebsocket/IXWebSocket.cpp index 9e7e4ed6..6d95d49b 100644 --- a/ixwebsocket/IXWebSocket.cpp +++ b/ixwebsocket/IXWebSocket.cpp @@ -51,9 +51,11 @@ namespace ix _ws.setOnCloseCallback( [this](uint16_t code, const std::string& reason, size_t wireSize, bool remote) { - _onMessageCallback(WebSocketMessageType::Close, "", wireSize, - WebSocketErrorInfo(), WebSocketOpenInfo(), - WebSocketCloseInfo(code, reason, remote)); + _onMessageCallback( + std::make_shared( + WebSocketMessageType::Close, "", wireSize, + WebSocketErrorInfo(), WebSocketOpenInfo(), + WebSocketCloseInfo(code, reason, remote))); } ); } @@ -180,10 +182,12 @@ namespace ix return status; } - _onMessageCallback(WebSocketMessageType::Open, "", 0, - WebSocketErrorInfo(), - WebSocketOpenInfo(status.uri, status.headers), - WebSocketCloseInfo()); + _onMessageCallback( + std::make_shared( + WebSocketMessageType::Open, "", 0, + WebSocketErrorInfo(), + WebSocketOpenInfo(status.uri, status.headers), + WebSocketCloseInfo())); return status; } @@ -203,10 +207,12 @@ namespace ix return status; } - _onMessageCallback(WebSocketMessageType::Open, "", 0, - WebSocketErrorInfo(), - WebSocketOpenInfo(status.uri, status.headers), - WebSocketCloseInfo()); + _onMessageCallback( + std::make_shared( + WebSocketMessageType::Open, "", 0, + WebSocketErrorInfo(), + WebSocketOpenInfo(status.uri, status.headers), + WebSocketCloseInfo())); return status; } @@ -274,9 +280,11 @@ namespace ix connectErr.reason = status.errorStr; connectErr.http_status = status.http_status; - _onMessageCallback(WebSocketMessageType::Error, "", 0, - connectErr, WebSocketOpenInfo(), - WebSocketCloseInfo()); + _onMessageCallback( + std::make_shared( + WebSocketMessageType::Error, "", 0, + connectErr, WebSocketOpenInfo(), + WebSocketCloseInfo())); } } } @@ -342,9 +350,11 @@ namespace ix WebSocketErrorInfo webSocketErrorInfo; webSocketErrorInfo.decompressionError = decompressionError; - _onMessageCallback(webSocketMessageType, msg, wireSize, - webSocketErrorInfo, WebSocketOpenInfo(), - WebSocketCloseInfo()); + _onMessageCallback( + std::make_shared( + webSocketMessageType, msg, wireSize, + webSocketErrorInfo, WebSocketOpenInfo(), + WebSocketCloseInfo())); WebSocket::invokeTrafficTrackerCallback(msg.size(), true); }); diff --git a/ixwebsocket/IXWebSocket.h b/ixwebsocket/IXWebSocket.h index 7fbb5d52..516ce673 100644 --- a/ixwebsocket/IXWebSocket.h +++ b/ixwebsocket/IXWebSocket.h @@ -13,10 +13,10 @@ #include "IXWebSocketCloseConstants.h" #include "IXWebSocketErrorInfo.h" #include "IXWebSocketHttpHeaders.h" +#include "IXWebSocketMessage.h" #include "IXWebSocketPerMessageDeflateOptions.h" #include "IXWebSocketSendInfo.h" #include "IXWebSocketTransport.h" -#include "IXWebSocketMessage.h" #include #include #include @@ -33,12 +33,7 @@ namespace ix Closed = 3 }; - using OnMessageCallback = std::function; + using OnMessageCallback = std::function; using OnTrafficTrackerCallback = std::function; @@ -78,7 +73,8 @@ namespace ix const OnProgressCallback& onProgressCallback = nullptr); WebSocketSendInfo ping(const std::string& text); - void close(uint16_t code = 1000, const std::string& reason = "Normal closure"); + void close(uint16_t code = WebSocketCloseConstants::kNormalClosureCode, + const std::string& reason = WebSocketCloseConstants::kNormalClosureMessage); void setOnMessageCallback(const OnMessageCallback& callback); static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback); diff --git a/ixwebsocket/IXWebSocketCloseInfo.h b/ixwebsocket/IXWebSocketCloseInfo.h index 16ae77dc..27000412 100644 --- a/ixwebsocket/IXWebSocketCloseInfo.h +++ b/ixwebsocket/IXWebSocketCloseInfo.h @@ -22,4 +22,4 @@ namespace ix ; } }; -} +} // namespace ix diff --git a/ixwebsocket/IXWebSocketMessage.h b/ixwebsocket/IXWebSocketMessage.h index 3a6ef2e1..bca91eaa 100644 --- a/ixwebsocket/IXWebSocketMessage.h +++ b/ixwebsocket/IXWebSocketMessage.h @@ -6,12 +6,12 @@ #pragma once -#include "IXWebSocketMessageType.h" -#include "IXWebSocketErrorInfo.h" -#include "IXWebSocketOpenInfo.h" #include "IXWebSocketCloseInfo.h" -#include +#include "IXWebSocketErrorInfo.h" +#include "IXWebSocketMessageType.h" +#include "IXWebSocketOpenInfo.h" #include +#include #include namespace ix @@ -25,6 +25,22 @@ namespace ix WebSocketOpenInfo openInfo; WebSocketCloseInfo closeInfo; bool binary; + + WebSocketMessage(WebSocketMessageType t, + const std::string& s, + size_t w, + WebSocketErrorInfo e, + WebSocketOpenInfo o, + WebSocketCloseInfo c) + : type(t) + , str(std::move(s)) + , wireSize(w) + , errorInfo(e) + , openInfo(o) + , closeInfo(c) + { + ; + } }; using WebSocketMessagePtr = std::shared_ptr; diff --git a/ixwebsocket/IXWebSocketMessageQueue.cpp b/ixwebsocket/IXWebSocketMessageQueue.cpp index 62db9098..c5db6a31 100644 --- a/ixwebsocket/IXWebSocketMessageQueue.cpp +++ b/ixwebsocket/IXWebSocketMessageQueue.cpp @@ -32,14 +32,7 @@ namespace ix if (_websocket) { // set dummy callback just to avoid crash - _websocket->setOnMessageCallback([]( - WebSocketMessageType, - const std::string&, - size_t, - const WebSocketErrorInfo&, - const WebSocketOpenInfo&, - const WebSocketCloseInfo&) - {}); + _websocket->setOnMessageCallback([](const WebSocketMessagePtr&) {}); } _websocket = websocket; @@ -47,27 +40,10 @@ namespace ix // bind new if (_websocket) { - _websocket->setOnMessageCallback([this]( - WebSocketMessageType type, - const std::string& str, - size_t wireSize, - const WebSocketErrorInfo& errorInfo, - const WebSocketOpenInfo& openInfo, - const WebSocketCloseInfo& closeInfo) + _websocket->setOnMessageCallback([this](const WebSocketMessagePtr& msg) { - auto message = std::make_shared(); - - message->type = type; - message->str = str; - message->wireSize = wireSize; - message->errorInfo = errorInfo; - message->openInfo = openInfo; - message->closeInfo = closeInfo; - - { - std::lock_guard lock(_messagesMutex); - _messages.emplace_back(std::move(message)); - } + std::lock_guard lock(_messagesMutex); + _messages.emplace_back(std::move(msg)); }); } } @@ -105,15 +81,7 @@ namespace ix while (count > 0 && (message = popMessage())) { - _onMessageUserCallback( - message->type, - message->str, - message->wireSize, - message->errorInfo, - message->openInfo, - message->closeInfo - ); - + _onMessageUserCallback(message); --count; } } diff --git a/ixwebsocket/IXWebSocketOpenInfo.h b/ixwebsocket/IXWebSocketOpenInfo.h index d05897cd..075698c2 100644 --- a/ixwebsocket/IXWebSocketOpenInfo.h +++ b/ixwebsocket/IXWebSocketOpenInfo.h @@ -21,4 +21,4 @@ namespace ix ; } }; -} +} // namespace ix diff --git a/test/IXTest.cpp b/test/IXTest.cpp index d9e2affb..f578c706 100644 --- a/test/IXTest.cpp +++ b/test/IXTest.cpp @@ -178,34 +178,29 @@ namespace ix std::shared_ptr connectionState) { webSocket->setOnMessageCallback( - [webSocket, connectionState, &server](ix::WebSocketMessageType messageType, - const std::string& str, - size_t wireSize, - const ix::WebSocketErrorInfo& error, - const ix::WebSocketOpenInfo& openInfo, - const ix::WebSocketCloseInfo& closeInfo) + [webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg) { - if (messageType == ix::WebSocketMessageType::Open) + if (msg->type == ix::WebSocketMessageType::Open) { Logger() << "New connection"; - Logger() << "Uri: " << openInfo.uri; + Logger() << "Uri: " << msg->openInfo.uri; Logger() << "Headers:"; - for (auto it : openInfo.headers) + for (auto it : msg->openInfo.headers) { Logger() << it.first << ": " << it.second; } } - else if (messageType == ix::WebSocketMessageType::Close) + else if (msg->type == ix::WebSocketMessageType::Close) { Logger() << "Closed connection"; } - else if (messageType == ix::WebSocketMessageType::Message) + else if (msg->type == ix::WebSocketMessageType::Message) { for (auto&& client : server.getClients()) { if (client != webSocket) { - client->send(str); + client->send(msg->str); } } } diff --git a/test/IXWebSocketCloseTest.cpp b/test/IXWebSocketCloseTest.cpp index f915ce4d..066d59c3 100644 --- a/test/IXWebSocketCloseTest.cpp +++ b/test/IXWebSocketCloseTest.cpp @@ -108,52 +108,47 @@ namespace log(std::string("Connecting to url: ") + url); _webSocket.setOnMessageCallback( - [this](ix::WebSocketMessageType messageType, - const std::string& str, - size_t wireSize, - const ix::WebSocketErrorInfo& error, - const ix::WebSocketOpenInfo& openInfo, - const ix::WebSocketCloseInfo& closeInfo) + [this](const ix::WebSocketMessagePtr& msg) { std::stringstream ss; - if (messageType == ix::WebSocketMessageType::Open) + if (msg->type == ix::WebSocketMessageType::Open) { log("client connected"); } - else if (messageType == ix::WebSocketMessageType::Close) + else if (msg->type == ix::WebSocketMessageType::Close) { std::stringstream ss; ss << "client disconnected(" - << closeInfo.code + << msg->closeInfo.code << "," - << closeInfo.reason + << msg->closeInfo.reason << ")"; log(ss.str()); std::lock_guard lck(_mutexCloseData); - _closeCode = closeInfo.code; - _closeReason = std::string(closeInfo.reason); - _closeRemote = closeInfo.remote; + _closeCode = msg->closeInfo.code; + _closeReason = std::string(msg->closeInfo.reason); + _closeRemote = msg->closeInfo.remote; } - else if (messageType == ix::WebSocketMessageType::Error) + else if (msg->type == ix::WebSocketMessageType::Error) { - ss << "Error ! " << error.reason; + ss << "Error ! " << msg->errorInfo.reason; log(ss.str()); } - else if (messageType == ix::WebSocketMessageType::Pong) + else if (msg->type == ix::WebSocketMessageType::Pong) { - ss << "Received pong message " << str; + ss << "Received pong message " << msg->str; log(ss.str()); } - else if (messageType == ix::WebSocketMessageType::Ping) + else if (msg->type == ix::WebSocketMessageType::Ping) { - ss << "Received ping message " << str; + ss << "Received ping message " << msg->str; log(ss.str()); } - else if (messageType == ix::WebSocketMessageType::Message) + else if (msg->type == ix::WebSocketMessageType::Message) { - ss << "Received message " << str; + ss << "Received message " << msg->str; log(ss.str()); } else @@ -183,39 +178,34 @@ namespace std::shared_ptr connectionState) { webSocket->setOnMessageCallback( - [webSocket, connectionState, &server, &receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite](ix::WebSocketMessageType messageType, - const std::string& str, - size_t wireSize, - const ix::WebSocketErrorInfo& error, - const ix::WebSocketOpenInfo& openInfo, - const ix::WebSocketCloseInfo& closeInfo) + [webSocket, connectionState, &server, &receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite](const ix::WebSocketMessagePtr& msg) { - if (messageType == ix::WebSocketMessageType::Open) + if (msg->type == ix::WebSocketMessageType::Open) { Logger() << "New server connection"; Logger() << "id: " << connectionState->getId(); - Logger() << "Uri: " << openInfo.uri; + Logger() << "Uri: " << msg->openInfo.uri; Logger() << "Headers:"; - for (auto it : openInfo.headers) + for (auto it : msg->openInfo.headers) { Logger() << it.first << ": " << it.second; } } - else if (messageType == ix::WebSocketMessageType::Close) + else if (msg->type == ix::WebSocketMessageType::Close) { std::stringstream ss; ss << "Server closed connection(" - << closeInfo.code + << msg->closeInfo.code << "," - << closeInfo.reason + << msg->closeInfo.reason << ")"; log(ss.str()); std::lock_guard lck(mutexWrite); - receivedCloseCode = closeInfo.code; - receivedCloseReason = std::string(closeInfo.reason); - receivedCloseRemote = closeInfo.remote; + receivedCloseCode = msg->closeInfo.code; + receivedCloseReason = std::string(msg->closeInfo.reason); + receivedCloseRemote = msg->closeInfo.remote; } } ); diff --git a/test/IXWebSocketMessageQTest.cpp b/test/IXWebSocketMessageQTest.cpp index 9d12b7b5..7465da0a 100644 --- a/test/IXWebSocketMessageQTest.cpp +++ b/test/IXWebSocketMessageQTest.cpp @@ -23,30 +23,25 @@ namespace std::shared_ptr connectionState) { webSocket->setOnMessageCallback( - [connectionState, &server](ix::WebSocketMessageType messageType, - const std::string & str, - size_t wireSize, - const ix::WebSocketErrorInfo & error, - const ix::WebSocketOpenInfo & openInfo, - const ix::WebSocketCloseInfo & closeInfo) + [connectionState, &server](const WebSocketMessagePtr& msg) { - if (messageType == ix::WebSocketMessageType::Open) + if (msg->type == ix::WebSocketMessageType::Open) { Logger() << "New connection"; connectionState->computeId(); Logger() << "id: " << connectionState->getId(); Logger() << "Uri: " << openInfo.uri; Logger() << "Headers:"; - for (auto it : openInfo.headers) + for (auto&& it : msg->openInfo.headers) { Logger() << it.first << ": " << it.second; } } - else if (messageType == ix::WebSocketMessageType::Close) + else if (msg->type == ix::WebSocketMessageType::Close) { Logger() << "Closed connection"; } - else if (messageType == ix::WebSocketMessageType::Message) + else if (msg->type == ix::WebSocketMessageType::Message) { Logger() << "Message received: " << str; @@ -78,42 +73,37 @@ namespace { msgQ.bindWebsocket(&ws); - msgQ.setOnMessageCallback([this](WebSocketMessageType messageType, - const std::string & str, - size_t wireSize, - const WebSocketErrorInfo & error, - const WebSocketOpenInfo & openInfo, - const WebSocketCloseInfo & closeInfo) + msgQ.setOnMessageCallback([this](const WebSocketMessagePtr& msg) { REQUIRE(mainThreadId == std::this_thread::get_id()); std::stringstream ss; - if (messageType == WebSocketMessageType::Open) + if (msg->type == WebSocketMessageType::Open) { log("client connected"); sendNextMessage(); } - else if (messageType == WebSocketMessageType::Close) + else if (msg->type == WebSocketMessageType::Close) { log("client disconnected"); } - else if (messageType == WebSocketMessageType::Error) + else if (msg->type == WebSocketMessageType::Error) { ss << "Error ! " << error.reason; log(ss.str()); testDone = true; } - else if (messageType == WebSocketMessageType::Pong) + else if (msg->type == WebSocketMessageType::Pong) { ss << "Received pong message " << str; log(ss.str()); } - else if (messageType == WebSocketMessageType::Ping) + else if (msg->type == WebSocketMessageType::Ping) { ss << "Received ping message " << str; log(ss.str()); } - else if (messageType == WebSocketMessageType::Message) + else if (msg->type == WebSocketMessageType::Message) { REQUIRE(str.compare("Hey dude!") == 0); ++receivedCount; @@ -189,5 +179,4 @@ TEST_CASE("Websocket_message_queue", "[websocket_message_q]") server.stop(); } - } diff --git a/test/IXWebSocketPingTimeoutTest.cpp b/test/IXWebSocketPingTimeoutTest.cpp index 2da16c91..73b36ef2 100644 --- a/test/IXWebSocketPingTimeoutTest.cpp +++ b/test/IXWebSocketPingTimeoutTest.cpp @@ -106,7 +106,7 @@ namespace { log("client disconnected"); - if (closeInfo.code == 1011) + if (msg->closeInfo.code == 1011) { _closedDueToPingTimeout = true; } diff --git a/test/IXWebSocketServerTest.cpp b/test/IXWebSocketServerTest.cpp index 9f1acaed..429194c2 100644 --- a/test/IXWebSocketServerTest.cpp +++ b/test/IXWebSocketServerTest.cpp @@ -39,42 +39,37 @@ namespace ix server.setOnConnectionCallback( [&server, &connectionId](std::shared_ptr webSocket, - std::shared_ptr connectionState) + std::shared_ptr connectionState) { webSocket->setOnMessageCallback( [webSocket, connectionState, - &connectionId, &server](ix::WebSocketMessageType messageType, - const std::string& str, - size_t wireSize, - const ix::WebSocketErrorInfo& error, - const ix::WebSocketOpenInfo& openInfo, - const ix::WebSocketCloseInfo& closeInfo) + &connectionId, &server](const ix::WebSocketMessagePtr& msg) { - if (messageType == ix::WebSocketMessageType::Open) + if (msg->type == ix::WebSocketMessageType::Open) { Logger() << "New connection"; connectionState->computeId(); Logger() << "id: " << connectionState->getId(); - Logger() << "Uri: " << openInfo.uri; + Logger() << "Uri: " << msg->openInfo.uri; Logger() << "Headers:"; - for (auto it : openInfo.headers) + for (auto it : msg->openInfo.headers) { Logger() << it.first << ": " << it.second; } connectionId = connectionState->getId(); } - else if (messageType == ix::WebSocketMessageType::Close) + else if (msg->type == ix::WebSocketMessageType::Close) { Logger() << "Closed connection"; } - else if (messageType == ix::WebSocketMessageType::Message) + else if (msg->type == ix::WebSocketMessageType::Message) { for (auto&& client : server.getClients()) { if (client != webSocket) { - client->send(str); + client->send(msg->str); } } } diff --git a/test/IXWebSocketTestConnectionDisconnection.cpp b/test/IXWebSocketTestConnectionDisconnection.cpp index ac124ce6..723a3a83 100644 --- a/test/IXWebSocketTestConnectionDisconnection.cpp +++ b/test/IXWebSocketTestConnectionDisconnection.cpp @@ -52,41 +52,36 @@ namespace log(std::string("Connecting to url: ") + url); _webSocket.setOnMessageCallback( - [](ix::WebSocketMessageType messageType, - const std::string& str, - size_t wireSize, - const ix::WebSocketErrorInfo& error, - const ix::WebSocketOpenInfo& openInfo, - const ix::WebSocketCloseInfo& closeInfo) + [](const ix::WebSocketMessagePtr& msg) { std::stringstream ss; - if (messageType == ix::WebSocketMessageType::Open) + if (msg->type == ix::WebSocketMessageType::Open) { log("TestConnectionDisconnection: connected !"); } - else if (messageType == ix::WebSocketMessageType::Close) + else if (msg->type == ix::WebSocketMessageType::Close) { log("TestConnectionDisconnection: disconnected !"); } - else if (messageType == ix::WebSocketMessageType::Error) + else if (msg->type == ix::WebSocketMessageType::Error) { ss << "TestConnectionDisconnection: Error! "; - ss << error.reason; + ss << msg->errorInfo.reason; log(ss.str()); } - else if (messageType == ix::WebSocketMessageType::Message) + else if (msg->type == ix::WebSocketMessageType::Message) { log("TestConnectionDisconnection: received message.!"); } - else if (messageType == ix::WebSocketMessageType::Ping) + else if (msg->type == ix::WebSocketMessageType::Ping) { log("TestConnectionDisconnection: received ping message.!"); } - else if (messageType == ix::WebSocketMessageType::Pong) + else if (msg->type == ix::WebSocketMessageType::Pong) { log("TestConnectionDisconnection: received pong message.!"); } - else if (messageType == ix::WebSocketMessageType::Fragment) + else if (msg->type == ix::WebSocketMessageType::Fragment) { log("TestConnectionDisconnection: received fragment.!"); } diff --git a/test/cmd_websocket_chat.cpp b/test/cmd_websocket_chat.cpp index 313d54b7..2ca20bb0 100644 --- a/test/cmd_websocket_chat.cpp +++ b/test/cmd_websocket_chat.cpp @@ -114,31 +114,26 @@ namespace log(std::string("Connecting to url: ") + url); _webSocket.setOnMessageCallback( - [this](ix::WebSocketMessageType messageType, - const std::string& str, - size_t wireSize, - const ix::WebSocketErrorInfo& error, - const ix::WebSocketOpenInfo& openInfo, - const ix::WebSocketCloseInfo& closeInfo) + [this](const ix::WebSocketMessagePtr& msg) { std::stringstream ss; - if (messageType == ix::WebSocketMessageType::Open) + if (msg->type == ix::WebSocketMessageType::Open) { ss << "cmd_websocket_chat: user " << _user << " Connected !"; log(ss.str()); } - else if (messageType == ix::WebSocketMessageType::Close) + else if (msg->type == ix::WebSocketMessageType::Close) { ss << "cmd_websocket_chat: user " << _user << " disconnected !"; log(ss.str()); } - else if (messageType == ix::WebSocketMessageType::Message) + else if (msg->type == ix::WebSocketMessageType::Message) { - auto result = decodeMessage(str); + auto result = decodeMessage(msg->str); // Our "chat" / "broacast" node.js server does not send us // the messages we send, so we don't need to have a msg_user != user @@ -159,20 +154,20 @@ namespace << _user << " > "; log(ss.str()); } - else if (messageType == ix::WebSocketMessageType::Error) + else if (msg->type == ix::WebSocketMessageType::Error) { - ss << "cmd_websocket_chat: Error ! " << error.reason; + ss << "cmd_websocket_chat: Error ! " << msg->errorInfo.reason; log(ss.str()); } - else if (messageType == ix::WebSocketMessageType::Ping) + else if (msg->type == ix::WebSocketMessageType::Ping) { log("cmd_websocket_chat: received ping message"); } - else if (messageType == ix::WebSocketMessageType::Pong) + else if (msg->type == ix::WebSocketMessageType::Pong) { log("cmd_websocket_chat: received pong message"); } - else if (messageType == ix::WebSocketMessageType::Fragment) + else if (msg->type == ix::WebSocketMessageType::Fragment) { log("cmd_websocket_chat: received message fragment"); } @@ -221,35 +216,30 @@ namespace std::shared_ptr connectionState) { webSocket->setOnMessageCallback( - [webSocket, connectionState, &server](ix::WebSocketMessageType messageType, - const std::string& str, - size_t wireSize, - const ix::WebSocketErrorInfo& error, - const ix::WebSocketOpenInfo& openInfo, - const ix::WebSocketCloseInfo& closeInfo) + [webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg) { - if (messageType == ix::WebSocketMessageType::Open) + if (msg->type == ix::WebSocketMessageType::Open) { Logger() << "New connection"; Logger() << "id: " << connectionState->getId(); - Logger() << "Uri: " << openInfo.uri; + Logger() << "Uri: " << msg->openInfo.uri; Logger() << "Headers:"; - for (auto it : openInfo.headers) + for (auto it : msg->openInfo.headers) { Logger() << it.first << ": " << it.second; } } - else if (messageType == ix::WebSocketMessageType::Close) + else if (msg->type == ix::WebSocketMessageType::Close) { log("Closed connection"); } - else if (messageType == ix::WebSocketMessageType::Message) + else if (msg->type == ix::WebSocketMessageType::Message) { for (auto&& client : server.getClients()) { if (client != webSocket) { - client->send(str); + client->send(msg->str); } } } diff --git a/ws/ixcobra/IXCobraConnection.cpp b/ws/ixcobra/IXCobraConnection.cpp index f2ac81d4..4cb77bd5 100644 --- a/ws/ixcobra/IXCobraConnection.cpp +++ b/ws/ixcobra/IXCobraConnection.cpp @@ -90,46 +90,41 @@ namespace ix void CobraConnection::initWebSocketOnMessageCallback() { _webSocket->setOnMessageCallback( - [this](ix::WebSocketMessageType messageType, - const std::string& str, - size_t wireSize, - const ix::WebSocketErrorInfo& error, - const ix::WebSocketOpenInfo& openInfo, - const ix::WebSocketCloseInfo& closeInfo) + [this](const ix::WebSocketMessagePtr& msg) { - CobraConnection::invokeTrafficTrackerCallback(wireSize, true); + CobraConnection::invokeTrafficTrackerCallback(msg->wireSize, true); std::stringstream ss; - if (messageType == ix::WebSocketMessageType::Open) + if (msg->type == ix::WebSocketMessageType::Open) { invokeEventCallback(ix::CobraConnection_EventType_Open, std::string(), - openInfo.headers); + msg->openInfo.headers); sendHandshakeMessage(); } - else if (messageType == ix::WebSocketMessageType::Close) + else if (msg->type == ix::WebSocketMessageType::Close) { _authenticated = false; std::stringstream ss; - ss << "Close code " << closeInfo.code; - ss << " reason " << closeInfo.reason; + ss << "Close code " << msg->closeInfo.code; + ss << " reason " << msg->closeInfo.reason; invokeEventCallback(ix::CobraConnection_EventType_Closed, ss.str()); } - else if (messageType == ix::WebSocketMessageType::Message) + else if (msg->type == ix::WebSocketMessageType::Message) { Json::Value data; Json::Reader reader; - if (!reader.parse(str, data)) + if (!reader.parse(msg->str, data)) { - invokeErrorCallback("Invalid json", str); + invokeErrorCallback("Invalid json", msg->str); return; } if (!data.isMember("action")) { - invokeErrorCallback("Missing action", str); + invokeErrorCallback("Missing action", msg->str); return; } @@ -139,12 +134,12 @@ namespace ix { if (!handleHandshakeResponse(data)) { - invokeErrorCallback("Error extracting nonce from handshake response", str); + invokeErrorCallback("Error extracting nonce from handshake response", msg->str); } } else if (action == "auth/handshake/error") { - invokeErrorCallback("Handshake error", str); + invokeErrorCallback("Handshake error", msg->str); } else if (action == "auth/authenticate/ok") { @@ -154,7 +149,7 @@ namespace ix } else if (action == "auth/authenticate/error") { - invokeErrorCallback("Authentication error", str); + invokeErrorCallback("Authentication error", msg->str); } else if (action == "rtm/subscription/data") { @@ -164,36 +159,36 @@ namespace ix { if (!handleSubscriptionResponse(data)) { - invokeErrorCallback("Error processing subscribe response", str); + invokeErrorCallback("Error processing subscribe response", msg->str); } } else if (action == "rtm/subscribe/error") { - invokeErrorCallback("Subscription error", str); + invokeErrorCallback("Subscription error", msg->str); } else if (action == "rtm/unsubscribe/ok") { if (!handleUnsubscriptionResponse(data)) { - invokeErrorCallback("Error processing subscribe response", str); + invokeErrorCallback("Error processing subscribe response", msg->str); } } else if (action == "rtm/unsubscribe/error") { - invokeErrorCallback("Unsubscription error", str); + invokeErrorCallback("Unsubscription error", msg->str); } else { - invokeErrorCallback("Un-handled message type", str); + invokeErrorCallback("Un-handled message type", msg->str); } } - else if (messageType == ix::WebSocketMessageType::Error) + else if (msg->type == ix::WebSocketMessageType::Error) { std::stringstream ss; - ss << "Connection error: " << error.reason << std::endl; - ss << "#retries: " << error.retries << std::endl; - ss << "Wait time(ms): " << error.wait_time << std::endl; - ss << "HTTP Status: " << error.http_status << std::endl; + ss << "Connection error: " << msg->errorInfo.reason << std::endl; + ss << "#retries: " << msg->errorInfo.retries << std::endl; + ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; + ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; invokeErrorCallback(ss.str(), std::string()); } }); diff --git a/ws/snake/IXSnakeServer.cpp b/ws/snake/IXSnakeServer.cpp index a0783f30..9cf36009 100644 --- a/ws/snake/IXSnakeServer.cpp +++ b/ws/snake/IXSnakeServer.cpp @@ -58,25 +58,20 @@ namespace snake auto state = std::dynamic_pointer_cast(connectionState); webSocket->setOnMessageCallback( - [this, webSocket, state](ix::WebSocketMessageType messageType, - const std::string& str, - size_t wireSize, - const ix::WebSocketErrorInfo& error, - const ix::WebSocketOpenInfo& openInfo, - const ix::WebSocketCloseInfo& closeInfo) + [this, webSocket, state](const ix::WebSocketMessagePtr& msg) { - if (messageType == ix::WebSocketMessageType::Open) + if (msg->type == ix::WebSocketMessageType::Open) { std::cerr << "New connection" << std::endl; std::cerr << "id: " << state->getId() << std::endl; - std::cerr << "Uri: " << openInfo.uri << std::endl; + std::cerr << "Uri: " << msg->openInfo.uri << std::endl; std::cerr << "Headers:" << std::endl; - for (auto it : openInfo.headers) + for (auto it : msg->openInfo.headers) { std::cerr << it.first << ": " << it.second << std::endl; } - std::string appkey = parseAppKey(openInfo.uri); + std::string appkey = parseAppKey(msg->openInfo.uri); state->setAppkey(appkey); // Connect to redis first @@ -86,29 +81,29 @@ namespace snake std::cerr << "Cannot connect to redis host" << std::endl; } } - else if (messageType == ix::WebSocketMessageType::Close) + else if (msg->type == ix::WebSocketMessageType::Close) { std::cerr << "Closed connection" - << " code " << closeInfo.code - << " reason " << closeInfo.reason << std::endl; + << " code " << msg->closeInfo.code + << " reason " << msg->closeInfo.reason << std::endl; } - else if (messageType == ix::WebSocketMessageType::Error) + else if (msg->type == ix::WebSocketMessageType::Error) { std::stringstream ss; - ss << "Connection error: " << error.reason << std::endl; - ss << "#retries: " << error.retries << std::endl; - ss << "Wait time(ms): " << error.wait_time << std::endl; - ss << "HTTP Status: " << error.http_status << std::endl; + ss << "Connection error: " << msg->errorInfo.reason << std::endl; + ss << "#retries: " << msg->errorInfo.retries << std::endl; + ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; + ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; std::cerr << ss.str(); } - else if (messageType == ix::WebSocketMessageType::Fragment) + else if (msg->type == ix::WebSocketMessageType::Fragment) { std::cerr << "Received message fragment" << std::endl; } - else if (messageType == ix::WebSocketMessageType::Message) + else if (msg->type == ix::WebSocketMessageType::Message) { - std::cerr << "Received " << wireSize << " bytes" << std::endl; - processCobraMessage(state, webSocket, _appConfig, str); + std::cerr << "Received " << msg->wireSize << " bytes" << std::endl; + processCobraMessage(state, webSocket, _appConfig, msg->str); } } ); diff --git a/ws/ws_broadcast_server.cpp b/ws/ws_broadcast_server.cpp index fb7338f6..505d61ec 100644 --- a/ws/ws_broadcast_server.cpp +++ b/ws/ws_broadcast_server.cpp @@ -21,52 +21,47 @@ namespace ix std::shared_ptr connectionState) { webSocket->setOnMessageCallback( - [webSocket, connectionState, &server](ix::WebSocketMessageType messageType, - const std::string& str, - size_t wireSize, - const ix::WebSocketErrorInfo& error, - const ix::WebSocketOpenInfo& openInfo, - const ix::WebSocketCloseInfo& closeInfo) + [webSocket, connectionState, &server](const WebSocketMessagePtr& msg) { - if (messageType == ix::WebSocketMessageType::Open) + if (msg->type == ix::WebSocketMessageType::Open) { std::cerr << "New connection" << std::endl; std::cerr << "id: " << connectionState->getId() << std::endl; - std::cerr << "Uri: " << openInfo.uri << std::endl; + std::cerr << "Uri: " << msg->openInfo.uri << std::endl; std::cerr << "Headers:" << std::endl; - for (auto it : openInfo.headers) + for (auto it : msg->openInfo.headers) { std::cerr << it.first << ": " << it.second << std::endl; } } - else if (messageType == ix::WebSocketMessageType::Close) + else if (msg->type == ix::WebSocketMessageType::Close) { std::cerr << "Closed connection" - << " code " << closeInfo.code - << " reason " << closeInfo.reason << std::endl; + << " code " << msg->closeInfo.code + << " reason " << msg->closeInfo.reason << std::endl; } - else if (messageType == ix::WebSocketMessageType::Error) + else if (msg->type == ix::WebSocketMessageType::Error) { std::stringstream ss; - ss << "Connection error: " << error.reason << std::endl; - ss << "#retries: " << error.retries << std::endl; - ss << "Wait time(ms): " << error.wait_time << std::endl; - ss << "HTTP Status: " << error.http_status << std::endl; + ss << "Connection error: " << msg->errorInfo.reason << std::endl; + ss << "#retries: " << msg->errorInfo.retries << std::endl; + ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; + ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; std::cerr << ss.str(); } - else if (messageType == ix::WebSocketMessageType::Fragment) + else if (msg->type == ix::WebSocketMessageType::Fragment) { std::cerr << "Received message fragment" << std::endl; } - else if (messageType == ix::WebSocketMessageType::Message) + else if (msg->type == ix::WebSocketMessageType::Message) { - std::cerr << "Received " << wireSize << " bytes" << std::endl; + std::cerr << "Received " << msg->wireSize << " bytes" << std::endl; for (auto&& client : server.getClients()) { if (client != webSocket) { - client->send(str, + client->send(msg->str, [](int current, int total) -> bool { std::cerr << "Step " << current diff --git a/ws/ws_chat.cpp b/ws/ws_chat.cpp index 825a7aea..dd69ff6f 100644 --- a/ws/ws_chat.cpp +++ b/ws/ws_chat.cpp @@ -84,20 +84,15 @@ namespace ix log(std::string("Connecting to url: ") + _url); _webSocket.setOnMessageCallback( - [this](ix::WebSocketMessageType messageType, - const std::string& str, - size_t wireSize, - const ix::WebSocketErrorInfo& error, - const ix::WebSocketOpenInfo& openInfo, - const ix::WebSocketCloseInfo& closeInfo) + [this](const WebSocketMessagePtr& msg) { std::stringstream ss; - if (messageType == ix::WebSocketMessageType::Open) + if (msg->type == ix::WebSocketMessageType::Open) { log("ws chat: connected"); - std::cout << "Uri: " << openInfo.uri << std::endl; + std::cout << "Uri: " << msg->openInfo.uri << std::endl; std::cout << "Handshake Headers:" << std::endl; - for (auto it : openInfo.headers) + for (auto it : msg->openInfo.headers) { std::cout << it.first << ": " << it.second << std::endl; } @@ -107,18 +102,18 @@ namespace ix << " Connected !"; log(ss.str()); } - else if (messageType == ix::WebSocketMessageType::Close) + else if (msg->type == ix::WebSocketMessageType::Close) { ss << "ws chat: user " << _user << " disconnected !" - << " code " << closeInfo.code - << " reason " << closeInfo.reason; + << " code " << msg->closeInfo.code + << " reason " << msg->closeInfo.reason; log(ss.str()); } - else if (messageType == ix::WebSocketMessageType::Message) + else if (msg->type == ix::WebSocketMessageType::Message) { - auto result = decodeMessage(str); + auto result = decodeMessage(msg->str); // Our "chat" / "broacast" node.js server does not send us // the messages we send, so we don't have to filter it out. @@ -127,17 +122,17 @@ namespace ix _receivedQueue.push(result.second); ss << std::endl - << result.first << "(" << wireSize << " bytes)" << " > " << result.second + << result.first << "(" << msg->wireSize << " bytes)" << " > " << result.second << std::endl << _user << " > "; log(ss.str()); } - else if (messageType == ix::WebSocketMessageType::Error) + else if (msg->type == ix::WebSocketMessageType::Error) { - ss << "Connection error: " << error.reason << std::endl; - ss << "#retries: " << error.retries << std::endl; - ss << "Wait time(ms): " << error.wait_time << std::endl; - ss << "HTTP Status: " << error.http_status << std::endl; + ss << "Connection error: " << msg->errorInfo.reason << std::endl; + ss << "#retries: " << msg->errorInfo.retries << std::endl; + ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; + ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; log(ss.str()); } else diff --git a/ws/ws_connect.cpp b/ws/ws_connect.cpp index 3ad42bec..fd9370b2 100644 --- a/ws/ws_connect.cpp +++ b/ws/ws_connect.cpp @@ -73,56 +73,51 @@ namespace ix log(std::string("Connecting to url: ") + _url); _webSocket.setOnMessageCallback( - [this](ix::WebSocketMessageType messageType, - const std::string& str, - size_t wireSize, - const ix::WebSocketErrorInfo& error, - const ix::WebSocketOpenInfo& openInfo, - const ix::WebSocketCloseInfo& closeInfo) + [this](const ix::WebSocketMessagePtr& msg) { std::stringstream ss; - if (messageType == ix::WebSocketMessageType::Open) + if (msg->type == ix::WebSocketMessageType::Open) { log("ws_connect: connected"); - std::cout << "Uri: " << openInfo.uri << std::endl; + std::cout << "Uri: " << msg->openInfo.uri << std::endl; std::cout << "Handshake Headers:" << std::endl; - for (auto it : openInfo.headers) + for (auto it : msg->openInfo.headers) { std::cout << it.first << ": " << it.second << std::endl; } } - else if (messageType == ix::WebSocketMessageType::Close) + else if (msg->type == ix::WebSocketMessageType::Close) { ss << "ws_connect: connection closed:"; - ss << " code " << closeInfo.code; - ss << " reason " << closeInfo.reason << std::endl; + ss << " code " << msg->closeInfo.code; + ss << " reason " << msg->closeInfo.reason << std::endl; log(ss.str()); } - else if (messageType == ix::WebSocketMessageType::Message) + else if (msg->type == ix::WebSocketMessageType::Message) { - std::cerr << "Received " << wireSize << " bytes" << std::endl; + std::cerr << "Received " << msg->wireSize << " bytes" << std::endl; ss << "ws_connect: received message: " - << str; + << msg->str; log(ss.str()); } - else if (messageType == ix::WebSocketMessageType::Error) + else if (msg->type == ix::WebSocketMessageType::Error) { - ss << "Connection error: " << error.reason << std::endl; - ss << "#retries: " << error.retries << std::endl; - ss << "Wait time(ms): " << error.wait_time << std::endl; - ss << "HTTP Status: " << error.http_status << std::endl; + ss << "Connection error: " << msg->errorInfo.reason << std::endl; + ss << "#retries: " << msg->errorInfo.retries << std::endl; + ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; + ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; log(ss.str()); } - else if (messageType == ix::WebSocketMessageType::Fragment) + else if (msg->type == ix::WebSocketMessageType::Fragment) { std::cerr << "Received message fragment" << std::endl; } - else if (messageType == ix::WebSocketMessageType::Ping) + else if (msg->type == ix::WebSocketMessageType::Ping) { std::cerr << "Received ping" << std::endl; } - else if (messageType == ix::WebSocketMessageType::Pong) + else if (msg->type == ix::WebSocketMessageType::Pong) { std::cerr << "Received pong" << std::endl; } diff --git a/ws/ws_echo_server.cpp b/ws/ws_echo_server.cpp index a9377a55..b5cb9a63 100644 --- a/ws/ws_echo_server.cpp +++ b/ws/ws_echo_server.cpp @@ -21,20 +21,15 @@ namespace ix std::shared_ptr connectionState) { webSocket->setOnMessageCallback( - [webSocket, connectionState, greetings](ix::WebSocketMessageType messageType, - const std::string& str, - size_t wireSize, - const ix::WebSocketErrorInfo& error, - const ix::WebSocketOpenInfo& openInfo, - const ix::WebSocketCloseInfo& closeInfo) + [webSocket, connectionState, greetings](const WebSocketMessagePtr& msg) { - if (messageType == ix::WebSocketMessageType::Open) + if (msg->type == ix::WebSocketMessageType::Open) { std::cerr << "New connection" << std::endl; std::cerr << "id: " << connectionState->getId() << std::endl; - std::cerr << "Uri: " << openInfo.uri << std::endl; + std::cerr << "Uri: " << msg->openInfo.uri << std::endl; std::cerr << "Headers:" << std::endl; - for (auto it : openInfo.headers) + for (auto it : msg->openInfo.headers) { std::cerr << it.first << ": " << it.second << std::endl; } @@ -44,27 +39,27 @@ namespace ix webSocket->sendText("Welcome !"); } } - else if (messageType == ix::WebSocketMessageType::Close) + else if (msg->type == ix::WebSocketMessageType::Close) { std::cerr << "Closed connection" - << " code " << closeInfo.code - << " reason " << closeInfo.reason << std::endl; + << " code " << msg->closeInfo.code + << " reason " << msg->closeInfo.reason << std::endl; } - else if (messageType == ix::WebSocketMessageType::Error) + else if (msg->type == ix::WebSocketMessageType::Error) { std::stringstream ss; - ss << "Connection error: " << error.reason << std::endl; - ss << "#retries: " << error.retries << std::endl; - ss << "Wait time(ms): " << error.wait_time << std::endl; - ss << "HTTP Status: " << error.http_status << std::endl; + ss << "Connection error: " << msg->errorInfo.reason << std::endl; + ss << "#retries: " << msg->errorInfo.retries << std::endl; + ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; + ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; std::cerr << ss.str(); } - else if (messageType == ix::WebSocketMessageType::Message) + else if (msg->type == ix::WebSocketMessageType::Message) { std::cerr << "Received " - << wireSize << " bytes" + << msg->wireSize << " bytes" << std::endl; - webSocket->send(str); + webSocket->send(msg->str); } } ); diff --git a/ws/ws_ping_pong.cpp b/ws/ws_ping_pong.cpp index 8d1afa6a..e1688131 100644 --- a/ws/ws_ping_pong.cpp +++ b/ws/ws_ping_pong.cpp @@ -54,59 +54,54 @@ namespace ix log(std::string("Connecting to url: ") + _url); _webSocket.setOnMessageCallback( - [this](ix::WebSocketMessageType messageType, - const std::string& str, - size_t wireSize, - const ix::WebSocketErrorInfo& error, - const ix::WebSocketOpenInfo& openInfo, - const ix::WebSocketCloseInfo& closeInfo) + [this](const ix::WebSocketMessagePtr& msg) { - std::cerr << "Received " << wireSize << " bytes" << std::endl; + std::cerr << "Received " << msg->wireSize << " bytes" << std::endl; std::stringstream ss; - if (messageType == ix::WebSocketMessageType::Open) + if (msg->type == ix::WebSocketMessageType::Open) { log("ping_pong: connected"); - std::cout << "Uri: " << openInfo.uri << std::endl; + std::cout << "Uri: " << msg->openInfo.uri << std::endl; std::cout << "Handshake Headers:" << std::endl; - for (auto it : openInfo.headers) + for (auto it : msg->openInfo.headers) { std::cout << it.first << ": " << it.second << std::endl; } } - else if (messageType == ix::WebSocketMessageType::Close) + else if (msg->type == ix::WebSocketMessageType::Close) { ss << "ping_pong: disconnected:" - << " code " << closeInfo.code - << " reason " << closeInfo.reason - << str; + << " code " << msg->closeInfo.code + << " reason " << msg->closeInfo.reason + << msg->str; log(ss.str()); } - else if (messageType == ix::WebSocketMessageType::Message) + else if (msg->type == ix::WebSocketMessageType::Message) { ss << "ping_pong: received message: " - << str; + << msg->str; log(ss.str()); } - else if (messageType == ix::WebSocketMessageType::Ping) + else if (msg->type == ix::WebSocketMessageType::Ping) { ss << "ping_pong: received ping message: " - << str; + << msg->str; log(ss.str()); } - else if (messageType == ix::WebSocketMessageType::Pong) + else if (msg->type == ix::WebSocketMessageType::Pong) { ss << "ping_pong: received pong message: " - << str; + << msg->str; log(ss.str()); } - else if (messageType == ix::WebSocketMessageType::Error) + else if (msg->type == ix::WebSocketMessageType::Error) { - ss << "Connection error: " << error.reason << std::endl; - ss << "#retries: " << error.retries << std::endl; - ss << "Wait time(ms): " << error.wait_time << std::endl; - ss << "HTTP Status: " << error.http_status << std::endl; + ss << "Connection error: " << msg->errorInfo.reason << std::endl; + ss << "#retries: " << msg->errorInfo.retries << std::endl; + ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; + ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; log(ss.str()); } else diff --git a/ws/ws_receive.cpp b/ws/ws_receive.cpp index 81012dbf..77863829 100644 --- a/ws/ws_receive.cpp +++ b/ws/ws_receive.cpp @@ -183,41 +183,36 @@ namespace ix log(std::string("Connecting to url: ") + _url); _webSocket.setOnMessageCallback( - [this](ix::WebSocketMessageType messageType, - const std::string& str, - size_t wireSize, - const ix::WebSocketErrorInfo& error, - const ix::WebSocketOpenInfo& openInfo, - const ix::WebSocketCloseInfo& closeInfo) + [this](const ix::WebSocketMessagePtr& msg) { std::stringstream ss; - if (messageType == ix::WebSocketMessageType::Open) + if (msg->type == ix::WebSocketMessageType::Open) { _condition.notify_one(); log("ws_receive: connected"); - std::cout << "Uri: " << openInfo.uri << std::endl; + std::cout << "Uri: " << msg->openInfo.uri << std::endl; std::cout << "Handshake Headers:" << std::endl; - for (auto it : openInfo.headers) + for (auto it : msg->openInfo.headers) { std::cout << it.first << ": " << it.second << std::endl; } } - else if (messageType == ix::WebSocketMessageType::Close) + else if (msg->type == ix::WebSocketMessageType::Close) { ss << "ws_receive: connection closed:"; - ss << " code " << closeInfo.code; - ss << " reason " << closeInfo.reason << std::endl; + ss << " code " << msg->closeInfo.code; + ss << " reason " << msg->closeInfo.reason << std::endl; log(ss.str()); } - else if (messageType == ix::WebSocketMessageType::Message) + else if (msg->type == ix::WebSocketMessageType::Message) { - ss << "ws_receive: transfered " << wireSize << " bytes"; + ss << "ws_receive: transfered " << msg->wireSize << " bytes"; log(ss.str()); - handleMessage(str); + handleMessage(msg->str); _condition.notify_one(); } - else if (messageType == ix::WebSocketMessageType::Fragment) + else if (msg->type == ix::WebSocketMessageType::Fragment) { ss << "ws_receive: received fragment " << _receivedFragmentCounter++; log(ss.str()); @@ -229,13 +224,13 @@ namespace ix std::this_thread::sleep_for(duration); } } - else if (messageType == ix::WebSocketMessageType::Error) + else if (msg->type == ix::WebSocketMessageType::Error) { ss << "ws_receive "; - ss << "Connection error: " << error.reason << std::endl; - ss << "#retries: " << error.retries << std::endl; - ss << "Wait time(ms): " << error.wait_time << std::endl; - ss << "HTTP Status: " << error.http_status << std::endl; + ss << "Connection error: " << msg->errorInfo.reason << std::endl; + ss << "#retries: " << msg->errorInfo.retries << std::endl; + ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; + ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; log(ss.str()); } else diff --git a/ws/ws_send.cpp b/ws/ws_send.cpp index ac03ff64..cdbe9d12 100644 --- a/ws/ws_send.cpp +++ b/ws/ws_send.cpp @@ -112,42 +112,37 @@ namespace ix log(std::string("Connecting to url: ") + _url); _webSocket.setOnMessageCallback( - [this](ix::WebSocketMessageType messageType, - const std::string& str, - size_t wireSize, - const ix::WebSocketErrorInfo& error, - const ix::WebSocketOpenInfo& openInfo, - const ix::WebSocketCloseInfo& closeInfo) + [this](const WebSocketMessagePtr& msg) { std::stringstream ss; - if (messageType == ix::WebSocketMessageType::Open) + if (msg->type == ix::WebSocketMessageType::Open) { _condition.notify_one(); log("ws_send: connected"); - std::cout << "Uri: " << openInfo.uri << std::endl; + std::cout << "Uri: " << msg->openInfo.uri << std::endl; std::cout << "Handshake Headers:" << std::endl; - for (auto it : openInfo.headers) + for (auto it : msg->openInfo.headers) { std::cout << it.first << ": " << it.second << std::endl; } } - else if (messageType == ix::WebSocketMessageType::Close) + else if (msg->type == ix::WebSocketMessageType::Close) { ss << "ws_send: connection closed:"; - ss << " code " << closeInfo.code; - ss << " reason " << closeInfo.reason << std::endl; + ss << " code " << msg->closeInfo.code; + ss << " reason " << msg->closeInfo.reason << std::endl; log(ss.str()); } - else if (messageType == ix::WebSocketMessageType::Message) + else if (msg->type == ix::WebSocketMessageType::Message) { _condition.notify_one(); - ss << "ws_send: received message (" << wireSize << " bytes)"; + ss << "ws_send: received message (" << msg->wireSize << " bytes)"; log(ss.str()); std::string errMsg; - MsgPack data = MsgPack::parse(str, errMsg); + MsgPack data = MsgPack::parse(msg->str, errMsg); if (!errMsg.empty()) { std::cerr << "Invalid MsgPack response" << std::endl; @@ -160,13 +155,13 @@ namespace ix std::cerr << "Invalid id" << std::endl; } } - else if (messageType == ix::WebSocketMessageType::Error) + else if (msg->type == ix::WebSocketMessageType::Error) { ss << "ws_send "; - ss << "Connection error: " << error.reason << std::endl; - ss << "#retries: " << error.retries << std::endl; - ss << "Wait time(ms): " << error.wait_time << std::endl; - ss << "HTTP Status: " << error.http_status << std::endl; + ss << "Connection error: " << msg->errorInfo.reason << std::endl; + ss << "#retries: " << msg->errorInfo.retries << std::endl; + ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; + ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; log(ss.str()); } else diff --git a/ws/ws_transfer.cpp b/ws/ws_transfer.cpp index a3fc0c97..ad78d064 100644 --- a/ws/ws_transfer.cpp +++ b/ws/ws_transfer.cpp @@ -21,52 +21,47 @@ namespace ix std::shared_ptr connectionState) { webSocket->setOnMessageCallback( - [webSocket, connectionState, &server](ix::WebSocketMessageType messageType, - const std::string& str, - size_t wireSize, - const ix::WebSocketErrorInfo& error, - const ix::WebSocketOpenInfo& openInfo, - const ix::WebSocketCloseInfo& closeInfo) + [webSocket, connectionState, &server](const WebSocketMessagePtr& msg) { - if (messageType == ix::WebSocketMessageType::Open) + if (msg->type == ix::WebSocketMessageType::Open) { std::cerr << "New connection" << std::endl; std::cerr << "id: " << connectionState->getId() << std::endl; - std::cerr << "Uri: " << openInfo.uri << std::endl; + std::cerr << "Uri: " << msg->openInfo.uri << std::endl; std::cerr << "Headers:" << std::endl; - for (auto it : openInfo.headers) + for (auto it : msg->openInfo.headers) { std::cerr << it.first << ": " << it.second << std::endl; } } - else if (messageType == ix::WebSocketMessageType::Close) + else if (msg->type == ix::WebSocketMessageType::Close) { std::cerr << "Closed connection" - << " code " << closeInfo.code - << " reason " << closeInfo.reason << std::endl; + << " code " << msg->closeInfo.code + << " reason " << msg->closeInfo.reason << std::endl; } - else if (messageType == ix::WebSocketMessageType::Error) + else if (msg->type == ix::WebSocketMessageType::Error) { std::stringstream ss; - ss << "Connection error: " << error.reason << std::endl; - ss << "#retries: " << error.retries << std::endl; - ss << "Wait time(ms): " << error.wait_time << std::endl; - ss << "HTTP Status: " << error.http_status << std::endl; + ss << "Connection error: " << msg->errorInfo.reason << std::endl; + ss << "#retries: " << msg->errorInfo.retries << std::endl; + ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; + ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; std::cerr << ss.str(); } - else if (messageType == ix::WebSocketMessageType::Fragment) + else if (msg->type == ix::WebSocketMessageType::Fragment) { std::cerr << "Received message fragment " << std::endl; } - else if (messageType == ix::WebSocketMessageType::Message) + else if (msg->type == ix::WebSocketMessageType::Message) { - std::cerr << "Received " << wireSize << " bytes" << std::endl; + std::cerr << "Received " << msg->wireSize << " bytes" << std::endl; for (auto&& client : server.getClients()) { if (client != webSocket) { - client->send(str, + client->send(msg->str, [](int current, int total) -> bool { std::cerr << "ws_transfer: Step " << current