From 45d40dc15994c18d74a30a15930a649e601b6995 Mon Sep 17 00:00:00 2001 From: Dimon4eg Date: Wed, 8 May 2019 22:02:56 +0300 Subject: [PATCH 1/7] Added IXWebSocketPoll class --- CMakeLists.txt | 2 + ixwebsocket/IXWebSocket.h | 5 ++ ixwebsocket/IXWebSocketPoll.cpp | 108 ++++++++++++++++++++++++++++++++ ixwebsocket/IXWebSocketPoll.h | 52 +++++++++++++++ 4 files changed, 167 insertions(+) create mode 100644 ixwebsocket/IXWebSocketPoll.cpp create mode 100644 ixwebsocket/IXWebSocketPoll.h diff --git a/CMakeLists.txt b/CMakeLists.txt index f86fcf95..9860e023 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -28,6 +28,7 @@ set( IXWEBSOCKET_SOURCES ixwebsocket/IXCancellationRequest.cpp ixwebsocket/IXNetSystem.cpp ixwebsocket/IXWebSocket.cpp + ixwebsocket/IXWebSocketPoll.cpp ixwebsocket/IXWebSocketServer.cpp ixwebsocket/IXWebSocketTransport.cpp ixwebsocket/IXWebSocketHandshake.cpp @@ -54,6 +55,7 @@ set( IXWEBSOCKET_HEADERS ixwebsocket/IXNetSystem.h ixwebsocket/IXProgressCallback.h ixwebsocket/IXWebSocket.h + ixwebsocket/IXWebSocketPoll.h ixwebsocket/IXWebSocketServer.h ixwebsocket/IXWebSocketTransport.h ixwebsocket/IXWebSocketHandshake.h diff --git a/ixwebsocket/IXWebSocket.h b/ixwebsocket/IXWebSocket.h index 8c6bf702..e655d196 100644 --- a/ixwebsocket/IXWebSocket.h +++ b/ixwebsocket/IXWebSocket.h @@ -113,6 +113,11 @@ namespace ix WebSocketSendInfo ping(const std::string& text); void close(); + /** + * Set callback to receive websocket messages. + * + * Be aware: your callback will be executed from websocket's internal thread! + */ void setOnMessageCallback(const OnMessageCallback& callback); static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback); static void resetTrafficTrackerCallback(); diff --git a/ixwebsocket/IXWebSocketPoll.cpp b/ixwebsocket/IXWebSocketPoll.cpp new file mode 100644 index 00000000..9634e198 --- /dev/null +++ b/ixwebsocket/IXWebSocketPoll.cpp @@ -0,0 +1,108 @@ +/* + * IXWebSocketPoll.cpp + * Author: Korchynskyi Dmytro + * Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved. + */ + +#include "IXWebSocketPoll.h" + +namespace ix +{ + + WebSocketPoll::WebSocketPoll(WebSocket* websocket) + { + } + + WebSocketPoll::~WebSocketPoll() + { + if (!_messages.empty()) + { + + } + + bindWebsocket(nullptr); + } + + void WebSocketPoll::bindWebsocket(WebSocket * websocket) + { + if (_websocket != websocket) + { + // unbind old + if (_websocket) + { + _websocket->setOnMessageCallback(nullptr); + } + + _websocket = websocket; + + // bind new + if (_websocket) + { + _websocket->setOnMessageCallback([this]( + WebSocketMessageType type, + const std::string& str, + size_t wireSize, + const WebSocketErrorInfo& errorInfo, + const WebSocketOpenInfo& openInfo, + const WebSocketCloseInfo& closeInfo) + { + MessageDataPtr message(new Message()); + + message->type = type; + message->str = str; + message->wireSize = wireSize; + message->errorInfo = errorInfo; + message->openInfo = openInfo; + message->closeInfo = closeInfo; + + _messagesMutex.lock(); + _messages.emplace_back(std::move(message)); + _messagesMutex.unlock(); + }); + } + } + } + + void WebSocketPoll::setOnMessageCallback(const OnMessageCallback& callback) + { + _onMessageUserCallback = callback; + } + + WebSocketPoll::MessageDataPtr WebSocketPoll::popMessage() + { + MessageDataPtr message; + + _messagesMutex.lock(); + if (!_messages.empty()) + { + message = std::move(_messages.front()); + _messages.pop_front(); + } + _messagesMutex.unlock(); + + return message; + } + + void WebSocketPoll::poll(int count) + { + if (!_onMessageUserCallback) + return; + + MessageDataPtr message; + + while (count > 0 && (message = popMessage())) + { + _onMessageUserCallback( + message->type, + message->str, + message->wireSize, + message->errorInfo, + message->openInfo, + message->closeInfo + ); + + --count; + } + } + +} diff --git a/ixwebsocket/IXWebSocketPoll.h b/ixwebsocket/IXWebSocketPoll.h new file mode 100644 index 00000000..c16eb808 --- /dev/null +++ b/ixwebsocket/IXWebSocketPoll.h @@ -0,0 +1,52 @@ +/* + * IXWebSocketPoll.h + * Author: Korchynskyi Dmytro + * Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved. + */ + +#pragma once + +#include "IXWebSocket.h" +#include +#include +#include + +namespace ix +{ + /** + * A helper class to dispatch websocket message callbacks in your thread. + */ + class WebSocketPoll + { + public: + WebSocketPoll(WebSocket* websocket = nullptr); + ~WebSocketPoll(); + + void bindWebsocket(WebSocket* websocket); + + void setOnMessageCallback(const OnMessageCallback& callback); + + void poll(int count = 512); + + protected: + struct Message + { + WebSocketMessageType type; + std::string str; + size_t wireSize; + WebSocketErrorInfo errorInfo; + WebSocketOpenInfo openInfo; + WebSocketCloseInfo closeInfo; + }; + + using MessageDataPtr = std::shared_ptr; + + MessageDataPtr popMessage(); + + private: + WebSocket* _websocket = nullptr; + OnMessageCallback _onMessageUserCallback; + std::mutex _messagesMutex; + std::list _messages; + }; +} From 636a69e9e1b301af538a2c911e9bd310324eef8a Mon Sep 17 00:00:00 2001 From: Dimon4eg Date: Wed, 8 May 2019 22:24:39 +0300 Subject: [PATCH 2/7] qf --- ixwebsocket/IXWebSocketPoll.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ixwebsocket/IXWebSocketPoll.cpp b/ixwebsocket/IXWebSocketPoll.cpp index 9634e198..53d07d5f 100644 --- a/ixwebsocket/IXWebSocketPoll.cpp +++ b/ixwebsocket/IXWebSocketPoll.cpp @@ -11,13 +11,14 @@ namespace ix WebSocketPoll::WebSocketPoll(WebSocket* websocket) { + bindWebsocket(websocket); } WebSocketPoll::~WebSocketPoll() { if (!_messages.empty()) { - + // not handled all messages } bindWebsocket(nullptr); From 467f99b3bb8c7640b38bfb6d9e9f18397a07e401 Mon Sep 17 00:00:00 2001 From: dimon4eg Date: Thu, 9 May 2019 00:09:51 +0300 Subject: [PATCH 3/7] Rename to WebSocketMessageQueue --- CMakeLists.txt | 4 ++-- ...bSocketPoll.cpp => IXWebSocketMessageQueue.cpp} | 14 +++++++------- ...IXWebSocketPoll.h => IXWebSocketMessageQueue.h} | 6 +++--- 3 files changed, 12 insertions(+), 12 deletions(-) rename ixwebsocket/{IXWebSocketPoll.cpp => IXWebSocketMessageQueue.cpp} (84%) rename ixwebsocket/{IXWebSocketPoll.h => IXWebSocketMessageQueue.h} (89%) diff --git a/CMakeLists.txt b/CMakeLists.txt index 9860e023..cebf10e0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -28,7 +28,7 @@ set( IXWEBSOCKET_SOURCES ixwebsocket/IXCancellationRequest.cpp ixwebsocket/IXNetSystem.cpp ixwebsocket/IXWebSocket.cpp - ixwebsocket/IXWebSocketPoll.cpp + ixwebsocket/IXWebSocketMessageQueue.cpp ixwebsocket/IXWebSocketServer.cpp ixwebsocket/IXWebSocketTransport.cpp ixwebsocket/IXWebSocketHandshake.cpp @@ -55,7 +55,7 @@ set( IXWEBSOCKET_HEADERS ixwebsocket/IXNetSystem.h ixwebsocket/IXProgressCallback.h ixwebsocket/IXWebSocket.h - ixwebsocket/IXWebSocketPoll.h + ixwebsocket/IXWebSocketMessageQueue.h ixwebsocket/IXWebSocketServer.h ixwebsocket/IXWebSocketTransport.h ixwebsocket/IXWebSocketHandshake.h diff --git a/ixwebsocket/IXWebSocketPoll.cpp b/ixwebsocket/IXWebSocketMessageQueue.cpp similarity index 84% rename from ixwebsocket/IXWebSocketPoll.cpp rename to ixwebsocket/IXWebSocketMessageQueue.cpp index 53d07d5f..25a66d67 100644 --- a/ixwebsocket/IXWebSocketPoll.cpp +++ b/ixwebsocket/IXWebSocketMessageQueue.cpp @@ -4,17 +4,17 @@ * Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved. */ -#include "IXWebSocketPoll.h" +#include "IXWebSocketMessageQueue.h" namespace ix { - WebSocketPoll::WebSocketPoll(WebSocket* websocket) + WebSocketMessageQueue::WebSocketMessageQueue(WebSocket* websocket) { bindWebsocket(websocket); } - WebSocketPoll::~WebSocketPoll() + WebSocketMessageQueue::~WebSocketMessageQueue() { if (!_messages.empty()) { @@ -24,7 +24,7 @@ namespace ix bindWebsocket(nullptr); } - void WebSocketPoll::bindWebsocket(WebSocket * websocket) + void WebSocketMessageQueue::bindWebsocket(WebSocket * websocket) { if (_websocket != websocket) { @@ -64,12 +64,12 @@ namespace ix } } - void WebSocketPoll::setOnMessageCallback(const OnMessageCallback& callback) + void WebSocketMessageQueue::setOnMessageCallback(const OnMessageCallback& callback) { _onMessageUserCallback = callback; } - WebSocketPoll::MessageDataPtr WebSocketPoll::popMessage() + WebSocketMessageQueue::MessageDataPtr WebSocketMessageQueue::popMessage() { MessageDataPtr message; @@ -84,7 +84,7 @@ namespace ix return message; } - void WebSocketPoll::poll(int count) + void WebSocketMessageQueue::poll(int count) { if (!_onMessageUserCallback) return; diff --git a/ixwebsocket/IXWebSocketPoll.h b/ixwebsocket/IXWebSocketMessageQueue.h similarity index 89% rename from ixwebsocket/IXWebSocketPoll.h rename to ixwebsocket/IXWebSocketMessageQueue.h index c16eb808..46507527 100644 --- a/ixwebsocket/IXWebSocketPoll.h +++ b/ixwebsocket/IXWebSocketMessageQueue.h @@ -16,11 +16,11 @@ namespace ix /** * A helper class to dispatch websocket message callbacks in your thread. */ - class WebSocketPoll + class WebSocketMessageQueue { public: - WebSocketPoll(WebSocket* websocket = nullptr); - ~WebSocketPoll(); + WebSocketMessageQueue(WebSocket* websocket = nullptr); + ~WebSocketMessageQueue(); void bindWebsocket(WebSocket* websocket); From 2727d39fa47a14a931e01d2e3e72d93fd073bd38 Mon Sep 17 00:00:00 2001 From: dimon4eg Date: Thu, 9 May 2019 00:16:37 +0300 Subject: [PATCH 4/7] update comments --- ixwebsocket/IXWebSocket.h | 8 +++----- ixwebsocket/IXWebSocketMessageQueue.cpp | 4 ++-- ixwebsocket/IXWebSocketMessageQueue.h | 10 +++++----- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/ixwebsocket/IXWebSocket.h b/ixwebsocket/IXWebSocket.h index e655d196..c1d8c0e4 100644 --- a/ixwebsocket/IXWebSocket.h +++ b/ixwebsocket/IXWebSocket.h @@ -113,12 +113,10 @@ namespace ix WebSocketSendInfo ping(const std::string& text); void close(); - /** - * Set callback to receive websocket messages. - * - * Be aware: your callback will be executed from websocket's internal thread! - */ + // Set callback to receive websocket messages. + // Be aware: your callback will be executed from websocket's internal thread! void setOnMessageCallback(const OnMessageCallback& callback); + static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback); static void resetTrafficTrackerCallback(); diff --git a/ixwebsocket/IXWebSocketMessageQueue.cpp b/ixwebsocket/IXWebSocketMessageQueue.cpp index 25a66d67..b7c074f7 100644 --- a/ixwebsocket/IXWebSocketMessageQueue.cpp +++ b/ixwebsocket/IXWebSocketMessageQueue.cpp @@ -1,7 +1,7 @@ /* - * IXWebSocketPoll.cpp + * IXWebSocketMessageQueue.cpp * Author: Korchynskyi Dmytro - * Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved. + * Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved. */ #include "IXWebSocketMessageQueue.h" diff --git a/ixwebsocket/IXWebSocketMessageQueue.h b/ixwebsocket/IXWebSocketMessageQueue.h index 46507527..961f57e2 100644 --- a/ixwebsocket/IXWebSocketMessageQueue.h +++ b/ixwebsocket/IXWebSocketMessageQueue.h @@ -1,7 +1,7 @@ /* - * IXWebSocketPoll.h + * IXWebSocketMessageQueue.h * Author: Korchynskyi Dmytro - * Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved. + * Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved. */ #pragma once @@ -13,9 +13,9 @@ namespace ix { - /** - * A helper class to dispatch websocket message callbacks in your thread. - */ + // + // A helper class to dispatch websocket message callbacks in your thread. + // class WebSocketMessageQueue { public: From 28ae70ed20f6214badf5c446ac738d98e130cf70 Mon Sep 17 00:00:00 2001 From: dimon4eg Date: Thu, 9 May 2019 00:20:26 +0300 Subject: [PATCH 5/7] use lock_guard --- ixwebsocket/IXWebSocketMessageQueue.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/ixwebsocket/IXWebSocketMessageQueue.cpp b/ixwebsocket/IXWebSocketMessageQueue.cpp index b7c074f7..a62757bf 100644 --- a/ixwebsocket/IXWebSocketMessageQueue.cpp +++ b/ixwebsocket/IXWebSocketMessageQueue.cpp @@ -56,9 +56,10 @@ namespace ix message->openInfo = openInfo; message->closeInfo = closeInfo; - _messagesMutex.lock(); - _messages.emplace_back(std::move(message)); - _messagesMutex.unlock(); + { + std::lock_guard lock(_messagesMutex); + _messages.emplace_back(std::move(message)); + } }); } } @@ -72,14 +73,13 @@ namespace ix WebSocketMessageQueue::MessageDataPtr WebSocketMessageQueue::popMessage() { MessageDataPtr message; + std::lock_guard lock(_messagesMutex); - _messagesMutex.lock(); if (!_messages.empty()) { message = std::move(_messages.front()); _messages.pop_front(); } - _messagesMutex.unlock(); return message; } From 75011d0b4e41f9ee8b0c970f4cf4bcf34ae7f554 Mon Sep 17 00:00:00 2001 From: dimon4eg Date: Thu, 9 May 2019 00:23:16 +0300 Subject: [PATCH 6/7] simplify bindWebsocket --- ixwebsocket/IXWebSocketMessageQueue.cpp | 61 ++++++++++++------------- 1 file changed, 30 insertions(+), 31 deletions(-) diff --git a/ixwebsocket/IXWebSocketMessageQueue.cpp b/ixwebsocket/IXWebSocketMessageQueue.cpp index a62757bf..222478d8 100644 --- a/ixwebsocket/IXWebSocketMessageQueue.cpp +++ b/ixwebsocket/IXWebSocketMessageQueue.cpp @@ -26,42 +26,41 @@ namespace ix void WebSocketMessageQueue::bindWebsocket(WebSocket * websocket) { - if (_websocket != websocket) + if (_websocket == websocket) return; + + // unbind old + if (_websocket) { - // unbind old - if (_websocket) - { - _websocket->setOnMessageCallback(nullptr); - } + _websocket->setOnMessageCallback(nullptr); + } - _websocket = websocket; + _websocket = websocket; - // bind new - if (_websocket) + // bind new + if (_websocket) + { + _websocket->setOnMessageCallback([this]( + WebSocketMessageType type, + const std::string& str, + size_t wireSize, + const WebSocketErrorInfo& errorInfo, + const WebSocketOpenInfo& openInfo, + const WebSocketCloseInfo& closeInfo) { - _websocket->setOnMessageCallback([this]( - WebSocketMessageType type, - const std::string& str, - size_t wireSize, - const WebSocketErrorInfo& errorInfo, - const WebSocketOpenInfo& openInfo, - const WebSocketCloseInfo& closeInfo) + MessageDataPtr message(new Message()); + + message->type = type; + message->str = str; + message->wireSize = wireSize; + message->errorInfo = errorInfo; + message->openInfo = openInfo; + message->closeInfo = closeInfo; + { - MessageDataPtr message(new Message()); - - message->type = type; - message->str = str; - message->wireSize = wireSize; - message->errorInfo = errorInfo; - message->openInfo = openInfo; - message->closeInfo = closeInfo; - - { - std::lock_guard lock(_messagesMutex); - _messages.emplace_back(std::move(message)); - } - }); - } + std::lock_guard lock(_messagesMutex); + _messages.emplace_back(std::move(message)); + } + }); } } From a69408fa252470ff5a6c1ba2832e0621907db26d Mon Sep 17 00:00:00 2001 From: dimon4eg Date: Thu, 9 May 2019 01:05:47 +0300 Subject: [PATCH 7/7] rename ptr --- ixwebsocket/IXWebSocketMessageQueue.cpp | 8 ++++---- ixwebsocket/IXWebSocketMessageQueue.h | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/ixwebsocket/IXWebSocketMessageQueue.cpp b/ixwebsocket/IXWebSocketMessageQueue.cpp index 222478d8..62e759bd 100644 --- a/ixwebsocket/IXWebSocketMessageQueue.cpp +++ b/ixwebsocket/IXWebSocketMessageQueue.cpp @@ -47,7 +47,7 @@ namespace ix const WebSocketOpenInfo& openInfo, const WebSocketCloseInfo& closeInfo) { - MessageDataPtr message(new Message()); + MessagePtr message(new Message()); message->type = type; message->str = str; @@ -69,9 +69,9 @@ namespace ix _onMessageUserCallback = callback; } - WebSocketMessageQueue::MessageDataPtr WebSocketMessageQueue::popMessage() + WebSocketMessageQueue::MessagePtr WebSocketMessageQueue::popMessage() { - MessageDataPtr message; + MessagePtr message; std::lock_guard lock(_messagesMutex); if (!_messages.empty()) @@ -88,7 +88,7 @@ namespace ix if (!_onMessageUserCallback) return; - MessageDataPtr message; + MessagePtr message; while (count > 0 && (message = popMessage())) { diff --git a/ixwebsocket/IXWebSocketMessageQueue.h b/ixwebsocket/IXWebSocketMessageQueue.h index 961f57e2..543189e9 100644 --- a/ixwebsocket/IXWebSocketMessageQueue.h +++ b/ixwebsocket/IXWebSocketMessageQueue.h @@ -39,14 +39,14 @@ namespace ix WebSocketCloseInfo closeInfo; }; - using MessageDataPtr = std::shared_ptr; + using MessagePtr = std::shared_ptr; - MessageDataPtr popMessage(); + MessagePtr popMessage(); private: WebSocket* _websocket = nullptr; OnMessageCallback _onMessageUserCallback; std::mutex _messagesMutex; - std::list _messages; + std::list _messages; }; }