This commit is contained in:
Ross Jacobs
2020-04-20 22:59:20 -07:00
committed by GitHub
parent 36257cbfe4
commit 5860c5c80b
50 changed files with 10078 additions and 8095 deletions

View File

@ -6,8 +6,8 @@
#pragma once
#include <ixwebsocket/IXWebSocketPerMessageDeflateOptions.h>
#include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocketPerMessageDeflateOptions.h>
namespace ix
{

View File

@ -5,17 +5,17 @@
*/
#include "IXCobraConnection.h"
#include <ixcrypto/IXHMac.h>
#include <ixwebsocket/IXWebSocket.h>
#include <ixwebsocket/IXSocketTLSOptions.h>
#include <algorithm>
#include <stdexcept>
#include <cmath>
#include <cassert>
#include <cmath>
#include <cstring>
#include <iostream>
#include <ixcrypto/IXHMac.h>
#include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocket.h>
#include <sstream>
#include <stdexcept>
namespace ix
@ -26,12 +26,12 @@ namespace ix
constexpr CobraConnection::MsgId CobraConnection::kInvalidMsgId;
constexpr int CobraConnection::kPingIntervalSecs;
CobraConnection::CobraConnection() :
_webSocket(new WebSocket()),
_publishMode(CobraConnection_PublishMode_Immediate),
_authenticated(false),
_eventCallback(nullptr),
_id(1)
CobraConnection::CobraConnection()
: _webSocket(new WebSocket())
, _publishMode(CobraConnection_PublishMode_Immediate)
, _authenticated(false)
, _eventCallback(nullptr)
, _id(1)
{
_pdu["action"] = "rtm/publish";
@ -97,11 +97,7 @@ namespace ix
if (_eventCallback)
{
_eventCallback(
std::make_unique<CobraEvent>(eventType,
errorMsg,
headers,
subscriptionId,
msgId));
std::make_unique<CobraEvent>(eventType, errorMsg, headers, subscriptionId, msgId));
}
}
@ -121,126 +117,119 @@ namespace ix
void CobraConnection::initWebSocketOnMessageCallback()
{
_webSocket->setOnMessageCallback(
[this](const ix::WebSocketMessagePtr& msg)
_webSocket->setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) {
CobraConnection::invokeTrafficTrackerCallback(msg->wireSize, true);
std::stringstream ss;
if (msg->type == ix::WebSocketMessageType::Open)
{
CobraConnection::invokeTrafficTrackerCallback(msg->wireSize, true);
invokeEventCallback(ix::CobraEventType::Open, std::string(), msg->openInfo.headers);
sendHandshakeMessage();
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
_authenticated = false;
std::stringstream ss;
if (msg->type == ix::WebSocketMessageType::Open)
ss << "Close code " << msg->closeInfo.code;
ss << " reason " << msg->closeInfo.reason;
invokeEventCallback(ix::CobraEventType::Closed, ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
Json::Value data;
Json::Reader reader;
if (!reader.parse(msg->str, data))
{
invokeEventCallback(ix::CobraEventType::Open,
std::string(),
msg->openInfo.headers);
sendHandshakeMessage();
invokeErrorCallback("Invalid json", msg->str);
return;
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
_authenticated = false;
std::stringstream ss;
ss << "Close code " << msg->closeInfo.code;
ss << " reason " << msg->closeInfo.reason;
invokeEventCallback(ix::CobraEventType::Closed,
ss.str());
if (!data.isMember("action"))
{
invokeErrorCallback("Missing action", msg->str);
return;
}
else if (msg->type == ix::WebSocketMessageType::Message)
auto action = data["action"].asString();
if (action == "auth/handshake/ok")
{
Json::Value data;
Json::Reader reader;
if (!reader.parse(msg->str, data))
if (!handleHandshakeResponse(data))
{
invokeErrorCallback("Invalid json", msg->str);
return;
}
if (!data.isMember("action"))
{
invokeErrorCallback("Missing action", msg->str);
return;
}
auto action = data["action"].asString();
if (action == "auth/handshake/ok")
{
if (!handleHandshakeResponse(data))
{
invokeErrorCallback("Error extracting nonce from handshake response", msg->str);
}
}
else if (action == "auth/handshake/error")
{
invokeEventCallback(ix::CobraEventType::HandshakeError,
invokeErrorCallback("Error extracting nonce from handshake response",
msg->str);
}
else if (action == "auth/authenticate/ok")
{
_authenticated = true;
invokeEventCallback(ix::CobraEventType::Authenticated);
flushQueue();
}
else if (action == "auth/authenticate/error")
{
invokeEventCallback(ix::CobraEventType::AuthenticationError,
msg->str);
}
else if (action == "rtm/subscription/data")
{
handleSubscriptionData(data);
}
else if (action == "rtm/subscribe/ok")
{
if (!handleSubscriptionResponse(data))
{
invokeErrorCallback("Error processing subscribe response", msg->str);
}
}
else if (action == "rtm/subscribe/error")
{
invokeEventCallback(ix::CobraEventType::SubscriptionError,
msg->str);
}
else if (action == "rtm/unsubscribe/ok")
{
if (!handleUnsubscriptionResponse(data))
{
invokeErrorCallback("Error processing unsubscribe response", msg->str);
}
}
else if (action == "rtm/unsubscribe/error")
{
invokeErrorCallback("Unsubscription error", msg->str);
}
else if (action == "rtm/publish/ok")
{
if (!handlePublishResponse(data))
{
invokeErrorCallback("Error processing publish response", msg->str);
}
}
else if (action == "rtm/publish/error")
{
invokeErrorCallback("Publish error", msg->str);
}
else
{
invokeErrorCallback("Un-handled message type", msg->str);
}
}
else if (msg->type == ix::WebSocketMessageType::Error)
else if (action == "auth/handshake/error")
{
std::stringstream ss;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
invokeErrorCallback(ss.str(), std::string());
invokeEventCallback(ix::CobraEventType::HandshakeError, msg->str);
}
else if (msg->type == ix::WebSocketMessageType::Pong)
else if (action == "auth/authenticate/ok")
{
invokeEventCallback(ix::CobraEventType::Pong, msg->str);
_authenticated = true;
invokeEventCallback(ix::CobraEventType::Authenticated);
flushQueue();
}
else if (action == "auth/authenticate/error")
{
invokeEventCallback(ix::CobraEventType::AuthenticationError, msg->str);
}
else if (action == "rtm/subscription/data")
{
handleSubscriptionData(data);
}
else if (action == "rtm/subscribe/ok")
{
if (!handleSubscriptionResponse(data))
{
invokeErrorCallback("Error processing subscribe response", msg->str);
}
}
else if (action == "rtm/subscribe/error")
{
invokeEventCallback(ix::CobraEventType::SubscriptionError, msg->str);
}
else if (action == "rtm/unsubscribe/ok")
{
if (!handleUnsubscriptionResponse(data))
{
invokeErrorCallback("Error processing unsubscribe response", msg->str);
}
}
else if (action == "rtm/unsubscribe/error")
{
invokeErrorCallback("Unsubscription error", msg->str);
}
else if (action == "rtm/publish/ok")
{
if (!handlePublishResponse(data))
{
invokeErrorCallback("Error processing publish response", msg->str);
}
}
else if (action == "rtm/publish/error")
{
invokeErrorCallback("Publish error", msg->str);
}
else
{
invokeErrorCallback("Un-handled message type", msg->str);
}
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
std::stringstream ss;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
invokeErrorCallback(ss.str(), std::string());
}
else if (msg->type == ix::WebSocketMessageType::Pong)
{
invokeEventCallback(ix::CobraEventType::Pong, msg->str);
}
});
}
@ -254,12 +243,13 @@ namespace ix
return _publishMode;
}
void CobraConnection::configure(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
const WebSocketPerMessageDeflateOptions& webSocketPerMessageDeflateOptions,
const SocketTLSOptions& socketTLSOptions)
void CobraConnection::configure(
const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
const WebSocketPerMessageDeflateOptions& webSocketPerMessageDeflateOptions,
const SocketTLSOptions& socketTLSOptions)
{
_roleName = rolename;
_roleSecret = rolesecret;
@ -402,7 +392,8 @@ namespace ix
if (!subscriptionId.isString()) return false;
invokeEventCallback(ix::CobraEventType::Subscribed,
std::string(), WebSocketHttpHeaders(),
std::string(),
WebSocketHttpHeaders(),
subscriptionId.asString());
return true;
}
@ -420,7 +411,8 @@ namespace ix
if (!subscriptionId.isString()) return false;
invokeEventCallback(ix::CobraEventType::UnSubscribed,
std::string(), WebSocketHttpHeaders(),
std::string(),
WebSocketHttpHeaders(),
subscriptionId.asString());
return true;
}
@ -468,8 +460,10 @@ namespace ix
uint64_t msgId = id.asUInt64();
invokeEventCallback(ix::CobraEventType::Published,
std::string(), WebSocketHttpHeaders(),
std::string(), msgId);
std::string(),
WebSocketHttpHeaders(),
std::string(),
msgId);
invokePublishTrackerCallback(false, true);
@ -499,9 +493,7 @@ namespace ix
}
std::pair<CobraConnection::MsgId, std::string> CobraConnection::prePublish(
const Json::Value& channels,
const Json::Value& msg,
bool addToQueue)
const Json::Value& channels, const Json::Value& msg, bool addToQueue)
{
std::lock_guard<std::mutex> lock(_prePublishMutex);
@ -667,8 +659,7 @@ namespace ix
bool CobraConnection::publishMessage(const std::string& serializedJson)
{
auto webSocketSendInfo = _webSocket->send(serializedJson);
CobraConnection::invokeTrafficTrackerCallback(webSocketSendInfo.wireSize,
false);
CobraConnection::invokeTrafficTrackerCallback(webSocketSendInfo.wireSize, false);
return webSocketSendInfo.success;
}

View File

@ -6,20 +6,19 @@
#pragma once
#include "IXCobraConfig.h"
#include "IXCobraEvent.h"
#include "IXCobraEventType.h"
#include <ixwebsocket/IXWebSocketHttpHeaders.h>
#include <ixwebsocket/IXWebSocketPerMessageDeflateOptions.h>
#include "IXCobraEventType.h"
#include "IXCobraEvent.h"
#include <json/json.h>
#include <limits>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <limits>
#include "IXCobraConfig.h"
#ifdef max
#undef max
@ -121,10 +120,9 @@ namespace ix
/// Prepare a message for transmission
/// (update the pdu, compute a msgId, serialize json to a string)
std::pair<CobraConnection::MsgId, std::string> prePublish(
const Json::Value& channels,
const Json::Value& msg,
bool addToQueue);
std::pair<CobraConnection::MsgId, std::string> prePublish(const Json::Value& channels,
const Json::Value& msg,
bool addToQueue);
/// Attempt to send next message from the internal queue
bool publishNext();

View File

@ -7,10 +7,10 @@
#pragma once
#include "IXCobraEventType.h"
#include <cstdint>
#include <ixwebsocket/IXWebSocketHttpHeaders.h>
#include <memory>
#include <string>
#include <cstdint>
namespace ix
{
@ -38,4 +38,4 @@ namespace ix
};
using CobraEventPtr = std::unique_ptr<CobraEvent>;
}
} // namespace ix

View File

@ -5,9 +5,9 @@
*/
#include "IXCobraMetricsPublisher.h"
#include <ixwebsocket/IXSocketTLSOptions.h>
#include <algorithm>
#include <ixwebsocket/IXSocketTLSOptions.h>
#include <stdexcept>
@ -17,8 +17,8 @@ namespace ix
const std::string CobraMetricsPublisher::kSetRateControlId = "sms_set_rate_control_id";
const std::string CobraMetricsPublisher::kSetBlacklistId = "sms_set_blacklist_id";
CobraMetricsPublisher::CobraMetricsPublisher() :
_enabled(true)
CobraMetricsPublisher::CobraMetricsPublisher()
: _enabled(true)
{
}
@ -27,8 +27,7 @@ namespace ix
;
}
void CobraMetricsPublisher::configure(const CobraConfig& config,
const std::string& channel)
void CobraMetricsPublisher::configure(const CobraConfig& config, const std::string& channel)
{
// Configure the satori connection and start its publish background thread
_cobra_metrics_theaded_publisher.configure(config, channel);
@ -42,7 +41,7 @@ namespace ix
}
void CobraMetricsPublisher::setGenericAttributes(const std::string& attrName,
const Json::Value& value)
const Json::Value& value)
{
std::lock_guard<std::mutex> lock(_device_mutex);
_device[attrName] = value;
@ -107,8 +106,7 @@ namespace ix
auto last_update = _last_update.find(id);
if (last_update == _last_update.end()) return false;
auto timeDeltaFromLastSend =
std::chrono::steady_clock::now() - last_update->second;
auto timeDeltaFromLastSend = std::chrono::steady_clock::now() - last_update->second;
return timeDeltaFromLastSend < std::chrono::seconds(rate_control_it->second);
}
@ -123,8 +121,7 @@ namespace ix
{
auto now = std::chrono::system_clock::now();
auto ms =
std::chrono::duration_cast<std::chrono::milliseconds>(
now.time_since_epoch()).count();
std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
return ms;
}
@ -165,10 +162,9 @@ namespace ix
return true;
}
CobraConnection::MsgId CobraMetricsPublisher::push(
const std::string& id,
const Json::Value& data,
bool shouldPushTest)
CobraConnection::MsgId CobraMetricsPublisher::push(const std::string& id,
const Json::Value& data,
bool shouldPushTest)
{
if (shouldPushTest && !shouldPush(id)) return CobraConnection::kInvalidMsgId;

View File

@ -40,8 +40,7 @@ namespace ix
/// Configuration / set keys, etc...
/// All input data but the channel name is encrypted with rc4
void configure(const CobraConfig& config,
const std::string& channel);
void configure(const CobraConfig& config, const std::string& channel);
/// Setter for the list of blacklisted metrics ids.
/// That list is sorted internally for fast lookups
@ -68,10 +67,14 @@ namespace ix
/// shouldPush method for places where we want to be as lightweight as possible when
/// collecting metrics. When set to false, it is used so that we don't do double work when
/// computing whether a metrics should be sent or not.
CobraConnection::MsgId push(const std::string& id, const Json::Value& data, bool shouldPushTest = true);
CobraConnection::MsgId push(const std::string& id,
const Json::Value& data,
bool shouldPushTest = true);
/// Interface used by lua. msg is a json encoded string.
CobraConnection::MsgId push(const std::string& id, const std::string& data, bool shouldPushTest = true);
CobraConnection::MsgId push(const std::string& id,
const std::string& data,
bool shouldPushTest = true);
/// Tells whether a metric can be pushed.
/// A metric can be pushed if it satisfies those conditions:
@ -89,10 +92,16 @@ namespace ix
void setGenericAttributes(const std::string& attrName, const Json::Value& value);
/// Set a unique id for the session. A uuid can be used.
void setSession(const std::string& session) { _session = session; }
void setSession(const std::string& session)
{
_session = session;
}
/// Get the unique id used to identify the current session
const std::string& getSession() const { return _session; }
const std::string& getSession() const
{
return _session;
}
/// Return the number of milliseconds since the epoch (~1970)
uint64_t getMillisecondsSinceEpoch() const;

View File

@ -5,78 +5,77 @@
*/
#include "IXCobraMetricsThreadedPublisher.h"
#include <ixwebsocket/IXSetThreadName.h>
#include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixcore/utils/IXCoreLogger.h>
#include <algorithm>
#include <stdexcept>
#include <cmath>
#include <cassert>
#include <cmath>
#include <iostream>
#include <ixcore/utils/IXCoreLogger.h>
#include <ixwebsocket/IXSetThreadName.h>
#include <ixwebsocket/IXSocketTLSOptions.h>
#include <sstream>
#include <stdexcept>
namespace ix
{
CobraMetricsThreadedPublisher::CobraMetricsThreadedPublisher() :
_stop(false)
CobraMetricsThreadedPublisher::CobraMetricsThreadedPublisher()
: _stop(false)
{
_cobra_connection.setEventCallback([](const CobraEventPtr& event)
_cobra_connection.setEventCallback([](const CobraEventPtr& event) {
std::stringstream ss;
if (event->type == ix::CobraEventType::Open)
{
std::stringstream ss;
ss << "Handshake headers" << std::endl;
if (event->type == ix::CobraEventType::Open)
for (auto&& it : event->headers)
{
ss << "Handshake headers" << std::endl;
ss << it.first << ": " << it.second << std::endl;
}
}
else if (event->type == ix::CobraEventType::Authenticated)
{
ss << "Authenticated";
}
else if (event->type == ix::CobraEventType::Error)
{
ss << "Error: " << event->errMsg;
}
else if (event->type == ix::CobraEventType::Closed)
{
ss << "Connection closed: " << event->errMsg;
}
else if (event->type == ix::CobraEventType::Subscribed)
{
ss << "Subscribed through subscription id: " << event->subscriptionId;
}
else if (event->type == ix::CobraEventType::UnSubscribed)
{
ss << "Unsubscribed through subscription id: " << event->subscriptionId;
}
else if (event->type == ix::CobraEventType::Published)
{
ss << "Published message " << event->msgId << " acked";
}
else if (event->type == ix::CobraEventType::Pong)
{
ss << "Received websocket pong";
}
else if (event->type == ix::CobraEventType::HandshakeError)
{
ss << "Handshake error: " << event->errMsg;
}
else if (event->type == ix::CobraEventType::AuthenticationError)
{
ss << "Authentication error: " << event->errMsg;
}
else if (event->type == ix::CobraEventType::SubscriptionError)
{
ss << "Subscription error: " << event->errMsg;
}
for (auto&& it : event->headers)
{
ss << it.first << ": " << it.second << std::endl;
}
}
else if (event->type == ix::CobraEventType::Authenticated)
{
ss << "Authenticated";
}
else if (event->type == ix::CobraEventType::Error)
{
ss << "Error: " << event->errMsg;
}
else if (event->type == ix::CobraEventType::Closed)
{
ss << "Connection closed: " << event->errMsg;
}
else if (event->type == ix::CobraEventType::Subscribed)
{
ss << "Subscribed through subscription id: " << event->subscriptionId;
}
else if (event->type == ix::CobraEventType::UnSubscribed)
{
ss << "Unsubscribed through subscription id: " << event->subscriptionId;
}
else if (event->type == ix::CobraEventType::Published)
{
ss << "Published message " << event->msgId << " acked";
}
else if (event->type == ix::CobraEventType::Pong)
{
ss << "Received websocket pong";
}
else if (event->type == ix::CobraEventType::HandshakeError)
{
ss << "Handshake error: " << event->errMsg;
}
else if (event->type == ix::CobraEventType::AuthenticationError)
{
ss << "Authentication error: " << event->errMsg;
}
else if (event->type == ix::CobraEventType::SubscriptionError)
{
ss << "Subscription error: " << event->errMsg;
}
ix::IXCoreLogger::Log(ss.str().c_str());
ix::IXCoreLogger::Log(ss.str().c_str());
});
}
@ -105,7 +104,6 @@ namespace ix
_channel = channel;
_cobra_connection.configure(config);
}
void CobraMetricsThreadedPublisher::pushMessage(MessageKind messageKind)
@ -163,13 +161,15 @@ namespace ix
{
_cobra_connection.suspend();
continue;
}; break;
};
break;
case MessageKind::Resume:
{
_cobra_connection.resume();
continue;
}; break;
};
break;
case MessageKind::Message:
{
@ -177,7 +177,8 @@ namespace ix
{
_cobra_connection.publishNext();
}
}; break;
};
break;
}
}
}

View File

@ -27,8 +27,7 @@ namespace ix
~CobraMetricsThreadedPublisher();
/// Configuration / set keys, etc...
void configure(const CobraConfig& config,
const std::string& channel);
void configure(const CobraConfig& config, const std::string& channel);
/// Start the worker thread, used for background publishing
void start();