Compare commits

...

23 Commits

Author SHA1 Message Date
40bf7b00ec tag version 2019-12-30 15:19:13 -08:00
72f8e76369 Update IXSocketMbedTLS.cpp (#138)
fix bug just introduced.

mbedstl_pk_setup() gets automatically called later.
2019-12-30 15:14:50 -08:00
0389b0b1a3 [2nd try] Update IXSocketMbedTLS.cpp (#137)
* Update IXSocketMbedTLS.cpp

fix initialization of mbedtls context.
without this, crashes under certain conditions.

* Update IXSocketMbedTLS.cpp

removed newline on 46
2019-12-30 14:38:25 -08:00
ac0c218455 clang-format 2019-12-30 08:46:18 -08:00
299dc0452e (ws cobra to sentry/statsd) fix for handling null events properly for empty queues + use queue to send data to statsd 2019-12-28 17:28:05 -08:00
f4af84dc06 (ws cobra to sentry) handle null events for empty queues 2019-12-28 10:16:18 -08:00
6522bc06ba (ws cobra to sentry) game is picked in a fair manner, so that all games get the same share of sent events 2019-12-27 19:10:15 -08:00
50bea7dffa (ws cobra to sentry) refactor queue related code into a class 2019-12-27 18:24:45 -08:00
c4e9abfe80 (ws cobra to sentry) bound the queue size used to hold up cobra messages before they are sent to sentry. Default queue size is a 100 messages. Without such limit the program runs out of memory when a subscriber receive a lot of messages that cannot make it to sentry 2019-12-25 22:15:57 -08:00
a805270d02 (ws client) use correct compilation defines so that spdlog is not used as a header only library (reduce binary size and increase compilation speed) 2019-12-25 09:03:57 -08:00
e13b57c73b (ws client) all commands use spdlog instead of std::cerr or std::cout for logging 2019-12-24 21:55:34 -08:00
5be84926ef (cobra client) send a websocket ping every 30s to keep the connection opened 2019-12-24 17:16:41 -08:00
33e7271b85 (tls / apple) minor refactoring, move functions out of the anonymous namespace to become static member functions 2019-12-23 16:30:38 -08:00
d72e5e70f6 socket tls options: display ciphers 2019-12-23 12:25:25 -08:00
e2c5f751bd (doc) fix typo 2019-12-22 20:33:14 -08:00
351b86e266 v7.6.4 2019-12-22 20:32:10 -08:00
d0cbff4f4e (client) error handling, quote url in error case when failing to parse on 2019-12-22 20:30:29 -08:00
cbfc9b9f94 (ws) ws_cobra_publish: register callbacks before connecting 2019-12-22 20:29:37 -08:00
ca816d801f (doc) mention mbedtls in supported ssl server backend 2019-12-22 20:28:44 -08:00
2f354d31eb update gitignore file 2019-12-20 15:21:36 -08:00
2c6c1edd37 (tls) add a simple description of the TLS configuration routine for debugging 2019-12-20 15:18:04 -08:00
9799e7e84b (mbedtls) correct support for using own certificate and private key 2019-12-20 15:13:26 -08:00
81be970679 (ws commands) in websocket proxy, disable automatic reconnections + in Dockerfile, use alpine 3.11 2019-12-20 09:51:21 -08:00
44 changed files with 773 additions and 508 deletions

4
.gitignore vendored
View File

@ -1,3 +1,7 @@
build build
*.pyc *.pyc
venv venv
ixsnake/ixsnake/.certs/
site/
ws/.certs/
ws/.srl

View File

@ -1,4 +1,4 @@
FROM alpine:edge as build FROM alpine:3.11 as build
RUN apk add --no-cache gcc g++ musl-dev linux-headers cmake openssl-dev RUN apk add --no-cache gcc g++ musl-dev linux-headers cmake openssl-dev
RUN apk add --no-cache make RUN apk add --no-cache make
@ -16,7 +16,7 @@ WORKDIR /opt
USER app USER app
RUN [ "make", "ws_install" ] RUN [ "make", "ws_install" ]
FROM alpine:edge as runtime FROM alpine:3.11 as runtime
RUN apk add --no-cache libstdc++ RUN apk add --no-cache libstdc++
RUN apk add --no-cache strace RUN apk add --no-cache strace

View File

@ -1,5 +1,59 @@
# Changelog # Changelog
All notable changes to this project will be documented in this file. All changes to this project will be documented in this file.
## [7.8.7] - 2019-12-28
(mbedtls) fix related to private key file parsing and initialization
## [7.8.6] - 2019-12-28
(ws cobra to sentry/statsd) fix for handling null events properly for empty queues + use queue to send data to statsd
## [7.8.5] - 2019-12-28
(ws cobra to sentry) handle null events for empty queues
## [7.8.4] - 2019-12-27
(ws cobra to sentry) game is picked in a fair manner, so that all games get the same share of sent events
## [7.8.3] - 2019-12-27
(ws cobra to sentry) refactor queue related code into a class
## [7.8.2] - 2019-12-25
(ws cobra to sentry) bound the queue size used to hold up cobra messages before they are sent to sentry. Default queue size is a 100 messages. Without such limit the program runs out of memory when a subscriber receive a lot of messages that cannot make it to sentry
## [7.8.1] - 2019-12-25
(ws client) use correct compilation defines so that spdlog is not used as a header only library (reduce binary size and increase compilation speed)
## [7.8.0] - 2019-12-24
(ws client) all commands use spdlog instead of std::cerr or std::cout for logging
## [7.6.5] - 2019-12-24
(cobra client) send a websocket ping every 30s to keep the connection opened
## [7.6.4] - 2019-12-22
(client) error handling, quote url in error case when failing to parse one
(ws) ws_cobra_publish: register callbacks before connecting
(doc) mention mbedtls in supported ssl server backend
## [7.6.3] - 2019-12-20
(tls) add a simple description of the TLS configuration routine for debugging
## [7.6.2] - 2019-12-20
(mbedtls) correct support for using own certificate and private key
## [7.6.1] - 2019-12-20
(ws commands) in websocket proxy, disable automatic reconnections + in Dockerfile, use alpine 3.11
## [7.6.0] - 2019-12-19 ## [7.6.0] - 2019-12-19

View File

@ -247,7 +247,7 @@ Options:
[cobra](https://github.com/machinezone/cobra) is a real time messenging server. ws has several sub-command to interact with cobra. There is also a minimal cobra compatible server named snake available. [cobra](https://github.com/machinezone/cobra) is a real time messenging server. ws has several sub-command to interact with cobra. There is also a minimal cobra compatible server named snake available.
Below are examples on running a snake server and clients with TLS enabled (the server only works with the OpenSSL backend for now). Below are examples on running a snake server and clients with TLS enabled (the server only works with the OpenSSL and the Mbed TLS backend for now).
First, generate certificates. First, generate certificates.
@ -366,4 +366,4 @@ $ ws cobra_publish --endpoint wss://127.0.0.1:8765 --appkey FC2F10139A2BAc53BB72
[2019-12-19 20:46:42.659] [info] Published message id 3 acked [2019-12-19 20:46:42.659] [info] Published message id 3 acked
``` ```
To use OpenSSL on macOS, compile with `make ws_openssl`. First you will have to install OpenSSL libraries, which can be done with Homebrew. To use OpenSSL on macOS, compile with `make ws_openssl`. First you will have to install OpenSSL libraries, which can be done with Homebrew. Use `make ws_mbedtls` accordingly to use MbedTLS.

View File

@ -24,6 +24,7 @@ namespace ix
PublishTrackerCallback CobraConnection::_publishTrackerCallback = nullptr; PublishTrackerCallback CobraConnection::_publishTrackerCallback = nullptr;
constexpr size_t CobraConnection::kQueueMaxSize; constexpr size_t CobraConnection::kQueueMaxSize;
constexpr CobraConnection::MsgId CobraConnection::kInvalidMsgId; constexpr CobraConnection::MsgId CobraConnection::kInvalidMsgId;
constexpr int CobraConnection::kPingIntervalSecs;
CobraConnection::CobraConnection() : CobraConnection::CobraConnection() :
_webSocket(new WebSocket()), _webSocket(new WebSocket()),
@ -228,6 +229,10 @@ namespace ix
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
invokeErrorCallback(ss.str(), std::string()); invokeErrorCallback(ss.str(), std::string());
} }
else if (msg->type == ix::WebSocketMessageType::Pong)
{
invokeEventCallback(ix::CobraConnection_EventType_Pong);
}
}); });
} }
@ -260,6 +265,7 @@ namespace ix
_webSocket->setUrl(url); _webSocket->setUrl(url);
_webSocket->setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions); _webSocket->setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions);
_webSocket->setTLSOptions(socketTLSOptions); _webSocket->setTLSOptions(socketTLSOptions);
_webSocket->setPingInterval(kPingIntervalSecs);
} }
// //

View File

@ -30,7 +30,8 @@ namespace ix
CobraConnection_EventType_Closed = 3, CobraConnection_EventType_Closed = 3,
CobraConnection_EventType_Subscribed = 4, CobraConnection_EventType_Subscribed = 4,
CobraConnection_EventType_UnSubscribed = 5, CobraConnection_EventType_UnSubscribed = 5,
CobraConnection_EventType_Published = 6 CobraConnection_EventType_Published = 6,
CobraConnection_EventType_Pong = 7
}; };
enum CobraConnectionPublishMode enum CobraConnectionPublishMode
@ -215,6 +216,9 @@ namespace ix
// Each pdu sent should have an incremental unique id // Each pdu sent should have an incremental unique id
std::atomic<uint64_t> _id; std::atomic<uint64_t> _id;
// Frequency at which we send a websocket ping to the backing cobra connection
static constexpr int kPingIntervalSecs = 30;
}; };
} // namespace ix } // namespace ix

View File

@ -65,6 +65,10 @@ namespace ix
{ {
ss << "Published message " << msgId << " acked"; ss << "Published message " << msgId << " acked";
} }
else if (eventType == ix::CobraConnection_EventType_Pong)
{
ss << "Received websocket pong";
}
ix::IXCoreLogger::Log(ss.str().c_str()); ix::IXCoreLogger::Log(ss.str().c_str());
}); });
@ -98,6 +102,8 @@ namespace ix
{ {
_channel = channel; _channel = channel;
ix::IXCoreLogger::Log(socketTLSOptions.getDescription().c_str());
ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(enablePerMessageDeflate); ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(enablePerMessageDeflate);
_cobra_connection.configure(appkey, endpoint, _cobra_connection.configure(appkey, endpoint,
rolename, rolesecret, rolename, rolesecret,

View File

@ -10,7 +10,6 @@
#include "IXSocketConnect.h" #include "IXSocketConnect.h"
#include "IXUserAgent.h" #include "IXUserAgent.h"
#include <fstream> #include <fstream>
#include <iostream>
#include <sstream> #include <sstream>
#include <vector> #include <vector>

View File

@ -24,9 +24,47 @@
#include <Security/SecureTransport.h> #include <Security/SecureTransport.h>
namespace namespace ix
{ {
OSStatus read_from_socket(SSLConnectionRef connection, void* data, size_t* len) SocketAppleSSL::SocketAppleSSL(const SocketTLSOptions& tlsOptions, int fd)
: Socket(fd)
, _sslContext(nullptr)
, _tlsOptions(tlsOptions)
{
;
}
SocketAppleSSL::~SocketAppleSSL()
{
SocketAppleSSL::close();
}
std::string SocketAppleSSL::getSSLErrorDescription(OSStatus status)
{
std::string errMsg("Unknown SSL error.");
CFErrorRef error = CFErrorCreate(kCFAllocatorDefault, kCFErrorDomainOSStatus, status, NULL);
if (error)
{
CFStringRef message = CFErrorCopyDescription(error);
if (message)
{
char localBuffer[128];
Boolean success;
success = CFStringGetCString(message, localBuffer, 128, kCFStringEncodingUTF8);
if (success)
{
errMsg = localBuffer;
}
CFRelease(message);
}
CFRelease(error);
}
return errMsg;
}
OSStatus SocketAppleSSL::readFromSocket(SSLConnectionRef connection, void* data, size_t* len)
{ {
int fd = (int) (long) connection; int fd = (int) (long) connection;
if (fd < 0) return errSSLInternal; if (fd < 0) return errSSLInternal;
@ -67,7 +105,9 @@ namespace
} }
} }
OSStatus write_to_socket(SSLConnectionRef connection, const void* data, size_t* len) OSStatus SocketAppleSSL::writeToSocket(SSLConnectionRef connection,
const void* data,
size_t* len)
{ {
int fd = (int) (long) connection; int fd = (int) (long) connection;
if (fd < 0) return errSSLInternal; if (fd < 0) return errSSLInternal;
@ -105,47 +145,6 @@ namespace
} }
} }
std::string getSSLErrorDescription(OSStatus status)
{
std::string errMsg("Unknown SSL error.");
CFErrorRef error = CFErrorCreate(kCFAllocatorDefault, kCFErrorDomainOSStatus, status, NULL);
if (error)
{
CFStringRef message = CFErrorCopyDescription(error);
if (message)
{
char localBuffer[128];
Boolean success;
success = CFStringGetCString(message, localBuffer, 128, kCFStringEncodingUTF8);
if (success)
{
errMsg = localBuffer;
}
CFRelease(message);
}
CFRelease(error);
}
return errMsg;
}
} // anonymous namespace
namespace ix
{
SocketAppleSSL::SocketAppleSSL(const SocketTLSOptions& tlsOptions, int fd)
: Socket(fd)
, _sslContext(nullptr)
, _tlsOptions(tlsOptions)
{
;
}
SocketAppleSSL::~SocketAppleSSL()
{
SocketAppleSSL::close();
}
bool SocketAppleSSL::accept(std::string& errMsg) bool SocketAppleSSL::accept(std::string& errMsg)
{ {
@ -168,7 +167,8 @@ namespace ix
_sslContext = SSLCreateContext(kCFAllocatorDefault, kSSLClientSide, kSSLStreamType); _sslContext = SSLCreateContext(kCFAllocatorDefault, kSSLClientSide, kSSLStreamType);
SSLSetIOFuncs(_sslContext, read_from_socket, write_to_socket); SSLSetIOFuncs(
_sslContext, SocketAppleSSL::readFromSocket, SocketAppleSSL::writeToSocket);
SSLSetConnection(_sslContext, (SSLConnectionRef)(long) _sockfd); SSLSetConnection(_sslContext, (SSLConnectionRef)(long) _sockfd);
SSLSetProtocolVersionMin(_sslContext, kTLSProtocol12); SSLSetProtocolVersionMin(_sslContext, kTLSProtocol12);
SSLSetPeerDomainName(_sslContext, host.c_str(), host.size()); SSLSetPeerDomainName(_sslContext, host.c_str(), host.size());

View File

@ -34,6 +34,10 @@ namespace ix
virtual ssize_t recv(void* buffer, size_t length) final; virtual ssize_t recv(void* buffer, size_t length) final;
private: private:
static std::string getSSLErrorDescription(OSStatus status);
static OSStatus writeToSocket(SSLConnectionRef connection, const void* data, size_t* len);
static OSStatus readFromSocket(SSLConnectionRef connection, void* data, size_t* len);
SSLContextRef _sslContext; SSLContextRef _sslContext;
mutable std::mutex _mutex; // AppleSSL routines are not thread-safe mutable std::mutex _mutex; // AppleSSL routines are not thread-safe

View File

@ -39,6 +39,7 @@ namespace ix
mbedtls_entropy_init(&_entropy); mbedtls_entropy_init(&_entropy);
mbedtls_x509_crt_init(&_cacert); mbedtls_x509_crt_init(&_cacert);
mbedtls_x509_crt_init(&_cert); mbedtls_x509_crt_init(&_cert);
mbedtls_pk_init(&_pkey);
} }
bool SocketMbedTLS::init(const std::string& host, bool isClient, std::string& errMsg) bool SocketMbedTLS::init(const std::string& host, bool isClient, std::string& errMsg)
@ -71,11 +72,16 @@ namespace ix
if (_tlsOptions.hasCertAndKey()) if (_tlsOptions.hasCertAndKey())
{ {
if (mbedtls_x509_crt_parse_file(&_cacert, _tlsOptions.certFile.c_str()) < 0) if (mbedtls_x509_crt_parse_file(&_cert, _tlsOptions.certFile.c_str()) < 0)
{ {
errMsg = "Cannot parse cert file '" + _tlsOptions.certFile + "'"; errMsg = "Cannot parse cert file '" + _tlsOptions.certFile + "'";
return false; return false;
} }
if (mbedtls_pk_parse_keyfile(&_pkey, _tlsOptions.keyFile.c_str(), "") < 0)
{
errMsg = "Cannot parse key file '" + _tlsOptions.keyFile + "'";
return false;
}
} }
if (_tlsOptions.isPeerVerifyDisabled()) if (_tlsOptions.isPeerVerifyDisabled())
@ -84,7 +90,7 @@ namespace ix
} }
else else
{ {
mbedtls_ssl_conf_ca_chain(&_conf, &_cacert, NULL); mbedtls_ssl_conf_authmode(&_conf, MBEDTLS_SSL_VERIFY_REQUIRED);
// FIXME: should we call mbedtls_ssl_conf_verify ? // FIXME: should we call mbedtls_ssl_conf_verify ?
@ -97,7 +103,13 @@ namespace ix
errMsg = "Cannot parse CA file '" + _tlsOptions.caFile + "'"; errMsg = "Cannot parse CA file '" + _tlsOptions.caFile + "'";
return false; return false;
} }
mbedtls_ssl_conf_authmode(&_conf, MBEDTLS_SSL_VERIFY_REQUIRED);
mbedtls_ssl_conf_ca_chain(&_conf, &_cacert, NULL);
if (_tlsOptions.hasCertAndKey())
{
mbedtls_ssl_conf_own_cert(&_conf, &_cert, &_pkey);
}
} }
if (mbedtls_ssl_setup(&_ssl, &_conf) != 0) if (mbedtls_ssl_setup(&_ssl, &_conf) != 0)

View File

@ -45,6 +45,7 @@ namespace ix
mbedtls_ctr_drbg_context _ctr_drbg; mbedtls_ctr_drbg_context _ctr_drbg;
mbedtls_x509_crt _cacert; mbedtls_x509_crt _cacert;
mbedtls_x509_crt _cert; mbedtls_x509_crt _cert;
mbedtls_pk_context _pkey;
std::mutex _mutex; std::mutex _mutex;
SocketTLSOptions _tlsOptions; SocketTLSOptions _tlsOptions;

View File

@ -11,8 +11,8 @@
#include "IXSocketConnect.h" #include "IXSocketConnect.h"
#include "IXSocketFactory.h" #include "IXSocketFactory.h"
#include <assert.h> #include <assert.h>
#include <iostream>
#include <sstream> #include <sstream>
#include <stdio.h>
#include <string.h> #include <string.h>
namespace ix namespace ix
@ -45,13 +45,13 @@ namespace ix
void SocketServer::logError(const std::string& str) void SocketServer::logError(const std::string& str)
{ {
std::lock_guard<std::mutex> lock(_logMutex); std::lock_guard<std::mutex> lock(_logMutex);
std::cerr << str << std::endl; fprintf(stderr, "%s\n", str.c_str());
} }
void SocketServer::logInfo(const std::string& str) void SocketServer::logInfo(const std::string& str)
{ {
std::lock_guard<std::mutex> lock(_logMutex); std::lock_guard<std::mutex> lock(_logMutex);
std::cout << str << std::endl; fprintf(stdout, "%s\n", str.c_str());
} }
std::pair<bool, std::string> SocketServer::listen() std::pair<bool, std::string> SocketServer::listen()

View File

@ -8,6 +8,7 @@
#include <assert.h> #include <assert.h>
#include <fstream> #include <fstream>
#include <sstream>
namespace ix namespace ix
{ {
@ -71,4 +72,16 @@ namespace ix
{ {
return _errMsg; return _errMsg;
} }
std::string SocketTLSOptions::getDescription() const
{
std::stringstream ss;
ss << "TLS Options:" << std::endl;
ss << " certFile = " << certFile << std::endl;
ss << " keyFile = " << keyFile << std::endl;
ss << " caFile = " << caFile << std::endl;
ss << " ciphers = " << ciphers << std::endl;
ss << " ciphers = " << ciphers << std::endl;
return ss.str();
}
} // namespace ix } // namespace ix

View File

@ -43,6 +43,8 @@ namespace ix
const std::string& getErrorMsg() const; const std::string& getErrorMsg() const;
std::string getDescription() const;
private: private:
mutable std::string _errMsg; mutable std::string _errMsg;
mutable bool _validated = false; mutable bool _validated = false;

View File

@ -144,7 +144,9 @@ namespace ix
if (!UrlParser::parse(url, protocol, host, path, query, port)) if (!UrlParser::parse(url, protocol, host, path, query, port))
{ {
return WebSocketInitResult(false, 0, std::string("Could not parse URL ") + url); std::stringstream ss;
ss << "Could not parse url: '" << url << "'";
return WebSocketInitResult(false, 0, ss.str());
} }
std::string errorMsg; std::string errorMsg;

View File

@ -6,4 +6,4 @@
#pragma once #pragma once
#define IX_WEBSOCKET_VERSION "7.5.8" #define IX_WEBSOCKET_VERSION "7.8.7"

View File

@ -23,6 +23,8 @@ include_directories(
../ws ../ws
) )
add_definitions(-DSPDLOG_COMPILED_LIB=1)
find_package(JsonCpp) find_package(JsonCpp)
if (NOT JSONCPP_FOUND) if (NOT JSONCPP_FOUND)
include_directories(../third_party/jsoncpp) include_directories(../third_party/jsoncpp)
@ -98,4 +100,6 @@ target_link_libraries(ixwebsocket_unittest ixcrypto)
target_link_libraries(ixwebsocket_unittest ixcore) target_link_libraries(ixwebsocket_unittest ixcore)
target_link_libraries(ixwebsocket_unittest ixsentry) target_link_libraries(ixwebsocket_unittest ixsentry)
target_link_libraries(ixwebsocket_unittest spdlog)
install(TARGETS ixwebsocket_unittest DESTINATION bin) install(TARGETS ixwebsocket_unittest DESTINATION bin)

View File

@ -37,9 +37,7 @@ namespace
class CobraChat class CobraChat
{ {
public: public:
CobraChat(const std::string& user, CobraChat(const std::string& user, const std::string& session, const std::string& endpoint);
const std::string& session,
const std::string& endpoint);
void subscribe(const std::string& channel); void subscribe(const std::string& channel);
void start(); void start();

View File

@ -25,6 +25,8 @@ include_directories(ws ../third_party/statsd-client-cpp/src)
include_directories(ws ../third_party/spdlog/include) include_directories(ws ../third_party/spdlog/include)
include_directories(ws ../third_party/cpp-linenoise) include_directories(ws ../third_party/cpp-linenoise)
add_definitions(-DSPDLOG_COMPILED_LIB=1)
if (UNIX) if (UNIX)
set( STATSD_CLIENT_SOURCES ../third_party/statsd-client-cpp/src/statsd_client.cpp) set( STATSD_CLIENT_SOURCES ../third_party/statsd-client-cpp/src/statsd_client.cpp)
endif() endif()
@ -72,6 +74,8 @@ target_link_libraries(ws ixcrypto)
target_link_libraries(ws ixcore) target_link_libraries(ws ixcore)
target_link_libraries(ws ixsentry) target_link_libraries(ws ixsentry)
target_link_libraries(ws spdlog)
if(NOT APPLE AND NOT USE_MBED_TLS) if(NOT APPLE AND NOT USE_MBED_TLS)
find_package(OpenSSL REQUIRED) find_package(OpenSSL REQUIRED)
add_definitions(${OPENSSL_DEFINITIONS}) add_definitions(${OPENSSL_DEFINITIONS})

View File

@ -11,7 +11,6 @@
#include <cli11/CLI11.hpp> #include <cli11/CLI11.hpp>
#include <fstream> #include <fstream>
#include <iostream>
#include <ixcore/utils/IXCoreLogger.h> #include <ixcore/utils/IXCoreLogger.h>
#include <ixwebsocket/IXNetSystem.h> #include <ixwebsocket/IXNetSystem.h>
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
@ -39,12 +38,13 @@ int main(int argc, char** argv)
// Display command. // Display command.
if (getenv("DEBUG")) if (getenv("DEBUG"))
{ {
std::cout << "Command: "; std::stringstream ss;
ss << "Command: ";
for (int i = 0; i < argc; ++i) for (int i = 0; i < argc; ++i)
{ {
std::cout << argv[i] << " "; ss << argv[i] << " ";
} }
std::cout << std::endl; spdlog::info(ss.str());
} }
CLI::App app {"ws is a websocket tool"}; CLI::App app {"ws is a websocket tool"};
@ -105,6 +105,7 @@ int main(int argc, char** argv)
int count = 1; int count = 1;
int jobs = 4; int jobs = 4;
uint32_t maxWaitBetweenReconnectionRetries; uint32_t maxWaitBetweenReconnectionRetries;
size_t maxQueueSize = 100;
auto addTLSOptions = [&tlsOptions, &verifyNone](CLI::App* app) { auto addTLSOptions = [&tlsOptions, &verifyNone](CLI::App* app) {
app->add_option( app->add_option(
@ -268,6 +269,9 @@ int main(int argc, char** argv)
cobra2sentry->add_option("--rolesecret", rolesecret, "Role secret")->required(); cobra2sentry->add_option("--rolesecret", rolesecret, "Role secret")->required();
cobra2sentry->add_option("--dsn", dsn, "Sentry DSN"); cobra2sentry->add_option("--dsn", dsn, "Sentry DSN");
cobra2sentry->add_option("--jobs", jobs, "Number of thread sending events to Sentry"); cobra2sentry->add_option("--jobs", jobs, "Number of thread sending events to Sentry");
cobra2sentry->add_option("--queue_size",
maxQueueSize,
"Size of the queue to hold messages before they are sent to Sentry");
cobra2sentry->add_option("channel", channel, "Channel")->required(); cobra2sentry->add_option("channel", channel, "Channel")->required();
cobra2sentry->add_flag("-v", verbose, "Verbose"); cobra2sentry->add_flag("-v", verbose, "Verbose");
cobra2sentry->add_flag("-s", strict, "Strict mode. Error out when sending to sentry fails"); cobra2sentry->add_flag("-s", strict, "Strict mode. Error out when sending to sentry fails");
@ -455,6 +459,7 @@ int main(int argc, char** argv)
verbose, verbose,
strict, strict,
jobs, jobs,
maxQueueSize,
tlsOptions); tlsOptions);
} }
else if (app.got_subcommand("cobra_metrics_to_redis")) else if (app.got_subcommand("cobra_metrics_to_redis"))
@ -471,8 +476,14 @@ int main(int argc, char** argv)
} }
else if (app.got_subcommand("snake")) else if (app.got_subcommand("snake"))
{ {
ret = ix::ws_snake_main( ret = ix::ws_snake_main(port,
port, hostname, redisHosts, redisPort, redisPassword, verbose, appsConfigPath, tlsOptions); hostname,
redisHosts,
redisPort,
redisPassword,
verbose,
appsConfigPath,
tlsOptions);
} }
else if (app.got_subcommand("httpd")) else if (app.got_subcommand("httpd"))
{ {
@ -496,11 +507,11 @@ int main(int argc, char** argv)
} }
else if (version) else if (version)
{ {
std::cout << "ws " << ix::userAgent() << std::endl; spdlog::info("ws {}", ix::userAgent());
} }
else else
{ {
std::cerr << "A subcommand or --version is required" << std::endl; spdlog::error("A subcommand or --version is required");
} }
ix::uninitNetSystem(); ix::uninitNetSystem();

View File

@ -119,6 +119,7 @@ namespace ix
bool verbose, bool verbose,
bool strict, bool strict,
int jobs, int jobs,
size_t maxQueueSize,
const ix::SocketTLSOptions& tlsOptions); const ix::SocketTLSOptions& tlsOptions);
int ws_cobra_metrics_to_redis(const std::string& appkey, int ws_cobra_metrics_to_redis(const std::string& appkey,

View File

@ -32,7 +32,6 @@
#include <atomic> #include <atomic>
#include <condition_variable> #include <condition_variable>
#include <iostream>
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <mutex> #include <mutex>
@ -91,7 +90,7 @@ namespace ix
{ {
if (!_quiet) if (!_quiet)
{ {
std::cerr << msg; spdlog::info(msg);
} }
} }
@ -183,7 +182,7 @@ namespace ix
webSocket.setOnMessageCallback([&condition, &success](const ix::WebSocketMessagePtr& msg) { webSocket.setOnMessageCallback([&condition, &success](const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Close) if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "Report generated" << std::endl; spdlog::info("Report generated");
condition.notify_one(); condition.notify_one();
} }
else if (msg->type == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
@ -193,7 +192,7 @@ namespace ix
ss << "#retries: " << msg->errorInfo.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str() << std::endl; spdlog::info(ss.str());
success = false; success = false;
} }
@ -236,7 +235,7 @@ namespace ix
ss << "#retries: " << msg->errorInfo.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str() << std::endl; spdlog::info(ss.str());
condition.notify_one(); condition.notify_one();
} }
@ -269,7 +268,7 @@ namespace ix
int ws_autobahn_main(const std::string& url, bool quiet) int ws_autobahn_main(const std::string& url, bool quiet)
{ {
int testCasesCount = getTestCaseCount(url); int testCasesCount = getTestCaseCount(url);
std::cerr << "Test cases count: " << testCasesCount << std::endl; spdlog::info("Test cases count: {}", testCasesCount);
if (testCasesCount == -1) if (testCasesCount == -1)
{ {

View File

@ -4,17 +4,18 @@
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved. * Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/ */
#include <iostream>
#include <ixwebsocket/IXWebSocketServer.h> #include <ixwebsocket/IXWebSocketServer.h>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
namespace ix namespace ix
{ {
int ws_broadcast_server_main(int port, int ws_broadcast_server_main(int port,
const std::string& hostname, const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions) const ix::SocketTLSOptions& tlsOptions)
{ {
std::cout << "Listening on " << hostname << ":" << port << std::endl; spdlog::info("Listening on {}:{}", hostname, port);
ix::WebSocketServer server(port, hostname); ix::WebSocketServer server(port, hostname);
server.setTLSOptions(tlsOptions); server.setTLSOptions(tlsOptions);
@ -25,20 +26,20 @@ namespace ix
const WebSocketMessagePtr& msg) { const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "New connection" << std::endl; spdlog::info("New connection");
std::cerr << "id: " << connectionState->getId() << std::endl; spdlog::info("id: {}", connectionState->getId());
std::cerr << "Uri: " << msg->openInfo.uri << std::endl; spdlog::info("Uri: {}", msg->openInfo.uri);
std::cerr << "Headers:" << std::endl; spdlog::info("Headers:");
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "Closed connection" spdlog::info("Closed connection: code {} reason {}",
<< " code " << msg->closeInfo.code << " reason " msg->closeInfo.code,
<< msg->closeInfo.reason << std::endl; msg->closeInfo.reason);
} }
else if (msg->type == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
@ -47,30 +48,29 @@ namespace ix
ss << "#retries: " << msg->errorInfo.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str(); spdlog::info(ss.str());
} }
else if (msg->type == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
std::cerr << "Received message fragment" << std::endl; spdlog::info("Received message fragment");
} }
else if (msg->type == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl; spdlog::info("Received {} bytes", msg->wireSize);
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client != webSocket)
{ {
client->send(msg->str, msg->binary, [](int current, int total) -> bool { client->send(msg->str, msg->binary, [](int current, int total) -> bool {
std::cerr << "Step " << current << " out of " << total << std::endl; spdlog::info("Step {} out of {}", current, total);
return true; return true;
}); });
do do
{ {
size_t bufferedAmount = client->bufferedAmount(); size_t bufferedAmount = client->bufferedAmount();
std::cerr << bufferedAmount << " bytes left to be sent" spdlog::info("{} bytes left to be sent", bufferedAmount);
<< std::endl;
std::chrono::duration<double, std::milli> duration(10); std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
@ -84,7 +84,7 @@ namespace ix
auto res = server.listen(); auto res = server.listen();
if (!res.first) if (!res.first)
{ {
std::cerr << res.second << std::endl; spdlog::info(res.second);
return 1; return 1;
} }

View File

@ -9,11 +9,12 @@
// Broadcast server can be ran with `ws broadcast_server` // Broadcast server can be ran with `ws broadcast_server`
// //
#include "linenoise.hpp"
#include "nlohmann/json.hpp" #include "nlohmann/json.hpp"
#include <iostream>
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <queue> #include <queue>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
// for convenience // for convenience
@ -55,7 +56,7 @@ namespace ix
void WebSocketChat::log(const std::string& msg) void WebSocketChat::log(const std::string& msg)
{ {
std::cout << msg << std::endl; spdlog::info(msg);
} }
size_t WebSocketChat::getReceivedMessagesCount() const size_t WebSocketChat::getReceivedMessagesCount() const
@ -85,20 +86,21 @@ namespace ix
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
log("ws chat: connected"); log("ws chat: connected");
std::cout << "Uri: " << msg->openInfo.uri << std::endl; spdlog::info("Uri: {}", msg->openInfo.uri);
std::cout << "Handshake Headers:" << std::endl; spdlog::info("Headers:");
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
ss << "ws chat: user " << _user << " Connected !"; spdlog::info("ws chat: user {} connected !", _user);
log(ss.str()); log(ss.str());
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
ss << "ws chat: user " << _user << " disconnected !" ss << "ws chat user disconnected: " << _user;
<< " code " << msg->closeInfo.code << " reason " << msg->closeInfo.reason; ss << " code " << msg->closeInfo.code;
ss << " reason " << msg->closeInfo.reason << std::endl;
log(ss.str()); log(ss.str());
} }
else if (msg->type == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
@ -162,25 +164,25 @@ namespace ix
int ws_chat_main(const std::string& url, const std::string& user) int ws_chat_main(const std::string& url, const std::string& user)
{ {
std::cout << "Type Ctrl-D to exit prompt..." << std::endl; spdlog::info("Type Ctrl-D to exit prompt...");
WebSocketChat webSocketChat(url, user); WebSocketChat webSocketChat(url, user);
webSocketChat.start(); webSocketChat.start();
while (true) while (true)
{ {
std::string text; // Read line
std::cout << user << " > " << std::flush; std::string line;
std::getline(std::cin, text); auto quit = linenoise::Readline("> ", line);
if (!std::cin) if (quit)
{ {
break; break;
} }
webSocketChat.sendMessage(text); webSocketChat.sendMessage(line);
} }
std::cout << std::endl; spdlog::info("");
webSocketChat.stop(); webSocketChat.stop();
return 0; return 0;

View File

@ -7,7 +7,6 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <fstream> #include <fstream>
#include <iostream>
#include <ixcobra/IXCobraMetricsPublisher.h> #include <ixcobra/IXCobraMetricsPublisher.h>
#include <jsoncpp/json/json.h> #include <jsoncpp/json/json.h>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>

View File

@ -7,7 +7,6 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <condition_variable> #include <condition_variable>
#include <iostream>
#include <ixcobra/IXCobraConnection.h> #include <ixcobra/IXCobraConnection.h>
#include <ixsnake/IXRedisClient.h> #include <ixsnake/IXRedisClient.h>
#include <mutex> #include <mutex>
@ -44,8 +43,7 @@ namespace ix
auto timer = [&msgPerSeconds, &msgCount] { auto timer = [&msgPerSeconds, &msgCount] {
while (true) while (true)
{ {
std::cout << "#messages " << msgCount << " " spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds);
<< "msg/s " << msgPerSeconds << std::endl;
msgPerSeconds = 0; msgPerSeconds = 0;
auto duration = std::chrono::seconds(1); auto duration = std::chrono::seconds(1);

View File

@ -7,7 +7,6 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <fstream> #include <fstream>
#include <iostream>
#include <ixcobra/IXCobraMetricsPublisher.h> #include <ixcobra/IXCobraMetricsPublisher.h>
#include <jsoncpp/json/json.h> #include <jsoncpp/json/json.h>
#include <mutex> #include <mutex>
@ -43,7 +42,6 @@ namespace ix
rolesecret, rolesecret,
ix::WebSocketPerMessageDeflateOptions(true), ix::WebSocketPerMessageDeflateOptions(true),
tlsOptions); tlsOptions);
conn.connect();
// Display incoming messages // Display incoming messages
std::atomic<bool> authenticated(false); std::atomic<bool> authenticated(false);
@ -92,8 +90,14 @@ namespace ix
spdlog::info("Published message id {} acked", msgId); spdlog::info("Published message id {} acked", msgId);
messageAcked = true; messageAcked = true;
} }
else if (eventType == ix::CobraConnection_EventType_Pong)
{
spdlog::info("Received websocket pong");
}
}); });
conn.connect();
while (!authenticated) while (!authenticated)
; ;
while (!messageAcked) while (!messageAcked)

View File

@ -6,7 +6,6 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <iostream>
#include <ixcobra/IXCobraConnection.h> #include <ixcobra/IXCobraConnection.h>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
@ -41,8 +40,7 @@ namespace ix
auto timer = [&msgPerSeconds, &msgCount] { auto timer = [&msgPerSeconds, &msgCount] {
while (true) while (true)
{ {
std::cout << "#messages " << msgCount << " " spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds);
<< "msg/s " << msgPerSeconds << std::endl;
msgPerSeconds = 0; msgPerSeconds = 0;
auto duration = std::chrono::seconds(1); auto duration = std::chrono::seconds(1);
@ -77,7 +75,7 @@ namespace ix
[&jsonWriter, &quiet, &msgPerSeconds, &msgCount](const Json::Value& msg) { [&jsonWriter, &quiet, &msgPerSeconds, &msgCount](const Json::Value& msg) {
if (!quiet) if (!quiet)
{ {
std::cerr << jsonWriter.write(msg) << std::endl; spdlog::info(jsonWriter.write(msg));
} }
msgPerSeconds++; msgPerSeconds++;
@ -100,6 +98,10 @@ namespace ix
{ {
spdlog::error("Published message hacked: {}", msgId); spdlog::error("Published message hacked: {}", msgId);
} }
else if (eventType == ix::CobraConnection_EventType_Pong)
{
spdlog::info("Received websocket pong");
}
}); });
while (true) while (true)

View File

@ -7,9 +7,9 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <condition_variable> #include <condition_variable>
#include <iostream>
#include <ixcobra/IXCobraConnection.h> #include <ixcobra/IXCobraConnection.h>
#include <ixsentry/IXSentryClient.h> #include <ixsentry/IXSentryClient.h>
#include <map>
#include <mutex> #include <mutex>
#include <queue> #include <queue>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
@ -19,6 +19,81 @@
namespace ix namespace ix
{ {
class QueueManager
{
public:
QueueManager(size_t maxQueueSize, std::atomic<bool>& stop)
: _maxQueueSize(maxQueueSize)
, _stop(stop)
{
}
Json::Value pop();
void add(Json::Value msg);
private:
std::map<std::string, std::queue<Json::Value>> _queues;
std::mutex _mutex;
std::condition_variable _condition;
size_t _maxQueueSize;
std::atomic<bool>& _stop;
};
Json::Value QueueManager::pop()
{
std::unique_lock<std::mutex> lock(_mutex);
if (_queues.empty())
{
Json::Value val;
return val;
}
std::vector<std::string> games;
for (auto it : _queues)
{
games.push_back(it.first);
}
std::random_shuffle(games.begin(), games.end());
std::string game = games[0];
spdlog::info("Sending event for game '{}'", game);
_condition.wait(lock, [this] { return !_stop; });
if (_queues[game].empty())
{
Json::Value val;
return val;
}
auto msg = _queues[game].front();
_queues[game].pop();
return msg;
}
void QueueManager::add(Json::Value msg)
{
std::unique_lock<std::mutex> lock(_mutex);
std::string game;
if (msg.isMember("device") && msg["device"].isMember("game"))
{
game = msg["device"]["game"].asString();
}
if (game.empty()) return;
// if the sending is not fast enough there is no point
// in queuing too many events.
if (_queues[game].size() < _maxQueueSize)
{
_queues[game].push(msg);
_condition.notify_one();
}
}
int ws_cobra_to_sentry_main(const std::string& appkey, int ws_cobra_to_sentry_main(const std::string& appkey,
const std::string& endpoint, const std::string& endpoint,
const std::string& rolename, const std::string& rolename,
@ -29,6 +104,7 @@ namespace ix
bool verbose, bool verbose,
bool strict, bool strict,
int jobs, int jobs,
size_t maxQueueSize,
const ix::SocketTLSOptions& tlsOptions) const ix::SocketTLSOptions& tlsOptions)
{ {
ix::CobraConnection conn; ix::CobraConnection conn;
@ -47,9 +123,7 @@ namespace ix
std::atomic<bool> stop(false); std::atomic<bool> stop(false);
std::atomic<bool> throttled(false); std::atomic<bool> throttled(false);
std::condition_variable condition; QueueManager queueManager(maxQueueSize, stop);
std::mutex conditionVariableMutex;
std::queue<Json::Value> queue;
auto timer = [&sentCount, &receivedCount] { auto timer = [&sentCount, &receivedCount] {
while (true) while (true)
@ -63,105 +137,93 @@ namespace ix
std::thread t1(timer); std::thread t1(timer);
auto sentrySender = [&condition, auto sentrySender =
&conditionVariableMutex, [&queueManager, verbose, &errorSending, &sentCount, &stop, &throttled, &dsn] {
&queue, SentryClient sentryClient(dsn);
verbose,
&errorSending,
&sentCount,
&stop,
&throttled,
&dsn] {
SentryClient sentryClient(dsn);
while (true)
{
Json::Value msg;
while (true)
{ {
std::unique_lock<std::mutex> lock(conditionVariableMutex); Json::Value msg = queueManager.pop();
condition.wait(lock, [&queue, &stop] { return !queue.empty() && !stop; });
msg = queue.front(); if (msg.isNull()) continue;
queue.pop(); if (stop) return;
}
auto ret = sentryClient.send(msg, verbose); auto ret = sentryClient.send(msg, verbose);
HttpResponsePtr response = ret.first; HttpResponsePtr response = ret.first;
if (!response) if (!response)
{
spdlog::warn("Null HTTP Response");
continue;
}
if (verbose)
{
for (auto it : response->headers)
{ {
spdlog::info("{}: {}", it.first, it.second); spdlog::warn("Null HTTP Response");
continue;
} }
spdlog::info("Upload size: {}", response->uploadSize); if (verbose)
spdlog::info("Download size: {}", response->downloadSize);
spdlog::info("Status: {}", response->statusCode);
if (response->errorCode != HttpErrorCode::Ok)
{ {
spdlog::info("error message: {}", response->errorMsg); for (auto it : response->headers)
}
if (response->headers["Content-Type"] != "application/octet-stream")
{
spdlog::info("payload: {}", response->payload);
}
}
if (response->statusCode != 200)
{
spdlog::error("Error sending data to sentry: {}", response->statusCode);
spdlog::error("Body: {}", ret.second);
spdlog::error("Response: {}", response->payload);
errorSending = true;
// Error 429 Too Many Requests
if (response->statusCode == 429)
{
auto retryAfter = response->headers["Retry-After"];
std::stringstream ss;
ss << retryAfter;
int seconds;
ss >> seconds;
if (!ss.eof() || ss.fail())
{ {
seconds = 30; spdlog::info("{}: {}", it.first, it.second);
spdlog::warn("Error parsing Retry-After header. "
"Using {} for the sleep duration",
seconds);
} }
spdlog::warn("Error 429 - Too Many Requests. ws will sleep " spdlog::info("Upload size: {}", response->uploadSize);
"and retry after {} seconds", spdlog::info("Download size: {}", response->downloadSize);
retryAfter);
throttled = true; spdlog::info("Status: {}", response->statusCode);
auto duration = std::chrono::seconds(seconds); if (response->errorCode != HttpErrorCode::Ok)
std::this_thread::sleep_for(duration); {
throttled = false; spdlog::info("error message: {}", response->errorMsg);
}
if (response->headers["Content-Type"] != "application/octet-stream")
{
spdlog::info("payload: {}", response->payload);
}
} }
}
else
{
++sentCount;
}
if (stop) return; if (response->statusCode != 200)
} {
}; spdlog::error("Error sending data to sentry: {}", response->statusCode);
spdlog::error("Body: {}", ret.second);
spdlog::error("Response: {}", response->payload);
errorSending = true;
// Error 429 Too Many Requests
if (response->statusCode == 429)
{
auto retryAfter = response->headers["Retry-After"];
std::stringstream ss;
ss << retryAfter;
int seconds;
ss >> seconds;
if (!ss.eof() || ss.fail())
{
seconds = 30;
spdlog::warn("Error parsing Retry-After header. "
"Using {} for the sleep duration",
seconds);
}
spdlog::warn("Error 429 - Too Many Requests. ws will sleep "
"and retry after {} seconds",
retryAfter);
throttled = true;
auto duration = std::chrono::seconds(seconds);
std::this_thread::sleep_for(duration);
throttled = false;
}
}
else
{
++sentCount;
}
if (stop) return;
}
};
// Create a thread pool // Create a thread pool
std::cerr << "Starting " << jobs << " sentry sender jobs" << std::endl; spdlog::info("Starting {} sentry sender jobs", jobs);
std::vector<std::thread> pool; std::vector<std::thread> pool;
for (int i = 0; i < jobs; i++) for (int i = 0; i < jobs; i++)
{ {
@ -175,13 +237,11 @@ namespace ix
verbose, verbose,
&throttled, &throttled,
&receivedCount, &receivedCount,
&condition, &queueManager](ix::CobraConnectionEventType eventType,
&conditionVariableMutex, const std::string& errMsg,
&queue](ix::CobraConnectionEventType eventType, const ix::WebSocketHttpHeaders& headers,
const std::string& errMsg, const std::string& subscriptionId,
const ix::WebSocketHttpHeaders& headers, CobraConnection::MsgId msgId) {
const std::string& subscriptionId,
CobraConnection::MsgId msgId) {
if (eventType == ix::CobraConnection_EventType_Open) if (eventType == ix::CobraConnection_EventType_Open)
{ {
spdlog::info("Subscriber connected"); spdlog::info("Subscriber connected");
@ -197,16 +257,11 @@ namespace ix
} }
else if (eventType == ix::CobraConnection_EventType_Authenticated) else if (eventType == ix::CobraConnection_EventType_Authenticated)
{ {
std::cerr << "Subscriber authenticated" << std::endl; spdlog::info("Subscriber authenticated");
conn.subscribe(channel, conn.subscribe(channel,
filter, filter,
[&jsonWriter, [&jsonWriter, verbose, &throttled, &receivedCount, &queueManager](
verbose, const Json::Value& msg) {
&throttled,
&receivedCount,
&condition,
&conditionVariableMutex,
&queue](const Json::Value& msg) {
if (verbose) if (verbose)
{ {
spdlog::info(jsonWriter.write(msg)); spdlog::info(jsonWriter.write(msg));
@ -215,18 +270,11 @@ namespace ix
// If we cannot send to sentry fast enough, drop the message // If we cannot send to sentry fast enough, drop the message
if (throttled) if (throttled)
{ {
condition.notify_one();
return; return;
} }
++receivedCount; ++receivedCount;
queueManager.add(msg);
{
std::unique_lock<std::mutex> lock(conditionVariableMutex);
queue.push(msg);
}
condition.notify_one();
}); });
} }
else if (eventType == ix::CobraConnection_EventType_Subscribed) else if (eventType == ix::CobraConnection_EventType_Subscribed)
@ -245,6 +293,10 @@ namespace ix
{ {
spdlog::error("Published message hacked: {}", msgId); spdlog::error("Published message hacked: {}", msgId);
} }
else if (eventType == ix::CobraConnection_EventType_Pong)
{
spdlog::info("Received websocket pong");
}
}); });
while (true) while (true)

View File

@ -6,7 +6,7 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <iostream> #include <condition_variable>
#include <ixcobra/IXCobraConnection.h> #include <ixcobra/IXCobraConnection.h>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
@ -17,6 +17,59 @@
#include <statsd_client.h> #include <statsd_client.h>
#endif #endif
namespace
{
class QueueManager
{
public:
QueueManager(size_t maxQueueSize, std::atomic<bool>& stop)
: _maxQueueSize(maxQueueSize)
, _stop(stop)
{
}
Json::Value pop();
void add(Json::Value msg);
private:
std::queue<Json::Value> _queue;
std::mutex _mutex;
std::condition_variable _condition;
size_t _maxQueueSize;
std::atomic<bool>& _stop;
};
Json::Value QueueManager::pop()
{
std::unique_lock<std::mutex> lock(_mutex);
if (_queue.empty())
{
Json::Value val;
return val;
}
_condition.wait(lock, [this] { return !_stop; });
auto msg = _queue.front();
_queue.pop();
return msg;
}
void QueueManager::add(Json::Value msg)
{
std::unique_lock<std::mutex> lock(_mutex);
// if the sending is not fast enough there is no point
// in queuing too many events.
if (_queue.size() < _maxQueueSize)
{
_queue.push(msg);
_condition.notify_one();
}
}
} // namespace
namespace ix namespace ix
{ {
// fields are command line argument that can be specified multiple times // fields are command line argument that can be specified multiple times
@ -80,87 +133,118 @@ namespace ix
auto tokens = parseFields(fields); auto tokens = parseFields(fields);
// statsd client
// test with netcat as a server: `nc -ul 8125`
bool statsdBatch = true;
#ifndef _WIN32
statsd::StatsdClient statsdClient(host, port, prefix, statsdBatch);
#else
int statsdClient;
#endif
Json::FastWriter jsonWriter; Json::FastWriter jsonWriter;
uint64_t msgCount = 0; std::atomic<uint64_t> sentCount(0);
std::atomic<uint64_t> receivedCount(0);
std::atomic<bool> stop(false);
conn.setEventCallback([&conn, size_t maxQueueSize = 1000;
&channel, QueueManager queueManager(maxQueueSize, stop);
&filter,
&jsonWriter, auto timer = [&sentCount, &receivedCount] {
&statsdClient, while (true)
verbose,
&tokens,
&prefix,
&msgCount](ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId) {
if (eventType == ix::CobraConnection_EventType_Open)
{ {
spdlog::info("Subscriber connected"); spdlog::info("messages received {} sent {}", receivedCount, sentCount);
for (auto it : headers) auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
};
std::thread t1(timer);
auto statsdSender = [&queueManager, &host, &port, &sentCount, &tokens, &prefix, &stop] {
// statsd client
// test with netcat as a server: `nc -ul 8125`
bool statsdBatch = true;
#ifndef _WIN32
statsd::StatsdClient statsdClient(host, port, prefix, statsdBatch);
#else
int statsdClient;
#endif
while (true)
{
Json::Value msg = queueManager.pop();
if (msg.isNull()) continue;
if (stop) return;
std::string id;
for (auto&& attr : tokens)
{ {
spdlog::info("{}: {}", it.first, it.second); id += ".";
id += extractAttr(attr, msg);
} }
}
if (eventType == ix::CobraConnection_EventType_Closed)
{
spdlog::info("Subscriber closed");
}
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
spdlog::info("Subscriber authenticated");
conn.subscribe(channel,
filter,
[&jsonWriter, &statsdClient, verbose, &tokens, &prefix, &msgCount](
const Json::Value& msg) {
if (verbose)
{
spdlog::info(jsonWriter.write(msg));
}
std::string id; sentCount += 1;
for (auto&& attr : tokens)
{
id += ".";
id += extractAttr(attr, msg);
}
spdlog::info("{} {}{}", msgCount++, prefix, id);
#ifndef _WIN32 #ifndef _WIN32
statsdClient.count(id, 1); statsdClient.count(id, 1);
#endif #endif
});
} }
else if (eventType == ix::CobraConnection_EventType_Subscribed) };
{
spdlog::info("Subscriber: subscribed to channel {}", subscriptionId); std::thread t2(statsdSender);
}
else if (eventType == ix::CobraConnection_EventType_UnSubscribed) conn.setEventCallback(
{ [&conn, &channel, &filter, &jsonWriter, verbose, &queueManager, &receivedCount](
spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId); ix::CobraConnectionEventType eventType,
} const std::string& errMsg,
else if (eventType == ix::CobraConnection_EventType_Error) const ix::WebSocketHttpHeaders& headers,
{ const std::string& subscriptionId,
spdlog::error("Subscriber: error {}", errMsg); CobraConnection::MsgId msgId) {
} if (eventType == ix::CobraConnection_EventType_Open)
else if (eventType == ix::CobraConnection_EventType_Published) {
{ spdlog::info("Subscriber connected");
spdlog::error("Published message hacked: {}", msgId);
} for (auto it : headers)
}); {
spdlog::info("{}: {}", it.first, it.second);
}
}
if (eventType == ix::CobraConnection_EventType_Closed)
{
spdlog::info("Subscriber closed");
}
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
spdlog::info("Subscriber authenticated");
conn.subscribe(channel,
filter,
[&jsonWriter, &queueManager, verbose, &receivedCount](
const Json::Value& msg) {
if (verbose)
{
spdlog::info(jsonWriter.write(msg));
}
receivedCount++;
++receivedCount;
queueManager.add(msg);
});
}
else if (eventType == ix::CobraConnection_EventType_Subscribed)
{
spdlog::info("Subscriber: subscribed to channel {}", subscriptionId);
}
else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{
spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId);
}
else if (eventType == ix::CobraConnection_EventType_Error)
{
spdlog::error("Subscriber: error {}", errMsg);
}
else if (eventType == ix::CobraConnection_EventType_Published)
{
spdlog::error("Published message hacked: {}", msgId);
}
else if (eventType == ix::CobraConnection_EventType_Pong)
{
spdlog::info("Received websocket pong");
}
});
while (true) while (true)
{ {

View File

@ -5,10 +5,10 @@
*/ */
#include "linenoise.hpp" #include "linenoise.hpp"
#include <iostream>
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXSocketTLSOptions.h> #include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
@ -93,7 +93,7 @@ namespace ix
auto key = token.substr(0, pos); auto key = token.substr(0, pos);
auto val = token.substr(pos + 1); auto val = token.substr(pos + 1);
std::cerr << key << ": " << val << std::endl; spdlog::info("{}: {}", key, val);
headers[key] = val; headers[key] = val;
} }
@ -129,11 +129,11 @@ namespace ix
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
log("ws_connect: connected"); log("ws_connect: connected");
std::cout << "Uri: " << msg->openInfo.uri << std::endl; spdlog::info("Uri: {}", msg->openInfo.uri);
std::cout << "Handshake Headers:" << std::endl; spdlog::info("Headers:");
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
@ -145,7 +145,7 @@ namespace ix
} }
else if (msg->type == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl; spdlog::info("Received {} bytes", msg->wireSize);
ss << "ws_connect: received message: " << msg->str; ss << "ws_connect: received message: " << msg->str;
log(ss.str()); log(ss.str());
@ -160,15 +160,15 @@ namespace ix
} }
else if (msg->type == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
std::cerr << "Received message fragment" << std::endl; spdlog::info("Received message fragment");
} }
else if (msg->type == ix::WebSocketMessageType::Ping) else if (msg->type == ix::WebSocketMessageType::Ping)
{ {
std::cerr << "Received ping" << std::endl; spdlog::info("Received ping");
} }
else if (msg->type == ix::WebSocketMessageType::Pong) else if (msg->type == ix::WebSocketMessageType::Pong)
{ {
std::cerr << "Received pong" << std::endl; spdlog::info("Received pong");
} }
else else
{ {
@ -225,14 +225,14 @@ namespace ix
if (line == "/stop") if (line == "/stop")
{ {
std::cout << "Stopping connection..." << std::endl; spdlog::info("Stopping connection...");
webSocketChat.stop(); webSocketChat.stop();
continue; continue;
} }
if (line == "/start") if (line == "/start")
{ {
std::cout << "Starting connection..." << std::endl; spdlog::info("Starting connection...");
webSocketChat.start(); webSocketChat.start();
continue; continue;
} }
@ -243,7 +243,7 @@ namespace ix
linenoise::AddHistory(line.c_str()); linenoise::AddHistory(line.c_str());
} }
std::cout << std::endl; spdlog::info("");
webSocketChat.stop(); webSocketChat.stop();
return 0; return 0;

View File

@ -4,8 +4,8 @@
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved. * Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/ */
#include <iostream>
#include <ixwebsocket/IXWebSocketServer.h> #include <ixwebsocket/IXWebSocketServer.h>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
namespace ix namespace ix
@ -15,7 +15,7 @@ namespace ix
const std::string& hostname, const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions) const ix::SocketTLSOptions& tlsOptions)
{ {
std::cout << "Listening on " << hostname << ":" << port << std::endl; spdlog::info("Listening on {}:{}", hostname, port);
ix::WebSocketServer server(port, hostname); ix::WebSocketServer server(port, hostname);
server.setTLSOptions(tlsOptions); server.setTLSOptions(tlsOptions);
@ -27,13 +27,13 @@ namespace ix
[webSocket, connectionState, greetings](const WebSocketMessagePtr& msg) { [webSocket, connectionState, greetings](const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "New connection" << std::endl; spdlog::info("New connection");
std::cerr << "id: " << connectionState->getId() << std::endl; spdlog::info("id: {}", connectionState->getId());
std::cerr << "Uri: " << msg->openInfo.uri << std::endl; spdlog::info("Uri: {}", msg->openInfo.uri);
std::cerr << "Headers:" << std::endl; spdlog::info("Headers:");
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
if (greetings) if (greetings)
@ -43,22 +43,21 @@ namespace ix
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "Closed connection" spdlog::info("Closed connection: client id {} code {} reason {}",
<< " code " << msg->closeInfo.code << " reason " connectionState->getId(),
<< msg->closeInfo.reason << std::endl; msg->closeInfo.code,
msg->closeInfo.reason);
} }
else if (msg->type == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
std::stringstream ss; spdlog::error("Connection error: {}", msg->errorInfo.reason);
ss << "Connection error: " << msg->errorInfo.reason << std::endl; spdlog::error("#retries: {}", msg->errorInfo.retries);
ss << "#retries: " << msg->errorInfo.retries << std::endl; spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time);
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; spdlog::error("HTTP Status: {}", msg->errorInfo.http_status);
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str();
} }
else if (msg->type == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl; spdlog::info("Received {} bytes", msg->wireSize);
webSocket->send(msg->str, msg->binary); webSocket->send(msg->str, msg->binary);
} }
}); });
@ -67,7 +66,7 @@ namespace ix
auto res = server.listen(); auto res = server.listen();
if (!res.first) if (!res.first)
{ {
std::cerr << res.second << std::endl; spdlog::error(res.second);
return 1; return 1;
} }

View File

@ -5,10 +5,10 @@
*/ */
#include <fstream> #include <fstream>
#include <iostream>
#include <ixwebsocket/IXHttpClient.h> #include <ixwebsocket/IXHttpClient.h>
#include <ixwebsocket/IXSocketTLSOptions.h> #include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocketHttpHeaders.h> #include <ixwebsocket/IXWebSocketHttpHeaders.h>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
namespace ix namespace ix
@ -47,7 +47,7 @@ namespace ix
auto key = token.substr(0, pos); auto key = token.substr(0, pos);
auto val = token.substr(pos + 1); auto val = token.substr(pos + 1);
std::cerr << key << ": " << val << std::endl; spdlog::info("{}: {}", key, val);
headers[key] = val; headers[key] = val;
} }
@ -76,7 +76,7 @@ namespace ix
auto key = token.substr(0, pos); auto key = token.substr(0, pos);
auto val = token.substr(pos + 1); auto val = token.substr(pos + 1);
std::cerr << key << ": " << val << std::endl; spdlog::info("{}: {}", key, val);
httpParameters[key] = val; httpParameters[key] = val;
} }
@ -108,10 +108,9 @@ namespace ix
args->maxRedirects = maxRedirects; args->maxRedirects = maxRedirects;
args->verbose = verbose; args->verbose = verbose;
args->compress = compress; args->compress = compress;
args->logger = [](const std::string& msg) { std::cout << msg; }; args->logger = [](const std::string& msg) { spdlog::info(msg); };
args->onProgressCallback = [](int current, int total) -> bool { args->onProgressCallback = [](int current, int total) -> bool {
std::cerr << "\r" spdlog::info("Downloaded {} bytes out of {}", current, total);
<< "Downloaded " << current << " bytes out of " << total;
return true; return true;
}; };
@ -131,20 +130,20 @@ namespace ix
response = httpClient.post(url, httpParameters, args); response = httpClient.post(url, httpParameters, args);
} }
std::cerr << std::endl; spdlog::info("");
for (auto it : response->headers) for (auto it : response->headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
std::cerr << "Upload size: " << response->uploadSize << std::endl; spdlog::info("Upload size: {}", response->uploadSize);
std::cerr << "Download size: " << response->downloadSize << std::endl; spdlog::info("Download size: {}", response->downloadSize);
std::cerr << "Status: " << response->statusCode << std::endl; spdlog::info("Status: {}", response->statusCode);
if (response->errorCode != HttpErrorCode::Ok) if (response->errorCode != HttpErrorCode::Ok)
{ {
std::cerr << "error message: " << response->errorMsg << std::endl; spdlog::info("error message: ", response->errorMsg);
} }
if (!headersOnly && response->errorCode == HttpErrorCode::Ok) if (!headersOnly && response->errorCode == HttpErrorCode::Ok)
@ -158,7 +157,7 @@ namespace ix
filename = output; filename = output;
} }
std::cout << "Writing to disk: " << filename << std::endl; spdlog::info("Writing to disk: {}", filename);
std::ofstream out(filename); std::ofstream out(filename);
out.write((char*) &response->payload.front(), response->payload.size()); out.write((char*) &response->payload.front(), response->payload.size());
out.close(); out.close();
@ -167,14 +166,13 @@ namespace ix
{ {
if (response->headers["Content-Type"] != "application/octet-stream") if (response->headers["Content-Type"] != "application/octet-stream")
{ {
std::cout << "payload: " << response->payload << std::endl; spdlog::info("payload: {}", response->payload);
} }
else else
{ {
std::cerr << "Binary output can mess up your terminal." << std::endl; spdlog::info("Binary output can mess up your terminal.");
std::cerr << "Use the -O flag to save the file to disk." << std::endl; spdlog::info("Use the -O flag to save the file to disk.");
std::cerr << "You can also use the --output option to specify a filename." spdlog::info("You can also use the --output option to specify a filename.");
<< std::endl;
} }
} }
} }

View File

@ -5,7 +5,6 @@
*/ */
#include <fstream> #include <fstream>
#include <iostream>
#include <ixwebsocket/IXHttpServer.h> #include <ixwebsocket/IXHttpServer.h>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
@ -32,7 +31,7 @@ namespace ix
auto res = server.listen(); auto res = server.listen();
if (!res.first) if (!res.first)
{ {
std::cerr << res.second << std::endl; spdlog::error(res.second);
return 1; return 1;
} }

View File

@ -8,6 +8,7 @@
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXSocketTLSOptions.h> #include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
namespace ix namespace ix
@ -40,7 +41,7 @@ namespace ix
void WebSocketPingPong::log(const std::string& msg) void WebSocketPingPong::log(const std::string& msg)
{ {
std::cout << msg << std::endl; spdlog::info(msg);
} }
void WebSocketPingPong::stop() void WebSocketPingPong::stop()
@ -56,18 +57,18 @@ namespace ix
log(std::string("Connecting to url: ") + _url); log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) { _webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) {
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl; spdlog::info("Received {} bytes", msg->wireSize);
std::stringstream ss; std::stringstream ss;
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
log("ping_pong: connected"); log("ping_pong: connected");
std::cout << "Uri: " << msg->openInfo.uri << std::endl; spdlog::info("Uri: {}", msg->openInfo.uri);
std::cout << "Handshake Headers:" << std::endl; spdlog::info("Headers:");
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
@ -127,7 +128,7 @@ namespace ix
int ws_ping_pong_main(const std::string& url, const ix::SocketTLSOptions& tlsOptions) int ws_ping_pong_main(const std::string& url, const ix::SocketTLSOptions& tlsOptions)
{ {
std::cout << "Type Ctrl-D to exit prompt..." << std::endl; spdlog::info("Type Ctrl-D to exit prompt...");
WebSocketPingPong webSocketPingPong(url, tlsOptions); WebSocketPingPong webSocketPingPong(url, tlsOptions);
webSocketPingPong.start(); webSocketPingPong.start();

View File

@ -4,8 +4,8 @@
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved. * Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/ */
#include <iostream>
#include <ixwebsocket/IXWebSocketServer.h> #include <ixwebsocket/IXWebSocketServer.h>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
namespace ix namespace ix
@ -44,7 +44,7 @@ namespace ix
const std::string& remoteUrl, const std::string& remoteUrl,
bool verbose) bool verbose)
{ {
std::cout << "Listening on " << hostname << ":" << port << std::endl; spdlog::info("Listening on {}:{}", hostname, port);
ix::WebSocketServer server(port, hostname); ix::WebSocketServer server(port, hostname);
server.setTLSOptions(tlsOptions); server.setTLSOptions(tlsOptions);
@ -64,38 +64,36 @@ namespace ix
const WebSocketMessagePtr& msg) { const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "New connection" << std::endl; spdlog::info("New connection to remote server");
std::cerr << "server id: " << state->getId() << std::endl; spdlog::info("id: {}", state->getId());
std::cerr << "Uri: " << msg->openInfo.uri << std::endl; spdlog::info("Uri: {}", msg->openInfo.uri);
std::cerr << "Headers:" << std::endl; spdlog::info("Headers:");
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "Closed connection" spdlog::info("Closed remote server connection: client id {} code {} reason {}",
<< " code " << msg->closeInfo.code << " reason " state->getId(),
<< msg->closeInfo.reason << std::endl; msg->closeInfo.code,
webSocket->close(msg->closeInfo.code, msg->closeInfo.reason); msg->closeInfo.reason);
state->setTerminated(); state->setTerminated();
} }
else if (msg->type == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
std::stringstream ss; spdlog::error("Connection error: {}", msg->errorInfo.reason);
ss << "Connection error: " << msg->errorInfo.reason << std::endl; spdlog::error("#retries: {}", msg->errorInfo.retries);
ss << "#retries: " << msg->errorInfo.retries << std::endl; spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time);
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; spdlog::error("HTTP Status: {}", msg->errorInfo.http_status);
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str();
} }
else if (msg->type == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " << msg->wireSize << " bytes from server" << std::endl; spdlog::info("Received {} bytes from server", msg->wireSize);
if (verbose) if (verbose)
{ {
std::cerr << "payload " << msg->str << std::endl; spdlog::info("payload {}", msg->str);
} }
webSocket->send(msg->str, msg->binary); webSocket->send(msg->str, msg->binary);
@ -103,66 +101,67 @@ namespace ix
}); });
// Client connection // Client connection
webSocket->setOnMessageCallback([state, remoteUrl, verbose]( webSocket->setOnMessageCallback(
const WebSocketMessagePtr& msg) { [state, remoteUrl, verbose](const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "client id: " << state->getId() << std::endl;
std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; spdlog::info("New connection from client");
} spdlog::info("id: {}", state->getId());
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
// Connect to the 'real' server // Connect to the 'real' server
std::string url(remoteUrl); std::string url(remoteUrl);
url += msg->openInfo.uri; url += msg->openInfo.uri;
state->webSocket().setUrl(url); state->webSocket().setUrl(url);
state->webSocket().start(); state->webSocket().disableAutomaticReconnection();
state->webSocket().start();
// we should sleep here for a bit until we've established the // we should sleep here for a bit until we've established the
// connection with the remote server // connection with the remote server
while (state->webSocket().getReadyState() != ReadyState::Open) while (state->webSocket().getReadyState() != ReadyState::Open)
{ {
std::cerr << "waiting for server connection establishment" << std::endl; spdlog::info("waiting for server connection establishment");
std::this_thread::sleep_for(std::chrono::milliseconds(10)); std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
spdlog::info("server connection established");
} }
std::cerr << "server connection established" << std::endl; else if (msg->type == ix::WebSocketMessageType::Close)
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
std::cerr << "Closed connection"
<< " code " << msg->closeInfo.code << " reason "
<< msg->closeInfo.reason << std::endl;
state->webSocket().close(msg->closeInfo.code, msg->closeInfo.reason);
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
std::stringstream ss;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str();
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
std::cerr << "Received " << msg->wireSize << " bytes from client" << std::endl;
if (verbose)
{ {
std::cerr << "payload " << msg->str << std::endl; spdlog::info("Closed client connection: client id {} code {} reason {}",
state->getId(),
msg->closeInfo.code,
msg->closeInfo.reason);
state->webSocket().close(msg->closeInfo.code, msg->closeInfo.reason);
} }
state->webSocket().send(msg->str, msg->binary); else if (msg->type == ix::WebSocketMessageType::Error)
} {
}); spdlog::error("Connection error: {}", msg->errorInfo.reason);
spdlog::error("#retries: {}", msg->errorInfo.retries);
spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time);
spdlog::error("HTTP Status: {}", msg->errorInfo.http_status);
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
spdlog::info("Received {} bytes from client", msg->wireSize);
if (verbose)
{
spdlog::info("payload {}", msg->str);
}
state->webSocket().send(msg->str, msg->binary);
}
});
}); });
auto res = server.listen(); auto res = server.listen();
if (!res.first) if (!res.first)
{ {
std::cerr << res.second << std::endl; spdlog::info(res.second);
return 1; return 1;
} }

View File

@ -7,7 +7,6 @@
#include <chrono> #include <chrono>
#include <condition_variable> #include <condition_variable>
#include <fstream> #include <fstream>
#include <iostream>
#include <ixcrypto/IXBase64.h> #include <ixcrypto/IXBase64.h>
#include <ixcrypto/IXHash.h> #include <ixcrypto/IXHash.h>
#include <ixcrypto/IXUuid.h> #include <ixcrypto/IXUuid.h>
@ -16,6 +15,7 @@
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <msgpack11/msgpack11.hpp> #include <msgpack11/msgpack11.hpp>
#include <mutex> #include <mutex>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
#include <vector> #include <vector>
@ -75,12 +75,12 @@ namespace ix
void WebSocketReceiver::log(const std::string& msg) void WebSocketReceiver::log(const std::string& msg)
{ {
std::cout << msg << std::endl; spdlog::info(msg);
} }
void WebSocketReceiver::waitForConnection() void WebSocketReceiver::waitForConnection()
{ {
std::cout << "ws_receive: Connecting..." << std::endl; spdlog::info("{}: Connecting...", "ws_receive");
std::unique_lock<std::mutex> lock(_conditionVariableMutex); std::unique_lock<std::mutex> lock(_conditionVariableMutex);
_condition.wait(lock); _condition.wait(lock);
@ -88,7 +88,7 @@ namespace ix
void WebSocketReceiver::waitForMessage() void WebSocketReceiver::waitForMessage()
{ {
std::cout << "ws_receive: Waiting for message..." << std::endl; spdlog::info("{}: Waiting for message...", "ws_receive");
std::unique_lock<std::mutex> lock(_conditionVariableMutex); std::unique_lock<std::mutex> lock(_conditionVariableMutex);
_condition.wait(lock); _condition.wait(lock);
@ -124,7 +124,7 @@ namespace ix
void WebSocketReceiver::handleMessage(const std::string& str) void WebSocketReceiver::handleMessage(const std::string& str)
{ {
std::cerr << "ws_receive: Received message: " << str.size() << std::endl; spdlog::info("ws_receive: Received message: {}", str.size());
std::string errMsg; std::string errMsg;
MsgPack data = MsgPack::parse(str, errMsg); MsgPack data = MsgPack::parse(str, errMsg);
@ -134,17 +134,17 @@ namespace ix
return; return;
} }
std::cout << "id: " << data["id"].string_value() << std::endl; spdlog::info("id: {}", data["id"].string_value());
std::vector<uint8_t> content = data["content"].binary_items(); std::vector<uint8_t> content = data["content"].binary_items();
std::cout << "ws_receive: Content size: " << content.size() << std::endl; spdlog::info("ws_receive: Content size: {}", content.size());
// Validate checksum // Validate checksum
uint64_t cksum = ix::djb2Hash(content); uint64_t cksum = ix::djb2Hash(content);
auto cksumRef = data["djb2_hash"].string_value(); auto cksumRef = data["djb2_hash"].string_value();
std::cout << "ws_receive: Computed hash: " << cksum << std::endl; spdlog::info("ws_receive: Computed hash: {}", cksum);
std::cout << "ws_receive: Reference hash: " << cksumRef << std::endl; spdlog::info("ws_receive: Reference hash: {}", cksumRef);
if (std::to_string(cksum) != cksumRef) if (std::to_string(cksum) != cksumRef)
{ {
@ -157,12 +157,12 @@ namespace ix
std::string filenameTmp = filename + ".tmp"; std::string filenameTmp = filename + ".tmp";
std::cout << "ws_receive: Writing to disk: " << filenameTmp << std::endl; spdlog::info("ws_receive: Writing to disk: {}", filenameTmp);
std::ofstream out(filenameTmp); std::ofstream out(filenameTmp);
out.write((char*) &content.front(), content.size()); out.write((char*) &content.front(), content.size());
out.close(); out.close();
std::cout << "ws_receive: Renaming " << filenameTmp << " to " << filename << std::endl; spdlog::info("ws_receive: Renaming {} to {}", filenameTmp, filename);
rename(filenameTmp.c_str(), filename.c_str()); rename(filenameTmp.c_str(), filename.c_str());
std::map<MsgPack, MsgPack> pdu; std::map<MsgPack, MsgPack> pdu;
@ -170,7 +170,7 @@ namespace ix
pdu["id"] = data["id"]; pdu["id"] = data["id"];
pdu["filename"] = data["filename"]; pdu["filename"] = data["filename"];
std::cout << "Sending ack to sender" << std::endl; spdlog::info("Sending ack to sender");
MsgPack msg(pdu); MsgPack msg(pdu);
_webSocket.sendBinary(msg.dump()); _webSocket.sendBinary(msg.dump());
} }
@ -192,11 +192,11 @@ namespace ix
_condition.notify_one(); _condition.notify_one();
log("ws_receive: connected"); log("ws_receive: connected");
std::cout << "Uri: " << msg->openInfo.uri << std::endl; spdlog::info("Uri: {}", msg->openInfo.uri);
std::cout << "Handshake Headers:" << std::endl; spdlog::info("Headers:");
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
@ -259,7 +259,7 @@ namespace ix
std::chrono::duration<double, std::milli> duration(1000); std::chrono::duration<double, std::milli> duration(1000);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
std::cout << "ws_receive: Done !" << std::endl; spdlog::info("ws_receive: Done !");
webSocketReceiver.stop(); webSocketReceiver.stop();
} }

View File

@ -4,8 +4,8 @@
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved. * Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/ */
#include <iostream>
#include <ixsnake/IXRedisClient.h> #include <ixsnake/IXRedisClient.h>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
namespace ix namespace ix
@ -20,7 +20,7 @@ namespace ix
RedisClient redisClient; RedisClient redisClient;
if (!redisClient.connect(hostname, port)) if (!redisClient.connect(hostname, port))
{ {
std::cerr << "Cannot connect to redis host" << std::endl; spdlog::info("Cannot connect to redis host");
return 1; return 1;
} }
@ -30,10 +30,10 @@ namespace ix
if (!redisClient.auth(password, authResponse)) if (!redisClient.auth(password, authResponse))
{ {
std::stringstream ss; std::stringstream ss;
std::cerr << "Cannot authenticated to redis" << std::endl; spdlog::info("Cannot authenticated to redis");
return 1; return 1;
} }
std::cout << "Auth response: " << authResponse << ":" << port << std::endl; spdlog::info("Auth response: {}", authResponse);
} }
std::string errMsg; std::string errMsg;
@ -41,8 +41,7 @@ namespace ix
{ {
if (!redisClient.publish(channel, message, errMsg)) if (!redisClient.publish(channel, message, errMsg))
{ {
std::cerr << "Error publishing to channel " << channel << "error: " << errMsg spdlog::error("Error publishing to channel {} error {}", channel, errMsg);
<< std::endl;
return 1; return 1;
} }
} }

View File

@ -4,7 +4,6 @@
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved. * Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/ */
#include <iostream>
#include <ixsnake/IXRedisServer.h> #include <ixsnake/IXRedisServer.h>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
@ -20,7 +19,7 @@ namespace ix
auto res = server.listen(); auto res = server.listen();
if (!res.first) if (!res.first)
{ {
std::cerr << res.second << std::endl; spdlog::info(res.second);
return 1; return 1;
} }

View File

@ -6,8 +6,8 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <iostream>
#include <ixsnake/IXRedisClient.h> #include <ixsnake/IXRedisClient.h>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
#include <thread> #include <thread>
@ -22,7 +22,7 @@ namespace ix
RedisClient redisClient; RedisClient redisClient;
if (!redisClient.connect(hostname, port)) if (!redisClient.connect(hostname, port))
{ {
std::cerr << "Cannot connect to redis host" << std::endl; spdlog::info("Cannot connect to redis host");
return 1; return 1;
} }
@ -32,10 +32,10 @@ namespace ix
if (!redisClient.auth(password, authResponse)) if (!redisClient.auth(password, authResponse))
{ {
std::stringstream ss; std::stringstream ss;
std::cerr << "Cannot authenticated to redis" << std::endl; spdlog::info("Cannot authenticated to redis");
return 1; return 1;
} }
std::cout << "Auth response: " << authResponse << ":" << port << std::endl; spdlog::info("Auth response: {}", authResponse);
} }
std::atomic<int> msgPerSeconds(0); std::atomic<int> msgPerSeconds(0);
@ -44,7 +44,7 @@ namespace ix
auto callback = [&msgPerSeconds, &msgCount, verbose](const std::string& message) { auto callback = [&msgPerSeconds, &msgCount, verbose](const std::string& message) {
if (verbose) if (verbose)
{ {
std::cout << "received: " << message << std::endl; spdlog::info("recived: {}", message);
} }
msgPerSeconds++; msgPerSeconds++;
@ -52,14 +52,13 @@ namespace ix
}; };
auto responseCallback = [](const std::string& redisResponse) { auto responseCallback = [](const std::string& redisResponse) {
std::cout << "Redis subscribe response: " << redisResponse << std::endl; spdlog::info("Redis subscribe response: {}", redisResponse);
}; };
auto timer = [&msgPerSeconds, &msgCount] { auto timer = [&msgPerSeconds, &msgCount] {
while (true) while (true)
{ {
std::cout << "#messages " << msgCount << " " spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds);
<< "msg/s " << msgPerSeconds << std::endl;
msgPerSeconds = 0; msgPerSeconds = 0;
auto duration = std::chrono::seconds(1); auto duration = std::chrono::seconds(1);
@ -69,10 +68,10 @@ namespace ix
std::thread t(timer); std::thread t(timer);
std::cerr << "Subscribing to " << channel << "..." << std::endl; spdlog::info("Subscribing to {} ...", channel);
if (!redisClient.subscribe(channel, responseCallback, callback)) if (!redisClient.subscribe(channel, responseCallback, callback))
{ {
std::cerr << "Error subscribing to channel " << channel << std::endl; spdlog::info("Error subscribing to channel {}", channel);
return 1; return 1;
} }

View File

@ -7,7 +7,6 @@
#include <chrono> #include <chrono>
#include <condition_variable> #include <condition_variable>
#include <fstream> #include <fstream>
#include <iostream>
#include <ixcrypto/IXBase64.h> #include <ixcrypto/IXBase64.h>
#include <ixcrypto/IXHash.h> #include <ixcrypto/IXHash.h>
#include <ixcrypto/IXUuid.h> #include <ixcrypto/IXUuid.h>
@ -16,6 +15,7 @@
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <msgpack11/msgpack11.hpp> #include <msgpack11/msgpack11.hpp>
#include <mutex> #include <mutex>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
#include <vector> #include <vector>
@ -68,12 +68,12 @@ namespace ix
void WebSocketSender::log(const std::string& msg) void WebSocketSender::log(const std::string& msg)
{ {
std::cout << msg << std::endl; spdlog::info(msg);
} }
void WebSocketSender::waitForConnection() void WebSocketSender::waitForConnection()
{ {
std::cout << "ws_send: Connecting..." << std::endl; spdlog::info("{}: Connecting...", "ws_send");
std::unique_lock<std::mutex> lock(_conditionVariableMutex); std::unique_lock<std::mutex> lock(_conditionVariableMutex);
_condition.wait(lock); _condition.wait(lock);
@ -81,7 +81,7 @@ namespace ix
void WebSocketSender::waitForAck() void WebSocketSender::waitForAck()
{ {
std::cout << "ws_send: Waiting for ack..." << std::endl; spdlog::info("{}: Waiting for ack...", "ws_send");
std::unique_lock<std::mutex> lock(_conditionVariableMutex); std::unique_lock<std::mutex> lock(_conditionVariableMutex);
_condition.wait(lock); _condition.wait(lock);
@ -122,11 +122,11 @@ namespace ix
_condition.notify_one(); _condition.notify_one();
log("ws_send: connected"); log("ws_send: connected");
std::cout << "Uri: " << msg->openInfo.uri << std::endl; spdlog::info("Uri: {}", msg->openInfo.uri);
std::cout << "Handshake Headers:" << std::endl; spdlog::info("Headers:");
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
@ -147,14 +147,14 @@ namespace ix
MsgPack data = MsgPack::parse(msg->str, errMsg); MsgPack data = MsgPack::parse(msg->str, errMsg);
if (!errMsg.empty()) if (!errMsg.empty())
{ {
std::cerr << "Invalid MsgPack response" << std::endl; spdlog::info("Invalid MsgPack response");
return; return;
} }
std::string id = data["id"].string_value(); std::string id = data["id"].string_value();
if (_id != id) if (_id != id)
{ {
std::cerr << "Invalid id" << std::endl; spdlog::info("Invalid id");
} }
} }
else if (msg->type == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
@ -201,7 +201,7 @@ namespace ix
auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(now - _start); auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(now - _start);
_ms = milliseconds.count(); _ms = milliseconds.count();
std::cout << _description << " completed in " << _ms << "ms" << std::endl; spdlog::info("{} completed in {}", _description, _ms);
_reported = true; _reported = true;
} }
@ -240,7 +240,7 @@ namespace ix
Bench bench("Sending file through websocket"); Bench bench("Sending file through websocket");
_webSocket.sendBinary(msg.dump(), [throttle](int current, int total) -> bool { _webSocket.sendBinary(msg.dump(), [throttle](int current, int total) -> bool {
std::cout << "ws_send: Step " << current << " out of " << total << std::endl; spdlog::info("ws_send: Step {} out of {}", current, total);
if (throttle) if (throttle)
{ {
@ -254,7 +254,7 @@ namespace ix
do do
{ {
size_t bufferedAmount = _webSocket.bufferedAmount(); size_t bufferedAmount = _webSocket.bufferedAmount();
std::cout << "ws_send: " << bufferedAmount << " bytes left to be sent" << std::endl; spdlog::info("ws_send: {} bytes left to be sent", bufferedAmount);
std::chrono::duration<double, std::milli> duration(10); std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
@ -264,7 +264,7 @@ namespace ix
auto duration = bench.getDuration(); auto duration = bench.getDuration();
auto transferRate = 1000 * content.size() / duration; auto transferRate = 1000 * content.size() / duration;
transferRate /= (1024 * 1024); transferRate /= (1024 * 1024);
std::cout << "ws_send: Send transfer rate: " << transferRate << "MB/s" << std::endl; spdlog::info("ws_send: Send transfer rate: {} MB/s", transferRate);
} }
void wsSend(const std::string& url, void wsSend(const std::string& url,
@ -278,12 +278,12 @@ namespace ix
webSocketSender.waitForConnection(); webSocketSender.waitForConnection();
std::cout << "ws_send: Sending..." << std::endl; spdlog::info("ws_send: Sending...");
webSocketSender.sendMessage(path, throttle); webSocketSender.sendMessage(path, throttle);
webSocketSender.waitForAck(); webSocketSender.waitForAck();
std::cout << "ws_send: Done !" << std::endl; spdlog::info("ws_send: Done !");
webSocketSender.stop(); webSocketSender.stop();
} }

View File

@ -5,8 +5,8 @@
*/ */
#include <fstream> #include <fstream>
#include <iostream>
#include <ixsnake/IXSnakeServer.h> #include <ixsnake/IXSnakeServer.h>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
namespace namespace
@ -58,11 +58,11 @@ namespace ix
auto str = readAsString(appsConfigPath); auto str = readAsString(appsConfigPath);
if (str.empty()) if (str.empty())
{ {
std::cout << "Cannot read content of " << appsConfigPath << std::endl; spdlog::error("Cannot read content of {}", appsConfigPath);
return 1; return 1;
} }
std::cout << str << std::endl; spdlog::error(str);
auto apps = nlohmann::json::parse(str); auto apps = nlohmann::json::parse(str);
appConfig.apps = apps["apps"]; appConfig.apps = apps["apps"];

View File

@ -4,8 +4,8 @@
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved. * Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/ */
#include <iostream>
#include <ixwebsocket/IXWebSocketServer.h> #include <ixwebsocket/IXWebSocketServer.h>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
namespace ix namespace ix
@ -14,7 +14,7 @@ namespace ix
const std::string& hostname, const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions) const ix::SocketTLSOptions& tlsOptions)
{ {
std::cout << "ws_transfer: Listening on " << hostname << ":" << port << std::endl; spdlog::info("Listening on {}:{}", hostname, port);
ix::WebSocketServer server(port, hostname); ix::WebSocketServer server(port, hostname);
server.setTLSOptions(tlsOptions); server.setTLSOptions(tlsOptions);
@ -25,22 +25,23 @@ namespace ix
const WebSocketMessagePtr& msg) { const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "ws_transfer: New connection" << std::endl; spdlog::info("ws_transfer: New connection");
std::cerr << "id: " << connectionState->getId() << std::endl; spdlog::info("id: {}", connectionState->getId());
std::cerr << "Uri: " << msg->openInfo.uri << std::endl; spdlog::info("Uri: {}", msg->openInfo.uri);
std::cerr << "Headers:" << std::endl; spdlog::info("Headers:");
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "ws_transfer: [client " << connectionState->getId() spdlog::info("ws_transfer: Closed connection: client id {} code {} reason {}",
<< "]: Closed connection, code " << msg->closeInfo.code << " reason " connectionState->getId(),
<< msg->closeInfo.reason << std::endl; msg->closeInfo.code,
msg->closeInfo.reason);
auto remaining = server.getClients().erase(webSocket); auto remaining = server.getClients().erase(webSocket);
std::cerr << "ws_transfer: " << remaining << " remaining clients " << std::endl; spdlog::info("ws_transfer: {} remaining clients", remaining);
} }
else if (msg->type == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
@ -49,40 +50,43 @@ namespace ix
ss << "#retries: " << msg->errorInfo.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str(); spdlog::info(ss.str());
} }
else if (msg->type == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
std::cerr << "ws_transfer: Received message fragment " << std::endl; spdlog::info("ws_transfer: Received message fragment ");
} }
else if (msg->type == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "ws_transfer: Received " << msg->wireSize << " bytes" << std::endl; spdlog::info("ws_transfer: Received {} bytes", msg->wireSize);
size_t receivers = 0; size_t receivers = 0;
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client != webSocket)
{ {
auto readyState = client->getReadyState(); auto readyState = client->getReadyState();
auto id = connectionState->getId();
if (readyState == ReadyState::Open) if (readyState == ReadyState::Open)
{ {
++receivers; ++receivers;
client->send(msg->str, client->send(
msg->binary, msg->str, msg->binary, [&id](int current, int total) -> bool {
[id = connectionState->getId()](int current, spdlog::info("{}: [client {}]: Step {} out of {}",
int total) -> bool { "ws_transfer",
std::cerr << "ws_transfer: [client " << id id,
<< "]: Step " << current << " out of " current,
<< total << std::endl; total);
return true; return true;
}); });
do do
{ {
size_t bufferedAmount = client->bufferedAmount(); size_t bufferedAmount = client->bufferedAmount();
std::cerr << "ws_transfer: [client " << connectionState->getId()
<< "]: " << bufferedAmount spdlog::info("{}: [client {}]: {} bytes left to send",
<< " bytes left to be sent, " << std::endl; "ws_transfer",
id,
bufferedAmount);
std::this_thread::sleep_for(std::chrono::milliseconds(500)); std::this_thread::sleep_for(std::chrono::milliseconds(500));
@ -96,16 +100,19 @@ namespace ix
? "Connecting" ? "Connecting"
: readyState == ReadyState::Closing ? "Closing" : "Closed"; : readyState == ReadyState::Closing ? "Closing" : "Closed";
size_t bufferedAmount = client->bufferedAmount(); size_t bufferedAmount = client->bufferedAmount();
std::cerr << "ws_transfer: [client " << connectionState->getId()
<< "]: has readystate '" << readyStateString << "' and " spdlog::info(
<< bufferedAmount << " bytes left to be sent, " "{}: [client {}]: has readystate {} bytes left to be sent",
<< std::endl; "ws_transfer",
id,
readyStateString,
bufferedAmount);
} }
} }
} }
if (!receivers) if (!receivers)
{ {
std::cerr << "ws_transfer: no remaining receivers" << std::endl; spdlog::info("ws_transfer: no remaining receivers");
} }
} }
}); });
@ -114,7 +121,7 @@ namespace ix
auto res = server.listen(); auto res = server.listen();
if (!res.first) if (!res.first)
{ {
std::cerr << res.second << std::endl; spdlog::info(res.second);
return 1; return 1;
} }