(cobra client) can subscribe with a position

This commit is contained in:
Benjamin Sergeant 2020-03-13 16:06:13 -07:00
parent 332ffb0603
commit 9801ebdb36
16 changed files with 48 additions and 14 deletions

View File

@ -1,6 +1,10 @@
# Changelog # Changelog
All changes to this project will be documented in this file. All changes to this project will be documented in this file.
## [8.2.4] - 2020-03-13
(cobra client) can subscribe with a position
## [8.2.3] - 2020-03-13 ## [8.2.3] - 2020-03-13
(cobra client) pass the message position to the subscription data callback (cobra client) pass the message position to the subscription data callback

View File

@ -19,6 +19,7 @@ namespace ix
int cobra_to_sentry_bot(const CobraConfig& config, int cobra_to_sentry_bot(const CobraConfig& config,
const std::string& channel, const std::string& channel,
const std::string& filter, const std::string& filter,
const std::string& position,
SentryClient& sentryClient, SentryClient& sentryClient,
bool verbose, bool verbose,
bool strict, bool strict,
@ -37,7 +38,7 @@ namespace ix
std::atomic<bool> stop(false); std::atomic<bool> stop(false);
std::atomic<bool> throttled(false); std::atomic<bool> throttled(false);
QueueManager queueManager(maxQueueSize, stop); QueueManager queueManager(maxQueueSize);
auto timer = [&sentCount, &receivedCount, &stop] { auto timer = [&sentCount, &receivedCount, &stop] {
while (!stop) while (!stop)
@ -173,6 +174,7 @@ namespace ix
conn.setEventCallback([&conn, conn.setEventCallback([&conn,
&channel, &channel,
&filter, &filter,
&position,
&jsonWriter, &jsonWriter,
verbose, verbose,
&throttled, &throttled,
@ -200,6 +202,7 @@ namespace ix
spdlog::info("Subscriber authenticated"); spdlog::info("Subscriber authenticated");
conn.subscribe(channel, conn.subscribe(channel,
filter, filter,
position,
[&jsonWriter, verbose, &throttled, &receivedCount, &queueManager]( [&jsonWriter, verbose, &throttled, &receivedCount, &queueManager](
const Json::Value& msg, const std::string& position) { const Json::Value& msg, const std::string& position) {
if (verbose) if (verbose)

View File

@ -14,6 +14,7 @@ namespace ix
int cobra_to_sentry_bot(const CobraConfig& config, int cobra_to_sentry_bot(const CobraConfig& config,
const std::string& channel, const std::string& channel,
const std::string& filter, const std::string& filter,
const std::string& position,
SentryClient& sentryClient, SentryClient& sentryClient,
bool verbose, bool verbose,
bool strict, bool strict,

View File

@ -62,6 +62,7 @@ namespace ix
int cobra_to_statsd_bot(const ix::CobraConfig& config, int cobra_to_statsd_bot(const ix::CobraConfig& config,
const std::string& channel, const std::string& channel,
const std::string& filter, const std::string& filter,
const std::string& position,
const std::string& host, const std::string& host,
int port, int port,
const std::string& prefix, const std::string& prefix,
@ -80,7 +81,7 @@ namespace ix
std::atomic<bool> stop(false); std::atomic<bool> stop(false);
size_t maxQueueSize = 1000; size_t maxQueueSize = 1000;
QueueManager queueManager(maxQueueSize, stop); QueueManager queueManager(maxQueueSize);
auto timer = [&sentCount, &receivedCount] { auto timer = [&sentCount, &receivedCount] {
while (true) while (true)
@ -153,7 +154,7 @@ namespace ix
std::thread t3(statsdSender); std::thread t3(statsdSender);
conn.setEventCallback( conn.setEventCallback(
[&conn, &channel, &filter, &jsonWriter, verbose, &queueManager, &receivedCount]( [&conn, &channel, &filter, &position, &jsonWriter, verbose, &queueManager, &receivedCount](
ix::CobraConnectionEventType eventType, ix::CobraConnectionEventType eventType,
const std::string& errMsg, const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers, const ix::WebSocketHttpHeaders& headers,
@ -177,6 +178,7 @@ namespace ix
spdlog::info("Subscriber authenticated"); spdlog::info("Subscriber authenticated");
conn.subscribe(channel, conn.subscribe(channel,
filter, filter,
position,
[&jsonWriter, &queueManager, verbose, &receivedCount]( [&jsonWriter, &queueManager, verbose, &receivedCount](
const Json::Value& msg, const std::string& position) { const Json::Value& msg, const std::string& position) {
if (verbose) if (verbose)

View File

@ -14,6 +14,7 @@ namespace ix
int cobra_to_statsd_bot(const ix::CobraConfig& config, int cobra_to_statsd_bot(const ix::CobraConfig& config,
const std::string& channel, const std::string& channel,
const std::string& filter, const std::string& filter,
const std::string& position,
const std::string& host, const std::string& host,
int port, int port,
const std::string& prefix, const std::string& prefix,

View File

@ -7,7 +7,6 @@
#pragma once #pragma once
#include <stddef.h> #include <stddef.h>
#include <atomic>
#include <json/json.h> #include <json/json.h>
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
@ -19,9 +18,8 @@ namespace ix
class QueueManager class QueueManager
{ {
public: public:
QueueManager(size_t maxQueueSize, std::atomic<bool>& stop) QueueManager(size_t maxQueueSize)
: _maxQueueSize(maxQueueSize) : _maxQueueSize(maxQueueSize)
, _stop(stop)
{ {
} }
@ -33,6 +31,5 @@ namespace ix
std::mutex _mutex; std::mutex _mutex;
std::condition_variable _condition; std::condition_variable _condition;
size_t _maxQueueSize; size_t _maxQueueSize;
std::atomic<bool>& _stop;
}; };
} }

View File

@ -565,6 +565,7 @@ namespace ix
void CobraConnection::subscribe(const std::string& channel, void CobraConnection::subscribe(const std::string& channel,
const std::string& filter, const std::string& filter,
const std::string& position,
SubscriptionCallback cb) SubscriptionCallback cb)
{ {
// Create and send a subscribe pdu // Create and send a subscribe pdu
@ -576,6 +577,11 @@ namespace ix
body["filter"] = filter; body["filter"] = filter;
} }
if (!position.empty())
{
body["position"] = position;
}
Json::Value pdu; Json::Value pdu;
pdu["action"] = "rtm/subscribe"; pdu["action"] = "rtm/subscribe";
pdu["body"] = body; pdu["body"] = body;

View File

@ -98,6 +98,7 @@ namespace ix
// message arrives. // message arrives.
void subscribe(const std::string& channel, void subscribe(const std::string& channel,
const std::string& filter = std::string(), const std::string& filter = std::string(),
const std::string& position = std::string(),
SubscriptionCallback cb = nullptr); SubscriptionCallback cb = nullptr);
/// Unsubscribe from a channel /// Unsubscribe from a channel

View File

@ -6,4 +6,4 @@
#pragma once #pragma once
#define IX_WEBSOCKET_VERSION "8.2.3" #define IX_WEBSOCKET_VERSION "8.2.4"

View File

@ -122,8 +122,10 @@ namespace
void CobraChat::subscribe(const std::string& channel) void CobraChat::subscribe(const std::string& channel)
{ {
std::string filter; std::string filter;
std::string position("$");
_conn.subscribe( _conn.subscribe(
channel, filter, [this](const Json::Value& msg, const std::string& /*position*/) { channel, filter, position, [this](const Json::Value& msg, const std::string& /*position*/) {
spdlog::info("receive {}", msg.toStyledString()); spdlog::info("receive {}", msg.toStyledString());
if (!msg.isObject()) return; if (!msg.isObject()) return;

View File

@ -93,8 +93,10 @@ namespace
{ {
log("Subscriber authenticated"); log("Subscriber authenticated");
std::string filter; std::string filter;
std::string position("$");
conn.subscribe( conn.subscribe(
CHANNEL, filter, [](const Json::Value& msg, const std::string& /*position*/) { CHANNEL, filter, position, [](const Json::Value& msg, const std::string& /*position*/) {
log(msg.toStyledString()); log(msg.toStyledString());
std::string id = msg["id"].asString(); std::string id = msg["id"].asString();

View File

@ -153,6 +153,7 @@ TEST_CASE("Cobra_to_sentry_bot", "[foo]")
std::thread publisherThread(runPublisher, config, channel); std::thread publisherThread(runPublisher, config, channel);
std::string filter; std::string filter;
std::string position("$");
bool verbose = true; bool verbose = true;
bool strict = true; bool strict = true;
size_t maxQueueSize = 10; size_t maxQueueSize = 10;
@ -182,6 +183,7 @@ TEST_CASE("Cobra_to_sentry_bot", "[foo]")
int sentCount = cobra_to_sentry_bot(config, int sentCount = cobra_to_sentry_bot(config,
channel, channel,
filter, filter,
position,
sentryClient, sentryClient,
verbose, verbose,
strict, strict,

View File

@ -65,6 +65,7 @@ int main(int argc, char** argv)
std::string pidfile; std::string pidfile;
std::string channel; std::string channel;
std::string filter; std::string filter;
std::string position;
std::string message; std::string message;
std::string password; std::string password;
std::string prefix("ws.test.v0"); std::string prefix("ws.test.v0");
@ -229,6 +230,7 @@ int main(int argc, char** argv)
cobraSubscribeApp->add_option("--channel", channel, "Channel")->required(); cobraSubscribeApp->add_option("--channel", channel, "Channel")->required();
cobraSubscribeApp->add_option("--pidfile", pidfile, "Pid file"); cobraSubscribeApp->add_option("--pidfile", pidfile, "Pid file");
cobraSubscribeApp->add_option("--filter", filter, "Stream SQL Filter"); cobraSubscribeApp->add_option("--filter", filter, "Stream SQL Filter");
cobraSubscribeApp->add_option("--position", position, "Stream position");
cobraSubscribeApp->add_flag("-q", quiet, "Quiet / only display stats"); cobraSubscribeApp->add_flag("-q", quiet, "Quiet / only display stats");
cobraSubscribeApp->add_flag("--fluentd", fluentd, "Write fluentd prefix"); cobraSubscribeApp->add_flag("--fluentd", fluentd, "Write fluentd prefix");
addTLSOptions(cobraSubscribeApp); addTLSOptions(cobraSubscribeApp);
@ -263,6 +265,7 @@ int main(int argc, char** argv)
cobra2statsd->add_flag("-v", verbose, "Verbose"); cobra2statsd->add_flag("-v", verbose, "Verbose");
cobra2statsd->add_option("--pidfile", pidfile, "Pid file"); cobra2statsd->add_option("--pidfile", pidfile, "Pid file");
cobra2statsd->add_option("--filter", filter, "Stream SQL Filter"); cobra2statsd->add_option("--filter", filter, "Stream SQL Filter");
cobra2statsd->add_option("--position", position, "Stream position");
addTLSOptions(cobra2statsd); addTLSOptions(cobra2statsd);
addCobraConfig(cobra2statsd); addCobraConfig(cobra2statsd);
@ -276,6 +279,7 @@ int main(int argc, char** argv)
cobra2sentry->add_flag("-s", strict, "Strict mode. Error out when sending to sentry fails"); cobra2sentry->add_flag("-s", strict, "Strict mode. Error out when sending to sentry fails");
cobra2sentry->add_option("--pidfile", pidfile, "Pid file"); cobra2sentry->add_option("--pidfile", pidfile, "Pid file");
cobra2sentry->add_option("--filter", filter, "Stream SQL Filter"); cobra2sentry->add_option("--filter", filter, "Stream SQL Filter");
cobra2sentry->add_option("--position", position, "Stream position");
addTLSOptions(cobra2sentry); addTLSOptions(cobra2sentry);
addCobraConfig(cobra2sentry); addCobraConfig(cobra2sentry);
@ -284,6 +288,7 @@ int main(int argc, char** argv)
cobra2redisApp->add_option("channel", channel, "Channel")->required(); cobra2redisApp->add_option("channel", channel, "Channel")->required();
cobra2redisApp->add_option("--pidfile", pidfile, "Pid file"); cobra2redisApp->add_option("--pidfile", pidfile, "Pid file");
cobra2redisApp->add_option("--filter", filter, "Stream SQL Filter"); cobra2redisApp->add_option("--filter", filter, "Stream SQL Filter");
cobra2redisApp->add_option("--position", position, "Stream position");
cobra2redisApp->add_option("--hostname", hostname, "Redis hostname"); cobra2redisApp->add_option("--hostname", hostname, "Redis hostname");
cobra2redisApp->add_option("--port", redisPort, "Redis port"); cobra2redisApp->add_option("--port", redisPort, "Redis port");
cobra2redisApp->add_flag("-q", quiet, "Quiet / only display stats"); cobra2redisApp->add_flag("-q", quiet, "Quiet / only display stats");
@ -429,7 +434,7 @@ int main(int argc, char** argv)
} }
else if (app.got_subcommand("cobra_subscribe")) else if (app.got_subcommand("cobra_subscribe"))
{ {
ret = ix::ws_cobra_subscribe_main(cobraConfig, channel, filter, quiet, fluentd); ret = ix::ws_cobra_subscribe_main(cobraConfig, channel, filter, position, quiet, fluentd);
} }
else if (app.got_subcommand("cobra_publish")) else if (app.got_subcommand("cobra_publish"))
{ {
@ -442,7 +447,7 @@ int main(int argc, char** argv)
else if (app.got_subcommand("cobra_to_statsd")) else if (app.got_subcommand("cobra_to_statsd"))
{ {
ret = ix::cobra_to_statsd_bot( ret = ix::cobra_to_statsd_bot(
cobraConfig, channel, filter, hostname, statsdPort, prefix, fields, verbose); cobraConfig, channel, filter, position, hostname, statsdPort, prefix, fields, verbose);
} }
else if (app.got_subcommand("cobra_to_sentry")) else if (app.got_subcommand("cobra_to_sentry"))
{ {
@ -453,6 +458,7 @@ int main(int argc, char** argv)
ret = ix::cobra_to_sentry_bot(cobraConfig, ret = ix::cobra_to_sentry_bot(cobraConfig,
channel, channel,
filter, filter,
position,
sentryClient, sentryClient,
verbose, verbose,
strict, strict,
@ -462,7 +468,7 @@ int main(int argc, char** argv)
} }
else if (app.got_subcommand("cobra_metrics_to_redis")) else if (app.got_subcommand("cobra_metrics_to_redis"))
{ {
ret = ix::ws_cobra_metrics_to_redis(cobraConfig, channel, filter, hostname, redisPort); ret = ix::ws_cobra_metrics_to_redis(cobraConfig, channel, filter, position, hostname, redisPort);
} }
else if (app.got_subcommand("snake")) else if (app.got_subcommand("snake"))
{ {

View File

@ -78,6 +78,7 @@ namespace ix
int ws_cobra_subscribe_main(const ix::CobraConfig& config, int ws_cobra_subscribe_main(const ix::CobraConfig& config,
const std::string& channel, const std::string& channel,
const std::string& filter, const std::string& filter,
const std::string& position,
bool quiet, bool quiet,
bool fluentd); bool fluentd);
@ -93,6 +94,7 @@ namespace ix
int ws_cobra_metrics_to_redis(const ix::CobraConfig& config, int ws_cobra_metrics_to_redis(const ix::CobraConfig& config,
const std::string& channel, const std::string& channel,
const std::string& filter, const std::string& filter,
const std::string& position,
const std::string& host, const std::string& host,
int port); int port);

View File

@ -20,6 +20,7 @@ namespace ix
int ws_cobra_metrics_to_redis(const ix::CobraConfig& config, int ws_cobra_metrics_to_redis(const ix::CobraConfig& config,
const std::string& channel, const std::string& channel,
const std::string& filter, const std::string& filter,
const std::string& position,
const std::string& host, const std::string& host,
int port) int port)
{ {
@ -100,6 +101,7 @@ namespace ix
conn.setEventCallback([&conn, conn.setEventCallback([&conn,
&channel, &channel,
&filter, &filter,
&position,
&msgCount, &msgCount,
&msgPerSeconds, &msgPerSeconds,
&conditionVariableMutex, &conditionVariableMutex,
@ -125,6 +127,7 @@ namespace ix
conn.subscribe( conn.subscribe(
channel, channel,
filter, filter,
position,
[&msgPerSeconds, &msgCount, &conditionVariableMutex, &condition, &queue]( [&msgPerSeconds, &msgCount, &conditionVariableMutex, &condition, &queue](
const Json::Value& msg, const std::string& /*position*/) { const Json::Value& msg, const std::string& /*position*/) {
{ {

View File

@ -41,6 +41,7 @@ namespace ix
int ws_cobra_subscribe_main(const ix::CobraConfig& config, int ws_cobra_subscribe_main(const ix::CobraConfig& config,
const std::string& channel, const std::string& channel,
const std::string& filter, const std::string& filter,
const std::string& position,
bool quiet, bool quiet,
bool fluentd) bool fluentd)
{ {
@ -68,7 +69,7 @@ namespace ix
std::thread t(timer); std::thread t(timer);
conn.setEventCallback( conn.setEventCallback(
[&conn, &channel, &jsonWriter, &filter, &msgCount, &msgPerSeconds, &quiet, &fluentd]( [&conn, &channel, &jsonWriter, &filter, &position, &msgCount, &msgPerSeconds, &quiet, &fluentd](
ix::CobraConnectionEventType eventType, ix::CobraConnectionEventType eventType,
const std::string& errMsg, const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers, const ix::WebSocketHttpHeaders& headers,
@ -88,6 +89,7 @@ namespace ix
spdlog::info("Subscriber authenticated"); spdlog::info("Subscriber authenticated");
conn.subscribe(channel, conn.subscribe(channel,
filter, filter,
position,
[&jsonWriter, &quiet, &msgPerSeconds, &msgCount, &fluentd]( [&jsonWriter, &quiet, &msgPerSeconds, &msgCount, &fluentd](
const Json::Value& msg, const std::string& position) { const Json::Value& msg, const std::string& position) {
if (!quiet) if (!quiet)