(ws cobra to sentry/statsd) fix for handling null events properly for empty queues + use queue to send data to statsd

This commit is contained in:
Benjamin Sergeant 2019-12-28 17:28:05 -08:00
parent d89d152ad7
commit 8b5e42fe84
4 changed files with 126 additions and 33 deletions

View File

@ -1,6 +1,10 @@
# Changelog
All changes to this project will be documented in this file.
## [7.8.6] - 2019-12-28
(ws cobra to sentry/statsd) fix for handling null events properly for empty queues + use queue to send data to statsd
## [7.8.5] - 2019-12-28
(ws cobra to sentry) handle null events for empty queues

View File

@ -6,4 +6,4 @@
#pragma once
#define IX_WEBSOCKET_VERSION "7.8.5"
#define IX_WEBSOCKET_VERSION "7.8.6"

View File

@ -22,7 +22,10 @@ namespace ix
class QueueManager
{
public:
QueueManager(size_t maxQueueSize, std::atomic<bool> &stop) : _maxQueueSize(maxQueueSize), _stop(stop) {}
QueueManager(size_t maxQueueSize,
std::atomic<bool> &stop) :
_maxQueueSize(maxQueueSize),
_stop(stop) {}
Json::Value pop();
void add(Json::Value msg);
@ -146,11 +149,7 @@ namespace ix
{
Json::Value msg = queueManager.pop();
while (msg.isNull())
{
msg = queueManager.pop();
if (stop) return;
}
if (msg.isNull()) continue;
if (stop) return;
auto ret = sentryClient.send(msg, verbose);

View File

@ -6,6 +6,7 @@
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <ixcobra/IXCobraConnection.h>
#include <spdlog/spdlog.h>
#include <sstream>
@ -16,6 +17,58 @@
#include <statsd_client.h>
#endif
namespace
{
class QueueManager
{
public:
QueueManager(size_t maxQueueSize,
std::atomic<bool> &stop) :
_maxQueueSize(maxQueueSize),
_stop(stop) {}
Json::Value pop();
void add(Json::Value msg);
private:
std::queue<Json::Value> _queue;
std::mutex _mutex;
std::condition_variable _condition;
size_t _maxQueueSize;
std::atomic<bool>& _stop;
};
Json::Value QueueManager::pop()
{
std::unique_lock<std::mutex> lock(_mutex);
if (_queue.empty())
{
Json::Value val;
return val;
}
_condition.wait(lock, [this] { return !_stop; });
auto msg = _queue.front();
_queue.pop();
return msg;
}
void QueueManager::add(Json::Value msg)
{
std::unique_lock<std::mutex> lock(_mutex);
// if the sending is not fast enough there is no point
// in queuing too many events.
if (_queue.size() < _maxQueueSize)
{
_queue.push(msg);
_condition.notify_one();
}
}
}
namespace ix
{
// fields are command line argument that can be specified multiple times
@ -79,27 +132,72 @@ namespace ix
auto tokens = parseFields(fields);
// statsd client
// test with netcat as a server: `nc -ul 8125`
bool statsdBatch = true;
#ifndef _WIN32
statsd::StatsdClient statsdClient(host, port, prefix, statsdBatch);
#else
int statsdClient;
#endif
Json::FastWriter jsonWriter;
uint64_t msgCount = 0;
std::atomic<uint64_t> sentCount(0);
std::atomic<uint64_t> receivedCount(0);
std::atomic<bool> stop(false);
size_t maxQueueSize = 1000;
QueueManager queueManager(maxQueueSize, stop);
auto timer = [&sentCount, &receivedCount] {
while (true)
{
spdlog::info("messages received {} sent {}", receivedCount, sentCount);
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
};
std::thread t1(timer);
auto statsdSender = [&queueManager,
&host,
&port,
&sentCount,
&tokens,
&prefix,
&stop] {
// statsd client
// test with netcat as a server: `nc -ul 8125`
bool statsdBatch = true;
#ifndef _WIN32
statsd::StatsdClient statsdClient(host, port, prefix, statsdBatch);
#else
int statsdClient;
#endif
while (true)
{
Json::Value msg = queueManager.pop();
if (msg.isNull()) continue;
if (stop) return;
std::string id;
for (auto&& attr : tokens)
{
id += ".";
id += extractAttr(attr, msg);
}
sentCount += 1;
#ifndef _WIN32
statsdClient.count(id, 1);
#endif
}
};
std::thread t2(statsdSender);
conn.setEventCallback([&conn,
&channel,
&filter,
&jsonWriter,
&statsdClient,
verbose,
&tokens,
&prefix,
&msgCount](ix::CobraConnectionEventType eventType,
&queueManager,
&receivedCount](ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
@ -122,25 +220,17 @@ namespace ix
spdlog::info("Subscriber authenticated");
conn.subscribe(channel,
filter,
[&jsonWriter, &statsdClient, verbose, &tokens, &prefix, &msgCount](
[&jsonWriter, &queueManager, verbose, &receivedCount](
const Json::Value& msg) {
if (verbose)
{
spdlog::info(jsonWriter.write(msg));
}
std::string id;
for (auto&& attr : tokens)
{
id += ".";
id += extractAttr(attr, msg);
}
receivedCount++;
spdlog::info("{} {}{}", msgCount++, prefix, id);
#ifndef _WIN32
statsdClient.count(id, 1);
#endif
++receivedCount;
queueManager.add(msg);
});
}
else if (eventType == ix::CobraConnection_EventType_Subscribed)