(ixcobra) change cobra event callback to use a struct instead of several objects, which is more flexible/extensible
This commit is contained in:
parent
71a421eefc
commit
64754df66c
@ -1,6 +1,10 @@
|
|||||||
# Changelog
|
# Changelog
|
||||||
All changes to this project will be documented in this file.
|
All changes to this project will be documented in this file.
|
||||||
|
|
||||||
|
## [9.2.9] - 2020-04-15
|
||||||
|
|
||||||
|
(ixcobra) change cobra event callback to use a struct instead of several objects, which is more flexible/extensible
|
||||||
|
|
||||||
## [9.2.8] - 2020-04-15
|
## [9.2.8] - 2020-04-15
|
||||||
|
|
||||||
(ixcobra) make CobraConnection_EventType an enum class (CobraEventType)
|
(ixcobra) make CobraConnection_EventType an enum class (CobraEventType)
|
||||||
|
@ -181,25 +181,22 @@ namespace ix
|
|||||||
&throttled,
|
&throttled,
|
||||||
&receivedCount,
|
&receivedCount,
|
||||||
&fatalCobraError,
|
&fatalCobraError,
|
||||||
&queueManager](ix::CobraEventType eventType,
|
&queueManager](const CobraEventPtr& event)
|
||||||
const std::string& errMsg,
|
{
|
||||||
const ix::WebSocketHttpHeaders& headers,
|
if (event->type == ix::CobraEventType::Open)
|
||||||
const std::string& subscriptionId,
|
|
||||||
CobraConnection::MsgId msgId) {
|
|
||||||
if (eventType == ix::CobraEventType::Open)
|
|
||||||
{
|
{
|
||||||
spdlog::info("Subscriber connected");
|
spdlog::info("Subscriber connected");
|
||||||
|
|
||||||
for (auto it : headers)
|
for (auto&& it : event->headers)
|
||||||
{
|
{
|
||||||
spdlog::info("{}: {}", it.first, it.second);
|
spdlog::info("{}: {}", it.first, it.second);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (eventType == ix::CobraEventType::Closed)
|
else if (event->type == ix::CobraEventType::Closed)
|
||||||
{
|
{
|
||||||
spdlog::info("Subscriber closed");
|
spdlog::info("Subscriber closed: {}", event->errMsg);
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Authenticated)
|
else if (event->type == ix::CobraEventType::Authenticated)
|
||||||
{
|
{
|
||||||
spdlog::info("Subscriber authenticated");
|
spdlog::info("Subscriber authenticated");
|
||||||
conn.subscribe(channel,
|
conn.subscribe(channel,
|
||||||
@ -222,39 +219,39 @@ namespace ix
|
|||||||
queueManager.add(msg);
|
queueManager.add(msg);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Subscribed)
|
else if (event->type == ix::CobraEventType::Subscribed)
|
||||||
{
|
{
|
||||||
spdlog::info("Subscriber: subscribed to channel {}", subscriptionId);
|
spdlog::info("Subscriber: subscribed to channel {}", event->subscriptionId);
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::UnSubscribed)
|
else if (event->type == ix::CobraEventType::UnSubscribed)
|
||||||
{
|
{
|
||||||
spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId);
|
spdlog::info("Subscriber: unsubscribed from channel {}", event->subscriptionId);
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Error)
|
else if (event->type == ix::CobraEventType::Error)
|
||||||
{
|
{
|
||||||
spdlog::error("Subscriber: error {}", errMsg);
|
spdlog::error("Subscriber: error {}", event->errMsg);
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Published)
|
else if (event->type == ix::CobraEventType::Published)
|
||||||
{
|
{
|
||||||
spdlog::error("Published message hacked: {}", msgId);
|
spdlog::error("Published message hacked: {}", event->msgId);
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Pong)
|
else if (event->type == ix::CobraEventType::Pong)
|
||||||
{
|
{
|
||||||
spdlog::info("Received websocket pong");
|
spdlog::info("Received websocket pong");
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::HandshakeError)
|
else if (event->type == ix::CobraEventType::HandshakeError)
|
||||||
{
|
{
|
||||||
spdlog::error("Subscriber: Handshake error: {}", errMsg);
|
spdlog::error("Subscriber: Handshake error: {}", event->errMsg);
|
||||||
fatalCobraError = true;
|
fatalCobraError = true;
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::AuthenticationError)
|
else if (event->type == ix::CobraEventType::AuthenticationError)
|
||||||
{
|
{
|
||||||
spdlog::error("Subscriber: Authentication error: {}", errMsg);
|
spdlog::error("Subscriber: Authentication error: {}", event->errMsg);
|
||||||
fatalCobraError = true;
|
fatalCobraError = true;
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::SubscriptionError)
|
else if (event->type == ix::CobraEventType::SubscriptionError)
|
||||||
{
|
{
|
||||||
spdlog::error("Subscriber: Subscription error: {}", errMsg);
|
spdlog::error("Subscriber: Subscription error: {}", event->errMsg);
|
||||||
fatalCobraError = true;
|
fatalCobraError = true;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -201,26 +201,22 @@ namespace ix
|
|||||||
std::thread t3(statsdSender);
|
std::thread t3(statsdSender);
|
||||||
|
|
||||||
conn.setEventCallback(
|
conn.setEventCallback(
|
||||||
[&conn, &channel, &filter, &position, &jsonWriter, verbose, &queueManager, &receivedCount, &fatalCobraError](
|
[&conn, &channel, &filter, &position, &jsonWriter, verbose, &queueManager, &receivedCount, &fatalCobraError](const CobraEventPtr& event)
|
||||||
ix::CobraEventType eventType,
|
{
|
||||||
const std::string& errMsg,
|
if (event->type == ix::CobraEventType::Open)
|
||||||
const ix::WebSocketHttpHeaders& headers,
|
|
||||||
const std::string& subscriptionId,
|
|
||||||
CobraConnection::MsgId msgId) {
|
|
||||||
if (eventType == ix::CobraEventType::Open)
|
|
||||||
{
|
{
|
||||||
spdlog::info("Subscriber connected");
|
spdlog::info("Subscriber connected");
|
||||||
|
|
||||||
for (auto it : headers)
|
for (auto&& it : event->headers)
|
||||||
{
|
{
|
||||||
spdlog::info("{}: {}", it.first, it.second);
|
spdlog::info("{}: {}", it.first, it.second);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (eventType == ix::CobraEventType::Closed)
|
else if (event->type == ix::CobraEventType::Closed)
|
||||||
{
|
{
|
||||||
spdlog::info("Subscriber closed");
|
spdlog::info("Subscriber closed: {}", event->errMsg);
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Authenticated)
|
else if (event->type == ix::CobraEventType::Authenticated)
|
||||||
{
|
{
|
||||||
spdlog::info("Subscriber authenticated");
|
spdlog::info("Subscriber authenticated");
|
||||||
conn.subscribe(channel,
|
conn.subscribe(channel,
|
||||||
@ -239,39 +235,39 @@ namespace ix
|
|||||||
queueManager.add(msg);
|
queueManager.add(msg);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Subscribed)
|
else if (event->type == ix::CobraEventType::Subscribed)
|
||||||
{
|
{
|
||||||
spdlog::info("Subscriber: subscribed to channel {}", subscriptionId);
|
spdlog::info("Subscriber: subscribed to channel {}", event->subscriptionId);
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::UnSubscribed)
|
else if (event->type == ix::CobraEventType::UnSubscribed)
|
||||||
{
|
{
|
||||||
spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId);
|
spdlog::info("Subscriber: unsubscribed from channel {}", event->subscriptionId);
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Error)
|
else if (event->type == ix::CobraEventType::Error)
|
||||||
{
|
{
|
||||||
spdlog::error("Subscriber: error {}", errMsg);
|
spdlog::error("Subscriber: error {}", event->errMsg);
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Published)
|
else if (event->type == ix::CobraEventType::Published)
|
||||||
{
|
{
|
||||||
spdlog::error("Published message hacked: {}", msgId);
|
spdlog::error("Published message hacked: {}", event->msgId);
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Pong)
|
else if (event->type == ix::CobraEventType::Pong)
|
||||||
{
|
{
|
||||||
spdlog::info("Received websocket pong");
|
spdlog::info("Received websocket pong");
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::HandshakeError)
|
else if (event->type == ix::CobraEventType::HandshakeError)
|
||||||
{
|
{
|
||||||
spdlog::error("Subscriber: Handshake error: {}", errMsg);
|
spdlog::error("Subscriber: Handshake error: {}", event->errMsg);
|
||||||
fatalCobraError = true;
|
fatalCobraError = true;
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::AuthenticationError)
|
else if (event->type == ix::CobraEventType::AuthenticationError)
|
||||||
{
|
{
|
||||||
spdlog::error("Subscriber: Authentication error: {}", errMsg);
|
spdlog::error("Subscriber: Authentication error: {}", event->errMsg);
|
||||||
fatalCobraError = true;
|
fatalCobraError = true;
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::SubscriptionError)
|
else if (event->type == ix::CobraEventType::SubscriptionError)
|
||||||
{
|
{
|
||||||
spdlog::error("Subscriber: Subscription error: {}", errMsg);
|
spdlog::error("Subscriber: Subscription error: {}", event->errMsg);
|
||||||
fatalCobraError = true;
|
fatalCobraError = true;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -96,7 +96,12 @@ namespace ix
|
|||||||
std::lock_guard<std::mutex> lock(_eventCallbackMutex);
|
std::lock_guard<std::mutex> lock(_eventCallbackMutex);
|
||||||
if (_eventCallback)
|
if (_eventCallback)
|
||||||
{
|
{
|
||||||
_eventCallback(eventType, errorMsg, headers, subscriptionId, msgId);
|
_eventCallback(
|
||||||
|
std::make_unique<CobraEvent>(eventType,
|
||||||
|
errorMsg,
|
||||||
|
headers,
|
||||||
|
subscriptionId,
|
||||||
|
msgId));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -9,6 +9,7 @@
|
|||||||
#include <ixwebsocket/IXWebSocketHttpHeaders.h>
|
#include <ixwebsocket/IXWebSocketHttpHeaders.h>
|
||||||
#include <ixwebsocket/IXWebSocketPerMessageDeflateOptions.h>
|
#include <ixwebsocket/IXWebSocketPerMessageDeflateOptions.h>
|
||||||
#include "IXCobraEventType.h"
|
#include "IXCobraEventType.h"
|
||||||
|
#include "IXCobraEvent.h"
|
||||||
#include <json/json.h>
|
#include <json/json.h>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
@ -36,11 +37,7 @@ namespace ix
|
|||||||
};
|
};
|
||||||
|
|
||||||
using SubscriptionCallback = std::function<void(const Json::Value&, const std::string&)>;
|
using SubscriptionCallback = std::function<void(const Json::Value&, const std::string&)>;
|
||||||
using EventCallback = std::function<void(CobraEventType,
|
using EventCallback = std::function<void(const CobraEventPtr&)>;
|
||||||
const std::string&,
|
|
||||||
const WebSocketHttpHeaders&,
|
|
||||||
const std::string&,
|
|
||||||
uint64_t msgId)>;
|
|
||||||
|
|
||||||
using TrafficTrackerCallback = std::function<void(size_t size, bool incoming)>;
|
using TrafficTrackerCallback = std::function<void(size_t size, bool incoming)>;
|
||||||
using PublishTrackerCallback = std::function<void(bool sent, bool acked)>;
|
using PublishTrackerCallback = std::function<void(bool sent, bool acked)>;
|
||||||
|
41
ixcobra/ixcobra/IXCobraEvent.h
Normal file
41
ixcobra/ixcobra/IXCobraEvent.h
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
/*
|
||||||
|
* IXCobraEvent.h
|
||||||
|
* Author: Benjamin Sergeant
|
||||||
|
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include "IXCobraEventType.h"
|
||||||
|
#include <ixwebsocket/IXWebSocketHttpHeaders.h>
|
||||||
|
#include <memory>
|
||||||
|
#include <string>
|
||||||
|
#include <cstdint>
|
||||||
|
|
||||||
|
namespace ix
|
||||||
|
{
|
||||||
|
struct CobraEvent
|
||||||
|
{
|
||||||
|
ix::CobraEventType type;
|
||||||
|
const std::string& errMsg;
|
||||||
|
const ix::WebSocketHttpHeaders& headers;
|
||||||
|
const std::string& subscriptionId;
|
||||||
|
uint64_t msgId; // CobraConnection::MsgId
|
||||||
|
|
||||||
|
CobraEvent(ix::CobraEventType t,
|
||||||
|
const std::string& e,
|
||||||
|
const ix::WebSocketHttpHeaders& h,
|
||||||
|
const std::string& s,
|
||||||
|
uint64_t m)
|
||||||
|
: type(t)
|
||||||
|
, errMsg(e)
|
||||||
|
, headers(h)
|
||||||
|
, subscriptionId(s)
|
||||||
|
, msgId(m)
|
||||||
|
{
|
||||||
|
;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
using CobraEventPtr = std::unique_ptr<CobraEvent>;
|
||||||
|
}
|
@ -22,53 +22,59 @@ namespace ix
|
|||||||
CobraMetricsThreadedPublisher::CobraMetricsThreadedPublisher() :
|
CobraMetricsThreadedPublisher::CobraMetricsThreadedPublisher() :
|
||||||
_stop(false)
|
_stop(false)
|
||||||
{
|
{
|
||||||
_cobra_connection.setEventCallback(
|
_cobra_connection.setEventCallback([](const CobraEventPtr& event)
|
||||||
[]
|
|
||||||
(ix::CobraEventType eventType,
|
|
||||||
const std::string& errMsg,
|
|
||||||
const ix::WebSocketHttpHeaders& headers,
|
|
||||||
const std::string& subscriptionId,
|
|
||||||
CobraConnection::MsgId msgId)
|
|
||||||
{
|
{
|
||||||
std::stringstream ss;
|
std::stringstream ss;
|
||||||
|
|
||||||
if (eventType == ix::CobraEventType::Open)
|
if (event->type == ix::CobraEventType::Open)
|
||||||
{
|
{
|
||||||
ss << "Handshake headers" << std::endl;
|
ss << "Handshake headers" << std::endl;
|
||||||
|
|
||||||
for (auto it : headers)
|
for (auto&& it : event->headers)
|
||||||
{
|
{
|
||||||
ss << it.first << ": " << it.second << std::endl;
|
ss << it.first << ": " << it.second << std::endl;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Authenticated)
|
else if (event->type == ix::CobraEventType::Authenticated)
|
||||||
{
|
{
|
||||||
ss << "Authenticated";
|
ss << "Authenticated";
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Error)
|
else if (event->type == ix::CobraEventType::Error)
|
||||||
{
|
{
|
||||||
ss << "Error: " << errMsg;
|
ss << "Error: " << event->errMsg;
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Closed)
|
else if (event->type == ix::CobraEventType::Closed)
|
||||||
{
|
{
|
||||||
ss << "Connection closed: " << errMsg;
|
ss << "Connection closed: " << event->errMsg;
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Subscribed)
|
else if (event->type == ix::CobraEventType::Subscribed)
|
||||||
{
|
{
|
||||||
ss << "Subscribed through subscription id: " << subscriptionId;
|
ss << "Subscribed through subscription id: " << event->subscriptionId;
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::UnSubscribed)
|
else if (event->type == ix::CobraEventType::UnSubscribed)
|
||||||
{
|
{
|
||||||
ss << "Unsubscribed through subscription id: " << subscriptionId;
|
ss << "Unsubscribed through subscription id: " << event->subscriptionId;
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Published)
|
else if (event->type == ix::CobraEventType::Published)
|
||||||
{
|
{
|
||||||
ss << "Published message " << msgId << " acked";
|
ss << "Published message " << event->msgId << " acked";
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Pong)
|
else if (event->type == ix::CobraEventType::Pong)
|
||||||
{
|
{
|
||||||
ss << "Received websocket pong";
|
ss << "Received websocket pong";
|
||||||
}
|
}
|
||||||
|
else if (event->type == ix::CobraEventType::HandshakeError)
|
||||||
|
{
|
||||||
|
ss << "Handshake error: " << event->errMsg;
|
||||||
|
}
|
||||||
|
else if (event->type == ix::CobraEventType::AuthenticationError)
|
||||||
|
{
|
||||||
|
ss << "Authentication error: " << event->errMsg;
|
||||||
|
}
|
||||||
|
else if (event->type == ix::CobraEventType::SubscriptionError)
|
||||||
|
{
|
||||||
|
ss << "Subscription error: " << event->errMsg;
|
||||||
|
}
|
||||||
|
|
||||||
ix::IXCoreLogger::Log(ss.str().c_str());
|
ix::IXCoreLogger::Log(ss.str().c_str());
|
||||||
});
|
});
|
||||||
|
@ -6,4 +6,4 @@
|
|||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#define IX_WEBSOCKET_VERSION "9.2.8"
|
#define IX_WEBSOCKET_VERSION "9.2.9"
|
||||||
|
@ -180,44 +180,41 @@ namespace
|
|||||||
_conn.configure(_cobraConfig);
|
_conn.configure(_cobraConfig);
|
||||||
_conn.connect();
|
_conn.connect();
|
||||||
|
|
||||||
_conn.setEventCallback([this, channel](ix::CobraEventType eventType,
|
_conn.setEventCallback([this, channel](const CobraEventPtr& event)
|
||||||
const std::string& errMsg,
|
{
|
||||||
const ix::WebSocketHttpHeaders& headers,
|
if (event->type == ix::CobraEventType::Open)
|
||||||
const std::string& subscriptionId,
|
|
||||||
CobraConnection::MsgId msgId) {
|
|
||||||
if (eventType == ix::CobraEventType::Open)
|
|
||||||
{
|
{
|
||||||
log("Subscriber connected: " + _user);
|
log("Subscriber connected: " + _user);
|
||||||
for (auto&& it : headers)
|
for (auto&& it : event->headers)
|
||||||
{
|
{
|
||||||
log("Headers " + it.first + " " + it.second);
|
log("Headers " + it.first + " " + it.second);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Authenticated)
|
else if (event->type == ix::CobraEventType::Authenticated)
|
||||||
{
|
{
|
||||||
log("Subscriber authenticated: " + _user);
|
log("Subscriber authenticated: " + _user);
|
||||||
subscribe(channel);
|
subscribe(channel);
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Error)
|
else if (event->type == ix::CobraEventType::Error)
|
||||||
{
|
{
|
||||||
log(errMsg + _user);
|
log(event->errMsg + _user);
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Closed)
|
else if (event->type == ix::CobraEventType::Closed)
|
||||||
{
|
{
|
||||||
log("Connection closed: " + _user);
|
log("Connection closed: " + _user);
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Subscribed)
|
else if (event->type == ix::CobraEventType::Subscribed)
|
||||||
{
|
{
|
||||||
log("Subscription ok: " + _user + " subscription_id " + subscriptionId);
|
log("Subscription ok: " + _user + " subscription_id " + event->subscriptionId);
|
||||||
_connectedAndSubscribed = true;
|
_connectedAndSubscribed = true;
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::UnSubscribed)
|
else if (event->type == ix::CobraEventType::UnSubscribed)
|
||||||
{
|
{
|
||||||
log("Unsubscription ok: " + _user + " subscription_id " + subscriptionId);
|
log("Unsubscription ok: " + _user + " subscription_id " + event->subscriptionId);
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Published)
|
else if (event->type == ix::CobraEventType::Published)
|
||||||
{
|
{
|
||||||
TLogger() << "Subscriber: published message acked: " << msgId;
|
TLogger() << "Subscriber: published message acked: " << event->msgId;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -248,11 +245,7 @@ namespace
|
|||||||
ix::msleep(50);
|
ix::msleep(50);
|
||||||
_conn.disconnect();
|
_conn.disconnect();
|
||||||
|
|
||||||
_conn.setEventCallback([](ix::CobraEventType /*eventType*/,
|
_conn.setEventCallback([](const CobraEventPtr& /*event*/) {});
|
||||||
const std::string& /*errMsg*/,
|
|
||||||
const ix::WebSocketHttpHeaders& /*headers*/,
|
|
||||||
const std::string& /*subscriptionId*/,
|
|
||||||
CobraConnection::MsgId /*msgId*/) { ; });
|
|
||||||
}
|
}
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
|
@ -54,24 +54,25 @@ namespace
|
|||||||
conn.configure(config);
|
conn.configure(config);
|
||||||
conn.connect();
|
conn.connect();
|
||||||
|
|
||||||
conn.setEventCallback([&conn, &channel](ix::CobraEventType eventType,
|
conn.setEventCallback([&conn, &channel](const CobraEventPtr& event)
|
||||||
const std::string& errMsg,
|
{
|
||||||
const ix::WebSocketHttpHeaders& headers,
|
if (event->type == ix::CobraEventType::Open)
|
||||||
const std::string& subscriptionId,
|
|
||||||
CobraConnection::MsgId msgId) {
|
|
||||||
if (eventType == ix::CobraEventType::Open)
|
|
||||||
{
|
{
|
||||||
TLogger() << "Subscriber connected:";
|
TLogger() << "Subscriber connected:";
|
||||||
for (auto&& it : headers)
|
for (auto&& it : event->headers)
|
||||||
{
|
{
|
||||||
log("Headers " + it.first + " " + it.second);
|
log("Headers " + it.first + " " + it.second);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (eventType == ix::CobraEventType::Error)
|
else if (event->type == ix::CobraEventType::Closed)
|
||||||
{
|
{
|
||||||
TLogger() << "Subscriber error:" << errMsg;
|
TLogger() << "Subscriber closed:" << event->errMsg;
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Authenticated)
|
else if (event->type == ix::CobraEventType::Error)
|
||||||
|
{
|
||||||
|
TLogger() << "Subscriber error:" << event->errMsg;
|
||||||
|
}
|
||||||
|
else if (event->type == ix::CobraEventType::Authenticated)
|
||||||
{
|
{
|
||||||
log("Subscriber authenticated");
|
log("Subscriber authenticated");
|
||||||
std::string filter;
|
std::string filter;
|
||||||
@ -92,29 +93,29 @@ namespace
|
|||||||
gMessageCount++;
|
gMessageCount++;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Subscribed)
|
else if (event->type == ix::CobraEventType::Subscribed)
|
||||||
{
|
{
|
||||||
TLogger() << "Subscriber: subscribed to channel " << subscriptionId;
|
TLogger() << "Subscriber: subscribed to channel " << event->subscriptionId;
|
||||||
if (subscriptionId == channel)
|
if (event->subscriptionId == channel)
|
||||||
{
|
{
|
||||||
gSubscriberConnectedAndSubscribed = true;
|
gSubscriberConnectedAndSubscribed = true;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
TLogger() << "Subscriber: unexpected channel " << subscriptionId;
|
TLogger() << "Subscriber: unexpected channel " << event->subscriptionId;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::UnSubscribed)
|
else if (event->type == ix::CobraEventType::UnSubscribed)
|
||||||
{
|
{
|
||||||
TLogger() << "Subscriber: ununexpected from channel " << subscriptionId;
|
TLogger() << "Subscriber: ununexpected from channel " << event->subscriptionId;
|
||||||
if (subscriptionId != channel)
|
if (event->subscriptionId != channel)
|
||||||
{
|
{
|
||||||
TLogger() << "Subscriber: unexpected channel " << subscriptionId;
|
TLogger() << "Subscriber: unexpected channel " << event->subscriptionId;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Published)
|
else if (event->type == ix::CobraEventType::Published)
|
||||||
{
|
{
|
||||||
TLogger() << "Subscriber: published message acked: " << msgId;
|
TLogger() << "Subscriber: published message acked: " << event->msgId;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -106,21 +106,18 @@ namespace ix
|
|||||||
&msgPerSeconds,
|
&msgPerSeconds,
|
||||||
&conditionVariableMutex,
|
&conditionVariableMutex,
|
||||||
&condition,
|
&condition,
|
||||||
&queue](ix::CobraEventType eventType,
|
&queue](const CobraEventPtr& event)
|
||||||
const std::string& errMsg,
|
{
|
||||||
const ix::WebSocketHttpHeaders& headers,
|
if (event->type == ix::CobraEventType::Open)
|
||||||
const std::string& subscriptionId,
|
|
||||||
CobraConnection::MsgId msgId) {
|
|
||||||
if (eventType == ix::CobraEventType::Open)
|
|
||||||
{
|
{
|
||||||
spdlog::info("Subscriber connected");
|
spdlog::info("Subscriber connected");
|
||||||
|
|
||||||
for (auto it : headers)
|
for (auto&& it : event->headers)
|
||||||
{
|
{
|
||||||
spdlog::info("{}: {}", it.first, it.second);
|
spdlog::info("{}: {}", it.first, it.second);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Authenticated)
|
else if (event->type == ix::CobraEventType::Authenticated)
|
||||||
{
|
{
|
||||||
spdlog::info("Subscriber authenticated");
|
spdlog::info("Subscriber authenticated");
|
||||||
|
|
||||||
@ -141,21 +138,21 @@ namespace ix
|
|||||||
msgCount++;
|
msgCount++;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Subscribed)
|
else if (event->type == ix::CobraEventType::Subscribed)
|
||||||
{
|
{
|
||||||
spdlog::info("Subscriber: subscribed to channel {}", subscriptionId);
|
spdlog::info("Subscriber: subscribed to channel {}", event->subscriptionId);
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::UnSubscribed)
|
else if (event->type == ix::CobraEventType::UnSubscribed)
|
||||||
{
|
{
|
||||||
spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId);
|
spdlog::info("Subscriber: unsubscribed from channel {}", event->subscriptionId);
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Error)
|
else if (event->type == ix::CobraEventType::Error)
|
||||||
{
|
{
|
||||||
spdlog::error("Subscriber: error {}", errMsg);
|
spdlog::error("Subscriber: error {}", event->errMsg);
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Published)
|
else if (event->type == ix::CobraEventType::Published)
|
||||||
{
|
{
|
||||||
spdlog::error("Published message hacked: {}", msgId);
|
spdlog::error("Published message hacked: {}", event->msgId);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -38,22 +38,22 @@ namespace ix
|
|||||||
std::atomic<bool> authenticated(false);
|
std::atomic<bool> authenticated(false);
|
||||||
std::atomic<bool> messageAcked(false);
|
std::atomic<bool> messageAcked(false);
|
||||||
|
|
||||||
conn.setEventCallback([&conn, &channel, &data, &authenticated, &messageAcked](
|
conn.setEventCallback([&conn, &channel, &data, &authenticated, &messageAcked](const CobraEventPtr& event)
|
||||||
ix::CobraEventType eventType,
|
{
|
||||||
const std::string& errMsg,
|
if (event->type == ix::CobraEventType::Open)
|
||||||
const ix::WebSocketHttpHeaders& headers,
|
|
||||||
const std::string& subscriptionId,
|
|
||||||
CobraConnection::MsgId msgId) {
|
|
||||||
if (eventType == ix::CobraEventType::Open)
|
|
||||||
{
|
{
|
||||||
spdlog::info("Publisher connected");
|
spdlog::info("Publisher connected");
|
||||||
|
|
||||||
for (auto it : headers)
|
for (auto&& it : event->headers)
|
||||||
{
|
{
|
||||||
spdlog::info("{}: {}", it.first, it.second);
|
spdlog::info("{}: {}", it.first, it.second);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Authenticated)
|
else if (event->type == ix::CobraEventType::Closed)
|
||||||
|
{
|
||||||
|
spdlog::info("Subscriber closed: {}", event->errMsg);
|
||||||
|
}
|
||||||
|
else if (event->type == ix::CobraEventType::Authenticated)
|
||||||
{
|
{
|
||||||
spdlog::info("Publisher authenticated");
|
spdlog::info("Publisher authenticated");
|
||||||
authenticated = true;
|
authenticated = true;
|
||||||
@ -64,27 +64,35 @@ namespace ix
|
|||||||
|
|
||||||
spdlog::info("Published msg {}", msgId);
|
spdlog::info("Published msg {}", msgId);
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Subscribed)
|
else if (event->type == ix::CobraEventType::Subscribed)
|
||||||
{
|
{
|
||||||
spdlog::info("Publisher: subscribed to channel {}", subscriptionId);
|
spdlog::info("Publisher: subscribed to channel {}", event->subscriptionId);
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::UnSubscribed)
|
else if (event->type == ix::CobraEventType::UnSubscribed)
|
||||||
{
|
{
|
||||||
spdlog::info("Publisher: unsubscribed from channel {}", subscriptionId);
|
spdlog::info("Publisher: unsubscribed from channel {}", event->subscriptionId);
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Error)
|
else if (event->type == ix::CobraEventType::Error)
|
||||||
{
|
{
|
||||||
spdlog::error("Publisher: error {}", errMsg);
|
spdlog::error("Publisher: error {}", event->errMsg);
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Published)
|
else if (event->type == ix::CobraEventType::Published)
|
||||||
{
|
{
|
||||||
spdlog::info("Published message id {} acked", msgId);
|
spdlog::info("Published message id {} acked", event->msgId);
|
||||||
messageAcked = true;
|
messageAcked = true;
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Pong)
|
else if (event->type == ix::CobraEventType::Pong)
|
||||||
{
|
{
|
||||||
spdlog::info("Received websocket pong");
|
spdlog::info("Received websocket pong");
|
||||||
}
|
}
|
||||||
|
else if (event->type == ix::CobraEventType::HandshakeError)
|
||||||
|
{
|
||||||
|
spdlog::error("Subscriber: Handshake error: {}", event->errMsg);
|
||||||
|
}
|
||||||
|
else if (event->type == ix::CobraEventType::AuthenticationError)
|
||||||
|
{
|
||||||
|
spdlog::error("Subscriber: Authentication error: {}", event->errMsg);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
conn.connect();
|
conn.connect();
|
||||||
|
@ -106,21 +106,22 @@ namespace ix
|
|||||||
&msgPerSeconds,
|
&msgPerSeconds,
|
||||||
&quiet,
|
&quiet,
|
||||||
&fluentd,
|
&fluentd,
|
||||||
&fatalCobraError](ix::CobraEventType eventType,
|
&fatalCobraError](const CobraEventPtr& event)
|
||||||
const std::string& errMsg,
|
{
|
||||||
const ix::WebSocketHttpHeaders& headers,
|
if (event->type == ix::CobraEventType::Open)
|
||||||
const std::string& subscriptionId,
|
|
||||||
CobraConnection::MsgId msgId) {
|
|
||||||
if (eventType == ix::CobraEventType::Open)
|
|
||||||
{
|
{
|
||||||
spdlog::info("Subscriber connected");
|
spdlog::info("Subscriber connected");
|
||||||
|
|
||||||
for (auto it : headers)
|
for (auto&& it : event->headers)
|
||||||
{
|
{
|
||||||
spdlog::info("{}: {}", it.first, it.second);
|
spdlog::info("{}: {}", it.first, it.second);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Authenticated)
|
else if (event->type == ix::CobraEventType::Closed)
|
||||||
|
{
|
||||||
|
spdlog::info("Subscriber closed: {}", event->errMsg);
|
||||||
|
}
|
||||||
|
else if (event->type == ix::CobraEventType::Authenticated)
|
||||||
{
|
{
|
||||||
spdlog::info("Subscriber authenticated");
|
spdlog::info("Subscriber authenticated");
|
||||||
spdlog::info("Subscribing to {} at position {}", channel, subscriptionPosition);
|
spdlog::info("Subscribing to {} at position {}", channel, subscriptionPosition);
|
||||||
@ -145,39 +146,39 @@ namespace ix
|
|||||||
subscriptionPosition = position;
|
subscriptionPosition = position;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Subscribed)
|
else if (event->type == ix::CobraEventType::Subscribed)
|
||||||
{
|
{
|
||||||
spdlog::info("Subscriber: subscribed to channel {}", subscriptionId);
|
spdlog::info("Subscriber: subscribed to channel {}", event->subscriptionId);
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::UnSubscribed)
|
else if (event->type == ix::CobraEventType::UnSubscribed)
|
||||||
{
|
{
|
||||||
spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId);
|
spdlog::info("Subscriber: unsubscribed from channel {}", event->subscriptionId);
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Error)
|
else if (event->type == ix::CobraEventType::Error)
|
||||||
{
|
{
|
||||||
spdlog::error("Subscriber: error {}", errMsg);
|
spdlog::error("Subscriber: error {}", event->errMsg);
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Published)
|
else if (event->type == ix::CobraEventType::Published)
|
||||||
{
|
{
|
||||||
spdlog::error("Published message hacked: {}", msgId);
|
spdlog::error("Published message hacked: {}", event->msgId);
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::Pong)
|
else if (event->type == ix::CobraEventType::Pong)
|
||||||
{
|
{
|
||||||
spdlog::info("Received websocket pong");
|
spdlog::info("Received websocket pong");
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::HandshakeError)
|
else if (event->type == ix::CobraEventType::HandshakeError)
|
||||||
{
|
{
|
||||||
spdlog::error("Subscriber: Handshake error: {}", errMsg);
|
spdlog::error("Subscriber: Handshake error: {}", event->errMsg);
|
||||||
fatalCobraError = true;
|
fatalCobraError = true;
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::AuthenticationError)
|
else if (event->type == ix::CobraEventType::AuthenticationError)
|
||||||
{
|
{
|
||||||
spdlog::error("Subscriber: Authentication error: {}", errMsg);
|
spdlog::error("Subscriber: Authentication error: {}", event->errMsg);
|
||||||
fatalCobraError = true;
|
fatalCobraError = true;
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraEventType::SubscriptionError)
|
else if (event->type == ix::CobraEventType::SubscriptionError)
|
||||||
{
|
{
|
||||||
spdlog::error("Subscriber: Subscription error: {}", errMsg);
|
spdlog::error("Subscriber: Subscription error: {}", event->errMsg);
|
||||||
fatalCobraError = true;
|
fatalCobraError = true;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
Loading…
x
Reference in New Issue
Block a user