cobra to statsd bot ported to windows + add unittest
This commit is contained in:
		@@ -36,6 +36,7 @@ set( IXWEBSOCKET_SOURCES
 | 
			
		||||
    ixwebsocket/IXSocketFactory.cpp
 | 
			
		||||
    ixwebsocket/IXSocketServer.cpp
 | 
			
		||||
    ixwebsocket/IXSocketTLSOptions.cpp
 | 
			
		||||
    ixwebsocket/IXUdpSocket.cpp
 | 
			
		||||
    ixwebsocket/IXUrlParser.cpp
 | 
			
		||||
    ixwebsocket/IXUserAgent.cpp
 | 
			
		||||
    ixwebsocket/IXWebSocket.cpp
 | 
			
		||||
@@ -69,6 +70,7 @@ set( IXWEBSOCKET_HEADERS
 | 
			
		||||
    ixwebsocket/IXSocketFactory.h
 | 
			
		||||
    ixwebsocket/IXSocketServer.h
 | 
			
		||||
    ixwebsocket/IXSocketTLSOptions.h
 | 
			
		||||
    ixwebsocket/IXUdpSocket.h
 | 
			
		||||
    ixwebsocket/IXUrlParser.h
 | 
			
		||||
    ixwebsocket/IXUtf8Validator.h
 | 
			
		||||
    ixwebsocket/IXUserAgent.h
 | 
			
		||||
 
 | 
			
		||||
@@ -1,6 +1,10 @@
 | 
			
		||||
# Changelog
 | 
			
		||||
All changes to this project will be documented in this file.
 | 
			
		||||
 | 
			
		||||
## [8.3.3] - 2020-03-22
 | 
			
		||||
 | 
			
		||||
(cobra to statsd) port to windows and add a unittest
 | 
			
		||||
 | 
			
		||||
## [8.3.2] - 2020-03-20
 | 
			
		||||
 | 
			
		||||
(websocket+tls) fix hang in tls handshake which could lead to ANR, discovered through unittesting.
 | 
			
		||||
 
 | 
			
		||||
@@ -7,12 +7,14 @@ set (IXBOTS_SOURCES
 | 
			
		||||
    ixbots/IXCobraToSentryBot.cpp
 | 
			
		||||
    ixbots/IXCobraToStatsdBot.cpp
 | 
			
		||||
    ixbots/IXQueueManager.cpp
 | 
			
		||||
    ixbots/IXStatsdClient.cpp
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
set (IXBOTS_HEADERS
 | 
			
		||||
    ixbots/IXCobraToSentryBot.h
 | 
			
		||||
    ixbots/IXCobraToStatsdBot.h
 | 
			
		||||
    ixbots/IXQueueManager.h
 | 
			
		||||
    ixbots/IXStatsdClient.h
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
add_library(ixbots STATIC
 | 
			
		||||
@@ -30,8 +32,6 @@ if (NOT SPDLOG_FOUND)
 | 
			
		||||
  set(SPDLOG_INCLUDE_DIRS ../third_party/spdlog/include)
 | 
			
		||||
endif()
 | 
			
		||||
 | 
			
		||||
set(STATSD_CLIENT_INCLUDE_DIRS ../third_party/statsd-client-cpp/src)
 | 
			
		||||
 | 
			
		||||
set(IXBOTS_INCLUDE_DIRS
 | 
			
		||||
    .
 | 
			
		||||
    ..
 | 
			
		||||
@@ -39,7 +39,6 @@ set(IXBOTS_INCLUDE_DIRS
 | 
			
		||||
    ../ixcobra
 | 
			
		||||
    ../ixsentry
 | 
			
		||||
    ${JSONCPP_INCLUDE_DIRS}
 | 
			
		||||
    ${SPDLOG_INCLUDE_DIRS}
 | 
			
		||||
    ${STATSD_CLIENT_INCLUDE_DIRS})
 | 
			
		||||
    ${SPDLOG_INCLUDE_DIRS})
 | 
			
		||||
 | 
			
		||||
target_include_directories( ixbots PUBLIC ${IXBOTS_INCLUDE_DIRS} )
 | 
			
		||||
 
 | 
			
		||||
@@ -1,7 +1,7 @@
 | 
			
		||||
/*
 | 
			
		||||
 *  IXCobraToSentryBot.h
 | 
			
		||||
 *  Author: Benjamin Sergeant
 | 
			
		||||
 *  Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
 | 
			
		||||
 *  Copyright (c) 2019-2020 Machine Zone, Inc. All rights reserved.
 | 
			
		||||
 */
 | 
			
		||||
#pragma once
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -6,6 +6,7 @@
 | 
			
		||||
 | 
			
		||||
#include "IXCobraToStatsdBot.h"
 | 
			
		||||
#include "IXQueueManager.h"
 | 
			
		||||
#include "IXStatsdClient.h"
 | 
			
		||||
 | 
			
		||||
#include <atomic>
 | 
			
		||||
#include <chrono>
 | 
			
		||||
@@ -16,10 +17,6 @@
 | 
			
		||||
#include <thread>
 | 
			
		||||
#include <vector>
 | 
			
		||||
 | 
			
		||||
#ifndef _WIN32
 | 
			
		||||
#include <statsd_client.h>
 | 
			
		||||
#endif
 | 
			
		||||
 | 
			
		||||
namespace ix
 | 
			
		||||
{
 | 
			
		||||
    // fields are command line argument that can be specified multiple times
 | 
			
		||||
@@ -63,11 +60,12 @@ namespace ix
 | 
			
		||||
                            const std::string& channel,
 | 
			
		||||
                            const std::string& filter,
 | 
			
		||||
                            const std::string& position,
 | 
			
		||||
                            const std::string& host,
 | 
			
		||||
                            int port,
 | 
			
		||||
                            const std::string& prefix,
 | 
			
		||||
                            StatsdClient& statsdClient,
 | 
			
		||||
                            const std::string& fields,
 | 
			
		||||
                            bool verbose)
 | 
			
		||||
                            bool verbose,
 | 
			
		||||
                            size_t maxQueueSize,
 | 
			
		||||
                            bool enableHeartbeat,
 | 
			
		||||
                            int runtime)
 | 
			
		||||
    {
 | 
			
		||||
        ix::CobraConnection conn;
 | 
			
		||||
        conn.configure(config);
 | 
			
		||||
@@ -80,11 +78,10 @@ namespace ix
 | 
			
		||||
        std::atomic<uint64_t> receivedCount(0);
 | 
			
		||||
        std::atomic<bool> stop(false);
 | 
			
		||||
 | 
			
		||||
        size_t maxQueueSize = 1000;
 | 
			
		||||
        QueueManager queueManager(maxQueueSize);
 | 
			
		||||
 | 
			
		||||
        auto timer = [&sentCount, &receivedCount] {
 | 
			
		||||
            while (true)
 | 
			
		||||
        auto timer = [&sentCount, &receivedCount, &stop] {
 | 
			
		||||
            while (!stop)
 | 
			
		||||
            {
 | 
			
		||||
                spdlog::info("messages received {} sent {}", receivedCount, sentCount);
 | 
			
		||||
 | 
			
		||||
@@ -95,9 +92,11 @@ namespace ix
 | 
			
		||||
 | 
			
		||||
        std::thread t1(timer);
 | 
			
		||||
 | 
			
		||||
        auto heartbeat = [&sentCount, &receivedCount] {
 | 
			
		||||
        auto heartbeat = [&sentCount, &receivedCount, &enableHeartbeat] {
 | 
			
		||||
            std::string state("na");
 | 
			
		||||
 | 
			
		||||
            if (!enableHeartbeat) return;
 | 
			
		||||
 | 
			
		||||
            while (true)
 | 
			
		||||
            {
 | 
			
		||||
                std::stringstream ss;
 | 
			
		||||
@@ -120,21 +119,13 @@ namespace ix
 | 
			
		||||
 | 
			
		||||
        std::thread t2(heartbeat);
 | 
			
		||||
 | 
			
		||||
        auto statsdSender = [&queueManager, &host, &port, &sentCount, &tokens, &prefix, &stop] {
 | 
			
		||||
            // statsd client
 | 
			
		||||
            // test with netcat as a server: `nc -ul 8125`
 | 
			
		||||
            bool statsdBatch = true;
 | 
			
		||||
#ifndef _WIN32
 | 
			
		||||
            statsd::StatsdClient statsdClient(host, port, prefix, statsdBatch);
 | 
			
		||||
#else
 | 
			
		||||
            int statsdClient;
 | 
			
		||||
#endif
 | 
			
		||||
        auto statsdSender = [&statsdClient, &queueManager, &sentCount, &tokens, &stop] {
 | 
			
		||||
            while (true)
 | 
			
		||||
            {
 | 
			
		||||
                Json::Value msg = queueManager.pop();
 | 
			
		||||
 | 
			
		||||
                if (msg.isNull()) continue;
 | 
			
		||||
                if (stop) return;
 | 
			
		||||
                if (msg.isNull()) continue;
 | 
			
		||||
 | 
			
		||||
                std::string id;
 | 
			
		||||
                for (auto&& attr : tokens)
 | 
			
		||||
@@ -143,11 +134,8 @@ namespace ix
 | 
			
		||||
                    id += extractAttr(attr, msg);
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                sentCount += 1;
 | 
			
		||||
 | 
			
		||||
#ifndef _WIN32
 | 
			
		||||
                statsdClient.count(id, 1);
 | 
			
		||||
#endif
 | 
			
		||||
                sentCount += 1;
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
@@ -214,12 +202,38 @@ namespace ix
 | 
			
		||||
                }
 | 
			
		||||
            });
 | 
			
		||||
 | 
			
		||||
        // Run forever
 | 
			
		||||
        if (runtime == -1)
 | 
			
		||||
        {
 | 
			
		||||
            while (true)
 | 
			
		||||
            {
 | 
			
		||||
            std::chrono::duration<double, std::milli> duration(1000);
 | 
			
		||||
                auto duration = std::chrono::seconds(1);
 | 
			
		||||
                std::this_thread::sleep_for(duration);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        // Run for a duration, used by unittesting now
 | 
			
		||||
        else
 | 
			
		||||
        {
 | 
			
		||||
            for (int i = 0 ; i < runtime; ++i)
 | 
			
		||||
            {
 | 
			
		||||
                auto duration = std::chrono::seconds(1);
 | 
			
		||||
                std::this_thread::sleep_for(duration);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        return 0;
 | 
			
		||||
        //
 | 
			
		||||
        // Cleanup.
 | 
			
		||||
        // join all the bg threads and stop them.
 | 
			
		||||
        //
 | 
			
		||||
        conn.disconnect();
 | 
			
		||||
        stop = true;
 | 
			
		||||
 | 
			
		||||
        t1.join();
 | 
			
		||||
        if (t2.joinable()) t2.join();
 | 
			
		||||
        spdlog::info("heartbeat thread done");
 | 
			
		||||
 | 
			
		||||
        t3.join();
 | 
			
		||||
 | 
			
		||||
        return (int) sentCount;
 | 
			
		||||
    }
 | 
			
		||||
} // namespace ix
 | 
			
		||||
 
 | 
			
		||||
@@ -1,11 +1,12 @@
 | 
			
		||||
/*
 | 
			
		||||
 *  IXCobraToStatsdBot.h
 | 
			
		||||
 *  Author: Benjamin Sergeant
 | 
			
		||||
 *  Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
 | 
			
		||||
 *  Copyright (c) 2019-2020 Machine Zone, Inc. All rights reserved.
 | 
			
		||||
 */
 | 
			
		||||
#pragma once
 | 
			
		||||
 | 
			
		||||
#include <ixcobra/IXCobraConfig.h>
 | 
			
		||||
#include <ixbots/IXStatsdClient.h>
 | 
			
		||||
#include <string>
 | 
			
		||||
#include <stddef.h>
 | 
			
		||||
 | 
			
		||||
@@ -15,9 +16,10 @@ namespace ix
 | 
			
		||||
                            const std::string& channel,
 | 
			
		||||
                            const std::string& filter,
 | 
			
		||||
                            const std::string& position,
 | 
			
		||||
                            const std::string& host,
 | 
			
		||||
                            int port,
 | 
			
		||||
                            const std::string& prefix,
 | 
			
		||||
                            StatsdClient& statsdClient,
 | 
			
		||||
                            const std::string& fields,
 | 
			
		||||
                            bool verbose);
 | 
			
		||||
                            bool verbose,
 | 
			
		||||
                            size_t maxQueueSize,
 | 
			
		||||
                            bool enableHeartbeat,
 | 
			
		||||
                            int runtime);
 | 
			
		||||
} // namespace ix
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										154
									
								
								ixbots/ixbots/IXStatsdClient.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										154
									
								
								ixbots/ixbots/IXStatsdClient.cpp
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,154 @@
 | 
			
		||||
/*
 | 
			
		||||
 * Copyright (c) 2014, Rex
 | 
			
		||||
 * All rights reserved.
 | 
			
		||||
 *
 | 
			
		||||
 * Redistribution and use in source and binary forms, with or without
 | 
			
		||||
 * modification, are permitted provided that the following conditions are met:
 | 
			
		||||
 *
 | 
			
		||||
 * * Redistributions of source code must retain the above copyright notice, this
 | 
			
		||||
 *   list of conditions and the following disclaimer.
 | 
			
		||||
 *
 | 
			
		||||
 * * Redistributions in binary form must reproduce the above copyright notice,
 | 
			
		||||
 *   this list of conditions and the following disclaimer in the documentation
 | 
			
		||||
 *   and/or other materials provided with the distribution.
 | 
			
		||||
 *
 | 
			
		||||
 * * Neither the name of the {organization} nor the names of its
 | 
			
		||||
 *   contributors may be used to endorse or promote products derived from
 | 
			
		||||
 *   this software without specific prior written permission.
 | 
			
		||||
 *
 | 
			
		||||
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 | 
			
		||||
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 | 
			
		||||
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 | 
			
		||||
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
 | 
			
		||||
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 | 
			
		||||
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
 | 
			
		||||
 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 | 
			
		||||
 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 | 
			
		||||
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 | 
			
		||||
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
 *  IXStatsdClient.cpp
 | 
			
		||||
 *  Author: Benjamin Sergeant
 | 
			
		||||
 *  Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
// Adapted from statsd-client-cpp
 | 
			
		||||
// test with netcat as a server: `nc -ul 8125`
 | 
			
		||||
 | 
			
		||||
#include "IXStatsdClient.h"
 | 
			
		||||
 | 
			
		||||
#include <stdlib.h>
 | 
			
		||||
#include <string.h>
 | 
			
		||||
#include <stdio.h>
 | 
			
		||||
 | 
			
		||||
namespace ix
 | 
			
		||||
{
 | 
			
		||||
    StatsdClient::StatsdClient(const string& host,
 | 
			
		||||
                               int port,
 | 
			
		||||
                               const string& prefix)
 | 
			
		||||
    : _host(host)
 | 
			
		||||
      , _port(port)
 | 
			
		||||
      , _prefix(prefix)
 | 
			
		||||
      , _stop(false)
 | 
			
		||||
    {
 | 
			
		||||
        _thread = std::thread([this] {
 | 
			
		||||
            while (!_stop)
 | 
			
		||||
            {
 | 
			
		||||
                std::deque<std::string> staged_message_queue;
 | 
			
		||||
 | 
			
		||||
                {
 | 
			
		||||
                    std::lock_guard<std::mutex> lock(_mutex);
 | 
			
		||||
                    batching_message_queue_.swap(staged_message_queue);
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                while (!staged_message_queue.empty())
 | 
			
		||||
                {
 | 
			
		||||
                    auto message = staged_message_queue.front();
 | 
			
		||||
                    _socket.sendto(message);
 | 
			
		||||
                    staged_message_queue.pop_front();
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                std::this_thread::sleep_for(std::chrono::seconds(1));
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    StatsdClient::~StatsdClient()
 | 
			
		||||
    {
 | 
			
		||||
        _stop = true;
 | 
			
		||||
        if (_thread.joinable()) _thread.join();
 | 
			
		||||
 | 
			
		||||
        _socket.close();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    bool StatsdClient::init(std::string& errMsg)
 | 
			
		||||
    {
 | 
			
		||||
        return _socket.init(_host, _port, errMsg);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /* will change the original string */
 | 
			
		||||
    void StatsdClient::cleanup(string& key)
 | 
			
		||||
    {
 | 
			
		||||
        size_t pos = key.find_first_of(":|@");
 | 
			
		||||
        while (pos != string::npos)
 | 
			
		||||
        {
 | 
			
		||||
            key[pos] = '_';
 | 
			
		||||
            pos = key.find_first_of(":|@");
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    int StatsdClient::dec(const string& key)
 | 
			
		||||
    {
 | 
			
		||||
        return count(key, -1);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    int StatsdClient::inc(const string& key)
 | 
			
		||||
    {
 | 
			
		||||
        return count(key, 1);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    int StatsdClient::count(const string& key, size_t value)
 | 
			
		||||
    {
 | 
			
		||||
        return send(key, value, "c");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    int StatsdClient::gauge(const string& key, size_t value)
 | 
			
		||||
    {
 | 
			
		||||
        return send(key, value, "g");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    int StatsdClient::timing(const string& key, size_t ms)
 | 
			
		||||
    {
 | 
			
		||||
        return send(key, ms, "ms");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    int StatsdClient::send(string key, size_t value, const string &type)
 | 
			
		||||
    {
 | 
			
		||||
        cleanup(key);
 | 
			
		||||
 | 
			
		||||
        char buf[256];
 | 
			
		||||
        snprintf(buf, sizeof(buf), "%s%s:%zd|%s",
 | 
			
		||||
                 _prefix.c_str(), key.c_str(), value, type.c_str());
 | 
			
		||||
 | 
			
		||||
        return send(buf);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    int StatsdClient::send(const string &message)
 | 
			
		||||
    {
 | 
			
		||||
        std::lock_guard<std::mutex> lock(_mutex);
 | 
			
		||||
 | 
			
		||||
        if (batching_message_queue_.empty() ||
 | 
			
		||||
            batching_message_queue_.back().length() > max_batching_size)
 | 
			
		||||
        {
 | 
			
		||||
            batching_message_queue_.push_back(message);
 | 
			
		||||
        }
 | 
			
		||||
        else
 | 
			
		||||
        {
 | 
			
		||||
            (*batching_message_queue_.rbegin()).append("\n").append(message);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        return 0;
 | 
			
		||||
    }
 | 
			
		||||
} // end namespace ix
 | 
			
		||||
							
								
								
									
										62
									
								
								ixbots/ixbots/IXStatsdClient.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										62
									
								
								ixbots/ixbots/IXStatsdClient.h
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,62 @@
 | 
			
		||||
/*
 | 
			
		||||
 *  IXStatsdClient.h
 | 
			
		||||
 *  Author: Benjamin Sergeant
 | 
			
		||||
 *  Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
#pragma once
 | 
			
		||||
 | 
			
		||||
#include <ixwebsocket/IXNetSystem.h>
 | 
			
		||||
#include <ixwebsocket/IXUdpSocket.h>
 | 
			
		||||
 | 
			
		||||
#include <string>
 | 
			
		||||
#include <thread>
 | 
			
		||||
#include <deque>
 | 
			
		||||
#include <mutex>
 | 
			
		||||
#include <atomic>
 | 
			
		||||
 | 
			
		||||
namespace ix {
 | 
			
		||||
 | 
			
		||||
    class StatsdClient {
 | 
			
		||||
    public:
 | 
			
		||||
        StatsdClient(const std::string& host="127.0.0.1",
 | 
			
		||||
                     int port=8125,
 | 
			
		||||
                     const std::string& prefix = "");
 | 
			
		||||
        ~StatsdClient();
 | 
			
		||||
 | 
			
		||||
        bool init(std::string& errMsg);
 | 
			
		||||
        int inc(const std::string& key);
 | 
			
		||||
        int dec(const std::string& key);
 | 
			
		||||
        int count(const std::string& key, size_t value);
 | 
			
		||||
        int gauge(const std::string& key, size_t value);
 | 
			
		||||
        int timing(const std::string& key, size_t ms);
 | 
			
		||||
 | 
			
		||||
    private:
 | 
			
		||||
        /**
 | 
			
		||||
         * (Low Level Api) manually send a message
 | 
			
		||||
         * which might be composed of several lines.
 | 
			
		||||
         */
 | 
			
		||||
        int send(const std::string& message);
 | 
			
		||||
 | 
			
		||||
        /* (Low Level Api) manually send a message
 | 
			
		||||
         * type = "c", "g" or "ms"
 | 
			
		||||
         */
 | 
			
		||||
        int send(std::string key, size_t value, const std::string& type);
 | 
			
		||||
 | 
			
		||||
        void cleanup(std::string& key);
 | 
			
		||||
 | 
			
		||||
        UdpSocket _socket;
 | 
			
		||||
 | 
			
		||||
        std::string _host;
 | 
			
		||||
        int _port;
 | 
			
		||||
        std::string _prefix;
 | 
			
		||||
 | 
			
		||||
        std::atomic<bool> _stop;
 | 
			
		||||
        std::thread _thread;
 | 
			
		||||
        std::mutex _mutex; // for the queue
 | 
			
		||||
 | 
			
		||||
        std::deque<std::string> batching_message_queue_;
 | 
			
		||||
        const uint64_t max_batching_size = 32768;
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
} // end namespace ix
 | 
			
		||||
@@ -19,6 +19,10 @@
 | 
			
		||||
 | 
			
		||||
#include "IXCobraConfig.h"
 | 
			
		||||
 | 
			
		||||
#ifdef max
 | 
			
		||||
#undef max
 | 
			
		||||
#endif
 | 
			
		||||
 | 
			
		||||
namespace ix
 | 
			
		||||
{
 | 
			
		||||
    class WebSocket;
 | 
			
		||||
 
 | 
			
		||||
@@ -66,7 +66,7 @@ namespace ix
 | 
			
		||||
        // Virtual methods
 | 
			
		||||
        virtual bool accept(std::string& errMsg);
 | 
			
		||||
 | 
			
		||||
        virtual bool connect(const std::string& url,
 | 
			
		||||
        virtual bool connect(const std::string& host,
 | 
			
		||||
                             int port,
 | 
			
		||||
                             std::string& errMsg,
 | 
			
		||||
                             const CancellationRequest& isCancellationRequested);
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										125
									
								
								ixwebsocket/IXUdpSocket.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										125
									
								
								ixwebsocket/IXUdpSocket.cpp
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,125 @@
 | 
			
		||||
/*
 | 
			
		||||
 *  IXUdpSocket.cpp
 | 
			
		||||
 *  Author: Benjamin Sergeant
 | 
			
		||||
 *  Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
#include "IXUdpSocket.h"
 | 
			
		||||
 | 
			
		||||
#include "IXNetSystem.h"
 | 
			
		||||
#include "IXSelectInterrupt.h"
 | 
			
		||||
#include "IXSelectInterruptFactory.h"
 | 
			
		||||
#include "IXSocketConnect.h"
 | 
			
		||||
#include <algorithm>
 | 
			
		||||
#include <assert.h>
 | 
			
		||||
#include <fcntl.h>
 | 
			
		||||
#include <sstream>
 | 
			
		||||
#include <stdint.h>
 | 
			
		||||
#include <stdio.h>
 | 
			
		||||
#include <stdlib.h>
 | 
			
		||||
#include <string.h>
 | 
			
		||||
#include <sys/types.h>
 | 
			
		||||
 | 
			
		||||
#ifdef min
 | 
			
		||||
#undef min
 | 
			
		||||
#endif
 | 
			
		||||
 | 
			
		||||
namespace ix
 | 
			
		||||
{
 | 
			
		||||
    UdpSocket::UdpSocket(int fd)
 | 
			
		||||
        : _sockfd(fd)
 | 
			
		||||
        , _selectInterrupt(createSelectInterrupt())
 | 
			
		||||
    {
 | 
			
		||||
        ;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    UdpSocket::~UdpSocket()
 | 
			
		||||
    {
 | 
			
		||||
        close();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void UdpSocket::close()
 | 
			
		||||
    {
 | 
			
		||||
        std::lock_guard<std::mutex> lock(_socketMutex);
 | 
			
		||||
 | 
			
		||||
        if (_sockfd == -1) return;
 | 
			
		||||
 | 
			
		||||
        closeSocket(_sockfd);
 | 
			
		||||
        _sockfd = -1;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    int UdpSocket::getErrno()
 | 
			
		||||
    {
 | 
			
		||||
        int err;
 | 
			
		||||
 | 
			
		||||
#ifdef _WIN32
 | 
			
		||||
        err = WSAGetLastError();
 | 
			
		||||
#else
 | 
			
		||||
        err = errno;
 | 
			
		||||
#endif
 | 
			
		||||
 | 
			
		||||
        return err;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    bool UdpSocket::isWaitNeeded()
 | 
			
		||||
    {
 | 
			
		||||
        int err = getErrno();
 | 
			
		||||
 | 
			
		||||
        if (err == EWOULDBLOCK || err == EAGAIN || err == EINPROGRESS)
 | 
			
		||||
        {
 | 
			
		||||
            return true;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        return false;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void UdpSocket::closeSocket(int fd)
 | 
			
		||||
    {
 | 
			
		||||
#ifdef _WIN32
 | 
			
		||||
        closesocket(fd);
 | 
			
		||||
#else
 | 
			
		||||
        ::close(fd);
 | 
			
		||||
#endif
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    bool UdpSocket::init(const std::string& host, int port, std::string& errMsg)
 | 
			
		||||
    {
 | 
			
		||||
        _sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
 | 
			
		||||
        if (_sockfd < 0)
 | 
			
		||||
        {
 | 
			
		||||
            errMsg = "Could not create socket";
 | 
			
		||||
            return false;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        memset(&_server, 0, sizeof(_server));
 | 
			
		||||
        _server.sin_family = AF_INET;
 | 
			
		||||
        _server.sin_port = htons(port);
 | 
			
		||||
 | 
			
		||||
        // DNS resolution.
 | 
			
		||||
        struct addrinfo hints, *result = nullptr;
 | 
			
		||||
        memset(&hints, 0, sizeof(hints));
 | 
			
		||||
        hints.ai_family = AF_INET;
 | 
			
		||||
        hints.ai_socktype = SOCK_DGRAM;
 | 
			
		||||
 | 
			
		||||
        int ret = getaddrinfo(host.c_str(), nullptr, &hints, &result);
 | 
			
		||||
        if (ret != 0)
 | 
			
		||||
        {
 | 
			
		||||
            errMsg = strerror(UdpSocket::getErrno());
 | 
			
		||||
            freeaddrinfo(result);
 | 
			
		||||
            close();
 | 
			
		||||
            return false;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        struct sockaddr_in* host_addr = (struct sockaddr_in*) result->ai_addr;
 | 
			
		||||
        memcpy(&_server.sin_addr, &host_addr->sin_addr, sizeof(struct in_addr));
 | 
			
		||||
        freeaddrinfo(result);
 | 
			
		||||
 | 
			
		||||
        return true;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    ssize_t UdpSocket::sendto(const std::string& buffer)
 | 
			
		||||
    {
 | 
			
		||||
        return (ssize_t)::sendto(
 | 
			
		||||
            _sockfd, buffer.data(), buffer.size(), 0, (struct sockaddr*) &_server, sizeof(_server));
 | 
			
		||||
    }
 | 
			
		||||
} // namespace ix
 | 
			
		||||
							
								
								
									
										65
									
								
								ixwebsocket/IXUdpSocket.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										65
									
								
								ixwebsocket/IXUdpSocket.h
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,65 @@
 | 
			
		||||
/*
 | 
			
		||||
 *  IXUdpSocket.h
 | 
			
		||||
 *  Author: Benjamin Sergeant
 | 
			
		||||
 *  Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
#pragma once
 | 
			
		||||
 | 
			
		||||
#include <atomic>
 | 
			
		||||
#include <functional>
 | 
			
		||||
#include <memory>
 | 
			
		||||
#include <mutex>
 | 
			
		||||
#include <string>
 | 
			
		||||
 | 
			
		||||
#ifdef _WIN32
 | 
			
		||||
#include <BaseTsd.h>
 | 
			
		||||
typedef SSIZE_T ssize_t;
 | 
			
		||||
 | 
			
		||||
#undef EWOULDBLOCK
 | 
			
		||||
#undef EAGAIN
 | 
			
		||||
#undef EINPROGRESS
 | 
			
		||||
#undef EBADF
 | 
			
		||||
#undef EINVAL
 | 
			
		||||
 | 
			
		||||
// map to WSA error codes
 | 
			
		||||
#define EWOULDBLOCK WSAEWOULDBLOCK
 | 
			
		||||
#define EAGAIN WSATRY_AGAIN
 | 
			
		||||
#define EINPROGRESS WSAEINPROGRESS
 | 
			
		||||
#define EBADF WSAEBADF
 | 
			
		||||
#define EINVAL WSAEINVAL
 | 
			
		||||
 | 
			
		||||
#endif
 | 
			
		||||
 | 
			
		||||
#include "IXCancellationRequest.h"
 | 
			
		||||
#include "IXNetSystem.h"
 | 
			
		||||
 | 
			
		||||
namespace ix
 | 
			
		||||
{
 | 
			
		||||
    class SelectInterrupt;
 | 
			
		||||
 | 
			
		||||
    class UdpSocket
 | 
			
		||||
    {
 | 
			
		||||
    public:
 | 
			
		||||
        UdpSocket(int fd = -1);
 | 
			
		||||
        virtual ~UdpSocket();
 | 
			
		||||
 | 
			
		||||
        // Virtual methods
 | 
			
		||||
        bool init(const std::string& host, int port, std::string& errMsg);
 | 
			
		||||
        ssize_t sendto(const std::string& buffer);
 | 
			
		||||
        virtual void close();
 | 
			
		||||
 | 
			
		||||
        static int getErrno();
 | 
			
		||||
        static bool isWaitNeeded();
 | 
			
		||||
        static void closeSocket(int fd);
 | 
			
		||||
 | 
			
		||||
    protected:
 | 
			
		||||
        std::atomic<int> _sockfd;
 | 
			
		||||
        std::mutex _socketMutex;
 | 
			
		||||
 | 
			
		||||
        struct sockaddr_in _server;
 | 
			
		||||
 | 
			
		||||
    private:
 | 
			
		||||
        std::shared_ptr<SelectInterrupt> _selectInterrupt;
 | 
			
		||||
    };
 | 
			
		||||
} // namespace ix
 | 
			
		||||
@@ -6,4 +6,4 @@
 | 
			
		||||
 | 
			
		||||
#pragma once
 | 
			
		||||
 | 
			
		||||
#define IX_WEBSOCKET_VERSION "8.3.2"
 | 
			
		||||
#define IX_WEBSOCKET_VERSION "8.3.3"
 | 
			
		||||
 
 | 
			
		||||
@@ -40,31 +40,31 @@ set (SOURCES
 | 
			
		||||
  IXGetFreePort.cpp
 | 
			
		||||
  ../third_party/msgpack11/msgpack11.cpp
 | 
			
		||||
 | 
			
		||||
  IXSocketTest.cpp
 | 
			
		||||
  IXSocketConnectTest.cpp
 | 
			
		||||
  IXWebSocketServerTest.cpp
 | 
			
		||||
  IXWebSocketTestConnectionDisconnection.cpp
 | 
			
		||||
  IXUrlParserTest.cpp
 | 
			
		||||
  IXWebSocketServerTest.cpp
 | 
			
		||||
  IXHttpClientTest.cpp
 | 
			
		||||
  IXHttpServerTest.cpp
 | 
			
		||||
  IXUnityBuildsTest.cpp
 | 
			
		||||
  IXHttpTest.cpp
 | 
			
		||||
  IXDNSLookupTest.cpp
 | 
			
		||||
  IXWebSocketSubProtocolTest.cpp
 | 
			
		||||
  IXSentryClientTest.cpp
 | 
			
		||||
  IXWebSocketChatTest.cpp
 | 
			
		||||
  #IXSocketTest.cpp
 | 
			
		||||
  #IXSocketConnectTest.cpp
 | 
			
		||||
  #IXWebSocketServerTest.cpp
 | 
			
		||||
  #IXWebSocketTestConnectionDisconnection.cpp
 | 
			
		||||
  #IXUrlParserTest.cpp
 | 
			
		||||
  #IXWebSocketServerTest.cpp
 | 
			
		||||
  #IXHttpClientTest.cpp
 | 
			
		||||
  #IXHttpServerTest.cpp
 | 
			
		||||
  #IXUnityBuildsTest.cpp
 | 
			
		||||
  #IXHttpTest.cpp
 | 
			
		||||
  #IXDNSLookupTest.cpp
 | 
			
		||||
  #IXWebSocketSubProtocolTest.cpp
 | 
			
		||||
  #IXSentryClientTest.cpp
 | 
			
		||||
  #IXWebSocketChatTest.cpp
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
# Some unittest don't work on windows yet
 | 
			
		||||
# Windows without TLS does not have hmac yet
 | 
			
		||||
if (UNIX)
 | 
			
		||||
  list(APPEND SOURCES
 | 
			
		||||
    IXWebSocketCloseTest.cpp
 | 
			
		||||
 | 
			
		||||
    # Windows without TLS does not have hmac yet
 | 
			
		||||
    IXCobraChatTest.cpp
 | 
			
		||||
    IXCobraMetricsPublisherTest.cpp
 | 
			
		||||
    IXCobraToSentryBotTest.cpp
 | 
			
		||||
    # IXWebSocketCloseTest.cpp
 | 
			
		||||
    # IXCobraChatTest.cpp
 | 
			
		||||
    # IXCobraMetricsPublisherTest.cpp
 | 
			
		||||
    # IXCobraToSentryBotTest.cpp
 | 
			
		||||
    IXCobraToStatsdBotTest.cpp
 | 
			
		||||
  )
 | 
			
		||||
endif()
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -1,7 +1,7 @@
 | 
			
		||||
/*
 | 
			
		||||
 *  cmd_satori_chat.cpp
 | 
			
		||||
 *  IXCobraToSentryTest.cpp
 | 
			
		||||
 *  Author: Benjamin Sergeant
 | 
			
		||||
 *  Copyright (c) 2017 Machine Zone. All rights reserved.
 | 
			
		||||
 *  Copyright (c) 2020 Machine Zone. All rights reserved.
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
#include "IXTest.h"
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										141
									
								
								test/IXCobraToStatsdBotTest.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										141
									
								
								test/IXCobraToStatsdBotTest.cpp
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,141 @@
 | 
			
		||||
/*
 | 
			
		||||
 *  IXCobraToStatsdTest.cpp
 | 
			
		||||
 *  Author: Benjamin Sergeant
 | 
			
		||||
 *  Copyright (c) 2020 Machine Zone. All rights reserved.
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
#include "IXTest.h"
 | 
			
		||||
#include "catch.hpp"
 | 
			
		||||
#include <chrono>
 | 
			
		||||
#include <iostream>
 | 
			
		||||
#include <ixbots/IXCobraToStatsdBot.h>
 | 
			
		||||
#include <ixcobra/IXCobraConnection.h>
 | 
			
		||||
#include <ixcobra/IXCobraMetricsPublisher.h>
 | 
			
		||||
#include <ixcrypto/IXUuid.h>
 | 
			
		||||
#include <ixsentry/IXSentryClient.h>
 | 
			
		||||
#include <ixsnake/IXRedisServer.h>
 | 
			
		||||
#include <ixsnake/IXSnakeServer.h>
 | 
			
		||||
#include <ixwebsocket/IXHttpServer.h>
 | 
			
		||||
#include <ixwebsocket/IXUserAgent.h>
 | 
			
		||||
 | 
			
		||||
using namespace ix;
 | 
			
		||||
 | 
			
		||||
namespace
 | 
			
		||||
{
 | 
			
		||||
    void runPublisher(const ix::CobraConfig& config, const std::string& channel)
 | 
			
		||||
    {
 | 
			
		||||
        ix::CobraMetricsPublisher cobraMetricsPublisher;
 | 
			
		||||
        cobraMetricsPublisher.configure(config, channel);
 | 
			
		||||
        cobraMetricsPublisher.setSession(uuid4());
 | 
			
		||||
        cobraMetricsPublisher.enable(true);
 | 
			
		||||
 | 
			
		||||
        Json::Value msg;
 | 
			
		||||
        msg["fps"] = 60;
 | 
			
		||||
 | 
			
		||||
        cobraMetricsPublisher.setGenericAttributes("game", "ody");
 | 
			
		||||
 | 
			
		||||
        // Wait a bit
 | 
			
		||||
        ix::msleep(500);
 | 
			
		||||
 | 
			
		||||
        // publish some messages
 | 
			
		||||
        cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #1)
 | 
			
		||||
        cobraMetricsPublisher.push("sms_metric_B_id", msg); // (msg #2)
 | 
			
		||||
        ix::msleep(500);
 | 
			
		||||
 | 
			
		||||
        cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #3)
 | 
			
		||||
        cobraMetricsPublisher.push("sms_metric_D_id", msg); // (msg #4)
 | 
			
		||||
        ix::msleep(500);
 | 
			
		||||
 | 
			
		||||
        cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #4)
 | 
			
		||||
        cobraMetricsPublisher.push("sms_metric_F_id", msg); // (msg #5)
 | 
			
		||||
        ix::msleep(500);
 | 
			
		||||
    }
 | 
			
		||||
} // namespace
 | 
			
		||||
 | 
			
		||||
TEST_CASE("Cobra_to_statsd_bot", "[cobra_bots]")
 | 
			
		||||
{
 | 
			
		||||
    SECTION("Exchange and count sent/received messages.")
 | 
			
		||||
    {
 | 
			
		||||
        int port = getFreePort();
 | 
			
		||||
        snake::AppConfig appConfig = makeSnakeServerConfig(port, true);
 | 
			
		||||
 | 
			
		||||
        // Start a redis server
 | 
			
		||||
        ix::RedisServer redisServer(appConfig.redisPort);
 | 
			
		||||
        auto res = redisServer.listen();
 | 
			
		||||
        REQUIRE(res.first);
 | 
			
		||||
        redisServer.start();
 | 
			
		||||
 | 
			
		||||
        // Start a snake server
 | 
			
		||||
        snake::SnakeServer snakeServer(appConfig);
 | 
			
		||||
        snakeServer.run();
 | 
			
		||||
 | 
			
		||||
        // Start a fake statsd server (ultimately)
 | 
			
		||||
 | 
			
		||||
        // Run the bot for a small amount of time
 | 
			
		||||
        std::string channel = ix::generateSessionId();
 | 
			
		||||
        std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
 | 
			
		||||
        std::string role = "_sub";
 | 
			
		||||
        std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
 | 
			
		||||
        std::string endpoint = makeCobraEndpoint(port, true);
 | 
			
		||||
 | 
			
		||||
        ix::CobraConfig config;
 | 
			
		||||
        config.endpoint = endpoint;
 | 
			
		||||
        config.appkey = appkey;
 | 
			
		||||
        config.rolename = role;
 | 
			
		||||
        config.rolesecret = secret;
 | 
			
		||||
        config.socketTLSOptions = makeClientTLSOptions();
 | 
			
		||||
 | 
			
		||||
        std::thread publisherThread(runPublisher, config, channel);
 | 
			
		||||
 | 
			
		||||
        std::string filter;
 | 
			
		||||
        std::string position("$");
 | 
			
		||||
        bool verbose = true;
 | 
			
		||||
        size_t maxQueueSize = 10;
 | 
			
		||||
        bool enableHeartbeat = false;
 | 
			
		||||
 | 
			
		||||
        // Only run the bot for 3 seconds
 | 
			
		||||
        int runtime = 3;
 | 
			
		||||
 | 
			
		||||
        std::string hostname("127.0.0.1");
 | 
			
		||||
        // std::string hostname("www.google.com");
 | 
			
		||||
        int statsdPort = 8125;
 | 
			
		||||
        std::string prefix("ix.test");
 | 
			
		||||
        StatsdClient statsdClient(hostname, statsdPort, prefix);
 | 
			
		||||
 | 
			
		||||
        std::string errMsg;
 | 
			
		||||
        bool initialized = statsdClient.init(errMsg);
 | 
			
		||||
        if (!initialized)
 | 
			
		||||
        {
 | 
			
		||||
            spdlog::error(errMsg);
 | 
			
		||||
        }
 | 
			
		||||
        REQUIRE(initialized);
 | 
			
		||||
 | 
			
		||||
        std::string fields("device.game\ndevice.os_name");
 | 
			
		||||
 | 
			
		||||
        int sentCount = ix::cobra_to_statsd_bot(config,
 | 
			
		||||
                                                channel,
 | 
			
		||||
                                                filter,
 | 
			
		||||
                                                position,
 | 
			
		||||
                                                statsdClient,
 | 
			
		||||
                                                fields,
 | 
			
		||||
                                                verbose,
 | 
			
		||||
                                                maxQueueSize,
 | 
			
		||||
                                                enableHeartbeat,
 | 
			
		||||
                                                runtime);
 | 
			
		||||
        //
 | 
			
		||||
        // We want at least 2 messages to be sent
 | 
			
		||||
        //
 | 
			
		||||
        REQUIRE(sentCount >= 2);
 | 
			
		||||
 | 
			
		||||
        // Give us 1s for all messages to be received
 | 
			
		||||
        ix::msleep(1000);
 | 
			
		||||
 | 
			
		||||
        spdlog::info("Stopping snake server...");
 | 
			
		||||
        snakeServer.stop();
 | 
			
		||||
 | 
			
		||||
        spdlog::info("Stopping redis server...");
 | 
			
		||||
        redisServer.stop();
 | 
			
		||||
 | 
			
		||||
        publisherThread.join();
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										13
									
								
								third_party/statsd-client-cpp/.gitignore
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										13
									
								
								third_party/statsd-client-cpp/.gitignore
									
									
									
									
										vendored
									
									
								
							@@ -1,13 +0,0 @@
 | 
			
		||||
# Compiled Object files
 | 
			
		||||
*.slo
 | 
			
		||||
*.lo
 | 
			
		||||
*.o
 | 
			
		||||
 | 
			
		||||
# Compiled Dynamic libraries
 | 
			
		||||
*.so
 | 
			
		||||
*.dylib
 | 
			
		||||
 | 
			
		||||
# Compiled Static libraries
 | 
			
		||||
*.lai
 | 
			
		||||
*.la
 | 
			
		||||
*.a
 | 
			
		||||
							
								
								
									
										18
									
								
								third_party/statsd-client-cpp/CMakeLists.txt
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										18
									
								
								third_party/statsd-client-cpp/CMakeLists.txt
									
									
									
									
										vendored
									
									
								
							@@ -1,18 +0,0 @@
 | 
			
		||||
cmake_minimum_required(VERSION 3.1)
 | 
			
		||||
project(helloCLion)
 | 
			
		||||
 | 
			
		||||
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
 | 
			
		||||
 | 
			
		||||
include_directories(
 | 
			
		||||
    src
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
add_library(statsdcppclient STATIC src/statsd_client.cpp)
 | 
			
		||||
add_definitions("-fPIC")
 | 
			
		||||
target_link_libraries(statsdcppclient pthread)
 | 
			
		||||
 | 
			
		||||
add_executable(system_monitor demo/system_monitor.cpp)
 | 
			
		||||
target_link_libraries(system_monitor statsdcppclient)
 | 
			
		||||
 | 
			
		||||
add_executable(test_client demo/test_client.cpp)
 | 
			
		||||
target_link_libraries(test_client statsdcppclient)
 | 
			
		||||
							
								
								
									
										27
									
								
								third_party/statsd-client-cpp/LICENSE
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										27
									
								
								third_party/statsd-client-cpp/LICENSE
									
									
									
									
										vendored
									
									
								
							@@ -1,27 +0,0 @@
 | 
			
		||||
Copyright (c) 2014, Rex
 | 
			
		||||
All rights reserved.
 | 
			
		||||
 | 
			
		||||
Redistribution and use in source and binary forms, with or without
 | 
			
		||||
modification, are permitted provided that the following conditions are met:
 | 
			
		||||
 | 
			
		||||
* Redistributions of source code must retain the above copyright notice, this
 | 
			
		||||
  list of conditions and the following disclaimer.
 | 
			
		||||
 | 
			
		||||
* Redistributions in binary form must reproduce the above copyright notice,
 | 
			
		||||
  this list of conditions and the following disclaimer in the documentation
 | 
			
		||||
  and/or other materials provided with the distribution.
 | 
			
		||||
 | 
			
		||||
* Neither the name of the {organization} nor the names of its
 | 
			
		||||
  contributors may be used to endorse or promote products derived from
 | 
			
		||||
  this software without specific prior written permission.
 | 
			
		||||
 | 
			
		||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 | 
			
		||||
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 | 
			
		||||
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 | 
			
		||||
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
 | 
			
		||||
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 | 
			
		||||
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
 | 
			
		||||
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 | 
			
		||||
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 | 
			
		||||
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 | 
			
		||||
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 | 
			
		||||
							
								
								
									
										34
									
								
								third_party/statsd-client-cpp/README.md
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										34
									
								
								third_party/statsd-client-cpp/README.md
									
									
									
									
										vendored
									
									
								
							@@ -1,34 +0,0 @@
 | 
			
		||||
# a client sdk for StatsD, written in C++
 | 
			
		||||
 | 
			
		||||
## API
 | 
			
		||||
See [header file](src/statsd_client.h) for more api detail.
 | 
			
		||||
 | 
			
		||||
** Notice: this client is not thread-safe **
 | 
			
		||||
 | 
			
		||||
## Demo
 | 
			
		||||
### test\_client
 | 
			
		||||
This simple demo shows how the use this client.
 | 
			
		||||
 | 
			
		||||
### system\_monitor
 | 
			
		||||
This is a daemon for monitoring a Linux system.
 | 
			
		||||
It'll wake up every minute and monitor the following:
 | 
			
		||||
 | 
			
		||||
* load
 | 
			
		||||
* cpu
 | 
			
		||||
* free memory
 | 
			
		||||
* free swap (disabled)
 | 
			
		||||
* received bytes
 | 
			
		||||
* transmitted bytes
 | 
			
		||||
* procs
 | 
			
		||||
* uptime
 | 
			
		||||
 | 
			
		||||
The stats sent to statsd will be in "host.MACAddress" namespace.
 | 
			
		||||
 | 
			
		||||
Usage:
 | 
			
		||||
 | 
			
		||||
    system_monitor statsd-host interface-to-monitor
 | 
			
		||||
 | 
			
		||||
e.g.
 | 
			
		||||
 | 
			
		||||
    `system_monitor 172.16.42.1 eth0`
 | 
			
		||||
 | 
			
		||||
@@ -1,164 +0,0 @@
 | 
			
		||||
 | 
			
		||||
#include <sys/types.h>
 | 
			
		||||
#include <stdio.h>
 | 
			
		||||
#include <stdlib.h>
 | 
			
		||||
#include <signal.h>
 | 
			
		||||
#include <unistd.h>
 | 
			
		||||
#include <string.h>
 | 
			
		||||
#include <netdb.h>
 | 
			
		||||
#include <sys/sysinfo.h>
 | 
			
		||||
 | 
			
		||||
#include <sys/ioctl.h>
 | 
			
		||||
#include <netinet/in.h>
 | 
			
		||||
#include <net/if.h>
 | 
			
		||||
#include <string>
 | 
			
		||||
#include <vector>
 | 
			
		||||
 | 
			
		||||
#include "statsd_client.h"
 | 
			
		||||
 | 
			
		||||
using namespace std;
 | 
			
		||||
 | 
			
		||||
static int running = 1;
 | 
			
		||||
 | 
			
		||||
void sigterm(int sig)
 | 
			
		||||
{
 | 
			
		||||
    running = 0;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
string localhost() {
 | 
			
		||||
    struct addrinfo hints, *info, *p;
 | 
			
		||||
    string hostname(1024, '\0');
 | 
			
		||||
    gethostname((char*)hostname.data(), hostname.capacity());
 | 
			
		||||
 | 
			
		||||
    memset(&hints, 0, sizeof hints);
 | 
			
		||||
    hints.ai_family = AF_UNSPEC; /*either IPV4 or IPV6*/
 | 
			
		||||
    hints.ai_socktype = SOCK_STREAM;
 | 
			
		||||
    hints.ai_flags = AI_CANONNAME;
 | 
			
		||||
 | 
			
		||||
    if ( getaddrinfo(hostname.c_str(), "http", &hints, &info) == 0) {
 | 
			
		||||
        for(p = info; p != NULL; p = p->ai_next) {
 | 
			
		||||
            hostname = p->ai_canonname;
 | 
			
		||||
        }
 | 
			
		||||
        freeaddrinfo(info);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    string::size_type pos = hostname.find(".");
 | 
			
		||||
    while ( pos != string::npos )
 | 
			
		||||
    {
 | 
			
		||||
        hostname[pos] = '_';
 | 
			
		||||
        pos = hostname.find(".", pos);
 | 
			
		||||
    }
 | 
			
		||||
    return hostname;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
vector<string>& StringSplitTrim(const string& sData,
 | 
			
		||||
        const string& sDelim, vector<string>& vItems)
 | 
			
		||||
{
 | 
			
		||||
    vItems.clear();
 | 
			
		||||
 | 
			
		||||
    string::size_type bpos = 0;
 | 
			
		||||
    string::size_type epos = 0;
 | 
			
		||||
    string::size_type nlen = sDelim.size();
 | 
			
		||||
 | 
			
		||||
    while(sData.substr(epos,nlen) == sDelim)
 | 
			
		||||
    {
 | 
			
		||||
        epos += nlen;
 | 
			
		||||
    }
 | 
			
		||||
    bpos = epos;
 | 
			
		||||
 | 
			
		||||
    while ((epos=sData.find(sDelim, epos)) != string::npos)
 | 
			
		||||
    {
 | 
			
		||||
        vItems.push_back(sData.substr(bpos, epos-bpos));
 | 
			
		||||
        epos += nlen;
 | 
			
		||||
        while(sData.substr(epos,nlen) == sDelim)
 | 
			
		||||
        {
 | 
			
		||||
            epos += nlen;
 | 
			
		||||
        }
 | 
			
		||||
        bpos = epos;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if(bpos != sData.size())
 | 
			
		||||
    {
 | 
			
		||||
        vItems.push_back(sData.substr(bpos, sData.size()-bpos));
 | 
			
		||||
    }
 | 
			
		||||
    return vItems;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int main(int argc, char *argv[])
 | 
			
		||||
{
 | 
			
		||||
    FILE *net, *stat;
 | 
			
		||||
    struct sysinfo si;
 | 
			
		||||
    char line[256];
 | 
			
		||||
    unsigned int user, nice, sys, idle, total, busy, old_total=0, old_busy=0;
 | 
			
		||||
 | 
			
		||||
    if (argc != 3) {
 | 
			
		||||
        printf( "Usage: %s host port\n"
 | 
			
		||||
                "Eg:    %s 127.0.0.1 8125\n",
 | 
			
		||||
                argv[0], argv[0]);
 | 
			
		||||
        exit(1);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    signal(SIGHUP, SIG_IGN);
 | 
			
		||||
    signal(SIGPIPE, SIG_IGN);
 | 
			
		||||
    signal(SIGCHLD, SIG_IGN); /* will save one syscall per sleep */
 | 
			
		||||
    signal(SIGTERM, sigterm);
 | 
			
		||||
 | 
			
		||||
    if ( (net = fopen("/proc/net/dev", "r")) == NULL) {
 | 
			
		||||
        perror("fopen");
 | 
			
		||||
        exit(-1);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if ( (stat = fopen("/proc/stat", "r")) == NULL) {
 | 
			
		||||
        perror("fopen");
 | 
			
		||||
        exit(-1);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    string ns = string("host.") + localhost().c_str() + ".";
 | 
			
		||||
    statsd::StatsdClient client(argv[1], atoi(argv[2]), ns);
 | 
			
		||||
 | 
			
		||||
    daemon(0,0);
 | 
			
		||||
    printf("running in background.\n");
 | 
			
		||||
 | 
			
		||||
    while(running) {
 | 
			
		||||
        rewind(net);
 | 
			
		||||
        vector<string> items;
 | 
			
		||||
        while(!feof(net)) {
 | 
			
		||||
            fgets(line, sizeof(line), net);
 | 
			
		||||
            StringSplitTrim(line, " ", items);
 | 
			
		||||
 | 
			
		||||
            if ( items.size() < 17 ) continue;
 | 
			
		||||
            if ( items[0].find(":") == string::npos ) continue;
 | 
			
		||||
            if ( items[1] == "0" and items[9] == "0" ) continue;
 | 
			
		||||
 | 
			
		||||
            string netface = "network."+items[0].erase( items[0].find(":") );
 | 
			
		||||
            client.count( netface+".receive.bytes", atoll(items[1].c_str()) );
 | 
			
		||||
            client.count( netface+".receive.packets", atoll(items[2].c_str()) );
 | 
			
		||||
            client.count( netface+".transmit.bytes", atoll(items[9].c_str()) );
 | 
			
		||||
            client.count( netface+".transmit.packets", atoll(items[10].c_str()) );
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        sysinfo(&si);
 | 
			
		||||
        client.gauge("system.load", 100*si.loads[0]/0x10000);
 | 
			
		||||
        client.gauge("system.freemem", si.freeram/1024);
 | 
			
		||||
        client.gauge("system.procs", si.procs);
 | 
			
		||||
        client.count("system.uptime", si.uptime);
 | 
			
		||||
 | 
			
		||||
        /* rewind doesn't do the trick for /proc/stat */
 | 
			
		||||
        freopen("/proc/stat", "r", stat);
 | 
			
		||||
        fgets(line, sizeof(line), stat);
 | 
			
		||||
        sscanf(line, "cpu  %u %u %u %u", &user, &nice, &sys, &idle);
 | 
			
		||||
        total = user + sys + idle;
 | 
			
		||||
        busy = user + sys;
 | 
			
		||||
 | 
			
		||||
        client.send("system.cpu", 100 * (busy - old_busy)/(total - old_total), "g", 1.0);
 | 
			
		||||
 | 
			
		||||
        old_total = total;
 | 
			
		||||
        old_busy = busy;
 | 
			
		||||
        sleep(6);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fclose(net);
 | 
			
		||||
    fclose(stat);
 | 
			
		||||
 | 
			
		||||
    exit(0);
 | 
			
		||||
}
 | 
			
		||||
@@ -1,28 +0,0 @@
 | 
			
		||||
 | 
			
		||||
#include <iostream>
 | 
			
		||||
#include <unistd.h>
 | 
			
		||||
#include "statsd_client.h"
 | 
			
		||||
 | 
			
		||||
int main(void)
 | 
			
		||||
{
 | 
			
		||||
    std::cout << "running..." << std::endl;
 | 
			
		||||
 | 
			
		||||
    statsd::StatsdClient client;
 | 
			
		||||
    statsd::StatsdClient client2("127.0.0.1", 8125, "myproject.abx.", true);
 | 
			
		||||
 | 
			
		||||
    client.count("count1", 123, 1.0);
 | 
			
		||||
    client.count("count2", 125, 1.0);
 | 
			
		||||
    client.gauge("speed", 10);
 | 
			
		||||
    int i;
 | 
			
		||||
    for (i=0; i<1000; i++)
 | 
			
		||||
        client2.timing("request", i);
 | 
			
		||||
    sleep(1);
 | 
			
		||||
    client.inc("count1", 1.0);
 | 
			
		||||
    client2.dec("count2", 1.0);
 | 
			
		||||
//    for(i=0; i<1000; i++) {
 | 
			
		||||
//        client2.count("count3", i, 0.8);
 | 
			
		||||
//    }
 | 
			
		||||
 | 
			
		||||
    std::cout << "done" << std::endl;
 | 
			
		||||
    return 0;
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										245
									
								
								third_party/statsd-client-cpp/src/statsd_client.cpp
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										245
									
								
								third_party/statsd-client-cpp/src/statsd_client.cpp
									
									
									
									
										vendored
									
									
								
							@@ -1,245 +0,0 @@
 | 
			
		||||
#include <math.h>
 | 
			
		||||
#include <netdb.h>
 | 
			
		||||
#include <time.h>
 | 
			
		||||
#include <unistd.h>
 | 
			
		||||
#include <stdlib.h>
 | 
			
		||||
#include <string.h>
 | 
			
		||||
#include <stdio.h>
 | 
			
		||||
#include <netinet/in.h>
 | 
			
		||||
#include "statsd_client.h"
 | 
			
		||||
 | 
			
		||||
using namespace std;
 | 
			
		||||
namespace statsd {
 | 
			
		||||
 | 
			
		||||
inline bool fequal(float a, float b)
 | 
			
		||||
{
 | 
			
		||||
    const float epsilon = 0.0001;
 | 
			
		||||
    return ( fabs(a - b) < epsilon );
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
inline bool should_send(float sample_rate)
 | 
			
		||||
{
 | 
			
		||||
    if ( fequal(sample_rate, 1.0) )
 | 
			
		||||
    {
 | 
			
		||||
        return true;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    float p = ((float)random() / RAND_MAX);
 | 
			
		||||
    return sample_rate > p;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
struct _StatsdClientData {
 | 
			
		||||
    int     sock;
 | 
			
		||||
    struct  sockaddr_in server;
 | 
			
		||||
 | 
			
		||||
    string  ns;
 | 
			
		||||
    string  host;
 | 
			
		||||
    short   port;
 | 
			
		||||
    bool    init;
 | 
			
		||||
 | 
			
		||||
    char    errmsg[1024];
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
StatsdClient::StatsdClient(const string& host,
 | 
			
		||||
                           int port,
 | 
			
		||||
                           const string& ns,
 | 
			
		||||
                           const bool batching)
 | 
			
		||||
: batching_(batching), exit_(false)
 | 
			
		||||
{
 | 
			
		||||
    d = new _StatsdClientData;
 | 
			
		||||
    d->sock = -1;
 | 
			
		||||
    config(host, port, ns);
 | 
			
		||||
    srandom((unsigned) time(NULL));
 | 
			
		||||
 | 
			
		||||
    if (batching_) {
 | 
			
		||||
        pthread_mutex_init(&batching_mutex_lock_, nullptr);
 | 
			
		||||
        batching_thread_ = std::thread([this] {
 | 
			
		||||
          while (!exit_) {
 | 
			
		||||
              std::deque<std::string> staged_message_queue;
 | 
			
		||||
 | 
			
		||||
              pthread_mutex_lock(&batching_mutex_lock_);
 | 
			
		||||
              batching_message_queue_.swap(staged_message_queue);
 | 
			
		||||
              pthread_mutex_unlock(&batching_mutex_lock_);
 | 
			
		||||
 | 
			
		||||
              while(!staged_message_queue.empty()) {
 | 
			
		||||
                  send_to_daemon(staged_message_queue.front());
 | 
			
		||||
                  staged_message_queue.pop_front();
 | 
			
		||||
              }
 | 
			
		||||
 | 
			
		||||
              std::this_thread::sleep_for(std::chrono::milliseconds(1000));
 | 
			
		||||
          }
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
StatsdClient::~StatsdClient()
 | 
			
		||||
{
 | 
			
		||||
    if (batching_) {
 | 
			
		||||
        exit_ = true;
 | 
			
		||||
        batching_thread_.join();
 | 
			
		||||
        pthread_mutex_destroy(&batching_mutex_lock_);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    // close socket
 | 
			
		||||
    if (d->sock >= 0) {
 | 
			
		||||
        close(d->sock);
 | 
			
		||||
        d->sock = -1;
 | 
			
		||||
        delete d;
 | 
			
		||||
        d = NULL;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void StatsdClient::config(const string& host, int port, const string& ns)
 | 
			
		||||
{
 | 
			
		||||
    d->ns = ns;
 | 
			
		||||
    d->host = host;
 | 
			
		||||
    d->port = port;
 | 
			
		||||
    d->init = false;
 | 
			
		||||
    if ( d->sock >= 0 ) {
 | 
			
		||||
        close(d->sock);
 | 
			
		||||
    }
 | 
			
		||||
    d->sock = -1;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int StatsdClient::init()
 | 
			
		||||
{
 | 
			
		||||
    if ( d->init ) return 0;
 | 
			
		||||
 | 
			
		||||
    d->sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
 | 
			
		||||
    if ( d->sock == -1 ) {
 | 
			
		||||
        snprintf(d->errmsg, sizeof(d->errmsg), "could not create socket, err=%s", strerror(errno));
 | 
			
		||||
        return -1;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    memset(&d->server, 0, sizeof(d->server));
 | 
			
		||||
    d->server.sin_family = AF_INET;
 | 
			
		||||
    d->server.sin_port = htons(d->port);
 | 
			
		||||
 | 
			
		||||
    int ret = inet_aton(d->host.c_str(), &d->server.sin_addr);
 | 
			
		||||
    if ( ret == 0 )
 | 
			
		||||
    {
 | 
			
		||||
        // host must be a domain, get it from internet
 | 
			
		||||
        struct addrinfo hints, *result = NULL;
 | 
			
		||||
        memset(&hints, 0, sizeof(hints));
 | 
			
		||||
        hints.ai_family = AF_INET;
 | 
			
		||||
        hints.ai_socktype = SOCK_DGRAM;
 | 
			
		||||
 | 
			
		||||
        ret = getaddrinfo(d->host.c_str(), NULL, &hints, &result);
 | 
			
		||||
        if ( ret ) {
 | 
			
		||||
            close(d->sock);
 | 
			
		||||
            d->sock = -1;
 | 
			
		||||
            snprintf(d->errmsg, sizeof(d->errmsg),
 | 
			
		||||
                     "getaddrinfo fail, error=%d, msg=%s", ret, gai_strerror(ret) );
 | 
			
		||||
            return -2;
 | 
			
		||||
        }
 | 
			
		||||
        struct sockaddr_in* host_addr = (struct sockaddr_in*)result->ai_addr;
 | 
			
		||||
        memcpy(&d->server.sin_addr, &host_addr->sin_addr, sizeof(struct in_addr));
 | 
			
		||||
        freeaddrinfo(result);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    d->init = true;
 | 
			
		||||
    return 0;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/* will change the original string */
 | 
			
		||||
void StatsdClient::cleanup(string& key)
 | 
			
		||||
{
 | 
			
		||||
    size_t pos = key.find_first_of(":|@");
 | 
			
		||||
    while ( pos != string::npos )
 | 
			
		||||
    {
 | 
			
		||||
        key[pos] = '_';
 | 
			
		||||
        pos = key.find_first_of(":|@");
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int StatsdClient::dec(const string& key, float sample_rate)
 | 
			
		||||
{
 | 
			
		||||
    return count(key, -1, sample_rate);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int StatsdClient::inc(const string& key, float sample_rate)
 | 
			
		||||
{
 | 
			
		||||
    return count(key, 1, sample_rate);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int StatsdClient::count(const string& key, size_t value, float sample_rate)
 | 
			
		||||
{
 | 
			
		||||
    return send(key, value, "c", sample_rate);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int StatsdClient::gauge(const string& key, size_t value, float sample_rate)
 | 
			
		||||
{
 | 
			
		||||
    return send(key, value, "g", sample_rate);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int StatsdClient::timing(const string& key, size_t ms, float sample_rate)
 | 
			
		||||
{
 | 
			
		||||
    return send(key, ms, "ms", sample_rate);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int StatsdClient::send(string key, size_t value, const string &type, float sample_rate)
 | 
			
		||||
{
 | 
			
		||||
    if (!should_send(sample_rate)) {
 | 
			
		||||
        return 0;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    cleanup(key);
 | 
			
		||||
 | 
			
		||||
    char buf[256];
 | 
			
		||||
    if ( fequal( sample_rate, 1.0 ) )
 | 
			
		||||
    {
 | 
			
		||||
        snprintf(buf, sizeof(buf), "%s%s:%zd|%s",
 | 
			
		||||
                 d->ns.c_str(), key.c_str(), value, type.c_str());
 | 
			
		||||
    }
 | 
			
		||||
    else
 | 
			
		||||
    {
 | 
			
		||||
        snprintf(buf, sizeof(buf), "%s%s:%zd|%s|@%.2f",
 | 
			
		||||
                 d->ns.c_str(), key.c_str(), value, type.c_str(), sample_rate);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return send(buf);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int StatsdClient::send(const string &message)
 | 
			
		||||
{
 | 
			
		||||
    if (batching_) {
 | 
			
		||||
        pthread_mutex_lock(&batching_mutex_lock_);
 | 
			
		||||
        if (batching_message_queue_.empty() ||
 | 
			
		||||
            batching_message_queue_.back().length() > max_batching_size) {
 | 
			
		||||
            batching_message_queue_.push_back(message);
 | 
			
		||||
        } else {
 | 
			
		||||
            (*batching_message_queue_.rbegin()).append("\n").append(message);
 | 
			
		||||
        }
 | 
			
		||||
        pthread_mutex_unlock(&batching_mutex_lock_);
 | 
			
		||||
 | 
			
		||||
        return 0;
 | 
			
		||||
    } else {
 | 
			
		||||
        return send_to_daemon(message);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
int StatsdClient::send_to_daemon(const string &message) {
 | 
			
		||||
    int ret = init();
 | 
			
		||||
    if ( ret )
 | 
			
		||||
    {
 | 
			
		||||
        return ret;
 | 
			
		||||
    }
 | 
			
		||||
    ret = (int) sendto(d->sock, message.data(), message.size(), 0, (struct sockaddr *) &d->server, sizeof(d->server));
 | 
			
		||||
    if ( ret == -1) {
 | 
			
		||||
        snprintf(d->errmsg, sizeof(d->errmsg),
 | 
			
		||||
                 "sendto server fail, host=%s:%d, err=%s", d->host.c_str(), d->port, strerror(errno));
 | 
			
		||||
        return -1;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return 0;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
const char* StatsdClient::errmsg()
 | 
			
		||||
{
 | 
			
		||||
    return d->errmsg;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -1,66 +0,0 @@
 | 
			
		||||
 | 
			
		||||
#ifndef STATSD_CLIENT_H
 | 
			
		||||
#define STATSD_CLIENT_H
 | 
			
		||||
 | 
			
		||||
#include <sys/types.h>
 | 
			
		||||
#include <arpa/inet.h>
 | 
			
		||||
#include <sys/socket.h>
 | 
			
		||||
#include <pthread.h>
 | 
			
		||||
#include <string>
 | 
			
		||||
#include <thread>
 | 
			
		||||
#include <deque>
 | 
			
		||||
#include <iostream>
 | 
			
		||||
 | 
			
		||||
namespace statsd {
 | 
			
		||||
 | 
			
		||||
struct _StatsdClientData;
 | 
			
		||||
 | 
			
		||||
class StatsdClient {
 | 
			
		||||
public:
 | 
			
		||||
    StatsdClient(const std::string& host="127.0.0.1", int port=8125, const std::string& ns = "", const bool batching = false);
 | 
			
		||||
    ~StatsdClient();
 | 
			
		||||
 | 
			
		||||
public:
 | 
			
		||||
    // you can config at anytime; client will use new address (useful for Singleton)
 | 
			
		||||
    void config(const std::string& host, int port, const std::string& ns = "");
 | 
			
		||||
    const char* errmsg();
 | 
			
		||||
    int send_to_daemon(const std::string &);
 | 
			
		||||
 | 
			
		||||
public:
 | 
			
		||||
    int inc(const std::string& key, float sample_rate = 1.0);
 | 
			
		||||
    int dec(const std::string& key, float sample_rate = 1.0);
 | 
			
		||||
    int count(const std::string& key, size_t value, float sample_rate = 1.0);
 | 
			
		||||
    int gauge(const std::string& key, size_t value, float sample_rate = 1.0);
 | 
			
		||||
    int timing(const std::string& key, size_t ms, float sample_rate = 1.0);
 | 
			
		||||
 | 
			
		||||
public:
 | 
			
		||||
    /**
 | 
			
		||||
     * (Low Level Api) manually send a message
 | 
			
		||||
     * which might be composed of several lines.
 | 
			
		||||
     */
 | 
			
		||||
    int send(const std::string& message);
 | 
			
		||||
 | 
			
		||||
    /* (Low Level Api) manually send a message
 | 
			
		||||
     * type = "c", "g" or "ms"
 | 
			
		||||
     */
 | 
			
		||||
    int send(std::string key, size_t value,
 | 
			
		||||
             const std::string& type, float sample_rate);
 | 
			
		||||
 | 
			
		||||
protected:
 | 
			
		||||
    int init();
 | 
			
		||||
    void cleanup(std::string& key);
 | 
			
		||||
 | 
			
		||||
protected:
 | 
			
		||||
    struct _StatsdClientData* d;
 | 
			
		||||
 | 
			
		||||
    bool batching_;
 | 
			
		||||
    bool exit_;
 | 
			
		||||
    pthread_mutex_t batching_mutex_lock_;
 | 
			
		||||
    std::thread batching_thread_;
 | 
			
		||||
    std::deque<std::string> batching_message_queue_;
 | 
			
		||||
    const uint64_t max_batching_size = 32768;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
} // end namespace
 | 
			
		||||
 | 
			
		||||
#endif
 | 
			
		||||
							
								
								
									
										22
									
								
								third_party/zlib/CMakeLists.txt
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										22
									
								
								third_party/zlib/CMakeLists.txt
									
									
									
									
										vendored
									
									
								
							@@ -225,25 +225,3 @@ endif()
 | 
			
		||||
if(NOT SKIP_INSTALL_FILES AND NOT SKIP_INSTALL_ALL )
 | 
			
		||||
    install(FILES ${ZLIB_PC} DESTINATION "${INSTALL_PKGCONFIG_DIR}")
 | 
			
		||||
endif()
 | 
			
		||||
 | 
			
		||||
#============================================================================
 | 
			
		||||
# Example binaries
 | 
			
		||||
#============================================================================
 | 
			
		||||
 | 
			
		||||
add_executable(example test/example.c)
 | 
			
		||||
target_link_libraries(example zlib)
 | 
			
		||||
add_test(example example)
 | 
			
		||||
 | 
			
		||||
add_executable(minigzip test/minigzip.c)
 | 
			
		||||
target_link_libraries(minigzip zlib)
 | 
			
		||||
 | 
			
		||||
if(HAVE_OFF64_T)
 | 
			
		||||
    add_executable(example64 test/example.c)
 | 
			
		||||
    target_link_libraries(example64 zlib)
 | 
			
		||||
    set_target_properties(example64 PROPERTIES COMPILE_FLAGS "-D_FILE_OFFSET_BITS=64")
 | 
			
		||||
    add_test(example64 example64)
 | 
			
		||||
 | 
			
		||||
    add_executable(minigzip64 test/minigzip.c)
 | 
			
		||||
    target_link_libraries(minigzip64 zlib)
 | 
			
		||||
    set_target_properties(minigzip64 PROPERTIES COMPILE_FLAGS "-D_FILE_OFFSET_BITS=64")
 | 
			
		||||
endif()
 | 
			
		||||
 
 | 
			
		||||
@@ -21,16 +21,11 @@ option(USE_TLS "Add TLS support" ON)
 | 
			
		||||
include_directories(ws .)
 | 
			
		||||
include_directories(ws ..)
 | 
			
		||||
include_directories(ws ../third_party)
 | 
			
		||||
include_directories(ws ../third_party/statsd-client-cpp/src)
 | 
			
		||||
include_directories(ws ../third_party/spdlog/include)
 | 
			
		||||
include_directories(ws ../third_party/cpp-linenoise)
 | 
			
		||||
 | 
			
		||||
add_definitions(-DSPDLOG_COMPILED_LIB=1)
 | 
			
		||||
 | 
			
		||||
if (UNIX)
 | 
			
		||||
  set( STATSD_CLIENT_SOURCES ../third_party/statsd-client-cpp/src/statsd_client.cpp)
 | 
			
		||||
endif()
 | 
			
		||||
 | 
			
		||||
find_package(JsonCpp)
 | 
			
		||||
if (NOT JSONCPP_FOUND)
 | 
			
		||||
  include_directories(../third_party/jsoncpp)
 | 
			
		||||
@@ -39,7 +34,6 @@ endif()
 | 
			
		||||
 | 
			
		||||
add_executable(ws
 | 
			
		||||
  ../third_party/msgpack11/msgpack11.cpp
 | 
			
		||||
  ${STATSD_CLIENT_SOURCES}
 | 
			
		||||
  ${JSONCPP_SOURCES}
 | 
			
		||||
 | 
			
		||||
  ws_http_client.cpp
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										19
									
								
								ws/ws.cpp
									
									
									
									
									
								
							
							
						
						
									
										19
									
								
								ws/ws.cpp
									
									
									
									
									
								
							@@ -269,6 +269,9 @@ int main(int argc, char** argv)
 | 
			
		||||
    cobra2statsd->add_option("--pidfile", pidfile, "Pid file");
 | 
			
		||||
    cobra2statsd->add_option("--filter", filter, "Stream SQL Filter");
 | 
			
		||||
    cobra2statsd->add_option("--position", position, "Stream position");
 | 
			
		||||
    cobra2statsd->add_option("--queue_size",
 | 
			
		||||
                             maxQueueSize,
 | 
			
		||||
                             "Size of the queue to hold messages before they are sent to Sentry");
 | 
			
		||||
    addTLSOptions(cobra2statsd);
 | 
			
		||||
    addCobraConfig(cobra2statsd);
 | 
			
		||||
 | 
			
		||||
@@ -450,8 +453,20 @@ int main(int argc, char** argv)
 | 
			
		||||
    }
 | 
			
		||||
    else if (app.got_subcommand("cobra_to_statsd"))
 | 
			
		||||
    {
 | 
			
		||||
        ret = ix::cobra_to_statsd_bot(
 | 
			
		||||
            cobraConfig, channel, filter, position, hostname, statsdPort, prefix, fields, verbose);
 | 
			
		||||
        bool enableHeartbeat = true;
 | 
			
		||||
        int runtime = -1;
 | 
			
		||||
        ix::StatsdClient statsdClient(hostname, statsdPort, prefix);
 | 
			
		||||
 | 
			
		||||
        ret = ix::cobra_to_statsd_bot(cobraConfig,
 | 
			
		||||
                                      channel,
 | 
			
		||||
                                      filter,
 | 
			
		||||
                                      position,
 | 
			
		||||
                                      statsdClient,
 | 
			
		||||
                                      fields,
 | 
			
		||||
                                      verbose,
 | 
			
		||||
                                      maxQueueSize,
 | 
			
		||||
                                      enableHeartbeat,
 | 
			
		||||
                                      runtime);
 | 
			
		||||
    }
 | 
			
		||||
    else if (app.got_subcommand("cobra_to_sentry"))
 | 
			
		||||
    {
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user