diff --git a/CMakeLists.txt b/CMakeLists.txt index c75095a3..acb58929 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -251,6 +251,7 @@ if (USE_WS OR USE_TEST) add_subdirectory(ixcore) add_subdirectory(ixcrypto) add_subdirectory(ixcobra) + add_subdirectory(ixredis) add_subdirectory(ixsnake) add_subdirectory(ixsentry) add_subdirectory(ixbots) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 86a0b6f0..d74ade86 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,6 +1,10 @@ # Changelog All changes to this project will be documented in this file. +## [9.6.9] - 2020-06-10 + +(redis cobra bots) update the cobra to redis bot to use the bot framework, and change it to report fps metrics into redis streams. + ## [9.6.6] - 2020-06-04 (statsd cobra bots) statsd improvement: prefix does not need a dot as a suffix, message size can be larger than 256 bytes, error handling was invalid, use core logger for logging instead of std::cerr diff --git a/ixbots/CMakeLists.txt b/ixbots/CMakeLists.txt index 697486b3..eaa45744 100644 --- a/ixbots/CMakeLists.txt +++ b/ixbots/CMakeLists.txt @@ -9,6 +9,7 @@ set (IXBOTS_SOURCES ixbots/IXCobraToStatsdBot.cpp ixbots/IXCobraToStdoutBot.cpp ixbots/IXCobraMetricsToStatsdBot.cpp + ixbots/IXCobraMetricsToRedisBot.cpp ixbots/IXStatsdClient.cpp ) @@ -19,6 +20,7 @@ set (IXBOTS_HEADERS ixbots/IXCobraToStatsdBot.h ixbots/IXCobraToStdoutBot.h ixbots/IXCobraMetricsToStatsdBot.h + ixbots/IXCobraMetricsToRedisBot.h ixbots/IXStatsdClient.h ) @@ -38,6 +40,7 @@ set(IXBOTS_INCLUDE_DIRS ../ixcore ../ixwebsocket ../ixcobra + ../ixredis ../ixsentry ${JSONCPP_INCLUDE_DIRS} ${SPDLOG_INCLUDE_DIRS}) diff --git a/ixbots/ixbots/IXCobraBot.cpp b/ixbots/ixbots/IXCobraBot.cpp index db1a0f49..c0274a4b 100644 --- a/ixbots/ixbots/IXCobraBot.cpp +++ b/ixbots/ixbots/IXCobraBot.cpp @@ -292,4 +292,22 @@ namespace ix { _onBotMessageCallback = callback; } + + std::string CobraBot::getDeviceIdentifier(const Json::Value& msg) + { + std::string deviceId("na"); + + auto osName = msg["device"]["os_name"]; + if (osName == "Android") + { + deviceId = msg["device"]["model"].asString(); + } + else if (osName == "iOS") + { + deviceId = msg["device"]["hardware_model"].asString(); + } + + return deviceId; + } + } // namespace ix diff --git a/ixbots/ixbots/IXCobraBot.h b/ixbots/ixbots/IXCobraBot.h index e8c8b4f9..199da104 100644 --- a/ixbots/ixbots/IXCobraBot.h +++ b/ixbots/ixbots/IXCobraBot.h @@ -28,6 +28,8 @@ namespace ix int64_t run(const CobraBotConfig& botConfig); void setOnBotMessageCallback(const OnBotMessageCallback& callback); + std::string getDeviceIdentifier(const Json::Value& msg); + private: OnBotMessageCallback _onBotMessageCallback; }; diff --git a/ixbots/ixbots/IXCobraMetricsToRedisBot.cpp b/ixbots/ixbots/IXCobraMetricsToRedisBot.cpp new file mode 100644 index 00000000..f95d7125 --- /dev/null +++ b/ixbots/ixbots/IXCobraMetricsToRedisBot.cpp @@ -0,0 +1,118 @@ +/* + * IXCobraMetricsToRedisBot.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2020 Machine Zone, Inc. All rights reserved. + */ + +#include "IXCobraMetricsToRedisBot.h" + +#include "IXCobraBot.h" +#include "IXStatsdClient.h" +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace +{ + std::string removeSpaces(const std::string& str) + { + std::string out(str); + out.erase( + std::remove_if(out.begin(), out.end(), [](unsigned char x) { return std::isspace(x); }), + out.end()); + + return out; + } +} + +namespace ix +{ + bool processPerfMetricsEventSlowFrames(const Json::Value& msg, + RedisClient& redisClient, + const std::string& deviceId) + { + auto frameRateHistogramCounts = msg["data"]["FrameRateHistogramCounts"]; + + int slowFrames = 0; + slowFrames += frameRateHistogramCounts[4].asInt(); + slowFrames += frameRateHistogramCounts[5].asInt(); + slowFrames += frameRateHistogramCounts[6].asInt(); + slowFrames += frameRateHistogramCounts[7].asInt(); + + std::stringstream ss; + ss << msg["id"].asString() << "_slow_frames" << "." + << msg["device"]["game"].asString() << "." + << msg["device"]["os_name"].asString() << "." + << removeSpaces(msg["data"]["Tag"].asString()); + + std::string id = ss.str(); + std::string errMsg; + if (redisClient.xadd(id, std::to_string(slowFrames), errMsg).empty()) + { + CoreLogger::info(std::string("redis xadd error: ") + errMsg); + } + + if (deviceId == "N841AP" || deviceId == "SM-N960U") + { + ss.str(""); // reset the stringstream + ss << msg["id"].asString() << "_slow_frames_by_device" << "." + << deviceId << "." + << msg["device"]["game"].asString() << "." + << msg["device"]["os_name"].asString() << "." + << removeSpaces(msg["data"]["Tag"].asString()); + + std::string id = ss.str(); + if (redisClient.xadd(id, std::to_string(slowFrames), errMsg).empty()) + { + CoreLogger::info(std::string("redis xadd error: ") + errMsg); + } + } + + return true; + } + + int64_t cobra_metrics_to_redis_bot(const ix::CobraBotConfig& config, + RedisClient& redisClient, + bool verbose) + { + CobraBot bot; + + bot.setOnBotMessageCallback( + [&redisClient, &verbose, &bot] + (const Json::Value& msg, + const std::string& /*position*/, + std::atomic& /*throttled*/, + std::atomic& /*fatalCobraError*/, + std::atomic& sentCount) -> void { + if (msg["device"].isNull() || msg["id"].isNull()) + { + CoreLogger::info("no device or id entry, skipping event"); + return; + } + + // + // Display full message with + if (verbose) + { + CoreLogger::info(msg.toStyledString()); + } + + bool success = false; + if (msg["id"].asString() == "engine_performance_metrics_id") + { + auto deviceId = bot.getDeviceIdentifier(msg); + success = processPerfMetricsEventSlowFrames(msg, redisClient, deviceId); + } + + if (success) sentCount++; + }); + + return bot.run(config); + } +} // namespace ix diff --git a/ixbots/ixbots/IXCobraMetricsToRedisBot.h b/ixbots/ixbots/IXCobraMetricsToRedisBot.h new file mode 100644 index 00000000..5fbe3905 --- /dev/null +++ b/ixbots/ixbots/IXCobraMetricsToRedisBot.h @@ -0,0 +1,20 @@ +/* + * IXCobraMetricsToRedisBot.h + * Author: Benjamin Sergeant + * Copyright (c) 2020 Machine Zone, Inc. All rights reserved. + */ +#pragma once + +#include +#include +#include "IXCobraBotConfig.h" +#include +#include + +namespace ix +{ + int64_t cobra_metrics_to_redis_bot(const ix::CobraBotConfig& config, + RedisClient& redisClient, + bool verbose); +} // namespace ix + diff --git a/ixredis/CMakeLists.txt b/ixredis/CMakeLists.txt new file mode 100644 index 00000000..b2118702 --- /dev/null +++ b/ixredis/CMakeLists.txt @@ -0,0 +1,27 @@ +# +# Author: Benjamin Sergeant +# Copyright (c) 2020 Machine Zone, Inc. All rights reserved. +# + +set (IXREDIS_SOURCES + ixredis/IXRedisClient.cpp + ixredis/IXRedisServer.cpp +) + +set (IXREDIS_HEADERS + ixredis/IXRedisClient.h + ixredis/IXRedisServer.h +) + +add_library(ixredis STATIC + ${IXREDIS_SOURCES} + ${IXREDIS_HEADERS} +) + +set(IXREDIS_INCLUDE_DIRS + . + .. + ../ixcore + ../ixwebsocket) + +target_include_directories( ixredis PUBLIC ${IXREDIS_INCLUDE_DIRS} ) diff --git a/ixsnake/ixsnake/IXRedisClient.cpp b/ixredis/ixredis/IXRedisClient.cpp similarity index 99% rename from ixsnake/ixsnake/IXRedisClient.cpp rename to ixredis/ixredis/IXRedisClient.cpp index 5429c44a..c20c055a 100644 --- a/ixsnake/ixsnake/IXRedisClient.cpp +++ b/ixredis/ixredis/IXRedisClient.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include diff --git a/ixsnake/ixsnake/IXRedisClient.h b/ixredis/ixredis/IXRedisClient.h similarity index 100% rename from ixsnake/ixsnake/IXRedisClient.h rename to ixredis/ixredis/IXRedisClient.h index 1d1640c1..34895efd 100644 --- a/ixsnake/ixsnake/IXRedisClient.h +++ b/ixredis/ixredis/IXRedisClient.h @@ -8,9 +8,9 @@ #include #include -#include #include #include +#include namespace ix { diff --git a/ixsnake/ixsnake/IXRedisServer.cpp b/ixredis/ixredis/IXRedisServer.cpp similarity index 100% rename from ixsnake/ixsnake/IXRedisServer.cpp rename to ixredis/ixredis/IXRedisServer.cpp diff --git a/ixsnake/ixsnake/IXRedisServer.h b/ixredis/ixredis/IXRedisServer.h similarity index 96% rename from ixsnake/ixsnake/IXRedisServer.h rename to ixredis/ixredis/IXRedisServer.h index 6d2bfc00..a94c4da0 100644 --- a/ixsnake/ixsnake/IXRedisServer.h +++ b/ixredis/ixredis/IXRedisServer.h @@ -6,8 +6,8 @@ #pragma once -#include "IXSocket.h" -#include "IXSocketServer.h" +#include +#include #include #include #include diff --git a/ixsnake/CMakeLists.txt b/ixsnake/CMakeLists.txt index 8a4e7a00..bf11ed8a 100644 --- a/ixsnake/CMakeLists.txt +++ b/ixsnake/CMakeLists.txt @@ -7,16 +7,12 @@ set (IXSNAKE_SOURCES ixsnake/IXSnakeServer.cpp ixsnake/IXSnakeProtocol.cpp ixsnake/IXAppConfig.cpp - ixsnake/IXRedisClient.cpp - ixsnake/IXRedisServer.cpp ) set (IXSNAKE_HEADERS ixsnake/IXSnakeServer.h ixsnake/IXSnakeProtocol.h ixsnake/IXAppConfig.h - ixsnake/IXRedisClient.h - ixsnake/IXRedisServer.h ) add_library(ixsnake STATIC @@ -30,6 +26,7 @@ set(IXSNAKE_INCLUDE_DIRS ../ixcore ../ixcrypto ../ixwebsocket + ../ixredis ../third_party) target_include_directories( ixsnake PUBLIC ${IXSNAKE_INCLUDE_DIRS} ) diff --git a/ixsnake/ixsnake/IXSnakeConnectionState.h b/ixsnake/ixsnake/IXSnakeConnectionState.h index b4785608..db6a0f35 100644 --- a/ixsnake/ixsnake/IXSnakeConnectionState.h +++ b/ixsnake/ixsnake/IXSnakeConnectionState.h @@ -6,7 +6,7 @@ #pragma once -#include "IXRedisClient.h" +#include #include #include #include diff --git a/ixwebsocket/IXWebSocketVersion.h b/ixwebsocket/IXWebSocketVersion.h index 0ee30493..0f714205 100644 --- a/ixwebsocket/IXWebSocketVersion.h +++ b/ixwebsocket/IXWebSocketVersion.h @@ -6,4 +6,4 @@ #pragma once -#define IX_WEBSOCKET_VERSION "9.6.8" +#define IX_WEBSOCKET_VERSION "9.6.9" diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 70599804..a7de276c 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -96,6 +96,7 @@ target_link_libraries(ixwebsocket_unittest ixbots) target_link_libraries(ixwebsocket_unittest ixsnake) target_link_libraries(ixwebsocket_unittest ixcobra) target_link_libraries(ixwebsocket_unittest ixsentry) +target_link_libraries(ixwebsocket_unittest ixredis) target_link_libraries(ixwebsocket_unittest ixwebsocket) target_link_libraries(ixwebsocket_unittest ixcrypto) target_link_libraries(ixwebsocket_unittest ixcore) diff --git a/test/IXCobraChatTest.cpp b/test/IXCobraChatTest.cpp index 6ffbd80f..4530da28 100644 --- a/test/IXCobraChatTest.cpp +++ b/test/IXCobraChatTest.cpp @@ -10,7 +10,7 @@ #include #include #include -#include +#include #include using namespace ix; diff --git a/test/IXCobraMetricsPublisherTest.cpp b/test/IXCobraMetricsPublisherTest.cpp index 10653bd3..01b0affe 100644 --- a/test/IXCobraMetricsPublisherTest.cpp +++ b/test/IXCobraMetricsPublisherTest.cpp @@ -8,7 +8,7 @@ #include #include #include -#include +#include #include #include diff --git a/test/IXCobraToSentryBotTest.cpp b/test/IXCobraToSentryBotTest.cpp index b61c06fe..2b442349 100644 --- a/test/IXCobraToSentryBotTest.cpp +++ b/test/IXCobraToSentryBotTest.cpp @@ -13,7 +13,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/test/IXCobraToStatsdBotTest.cpp b/test/IXCobraToStatsdBotTest.cpp index ff436c08..90555934 100644 --- a/test/IXCobraToStatsdBotTest.cpp +++ b/test/IXCobraToStatsdBotTest.cpp @@ -13,7 +13,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/test/IXCobraToStdoutBotTest.cpp b/test/IXCobraToStdoutBotTest.cpp index 1361d68c..d0d561d5 100644 --- a/test/IXCobraToStdoutBotTest.cpp +++ b/test/IXCobraToStdoutBotTest.cpp @@ -13,7 +13,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/ws/CMakeLists.txt b/ws/CMakeLists.txt index c9803ce1..552a1ea3 100644 --- a/ws/CMakeLists.txt +++ b/ws/CMakeLists.txt @@ -51,7 +51,6 @@ add_executable(ws ws_snake.cpp ws_cobra_metrics_publish.cpp ws_cobra_publish.cpp - ws_cobra_metrics_to_redis.cpp ws_httpd.cpp ws_autobahn.cpp ws_proxy_server.cpp @@ -64,6 +63,7 @@ target_link_libraries(ws ixbots) target_link_libraries(ws ixsnake) target_link_libraries(ws ixcobra) target_link_libraries(ws ixsentry) +target_link_libraries(ws ixredis) target_link_libraries(ws ixwebsocket) target_link_libraries(ws ixcrypto) target_link_libraries(ws ixcore) diff --git a/ws/ws.cpp b/ws/ws.cpp index 015c5ed6..30d65a3d 100644 --- a/ws/ws.cpp +++ b/ws/ws.cpp @@ -15,6 +15,8 @@ #include #include #include +#include +#include #include #include #include @@ -363,13 +365,10 @@ int main(int argc, char** argv) CLI::App* cobra2redisApp = app.add_subcommand("cobra_metrics_to_redis", "Cobra metrics to redis"); cobra2redisApp->fallthrough(); - cobra2redisApp->add_option("channel", channel, "Channel")->required(); cobra2redisApp->add_option("--pidfile", pidfile, "Pid file"); - cobra2redisApp->add_option("--filter", filter, "Stream SQL Filter"); - cobra2redisApp->add_option("--position", position, "Stream position"); cobra2redisApp->add_option("--hostname", hostname, "Redis hostname"); cobra2redisApp->add_option("--port", redisPort, "Redis port"); - cobra2redisApp->add_flag("-q", quiet, "Quiet / only display stats"); + cobra2redisApp->add_flag("-v", verbose, "Verbose"); addTLSOptions(cobra2redisApp); addCobraConfig(cobra2redisApp); @@ -605,8 +604,18 @@ int main(int argc, char** argv) } else if (app.got_subcommand("cobra_metrics_to_redis")) { - ret = ix::ws_cobra_metrics_to_redis( - cobraConfig, channel, filter, position, hostname, redisPort); + ix::RedisClient redisClient; + if (!redisClient.connect(redisHosts, redisPort)) + { + spdlog::error("Cannot connect to redis host {}:{}", + redisHosts, redisPort); + return 1; + } + else + { + ret = (int) ix::cobra_metrics_to_redis_bot( + cobraBotConfig, redisClient, verbose); + } } else if (app.got_subcommand("snake")) { diff --git a/ws/ws_cobra_metrics_to_redis.cpp b/ws/ws_cobra_metrics_to_redis.cpp deleted file mode 100644 index 1a61876d..00000000 --- a/ws/ws_cobra_metrics_to_redis.cpp +++ /dev/null @@ -1,166 +0,0 @@ -/* - * ws_cobra_metrics_to_redis.cpp - * Author: Benjamin Sergeant - * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace ix -{ - int ws_cobra_metrics_to_redis(const ix::CobraConfig& config, - const std::string& channel, - const std::string& filter, - const std::string& position, - const std::string& host, - int port) - { - ix::CobraConnection conn; - conn.configure(config); - conn.connect(); - - // Display incoming messages - std::atomic msgPerSeconds(0); - std::atomic msgCount(0); - - auto timer = [&msgPerSeconds, &msgCount] { - while (true) - { - spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds); - - msgPerSeconds = 0; - auto duration = std::chrono::seconds(1); - std::this_thread::sleep_for(duration); - } - }; - - ix::RedisClient redisClient; - if (!redisClient.connect(host, port)) - { - spdlog::error("Cannot connect to redis host {}:{}", host, port); - return 0; - } - - std::mutex conditionVariableMutex; - std::condition_variable condition; - std::queue queue; - - auto redisSender = [&condition, &queue, &conditionVariableMutex, &redisClient] { - Json::FastWriter jsonWriter; - - int batchSize = 1000; - int i = 0; - - std::stringstream pipe; - - while (true) - { - Json::Value msg; - - { - std::unique_lock lock(conditionVariableMutex); - condition.wait(lock, [&queue] { return !queue.empty(); }); - - msg = queue.front(); - queue.pop(); - } - - // compute channel - std::stringstream ss; - ss << "session=" << msg["session"].asString() << ";msgid=" << msg["id"].asString(); - - std::string channel = ss.str(); - - std::string errMsg; - pipe << redisClient.prepareXaddCommand(channel, jsonWriter.write(msg)); - - if (i++ == batchSize) - { - if (!redisClient.sendCommand(pipe.str(), batchSize, errMsg)) - { - spdlog::error("error sending command: {}", errMsg); - } - - i = 0; - } - } - }; - - std::thread t1(timer); - std::thread t2(redisSender); - - conn.setEventCallback([&conn, - &channel, - &filter, - &position, - &msgCount, - &msgPerSeconds, - &conditionVariableMutex, - &condition, - &queue](const CobraEventPtr& event) { - if (event->type == ix::CobraEventType::Open) - { - spdlog::info("Subscriber connected"); - - for (auto&& it : event->headers) - { - spdlog::info("{}: {}", it.first, it.second); - } - } - else if (event->type == ix::CobraEventType::Authenticated) - { - spdlog::info("Subscriber authenticated"); - - conn.subscribe( - channel, - filter, - position, - [&msgPerSeconds, &msgCount, &conditionVariableMutex, &condition, &queue]( - const Json::Value& msg, const std::string& /*position*/) { - { - std::unique_lock lock(conditionVariableMutex); - queue.push(msg); - } - - condition.notify_one(); - - msgPerSeconds++; - msgCount++; - }); - } - else if (event->type == ix::CobraEventType::Subscribed) - { - spdlog::info("Subscriber: subscribed to channel {}", event->subscriptionId); - } - else if (event->type == ix::CobraEventType::UnSubscribed) - { - spdlog::info("Subscriber: unsubscribed from channel {}", event->subscriptionId); - } - else if (event->type == ix::CobraEventType::Error) - { - spdlog::error("Subscriber: error {}", event->errMsg); - } - else if (event->type == ix::CobraEventType::Published) - { - spdlog::error("Published message hacked: {}", event->msgId); - } - }); - - while (true) - { - std::chrono::duration duration(10); - std::this_thread::sleep_for(duration); - } - - return 0; - } -} // namespace ix diff --git a/ws/ws_redis_publish.cpp b/ws/ws_redis_publish.cpp index 3bd61f1a..ae0fc808 100644 --- a/ws/ws_redis_publish.cpp +++ b/ws/ws_redis_publish.cpp @@ -4,7 +4,7 @@ * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. */ -#include +#include #include #include diff --git a/ws/ws_redis_server.cpp b/ws/ws_redis_server.cpp index d5dc8791..09f02c8c 100644 --- a/ws/ws_redis_server.cpp +++ b/ws/ws_redis_server.cpp @@ -4,7 +4,7 @@ * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. */ -#include +#include #include #include diff --git a/ws/ws_redis_subscribe.cpp b/ws/ws_redis_subscribe.cpp index 4d0b5d57..52311a9e 100644 --- a/ws/ws_redis_subscribe.cpp +++ b/ws/ws_redis_subscribe.cpp @@ -6,7 +6,7 @@ #include #include -#include +#include #include #include #include