(cobra) CobraMetricsPublisher can be configure with an ix::CobraConfig + more unittest use SSL in server + client

This commit is contained in:
Benjamin Sergeant 2020-03-20 12:21:45 -07:00
parent 5691b55967
commit 829751b7af
12 changed files with 154 additions and 155 deletions

View File

@ -1,6 +1,10 @@
# Changelog
All changes to this project will be documented in this file.
## [8.3.1] - 2020-03-20
(cobra) CobraMetricsPublisher can be configure with an ix::CobraConfig + more unittest use SSL in server + client
## [8.3.0] - 2020-03-18
(websocket) Simplify ping/pong based heartbeat implementation

View File

@ -27,20 +27,12 @@ namespace ix
;
}
void CobraMetricsPublisher::configure(const std::string& appkey,
const std::string& endpoint,
const std::string& channel,
const std::string& rolename,
const std::string& rolesecret,
bool enablePerMessageDeflate,
const SocketTLSOptions& socketTLSOptions)
void CobraMetricsPublisher::configure(const CobraConfig& config,
const std::string& channel)
{
// Configure the satori connection and start its publish background thread
_cobra_metrics_theaded_publisher.configure(config, channel);
_cobra_metrics_theaded_publisher.start();
_cobra_metrics_theaded_publisher.configure(appkey, endpoint, channel,
rolename, rolesecret,
enablePerMessageDeflate, socketTLSOptions);
}
Json::Value& CobraMetricsPublisher::getGenericAttributes()

View File

@ -40,13 +40,8 @@ namespace ix
/// Configuration / set keys, etc...
/// All input data but the channel name is encrypted with rc4
void configure(const std::string& appkey,
const std::string& endpoint,
const std::string& channel,
const std::string& rolename,
const std::string& rolesecret,
bool enablePerMessageDeflate,
const SocketTLSOptions& socketTLSOptions);
void configure(const CobraConfig& config,
const std::string& channel);
/// Setter for the list of blacklisted metrics ids.
/// That list is sorted internally for fast lookups

View File

@ -92,22 +92,14 @@ namespace ix
_thread = std::thread(&CobraMetricsThreadedPublisher::run, this);
}
void CobraMetricsThreadedPublisher::configure(const std::string& appkey,
const std::string& endpoint,
const std::string& channel,
const std::string& rolename,
const std::string& rolesecret,
bool enablePerMessageDeflate,
const SocketTLSOptions& socketTLSOptions)
void CobraMetricsThreadedPublisher::configure(const CobraConfig& config,
const std::string& channel)
{
ix::IXCoreLogger::Log(config.socketTLSOptions.getDescription().c_str());
_channel = channel;
_cobra_connection.configure(config);
ix::IXCoreLogger::Log(socketTLSOptions.getDescription().c_str());
ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(enablePerMessageDeflate);
_cobra_connection.configure(appkey, endpoint,
rolename, rolesecret,
webSocketPerMessageDeflateOptions, socketTLSOptions);
}
void CobraMetricsThreadedPublisher::pushMessage(MessageKind messageKind)

View File

@ -27,13 +27,8 @@ namespace ix
~CobraMetricsThreadedPublisher();
/// Configuration / set keys, etc...
void configure(const std::string& appkey,
const std::string& endpoint,
const std::string& channel,
const std::string& rolename,
const std::string& rolesecret,
bool enablePerMessageDeflate,
const SocketTLSOptions& socketTLSOptions);
void configure(const CobraConfig& config,
const std::string& channel);
/// Start the worker thread, used for background publishing
void start();

View File

@ -6,4 +6,4 @@
#pragma once
#define IX_WEBSOCKET_VERSION "8.3.0"
#define IX_WEBSOCKET_VERSION "8.3.1"

View File

@ -37,7 +37,9 @@ namespace
class CobraChat
{
public:
CobraChat(const std::string& user, const std::string& session, const std::string& endpoint);
CobraChat(const std::string& user,
const std::string& session,
const ix::CobraConfig& config);
void subscribe(const std::string& channel);
void start();
@ -54,7 +56,7 @@ namespace
private:
std::string _user;
std::string _session;
std::string _endpoint;
ix::CobraConfig _cobraConfig;
std::queue<Json::Value> _publish_queue;
mutable std::mutex _queue_mutex;
@ -72,10 +74,10 @@ namespace
CobraChat::CobraChat(const std::string& user,
const std::string& session,
const std::string& endpoint)
const ix::CobraConfig& config)
: _user(user)
, _session(session)
, _endpoint(endpoint)
, _cobraConfig(config)
, _stop(false)
, _connectedAndSubscribed(false)
{
@ -169,19 +171,9 @@ namespace
//
void CobraChat::run()
{
// "chat" conf
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
std::string channel = _session;
std::string role = "_sub";
std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
SocketTLSOptions socketTLSOptions;
_conn.configure(appkey,
_endpoint,
role,
secret,
ix::WebSocketPerMessageDeflateOptions(true),
socketTLSOptions);
_conn.configure(_cobraConfig);
_conn.connect();
_conn.setEventCallback([this, channel](ix::CobraConnectionEventType eventType,
@ -265,7 +257,7 @@ TEST_CASE("Cobra_chat", "[cobra_chat]")
SECTION("Exchange and count sent/received messages.")
{
int port = getFreePort();
snake::AppConfig appConfig = makeSnakeServerConfig(port);
snake::AppConfig appConfig = makeSnakeServerConfig(port, true);
// Start a redis server
ix::RedisServer redisServer(appConfig.redisPort);
@ -281,13 +273,20 @@ TEST_CASE("Cobra_chat", "[cobra_chat]")
setupTrafficTrackerCallback();
std::string session = ix::generateSessionId();
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
std::string role = "_sub";
std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
std::string endpoint = makeCobraEndpoint(port, true);
std::stringstream ss;
ss << "ws://localhost:" << port;
std::string endpoint = ss.str();
ix::CobraConfig config;
config.endpoint = endpoint;
config.appkey = appkey;
config.rolename = role;
config.rolesecret = secret;
config.socketTLSOptions = makeClientTLSOptions();
CobraChat chatA("jean", session, endpoint);
CobraChat chatB("paul", session, endpoint);
CobraChat chatA("jean", session, config);
CobraChat chatB("paul", session, config);
chatA.start();
chatB.start();

View File

@ -33,17 +33,6 @@ namespace
});
}
//
// This project / appkey is configure on cobra to not do any batching.
// This way we can start a subscriber and receive all messages as they come in.
//
std::string APPKEY("FC2F10139A2BAc53BB72D9db967b024f");
std::string CHANNEL("unittest_channel");
std::string PUBLISHER_ROLE("_pub");
std::string PUBLISHER_SECRET("1c04DB8fFe76A4EeFE3E318C72d771db");
std::string SUBSCRIBER_ROLE("_sub");
std::string SUBSCRIBER_SECRET("66B1dA3ED5fA074EB5AE84Dd8CE3b5ba");
std::atomic<bool> gStop;
std::atomic<bool> gSubscriberConnectedAndSubscribed;
std::atomic<size_t> gUniqueMessageIdsCount;
@ -55,24 +44,17 @@ namespace
//
// Background thread subscribe to the channel and validates what was sent
//
void startSubscriber(const std::string& endpoint)
void startSubscriber(const ix::CobraConfig& config, const std::string& channel)
{
gSubscriberConnectedAndSubscribed = false;
gUniqueMessageIdsCount = 0;
gMessageCount = 0;
ix::CobraConnection conn;
SocketTLSOptions socketTLSOptions;
conn.configure(APPKEY,
endpoint,
SUBSCRIBER_ROLE,
SUBSCRIBER_SECRET,
ix::WebSocketPerMessageDeflateOptions(true),
socketTLSOptions);
conn.configure(config);
conn.connect();
conn.setEventCallback([&conn](ix::CobraConnectionEventType eventType,
conn.setEventCallback([&conn, &channel](ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
@ -96,7 +78,7 @@ namespace
std::string position("$");
conn.subscribe(
CHANNEL, filter, position, [](const Json::Value& msg, const std::string& /*position*/) {
channel, filter, position, [](const Json::Value& msg, const std::string& /*position*/) {
log(msg.toStyledString());
std::string id = msg["id"].asString();
@ -111,7 +93,7 @@ namespace
else if (eventType == ix::CobraConnection_EventType_Subscribed)
{
TLogger() << "Subscriber: subscribed to channel " << subscriptionId;
if (subscriptionId == CHANNEL)
if (subscriptionId == channel)
{
gSubscriberConnectedAndSubscribed = true;
}
@ -123,7 +105,7 @@ namespace
else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{
TLogger() << "Subscriber: ununexpected from channel " << subscriptionId;
if (subscriptionId != CHANNEL)
if (subscriptionId != channel)
{
TLogger() << "Subscriber: unexpected channel " << subscriptionId;
}
@ -140,7 +122,7 @@ namespace
std::this_thread::sleep_for(duration);
}
conn.unsubscribe(CHANNEL);
conn.unsubscribe(channel);
conn.disconnect();
gUniqueMessageIdsCount = gIds.size();
@ -165,7 +147,8 @@ namespace
TEST_CASE("Cobra_Metrics_Publisher", "[cobra]")
{
int port = getFreePort();
snake::AppConfig appConfig = makeSnakeServerConfig(port);
bool preferTLS = false;
snake::AppConfig appConfig = makeSnakeServerConfig(port, preferTLS);
// Start a redis server
ix::RedisServer redisServer(appConfig.redisPort);
@ -179,15 +162,21 @@ TEST_CASE("Cobra_Metrics_Publisher", "[cobra]")
setupTrafficTrackerCallback();
std::stringstream ss;
ss << "ws://localhost:" << port;
std::string endpoint = ss.str();
std::string channel = ix::generateSessionId();
std::string endpoint = makeCobraEndpoint(port, preferTLS);
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
std::string role = "_sub";
std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
// Make channel name unique
CHANNEL += uuid4();
ix::CobraConfig config;
config.endpoint = endpoint;
config.appkey = appkey;
config.rolename = role;
config.rolesecret = secret;
config.socketTLSOptions = makeClientTLSOptions();
gStop = false;
std::thread bgThread(&startSubscriber, endpoint);
std::thread subscriberThread(&startSubscriber, config, channel);
int timeout = 10 * 1000; // 10s
@ -207,18 +196,9 @@ TEST_CASE("Cobra_Metrics_Publisher", "[cobra]")
}
ix::CobraMetricsPublisher cobraMetricsPublisher;
SocketTLSOptions socketTLSOptions;
bool perMessageDeflate = true;
cobraMetricsPublisher.configure(APPKEY,
endpoint,
CHANNEL,
PUBLISHER_ROLE,
PUBLISHER_SECRET,
perMessageDeflate,
socketTLSOptions);
cobraMetricsPublisher.configure(config, channel);
cobraMetricsPublisher.setSession(uuid4());
cobraMetricsPublisher.enable(true); // disabled by default, needs to be enabled to be active
cobraMetricsPublisher.enable(true);
Json::Value data;
data["foo"] = "bar";
@ -294,7 +274,7 @@ TEST_CASE("Cobra_Metrics_Publisher", "[cobra]")
// Now stop the thread
gStop = true;
bgThread.join();
subscriberThread.join();
//
// Validate that we received all message kinds, and the correct number of messages

View File

@ -42,18 +42,9 @@ namespace
void runPublisher(const ix::CobraConfig& config, const std::string& channel)
{
ix::CobraMetricsPublisher cobraMetricsPublisher;
SocketTLSOptions socketTLSOptions;
bool perMessageDeflate = true;
cobraMetricsPublisher.configure(config.appkey,
config.endpoint,
channel,
config.rolename,
config.rolesecret,
perMessageDeflate,
socketTLSOptions);
cobraMetricsPublisher.configure(config, channel);
cobraMetricsPublisher.setSession(uuid4());
cobraMetricsPublisher.enable(true); // disabled by default, needs to be enabled to be active
cobraMetricsPublisher.enable(true);
Json::Value msg;
msg["fps"] = 60;
@ -83,7 +74,7 @@ TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]")
SECTION("Exchange and count sent/received messages.")
{
int port = getFreePort();
snake::AppConfig appConfig = makeSnakeServerConfig(port);
snake::AppConfig appConfig = makeSnakeServerConfig(port, true);
// Start a redis server
ix::RedisServer redisServer(appConfig.redisPort);
@ -96,16 +87,7 @@ TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]")
snakeServer.run();
// Start a fake sentry http server
SocketTLSOptions tlsOptionsServer;
tlsOptionsServer.certFile = ".certs/trusted-server-crt.pem";
tlsOptionsServer.keyFile = ".certs/trusted-server-key.pem";
tlsOptionsServer.caFile = ".certs/trusted-ca-crt.pem";
#if defined(IXWEBSOCKET_USE_MBED_TLS) || defined(IXWEBSOCKET_USE_OPEN_SSL)
tlsOptionsServer.tls = true;
#else
tlsOptionsServer.tls = false;
#endif
SocketTLSOptions tlsOptionsServer = makeServerTLSOptions(true);
int sentryPort = getFreePort();
ix::HttpServer sentryServer(sentryPort, "127.0.0.1");
@ -145,16 +127,14 @@ TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]")
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
std::string role = "_sub";
std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
std::stringstream ss;
ss << "ws://localhost:" << port;
std::string endpoint = ss.str();
std::string endpoint = makeCobraEndpoint(port, true);
ix::CobraConfig config;
config.endpoint = endpoint;
config.appkey = appkey;
config.rolename = role;
config.rolesecret = secret;
config.socketTLSOptions = makeClientTLSOptions();
std::thread publisherThread(runPublisher, config, channel);
@ -169,19 +149,14 @@ TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]")
// to regress the TLS 1.3 OpenSSL bug
// -> https://github.com/openssl/openssl/issues/7967
// https://xxxxx:yyyyyy@sentry.io/1234567
#if defined(IXWEBSOCKET_USE_MBED_TLS) || defined(IXWEBSOCKET_USE_OPEN_SSL)
std::string scheme("https://");
#else
std::string scheme("http://");
#endif
std::stringstream oss;
oss << scheme << "xxxxxxx:yyyyyyy@localhost:" << sentryPort << "/1234567";
oss << getHttpScheme()
<< "xxxxxxx:yyyyyyy@localhost:"
<< sentryPort
<< "/1234567";
std::string dsn = oss.str();
SocketTLSOptions tlsOptionsClient;
tlsOptionsClient.certFile = ".certs/trusted-client-crt.pem";
tlsOptionsClient.keyFile = ".certs/trusted-client-key.pem";
tlsOptionsClient.caFile = ".certs/trusted-ca-crt.pem";
SocketTLSOptions tlsOptionsClient = makeClientTLSOptions();
SentryClient sentryClient(dsn);
sentryClient.setTLSOptions(tlsOptionsClient);

View File

@ -149,7 +149,61 @@ namespace ix
return std::string(vec.begin(), vec.end());
}
snake::AppConfig makeSnakeServerConfig(int port)
SocketTLSOptions makeClientTLSOptions()
{
SocketTLSOptions tlsOptionsClient;
tlsOptionsClient.certFile = ".certs/trusted-client-crt.pem";
tlsOptionsClient.keyFile = ".certs/trusted-client-key.pem";
tlsOptionsClient.caFile = ".certs/trusted-ca-crt.pem";
return tlsOptionsClient;
}
SocketTLSOptions makeServerTLSOptions(bool preferTLS)
{
// Start a fake sentry http server
SocketTLSOptions tlsOptionsServer;
tlsOptionsServer.certFile = ".certs/trusted-server-crt.pem";
tlsOptionsServer.keyFile = ".certs/trusted-server-key.pem";
tlsOptionsServer.caFile = ".certs/trusted-ca-crt.pem";
#if defined(IXWEBSOCKET_USE_MBED_TLS) || defined(IXWEBSOCKET_USE_OPEN_SSL)
tlsOptionsServer.tls = preferTLS;
#else
tlsOptionsServer.tls = false;
#endif
return tlsOptionsServer;
}
std::string getHttpScheme()
{
#if defined(IXWEBSOCKET_USE_MBED_TLS) || defined(IXWEBSOCKET_USE_OPEN_SSL)
std::string scheme("https://");
#else
std::string scheme("http://");
#endif
return scheme;
}
std::string getWsScheme(bool preferTLS)
{
std::string scheme;
#if defined(IXWEBSOCKET_USE_MBED_TLS) || defined(IXWEBSOCKET_USE_OPEN_SSL)
if (preferTLS)
{
scheme = "wss://";
}
else
{
scheme = "ws://";
}
#else
scheme = "ws://";
#endif
return scheme;
}
snake::AppConfig makeSnakeServerConfig(int port, bool preferTLS)
{
snake::AppConfig appConfig;
appConfig.port = port;
@ -158,6 +212,7 @@ namespace ix
appConfig.redisPort = getFreePort();
appConfig.redisPassword = "";
appConfig.redisHosts.push_back("localhost"); // only one host supported now
appConfig.socketTLSOptions = makeServerTLSOptions(preferTLS);
std::string appsConfigPath("appsConfig.json");
@ -178,4 +233,15 @@ namespace ix
return appConfig;
}
std::string makeCobraEndpoint(int port, bool preferTLS)
{
std::stringstream ss;
ss << getWsScheme(preferTLS)
<< "localhost:"
<< port;
std::string endpoint = ss.str();
return endpoint;
}
} // namespace ix

View File

@ -10,6 +10,7 @@
#include <iostream>
#include <ixsnake/IXAppConfig.h>
#include <ixwebsocket/IXWebSocketServer.h>
#include <ixwebsocket/IXSocketTLSOptions.h>
#include <mutex>
#include <spdlog/spdlog.h>
#include <sstream>
@ -50,5 +51,12 @@ namespace ix
bool startWebSocketEchoServer(ix::WebSocketServer& server);
snake::AppConfig makeSnakeServerConfig(int port);
snake::AppConfig makeSnakeServerConfig(int port, bool preferTLS);
SocketTLSOptions makeClientTLSOptions();
SocketTLSOptions makeServerTLSOptions(bool preferTLS);
std::string getHttpScheme();
std::string getWsScheme(bool preferTLS);
std::string makeCobraEndpoint(int port, bool preferTLS);
} // namespace ix

View File

@ -30,15 +30,8 @@ namespace ix
CobraMetricsPublisher cobraMetricsPublisher;
cobraMetricsPublisher.enable(true);
bool enablePerMessageDeflate = true;
cobraMetricsPublisher.configure(config.appkey,
config.endpoint,
channel,
config.rolename,
config.rolesecret,
enablePerMessageDeflate,
config.socketTLSOptions);
cobraMetricsPublisher.configure(config,
channel);
while (!cobraMetricsPublisher.isAuthenticated())
;