diff --git a/CMakeLists.txt b/CMakeLists.txt index e2c1bd32..049581b6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -44,7 +44,6 @@ set( IXWEBSOCKET_SOURCES ixwebsocket/IXWebSocketCloseConstants.cpp ixwebsocket/IXWebSocketHandshake.cpp ixwebsocket/IXWebSocketHttpHeaders.cpp - ixwebsocket/IXWebSocketMessageQueue.cpp ixwebsocket/IXWebSocketPerMessageDeflate.cpp ixwebsocket/IXWebSocketPerMessageDeflateCodec.cpp ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp @@ -84,7 +83,6 @@ set( IXWEBSOCKET_HEADERS ixwebsocket/IXWebSocketHttpHeaders.h ixwebsocket/IXWebSocketInitResult.h ixwebsocket/IXWebSocketMessage.h - ixwebsocket/IXWebSocketMessageQueue.h ixwebsocket/IXWebSocketMessageType.h ixwebsocket/IXWebSocketOpenInfo.h ixwebsocket/IXWebSocketPerMessageDeflate.h diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index d21eb08d..498f38e1 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,6 +1,10 @@ # Changelog All changes to this project will be documented in this file. +## [9.2.5] - 2020-04-13 + +(websocket) WebSocketMessagePtr is a unique_ptr instead of a shared_ptr + ## [9.2.4] - 2020-04-13 (websocket) use persistent member variable as temp variables to encode/decode zlib messages in order to reduce transient allocations diff --git a/ixwebsocket/IXWebSocket.cpp b/ixwebsocket/IXWebSocket.cpp index 68974d8f..864482ea 100644 --- a/ixwebsocket/IXWebSocket.cpp +++ b/ixwebsocket/IXWebSocket.cpp @@ -34,7 +34,7 @@ namespace ix _ws.setOnCloseCallback( [this](uint16_t code, const std::string& reason, size_t wireSize, bool remote) { _onMessageCallback( - std::make_shared(WebSocketMessageType::Close, + std::make_unique(WebSocketMessageType::Close, "", wireSize, WebSocketErrorInfo(), @@ -193,7 +193,7 @@ namespace ix return status; } - _onMessageCallback(std::make_shared( + _onMessageCallback(std::make_unique( WebSocketMessageType::Open, "", 0, @@ -225,7 +225,7 @@ namespace ix } _onMessageCallback( - std::make_shared(WebSocketMessageType::Open, + std::make_unique(WebSocketMessageType::Open, "", 0, WebSocketErrorInfo(), @@ -310,7 +310,7 @@ namespace ix connectErr.reason = status.errorStr; connectErr.http_status = status.http_status; - _onMessageCallback(std::make_shared(WebSocketMessageType::Error, + _onMessageCallback(std::make_unique(WebSocketMessageType::Error, "", 0, connectErr, @@ -386,7 +386,7 @@ namespace ix bool binary = messageKind == WebSocketTransport::MessageKind::MSG_BINARY; - _onMessageCallback(std::make_shared(webSocketMessageType, + _onMessageCallback(std::make_unique(webSocketMessageType, msg, wireSize, webSocketErrorInfo, diff --git a/ixwebsocket/IXWebSocketMessage.h b/ixwebsocket/IXWebSocketMessage.h index 5bc4e697..6dcc3d64 100644 --- a/ixwebsocket/IXWebSocketMessage.h +++ b/ixwebsocket/IXWebSocketMessage.h @@ -45,5 +45,5 @@ namespace ix } }; - using WebSocketMessagePtr = std::shared_ptr; + using WebSocketMessagePtr = std::unique_ptr; } // namespace ix diff --git a/ixwebsocket/IXWebSocketMessageQueue.cpp b/ixwebsocket/IXWebSocketMessageQueue.cpp deleted file mode 100644 index 8e40eb70..00000000 --- a/ixwebsocket/IXWebSocketMessageQueue.cpp +++ /dev/null @@ -1,86 +0,0 @@ -/* - * IXWebSocketMessageQueue.cpp - * Author: Korchynskyi Dmytro - * Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved. - */ - -#include "IXWebSocketMessageQueue.h" - -namespace ix -{ - WebSocketMessageQueue::WebSocketMessageQueue(WebSocket* websocket) - { - bindWebsocket(websocket); - } - - WebSocketMessageQueue::~WebSocketMessageQueue() - { - if (!_messages.empty()) - { - // not handled all messages - } - - bindWebsocket(nullptr); - } - - void WebSocketMessageQueue::bindWebsocket(WebSocket* websocket) - { - if (_websocket == websocket) return; - - // unbind old - if (_websocket) - { - // set dummy callback just to avoid crash - _websocket->setOnMessageCallback([](const WebSocketMessagePtr&) {}); - } - - _websocket = websocket; - - // bind new - if (_websocket) - { - _websocket->setOnMessageCallback([this](const WebSocketMessagePtr& msg) { - std::lock_guard lock(_messagesMutex); - _messages.emplace_back(std::move(msg)); - }); - } - } - - void WebSocketMessageQueue::setOnMessageCallback(const OnMessageCallback& callback) - { - _onMessageUserCallback = callback; - } - - void WebSocketMessageQueue::setOnMessageCallback(OnMessageCallback&& callback) - { - _onMessageUserCallback = std::move(callback); - } - - WebSocketMessagePtr WebSocketMessageQueue::popMessage() - { - WebSocketMessagePtr message; - std::lock_guard lock(_messagesMutex); - - if (!_messages.empty()) - { - message = std::move(_messages.front()); - _messages.pop_front(); - } - - return message; - } - - void WebSocketMessageQueue::poll(int count) - { - if (!_onMessageUserCallback) return; - - WebSocketMessagePtr message; - - while (count > 0 && (message = popMessage())) - { - _onMessageUserCallback(message); - --count; - } - } - -} // namespace ix diff --git a/ixwebsocket/IXWebSocketMessageQueue.h b/ixwebsocket/IXWebSocketMessageQueue.h deleted file mode 100644 index 962b78d0..00000000 --- a/ixwebsocket/IXWebSocketMessageQueue.h +++ /dev/null @@ -1,41 +0,0 @@ -/* - * IXWebSocketMessageQueue.h - * Author: Korchynskyi Dmytro - * Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved. - */ - -#pragma once - -#include "IXWebSocket.h" -#include -#include -#include - -namespace ix -{ - // - // A helper class to dispatch websocket message callbacks in your thread. - // - class WebSocketMessageQueue - { - public: - WebSocketMessageQueue(WebSocket* websocket = nullptr); - ~WebSocketMessageQueue(); - - void bindWebsocket(WebSocket* websocket); - - void setOnMessageCallback(const OnMessageCallback& callback); - void setOnMessageCallback(OnMessageCallback&& callback); - - void poll(int count = 512); - - protected: - WebSocketMessagePtr popMessage(); - - private: - WebSocket* _websocket = nullptr; - OnMessageCallback _onMessageUserCallback; - std::mutex _messagesMutex; - std::list _messages; - }; -} // namespace ix diff --git a/ixwebsocket/IXWebSocketVersion.h b/ixwebsocket/IXWebSocketVersion.h index 589a3750..c37b4448 100644 --- a/ixwebsocket/IXWebSocketVersion.h +++ b/ixwebsocket/IXWebSocketVersion.h @@ -6,4 +6,4 @@ #pragma once -#define IX_WEBSOCKET_VERSION "9.2.4" +#define IX_WEBSOCKET_VERSION "9.2.5" diff --git a/test/IXWebSocketMessageQTest.cpp b/test/IXWebSocketMessageQTest.cpp deleted file mode 100644 index ef6b808d..00000000 --- a/test/IXWebSocketMessageQTest.cpp +++ /dev/null @@ -1,178 +0,0 @@ -/* - * IXWebSocketMessageQTest.cpp - * Author: Korchynskyi Dmytro - * Copyright (c) 2019 Machine Zone. All rights reserved. - */ - -#include "IXTest.h" -#include "catch.hpp" -#include -#include -#include -#include - -using namespace ix; - -namespace -{ - bool startServer(ix::WebSocketServer& server) - { - server.setOnConnectionCallback([&server](std::shared_ptr webSocket, - std::shared_ptr connectionState) { - webSocket->setOnMessageCallback( - [connectionState, &server](const WebSocketMessagePtr& msg) { - if (msg->type == ix::WebSocketMessageType::Open) - { - TLogger() << "New connection"; - connectionState->computeId(); - TLogger() << "id: " << connectionState->getId(); - TLogger() << "Uri: " << msg->openInfo.uri; - TLogger() << "Headers:"; - for (auto&& it : msg->openInfo.headers) - { - TLogger() << it.first << ": " << it.second; - } - } - else if (msg->type == ix::WebSocketMessageType::Close) - { - TLogger() << "Closed connection"; - } - else if (msg->type == ix::WebSocketMessageType::Message) - { - TLogger() << "Message received: " << msg->str; - - for (auto&& client : server.getClients()) - { - client->send(msg->str); - } - } - }); - }); - - auto res = server.listen(); - if (!res.first) - { - TLogger() << res.second; - return false; - } - - server.start(); - return true; - } - - class MsgQTestClient - { - public: - MsgQTestClient() - { - msgQ.bindWebsocket(&ws); - - msgQ.setOnMessageCallback([this](const WebSocketMessagePtr& msg) { - REQUIRE(mainThreadId == std::this_thread::get_id()); - - std::stringstream ss; - if (msg->type == WebSocketMessageType::Open) - { - log("client connected"); - sendNextMessage(); - } - else if (msg->type == WebSocketMessageType::Close) - { - log("client disconnected"); - } - else if (msg->type == WebSocketMessageType::Error) - { - ss << "Error ! " << msg->errorInfo.reason; - log(ss.str()); - testDone = true; - } - else if (msg->type == WebSocketMessageType::Pong) - { - ss << "Received pong message " << msg->str; - log(ss.str()); - } - else if (msg->type == WebSocketMessageType::Ping) - { - ss << "Received ping message " << msg->str; - log(ss.str()); - } - else if (msg->type == WebSocketMessageType::Message) - { - REQUIRE(msg->str.compare("Hey dude!") == 0); - ++receivedCount; - ss << "Received message " << msg->str; - log(ss.str()); - sendNextMessage(); - } - else - { - ss << "Invalid WebSocketMessageType"; - log(ss.str()); - testDone = true; - } - }); - } - - void sendNextMessage() - { - if (receivedCount >= 3) - { - testDone = true; - succeeded = true; - } - else - { - auto info = ws.sendText("Hey dude!"); - if (info.success) - log("sent message"); - else - log("send failed"); - } - } - - void run(const std::string& url) - { - mainThreadId = std::this_thread::get_id(); - testDone = false; - receivedCount = 0; - - ws.setUrl(url); - ws.start(); - - while (!testDone) - { - msgQ.poll(); - msleep(50); - } - } - - bool isSucceeded() const - { - return succeeded; - } - - private: - WebSocket ws; - WebSocketMessageQueue msgQ; - bool testDone = false; - uint32_t receivedCount = 0; - std::thread::id mainThreadId; - bool succeeded = false; - }; -} // namespace - -TEST_CASE("Websocket_message_queue", "[websocket_message_q]") -{ - SECTION("Send several messages") - { - int port = getFreePort(); - WebSocketServer server(port); - REQUIRE(startServer(server)); - - MsgQTestClient testClient; - testClient.run("ws://127.0.0.1:" + std::to_string(port)); - REQUIRE(testClient.isSucceeded()); - - server.stop(); - } -}