Compare commits

...

26 Commits

Author SHA1 Message Date
332ffb0603 (cobra client) pass the message position to the subscription data callback 2020-03-13 12:49:37 -07:00
90df3d1805 (openssl tls backend) Fix a hand in OpenSSL when using TLS v1.3 ... by disabling TLS v1.3 2020-03-12 16:27:25 -07:00
bda1bb6ab4 expose a way to set tls options for a sentry client, for testing 2020-03-12 16:18:28 -07:00
d4e1f71e3c (cobra2sentry bot) take a sentry client as input instead of a dsn 2020-03-12 12:30:58 -07:00
adf6aa1d6c (cobra2sentry bot) remove the jobs option passed to ws, and only use one sentry sender 2020-03-12 12:24:25 -07:00
cb1f9f5a44 clang formatting 2020-03-12 12:15:56 -07:00
83ae105edb minor refactoring to delete files which are not needed 2020-03-12 12:13:31 -07:00
7642ccc99e (unittest) fix silly compile error with renaming of Logger to TLogger 2020-03-12 11:15:54 -07:00
cb1ec7dc96 add unittest for cobra to sentry bots 2020-03-12 09:07:01 -07:00
09b9483ddf fix casing problem with cmake filename 2020-03-11 16:04:16 -07:00
27a8ae309f build failure on Linux 2020-03-11 15:59:48 -07:00
3df7c942d7 move sentry and statsd cobra ws commands into a new ixbots folder 2020-03-11 15:55:56 -07:00
6a4d69afc5 (cobra) IXCobraConfig struct has tlsOptions and per message deflate options 2020-03-11 12:40:32 -07:00
0a11132b07 (cobra) add IXCobraConfig struct to pass cobra config around 2020-03-11 10:48:41 -07:00
cb9f0cb968 (doc) mention that OpenSSL can be used on Windows 2020-03-11 10:18:48 -07:00
b1f30bb40f (ws cobra_subscribe) add a --fluentd option to wrap a message in an enveloppe so that fluentd can recognize it 2020-03-09 15:25:43 -07:00
4ef04b8339 (websocket server) fix regression with disabling zlib extension on the server side. If a client does not support this extension the server will handle it fine. We still need to figure out how to disable the option. cc #160 2020-03-02 16:53:08 -08:00
e581f29b42 Move zlib include_directories before add_subdirectory (#159) 2020-02-28 09:30:37 -08:00
a42f115f79 compatibility: add node.js example server 2020-02-26 12:17:34 -08:00
5ce1a596cf add a python echo server that does not close the connection after each received messages 2020-02-26 12:11:31 -08:00
21db7b6c5b add a simple pytho echo client 2020-02-26 11:50:24 -08:00
e15a2900e7 (websocket) traffic tracker received bytes is message size while it should be wire size 2020-02-26 11:24:41 -08:00
140a21c8b3 (ws_connect) display sent/received bytes statistics on exit 2020-02-26 11:23:36 -08:00
6d0c568aaa update doc / fix incorrect comment about sending defaultint to binary mode 2020-02-24 16:24:32 -08:00
c96abcef1c build status github badge 2020-02-23 09:46:08 -08:00
4a9b0b9dfd (server) give thread name to some usual worker threads / unittest is broken !! 2020-02-23 09:44:58 -08:00
69 changed files with 1218 additions and 496 deletions

View File

@ -1,4 +1,4 @@
name: C/C++ CI
name: unittest
on: [push]

19
CMake/FindSpdLog.cmake Normal file
View File

@ -0,0 +1,19 @@
# Find package structure taken from libcurl
include(FindPackageHandleStandardArgs)
find_path(SPDLOG_INCLUDE_DIRS spdlog/spdlog.h)
find_library(JSONCPP_LIBRARY spdlog)
find_package_handle_standard_args(SPDLOG
FOUND_VAR
SPDLOG_FOUND
REQUIRED_VARS
SPDLOG_LIBRARY
SPDLOG_INCLUDE_DIRS
FAIL_MESSAGE
"Could NOT find spdlog"
)
set(SPDLOG_INCLUDE_DIRS ${SPDLOG_INCLUDE_DIRS})
set(SPDLOG_LIBRARIES ${SPDLOG_LIBRARY})

View File

@ -201,8 +201,8 @@ if (ZLIB_FOUND)
include_directories(${ZLIB_INCLUDE_DIRS})
target_link_libraries(ixwebsocket ${ZLIB_LIBRARIES})
else()
add_subdirectory(third_party/zlib)
include_directories(third_party/zlib ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib)
add_subdirectory(third_party/zlib)
target_link_libraries(ixwebsocket zlibstatic)
endif()
@ -230,6 +230,7 @@ if (USE_WS OR USE_TEST)
add_subdirectory(ixcobra)
add_subdirectory(ixsnake)
add_subdirectory(ixsentry)
add_subdirectory(ixbots)
add_subdirectory(third_party/spdlog spdlog)

View File

@ -1,6 +1,6 @@
## Hello world
![Build status badge](https://travis-ci.org/machinezone/IXWebSocket.svg?branch=master)
![Build status](https://github.com/machinezone/IXWebSocket/workflows/unittest/badge.svg)
IXWebSocket is a C++ library for WebSocket client and server development. It has minimal dependencies (no boost), is very simple to use and support everything you'll likely need for websocket dev (SSL, deflate compression, compiles on most platforms, etc...). HTTP client and server code is also available, but it hasn't received as much testing.

View File

@ -1,6 +1,42 @@
# Changelog
All changes to this project will be documented in this file.
## [8.2.3] - 2020-03-13
(cobra client) pass the message position to the subscription data callback
## [8.2.2] - 2020-03-12
(openssl tls backend) Fix a hand in OpenSSL when using TLS v1.3 ... by disabling TLS v1.3
## [8.2.1] - 2020-03-11
(cobra) IXCobraConfig struct has tlsOptions and per message deflate options
## [8.2.0] - 2020-03-11
(cobra) add IXCobraConfig struct to pass cobra config around
## [8.1.9] - 2020-03-09
(ws cobra_subscribe) add a --fluentd option to wrap a message in an enveloppe so that fluentd can recognize it
## [8.1.8] - 2020-03-02
(websocket server) fix regression with disabling zlib extension on the server side. If a client does not support this extension the server will handle it fine. We still need to figure out how to disable the option.
## [8.1.7] - 2020-02-26
(websocket) traffic tracker received bytes is message size while it should be wire size
## [8.1.6] - 2020-02-26
(ws_connect) display sent/received bytes statistics on exit
## [8.1.5] - 2020-02-23
(server) give thread name to some usual worker threads / unittest is broken !!
## [8.1.4] - 2020-02-22
(websocket server) fix regression from 8.1.2, where per-deflate message compression was always disabled

View File

@ -6,7 +6,7 @@ The per message deflate compression option is supported. It can lead to very nic
### TLS/SSL
Connections can be optionally secured and encrypted with TLS/SSL when using a wss:// endpoint, or using normal un-encrypted socket with ws:// endpoints. AppleSSL is used on iOS and macOS, OpenSSL is used on Android and Linux, mbedTLS is used on Windows.
Connections can be optionally secured and encrypted with TLS/SSL when using a wss:// endpoint, or using normal un-encrypted socket with ws:// endpoints. AppleSSL is used on iOS and macOS, OpenSSL and mbedTLS can be used on Android, Linux and Windows.
### Polling and background thread work

45
ixbots/CMakeLists.txt Normal file
View File

@ -0,0 +1,45 @@
#
# Author: Benjamin Sergeant
# Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
#
set (IXBOTS_SOURCES
ixbots/IXCobraToSentryBot.cpp
ixbots/IXCobraToStatsdBot.cpp
ixbots/IXQueueManager.cpp
)
set (IXBOTS_HEADERS
ixbots/IXCobraToSentryBot.h
ixbots/IXCobraToStatsdBot.h
ixbots/IXQueueManager.h
)
add_library(ixbots STATIC
${IXBOTS_SOURCES}
${IXBOTS_HEADERS}
)
find_package(JsonCpp)
if (NOT JSONCPP_FOUND)
set(JSONCPP_INCLUDE_DIRS ../third_party/jsoncpp)
endif()
find_package(SpdLog)
if (NOT SPDLOG_FOUND)
set(SPDLOG_INCLUDE_DIRS ../third_party/spdlog/include)
endif()
set(STATSD_CLIENT_INCLUDE_DIRS ../third_party/statsd-client-cpp/src)
set(IXBOTS_INCLUDE_DIRS
.
..
../ixcore
../ixcobra
../ixsentry
${JSONCPP_INCLUDE_DIRS}
${SPDLOG_INCLUDE_DIRS}
${STATSD_CLIENT_INCLUDE_DIRS})
target_include_directories( ixbots PUBLIC ${IXBOTS_INCLUDE_DIRS} )

View File

@ -1,17 +1,14 @@
/*
* ws_cobra_to_sentry.cpp
* IXCobraToSentryBot.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <atomic>
#include "IXCobraToSentryBot.h"
#include "IXQueueManager.h"
#include <chrono>
#include <condition_variable>
#include <ixcobra/IXCobraConnection.h>
#include <ixsentry/IXSentryClient.h>
#include <map>
#include <mutex>
#include <queue>
#include <spdlog/spdlog.h>
#include <sstream>
#include <thread>
@ -19,99 +16,18 @@
namespace ix
{
class QueueManager
{
public:
QueueManager(size_t maxQueueSize, std::atomic<bool>& stop)
: _maxQueueSize(maxQueueSize)
, _stop(stop)
{
}
Json::Value pop();
void add(Json::Value msg);
private:
std::map<std::string, std::queue<Json::Value>> _queues;
std::mutex _mutex;
std::condition_variable _condition;
size_t _maxQueueSize;
std::atomic<bool>& _stop;
};
Json::Value QueueManager::pop()
{
std::unique_lock<std::mutex> lock(_mutex);
if (_queues.empty())
{
Json::Value val;
return val;
}
std::vector<std::string> games;
for (auto it : _queues)
{
games.push_back(it.first);
}
std::random_shuffle(games.begin(), games.end());
std::string game = games[0];
_condition.wait(lock, [this] { return !_stop; });
if (_queues[game].empty())
{
Json::Value val;
return val;
}
auto msg = _queues[game].front();
_queues[game].pop();
return msg;
}
void QueueManager::add(Json::Value msg)
{
std::unique_lock<std::mutex> lock(_mutex);
std::string game;
if (msg.isMember("device") && msg["device"].isMember("game"))
{
game = msg["device"]["game"].asString();
}
if (game.empty()) return;
// if the sending is not fast enough there is no point
// in queuing too many events.
if (_queues[game].size() < _maxQueueSize)
{
_queues[game].push(msg);
_condition.notify_one();
}
}
int ws_cobra_to_sentry_main(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
const std::string& channel,
const std::string& filter,
const std::string& dsn,
bool verbose,
bool strict,
int jobs,
size_t maxQueueSize,
const ix::SocketTLSOptions& tlsOptions)
int cobra_to_sentry_bot(const CobraConfig& config,
const std::string& channel,
const std::string& filter,
SentryClient& sentryClient,
bool verbose,
bool strict,
size_t maxQueueSize,
bool enableHeartbeat,
int runtime)
{
ix::CobraConnection conn;
conn.configure(appkey,
endpoint,
rolename,
rolesecret,
ix::WebSocketPerMessageDeflateOptions(true),
tlsOptions);
conn.configure(config);
conn.connect();
Json::FastWriter jsonWriter;
@ -123,22 +39,26 @@ namespace ix
QueueManager queueManager(maxQueueSize, stop);
auto timer = [&sentCount, &receivedCount] {
while (true)
auto timer = [&sentCount, &receivedCount, &stop] {
while (!stop)
{
spdlog::info("messages received {} sent {}", receivedCount, sentCount);
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
spdlog::info("timer thread done");
};
std::thread t1(timer);
auto heartbeat = [&sentCount, &receivedCount] {
auto heartbeat = [&sentCount, &receivedCount, &stop, &enableHeartbeat] {
std::string state("na");
while (true)
if (!enableHeartbeat) return;
while (!stop)
{
std::stringstream ss;
ss << "messages received " << receivedCount;
@ -156,20 +76,21 @@ namespace ix
auto duration = std::chrono::minutes(1);
std::this_thread::sleep_for(duration);
}
spdlog::info("heartbeat thread done");
};
std::thread t2(heartbeat);
auto sentrySender =
[&queueManager, verbose, &errorSending, &sentCount, &stop, &throttled, &dsn] {
SentryClient sentryClient(dsn);
[&queueManager, verbose, &errorSending, &sentCount, &stop, &throttled, &sentryClient] {
while (true)
{
Json::Value msg = queueManager.pop();
if (stop) break;
if (msg.isNull()) continue;
if (stop) return;
auto ret = sentryClient.send(msg, verbose);
HttpResponsePtr response = ret.first;
@ -241,17 +162,13 @@ namespace ix
++sentCount;
}
if (stop) return;
if (stop) break;
}
spdlog::info("sentrySender thread done");
};
// Create a thread pool
spdlog::info("Starting {} sentry sender jobs", jobs);
std::vector<std::thread> pool;
for (int i = 0; i < jobs; i++)
{
pool.push_back(std::thread(sentrySender));
}
std::thread t3(sentrySender);
conn.setEventCallback([&conn,
&channel,
@ -284,10 +201,10 @@ namespace ix
conn.subscribe(channel,
filter,
[&jsonWriter, verbose, &throttled, &receivedCount, &queueManager](
const Json::Value& msg) {
const Json::Value& msg, const std::string& position) {
if (verbose)
{
spdlog::info(jsonWriter.write(msg));
spdlog::info("Subscriber received message {} -> {}", position, jsonWriter.write(msg));
}
// If we cannot send to sentry fast enough, drop the message
@ -322,24 +239,42 @@ namespace ix
}
});
while (true)
// Run forever
if (runtime == -1)
{
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
while (true)
{
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
if (strict && errorSending) break;
if (strict && errorSending) break;
}
}
// Run for a duration, used by unittesting now
else
{
for (int i = 0 ; i < runtime; ++i)
{
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
if (strict && errorSending) break;
}
}
conn.disconnect();
//
// Cleanup.
// join all the bg threads and stop them.
//
conn.disconnect();
stop = true;
for (int i = 0; i < jobs; i++)
{
spdlog::error("joining thread {}", i);
pool[i].join();
}
return (strict && errorSending) ? 1 : 0;
t1.join();
if (t2.joinable()) t2.join();
spdlog::info("heartbeat thread done");
t3.join();
return (strict && errorSending) ? -1 : (int) sentCount;
}
} // namespace ix

View File

@ -0,0 +1,23 @@
/*
* IXCobraToSentryBot.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <ixcobra/IXCobraConfig.h>
#include <ixsentry/IXSentryClient.h>
#include <string>
namespace ix
{
int cobra_to_sentry_bot(const CobraConfig& config,
const std::string& channel,
const std::string& filter,
SentryClient& sentryClient,
bool verbose,
bool strict,
size_t maxQueueSize,
bool enableHeartbeat,
int runtime);
} // namespace ix

View File

@ -1,9 +1,12 @@
/*
* ws_cobra_to_statsd.cpp
* IXCobraToStatsdBot.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXCobraToStatsdBot.h"
#include "IXQueueManager.h"
#include <atomic>
#include <chrono>
#include <condition_variable>
@ -17,59 +20,6 @@
#include <statsd_client.h>
#endif
namespace
{
class QueueManager
{
public:
QueueManager(size_t maxQueueSize, std::atomic<bool>& stop)
: _maxQueueSize(maxQueueSize)
, _stop(stop)
{
}
Json::Value pop();
void add(Json::Value msg);
private:
std::queue<Json::Value> _queue;
std::mutex _mutex;
std::condition_variable _condition;
size_t _maxQueueSize;
std::atomic<bool>& _stop;
};
Json::Value QueueManager::pop()
{
std::unique_lock<std::mutex> lock(_mutex);
if (_queue.empty())
{
Json::Value val;
return val;
}
_condition.wait(lock, [this] { return !_stop; });
auto msg = _queue.front();
_queue.pop();
return msg;
}
void QueueManager::add(Json::Value msg)
{
std::unique_lock<std::mutex> lock(_mutex);
// if the sending is not fast enough there is no point
// in queuing too many events.
if (_queue.size() < _maxQueueSize)
{
_queue.push(msg);
_condition.notify_one();
}
}
} // namespace
namespace ix
{
// fields are command line argument that can be specified multiple times
@ -109,26 +59,17 @@ namespace ix
return val.asString();
}
int ws_cobra_to_statsd_main(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
const std::string& channel,
const std::string& filter,
const std::string& host,
int port,
const std::string& prefix,
const std::string& fields,
bool verbose,
const ix::SocketTLSOptions& tlsOptions)
int cobra_to_statsd_bot(const ix::CobraConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& host,
int port,
const std::string& prefix,
const std::string& fields,
bool verbose)
{
ix::CobraConnection conn;
conn.configure(appkey,
endpoint,
rolename,
rolesecret,
ix::WebSocketPerMessageDeflateOptions(true),
tlsOptions);
conn.configure(config);
conn.connect();
auto tokens = parseFields(fields);
@ -237,10 +178,10 @@ namespace ix
conn.subscribe(channel,
filter,
[&jsonWriter, &queueManager, verbose, &receivedCount](
const Json::Value& msg) {
const Json::Value& msg, const std::string& position) {
if (verbose)
{
spdlog::info(jsonWriter.write(msg));
spdlog::info("Subscriber received message {} -> {}", position, jsonWriter.write(msg));
}
receivedCount++;

View File

@ -0,0 +1,22 @@
/*
* IXCobraToStatsdBot.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <ixcobra/IXCobraConfig.h>
#include <string>
#include <stddef.h>
namespace ix
{
int cobra_to_statsd_bot(const ix::CobraConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& host,
int port,
const std::string& prefix,
const std::string& fields,
bool verbose);
} // namespace ix

View File

@ -0,0 +1,66 @@
/*
* IXQueueManager.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXQueueManager.h"
#include <vector>
#include <algorithm>
namespace ix
{
Json::Value QueueManager::pop()
{
std::unique_lock<std::mutex> lock(_mutex);
if (_queues.empty())
{
Json::Value val;
return val;
}
std::vector<std::string> games;
for (auto it : _queues)
{
games.push_back(it.first);
}
std::random_shuffle(games.begin(), games.end());
std::string game = games[0];
auto duration = std::chrono::seconds(1);
_condition.wait_for(lock, duration);
if (_queues[game].empty())
{
Json::Value val;
return val;
}
auto msg = _queues[game].front();
_queues[game].pop();
return msg;
}
void QueueManager::add(Json::Value msg)
{
std::unique_lock<std::mutex> lock(_mutex);
std::string game;
if (msg.isMember("device") && msg["device"].isMember("game"))
{
game = msg["device"]["game"].asString();
}
if (game.empty()) return;
// if the sending is not fast enough there is no point
// in queuing too many events.
if (_queues[game].size() < _maxQueueSize)
{
_queues[game].push(msg);
_condition.notify_one();
}
}
}

View File

@ -0,0 +1,38 @@
/*
* IXQueueManager.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <stddef.h>
#include <atomic>
#include <json/json.h>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <map>
namespace ix
{
class QueueManager
{
public:
QueueManager(size_t maxQueueSize, std::atomic<bool>& stop)
: _maxQueueSize(maxQueueSize)
, _stop(stop)
{
}
Json::Value pop();
void add(Json::Value msg);
private:
std::map<std::string, std::queue<Json::Value>> _queues;
std::mutex _mutex;
std::condition_variable _condition;
size_t _maxQueueSize;
std::atomic<bool>& _stop;
};
}

View File

@ -13,6 +13,7 @@ set (IXCOBRA_HEADERS
ixcobra/IXCobraConnection.h
ixcobra/IXCobraMetricsThreadedPublisher.h
ixcobra/IXCobraMetricsPublisher.h
ixcobra/IXCobraConfig.h
)
add_library(ixcobra STATIC

View File

@ -0,0 +1,35 @@
/*
* IXCobraConfig.h
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <ixwebsocket/IXWebSocketPerMessageDeflateOptions.h>
#include <ixwebsocket/IXSocketTLSOptions.h>
namespace ix
{
struct CobraConfig
{
std::string appkey;
std::string endpoint;
std::string rolename;
std::string rolesecret;
WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions;
SocketTLSOptions socketTLSOptions;
CobraConfig(const std::string& a = std::string(),
const std::string& e = std::string(),
const std::string& r = std::string(),
const std::string& s = std::string())
: appkey(a)
, endpoint(e)
, rolename(r)
, rolesecret(s)
{
;
}
};
} // namespace ix

View File

@ -276,6 +276,16 @@ namespace ix
_webSocket->setPingTimeout(3 * kPingIntervalSecs);
}
void CobraConnection::configure(const ix::CobraConfig& config)
{
configure(config.appkey,
config.endpoint,
config.rolename,
config.rolesecret,
config.webSocketPerMessageDeflateOptions,
config.socketTLSOptions);
}
//
// Handshake message schema.
//
@ -431,9 +441,12 @@ namespace ix
if (!body.isMember("messages")) return false;
Json::Value messages = body["messages"];
if (!body.isMember("position")) return false;
std::string position = body["position"].asString();
for (auto&& msg : messages)
{
cb->second(msg);
cb->second(msg, position);
}
return true;

View File

@ -17,6 +17,8 @@
#include <unordered_map>
#include <limits>
#include "IXCobraConfig.h"
namespace ix
{
class WebSocket;
@ -40,7 +42,7 @@ namespace ix
CobraConnection_PublishMode_Batch = 1
};
using SubscriptionCallback = std::function<void(const Json::Value&)>;
using SubscriptionCallback = std::function<void(const Json::Value&, const std::string&)>;
using EventCallback = std::function<void(CobraConnectionEventType,
const std::string&,
const WebSocketHttpHeaders&,
@ -67,6 +69,8 @@ namespace ix
const WebSocketPerMessageDeflateOptions& webSocketPerMessageDeflateOptions,
const SocketTLSOptions& socketTLSOptions);
void configure(const ix::CobraConfig& config);
/// Set the traffic tracker callback
static void setTrafficTrackerCallback(const TrafficTrackerCallback& callback);

View File

@ -40,6 +40,11 @@ namespace ix
}
}
void SentryClient::setTLSOptions(const SocketTLSOptions& tlsOptions)
{
_httpClient->setTLSOptions(tlsOptions);
}
int64_t SentryClient::getTimestamp()
{
const auto tp = std::chrono::system_clock::now();

View File

@ -8,6 +8,7 @@
#include <algorithm>
#include <ixwebsocket/IXHttpClient.h>
#include <ixwebsocket/IXSocketTLSOptions.h>
#include <json/json.h>
#include <regex>
#include <memory>
@ -24,6 +25,9 @@ namespace ix
Json::Value parseLuaStackTrace(const std::string& stack);
// Mostly for testing
void setTLSOptions(const SocketTLSOptions& tlsOptions);
void uploadMinidump(
const std::string& sentryMetadata,
const std::string& minidumpBytes,

View File

@ -189,7 +189,7 @@ namespace snake
nlohmann::json response = {
{"action", "rtm/subscription/data"},
{"id", id++},
{"body", {{"subscription_id", subscriptionId}, {"messages", {msg}}}}};
{"body", {{"subscription_id", subscriptionId}, {"position", "0-0"}, {"messages", {msg}}}}};
ws->sendText(response.dump());
};

View File

@ -131,8 +131,14 @@ namespace ix
SSL_CTX_set_mode(ctx,
SSL_MODE_ENABLE_PARTIAL_WRITE | SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER);
SSL_CTX_set_options(
ctx, SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3 | SSL_OP_CIPHER_SERVER_PREFERENCE);
int options = SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3 | SSL_OP_CIPHER_SERVER_PREFERENCE;
#ifdef SSL_OP_NO_TLSv1_3
// (partially?) work around hang in openssl 1.1.1b, by disabling TLS V1.3
// https://github.com/openssl/openssl/issues/7967
options |= SSL_OP_NO_TLSv1_3;
#endif
SSL_CTX_set_options(ctx, options);
}
return ctx;
}

View File

@ -7,6 +7,7 @@
#include "IXSocketServer.h"
#include "IXNetSystem.h"
#include "IXSetThreadName.h"
#include "IXSocket.h"
#include "IXSocketConnect.h"
#include "IXSocketFactory.h"
@ -247,6 +248,8 @@ namespace ix
// Set the socket to non blocking mode, so that accept calls are not blocking
SocketConnect::configure(_serverFd);
setThreadName("SocketServer::listen");
for (;;)
{
if (_stop) return;
@ -347,6 +350,8 @@ namespace ix
void SocketServer::runGC()
{
setThreadName("SocketServer::GC");
for (;;)
{
// Garbage collection to shutdown/join threads for closed connections.

View File

@ -412,7 +412,7 @@ namespace ix
WebSocketCloseInfo(),
binary));
WebSocket::invokeTrafficTrackerCallback(msg.size(), true);
WebSocket::invokeTrafficTrackerCallback(wireSize, true);
});
}
}

View File

@ -72,7 +72,7 @@ namespace ix
WebSocketInitResult connect(int timeoutSecs);
void run();
// send is in binary mode by default
// send is in text mode by default
WebSocketSendInfo send(const std::string& data,
bool binary = false,
const OnProgressCallback& onProgressCallback = nullptr);

View File

@ -337,8 +337,7 @@ namespace ix
WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(header);
// If the client has requested that extension,
// and the server does not prevent it, enable it.
if (_enablePerMessageDeflate && webSocketPerMessageDeflateOptions.enabled())
if (webSocketPerMessageDeflateOptions.enabled())
{
_enablePerMessageDeflate = true;

View File

@ -7,6 +7,7 @@
#include "IXWebSocketServer.h"
#include "IXNetSystem.h"
#include "IXSetThreadName.h"
#include "IXSocketConnect.h"
#include "IXWebSocket.h"
#include "IXWebSocketTransport.h"
@ -73,6 +74,8 @@ namespace ix
void WebSocketServer::handleConnection(std::shared_ptr<Socket> socket,
std::shared_ptr<ConnectionState> connectionState)
{
setThreadName("WebSocketServer::" + connectionState->getId());
auto webSocket = std::make_shared<WebSocket>();
_onConnectionCallback(webSocket, connectionState);
@ -87,15 +90,6 @@ namespace ix
webSocket->disablePong();
}
if (_enablePerMessageDeflate)
{
webSocket->enablePerMessageDeflate();
}
else
{
webSocket->disablePerMessageDeflate();
}
// Add this client to our client set
{
std::lock_guard<std::mutex> lock(_clientsMutex);

View File

@ -6,4 +6,4 @@
#pragma once
#define IX_WEBSOCKET_VERSION "8.1.4"
#define IX_WEBSOCKET_VERSION "8.2.3"

View File

@ -0,0 +1,20 @@
-----BEGIN CERTIFICATE-----
MIIDQTCCAimgAwIBAgIUNJBwOQdDle1TI/MHGd+cSpxIllwwDQYJKoZIhvcNAQEL
BQAwSDEUMBIGA1UECgwLbWFjaGluZXpvbmUxFDASBgNVBAoMC0lYV2ViU29ja2V0
MRowGAYDVQQDDBFzZWxmc2lnbmVkLWNsaWVudDAeFw0yMDAzMTIyMzA0MzdaFw0y
MTAzMTIyMzA0MzdaMEgxFDASBgNVBAoMC21hY2hpbmV6b25lMRQwEgYDVQQKDAtJ
WFdlYlNvY2tldDEaMBgGA1UEAwwRc2VsZnNpZ25lZC1jbGllbnQwggEiMA0GCSqG
SIb3DQEBAQUAA4IBDwAwggEKAoIBAQC7q6W0f5vRSHaNOuM1VQpY0rC0a5u04J5Z
nssUD1QfgilY1UEaaR/4K6ILE4oClqeDsQy/7+04Wt6i/ttceB/k1Jk6n0kgdtvA
CsX1H+nA7JL7ANBZvQ6W2E1mwJieTDSVDgL4YB9qzJQu3PdwZJgm5GTlVK66DMr1
IH2EYwu73M/ZwOzfgyd7m0TcgkRV8OHiD1dVDERNQR9gzDUsBtCoWPmzXxgPMOSE
Oq1sEhNC0bPaG3zTDvCv0t4Hti33po/U8PZwOtz2b8StSjS5BnvEDnksAtEZuNEu
4B3KJN4Oxrtgh7DYdiF7S9Gh0dN6yqtRfDWkGyC9WkyoqpFKCM4fAgMBAAGjIzAh
MB8GA1UdEQQYMBaCCWxvY2FsaG9zdIIJMTI3LjAuMC4xMA0GCSqGSIb3DQEBCwUA
A4IBAQB4oIutDYbCRfsyWRAiAY+D9rhYsJYlsQjyml1q2+pCv7BJ1kWsKk7m2VMX
Tl6CM+PI0zXPpLN6Ot79jf/jxEbDMvqrBgGpYfddvLhyTFnzIZpG8d63RvzPADF6
lV3x34eZf/EdtrWgZAHK+5oZjtzePGHwKDFIPva9nvJXYIxNwKYWGRX8HSm0OZi2
FQiaOt6WYLo7ZdefNPS9nugFRM6hfztJe6WvvglKm+BTnHbCSKj5xRuT9iA80+jX
Zij7po8opY3S+zEZ0eNUCHxMBQ+2Jdq3HxggJ2cFQVRHdvKfwzmavVeGgni75d16
+xFD5nS3g3eIEME+lZ8c8GbL0AJ4
-----END CERTIFICATE-----

View File

@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAu6ultH+b0Uh2jTrjNVUKWNKwtGubtOCeWZ7LFA9UH4IpWNVB
Gmkf+CuiCxOKApang7EMv+/tOFreov7bXHgf5NSZOp9JIHbbwArF9R/pwOyS+wDQ
Wb0OlthNZsCYnkw0lQ4C+GAfasyULtz3cGSYJuRk5VSuugzK9SB9hGMLu9zP2cDs
34Mne5tE3IJEVfDh4g9XVQxETUEfYMw1LAbQqFj5s18YDzDkhDqtbBITQtGz2ht8
0w7wr9LeB7Yt96aP1PD2cDrc9m/ErUo0uQZ7xA55LALRGbjRLuAdyiTeDsa7YIew
2HYhe0vRodHTesqrUXw1pBsgvVpMqKqRSgjOHwIDAQABAoIBAQC2q7IESj2x7TWv
7ITiEZ+bq6DiTOfnnMeldkI3iWAZt0lltVXETlUW6+mznFY2hMwTDE/bt78Qnqqc
vzNoA2kQBLwNaqP0XJ0zhYkAOwr9hYjflwA2iSZdP7e/b3JeitCX0WakunN6Mh1+
rAiRtui+2os3CkF0yST4iqKCLSJrvSvvK5fU92aKEaE3k9kznBljvVOJIIRQBUPz
G8tvtPgpLALrT7XMnGfaCyGS8c1IbFMm84KTxAlVV0bnuGgeYQ2VupqUmZpJjcJ0
B08hr7vPfxz3UXSOKwYY8TRfmF3X370ky5Ov2I9ddg27V1QoeRTWlL7VMxRtiSer
hoM5SPKpAoGBAO0vBd1Z6425wGT0PClUbJAVm2OBYMDnl2RmhBK5TAxrKjs9ag08
65jfVCMD8wDMDhEbvbmzkgRa9BC6AY97JBmyr4m9oGfA7oenuou+a9LYAKqtO0ts
hxHf2LnpC1HCyh4+l5gohjlUG7gSVu/oBhNTJNKmqUKuQ8v1b6My/JR9AoGBAMqP
DugL9DusECncKHQbaIEzvEBe+QErcUxXxq+G4LLvFTZVvthHbrZ/0cxm5Ve6rfd2
krqjYFA3WPOuTcKEUouNeRK2A4V6PbnSdpf0kagN6KbEjK66ZSZs8wnWitghqo7J
n2IHcSDEEACTyjS7K8HjPx0fQGU1tzkG/7/xs3vLAoGAI61JEoyuE/l26TibvBPI
6Lt3TjZt2VZ8vUt2XmKk/9E23wZT533canhdbY7whJQtIYGsvjw2oJUV1VZFWdHK
EluAcBWoBTNOLfWa595S1bpMD2BTZPsELjofnYdifn/wazA7GVYvKnxuVvfbP+cE
0u9UwKL1HuSbqhhXHJNUzvkCgYAeFRLsqWHTPuGDpfuoCq4BijJqCPDIGLCR2vNZ
/BkA2fr3f9KBAlLR7be1uI5U8heGCekOqNbT8vRV9Ev+GHK94PvbKIbrWtUx9KzC
MoMzRyWHJueRx4LgKwwJKQCjypQu8oimIV7Os++AdnJwVF/SQrKL26lPnqOgZ4ax
9e5m8wKBgQCF626EmJk34+WTGEa5gdTx567Y+1EAbag+7fQSskwiRPvRN2fcg3H8
ynUAtIgWbrecgKhblXxc7zwJrl41P71uQzCFspgvOPXMxL2xqN+tnTfuz84OXk26
h1xSdS3e+JYsWUIxqbH1W59S+dC7KtklBAcUxb8DNpDoVjVBeAEqzw==
-----END RSA PRIVATE KEY-----

View File

@ -0,0 +1,21 @@
-----BEGIN CERTIFICATE-----
MIIDYzCCAkugAwIBAgIUD0V1mxoZF9TNpsoyvuHU2zhrg0wwDQYJKoZIhvcNAQEL
BQAwQTEUMBIGA1UECgwLbWFjaGluZXpvbmUxFDASBgNVBAoMC0lYV2ViU29ja2V0
MRMwEQYDVQQDDAp0cnVzdGVkLWNhMB4XDTIwMDMxMjIzMDQzN1oXDTMwMDMxMDIz
MDQzN1owQTEUMBIGA1UECgwLbWFjaGluZXpvbmUxFDASBgNVBAoMC0lYV2ViU29j
a2V0MRMwEQYDVQQDDAp0cnVzdGVkLWNhMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A
MIIBCgKCAQEAr2nVpfIzxsxK76Va+HaBfZ7aqjk90zipzH3/CWuMMN9wzBhg2HPE
cRreq1vKm2M/L9CZH6y6fnr68n8lW4rDATmbH0GeY4OqI9jw/mfjL4jUsAxwRi4X
kkk4G2nz1G81LvWLFXXAZlOxeHSZtpPh5OP1tNGiJNL4eGVxjlwFJIFwDvweJ/tW
J7dh/FTzO0jqh8FheJTeJO64Gflqfln64WRUOPSpO7v4KmyesM/BGwGMfZjcwhs/
KZT+OKXpPgYhdmAZJE24ftwWTP84DP9wnJbNqTRt0r5ud+q8EusKIjw/Pbf/tPUF
7J0bkMp4y5/+7MMuIxeZ+s2uHdp6hmwdJQIDAQABo1MwUTAdBgNVHQ4EFgQUPARq
Vm19yGgWqEnpNT1ILIkfWhEwHwYDVR0jBBgwFoAUPARqVm19yGgWqEnpNT1ILIkf
WhEwDwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAEIsTvJhs6r2r
x1xrHKaGo4sSuywJiZqMabvC9g22Xw3Cno5qGVFYWi4k0qjX/j9DN36DyOY1rei+
kNBnOnLdtdNDltcvaLeA/9SeIhxRYOwjXpPzy9AqHpGZPui988qtptA+DI+IOLAm
mQyssYC4doDcohMXaI7KumKHojTDAPrF2INJRTF9zWgbsFjvSWU5CY5CNERWCydh
OXfzFylifScNOppioZL9VTa6At7R+MGg834kMi6WDIvtD6Ibn+pw0bV60aiMhBe8
8qgZ8lxjGOHlvQrjqdk65smhfaECJcFJxybOSA3Z1f+Y9j/p0e0hyUJM/b/NouaE
64H6vXczLQ==
-----END CERTIFICATE-----

View File

@ -0,0 +1 @@
297E3BFAD1F1F96A60A2AF0F48B092E705C0C68A

View File

@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAr2nVpfIzxsxK76Va+HaBfZ7aqjk90zipzH3/CWuMMN9wzBhg
2HPEcRreq1vKm2M/L9CZH6y6fnr68n8lW4rDATmbH0GeY4OqI9jw/mfjL4jUsAxw
Ri4Xkkk4G2nz1G81LvWLFXXAZlOxeHSZtpPh5OP1tNGiJNL4eGVxjlwFJIFwDvwe
J/tWJ7dh/FTzO0jqh8FheJTeJO64Gflqfln64WRUOPSpO7v4KmyesM/BGwGMfZjc
whs/KZT+OKXpPgYhdmAZJE24ftwWTP84DP9wnJbNqTRt0r5ud+q8EusKIjw/Pbf/
tPUF7J0bkMp4y5/+7MMuIxeZ+s2uHdp6hmwdJQIDAQABAoIBAH4sPkUTJjMEl5Iw
+nJlq1bUgKyYZ+QaiehRaLU56qjsz5G+p0qKWu6QSUIw0Fdc2AJopPunnq2DgCYV
VqW19fZXnUCqTmd+OU93qEEWMM/sODA5gji4xrOufvEZEQ3ov/R7IgPZov73jFv8
YuR1ErM1VXMuptad+aOANGIVxo0ubDEXKK/zhOfUUXQy7ZsEruJCCIpigULU159r
sVOq2lwgLz+hClFBIq0IKAKqiPWpw2GtHtU5WtAo3qZEMJkNM5SppjDmS2Wy3qN6
Gq6sXtlAmLFZAyVpXXklQK3mCaAs5gcV94nm+r++F884obaOtJ126uDdIKlL+A6k
l41DXwECgYEA4KAswbdoa18J1Ql2QtwW3+knEaUO62JH11RO5VV02uiYv4v4mHmA
prnl1jsfgbc3qfIlZWDLlNRovKCfQSj/HzOe4Hd+gEPiSYjA77PRqQeYTPXTf0Ml
IQ3j9z1CdBWNoKJ18CEiIncvjpDYkdFf3RsawcnYXklXRjmm6kIJJzUCgYEAx+oA
gm/xXK28P/CFksZzsseF5i/1MPdniyP3oY34DlEmDvl9ZA1Z52De8vojfNd9X12M
ccjiGMMGgknJqncCB+uTWYFy2pWnr9dVVxf+oirAlT1Z03AkT5gxmIZ3FUQw8VkB
HjKJYD1mpTwoSlc+DR3R0xNdl84nkUI2hxGErDECgYEAjdsZ6MyXGRfP8cYj9V1g
5M8taStAHM7YZ9hKavJo9cZmkLEoscIpySElUQHNh/HZKW5Ox5M1fiwWaOlXKaNm
WqIS99b/AKneQmomzjpVcdXmDNRCWOBilllbWkxJp13lL0jqClgiYnm6guJeotgD
HnN7ll6OUh0nDKZkDxTdCvECgYEAtlQZet2WCKz70GURrjgJNbj7ymFbAvniGekH
5PSSlJw2Vdn+Hs5+fKTBMmIpE6eF1QCBIxXQAD1/Jj0eDLbVx1t33F5P3kQ32AxQ
7UoZFtZfJr35uvnAZEeulCmvWloDOVuvxVbaLEhT4cfoB0VidpwHzrcO2XFQbQ8y
pCW6F0ECgYBbO0NU/Jlu3acIzGwAv69CMo8udwnWrhzGStZD67swdQ/yxHVpx2RH
0sNk6UfLku8Mal7Pp+RglAmsOZEjSgk1V92J9lXYjYD8IUNwNyRRCpQ8xu0KPgDM
XGeUca/Ao7jRVcsPOiqFH7wgfEjyzpO85X/K9BoBnA0EcUTOScaqmw==
-----END RSA PRIVATE KEY-----

View File

@ -0,0 +1,20 @@
-----BEGIN CERTIFICATE-----
MIIDNzCCAh+gAwIBAgIUKX47+tHx+Wpgoq8PSLCS5wXAxoowDQYJKoZIhvcNAQEL
BQAwQTEUMBIGA1UECgwLbWFjaGluZXpvbmUxFDASBgNVBAoMC0lYV2ViU29ja2V0
MRMwEQYDVQQDDAp0cnVzdGVkLWNhMB4XDTIwMDMxMjIzMDQzN1oXDTIxMDMxMjIz
MDQzN1owRTEUMBIGA1UECgwLbWFjaGluZXpvbmUxFDASBgNVBAoMC0lYV2ViU29j
a2V0MRcwFQYDVQQDDA50cnVzdGVkLWNsaWVudDCCASIwDQYJKoZIhvcNAQEBBQAD
ggEPADCCAQoCggEBALijaV0JhdoRAXnD5fX5W/9nZFb6jor6lIGW56Mdn+11ICYw
GoJ7ATnygUwfBMepoD5RfJ5pkNxYewo8N5JR+8rb4V9atJCSYQLT8P7Dm2YNMtkq
mNiRuRLrTqoPYajEzz5ENWSNnsjUB1GMGEpcCvRDgsTF24OsVV9BmLV166BEye7w
ah+jk1YYJHbEnNT4wzr4drJSGEYh2aRO72yY+ROe49Tz/GVVXfCamcj88z5hOS/+
+nCF/odLLB9Ij4xhR8WwTrwE/TxlkIQRBBPTsNetZjvMQZT+TkKw9nNjdoHiDlz9
BLOYxovUIB8OtOQQfour8V7nwZ2bL9Pp51mnmBsCAwEAAaMjMCEwHwYDVR0RBBgw
FoIJbG9jYWxob3N0ggkxMjcuMC4wLjEwDQYJKoZIhvcNAQELBQADggEBAFTus7o2
fQuSMk52qXUESVWG4ygvd2scV58zRrLxZL7Ug9p4DIJo0cY59l3Vhwn2xDSYlAFi
1h/qSEGkR2a0U2LzMK7BPSkqqYceSwnUvnwHvCwgH9aL1Rvk/4f1sFfsKegjScle
wraYsRmpidEZJYICvokHev36mX3fHaZZEU+WIoTvChgu0OtD+qkI4DECywLgtB92
/geabKC3C5JgiW0Jz8AScWoO2uKHFeuD2nfI1SiAbfMIAmG3RTanbZ8JMEVomVep
txMNGnojun923KTEScnH3cQfnkJjm2AM5yKgT6I/OHELe9Gg7R0IOJbiPmSru7/k
x5tBp3iMsZZ26VE=
-----END CERTIFICATE-----

View File

@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAuKNpXQmF2hEBecPl9flb/2dkVvqOivqUgZbnox2f7XUgJjAa
gnsBOfKBTB8Ex6mgPlF8nmmQ3Fh7Cjw3klH7ytvhX1q0kJJhAtPw/sObZg0y2SqY
2JG5EutOqg9hqMTPPkQ1ZI2eyNQHUYwYSlwK9EOCxMXbg6xVX0GYtXXroETJ7vBq
H6OTVhgkdsSc1PjDOvh2slIYRiHZpE7vbJj5E57j1PP8ZVVd8JqZyPzzPmE5L/76
cIX+h0ssH0iPjGFHxbBOvAT9PGWQhBEEE9Ow161mO8xBlP5OQrD2c2N2geIOXP0E
s5jGi9QgHw605BB+i6vxXufBnZsv0+nnWaeYGwIDAQABAoIBAFQ2XAEOLdmW9ghW
fBUjRX2I56/wGYFz5rXwYPf5tA625BHm0MCAX7/RRn20jBaQ3EBwJBmQZnzJclzp
uCLpd6E/hlxaX46s5MhIaFuaVc9G59E653mnhTUG09smptE16pwouf2BxlEsu6XK
8u0/a9Oa0xLydztoJ4wJvB/Ph8eRsbdbfL/ZAe+vk+bEp9ugyec3B5KTc+hWRneH
BRfe239OX4mEhxNoO1tPJz1hJLjJH5F/iE1wkjSzLr1SI/cSbcbnyYj/kyXmktZw
uaeFptkT6rB9GO0YunEPzzuQ4EEPpK9F63uu74dGqyW56STq26km7diAHhEpFdp1
7X0rfHECgYEA5YPtjdqKEn5pQEdehqFnzi3IIu593o7baM6qEyFpMsTP53QCjUKX
rrImyr2opfFKrXrI0IYXlDgOZApb2sKLoeP/wpfZiGSyqrzj+Y49cNRHjH643ClL
Ri5eO6TRBukAW1gQFwuVBPbcnswaU6Ah85uTxqj+hO0g18rkuVdf72MCgYEAzfHH
lb9TMf4DZEoL7GMpc4gDG9V66UWWzXyJB4CWHd6QX1vl6Ow5wE7q3fewD4SNgvDs
DHZ8oqK2OMKJH/h/tqxyu+g1huajOhPqy1TIt5ncMjS0sguQ+7bQeHASKLxHhjPC
YdqGMxOBQI5olWGq5U9Td5TYE95qk50KoIyNnekCgYEAkhMwa1tPC0w3UrjZuZga
yEetHEZsB+0mSgNWjYxzNuO6atYUFbHvdjlepSSmpM74t4bxLn5ZnXU7+4H4SjgN
xMCm9EPPKJbme/Jyqk9UXW5OB2ZT45PIm+dBBHb2ro43MuvOecxeUOWJLuw6SUUe
trwrBoJiU1nU0GMKxceNgH8CgYEAzeNMpDG9S7ply6qXVwEf3Kd6bCY1leaDR/Wb
zMtJyJzL+vmV1RHs/ownFDfeZPUgwGp5olAGdFV1FTOvAS5fB9JJdgBFGxOS1ao5
zoN5kswYLn0wtNsJXAy9R9rK3Ly2SL2QNGHSTlfOnSqB9e3JeyyeBmvgxaRTKjYS
/MTng5kCgYAIL79seoBnd9ZSp8A7QUBighxBn6DwrvLgexaysmC0zYqxbatczHk9
iFbQRmPnFHhUt4URhxyhCoTgd7F0JpxklQODNfseVwtDiDMj8Fu8Tfmn6+9GdFRv
0QEU+dR3gi98bO6G4IuAFGO9emXho3Snu6odRmh4HZVNOdLCuQe1Cw==
-----END RSA PRIVATE KEY-----

View File

@ -0,0 +1,20 @@
-----BEGIN CERTIFICATE-----
MIIDNzCCAh+gAwIBAgIUKX47+tHx+Wpgoq8PSLCS5wXAxokwDQYJKoZIhvcNAQEL
BQAwQTEUMBIGA1UECgwLbWFjaGluZXpvbmUxFDASBgNVBAoMC0lYV2ViU29ja2V0
MRMwEQYDVQQDDAp0cnVzdGVkLWNhMB4XDTIwMDMxMjIzMDQzN1oXDTIxMDMxMjIz
MDQzN1owRTEUMBIGA1UECgwLbWFjaGluZXpvbmUxFDASBgNVBAoMC0lYV2ViU29j
a2V0MRcwFQYDVQQDDA50cnVzdGVkLXNlcnZlcjCCASIwDQYJKoZIhvcNAQEBBQAD
ggEPADCCAQoCggEBAMK5XAcHJwVSor1SfoMM5H5aNfNnM4JKq8kfAOl6KlXCsgs3
bBcrJ24gEG6/goxkgLxhC1SXdbebt3Jay2lxAa9/7Uj87yztozSsctMkxXE0u3R+
ih+9sP7ctpZ1hrF2Gv+ztd49/mXe1iRLPhkPijGpPlNsfie/TYybrw3WQlGH8jUm
MnW12QUOzoBrIOCO6uIxFBJ1qiMq5mIBLlYOMj+MQubnQdvaQPNf1zaZWsCVGyTv
95roHAb/s70Ie4r4ATcubtZs/ftjvzSmJegodTprPAedkrJ/k6Od9as7hpL37605
haBU5pMyPNMWYi1MwYc9k0R0IpCKdyeX0huHfpkCAwEAAaMjMCEwHwYDVR0RBBgw
FoIJbG9jYWxob3N0ggkxMjcuMC4wLjEwDQYJKoZIhvcNAQELBQADggEBAI41ZI4Z
WbPFB1e+wIWQE7O2rJMeTEImjBOtcJEN3bqhsdE3Zqk0fPaE6jNz0Fp4IXqUYXzo
SGsgBroV6sgknuLo8HdcTLcg8p9qZ3FGFHFQD1QYINn4ykupJZE2KcrIV8BZ/Tiv
ciFrJ7i/qwpOrTRBV/w47yP3WZ3v8UdBnj5URD0v/yaAfkaReDO59Dlht/wyItQi
GkDczMqMF1GTqcLqBwZdfpHq7B/UI8sp58a6eR9lOgryYCr+QJn7TcZrYzkcSWzg
KE6VuzK6+NElvtg1hSST2Rc/RuuKzexsO/PLesVzaU/6NdDwXmpuSxeCiWm1mosA
xfQZ9fSOQG6reFk=
-----END CERTIFICATE-----

View File

@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAwrlcBwcnBVKivVJ+gwzkflo182czgkqryR8A6XoqVcKyCzds
FysnbiAQbr+CjGSAvGELVJd1t5u3clrLaXEBr3/tSPzvLO2jNKxy0yTFcTS7dH6K
H72w/ty2lnWGsXYa/7O13j3+Zd7WJEs+GQ+KMak+U2x+J79NjJuvDdZCUYfyNSYy
dbXZBQ7OgGsg4I7q4jEUEnWqIyrmYgEuVg4yP4xC5udB29pA81/XNplawJUbJO/3
mugcBv+zvQh7ivgBNy5u1mz9+2O/NKYl6Ch1Oms8B52Ssn+To531qzuGkvfvrTmF
oFTmkzI80xZiLUzBhz2TRHQikIp3J5fSG4d+mQIDAQABAoIBACFw4dwXH11rpqUq
4K0y7p7AcVl+1LrAhiYBHA/8uf6GdDs25mpIL/paqVfLrejcbbtsUxzQ8hd5N5T9
AMf371kreB27ynuFyCyInSOjwgDCFJtaC/CNjDMIxpaqUlpxtQtK2qXzMZhfH5mW
DnERWSNUNG7xR+0djnziU7rlm/gSOxUA2gS/5ik9JXAx0yoML7HWlBbk0PJ5o8Ac
sy6w/YwJVZAjXyUuYptPy2bK8WpGAsthw6RmW1fdOdDAUC3wz7TIKLuPD0AP9g7j
u8grDYtD+U3ls1Z1Grow2UUG8CedotzVE8KIhDWi35aNiGIuaMFnnLf2OO/Mgd6G
V82kkLECgYEA4sBtNsmlfFteFmHPS0s8wzg7lzLN15yifk7kO9GQcsbymXVSgU93
XnvADAflly382pyMpr7Fb6V126iOx+YQhr6ya116S4UtAKq4kCau3Im6OedKefwx
B71rST+vuAlUcv2ZcAVRJK8GtQQvwcAeI24ShPOXC+vyFAaaZ5eZo/0CgYEA29dZ
LcREVlv2Tgy/YJVnZ7EYRGiheuF0rl0d0+Stggj34fSS1cexv3gMF13HBk2HpXfW
3LfJyj2iGRZE9OUjN0ozVIVZWgZS/cwZbEUyl3o6IK0kPE4fZBaE16Onh4OGXKwr
XTG+EjmIJRVRawECbLMONj5rHQLdcy+5YIH28c0CgYEA1XSL2y2MCTsBoVRGDf0v
oB7JihYbTEN5fCnMFLu8nS/HpMqa9nvWRS19plWwvdZe13TTuwyPVACQqE1Oy8M5
/354+zUuMPWXXa9YuuqPZbCJjIS8yYSsqzqXSocXZcnyo6Uz0g5PSpcxWyorwtqW
BIhUCrA8ms5sPonQxIAj9AkCgYBd/6g7722g11VrbfvuWjOKnKhZp7tUBU6Ut2/n
iCHANgF3ddHK4sXXrobM/uX4hfH4CFOwsEzx0oSa4XC+nbL/ExT7kMDxwz59EmXU
a4oERtjP2/hgaK73ZsGKSol5Yf1zZpJsGLbCqCLUaFcVv6q/u5faDbpS/0Sc2c0T
vL5QCQKBgQCL6ySxvEb5+zst/kRxXcnefXjoB+LSYsU4zy8WfkcP4r38AAQ2Hn+F
f3/9BUX+2gNr99VDMjI+TUEf+NdQA/nFu4RbFvJ9Wpw9pXkIJpJkZ9g3Why4Ziji
h0IrXm5JCet71+EIMwP0LhKJKrXZudlzP4DYMWmA7Avqb3HIdPXd7Q==
-----END RSA PRIVATE KEY-----

View File

@ -0,0 +1,21 @@
-----BEGIN CERTIFICATE-----
MIIDZzCCAk+gAwIBAgIUBEbp5x1IlwAV6OcQ/4xHk1Y+K4UwDQYJKoZIhvcNAQEL
BQAwQzEUMBIGA1UECgwLbWFjaGluZXpvbmUxFDASBgNVBAoMC0lYV2ViU29ja2V0
MRUwEwYDVQQDDAx1bnRydXN0ZWQtY2EwHhcNMjAwMzEyMjMwNDM3WhcNMzAwMzEw
MjMwNDM3WjBDMRQwEgYDVQQKDAttYWNoaW5lem9uZTEUMBIGA1UECgwLSVhXZWJT
b2NrZXQxFTATBgNVBAMMDHVudHJ1c3RlZC1jYTCCASIwDQYJKoZIhvcNAQEBBQAD
ggEPADCCAQoCggEBAMmmYROZf/Kg46b/0Zvq4pUY8ghUEA+eYit8dLyUZ/onoW4l
xl3CK5NhJIer62Olv7QIu8WhU/hYoeE+8lLva9v0HaJgGjKmPQ3tyej319PzIc7o
uKatrQ0BAi/KReBQOoqAGqa+DBIGAHoi29x4wZ/ZGSjeVManNb58Lz3+caFlZRCW
8vcrE5J8OcpD+0O/CKM1UJDlTVFSBJS229my5WjxQnfNZeuxRnMxOCah/qaJsZZr
FdRd0th2mRZtpjM8vZfXuoUcK+XVSENuJKdqFR4hXQU5Xq62ofxz+IiToPHO24zi
S1lp7ggeIrgZXaz2I+7LmIy6gnZWP6oXE8XcyW8CAwEAAaNTMFEwHQYDVR0OBBYE
FJLe6w7SsBTwFnIQYjjjH16p/3zDMB8GA1UdIwQYMBaAFJLe6w7SsBTwFnIQYjjj
H16p/3zDMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAGrHNKNe
5UqrNPdIXlGwpabOdrhmAc9yN/tXiB386lByktIeOShS6pvD+UuV14PcTXUFGCwW
2o8I5OE/+O8w+InyWyV7qC7dgeWyEL4qDAuIYmxs71T2VOv/eekYp1Zq/o3kL3hI
f0oxonJVZXkR4p39L4TCS3z6EiWRJxWlI4LVNcvWgkwJB8w7wIxSbql0Y/EO9yoU
07u8QHVj7Nth7YteacOpj8jEy42SuWq5sdW7ccMgEfptRSYiVAmgD7mOCaELCBHz
NVqyLRPkvWqX7apqDy9vR3ZnMiHWEpTPeQqK12GJbVMW53AVEDWKiL0bhrjnY/uS
dwnpMp7fEUJLXQk=
-----END CERTIFICATE-----

View File

@ -0,0 +1 @@
5CB637D0B24622D344F4C956FE5930B22CF87221

View File

@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEAyaZhE5l/8qDjpv/Rm+rilRjyCFQQD55iK3x0vJRn+iehbiXG
XcIrk2Ekh6vrY6W/tAi7xaFT+Fih4T7yUu9r2/QdomAaMqY9De3J6PfX0/Mhzui4
pq2tDQECL8pF4FA6ioAapr4MEgYAeiLb3HjBn9kZKN5Uxqc1vnwvPf5xoWVlEJby
9ysTknw5ykP7Q78IozVQkOVNUVIElLbb2bLlaPFCd81l67FGczE4JqH+pomxlmsV
1F3S2HaZFm2mMzy9l9e6hRwr5dVIQ24kp2oVHiFdBTlerrah/HP4iJOg8c7bjOJL
WWnuCB4iuBldrPYj7suYjLqCdlY/qhcTxdzJbwIDAQABAoIBAQCWRBLRLTDoWDZs
6vODEczZOGacCDCTwv3609qV8K1u/3tPfnzMv3YDdH9pTpaxggFSIrPyeN7/EOVI
2cRwQxQIK2it6Jl9Jt4WdB1jKtW9js+hxVBcfM2ZBChh/oSFvKNzNDUoDjUmdSyD
11gpeh8ng/s4tj1Mb6wgD6CQvPxmPLsJZ3swxdSFgR5hpXXELtAK+oOlP0Y6SFpi
d5AyiaMP9imBKQV7qgJSiKWVtSAvMhfCOPaeYM9wPCA9nha6dYGC8Fgh9FklOf2+
fj+0dqmbWwa3xuEBfZ7oS+uKnzzcBvTxtNz/U8b9bPzTtoJU6Z6P3wLIB5x8DgQ3
NcDqVbtRAoGBAOnEl1hfHsm1Ni0flugvNSY5pRF9CGQjbTk2tQxEfsPc7LiNZxjF
NFyJK2wVs17bsCI4PUO9nnMjnCi86SMKj0ifVoroYlMkt4ruY9iQPTLbrJpTBF/X
LkU77s6TSeOQdzUlVPcIXfTCYwguicpIP6kOcohHplmzdurWtl723GqHAoGBANzT
1G2h8dS7UtR0GRO4u9QM8jhRFszariovI6eOEKPaVhBhPeiwwcWRq40un7koCLzU
WA5CV6h1fGQVvN8pjpZdXYUAa26jlnISQLvNgNvwD2b5UjRi4tH2QuV0LOAMiMGs
vcQtpjM12RNfii/Tdun0mYZ9pcb65T4p5VubM9vZAoGADl4i3y+ZeNRGbCeQ4txj
6+GHH7gLl/wFborKPeLH18nwUrd+KquUOEvF+3Kp/56JCNFkEpHI91Ks+mQCAEFZ
5SDF9Ourf2i2Tzevs1PKLyIJTcLkde+HzIGOf+vVksMCUKXmvvgori50X8Bcf65J
G17j8zRUKRc6q9xegR+zFGkCgYEAtA2UG3/76nSCaO/wsn/hxlh39ytG5+k2MPcW
nzvanX8cxWZEUEIu/KR1uDvXx+S4mx6YXagCSTziG8kNovgDZt7hrdxVvHRt6ryv
Q3GgK7RlGpUXTdeDEac1jFlZbaVKrH/oitidtwuk34L67VwCjWf+9gXk8YUI/dKz
TCoT8qECgYEAyUbioIZuc6iWF2oIk3VuPdWHUvhuhzvYr+gb++P/xIXraEUMI81c
UFUDOw+jVVF6H0aioD1rRUczF9vJVE1pUZrXHbAViPECr6QgZTl1mluHtxnT8Asq
7sXS69HdlV+k+P+YZ51qRXRLKZsjwSJwn8fCRWS+7HPdQ3ogIWp1Q+A=
-----END RSA PRIVATE KEY-----

View File

@ -0,0 +1,20 @@
-----BEGIN CERTIFICATE-----
MIIDOzCCAiOgAwIBAgIUXLY30LJGItNE9MlW/lkwsiz4ciEwDQYJKoZIhvcNAQEL
BQAwQzEUMBIGA1UECgwLbWFjaGluZXpvbmUxFDASBgNVBAoMC0lYV2ViU29ja2V0
MRUwEwYDVQQDDAx1bnRydXN0ZWQtY2EwHhcNMjAwMzEyMjMwNDM3WhcNMjEwMzEy
MjMwNDM3WjBHMRQwEgYDVQQKDAttYWNoaW5lem9uZTEUMBIGA1UECgwLSVhXZWJT
b2NrZXQxGTAXBgNVBAMMEHVudHJ1c3RlZC1jbGllbnQwggEiMA0GCSqGSIb3DQEB
AQUAA4IBDwAwggEKAoIBAQC5SOVG06/37lekGxkBJUt7AN3Xw708jN8XI7DR1sq+
NPeGN/wEfCUSIHJXQq1fqBJQkYKpyYa9EkvQs2RhrOXahul3ZdX1kP3zxQLvvbxU
EcB2gMS4B61EqnmBHRMsj+dI91++YSEFE1hkolD3+gQtm0+FVbPoXt5Y3rBAF/l0
UMvrBsgraB12OHUlqqj8WkUIul37u8XcnsnPWKoigWb2k+/W47LCGsd+haRnulIK
ADQOsjNs7wy3IV9d8zCifEV0YUT5ZPBg2K2f1lpYfOSobK7JLqgV03HVrkROQfej
FTvMRtDAxlsa6bHLrGUeBhCNaO7SLj16oo5nMCq4DnTjAgMBAAGjIzAhMB8GA1Ud
EQQYMBaCCWxvY2FsaG9zdIIJMTI3LjAuMC4xMA0GCSqGSIb3DQEBCwUAA4IBAQDH
7XbX6dCzUCGj91835gvTPr5FgKrTqocVQ+EtCxJxRVvqB4zj7/80SHxByyWz9XJQ
IBZmDz298nVqfW6uegq3qU29sG9OAOOg6I0SpWOL9qq/ZKMoEqRv6fHnjHhRiOwT
isqdZISh1vhoIvcUpNsm1PwpaDxerjeE3oPyuNO0P0lKI5jykO3orDANGvyC8fzx
jxlDsSXCgmcaPh99752vBe8UlY1M8t4GxsJAV8DXxdDZCYIWMe+/C5aQ2xDvj3+l
vYht9+yc6ebl5uGOttgWSYPxdryCynDKsdBfXxet9Ix/qdsLF9hwU2JokVDh50J+
er36eML3WvEO2HuBKTq8
-----END CERTIFICATE-----

View File

@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAuUjlRtOv9+5XpBsZASVLewDd18O9PIzfFyOw0dbKvjT3hjf8
BHwlEiByV0KtX6gSUJGCqcmGvRJL0LNkYazl2obpd2XV9ZD988UC7728VBHAdoDE
uAetRKp5gR0TLI/nSPdfvmEhBRNYZKJQ9/oELZtPhVWz6F7eWN6wQBf5dFDL6wbI
K2gddjh1Jaqo/FpFCLpd+7vF3J7Jz1iqIoFm9pPv1uOywhrHfoWkZ7pSCgA0DrIz
bO8MtyFfXfMwonxFdGFE+WTwYNitn9ZaWHzkqGyuyS6oFdNx1a5ETkH3oxU7zEbQ
wMZbGumxy6xlHgYQjWju0i49eqKOZzAquA504wIDAQABAoIBAHle6OG2dUShmkNj
hMOdXI5ciPV3wRRS6yhLNt6eJvzl0WbYcXu2nsn6+ytyAAPzItwoFUGHQ33C6Grz
uEPLcF3vliuiR7+ulMwEN+I3lZA0eLCntTUfwj6CtUkAdLjyIv1HHi6ljW23uGVj
dkqaOfZuEG81Lr5+toPci/PQQJYR4btVJJHCXJ6KVx6w8i++fwcRwby9riNhWAzk
8OhUiSTTsx9sioBk62QRB8Qs0LVR5tGbDSrpQW5Ns9KnH7sayInwEN94PTsPKcqY
i/oNNZG+qvSf8jG8QiIMdyGx/goVKuQVx5Gev8my5mnfuVM/oXB20T56z2iII64V
kNh/sNECgYEA3qjAPqTY0rnu2tmvQN1PwAfyENz3XmTVWpVtBvedPj3qiV3mXJGZ
qQoS0wb/2t/D05GhTxBJARk8foorNzGchMVtECMlGxDAs1vBw6dwSK5hJnw47PQQ
Q68Vz/zwvrzJgmeijPow87PdpomYECgerTa6BynH8W0ffuSNIJC8Ke8CgYEA1Qd2
FpwFjUFqhYbcvR3VG8qAMIF8RKLzmQDDZh7liKMeHLypdXRz1ZEa6NFkvsg7qRZh
ahe/ULubnRhdOxs0JVyZPS0dU3ZmHT9bBcIuLzC//5e1ictXUZspFzIHE9T4suLC
Xnh2vqQzlEy3iZLx5B6FMzc3ws7LM7q7L2AfqE0CgYAMkvEQWJTaCaAAgeyQuC7J
xGkaJLBfh0g5LlkS3Kbnne2Bxmi874gC8MuxWSLXxG01pHK8mUnWIwu0ha79FfMl
2FRZZfKxfZe0SUk++FSx9g8MclVwpDPK7rdHoJwj2Vtz3tBiL7rV+GFbB0gsGWfq
Fj4ZK3XcH3J44wVJQoMtxwKBgQDM/ZkMuKY+/yvZwaS39vUTARHJm1BRW9y85pcg
tap6iTx4urL2a1Drue4DCzu+uj9uvjKPPLrEnUNpMADG166eJTTwQXFu1wf8LPMR
34FBt8+JzBrMtfcYeA5aW7Gjy9Rljv8qmRDq8mcP1aLnp5dMxHG4jvIBa6zt4kot
lHniIQKBgQDWuMWA2Q1kcKKy7OJszp60jO+ftq306QMoDsPNFLUUUtCxNSrpAeC2
MVvI4kzIn+6hYsMdRsqDSadosuKE4ZzCPIfuyadiAKTAO5esBJs7KAQFMJXSnfY7
+Zs1QUcdLZAWivO7j3ZASbR8L/1mawlBMgyIaT9YKp1+iW+uzaYgUQ==
-----END RSA PRIVATE KEY-----

View File

@ -56,6 +56,7 @@ set (SOURCES
IXWebSocketSubProtocolTest.cpp
IXSentryClientTest.cpp
IXWebSocketChatTest.cpp
IXCobraToSentryBotTest.cpp
)
# Some unittest don't work on windows yet
@ -99,6 +100,7 @@ target_link_libraries(ixwebsocket_unittest ixwebsocket)
target_link_libraries(ixwebsocket_unittest ixcrypto)
target_link_libraries(ixwebsocket_unittest ixcore)
target_link_libraries(ixwebsocket_unittest ixsentry)
target_link_libraries(ixwebsocket_unittest ixbots)
target_link_libraries(ixwebsocket_unittest spdlog)

View File

@ -122,31 +122,32 @@ namespace
void CobraChat::subscribe(const std::string& channel)
{
std::string filter;
_conn.subscribe(channel, filter, [this](const Json::Value& msg) {
spdlog::info("receive {}", msg.toStyledString());
_conn.subscribe(
channel, filter, [this](const Json::Value& msg, const std::string& /*position*/) {
spdlog::info("receive {}", msg.toStyledString());
if (!msg.isObject()) return;
if (!msg.isMember("user")) return;
if (!msg.isMember("text")) return;
if (!msg.isMember("session")) return;
if (!msg.isObject()) return;
if (!msg.isMember("user")) return;
if (!msg.isMember("text")) return;
if (!msg.isMember("session")) return;
std::string msg_user = msg["user"].asString();
std::string msg_text = msg["text"].asString();
std::string msg_session = msg["session"].asString();
std::string msg_user = msg["user"].asString();
std::string msg_text = msg["text"].asString();
std::string msg_session = msg["session"].asString();
// We are not interested in messages
// from a different session.
if (msg_session != _session) return;
// We are not interested in messages
// from a different session.
if (msg_session != _session) return;
// We are not interested in our own messages
if (msg_user == _user) return;
// We are not interested in our own messages
if (msg_user == _user) return;
_receivedQueue.push(msg);
_receivedQueue.push(msg);
std::stringstream ss;
ss << std::endl << msg_user << " > " << msg_text << std::endl << _user << " > ";
log(ss.str());
});
std::stringstream ss;
ss << std::endl << msg_user << " > " << msg_text << std::endl << _user << " > ";
log(ss.str());
});
}
void CobraChat::sendMessage(const std::string& text)
@ -218,7 +219,7 @@ namespace
}
else if (eventType == ix::CobraConnection_EventType_Published)
{
Logger() << "Subscriber: published message acked: " << msgId;
TLogger() << "Subscriber: published message acked: " << msgId;
}
});

View File

@ -79,7 +79,7 @@ namespace
CobraConnection::MsgId msgId) {
if (eventType == ix::CobraConnection_EventType_Open)
{
Logger() << "Subscriber connected:";
TLogger() << "Subscriber connected:";
for (auto&& it : headers)
{
log("Headers " + it.first + " " + it.second);
@ -87,47 +87,48 @@ namespace
}
if (eventType == ix::CobraConnection_EventType_Error)
{
Logger() << "Subscriber error:" << errMsg;
TLogger() << "Subscriber error:" << errMsg;
}
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
log("Subscriber authenticated");
std::string filter;
conn.subscribe(CHANNEL, filter, [](const Json::Value& msg) {
log(msg.toStyledString());
conn.subscribe(
CHANNEL, filter, [](const Json::Value& msg, const std::string& /*position*/) {
log(msg.toStyledString());
std::string id = msg["id"].asString();
{
std::lock_guard<std::mutex> guard(gProtectIds);
gIds.insert(id);
}
std::string id = msg["id"].asString();
{
std::lock_guard<std::mutex> guard(gProtectIds);
gIds.insert(id);
}
gMessageCount++;
});
gMessageCount++;
});
}
else if (eventType == ix::CobraConnection_EventType_Subscribed)
{
Logger() << "Subscriber: subscribed to channel " << subscriptionId;
TLogger() << "Subscriber: subscribed to channel " << subscriptionId;
if (subscriptionId == CHANNEL)
{
gSubscriberConnectedAndSubscribed = true;
}
else
{
Logger() << "Subscriber: unexpected channel " << subscriptionId;
TLogger() << "Subscriber: unexpected channel " << subscriptionId;
}
}
else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{
Logger() << "Subscriber: ununexpected from channel " << subscriptionId;
TLogger() << "Subscriber: ununexpected from channel " << subscriptionId;
if (subscriptionId != CHANNEL)
{
Logger() << "Subscriber: unexpected channel " << subscriptionId;
TLogger() << "Subscriber: unexpected channel " << subscriptionId;
}
}
else if (eventType == ix::CobraConnection_EventType_Published)
{
Logger() << "Subscriber: published message acked: " << msgId;
TLogger() << "Subscriber: published message acked: " << msgId;
}
});

View File

@ -0,0 +1,211 @@
/*
* cmd_satori_chat.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017 Machine Zone. All rights reserved.
*/
#include "IXTest.h"
#include "catch.hpp"
#include <chrono>
#include <iostream>
#include <ixbots/IXCobraToSentryBot.h>
#include <ixcobra/IXCobraConnection.h>
#include <ixcobra/IXCobraMetricsPublisher.h>
#include <ixcrypto/IXUuid.h>
#include <ixsentry/IXSentryClient.h>
#include <ixsnake/IXRedisServer.h>
#include <ixsnake/IXSnakeServer.h>
#include <ixwebsocket/IXHttpServer.h>
#include <ixwebsocket/IXUserAgent.h>
using namespace ix;
namespace
{
std::atomic<size_t> incomingBytes(0);
std::atomic<size_t> outgoingBytes(0);
void setupTrafficTrackerCallback()
{
ix::CobraConnection::setTrafficTrackerCallback([](size_t size, bool incoming) {
if (incoming)
{
incomingBytes += size;
}
else
{
outgoingBytes += size;
}
});
}
void runPublisher(const ix::CobraConfig& config, const std::string& channel)
{
ix::CobraMetricsPublisher cobraMetricsPublisher;
SocketTLSOptions socketTLSOptions;
bool perMessageDeflate = true;
cobraMetricsPublisher.configure(config.appkey,
config.endpoint,
channel,
config.rolename,
config.rolesecret,
perMessageDeflate,
socketTLSOptions);
cobraMetricsPublisher.setSession(uuid4());
cobraMetricsPublisher.enable(true); // disabled by default, needs to be enabled to be active
Json::Value msg;
msg["fps"] = 60;
cobraMetricsPublisher.setGenericAttributes("game", "ody");
// Wait a bit
ix::msleep(500);
// publish some messages
cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #1)
cobraMetricsPublisher.push("sms_metric_B_id", msg); // (msg #2)
ix::msleep(500);
cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #3)
cobraMetricsPublisher.push("sms_metric_D_id", msg); // (msg #4)
ix::msleep(500);
cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #4)
cobraMetricsPublisher.push("sms_metric_F_id", msg); // (msg #5)
ix::msleep(500);
}
} // namespace
TEST_CASE("Cobra_to_sentry_bot", "[foo]")
{
SECTION("Exchange and count sent/received messages.")
{
int port = getFreePort();
snake::AppConfig appConfig = makeSnakeServerConfig(port);
// Start a redis server
ix::RedisServer redisServer(appConfig.redisPort);
auto res = redisServer.listen();
REQUIRE(res.first);
redisServer.start();
// Start a snake server
snake::SnakeServer snakeServer(appConfig);
snakeServer.run();
// Start a fake sentry http server
SocketTLSOptions tlsOptionsServer;
tlsOptionsServer.certFile = ".certs/trusted-server-crt.pem";
tlsOptionsServer.keyFile = ".certs/trusted-server-key.pem";
tlsOptionsServer.caFile = ".certs/trusted-ca-crt.pem";
int sentryPort = getFreePort();
ix::HttpServer sentryServer(sentryPort, "127.0.0.1");
sentryServer.setTLSOptions(tlsOptionsServer);
sentryServer.setOnConnectionCallback(
[](HttpRequestPtr request,
std::shared_ptr<ConnectionState> /*connectionState*/) -> HttpResponsePtr {
WebSocketHttpHeaders headers;
headers["Server"] = userAgent();
// Log request
std::stringstream ss;
ss << request->method << " " << request->headers["User-Agent"] << " "
<< request->uri;
if (request->method == "POST")
{
return std::make_shared<HttpResponse>(
200, "OK", HttpErrorCode::Ok, headers, std::string());
}
else
{
return std::make_shared<HttpResponse>(
405, "OK", HttpErrorCode::Invalid, headers, std::string("Invalid method"));
}
});
res = sentryServer.listen();
REQUIRE(res.first);
sentryServer.start();
setupTrafficTrackerCallback();
// Run the bot for a small amount of time
std::string channel = ix::generateSessionId();
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
std::string role = "_sub";
std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
std::stringstream ss;
ss << "ws://localhost:" << port;
std::string endpoint = ss.str();
ix::CobraConfig config;
config.endpoint = endpoint;
config.appkey = appkey;
config.rolename = role;
config.rolesecret = secret;
std::thread publisherThread(runPublisher, config, channel);
std::string filter;
bool verbose = true;
bool strict = true;
size_t maxQueueSize = 10;
bool enableHeartbeat = false;
// FIXME: try to get this working with https instead of http
// to regress the TLS 1.3 OpenSSL bug
// -> https://github.com/openssl/openssl/issues/7967
// https://xxxxx:yyyyyy@sentry.io/1234567
std::stringstream oss;
std::string scheme("http://");
oss << scheme << "xxxxxxx:yyyyyyy@localhost:" << sentryPort << "/1234567";
std::string dsn = oss.str();
SocketTLSOptions tlsOptionsClient;
tlsOptionsClient.certFile = ".certs/trusted-client-crt.pem";
tlsOptionsClient.keyFile = ".certs/trusted-client-key.pem";
tlsOptionsClient.caFile = ".certs/trusted-ca-crt.pem";
SentryClient sentryClient(dsn);
sentryClient.setTLSOptions(tlsOptionsClient);
// Only run the bot for 3 seconds
int runtime = 3;
int sentCount = cobra_to_sentry_bot(config,
channel,
filter,
sentryClient,
verbose,
strict,
maxQueueSize,
enableHeartbeat,
runtime);
//
// We want at least 2 messages to be sent
//
REQUIRE(sentCount >= 2);
// Give us 1s for all messages to be received
ix::msleep(1000);
spdlog::info("Incoming bytes {}", incomingBytes);
spdlog::info("Outgoing bytes {}", outgoingBytes);
spdlog::info("Stopping snake server...");
snakeServer.stop();
spdlog::info("Stopping redis server...");
redisServer.stop();
publisherThread.join();
sentryServer.stop();
}
}

View File

@ -29,17 +29,17 @@ namespace ix
makeCancellationRequestWithTimeout(timeoutSecs, requestInitCancellation);
bool success = socket->connect(host, port, errMsg, isCancellationRequested);
Logger() << "errMsg: " << errMsg;
TLogger() << "errMsg: " << errMsg;
REQUIRE(success);
Logger() << "Sending request: " << request << "to " << host << ":" << port;
TLogger() << "Sending request: " << request << "to " << host << ":" << port;
REQUIRE(socket->writeBytes(request, isCancellationRequested));
auto lineResult = socket->readLine(isCancellationRequested);
auto lineValid = lineResult.first;
auto line = lineResult.second;
Logger() << "read error: " << strerror(Socket::getErrno());
TLogger() << "read error: " << strerror(Socket::getErrno());
REQUIRE(lineValid);

View File

@ -24,7 +24,7 @@ namespace ix
{
std::atomic<size_t> incomingBytes(0);
std::atomic<size_t> outgoingBytes(0);
std::mutex Logger::_mutex;
std::mutex TLogger::_mutex;
std::stack<int> freePorts;
void setupWebSocketTrafficTrackerCallback()
@ -43,9 +43,9 @@ namespace ix
void reportWebSocketTraffic()
{
Logger() << incomingBytes;
Logger() << "Incoming bytes: " << incomingBytes;
Logger() << "Outgoing bytes: " << outgoingBytes;
TLogger() << incomingBytes;
TLogger() << "Incoming bytes: " << incomingBytes;
TLogger() << "Outgoing bytes: " << outgoingBytes;
}
void msleep(int ms)
@ -65,7 +65,7 @@ namespace ix
void log(const std::string& msg)
{
Logger() << msg;
TLogger() << msg;
}
void hexDump(const std::string& prefix, const std::string& s)
@ -90,17 +90,17 @@ namespace ix
[webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
Logger() << "New connection";
Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:";
TLogger() << "New connection";
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";
for (auto it : msg->openInfo.headers)
{
Logger() << it.first << ": " << it.second;
TLogger() << it.first << ": " << it.second;
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
Logger() << "Closed connection";
TLogger() << "Closed connection";
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
@ -118,7 +118,7 @@ namespace ix
auto res = server.listen();
if (!res.first)
{
Logger() << res.second;
TLogger() << res.second;
return false;
}

View File

@ -28,11 +28,11 @@ namespace ix
void setupWebSocketTrafficTrackerCallback();
void reportWebSocketTraffic();
struct Logger
struct TLogger
{
public:
template<typename T>
Logger& operator<<(T const& obj)
TLogger& operator<<(T const& obj)
{
std::lock_guard<std::mutex> lock(_mutex);

View File

@ -199,13 +199,13 @@ namespace
[webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
Logger() << "New connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:";
TLogger() << "New connection";
TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";
for (auto it : msg->openInfo.headers)
{
Logger() << it.first << ": " << it.second;
TLogger() << it.first << ": " << it.second;
}
}
else if (msg->type == ix::WebSocketMessageType::Close)

View File

@ -180,13 +180,13 @@ namespace
&mutexWrite](const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
Logger() << "New server connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:";
TLogger() << "New server connection";
TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";
for (auto it : msg->openInfo.headers)
{
Logger() << it.first << ": " << it.second;
TLogger() << it.first << ": " << it.second;
}
}
else if (msg->type == ix::WebSocketMessageType::Close)

View File

@ -23,23 +23,23 @@ namespace
[connectionState, &server](const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
Logger() << "New connection";
TLogger() << "New connection";
connectionState->computeId();
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:";
TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";
for (auto&& it : msg->openInfo.headers)
{
Logger() << it.first << ": " << it.second;
TLogger() << it.first << ": " << it.second;
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
Logger() << "Closed connection";
TLogger() << "Closed connection";
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
Logger() << "Message received: " << msg->str;
TLogger() << "Message received: " << msg->str;
for (auto&& client : server.getClients())
{
@ -52,7 +52,7 @@ namespace
auto res = server.listen();
if (!res.first)
{
Logger() << res.second;
TLogger() << res.second;
return false;
}

View File

@ -140,13 +140,13 @@ namespace
const ix::WebSocketCloseInfo& closeInfo) {
if (messageType == ix::WebSocketMessageType::Open)
{
Logger() << "New server connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:";
TLogger() << "New server connection";
TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << openInfo.uri;
TLogger() << "Headers:";
for (auto it : openInfo.headers)
{
Logger() << it.first << ": " << it.second;
TLogger() << it.first << ": " << it.second;
}
}
else if (messageType == ix::WebSocketMessageType::Close)

View File

@ -169,13 +169,13 @@ namespace
const ix::WebSocketCloseInfo& closeInfo) {
if (messageType == ix::WebSocketMessageType::Open)
{
Logger() << "New server connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:";
TLogger() << "New server connection";
TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << openInfo.uri;
TLogger() << "Headers:";
for (auto it : openInfo.headers)
{
Logger() << it.first << ": " << it.second;
TLogger() << it.first << ": " << it.second;
}
}
else if (messageType == ix::WebSocketMessageType::Close)

View File

@ -40,21 +40,21 @@ namespace ix
const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
Logger() << "New connection";
TLogger() << "New connection";
connectionState->computeId();
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:";
TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";
for (auto it : msg->openInfo.headers)
{
Logger() << it.first << ": " << it.second;
TLogger() << it.first << ": " << it.second;
}
connectionId = connectionState->getId();
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
Logger() << "Closed connection";
TLogger() << "Closed connection";
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
@ -72,7 +72,7 @@ namespace ix
auto res = server.listen();
if (!res.first)
{
Logger() << res.second;
TLogger() << res.second;
return false;
}
@ -133,7 +133,7 @@ TEST_CASE("Websocket_server", "[websocket_server]")
bool success = socket->connect(host, port, errMsg, isCancellationRequested);
REQUIRE(success);
Logger() << "writeBytes";
TLogger() << "writeBytes";
socket->writeBytes("GET /\r\n", isCancellationRequested);
auto lineResult = socket->readLine(isCancellationRequested);

View File

@ -23,13 +23,13 @@ bool startServer(ix::WebSocketServer& server, std::string& subProtocols)
const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
Logger() << "New connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:";
TLogger() << "New connection";
TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";
for (auto it : msg->openInfo.headers)
{
Logger() << it.first << ": " << it.second;
TLogger() << it.first << ": " << it.second;
}
subProtocols = msg->openInfo.headers["Sec-WebSocket-Protocol"];

View File

@ -0,0 +1,11 @@
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });
wss.on('connection', function connection(ws) {
ws.on('message', function incoming(message) {
console.log('received: %s', message);
});
ws.send('something');
});

View File

@ -0,0 +1,11 @@
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080, perMessageDeflate: true });
wss.on('connection', function connection(ws) {
ws.on('message', function incoming(message) {
console.log('received: %s', message);
});
ws.send('something');
});

View File

@ -0,0 +1,25 @@
#!/usr/bin/env python3
# websocket send client
import argparse
import asyncio
import websockets
async def send(url):
async with websockets.connect(url) as ws:
while True:
message = input('> ')
print('Sending message...')
await ws.send(message)
print('Message sent.')
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='websocket proxy.')
parser.add_argument('--url', help='Remote websocket url',
default='wss://echo.websocket.org')
args = parser.parse_args()
asyncio.get_event_loop().run_until_complete(send(args.url))

View File

@ -8,9 +8,10 @@ import websockets
async def echo(websocket, path):
msg = await websocket.recv()
print(f'Received {len(msg)} bytes')
await websocket.send(msg)
while True:
msg = await websocket.recv()
print(f'Received {len(msg)} bytes')
await websocket.send(msg)
host = os.getenv('BIND_HOST', 'localhost')
print(f'Serving on {host}:8766')

View File

@ -0,0 +1,22 @@
#!/usr/bin/env python
# WS server example
import asyncio
import os
import websockets
async def echo(websocket, path):
while True:
msg = await websocket.recv()
print(f'Received {len(msg)} bytes')
await websocket.send(msg)
host = os.getenv('BIND_HOST', 'localhost')
print(f'Serving on {host}:8766')
start_server = websockets.serve(echo, host, 8766, max_size=2 ** 30)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

View File

@ -58,8 +58,6 @@ add_executable(ws
ws_cobra_subscribe.cpp
ws_cobra_metrics_publish.cpp
ws_cobra_publish.cpp
ws_cobra_to_statsd.cpp
ws_cobra_to_sentry.cpp
ws_cobra_metrics_to_redis.cpp
ws_httpd.cpp
ws_autobahn.cpp
@ -74,6 +72,7 @@ target_link_libraries(ws ixwebsocket)
target_link_libraries(ws ixcrypto)
target_link_libraries(ws ixcore)
target_link_libraries(ws ixsentry)
target_link_libraries(ws ixbots)
target_link_libraries(ws spdlog)

111
ws/ws.cpp
View File

@ -11,7 +11,10 @@
#include <cli11/CLI11.hpp>
#include <fstream>
#include <ixbots/IXCobraToSentryBot.h>
#include <ixbots/IXCobraToStatsdBot.h>
#include <ixcore/utils/IXCoreLogger.h>
#include <ixsentry/IXSentryClient.h>
#include <ixwebsocket/IXNetSystem.h>
#include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXUserAgent.h>
@ -64,10 +67,6 @@ int main(int argc, char** argv)
std::string filter;
std::string message;
std::string password;
std::string appkey;
std::string endpoint;
std::string rolename;
std::string rolesecret;
std::string prefix("ws.test.v0");
std::string fields;
std::string dsn;
@ -81,6 +80,7 @@ int main(int argc, char** argv)
std::string project;
std::string key;
ix::SocketTLSOptions tlsOptions;
ix::CobraConfig cobraConfig;
std::string ciphers;
std::string redirectUrl;
bool headersOnly = false;
@ -88,6 +88,7 @@ int main(int argc, char** argv)
bool verbose = false;
bool save = false;
bool quiet = false;
bool fluentd = false;
bool compress = false;
bool strict = false;
bool stress = false;
@ -108,7 +109,6 @@ int main(int argc, char** argv)
int maxRedirects = 5;
int delayMs = -1;
int count = 1;
int jobs = 4;
uint32_t maxWaitBetweenReconnectionRetries;
size_t maxQueueSize = 100;
@ -127,6 +127,13 @@ int main(int argc, char** argv)
app->add_flag("--verify_none", verifyNone, "Disable peer cert verification");
};
auto addCobraConfig = [&cobraConfig](CLI::App* app) {
app->add_option("--appkey", cobraConfig.appkey, "Appkey")->required();
app->add_option("--endpoint", cobraConfig.endpoint, "Endpoint")->required();
app->add_option("--rolename", cobraConfig.rolename, "Role name")->required();
app->add_option("--rolesecret", cobraConfig.rolesecret, "Role secret")->required();
};
app.add_flag("--version", version, "Print ws version");
CLI::App* sendApp = app.add_subcommand("send", "Send a file");
@ -219,34 +226,25 @@ int main(int argc, char** argv)
redisSubscribeApp->add_option("--pidfile", pidfile, "Pid file");
CLI::App* cobraSubscribeApp = app.add_subcommand("cobra_subscribe", "Cobra subscriber");
cobraSubscribeApp->add_option("--appkey", appkey, "Appkey")->required();
cobraSubscribeApp->add_option("--endpoint", endpoint, "Endpoint")->required();
cobraSubscribeApp->add_option("--rolename", rolename, "Role name")->required();
cobraSubscribeApp->add_option("--rolesecret", rolesecret, "Role secret")->required();
cobraSubscribeApp->add_option("--channel", channel, "Channel")->required();
cobraSubscribeApp->add_option("--pidfile", pidfile, "Pid file");
cobraSubscribeApp->add_option("--filter", filter, "Stream SQL Filter");
cobraSubscribeApp->add_flag("-q", quiet, "Quiet / only display stats");
cobraSubscribeApp->add_flag("--fluentd", fluentd, "Write fluentd prefix");
addTLSOptions(cobraSubscribeApp);
addCobraConfig(cobraSubscribeApp);
CLI::App* cobraPublish = app.add_subcommand("cobra_publish", "Cobra publisher");
cobraPublish->add_option("--appkey", appkey, "Appkey")->required();
cobraPublish->add_option("--endpoint", endpoint, "Endpoint")->required();
cobraPublish->add_option("--rolename", rolename, "Role name")->required();
cobraPublish->add_option("--rolesecret", rolesecret, "Role secret")->required();
cobraPublish->add_option("--channel", channel, "Channel")->required();
cobraPublish->add_option("--pidfile", pidfile, "Pid file");
cobraPublish->add_option("path", path, "Path to the file to send")
->required()
->check(CLI::ExistingPath);
addTLSOptions(cobraPublish);
addCobraConfig(cobraPublish);
CLI::App* cobraMetricsPublish =
app.add_subcommand("cobra_metrics_publish", "Cobra metrics publisher");
cobraMetricsPublish->add_option("--appkey", appkey, "Appkey");
cobraMetricsPublish->add_option("--endpoint", endpoint, "Endpoint");
cobraMetricsPublish->add_option("--rolename", rolename, "Role name");
cobraMetricsPublish->add_option("--rolesecret", rolesecret, "Role secret");
cobraMetricsPublish->add_option("--channel", channel, "Channel")->required();
cobraMetricsPublish->add_option("--pidfile", pidfile, "Pid file");
cobraMetricsPublish->add_option("path", path, "Path to the file to send")
@ -254,12 +252,9 @@ int main(int argc, char** argv)
->check(CLI::ExistingPath);
cobraMetricsPublish->add_flag("--stress", stress, "Stress mode");
addTLSOptions(cobraMetricsPublish);
addCobraConfig(cobraMetricsPublish);
CLI::App* cobra2statsd = app.add_subcommand("cobra_to_statsd", "Cobra metrics to statsd");
cobra2statsd->add_option("--appkey", appkey, "Appkey");
cobra2statsd->add_option("--endpoint", endpoint, "Endpoint");
cobra2statsd->add_option("--rolename", rolename, "Role name");
cobra2statsd->add_option("--rolesecret", rolesecret, "Role secret");
cobra2statsd->add_option("--host", hostname, "Statsd host");
cobra2statsd->add_option("--port", statsdPort, "Statsd port");
cobra2statsd->add_option("--prefix", prefix, "Statsd prefix");
@ -269,14 +264,10 @@ int main(int argc, char** argv)
cobra2statsd->add_option("--pidfile", pidfile, "Pid file");
cobra2statsd->add_option("--filter", filter, "Stream SQL Filter");
addTLSOptions(cobra2statsd);
addCobraConfig(cobra2statsd);
CLI::App* cobra2sentry = app.add_subcommand("cobra_to_sentry", "Cobra metrics to sentry");
cobra2sentry->add_option("--appkey", appkey, "Appkey")->required();
cobra2sentry->add_option("--endpoint", endpoint, "Endpoint")->required();
cobra2sentry->add_option("--rolename", rolename, "Role name")->required();
cobra2sentry->add_option("--rolesecret", rolesecret, "Role secret")->required();
cobra2sentry->add_option("--dsn", dsn, "Sentry DSN");
cobra2sentry->add_option("--jobs", jobs, "Number of thread sending events to Sentry");
cobra2sentry->add_option("--queue_size",
maxQueueSize,
"Size of the queue to hold messages before they are sent to Sentry");
@ -286,13 +277,10 @@ int main(int argc, char** argv)
cobra2sentry->add_option("--pidfile", pidfile, "Pid file");
cobra2sentry->add_option("--filter", filter, "Stream SQL Filter");
addTLSOptions(cobra2sentry);
addCobraConfig(cobra2sentry);
CLI::App* cobra2redisApp =
app.add_subcommand("cobra_metrics_to_redis", "Cobra metrics to redis");
cobra2redisApp->add_option("--appkey", appkey, "Appkey")->required();
cobra2redisApp->add_option("--endpoint", endpoint, "Endpoint")->required();
cobra2redisApp->add_option("--rolename", rolename, "Role name")->required();
cobra2redisApp->add_option("--rolesecret", rolesecret, "Role secret")->required();
cobra2redisApp->add_option("channel", channel, "Channel")->required();
cobra2redisApp->add_option("--pidfile", pidfile, "Pid file");
cobra2redisApp->add_option("--filter", filter, "Stream SQL Filter");
@ -300,6 +288,7 @@ int main(int argc, char** argv)
cobra2redisApp->add_option("--port", redisPort, "Redis port");
cobra2redisApp->add_flag("-q", quiet, "Quiet / only display stats");
addTLSOptions(cobra2redisApp);
addCobraConfig(cobra2redisApp);
CLI::App* snakeApp = app.add_subcommand("snake", "Snake server");
snakeApp->add_option("--port", port, "Connection url");
@ -368,6 +357,10 @@ int main(int argc, char** argv)
tlsOptions.caFile = "NONE";
}
// Cobra config
cobraConfig.webSocketPerMessageDeflateOptions = ix::WebSocketPerMessageDeflateOptions(true);
cobraConfig.socketTLSOptions = tlsOptions;
int ret = 1;
if (app.got_subcommand("transfer"))
{
@ -436,60 +429,40 @@ int main(int argc, char** argv)
}
else if (app.got_subcommand("cobra_subscribe"))
{
ret = ix::ws_cobra_subscribe_main(
appkey, endpoint, rolename, rolesecret, channel, filter, quiet, tlsOptions);
ret = ix::ws_cobra_subscribe_main(cobraConfig, channel, filter, quiet, fluentd);
}
else if (app.got_subcommand("cobra_publish"))
{
ret = ix::ws_cobra_publish_main(
appkey, endpoint, rolename, rolesecret, channel, path, tlsOptions);
ret = ix::ws_cobra_publish_main(cobraConfig, channel, path);
}
else if (app.got_subcommand("cobra_metrics_publish"))
{
ret = ix::ws_cobra_metrics_publish_main(
appkey, endpoint, rolename, rolesecret, channel, path, stress, tlsOptions);
ret = ix::ws_cobra_metrics_publish_main(cobraConfig, channel, path, stress);
}
else if (app.got_subcommand("cobra_to_statsd"))
{
ret = ix::ws_cobra_to_statsd_main(appkey,
endpoint,
rolename,
rolesecret,
channel,
filter,
hostname,
statsdPort,
prefix,
fields,
verbose,
tlsOptions);
ret = ix::cobra_to_statsd_bot(
cobraConfig, channel, filter, hostname, statsdPort, prefix, fields, verbose);
}
else if (app.got_subcommand("cobra_to_sentry"))
{
ret = ix::ws_cobra_to_sentry_main(appkey,
endpoint,
rolename,
rolesecret,
channel,
filter,
dsn,
verbose,
strict,
jobs,
maxQueueSize,
tlsOptions);
bool enableHeartbeat = true;
int runtime = -1;
ix::SentryClient sentryClient(dsn);
ret = ix::cobra_to_sentry_bot(cobraConfig,
channel,
filter,
sentryClient,
verbose,
strict,
maxQueueSize,
enableHeartbeat,
runtime);
}
else if (app.got_subcommand("cobra_metrics_to_redis"))
{
ret = ix::ws_cobra_metrics_to_redis(appkey,
endpoint,
rolename,
rolesecret,
channel,
filter,
hostname,
redisPort,
tlsOptions);
ret = ix::ws_cobra_metrics_to_redis(cobraConfig, channel, filter, hostname, redisPort);
}
else if (app.got_subcommand("snake"))
{

58
ws/ws.h
View File

@ -5,6 +5,7 @@
*/
#pragma once
#include <ixcobra/IXCobraConfig.h>
#include <ixwebsocket/IXSocketTLSOptions.h>
#include <string>
@ -74,67 +75,26 @@ namespace ix
const std::string& channel,
bool verbose);
int ws_cobra_subscribe_main(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
int ws_cobra_subscribe_main(const ix::CobraConfig& config,
const std::string& channel,
const std::string& filter,
bool quiet,
const ix::SocketTLSOptions& tlsOptions);
bool fluentd);
int ws_cobra_publish_main(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
int ws_cobra_publish_main(const ix::CobraConfig& appkey,
const std::string& channel,
const std::string& path,
const ix::SocketTLSOptions& tlsOptions);
const std::string& path);
int ws_cobra_metrics_publish_main(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
int ws_cobra_metrics_publish_main(const ix::CobraConfig& config,
const std::string& channel,
const std::string& path,
bool stress,
const ix::SocketTLSOptions& tlsOptions);
bool stress);
int ws_cobra_to_statsd_main(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
const std::string& channel,
const std::string& filter,
const std::string& host,
int port,
const std::string& prefix,
const std::string& fields,
bool verbose,
const ix::SocketTLSOptions& tlsOptions);
int ws_cobra_to_sentry_main(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
const std::string& channel,
const std::string& filter,
const std::string& dsn,
bool verbose,
bool strict,
int jobs,
size_t maxQueueSize,
const ix::SocketTLSOptions& tlsOptions);
int ws_cobra_metrics_to_redis(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
int ws_cobra_metrics_to_redis(const ix::CobraConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& host,
int port,
const ix::SocketTLSOptions& tlsOptions);
int port);
int ws_snake_main(int port,
const std::string& hostname,

View File

@ -15,14 +15,10 @@
namespace ix
{
int ws_cobra_metrics_publish_main(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
int ws_cobra_metrics_publish_main(const ix::CobraConfig& config,
const std::string& channel,
const std::string& path,
bool stress,
const ix::SocketTLSOptions& tlsOptions)
bool stress)
{
std::atomic<int> sentMessages(0);
std::atomic<int> ackedMessages(0);
@ -36,8 +32,13 @@ namespace ix
cobraMetricsPublisher.enable(true);
bool enablePerMessageDeflate = true;
cobraMetricsPublisher.configure(
appkey, endpoint, channel, rolename, rolesecret, enablePerMessageDeflate, tlsOptions);
cobraMetricsPublisher.configure(config.appkey,
config.endpoint,
channel,
config.rolename,
config.rolesecret,
enablePerMessageDeflate,
config.socketTLSOptions);
while (!cobraMetricsPublisher.isAuthenticated())
;

View File

@ -17,23 +17,14 @@
namespace ix
{
int ws_cobra_metrics_to_redis(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
int ws_cobra_metrics_to_redis(const ix::CobraConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& host,
int port,
const ix::SocketTLSOptions& tlsOptions)
int port)
{
ix::CobraConnection conn;
conn.configure(appkey,
endpoint,
rolename,
rolesecret,
ix::WebSocketPerMessageDeflateOptions(true),
tlsOptions);
conn.configure(config);
conn.connect();
// Display incoming messages
@ -135,7 +126,7 @@ namespace ix
channel,
filter,
[&msgPerSeconds, &msgCount, &conditionVariableMutex, &condition, &queue](
const Json::Value& msg) {
const Json::Value& msg, const std::string& /*position*/) {
{
std::unique_lock<std::mutex> lock(conditionVariableMutex);
queue.push(msg);

View File

@ -16,13 +16,9 @@
namespace ix
{
int ws_cobra_publish_main(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
int ws_cobra_publish_main(const ix::CobraConfig& config,
const std::string& channel,
const std::string& path,
const ix::SocketTLSOptions& tlsOptions)
const std::string& path)
{
std::ifstream f(path);
std::string str((std::istreambuf_iterator<char>(f)), std::istreambuf_iterator<char>());
@ -36,12 +32,7 @@ namespace ix
}
ix::CobraConnection conn;
conn.configure(appkey,
endpoint,
rolename,
rolesecret,
ix::WebSocketPerMessageDeflateOptions(true),
tlsOptions);
conn.configure(config);
// Display incoming messages
std::atomic<bool> authenticated(false);

View File

@ -6,6 +6,7 @@
#include <atomic>
#include <chrono>
#include <iostream>
#include <ixcobra/IXCobraConnection.h>
#include <spdlog/spdlog.h>
#include <sstream>
@ -13,22 +14,38 @@
namespace ix
{
int ws_cobra_subscribe_main(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
void writeToStdout(bool fluentd,
Json::FastWriter& jsonWriter,
const Json::Value& msg,
const std::string& position)
{
Json::Value enveloppe;
if (fluentd)
{
enveloppe["producer"] = "cobra";
enveloppe["consumer"] = "fluentd";
Json::Value msgWithPosition(msg);
msgWithPosition["position"] = position;
enveloppe["message"] = msgWithPosition;
std::cout << jsonWriter.write(enveloppe);
}
else
{
enveloppe = msg;
std::cout << position << " " << jsonWriter.write(enveloppe);
}
}
int ws_cobra_subscribe_main(const ix::CobraConfig& config,
const std::string& channel,
const std::string& filter,
bool quiet,
const ix::SocketTLSOptions& tlsOptions)
bool fluentd)
{
ix::CobraConnection conn;
conn.configure(appkey,
endpoint,
rolename,
rolesecret,
ix::WebSocketPerMessageDeflateOptions(true),
tlsOptions);
conn.configure(config);
conn.connect();
Json::FastWriter jsonWriter;
@ -51,7 +68,7 @@ namespace ix
std::thread t(timer);
conn.setEventCallback(
[&conn, &channel, &jsonWriter, &filter, &msgCount, &msgPerSeconds, &quiet](
[&conn, &channel, &jsonWriter, &filter, &msgCount, &msgPerSeconds, &quiet, &fluentd](
ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
@ -69,18 +86,18 @@ namespace ix
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
spdlog::info("Subscriber authenticated");
conn.subscribe(
channel,
filter,
[&jsonWriter, &quiet, &msgPerSeconds, &msgCount](const Json::Value& msg) {
if (!quiet)
{
spdlog::info(jsonWriter.write(msg));
}
conn.subscribe(channel,
filter,
[&jsonWriter, &quiet, &msgPerSeconds, &msgCount, &fluentd](
const Json::Value& msg, const std::string& position) {
if (!quiet)
{
writeToStdout(fluentd, jsonWriter, msg, position);
}
msgPerSeconds++;
msgCount++;
});
msgPerSeconds++;
msgCount++;
});
}
else if (eventType == ix::CobraConnection_EventType_Subscribed)
{

View File

@ -30,6 +30,15 @@ namespace ix
void start();
void stop();
int getSentBytes()
{
return _sentBytes;
}
int getReceivedBytes()
{
return _receivedBytes;
}
void sendMessage(const std::string& text);
private:
@ -38,6 +47,8 @@ namespace ix
ix::WebSocket _webSocket;
bool _disablePerMessageDeflate;
bool _binaryMode;
std::atomic<int> _receivedBytes;
std::atomic<int> _sentBytes;
void log(const std::string& msg);
WebSocketHttpHeaders parseHeaders(const std::string& data);
@ -54,6 +65,8 @@ namespace ix
: _url(url)
, _disablePerMessageDeflate(disablePerMessageDeflate)
, _binaryMode(binaryMode)
, _receivedBytes(0)
, _sentBytes(0)
{
if (disableAutomaticReconnection)
{
@ -68,6 +81,17 @@ namespace ix
{
_webSocket.addSubProtocol(subprotocol);
}
WebSocket::setTrafficTrackerCallback([this](int size, bool incoming) {
if (incoming)
{
_receivedBytes += size;
}
else
{
_sentBytes += size;
}
});
}
void WebSocketConnect::log(const std::string& msg)
@ -246,6 +270,9 @@ namespace ix
spdlog::info("");
webSocketChat.stop();
spdlog::info("Received {} bytes", webSocketChat.getReceivedBytes());
spdlog::info("Sent {} bytes", webSocketChat.getSentBytes());
return 0;
}
} // namespace ix