diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 498f38e1..1c28be85 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,6 +1,10 @@ # Changelog All changes to this project will be documented in this file. +## [9.2.6] - 2020-04-14 + +(ixcobra) snake server / handle invalid incoming json messages + cobra subscriber in fluentd mode insert a created_at timestamp entry + ## [9.2.5] - 2020-04-13 (websocket) WebSocketMessagePtr is a unique_ptr instead of a shared_ptr diff --git a/ixwebsocket/IXWebSocketTransport.cpp b/ixwebsocket/IXWebSocketTransport.cpp index 77300e1a..0a401d3d 100644 --- a/ixwebsocket/IXWebSocketTransport.cpp +++ b/ixwebsocket/IXWebSocketTransport.cpp @@ -515,8 +515,10 @@ namespace ix // if (ws.fin && _chunks.empty()) { - emitMessage( - _fragmentedMessageKind, frameData, _receivedMessageCompressed, onMessageCallback); + emitMessage(_fragmentedMessageKind, + frameData, + _receivedMessageCompressed, + onMessageCallback); _receivedMessageCompressed = false; } diff --git a/ixwebsocket/IXWebSocketVersion.h b/ixwebsocket/IXWebSocketVersion.h index c37b4448..503fc49e 100644 --- a/ixwebsocket/IXWebSocketVersion.h +++ b/ixwebsocket/IXWebSocketVersion.h @@ -6,4 +6,4 @@ #pragma once -#define IX_WEBSOCKET_VERSION "9.2.5" +#define IX_WEBSOCKET_VERSION "9.2.6" diff --git a/test/IXWebSocketBroadcastTest.cpp b/test/IXWebSocketBroadcastTest.cpp index fed38830..879df7a2 100644 --- a/test/IXWebSocketBroadcastTest.cpp +++ b/test/IXWebSocketBroadcastTest.cpp @@ -6,8 +6,8 @@ #include "IXTest.h" #include "catch.hpp" -#include #include "msgpack11.hpp" +#include #include #include #include @@ -130,7 +130,8 @@ namespace } else if (msg->type == ix::WebSocketMessageType::Error) { - ss << "websocket_broadcast_client: " << _user << " Error ! " << msg->errorInfo.reason; + ss << "websocket_broadcast_client: " << _user << " Error ! " + << msg->errorInfo.reason; log(ss.str()); } else if (msg->type == ix::WebSocketMessageType::Ping) @@ -234,7 +235,7 @@ namespace server.start(); return true; } -} // namespace ix +} // namespace TEST_CASE("Websocket_broadcast_server", "[websocket_server]") { @@ -247,7 +248,7 @@ TEST_CASE("Websocket_broadcast_server", "[websocket_server]") std::string session = ix::generateSessionId(); std::vector> chatClients; - for (int i = 0 ; i < 10; ++i) + for (int i = 0; i < 10; ++i) { std::string user("user_" + std::to_string(i)); chatClients.push_back(std::make_shared(user, session, port)); @@ -259,7 +260,7 @@ TEST_CASE("Websocket_broadcast_server", "[websocket_server]") while (true) { bool allReady = true; - for (size_t i = 0 ; i < chatClients.size(); ++i) + for (size_t i = 0; i < chatClients.size(); ++i) { allReady &= chatClients[i]->isReady(); } @@ -269,7 +270,7 @@ TEST_CASE("Websocket_broadcast_server", "[websocket_server]") for (int j = 0; j < 1000; j++) { - for (size_t i = 0 ; i < chatClients.size(); ++i) + for (size_t i = 0; i < chatClients.size(); ++i) { chatClients[i]->sendMessage("hello world"); } @@ -291,7 +292,7 @@ TEST_CASE("Websocket_broadcast_server", "[websocket_server]") // Stop all clients size_t messageCount = chatClients.size() * 50; - for (size_t i = 0 ; i < chatClients.size(); ++i) + for (size_t i = 0; i < chatClients.size(); ++i) { REQUIRE(chatClients[i]->getReceivedMessagesCount() >= messageCount); chatClients[i]->stop(); diff --git a/ws/ws.cpp b/ws/ws.cpp index f728913e..03afe333 100644 --- a/ws/ws.cpp +++ b/ws/ws.cpp @@ -268,8 +268,10 @@ int main(int argc, char** argv) cobra2statsd->add_option("--port", statsdPort, "Statsd port"); cobra2statsd->add_option("--prefix", prefix, "Statsd prefix"); cobra2statsd->add_option("--fields", fields, "Extract fields for naming the event")->join(); - cobra2statsd->add_option("--gauge", gauge, "Value to extract, and use as a statsd gauge")->join(); - cobra2statsd->add_option("--timer", timer, "Value to extract, and use as a statsd timer")->join(); + cobra2statsd->add_option("--gauge", gauge, "Value to extract, and use as a statsd gauge") + ->join(); + cobra2statsd->add_option("--timer", timer, "Value to extract, and use as a statsd timer") + ->join(); cobra2statsd->add_option("channel", channel, "Channel")->required(); cobra2statsd->add_flag("-v", verbose, "Verbose"); cobra2statsd->add_option("--pidfile", pidfile, "Pid file"); @@ -449,7 +451,8 @@ int main(int argc, char** argv) } else if (app.got_subcommand("cobra_subscribe")) { - ret = ix::ws_cobra_subscribe_main(cobraConfig, channel, filter, position, quiet, fluentd, runtime); + ret = ix::ws_cobra_subscribe_main( + cobraConfig, channel, filter, position, quiet, fluentd, runtime); } else if (app.got_subcommand("cobra_publish")) { @@ -463,7 +466,7 @@ int main(int argc, char** argv) { if (!timer.empty() && !gauge.empty()) { - spdlog::error("--gauge and --timer options are exclusive. " \ + spdlog::error("--gauge and --timer options are exclusive. " "you can only supply one"); ret = 1; } diff --git a/ws/ws_cobra_subscribe.cpp b/ws/ws_cobra_subscribe.cpp index aed7d67d..00a3e2db 100644 --- a/ws/ws_cobra_subscribe.cpp +++ b/ws/ws_cobra_subscribe.cpp @@ -25,6 +25,17 @@ namespace ix return jsonWriter; } + std::string timeSinceEpoch() + { + std::chrono::system_clock::time_point tp = std::chrono::system_clock::now(); + std::chrono::system_clock::duration dtn = tp.time_since_epoch(); + + std::stringstream ss; + ss << dtn.count() * std::chrono::system_clock::period::num / + std::chrono::system_clock::period::den; + return ss.str(); + } + void writeToStdout(bool fluentd, const StreamWriterPtr& jsonWriter, const Json::Value& msg, @@ -36,12 +47,13 @@ namespace ix enveloppe["producer"] = "cobra"; enveloppe["consumer"] = "fluentd"; - Json::Value msgWithPosition(msg); - msgWithPosition["position"] = position; - enveloppe["message"] = msgWithPosition; + Json::Value nestedMessage(msg); + nestedMessage["position"] = position; + nestedMessage["created_at"] = timeSinceEpoch(); + enveloppe["message"] = nestedMessage; jsonWriter->write(enveloppe, &std::cout); - std::cout << std::endl; // add lf and flush + std::cout << std::endl; // add lf and flush } else { @@ -184,7 +196,7 @@ namespace ix // Run for a duration, used by unittesting now else { - for (int i = 0 ; i < runtime; ++i) + for (int i = 0; i < runtime; ++i) { auto duration = std::chrono::seconds(1); std::this_thread::sleep_for(duration);