Add new cobra unittest, using server and client

This commit is contained in:
Benjamin Sergeant 2019-09-05 20:48:38 -07:00
parent 21404c23dd
commit c0f098a578
6 changed files with 446 additions and 1 deletions

View File

@ -3,6 +3,9 @@ language: bash
# See https://github.com/amaiorano/vectrexy/blob/master/.travis.yml # See https://github.com/amaiorano/vectrexy/blob/master/.travis.yml
# for ideas on installing vcpkg # for ideas on installing vcpkg
services:
- redis-server
matrix: matrix:
include: include:
# macOS # macOS

View File

@ -17,11 +17,15 @@ endif()
add_subdirectory(${PROJECT_SOURCE_DIR}/.. ixwebsocket) add_subdirectory(${PROJECT_SOURCE_DIR}/.. ixwebsocket)
set (WS ../ws)
include_directories( include_directories(
${PROJECT_SOURCE_DIR}/Catch2/single_include ${PROJECT_SOURCE_DIR}/Catch2/single_include
../third_party
../third_party/msgpack11 ../third_party/msgpack11
../third_party/spdlog/include ../third_party/spdlog/include
../ws ../ws
../ws/snake
) )
# Shared sources # Shared sources
@ -30,7 +34,24 @@ set (SOURCES
IXTest.cpp IXTest.cpp
IXGetFreePort.cpp IXGetFreePort.cpp
../third_party/msgpack11/msgpack11.cpp ../third_party/msgpack11/msgpack11.cpp
../ws/ixcore/utils/IXCoreLogger.cpp ../third_party/jsoncpp/jsoncpp.cpp
${WS}/ixcore/utils/IXCoreLogger.cpp
${WS}/ixcrypto/IXBase64.cpp
${WS}/ixcrypto/IXHash.cpp
${WS}/ixcrypto/IXUuid.cpp
${WS}/ixcrypto/IXHMac.cpp
${WS}/ixcobra/IXCobraConnection.cpp
${WS}/ixcobra/IXCobraMetricsPublisher.cpp
${WS}/ixcobra/IXCobraMetricsThreadedPublisher.cpp
${WS}/snake/IXSnakeServer.cpp
${WS}/snake/IXSnakeProtocol.cpp
${WS}/snake/IXAppConfig.cpp
${WS}/IXRedisClient.cpp
IXSocketTest.cpp IXSocketTest.cpp
IXSocketConnectTest.cpp IXSocketConnectTest.cpp
@ -41,6 +62,7 @@ set (SOURCES
IXHttpClientTest.cpp IXHttpClientTest.cpp
IXHttpServerTest.cpp IXHttpServerTest.cpp
IXUnityBuildsTest.cpp IXUnityBuildsTest.cpp
IXCobraChatTest.cpp
) )
# Some unittest don't work on windows yet # Some unittest don't work on windows yet

350
test/IXCobraChatTest.cpp Normal file
View File

@ -0,0 +1,350 @@
/*
* cmd_satori_chat.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017 Machine Zone. All rights reserved.
*/
#include <iostream>
#include <chrono>
#include <ixcobra/IXCobraConnection.h>
#include <ixcrypto/IXUuid.h>
#include "IXTest.h"
#include "IXSnakeServer.h"
#include "catch.hpp"
using namespace ix;
namespace
{
std::atomic<size_t> incomingBytes(0);
std::atomic<size_t> outgoingBytes(0);
void setupTrafficTrackerCallback()
{
ix::CobraConnection::setTrafficTrackerCallback(
[](size_t size, bool incoming)
{
if (incoming)
{
incomingBytes += size;
}
else
{
outgoingBytes += size;
}
}
);
}
class SatoriChat
{
public:
SatoriChat(const std::string& user,
const std::string& session);
void subscribe(const std::string& channel);
void start();
void stop();
void run();
bool isReady() const;
void sendMessage(const std::string& text);
size_t getReceivedMessagesCount() const;
bool hasPendingMessages() const;
Json::Value popMessage();
private:
std::string _user;
std::string _session;
std::queue<Json::Value> _publish_queue;
mutable std::mutex _queue_mutex;
std::thread _thread;
std::atomic<bool> _stop;
ix::CobraConnection _conn;
std::atomic<bool> _connectedAndSubscribed;
std::queue<Json::Value> _receivedQueue;
std::mutex _logMutex;
};
SatoriChat::SatoriChat(const std::string& user,
const std::string& session) :
_connectedAndSubscribed(false),
_stop(false),
_user(user),
_session(session)
{
;
}
void SatoriChat::start()
{
_thread = std::thread(&SatoriChat::run, this);
}
void SatoriChat::stop()
{
_stop = true;
_thread.join();
}
bool SatoriChat::isReady() const
{
return _connectedAndSubscribed;
}
size_t SatoriChat::getReceivedMessagesCount() const
{
return _receivedQueue.size();
}
bool SatoriChat::hasPendingMessages() const
{
std::unique_lock<std::mutex> lock(_queue_mutex);
return !_publish_queue.empty();
}
Json::Value SatoriChat::popMessage()
{
std::unique_lock<std::mutex> lock(_queue_mutex);
auto msg = _publish_queue.front();
_publish_queue.pop();
return msg;
}
//
// Callback to handle received messages, that are printed on the console
//
void SatoriChat::subscribe(const std::string& channel)
{
std::string filter;
_conn.subscribe(channel, filter,
[this](const Json::Value& msg)
{
std::cout << msg.toStyledString() << std::endl;
if (!msg.isObject()) return;
if (!msg.isMember("user")) return;
if (!msg.isMember("text")) return;
if (!msg.isMember("session")) return;
std::string msg_user = msg["user"].asString();
std::string msg_text = msg["text"].asString();
std::string msg_session = msg["session"].asString();
// We are not interested in messages
// from a different session.
if (msg_session != _session) return;
// We are not interested in our own messages
if (msg_user == _user) return;
_receivedQueue.push(msg);
std::stringstream ss;
ss << std::endl
<< msg_user << " > " << msg_text
<< std::endl
<< _user << " > ";
log(ss.str());
});
}
void SatoriChat::sendMessage(const std::string& text)
{
Json::Value msg;
msg["user"] = _user;
msg["session"] = _session;
msg["text"] = text;
std::unique_lock<std::mutex> lock(_queue_mutex);
_publish_queue.push(msg);
}
//
// Do satori communication on a background thread, where we can have
// something like an event loop that publish, poll and receive data
//
void SatoriChat::run()
{
snake::AppConfig appConfig = makeSnakeServerConfig();
// Display config on the terminal for debugging
dumpConfig(appConfig);
snake::SnakeServer snakeServer(appConfig);
snakeServer.run();
// "chat" conf
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
std::string endpoint("ws://localhost:8008");
std::string channel = _session;
std::string role = "_sub";
std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
// appkey = "1121b8DfbB33E56dE1F82fC2A08cD1D7";
// endpoint = "ws://api-internal-cobra.addsrv.com";
// endpoint = "ws://localhost:8765";
// role = "unittest_subscriber";
// secret = "98B69bcdfc145C5fB7C2f4A5aFfe4fd3";
_conn.configure(appkey, endpoint, role, secret,
ix::WebSocketPerMessageDeflateOptions(true));
_conn.connect();
_conn.setEventCallback(
[this, channel]
(ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId)
{
if (eventType == ix::CobraConnection_EventType_Open)
{
log("Subscriber connected: " + _user);
}
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
log("Subscriber authenticated: " + _user);
subscribe(channel);
}
else if (eventType == ix::CobraConnection_EventType_Error)
{
log(errMsg + _user);
}
else if (eventType == ix::CobraConnection_EventType_Closed)
{
log("Connection closed: " + _user);
}
else if (eventType == ix::CobraConnection_EventType_Subscribed)
{
log("Subscription ok: " + _user + " subscription_id " + subscriptionId);
_connectedAndSubscribed = true;
}
else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{
log("Unsubscription ok: " + _user + " subscription_id " + subscriptionId);
}
}
);
while (!_stop)
{
{
while (hasPendingMessages())
{
auto msg = popMessage();
std::string text = msg["text"].asString();
std::stringstream ss;
ss << "Sending msg [" << text << "]";
log(ss.str());
Json::Value channels;
channels.append(channel);
_conn.publish(channels, msg);
}
}
ix::msleep(50);
}
_conn.unsubscribe(channel);
ix::msleep(50);
_conn.disconnect();
_conn.setEventCallback([]
(ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId)
{
;
});
snakeServer.stop();
}
}
TEST_CASE("Cobra_chat", "[cobra_chat]")
{
SECTION("Exchange and count sent/received messages.")
{
snake::AppConfig appConfig = makeSnakeServerConfig();
snake::SnakeServer snakeServer(appConfig);
snakeServer.run();
int timeout;
setupTrafficTrackerCallback();
std::string session = ix::generateSessionId();
SatoriChat chatA("jean", session);
SatoriChat chatB("paul", session);
chatA.start();
chatB.start();
// Wait for all chat instance to be ready
timeout = 10 * 1000; // 10s
while (true)
{
if (chatA.isReady() && chatB.isReady()) break;
ix::msleep(10);
timeout -= 10;
if (timeout <= 0)
{
REQUIRE(false); // timeout
}
}
// Add a bit of extra time, for the subscription to be active
ix::msleep(1000);
chatA.sendMessage("from A1");
chatA.sendMessage("from A2");
chatA.sendMessage("from A3");
chatB.sendMessage("from B1");
chatB.sendMessage("from B2");
// 1. Wait for all messages to be sent
timeout = 10 * 1000; // 10s
while (chatA.hasPendingMessages() || chatB.hasPendingMessages())
{
ix::msleep(10);
timeout -= 10;
if (timeout <= 0)
{
REQUIRE(false); // timeout
}
}
// Give us 1s for all messages to be received
ix::msleep(1000);
chatA.stop();
chatB.stop();
// FIXME: improve this and make it exact matches
// we get unreliable result set
REQUIRE(chatA.getReceivedMessagesCount() == 2);
REQUIRE(chatB.getReceivedMessagesCount() == 3);
std::cout << incomingBytes << std::endl;
std::cout << "Incoming bytes: " << incomingBytes << std::endl;
std::cout << "Outgoing bytes: " << outgoingBytes << std::endl;
snakeServer.stop();
}
}

View File

@ -137,4 +137,57 @@ namespace ix
server.start(); server.start();
return true; return true;
} }
std::vector<uint8_t> load(const std::string& path)
{
std::vector<uint8_t> memblock;
std::ifstream file(path);
if (!file.is_open()) return memblock;
file.seekg(0, file.end);
std::streamoff size = file.tellg();
file.seekg(0, file.beg);
memblock.resize((size_t) size);
file.read((char*)&memblock.front(), static_cast<std::streamsize>(size));
return memblock;
}
std::string readAsString(const std::string& path)
{
auto vec = load(path);
return std::string(vec.begin(), vec.end());
}
snake::AppConfig makeSnakeServerConfig()
{
snake::AppConfig appConfig;
appConfig.port = 8008;
appConfig.hostname = "127.0.0.1";
appConfig.verbose = true;
appConfig.redisPort = 6379;
appConfig.redisPassword = "";
appConfig.redisHosts.push_back("localhost"); // only one host supported now
std::string appsConfigPath("appsConfig.json");
// Parse config file
auto str = readAsString(appsConfigPath);
if (str.empty())
{
std::cout << "Cannot read content of " << appsConfigPath << std::endl;
return appConfig;
}
std::cout << str << std::endl;
auto apps = nlohmann::json::parse(str);
appConfig.apps = apps["apps"];
// Display config on the terminal for debugging
dumpConfig(appConfig);
return appConfig;
}
} }

View File

@ -9,6 +9,7 @@
#include "IXGetFreePort.h" #include "IXGetFreePort.h"
#include <iostream> #include <iostream>
#include <ixwebsocket/IXWebSocketServer.h> #include <ixwebsocket/IXWebSocketServer.h>
#include "IXAppConfig.h"
#include <mutex> #include <mutex>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
@ -48,4 +49,6 @@ namespace ix
void log(const std::string& msg); void log(const std::string& msg);
bool startWebSocketEchoServer(ix::WebSocketServer& server); bool startWebSocketEchoServer(ix::WebSocketServer& server);
snake::AppConfig makeSnakeServerConfig();
} // namespace ix } // namespace ix

14
test/appsConfig.json Normal file
View File

@ -0,0 +1,14 @@
{
"apps": {
"FC2F10139A2BAc53BB72D9db967b024f": {
"roles": {
"_sub": {
"secret": "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba"
},
"_pub": {
"secret": "1c04DB8fFe76A4EeFE3E318C72d771db"
}
}
}
}
}