diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 2826d33f..70b07f8c 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,6 +1,10 @@ # Changelog All changes to this project will be documented in this file. +## [9.1.0] - 2020-03-26 + +(ixcobra) add explicit event types for handshake, authentication and subscription failure, and handle those by exiting in ws_cobra_subcribe and friends + ## [9.0.3] - 2020-03-24 (ws connect) display statistics about how much time it takes to stop the connection diff --git a/ixbots/ixbots/IXCobraToSentryBot.cpp b/ixbots/ixbots/IXCobraToSentryBot.cpp index 7cfd5a42..5eed276d 100644 --- a/ixbots/ixbots/IXCobraToSentryBot.cpp +++ b/ixbots/ixbots/IXCobraToSentryBot.cpp @@ -37,6 +37,7 @@ namespace ix std::atomic errorSending(false); std::atomic stop(false); std::atomic throttled(false); + std::atomic fatalCobraError(false); QueueManager queueManager(maxQueueSize); @@ -179,6 +180,7 @@ namespace ix verbose, &throttled, &receivedCount, + &fatalCobraError, &queueManager](ix::CobraConnectionEventType eventType, const std::string& errMsg, const ix::WebSocketHttpHeaders& headers, @@ -240,6 +242,21 @@ namespace ix { spdlog::info("Received websocket pong"); } + else if (eventType == ix::CobraConnection_EventType_Handshake_Error) + { + spdlog::error("Subscriber: Handshake error: {}", errMsg); + fatalCobraError = true; + } + else if (eventType == ix::CobraConnection_EventType_Authentication_Error) + { + spdlog::error("Subscriber: Authentication error: {}", errMsg); + fatalCobraError = true; + } + else if (eventType == ix::CobraConnection_EventType_Subscription_Error) + { + spdlog::error("Subscriber: Subscription error: {}", errMsg); + fatalCobraError = true; + } }); // Run forever @@ -251,6 +268,7 @@ namespace ix std::this_thread::sleep_for(duration); if (strict && errorSending) break; + if (fatalCobraError) break; } } // Run for a duration, used by unittesting now @@ -262,6 +280,7 @@ namespace ix std::this_thread::sleep_for(duration); if (strict && errorSending) break; + if (fatalCobraError) break; } } @@ -272,12 +291,15 @@ namespace ix conn.disconnect(); stop = true; + // progress thread t1.join(); - if (t2.joinable()) t2.join(); - spdlog::info("heartbeat thread done"); + // heartbeat thread + if (t2.joinable()) t2.join(); + + // sentry sender thread t3.join(); - return (strict && errorSending) ? -1 : (int) sentCount; + return ((strict && errorSending) || fatalCobraError) ? -1 : (int) sentCount; } } // namespace ix diff --git a/ixbots/ixbots/IXCobraToStatsdBot.cpp b/ixbots/ixbots/IXCobraToStatsdBot.cpp index 8d1ecf21..fd1b2673 100644 --- a/ixbots/ixbots/IXCobraToStatsdBot.cpp +++ b/ixbots/ixbots/IXCobraToStatsdBot.cpp @@ -77,6 +77,7 @@ namespace ix std::atomic sentCount(0); std::atomic receivedCount(0); std::atomic stop(false); + std::atomic fatalCobraError(false); QueueManager queueManager(maxQueueSize); @@ -88,16 +89,18 @@ namespace ix auto duration = std::chrono::seconds(1); std::this_thread::sleep_for(duration); } + + spdlog::info("timer thread done"); }; std::thread t1(timer); - auto heartbeat = [&sentCount, &receivedCount, &enableHeartbeat] { + auto heartbeat = [&sentCount, &receivedCount, &stop, &enableHeartbeat] { std::string state("na"); if (!enableHeartbeat) return; - while (true) + while (!stop) { std::stringstream ss; ss << "messages received " << receivedCount; @@ -115,6 +118,8 @@ namespace ix auto duration = std::chrono::minutes(1); std::this_thread::sleep_for(duration); } + + spdlog::info("heartbeat thread done"); }; std::thread t2(heartbeat); @@ -142,7 +147,7 @@ namespace ix std::thread t3(statsdSender); conn.setEventCallback( - [&conn, &channel, &filter, &position, &jsonWriter, verbose, &queueManager, &receivedCount]( + [&conn, &channel, &filter, &position, &jsonWriter, verbose, &queueManager, &receivedCount, &fatalCobraError]( ix::CobraConnectionEventType eventType, const std::string& errMsg, const ix::WebSocketHttpHeaders& headers, @@ -200,6 +205,21 @@ namespace ix { spdlog::info("Received websocket pong"); } + else if (eventType == ix::CobraConnection_EventType_Handshake_Error) + { + spdlog::error("Subscriber: Handshake error: {}", errMsg); + fatalCobraError = true; + } + else if (eventType == ix::CobraConnection_EventType_Authentication_Error) + { + spdlog::error("Subscriber: Authentication error: {}", errMsg); + fatalCobraError = true; + } + else if (eventType == ix::CobraConnection_EventType_Subscription_Error) + { + spdlog::error("Subscriber: Subscription error: {}", errMsg); + fatalCobraError = true; + } }); // Run forever @@ -209,6 +229,8 @@ namespace ix { auto duration = std::chrono::seconds(1); std::this_thread::sleep_for(duration); + + if (fatalCobraError) break; } } // Run for a duration, used by unittesting now @@ -218,6 +240,8 @@ namespace ix { auto duration = std::chrono::seconds(1); std::this_thread::sleep_for(duration); + + if (fatalCobraError) break; } } @@ -228,12 +252,15 @@ namespace ix conn.disconnect(); stop = true; + // progress thread t1.join(); - if (t2.joinable()) t2.join(); - spdlog::info("heartbeat thread done"); + // heartbeat thread + if (t2.joinable()) t2.join(); + + // statsd sender thread t3.join(); - return (int) sentCount; + return fatalCobraError ? -1 : (int) sentCount; } } // namespace ix diff --git a/ixcobra/ixcobra/IXCobraConnection.cpp b/ixcobra/ixcobra/IXCobraConnection.cpp index 9639d005..049a4d62 100644 --- a/ixcobra/ixcobra/IXCobraConnection.cpp +++ b/ixcobra/ixcobra/IXCobraConnection.cpp @@ -166,7 +166,8 @@ namespace ix } else if (action == "auth/handshake/error") { - invokeErrorCallback("Handshake error", msg->str); + invokeEventCallback(ix::CobraConnection_EventType_Handshake_Error, + msg->str); } else if (action == "auth/authenticate/ok") { @@ -176,7 +177,8 @@ namespace ix } else if (action == "auth/authenticate/error") { - invokeErrorCallback("Authentication error", msg->str); + invokeEventCallback(ix::CobraConnection_EventType_Authentication_Error, + msg->str); } else if (action == "rtm/subscription/data") { @@ -191,7 +193,8 @@ namespace ix } else if (action == "rtm/subscribe/error") { - invokeErrorCallback("Subscription error", msg->str); + invokeEventCallback(ix::CobraConnection_EventType_Subscription_Error, + msg->str); } else if (action == "rtm/unsubscribe/ok") { diff --git a/ixcobra/ixcobra/IXCobraConnection.h b/ixcobra/ixcobra/IXCobraConnection.h index 2c7c2e39..7be0e112 100644 --- a/ixcobra/ixcobra/IXCobraConnection.h +++ b/ixcobra/ixcobra/IXCobraConnection.h @@ -37,7 +37,10 @@ namespace ix CobraConnection_EventType_Subscribed = 4, CobraConnection_EventType_UnSubscribed = 5, CobraConnection_EventType_Published = 6, - CobraConnection_EventType_Pong = 7 + CobraConnection_EventType_Pong = 7, + CobraConnection_EventType_Handshake_Error = 8, + CobraConnection_EventType_Authentication_Error = 9, + CobraConnection_EventType_Subscription_Error = 10 }; enum CobraConnectionPublishMode diff --git a/ixwebsocket/IXWebSocketVersion.h b/ixwebsocket/IXWebSocketVersion.h index b784c1ec..d4573a2b 100644 --- a/ixwebsocket/IXWebSocketVersion.h +++ b/ixwebsocket/IXWebSocketVersion.h @@ -6,4 +6,4 @@ #pragma once -#define IX_WEBSOCKET_VERSION "9.0.3" +#define IX_WEBSOCKET_VERSION "9.1.0" diff --git a/ws/ws_cobra_subscribe.cpp b/ws/ws_cobra_subscribe.cpp index 3ac13341..fd8f0ef7 100644 --- a/ws/ws_cobra_subscribe.cpp +++ b/ws/ws_cobra_subscribe.cpp @@ -55,8 +55,10 @@ namespace ix std::atomic msgPerSeconds(0); std::atomic msgCount(0); - auto timer = [&msgPerSeconds, &msgCount] { - while (true) + std::atomic fatalCobraError(false); + + auto timer = [&msgPerSeconds, &msgCount, &fatalCobraError] { + while (!fatalCobraError) { spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds); @@ -78,11 +80,12 @@ namespace ix &msgCount, &msgPerSeconds, &quiet, - &fluentd](ix::CobraConnectionEventType eventType, - const std::string& errMsg, - const ix::WebSocketHttpHeaders& headers, - const std::string& subscriptionId, - CobraConnection::MsgId msgId) { + &fluentd, + &fatalCobraError](ix::CobraConnectionEventType eventType, + const std::string& errMsg, + const ix::WebSocketHttpHeaders& headers, + const std::string& subscriptionId, + CobraConnection::MsgId msgId) { if (eventType == ix::CobraConnection_EventType_Open) { spdlog::info("Subscriber connected"); @@ -137,14 +140,32 @@ namespace ix { spdlog::info("Received websocket pong"); } + else if (eventType == ix::CobraConnection_EventType_Handshake_Error) + { + spdlog::error("Subscriber: Handshake error: {}", errMsg); + fatalCobraError = true; + } + else if (eventType == ix::CobraConnection_EventType_Authentication_Error) + { + spdlog::error("Subscriber: Authentication error: {}", errMsg); + fatalCobraError = true; + } + else if (eventType == ix::CobraConnection_EventType_Subscription_Error) + { + spdlog::error("Subscriber: Subscription error: {}", errMsg); + fatalCobraError = true; + } }); - while (true) + while (!fatalCobraError) { auto duration = std::chrono::seconds(1); std::this_thread::sleep_for(duration); } - return 0; + conn.disconnect(); + t.join(); + + return fatalCobraError ? 1 : 0; } } // namespace ix