/* * Author: Benjamin Sergeant * Copyright (c) 2018 Machine Zone. All rights reserved. */ #include "IXTest.h" #include "catch.hpp" #include #include #include #include #include #include using namespace ix; namespace { std::atomic incomingBytes(0); std::atomic outgoingBytes(0); void setupTrafficTrackerCallback() { ix::CobraConnection::setTrafficTrackerCallback([](size_t size, bool incoming) { if (incoming) { incomingBytes += size; } else { outgoingBytes += size; } }); } std::atomic gStop; std::atomic gSubscriberConnectedAndSubscribed; std::atomic gUniqueMessageIdsCount; std::atomic gMessageCount; std::set gIds; std::mutex gProtectIds; // std::set is no thread-safe, so protect access with this mutex. // // Background thread subscribe to the channel and validates what was sent // void startSubscriber(const ix::CobraConfig& config, const std::string& channel) { gSubscriberConnectedAndSubscribed = false; gUniqueMessageIdsCount = 0; gMessageCount = 0; ix::CobraConnection conn; conn.configure(config); conn.connect(); conn.setEventCallback([&conn, &channel](const CobraEventPtr& event) { if (event->type == ix::CobraEventType::Open) { TLogger() << "Subscriber connected:"; for (auto&& it : event->headers) { log("Headers " + it.first + " " + it.second); } } else if (event->type == ix::CobraEventType::Closed) { TLogger() << "Subscriber closed:" << event->errMsg; } else if (event->type == ix::CobraEventType::Error) { TLogger() << "Subscriber error:" << event->errMsg; } else if (event->type == ix::CobraEventType::Authenticated) { log("Subscriber authenticated"); std::string filter; std::string position("$"); int batchSize = 1; conn.subscribe(channel, filter, position, batchSize, [](const Json::Value& msg, const std::string& /*position*/) { log(msg.toStyledString()); std::string id = msg["id"].asString(); { std::lock_guard guard(gProtectIds); gIds.insert(id); } gMessageCount++; }); } else if (event->type == ix::CobraEventType::Subscribed) { TLogger() << "Subscriber: subscribed to channel " << event->subscriptionId; if (event->subscriptionId == channel) { gSubscriberConnectedAndSubscribed = true; } else { TLogger() << "Subscriber: unexpected channel " << event->subscriptionId; } } else if (event->type == ix::CobraEventType::UnSubscribed) { TLogger() << "Subscriber: unsubscribed from channel " << event->subscriptionId; if (event->subscriptionId != channel) { TLogger() << "Subscriber: unexpected channel " << event->subscriptionId; } } else if (event->type == ix::CobraEventType::Published) { TLogger() << "Subscriber: published message acked: " << event->msgId; } }); while (!gStop) { std::chrono::duration duration(10); std::this_thread::sleep_for(duration); } conn.unsubscribe(channel); conn.disconnect(); gUniqueMessageIdsCount = gIds.size(); } // publish 100 messages, during roughly 100ms // this is used to test thread safety of CobraMetricsPublisher::push void runAdditionalPublisher(ix::CobraMetricsPublisher* cobraMetricsPublisher) { Json::Value data; data["foo"] = "bar"; for (int i = 0; i < 100; ++i) { cobraMetricsPublisher->push("sms_metric_F_id", data); ix::msleep(1); } } } // namespace TEST_CASE("Cobra_Metrics_Publisher", "[cobra]") { int port = getFreePort(); bool preferTLS = true; snake::AppConfig appConfig = makeSnakeServerConfig(port, preferTLS); // Start a redis server ix::RedisServer redisServer(appConfig.redisPort); auto res = redisServer.listen(); REQUIRE(res.first); redisServer.start(); // Start a snake server snake::SnakeServer snakeServer(appConfig); snakeServer.run(); setupTrafficTrackerCallback(); std::string channel = ix::generateSessionId(); std::string endpoint = makeCobraEndpoint(port, preferTLS); std::string appkey("FC2F10139A2BAc53BB72D9db967b024f"); std::string role = "_sub"; std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba"; ix::CobraConfig config; config.endpoint = endpoint; config.appkey = appkey; config.rolename = role; config.rolesecret = secret; config.socketTLSOptions = makeClientTLSOptions(); gStop = false; std::thread subscriberThread(&startSubscriber, config, channel); int timeout = 10 * 1000; // 10s // Wait until the subscriber is ready (authenticated + subscription successful) while (!gSubscriberConnectedAndSubscribed) { std::chrono::duration duration(10); std::this_thread::sleep_for(duration); timeout -= 10; if (timeout <= 0) { snakeServer.stop(); redisServer.stop(); REQUIRE(false); // timeout } } ix::CobraMetricsPublisher cobraMetricsPublisher; cobraMetricsPublisher.configure(config, channel); cobraMetricsPublisher.setSession(uuid4()); cobraMetricsPublisher.enable(true); Json::Value data; data["foo"] = "bar"; // (1) Publish without restrictions cobraMetricsPublisher.push("sms_metric_A_id", data); // (msg #1) cobraMetricsPublisher.push("sms_metric_B_id", data); // (msg #2) // (2) Restrict what is sent using a blacklist // Add one entry to the blacklist // (will send msg #3) cobraMetricsPublisher.setBlacklist({ "sms_metric_B_id" // this id will not be sent }); // (msg #4) cobraMetricsPublisher.push("sms_metric_A_id", data); // ... cobraMetricsPublisher.push("sms_metric_B_id", data); // this won't be sent // Reset the blacklist // (msg #5) cobraMetricsPublisher.setBlacklist({}); // 4. // (3) Restrict what is sent using rate control // (msg #6) cobraMetricsPublisher.setRateControl({ {"sms_metric_C_id", 1}, // published once per minute (60 seconds) max }); // (msg #7) cobraMetricsPublisher.push("sms_metric_C_id", data); cobraMetricsPublisher.push("sms_metric_C_id", data); // this won't be sent ix::msleep(1400); // (msg #8) cobraMetricsPublisher.push("sms_metric_C_id", data); // now this will be sent ix::msleep(600); // wait a bit so that the last message is sent and can be received log("Testing suspend/resume now, which will disconnect the cobraMetricsPublisher."); // Test suspend + resume for (int i = 0; i < 3; ++i) { cobraMetricsPublisher.suspend(); ix::msleep(500); REQUIRE(!cobraMetricsPublisher.isConnected()); // Check that we are not connected anymore cobraMetricsPublisher.push("sms_metric_D_id", data); // will not be sent this time cobraMetricsPublisher.resume(); ix::msleep(2000); // give cobra 2s to connect REQUIRE(cobraMetricsPublisher.isConnected()); // Check that we are connected now cobraMetricsPublisher.push("sms_metric_E_id", data); } ix::msleep(500); // Test multi-threaded publish std::thread bgPublisher1(&runAdditionalPublisher, &cobraMetricsPublisher); std::thread bgPublisher2(&runAdditionalPublisher, &cobraMetricsPublisher); std::thread bgPublisher3(&runAdditionalPublisher, &cobraMetricsPublisher); std::thread bgPublisher4(&runAdditionalPublisher, &cobraMetricsPublisher); std::thread bgPublisher5(&runAdditionalPublisher, &cobraMetricsPublisher); bgPublisher1.join(); bgPublisher2.join(); bgPublisher3.join(); bgPublisher4.join(); bgPublisher5.join(); // Now stop the thread gStop = true; subscriberThread.join(); // // Validate that we received all message kinds, and the correct number of messages // CHECK(gIds.count("sms_metric_A_id") == 1); CHECK(gIds.count("sms_metric_B_id") == 1); CHECK(gIds.count("sms_metric_C_id") == 1); CHECK(gIds.count("sms_metric_D_id") == 1); CHECK(gIds.count("sms_metric_E_id") == 1); CHECK(gIds.count("sms_metric_F_id") == 1); CHECK(gIds.count("sms_set_rate_control_id") == 1); CHECK(gIds.count("sms_set_blacklist_id") == 1); spdlog::info("Incoming bytes {}", incomingBytes); spdlog::info("Outgoing bytes {}", outgoingBytes); spdlog::info("Stopping snake server..."); snakeServer.stop(); spdlog::info("Stopping redis server..."); redisServer.stop(); }