(cobra bots) do not use a queue to store messages pending processing, let the bot handle queuing
This commit is contained in:
		@@ -1,6 +1,10 @@
 | 
			
		||||
# Changelog
 | 
			
		||||
All changes to this project will be documented in this file.
 | 
			
		||||
 | 
			
		||||
## [9.5.4] - 2020-05-04
 | 
			
		||||
 | 
			
		||||
(cobra bots) do not use a queue to store messages pending processing, let the bot handle queuing
 | 
			
		||||
 | 
			
		||||
## [9.5.3] - 2020-04-29
 | 
			
		||||
 | 
			
		||||
(http client) better current request cancellation support when the HttpClient destructor is invoked (see #189)
 | 
			
		||||
 
 | 
			
		||||
@@ -8,7 +8,6 @@ set (IXBOTS_SOURCES
 | 
			
		||||
    ixbots/IXCobraToSentryBot.cpp
 | 
			
		||||
    ixbots/IXCobraToStatsdBot.cpp
 | 
			
		||||
    ixbots/IXCobraToStdoutBot.cpp
 | 
			
		||||
    ixbots/IXQueueManager.cpp
 | 
			
		||||
    ixbots/IXStatsdClient.cpp
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
@@ -17,7 +16,6 @@ set (IXBOTS_HEADERS
 | 
			
		||||
    ixbots/IXCobraToSentryBot.h
 | 
			
		||||
    ixbots/IXCobraToStatsdBot.h
 | 
			
		||||
    ixbots/IXCobraToStdoutBot.h
 | 
			
		||||
    ixbots/IXQueueManager.h
 | 
			
		||||
    ixbots/IXStatsdClient.h
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -6,7 +6,6 @@
 | 
			
		||||
 | 
			
		||||
#include "IXCobraBot.h"
 | 
			
		||||
 | 
			
		||||
#include "IXQueueManager.h"
 | 
			
		||||
#include <ixcobra/IXCobraConnection.h>
 | 
			
		||||
#include <ixcore/utils/IXCoreLogger.h>
 | 
			
		||||
 | 
			
		||||
@@ -23,8 +22,6 @@ namespace ix
 | 
			
		||||
                          const std::string& filter,
 | 
			
		||||
                          const std::string& position,
 | 
			
		||||
                          bool verbose,
 | 
			
		||||
                          size_t maxQueueSize,
 | 
			
		||||
                          bool useQueue,
 | 
			
		||||
                          bool enableHeartbeat,
 | 
			
		||||
                          int runtime)
 | 
			
		||||
    {
 | 
			
		||||
@@ -43,8 +40,6 @@ namespace ix
 | 
			
		||||
        std::atomic<bool> throttled(false);
 | 
			
		||||
        std::atomic<bool> fatalCobraError(false);
 | 
			
		||||
 | 
			
		||||
        QueueManager queueManager(maxQueueSize);
 | 
			
		||||
 | 
			
		||||
        auto timer = [&sentCount,
 | 
			
		||||
                      &receivedCount,
 | 
			
		||||
                      &sentCountTotal,
 | 
			
		||||
@@ -114,40 +109,6 @@ namespace ix
 | 
			
		||||
 | 
			
		||||
        std::thread t2(heartbeat);
 | 
			
		||||
 | 
			
		||||
        auto sender =
 | 
			
		||||
            [this, &queueManager, verbose, &sentCount, &stop, &throttled, &fatalCobraError] {
 | 
			
		||||
                while (true)
 | 
			
		||||
                {
 | 
			
		||||
                    auto data = queueManager.pop();
 | 
			
		||||
                    Json::Value msg = data.first;
 | 
			
		||||
                    std::string position = data.second;
 | 
			
		||||
 | 
			
		||||
                    if (stop) break;
 | 
			
		||||
                    if (msg.isNull()) continue;
 | 
			
		||||
 | 
			
		||||
                    if (_onBotMessageCallback &&
 | 
			
		||||
                        _onBotMessageCallback(msg, position, verbose, throttled, fatalCobraError))
 | 
			
		||||
                    {
 | 
			
		||||
                        // That might be too noisy
 | 
			
		||||
                        if (verbose)
 | 
			
		||||
                        {
 | 
			
		||||
                            CoreLogger::info("cobra bot: sending succesfull");
 | 
			
		||||
                        }
 | 
			
		||||
                        ++sentCount;
 | 
			
		||||
                    }
 | 
			
		||||
                    else
 | 
			
		||||
                    {
 | 
			
		||||
                        CoreLogger::error("cobra bot: error sending");
 | 
			
		||||
                    }
 | 
			
		||||
 | 
			
		||||
                    if (stop) break;
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                CoreLogger::info("sender thread done");
 | 
			
		||||
            };
 | 
			
		||||
 | 
			
		||||
        std::thread t3(sender);
 | 
			
		||||
 | 
			
		||||
        std::string subscriptionPosition(position);
 | 
			
		||||
 | 
			
		||||
        conn.setEventCallback([this,
 | 
			
		||||
@@ -160,8 +121,6 @@ namespace ix
 | 
			
		||||
                               &throttled,
 | 
			
		||||
                               &receivedCount,
 | 
			
		||||
                               &fatalCobraError,
 | 
			
		||||
                               &useQueue,
 | 
			
		||||
                               &queueManager,
 | 
			
		||||
                               &sentCount](const CobraEventPtr& event) {
 | 
			
		||||
            if (event->type == ix::CobraEventType::Open)
 | 
			
		||||
            {
 | 
			
		||||
@@ -190,8 +149,6 @@ namespace ix
 | 
			
		||||
                                verbose,
 | 
			
		||||
                                &throttled,
 | 
			
		||||
                                &receivedCount,
 | 
			
		||||
                                &queueManager,
 | 
			
		||||
                                &useQueue,
 | 
			
		||||
                                &subscriptionPosition,
 | 
			
		||||
                                &fatalCobraError,
 | 
			
		||||
                                &sentCount](const Json::Value& msg, const std::string& position) {
 | 
			
		||||
@@ -211,28 +168,9 @@ namespace ix
 | 
			
		||||
 | 
			
		||||
                                   ++receivedCount;
 | 
			
		||||
 | 
			
		||||
                                   if (useQueue)
 | 
			
		||||
                                   {
 | 
			
		||||
                                       queueManager.add(msg, position);
 | 
			
		||||
                                   }
 | 
			
		||||
                                   else
 | 
			
		||||
                                   {
 | 
			
		||||
                                       if (_onBotMessageCallback &&
 | 
			
		||||
                                   _onBotMessageCallback(
 | 
			
		||||
                                               msg, position, verbose, throttled, fatalCobraError))
 | 
			
		||||
                                       {
 | 
			
		||||
                                           // That might be too noisy
 | 
			
		||||
                                           if (verbose)
 | 
			
		||||
                                           {
 | 
			
		||||
                                               CoreLogger::info("cobra bot: sending succesfull");
 | 
			
		||||
                                           }
 | 
			
		||||
                                           ++sentCount;
 | 
			
		||||
                                       }
 | 
			
		||||
                                       else
 | 
			
		||||
                                       {
 | 
			
		||||
                                           CoreLogger::error("cobra bot: error sending");
 | 
			
		||||
                                       }
 | 
			
		||||
                                   }
 | 
			
		||||
                                       msg, position, verbose,
 | 
			
		||||
                                       throttled, fatalCobraError, sentCount);
 | 
			
		||||
                               });
 | 
			
		||||
            }
 | 
			
		||||
            else if (event->type == ix::CobraEventType::Subscribed)
 | 
			
		||||
@@ -308,9 +246,6 @@ namespace ix
 | 
			
		||||
        // heartbeat thread
 | 
			
		||||
        if (t2.joinable()) t2.join();
 | 
			
		||||
 | 
			
		||||
        // sentry sender thread
 | 
			
		||||
        t3.join();
 | 
			
		||||
 | 
			
		||||
        return fatalCobraError ? -1 : (int64_t) sentCount;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -14,11 +14,12 @@
 | 
			
		||||
 | 
			
		||||
namespace ix
 | 
			
		||||
{
 | 
			
		||||
    using OnBotMessageCallback = std::function<bool(const Json::Value&,
 | 
			
		||||
    using OnBotMessageCallback = std::function<void(const Json::Value&,
 | 
			
		||||
                                                    const std::string&,
 | 
			
		||||
                                                    const bool verbose,
 | 
			
		||||
                                                    std::atomic<bool>&,
 | 
			
		||||
                                                    std::atomic<bool>&)>;
 | 
			
		||||
                                                    std::atomic<bool>&,
 | 
			
		||||
                                                    std::atomic<uint64_t>&)>;
 | 
			
		||||
 | 
			
		||||
    class CobraBot
 | 
			
		||||
    {
 | 
			
		||||
@@ -30,8 +31,6 @@ namespace ix
 | 
			
		||||
                    const std::string& filter,
 | 
			
		||||
                    const std::string& position,
 | 
			
		||||
                    bool verbose,
 | 
			
		||||
                    size_t maxQueueSize,
 | 
			
		||||
                    bool useQueue,
 | 
			
		||||
                    bool enableHeartbeat,
 | 
			
		||||
                    int runtime);
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -7,7 +7,6 @@
 | 
			
		||||
#include "IXCobraToSentryBot.h"
 | 
			
		||||
 | 
			
		||||
#include "IXCobraBot.h"
 | 
			
		||||
#include "IXQueueManager.h"
 | 
			
		||||
#include <ixcobra/IXCobraConnection.h>
 | 
			
		||||
#include <ixcore/utils/IXCoreLogger.h>
 | 
			
		||||
 | 
			
		||||
@@ -23,7 +22,6 @@ namespace ix
 | 
			
		||||
                                const std::string& position,
 | 
			
		||||
                                SentryClient& sentryClient,
 | 
			
		||||
                                bool verbose,
 | 
			
		||||
                                size_t maxQueueSize,
 | 
			
		||||
                                bool enableHeartbeat,
 | 
			
		||||
                                int runtime)
 | 
			
		||||
    {
 | 
			
		||||
@@ -32,15 +30,14 @@ namespace ix
 | 
			
		||||
                                                    const std::string& /*position*/,
 | 
			
		||||
                                                    const bool verbose,
 | 
			
		||||
                                                    std::atomic<bool>& throttled,
 | 
			
		||||
                                                    std::atomic<bool> &
 | 
			
		||||
                                                    /*fatalCobraError*/) -> bool {
 | 
			
		||||
            auto ret = sentryClient.send(msg, verbose);
 | 
			
		||||
            HttpResponsePtr response = ret.first;
 | 
			
		||||
 | 
			
		||||
                                                    std::atomic<bool>& /*fatalCobraError*/,
 | 
			
		||||
                                                    std::atomic<uint64_t>& sentCount) -> void {
 | 
			
		||||
            sentryClient.send(msg, verbose,
 | 
			
		||||
                [&sentCount, &throttled, &verbose](const HttpResponsePtr& response) {
 | 
			
		||||
                if (!response)
 | 
			
		||||
                {
 | 
			
		||||
                    CoreLogger::warn("Null HTTP Response");
 | 
			
		||||
                return false;
 | 
			
		||||
                    return;
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                if (verbose)
 | 
			
		||||
@@ -65,12 +62,13 @@ namespace ix
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
            bool success = response->statusCode == 200;
 | 
			
		||||
 | 
			
		||||
            if (!success)
 | 
			
		||||
                if (response->statusCode == 200)
 | 
			
		||||
                {
 | 
			
		||||
                    sentCount++;
 | 
			
		||||
                }
 | 
			
		||||
                else
 | 
			
		||||
                {
 | 
			
		||||
                    CoreLogger::error("Error sending data to sentry: " + std::to_string(response->statusCode));
 | 
			
		||||
                CoreLogger::error("Body: " + ret.second);
 | 
			
		||||
                    CoreLogger::error("Response: " + response->payload);
 | 
			
		||||
 | 
			
		||||
                    // Error 429 Too Many Requests
 | 
			
		||||
@@ -98,19 +96,14 @@ namespace ix
 | 
			
		||||
                        throttled = false;
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
            return success;
 | 
			
		||||
            });
 | 
			
		||||
 | 
			
		||||
        bool useQueue = true;
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        return bot.run(config,
 | 
			
		||||
                       channel,
 | 
			
		||||
                       filter,
 | 
			
		||||
                       position,
 | 
			
		||||
                       verbose,
 | 
			
		||||
                       maxQueueSize,
 | 
			
		||||
                       useQueue,
 | 
			
		||||
                       enableHeartbeat,
 | 
			
		||||
                       runtime);
 | 
			
		||||
    }
 | 
			
		||||
 
 | 
			
		||||
@@ -18,7 +18,6 @@ namespace ix
 | 
			
		||||
                                const std::string& position,
 | 
			
		||||
                                SentryClient& sentryClient,
 | 
			
		||||
                                bool verbose,
 | 
			
		||||
                                size_t maxQueueSize,
 | 
			
		||||
                                bool enableHeartbeat,
 | 
			
		||||
                                int runtime);
 | 
			
		||||
} // namespace ix
 | 
			
		||||
 
 | 
			
		||||
@@ -7,7 +7,6 @@
 | 
			
		||||
#include "IXCobraToStatsdBot.h"
 | 
			
		||||
 | 
			
		||||
#include "IXCobraBot.h"
 | 
			
		||||
#include "IXQueueManager.h"
 | 
			
		||||
#include "IXStatsdClient.h"
 | 
			
		||||
#include <chrono>
 | 
			
		||||
#include <ixcobra/IXCobraConnection.h>
 | 
			
		||||
@@ -63,7 +62,6 @@ namespace ix
 | 
			
		||||
                                const std::string& gauge,
 | 
			
		||||
                                const std::string& timer,
 | 
			
		||||
                                bool verbose,
 | 
			
		||||
                                size_t maxQueueSize,
 | 
			
		||||
                                bool enableHeartbeat,
 | 
			
		||||
                                int runtime)
 | 
			
		||||
    {
 | 
			
		||||
@@ -79,7 +77,8 @@ namespace ix
 | 
			
		||||
                                                     const std::string& /*position*/,
 | 
			
		||||
                                                     const bool verbose,
 | 
			
		||||
                                                     std::atomic<bool>& /*throttled*/,
 | 
			
		||||
                                                     std::atomic<bool>& fatalCobraError) -> bool {
 | 
			
		||||
                                                     std::atomic<bool>& fatalCobraError,
 | 
			
		||||
                                                     std::atomic<uint64_t>& sentCount) -> void {
 | 
			
		||||
                std::string id;
 | 
			
		||||
                for (auto&& attr : tokens)
 | 
			
		||||
                {
 | 
			
		||||
@@ -122,7 +121,7 @@ namespace ix
 | 
			
		||||
                    {
 | 
			
		||||
                        CoreLogger::error("Gauge " + gauge + " is not a numeric type");
 | 
			
		||||
                        fatalCobraError = true;
 | 
			
		||||
                        return false;
 | 
			
		||||
                        return;
 | 
			
		||||
                    }
 | 
			
		||||
 | 
			
		||||
                    if (verbose)
 | 
			
		||||
@@ -140,18 +139,14 @@ namespace ix
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                return true;
 | 
			
		||||
                sentCount++;
 | 
			
		||||
            });
 | 
			
		||||
 | 
			
		||||
        bool useQueue = true;
 | 
			
		||||
 | 
			
		||||
        return bot.run(config,
 | 
			
		||||
                       channel,
 | 
			
		||||
                       filter,
 | 
			
		||||
                       position,
 | 
			
		||||
                       verbose,
 | 
			
		||||
                       maxQueueSize,
 | 
			
		||||
                       useQueue,
 | 
			
		||||
                       enableHeartbeat,
 | 
			
		||||
                       runtime);
 | 
			
		||||
    }
 | 
			
		||||
 
 | 
			
		||||
@@ -22,7 +22,6 @@ namespace ix
 | 
			
		||||
                                const std::string& gauge,
 | 
			
		||||
                                const std::string& timer,
 | 
			
		||||
                                bool verbose,
 | 
			
		||||
                                size_t maxQueueSize,
 | 
			
		||||
                                bool enableHeartbeat,
 | 
			
		||||
                                int runtime);
 | 
			
		||||
} // namespace ix
 | 
			
		||||
 
 | 
			
		||||
@@ -7,7 +7,6 @@
 | 
			
		||||
#include "IXCobraToStdoutBot.h"
 | 
			
		||||
 | 
			
		||||
#include "IXCobraBot.h"
 | 
			
		||||
#include "IXQueueManager.h"
 | 
			
		||||
#include <chrono>
 | 
			
		||||
#include <iostream>
 | 
			
		||||
#include <sstream>
 | 
			
		||||
@@ -71,7 +70,6 @@ namespace ix
 | 
			
		||||
                                bool fluentd,
 | 
			
		||||
                                bool quiet,
 | 
			
		||||
                                bool verbose,
 | 
			
		||||
                                size_t maxQueueSize,
 | 
			
		||||
                                bool enableHeartbeat,
 | 
			
		||||
                                int runtime)
 | 
			
		||||
    {
 | 
			
		||||
@@ -83,24 +81,20 @@ namespace ix
 | 
			
		||||
                                            const std::string& position,
 | 
			
		||||
                                            const bool /*verbose*/,
 | 
			
		||||
                                            std::atomic<bool>& /*throttled*/,
 | 
			
		||||
                                            std::atomic<bool> &
 | 
			
		||||
                                            /*fatalCobraError*/) -> bool {
 | 
			
		||||
                                            std::atomic<bool>& /*fatalCobraError*/,
 | 
			
		||||
                                            std::atomic<uint64_t>& sentCount) -> void {
 | 
			
		||||
                if (!quiet)
 | 
			
		||||
                {
 | 
			
		||||
                    writeToStdout(fluentd, jsonWriter, msg, position);
 | 
			
		||||
                }
 | 
			
		||||
                return true;
 | 
			
		||||
                sentCount++;
 | 
			
		||||
            });
 | 
			
		||||
 | 
			
		||||
        bool useQueue = false;
 | 
			
		||||
 | 
			
		||||
        return bot.run(config,
 | 
			
		||||
                       channel,
 | 
			
		||||
                       filter,
 | 
			
		||||
                       position,
 | 
			
		||||
                       verbose,
 | 
			
		||||
                       maxQueueSize,
 | 
			
		||||
                       useQueue,
 | 
			
		||||
                       enableHeartbeat,
 | 
			
		||||
                       runtime);
 | 
			
		||||
    }
 | 
			
		||||
 
 | 
			
		||||
@@ -19,7 +19,6 @@ namespace ix
 | 
			
		||||
                                bool fluentd,
 | 
			
		||||
                                bool quiet,
 | 
			
		||||
                                bool verbose,
 | 
			
		||||
                                size_t maxQueueSize,
 | 
			
		||||
                                bool enableHeartbeat,
 | 
			
		||||
                                int runtime);
 | 
			
		||||
} // namespace ix
 | 
			
		||||
 
 | 
			
		||||
@@ -1,67 +0,0 @@
 | 
			
		||||
/*
 | 
			
		||||
 *  IXQueueManager.cpp
 | 
			
		||||
 *  Author: Benjamin Sergeant
 | 
			
		||||
 *  Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
#include "IXQueueManager.h"
 | 
			
		||||
 | 
			
		||||
#include <algorithm>
 | 
			
		||||
#include <vector>
 | 
			
		||||
 | 
			
		||||
namespace ix
 | 
			
		||||
{
 | 
			
		||||
    std::pair<Json::Value, std::string> QueueManager::pop()
 | 
			
		||||
    {
 | 
			
		||||
        std::unique_lock<std::mutex> lock(_mutex);
 | 
			
		||||
 | 
			
		||||
        if (_queues.empty())
 | 
			
		||||
        {
 | 
			
		||||
            Json::Value val;
 | 
			
		||||
            return std::make_pair(val, std::string());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        std::vector<std::string> games;
 | 
			
		||||
        for (auto it : _queues)
 | 
			
		||||
        {
 | 
			
		||||
            games.push_back(it.first);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        std::random_shuffle(games.begin(), games.end());
 | 
			
		||||
        std::string game = games[0];
 | 
			
		||||
 | 
			
		||||
        auto duration = std::chrono::seconds(1);
 | 
			
		||||
        _condition.wait_for(lock, duration);
 | 
			
		||||
 | 
			
		||||
        if (_queues[game].empty())
 | 
			
		||||
        {
 | 
			
		||||
            Json::Value val;
 | 
			
		||||
            return std::make_pair(val, std::string());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        auto msg = _queues[game].front();
 | 
			
		||||
        _queues[game].pop();
 | 
			
		||||
        return msg;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void QueueManager::add(const Json::Value& msg, const std::string& position)
 | 
			
		||||
    {
 | 
			
		||||
        std::unique_lock<std::mutex> lock(_mutex);
 | 
			
		||||
 | 
			
		||||
        std::string game;
 | 
			
		||||
        if (msg.isMember("device") && msg["device"].isMember("game"))
 | 
			
		||||
        {
 | 
			
		||||
            game = msg["device"]["game"].asString();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (game.empty()) return;
 | 
			
		||||
 | 
			
		||||
        // if the sending is not fast enough there is no point
 | 
			
		||||
        // in queuing too many events.
 | 
			
		||||
        if (_queues[game].size() < _maxQueueSize)
 | 
			
		||||
        {
 | 
			
		||||
            _queues[game].push(std::make_pair(msg, position));
 | 
			
		||||
            _condition.notify_one();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
} // namespace ix
 | 
			
		||||
@@ -1,35 +0,0 @@
 | 
			
		||||
/*
 | 
			
		||||
 *  IXQueueManager.h
 | 
			
		||||
 *  Author: Benjamin Sergeant
 | 
			
		||||
 *  Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
#pragma once
 | 
			
		||||
 | 
			
		||||
#include <condition_variable>
 | 
			
		||||
#include <json/json.h>
 | 
			
		||||
#include <map>
 | 
			
		||||
#include <mutex>
 | 
			
		||||
#include <queue>
 | 
			
		||||
#include <stddef.h>
 | 
			
		||||
 | 
			
		||||
namespace ix
 | 
			
		||||
{
 | 
			
		||||
    class QueueManager
 | 
			
		||||
    {
 | 
			
		||||
    public:
 | 
			
		||||
        QueueManager(size_t maxQueueSize)
 | 
			
		||||
            : _maxQueueSize(maxQueueSize)
 | 
			
		||||
        {
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        std::pair<Json::Value, std::string> pop();
 | 
			
		||||
        void add(const Json::Value& msg, const std::string& position);
 | 
			
		||||
 | 
			
		||||
    private:
 | 
			
		||||
        std::map<std::string, std::queue<std::pair<Json::Value, std::string>>> _queues;
 | 
			
		||||
        std::mutex _mutex;
 | 
			
		||||
        std::condition_variable _condition;
 | 
			
		||||
        size_t _maxQueueSize;
 | 
			
		||||
    };
 | 
			
		||||
} // namespace ix
 | 
			
		||||
@@ -226,20 +226,23 @@ namespace ix
 | 
			
		||||
        return _jsonWriter.write(payload);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    std::pair<HttpResponsePtr, std::string> SentryClient::send(const Json::Value& msg, bool verbose)
 | 
			
		||||
    void SentryClient::send(
 | 
			
		||||
        const Json::Value& msg,
 | 
			
		||||
        bool verbose,
 | 
			
		||||
        const OnResponseCallback& onResponseCallback)
 | 
			
		||||
    {
 | 
			
		||||
        auto args = _httpClient->createRequest();
 | 
			
		||||
        args->url = _url;
 | 
			
		||||
        args->verb = HttpClient::kPost;
 | 
			
		||||
        args->extraHeaders["X-Sentry-Auth"] = SentryClient::computeAuthHeader();
 | 
			
		||||
        args->connectTimeout = 60;
 | 
			
		||||
        args->transferTimeout = 5 * 60;
 | 
			
		||||
        args->followRedirects = true;
 | 
			
		||||
        args->verbose = verbose;
 | 
			
		||||
        args->logger = [](const std::string& msg) { CoreLogger::log(msg.c_str()); };
 | 
			
		||||
        args->body = computePayload(msg);
 | 
			
		||||
 | 
			
		||||
        std::string body = computePayload(msg);
 | 
			
		||||
        HttpResponsePtr response = _httpClient->post(_url, body, args);
 | 
			
		||||
 | 
			
		||||
        return std::make_pair(response, body);
 | 
			
		||||
        _httpClient->performRequest(args, onResponseCallback);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // https://sentry.io/api/12345/minidump?sentry_key=abcdefgh");
 | 
			
		||||
 
 | 
			
		||||
@@ -21,12 +21,9 @@ namespace ix
 | 
			
		||||
        SentryClient(const std::string& dsn);
 | 
			
		||||
        ~SentryClient() = default;
 | 
			
		||||
 | 
			
		||||
        std::pair<HttpResponsePtr, std::string> send(const Json::Value& msg, bool verbose);
 | 
			
		||||
 | 
			
		||||
        Json::Value parseLuaStackTrace(const std::string& stack);
 | 
			
		||||
 | 
			
		||||
        // Mostly for testing
 | 
			
		||||
        void setTLSOptions(const SocketTLSOptions& tlsOptions);
 | 
			
		||||
        void send(const Json::Value& msg,
 | 
			
		||||
                  bool verbose,
 | 
			
		||||
                  const OnResponseCallback& onResponseCallback);
 | 
			
		||||
 | 
			
		||||
        void uploadMinidump(const std::string& sentryMetadata,
 | 
			
		||||
                            const std::string& minidumpBytes,
 | 
			
		||||
@@ -39,6 +36,12 @@ namespace ix
 | 
			
		||||
                           bool verbose,
 | 
			
		||||
                           const OnResponseCallback& onResponseCallback);
 | 
			
		||||
 | 
			
		||||
        Json::Value parseLuaStackTrace(const std::string& stack);
 | 
			
		||||
 | 
			
		||||
        // Mostly for testing
 | 
			
		||||
        void setTLSOptions(const SocketTLSOptions& tlsOptions);
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    private:
 | 
			
		||||
        int64_t getTimestamp();
 | 
			
		||||
        std::string computeAuthHeader();
 | 
			
		||||
 
 | 
			
		||||
@@ -6,4 +6,4 @@
 | 
			
		||||
 | 
			
		||||
#pragma once
 | 
			
		||||
 | 
			
		||||
#define IX_WEBSOCKET_VERSION "9.5.3"
 | 
			
		||||
#define IX_WEBSOCKET_VERSION "9.5.4"
 | 
			
		||||
 
 | 
			
		||||
@@ -141,7 +141,6 @@ TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]")
 | 
			
		||||
        std::string filter;
 | 
			
		||||
        std::string position("$");
 | 
			
		||||
        bool verbose = true;
 | 
			
		||||
        size_t maxQueueSize = 10;
 | 
			
		||||
        bool enableHeartbeat = false;
 | 
			
		||||
 | 
			
		||||
        // FIXME: try to get this working with https instead of http
 | 
			
		||||
@@ -166,7 +165,6 @@ TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]")
 | 
			
		||||
                                                position,
 | 
			
		||||
                                                sentryClient,
 | 
			
		||||
                                                verbose,
 | 
			
		||||
                                                maxQueueSize,
 | 
			
		||||
                                                enableHeartbeat,
 | 
			
		||||
                                                runtime);
 | 
			
		||||
        //
 | 
			
		||||
 
 | 
			
		||||
@@ -90,7 +90,6 @@ TEST_CASE("Cobra_to_statsd_bot", "[cobra_bots]")
 | 
			
		||||
        std::string filter;
 | 
			
		||||
        std::string position("$");
 | 
			
		||||
        bool verbose = true;
 | 
			
		||||
        size_t maxQueueSize = 10;
 | 
			
		||||
        bool enableHeartbeat = false;
 | 
			
		||||
 | 
			
		||||
        // Only run the bot for 3 seconds
 | 
			
		||||
@@ -123,7 +122,6 @@ TEST_CASE("Cobra_to_statsd_bot", "[cobra_bots]")
 | 
			
		||||
                                                    gauge,
 | 
			
		||||
                                                    timer,
 | 
			
		||||
                                                    verbose,
 | 
			
		||||
                                                    maxQueueSize,
 | 
			
		||||
                                                    enableHeartbeat,
 | 
			
		||||
                                                    runtime);
 | 
			
		||||
        //
 | 
			
		||||
 
 | 
			
		||||
@@ -89,7 +89,6 @@ TEST_CASE("Cobra_to_stdout_bot", "[cobra_bots]")
 | 
			
		||||
        std::string position("$");
 | 
			
		||||
        bool verbose = true;
 | 
			
		||||
        bool quiet = false;
 | 
			
		||||
        size_t maxQueueSize = 10;
 | 
			
		||||
        bool enableHeartbeat = false;
 | 
			
		||||
 | 
			
		||||
        // Only run the bot for 3 seconds
 | 
			
		||||
@@ -105,7 +104,6 @@ TEST_CASE("Cobra_to_stdout_bot", "[cobra_bots]")
 | 
			
		||||
                                                    fluentd,
 | 
			
		||||
                                                    quiet,
 | 
			
		||||
                                                    verbose,
 | 
			
		||||
                                                    maxQueueSize,
 | 
			
		||||
                                                    enableHeartbeat,
 | 
			
		||||
                                                    runtime);
 | 
			
		||||
        //
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										10
									
								
								ws/ws.cpp
									
									
									
									
									
								
							
							
						
						
									
										10
									
								
								ws/ws.cpp
									
									
									
									
									
								
							@@ -148,7 +148,6 @@ int main(int argc, char** argv)
 | 
			
		||||
    int delayMs = -1;
 | 
			
		||||
    int count = 1;
 | 
			
		||||
    uint32_t maxWaitBetweenReconnectionRetries;
 | 
			
		||||
    size_t maxQueueSize = 100;
 | 
			
		||||
    int pingIntervalSecs = 30;
 | 
			
		||||
    int runtime = -1; // run indefinitely
 | 
			
		||||
 | 
			
		||||
@@ -328,9 +327,6 @@ int main(int argc, char** argv)
 | 
			
		||||
    cobra2statsd->add_option("--pidfile", pidfile, "Pid file");
 | 
			
		||||
    cobra2statsd->add_option("--filter", filter, "Stream SQL Filter");
 | 
			
		||||
    cobra2statsd->add_option("--position", position, "Stream position");
 | 
			
		||||
    cobra2statsd->add_option("--queue_size",
 | 
			
		||||
                             maxQueueSize,
 | 
			
		||||
                             "Size of the queue to hold messages before they are sent to Sentry");
 | 
			
		||||
    cobra2statsd->add_option("--runtime", runtime, "Runtime in seconds");
 | 
			
		||||
    addTLSOptions(cobra2statsd);
 | 
			
		||||
    addCobraConfig(cobra2statsd);
 | 
			
		||||
@@ -338,9 +334,6 @@ int main(int argc, char** argv)
 | 
			
		||||
    CLI::App* cobra2sentry = app.add_subcommand("cobra_to_sentry", "Cobra metrics to sentry");
 | 
			
		||||
    cobra2sentry->fallthrough();
 | 
			
		||||
    cobra2sentry->add_option("--dsn", dsn, "Sentry DSN");
 | 
			
		||||
    cobra2sentry->add_option("--queue_size",
 | 
			
		||||
                             maxQueueSize,
 | 
			
		||||
                             "Size of the queue to hold messages before they are sent to Sentry");
 | 
			
		||||
    cobra2sentry->add_option("channel", channel, "Channel")->required();
 | 
			
		||||
    cobra2sentry->add_flag("-v", verbose, "Verbose");
 | 
			
		||||
    cobra2sentry->add_option("--pidfile", pidfile, "Pid file");
 | 
			
		||||
@@ -536,7 +529,6 @@ int main(int argc, char** argv)
 | 
			
		||||
                                                    fluentd,
 | 
			
		||||
                                                    quiet,
 | 
			
		||||
                                                    verbose,
 | 
			
		||||
                                                    maxQueueSize,
 | 
			
		||||
                                                    enableHeartbeat,
 | 
			
		||||
                                                    runtime);
 | 
			
		||||
        ret = (int) sentCount;
 | 
			
		||||
@@ -580,7 +572,6 @@ int main(int argc, char** argv)
 | 
			
		||||
                                                    gauge,
 | 
			
		||||
                                                    timer,
 | 
			
		||||
                                                    verbose,
 | 
			
		||||
                                                    maxQueueSize,
 | 
			
		||||
                                                    enableHeartbeat,
 | 
			
		||||
                                                    runtime);
 | 
			
		||||
            }
 | 
			
		||||
@@ -598,7 +589,6 @@ int main(int argc, char** argv)
 | 
			
		||||
                                            position,
 | 
			
		||||
                                            sentryClient,
 | 
			
		||||
                                            verbose,
 | 
			
		||||
                                            maxQueueSize,
 | 
			
		||||
                                            enableHeartbeat,
 | 
			
		||||
                                            runtime);
 | 
			
		||||
    }
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user