diff --git a/CMakeLists.txt b/CMakeLists.txt index f86fcf95..cebf10e0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -28,6 +28,7 @@ set( IXWEBSOCKET_SOURCES ixwebsocket/IXCancellationRequest.cpp ixwebsocket/IXNetSystem.cpp ixwebsocket/IXWebSocket.cpp + ixwebsocket/IXWebSocketMessageQueue.cpp ixwebsocket/IXWebSocketServer.cpp ixwebsocket/IXWebSocketTransport.cpp ixwebsocket/IXWebSocketHandshake.cpp @@ -54,6 +55,7 @@ set( IXWEBSOCKET_HEADERS ixwebsocket/IXNetSystem.h ixwebsocket/IXProgressCallback.h ixwebsocket/IXWebSocket.h + ixwebsocket/IXWebSocketMessageQueue.h ixwebsocket/IXWebSocketServer.h ixwebsocket/IXWebSocketTransport.h ixwebsocket/IXWebSocketHandshake.h diff --git a/ixwebsocket/IXWebSocket.h b/ixwebsocket/IXWebSocket.h index 1dda6f66..eda20141 100644 --- a/ixwebsocket/IXWebSocket.h +++ b/ixwebsocket/IXWebSocket.h @@ -114,7 +114,10 @@ namespace ix WebSocketSendInfo ping(const std::string& text); void close(); + // Set callback to receive websocket messages. + // Be aware: your callback will be executed from websocket's internal thread! void setOnMessageCallback(const OnMessageCallback& callback); + static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback); static void resetTrafficTrackerCallback(); diff --git a/ixwebsocket/IXWebSocketMessageQueue.cpp b/ixwebsocket/IXWebSocketMessageQueue.cpp new file mode 100644 index 00000000..6823b01c --- /dev/null +++ b/ixwebsocket/IXWebSocketMessageQueue.cpp @@ -0,0 +1,116 @@ +/* + * IXWebSocketMessageQueue.cpp + * Author: Korchynskyi Dmytro + * Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved. + */ + +#include "IXWebSocketMessageQueue.h" + +namespace ix +{ + + WebSocketMessageQueue::WebSocketMessageQueue(WebSocket* websocket) + { + bindWebsocket(websocket); + } + + WebSocketMessageQueue::~WebSocketMessageQueue() + { + if (!_messages.empty()) + { + // not handled all messages + } + + bindWebsocket(nullptr); + } + + void WebSocketMessageQueue::bindWebsocket(WebSocket * websocket) + { + if (_websocket == websocket) return; + + // unbind old + if (_websocket) + { + // set dummy callback just to avoid crash + _websocket->setOnMessageCallback([]( + WebSocketMessageType type, + const std::string & str, + size_t wireSize, + const WebSocketErrorInfo & errorInfo, + const WebSocketOpenInfo & openInfo, + const WebSocketCloseInfo & closeInfo) + {}); + } + + _websocket = websocket; + + // bind new + if (_websocket) + { + _websocket->setOnMessageCallback([this]( + WebSocketMessageType type, + const std::string& str, + size_t wireSize, + const WebSocketErrorInfo& errorInfo, + const WebSocketOpenInfo& openInfo, + const WebSocketCloseInfo& closeInfo) + { + MessagePtr message(new Message()); + + message->type = type; + message->str = str; + message->wireSize = wireSize; + message->errorInfo = errorInfo; + message->openInfo = openInfo; + message->closeInfo = closeInfo; + + { + std::lock_guard lock(_messagesMutex); + _messages.emplace_back(std::move(message)); + } + }); + } + } + + void WebSocketMessageQueue::setOnMessageCallback(const OnMessageCallback& callback) + { + _onMessageUserCallback = callback; + } + + WebSocketMessageQueue::MessagePtr WebSocketMessageQueue::popMessage() + { + MessagePtr message; + std::lock_guard lock(_messagesMutex); + + if (!_messages.empty()) + { + message = std::move(_messages.front()); + _messages.pop_front(); + } + + return message; + } + + void WebSocketMessageQueue::poll(int count) + { + if (!_onMessageUserCallback) + return; + + MessagePtr message; + + while (count > 0 && (message = popMessage())) + { + _onMessageUserCallback( + message->type, + message->str, + message->wireSize, + message->errorInfo, + message->openInfo, + message->closeInfo + ); + + --count; + } + } + +} diff --git a/ixwebsocket/IXWebSocketMessageQueue.h b/ixwebsocket/IXWebSocketMessageQueue.h new file mode 100644 index 00000000..543189e9 --- /dev/null +++ b/ixwebsocket/IXWebSocketMessageQueue.h @@ -0,0 +1,52 @@ +/* + * IXWebSocketMessageQueue.h + * Author: Korchynskyi Dmytro + * Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved. + */ + +#pragma once + +#include "IXWebSocket.h" +#include +#include +#include + +namespace ix +{ + // + // A helper class to dispatch websocket message callbacks in your thread. + // + class WebSocketMessageQueue + { + public: + WebSocketMessageQueue(WebSocket* websocket = nullptr); + ~WebSocketMessageQueue(); + + void bindWebsocket(WebSocket* websocket); + + void setOnMessageCallback(const OnMessageCallback& callback); + + void poll(int count = 512); + + protected: + struct Message + { + WebSocketMessageType type; + std::string str; + size_t wireSize; + WebSocketErrorInfo errorInfo; + WebSocketOpenInfo openInfo; + WebSocketCloseInfo closeInfo; + }; + + using MessagePtr = std::shared_ptr; + + MessagePtr popMessage(); + + private: + WebSocket* _websocket = nullptr; + OnMessageCallback _onMessageUserCallback; + std::mutex _messagesMutex; + std::list _messages; + }; +}