Compare commits

...

21 Commits

Author SHA1 Message Date
332ffb0603 (cobra client) pass the message position to the subscription data callback 2020-03-13 12:49:37 -07:00
90df3d1805 (openssl tls backend) Fix a hand in OpenSSL when using TLS v1.3 ... by disabling TLS v1.3 2020-03-12 16:27:25 -07:00
bda1bb6ab4 expose a way to set tls options for a sentry client, for testing 2020-03-12 16:18:28 -07:00
d4e1f71e3c (cobra2sentry bot) take a sentry client as input instead of a dsn 2020-03-12 12:30:58 -07:00
adf6aa1d6c (cobra2sentry bot) remove the jobs option passed to ws, and only use one sentry sender 2020-03-12 12:24:25 -07:00
cb1f9f5a44 clang formatting 2020-03-12 12:15:56 -07:00
83ae105edb minor refactoring to delete files which are not needed 2020-03-12 12:13:31 -07:00
7642ccc99e (unittest) fix silly compile error with renaming of Logger to TLogger 2020-03-12 11:15:54 -07:00
cb1ec7dc96 add unittest for cobra to sentry bots 2020-03-12 09:07:01 -07:00
09b9483ddf fix casing problem with cmake filename 2020-03-11 16:04:16 -07:00
27a8ae309f build failure on Linux 2020-03-11 15:59:48 -07:00
3df7c942d7 move sentry and statsd cobra ws commands into a new ixbots folder 2020-03-11 15:55:56 -07:00
6a4d69afc5 (cobra) IXCobraConfig struct has tlsOptions and per message deflate options 2020-03-11 12:40:32 -07:00
0a11132b07 (cobra) add IXCobraConfig struct to pass cobra config around 2020-03-11 10:48:41 -07:00
cb9f0cb968 (doc) mention that OpenSSL can be used on Windows 2020-03-11 10:18:48 -07:00
b1f30bb40f (ws cobra_subscribe) add a --fluentd option to wrap a message in an enveloppe so that fluentd can recognize it 2020-03-09 15:25:43 -07:00
4ef04b8339 (websocket server) fix regression with disabling zlib extension on the server side. If a client does not support this extension the server will handle it fine. We still need to figure out how to disable the option. cc #160 2020-03-02 16:53:08 -08:00
e581f29b42 Move zlib include_directories before add_subdirectory (#159) 2020-02-28 09:30:37 -08:00
a42f115f79 compatibility: add node.js example server 2020-02-26 12:17:34 -08:00
5ce1a596cf add a python echo server that does not close the connection after each received messages 2020-02-26 12:11:31 -08:00
21db7b6c5b add a simple pytho echo client 2020-02-26 11:50:24 -08:00
64 changed files with 1183 additions and 505 deletions

19
CMake/FindSpdLog.cmake Normal file
View File

@ -0,0 +1,19 @@
# Find package structure taken from libcurl
include(FindPackageHandleStandardArgs)
find_path(SPDLOG_INCLUDE_DIRS spdlog/spdlog.h)
find_library(JSONCPP_LIBRARY spdlog)
find_package_handle_standard_args(SPDLOG
FOUND_VAR
SPDLOG_FOUND
REQUIRED_VARS
SPDLOG_LIBRARY
SPDLOG_INCLUDE_DIRS
FAIL_MESSAGE
"Could NOT find spdlog"
)
set(SPDLOG_INCLUDE_DIRS ${SPDLOG_INCLUDE_DIRS})
set(SPDLOG_LIBRARIES ${SPDLOG_LIBRARY})

View File

@ -201,8 +201,8 @@ if (ZLIB_FOUND)
include_directories(${ZLIB_INCLUDE_DIRS})
target_link_libraries(ixwebsocket ${ZLIB_LIBRARIES})
else()
add_subdirectory(third_party/zlib)
include_directories(third_party/zlib ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib)
add_subdirectory(third_party/zlib)
target_link_libraries(ixwebsocket zlibstatic)
endif()
@ -230,6 +230,7 @@ if (USE_WS OR USE_TEST)
add_subdirectory(ixcobra)
add_subdirectory(ixsnake)
add_subdirectory(ixsentry)
add_subdirectory(ixbots)
add_subdirectory(third_party/spdlog spdlog)

View File

@ -1,6 +1,30 @@
# Changelog
All changes to this project will be documented in this file.
## [8.2.3] - 2020-03-13
(cobra client) pass the message position to the subscription data callback
## [8.2.2] - 2020-03-12
(openssl tls backend) Fix a hand in OpenSSL when using TLS v1.3 ... by disabling TLS v1.3
## [8.2.1] - 2020-03-11
(cobra) IXCobraConfig struct has tlsOptions and per message deflate options
## [8.2.0] - 2020-03-11
(cobra) add IXCobraConfig struct to pass cobra config around
## [8.1.9] - 2020-03-09
(ws cobra_subscribe) add a --fluentd option to wrap a message in an enveloppe so that fluentd can recognize it
## [8.1.8] - 2020-03-02
(websocket server) fix regression with disabling zlib extension on the server side. If a client does not support this extension the server will handle it fine. We still need to figure out how to disable the option.
## [8.1.7] - 2020-02-26
(websocket) traffic tracker received bytes is message size while it should be wire size

View File

@ -6,7 +6,7 @@ The per message deflate compression option is supported. It can lead to very nic
### TLS/SSL
Connections can be optionally secured and encrypted with TLS/SSL when using a wss:// endpoint, or using normal un-encrypted socket with ws:// endpoints. AppleSSL is used on iOS and macOS, OpenSSL is used on Android and Linux, mbedTLS is used on Windows.
Connections can be optionally secured and encrypted with TLS/SSL when using a wss:// endpoint, or using normal un-encrypted socket with ws:// endpoints. AppleSSL is used on iOS and macOS, OpenSSL and mbedTLS can be used on Android, Linux and Windows.
### Polling and background thread work

45
ixbots/CMakeLists.txt Normal file
View File

@ -0,0 +1,45 @@
#
# Author: Benjamin Sergeant
# Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
#
set (IXBOTS_SOURCES
ixbots/IXCobraToSentryBot.cpp
ixbots/IXCobraToStatsdBot.cpp
ixbots/IXQueueManager.cpp
)
set (IXBOTS_HEADERS
ixbots/IXCobraToSentryBot.h
ixbots/IXCobraToStatsdBot.h
ixbots/IXQueueManager.h
)
add_library(ixbots STATIC
${IXBOTS_SOURCES}
${IXBOTS_HEADERS}
)
find_package(JsonCpp)
if (NOT JSONCPP_FOUND)
set(JSONCPP_INCLUDE_DIRS ../third_party/jsoncpp)
endif()
find_package(SpdLog)
if (NOT SPDLOG_FOUND)
set(SPDLOG_INCLUDE_DIRS ../third_party/spdlog/include)
endif()
set(STATSD_CLIENT_INCLUDE_DIRS ../third_party/statsd-client-cpp/src)
set(IXBOTS_INCLUDE_DIRS
.
..
../ixcore
../ixcobra
../ixsentry
${JSONCPP_INCLUDE_DIRS}
${SPDLOG_INCLUDE_DIRS}
${STATSD_CLIENT_INCLUDE_DIRS})
target_include_directories( ixbots PUBLIC ${IXBOTS_INCLUDE_DIRS} )

View File

@ -1,17 +1,14 @@
/*
* ws_cobra_to_sentry.cpp
* IXCobraToSentryBot.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <atomic>
#include "IXCobraToSentryBot.h"
#include "IXQueueManager.h"
#include <chrono>
#include <condition_variable>
#include <ixcobra/IXCobraConnection.h>
#include <ixsentry/IXSentryClient.h>
#include <map>
#include <mutex>
#include <queue>
#include <spdlog/spdlog.h>
#include <sstream>
#include <thread>
@ -19,99 +16,18 @@
namespace ix
{
class QueueManager
{
public:
QueueManager(size_t maxQueueSize, std::atomic<bool>& stop)
: _maxQueueSize(maxQueueSize)
, _stop(stop)
{
}
Json::Value pop();
void add(Json::Value msg);
private:
std::map<std::string, std::queue<Json::Value>> _queues;
std::mutex _mutex;
std::condition_variable _condition;
size_t _maxQueueSize;
std::atomic<bool>& _stop;
};
Json::Value QueueManager::pop()
{
std::unique_lock<std::mutex> lock(_mutex);
if (_queues.empty())
{
Json::Value val;
return val;
}
std::vector<std::string> games;
for (auto it : _queues)
{
games.push_back(it.first);
}
std::random_shuffle(games.begin(), games.end());
std::string game = games[0];
_condition.wait(lock, [this] { return !_stop; });
if (_queues[game].empty())
{
Json::Value val;
return val;
}
auto msg = _queues[game].front();
_queues[game].pop();
return msg;
}
void QueueManager::add(Json::Value msg)
{
std::unique_lock<std::mutex> lock(_mutex);
std::string game;
if (msg.isMember("device") && msg["device"].isMember("game"))
{
game = msg["device"]["game"].asString();
}
if (game.empty()) return;
// if the sending is not fast enough there is no point
// in queuing too many events.
if (_queues[game].size() < _maxQueueSize)
{
_queues[game].push(msg);
_condition.notify_one();
}
}
int ws_cobra_to_sentry_main(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
const std::string& channel,
const std::string& filter,
const std::string& dsn,
bool verbose,
bool strict,
int jobs,
size_t maxQueueSize,
const ix::SocketTLSOptions& tlsOptions)
int cobra_to_sentry_bot(const CobraConfig& config,
const std::string& channel,
const std::string& filter,
SentryClient& sentryClient,
bool verbose,
bool strict,
size_t maxQueueSize,
bool enableHeartbeat,
int runtime)
{
ix::CobraConnection conn;
conn.configure(appkey,
endpoint,
rolename,
rolesecret,
ix::WebSocketPerMessageDeflateOptions(true),
tlsOptions);
conn.configure(config);
conn.connect();
Json::FastWriter jsonWriter;
@ -123,22 +39,26 @@ namespace ix
QueueManager queueManager(maxQueueSize, stop);
auto timer = [&sentCount, &receivedCount] {
while (true)
auto timer = [&sentCount, &receivedCount, &stop] {
while (!stop)
{
spdlog::info("messages received {} sent {}", receivedCount, sentCount);
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
spdlog::info("timer thread done");
};
std::thread t1(timer);
auto heartbeat = [&sentCount, &receivedCount] {
auto heartbeat = [&sentCount, &receivedCount, &stop, &enableHeartbeat] {
std::string state("na");
while (true)
if (!enableHeartbeat) return;
while (!stop)
{
std::stringstream ss;
ss << "messages received " << receivedCount;
@ -156,20 +76,21 @@ namespace ix
auto duration = std::chrono::minutes(1);
std::this_thread::sleep_for(duration);
}
spdlog::info("heartbeat thread done");
};
std::thread t2(heartbeat);
auto sentrySender =
[&queueManager, verbose, &errorSending, &sentCount, &stop, &throttled, &dsn] {
SentryClient sentryClient(dsn);
[&queueManager, verbose, &errorSending, &sentCount, &stop, &throttled, &sentryClient] {
while (true)
{
Json::Value msg = queueManager.pop();
if (stop) break;
if (msg.isNull()) continue;
if (stop) return;
auto ret = sentryClient.send(msg, verbose);
HttpResponsePtr response = ret.first;
@ -241,17 +162,13 @@ namespace ix
++sentCount;
}
if (stop) return;
if (stop) break;
}
spdlog::info("sentrySender thread done");
};
// Create a thread pool
spdlog::info("Starting {} sentry sender jobs", jobs);
std::vector<std::thread> pool;
for (int i = 0; i < jobs; i++)
{
pool.push_back(std::thread(sentrySender));
}
std::thread t3(sentrySender);
conn.setEventCallback([&conn,
&channel,
@ -284,10 +201,10 @@ namespace ix
conn.subscribe(channel,
filter,
[&jsonWriter, verbose, &throttled, &receivedCount, &queueManager](
const Json::Value& msg) {
const Json::Value& msg, const std::string& position) {
if (verbose)
{
spdlog::info(jsonWriter.write(msg));
spdlog::info("Subscriber received message {} -> {}", position, jsonWriter.write(msg));
}
// If we cannot send to sentry fast enough, drop the message
@ -322,24 +239,42 @@ namespace ix
}
});
while (true)
// Run forever
if (runtime == -1)
{
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
while (true)
{
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
if (strict && errorSending) break;
if (strict && errorSending) break;
}
}
// Run for a duration, used by unittesting now
else
{
for (int i = 0 ; i < runtime; ++i)
{
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
if (strict && errorSending) break;
}
}
conn.disconnect();
//
// Cleanup.
// join all the bg threads and stop them.
//
conn.disconnect();
stop = true;
for (int i = 0; i < jobs; i++)
{
spdlog::error("joining thread {}", i);
pool[i].join();
}
return (strict && errorSending) ? 1 : 0;
t1.join();
if (t2.joinable()) t2.join();
spdlog::info("heartbeat thread done");
t3.join();
return (strict && errorSending) ? -1 : (int) sentCount;
}
} // namespace ix

View File

@ -0,0 +1,23 @@
/*
* IXCobraToSentryBot.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <ixcobra/IXCobraConfig.h>
#include <ixsentry/IXSentryClient.h>
#include <string>
namespace ix
{
int cobra_to_sentry_bot(const CobraConfig& config,
const std::string& channel,
const std::string& filter,
SentryClient& sentryClient,
bool verbose,
bool strict,
size_t maxQueueSize,
bool enableHeartbeat,
int runtime);
} // namespace ix

View File

@ -1,9 +1,12 @@
/*
* ws_cobra_to_statsd.cpp
* IXCobraToStatsdBot.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXCobraToStatsdBot.h"
#include "IXQueueManager.h"
#include <atomic>
#include <chrono>
#include <condition_variable>
@ -17,59 +20,6 @@
#include <statsd_client.h>
#endif
namespace
{
class QueueManager
{
public:
QueueManager(size_t maxQueueSize, std::atomic<bool>& stop)
: _maxQueueSize(maxQueueSize)
, _stop(stop)
{
}
Json::Value pop();
void add(Json::Value msg);
private:
std::queue<Json::Value> _queue;
std::mutex _mutex;
std::condition_variable _condition;
size_t _maxQueueSize;
std::atomic<bool>& _stop;
};
Json::Value QueueManager::pop()
{
std::unique_lock<std::mutex> lock(_mutex);
if (_queue.empty())
{
Json::Value val;
return val;
}
_condition.wait(lock, [this] { return !_stop; });
auto msg = _queue.front();
_queue.pop();
return msg;
}
void QueueManager::add(Json::Value msg)
{
std::unique_lock<std::mutex> lock(_mutex);
// if the sending is not fast enough there is no point
// in queuing too many events.
if (_queue.size() < _maxQueueSize)
{
_queue.push(msg);
_condition.notify_one();
}
}
} // namespace
namespace ix
{
// fields are command line argument that can be specified multiple times
@ -109,26 +59,17 @@ namespace ix
return val.asString();
}
int ws_cobra_to_statsd_main(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
const std::string& channel,
const std::string& filter,
const std::string& host,
int port,
const std::string& prefix,
const std::string& fields,
bool verbose,
const ix::SocketTLSOptions& tlsOptions)
int cobra_to_statsd_bot(const ix::CobraConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& host,
int port,
const std::string& prefix,
const std::string& fields,
bool verbose)
{
ix::CobraConnection conn;
conn.configure(appkey,
endpoint,
rolename,
rolesecret,
ix::WebSocketPerMessageDeflateOptions(true),
tlsOptions);
conn.configure(config);
conn.connect();
auto tokens = parseFields(fields);
@ -237,10 +178,10 @@ namespace ix
conn.subscribe(channel,
filter,
[&jsonWriter, &queueManager, verbose, &receivedCount](
const Json::Value& msg) {
const Json::Value& msg, const std::string& position) {
if (verbose)
{
spdlog::info(jsonWriter.write(msg));
spdlog::info("Subscriber received message {} -> {}", position, jsonWriter.write(msg));
}
receivedCount++;

View File

@ -0,0 +1,22 @@
/*
* IXCobraToStatsdBot.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <ixcobra/IXCobraConfig.h>
#include <string>
#include <stddef.h>
namespace ix
{
int cobra_to_statsd_bot(const ix::CobraConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& host,
int port,
const std::string& prefix,
const std::string& fields,
bool verbose);
} // namespace ix

View File

@ -0,0 +1,66 @@
/*
* IXQueueManager.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXQueueManager.h"
#include <vector>
#include <algorithm>
namespace ix
{
Json::Value QueueManager::pop()
{
std::unique_lock<std::mutex> lock(_mutex);
if (_queues.empty())
{
Json::Value val;
return val;
}
std::vector<std::string> games;
for (auto it : _queues)
{
games.push_back(it.first);
}
std::random_shuffle(games.begin(), games.end());
std::string game = games[0];
auto duration = std::chrono::seconds(1);
_condition.wait_for(lock, duration);
if (_queues[game].empty())
{
Json::Value val;
return val;
}
auto msg = _queues[game].front();
_queues[game].pop();
return msg;
}
void QueueManager::add(Json::Value msg)
{
std::unique_lock<std::mutex> lock(_mutex);
std::string game;
if (msg.isMember("device") && msg["device"].isMember("game"))
{
game = msg["device"]["game"].asString();
}
if (game.empty()) return;
// if the sending is not fast enough there is no point
// in queuing too many events.
if (_queues[game].size() < _maxQueueSize)
{
_queues[game].push(msg);
_condition.notify_one();
}
}
}

View File

@ -0,0 +1,38 @@
/*
* IXQueueManager.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <stddef.h>
#include <atomic>
#include <json/json.h>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <map>
namespace ix
{
class QueueManager
{
public:
QueueManager(size_t maxQueueSize, std::atomic<bool>& stop)
: _maxQueueSize(maxQueueSize)
, _stop(stop)
{
}
Json::Value pop();
void add(Json::Value msg);
private:
std::map<std::string, std::queue<Json::Value>> _queues;
std::mutex _mutex;
std::condition_variable _condition;
size_t _maxQueueSize;
std::atomic<bool>& _stop;
};
}

View File

@ -13,6 +13,7 @@ set (IXCOBRA_HEADERS
ixcobra/IXCobraConnection.h
ixcobra/IXCobraMetricsThreadedPublisher.h
ixcobra/IXCobraMetricsPublisher.h
ixcobra/IXCobraConfig.h
)
add_library(ixcobra STATIC

View File

@ -0,0 +1,35 @@
/*
* IXCobraConfig.h
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <ixwebsocket/IXWebSocketPerMessageDeflateOptions.h>
#include <ixwebsocket/IXSocketTLSOptions.h>
namespace ix
{
struct CobraConfig
{
std::string appkey;
std::string endpoint;
std::string rolename;
std::string rolesecret;
WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions;
SocketTLSOptions socketTLSOptions;
CobraConfig(const std::string& a = std::string(),
const std::string& e = std::string(),
const std::string& r = std::string(),
const std::string& s = std::string())
: appkey(a)
, endpoint(e)
, rolename(r)
, rolesecret(s)
{
;
}
};
} // namespace ix

View File

@ -276,6 +276,16 @@ namespace ix
_webSocket->setPingTimeout(3 * kPingIntervalSecs);
}
void CobraConnection::configure(const ix::CobraConfig& config)
{
configure(config.appkey,
config.endpoint,
config.rolename,
config.rolesecret,
config.webSocketPerMessageDeflateOptions,
config.socketTLSOptions);
}
//
// Handshake message schema.
//
@ -431,9 +441,12 @@ namespace ix
if (!body.isMember("messages")) return false;
Json::Value messages = body["messages"];
if (!body.isMember("position")) return false;
std::string position = body["position"].asString();
for (auto&& msg : messages)
{
cb->second(msg);
cb->second(msg, position);
}
return true;

View File

@ -17,6 +17,8 @@
#include <unordered_map>
#include <limits>
#include "IXCobraConfig.h"
namespace ix
{
class WebSocket;
@ -40,7 +42,7 @@ namespace ix
CobraConnection_PublishMode_Batch = 1
};
using SubscriptionCallback = std::function<void(const Json::Value&)>;
using SubscriptionCallback = std::function<void(const Json::Value&, const std::string&)>;
using EventCallback = std::function<void(CobraConnectionEventType,
const std::string&,
const WebSocketHttpHeaders&,
@ -67,6 +69,8 @@ namespace ix
const WebSocketPerMessageDeflateOptions& webSocketPerMessageDeflateOptions,
const SocketTLSOptions& socketTLSOptions);
void configure(const ix::CobraConfig& config);
/// Set the traffic tracker callback
static void setTrafficTrackerCallback(const TrafficTrackerCallback& callback);

View File

@ -40,6 +40,11 @@ namespace ix
}
}
void SentryClient::setTLSOptions(const SocketTLSOptions& tlsOptions)
{
_httpClient->setTLSOptions(tlsOptions);
}
int64_t SentryClient::getTimestamp()
{
const auto tp = std::chrono::system_clock::now();

View File

@ -8,6 +8,7 @@
#include <algorithm>
#include <ixwebsocket/IXHttpClient.h>
#include <ixwebsocket/IXSocketTLSOptions.h>
#include <json/json.h>
#include <regex>
#include <memory>
@ -24,6 +25,9 @@ namespace ix
Json::Value parseLuaStackTrace(const std::string& stack);
// Mostly for testing
void setTLSOptions(const SocketTLSOptions& tlsOptions);
void uploadMinidump(
const std::string& sentryMetadata,
const std::string& minidumpBytes,

View File

@ -189,7 +189,7 @@ namespace snake
nlohmann::json response = {
{"action", "rtm/subscription/data"},
{"id", id++},
{"body", {{"subscription_id", subscriptionId}, {"messages", {msg}}}}};
{"body", {{"subscription_id", subscriptionId}, {"position", "0-0"}, {"messages", {msg}}}}};
ws->sendText(response.dump());
};

View File

@ -131,8 +131,14 @@ namespace ix
SSL_CTX_set_mode(ctx,
SSL_MODE_ENABLE_PARTIAL_WRITE | SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER);
SSL_CTX_set_options(
ctx, SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3 | SSL_OP_CIPHER_SERVER_PREFERENCE);
int options = SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3 | SSL_OP_CIPHER_SERVER_PREFERENCE;
#ifdef SSL_OP_NO_TLSv1_3
// (partially?) work around hang in openssl 1.1.1b, by disabling TLS V1.3
// https://github.com/openssl/openssl/issues/7967
options |= SSL_OP_NO_TLSv1_3;
#endif
SSL_CTX_set_options(ctx, options);
}
return ctx;
}

View File

@ -337,8 +337,7 @@ namespace ix
WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(header);
// If the client has requested that extension,
// and the server does not prevent it, enable it.
if (_enablePerMessageDeflate && webSocketPerMessageDeflateOptions.enabled())
if (webSocketPerMessageDeflateOptions.enabled())
{
_enablePerMessageDeflate = true;

View File

@ -90,15 +90,6 @@ namespace ix
webSocket->disablePong();
}
if (_enablePerMessageDeflate)
{
webSocket->enablePerMessageDeflate();
}
else
{
webSocket->disablePerMessageDeflate();
}
// Add this client to our client set
{
std::lock_guard<std::mutex> lock(_clientsMutex);

View File

@ -6,4 +6,4 @@
#pragma once
#define IX_WEBSOCKET_VERSION "8.1.7"
#define IX_WEBSOCKET_VERSION "8.2.3"

View File

@ -0,0 +1,20 @@
-----BEGIN CERTIFICATE-----
MIIDQTCCAimgAwIBAgIUNJBwOQdDle1TI/MHGd+cSpxIllwwDQYJKoZIhvcNAQEL
BQAwSDEUMBIGA1UECgwLbWFjaGluZXpvbmUxFDASBgNVBAoMC0lYV2ViU29ja2V0
MRowGAYDVQQDDBFzZWxmc2lnbmVkLWNsaWVudDAeFw0yMDAzMTIyMzA0MzdaFw0y
MTAzMTIyMzA0MzdaMEgxFDASBgNVBAoMC21hY2hpbmV6b25lMRQwEgYDVQQKDAtJ
WFdlYlNvY2tldDEaMBgGA1UEAwwRc2VsZnNpZ25lZC1jbGllbnQwggEiMA0GCSqG
SIb3DQEBAQUAA4IBDwAwggEKAoIBAQC7q6W0f5vRSHaNOuM1VQpY0rC0a5u04J5Z
nssUD1QfgilY1UEaaR/4K6ILE4oClqeDsQy/7+04Wt6i/ttceB/k1Jk6n0kgdtvA
CsX1H+nA7JL7ANBZvQ6W2E1mwJieTDSVDgL4YB9qzJQu3PdwZJgm5GTlVK66DMr1
IH2EYwu73M/ZwOzfgyd7m0TcgkRV8OHiD1dVDERNQR9gzDUsBtCoWPmzXxgPMOSE
Oq1sEhNC0bPaG3zTDvCv0t4Hti33po/U8PZwOtz2b8StSjS5BnvEDnksAtEZuNEu
4B3KJN4Oxrtgh7DYdiF7S9Gh0dN6yqtRfDWkGyC9WkyoqpFKCM4fAgMBAAGjIzAh
MB8GA1UdEQQYMBaCCWxvY2FsaG9zdIIJMTI3LjAuMC4xMA0GCSqGSIb3DQEBCwUA
A4IBAQB4oIutDYbCRfsyWRAiAY+D9rhYsJYlsQjyml1q2+pCv7BJ1kWsKk7m2VMX
Tl6CM+PI0zXPpLN6Ot79jf/jxEbDMvqrBgGpYfddvLhyTFnzIZpG8d63RvzPADF6
lV3x34eZf/EdtrWgZAHK+5oZjtzePGHwKDFIPva9nvJXYIxNwKYWGRX8HSm0OZi2
FQiaOt6WYLo7ZdefNPS9nugFRM6hfztJe6WvvglKm+BTnHbCSKj5xRuT9iA80+jX
Zij7po8opY3S+zEZ0eNUCHxMBQ+2Jdq3HxggJ2cFQVRHdvKfwzmavVeGgni75d16
+xFD5nS3g3eIEME+lZ8c8GbL0AJ4
-----END CERTIFICATE-----

View File

@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAu6ultH+b0Uh2jTrjNVUKWNKwtGubtOCeWZ7LFA9UH4IpWNVB
Gmkf+CuiCxOKApang7EMv+/tOFreov7bXHgf5NSZOp9JIHbbwArF9R/pwOyS+wDQ
Wb0OlthNZsCYnkw0lQ4C+GAfasyULtz3cGSYJuRk5VSuugzK9SB9hGMLu9zP2cDs
34Mne5tE3IJEVfDh4g9XVQxETUEfYMw1LAbQqFj5s18YDzDkhDqtbBITQtGz2ht8
0w7wr9LeB7Yt96aP1PD2cDrc9m/ErUo0uQZ7xA55LALRGbjRLuAdyiTeDsa7YIew
2HYhe0vRodHTesqrUXw1pBsgvVpMqKqRSgjOHwIDAQABAoIBAQC2q7IESj2x7TWv
7ITiEZ+bq6DiTOfnnMeldkI3iWAZt0lltVXETlUW6+mznFY2hMwTDE/bt78Qnqqc
vzNoA2kQBLwNaqP0XJ0zhYkAOwr9hYjflwA2iSZdP7e/b3JeitCX0WakunN6Mh1+
rAiRtui+2os3CkF0yST4iqKCLSJrvSvvK5fU92aKEaE3k9kznBljvVOJIIRQBUPz
G8tvtPgpLALrT7XMnGfaCyGS8c1IbFMm84KTxAlVV0bnuGgeYQ2VupqUmZpJjcJ0
B08hr7vPfxz3UXSOKwYY8TRfmF3X370ky5Ov2I9ddg27V1QoeRTWlL7VMxRtiSer
hoM5SPKpAoGBAO0vBd1Z6425wGT0PClUbJAVm2OBYMDnl2RmhBK5TAxrKjs9ag08
65jfVCMD8wDMDhEbvbmzkgRa9BC6AY97JBmyr4m9oGfA7oenuou+a9LYAKqtO0ts
hxHf2LnpC1HCyh4+l5gohjlUG7gSVu/oBhNTJNKmqUKuQ8v1b6My/JR9AoGBAMqP
DugL9DusECncKHQbaIEzvEBe+QErcUxXxq+G4LLvFTZVvthHbrZ/0cxm5Ve6rfd2
krqjYFA3WPOuTcKEUouNeRK2A4V6PbnSdpf0kagN6KbEjK66ZSZs8wnWitghqo7J
n2IHcSDEEACTyjS7K8HjPx0fQGU1tzkG/7/xs3vLAoGAI61JEoyuE/l26TibvBPI
6Lt3TjZt2VZ8vUt2XmKk/9E23wZT533canhdbY7whJQtIYGsvjw2oJUV1VZFWdHK
EluAcBWoBTNOLfWa595S1bpMD2BTZPsELjofnYdifn/wazA7GVYvKnxuVvfbP+cE
0u9UwKL1HuSbqhhXHJNUzvkCgYAeFRLsqWHTPuGDpfuoCq4BijJqCPDIGLCR2vNZ
/BkA2fr3f9KBAlLR7be1uI5U8heGCekOqNbT8vRV9Ev+GHK94PvbKIbrWtUx9KzC
MoMzRyWHJueRx4LgKwwJKQCjypQu8oimIV7Os++AdnJwVF/SQrKL26lPnqOgZ4ax
9e5m8wKBgQCF626EmJk34+WTGEa5gdTx567Y+1EAbag+7fQSskwiRPvRN2fcg3H8
ynUAtIgWbrecgKhblXxc7zwJrl41P71uQzCFspgvOPXMxL2xqN+tnTfuz84OXk26
h1xSdS3e+JYsWUIxqbH1W59S+dC7KtklBAcUxb8DNpDoVjVBeAEqzw==
-----END RSA PRIVATE KEY-----

View File

@ -0,0 +1,21 @@
-----BEGIN CERTIFICATE-----
MIIDYzCCAkugAwIBAgIUD0V1mxoZF9TNpsoyvuHU2zhrg0wwDQYJKoZIhvcNAQEL
BQAwQTEUMBIGA1UECgwLbWFjaGluZXpvbmUxFDASBgNVBAoMC0lYV2ViU29ja2V0
MRMwEQYDVQQDDAp0cnVzdGVkLWNhMB4XDTIwMDMxMjIzMDQzN1oXDTMwMDMxMDIz
MDQzN1owQTEUMBIGA1UECgwLbWFjaGluZXpvbmUxFDASBgNVBAoMC0lYV2ViU29j
a2V0MRMwEQYDVQQDDAp0cnVzdGVkLWNhMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A
MIIBCgKCAQEAr2nVpfIzxsxK76Va+HaBfZ7aqjk90zipzH3/CWuMMN9wzBhg2HPE
cRreq1vKm2M/L9CZH6y6fnr68n8lW4rDATmbH0GeY4OqI9jw/mfjL4jUsAxwRi4X
kkk4G2nz1G81LvWLFXXAZlOxeHSZtpPh5OP1tNGiJNL4eGVxjlwFJIFwDvweJ/tW
J7dh/FTzO0jqh8FheJTeJO64Gflqfln64WRUOPSpO7v4KmyesM/BGwGMfZjcwhs/
KZT+OKXpPgYhdmAZJE24ftwWTP84DP9wnJbNqTRt0r5ud+q8EusKIjw/Pbf/tPUF
7J0bkMp4y5/+7MMuIxeZ+s2uHdp6hmwdJQIDAQABo1MwUTAdBgNVHQ4EFgQUPARq
Vm19yGgWqEnpNT1ILIkfWhEwHwYDVR0jBBgwFoAUPARqVm19yGgWqEnpNT1ILIkf
WhEwDwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAEIsTvJhs6r2r
x1xrHKaGo4sSuywJiZqMabvC9g22Xw3Cno5qGVFYWi4k0qjX/j9DN36DyOY1rei+
kNBnOnLdtdNDltcvaLeA/9SeIhxRYOwjXpPzy9AqHpGZPui988qtptA+DI+IOLAm
mQyssYC4doDcohMXaI7KumKHojTDAPrF2INJRTF9zWgbsFjvSWU5CY5CNERWCydh
OXfzFylifScNOppioZL9VTa6At7R+MGg834kMi6WDIvtD6Ibn+pw0bV60aiMhBe8
8qgZ8lxjGOHlvQrjqdk65smhfaECJcFJxybOSA3Z1f+Y9j/p0e0hyUJM/b/NouaE
64H6vXczLQ==
-----END CERTIFICATE-----

View File

@ -0,0 +1 @@
297E3BFAD1F1F96A60A2AF0F48B092E705C0C68A

View File

@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAr2nVpfIzxsxK76Va+HaBfZ7aqjk90zipzH3/CWuMMN9wzBhg
2HPEcRreq1vKm2M/L9CZH6y6fnr68n8lW4rDATmbH0GeY4OqI9jw/mfjL4jUsAxw
Ri4Xkkk4G2nz1G81LvWLFXXAZlOxeHSZtpPh5OP1tNGiJNL4eGVxjlwFJIFwDvwe
J/tWJ7dh/FTzO0jqh8FheJTeJO64Gflqfln64WRUOPSpO7v4KmyesM/BGwGMfZjc
whs/KZT+OKXpPgYhdmAZJE24ftwWTP84DP9wnJbNqTRt0r5ud+q8EusKIjw/Pbf/
tPUF7J0bkMp4y5/+7MMuIxeZ+s2uHdp6hmwdJQIDAQABAoIBAH4sPkUTJjMEl5Iw
+nJlq1bUgKyYZ+QaiehRaLU56qjsz5G+p0qKWu6QSUIw0Fdc2AJopPunnq2DgCYV
VqW19fZXnUCqTmd+OU93qEEWMM/sODA5gji4xrOufvEZEQ3ov/R7IgPZov73jFv8
YuR1ErM1VXMuptad+aOANGIVxo0ubDEXKK/zhOfUUXQy7ZsEruJCCIpigULU159r
sVOq2lwgLz+hClFBIq0IKAKqiPWpw2GtHtU5WtAo3qZEMJkNM5SppjDmS2Wy3qN6
Gq6sXtlAmLFZAyVpXXklQK3mCaAs5gcV94nm+r++F884obaOtJ126uDdIKlL+A6k
l41DXwECgYEA4KAswbdoa18J1Ql2QtwW3+knEaUO62JH11RO5VV02uiYv4v4mHmA
prnl1jsfgbc3qfIlZWDLlNRovKCfQSj/HzOe4Hd+gEPiSYjA77PRqQeYTPXTf0Ml
IQ3j9z1CdBWNoKJ18CEiIncvjpDYkdFf3RsawcnYXklXRjmm6kIJJzUCgYEAx+oA
gm/xXK28P/CFksZzsseF5i/1MPdniyP3oY34DlEmDvl9ZA1Z52De8vojfNd9X12M
ccjiGMMGgknJqncCB+uTWYFy2pWnr9dVVxf+oirAlT1Z03AkT5gxmIZ3FUQw8VkB
HjKJYD1mpTwoSlc+DR3R0xNdl84nkUI2hxGErDECgYEAjdsZ6MyXGRfP8cYj9V1g
5M8taStAHM7YZ9hKavJo9cZmkLEoscIpySElUQHNh/HZKW5Ox5M1fiwWaOlXKaNm
WqIS99b/AKneQmomzjpVcdXmDNRCWOBilllbWkxJp13lL0jqClgiYnm6guJeotgD
HnN7ll6OUh0nDKZkDxTdCvECgYEAtlQZet2WCKz70GURrjgJNbj7ymFbAvniGekH
5PSSlJw2Vdn+Hs5+fKTBMmIpE6eF1QCBIxXQAD1/Jj0eDLbVx1t33F5P3kQ32AxQ
7UoZFtZfJr35uvnAZEeulCmvWloDOVuvxVbaLEhT4cfoB0VidpwHzrcO2XFQbQ8y
pCW6F0ECgYBbO0NU/Jlu3acIzGwAv69CMo8udwnWrhzGStZD67swdQ/yxHVpx2RH
0sNk6UfLku8Mal7Pp+RglAmsOZEjSgk1V92J9lXYjYD8IUNwNyRRCpQ8xu0KPgDM
XGeUca/Ao7jRVcsPOiqFH7wgfEjyzpO85X/K9BoBnA0EcUTOScaqmw==
-----END RSA PRIVATE KEY-----

View File

@ -0,0 +1,20 @@
-----BEGIN CERTIFICATE-----
MIIDNzCCAh+gAwIBAgIUKX47+tHx+Wpgoq8PSLCS5wXAxoowDQYJKoZIhvcNAQEL
BQAwQTEUMBIGA1UECgwLbWFjaGluZXpvbmUxFDASBgNVBAoMC0lYV2ViU29ja2V0
MRMwEQYDVQQDDAp0cnVzdGVkLWNhMB4XDTIwMDMxMjIzMDQzN1oXDTIxMDMxMjIz
MDQzN1owRTEUMBIGA1UECgwLbWFjaGluZXpvbmUxFDASBgNVBAoMC0lYV2ViU29j
a2V0MRcwFQYDVQQDDA50cnVzdGVkLWNsaWVudDCCASIwDQYJKoZIhvcNAQEBBQAD
ggEPADCCAQoCggEBALijaV0JhdoRAXnD5fX5W/9nZFb6jor6lIGW56Mdn+11ICYw
GoJ7ATnygUwfBMepoD5RfJ5pkNxYewo8N5JR+8rb4V9atJCSYQLT8P7Dm2YNMtkq
mNiRuRLrTqoPYajEzz5ENWSNnsjUB1GMGEpcCvRDgsTF24OsVV9BmLV166BEye7w
ah+jk1YYJHbEnNT4wzr4drJSGEYh2aRO72yY+ROe49Tz/GVVXfCamcj88z5hOS/+
+nCF/odLLB9Ij4xhR8WwTrwE/TxlkIQRBBPTsNetZjvMQZT+TkKw9nNjdoHiDlz9
BLOYxovUIB8OtOQQfour8V7nwZ2bL9Pp51mnmBsCAwEAAaMjMCEwHwYDVR0RBBgw
FoIJbG9jYWxob3N0ggkxMjcuMC4wLjEwDQYJKoZIhvcNAQELBQADggEBAFTus7o2
fQuSMk52qXUESVWG4ygvd2scV58zRrLxZL7Ug9p4DIJo0cY59l3Vhwn2xDSYlAFi
1h/qSEGkR2a0U2LzMK7BPSkqqYceSwnUvnwHvCwgH9aL1Rvk/4f1sFfsKegjScle
wraYsRmpidEZJYICvokHev36mX3fHaZZEU+WIoTvChgu0OtD+qkI4DECywLgtB92
/geabKC3C5JgiW0Jz8AScWoO2uKHFeuD2nfI1SiAbfMIAmG3RTanbZ8JMEVomVep
txMNGnojun923KTEScnH3cQfnkJjm2AM5yKgT6I/OHELe9Gg7R0IOJbiPmSru7/k
x5tBp3iMsZZ26VE=
-----END CERTIFICATE-----

View File

@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAuKNpXQmF2hEBecPl9flb/2dkVvqOivqUgZbnox2f7XUgJjAa
gnsBOfKBTB8Ex6mgPlF8nmmQ3Fh7Cjw3klH7ytvhX1q0kJJhAtPw/sObZg0y2SqY
2JG5EutOqg9hqMTPPkQ1ZI2eyNQHUYwYSlwK9EOCxMXbg6xVX0GYtXXroETJ7vBq
H6OTVhgkdsSc1PjDOvh2slIYRiHZpE7vbJj5E57j1PP8ZVVd8JqZyPzzPmE5L/76
cIX+h0ssH0iPjGFHxbBOvAT9PGWQhBEEE9Ow161mO8xBlP5OQrD2c2N2geIOXP0E
s5jGi9QgHw605BB+i6vxXufBnZsv0+nnWaeYGwIDAQABAoIBAFQ2XAEOLdmW9ghW
fBUjRX2I56/wGYFz5rXwYPf5tA625BHm0MCAX7/RRn20jBaQ3EBwJBmQZnzJclzp
uCLpd6E/hlxaX46s5MhIaFuaVc9G59E653mnhTUG09smptE16pwouf2BxlEsu6XK
8u0/a9Oa0xLydztoJ4wJvB/Ph8eRsbdbfL/ZAe+vk+bEp9ugyec3B5KTc+hWRneH
BRfe239OX4mEhxNoO1tPJz1hJLjJH5F/iE1wkjSzLr1SI/cSbcbnyYj/kyXmktZw
uaeFptkT6rB9GO0YunEPzzuQ4EEPpK9F63uu74dGqyW56STq26km7diAHhEpFdp1
7X0rfHECgYEA5YPtjdqKEn5pQEdehqFnzi3IIu593o7baM6qEyFpMsTP53QCjUKX
rrImyr2opfFKrXrI0IYXlDgOZApb2sKLoeP/wpfZiGSyqrzj+Y49cNRHjH643ClL
Ri5eO6TRBukAW1gQFwuVBPbcnswaU6Ah85uTxqj+hO0g18rkuVdf72MCgYEAzfHH
lb9TMf4DZEoL7GMpc4gDG9V66UWWzXyJB4CWHd6QX1vl6Ow5wE7q3fewD4SNgvDs
DHZ8oqK2OMKJH/h/tqxyu+g1huajOhPqy1TIt5ncMjS0sguQ+7bQeHASKLxHhjPC
YdqGMxOBQI5olWGq5U9Td5TYE95qk50KoIyNnekCgYEAkhMwa1tPC0w3UrjZuZga
yEetHEZsB+0mSgNWjYxzNuO6atYUFbHvdjlepSSmpM74t4bxLn5ZnXU7+4H4SjgN
xMCm9EPPKJbme/Jyqk9UXW5OB2ZT45PIm+dBBHb2ro43MuvOecxeUOWJLuw6SUUe
trwrBoJiU1nU0GMKxceNgH8CgYEAzeNMpDG9S7ply6qXVwEf3Kd6bCY1leaDR/Wb
zMtJyJzL+vmV1RHs/ownFDfeZPUgwGp5olAGdFV1FTOvAS5fB9JJdgBFGxOS1ao5
zoN5kswYLn0wtNsJXAy9R9rK3Ly2SL2QNGHSTlfOnSqB9e3JeyyeBmvgxaRTKjYS
/MTng5kCgYAIL79seoBnd9ZSp8A7QUBighxBn6DwrvLgexaysmC0zYqxbatczHk9
iFbQRmPnFHhUt4URhxyhCoTgd7F0JpxklQODNfseVwtDiDMj8Fu8Tfmn6+9GdFRv
0QEU+dR3gi98bO6G4IuAFGO9emXho3Snu6odRmh4HZVNOdLCuQe1Cw==
-----END RSA PRIVATE KEY-----

View File

@ -0,0 +1,20 @@
-----BEGIN CERTIFICATE-----
MIIDNzCCAh+gAwIBAgIUKX47+tHx+Wpgoq8PSLCS5wXAxokwDQYJKoZIhvcNAQEL
BQAwQTEUMBIGA1UECgwLbWFjaGluZXpvbmUxFDASBgNVBAoMC0lYV2ViU29ja2V0
MRMwEQYDVQQDDAp0cnVzdGVkLWNhMB4XDTIwMDMxMjIzMDQzN1oXDTIxMDMxMjIz
MDQzN1owRTEUMBIGA1UECgwLbWFjaGluZXpvbmUxFDASBgNVBAoMC0lYV2ViU29j
a2V0MRcwFQYDVQQDDA50cnVzdGVkLXNlcnZlcjCCASIwDQYJKoZIhvcNAQEBBQAD
ggEPADCCAQoCggEBAMK5XAcHJwVSor1SfoMM5H5aNfNnM4JKq8kfAOl6KlXCsgs3
bBcrJ24gEG6/goxkgLxhC1SXdbebt3Jay2lxAa9/7Uj87yztozSsctMkxXE0u3R+
ih+9sP7ctpZ1hrF2Gv+ztd49/mXe1iRLPhkPijGpPlNsfie/TYybrw3WQlGH8jUm
MnW12QUOzoBrIOCO6uIxFBJ1qiMq5mIBLlYOMj+MQubnQdvaQPNf1zaZWsCVGyTv
95roHAb/s70Ie4r4ATcubtZs/ftjvzSmJegodTprPAedkrJ/k6Od9as7hpL37605
haBU5pMyPNMWYi1MwYc9k0R0IpCKdyeX0huHfpkCAwEAAaMjMCEwHwYDVR0RBBgw
FoIJbG9jYWxob3N0ggkxMjcuMC4wLjEwDQYJKoZIhvcNAQELBQADggEBAI41ZI4Z
WbPFB1e+wIWQE7O2rJMeTEImjBOtcJEN3bqhsdE3Zqk0fPaE6jNz0Fp4IXqUYXzo
SGsgBroV6sgknuLo8HdcTLcg8p9qZ3FGFHFQD1QYINn4ykupJZE2KcrIV8BZ/Tiv
ciFrJ7i/qwpOrTRBV/w47yP3WZ3v8UdBnj5URD0v/yaAfkaReDO59Dlht/wyItQi
GkDczMqMF1GTqcLqBwZdfpHq7B/UI8sp58a6eR9lOgryYCr+QJn7TcZrYzkcSWzg
KE6VuzK6+NElvtg1hSST2Rc/RuuKzexsO/PLesVzaU/6NdDwXmpuSxeCiWm1mosA
xfQZ9fSOQG6reFk=
-----END CERTIFICATE-----

View File

@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAwrlcBwcnBVKivVJ+gwzkflo182czgkqryR8A6XoqVcKyCzds
FysnbiAQbr+CjGSAvGELVJd1t5u3clrLaXEBr3/tSPzvLO2jNKxy0yTFcTS7dH6K
H72w/ty2lnWGsXYa/7O13j3+Zd7WJEs+GQ+KMak+U2x+J79NjJuvDdZCUYfyNSYy
dbXZBQ7OgGsg4I7q4jEUEnWqIyrmYgEuVg4yP4xC5udB29pA81/XNplawJUbJO/3
mugcBv+zvQh7ivgBNy5u1mz9+2O/NKYl6Ch1Oms8B52Ssn+To531qzuGkvfvrTmF
oFTmkzI80xZiLUzBhz2TRHQikIp3J5fSG4d+mQIDAQABAoIBACFw4dwXH11rpqUq
4K0y7p7AcVl+1LrAhiYBHA/8uf6GdDs25mpIL/paqVfLrejcbbtsUxzQ8hd5N5T9
AMf371kreB27ynuFyCyInSOjwgDCFJtaC/CNjDMIxpaqUlpxtQtK2qXzMZhfH5mW
DnERWSNUNG7xR+0djnziU7rlm/gSOxUA2gS/5ik9JXAx0yoML7HWlBbk0PJ5o8Ac
sy6w/YwJVZAjXyUuYptPy2bK8WpGAsthw6RmW1fdOdDAUC3wz7TIKLuPD0AP9g7j
u8grDYtD+U3ls1Z1Grow2UUG8CedotzVE8KIhDWi35aNiGIuaMFnnLf2OO/Mgd6G
V82kkLECgYEA4sBtNsmlfFteFmHPS0s8wzg7lzLN15yifk7kO9GQcsbymXVSgU93
XnvADAflly382pyMpr7Fb6V126iOx+YQhr6ya116S4UtAKq4kCau3Im6OedKefwx
B71rST+vuAlUcv2ZcAVRJK8GtQQvwcAeI24ShPOXC+vyFAaaZ5eZo/0CgYEA29dZ
LcREVlv2Tgy/YJVnZ7EYRGiheuF0rl0d0+Stggj34fSS1cexv3gMF13HBk2HpXfW
3LfJyj2iGRZE9OUjN0ozVIVZWgZS/cwZbEUyl3o6IK0kPE4fZBaE16Onh4OGXKwr
XTG+EjmIJRVRawECbLMONj5rHQLdcy+5YIH28c0CgYEA1XSL2y2MCTsBoVRGDf0v
oB7JihYbTEN5fCnMFLu8nS/HpMqa9nvWRS19plWwvdZe13TTuwyPVACQqE1Oy8M5
/354+zUuMPWXXa9YuuqPZbCJjIS8yYSsqzqXSocXZcnyo6Uz0g5PSpcxWyorwtqW
BIhUCrA8ms5sPonQxIAj9AkCgYBd/6g7722g11VrbfvuWjOKnKhZp7tUBU6Ut2/n
iCHANgF3ddHK4sXXrobM/uX4hfH4CFOwsEzx0oSa4XC+nbL/ExT7kMDxwz59EmXU
a4oERtjP2/hgaK73ZsGKSol5Yf1zZpJsGLbCqCLUaFcVv6q/u5faDbpS/0Sc2c0T
vL5QCQKBgQCL6ySxvEb5+zst/kRxXcnefXjoB+LSYsU4zy8WfkcP4r38AAQ2Hn+F
f3/9BUX+2gNr99VDMjI+TUEf+NdQA/nFu4RbFvJ9Wpw9pXkIJpJkZ9g3Why4Ziji
h0IrXm5JCet71+EIMwP0LhKJKrXZudlzP4DYMWmA7Avqb3HIdPXd7Q==
-----END RSA PRIVATE KEY-----

View File

@ -0,0 +1,21 @@
-----BEGIN CERTIFICATE-----
MIIDZzCCAk+gAwIBAgIUBEbp5x1IlwAV6OcQ/4xHk1Y+K4UwDQYJKoZIhvcNAQEL
BQAwQzEUMBIGA1UECgwLbWFjaGluZXpvbmUxFDASBgNVBAoMC0lYV2ViU29ja2V0
MRUwEwYDVQQDDAx1bnRydXN0ZWQtY2EwHhcNMjAwMzEyMjMwNDM3WhcNMzAwMzEw
MjMwNDM3WjBDMRQwEgYDVQQKDAttYWNoaW5lem9uZTEUMBIGA1UECgwLSVhXZWJT
b2NrZXQxFTATBgNVBAMMDHVudHJ1c3RlZC1jYTCCASIwDQYJKoZIhvcNAQEBBQAD
ggEPADCCAQoCggEBAMmmYROZf/Kg46b/0Zvq4pUY8ghUEA+eYit8dLyUZ/onoW4l
xl3CK5NhJIer62Olv7QIu8WhU/hYoeE+8lLva9v0HaJgGjKmPQ3tyej319PzIc7o
uKatrQ0BAi/KReBQOoqAGqa+DBIGAHoi29x4wZ/ZGSjeVManNb58Lz3+caFlZRCW
8vcrE5J8OcpD+0O/CKM1UJDlTVFSBJS229my5WjxQnfNZeuxRnMxOCah/qaJsZZr
FdRd0th2mRZtpjM8vZfXuoUcK+XVSENuJKdqFR4hXQU5Xq62ofxz+IiToPHO24zi
S1lp7ggeIrgZXaz2I+7LmIy6gnZWP6oXE8XcyW8CAwEAAaNTMFEwHQYDVR0OBBYE
FJLe6w7SsBTwFnIQYjjjH16p/3zDMB8GA1UdIwQYMBaAFJLe6w7SsBTwFnIQYjjj
H16p/3zDMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAGrHNKNe
5UqrNPdIXlGwpabOdrhmAc9yN/tXiB386lByktIeOShS6pvD+UuV14PcTXUFGCwW
2o8I5OE/+O8w+InyWyV7qC7dgeWyEL4qDAuIYmxs71T2VOv/eekYp1Zq/o3kL3hI
f0oxonJVZXkR4p39L4TCS3z6EiWRJxWlI4LVNcvWgkwJB8w7wIxSbql0Y/EO9yoU
07u8QHVj7Nth7YteacOpj8jEy42SuWq5sdW7ccMgEfptRSYiVAmgD7mOCaELCBHz
NVqyLRPkvWqX7apqDy9vR3ZnMiHWEpTPeQqK12GJbVMW53AVEDWKiL0bhrjnY/uS
dwnpMp7fEUJLXQk=
-----END CERTIFICATE-----

View File

@ -0,0 +1 @@
5CB637D0B24622D344F4C956FE5930B22CF87221

View File

@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEAyaZhE5l/8qDjpv/Rm+rilRjyCFQQD55iK3x0vJRn+iehbiXG
XcIrk2Ekh6vrY6W/tAi7xaFT+Fih4T7yUu9r2/QdomAaMqY9De3J6PfX0/Mhzui4
pq2tDQECL8pF4FA6ioAapr4MEgYAeiLb3HjBn9kZKN5Uxqc1vnwvPf5xoWVlEJby
9ysTknw5ykP7Q78IozVQkOVNUVIElLbb2bLlaPFCd81l67FGczE4JqH+pomxlmsV
1F3S2HaZFm2mMzy9l9e6hRwr5dVIQ24kp2oVHiFdBTlerrah/HP4iJOg8c7bjOJL
WWnuCB4iuBldrPYj7suYjLqCdlY/qhcTxdzJbwIDAQABAoIBAQCWRBLRLTDoWDZs
6vODEczZOGacCDCTwv3609qV8K1u/3tPfnzMv3YDdH9pTpaxggFSIrPyeN7/EOVI
2cRwQxQIK2it6Jl9Jt4WdB1jKtW9js+hxVBcfM2ZBChh/oSFvKNzNDUoDjUmdSyD
11gpeh8ng/s4tj1Mb6wgD6CQvPxmPLsJZ3swxdSFgR5hpXXELtAK+oOlP0Y6SFpi
d5AyiaMP9imBKQV7qgJSiKWVtSAvMhfCOPaeYM9wPCA9nha6dYGC8Fgh9FklOf2+
fj+0dqmbWwa3xuEBfZ7oS+uKnzzcBvTxtNz/U8b9bPzTtoJU6Z6P3wLIB5x8DgQ3
NcDqVbtRAoGBAOnEl1hfHsm1Ni0flugvNSY5pRF9CGQjbTk2tQxEfsPc7LiNZxjF
NFyJK2wVs17bsCI4PUO9nnMjnCi86SMKj0ifVoroYlMkt4ruY9iQPTLbrJpTBF/X
LkU77s6TSeOQdzUlVPcIXfTCYwguicpIP6kOcohHplmzdurWtl723GqHAoGBANzT
1G2h8dS7UtR0GRO4u9QM8jhRFszariovI6eOEKPaVhBhPeiwwcWRq40un7koCLzU
WA5CV6h1fGQVvN8pjpZdXYUAa26jlnISQLvNgNvwD2b5UjRi4tH2QuV0LOAMiMGs
vcQtpjM12RNfii/Tdun0mYZ9pcb65T4p5VubM9vZAoGADl4i3y+ZeNRGbCeQ4txj
6+GHH7gLl/wFborKPeLH18nwUrd+KquUOEvF+3Kp/56JCNFkEpHI91Ks+mQCAEFZ
5SDF9Ourf2i2Tzevs1PKLyIJTcLkde+HzIGOf+vVksMCUKXmvvgori50X8Bcf65J
G17j8zRUKRc6q9xegR+zFGkCgYEAtA2UG3/76nSCaO/wsn/hxlh39ytG5+k2MPcW
nzvanX8cxWZEUEIu/KR1uDvXx+S4mx6YXagCSTziG8kNovgDZt7hrdxVvHRt6ryv
Q3GgK7RlGpUXTdeDEac1jFlZbaVKrH/oitidtwuk34L67VwCjWf+9gXk8YUI/dKz
TCoT8qECgYEAyUbioIZuc6iWF2oIk3VuPdWHUvhuhzvYr+gb++P/xIXraEUMI81c
UFUDOw+jVVF6H0aioD1rRUczF9vJVE1pUZrXHbAViPECr6QgZTl1mluHtxnT8Asq
7sXS69HdlV+k+P+YZ51qRXRLKZsjwSJwn8fCRWS+7HPdQ3ogIWp1Q+A=
-----END RSA PRIVATE KEY-----

View File

@ -0,0 +1,20 @@
-----BEGIN CERTIFICATE-----
MIIDOzCCAiOgAwIBAgIUXLY30LJGItNE9MlW/lkwsiz4ciEwDQYJKoZIhvcNAQEL
BQAwQzEUMBIGA1UECgwLbWFjaGluZXpvbmUxFDASBgNVBAoMC0lYV2ViU29ja2V0
MRUwEwYDVQQDDAx1bnRydXN0ZWQtY2EwHhcNMjAwMzEyMjMwNDM3WhcNMjEwMzEy
MjMwNDM3WjBHMRQwEgYDVQQKDAttYWNoaW5lem9uZTEUMBIGA1UECgwLSVhXZWJT
b2NrZXQxGTAXBgNVBAMMEHVudHJ1c3RlZC1jbGllbnQwggEiMA0GCSqGSIb3DQEB
AQUAA4IBDwAwggEKAoIBAQC5SOVG06/37lekGxkBJUt7AN3Xw708jN8XI7DR1sq+
NPeGN/wEfCUSIHJXQq1fqBJQkYKpyYa9EkvQs2RhrOXahul3ZdX1kP3zxQLvvbxU
EcB2gMS4B61EqnmBHRMsj+dI91++YSEFE1hkolD3+gQtm0+FVbPoXt5Y3rBAF/l0
UMvrBsgraB12OHUlqqj8WkUIul37u8XcnsnPWKoigWb2k+/W47LCGsd+haRnulIK
ADQOsjNs7wy3IV9d8zCifEV0YUT5ZPBg2K2f1lpYfOSobK7JLqgV03HVrkROQfej
FTvMRtDAxlsa6bHLrGUeBhCNaO7SLj16oo5nMCq4DnTjAgMBAAGjIzAhMB8GA1Ud
EQQYMBaCCWxvY2FsaG9zdIIJMTI3LjAuMC4xMA0GCSqGSIb3DQEBCwUAA4IBAQDH
7XbX6dCzUCGj91835gvTPr5FgKrTqocVQ+EtCxJxRVvqB4zj7/80SHxByyWz9XJQ
IBZmDz298nVqfW6uegq3qU29sG9OAOOg6I0SpWOL9qq/ZKMoEqRv6fHnjHhRiOwT
isqdZISh1vhoIvcUpNsm1PwpaDxerjeE3oPyuNO0P0lKI5jykO3orDANGvyC8fzx
jxlDsSXCgmcaPh99752vBe8UlY1M8t4GxsJAV8DXxdDZCYIWMe+/C5aQ2xDvj3+l
vYht9+yc6ebl5uGOttgWSYPxdryCynDKsdBfXxet9Ix/qdsLF9hwU2JokVDh50J+
er36eML3WvEO2HuBKTq8
-----END CERTIFICATE-----

View File

@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAuUjlRtOv9+5XpBsZASVLewDd18O9PIzfFyOw0dbKvjT3hjf8
BHwlEiByV0KtX6gSUJGCqcmGvRJL0LNkYazl2obpd2XV9ZD988UC7728VBHAdoDE
uAetRKp5gR0TLI/nSPdfvmEhBRNYZKJQ9/oELZtPhVWz6F7eWN6wQBf5dFDL6wbI
K2gddjh1Jaqo/FpFCLpd+7vF3J7Jz1iqIoFm9pPv1uOywhrHfoWkZ7pSCgA0DrIz
bO8MtyFfXfMwonxFdGFE+WTwYNitn9ZaWHzkqGyuyS6oFdNx1a5ETkH3oxU7zEbQ
wMZbGumxy6xlHgYQjWju0i49eqKOZzAquA504wIDAQABAoIBAHle6OG2dUShmkNj
hMOdXI5ciPV3wRRS6yhLNt6eJvzl0WbYcXu2nsn6+ytyAAPzItwoFUGHQ33C6Grz
uEPLcF3vliuiR7+ulMwEN+I3lZA0eLCntTUfwj6CtUkAdLjyIv1HHi6ljW23uGVj
dkqaOfZuEG81Lr5+toPci/PQQJYR4btVJJHCXJ6KVx6w8i++fwcRwby9riNhWAzk
8OhUiSTTsx9sioBk62QRB8Qs0LVR5tGbDSrpQW5Ns9KnH7sayInwEN94PTsPKcqY
i/oNNZG+qvSf8jG8QiIMdyGx/goVKuQVx5Gev8my5mnfuVM/oXB20T56z2iII64V
kNh/sNECgYEA3qjAPqTY0rnu2tmvQN1PwAfyENz3XmTVWpVtBvedPj3qiV3mXJGZ
qQoS0wb/2t/D05GhTxBJARk8foorNzGchMVtECMlGxDAs1vBw6dwSK5hJnw47PQQ
Q68Vz/zwvrzJgmeijPow87PdpomYECgerTa6BynH8W0ffuSNIJC8Ke8CgYEA1Qd2
FpwFjUFqhYbcvR3VG8qAMIF8RKLzmQDDZh7liKMeHLypdXRz1ZEa6NFkvsg7qRZh
ahe/ULubnRhdOxs0JVyZPS0dU3ZmHT9bBcIuLzC//5e1ictXUZspFzIHE9T4suLC
Xnh2vqQzlEy3iZLx5B6FMzc3ws7LM7q7L2AfqE0CgYAMkvEQWJTaCaAAgeyQuC7J
xGkaJLBfh0g5LlkS3Kbnne2Bxmi874gC8MuxWSLXxG01pHK8mUnWIwu0ha79FfMl
2FRZZfKxfZe0SUk++FSx9g8MclVwpDPK7rdHoJwj2Vtz3tBiL7rV+GFbB0gsGWfq
Fj4ZK3XcH3J44wVJQoMtxwKBgQDM/ZkMuKY+/yvZwaS39vUTARHJm1BRW9y85pcg
tap6iTx4urL2a1Drue4DCzu+uj9uvjKPPLrEnUNpMADG166eJTTwQXFu1wf8LPMR
34FBt8+JzBrMtfcYeA5aW7Gjy9Rljv8qmRDq8mcP1aLnp5dMxHG4jvIBa6zt4kot
lHniIQKBgQDWuMWA2Q1kcKKy7OJszp60jO+ftq306QMoDsPNFLUUUtCxNSrpAeC2
MVvI4kzIn+6hYsMdRsqDSadosuKE4ZzCPIfuyadiAKTAO5esBJs7KAQFMJXSnfY7
+Zs1QUcdLZAWivO7j3ZASbR8L/1mawlBMgyIaT9YKp1+iW+uzaYgUQ==
-----END RSA PRIVATE KEY-----

View File

@ -56,6 +56,7 @@ set (SOURCES
IXWebSocketSubProtocolTest.cpp
IXSentryClientTest.cpp
IXWebSocketChatTest.cpp
IXCobraToSentryBotTest.cpp
)
# Some unittest don't work on windows yet
@ -99,6 +100,7 @@ target_link_libraries(ixwebsocket_unittest ixwebsocket)
target_link_libraries(ixwebsocket_unittest ixcrypto)
target_link_libraries(ixwebsocket_unittest ixcore)
target_link_libraries(ixwebsocket_unittest ixsentry)
target_link_libraries(ixwebsocket_unittest ixbots)
target_link_libraries(ixwebsocket_unittest spdlog)

View File

@ -122,31 +122,32 @@ namespace
void CobraChat::subscribe(const std::string& channel)
{
std::string filter;
_conn.subscribe(channel, filter, [this](const Json::Value& msg) {
spdlog::info("receive {}", msg.toStyledString());
_conn.subscribe(
channel, filter, [this](const Json::Value& msg, const std::string& /*position*/) {
spdlog::info("receive {}", msg.toStyledString());
if (!msg.isObject()) return;
if (!msg.isMember("user")) return;
if (!msg.isMember("text")) return;
if (!msg.isMember("session")) return;
if (!msg.isObject()) return;
if (!msg.isMember("user")) return;
if (!msg.isMember("text")) return;
if (!msg.isMember("session")) return;
std::string msg_user = msg["user"].asString();
std::string msg_text = msg["text"].asString();
std::string msg_session = msg["session"].asString();
std::string msg_user = msg["user"].asString();
std::string msg_text = msg["text"].asString();
std::string msg_session = msg["session"].asString();
// We are not interested in messages
// from a different session.
if (msg_session != _session) return;
// We are not interested in messages
// from a different session.
if (msg_session != _session) return;
// We are not interested in our own messages
if (msg_user == _user) return;
// We are not interested in our own messages
if (msg_user == _user) return;
_receivedQueue.push(msg);
_receivedQueue.push(msg);
std::stringstream ss;
ss << std::endl << msg_user << " > " << msg_text << std::endl << _user << " > ";
log(ss.str());
});
std::stringstream ss;
ss << std::endl << msg_user << " > " << msg_text << std::endl << _user << " > ";
log(ss.str());
});
}
void CobraChat::sendMessage(const std::string& text)
@ -218,7 +219,7 @@ namespace
}
else if (eventType == ix::CobraConnection_EventType_Published)
{
Logger() << "Subscriber: published message acked: " << msgId;
TLogger() << "Subscriber: published message acked: " << msgId;
}
});

View File

@ -79,7 +79,7 @@ namespace
CobraConnection::MsgId msgId) {
if (eventType == ix::CobraConnection_EventType_Open)
{
Logger() << "Subscriber connected:";
TLogger() << "Subscriber connected:";
for (auto&& it : headers)
{
log("Headers " + it.first + " " + it.second);
@ -87,47 +87,48 @@ namespace
}
if (eventType == ix::CobraConnection_EventType_Error)
{
Logger() << "Subscriber error:" << errMsg;
TLogger() << "Subscriber error:" << errMsg;
}
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
log("Subscriber authenticated");
std::string filter;
conn.subscribe(CHANNEL, filter, [](const Json::Value& msg) {
log(msg.toStyledString());
conn.subscribe(
CHANNEL, filter, [](const Json::Value& msg, const std::string& /*position*/) {
log(msg.toStyledString());
std::string id = msg["id"].asString();
{
std::lock_guard<std::mutex> guard(gProtectIds);
gIds.insert(id);
}
std::string id = msg["id"].asString();
{
std::lock_guard<std::mutex> guard(gProtectIds);
gIds.insert(id);
}
gMessageCount++;
});
gMessageCount++;
});
}
else if (eventType == ix::CobraConnection_EventType_Subscribed)
{
Logger() << "Subscriber: subscribed to channel " << subscriptionId;
TLogger() << "Subscriber: subscribed to channel " << subscriptionId;
if (subscriptionId == CHANNEL)
{
gSubscriberConnectedAndSubscribed = true;
}
else
{
Logger() << "Subscriber: unexpected channel " << subscriptionId;
TLogger() << "Subscriber: unexpected channel " << subscriptionId;
}
}
else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{
Logger() << "Subscriber: ununexpected from channel " << subscriptionId;
TLogger() << "Subscriber: ununexpected from channel " << subscriptionId;
if (subscriptionId != CHANNEL)
{
Logger() << "Subscriber: unexpected channel " << subscriptionId;
TLogger() << "Subscriber: unexpected channel " << subscriptionId;
}
}
else if (eventType == ix::CobraConnection_EventType_Published)
{
Logger() << "Subscriber: published message acked: " << msgId;
TLogger() << "Subscriber: published message acked: " << msgId;
}
});

View File

@ -0,0 +1,211 @@
/*
* cmd_satori_chat.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017 Machine Zone. All rights reserved.
*/
#include "IXTest.h"
#include "catch.hpp"
#include <chrono>
#include <iostream>
#include <ixbots/IXCobraToSentryBot.h>
#include <ixcobra/IXCobraConnection.h>
#include <ixcobra/IXCobraMetricsPublisher.h>
#include <ixcrypto/IXUuid.h>
#include <ixsentry/IXSentryClient.h>
#include <ixsnake/IXRedisServer.h>
#include <ixsnake/IXSnakeServer.h>
#include <ixwebsocket/IXHttpServer.h>
#include <ixwebsocket/IXUserAgent.h>
using namespace ix;
namespace
{
std::atomic<size_t> incomingBytes(0);
std::atomic<size_t> outgoingBytes(0);
void setupTrafficTrackerCallback()
{
ix::CobraConnection::setTrafficTrackerCallback([](size_t size, bool incoming) {
if (incoming)
{
incomingBytes += size;
}
else
{
outgoingBytes += size;
}
});
}
void runPublisher(const ix::CobraConfig& config, const std::string& channel)
{
ix::CobraMetricsPublisher cobraMetricsPublisher;
SocketTLSOptions socketTLSOptions;
bool perMessageDeflate = true;
cobraMetricsPublisher.configure(config.appkey,
config.endpoint,
channel,
config.rolename,
config.rolesecret,
perMessageDeflate,
socketTLSOptions);
cobraMetricsPublisher.setSession(uuid4());
cobraMetricsPublisher.enable(true); // disabled by default, needs to be enabled to be active
Json::Value msg;
msg["fps"] = 60;
cobraMetricsPublisher.setGenericAttributes("game", "ody");
// Wait a bit
ix::msleep(500);
// publish some messages
cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #1)
cobraMetricsPublisher.push("sms_metric_B_id", msg); // (msg #2)
ix::msleep(500);
cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #3)
cobraMetricsPublisher.push("sms_metric_D_id", msg); // (msg #4)
ix::msleep(500);
cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #4)
cobraMetricsPublisher.push("sms_metric_F_id", msg); // (msg #5)
ix::msleep(500);
}
} // namespace
TEST_CASE("Cobra_to_sentry_bot", "[foo]")
{
SECTION("Exchange and count sent/received messages.")
{
int port = getFreePort();
snake::AppConfig appConfig = makeSnakeServerConfig(port);
// Start a redis server
ix::RedisServer redisServer(appConfig.redisPort);
auto res = redisServer.listen();
REQUIRE(res.first);
redisServer.start();
// Start a snake server
snake::SnakeServer snakeServer(appConfig);
snakeServer.run();
// Start a fake sentry http server
SocketTLSOptions tlsOptionsServer;
tlsOptionsServer.certFile = ".certs/trusted-server-crt.pem";
tlsOptionsServer.keyFile = ".certs/trusted-server-key.pem";
tlsOptionsServer.caFile = ".certs/trusted-ca-crt.pem";
int sentryPort = getFreePort();
ix::HttpServer sentryServer(sentryPort, "127.0.0.1");
sentryServer.setTLSOptions(tlsOptionsServer);
sentryServer.setOnConnectionCallback(
[](HttpRequestPtr request,
std::shared_ptr<ConnectionState> /*connectionState*/) -> HttpResponsePtr {
WebSocketHttpHeaders headers;
headers["Server"] = userAgent();
// Log request
std::stringstream ss;
ss << request->method << " " << request->headers["User-Agent"] << " "
<< request->uri;
if (request->method == "POST")
{
return std::make_shared<HttpResponse>(
200, "OK", HttpErrorCode::Ok, headers, std::string());
}
else
{
return std::make_shared<HttpResponse>(
405, "OK", HttpErrorCode::Invalid, headers, std::string("Invalid method"));
}
});
res = sentryServer.listen();
REQUIRE(res.first);
sentryServer.start();
setupTrafficTrackerCallback();
// Run the bot for a small amount of time
std::string channel = ix::generateSessionId();
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
std::string role = "_sub";
std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
std::stringstream ss;
ss << "ws://localhost:" << port;
std::string endpoint = ss.str();
ix::CobraConfig config;
config.endpoint = endpoint;
config.appkey = appkey;
config.rolename = role;
config.rolesecret = secret;
std::thread publisherThread(runPublisher, config, channel);
std::string filter;
bool verbose = true;
bool strict = true;
size_t maxQueueSize = 10;
bool enableHeartbeat = false;
// FIXME: try to get this working with https instead of http
// to regress the TLS 1.3 OpenSSL bug
// -> https://github.com/openssl/openssl/issues/7967
// https://xxxxx:yyyyyy@sentry.io/1234567
std::stringstream oss;
std::string scheme("http://");
oss << scheme << "xxxxxxx:yyyyyyy@localhost:" << sentryPort << "/1234567";
std::string dsn = oss.str();
SocketTLSOptions tlsOptionsClient;
tlsOptionsClient.certFile = ".certs/trusted-client-crt.pem";
tlsOptionsClient.keyFile = ".certs/trusted-client-key.pem";
tlsOptionsClient.caFile = ".certs/trusted-ca-crt.pem";
SentryClient sentryClient(dsn);
sentryClient.setTLSOptions(tlsOptionsClient);
// Only run the bot for 3 seconds
int runtime = 3;
int sentCount = cobra_to_sentry_bot(config,
channel,
filter,
sentryClient,
verbose,
strict,
maxQueueSize,
enableHeartbeat,
runtime);
//
// We want at least 2 messages to be sent
//
REQUIRE(sentCount >= 2);
// Give us 1s for all messages to be received
ix::msleep(1000);
spdlog::info("Incoming bytes {}", incomingBytes);
spdlog::info("Outgoing bytes {}", outgoingBytes);
spdlog::info("Stopping snake server...");
snakeServer.stop();
spdlog::info("Stopping redis server...");
redisServer.stop();
publisherThread.join();
sentryServer.stop();
}
}

View File

@ -29,17 +29,17 @@ namespace ix
makeCancellationRequestWithTimeout(timeoutSecs, requestInitCancellation);
bool success = socket->connect(host, port, errMsg, isCancellationRequested);
Logger() << "errMsg: " << errMsg;
TLogger() << "errMsg: " << errMsg;
REQUIRE(success);
Logger() << "Sending request: " << request << "to " << host << ":" << port;
TLogger() << "Sending request: " << request << "to " << host << ":" << port;
REQUIRE(socket->writeBytes(request, isCancellationRequested));
auto lineResult = socket->readLine(isCancellationRequested);
auto lineValid = lineResult.first;
auto line = lineResult.second;
Logger() << "read error: " << strerror(Socket::getErrno());
TLogger() << "read error: " << strerror(Socket::getErrno());
REQUIRE(lineValid);

View File

@ -24,7 +24,7 @@ namespace ix
{
std::atomic<size_t> incomingBytes(0);
std::atomic<size_t> outgoingBytes(0);
std::mutex Logger::_mutex;
std::mutex TLogger::_mutex;
std::stack<int> freePorts;
void setupWebSocketTrafficTrackerCallback()
@ -43,9 +43,9 @@ namespace ix
void reportWebSocketTraffic()
{
Logger() << incomingBytes;
Logger() << "Incoming bytes: " << incomingBytes;
Logger() << "Outgoing bytes: " << outgoingBytes;
TLogger() << incomingBytes;
TLogger() << "Incoming bytes: " << incomingBytes;
TLogger() << "Outgoing bytes: " << outgoingBytes;
}
void msleep(int ms)
@ -65,7 +65,7 @@ namespace ix
void log(const std::string& msg)
{
Logger() << msg;
TLogger() << msg;
}
void hexDump(const std::string& prefix, const std::string& s)
@ -90,17 +90,17 @@ namespace ix
[webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
Logger() << "New connection";
Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:";
TLogger() << "New connection";
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";
for (auto it : msg->openInfo.headers)
{
Logger() << it.first << ": " << it.second;
TLogger() << it.first << ": " << it.second;
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
Logger() << "Closed connection";
TLogger() << "Closed connection";
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
@ -118,7 +118,7 @@ namespace ix
auto res = server.listen();
if (!res.first)
{
Logger() << res.second;
TLogger() << res.second;
return false;
}

View File

@ -28,11 +28,11 @@ namespace ix
void setupWebSocketTrafficTrackerCallback();
void reportWebSocketTraffic();
struct Logger
struct TLogger
{
public:
template<typename T>
Logger& operator<<(T const& obj)
TLogger& operator<<(T const& obj)
{
std::lock_guard<std::mutex> lock(_mutex);

View File

@ -199,13 +199,13 @@ namespace
[webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
Logger() << "New connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:";
TLogger() << "New connection";
TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";
for (auto it : msg->openInfo.headers)
{
Logger() << it.first << ": " << it.second;
TLogger() << it.first << ": " << it.second;
}
}
else if (msg->type == ix::WebSocketMessageType::Close)

View File

@ -180,13 +180,13 @@ namespace
&mutexWrite](const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
Logger() << "New server connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:";
TLogger() << "New server connection";
TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";
for (auto it : msg->openInfo.headers)
{
Logger() << it.first << ": " << it.second;
TLogger() << it.first << ": " << it.second;
}
}
else if (msg->type == ix::WebSocketMessageType::Close)

View File

@ -23,23 +23,23 @@ namespace
[connectionState, &server](const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
Logger() << "New connection";
TLogger() << "New connection";
connectionState->computeId();
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:";
TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";
for (auto&& it : msg->openInfo.headers)
{
Logger() << it.first << ": " << it.second;
TLogger() << it.first << ": " << it.second;
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
Logger() << "Closed connection";
TLogger() << "Closed connection";
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
Logger() << "Message received: " << msg->str;
TLogger() << "Message received: " << msg->str;
for (auto&& client : server.getClients())
{
@ -52,7 +52,7 @@ namespace
auto res = server.listen();
if (!res.first)
{
Logger() << res.second;
TLogger() << res.second;
return false;
}

View File

@ -140,13 +140,13 @@ namespace
const ix::WebSocketCloseInfo& closeInfo) {
if (messageType == ix::WebSocketMessageType::Open)
{
Logger() << "New server connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:";
TLogger() << "New server connection";
TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << openInfo.uri;
TLogger() << "Headers:";
for (auto it : openInfo.headers)
{
Logger() << it.first << ": " << it.second;
TLogger() << it.first << ": " << it.second;
}
}
else if (messageType == ix::WebSocketMessageType::Close)

View File

@ -169,13 +169,13 @@ namespace
const ix::WebSocketCloseInfo& closeInfo) {
if (messageType == ix::WebSocketMessageType::Open)
{
Logger() << "New server connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:";
TLogger() << "New server connection";
TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << openInfo.uri;
TLogger() << "Headers:";
for (auto it : openInfo.headers)
{
Logger() << it.first << ": " << it.second;
TLogger() << it.first << ": " << it.second;
}
}
else if (messageType == ix::WebSocketMessageType::Close)

View File

@ -40,21 +40,21 @@ namespace ix
const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
Logger() << "New connection";
TLogger() << "New connection";
connectionState->computeId();
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:";
TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";
for (auto it : msg->openInfo.headers)
{
Logger() << it.first << ": " << it.second;
TLogger() << it.first << ": " << it.second;
}
connectionId = connectionState->getId();
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
Logger() << "Closed connection";
TLogger() << "Closed connection";
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
@ -72,7 +72,7 @@ namespace ix
auto res = server.listen();
if (!res.first)
{
Logger() << res.second;
TLogger() << res.second;
return false;
}
@ -133,7 +133,7 @@ TEST_CASE("Websocket_server", "[websocket_server]")
bool success = socket->connect(host, port, errMsg, isCancellationRequested);
REQUIRE(success);
Logger() << "writeBytes";
TLogger() << "writeBytes";
socket->writeBytes("GET /\r\n", isCancellationRequested);
auto lineResult = socket->readLine(isCancellationRequested);

View File

@ -23,13 +23,13 @@ bool startServer(ix::WebSocketServer& server, std::string& subProtocols)
const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
Logger() << "New connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:";
TLogger() << "New connection";
TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";
for (auto it : msg->openInfo.headers)
{
Logger() << it.first << ": " << it.second;
TLogger() << it.first << ": " << it.second;
}
subProtocols = msg->openInfo.headers["Sec-WebSocket-Protocol"];

View File

@ -0,0 +1,11 @@
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });
wss.on('connection', function connection(ws) {
ws.on('message', function incoming(message) {
console.log('received: %s', message);
});
ws.send('something');
});

View File

@ -0,0 +1,11 @@
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080, perMessageDeflate: true });
wss.on('connection', function connection(ws) {
ws.on('message', function incoming(message) {
console.log('received: %s', message);
});
ws.send('something');
});

View File

@ -0,0 +1,25 @@
#!/usr/bin/env python3
# websocket send client
import argparse
import asyncio
import websockets
async def send(url):
async with websockets.connect(url) as ws:
while True:
message = input('> ')
print('Sending message...')
await ws.send(message)
print('Message sent.')
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='websocket proxy.')
parser.add_argument('--url', help='Remote websocket url',
default='wss://echo.websocket.org')
args = parser.parse_args()
asyncio.get_event_loop().run_until_complete(send(args.url))

View File

@ -8,9 +8,10 @@ import websockets
async def echo(websocket, path):
msg = await websocket.recv()
print(f'Received {len(msg)} bytes')
await websocket.send(msg)
while True:
msg = await websocket.recv()
print(f'Received {len(msg)} bytes')
await websocket.send(msg)
host = os.getenv('BIND_HOST', 'localhost')
print(f'Serving on {host}:8766')

View File

@ -0,0 +1,22 @@
#!/usr/bin/env python
# WS server example
import asyncio
import os
import websockets
async def echo(websocket, path):
while True:
msg = await websocket.recv()
print(f'Received {len(msg)} bytes')
await websocket.send(msg)
host = os.getenv('BIND_HOST', 'localhost')
print(f'Serving on {host}:8766')
start_server = websockets.serve(echo, host, 8766, max_size=2 ** 30)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

View File

@ -58,8 +58,6 @@ add_executable(ws
ws_cobra_subscribe.cpp
ws_cobra_metrics_publish.cpp
ws_cobra_publish.cpp
ws_cobra_to_statsd.cpp
ws_cobra_to_sentry.cpp
ws_cobra_metrics_to_redis.cpp
ws_httpd.cpp
ws_autobahn.cpp
@ -74,6 +72,7 @@ target_link_libraries(ws ixwebsocket)
target_link_libraries(ws ixcrypto)
target_link_libraries(ws ixcore)
target_link_libraries(ws ixsentry)
target_link_libraries(ws ixbots)
target_link_libraries(ws spdlog)

111
ws/ws.cpp
View File

@ -11,7 +11,10 @@
#include <cli11/CLI11.hpp>
#include <fstream>
#include <ixbots/IXCobraToSentryBot.h>
#include <ixbots/IXCobraToStatsdBot.h>
#include <ixcore/utils/IXCoreLogger.h>
#include <ixsentry/IXSentryClient.h>
#include <ixwebsocket/IXNetSystem.h>
#include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXUserAgent.h>
@ -64,10 +67,6 @@ int main(int argc, char** argv)
std::string filter;
std::string message;
std::string password;
std::string appkey;
std::string endpoint;
std::string rolename;
std::string rolesecret;
std::string prefix("ws.test.v0");
std::string fields;
std::string dsn;
@ -81,6 +80,7 @@ int main(int argc, char** argv)
std::string project;
std::string key;
ix::SocketTLSOptions tlsOptions;
ix::CobraConfig cobraConfig;
std::string ciphers;
std::string redirectUrl;
bool headersOnly = false;
@ -88,6 +88,7 @@ int main(int argc, char** argv)
bool verbose = false;
bool save = false;
bool quiet = false;
bool fluentd = false;
bool compress = false;
bool strict = false;
bool stress = false;
@ -108,7 +109,6 @@ int main(int argc, char** argv)
int maxRedirects = 5;
int delayMs = -1;
int count = 1;
int jobs = 4;
uint32_t maxWaitBetweenReconnectionRetries;
size_t maxQueueSize = 100;
@ -127,6 +127,13 @@ int main(int argc, char** argv)
app->add_flag("--verify_none", verifyNone, "Disable peer cert verification");
};
auto addCobraConfig = [&cobraConfig](CLI::App* app) {
app->add_option("--appkey", cobraConfig.appkey, "Appkey")->required();
app->add_option("--endpoint", cobraConfig.endpoint, "Endpoint")->required();
app->add_option("--rolename", cobraConfig.rolename, "Role name")->required();
app->add_option("--rolesecret", cobraConfig.rolesecret, "Role secret")->required();
};
app.add_flag("--version", version, "Print ws version");
CLI::App* sendApp = app.add_subcommand("send", "Send a file");
@ -219,34 +226,25 @@ int main(int argc, char** argv)
redisSubscribeApp->add_option("--pidfile", pidfile, "Pid file");
CLI::App* cobraSubscribeApp = app.add_subcommand("cobra_subscribe", "Cobra subscriber");
cobraSubscribeApp->add_option("--appkey", appkey, "Appkey")->required();
cobraSubscribeApp->add_option("--endpoint", endpoint, "Endpoint")->required();
cobraSubscribeApp->add_option("--rolename", rolename, "Role name")->required();
cobraSubscribeApp->add_option("--rolesecret", rolesecret, "Role secret")->required();
cobraSubscribeApp->add_option("--channel", channel, "Channel")->required();
cobraSubscribeApp->add_option("--pidfile", pidfile, "Pid file");
cobraSubscribeApp->add_option("--filter", filter, "Stream SQL Filter");
cobraSubscribeApp->add_flag("-q", quiet, "Quiet / only display stats");
cobraSubscribeApp->add_flag("--fluentd", fluentd, "Write fluentd prefix");
addTLSOptions(cobraSubscribeApp);
addCobraConfig(cobraSubscribeApp);
CLI::App* cobraPublish = app.add_subcommand("cobra_publish", "Cobra publisher");
cobraPublish->add_option("--appkey", appkey, "Appkey")->required();
cobraPublish->add_option("--endpoint", endpoint, "Endpoint")->required();
cobraPublish->add_option("--rolename", rolename, "Role name")->required();
cobraPublish->add_option("--rolesecret", rolesecret, "Role secret")->required();
cobraPublish->add_option("--channel", channel, "Channel")->required();
cobraPublish->add_option("--pidfile", pidfile, "Pid file");
cobraPublish->add_option("path", path, "Path to the file to send")
->required()
->check(CLI::ExistingPath);
addTLSOptions(cobraPublish);
addCobraConfig(cobraPublish);
CLI::App* cobraMetricsPublish =
app.add_subcommand("cobra_metrics_publish", "Cobra metrics publisher");
cobraMetricsPublish->add_option("--appkey", appkey, "Appkey");
cobraMetricsPublish->add_option("--endpoint", endpoint, "Endpoint");
cobraMetricsPublish->add_option("--rolename", rolename, "Role name");
cobraMetricsPublish->add_option("--rolesecret", rolesecret, "Role secret");
cobraMetricsPublish->add_option("--channel", channel, "Channel")->required();
cobraMetricsPublish->add_option("--pidfile", pidfile, "Pid file");
cobraMetricsPublish->add_option("path", path, "Path to the file to send")
@ -254,12 +252,9 @@ int main(int argc, char** argv)
->check(CLI::ExistingPath);
cobraMetricsPublish->add_flag("--stress", stress, "Stress mode");
addTLSOptions(cobraMetricsPublish);
addCobraConfig(cobraMetricsPublish);
CLI::App* cobra2statsd = app.add_subcommand("cobra_to_statsd", "Cobra metrics to statsd");
cobra2statsd->add_option("--appkey", appkey, "Appkey");
cobra2statsd->add_option("--endpoint", endpoint, "Endpoint");
cobra2statsd->add_option("--rolename", rolename, "Role name");
cobra2statsd->add_option("--rolesecret", rolesecret, "Role secret");
cobra2statsd->add_option("--host", hostname, "Statsd host");
cobra2statsd->add_option("--port", statsdPort, "Statsd port");
cobra2statsd->add_option("--prefix", prefix, "Statsd prefix");
@ -269,14 +264,10 @@ int main(int argc, char** argv)
cobra2statsd->add_option("--pidfile", pidfile, "Pid file");
cobra2statsd->add_option("--filter", filter, "Stream SQL Filter");
addTLSOptions(cobra2statsd);
addCobraConfig(cobra2statsd);
CLI::App* cobra2sentry = app.add_subcommand("cobra_to_sentry", "Cobra metrics to sentry");
cobra2sentry->add_option("--appkey", appkey, "Appkey")->required();
cobra2sentry->add_option("--endpoint", endpoint, "Endpoint")->required();
cobra2sentry->add_option("--rolename", rolename, "Role name")->required();
cobra2sentry->add_option("--rolesecret", rolesecret, "Role secret")->required();
cobra2sentry->add_option("--dsn", dsn, "Sentry DSN");
cobra2sentry->add_option("--jobs", jobs, "Number of thread sending events to Sentry");
cobra2sentry->add_option("--queue_size",
maxQueueSize,
"Size of the queue to hold messages before they are sent to Sentry");
@ -286,13 +277,10 @@ int main(int argc, char** argv)
cobra2sentry->add_option("--pidfile", pidfile, "Pid file");
cobra2sentry->add_option("--filter", filter, "Stream SQL Filter");
addTLSOptions(cobra2sentry);
addCobraConfig(cobra2sentry);
CLI::App* cobra2redisApp =
app.add_subcommand("cobra_metrics_to_redis", "Cobra metrics to redis");
cobra2redisApp->add_option("--appkey", appkey, "Appkey")->required();
cobra2redisApp->add_option("--endpoint", endpoint, "Endpoint")->required();
cobra2redisApp->add_option("--rolename", rolename, "Role name")->required();
cobra2redisApp->add_option("--rolesecret", rolesecret, "Role secret")->required();
cobra2redisApp->add_option("channel", channel, "Channel")->required();
cobra2redisApp->add_option("--pidfile", pidfile, "Pid file");
cobra2redisApp->add_option("--filter", filter, "Stream SQL Filter");
@ -300,6 +288,7 @@ int main(int argc, char** argv)
cobra2redisApp->add_option("--port", redisPort, "Redis port");
cobra2redisApp->add_flag("-q", quiet, "Quiet / only display stats");
addTLSOptions(cobra2redisApp);
addCobraConfig(cobra2redisApp);
CLI::App* snakeApp = app.add_subcommand("snake", "Snake server");
snakeApp->add_option("--port", port, "Connection url");
@ -368,6 +357,10 @@ int main(int argc, char** argv)
tlsOptions.caFile = "NONE";
}
// Cobra config
cobraConfig.webSocketPerMessageDeflateOptions = ix::WebSocketPerMessageDeflateOptions(true);
cobraConfig.socketTLSOptions = tlsOptions;
int ret = 1;
if (app.got_subcommand("transfer"))
{
@ -436,60 +429,40 @@ int main(int argc, char** argv)
}
else if (app.got_subcommand("cobra_subscribe"))
{
ret = ix::ws_cobra_subscribe_main(
appkey, endpoint, rolename, rolesecret, channel, filter, quiet, tlsOptions);
ret = ix::ws_cobra_subscribe_main(cobraConfig, channel, filter, quiet, fluentd);
}
else if (app.got_subcommand("cobra_publish"))
{
ret = ix::ws_cobra_publish_main(
appkey, endpoint, rolename, rolesecret, channel, path, tlsOptions);
ret = ix::ws_cobra_publish_main(cobraConfig, channel, path);
}
else if (app.got_subcommand("cobra_metrics_publish"))
{
ret = ix::ws_cobra_metrics_publish_main(
appkey, endpoint, rolename, rolesecret, channel, path, stress, tlsOptions);
ret = ix::ws_cobra_metrics_publish_main(cobraConfig, channel, path, stress);
}
else if (app.got_subcommand("cobra_to_statsd"))
{
ret = ix::ws_cobra_to_statsd_main(appkey,
endpoint,
rolename,
rolesecret,
channel,
filter,
hostname,
statsdPort,
prefix,
fields,
verbose,
tlsOptions);
ret = ix::cobra_to_statsd_bot(
cobraConfig, channel, filter, hostname, statsdPort, prefix, fields, verbose);
}
else if (app.got_subcommand("cobra_to_sentry"))
{
ret = ix::ws_cobra_to_sentry_main(appkey,
endpoint,
rolename,
rolesecret,
channel,
filter,
dsn,
verbose,
strict,
jobs,
maxQueueSize,
tlsOptions);
bool enableHeartbeat = true;
int runtime = -1;
ix::SentryClient sentryClient(dsn);
ret = ix::cobra_to_sentry_bot(cobraConfig,
channel,
filter,
sentryClient,
verbose,
strict,
maxQueueSize,
enableHeartbeat,
runtime);
}
else if (app.got_subcommand("cobra_metrics_to_redis"))
{
ret = ix::ws_cobra_metrics_to_redis(appkey,
endpoint,
rolename,
rolesecret,
channel,
filter,
hostname,
redisPort,
tlsOptions);
ret = ix::ws_cobra_metrics_to_redis(cobraConfig, channel, filter, hostname, redisPort);
}
else if (app.got_subcommand("snake"))
{

58
ws/ws.h
View File

@ -5,6 +5,7 @@
*/
#pragma once
#include <ixcobra/IXCobraConfig.h>
#include <ixwebsocket/IXSocketTLSOptions.h>
#include <string>
@ -74,67 +75,26 @@ namespace ix
const std::string& channel,
bool verbose);
int ws_cobra_subscribe_main(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
int ws_cobra_subscribe_main(const ix::CobraConfig& config,
const std::string& channel,
const std::string& filter,
bool quiet,
const ix::SocketTLSOptions& tlsOptions);
bool fluentd);
int ws_cobra_publish_main(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
int ws_cobra_publish_main(const ix::CobraConfig& appkey,
const std::string& channel,
const std::string& path,
const ix::SocketTLSOptions& tlsOptions);
const std::string& path);
int ws_cobra_metrics_publish_main(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
int ws_cobra_metrics_publish_main(const ix::CobraConfig& config,
const std::string& channel,
const std::string& path,
bool stress,
const ix::SocketTLSOptions& tlsOptions);
bool stress);
int ws_cobra_to_statsd_main(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
const std::string& channel,
const std::string& filter,
const std::string& host,
int port,
const std::string& prefix,
const std::string& fields,
bool verbose,
const ix::SocketTLSOptions& tlsOptions);
int ws_cobra_to_sentry_main(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
const std::string& channel,
const std::string& filter,
const std::string& dsn,
bool verbose,
bool strict,
int jobs,
size_t maxQueueSize,
const ix::SocketTLSOptions& tlsOptions);
int ws_cobra_metrics_to_redis(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
int ws_cobra_metrics_to_redis(const ix::CobraConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& host,
int port,
const ix::SocketTLSOptions& tlsOptions);
int port);
int ws_snake_main(int port,
const std::string& hostname,

View File

@ -15,14 +15,10 @@
namespace ix
{
int ws_cobra_metrics_publish_main(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
int ws_cobra_metrics_publish_main(const ix::CobraConfig& config,
const std::string& channel,
const std::string& path,
bool stress,
const ix::SocketTLSOptions& tlsOptions)
bool stress)
{
std::atomic<int> sentMessages(0);
std::atomic<int> ackedMessages(0);
@ -36,8 +32,13 @@ namespace ix
cobraMetricsPublisher.enable(true);
bool enablePerMessageDeflate = true;
cobraMetricsPublisher.configure(
appkey, endpoint, channel, rolename, rolesecret, enablePerMessageDeflate, tlsOptions);
cobraMetricsPublisher.configure(config.appkey,
config.endpoint,
channel,
config.rolename,
config.rolesecret,
enablePerMessageDeflate,
config.socketTLSOptions);
while (!cobraMetricsPublisher.isAuthenticated())
;

View File

@ -17,23 +17,14 @@
namespace ix
{
int ws_cobra_metrics_to_redis(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
int ws_cobra_metrics_to_redis(const ix::CobraConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& host,
int port,
const ix::SocketTLSOptions& tlsOptions)
int port)
{
ix::CobraConnection conn;
conn.configure(appkey,
endpoint,
rolename,
rolesecret,
ix::WebSocketPerMessageDeflateOptions(true),
tlsOptions);
conn.configure(config);
conn.connect();
// Display incoming messages
@ -135,7 +126,7 @@ namespace ix
channel,
filter,
[&msgPerSeconds, &msgCount, &conditionVariableMutex, &condition, &queue](
const Json::Value& msg) {
const Json::Value& msg, const std::string& /*position*/) {
{
std::unique_lock<std::mutex> lock(conditionVariableMutex);
queue.push(msg);

View File

@ -16,13 +16,9 @@
namespace ix
{
int ws_cobra_publish_main(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
int ws_cobra_publish_main(const ix::CobraConfig& config,
const std::string& channel,
const std::string& path,
const ix::SocketTLSOptions& tlsOptions)
const std::string& path)
{
std::ifstream f(path);
std::string str((std::istreambuf_iterator<char>(f)), std::istreambuf_iterator<char>());
@ -36,12 +32,7 @@ namespace ix
}
ix::CobraConnection conn;
conn.configure(appkey,
endpoint,
rolename,
rolesecret,
ix::WebSocketPerMessageDeflateOptions(true),
tlsOptions);
conn.configure(config);
// Display incoming messages
std::atomic<bool> authenticated(false);

View File

@ -6,6 +6,7 @@
#include <atomic>
#include <chrono>
#include <iostream>
#include <ixcobra/IXCobraConnection.h>
#include <spdlog/spdlog.h>
#include <sstream>
@ -13,22 +14,38 @@
namespace ix
{
int ws_cobra_subscribe_main(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
void writeToStdout(bool fluentd,
Json::FastWriter& jsonWriter,
const Json::Value& msg,
const std::string& position)
{
Json::Value enveloppe;
if (fluentd)
{
enveloppe["producer"] = "cobra";
enveloppe["consumer"] = "fluentd";
Json::Value msgWithPosition(msg);
msgWithPosition["position"] = position;
enveloppe["message"] = msgWithPosition;
std::cout << jsonWriter.write(enveloppe);
}
else
{
enveloppe = msg;
std::cout << position << " " << jsonWriter.write(enveloppe);
}
}
int ws_cobra_subscribe_main(const ix::CobraConfig& config,
const std::string& channel,
const std::string& filter,
bool quiet,
const ix::SocketTLSOptions& tlsOptions)
bool fluentd)
{
ix::CobraConnection conn;
conn.configure(appkey,
endpoint,
rolename,
rolesecret,
ix::WebSocketPerMessageDeflateOptions(true),
tlsOptions);
conn.configure(config);
conn.connect();
Json::FastWriter jsonWriter;
@ -51,7 +68,7 @@ namespace ix
std::thread t(timer);
conn.setEventCallback(
[&conn, &channel, &jsonWriter, &filter, &msgCount, &msgPerSeconds, &quiet](
[&conn, &channel, &jsonWriter, &filter, &msgCount, &msgPerSeconds, &quiet, &fluentd](
ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
@ -69,18 +86,18 @@ namespace ix
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
spdlog::info("Subscriber authenticated");
conn.subscribe(
channel,
filter,
[&jsonWriter, &quiet, &msgPerSeconds, &msgCount](const Json::Value& msg) {
if (!quiet)
{
spdlog::info(jsonWriter.write(msg));
}
conn.subscribe(channel,
filter,
[&jsonWriter, &quiet, &msgPerSeconds, &msgCount, &fluentd](
const Json::Value& msg, const std::string& position) {
if (!quiet)
{
writeToStdout(fluentd, jsonWriter, msg, position);
}
msgPerSeconds++;
msgCount++;
});
msgPerSeconds++;
msgCount++;
});
}
else if (eventType == ix::CobraConnection_EventType_Subscribed)
{

View File

@ -30,8 +30,14 @@ namespace ix
void start();
void stop();
int getSentBytes() { return _sentBytes; }
int getReceivedBytes() { return _receivedBytes; }
int getSentBytes()
{
return _sentBytes;
}
int getReceivedBytes()
{
return _receivedBytes;
}
void sendMessage(const std::string& text);
@ -76,19 +82,16 @@ namespace ix
_webSocket.addSubProtocol(subprotocol);
}
WebSocket::setTrafficTrackerCallback(
[this](int size, bool incoming)
WebSocket::setTrafficTrackerCallback([this](int size, bool incoming) {
if (incoming)
{
if (incoming)
{
_receivedBytes += size;
}
else
{
_sentBytes += size;
}
_receivedBytes += size;
}
);
else
{
_sentBytes += size;
}
});
}
void WebSocketConnect::log(const std::string& msg)