Revert "Merge branch 'Dimon4eg-message-queue'"
This reverts commit13fa325134, reversing changes made toaecd5e9c94.
This commit is contained in:
		@@ -28,7 +28,6 @@ set( IXWEBSOCKET_SOURCES
 | 
			
		||||
    ixwebsocket/IXCancellationRequest.cpp
 | 
			
		||||
    ixwebsocket/IXNetSystem.cpp
 | 
			
		||||
    ixwebsocket/IXWebSocket.cpp
 | 
			
		||||
    ixwebsocket/IXWebSocketMessageQueue.cpp
 | 
			
		||||
    ixwebsocket/IXWebSocketServer.cpp
 | 
			
		||||
    ixwebsocket/IXWebSocketTransport.cpp
 | 
			
		||||
    ixwebsocket/IXWebSocketHandshake.cpp
 | 
			
		||||
@@ -56,7 +55,6 @@ set( IXWEBSOCKET_HEADERS
 | 
			
		||||
    ixwebsocket/IXNetSystem.h
 | 
			
		||||
    ixwebsocket/IXProgressCallback.h
 | 
			
		||||
    ixwebsocket/IXWebSocket.h
 | 
			
		||||
    ixwebsocket/IXWebSocketMessageQueue.h
 | 
			
		||||
    ixwebsocket/IXWebSocketServer.h
 | 
			
		||||
    ixwebsocket/IXWebSocketTransport.h
 | 
			
		||||
    ixwebsocket/IXWebSocketHandshake.h
 | 
			
		||||
 
 | 
			
		||||
@@ -119,11 +119,7 @@ namespace ix
 | 
			
		||||
        void close(uint16_t code = 1000,
 | 
			
		||||
                   const std::string& reason = "Normal closure");
 | 
			
		||||
 | 
			
		||||
        // Set callback to receive websocket messages.
 | 
			
		||||
        // Be aware: your callback will be executed from websocket's internal thread!
 | 
			
		||||
        // To receive message events in your thread, look at WebSocketMessageQueue class
 | 
			
		||||
        void setOnMessageCallback(const OnMessageCallback& callback);
 | 
			
		||||
 | 
			
		||||
        static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback);
 | 
			
		||||
        static void resetTrafficTrackerCallback();
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -1,121 +0,0 @@
 | 
			
		||||
/*
 | 
			
		||||
 *  IXWebSocketMessageQueue.cpp
 | 
			
		||||
 *  Author: Korchynskyi Dmytro
 | 
			
		||||
 *  Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
#include "IXWebSocketMessageQueue.h"
 | 
			
		||||
 | 
			
		||||
namespace ix
 | 
			
		||||
{
 | 
			
		||||
 | 
			
		||||
    WebSocketMessageQueue::WebSocketMessageQueue(WebSocket* websocket)
 | 
			
		||||
    {
 | 
			
		||||
        bindWebsocket(websocket);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    WebSocketMessageQueue::~WebSocketMessageQueue()
 | 
			
		||||
    {
 | 
			
		||||
        if (!_messages.empty())
 | 
			
		||||
        {
 | 
			
		||||
            // not handled all messages
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        bindWebsocket(nullptr);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void WebSocketMessageQueue::bindWebsocket(WebSocket * websocket)
 | 
			
		||||
    {
 | 
			
		||||
        if (_websocket == websocket) return;
 | 
			
		||||
 | 
			
		||||
        // unbind old
 | 
			
		||||
        if (_websocket)
 | 
			
		||||
        {
 | 
			
		||||
            // set dummy callback just to avoid crash
 | 
			
		||||
            _websocket->setOnMessageCallback([](
 | 
			
		||||
                WebSocketMessageType,
 | 
			
		||||
                const std::string&,
 | 
			
		||||
                size_t,
 | 
			
		||||
                const WebSocketErrorInfo&,
 | 
			
		||||
                const WebSocketOpenInfo&,
 | 
			
		||||
                const WebSocketCloseInfo&)
 | 
			
		||||
            {});
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        _websocket = websocket;
 | 
			
		||||
 | 
			
		||||
        // bind new
 | 
			
		||||
        if (_websocket)
 | 
			
		||||
        {
 | 
			
		||||
            _websocket->setOnMessageCallback([this](
 | 
			
		||||
                WebSocketMessageType type,
 | 
			
		||||
                const std::string& str,
 | 
			
		||||
                size_t wireSize,
 | 
			
		||||
                const WebSocketErrorInfo& errorInfo,
 | 
			
		||||
                const WebSocketOpenInfo& openInfo,
 | 
			
		||||
                const WebSocketCloseInfo& closeInfo)
 | 
			
		||||
            {
 | 
			
		||||
                MessagePtr message(new Message());
 | 
			
		||||
 | 
			
		||||
                message->type      = type;
 | 
			
		||||
                message->str       = str;
 | 
			
		||||
                message->wireSize  = wireSize;
 | 
			
		||||
                message->errorInfo = errorInfo;
 | 
			
		||||
                message->openInfo  = openInfo;
 | 
			
		||||
                message->closeInfo = closeInfo;
 | 
			
		||||
 | 
			
		||||
                {
 | 
			
		||||
                    std::lock_guard<std::mutex> lock(_messagesMutex);
 | 
			
		||||
                    _messages.emplace_back(std::move(message));
 | 
			
		||||
                }
 | 
			
		||||
            });
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void WebSocketMessageQueue::setOnMessageCallback(const OnMessageCallback& callback)
 | 
			
		||||
    {
 | 
			
		||||
        _onMessageUserCallback = callback;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void WebSocketMessageQueue::setOnMessageCallback(OnMessageCallback&& callback)
 | 
			
		||||
    {
 | 
			
		||||
        _onMessageUserCallback = std::move(callback);
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
    WebSocketMessageQueue::MessagePtr WebSocketMessageQueue::popMessage()
 | 
			
		||||
    {
 | 
			
		||||
        MessagePtr message;
 | 
			
		||||
        std::lock_guard<std::mutex> lock(_messagesMutex);
 | 
			
		||||
 | 
			
		||||
        if (!_messages.empty())
 | 
			
		||||
        {
 | 
			
		||||
            message = std::move(_messages.front());
 | 
			
		||||
            _messages.pop_front();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        return message;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void WebSocketMessageQueue::poll(int count)
 | 
			
		||||
    {
 | 
			
		||||
        if (!_onMessageUserCallback)
 | 
			
		||||
            return;
 | 
			
		||||
 | 
			
		||||
        MessagePtr message;
 | 
			
		||||
 | 
			
		||||
        while (count > 0 && (message = popMessage()))
 | 
			
		||||
        {
 | 
			
		||||
            _onMessageUserCallback(
 | 
			
		||||
                message->type,
 | 
			
		||||
                message->str,
 | 
			
		||||
                message->wireSize,
 | 
			
		||||
                message->errorInfo,
 | 
			
		||||
                message->openInfo,
 | 
			
		||||
                message->closeInfo
 | 
			
		||||
            );
 | 
			
		||||
 | 
			
		||||
            --count;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@@ -1,53 +0,0 @@
 | 
			
		||||
/*
 | 
			
		||||
 *  IXWebSocketMessageQueue.h
 | 
			
		||||
 *  Author: Korchynskyi Dmytro
 | 
			
		||||
 *  Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
#pragma once
 | 
			
		||||
 | 
			
		||||
#include "IXWebSocket.h"
 | 
			
		||||
#include <thread>
 | 
			
		||||
#include <list>
 | 
			
		||||
#include <memory>
 | 
			
		||||
 | 
			
		||||
namespace ix
 | 
			
		||||
{
 | 
			
		||||
    //
 | 
			
		||||
    // A helper class to dispatch websocket message callbacks in your thread.
 | 
			
		||||
    //
 | 
			
		||||
    class WebSocketMessageQueue
 | 
			
		||||
    {
 | 
			
		||||
    public:
 | 
			
		||||
        WebSocketMessageQueue(WebSocket* websocket = nullptr);
 | 
			
		||||
        ~WebSocketMessageQueue();
 | 
			
		||||
 | 
			
		||||
        void bindWebsocket(WebSocket* websocket);
 | 
			
		||||
 | 
			
		||||
        void setOnMessageCallback(const OnMessageCallback& callback);
 | 
			
		||||
        void setOnMessageCallback(OnMessageCallback&& callback);
 | 
			
		||||
 | 
			
		||||
        void poll(int count = 512);
 | 
			
		||||
 | 
			
		||||
    protected:
 | 
			
		||||
        struct Message
 | 
			
		||||
        {
 | 
			
		||||
            WebSocketMessageType type;
 | 
			
		||||
            std::string str;
 | 
			
		||||
            size_t wireSize;
 | 
			
		||||
            WebSocketErrorInfo errorInfo;
 | 
			
		||||
            WebSocketOpenInfo openInfo;
 | 
			
		||||
            WebSocketCloseInfo closeInfo;
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        using MessagePtr = std::shared_ptr<Message>;
 | 
			
		||||
 | 
			
		||||
        MessagePtr popMessage();
 | 
			
		||||
 | 
			
		||||
    private:
 | 
			
		||||
        WebSocket* _websocket = nullptr;
 | 
			
		||||
        OnMessageCallback _onMessageUserCallback;
 | 
			
		||||
        std::mutex _messagesMutex;
 | 
			
		||||
        std::list<MessagePtr> _messages;
 | 
			
		||||
    };
 | 
			
		||||
}
 | 
			
		||||
@@ -36,7 +36,6 @@ set (SOURCES
 | 
			
		||||
  IXWebSocketServerTest.cpp
 | 
			
		||||
  IXWebSocketTestConnectionDisconnection.cpp
 | 
			
		||||
  IXUrlParserTest.cpp
 | 
			
		||||
  IXWebSocketMessageQTest.cpp
 | 
			
		||||
  IXWebSocketServerTest.cpp
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -1,191 +0,0 @@
 | 
			
		||||
/*
 | 
			
		||||
 *  IXWebSocketMessageQTest.cpp
 | 
			
		||||
 *  Author: Korchynskyi Dmytro
 | 
			
		||||
 *  Copyright (c) 2019 Machine Zone. All rights reserved.
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
#include <ixwebsocket/IXWebSocket.h>
 | 
			
		||||
#include <ixwebsocket/IXWebSocketServer.h>
 | 
			
		||||
#include <ixwebsocket/IXWebSocketMessageQueue.h>
 | 
			
		||||
 | 
			
		||||
#include "IXTest.h"
 | 
			
		||||
#include "catch.hpp"
 | 
			
		||||
#include <thread>
 | 
			
		||||
 | 
			
		||||
using namespace ix;
 | 
			
		||||
 | 
			
		||||
namespace
 | 
			
		||||
{
 | 
			
		||||
    bool startServer(ix::WebSocketServer& server)
 | 
			
		||||
    {
 | 
			
		||||
        server.setOnConnectionCallback(
 | 
			
		||||
            [&server](std::shared_ptr<ix::WebSocket> webSocket,
 | 
			
		||||
                std::shared_ptr<ConnectionState> connectionState)
 | 
			
		||||
        {
 | 
			
		||||
            webSocket->setOnMessageCallback(
 | 
			
		||||
                [connectionState, &server](ix::WebSocketMessageType messageType,
 | 
			
		||||
                    const std::string & str,
 | 
			
		||||
                    size_t wireSize,
 | 
			
		||||
                    const ix::WebSocketErrorInfo & error,
 | 
			
		||||
                    const ix::WebSocketOpenInfo & openInfo,
 | 
			
		||||
                    const ix::WebSocketCloseInfo & closeInfo)
 | 
			
		||||
            {
 | 
			
		||||
                if (messageType == ix::WebSocketMessageType::Open)
 | 
			
		||||
                {
 | 
			
		||||
                    Logger() << "New connection";
 | 
			
		||||
                    connectionState->computeId();
 | 
			
		||||
                    Logger() << "id: " << connectionState->getId();
 | 
			
		||||
                    Logger() << "Uri: " << openInfo.uri;
 | 
			
		||||
                    Logger() << "Headers:";
 | 
			
		||||
                    for (auto it : openInfo.headers)
 | 
			
		||||
                    {
 | 
			
		||||
                        Logger() << it.first << ": " << it.second;
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
                else if (messageType == ix::WebSocketMessageType::Close)
 | 
			
		||||
                {
 | 
			
		||||
                    Logger() << "Closed connection";
 | 
			
		||||
                }
 | 
			
		||||
                else if (messageType == ix::WebSocketMessageType::Message)
 | 
			
		||||
                {
 | 
			
		||||
                    Logger() << "Message received: " << str;
 | 
			
		||||
 | 
			
		||||
                    for (auto&& client : server.getClients())
 | 
			
		||||
                    {
 | 
			
		||||
                        client->send(str);
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            );
 | 
			
		||||
        }
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        auto res = server.listen();
 | 
			
		||||
        if (!res.first)
 | 
			
		||||
        {
 | 
			
		||||
            Logger() << res.second;
 | 
			
		||||
            return false;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        server.start();
 | 
			
		||||
        return true;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    class MsgQTestClient
 | 
			
		||||
    {
 | 
			
		||||
    public:
 | 
			
		||||
        MsgQTestClient()
 | 
			
		||||
        {
 | 
			
		||||
            msgQ.bindWebsocket(&ws);
 | 
			
		||||
 | 
			
		||||
            msgQ.setOnMessageCallback([this](WebSocketMessageType messageType,
 | 
			
		||||
                const std::string & str,
 | 
			
		||||
                size_t wireSize,
 | 
			
		||||
                const WebSocketErrorInfo & error,
 | 
			
		||||
                const WebSocketOpenInfo & openInfo,
 | 
			
		||||
                const WebSocketCloseInfo & closeInfo)
 | 
			
		||||
            {
 | 
			
		||||
                REQUIRE(mainThreadId == std::this_thread::get_id());
 | 
			
		||||
 | 
			
		||||
                std::stringstream ss;
 | 
			
		||||
                if (messageType == WebSocketMessageType::Open)
 | 
			
		||||
                {
 | 
			
		||||
                    log("client connected");
 | 
			
		||||
                    sendNextMessage();
 | 
			
		||||
                }
 | 
			
		||||
                else if (messageType == WebSocketMessageType::Close)
 | 
			
		||||
                {
 | 
			
		||||
                    log("client disconnected");
 | 
			
		||||
                }
 | 
			
		||||
                else if (messageType == WebSocketMessageType::Error)
 | 
			
		||||
                {
 | 
			
		||||
                    ss << "Error ! " << error.reason;
 | 
			
		||||
                    log(ss.str());
 | 
			
		||||
                    testDone = true;
 | 
			
		||||
                }
 | 
			
		||||
                else if (messageType == WebSocketMessageType::Pong)
 | 
			
		||||
                {
 | 
			
		||||
                    ss << "Received pong message " << str;
 | 
			
		||||
                    log(ss.str());
 | 
			
		||||
                }
 | 
			
		||||
                else if (messageType == WebSocketMessageType::Ping)
 | 
			
		||||
                {
 | 
			
		||||
                    ss << "Received ping message " << str;
 | 
			
		||||
                    log(ss.str());
 | 
			
		||||
                }
 | 
			
		||||
                else if (messageType == WebSocketMessageType::Message)
 | 
			
		||||
                {
 | 
			
		||||
                    REQUIRE(str.compare("Hey dude!") == 0);
 | 
			
		||||
                    ++receivedCount;
 | 
			
		||||
                    ss << "Received message " << str;
 | 
			
		||||
                    log(ss.str());
 | 
			
		||||
                    sendNextMessage();
 | 
			
		||||
                }
 | 
			
		||||
                else
 | 
			
		||||
                {
 | 
			
		||||
                    ss << "Invalid WebSocketMessageType";
 | 
			
		||||
                    log(ss.str());
 | 
			
		||||
                    testDone = true;
 | 
			
		||||
                }
 | 
			
		||||
            });
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        void sendNextMessage()
 | 
			
		||||
        {
 | 
			
		||||
            if (receivedCount >= 3)
 | 
			
		||||
            {
 | 
			
		||||
                testDone = true;
 | 
			
		||||
                succeeded = true;
 | 
			
		||||
            }
 | 
			
		||||
            else
 | 
			
		||||
            {
 | 
			
		||||
                auto info = ws.sendText("Hey dude!");
 | 
			
		||||
                if (info.success)
 | 
			
		||||
                    log("sent message");
 | 
			
		||||
                else
 | 
			
		||||
                    log("send failed");
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        void run(const std::string& url)
 | 
			
		||||
        {
 | 
			
		||||
            mainThreadId = std::this_thread::get_id();
 | 
			
		||||
            testDone = false;
 | 
			
		||||
            receivedCount = 0;
 | 
			
		||||
 | 
			
		||||
            ws.setUrl(url);
 | 
			
		||||
            ws.start();
 | 
			
		||||
 | 
			
		||||
            while (!testDone)
 | 
			
		||||
            {
 | 
			
		||||
                msgQ.poll();
 | 
			
		||||
                msleep(50);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        bool isSucceeded() const { return succeeded; }
 | 
			
		||||
 | 
			
		||||
    private:
 | 
			
		||||
        WebSocket ws;
 | 
			
		||||
        WebSocketMessageQueue msgQ;
 | 
			
		||||
        bool testDone = false;
 | 
			
		||||
        uint32_t receivedCount = 0;
 | 
			
		||||
        std::thread::id mainThreadId;
 | 
			
		||||
        bool succeeded = false;
 | 
			
		||||
    };
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST_CASE("Websocket_message_queue", "[websocket_message_q]")
 | 
			
		||||
{
 | 
			
		||||
    SECTION("Send several messages")
 | 
			
		||||
    {
 | 
			
		||||
        int port = getFreePort();
 | 
			
		||||
        WebSocketServer server(port);
 | 
			
		||||
        REQUIRE(startServer(server));
 | 
			
		||||
 | 
			
		||||
        MsgQTestClient testClient;
 | 
			
		||||
        testClient.run("ws://127.0.0.1:" + std::to_string(port));
 | 
			
		||||
        REQUIRE(testClient.isSucceeded());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user