(cobra bots) add a utility class to factor out the common bots features (heartbeat) and move all bots to used it + convert cobra_subscribe to be a bot and add a unittest for it

This commit is contained in:
Benjamin Sergeant
2020-04-16 21:58:10 -07:00
parent 0f5d15aa11
commit a2abe861d3
20 changed files with 478 additions and 558 deletions

View File

@ -13,6 +13,7 @@
#include <fstream>
#include <ixbots/IXCobraToSentryBot.h>
#include <ixbots/IXCobraToStatsdBot.h>
#include <ixbots/IXCobraToStdoutBot.h>
#include <ixcore/utils/IXCoreLogger.h>
#include <ixsentry/IXSentryClient.h>
#include <ixwebsocket/IXNetSystem.h>
@ -93,7 +94,6 @@ int main(int argc, char** argv)
bool quiet = false;
bool fluentd = false;
bool compress = false;
bool strict = false;
bool stress = false;
bool disableAutomaticReconnection = false;
bool disablePerMessageDeflate = false;
@ -291,7 +291,6 @@ int main(int argc, char** argv)
"Size of the queue to hold messages before they are sent to Sentry");
cobra2sentry->add_option("channel", channel, "Channel")->required();
cobra2sentry->add_flag("-v", verbose, "Verbose");
cobra2sentry->add_flag("-s", strict, "Strict mode. Error out when sending to sentry fails");
cobra2sentry->add_option("--pidfile", pidfile, "Pid file");
cobra2sentry->add_option("--filter", filter, "Stream SQL Filter");
cobra2sentry->add_option("--position", position, "Stream position");
@ -451,8 +450,18 @@ int main(int argc, char** argv)
}
else if (app.got_subcommand("cobra_subscribe"))
{
ret = ix::ws_cobra_subscribe_main(
cobraConfig, channel, filter, position, quiet, fluentd, runtime);
bool enableHeartbeat = true;
int64_t sentCount = ix::cobra_to_stdout_bot(cobraConfig,
channel,
filter,
position,
fluentd,
quiet,
verbose,
maxQueueSize,
enableHeartbeat,
runtime);
ret = (int) sentCount;
}
else if (app.got_subcommand("cobra_publish"))
{
@ -484,18 +493,18 @@ int main(int argc, char** argv)
}
else
{
ret = ix::cobra_to_statsd_bot(cobraConfig,
channel,
filter,
position,
statsdClient,
fields,
gauge,
timer,
verbose,
maxQueueSize,
enableHeartbeat,
runtime);
ret = (int) ix::cobra_to_statsd_bot(cobraConfig,
channel,
filter,
position,
statsdClient,
fields,
gauge,
timer,
verbose,
maxQueueSize,
enableHeartbeat,
runtime);
}
}
}
@ -505,16 +514,15 @@ int main(int argc, char** argv)
ix::SentryClient sentryClient(dsn);
sentryClient.setTLSOptions(tlsOptions);
ret = ix::cobra_to_sentry_bot(cobraConfig,
channel,
filter,
position,
sentryClient,
verbose,
strict,
maxQueueSize,
enableHeartbeat,
runtime);
ret = (int) ix::cobra_to_sentry_bot(cobraConfig,
channel,
filter,
position,
sentryClient,
verbose,
maxQueueSize,
enableHeartbeat,
runtime);
}
else if (app.got_subcommand("cobra_metrics_to_redis"))
{