diff --git a/CMakeLists.txt b/CMakeLists.txt index 049581b6..c2f6bb53 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -44,6 +44,7 @@ set( IXWEBSOCKET_SOURCES ixwebsocket/IXWebSocketCloseConstants.cpp ixwebsocket/IXWebSocketHandshake.cpp ixwebsocket/IXWebSocketHttpHeaders.cpp + ixwebsocket/IXWebSocketMessage.cpp ixwebsocket/IXWebSocketPerMessageDeflate.cpp ixwebsocket/IXWebSocketPerMessageDeflateCodec.cpp ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp diff --git a/ixwebsocket/IXWebSocket.cpp b/ixwebsocket/IXWebSocket.cpp index 864482ea..0c4d869f 100644 --- a/ixwebsocket/IXWebSocket.cpp +++ b/ixwebsocket/IXWebSocket.cpp @@ -30,16 +30,16 @@ namespace ix , _handshakeTimeoutSecs(kDefaultHandShakeTimeoutSecs) , _enablePong(kDefaultEnablePong) , _pingIntervalSecs(kDefaultPingIntervalSecs) + , _webSocketMessage(std::make_unique(WebSocketMessageType::Message)) + , _webSocketErrorMessage(std::make_unique(WebSocketMessageType::Error)) + , _webSocketOpenMessage(std::make_unique(WebSocketMessageType::Open)) + , _webSocketCloseMessage(std::make_unique(WebSocketMessageType::Close)) { _ws.setOnCloseCallback( [this](uint16_t code, const std::string& reason, size_t wireSize, bool remote) { - _onMessageCallback( - std::make_unique(WebSocketMessageType::Close, - "", - wireSize, - WebSocketErrorInfo(), - WebSocketOpenInfo(), - WebSocketCloseInfo(code, reason, remote))); + _webSocketCloseMessage->wireSize = wireSize; + _webSocketCloseMessage->closeInfo = WebSocketCloseInfo(code, reason, remote); + _onMessageCallback(_webSocketCloseMessage); }); } @@ -193,13 +193,8 @@ namespace ix return status; } - _onMessageCallback(std::make_unique( - WebSocketMessageType::Open, - "", - 0, - WebSocketErrorInfo(), - WebSocketOpenInfo(status.uri, status.headers, status.protocol), - WebSocketCloseInfo())); + _webSocketOpenMessage->openInfo = WebSocketOpenInfo(status.uri, status.headers, status.protocol), + _onMessageCallback(_webSocketOpenMessage); if (_pingIntervalSecs > 0) { @@ -224,13 +219,8 @@ namespace ix return status; } - _onMessageCallback( - std::make_unique(WebSocketMessageType::Open, - "", - 0, - WebSocketErrorInfo(), - WebSocketOpenInfo(status.uri, status.headers), - WebSocketCloseInfo())); + _webSocketOpenMessage->openInfo = WebSocketOpenInfo(status.uri, status.headers); + _onMessageCallback(_webSocketOpenMessage); if (_pingIntervalSecs > 0) { @@ -310,12 +300,8 @@ namespace ix connectErr.reason = status.errorStr; connectErr.http_status = status.http_status; - _onMessageCallback(std::make_unique(WebSocketMessageType::Error, - "", - 0, - connectErr, - WebSocketOpenInfo(), - WebSocketCloseInfo())); + _webSocketErrorMessage->errorInfo = connectErr; + _onMessageCallback(_webSocketErrorMessage); } } } @@ -381,18 +367,15 @@ namespace ix break; } - WebSocketErrorInfo webSocketErrorInfo; - webSocketErrorInfo.decompressionError = decompressionError; - bool binary = messageKind == WebSocketTransport::MessageKind::MSG_BINARY; - _onMessageCallback(std::make_unique(webSocketMessageType, - msg, - wireSize, - webSocketErrorInfo, - WebSocketOpenInfo(), - WebSocketCloseInfo(), - binary)); + _webSocketMessage->type = webSocketMessageType; + _webSocketMessage->str = msg; + _webSocketMessage->wireSize = wireSize; + _webSocketMessage->errorInfo.decompressionError = decompressionError; + _webSocketMessage->binary = binary; + + _onMessageCallback(_webSocketMessage); WebSocket::invokeTrafficTrackerCallback(wireSize, true); }); diff --git a/ixwebsocket/IXWebSocket.h b/ixwebsocket/IXWebSocket.h index 0c288cd5..9ceb4354 100644 --- a/ixwebsocket/IXWebSocket.h +++ b/ixwebsocket/IXWebSocket.h @@ -155,6 +155,12 @@ namespace ix static const int kDefaultPingIntervalSecs; static const int kDefaultPingTimeoutSecs; + // One message ptr for each message kinds + WebSocketMessagePtr _webSocketMessage; + WebSocketMessagePtr _webSocketErrorMessage; + WebSocketMessagePtr _webSocketOpenMessage; + WebSocketMessagePtr _webSocketCloseMessage; + // Subprotocols std::vector _subProtocols; diff --git a/ixwebsocket/IXWebSocketCloseInfo.h b/ixwebsocket/IXWebSocketCloseInfo.h index 27000412..409e74fd 100644 --- a/ixwebsocket/IXWebSocketCloseInfo.h +++ b/ixwebsocket/IXWebSocketCloseInfo.h @@ -6,6 +6,9 @@ #pragma once +#include +#include + namespace ix { struct WebSocketCloseInfo diff --git a/ixwebsocket/IXWebSocketErrorInfo.h b/ixwebsocket/IXWebSocketErrorInfo.h index 03c074cc..1c0f108b 100644 --- a/ixwebsocket/IXWebSocketErrorInfo.h +++ b/ixwebsocket/IXWebSocketErrorInfo.h @@ -6,6 +6,7 @@ #pragma once +#include #include namespace ix @@ -17,5 +18,10 @@ namespace ix int http_status = 0; std::string reason; bool decompressionError = false; + + WebSocketErrorInfo() + { + ; + } }; } // namespace ix diff --git a/ixwebsocket/IXWebSocketMessage.cpp b/ixwebsocket/IXWebSocketMessage.cpp new file mode 100644 index 00000000..6b93ebd4 --- /dev/null +++ b/ixwebsocket/IXWebSocketMessage.cpp @@ -0,0 +1,12 @@ +/* + * IXWebSocketMessage.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2020 Machine Zone, Inc. All rights reserved. + */ + +#include "IXWebSocketMessage.h" + +namespace ix +{ + std::string WebSocketMessage::kStr("foo"); +} diff --git a/ixwebsocket/IXWebSocketMessage.h b/ixwebsocket/IXWebSocketMessage.h index 6dcc3d64..e5cd08ac 100644 --- a/ixwebsocket/IXWebSocketMessage.h +++ b/ixwebsocket/IXWebSocketMessage.h @@ -19,22 +19,23 @@ namespace ix struct WebSocketMessage { WebSocketMessageType type; - const std::string& str; + std::string& str; size_t wireSize; WebSocketErrorInfo errorInfo; WebSocketOpenInfo openInfo; WebSocketCloseInfo closeInfo; bool binary; + static std::string kStr; + WebSocketMessage(WebSocketMessageType t, - const std::string& s, - size_t w, - WebSocketErrorInfo e, - WebSocketOpenInfo o, - WebSocketCloseInfo c, + size_t w = 0, + WebSocketErrorInfo e = WebSocketErrorInfo(), + WebSocketOpenInfo o = WebSocketOpenInfo(), + WebSocketCloseInfo c = WebSocketCloseInfo(), bool b = false) : type(t) - , str(s) + , str(WebSocketMessage::kStr) , wireSize(w) , errorInfo(e) , openInfo(o) @@ -43,6 +44,11 @@ namespace ix { ; } + + // void setStr(const std::string& s) + // { + // str = std::move(s); + // } }; using WebSocketMessagePtr = std::unique_ptr; diff --git a/ixwebsocket/IXWebSocketOpenInfo.h b/ixwebsocket/IXWebSocketOpenInfo.h index 13289a92..37bff41e 100644 --- a/ixwebsocket/IXWebSocketOpenInfo.h +++ b/ixwebsocket/IXWebSocketOpenInfo.h @@ -6,6 +6,10 @@ #pragma once +#include +#include +#include "IXWebSocketHttpHeaders.h" + namespace ix { struct WebSocketOpenInfo diff --git a/ixwebsocket/IXWebSocketTransport.cpp b/ixwebsocket/IXWebSocketTransport.cpp index 77300e1a..0a401d3d 100644 --- a/ixwebsocket/IXWebSocketTransport.cpp +++ b/ixwebsocket/IXWebSocketTransport.cpp @@ -515,8 +515,10 @@ namespace ix // if (ws.fin && _chunks.empty()) { - emitMessage( - _fragmentedMessageKind, frameData, _receivedMessageCompressed, onMessageCallback); + emitMessage(_fragmentedMessageKind, + frameData, + _receivedMessageCompressed, + onMessageCallback); _receivedMessageCompressed = false; } diff --git a/ixwebsocket/IXWebSocketVersion.h b/ixwebsocket/IXWebSocketVersion.h index c37b4448..503fc49e 100644 --- a/ixwebsocket/IXWebSocketVersion.h +++ b/ixwebsocket/IXWebSocketVersion.h @@ -6,4 +6,4 @@ #pragma once -#define IX_WEBSOCKET_VERSION "9.2.5" +#define IX_WEBSOCKET_VERSION "9.2.6" diff --git a/test/IXWebSocketBroadcastTest.cpp b/test/IXWebSocketBroadcastTest.cpp index fed38830..879df7a2 100644 --- a/test/IXWebSocketBroadcastTest.cpp +++ b/test/IXWebSocketBroadcastTest.cpp @@ -6,8 +6,8 @@ #include "IXTest.h" #include "catch.hpp" -#include #include "msgpack11.hpp" +#include #include #include #include @@ -130,7 +130,8 @@ namespace } else if (msg->type == ix::WebSocketMessageType::Error) { - ss << "websocket_broadcast_client: " << _user << " Error ! " << msg->errorInfo.reason; + ss << "websocket_broadcast_client: " << _user << " Error ! " + << msg->errorInfo.reason; log(ss.str()); } else if (msg->type == ix::WebSocketMessageType::Ping) @@ -234,7 +235,7 @@ namespace server.start(); return true; } -} // namespace ix +} // namespace TEST_CASE("Websocket_broadcast_server", "[websocket_server]") { @@ -247,7 +248,7 @@ TEST_CASE("Websocket_broadcast_server", "[websocket_server]") std::string session = ix::generateSessionId(); std::vector> chatClients; - for (int i = 0 ; i < 10; ++i) + for (int i = 0; i < 10; ++i) { std::string user("user_" + std::to_string(i)); chatClients.push_back(std::make_shared(user, session, port)); @@ -259,7 +260,7 @@ TEST_CASE("Websocket_broadcast_server", "[websocket_server]") while (true) { bool allReady = true; - for (size_t i = 0 ; i < chatClients.size(); ++i) + for (size_t i = 0; i < chatClients.size(); ++i) { allReady &= chatClients[i]->isReady(); } @@ -269,7 +270,7 @@ TEST_CASE("Websocket_broadcast_server", "[websocket_server]") for (int j = 0; j < 1000; j++) { - for (size_t i = 0 ; i < chatClients.size(); ++i) + for (size_t i = 0; i < chatClients.size(); ++i) { chatClients[i]->sendMessage("hello world"); } @@ -291,7 +292,7 @@ TEST_CASE("Websocket_broadcast_server", "[websocket_server]") // Stop all clients size_t messageCount = chatClients.size() * 50; - for (size_t i = 0 ; i < chatClients.size(); ++i) + for (size_t i = 0; i < chatClients.size(); ++i) { REQUIRE(chatClients[i]->getReceivedMessagesCount() >= messageCount); chatClients[i]->stop(); diff --git a/ws/ws.cpp b/ws/ws.cpp index f728913e..03afe333 100644 --- a/ws/ws.cpp +++ b/ws/ws.cpp @@ -268,8 +268,10 @@ int main(int argc, char** argv) cobra2statsd->add_option("--port", statsdPort, "Statsd port"); cobra2statsd->add_option("--prefix", prefix, "Statsd prefix"); cobra2statsd->add_option("--fields", fields, "Extract fields for naming the event")->join(); - cobra2statsd->add_option("--gauge", gauge, "Value to extract, and use as a statsd gauge")->join(); - cobra2statsd->add_option("--timer", timer, "Value to extract, and use as a statsd timer")->join(); + cobra2statsd->add_option("--gauge", gauge, "Value to extract, and use as a statsd gauge") + ->join(); + cobra2statsd->add_option("--timer", timer, "Value to extract, and use as a statsd timer") + ->join(); cobra2statsd->add_option("channel", channel, "Channel")->required(); cobra2statsd->add_flag("-v", verbose, "Verbose"); cobra2statsd->add_option("--pidfile", pidfile, "Pid file"); @@ -449,7 +451,8 @@ int main(int argc, char** argv) } else if (app.got_subcommand("cobra_subscribe")) { - ret = ix::ws_cobra_subscribe_main(cobraConfig, channel, filter, position, quiet, fluentd, runtime); + ret = ix::ws_cobra_subscribe_main( + cobraConfig, channel, filter, position, quiet, fluentd, runtime); } else if (app.got_subcommand("cobra_publish")) { @@ -463,7 +466,7 @@ int main(int argc, char** argv) { if (!timer.empty() && !gauge.empty()) { - spdlog::error("--gauge and --timer options are exclusive. " \ + spdlog::error("--gauge and --timer options are exclusive. " "you can only supply one"); ret = 1; } diff --git a/ws/ws_cobra_subscribe.cpp b/ws/ws_cobra_subscribe.cpp index aed7d67d..4aec0fb8 100644 --- a/ws/ws_cobra_subscribe.cpp +++ b/ws/ws_cobra_subscribe.cpp @@ -41,7 +41,7 @@ namespace ix enveloppe["message"] = msgWithPosition; jsonWriter->write(enveloppe, &std::cout); - std::cout << std::endl; // add lf and flush + std::cout << std::endl; // add lf and flush } else { @@ -184,7 +184,7 @@ namespace ix // Run for a duration, used by unittesting now else { - for (int i = 0 ; i < runtime; ++i) + for (int i = 0; i < runtime; ++i) { auto duration = std::chrono::seconds(1); std::this_thread::sleep_for(duration);