Compare commits

..

1 Commits

Author SHA1 Message Date
d8597b054c bump version for universal windows platform fixes 2020-05-01 11:32:58 -07:00
32 changed files with 349 additions and 249 deletions

View File

@ -12,10 +12,6 @@ set (CMAKE_CXX_STANDARD 14)
set (CXX_STANDARD_REQUIRED ON) set (CXX_STANDARD_REQUIRED ON)
set (CMAKE_CXX_EXTENSIONS OFF) set (CMAKE_CXX_EXTENSIONS OFF)
if (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
set(CMAKE_POSITION_INDEPENDENT_CODE ON)
endif()
if (UNIX) if (UNIX)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic")
endif() endif()
@ -123,11 +119,6 @@ if (USE_TLS)
if (NOT USE_MBED_TLS AND NOT USE_OPEN_SSL) # unless we want something else if (NOT USE_MBED_TLS AND NOT USE_OPEN_SSL) # unless we want something else
set(USE_SECURE_TRANSPORT ON) set(USE_SECURE_TRANSPORT ON)
endif() endif()
# default to mbedtls on uwp (universal windows platform) if nothing is configured
elseif (${CMAKE_SYSTEM_NAME} MATCHES "WindowsStore")
if (NOT USE_OPEN_SSL) # unless we want something else
set(USE_MBED_TLS ON)
endif()
else() # default to OpenSSL on all other platforms else() # default to OpenSSL on all other platforms
if (NOT USE_MBED_TLS) # Unless mbedtls is requested if (NOT USE_MBED_TLS) # Unless mbedtls is requested
set(USE_OPEN_SSL ON) set(USE_OPEN_SSL ON)
@ -153,8 +144,6 @@ add_library( ixwebsocket STATIC
${IXWEBSOCKET_HEADERS} ${IXWEBSOCKET_HEADERS}
) )
add_library ( ixwebsocket::ixwebsocket ALIAS ixwebsocket )
if (USE_TLS) if (USE_TLS)
target_compile_definitions(ixwebsocket PUBLIC IXWEBSOCKET_USE_TLS) target_compile_definitions(ixwebsocket PUBLIC IXWEBSOCKET_USE_TLS)
if (USE_MBED_TLS) if (USE_MBED_TLS)
@ -176,28 +165,26 @@ if (USE_TLS)
if (APPLE) if (APPLE)
set(CMAKE_LIBRARY_PATH ${CMAKE_LIBRARY_PATH} /usr/local/opt/openssl/lib) set(CMAKE_LIBRARY_PATH ${CMAKE_LIBRARY_PATH} /usr/local/opt/openssl/lib)
set(CMAKE_INCLUDE_PATH ${CMAKE_INCLUDE_PATH} /usr/local/opt/openssl/include) set(CMAKE_INCLUDE_PATH ${CMAKE_INCLUDE_PATH} /usr/local/opt/openssl/include)
# This is for MacPort OpenSSL 1.0
# set(CMAKE_LIBRARY_PATH ${CMAKE_LIBRARY_PATH} /opt/local/lib/openssl-1.0)
# set(CMAKE_INCLUDE_PATH ${CMAKE_INCLUDE_PATH} /opt/local/include/openssl-1.0)
endif() endif()
# This OPENSSL_FOUND check is to help find a cmake manually configured OpenSSL # This OPENSSL_FOUND check is to help find a cmake manually configured OpenSSL
if (NOT OPENSSL_FOUND) if (NOT OPENSSL_FOUND)
include(FindOpenSSL) find_package(OpenSSL REQUIRED)
endif() endif()
message(STATUS "OpenSSL: " ${OPENSSL_VERSION}) message(STATUS "OpenSSL: " ${OPENSSL_VERSION})
target_link_libraries(ixwebsocket PUBLIC OpenSSL::SSL OpenSSL::Crypto) add_definitions(${OPENSSL_DEFINITIONS})
target_include_directories(ixwebsocket PUBLIC ${OPENSSL_INCLUDE_DIR})
target_link_libraries(ixwebsocket ${OPENSSL_LIBRARIES})
elseif (USE_MBED_TLS) elseif (USE_MBED_TLS)
message(STATUS "TLS configured to use mbedtls") message(STATUS "TLS configured to use mbedtls")
find_package(MbedTLS REQUIRED) find_package(MbedTLS REQUIRED)
target_include_directories(ixwebsocket PUBLIC ${MBEDTLS_INCLUDE_DIRS}) target_include_directories(ixwebsocket PUBLIC ${MBEDTLS_INCLUDE_DIRS})
target_link_libraries(ixwebsocket PUBLIC ${MBEDTLS_LIBRARIES}) target_link_libraries(ixwebsocket ${MBEDTLS_LIBRARIES})
elseif (USE_SECURE_TRANSPORT) elseif (USE_SECURE_TRANSPORT)
message(STATUS "TLS configured to use secure transport") message(STATUS "TLS configured to use secure transport")
target_link_libraries(ixwebsocket PUBLIC "-framework foundation" "-framework security") target_link_libraries(ixwebsocket "-framework foundation" "-framework security")
endif() endif()
endif() endif()
@ -207,25 +194,25 @@ if (NOT ZLIB_FOUND)
endif() endif()
if (ZLIB_FOUND) if (ZLIB_FOUND)
include_directories(${ZLIB_INCLUDE_DIRS}) include_directories(${ZLIB_INCLUDE_DIRS})
target_link_libraries(ixwebsocket PUBLIC ${ZLIB_LIBRARIES}) target_link_libraries(ixwebsocket ${ZLIB_LIBRARIES})
else() else()
include_directories(third_party/zlib ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib) include_directories(third_party/zlib ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib)
add_subdirectory(third_party/zlib EXCLUDE_FROM_ALL) add_subdirectory(third_party/zlib)
target_link_libraries(ixwebsocket PRIVATE $<LINK_ONLY:zlibstatic>) target_link_libraries(ixwebsocket zlibstatic)
endif() endif()
if (WIN32) if (WIN32)
target_link_libraries(ixwebsocket PUBLIC wsock32 ws2_32 shlwapi) target_link_libraries(ixwebsocket wsock32 ws2_32 shlwapi)
add_definitions(-D_CRT_SECURE_NO_WARNINGS) add_definitions(-D_CRT_SECURE_NO_WARNINGS)
if (USE_TLS) if (USE_TLS)
target_link_libraries(ixwebsocket PUBLIC Crypt32) target_link_libraries(ixwebsocket Crypt32)
endif() endif()
endif() endif()
if (UNIX) if (UNIX)
find_package(Threads) find_package(Threads)
target_link_libraries(ixwebsocket PUBLIC ${CMAKE_THREAD_LIBS_INIT}) target_link_libraries(ixwebsocket ${CMAKE_THREAD_LIBS_INIT})
endif() endif()
@ -238,18 +225,15 @@ if (CMAKE_CXX_COMPILER_ID MATCHES "MSVC")
target_compile_options(ixwebsocket PRIVATE /MP) target_compile_options(ixwebsocket PRIVATE /MP)
endif() endif()
target_include_directories(ixwebsocket PUBLIC $<BUILD_INTERFACE:${IXWEBSOCKET_INCLUDE_DIRS}> $<INSTALL_INTERFACE:include/ixwebsocket>) target_include_directories(ixwebsocket PUBLIC ${IXWEBSOCKET_INCLUDE_DIRS})
set_target_properties(ixwebsocket PROPERTIES PUBLIC_HEADER "${IXWEBSOCKET_HEADERS}") set_target_properties(ixwebsocket PROPERTIES PUBLIC_HEADER "${IXWEBSOCKET_HEADERS}")
install(TARGETS ixwebsocket EXPORT ixwebsocket install(TARGETS ixwebsocket
ARCHIVE DESTINATION ${CMAKE_INSTALL_PREFIX}/lib ARCHIVE DESTINATION ${CMAKE_INSTALL_PREFIX}/lib
PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_PREFIX}/include/ixwebsocket/ PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_PREFIX}/include/ixwebsocket/
) )
install(EXPORT ixwebsocket NAMESPACE ixwebsocket:: DESTINATION lib/cmake/ixwebsocket)
export(EXPORT ixwebsocket NAMESPACE ixwebsocket:: FILE ixwebsocketConfig.cmake)
if (USE_WS OR USE_TEST) if (USE_WS OR USE_TEST)
add_subdirectory(ixcore) add_subdirectory(ixcore)
add_subdirectory(ixcrypto) add_subdirectory(ixcrypto)

View File

@ -45,6 +45,3 @@ IXWebSocket client code is autobahn compliant beginning with the 6.0.0 version.
If your company or project is using this library, feel free to open an issue or PR to amend this list. If your company or project is using this library, feel free to open an issue or PR to amend this list.
- [Machine Zone](https://www.mz.com) - [Machine Zone](https://www.mz.com)
- [dis-light](https://gitlab.com/HCInk/dis-light), a discord library with a node frontend.
- [libDiscordBot](https://github.com/tostc/libDiscordBot/tree/master), a work in progress discord library
- [gwebsocket](https://github.com/norrbotten/gwebsocket), a websocket (lua) module for Garry's Mod

View File

@ -1,21 +1,9 @@
# Changelog # Changelog
All changes to this project will be documented in this file. All changes to this project will be documented in this file.
## [9.5.7] - 2020-05-08 ## [9.5.4] - 2020-05-01
(cmake) default TLS back to mbedtls on Windows Universal Platform (windows) fix build for universal windows platform
## [9.5.6] - 2020-05-06
(cobra bots) add a --heartbeat_timeout option to specify when the bot should terminate because no events are received
## [9.5.5] - 2020-05-06
(openssl tls) when OpenSSL is older than 1.1, register the crypto locking callback to be thread safe. Should fix lots of CI failures
## [9.5.4] - 2020-05-04
(cobra bots) do not use a queue to store messages pending processing, let the bot handle queuing
## [9.5.3] - 2020-04-29 ## [9.5.3] - 2020-04-29

View File

@ -8,6 +8,7 @@ set (IXBOTS_SOURCES
ixbots/IXCobraToSentryBot.cpp ixbots/IXCobraToSentryBot.cpp
ixbots/IXCobraToStatsdBot.cpp ixbots/IXCobraToStatsdBot.cpp
ixbots/IXCobraToStdoutBot.cpp ixbots/IXCobraToStdoutBot.cpp
ixbots/IXQueueManager.cpp
ixbots/IXStatsdClient.cpp ixbots/IXStatsdClient.cpp
) )
@ -16,6 +17,7 @@ set (IXBOTS_HEADERS
ixbots/IXCobraToSentryBot.h ixbots/IXCobraToSentryBot.h
ixbots/IXCobraToStatsdBot.h ixbots/IXCobraToStatsdBot.h
ixbots/IXCobraToStdoutBot.h ixbots/IXCobraToStdoutBot.h
ixbots/IXQueueManager.h
ixbots/IXStatsdClient.h ixbots/IXStatsdClient.h
) )

View File

@ -6,6 +6,7 @@
#include "IXCobraBot.h" #include "IXCobraBot.h"
#include "IXQueueManager.h"
#include <ixcobra/IXCobraConnection.h> #include <ixcobra/IXCobraConnection.h>
#include <ixcore/utils/IXCoreLogger.h> #include <ixcore/utils/IXCoreLogger.h>
@ -21,14 +22,17 @@ namespace ix
const std::string& channel, const std::string& channel,
const std::string& filter, const std::string& filter,
const std::string& position, const std::string& position,
bool verbose,
size_t maxQueueSize,
bool useQueue,
bool enableHeartbeat, bool enableHeartbeat,
int heartBeatTimeout,
int runtime) int runtime)
{ {
ix::CobraConnection conn; ix::CobraConnection conn;
conn.configure(config); conn.configure(config);
conn.connect(); conn.connect();
Json::FastWriter jsonWriter;
std::atomic<uint64_t> sentCount(0); std::atomic<uint64_t> sentCount(0);
std::atomic<uint64_t> receivedCount(0); std::atomic<uint64_t> receivedCount(0);
uint64_t sentCountTotal(0); uint64_t sentCountTotal(0);
@ -39,6 +43,8 @@ namespace ix
std::atomic<bool> throttled(false); std::atomic<bool> throttled(false);
std::atomic<bool> fatalCobraError(false); std::atomic<bool> fatalCobraError(false);
QueueManager queueManager(maxQueueSize);
auto timer = [&sentCount, auto timer = [&sentCount,
&receivedCount, &receivedCount,
&sentCountTotal, &sentCountTotal,
@ -79,7 +85,7 @@ namespace ix
std::thread t1(timer); std::thread t1(timer);
auto heartbeat = [&sentCount, &receivedCount, &stop, &enableHeartbeat, &heartBeatTimeout, &fatalCobraError] { auto heartbeat = [&sentCount, &receivedCount, &stop, &enableHeartbeat] {
std::string state("na"); std::string state("na");
if (!enableHeartbeat) return; if (!enableHeartbeat) return;
@ -95,12 +101,11 @@ namespace ix
if (currentState == state) if (currentState == state)
{ {
CoreLogger::error("no messages received or sent for 1 minute, exiting"); CoreLogger::error("no messages received or sent for 1 minute, exiting");
fatalCobraError = true; exit(1);
break;
} }
state = currentState; state = currentState;
auto duration = std::chrono::seconds(heartBeatTimeout); auto duration = std::chrono::minutes(1);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
} }
@ -109,6 +114,40 @@ namespace ix
std::thread t2(heartbeat); std::thread t2(heartbeat);
auto sender =
[this, &queueManager, verbose, &sentCount, &stop, &throttled, &fatalCobraError] {
while (true)
{
auto data = queueManager.pop();
Json::Value msg = data.first;
std::string position = data.second;
if (stop) break;
if (msg.isNull()) continue;
if (_onBotMessageCallback &&
_onBotMessageCallback(msg, position, verbose, throttled, fatalCobraError))
{
// That might be too noisy
if (verbose)
{
CoreLogger::info("cobra bot: sending succesfull");
}
++sentCount;
}
else
{
CoreLogger::error("cobra bot: error sending");
}
if (stop) break;
}
CoreLogger::info("sender thread done");
};
std::thread t3(sender);
std::string subscriptionPosition(position); std::string subscriptionPosition(position);
conn.setEventCallback([this, conn.setEventCallback([this,
@ -116,9 +155,13 @@ namespace ix
&channel, &channel,
&filter, &filter,
&subscriptionPosition, &subscriptionPosition,
&jsonWriter,
verbose,
&throttled, &throttled,
&receivedCount, &receivedCount,
&fatalCobraError, &fatalCobraError,
&useQueue,
&queueManager,
&sentCount](const CobraEventPtr& event) { &sentCount](const CobraEventPtr& event) {
if (event->type == ix::CobraEventType::Open) if (event->type == ix::CobraEventType::Open)
{ {
@ -143,11 +186,21 @@ namespace ix
filter, filter,
subscriptionPosition, subscriptionPosition,
[this, [this,
&jsonWriter,
verbose,
&throttled, &throttled,
&receivedCount, &receivedCount,
&queueManager,
&useQueue,
&subscriptionPosition, &subscriptionPosition,
&fatalCobraError, &fatalCobraError,
&sentCount](const Json::Value& msg, const std::string& position) { &sentCount](const Json::Value& msg, const std::string& position) {
if (verbose)
{
CoreLogger::info("Subscriber received message "
+ position + " -> " + jsonWriter.write(msg));
}
subscriptionPosition = position; subscriptionPosition = position;
// If we cannot send to sentry fast enough, drop the message // If we cannot send to sentry fast enough, drop the message
@ -158,9 +211,28 @@ namespace ix
++receivedCount; ++receivedCount;
if (useQueue)
{
queueManager.add(msg, position);
}
else
{
if (_onBotMessageCallback &&
_onBotMessageCallback( _onBotMessageCallback(
msg, position, throttled, msg, position, verbose, throttled, fatalCobraError))
fatalCobraError, sentCount); {
// That might be too noisy
if (verbose)
{
CoreLogger::info("cobra bot: sending succesfull");
}
++sentCount;
}
else
{
CoreLogger::error("cobra bot: error sending");
}
}
}); });
} }
else if (event->type == ix::CobraEventType::Subscribed) else if (event->type == ix::CobraEventType::Subscribed)
@ -236,6 +308,9 @@ namespace ix
// heartbeat thread // heartbeat thread
if (t2.joinable()) t2.join(); if (t2.joinable()) t2.join();
// sentry sender thread
t3.join();
return fatalCobraError ? -1 : (int64_t) sentCount; return fatalCobraError ? -1 : (int64_t) sentCount;
} }

View File

@ -14,11 +14,11 @@
namespace ix namespace ix
{ {
using OnBotMessageCallback = std::function<void(const Json::Value&, using OnBotMessageCallback = std::function<bool(const Json::Value&,
const std::string&, const std::string&,
const bool verbose,
std::atomic<bool>&, std::atomic<bool>&,
std::atomic<bool>&, std::atomic<bool>&)>;
std::atomic<uint64_t>&)>;
class CobraBot class CobraBot
{ {
@ -29,8 +29,10 @@ namespace ix
const std::string& channel, const std::string& channel,
const std::string& filter, const std::string& filter,
const std::string& position, const std::string& position,
bool verbose,
size_t maxQueueSize,
bool useQueue,
bool enableHeartbeat, bool enableHeartbeat,
int heartBeatTimeout,
int runtime); int runtime);
void setOnBotMessageCallback(const OnBotMessageCallback& callback); void setOnBotMessageCallback(const OnBotMessageCallback& callback);

View File

@ -7,6 +7,7 @@
#include "IXCobraToSentryBot.h" #include "IXCobraToSentryBot.h"
#include "IXCobraBot.h" #include "IXCobraBot.h"
#include "IXQueueManager.h"
#include <ixcobra/IXCobraConnection.h> #include <ixcobra/IXCobraConnection.h>
#include <ixcore/utils/IXCoreLogger.h> #include <ixcore/utils/IXCoreLogger.h>
@ -22,31 +23,54 @@ namespace ix
const std::string& position, const std::string& position,
SentryClient& sentryClient, SentryClient& sentryClient,
bool verbose, bool verbose,
size_t maxQueueSize,
bool enableHeartbeat, bool enableHeartbeat,
int heartBeatTimeout,
int runtime) int runtime)
{ {
CobraBot bot; CobraBot bot;
bot.setOnBotMessageCallback([&sentryClient, &verbose](const Json::Value& msg, bot.setOnBotMessageCallback([&sentryClient](const Json::Value& msg,
const std::string& /*position*/, const std::string& /*position*/,
const bool verbose,
std::atomic<bool>& throttled, std::atomic<bool>& throttled,
std::atomic<bool>& /*fatalCobraError*/, std::atomic<bool> &
std::atomic<uint64_t>& sentCount) -> void { /*fatalCobraError*/) -> bool {
sentryClient.send(msg, verbose, auto ret = sentryClient.send(msg, verbose);
[&sentCount, &throttled](const HttpResponsePtr& response) { HttpResponsePtr response = ret.first;
if (!response) if (!response)
{ {
CoreLogger::warn("Null HTTP Response"); CoreLogger::warn("Null HTTP Response");
return; return false;
} }
if (response->statusCode == 200) if (verbose)
{ {
sentCount++; for (auto it : response->headers)
{
CoreLogger::info(it.first + ": " + it.second);
} }
else
CoreLogger::info("Upload size: " + std::to_string(response->uploadSize));
CoreLogger::info("Download size: " + std::to_string(response->downloadSize));
CoreLogger::info("Status: " + std::to_string(response->statusCode));
if (response->errorCode != HttpErrorCode::Ok)
{
CoreLogger::info("error message: " + response->errorMsg);
}
if (response->headers["Content-Type"] != "application/octet-stream")
{
CoreLogger::info("payload: " + response->payload);
}
}
bool success = response->statusCode == 200;
if (!success)
{ {
CoreLogger::error("Error sending data to sentry: " + std::to_string(response->statusCode)); CoreLogger::error("Error sending data to sentry: " + std::to_string(response->statusCode));
CoreLogger::error("Body: " + ret.second);
CoreLogger::error("Response: " + response->payload); CoreLogger::error("Response: " + response->payload);
// Error 429 Too Many Requests // Error 429 Too Many Requests
@ -74,15 +98,20 @@ namespace ix
throttled = false; throttled = false;
} }
} }
return success;
}); });
});
bool useQueue = true;
return bot.run(config, return bot.run(config,
channel, channel,
filter, filter,
position, position,
verbose,
maxQueueSize,
useQueue,
enableHeartbeat, enableHeartbeat,
heartBeatTimeout,
runtime); runtime);
} }
} // namespace ix } // namespace ix

View File

@ -18,7 +18,7 @@ namespace ix
const std::string& position, const std::string& position,
SentryClient& sentryClient, SentryClient& sentryClient,
bool verbose, bool verbose,
size_t maxQueueSize,
bool enableHeartbeat, bool enableHeartbeat,
int heartBeatTimeout,
int runtime); int runtime);
} // namespace ix } // namespace ix

View File

@ -7,6 +7,7 @@
#include "IXCobraToStatsdBot.h" #include "IXCobraToStatsdBot.h"
#include "IXCobraBot.h" #include "IXCobraBot.h"
#include "IXQueueManager.h"
#include "IXStatsdClient.h" #include "IXStatsdClient.h"
#include <chrono> #include <chrono>
#include <ixcobra/IXCobraConnection.h> #include <ixcobra/IXCobraConnection.h>
@ -62,8 +63,8 @@ namespace ix
const std::string& gauge, const std::string& gauge,
const std::string& timer, const std::string& timer,
bool verbose, bool verbose,
size_t maxQueueSize,
bool enableHeartbeat, bool enableHeartbeat,
int heartBeatTimeout,
int runtime) int runtime)
{ {
ix::CobraConnection conn; ix::CobraConnection conn;
@ -74,11 +75,11 @@ namespace ix
CobraBot bot; CobraBot bot;
bot.setOnBotMessageCallback( bot.setOnBotMessageCallback(
[&statsdClient, &tokens, &gauge, &timer, &verbose](const Json::Value& msg, [&statsdClient, &tokens, &gauge, &timer](const Json::Value& msg,
const std::string& /*position*/, const std::string& /*position*/,
const bool verbose,
std::atomic<bool>& /*throttled*/, std::atomic<bool>& /*throttled*/,
std::atomic<bool>& fatalCobraError, std::atomic<bool>& fatalCobraError) -> bool {
std::atomic<uint64_t>& sentCount) -> void {
std::string id; std::string id;
for (auto&& attr : tokens) for (auto&& attr : tokens)
{ {
@ -121,7 +122,7 @@ namespace ix
{ {
CoreLogger::error("Gauge " + gauge + " is not a numeric type"); CoreLogger::error("Gauge " + gauge + " is not a numeric type");
fatalCobraError = true; fatalCobraError = true;
return; return false;
} }
if (verbose) if (verbose)
@ -139,15 +140,19 @@ namespace ix
} }
} }
sentCount++; return true;
}); });
bool useQueue = true;
return bot.run(config, return bot.run(config,
channel, channel,
filter, filter,
position, position,
verbose,
maxQueueSize,
useQueue,
enableHeartbeat, enableHeartbeat,
heartBeatTimeout,
runtime); runtime);
} }
} // namespace ix } // namespace ix

View File

@ -22,7 +22,7 @@ namespace ix
const std::string& gauge, const std::string& gauge,
const std::string& timer, const std::string& timer,
bool verbose, bool verbose,
size_t maxQueueSize,
bool enableHeartbeat, bool enableHeartbeat,
int heartBeatTimeout,
int runtime); int runtime);
} // namespace ix } // namespace ix

View File

@ -7,6 +7,7 @@
#include "IXCobraToStdoutBot.h" #include "IXCobraToStdoutBot.h"
#include "IXCobraBot.h" #include "IXCobraBot.h"
#include "IXQueueManager.h"
#include <chrono> #include <chrono>
#include <iostream> #include <iostream>
#include <sstream> #include <sstream>
@ -69,8 +70,9 @@ namespace ix
const std::string& position, const std::string& position,
bool fluentd, bool fluentd,
bool quiet, bool quiet,
bool verbose,
size_t maxQueueSize,
bool enableHeartbeat, bool enableHeartbeat,
int heartBeatTimeout,
int runtime) int runtime)
{ {
CobraBot bot; CobraBot bot;
@ -79,22 +81,27 @@ namespace ix
bot.setOnBotMessageCallback( bot.setOnBotMessageCallback(
[&fluentd, &quiet, &jsonWriter](const Json::Value& msg, [&fluentd, &quiet, &jsonWriter](const Json::Value& msg,
const std::string& position, const std::string& position,
const bool /*verbose*/,
std::atomic<bool>& /*throttled*/, std::atomic<bool>& /*throttled*/,
std::atomic<bool>& /*fatalCobraError*/, std::atomic<bool> &
std::atomic<uint64_t>& sentCount) -> void { /*fatalCobraError*/) -> bool {
if (!quiet) if (!quiet)
{ {
writeToStdout(fluentd, jsonWriter, msg, position); writeToStdout(fluentd, jsonWriter, msg, position);
} }
sentCount++; return true;
}); });
bool useQueue = false;
return bot.run(config, return bot.run(config,
channel, channel,
filter, filter,
position, position,
verbose,
maxQueueSize,
useQueue,
enableHeartbeat, enableHeartbeat,
heartBeatTimeout,
runtime); runtime);
} }
} // namespace ix } // namespace ix

View File

@ -18,7 +18,8 @@ namespace ix
const std::string& position, const std::string& position,
bool fluentd, bool fluentd,
bool quiet, bool quiet,
bool verbose,
size_t maxQueueSize,
bool enableHeartbeat, bool enableHeartbeat,
int heartBeatTimeout,
int runtime); int runtime);
} // namespace ix } // namespace ix

View File

@ -0,0 +1,67 @@
/*
* IXQueueManager.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXQueueManager.h"
#include <algorithm>
#include <vector>
namespace ix
{
std::pair<Json::Value, std::string> QueueManager::pop()
{
std::unique_lock<std::mutex> lock(_mutex);
if (_queues.empty())
{
Json::Value val;
return std::make_pair(val, std::string());
}
std::vector<std::string> games;
for (auto it : _queues)
{
games.push_back(it.first);
}
std::random_shuffle(games.begin(), games.end());
std::string game = games[0];
auto duration = std::chrono::seconds(1);
_condition.wait_for(lock, duration);
if (_queues[game].empty())
{
Json::Value val;
return std::make_pair(val, std::string());
}
auto msg = _queues[game].front();
_queues[game].pop();
return msg;
}
void QueueManager::add(const Json::Value& msg, const std::string& position)
{
std::unique_lock<std::mutex> lock(_mutex);
std::string game;
if (msg.isMember("device") && msg["device"].isMember("game"))
{
game = msg["device"]["game"].asString();
}
if (game.empty()) return;
// if the sending is not fast enough there is no point
// in queuing too many events.
if (_queues[game].size() < _maxQueueSize)
{
_queues[game].push(std::make_pair(msg, position));
_condition.notify_one();
}
}
} // namespace ix

View File

@ -0,0 +1,35 @@
/*
* IXQueueManager.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <condition_variable>
#include <json/json.h>
#include <map>
#include <mutex>
#include <queue>
#include <stddef.h>
namespace ix
{
class QueueManager
{
public:
QueueManager(size_t maxQueueSize)
: _maxQueueSize(maxQueueSize)
{
}
std::pair<Json::Value, std::string> pop();
void add(const Json::Value& msg, const std::string& position);
private:
std::map<std::string, std::queue<std::pair<Json::Value, std::string>>> _queues;
std::mutex _mutex;
std::condition_variable _condition;
size_t _maxQueueSize;
};
} // namespace ix

View File

@ -226,23 +226,20 @@ namespace ix
return _jsonWriter.write(payload); return _jsonWriter.write(payload);
} }
void SentryClient::send( std::pair<HttpResponsePtr, std::string> SentryClient::send(const Json::Value& msg, bool verbose)
const Json::Value& msg,
bool verbose,
const OnResponseCallback& onResponseCallback)
{ {
auto args = _httpClient->createRequest(); auto args = _httpClient->createRequest();
args->url = _url;
args->verb = HttpClient::kPost;
args->extraHeaders["X-Sentry-Auth"] = SentryClient::computeAuthHeader(); args->extraHeaders["X-Sentry-Auth"] = SentryClient::computeAuthHeader();
args->connectTimeout = 60; args->connectTimeout = 60;
args->transferTimeout = 5 * 60; args->transferTimeout = 5 * 60;
args->followRedirects = true; args->followRedirects = true;
args->verbose = verbose; args->verbose = verbose;
args->logger = [](const std::string& msg) { CoreLogger::log(msg.c_str()); }; args->logger = [](const std::string& msg) { CoreLogger::log(msg.c_str()); };
args->body = computePayload(msg);
_httpClient->performRequest(args, onResponseCallback); std::string body = computePayload(msg);
HttpResponsePtr response = _httpClient->post(_url, body, args);
return std::make_pair(response, body);
} }
// https://sentry.io/api/12345/minidump?sentry_key=abcdefgh"); // https://sentry.io/api/12345/minidump?sentry_key=abcdefgh");

View File

@ -21,9 +21,12 @@ namespace ix
SentryClient(const std::string& dsn); SentryClient(const std::string& dsn);
~SentryClient() = default; ~SentryClient() = default;
void send(const Json::Value& msg, std::pair<HttpResponsePtr, std::string> send(const Json::Value& msg, bool verbose);
bool verbose,
const OnResponseCallback& onResponseCallback); Json::Value parseLuaStackTrace(const std::string& stack);
// Mostly for testing
void setTLSOptions(const SocketTLSOptions& tlsOptions);
void uploadMinidump(const std::string& sentryMetadata, void uploadMinidump(const std::string& sentryMetadata,
const std::string& minidumpBytes, const std::string& minidumpBytes,
@ -36,12 +39,6 @@ namespace ix
bool verbose, bool verbose,
const OnResponseCallback& onResponseCallback); const OnResponseCallback& onResponseCallback);
Json::Value parseLuaStackTrace(const std::string& stack);
// Mostly for testing
void setTLSOptions(const SocketTLSOptions& tlsOptions);
private: private:
int64_t getTimestamp(); int64_t getTimestamp();
std::string computeAuthHeader(); std::string computeAuthHeader();

View File

@ -6,7 +6,6 @@
#include "IXCancellationRequest.h" #include "IXCancellationRequest.h"
#include <cassert>
#include <chrono> #include <chrono>
namespace ix namespace ix
@ -14,8 +13,6 @@ namespace ix
CancellationRequest makeCancellationRequestWithTimeout( CancellationRequest makeCancellationRequestWithTimeout(
int secs, std::atomic<bool>& requestInitCancellation) int secs, std::atomic<bool>& requestInitCancellation)
{ {
assert(secs > 0);
auto start = std::chrono::system_clock::now(); auto start = std::chrono::system_clock::now();
auto timeout = std::chrono::seconds(secs); auto timeout = std::chrono::seconds(secs);

View File

@ -25,12 +25,10 @@ namespace ix
const std::string HttpClient::kHead = "HEAD"; const std::string HttpClient::kHead = "HEAD";
const std::string HttpClient::kDel = "DEL"; const std::string HttpClient::kDel = "DEL";
const std::string HttpClient::kPut = "PUT"; const std::string HttpClient::kPut = "PUT";
const std::string HttpClient::kPatch = "PATCH";
HttpClient::HttpClient(bool async) HttpClient::HttpClient(bool async)
: _async(async) : _async(async)
, _stop(false) , _stop(false)
, _forceBody(false)
{ {
if (!_async) return; if (!_async) return;
@ -51,11 +49,6 @@ namespace ix
_tlsOptions = tlsOptions; _tlsOptions = tlsOptions;
} }
void HttpClient::setForceBody(bool value)
{
_forceBody = value;
}
HttpRequestArgsPtr HttpClient::createRequest(const std::string& url, const std::string& verb) HttpRequestArgsPtr HttpClient::createRequest(const std::string& url, const std::string& verb)
{ {
auto request = std::make_shared<HttpRequestArgs>(); auto request = std::make_shared<HttpRequestArgs>();
@ -199,7 +192,7 @@ namespace ix
ss << "User-Agent: " << userAgent() << "\r\n"; ss << "User-Agent: " << userAgent() << "\r\n";
} }
if (verb == kPost || verb == kPut || verb == kPatch || _forceBody) if (verb == kPost || verb == kPut)
{ {
ss << "Content-Length: " << body.size() << "\r\n"; ss << "Content-Length: " << body.size() << "\r\n";
@ -248,7 +241,8 @@ namespace ix
} }
// Make a new cancellation object dealing with transfer timeout // Make a new cancellation object dealing with transfer timeout
isCancellationRequested = makeCancellationRequestWithTimeout(args->transferTimeout, _stop); isCancellationRequested =
makeCancellationRequestWithTimeout(args->transferTimeout, _stop);
if (args->verbose) if (args->verbose)
{ {
@ -567,20 +561,6 @@ namespace ix
return request(url, kPut, body, args); return request(url, kPut, body, args);
} }
HttpResponsePtr HttpClient::patch(const std::string& url,
const HttpParameters& httpParameters,
HttpRequestArgsPtr args)
{
return request(url, kPatch, serializeHttpParameters(httpParameters), args);
}
HttpResponsePtr HttpClient::patch(const std::string& url,
const std::string& body,
const HttpRequestArgsPtr args)
{
return request(url, kPatch, body, args);
}
std::string HttpClient::urlEncode(const std::string& value) std::string HttpClient::urlEncode(const std::string& value)
{ {
std::ostringstream escaped; std::ostringstream escaped;

View File

@ -46,19 +46,12 @@ namespace ix
const std::string& body, const std::string& body,
HttpRequestArgsPtr args); HttpRequestArgsPtr args);
HttpResponsePtr patch(const std::string& url,
const HttpParameters& httpParameters,
HttpRequestArgsPtr args);
HttpResponsePtr patch(const std::string& url,
const std::string& body,
HttpRequestArgsPtr args);
HttpResponsePtr request(const std::string& url, HttpResponsePtr request(const std::string& url,
const std::string& verb, const std::string& verb,
const std::string& body, const std::string& body,
HttpRequestArgsPtr args, HttpRequestArgsPtr args,
int redirects = 0); int redirects = 0);
void setForceBody(bool value);
// Async API // Async API
HttpRequestArgsPtr createRequest(const std::string& url = std::string(), HttpRequestArgsPtr createRequest(const std::string& url = std::string(),
const std::string& verb = HttpClient::kGet); const std::string& verb = HttpClient::kGet);
@ -85,7 +78,6 @@ namespace ix
const static std::string kHead; const static std::string kHead;
const static std::string kDel; const static std::string kDel;
const static std::string kPut; const static std::string kPut;
const static std::string kPatch;
private: private:
void log(const std::string& msg, HttpRequestArgsPtr args); void log(const std::string& msg, HttpRequestArgsPtr args);
@ -94,6 +86,7 @@ namespace ix
// Async API background thread runner // Async API background thread runner
void run(); void run();
// Async API // Async API
bool _async; bool _async;
std::queue<std::pair<HttpRequestArgsPtr, OnResponseCallback>> _queue; std::queue<std::pair<HttpRequestArgsPtr, OnResponseCallback>> _queue;
@ -106,7 +99,5 @@ namespace ix
std::mutex _mutex; // to protect accessing the _socket (only one socket per client) std::mutex _mutex; // to protect accessing the _socket (only one socket per client)
SocketTLSOptions _tlsOptions; SocketTLSOptions _tlsOptions;
bool _forceBody;
}; };
} // namespace ix } // namespace ix

View File

@ -19,7 +19,6 @@ typedef unsigned long int nfds_t;
#else #else
#include <arpa/inet.h> #include <arpa/inet.h>
#include <errno.h> #include <errno.h>
#include <fcntl.h>
#include <netdb.h> #include <netdb.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <netinet/ip.h> #include <netinet/ip.h>

View File

@ -85,8 +85,6 @@ namespace ix
std::atomic<bool> SocketOpenSSL::_openSSLInitializationSuccessful(false); std::atomic<bool> SocketOpenSSL::_openSSLInitializationSuccessful(false);
std::once_flag SocketOpenSSL::_openSSLInitFlag; std::once_flag SocketOpenSSL::_openSSLInitFlag;
std::unique_ptr<std::mutex[]> SocketOpenSSL::_openSSLMutexes =
std::make_unique<std::mutex[]>(CRYPTO_num_locks());
SocketOpenSSL::SocketOpenSSL(const SocketTLSOptions& tlsOptions, int fd) SocketOpenSSL::SocketOpenSSL(const SocketTLSOptions& tlsOptions, int fd)
: Socket(fd) : Socket(fd)
@ -108,11 +106,6 @@ namespace ix
if (!OPENSSL_init_ssl(OPENSSL_INIT_LOAD_CONFIG, nullptr)) return; if (!OPENSSL_init_ssl(OPENSSL_INIT_LOAD_CONFIG, nullptr)) return;
#else #else
(void) OPENSSL_config(nullptr); (void) OPENSSL_config(nullptr);
if (CRYPTO_get_locking_callback() != nullptr)
{
CRYPTO_set_locking_callback(SocketOpenSSL::openSSLLockingCallback);
}
#endif #endif
(void) OpenSSL_add_ssl_algorithms(); (void) OpenSSL_add_ssl_algorithms();
@ -121,21 +114,6 @@ namespace ix
_openSSLInitializationSuccessful = true; _openSSLInitializationSuccessful = true;
} }
void SocketOpenSSL::openSSLLockingCallback(int mode,
int type,
const char* /*file*/,
int /*line*/)
{
if (mode & CRYPTO_LOCK)
{
_openSSLMutexes[type].lock();
}
else
{
_openSSLMutexes[type].unlock();
}
}
std::string SocketOpenSSL::getSSLError(int ret) std::string SocketOpenSSL::getSSLError(int ret)
{ {
unsigned long e; unsigned long e;

View File

@ -49,9 +49,6 @@ namespace ix
bool handleTLSOptions(std::string& errMsg); bool handleTLSOptions(std::string& errMsg);
bool openSSLServerHandshake(std::string& errMsg); bool openSSLServerHandshake(std::string& errMsg);
// Required for OpenSSL < 1.1
static void openSSLLockingCallback(int mode, int type, const char* /*file*/, int /*line*/);
SSL* _ssl_connection; SSL* _ssl_connection;
SSL_CTX* _ssl_context; SSL_CTX* _ssl_context;
const SSL_METHOD* _ssl_method; const SSL_METHOD* _ssl_method;
@ -61,7 +58,6 @@ namespace ix
static std::once_flag _openSSLInitFlag; static std::once_flag _openSSLInitFlag;
static std::atomic<bool> _openSSLInitializationSuccessful; static std::atomic<bool> _openSSLInitializationSuccessful;
static std::unique_ptr<std::mutex[]> _openSSLMutexes;
}; };
} // namespace ix } // namespace ix

View File

@ -44,18 +44,6 @@ namespace ix
return err; return err;
} }
bool UdpSocket::isWaitNeeded()
{
int err = getErrno();
if (err == EWOULDBLOCK || err == EAGAIN || err == EINPROGRESS)
{
return true;
}
return false;
}
void UdpSocket::closeSocket(int fd) void UdpSocket::closeSocket(int fd)
{ {
#ifdef _WIN32 #ifdef _WIN32
@ -74,13 +62,6 @@ namespace ix
return false; return false;
} }
#ifdef _WIN32
unsigned long nonblocking = 1;
ioctlsocket(_sockfd, FIONBIO, &nonblocking);
#else
fcntl(_sockfd, F_SETFL, O_NONBLOCK); // make socket non blocking
#endif
memset(&_server, 0, sizeof(_server)); memset(&_server, 0, sizeof(_server));
_server.sin_family = AF_INET; _server.sin_family = AF_INET;
_server.sin_port = htons(port); _server.sin_port = htons(port);
@ -112,15 +93,4 @@ namespace ix
return (ssize_t)::sendto( return (ssize_t)::sendto(
_sockfd, buffer.data(), buffer.size(), 0, (struct sockaddr*) &_server, sizeof(_server)); _sockfd, buffer.data(), buffer.size(), 0, (struct sockaddr*) &_server, sizeof(_server));
} }
ssize_t UdpSocket::recvfrom(char* buffer, size_t length)
{
#ifdef _WIN32
int addressLen = (int) sizeof(_server);
#else
socklen_t addressLen = (socklen_t) sizeof(_server);
#endif
return (ssize_t)::recvfrom(
_sockfd, buffer, length, 0, (struct sockaddr*) &_server, &addressLen);
}
} // namespace ix } // namespace ix

View File

@ -28,12 +28,9 @@ namespace ix
// Virtual methods // Virtual methods
bool init(const std::string& host, int port, std::string& errMsg); bool init(const std::string& host, int port, std::string& errMsg);
ssize_t sendto(const std::string& buffer); ssize_t sendto(const std::string& buffer);
ssize_t recvfrom(char* buffer, size_t length);
void close(); void close();
static int getErrno(); static int getErrno();
static bool isWaitNeeded();
static void closeSocket(int fd); static void closeSocket(int fd);
private: private:

View File

@ -6,4 +6,4 @@
#pragma once #pragma once
#define IX_WEBSOCKET_VERSION "9.5.7" #define IX_WEBSOCKET_VERSION "9.5.4"

View File

@ -148,7 +148,7 @@ test_tsan_mbedtls:
(cd test ; python2.7 run.py -r) (cd test ; python2.7 run.py -r)
build_test_openssl: build_test_openssl:
mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_OPEN_SSL=1 -DUSE_TEST=1 .. ; ninja install) mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_OPEN_SSL=1 -DUSE_TEST=1 .. ; make -j 4)
test_openssl: build_test_openssl test_openssl: build_test_openssl
(cd test ; python2.7 run.py -r) (cd test ; python2.7 run.py -r)

View File

@ -141,8 +141,8 @@ TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]")
std::string filter; std::string filter;
std::string position("$"); std::string position("$");
bool verbose = true; bool verbose = true;
size_t maxQueueSize = 10;
bool enableHeartbeat = false; bool enableHeartbeat = false;
int heartBeatTimeout = 60;
// FIXME: try to get this working with https instead of http // FIXME: try to get this working with https instead of http
// to regress the TLS 1.3 OpenSSL bug // to regress the TLS 1.3 OpenSSL bug
@ -166,8 +166,8 @@ TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]")
position, position,
sentryClient, sentryClient,
verbose, verbose,
maxQueueSize,
enableHeartbeat, enableHeartbeat,
heartBeatTimeout,
runtime); runtime);
// //
// We want at least 2 messages to be sent // We want at least 2 messages to be sent

View File

@ -90,8 +90,8 @@ TEST_CASE("Cobra_to_statsd_bot", "[cobra_bots]")
std::string filter; std::string filter;
std::string position("$"); std::string position("$");
bool verbose = true; bool verbose = true;
size_t maxQueueSize = 10;
bool enableHeartbeat = false; bool enableHeartbeat = false;
int heartBeatTimeout = 60;
// Only run the bot for 3 seconds // Only run the bot for 3 seconds
int runtime = 3; int runtime = 3;
@ -123,8 +123,8 @@ TEST_CASE("Cobra_to_statsd_bot", "[cobra_bots]")
gauge, gauge,
timer, timer,
verbose, verbose,
maxQueueSize,
enableHeartbeat, enableHeartbeat,
heartBeatTimeout,
runtime); runtime);
// //
// We want at least 2 messages to be sent // We want at least 2 messages to be sent

View File

@ -87,9 +87,10 @@ TEST_CASE("Cobra_to_stdout_bot", "[cobra_bots]")
std::string filter; std::string filter;
std::string position("$"); std::string position("$");
bool verbose = true;
bool quiet = false; bool quiet = false;
size_t maxQueueSize = 10;
bool enableHeartbeat = false; bool enableHeartbeat = false;
int heartBeatTimeout = 60;
// Only run the bot for 3 seconds // Only run the bot for 3 seconds
int runtime = 3; int runtime = 3;
@ -103,8 +104,9 @@ TEST_CASE("Cobra_to_stdout_bot", "[cobra_bots]")
position, position,
fluentd, fluentd,
quiet, quiet,
verbose,
maxQueueSize,
enableHeartbeat, enableHeartbeat,
heartBeatTimeout,
runtime); runtime);
// //
// We want at least 2 messages to be sent // We want at least 2 messages to be sent

View File

@ -96,7 +96,7 @@ TEST_CASE("subprotocol", "[websocket_subprotocol]")
int attempts = 0; int attempts = 0;
while (!connected) while (!connected)
{ {
REQUIRE(attempts++ < 100); REQUIRE(attempts++ < 10);
ix::msleep(10); ix::msleep(10);
} }

View File

@ -19,8 +19,8 @@
#include <ixwebsocket/IXNetSystem.h> #include <ixwebsocket/IXNetSystem.h>
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXUserAgent.h> #include <ixwebsocket/IXUserAgent.h>
#include <spdlog/sinks/basic_file_sink.h>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <spdlog/sinks/basic_file_sink.h>
#include <sstream> #include <sstream>
#include <string> #include <string>
@ -148,9 +148,9 @@ int main(int argc, char** argv)
int delayMs = -1; int delayMs = -1;
int count = 1; int count = 1;
uint32_t maxWaitBetweenReconnectionRetries; uint32_t maxWaitBetweenReconnectionRetries;
size_t maxQueueSize = 100;
int pingIntervalSecs = 30; int pingIntervalSecs = 30;
int runtime = -1; // run indefinitely int runtime = -1; // run indefinitely
int heartBeatTimeout = 60;
auto addTLSOptions = [&tlsOptions, &verifyNone](CLI::App* app) { auto addTLSOptions = [&tlsOptions, &verifyNone](CLI::App* app) {
app->add_option( app->add_option(
@ -288,7 +288,6 @@ int main(int argc, char** argv)
cobraSubscribeApp->add_flag("-q", quiet, "Quiet / only display stats"); cobraSubscribeApp->add_flag("-q", quiet, "Quiet / only display stats");
cobraSubscribeApp->add_flag("--fluentd", fluentd, "Write fluentd prefix"); cobraSubscribeApp->add_flag("--fluentd", fluentd, "Write fluentd prefix");
cobraSubscribeApp->add_option("--runtime", runtime, "Runtime in seconds"); cobraSubscribeApp->add_option("--runtime", runtime, "Runtime in seconds");
cobraSubscribeApp->add_option("--heartbeat_timeout", heartBeatTimeout, "Heartbeat timeout");
addTLSOptions(cobraSubscribeApp); addTLSOptions(cobraSubscribeApp);
addCobraConfig(cobraSubscribeApp); addCobraConfig(cobraSubscribeApp);
@ -329,21 +328,25 @@ int main(int argc, char** argv)
cobra2statsd->add_option("--pidfile", pidfile, "Pid file"); cobra2statsd->add_option("--pidfile", pidfile, "Pid file");
cobra2statsd->add_option("--filter", filter, "Stream SQL Filter"); cobra2statsd->add_option("--filter", filter, "Stream SQL Filter");
cobra2statsd->add_option("--position", position, "Stream position"); cobra2statsd->add_option("--position", position, "Stream position");
cobra2statsd->add_option("--queue_size",
maxQueueSize,
"Size of the queue to hold messages before they are sent to Sentry");
cobra2statsd->add_option("--runtime", runtime, "Runtime in seconds"); cobra2statsd->add_option("--runtime", runtime, "Runtime in seconds");
cobra2statsd->add_option("--heartbeat_timeout", heartBeatTimeout, "Heartbeat timeout");
addTLSOptions(cobra2statsd); addTLSOptions(cobra2statsd);
addCobraConfig(cobra2statsd); addCobraConfig(cobra2statsd);
CLI::App* cobra2sentry = app.add_subcommand("cobra_to_sentry", "Cobra metrics to sentry"); CLI::App* cobra2sentry = app.add_subcommand("cobra_to_sentry", "Cobra metrics to sentry");
cobra2sentry->fallthrough(); cobra2sentry->fallthrough();
cobra2sentry->add_option("--dsn", dsn, "Sentry DSN"); cobra2sentry->add_option("--dsn", dsn, "Sentry DSN");
cobra2sentry->add_option("--queue_size",
maxQueueSize,
"Size of the queue to hold messages before they are sent to Sentry");
cobra2sentry->add_option("channel", channel, "Channel")->required(); cobra2sentry->add_option("channel", channel, "Channel")->required();
cobra2sentry->add_flag("-v", verbose, "Verbose"); cobra2sentry->add_flag("-v", verbose, "Verbose");
cobra2sentry->add_option("--pidfile", pidfile, "Pid file"); cobra2sentry->add_option("--pidfile", pidfile, "Pid file");
cobra2sentry->add_option("--filter", filter, "Stream SQL Filter"); cobra2sentry->add_option("--filter", filter, "Stream SQL Filter");
cobra2sentry->add_option("--position", position, "Stream position"); cobra2sentry->add_option("--position", position, "Stream position");
cobra2sentry->add_option("--runtime", runtime, "Runtime in seconds"); cobra2sentry->add_option("--runtime", runtime, "Runtime in seconds");
cobra2sentry->add_option("--heartbeat_timeout", heartBeatTimeout, "Heartbeat timeout");
addTLSOptions(cobra2sentry); addTLSOptions(cobra2sentry);
addCobraConfig(cobra2sentry); addCobraConfig(cobra2sentry);
@ -532,8 +535,9 @@ int main(int argc, char** argv)
position, position,
fluentd, fluentd,
quiet, quiet,
verbose,
maxQueueSize,
enableHeartbeat, enableHeartbeat,
heartBeatTimeout,
runtime); runtime);
ret = (int) sentCount; ret = (int) sentCount;
} }
@ -576,8 +580,8 @@ int main(int argc, char** argv)
gauge, gauge,
timer, timer,
verbose, verbose,
maxQueueSize,
enableHeartbeat, enableHeartbeat,
heartBeatTimeout,
runtime); runtime);
} }
} }
@ -594,8 +598,8 @@ int main(int argc, char** argv)
position, position,
sentryClient, sentryClient,
verbose, verbose,
maxQueueSize,
enableHeartbeat, enableHeartbeat,
heartBeatTimeout,
runtime); runtime);
} }
else if (app.got_subcommand("cobra_metrics_to_redis")) else if (app.got_subcommand("cobra_metrics_to_redis"))