diff --git a/ws/CMakeLists.txt b/ws/CMakeLists.txt index d9fe3661..e231a322 100644 --- a/ws/CMakeLists.txt +++ b/ws/CMakeLists.txt @@ -35,6 +35,8 @@ add_executable(ws IXRedisClient.cpp IXSentryClient.cpp IXCobraConnection.cpp + IXCobraMetricsPublisher.cpp + IXCobraMetricsThreadedPublisher.cpp ws_http_client.cpp ws_ping_pong.cpp @@ -48,6 +50,7 @@ add_executable(ws ws_redis_publish.cpp ws_redis_subscribe.cpp ws_cobra_subscribe.cpp + ws_cobra_publish.cpp ws_cobra_to_statsd.cpp ws_cobra_to_sentry.cpp ws.cpp) diff --git a/ws/IXCobraConnection.cpp b/ws/IXCobraConnection.cpp index d1165de1..072a7d55 100644 --- a/ws/IXCobraConnection.cpp +++ b/ws/IXCobraConnection.cpp @@ -389,6 +389,11 @@ namespace ix return _webSocket->getReadyState() == ix::WebSocket_ReadyState_Open; } + bool CobraConnection::isAuthenticated() const + { + return isConnected() && _authenticated; + } + std::string CobraConnection::serializeJson(const Json::Value& value) { std::lock_guard lock(_jsonWriterMutex); diff --git a/ws/IXCobraConnection.h b/ws/IXCobraConnection.h index 1cc23e97..5f511267 100644 --- a/ws/IXCobraConnection.h +++ b/ws/IXCobraConnection.h @@ -91,6 +91,9 @@ namespace ix /// Returns true only if we're connected bool isConnected() const; + /// Returns true only if we're connected and the authenticated flag is set + bool isAuthenticated() const; + /// Flush the publish queue bool flushQueue(); diff --git a/ws/IXCobraMetricsPublisher.cpp b/ws/IXCobraMetricsPublisher.cpp new file mode 100644 index 00000000..3b804542 --- /dev/null +++ b/ws/IXCobraMetricsPublisher.cpp @@ -0,0 +1,228 @@ +/* + * IXCobraMetricsPublisher.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2017 Machine Zone. All rights reserved. 
+ */
+
+#include "IXCobraMetricsPublisher.h"
+
+#include
+#include
+
+
+namespace ix
+{
+    const int CobraMetricsPublisher::kVersion = 1;
+    const std::string CobraMetricsPublisher::kSetRateControlId = "cms_set_rate_control_id";
+    const std::string CobraMetricsPublisher::kSetBlacklistId = "cms_set_blacklist_id";
+
+    CobraMetricsPublisher::CobraMetricsPublisher() :
+        _enabled(false)
+    {
+    }
+
+    CobraMetricsPublisher::~CobraMetricsPublisher()
+    {
+        ;
+    }
+
+    void CobraMetricsPublisher::configure(const std::string& appkey,
+                                          const std::string& endpoint,
+                                          const std::string& channel,
+                                          const std::string& rolename,
+                                          const std::string& rolesecret,
+                                          bool enablePerMessageDeflate)
+    {
+        // Configure the cobra connection BEFORE starting the publish background thread:
+        // the worker calls connect() as soon as it runs, so it must see the final settings
+        _cobra_metrics_theaded_publisher.configure(appkey, endpoint, channel,
+                                                   rolename, rolesecret,
+                                                   enablePerMessageDeflate);
+        _cobra_metrics_theaded_publisher.start();
+    }
+
+    Json::Value& CobraMetricsPublisher::getGenericAttributes()
+    {
+        std::lock_guard lock(_device_mutex);
+        return _device; // NOTE(review): the reference outlives the lock taken above
+    }
+
+    void CobraMetricsPublisher::setGenericAttributes(const std::string& attrName,
+                                                     const Json::Value& value)
+    {
+        std::lock_guard lock(_device_mutex);
+        _device[attrName] = value;
+    }
+
+    void CobraMetricsPublisher::enable(bool enabled)
+    {
+        _enabled = enabled;
+    }
+
+    void CobraMetricsPublisher::setBlacklist(const std::vector& blacklist)
+    {
+        _blacklist = blacklist;
+        std::sort(_blacklist.begin(), _blacklist.end());
+
+        // publish our blacklist (kept sorted above so that lookups can use binary_search)
+        Json::Value data;
+        Json::Value metrics;
+        for (auto&& metric : blacklist)
+        {
+            metrics.append(metric);
+        }
+        data["blacklist"] = metrics;
+        push(kSetBlacklistId, data);
+    }
+
+    bool CobraMetricsPublisher::isMetricBlacklisted(const std::string& id) const
+    {
+        return std::binary_search(_blacklist.begin(), _blacklist.end(), id);
+    }
+
+    void CobraMetricsPublisher::setRateControl(
+        const std::unordered_map& rate_control)
+    {
+        for (auto&& it : rate_control)
+        {
+            if (it.second >= 0)
+            {
+                _rate_control[it.first] = it.second;
+            }
+        }
+
+        // publish our rate_control (negative entries were skipped above)
+        Json::Value data;
+        Json::Value metrics;
+        for (auto&& it : _rate_control)
+        {
+            metrics[it.first] = it.second;
+        }
+        data["rate_control"] = metrics;
+        push(kSetRateControlId, data);
+    }
+
+    bool CobraMetricsPublisher::isAboveMaxUpdateRate(const std::string& id) const
+    {
+        // Is this metric rate controlled? If not it can never be above its rate
+        auto rate_control_it = _rate_control.find(id);
+        if (rate_control_it == _rate_control.end()) return false;
+
+        // Was this metric already sent? If not there is nothing to compare against
+        std::lock_guard lock(_last_update_mutex);
+        auto last_update = _last_update.find(id);
+        if (last_update == _last_update.end()) return false;
+
+        auto timeDeltaFromLastSend =
+            std::chrono::steady_clock::now() - last_update->second;
+
+        return timeDeltaFromLastSend < std::chrono::seconds(rate_control_it->second);
+    }
+
+    void CobraMetricsPublisher::setLastUpdate(const std::string& id)
+    {
+        std::lock_guard lock(_last_update_mutex);
+        _last_update[id] = std::chrono::steady_clock::now();
+    }
+
+    uint64_t CobraMetricsPublisher::getMillisecondsSinceEpoch() const
+    {
+        auto now = std::chrono::system_clock::now();
+        auto ms =
+            std::chrono::duration_cast(
+                now.time_since_epoch()).count();
+
+        return ms;
+    }
+
+    void CobraMetricsPublisher::push(const std::string& id,
+                                     const std::string& data,
+                                     bool shouldPushTest)
+    {
+        if (!_enabled) return;
+
+        Json::Value root;
+        Json::Reader reader;
+        if (!reader.parse(data, root)) return;
+
+        push(id, root, shouldPushTest);
+    }
+
+    void CobraMetricsPublisher::push(const std::string& id,
+                                     const CobraMetricsPublisher::Message& data)
+    {
+        if (!_enabled) return;
+
+        Json::Value root;
+        for (const auto& it : data)
+        {
+            root[it.first] = it.second;
+        }
+
+        push(id, root);
+    }
+
+    bool CobraMetricsPublisher::shouldPush(const std::string& id) const
+    {
+        if (!_enabled) return false;
+        if (isMetricBlacklisted(id)) return false;
+        if (isAboveMaxUpdateRate(id)) return false;
+
+        return true;
+    }
+
+    void CobraMetricsPublisher::push(const std::string& id,
+                                     const Json::Value& data,
+                                     bool shouldPushTest)
+    {
+        if (shouldPushTest && !shouldPush(id)) return;
+
+        setLastUpdate(id);
+
+        Json::Value msg;
+        msg["id"] = id;
+        msg["data"] = data;
+        msg["session"] = _session;
+        msg["version"] = kVersion;
+        msg["timestamp"] = getMillisecondsSinceEpoch();
+
+        {
+            std::lock_guard lock(_device_mutex);
+            msg["device"] = _device;
+        }
+
+        // Hand the message over to the threaded publisher, which queues it for its worker
+        _cobra_metrics_theaded_publisher.push(msg);
+    }
+
+    void CobraMetricsPublisher::setPublishMode(CobraConnectionPublishMode publishMode)
+    {
+        _cobra_metrics_theaded_publisher.setPublishMode(publishMode);
+    }
+
+    bool CobraMetricsPublisher::flushQueue()
+    {
+        return _cobra_metrics_theaded_publisher.flushQueue();
+    }
+
+    void CobraMetricsPublisher::suspend()
+    {
+        _cobra_metrics_theaded_publisher.suspend();
+    }
+
+    void CobraMetricsPublisher::resume()
+    {
+        _cobra_metrics_theaded_publisher.resume();
+    }
+
+    bool CobraMetricsPublisher::isConnected() const
+    {
+        return _cobra_metrics_theaded_publisher.isConnected();
+    }
+
+    bool CobraMetricsPublisher::isAuthenticated() const
+    {
+        return _cobra_metrics_theaded_publisher.isAuthenticated();
+    }
+
+} // namespace ix
diff --git a/ws/IXCobraMetricsPublisher.h b/ws/IXCobraMetricsPublisher.h
new file mode 100644
index 00000000..1d2fa369
--- /dev/null
+++ b/ws/IXCobraMetricsPublisher.h
@@ -0,0 +1,165 @@
+/*
+ * IXCobraMetricsPublisher.h
+ * Author: Benjamin Sergeant
+ * Copyright (c) 2017 Machine Zone. All rights reserved.
+ */
+
+#pragma once
+
+#include "IXCobraMetricsThreadedPublisher.h"
+
+#include
+
+#include
+#include
+#include
+
+namespace ix
+{
+    class CobraMetricsPublisher
+    {
+    public:
+        CobraMetricsPublisher();
+        ~CobraMetricsPublisher();
+
+        /// Thread safety notes:
+        ///
+        /// 1. _enabled, _blacklist and _rate_control read/writes are not protected by a mutex
+        /// to make shouldPush as fast as possible. _enabled defaults to false.
+ /// + /// The code that sets those is run only once at init, and + /// the last value to be set is _enabled, which is also the first value checked in shouldPush, + /// so there shouldn't be any race condition. + /// + /// 2. The queue of messages is thread safe, so multiple metrics can be safely pushed on multiple threads + /// + /// 3. Access to _last_update is protected as it needs to be read/write. + /// + + /// Configuration / set keys, etc... + /// All input data but the channel name is encrypted with rc4 (NOTE(review): no encryption is visible in this change - confirm) + void configure(const std::string& appkey, + const std::string& endpoint, + const std::string& channel, + const std::string& rolename, + const std::string& rolesecret, + bool enablePerMessageDeflate); + + /// Setter for the list of blacklisted metrics ids. + /// That list is sorted internally for fast lookups + void setBlacklist(const std::vector& blacklist); + + /// Set the maximum rate at which a metric can be sent. Unit is seconds + /// if rate_control = { 'foo_id': 60 }, + /// the foo_id metric cannot be pushed more than once every 60 seconds + void setRateControl(const std::unordered_map& rate_control); + + /// Configuration / enable/disable + void enable(bool enabled); + + /// Simple interface, list of key value pairs where typeof(key) == typeof(value) == string + typedef std::unordered_map Message; + void push(const std::string& id, + const CobraMetricsPublisher::Message& data = CobraMetricsPublisher::Message()); + + /// Richer interface using json, which supports types (bool, int, float) and hierarchies of elements + /// + /// The shouldPushTest argument should be set to false, and used in combination with the shouldPush method + /// for places where we want to be as lightweight as possible when collecting metrics. When set to false, + /// it is used so that we don't do double work when computing whether a metric should be sent or not. 
+ void push(const std::string& id, + const Json::Value& data, + bool shouldPushTest = true); + + /// Interface used by lua. data is a json encoded string; nothing is pushed if it cannot be parsed. + void push(const std::string& id, + const std::string& data, + bool shouldPushTest = true); + + /// Tells whether a metric can be pushed. + /// A metric can be pushed if it satisfies these conditions: + /// + /// 1. the metrics system should be enabled + /// 2. the metric shouldn't be black-listed + /// 3. the metric shouldn't have reached its rate control limit at this "sampling"/"calling" time + bool shouldPush(const std::string& id) const; + + /// Get generic information json object (a reference to the internal object; the internal mutex is released on return) + Json::Value& getGenericAttributes(); + + /// Set generic information values + void setGenericAttributes(const std::string& attrName, + const Json::Value& value); + + /// Set a unique id for the session. A uuid can be used. + void setSession(const std::string& session) { _session = session; } + + /// Get the unique id used to identify the current session + const std::string& getSession() const { return _session; } + + /// Return the number of milliseconds since the epoch (~1970) + uint64_t getMillisecondsSinceEpoch() const; + + /// Set cobra connection publish mode + void setPublishMode(CobraConnectionPublishMode publishMode); + + /// Flush the publish queue + bool flushQueue(); + + /// Lifecycle management. 
Free resources when backgrounding + void suspend(); + void resume(); + + /// Tells whether the socket connection is opened + bool isConnected() const; + + /// Returns true only if we're authenticated + bool isAuthenticated() const; + + private: + + /// Lookup an id in our metrics to see whether it is blacklisted + /// Complexity is logarithmic + bool isMetricBlacklisted(const std::string& id) const; + + /// Tells whether we should drop a metric or not as part of an enqueuing + /// because it exceeds the max update rate (it is sent too often) + bool isAboveMaxUpdateRate(const std::string& id) const; + + /// Record when a metric was last sent. Used for rate control + void setLastUpdate(const std::string& id); + + /// + /// Member variables + /// + + CobraMetricsThreadedPublisher _cobra_metrics_theaded_publisher; + + /// A boolean to enable or disable this system + /// push becomes a no-op when _enabled is false + bool _enabled; + + /// A uuid used to uniquely identify a session + std::string _session; + + /// The _device json blob is populated once when configuring this system + /// It records generic metadata about the client, run (version, device model, etc...) + Json::Value _device; + mutable std::mutex _device_mutex; // protect access to _device + + /// Metrics control (black list + rate control) + std::vector _blacklist; + std::unordered_map _rate_control; + std::unordered_map> _last_update; + mutable std::mutex _last_update_mutex; // protect access to _last_update + + // const strings for internal ids + static const std::string kSetRateControlId; + static const std::string kSetBlacklistId; + + /// Our protocol version. 
Can be used by subscribers who would want to be backward compatible
+        /// if we change the way we arrange data
+        static const int kVersion;
+    };
+
+} // namespace ix
diff --git a/ws/IXCobraMetricsThreadedPublisher.cpp b/ws/IXCobraMetricsThreadedPublisher.cpp
new file mode 100644
index 00000000..4a400e48
--- /dev/null
+++ b/ws/IXCobraMetricsThreadedPublisher.cpp
@@ -0,0 +1,232 @@
+/*
+ * IXCobraMetricsThreadedPublisher.cpp
+ * Author: Benjamin Sergeant
+ * Copyright (c) 2017 Machine Zone. All rights reserved.
+ */
+
+#include "IXCobraMetricsThreadedPublisher.h"
+#include
+
+#include
+#include
+#include
+#include
+#include
+
+
+namespace ix
+{
+    CobraMetricsThreadedPublisher::CobraMetricsThreadedPublisher() :
+        _stop(false)
+    {
+        _cobra_connection.setEventCallback(
+            []
+            (ix::CobraConnectionEventType eventType,
+             const std::string& errMsg,
+             const ix::WebSocketHttpHeaders& headers,
+             const std::string& subscriptionId)
+            {
+                std::stringstream ss;
+
+                if (eventType == ix::CobraConnection_EventType_Open)
+                {
+                    ss << "Handshake headers" << std::endl;
+
+                    for (const auto& it : headers)
+                    {
+                        ss << it.first << ": " << it.second << std::endl;
+                    }
+                }
+                else if (eventType == ix::CobraConnection_EventType_Authenticated)
+                {
+                    ss << "Authenticated";
+                }
+                else if (eventType == ix::CobraConnection_EventType_Error)
+                {
+                    ss << "Error: " << errMsg;
+                }
+                else if (eventType == ix::CobraConnection_EventType_Closed)
+                {
+                    ss << "Connection closed: " << errMsg;
+                }
+                else if (eventType == ix::CobraConnection_EventType_Subscribed)
+                {
+                    ss << "Subscribed through subscription id: " << subscriptionId;
+                }
+                else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
+                {
+                    ss << "Unsubscribed through subscription id: " << subscriptionId;
+                }
+
+                std::cerr << ss.str() << std::endl;
+            });
+    }
+
+    CobraMetricsThreadedPublisher::~CobraMetricsThreadedPublisher()
+    {
+        // The background thread won't be joinable if it was never
+        // started by calling CobraMetricsThreadedPublisher::start
+        if (!_thread.joinable()) return;
+
+        // Set _stop while holding the queue mutex: the worker tests _stop under that
+        // mutex right before waiting, so an unlocked store could land between its test
+        // and its wait; the notification would then be lost and join() would hang.
+        {
+            std::lock_guard<std::mutex> lock(_queue_mutex);
+            _stop = true;
+        }
+        _condition.notify_one();
+        _thread.join();
+    }
+
+    void CobraMetricsThreadedPublisher::start()
+    {
+        if (_thread.joinable()) return; // we've already been started
+
+        _thread = std::thread(&CobraMetricsThreadedPublisher::run, this);
+    }
+
+    void CobraMetricsThreadedPublisher::configure(const std::string& appkey,
+                                                  const std::string& endpoint,
+                                                  const std::string& channel,
+                                                  const std::string& rolename,
+                                                  const std::string& rolesecret,
+                                                  bool enablePerMessageDeflate)
+    {
+        _channel = channel;
+
+        ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(enablePerMessageDeflate);
+        _cobra_connection.configure(appkey, endpoint,
+                                    rolename, rolesecret,
+                                    webSocketPerMessageDeflateOptions);
+    }
+
+    void CobraMetricsThreadedPublisher::pushMessage(MessageKind messageKind,
+                                                    const Json::Value& msg)
+    {
+        // Now actually enqueue the task
+        {
+            // acquire lock
+            std::unique_lock lock(_queue_mutex);
+
+            // add the task
+            _queue.push(std::make_pair(messageKind, msg));
+        } // release lock
+
+        // wake up one thread
+        _condition.notify_one();
+    }
+
+    void CobraMetricsThreadedPublisher::setPublishMode(CobraConnectionPublishMode publishMode)
+    {
+        _cobra_connection.setPublishMode(publishMode);
+    }
+
+    bool CobraMetricsThreadedPublisher::flushQueue()
+    {
+        return _cobra_connection.flushQueue();
+    }
+
+    void CobraMetricsThreadedPublisher::run()
+    {
+        setThreadName("CobraMetricsPublisher");
+
+        Json::Value channels;
+        channels.append(std::string());
+        channels.append(std::string());
+        const std::string messageIdKey("id");
+
+        _cobra_connection.connect();
+
+        while (true)
+        {
+            Json::Value msg;
+            MessageKind messageKind;
+
+            {
+                std::unique_lock lock(_queue_mutex);
+
+                while (!_stop && _queue.empty())
+                {
+                    _condition.wait(lock);
+                }
+                if (_stop)
+                {
+                    _cobra_connection.disconnect();
+                    return;
+                }
+
+                auto item = _queue.front();
+                _queue.pop();
+
+                messageKind = item.first;
+                msg = item.second;
+            }
+
+            switch (messageKind)
+            {
+                case MessageKind::Suspend:
+                {
+                    _cobra_connection.suspend();
+                    continue;
+                }; break;
+
+                case MessageKind::Resume:
+                {
+                    _cobra_connection.resume();
+                    continue;
+                }; break;
+
+                case MessageKind::Message:
+                {
+                    ;
+                }; break;
+            }
+
+            //
+            // Publish to multiple channels. This lets the consumer side
+            // easily subscribe to all messages of a certain type, without having
+            // to do manipulations on the messages on the server side.
+            //
+            // The per-id channel is reset for every message, so that a message
+            // without an id is not published to the previous message's id channel.
+            //
+            channels[0] = _channel;
+            if (msg.isMember(messageIdKey))
+            {
+                channels[1] = msg[messageIdKey];
+            }
+            else
+            {
+                channels[1] = std::string();
+            }
+            _cobra_connection.publish(channels, msg);
+        }
+    }
+
+    void CobraMetricsThreadedPublisher::push(const Json::Value& msg)
+    {
+        pushMessage(MessageKind::Message, msg);
+    }
+
+    void CobraMetricsThreadedPublisher::suspend()
+    {
+        pushMessage(MessageKind::Suspend, Json::Value());
+    }
+
+    void CobraMetricsThreadedPublisher::resume()
+    {
+        pushMessage(MessageKind::Resume, Json::Value());
+    }
+
+    bool CobraMetricsThreadedPublisher::isConnected() const
+    {
+        return _cobra_connection.isConnected();
+    }
+
+    bool CobraMetricsThreadedPublisher::isAuthenticated() const
+    {
+        return _cobra_connection.isAuthenticated();
+    }
+
+} // namespace ix
diff --git a/ws/IXCobraMetricsThreadedPublisher.h b/ws/IXCobraMetricsThreadedPublisher.h
new file mode 100644
index 00000000..2e119fca
--- /dev/null
+++ b/ws/IXCobraMetricsThreadedPublisher.h
@@ -0,0 +1,108 @@
+/*
+ * IXCobraMetricsThreadedPublisher.h
+ * Author: Benjamin Sergeant
+ * Copyright (c) 2017 Machine Zone. All rights reserved.
+ */
+
+#pragma once
+
+#include "IXCobraConnection.h"
+
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace ix
+{
+    class CobraMetricsThreadedPublisher
+    {
+    public:
+        CobraMetricsThreadedPublisher();
+        ~CobraMetricsThreadedPublisher();
+
+        /// Configuration / set keys, etc...
+ /// All input data but the channel name is encrypted with rc4 (NOTE(review): no encryption is visible in this change - confirm) + void configure(const std::string& appkey, + const std::string& endpoint, + const std::string& channel, + const std::string& rolename, + const std::string& rolesecret, + bool enablePerMessageDeflate); + + /// Start the worker thread, used for background publishing + void start(); + + /// Push a msg to our queue of messages to be published to cobra on the background + /// thread. Main user right now is the Cobra Metrics System + void push(const Json::Value& msg); + + /// Set cobra connection publish mode + void setPublishMode(CobraConnectionPublishMode publishMode); + + /// Flush the publish queue + bool flushQueue(); + + /// Lifecycle management. Free resources when backgrounding + void suspend(); + void resume(); + + /// Tells whether the socket connection is opened + bool isConnected() const; + + /// Returns true only if we're authenticated + bool isAuthenticated() const; + + private: + enum class MessageKind + { + Message = 0, + Suspend = 1, + Resume = 2 + }; + + /// Push a message to be processed by the background thread + void pushMessage(MessageKind messageKind, + const Json::Value& msg); + + /// Get a wait time which is increasing exponentially based on the number of retries (NOTE(review): no definition visible in this change) + uint64_t getWaitTimeExp(int retry_count); + + /// Debugging routine to print the connection parameters to the console (NOTE(review): no definition visible in this change) + void printInfo(); + + /// Publish a message to cobra (NOTE(review): no definition visible in this change) + /// Will retry multiple times (3) if a problem occurs. + /// + /// Right now, only called on the publish worker thread. + void safePublish(const Json::Value& msg); + + /// The worker thread "daemon" method. That method never returns unless _stop is set to true + void run(); + + /// Our connection to cobra. 
+ CobraConnection _cobra_connection; + + /// The channel we are publishing to + std::string _channel; + + /// Internal data structures used to publish to cobra + /// Pending messages are stored into a queue, which is protected by a mutex + /// We use a condition variable to prevent the worker thread from busy polling + /// So we notify the condition variable when an incoming message arrives to signal + /// that it should wake up and take care of publishing it to cobra + /// To shut down the worker thread one has to set the _stop boolean to true and notify the condition variable. + /// This is done in the destructor + std::queue> _queue; + mutable std::mutex _queue_mutex; + std::condition_variable _condition; + std::atomic _stop; + std::thread _thread; + }; + +} // namespace ix diff --git a/ws/ws.cpp b/ws/ws.cpp index a2ca0337..8a8ba8c6 100644 --- a/ws/ws.cpp +++ b/ws/ws.cpp @@ -135,6 +135,16 @@ int main(int argc, char** argv) cobraSubscribeApp->add_option("channel", channel, "Channel")->required(); cobraSubscribeApp->add_option("--pidfile", pidfile, "Pid file"); + CLI::App* cobraPublish = app.add_subcommand("cobra_publish", "Cobra publisher"); + cobraPublish->add_option("--appkey", appkey, "Appkey"); + cobraPublish->add_option("--endpoint", endpoint, "Endpoint"); + cobraPublish->add_option("--rolename", rolename, "Role name"); + cobraPublish->add_option("--rolesecret", rolesecret, "Role secret"); + cobraPublish->add_option("channel", channel, "Channel")->required(); + cobraPublish->add_option("--pidfile", pidfile, "Pid file"); + cobraPublish->add_option("path", path, "Path to the file to send") + ->required()->check(CLI::ExistingPath); + CLI::App* cobra2statsd = app.add_subcommand("cobra_to_statsd", "Cobra to statsd"); cobra2statsd->add_option("--appkey", appkey, "Appkey"); cobra2statsd->add_option("--endpoint", endpoint, "Endpoint"); @@ -228,6 +238,12 @@ int main(int argc, char** argv) rolename, rolesecret, channel); } + else if (app.got_subcommand("cobra_publish")) + { + return 
ix::ws_cobra_publish_main(appkey, endpoint,
+                                             rolename, rolesecret,
+                                             channel, path);
+    }
     else if (app.got_subcommand("cobra_to_statsd"))
     {
         return ix::ws_cobra_to_statsd_main(appkey, endpoint,
diff --git a/ws/ws.h b/ws/ws.h
index e574ef12..050c1d9b 100644
--- a/ws/ws.h
+++ b/ws/ws.h
@@ -59,6 +59,13 @@ namespace ix
                                const std::string& rolesecret,
                                const std::string& channel);
 
+    int ws_cobra_publish_main(const std::string& appkey,
+                              const std::string& endpoint,
+                              const std::string& rolename,
+                              const std::string& rolesecret,
+                              const std::string& channel,
+                              const std::string& path);
+
     int ws_cobra_to_statsd_main(const std::string& appkey,
                                 const std::string& endpoint,
                                 const std::string& rolename,
diff --git a/ws/ws_cobra_publish.cpp b/ws/ws_cobra_publish.cpp
new file mode 100644
index 00000000..3696870a
--- /dev/null
+++ b/ws/ws_cobra_publish.cpp
@@ -0,0 +1,55 @@
+/*
+ * ws_cobra_publish.cpp
+ * Author: Benjamin Sergeant
+ * Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
+ */
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include "IXCobraMetricsPublisher.h"
+
+namespace ix
+{
+    int ws_cobra_publish_main(const std::string& appkey,
+                              const std::string& endpoint,
+                              const std::string& rolename,
+                              const std::string& rolesecret,
+                              const std::string& channel,
+                              const std::string& path)
+    {
+        CobraMetricsPublisher cobraMetricsPublisher;
+        cobraMetricsPublisher.enable(true);
+
+        bool enablePerMessageDeflate = true;
+        cobraMetricsPublisher.configure(appkey, endpoint, channel,
+                                        rolename, rolesecret, enablePerMessageDeflate);
+
+        // Wait for the authentication to complete, sleeping between two polls
+        // instead of spinning a core at 100% while the background thread connects
+        while (!cobraMetricsPublisher.isAuthenticated())
+        {
+            std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        }
+
+        std::ifstream f(path);
+        std::string str((std::istreambuf_iterator(f)),
+                        std::istreambuf_iterator());
+
+        Json::Value data;
+        Json::Reader reader;
+        if (!reader.parse(str, data)) return 1;
+
+        cobraMetricsPublisher.push(std::string("foo_id"), data);
+
+        // Wait a bit for the message to get a chance to be sent
+        // there isn't any ack on publish right now so it's the best we can do
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+        return 0;
+    }
+}