diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index b3497c6b..2628f1fe 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,6 +1,10 @@ # Changelog All changes to this project will be documented in this file. +## [8.2.3] - 2020-03-13 + +(cobra client) pass the message position to the subscription data callback + ## [8.2.2] - 2020-03-12 (openssl tls backend) Fix a hand in OpenSSL when using TLS v1.3 ... by disabling TLS v1.3 diff --git a/ixbots/ixbots/IXCobraToSentryBot.cpp b/ixbots/ixbots/IXCobraToSentryBot.cpp index 76069c12..bca4b6de 100644 --- a/ixbots/ixbots/IXCobraToSentryBot.cpp +++ b/ixbots/ixbots/IXCobraToSentryBot.cpp @@ -201,10 +201,10 @@ namespace ix conn.subscribe(channel, filter, [&jsonWriter, verbose, &throttled, &receivedCount, &queueManager]( - const Json::Value& msg) { + const Json::Value& msg, const std::string& position) { if (verbose) { - spdlog::info("Subscriber received message -> {}", jsonWriter.write(msg)); + spdlog::info("Subscriber received message {} -> {}", position, jsonWriter.write(msg)); } // If we cannot send to sentry fast enough, drop the message diff --git a/ixbots/ixbots/IXCobraToStatsdBot.cpp b/ixbots/ixbots/IXCobraToStatsdBot.cpp index 17f8e985..fa9cb5ad 100644 --- a/ixbots/ixbots/IXCobraToStatsdBot.cpp +++ b/ixbots/ixbots/IXCobraToStatsdBot.cpp @@ -178,10 +178,10 @@ namespace ix conn.subscribe(channel, filter, [&jsonWriter, &queueManager, verbose, &receivedCount]( - const Json::Value& msg) { + const Json::Value& msg, const std::string& position) { if (verbose) { - spdlog::info(jsonWriter.write(msg)); + spdlog::info("Subscriber received message {} -> {}", position, jsonWriter.write(msg)); } receivedCount++; diff --git a/ixcobra/ixcobra/IXCobraConnection.cpp b/ixcobra/ixcobra/IXCobraConnection.cpp index 93a3148f..f48527ab 100644 --- a/ixcobra/ixcobra/IXCobraConnection.cpp +++ b/ixcobra/ixcobra/IXCobraConnection.cpp @@ -441,9 +441,12 @@ namespace ix if (!body.isMember("messages")) return false; Json::Value messages = body["messages"]; + if (!body.isMember("position")) return false; + std::string position = body["position"].asString(); + for (auto&& msg : messages) { - cb->second(msg); + cb->second(msg, position); } return true; diff --git a/ixcobra/ixcobra/IXCobraConnection.h b/ixcobra/ixcobra/IXCobraConnection.h index 316630fc..67fd2086 100644 --- a/ixcobra/ixcobra/IXCobraConnection.h +++ b/ixcobra/ixcobra/IXCobraConnection.h @@ -42,7 +42,7 @@ namespace ix CobraConnection_PublishMode_Batch = 1 }; - using SubscriptionCallback = std::function; + using SubscriptionCallback = std::function; using EventCallback = std::functionsendText(response.dump()); }; diff --git a/ixwebsocket/IXWebSocketVersion.h b/ixwebsocket/IXWebSocketVersion.h index 08c64420..585863f1 100644 --- a/ixwebsocket/IXWebSocketVersion.h +++ b/ixwebsocket/IXWebSocketVersion.h @@ -6,4 +6,4 @@ #pragma once -#define IX_WEBSOCKET_VERSION "8.2.2" +#define IX_WEBSOCKET_VERSION "8.2.3" diff --git a/test/IXCobraChatTest.cpp b/test/IXCobraChatTest.cpp index e88c8f53..e2c1c1f2 100644 --- a/test/IXCobraChatTest.cpp +++ b/test/IXCobraChatTest.cpp @@ -122,31 +122,32 @@ namespace void CobraChat::subscribe(const std::string& channel) { std::string filter; - _conn.subscribe(channel, filter, [this](const Json::Value& msg) { - spdlog::info("receive {}", msg.toStyledString()); + _conn.subscribe( + channel, filter, [this](const Json::Value& msg, const std::string& /*position*/) { + spdlog::info("receive {}", msg.toStyledString()); - if (!msg.isObject()) return; - if (!msg.isMember("user")) return; - if (!msg.isMember("text")) return; - if (!msg.isMember("session")) return; + if (!msg.isObject()) return; + if (!msg.isMember("user")) return; + if (!msg.isMember("text")) return; + if (!msg.isMember("session")) return; - std::string msg_user = msg["user"].asString(); - std::string msg_text = msg["text"].asString(); - std::string msg_session = msg["session"].asString(); + std::string msg_user = msg["user"].asString(); + std::string msg_text = msg["text"].asString(); + std::string msg_session = msg["session"].asString(); - // We are not interested in messages - // from a different session. - if (msg_session != _session) return; + // We are not interested in messages + // from a different session. + if (msg_session != _session) return; - // We are not interested in our own messages - if (msg_user == _user) return; + // We are not interested in our own messages + if (msg_user == _user) return; - _receivedQueue.push(msg); + _receivedQueue.push(msg); - std::stringstream ss; - ss << std::endl << msg_user << " > " << msg_text << std::endl << _user << " > "; - log(ss.str()); - }); + std::stringstream ss; + ss << std::endl << msg_user << " > " << msg_text << std::endl << _user << " > "; + log(ss.str()); + }); } void CobraChat::sendMessage(const std::string& text) diff --git a/test/IXCobraMetricsPublisherTest.cpp b/test/IXCobraMetricsPublisherTest.cpp index 4075cac5..c08c40de 100644 --- a/test/IXCobraMetricsPublisherTest.cpp +++ b/test/IXCobraMetricsPublisherTest.cpp @@ -93,17 +93,18 @@ namespace { log("Subscriber authenticated"); std::string filter; - conn.subscribe(CHANNEL, filter, [](const Json::Value& msg) { - log(msg.toStyledString()); + conn.subscribe( + CHANNEL, filter, [](const Json::Value& msg, const std::string& /*position*/) { + log(msg.toStyledString()); - std::string id = msg["id"].asString(); - { - std::lock_guard guard(gProtectIds); - gIds.insert(id); - } + std::string id = msg["id"].asString(); + { + std::lock_guard guard(gProtectIds); + gIds.insert(id); + } - gMessageCount++; - }); + gMessageCount++; + }); } else if (eventType == ix::CobraConnection_EventType_Subscribed) { diff --git a/ws/ws_cobra_metrics_to_redis.cpp b/ws/ws_cobra_metrics_to_redis.cpp index da6439dc..67e5a101 100644 --- a/ws/ws_cobra_metrics_to_redis.cpp +++ b/ws/ws_cobra_metrics_to_redis.cpp @@ -126,7 +126,7 @@ namespace ix channel, filter, [&msgPerSeconds, &msgCount, &conditionVariableMutex, &condition, &queue]( - const Json::Value& msg) { + const Json::Value& msg, const std::string& /*position*/) { { std::unique_lock lock(conditionVariableMutex); queue.push(msg); diff --git a/ws/ws_cobra_subscribe.cpp b/ws/ws_cobra_subscribe.cpp index ec3e4265..0066dc6b 100644 --- a/ws/ws_cobra_subscribe.cpp +++ b/ws/ws_cobra_subscribe.cpp @@ -14,21 +14,28 @@ namespace ix { - void writeToStdout(bool fluentd, Json::FastWriter& jsonWriter, const Json::Value& msg) + void writeToStdout(bool fluentd, + Json::FastWriter& jsonWriter, + const Json::Value& msg, + const std::string& position) { Json::Value enveloppe; if (fluentd) { enveloppe["producer"] = "cobra"; enveloppe["consumer"] = "fluentd"; - enveloppe["message"] = msg; + + Json::Value msgWithPosition(msg); + msgWithPosition["position"] = position; + enveloppe["message"] = msgWithPosition; + + std::cout << jsonWriter.write(enveloppe); } else { enveloppe = msg; + std::cout << position << " " << jsonWriter.write(enveloppe); } - - std::cout << jsonWriter.write(enveloppe); } int ws_cobra_subscribe_main(const ix::CobraConfig& config, @@ -82,10 +89,10 @@ namespace ix conn.subscribe(channel, filter, [&jsonWriter, &quiet, &msgPerSeconds, &msgCount, &fluentd]( - const Json::Value& msg) { + const Json::Value& msg, const std::string& position) { if (!quiet) { - writeToStdout(fluentd, jsonWriter, msg); + writeToStdout(fluentd, jsonWriter, msg, position); } msgPerSeconds++;