Compare commits

...

9 Commits

Author SHA1 Message Date
Benjamin Sergeant
0520329350 tag version 2019-12-30 17:17:28 -08:00
James Tyra
ba88a05b74 Update IXSocketMbedTLS.cpp (#139)
fix bug with mbedtls server certificate loading.
2019-12-30 16:11:34 -08:00
James Tyra
72f8e76369 Update IXSocketMbedTLS.cpp (#138)
fix bug just introduced.

mbedstl_pk_setup() gets automatically called later.
2019-12-30 15:14:50 -08:00
James Tyra
0389b0b1a3 [2nd try] Update IXSocketMbedTLS.cpp (#137)
* Update IXSocketMbedTLS.cpp

fix initialization of mbedtls context.
without this, crashes under certain conditions.

* Update IXSocketMbedTLS.cpp

removed newline on 46
2019-12-30 14:38:25 -08:00
Benjamin Sergeant
ac0c218455 clang-format 2019-12-30 08:46:18 -08:00
Benjamin Sergeant
299dc0452e (ws cobra to sentry/statsd) fix for handling null events properly for empty queues + use queue to send data to statsd 2019-12-28 17:28:05 -08:00
Benjamin Sergeant
f4af84dc06 (ws cobra to sentry) handle null events for empty queues 2019-12-28 10:16:18 -08:00
Benjamin Sergeant
6522bc06ba (ws cobra to sentry) game is picked in a fair manner, so that all games get the same share of sent events 2019-12-27 19:10:15 -08:00
Benjamin Sergeant
50bea7dffa (ws cobra to sentry) refactor queue related code into a class 2019-12-27 18:24:45 -08:00
16 changed files with 428 additions and 267 deletions

View File

@@ -1,6 +1,26 @@
# Changelog # Changelog
All changes to this project will be documented in this file. All changes to this project will be documented in this file.
## [7.8.7] - 2019-12-28
(mbedtls) fix related to private key file parsing and initialization
## [7.8.6] - 2019-12-28
(ws cobra to sentry/statsd) fix for handling null events properly for empty queues + use queue to send data to statsd
## [7.8.5] - 2019-12-28
(ws cobra to sentry) handle null events for empty queues
## [7.8.4] - 2019-12-27
(ws cobra to sentry) game is picked in a fair manner, so that all games get the same share of sent events
## [7.8.3] - 2019-12-27
(ws cobra to sentry) refactor queue related code into a class
## [7.8.2] - 2019-12-25 ## [7.8.2] - 2019-12-25
(ws cobra to sentry) bound the queue size used to hold up cobra messages before they are sent to sentry. Default queue size is a 100 messages. Without such limit the program runs out of memory when a subscriber receive a lot of messages that cannot make it to sentry (ws cobra to sentry) bound the queue size used to hold up cobra messages before they are sent to sentry. Default queue size is a 100 messages. Without such limit the program runs out of memory when a subscriber receive a lot of messages that cannot make it to sentry

View File

@@ -105,7 +105,9 @@ namespace ix
} }
} }
OSStatus SocketAppleSSL::writeToSocket(SSLConnectionRef connection, const void* data, size_t* len) OSStatus SocketAppleSSL::writeToSocket(SSLConnectionRef connection,
const void* data,
size_t* len)
{ {
int fd = (int) (long) connection; int fd = (int) (long) connection;
if (fd < 0) return errSSLInternal; if (fd < 0) return errSSLInternal;
@@ -165,7 +167,8 @@ namespace ix
_sslContext = SSLCreateContext(kCFAllocatorDefault, kSSLClientSide, kSSLStreamType); _sslContext = SSLCreateContext(kCFAllocatorDefault, kSSLClientSide, kSSLStreamType);
SSLSetIOFuncs(_sslContext, SocketAppleSSL::readFromSocket, SocketAppleSSL::writeToSocket); SSLSetIOFuncs(
_sslContext, SocketAppleSSL::readFromSocket, SocketAppleSSL::writeToSocket);
SSLSetConnection(_sslContext, (SSLConnectionRef)(long) _sockfd); SSLSetConnection(_sslContext, (SSLConnectionRef)(long) _sockfd);
SSLSetProtocolVersionMin(_sslContext, kTLSProtocol12); SSLSetProtocolVersionMin(_sslContext, kTLSProtocol12);
SSLSetPeerDomainName(_sslContext, host.c_str(), host.size()); SSLSetPeerDomainName(_sslContext, host.c_str(), host.size());

View File

@@ -39,6 +39,7 @@ namespace ix
mbedtls_entropy_init(&_entropy); mbedtls_entropy_init(&_entropy);
mbedtls_x509_crt_init(&_cacert); mbedtls_x509_crt_init(&_cacert);
mbedtls_x509_crt_init(&_cert); mbedtls_x509_crt_init(&_cert);
mbedtls_pk_init(&_pkey);
} }
bool SocketMbedTLS::init(const std::string& host, bool isClient, std::string& errMsg) bool SocketMbedTLS::init(const std::string& host, bool isClient, std::string& errMsg)
@@ -81,6 +82,11 @@ namespace ix
errMsg = "Cannot parse key file '" + _tlsOptions.keyFile + "'"; errMsg = "Cannot parse key file '" + _tlsOptions.keyFile + "'";
return false; return false;
} }
if (mbedtls_ssl_conf_own_cert(&_conf, &_cert, &_pkey) < 0)
{
errMsg = "Problem configuring cert '" + _tlsOptions.certFile + "'";
return false;
}
} }
if (_tlsOptions.isPeerVerifyDisabled()) if (_tlsOptions.isPeerVerifyDisabled())
@@ -104,11 +110,6 @@ namespace ix
} }
mbedtls_ssl_conf_ca_chain(&_conf, &_cacert, NULL); mbedtls_ssl_conf_ca_chain(&_conf, &_cacert, NULL);
if (_tlsOptions.hasCertAndKey())
{
mbedtls_ssl_conf_own_cert(&_conf, &_cert, &_pkey);
}
} }
if (mbedtls_ssl_setup(&_ssl, &_conf) != 0) if (mbedtls_ssl_setup(&_ssl, &_conf) != 0)

View File

@@ -11,8 +11,8 @@
#include "IXSocketConnect.h" #include "IXSocketConnect.h"
#include "IXSocketFactory.h" #include "IXSocketFactory.h"
#include <assert.h> #include <assert.h>
#include <stdio.h>
#include <sstream> #include <sstream>
#include <stdio.h>
#include <string.h> #include <string.h>
namespace ix namespace ix

View File

@@ -6,4 +6,4 @@
#pragma once #pragma once
#define IX_WEBSOCKET_VERSION "7.8.2" #define IX_WEBSOCKET_VERSION "7.8.7"

View File

@@ -37,9 +37,7 @@ namespace
class CobraChat class CobraChat
{ {
public: public:
CobraChat(const std::string& user, CobraChat(const std::string& user, const std::string& session, const std::string& endpoint);
const std::string& session,
const std::string& endpoint);
void subscribe(const std::string& channel); void subscribe(const std::string& channel);
void start(); void start();

View File

@@ -269,7 +269,9 @@ int main(int argc, char** argv)
cobra2sentry->add_option("--rolesecret", rolesecret, "Role secret")->required(); cobra2sentry->add_option("--rolesecret", rolesecret, "Role secret")->required();
cobra2sentry->add_option("--dsn", dsn, "Sentry DSN"); cobra2sentry->add_option("--dsn", dsn, "Sentry DSN");
cobra2sentry->add_option("--jobs", jobs, "Number of thread sending events to Sentry"); cobra2sentry->add_option("--jobs", jobs, "Number of thread sending events to Sentry");
cobra2sentry->add_option("--queue_size", maxQueueSize, "Size of the queue to hold messages before they are sent to Sentry"); cobra2sentry->add_option("--queue_size",
maxQueueSize,
"Size of the queue to hold messages before they are sent to Sentry");
cobra2sentry->add_option("channel", channel, "Channel")->required(); cobra2sentry->add_option("channel", channel, "Channel")->required();
cobra2sentry->add_flag("-v", verbose, "Verbose"); cobra2sentry->add_flag("-v", verbose, "Verbose");
cobra2sentry->add_flag("-s", strict, "Strict mode. Error out when sending to sentry fails"); cobra2sentry->add_flag("-s", strict, "Strict mode. Error out when sending to sentry fails");
@@ -474,8 +476,14 @@ int main(int argc, char** argv)
} }
else if (app.got_subcommand("snake")) else if (app.got_subcommand("snake"))
{ {
ret = ix::ws_snake_main( ret = ix::ws_snake_main(port,
port, hostname, redisHosts, redisPort, redisPassword, verbose, appsConfigPath, tlsOptions); hostname,
redisHosts,
redisPort,
redisPassword,
verbose,
appsConfigPath,
tlsOptions);
} }
else if (app.got_subcommand("httpd")) else if (app.got_subcommand("httpd"))
{ {

View File

@@ -5,8 +5,8 @@
*/ */
#include <ixwebsocket/IXWebSocketServer.h> #include <ixwebsocket/IXWebSocketServer.h>
#include <sstream>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream>
namespace ix namespace ix
@@ -38,7 +38,8 @@ namespace ix
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
spdlog::info("Closed connection: code {} reason {}", spdlog::info("Closed connection: code {} reason {}",
msg->closeInfo.code, msg->closeInfo.reason); msg->closeInfo.code,
msg->closeInfo.reason);
} }
else if (msg->type == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {

View File

@@ -14,8 +14,8 @@
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <queue> #include <queue>
#include <sstream>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream>
// for convenience // for convenience
using json = nlohmann::json; using json = nlohmann::json;

View File

@@ -9,6 +9,7 @@
#include <condition_variable> #include <condition_variable>
#include <ixcobra/IXCobraConnection.h> #include <ixcobra/IXCobraConnection.h>
#include <ixsentry/IXSentryClient.h> #include <ixsentry/IXSentryClient.h>
#include <map>
#include <mutex> #include <mutex>
#include <queue> #include <queue>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
@@ -18,6 +19,81 @@
namespace ix namespace ix
{ {
class QueueManager
{
public:
QueueManager(size_t maxQueueSize, std::atomic<bool>& stop)
: _maxQueueSize(maxQueueSize)
, _stop(stop)
{
}
Json::Value pop();
void add(Json::Value msg);
private:
std::map<std::string, std::queue<Json::Value>> _queues;
std::mutex _mutex;
std::condition_variable _condition;
size_t _maxQueueSize;
std::atomic<bool>& _stop;
};
Json::Value QueueManager::pop()
{
std::unique_lock<std::mutex> lock(_mutex);
if (_queues.empty())
{
Json::Value val;
return val;
}
std::vector<std::string> games;
for (auto it : _queues)
{
games.push_back(it.first);
}
std::random_shuffle(games.begin(), games.end());
std::string game = games[0];
spdlog::info("Sending event for game '{}'", game);
_condition.wait(lock, [this] { return !_stop; });
if (_queues[game].empty())
{
Json::Value val;
return val;
}
auto msg = _queues[game].front();
_queues[game].pop();
return msg;
}
void QueueManager::add(Json::Value msg)
{
std::unique_lock<std::mutex> lock(_mutex);
std::string game;
if (msg.isMember("device") && msg["device"].isMember("game"))
{
game = msg["device"]["game"].asString();
}
if (game.empty()) return;
// if the sending is not fast enough there is no point
// in queuing too many events.
if (_queues[game].size() < _maxQueueSize)
{
_queues[game].push(msg);
_condition.notify_one();
}
}
int ws_cobra_to_sentry_main(const std::string& appkey, int ws_cobra_to_sentry_main(const std::string& appkey,
const std::string& endpoint, const std::string& endpoint,
const std::string& rolename, const std::string& rolename,
@@ -47,9 +123,7 @@ namespace ix
std::atomic<bool> stop(false); std::atomic<bool> stop(false);
std::atomic<bool> throttled(false); std::atomic<bool> throttled(false);
std::condition_variable condition; QueueManager queueManager(maxQueueSize, stop);
std::mutex conditionVariableMutex;
std::queue<Json::Value> queue;
auto timer = [&sentCount, &receivedCount] { auto timer = [&sentCount, &receivedCount] {
while (true) while (true)
@@ -63,28 +137,16 @@ namespace ix
std::thread t1(timer); std::thread t1(timer);
auto sentrySender = [&condition, auto sentrySender =
&conditionVariableMutex, [&queueManager, verbose, &errorSending, &sentCount, &stop, &throttled, &dsn] {
&queue,
verbose,
&errorSending,
&sentCount,
&stop,
&throttled,
&dsn] {
SentryClient sentryClient(dsn); SentryClient sentryClient(dsn);
while (true) while (true)
{ {
Json::Value msg; Json::Value msg = queueManager.pop();
{ if (msg.isNull()) continue;
std::unique_lock<std::mutex> lock(conditionVariableMutex); if (stop) return;
condition.wait(lock, [&queue, &stop] { return !queue.empty() && !stop; });
msg = queue.front();
queue.pop();
}
auto ret = sentryClient.send(msg, verbose); auto ret = sentryClient.send(msg, verbose);
HttpResponsePtr response = ret.first; HttpResponsePtr response = ret.first;
@@ -175,10 +237,7 @@ namespace ix
verbose, verbose,
&throttled, &throttled,
&receivedCount, &receivedCount,
&condition, &queueManager](ix::CobraConnectionEventType eventType,
&conditionVariableMutex,
&maxQueueSize,
&queue](ix::CobraConnectionEventType eventType,
const std::string& errMsg, const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers, const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId, const std::string& subscriptionId,
@@ -201,14 +260,8 @@ namespace ix
spdlog::info("Subscriber authenticated"); spdlog::info("Subscriber authenticated");
conn.subscribe(channel, conn.subscribe(channel,
filter, filter,
[&jsonWriter, [&jsonWriter, verbose, &throttled, &receivedCount, &queueManager](
verbose, const Json::Value& msg) {
&throttled,
&receivedCount,
&condition,
&conditionVariableMutex,
&maxQueueSize,
&queue](const Json::Value& msg) {
if (verbose) if (verbose)
{ {
spdlog::info(jsonWriter.write(msg)); spdlog::info(jsonWriter.write(msg));
@@ -217,23 +270,11 @@ namespace ix
// If we cannot send to sentry fast enough, drop the message // If we cannot send to sentry fast enough, drop the message
if (throttled) if (throttled)
{ {
condition.notify_one();
return; return;
} }
++receivedCount; ++receivedCount;
queueManager.add(msg);
{
std::unique_lock<std::mutex> lock(conditionVariableMutex);
// if the sending is not fast enough there is no point
// in queuing too many events.
if (queue.size() < maxQueueSize)
{
queue.push(msg);
}
}
condition.notify_one();
}); });
} }
else if (eventType == ix::CobraConnection_EventType_Subscribed) else if (eventType == ix::CobraConnection_EventType_Subscribed)

View File

@@ -6,6 +6,7 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <condition_variable>
#include <ixcobra/IXCobraConnection.h> #include <ixcobra/IXCobraConnection.h>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
@@ -16,6 +17,59 @@
#include <statsd_client.h> #include <statsd_client.h>
#endif #endif
namespace
{
class QueueManager
{
public:
QueueManager(size_t maxQueueSize, std::atomic<bool>& stop)
: _maxQueueSize(maxQueueSize)
, _stop(stop)
{
}
Json::Value pop();
void add(Json::Value msg);
private:
std::queue<Json::Value> _queue;
std::mutex _mutex;
std::condition_variable _condition;
size_t _maxQueueSize;
std::atomic<bool>& _stop;
};
Json::Value QueueManager::pop()
{
std::unique_lock<std::mutex> lock(_mutex);
if (_queue.empty())
{
Json::Value val;
return val;
}
_condition.wait(lock, [this] { return !_stop; });
auto msg = _queue.front();
_queue.pop();
return msg;
}
void QueueManager::add(Json::Value msg)
{
std::unique_lock<std::mutex> lock(_mutex);
// if the sending is not fast enough there is no point
// in queuing too many events.
if (_queue.size() < _maxQueueSize)
{
_queue.push(msg);
_condition.notify_one();
}
}
} // namespace
namespace ix namespace ix
{ {
// fields are command line argument that can be specified multiple times // fields are command line argument that can be specified multiple times
@@ -79,6 +133,27 @@ namespace ix
auto tokens = parseFields(fields); auto tokens = parseFields(fields);
Json::FastWriter jsonWriter;
std::atomic<uint64_t> sentCount(0);
std::atomic<uint64_t> receivedCount(0);
std::atomic<bool> stop(false);
size_t maxQueueSize = 1000;
QueueManager queueManager(maxQueueSize, stop);
auto timer = [&sentCount, &receivedCount] {
while (true)
{
spdlog::info("messages received {} sent {}", receivedCount, sentCount);
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
};
std::thread t1(timer);
auto statsdSender = [&queueManager, &host, &port, &sentCount, &tokens, &prefix, &stop] {
// statsd client // statsd client
// test with netcat as a server: `nc -ul 8125` // test with netcat as a server: `nc -ul 8125`
bool statsdBatch = true; bool statsdBatch = true;
@@ -87,19 +162,33 @@ namespace ix
#else #else
int statsdClient; int statsdClient;
#endif #endif
while (true)
{
Json::Value msg = queueManager.pop();
Json::FastWriter jsonWriter; if (msg.isNull()) continue;
uint64_t msgCount = 0; if (stop) return;
conn.setEventCallback([&conn, std::string id;
&channel, for (auto&& attr : tokens)
&filter, {
&jsonWriter, id += ".";
&statsdClient, id += extractAttr(attr, msg);
verbose, }
&tokens,
&prefix, sentCount += 1;
&msgCount](ix::CobraConnectionEventType eventType,
#ifndef _WIN32
statsdClient.count(id, 1);
#endif
}
};
std::thread t2(statsdSender);
conn.setEventCallback(
[&conn, &channel, &filter, &jsonWriter, verbose, &queueManager, &receivedCount](
ix::CobraConnectionEventType eventType,
const std::string& errMsg, const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers, const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId, const std::string& subscriptionId,
@@ -122,25 +211,17 @@ namespace ix
spdlog::info("Subscriber authenticated"); spdlog::info("Subscriber authenticated");
conn.subscribe(channel, conn.subscribe(channel,
filter, filter,
[&jsonWriter, &statsdClient, verbose, &tokens, &prefix, &msgCount]( [&jsonWriter, &queueManager, verbose, &receivedCount](
const Json::Value& msg) { const Json::Value& msg) {
if (verbose) if (verbose)
{ {
spdlog::info(jsonWriter.write(msg)); spdlog::info(jsonWriter.write(msg));
} }
std::string id; receivedCount++;
for (auto&& attr : tokens)
{
id += ".";
id += extractAttr(attr, msg);
}
spdlog::info("{} {}{}", msgCount++, prefix, id); ++receivedCount;
queueManager.add(msg);
#ifndef _WIN32
statsdClient.count(id, 1);
#endif
}); });
} }
else if (eventType == ix::CobraConnection_EventType_Subscribed) else if (eventType == ix::CobraConnection_EventType_Subscribed)

View File

@@ -4,12 +4,12 @@
* Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved. * Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved.
*/ */
#include <iostream>
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXSocketTLSOptions.h> #include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
#include <iostream>
namespace ix namespace ix
{ {

View File

@@ -101,8 +101,8 @@ namespace ix
}); });
// Client connection // Client connection
webSocket->setOnMessageCallback([state, remoteUrl, verbose]( webSocket->setOnMessageCallback(
const WebSocketMessagePtr& msg) { [state, remoteUrl, verbose](const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
spdlog::info("New connection from client"); spdlog::info("New connection from client");

View File

@@ -14,8 +14,8 @@
#include <ixwebsocket/IXSocketTLSOptions.h> #include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <msgpack11/msgpack11.hpp> #include <msgpack11/msgpack11.hpp>
#include <spdlog/spdlog.h>
#include <mutex> #include <mutex>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
#include <vector> #include <vector>

View File

@@ -14,8 +14,8 @@
#include <ixwebsocket/IXSocketTLSOptions.h> #include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <msgpack11/msgpack11.hpp> #include <msgpack11/msgpack11.hpp>
#include <spdlog/spdlog.h>
#include <mutex> #include <mutex>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
#include <vector> #include <vector>

View File

@@ -70,11 +70,13 @@ namespace ix
if (readyState == ReadyState::Open) if (readyState == ReadyState::Open)
{ {
++receivers; ++receivers;
client->send(msg->str, client->send(
msg->binary, msg->str, msg->binary, [&id](int current, int total) -> bool {
[&id](int current, int total) -> bool {
spdlog::info("{}: [client {}]: Step {} out of {}", spdlog::info("{}: [client {}]: Step {} out of {}",
"ws_transfer", id, current, total); "ws_transfer",
id,
current,
total);
return true; return true;
}); });
do do
@@ -82,7 +84,9 @@ namespace ix
size_t bufferedAmount = client->bufferedAmount(); size_t bufferedAmount = client->bufferedAmount();
spdlog::info("{}: [client {}]: {} bytes left to send", spdlog::info("{}: [client {}]: {} bytes left to send",
"ws_transfer", id, bufferedAmount); "ws_transfer",
id,
bufferedAmount);
std::this_thread::sleep_for(std::chrono::milliseconds(500)); std::this_thread::sleep_for(std::chrono::milliseconds(500));
@@ -97,8 +101,12 @@ namespace ix
: readyState == ReadyState::Closing ? "Closing" : "Closed"; : readyState == ReadyState::Closing ? "Closing" : "Closed";
size_t bufferedAmount = client->bufferedAmount(); size_t bufferedAmount = client->bufferedAmount();
spdlog::info("{}: [client {}]: has readystate {} bytes left to be sent", spdlog::info(
"ws_transfer", id, readyStateString, bufferedAmount); "{}: [client {}]: has readystate {} bytes left to be sent",
"ws_transfer",
id,
readyStateString,
bufferedAmount);
} }
} }
} }