From c4a5647b622b0624eb959491a0d91c9ebfea7535 Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Thu, 16 May 2019 22:15:17 -0700 Subject: [PATCH] Revert "Merge branch 'Dimon4eg-message-queue'" This reverts commit 13fa325134163997fe73a5b0658b8301c9b2f729, reversing changes made to aecd5e9c944ce9b53533a846790ae4d383fa924b. --- CMakeLists.txt | 2 - ixwebsocket/IXWebSocket.h | 4 - ixwebsocket/IXWebSocketMessageQueue.cpp | 121 --------------- ixwebsocket/IXWebSocketMessageQueue.h | 53 ------- test/CMakeLists.txt | 1 - test/IXWebSocketMessageQTest.cpp | 191 ------------------------ 6 files changed, 372 deletions(-) delete mode 100644 ixwebsocket/IXWebSocketMessageQueue.cpp delete mode 100644 ixwebsocket/IXWebSocketMessageQueue.h delete mode 100644 test/IXWebSocketMessageQTest.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index a82c9c05..7d4c63be 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -28,7 +28,6 @@ set( IXWEBSOCKET_SOURCES ixwebsocket/IXCancellationRequest.cpp ixwebsocket/IXNetSystem.cpp ixwebsocket/IXWebSocket.cpp - ixwebsocket/IXWebSocketMessageQueue.cpp ixwebsocket/IXWebSocketServer.cpp ixwebsocket/IXWebSocketTransport.cpp ixwebsocket/IXWebSocketHandshake.cpp @@ -56,7 +55,6 @@ set( IXWEBSOCKET_HEADERS ixwebsocket/IXNetSystem.h ixwebsocket/IXProgressCallback.h ixwebsocket/IXWebSocket.h - ixwebsocket/IXWebSocketMessageQueue.h ixwebsocket/IXWebSocketServer.h ixwebsocket/IXWebSocketTransport.h ixwebsocket/IXWebSocketHandshake.h diff --git a/ixwebsocket/IXWebSocket.h b/ixwebsocket/IXWebSocket.h index f25a88ec..331923ca 100644 --- a/ixwebsocket/IXWebSocket.h +++ b/ixwebsocket/IXWebSocket.h @@ -119,11 +119,7 @@ namespace ix void close(uint16_t code = 1000, const std::string& reason = "Normal closure"); - // Set callback to receive websocket messages. - // Be aware: your callback will be executed from websocket's internal thread! - // To receive message events in your thread, look at WebSocketMessageQueue class void setOnMessageCallback(const OnMessageCallback& callback); - static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback); static void resetTrafficTrackerCallback(); diff --git a/ixwebsocket/IXWebSocketMessageQueue.cpp b/ixwebsocket/IXWebSocketMessageQueue.cpp deleted file mode 100644 index 64e9ddb6..00000000 --- a/ixwebsocket/IXWebSocketMessageQueue.cpp +++ /dev/null @@ -1,121 +0,0 @@ -/* - * IXWebSocketMessageQueue.cpp - * Author: Korchynskyi Dmytro - * Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved. - */ - -#include "IXWebSocketMessageQueue.h" - -namespace ix -{ - - WebSocketMessageQueue::WebSocketMessageQueue(WebSocket* websocket) - { - bindWebsocket(websocket); - } - - WebSocketMessageQueue::~WebSocketMessageQueue() - { - if (!_messages.empty()) - { - // not handled all messages - } - - bindWebsocket(nullptr); - } - - void WebSocketMessageQueue::bindWebsocket(WebSocket * websocket) - { - if (_websocket == websocket) return; - - // unbind old - if (_websocket) - { - // set dummy callback just to avoid crash - _websocket->setOnMessageCallback([]( - WebSocketMessageType, - const std::string&, - size_t, - const WebSocketErrorInfo&, - const WebSocketOpenInfo&, - const WebSocketCloseInfo&) - {}); - } - - _websocket = websocket; - - // bind new - if (_websocket) - { - _websocket->setOnMessageCallback([this]( - WebSocketMessageType type, - const std::string& str, - size_t wireSize, - const WebSocketErrorInfo& errorInfo, - const WebSocketOpenInfo& openInfo, - const WebSocketCloseInfo& closeInfo) - { - MessagePtr message(new Message()); - - message->type = type; - message->str = str; - message->wireSize = wireSize; - message->errorInfo = errorInfo; - message->openInfo = openInfo; - message->closeInfo = closeInfo; - - { - std::lock_guard lock(_messagesMutex); - _messages.emplace_back(std::move(message)); - } - }); - } - } - - void WebSocketMessageQueue::setOnMessageCallback(const OnMessageCallback& callback) - { - _onMessageUserCallback = callback; - } - - void WebSocketMessageQueue::setOnMessageCallback(OnMessageCallback&& callback) - { - _onMessageUserCallback = std::move(callback); - } - - WebSocketMessageQueue::MessagePtr WebSocketMessageQueue::popMessage() - { - MessagePtr message; - std::lock_guard lock(_messagesMutex); - - if (!_messages.empty()) - { - message = std::move(_messages.front()); - _messages.pop_front(); - } - - return message; - } - - void WebSocketMessageQueue::poll(int count) - { - if (!_onMessageUserCallback) - return; - - MessagePtr message; - - while (count > 0 && (message = popMessage())) - { - _onMessageUserCallback( - message->type, - message->str, - message->wireSize, - message->errorInfo, - message->openInfo, - message->closeInfo - ); - - --count; - } - } - -} diff --git a/ixwebsocket/IXWebSocketMessageQueue.h b/ixwebsocket/IXWebSocketMessageQueue.h deleted file mode 100644 index b8b85c25..00000000 --- a/ixwebsocket/IXWebSocketMessageQueue.h +++ /dev/null @@ -1,53 +0,0 @@ -/* - * IXWebSocketMessageQueue.h - * Author: Korchynskyi Dmytro - * Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved. - */ - -#pragma once - -#include "IXWebSocket.h" -#include -#include -#include - -namespace ix -{ - // - // A helper class to dispatch websocket message callbacks in your thread. - // - class WebSocketMessageQueue - { - public: - WebSocketMessageQueue(WebSocket* websocket = nullptr); - ~WebSocketMessageQueue(); - - void bindWebsocket(WebSocket* websocket); - - void setOnMessageCallback(const OnMessageCallback& callback); - void setOnMessageCallback(OnMessageCallback&& callback); - - void poll(int count = 512); - - protected: - struct Message - { - WebSocketMessageType type; - std::string str; - size_t wireSize; - WebSocketErrorInfo errorInfo; - WebSocketOpenInfo openInfo; - WebSocketCloseInfo closeInfo; - }; - - using MessagePtr = std::shared_ptr; - - MessagePtr popMessage(); - - private: - WebSocket* _websocket = nullptr; - OnMessageCallback _onMessageUserCallback; - std::mutex _messagesMutex; - std::list _messages; - }; -} diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index d27a796f..77ae3630 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -36,7 +36,6 @@ set (SOURCES IXWebSocketServerTest.cpp IXWebSocketTestConnectionDisconnection.cpp IXUrlParserTest.cpp - IXWebSocketMessageQTest.cpp IXWebSocketServerTest.cpp ) diff --git a/test/IXWebSocketMessageQTest.cpp b/test/IXWebSocketMessageQTest.cpp deleted file mode 100644 index 034a2ac0..00000000 --- a/test/IXWebSocketMessageQTest.cpp +++ /dev/null @@ -1,191 +0,0 @@ -/* - * IXWebSocketMessageQTest.cpp - * Author: Korchynskyi Dmytro - * Copyright (c) 2019 Machine Zone. All rights reserved. - */ - -#include -#include -#include - -#include "IXTest.h" -#include "catch.hpp" -#include - -using namespace ix; - -namespace -{ - bool startServer(ix::WebSocketServer& server) - { - server.setOnConnectionCallback( - [&server](std::shared_ptr webSocket, - std::shared_ptr connectionState) - { - webSocket->setOnMessageCallback( - [connectionState, &server](ix::WebSocketMessageType messageType, - const std::string & str, - size_t wireSize, - const ix::WebSocketErrorInfo & error, - const ix::WebSocketOpenInfo & openInfo, - const ix::WebSocketCloseInfo & closeInfo) - { - if (messageType == ix::WebSocketMessageType::Open) - { - Logger() << "New connection"; - connectionState->computeId(); - Logger() << "id: " << connectionState->getId(); - Logger() << "Uri: " << openInfo.uri; - Logger() << "Headers:"; - for (auto it : openInfo.headers) - { - Logger() << it.first << ": " << it.second; - } - } - else if (messageType == ix::WebSocketMessageType::Close) - { - Logger() << "Closed connection"; - } - else if (messageType == ix::WebSocketMessageType::Message) - { - Logger() << "Message received: " << str; - - for (auto&& client : server.getClients()) - { - client->send(str); - } - } - } - ); - } - ); - - auto res = server.listen(); - if (!res.first) - { - Logger() << res.second; - return false; - } - - server.start(); - return true; - } - - class MsgQTestClient - { - public: - MsgQTestClient() - { - msgQ.bindWebsocket(&ws); - - msgQ.setOnMessageCallback([this](WebSocketMessageType messageType, - const std::string & str, - size_t wireSize, - const WebSocketErrorInfo & error, - const WebSocketOpenInfo & openInfo, - const WebSocketCloseInfo & closeInfo) - { - REQUIRE(mainThreadId == std::this_thread::get_id()); - - std::stringstream ss; - if (messageType == WebSocketMessageType::Open) - { - log("client connected"); - sendNextMessage(); - } - else if (messageType == WebSocketMessageType::Close) - { - log("client disconnected"); - } - else if (messageType == WebSocketMessageType::Error) - { - ss << "Error ! " << error.reason; - log(ss.str()); - testDone = true; - } - else if (messageType == WebSocketMessageType::Pong) - { - ss << "Received pong message " << str; - log(ss.str()); - } - else if (messageType == WebSocketMessageType::Ping) - { - ss << "Received ping message " << str; - log(ss.str()); - } - else if (messageType == WebSocketMessageType::Message) - { - REQUIRE(str.compare("Hey dude!") == 0); - ++receivedCount; - ss << "Received message " << str; - log(ss.str()); - sendNextMessage(); - } - else - { - ss << "Invalid WebSocketMessageType"; - log(ss.str()); - testDone = true; - } - }); - } - - void sendNextMessage() - { - if (receivedCount >= 3) - { - testDone = true; - succeeded = true; - } - else - { - auto info = ws.sendText("Hey dude!"); - if (info.success) - log("sent message"); - else - log("send failed"); - } - } - - void run(const std::string& url) - { - mainThreadId = std::this_thread::get_id(); - testDone = false; - receivedCount = 0; - - ws.setUrl(url); - ws.start(); - - while (!testDone) - { - msgQ.poll(); - msleep(50); - } - } - - bool isSucceeded() const { return succeeded; } - - private: - WebSocket ws; - WebSocketMessageQueue msgQ; - bool testDone = false; - uint32_t receivedCount = 0; - std::thread::id mainThreadId; - bool succeeded = false; - }; -} - -TEST_CASE("Websocket_message_queue", "[websocket_message_q]") -{ - SECTION("Send several messages") - { - int port = getFreePort(); - WebSocketServer server(port); - REQUIRE(startServer(server)); - - MsgQTestClient testClient; - testClient.run("ws://127.0.0.1:" + std::to_string(port)); - REQUIRE(testClient.isSucceeded()); - } - -}