(ixbots) add options to limit how many messages per minute should be processed
This commit is contained in:
		@@ -1,6 +1,10 @@
 | 
				
			|||||||
# Changelog
 | 
					# Changelog
 | 
				
			||||||
All changes to this project will be documented in this file.
 | 
					All changes to this project will be documented in this file.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					## [9.6.0] - 2020-05-12
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					(ixbots) add options to limit how many messages per minute should be processed
 | 
				
			||||||
 | 
					
 | 
				
			||||||
## [9.5.9] - 2020-05-12
 | 
					## [9.5.9] - 2020-05-12
 | 
				
			||||||
 | 
					
 | 
				
			||||||
(ixbots) add new class to configure a bot to simplify passing options around
 | 
					(ixbots) add new class to configure a bot to simplify passing options around
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -26,6 +26,8 @@ namespace ix
 | 
				
			|||||||
        auto enableHeartbeat = botConfig.enableHeartbeat;
 | 
					        auto enableHeartbeat = botConfig.enableHeartbeat;
 | 
				
			||||||
        auto heartBeatTimeout = botConfig.heartBeatTimeout;
 | 
					        auto heartBeatTimeout = botConfig.heartBeatTimeout;
 | 
				
			||||||
        auto runtime = botConfig.runtime;
 | 
					        auto runtime = botConfig.runtime;
 | 
				
			||||||
 | 
					        auto maxEventsPerMinute = botConfig.maxEventsPerMinute;
 | 
				
			||||||
 | 
					        auto limitReceivedEvents = botConfig.limitReceivedEvents;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        ix::CobraConnection conn;
 | 
					        ix::CobraConnection conn;
 | 
				
			||||||
        conn.configure(config);
 | 
					        conn.configure(config);
 | 
				
			||||||
@@ -37,9 +39,11 @@ namespace ix
 | 
				
			|||||||
        uint64_t receivedCountTotal(0);
 | 
					        uint64_t receivedCountTotal(0);
 | 
				
			||||||
        uint64_t sentCountPerSecs(0);
 | 
					        uint64_t sentCountPerSecs(0);
 | 
				
			||||||
        uint64_t receivedCountPerSecs(0);
 | 
					        uint64_t receivedCountPerSecs(0);
 | 
				
			||||||
 | 
					        std::atomic<int> receivedCountPerMinutes(0);
 | 
				
			||||||
        std::atomic<bool> stop(false);
 | 
					        std::atomic<bool> stop(false);
 | 
				
			||||||
        std::atomic<bool> throttled(false);
 | 
					        std::atomic<bool> throttled(false);
 | 
				
			||||||
        std::atomic<bool> fatalCobraError(false);
 | 
					        std::atomic<bool> fatalCobraError(false);
 | 
				
			||||||
 | 
					        int minuteCounter = 0;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        auto timer = [&sentCount,
 | 
					        auto timer = [&sentCount,
 | 
				
			||||||
                      &receivedCount,
 | 
					                      &receivedCount,
 | 
				
			||||||
@@ -47,6 +51,8 @@ namespace ix
 | 
				
			|||||||
                      &receivedCountTotal,
 | 
					                      &receivedCountTotal,
 | 
				
			||||||
                      &sentCountPerSecs,
 | 
					                      &sentCountPerSecs,
 | 
				
			||||||
                      &receivedCountPerSecs,
 | 
					                      &receivedCountPerSecs,
 | 
				
			||||||
 | 
					                      &receivedCountPerMinutes,
 | 
				
			||||||
 | 
					                      &minuteCounter,
 | 
				
			||||||
                      &stop] {
 | 
					                      &stop] {
 | 
				
			||||||
            while (!stop)
 | 
					            while (!stop)
 | 
				
			||||||
            {
 | 
					            {
 | 
				
			||||||
@@ -67,13 +73,19 @@ namespace ix
 | 
				
			|||||||
                CoreLogger::info(ss.str());
 | 
					                CoreLogger::info(ss.str());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                receivedCountPerSecs = receivedCount - receivedCountTotal;
 | 
					                receivedCountPerSecs = receivedCount - receivedCountTotal;
 | 
				
			||||||
                sentCountPerSecs = sentCount - receivedCountTotal;
 | 
					                sentCountPerSecs = sentCount - sentCountTotal;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                receivedCountTotal += receivedCountPerSecs;
 | 
					                receivedCountTotal += receivedCountPerSecs;
 | 
				
			||||||
                sentCountTotal += sentCountPerSecs;
 | 
					                sentCountTotal += sentCountPerSecs;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                auto duration = std::chrono::seconds(1);
 | 
					                auto duration = std::chrono::seconds(1);
 | 
				
			||||||
                std::this_thread::sleep_for(duration);
 | 
					                std::this_thread::sleep_for(duration);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                if (minuteCounter++ == 60)
 | 
				
			||||||
 | 
					                {
 | 
				
			||||||
 | 
					                    receivedCountPerMinutes = 0;
 | 
				
			||||||
 | 
					                    minuteCounter = 0;
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            CoreLogger::info("timer thread done");
 | 
					            CoreLogger::info("timer thread done");
 | 
				
			||||||
@@ -120,6 +132,9 @@ namespace ix
 | 
				
			|||||||
                               &subscriptionPosition,
 | 
					                               &subscriptionPosition,
 | 
				
			||||||
                               &throttled,
 | 
					                               &throttled,
 | 
				
			||||||
                               &receivedCount,
 | 
					                               &receivedCount,
 | 
				
			||||||
 | 
					                               &receivedCountPerMinutes,
 | 
				
			||||||
 | 
					                               maxEventsPerMinute,
 | 
				
			||||||
 | 
					                               limitReceivedEvents,
 | 
				
			||||||
                               &fatalCobraError,
 | 
					                               &fatalCobraError,
 | 
				
			||||||
                               &sentCount](const CobraEventPtr& event) {
 | 
					                               &sentCount](const CobraEventPtr& event) {
 | 
				
			||||||
            if (event->type == ix::CobraEventType::Open)
 | 
					            if (event->type == ix::CobraEventType::Open)
 | 
				
			||||||
@@ -141,16 +156,23 @@ namespace ix
 | 
				
			|||||||
                CoreLogger::info("Subscribing to " + channel);
 | 
					                CoreLogger::info("Subscribing to " + channel);
 | 
				
			||||||
                CoreLogger::info("Subscribing at position " + subscriptionPosition);
 | 
					                CoreLogger::info("Subscribing at position " + subscriptionPosition);
 | 
				
			||||||
                CoreLogger::info("Subscribing with filter " + filter);
 | 
					                CoreLogger::info("Subscribing with filter " + filter);
 | 
				
			||||||
                conn.subscribe(channel,
 | 
					                conn.subscribe(channel, filter, subscriptionPosition,
 | 
				
			||||||
                               filter,
 | 
					                    [&sentCount, &receivedCountPerMinutes,
 | 
				
			||||||
                               subscriptionPosition,
 | 
					                     maxEventsPerMinute, limitReceivedEvents,
 | 
				
			||||||
                               [this,
 | 
					                     &throttled, &receivedCount,
 | 
				
			||||||
                                &throttled,
 | 
					                     &subscriptionPosition, &fatalCobraError,
 | 
				
			||||||
                                &receivedCount,
 | 
					                     this](const Json::Value& msg, const std::string& position) {
 | 
				
			||||||
                                &subscriptionPosition,
 | 
					 | 
				
			||||||
                                &fatalCobraError,
 | 
					 | 
				
			||||||
                                &sentCount](const Json::Value& msg, const std::string& position) {
 | 
					 | 
				
			||||||
                        subscriptionPosition = position;
 | 
					                        subscriptionPosition = position;
 | 
				
			||||||
 | 
					                        ++receivedCount;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                        ++receivedCountPerMinutes;
 | 
				
			||||||
 | 
					                        if (limitReceivedEvents)
 | 
				
			||||||
 | 
					                        {
 | 
				
			||||||
 | 
					                            if (receivedCountPerMinutes > maxEventsPerMinute)
 | 
				
			||||||
 | 
					                            {
 | 
				
			||||||
 | 
					                                return;
 | 
				
			||||||
 | 
					                            }
 | 
				
			||||||
 | 
					                        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                        // If we cannot send to sentry fast enough, drop the message
 | 
					                        // If we cannot send to sentry fast enough, drop the message
 | 
				
			||||||
                        if (throttled)
 | 
					                        if (throttled)
 | 
				
			||||||
@@ -158,8 +180,6 @@ namespace ix
 | 
				
			|||||||
                            return;
 | 
					                            return;
 | 
				
			||||||
                        }
 | 
					                        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                                   ++receivedCount;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                        _onBotMessageCallback(
 | 
					                        _onBotMessageCallback(
 | 
				
			||||||
                            msg, position, throttled,
 | 
					                            msg, position, throttled,
 | 
				
			||||||
                            fatalCobraError, sentCount);
 | 
					                            fatalCobraError, sentCount);
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -7,6 +7,7 @@
 | 
				
			|||||||
#pragma once
 | 
					#pragma once
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#include <string>
 | 
					#include <string>
 | 
				
			||||||
 | 
					#include <limits>
 | 
				
			||||||
#include <ixcobra/IXCobraConfig.h>
 | 
					#include <ixcobra/IXCobraConfig.h>
 | 
				
			||||||
 | 
					
 | 
				
			||||||
namespace ix
 | 
					namespace ix
 | 
				
			||||||
@@ -20,5 +21,7 @@ namespace ix
 | 
				
			|||||||
        bool enableHeartbeat = true;
 | 
					        bool enableHeartbeat = true;
 | 
				
			||||||
        int heartBeatTimeout = 60;
 | 
					        int heartBeatTimeout = 60;
 | 
				
			||||||
        int runtime = -1;
 | 
					        int runtime = -1;
 | 
				
			||||||
 | 
					        int maxEventsPerMinute = std::numeric_limits<int>::max();
 | 
				
			||||||
 | 
					        bool limitReceivedEvents = false;
 | 
				
			||||||
    };
 | 
					    };
 | 
				
			||||||
} // namespace ix
 | 
					} // namespace ix
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -6,4 +6,4 @@
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
#pragma once
 | 
					#pragma once
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#define IX_WEBSOCKET_VERSION "9.5.9"
 | 
					#define IX_WEBSOCKET_VERSION "9.6.0"
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -158,9 +158,7 @@ TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]")
 | 
				
			|||||||
        SentryClient sentryClient(dsn);
 | 
					        SentryClient sentryClient(dsn);
 | 
				
			||||||
        sentryClient.setTLSOptions(tlsOptionsClient);
 | 
					        sentryClient.setTLSOptions(tlsOptionsClient);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        int64_t sentCount = cobra_to_sentry_bot(cobraBotConfig,
 | 
					        int64_t sentCount = cobra_to_sentry_bot(cobraBotConfig, sentryClient, verbose);
 | 
				
			||||||
                                                sentryClient,
 | 
					 | 
				
			||||||
                                                verbose);
 | 
					 | 
				
			||||||
        //
 | 
					        //
 | 
				
			||||||
        // We want at least 2 messages to be sent
 | 
					        // We want at least 2 messages to be sent
 | 
				
			||||||
        //
 | 
					        //
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -112,12 +112,8 @@ TEST_CASE("Cobra_to_statsd_bot", "[cobra_bots]")
 | 
				
			|||||||
        std::string timer;
 | 
					        std::string timer;
 | 
				
			||||||
        bool verbose = true;
 | 
					        bool verbose = true;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        int64_t sentCount = ix::cobra_to_statsd_bot(cobraBotConfig,
 | 
					        int64_t sentCount =
 | 
				
			||||||
                                                    statsdClient,
 | 
					            ix::cobra_to_statsd_bot(cobraBotConfig, statsdClient, fields, gauge, timer, verbose);
 | 
				
			||||||
                                                    fields,
 | 
					 | 
				
			||||||
                                                    gauge,
 | 
					 | 
				
			||||||
                                                    timer,
 | 
					 | 
				
			||||||
                                                    verbose);
 | 
					 | 
				
			||||||
        //
 | 
					        //
 | 
				
			||||||
        // We want at least 2 messages to be sent
 | 
					        // We want at least 2 messages to be sent
 | 
				
			||||||
        //
 | 
					        //
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -95,9 +95,7 @@ TEST_CASE("Cobra_to_stdout_bot", "[cobra_bots]")
 | 
				
			|||||||
        // We could try to capture the output ... not sure how.
 | 
					        // We could try to capture the output ... not sure how.
 | 
				
			||||||
        bool fluentd = true;
 | 
					        bool fluentd = true;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        int64_t sentCount = ix::cobra_to_stdout_bot(cobraBotConfig,
 | 
					        int64_t sentCount = ix::cobra_to_stdout_bot(cobraBotConfig, fluentd, quiet);
 | 
				
			||||||
                                                    fluentd,
 | 
					 | 
				
			||||||
                                                    quiet);
 | 
					 | 
				
			||||||
        //
 | 
					        //
 | 
				
			||||||
        // We want at least 2 messages to be sent
 | 
					        // We want at least 2 messages to be sent
 | 
				
			||||||
        //
 | 
					        //
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										26
									
								
								ws/ws.cpp
									
									
									
									
									
								
							
							
						
						
									
										26
									
								
								ws/ws.cpp
									
									
									
									
									
								
							@@ -177,13 +177,18 @@ int main(int argc, char** argv)
 | 
				
			|||||||
        app->add_option("--appkey", cobraBotConfig.cobraConfig.appkey, "Appkey")->required();
 | 
					        app->add_option("--appkey", cobraBotConfig.cobraConfig.appkey, "Appkey")->required();
 | 
				
			||||||
        app->add_option("--endpoint", cobraBotConfig.cobraConfig.endpoint, "Endpoint")->required();
 | 
					        app->add_option("--endpoint", cobraBotConfig.cobraConfig.endpoint, "Endpoint")->required();
 | 
				
			||||||
        app->add_option("--rolename", cobraBotConfig.cobraConfig.rolename, "Role name")->required();
 | 
					        app->add_option("--rolename", cobraBotConfig.cobraConfig.rolename, "Role name")->required();
 | 
				
			||||||
        app->add_option("--rolesecret", cobraBotConfig.cobraConfig.rolesecret, "Role secret")->required();
 | 
					        app->add_option("--rolesecret", cobraBotConfig.cobraConfig.rolesecret, "Role secret")
 | 
				
			||||||
 | 
					            ->required();
 | 
				
			||||||
        app->add_option("--channel", cobraBotConfig.channel, "Channel")->required();
 | 
					        app->add_option("--channel", cobraBotConfig.channel, "Channel")->required();
 | 
				
			||||||
        app->add_option("--filter", cobraBotConfig.filter, "Filter");
 | 
					        app->add_option("--filter", cobraBotConfig.filter, "Filter");
 | 
				
			||||||
        app->add_option("--position", cobraBotConfig.position, "Position");
 | 
					        app->add_option("--position", cobraBotConfig.position, "Position");
 | 
				
			||||||
        app->add_option("--runtime", cobraBotConfig.runtime, "Runtime");
 | 
					        app->add_option("--runtime", cobraBotConfig.runtime, "Runtime");
 | 
				
			||||||
        app->add_option("--heartbeat", cobraBotConfig.enableHeartbeat, "Runtime");
 | 
					        app->add_option("--heartbeat", cobraBotConfig.enableHeartbeat, "Runtime");
 | 
				
			||||||
        app->add_option("--heartbeat_timeout", cobraBotConfig.heartBeatTimeout, "Runtime");
 | 
					        app->add_option("--heartbeat_timeout", cobraBotConfig.heartBeatTimeout, "Runtime");
 | 
				
			||||||
 | 
					        app->add_flag(
 | 
				
			||||||
 | 
					            "--limit_received_events", cobraBotConfig.limitReceivedEvents, "Max events per minute");
 | 
				
			||||||
 | 
					        app->add_option(
 | 
				
			||||||
 | 
					            "--max_events_per_minute", cobraBotConfig.maxEventsPerMinute, "Max events per minute");
 | 
				
			||||||
    };
 | 
					    };
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    app.add_flag("--version", version, "Print ws version");
 | 
					    app.add_flag("--version", version, "Print ws version");
 | 
				
			||||||
@@ -453,7 +458,8 @@ int main(int argc, char** argv)
 | 
				
			|||||||
    cobraConfig.webSocketPerMessageDeflateOptions = ix::WebSocketPerMessageDeflateOptions(true);
 | 
					    cobraConfig.webSocketPerMessageDeflateOptions = ix::WebSocketPerMessageDeflateOptions(true);
 | 
				
			||||||
    cobraConfig.socketTLSOptions = tlsOptions;
 | 
					    cobraConfig.socketTLSOptions = tlsOptions;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    cobraBotConfig.cobraConfig.webSocketPerMessageDeflateOptions = ix::WebSocketPerMessageDeflateOptions(true);
 | 
					    cobraBotConfig.cobraConfig.webSocketPerMessageDeflateOptions =
 | 
				
			||||||
 | 
					        ix::WebSocketPerMessageDeflateOptions(true);
 | 
				
			||||||
    cobraBotConfig.cobraConfig.socketTLSOptions = tlsOptions;
 | 
					    cobraBotConfig.cobraConfig.socketTLSOptions = tlsOptions;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    int ret = 1;
 | 
					    int ret = 1;
 | 
				
			||||||
@@ -525,9 +531,7 @@ int main(int argc, char** argv)
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
    else if (app.got_subcommand("cobra_subscribe"))
 | 
					    else if (app.got_subcommand("cobra_subscribe"))
 | 
				
			||||||
    {
 | 
					    {
 | 
				
			||||||
        int64_t sentCount = ix::cobra_to_stdout_bot(cobraBotConfig,
 | 
					        int64_t sentCount = ix::cobra_to_stdout_bot(cobraBotConfig, fluentd, quiet);
 | 
				
			||||||
                                                    fluentd,
 | 
					 | 
				
			||||||
                                                    quiet);
 | 
					 | 
				
			||||||
        ret = (int) sentCount;
 | 
					        ret = (int) sentCount;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    else if (app.got_subcommand("cobra_publish"))
 | 
					    else if (app.got_subcommand("cobra_publish"))
 | 
				
			||||||
@@ -559,12 +563,8 @@ int main(int argc, char** argv)
 | 
				
			|||||||
            }
 | 
					            }
 | 
				
			||||||
            else
 | 
					            else
 | 
				
			||||||
            {
 | 
					            {
 | 
				
			||||||
                ret = (int) ix::cobra_to_statsd_bot(cobraBotConfig,
 | 
					                ret = (int) ix::cobra_to_statsd_bot(
 | 
				
			||||||
                                                    statsdClient,
 | 
					                    cobraBotConfig, statsdClient, fields, gauge, timer, verbose);
 | 
				
			||||||
                                                    fields,
 | 
					 | 
				
			||||||
                                                    gauge,
 | 
					 | 
				
			||||||
                                                    timer,
 | 
					 | 
				
			||||||
                                                    verbose);
 | 
					 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@@ -573,9 +573,7 @@ int main(int argc, char** argv)
 | 
				
			|||||||
        ix::SentryClient sentryClient(dsn);
 | 
					        ix::SentryClient sentryClient(dsn);
 | 
				
			||||||
        sentryClient.setTLSOptions(tlsOptions);
 | 
					        sentryClient.setTLSOptions(tlsOptions);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        ret = (int) ix::cobra_to_sentry_bot(cobraBotConfig,
 | 
					        ret = (int) ix::cobra_to_sentry_bot(cobraBotConfig, sentryClient, verbose);
 | 
				
			||||||
                                            sentryClient,
 | 
					 | 
				
			||||||
                                            verbose);
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    else if (app.got_subcommand("cobra_metrics_to_redis"))
 | 
					    else if (app.got_subcommand("cobra_metrics_to_redis"))
 | 
				
			||||||
    {
 | 
					    {
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user