From 47b3368f78d34ff50acc6701320687328c21b6f4 Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Thu, 19 Sep 2019 12:51:11 -0700 Subject: [PATCH] In DNS lookup code, make sure the weak pointer we use lives through the expected scope (if branch) --- DOCKER_VERSION | 2 +- docs/CHANGELOG.md | 4 ++ ixwebsocket/IXDNSLookup.cpp | 2 +- ixwebsocket/IXWebSocketVersion.h | 2 +- ws/CMakeLists.txt | 1 + ws/ws.cpp | 20 +++++- ws/ws.h | 11 +++- ws/ws_cobra_publish.cpp | 105 ++++++++++++++++++------------- ws/ws_cobra_subscribe.cpp | 1 - 9 files changed, 98 insertions(+), 50 deletions(-) diff --git a/DOCKER_VERSION b/DOCKER_VERSION index 024b066c..ca063943 100644 --- a/DOCKER_VERSION +++ b/DOCKER_VERSION @@ -1 +1 @@ -6.2.1 +6.2.2 diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index ac81990e..22aa945a 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,6 +1,10 @@ # Changelog All notable changes to this project will be documented in this file. +## [6.2.2] - 2019-09-19 + +- In DNS lookup code, make sure the weak pointer we use lives through the expected scope (if branch) + ## [6.2.1] - 2019-09-17 - On error while doing a client handshake, additionally display port number next to the host name diff --git a/ixwebsocket/IXDNSLookup.cpp b/ixwebsocket/IXDNSLookup.cpp index d776f6d1..1b566ddc 100644 --- a/ixwebsocket/IXDNSLookup.cpp +++ b/ixwebsocket/IXDNSLookup.cpp @@ -134,7 +134,7 @@ namespace ix std::string errMsg; struct addrinfo* res = getAddrInfo(hostname, port, errMsg); - if (self.lock()) + if (auto lock = self.lock()) { // Copy result into the member variables setRes(res); diff --git a/ixwebsocket/IXWebSocketVersion.h b/ixwebsocket/IXWebSocketVersion.h index adb7bc23..8d53fd2a 100644 --- a/ixwebsocket/IXWebSocketVersion.h +++ b/ixwebsocket/IXWebSocketVersion.h @@ -6,4 +6,4 @@ #pragma once -#define IX_WEBSOCKET_VERSION "6.2.1" +#define IX_WEBSOCKET_VERSION "6.2.2" diff --git a/ws/CMakeLists.txt b/ws/CMakeLists.txt index 6bd7b36e..d3c69d92 100644 --- a/ws/CMakeLists.txt +++ b/ws/CMakeLists.txt @@ -54,6 +54,7 @@ add_executable(ws ws_redis_publish.cpp ws_redis_subscribe.cpp ws_cobra_subscribe.cpp + ws_cobra_metrics_publish.cpp ws_cobra_publish.cpp ws_cobra_to_statsd.cpp ws_cobra_to_sentry.cpp diff --git a/ws/ws.cpp b/ws/ws.cpp index bc6bd9e0..a0917aa9 100644 --- a/ws/ws.cpp +++ b/ws/ws.cpp @@ -182,7 +182,17 @@ int main(int argc, char** argv) cobraPublish->add_option("--pidfile", pidfile, "Pid file"); cobraPublish->add_option("path", path, "Path to the file to send") ->required()->check(CLI::ExistingPath); - cobraPublish->add_flag("--stress", stress, "Stress mode"); + + CLI::App* cobraMetricsPublish = app.add_subcommand("cobra_metrics_publish", "Cobra metrics publisher"); + cobraMetricsPublish->add_option("--appkey", appkey, "Appkey"); + cobraMetricsPublish->add_option("--endpoint", endpoint, "Endpoint"); + cobraMetricsPublish->add_option("--rolename", rolename, "Role name"); + cobraMetricsPublish->add_option("--rolesecret", rolesecret, "Role secret"); + cobraMetricsPublish->add_option("channel", channel, "Channel")->required(); + cobraMetricsPublish->add_option("--pidfile", pidfile, "Pid file"); + cobraMetricsPublish->add_option("path", path, "Path to the file to send") + ->required()->check(CLI::ExistingPath); + cobraMetricsPublish->add_flag("--stress", stress, "Stress mode"); CLI::App* cobra2statsd = app.add_subcommand("cobra_to_statsd", "Cobra to statsd"); cobra2statsd->add_option("--appkey", appkey, "Appkey"); @@ -305,7 +315,13 @@ int main(int argc, char** argv) { ret = ix::ws_cobra_publish_main(appkey, endpoint, rolename, rolesecret, - channel, path, stress); + channel, path); + } + else if (app.got_subcommand("cobra_metrics_publish")) + { + ret = ix::ws_cobra_metrics_publish_main(appkey, endpoint, + rolename, rolesecret, + channel, path, stress); } else if (app.got_subcommand("cobra_to_statsd")) { diff --git a/ws/ws.h b/ws/ws.h index 7208fc7a..da4a5647 100644 --- a/ws/ws.h +++ b/ws/ws.h @@ -67,8 +67,15 @@ namespace ix const std::string& rolename, const std::string& rolesecret, const std::string& channel, - const std::string& path, - bool stress); + const std::string& path); + + int ws_cobra_metrics_publish_main(const std::string& appkey, + const std::string& endpoint, + const std::string& rolename, + const std::string& rolesecret, + const std::string& channel, + const std::string& path, + bool stress); int ws_cobra_to_statsd_main(const std::string& appkey, const std::string& endpoint, diff --git a/ws/ws_cobra_publish.cpp b/ws/ws_cobra_publish.cpp index 77541d45..cb473a84 100644 --- a/ws/ws_cobra_publish.cpp +++ b/ws/ws_cobra_publish.cpp @@ -10,6 +10,8 @@ #include #include #include +#include +#include #include #include #include @@ -21,64 +23,83 @@ namespace ix const std::string& rolename, const std::string& rolesecret, const std::string& channel, - const std::string& path, - bool stress) + const std::string& path) { - std::atomic sentMessages(0); - std::atomic ackedMessages(0); - CobraConnection::setPublishTrackerCallback( - [&sentMessages, &ackedMessages](bool sent, bool acked) - { - if (sent) sentMessages++; - if (acked) ackedMessages++; - } - ); - - CobraMetricsPublisher cobraMetricsPublisher; - cobraMetricsPublisher.enable(true); - - bool enablePerMessageDeflate = true; - cobraMetricsPublisher.configure(appkey, endpoint, channel, - rolename, rolesecret, enablePerMessageDeflate); - - while (!cobraMetricsPublisher.isAuthenticated()) ; - std::ifstream f(path); std::string str((std::istreambuf_iterator(f)), std::istreambuf_iterator()); Json::Value data; Json::Reader reader; - if (!reader.parse(str, data)) return 1; - - if (!stress) + if (!reader.parse(str, data)) { - cobraMetricsPublisher.push(channel, data); + spdlog::info("Input file is not a JSON file"); + return 1; } - else - { - // Stress mode to try to trigger server and client bugs - while (true) + + ix::CobraConnection conn; + conn.configure(appkey, endpoint, + rolename, rolesecret, + ix::WebSocketPerMessageDeflateOptions(true)); + conn.connect(); + + // Display incoming messages + std::atomic authenticated(false); + std::atomic messageAcked(false); + std::condition_variable condition; + + conn.setEventCallback( + [&conn, &channel, &data, &authenticated, &messageAcked, &condition] + (ix::CobraConnectionEventType eventType, + const std::string& errMsg, + const ix::WebSocketHttpHeaders& headers, + const std::string& subscriptionId, + CobraConnection::MsgId msgId) { - for (int i = 0 ; i < 1000; ++i) + if (eventType == ix::CobraConnection_EventType_Open) { - cobraMetricsPublisher.push(channel, data); + spdlog::info("Publisher connected"); + + for (auto it : headers) + { + spdlog::info("{}: {}", it.first, it.second); + } } + else if (eventType == ix::CobraConnection_EventType_Authenticated) + { + spdlog::info("Publisher authenticated"); + authenticated = true; - cobraMetricsPublisher.suspend(); - cobraMetricsPublisher.resume(); + spdlog::info("Publishing data"); - // FIXME: investigate why without this check we trigger a lock - while (!cobraMetricsPublisher.isAuthenticated()) ; + Json::Value channels; + channels[0] = channel; + conn.publish(channels, data); + } + else if (eventType == ix::CobraConnection_EventType_Subscribed) + { + spdlog::info("Publisher: subscribed to channel {}", subscriptionId); + } + else if (eventType == ix::CobraConnection_EventType_UnSubscribed) + { + spdlog::info("Publisher: unsubscribed from channel {}", subscriptionId); + } + else if (eventType == ix::CobraConnection_EventType_Error) + { + spdlog::error("Publisher: error {}", errMsg); + condition.notify_one(); + } + else if (eventType == ix::CobraConnection_EventType_Published) + { + spdlog::info("Published message acked: {}", msgId); + messageAcked = true; + condition.notify_one(); + } } - } + ); - // Wait a bit for the message to get a chance to be sent - // there isn't any ack on publish right now so it's the best we can do - // FIXME: this comment is a lie now - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - - spdlog::info("Sent messages: {} Acked messages {}", sentMessages, ackedMessages); + while (!authenticated) ; + while (!messageAcked) ; return 0; } diff --git a/ws/ws_cobra_subscribe.cpp b/ws/ws_cobra_subscribe.cpp index d4e595fa..f5a78254 100644 --- a/ws/ws_cobra_subscribe.cpp +++ b/ws/ws_cobra_subscribe.cpp @@ -23,7 +23,6 @@ namespace ix const std::string& filter, bool quiet) { - ix::CobraConnection conn; conn.configure(appkey, endpoint, rolename, rolesecret,