(ixcobra) add explicit event types for handshake, authentication and subscription failure, and handle those by exiting in ws_cobra_subcribe and friends
This commit is contained in:
		@@ -1,6 +1,10 @@
 | 
			
		||||
# Changelog
 | 
			
		||||
All changes to this project will be documented in this file.
 | 
			
		||||
 | 
			
		||||
## [9.1.0] - 2020-03-26
 | 
			
		||||
 | 
			
		||||
(ixcobra) add explicit event types for handshake, authentication and subscription failure, and handle those by exiting in ws_cobra_subcribe and friends
 | 
			
		||||
 | 
			
		||||
## [9.0.3] - 2020-03-24
 | 
			
		||||
 | 
			
		||||
(ws connect) display statistics about how much time it takes to stop the connection
 | 
			
		||||
 
 | 
			
		||||
@@ -37,6 +37,7 @@ namespace ix
 | 
			
		||||
        std::atomic<bool> errorSending(false);
 | 
			
		||||
        std::atomic<bool> stop(false);
 | 
			
		||||
        std::atomic<bool> throttled(false);
 | 
			
		||||
        std::atomic<bool> fatalCobraError(false);
 | 
			
		||||
 | 
			
		||||
        QueueManager queueManager(maxQueueSize);
 | 
			
		||||
 | 
			
		||||
@@ -179,6 +180,7 @@ namespace ix
 | 
			
		||||
                               verbose,
 | 
			
		||||
                               &throttled,
 | 
			
		||||
                               &receivedCount,
 | 
			
		||||
                               &fatalCobraError,
 | 
			
		||||
                               &queueManager](ix::CobraConnectionEventType eventType,
 | 
			
		||||
                                              const std::string& errMsg,
 | 
			
		||||
                                              const ix::WebSocketHttpHeaders& headers,
 | 
			
		||||
@@ -240,6 +242,21 @@ namespace ix
 | 
			
		||||
            {
 | 
			
		||||
                spdlog::info("Received websocket pong");
 | 
			
		||||
            }
 | 
			
		||||
            else if (eventType == ix::CobraConnection_EventType_Handshake_Error)
 | 
			
		||||
            {
 | 
			
		||||
                spdlog::error("Subscriber: Handshake error: {}", errMsg);
 | 
			
		||||
                fatalCobraError = true;
 | 
			
		||||
            }
 | 
			
		||||
            else if (eventType == ix::CobraConnection_EventType_Authentication_Error)
 | 
			
		||||
            {
 | 
			
		||||
                spdlog::error("Subscriber: Authentication error: {}", errMsg);
 | 
			
		||||
                fatalCobraError = true;
 | 
			
		||||
            }
 | 
			
		||||
            else if (eventType == ix::CobraConnection_EventType_Subscription_Error)
 | 
			
		||||
            {
 | 
			
		||||
                spdlog::error("Subscriber: Subscription error: {}", errMsg);
 | 
			
		||||
                fatalCobraError = true;
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        // Run forever
 | 
			
		||||
@@ -251,6 +268,7 @@ namespace ix
 | 
			
		||||
                std::this_thread::sleep_for(duration);
 | 
			
		||||
 | 
			
		||||
                if (strict && errorSending) break;
 | 
			
		||||
                if (fatalCobraError) break;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        // Run for a duration, used by unittesting now
 | 
			
		||||
@@ -262,6 +280,7 @@ namespace ix
 | 
			
		||||
                std::this_thread::sleep_for(duration);
 | 
			
		||||
 | 
			
		||||
                if (strict && errorSending) break;
 | 
			
		||||
                if (fatalCobraError) break;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@@ -272,12 +291,15 @@ namespace ix
 | 
			
		||||
        conn.disconnect();
 | 
			
		||||
        stop = true;
 | 
			
		||||
 | 
			
		||||
        // progress thread
 | 
			
		||||
        t1.join();
 | 
			
		||||
        if (t2.joinable()) t2.join();
 | 
			
		||||
        spdlog::info("heartbeat thread done");
 | 
			
		||||
 | 
			
		||||
        // heartbeat thread
 | 
			
		||||
        if (t2.joinable()) t2.join();
 | 
			
		||||
 | 
			
		||||
        // sentry sender thread
 | 
			
		||||
        t3.join();
 | 
			
		||||
 | 
			
		||||
        return (strict && errorSending) ? -1 : (int) sentCount;
 | 
			
		||||
        return ((strict && errorSending) || fatalCobraError) ? -1 : (int) sentCount;
 | 
			
		||||
    }
 | 
			
		||||
} // namespace ix
 | 
			
		||||
 
 | 
			
		||||
@@ -77,6 +77,7 @@ namespace ix
 | 
			
		||||
        std::atomic<uint64_t> sentCount(0);
 | 
			
		||||
        std::atomic<uint64_t> receivedCount(0);
 | 
			
		||||
        std::atomic<bool> stop(false);
 | 
			
		||||
        std::atomic<bool> fatalCobraError(false);
 | 
			
		||||
 | 
			
		||||
        QueueManager queueManager(maxQueueSize);
 | 
			
		||||
 | 
			
		||||
@@ -88,16 +89,18 @@ namespace ix
 | 
			
		||||
                auto duration = std::chrono::seconds(1);
 | 
			
		||||
                std::this_thread::sleep_for(duration);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            spdlog::info("timer thread done");
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        std::thread t1(timer);
 | 
			
		||||
 | 
			
		||||
        auto heartbeat = [&sentCount, &receivedCount, &enableHeartbeat] {
 | 
			
		||||
        auto heartbeat = [&sentCount, &receivedCount, &stop, &enableHeartbeat] {
 | 
			
		||||
            std::string state("na");
 | 
			
		||||
 | 
			
		||||
            if (!enableHeartbeat) return;
 | 
			
		||||
 | 
			
		||||
            while (true)
 | 
			
		||||
            while (!stop)
 | 
			
		||||
            {
 | 
			
		||||
                std::stringstream ss;
 | 
			
		||||
                ss << "messages received " << receivedCount;
 | 
			
		||||
@@ -115,6 +118,8 @@ namespace ix
 | 
			
		||||
                auto duration = std::chrono::minutes(1);
 | 
			
		||||
                std::this_thread::sleep_for(duration);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            spdlog::info("heartbeat thread done");
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        std::thread t2(heartbeat);
 | 
			
		||||
@@ -142,7 +147,7 @@ namespace ix
 | 
			
		||||
        std::thread t3(statsdSender);
 | 
			
		||||
 | 
			
		||||
        conn.setEventCallback(
 | 
			
		||||
            [&conn, &channel, &filter, &position, &jsonWriter, verbose, &queueManager, &receivedCount](
 | 
			
		||||
            [&conn, &channel, &filter, &position, &jsonWriter, verbose, &queueManager, &receivedCount, &fatalCobraError](
 | 
			
		||||
                ix::CobraConnectionEventType eventType,
 | 
			
		||||
                const std::string& errMsg,
 | 
			
		||||
                const ix::WebSocketHttpHeaders& headers,
 | 
			
		||||
@@ -200,6 +205,21 @@ namespace ix
 | 
			
		||||
                {
 | 
			
		||||
                    spdlog::info("Received websocket pong");
 | 
			
		||||
                }
 | 
			
		||||
                else if (eventType == ix::CobraConnection_EventType_Handshake_Error)
 | 
			
		||||
                {
 | 
			
		||||
                    spdlog::error("Subscriber: Handshake error: {}", errMsg);
 | 
			
		||||
                    fatalCobraError = true;
 | 
			
		||||
                }
 | 
			
		||||
                else if (eventType == ix::CobraConnection_EventType_Authentication_Error)
 | 
			
		||||
                {
 | 
			
		||||
                    spdlog::error("Subscriber: Authentication error: {}", errMsg);
 | 
			
		||||
                    fatalCobraError = true;
 | 
			
		||||
                }
 | 
			
		||||
                else if (eventType == ix::CobraConnection_EventType_Subscription_Error)
 | 
			
		||||
                {
 | 
			
		||||
                    spdlog::error("Subscriber: Subscription error: {}", errMsg);
 | 
			
		||||
                    fatalCobraError = true;
 | 
			
		||||
                }
 | 
			
		||||
            });
 | 
			
		||||
 | 
			
		||||
        // Run forever
 | 
			
		||||
@@ -209,6 +229,8 @@ namespace ix
 | 
			
		||||
            {
 | 
			
		||||
                auto duration = std::chrono::seconds(1);
 | 
			
		||||
                std::this_thread::sleep_for(duration);
 | 
			
		||||
 | 
			
		||||
                if (fatalCobraError) break;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        // Run for a duration, used by unittesting now
 | 
			
		||||
@@ -218,6 +240,8 @@ namespace ix
 | 
			
		||||
            {
 | 
			
		||||
                auto duration = std::chrono::seconds(1);
 | 
			
		||||
                std::this_thread::sleep_for(duration);
 | 
			
		||||
 | 
			
		||||
                if (fatalCobraError) break;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@@ -228,12 +252,15 @@ namespace ix
 | 
			
		||||
        conn.disconnect();
 | 
			
		||||
        stop = true;
 | 
			
		||||
 | 
			
		||||
        // progress thread
 | 
			
		||||
        t1.join();
 | 
			
		||||
        if (t2.joinable()) t2.join();
 | 
			
		||||
        spdlog::info("heartbeat thread done");
 | 
			
		||||
 | 
			
		||||
        // heartbeat thread
 | 
			
		||||
        if (t2.joinable()) t2.join();
 | 
			
		||||
 | 
			
		||||
        // statsd sender thread
 | 
			
		||||
        t3.join();
 | 
			
		||||
 | 
			
		||||
        return (int) sentCount;
 | 
			
		||||
        return fatalCobraError ? -1 : (int) sentCount;
 | 
			
		||||
    }
 | 
			
		||||
} // namespace ix
 | 
			
		||||
 
 | 
			
		||||
@@ -166,7 +166,8 @@ namespace ix
 | 
			
		||||
                    }
 | 
			
		||||
                    else if (action == "auth/handshake/error")
 | 
			
		||||
                    {
 | 
			
		||||
                        invokeErrorCallback("Handshake error", msg->str);
 | 
			
		||||
                        invokeEventCallback(ix::CobraConnection_EventType_Handshake_Error,
 | 
			
		||||
                                            msg->str);
 | 
			
		||||
                    }
 | 
			
		||||
                    else if (action == "auth/authenticate/ok")
 | 
			
		||||
                    {
 | 
			
		||||
@@ -176,7 +177,8 @@ namespace ix
 | 
			
		||||
                    }
 | 
			
		||||
                    else if (action == "auth/authenticate/error")
 | 
			
		||||
                    {
 | 
			
		||||
                        invokeErrorCallback("Authentication error", msg->str);
 | 
			
		||||
                        invokeEventCallback(ix::CobraConnection_EventType_Authentication_Error,
 | 
			
		||||
                                            msg->str);
 | 
			
		||||
                    }
 | 
			
		||||
                    else if (action == "rtm/subscription/data")
 | 
			
		||||
                    {
 | 
			
		||||
@@ -191,7 +193,8 @@ namespace ix
 | 
			
		||||
                    }
 | 
			
		||||
                    else if (action == "rtm/subscribe/error")
 | 
			
		||||
                    {
 | 
			
		||||
                        invokeErrorCallback("Subscription error", msg->str);
 | 
			
		||||
                        invokeEventCallback(ix::CobraConnection_EventType_Subscription_Error,
 | 
			
		||||
                                            msg->str);
 | 
			
		||||
                    }
 | 
			
		||||
                    else if (action == "rtm/unsubscribe/ok")
 | 
			
		||||
                    {
 | 
			
		||||
 
 | 
			
		||||
@@ -37,7 +37,10 @@ namespace ix
 | 
			
		||||
        CobraConnection_EventType_Subscribed = 4,
 | 
			
		||||
        CobraConnection_EventType_UnSubscribed = 5,
 | 
			
		||||
        CobraConnection_EventType_Published = 6,
 | 
			
		||||
        CobraConnection_EventType_Pong = 7
 | 
			
		||||
        CobraConnection_EventType_Pong = 7,
 | 
			
		||||
        CobraConnection_EventType_Handshake_Error = 8,
 | 
			
		||||
        CobraConnection_EventType_Authentication_Error = 9,
 | 
			
		||||
        CobraConnection_EventType_Subscription_Error = 10
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    enum CobraConnectionPublishMode
 | 
			
		||||
 
 | 
			
		||||
@@ -6,4 +6,4 @@
 | 
			
		||||
 | 
			
		||||
#pragma once
 | 
			
		||||
 | 
			
		||||
#define IX_WEBSOCKET_VERSION "9.0.3"
 | 
			
		||||
#define IX_WEBSOCKET_VERSION "9.1.0"
 | 
			
		||||
 
 | 
			
		||||
@@ -55,8 +55,10 @@ namespace ix
 | 
			
		||||
        std::atomic<int> msgPerSeconds(0);
 | 
			
		||||
        std::atomic<int> msgCount(0);
 | 
			
		||||
 | 
			
		||||
        auto timer = [&msgPerSeconds, &msgCount] {
 | 
			
		||||
            while (true)
 | 
			
		||||
        std::atomic<bool> fatalCobraError(false);
 | 
			
		||||
 | 
			
		||||
        auto timer = [&msgPerSeconds, &msgCount, &fatalCobraError] {
 | 
			
		||||
            while (!fatalCobraError)
 | 
			
		||||
            {
 | 
			
		||||
                spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds);
 | 
			
		||||
 | 
			
		||||
@@ -78,7 +80,8 @@ namespace ix
 | 
			
		||||
                               &msgCount,
 | 
			
		||||
                               &msgPerSeconds,
 | 
			
		||||
                               &quiet,
 | 
			
		||||
                               &fluentd](ix::CobraConnectionEventType eventType,
 | 
			
		||||
                               &fluentd,
 | 
			
		||||
                               &fatalCobraError](ix::CobraConnectionEventType eventType,
 | 
			
		||||
                                                 const std::string& errMsg,
 | 
			
		||||
                                                 const ix::WebSocketHttpHeaders& headers,
 | 
			
		||||
                                                 const std::string& subscriptionId,
 | 
			
		||||
@@ -137,14 +140,32 @@ namespace ix
 | 
			
		||||
            {
 | 
			
		||||
                spdlog::info("Received websocket pong");
 | 
			
		||||
            }
 | 
			
		||||
            else if (eventType == ix::CobraConnection_EventType_Handshake_Error)
 | 
			
		||||
            {
 | 
			
		||||
                spdlog::error("Subscriber: Handshake error: {}", errMsg);
 | 
			
		||||
                fatalCobraError = true;
 | 
			
		||||
            }
 | 
			
		||||
            else if (eventType == ix::CobraConnection_EventType_Authentication_Error)
 | 
			
		||||
            {
 | 
			
		||||
                spdlog::error("Subscriber: Authentication error: {}", errMsg);
 | 
			
		||||
                fatalCobraError = true;
 | 
			
		||||
            }
 | 
			
		||||
            else if (eventType == ix::CobraConnection_EventType_Subscription_Error)
 | 
			
		||||
            {
 | 
			
		||||
                spdlog::error("Subscriber: Subscription error: {}", errMsg);
 | 
			
		||||
                fatalCobraError = true;
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        while (true)
 | 
			
		||||
        while (!fatalCobraError)
 | 
			
		||||
        {
 | 
			
		||||
            auto duration = std::chrono::seconds(1);
 | 
			
		||||
            std::this_thread::sleep_for(duration);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        return 0;
 | 
			
		||||
        conn.disconnect();
 | 
			
		||||
        t.join();
 | 
			
		||||
 | 
			
		||||
        return fatalCobraError ? 1 : 0;
 | 
			
		||||
    }
 | 
			
		||||
} // namespace ix
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user