(ws cobra to sentry) refactor queue related code into a class

This commit is contained in:
Benjamin Sergeant 2019-12-27 18:24:45 -08:00
parent c4e9abfe80
commit 50bea7dffa
3 changed files with 56 additions and 41 deletions

View File

@ -1,6 +1,10 @@
# Changelog # Changelog
All changes to this project will be documented in this file. All changes to this project will be documented in this file.
## [7.8.3] - 2019-12-27
(ws cobra to sentry) refactor queue related code into a class
## [7.8.2] - 2019-12-25 ## [7.8.2] - 2019-12-25
(ws cobra to sentry) bound the queue size used to hold up cobra messages before they are sent to sentry. Default queue size is a 100 messages. Without such limit the program runs out of memory when a subscriber receive a lot of messages that cannot make it to sentry (ws cobra to sentry) bound the queue size used to hold up cobra messages before they are sent to sentry. Default queue size is a 100 messages. Without such limit the program runs out of memory when a subscriber receive a lot of messages that cannot make it to sentry

View File

@ -6,4 +6,4 @@
#pragma once #pragma once
#define IX_WEBSOCKET_VERSION "7.8.2" #define IX_WEBSOCKET_VERSION "7.8.3"

View File

@ -18,6 +18,46 @@
namespace ix namespace ix
{ {
class QueueManager
{
public:
QueueManager(size_t maxQueueSize, std::atomic<bool> &stop) : _maxQueueSize(maxQueueSize), _stop(stop) {}
Json::Value pop();
void add(Json::Value msg);
private:
std::queue<Json::Value> _queue;
std::mutex _mutex;
std::condition_variable _condition;
size_t _maxQueueSize;
std::atomic<bool>& _stop;
};
Json::Value QueueManager::pop()
{
std::unique_lock<std::mutex> lock(_mutex);
_condition.wait(lock, [this] { return !_queue.empty() && !_stop; });
auto msg = _queue.front();
_queue.pop();
return msg;
}
void QueueManager::add(Json::Value msg)
{
std::unique_lock<std::mutex> lock(_mutex);
// if the sending is not fast enough there is no point
// in queuing too many events.
if (_queue.size() < _maxQueueSize)
{
_queue.push(msg);
_condition.notify_one();
}
}
int ws_cobra_to_sentry_main(const std::string& appkey, int ws_cobra_to_sentry_main(const std::string& appkey,
const std::string& endpoint, const std::string& endpoint,
const std::string& rolename, const std::string& rolename,
@ -47,9 +87,7 @@ namespace ix
std::atomic<bool> stop(false); std::atomic<bool> stop(false);
std::atomic<bool> throttled(false); std::atomic<bool> throttled(false);
std::condition_variable condition; QueueManager queueManager(maxQueueSize, stop);
std::mutex conditionVariableMutex;
std::queue<Json::Value> queue;
auto timer = [&sentCount, &receivedCount] { auto timer = [&sentCount, &receivedCount] {
while (true) while (true)
@ -63,9 +101,7 @@ namespace ix
std::thread t1(timer); std::thread t1(timer);
auto sentrySender = [&condition, auto sentrySender = [&queueManager,
&conditionVariableMutex,
&queue,
verbose, verbose,
&errorSending, &errorSending,
&sentCount, &sentCount,
@ -76,15 +112,7 @@ namespace ix
while (true) while (true)
{ {
Json::Value msg; Json::Value msg = queueManager.pop();
{
std::unique_lock<std::mutex> lock(conditionVariableMutex);
condition.wait(lock, [&queue, &stop] { return !queue.empty() && !stop; });
msg = queue.front();
queue.pop();
}
auto ret = sentryClient.send(msg, verbose); auto ret = sentryClient.send(msg, verbose);
HttpResponsePtr response = ret.first; HttpResponsePtr response = ret.first;
@ -175,10 +203,8 @@ namespace ix
verbose, verbose,
&throttled, &throttled,
&receivedCount, &receivedCount,
&condition, &queueManager](
&conditionVariableMutex, ix::CobraConnectionEventType eventType,
&maxQueueSize,
&queue](ix::CobraConnectionEventType eventType,
const std::string& errMsg, const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers, const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId, const std::string& subscriptionId,
@ -205,10 +231,7 @@ namespace ix
verbose, verbose,
&throttled, &throttled,
&receivedCount, &receivedCount,
&condition, &queueManager](const Json::Value& msg) {
&conditionVariableMutex,
&maxQueueSize,
&queue](const Json::Value& msg) {
if (verbose) if (verbose)
{ {
spdlog::info(jsonWriter.write(msg)); spdlog::info(jsonWriter.write(msg));
@ -217,23 +240,11 @@ namespace ix
// If we cannot send to sentry fast enough, drop the message // If we cannot send to sentry fast enough, drop the message
if (throttled) if (throttled)
{ {
condition.notify_one();
return; return;
} }
++receivedCount; ++receivedCount;
queueManager.add(msg);
{
std::unique_lock<std::mutex> lock(conditionVariableMutex);
// if the sending is not fast enough there is no point
// in queuing too many events.
if (queue.size() < maxQueueSize)
{
queue.push(msg);
}
}
condition.notify_one();
}); });
} }
else if (eventType == ix::CobraConnection_EventType_Subscribed) else if (eventType == ix::CobraConnection_EventType_Subscribed)