more stale references to ixcore
This commit is contained in:
parent
e9dc7f7aed
commit
e4a1ac80c2
@ -80,7 +80,6 @@ foreach(TEST_TARGET_NAME ${TEST_TARGET_NAMES})
|
|||||||
# library with the most dependencies come first
|
# library with the most dependencies come first
|
||||||
target_link_libraries(${TEST_TARGET_NAME} ixwebsocket_test)
|
target_link_libraries(${TEST_TARGET_NAME} ixwebsocket_test)
|
||||||
target_link_libraries(${TEST_TARGET_NAME} ixwebsocket)
|
target_link_libraries(${TEST_TARGET_NAME} ixwebsocket)
|
||||||
target_link_libraries(${TEST_TARGET_NAME} ixcore)
|
|
||||||
|
|
||||||
target_link_libraries(${TEST_TARGET_NAME} spdlog)
|
target_link_libraries(${TEST_TARGET_NAME} spdlog)
|
||||||
|
|
||||||
|
@ -1,351 +0,0 @@
|
|||||||
/*
|
|
||||||
* cmd_satori_chat.cpp
|
|
||||||
* Author: Benjamin Sergeant
|
|
||||||
* Copyright (c) 2017 Machine Zone. All rights reserved.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#include "IXTest.h"
|
|
||||||
#include "catch.hpp"
|
|
||||||
#include <chrono>
|
|
||||||
#include <iostream>
|
|
||||||
#include <ixcobra/IXCobraConnection.h>
|
|
||||||
#include <ixcrypto/IXUuid.h>
|
|
||||||
#include <ixredis/IXRedisServer.h>
|
|
||||||
#include <ixsnake/IXSnakeServer.h>
|
|
||||||
|
|
||||||
using namespace ix;
|
|
||||||
|
|
||||||
namespace
|
|
||||||
{
|
|
||||||
std::atomic<size_t> incomingBytes(0);
|
|
||||||
std::atomic<size_t> outgoingBytes(0);
|
|
||||||
|
|
||||||
void setupTrafficTrackerCallback()
|
|
||||||
{
|
|
||||||
ix::CobraConnection::setTrafficTrackerCallback([](size_t size, bool incoming) {
|
|
||||||
if (incoming)
|
|
||||||
{
|
|
||||||
incomingBytes += size;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
outgoingBytes += size;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
class CobraChat
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
CobraChat(const std::string& user,
|
|
||||||
const std::string& session,
|
|
||||||
const ix::CobraConfig& config);
|
|
||||||
|
|
||||||
void subscribe(const std::string& channel);
|
|
||||||
void start();
|
|
||||||
void stop();
|
|
||||||
void run();
|
|
||||||
bool isReady() const;
|
|
||||||
|
|
||||||
void sendMessage(const std::string& text);
|
|
||||||
size_t getReceivedMessagesCount() const;
|
|
||||||
|
|
||||||
bool hasPendingMessages() const;
|
|
||||||
Json::Value popMessage();
|
|
||||||
|
|
||||||
private:
|
|
||||||
std::string _user;
|
|
||||||
std::string _session;
|
|
||||||
ix::CobraConfig _cobraConfig;
|
|
||||||
|
|
||||||
std::queue<Json::Value> _publish_queue;
|
|
||||||
mutable std::mutex _queue_mutex;
|
|
||||||
|
|
||||||
std::thread _thread;
|
|
||||||
std::atomic<bool> _stop;
|
|
||||||
|
|
||||||
ix::CobraConnection _conn;
|
|
||||||
std::atomic<bool> _connectedAndSubscribed;
|
|
||||||
|
|
||||||
std::queue<Json::Value> _receivedQueue;
|
|
||||||
|
|
||||||
std::mutex _logMutex;
|
|
||||||
};
|
|
||||||
|
|
||||||
CobraChat::CobraChat(const std::string& user,
|
|
||||||
const std::string& session,
|
|
||||||
const ix::CobraConfig& config)
|
|
||||||
: _user(user)
|
|
||||||
, _session(session)
|
|
||||||
, _cobraConfig(config)
|
|
||||||
, _stop(false)
|
|
||||||
, _connectedAndSubscribed(false)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
void CobraChat::start()
|
|
||||||
{
|
|
||||||
_thread = std::thread(&CobraChat::run, this);
|
|
||||||
}
|
|
||||||
|
|
||||||
void CobraChat::stop()
|
|
||||||
{
|
|
||||||
_stop = true;
|
|
||||||
_thread.join();
|
|
||||||
}
|
|
||||||
|
|
||||||
bool CobraChat::isReady() const
|
|
||||||
{
|
|
||||||
return _connectedAndSubscribed;
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t CobraChat::getReceivedMessagesCount() const
|
|
||||||
{
|
|
||||||
return _receivedQueue.size();
|
|
||||||
}
|
|
||||||
|
|
||||||
bool CobraChat::hasPendingMessages() const
|
|
||||||
{
|
|
||||||
std::unique_lock<std::mutex> lock(_queue_mutex);
|
|
||||||
return !_publish_queue.empty();
|
|
||||||
}
|
|
||||||
|
|
||||||
Json::Value CobraChat::popMessage()
|
|
||||||
{
|
|
||||||
std::unique_lock<std::mutex> lock(_queue_mutex);
|
|
||||||
auto msg = _publish_queue.front();
|
|
||||||
_publish_queue.pop();
|
|
||||||
return msg;
|
|
||||||
}
|
|
||||||
|
|
||||||
//
|
|
||||||
// Callback to handle received messages, that are printed on the console
|
|
||||||
//
|
|
||||||
void CobraChat::subscribe(const std::string& channel)
|
|
||||||
{
|
|
||||||
std::string filter;
|
|
||||||
std::string position("$");
|
|
||||||
int batchSize = 1;
|
|
||||||
|
|
||||||
_conn.subscribe(channel,
|
|
||||||
filter,
|
|
||||||
position,
|
|
||||||
batchSize,
|
|
||||||
[this](const Json::Value& msg, const std::string& /*position*/) {
|
|
||||||
spdlog::info("receive {}", msg.toStyledString());
|
|
||||||
|
|
||||||
if (!msg.isObject()) return;
|
|
||||||
if (!msg.isMember("user")) return;
|
|
||||||
if (!msg.isMember("text")) return;
|
|
||||||
if (!msg.isMember("session")) return;
|
|
||||||
|
|
||||||
std::string msg_user = msg["user"].asString();
|
|
||||||
std::string msg_text = msg["text"].asString();
|
|
||||||
std::string msg_session = msg["session"].asString();
|
|
||||||
|
|
||||||
// We are not interested in messages
|
|
||||||
// from a different session.
|
|
||||||
if (msg_session != _session) return;
|
|
||||||
|
|
||||||
// We are not interested in our own messages
|
|
||||||
if (msg_user == _user) return;
|
|
||||||
|
|
||||||
_receivedQueue.push(msg);
|
|
||||||
|
|
||||||
std::stringstream ss;
|
|
||||||
ss << std::endl
|
|
||||||
<< msg_user << " > " << msg_text << std::endl
|
|
||||||
<< _user << " > ";
|
|
||||||
log(ss.str());
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
void CobraChat::sendMessage(const std::string& text)
|
|
||||||
{
|
|
||||||
Json::Value msg;
|
|
||||||
msg["user"] = _user;
|
|
||||||
msg["session"] = _session;
|
|
||||||
msg["text"] = text;
|
|
||||||
|
|
||||||
std::unique_lock<std::mutex> lock(_queue_mutex);
|
|
||||||
_publish_queue.push(msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
//
|
|
||||||
// Do satori communication on a background thread, where we can have
|
|
||||||
// something like an event loop that publish, poll and receive data
|
|
||||||
//
|
|
||||||
void CobraChat::run()
|
|
||||||
{
|
|
||||||
std::string channel = _session;
|
|
||||||
|
|
||||||
_conn.configure(_cobraConfig);
|
|
||||||
_conn.connect();
|
|
||||||
|
|
||||||
_conn.setEventCallback([this, channel](const CobraEventPtr& event) {
|
|
||||||
if (event->type == ix::CobraEventType::Open)
|
|
||||||
{
|
|
||||||
log("Subscriber connected: " + _user);
|
|
||||||
for (auto&& it : event->headers)
|
|
||||||
{
|
|
||||||
log("Headers " + it.first + " " + it.second);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (event->type == ix::CobraEventType::Authenticated)
|
|
||||||
{
|
|
||||||
log("Subscriber authenticated: " + _user);
|
|
||||||
subscribe(channel);
|
|
||||||
}
|
|
||||||
else if (event->type == ix::CobraEventType::Error)
|
|
||||||
{
|
|
||||||
log(event->errMsg + _user);
|
|
||||||
}
|
|
||||||
else if (event->type == ix::CobraEventType::Closed)
|
|
||||||
{
|
|
||||||
log("Connection closed: " + _user);
|
|
||||||
}
|
|
||||||
else if (event->type == ix::CobraEventType::Subscribed)
|
|
||||||
{
|
|
||||||
log("Subscription ok: " + _user + " subscription_id " + event->subscriptionId);
|
|
||||||
_connectedAndSubscribed = true;
|
|
||||||
}
|
|
||||||
else if (event->type == ix::CobraEventType::UnSubscribed)
|
|
||||||
{
|
|
||||||
log("Unsubscription ok: " + _user + " subscription_id " + event->subscriptionId);
|
|
||||||
}
|
|
||||||
else if (event->type == ix::CobraEventType::Published)
|
|
||||||
{
|
|
||||||
TLogger() << "Subscriber: published message acked: " << event->msgId;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
while (!_stop)
|
|
||||||
{
|
|
||||||
{
|
|
||||||
while (hasPendingMessages())
|
|
||||||
{
|
|
||||||
auto msg = popMessage();
|
|
||||||
|
|
||||||
std::string text = msg["text"].asString();
|
|
||||||
|
|
||||||
std::stringstream ss;
|
|
||||||
ss << "Sending msg [" << text << "]";
|
|
||||||
log(ss.str());
|
|
||||||
|
|
||||||
Json::Value channels;
|
|
||||||
channels.append(channel);
|
|
||||||
_conn.publish(channels, msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ix::msleep(50);
|
|
||||||
}
|
|
||||||
|
|
||||||
_conn.unsubscribe(channel);
|
|
||||||
|
|
||||||
ix::msleep(50);
|
|
||||||
_conn.disconnect();
|
|
||||||
|
|
||||||
_conn.setEventCallback([](const CobraEventPtr& /*event*/) {});
|
|
||||||
}
|
|
||||||
} // namespace
|
|
||||||
|
|
||||||
TEST_CASE("Cobra_chat", "[cobra_chat]")
|
|
||||||
{
|
|
||||||
SECTION("Exchange and count sent/received messages.")
|
|
||||||
{
|
|
||||||
int port = getFreePort();
|
|
||||||
snake::AppConfig appConfig = makeSnakeServerConfig(port, true);
|
|
||||||
|
|
||||||
// Start a redis server
|
|
||||||
ix::RedisServer redisServer(appConfig.redisPort);
|
|
||||||
auto res = redisServer.listen();
|
|
||||||
REQUIRE(res.first);
|
|
||||||
redisServer.start();
|
|
||||||
|
|
||||||
// Start a snake server
|
|
||||||
snake::SnakeServer snakeServer(appConfig);
|
|
||||||
snakeServer.run();
|
|
||||||
|
|
||||||
int timeout;
|
|
||||||
setupTrafficTrackerCallback();
|
|
||||||
|
|
||||||
std::string session = ix::generateSessionId();
|
|
||||||
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
|
|
||||||
std::string role = "_sub";
|
|
||||||
std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
|
|
||||||
std::string endpoint = makeCobraEndpoint(port, true);
|
|
||||||
|
|
||||||
ix::CobraConfig config;
|
|
||||||
config.endpoint = endpoint;
|
|
||||||
config.appkey = appkey;
|
|
||||||
config.rolename = role;
|
|
||||||
config.rolesecret = secret;
|
|
||||||
config.socketTLSOptions = makeClientTLSOptions();
|
|
||||||
|
|
||||||
CobraChat chatA("jean", session, config);
|
|
||||||
CobraChat chatB("paul", session, config);
|
|
||||||
|
|
||||||
chatA.start();
|
|
||||||
chatB.start();
|
|
||||||
|
|
||||||
// Wait for all chat instance to be ready
|
|
||||||
timeout = 10 * 1000; // 10s
|
|
||||||
while (true)
|
|
||||||
{
|
|
||||||
if (chatA.isReady() && chatB.isReady()) break;
|
|
||||||
ix::msleep(10);
|
|
||||||
|
|
||||||
timeout -= 10;
|
|
||||||
if (timeout <= 0)
|
|
||||||
{
|
|
||||||
snakeServer.stop();
|
|
||||||
redisServer.stop();
|
|
||||||
REQUIRE(false); // timeout
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add a bit of extra time, for the subscription to be active
|
|
||||||
ix::msleep(1000);
|
|
||||||
|
|
||||||
chatA.sendMessage("from A1");
|
|
||||||
chatA.sendMessage("from A2");
|
|
||||||
chatA.sendMessage("from A3");
|
|
||||||
|
|
||||||
chatB.sendMessage("from B1");
|
|
||||||
chatB.sendMessage("from B2");
|
|
||||||
|
|
||||||
// 1. Wait for all messages to be sent
|
|
||||||
timeout = 10 * 1000; // 10s
|
|
||||||
while (chatA.hasPendingMessages() || chatB.hasPendingMessages())
|
|
||||||
{
|
|
||||||
ix::msleep(10);
|
|
||||||
|
|
||||||
timeout -= 10;
|
|
||||||
if (timeout <= 0)
|
|
||||||
{
|
|
||||||
snakeServer.stop();
|
|
||||||
redisServer.stop();
|
|
||||||
REQUIRE(false); // timeout
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Give us 1s for all messages to be received
|
|
||||||
ix::msleep(1000);
|
|
||||||
|
|
||||||
chatA.stop();
|
|
||||||
chatB.stop();
|
|
||||||
|
|
||||||
REQUIRE(chatA.getReceivedMessagesCount() == 2);
|
|
||||||
REQUIRE(chatB.getReceivedMessagesCount() == 3);
|
|
||||||
|
|
||||||
spdlog::info("Incoming bytes {}", incomingBytes);
|
|
||||||
spdlog::info("Outgoing bytes {}", outgoingBytes);
|
|
||||||
|
|
||||||
spdlog::info("Stopping snake server...");
|
|
||||||
snakeServer.stop();
|
|
||||||
|
|
||||||
spdlog::info("Stopping redis server...");
|
|
||||||
redisServer.stop();
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,303 +0,0 @@
|
|||||||
/*
|
|
||||||
* Author: Benjamin Sergeant
|
|
||||||
* Copyright (c) 2018 Machine Zone. All rights reserved.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#include "IXTest.h"
|
|
||||||
#include "catch.hpp"
|
|
||||||
#include <iostream>
|
|
||||||
#include <ixcobra/IXCobraMetricsPublisher.h>
|
|
||||||
#include <ixcrypto/IXUuid.h>
|
|
||||||
#include <ixredis/IXRedisServer.h>
|
|
||||||
#include <ixsnake/IXSnakeServer.h>
|
|
||||||
#include <set>
|
|
||||||
|
|
||||||
using namespace ix;
|
|
||||||
|
|
||||||
namespace
|
|
||||||
{
|
|
||||||
std::atomic<size_t> incomingBytes(0);
|
|
||||||
std::atomic<size_t> outgoingBytes(0);
|
|
||||||
|
|
||||||
void setupTrafficTrackerCallback()
|
|
||||||
{
|
|
||||||
ix::CobraConnection::setTrafficTrackerCallback([](size_t size, bool incoming) {
|
|
||||||
if (incoming)
|
|
||||||
{
|
|
||||||
incomingBytes += size;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
outgoingBytes += size;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
std::atomic<bool> gStop;
|
|
||||||
std::atomic<bool> gSubscriberConnectedAndSubscribed;
|
|
||||||
std::atomic<size_t> gUniqueMessageIdsCount;
|
|
||||||
std::atomic<int> gMessageCount;
|
|
||||||
|
|
||||||
std::set<std::string> gIds;
|
|
||||||
std::mutex gProtectIds; // std::set is no thread-safe, so protect access with this mutex.
|
|
||||||
|
|
||||||
//
|
|
||||||
// Background thread subscribe to the channel and validates what was sent
|
|
||||||
//
|
|
||||||
void startSubscriber(const ix::CobraConfig& config, const std::string& channel)
|
|
||||||
{
|
|
||||||
gSubscriberConnectedAndSubscribed = false;
|
|
||||||
gUniqueMessageIdsCount = 0;
|
|
||||||
gMessageCount = 0;
|
|
||||||
|
|
||||||
ix::CobraConnection conn;
|
|
||||||
conn.configure(config);
|
|
||||||
conn.connect();
|
|
||||||
|
|
||||||
conn.setEventCallback([&conn, &channel](const CobraEventPtr& event) {
|
|
||||||
if (event->type == ix::CobraEventType::Open)
|
|
||||||
{
|
|
||||||
TLogger() << "Subscriber connected:";
|
|
||||||
for (auto&& it : event->headers)
|
|
||||||
{
|
|
||||||
log("Headers " + it.first + " " + it.second);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (event->type == ix::CobraEventType::Closed)
|
|
||||||
{
|
|
||||||
TLogger() << "Subscriber closed:" << event->errMsg;
|
|
||||||
}
|
|
||||||
else if (event->type == ix::CobraEventType::Error)
|
|
||||||
{
|
|
||||||
TLogger() << "Subscriber error:" << event->errMsg;
|
|
||||||
}
|
|
||||||
else if (event->type == ix::CobraEventType::Authenticated)
|
|
||||||
{
|
|
||||||
log("Subscriber authenticated");
|
|
||||||
std::string filter;
|
|
||||||
std::string position("$");
|
|
||||||
int batchSize = 1;
|
|
||||||
|
|
||||||
conn.subscribe(channel,
|
|
||||||
filter,
|
|
||||||
position,
|
|
||||||
batchSize,
|
|
||||||
[](const Json::Value& msg, const std::string& /*position*/) {
|
|
||||||
log(msg.toStyledString());
|
|
||||||
|
|
||||||
std::string id = msg["id"].asString();
|
|
||||||
{
|
|
||||||
std::lock_guard<std::mutex> guard(gProtectIds);
|
|
||||||
gIds.insert(id);
|
|
||||||
}
|
|
||||||
|
|
||||||
gMessageCount++;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
else if (event->type == ix::CobraEventType::Subscribed)
|
|
||||||
{
|
|
||||||
TLogger() << "Subscriber: subscribed to channel " << event->subscriptionId;
|
|
||||||
if (event->subscriptionId == channel)
|
|
||||||
{
|
|
||||||
gSubscriberConnectedAndSubscribed = true;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
TLogger() << "Subscriber: unexpected channel " << event->subscriptionId;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (event->type == ix::CobraEventType::UnSubscribed)
|
|
||||||
{
|
|
||||||
TLogger() << "Subscriber: unsubscribed from channel " << event->subscriptionId;
|
|
||||||
if (event->subscriptionId != channel)
|
|
||||||
{
|
|
||||||
TLogger() << "Subscriber: unexpected channel " << event->subscriptionId;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (event->type == ix::CobraEventType::Published)
|
|
||||||
{
|
|
||||||
TLogger() << "Subscriber: published message acked: " << event->msgId;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
while (!gStop)
|
|
||||||
{
|
|
||||||
std::chrono::duration<double, std::milli> duration(10);
|
|
||||||
std::this_thread::sleep_for(duration);
|
|
||||||
}
|
|
||||||
|
|
||||||
conn.unsubscribe(channel);
|
|
||||||
conn.disconnect();
|
|
||||||
|
|
||||||
gUniqueMessageIdsCount = gIds.size();
|
|
||||||
}
|
|
||||||
|
|
||||||
// publish 100 messages, during roughly 100ms
|
|
||||||
// this is used to test thread safety of CobraMetricsPublisher::push
|
|
||||||
void runAdditionalPublisher(ix::CobraMetricsPublisher* cobraMetricsPublisher)
|
|
||||||
{
|
|
||||||
Json::Value data;
|
|
||||||
data["foo"] = "bar";
|
|
||||||
|
|
||||||
for (int i = 0; i < 100; ++i)
|
|
||||||
{
|
|
||||||
cobraMetricsPublisher->push("sms_metric_F_id", data);
|
|
||||||
ix::msleep(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
} // namespace
|
|
||||||
|
|
||||||
TEST_CASE("Cobra_Metrics_Publisher", "[cobra]")
|
|
||||||
{
|
|
||||||
int port = getFreePort();
|
|
||||||
bool preferTLS = true;
|
|
||||||
snake::AppConfig appConfig = makeSnakeServerConfig(port, preferTLS);
|
|
||||||
|
|
||||||
// Start a redis server
|
|
||||||
ix::RedisServer redisServer(appConfig.redisPort);
|
|
||||||
auto res = redisServer.listen();
|
|
||||||
REQUIRE(res.first);
|
|
||||||
redisServer.start();
|
|
||||||
|
|
||||||
// Start a snake server
|
|
||||||
snake::SnakeServer snakeServer(appConfig);
|
|
||||||
snakeServer.run();
|
|
||||||
|
|
||||||
setupTrafficTrackerCallback();
|
|
||||||
|
|
||||||
std::string channel = ix::generateSessionId();
|
|
||||||
std::string endpoint = makeCobraEndpoint(port, preferTLS);
|
|
||||||
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
|
|
||||||
std::string role = "_sub";
|
|
||||||
std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
|
|
||||||
|
|
||||||
ix::CobraConfig config;
|
|
||||||
config.endpoint = endpoint;
|
|
||||||
config.appkey = appkey;
|
|
||||||
config.rolename = role;
|
|
||||||
config.rolesecret = secret;
|
|
||||||
config.socketTLSOptions = makeClientTLSOptions();
|
|
||||||
|
|
||||||
gStop = false;
|
|
||||||
std::thread subscriberThread(&startSubscriber, config, channel);
|
|
||||||
|
|
||||||
int timeout = 10 * 1000; // 10s
|
|
||||||
|
|
||||||
// Wait until the subscriber is ready (authenticated + subscription successful)
|
|
||||||
while (!gSubscriberConnectedAndSubscribed)
|
|
||||||
{
|
|
||||||
std::chrono::duration<double, std::milli> duration(10);
|
|
||||||
std::this_thread::sleep_for(duration);
|
|
||||||
|
|
||||||
timeout -= 10;
|
|
||||||
if (timeout <= 0)
|
|
||||||
{
|
|
||||||
snakeServer.stop();
|
|
||||||
redisServer.stop();
|
|
||||||
REQUIRE(false); // timeout
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ix::CobraMetricsPublisher cobraMetricsPublisher;
|
|
||||||
cobraMetricsPublisher.configure(config, channel);
|
|
||||||
cobraMetricsPublisher.setSession(uuid4());
|
|
||||||
cobraMetricsPublisher.enable(true);
|
|
||||||
|
|
||||||
Json::Value data;
|
|
||||||
data["foo"] = "bar";
|
|
||||||
|
|
||||||
// (1) Publish without restrictions
|
|
||||||
cobraMetricsPublisher.push("sms_metric_A_id", data); // (msg #1)
|
|
||||||
cobraMetricsPublisher.push("sms_metric_B_id", data); // (msg #2)
|
|
||||||
|
|
||||||
// (2) Restrict what is sent using a blacklist
|
|
||||||
// Add one entry to the blacklist
|
|
||||||
// (will send msg #3)
|
|
||||||
cobraMetricsPublisher.setBlacklist({
|
|
||||||
"sms_metric_B_id" // this id will not be sent
|
|
||||||
});
|
|
||||||
// (msg #4)
|
|
||||||
cobraMetricsPublisher.push("sms_metric_A_id", data);
|
|
||||||
// ...
|
|
||||||
cobraMetricsPublisher.push("sms_metric_B_id", data); // this won't be sent
|
|
||||||
|
|
||||||
// Reset the blacklist
|
|
||||||
// (msg #5)
|
|
||||||
cobraMetricsPublisher.setBlacklist({}); // 4.
|
|
||||||
|
|
||||||
// (3) Restrict what is sent using rate control
|
|
||||||
|
|
||||||
// (msg #6)
|
|
||||||
cobraMetricsPublisher.setRateControl({
|
|
||||||
{"sms_metric_C_id", 1}, // published once per minute (60 seconds) max
|
|
||||||
});
|
|
||||||
// (msg #7)
|
|
||||||
cobraMetricsPublisher.push("sms_metric_C_id", data);
|
|
||||||
cobraMetricsPublisher.push("sms_metric_C_id", data); // this won't be sent
|
|
||||||
|
|
||||||
ix::msleep(1400);
|
|
||||||
|
|
||||||
// (msg #8)
|
|
||||||
cobraMetricsPublisher.push("sms_metric_C_id", data); // now this will be sent
|
|
||||||
|
|
||||||
ix::msleep(600); // wait a bit so that the last message is sent and can be received
|
|
||||||
|
|
||||||
log("Testing suspend/resume now, which will disconnect the cobraMetricsPublisher.");
|
|
||||||
|
|
||||||
// Test suspend + resume
|
|
||||||
for (int i = 0; i < 3; ++i)
|
|
||||||
{
|
|
||||||
cobraMetricsPublisher.suspend();
|
|
||||||
ix::msleep(500);
|
|
||||||
REQUIRE(!cobraMetricsPublisher.isConnected()); // Check that we are not connected anymore
|
|
||||||
|
|
||||||
cobraMetricsPublisher.push("sms_metric_D_id", data); // will not be sent this time
|
|
||||||
|
|
||||||
cobraMetricsPublisher.resume();
|
|
||||||
ix::msleep(2000); // give cobra 2s to connect
|
|
||||||
REQUIRE(cobraMetricsPublisher.isConnected()); // Check that we are connected now
|
|
||||||
|
|
||||||
cobraMetricsPublisher.push("sms_metric_E_id", data);
|
|
||||||
}
|
|
||||||
|
|
||||||
ix::msleep(500);
|
|
||||||
|
|
||||||
// Test multi-threaded publish
|
|
||||||
std::thread bgPublisher1(&runAdditionalPublisher, &cobraMetricsPublisher);
|
|
||||||
std::thread bgPublisher2(&runAdditionalPublisher, &cobraMetricsPublisher);
|
|
||||||
std::thread bgPublisher3(&runAdditionalPublisher, &cobraMetricsPublisher);
|
|
||||||
std::thread bgPublisher4(&runAdditionalPublisher, &cobraMetricsPublisher);
|
|
||||||
std::thread bgPublisher5(&runAdditionalPublisher, &cobraMetricsPublisher);
|
|
||||||
|
|
||||||
bgPublisher1.join();
|
|
||||||
bgPublisher2.join();
|
|
||||||
bgPublisher3.join();
|
|
||||||
bgPublisher4.join();
|
|
||||||
bgPublisher5.join();
|
|
||||||
|
|
||||||
// Now stop the thread
|
|
||||||
gStop = true;
|
|
||||||
subscriberThread.join();
|
|
||||||
|
|
||||||
//
|
|
||||||
// Validate that we received all message kinds, and the correct number of messages
|
|
||||||
//
|
|
||||||
CHECK(gIds.count("sms_metric_A_id") == 1);
|
|
||||||
CHECK(gIds.count("sms_metric_B_id") == 1);
|
|
||||||
CHECK(gIds.count("sms_metric_C_id") == 1);
|
|
||||||
CHECK(gIds.count("sms_metric_D_id") == 1);
|
|
||||||
CHECK(gIds.count("sms_metric_E_id") == 1);
|
|
||||||
CHECK(gIds.count("sms_metric_F_id") == 1);
|
|
||||||
CHECK(gIds.count("sms_set_rate_control_id") == 1);
|
|
||||||
CHECK(gIds.count("sms_set_blacklist_id") == 1);
|
|
||||||
|
|
||||||
spdlog::info("Incoming bytes {}", incomingBytes);
|
|
||||||
spdlog::info("Outgoing bytes {}", outgoingBytes);
|
|
||||||
|
|
||||||
spdlog::info("Stopping snake server...");
|
|
||||||
snakeServer.stop();
|
|
||||||
|
|
||||||
spdlog::info("Stopping redis server...");
|
|
||||||
redisServer.stop();
|
|
||||||
}
|
|
@ -1,154 +0,0 @@
|
|||||||
/*
|
|
||||||
* IXCobraToSentryTest.cpp
|
|
||||||
* Author: Benjamin Sergeant
|
|
||||||
* Copyright (c) 2020 Machine Zone. All rights reserved.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#include "IXTest.h"
|
|
||||||
#include "catch.hpp"
|
|
||||||
#include <chrono>
|
|
||||||
#include <iostream>
|
|
||||||
#include <ixbots/IXCobraToSentryBot.h>
|
|
||||||
#include <ixcobra/IXCobraConnection.h>
|
|
||||||
#include <ixcobra/IXCobraMetricsPublisher.h>
|
|
||||||
#include <ixcrypto/IXUuid.h>
|
|
||||||
#include <ixredis/IXRedisServer.h>
|
|
||||||
#include <ixsentry/IXSentryClient.h>
|
|
||||||
#include <ixsnake/IXSnakeServer.h>
|
|
||||||
#include <ixwebsocket/IXHttpServer.h>
|
|
||||||
#include <ixwebsocket/IXUserAgent.h>
|
|
||||||
|
|
||||||
using namespace ix;
|
|
||||||
|
|
||||||
namespace
|
|
||||||
{
|
|
||||||
std::atomic<size_t> incomingBytes(0);
|
|
||||||
std::atomic<size_t> outgoingBytes(0);
|
|
||||||
|
|
||||||
void setupTrafficTrackerCallback()
|
|
||||||
{
|
|
||||||
ix::CobraConnection::setTrafficTrackerCallback([](size_t size, bool incoming) {
|
|
||||||
if (incoming)
|
|
||||||
{
|
|
||||||
incomingBytes += size;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
outgoingBytes += size;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
} // namespace
|
|
||||||
|
|
||||||
TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]")
|
|
||||||
{
|
|
||||||
SECTION("Exchange and count sent/received messages.")
|
|
||||||
{
|
|
||||||
int port = getFreePort();
|
|
||||||
snake::AppConfig appConfig = makeSnakeServerConfig(port, true);
|
|
||||||
|
|
||||||
// Start a redis server
|
|
||||||
ix::RedisServer redisServer(appConfig.redisPort);
|
|
||||||
auto res = redisServer.listen();
|
|
||||||
REQUIRE(res.first);
|
|
||||||
redisServer.start();
|
|
||||||
|
|
||||||
// Start a snake server
|
|
||||||
snake::SnakeServer snakeServer(appConfig);
|
|
||||||
snakeServer.run();
|
|
||||||
|
|
||||||
// Start a fake sentry http server
|
|
||||||
SocketTLSOptions tlsOptionsServer = makeServerTLSOptions(true);
|
|
||||||
|
|
||||||
int sentryPort = getFreePort();
|
|
||||||
ix::HttpServer sentryServer(sentryPort, "127.0.0.1");
|
|
||||||
sentryServer.setTLSOptions(tlsOptionsServer);
|
|
||||||
|
|
||||||
sentryServer.setOnConnectionCallback(
|
|
||||||
[](HttpRequestPtr request,
|
|
||||||
std::shared_ptr<ConnectionState> connectionState) -> HttpResponsePtr {
|
|
||||||
WebSocketHttpHeaders headers;
|
|
||||||
headers["Server"] = userAgent();
|
|
||||||
|
|
||||||
// Log request
|
|
||||||
std::stringstream ss;
|
|
||||||
ss << connectionState->getRemoteIp() << ":" << connectionState->getRemotePort()
|
|
||||||
<< " " << request->method << " " << request->headers["User-Agent"] << " "
|
|
||||||
<< request->uri;
|
|
||||||
|
|
||||||
if (request->method == "POST")
|
|
||||||
{
|
|
||||||
return std::make_shared<HttpResponse>(
|
|
||||||
200, "OK", HttpErrorCode::Ok, headers, std::string());
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
return std::make_shared<HttpResponse>(
|
|
||||||
405, "OK", HttpErrorCode::Invalid, headers, std::string("Invalid method"));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
res = sentryServer.listen();
|
|
||||||
REQUIRE(res.first);
|
|
||||||
sentryServer.start();
|
|
||||||
|
|
||||||
setupTrafficTrackerCallback();
|
|
||||||
|
|
||||||
// Run the bot for a small amount of time
|
|
||||||
std::string channel = ix::generateSessionId();
|
|
||||||
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
|
|
||||||
std::string role = "_sub";
|
|
||||||
std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
|
|
||||||
std::string endpoint = makeCobraEndpoint(port, true);
|
|
||||||
|
|
||||||
ix::CobraConfig config;
|
|
||||||
config.endpoint = endpoint;
|
|
||||||
config.appkey = appkey;
|
|
||||||
config.rolename = role;
|
|
||||||
config.rolesecret = secret;
|
|
||||||
config.socketTLSOptions = makeClientTLSOptions();
|
|
||||||
|
|
||||||
std::thread publisherThread(runPublisher, config, channel);
|
|
||||||
|
|
||||||
ix::CobraBotConfig cobraBotConfig;
|
|
||||||
cobraBotConfig.cobraConfig = config;
|
|
||||||
cobraBotConfig.channel = channel;
|
|
||||||
cobraBotConfig.runtime = 3; // Only run the bot for 3 seconds
|
|
||||||
cobraBotConfig.enableHeartbeat = false;
|
|
||||||
bool verbose = true;
|
|
||||||
|
|
||||||
// FIXME: try to get this working with https instead of http
|
|
||||||
// to regress the TLS 1.3 OpenSSL bug
|
|
||||||
// -> https://github.com/openssl/openssl/issues/7967
|
|
||||||
// https://xxxxx:yyyyyy@sentry.io/1234567
|
|
||||||
std::stringstream oss;
|
|
||||||
oss << getHttpScheme() << "xxxxxxx:yyyyyyy@localhost:" << sentryPort << "/1234567";
|
|
||||||
std::string dsn = oss.str();
|
|
||||||
|
|
||||||
SocketTLSOptions tlsOptionsClient = makeClientTLSOptions();
|
|
||||||
|
|
||||||
SentryClient sentryClient(dsn);
|
|
||||||
sentryClient.setTLSOptions(tlsOptionsClient);
|
|
||||||
|
|
||||||
int64_t sentCount = cobra_to_sentry_bot(cobraBotConfig, sentryClient, verbose);
|
|
||||||
//
|
|
||||||
// We want at least 2 messages to be sent
|
|
||||||
//
|
|
||||||
REQUIRE(sentCount >= 2);
|
|
||||||
|
|
||||||
// Give us 1s for all messages to be received
|
|
||||||
ix::msleep(1000);
|
|
||||||
|
|
||||||
spdlog::info("Incoming bytes {}", incomingBytes);
|
|
||||||
spdlog::info("Outgoing bytes {}", outgoingBytes);
|
|
||||||
|
|
||||||
spdlog::info("Stopping snake server...");
|
|
||||||
snakeServer.stop();
|
|
||||||
|
|
||||||
spdlog::info("Stopping redis server...");
|
|
||||||
redisServer.stop();
|
|
||||||
|
|
||||||
publisherThread.join();
|
|
||||||
sentryServer.stop();
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,101 +0,0 @@
|
|||||||
/*
|
|
||||||
* IXCobraToStatsdTest.cpp
|
|
||||||
* Author: Benjamin Sergeant
|
|
||||||
* Copyright (c) 2020 Machine Zone. All rights reserved.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#include "IXTest.h"
|
|
||||||
#include "catch.hpp"
|
|
||||||
#include <chrono>
|
|
||||||
#include <iostream>
|
|
||||||
#include <ixbots/IXCobraToStatsdBot.h>
|
|
||||||
#include <ixcobra/IXCobraConnection.h>
|
|
||||||
#include <ixcobra/IXCobraMetricsPublisher.h>
|
|
||||||
#include <ixcrypto/IXUuid.h>
|
|
||||||
#include <ixredis/IXRedisServer.h>
|
|
||||||
#include <ixsentry/IXSentryClient.h>
|
|
||||||
#include <ixsnake/IXSnakeServer.h>
|
|
||||||
#include <ixwebsocket/IXHttpServer.h>
|
|
||||||
#include <ixwebsocket/IXUserAgent.h>
|
|
||||||
|
|
||||||
using namespace ix;
|
|
||||||
|
|
||||||
TEST_CASE("Cobra_to_statsd_bot", "[cobra_bots]")
|
|
||||||
{
|
|
||||||
SECTION("Exchange and count sent/received messages.")
|
|
||||||
{
|
|
||||||
int port = getFreePort();
|
|
||||||
snake::AppConfig appConfig = makeSnakeServerConfig(port, true);
|
|
||||||
|
|
||||||
// Start a redis server
|
|
||||||
ix::RedisServer redisServer(appConfig.redisPort);
|
|
||||||
auto res = redisServer.listen();
|
|
||||||
REQUIRE(res.first);
|
|
||||||
redisServer.start();
|
|
||||||
|
|
||||||
// Start a snake server
|
|
||||||
snake::SnakeServer snakeServer(appConfig);
|
|
||||||
snakeServer.run();
|
|
||||||
|
|
||||||
// Start a fake statsd server (ultimately)
|
|
||||||
|
|
||||||
// Run the bot for a small amount of time
|
|
||||||
std::string channel = ix::generateSessionId();
|
|
||||||
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
|
|
||||||
std::string role = "_sub";
|
|
||||||
std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
|
|
||||||
std::string endpoint = makeCobraEndpoint(port, true);
|
|
||||||
|
|
||||||
ix::CobraConfig config;
|
|
||||||
config.endpoint = endpoint;
|
|
||||||
config.appkey = appkey;
|
|
||||||
config.rolename = role;
|
|
||||||
config.rolesecret = secret;
|
|
||||||
config.socketTLSOptions = makeClientTLSOptions();
|
|
||||||
|
|
||||||
std::thread publisherThread(runPublisher, config, channel);
|
|
||||||
|
|
||||||
ix::CobraBotConfig cobraBotConfig;
|
|
||||||
cobraBotConfig.cobraConfig = config;
|
|
||||||
cobraBotConfig.channel = channel;
|
|
||||||
cobraBotConfig.runtime = 3; // Only run the bot for 3 seconds
|
|
||||||
cobraBotConfig.enableHeartbeat = false;
|
|
||||||
|
|
||||||
std::string hostname("127.0.0.1");
|
|
||||||
// std::string hostname("www.google.com");
|
|
||||||
int statsdPort = 8125;
|
|
||||||
std::string prefix("ix.test");
|
|
||||||
StatsdClient statsdClient(hostname, statsdPort, prefix);
|
|
||||||
|
|
||||||
std::string errMsg;
|
|
||||||
bool initialized = statsdClient.init(errMsg);
|
|
||||||
if (!initialized)
|
|
||||||
{
|
|
||||||
spdlog::error(errMsg);
|
|
||||||
}
|
|
||||||
REQUIRE(initialized);
|
|
||||||
|
|
||||||
std::string fields("device.game\ndevice.os_name");
|
|
||||||
std::string gauge;
|
|
||||||
std::string timer;
|
|
||||||
bool verbose = true;
|
|
||||||
|
|
||||||
int64_t sentCount =
|
|
||||||
ix::cobra_to_statsd_bot(cobraBotConfig, statsdClient, fields, gauge, timer, verbose);
|
|
||||||
//
|
|
||||||
// We want at least 2 messages to be sent
|
|
||||||
//
|
|
||||||
REQUIRE(sentCount >= 2);
|
|
||||||
|
|
||||||
// Give us 1s for all messages to be received
|
|
||||||
ix::msleep(1000);
|
|
||||||
|
|
||||||
spdlog::info("Stopping snake server...");
|
|
||||||
snakeServer.stop();
|
|
||||||
|
|
||||||
spdlog::info("Stopping redis server...");
|
|
||||||
redisServer.stop();
|
|
||||||
|
|
||||||
publisherThread.join();
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,85 +0,0 @@
|
|||||||
/*
|
|
||||||
* IXCobraToStdoutTest.cpp
|
|
||||||
* Author: Benjamin Sergeant
|
|
||||||
* Copyright (c) 2020 Machine Zone. All rights reserved.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#include "IXTest.h"
|
|
||||||
#include "catch.hpp"
|
|
||||||
#include <chrono>
|
|
||||||
#include <iostream>
|
|
||||||
#include <ixbots/IXCobraToStdoutBot.h>
|
|
||||||
#include <ixcobra/IXCobraConnection.h>
|
|
||||||
#include <ixcrypto/IXUuid.h>
|
|
||||||
#include <ixredis/IXRedisServer.h>
|
|
||||||
#include <ixsentry/IXSentryClient.h>
|
|
||||||
#include <ixsnake/IXSnakeServer.h>
|
|
||||||
#include <ixwebsocket/IXHttpServer.h>
|
|
||||||
#include <ixwebsocket/IXUserAgent.h>
|
|
||||||
|
|
||||||
using namespace ix;
|
|
||||||
|
|
||||||
TEST_CASE("Cobra_to_stdout_bot", "[cobra_bots]")
|
|
||||||
{
|
|
||||||
SECTION("Exchange and count sent/received messages.")
|
|
||||||
{
|
|
||||||
int port = getFreePort();
|
|
||||||
snake::AppConfig appConfig = makeSnakeServerConfig(port, true);
|
|
||||||
|
|
||||||
// Start a redis server
|
|
||||||
ix::RedisServer redisServer(appConfig.redisPort);
|
|
||||||
auto res = redisServer.listen();
|
|
||||||
REQUIRE(res.first);
|
|
||||||
redisServer.start();
|
|
||||||
|
|
||||||
// Start a snake server
|
|
||||||
snake::SnakeServer snakeServer(appConfig);
|
|
||||||
snakeServer.run();
|
|
||||||
|
|
||||||
// Run the bot for a small amount of time
|
|
||||||
std::string channel = ix::generateSessionId();
|
|
||||||
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
|
|
||||||
std::string role = "_sub";
|
|
||||||
std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
|
|
||||||
std::string endpoint = makeCobraEndpoint(port, true);
|
|
||||||
|
|
||||||
ix::CobraConfig config;
|
|
||||||
config.endpoint = endpoint;
|
|
||||||
config.appkey = appkey;
|
|
||||||
config.rolename = role;
|
|
||||||
config.rolesecret = secret;
|
|
||||||
config.socketTLSOptions = makeClientTLSOptions();
|
|
||||||
|
|
||||||
std::thread publisherThread(runPublisher, config, channel);
|
|
||||||
|
|
||||||
ix::CobraBotConfig cobraBotConfig;
|
|
||||||
cobraBotConfig.cobraConfig = config;
|
|
||||||
cobraBotConfig.channel = channel;
|
|
||||||
cobraBotConfig.runtime = 3; // Only run the bot for 3 seconds
|
|
||||||
cobraBotConfig.enableHeartbeat = false;
|
|
||||||
bool quiet = false;
|
|
||||||
|
|
||||||
cobraBotConfig.filter =
|
|
||||||
std::string("select * from `") + channel + "` where id = 'sms_metric_A_id'";
|
|
||||||
|
|
||||||
// We could try to capture the output ... not sure how.
|
|
||||||
bool fluentd = true;
|
|
||||||
|
|
||||||
int64_t sentCount = ix::cobra_to_stdout_bot(cobraBotConfig, fluentd, quiet);
|
|
||||||
//
|
|
||||||
// We want at least 2 messages to be sent
|
|
||||||
//
|
|
||||||
REQUIRE(sentCount >= 2);
|
|
||||||
|
|
||||||
// Give us 1s for all messages to be received
|
|
||||||
ix::msleep(1000);
|
|
||||||
|
|
||||||
spdlog::info("Stopping snake server...");
|
|
||||||
snakeServer.stop();
|
|
||||||
|
|
||||||
spdlog::info("Stopping redis server...");
|
|
||||||
redisServer.stop();
|
|
||||||
|
|
||||||
publisherThread.join();
|
|
||||||
}
|
|
||||||
}
|
|
@ -30,7 +30,6 @@ add_executable(ws
|
|||||||
|
|
||||||
# library with the most dependencies come first
|
# library with the most dependencies come first
|
||||||
target_link_libraries(ws ixwebsocket)
|
target_link_libraries(ws ixwebsocket)
|
||||||
target_link_libraries(ws ixcore)
|
|
||||||
|
|
||||||
target_link_libraries(ws spdlog)
|
target_link_libraries(ws spdlog)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user