From c317100b47847f7ba512c7fbaa3222860bb91fed Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Thu, 4 Jun 2020 09:35:55 -0700 Subject: [PATCH] (statsd cobra bots) statsd improvement: prefix does not need a dot as a suffix, message size can be larger than 256 bytes, error handling was invalid, use core logger for logging instead of std::cerr --- docs/CHANGELOG.md | 4 ++++ ixbots/ixbots/IXCobraToStatsdBot.cpp | 8 +++++++- ixbots/ixbots/IXStatsdClient.cpp | 21 +++++++++++++-------- ixwebsocket/IXWebSocketVersion.h | 2 +- 4 files changed, 25 insertions(+), 10 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index eee2f3d3..86a0b6f0 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,6 +1,10 @@ # Changelog All changes to this project will be documented in this file. +## [9.6.6] - 2020-06-04 + +(statsd cobra bots) statsd improvement: prefix does not need a dot as a suffix, message size can be larger than 256 bytes, error handling was invalid, use core logger for logging instead of std::cerr + ## [9.6.5] - 2020-05-29 (http server) support gzip compression diff --git a/ixbots/ixbots/IXCobraToStatsdBot.cpp b/ixbots/ixbots/IXCobraToStatsdBot.cpp index 138f2566..1cc6793b 100644 --- a/ixbots/ixbots/IXCobraToStatsdBot.cpp +++ b/ixbots/ixbots/IXCobraToStatsdBot.cpp @@ -70,11 +70,17 @@ namespace ix std::atomic& fatalCobraError, std::atomic& sentCount) -> void { std::string id; + int idx = 0; for (auto&& attr : tokens) { - id += "."; auto val = extractAttr(attr, msg); id += val.asString(); + + // We add a dot separator unless we are processing the last token + if (idx++ != tokens.size() - 1) + { + id += "."; + } } if (gauge.empty() && timer.empty()) diff --git a/ixbots/ixbots/IXStatsdClient.cpp b/ixbots/ixbots/IXStatsdClient.cpp index 762624fb..bc98951e 100644 --- a/ixbots/ixbots/IXStatsdClient.cpp +++ b/ixbots/ixbots/IXStatsdClient.cpp @@ -39,9 +39,10 @@ #include "IXStatsdClient.h" -#include #include -#include +#include +#include +#include #include #include @@ -54,6 +55,8 @@ namespace ix , _stop(false) { _thread = std::thread([this] { + setThreadName("Statsd"); + while (!_stop) { flushQueue(); @@ -115,11 +118,10 @@ namespace ix { cleanup(key); - char buf[256]; - snprintf( - buf, sizeof(buf), "%s%s:%zd|%s\n", _prefix.c_str(), key.c_str(), value, type.c_str()); + std::stringstream ss; + ss << _prefix << "." << key << ":" << value << "|" << type << "\n"; - enqueue(buf); + enqueue(ss.str()); return 0; } @@ -137,10 +139,13 @@ namespace ix { auto message = _queue.front(); auto ret = _socket.sendto(message); - if (ret != 0) + if (ret == -1) { - std::cerr << "error: " << strerror(UdpSocket::getErrno()) << std::endl; + CoreLogger::error(std::string("statsd error: ") + strerror(UdpSocket::getErrno())); } + + // we always dequeue regardless of the ability to send the message + // so that we keep our queue size under control _queue.pop_front(); } } diff --git a/ixwebsocket/IXWebSocketVersion.h b/ixwebsocket/IXWebSocketVersion.h index aca86d7e..2f99b622 100644 --- a/ixwebsocket/IXWebSocketVersion.h +++ b/ixwebsocket/IXWebSocketVersion.h @@ -6,4 +6,4 @@ #pragma once -#define IX_WEBSOCKET_VERSION "9.6.5" +#define IX_WEBSOCKET_VERSION "9.6.6"