cobra to statsd bot ported to windows + add unittest

This commit is contained in:
Benjamin Sergeant 2020-03-22 19:36:29 -07:00
parent 5ad54a8904
commit a0ffb2ba53
27 changed files with 653 additions and 689 deletions

View File

@ -36,6 +36,7 @@ set( IXWEBSOCKET_SOURCES
ixwebsocket/IXSocketFactory.cpp
ixwebsocket/IXSocketServer.cpp
ixwebsocket/IXSocketTLSOptions.cpp
ixwebsocket/IXUdpSocket.cpp
ixwebsocket/IXUrlParser.cpp
ixwebsocket/IXUserAgent.cpp
ixwebsocket/IXWebSocket.cpp
@ -69,6 +70,7 @@ set( IXWEBSOCKET_HEADERS
ixwebsocket/IXSocketFactory.h
ixwebsocket/IXSocketServer.h
ixwebsocket/IXSocketTLSOptions.h
ixwebsocket/IXUdpSocket.h
ixwebsocket/IXUrlParser.h
ixwebsocket/IXUtf8Validator.h
ixwebsocket/IXUserAgent.h

View File

@ -1,6 +1,10 @@
# Changelog
All changes to this project will be documented in this file.
## [8.3.3] - 2020-03-22
(cobra to statsd) port to windows and add a unittest
## [8.3.2] - 2020-03-20
(websocket+tls) fix hang in tls handshake which could lead to ANR, discovered through unittesting.

View File

@ -7,12 +7,14 @@ set (IXBOTS_SOURCES
ixbots/IXCobraToSentryBot.cpp
ixbots/IXCobraToStatsdBot.cpp
ixbots/IXQueueManager.cpp
ixbots/IXStatsdClient.cpp
)
set (IXBOTS_HEADERS
ixbots/IXCobraToSentryBot.h
ixbots/IXCobraToStatsdBot.h
ixbots/IXQueueManager.h
ixbots/IXStatsdClient.h
)
add_library(ixbots STATIC
@ -30,8 +32,6 @@ if (NOT SPDLOG_FOUND)
set(SPDLOG_INCLUDE_DIRS ../third_party/spdlog/include)
endif()
set(STATSD_CLIENT_INCLUDE_DIRS ../third_party/statsd-client-cpp/src)
set(IXBOTS_INCLUDE_DIRS
.
..
@ -39,7 +39,6 @@ set(IXBOTS_INCLUDE_DIRS
../ixcobra
../ixsentry
${JSONCPP_INCLUDE_DIRS}
${SPDLOG_INCLUDE_DIRS}
${STATSD_CLIENT_INCLUDE_DIRS})
${SPDLOG_INCLUDE_DIRS})
target_include_directories( ixbots PUBLIC ${IXBOTS_INCLUDE_DIRS} )

View File

@ -1,7 +1,7 @@
/*
* IXCobraToSentryBot.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
* Copyright (c) 2019-2020 Machine Zone, Inc. All rights reserved.
*/
#pragma once

View File

@ -6,6 +6,7 @@
#include "IXCobraToStatsdBot.h"
#include "IXQueueManager.h"
#include "IXStatsdClient.h"
#include <atomic>
#include <chrono>
@ -16,10 +17,6 @@
#include <thread>
#include <vector>
#ifndef _WIN32
#include <statsd_client.h>
#endif
namespace ix
{
// fields are command line argument that can be specified multiple times
@ -63,11 +60,12 @@ namespace ix
const std::string& channel,
const std::string& filter,
const std::string& position,
const std::string& host,
int port,
const std::string& prefix,
StatsdClient& statsdClient,
const std::string& fields,
bool verbose)
bool verbose,
size_t maxQueueSize,
bool enableHeartbeat,
int runtime)
{
ix::CobraConnection conn;
conn.configure(config);
@ -80,11 +78,10 @@ namespace ix
std::atomic<uint64_t> receivedCount(0);
std::atomic<bool> stop(false);
size_t maxQueueSize = 1000;
QueueManager queueManager(maxQueueSize);
auto timer = [&sentCount, &receivedCount] {
while (true)
auto timer = [&sentCount, &receivedCount, &stop] {
while (!stop)
{
spdlog::info("messages received {} sent {}", receivedCount, sentCount);
@ -95,9 +92,11 @@ namespace ix
std::thread t1(timer);
auto heartbeat = [&sentCount, &receivedCount] {
auto heartbeat = [&sentCount, &receivedCount, &enableHeartbeat] {
std::string state("na");
if (!enableHeartbeat) return;
while (true)
{
std::stringstream ss;
@ -120,21 +119,13 @@ namespace ix
std::thread t2(heartbeat);
auto statsdSender = [&queueManager, &host, &port, &sentCount, &tokens, &prefix, &stop] {
// statsd client
// test with netcat as a server: `nc -ul 8125`
bool statsdBatch = true;
#ifndef _WIN32
statsd::StatsdClient statsdClient(host, port, prefix, statsdBatch);
#else
int statsdClient;
#endif
auto statsdSender = [&statsdClient, &queueManager, &sentCount, &tokens, &stop] {
while (true)
{
Json::Value msg = queueManager.pop();
if (msg.isNull()) continue;
if (stop) return;
if (msg.isNull()) continue;
std::string id;
for (auto&& attr : tokens)
@ -143,11 +134,8 @@ namespace ix
id += extractAttr(attr, msg);
}
sentCount += 1;
#ifndef _WIN32
statsdClient.count(id, 1);
#endif
sentCount += 1;
}
};
@ -214,12 +202,38 @@ namespace ix
}
});
while (true)
// Run forever
if (runtime == -1)
{
std::chrono::duration<double, std::milli> duration(1000);
std::this_thread::sleep_for(duration);
while (true)
{
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
}
// Run for a duration, used by unittesting now
else
{
for (int i = 0 ; i < runtime; ++i)
{
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
}
return 0;
//
// Cleanup.
// join all the bg threads and stop them.
//
conn.disconnect();
stop = true;
t1.join();
if (t2.joinable()) t2.join();
spdlog::info("heartbeat thread done");
t3.join();
return (int) sentCount;
}
} // namespace ix

View File

@ -1,11 +1,12 @@
/*
* IXCobraToStatsdBot.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
* Copyright (c) 2019-2020 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <ixcobra/IXCobraConfig.h>
#include <ixbots/IXStatsdClient.h>
#include <string>
#include <stddef.h>
@ -15,9 +16,10 @@ namespace ix
const std::string& channel,
const std::string& filter,
const std::string& position,
const std::string& host,
int port,
const std::string& prefix,
StatsdClient& statsdClient,
const std::string& fields,
bool verbose);
bool verbose,
size_t maxQueueSize,
bool enableHeartbeat,
int runtime);
} // namespace ix

View File

@ -0,0 +1,154 @@
/*
* Copyright (c) 2014, Rex
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* * Neither the name of the {organization} nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
/*
* IXStatsdClient.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
// Adapted from statsd-client-cpp
// test with netcat as a server: `nc -ul 8125`
#include "IXStatsdClient.h"
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
namespace ix
{
StatsdClient::StatsdClient(const string& host,
int port,
const string& prefix)
: _host(host)
, _port(port)
, _prefix(prefix)
, _stop(false)
{
_thread = std::thread([this] {
while (!_stop)
{
std::deque<std::string> staged_message_queue;
{
std::lock_guard<std::mutex> lock(_mutex);
batching_message_queue_.swap(staged_message_queue);
}
while (!staged_message_queue.empty())
{
auto message = staged_message_queue.front();
_socket.sendto(message);
staged_message_queue.pop_front();
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
});
}
StatsdClient::~StatsdClient()
{
_stop = true;
if (_thread.joinable()) _thread.join();
_socket.close();
}
bool StatsdClient::init(std::string& errMsg)
{
return _socket.init(_host, _port, errMsg);
}
/* will change the original string */
void StatsdClient::cleanup(string& key)
{
size_t pos = key.find_first_of(":|@");
while (pos != string::npos)
{
key[pos] = '_';
pos = key.find_first_of(":|@");
}
}
int StatsdClient::dec(const string& key)
{
return count(key, -1);
}
int StatsdClient::inc(const string& key)
{
return count(key, 1);
}
int StatsdClient::count(const string& key, size_t value)
{
return send(key, value, "c");
}
int StatsdClient::gauge(const string& key, size_t value)
{
return send(key, value, "g");
}
int StatsdClient::timing(const string& key, size_t ms)
{
return send(key, ms, "ms");
}
int StatsdClient::send(string key, size_t value, const string &type)
{
cleanup(key);
char buf[256];
snprintf(buf, sizeof(buf), "%s%s:%zd|%s",
_prefix.c_str(), key.c_str(), value, type.c_str());
return send(buf);
}
int StatsdClient::send(const string &message)
{
std::lock_guard<std::mutex> lock(_mutex);
if (batching_message_queue_.empty() ||
batching_message_queue_.back().length() > max_batching_size)
{
batching_message_queue_.push_back(message);
}
else
{
(*batching_message_queue_.rbegin()).append("\n").append(message);
}
return 0;
}
} // end namespace ix

View File

@ -0,0 +1,62 @@
/*
* IXStatsdClient.h
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <ixwebsocket/IXNetSystem.h>
#include <ixwebsocket/IXUdpSocket.h>
#include <string>
#include <thread>
#include <deque>
#include <mutex>
#include <atomic>
namespace ix {
class StatsdClient {
public:
StatsdClient(const std::string& host="127.0.0.1",
int port=8125,
const std::string& prefix = "");
~StatsdClient();
bool init(std::string& errMsg);
int inc(const std::string& key);
int dec(const std::string& key);
int count(const std::string& key, size_t value);
int gauge(const std::string& key, size_t value);
int timing(const std::string& key, size_t ms);
private:
/**
* (Low Level Api) manually send a message
* which might be composed of several lines.
*/
int send(const std::string& message);
/* (Low Level Api) manually send a message
* type = "c", "g" or "ms"
*/
int send(std::string key, size_t value, const std::string& type);
void cleanup(std::string& key);
UdpSocket _socket;
std::string _host;
int _port;
std::string _prefix;
std::atomic<bool> _stop;
std::thread _thread;
std::mutex _mutex; // for the queue
std::deque<std::string> batching_message_queue_;
const uint64_t max_batching_size = 32768;
};
} // end namespace ix

View File

@ -19,6 +19,10 @@
#include "IXCobraConfig.h"
#ifdef max
#undef max
#endif
namespace ix
{
class WebSocket;

View File

@ -66,7 +66,7 @@ namespace ix
// Virtual methods
virtual bool accept(std::string& errMsg);
virtual bool connect(const std::string& url,
virtual bool connect(const std::string& host,
int port,
std::string& errMsg,
const CancellationRequest& isCancellationRequested);

125
ixwebsocket/IXUdpSocket.cpp Normal file
View File

@ -0,0 +1,125 @@
/*
* IXUdpSocket.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#include "IXUdpSocket.h"
#include "IXNetSystem.h"
#include "IXSelectInterrupt.h"
#include "IXSelectInterruptFactory.h"
#include "IXSocketConnect.h"
#include <algorithm>
#include <assert.h>
#include <fcntl.h>
#include <sstream>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#ifdef min
#undef min
#endif
namespace ix
{
UdpSocket::UdpSocket(int fd)
: _sockfd(fd)
, _selectInterrupt(createSelectInterrupt())
{
;
}
UdpSocket::~UdpSocket()
{
close();
}
void UdpSocket::close()
{
std::lock_guard<std::mutex> lock(_socketMutex);
if (_sockfd == -1) return;
closeSocket(_sockfd);
_sockfd = -1;
}
int UdpSocket::getErrno()
{
int err;
#ifdef _WIN32
err = WSAGetLastError();
#else
err = errno;
#endif
return err;
}
bool UdpSocket::isWaitNeeded()
{
int err = getErrno();
if (err == EWOULDBLOCK || err == EAGAIN || err == EINPROGRESS)
{
return true;
}
return false;
}
void UdpSocket::closeSocket(int fd)
{
#ifdef _WIN32
closesocket(fd);
#else
::close(fd);
#endif
}
bool UdpSocket::init(const std::string& host, int port, std::string& errMsg)
{
_sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (_sockfd < 0)
{
errMsg = "Could not create socket";
return false;
}
memset(&_server, 0, sizeof(_server));
_server.sin_family = AF_INET;
_server.sin_port = htons(port);
// DNS resolution.
struct addrinfo hints, *result = nullptr;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_DGRAM;
int ret = getaddrinfo(host.c_str(), nullptr, &hints, &result);
if (ret != 0)
{
errMsg = strerror(UdpSocket::getErrno());
freeaddrinfo(result);
close();
return false;
}
struct sockaddr_in* host_addr = (struct sockaddr_in*) result->ai_addr;
memcpy(&_server.sin_addr, &host_addr->sin_addr, sizeof(struct in_addr));
freeaddrinfo(result);
return true;
}
ssize_t UdpSocket::sendto(const std::string& buffer)
{
return (ssize_t)::sendto(
_sockfd, buffer.data(), buffer.size(), 0, (struct sockaddr*) &_server, sizeof(_server));
}
} // namespace ix

65
ixwebsocket/IXUdpSocket.h Normal file
View File

@ -0,0 +1,65 @@
/*
* IXUdpSocket.h
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <atomic>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#ifdef _WIN32
#include <BaseTsd.h>
typedef SSIZE_T ssize_t;
#undef EWOULDBLOCK
#undef EAGAIN
#undef EINPROGRESS
#undef EBADF
#undef EINVAL
// map to WSA error codes
#define EWOULDBLOCK WSAEWOULDBLOCK
#define EAGAIN WSATRY_AGAIN
#define EINPROGRESS WSAEINPROGRESS
#define EBADF WSAEBADF
#define EINVAL WSAEINVAL
#endif
#include "IXCancellationRequest.h"
#include "IXNetSystem.h"
namespace ix
{
class SelectInterrupt;
class UdpSocket
{
public:
UdpSocket(int fd = -1);
virtual ~UdpSocket();
// Virtual methods
bool init(const std::string& host, int port, std::string& errMsg);
ssize_t sendto(const std::string& buffer);
virtual void close();
static int getErrno();
static bool isWaitNeeded();
static void closeSocket(int fd);
protected:
std::atomic<int> _sockfd;
std::mutex _socketMutex;
struct sockaddr_in _server;
private:
std::shared_ptr<SelectInterrupt> _selectInterrupt;
};
} // namespace ix

View File

@ -6,4 +6,4 @@
#pragma once
#define IX_WEBSOCKET_VERSION "8.3.2"
#define IX_WEBSOCKET_VERSION "8.3.3"

View File

@ -40,31 +40,31 @@ set (SOURCES
IXGetFreePort.cpp
../third_party/msgpack11/msgpack11.cpp
IXSocketTest.cpp
IXSocketConnectTest.cpp
IXWebSocketServerTest.cpp
IXWebSocketTestConnectionDisconnection.cpp
IXUrlParserTest.cpp
IXWebSocketServerTest.cpp
IXHttpClientTest.cpp
IXHttpServerTest.cpp
IXUnityBuildsTest.cpp
IXHttpTest.cpp
IXDNSLookupTest.cpp
IXWebSocketSubProtocolTest.cpp
IXSentryClientTest.cpp
IXWebSocketChatTest.cpp
#IXSocketTest.cpp
#IXSocketConnectTest.cpp
#IXWebSocketServerTest.cpp
#IXWebSocketTestConnectionDisconnection.cpp
#IXUrlParserTest.cpp
#IXWebSocketServerTest.cpp
#IXHttpClientTest.cpp
#IXHttpServerTest.cpp
#IXUnityBuildsTest.cpp
#IXHttpTest.cpp
#IXDNSLookupTest.cpp
#IXWebSocketSubProtocolTest.cpp
#IXSentryClientTest.cpp
#IXWebSocketChatTest.cpp
)
# Some unittest don't work on windows yet
# Windows without TLS does not have hmac yet
if (UNIX)
list(APPEND SOURCES
IXWebSocketCloseTest.cpp
# Windows without TLS does not have hmac yet
IXCobraChatTest.cpp
IXCobraMetricsPublisherTest.cpp
IXCobraToSentryBotTest.cpp
# IXWebSocketCloseTest.cpp
# IXCobraChatTest.cpp
# IXCobraMetricsPublisherTest.cpp
# IXCobraToSentryBotTest.cpp
IXCobraToStatsdBotTest.cpp
)
endif()

View File

@ -1,7 +1,7 @@
/*
* cmd_satori_chat.cpp
* IXCobraToSentryTest.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017 Machine Zone. All rights reserved.
* Copyright (c) 2020 Machine Zone. All rights reserved.
*/
#include "IXTest.h"

View File

@ -0,0 +1,141 @@
/*
* IXCobraToStatsdTest.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone. All rights reserved.
*/
#include "IXTest.h"
#include "catch.hpp"
#include <chrono>
#include <iostream>
#include <ixbots/IXCobraToStatsdBot.h>
#include <ixcobra/IXCobraConnection.h>
#include <ixcobra/IXCobraMetricsPublisher.h>
#include <ixcrypto/IXUuid.h>
#include <ixsentry/IXSentryClient.h>
#include <ixsnake/IXRedisServer.h>
#include <ixsnake/IXSnakeServer.h>
#include <ixwebsocket/IXHttpServer.h>
#include <ixwebsocket/IXUserAgent.h>
using namespace ix;
namespace
{
void runPublisher(const ix::CobraConfig& config, const std::string& channel)
{
ix::CobraMetricsPublisher cobraMetricsPublisher;
cobraMetricsPublisher.configure(config, channel);
cobraMetricsPublisher.setSession(uuid4());
cobraMetricsPublisher.enable(true);
Json::Value msg;
msg["fps"] = 60;
cobraMetricsPublisher.setGenericAttributes("game", "ody");
// Wait a bit
ix::msleep(500);
// publish some messages
cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #1)
cobraMetricsPublisher.push("sms_metric_B_id", msg); // (msg #2)
ix::msleep(500);
cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #3)
cobraMetricsPublisher.push("sms_metric_D_id", msg); // (msg #4)
ix::msleep(500);
cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #4)
cobraMetricsPublisher.push("sms_metric_F_id", msg); // (msg #5)
ix::msleep(500);
}
} // namespace
TEST_CASE("Cobra_to_statsd_bot", "[cobra_bots]")
{
SECTION("Exchange and count sent/received messages.")
{
int port = getFreePort();
snake::AppConfig appConfig = makeSnakeServerConfig(port, true);
// Start a redis server
ix::RedisServer redisServer(appConfig.redisPort);
auto res = redisServer.listen();
REQUIRE(res.first);
redisServer.start();
// Start a snake server
snake::SnakeServer snakeServer(appConfig);
snakeServer.run();
// Start a fake statsd server (ultimately)
// Run the bot for a small amount of time
std::string channel = ix::generateSessionId();
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
std::string role = "_sub";
std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
std::string endpoint = makeCobraEndpoint(port, true);
ix::CobraConfig config;
config.endpoint = endpoint;
config.appkey = appkey;
config.rolename = role;
config.rolesecret = secret;
config.socketTLSOptions = makeClientTLSOptions();
std::thread publisherThread(runPublisher, config, channel);
std::string filter;
std::string position("$");
bool verbose = true;
size_t maxQueueSize = 10;
bool enableHeartbeat = false;
// Only run the bot for 3 seconds
int runtime = 3;
std::string hostname("127.0.0.1");
// std::string hostname("www.google.com");
int statsdPort = 8125;
std::string prefix("ix.test");
StatsdClient statsdClient(hostname, statsdPort, prefix);
std::string errMsg;
bool initialized = statsdClient.init(errMsg);
if (!initialized)
{
spdlog::error(errMsg);
}
REQUIRE(initialized);
std::string fields("device.game\ndevice.os_name");
int sentCount = ix::cobra_to_statsd_bot(config,
channel,
filter,
position,
statsdClient,
fields,
verbose,
maxQueueSize,
enableHeartbeat,
runtime);
//
// We want at least 2 messages to be sent
//
REQUIRE(sentCount >= 2);
// Give us 1s for all messages to be received
ix::msleep(1000);
spdlog::info("Stopping snake server...");
snakeServer.stop();
spdlog::info("Stopping redis server...");
redisServer.stop();
publisherThread.join();
}
}

View File

@ -1,13 +0,0 @@
# Compiled Object files
*.slo
*.lo
*.o
# Compiled Dynamic libraries
*.so
*.dylib
# Compiled Static libraries
*.lai
*.la
*.a

View File

@ -1,18 +0,0 @@
cmake_minimum_required(VERSION 3.1)
project(helloCLion)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
include_directories(
src
)
add_library(statsdcppclient STATIC src/statsd_client.cpp)
add_definitions("-fPIC")
target_link_libraries(statsdcppclient pthread)
add_executable(system_monitor demo/system_monitor.cpp)
target_link_libraries(system_monitor statsdcppclient)
add_executable(test_client demo/test_client.cpp)
target_link_libraries(test_client statsdcppclient)

View File

@ -1,27 +0,0 @@
Copyright (c) 2014, Rex
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of the {organization} nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -1,34 +0,0 @@
# a client sdk for StatsD, written in C++
## API
See [header file](src/statsd_client.h) for more api detail.
** Notice: this client is not thread-safe **
## Demo
### test\_client
This simple demo shows how the use this client.
### system\_monitor
This is a daemon for monitoring a Linux system.
It'll wake up every minute and monitor the following:
* load
* cpu
* free memory
* free swap (disabled)
* received bytes
* transmitted bytes
* procs
* uptime
The stats sent to statsd will be in "host.MACAddress" namespace.
Usage:
system_monitor statsd-host interface-to-monitor
e.g.
`system_monitor 172.16.42.1 eth0`

View File

@ -1,164 +0,0 @@
#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <unistd.h>
#include <string.h>
#include <netdb.h>
#include <sys/sysinfo.h>
#include <sys/ioctl.h>
#include <netinet/in.h>
#include <net/if.h>
#include <string>
#include <vector>
#include "statsd_client.h"
using namespace std;
static int running = 1;
void sigterm(int sig)
{
running = 0;
}
string localhost() {
struct addrinfo hints, *info, *p;
string hostname(1024, '\0');
gethostname((char*)hostname.data(), hostname.capacity());
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC; /*either IPV4 or IPV6*/
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_CANONNAME;
if ( getaddrinfo(hostname.c_str(), "http", &hints, &info) == 0) {
for(p = info; p != NULL; p = p->ai_next) {
hostname = p->ai_canonname;
}
freeaddrinfo(info);
}
string::size_type pos = hostname.find(".");
while ( pos != string::npos )
{
hostname[pos] = '_';
pos = hostname.find(".", pos);
}
return hostname;
}
vector<string>& StringSplitTrim(const string& sData,
const string& sDelim, vector<string>& vItems)
{
vItems.clear();
string::size_type bpos = 0;
string::size_type epos = 0;
string::size_type nlen = sDelim.size();
while(sData.substr(epos,nlen) == sDelim)
{
epos += nlen;
}
bpos = epos;
while ((epos=sData.find(sDelim, epos)) != string::npos)
{
vItems.push_back(sData.substr(bpos, epos-bpos));
epos += nlen;
while(sData.substr(epos,nlen) == sDelim)
{
epos += nlen;
}
bpos = epos;
}
if(bpos != sData.size())
{
vItems.push_back(sData.substr(bpos, sData.size()-bpos));
}
return vItems;
}
int main(int argc, char *argv[])
{
FILE *net, *stat;
struct sysinfo si;
char line[256];
unsigned int user, nice, sys, idle, total, busy, old_total=0, old_busy=0;
if (argc != 3) {
printf( "Usage: %s host port\n"
"Eg: %s 127.0.0.1 8125\n",
argv[0], argv[0]);
exit(1);
}
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
signal(SIGCHLD, SIG_IGN); /* will save one syscall per sleep */
signal(SIGTERM, sigterm);
if ( (net = fopen("/proc/net/dev", "r")) == NULL) {
perror("fopen");
exit(-1);
}
if ( (stat = fopen("/proc/stat", "r")) == NULL) {
perror("fopen");
exit(-1);
}
string ns = string("host.") + localhost().c_str() + ".";
statsd::StatsdClient client(argv[1], atoi(argv[2]), ns);
daemon(0,0);
printf("running in background.\n");
while(running) {
rewind(net);
vector<string> items;
while(!feof(net)) {
fgets(line, sizeof(line), net);
StringSplitTrim(line, " ", items);
if ( items.size() < 17 ) continue;
if ( items[0].find(":") == string::npos ) continue;
if ( items[1] == "0" and items[9] == "0" ) continue;
string netface = "network."+items[0].erase( items[0].find(":") );
client.count( netface+".receive.bytes", atoll(items[1].c_str()) );
client.count( netface+".receive.packets", atoll(items[2].c_str()) );
client.count( netface+".transmit.bytes", atoll(items[9].c_str()) );
client.count( netface+".transmit.packets", atoll(items[10].c_str()) );
}
sysinfo(&si);
client.gauge("system.load", 100*si.loads[0]/0x10000);
client.gauge("system.freemem", si.freeram/1024);
client.gauge("system.procs", si.procs);
client.count("system.uptime", si.uptime);
/* rewind doesn't do the trick for /proc/stat */
freopen("/proc/stat", "r", stat);
fgets(line, sizeof(line), stat);
sscanf(line, "cpu %u %u %u %u", &user, &nice, &sys, &idle);
total = user + sys + idle;
busy = user + sys;
client.send("system.cpu", 100 * (busy - old_busy)/(total - old_total), "g", 1.0);
old_total = total;
old_busy = busy;
sleep(6);
}
fclose(net);
fclose(stat);
exit(0);
}

View File

@ -1,28 +0,0 @@
#include <iostream>
#include <unistd.h>
#include "statsd_client.h"
int main(void)
{
std::cout << "running..." << std::endl;
statsd::StatsdClient client;
statsd::StatsdClient client2("127.0.0.1", 8125, "myproject.abx.", true);
client.count("count1", 123, 1.0);
client.count("count2", 125, 1.0);
client.gauge("speed", 10);
int i;
for (i=0; i<1000; i++)
client2.timing("request", i);
sleep(1);
client.inc("count1", 1.0);
client2.dec("count2", 1.0);
// for(i=0; i<1000; i++) {
// client2.count("count3", i, 0.8);
// }
std::cout << "done" << std::endl;
return 0;
}

View File

@ -1,245 +0,0 @@
#include <math.h>
#include <netdb.h>
#include <time.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <netinet/in.h>
#include "statsd_client.h"
using namespace std;
namespace statsd {
inline bool fequal(float a, float b)
{
const float epsilon = 0.0001;
return ( fabs(a - b) < epsilon );
}
inline bool should_send(float sample_rate)
{
if ( fequal(sample_rate, 1.0) )
{
return true;
}
float p = ((float)random() / RAND_MAX);
return sample_rate > p;
}
struct _StatsdClientData {
int sock;
struct sockaddr_in server;
string ns;
string host;
short port;
bool init;
char errmsg[1024];
};
StatsdClient::StatsdClient(const string& host,
int port,
const string& ns,
const bool batching)
: batching_(batching), exit_(false)
{
d = new _StatsdClientData;
d->sock = -1;
config(host, port, ns);
srandom((unsigned) time(NULL));
if (batching_) {
pthread_mutex_init(&batching_mutex_lock_, nullptr);
batching_thread_ = std::thread([this] {
while (!exit_) {
std::deque<std::string> staged_message_queue;
pthread_mutex_lock(&batching_mutex_lock_);
batching_message_queue_.swap(staged_message_queue);
pthread_mutex_unlock(&batching_mutex_lock_);
while(!staged_message_queue.empty()) {
send_to_daemon(staged_message_queue.front());
staged_message_queue.pop_front();
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
});
}
}
StatsdClient::~StatsdClient()
{
if (batching_) {
exit_ = true;
batching_thread_.join();
pthread_mutex_destroy(&batching_mutex_lock_);
}
// close socket
if (d->sock >= 0) {
close(d->sock);
d->sock = -1;
delete d;
d = NULL;
}
}
void StatsdClient::config(const string& host, int port, const string& ns)
{
d->ns = ns;
d->host = host;
d->port = port;
d->init = false;
if ( d->sock >= 0 ) {
close(d->sock);
}
d->sock = -1;
}
int StatsdClient::init()
{
if ( d->init ) return 0;
d->sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if ( d->sock == -1 ) {
snprintf(d->errmsg, sizeof(d->errmsg), "could not create socket, err=%s", strerror(errno));
return -1;
}
memset(&d->server, 0, sizeof(d->server));
d->server.sin_family = AF_INET;
d->server.sin_port = htons(d->port);
int ret = inet_aton(d->host.c_str(), &d->server.sin_addr);
if ( ret == 0 )
{
// host must be a domain, get it from internet
struct addrinfo hints, *result = NULL;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_DGRAM;
ret = getaddrinfo(d->host.c_str(), NULL, &hints, &result);
if ( ret ) {
close(d->sock);
d->sock = -1;
snprintf(d->errmsg, sizeof(d->errmsg),
"getaddrinfo fail, error=%d, msg=%s", ret, gai_strerror(ret) );
return -2;
}
struct sockaddr_in* host_addr = (struct sockaddr_in*)result->ai_addr;
memcpy(&d->server.sin_addr, &host_addr->sin_addr, sizeof(struct in_addr));
freeaddrinfo(result);
}
d->init = true;
return 0;
}
/* will change the original string */
void StatsdClient::cleanup(string& key)
{
size_t pos = key.find_first_of(":|@");
while ( pos != string::npos )
{
key[pos] = '_';
pos = key.find_first_of(":|@");
}
}
int StatsdClient::dec(const string& key, float sample_rate)
{
return count(key, -1, sample_rate);
}
int StatsdClient::inc(const string& key, float sample_rate)
{
return count(key, 1, sample_rate);
}
int StatsdClient::count(const string& key, size_t value, float sample_rate)
{
return send(key, value, "c", sample_rate);
}
int StatsdClient::gauge(const string& key, size_t value, float sample_rate)
{
return send(key, value, "g", sample_rate);
}
int StatsdClient::timing(const string& key, size_t ms, float sample_rate)
{
return send(key, ms, "ms", sample_rate);
}
int StatsdClient::send(string key, size_t value, const string &type, float sample_rate)
{
if (!should_send(sample_rate)) {
return 0;
}
cleanup(key);
char buf[256];
if ( fequal( sample_rate, 1.0 ) )
{
snprintf(buf, sizeof(buf), "%s%s:%zd|%s",
d->ns.c_str(), key.c_str(), value, type.c_str());
}
else
{
snprintf(buf, sizeof(buf), "%s%s:%zd|%s|@%.2f",
d->ns.c_str(), key.c_str(), value, type.c_str(), sample_rate);
}
return send(buf);
}
int StatsdClient::send(const string &message)
{
if (batching_) {
pthread_mutex_lock(&batching_mutex_lock_);
if (batching_message_queue_.empty() ||
batching_message_queue_.back().length() > max_batching_size) {
batching_message_queue_.push_back(message);
} else {
(*batching_message_queue_.rbegin()).append("\n").append(message);
}
pthread_mutex_unlock(&batching_mutex_lock_);
return 0;
} else {
return send_to_daemon(message);
}
}
int StatsdClient::send_to_daemon(const string &message) {
int ret = init();
if ( ret )
{
return ret;
}
ret = (int) sendto(d->sock, message.data(), message.size(), 0, (struct sockaddr *) &d->server, sizeof(d->server));
if ( ret == -1) {
snprintf(d->errmsg, sizeof(d->errmsg),
"sendto server fail, host=%s:%d, err=%s", d->host.c_str(), d->port, strerror(errno));
return -1;
}
return 0;
}
const char* StatsdClient::errmsg()
{
return d->errmsg;
}
}

View File

@ -1,66 +0,0 @@
#ifndef STATSD_CLIENT_H
#define STATSD_CLIENT_H
#include <sys/types.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <pthread.h>
#include <string>
#include <thread>
#include <deque>
#include <iostream>
namespace statsd {
struct _StatsdClientData;
class StatsdClient {
public:
StatsdClient(const std::string& host="127.0.0.1", int port=8125, const std::string& ns = "", const bool batching = false);
~StatsdClient();
public:
// you can config at anytime; client will use new address (useful for Singleton)
void config(const std::string& host, int port, const std::string& ns = "");
const char* errmsg();
int send_to_daemon(const std::string &);
public:
int inc(const std::string& key, float sample_rate = 1.0);
int dec(const std::string& key, float sample_rate = 1.0);
int count(const std::string& key, size_t value, float sample_rate = 1.0);
int gauge(const std::string& key, size_t value, float sample_rate = 1.0);
int timing(const std::string& key, size_t ms, float sample_rate = 1.0);
public:
/**
* (Low Level Api) manually send a message
* which might be composed of several lines.
*/
int send(const std::string& message);
/* (Low Level Api) manually send a message
* type = "c", "g" or "ms"
*/
int send(std::string key, size_t value,
const std::string& type, float sample_rate);
protected:
int init();
void cleanup(std::string& key);
protected:
struct _StatsdClientData* d;
bool batching_;
bool exit_;
pthread_mutex_t batching_mutex_lock_;
std::thread batching_thread_;
std::deque<std::string> batching_message_queue_;
const uint64_t max_batching_size = 32768;
};
} // end namespace
#endif

View File

@ -225,25 +225,3 @@ endif()
if(NOT SKIP_INSTALL_FILES AND NOT SKIP_INSTALL_ALL )
install(FILES ${ZLIB_PC} DESTINATION "${INSTALL_PKGCONFIG_DIR}")
endif()
#============================================================================
# Example binaries
#============================================================================
add_executable(example test/example.c)
target_link_libraries(example zlib)
add_test(example example)
add_executable(minigzip test/minigzip.c)
target_link_libraries(minigzip zlib)
if(HAVE_OFF64_T)
add_executable(example64 test/example.c)
target_link_libraries(example64 zlib)
set_target_properties(example64 PROPERTIES COMPILE_FLAGS "-D_FILE_OFFSET_BITS=64")
add_test(example64 example64)
add_executable(minigzip64 test/minigzip.c)
target_link_libraries(minigzip64 zlib)
set_target_properties(minigzip64 PROPERTIES COMPILE_FLAGS "-D_FILE_OFFSET_BITS=64")
endif()

View File

@ -21,16 +21,11 @@ option(USE_TLS "Add TLS support" ON)
include_directories(ws .)
include_directories(ws ..)
include_directories(ws ../third_party)
include_directories(ws ../third_party/statsd-client-cpp/src)
include_directories(ws ../third_party/spdlog/include)
include_directories(ws ../third_party/cpp-linenoise)
add_definitions(-DSPDLOG_COMPILED_LIB=1)
if (UNIX)
set( STATSD_CLIENT_SOURCES ../third_party/statsd-client-cpp/src/statsd_client.cpp)
endif()
find_package(JsonCpp)
if (NOT JSONCPP_FOUND)
include_directories(../third_party/jsoncpp)
@ -39,7 +34,6 @@ endif()
add_executable(ws
../third_party/msgpack11/msgpack11.cpp
${STATSD_CLIENT_SOURCES}
${JSONCPP_SOURCES}
ws_http_client.cpp

View File

@ -269,6 +269,9 @@ int main(int argc, char** argv)
cobra2statsd->add_option("--pidfile", pidfile, "Pid file");
cobra2statsd->add_option("--filter", filter, "Stream SQL Filter");
cobra2statsd->add_option("--position", position, "Stream position");
cobra2statsd->add_option("--queue_size",
maxQueueSize,
"Size of the queue to hold messages before they are sent to Sentry");
addTLSOptions(cobra2statsd);
addCobraConfig(cobra2statsd);
@ -450,8 +453,20 @@ int main(int argc, char** argv)
}
else if (app.got_subcommand("cobra_to_statsd"))
{
ret = ix::cobra_to_statsd_bot(
cobraConfig, channel, filter, position, hostname, statsdPort, prefix, fields, verbose);
bool enableHeartbeat = true;
int runtime = -1;
ix::StatsdClient statsdClient(hostname, statsdPort, prefix);
ret = ix::cobra_to_statsd_bot(cobraConfig,
channel,
filter,
position,
statsdClient,
fields,
verbose,
maxQueueSize,
enableHeartbeat,
runtime);
}
else if (app.got_subcommand("cobra_to_sentry"))
{