cobra subscriber in fluentd mode insert a created_at timestamp entry
This commit is contained in:
		@@ -1,6 +1,10 @@
 | 
				
			|||||||
# Changelog
 | 
					# Changelog
 | 
				
			||||||
All changes to this project will be documented in this file.
 | 
					All changes to this project will be documented in this file.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					## [9.2.6] - 2020-04-14
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					(ixcobra) snake server / handle invalid incoming json messages + cobra subscriber in fluentd mode insert a created_at timestamp entry
 | 
				
			||||||
 | 
					
 | 
				
			||||||
## [9.2.5] - 2020-04-13
 | 
					## [9.2.5] - 2020-04-13
 | 
				
			||||||
 | 
					
 | 
				
			||||||
(websocket) WebSocketMessagePtr is a unique_ptr instead of a shared_ptr
 | 
					(websocket) WebSocketMessagePtr is a unique_ptr instead of a shared_ptr
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -515,8 +515,10 @@ namespace ix
 | 
				
			|||||||
                //
 | 
					                //
 | 
				
			||||||
                if (ws.fin && _chunks.empty())
 | 
					                if (ws.fin && _chunks.empty())
 | 
				
			||||||
                {
 | 
					                {
 | 
				
			||||||
                    emitMessage(
 | 
					                    emitMessage(_fragmentedMessageKind,
 | 
				
			||||||
                        _fragmentedMessageKind, frameData, _receivedMessageCompressed, onMessageCallback);
 | 
					                                frameData,
 | 
				
			||||||
 | 
					                                _receivedMessageCompressed,
 | 
				
			||||||
 | 
					                                onMessageCallback);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    _receivedMessageCompressed = false;
 | 
					                    _receivedMessageCompressed = false;
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -6,4 +6,4 @@
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
#pragma once
 | 
					#pragma once
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#define IX_WEBSOCKET_VERSION "9.2.5"
 | 
					#define IX_WEBSOCKET_VERSION "9.2.6"
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -6,8 +6,8 @@
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
#include "IXTest.h"
 | 
					#include "IXTest.h"
 | 
				
			||||||
#include "catch.hpp"
 | 
					#include "catch.hpp"
 | 
				
			||||||
#include <iostream>
 | 
					 | 
				
			||||||
#include "msgpack11.hpp"
 | 
					#include "msgpack11.hpp"
 | 
				
			||||||
 | 
					#include <iostream>
 | 
				
			||||||
#include <ixwebsocket/IXSocket.h>
 | 
					#include <ixwebsocket/IXSocket.h>
 | 
				
			||||||
#include <ixwebsocket/IXSocketFactory.h>
 | 
					#include <ixwebsocket/IXSocketFactory.h>
 | 
				
			||||||
#include <ixwebsocket/IXWebSocket.h>
 | 
					#include <ixwebsocket/IXWebSocket.h>
 | 
				
			||||||
@@ -130,7 +130,8 @@ namespace
 | 
				
			|||||||
            }
 | 
					            }
 | 
				
			||||||
            else if (msg->type == ix::WebSocketMessageType::Error)
 | 
					            else if (msg->type == ix::WebSocketMessageType::Error)
 | 
				
			||||||
            {
 | 
					            {
 | 
				
			||||||
                ss << "websocket_broadcast_client: " << _user << " Error ! " << msg->errorInfo.reason;
 | 
					                ss << "websocket_broadcast_client: " << _user << " Error ! "
 | 
				
			||||||
 | 
					                   << msg->errorInfo.reason;
 | 
				
			||||||
                log(ss.str());
 | 
					                log(ss.str());
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
            else if (msg->type == ix::WebSocketMessageType::Ping)
 | 
					            else if (msg->type == ix::WebSocketMessageType::Ping)
 | 
				
			||||||
@@ -234,7 +235,7 @@ namespace
 | 
				
			|||||||
        server.start();
 | 
					        server.start();
 | 
				
			||||||
        return true;
 | 
					        return true;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
} // namespace ix
 | 
					} // namespace
 | 
				
			||||||
 | 
					
 | 
				
			||||||
TEST_CASE("Websocket_broadcast_server", "[websocket_server]")
 | 
					TEST_CASE("Websocket_broadcast_server", "[websocket_server]")
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
@@ -247,7 +248,7 @@ TEST_CASE("Websocket_broadcast_server", "[websocket_server]")
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
        std::string session = ix::generateSessionId();
 | 
					        std::string session = ix::generateSessionId();
 | 
				
			||||||
        std::vector<std::shared_ptr<WebSocketChat>> chatClients;
 | 
					        std::vector<std::shared_ptr<WebSocketChat>> chatClients;
 | 
				
			||||||
        for (int i = 0 ; i < 10; ++i)
 | 
					        for (int i = 0; i < 10; ++i)
 | 
				
			||||||
        {
 | 
					        {
 | 
				
			||||||
            std::string user("user_" + std::to_string(i));
 | 
					            std::string user("user_" + std::to_string(i));
 | 
				
			||||||
            chatClients.push_back(std::make_shared<WebSocketChat>(user, session, port));
 | 
					            chatClients.push_back(std::make_shared<WebSocketChat>(user, session, port));
 | 
				
			||||||
@@ -259,7 +260,7 @@ TEST_CASE("Websocket_broadcast_server", "[websocket_server]")
 | 
				
			|||||||
        while (true)
 | 
					        while (true)
 | 
				
			||||||
        {
 | 
					        {
 | 
				
			||||||
            bool allReady = true;
 | 
					            bool allReady = true;
 | 
				
			||||||
            for (size_t i = 0 ; i < chatClients.size(); ++i)
 | 
					            for (size_t i = 0; i < chatClients.size(); ++i)
 | 
				
			||||||
            {
 | 
					            {
 | 
				
			||||||
                allReady &= chatClients[i]->isReady();
 | 
					                allReady &= chatClients[i]->isReady();
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
@@ -269,7 +270,7 @@ TEST_CASE("Websocket_broadcast_server", "[websocket_server]")
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
        for (int j = 0; j < 1000; j++)
 | 
					        for (int j = 0; j < 1000; j++)
 | 
				
			||||||
        {
 | 
					        {
 | 
				
			||||||
            for (size_t i = 0 ; i < chatClients.size(); ++i)
 | 
					            for (size_t i = 0; i < chatClients.size(); ++i)
 | 
				
			||||||
            {
 | 
					            {
 | 
				
			||||||
                chatClients[i]->sendMessage("hello world");
 | 
					                chatClients[i]->sendMessage("hello world");
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
@@ -291,7 +292,7 @@ TEST_CASE("Websocket_broadcast_server", "[websocket_server]")
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
        // Stop all clients
 | 
					        // Stop all clients
 | 
				
			||||||
        size_t messageCount = chatClients.size() * 50;
 | 
					        size_t messageCount = chatClients.size() * 50;
 | 
				
			||||||
        for (size_t i = 0 ; i < chatClients.size(); ++i)
 | 
					        for (size_t i = 0; i < chatClients.size(); ++i)
 | 
				
			||||||
        {
 | 
					        {
 | 
				
			||||||
            REQUIRE(chatClients[i]->getReceivedMessagesCount() >= messageCount);
 | 
					            REQUIRE(chatClients[i]->getReceivedMessagesCount() >= messageCount);
 | 
				
			||||||
            chatClients[i]->stop();
 | 
					            chatClients[i]->stop();
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										11
									
								
								ws/ws.cpp
									
									
									
									
									
								
							
							
						
						
									
										11
									
								
								ws/ws.cpp
									
									
									
									
									
								
							@@ -268,8 +268,10 @@ int main(int argc, char** argv)
 | 
				
			|||||||
    cobra2statsd->add_option("--port", statsdPort, "Statsd port");
 | 
					    cobra2statsd->add_option("--port", statsdPort, "Statsd port");
 | 
				
			||||||
    cobra2statsd->add_option("--prefix", prefix, "Statsd prefix");
 | 
					    cobra2statsd->add_option("--prefix", prefix, "Statsd prefix");
 | 
				
			||||||
    cobra2statsd->add_option("--fields", fields, "Extract fields for naming the event")->join();
 | 
					    cobra2statsd->add_option("--fields", fields, "Extract fields for naming the event")->join();
 | 
				
			||||||
    cobra2statsd->add_option("--gauge", gauge, "Value to extract, and use as a statsd gauge")->join();
 | 
					    cobra2statsd->add_option("--gauge", gauge, "Value to extract, and use as a statsd gauge")
 | 
				
			||||||
    cobra2statsd->add_option("--timer", timer, "Value to extract, and use as a statsd timer")->join();
 | 
					        ->join();
 | 
				
			||||||
 | 
					    cobra2statsd->add_option("--timer", timer, "Value to extract, and use as a statsd timer")
 | 
				
			||||||
 | 
					        ->join();
 | 
				
			||||||
    cobra2statsd->add_option("channel", channel, "Channel")->required();
 | 
					    cobra2statsd->add_option("channel", channel, "Channel")->required();
 | 
				
			||||||
    cobra2statsd->add_flag("-v", verbose, "Verbose");
 | 
					    cobra2statsd->add_flag("-v", verbose, "Verbose");
 | 
				
			||||||
    cobra2statsd->add_option("--pidfile", pidfile, "Pid file");
 | 
					    cobra2statsd->add_option("--pidfile", pidfile, "Pid file");
 | 
				
			||||||
@@ -449,7 +451,8 @@ int main(int argc, char** argv)
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
    else if (app.got_subcommand("cobra_subscribe"))
 | 
					    else if (app.got_subcommand("cobra_subscribe"))
 | 
				
			||||||
    {
 | 
					    {
 | 
				
			||||||
        ret = ix::ws_cobra_subscribe_main(cobraConfig, channel, filter, position, quiet, fluentd, runtime);
 | 
					        ret = ix::ws_cobra_subscribe_main(
 | 
				
			||||||
 | 
					            cobraConfig, channel, filter, position, quiet, fluentd, runtime);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    else if (app.got_subcommand("cobra_publish"))
 | 
					    else if (app.got_subcommand("cobra_publish"))
 | 
				
			||||||
    {
 | 
					    {
 | 
				
			||||||
@@ -463,7 +466,7 @@ int main(int argc, char** argv)
 | 
				
			|||||||
    {
 | 
					    {
 | 
				
			||||||
        if (!timer.empty() && !gauge.empty())
 | 
					        if (!timer.empty() && !gauge.empty())
 | 
				
			||||||
        {
 | 
					        {
 | 
				
			||||||
            spdlog::error("--gauge and --timer options are exclusive. " \
 | 
					            spdlog::error("--gauge and --timer options are exclusive. "
 | 
				
			||||||
                          "you can only supply one");
 | 
					                          "you can only supply one");
 | 
				
			||||||
            ret = 1;
 | 
					            ret = 1;
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -25,6 +25,17 @@ namespace ix
 | 
				
			|||||||
        return jsonWriter;
 | 
					        return jsonWriter;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    std::string timeSinceEpoch()
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        std::chrono::system_clock::time_point tp = std::chrono::system_clock::now();
 | 
				
			||||||
 | 
					        std::chrono::system_clock::duration dtn = tp.time_since_epoch();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        std::stringstream ss;
 | 
				
			||||||
 | 
					        ss << dtn.count() * std::chrono::system_clock::period::num /
 | 
				
			||||||
 | 
					                  std::chrono::system_clock::period::den;
 | 
				
			||||||
 | 
					        return ss.str();
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    void writeToStdout(bool fluentd,
 | 
					    void writeToStdout(bool fluentd,
 | 
				
			||||||
                       const StreamWriterPtr& jsonWriter,
 | 
					                       const StreamWriterPtr& jsonWriter,
 | 
				
			||||||
                       const Json::Value& msg,
 | 
					                       const Json::Value& msg,
 | 
				
			||||||
@@ -36,12 +47,13 @@ namespace ix
 | 
				
			|||||||
            enveloppe["producer"] = "cobra";
 | 
					            enveloppe["producer"] = "cobra";
 | 
				
			||||||
            enveloppe["consumer"] = "fluentd";
 | 
					            enveloppe["consumer"] = "fluentd";
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            Json::Value msgWithPosition(msg);
 | 
					            Json::Value nestedMessage(msg);
 | 
				
			||||||
            msgWithPosition["position"] = position;
 | 
					            nestedMessage["position"] = position;
 | 
				
			||||||
            enveloppe["message"] = msgWithPosition;
 | 
					            nestedMessage["created_at"] = timeSinceEpoch();
 | 
				
			||||||
 | 
					            enveloppe["message"] = nestedMessage;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            jsonWriter->write(enveloppe, &std::cout);
 | 
					            jsonWriter->write(enveloppe, &std::cout);
 | 
				
			||||||
            std::cout << std::endl;  // add lf and flush
 | 
					            std::cout << std::endl; // add lf and flush
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        else
 | 
					        else
 | 
				
			||||||
        {
 | 
					        {
 | 
				
			||||||
@@ -184,7 +196,7 @@ namespace ix
 | 
				
			|||||||
        // Run for a duration, used by unittesting now
 | 
					        // Run for a duration, used by unittesting now
 | 
				
			||||||
        else
 | 
					        else
 | 
				
			||||||
        {
 | 
					        {
 | 
				
			||||||
            for (int i = 0 ; i < runtime; ++i)
 | 
					            for (int i = 0; i < runtime; ++i)
 | 
				
			||||||
            {
 | 
					            {
 | 
				
			||||||
                auto duration = std::chrono::seconds(1);
 | 
					                auto duration = std::chrono::seconds(1);
 | 
				
			||||||
                std::this_thread::sleep_for(duration);
 | 
					                std::this_thread::sleep_for(duration);
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user