diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 106c6a57..5fffdbb6 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,6 +1,10 @@ # Changelog All changes to this project will be documented in this file. +## [7.8.6] - 2019-12-28 + +(ws cobra to sentry/statsd) fix for handling null events properly for empty queues + use queue to send data to statsd + ## [7.8.5] - 2019-12-28 (ws cobra to sentry) handle null events for empty queues diff --git a/ixwebsocket/IXWebSocketVersion.h b/ixwebsocket/IXWebSocketVersion.h index f235fe54..c15e324d 100644 --- a/ixwebsocket/IXWebSocketVersion.h +++ b/ixwebsocket/IXWebSocketVersion.h @@ -6,4 +6,4 @@ #pragma once -#define IX_WEBSOCKET_VERSION "7.8.5" +#define IX_WEBSOCKET_VERSION "7.8.6" diff --git a/ws/ws_cobra_to_sentry.cpp b/ws/ws_cobra_to_sentry.cpp index b1ba4436..2d872761 100644 --- a/ws/ws_cobra_to_sentry.cpp +++ b/ws/ws_cobra_to_sentry.cpp @@ -22,7 +22,10 @@ namespace ix class QueueManager { public: - QueueManager(size_t maxQueueSize, std::atomic &stop) : _maxQueueSize(maxQueueSize), _stop(stop) {} + QueueManager(size_t maxQueueSize, + std::atomic &stop) : + _maxQueueSize(maxQueueSize), + _stop(stop) {} Json::Value pop(); void add(Json::Value msg); @@ -146,11 +149,7 @@ namespace ix { Json::Value msg = queueManager.pop(); - while (msg.isNull()) - { - msg = queueManager.pop(); - if (stop) return; - } + if (msg.isNull()) continue; if (stop) return; auto ret = sentryClient.send(msg, verbose); diff --git a/ws/ws_cobra_to_statsd.cpp b/ws/ws_cobra_to_statsd.cpp index c5797d66..c2b9b251 100644 --- a/ws/ws_cobra_to_statsd.cpp +++ b/ws/ws_cobra_to_statsd.cpp @@ -6,6 +6,7 @@ #include #include +#include #include #include #include @@ -16,6 +17,58 @@ #include #endif +namespace +{ + class QueueManager + { + public: + QueueManager(size_t maxQueueSize, + std::atomic &stop) : + _maxQueueSize(maxQueueSize), + _stop(stop) {} + + Json::Value pop(); + void add(Json::Value msg); + + private: + std::queue _queue; + std::mutex _mutex; + std::condition_variable _condition; + size_t _maxQueueSize; + std::atomic& _stop; + }; + + Json::Value QueueManager::pop() + { + std::unique_lock lock(_mutex); + + if (_queue.empty()) + { + Json::Value val; + return val; + } + + _condition.wait(lock, [this] { return !_stop; }); + + auto msg = _queue.front(); + _queue.pop(); + return msg; + } + + void QueueManager::add(Json::Value msg) + { + std::unique_lock lock(_mutex); + + // if the sending is not fast enough there is no point + // in queuing too many events. + if (_queue.size() < _maxQueueSize) + { + _queue.push(msg); + _condition.notify_one(); + } + } +} + namespace ix { // fields are command line argument that can be specified multiple times @@ -79,27 +132,72 @@ namespace ix auto tokens = parseFields(fields); - // statsd client - // test with netcat as a server: `nc -ul 8125` - bool statsdBatch = true; -#ifndef _WIN32 - statsd::StatsdClient statsdClient(host, port, prefix, statsdBatch); -#else - int statsdClient; -#endif - Json::FastWriter jsonWriter; - uint64_t msgCount = 0; + std::atomic sentCount(0); + std::atomic receivedCount(0); + std::atomic stop(false); + + size_t maxQueueSize = 1000; + QueueManager queueManager(maxQueueSize, stop); + + auto timer = [&sentCount, &receivedCount] { + while (true) + { + spdlog::info("messages received {} sent {}", receivedCount, sentCount); + + auto duration = std::chrono::seconds(1); + std::this_thread::sleep_for(duration); + } + }; + + std::thread t1(timer); + + auto statsdSender = [&queueManager, + &host, + &port, + &sentCount, + &tokens, + &prefix, + &stop] { + // statsd client + // test with netcat as a server: `nc -ul 8125` + bool statsdBatch = true; +#ifndef _WIN32 + statsd::StatsdClient statsdClient(host, port, prefix, statsdBatch); +#else + int statsdClient; +#endif + while (true) + { + Json::Value msg = queueManager.pop(); + + if (msg.isNull()) continue; + if (stop) return; + + std::string id; + for (auto&& attr : tokens) + { + id += "."; + id += extractAttr(attr, msg); + } + + sentCount += 1; + +#ifndef _WIN32 + statsdClient.count(id, 1); +#endif + } + }; + + std::thread t2(statsdSender); conn.setEventCallback([&conn, &channel, &filter, &jsonWriter, - &statsdClient, verbose, - &tokens, - &prefix, - &msgCount](ix::CobraConnectionEventType eventType, + &queueManager, + &receivedCount](ix::CobraConnectionEventType eventType, const std::string& errMsg, const ix::WebSocketHttpHeaders& headers, const std::string& subscriptionId, @@ -122,25 +220,17 @@ namespace ix spdlog::info("Subscriber authenticated"); conn.subscribe(channel, filter, - [&jsonWriter, &statsdClient, verbose, &tokens, &prefix, &msgCount]( + [&jsonWriter, &queueManager, verbose, &receivedCount]( const Json::Value& msg) { if (verbose) { spdlog::info(jsonWriter.write(msg)); } - std::string id; - for (auto&& attr : tokens) - { - id += "."; - id += extractAttr(attr, msg); - } + receivedCount++; - spdlog::info("{} {}{}", msgCount++, prefix, id); - -#ifndef _WIN32 - statsdClient.count(id, 1); -#endif + ++receivedCount; + queueManager.add(msg); }); } else if (eventType == ix::CobraConnection_EventType_Subscribed)