(cobra bots) add a --heartbeat_timeout option to specify when the bot should terminate because no events are received

This commit is contained in:
Benjamin Sergeant 2020-05-06 22:01:48 -07:00
parent c030a62c8b
commit 0772ef7ef5
15 changed files with 58 additions and 14 deletions

View File

@ -1,6 +1,10 @@
# Changelog # Changelog
All changes to this project will be documented in this file. All changes to this project will be documented in this file.
## [9.5.6] - 2020-05-06
(cobra bots) add a --heartbeat_timeout option to specify when the bot should terminate because no events are received
## [9.5.5] - 2020-05-06 ## [9.5.5] - 2020-05-06
(openssl tls) when OpenSSL is older than 1.1, register the crypto locking callback to be thread safe. Should fix lots of CI failures (openssl tls) when OpenSSL is older than 1.1, register the crypto locking callback to be thread safe. Should fix lots of CI failures

View File

@ -22,6 +22,7 @@ namespace ix
const std::string& filter, const std::string& filter,
const std::string& position, const std::string& position,
bool enableHeartbeat, bool enableHeartbeat,
int heartBeatTimeout,
int runtime) int runtime)
{ {
ix::CobraConnection conn; ix::CobraConnection conn;
@ -78,7 +79,7 @@ namespace ix
std::thread t1(timer); std::thread t1(timer);
auto heartbeat = [&sentCount, &receivedCount, &stop, &enableHeartbeat] { auto heartbeat = [&sentCount, &receivedCount, &stop, &enableHeartbeat, &heartBeatTimeout] {
std::string state("na"); std::string state("na");
if (!enableHeartbeat) return; if (!enableHeartbeat) return;
@ -94,11 +95,12 @@ namespace ix
if (currentState == state) if (currentState == state)
{ {
CoreLogger::error("no messages received or sent for 1 minute, exiting"); CoreLogger::error("no messages received or sent for 1 minute, exiting");
exit(1); fatalCobraError = true;
break;
} }
state = currentState; state = currentState;
auto duration = std::chrono::minutes(1); auto duration = std::chrono::seconds(heartBeatTimeout);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
} }

View File

@ -30,6 +30,7 @@ namespace ix
const std::string& filter, const std::string& filter,
const std::string& position, const std::string& position,
bool enableHeartbeat, bool enableHeartbeat,
int heartBeatTimeout,
int runtime); int runtime);
void setOnBotMessageCallback(const OnBotMessageCallback& callback); void setOnBotMessageCallback(const OnBotMessageCallback& callback);

View File

@ -23,6 +23,7 @@ namespace ix
SentryClient& sentryClient, SentryClient& sentryClient,
bool verbose, bool verbose,
bool enableHeartbeat, bool enableHeartbeat,
int heartBeatTimeout,
int runtime) int runtime)
{ {
CobraBot bot; CobraBot bot;
@ -81,6 +82,7 @@ namespace ix
filter, filter,
position, position,
enableHeartbeat, enableHeartbeat,
heartBeatTimeout,
runtime); runtime);
} }
} // namespace ix } // namespace ix

View File

@ -19,5 +19,6 @@ namespace ix
SentryClient& sentryClient, SentryClient& sentryClient,
bool verbose, bool verbose,
bool enableHeartbeat, bool enableHeartbeat,
int heartBeatTimeout,
int runtime); int runtime);
} // namespace ix } // namespace ix

View File

@ -63,6 +63,7 @@ namespace ix
const std::string& timer, const std::string& timer,
bool verbose, bool verbose,
bool enableHeartbeat, bool enableHeartbeat,
int heartBeatTimeout,
int runtime) int runtime)
{ {
ix::CobraConnection conn; ix::CobraConnection conn;
@ -146,6 +147,7 @@ namespace ix
filter, filter,
position, position,
enableHeartbeat, enableHeartbeat,
heartBeatTimeout,
runtime); runtime);
} }
} // namespace ix } // namespace ix

View File

@ -23,5 +23,6 @@ namespace ix
const std::string& timer, const std::string& timer,
bool verbose, bool verbose,
bool enableHeartbeat, bool enableHeartbeat,
int heartBeatTimeout,
int runtime); int runtime);
} // namespace ix } // namespace ix

View File

@ -70,6 +70,7 @@ namespace ix
bool fluentd, bool fluentd,
bool quiet, bool quiet,
bool enableHeartbeat, bool enableHeartbeat,
int heartBeatTimeout,
int runtime) int runtime)
{ {
CobraBot bot; CobraBot bot;
@ -93,6 +94,7 @@ namespace ix
filter, filter,
position, position,
enableHeartbeat, enableHeartbeat,
heartBeatTimeout,
runtime); runtime);
} }
} // namespace ix } // namespace ix

View File

@ -19,5 +19,6 @@ namespace ix
bool fluentd, bool fluentd,
bool quiet, bool quiet,
bool enableHeartbeat, bool enableHeartbeat,
int heartBeatTimeout,
int runtime); int runtime);
} // namespace ix } // namespace ix

View File

@ -50,10 +50,7 @@ namespace ix
bool openSSLServerHandshake(std::string& errMsg); bool openSSLServerHandshake(std::string& errMsg);
// Required for OpenSSL < 1.1 // Required for OpenSSL < 1.1
static void openSSLLockingCallback(int mode, static void openSSLLockingCallback(int mode, int type, const char* /*file*/, int /*line*/);
int type,
const char* /*file*/,
int /*line*/);
SSL* _ssl_connection; SSL* _ssl_connection;
SSL_CTX* _ssl_context; SSL_CTX* _ssl_context;

View File

@ -6,4 +6,4 @@
#pragma once #pragma once
#define IX_WEBSOCKET_VERSION "9.5.5" #define IX_WEBSOCKET_VERSION "9.5.6"

View File

@ -142,6 +142,7 @@ TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]")
std::string position("$"); std::string position("$");
bool verbose = true; bool verbose = true;
bool enableHeartbeat = false; bool enableHeartbeat = false;
int heartBeatTimeout = 60;
// FIXME: try to get this working with https instead of http // FIXME: try to get this working with https instead of http
// to regress the TLS 1.3 OpenSSL bug // to regress the TLS 1.3 OpenSSL bug
@ -159,8 +160,15 @@ TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]")
// Only run the bot for 3 seconds // Only run the bot for 3 seconds
int runtime = 3; int runtime = 3;
int64_t sentCount = cobra_to_sentry_bot( int64_t sentCount = cobra_to_sentry_bot(config,
config, channel, filter, position, sentryClient, verbose, enableHeartbeat, runtime); channel,
filter,
position,
sentryClient,
verbose,
enableHeartbeat,
heartBeatTimeout,
runtime);
// //
// We want at least 2 messages to be sent // We want at least 2 messages to be sent
// //

View File

@ -91,6 +91,7 @@ TEST_CASE("Cobra_to_statsd_bot", "[cobra_bots]")
std::string position("$"); std::string position("$");
bool verbose = true; bool verbose = true;
bool enableHeartbeat = false; bool enableHeartbeat = false;
int heartBeatTimeout = 60;
// Only run the bot for 3 seconds // Only run the bot for 3 seconds
int runtime = 3; int runtime = 3;
@ -123,6 +124,7 @@ TEST_CASE("Cobra_to_statsd_bot", "[cobra_bots]")
timer, timer,
verbose, verbose,
enableHeartbeat, enableHeartbeat,
heartBeatTimeout,
runtime); runtime);
// //
// We want at least 2 messages to be sent // We want at least 2 messages to be sent

View File

@ -89,6 +89,7 @@ TEST_CASE("Cobra_to_stdout_bot", "[cobra_bots]")
std::string position("$"); std::string position("$");
bool quiet = false; bool quiet = false;
bool enableHeartbeat = false; bool enableHeartbeat = false;
int heartBeatTimeout = 60;
// Only run the bot for 3 seconds // Only run the bot for 3 seconds
int runtime = 3; int runtime = 3;
@ -96,8 +97,15 @@ TEST_CASE("Cobra_to_stdout_bot", "[cobra_bots]")
// We could try to capture the output ... not sure how. // We could try to capture the output ... not sure how.
bool fluentd = true; bool fluentd = true;
int64_t sentCount = ix::cobra_to_stdout_bot( int64_t sentCount = ix::cobra_to_stdout_bot(config,
config, channel, filter, position, fluentd, quiet, enableHeartbeat, runtime); channel,
filter,
position,
fluentd,
quiet,
enableHeartbeat,
heartBeatTimeout,
runtime);
// //
// We want at least 2 messages to be sent // We want at least 2 messages to be sent
// //

View File

@ -150,6 +150,7 @@ int main(int argc, char** argv)
uint32_t maxWaitBetweenReconnectionRetries; uint32_t maxWaitBetweenReconnectionRetries;
int pingIntervalSecs = 30; int pingIntervalSecs = 30;
int runtime = -1; // run indefinitely int runtime = -1; // run indefinitely
int heartBeatTimeout = 60;
auto addTLSOptions = [&tlsOptions, &verifyNone](CLI::App* app) { auto addTLSOptions = [&tlsOptions, &verifyNone](CLI::App* app) {
app->add_option( app->add_option(
@ -287,6 +288,7 @@ int main(int argc, char** argv)
cobraSubscribeApp->add_flag("-q", quiet, "Quiet / only display stats"); cobraSubscribeApp->add_flag("-q", quiet, "Quiet / only display stats");
cobraSubscribeApp->add_flag("--fluentd", fluentd, "Write fluentd prefix"); cobraSubscribeApp->add_flag("--fluentd", fluentd, "Write fluentd prefix");
cobraSubscribeApp->add_option("--runtime", runtime, "Runtime in seconds"); cobraSubscribeApp->add_option("--runtime", runtime, "Runtime in seconds");
cobraSubscribeApp->add_option("--heartbeat_timeout", heartBeatTimeout, "Heartbeat timeout");
addTLSOptions(cobraSubscribeApp); addTLSOptions(cobraSubscribeApp);
addCobraConfig(cobraSubscribeApp); addCobraConfig(cobraSubscribeApp);
@ -328,6 +330,7 @@ int main(int argc, char** argv)
cobra2statsd->add_option("--filter", filter, "Stream SQL Filter"); cobra2statsd->add_option("--filter", filter, "Stream SQL Filter");
cobra2statsd->add_option("--position", position, "Stream position"); cobra2statsd->add_option("--position", position, "Stream position");
cobra2statsd->add_option("--runtime", runtime, "Runtime in seconds"); cobra2statsd->add_option("--runtime", runtime, "Runtime in seconds");
cobra2statsd->add_option("--heartbeat_timeout", heartBeatTimeout, "Heartbeat timeout");
addTLSOptions(cobra2statsd); addTLSOptions(cobra2statsd);
addCobraConfig(cobra2statsd); addCobraConfig(cobra2statsd);
@ -340,6 +343,7 @@ int main(int argc, char** argv)
cobra2sentry->add_option("--filter", filter, "Stream SQL Filter"); cobra2sentry->add_option("--filter", filter, "Stream SQL Filter");
cobra2sentry->add_option("--position", position, "Stream position"); cobra2sentry->add_option("--position", position, "Stream position");
cobra2sentry->add_option("--runtime", runtime, "Runtime in seconds"); cobra2sentry->add_option("--runtime", runtime, "Runtime in seconds");
cobra2sentry->add_option("--heartbeat_timeout", heartBeatTimeout, "Heartbeat timeout");
addTLSOptions(cobra2sentry); addTLSOptions(cobra2sentry);
addCobraConfig(cobra2sentry); addCobraConfig(cobra2sentry);
@ -522,8 +526,15 @@ int main(int argc, char** argv)
else if (app.got_subcommand("cobra_subscribe")) else if (app.got_subcommand("cobra_subscribe"))
{ {
bool enableHeartbeat = true; bool enableHeartbeat = true;
int64_t sentCount = ix::cobra_to_stdout_bot( int64_t sentCount = ix::cobra_to_stdout_bot(cobraConfig,
cobraConfig, channel, filter, position, fluentd, quiet, enableHeartbeat, runtime); channel,
filter,
position,
fluentd,
quiet,
enableHeartbeat,
heartBeatTimeout,
runtime);
ret = (int) sentCount; ret = (int) sentCount;
} }
else if (app.got_subcommand("cobra_publish")) else if (app.got_subcommand("cobra_publish"))
@ -566,6 +577,7 @@ int main(int argc, char** argv)
timer, timer,
verbose, verbose,
enableHeartbeat, enableHeartbeat,
heartBeatTimeout,
runtime); runtime);
} }
} }
@ -583,6 +595,7 @@ int main(int argc, char** argv)
sentryClient, sentryClient,
verbose, verbose,
enableHeartbeat, enableHeartbeat,
heartBeatTimeout,
runtime); runtime);
} }
else if (app.got_subcommand("cobra_metrics_to_redis")) else if (app.got_subcommand("cobra_metrics_to_redis"))