diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index c1ce1b15..6cd84428 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -3,6 +3,8 @@ All notable changes to this project will be documented in this file. ## [6.0.1] - 2019-09-05 +- add cobra metrics publisher + server unittest +- add cobra client + server unittest - ws snake (cobra simple server) add basic support for unsubscription + subscribe send the proper subscription data + redis client subscription can be cancelled - IXCobraConnection / pdu handlers can crash if they receive json data which is not an object diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 9be3c850..3a05fb73 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -63,6 +63,7 @@ set (SOURCES IXHttpServerTest.cpp IXUnityBuildsTest.cpp IXCobraChatTest.cpp + IXCobraMetricsPublisherTest.cpp ) # Some unittest don't work on windows yet diff --git a/test/IXCobraChatTest.cpp b/test/IXCobraChatTest.cpp index 09d82637..dbc7b00b 100644 --- a/test/IXCobraChatTest.cpp +++ b/test/IXCobraChatTest.cpp @@ -172,7 +172,7 @@ namespace // void SatoriChat::run() { - snake::AppConfig appConfig = makeSnakeServerConfig(); + snake::AppConfig appConfig = makeSnakeServerConfig(8008); // Display config on the terminal for debugging dumpConfig(appConfig); @@ -182,16 +182,10 @@ namespace // "chat" conf std::string appkey("FC2F10139A2BAc53BB72D9db967b024f"); - std::string endpoint("ws://localhost:8008"); std::string channel = _session; std::string role = "_sub"; std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba"; - - // appkey = "1121b8DfbB33E56dE1F82fC2A08cD1D7"; - // endpoint = "ws://api-internal-cobra.addsrv.com"; - // endpoint = "ws://localhost:8765"; - // role = "unittest_subscriber"; - // secret = "98B69bcdfc145C5fB7C2f4A5aFfe4fd3"; + std::string endpoint("ws://localhost:8008"); _conn.configure(appkey, endpoint, role, secret, ix::WebSocketPerMessageDeflateOptions(true)); @@ -231,6 +225,10 @@ namespace { log("Unsubscription ok: " + _user + " subscription_id " + subscriptionId); } + else if (eventType == ix::CobraConnection_EventType_Published) + { + Logger() << "Subscriber: published message acked: " << msgId; + } } ); @@ -279,7 +277,7 @@ TEST_CASE("Cobra_chat", "[cobra_chat]") { SECTION("Exchange and count sent/received messages.") { - snake::AppConfig appConfig = makeSnakeServerConfig(); + snake::AppConfig appConfig = makeSnakeServerConfig(8008); snake::SnakeServer snakeServer(appConfig); snakeServer.run(); diff --git a/test/IXCobraMetricsPublisherTest.cpp b/test/IXCobraMetricsPublisherTest.cpp new file mode 100644 index 00000000..a6ab0f41 --- /dev/null +++ b/test/IXCobraMetricsPublisherTest.cpp @@ -0,0 +1,233 @@ +/* + * Author: Benjamin Sergeant + * Copyright (c) 2018 Machine Zone. All rights reserved. + */ + +#include +#include +#include +#include +#include "IXTest.h" +#include "IXSnakeServer.h" + +#include "catch.hpp" + +using namespace ix; + +namespace +{ + // + // This project / appkey is configure on cobra to not do any batching. + // This way we can start a subscriber and receive all messages as they come in. + // + std::string ENDPOINT("ws://localhost:8009"); + std::string APPKEY("FC2F10139A2BAc53BB72D9db967b024f"); + std::string CHANNEL("unittest_channel"); + std::string PUBLISHER_ROLE("_pub"); + std::string PUBLISHER_SECRET("1c04DB8fFe76A4EeFE3E318C72d771db"); + std::string SUBSCRIBER_ROLE("_sub"); + std::string SUBSCRIBER_SECRET("66B1dA3ED5fA074EB5AE84Dd8CE3b5ba"); + + std::atomic gStop; + std::atomic gSubscriberConnectedAndSubscribed; + std::atomic gUniqueMessageIdsCount; + std::atomic gMessageCount; + + std::set gIds; + std::mutex gProtectIds; // std::set is no thread-safe, so protect access with this mutex. + + // + // Background thread subscribe to the channel and validates what was sent + // + void startSubscriber() + { + gSubscriberConnectedAndSubscribed = false; + gUniqueMessageIdsCount = 0; + gMessageCount = 0; + + ix::CobraConnection conn; + conn.configure(APPKEY, ENDPOINT, SUBSCRIBER_ROLE, SUBSCRIBER_SECRET, + ix::WebSocketPerMessageDeflateOptions(true)); + conn.connect(); + + conn.setEventCallback( + [&conn] + (ix::CobraConnectionEventType eventType, + const std::string& errMsg, + const ix::WebSocketHttpHeaders& headers, + const std::string& subscriptionId, + CobraConnection::MsgId msgId) + { + if (eventType == ix::CobraConnection_EventType_Open) + { + Logger() << "Subscriber connected:"; + } + else if (eventType == ix::CobraConnection_EventType_Authenticated) + { + log("Subscriber authenticated"); + std::string filter; + conn.subscribe(CHANNEL, filter, + [](const Json::Value& msg) + { + log(msg.toStyledString()); + + std::string id = msg["id"].asString(); + { + std::lock_guard guard(gProtectIds); + gIds.insert(id); + } + + gMessageCount++; + }); + } + else if (eventType == ix::CobraConnection_EventType_Subscribed) + { + Logger() << "Subscriber: subscribed to channel " << subscriptionId; + if (subscriptionId == CHANNEL) + { + gSubscriberConnectedAndSubscribed = true; + } + else + { + Logger() << "Subscriber: unexpected channel " << subscriptionId; + } + } + else if (eventType == ix::CobraConnection_EventType_UnSubscribed) + { + Logger() << "Subscriber: ununexpected from channel " << subscriptionId; + if (subscriptionId != CHANNEL) + { + Logger() << "Subscriber: unexpected channel " << subscriptionId; + } + } + else if (eventType == ix::CobraConnection_EventType_Published) + { + Logger() << "Subscriber: published message acked: " << msgId; + } + } + ); + + while (!gStop) + { + std::chrono::duration duration(10); + std::this_thread::sleep_for(duration); + } + + conn.unsubscribe(CHANNEL); + conn.disconnect(); + + gUniqueMessageIdsCount = gIds.size(); + } +} + +TEST_CASE("Cobra_Metrics_Publisher", "[cobra]") +{ + snake::AppConfig appConfig = makeSnakeServerConfig(8009); + snake::SnakeServer snakeServer(appConfig); + snakeServer.run(); + + // Make channel name unique + CHANNEL += uuid4(); + + gStop = false; + std::thread bgThread(&startSubscriber); + + int timeout = 10 * 1000; // 10s + + // Wait until the subscriber is ready (authenticated + subscription successful) + while (!gSubscriberConnectedAndSubscribed) + { + std::chrono::duration duration(10); + std::this_thread::sleep_for(duration); + + timeout -= 10; + if (timeout <= 0) + { + REQUIRE(false); // timeout + } + } + + ix::CobraMetricsPublisher cobraMetricsPublisher; + + bool perMessageDeflate = true; + cobraMetricsPublisher.configure(APPKEY, ENDPOINT, CHANNEL, + PUBLISHER_ROLE, PUBLISHER_SECRET, perMessageDeflate); + cobraMetricsPublisher.setSession(uuid4()); + cobraMetricsPublisher.enable(true); // disabled by default, needs to be enabled to be active + + Json::Value data; + data["foo"] = "bar"; + + // (1) Publish without restrictions + cobraMetricsPublisher.push("sms_metric_A_id", data); // (msg #1) + cobraMetricsPublisher.push("sms_metric_B_id", data); // (msg #2) + + // (2) Restrict what is sent using a blacklist + // Add one entry to the blacklist + // (will send msg #3) + cobraMetricsPublisher.setBlacklist({ + "sms_metric_B_id" // this id will not be sent + }); + // (msg #4) + cobraMetricsPublisher.push("sms_metric_A_id", data); + // ... + cobraMetricsPublisher.push("sms_metric_B_id", data); // this won't be sent + + // Reset the blacklist + // (msg #5) + cobraMetricsPublisher.setBlacklist({}); // 4. + + // (3) Restrict what is sent using rate control + + // (msg #6) + cobraMetricsPublisher.setRateControl({ + {"sms_metric_C_id", 1}, // published once per minute (60 seconds) max + }); + // (msg #7) + cobraMetricsPublisher.push("sms_metric_C_id", data); + cobraMetricsPublisher.push("sms_metric_C_id", data); // this won't be sent + + ix::msleep(1400); + + // (msg #8) + cobraMetricsPublisher.push("sms_metric_C_id", data); // now this will be sent + + ix::msleep(600); // wait a bit so that the last message is sent and can be received + + log("Testing suspend/resume now, which will disconnect the cobraMetricsPublisher."); + + // Test suspend + resume + for (int i = 0 ; i < 3 ; ++i) + { + cobraMetricsPublisher.suspend(); + ix::msleep(500); + REQUIRE(!cobraMetricsPublisher.isConnected()); // Check that we are not connected anymore + + cobraMetricsPublisher.push("sms_metric_D_id", data); // will not be sent this time + + cobraMetricsPublisher.resume(); + ix::msleep(2000); // give cobra 2s to connect + REQUIRE(cobraMetricsPublisher.isConnected()); // Check that we are connected now + + cobraMetricsPublisher.push("sms_metric_E_id", data); + } + + ix::msleep(500); + + // Now stop the thread + gStop = true; + bgThread.join(); + + // + // Validate that we received all message kinds, and the correct number of messages + // + CHECK(gIds.count("sms_metric_A_id") == 1); + CHECK(gIds.count("sms_metric_B_id") == 1); + CHECK(gIds.count("sms_metric_C_id") == 1); + CHECK(gIds.count("sms_metric_D_id") == 1); + CHECK(gIds.count("sms_metric_E_id") == 1); + CHECK(gIds.count("sms_set_rate_control_id") == 1); + CHECK(gIds.count("sms_set_blacklist_id") == 1); + + snakeServer.stop(); +} diff --git a/test/IXTest.cpp b/test/IXTest.cpp index 3f06299f..541aa1ae 100644 --- a/test/IXTest.cpp +++ b/test/IXTest.cpp @@ -161,10 +161,10 @@ namespace ix return std::string(vec.begin(), vec.end()); } - snake::AppConfig makeSnakeServerConfig() + snake::AppConfig makeSnakeServerConfig(int port) { snake::AppConfig appConfig; - appConfig.port = 8008; + appConfig.port = port; appConfig.hostname = "127.0.0.1"; appConfig.verbose = true; appConfig.redisPort = 6379; diff --git a/test/IXTest.h b/test/IXTest.h index 28816c5f..c72f7e64 100644 --- a/test/IXTest.h +++ b/test/IXTest.h @@ -50,5 +50,5 @@ namespace ix bool startWebSocketEchoServer(ix::WebSocketServer& server); - snake::AppConfig makeSnakeServerConfig(); + snake::AppConfig makeSnakeServerConfig(int port); } // namespace ix diff --git a/ws/snake/IXSnakeProtocol.cpp b/ws/snake/IXSnakeProtocol.cpp index f156d18a..28419892 100644 --- a/ws/snake/IXSnakeProtocol.cpp +++ b/ws/snake/IXSnakeProtocol.cpp @@ -284,7 +284,6 @@ namespace snake void handleUnSubscribe( std::shared_ptr state, std::shared_ptr ws, - const AppConfig& appConfig, const nlohmann::json& pdu) { // extract subscription_id @@ -333,7 +332,7 @@ namespace snake } else if (action == "rtm/unsubscribe") { - handleUnSubscribe(state, ws, appConfig, pdu); + handleUnSubscribe(state, ws, pdu); } else {