(ws) add a --runtime option to ws cobra_subscribe to optionally limit how much time it will run

This commit is contained in:
Benjamin Sergeant 2020-04-13 19:03:53 -07:00
parent f1c106728b
commit c57cf413fb
5 changed files with 39 additions and 11 deletions

View File

@ -1,6 +1,10 @@
# Changelog # Changelog
All changes to this project will be documented in this file. All changes to this project will be documented in this file.
## [9.2.3] - 2020-04-13
(ws) add a --runtime option to ws cobra_subscribe to optionally limit how much time it will run
## [9.2.2] - 2020-04-04 ## [9.2.2] - 2020-04-04
(third_party deps) fix #177, update bundled spdlog to 1.6.0 (third_party deps) fix #177, update bundled spdlog to 1.6.0

View File

@ -6,4 +6,4 @@
#pragma once #pragma once
#define IX_WEBSOCKET_VERSION "9.2.2" #define IX_WEBSOCKET_VERSION "9.2.3"

View File

@ -115,6 +115,7 @@ int main(int argc, char** argv)
uint32_t maxWaitBetweenReconnectionRetries; uint32_t maxWaitBetweenReconnectionRetries;
size_t maxQueueSize = 100; size_t maxQueueSize = 100;
int pingIntervalSecs = 30; int pingIntervalSecs = 30;
int runtime = -1; // run indefinitely
auto addTLSOptions = [&tlsOptions, &verifyNone](CLI::App* app) { auto addTLSOptions = [&tlsOptions, &verifyNone](CLI::App* app) {
app->add_option( app->add_option(
@ -238,6 +239,7 @@ int main(int argc, char** argv)
cobraSubscribeApp->add_option("--position", position, "Stream position"); cobraSubscribeApp->add_option("--position", position, "Stream position");
cobraSubscribeApp->add_flag("-q", quiet, "Quiet / only display stats"); cobraSubscribeApp->add_flag("-q", quiet, "Quiet / only display stats");
cobraSubscribeApp->add_flag("--fluentd", fluentd, "Write fluentd prefix"); cobraSubscribeApp->add_flag("--fluentd", fluentd, "Write fluentd prefix");
cobraSubscribeApp->add_option("--runtime", runtime, "Runtime in seconds");
addTLSOptions(cobraSubscribeApp); addTLSOptions(cobraSubscribeApp);
addCobraConfig(cobraSubscribeApp); addCobraConfig(cobraSubscribeApp);
@ -276,6 +278,7 @@ int main(int argc, char** argv)
cobra2statsd->add_option("--queue_size", cobra2statsd->add_option("--queue_size",
maxQueueSize, maxQueueSize,
"Size of the queue to hold messages before they are sent to Sentry"); "Size of the queue to hold messages before they are sent to Sentry");
cobra2statsd->add_option("--runtime", runtime, "Runtime in seconds");
addTLSOptions(cobra2statsd); addTLSOptions(cobra2statsd);
addCobraConfig(cobra2statsd); addCobraConfig(cobra2statsd);
@ -290,6 +293,7 @@ int main(int argc, char** argv)
cobra2sentry->add_option("--pidfile", pidfile, "Pid file"); cobra2sentry->add_option("--pidfile", pidfile, "Pid file");
cobra2sentry->add_option("--filter", filter, "Stream SQL Filter"); cobra2sentry->add_option("--filter", filter, "Stream SQL Filter");
cobra2sentry->add_option("--position", position, "Stream position"); cobra2sentry->add_option("--position", position, "Stream position");
cobra2sentry->add_option("--runtime", runtime, "Runtime in seconds");
addTLSOptions(cobra2sentry); addTLSOptions(cobra2sentry);
addCobraConfig(cobra2sentry); addCobraConfig(cobra2sentry);
@ -445,7 +449,7 @@ int main(int argc, char** argv)
} }
else if (app.got_subcommand("cobra_subscribe")) else if (app.got_subcommand("cobra_subscribe"))
{ {
ret = ix::ws_cobra_subscribe_main(cobraConfig, channel, filter, position, quiet, fluentd); ret = ix::ws_cobra_subscribe_main(cobraConfig, channel, filter, position, quiet, fluentd, runtime);
} }
else if (app.got_subcommand("cobra_publish")) else if (app.got_subcommand("cobra_publish"))
{ {
@ -466,7 +470,6 @@ int main(int argc, char** argv)
else else
{ {
bool enableHeartbeat = true; bool enableHeartbeat = true;
int runtime = -1; // run indefinitely
ix::StatsdClient statsdClient(hostname, statsdPort, prefix); ix::StatsdClient statsdClient(hostname, statsdPort, prefix);
std::string errMsg; std::string errMsg;
@ -496,7 +499,6 @@ int main(int argc, char** argv)
else if (app.got_subcommand("cobra_to_sentry")) else if (app.got_subcommand("cobra_to_sentry"))
{ {
bool enableHeartbeat = true; bool enableHeartbeat = true;
int runtime = -1;
ix::SentryClient sentryClient(dsn); ix::SentryClient sentryClient(dsn);
sentryClient.setTLSOptions(tlsOptions); sentryClient.setTLSOptions(tlsOptions);

View File

@ -82,7 +82,8 @@ namespace ix
const std::string& filter, const std::string& filter,
const std::string& position, const std::string& position,
bool quiet, bool quiet,
bool fluentd); bool fluentd,
int runtime);
int ws_cobra_publish_main(const ix::CobraConfig& appkey, int ws_cobra_publish_main(const ix::CobraConfig& appkey,
const std::string& channel, const std::string& channel,

View File

@ -57,7 +57,8 @@ namespace ix
const std::string& filter, const std::string& filter,
const std::string& position, const std::string& position,
bool quiet, bool quiet,
bool fluentd) bool fluentd,
int runtime)
{ {
ix::CobraConnection conn; ix::CobraConnection conn;
conn.configure(config); conn.configure(config);
@ -65,11 +66,12 @@ namespace ix
std::atomic<int> msgPerSeconds(0); std::atomic<int> msgPerSeconds(0);
std::atomic<int> msgCount(0); std::atomic<int> msgCount(0);
std::atomic<bool> stop(false);
std::atomic<bool> fatalCobraError(false); std::atomic<bool> fatalCobraError(false);
auto jsonWriter = makeStreamWriter(); auto jsonWriter = makeStreamWriter();
auto timer = [&msgPerSeconds, &msgCount, &fatalCobraError] { auto timer = [&msgPerSeconds, &msgCount, &stop] {
while (!fatalCobraError) while (!stop)
{ {
spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds); spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds);
@ -168,11 +170,30 @@ namespace ix
} }
}); });
while (!fatalCobraError) // Run forever
if (runtime == -1)
{
while (true)
{ {
auto duration = std::chrono::seconds(1); auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
if (fatalCobraError) break;
} }
}
// Run for a duration, used by unittesting now
else
{
for (int i = 0 ; i < runtime; ++i)
{
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
if (fatalCobraError) break;
}
}
stop = true;
conn.disconnect(); conn.disconnect();
t.join(); t.join();