Compare commits

..

7 Commits

24 changed files with 232 additions and 228 deletions

View File

@ -123,8 +123,8 @@ if (USE_TLS)
if (NOT USE_MBED_TLS AND NOT USE_OPEN_SSL) # unless we want something else if (NOT USE_MBED_TLS AND NOT USE_OPEN_SSL) # unless we want something else
set(USE_SECURE_TRANSPORT ON) set(USE_SECURE_TRANSPORT ON)
endif() endif()
# default to mbedtls on uwp (universal windows platform) if nothing is configured # default to mbedtls on windows if nothing is configured
elseif (${CMAKE_SYSTEM_NAME} MATCHES "WindowsStore") elseif (WIN32)
if (NOT USE_OPEN_SSL) # unless we want something else if (NOT USE_OPEN_SSL) # unless we want something else
set(USE_MBED_TLS ON) set(USE_MBED_TLS ON)
endif() endif()

View File

@ -48,3 +48,4 @@ If your company or project is using this library, feel free to open an issue or
- [dis-light](https://gitlab.com/HCInk/dis-light), a discord library with a node frontend. - [dis-light](https://gitlab.com/HCInk/dis-light), a discord library with a node frontend.
- [libDiscordBot](https://github.com/tostc/libDiscordBot/tree/master), a work in progress discord library - [libDiscordBot](https://github.com/tostc/libDiscordBot/tree/master), a work in progress discord library
- [gwebsocket](https://github.com/norrbotten/gwebsocket), a websocket (lua) module for Garry's Mod - [gwebsocket](https://github.com/norrbotten/gwebsocket), a websocket (lua) module for Garry's Mod
- [DisCPP](https://github.com/DisCPP/DisCPP), a simple but feature rich Discord API wrapper

View File

@ -2,7 +2,7 @@ FROM alpine:3.11 as build
RUN apk add --no-cache \ RUN apk add --no-cache \
gcc g++ musl-dev linux-headers \ gcc g++ musl-dev linux-headers \
cmake mbedtls-dev make zlib-dev cmake mbedtls-dev make zlib-dev ninja
RUN addgroup -S app && \ RUN addgroup -S app && \
adduser -S -G app app && \ adduser -S -G app app && \

View File

@ -1,6 +1,22 @@
# Changelog # Changelog
All changes to this project will be documented in this file. All changes to this project will be documented in this file.
## [9.6.1] - 2020-05-17
(windows + tls) mbedtls is the default windows tls backend + add ability to load system certificates with mbdetls on windows
## [9.6.0] - 2020-05-12
(ixbots) add options to limit how many messages per minute should be processed
## [9.5.9] - 2020-05-12
(ixbots) add new class to configure a bot to simplify passing options around
## [9.5.8] - 2020-05-08
(openssl tls) (openssl < 1.1) logic inversion - crypto locking callback are not registered properly
## [9.5.7] - 2020-05-08 ## [9.5.7] - 2020-05-08
(cmake) default TLS back to mbedtls on Windows Universal Platform (cmake) default TLS back to mbedtls on Windows Universal Platform

View File

@ -13,6 +13,7 @@ set (IXBOTS_SOURCES
set (IXBOTS_HEADERS set (IXBOTS_HEADERS
ixbots/IXCobraBot.h ixbots/IXCobraBot.h
ixbots/IXCobraBotConfig.h
ixbots/IXCobraToSentryBot.h ixbots/IXCobraToSentryBot.h
ixbots/IXCobraToStatsdBot.h ixbots/IXCobraToStatsdBot.h
ixbots/IXCobraToStdoutBot.h ixbots/IXCobraToStdoutBot.h

View File

@ -17,14 +17,18 @@
namespace ix namespace ix
{ {
int64_t CobraBot::run(const CobraConfig& config, int64_t CobraBot::run(const CobraBotConfig& botConfig)
const std::string& channel,
const std::string& filter,
const std::string& position,
bool enableHeartbeat,
int heartBeatTimeout,
int runtime)
{ {
auto config = botConfig.cobraConfig;
auto channel = botConfig.channel;
auto filter = botConfig.filter;
auto position = botConfig.position;
auto enableHeartbeat = botConfig.enableHeartbeat;
auto heartBeatTimeout = botConfig.heartBeatTimeout;
auto runtime = botConfig.runtime;
auto maxEventsPerMinute = botConfig.maxEventsPerMinute;
auto limitReceivedEvents = botConfig.limitReceivedEvents;
ix::CobraConnection conn; ix::CobraConnection conn;
conn.configure(config); conn.configure(config);
conn.connect(); conn.connect();
@ -35,9 +39,11 @@ namespace ix
uint64_t receivedCountTotal(0); uint64_t receivedCountTotal(0);
uint64_t sentCountPerSecs(0); uint64_t sentCountPerSecs(0);
uint64_t receivedCountPerSecs(0); uint64_t receivedCountPerSecs(0);
std::atomic<int> receivedCountPerMinutes(0);
std::atomic<bool> stop(false); std::atomic<bool> stop(false);
std::atomic<bool> throttled(false); std::atomic<bool> throttled(false);
std::atomic<bool> fatalCobraError(false); std::atomic<bool> fatalCobraError(false);
int minuteCounter = 0;
auto timer = [&sentCount, auto timer = [&sentCount,
&receivedCount, &receivedCount,
@ -45,6 +51,8 @@ namespace ix
&receivedCountTotal, &receivedCountTotal,
&sentCountPerSecs, &sentCountPerSecs,
&receivedCountPerSecs, &receivedCountPerSecs,
&receivedCountPerMinutes,
&minuteCounter,
&stop] { &stop] {
while (!stop) while (!stop)
{ {
@ -65,13 +73,19 @@ namespace ix
CoreLogger::info(ss.str()); CoreLogger::info(ss.str());
receivedCountPerSecs = receivedCount - receivedCountTotal; receivedCountPerSecs = receivedCount - receivedCountTotal;
sentCountPerSecs = sentCount - receivedCountTotal; sentCountPerSecs = sentCount - sentCountTotal;
receivedCountTotal += receivedCountPerSecs; receivedCountTotal += receivedCountPerSecs;
sentCountTotal += sentCountPerSecs; sentCountTotal += sentCountPerSecs;
auto duration = std::chrono::seconds(1); auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
if (minuteCounter++ == 60)
{
receivedCountPerMinutes = 0;
minuteCounter = 0;
}
} }
CoreLogger::info("timer thread done"); CoreLogger::info("timer thread done");
@ -118,6 +132,9 @@ namespace ix
&subscriptionPosition, &subscriptionPosition,
&throttled, &throttled,
&receivedCount, &receivedCount,
&receivedCountPerMinutes,
maxEventsPerMinute,
limitReceivedEvents,
&fatalCobraError, &fatalCobraError,
&sentCount](const CobraEventPtr& event) { &sentCount](const CobraEventPtr& event) {
if (event->type == ix::CobraEventType::Open) if (event->type == ix::CobraEventType::Open)
@ -139,29 +156,34 @@ namespace ix
CoreLogger::info("Subscribing to " + channel); CoreLogger::info("Subscribing to " + channel);
CoreLogger::info("Subscribing at position " + subscriptionPosition); CoreLogger::info("Subscribing at position " + subscriptionPosition);
CoreLogger::info("Subscribing with filter " + filter); CoreLogger::info("Subscribing with filter " + filter);
conn.subscribe(channel, conn.subscribe(channel, filter, subscriptionPosition,
filter, [&sentCount, &receivedCountPerMinutes,
subscriptionPosition, maxEventsPerMinute, limitReceivedEvents,
[this, &throttled, &receivedCount,
&throttled, &subscriptionPosition, &fatalCobraError,
&receivedCount, this](const Json::Value& msg, const std::string& position) {
&subscriptionPosition, subscriptionPosition = position;
&fatalCobraError, ++receivedCount;
&sentCount](const Json::Value& msg, const std::string& position) {
subscriptionPosition = position;
// If we cannot send to sentry fast enough, drop the message ++receivedCountPerMinutes;
if (throttled) if (limitReceivedEvents)
{ {
return; if (receivedCountPerMinutes > maxEventsPerMinute)
} {
return;
}
}
++receivedCount; // If we cannot send to sentry fast enough, drop the message
if (throttled)
{
return;
}
_onBotMessageCallback( _onBotMessageCallback(
msg, position, throttled, msg, position, throttled,
fatalCobraError, sentCount); fatalCobraError, sentCount);
}); });
} }
else if (event->type == ix::CobraEventType::Subscribed) else if (event->type == ix::CobraEventType::Subscribed)
{ {

View File

@ -8,7 +8,7 @@
#include <atomic> #include <atomic>
#include <functional> #include <functional>
#include <ixcobra/IXCobraConfig.h> #include "IXCobraBotConfig.h"
#include <json/json.h> #include <json/json.h>
#include <stddef.h> #include <stddef.h>
@ -25,14 +25,7 @@ namespace ix
public: public:
CobraBot() = default; CobraBot() = default;
int64_t run(const CobraConfig& config, int64_t run(const CobraBotConfig& botConfig);
const std::string& channel,
const std::string& filter,
const std::string& position,
bool enableHeartbeat,
int heartBeatTimeout,
int runtime);
void setOnBotMessageCallback(const OnBotMessageCallback& callback); void setOnBotMessageCallback(const OnBotMessageCallback& callback);
private: private:

View File

@ -0,0 +1,31 @@
/*
* IXCobraBotConfig.h
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <string>
#include <limits>
#include <ixcobra/IXCobraConfig.h>
#ifdef max
#undef max
#endif
namespace ix
{
struct CobraBotConfig
{
CobraConfig cobraConfig;
std::string channel;
std::string filter;
std::string position = std::string("$");
bool enableHeartbeat = true;
int heartBeatTimeout = 60;
int runtime = -1;
int maxEventsPerMinute = std::numeric_limits<int>::max();
bool limitReceivedEvents = false;
};
} // namespace ix

View File

@ -16,15 +16,9 @@
namespace ix namespace ix
{ {
int64_t cobra_to_sentry_bot(const CobraConfig& config, int64_t cobra_to_sentry_bot(const CobraBotConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& position,
SentryClient& sentryClient, SentryClient& sentryClient,
bool verbose, bool verbose)
bool enableHeartbeat,
int heartBeatTimeout,
int runtime)
{ {
CobraBot bot; CobraBot bot;
bot.setOnBotMessageCallback([&sentryClient, &verbose](const Json::Value& msg, bot.setOnBotMessageCallback([&sentryClient, &verbose](const Json::Value& msg,
@ -77,12 +71,6 @@ namespace ix
}); });
}); });
return bot.run(config, return bot.run(config);
channel,
filter,
position,
enableHeartbeat,
heartBeatTimeout,
runtime);
} }
} // namespace ix } // namespace ix

View File

@ -6,19 +6,13 @@
#pragma once #pragma once
#include <cstdint> #include <cstdint>
#include <ixcobra/IXCobraConfig.h> #include "IXCobraBotConfig.h"
#include <ixsentry/IXSentryClient.h> #include <ixsentry/IXSentryClient.h>
#include <string> #include <string>
namespace ix namespace ix
{ {
int64_t cobra_to_sentry_bot(const CobraConfig& config, int64_t cobra_to_sentry_bot(const CobraBotConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& position,
SentryClient& sentryClient, SentryClient& sentryClient,
bool verbose, bool verbose);
bool enableHeartbeat,
int heartBeatTimeout,
int runtime);
} // namespace ix } // namespace ix

View File

@ -53,23 +53,13 @@ namespace ix
return val; return val;
} }
int64_t cobra_to_statsd_bot(const ix::CobraConfig& config, int64_t cobra_to_statsd_bot(const ix::CobraBotConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& position,
StatsdClient& statsdClient, StatsdClient& statsdClient,
const std::string& fields, const std::string& fields,
const std::string& gauge, const std::string& gauge,
const std::string& timer, const std::string& timer,
bool verbose, bool verbose)
bool enableHeartbeat,
int heartBeatTimeout,
int runtime)
{ {
ix::CobraConnection conn;
conn.configure(config);
conn.connect();
auto tokens = parseFields(fields); auto tokens = parseFields(fields);
CobraBot bot; CobraBot bot;
@ -142,12 +132,6 @@ namespace ix
sentCount++; sentCount++;
}); });
return bot.run(config, return bot.run(config);
channel,
filter,
position,
enableHeartbeat,
heartBeatTimeout,
runtime);
} }
} // namespace ix } // namespace ix

View File

@ -7,22 +7,16 @@
#include <cstdint> #include <cstdint>
#include <ixbots/IXStatsdClient.h> #include <ixbots/IXStatsdClient.h>
#include <ixcobra/IXCobraConfig.h> #include "IXCobraBotConfig.h"
#include <stddef.h> #include <stddef.h>
#include <string> #include <string>
namespace ix namespace ix
{ {
int64_t cobra_to_statsd_bot(const ix::CobraConfig& config, int64_t cobra_to_statsd_bot(const ix::CobraBotConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& position,
StatsdClient& statsdClient, StatsdClient& statsdClient,
const std::string& fields, const std::string& fields,
const std::string& gauge, const std::string& gauge,
const std::string& timer, const std::string& timer,
bool verbose, bool verbose);
bool enableHeartbeat,
int heartBeatTimeout,
int runtime);
} // namespace ix } // namespace ix

View File

@ -63,15 +63,9 @@ namespace ix
} }
} }
int64_t cobra_to_stdout_bot(const CobraConfig& config, int64_t cobra_to_stdout_bot(const ix::CobraBotConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& position,
bool fluentd, bool fluentd,
bool quiet, bool quiet)
bool enableHeartbeat,
int heartBeatTimeout,
int runtime)
{ {
CobraBot bot; CobraBot bot;
auto jsonWriter = makeStreamWriter(); auto jsonWriter = makeStreamWriter();
@ -89,12 +83,6 @@ namespace ix
sentCount++; sentCount++;
}); });
return bot.run(config, return bot.run(config);
channel,
filter,
position,
enableHeartbeat,
heartBeatTimeout,
runtime);
} }
} // namespace ix } // namespace ix

View File

@ -6,19 +6,13 @@
#pragma once #pragma once
#include <cstdint> #include <cstdint>
#include <ixcobra/IXCobraConfig.h> #include "IXCobraBotConfig.h"
#include <stddef.h> #include <stddef.h>
#include <string> #include <string>
namespace ix namespace ix
{ {
int64_t cobra_to_stdout_bot(const ix::CobraConfig& config, int64_t cobra_to_stdout_bot(const ix::CobraBotConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& position,
bool fluentd, bool fluentd,
bool quiet, bool quiet);
bool enableHeartbeat,
int heartBeatTimeout,
int runtime);
} // namespace ix } // namespace ix

View File

@ -43,6 +43,55 @@ namespace ix
mbedtls_pk_init(&_pkey); mbedtls_pk_init(&_pkey);
} }
bool SocketMbedTLS::loadSystemCertificates(std::string& errorMsg)
{
#ifdef _WIN32
DWORD flags = CERT_STORE_READONLY_FLAG | CERT_STORE_OPEN_EXISTING_FLAG |
CERT_SYSTEM_STORE_CURRENT_USER;
HCERTSTORE systemStore = CertOpenStore(CERT_STORE_PROV_SYSTEM, 0, 0, flags, L"Root");
if (!systemStore)
{
errorMsg = "CertOpenStore failed with ";
errorMsg += std::to_string(GetLastError());
return false;
}
PCCERT_CONTEXT certificateIterator = NULL;
int certificateCount = 0;
while (certificateIterator = CertEnumCertificatesInStore(systemStore, certificateIterator))
{
if (certificateIterator->dwCertEncodingType & X509_ASN_ENCODING)
{
int ret = mbedtls_x509_crt_parse(&_cacert,
certificateIterator->pbCertEncoded,
certificateIterator->cbCertEncoded);
if (ret == 0)
{
++certificateCount;
}
}
}
CertFreeCertificateContext(certificateIterator);
CertCloseStore(systemStore, 0);
if (certificateCount == 0)
{
errorMsg = "No certificates found";
return false;
}
return true;
#else
// On macOS we can query the system cert location from the keychain
// On Linux we could try to fetch some local files based on the distribution
// On Android we could use JNI to get to the system certs
return false;
#endif
}
bool SocketMbedTLS::init(const std::string& host, bool isClient, std::string& errMsg) bool SocketMbedTLS::init(const std::string& host, bool isClient, std::string& errMsg)
{ {
initMBedTLS(); initMBedTLS();
@ -96,13 +145,15 @@ namespace ix
} }
else else
{ {
mbedtls_ssl_conf_authmode(&_conf, MBEDTLS_SSL_VERIFY_REQUIRED);
// FIXME: should we call mbedtls_ssl_conf_verify ? // FIXME: should we call mbedtls_ssl_conf_verify ?
mbedtls_ssl_conf_authmode(&_conf, MBEDTLS_SSL_VERIFY_REQUIRED);
if (_tlsOptions.isUsingSystemDefaults()) if (_tlsOptions.isUsingSystemDefaults())
{ {
; // FIXME if (!loadSystemCertificates(errMsg))
{
return false;
}
} }
else else
{ {

View File

@ -52,6 +52,7 @@ namespace ix
bool init(const std::string& host, bool isClient, std::string& errMsg); bool init(const std::string& host, bool isClient, std::string& errMsg);
void initMBedTLS(); void initMBedTLS();
bool loadSystemCertificates(std::string& errMsg);
}; };
} // namespace ix } // namespace ix

View File

@ -109,7 +109,7 @@ namespace ix
#else #else
(void) OPENSSL_config(nullptr); (void) OPENSSL_config(nullptr);
if (CRYPTO_get_locking_callback() != nullptr) if (CRYPTO_get_locking_callback() == nullptr)
{ {
CRYPTO_set_locking_callback(SocketOpenSSL::openSSLLockingCallback); CRYPTO_set_locking_callback(SocketOpenSSL::openSSLLockingCallback);
} }

View File

@ -6,4 +6,4 @@
#pragma once #pragma once
#define IX_WEBSOCKET_VERSION "9.5.7" #define IX_WEBSOCKET_VERSION "9.6.1"

View File

@ -26,7 +26,7 @@ brew:
# server side ?) and I can't work-around it easily, so we're using mbedtls on # server side ?) and I can't work-around it easily, so we're using mbedtls on
# Linux for the SSL backend, which works great. # Linux for the SSL backend, which works great.
ws_mbedtls_install: ws_mbedtls_install:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=MinSizeRel -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .. ; make -j 4 install) mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=MinSizeRel -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .. ; ninja install)
ws: ws:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 .. ; make -j 4) mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 .. ; make -j 4)

View File

@ -138,11 +138,12 @@ TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]")
std::thread publisherThread(runPublisher, config, channel); std::thread publisherThread(runPublisher, config, channel);
std::string filter; ix::CobraBotConfig cobraBotConfig;
std::string position("$"); cobraBotConfig.cobraConfig = config;
cobraBotConfig.channel = channel;
cobraBotConfig.runtime = 3; // Only run the bot for 3 seconds
cobraBotConfig.enableHeartbeat = false;
bool verbose = true; bool verbose = true;
bool enableHeartbeat = false;
int heartBeatTimeout = 60;
// FIXME: try to get this working with https instead of http // FIXME: try to get this working with https instead of http
// to regress the TLS 1.3 OpenSSL bug // to regress the TLS 1.3 OpenSSL bug
@ -157,18 +158,7 @@ TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]")
SentryClient sentryClient(dsn); SentryClient sentryClient(dsn);
sentryClient.setTLSOptions(tlsOptionsClient); sentryClient.setTLSOptions(tlsOptionsClient);
// Only run the bot for 3 seconds int64_t sentCount = cobra_to_sentry_bot(cobraBotConfig, sentryClient, verbose);
int runtime = 3;
int64_t sentCount = cobra_to_sentry_bot(config,
channel,
filter,
position,
sentryClient,
verbose,
enableHeartbeat,
heartBeatTimeout,
runtime);
// //
// We want at least 2 messages to be sent // We want at least 2 messages to be sent
// //

View File

@ -87,14 +87,11 @@ TEST_CASE("Cobra_to_statsd_bot", "[cobra_bots]")
std::thread publisherThread(runPublisher, config, channel); std::thread publisherThread(runPublisher, config, channel);
std::string filter; ix::CobraBotConfig cobraBotConfig;
std::string position("$"); cobraBotConfig.cobraConfig = config;
bool verbose = true; cobraBotConfig.channel = channel;
bool enableHeartbeat = false; cobraBotConfig.runtime = 3; // Only run the bot for 3 seconds
int heartBeatTimeout = 60; cobraBotConfig.enableHeartbeat = false;
// Only run the bot for 3 seconds
int runtime = 3;
std::string hostname("127.0.0.1"); std::string hostname("127.0.0.1");
// std::string hostname("www.google.com"); // std::string hostname("www.google.com");
@ -113,19 +110,10 @@ TEST_CASE("Cobra_to_statsd_bot", "[cobra_bots]")
std::string fields("device.game\ndevice.os_name"); std::string fields("device.game\ndevice.os_name");
std::string gauge; std::string gauge;
std::string timer; std::string timer;
bool verbose = true;
int64_t sentCount = ix::cobra_to_statsd_bot(config, int64_t sentCount =
channel, ix::cobra_to_statsd_bot(cobraBotConfig, statsdClient, fields, gauge, timer, verbose);
filter,
position,
statsdClient,
fields,
gauge,
timer,
verbose,
enableHeartbeat,
heartBeatTimeout,
runtime);
// //
// We want at least 2 messages to be sent // We want at least 2 messages to be sent
// //

View File

@ -85,27 +85,17 @@ TEST_CASE("Cobra_to_stdout_bot", "[cobra_bots]")
std::thread publisherThread(runPublisher, config, channel); std::thread publisherThread(runPublisher, config, channel);
std::string filter; ix::CobraBotConfig cobraBotConfig;
std::string position("$"); cobraBotConfig.cobraConfig = config;
cobraBotConfig.channel = channel;
cobraBotConfig.runtime = 3; // Only run the bot for 3 seconds
cobraBotConfig.enableHeartbeat = false;
bool quiet = false; bool quiet = false;
bool enableHeartbeat = false;
int heartBeatTimeout = 60;
// Only run the bot for 3 seconds
int runtime = 3;
// We could try to capture the output ... not sure how. // We could try to capture the output ... not sure how.
bool fluentd = true; bool fluentd = true;
int64_t sentCount = ix::cobra_to_stdout_bot(config, int64_t sentCount = ix::cobra_to_stdout_bot(cobraBotConfig, fluentd, quiet);
channel,
filter,
position,
fluentd,
quiet,
enableHeartbeat,
heartBeatTimeout,
runtime);
// //
// We want at least 2 messages to be sent // We want at least 2 messages to be sent
// //

View File

@ -93,10 +93,11 @@ TEST_CASE("subprotocol", "[websocket_subprotocol]")
webSocket.setUrl(url); webSocket.setUrl(url);
webSocket.start(); webSocket.start();
// Give us 3 seconds to connect
int attempts = 0; int attempts = 0;
while (!connected) while (!connected)
{ {
REQUIRE(attempts++ < 100); REQUIRE(attempts++ < 300);
ix::msleep(10); ix::msleep(10);
} }

View File

@ -120,6 +120,7 @@ int main(int argc, char** argv)
std::string logfile; std::string logfile;
ix::SocketTLSOptions tlsOptions; ix::SocketTLSOptions tlsOptions;
ix::CobraConfig cobraConfig; ix::CobraConfig cobraConfig;
ix::CobraBotConfig cobraBotConfig;
std::string ciphers; std::string ciphers;
std::string redirectUrl; std::string redirectUrl;
bool headersOnly = false; bool headersOnly = false;
@ -149,8 +150,6 @@ int main(int argc, char** argv)
int count = 1; int count = 1;
uint32_t maxWaitBetweenReconnectionRetries; uint32_t maxWaitBetweenReconnectionRetries;
int pingIntervalSecs = 30; int pingIntervalSecs = 30;
int runtime = -1; // run indefinitely
int heartBeatTimeout = 60;
auto addTLSOptions = [&tlsOptions, &verifyNone](CLI::App* app) { auto addTLSOptions = [&tlsOptions, &verifyNone](CLI::App* app) {
app->add_option( app->add_option(
@ -174,6 +173,24 @@ int main(int argc, char** argv)
app->add_option("--rolesecret", cobraConfig.rolesecret, "Role secret")->required(); app->add_option("--rolesecret", cobraConfig.rolesecret, "Role secret")->required();
}; };
auto addCobraBotConfig = [&cobraBotConfig](CLI::App* app) {
app->add_option("--appkey", cobraBotConfig.cobraConfig.appkey, "Appkey")->required();
app->add_option("--endpoint", cobraBotConfig.cobraConfig.endpoint, "Endpoint")->required();
app->add_option("--rolename", cobraBotConfig.cobraConfig.rolename, "Role name")->required();
app->add_option("--rolesecret", cobraBotConfig.cobraConfig.rolesecret, "Role secret")
->required();
app->add_option("--channel", cobraBotConfig.channel, "Channel")->required();
app->add_option("--filter", cobraBotConfig.filter, "Filter");
app->add_option("--position", cobraBotConfig.position, "Position");
app->add_option("--runtime", cobraBotConfig.runtime, "Runtime");
app->add_option("--heartbeat", cobraBotConfig.enableHeartbeat, "Runtime");
app->add_option("--heartbeat_timeout", cobraBotConfig.heartBeatTimeout, "Runtime");
app->add_flag(
"--limit_received_events", cobraBotConfig.limitReceivedEvents, "Max events per minute");
app->add_option(
"--max_events_per_minute", cobraBotConfig.maxEventsPerMinute, "Max events per minute");
};
app.add_flag("--version", version, "Print ws version"); app.add_flag("--version", version, "Print ws version");
app.add_option("--logfile", logfile, "path where all logs will be redirected"); app.add_option("--logfile", logfile, "path where all logs will be redirected");
@ -281,16 +298,11 @@ int main(int argc, char** argv)
CLI::App* cobraSubscribeApp = app.add_subcommand("cobra_subscribe", "Cobra subscriber"); CLI::App* cobraSubscribeApp = app.add_subcommand("cobra_subscribe", "Cobra subscriber");
cobraSubscribeApp->fallthrough(); cobraSubscribeApp->fallthrough();
cobraSubscribeApp->add_option("--channel", channel, "Channel")->required();
cobraSubscribeApp->add_option("--pidfile", pidfile, "Pid file"); cobraSubscribeApp->add_option("--pidfile", pidfile, "Pid file");
cobraSubscribeApp->add_option("--filter", filter, "Stream SQL Filter");
cobraSubscribeApp->add_option("--position", position, "Stream position");
cobraSubscribeApp->add_flag("-q", quiet, "Quiet / only display stats"); cobraSubscribeApp->add_flag("-q", quiet, "Quiet / only display stats");
cobraSubscribeApp->add_flag("--fluentd", fluentd, "Write fluentd prefix"); cobraSubscribeApp->add_flag("--fluentd", fluentd, "Write fluentd prefix");
cobraSubscribeApp->add_option("--runtime", runtime, "Runtime in seconds");
cobraSubscribeApp->add_option("--heartbeat_timeout", heartBeatTimeout, "Heartbeat timeout");
addTLSOptions(cobraSubscribeApp); addTLSOptions(cobraSubscribeApp);
addCobraConfig(cobraSubscribeApp); addCobraBotConfig(cobraSubscribeApp);
CLI::App* cobraPublish = app.add_subcommand("cobra_publish", "Cobra publisher"); CLI::App* cobraPublish = app.add_subcommand("cobra_publish", "Cobra publisher");
cobraPublish->fallthrough(); cobraPublish->fallthrough();
@ -324,28 +336,18 @@ int main(int argc, char** argv)
->join(); ->join();
cobra2statsd->add_option("--timer", timer, "Value to extract, and use as a statsd timer") cobra2statsd->add_option("--timer", timer, "Value to extract, and use as a statsd timer")
->join(); ->join();
cobra2statsd->add_option("channel", channel, "Channel")->required();
cobra2statsd->add_flag("-v", verbose, "Verbose"); cobra2statsd->add_flag("-v", verbose, "Verbose");
cobra2statsd->add_option("--pidfile", pidfile, "Pid file"); cobra2statsd->add_option("--pidfile", pidfile, "Pid file");
cobra2statsd->add_option("--filter", filter, "Stream SQL Filter");
cobra2statsd->add_option("--position", position, "Stream position");
cobra2statsd->add_option("--runtime", runtime, "Runtime in seconds");
cobra2statsd->add_option("--heartbeat_timeout", heartBeatTimeout, "Heartbeat timeout");
addTLSOptions(cobra2statsd); addTLSOptions(cobra2statsd);
addCobraConfig(cobra2statsd); addCobraBotConfig(cobra2statsd);
CLI::App* cobra2sentry = app.add_subcommand("cobra_to_sentry", "Cobra metrics to sentry"); CLI::App* cobra2sentry = app.add_subcommand("cobra_to_sentry", "Cobra metrics to sentry");
cobra2sentry->fallthrough(); cobra2sentry->fallthrough();
cobra2sentry->add_option("--dsn", dsn, "Sentry DSN"); cobra2sentry->add_option("--dsn", dsn, "Sentry DSN");
cobra2sentry->add_option("channel", channel, "Channel")->required();
cobra2sentry->add_flag("-v", verbose, "Verbose"); cobra2sentry->add_flag("-v", verbose, "Verbose");
cobra2sentry->add_option("--pidfile", pidfile, "Pid file"); cobra2sentry->add_option("--pidfile", pidfile, "Pid file");
cobra2sentry->add_option("--filter", filter, "Stream SQL Filter");
cobra2sentry->add_option("--position", position, "Stream position");
cobra2sentry->add_option("--runtime", runtime, "Runtime in seconds");
cobra2sentry->add_option("--heartbeat_timeout", heartBeatTimeout, "Heartbeat timeout");
addTLSOptions(cobra2sentry); addTLSOptions(cobra2sentry);
addCobraConfig(cobra2sentry); addCobraBotConfig(cobra2sentry);
CLI::App* cobra2redisApp = CLI::App* cobra2redisApp =
app.add_subcommand("cobra_metrics_to_redis", "Cobra metrics to redis"); app.add_subcommand("cobra_metrics_to_redis", "Cobra metrics to redis");
@ -456,6 +458,10 @@ int main(int argc, char** argv)
cobraConfig.webSocketPerMessageDeflateOptions = ix::WebSocketPerMessageDeflateOptions(true); cobraConfig.webSocketPerMessageDeflateOptions = ix::WebSocketPerMessageDeflateOptions(true);
cobraConfig.socketTLSOptions = tlsOptions; cobraConfig.socketTLSOptions = tlsOptions;
cobraBotConfig.cobraConfig.webSocketPerMessageDeflateOptions =
ix::WebSocketPerMessageDeflateOptions(true);
cobraBotConfig.cobraConfig.socketTLSOptions = tlsOptions;
int ret = 1; int ret = 1;
if (app.got_subcommand("transfer")) if (app.got_subcommand("transfer"))
{ {
@ -525,16 +531,7 @@ int main(int argc, char** argv)
} }
else if (app.got_subcommand("cobra_subscribe")) else if (app.got_subcommand("cobra_subscribe"))
{ {
bool enableHeartbeat = true; int64_t sentCount = ix::cobra_to_stdout_bot(cobraBotConfig, fluentd, quiet);
int64_t sentCount = ix::cobra_to_stdout_bot(cobraConfig,
channel,
filter,
position,
fluentd,
quiet,
enableHeartbeat,
heartBeatTimeout,
runtime);
ret = (int) sentCount; ret = (int) sentCount;
} }
else if (app.got_subcommand("cobra_publish")) else if (app.got_subcommand("cobra_publish"))
@ -555,7 +552,6 @@ int main(int argc, char** argv)
} }
else else
{ {
bool enableHeartbeat = true;
ix::StatsdClient statsdClient(hostname, statsdPort, prefix); ix::StatsdClient statsdClient(hostname, statsdPort, prefix);
std::string errMsg; std::string errMsg;
@ -567,36 +563,17 @@ int main(int argc, char** argv)
} }
else else
{ {
ret = (int) ix::cobra_to_statsd_bot(cobraConfig, ret = (int) ix::cobra_to_statsd_bot(
channel, cobraBotConfig, statsdClient, fields, gauge, timer, verbose);
filter,
position,
statsdClient,
fields,
gauge,
timer,
verbose,
enableHeartbeat,
heartBeatTimeout,
runtime);
} }
} }
} }
else if (app.got_subcommand("cobra_to_sentry")) else if (app.got_subcommand("cobra_to_sentry"))
{ {
bool enableHeartbeat = true;
ix::SentryClient sentryClient(dsn); ix::SentryClient sentryClient(dsn);
sentryClient.setTLSOptions(tlsOptions); sentryClient.setTLSOptions(tlsOptions);
ret = (int) ix::cobra_to_sentry_bot(cobraConfig, ret = (int) ix::cobra_to_sentry_bot(cobraBotConfig, sentryClient, verbose);
channel,
filter,
position,
sentryClient,
verbose,
enableHeartbeat,
heartBeatTimeout,
runtime);
} }
else if (app.got_subcommand("cobra_metrics_to_redis")) else if (app.got_subcommand("cobra_metrics_to_redis"))
{ {