(redis cobra bots) update the cobra to redis bot to use the bot framework, and change it to report fps metrics into redis streams.
This commit is contained in:
parent
ecfca1f905
commit
c5aadffa08
@ -251,6 +251,7 @@ if (USE_WS OR USE_TEST)
|
|||||||
add_subdirectory(ixcore)
|
add_subdirectory(ixcore)
|
||||||
add_subdirectory(ixcrypto)
|
add_subdirectory(ixcrypto)
|
||||||
add_subdirectory(ixcobra)
|
add_subdirectory(ixcobra)
|
||||||
|
add_subdirectory(ixredis)
|
||||||
add_subdirectory(ixsnake)
|
add_subdirectory(ixsnake)
|
||||||
add_subdirectory(ixsentry)
|
add_subdirectory(ixsentry)
|
||||||
add_subdirectory(ixbots)
|
add_subdirectory(ixbots)
|
||||||
|
@ -1,6 +1,10 @@
|
|||||||
# Changelog
|
# Changelog
|
||||||
All changes to this project will be documented in this file.
|
All changes to this project will be documented in this file.
|
||||||
|
|
||||||
|
## [9.6.9] - 2020-06-10
|
||||||
|
|
||||||
|
(redis cobra bots) update the cobra to redis bot to use the bot framework, and change it to report fps metrics into redis streams.
|
||||||
|
|
||||||
## [9.6.6] - 2020-06-04
|
## [9.6.6] - 2020-06-04
|
||||||
|
|
||||||
(statsd cobra bots) statsd improvement: prefix does not need a dot as a suffix, message size can be larger than 256 bytes, error handling was invalid, use core logger for logging instead of std::cerr
|
(statsd cobra bots) statsd improvement: prefix does not need a dot as a suffix, message size can be larger than 256 bytes, error handling was invalid, use core logger for logging instead of std::cerr
|
||||||
|
@ -9,6 +9,7 @@ set (IXBOTS_SOURCES
|
|||||||
ixbots/IXCobraToStatsdBot.cpp
|
ixbots/IXCobraToStatsdBot.cpp
|
||||||
ixbots/IXCobraToStdoutBot.cpp
|
ixbots/IXCobraToStdoutBot.cpp
|
||||||
ixbots/IXCobraMetricsToStatsdBot.cpp
|
ixbots/IXCobraMetricsToStatsdBot.cpp
|
||||||
|
ixbots/IXCobraMetricsToRedisBot.cpp
|
||||||
ixbots/IXStatsdClient.cpp
|
ixbots/IXStatsdClient.cpp
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -19,6 +20,7 @@ set (IXBOTS_HEADERS
|
|||||||
ixbots/IXCobraToStatsdBot.h
|
ixbots/IXCobraToStatsdBot.h
|
||||||
ixbots/IXCobraToStdoutBot.h
|
ixbots/IXCobraToStdoutBot.h
|
||||||
ixbots/IXCobraMetricsToStatsdBot.h
|
ixbots/IXCobraMetricsToStatsdBot.h
|
||||||
|
ixbots/IXCobraMetricsToRedisBot.h
|
||||||
ixbots/IXStatsdClient.h
|
ixbots/IXStatsdClient.h
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -38,6 +40,7 @@ set(IXBOTS_INCLUDE_DIRS
|
|||||||
../ixcore
|
../ixcore
|
||||||
../ixwebsocket
|
../ixwebsocket
|
||||||
../ixcobra
|
../ixcobra
|
||||||
|
../ixredis
|
||||||
../ixsentry
|
../ixsentry
|
||||||
${JSONCPP_INCLUDE_DIRS}
|
${JSONCPP_INCLUDE_DIRS}
|
||||||
${SPDLOG_INCLUDE_DIRS})
|
${SPDLOG_INCLUDE_DIRS})
|
||||||
|
@ -292,4 +292,22 @@ namespace ix
|
|||||||
{
|
{
|
||||||
_onBotMessageCallback = callback;
|
_onBotMessageCallback = callback;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::string CobraBot::getDeviceIdentifier(const Json::Value& msg)
|
||||||
|
{
|
||||||
|
std::string deviceId("na");
|
||||||
|
|
||||||
|
auto osName = msg["device"]["os_name"];
|
||||||
|
if (osName == "Android")
|
||||||
|
{
|
||||||
|
deviceId = msg["device"]["model"].asString();
|
||||||
|
}
|
||||||
|
else if (osName == "iOS")
|
||||||
|
{
|
||||||
|
deviceId = msg["device"]["hardware_model"].asString();
|
||||||
|
}
|
||||||
|
|
||||||
|
return deviceId;
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace ix
|
} // namespace ix
|
||||||
|
@ -28,6 +28,8 @@ namespace ix
|
|||||||
int64_t run(const CobraBotConfig& botConfig);
|
int64_t run(const CobraBotConfig& botConfig);
|
||||||
void setOnBotMessageCallback(const OnBotMessageCallback& callback);
|
void setOnBotMessageCallback(const OnBotMessageCallback& callback);
|
||||||
|
|
||||||
|
std::string getDeviceIdentifier(const Json::Value& msg);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
OnBotMessageCallback _onBotMessageCallback;
|
OnBotMessageCallback _onBotMessageCallback;
|
||||||
};
|
};
|
||||||
|
118
ixbots/ixbots/IXCobraMetricsToRedisBot.cpp
Normal file
118
ixbots/ixbots/IXCobraMetricsToRedisBot.cpp
Normal file
@ -0,0 +1,118 @@
|
|||||||
|
/*
|
||||||
|
* IXCobraMetricsToRedisBot.cpp
|
||||||
|
* Author: Benjamin Sergeant
|
||||||
|
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "IXCobraMetricsToRedisBot.h"
|
||||||
|
|
||||||
|
#include "IXCobraBot.h"
|
||||||
|
#include "IXStatsdClient.h"
|
||||||
|
#include <chrono>
|
||||||
|
#include <ixcobra/IXCobraConnection.h>
|
||||||
|
#include <ixcore/utils/IXCoreLogger.h>
|
||||||
|
#include <sstream>
|
||||||
|
#include <vector>
|
||||||
|
#include <algorithm>
|
||||||
|
#include <map>
|
||||||
|
#include <cctype>
|
||||||
|
|
||||||
|
|
||||||
|
namespace
|
||||||
|
{
|
||||||
|
std::string removeSpaces(const std::string& str)
|
||||||
|
{
|
||||||
|
std::string out(str);
|
||||||
|
out.erase(
|
||||||
|
std::remove_if(out.begin(), out.end(), [](unsigned char x) { return std::isspace(x); }),
|
||||||
|
out.end());
|
||||||
|
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace ix
|
||||||
|
{
|
||||||
|
bool processPerfMetricsEventSlowFrames(const Json::Value& msg,
|
||||||
|
RedisClient& redisClient,
|
||||||
|
const std::string& deviceId)
|
||||||
|
{
|
||||||
|
auto frameRateHistogramCounts = msg["data"]["FrameRateHistogramCounts"];
|
||||||
|
|
||||||
|
int slowFrames = 0;
|
||||||
|
slowFrames += frameRateHistogramCounts[4].asInt();
|
||||||
|
slowFrames += frameRateHistogramCounts[5].asInt();
|
||||||
|
slowFrames += frameRateHistogramCounts[6].asInt();
|
||||||
|
slowFrames += frameRateHistogramCounts[7].asInt();
|
||||||
|
|
||||||
|
std::stringstream ss;
|
||||||
|
ss << msg["id"].asString() << "_slow_frames" << "."
|
||||||
|
<< msg["device"]["game"].asString() << "."
|
||||||
|
<< msg["device"]["os_name"].asString() << "."
|
||||||
|
<< removeSpaces(msg["data"]["Tag"].asString());
|
||||||
|
|
||||||
|
std::string id = ss.str();
|
||||||
|
std::string errMsg;
|
||||||
|
if (redisClient.xadd(id, std::to_string(slowFrames), errMsg).empty())
|
||||||
|
{
|
||||||
|
CoreLogger::info(std::string("redis xadd error: ") + errMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (deviceId == "N841AP" || deviceId == "SM-N960U")
|
||||||
|
{
|
||||||
|
ss.str(""); // reset the stringstream
|
||||||
|
ss << msg["id"].asString() << "_slow_frames_by_device" << "."
|
||||||
|
<< deviceId << "."
|
||||||
|
<< msg["device"]["game"].asString() << "."
|
||||||
|
<< msg["device"]["os_name"].asString() << "."
|
||||||
|
<< removeSpaces(msg["data"]["Tag"].asString());
|
||||||
|
|
||||||
|
std::string id = ss.str();
|
||||||
|
if (redisClient.xadd(id, std::to_string(slowFrames), errMsg).empty())
|
||||||
|
{
|
||||||
|
CoreLogger::info(std::string("redis xadd error: ") + errMsg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t cobra_metrics_to_redis_bot(const ix::CobraBotConfig& config,
|
||||||
|
RedisClient& redisClient,
|
||||||
|
bool verbose)
|
||||||
|
{
|
||||||
|
CobraBot bot;
|
||||||
|
|
||||||
|
bot.setOnBotMessageCallback(
|
||||||
|
[&redisClient, &verbose, &bot]
|
||||||
|
(const Json::Value& msg,
|
||||||
|
const std::string& /*position*/,
|
||||||
|
std::atomic<bool>& /*throttled*/,
|
||||||
|
std::atomic<bool>& /*fatalCobraError*/,
|
||||||
|
std::atomic<uint64_t>& sentCount) -> void {
|
||||||
|
if (msg["device"].isNull() || msg["id"].isNull())
|
||||||
|
{
|
||||||
|
CoreLogger::info("no device or id entry, skipping event");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// Display full message with
|
||||||
|
if (verbose)
|
||||||
|
{
|
||||||
|
CoreLogger::info(msg.toStyledString());
|
||||||
|
}
|
||||||
|
|
||||||
|
bool success = false;
|
||||||
|
if (msg["id"].asString() == "engine_performance_metrics_id")
|
||||||
|
{
|
||||||
|
auto deviceId = bot.getDeviceIdentifier(msg);
|
||||||
|
success = processPerfMetricsEventSlowFrames(msg, redisClient, deviceId);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (success) sentCount++;
|
||||||
|
});
|
||||||
|
|
||||||
|
return bot.run(config);
|
||||||
|
}
|
||||||
|
} // namespace ix
|
20
ixbots/ixbots/IXCobraMetricsToRedisBot.h
Normal file
20
ixbots/ixbots/IXCobraMetricsToRedisBot.h
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
/*
|
||||||
|
* IXCobraMetricsToRedisBot.h
|
||||||
|
* Author: Benjamin Sergeant
|
||||||
|
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
|
||||||
|
*/
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <cstdint>
|
||||||
|
#include <ixredis/IXRedisClient.h>
|
||||||
|
#include "IXCobraBotConfig.h"
|
||||||
|
#include <stddef.h>
|
||||||
|
#include <string>
|
||||||
|
|
||||||
|
namespace ix
|
||||||
|
{
|
||||||
|
int64_t cobra_metrics_to_redis_bot(const ix::CobraBotConfig& config,
|
||||||
|
RedisClient& redisClient,
|
||||||
|
bool verbose);
|
||||||
|
} // namespace ix
|
||||||
|
|
27
ixredis/CMakeLists.txt
Normal file
27
ixredis/CMakeLists.txt
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
#
|
||||||
|
# Author: Benjamin Sergeant
|
||||||
|
# Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
|
||||||
|
#
|
||||||
|
|
||||||
|
set (IXREDIS_SOURCES
|
||||||
|
ixredis/IXRedisClient.cpp
|
||||||
|
ixredis/IXRedisServer.cpp
|
||||||
|
)
|
||||||
|
|
||||||
|
set (IXREDIS_HEADERS
|
||||||
|
ixredis/IXRedisClient.h
|
||||||
|
ixredis/IXRedisServer.h
|
||||||
|
)
|
||||||
|
|
||||||
|
add_library(ixredis STATIC
|
||||||
|
${IXREDIS_SOURCES}
|
||||||
|
${IXREDIS_HEADERS}
|
||||||
|
)
|
||||||
|
|
||||||
|
set(IXREDIS_INCLUDE_DIRS
|
||||||
|
.
|
||||||
|
..
|
||||||
|
../ixcore
|
||||||
|
../ixwebsocket)
|
||||||
|
|
||||||
|
target_include_directories( ixredis PUBLIC ${IXREDIS_INCLUDE_DIRS} )
|
@ -9,6 +9,7 @@
|
|||||||
#include <cstring>
|
#include <cstring>
|
||||||
#include <iomanip>
|
#include <iomanip>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
|
#include <ixwebsocket/IXSocket.h>
|
||||||
#include <ixwebsocket/IXSocketFactory.h>
|
#include <ixwebsocket/IXSocketFactory.h>
|
||||||
#include <ixwebsocket/IXSocketTLSOptions.h>
|
#include <ixwebsocket/IXSocketTLSOptions.h>
|
||||||
#include <sstream>
|
#include <sstream>
|
@ -8,9 +8,9 @@
|
|||||||
|
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
#include <ixwebsocket/IXSocket.h>
|
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
#include <ixwebsocket/IXSocket.h>
|
||||||
|
|
||||||
namespace ix
|
namespace ix
|
||||||
{
|
{
|
@ -6,8 +6,8 @@
|
|||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include "IXSocket.h"
|
#include <ixwebsocket/IXSocket.h>
|
||||||
#include "IXSocketServer.h"
|
#include <ixwebsocket/IXSocketServer.h>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <memory>
|
#include <memory>
|
@ -7,16 +7,12 @@ set (IXSNAKE_SOURCES
|
|||||||
ixsnake/IXSnakeServer.cpp
|
ixsnake/IXSnakeServer.cpp
|
||||||
ixsnake/IXSnakeProtocol.cpp
|
ixsnake/IXSnakeProtocol.cpp
|
||||||
ixsnake/IXAppConfig.cpp
|
ixsnake/IXAppConfig.cpp
|
||||||
ixsnake/IXRedisClient.cpp
|
|
||||||
ixsnake/IXRedisServer.cpp
|
|
||||||
)
|
)
|
||||||
|
|
||||||
set (IXSNAKE_HEADERS
|
set (IXSNAKE_HEADERS
|
||||||
ixsnake/IXSnakeServer.h
|
ixsnake/IXSnakeServer.h
|
||||||
ixsnake/IXSnakeProtocol.h
|
ixsnake/IXSnakeProtocol.h
|
||||||
ixsnake/IXAppConfig.h
|
ixsnake/IXAppConfig.h
|
||||||
ixsnake/IXRedisClient.h
|
|
||||||
ixsnake/IXRedisServer.h
|
|
||||||
)
|
)
|
||||||
|
|
||||||
add_library(ixsnake STATIC
|
add_library(ixsnake STATIC
|
||||||
@ -30,6 +26,7 @@ set(IXSNAKE_INCLUDE_DIRS
|
|||||||
../ixcore
|
../ixcore
|
||||||
../ixcrypto
|
../ixcrypto
|
||||||
../ixwebsocket
|
../ixwebsocket
|
||||||
|
../ixredis
|
||||||
../third_party)
|
../third_party)
|
||||||
|
|
||||||
target_include_directories( ixsnake PUBLIC ${IXSNAKE_INCLUDE_DIRS} )
|
target_include_directories( ixsnake PUBLIC ${IXSNAKE_INCLUDE_DIRS} )
|
||||||
|
@ -6,7 +6,7 @@
|
|||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include "IXRedisClient.h"
|
#include <ixredis/IXRedisClient.h>
|
||||||
#include <future>
|
#include <future>
|
||||||
#include <ixwebsocket/IXConnectionState.h>
|
#include <ixwebsocket/IXConnectionState.h>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
@ -6,4 +6,4 @@
|
|||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#define IX_WEBSOCKET_VERSION "9.6.8"
|
#define IX_WEBSOCKET_VERSION "9.6.9"
|
||||||
|
@ -96,6 +96,7 @@ target_link_libraries(ixwebsocket_unittest ixbots)
|
|||||||
target_link_libraries(ixwebsocket_unittest ixsnake)
|
target_link_libraries(ixwebsocket_unittest ixsnake)
|
||||||
target_link_libraries(ixwebsocket_unittest ixcobra)
|
target_link_libraries(ixwebsocket_unittest ixcobra)
|
||||||
target_link_libraries(ixwebsocket_unittest ixsentry)
|
target_link_libraries(ixwebsocket_unittest ixsentry)
|
||||||
|
target_link_libraries(ixwebsocket_unittest ixredis)
|
||||||
target_link_libraries(ixwebsocket_unittest ixwebsocket)
|
target_link_libraries(ixwebsocket_unittest ixwebsocket)
|
||||||
target_link_libraries(ixwebsocket_unittest ixcrypto)
|
target_link_libraries(ixwebsocket_unittest ixcrypto)
|
||||||
target_link_libraries(ixwebsocket_unittest ixcore)
|
target_link_libraries(ixwebsocket_unittest ixcore)
|
||||||
|
@ -10,7 +10,7 @@
|
|||||||
#include <iostream>
|
#include <iostream>
|
||||||
#include <ixcobra/IXCobraConnection.h>
|
#include <ixcobra/IXCobraConnection.h>
|
||||||
#include <ixcrypto/IXUuid.h>
|
#include <ixcrypto/IXUuid.h>
|
||||||
#include <ixsnake/IXRedisServer.h>
|
#include <ixredis/IXRedisServer.h>
|
||||||
#include <ixsnake/IXSnakeServer.h>
|
#include <ixsnake/IXSnakeServer.h>
|
||||||
|
|
||||||
using namespace ix;
|
using namespace ix;
|
||||||
|
@ -8,7 +8,7 @@
|
|||||||
#include <iostream>
|
#include <iostream>
|
||||||
#include <ixcobra/IXCobraMetricsPublisher.h>
|
#include <ixcobra/IXCobraMetricsPublisher.h>
|
||||||
#include <ixcrypto/IXUuid.h>
|
#include <ixcrypto/IXUuid.h>
|
||||||
#include <ixsnake/IXRedisServer.h>
|
#include <ixredis/IXRedisServer.h>
|
||||||
#include <ixsnake/IXSnakeServer.h>
|
#include <ixsnake/IXSnakeServer.h>
|
||||||
#include <set>
|
#include <set>
|
||||||
|
|
||||||
|
@ -13,7 +13,7 @@
|
|||||||
#include <ixcobra/IXCobraMetricsPublisher.h>
|
#include <ixcobra/IXCobraMetricsPublisher.h>
|
||||||
#include <ixcrypto/IXUuid.h>
|
#include <ixcrypto/IXUuid.h>
|
||||||
#include <ixsentry/IXSentryClient.h>
|
#include <ixsentry/IXSentryClient.h>
|
||||||
#include <ixsnake/IXRedisServer.h>
|
#include <ixredis/IXRedisServer.h>
|
||||||
#include <ixsnake/IXSnakeServer.h>
|
#include <ixsnake/IXSnakeServer.h>
|
||||||
#include <ixwebsocket/IXHttpServer.h>
|
#include <ixwebsocket/IXHttpServer.h>
|
||||||
#include <ixwebsocket/IXUserAgent.h>
|
#include <ixwebsocket/IXUserAgent.h>
|
||||||
|
@ -13,7 +13,7 @@
|
|||||||
#include <ixcobra/IXCobraMetricsPublisher.h>
|
#include <ixcobra/IXCobraMetricsPublisher.h>
|
||||||
#include <ixcrypto/IXUuid.h>
|
#include <ixcrypto/IXUuid.h>
|
||||||
#include <ixsentry/IXSentryClient.h>
|
#include <ixsentry/IXSentryClient.h>
|
||||||
#include <ixsnake/IXRedisServer.h>
|
#include <ixredis/IXRedisServer.h>
|
||||||
#include <ixsnake/IXSnakeServer.h>
|
#include <ixsnake/IXSnakeServer.h>
|
||||||
#include <ixwebsocket/IXHttpServer.h>
|
#include <ixwebsocket/IXHttpServer.h>
|
||||||
#include <ixwebsocket/IXUserAgent.h>
|
#include <ixwebsocket/IXUserAgent.h>
|
||||||
|
@ -13,7 +13,7 @@
|
|||||||
#include <ixcobra/IXCobraMetricsPublisher.h>
|
#include <ixcobra/IXCobraMetricsPublisher.h>
|
||||||
#include <ixcrypto/IXUuid.h>
|
#include <ixcrypto/IXUuid.h>
|
||||||
#include <ixsentry/IXSentryClient.h>
|
#include <ixsentry/IXSentryClient.h>
|
||||||
#include <ixsnake/IXRedisServer.h>
|
#include <ixredis/IXRedisServer.h>
|
||||||
#include <ixsnake/IXSnakeServer.h>
|
#include <ixsnake/IXSnakeServer.h>
|
||||||
#include <ixwebsocket/IXHttpServer.h>
|
#include <ixwebsocket/IXHttpServer.h>
|
||||||
#include <ixwebsocket/IXUserAgent.h>
|
#include <ixwebsocket/IXUserAgent.h>
|
||||||
|
@ -51,7 +51,6 @@ add_executable(ws
|
|||||||
ws_snake.cpp
|
ws_snake.cpp
|
||||||
ws_cobra_metrics_publish.cpp
|
ws_cobra_metrics_publish.cpp
|
||||||
ws_cobra_publish.cpp
|
ws_cobra_publish.cpp
|
||||||
ws_cobra_metrics_to_redis.cpp
|
|
||||||
ws_httpd.cpp
|
ws_httpd.cpp
|
||||||
ws_autobahn.cpp
|
ws_autobahn.cpp
|
||||||
ws_proxy_server.cpp
|
ws_proxy_server.cpp
|
||||||
@ -64,6 +63,7 @@ target_link_libraries(ws ixbots)
|
|||||||
target_link_libraries(ws ixsnake)
|
target_link_libraries(ws ixsnake)
|
||||||
target_link_libraries(ws ixcobra)
|
target_link_libraries(ws ixcobra)
|
||||||
target_link_libraries(ws ixsentry)
|
target_link_libraries(ws ixsentry)
|
||||||
|
target_link_libraries(ws ixredis)
|
||||||
target_link_libraries(ws ixwebsocket)
|
target_link_libraries(ws ixwebsocket)
|
||||||
target_link_libraries(ws ixcrypto)
|
target_link_libraries(ws ixcrypto)
|
||||||
target_link_libraries(ws ixcore)
|
target_link_libraries(ws ixcore)
|
||||||
|
21
ws/ws.cpp
21
ws/ws.cpp
@ -15,6 +15,8 @@
|
|||||||
#include <ixbots/IXCobraToStatsdBot.h>
|
#include <ixbots/IXCobraToStatsdBot.h>
|
||||||
#include <ixbots/IXCobraToStdoutBot.h>
|
#include <ixbots/IXCobraToStdoutBot.h>
|
||||||
#include <ixbots/IXCobraMetricsToStatsdBot.h>
|
#include <ixbots/IXCobraMetricsToStatsdBot.h>
|
||||||
|
#include <ixbots/IXCobraMetricsToRedisBot.h>
|
||||||
|
#include <ixredis/IXRedisClient.h>
|
||||||
#include <ixcore/utils/IXCoreLogger.h>
|
#include <ixcore/utils/IXCoreLogger.h>
|
||||||
#include <ixsentry/IXSentryClient.h>
|
#include <ixsentry/IXSentryClient.h>
|
||||||
#include <ixwebsocket/IXNetSystem.h>
|
#include <ixwebsocket/IXNetSystem.h>
|
||||||
@ -363,13 +365,10 @@ int main(int argc, char** argv)
|
|||||||
CLI::App* cobra2redisApp =
|
CLI::App* cobra2redisApp =
|
||||||
app.add_subcommand("cobra_metrics_to_redis", "Cobra metrics to redis");
|
app.add_subcommand("cobra_metrics_to_redis", "Cobra metrics to redis");
|
||||||
cobra2redisApp->fallthrough();
|
cobra2redisApp->fallthrough();
|
||||||
cobra2redisApp->add_option("channel", channel, "Channel")->required();
|
|
||||||
cobra2redisApp->add_option("--pidfile", pidfile, "Pid file");
|
cobra2redisApp->add_option("--pidfile", pidfile, "Pid file");
|
||||||
cobra2redisApp->add_option("--filter", filter, "Stream SQL Filter");
|
|
||||||
cobra2redisApp->add_option("--position", position, "Stream position");
|
|
||||||
cobra2redisApp->add_option("--hostname", hostname, "Redis hostname");
|
cobra2redisApp->add_option("--hostname", hostname, "Redis hostname");
|
||||||
cobra2redisApp->add_option("--port", redisPort, "Redis port");
|
cobra2redisApp->add_option("--port", redisPort, "Redis port");
|
||||||
cobra2redisApp->add_flag("-q", quiet, "Quiet / only display stats");
|
cobra2redisApp->add_flag("-v", verbose, "Verbose");
|
||||||
addTLSOptions(cobra2redisApp);
|
addTLSOptions(cobra2redisApp);
|
||||||
addCobraConfig(cobra2redisApp);
|
addCobraConfig(cobra2redisApp);
|
||||||
|
|
||||||
@ -605,8 +604,18 @@ int main(int argc, char** argv)
|
|||||||
}
|
}
|
||||||
else if (app.got_subcommand("cobra_metrics_to_redis"))
|
else if (app.got_subcommand("cobra_metrics_to_redis"))
|
||||||
{
|
{
|
||||||
ret = ix::ws_cobra_metrics_to_redis(
|
ix::RedisClient redisClient;
|
||||||
cobraConfig, channel, filter, position, hostname, redisPort);
|
if (!redisClient.connect(redisHosts, redisPort))
|
||||||
|
{
|
||||||
|
spdlog::error("Cannot connect to redis host {}:{}",
|
||||||
|
redisHosts, redisPort);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ret = (int) ix::cobra_metrics_to_redis_bot(
|
||||||
|
cobraBotConfig, redisClient, verbose);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else if (app.got_subcommand("snake"))
|
else if (app.got_subcommand("snake"))
|
||||||
{
|
{
|
||||||
|
@ -1,166 +0,0 @@
|
|||||||
/*
|
|
||||||
* ws_cobra_metrics_to_redis.cpp
|
|
||||||
* Author: Benjamin Sergeant
|
|
||||||
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#include <atomic>
|
|
||||||
#include <chrono>
|
|
||||||
#include <condition_variable>
|
|
||||||
#include <ixcobra/IXCobraConnection.h>
|
|
||||||
#include <ixsnake/IXRedisClient.h>
|
|
||||||
#include <mutex>
|
|
||||||
#include <queue>
|
|
||||||
#include <spdlog/spdlog.h>
|
|
||||||
#include <sstream>
|
|
||||||
#include <thread>
|
|
||||||
|
|
||||||
namespace ix
|
|
||||||
{
|
|
||||||
int ws_cobra_metrics_to_redis(const ix::CobraConfig& config,
|
|
||||||
const std::string& channel,
|
|
||||||
const std::string& filter,
|
|
||||||
const std::string& position,
|
|
||||||
const std::string& host,
|
|
||||||
int port)
|
|
||||||
{
|
|
||||||
ix::CobraConnection conn;
|
|
||||||
conn.configure(config);
|
|
||||||
conn.connect();
|
|
||||||
|
|
||||||
// Display incoming messages
|
|
||||||
std::atomic<int> msgPerSeconds(0);
|
|
||||||
std::atomic<int> msgCount(0);
|
|
||||||
|
|
||||||
auto timer = [&msgPerSeconds, &msgCount] {
|
|
||||||
while (true)
|
|
||||||
{
|
|
||||||
spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds);
|
|
||||||
|
|
||||||
msgPerSeconds = 0;
|
|
||||||
auto duration = std::chrono::seconds(1);
|
|
||||||
std::this_thread::sleep_for(duration);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
ix::RedisClient redisClient;
|
|
||||||
if (!redisClient.connect(host, port))
|
|
||||||
{
|
|
||||||
spdlog::error("Cannot connect to redis host {}:{}", host, port);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
std::mutex conditionVariableMutex;
|
|
||||||
std::condition_variable condition;
|
|
||||||
std::queue<Json::Value> queue;
|
|
||||||
|
|
||||||
auto redisSender = [&condition, &queue, &conditionVariableMutex, &redisClient] {
|
|
||||||
Json::FastWriter jsonWriter;
|
|
||||||
|
|
||||||
int batchSize = 1000;
|
|
||||||
int i = 0;
|
|
||||||
|
|
||||||
std::stringstream pipe;
|
|
||||||
|
|
||||||
while (true)
|
|
||||||
{
|
|
||||||
Json::Value msg;
|
|
||||||
|
|
||||||
{
|
|
||||||
std::unique_lock<std::mutex> lock(conditionVariableMutex);
|
|
||||||
condition.wait(lock, [&queue] { return !queue.empty(); });
|
|
||||||
|
|
||||||
msg = queue.front();
|
|
||||||
queue.pop();
|
|
||||||
}
|
|
||||||
|
|
||||||
// compute channel
|
|
||||||
std::stringstream ss;
|
|
||||||
ss << "session=" << msg["session"].asString() << ";msgid=" << msg["id"].asString();
|
|
||||||
|
|
||||||
std::string channel = ss.str();
|
|
||||||
|
|
||||||
std::string errMsg;
|
|
||||||
pipe << redisClient.prepareXaddCommand(channel, jsonWriter.write(msg));
|
|
||||||
|
|
||||||
if (i++ == batchSize)
|
|
||||||
{
|
|
||||||
if (!redisClient.sendCommand(pipe.str(), batchSize, errMsg))
|
|
||||||
{
|
|
||||||
spdlog::error("error sending command: {}", errMsg);
|
|
||||||
}
|
|
||||||
|
|
||||||
i = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
std::thread t1(timer);
|
|
||||||
std::thread t2(redisSender);
|
|
||||||
|
|
||||||
conn.setEventCallback([&conn,
|
|
||||||
&channel,
|
|
||||||
&filter,
|
|
||||||
&position,
|
|
||||||
&msgCount,
|
|
||||||
&msgPerSeconds,
|
|
||||||
&conditionVariableMutex,
|
|
||||||
&condition,
|
|
||||||
&queue](const CobraEventPtr& event) {
|
|
||||||
if (event->type == ix::CobraEventType::Open)
|
|
||||||
{
|
|
||||||
spdlog::info("Subscriber connected");
|
|
||||||
|
|
||||||
for (auto&& it : event->headers)
|
|
||||||
{
|
|
||||||
spdlog::info("{}: {}", it.first, it.second);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (event->type == ix::CobraEventType::Authenticated)
|
|
||||||
{
|
|
||||||
spdlog::info("Subscriber authenticated");
|
|
||||||
|
|
||||||
conn.subscribe(
|
|
||||||
channel,
|
|
||||||
filter,
|
|
||||||
position,
|
|
||||||
[&msgPerSeconds, &msgCount, &conditionVariableMutex, &condition, &queue](
|
|
||||||
const Json::Value& msg, const std::string& /*position*/) {
|
|
||||||
{
|
|
||||||
std::unique_lock<std::mutex> lock(conditionVariableMutex);
|
|
||||||
queue.push(msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
condition.notify_one();
|
|
||||||
|
|
||||||
msgPerSeconds++;
|
|
||||||
msgCount++;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
else if (event->type == ix::CobraEventType::Subscribed)
|
|
||||||
{
|
|
||||||
spdlog::info("Subscriber: subscribed to channel {}", event->subscriptionId);
|
|
||||||
}
|
|
||||||
else if (event->type == ix::CobraEventType::UnSubscribed)
|
|
||||||
{
|
|
||||||
spdlog::info("Subscriber: unsubscribed from channel {}", event->subscriptionId);
|
|
||||||
}
|
|
||||||
else if (event->type == ix::CobraEventType::Error)
|
|
||||||
{
|
|
||||||
spdlog::error("Subscriber: error {}", event->errMsg);
|
|
||||||
}
|
|
||||||
else if (event->type == ix::CobraEventType::Published)
|
|
||||||
{
|
|
||||||
spdlog::error("Published message hacked: {}", event->msgId);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
while (true)
|
|
||||||
{
|
|
||||||
std::chrono::duration<double, std::milli> duration(10);
|
|
||||||
std::this_thread::sleep_for(duration);
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
} // namespace ix
|
|
@ -4,7 +4,7 @@
|
|||||||
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
|
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include <ixsnake/IXRedisClient.h>
|
#include <ixredis/IXRedisClient.h>
|
||||||
#include <spdlog/spdlog.h>
|
#include <spdlog/spdlog.h>
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
|
|
||||||
|
@ -4,7 +4,7 @@
|
|||||||
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
|
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include <ixsnake/IXRedisServer.h>
|
#include <ixredis/IXRedisServer.h>
|
||||||
#include <spdlog/spdlog.h>
|
#include <spdlog/spdlog.h>
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
|
|
||||||
|
@ -6,7 +6,7 @@
|
|||||||
|
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
#include <ixsnake/IXRedisClient.h>
|
#include <ixredis/IXRedisClient.h>
|
||||||
#include <spdlog/spdlog.h>
|
#include <spdlog/spdlog.h>
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
Loading…
Reference in New Issue
Block a user