(cobra bots) add a --heartbeat_timeout option to specify when the bot should terminate because no events are received
This commit is contained in:
		@@ -1,6 +1,10 @@
 | 
			
		||||
# Changelog
 | 
			
		||||
All changes to this project will be documented in this file.
 | 
			
		||||
 | 
			
		||||
## [9.5.6] - 2020-05-06
 | 
			
		||||
 | 
			
		||||
(cobra bots) add a --heartbeat_timeout option to specify when the bot should terminate because no events are received
 | 
			
		||||
 | 
			
		||||
## [9.5.5] - 2020-05-06
 | 
			
		||||
 | 
			
		||||
(openssl tls) when OpenSSL is older than 1.1, register the crypto locking callback to be thread safe. Should fix lots of CI failures
 | 
			
		||||
 
 | 
			
		||||
@@ -22,6 +22,7 @@ namespace ix
 | 
			
		||||
                          const std::string& filter,
 | 
			
		||||
                          const std::string& position,
 | 
			
		||||
                          bool enableHeartbeat,
 | 
			
		||||
                          int heartBeatTimeout,
 | 
			
		||||
                          int runtime)
 | 
			
		||||
    {
 | 
			
		||||
        ix::CobraConnection conn;
 | 
			
		||||
@@ -78,7 +79,7 @@ namespace ix
 | 
			
		||||
 | 
			
		||||
        std::thread t1(timer);
 | 
			
		||||
 | 
			
		||||
        auto heartbeat = [&sentCount, &receivedCount, &stop, &enableHeartbeat] {
 | 
			
		||||
        auto heartbeat = [&sentCount, &receivedCount, &stop, &enableHeartbeat, &heartBeatTimeout] {
 | 
			
		||||
            std::string state("na");
 | 
			
		||||
 | 
			
		||||
            if (!enableHeartbeat) return;
 | 
			
		||||
@@ -94,11 +95,12 @@ namespace ix
 | 
			
		||||
                if (currentState == state)
 | 
			
		||||
                {
 | 
			
		||||
                    CoreLogger::error("no messages received or sent for 1 minute, exiting");
 | 
			
		||||
                    exit(1);
 | 
			
		||||
                    fatalCobraError = true;
 | 
			
		||||
                    break;
 | 
			
		||||
                }
 | 
			
		||||
                state = currentState;
 | 
			
		||||
 | 
			
		||||
                auto duration = std::chrono::minutes(1);
 | 
			
		||||
                auto duration = std::chrono::seconds(heartBeatTimeout);
 | 
			
		||||
                std::this_thread::sleep_for(duration);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -30,6 +30,7 @@ namespace ix
 | 
			
		||||
                    const std::string& filter,
 | 
			
		||||
                    const std::string& position,
 | 
			
		||||
                    bool enableHeartbeat,
 | 
			
		||||
                    int heartBeatTimeout,
 | 
			
		||||
                    int runtime);
 | 
			
		||||
 | 
			
		||||
        void setOnBotMessageCallback(const OnBotMessageCallback& callback);
 | 
			
		||||
 
 | 
			
		||||
@@ -23,6 +23,7 @@ namespace ix
 | 
			
		||||
                                SentryClient& sentryClient,
 | 
			
		||||
                                bool verbose,
 | 
			
		||||
                                bool enableHeartbeat,
 | 
			
		||||
                                int heartBeatTimeout,
 | 
			
		||||
                                int runtime)
 | 
			
		||||
    {
 | 
			
		||||
        CobraBot bot;
 | 
			
		||||
@@ -81,6 +82,7 @@ namespace ix
 | 
			
		||||
                       filter,
 | 
			
		||||
                       position,
 | 
			
		||||
                       enableHeartbeat,
 | 
			
		||||
                       heartBeatTimeout,
 | 
			
		||||
                       runtime);
 | 
			
		||||
    }
 | 
			
		||||
} // namespace ix
 | 
			
		||||
 
 | 
			
		||||
@@ -19,5 +19,6 @@ namespace ix
 | 
			
		||||
                                SentryClient& sentryClient,
 | 
			
		||||
                                bool verbose,
 | 
			
		||||
                                bool enableHeartbeat,
 | 
			
		||||
                                int heartBeatTimeout,
 | 
			
		||||
                                int runtime);
 | 
			
		||||
} // namespace ix
 | 
			
		||||
 
 | 
			
		||||
@@ -63,6 +63,7 @@ namespace ix
 | 
			
		||||
                                const std::string& timer,
 | 
			
		||||
                                bool verbose,
 | 
			
		||||
                                bool enableHeartbeat,
 | 
			
		||||
                                int heartBeatTimeout,
 | 
			
		||||
                                int runtime)
 | 
			
		||||
    {
 | 
			
		||||
        ix::CobraConnection conn;
 | 
			
		||||
@@ -146,6 +147,7 @@ namespace ix
 | 
			
		||||
                       filter,
 | 
			
		||||
                       position,
 | 
			
		||||
                       enableHeartbeat,
 | 
			
		||||
                       heartBeatTimeout,
 | 
			
		||||
                       runtime);
 | 
			
		||||
    }
 | 
			
		||||
} // namespace ix
 | 
			
		||||
 
 | 
			
		||||
@@ -23,5 +23,6 @@ namespace ix
 | 
			
		||||
                                const std::string& timer,
 | 
			
		||||
                                bool verbose,
 | 
			
		||||
                                bool enableHeartbeat,
 | 
			
		||||
                                int heartBeatTimeout,
 | 
			
		||||
                                int runtime);
 | 
			
		||||
} // namespace ix
 | 
			
		||||
 
 | 
			
		||||
@@ -70,6 +70,7 @@ namespace ix
 | 
			
		||||
                                bool fluentd,
 | 
			
		||||
                                bool quiet,
 | 
			
		||||
                                bool enableHeartbeat,
 | 
			
		||||
                                int heartBeatTimeout,
 | 
			
		||||
                                int runtime)
 | 
			
		||||
    {
 | 
			
		||||
        CobraBot bot;
 | 
			
		||||
@@ -93,6 +94,7 @@ namespace ix
 | 
			
		||||
                       filter,
 | 
			
		||||
                       position,
 | 
			
		||||
                       enableHeartbeat,
 | 
			
		||||
                       heartBeatTimeout,
 | 
			
		||||
                       runtime);
 | 
			
		||||
    }
 | 
			
		||||
} // namespace ix
 | 
			
		||||
 
 | 
			
		||||
@@ -19,5 +19,6 @@ namespace ix
 | 
			
		||||
                                bool fluentd,
 | 
			
		||||
                                bool quiet,
 | 
			
		||||
                                bool enableHeartbeat,
 | 
			
		||||
                                int heartBeatTimeout,
 | 
			
		||||
                                int runtime);
 | 
			
		||||
} // namespace ix
 | 
			
		||||
 
 | 
			
		||||
@@ -50,10 +50,7 @@ namespace ix
 | 
			
		||||
        bool openSSLServerHandshake(std::string& errMsg);
 | 
			
		||||
 | 
			
		||||
        // Required for OpenSSL < 1.1
 | 
			
		||||
        static void openSSLLockingCallback(int mode,
 | 
			
		||||
                                           int type,
 | 
			
		||||
                                           const char* /*file*/,
 | 
			
		||||
                                           int /*line*/);
 | 
			
		||||
        static void openSSLLockingCallback(int mode, int type, const char* /*file*/, int /*line*/);
 | 
			
		||||
 | 
			
		||||
        SSL* _ssl_connection;
 | 
			
		||||
        SSL_CTX* _ssl_context;
 | 
			
		||||
 
 | 
			
		||||
@@ -6,4 +6,4 @@
 | 
			
		||||
 | 
			
		||||
#pragma once
 | 
			
		||||
 | 
			
		||||
#define IX_WEBSOCKET_VERSION "9.5.5"
 | 
			
		||||
#define IX_WEBSOCKET_VERSION "9.5.6"
 | 
			
		||||
 
 | 
			
		||||
@@ -142,6 +142,7 @@ TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]")
 | 
			
		||||
        std::string position("$");
 | 
			
		||||
        bool verbose = true;
 | 
			
		||||
        bool enableHeartbeat = false;
 | 
			
		||||
        int heartBeatTimeout = 60;
 | 
			
		||||
 | 
			
		||||
        // FIXME: try to get this working with https instead of http
 | 
			
		||||
        //        to regress the TLS 1.3 OpenSSL bug
 | 
			
		||||
@@ -159,8 +160,15 @@ TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]")
 | 
			
		||||
        // Only run the bot for 3 seconds
 | 
			
		||||
        int runtime = 3;
 | 
			
		||||
 | 
			
		||||
        int64_t sentCount = cobra_to_sentry_bot(
 | 
			
		||||
            config, channel, filter, position, sentryClient, verbose, enableHeartbeat, runtime);
 | 
			
		||||
        int64_t sentCount = cobra_to_sentry_bot(config,
 | 
			
		||||
                                                channel,
 | 
			
		||||
                                                filter,
 | 
			
		||||
                                                position,
 | 
			
		||||
                                                sentryClient,
 | 
			
		||||
                                                verbose,
 | 
			
		||||
                                                enableHeartbeat,
 | 
			
		||||
                                                heartBeatTimeout,
 | 
			
		||||
                                                runtime);
 | 
			
		||||
        //
 | 
			
		||||
        // We want at least 2 messages to be sent
 | 
			
		||||
        //
 | 
			
		||||
 
 | 
			
		||||
@@ -91,6 +91,7 @@ TEST_CASE("Cobra_to_statsd_bot", "[cobra_bots]")
 | 
			
		||||
        std::string position("$");
 | 
			
		||||
        bool verbose = true;
 | 
			
		||||
        bool enableHeartbeat = false;
 | 
			
		||||
        int heartBeatTimeout = 60;
 | 
			
		||||
 | 
			
		||||
        // Only run the bot for 3 seconds
 | 
			
		||||
        int runtime = 3;
 | 
			
		||||
@@ -123,6 +124,7 @@ TEST_CASE("Cobra_to_statsd_bot", "[cobra_bots]")
 | 
			
		||||
                                                    timer,
 | 
			
		||||
                                                    verbose,
 | 
			
		||||
                                                    enableHeartbeat,
 | 
			
		||||
                                                    heartBeatTimeout,
 | 
			
		||||
                                                    runtime);
 | 
			
		||||
        //
 | 
			
		||||
        // We want at least 2 messages to be sent
 | 
			
		||||
 
 | 
			
		||||
@@ -89,6 +89,7 @@ TEST_CASE("Cobra_to_stdout_bot", "[cobra_bots]")
 | 
			
		||||
        std::string position("$");
 | 
			
		||||
        bool quiet = false;
 | 
			
		||||
        bool enableHeartbeat = false;
 | 
			
		||||
        int heartBeatTimeout = 60;
 | 
			
		||||
 | 
			
		||||
        // Only run the bot for 3 seconds
 | 
			
		||||
        int runtime = 3;
 | 
			
		||||
@@ -96,8 +97,15 @@ TEST_CASE("Cobra_to_stdout_bot", "[cobra_bots]")
 | 
			
		||||
        // We could try to capture the output ... not sure how.
 | 
			
		||||
        bool fluentd = true;
 | 
			
		||||
 | 
			
		||||
        int64_t sentCount = ix::cobra_to_stdout_bot(
 | 
			
		||||
            config, channel, filter, position, fluentd, quiet, enableHeartbeat, runtime);
 | 
			
		||||
        int64_t sentCount = ix::cobra_to_stdout_bot(config,
 | 
			
		||||
                                                    channel,
 | 
			
		||||
                                                    filter,
 | 
			
		||||
                                                    position,
 | 
			
		||||
                                                    fluentd,
 | 
			
		||||
                                                    quiet,
 | 
			
		||||
                                                    enableHeartbeat,
 | 
			
		||||
                                                    heartBeatTimeout,
 | 
			
		||||
                                                    runtime);
 | 
			
		||||
        //
 | 
			
		||||
        // We want at least 2 messages to be sent
 | 
			
		||||
        //
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										17
									
								
								ws/ws.cpp
									
									
									
									
									
								
							
							
						
						
									
										17
									
								
								ws/ws.cpp
									
									
									
									
									
								
							@@ -150,6 +150,7 @@ int main(int argc, char** argv)
 | 
			
		||||
    uint32_t maxWaitBetweenReconnectionRetries;
 | 
			
		||||
    int pingIntervalSecs = 30;
 | 
			
		||||
    int runtime = -1; // run indefinitely
 | 
			
		||||
    int heartBeatTimeout = 60;
 | 
			
		||||
 | 
			
		||||
    auto addTLSOptions = [&tlsOptions, &verifyNone](CLI::App* app) {
 | 
			
		||||
        app->add_option(
 | 
			
		||||
@@ -287,6 +288,7 @@ int main(int argc, char** argv)
 | 
			
		||||
    cobraSubscribeApp->add_flag("-q", quiet, "Quiet / only display stats");
 | 
			
		||||
    cobraSubscribeApp->add_flag("--fluentd", fluentd, "Write fluentd prefix");
 | 
			
		||||
    cobraSubscribeApp->add_option("--runtime", runtime, "Runtime in seconds");
 | 
			
		||||
    cobraSubscribeApp->add_option("--heartbeat_timeout", heartBeatTimeout, "Heartbeat timeout");
 | 
			
		||||
    addTLSOptions(cobraSubscribeApp);
 | 
			
		||||
    addCobraConfig(cobraSubscribeApp);
 | 
			
		||||
 | 
			
		||||
@@ -328,6 +330,7 @@ int main(int argc, char** argv)
 | 
			
		||||
    cobra2statsd->add_option("--filter", filter, "Stream SQL Filter");
 | 
			
		||||
    cobra2statsd->add_option("--position", position, "Stream position");
 | 
			
		||||
    cobra2statsd->add_option("--runtime", runtime, "Runtime in seconds");
 | 
			
		||||
    cobra2statsd->add_option("--heartbeat_timeout", heartBeatTimeout, "Heartbeat timeout");
 | 
			
		||||
    addTLSOptions(cobra2statsd);
 | 
			
		||||
    addCobraConfig(cobra2statsd);
 | 
			
		||||
 | 
			
		||||
@@ -340,6 +343,7 @@ int main(int argc, char** argv)
 | 
			
		||||
    cobra2sentry->add_option("--filter", filter, "Stream SQL Filter");
 | 
			
		||||
    cobra2sentry->add_option("--position", position, "Stream position");
 | 
			
		||||
    cobra2sentry->add_option("--runtime", runtime, "Runtime in seconds");
 | 
			
		||||
    cobra2sentry->add_option("--heartbeat_timeout", heartBeatTimeout, "Heartbeat timeout");
 | 
			
		||||
    addTLSOptions(cobra2sentry);
 | 
			
		||||
    addCobraConfig(cobra2sentry);
 | 
			
		||||
 | 
			
		||||
@@ -522,8 +526,15 @@ int main(int argc, char** argv)
 | 
			
		||||
    else if (app.got_subcommand("cobra_subscribe"))
 | 
			
		||||
    {
 | 
			
		||||
        bool enableHeartbeat = true;
 | 
			
		||||
        int64_t sentCount = ix::cobra_to_stdout_bot(
 | 
			
		||||
            cobraConfig, channel, filter, position, fluentd, quiet, enableHeartbeat, runtime);
 | 
			
		||||
        int64_t sentCount = ix::cobra_to_stdout_bot(cobraConfig,
 | 
			
		||||
                                                    channel,
 | 
			
		||||
                                                    filter,
 | 
			
		||||
                                                    position,
 | 
			
		||||
                                                    fluentd,
 | 
			
		||||
                                                    quiet,
 | 
			
		||||
                                                    enableHeartbeat,
 | 
			
		||||
                                                    heartBeatTimeout,
 | 
			
		||||
                                                    runtime);
 | 
			
		||||
        ret = (int) sentCount;
 | 
			
		||||
    }
 | 
			
		||||
    else if (app.got_subcommand("cobra_publish"))
 | 
			
		||||
@@ -566,6 +577,7 @@ int main(int argc, char** argv)
 | 
			
		||||
                                                    timer,
 | 
			
		||||
                                                    verbose,
 | 
			
		||||
                                                    enableHeartbeat,
 | 
			
		||||
                                                    heartBeatTimeout,
 | 
			
		||||
                                                    runtime);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
@@ -583,6 +595,7 @@ int main(int argc, char** argv)
 | 
			
		||||
                                            sentryClient,
 | 
			
		||||
                                            verbose,
 | 
			
		||||
                                            enableHeartbeat,
 | 
			
		||||
                                            heartBeatTimeout,
 | 
			
		||||
                                            runtime);
 | 
			
		||||
    }
 | 
			
		||||
    else if (app.got_subcommand("cobra_metrics_to_redis"))
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user