From 8821183aea43982852e8cdec06fade692f5cdcab Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Thu, 19 Sep 2019 12:51:34 -0700 Subject: [PATCH] missing file in ws tool --- ws/ws_cobra_metrics_publish.cpp | 86 +++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 ws/ws_cobra_metrics_publish.cpp diff --git a/ws/ws_cobra_metrics_publish.cpp b/ws/ws_cobra_metrics_publish.cpp new file mode 100644 index 00000000..d0b204c4 --- /dev/null +++ b/ws/ws_cobra_metrics_publish.cpp @@ -0,0 +1,86 @@ +/* + * ws_cobra_metrics_publish.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ix +{ + int ws_cobra_metrics_publish_main(const std::string& appkey, + const std::string& endpoint, + const std::string& rolename, + const std::string& rolesecret, + const std::string& channel, + const std::string& path, + bool stress) + { + std::atomic sentMessages(0); + std::atomic ackedMessages(0); + CobraConnection::setPublishTrackerCallback( + [&sentMessages, &ackedMessages](bool sent, bool acked) + { + if (sent) sentMessages++; + if (acked) ackedMessages++; + } + ); + + CobraMetricsPublisher cobraMetricsPublisher; + cobraMetricsPublisher.enable(true); + + bool enablePerMessageDeflate = true; + cobraMetricsPublisher.configure(appkey, endpoint, channel, + rolename, rolesecret, enablePerMessageDeflate); + + while (!cobraMetricsPublisher.isAuthenticated()) ; + + std::ifstream f(path); + std::string str((std::istreambuf_iterator(f)), + std::istreambuf_iterator()); + + Json::Value data; + Json::Reader reader; + if (!reader.parse(str, data)) return 1; + + if (!stress) + { + cobraMetricsPublisher.push(channel, data); + } + else + { + // Stress mode to try to trigger server and client bugs + while (true) + { + for (int i = 0 ; i < 1000; ++i) + { + cobraMetricsPublisher.push(channel, data); + } + + cobraMetricsPublisher.suspend(); + cobraMetricsPublisher.resume(); + + // FIXME: investigate why without this check we trigger a lock + while (!cobraMetricsPublisher.isAuthenticated()) ; + } + } + + // Wait a bit for the message to get a chance to be sent + // there isn't any ack on publish right now so it's the best we can do + // FIXME: this comment is a lie now + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + spdlog::info("Sent messages: {} Acked messages {}", sentMessages, ackedMessages); + + return 0; + } +} +