From 1af39bf0ebda3b0275ae110faa8d89bb5e1770f2 Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Tue, 12 May 2020 21:40:17 -0700 Subject: [PATCH] (ixbots) add options to limit how many messages per minute should be processed --- docs/CHANGELOG.md | 4 +++ ixbots/ixbots/IXCobraBot.cpp | 62 +++++++++++++++++++++----------- ixbots/ixbots/IXCobraBotConfig.h | 3 ++ ixwebsocket/IXWebSocketVersion.h | 2 +- test/IXCobraToSentryBotTest.cpp | 4 +-- test/IXCobraToStatsdBotTest.cpp | 8 ++--- test/IXCobraToStdoutBotTest.cpp | 4 +-- ws/ws.cpp | 26 +++++++------- 8 files changed, 65 insertions(+), 48 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 5b634b75..27167206 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,6 +1,10 @@ # Changelog All changes to this project will be documented in this file. +## [9.6.0] - 2020-05-12 + +(ixbots) add options to limit how many messages per minute should be processed + ## [9.5.9] - 2020-05-12 (ixbots) add new class to configure a bot to simplify passing options around diff --git a/ixbots/ixbots/IXCobraBot.cpp b/ixbots/ixbots/IXCobraBot.cpp index 4992f20e..92d35072 100644 --- a/ixbots/ixbots/IXCobraBot.cpp +++ b/ixbots/ixbots/IXCobraBot.cpp @@ -26,6 +26,8 @@ namespace ix auto enableHeartbeat = botConfig.enableHeartbeat; auto heartBeatTimeout = botConfig.heartBeatTimeout; auto runtime = botConfig.runtime; + auto maxEventsPerMinute = botConfig.maxEventsPerMinute; + auto limitReceivedEvents = botConfig.limitReceivedEvents; ix::CobraConnection conn; conn.configure(config); @@ -37,9 +39,11 @@ namespace ix uint64_t receivedCountTotal(0); uint64_t sentCountPerSecs(0); uint64_t receivedCountPerSecs(0); + std::atomic receivedCountPerMinutes(0); std::atomic stop(false); std::atomic throttled(false); std::atomic fatalCobraError(false); + int minuteCounter = 0; auto timer = [&sentCount, &receivedCount, @@ -47,6 +51,8 @@ namespace ix &receivedCountTotal, &sentCountPerSecs, &receivedCountPerSecs, + &receivedCountPerMinutes, + &minuteCounter, &stop] { while (!stop) { @@ -67,13 +73,19 @@ namespace ix CoreLogger::info(ss.str()); receivedCountPerSecs = receivedCount - receivedCountTotal; - sentCountPerSecs = sentCount - receivedCountTotal; + sentCountPerSecs = sentCount - sentCountTotal; receivedCountTotal += receivedCountPerSecs; sentCountTotal += sentCountPerSecs; auto duration = std::chrono::seconds(1); std::this_thread::sleep_for(duration); + + if (minuteCounter++ == 60) + { + receivedCountPerMinutes = 0; + minuteCounter = 0; + } } CoreLogger::info("timer thread done"); @@ -120,6 +132,9 @@ namespace ix &subscriptionPosition, &throttled, &receivedCount, + &receivedCountPerMinutes, + maxEventsPerMinute, + limitReceivedEvents, &fatalCobraError, &sentCount](const CobraEventPtr& event) { if (event->type == ix::CobraEventType::Open) @@ -141,29 +156,34 @@ namespace ix CoreLogger::info("Subscribing to " + channel); CoreLogger::info("Subscribing at position " + subscriptionPosition); CoreLogger::info("Subscribing with filter " + filter); - conn.subscribe(channel, - filter, - subscriptionPosition, - [this, - &throttled, - &receivedCount, - &subscriptionPosition, - &fatalCobraError, - &sentCount](const Json::Value& msg, const std::string& position) { - subscriptionPosition = position; + conn.subscribe(channel, filter, subscriptionPosition, + [&sentCount, &receivedCountPerMinutes, + maxEventsPerMinute, limitReceivedEvents, + &throttled, &receivedCount, + &subscriptionPosition, &fatalCobraError, + this](const Json::Value& msg, const std::string& position) { + subscriptionPosition = position; + ++receivedCount; - // If we cannot send to sentry fast enough, drop the message - if (throttled) - { - return; - } + ++receivedCountPerMinutes; + if (limitReceivedEvents) + { + if (receivedCountPerMinutes > maxEventsPerMinute) + { + return; + } + } - ++receivedCount; + // If we cannot send to sentry fast enough, drop the message + if (throttled) + { + return; + } - _onBotMessageCallback( - msg, position, throttled, - fatalCobraError, sentCount); - }); + _onBotMessageCallback( + msg, position, throttled, + fatalCobraError, sentCount); + }); } else if (event->type == ix::CobraEventType::Subscribed) { diff --git a/ixbots/ixbots/IXCobraBotConfig.h b/ixbots/ixbots/IXCobraBotConfig.h index e8edb8e0..46d99dc8 100644 --- a/ixbots/ixbots/IXCobraBotConfig.h +++ b/ixbots/ixbots/IXCobraBotConfig.h @@ -7,6 +7,7 @@ #pragma once #include +#include #include namespace ix @@ -20,5 +21,7 @@ namespace ix bool enableHeartbeat = true; int heartBeatTimeout = 60; int runtime = -1; + int maxEventsPerMinute = std::numeric_limits::max(); + bool limitReceivedEvents = false; }; } // namespace ix diff --git a/ixwebsocket/IXWebSocketVersion.h b/ixwebsocket/IXWebSocketVersion.h index b289105a..f81819d9 100644 --- a/ixwebsocket/IXWebSocketVersion.h +++ b/ixwebsocket/IXWebSocketVersion.h @@ -6,4 +6,4 @@ #pragma once -#define IX_WEBSOCKET_VERSION "9.5.9" +#define IX_WEBSOCKET_VERSION "9.6.0" diff --git a/test/IXCobraToSentryBotTest.cpp b/test/IXCobraToSentryBotTest.cpp index b5775efa..b61c06fe 100644 --- a/test/IXCobraToSentryBotTest.cpp +++ b/test/IXCobraToSentryBotTest.cpp @@ -158,9 +158,7 @@ TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]") SentryClient sentryClient(dsn); sentryClient.setTLSOptions(tlsOptionsClient); - int64_t sentCount = cobra_to_sentry_bot(cobraBotConfig, - sentryClient, - verbose); + int64_t sentCount = cobra_to_sentry_bot(cobraBotConfig, sentryClient, verbose); // // We want at least 2 messages to be sent // diff --git a/test/IXCobraToStatsdBotTest.cpp b/test/IXCobraToStatsdBotTest.cpp index e6470dc8..ff436c08 100644 --- a/test/IXCobraToStatsdBotTest.cpp +++ b/test/IXCobraToStatsdBotTest.cpp @@ -112,12 +112,8 @@ TEST_CASE("Cobra_to_statsd_bot", "[cobra_bots]") std::string timer; bool verbose = true; - int64_t sentCount = ix::cobra_to_statsd_bot(cobraBotConfig, - statsdClient, - fields, - gauge, - timer, - verbose); + int64_t sentCount = + ix::cobra_to_statsd_bot(cobraBotConfig, statsdClient, fields, gauge, timer, verbose); // // We want at least 2 messages to be sent // diff --git a/test/IXCobraToStdoutBotTest.cpp b/test/IXCobraToStdoutBotTest.cpp index 79c07b62..1361d68c 100644 --- a/test/IXCobraToStdoutBotTest.cpp +++ b/test/IXCobraToStdoutBotTest.cpp @@ -95,9 +95,7 @@ TEST_CASE("Cobra_to_stdout_bot", "[cobra_bots]") // We could try to capture the output ... not sure how. bool fluentd = true; - int64_t sentCount = ix::cobra_to_stdout_bot(cobraBotConfig, - fluentd, - quiet); + int64_t sentCount = ix::cobra_to_stdout_bot(cobraBotConfig, fluentd, quiet); // // We want at least 2 messages to be sent // diff --git a/ws/ws.cpp b/ws/ws.cpp index 7ffd0650..c69f1c59 100644 --- a/ws/ws.cpp +++ b/ws/ws.cpp @@ -177,13 +177,18 @@ int main(int argc, char** argv) app->add_option("--appkey", cobraBotConfig.cobraConfig.appkey, "Appkey")->required(); app->add_option("--endpoint", cobraBotConfig.cobraConfig.endpoint, "Endpoint")->required(); app->add_option("--rolename", cobraBotConfig.cobraConfig.rolename, "Role name")->required(); - app->add_option("--rolesecret", cobraBotConfig.cobraConfig.rolesecret, "Role secret")->required(); + app->add_option("--rolesecret", cobraBotConfig.cobraConfig.rolesecret, "Role secret") + ->required(); app->add_option("--channel", cobraBotConfig.channel, "Channel")->required(); app->add_option("--filter", cobraBotConfig.filter, "Filter"); app->add_option("--position", cobraBotConfig.position, "Position"); app->add_option("--runtime", cobraBotConfig.runtime, "Runtime"); app->add_option("--heartbeat", cobraBotConfig.enableHeartbeat, "Runtime"); app->add_option("--heartbeat_timeout", cobraBotConfig.heartBeatTimeout, "Runtime"); + app->add_flag( + "--limit_received_events", cobraBotConfig.limitReceivedEvents, "Max events per minute"); + app->add_option( + "--max_events_per_minute", cobraBotConfig.maxEventsPerMinute, "Max events per minute"); }; app.add_flag("--version", version, "Print ws version"); @@ -453,7 +458,8 @@ int main(int argc, char** argv) cobraConfig.webSocketPerMessageDeflateOptions = ix::WebSocketPerMessageDeflateOptions(true); cobraConfig.socketTLSOptions = tlsOptions; - cobraBotConfig.cobraConfig.webSocketPerMessageDeflateOptions = ix::WebSocketPerMessageDeflateOptions(true); + cobraBotConfig.cobraConfig.webSocketPerMessageDeflateOptions = + ix::WebSocketPerMessageDeflateOptions(true); cobraBotConfig.cobraConfig.socketTLSOptions = tlsOptions; int ret = 1; @@ -525,9 +531,7 @@ int main(int argc, char** argv) } else if (app.got_subcommand("cobra_subscribe")) { - int64_t sentCount = ix::cobra_to_stdout_bot(cobraBotConfig, - fluentd, - quiet); + int64_t sentCount = ix::cobra_to_stdout_bot(cobraBotConfig, fluentd, quiet); ret = (int) sentCount; } else if (app.got_subcommand("cobra_publish")) @@ -559,12 +563,8 @@ int main(int argc, char** argv) } else { - ret = (int) ix::cobra_to_statsd_bot(cobraBotConfig, - statsdClient, - fields, - gauge, - timer, - verbose); + ret = (int) ix::cobra_to_statsd_bot( + cobraBotConfig, statsdClient, fields, gauge, timer, verbose); } } } @@ -573,9 +573,7 @@ int main(int argc, char** argv) ix::SentryClient sentryClient(dsn); sentryClient.setTLSOptions(tlsOptions); - ret = (int) ix::cobra_to_sentry_bot(cobraBotConfig, - sentryClient, - verbose); + ret = (int) ix::cobra_to_sentry_bot(cobraBotConfig, sentryClient, verbose); } else if (app.got_subcommand("cobra_metrics_to_redis")) {