(redis cobra bots) update the cobra to redis bot to use the bot framework, and change it to report fps metrics into redis streams.
This commit is contained in:
		@@ -251,6 +251,7 @@ if (USE_WS OR USE_TEST)
 | 
			
		||||
  add_subdirectory(ixcore)
 | 
			
		||||
  add_subdirectory(ixcrypto)
 | 
			
		||||
  add_subdirectory(ixcobra)
 | 
			
		||||
  add_subdirectory(ixredis)
 | 
			
		||||
  add_subdirectory(ixsnake)
 | 
			
		||||
  add_subdirectory(ixsentry)
 | 
			
		||||
  add_subdirectory(ixbots)
 | 
			
		||||
 
 | 
			
		||||
@@ -1,6 +1,10 @@
 | 
			
		||||
# Changelog
 | 
			
		||||
All changes to this project will be documented in this file.
 | 
			
		||||
 | 
			
		||||
## [9.6.9] - 2020-06-10
 | 
			
		||||
 | 
			
		||||
(redis cobra bots) update the cobra to redis bot to use the bot framework, and change it to report fps metrics into redis streams.
 | 
			
		||||
 | 
			
		||||
## [9.6.6] - 2020-06-04
 | 
			
		||||
 | 
			
		||||
(statsd cobra bots) statsd improvement: prefix does not need a dot as a suffix, message size can be larger than 256 bytes, error handling was invalid, use core logger for logging instead of std::cerr
 | 
			
		||||
 
 | 
			
		||||
@@ -9,6 +9,7 @@ set (IXBOTS_SOURCES
 | 
			
		||||
    ixbots/IXCobraToStatsdBot.cpp
 | 
			
		||||
    ixbots/IXCobraToStdoutBot.cpp
 | 
			
		||||
    ixbots/IXCobraMetricsToStatsdBot.cpp
 | 
			
		||||
    ixbots/IXCobraMetricsToRedisBot.cpp
 | 
			
		||||
    ixbots/IXStatsdClient.cpp
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
@@ -19,6 +20,7 @@ set (IXBOTS_HEADERS
 | 
			
		||||
    ixbots/IXCobraToStatsdBot.h
 | 
			
		||||
    ixbots/IXCobraToStdoutBot.h
 | 
			
		||||
    ixbots/IXCobraMetricsToStatsdBot.h
 | 
			
		||||
    ixbots/IXCobraMetricsToRedisBot.h
 | 
			
		||||
    ixbots/IXStatsdClient.h
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
@@ -38,6 +40,7 @@ set(IXBOTS_INCLUDE_DIRS
 | 
			
		||||
    ../ixcore
 | 
			
		||||
    ../ixwebsocket
 | 
			
		||||
    ../ixcobra
 | 
			
		||||
    ../ixredis
 | 
			
		||||
    ../ixsentry
 | 
			
		||||
    ${JSONCPP_INCLUDE_DIRS}
 | 
			
		||||
    ${SPDLOG_INCLUDE_DIRS})
 | 
			
		||||
 
 | 
			
		||||
@@ -292,4 +292,22 @@ namespace ix
 | 
			
		||||
    {
 | 
			
		||||
        _onBotMessageCallback = callback;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    std::string CobraBot::getDeviceIdentifier(const Json::Value& msg)
 | 
			
		||||
    {
 | 
			
		||||
        std::string deviceId("na");
 | 
			
		||||
 | 
			
		||||
        auto osName = msg["device"]["os_name"];
 | 
			
		||||
        if (osName == "Android")
 | 
			
		||||
        {
 | 
			
		||||
            deviceId = msg["device"]["model"].asString();
 | 
			
		||||
        }
 | 
			
		||||
        else if (osName == "iOS")
 | 
			
		||||
        {
 | 
			
		||||
            deviceId = msg["device"]["hardware_model"].asString();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        return deviceId;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
} // namespace ix
 | 
			
		||||
 
 | 
			
		||||
@@ -28,6 +28,8 @@ namespace ix
 | 
			
		||||
        int64_t run(const CobraBotConfig& botConfig);
 | 
			
		||||
        void setOnBotMessageCallback(const OnBotMessageCallback& callback);
 | 
			
		||||
 | 
			
		||||
        std::string getDeviceIdentifier(const Json::Value& msg);
 | 
			
		||||
 | 
			
		||||
    private:
 | 
			
		||||
        OnBotMessageCallback _onBotMessageCallback;
 | 
			
		||||
    };
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										118
									
								
								ixbots/ixbots/IXCobraMetricsToRedisBot.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										118
									
								
								ixbots/ixbots/IXCobraMetricsToRedisBot.cpp
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,118 @@
 | 
			
		||||
/*
 | 
			
		||||
 *  IXCobraMetricsToRedisBot.cpp
 | 
			
		||||
 *  Author: Benjamin Sergeant
 | 
			
		||||
 *  Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
#include "IXCobraMetricsToRedisBot.h"
 | 
			
		||||
 | 
			
		||||
#include "IXCobraBot.h"
 | 
			
		||||
#include "IXStatsdClient.h"
 | 
			
		||||
#include <chrono>
 | 
			
		||||
#include <ixcobra/IXCobraConnection.h>
 | 
			
		||||
#include <ixcore/utils/IXCoreLogger.h>
 | 
			
		||||
#include <sstream>
 | 
			
		||||
#include <vector>
 | 
			
		||||
#include <algorithm>
 | 
			
		||||
#include <map>
 | 
			
		||||
#include <cctype>
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
namespace
 | 
			
		||||
{
 | 
			
		||||
    std::string removeSpaces(const std::string& str)
 | 
			
		||||
    {
 | 
			
		||||
        std::string out(str);
 | 
			
		||||
        out.erase(
 | 
			
		||||
            std::remove_if(out.begin(), out.end(), [](unsigned char x) { return std::isspace(x); }),
 | 
			
		||||
            out.end());
 | 
			
		||||
 | 
			
		||||
        return out;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
namespace ix
 | 
			
		||||
{
 | 
			
		||||
    bool processPerfMetricsEventSlowFrames(const Json::Value& msg,
 | 
			
		||||
                                           RedisClient& redisClient,
 | 
			
		||||
                                           const std::string& deviceId)
 | 
			
		||||
    {
 | 
			
		||||
        auto frameRateHistogramCounts = msg["data"]["FrameRateHistogramCounts"];
 | 
			
		||||
 | 
			
		||||
        int slowFrames = 0;
 | 
			
		||||
        slowFrames += frameRateHistogramCounts[4].asInt();
 | 
			
		||||
        slowFrames += frameRateHistogramCounts[5].asInt();
 | 
			
		||||
        slowFrames += frameRateHistogramCounts[6].asInt();
 | 
			
		||||
        slowFrames += frameRateHistogramCounts[7].asInt();
 | 
			
		||||
 | 
			
		||||
        std::stringstream ss;
 | 
			
		||||
        ss << msg["id"].asString() << "_slow_frames" << "."
 | 
			
		||||
           << msg["device"]["game"].asString() << "."
 | 
			
		||||
           << msg["device"]["os_name"].asString() << "."
 | 
			
		||||
           << removeSpaces(msg["data"]["Tag"].asString());
 | 
			
		||||
 | 
			
		||||
        std::string id = ss.str();
 | 
			
		||||
        std::string errMsg;
 | 
			
		||||
        if (redisClient.xadd(id, std::to_string(slowFrames), errMsg).empty())
 | 
			
		||||
        {
 | 
			
		||||
            CoreLogger::info(std::string("redis xadd error: ") + errMsg);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (deviceId == "N841AP" || deviceId == "SM-N960U")
 | 
			
		||||
        {
 | 
			
		||||
            ss.str(""); // reset the stringstream
 | 
			
		||||
            ss << msg["id"].asString() << "_slow_frames_by_device" << "."
 | 
			
		||||
               << deviceId << "."
 | 
			
		||||
               << msg["device"]["game"].asString() << "."
 | 
			
		||||
               << msg["device"]["os_name"].asString() << "."
 | 
			
		||||
               << removeSpaces(msg["data"]["Tag"].asString());
 | 
			
		||||
 | 
			
		||||
            std::string id = ss.str();
 | 
			
		||||
            if (redisClient.xadd(id, std::to_string(slowFrames), errMsg).empty())
 | 
			
		||||
            {
 | 
			
		||||
                CoreLogger::info(std::string("redis xadd error: ") + errMsg);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        return true;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    int64_t cobra_metrics_to_redis_bot(const ix::CobraBotConfig& config,
 | 
			
		||||
                                       RedisClient& redisClient,
 | 
			
		||||
                                       bool verbose)
 | 
			
		||||
    {
 | 
			
		||||
        CobraBot bot;
 | 
			
		||||
 | 
			
		||||
        bot.setOnBotMessageCallback(
 | 
			
		||||
            [&redisClient, &verbose, &bot]
 | 
			
		||||
             (const Json::Value& msg,
 | 
			
		||||
              const std::string& /*position*/,
 | 
			
		||||
              std::atomic<bool>& /*throttled*/,
 | 
			
		||||
              std::atomic<bool>& /*fatalCobraError*/,
 | 
			
		||||
              std::atomic<uint64_t>& sentCount) -> void {
 | 
			
		||||
            if (msg["device"].isNull() || msg["id"].isNull())
 | 
			
		||||
            {
 | 
			
		||||
                CoreLogger::info("no device or id entry, skipping event");
 | 
			
		||||
                return;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            //
 | 
			
		||||
            // Display full message with
 | 
			
		||||
            if (verbose)
 | 
			
		||||
            {
 | 
			
		||||
                CoreLogger::info(msg.toStyledString());
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            bool success = false;
 | 
			
		||||
            if (msg["id"].asString() == "engine_performance_metrics_id")
 | 
			
		||||
            {
 | 
			
		||||
                auto deviceId = bot.getDeviceIdentifier(msg);
 | 
			
		||||
                success = processPerfMetricsEventSlowFrames(msg, redisClient, deviceId);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            if (success) sentCount++;
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        return bot.run(config);
 | 
			
		||||
    }
 | 
			
		||||
} // namespace ix
 | 
			
		||||
							
								
								
									
										20
									
								
								ixbots/ixbots/IXCobraMetricsToRedisBot.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										20
									
								
								ixbots/ixbots/IXCobraMetricsToRedisBot.h
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,20 @@
 | 
			
		||||
/*
 | 
			
		||||
 *  IXCobraMetricsToRedisBot.h
 | 
			
		||||
 *  Author: Benjamin Sergeant
 | 
			
		||||
 *  Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
 | 
			
		||||
 */
 | 
			
		||||
#pragma once
 | 
			
		||||
 | 
			
		||||
#include <cstdint>
 | 
			
		||||
#include <ixredis/IXRedisClient.h>
 | 
			
		||||
#include "IXCobraBotConfig.h"
 | 
			
		||||
#include <stddef.h>
 | 
			
		||||
#include <string>
 | 
			
		||||
 | 
			
		||||
namespace ix
 | 
			
		||||
{
 | 
			
		||||
    int64_t cobra_metrics_to_redis_bot(const ix::CobraBotConfig& config,
 | 
			
		||||
                                       RedisClient& redisClient,
 | 
			
		||||
                                       bool verbose);
 | 
			
		||||
} // namespace ix
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										27
									
								
								ixredis/CMakeLists.txt
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										27
									
								
								ixredis/CMakeLists.txt
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,27 @@
 | 
			
		||||
#
 | 
			
		||||
# Author: Benjamin Sergeant
 | 
			
		||||
# Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
 | 
			
		||||
#
 | 
			
		||||
 | 
			
		||||
set (IXREDIS_SOURCES
 | 
			
		||||
    ixredis/IXRedisClient.cpp
 | 
			
		||||
    ixredis/IXRedisServer.cpp
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
set (IXREDIS_HEADERS
 | 
			
		||||
    ixredis/IXRedisClient.h
 | 
			
		||||
    ixredis/IXRedisServer.h
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
add_library(ixredis STATIC
 | 
			
		||||
    ${IXREDIS_SOURCES}
 | 
			
		||||
    ${IXREDIS_HEADERS}
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
set(IXREDIS_INCLUDE_DIRS
 | 
			
		||||
    .
 | 
			
		||||
    ..
 | 
			
		||||
    ../ixcore
 | 
			
		||||
    ../ixwebsocket)
 | 
			
		||||
 | 
			
		||||
target_include_directories( ixredis PUBLIC ${IXREDIS_INCLUDE_DIRS} )
 | 
			
		||||
@@ -9,6 +9,7 @@
 | 
			
		||||
#include <cstring>
 | 
			
		||||
#include <iomanip>
 | 
			
		||||
#include <iostream>
 | 
			
		||||
#include <ixwebsocket/IXSocket.h>
 | 
			
		||||
#include <ixwebsocket/IXSocketFactory.h>
 | 
			
		||||
#include <ixwebsocket/IXSocketTLSOptions.h>
 | 
			
		||||
#include <sstream>
 | 
			
		||||
@@ -8,9 +8,9 @@
 | 
			
		||||
 | 
			
		||||
#include <atomic>
 | 
			
		||||
#include <functional>
 | 
			
		||||
#include <ixwebsocket/IXSocket.h>
 | 
			
		||||
#include <memory>
 | 
			
		||||
#include <string>
 | 
			
		||||
#include <ixwebsocket/IXSocket.h>
 | 
			
		||||
 | 
			
		||||
namespace ix
 | 
			
		||||
{
 | 
			
		||||
@@ -6,8 +6,8 @@
 | 
			
		||||
 | 
			
		||||
#pragma once
 | 
			
		||||
 | 
			
		||||
#include "IXSocket.h"
 | 
			
		||||
#include "IXSocketServer.h"
 | 
			
		||||
#include <ixwebsocket/IXSocket.h>
 | 
			
		||||
#include <ixwebsocket/IXSocketServer.h>
 | 
			
		||||
#include <functional>
 | 
			
		||||
#include <map>
 | 
			
		||||
#include <memory>
 | 
			
		||||
@@ -7,16 +7,12 @@ set (IXSNAKE_SOURCES
 | 
			
		||||
    ixsnake/IXSnakeServer.cpp
 | 
			
		||||
    ixsnake/IXSnakeProtocol.cpp
 | 
			
		||||
    ixsnake/IXAppConfig.cpp
 | 
			
		||||
    ixsnake/IXRedisClient.cpp
 | 
			
		||||
    ixsnake/IXRedisServer.cpp
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
set (IXSNAKE_HEADERS
 | 
			
		||||
    ixsnake/IXSnakeServer.h
 | 
			
		||||
    ixsnake/IXSnakeProtocol.h
 | 
			
		||||
    ixsnake/IXAppConfig.h
 | 
			
		||||
    ixsnake/IXRedisClient.h
 | 
			
		||||
    ixsnake/IXRedisServer.h
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
add_library(ixsnake STATIC
 | 
			
		||||
@@ -30,6 +26,7 @@ set(IXSNAKE_INCLUDE_DIRS
 | 
			
		||||
    ../ixcore
 | 
			
		||||
    ../ixcrypto
 | 
			
		||||
    ../ixwebsocket
 | 
			
		||||
    ../ixredis
 | 
			
		||||
    ../third_party)
 | 
			
		||||
 | 
			
		||||
target_include_directories( ixsnake PUBLIC ${IXSNAKE_INCLUDE_DIRS} )
 | 
			
		||||
 
 | 
			
		||||
@@ -6,7 +6,7 @@
 | 
			
		||||
 | 
			
		||||
#pragma once
 | 
			
		||||
 | 
			
		||||
#include "IXRedisClient.h"
 | 
			
		||||
#include <ixredis/IXRedisClient.h>
 | 
			
		||||
#include <future>
 | 
			
		||||
#include <ixwebsocket/IXConnectionState.h>
 | 
			
		||||
#include <string>
 | 
			
		||||
 
 | 
			
		||||
@@ -6,4 +6,4 @@
 | 
			
		||||
 | 
			
		||||
#pragma once
 | 
			
		||||
 | 
			
		||||
#define IX_WEBSOCKET_VERSION "9.6.8"
 | 
			
		||||
#define IX_WEBSOCKET_VERSION "9.6.9"
 | 
			
		||||
 
 | 
			
		||||
@@ -96,6 +96,7 @@ target_link_libraries(ixwebsocket_unittest ixbots)
 | 
			
		||||
target_link_libraries(ixwebsocket_unittest ixsnake)
 | 
			
		||||
target_link_libraries(ixwebsocket_unittest ixcobra)
 | 
			
		||||
target_link_libraries(ixwebsocket_unittest ixsentry)
 | 
			
		||||
target_link_libraries(ixwebsocket_unittest ixredis)
 | 
			
		||||
target_link_libraries(ixwebsocket_unittest ixwebsocket)
 | 
			
		||||
target_link_libraries(ixwebsocket_unittest ixcrypto)
 | 
			
		||||
target_link_libraries(ixwebsocket_unittest ixcore)
 | 
			
		||||
 
 | 
			
		||||
@@ -10,7 +10,7 @@
 | 
			
		||||
#include <iostream>
 | 
			
		||||
#include <ixcobra/IXCobraConnection.h>
 | 
			
		||||
#include <ixcrypto/IXUuid.h>
 | 
			
		||||
#include <ixsnake/IXRedisServer.h>
 | 
			
		||||
#include <ixredis/IXRedisServer.h>
 | 
			
		||||
#include <ixsnake/IXSnakeServer.h>
 | 
			
		||||
 | 
			
		||||
using namespace ix;
 | 
			
		||||
 
 | 
			
		||||
@@ -8,7 +8,7 @@
 | 
			
		||||
#include <iostream>
 | 
			
		||||
#include <ixcobra/IXCobraMetricsPublisher.h>
 | 
			
		||||
#include <ixcrypto/IXUuid.h>
 | 
			
		||||
#include <ixsnake/IXRedisServer.h>
 | 
			
		||||
#include <ixredis/IXRedisServer.h>
 | 
			
		||||
#include <ixsnake/IXSnakeServer.h>
 | 
			
		||||
#include <set>
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -13,7 +13,7 @@
 | 
			
		||||
#include <ixcobra/IXCobraMetricsPublisher.h>
 | 
			
		||||
#include <ixcrypto/IXUuid.h>
 | 
			
		||||
#include <ixsentry/IXSentryClient.h>
 | 
			
		||||
#include <ixsnake/IXRedisServer.h>
 | 
			
		||||
#include <ixredis/IXRedisServer.h>
 | 
			
		||||
#include <ixsnake/IXSnakeServer.h>
 | 
			
		||||
#include <ixwebsocket/IXHttpServer.h>
 | 
			
		||||
#include <ixwebsocket/IXUserAgent.h>
 | 
			
		||||
 
 | 
			
		||||
@@ -13,7 +13,7 @@
 | 
			
		||||
#include <ixcobra/IXCobraMetricsPublisher.h>
 | 
			
		||||
#include <ixcrypto/IXUuid.h>
 | 
			
		||||
#include <ixsentry/IXSentryClient.h>
 | 
			
		||||
#include <ixsnake/IXRedisServer.h>
 | 
			
		||||
#include <ixredis/IXRedisServer.h>
 | 
			
		||||
#include <ixsnake/IXSnakeServer.h>
 | 
			
		||||
#include <ixwebsocket/IXHttpServer.h>
 | 
			
		||||
#include <ixwebsocket/IXUserAgent.h>
 | 
			
		||||
 
 | 
			
		||||
@@ -13,7 +13,7 @@
 | 
			
		||||
#include <ixcobra/IXCobraMetricsPublisher.h>
 | 
			
		||||
#include <ixcrypto/IXUuid.h>
 | 
			
		||||
#include <ixsentry/IXSentryClient.h>
 | 
			
		||||
#include <ixsnake/IXRedisServer.h>
 | 
			
		||||
#include <ixredis/IXRedisServer.h>
 | 
			
		||||
#include <ixsnake/IXSnakeServer.h>
 | 
			
		||||
#include <ixwebsocket/IXHttpServer.h>
 | 
			
		||||
#include <ixwebsocket/IXUserAgent.h>
 | 
			
		||||
 
 | 
			
		||||
@@ -51,7 +51,6 @@ add_executable(ws
 | 
			
		||||
  ws_snake.cpp
 | 
			
		||||
  ws_cobra_metrics_publish.cpp
 | 
			
		||||
  ws_cobra_publish.cpp
 | 
			
		||||
  ws_cobra_metrics_to_redis.cpp
 | 
			
		||||
  ws_httpd.cpp
 | 
			
		||||
  ws_autobahn.cpp
 | 
			
		||||
  ws_proxy_server.cpp
 | 
			
		||||
@@ -64,6 +63,7 @@ target_link_libraries(ws ixbots)
 | 
			
		||||
target_link_libraries(ws ixsnake)
 | 
			
		||||
target_link_libraries(ws ixcobra)
 | 
			
		||||
target_link_libraries(ws ixsentry)
 | 
			
		||||
target_link_libraries(ws ixredis)
 | 
			
		||||
target_link_libraries(ws ixwebsocket)
 | 
			
		||||
target_link_libraries(ws ixcrypto)
 | 
			
		||||
target_link_libraries(ws ixcore)
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										21
									
								
								ws/ws.cpp
									
									
									
									
									
								
							
							
						
						
									
										21
									
								
								ws/ws.cpp
									
									
									
									
									
								
							@@ -15,6 +15,8 @@
 | 
			
		||||
#include <ixbots/IXCobraToStatsdBot.h>
 | 
			
		||||
#include <ixbots/IXCobraToStdoutBot.h>
 | 
			
		||||
#include <ixbots/IXCobraMetricsToStatsdBot.h>
 | 
			
		||||
#include <ixbots/IXCobraMetricsToRedisBot.h>
 | 
			
		||||
#include <ixredis/IXRedisClient.h>
 | 
			
		||||
#include <ixcore/utils/IXCoreLogger.h>
 | 
			
		||||
#include <ixsentry/IXSentryClient.h>
 | 
			
		||||
#include <ixwebsocket/IXNetSystem.h>
 | 
			
		||||
@@ -363,13 +365,10 @@ int main(int argc, char** argv)
 | 
			
		||||
    CLI::App* cobra2redisApp =
 | 
			
		||||
        app.add_subcommand("cobra_metrics_to_redis", "Cobra metrics to redis");
 | 
			
		||||
    cobra2redisApp->fallthrough();
 | 
			
		||||
    cobra2redisApp->add_option("channel", channel, "Channel")->required();
 | 
			
		||||
    cobra2redisApp->add_option("--pidfile", pidfile, "Pid file");
 | 
			
		||||
    cobra2redisApp->add_option("--filter", filter, "Stream SQL Filter");
 | 
			
		||||
    cobra2redisApp->add_option("--position", position, "Stream position");
 | 
			
		||||
    cobra2redisApp->add_option("--hostname", hostname, "Redis hostname");
 | 
			
		||||
    cobra2redisApp->add_option("--port", redisPort, "Redis port");
 | 
			
		||||
    cobra2redisApp->add_flag("-q", quiet, "Quiet / only display stats");
 | 
			
		||||
    cobra2redisApp->add_flag("-v", verbose, "Verbose");
 | 
			
		||||
    addTLSOptions(cobra2redisApp);
 | 
			
		||||
    addCobraConfig(cobra2redisApp);
 | 
			
		||||
 | 
			
		||||
@@ -605,8 +604,18 @@ int main(int argc, char** argv)
 | 
			
		||||
    }
 | 
			
		||||
    else if (app.got_subcommand("cobra_metrics_to_redis"))
 | 
			
		||||
    {
 | 
			
		||||
        ret = ix::ws_cobra_metrics_to_redis(
 | 
			
		||||
            cobraConfig, channel, filter, position, hostname, redisPort);
 | 
			
		||||
        ix::RedisClient redisClient;
 | 
			
		||||
        if (!redisClient.connect(redisHosts, redisPort))
 | 
			
		||||
        {
 | 
			
		||||
            spdlog::error("Cannot connect to redis host {}:{}",
 | 
			
		||||
                          redisHosts, redisPort);
 | 
			
		||||
            return 1;
 | 
			
		||||
        }
 | 
			
		||||
        else
 | 
			
		||||
        {
 | 
			
		||||
            ret = (int) ix::cobra_metrics_to_redis_bot(
 | 
			
		||||
                cobraBotConfig, redisClient, verbose);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
    else if (app.got_subcommand("snake"))
 | 
			
		||||
    {
 | 
			
		||||
 
 | 
			
		||||
@@ -1,166 +0,0 @@
 | 
			
		||||
/*
 | 
			
		||||
 *  ws_cobra_metrics_to_redis.cpp
 | 
			
		||||
 *  Author: Benjamin Sergeant
 | 
			
		||||
 *  Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
#include <atomic>
 | 
			
		||||
#include <chrono>
 | 
			
		||||
#include <condition_variable>
 | 
			
		||||
#include <ixcobra/IXCobraConnection.h>
 | 
			
		||||
#include <ixsnake/IXRedisClient.h>
 | 
			
		||||
#include <mutex>
 | 
			
		||||
#include <queue>
 | 
			
		||||
#include <spdlog/spdlog.h>
 | 
			
		||||
#include <sstream>
 | 
			
		||||
#include <thread>
 | 
			
		||||
 | 
			
		||||
namespace ix
 | 
			
		||||
{
 | 
			
		||||
    int ws_cobra_metrics_to_redis(const ix::CobraConfig& config,
 | 
			
		||||
                                  const std::string& channel,
 | 
			
		||||
                                  const std::string& filter,
 | 
			
		||||
                                  const std::string& position,
 | 
			
		||||
                                  const std::string& host,
 | 
			
		||||
                                  int port)
 | 
			
		||||
    {
 | 
			
		||||
        ix::CobraConnection conn;
 | 
			
		||||
        conn.configure(config);
 | 
			
		||||
        conn.connect();
 | 
			
		||||
 | 
			
		||||
        // Display incoming messages
 | 
			
		||||
        std::atomic<int> msgPerSeconds(0);
 | 
			
		||||
        std::atomic<int> msgCount(0);
 | 
			
		||||
 | 
			
		||||
        auto timer = [&msgPerSeconds, &msgCount] {
 | 
			
		||||
            while (true)
 | 
			
		||||
            {
 | 
			
		||||
                spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds);
 | 
			
		||||
 | 
			
		||||
                msgPerSeconds = 0;
 | 
			
		||||
                auto duration = std::chrono::seconds(1);
 | 
			
		||||
                std::this_thread::sleep_for(duration);
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        ix::RedisClient redisClient;
 | 
			
		||||
        if (!redisClient.connect(host, port))
 | 
			
		||||
        {
 | 
			
		||||
            spdlog::error("Cannot connect to redis host {}:{}", host, port);
 | 
			
		||||
            return 0;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        std::mutex conditionVariableMutex;
 | 
			
		||||
        std::condition_variable condition;
 | 
			
		||||
        std::queue<Json::Value> queue;
 | 
			
		||||
 | 
			
		||||
        auto redisSender = [&condition, &queue, &conditionVariableMutex, &redisClient] {
 | 
			
		||||
            Json::FastWriter jsonWriter;
 | 
			
		||||
 | 
			
		||||
            int batchSize = 1000;
 | 
			
		||||
            int i = 0;
 | 
			
		||||
 | 
			
		||||
            std::stringstream pipe;
 | 
			
		||||
 | 
			
		||||
            while (true)
 | 
			
		||||
            {
 | 
			
		||||
                Json::Value msg;
 | 
			
		||||
 | 
			
		||||
                {
 | 
			
		||||
                    std::unique_lock<std::mutex> lock(conditionVariableMutex);
 | 
			
		||||
                    condition.wait(lock, [&queue] { return !queue.empty(); });
 | 
			
		||||
 | 
			
		||||
                    msg = queue.front();
 | 
			
		||||
                    queue.pop();
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                // compute channel
 | 
			
		||||
                std::stringstream ss;
 | 
			
		||||
                ss << "session=" << msg["session"].asString() << ";msgid=" << msg["id"].asString();
 | 
			
		||||
 | 
			
		||||
                std::string channel = ss.str();
 | 
			
		||||
 | 
			
		||||
                std::string errMsg;
 | 
			
		||||
                pipe << redisClient.prepareXaddCommand(channel, jsonWriter.write(msg));
 | 
			
		||||
 | 
			
		||||
                if (i++ == batchSize)
 | 
			
		||||
                {
 | 
			
		||||
                    if (!redisClient.sendCommand(pipe.str(), batchSize, errMsg))
 | 
			
		||||
                    {
 | 
			
		||||
                        spdlog::error("error sending command: {}", errMsg);
 | 
			
		||||
                    }
 | 
			
		||||
 | 
			
		||||
                    i = 0;
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        std::thread t1(timer);
 | 
			
		||||
        std::thread t2(redisSender);
 | 
			
		||||
 | 
			
		||||
        conn.setEventCallback([&conn,
 | 
			
		||||
                               &channel,
 | 
			
		||||
                               &filter,
 | 
			
		||||
                               &position,
 | 
			
		||||
                               &msgCount,
 | 
			
		||||
                               &msgPerSeconds,
 | 
			
		||||
                               &conditionVariableMutex,
 | 
			
		||||
                               &condition,
 | 
			
		||||
                               &queue](const CobraEventPtr& event) {
 | 
			
		||||
            if (event->type == ix::CobraEventType::Open)
 | 
			
		||||
            {
 | 
			
		||||
                spdlog::info("Subscriber connected");
 | 
			
		||||
 | 
			
		||||
                for (auto&& it : event->headers)
 | 
			
		||||
                {
 | 
			
		||||
                    spdlog::info("{}: {}", it.first, it.second);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            else if (event->type == ix::CobraEventType::Authenticated)
 | 
			
		||||
            {
 | 
			
		||||
                spdlog::info("Subscriber authenticated");
 | 
			
		||||
 | 
			
		||||
                conn.subscribe(
 | 
			
		||||
                    channel,
 | 
			
		||||
                    filter,
 | 
			
		||||
                    position,
 | 
			
		||||
                    [&msgPerSeconds, &msgCount, &conditionVariableMutex, &condition, &queue](
 | 
			
		||||
                        const Json::Value& msg, const std::string& /*position*/) {
 | 
			
		||||
                        {
 | 
			
		||||
                            std::unique_lock<std::mutex> lock(conditionVariableMutex);
 | 
			
		||||
                            queue.push(msg);
 | 
			
		||||
                        }
 | 
			
		||||
 | 
			
		||||
                        condition.notify_one();
 | 
			
		||||
 | 
			
		||||
                        msgPerSeconds++;
 | 
			
		||||
                        msgCount++;
 | 
			
		||||
                    });
 | 
			
		||||
            }
 | 
			
		||||
            else if (event->type == ix::CobraEventType::Subscribed)
 | 
			
		||||
            {
 | 
			
		||||
                spdlog::info("Subscriber: subscribed to channel {}", event->subscriptionId);
 | 
			
		||||
            }
 | 
			
		||||
            else if (event->type == ix::CobraEventType::UnSubscribed)
 | 
			
		||||
            {
 | 
			
		||||
                spdlog::info("Subscriber: unsubscribed from channel {}", event->subscriptionId);
 | 
			
		||||
            }
 | 
			
		||||
            else if (event->type == ix::CobraEventType::Error)
 | 
			
		||||
            {
 | 
			
		||||
                spdlog::error("Subscriber: error {}", event->errMsg);
 | 
			
		||||
            }
 | 
			
		||||
            else if (event->type == ix::CobraEventType::Published)
 | 
			
		||||
            {
 | 
			
		||||
                spdlog::error("Published message hacked: {}", event->msgId);
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        while (true)
 | 
			
		||||
        {
 | 
			
		||||
            std::chrono::duration<double, std::milli> duration(10);
 | 
			
		||||
            std::this_thread::sleep_for(duration);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        return 0;
 | 
			
		||||
    }
 | 
			
		||||
} // namespace ix
 | 
			
		||||
@@ -4,7 +4,7 @@
 | 
			
		||||
 *  Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
#include <ixsnake/IXRedisClient.h>
 | 
			
		||||
#include <ixredis/IXRedisClient.h>
 | 
			
		||||
#include <spdlog/spdlog.h>
 | 
			
		||||
#include <sstream>
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -4,7 +4,7 @@
 | 
			
		||||
 *  Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
#include <ixsnake/IXRedisServer.h>
 | 
			
		||||
#include <ixredis/IXRedisServer.h>
 | 
			
		||||
#include <spdlog/spdlog.h>
 | 
			
		||||
#include <sstream>
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -6,7 +6,7 @@
 | 
			
		||||
 | 
			
		||||
#include <atomic>
 | 
			
		||||
#include <chrono>
 | 
			
		||||
#include <ixsnake/IXRedisClient.h>
 | 
			
		||||
#include <ixredis/IXRedisClient.h>
 | 
			
		||||
#include <spdlog/spdlog.h>
 | 
			
		||||
#include <sstream>
 | 
			
		||||
#include <thread>
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user