diff --git a/ixbots/ixbots/IXCobraToSentryBot.cpp b/ixbots/ixbots/IXCobraToSentryBot.cpp index 99b408c5..dd8e883c 100644 --- a/ixbots/ixbots/IXCobraToSentryBot.cpp +++ b/ixbots/ixbots/IXCobraToSentryBot.cpp @@ -24,7 +24,9 @@ namespace ix bool verbose, bool strict, int jobs, - size_t maxQueueSize) + size_t maxQueueSize, + bool enableHeartbeat, + int runtime) { ix::CobraConnection conn; conn.configure(config); @@ -39,22 +41,26 @@ namespace ix QueueManager queueManager(maxQueueSize, stop); - auto timer = [&sentCount, &receivedCount] { - while (true) + auto timer = [&sentCount, &receivedCount, &stop] { + while (!stop) { spdlog::info("messages received {} sent {}", receivedCount, sentCount); auto duration = std::chrono::seconds(1); std::this_thread::sleep_for(duration); } + + spdlog::info("timer thread done"); }; std::thread t1(timer); - auto heartbeat = [&sentCount, &receivedCount] { + auto heartbeat = [&sentCount, &receivedCount, &stop, &enableHeartbeat] { std::string state("na"); - while (true) + if (!enableHeartbeat) return; + + while (!stop) { std::stringstream ss; ss << "messages received " << receivedCount; @@ -72,6 +78,8 @@ namespace ix auto duration = std::chrono::minutes(1); std::this_thread::sleep_for(duration); } + + spdlog::info("heartbeat thread done"); }; std::thread t2(heartbeat); @@ -84,8 +92,8 @@ namespace ix { Json::Value msg = queueManager.pop(); - if (msg.isNull()) continue; if (stop) return; + if (msg.isNull()) continue; auto ret = sentryClient.send(msg, verbose); HttpResponsePtr response = ret.first; @@ -203,7 +211,7 @@ namespace ix const Json::Value& msg) { if (verbose) { - spdlog::info(jsonWriter.write(msg)); + spdlog::info("Subscriber received message -> {}", jsonWriter.write(msg)); } // If we cannot send to sentry fast enough, drop the message @@ -238,24 +246,46 @@ namespace ix } }); - while (true) + // Run forever + if (runtime == -1) { - auto duration = std::chrono::seconds(1); - std::this_thread::sleep_for(duration); + while (true) + { + auto duration = std::chrono::seconds(1); + std::this_thread::sleep_for(duration); - if (strict && errorSending) break; + if (strict && errorSending) break; + } + } + // Run for a duration, used by unittesting now + else + { + for (int i = 0 ; i < runtime; ++i) + { + auto duration = std::chrono::seconds(1); + std::this_thread::sleep_for(duration); + + if (strict && errorSending) break; + } } - conn.disconnect(); - + // + // Cleanup. // join all the bg threads and stop them. + // + conn.disconnect(); stop = true; + + t1.join(); + if (t2.joinable()) t2.join(); + spdlog::info("heartbeat thread done"); + for (int i = 0; i < jobs; i++) { - spdlog::error("joining thread {}", i); + spdlog::info("joining thread {}", i); pool[i].join(); } - return (strict && errorSending) ? 1 : 0; + return (strict && errorSending) ? -1 : (int) sentCount; } } // namespace ix diff --git a/ixbots/ixbots/IXCobraToSentryBot.h b/ixbots/ixbots/IXCobraToSentryBot.h index ce37d8ab..e1ce74d1 100644 --- a/ixbots/ixbots/IXCobraToSentryBot.h +++ b/ixbots/ixbots/IXCobraToSentryBot.h @@ -17,5 +17,7 @@ namespace ix bool verbose, bool strict, int jobs, - size_t maxQueueSize); + size_t maxQueueSize, + bool enableHeartbeat, + int runtime); } // namespace ix diff --git a/ixbots/ixbots/IXQueueManager.cpp b/ixbots/ixbots/IXQueueManager.cpp index 115ff528..c6703c05 100644 --- a/ixbots/ixbots/IXQueueManager.cpp +++ b/ixbots/ixbots/IXQueueManager.cpp @@ -29,7 +29,8 @@ namespace ix std::random_shuffle(games.begin(), games.end()); std::string game = games[0]; - _condition.wait(lock, [this] { return !_stop; }); + auto duration = std::chrono::seconds(1); + _condition.wait_for(lock, duration); if (_queues[game].empty()) { diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 9c915772..30717aaf 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -56,6 +56,7 @@ set (SOURCES IXWebSocketSubProtocolTest.cpp IXSentryClientTest.cpp IXWebSocketChatTest.cpp + IXCobraToSentryBotTest.cpp ) # Some unittest don't work on windows yet @@ -99,6 +100,7 @@ target_link_libraries(ixwebsocket_unittest ixwebsocket) target_link_libraries(ixwebsocket_unittest ixcrypto) target_link_libraries(ixwebsocket_unittest ixcore) target_link_libraries(ixwebsocket_unittest ixsentry) +target_link_libraries(ixwebsocket_unittest ixbots) target_link_libraries(ixwebsocket_unittest spdlog) diff --git a/test/IXCobraToSentryBotTest.cpp b/test/IXCobraToSentryBotTest.cpp new file mode 100644 index 00000000..44813430 --- /dev/null +++ b/test/IXCobraToSentryBotTest.cpp @@ -0,0 +1,185 @@ +/* + * cmd_satori_chat.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2017 Machine Zone. All rights reserved. + */ + +#include "IXTest.h" +#include "catch.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace ix; + +namespace +{ + std::atomic incomingBytes(0); + std::atomic outgoingBytes(0); + + void setupTrafficTrackerCallback() + { + ix::CobraConnection::setTrafficTrackerCallback([](size_t size, bool incoming) { + if (incoming) + { + incomingBytes += size; + } + else + { + outgoingBytes += size; + } + }); + } + + void runPublisher(const ix::CobraConfig& config, const std::string& channel) + { + ix::CobraMetricsPublisher cobraMetricsPublisher; + + SocketTLSOptions socketTLSOptions; + bool perMessageDeflate = true; + cobraMetricsPublisher.configure(config.appkey, + config.endpoint, + channel, + config.rolename, + config.rolesecret, + perMessageDeflate, + socketTLSOptions); + cobraMetricsPublisher.setSession(uuid4()); + cobraMetricsPublisher.enable(true); // disabled by default, needs to be enabled to be active + + Json::Value msg; + msg["fps"] = 60; + + cobraMetricsPublisher.setGenericAttributes("game", "ody"); + + // Wait a bit + ix::msleep(500); + + // publish some messages + cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #1) + cobraMetricsPublisher.push("sms_metric_B_id", msg); // (msg #2) + ix::msleep(500); + + cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #3) + cobraMetricsPublisher.push("sms_metric_D_id", msg); // (msg #4) + ix::msleep(500); + + cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #4) + cobraMetricsPublisher.push("sms_metric_F_id", msg); // (msg #5) + ix::msleep(500); + } +} + +TEST_CASE("Cobra_to_sentry_bot", "[foo]") +{ + SECTION("Exchange and count sent/received messages.") + { + int port = getFreePort(); + snake::AppConfig appConfig = makeSnakeServerConfig(port); + + // Start a redis server + ix::RedisServer redisServer(appConfig.redisPort); + auto res = redisServer.listen(); + REQUIRE(res.first); + redisServer.start(); + + // Start a snake server + snake::SnakeServer snakeServer(appConfig); + snakeServer.run(); + + // Start a fake sentry http server + int sentryPort = getFreePort(); + ix::HttpServer sentryServer(sentryPort, "127.0.0.1"); + sentryServer.setOnConnectionCallback( + [](HttpRequestPtr request, + std::shared_ptr /*connectionState*/) -> HttpResponsePtr { + WebSocketHttpHeaders headers; + headers["Server"] = userAgent(); + + // Log request + std::stringstream ss; + ss << request->method << " " << request->headers["User-Agent"] << " " + << request->uri; + + if (request->method == "POST") + { + return std::make_shared( + 200, "OK", HttpErrorCode::Ok, headers, std::string()); + } + else + { + return std::make_shared( + 405, "OK", HttpErrorCode::Invalid, headers, std::string("Invalid method")); + } + }); + + res = sentryServer.listen(); + REQUIRE(res.first); + sentryServer.start(); + + setupTrafficTrackerCallback(); + + // Run the bot for a small amount of time + std::string channel = ix::generateSessionId(); + std::string appkey("FC2F10139A2BAc53BB72D9db967b024f"); + std::string role = "_sub"; + std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba"; + + std::stringstream ss; + ss << "ws://localhost:" << port; + std::string endpoint = ss.str(); + + ix::CobraConfig config; + config.endpoint = endpoint; + config.appkey = appkey; + config.rolename = role; + config.rolesecret = secret; + + std::thread publisherThread(runPublisher, config, channel); + + std::string filter; + bool verbose = true; + bool strict = true; + int jobs = 1; + size_t maxQueueSize = 10; + bool enableHeartbeat = false; + + // https://xxxxx:yyyyyy@sentry.io/1234567 + std::stringstream oss; + oss << "http://xxxxxxx:yyyyyyy@localhost:" << sentryPort << "/1234567"; + std::string dsn = oss.str(); + + // Only run the bot for 3 seconds + int runtime = 3; + + int sentCount = cobra_to_sentry_bot(config, channel, filter, dsn, + verbose, strict, jobs, + maxQueueSize, enableHeartbeat, runtime); + // + // We want at least 2 messages to be sent + // + REQUIRE(sentCount >= 2); + + // Give us 1s for all messages to be received + ix::msleep(1000); + + spdlog::info("Incoming bytes {}", incomingBytes); + spdlog::info("Outgoing bytes {}", outgoingBytes); + + spdlog::info("Stopping snake server..."); + snakeServer.stop(); + + spdlog::info("Stopping redis server..."); + redisServer.stop(); + + publisherThread.join(); + sentryServer.stop(); + } +} diff --git a/test/IXTest.cpp b/test/IXTest.cpp index e048230f..acb650a6 100644 --- a/test/IXTest.cpp +++ b/test/IXTest.cpp @@ -24,7 +24,7 @@ namespace ix { std::atomic incomingBytes(0); std::atomic outgoingBytes(0); - std::mutex Logger::_mutex; + std::mutex TLogger::_mutex; std::stack freePorts; void setupWebSocketTrafficTrackerCallback() @@ -43,9 +43,9 @@ namespace ix void reportWebSocketTraffic() { - Logger() << incomingBytes; - Logger() << "Incoming bytes: " << incomingBytes; - Logger() << "Outgoing bytes: " << outgoingBytes; + TLogger() << incomingBytes; + TLogger() << "Incoming bytes: " << incomingBytes; + TLogger() << "Outgoing bytes: " << outgoingBytes; } void msleep(int ms) @@ -65,7 +65,7 @@ namespace ix void log(const std::string& msg) { - Logger() << msg; + TLogger() << msg; } void hexDump(const std::string& prefix, const std::string& s) @@ -90,17 +90,17 @@ namespace ix [webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg) { if (msg->type == ix::WebSocketMessageType::Open) { - Logger() << "New connection"; - Logger() << "Uri: " << msg->openInfo.uri; - Logger() << "Headers:"; + TLogger() << "New connection"; + TLogger() << "Uri: " << msg->openInfo.uri; + TLogger() << "Headers:"; for (auto it : msg->openInfo.headers) { - Logger() << it.first << ": " << it.second; + TLogger() << it.first << ": " << it.second; } } else if (msg->type == ix::WebSocketMessageType::Close) { - Logger() << "Closed connection"; + TLogger() << "Closed connection"; } else if (msg->type == ix::WebSocketMessageType::Message) { @@ -118,7 +118,7 @@ namespace ix auto res = server.listen(); if (!res.first) { - Logger() << res.second; + TLogger() << res.second; return false; } diff --git a/test/IXTest.h b/test/IXTest.h index b4de0f4b..afdb7368 100644 --- a/test/IXTest.h +++ b/test/IXTest.h @@ -28,11 +28,11 @@ namespace ix void setupWebSocketTrafficTrackerCallback(); void reportWebSocketTraffic(); - struct Logger + struct TLogger { public: template - Logger& operator<<(T const& obj) + TLogger& operator<<(T const& obj) { std::lock_guard lock(_mutex); diff --git a/ws/ws_cobra_to_sentry.cpp b/ws/ws_cobra_to_sentry.cpp index 87887f71..fbc121f8 100644 --- a/ws/ws_cobra_to_sentry.cpp +++ b/ws/ws_cobra_to_sentry.cpp @@ -17,7 +17,9 @@ namespace ix int jobs, size_t maxQueueSize) { + bool enableHeartbeat = true; + int runtime = -1; return cobra_to_sentry_bot( - config, channel, filter, dsn, verbose, strict, jobs, maxQueueSize); + config, channel, filter, dsn, verbose, strict, jobs, maxQueueSize, enableHeartbeat, runtime); } } // namespace ix