From c4e9abfe8034ffce5e86b116853c3a72883d293c Mon Sep 17 00:00:00 2001
From: Benjamin Sergeant <bsergean@gmail.com>
Date: Wed, 25 Dec 2019 22:15:57 -0800
Subject: [PATCH] (ws cobra to sentry) bound the queue size used to hold up
 cobra messages before they are sent to sentry. Default queue size is a 100
 messages. Without such limit the program runs out of memory when a subscriber
 receive a lot of messages that cannot make it to sentry

---
 docs/CHANGELOG.md                |  4 ++++
 ixwebsocket/IXWebSocketVersion.h |  2 +-
 ws/ws.cpp                        |  3 +++
 ws/ws.h                          |  1 +
 ws/ws_cobra_to_sentry.cpp        | 10 +++++++++-
 5 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 8b5c99ef..02d51045 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -1,6 +1,10 @@
 # Changelog
 All changes to this project will be documented in this file.
 
+## [7.8.2] - 2019-12-25
+
+(ws cobra to sentry) bound the queue size used to hold up cobra messages before they are sent to sentry. Default queue size is a 100 messages. Without such limit the program runs out of memory when a subscriber receive a lot of messages that cannot make it to sentry
+
 ## [7.8.1] - 2019-12-25
 
 (ws client) use correct compilation defines so that spdlog is not used as a header only library (reduce binary size and increase compilation speed)
diff --git a/ixwebsocket/IXWebSocketVersion.h b/ixwebsocket/IXWebSocketVersion.h
index 556d2e20..ad08ae3b 100644
--- a/ixwebsocket/IXWebSocketVersion.h
+++ b/ixwebsocket/IXWebSocketVersion.h
@@ -6,4 +6,4 @@
 
 #pragma once
 
-#define IX_WEBSOCKET_VERSION "7.7.1"
+#define IX_WEBSOCKET_VERSION "7.8.2"
diff --git a/ws/ws.cpp b/ws/ws.cpp
index 9bf80194..79935b77 100644
--- a/ws/ws.cpp
+++ b/ws/ws.cpp
@@ -105,6 +105,7 @@ int main(int argc, char** argv)
     int count = 1;
     int jobs = 4;
     uint32_t maxWaitBetweenReconnectionRetries;
+    size_t maxQueueSize = 100;
 
     auto addTLSOptions = [&tlsOptions, &verifyNone](CLI::App* app) {
         app->add_option(
@@ -268,6 +269,7 @@ int main(int argc, char** argv)
     cobra2sentry->add_option("--rolesecret", rolesecret, "Role secret")->required();
     cobra2sentry->add_option("--dsn", dsn, "Sentry DSN");
     cobra2sentry->add_option("--jobs", jobs, "Number of thread sending events to Sentry");
+    cobra2sentry->add_option("--queue_size", maxQueueSize, "Size of the queue to hold messages before they are sent to Sentry");
     cobra2sentry->add_option("channel", channel, "Channel")->required();
     cobra2sentry->add_flag("-v", verbose, "Verbose");
     cobra2sentry->add_flag("-s", strict, "Strict mode. Error out when sending to sentry fails");
@@ -455,6 +457,7 @@ int main(int argc, char** argv)
                                           verbose,
                                           strict,
                                           jobs,
+                                          maxQueueSize,
                                           tlsOptions);
     }
     else if (app.got_subcommand("cobra_metrics_to_redis"))
diff --git a/ws/ws.h b/ws/ws.h
index 3e44a2fa..5d859139 100644
--- a/ws/ws.h
+++ b/ws/ws.h
@@ -119,6 +119,7 @@ namespace ix
                                 bool verbose,
                                 bool strict,
                                 int jobs,
+                                size_t maxQueueSize,
                                 const ix::SocketTLSOptions& tlsOptions);
 
     int ws_cobra_metrics_to_redis(const std::string& appkey,
diff --git a/ws/ws_cobra_to_sentry.cpp b/ws/ws_cobra_to_sentry.cpp
index 18d7c667..db540663 100644
--- a/ws/ws_cobra_to_sentry.cpp
+++ b/ws/ws_cobra_to_sentry.cpp
@@ -28,6 +28,7 @@ namespace ix
                                 bool verbose,
                                 bool strict,
                                 int jobs,
+                                size_t maxQueueSize,
                                 const ix::SocketTLSOptions& tlsOptions)
     {
         ix::CobraConnection conn;
@@ -176,6 +177,7 @@ namespace ix
                                &receivedCount,
                                &condition,
                                &conditionVariableMutex,
+                               &maxQueueSize,
                                &queue](ix::CobraConnectionEventType eventType,
                                        const std::string& errMsg,
                                        const ix::WebSocketHttpHeaders& headers,
@@ -205,6 +207,7 @@ namespace ix
                                 &receivedCount,
                                 &condition,
                                 &conditionVariableMutex,
+                                &maxQueueSize,
                                 &queue](const Json::Value& msg) {
                                    if (verbose)
                                    {
@@ -222,7 +225,12 @@ namespace ix
 
                                    {
                                        std::unique_lock<std::mutex> lock(conditionVariableMutex);
-                                       queue.push(msg);
+                                       // if the sending is not fast enough there is no point
+                                       // in queuing too many events.
+                                       if (queue.size() < maxQueueSize)
+                                       {
+                                           queue.push(msg);
+                                       }
                                    }
 
                                    condition.notify_one();