(ws cobra to sentry/statsd) fix for handling null events properly for empty queues + use queue to send data to statsd
This commit is contained in:
parent
f4af84dc06
commit
299dc0452e
@ -1,6 +1,10 @@
|
|||||||
# Changelog
|
# Changelog
|
||||||
All changes to this project will be documented in this file.
|
All changes to this project will be documented in this file.
|
||||||
|
|
||||||
|
## [7.8.6] - 2019-12-28
|
||||||
|
|
||||||
|
(ws cobra to sentry/statsd) fix for handling null events properly for empty queues + use queue to send data to statsd
|
||||||
|
|
||||||
## [7.8.5] - 2019-12-28
|
## [7.8.5] - 2019-12-28
|
||||||
|
|
||||||
(ws cobra to sentry) handle null events for empty queues
|
(ws cobra to sentry) handle null events for empty queues
|
||||||
|
@ -6,4 +6,4 @@
|
|||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#define IX_WEBSOCKET_VERSION "7.8.5"
|
#define IX_WEBSOCKET_VERSION "7.8.6"
|
||||||
|
@ -22,7 +22,10 @@ namespace ix
|
|||||||
class QueueManager
|
class QueueManager
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
QueueManager(size_t maxQueueSize, std::atomic<bool> &stop) : _maxQueueSize(maxQueueSize), _stop(stop) {}
|
QueueManager(size_t maxQueueSize,
|
||||||
|
std::atomic<bool> &stop) :
|
||||||
|
_maxQueueSize(maxQueueSize),
|
||||||
|
_stop(stop) {}
|
||||||
|
|
||||||
Json::Value pop();
|
Json::Value pop();
|
||||||
void add(Json::Value msg);
|
void add(Json::Value msg);
|
||||||
@ -146,11 +149,7 @@ namespace ix
|
|||||||
{
|
{
|
||||||
Json::Value msg = queueManager.pop();
|
Json::Value msg = queueManager.pop();
|
||||||
|
|
||||||
while (msg.isNull())
|
if (msg.isNull()) continue;
|
||||||
{
|
|
||||||
msg = queueManager.pop();
|
|
||||||
if (stop) return;
|
|
||||||
}
|
|
||||||
if (stop) return;
|
if (stop) return;
|
||||||
|
|
||||||
auto ret = sentryClient.send(msg, verbose);
|
auto ret = sentryClient.send(msg, verbose);
|
||||||
|
@ -6,6 +6,7 @@
|
|||||||
|
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
|
#include <condition_variable>
|
||||||
#include <ixcobra/IXCobraConnection.h>
|
#include <ixcobra/IXCobraConnection.h>
|
||||||
#include <spdlog/spdlog.h>
|
#include <spdlog/spdlog.h>
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
@ -16,6 +17,58 @@
|
|||||||
#include <statsd_client.h>
|
#include <statsd_client.h>
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
namespace
|
||||||
|
{
|
||||||
|
class QueueManager
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
QueueManager(size_t maxQueueSize,
|
||||||
|
std::atomic<bool> &stop) :
|
||||||
|
_maxQueueSize(maxQueueSize),
|
||||||
|
_stop(stop) {}
|
||||||
|
|
||||||
|
Json::Value pop();
|
||||||
|
void add(Json::Value msg);
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::queue<Json::Value> _queue;
|
||||||
|
std::mutex _mutex;
|
||||||
|
std::condition_variable _condition;
|
||||||
|
size_t _maxQueueSize;
|
||||||
|
std::atomic<bool>& _stop;
|
||||||
|
};
|
||||||
|
|
||||||
|
Json::Value QueueManager::pop()
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lock(_mutex);
|
||||||
|
|
||||||
|
if (_queue.empty())
|
||||||
|
{
|
||||||
|
Json::Value val;
|
||||||
|
return val;
|
||||||
|
}
|
||||||
|
|
||||||
|
_condition.wait(lock, [this] { return !_stop; });
|
||||||
|
|
||||||
|
auto msg = _queue.front();
|
||||||
|
_queue.pop();
|
||||||
|
return msg;
|
||||||
|
}
|
||||||
|
|
||||||
|
void QueueManager::add(Json::Value msg)
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lock(_mutex);
|
||||||
|
|
||||||
|
// if the sending is not fast enough there is no point
|
||||||
|
// in queuing too many events.
|
||||||
|
if (_queue.size() < _maxQueueSize)
|
||||||
|
{
|
||||||
|
_queue.push(msg);
|
||||||
|
_condition.notify_one();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
namespace ix
|
namespace ix
|
||||||
{
|
{
|
||||||
// fields are command line argument that can be specified multiple times
|
// fields are command line argument that can be specified multiple times
|
||||||
@ -79,27 +132,72 @@ namespace ix
|
|||||||
|
|
||||||
auto tokens = parseFields(fields);
|
auto tokens = parseFields(fields);
|
||||||
|
|
||||||
// statsd client
|
|
||||||
// test with netcat as a server: `nc -ul 8125`
|
|
||||||
bool statsdBatch = true;
|
|
||||||
#ifndef _WIN32
|
|
||||||
statsd::StatsdClient statsdClient(host, port, prefix, statsdBatch);
|
|
||||||
#else
|
|
||||||
int statsdClient;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
Json::FastWriter jsonWriter;
|
Json::FastWriter jsonWriter;
|
||||||
uint64_t msgCount = 0;
|
std::atomic<uint64_t> sentCount(0);
|
||||||
|
std::atomic<uint64_t> receivedCount(0);
|
||||||
|
std::atomic<bool> stop(false);
|
||||||
|
|
||||||
|
size_t maxQueueSize = 1000;
|
||||||
|
QueueManager queueManager(maxQueueSize, stop);
|
||||||
|
|
||||||
|
auto timer = [&sentCount, &receivedCount] {
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
spdlog::info("messages received {} sent {}", receivedCount, sentCount);
|
||||||
|
|
||||||
|
auto duration = std::chrono::seconds(1);
|
||||||
|
std::this_thread::sleep_for(duration);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
std::thread t1(timer);
|
||||||
|
|
||||||
|
auto statsdSender = [&queueManager,
|
||||||
|
&host,
|
||||||
|
&port,
|
||||||
|
&sentCount,
|
||||||
|
&tokens,
|
||||||
|
&prefix,
|
||||||
|
&stop] {
|
||||||
|
// statsd client
|
||||||
|
// test with netcat as a server: `nc -ul 8125`
|
||||||
|
bool statsdBatch = true;
|
||||||
|
#ifndef _WIN32
|
||||||
|
statsd::StatsdClient statsdClient(host, port, prefix, statsdBatch);
|
||||||
|
#else
|
||||||
|
int statsdClient;
|
||||||
|
#endif
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
Json::Value msg = queueManager.pop();
|
||||||
|
|
||||||
|
if (msg.isNull()) continue;
|
||||||
|
if (stop) return;
|
||||||
|
|
||||||
|
std::string id;
|
||||||
|
for (auto&& attr : tokens)
|
||||||
|
{
|
||||||
|
id += ".";
|
||||||
|
id += extractAttr(attr, msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
sentCount += 1;
|
||||||
|
|
||||||
|
#ifndef _WIN32
|
||||||
|
statsdClient.count(id, 1);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
std::thread t2(statsdSender);
|
||||||
|
|
||||||
conn.setEventCallback([&conn,
|
conn.setEventCallback([&conn,
|
||||||
&channel,
|
&channel,
|
||||||
&filter,
|
&filter,
|
||||||
&jsonWriter,
|
&jsonWriter,
|
||||||
&statsdClient,
|
|
||||||
verbose,
|
verbose,
|
||||||
&tokens,
|
&queueManager,
|
||||||
&prefix,
|
&receivedCount](ix::CobraConnectionEventType eventType,
|
||||||
&msgCount](ix::CobraConnectionEventType eventType,
|
|
||||||
const std::string& errMsg,
|
const std::string& errMsg,
|
||||||
const ix::WebSocketHttpHeaders& headers,
|
const ix::WebSocketHttpHeaders& headers,
|
||||||
const std::string& subscriptionId,
|
const std::string& subscriptionId,
|
||||||
@ -122,25 +220,17 @@ namespace ix
|
|||||||
spdlog::info("Subscriber authenticated");
|
spdlog::info("Subscriber authenticated");
|
||||||
conn.subscribe(channel,
|
conn.subscribe(channel,
|
||||||
filter,
|
filter,
|
||||||
[&jsonWriter, &statsdClient, verbose, &tokens, &prefix, &msgCount](
|
[&jsonWriter, &queueManager, verbose, &receivedCount](
|
||||||
const Json::Value& msg) {
|
const Json::Value& msg) {
|
||||||
if (verbose)
|
if (verbose)
|
||||||
{
|
{
|
||||||
spdlog::info(jsonWriter.write(msg));
|
spdlog::info(jsonWriter.write(msg));
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string id;
|
receivedCount++;
|
||||||
for (auto&& attr : tokens)
|
|
||||||
{
|
|
||||||
id += ".";
|
|
||||||
id += extractAttr(attr, msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
spdlog::info("{} {}{}", msgCount++, prefix, id);
|
++receivedCount;
|
||||||
|
queueManager.add(msg);
|
||||||
#ifndef _WIN32
|
|
||||||
statsdClient.count(id, 1);
|
|
||||||
#endif
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
else if (eventType == ix::CobraConnection_EventType_Subscribed)
|
else if (eventType == ix::CobraConnection_EventType_Subscribed)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user