(ixcobra) add explicit event types for handshake, authentication and subscription failure, and handle those by exiting in ws_cobra_subcribe and friends

This commit is contained in:
Benjamin Sergeant 2020-03-26 18:54:28 -07:00
parent 09e4584fc8
commit d2db7310ff
7 changed files with 103 additions and 23 deletions

View File

@ -1,6 +1,10 @@
# Changelog # Changelog
All changes to this project will be documented in this file. All changes to this project will be documented in this file.
## [9.1.0] - 2020-03-26
(ixcobra) add explicit event types for handshake, authentication and subscription failure, and handle those by exiting in ws_cobra_subcribe and friends
## [9.0.3] - 2020-03-24 ## [9.0.3] - 2020-03-24
(ws connect) display statistics about how much time it takes to stop the connection (ws connect) display statistics about how much time it takes to stop the connection

View File

@ -37,6 +37,7 @@ namespace ix
std::atomic<bool> errorSending(false); std::atomic<bool> errorSending(false);
std::atomic<bool> stop(false); std::atomic<bool> stop(false);
std::atomic<bool> throttled(false); std::atomic<bool> throttled(false);
std::atomic<bool> fatalCobraError(false);
QueueManager queueManager(maxQueueSize); QueueManager queueManager(maxQueueSize);
@ -179,6 +180,7 @@ namespace ix
verbose, verbose,
&throttled, &throttled,
&receivedCount, &receivedCount,
&fatalCobraError,
&queueManager](ix::CobraConnectionEventType eventType, &queueManager](ix::CobraConnectionEventType eventType,
const std::string& errMsg, const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers, const ix::WebSocketHttpHeaders& headers,
@ -240,6 +242,21 @@ namespace ix
{ {
spdlog::info("Received websocket pong"); spdlog::info("Received websocket pong");
} }
else if (eventType == ix::CobraConnection_EventType_Handshake_Error)
{
spdlog::error("Subscriber: Handshake error: {}", errMsg);
fatalCobraError = true;
}
else if (eventType == ix::CobraConnection_EventType_Authentication_Error)
{
spdlog::error("Subscriber: Authentication error: {}", errMsg);
fatalCobraError = true;
}
else if (eventType == ix::CobraConnection_EventType_Subscription_Error)
{
spdlog::error("Subscriber: Subscription error: {}", errMsg);
fatalCobraError = true;
}
}); });
// Run forever // Run forever
@ -251,6 +268,7 @@ namespace ix
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
if (strict && errorSending) break; if (strict && errorSending) break;
if (fatalCobraError) break;
} }
} }
// Run for a duration, used by unittesting now // Run for a duration, used by unittesting now
@ -262,6 +280,7 @@ namespace ix
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
if (strict && errorSending) break; if (strict && errorSending) break;
if (fatalCobraError) break;
} }
} }
@ -272,12 +291,15 @@ namespace ix
conn.disconnect(); conn.disconnect();
stop = true; stop = true;
// progress thread
t1.join(); t1.join();
if (t2.joinable()) t2.join();
spdlog::info("heartbeat thread done");
// heartbeat thread
if (t2.joinable()) t2.join();
// sentry sender thread
t3.join(); t3.join();
return (strict && errorSending) ? -1 : (int) sentCount; return ((strict && errorSending) || fatalCobraError) ? -1 : (int) sentCount;
} }
} // namespace ix } // namespace ix

View File

@ -77,6 +77,7 @@ namespace ix
std::atomic<uint64_t> sentCount(0); std::atomic<uint64_t> sentCount(0);
std::atomic<uint64_t> receivedCount(0); std::atomic<uint64_t> receivedCount(0);
std::atomic<bool> stop(false); std::atomic<bool> stop(false);
std::atomic<bool> fatalCobraError(false);
QueueManager queueManager(maxQueueSize); QueueManager queueManager(maxQueueSize);
@ -88,16 +89,18 @@ namespace ix
auto duration = std::chrono::seconds(1); auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
} }
spdlog::info("timer thread done");
}; };
std::thread t1(timer); std::thread t1(timer);
auto heartbeat = [&sentCount, &receivedCount, &enableHeartbeat] { auto heartbeat = [&sentCount, &receivedCount, &stop, &enableHeartbeat] {
std::string state("na"); std::string state("na");
if (!enableHeartbeat) return; if (!enableHeartbeat) return;
while (true) while (!stop)
{ {
std::stringstream ss; std::stringstream ss;
ss << "messages received " << receivedCount; ss << "messages received " << receivedCount;
@ -115,6 +118,8 @@ namespace ix
auto duration = std::chrono::minutes(1); auto duration = std::chrono::minutes(1);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
} }
spdlog::info("heartbeat thread done");
}; };
std::thread t2(heartbeat); std::thread t2(heartbeat);
@ -142,7 +147,7 @@ namespace ix
std::thread t3(statsdSender); std::thread t3(statsdSender);
conn.setEventCallback( conn.setEventCallback(
[&conn, &channel, &filter, &position, &jsonWriter, verbose, &queueManager, &receivedCount]( [&conn, &channel, &filter, &position, &jsonWriter, verbose, &queueManager, &receivedCount, &fatalCobraError](
ix::CobraConnectionEventType eventType, ix::CobraConnectionEventType eventType,
const std::string& errMsg, const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers, const ix::WebSocketHttpHeaders& headers,
@ -200,6 +205,21 @@ namespace ix
{ {
spdlog::info("Received websocket pong"); spdlog::info("Received websocket pong");
} }
else if (eventType == ix::CobraConnection_EventType_Handshake_Error)
{
spdlog::error("Subscriber: Handshake error: {}", errMsg);
fatalCobraError = true;
}
else if (eventType == ix::CobraConnection_EventType_Authentication_Error)
{
spdlog::error("Subscriber: Authentication error: {}", errMsg);
fatalCobraError = true;
}
else if (eventType == ix::CobraConnection_EventType_Subscription_Error)
{
spdlog::error("Subscriber: Subscription error: {}", errMsg);
fatalCobraError = true;
}
}); });
// Run forever // Run forever
@ -209,6 +229,8 @@ namespace ix
{ {
auto duration = std::chrono::seconds(1); auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
if (fatalCobraError) break;
} }
} }
// Run for a duration, used by unittesting now // Run for a duration, used by unittesting now
@ -218,6 +240,8 @@ namespace ix
{ {
auto duration = std::chrono::seconds(1); auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
if (fatalCobraError) break;
} }
} }
@ -228,12 +252,15 @@ namespace ix
conn.disconnect(); conn.disconnect();
stop = true; stop = true;
// progress thread
t1.join(); t1.join();
if (t2.joinable()) t2.join();
spdlog::info("heartbeat thread done");
// heartbeat thread
if (t2.joinable()) t2.join();
// statsd sender thread
t3.join(); t3.join();
return (int) sentCount; return fatalCobraError ? -1 : (int) sentCount;
} }
} // namespace ix } // namespace ix

View File

@ -166,7 +166,8 @@ namespace ix
} }
else if (action == "auth/handshake/error") else if (action == "auth/handshake/error")
{ {
invokeErrorCallback("Handshake error", msg->str); invokeEventCallback(ix::CobraConnection_EventType_Handshake_Error,
msg->str);
} }
else if (action == "auth/authenticate/ok") else if (action == "auth/authenticate/ok")
{ {
@ -176,7 +177,8 @@ namespace ix
} }
else if (action == "auth/authenticate/error") else if (action == "auth/authenticate/error")
{ {
invokeErrorCallback("Authentication error", msg->str); invokeEventCallback(ix::CobraConnection_EventType_Authentication_Error,
msg->str);
} }
else if (action == "rtm/subscription/data") else if (action == "rtm/subscription/data")
{ {
@ -191,7 +193,8 @@ namespace ix
} }
else if (action == "rtm/subscribe/error") else if (action == "rtm/subscribe/error")
{ {
invokeErrorCallback("Subscription error", msg->str); invokeEventCallback(ix::CobraConnection_EventType_Subscription_Error,
msg->str);
} }
else if (action == "rtm/unsubscribe/ok") else if (action == "rtm/unsubscribe/ok")
{ {

View File

@ -37,7 +37,10 @@ namespace ix
CobraConnection_EventType_Subscribed = 4, CobraConnection_EventType_Subscribed = 4,
CobraConnection_EventType_UnSubscribed = 5, CobraConnection_EventType_UnSubscribed = 5,
CobraConnection_EventType_Published = 6, CobraConnection_EventType_Published = 6,
CobraConnection_EventType_Pong = 7 CobraConnection_EventType_Pong = 7,
CobraConnection_EventType_Handshake_Error = 8,
CobraConnection_EventType_Authentication_Error = 9,
CobraConnection_EventType_Subscription_Error = 10
}; };
enum CobraConnectionPublishMode enum CobraConnectionPublishMode

View File

@ -6,4 +6,4 @@
#pragma once #pragma once
#define IX_WEBSOCKET_VERSION "9.0.3" #define IX_WEBSOCKET_VERSION "9.1.0"

View File

@ -55,8 +55,10 @@ namespace ix
std::atomic<int> msgPerSeconds(0); std::atomic<int> msgPerSeconds(0);
std::atomic<int> msgCount(0); std::atomic<int> msgCount(0);
auto timer = [&msgPerSeconds, &msgCount] { std::atomic<bool> fatalCobraError(false);
while (true)
auto timer = [&msgPerSeconds, &msgCount, &fatalCobraError] {
while (!fatalCobraError)
{ {
spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds); spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds);
@ -78,7 +80,8 @@ namespace ix
&msgCount, &msgCount,
&msgPerSeconds, &msgPerSeconds,
&quiet, &quiet,
&fluentd](ix::CobraConnectionEventType eventType, &fluentd,
&fatalCobraError](ix::CobraConnectionEventType eventType,
const std::string& errMsg, const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers, const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId, const std::string& subscriptionId,
@ -137,14 +140,32 @@ namespace ix
{ {
spdlog::info("Received websocket pong"); spdlog::info("Received websocket pong");
} }
else if (eventType == ix::CobraConnection_EventType_Handshake_Error)
{
spdlog::error("Subscriber: Handshake error: {}", errMsg);
fatalCobraError = true;
}
else if (eventType == ix::CobraConnection_EventType_Authentication_Error)
{
spdlog::error("Subscriber: Authentication error: {}", errMsg);
fatalCobraError = true;
}
else if (eventType == ix::CobraConnection_EventType_Subscription_Error)
{
spdlog::error("Subscriber: Subscription error: {}", errMsg);
fatalCobraError = true;
}
}); });
while (true) while (!fatalCobraError)
{ {
auto duration = std::chrono::seconds(1); auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
} }
return 0; conn.disconnect();
t.join();
return fatalCobraError ? 1 : 0;
} }
} // namespace ix } // namespace ix