From 5cc21c87fb9b708d3e7a6de2f2a382263c7b5e27 Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Thu, 1 Aug 2019 15:22:24 -0700 Subject: [PATCH] new options for cobra commands - ws cobra_subscribe has a new -q (quiet) option - ws cobra_subscribe knows to and display msg stats (count and # of messages received per second) - ws cobra_subscribe, cobra_to_statsd and cobra_to_sentry commands have a new option, --filter to restrict the events they want to receive --- CHANGELOG.md | 6 +++- ws/ixcobra/IXCobraConnection.cpp | 8 ++++- ws/ixcobra/IXCobraConnection.h | 4 ++- ws/ws.cpp | 17 +++++----- ws/ws.h | 6 +++- ws/ws_cobra_subscribe.cpp | 53 +++++++++++++++++++++++++------- ws/ws_cobra_to_sentry.cpp | 5 +-- ws/ws_cobra_to_statsd.cpp | 5 +-- 8 files changed, 77 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 67ed72eb..c208a09b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,12 @@ # Changelog All notable changes to this project will be documented in this file. +## [5.0.2] - 2019-08-01 +- ws cobra_subscribe has a new -q (quiet) option +- ws cobra_subscribe knows to and display msg stats (count and # of messages received per second) +- ws cobra_subscribe, cobra_to_statsd and cobra_to_sentry commands have a new option, --filter to restrict the events they want to receive + ## [5.0.1] - 2019-07-25 -### Unreleased - ws connect command has a new option to send in binary mode (still default to text) - ws connect command has readline history thanks to libnoise-cpp. Now ws connect one can use using arrows to lookup previous sent messages and edit them diff --git a/ws/ixcobra/IXCobraConnection.cpp b/ws/ixcobra/IXCobraConnection.cpp index 4cb77bd5..4cfea942 100644 --- a/ws/ixcobra/IXCobraConnection.cpp +++ b/ws/ixcobra/IXCobraConnection.cpp @@ -429,12 +429,18 @@ namespace ix } void CobraConnection::subscribe(const std::string& channel, - SubscriptionCallback cb) + const std::string& filter, + SubscriptionCallback cb) { // Create and send a subscribe pdu Json::Value body; body["channel"] = channel; + if (!filter.empty()) + { + body["filter"] = filter; + } + Json::Value pdu; pdu["action"] = "rtm/subscribe"; pdu["body"] = body; diff --git a/ws/ixcobra/IXCobraConnection.h b/ws/ixcobra/IXCobraConnection.h index f1f0f24a..d0021660 100644 --- a/ws/ixcobra/IXCobraConnection.h +++ b/ws/ixcobra/IXCobraConnection.h @@ -75,7 +75,9 @@ namespace ix // Subscribe to a channel, and execute a callback when an incoming // message arrives. - void subscribe(const std::string& channel, SubscriptionCallback cb); + void subscribe(const std::string& channel, + const std::string& filter = std::string(), + SubscriptionCallback cb = nullptr); /// Unsubscribe from a channel void unsubscribe(const std::string& channel); diff --git a/ws/ws.cpp b/ws/ws.cpp index 1e49013b..4e34e642 100644 --- a/ws/ws.cpp +++ b/ws/ws.cpp @@ -9,15 +9,10 @@ // #include "ws.h" -// -// Main drive for websocket utilities -// - #include #include #include #include -// #include #include #include @@ -60,6 +55,7 @@ int main(int argc, char** argv) std::string hostname("127.0.0.1"); std::string pidfile; std::string channel; + std::string filter; std::string message; std::string password; std::string appkey; @@ -76,6 +72,7 @@ int main(int argc, char** argv) bool followRedirects = false; bool verbose = false; bool save = false; + bool quiet = false; bool compress = false; bool strict = false; bool stress = false; @@ -170,6 +167,8 @@ int main(int argc, char** argv) cobraSubscribeApp->add_option("--rolesecret", rolesecret, "Role secret"); cobraSubscribeApp->add_option("channel", channel, "Channel")->required(); cobraSubscribeApp->add_option("--pidfile", pidfile, "Pid file"); + cobraSubscribeApp->add_option("--filter", filter, "Stream SQL Filter"); + cobraSubscribeApp->add_flag("-q", quiet, "Quiet / only display stats"); CLI::App* cobraPublish = app.add_subcommand("cobra_publish", "Cobra publisher"); cobraPublish->add_option("--appkey", appkey, "Appkey"); @@ -194,6 +193,7 @@ int main(int argc, char** argv) cobra2statsd->add_option("channel", channel, "Channel")->required(); cobra2statsd->add_flag("-v", verbose, "Verbose"); cobra2statsd->add_option("--pidfile", pidfile, "Pid file"); + cobra2statsd->add_option("--filter", filter, "Stream SQL Filter"); CLI::App* cobra2sentry = app.add_subcommand("cobra_to_sentry", "Cobra to sentry"); cobra2sentry->add_option("--appkey", appkey, "Appkey"); @@ -206,6 +206,7 @@ int main(int argc, char** argv) cobra2sentry->add_flag("-v", verbose, "Verbose"); cobra2sentry->add_flag("-s", strict, "Strict mode. Error out when sending to sentry fails"); cobra2sentry->add_option("--pidfile", pidfile, "Pid file"); + cobra2sentry->add_option("--filter", filter, "Stream SQL Filter"); CLI::App* runApp = app.add_subcommand("snake", "Snake server"); runApp->add_option("--port", port, "Connection url"); @@ -290,7 +291,7 @@ int main(int argc, char** argv) { ret = ix::ws_cobra_subscribe_main(appkey, endpoint, rolename, rolesecret, - channel); + channel, filter, quiet); } else if (app.got_subcommand("cobra_publish")) { @@ -302,14 +303,14 @@ int main(int argc, char** argv) { ret = ix::ws_cobra_to_statsd_main(appkey, endpoint, rolename, rolesecret, - channel, hostname, statsdPort, + channel, filter, hostname, statsdPort, prefix, fields, verbose); } else if (app.got_subcommand("cobra_to_sentry")) { ret = ix::ws_cobra_to_sentry_main(appkey, endpoint, rolename, rolesecret, - channel, dsn, + channel, filter, dsn, verbose, strict, jobs); } else if (app.got_subcommand("snake")) diff --git a/ws/ws.h b/ws/ws.h index 1cfe9ff1..3f8776f0 100644 --- a/ws/ws.h +++ b/ws/ws.h @@ -56,7 +56,9 @@ namespace ix const std::string& endpoint, const std::string& rolename, const std::string& rolesecret, - const std::string& channel); + const std::string& channel, + const std::string& filter, + bool quiet); int ws_cobra_publish_main(const std::string& appkey, const std::string& endpoint, @@ -71,6 +73,7 @@ namespace ix const std::string& rolename, const std::string& rolesecret, const std::string& channel, + const std::string& filter, const std::string& host, int port, const std::string& prefix, @@ -82,6 +85,7 @@ namespace ix const std::string& rolename, const std::string& rolesecret, const std::string& channel, + const std::string& filter, const std::string& dsn, bool verbose, bool strict, diff --git a/ws/ws_cobra_subscribe.cpp b/ws/ws_cobra_subscribe.cpp index 80f0a6d3..dbc275ea 100644 --- a/ws/ws_cobra_subscribe.cpp +++ b/ws/ws_cobra_subscribe.cpp @@ -11,13 +11,17 @@ #include #include +#include + namespace ix { int ws_cobra_subscribe_main(const std::string& appkey, const std::string& endpoint, const std::string& rolename, const std::string& rolesecret, - const std::string& channel) + const std::string& channel, + const std::string& filter, + bool quiet) { ix::CobraConnection conn; @@ -28,8 +32,28 @@ namespace ix Json::FastWriter jsonWriter; + // Display incoming messages + std::atomic msgPerSeconds(0); + std::atomic msgCount(0); + + auto timer = [&msgPerSeconds, &msgCount] + { + while (true) + { + std::cout << "#messages " << msgCount << " " + << "msg/s " << msgPerSeconds + << std::endl; + + msgPerSeconds = 0; + auto duration = std::chrono::seconds(1); + std::this_thread::sleep_for(duration); + } + }; + + std::thread t(timer); + conn.setEventCallback( - [&conn, &channel, &jsonWriter] + [&conn, &channel, &jsonWriter, &filter, &msgCount, &msgPerSeconds, &quiet] (ix::CobraConnectionEventType eventType, const std::string& errMsg, const ix::WebSocketHttpHeaders& headers, @@ -37,33 +61,40 @@ namespace ix { if (eventType == ix::CobraConnection_EventType_Open) { - std::cout << "Subscriber: connected" << std::endl; + spdlog::info("Subscriber connected"); for (auto it : headers) { - std::cerr << it.first << ": " << it.second << std::endl; + spdlog::info("{}: {}", it.first, it.second); } } else if (eventType == ix::CobraConnection_EventType_Authenticated) { - std::cout << "Subscriber authenticated" << std::endl; - conn.subscribe(channel, - [&jsonWriter](const Json::Value& msg) + spdlog::info("Subscriber authenticated"); + conn.subscribe(channel, filter, + [&jsonWriter, &quiet, + &msgPerSeconds, &msgCount](const Json::Value& msg) { - std::cout << jsonWriter.write(msg) << std::endl; + if (!quiet) + { + std::cout << jsonWriter.write(msg) << std::endl; + } + + msgPerSeconds++; + msgCount++; }); } else if (eventType == ix::CobraConnection_EventType_Subscribed) { - std::cout << "Subscriber: subscribed to channel " << subscriptionId << std::endl; + spdlog::info("Subscriber: subscribed to channel {}", subscriptionId); } else if (eventType == ix::CobraConnection_EventType_UnSubscribed) { - std::cout << "Subscriber: unsubscribed from channel " << subscriptionId << std::endl; + spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId); } else if (eventType == ix::CobraConnection_EventType_Error) { - std::cout << "Subscriber: error" << errMsg << std::endl; + spdlog::error("Subscriber: error {}", errMsg); } } ); diff --git a/ws/ws_cobra_to_sentry.cpp b/ws/ws_cobra_to_sentry.cpp index b35f8c0f..46123bb3 100644 --- a/ws/ws_cobra_to_sentry.cpp +++ b/ws/ws_cobra_to_sentry.cpp @@ -25,6 +25,7 @@ namespace ix const std::string& rolename, const std::string& rolesecret, const std::string& channel, + const std::string& filter, const std::string& dsn, bool verbose, bool strict, @@ -94,7 +95,7 @@ namespace ix } conn.setEventCallback( - [&conn, &channel, &jsonWriter, + [&conn, &channel, &filter, &jsonWriter, verbose, &receivedCount, &sentCount, &condition, &conditionVariableMutex, &progressCondition, &queue] @@ -119,7 +120,7 @@ namespace ix else if (eventType == ix::CobraConnection_EventType_Authenticated) { std::cerr << "Subscriber authenticated" << std::endl; - conn.subscribe(channel, + conn.subscribe(channel, filter, [&jsonWriter, verbose, &sentCount, &receivedCount, &condition, &conditionVariableMutex, diff --git a/ws/ws_cobra_to_statsd.cpp b/ws/ws_cobra_to_statsd.cpp index 7d16efa7..ed0c0af1 100644 --- a/ws/ws_cobra_to_statsd.cpp +++ b/ws/ws_cobra_to_statsd.cpp @@ -63,6 +63,7 @@ namespace ix const std::string& rolename, const std::string& rolesecret, const std::string& channel, + const std::string& filter, const std::string& host, int port, const std::string& prefix, @@ -90,7 +91,7 @@ namespace ix uint64_t msgCount = 0; conn.setEventCallback( - [&conn, &channel, &jsonWriter, &statsdClient, verbose, &tokens, &prefix, &msgCount] + [&conn, &channel, &filter, &jsonWriter, &statsdClient, verbose, &tokens, &prefix, &msgCount] (ix::CobraConnectionEventType eventType, const std::string& errMsg, const ix::WebSocketHttpHeaders& headers, @@ -112,7 +113,7 @@ namespace ix else if (eventType == ix::CobraConnection_EventType_Authenticated) { spdlog::info("Subscriber authenticated"); - conn.subscribe(channel, + conn.subscribe(channel, filter, [&jsonWriter, &statsdClient, verbose, &tokens, &prefix, &msgCount] (const Json::Value& msg)