Compare commits

...

9 Commits

Author SHA1 Message Date
0520329350 tag version 2019-12-30 17:17:28 -08:00
ba88a05b74 Update IXSocketMbedTLS.cpp (#139)
fix bug with mbedtls server certificate loading.
2019-12-30 16:11:34 -08:00
72f8e76369 Update IXSocketMbedTLS.cpp (#138)
fix bug just introduced.

mbedstl_pk_setup() gets automatically called later.
2019-12-30 15:14:50 -08:00
0389b0b1a3 [2nd try] Update IXSocketMbedTLS.cpp (#137)
* Update IXSocketMbedTLS.cpp

fix initialization of mbedtls context.
without this, crashes under certain conditions.

* Update IXSocketMbedTLS.cpp

removed newline on 46
2019-12-30 14:38:25 -08:00
ac0c218455 clang-format 2019-12-30 08:46:18 -08:00
299dc0452e (ws cobra to sentry/statsd) fix for handling null events properly for empty queues + use queue to send data to statsd 2019-12-28 17:28:05 -08:00
f4af84dc06 (ws cobra to sentry) handle null events for empty queues 2019-12-28 10:16:18 -08:00
6522bc06ba (ws cobra to sentry) game is picked in a fair manner, so that all games get the same share of sent events 2019-12-27 19:10:15 -08:00
50bea7dffa (ws cobra to sentry) refactor queue related code into a class 2019-12-27 18:24:45 -08:00
16 changed files with 428 additions and 267 deletions

View File

@ -1,6 +1,26 @@
# Changelog # Changelog
All changes to this project will be documented in this file. All changes to this project will be documented in this file.
## [7.8.7] - 2019-12-28
(mbedtls) fix related to private key file parsing and initialization
## [7.8.6] - 2019-12-28
(ws cobra to sentry/statsd) fix for handling null events properly for empty queues + use queue to send data to statsd
## [7.8.5] - 2019-12-28
(ws cobra to sentry) handle null events for empty queues
## [7.8.4] - 2019-12-27
(ws cobra to sentry) game is picked in a fair manner, so that all games get the same share of sent events
## [7.8.3] - 2019-12-27
(ws cobra to sentry) refactor queue related code into a class
## [7.8.2] - 2019-12-25 ## [7.8.2] - 2019-12-25
(ws cobra to sentry) bound the queue size used to hold up cobra messages before they are sent to sentry. Default queue size is a 100 messages. Without such limit the program runs out of memory when a subscriber receive a lot of messages that cannot make it to sentry (ws cobra to sentry) bound the queue size used to hold up cobra messages before they are sent to sentry. Default queue size is a 100 messages. Without such limit the program runs out of memory when a subscriber receive a lot of messages that cannot make it to sentry

View File

@ -105,7 +105,9 @@ namespace ix
} }
} }
OSStatus SocketAppleSSL::writeToSocket(SSLConnectionRef connection, const void* data, size_t* len) OSStatus SocketAppleSSL::writeToSocket(SSLConnectionRef connection,
const void* data,
size_t* len)
{ {
int fd = (int) (long) connection; int fd = (int) (long) connection;
if (fd < 0) return errSSLInternal; if (fd < 0) return errSSLInternal;
@ -165,7 +167,8 @@ namespace ix
_sslContext = SSLCreateContext(kCFAllocatorDefault, kSSLClientSide, kSSLStreamType); _sslContext = SSLCreateContext(kCFAllocatorDefault, kSSLClientSide, kSSLStreamType);
SSLSetIOFuncs(_sslContext, SocketAppleSSL::readFromSocket, SocketAppleSSL::writeToSocket); SSLSetIOFuncs(
_sslContext, SocketAppleSSL::readFromSocket, SocketAppleSSL::writeToSocket);
SSLSetConnection(_sslContext, (SSLConnectionRef)(long) _sockfd); SSLSetConnection(_sslContext, (SSLConnectionRef)(long) _sockfd);
SSLSetProtocolVersionMin(_sslContext, kTLSProtocol12); SSLSetProtocolVersionMin(_sslContext, kTLSProtocol12);
SSLSetPeerDomainName(_sslContext, host.c_str(), host.size()); SSLSetPeerDomainName(_sslContext, host.c_str(), host.size());

View File

@ -39,6 +39,7 @@ namespace ix
mbedtls_entropy_init(&_entropy); mbedtls_entropy_init(&_entropy);
mbedtls_x509_crt_init(&_cacert); mbedtls_x509_crt_init(&_cacert);
mbedtls_x509_crt_init(&_cert); mbedtls_x509_crt_init(&_cert);
mbedtls_pk_init(&_pkey);
} }
bool SocketMbedTLS::init(const std::string& host, bool isClient, std::string& errMsg) bool SocketMbedTLS::init(const std::string& host, bool isClient, std::string& errMsg)
@ -81,6 +82,11 @@ namespace ix
errMsg = "Cannot parse key file '" + _tlsOptions.keyFile + "'"; errMsg = "Cannot parse key file '" + _tlsOptions.keyFile + "'";
return false; return false;
} }
if (mbedtls_ssl_conf_own_cert(&_conf, &_cert, &_pkey) < 0)
{
errMsg = "Problem configuring cert '" + _tlsOptions.certFile + "'";
return false;
}
} }
if (_tlsOptions.isPeerVerifyDisabled()) if (_tlsOptions.isPeerVerifyDisabled())
@ -104,11 +110,6 @@ namespace ix
} }
mbedtls_ssl_conf_ca_chain(&_conf, &_cacert, NULL); mbedtls_ssl_conf_ca_chain(&_conf, &_cacert, NULL);
if (_tlsOptions.hasCertAndKey())
{
mbedtls_ssl_conf_own_cert(&_conf, &_cert, &_pkey);
}
} }
if (mbedtls_ssl_setup(&_ssl, &_conf) != 0) if (mbedtls_ssl_setup(&_ssl, &_conf) != 0)

View File

@ -11,8 +11,8 @@
#include "IXSocketConnect.h" #include "IXSocketConnect.h"
#include "IXSocketFactory.h" #include "IXSocketFactory.h"
#include <assert.h> #include <assert.h>
#include <stdio.h>
#include <sstream> #include <sstream>
#include <stdio.h>
#include <string.h> #include <string.h>
namespace ix namespace ix

View File

@ -6,4 +6,4 @@
#pragma once #pragma once
#define IX_WEBSOCKET_VERSION "7.8.2" #define IX_WEBSOCKET_VERSION "7.8.7"

View File

@ -37,9 +37,7 @@ namespace
class CobraChat class CobraChat
{ {
public: public:
CobraChat(const std::string& user, CobraChat(const std::string& user, const std::string& session, const std::string& endpoint);
const std::string& session,
const std::string& endpoint);
void subscribe(const std::string& channel); void subscribe(const std::string& channel);
void start(); void start();

View File

@ -269,7 +269,9 @@ int main(int argc, char** argv)
cobra2sentry->add_option("--rolesecret", rolesecret, "Role secret")->required(); cobra2sentry->add_option("--rolesecret", rolesecret, "Role secret")->required();
cobra2sentry->add_option("--dsn", dsn, "Sentry DSN"); cobra2sentry->add_option("--dsn", dsn, "Sentry DSN");
cobra2sentry->add_option("--jobs", jobs, "Number of thread sending events to Sentry"); cobra2sentry->add_option("--jobs", jobs, "Number of thread sending events to Sentry");
cobra2sentry->add_option("--queue_size", maxQueueSize, "Size of the queue to hold messages before they are sent to Sentry"); cobra2sentry->add_option("--queue_size",
maxQueueSize,
"Size of the queue to hold messages before they are sent to Sentry");
cobra2sentry->add_option("channel", channel, "Channel")->required(); cobra2sentry->add_option("channel", channel, "Channel")->required();
cobra2sentry->add_flag("-v", verbose, "Verbose"); cobra2sentry->add_flag("-v", verbose, "Verbose");
cobra2sentry->add_flag("-s", strict, "Strict mode. Error out when sending to sentry fails"); cobra2sentry->add_flag("-s", strict, "Strict mode. Error out when sending to sentry fails");
@ -474,8 +476,14 @@ int main(int argc, char** argv)
} }
else if (app.got_subcommand("snake")) else if (app.got_subcommand("snake"))
{ {
ret = ix::ws_snake_main( ret = ix::ws_snake_main(port,
port, hostname, redisHosts, redisPort, redisPassword, verbose, appsConfigPath, tlsOptions); hostname,
redisHosts,
redisPort,
redisPassword,
verbose,
appsConfigPath,
tlsOptions);
} }
else if (app.got_subcommand("httpd")) else if (app.got_subcommand("httpd"))
{ {

View File

@ -5,8 +5,8 @@
*/ */
#include <ixwebsocket/IXWebSocketServer.h> #include <ixwebsocket/IXWebSocketServer.h>
#include <sstream>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream>
namespace ix namespace ix
@ -38,7 +38,8 @@ namespace ix
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
spdlog::info("Closed connection: code {} reason {}", spdlog::info("Closed connection: code {} reason {}",
msg->closeInfo.code, msg->closeInfo.reason); msg->closeInfo.code,
msg->closeInfo.reason);
} }
else if (msg->type == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {

View File

@ -14,8 +14,8 @@
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <queue> #include <queue>
#include <sstream>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream>
// for convenience // for convenience
using json = nlohmann::json; using json = nlohmann::json;

View File

@ -9,6 +9,7 @@
#include <condition_variable> #include <condition_variable>
#include <ixcobra/IXCobraConnection.h> #include <ixcobra/IXCobraConnection.h>
#include <ixsentry/IXSentryClient.h> #include <ixsentry/IXSentryClient.h>
#include <map>
#include <mutex> #include <mutex>
#include <queue> #include <queue>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
@ -18,6 +19,81 @@
namespace ix namespace ix
{ {
class QueueManager
{
public:
QueueManager(size_t maxQueueSize, std::atomic<bool>& stop)
: _maxQueueSize(maxQueueSize)
, _stop(stop)
{
}
Json::Value pop();
void add(Json::Value msg);
private:
std::map<std::string, std::queue<Json::Value>> _queues;
std::mutex _mutex;
std::condition_variable _condition;
size_t _maxQueueSize;
std::atomic<bool>& _stop;
};
Json::Value QueueManager::pop()
{
std::unique_lock<std::mutex> lock(_mutex);
if (_queues.empty())
{
Json::Value val;
return val;
}
std::vector<std::string> games;
for (auto it : _queues)
{
games.push_back(it.first);
}
std::random_shuffle(games.begin(), games.end());
std::string game = games[0];
spdlog::info("Sending event for game '{}'", game);
_condition.wait(lock, [this] { return !_stop; });
if (_queues[game].empty())
{
Json::Value val;
return val;
}
auto msg = _queues[game].front();
_queues[game].pop();
return msg;
}
void QueueManager::add(Json::Value msg)
{
std::unique_lock<std::mutex> lock(_mutex);
std::string game;
if (msg.isMember("device") && msg["device"].isMember("game"))
{
game = msg["device"]["game"].asString();
}
if (game.empty()) return;
// if the sending is not fast enough there is no point
// in queuing too many events.
if (_queues[game].size() < _maxQueueSize)
{
_queues[game].push(msg);
_condition.notify_one();
}
}
int ws_cobra_to_sentry_main(const std::string& appkey, int ws_cobra_to_sentry_main(const std::string& appkey,
const std::string& endpoint, const std::string& endpoint,
const std::string& rolename, const std::string& rolename,
@ -47,9 +123,7 @@ namespace ix
std::atomic<bool> stop(false); std::atomic<bool> stop(false);
std::atomic<bool> throttled(false); std::atomic<bool> throttled(false);
std::condition_variable condition; QueueManager queueManager(maxQueueSize, stop);
std::mutex conditionVariableMutex;
std::queue<Json::Value> queue;
auto timer = [&sentCount, &receivedCount] { auto timer = [&sentCount, &receivedCount] {
while (true) while (true)
@ -63,102 +137,90 @@ namespace ix
std::thread t1(timer); std::thread t1(timer);
auto sentrySender = [&condition, auto sentrySender =
&conditionVariableMutex, [&queueManager, verbose, &errorSending, &sentCount, &stop, &throttled, &dsn] {
&queue, SentryClient sentryClient(dsn);
verbose,
&errorSending,
&sentCount,
&stop,
&throttled,
&dsn] {
SentryClient sentryClient(dsn);
while (true)
{
Json::Value msg;
while (true)
{ {
std::unique_lock<std::mutex> lock(conditionVariableMutex); Json::Value msg = queueManager.pop();
condition.wait(lock, [&queue, &stop] { return !queue.empty() && !stop; });
msg = queue.front(); if (msg.isNull()) continue;
queue.pop(); if (stop) return;
}
auto ret = sentryClient.send(msg, verbose); auto ret = sentryClient.send(msg, verbose);
HttpResponsePtr response = ret.first; HttpResponsePtr response = ret.first;
if (!response) if (!response)
{
spdlog::warn("Null HTTP Response");
continue;
}
if (verbose)
{
for (auto it : response->headers)
{ {
spdlog::info("{}: {}", it.first, it.second); spdlog::warn("Null HTTP Response");
continue;
} }
spdlog::info("Upload size: {}", response->uploadSize); if (verbose)
spdlog::info("Download size: {}", response->downloadSize);
spdlog::info("Status: {}", response->statusCode);
if (response->errorCode != HttpErrorCode::Ok)
{ {
spdlog::info("error message: {}", response->errorMsg); for (auto it : response->headers)
}
if (response->headers["Content-Type"] != "application/octet-stream")
{
spdlog::info("payload: {}", response->payload);
}
}
if (response->statusCode != 200)
{
spdlog::error("Error sending data to sentry: {}", response->statusCode);
spdlog::error("Body: {}", ret.second);
spdlog::error("Response: {}", response->payload);
errorSending = true;
// Error 429 Too Many Requests
if (response->statusCode == 429)
{
auto retryAfter = response->headers["Retry-After"];
std::stringstream ss;
ss << retryAfter;
int seconds;
ss >> seconds;
if (!ss.eof() || ss.fail())
{ {
seconds = 30; spdlog::info("{}: {}", it.first, it.second);
spdlog::warn("Error parsing Retry-After header. "
"Using {} for the sleep duration",
seconds);
} }
spdlog::warn("Error 429 - Too Many Requests. ws will sleep " spdlog::info("Upload size: {}", response->uploadSize);
"and retry after {} seconds", spdlog::info("Download size: {}", response->downloadSize);
retryAfter);
throttled = true; spdlog::info("Status: {}", response->statusCode);
auto duration = std::chrono::seconds(seconds); if (response->errorCode != HttpErrorCode::Ok)
std::this_thread::sleep_for(duration); {
throttled = false; spdlog::info("error message: {}", response->errorMsg);
}
if (response->headers["Content-Type"] != "application/octet-stream")
{
spdlog::info("payload: {}", response->payload);
}
} }
}
else
{
++sentCount;
}
if (stop) return; if (response->statusCode != 200)
} {
}; spdlog::error("Error sending data to sentry: {}", response->statusCode);
spdlog::error("Body: {}", ret.second);
spdlog::error("Response: {}", response->payload);
errorSending = true;
// Error 429 Too Many Requests
if (response->statusCode == 429)
{
auto retryAfter = response->headers["Retry-After"];
std::stringstream ss;
ss << retryAfter;
int seconds;
ss >> seconds;
if (!ss.eof() || ss.fail())
{
seconds = 30;
spdlog::warn("Error parsing Retry-After header. "
"Using {} for the sleep duration",
seconds);
}
spdlog::warn("Error 429 - Too Many Requests. ws will sleep "
"and retry after {} seconds",
retryAfter);
throttled = true;
auto duration = std::chrono::seconds(seconds);
std::this_thread::sleep_for(duration);
throttled = false;
}
}
else
{
++sentCount;
}
if (stop) return;
}
};
// Create a thread pool // Create a thread pool
spdlog::info("Starting {} sentry sender jobs", jobs); spdlog::info("Starting {} sentry sender jobs", jobs);
@ -175,14 +237,11 @@ namespace ix
verbose, verbose,
&throttled, &throttled,
&receivedCount, &receivedCount,
&condition, &queueManager](ix::CobraConnectionEventType eventType,
&conditionVariableMutex, const std::string& errMsg,
&maxQueueSize, const ix::WebSocketHttpHeaders& headers,
&queue](ix::CobraConnectionEventType eventType, const std::string& subscriptionId,
const std::string& errMsg, CobraConnection::MsgId msgId) {
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId) {
if (eventType == ix::CobraConnection_EventType_Open) if (eventType == ix::CobraConnection_EventType_Open)
{ {
spdlog::info("Subscriber connected"); spdlog::info("Subscriber connected");
@ -201,14 +260,8 @@ namespace ix
spdlog::info("Subscriber authenticated"); spdlog::info("Subscriber authenticated");
conn.subscribe(channel, conn.subscribe(channel,
filter, filter,
[&jsonWriter, [&jsonWriter, verbose, &throttled, &receivedCount, &queueManager](
verbose, const Json::Value& msg) {
&throttled,
&receivedCount,
&condition,
&conditionVariableMutex,
&maxQueueSize,
&queue](const Json::Value& msg) {
if (verbose) if (verbose)
{ {
spdlog::info(jsonWriter.write(msg)); spdlog::info(jsonWriter.write(msg));
@ -217,23 +270,11 @@ namespace ix
// If we cannot send to sentry fast enough, drop the message // If we cannot send to sentry fast enough, drop the message
if (throttled) if (throttled)
{ {
condition.notify_one();
return; return;
} }
++receivedCount; ++receivedCount;
queueManager.add(msg);
{
std::unique_lock<std::mutex> lock(conditionVariableMutex);
// if the sending is not fast enough there is no point
// in queuing too many events.
if (queue.size() < maxQueueSize)
{
queue.push(msg);
}
}
condition.notify_one();
}); });
} }
else if (eventType == ix::CobraConnection_EventType_Subscribed) else if (eventType == ix::CobraConnection_EventType_Subscribed)

View File

@ -6,6 +6,7 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <condition_variable>
#include <ixcobra/IXCobraConnection.h> #include <ixcobra/IXCobraConnection.h>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
@ -16,6 +17,59 @@
#include <statsd_client.h> #include <statsd_client.h>
#endif #endif
namespace
{
class QueueManager
{
public:
QueueManager(size_t maxQueueSize, std::atomic<bool>& stop)
: _maxQueueSize(maxQueueSize)
, _stop(stop)
{
}
Json::Value pop();
void add(Json::Value msg);
private:
std::queue<Json::Value> _queue;
std::mutex _mutex;
std::condition_variable _condition;
size_t _maxQueueSize;
std::atomic<bool>& _stop;
};
Json::Value QueueManager::pop()
{
std::unique_lock<std::mutex> lock(_mutex);
if (_queue.empty())
{
Json::Value val;
return val;
}
_condition.wait(lock, [this] { return !_stop; });
auto msg = _queue.front();
_queue.pop();
return msg;
}
void QueueManager::add(Json::Value msg)
{
std::unique_lock<std::mutex> lock(_mutex);
// if the sending is not fast enough there is no point
// in queuing too many events.
if (_queue.size() < _maxQueueSize)
{
_queue.push(msg);
_condition.notify_one();
}
}
} // namespace
namespace ix namespace ix
{ {
// fields are command line argument that can be specified multiple times // fields are command line argument that can be specified multiple times
@ -79,91 +133,118 @@ namespace ix
auto tokens = parseFields(fields); auto tokens = parseFields(fields);
// statsd client
// test with netcat as a server: `nc -ul 8125`
bool statsdBatch = true;
#ifndef _WIN32
statsd::StatsdClient statsdClient(host, port, prefix, statsdBatch);
#else
int statsdClient;
#endif
Json::FastWriter jsonWriter; Json::FastWriter jsonWriter;
uint64_t msgCount = 0; std::atomic<uint64_t> sentCount(0);
std::atomic<uint64_t> receivedCount(0);
std::atomic<bool> stop(false);
conn.setEventCallback([&conn, size_t maxQueueSize = 1000;
&channel, QueueManager queueManager(maxQueueSize, stop);
&filter,
&jsonWriter, auto timer = [&sentCount, &receivedCount] {
&statsdClient, while (true)
verbose,
&tokens,
&prefix,
&msgCount](ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId) {
if (eventType == ix::CobraConnection_EventType_Open)
{ {
spdlog::info("Subscriber connected"); spdlog::info("messages received {} sent {}", receivedCount, sentCount);
for (auto it : headers) auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
};
std::thread t1(timer);
auto statsdSender = [&queueManager, &host, &port, &sentCount, &tokens, &prefix, &stop] {
// statsd client
// test with netcat as a server: `nc -ul 8125`
bool statsdBatch = true;
#ifndef _WIN32
statsd::StatsdClient statsdClient(host, port, prefix, statsdBatch);
#else
int statsdClient;
#endif
while (true)
{
Json::Value msg = queueManager.pop();
if (msg.isNull()) continue;
if (stop) return;
std::string id;
for (auto&& attr : tokens)
{ {
spdlog::info("{}: {}", it.first, it.second); id += ".";
id += extractAttr(attr, msg);
} }
}
if (eventType == ix::CobraConnection_EventType_Closed)
{
spdlog::info("Subscriber closed");
}
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
spdlog::info("Subscriber authenticated");
conn.subscribe(channel,
filter,
[&jsonWriter, &statsdClient, verbose, &tokens, &prefix, &msgCount](
const Json::Value& msg) {
if (verbose)
{
spdlog::info(jsonWriter.write(msg));
}
std::string id; sentCount += 1;
for (auto&& attr : tokens)
{
id += ".";
id += extractAttr(attr, msg);
}
spdlog::info("{} {}{}", msgCount++, prefix, id);
#ifndef _WIN32 #ifndef _WIN32
statsdClient.count(id, 1); statsdClient.count(id, 1);
#endif #endif
});
} }
else if (eventType == ix::CobraConnection_EventType_Subscribed) };
{
spdlog::info("Subscriber: subscribed to channel {}", subscriptionId); std::thread t2(statsdSender);
}
else if (eventType == ix::CobraConnection_EventType_UnSubscribed) conn.setEventCallback(
{ [&conn, &channel, &filter, &jsonWriter, verbose, &queueManager, &receivedCount](
spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId); ix::CobraConnectionEventType eventType,
} const std::string& errMsg,
else if (eventType == ix::CobraConnection_EventType_Error) const ix::WebSocketHttpHeaders& headers,
{ const std::string& subscriptionId,
spdlog::error("Subscriber: error {}", errMsg); CobraConnection::MsgId msgId) {
} if (eventType == ix::CobraConnection_EventType_Open)
else if (eventType == ix::CobraConnection_EventType_Published) {
{ spdlog::info("Subscriber connected");
spdlog::error("Published message hacked: {}", msgId);
} for (auto it : headers)
else if (eventType == ix::CobraConnection_EventType_Pong) {
{ spdlog::info("{}: {}", it.first, it.second);
spdlog::info("Received websocket pong"); }
} }
}); if (eventType == ix::CobraConnection_EventType_Closed)
{
spdlog::info("Subscriber closed");
}
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
spdlog::info("Subscriber authenticated");
conn.subscribe(channel,
filter,
[&jsonWriter, &queueManager, verbose, &receivedCount](
const Json::Value& msg) {
if (verbose)
{
spdlog::info(jsonWriter.write(msg));
}
receivedCount++;
++receivedCount;
queueManager.add(msg);
});
}
else if (eventType == ix::CobraConnection_EventType_Subscribed)
{
spdlog::info("Subscriber: subscribed to channel {}", subscriptionId);
}
else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{
spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId);
}
else if (eventType == ix::CobraConnection_EventType_Error)
{
spdlog::error("Subscriber: error {}", errMsg);
}
else if (eventType == ix::CobraConnection_EventType_Published)
{
spdlog::error("Published message hacked: {}", msgId);
}
else if (eventType == ix::CobraConnection_EventType_Pong)
{
spdlog::info("Received websocket pong");
}
});
while (true) while (true)
{ {

View File

@ -4,12 +4,12 @@
* Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved. * Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved.
*/ */
#include <iostream>
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXSocketTLSOptions.h> #include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
#include <iostream>
namespace ix namespace ix
{ {

View File

@ -61,7 +61,7 @@ namespace ix
// Server connection // Server connection
state->webSocket().setOnMessageCallback([webSocket, state, verbose]( state->webSocket().setOnMessageCallback([webSocket, state, verbose](
const WebSocketMessagePtr& msg) { const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
spdlog::info("New connection to remote server"); spdlog::info("New connection to remote server");
@ -101,61 +101,61 @@ namespace ix
}); });
// Client connection // Client connection
webSocket->setOnMessageCallback([state, remoteUrl, verbose]( webSocket->setOnMessageCallback(
const WebSocketMessagePtr& msg) { [state, remoteUrl, verbose](const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("New connection from client");
spdlog::info("id: {}", state->getId());
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{ {
spdlog::info("{}: {}", it.first, it.second); spdlog::info("New connection from client");
spdlog::info("id: {}", state->getId());
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
// Connect to the 'real' server
std::string url(remoteUrl);
url += msg->openInfo.uri;
state->webSocket().setUrl(url);
state->webSocket().disableAutomaticReconnection();
state->webSocket().start();
// we should sleep here for a bit until we've established the
// connection with the remote server
while (state->webSocket().getReadyState() != ReadyState::Open)
{
spdlog::info("waiting for server connection establishment");
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
spdlog::info("server connection established");
} }
else if (msg->type == ix::WebSocketMessageType::Close)
// Connect to the 'real' server
std::string url(remoteUrl);
url += msg->openInfo.uri;
state->webSocket().setUrl(url);
state->webSocket().disableAutomaticReconnection();
state->webSocket().start();
// we should sleep here for a bit until we've established the
// connection with the remote server
while (state->webSocket().getReadyState() != ReadyState::Open)
{ {
spdlog::info("waiting for server connection establishment"); spdlog::info("Closed client connection: client id {} code {} reason {}",
std::this_thread::sleep_for(std::chrono::milliseconds(10)); state->getId(),
msg->closeInfo.code,
msg->closeInfo.reason);
state->webSocket().close(msg->closeInfo.code, msg->closeInfo.reason);
} }
spdlog::info("server connection established"); else if (msg->type == ix::WebSocketMessageType::Error)
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
spdlog::info("Closed client connection: client id {} code {} reason {}",
state->getId(),
msg->closeInfo.code,
msg->closeInfo.reason);
state->webSocket().close(msg->closeInfo.code, msg->closeInfo.reason);
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
spdlog::error("Connection error: {}", msg->errorInfo.reason);
spdlog::error("#retries: {}", msg->errorInfo.retries);
spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time);
spdlog::error("HTTP Status: {}", msg->errorInfo.http_status);
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
spdlog::info("Received {} bytes from client", msg->wireSize);
if (verbose)
{ {
spdlog::info("payload {}", msg->str); spdlog::error("Connection error: {}", msg->errorInfo.reason);
spdlog::error("#retries: {}", msg->errorInfo.retries);
spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time);
spdlog::error("HTTP Status: {}", msg->errorInfo.http_status);
} }
else if (msg->type == ix::WebSocketMessageType::Message)
{
spdlog::info("Received {} bytes from client", msg->wireSize);
if (verbose)
{
spdlog::info("payload {}", msg->str);
}
state->webSocket().send(msg->str, msg->binary); state->webSocket().send(msg->str, msg->binary);
} }
}); });
}); });
auto res = server.listen(); auto res = server.listen();

View File

@ -14,8 +14,8 @@
#include <ixwebsocket/IXSocketTLSOptions.h> #include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <msgpack11/msgpack11.hpp> #include <msgpack11/msgpack11.hpp>
#include <spdlog/spdlog.h>
#include <mutex> #include <mutex>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
#include <vector> #include <vector>

View File

@ -14,8 +14,8 @@
#include <ixwebsocket/IXSocketTLSOptions.h> #include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <msgpack11/msgpack11.hpp> #include <msgpack11/msgpack11.hpp>
#include <spdlog/spdlog.h>
#include <mutex> #include <mutex>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
#include <vector> #include <vector>

View File

@ -70,19 +70,23 @@ namespace ix
if (readyState == ReadyState::Open) if (readyState == ReadyState::Open)
{ {
++receivers; ++receivers;
client->send(msg->str, client->send(
msg->binary, msg->str, msg->binary, [&id](int current, int total) -> bool {
[&id](int current, int total) -> bool { spdlog::info("{}: [client {}]: Step {} out of {}",
spdlog::info("{}: [client {}]: Step {} out of {}", "ws_transfer",
"ws_transfer", id, current, total); id,
return true; current,
}); total);
return true;
});
do do
{ {
size_t bufferedAmount = client->bufferedAmount(); size_t bufferedAmount = client->bufferedAmount();
spdlog::info("{}: [client {}]: {} bytes left to send", spdlog::info("{}: [client {}]: {} bytes left to send",
"ws_transfer", id, bufferedAmount); "ws_transfer",
id,
bufferedAmount);
std::this_thread::sleep_for(std::chrono::milliseconds(500)); std::this_thread::sleep_for(std::chrono::milliseconds(500));
@ -97,8 +101,12 @@ namespace ix
: readyState == ReadyState::Closing ? "Closing" : "Closed"; : readyState == ReadyState::Closing ? "Closing" : "Closed";
size_t bufferedAmount = client->bufferedAmount(); size_t bufferedAmount = client->bufferedAmount();
spdlog::info("{}: [client {}]: has readystate {} bytes left to be sent", spdlog::info(
"ws_transfer", id, readyStateString, bufferedAmount); "{}: [client {}]: has readystate {} bytes left to be sent",
"ws_transfer",
id,
readyStateString,
bufferedAmount);
} }
} }
} }