add cobra metrics publisher

This commit is contained in:
Benjamin Sergeant 2019-04-21 11:16:33 -07:00
parent d48bf9249b
commit f85c5002b7
10 changed files with 804 additions and 0 deletions

View File

@ -35,6 +35,8 @@ add_executable(ws
IXRedisClient.cpp
IXSentryClient.cpp
IXCobraConnection.cpp
IXCobraMetricsPublisher.cpp
IXCobraMetricsThreadedPublisher.cpp
ws_http_client.cpp
ws_ping_pong.cpp
@ -48,6 +50,7 @@ add_executable(ws
ws_redis_publish.cpp
ws_redis_subscribe.cpp
ws_cobra_subscribe.cpp
ws_cobra_publish.cpp
ws_cobra_to_statsd.cpp
ws_cobra_to_sentry.cpp
ws.cpp)

View File

@ -389,6 +389,11 @@ namespace ix
return _webSocket->getReadyState() == ix::WebSocket_ReadyState_Open;
}
bool CobraConnection::isAuthenticated() const
{
return isConnected() && _authenticated;
}
std::string CobraConnection::serializeJson(const Json::Value& value)
{
std::lock_guard<std::mutex> lock(_jsonWriterMutex);

View File

@ -91,6 +91,9 @@ namespace ix
/// Returns true only if we're connected
bool isConnected() const;
/// Returns true only if we're authenticated
bool isAuthenticated() const;
/// Flush the publish queue
bool flushQueue();

View File

@ -0,0 +1,228 @@
/*
* IXCobraMetricsPublisher.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017 Machine Zone. All rights reserved.
*/
#include "IXCobraMetricsPublisher.h"
#include <algorithm>
#include <stdexcept>
namespace ix
{
const int CobraMetricsPublisher::kVersion = 1;
const std::string CobraMetricsPublisher::kSetRateControlId = "cms_set_rate_control_id";
const std::string CobraMetricsPublisher::kSetBlacklistId = "cms_set_blacklist_id";
CobraMetricsPublisher::CobraMetricsPublisher() :
_enabled(false)
{
}
CobraMetricsPublisher::~CobraMetricsPublisher()
{
;
}
void CobraMetricsPublisher::configure(const std::string& appkey,
const std::string& endpoint,
const std::string& channel,
const std::string& rolename,
const std::string& rolesecret,
bool enablePerMessageDeflate)
{
// Configure the satori connection and start its publish background thread
_cobra_metrics_theaded_publisher.start();
_cobra_metrics_theaded_publisher.configure(appkey, endpoint, channel,
rolename, rolesecret,
enablePerMessageDeflate);
}
Json::Value& CobraMetricsPublisher::getGenericAttributes()
{
std::lock_guard<std::mutex> lock(_device_mutex);
return _device;
}
void CobraMetricsPublisher::setGenericAttributes(const std::string& attrName,
const Json::Value& value)
{
std::lock_guard<std::mutex> lock(_device_mutex);
_device[attrName] = value;
}
void CobraMetricsPublisher::enable(bool enabled)
{
_enabled = enabled;
}
void CobraMetricsPublisher::setBlacklist(const std::vector<std::string>& blacklist)
{
_blacklist = blacklist;
std::sort(_blacklist.begin(), _blacklist.end());
// publish our blacklist
Json::Value data;
Json::Value metrics;
for (auto&& metric : blacklist)
{
metrics.append(metric);
}
data["blacklist"] = metrics;
push(kSetBlacklistId, data);
}
bool CobraMetricsPublisher::isMetricBlacklisted(const std::string& id) const
{
return std::binary_search(_blacklist.begin(), _blacklist.end(), id);
}
void CobraMetricsPublisher::setRateControl(
const std::unordered_map<std::string, int>& rate_control)
{
for (auto&& it : rate_control)
{
if (it.second >= 0)
{
_rate_control[it.first] = it.second;
}
}
// publish our rate_control
Json::Value data;
Json::Value metrics;
for (auto&& it : _rate_control)
{
metrics[it.first] = it.second;
}
data["rate_control"] = metrics;
push(kSetRateControlId, data);
}
bool CobraMetricsPublisher::isAboveMaxUpdateRate(const std::string& id) const
{
// Is this metrics rate controlled ?
auto rate_control_it = _rate_control.find(id);
if (rate_control_it == _rate_control.end()) return false;
// Was this metrics already sent ?
std::lock_guard<std::mutex> lock(_last_update_mutex);
auto last_update = _last_update.find(id);
if (last_update == _last_update.end()) return false;
auto timeDeltaFromLastSend =
std::chrono::steady_clock::now() - last_update->second;
return timeDeltaFromLastSend < std::chrono::seconds(rate_control_it->second);
}
void CobraMetricsPublisher::setLastUpdate(const std::string& id)
{
std::lock_guard<std::mutex> lock(_last_update_mutex);
_last_update[id] = std::chrono::steady_clock::now();
}
uint64_t CobraMetricsPublisher::getMillisecondsSinceEpoch() const
{
auto now = std::chrono::system_clock::now();
auto ms =
std::chrono::duration_cast<std::chrono::milliseconds>(
now.time_since_epoch()).count();
return ms;
}
void CobraMetricsPublisher::push(const std::string& id,
const std::string& data,
bool shouldPushTest)
{
if (!_enabled) return;
Json::Value root;
Json::Reader reader;
if (!reader.parse(data, root)) return;
push(id, root, shouldPushTest);
}
void CobraMetricsPublisher::push(const std::string& id,
const CobraMetricsPublisher::Message& data)
{
if (!_enabled) return;
Json::Value root;
for (auto it : data)
{
root[it.first] = it.second;
}
push(id, root);
}
bool CobraMetricsPublisher::shouldPush(const std::string& id) const
{
if (!_enabled) return false;
if (isMetricBlacklisted(id)) return false;
if (isAboveMaxUpdateRate(id)) return false;
return true;
}
void CobraMetricsPublisher::push(const std::string& id,
const Json::Value& data,
bool shouldPushTest)
{
if (shouldPushTest && !shouldPush(id)) return;
setLastUpdate(id);
Json::Value msg;
msg["id"] = id;
msg["data"] = data;
msg["session"] = _session;
msg["version"] = kVersion;
msg["timestamp"] = getMillisecondsSinceEpoch();
{
std::lock_guard<std::mutex> lock(_device_mutex);
msg["device"] = _device;
}
// Now actually enqueue the task
_cobra_metrics_theaded_publisher.push(msg);
}
void CobraMetricsPublisher::setPublishMode(CobraConnectionPublishMode publishMode)
{
_cobra_metrics_theaded_publisher.setPublishMode(publishMode);
}
bool CobraMetricsPublisher::flushQueue()
{
return _cobra_metrics_theaded_publisher.flushQueue();
}
void CobraMetricsPublisher::suspend()
{
_cobra_metrics_theaded_publisher.suspend();
}
void CobraMetricsPublisher::resume()
{
_cobra_metrics_theaded_publisher.resume();
}
bool CobraMetricsPublisher::isConnected() const
{
return _cobra_metrics_theaded_publisher.isConnected();
}
bool CobraMetricsPublisher::isAuthenticated() const
{
return _cobra_metrics_theaded_publisher.isAuthenticated();
}
} // namespace ix

View File

@ -0,0 +1,165 @@
/*
* IXCobraMetricsPublisher.h
* Author: Benjamin Sergeant
* Copyright (c) 2017 Machine Zone. All rights reserved.
*/
#pragma once
#include "IXCobraMetricsThreadedPublisher.h"
#include <jsoncpp/json/json.h>
#include <string>
#include <unordered_map>
#include <chrono>
namespace ix
{
class CobraMetricsPublisher
{
public:
CobraMetricsPublisher();
~CobraMetricsPublisher();
/// Thread safety notes:
///
/// 1. _enabled, _blacklist and _rate_control read/writes are not protected by a mutex
/// to make shouldPush as fast as possible. _enabled default to false.
///
/// The code that set those is ran only once at init, and
/// the last value to be set is _enabled, which is also the first value checked in shouldPush,
/// so there shouldn't be any race condition.
///
/// 2. The queue of messages is thread safe, so multiple metrics can be safely pushed on multiple threads
///
/// 3. Access to _last_update is protected as it needs to be read/write.
///
/// Configuration / set keys, etc...
/// All input data but the channel name is encrypted with rc4
void configure(const std::string& appkey,
const std::string& endpoint,
const std::string& channel,
const std::string& rolename,
const std::string& rolesecret,
bool enablePerMessageDeflate);
/// Setter for the list of blacklisted metrics ids.
/// That list is sorted internally for fast lookups
void setBlacklist(const std::vector<std::string>& blacklist);
/// Set the maximum rate at which a metrics can be sent. Unit is seconds
/// if rate_control = { 'foo_id': 60 },
/// the foo_id metric cannot be pushed more than once every 60 seconds
void setRateControl(const std::unordered_map<std::string, int>& rate_control);
/// Configuration / enable/disable
void enable(bool enabled);
/// Simple interface, list of key value pairs where typeof(key) == typeof(value) == string
typedef std::unordered_map<std::string, std::string> Message;
void push(const std::string& id,
const CobraMetricsPublisher::Message& data = CobraMetricsPublisher::Message());
/// Richer interface using json, which supports types (bool, int, float) and hierarchies of elements
///
/// The shouldPushTest argument should be set to false, and used in combination with the shouldPush method
/// for places where we want to be as lightweight as possible when collecting metrics. When set to false,
/// it is used so that we don't do double work when computing whether a metrics should be sent or not.
void push(const std::string& id,
const Json::Value& data,
bool shouldPushTest = true);
/// Interface used by lua. msg is a json encoded string.
void push(const std::string& id,
const std::string& data,
bool shouldPushTest = true);
/// Tells whether a metric can be pushed.
/// A metric can be pushed if it satisfies those conditions:
///
/// 1. the metrics system should be enabled
/// 2. the metrics shouldn't be black-listed
/// 3. the metrics shouldn't have reached its rate control limit at this "sampling"/"calling" time
bool shouldPush(const std::string& id) const;
/// Get generic information json object
Json::Value& getGenericAttributes();
/// Set generic information values
void setGenericAttributes(const std::string& attrName,
const Json::Value& value);
/// Set a unique id for the session. A uuid can be used.
void setSession(const std::string& session) { _session = session; }
/// Get the unique id used to identify the current session
const std::string& getSession() const { return _session; }
/// Return the number of milliseconds since the epoch (~1970)
uint64_t getMillisecondsSinceEpoch() const;
/// Set satori connection publish mode
void setPublishMode(CobraConnectionPublishMode publishMode);
/// Flush the publish queue
bool flushQueue();
/// Lifecycle management. Free resources when backgrounding
void suspend();
void resume();
/// Tells whether the socket connection is opened
bool isConnected() const;
/// Returns true only if we're authenticated
bool isAuthenticated() const;
private:
/// Lookup an id in our metrics to see whether it is blacklisted
/// Complexity is logarithmic
bool isMetricBlacklisted(const std::string& id) const;
/// Tells whether we should drop a metrics or not as part of an enqueuing
/// because it exceed the max update rate (it is sent too often)
bool isAboveMaxUpdateRate(const std::string& id) const;
/// Record when a metric was last sent. Used for rate control
void setLastUpdate(const std::string& id);
///
/// Member variables
///
CobraMetricsThreadedPublisher _cobra_metrics_theaded_publisher;
/// A boolean to enable or disable this system
/// push becomes a no-op when _enabled is true
bool _enabled;
/// A uuid used to uniquely identify a session
std::string _session;
/// The _device json blob is populated once when configuring this system
/// It record generic metadata about the client, run (version, device model, etc...)
Json::Value _device;
mutable std::mutex _device_mutex; // protect access to _device
/// Metrics control (black list + rate control)
std::vector<std::string> _blacklist;
std::unordered_map<std::string, int> _rate_control;
std::unordered_map<std::string, std::chrono::time_point<std::chrono::steady_clock>> _last_update;
mutable std::mutex _last_update_mutex; // protect access to _last_update
// const strings for internal ids
static const std::string kSetRateControlId;
static const std::string kSetBlacklistId;
/// Our protocol version. Can be used by subscribers who would want to be backward compatible
/// if we change the way we arrange data
static const int kVersion;
};
} // namespace ix

View File

@ -0,0 +1,219 @@
/*
* IXCobraMetricsThreadedPublisher.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017 Machine Zone. All rights reserved.
*/
#include "IXCobraMetricsThreadedPublisher.h"
#include <ixwebsocket/IXSetThreadName.h>
#include <algorithm>
#include <stdexcept>
#include <cmath>
#include <cassert>
#include <iostream>
namespace ix
{
CobraMetricsThreadedPublisher::CobraMetricsThreadedPublisher() :
_stop(false)
{
_cobra_connection.setEventCallback(
[]
(ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId)
{
std::stringstream ss;
if (eventType == ix::CobraConnection_EventType_Open)
{
ss << "Handshake headers" << std::endl;
for (auto it : headers)
{
ss << it.first << ": " << it.second << std::endl;
}
}
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
ss << "Authenticated";
}
else if (eventType == ix::CobraConnection_EventType_Error)
{
ss << "Error: " << errMsg;
}
else if (eventType == ix::CobraConnection_EventType_Closed)
{
ss << "Connection closed: " << errMsg;
}
else if (eventType == ix::CobraConnection_EventType_Subscribed)
{
ss << "Subscribed through subscription id: " << subscriptionId;
}
else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{
ss << "Unsubscribed through subscription id: " << subscriptionId;
}
std::cerr << ss.str() << std::endl;
});
}
CobraMetricsThreadedPublisher::~CobraMetricsThreadedPublisher()
{
// The background thread won't be joinable if it was never
// started by calling CobraMetricsThreadedPublisher::start
if (!_thread.joinable()) return;
_stop = true;
_condition.notify_one();
_thread.join();
}
void CobraMetricsThreadedPublisher::start()
{
if (_thread.joinable()) return; // we've already been started
_thread = std::thread(&CobraMetricsThreadedPublisher::run, this);
}
void CobraMetricsThreadedPublisher::configure(const std::string& appkey,
const std::string& endpoint,
const std::string& channel,
const std::string& rolename,
const std::string& rolesecret,
bool enablePerMessageDeflate)
{
_channel = channel;
ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(enablePerMessageDeflate);
_cobra_connection.configure(appkey, endpoint,
rolename, rolesecret,
webSocketPerMessageDeflateOptions);
}
void CobraMetricsThreadedPublisher::pushMessage(MessageKind messageKind,
const Json::Value& msg)
{
// Now actually enqueue the task
{
// acquire lock
std::unique_lock<std::mutex> lock(_queue_mutex);
// add the task
_queue.push(std::make_pair(messageKind, msg));
} // release lock
// wake up one thread
_condition.notify_one();
}
void CobraMetricsThreadedPublisher::setPublishMode(CobraConnectionPublishMode publishMode)
{
_cobra_connection.setPublishMode(publishMode);
}
bool CobraMetricsThreadedPublisher::flushQueue()
{
return _cobra_connection.flushQueue();
}
void CobraMetricsThreadedPublisher::run()
{
setThreadName("CobraMetricsPublisher");
Json::Value channels;
channels.append(std::string());
channels.append(std::string());
const std::string messageIdKey("id");
_cobra_connection.connect();
while (true)
{
Json::Value msg;
MessageKind messageKind;
{
std::unique_lock<std::mutex> lock(_queue_mutex);
while (!_stop && _queue.empty())
{
_condition.wait(lock);
}
if (_stop)
{
_cobra_connection.disconnect();
return;
}
auto item = _queue.front();
_queue.pop();
messageKind = item.first;
msg = item.second;
}
switch (messageKind)
{
case MessageKind::Suspend:
{
_cobra_connection.suspend();
continue;
}; break;
case MessageKind::Resume:
{
_cobra_connection.resume();
continue;
}; break;
case MessageKind::Message:
{
;
}; break;
}
//
// Publish to multiple channels. This let the consumer side
// easily subscribe to all message of a certain type, without having
// to do manipulations on the messages on the server side.
//
channels[0] = _channel;
if (msg.isMember(messageIdKey))
{
channels[1] = msg[messageIdKey];
}
_cobra_connection.publish(channels, msg);
}
}
void CobraMetricsThreadedPublisher::push(const Json::Value& msg)
{
pushMessage(MessageKind::Message, msg);
}
void CobraMetricsThreadedPublisher::suspend()
{
pushMessage(MessageKind::Suspend, Json::Value());
}
void CobraMetricsThreadedPublisher::resume()
{
pushMessage(MessageKind::Resume, Json::Value());
}
bool CobraMetricsThreadedPublisher::isConnected() const
{
return _cobra_connection.isConnected();
}
bool CobraMetricsThreadedPublisher::isAuthenticated() const
{
return _cobra_connection.isAuthenticated();
}
} // namespace ix

View File

@ -0,0 +1,108 @@
/*
* IXCobraMetricsThreadedPublisher.h
* Author: Benjamin Sergeant
* Copyright (c) 2017 Machine Zone. All rights reserved.
*/
#pragma once
#include "IXCobraConnection.h"
#include <jsoncpp/json/json.h>
#include <string>
#include <queue>
#include <mutex>
#include <thread>
#include <map>
#include <atomic>
#include <condition_variable>
namespace ix
{
class CobraMetricsThreadedPublisher
{
public:
CobraMetricsThreadedPublisher();
~CobraMetricsThreadedPublisher();
/// Configuration / set keys, etc...
/// All input data but the channel name is encrypted with rc4
void configure(const std::string& appkey,
const std::string& endpoint,
const std::string& channel,
const std::string& rolename,
const std::string& rolesecret,
bool enablePerMessageDeflate);
/// Start the worker thread, used for background publishing
void start();
/// Push a msg to our queue of messages to be published to cobra on the background
// thread. Main user right now is the Cobra Metrics System
void push(const Json::Value& msg);
/// Set cobra connection publish mode
void setPublishMode(CobraConnectionPublishMode publishMode);
/// Flush the publish queue
bool flushQueue();
/// Lifecycle management. Free resources when backgrounding
void suspend();
void resume();
/// Tells whether the socket connection is opened
bool isConnected() const;
/// Returns true only if we're authenticated
bool isAuthenticated() const;
private:
enum class MessageKind
{
Message = 0,
Suspend = 1,
Resume = 2
};
/// Push a message to be processed by the background thread
void pushMessage(MessageKind messageKind,
const Json::Value& msg);
/// Get a wait time which is increasing exponentially based on the number of retries
uint64_t getWaitTimeExp(int retry_count);
/// Debugging routine to print the connection parameters to the console
void printInfo();
/// Publish a message to satory
/// Will retry multiple times (3) if a problem occurs.
///
/// Right now, only called on the publish worker thread.
void safePublish(const Json::Value& msg);
/// The worker thread "daemon" method. That method never returns unless _stop is set to true
void run();
/// Our connection to cobra.
CobraConnection _cobra_connection;
/// The channel we are publishing to
std::string _channel;
/// Internal data structures used to publish to cobra
/// Pending messages are stored into a queue, which is protected by a mutex
/// We used a condition variable to prevent the worker thread from busy polling
/// So we notify the condition variable when an incoming message arrives to signal
/// that it should wake up and take care of publishing it to cobra
/// To shutdown the worker thread one has to set the _stop boolean to true.
/// This is done in the destructor
std::queue<std::pair<MessageKind, Json::Value>> _queue;
mutable std::mutex _queue_mutex;
std::condition_variable _condition;
std::atomic<bool> _stop;
std::thread _thread;
};
} // namespace ix

View File

@ -135,6 +135,16 @@ int main(int argc, char** argv)
cobraSubscribeApp->add_option("channel", channel, "Channel")->required();
cobraSubscribeApp->add_option("--pidfile", pidfile, "Pid file");
CLI::App* cobraPublish = app.add_subcommand("cobra_publish", "Cobra publisher");
cobraPublish->add_option("--appkey", appkey, "Appkey");
cobraPublish->add_option("--endpoint", endpoint, "Endpoint");
cobraPublish->add_option("--rolename", rolename, "Role name");
cobraPublish->add_option("--rolesecret", rolesecret, "Role secret");
cobraPublish->add_option("channel", channel, "Channel")->required();
cobraPublish->add_option("--pidfile", pidfile, "Pid file");
cobraPublish->add_option("path", path, "Path to the file to send")
->required()->check(CLI::ExistingPath);
CLI::App* cobra2statsd = app.add_subcommand("cobra_to_statsd", "Cobra to statsd");
cobra2statsd->add_option("--appkey", appkey, "Appkey");
cobra2statsd->add_option("--endpoint", endpoint, "Endpoint");
@ -228,6 +238,12 @@ int main(int argc, char** argv)
rolename, rolesecret,
channel);
}
else if (app.got_subcommand("cobra_publish"))
{
return ix::ws_cobra_publish_main(appkey, endpoint,
rolename, rolesecret,
channel, path);
}
else if (app.got_subcommand("cobra_to_statsd"))
{
return ix::ws_cobra_to_statsd_main(appkey, endpoint,

View File

@ -59,6 +59,13 @@ namespace ix
const std::string& rolesecret,
const std::string& channel);
int ws_cobra_publish_main(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
const std::string& channel,
const std::string& path);
int ws_cobra_to_statsd_main(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,

50
ws/ws_cobra_publish.cpp Normal file
View File

@ -0,0 +1,50 @@
/*
* ws_cobra_publish.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <fstream>
#include <sstream>
#include <chrono>
#include <thread>
#include <atomic>
#include <jsoncpp/json/json.h>
#include "IXCobraMetricsPublisher.h"
namespace ix
{
int ws_cobra_publish_main(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
const std::string& channel,
const std::string& path)
{
CobraMetricsPublisher cobraMetricsPublisher;
cobraMetricsPublisher.enable(true);
bool enablePerMessageDeflate = true;
cobraMetricsPublisher.configure(appkey, endpoint, channel,
rolename, rolesecret, enablePerMessageDeflate);
while (!cobraMetricsPublisher.isAuthenticated()) ;
std::ifstream f(path);
std::string str((std::istreambuf_iterator<char>(f)),
std::istreambuf_iterator<char>());
Json::Value data;
Json::Reader reader;
if (!reader.parse(str, data)) return 1;
cobraMetricsPublisher.push(std::string("foo_id"), data);
// Wait a bit for the message to get a chance to be sent
// there isn't any ack on publish right now so it's the best we can do
std::this_thread::sleep_for(std::chrono::milliseconds(100));
return 0;
}
}