(ixbots) add options to limit how many messages per minute should be processed

This commit is contained in:
Benjamin Sergeant 2020-05-12 21:40:17 -07:00
parent 2e904801a0
commit 1af39bf0eb
8 changed files with 65 additions and 48 deletions

View File

@ -1,6 +1,10 @@
# Changelog # Changelog
All changes to this project will be documented in this file. All changes to this project will be documented in this file.
## [9.6.0] - 2020-05-12
(ixbots) add options to limit how many messages per minute should be processed
## [9.5.9] - 2020-05-12 ## [9.5.9] - 2020-05-12
(ixbots) add new class to configure a bot to simplify passing options around (ixbots) add new class to configure a bot to simplify passing options around

View File

@ -26,6 +26,8 @@ namespace ix
auto enableHeartbeat = botConfig.enableHeartbeat; auto enableHeartbeat = botConfig.enableHeartbeat;
auto heartBeatTimeout = botConfig.heartBeatTimeout; auto heartBeatTimeout = botConfig.heartBeatTimeout;
auto runtime = botConfig.runtime; auto runtime = botConfig.runtime;
auto maxEventsPerMinute = botConfig.maxEventsPerMinute;
auto limitReceivedEvents = botConfig.limitReceivedEvents;
ix::CobraConnection conn; ix::CobraConnection conn;
conn.configure(config); conn.configure(config);
@ -37,9 +39,11 @@ namespace ix
uint64_t receivedCountTotal(0); uint64_t receivedCountTotal(0);
uint64_t sentCountPerSecs(0); uint64_t sentCountPerSecs(0);
uint64_t receivedCountPerSecs(0); uint64_t receivedCountPerSecs(0);
std::atomic<int> receivedCountPerMinutes(0);
std::atomic<bool> stop(false); std::atomic<bool> stop(false);
std::atomic<bool> throttled(false); std::atomic<bool> throttled(false);
std::atomic<bool> fatalCobraError(false); std::atomic<bool> fatalCobraError(false);
int minuteCounter = 0;
auto timer = [&sentCount, auto timer = [&sentCount,
&receivedCount, &receivedCount,
@ -47,6 +51,8 @@ namespace ix
&receivedCountTotal, &receivedCountTotal,
&sentCountPerSecs, &sentCountPerSecs,
&receivedCountPerSecs, &receivedCountPerSecs,
&receivedCountPerMinutes,
&minuteCounter,
&stop] { &stop] {
while (!stop) while (!stop)
{ {
@ -67,13 +73,19 @@ namespace ix
CoreLogger::info(ss.str()); CoreLogger::info(ss.str());
receivedCountPerSecs = receivedCount - receivedCountTotal; receivedCountPerSecs = receivedCount - receivedCountTotal;
sentCountPerSecs = sentCount - receivedCountTotal; sentCountPerSecs = sentCount - sentCountTotal;
receivedCountTotal += receivedCountPerSecs; receivedCountTotal += receivedCountPerSecs;
sentCountTotal += sentCountPerSecs; sentCountTotal += sentCountPerSecs;
auto duration = std::chrono::seconds(1); auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
if (minuteCounter++ == 60)
{
receivedCountPerMinutes = 0;
minuteCounter = 0;
}
} }
CoreLogger::info("timer thread done"); CoreLogger::info("timer thread done");
@ -120,6 +132,9 @@ namespace ix
&subscriptionPosition, &subscriptionPosition,
&throttled, &throttled,
&receivedCount, &receivedCount,
&receivedCountPerMinutes,
maxEventsPerMinute,
limitReceivedEvents,
&fatalCobraError, &fatalCobraError,
&sentCount](const CobraEventPtr& event) { &sentCount](const CobraEventPtr& event) {
if (event->type == ix::CobraEventType::Open) if (event->type == ix::CobraEventType::Open)
@ -141,29 +156,34 @@ namespace ix
CoreLogger::info("Subscribing to " + channel); CoreLogger::info("Subscribing to " + channel);
CoreLogger::info("Subscribing at position " + subscriptionPosition); CoreLogger::info("Subscribing at position " + subscriptionPosition);
CoreLogger::info("Subscribing with filter " + filter); CoreLogger::info("Subscribing with filter " + filter);
conn.subscribe(channel, conn.subscribe(channel, filter, subscriptionPosition,
filter, [&sentCount, &receivedCountPerMinutes,
subscriptionPosition, maxEventsPerMinute, limitReceivedEvents,
[this, &throttled, &receivedCount,
&throttled, &subscriptionPosition, &fatalCobraError,
&receivedCount, this](const Json::Value& msg, const std::string& position) {
&subscriptionPosition, subscriptionPosition = position;
&fatalCobraError, ++receivedCount;
&sentCount](const Json::Value& msg, const std::string& position) {
subscriptionPosition = position;
// If we cannot send to sentry fast enough, drop the message ++receivedCountPerMinutes;
if (throttled) if (limitReceivedEvents)
{ {
return; if (receivedCountPerMinutes > maxEventsPerMinute)
} {
return;
}
}
++receivedCount; // If we cannot send to sentry fast enough, drop the message
if (throttled)
{
return;
}
_onBotMessageCallback( _onBotMessageCallback(
msg, position, throttled, msg, position, throttled,
fatalCobraError, sentCount); fatalCobraError, sentCount);
}); });
} }
else if (event->type == ix::CobraEventType::Subscribed) else if (event->type == ix::CobraEventType::Subscribed)
{ {

View File

@ -7,6 +7,7 @@
#pragma once #pragma once
#include <string> #include <string>
#include <limits>
#include <ixcobra/IXCobraConfig.h> #include <ixcobra/IXCobraConfig.h>
namespace ix namespace ix
@ -20,5 +21,7 @@ namespace ix
bool enableHeartbeat = true; bool enableHeartbeat = true;
int heartBeatTimeout = 60; int heartBeatTimeout = 60;
int runtime = -1; int runtime = -1;
int maxEventsPerMinute = std::numeric_limits<int>::max();
bool limitReceivedEvents = false;
}; };
} // namespace ix } // namespace ix

View File

@ -6,4 +6,4 @@
#pragma once #pragma once
#define IX_WEBSOCKET_VERSION "9.5.9" #define IX_WEBSOCKET_VERSION "9.6.0"

View File

@ -158,9 +158,7 @@ TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]")
SentryClient sentryClient(dsn); SentryClient sentryClient(dsn);
sentryClient.setTLSOptions(tlsOptionsClient); sentryClient.setTLSOptions(tlsOptionsClient);
int64_t sentCount = cobra_to_sentry_bot(cobraBotConfig, int64_t sentCount = cobra_to_sentry_bot(cobraBotConfig, sentryClient, verbose);
sentryClient,
verbose);
// //
// We want at least 2 messages to be sent // We want at least 2 messages to be sent
// //

View File

@ -112,12 +112,8 @@ TEST_CASE("Cobra_to_statsd_bot", "[cobra_bots]")
std::string timer; std::string timer;
bool verbose = true; bool verbose = true;
int64_t sentCount = ix::cobra_to_statsd_bot(cobraBotConfig, int64_t sentCount =
statsdClient, ix::cobra_to_statsd_bot(cobraBotConfig, statsdClient, fields, gauge, timer, verbose);
fields,
gauge,
timer,
verbose);
// //
// We want at least 2 messages to be sent // We want at least 2 messages to be sent
// //

View File

@ -95,9 +95,7 @@ TEST_CASE("Cobra_to_stdout_bot", "[cobra_bots]")
// We could try to capture the output ... not sure how. // We could try to capture the output ... not sure how.
bool fluentd = true; bool fluentd = true;
int64_t sentCount = ix::cobra_to_stdout_bot(cobraBotConfig, int64_t sentCount = ix::cobra_to_stdout_bot(cobraBotConfig, fluentd, quiet);
fluentd,
quiet);
// //
// We want at least 2 messages to be sent // We want at least 2 messages to be sent
// //

View File

@ -177,13 +177,18 @@ int main(int argc, char** argv)
app->add_option("--appkey", cobraBotConfig.cobraConfig.appkey, "Appkey")->required(); app->add_option("--appkey", cobraBotConfig.cobraConfig.appkey, "Appkey")->required();
app->add_option("--endpoint", cobraBotConfig.cobraConfig.endpoint, "Endpoint")->required(); app->add_option("--endpoint", cobraBotConfig.cobraConfig.endpoint, "Endpoint")->required();
app->add_option("--rolename", cobraBotConfig.cobraConfig.rolename, "Role name")->required(); app->add_option("--rolename", cobraBotConfig.cobraConfig.rolename, "Role name")->required();
app->add_option("--rolesecret", cobraBotConfig.cobraConfig.rolesecret, "Role secret")->required(); app->add_option("--rolesecret", cobraBotConfig.cobraConfig.rolesecret, "Role secret")
->required();
app->add_option("--channel", cobraBotConfig.channel, "Channel")->required(); app->add_option("--channel", cobraBotConfig.channel, "Channel")->required();
app->add_option("--filter", cobraBotConfig.filter, "Filter"); app->add_option("--filter", cobraBotConfig.filter, "Filter");
app->add_option("--position", cobraBotConfig.position, "Position"); app->add_option("--position", cobraBotConfig.position, "Position");
app->add_option("--runtime", cobraBotConfig.runtime, "Runtime"); app->add_option("--runtime", cobraBotConfig.runtime, "Runtime");
app->add_option("--heartbeat", cobraBotConfig.enableHeartbeat, "Runtime"); app->add_option("--heartbeat", cobraBotConfig.enableHeartbeat, "Runtime");
app->add_option("--heartbeat_timeout", cobraBotConfig.heartBeatTimeout, "Runtime"); app->add_option("--heartbeat_timeout", cobraBotConfig.heartBeatTimeout, "Runtime");
app->add_flag(
"--limit_received_events", cobraBotConfig.limitReceivedEvents, "Max events per minute");
app->add_option(
"--max_events_per_minute", cobraBotConfig.maxEventsPerMinute, "Max events per minute");
}; };
app.add_flag("--version", version, "Print ws version"); app.add_flag("--version", version, "Print ws version");
@ -453,7 +458,8 @@ int main(int argc, char** argv)
cobraConfig.webSocketPerMessageDeflateOptions = ix::WebSocketPerMessageDeflateOptions(true); cobraConfig.webSocketPerMessageDeflateOptions = ix::WebSocketPerMessageDeflateOptions(true);
cobraConfig.socketTLSOptions = tlsOptions; cobraConfig.socketTLSOptions = tlsOptions;
cobraBotConfig.cobraConfig.webSocketPerMessageDeflateOptions = ix::WebSocketPerMessageDeflateOptions(true); cobraBotConfig.cobraConfig.webSocketPerMessageDeflateOptions =
ix::WebSocketPerMessageDeflateOptions(true);
cobraBotConfig.cobraConfig.socketTLSOptions = tlsOptions; cobraBotConfig.cobraConfig.socketTLSOptions = tlsOptions;
int ret = 1; int ret = 1;
@ -525,9 +531,7 @@ int main(int argc, char** argv)
} }
else if (app.got_subcommand("cobra_subscribe")) else if (app.got_subcommand("cobra_subscribe"))
{ {
int64_t sentCount = ix::cobra_to_stdout_bot(cobraBotConfig, int64_t sentCount = ix::cobra_to_stdout_bot(cobraBotConfig, fluentd, quiet);
fluentd,
quiet);
ret = (int) sentCount; ret = (int) sentCount;
} }
else if (app.got_subcommand("cobra_publish")) else if (app.got_subcommand("cobra_publish"))
@ -559,12 +563,8 @@ int main(int argc, char** argv)
} }
else else
{ {
ret = (int) ix::cobra_to_statsd_bot(cobraBotConfig, ret = (int) ix::cobra_to_statsd_bot(
statsdClient, cobraBotConfig, statsdClient, fields, gauge, timer, verbose);
fields,
gauge,
timer,
verbose);
} }
} }
} }
@ -573,9 +573,7 @@ int main(int argc, char** argv)
ix::SentryClient sentryClient(dsn); ix::SentryClient sentryClient(dsn);
sentryClient.setTLSOptions(tlsOptions); sentryClient.setTLSOptions(tlsOptions);
ret = (int) ix::cobra_to_sentry_bot(cobraBotConfig, ret = (int) ix::cobra_to_sentry_bot(cobraBotConfig, sentryClient, verbose);
sentryClient,
verbose);
} }
else if (app.got_subcommand("cobra_metrics_to_redis")) else if (app.got_subcommand("cobra_metrics_to_redis"))
{ {