From 77c7fdc636925de88199a23107afa981c56b4be8 Mon Sep 17 00:00:00 2001 From: Dimon4eg Date: Wed, 8 May 2019 22:02:56 +0300 Subject: [PATCH] Added IXWebSocketPoll class --- CMakeLists.txt | 2 + ixwebsocket/IXWebSocket.h | 5 ++ ixwebsocket/IXWebSocketPoll.cpp | 108 ++++++++++++++++++++++++++++++++ ixwebsocket/IXWebSocketPoll.h | 52 +++++++++++++++ 4 files changed, 167 insertions(+) create mode 100644 ixwebsocket/IXWebSocketPoll.cpp create mode 100644 ixwebsocket/IXWebSocketPoll.h diff --git a/CMakeLists.txt b/CMakeLists.txt index f86fcf95..9860e023 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -28,6 +28,7 @@ set( IXWEBSOCKET_SOURCES ixwebsocket/IXCancellationRequest.cpp ixwebsocket/IXNetSystem.cpp ixwebsocket/IXWebSocket.cpp + ixwebsocket/IXWebSocketPoll.cpp ixwebsocket/IXWebSocketServer.cpp ixwebsocket/IXWebSocketTransport.cpp ixwebsocket/IXWebSocketHandshake.cpp @@ -54,6 +55,7 @@ set( IXWEBSOCKET_HEADERS ixwebsocket/IXNetSystem.h ixwebsocket/IXProgressCallback.h ixwebsocket/IXWebSocket.h + ixwebsocket/IXWebSocketPoll.h ixwebsocket/IXWebSocketServer.h ixwebsocket/IXWebSocketTransport.h ixwebsocket/IXWebSocketHandshake.h diff --git a/ixwebsocket/IXWebSocket.h b/ixwebsocket/IXWebSocket.h index 8c6bf702..e655d196 100644 --- a/ixwebsocket/IXWebSocket.h +++ b/ixwebsocket/IXWebSocket.h @@ -113,6 +113,11 @@ namespace ix WebSocketSendInfo ping(const std::string& text); void close(); + /** + * Set callback to receive websocket messages. + * + * Be aware: your callback will be executed from websocket's internal thread! + */ void setOnMessageCallback(const OnMessageCallback& callback); static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback); static void resetTrafficTrackerCallback(); diff --git a/ixwebsocket/IXWebSocketPoll.cpp b/ixwebsocket/IXWebSocketPoll.cpp new file mode 100644 index 00000000..9634e198 --- /dev/null +++ b/ixwebsocket/IXWebSocketPoll.cpp @@ -0,0 +1,108 @@ +/* + * IXWebSocketPoll.cpp + * Author: Korchynskyi Dmytro + * Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved. + */ + +#include "IXWebSocketPoll.h" + +namespace ix +{ + + WebSocketPoll::WebSocketPoll(WebSocket* websocket) + { + } + + WebSocketPoll::~WebSocketPoll() + { + if (!_messages.empty()) + { + + } + + bindWebsocket(nullptr); + } + + void WebSocketPoll::bindWebsocket(WebSocket * websocket) + { + if (_websocket != websocket) + { + // unbind old + if (_websocket) + { + _websocket->setOnMessageCallback(nullptr); + } + + _websocket = websocket; + + // bind new + if (_websocket) + { + _websocket->setOnMessageCallback([this]( + WebSocketMessageType type, + const std::string& str, + size_t wireSize, + const WebSocketErrorInfo& errorInfo, + const WebSocketOpenInfo& openInfo, + const WebSocketCloseInfo& closeInfo) + { + MessageDataPtr message(new Message()); + + message->type = type; + message->str = str; + message->wireSize = wireSize; + message->errorInfo = errorInfo; + message->openInfo = openInfo; + message->closeInfo = closeInfo; + + _messagesMutex.lock(); + _messages.emplace_back(std::move(message)); + _messagesMutex.unlock(); + }); + } + } + } + + void WebSocketPoll::setOnMessageCallback(const OnMessageCallback& callback) + { + _onMessageUserCallback = callback; + } + + WebSocketPoll::MessageDataPtr WebSocketPoll::popMessage() + { + MessageDataPtr message; + + _messagesMutex.lock(); + if (!_messages.empty()) + { + message = std::move(_messages.front()); + _messages.pop_front(); + } + _messagesMutex.unlock(); + + return message; + } + + void WebSocketPoll::poll(int count) + { + if (!_onMessageUserCallback) + return; + + MessageDataPtr message; + + while (count > 0 && (message = popMessage())) + { + _onMessageUserCallback( + message->type, + message->str, + message->wireSize, + message->errorInfo, + message->openInfo, + message->closeInfo + ); + + --count; + } + } + +} diff --git a/ixwebsocket/IXWebSocketPoll.h b/ixwebsocket/IXWebSocketPoll.h new file mode 100644 index 00000000..c16eb808 --- /dev/null +++ b/ixwebsocket/IXWebSocketPoll.h @@ -0,0 +1,52 @@ +/* + * IXWebSocketPoll.h + * Author: Korchynskyi Dmytro + * Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved. + */ + +#pragma once + +#include "IXWebSocket.h" +#include +#include +#include + +namespace ix +{ + /** + * A helper class to dispatch websocket message callbacks in your thread. + */ + class WebSocketPoll + { + public: + WebSocketPoll(WebSocket* websocket = nullptr); + ~WebSocketPoll(); + + void bindWebsocket(WebSocket* websocket); + + void setOnMessageCallback(const OnMessageCallback& callback); + + void poll(int count = 512); + + protected: + struct Message + { + WebSocketMessageType type; + std::string str; + size_t wireSize; + WebSocketErrorInfo errorInfo; + WebSocketOpenInfo openInfo; + WebSocketCloseInfo closeInfo; + }; + + using MessageDataPtr = std::shared_ptr; + + MessageDataPtr popMessage(); + + private: + WebSocket* _websocket = nullptr; + OnMessageCallback _onMessageUserCallback; + std::mutex _messagesMutex; + std::list _messages; + }; +}