From 50bea7dffa36b8c5667d158e897e8b1b069d2fa4 Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Fri, 27 Dec 2019 18:24:45 -0800 Subject: [PATCH] (ws cobra to sentry) refactor queue related code into a class --- docs/CHANGELOG.md | 4 ++ ixwebsocket/IXWebSocketVersion.h | 2 +- ws/ws_cobra_to_sentry.cpp | 91 ++++++++++++++++++-------------- 3 files changed, 56 insertions(+), 41 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 02d51045..101a998b 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,6 +1,10 @@ # Changelog All changes to this project will be documented in this file. +## [7.8.3] - 2019-12-27 + +(ws cobra to sentry) refactor queue related code into a class + ## [7.8.2] - 2019-12-25 (ws cobra to sentry) bound the queue size used to hold up cobra messages before they are sent to sentry. Default queue size is a 100 messages. Without such limit the program runs out of memory when a subscriber receive a lot of messages that cannot make it to sentry diff --git a/ixwebsocket/IXWebSocketVersion.h b/ixwebsocket/IXWebSocketVersion.h index ad08ae3b..3c69901e 100644 --- a/ixwebsocket/IXWebSocketVersion.h +++ b/ixwebsocket/IXWebSocketVersion.h @@ -6,4 +6,4 @@ #pragma once -#define IX_WEBSOCKET_VERSION "7.8.2" +#define IX_WEBSOCKET_VERSION "7.8.3" diff --git a/ws/ws_cobra_to_sentry.cpp b/ws/ws_cobra_to_sentry.cpp index db540663..474dfd0c 100644 --- a/ws/ws_cobra_to_sentry.cpp +++ b/ws/ws_cobra_to_sentry.cpp @@ -18,6 +18,46 @@ namespace ix { + class QueueManager + { + public: + QueueManager(size_t maxQueueSize, std::atomic &stop) : _maxQueueSize(maxQueueSize), _stop(stop) {} + + Json::Value pop(); + void add(Json::Value msg); + + private: + std::queue _queue; + std::mutex _mutex; + std::condition_variable _condition; + size_t _maxQueueSize; + std::atomic& _stop; + }; + + Json::Value QueueManager::pop() + { + std::unique_lock lock(_mutex); + + _condition.wait(lock, [this] { return !_queue.empty() && !_stop; }); + + auto msg = _queue.front(); + _queue.pop(); + return msg; + } + + void QueueManager::add(Json::Value msg) + { + std::unique_lock lock(_mutex); + + // if the sending is not fast enough there is no point + // in queuing too many events. + if (_queue.size() < _maxQueueSize) + { + _queue.push(msg); + _condition.notify_one(); + } + } + int ws_cobra_to_sentry_main(const std::string& appkey, const std::string& endpoint, const std::string& rolename, @@ -47,9 +87,7 @@ namespace ix std::atomic stop(false); std::atomic throttled(false); - std::condition_variable condition; - std::mutex conditionVariableMutex; - std::queue queue; + QueueManager queueManager(maxQueueSize, stop); auto timer = [&sentCount, &receivedCount] { while (true) @@ -63,9 +101,7 @@ namespace ix std::thread t1(timer); - auto sentrySender = [&condition, - &conditionVariableMutex, - &queue, + auto sentrySender = [&queueManager, verbose, &errorSending, &sentCount, @@ -76,15 +112,7 @@ namespace ix while (true) { - Json::Value msg; - - { - std::unique_lock lock(conditionVariableMutex); - condition.wait(lock, [&queue, &stop] { return !queue.empty() && !stop; }); - - msg = queue.front(); - queue.pop(); - } + Json::Value msg = queueManager.pop(); auto ret = sentryClient.send(msg, verbose); HttpResponsePtr response = ret.first; @@ -175,14 +203,12 @@ namespace ix verbose, &throttled, &receivedCount, - &condition, - &conditionVariableMutex, - &maxQueueSize, - &queue](ix::CobraConnectionEventType eventType, - const std::string& errMsg, - const ix::WebSocketHttpHeaders& headers, - const std::string& subscriptionId, - CobraConnection::MsgId msgId) { + &queueManager]( + ix::CobraConnectionEventType eventType, + const std::string& errMsg, + const ix::WebSocketHttpHeaders& headers, + const std::string& subscriptionId, + CobraConnection::MsgId msgId) { if (eventType == ix::CobraConnection_EventType_Open) { spdlog::info("Subscriber connected"); @@ -205,10 +231,7 @@ namespace ix verbose, &throttled, &receivedCount, - &condition, - &conditionVariableMutex, - &maxQueueSize, - &queue](const Json::Value& msg) { + &queueManager](const Json::Value& msg) { if (verbose) { spdlog::info(jsonWriter.write(msg)); @@ -217,23 +240,11 @@ namespace ix // If we cannot send to sentry fast enough, drop the message if (throttled) { - condition.notify_one(); return; } ++receivedCount; - - { - std::unique_lock lock(conditionVariableMutex); - // if the sending is not fast enough there is no point - // in queuing too many events. - if (queue.size() < maxQueueSize) - { - queue.push(msg); - } - } - - condition.notify_one(); + queueManager.add(msg); }); } else if (eventType == ix::CobraConnection_EventType_Subscribed)