Compare commits

..

1 Commits

Author SHA1 Message Date
cc588f9c05 wip / compile but unittest failures 2020-04-14 10:47:10 -07:00
46 changed files with 1079 additions and 1245 deletions

View File

@ -5,6 +5,38 @@ on:
- 'docs/**'
jobs:
linux:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- name: make test
run: make test
mac_tsan_sectransport:
runs-on: macOS-latest
steps:
- uses: actions/checkout@v1
- name: make test_tsan
run: make test_tsan
mac_tsan_openssl:
runs-on: macOS-latest
steps:
- uses: actions/checkout@v1
- name: install openssl
run: brew install openssl
- name: make test
run: make test_tsan_openssl
mac_tsan_mbedtls:
runs-on: macOS-latest
steps:
- uses: actions/checkout@v1
- name: install mbedtls
run: brew install mbedtls
- name: make test
run: make test_tsan_mbedtls
windows_mbedtls:
runs-on: windows-latest
steps:
@ -12,10 +44,35 @@ jobs:
- uses: seanmiddleditch/gha-setup-vsdevenv@master
- run: |
vcpkg install zlib:x64-windows
- run: |
vcpkg install mbedtls:x64-windows
- run: |
mkdir build
cd build
cmake -DCMAKE_TOOLCHAIN_FILE=c:/vcpkg/scripts/buildsystems/vcpkg.cmake -DCMAKE_CXX_COMPILER=cl.exe -DUSE_MBED_TLS=1 -DUSE_TLS=1 -DUSE_WS=1 ..
cmake -DCMAKE_TOOLCHAIN_FILE=c:/vcpkg/scripts/buildsystems/vcpkg.cmake -DCMAKE_CXX_COMPILER=cl.exe -DUSE_MBED_TLS=1 -DUSE_TLS=1 -DUSE_WS=1 -DUSE_TEST=1 ..
- run: cmake --build build
# Running the unittest does not work, the binary cannot be found
#- run: ../build/test/ixwebsocket_unittest.exe
# working-directory: test
#
# Windows with OpenSSL is working but disabled as it takes 13 minutes (10 for openssl) to build with vcpkg
#
# windows_openssl:
# runs-on: windows-latest
# steps:
# - uses: actions/checkout@v1
# - uses: seanmiddleditch/gha-setup-vsdevenv@master
# - run: |
# vcpkg install zlib:x64-windows
# vcpkg install openssl:x64-windows
# - run: |
# mkdir build
# cd build
# cmake -DCMAKE_TOOLCHAIN_FILE=c:/vcpkg/scripts/buildsystems/vcpkg.cmake -DCMAKE_CXX_COMPILER=cl.exe -DUSE_OPEN_SSL=1 -DUSE_TLS=1 -DUSE_WS=1 -DUSE_TEST=1 ..
# - run: cmake --build build
#
# # Running the unittest does not work, the binary cannot be found
# #- run: ../build/test/ixwebsocket_unittest.exe
# # working-directory: test

View File

@ -44,6 +44,7 @@ set( IXWEBSOCKET_SOURCES
ixwebsocket/IXWebSocketCloseConstants.cpp
ixwebsocket/IXWebSocketHandshake.cpp
ixwebsocket/IXWebSocketHttpHeaders.cpp
ixwebsocket/IXWebSocketMessage.cpp
ixwebsocket/IXWebSocketPerMessageDeflate.cpp
ixwebsocket/IXWebSocketPerMessageDeflateCodec.cpp
ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp

View File

@ -1,38 +1,6 @@
# Changelog
All changes to this project will be documented in this file.
## [9.3.3] - 2020-04-17
(ixbots) display sent/receive message, per seconds as accumulated
## [9.3.2] - 2020-04-17
(ws) add a --logfile option to configure all logs to go to a file
## [9.3.1] - 2020-04-16
(cobra bots) add a utility class to factor out the common bots features (heartbeat) and move all bots to used it + convert cobra_subscribe to be a bot and add a unittest for it
## [9.3.0] - 2020-04-15
(websocket) add a positive number to the heartbeat message sent, incremented each time the heartbeat is sent
## [9.2.9] - 2020-04-15
(ixcobra) change cobra event callback to use a struct instead of several objects, which is more flexible/extensible
## [9.2.8] - 2020-04-15
(ixcobra) make CobraConnection_EventType an enum class (CobraEventType)
## [9.2.7] - 2020-04-14
(ixsentry) add a library method to upload a payload directly to sentry
## [9.2.6] - 2020-04-14
(ixcobra) snake server / handle invalid incoming json messages + cobra subscriber in fluentd mode insert a created_at timestamp entry
## [9.2.5] - 2020-04-13
(websocket) WebSocketMessagePtr is a unique_ptr instead of a shared_ptr

View File

@ -4,19 +4,15 @@
#
set (IXBOTS_SOURCES
ixbots/IXCobraBot.cpp
ixbots/IXCobraToSentryBot.cpp
ixbots/IXCobraToStatsdBot.cpp
ixbots/IXCobraToStdoutBot.cpp
ixbots/IXQueueManager.cpp
ixbots/IXStatsdClient.cpp
)
set (IXBOTS_HEADERS
ixbots/IXCobraBot.h
ixbots/IXCobraToSentryBot.h
ixbots/IXCobraToStatsdBot.h
ixbots/IXCobraToStdoutBot.h
ixbots/IXQueueManager.h
ixbots/IXStatsdClient.h
)

View File

@ -1,303 +0,0 @@
/*
* IXCobraBot.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#include "IXCobraBot.h"
#include "IXQueueManager.h"
#include <algorithm>
#include <chrono>
#include <ixcobra/IXCobraConnection.h>
#include <spdlog/spdlog.h>
#include <sstream>
#include <thread>
#include <vector>
namespace ix
{
int64_t CobraBot::run(const CobraConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& position,
bool verbose,
size_t maxQueueSize,
bool useQueue,
bool enableHeartbeat,
int runtime)
{
ix::CobraConnection conn;
conn.configure(config);
conn.connect();
Json::FastWriter jsonWriter;
std::atomic<uint64_t> sentCount(0);
std::atomic<uint64_t> receivedCount(0);
std::atomic<uint64_t> sentCountTotal(0);
std::atomic<uint64_t> receivedCountTotal(0);
std::atomic<uint64_t> sentCountPerSecs(0);
std::atomic<uint64_t> receivedCountPerSecs(0);
std::atomic<bool> stop(false);
std::atomic<bool> throttled(false);
std::atomic<bool> fatalCobraError(false);
QueueManager queueManager(maxQueueSize);
auto timer = [&sentCount,
&receivedCount,
&sentCountTotal,
&receivedCountTotal,
&sentCountPerSecs,
&receivedCountPerSecs,
&stop] {
while (!stop)
{
//
// We cannot write to sentCount and receivedCount
// as those are used externally, so we need to introduce
// our own counters
//
spdlog::info("messages received {} {} sent {} {}",
receivedCountPerSecs,
receivedCountTotal,
sentCountPerSecs,
sentCountTotal);
receivedCountPerSecs = receivedCount - receivedCountTotal;
sentCountPerSecs = sentCount - receivedCountTotal;
receivedCountTotal += receivedCountPerSecs;
sentCountTotal += sentCountPerSecs;
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
spdlog::info("timer thread done");
};
std::thread t1(timer);
auto heartbeat = [&sentCount, &receivedCount, &stop, &enableHeartbeat] {
std::string state("na");
if (!enableHeartbeat) return;
while (!stop)
{
std::stringstream ss;
ss << "messages received " << receivedCount;
ss << "messages sent " << sentCount;
std::string currentState = ss.str();
if (currentState == state)
{
spdlog::error("no messages received or sent for 1 minute, exiting");
exit(1);
}
state = currentState;
auto duration = std::chrono::minutes(1);
std::this_thread::sleep_for(duration);
}
spdlog::info("heartbeat thread done");
};
std::thread t2(heartbeat);
auto sender =
[this, &queueManager, verbose, &sentCount, &stop, &throttled, &fatalCobraError] {
while (true)
{
auto data = queueManager.pop();
Json::Value msg = data.first;
std::string position = data.second;
if (stop) break;
if (msg.isNull()) continue;
if (_onBotMessageCallback && _onBotMessageCallback(msg, position, verbose, throttled, fatalCobraError))
{
// That might be too noisy
if (verbose)
{
spdlog::info("cobra bot: sending succesfull");
}
++sentCount;
}
else
{
spdlog::error("cobra bot: error sending");
}
if (stop) break;
}
spdlog::info("sender thread done");
};
std::thread t3(sender);
std::string subscriptionPosition(position);
conn.setEventCallback([this,
&conn,
&channel,
&filter,
&subscriptionPosition,
&jsonWriter,
verbose,
&throttled,
&receivedCount,
&fatalCobraError,
&useQueue,
&queueManager,
&sentCount](const CobraEventPtr& event)
{
if (event->type == ix::CobraEventType::Open)
{
spdlog::info("Subscriber connected");
for (auto&& it : event->headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
}
else if (event->type == ix::CobraEventType::Closed)
{
spdlog::info("Subscriber closed: {}", event->errMsg);
}
else if (event->type == ix::CobraEventType::Authenticated)
{
spdlog::info("Subscriber authenticated");
spdlog::info("Subscribing to {} at position {}", channel, subscriptionPosition);
spdlog::info("Using filter: {}", filter);
conn.subscribe(channel,
filter,
subscriptionPosition,
[this, &jsonWriter, verbose, &throttled, &receivedCount, &queueManager, &useQueue, &subscriptionPosition, &fatalCobraError, &sentCount](
const Json::Value& msg, const std::string& position) {
if (verbose)
{
spdlog::info("Subscriber received message {} -> {}", position, jsonWriter.write(msg));
}
subscriptionPosition = position;
// If we cannot send to sentry fast enough, drop the message
if (throttled)
{
return;
}
++receivedCount;
if (useQueue)
{
queueManager.add(msg, position);
}
else
{
if (_onBotMessageCallback && _onBotMessageCallback(msg, position, verbose, throttled, fatalCobraError))
{
// That might be too noisy
if (verbose)
{
spdlog::info("cobra bot: sending succesfull");
}
++sentCount;
}
else
{
spdlog::error("cobra bot: error sending");
}
}
});
}
else if (event->type == ix::CobraEventType::Subscribed)
{
spdlog::info("Subscriber: subscribed to channel {}", event->subscriptionId);
}
else if (event->type == ix::CobraEventType::UnSubscribed)
{
spdlog::info("Subscriber: unsubscribed from channel {}", event->subscriptionId);
}
else if (event->type == ix::CobraEventType::Error)
{
spdlog::error("Subscriber: error {}", event->errMsg);
}
else if (event->type == ix::CobraEventType::Published)
{
spdlog::error("Published message hacked: {}", event->msgId);
}
else if (event->type == ix::CobraEventType::Pong)
{
spdlog::info("Received websocket pong: {}", event->errMsg);
}
else if (event->type == ix::CobraEventType::HandshakeError)
{
spdlog::error("Subscriber: Handshake error: {}", event->errMsg);
fatalCobraError = true;
}
else if (event->type == ix::CobraEventType::AuthenticationError)
{
spdlog::error("Subscriber: Authentication error: {}", event->errMsg);
fatalCobraError = true;
}
else if (event->type == ix::CobraEventType::SubscriptionError)
{
spdlog::error("Subscriber: Subscription error: {}", event->errMsg);
fatalCobraError = true;
}
});
// Run forever
if (runtime == -1)
{
while (true)
{
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
if (fatalCobraError) break;
}
}
// Run for a duration, used by unittesting now
else
{
for (int i = 0 ; i < runtime; ++i)
{
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
if (fatalCobraError) break;
}
}
//
// Cleanup.
// join all the bg threads and stop them.
//
conn.disconnect();
stop = true;
// progress thread
t1.join();
// heartbeat thread
if (t2.joinable()) t2.join();
// sentry sender thread
t3.join();
return fatalCobraError ? -1 : (int64_t) sentCount;
}
void CobraBot::setOnBotMessageCallback(const OnBotMessageCallback& callback)
{
_onBotMessageCallback = callback;
}
}

View File

@ -1,43 +0,0 @@
/*
* IXCobraBot.h
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <ixcobra/IXCobraConfig.h>
#include <stddef.h>
#include <json/json.h>
#include <functional>
#include <atomic>
namespace ix
{
using OnBotMessageCallback = std::function<bool(const Json::Value&,
const std::string&,
const bool verbose,
std::atomic<bool>&,
std::atomic<bool>&)>;
class CobraBot
{
public:
CobraBot() = default;
int64_t run(const CobraConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& position,
bool verbose,
size_t maxQueueSize,
bool useQueue,
bool enableHeartbeat,
int runtime);
void setOnBotMessageCallback(const OnBotMessageCallback& callback);
private:
OnBotMessageCallback _onBotMessageCallback;
};
}

View File

@ -5,113 +5,301 @@
*/
#include "IXCobraToSentryBot.h"
#include "IXCobraBot.h"
#include "IXQueueManager.h"
#include <chrono>
#include <ixcobra/IXCobraConnection.h>
#include <spdlog/spdlog.h>
#include <sstream>
#include <thread>
#include <vector>
namespace ix
{
int64_t cobra_to_sentry_bot(const CobraConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& position,
SentryClient& sentryClient,
bool verbose,
size_t maxQueueSize,
bool enableHeartbeat,
int runtime)
int cobra_to_sentry_bot(const CobraConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& position,
SentryClient& sentryClient,
bool verbose,
bool strict,
size_t maxQueueSize,
bool enableHeartbeat,
int runtime)
{
CobraBot bot;
bot.setOnBotMessageCallback([&sentryClient](const Json::Value& msg,
const std::string& /*position*/,
const bool verbose,
std::atomic<bool>& throttled,
std::atomic<bool>& /*fatalCobraError*/) -> bool {
auto ret = sentryClient.send(msg, verbose);
HttpResponsePtr response = ret.first;
ix::CobraConnection conn;
conn.configure(config);
conn.connect();
if (!response)
Json::FastWriter jsonWriter;
std::atomic<uint64_t> sentCount(0);
std::atomic<uint64_t> receivedCount(0);
std::atomic<bool> errorSending(false);
std::atomic<bool> stop(false);
std::atomic<bool> throttled(false);
std::atomic<bool> fatalCobraError(false);
QueueManager queueManager(maxQueueSize);
auto timer = [&sentCount, &receivedCount, &stop] {
while (!stop)
{
spdlog::warn("Null HTTP Response");
return false;
spdlog::info("messages received {} sent {}", receivedCount, sentCount);
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
if (verbose)
spdlog::info("timer thread done");
};
std::thread t1(timer);
auto heartbeat = [&sentCount, &receivedCount, &stop, &enableHeartbeat] {
std::string state("na");
if (!enableHeartbeat) return;
while (!stop)
{
for (auto it : response->headers)
std::stringstream ss;
ss << "messages received " << receivedCount;
ss << "messages sent " << sentCount;
std::string currentState = ss.str();
if (currentState == state)
{
spdlog::error("no messages received or sent for 1 minute, exiting");
exit(1);
}
state = currentState;
auto duration = std::chrono::minutes(1);
std::this_thread::sleep_for(duration);
}
spdlog::info("heartbeat thread done");
};
std::thread t2(heartbeat);
auto sentrySender =
[&queueManager, verbose, &errorSending, &sentCount, &stop, &throttled, &sentryClient] {
while (true)
{
Json::Value msg = queueManager.pop();
if (stop) break;
if (msg.isNull()) continue;
auto ret = sentryClient.send(msg, verbose);
HttpResponsePtr response = ret.first;
if (!response)
{
spdlog::warn("Null HTTP Response");
continue;
}
if (verbose)
{
for (auto it : response->headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
spdlog::info("Upload size: {}", response->uploadSize);
spdlog::info("Download size: {}", response->downloadSize);
spdlog::info("Status: {}", response->statusCode);
if (response->errorCode != HttpErrorCode::Ok)
{
spdlog::info("error message: {}", response->errorMsg);
}
if (response->headers["Content-Type"] != "application/octet-stream")
{
spdlog::info("payload: {}", response->payload);
}
}
if (response->statusCode != 200)
{
spdlog::error("Error sending data to sentry: {}", response->statusCode);
spdlog::error("Body: {}", ret.second);
spdlog::error("Response: {}", response->payload);
errorSending = true;
// Error 429 Too Many Requests
if (response->statusCode == 429)
{
auto retryAfter = response->headers["Retry-After"];
std::stringstream ss;
ss << retryAfter;
int seconds;
ss >> seconds;
if (!ss.eof() || ss.fail())
{
seconds = 30;
spdlog::warn("Error parsing Retry-After header. "
"Using {} for the sleep duration",
seconds);
}
spdlog::warn("Error 429 - Too Many Requests. ws will sleep "
"and retry after {} seconds",
retryAfter);
throttled = true;
auto duration = std::chrono::seconds(seconds);
std::this_thread::sleep_for(duration);
throttled = false;
}
}
else
{
++sentCount;
}
if (stop) break;
}
spdlog::info("sentrySender thread done");
};
std::thread t3(sentrySender);
conn.setEventCallback([&conn,
&channel,
&filter,
&position,
&jsonWriter,
verbose,
&throttled,
&receivedCount,
&fatalCobraError,
&queueManager](ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId) {
if (eventType == ix::CobraConnection_EventType_Open)
{
spdlog::info("Subscriber connected");
for (auto it : headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
spdlog::info("Upload size: {}", response->uploadSize);
spdlog::info("Download size: {}", response->downloadSize);
spdlog::info("Status: {}", response->statusCode);
if (response->errorCode != HttpErrorCode::Ok)
{
spdlog::info("error message: {}", response->errorMsg);
}
if (response->headers["Content-Type"] != "application/octet-stream")
{
spdlog::info("payload: {}", response->payload);
}
}
bool success = response->statusCode == 200;
if (!success)
if (eventType == ix::CobraConnection_EventType_Closed)
{
spdlog::error("Error sending data to sentry: {}", response->statusCode);
spdlog::error("Body: {}", ret.second);
spdlog::error("Response: {}", response->payload);
// Error 429 Too Many Requests
if (response->statusCode == 429)
{
auto retryAfter = response->headers["Retry-After"];
std::stringstream ss;
ss << retryAfter;
int seconds;
ss >> seconds;
if (!ss.eof() || ss.fail())
{
seconds = 30;
spdlog::warn("Error parsing Retry-After header. "
"Using {} for the sleep duration",
seconds);
}
spdlog::warn("Error 429 - Too Many Requests. ws will sleep "
"and retry after {} seconds",
retryAfter);
throttled = true;
auto duration = std::chrono::seconds(seconds);
std::this_thread::sleep_for(duration);
throttled = false;
}
spdlog::info("Subscriber closed");
}
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
spdlog::info("Subscriber authenticated");
conn.subscribe(channel,
filter,
position,
[&jsonWriter, verbose, &throttled, &receivedCount, &queueManager](
const Json::Value& msg, const std::string& position) {
if (verbose)
{
spdlog::info("Subscriber received message {} -> {}", position, jsonWriter.write(msg));
}
return success;
// If we cannot send to sentry fast enough, drop the message
if (throttled)
{
return;
}
++receivedCount;
queueManager.add(msg);
});
}
else if (eventType == ix::CobraConnection_EventType_Subscribed)
{
spdlog::info("Subscriber: subscribed to channel {}", subscriptionId);
}
else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{
spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId);
}
else if (eventType == ix::CobraConnection_EventType_Error)
{
spdlog::error("Subscriber: error {}", errMsg);
}
else if (eventType == ix::CobraConnection_EventType_Published)
{
spdlog::error("Published message hacked: {}", msgId);
}
else if (eventType == ix::CobraConnection_EventType_Pong)
{
spdlog::info("Received websocket pong");
}
else if (eventType == ix::CobraConnection_EventType_Handshake_Error)
{
spdlog::error("Subscriber: Handshake error: {}", errMsg);
fatalCobraError = true;
}
else if (eventType == ix::CobraConnection_EventType_Authentication_Error)
{
spdlog::error("Subscriber: Authentication error: {}", errMsg);
fatalCobraError = true;
}
else if (eventType == ix::CobraConnection_EventType_Subscription_Error)
{
spdlog::error("Subscriber: Subscription error: {}", errMsg);
fatalCobraError = true;
}
});
bool useQueue = true;
// Run forever
if (runtime == -1)
{
while (true)
{
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
return bot.run(config,
channel,
filter,
position,
verbose,
maxQueueSize,
useQueue,
enableHeartbeat,
runtime);
if (strict && errorSending) break;
if (fatalCobraError) break;
}
}
// Run for a duration, used by unittesting now
else
{
for (int i = 0 ; i < runtime; ++i)
{
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
if (strict && errorSending) break;
if (fatalCobraError) break;
}
}
//
// Cleanup.
// join all the bg threads and stop them.
//
conn.disconnect();
stop = true;
// progress thread
t1.join();
// heartbeat thread
if (t2.joinable()) t2.join();
// sentry sender thread
t3.join();
return ((strict && errorSending) || fatalCobraError) ? -1 : (int) sentCount;
}
} // namespace ix

View File

@ -8,17 +8,17 @@
#include <ixcobra/IXCobraConfig.h>
#include <ixsentry/IXSentryClient.h>
#include <string>
#include <cstdint>
namespace ix
{
int64_t cobra_to_sentry_bot(const CobraConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& position,
SentryClient& sentryClient,
bool verbose,
size_t maxQueueSize,
bool enableHeartbeat,
int runtime);
int cobra_to_sentry_bot(const CobraConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& position,
SentryClient& sentryClient,
bool verbose,
bool strict,
size_t maxQueueSize,
bool enableHeartbeat,
int runtime);
} // namespace ix

View File

@ -7,12 +7,14 @@
#include "IXCobraToStatsdBot.h"
#include "IXQueueManager.h"
#include "IXStatsdClient.h"
#include "IXCobraBot.h"
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <ixcobra/IXCobraConnection.h>
#include <spdlog/spdlog.h>
#include <sstream>
#include <thread>
#include <vector>
namespace ix
@ -54,18 +56,18 @@ namespace ix
return val;
}
int64_t cobra_to_statsd_bot(const ix::CobraConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& position,
StatsdClient& statsdClient,
const std::string& fields,
const std::string& gauge,
const std::string& timer,
bool verbose,
size_t maxQueueSize,
bool enableHeartbeat,
int runtime)
int cobra_to_statsd_bot(const ix::CobraConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& position,
StatsdClient& statsdClient,
const std::string& fields,
const std::string& gauge,
const std::string& timer,
bool verbose,
size_t maxQueueSize,
bool enableHeartbeat,
int runtime)
{
ix::CobraConnection conn;
conn.configure(config);
@ -73,85 +75,246 @@ namespace ix
auto tokens = parseFields(fields);
CobraBot bot;
bot.setOnBotMessageCallback([&statsdClient, &tokens, &gauge, &timer](const Json::Value& msg,
const std::string& /*position*/,
const bool verbose,
std::atomic<bool>& /*throttled*/,
std::atomic<bool>& fatalCobraError) -> bool {
std::string id;
for (auto&& attr : tokens)
Json::FastWriter jsonWriter;
std::atomic<uint64_t> sentCount(0);
std::atomic<uint64_t> receivedCount(0);
std::atomic<bool> stop(false);
std::atomic<bool> fatalCobraError(false);
QueueManager queueManager(maxQueueSize);
auto progress = [&sentCount, &receivedCount, &stop] {
while (!stop)
{
id += ".";
auto val = extractAttr(attr, msg);
id += val.asString();
spdlog::info("messages received {} sent {}", receivedCount, sentCount);
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
if (gauge.empty() && timer.empty())
{
statsdClient.count(id, 1);
}
else
{
std::string attrName = (!gauge.empty()) ? gauge : timer;
auto val = extractAttr(attrName, msg);
size_t x;
spdlog::info("timer thread done");
};
if (val.isInt())
std::thread t1(progress);
auto heartbeat = [&sentCount, &receivedCount, &stop, &enableHeartbeat] {
std::string state("na");
if (!enableHeartbeat) return;
while (!stop)
{
std::stringstream ss;
ss << "messages received " << receivedCount;
ss << "messages sent " << sentCount;
std::string currentState = ss.str();
if (currentState == state)
{
x = (size_t) val.asInt();
spdlog::error("no messages received or sent for 1 minute, exiting");
exit(1);
}
else if (val.isInt64())
state = currentState;
auto duration = std::chrono::minutes(1);
std::this_thread::sleep_for(duration);
}
spdlog::info("heartbeat thread done");
};
std::thread t2(heartbeat);
auto statsdSender = [&statsdClient, &queueManager, &sentCount, &tokens, &stop, &gauge, &timer, &fatalCobraError, &verbose] {
while (true)
{
Json::Value msg = queueManager.pop();
if (stop) return;
if (msg.isNull()) continue;
std::string id;
for (auto&& attr : tokens)
{
x = (size_t) val.asInt64();
id += ".";
auto val = extractAttr(attr, msg);
id += val.asString();
}
else if (val.isUInt())
if (gauge.empty() && timer.empty())
{
x = (size_t) val.asUInt();
}
else if (val.isUInt64())
{
x = (size_t) val.asUInt64();
}
else if (val.isDouble())
{
x = (size_t) val.asUInt64();
statsdClient.count(id, 1);
}
else
{
spdlog::error("Gauge {} is not a numeric type", gauge);
std::string attrName = (!gauge.empty()) ? gauge : timer;
auto val = extractAttr(attrName, msg);
size_t x;
if (val.isInt())
{
x = (size_t) val.asInt();
}
else if (val.isInt64())
{
x = (size_t) val.asInt64();
}
else if (val.isUInt())
{
x = (size_t) val.asUInt();
}
else if (val.isUInt64())
{
x = (size_t) val.asUInt64();
}
else if (val.isDouble())
{
x = (size_t) val.asUInt64();
}
else
{
spdlog::error("Gauge {} is not a numberic type", gauge);
fatalCobraError = true;
break;
}
if (verbose)
{
spdlog::info("{} - {} -> {}", id, attrName, x);
}
if (!gauge.empty())
{
statsdClient.gauge(id, x);
}
else
{
statsdClient.timing(id, x);
}
}
sentCount += 1;
}
};
std::thread t3(statsdSender);
conn.setEventCallback(
[&conn, &channel, &filter, &position, &jsonWriter, verbose, &queueManager, &receivedCount, &fatalCobraError](
ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId) {
if (eventType == ix::CobraConnection_EventType_Open)
{
spdlog::info("Subscriber connected");
for (auto it : headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
}
if (eventType == ix::CobraConnection_EventType_Closed)
{
spdlog::info("Subscriber closed");
}
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
spdlog::info("Subscriber authenticated");
conn.subscribe(channel,
filter,
position,
[&jsonWriter, &queueManager, verbose, &receivedCount](
const Json::Value& msg, const std::string& position) {
if (verbose)
{
spdlog::info("Subscriber received message {} -> {}", position, jsonWriter.write(msg));
}
receivedCount++;
++receivedCount;
queueManager.add(msg);
});
}
else if (eventType == ix::CobraConnection_EventType_Subscribed)
{
spdlog::info("Subscriber: subscribed to channel {}", subscriptionId);
}
else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{
spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId);
}
else if (eventType == ix::CobraConnection_EventType_Error)
{
spdlog::error("Subscriber: error {}", errMsg);
}
else if (eventType == ix::CobraConnection_EventType_Published)
{
spdlog::error("Published message hacked: {}", msgId);
}
else if (eventType == ix::CobraConnection_EventType_Pong)
{
spdlog::info("Received websocket pong");
}
else if (eventType == ix::CobraConnection_EventType_Handshake_Error)
{
spdlog::error("Subscriber: Handshake error: {}", errMsg);
fatalCobraError = true;
return false;
}
else if (eventType == ix::CobraConnection_EventType_Authentication_Error)
{
spdlog::error("Subscriber: Authentication error: {}", errMsg);
fatalCobraError = true;
}
else if (eventType == ix::CobraConnection_EventType_Subscription_Error)
{
spdlog::error("Subscriber: Subscription error: {}", errMsg);
fatalCobraError = true;
}
});
if (verbose)
{
spdlog::info("{} - {} -> {}", id, attrName, x);
}
// Run forever
if (runtime == -1)
{
while (true)
{
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
if (!gauge.empty())
{
statsdClient.gauge(id, x);
}
else
{
statsdClient.timing(id, x);
}
if (fatalCobraError) break;
}
}
// Run for a duration, used by unittesting now
else
{
for (int i = 0 ; i < runtime; ++i)
{
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
return true;
});
if (fatalCobraError) break;
}
}
bool useQueue = true;
//
// Cleanup.
// join all the bg threads and stop them.
//
conn.disconnect();
stop = true;
return bot.run(config,
channel,
filter,
position,
verbose,
maxQueueSize,
useQueue,
enableHeartbeat,
runtime);
// progress thread
t1.join();
// heartbeat thread
if (t2.joinable()) t2.join();
// statsd sender thread
t3.join();
return fatalCobraError ? -1 : (int) sentCount;
}
} // namespace ix

View File

@ -9,20 +9,19 @@
#include <ixbots/IXStatsdClient.h>
#include <string>
#include <stddef.h>
#include <cstdint>
namespace ix
{
int64_t cobra_to_statsd_bot(const ix::CobraConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& position,
StatsdClient& statsdClient,
const std::string& fields,
const std::string& gauge,
const std::string& timer,
bool verbose,
size_t maxQueueSize,
bool enableHeartbeat,
int runtime);
int cobra_to_statsd_bot(const ix::CobraConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& position,
StatsdClient& statsdClient,
const std::string& fields,
const std::string& gauge,
const std::string& timer,
bool verbose,
size_t maxQueueSize,
bool enableHeartbeat,
int runtime);
} // namespace ix

View File

@ -1,106 +0,0 @@
/*
* IXCobraToStdoutBot.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXCobraToStdoutBot.h"
#include "IXCobraBot.h"
#include "IXQueueManager.h"
#include <chrono>
#include <spdlog/spdlog.h>
#include <sstream>
#include <iostream>
namespace ix
{
using StreamWriterPtr = std::unique_ptr<Json::StreamWriter>;
StreamWriterPtr makeStreamWriter()
{
Json::StreamWriterBuilder builder;
builder["commentStyle"] = "None";
builder["indentation"] = ""; // will make the JSON object compact
std::unique_ptr<Json::StreamWriter> jsonWriter(builder.newStreamWriter());
return jsonWriter;
}
std::string timeSinceEpoch()
{
std::chrono::system_clock::time_point tp = std::chrono::system_clock::now();
std::chrono::system_clock::duration dtn = tp.time_since_epoch();
std::stringstream ss;
ss << dtn.count() * std::chrono::system_clock::period::num /
std::chrono::system_clock::period::den;
return ss.str();
}
void writeToStdout(bool fluentd,
const StreamWriterPtr& jsonWriter,
const Json::Value& msg,
const std::string& position)
{
Json::Value enveloppe;
if (fluentd)
{
enveloppe["producer"] = "cobra";
enveloppe["consumer"] = "fluentd";
Json::Value nestedMessage(msg);
nestedMessage["position"] = position;
nestedMessage["created_at"] = timeSinceEpoch();
enveloppe["message"] = nestedMessage;
jsonWriter->write(enveloppe, &std::cout);
std::cout << std::endl; // add lf and flush
}
else
{
enveloppe = msg;
std::cout << position << " ";
jsonWriter->write(enveloppe, &std::cout);
std::cout << std::endl;
}
}
int64_t cobra_to_stdout_bot(const CobraConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& position,
bool fluentd,
bool quiet,
bool verbose,
size_t maxQueueSize,
bool enableHeartbeat,
int runtime)
{
CobraBot bot;
auto jsonWriter = makeStreamWriter();
bot.setOnBotMessageCallback([&fluentd, &quiet, &jsonWriter](const Json::Value& msg,
const std::string& position,
const bool /*verbose*/,
std::atomic<bool>& /*throttled*/,
std::atomic<bool>& /*fatalCobraError*/) -> bool {
if (!quiet)
{
writeToStdout(fluentd, jsonWriter, msg, position);
}
return true;
});
bool useQueue = false;
return bot.run(config,
channel,
filter,
position,
verbose,
maxQueueSize,
useQueue,
enableHeartbeat,
runtime);
}
} // namespace ix

View File

@ -1,25 +0,0 @@
/*
* IXCobraToStdoutBot.h
* Author: Benjamin Sergeant
* Copyright (c) 2019-2020 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <ixcobra/IXCobraConfig.h>
#include <string>
#include <stddef.h>
#include <cstdint>
namespace ix
{
int64_t cobra_to_stdout_bot(const ix::CobraConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& position,
bool fluentd,
bool quiet,
bool verbose,
size_t maxQueueSize,
bool enableHeartbeat,
int runtime);
} // namespace ix

View File

@ -10,14 +10,14 @@
namespace ix
{
std::pair<Json::Value, std::string> QueueManager::pop()
Json::Value QueueManager::pop()
{
std::unique_lock<std::mutex> lock(_mutex);
if (_queues.empty())
{
Json::Value val;
return std::make_pair(val, std::string());
return val;
}
std::vector<std::string> games;
@ -35,7 +35,7 @@ namespace ix
if (_queues[game].empty())
{
Json::Value val;
return std::make_pair(val, std::string());
return val;
}
auto msg = _queues[game].front();
@ -43,7 +43,7 @@ namespace ix
return msg;
}
void QueueManager::add(const Json::Value& msg, const std::string& position)
void QueueManager::add(Json::Value msg)
{
std::unique_lock<std::mutex> lock(_mutex);
@ -59,7 +59,7 @@ namespace ix
// in queuing too many events.
if (_queues[game].size() < _maxQueueSize)
{
_queues[game].push(std::make_pair(msg, position));
_queues[game].push(msg);
_condition.notify_one();
}
}

View File

@ -23,11 +23,11 @@ namespace ix
{
}
std::pair<Json::Value, std::string> pop();
void add(const Json::Value& msg, const std::string& position);
Json::Value pop();
void add(Json::Value msg);
private:
std::map<std::string, std::queue<std::pair<Json::Value, std::string>>> _queues;
std::map<std::string, std::queue<Json::Value>> _queues;
std::mutex _mutex;
std::condition_variable _condition;
size_t _maxQueueSize;

View File

@ -14,7 +14,6 @@ set (IXCOBRA_HEADERS
ixcobra/IXCobraMetricsThreadedPublisher.h
ixcobra/IXCobraMetricsPublisher.h
ixcobra/IXCobraConfig.h
ixcobra/IXCobraEventType.h
)
add_library(ixcobra STATIC

View File

@ -87,7 +87,7 @@ namespace ix
_eventCallback = eventCallback;
}
void CobraConnection::invokeEventCallback(ix::CobraEventType eventType,
void CobraConnection::invokeEventCallback(ix::CobraConnectionEventType eventType,
const std::string& errorMsg,
const WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
@ -96,12 +96,7 @@ namespace ix
std::lock_guard<std::mutex> lock(_eventCallbackMutex);
if (_eventCallback)
{
_eventCallback(
std::make_unique<CobraEvent>(eventType,
errorMsg,
headers,
subscriptionId,
msgId));
_eventCallback(eventType, errorMsg, headers, subscriptionId, msgId);
}
}
@ -110,7 +105,7 @@ namespace ix
{
std::stringstream ss;
ss << errorMsg << " : received pdu => " << serializedPdu;
invokeEventCallback(ix::CobraEventType::Error, ss.str());
invokeEventCallback(ix::CobraConnection_EventType_Error, ss.str());
}
void CobraConnection::disconnect()
@ -129,7 +124,7 @@ namespace ix
std::stringstream ss;
if (msg->type == ix::WebSocketMessageType::Open)
{
invokeEventCallback(ix::CobraEventType::Open,
invokeEventCallback(ix::CobraConnection_EventType_Open,
std::string(),
msg->openInfo.headers);
sendHandshakeMessage();
@ -141,7 +136,7 @@ namespace ix
std::stringstream ss;
ss << "Close code " << msg->closeInfo.code;
ss << " reason " << msg->closeInfo.reason;
invokeEventCallback(ix::CobraEventType::Closed,
invokeEventCallback(ix::CobraConnection_EventType_Closed,
ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Message)
@ -171,18 +166,18 @@ namespace ix
}
else if (action == "auth/handshake/error")
{
invokeEventCallback(ix::CobraEventType::HandshakeError,
invokeEventCallback(ix::CobraConnection_EventType_Handshake_Error,
msg->str);
}
else if (action == "auth/authenticate/ok")
{
_authenticated = true;
invokeEventCallback(ix::CobraEventType::Authenticated);
invokeEventCallback(ix::CobraConnection_EventType_Authenticated);
flushQueue();
}
else if (action == "auth/authenticate/error")
{
invokeEventCallback(ix::CobraEventType::AuthenticationError,
invokeEventCallback(ix::CobraConnection_EventType_Authentication_Error,
msg->str);
}
else if (action == "rtm/subscription/data")
@ -198,7 +193,7 @@ namespace ix
}
else if (action == "rtm/subscribe/error")
{
invokeEventCallback(ix::CobraEventType::SubscriptionError,
invokeEventCallback(ix::CobraConnection_EventType_Subscription_Error,
msg->str);
}
else if (action == "rtm/unsubscribe/ok")
@ -239,7 +234,7 @@ namespace ix
}
else if (msg->type == ix::WebSocketMessageType::Pong)
{
invokeEventCallback(ix::CobraEventType::Pong, msg->str);
invokeEventCallback(ix::CobraConnection_EventType_Pong);
}
});
}
@ -401,7 +396,7 @@ namespace ix
if (!subscriptionId.isString()) return false;
invokeEventCallback(ix::CobraEventType::Subscribed,
invokeEventCallback(ix::CobraConnection_EventType_Subscribed,
std::string(), WebSocketHttpHeaders(),
subscriptionId.asString());
return true;
@ -419,7 +414,7 @@ namespace ix
if (!subscriptionId.isString()) return false;
invokeEventCallback(ix::CobraEventType::UnSubscribed,
invokeEventCallback(ix::CobraConnection_EventType_UnSubscribed,
std::string(), WebSocketHttpHeaders(),
subscriptionId.asString());
return true;
@ -467,7 +462,7 @@ namespace ix
uint64_t msgId = id.asUInt64();
invokeEventCallback(ix::CobraEventType::Published,
invokeEventCallback(ix::CobraConnection_EventType_Published,
std::string(), WebSocketHttpHeaders(),
std::string(), msgId);

View File

@ -8,8 +8,6 @@
#include <ixwebsocket/IXWebSocketHttpHeaders.h>
#include <ixwebsocket/IXWebSocketPerMessageDeflateOptions.h>
#include "IXCobraEventType.h"
#include "IXCobraEvent.h"
#include <json/json.h>
#include <memory>
#include <mutex>
@ -30,6 +28,21 @@ namespace ix
class WebSocket;
struct SocketTLSOptions;
enum CobraConnectionEventType
{
CobraConnection_EventType_Authenticated = 0,
CobraConnection_EventType_Error = 1,
CobraConnection_EventType_Open = 2,
CobraConnection_EventType_Closed = 3,
CobraConnection_EventType_Subscribed = 4,
CobraConnection_EventType_UnSubscribed = 5,
CobraConnection_EventType_Published = 6,
CobraConnection_EventType_Pong = 7,
CobraConnection_EventType_Handshake_Error = 8,
CobraConnection_EventType_Authentication_Error = 9,
CobraConnection_EventType_Subscription_Error = 10
};
enum CobraConnectionPublishMode
{
CobraConnection_PublishMode_Immediate = 0,
@ -37,7 +50,11 @@ namespace ix
};
using SubscriptionCallback = std::function<void(const Json::Value&, const std::string&)>;
using EventCallback = std::function<void(const CobraEventPtr&)>;
using EventCallback = std::function<void(CobraConnectionEventType,
const std::string&,
const WebSocketHttpHeaders&,
const std::string&,
uint64_t msgId)>;
using TrafficTrackerCallback = std::function<void(size_t size, bool incoming)>;
using PublishTrackerCallback = std::function<void(bool sent, bool acked)>;
@ -154,7 +171,7 @@ namespace ix
static void invokePublishTrackerCallback(bool sent, bool acked);
/// Invoke event callbacks
void invokeEventCallback(CobraEventType eventType,
void invokeEventCallback(CobraConnectionEventType eventType,
const std::string& errorMsg = std::string(),
const WebSocketHttpHeaders& headers = WebSocketHttpHeaders(),
const std::string& subscriptionId = std::string(),

View File

@ -1,41 +0,0 @@
/*
* IXCobraEvent.h
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include "IXCobraEventType.h"
#include <ixwebsocket/IXWebSocketHttpHeaders.h>
#include <memory>
#include <string>
#include <cstdint>
namespace ix
{
struct CobraEvent
{
ix::CobraEventType type;
const std::string& errMsg;
const ix::WebSocketHttpHeaders& headers;
const std::string& subscriptionId;
uint64_t msgId; // CobraConnection::MsgId
CobraEvent(ix::CobraEventType t,
const std::string& e,
const ix::WebSocketHttpHeaders& h,
const std::string& s,
uint64_t m)
: type(t)
, errMsg(e)
, headers(h)
, subscriptionId(s)
, msgId(m)
{
;
}
};
using CobraEventPtr = std::unique_ptr<CobraEvent>;
}

View File

@ -1,25 +0,0 @@
/*
* IXCobraEventType.h
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#pragma once
namespace ix
{
enum class CobraEventType
{
Authenticated = 0,
Error = 1,
Open = 2,
Closed = 3,
Subscribed = 4,
UnSubscribed = 5,
Published = 6,
Pong = 7,
HandshakeError = 8,
AuthenticationError = 9,
SubscriptionError = 10
};
}

View File

@ -22,59 +22,53 @@ namespace ix
CobraMetricsThreadedPublisher::CobraMetricsThreadedPublisher() :
_stop(false)
{
_cobra_connection.setEventCallback([](const CobraEventPtr& event)
_cobra_connection.setEventCallback(
[]
(ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId)
{
std::stringstream ss;
if (event->type == ix::CobraEventType::Open)
if (eventType == ix::CobraConnection_EventType_Open)
{
ss << "Handshake headers" << std::endl;
for (auto&& it : event->headers)
for (auto it : headers)
{
ss << it.first << ": " << it.second << std::endl;
}
}
else if (event->type == ix::CobraEventType::Authenticated)
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
ss << "Authenticated";
}
else if (event->type == ix::CobraEventType::Error)
else if (eventType == ix::CobraConnection_EventType_Error)
{
ss << "Error: " << event->errMsg;
ss << "Error: " << errMsg;
}
else if (event->type == ix::CobraEventType::Closed)
else if (eventType == ix::CobraConnection_EventType_Closed)
{
ss << "Connection closed: " << event->errMsg;
ss << "Connection closed: " << errMsg;
}
else if (event->type == ix::CobraEventType::Subscribed)
else if (eventType == ix::CobraConnection_EventType_Subscribed)
{
ss << "Subscribed through subscription id: " << event->subscriptionId;
ss << "Subscribed through subscription id: " << subscriptionId;
}
else if (event->type == ix::CobraEventType::UnSubscribed)
else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{
ss << "Unsubscribed through subscription id: " << event->subscriptionId;
ss << "Unsubscribed through subscription id: " << subscriptionId;
}
else if (event->type == ix::CobraEventType::Published)
else if (eventType == ix::CobraConnection_EventType_Published)
{
ss << "Published message " << event->msgId << " acked";
ss << "Published message " << msgId << " acked";
}
else if (event->type == ix::CobraEventType::Pong)
else if (eventType == ix::CobraConnection_EventType_Pong)
{
ss << "Received websocket pong";
}
else if (event->type == ix::CobraEventType::HandshakeError)
{
ss << "Handshake error: " << event->errMsg;
}
else if (event->type == ix::CobraEventType::AuthenticationError)
{
ss << "Authentication error: " << event->errMsg;
}
else if (event->type == ix::CobraEventType::SubscriptionError)
{
ss << "Subscription error: " << event->errMsg;
}
ix::IXCoreLogger::Log(ss.str().c_str());
});

View File

@ -64,8 +64,7 @@ namespace ix
std::string SentryClient::computeAuthHeader()
{
std::string securityHeader("Sentry sentry_version=5");
securityHeader += ",sentry_client=ws/";
securityHeader += std::string(IX_WEBSOCKET_VERSION);
securityHeader += ",sentry_client=ws/1.0.0";
securityHeader += ",sentry_timestamp=" + std::to_string(SentryClient::getTimestamp());
securityHeader += ",sentry_key=" + _publicKey;
securityHeader += ",sentry_secret=" + _secretKey;
@ -287,24 +286,4 @@ namespace ix
_httpClient->performRequest(args, onResponseCallback);
}
void SentryClient::uploadPayload(
const Json::Value& payload,
bool verbose,
const OnResponseCallback& onResponseCallback)
{
auto args = _httpClient->createRequest();
args->extraHeaders["X-Sentry-Auth"] = SentryClient::computeAuthHeader();
args->verb = HttpClient::kPost;
args->connectTimeout = 60;
args->transferTimeout = 5 * 60;
args->followRedirects = true;
args->verbose = verbose;
args->logger = [](const std::string& msg) { ix::IXCoreLogger::Log(msg.c_str()); };
args->url = _url;
args->body = _jsonWriter.write(payload);
_httpClient->performRequest(args, onResponseCallback);
}
} // namespace ix

View File

@ -36,11 +36,6 @@ namespace ix
bool verbose,
const OnResponseCallback& onResponseCallback);
void uploadPayload(
const Json::Value& payload,
bool verbose,
const OnResponseCallback& onResponseCallback);
private:
int64_t getTimestamp();
std::string computeAuthHeader();

View File

@ -251,22 +251,7 @@ namespace snake
const AppConfig& appConfig,
const std::string& str)
{
nlohmann::json pdu;
try
{
pdu = nlohmann::json::parse(str);
}
catch (const nlohmann::json::parse_error& e)
{
std::stringstream ss;
ss << "malformed json pdu: " << e.what() << " -> " << str << "";
nlohmann::json response = {{"body", {{"error", "invalid_json"},
{"reason", ss.str()}}}};
ws->sendText(response.dump());
return;
}
auto pdu = nlohmann::json::parse(str);
auto action = pdu["action"];
if (action == "auth/handshake")

View File

@ -376,8 +376,7 @@ namespace ix
{
if (isCancellationRequested && isCancellationRequested())
{
const std::string errorMsg("Cancellation Requested");
return std::make_pair(false, errorMsg);
return std::make_pair(false, std::string());
}
size_t size = std::min(kChunkSize, length - output.size());
@ -389,8 +388,7 @@ namespace ix
}
else if (ret <= 0 && !Socket::isWaitNeeded())
{
const std::string errorMsg("Recv Error");
return std::make_pair(false, errorMsg);
return std::make_pair(false, std::string());
}
if (onProgressCallback) onProgressCallback((int) output.size(), (int) length);
@ -399,8 +397,7 @@ namespace ix
// This way we are not busy looping
if (isReadyToRead(1) == PollResultType::Error)
{
const std::string errorMsg("Poll Error");
return std::make_pair(false, errorMsg);
return std::make_pair(false, std::string());
}
}

View File

@ -30,16 +30,16 @@ namespace ix
, _handshakeTimeoutSecs(kDefaultHandShakeTimeoutSecs)
, _enablePong(kDefaultEnablePong)
, _pingIntervalSecs(kDefaultPingIntervalSecs)
, _webSocketMessage(std::make_unique<WebSocketMessage>(WebSocketMessageType::Message))
, _webSocketErrorMessage(std::make_unique<WebSocketMessage>(WebSocketMessageType::Error))
, _webSocketOpenMessage(std::make_unique<WebSocketMessage>(WebSocketMessageType::Open))
, _webSocketCloseMessage(std::make_unique<WebSocketMessage>(WebSocketMessageType::Close))
{
_ws.setOnCloseCallback(
[this](uint16_t code, const std::string& reason, size_t wireSize, bool remote) {
_onMessageCallback(
std::make_unique<WebSocketMessage>(WebSocketMessageType::Close,
"",
wireSize,
WebSocketErrorInfo(),
WebSocketOpenInfo(),
WebSocketCloseInfo(code, reason, remote)));
_webSocketCloseMessage->wireSize = wireSize;
_webSocketCloseMessage->closeInfo = WebSocketCloseInfo(code, reason, remote);
_onMessageCallback(_webSocketCloseMessage);
});
}
@ -193,13 +193,8 @@ namespace ix
return status;
}
_onMessageCallback(std::make_unique<WebSocketMessage>(
WebSocketMessageType::Open,
"",
0,
WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers, status.protocol),
WebSocketCloseInfo()));
_webSocketOpenMessage->openInfo = WebSocketOpenInfo(status.uri, status.headers, status.protocol),
_onMessageCallback(_webSocketOpenMessage);
if (_pingIntervalSecs > 0)
{
@ -224,13 +219,8 @@ namespace ix
return status;
}
_onMessageCallback(
std::make_unique<WebSocketMessage>(WebSocketMessageType::Open,
"",
0,
WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers),
WebSocketCloseInfo()));
_webSocketOpenMessage->openInfo = WebSocketOpenInfo(status.uri, status.headers);
_onMessageCallback(_webSocketOpenMessage);
if (_pingIntervalSecs > 0)
{
@ -310,12 +300,8 @@ namespace ix
connectErr.reason = status.errorStr;
connectErr.http_status = status.http_status;
_onMessageCallback(std::make_unique<WebSocketMessage>(WebSocketMessageType::Error,
"",
0,
connectErr,
WebSocketOpenInfo(),
WebSocketCloseInfo()));
_webSocketErrorMessage->errorInfo = connectErr;
_onMessageCallback(_webSocketErrorMessage);
}
}
}
@ -381,18 +367,15 @@ namespace ix
break;
}
WebSocketErrorInfo webSocketErrorInfo;
webSocketErrorInfo.decompressionError = decompressionError;
bool binary = messageKind == WebSocketTransport::MessageKind::MSG_BINARY;
_onMessageCallback(std::make_unique<WebSocketMessage>(webSocketMessageType,
msg,
wireSize,
webSocketErrorInfo,
WebSocketOpenInfo(),
WebSocketCloseInfo(),
binary));
_webSocketMessage->type = webSocketMessageType;
_webSocketMessage->str = msg;
_webSocketMessage->wireSize = wireSize;
_webSocketMessage->errorInfo.decompressionError = decompressionError;
_webSocketMessage->binary = binary;
_onMessageCallback(_webSocketMessage);
WebSocket::invokeTrafficTrackerCallback(wireSize, true);
});

View File

@ -155,6 +155,12 @@ namespace ix
static const int kDefaultPingIntervalSecs;
static const int kDefaultPingTimeoutSecs;
// One message ptr for each message kinds
WebSocketMessagePtr _webSocketMessage;
WebSocketMessagePtr _webSocketErrorMessage;
WebSocketMessagePtr _webSocketOpenMessage;
WebSocketMessagePtr _webSocketCloseMessage;
// Subprotocols
std::vector<std::string> _subProtocols;

View File

@ -18,5 +18,10 @@ namespace ix
int http_status = 0;
std::string reason;
bool decompressionError = false;
WebSocketErrorInfo()
{
;
}
};
} // namespace ix

View File

@ -0,0 +1,12 @@
/*
* IXWebSocketMessage.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#include "IXWebSocketMessage.h"
namespace ix
{
std::string WebSocketMessage::kStr("foo");
}

View File

@ -19,22 +19,23 @@ namespace ix
struct WebSocketMessage
{
WebSocketMessageType type;
const std::string& str;
std::string& str;
size_t wireSize;
WebSocketErrorInfo errorInfo;
WebSocketOpenInfo openInfo;
WebSocketCloseInfo closeInfo;
bool binary;
static std::string kStr;
WebSocketMessage(WebSocketMessageType t,
const std::string& s,
size_t w,
WebSocketErrorInfo e,
WebSocketOpenInfo o,
WebSocketCloseInfo c,
size_t w = 0,
WebSocketErrorInfo e = WebSocketErrorInfo(),
WebSocketOpenInfo o = WebSocketOpenInfo(),
WebSocketCloseInfo c = WebSocketCloseInfo(),
bool b = false)
: type(t)
, str(s)
, str(WebSocketMessage::kStr)
, wireSize(w)
, errorInfo(e)
, openInfo(o)
@ -43,6 +44,11 @@ namespace ix
{
;
}
// void setStr(const std::string& s)
// {
// str = std::move(s);
// }
};
using WebSocketMessagePtr = std::unique_ptr<WebSocketMessage>;

View File

@ -6,9 +6,9 @@
#pragma once
#include "IXWebSocketHttpHeaders.h"
#include <cstdint>
#include <string>
#include "IXWebSocketHttpHeaders.h"
namespace ix
{

View File

@ -74,7 +74,6 @@ namespace ix
, _enablePong(kDefaultEnablePong)
, _pingIntervalSecs(kDefaultPingIntervalSecs)
, _pongReceived(false)
, _pingCount(0)
, _lastSendPingTimePoint(std::chrono::steady_clock::now())
{
_readbuf.resize(kChunkSize);
@ -222,8 +221,7 @@ namespace ix
{
_pongReceived = false;
std::stringstream ss;
ss << kPingMessage << "::" << _pingIntervalSecs << "s"
<< "::" << _pingCount++;
ss << kPingMessage << "::" << _pingIntervalSecs << "s";
return sendPing(ss.str());
}

View File

@ -212,7 +212,6 @@ namespace ix
static const int kDefaultPingIntervalSecs;
static const std::string kPingMessage;
std::atomic<uint64_t> _pingCount;
// We record when ping are being sent so that we can know when to send the next one
mutable std::mutex _lastSendPingTimePointMutex;

View File

@ -6,4 +6,4 @@
#pragma once
#define IX_WEBSOCKET_VERSION "9.3.3"
#define IX_WEBSOCKET_VERSION "9.2.6"

View File

@ -116,12 +116,10 @@ test_ubsan:
(cd build/test ; ln -sf Debug/ixwebsocket_unittest)
(cd test ; python2.7 run.py -r)
test_asan: build_test_asan
(cd test ; python2.7 run.py -r)
build_test_asan:
test_asan:
mkdir -p build && (cd build && cmake -GXcode -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_TEST=1 .. && xcodebuild -project ixwebsocket.xcodeproj -target ixwebsocket_unittest -enableAddressSanitizer YES)
(cd build/test ; ln -sf Debug/ixwebsocket_unittest)
(cd test ; python2.7 run.py -r)
test_tsan_openssl:
mkdir -p build && (cd build && cmake -GXcode -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_TEST=1 -DUSE_OPEN_SSL=1 .. && xcodebuild -project ixwebsocket.xcodeproj -target ixwebsocket_unittest -enableThreadSanitizer YES)

View File

@ -66,7 +66,6 @@ if (UNIX)
IXCobraMetricsPublisherTest.cpp
IXCobraToSentryBotTest.cpp
IXCobraToStatsdBotTest.cpp
IXCobraToStdoutBotTest.cpp
)
endif()

View File

@ -180,40 +180,44 @@ namespace
_conn.configure(_cobraConfig);
_conn.connect();
_conn.setEventCallback([this, channel](const CobraEventPtr& event) {
if (event->type == ix::CobraEventType::Open)
_conn.setEventCallback([this, channel](ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId) {
if (eventType == ix::CobraConnection_EventType_Open)
{
log("Subscriber connected: " + _user);
for (auto&& it : event->headers)
for (auto&& it : headers)
{
log("Headers " + it.first + " " + it.second);
}
}
else if (event->type == ix::CobraEventType::Authenticated)
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
log("Subscriber authenticated: " + _user);
subscribe(channel);
}
else if (event->type == ix::CobraEventType::Error)
else if (eventType == ix::CobraConnection_EventType_Error)
{
log(event->errMsg + _user);
log(errMsg + _user);
}
else if (event->type == ix::CobraEventType::Closed)
else if (eventType == ix::CobraConnection_EventType_Closed)
{
log("Connection closed: " + _user);
}
else if (event->type == ix::CobraEventType::Subscribed)
else if (eventType == ix::CobraConnection_EventType_Subscribed)
{
log("Subscription ok: " + _user + " subscription_id " + event->subscriptionId);
log("Subscription ok: " + _user + " subscription_id " + subscriptionId);
_connectedAndSubscribed = true;
}
else if (event->type == ix::CobraEventType::UnSubscribed)
else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{
log("Unsubscription ok: " + _user + " subscription_id " + event->subscriptionId);
log("Unsubscription ok: " + _user + " subscription_id " + subscriptionId);
}
else if (event->type == ix::CobraEventType::Published)
else if (eventType == ix::CobraConnection_EventType_Published)
{
TLogger() << "Subscriber: published message acked: " << event->msgId;
TLogger() << "Subscriber: published message acked: " << msgId;
}
});
@ -244,7 +248,11 @@ namespace
ix::msleep(50);
_conn.disconnect();
_conn.setEventCallback([](const CobraEventPtr& /*event*/) {});
_conn.setEventCallback([](ix::CobraConnectionEventType /*eventType*/,
const std::string& /*errMsg*/,
const ix::WebSocketHttpHeaders& /*headers*/,
const std::string& /*subscriptionId*/,
CobraConnection::MsgId /*msgId*/) { ; });
}
} // namespace

View File

@ -54,24 +54,24 @@ namespace
conn.configure(config);
conn.connect();
conn.setEventCallback([&conn, &channel](const CobraEventPtr& event) {
if (event->type == ix::CobraEventType::Open)
conn.setEventCallback([&conn, &channel](ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId) {
if (eventType == ix::CobraConnection_EventType_Open)
{
TLogger() << "Subscriber connected:";
for (auto&& it : event->headers)
for (auto&& it : headers)
{
log("Headers " + it.first + " " + it.second);
}
}
else if (event->type == ix::CobraEventType::Closed)
if (eventType == ix::CobraConnection_EventType_Error)
{
TLogger() << "Subscriber closed:" << event->errMsg;
TLogger() << "Subscriber error:" << errMsg;
}
else if (event->type == ix::CobraEventType::Error)
{
TLogger() << "Subscriber error:" << event->errMsg;
}
else if (event->type == ix::CobraEventType::Authenticated)
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
log("Subscriber authenticated");
std::string filter;
@ -92,29 +92,29 @@ namespace
gMessageCount++;
});
}
else if (event->type == ix::CobraEventType::Subscribed)
else if (eventType == ix::CobraConnection_EventType_Subscribed)
{
TLogger() << "Subscriber: subscribed to channel " << event->subscriptionId;
if (event->subscriptionId == channel)
TLogger() << "Subscriber: subscribed to channel " << subscriptionId;
if (subscriptionId == channel)
{
gSubscriberConnectedAndSubscribed = true;
}
else
{
TLogger() << "Subscriber: unexpected channel " << event->subscriptionId;
TLogger() << "Subscriber: unexpected channel " << subscriptionId;
}
}
else if (event->type == ix::CobraEventType::UnSubscribed)
else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{
TLogger() << "Subscriber: ununexpected from channel " << event->subscriptionId;
if (event->subscriptionId != channel)
TLogger() << "Subscriber: ununexpected from channel " << subscriptionId;
if (subscriptionId != channel)
{
TLogger() << "Subscriber: unexpected channel " << event->subscriptionId;
TLogger() << "Subscriber: unexpected channel " << subscriptionId;
}
}
else if (event->type == ix::CobraEventType::Published)
else if (eventType == ix::CobraConnection_EventType_Published)
{
TLogger() << "Subscriber: published message acked: " << event->msgId;
TLogger() << "Subscriber: published message acked: " << msgId;
}
});

View File

@ -141,6 +141,7 @@ TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]")
std::string filter;
std::string position("$");
bool verbose = true;
bool strict = true;
size_t maxQueueSize = 10;
bool enableHeartbeat = false;
@ -160,15 +161,16 @@ TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]")
// Only run the bot for 3 seconds
int runtime = 3;
int64_t sentCount = cobra_to_sentry_bot(config,
channel,
filter,
position,
sentryClient,
verbose,
maxQueueSize,
enableHeartbeat,
runtime);
int sentCount = cobra_to_sentry_bot(config,
channel,
filter,
position,
sentryClient,
verbose,
strict,
maxQueueSize,
enableHeartbeat,
runtime);
//
// We want at least 2 messages to be sent
//

View File

@ -114,18 +114,18 @@ TEST_CASE("Cobra_to_statsd_bot", "[cobra_bots]")
std::string gauge;
std::string timer;
int64_t sentCount = ix::cobra_to_statsd_bot(config,
channel,
filter,
position,
statsdClient,
fields,
gauge,
timer,
verbose,
maxQueueSize,
enableHeartbeat,
runtime);
int sentCount = ix::cobra_to_statsd_bot(config,
channel,
filter,
position,
statsdClient,
fields,
gauge,
timer,
verbose,
maxQueueSize,
enableHeartbeat,
runtime);
//
// We want at least 2 messages to be sent
//

View File

@ -1,127 +0,0 @@
/*
* IXCobraToStdoutTest.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone. All rights reserved.
*/
#include "IXTest.h"
#include "catch.hpp"
#include <chrono>
#include <iostream>
#include <ixbots/IXCobraToStdoutBot.h>
#include <ixcobra/IXCobraConnection.h>
#include <ixcobra/IXCobraMetricsPublisher.h>
#include <ixcrypto/IXUuid.h>
#include <ixsentry/IXSentryClient.h>
#include <ixsnake/IXRedisServer.h>
#include <ixsnake/IXSnakeServer.h>
#include <ixwebsocket/IXHttpServer.h>
#include <ixwebsocket/IXUserAgent.h>
using namespace ix;
namespace
{
void runPublisher(const ix::CobraConfig& config, const std::string& channel)
{
ix::CobraMetricsPublisher cobraMetricsPublisher;
cobraMetricsPublisher.configure(config, channel);
cobraMetricsPublisher.setSession(uuid4());
cobraMetricsPublisher.enable(true);
Json::Value msg;
msg["fps"] = 60;
cobraMetricsPublisher.setGenericAttributes("game", "ody");
// Wait a bit
ix::msleep(500);
// publish some messages
cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #1)
cobraMetricsPublisher.push("sms_metric_B_id", msg); // (msg #2)
ix::msleep(500);
cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #3)
cobraMetricsPublisher.push("sms_metric_D_id", msg); // (msg #4)
ix::msleep(500);
cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #4)
cobraMetricsPublisher.push("sms_metric_F_id", msg); // (msg #5)
ix::msleep(500);
}
} // namespace
TEST_CASE("Cobra_to_stdout_bot", "[cobra_bots]")
{
SECTION("Exchange and count sent/received messages.")
{
int port = getFreePort();
snake::AppConfig appConfig = makeSnakeServerConfig(port, true);
// Start a redis server
ix::RedisServer redisServer(appConfig.redisPort);
auto res = redisServer.listen();
REQUIRE(res.first);
redisServer.start();
// Start a snake server
snake::SnakeServer snakeServer(appConfig);
snakeServer.run();
// Run the bot for a small amount of time
std::string channel = ix::generateSessionId();
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
std::string role = "_sub";
std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
std::string endpoint = makeCobraEndpoint(port, true);
ix::CobraConfig config;
config.endpoint = endpoint;
config.appkey = appkey;
config.rolename = role;
config.rolesecret = secret;
config.socketTLSOptions = makeClientTLSOptions();
std::thread publisherThread(runPublisher, config, channel);
std::string filter;
std::string position("$");
bool verbose = true;
bool quiet = false;
size_t maxQueueSize = 10;
bool enableHeartbeat = false;
// Only run the bot for 3 seconds
int runtime = 3;
// We could try to capture the output ... not sure how.
bool fluentd = true;
int64_t sentCount = ix::cobra_to_stdout_bot(config,
channel,
filter,
position,
fluentd,
quiet,
verbose,
maxQueueSize,
enableHeartbeat,
runtime);
//
// We want at least 2 messages to be sent
//
REQUIRE(sentCount >= 2);
// Give us 1s for all messages to be received
ix::msleep(1000);
spdlog::info("Stopping snake server...");
snakeServer.stop();
spdlog::info("Stopping redis server...");
redisServer.stop();
publisherThread.join();
}
}

View File

@ -49,6 +49,7 @@ add_executable(ws
ws_redis_subscribe.cpp
ws_redis_server.cpp
ws_snake.cpp
ws_cobra_subscribe.cpp
ws_cobra_metrics_publish.cpp
ws_cobra_publish.cpp
ws_cobra_metrics_to_redis.cpp

105
ws/ws.cpp
View File

@ -13,13 +13,11 @@
#include <fstream>
#include <ixbots/IXCobraToSentryBot.h>
#include <ixbots/IXCobraToStatsdBot.h>
#include <ixbots/IXCobraToStdoutBot.h>
#include <ixcore/utils/IXCoreLogger.h>
#include <ixsentry/IXSentryClient.h>
#include <ixwebsocket/IXNetSystem.h>
#include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXUserAgent.h>
#include <spdlog/sinks/basic_file_sink.h>
#include <spdlog/spdlog.h>
#include <sstream>
#include <string>
@ -84,7 +82,6 @@ int main(int argc, char** argv)
std::string metadata;
std::string project;
std::string key;
std::string logfile;
ix::SocketTLSOptions tlsOptions;
ix::CobraConfig cobraConfig;
std::string ciphers;
@ -96,6 +93,7 @@ int main(int argc, char** argv)
bool quiet = false;
bool fluentd = false;
bool compress = false;
bool strict = false;
bool stress = false;
bool disableAutomaticReconnection = false;
bool disablePerMessageDeflate = false;
@ -142,10 +140,8 @@ int main(int argc, char** argv)
};
app.add_flag("--version", version, "Print ws version");
app.add_option("--logfile", logfile, "path where all logs will be redirected");
CLI::App* sendApp = app.add_subcommand("send", "Send a file");
sendApp->fallthrough();
sendApp->add_option("url", url, "Connection url")->required();
sendApp->add_option("path", path, "Path to the file to send")
->required()
@ -155,7 +151,6 @@ int main(int argc, char** argv)
addTLSOptions(sendApp);
CLI::App* receiveApp = app.add_subcommand("receive", "Receive a file");
receiveApp->fallthrough();
receiveApp->add_option("url", url, "Connection url")->required();
receiveApp->add_option("--delay",
delayMs,
@ -165,14 +160,12 @@ int main(int argc, char** argv)
addTLSOptions(receiveApp);
CLI::App* transferApp = app.add_subcommand("transfer", "Broadcasting server");
transferApp->fallthrough();
transferApp->add_option("--port", port, "Connection url");
transferApp->add_option("--host", hostname, "Hostname");
transferApp->add_option("--pidfile", pidfile, "Pid file");
addTLSOptions(transferApp);
CLI::App* connectApp = app.add_subcommand("connect", "Connect to a remote server");
connectApp->fallthrough();
connectApp->add_option("url", url, "Connection url")->required();
connectApp->add_option("-H", headers, "Header")->join();
connectApp->add_flag("-d", disableAutomaticReconnection, "Disable Automatic Reconnection");
@ -186,12 +179,10 @@ int main(int argc, char** argv)
addTLSOptions(connectApp);
CLI::App* chatApp = app.add_subcommand("chat", "Group chat");
chatApp->fallthrough();
chatApp->add_option("url", url, "Connection url")->required();
chatApp->add_option("user", user, "User name")->required();
CLI::App* echoServerApp = app.add_subcommand("echo_server", "Echo server");
echoServerApp->fallthrough();
echoServerApp->add_option("--port", port, "Port");
echoServerApp->add_option("--host", hostname, "Hostname");
echoServerApp->add_flag("-g", greetings, "Verbose");
@ -201,18 +192,15 @@ int main(int argc, char** argv)
addTLSOptions(echoServerApp);
CLI::App* broadcastServerApp = app.add_subcommand("broadcast_server", "Broadcasting server");
broadcastServerApp->fallthrough();
broadcastServerApp->add_option("--port", port, "Port");
broadcastServerApp->add_option("--host", hostname, "Hostname");
addTLSOptions(broadcastServerApp);
CLI::App* pingPongApp = app.add_subcommand("ping", "Ping pong");
pingPongApp->fallthrough();
pingPongApp->add_option("url", url, "Connection url")->required();
addTLSOptions(pingPongApp);
CLI::App* httpClientApp = app.add_subcommand("curl", "HTTP Client");
httpClientApp->fallthrough();
httpClientApp->add_option("url", url, "Connection url")->required();
httpClientApp->add_option("-d", data, "Form data")->join();
httpClientApp->add_option("-F", data, "Form data")->join();
@ -229,7 +217,6 @@ int main(int argc, char** argv)
addTLSOptions(httpClientApp);
CLI::App* redisPublishApp = app.add_subcommand("redis_publish", "Redis publisher");
redisPublishApp->fallthrough();
redisPublishApp->add_option("--port", redisPort, "Port");
redisPublishApp->add_option("--host", hostname, "Hostname");
redisPublishApp->add_option("--password", password, "Password");
@ -238,7 +225,6 @@ int main(int argc, char** argv)
redisPublishApp->add_option("-c", count, "Count");
CLI::App* redisSubscribeApp = app.add_subcommand("redis_subscribe", "Redis subscriber");
redisSubscribeApp->fallthrough();
redisSubscribeApp->add_option("--port", redisPort, "Port");
redisSubscribeApp->add_option("--host", hostname, "Hostname");
redisSubscribeApp->add_option("--password", password, "Password");
@ -247,7 +233,6 @@ int main(int argc, char** argv)
redisSubscribeApp->add_option("--pidfile", pidfile, "Pid file");
CLI::App* cobraSubscribeApp = app.add_subcommand("cobra_subscribe", "Cobra subscriber");
cobraSubscribeApp->fallthrough();
cobraSubscribeApp->add_option("--channel", channel, "Channel")->required();
cobraSubscribeApp->add_option("--pidfile", pidfile, "Pid file");
cobraSubscribeApp->add_option("--filter", filter, "Stream SQL Filter");
@ -259,7 +244,6 @@ int main(int argc, char** argv)
addCobraConfig(cobraSubscribeApp);
CLI::App* cobraPublish = app.add_subcommand("cobra_publish", "Cobra publisher");
cobraPublish->fallthrough();
cobraPublish->add_option("--channel", channel, "Channel")->required();
cobraPublish->add_option("--pidfile", pidfile, "Pid file");
cobraPublish->add_option("path", path, "Path to the file to send")
@ -270,7 +254,6 @@ int main(int argc, char** argv)
CLI::App* cobraMetricsPublish =
app.add_subcommand("cobra_metrics_publish", "Cobra metrics publisher");
cobraMetricsPublish->fallthrough();
cobraMetricsPublish->add_option("--channel", channel, "Channel")->required();
cobraMetricsPublish->add_option("--pidfile", pidfile, "Pid file");
cobraMetricsPublish->add_option("path", path, "Path to the file to send")
@ -281,7 +264,6 @@ int main(int argc, char** argv)
addCobraConfig(cobraMetricsPublish);
CLI::App* cobra2statsd = app.add_subcommand("cobra_to_statsd", "Cobra metrics to statsd");
cobra2statsd->fallthrough();
cobra2statsd->add_option("--host", hostname, "Statsd host");
cobra2statsd->add_option("--port", statsdPort, "Statsd port");
cobra2statsd->add_option("--prefix", prefix, "Statsd prefix");
@ -303,13 +285,13 @@ int main(int argc, char** argv)
addCobraConfig(cobra2statsd);
CLI::App* cobra2sentry = app.add_subcommand("cobra_to_sentry", "Cobra metrics to sentry");
cobra2sentry->fallthrough();
cobra2sentry->add_option("--dsn", dsn, "Sentry DSN");
cobra2sentry->add_option("--queue_size",
maxQueueSize,
"Size of the queue to hold messages before they are sent to Sentry");
cobra2sentry->add_option("channel", channel, "Channel")->required();
cobra2sentry->add_flag("-v", verbose, "Verbose");
cobra2sentry->add_flag("-s", strict, "Strict mode. Error out when sending to sentry fails");
cobra2sentry->add_option("--pidfile", pidfile, "Pid file");
cobra2sentry->add_option("--filter", filter, "Stream SQL Filter");
cobra2sentry->add_option("--position", position, "Stream position");
@ -319,7 +301,6 @@ int main(int argc, char** argv)
CLI::App* cobra2redisApp =
app.add_subcommand("cobra_metrics_to_redis", "Cobra metrics to redis");
cobra2redisApp->fallthrough();
cobra2redisApp->add_option("channel", channel, "Channel")->required();
cobra2redisApp->add_option("--pidfile", pidfile, "Pid file");
cobra2redisApp->add_option("--filter", filter, "Stream SQL Filter");
@ -331,7 +312,6 @@ int main(int argc, char** argv)
addCobraConfig(cobra2redisApp);
CLI::App* snakeApp = app.add_subcommand("snake", "Snake server");
snakeApp->fallthrough();
snakeApp->add_option("--port", port, "Connection url");
snakeApp->add_option("--host", hostname, "Hostname");
snakeApp->add_option("--pidfile", pidfile, "Pid file");
@ -345,7 +325,6 @@ int main(int argc, char** argv)
addTLSOptions(snakeApp);
CLI::App* httpServerApp = app.add_subcommand("httpd", "HTTP server");
httpServerApp->fallthrough();
httpServerApp->add_option("--port", port, "Port");
httpServerApp->add_option("--host", hostname, "Hostname");
httpServerApp->add_flag("-L", redirect, "Redirect all request to redirect_url");
@ -353,17 +332,14 @@ int main(int argc, char** argv)
addTLSOptions(httpServerApp);
CLI::App* autobahnApp = app.add_subcommand("autobahn", "Test client Autobahn compliance");
autobahnApp->fallthrough();
autobahnApp->add_option("--url", url, "url");
autobahnApp->add_flag("-q", quiet, "Quiet");
CLI::App* redisServerApp = app.add_subcommand("redis_server", "Redis server");
redisServerApp->fallthrough();
redisServerApp->add_option("--port", port, "Port");
redisServerApp->add_option("--host", hostname, "Hostname");
CLI::App* proxyServerApp = app.add_subcommand("proxy_server", "Proxy server");
proxyServerApp->fallthrough();
proxyServerApp->add_option("--port", port, "Port");
proxyServerApp->add_option("--host", hostname, "Hostname");
proxyServerApp->add_option("--remote_host", remoteHost, "Remote Hostname");
@ -371,7 +347,6 @@ int main(int argc, char** argv)
addTLSOptions(proxyServerApp);
CLI::App* minidumpApp = app.add_subcommand("upload_minidump", "Upload a minidump to sentry");
minidumpApp->fallthrough();
minidumpApp->add_option("--minidump", minidump, "Minidump path")
->required()
->check(CLI::ExistingPath);
@ -383,7 +358,6 @@ int main(int argc, char** argv)
minidumpApp->add_flag("-v", verbose, "Verbose");
CLI::App* dnsLookupApp = app.add_subcommand("dnslookup", "DNS lookup");
dnsLookupApp->fallthrough();
dnsLookupApp->add_option("host", hostname, "Hostname")->required();
CLI11_PARSE(app, argc, argv);
@ -404,24 +378,6 @@ int main(int argc, char** argv)
tlsOptions.caFile = "NONE";
}
if (!logfile.empty())
{
try
{
auto fileLogger = spdlog::basic_logger_mt("ws", logfile);
spdlog::set_default_logger(fileLogger);
spdlog::flush_every(std::chrono::seconds(1));
std::cerr << "All logs will be redirected to " << logfile << std::endl;
}
catch (const spdlog::spdlog_ex& ex)
{
std::cerr << "Fatal error, log init failed: " << ex.what() << std::endl;
ix::uninitNetSystem();
return 1;
}
}
// Cobra config
cobraConfig.webSocketPerMessageDeflateOptions = ix::WebSocketPerMessageDeflateOptions(true);
cobraConfig.socketTLSOptions = tlsOptions;
@ -495,18 +451,8 @@ int main(int argc, char** argv)
}
else if (app.got_subcommand("cobra_subscribe"))
{
bool enableHeartbeat = true;
int64_t sentCount = ix::cobra_to_stdout_bot(cobraConfig,
channel,
filter,
position,
fluentd,
quiet,
verbose,
maxQueueSize,
enableHeartbeat,
runtime);
ret = (int) sentCount;
ret = ix::ws_cobra_subscribe_main(
cobraConfig, channel, filter, position, quiet, fluentd, runtime);
}
else if (app.got_subcommand("cobra_publish"))
{
@ -538,18 +484,18 @@ int main(int argc, char** argv)
}
else
{
ret = (int) ix::cobra_to_statsd_bot(cobraConfig,
channel,
filter,
position,
statsdClient,
fields,
gauge,
timer,
verbose,
maxQueueSize,
enableHeartbeat,
runtime);
ret = ix::cobra_to_statsd_bot(cobraConfig,
channel,
filter,
position,
statsdClient,
fields,
gauge,
timer,
verbose,
maxQueueSize,
enableHeartbeat,
runtime);
}
}
}
@ -559,15 +505,16 @@ int main(int argc, char** argv)
ix::SentryClient sentryClient(dsn);
sentryClient.setTLSOptions(tlsOptions);
ret = (int) ix::cobra_to_sentry_bot(cobraConfig,
channel,
filter,
position,
sentryClient,
verbose,
maxQueueSize,
enableHeartbeat,
runtime);
ret = ix::cobra_to_sentry_bot(cobraConfig,
channel,
filter,
position,
sentryClient,
verbose,
strict,
maxQueueSize,
enableHeartbeat,
runtime);
}
else if (app.got_subcommand("cobra_metrics_to_redis"))
{

View File

@ -77,6 +77,14 @@ namespace ix
const std::string& channel,
bool verbose);
int ws_cobra_subscribe_main(const ix::CobraConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& position,
bool quiet,
bool fluentd,
int runtime);
int ws_cobra_publish_main(const ix::CobraConfig& appkey,
const std::string& channel,
const std::string& path);

View File

@ -106,17 +106,21 @@ namespace ix
&msgPerSeconds,
&conditionVariableMutex,
&condition,
&queue](const CobraEventPtr& event) {
if (event->type == ix::CobraEventType::Open)
&queue](ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId) {
if (eventType == ix::CobraConnection_EventType_Open)
{
spdlog::info("Subscriber connected");
for (auto&& it : event->headers)
for (auto it : headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
}
else if (event->type == ix::CobraEventType::Authenticated)
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
spdlog::info("Subscriber authenticated");
@ -137,21 +141,21 @@ namespace ix
msgCount++;
});
}
else if (event->type == ix::CobraEventType::Subscribed)
else if (eventType == ix::CobraConnection_EventType_Subscribed)
{
spdlog::info("Subscriber: subscribed to channel {}", event->subscriptionId);
spdlog::info("Subscriber: subscribed to channel {}", subscriptionId);
}
else if (event->type == ix::CobraEventType::UnSubscribed)
else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{
spdlog::info("Subscriber: unsubscribed from channel {}", event->subscriptionId);
spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId);
}
else if (event->type == ix::CobraEventType::Error)
else if (eventType == ix::CobraConnection_EventType_Error)
{
spdlog::error("Subscriber: error {}", event->errMsg);
spdlog::error("Subscriber: error {}", errMsg);
}
else if (event->type == ix::CobraEventType::Published)
else if (eventType == ix::CobraConnection_EventType_Published)
{
spdlog::error("Published message hacked: {}", event->msgId);
spdlog::error("Published message hacked: {}", msgId);
}
});

View File

@ -38,62 +38,54 @@ namespace ix
std::atomic<bool> authenticated(false);
std::atomic<bool> messageAcked(false);
conn.setEventCallback(
[&conn, &channel, &data, &authenticated, &messageAcked](const CobraEventPtr& event) {
if (event->type == ix::CobraEventType::Open)
{
spdlog::info("Publisher connected");
conn.setEventCallback([&conn, &channel, &data, &authenticated, &messageAcked](
ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId) {
if (eventType == ix::CobraConnection_EventType_Open)
{
spdlog::info("Publisher connected");
for (auto&& it : event->headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
}
else if (event->type == ix::CobraEventType::Closed)
for (auto it : headers)
{
spdlog::info("Subscriber closed: {}", event->errMsg);
spdlog::info("{}: {}", it.first, it.second);
}
else if (event->type == ix::CobraEventType::Authenticated)
{
spdlog::info("Publisher authenticated");
authenticated = true;
}
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
spdlog::info("Publisher authenticated");
authenticated = true;
Json::Value channels;
channels[0] = channel;
auto msgId = conn.publish(channels, data);
Json::Value channels;
channels[0] = channel;
auto msgId = conn.publish(channels, data);
spdlog::info("Published msg {}", msgId);
}
else if (event->type == ix::CobraEventType::Subscribed)
{
spdlog::info("Publisher: subscribed to channel {}", event->subscriptionId);
}
else if (event->type == ix::CobraEventType::UnSubscribed)
{
spdlog::info("Publisher: unsubscribed from channel {}", event->subscriptionId);
}
else if (event->type == ix::CobraEventType::Error)
{
spdlog::error("Publisher: error {}", event->errMsg);
}
else if (event->type == ix::CobraEventType::Published)
{
spdlog::info("Published message id {} acked", event->msgId);
messageAcked = true;
}
else if (event->type == ix::CobraEventType::Pong)
{
spdlog::info("Received websocket pong");
}
else if (event->type == ix::CobraEventType::HandshakeError)
{
spdlog::error("Subscriber: Handshake error: {}", event->errMsg);
}
else if (event->type == ix::CobraEventType::AuthenticationError)
{
spdlog::error("Subscriber: Authentication error: {}", event->errMsg);
}
});
spdlog::info("Published msg {}", msgId);
}
else if (eventType == ix::CobraConnection_EventType_Subscribed)
{
spdlog::info("Publisher: subscribed to channel {}", subscriptionId);
}
else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{
spdlog::info("Publisher: unsubscribed from channel {}", subscriptionId);
}
else if (eventType == ix::CobraConnection_EventType_Error)
{
spdlog::error("Publisher: error {}", errMsg);
}
else if (eventType == ix::CobraConnection_EventType_Published)
{
spdlog::info("Published message id {} acked", msgId);
messageAcked = true;
}
else if (eventType == ix::CobraConnection_EventType_Pong)
{
spdlog::info("Received websocket pong");
}
});
conn.connect();

203
ws/ws_cobra_subscribe.cpp Normal file
View File

@ -0,0 +1,203 @@
/*
* ws_cobra_subscribe.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <atomic>
#include <chrono>
#include <iostream>
#include <ixcobra/IXCobraConnection.h>
#include <spdlog/spdlog.h>
#include <sstream>
#include <thread>
namespace ix
{
using StreamWriterPtr = std::unique_ptr<Json::StreamWriter>;
StreamWriterPtr makeStreamWriter()
{
Json::StreamWriterBuilder builder;
builder["commentStyle"] = "None";
builder["indentation"] = ""; // will make the JSON object compact
std::unique_ptr<Json::StreamWriter> jsonWriter(builder.newStreamWriter());
return jsonWriter;
}
void writeToStdout(bool fluentd,
const StreamWriterPtr& jsonWriter,
const Json::Value& msg,
const std::string& position)
{
Json::Value enveloppe;
if (fluentd)
{
enveloppe["producer"] = "cobra";
enveloppe["consumer"] = "fluentd";
Json::Value msgWithPosition(msg);
msgWithPosition["position"] = position;
enveloppe["message"] = msgWithPosition;
jsonWriter->write(enveloppe, &std::cout);
std::cout << std::endl; // add lf and flush
}
else
{
enveloppe = msg;
std::cout << position << " ";
jsonWriter->write(enveloppe, &std::cout);
std::cout << std::endl;
}
}
int ws_cobra_subscribe_main(const ix::CobraConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& position,
bool quiet,
bool fluentd,
int runtime)
{
ix::CobraConnection conn;
conn.configure(config);
conn.connect();
std::atomic<int> msgPerSeconds(0);
std::atomic<int> msgCount(0);
std::atomic<bool> stop(false);
std::atomic<bool> fatalCobraError(false);
auto jsonWriter = makeStreamWriter();
auto timer = [&msgPerSeconds, &msgCount, &stop] {
while (!stop)
{
spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds);
msgPerSeconds = 0;
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
};
std::thread t(timer);
std::string subscriptionPosition(position);
conn.setEventCallback([&conn,
&channel,
&jsonWriter,
&filter,
&subscriptionPosition,
&msgCount,
&msgPerSeconds,
&quiet,
&fluentd,
&fatalCobraError](ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId) {
if (eventType == ix::CobraConnection_EventType_Open)
{
spdlog::info("Subscriber connected");
for (auto it : headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
}
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
spdlog::info("Subscriber authenticated");
spdlog::info("Subscribing to {} at position {}", channel, subscriptionPosition);
conn.subscribe(
channel,
filter,
subscriptionPosition,
[&jsonWriter,
&quiet,
&msgPerSeconds,
&msgCount,
&fluentd,
&subscriptionPosition](const Json::Value& msg, const std::string& position) {
if (!quiet)
{
writeToStdout(fluentd, jsonWriter, msg, position);
}
msgPerSeconds++;
msgCount++;
subscriptionPosition = position;
});
}
else if (eventType == ix::CobraConnection_EventType_Subscribed)
{
spdlog::info("Subscriber: subscribed to channel {}", subscriptionId);
}
else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{
spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId);
}
else if (eventType == ix::CobraConnection_EventType_Error)
{
spdlog::error("Subscriber: error {}", errMsg);
}
else if (eventType == ix::CobraConnection_EventType_Published)
{
spdlog::error("Published message hacked: {}", msgId);
}
else if (eventType == ix::CobraConnection_EventType_Pong)
{
spdlog::info("Received websocket pong");
}
else if (eventType == ix::CobraConnection_EventType_Handshake_Error)
{
spdlog::error("Subscriber: Handshake error: {}", errMsg);
fatalCobraError = true;
}
else if (eventType == ix::CobraConnection_EventType_Authentication_Error)
{
spdlog::error("Subscriber: Authentication error: {}", errMsg);
fatalCobraError = true;
}
else if (eventType == ix::CobraConnection_EventType_Subscription_Error)
{
spdlog::error("Subscriber: Subscription error: {}", errMsg);
fatalCobraError = true;
}
});
// Run forever
if (runtime == -1)
{
while (true)
{
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
if (fatalCobraError) break;
}
}
// Run for a duration, used by unittesting now
else
{
for (int i = 0; i < runtime; ++i)
{
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
if (fatalCobraError) break;
}
}
stop = true;
conn.disconnect();
t.join();
return fatalCobraError ? 1 : 0;
}
} // namespace ix