From 64754df66cef1e0d53846277bf161c4151ad6fff Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Wed, 15 Apr 2020 17:38:21 -0700 Subject: [PATCH] (ixcobra) change cobra event callback to use a struct instead of several objects, which is more flexible/extensible --- docs/CHANGELOG.md | 4 ++ ixbots/ixbots/IXCobraToSentryBot.cpp | 47 +++++++++--------- ixbots/ixbots/IXCobraToStatsdBot.cpp | 48 +++++++++---------- ixcobra/ixcobra/IXCobraConnection.cpp | 7 ++- ixcobra/ixcobra/IXCobraConnection.h | 7 +-- ixcobra/ixcobra/IXCobraEvent.h | 41 ++++++++++++++++ .../IXCobraMetricsThreadedPublisher.cpp | 48 +++++++++++-------- ixwebsocket/IXWebSocketVersion.h | 2 +- test/IXCobraChatTest.cpp | 37 ++++++-------- test/IXCobraMetricsPublisherTest.cpp | 41 ++++++++-------- ws/ws_cobra_metrics_to_redis.cpp | 29 +++++------ ws/ws_cobra_publish.cpp | 44 ++++++++++------- ws/ws_cobra_subscribe.cpp | 47 +++++++++--------- 13 files changed, 224 insertions(+), 178 deletions(-) create mode 100644 ixcobra/ixcobra/IXCobraEvent.h diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 29c5fb8e..7156196d 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,6 +1,10 @@ # Changelog All changes to this project will be documented in this file. +## [9.2.9] - 2020-04-15 + +(ixcobra) change cobra event callback to use a struct instead of several objects, which is more flexible/extensible + ## [9.2.8] - 2020-04-15 (ixcobra) make CobraConnection_EventType an enum class (CobraEventType) diff --git a/ixbots/ixbots/IXCobraToSentryBot.cpp b/ixbots/ixbots/IXCobraToSentryBot.cpp index a2021e2e..2e6ce0ed 100644 --- a/ixbots/ixbots/IXCobraToSentryBot.cpp +++ b/ixbots/ixbots/IXCobraToSentryBot.cpp @@ -181,25 +181,22 @@ namespace ix &throttled, &receivedCount, &fatalCobraError, - &queueManager](ix::CobraEventType eventType, - const std::string& errMsg, - const ix::WebSocketHttpHeaders& headers, - const std::string& subscriptionId, - CobraConnection::MsgId msgId) { - if (eventType == ix::CobraEventType::Open) + &queueManager](const CobraEventPtr& event) + { + if (event->type == ix::CobraEventType::Open) { spdlog::info("Subscriber connected"); - for (auto it : headers) + for (auto&& it : event->headers) { spdlog::info("{}: {}", it.first, it.second); } } - if (eventType == ix::CobraEventType::Closed) + else if (event->type == ix::CobraEventType::Closed) { - spdlog::info("Subscriber closed"); + spdlog::info("Subscriber closed: {}", event->errMsg); } - else if (eventType == ix::CobraEventType::Authenticated) + else if (event->type == ix::CobraEventType::Authenticated) { spdlog::info("Subscriber authenticated"); conn.subscribe(channel, @@ -222,39 +219,39 @@ namespace ix queueManager.add(msg); }); } - else if (eventType == ix::CobraEventType::Subscribed) + else if (event->type == ix::CobraEventType::Subscribed) { - spdlog::info("Subscriber: subscribed to channel {}", subscriptionId); + spdlog::info("Subscriber: subscribed to channel {}", event->subscriptionId); } - else if (eventType == ix::CobraEventType::UnSubscribed) + else if (event->type == ix::CobraEventType::UnSubscribed) { - spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId); + spdlog::info("Subscriber: unsubscribed from channel {}", event->subscriptionId); } - else if (eventType == ix::CobraEventType::Error) + else if (event->type == ix::CobraEventType::Error) { - spdlog::error("Subscriber: error {}", errMsg); + spdlog::error("Subscriber: error {}", event->errMsg); } - else if (eventType == ix::CobraEventType::Published) + else if (event->type == ix::CobraEventType::Published) { - spdlog::error("Published message hacked: {}", msgId); + spdlog::error("Published message hacked: {}", event->msgId); } - else if (eventType == ix::CobraEventType::Pong) + else if (event->type == ix::CobraEventType::Pong) { spdlog::info("Received websocket pong"); } - else if (eventType == ix::CobraEventType::HandshakeError) + else if (event->type == ix::CobraEventType::HandshakeError) { - spdlog::error("Subscriber: Handshake error: {}", errMsg); + spdlog::error("Subscriber: Handshake error: {}", event->errMsg); fatalCobraError = true; } - else if (eventType == ix::CobraEventType::AuthenticationError) + else if (event->type == ix::CobraEventType::AuthenticationError) { - spdlog::error("Subscriber: Authentication error: {}", errMsg); + spdlog::error("Subscriber: Authentication error: {}", event->errMsg); fatalCobraError = true; } - else if (eventType == ix::CobraEventType::SubscriptionError) + else if (event->type == ix::CobraEventType::SubscriptionError) { - spdlog::error("Subscriber: Subscription error: {}", errMsg); + spdlog::error("Subscriber: Subscription error: {}", event->errMsg); fatalCobraError = true; } }); diff --git a/ixbots/ixbots/IXCobraToStatsdBot.cpp b/ixbots/ixbots/IXCobraToStatsdBot.cpp index f58c473c..b838bc01 100644 --- a/ixbots/ixbots/IXCobraToStatsdBot.cpp +++ b/ixbots/ixbots/IXCobraToStatsdBot.cpp @@ -201,26 +201,22 @@ namespace ix std::thread t3(statsdSender); conn.setEventCallback( - [&conn, &channel, &filter, &position, &jsonWriter, verbose, &queueManager, &receivedCount, &fatalCobraError]( - ix::CobraEventType eventType, - const std::string& errMsg, - const ix::WebSocketHttpHeaders& headers, - const std::string& subscriptionId, - CobraConnection::MsgId msgId) { - if (eventType == ix::CobraEventType::Open) + [&conn, &channel, &filter, &position, &jsonWriter, verbose, &queueManager, &receivedCount, &fatalCobraError](const CobraEventPtr& event) + { + if (event->type == ix::CobraEventType::Open) { spdlog::info("Subscriber connected"); - for (auto it : headers) + for (auto&& it : event->headers) { spdlog::info("{}: {}", it.first, it.second); } } - if (eventType == ix::CobraEventType::Closed) + else if (event->type == ix::CobraEventType::Closed) { - spdlog::info("Subscriber closed"); + spdlog::info("Subscriber closed: {}", event->errMsg); } - else if (eventType == ix::CobraEventType::Authenticated) + else if (event->type == ix::CobraEventType::Authenticated) { spdlog::info("Subscriber authenticated"); conn.subscribe(channel, @@ -239,39 +235,39 @@ namespace ix queueManager.add(msg); }); } - else if (eventType == ix::CobraEventType::Subscribed) + else if (event->type == ix::CobraEventType::Subscribed) { - spdlog::info("Subscriber: subscribed to channel {}", subscriptionId); + spdlog::info("Subscriber: subscribed to channel {}", event->subscriptionId); } - else if (eventType == ix::CobraEventType::UnSubscribed) + else if (event->type == ix::CobraEventType::UnSubscribed) { - spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId); + spdlog::info("Subscriber: unsubscribed from channel {}", event->subscriptionId); } - else if (eventType == ix::CobraEventType::Error) + else if (event->type == ix::CobraEventType::Error) { - spdlog::error("Subscriber: error {}", errMsg); + spdlog::error("Subscriber: error {}", event->errMsg); } - else if (eventType == ix::CobraEventType::Published) + else if (event->type == ix::CobraEventType::Published) { - spdlog::error("Published message hacked: {}", msgId); + spdlog::error("Published message hacked: {}", event->msgId); } - else if (eventType == ix::CobraEventType::Pong) + else if (event->type == ix::CobraEventType::Pong) { spdlog::info("Received websocket pong"); } - else if (eventType == ix::CobraEventType::HandshakeError) + else if (event->type == ix::CobraEventType::HandshakeError) { - spdlog::error("Subscriber: Handshake error: {}", errMsg); + spdlog::error("Subscriber: Handshake error: {}", event->errMsg); fatalCobraError = true; } - else if (eventType == ix::CobraEventType::AuthenticationError) + else if (event->type == ix::CobraEventType::AuthenticationError) { - spdlog::error("Subscriber: Authentication error: {}", errMsg); + spdlog::error("Subscriber: Authentication error: {}", event->errMsg); fatalCobraError = true; } - else if (eventType == ix::CobraEventType::SubscriptionError) + else if (event->type == ix::CobraEventType::SubscriptionError) { - spdlog::error("Subscriber: Subscription error: {}", errMsg); + spdlog::error("Subscriber: Subscription error: {}", event->errMsg); fatalCobraError = true; } }); diff --git a/ixcobra/ixcobra/IXCobraConnection.cpp b/ixcobra/ixcobra/IXCobraConnection.cpp index ceb62995..7446e71b 100644 --- a/ixcobra/ixcobra/IXCobraConnection.cpp +++ b/ixcobra/ixcobra/IXCobraConnection.cpp @@ -96,7 +96,12 @@ namespace ix std::lock_guard lock(_eventCallbackMutex); if (_eventCallback) { - _eventCallback(eventType, errorMsg, headers, subscriptionId, msgId); + _eventCallback( + std::make_unique(eventType, + errorMsg, + headers, + subscriptionId, + msgId)); } } diff --git a/ixcobra/ixcobra/IXCobraConnection.h b/ixcobra/ixcobra/IXCobraConnection.h index 0e9eda1a..eacfd720 100644 --- a/ixcobra/ixcobra/IXCobraConnection.h +++ b/ixcobra/ixcobra/IXCobraConnection.h @@ -9,6 +9,7 @@ #include #include #include "IXCobraEventType.h" +#include "IXCobraEvent.h" #include #include #include @@ -36,11 +37,7 @@ namespace ix }; using SubscriptionCallback = std::function; - using EventCallback = std::function; + using EventCallback = std::function; using TrafficTrackerCallback = std::function; using PublishTrackerCallback = std::function; diff --git a/ixcobra/ixcobra/IXCobraEvent.h b/ixcobra/ixcobra/IXCobraEvent.h new file mode 100644 index 00000000..fd2c9fa6 --- /dev/null +++ b/ixcobra/ixcobra/IXCobraEvent.h @@ -0,0 +1,41 @@ +/* + * IXCobraEvent.h + * Author: Benjamin Sergeant + * Copyright (c) 2020 Machine Zone, Inc. All rights reserved. + */ + +#pragma once + +#include "IXCobraEventType.h" +#include +#include +#include +#include + +namespace ix +{ + struct CobraEvent + { + ix::CobraEventType type; + const std::string& errMsg; + const ix::WebSocketHttpHeaders& headers; + const std::string& subscriptionId; + uint64_t msgId; // CobraConnection::MsgId + + CobraEvent(ix::CobraEventType t, + const std::string& e, + const ix::WebSocketHttpHeaders& h, + const std::string& s, + uint64_t m) + : type(t) + , errMsg(e) + , headers(h) + , subscriptionId(s) + , msgId(m) + { + ; + } + }; + + using CobraEventPtr = std::unique_ptr; +} diff --git a/ixcobra/ixcobra/IXCobraMetricsThreadedPublisher.cpp b/ixcobra/ixcobra/IXCobraMetricsThreadedPublisher.cpp index 25d8370c..99867813 100644 --- a/ixcobra/ixcobra/IXCobraMetricsThreadedPublisher.cpp +++ b/ixcobra/ixcobra/IXCobraMetricsThreadedPublisher.cpp @@ -22,53 +22,59 @@ namespace ix CobraMetricsThreadedPublisher::CobraMetricsThreadedPublisher() : _stop(false) { - _cobra_connection.setEventCallback( - [] - (ix::CobraEventType eventType, - const std::string& errMsg, - const ix::WebSocketHttpHeaders& headers, - const std::string& subscriptionId, - CobraConnection::MsgId msgId) + _cobra_connection.setEventCallback([](const CobraEventPtr& event) { std::stringstream ss; - if (eventType == ix::CobraEventType::Open) + if (event->type == ix::CobraEventType::Open) { ss << "Handshake headers" << std::endl; - for (auto it : headers) + for (auto&& it : event->headers) { ss << it.first << ": " << it.second << std::endl; } } - else if (eventType == ix::CobraEventType::Authenticated) + else if (event->type == ix::CobraEventType::Authenticated) { ss << "Authenticated"; } - else if (eventType == ix::CobraEventType::Error) + else if (event->type == ix::CobraEventType::Error) { - ss << "Error: " << errMsg; + ss << "Error: " << event->errMsg; } - else if (eventType == ix::CobraEventType::Closed) + else if (event->type == ix::CobraEventType::Closed) { - ss << "Connection closed: " << errMsg; + ss << "Connection closed: " << event->errMsg; } - else if (eventType == ix::CobraEventType::Subscribed) + else if (event->type == ix::CobraEventType::Subscribed) { - ss << "Subscribed through subscription id: " << subscriptionId; + ss << "Subscribed through subscription id: " << event->subscriptionId; } - else if (eventType == ix::CobraEventType::UnSubscribed) + else if (event->type == ix::CobraEventType::UnSubscribed) { - ss << "Unsubscribed through subscription id: " << subscriptionId; + ss << "Unsubscribed through subscription id: " << event->subscriptionId; } - else if (eventType == ix::CobraEventType::Published) + else if (event->type == ix::CobraEventType::Published) { - ss << "Published message " << msgId << " acked"; + ss << "Published message " << event->msgId << " acked"; } - else if (eventType == ix::CobraEventType::Pong) + else if (event->type == ix::CobraEventType::Pong) { ss << "Received websocket pong"; } + else if (event->type == ix::CobraEventType::HandshakeError) + { + ss << "Handshake error: " << event->errMsg; + } + else if (event->type == ix::CobraEventType::AuthenticationError) + { + ss << "Authentication error: " << event->errMsg; + } + else if (event->type == ix::CobraEventType::SubscriptionError) + { + ss << "Subscription error: " << event->errMsg; + } ix::IXCoreLogger::Log(ss.str().c_str()); }); diff --git a/ixwebsocket/IXWebSocketVersion.h b/ixwebsocket/IXWebSocketVersion.h index 77733d8b..7c546325 100644 --- a/ixwebsocket/IXWebSocketVersion.h +++ b/ixwebsocket/IXWebSocketVersion.h @@ -6,4 +6,4 @@ #pragma once -#define IX_WEBSOCKET_VERSION "9.2.8" +#define IX_WEBSOCKET_VERSION "9.2.9" diff --git a/test/IXCobraChatTest.cpp b/test/IXCobraChatTest.cpp index d6847295..e673a350 100644 --- a/test/IXCobraChatTest.cpp +++ b/test/IXCobraChatTest.cpp @@ -180,44 +180,41 @@ namespace _conn.configure(_cobraConfig); _conn.connect(); - _conn.setEventCallback([this, channel](ix::CobraEventType eventType, - const std::string& errMsg, - const ix::WebSocketHttpHeaders& headers, - const std::string& subscriptionId, - CobraConnection::MsgId msgId) { - if (eventType == ix::CobraEventType::Open) + _conn.setEventCallback([this, channel](const CobraEventPtr& event) + { + if (event->type == ix::CobraEventType::Open) { log("Subscriber connected: " + _user); - for (auto&& it : headers) + for (auto&& it : event->headers) { log("Headers " + it.first + " " + it.second); } } - else if (eventType == ix::CobraEventType::Authenticated) + else if (event->type == ix::CobraEventType::Authenticated) { log("Subscriber authenticated: " + _user); subscribe(channel); } - else if (eventType == ix::CobraEventType::Error) + else if (event->type == ix::CobraEventType::Error) { - log(errMsg + _user); + log(event->errMsg + _user); } - else if (eventType == ix::CobraEventType::Closed) + else if (event->type == ix::CobraEventType::Closed) { log("Connection closed: " + _user); } - else if (eventType == ix::CobraEventType::Subscribed) + else if (event->type == ix::CobraEventType::Subscribed) { - log("Subscription ok: " + _user + " subscription_id " + subscriptionId); + log("Subscription ok: " + _user + " subscription_id " + event->subscriptionId); _connectedAndSubscribed = true; } - else if (eventType == ix::CobraEventType::UnSubscribed) + else if (event->type == ix::CobraEventType::UnSubscribed) { - log("Unsubscription ok: " + _user + " subscription_id " + subscriptionId); + log("Unsubscription ok: " + _user + " subscription_id " + event->subscriptionId); } - else if (eventType == ix::CobraEventType::Published) + else if (event->type == ix::CobraEventType::Published) { - TLogger() << "Subscriber: published message acked: " << msgId; + TLogger() << "Subscriber: published message acked: " << event->msgId; } }); @@ -248,11 +245,7 @@ namespace ix::msleep(50); _conn.disconnect(); - _conn.setEventCallback([](ix::CobraEventType /*eventType*/, - const std::string& /*errMsg*/, - const ix::WebSocketHttpHeaders& /*headers*/, - const std::string& /*subscriptionId*/, - CobraConnection::MsgId /*msgId*/) { ; }); + _conn.setEventCallback([](const CobraEventPtr& /*event*/) {}); } } // namespace diff --git a/test/IXCobraMetricsPublisherTest.cpp b/test/IXCobraMetricsPublisherTest.cpp index 6db52eec..4512b3ae 100644 --- a/test/IXCobraMetricsPublisherTest.cpp +++ b/test/IXCobraMetricsPublisherTest.cpp @@ -54,24 +54,25 @@ namespace conn.configure(config); conn.connect(); - conn.setEventCallback([&conn, &channel](ix::CobraEventType eventType, - const std::string& errMsg, - const ix::WebSocketHttpHeaders& headers, - const std::string& subscriptionId, - CobraConnection::MsgId msgId) { - if (eventType == ix::CobraEventType::Open) + conn.setEventCallback([&conn, &channel](const CobraEventPtr& event) + { + if (event->type == ix::CobraEventType::Open) { TLogger() << "Subscriber connected:"; - for (auto&& it : headers) + for (auto&& it : event->headers) { log("Headers " + it.first + " " + it.second); } } - if (eventType == ix::CobraEventType::Error) + else if (event->type == ix::CobraEventType::Closed) { - TLogger() << "Subscriber error:" << errMsg; + TLogger() << "Subscriber closed:" << event->errMsg; } - else if (eventType == ix::CobraEventType::Authenticated) + else if (event->type == ix::CobraEventType::Error) + { + TLogger() << "Subscriber error:" << event->errMsg; + } + else if (event->type == ix::CobraEventType::Authenticated) { log("Subscriber authenticated"); std::string filter; @@ -92,29 +93,29 @@ namespace gMessageCount++; }); } - else if (eventType == ix::CobraEventType::Subscribed) + else if (event->type == ix::CobraEventType::Subscribed) { - TLogger() << "Subscriber: subscribed to channel " << subscriptionId; - if (subscriptionId == channel) + TLogger() << "Subscriber: subscribed to channel " << event->subscriptionId; + if (event->subscriptionId == channel) { gSubscriberConnectedAndSubscribed = true; } else { - TLogger() << "Subscriber: unexpected channel " << subscriptionId; + TLogger() << "Subscriber: unexpected channel " << event->subscriptionId; } } - else if (eventType == ix::CobraEventType::UnSubscribed) + else if (event->type == ix::CobraEventType::UnSubscribed) { - TLogger() << "Subscriber: ununexpected from channel " << subscriptionId; - if (subscriptionId != channel) + TLogger() << "Subscriber: ununexpected from channel " << event->subscriptionId; + if (event->subscriptionId != channel) { - TLogger() << "Subscriber: unexpected channel " << subscriptionId; + TLogger() << "Subscriber: unexpected channel " << event->subscriptionId; } } - else if (eventType == ix::CobraEventType::Published) + else if (event->type == ix::CobraEventType::Published) { - TLogger() << "Subscriber: published message acked: " << msgId; + TLogger() << "Subscriber: published message acked: " << event->msgId; } }); diff --git a/ws/ws_cobra_metrics_to_redis.cpp b/ws/ws_cobra_metrics_to_redis.cpp index c61d9d13..7cb9d386 100644 --- a/ws/ws_cobra_metrics_to_redis.cpp +++ b/ws/ws_cobra_metrics_to_redis.cpp @@ -106,21 +106,18 @@ namespace ix &msgPerSeconds, &conditionVariableMutex, &condition, - &queue](ix::CobraEventType eventType, - const std::string& errMsg, - const ix::WebSocketHttpHeaders& headers, - const std::string& subscriptionId, - CobraConnection::MsgId msgId) { - if (eventType == ix::CobraEventType::Open) + &queue](const CobraEventPtr& event) + { + if (event->type == ix::CobraEventType::Open) { spdlog::info("Subscriber connected"); - for (auto it : headers) + for (auto&& it : event->headers) { spdlog::info("{}: {}", it.first, it.second); } } - else if (eventType == ix::CobraEventType::Authenticated) + else if (event->type == ix::CobraEventType::Authenticated) { spdlog::info("Subscriber authenticated"); @@ -141,21 +138,21 @@ namespace ix msgCount++; }); } - else if (eventType == ix::CobraEventType::Subscribed) + else if (event->type == ix::CobraEventType::Subscribed) { - spdlog::info("Subscriber: subscribed to channel {}", subscriptionId); + spdlog::info("Subscriber: subscribed to channel {}", event->subscriptionId); } - else if (eventType == ix::CobraEventType::UnSubscribed) + else if (event->type == ix::CobraEventType::UnSubscribed) { - spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId); + spdlog::info("Subscriber: unsubscribed from channel {}", event->subscriptionId); } - else if (eventType == ix::CobraEventType::Error) + else if (event->type == ix::CobraEventType::Error) { - spdlog::error("Subscriber: error {}", errMsg); + spdlog::error("Subscriber: error {}", event->errMsg); } - else if (eventType == ix::CobraEventType::Published) + else if (event->type == ix::CobraEventType::Published) { - spdlog::error("Published message hacked: {}", msgId); + spdlog::error("Published message hacked: {}", event->msgId); } }); diff --git a/ws/ws_cobra_publish.cpp b/ws/ws_cobra_publish.cpp index dd97ca4b..f7ac44b2 100644 --- a/ws/ws_cobra_publish.cpp +++ b/ws/ws_cobra_publish.cpp @@ -38,22 +38,22 @@ namespace ix std::atomic authenticated(false); std::atomic messageAcked(false); - conn.setEventCallback([&conn, &channel, &data, &authenticated, &messageAcked]( - ix::CobraEventType eventType, - const std::string& errMsg, - const ix::WebSocketHttpHeaders& headers, - const std::string& subscriptionId, - CobraConnection::MsgId msgId) { - if (eventType == ix::CobraEventType::Open) + conn.setEventCallback([&conn, &channel, &data, &authenticated, &messageAcked](const CobraEventPtr& event) + { + if (event->type == ix::CobraEventType::Open) { spdlog::info("Publisher connected"); - for (auto it : headers) + for (auto&& it : event->headers) { spdlog::info("{}: {}", it.first, it.second); } } - else if (eventType == ix::CobraEventType::Authenticated) + else if (event->type == ix::CobraEventType::Closed) + { + spdlog::info("Subscriber closed: {}", event->errMsg); + } + else if (event->type == ix::CobraEventType::Authenticated) { spdlog::info("Publisher authenticated"); authenticated = true; @@ -64,27 +64,35 @@ namespace ix spdlog::info("Published msg {}", msgId); } - else if (eventType == ix::CobraEventType::Subscribed) + else if (event->type == ix::CobraEventType::Subscribed) { - spdlog::info("Publisher: subscribed to channel {}", subscriptionId); + spdlog::info("Publisher: subscribed to channel {}", event->subscriptionId); } - else if (eventType == ix::CobraEventType::UnSubscribed) + else if (event->type == ix::CobraEventType::UnSubscribed) { - spdlog::info("Publisher: unsubscribed from channel {}", subscriptionId); + spdlog::info("Publisher: unsubscribed from channel {}", event->subscriptionId); } - else if (eventType == ix::CobraEventType::Error) + else if (event->type == ix::CobraEventType::Error) { - spdlog::error("Publisher: error {}", errMsg); + spdlog::error("Publisher: error {}", event->errMsg); } - else if (eventType == ix::CobraEventType::Published) + else if (event->type == ix::CobraEventType::Published) { - spdlog::info("Published message id {} acked", msgId); + spdlog::info("Published message id {} acked", event->msgId); messageAcked = true; } - else if (eventType == ix::CobraEventType::Pong) + else if (event->type == ix::CobraEventType::Pong) { spdlog::info("Received websocket pong"); } + else if (event->type == ix::CobraEventType::HandshakeError) + { + spdlog::error("Subscriber: Handshake error: {}", event->errMsg); + } + else if (event->type == ix::CobraEventType::AuthenticationError) + { + spdlog::error("Subscriber: Authentication error: {}", event->errMsg); + } }); conn.connect(); diff --git a/ws/ws_cobra_subscribe.cpp b/ws/ws_cobra_subscribe.cpp index f60e7da4..c4d0f0dc 100644 --- a/ws/ws_cobra_subscribe.cpp +++ b/ws/ws_cobra_subscribe.cpp @@ -106,21 +106,22 @@ namespace ix &msgPerSeconds, &quiet, &fluentd, - &fatalCobraError](ix::CobraEventType eventType, - const std::string& errMsg, - const ix::WebSocketHttpHeaders& headers, - const std::string& subscriptionId, - CobraConnection::MsgId msgId) { - if (eventType == ix::CobraEventType::Open) + &fatalCobraError](const CobraEventPtr& event) + { + if (event->type == ix::CobraEventType::Open) { spdlog::info("Subscriber connected"); - for (auto it : headers) + for (auto&& it : event->headers) { spdlog::info("{}: {}", it.first, it.second); } } - else if (eventType == ix::CobraEventType::Authenticated) + else if (event->type == ix::CobraEventType::Closed) + { + spdlog::info("Subscriber closed: {}", event->errMsg); + } + else if (event->type == ix::CobraEventType::Authenticated) { spdlog::info("Subscriber authenticated"); spdlog::info("Subscribing to {} at position {}", channel, subscriptionPosition); @@ -145,39 +146,39 @@ namespace ix subscriptionPosition = position; }); } - else if (eventType == ix::CobraEventType::Subscribed) + else if (event->type == ix::CobraEventType::Subscribed) { - spdlog::info("Subscriber: subscribed to channel {}", subscriptionId); + spdlog::info("Subscriber: subscribed to channel {}", event->subscriptionId); } - else if (eventType == ix::CobraEventType::UnSubscribed) + else if (event->type == ix::CobraEventType::UnSubscribed) { - spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId); + spdlog::info("Subscriber: unsubscribed from channel {}", event->subscriptionId); } - else if (eventType == ix::CobraEventType::Error) + else if (event->type == ix::CobraEventType::Error) { - spdlog::error("Subscriber: error {}", errMsg); + spdlog::error("Subscriber: error {}", event->errMsg); } - else if (eventType == ix::CobraEventType::Published) + else if (event->type == ix::CobraEventType::Published) { - spdlog::error("Published message hacked: {}", msgId); + spdlog::error("Published message hacked: {}", event->msgId); } - else if (eventType == ix::CobraEventType::Pong) + else if (event->type == ix::CobraEventType::Pong) { spdlog::info("Received websocket pong"); } - else if (eventType == ix::CobraEventType::HandshakeError) + else if (event->type == ix::CobraEventType::HandshakeError) { - spdlog::error("Subscriber: Handshake error: {}", errMsg); + spdlog::error("Subscriber: Handshake error: {}", event->errMsg); fatalCobraError = true; } - else if (eventType == ix::CobraEventType::AuthenticationError) + else if (event->type == ix::CobraEventType::AuthenticationError) { - spdlog::error("Subscriber: Authentication error: {}", errMsg); + spdlog::error("Subscriber: Authentication error: {}", event->errMsg); fatalCobraError = true; } - else if (eventType == ix::CobraEventType::SubscriptionError) + else if (event->type == ix::CobraEventType::SubscriptionError) { - spdlog::error("Subscriber: Subscription error: {}", errMsg); + spdlog::error("Subscriber: Subscription error: {}", event->errMsg); fatalCobraError = true; } });