/* * ws_cobra_to_statsd.cpp * Author: Benjamin Sergeant * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. */ #include #include #include #include #include #include #include #include #ifndef _WIN32 #include #endif namespace { class QueueManager { public: QueueManager(size_t maxQueueSize, std::atomic& stop) : _maxQueueSize(maxQueueSize) , _stop(stop) { } Json::Value pop(); void add(Json::Value msg); private: std::queue _queue; std::mutex _mutex; std::condition_variable _condition; size_t _maxQueueSize; std::atomic& _stop; }; Json::Value QueueManager::pop() { std::unique_lock lock(_mutex); if (_queue.empty()) { Json::Value val; return val; } _condition.wait(lock, [this] { return !_stop; }); auto msg = _queue.front(); _queue.pop(); return msg; } void QueueManager::add(Json::Value msg) { std::unique_lock lock(_mutex); // if the sending is not fast enough there is no point // in queuing too many events. if (_queue.size() < _maxQueueSize) { _queue.push(msg); _condition.notify_one(); } } } // namespace namespace ix { // fields are command line argument that can be specified multiple times std::vector parseFields(const std::string& fields) { std::vector tokens; // Split by \n std::string token; std::stringstream tokenStream(fields); while (std::getline(tokenStream, token)) { tokens.push_back(token); } return tokens; } // // Extract an attribute from a Json Value. // extractAttr("foo.bar", {"foo": {"bar": "baz"}}) => baz // std::string extractAttr(const std::string& attr, const Json::Value& jsonValue) { // Split by . std::string token; std::stringstream tokenStream(attr); Json::Value val(jsonValue); while (std::getline(tokenStream, token, '.')) { val = val[token]; } return val.asString(); } int ws_cobra_to_statsd_main(const std::string& appkey, const std::string& endpoint, const std::string& rolename, const std::string& rolesecret, const std::string& channel, const std::string& filter, const std::string& host, int port, const std::string& prefix, const std::string& fields, bool verbose, const ix::SocketTLSOptions& tlsOptions) { ix::CobraConnection conn; conn.configure(appkey, endpoint, rolename, rolesecret, ix::WebSocketPerMessageDeflateOptions(true), tlsOptions); conn.connect(); auto tokens = parseFields(fields); Json::FastWriter jsonWriter; std::atomic sentCount(0); std::atomic receivedCount(0); std::atomic stop(false); size_t maxQueueSize = 1000; QueueManager queueManager(maxQueueSize, stop); auto timer = [&sentCount, &receivedCount] { while (true) { spdlog::info("messages received {} sent {}", receivedCount, sentCount); auto duration = std::chrono::seconds(1); std::this_thread::sleep_for(duration); } }; std::thread t1(timer); auto statsdSender = [&queueManager, &host, &port, &sentCount, &tokens, &prefix, &stop] { // statsd client // test with netcat as a server: `nc -ul 8125` bool statsdBatch = true; #ifndef _WIN32 statsd::StatsdClient statsdClient(host, port, prefix, statsdBatch); #else int statsdClient; #endif while (true) { Json::Value msg = queueManager.pop(); if (msg.isNull()) continue; if (stop) return; std::string id; for (auto&& attr : tokens) { id += "."; id += extractAttr(attr, msg); } sentCount += 1; #ifndef _WIN32 statsdClient.count(id, 1); #endif } }; std::thread t2(statsdSender); conn.setEventCallback( [&conn, &channel, &filter, &jsonWriter, verbose, &queueManager, &receivedCount]( ix::CobraConnectionEventType eventType, const std::string& errMsg, const ix::WebSocketHttpHeaders& headers, const std::string& subscriptionId, CobraConnection::MsgId msgId) { if (eventType == ix::CobraConnection_EventType_Open) { spdlog::info("Subscriber connected"); for (auto it : headers) { spdlog::info("{}: {}", it.first, it.second); } } if (eventType == ix::CobraConnection_EventType_Closed) { spdlog::info("Subscriber closed"); } else if (eventType == ix::CobraConnection_EventType_Authenticated) { spdlog::info("Subscriber authenticated"); conn.subscribe(channel, filter, [&jsonWriter, &queueManager, verbose, &receivedCount]( const Json::Value& msg) { if (verbose) { spdlog::info(jsonWriter.write(msg)); } receivedCount++; ++receivedCount; queueManager.add(msg); }); } else if (eventType == ix::CobraConnection_EventType_Subscribed) { spdlog::info("Subscriber: subscribed to channel {}", subscriptionId); } else if (eventType == ix::CobraConnection_EventType_UnSubscribed) { spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId); } else if (eventType == ix::CobraConnection_EventType_Error) { spdlog::error("Subscriber: error {}", errMsg); } else if (eventType == ix::CobraConnection_EventType_Published) { spdlog::error("Published message hacked: {}", msgId); } else if (eventType == ix::CobraConnection_EventType_Pong) { spdlog::info("Received websocket pong"); } }); while (true) { std::chrono::duration duration(1000); std::this_thread::sleep_for(duration); } return 0; } } // namespace ix