In DNS lookup code, make sure the weak pointer we use lives through the expected scope (if branch)
This commit is contained in:
		| @@ -1 +1 @@ | |||||||
| 6.2.1 | 6.2.2 | ||||||
|   | |||||||
| @@ -1,6 +1,10 @@ | |||||||
| # Changelog | # Changelog | ||||||
| All notable changes to this project will be documented in this file. | All notable changes to this project will be documented in this file. | ||||||
|  |  | ||||||
|  | ## [6.2.2] - 2019-09-19 | ||||||
|  |  | ||||||
|  | - In DNS lookup code, make sure the weak pointer we use lives through the expected scope (if branch) | ||||||
|  |  | ||||||
| ## [6.2.1] - 2019-09-17 | ## [6.2.1] - 2019-09-17 | ||||||
|  |  | ||||||
| - On error while doing a client handshake, additionally display port number next to the host name | - On error while doing a client handshake, additionally display port number next to the host name | ||||||
|   | |||||||
| @@ -134,7 +134,7 @@ namespace ix | |||||||
|         std::string errMsg; |         std::string errMsg; | ||||||
|         struct addrinfo* res = getAddrInfo(hostname, port, errMsg); |         struct addrinfo* res = getAddrInfo(hostname, port, errMsg); | ||||||
|  |  | ||||||
|         if (self.lock()) |         if (auto lock = self.lock()) | ||||||
|         { |         { | ||||||
|             // Copy result into the member variables |             // Copy result into the member variables | ||||||
|             setRes(res); |             setRes(res); | ||||||
|   | |||||||
| @@ -6,4 +6,4 @@ | |||||||
|  |  | ||||||
| #pragma once | #pragma once | ||||||
|  |  | ||||||
| #define IX_WEBSOCKET_VERSION "6.2.1" | #define IX_WEBSOCKET_VERSION "6.2.2" | ||||||
|   | |||||||
| @@ -54,6 +54,7 @@ add_executable(ws | |||||||
|   ws_redis_publish.cpp |   ws_redis_publish.cpp | ||||||
|   ws_redis_subscribe.cpp |   ws_redis_subscribe.cpp | ||||||
|   ws_cobra_subscribe.cpp |   ws_cobra_subscribe.cpp | ||||||
|  |   ws_cobra_metrics_publish.cpp | ||||||
|   ws_cobra_publish.cpp |   ws_cobra_publish.cpp | ||||||
|   ws_cobra_to_statsd.cpp |   ws_cobra_to_statsd.cpp | ||||||
|   ws_cobra_to_sentry.cpp |   ws_cobra_to_sentry.cpp | ||||||
|   | |||||||
							
								
								
									
										20
									
								
								ws/ws.cpp
									
									
									
									
									
								
							
							
						
						
									
										20
									
								
								ws/ws.cpp
									
									
									
									
									
								
							| @@ -182,7 +182,17 @@ int main(int argc, char** argv) | |||||||
|     cobraPublish->add_option("--pidfile", pidfile, "Pid file"); |     cobraPublish->add_option("--pidfile", pidfile, "Pid file"); | ||||||
|     cobraPublish->add_option("path", path, "Path to the file to send") |     cobraPublish->add_option("path", path, "Path to the file to send") | ||||||
|         ->required()->check(CLI::ExistingPath); |         ->required()->check(CLI::ExistingPath); | ||||||
|     cobraPublish->add_flag("--stress", stress, "Stress mode"); |  | ||||||
|  |     CLI::App* cobraMetricsPublish = app.add_subcommand("cobra_metrics_publish", "Cobra metrics publisher"); | ||||||
|  |     cobraMetricsPublish->add_option("--appkey", appkey, "Appkey"); | ||||||
|  |     cobraMetricsPublish->add_option("--endpoint", endpoint, "Endpoint"); | ||||||
|  |     cobraMetricsPublish->add_option("--rolename", rolename, "Role name"); | ||||||
|  |     cobraMetricsPublish->add_option("--rolesecret", rolesecret, "Role secret"); | ||||||
|  |     cobraMetricsPublish->add_option("channel", channel, "Channel")->required(); | ||||||
|  |     cobraMetricsPublish->add_option("--pidfile", pidfile, "Pid file"); | ||||||
|  |     cobraMetricsPublish->add_option("path", path, "Path to the file to send") | ||||||
|  |         ->required()->check(CLI::ExistingPath); | ||||||
|  |     cobraMetricsPublish->add_flag("--stress", stress, "Stress mode"); | ||||||
|  |  | ||||||
|     CLI::App* cobra2statsd = app.add_subcommand("cobra_to_statsd", "Cobra to statsd"); |     CLI::App* cobra2statsd = app.add_subcommand("cobra_to_statsd", "Cobra to statsd"); | ||||||
|     cobra2statsd->add_option("--appkey", appkey, "Appkey"); |     cobra2statsd->add_option("--appkey", appkey, "Appkey"); | ||||||
| @@ -305,7 +315,13 @@ int main(int argc, char** argv) | |||||||
|     { |     { | ||||||
|         ret = ix::ws_cobra_publish_main(appkey, endpoint, |         ret = ix::ws_cobra_publish_main(appkey, endpoint, | ||||||
|                                         rolename, rolesecret, |                                         rolename, rolesecret, | ||||||
|                                         channel, path, stress); |                                         channel, path); | ||||||
|  |     } | ||||||
|  |     else if (app.got_subcommand("cobra_metrics_publish")) | ||||||
|  |     { | ||||||
|  |         ret = ix::ws_cobra_metrics_publish_main(appkey, endpoint, | ||||||
|  |                                                 rolename, rolesecret, | ||||||
|  |                                                 channel, path, stress); | ||||||
|     } |     } | ||||||
|     else if (app.got_subcommand("cobra_to_statsd")) |     else if (app.got_subcommand("cobra_to_statsd")) | ||||||
|     { |     { | ||||||
|   | |||||||
							
								
								
									
										11
									
								
								ws/ws.h
									
									
									
									
									
								
							
							
						
						
									
										11
									
								
								ws/ws.h
									
									
									
									
									
								
							| @@ -67,8 +67,15 @@ namespace ix | |||||||
|                               const std::string& rolename, |                               const std::string& rolename, | ||||||
|                               const std::string& rolesecret, |                               const std::string& rolesecret, | ||||||
|                               const std::string& channel, |                               const std::string& channel, | ||||||
|                               const std::string& path, |                               const std::string& path); | ||||||
|                               bool stress); |  | ||||||
|  |     int ws_cobra_metrics_publish_main(const std::string& appkey, | ||||||
|  |                                       const std::string& endpoint, | ||||||
|  |                                       const std::string& rolename, | ||||||
|  |                                       const std::string& rolesecret, | ||||||
|  |                                       const std::string& channel, | ||||||
|  |                                       const std::string& path, | ||||||
|  |                                       bool stress); | ||||||
|  |  | ||||||
|     int ws_cobra_to_statsd_main(const std::string& appkey, |     int ws_cobra_to_statsd_main(const std::string& appkey, | ||||||
|                                 const std::string& endpoint, |                                 const std::string& endpoint, | ||||||
|   | |||||||
| @@ -10,6 +10,8 @@ | |||||||
| #include <chrono> | #include <chrono> | ||||||
| #include <thread> | #include <thread> | ||||||
| #include <atomic> | #include <atomic> | ||||||
|  | #include <mutex> | ||||||
|  | #include <condition_variable> | ||||||
| #include <jsoncpp/json/json.h> | #include <jsoncpp/json/json.h> | ||||||
| #include <ixcobra/IXCobraMetricsPublisher.h> | #include <ixcobra/IXCobraMetricsPublisher.h> | ||||||
| #include <spdlog/spdlog.h> | #include <spdlog/spdlog.h> | ||||||
| @@ -21,64 +23,83 @@ namespace ix | |||||||
|                               const std::string& rolename, |                               const std::string& rolename, | ||||||
|                               const std::string& rolesecret, |                               const std::string& rolesecret, | ||||||
|                               const std::string& channel, |                               const std::string& channel, | ||||||
|                               const std::string& path, |                               const std::string& path) | ||||||
|                               bool stress) |  | ||||||
|     { |     { | ||||||
|         std::atomic<int> sentMessages(0); |  | ||||||
|         std::atomic<int> ackedMessages(0); |  | ||||||
|         CobraConnection::setPublishTrackerCallback( |  | ||||||
|             [&sentMessages, &ackedMessages](bool sent, bool acked) |  | ||||||
|             { |  | ||||||
|                 if (sent) sentMessages++; |  | ||||||
|                 if (acked) ackedMessages++; |  | ||||||
|             } |  | ||||||
|         ); |  | ||||||
|  |  | ||||||
|         CobraMetricsPublisher cobraMetricsPublisher; |  | ||||||
|         cobraMetricsPublisher.enable(true); |  | ||||||
|  |  | ||||||
|         bool enablePerMessageDeflate = true; |  | ||||||
|         cobraMetricsPublisher.configure(appkey, endpoint, channel, |  | ||||||
|                                         rolename, rolesecret, enablePerMessageDeflate); |  | ||||||
|  |  | ||||||
|         while (!cobraMetricsPublisher.isAuthenticated()) ; |  | ||||||
|  |  | ||||||
|         std::ifstream f(path); |         std::ifstream f(path); | ||||||
|         std::string str((std::istreambuf_iterator<char>(f)), |         std::string str((std::istreambuf_iterator<char>(f)), | ||||||
|                          std::istreambuf_iterator<char>()); |                          std::istreambuf_iterator<char>()); | ||||||
|  |  | ||||||
|         Json::Value data; |         Json::Value data; | ||||||
|         Json::Reader reader; |         Json::Reader reader; | ||||||
|         if (!reader.parse(str, data)) return 1; |         if (!reader.parse(str, data)) | ||||||
|  |  | ||||||
|         if (!stress) |  | ||||||
|         { |         { | ||||||
|             cobraMetricsPublisher.push(channel, data); |             spdlog::info("Input file is not a JSON file"); | ||||||
|  |             return 1; | ||||||
|         } |         } | ||||||
|         else |  | ||||||
|         { |         ix::CobraConnection conn; | ||||||
|             // Stress mode to try to trigger server and client bugs |         conn.configure(appkey, endpoint, | ||||||
|             while (true) |                        rolename, rolesecret, | ||||||
|  |                        ix::WebSocketPerMessageDeflateOptions(true)); | ||||||
|  |         conn.connect(); | ||||||
|  |  | ||||||
|  |         // Display incoming messages | ||||||
|  |         std::atomic<bool> authenticated(false); | ||||||
|  |         std::atomic<bool> messageAcked(false); | ||||||
|  |         std::condition_variable condition; | ||||||
|  |  | ||||||
|  |         conn.setEventCallback( | ||||||
|  |             [&conn, &channel, &data, &authenticated, &messageAcked, &condition] | ||||||
|  |             (ix::CobraConnectionEventType eventType, | ||||||
|  |              const std::string& errMsg, | ||||||
|  |              const ix::WebSocketHttpHeaders& headers, | ||||||
|  |              const std::string& subscriptionId, | ||||||
|  |              CobraConnection::MsgId msgId) | ||||||
|             { |             { | ||||||
|                 for (int i = 0 ; i < 1000; ++i) |                 if (eventType == ix::CobraConnection_EventType_Open) | ||||||
|                 { |                 { | ||||||
|                     cobraMetricsPublisher.push(channel, data); |                     spdlog::info("Publisher connected"); | ||||||
|  |  | ||||||
|  |                     for (auto it : headers) | ||||||
|  |                     { | ||||||
|  |                         spdlog::info("{}: {}", it.first, it.second); | ||||||
|  |                     } | ||||||
|                 } |                 } | ||||||
|  |                 else if (eventType == ix::CobraConnection_EventType_Authenticated) | ||||||
|  |                 { | ||||||
|  |                     spdlog::info("Publisher authenticated"); | ||||||
|  |                     authenticated = true; | ||||||
|  |  | ||||||
|                 cobraMetricsPublisher.suspend(); |                     spdlog::info("Publishing data"); | ||||||
|                 cobraMetricsPublisher.resume(); |  | ||||||
|  |  | ||||||
|                 // FIXME: investigate why without this check we trigger a lock |                     Json::Value channels; | ||||||
|                 while (!cobraMetricsPublisher.isAuthenticated()) ; |                     channels[0] = channel; | ||||||
|  |                     conn.publish(channels, data); | ||||||
|  |                 } | ||||||
|  |                 else if (eventType == ix::CobraConnection_EventType_Subscribed) | ||||||
|  |                 { | ||||||
|  |                     spdlog::info("Publisher: subscribed to channel {}", subscriptionId); | ||||||
|  |                 } | ||||||
|  |                 else if (eventType == ix::CobraConnection_EventType_UnSubscribed) | ||||||
|  |                 { | ||||||
|  |                     spdlog::info("Publisher: unsubscribed from channel {}", subscriptionId); | ||||||
|  |                 } | ||||||
|  |                 else if (eventType == ix::CobraConnection_EventType_Error) | ||||||
|  |                 { | ||||||
|  |                     spdlog::error("Publisher: error {}", errMsg); | ||||||
|  |                     condition.notify_one(); | ||||||
|  |                 } | ||||||
|  |                 else if (eventType == ix::CobraConnection_EventType_Published) | ||||||
|  |                 { | ||||||
|  |                     spdlog::info("Published message acked: {}", msgId); | ||||||
|  |                     messageAcked = true; | ||||||
|  |                     condition.notify_one(); | ||||||
|  |                 } | ||||||
|             } |             } | ||||||
|         } |         ); | ||||||
|  |  | ||||||
|         // Wait a bit for the message to get a chance to be sent |         while (!authenticated) ; | ||||||
|         // there isn't any ack on publish right now so it's the best we can do |         while (!messageAcked) ; | ||||||
|         // FIXME: this comment is a lie now |  | ||||||
|         std::this_thread::sleep_for(std::chrono::milliseconds(100)); |  | ||||||
|  |  | ||||||
|         spdlog::info("Sent messages: {} Acked messages {}", sentMessages, ackedMessages); |  | ||||||
|  |  | ||||||
|         return 0; |         return 0; | ||||||
|     } |     } | ||||||
|   | |||||||
| @@ -23,7 +23,6 @@ namespace ix | |||||||
|                                 const std::string& filter, |                                 const std::string& filter, | ||||||
|                                 bool quiet) |                                 bool quiet) | ||||||
|     { |     { | ||||||
|  |  | ||||||
|         ix::CobraConnection conn; |         ix::CobraConnection conn; | ||||||
|         conn.configure(appkey, endpoint, |         conn.configure(appkey, endpoint, | ||||||
|                        rolename, rolesecret, |                        rolename, rolesecret, | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user