diff --git a/.travis.yml b/.travis.yml index e9e62cb4..1425922d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,6 +3,9 @@ language: bash # See https://github.com/amaiorano/vectrexy/blob/master/.travis.yml # for ideas on installing vcpkg +services: + - redis-server + matrix: include: # macOS diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 8565da9b..9be3c850 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -17,11 +17,15 @@ endif() add_subdirectory(${PROJECT_SOURCE_DIR}/.. ixwebsocket) +set (WS ../ws) + include_directories( ${PROJECT_SOURCE_DIR}/Catch2/single_include + ../third_party ../third_party/msgpack11 ../third_party/spdlog/include ../ws + ../ws/snake ) # Shared sources @@ -30,7 +34,24 @@ set (SOURCES IXTest.cpp IXGetFreePort.cpp ../third_party/msgpack11/msgpack11.cpp - ../ws/ixcore/utils/IXCoreLogger.cpp + ../third_party/jsoncpp/jsoncpp.cpp + + ${WS}/ixcore/utils/IXCoreLogger.cpp + + ${WS}/ixcrypto/IXBase64.cpp + ${WS}/ixcrypto/IXHash.cpp + ${WS}/ixcrypto/IXUuid.cpp + ${WS}/ixcrypto/IXHMac.cpp + + ${WS}/ixcobra/IXCobraConnection.cpp + ${WS}/ixcobra/IXCobraMetricsPublisher.cpp + ${WS}/ixcobra/IXCobraMetricsThreadedPublisher.cpp + + ${WS}/snake/IXSnakeServer.cpp + ${WS}/snake/IXSnakeProtocol.cpp + ${WS}/snake/IXAppConfig.cpp + + ${WS}/IXRedisClient.cpp IXSocketTest.cpp IXSocketConnectTest.cpp @@ -41,6 +62,7 @@ set (SOURCES IXHttpClientTest.cpp IXHttpServerTest.cpp IXUnityBuildsTest.cpp + IXCobraChatTest.cpp ) # Some unittest don't work on windows yet diff --git a/test/IXCobraChatTest.cpp b/test/IXCobraChatTest.cpp new file mode 100644 index 00000000..09d82637 --- /dev/null +++ b/test/IXCobraChatTest.cpp @@ -0,0 +1,350 @@ +/* + * cmd_satori_chat.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2017 Machine Zone. All rights reserved. + */ + +#include +#include +#include +#include +#include "IXTest.h" +#include "IXSnakeServer.h" + +#include "catch.hpp" + +using namespace ix; + +namespace +{ + std::atomic incomingBytes(0); + std::atomic outgoingBytes(0); + + void setupTrafficTrackerCallback() + { + ix::CobraConnection::setTrafficTrackerCallback( + [](size_t size, bool incoming) + { + if (incoming) + { + incomingBytes += size; + } + else + { + outgoingBytes += size; + } + } + ); + } + + class SatoriChat + { + public: + SatoriChat(const std::string& user, + const std::string& session); + + void subscribe(const std::string& channel); + void start(); + void stop(); + void run(); + bool isReady() const; + + void sendMessage(const std::string& text); + size_t getReceivedMessagesCount() const; + + bool hasPendingMessages() const; + Json::Value popMessage(); + + private: + std::string _user; + std::string _session; + + std::queue _publish_queue; + mutable std::mutex _queue_mutex; + + std::thread _thread; + std::atomic _stop; + + ix::CobraConnection _conn; + std::atomic _connectedAndSubscribed; + + std::queue _receivedQueue; + + std::mutex _logMutex; + }; + + SatoriChat::SatoriChat(const std::string& user, + const std::string& session) : + _connectedAndSubscribed(false), + _stop(false), + _user(user), + _session(session) + { + ; + } + + void SatoriChat::start() + { + _thread = std::thread(&SatoriChat::run, this); + } + + void SatoriChat::stop() + { + _stop = true; + _thread.join(); + } + + bool SatoriChat::isReady() const + { + return _connectedAndSubscribed; + } + + size_t SatoriChat::getReceivedMessagesCount() const + { + return _receivedQueue.size(); + } + + bool SatoriChat::hasPendingMessages() const + { + std::unique_lock lock(_queue_mutex); + return !_publish_queue.empty(); + } + + Json::Value SatoriChat::popMessage() + { + std::unique_lock lock(_queue_mutex); + auto msg = _publish_queue.front(); + _publish_queue.pop(); + return msg; + } + + // + // Callback to handle received messages, that are printed on the console + // + void SatoriChat::subscribe(const std::string& channel) + { + std::string filter; + _conn.subscribe(channel, filter, + [this](const Json::Value& msg) + { + std::cout << msg.toStyledString() << std::endl; + if (!msg.isObject()) return; + if (!msg.isMember("user")) return; + if (!msg.isMember("text")) return; + if (!msg.isMember("session")) return; + + std::string msg_user = msg["user"].asString(); + std::string msg_text = msg["text"].asString(); + std::string msg_session = msg["session"].asString(); + + // We are not interested in messages + // from a different session. + if (msg_session != _session) return; + + // We are not interested in our own messages + if (msg_user == _user) return; + + _receivedQueue.push(msg); + + std::stringstream ss; + ss << std::endl + << msg_user << " > " << msg_text + << std::endl + << _user << " > "; + log(ss.str()); + }); + } + + void SatoriChat::sendMessage(const std::string& text) + { + Json::Value msg; + msg["user"] = _user; + msg["session"] = _session; + msg["text"] = text; + + std::unique_lock lock(_queue_mutex); + _publish_queue.push(msg); + } + + // + // Do satori communication on a background thread, where we can have + // something like an event loop that publish, poll and receive data + // + void SatoriChat::run() + { + snake::AppConfig appConfig = makeSnakeServerConfig(); + + // Display config on the terminal for debugging + dumpConfig(appConfig); + + snake::SnakeServer snakeServer(appConfig); + snakeServer.run(); + + // "chat" conf + std::string appkey("FC2F10139A2BAc53BB72D9db967b024f"); + std::string endpoint("ws://localhost:8008"); + std::string channel = _session; + std::string role = "_sub"; + std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba"; + + // appkey = "1121b8DfbB33E56dE1F82fC2A08cD1D7"; + // endpoint = "ws://api-internal-cobra.addsrv.com"; + // endpoint = "ws://localhost:8765"; + // role = "unittest_subscriber"; + // secret = "98B69bcdfc145C5fB7C2f4A5aFfe4fd3"; + + _conn.configure(appkey, endpoint, role, secret, + ix::WebSocketPerMessageDeflateOptions(true)); + _conn.connect(); + + _conn.setEventCallback( + [this, channel] + (ix::CobraConnectionEventType eventType, + const std::string& errMsg, + const ix::WebSocketHttpHeaders& headers, + const std::string& subscriptionId, + CobraConnection::MsgId msgId) + { + if (eventType == ix::CobraConnection_EventType_Open) + { + log("Subscriber connected: " + _user); + } + else if (eventType == ix::CobraConnection_EventType_Authenticated) + { + log("Subscriber authenticated: " + _user); + subscribe(channel); + } + else if (eventType == ix::CobraConnection_EventType_Error) + { + log(errMsg + _user); + } + else if (eventType == ix::CobraConnection_EventType_Closed) + { + log("Connection closed: " + _user); + } + else if (eventType == ix::CobraConnection_EventType_Subscribed) + { + log("Subscription ok: " + _user + " subscription_id " + subscriptionId); + _connectedAndSubscribed = true; + } + else if (eventType == ix::CobraConnection_EventType_UnSubscribed) + { + log("Unsubscription ok: " + _user + " subscription_id " + subscriptionId); + } + } + ); + + while (!_stop) + { + { + while (hasPendingMessages()) + { + auto msg = popMessage(); + + std::string text = msg["text"].asString(); + + std::stringstream ss; + ss << "Sending msg [" << text << "]"; + log(ss.str()); + + Json::Value channels; + channels.append(channel); + _conn.publish(channels, msg); + } + } + + ix::msleep(50); + } + + _conn.unsubscribe(channel); + + ix::msleep(50); + _conn.disconnect(); + + _conn.setEventCallback([] + (ix::CobraConnectionEventType eventType, + const std::string& errMsg, + const ix::WebSocketHttpHeaders& headers, + const std::string& subscriptionId, + CobraConnection::MsgId msgId) + { + ; + }); + + snakeServer.stop(); + } +} + +TEST_CASE("Cobra_chat", "[cobra_chat]") +{ + SECTION("Exchange and count sent/received messages.") + { + snake::AppConfig appConfig = makeSnakeServerConfig(); + snake::SnakeServer snakeServer(appConfig); + snakeServer.run(); + + int timeout; + setupTrafficTrackerCallback(); + + std::string session = ix::generateSessionId(); + SatoriChat chatA("jean", session); + SatoriChat chatB("paul", session); + + chatA.start(); + chatB.start(); + + // Wait for all chat instance to be ready + timeout = 10 * 1000; // 10s + while (true) + { + if (chatA.isReady() && chatB.isReady()) break; + ix::msleep(10); + + timeout -= 10; + if (timeout <= 0) + { + REQUIRE(false); // timeout + } + } + + // Add a bit of extra time, for the subscription to be active + ix::msleep(1000); + + chatA.sendMessage("from A1"); + chatA.sendMessage("from A2"); + chatA.sendMessage("from A3"); + + chatB.sendMessage("from B1"); + chatB.sendMessage("from B2"); + + // 1. Wait for all messages to be sent + timeout = 10 * 1000; // 10s + while (chatA.hasPendingMessages() || chatB.hasPendingMessages()) + { + ix::msleep(10); + + timeout -= 10; + if (timeout <= 0) + { + REQUIRE(false); // timeout + } + } + + // Give us 1s for all messages to be received + ix::msleep(1000); + + chatA.stop(); + chatB.stop(); + + // FIXME: improve this and make it exact matches + // we get unreliable result set + REQUIRE(chatA.getReceivedMessagesCount() == 2); + REQUIRE(chatB.getReceivedMessagesCount() == 3); + + std::cout << incomingBytes << std::endl; + std::cout << "Incoming bytes: " << incomingBytes << std::endl; + std::cout << "Outgoing bytes: " << outgoingBytes << std::endl; + + snakeServer.stop(); + } +} diff --git a/test/IXTest.cpp b/test/IXTest.cpp index 79924b3a..3f06299f 100644 --- a/test/IXTest.cpp +++ b/test/IXTest.cpp @@ -137,4 +137,57 @@ namespace ix server.start(); return true; } + + std::vector load(const std::string& path) + { + std::vector memblock; + + std::ifstream file(path); + if (!file.is_open()) return memblock; + + file.seekg(0, file.end); + std::streamoff size = file.tellg(); + file.seekg(0, file.beg); + + memblock.resize((size_t) size); + file.read((char*)&memblock.front(), static_cast(size)); + + return memblock; + } + + std::string readAsString(const std::string& path) + { + auto vec = load(path); + return std::string(vec.begin(), vec.end()); + } + + snake::AppConfig makeSnakeServerConfig() + { + snake::AppConfig appConfig; + appConfig.port = 8008; + appConfig.hostname = "127.0.0.1"; + appConfig.verbose = true; + appConfig.redisPort = 6379; + appConfig.redisPassword = ""; + appConfig.redisHosts.push_back("localhost"); // only one host supported now + + std::string appsConfigPath("appsConfig.json"); + + // Parse config file + auto str = readAsString(appsConfigPath); + if (str.empty()) + { + std::cout << "Cannot read content of " << appsConfigPath << std::endl; + return appConfig; + } + + std::cout << str << std::endl; + auto apps = nlohmann::json::parse(str); + appConfig.apps = apps["apps"]; + + // Display config on the terminal for debugging + dumpConfig(appConfig); + + return appConfig; + } } diff --git a/test/IXTest.h b/test/IXTest.h index ccd8e1c3..28816c5f 100644 --- a/test/IXTest.h +++ b/test/IXTest.h @@ -9,6 +9,7 @@ #include "IXGetFreePort.h" #include #include +#include "IXAppConfig.h" #include #include #include @@ -48,4 +49,6 @@ namespace ix void log(const std::string& msg); bool startWebSocketEchoServer(ix::WebSocketServer& server); + + snake::AppConfig makeSnakeServerConfig(); } // namespace ix diff --git a/test/appsConfig.json b/test/appsConfig.json new file mode 100644 index 00000000..14f8f48b --- /dev/null +++ b/test/appsConfig.json @@ -0,0 +1,14 @@ +{ + "apps": { + "FC2F10139A2BAc53BB72D9db967b024f": { + "roles": { + "_sub": { + "secret": "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba" + }, + "_pub": { + "secret": "1c04DB8fFe76A4EeFE3E318C72d771db" + } + } + } + } +}