diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index ffcf1dcf..37bf5116 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,6 +1,10 @@ # Changelog All changes to this project will be documented in this file. +## [8.1.9] - 2020-03-09 + +(ws cobra_subscribe) add a --fluentd option to wrap a message in an enveloppe so that fluentd can recognize it + ## [8.1.8] - 2020-03-02 (websocket server) fix regression with disabling zlib extension on the server side. If a client does not support this extension the server will handle it fine. We still need to figure out how to disable the option. diff --git a/ixwebsocket/IXWebSocketVersion.h b/ixwebsocket/IXWebSocketVersion.h index f0a51af4..21f3f444 100644 --- a/ixwebsocket/IXWebSocketVersion.h +++ b/ixwebsocket/IXWebSocketVersion.h @@ -6,4 +6,4 @@ #pragma once -#define IX_WEBSOCKET_VERSION "8.1.7" +#define IX_WEBSOCKET_VERSION "8.1.9" diff --git a/ws/ws.cpp b/ws/ws.cpp index 05c57e73..96fa8e27 100644 --- a/ws/ws.cpp +++ b/ws/ws.cpp @@ -88,6 +88,7 @@ int main(int argc, char** argv) bool verbose = false; bool save = false; bool quiet = false; + bool fluentd = false; bool compress = false; bool strict = false; bool stress = false; @@ -227,6 +228,7 @@ int main(int argc, char** argv) cobraSubscribeApp->add_option("--pidfile", pidfile, "Pid file"); cobraSubscribeApp->add_option("--filter", filter, "Stream SQL Filter"); cobraSubscribeApp->add_flag("-q", quiet, "Quiet / only display stats"); + cobraSubscribeApp->add_flag("--fluentd", fluentd, "Write fluentd prefix"); addTLSOptions(cobraSubscribeApp); CLI::App* cobraPublish = app.add_subcommand("cobra_publish", "Cobra publisher"); @@ -437,7 +439,7 @@ int main(int argc, char** argv) else if (app.got_subcommand("cobra_subscribe")) { ret = ix::ws_cobra_subscribe_main( - appkey, endpoint, rolename, rolesecret, channel, filter, quiet, tlsOptions); + appkey, endpoint, rolename, rolesecret, channel, filter, quiet, fluentd, tlsOptions); } else if (app.got_subcommand("cobra_publish")) { diff --git a/ws/ws.h b/ws/ws.h index 007793fa..b281e1fb 100644 --- a/ws/ws.h +++ b/ws/ws.h @@ -81,6 +81,7 @@ namespace ix const std::string& channel, const std::string& filter, bool quiet, + bool fluentd, const ix::SocketTLSOptions& tlsOptions); int ws_cobra_publish_main(const std::string& appkey, diff --git a/ws/ws_cobra_subscribe.cpp b/ws/ws_cobra_subscribe.cpp index 6661d616..4a833ea6 100644 --- a/ws/ws_cobra_subscribe.cpp +++ b/ws/ws_cobra_subscribe.cpp @@ -6,6 +6,7 @@ #include #include +#include #include #include #include @@ -13,6 +14,23 @@ namespace ix { + void writeToStdout(bool fluentd, Json::FastWriter& jsonWriter, const Json::Value& msg) + { + Json::Value enveloppe; + if (fluentd) + { + enveloppe["producer"] = "cobra"; + enveloppe["consumer"] = "fluentd"; + enveloppe["message"] = msg; + } + else + { + enveloppe = msg; + } + + std::cout << jsonWriter.write(enveloppe); + } + int ws_cobra_subscribe_main(const std::string& appkey, const std::string& endpoint, const std::string& rolename, @@ -20,6 +38,7 @@ namespace ix const std::string& channel, const std::string& filter, bool quiet, + bool fluentd, const ix::SocketTLSOptions& tlsOptions) { ix::CobraConnection conn; @@ -51,7 +70,7 @@ namespace ix std::thread t(timer); conn.setEventCallback( - [&conn, &channel, &jsonWriter, &filter, &msgCount, &msgPerSeconds, &quiet]( + [&conn, &channel, &jsonWriter, &filter, &msgCount, &msgPerSeconds, &quiet, &fluentd]( ix::CobraConnectionEventType eventType, const std::string& errMsg, const ix::WebSocketHttpHeaders& headers, @@ -69,18 +88,18 @@ namespace ix else if (eventType == ix::CobraConnection_EventType_Authenticated) { spdlog::info("Subscriber authenticated"); - conn.subscribe( - channel, - filter, - [&jsonWriter, &quiet, &msgPerSeconds, &msgCount](const Json::Value& msg) { - if (!quiet) - { - spdlog::info(jsonWriter.write(msg)); - } + conn.subscribe(channel, + filter, + [&jsonWriter, &quiet, &msgPerSeconds, &msgCount, &fluentd]( + const Json::Value& msg) { + if (!quiet) + { + writeToStdout(fluentd, jsonWriter, msg); + } - msgPerSeconds++; - msgCount++; - }); + msgPerSeconds++; + msgCount++; + }); } else if (eventType == ix::CobraConnection_EventType_Subscribed) { diff --git a/ws/ws_connect.cpp b/ws/ws_connect.cpp index ca0d399d..7598a669 100644 --- a/ws/ws_connect.cpp +++ b/ws/ws_connect.cpp @@ -30,8 +30,14 @@ namespace ix void start(); void stop(); - int getSentBytes() { return _sentBytes; } - int getReceivedBytes() { return _receivedBytes; } + int getSentBytes() + { + return _sentBytes; + } + int getReceivedBytes() + { + return _receivedBytes; + } void sendMessage(const std::string& text); @@ -76,19 +82,16 @@ namespace ix _webSocket.addSubProtocol(subprotocol); } - WebSocket::setTrafficTrackerCallback( - [this](int size, bool incoming) + WebSocket::setTrafficTrackerCallback([this](int size, bool incoming) { + if (incoming) { - if (incoming) - { - _receivedBytes += size; - } - else - { - _sentBytes += size; - } + _receivedBytes += size; } - ); + else + { + _sentBytes += size; + } + }); } void WebSocketConnect::log(const std::string& msg)