diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 2628f1fe..1798ec3e 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,6 +1,10 @@ # Changelog All changes to this project will be documented in this file. +## [8.2.4] - 2020-03-13 + +(cobra client) can subscribe with a position + ## [8.2.3] - 2020-03-13 (cobra client) pass the message position to the subscription data callback diff --git a/ixbots/ixbots/IXCobraToSentryBot.cpp b/ixbots/ixbots/IXCobraToSentryBot.cpp index bca4b6de..7cfd5a42 100644 --- a/ixbots/ixbots/IXCobraToSentryBot.cpp +++ b/ixbots/ixbots/IXCobraToSentryBot.cpp @@ -19,6 +19,7 @@ namespace ix int cobra_to_sentry_bot(const CobraConfig& config, const std::string& channel, const std::string& filter, + const std::string& position, SentryClient& sentryClient, bool verbose, bool strict, @@ -37,7 +38,7 @@ namespace ix std::atomic stop(false); std::atomic throttled(false); - QueueManager queueManager(maxQueueSize, stop); + QueueManager queueManager(maxQueueSize); auto timer = [&sentCount, &receivedCount, &stop] { while (!stop) @@ -173,6 +174,7 @@ namespace ix conn.setEventCallback([&conn, &channel, &filter, + &position, &jsonWriter, verbose, &throttled, @@ -200,6 +202,7 @@ namespace ix spdlog::info("Subscriber authenticated"); conn.subscribe(channel, filter, + position, [&jsonWriter, verbose, &throttled, &receivedCount, &queueManager]( const Json::Value& msg, const std::string& position) { if (verbose) diff --git a/ixbots/ixbots/IXCobraToSentryBot.h b/ixbots/ixbots/IXCobraToSentryBot.h index 6c4b128e..c1792847 100644 --- a/ixbots/ixbots/IXCobraToSentryBot.h +++ b/ixbots/ixbots/IXCobraToSentryBot.h @@ -14,6 +14,7 @@ namespace ix int cobra_to_sentry_bot(const CobraConfig& config, const std::string& channel, const std::string& filter, + const std::string& position, SentryClient& sentryClient, bool verbose, bool strict, diff --git a/ixbots/ixbots/IXCobraToStatsdBot.cpp b/ixbots/ixbots/IXCobraToStatsdBot.cpp index fa9cb5ad..cf5dfb80 100644 --- a/ixbots/ixbots/IXCobraToStatsdBot.cpp +++ b/ixbots/ixbots/IXCobraToStatsdBot.cpp @@ -62,6 +62,7 @@ namespace ix int cobra_to_statsd_bot(const ix::CobraConfig& config, const std::string& channel, const std::string& filter, + const std::string& position, const std::string& host, int port, const std::string& prefix, @@ -80,7 +81,7 @@ namespace ix std::atomic stop(false); size_t maxQueueSize = 1000; - QueueManager queueManager(maxQueueSize, stop); + QueueManager queueManager(maxQueueSize); auto timer = [&sentCount, &receivedCount] { while (true) @@ -153,7 +154,7 @@ namespace ix std::thread t3(statsdSender); conn.setEventCallback( - [&conn, &channel, &filter, &jsonWriter, verbose, &queueManager, &receivedCount]( + [&conn, &channel, &filter, &position, &jsonWriter, verbose, &queueManager, &receivedCount]( ix::CobraConnectionEventType eventType, const std::string& errMsg, const ix::WebSocketHttpHeaders& headers, @@ -177,6 +178,7 @@ namespace ix spdlog::info("Subscriber authenticated"); conn.subscribe(channel, filter, + position, [&jsonWriter, &queueManager, verbose, &receivedCount]( const Json::Value& msg, const std::string& position) { if (verbose) diff --git a/ixbots/ixbots/IXCobraToStatsdBot.h b/ixbots/ixbots/IXCobraToStatsdBot.h index 5f424388..691ae3b9 100644 --- a/ixbots/ixbots/IXCobraToStatsdBot.h +++ b/ixbots/ixbots/IXCobraToStatsdBot.h @@ -14,6 +14,7 @@ namespace ix int cobra_to_statsd_bot(const ix::CobraConfig& config, const std::string& channel, const std::string& filter, + const std::string& position, const std::string& host, int port, const std::string& prefix, diff --git a/ixbots/ixbots/IXQueueManager.h b/ixbots/ixbots/IXQueueManager.h index 0a3cbdd1..6685edad 100644 --- a/ixbots/ixbots/IXQueueManager.h +++ b/ixbots/ixbots/IXQueueManager.h @@ -7,7 +7,6 @@ #pragma once #include -#include #include #include #include @@ -19,9 +18,8 @@ namespace ix class QueueManager { public: - QueueManager(size_t maxQueueSize, std::atomic& stop) + QueueManager(size_t maxQueueSize) : _maxQueueSize(maxQueueSize) - , _stop(stop) { } @@ -33,6 +31,5 @@ namespace ix std::mutex _mutex; std::condition_variable _condition; size_t _maxQueueSize; - std::atomic& _stop; }; } diff --git a/ixcobra/ixcobra/IXCobraConnection.cpp b/ixcobra/ixcobra/IXCobraConnection.cpp index f48527ab..7b2c7c71 100644 --- a/ixcobra/ixcobra/IXCobraConnection.cpp +++ b/ixcobra/ixcobra/IXCobraConnection.cpp @@ -565,6 +565,7 @@ namespace ix void CobraConnection::subscribe(const std::string& channel, const std::string& filter, + const std::string& position, SubscriptionCallback cb) { // Create and send a subscribe pdu @@ -576,6 +577,11 @@ namespace ix body["filter"] = filter; } + if (!position.empty()) + { + body["position"] = position; + } + Json::Value pdu; pdu["action"] = "rtm/subscribe"; pdu["body"] = body; diff --git a/ixcobra/ixcobra/IXCobraConnection.h b/ixcobra/ixcobra/IXCobraConnection.h index 67fd2086..b69f8e07 100644 --- a/ixcobra/ixcobra/IXCobraConnection.h +++ b/ixcobra/ixcobra/IXCobraConnection.h @@ -98,6 +98,7 @@ namespace ix // message arrives. void subscribe(const std::string& channel, const std::string& filter = std::string(), + const std::string& position = std::string(), SubscriptionCallback cb = nullptr); /// Unsubscribe from a channel diff --git a/ixwebsocket/IXWebSocketVersion.h b/ixwebsocket/IXWebSocketVersion.h index 585863f1..c0390132 100644 --- a/ixwebsocket/IXWebSocketVersion.h +++ b/ixwebsocket/IXWebSocketVersion.h @@ -6,4 +6,4 @@ #pragma once -#define IX_WEBSOCKET_VERSION "8.2.3" +#define IX_WEBSOCKET_VERSION "8.2.4" diff --git a/test/IXCobraChatTest.cpp b/test/IXCobraChatTest.cpp index e2c1c1f2..27080585 100644 --- a/test/IXCobraChatTest.cpp +++ b/test/IXCobraChatTest.cpp @@ -122,8 +122,10 @@ namespace void CobraChat::subscribe(const std::string& channel) { std::string filter; + std::string position("$"); + _conn.subscribe( - channel, filter, [this](const Json::Value& msg, const std::string& /*position*/) { + channel, filter, position, [this](const Json::Value& msg, const std::string& /*position*/) { spdlog::info("receive {}", msg.toStyledString()); if (!msg.isObject()) return; diff --git a/test/IXCobraMetricsPublisherTest.cpp b/test/IXCobraMetricsPublisherTest.cpp index c08c40de..939fc073 100644 --- a/test/IXCobraMetricsPublisherTest.cpp +++ b/test/IXCobraMetricsPublisherTest.cpp @@ -93,8 +93,10 @@ namespace { log("Subscriber authenticated"); std::string filter; + std::string position("$"); + conn.subscribe( - CHANNEL, filter, [](const Json::Value& msg, const std::string& /*position*/) { + CHANNEL, filter, position, [](const Json::Value& msg, const std::string& /*position*/) { log(msg.toStyledString()); std::string id = msg["id"].asString(); diff --git a/test/IXCobraToSentryBotTest.cpp b/test/IXCobraToSentryBotTest.cpp index a5b9d61c..1a2fd718 100644 --- a/test/IXCobraToSentryBotTest.cpp +++ b/test/IXCobraToSentryBotTest.cpp @@ -153,6 +153,7 @@ TEST_CASE("Cobra_to_sentry_bot", "[foo]") std::thread publisherThread(runPublisher, config, channel); std::string filter; + std::string position("$"); bool verbose = true; bool strict = true; size_t maxQueueSize = 10; @@ -182,6 +183,7 @@ TEST_CASE("Cobra_to_sentry_bot", "[foo]") int sentCount = cobra_to_sentry_bot(config, channel, filter, + position, sentryClient, verbose, strict, diff --git a/ws/ws.cpp b/ws/ws.cpp index 7f8d95db..a32d7dd9 100644 --- a/ws/ws.cpp +++ b/ws/ws.cpp @@ -65,6 +65,7 @@ int main(int argc, char** argv) std::string pidfile; std::string channel; std::string filter; + std::string position; std::string message; std::string password; std::string prefix("ws.test.v0"); @@ -229,6 +230,7 @@ int main(int argc, char** argv) cobraSubscribeApp->add_option("--channel", channel, "Channel")->required(); cobraSubscribeApp->add_option("--pidfile", pidfile, "Pid file"); cobraSubscribeApp->add_option("--filter", filter, "Stream SQL Filter"); + cobraSubscribeApp->add_option("--position", position, "Stream position"); cobraSubscribeApp->add_flag("-q", quiet, "Quiet / only display stats"); cobraSubscribeApp->add_flag("--fluentd", fluentd, "Write fluentd prefix"); addTLSOptions(cobraSubscribeApp); @@ -263,6 +265,7 @@ int main(int argc, char** argv) cobra2statsd->add_flag("-v", verbose, "Verbose"); cobra2statsd->add_option("--pidfile", pidfile, "Pid file"); cobra2statsd->add_option("--filter", filter, "Stream SQL Filter"); + cobra2statsd->add_option("--position", position, "Stream position"); addTLSOptions(cobra2statsd); addCobraConfig(cobra2statsd); @@ -276,6 +279,7 @@ int main(int argc, char** argv) cobra2sentry->add_flag("-s", strict, "Strict mode. Error out when sending to sentry fails"); cobra2sentry->add_option("--pidfile", pidfile, "Pid file"); cobra2sentry->add_option("--filter", filter, "Stream SQL Filter"); + cobra2sentry->add_option("--position", position, "Stream position"); addTLSOptions(cobra2sentry); addCobraConfig(cobra2sentry); @@ -284,6 +288,7 @@ int main(int argc, char** argv) cobra2redisApp->add_option("channel", channel, "Channel")->required(); cobra2redisApp->add_option("--pidfile", pidfile, "Pid file"); cobra2redisApp->add_option("--filter", filter, "Stream SQL Filter"); + cobra2redisApp->add_option("--position", position, "Stream position"); cobra2redisApp->add_option("--hostname", hostname, "Redis hostname"); cobra2redisApp->add_option("--port", redisPort, "Redis port"); cobra2redisApp->add_flag("-q", quiet, "Quiet / only display stats"); @@ -429,7 +434,7 @@ int main(int argc, char** argv) } else if (app.got_subcommand("cobra_subscribe")) { - ret = ix::ws_cobra_subscribe_main(cobraConfig, channel, filter, quiet, fluentd); + ret = ix::ws_cobra_subscribe_main(cobraConfig, channel, filter, position, quiet, fluentd); } else if (app.got_subcommand("cobra_publish")) { @@ -442,7 +447,7 @@ int main(int argc, char** argv) else if (app.got_subcommand("cobra_to_statsd")) { ret = ix::cobra_to_statsd_bot( - cobraConfig, channel, filter, hostname, statsdPort, prefix, fields, verbose); + cobraConfig, channel, filter, position, hostname, statsdPort, prefix, fields, verbose); } else if (app.got_subcommand("cobra_to_sentry")) { @@ -453,6 +458,7 @@ int main(int argc, char** argv) ret = ix::cobra_to_sentry_bot(cobraConfig, channel, filter, + position, sentryClient, verbose, strict, @@ -462,7 +468,7 @@ int main(int argc, char** argv) } else if (app.got_subcommand("cobra_metrics_to_redis")) { - ret = ix::ws_cobra_metrics_to_redis(cobraConfig, channel, filter, hostname, redisPort); + ret = ix::ws_cobra_metrics_to_redis(cobraConfig, channel, filter, position, hostname, redisPort); } else if (app.got_subcommand("snake")) { diff --git a/ws/ws.h b/ws/ws.h index cfa0d2f7..4b84a7c2 100644 --- a/ws/ws.h +++ b/ws/ws.h @@ -78,6 +78,7 @@ namespace ix int ws_cobra_subscribe_main(const ix::CobraConfig& config, const std::string& channel, const std::string& filter, + const std::string& position, bool quiet, bool fluentd); @@ -93,6 +94,7 @@ namespace ix int ws_cobra_metrics_to_redis(const ix::CobraConfig& config, const std::string& channel, const std::string& filter, + const std::string& position, const std::string& host, int port); diff --git a/ws/ws_cobra_metrics_to_redis.cpp b/ws/ws_cobra_metrics_to_redis.cpp index 67e5a101..9888e059 100644 --- a/ws/ws_cobra_metrics_to_redis.cpp +++ b/ws/ws_cobra_metrics_to_redis.cpp @@ -20,6 +20,7 @@ namespace ix int ws_cobra_metrics_to_redis(const ix::CobraConfig& config, const std::string& channel, const std::string& filter, + const std::string& position, const std::string& host, int port) { @@ -100,6 +101,7 @@ namespace ix conn.setEventCallback([&conn, &channel, &filter, + &position, &msgCount, &msgPerSeconds, &conditionVariableMutex, @@ -125,6 +127,7 @@ namespace ix conn.subscribe( channel, filter, + position, [&msgPerSeconds, &msgCount, &conditionVariableMutex, &condition, &queue]( const Json::Value& msg, const std::string& /*position*/) { { diff --git a/ws/ws_cobra_subscribe.cpp b/ws/ws_cobra_subscribe.cpp index 0066dc6b..8e286a45 100644 --- a/ws/ws_cobra_subscribe.cpp +++ b/ws/ws_cobra_subscribe.cpp @@ -41,6 +41,7 @@ namespace ix int ws_cobra_subscribe_main(const ix::CobraConfig& config, const std::string& channel, const std::string& filter, + const std::string& position, bool quiet, bool fluentd) { @@ -68,7 +69,7 @@ namespace ix std::thread t(timer); conn.setEventCallback( - [&conn, &channel, &jsonWriter, &filter, &msgCount, &msgPerSeconds, &quiet, &fluentd]( + [&conn, &channel, &jsonWriter, &filter, &position, &msgCount, &msgPerSeconds, &quiet, &fluentd]( ix::CobraConnectionEventType eventType, const std::string& errMsg, const ix::WebSocketHttpHeaders& headers, @@ -88,6 +89,7 @@ namespace ix spdlog::info("Subscriber authenticated"); conn.subscribe(channel, filter, + position, [&jsonWriter, &quiet, &msgPerSeconds, &msgCount, &fluentd]( const Json::Value& msg, const std::string& position) { if (!quiet)