add unittest for cobra to sentry bots

This commit is contained in:
Benjamin Sergeant 2020-03-12 09:07:01 -07:00
parent 09b9483ddf
commit cb1ec7dc96
8 changed files with 253 additions and 31 deletions

View File

@ -24,7 +24,9 @@ namespace ix
bool verbose, bool verbose,
bool strict, bool strict,
int jobs, int jobs,
size_t maxQueueSize) size_t maxQueueSize,
bool enableHeartbeat,
int runtime)
{ {
ix::CobraConnection conn; ix::CobraConnection conn;
conn.configure(config); conn.configure(config);
@ -39,22 +41,26 @@ namespace ix
QueueManager queueManager(maxQueueSize, stop); QueueManager queueManager(maxQueueSize, stop);
auto timer = [&sentCount, &receivedCount] { auto timer = [&sentCount, &receivedCount, &stop] {
while (true) while (!stop)
{ {
spdlog::info("messages received {} sent {}", receivedCount, sentCount); spdlog::info("messages received {} sent {}", receivedCount, sentCount);
auto duration = std::chrono::seconds(1); auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
} }
spdlog::info("timer thread done");
}; };
std::thread t1(timer); std::thread t1(timer);
auto heartbeat = [&sentCount, &receivedCount] { auto heartbeat = [&sentCount, &receivedCount, &stop, &enableHeartbeat] {
std::string state("na"); std::string state("na");
while (true) if (!enableHeartbeat) return;
while (!stop)
{ {
std::stringstream ss; std::stringstream ss;
ss << "messages received " << receivedCount; ss << "messages received " << receivedCount;
@ -72,6 +78,8 @@ namespace ix
auto duration = std::chrono::minutes(1); auto duration = std::chrono::minutes(1);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
} }
spdlog::info("heartbeat thread done");
}; };
std::thread t2(heartbeat); std::thread t2(heartbeat);
@ -84,8 +92,8 @@ namespace ix
{ {
Json::Value msg = queueManager.pop(); Json::Value msg = queueManager.pop();
if (msg.isNull()) continue;
if (stop) return; if (stop) return;
if (msg.isNull()) continue;
auto ret = sentryClient.send(msg, verbose); auto ret = sentryClient.send(msg, verbose);
HttpResponsePtr response = ret.first; HttpResponsePtr response = ret.first;
@ -203,7 +211,7 @@ namespace ix
const Json::Value& msg) { const Json::Value& msg) {
if (verbose) if (verbose)
{ {
spdlog::info(jsonWriter.write(msg)); spdlog::info("Subscriber received message -> {}", jsonWriter.write(msg));
} }
// If we cannot send to sentry fast enough, drop the message // If we cannot send to sentry fast enough, drop the message
@ -238,24 +246,46 @@ namespace ix
} }
}); });
while (true) // Run forever
if (runtime == -1)
{ {
auto duration = std::chrono::seconds(1); while (true)
std::this_thread::sleep_for(duration); {
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
if (strict && errorSending) break; if (strict && errorSending) break;
}
}
// Run for a duration, used by unittesting now
else
{
for (int i = 0 ; i < runtime; ++i)
{
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
if (strict && errorSending) break;
}
} }
conn.disconnect(); //
// Cleanup.
// join all the bg threads and stop them. // join all the bg threads and stop them.
//
conn.disconnect();
stop = true; stop = true;
t1.join();
if (t2.joinable()) t2.join();
spdlog::info("heartbeat thread done");
for (int i = 0; i < jobs; i++) for (int i = 0; i < jobs; i++)
{ {
spdlog::error("joining thread {}", i); spdlog::info("joining thread {}", i);
pool[i].join(); pool[i].join();
} }
return (strict && errorSending) ? 1 : 0; return (strict && errorSending) ? -1 : (int) sentCount;
} }
} // namespace ix } // namespace ix

View File

@ -17,5 +17,7 @@ namespace ix
bool verbose, bool verbose,
bool strict, bool strict,
int jobs, int jobs,
size_t maxQueueSize); size_t maxQueueSize,
bool enableHeartbeat,
int runtime);
} // namespace ix } // namespace ix

View File

@ -29,7 +29,8 @@ namespace ix
std::random_shuffle(games.begin(), games.end()); std::random_shuffle(games.begin(), games.end());
std::string game = games[0]; std::string game = games[0];
_condition.wait(lock, [this] { return !_stop; }); auto duration = std::chrono::seconds(1);
_condition.wait_for(lock, duration);
if (_queues[game].empty()) if (_queues[game].empty())
{ {

View File

@ -56,6 +56,7 @@ set (SOURCES
IXWebSocketSubProtocolTest.cpp IXWebSocketSubProtocolTest.cpp
IXSentryClientTest.cpp IXSentryClientTest.cpp
IXWebSocketChatTest.cpp IXWebSocketChatTest.cpp
IXCobraToSentryBotTest.cpp
) )
# Some unittest don't work on windows yet # Some unittest don't work on windows yet
@ -99,6 +100,7 @@ target_link_libraries(ixwebsocket_unittest ixwebsocket)
target_link_libraries(ixwebsocket_unittest ixcrypto) target_link_libraries(ixwebsocket_unittest ixcrypto)
target_link_libraries(ixwebsocket_unittest ixcore) target_link_libraries(ixwebsocket_unittest ixcore)
target_link_libraries(ixwebsocket_unittest ixsentry) target_link_libraries(ixwebsocket_unittest ixsentry)
target_link_libraries(ixwebsocket_unittest ixbots)
target_link_libraries(ixwebsocket_unittest spdlog) target_link_libraries(ixwebsocket_unittest spdlog)

View File

@ -0,0 +1,185 @@
/*
* cmd_satori_chat.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017 Machine Zone. All rights reserved.
*/
#include "IXTest.h"
#include "catch.hpp"
#include <chrono>
#include <iostream>
#include <ixcobra/IXCobraConnection.h>
#include <ixcobra/IXCobraMetricsPublisher.h>
#include <ixcrypto/IXUuid.h>
#include <ixsnake/IXRedisServer.h>
#include <ixsnake/IXSnakeServer.h>
#include <ixbots/IXCobraToSentryBot.h>
#include <ixwebsocket/IXHttpServer.h>
#include <ixwebsocket/IXUserAgent.h>
using namespace ix;
namespace
{
std::atomic<size_t> incomingBytes(0);
std::atomic<size_t> outgoingBytes(0);
void setupTrafficTrackerCallback()
{
ix::CobraConnection::setTrafficTrackerCallback([](size_t size, bool incoming) {
if (incoming)
{
incomingBytes += size;
}
else
{
outgoingBytes += size;
}
});
}
void runPublisher(const ix::CobraConfig& config, const std::string& channel)
{
ix::CobraMetricsPublisher cobraMetricsPublisher;
SocketTLSOptions socketTLSOptions;
bool perMessageDeflate = true;
cobraMetricsPublisher.configure(config.appkey,
config.endpoint,
channel,
config.rolename,
config.rolesecret,
perMessageDeflate,
socketTLSOptions);
cobraMetricsPublisher.setSession(uuid4());
cobraMetricsPublisher.enable(true); // disabled by default, needs to be enabled to be active
Json::Value msg;
msg["fps"] = 60;
cobraMetricsPublisher.setGenericAttributes("game", "ody");
// Wait a bit
ix::msleep(500);
// publish some messages
cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #1)
cobraMetricsPublisher.push("sms_metric_B_id", msg); // (msg #2)
ix::msleep(500);
cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #3)
cobraMetricsPublisher.push("sms_metric_D_id", msg); // (msg #4)
ix::msleep(500);
cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #4)
cobraMetricsPublisher.push("sms_metric_F_id", msg); // (msg #5)
ix::msleep(500);
}
}
TEST_CASE("Cobra_to_sentry_bot", "[foo]")
{
SECTION("Exchange and count sent/received messages.")
{
int port = getFreePort();
snake::AppConfig appConfig = makeSnakeServerConfig(port);
// Start a redis server
ix::RedisServer redisServer(appConfig.redisPort);
auto res = redisServer.listen();
REQUIRE(res.first);
redisServer.start();
// Start a snake server
snake::SnakeServer snakeServer(appConfig);
snakeServer.run();
// Start a fake sentry http server
int sentryPort = getFreePort();
ix::HttpServer sentryServer(sentryPort, "127.0.0.1");
sentryServer.setOnConnectionCallback(
[](HttpRequestPtr request,
std::shared_ptr<ConnectionState> /*connectionState*/) -> HttpResponsePtr {
WebSocketHttpHeaders headers;
headers["Server"] = userAgent();
// Log request
std::stringstream ss;
ss << request->method << " " << request->headers["User-Agent"] << " "
<< request->uri;
if (request->method == "POST")
{
return std::make_shared<HttpResponse>(
200, "OK", HttpErrorCode::Ok, headers, std::string());
}
else
{
return std::make_shared<HttpResponse>(
405, "OK", HttpErrorCode::Invalid, headers, std::string("Invalid method"));
}
});
res = sentryServer.listen();
REQUIRE(res.first);
sentryServer.start();
setupTrafficTrackerCallback();
// Run the bot for a small amount of time
std::string channel = ix::generateSessionId();
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
std::string role = "_sub";
std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
std::stringstream ss;
ss << "ws://localhost:" << port;
std::string endpoint = ss.str();
ix::CobraConfig config;
config.endpoint = endpoint;
config.appkey = appkey;
config.rolename = role;
config.rolesecret = secret;
std::thread publisherThread(runPublisher, config, channel);
std::string filter;
bool verbose = true;
bool strict = true;
int jobs = 1;
size_t maxQueueSize = 10;
bool enableHeartbeat = false;
// https://xxxxx:yyyyyy@sentry.io/1234567
std::stringstream oss;
oss << "http://xxxxxxx:yyyyyyy@localhost:" << sentryPort << "/1234567";
std::string dsn = oss.str();
// Only run the bot for 3 seconds
int runtime = 3;
int sentCount = cobra_to_sentry_bot(config, channel, filter, dsn,
verbose, strict, jobs,
maxQueueSize, enableHeartbeat, runtime);
//
// We want at least 2 messages to be sent
//
REQUIRE(sentCount >= 2);
// Give us 1s for all messages to be received
ix::msleep(1000);
spdlog::info("Incoming bytes {}", incomingBytes);
spdlog::info("Outgoing bytes {}", outgoingBytes);
spdlog::info("Stopping snake server...");
snakeServer.stop();
spdlog::info("Stopping redis server...");
redisServer.stop();
publisherThread.join();
sentryServer.stop();
}
}

View File

@ -24,7 +24,7 @@ namespace ix
{ {
std::atomic<size_t> incomingBytes(0); std::atomic<size_t> incomingBytes(0);
std::atomic<size_t> outgoingBytes(0); std::atomic<size_t> outgoingBytes(0);
std::mutex Logger::_mutex; std::mutex TLogger::_mutex;
std::stack<int> freePorts; std::stack<int> freePorts;
void setupWebSocketTrafficTrackerCallback() void setupWebSocketTrafficTrackerCallback()
@ -43,9 +43,9 @@ namespace ix
void reportWebSocketTraffic() void reportWebSocketTraffic()
{ {
Logger() << incomingBytes; TLogger() << incomingBytes;
Logger() << "Incoming bytes: " << incomingBytes; TLogger() << "Incoming bytes: " << incomingBytes;
Logger() << "Outgoing bytes: " << outgoingBytes; TLogger() << "Outgoing bytes: " << outgoingBytes;
} }
void msleep(int ms) void msleep(int ms)
@ -65,7 +65,7 @@ namespace ix
void log(const std::string& msg) void log(const std::string& msg)
{ {
Logger() << msg; TLogger() << msg;
} }
void hexDump(const std::string& prefix, const std::string& s) void hexDump(const std::string& prefix, const std::string& s)
@ -90,17 +90,17 @@ namespace ix
[webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg) { [webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
Logger() << "New connection"; TLogger() << "New connection";
Logger() << "Uri: " << msg->openInfo.uri; TLogger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:"; TLogger() << "Headers:";
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
Logger() << it.first << ": " << it.second; TLogger() << it.first << ": " << it.second;
} }
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
Logger() << "Closed connection"; TLogger() << "Closed connection";
} }
else if (msg->type == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
@ -118,7 +118,7 @@ namespace ix
auto res = server.listen(); auto res = server.listen();
if (!res.first) if (!res.first)
{ {
Logger() << res.second; TLogger() << res.second;
return false; return false;
} }

View File

@ -28,11 +28,11 @@ namespace ix
void setupWebSocketTrafficTrackerCallback(); void setupWebSocketTrafficTrackerCallback();
void reportWebSocketTraffic(); void reportWebSocketTraffic();
struct Logger struct TLogger
{ {
public: public:
template<typename T> template<typename T>
Logger& operator<<(T const& obj) TLogger& operator<<(T const& obj)
{ {
std::lock_guard<std::mutex> lock(_mutex); std::lock_guard<std::mutex> lock(_mutex);

View File

@ -17,7 +17,9 @@ namespace ix
int jobs, int jobs,
size_t maxQueueSize) size_t maxQueueSize)
{ {
bool enableHeartbeat = true;
int runtime = -1;
return cobra_to_sentry_bot( return cobra_to_sentry_bot(
config, channel, filter, dsn, verbose, strict, jobs, maxQueueSize); config, channel, filter, dsn, verbose, strict, jobs, maxQueueSize, enableHeartbeat, runtime);
} }
} // namespace ix } // namespace ix