add cobra metrics publisher + server unittest
This commit is contained in:
		| @@ -3,6 +3,8 @@ All notable changes to this project will be documented in this file. | |||||||
|  |  | ||||||
| ## [6.0.1] - 2019-09-05 | ## [6.0.1] - 2019-09-05 | ||||||
|  |  | ||||||
|  | - add cobra metrics publisher + server unittest | ||||||
|  | - add cobra client + server unittest | ||||||
| - ws snake (cobra simple server) add basic support for unsubscription + subscribe send the proper subscription data + redis client subscription can be cancelled | - ws snake (cobra simple server) add basic support for unsubscription + subscribe send the proper subscription data + redis client subscription can be cancelled | ||||||
| - IXCobraConnection / pdu handlers can crash if they receive json data which is not an object | - IXCobraConnection / pdu handlers can crash if they receive json data which is not an object | ||||||
|  |  | ||||||
|   | |||||||
| @@ -63,6 +63,7 @@ set (SOURCES | |||||||
|   IXHttpServerTest.cpp |   IXHttpServerTest.cpp | ||||||
|   IXUnityBuildsTest.cpp |   IXUnityBuildsTest.cpp | ||||||
|   IXCobraChatTest.cpp |   IXCobraChatTest.cpp | ||||||
|  |   IXCobraMetricsPublisherTest.cpp | ||||||
| ) | ) | ||||||
|  |  | ||||||
| # Some unittest don't work on windows yet | # Some unittest don't work on windows yet | ||||||
|   | |||||||
| @@ -172,7 +172,7 @@ namespace | |||||||
|     // |     // | ||||||
|     void SatoriChat::run() |     void SatoriChat::run() | ||||||
|     { |     { | ||||||
|         snake::AppConfig appConfig = makeSnakeServerConfig(); |         snake::AppConfig appConfig = makeSnakeServerConfig(8008); | ||||||
|  |  | ||||||
|         // Display config on the terminal for debugging |         // Display config on the terminal for debugging | ||||||
|         dumpConfig(appConfig); |         dumpConfig(appConfig); | ||||||
| @@ -182,16 +182,10 @@ namespace | |||||||
|  |  | ||||||
|         // "chat" conf |         // "chat" conf | ||||||
|         std::string appkey("FC2F10139A2BAc53BB72D9db967b024f"); |         std::string appkey("FC2F10139A2BAc53BB72D9db967b024f"); | ||||||
|         std::string endpoint("ws://localhost:8008"); |  | ||||||
|         std::string channel = _session; |         std::string channel = _session; | ||||||
|         std::string role = "_sub"; |         std::string role = "_sub"; | ||||||
|         std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba"; |         std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba"; | ||||||
|  |         std::string endpoint("ws://localhost:8008"); | ||||||
|         // appkey = "1121b8DfbB33E56dE1F82fC2A08cD1D7"; |  | ||||||
|         // endpoint = "ws://api-internal-cobra.addsrv.com"; |  | ||||||
|         // endpoint = "ws://localhost:8765"; |  | ||||||
|         // role = "unittest_subscriber"; |  | ||||||
|         // secret = "98B69bcdfc145C5fB7C2f4A5aFfe4fd3"; |  | ||||||
|  |  | ||||||
|         _conn.configure(appkey, endpoint, role, secret, |         _conn.configure(appkey, endpoint, role, secret, | ||||||
|                         ix::WebSocketPerMessageDeflateOptions(true)); |                         ix::WebSocketPerMessageDeflateOptions(true)); | ||||||
| @@ -231,6 +225,10 @@ namespace | |||||||
|                 { |                 { | ||||||
|                     log("Unsubscription ok: " + _user + " subscription_id " + subscriptionId); |                     log("Unsubscription ok: " + _user + " subscription_id " + subscriptionId); | ||||||
|                 } |                 } | ||||||
|  |                 else if (eventType == ix::CobraConnection_EventType_Published) | ||||||
|  |                 { | ||||||
|  |                     Logger() << "Subscriber: published message acked: " << msgId; | ||||||
|  |                 } | ||||||
|             } |             } | ||||||
|         ); |         ); | ||||||
|  |  | ||||||
| @@ -279,7 +277,7 @@ TEST_CASE("Cobra_chat", "[cobra_chat]") | |||||||
| { | { | ||||||
|     SECTION("Exchange and count sent/received messages.") |     SECTION("Exchange and count sent/received messages.") | ||||||
|     { |     { | ||||||
|         snake::AppConfig appConfig = makeSnakeServerConfig(); |         snake::AppConfig appConfig = makeSnakeServerConfig(8008); | ||||||
|         snake::SnakeServer snakeServer(appConfig); |         snake::SnakeServer snakeServer(appConfig); | ||||||
|         snakeServer.run(); |         snakeServer.run(); | ||||||
|  |  | ||||||
|   | |||||||
							
								
								
									
										233
									
								
								test/IXCobraMetricsPublisherTest.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										233
									
								
								test/IXCobraMetricsPublisherTest.cpp
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,233 @@ | |||||||
|  | /* | ||||||
|  |  *  Author: Benjamin Sergeant | ||||||
|  |  *  Copyright (c) 2018 Machine Zone. All rights reserved. | ||||||
|  |  */ | ||||||
|  |  | ||||||
|  | #include <iostream> | ||||||
|  | #include <set> | ||||||
|  | #include <ixcrypto/IXUuid.h> | ||||||
|  | #include <ixcobra/IXCobraMetricsPublisher.h> | ||||||
|  | #include "IXTest.h" | ||||||
|  | #include "IXSnakeServer.h" | ||||||
|  |  | ||||||
|  | #include "catch.hpp" | ||||||
|  |  | ||||||
|  | using namespace ix; | ||||||
|  |  | ||||||
|  | namespace | ||||||
|  | { | ||||||
|  |     // | ||||||
|  |     // This project / appkey is configure on cobra to not do any batching. | ||||||
|  |     // This way we can start a subscriber and receive all messages as they come in. | ||||||
|  |     // | ||||||
|  |     std::string ENDPOINT("ws://localhost:8009"); | ||||||
|  |     std::string APPKEY("FC2F10139A2BAc53BB72D9db967b024f"); | ||||||
|  |     std::string CHANNEL("unittest_channel"); | ||||||
|  |     std::string PUBLISHER_ROLE("_pub"); | ||||||
|  |     std::string PUBLISHER_SECRET("1c04DB8fFe76A4EeFE3E318C72d771db"); | ||||||
|  |     std::string SUBSCRIBER_ROLE("_sub"); | ||||||
|  |     std::string SUBSCRIBER_SECRET("66B1dA3ED5fA074EB5AE84Dd8CE3b5ba"); | ||||||
|  |  | ||||||
|  |     std::atomic<bool> gStop; | ||||||
|  |     std::atomic<bool> gSubscriberConnectedAndSubscribed; | ||||||
|  |     std::atomic<int> gUniqueMessageIdsCount; | ||||||
|  |     std::atomic<int> gMessageCount; | ||||||
|  |  | ||||||
|  |     std::set<std::string> gIds; | ||||||
|  |     std::mutex gProtectIds; // std::set is no thread-safe, so protect access with this mutex. | ||||||
|  |  | ||||||
|  |     // | ||||||
|  |     // Background thread subscribe to the channel and validates what was sent | ||||||
|  |     // | ||||||
|  |     void startSubscriber() | ||||||
|  |     { | ||||||
|  |         gSubscriberConnectedAndSubscribed = false; | ||||||
|  |         gUniqueMessageIdsCount = 0; | ||||||
|  |         gMessageCount = 0; | ||||||
|  |  | ||||||
|  |         ix::CobraConnection conn; | ||||||
|  |         conn.configure(APPKEY, ENDPOINT, SUBSCRIBER_ROLE, SUBSCRIBER_SECRET, | ||||||
|  |                        ix::WebSocketPerMessageDeflateOptions(true)); | ||||||
|  |         conn.connect(); | ||||||
|  |  | ||||||
|  |         conn.setEventCallback( | ||||||
|  |             [&conn] | ||||||
|  |             (ix::CobraConnectionEventType eventType, | ||||||
|  |              const std::string& errMsg, | ||||||
|  |              const ix::WebSocketHttpHeaders& headers, | ||||||
|  |              const std::string& subscriptionId, | ||||||
|  |              CobraConnection::MsgId msgId) | ||||||
|  |             { | ||||||
|  |                 if (eventType == ix::CobraConnection_EventType_Open) | ||||||
|  |                 { | ||||||
|  |                     Logger() << "Subscriber connected:"; | ||||||
|  |                 } | ||||||
|  |                 else if (eventType == ix::CobraConnection_EventType_Authenticated) | ||||||
|  |                 { | ||||||
|  |                     log("Subscriber authenticated"); | ||||||
|  |                     std::string filter; | ||||||
|  |                     conn.subscribe(CHANNEL, filter, | ||||||
|  |                                    [](const Json::Value& msg) | ||||||
|  |                                    { | ||||||
|  |                                        log(msg.toStyledString()); | ||||||
|  |  | ||||||
|  |                                        std::string id = msg["id"].asString(); | ||||||
|  |                                        { | ||||||
|  |                                            std::lock_guard<std::mutex> guard(gProtectIds); | ||||||
|  |                                            gIds.insert(id); | ||||||
|  |                                        } | ||||||
|  |  | ||||||
|  |                                        gMessageCount++; | ||||||
|  |                                    }); | ||||||
|  |                 } | ||||||
|  |                 else if (eventType == ix::CobraConnection_EventType_Subscribed) | ||||||
|  |                 { | ||||||
|  |                     Logger() << "Subscriber: subscribed to channel " << subscriptionId; | ||||||
|  |                     if (subscriptionId == CHANNEL) | ||||||
|  |                     { | ||||||
|  |                         gSubscriberConnectedAndSubscribed = true; | ||||||
|  |                     } | ||||||
|  |                     else | ||||||
|  |                     { | ||||||
|  |                         Logger() << "Subscriber: unexpected channel " << subscriptionId; | ||||||
|  |                     } | ||||||
|  |                 } | ||||||
|  |                 else if (eventType == ix::CobraConnection_EventType_UnSubscribed) | ||||||
|  |                 { | ||||||
|  |                     Logger() << "Subscriber: ununexpected from channel " << subscriptionId; | ||||||
|  |                     if (subscriptionId != CHANNEL) | ||||||
|  |                     { | ||||||
|  |                         Logger() << "Subscriber: unexpected channel " << subscriptionId; | ||||||
|  |                     } | ||||||
|  |                 } | ||||||
|  |                 else if (eventType == ix::CobraConnection_EventType_Published) | ||||||
|  |                 { | ||||||
|  |                     Logger() << "Subscriber: published message acked: " << msgId; | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|  |         ); | ||||||
|  |  | ||||||
|  |         while (!gStop) | ||||||
|  |         { | ||||||
|  |             std::chrono::duration<double, std::milli> duration(10); | ||||||
|  |             std::this_thread::sleep_for(duration); | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         conn.unsubscribe(CHANNEL); | ||||||
|  |         conn.disconnect(); | ||||||
|  |  | ||||||
|  |         gUniqueMessageIdsCount = gIds.size(); | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | TEST_CASE("Cobra_Metrics_Publisher", "[cobra]") | ||||||
|  | { | ||||||
|  |     snake::AppConfig appConfig = makeSnakeServerConfig(8009); | ||||||
|  |     snake::SnakeServer snakeServer(appConfig); | ||||||
|  |     snakeServer.run(); | ||||||
|  |  | ||||||
|  |     // Make channel name unique | ||||||
|  |     CHANNEL += uuid4(); | ||||||
|  |  | ||||||
|  |     gStop = false; | ||||||
|  |     std::thread bgThread(&startSubscriber); | ||||||
|  |  | ||||||
|  |     int timeout = 10 * 1000; // 10s | ||||||
|  |      | ||||||
|  |     // Wait until the subscriber is ready (authenticated + subscription successful) | ||||||
|  |     while (!gSubscriberConnectedAndSubscribed) | ||||||
|  |     { | ||||||
|  |         std::chrono::duration<double, std::milli> duration(10); | ||||||
|  |         std::this_thread::sleep_for(duration); | ||||||
|  |          | ||||||
|  |         timeout -= 10; | ||||||
|  |         if (timeout <= 0) | ||||||
|  |         { | ||||||
|  |             REQUIRE(false); // timeout | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     ix::CobraMetricsPublisher cobraMetricsPublisher; | ||||||
|  |  | ||||||
|  |     bool perMessageDeflate = true; | ||||||
|  |     cobraMetricsPublisher.configure(APPKEY, ENDPOINT, CHANNEL, | ||||||
|  |                                     PUBLISHER_ROLE, PUBLISHER_SECRET, perMessageDeflate); | ||||||
|  |     cobraMetricsPublisher.setSession(uuid4()); | ||||||
|  |     cobraMetricsPublisher.enable(true); // disabled by default, needs to be enabled to be active | ||||||
|  |  | ||||||
|  |     Json::Value data; | ||||||
|  |     data["foo"] = "bar"; | ||||||
|  |  | ||||||
|  |     // (1) Publish without restrictions | ||||||
|  |     cobraMetricsPublisher.push("sms_metric_A_id", data); // (msg #1) | ||||||
|  |     cobraMetricsPublisher.push("sms_metric_B_id", data); // (msg #2) | ||||||
|  |  | ||||||
|  |     // (2) Restrict what is sent using a blacklist | ||||||
|  |     // Add one entry to the blacklist | ||||||
|  |     // (will send msg #3) | ||||||
|  |     cobraMetricsPublisher.setBlacklist({ | ||||||
|  |         "sms_metric_B_id" // this id will not be sent | ||||||
|  |     }); | ||||||
|  |     // (msg #4) | ||||||
|  |     cobraMetricsPublisher.push("sms_metric_A_id", data); | ||||||
|  |     // ... | ||||||
|  |     cobraMetricsPublisher.push("sms_metric_B_id", data); // this won't be sent | ||||||
|  |  | ||||||
|  |     // Reset the blacklist | ||||||
|  |     // (msg #5) | ||||||
|  |     cobraMetricsPublisher.setBlacklist({}); // 4. | ||||||
|  |  | ||||||
|  |     // (3) Restrict what is sent using rate control | ||||||
|  |  | ||||||
|  |     // (msg #6) | ||||||
|  |     cobraMetricsPublisher.setRateControl({ | ||||||
|  |         {"sms_metric_C_id", 1},  // published once per minute (60 seconds) max | ||||||
|  |     }); | ||||||
|  |     // (msg #7) | ||||||
|  |     cobraMetricsPublisher.push("sms_metric_C_id", data); | ||||||
|  |     cobraMetricsPublisher.push("sms_metric_C_id", data); // this won't be sent | ||||||
|  |  | ||||||
|  |     ix::msleep(1400); | ||||||
|  |  | ||||||
|  |     // (msg #8) | ||||||
|  |     cobraMetricsPublisher.push("sms_metric_C_id", data); // now this will be sent | ||||||
|  |  | ||||||
|  |     ix::msleep(600); // wait a bit so that the last message is sent and can be received | ||||||
|  |  | ||||||
|  |     log("Testing suspend/resume now, which will disconnect the cobraMetricsPublisher."); | ||||||
|  |  | ||||||
|  |     // Test suspend + resume | ||||||
|  |     for (int i = 0 ; i < 3 ; ++i) | ||||||
|  |     { | ||||||
|  |         cobraMetricsPublisher.suspend(); | ||||||
|  |         ix::msleep(500); | ||||||
|  |         REQUIRE(!cobraMetricsPublisher.isConnected()); // Check that we are not connected anymore | ||||||
|  |  | ||||||
|  |         cobraMetricsPublisher.push("sms_metric_D_id", data); // will not be sent this time | ||||||
|  |  | ||||||
|  |         cobraMetricsPublisher.resume(); | ||||||
|  |         ix::msleep(2000); // give cobra 2s to connect | ||||||
|  |         REQUIRE(cobraMetricsPublisher.isConnected()); // Check that we are connected now | ||||||
|  |  | ||||||
|  |         cobraMetricsPublisher.push("sms_metric_E_id", data); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     ix::msleep(500); | ||||||
|  |  | ||||||
|  |     // Now stop the thread | ||||||
|  |     gStop = true; | ||||||
|  |     bgThread.join(); | ||||||
|  |  | ||||||
|  |     // | ||||||
|  |     // Validate that we received all message kinds, and the correct number of messages | ||||||
|  |     // | ||||||
|  |     CHECK(gIds.count("sms_metric_A_id") == 1); | ||||||
|  |     CHECK(gIds.count("sms_metric_B_id") == 1); | ||||||
|  |     CHECK(gIds.count("sms_metric_C_id") == 1); | ||||||
|  |     CHECK(gIds.count("sms_metric_D_id") == 1); | ||||||
|  |     CHECK(gIds.count("sms_metric_E_id") == 1); | ||||||
|  |     CHECK(gIds.count("sms_set_rate_control_id") == 1); | ||||||
|  |     CHECK(gIds.count("sms_set_blacklist_id") == 1); | ||||||
|  |  | ||||||
|  |     snakeServer.stop(); | ||||||
|  | } | ||||||
| @@ -161,10 +161,10 @@ namespace ix | |||||||
|         return std::string(vec.begin(), vec.end()); |         return std::string(vec.begin(), vec.end()); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     snake::AppConfig makeSnakeServerConfig() |     snake::AppConfig makeSnakeServerConfig(int port) | ||||||
|     { |     { | ||||||
|         snake::AppConfig appConfig; |         snake::AppConfig appConfig; | ||||||
|         appConfig.port = 8008; |         appConfig.port = port; | ||||||
|         appConfig.hostname = "127.0.0.1"; |         appConfig.hostname = "127.0.0.1"; | ||||||
|         appConfig.verbose = true; |         appConfig.verbose = true; | ||||||
|         appConfig.redisPort = 6379; |         appConfig.redisPort = 6379; | ||||||
|   | |||||||
| @@ -50,5 +50,5 @@ namespace ix | |||||||
|  |  | ||||||
|     bool startWebSocketEchoServer(ix::WebSocketServer& server); |     bool startWebSocketEchoServer(ix::WebSocketServer& server); | ||||||
|  |  | ||||||
|     snake::AppConfig makeSnakeServerConfig(); |     snake::AppConfig makeSnakeServerConfig(int port); | ||||||
| } // namespace ix | } // namespace ix | ||||||
|   | |||||||
| @@ -284,7 +284,6 @@ namespace snake | |||||||
|     void handleUnSubscribe( |     void handleUnSubscribe( | ||||||
|         std::shared_ptr<SnakeConnectionState> state, |         std::shared_ptr<SnakeConnectionState> state, | ||||||
|         std::shared_ptr<ix::WebSocket> ws, |         std::shared_ptr<ix::WebSocket> ws, | ||||||
|         const AppConfig& appConfig, |  | ||||||
|         const nlohmann::json& pdu) |         const nlohmann::json& pdu) | ||||||
|     { |     { | ||||||
|         // extract subscription_id |         // extract subscription_id | ||||||
| @@ -333,7 +332,7 @@ namespace snake | |||||||
|         } |         } | ||||||
|         else if (action == "rtm/unsubscribe") |         else if (action == "rtm/unsubscribe") | ||||||
|         { |         { | ||||||
|             handleUnSubscribe(state, ws, appConfig, pdu); |             handleUnSubscribe(state, ws, pdu); | ||||||
|         } |         } | ||||||
|         else |         else | ||||||
|         { |         { | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user