+add utf-8 validation code, not hooked up properly yet

+ws autobahn / Add code to test websocket client compliance with the autobahn test-suite
+Ping received with a payload too large (> 125 bytes) trigger a connection closure
+cobra / add tracking about published messages
+cobra / publish returns a message id, that can be used when
+cobra / new message type in the message received handler when publish/ok is received (can be used to implement an ack system).
This commit is contained in:
Benjamin Sergeant 2019-08-31 16:46:44 -07:00
parent 6b2cdb6b54
commit 10dbe2d44d
16 changed files with 375 additions and 21 deletions

View File

@ -1 +1 @@
5.0.7 5.1.0

View File

@ -1,8 +1,18 @@
# Changelog # Changelog
All notable changes to this project will be documented in this file. All notable changes to this project will be documented in this file.
## [5.1.0] - 2019-08-31
add utf-8 validation code, not hooked up properly yet
ws autobahn / Add code to test websocket client compliance with the autobahn test-suite
Ping received with a payload too large (> 125 bytes) trigger a connection closure
cobra / add tracking about published messages
cobra / publish returns a message id, that can be used when
cobra / new message type in the message received handler when publish/ok is received (can be used to implement an ack system).
## [5.0.9] - 2019-08-30 ## [5.0.9] - 2019-08-30
User-Agent header is set when not specified.
New option to cap the max wait between reconnection attempts. Still default to 10s. (setMaxWaitBetweenReconnectionRetries). New option to cap the max wait between reconnection attempts. Still default to 10s. (setMaxWaitBetweenReconnectionRetries).
``` ```

View File

@ -12,6 +12,63 @@
#include <cmath> #include <cmath>
#include <cassert> #include <cassert>
namespace
{
//
// Stolen from here http://www.zedwood.com/article/cpp-is-valid-utf8-string-function
// There doesn't seem to be anything in the C++ library so far to do that.
// The closest thing is code for converting from utf-8 to utf-16 or utf-32 but
// that isn't working well for some broken input strings.
//
bool isValidUtf8(const std::string& str)
{
size_t i = 0;
size_t ix = str.length();
int c, n, j;
for (; i < ix; i++)
{
c = (unsigned char) str[i];
//if (c==0x09 || c==0x0a || c==0x0d || (0x20 <= c && c <= 0x7e) ) n = 0; // is_printable_ascii
if (0x00 <= c && c <= 0x7f)
{
n = 0; // 0bbbbbbb
}
else if ((c & 0xE0) == 0xC0)
{
n = 1; // 110bbbbb
}
else if ( c==0xed && i<(ix-1) && ((unsigned char)str[i+1] & 0xa0)==0xa0)
{
return false; //U+d800 to U+dfff
}
else if ((c & 0xF0) == 0xE0)
{
n = 2; // 1110bbbb
}
else if ((c & 0xF8) == 0xF0)
{
n = 3; // 11110bbb
}
//else if (($c & 0xFC) == 0xF8) n=4; // 111110bb //byte 5, unnecessary in 4 byte UTF-8
//else if (($c & 0xFE) == 0xFC) n=5; // 1111110b //byte 6, unnecessary in 4 byte UTF-8
else
{
return false;
}
for (j=0; j<n && i<ix; j++)
{ // n bytes matching 10bbbbbb follow ?
if ((++i == ix) || (( (unsigned char)str[i] & 0xC0) != 0x80))
{
return false;
}
}
}
return true;
}
}
namespace ix namespace ix
{ {
OnTrafficTrackerCallback WebSocket::_onTrafficTrackerCallback = nullptr; OnTrafficTrackerCallback WebSocket::_onTrafficTrackerCallback = nullptr;
@ -404,6 +461,11 @@ namespace ix
WebSocketSendInfo WebSocket::sendText(const std::string& text, WebSocketSendInfo WebSocket::sendText(const std::string& text,
const OnProgressCallback& onProgressCallback) const OnProgressCallback& onProgressCallback)
{ {
if (!isValidUtf8(text))
{
stop();
return false;
}
return sendMessage(text, SendMessageKind::Text, onProgressCallback); return sendMessage(text, SendMessageKind::Text, onProgressCallback);
} }

View File

@ -590,6 +590,17 @@ namespace ix
std::string pingData(_rxbuf.begin()+ws.header_size, std::string pingData(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size + (size_t) ws.N); _rxbuf.begin()+ws.header_size + (size_t) ws.N);
// too large
if (pingData.size() > 125)
{
std::string reason("reason control frame with payload length > 125 octets");
// Unexpected frame type
close(1002,
reason,
reason.size());
return;
}
if (_enablePong) if (_enablePong)
{ {
// Reply back right away // Reply back right away

View File

@ -6,4 +6,4 @@
#pragma once #pragma once
#define IX_WEBSOCKET_VERSION "5.0.9" #define IX_WEBSOCKET_VERSION "5.1.0"

View File

@ -70,6 +70,7 @@ add_executable(ws
ws_cobra_to_sentry.cpp ws_cobra_to_sentry.cpp
ws_snake.cpp ws_snake.cpp
ws_httpd.cpp ws_httpd.cpp
ws_autobahn.cpp
ws.cpp) ws.cpp)
target_link_libraries(ws ixwebsocket) target_link_libraries(ws ixwebsocket)

View File

@ -18,6 +18,7 @@
namespace ix namespace ix
{ {
TrafficTrackerCallback CobraConnection::_trafficTrackerCallback = nullptr; TrafficTrackerCallback CobraConnection::_trafficTrackerCallback = nullptr;
PublishTrackerCallback CobraConnection::_publishTrackerCallback = nullptr;
constexpr size_t CobraConnection::kQueueMaxSize; constexpr size_t CobraConnection::kQueueMaxSize;
CobraConnection::CobraConnection() : CobraConnection::CobraConnection() :
@ -56,6 +57,24 @@ namespace ix
} }
} }
void CobraConnection::setPublishTrackerCallback(const PublishTrackerCallback& callback)
{
_publishTrackerCallback = callback;
}
void CobraConnection::resetPublishTrackerCallback()
{
setPublishTrackerCallback(nullptr);
}
void CobraConnection::invokePublishTrackerCallback(bool sent, bool acked)
{
if (_publishTrackerCallback)
{
_publishTrackerCallback(sent, acked);
}
}
void CobraConnection::setEventCallback(const EventCallback& eventCallback) void CobraConnection::setEventCallback(const EventCallback& eventCallback)
{ {
std::lock_guard<std::mutex> lock(_eventCallbackMutex); std::lock_guard<std::mutex> lock(_eventCallbackMutex);
@ -65,12 +84,13 @@ namespace ix
void CobraConnection::invokeEventCallback(ix::CobraConnectionEventType eventType, void CobraConnection::invokeEventCallback(ix::CobraConnectionEventType eventType,
const std::string& errorMsg, const std::string& errorMsg,
const WebSocketHttpHeaders& headers, const WebSocketHttpHeaders& headers,
const std::string& subscriptionId) const std::string& subscriptionId,
CobraConnection::MsgId msgId)
{ {
std::lock_guard<std::mutex> lock(_eventCallbackMutex); std::lock_guard<std::mutex> lock(_eventCallbackMutex);
if (_eventCallback) if (_eventCallback)
{ {
_eventCallback(eventType, errorMsg, headers, subscriptionId); _eventCallback(eventType, errorMsg, headers, subscriptionId, msgId);
} }
} }
@ -178,6 +198,17 @@ namespace ix
{ {
invokeErrorCallback("Unsubscription error", msg->str); invokeErrorCallback("Unsubscription error", msg->str);
} }
else if (action == "rtm/publish/ok")
{
if (!handlePublishResponse(data))
{
invokeErrorCallback("Error processing publish response", msg->str);
}
}
else if (action == "rtm/publish/error")
{
invokeErrorCallback("Publish error", msg->str);
}
else else
{ {
invokeErrorCallback("Un-handled message type", msg->str); invokeErrorCallback("Un-handled message type", msg->str);
@ -374,6 +405,24 @@ namespace ix
return true; return true;
} }
bool CobraConnection::handlePublishResponse(const Json::Value& pdu)
{
if (!pdu.isMember("id")) return false;
Json::Value id = pdu["id"];
if (!id.isUInt64()) return false;
uint64_t msgId = id.asUInt64();
invokeEventCallback(ix::CobraConnection_EventType_Published,
std::string(), WebSocketHttpHeaders(),
std::string(), msgId);
invokePublishTrackerCallback(false, true);
return true;
}
bool CobraConnection::connect() bool CobraConnection::connect()
{ {
_webSocket->start(); _webSocket->start();
@ -399,9 +448,11 @@ namespace ix
// //
// publish is not thread safe as we are trying to reuse some Json objects. // publish is not thread safe as we are trying to reuse some Json objects.
// //
bool CobraConnection::publish(const Json::Value& channels, CobraConnection::MsgId CobraConnection::publish(const Json::Value& channels,
const Json::Value& msg) const Json::Value& msg)
{ {
invokePublishTrackerCallback(true, false);
_body["channels"] = channels; _body["channels"] = channels;
_body["message"] = msg; _body["message"] = msg;
_pdu["body"] = _body; _pdu["body"] = _body;
@ -412,7 +463,7 @@ namespace ix
if (_publishMode == CobraConnection_PublishMode_Batch) if (_publishMode == CobraConnection_PublishMode_Batch)
{ {
enqueue(serializedJson); enqueue(serializedJson);
return true; return _id - 1;
} }
// //
@ -421,14 +472,14 @@ namespace ix
// //
if (_authenticated && publishMessage(serializedJson)) if (_authenticated && publishMessage(serializedJson))
{ {
return true; return _id - 1;
} }
else // Or else we enqueue else // Or else we enqueue
// Slow code path is when we haven't connected yet (startup), // Slow code path is when we haven't connected yet (startup),
// or when the connection drops for some reason. // or when the connection drops for some reason.
{ {
enqueue(serializedJson); enqueue(serializedJson);
return false; return 0;
} }
} }

View File

@ -15,6 +15,7 @@
#include <string> #include <string>
#include <thread> #include <thread>
#include <unordered_map> #include <unordered_map>
#include <limits>
namespace ix namespace ix
{ {
@ -27,7 +28,8 @@ namespace ix
CobraConnection_EventType_Open = 2, CobraConnection_EventType_Open = 2,
CobraConnection_EventType_Closed = 3, CobraConnection_EventType_Closed = 3,
CobraConnection_EventType_Subscribed = 4, CobraConnection_EventType_Subscribed = 4,
CobraConnection_EventType_UnSubscribed = 5 CobraConnection_EventType_UnSubscribed = 5,
CobraConnection_EventType_Published = 6
}; };
enum CobraConnectionPublishMode enum CobraConnectionPublishMode
@ -40,12 +42,17 @@ namespace ix
using EventCallback = std::function<void(CobraConnectionEventType, using EventCallback = std::function<void(CobraConnectionEventType,
const std::string&, const std::string&,
const WebSocketHttpHeaders&, const WebSocketHttpHeaders&,
const std::string&)>; const std::string&,
uint64_t msgId)>;
using TrafficTrackerCallback = std::function<void(size_t size, bool incoming)>; using TrafficTrackerCallback = std::function<void(size_t size, bool incoming)>;
using PublishTrackerCallback = std::function<void(bool sent, bool acked)>;
class CobraConnection class CobraConnection
{ {
public: public:
using MsgId = uint64_t;
CobraConnection(); CobraConnection();
~CobraConnection(); ~CobraConnection();
@ -57,11 +64,18 @@ namespace ix
const std::string& rolesecret, const std::string& rolesecret,
const WebSocketPerMessageDeflateOptions& webSocketPerMessageDeflateOptions); const WebSocketPerMessageDeflateOptions& webSocketPerMessageDeflateOptions);
/// Set the traffic tracker callback
static void setTrafficTrackerCallback(const TrafficTrackerCallback& callback); static void setTrafficTrackerCallback(const TrafficTrackerCallback& callback);
/// Reset the traffic tracker callback to an no-op one. /// Reset the traffic tracker callback to an no-op one.
static void resetTrafficTrackerCallback(); static void resetTrafficTrackerCallback();
/// Set the publish tracker callback
static void setPublishTrackerCallback(const PublishTrackerCallback& callback);
/// Reset the publish tracker callback to an no-op one.
static void resetPublishTrackerCallback();
/// Set the closed callback /// Set the closed callback
void setEventCallback(const EventCallback& eventCallback); void setEventCallback(const EventCallback& eventCallback);
@ -71,7 +85,7 @@ namespace ix
/// Publish a message to a channel /// Publish a message to a channel
/// ///
/// No-op if the connection is not established /// No-op if the connection is not established
bool publish(const Json::Value& channels, const Json::Value& msg); MsgId publish(const Json::Value& channels, const Json::Value& msg);
// Subscribe to a channel, and execute a callback when an incoming // Subscribe to a channel, and execute a callback when an incoming
// message arrives. // message arrives.
@ -111,6 +125,7 @@ namespace ix
bool handleSubscriptionData(const Json::Value& pdu); bool handleSubscriptionData(const Json::Value& pdu);
bool handleSubscriptionResponse(const Json::Value& pdu); bool handleSubscriptionResponse(const Json::Value& pdu);
bool handleUnsubscriptionResponse(const Json::Value& pdu); bool handleUnsubscriptionResponse(const Json::Value& pdu);
bool handlePublishResponse(const Json::Value& pdu);
void initWebSocketOnMessageCallback(); void initWebSocketOnMessageCallback();
@ -121,11 +136,15 @@ namespace ix
/// Invoke the traffic tracker callback /// Invoke the traffic tracker callback
static void invokeTrafficTrackerCallback(size_t size, bool incoming); static void invokeTrafficTrackerCallback(size_t size, bool incoming);
/// Invoke the publish tracker callback
static void invokePublishTrackerCallback(bool sent, bool acked);
/// Invoke event callbacks /// Invoke event callbacks
void invokeEventCallback(CobraConnectionEventType eventType, void invokeEventCallback(CobraConnectionEventType eventType,
const std::string& errorMsg = std::string(), const std::string& errorMsg = std::string(),
const WebSocketHttpHeaders& headers = WebSocketHttpHeaders(), const WebSocketHttpHeaders& headers = WebSocketHttpHeaders(),
const std::string& subscriptionId = std::string()); const std::string& subscriptionId = std::string(),
uint64_t msgId = std::numeric_limits<uint64_t>::max());
void invokeErrorCallback(const std::string& errorMsg, const std::string& serializedPdu); void invokeErrorCallback(const std::string& errorMsg, const std::string& serializedPdu);
/// ///
@ -150,6 +169,9 @@ namespace ix
/// Traffic tracker callback /// Traffic tracker callback
static TrafficTrackerCallback _trafficTrackerCallback; static TrafficTrackerCallback _trafficTrackerCallback;
/// Publish tracker callback
static PublishTrackerCallback _publishTrackerCallback;
/// Cobra events callbacks /// Cobra events callbacks
EventCallback _eventCallback; EventCallback _eventCallback;
mutable std::mutex _eventCallbackMutex; mutable std::mutex _eventCallbackMutex;

View File

@ -25,7 +25,8 @@ namespace ix
(ix::CobraConnectionEventType eventType, (ix::CobraConnectionEventType eventType,
const std::string& errMsg, const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers, const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId) const std::string& subscriptionId,
CobraConnection::MsgId msgId)
{ {
std::stringstream ss; std::stringstream ss;
@ -58,6 +59,10 @@ namespace ix
{ {
ss << "Unsubscribed through subscription id: " << subscriptionId; ss << "Unsubscribed through subscription id: " << subscriptionId;
} }
else if (eventType == ix::CobraConnection_EventType_Published)
{
ss << "Published message " << msgId << " acked";
}
ix::IXCoreLogger::Log(ss.str().c_str()); ix::IXCoreLogger::Log(ss.str().c_str());
}); });

View File

@ -226,6 +226,9 @@ int main(int argc, char** argv)
httpServerApp->add_option("--port", port, "Port"); httpServerApp->add_option("--port", port, "Port");
httpServerApp->add_option("--host", hostname, "Hostname"); httpServerApp->add_option("--host", hostname, "Hostname");
CLI::App* autobahnApp = app.add_subcommand("autobahn", "Test client Autobahn compliance");
autobahnApp->add_option("--url", url, "url");
CLI11_PARSE(app, argc, argv); CLI11_PARSE(app, argc, argv);
// pid file handling // pid file handling
@ -328,6 +331,10 @@ int main(int argc, char** argv)
{ {
ret = ix::ws_httpd_main(port, hostname); ret = ix::ws_httpd_main(port, hostname);
} }
else if (app.got_subcommand("autobahn"))
{
ret = ix::ws_autobahn_main(url);
}
ix::uninitNetSystem(); ix::uninitNetSystem();
return ret; return ret;

View File

@ -102,4 +102,6 @@ namespace ix
const std::string& appsConfigPath); const std::string& appsConfigPath);
int ws_httpd_main(int port, const std::string& hostname); int ws_httpd_main(int port, const std::string& hostname);
int ws_autobahn_main(const std::string& url);
} // namespace ix } // namespace ix

154
ws/ws_autobahn.cpp Normal file
View File

@ -0,0 +1,154 @@
/*
* ws_autobahn.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <sstream>
#include <atomic>
#include <ixwebsocket/IXWebSocket.h>
#include <ixwebsocket/IXSocket.h>
namespace ix
{
class AutobahnTestCase
{
public:
AutobahnTestCase(const std::string& _url);
void run();
private:
void log(const std::string& msg);
std::string _url;
ix::WebSocket _webSocket;
std::atomic<bool> _done;
};
AutobahnTestCase::AutobahnTestCase(const std::string& url) :
_url(url),
_done(false)
{
_webSocket.disableAutomaticReconnection();
ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(
true, false, false, 15, 15);
_webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions);
}
void AutobahnTestCase::log(const std::string& msg)
{
std::cerr << msg << std::endl;
}
void AutobahnTestCase::run()
{
_webSocket.setUrl(_url);
std::stringstream ss;
log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback(
[this](const ix::WebSocketMessagePtr& msg)
{
std::stringstream ss;
if (msg->type == ix::WebSocketMessageType::Open)
{
log("autobahn: connected");
std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl;
for (auto it : msg->openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
ss << "autobahn: connection closed:";
ss << " code " << msg->closeInfo.code;
ss << " reason " << msg->closeInfo.reason << std::endl;
log(ss.str());
_done = true;
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
// ss << "autobahn: received message: "
// << msg->str;
// log(ss.str());
_webSocket.send(msg->str, msg->binary);
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str());
// And error can happen, in which case the test-case is marked done
_done = true;
}
else if (msg->type == ix::WebSocketMessageType::Fragment)
{
std::cerr << "Received message fragment" << std::endl;
}
else if (msg->type == ix::WebSocketMessageType::Ping)
{
std::cerr << "Received ping" << std::endl;
}
else if (msg->type == ix::WebSocketMessageType::Pong)
{
std::cerr << "Received pong" << std::endl;
}
else
{
ss << "Invalid ix::WebSocketMessageType";
log(ss.str());
}
});
_webSocket.start();
log("Waiting for being closed ...");
while (!_done)
{
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
}
_webSocket.stop();
}
//
// make && bench ws autobahn --url 'ws://localhost:9001/runCase?case=9&agent=ixwebsocket' && ws connect -d 'ws://localhost:9001/updateReports?agent=ixwebsocket'
//
int ws_autobahn_main(const std::string& url)
{
int N = 1; // 519;
N++;
for (int i = 1 ; i < N; ++i)
{
int caseNumber = i;
std::stringstream ss;
ss << "ws://localhost:9001/runCase?case="
<< caseNumber
<< "&agent=ixwebsocket";
std::string url(ss.str());
AutobahnTestCase testCase(url);
testCase.run();
}
return 0;
}
}

View File

@ -12,6 +12,7 @@
#include <atomic> #include <atomic>
#include <jsoncpp/json/json.h> #include <jsoncpp/json/json.h>
#include <ixcobra/IXCobraMetricsPublisher.h> #include <ixcobra/IXCobraMetricsPublisher.h>
#include <spdlog/spdlog.h>
namespace ix namespace ix
{ {
@ -23,6 +24,16 @@ namespace ix
const std::string& path, const std::string& path,
bool stress) bool stress)
{ {
std::atomic<int> sentMessages(0);
std::atomic<int> ackedMessages(0);
CobraConnection::setPublishTrackerCallback(
[&sentMessages, &ackedMessages](bool sent, bool acked)
{
if (sent) sentMessages++;
if (acked) ackedMessages++;
}
);
CobraMetricsPublisher cobraMetricsPublisher; CobraMetricsPublisher cobraMetricsPublisher;
cobraMetricsPublisher.enable(true); cobraMetricsPublisher.enable(true);
@ -64,8 +75,11 @@ namespace ix
// Wait a bit for the message to get a chance to be sent // Wait a bit for the message to get a chance to be sent
// there isn't any ack on publish right now so it's the best we can do // there isn't any ack on publish right now so it's the best we can do
// FIXME: this comment is a lie now
std::this_thread::sleep_for(std::chrono::milliseconds(100)); std::this_thread::sleep_for(std::chrono::milliseconds(100));
spdlog::info("Sent messages: {} Acked messages {}", sentMessages, ackedMessages);
return 0; return 0;
} }
} }

View File

@ -57,7 +57,8 @@ namespace ix
(ix::CobraConnectionEventType eventType, (ix::CobraConnectionEventType eventType,
const std::string& errMsg, const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers, const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId) const std::string& subscriptionId,
CobraConnection::MsgId msgId)
{ {
if (eventType == ix::CobraConnection_EventType_Open) if (eventType == ix::CobraConnection_EventType_Open)
{ {
@ -96,6 +97,10 @@ namespace ix
{ {
spdlog::error("Subscriber: error {}", errMsg); spdlog::error("Subscriber: error {}", errMsg);
} }
else if (eventType == ix::CobraConnection_EventType_Published)
{
spdlog::error("Published message hacked: {}", msgId);
}
} }
); );

View File

@ -102,7 +102,8 @@ namespace ix
(ix::CobraConnectionEventType eventType, (ix::CobraConnectionEventType eventType,
const std::string& errMsg, const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers, const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId) const std::string& subscriptionId,
CobraConnection::MsgId msgId)
{ {
if (eventType == ix::CobraConnection_EventType_Open) if (eventType == ix::CobraConnection_EventType_Open)
{ {
@ -169,6 +170,10 @@ namespace ix
{ {
spdlog::error("Subscriber: error {}", errMsg); spdlog::error("Subscriber: error {}", errMsg);
} }
else if (eventType == ix::CobraConnection_EventType_Published)
{
spdlog::error("Published message hacked: {}", msgId);
}
} }
); );

View File

@ -95,7 +95,8 @@ namespace ix
(ix::CobraConnectionEventType eventType, (ix::CobraConnectionEventType eventType,
const std::string& errMsg, const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers, const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId) const std::string& subscriptionId,
CobraConnection::MsgId msgId)
{ {
if (eventType == ix::CobraConnection_EventType_Open) if (eventType == ix::CobraConnection_EventType_Open)
{ {
@ -149,6 +150,10 @@ namespace ix
{ {
spdlog::error("Subscriber: error {}", errMsg); spdlog::error("Subscriber: error {}", errMsg);
} }
else if (eventType == ix::CobraConnection_EventType_Published)
{
spdlog::error("Published message hacked: {}", msgId);
}
} }
); );