diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 317eab6a..c03216ef 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,6 +1,10 @@ # Changelog All changes to this project will be documented in this file. +## [9.2.3] - 2020-04-13 + +(ws) add a --runtime option to ws cobra_subscribe to optionally limit how much time it will run + ## [9.2.2] - 2020-04-04 (third_party deps) fix #177, update bundled spdlog to 1.6.0 diff --git a/ixwebsocket/IXWebSocketVersion.h b/ixwebsocket/IXWebSocketVersion.h index e49ce7c2..efb228bb 100644 --- a/ixwebsocket/IXWebSocketVersion.h +++ b/ixwebsocket/IXWebSocketVersion.h @@ -6,4 +6,4 @@ #pragma once -#define IX_WEBSOCKET_VERSION "9.2.2" +#define IX_WEBSOCKET_VERSION "9.2.3" diff --git a/ws/ws.cpp b/ws/ws.cpp index 86f259ea..f728913e 100644 --- a/ws/ws.cpp +++ b/ws/ws.cpp @@ -115,6 +115,7 @@ int main(int argc, char** argv) uint32_t maxWaitBetweenReconnectionRetries; size_t maxQueueSize = 100; int pingIntervalSecs = 30; + int runtime = -1; // run indefinitely auto addTLSOptions = [&tlsOptions, &verifyNone](CLI::App* app) { app->add_option( @@ -238,6 +239,7 @@ int main(int argc, char** argv) cobraSubscribeApp->add_option("--position", position, "Stream position"); cobraSubscribeApp->add_flag("-q", quiet, "Quiet / only display stats"); cobraSubscribeApp->add_flag("--fluentd", fluentd, "Write fluentd prefix"); + cobraSubscribeApp->add_option("--runtime", runtime, "Runtime in seconds"); addTLSOptions(cobraSubscribeApp); addCobraConfig(cobraSubscribeApp); @@ -276,6 +278,7 @@ int main(int argc, char** argv) cobra2statsd->add_option("--queue_size", maxQueueSize, "Size of the queue to hold messages before they are sent to Sentry"); + cobra2statsd->add_option("--runtime", runtime, "Runtime in seconds"); addTLSOptions(cobra2statsd); addCobraConfig(cobra2statsd); @@ -290,6 +293,7 @@ int main(int argc, char** argv) cobra2sentry->add_option("--pidfile", pidfile, "Pid file"); cobra2sentry->add_option("--filter", filter, "Stream SQL Filter"); cobra2sentry->add_option("--position", position, "Stream position"); + cobra2sentry->add_option("--runtime", runtime, "Runtime in seconds"); addTLSOptions(cobra2sentry); addCobraConfig(cobra2sentry); @@ -445,7 +449,7 @@ int main(int argc, char** argv) } else if (app.got_subcommand("cobra_subscribe")) { - ret = ix::ws_cobra_subscribe_main(cobraConfig, channel, filter, position, quiet, fluentd); + ret = ix::ws_cobra_subscribe_main(cobraConfig, channel, filter, position, quiet, fluentd, runtime); } else if (app.got_subcommand("cobra_publish")) { @@ -466,7 +470,6 @@ int main(int argc, char** argv) else { bool enableHeartbeat = true; - int runtime = -1; // run indefinitely ix::StatsdClient statsdClient(hostname, statsdPort, prefix); std::string errMsg; @@ -496,7 +499,6 @@ int main(int argc, char** argv) else if (app.got_subcommand("cobra_to_sentry")) { bool enableHeartbeat = true; - int runtime = -1; ix::SentryClient sentryClient(dsn); sentryClient.setTLSOptions(tlsOptions); diff --git a/ws/ws.h b/ws/ws.h index 2c074ea7..b8039906 100644 --- a/ws/ws.h +++ b/ws/ws.h @@ -82,7 +82,8 @@ namespace ix const std::string& filter, const std::string& position, bool quiet, - bool fluentd); + bool fluentd, + int runtime); int ws_cobra_publish_main(const ix::CobraConfig& appkey, const std::string& channel, diff --git a/ws/ws_cobra_subscribe.cpp b/ws/ws_cobra_subscribe.cpp index 4291bb0c..aed7d67d 100644 --- a/ws/ws_cobra_subscribe.cpp +++ b/ws/ws_cobra_subscribe.cpp @@ -57,7 +57,8 @@ namespace ix const std::string& filter, const std::string& position, bool quiet, - bool fluentd) + bool fluentd, + int runtime) { ix::CobraConnection conn; conn.configure(config); @@ -65,11 +66,12 @@ namespace ix std::atomic msgPerSeconds(0); std::atomic msgCount(0); + std::atomic stop(false); std::atomic fatalCobraError(false); auto jsonWriter = makeStreamWriter(); - auto timer = [&msgPerSeconds, &msgCount, &fatalCobraError] { - while (!fatalCobraError) + auto timer = [&msgPerSeconds, &msgCount, &stop] { + while (!stop) { spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds); @@ -168,11 +170,30 @@ namespace ix } }); - while (!fatalCobraError) + // Run forever + if (runtime == -1) { - auto duration = std::chrono::seconds(1); - std::this_thread::sleep_for(duration); + while (true) + { + auto duration = std::chrono::seconds(1); + std::this_thread::sleep_for(duration); + + if (fatalCobraError) break; + } } + // Run for a duration, used by unittesting now + else + { + for (int i = 0 ; i < runtime; ++i) + { + auto duration = std::chrono::seconds(1); + std::this_thread::sleep_for(duration); + + if (fatalCobraError) break; + } + } + + stop = true; conn.disconnect(); t.join();