(cobra client) pass the message position to the subscription data callback

This commit is contained in:
Benjamin Sergeant 2020-03-13 12:49:37 -07:00
parent 90df3d1805
commit 332ffb0603
11 changed files with 59 additions and 43 deletions

View File

@ -1,6 +1,10 @@
# Changelog # Changelog
All changes to this project will be documented in this file. All changes to this project will be documented in this file.
## [8.2.3] - 2020-03-13
(cobra client) pass the message position to the subscription data callback
## [8.2.2] - 2020-03-12 ## [8.2.2] - 2020-03-12
(openssl tls backend) Fix a hand in OpenSSL when using TLS v1.3 ... by disabling TLS v1.3 (openssl tls backend) Fix a hand in OpenSSL when using TLS v1.3 ... by disabling TLS v1.3

View File

@ -201,10 +201,10 @@ namespace ix
conn.subscribe(channel, conn.subscribe(channel,
filter, filter,
[&jsonWriter, verbose, &throttled, &receivedCount, &queueManager]( [&jsonWriter, verbose, &throttled, &receivedCount, &queueManager](
const Json::Value& msg) { const Json::Value& msg, const std::string& position) {
if (verbose) if (verbose)
{ {
spdlog::info("Subscriber received message -> {}", jsonWriter.write(msg)); spdlog::info("Subscriber received message {} -> {}", position, jsonWriter.write(msg));
} }
// If we cannot send to sentry fast enough, drop the message // If we cannot send to sentry fast enough, drop the message

View File

@ -178,10 +178,10 @@ namespace ix
conn.subscribe(channel, conn.subscribe(channel,
filter, filter,
[&jsonWriter, &queueManager, verbose, &receivedCount]( [&jsonWriter, &queueManager, verbose, &receivedCount](
const Json::Value& msg) { const Json::Value& msg, const std::string& position) {
if (verbose) if (verbose)
{ {
spdlog::info(jsonWriter.write(msg)); spdlog::info("Subscriber received message {} -> {}", position, jsonWriter.write(msg));
} }
receivedCount++; receivedCount++;

View File

@ -441,9 +441,12 @@ namespace ix
if (!body.isMember("messages")) return false; if (!body.isMember("messages")) return false;
Json::Value messages = body["messages"]; Json::Value messages = body["messages"];
if (!body.isMember("position")) return false;
std::string position = body["position"].asString();
for (auto&& msg : messages) for (auto&& msg : messages)
{ {
cb->second(msg); cb->second(msg, position);
} }
return true; return true;

View File

@ -42,7 +42,7 @@ namespace ix
CobraConnection_PublishMode_Batch = 1 CobraConnection_PublishMode_Batch = 1
}; };
using SubscriptionCallback = std::function<void(const Json::Value&)>; using SubscriptionCallback = std::function<void(const Json::Value&, const std::string&)>;
using EventCallback = std::function<void(CobraConnectionEventType, using EventCallback = std::function<void(CobraConnectionEventType,
const std::string&, const std::string&,
const WebSocketHttpHeaders&, const WebSocketHttpHeaders&,

View File

@ -189,7 +189,7 @@ namespace snake
nlohmann::json response = { nlohmann::json response = {
{"action", "rtm/subscription/data"}, {"action", "rtm/subscription/data"},
{"id", id++}, {"id", id++},
{"body", {{"subscription_id", subscriptionId}, {"messages", {msg}}}}}; {"body", {{"subscription_id", subscriptionId}, {"position", "0-0"}, {"messages", {msg}}}}};
ws->sendText(response.dump()); ws->sendText(response.dump());
}; };

View File

@ -6,4 +6,4 @@
#pragma once #pragma once
#define IX_WEBSOCKET_VERSION "8.2.2" #define IX_WEBSOCKET_VERSION "8.2.3"

View File

@ -122,7 +122,8 @@ namespace
void CobraChat::subscribe(const std::string& channel) void CobraChat::subscribe(const std::string& channel)
{ {
std::string filter; std::string filter;
_conn.subscribe(channel, filter, [this](const Json::Value& msg) { _conn.subscribe(
channel, filter, [this](const Json::Value& msg, const std::string& /*position*/) {
spdlog::info("receive {}", msg.toStyledString()); spdlog::info("receive {}", msg.toStyledString());
if (!msg.isObject()) return; if (!msg.isObject()) return;

View File

@ -93,7 +93,8 @@ namespace
{ {
log("Subscriber authenticated"); log("Subscriber authenticated");
std::string filter; std::string filter;
conn.subscribe(CHANNEL, filter, [](const Json::Value& msg) { conn.subscribe(
CHANNEL, filter, [](const Json::Value& msg, const std::string& /*position*/) {
log(msg.toStyledString()); log(msg.toStyledString());
std::string id = msg["id"].asString(); std::string id = msg["id"].asString();

View File

@ -126,7 +126,7 @@ namespace ix
channel, channel,
filter, filter,
[&msgPerSeconds, &msgCount, &conditionVariableMutex, &condition, &queue]( [&msgPerSeconds, &msgCount, &conditionVariableMutex, &condition, &queue](
const Json::Value& msg) { const Json::Value& msg, const std::string& /*position*/) {
{ {
std::unique_lock<std::mutex> lock(conditionVariableMutex); std::unique_lock<std::mutex> lock(conditionVariableMutex);
queue.push(msg); queue.push(msg);

View File

@ -14,21 +14,28 @@
namespace ix namespace ix
{ {
void writeToStdout(bool fluentd, Json::FastWriter& jsonWriter, const Json::Value& msg) void writeToStdout(bool fluentd,
Json::FastWriter& jsonWriter,
const Json::Value& msg,
const std::string& position)
{ {
Json::Value enveloppe; Json::Value enveloppe;
if (fluentd) if (fluentd)
{ {
enveloppe["producer"] = "cobra"; enveloppe["producer"] = "cobra";
enveloppe["consumer"] = "fluentd"; enveloppe["consumer"] = "fluentd";
enveloppe["message"] = msg;
Json::Value msgWithPosition(msg);
msgWithPosition["position"] = position;
enveloppe["message"] = msgWithPosition;
std::cout << jsonWriter.write(enveloppe);
} }
else else
{ {
enveloppe = msg; enveloppe = msg;
std::cout << position << " " << jsonWriter.write(enveloppe);
} }
std::cout << jsonWriter.write(enveloppe);
} }
int ws_cobra_subscribe_main(const ix::CobraConfig& config, int ws_cobra_subscribe_main(const ix::CobraConfig& config,
@ -82,10 +89,10 @@ namespace ix
conn.subscribe(channel, conn.subscribe(channel,
filter, filter,
[&jsonWriter, &quiet, &msgPerSeconds, &msgCount, &fluentd]( [&jsonWriter, &quiet, &msgPerSeconds, &msgCount, &fluentd](
const Json::Value& msg) { const Json::Value& msg, const std::string& position) {
if (!quiet) if (!quiet)
{ {
writeToStdout(fluentd, jsonWriter, msg); writeToStdout(fluentd, jsonWriter, msg, position);
} }
msgPerSeconds++; msgPerSeconds++;