(ws cobra_subscribe) add a --fluentd option to wrap a message in an enveloppe so that fluentd can recognize it

This commit is contained in:
Benjamin Sergeant 2020-03-09 15:25:43 -07:00
parent 4ef04b8339
commit b1f30bb40f
6 changed files with 56 additions and 27 deletions

View File

@ -1,6 +1,10 @@
# Changelog # Changelog
All changes to this project will be documented in this file. All changes to this project will be documented in this file.
## [8.1.9] - 2020-03-09
(ws cobra_subscribe) add a --fluentd option to wrap a message in an enveloppe so that fluentd can recognize it
## [8.1.8] - 2020-03-02 ## [8.1.8] - 2020-03-02
(websocket server) fix regression with disabling zlib extension on the server side. If a client does not support this extension the server will handle it fine. We still need to figure out how to disable the option. (websocket server) fix regression with disabling zlib extension on the server side. If a client does not support this extension the server will handle it fine. We still need to figure out how to disable the option.

View File

@ -6,4 +6,4 @@
#pragma once #pragma once
#define IX_WEBSOCKET_VERSION "8.1.7" #define IX_WEBSOCKET_VERSION "8.1.9"

View File

@ -88,6 +88,7 @@ int main(int argc, char** argv)
bool verbose = false; bool verbose = false;
bool save = false; bool save = false;
bool quiet = false; bool quiet = false;
bool fluentd = false;
bool compress = false; bool compress = false;
bool strict = false; bool strict = false;
bool stress = false; bool stress = false;
@ -227,6 +228,7 @@ int main(int argc, char** argv)
cobraSubscribeApp->add_option("--pidfile", pidfile, "Pid file"); cobraSubscribeApp->add_option("--pidfile", pidfile, "Pid file");
cobraSubscribeApp->add_option("--filter", filter, "Stream SQL Filter"); cobraSubscribeApp->add_option("--filter", filter, "Stream SQL Filter");
cobraSubscribeApp->add_flag("-q", quiet, "Quiet / only display stats"); cobraSubscribeApp->add_flag("-q", quiet, "Quiet / only display stats");
cobraSubscribeApp->add_flag("--fluentd", fluentd, "Write fluentd prefix");
addTLSOptions(cobraSubscribeApp); addTLSOptions(cobraSubscribeApp);
CLI::App* cobraPublish = app.add_subcommand("cobra_publish", "Cobra publisher"); CLI::App* cobraPublish = app.add_subcommand("cobra_publish", "Cobra publisher");
@ -437,7 +439,7 @@ int main(int argc, char** argv)
else if (app.got_subcommand("cobra_subscribe")) else if (app.got_subcommand("cobra_subscribe"))
{ {
ret = ix::ws_cobra_subscribe_main( ret = ix::ws_cobra_subscribe_main(
appkey, endpoint, rolename, rolesecret, channel, filter, quiet, tlsOptions); appkey, endpoint, rolename, rolesecret, channel, filter, quiet, fluentd, tlsOptions);
} }
else if (app.got_subcommand("cobra_publish")) else if (app.got_subcommand("cobra_publish"))
{ {

View File

@ -81,6 +81,7 @@ namespace ix
const std::string& channel, const std::string& channel,
const std::string& filter, const std::string& filter,
bool quiet, bool quiet,
bool fluentd,
const ix::SocketTLSOptions& tlsOptions); const ix::SocketTLSOptions& tlsOptions);
int ws_cobra_publish_main(const std::string& appkey, int ws_cobra_publish_main(const std::string& appkey,

View File

@ -6,6 +6,7 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <iostream>
#include <ixcobra/IXCobraConnection.h> #include <ixcobra/IXCobraConnection.h>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
@ -13,6 +14,23 @@
namespace ix namespace ix
{ {
void writeToStdout(bool fluentd, Json::FastWriter& jsonWriter, const Json::Value& msg)
{
Json::Value enveloppe;
if (fluentd)
{
enveloppe["producer"] = "cobra";
enveloppe["consumer"] = "fluentd";
enveloppe["message"] = msg;
}
else
{
enveloppe = msg;
}
std::cout << jsonWriter.write(enveloppe);
}
int ws_cobra_subscribe_main(const std::string& appkey, int ws_cobra_subscribe_main(const std::string& appkey,
const std::string& endpoint, const std::string& endpoint,
const std::string& rolename, const std::string& rolename,
@ -20,6 +38,7 @@ namespace ix
const std::string& channel, const std::string& channel,
const std::string& filter, const std::string& filter,
bool quiet, bool quiet,
bool fluentd,
const ix::SocketTLSOptions& tlsOptions) const ix::SocketTLSOptions& tlsOptions)
{ {
ix::CobraConnection conn; ix::CobraConnection conn;
@ -51,7 +70,7 @@ namespace ix
std::thread t(timer); std::thread t(timer);
conn.setEventCallback( conn.setEventCallback(
[&conn, &channel, &jsonWriter, &filter, &msgCount, &msgPerSeconds, &quiet]( [&conn, &channel, &jsonWriter, &filter, &msgCount, &msgPerSeconds, &quiet, &fluentd](
ix::CobraConnectionEventType eventType, ix::CobraConnectionEventType eventType,
const std::string& errMsg, const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers, const ix::WebSocketHttpHeaders& headers,
@ -69,13 +88,13 @@ namespace ix
else if (eventType == ix::CobraConnection_EventType_Authenticated) else if (eventType == ix::CobraConnection_EventType_Authenticated)
{ {
spdlog::info("Subscriber authenticated"); spdlog::info("Subscriber authenticated");
conn.subscribe( conn.subscribe(channel,
channel,
filter, filter,
[&jsonWriter, &quiet, &msgPerSeconds, &msgCount](const Json::Value& msg) { [&jsonWriter, &quiet, &msgPerSeconds, &msgCount, &fluentd](
const Json::Value& msg) {
if (!quiet) if (!quiet)
{ {
spdlog::info(jsonWriter.write(msg)); writeToStdout(fluentd, jsonWriter, msg);
} }
msgPerSeconds++; msgPerSeconds++;

View File

@ -30,8 +30,14 @@ namespace ix
void start(); void start();
void stop(); void stop();
int getSentBytes() { return _sentBytes; } int getSentBytes()
int getReceivedBytes() { return _receivedBytes; } {
return _sentBytes;
}
int getReceivedBytes()
{
return _receivedBytes;
}
void sendMessage(const std::string& text); void sendMessage(const std::string& text);
@ -76,9 +82,7 @@ namespace ix
_webSocket.addSubProtocol(subprotocol); _webSocket.addSubProtocol(subprotocol);
} }
WebSocket::setTrafficTrackerCallback( WebSocket::setTrafficTrackerCallback([this](int size, bool incoming) {
[this](int size, bool incoming)
{
if (incoming) if (incoming)
{ {
_receivedBytes += size; _receivedBytes += size;
@ -87,8 +91,7 @@ namespace ix
{ {
_sentBytes += size; _sentBytes += size;
} }
} });
);
} }
void WebSocketConnect::log(const std::string& msg) void WebSocketConnect::log(const std::string& msg)