Compare commits

...

15 Commits

Author SHA1 Message Date
c4e9abfe80 (ws cobra to sentry) bound the queue size used to hold up cobra messages before they are sent to sentry. Default queue size is a 100 messages. Without such limit the program runs out of memory when a subscriber receive a lot of messages that cannot make it to sentry 2019-12-25 22:15:57 -08:00
a805270d02 (ws client) use correct compilation defines so that spdlog is not used as a header only library (reduce binary size and increase compilation speed) 2019-12-25 09:03:57 -08:00
e13b57c73b (ws client) all commands use spdlog instead of std::cerr or std::cout for logging 2019-12-24 21:55:34 -08:00
5be84926ef (cobra client) send a websocket ping every 30s to keep the connection opened 2019-12-24 17:16:41 -08:00
33e7271b85 (tls / apple) minor refactoring, move functions out of the anonymous namespace to become static member functions 2019-12-23 16:30:38 -08:00
d72e5e70f6 socket tls options: display ciphers 2019-12-23 12:25:25 -08:00
e2c5f751bd (doc) fix typo 2019-12-22 20:33:14 -08:00
351b86e266 v7.6.4 2019-12-22 20:32:10 -08:00
d0cbff4f4e (client) error handling, quote url in error case when failing to parse on 2019-12-22 20:30:29 -08:00
cbfc9b9f94 (ws) ws_cobra_publish: register callbacks before connecting 2019-12-22 20:29:37 -08:00
ca816d801f (doc) mention mbedtls in supported ssl server backend 2019-12-22 20:28:44 -08:00
2f354d31eb update gitignore file 2019-12-20 15:21:36 -08:00
2c6c1edd37 (tls) add a simple description of the TLS configuration routine for debugging 2019-12-20 15:18:04 -08:00
9799e7e84b (mbedtls) correct support for using own certificate and private key 2019-12-20 15:13:26 -08:00
81be970679 (ws commands) in websocket proxy, disable automatic reconnections + in Dockerfile, use alpine 3.11 2019-12-20 09:51:21 -08:00
43 changed files with 400 additions and 296 deletions

4
.gitignore vendored
View File

@ -1,3 +1,7 @@
build build
*.pyc *.pyc
venv venv
ixsnake/ixsnake/.certs/
site/
ws/.certs/
ws/.srl

View File

@ -1,4 +1,4 @@
FROM alpine:edge as build FROM alpine:3.11 as build
RUN apk add --no-cache gcc g++ musl-dev linux-headers cmake openssl-dev RUN apk add --no-cache gcc g++ musl-dev linux-headers cmake openssl-dev
RUN apk add --no-cache make RUN apk add --no-cache make
@ -16,7 +16,7 @@ WORKDIR /opt
USER app USER app
RUN [ "make", "ws_install" ] RUN [ "make", "ws_install" ]
FROM alpine:edge as runtime FROM alpine:3.11 as runtime
RUN apk add --no-cache libstdc++ RUN apk add --no-cache libstdc++
RUN apk add --no-cache strace RUN apk add --no-cache strace

View File

@ -1,5 +1,39 @@
# Changelog # Changelog
All notable changes to this project will be documented in this file. All changes to this project will be documented in this file.
## [7.8.2] - 2019-12-25
(ws cobra to sentry) bound the queue size used to hold up cobra messages before they are sent to sentry. Default queue size is a 100 messages. Without such limit the program runs out of memory when a subscriber receive a lot of messages that cannot make it to sentry
## [7.8.1] - 2019-12-25
(ws client) use correct compilation defines so that spdlog is not used as a header only library (reduce binary size and increase compilation speed)
## [7.8.0] - 2019-12-24
(ws client) all commands use spdlog instead of std::cerr or std::cout for logging
## [7.6.5] - 2019-12-24
(cobra client) send a websocket ping every 30s to keep the connection opened
## [7.6.4] - 2019-12-22
(client) error handling, quote url in error case when failing to parse one
(ws) ws_cobra_publish: register callbacks before connecting
(doc) mention mbedtls in supported ssl server backend
## [7.6.3] - 2019-12-20
(tls) add a simple description of the TLS configuration routine for debugging
## [7.6.2] - 2019-12-20
(mbedtls) correct support for using own certificate and private key
## [7.6.1] - 2019-12-20
(ws commands) in websocket proxy, disable automatic reconnections + in Dockerfile, use alpine 3.11
## [7.6.0] - 2019-12-19 ## [7.6.0] - 2019-12-19

View File

@ -247,7 +247,7 @@ Options:
[cobra](https://github.com/machinezone/cobra) is a real time messenging server. ws has several sub-command to interact with cobra. There is also a minimal cobra compatible server named snake available. [cobra](https://github.com/machinezone/cobra) is a real time messenging server. ws has several sub-command to interact with cobra. There is also a minimal cobra compatible server named snake available.
Below are examples on running a snake server and clients with TLS enabled (the server only works with the OpenSSL backend for now). Below are examples on running a snake server and clients with TLS enabled (the server only works with the OpenSSL and the Mbed TLS backend for now).
First, generate certificates. First, generate certificates.
@ -366,4 +366,4 @@ $ ws cobra_publish --endpoint wss://127.0.0.1:8765 --appkey FC2F10139A2BAc53BB72
[2019-12-19 20:46:42.659] [info] Published message id 3 acked [2019-12-19 20:46:42.659] [info] Published message id 3 acked
``` ```
To use OpenSSL on macOS, compile with `make ws_openssl`. First you will have to install OpenSSL libraries, which can be done with Homebrew. To use OpenSSL on macOS, compile with `make ws_openssl`. First you will have to install OpenSSL libraries, which can be done with Homebrew. Use `make ws_mbedtls` accordingly to use MbedTLS.

View File

@ -24,6 +24,7 @@ namespace ix
PublishTrackerCallback CobraConnection::_publishTrackerCallback = nullptr; PublishTrackerCallback CobraConnection::_publishTrackerCallback = nullptr;
constexpr size_t CobraConnection::kQueueMaxSize; constexpr size_t CobraConnection::kQueueMaxSize;
constexpr CobraConnection::MsgId CobraConnection::kInvalidMsgId; constexpr CobraConnection::MsgId CobraConnection::kInvalidMsgId;
constexpr int CobraConnection::kPingIntervalSecs;
CobraConnection::CobraConnection() : CobraConnection::CobraConnection() :
_webSocket(new WebSocket()), _webSocket(new WebSocket()),
@ -228,6 +229,10 @@ namespace ix
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
invokeErrorCallback(ss.str(), std::string()); invokeErrorCallback(ss.str(), std::string());
} }
else if (msg->type == ix::WebSocketMessageType::Pong)
{
invokeEventCallback(ix::CobraConnection_EventType_Pong);
}
}); });
} }
@ -260,6 +265,7 @@ namespace ix
_webSocket->setUrl(url); _webSocket->setUrl(url);
_webSocket->setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions); _webSocket->setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions);
_webSocket->setTLSOptions(socketTLSOptions); _webSocket->setTLSOptions(socketTLSOptions);
_webSocket->setPingInterval(kPingIntervalSecs);
} }
// //

View File

@ -30,7 +30,8 @@ namespace ix
CobraConnection_EventType_Closed = 3, CobraConnection_EventType_Closed = 3,
CobraConnection_EventType_Subscribed = 4, CobraConnection_EventType_Subscribed = 4,
CobraConnection_EventType_UnSubscribed = 5, CobraConnection_EventType_UnSubscribed = 5,
CobraConnection_EventType_Published = 6 CobraConnection_EventType_Published = 6,
CobraConnection_EventType_Pong = 7
}; };
enum CobraConnectionPublishMode enum CobraConnectionPublishMode
@ -215,6 +216,9 @@ namespace ix
// Each pdu sent should have an incremental unique id // Each pdu sent should have an incremental unique id
std::atomic<uint64_t> _id; std::atomic<uint64_t> _id;
// Frequency at which we send a websocket ping to the backing cobra connection
static constexpr int kPingIntervalSecs = 30;
}; };
} // namespace ix } // namespace ix

View File

@ -65,6 +65,10 @@ namespace ix
{ {
ss << "Published message " << msgId << " acked"; ss << "Published message " << msgId << " acked";
} }
else if (eventType == ix::CobraConnection_EventType_Pong)
{
ss << "Received websocket pong";
}
ix::IXCoreLogger::Log(ss.str().c_str()); ix::IXCoreLogger::Log(ss.str().c_str());
}); });
@ -98,6 +102,8 @@ namespace ix
{ {
_channel = channel; _channel = channel;
ix::IXCoreLogger::Log(socketTLSOptions.getDescription().c_str());
ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(enablePerMessageDeflate); ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(enablePerMessageDeflate);
_cobra_connection.configure(appkey, endpoint, _cobra_connection.configure(appkey, endpoint,
rolename, rolesecret, rolename, rolesecret,

View File

@ -10,7 +10,6 @@
#include "IXSocketConnect.h" #include "IXSocketConnect.h"
#include "IXUserAgent.h" #include "IXUserAgent.h"
#include <fstream> #include <fstream>
#include <iostream>
#include <sstream> #include <sstream>
#include <vector> #include <vector>

View File

@ -24,9 +24,47 @@
#include <Security/SecureTransport.h> #include <Security/SecureTransport.h>
namespace namespace ix
{ {
OSStatus read_from_socket(SSLConnectionRef connection, void* data, size_t* len) SocketAppleSSL::SocketAppleSSL(const SocketTLSOptions& tlsOptions, int fd)
: Socket(fd)
, _sslContext(nullptr)
, _tlsOptions(tlsOptions)
{
;
}
SocketAppleSSL::~SocketAppleSSL()
{
SocketAppleSSL::close();
}
std::string SocketAppleSSL::getSSLErrorDescription(OSStatus status)
{
std::string errMsg("Unknown SSL error.");
CFErrorRef error = CFErrorCreate(kCFAllocatorDefault, kCFErrorDomainOSStatus, status, NULL);
if (error)
{
CFStringRef message = CFErrorCopyDescription(error);
if (message)
{
char localBuffer[128];
Boolean success;
success = CFStringGetCString(message, localBuffer, 128, kCFStringEncodingUTF8);
if (success)
{
errMsg = localBuffer;
}
CFRelease(message);
}
CFRelease(error);
}
return errMsg;
}
OSStatus SocketAppleSSL::readFromSocket(SSLConnectionRef connection, void* data, size_t* len)
{ {
int fd = (int) (long) connection; int fd = (int) (long) connection;
if (fd < 0) return errSSLInternal; if (fd < 0) return errSSLInternal;
@ -67,7 +105,7 @@ namespace
} }
} }
OSStatus write_to_socket(SSLConnectionRef connection, const void* data, size_t* len) OSStatus SocketAppleSSL::writeToSocket(SSLConnectionRef connection, const void* data, size_t* len)
{ {
int fd = (int) (long) connection; int fd = (int) (long) connection;
if (fd < 0) return errSSLInternal; if (fd < 0) return errSSLInternal;
@ -105,47 +143,6 @@ namespace
} }
} }
std::string getSSLErrorDescription(OSStatus status)
{
std::string errMsg("Unknown SSL error.");
CFErrorRef error = CFErrorCreate(kCFAllocatorDefault, kCFErrorDomainOSStatus, status, NULL);
if (error)
{
CFStringRef message = CFErrorCopyDescription(error);
if (message)
{
char localBuffer[128];
Boolean success;
success = CFStringGetCString(message, localBuffer, 128, kCFStringEncodingUTF8);
if (success)
{
errMsg = localBuffer;
}
CFRelease(message);
}
CFRelease(error);
}
return errMsg;
}
} // anonymous namespace
namespace ix
{
SocketAppleSSL::SocketAppleSSL(const SocketTLSOptions& tlsOptions, int fd)
: Socket(fd)
, _sslContext(nullptr)
, _tlsOptions(tlsOptions)
{
;
}
SocketAppleSSL::~SocketAppleSSL()
{
SocketAppleSSL::close();
}
bool SocketAppleSSL::accept(std::string& errMsg) bool SocketAppleSSL::accept(std::string& errMsg)
{ {
@ -168,7 +165,7 @@ namespace ix
_sslContext = SSLCreateContext(kCFAllocatorDefault, kSSLClientSide, kSSLStreamType); _sslContext = SSLCreateContext(kCFAllocatorDefault, kSSLClientSide, kSSLStreamType);
SSLSetIOFuncs(_sslContext, read_from_socket, write_to_socket); SSLSetIOFuncs(_sslContext, SocketAppleSSL::readFromSocket, SocketAppleSSL::writeToSocket);
SSLSetConnection(_sslContext, (SSLConnectionRef)(long) _sockfd); SSLSetConnection(_sslContext, (SSLConnectionRef)(long) _sockfd);
SSLSetProtocolVersionMin(_sslContext, kTLSProtocol12); SSLSetProtocolVersionMin(_sslContext, kTLSProtocol12);
SSLSetPeerDomainName(_sslContext, host.c_str(), host.size()); SSLSetPeerDomainName(_sslContext, host.c_str(), host.size());

View File

@ -34,6 +34,10 @@ namespace ix
virtual ssize_t recv(void* buffer, size_t length) final; virtual ssize_t recv(void* buffer, size_t length) final;
private: private:
static std::string getSSLErrorDescription(OSStatus status);
static OSStatus writeToSocket(SSLConnectionRef connection, const void* data, size_t* len);
static OSStatus readFromSocket(SSLConnectionRef connection, void* data, size_t* len);
SSLContextRef _sslContext; SSLContextRef _sslContext;
mutable std::mutex _mutex; // AppleSSL routines are not thread-safe mutable std::mutex _mutex; // AppleSSL routines are not thread-safe

View File

@ -71,11 +71,16 @@ namespace ix
if (_tlsOptions.hasCertAndKey()) if (_tlsOptions.hasCertAndKey())
{ {
if (mbedtls_x509_crt_parse_file(&_cacert, _tlsOptions.certFile.c_str()) < 0) if (mbedtls_x509_crt_parse_file(&_cert, _tlsOptions.certFile.c_str()) < 0)
{ {
errMsg = "Cannot parse cert file '" + _tlsOptions.certFile + "'"; errMsg = "Cannot parse cert file '" + _tlsOptions.certFile + "'";
return false; return false;
} }
if (mbedtls_pk_parse_keyfile(&_pkey, _tlsOptions.keyFile.c_str(), "") < 0)
{
errMsg = "Cannot parse key file '" + _tlsOptions.keyFile + "'";
return false;
}
} }
if (_tlsOptions.isPeerVerifyDisabled()) if (_tlsOptions.isPeerVerifyDisabled())
@ -84,7 +89,7 @@ namespace ix
} }
else else
{ {
mbedtls_ssl_conf_ca_chain(&_conf, &_cacert, NULL); mbedtls_ssl_conf_authmode(&_conf, MBEDTLS_SSL_VERIFY_REQUIRED);
// FIXME: should we call mbedtls_ssl_conf_verify ? // FIXME: should we call mbedtls_ssl_conf_verify ?
@ -97,7 +102,13 @@ namespace ix
errMsg = "Cannot parse CA file '" + _tlsOptions.caFile + "'"; errMsg = "Cannot parse CA file '" + _tlsOptions.caFile + "'";
return false; return false;
} }
mbedtls_ssl_conf_authmode(&_conf, MBEDTLS_SSL_VERIFY_REQUIRED);
mbedtls_ssl_conf_ca_chain(&_conf, &_cacert, NULL);
if (_tlsOptions.hasCertAndKey())
{
mbedtls_ssl_conf_own_cert(&_conf, &_cert, &_pkey);
}
} }
if (mbedtls_ssl_setup(&_ssl, &_conf) != 0) if (mbedtls_ssl_setup(&_ssl, &_conf) != 0)

View File

@ -45,6 +45,7 @@ namespace ix
mbedtls_ctr_drbg_context _ctr_drbg; mbedtls_ctr_drbg_context _ctr_drbg;
mbedtls_x509_crt _cacert; mbedtls_x509_crt _cacert;
mbedtls_x509_crt _cert; mbedtls_x509_crt _cert;
mbedtls_pk_context _pkey;
std::mutex _mutex; std::mutex _mutex;
SocketTLSOptions _tlsOptions; SocketTLSOptions _tlsOptions;

View File

@ -11,7 +11,7 @@
#include "IXSocketConnect.h" #include "IXSocketConnect.h"
#include "IXSocketFactory.h" #include "IXSocketFactory.h"
#include <assert.h> #include <assert.h>
#include <iostream> #include <stdio.h>
#include <sstream> #include <sstream>
#include <string.h> #include <string.h>
@ -45,13 +45,13 @@ namespace ix
void SocketServer::logError(const std::string& str) void SocketServer::logError(const std::string& str)
{ {
std::lock_guard<std::mutex> lock(_logMutex); std::lock_guard<std::mutex> lock(_logMutex);
std::cerr << str << std::endl; fprintf(stderr, "%s\n", str.c_str());
} }
void SocketServer::logInfo(const std::string& str) void SocketServer::logInfo(const std::string& str)
{ {
std::lock_guard<std::mutex> lock(_logMutex); std::lock_guard<std::mutex> lock(_logMutex);
std::cout << str << std::endl; fprintf(stdout, "%s\n", str.c_str());
} }
std::pair<bool, std::string> SocketServer::listen() std::pair<bool, std::string> SocketServer::listen()

View File

@ -8,6 +8,7 @@
#include <assert.h> #include <assert.h>
#include <fstream> #include <fstream>
#include <sstream>
namespace ix namespace ix
{ {
@ -71,4 +72,16 @@ namespace ix
{ {
return _errMsg; return _errMsg;
} }
std::string SocketTLSOptions::getDescription() const
{
std::stringstream ss;
ss << "TLS Options:" << std::endl;
ss << " certFile = " << certFile << std::endl;
ss << " keyFile = " << keyFile << std::endl;
ss << " caFile = " << caFile << std::endl;
ss << " ciphers = " << ciphers << std::endl;
ss << " ciphers = " << ciphers << std::endl;
return ss.str();
}
} // namespace ix } // namespace ix

View File

@ -43,6 +43,8 @@ namespace ix
const std::string& getErrorMsg() const; const std::string& getErrorMsg() const;
std::string getDescription() const;
private: private:
mutable std::string _errMsg; mutable std::string _errMsg;
mutable bool _validated = false; mutable bool _validated = false;

View File

@ -144,7 +144,9 @@ namespace ix
if (!UrlParser::parse(url, protocol, host, path, query, port)) if (!UrlParser::parse(url, protocol, host, path, query, port))
{ {
return WebSocketInitResult(false, 0, std::string("Could not parse URL ") + url); std::stringstream ss;
ss << "Could not parse url: '" << url << "'";
return WebSocketInitResult(false, 0, ss.str());
} }
std::string errorMsg; std::string errorMsg;

View File

@ -6,4 +6,4 @@
#pragma once #pragma once
#define IX_WEBSOCKET_VERSION "7.5.8" #define IX_WEBSOCKET_VERSION "7.8.2"

View File

@ -23,6 +23,8 @@ include_directories(
../ws ../ws
) )
add_definitions(-DSPDLOG_COMPILED_LIB=1)
find_package(JsonCpp) find_package(JsonCpp)
if (NOT JSONCPP_FOUND) if (NOT JSONCPP_FOUND)
include_directories(../third_party/jsoncpp) include_directories(../third_party/jsoncpp)
@ -98,4 +100,6 @@ target_link_libraries(ixwebsocket_unittest ixcrypto)
target_link_libraries(ixwebsocket_unittest ixcore) target_link_libraries(ixwebsocket_unittest ixcore)
target_link_libraries(ixwebsocket_unittest ixsentry) target_link_libraries(ixwebsocket_unittest ixsentry)
target_link_libraries(ixwebsocket_unittest spdlog)
install(TARGETS ixwebsocket_unittest DESTINATION bin) install(TARGETS ixwebsocket_unittest DESTINATION bin)

View File

@ -25,6 +25,8 @@ include_directories(ws ../third_party/statsd-client-cpp/src)
include_directories(ws ../third_party/spdlog/include) include_directories(ws ../third_party/spdlog/include)
include_directories(ws ../third_party/cpp-linenoise) include_directories(ws ../third_party/cpp-linenoise)
add_definitions(-DSPDLOG_COMPILED_LIB=1)
if (UNIX) if (UNIX)
set( STATSD_CLIENT_SOURCES ../third_party/statsd-client-cpp/src/statsd_client.cpp) set( STATSD_CLIENT_SOURCES ../third_party/statsd-client-cpp/src/statsd_client.cpp)
endif() endif()
@ -72,6 +74,8 @@ target_link_libraries(ws ixcrypto)
target_link_libraries(ws ixcore) target_link_libraries(ws ixcore)
target_link_libraries(ws ixsentry) target_link_libraries(ws ixsentry)
target_link_libraries(ws spdlog)
if(NOT APPLE AND NOT USE_MBED_TLS) if(NOT APPLE AND NOT USE_MBED_TLS)
find_package(OpenSSL REQUIRED) find_package(OpenSSL REQUIRED)
add_definitions(${OPENSSL_DEFINITIONS}) add_definitions(${OPENSSL_DEFINITIONS})

View File

@ -11,7 +11,6 @@
#include <cli11/CLI11.hpp> #include <cli11/CLI11.hpp>
#include <fstream> #include <fstream>
#include <iostream>
#include <ixcore/utils/IXCoreLogger.h> #include <ixcore/utils/IXCoreLogger.h>
#include <ixwebsocket/IXNetSystem.h> #include <ixwebsocket/IXNetSystem.h>
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
@ -39,12 +38,13 @@ int main(int argc, char** argv)
// Display command. // Display command.
if (getenv("DEBUG")) if (getenv("DEBUG"))
{ {
std::cout << "Command: "; std::stringstream ss;
ss << "Command: ";
for (int i = 0; i < argc; ++i) for (int i = 0; i < argc; ++i)
{ {
std::cout << argv[i] << " "; ss << argv[i] << " ";
} }
std::cout << std::endl; spdlog::info(ss.str());
} }
CLI::App app {"ws is a websocket tool"}; CLI::App app {"ws is a websocket tool"};
@ -105,6 +105,7 @@ int main(int argc, char** argv)
int count = 1; int count = 1;
int jobs = 4; int jobs = 4;
uint32_t maxWaitBetweenReconnectionRetries; uint32_t maxWaitBetweenReconnectionRetries;
size_t maxQueueSize = 100;
auto addTLSOptions = [&tlsOptions, &verifyNone](CLI::App* app) { auto addTLSOptions = [&tlsOptions, &verifyNone](CLI::App* app) {
app->add_option( app->add_option(
@ -268,6 +269,7 @@ int main(int argc, char** argv)
cobra2sentry->add_option("--rolesecret", rolesecret, "Role secret")->required(); cobra2sentry->add_option("--rolesecret", rolesecret, "Role secret")->required();
cobra2sentry->add_option("--dsn", dsn, "Sentry DSN"); cobra2sentry->add_option("--dsn", dsn, "Sentry DSN");
cobra2sentry->add_option("--jobs", jobs, "Number of thread sending events to Sentry"); cobra2sentry->add_option("--jobs", jobs, "Number of thread sending events to Sentry");
cobra2sentry->add_option("--queue_size", maxQueueSize, "Size of the queue to hold messages before they are sent to Sentry");
cobra2sentry->add_option("channel", channel, "Channel")->required(); cobra2sentry->add_option("channel", channel, "Channel")->required();
cobra2sentry->add_flag("-v", verbose, "Verbose"); cobra2sentry->add_flag("-v", verbose, "Verbose");
cobra2sentry->add_flag("-s", strict, "Strict mode. Error out when sending to sentry fails"); cobra2sentry->add_flag("-s", strict, "Strict mode. Error out when sending to sentry fails");
@ -455,6 +457,7 @@ int main(int argc, char** argv)
verbose, verbose,
strict, strict,
jobs, jobs,
maxQueueSize,
tlsOptions); tlsOptions);
} }
else if (app.got_subcommand("cobra_metrics_to_redis")) else if (app.got_subcommand("cobra_metrics_to_redis"))
@ -496,11 +499,11 @@ int main(int argc, char** argv)
} }
else if (version) else if (version)
{ {
std::cout << "ws " << ix::userAgent() << std::endl; spdlog::info("ws {}", ix::userAgent());
} }
else else
{ {
std::cerr << "A subcommand or --version is required" << std::endl; spdlog::error("A subcommand or --version is required");
} }
ix::uninitNetSystem(); ix::uninitNetSystem();

View File

@ -119,6 +119,7 @@ namespace ix
bool verbose, bool verbose,
bool strict, bool strict,
int jobs, int jobs,
size_t maxQueueSize,
const ix::SocketTLSOptions& tlsOptions); const ix::SocketTLSOptions& tlsOptions);
int ws_cobra_metrics_to_redis(const std::string& appkey, int ws_cobra_metrics_to_redis(const std::string& appkey,

View File

@ -32,7 +32,6 @@
#include <atomic> #include <atomic>
#include <condition_variable> #include <condition_variable>
#include <iostream>
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <mutex> #include <mutex>
@ -91,7 +90,7 @@ namespace ix
{ {
if (!_quiet) if (!_quiet)
{ {
std::cerr << msg; spdlog::info(msg);
} }
} }
@ -183,7 +182,7 @@ namespace ix
webSocket.setOnMessageCallback([&condition, &success](const ix::WebSocketMessagePtr& msg) { webSocket.setOnMessageCallback([&condition, &success](const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Close) if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "Report generated" << std::endl; spdlog::info("Report generated");
condition.notify_one(); condition.notify_one();
} }
else if (msg->type == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
@ -193,7 +192,7 @@ namespace ix
ss << "#retries: " << msg->errorInfo.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str() << std::endl; spdlog::info(ss.str());
success = false; success = false;
} }
@ -236,7 +235,7 @@ namespace ix
ss << "#retries: " << msg->errorInfo.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str() << std::endl; spdlog::info(ss.str());
condition.notify_one(); condition.notify_one();
} }
@ -269,7 +268,7 @@ namespace ix
int ws_autobahn_main(const std::string& url, bool quiet) int ws_autobahn_main(const std::string& url, bool quiet)
{ {
int testCasesCount = getTestCaseCount(url); int testCasesCount = getTestCaseCount(url);
std::cerr << "Test cases count: " << testCasesCount << std::endl; spdlog::info("Test cases count: {}", testCasesCount);
if (testCasesCount == -1) if (testCasesCount == -1)
{ {

View File

@ -4,9 +4,10 @@
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved. * Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/ */
#include <iostream>
#include <ixwebsocket/IXWebSocketServer.h> #include <ixwebsocket/IXWebSocketServer.h>
#include <sstream> #include <sstream>
#include <spdlog/spdlog.h>
namespace ix namespace ix
{ {
@ -14,7 +15,7 @@ namespace ix
const std::string& hostname, const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions) const ix::SocketTLSOptions& tlsOptions)
{ {
std::cout << "Listening on " << hostname << ":" << port << std::endl; spdlog::info("Listening on {}:{}", hostname, port);
ix::WebSocketServer server(port, hostname); ix::WebSocketServer server(port, hostname);
server.setTLSOptions(tlsOptions); server.setTLSOptions(tlsOptions);
@ -25,20 +26,19 @@ namespace ix
const WebSocketMessagePtr& msg) { const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "New connection" << std::endl; spdlog::info("New connection");
std::cerr << "id: " << connectionState->getId() << std::endl; spdlog::info("id: {}", connectionState->getId());
std::cerr << "Uri: " << msg->openInfo.uri << std::endl; spdlog::info("Uri: {}", msg->openInfo.uri);
std::cerr << "Headers:" << std::endl; spdlog::info("Headers:");
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "Closed connection" spdlog::info("Closed connection: code {} reason {}",
<< " code " << msg->closeInfo.code << " reason " msg->closeInfo.code, msg->closeInfo.reason);
<< msg->closeInfo.reason << std::endl;
} }
else if (msg->type == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
@ -47,30 +47,29 @@ namespace ix
ss << "#retries: " << msg->errorInfo.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str(); spdlog::info(ss.str());
} }
else if (msg->type == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
std::cerr << "Received message fragment" << std::endl; spdlog::info("Received message fragment");
} }
else if (msg->type == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl; spdlog::info("Received {} bytes", msg->wireSize);
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client != webSocket)
{ {
client->send(msg->str, msg->binary, [](int current, int total) -> bool { client->send(msg->str, msg->binary, [](int current, int total) -> bool {
std::cerr << "Step " << current << " out of " << total << std::endl; spdlog::info("Step {} out of {}", current, total);
return true; return true;
}); });
do do
{ {
size_t bufferedAmount = client->bufferedAmount(); size_t bufferedAmount = client->bufferedAmount();
std::cerr << bufferedAmount << " bytes left to be sent" spdlog::info("{} bytes left to be sent", bufferedAmount);
<< std::endl;
std::chrono::duration<double, std::milli> duration(10); std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
@ -84,7 +83,7 @@ namespace ix
auto res = server.listen(); auto res = server.listen();
if (!res.first) if (!res.first)
{ {
std::cerr << res.second << std::endl; spdlog::info(res.second);
return 1; return 1;
} }

View File

@ -9,12 +9,13 @@
// Broadcast server can be ran with `ws broadcast_server` // Broadcast server can be ran with `ws broadcast_server`
// //
#include "linenoise.hpp"
#include "nlohmann/json.hpp" #include "nlohmann/json.hpp"
#include <iostream>
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <queue> #include <queue>
#include <sstream> #include <sstream>
#include <spdlog/spdlog.h>
// for convenience // for convenience
using json = nlohmann::json; using json = nlohmann::json;
@ -55,7 +56,7 @@ namespace ix
void WebSocketChat::log(const std::string& msg) void WebSocketChat::log(const std::string& msg)
{ {
std::cout << msg << std::endl; spdlog::info(msg);
} }
size_t WebSocketChat::getReceivedMessagesCount() const size_t WebSocketChat::getReceivedMessagesCount() const
@ -85,20 +86,21 @@ namespace ix
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
log("ws chat: connected"); log("ws chat: connected");
std::cout << "Uri: " << msg->openInfo.uri << std::endl; spdlog::info("Uri: {}", msg->openInfo.uri);
std::cout << "Handshake Headers:" << std::endl; spdlog::info("Headers:");
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
ss << "ws chat: user " << _user << " Connected !"; spdlog::info("ws chat: user {} connected !", _user);
log(ss.str()); log(ss.str());
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
ss << "ws chat: user " << _user << " disconnected !" ss << "ws chat user disconnected: " << _user;
<< " code " << msg->closeInfo.code << " reason " << msg->closeInfo.reason; ss << " code " << msg->closeInfo.code;
ss << " reason " << msg->closeInfo.reason << std::endl;
log(ss.str()); log(ss.str());
} }
else if (msg->type == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
@ -162,25 +164,25 @@ namespace ix
int ws_chat_main(const std::string& url, const std::string& user) int ws_chat_main(const std::string& url, const std::string& user)
{ {
std::cout << "Type Ctrl-D to exit prompt..." << std::endl; spdlog::info("Type Ctrl-D to exit prompt...");
WebSocketChat webSocketChat(url, user); WebSocketChat webSocketChat(url, user);
webSocketChat.start(); webSocketChat.start();
while (true) while (true)
{ {
std::string text; // Read line
std::cout << user << " > " << std::flush; std::string line;
std::getline(std::cin, text); auto quit = linenoise::Readline("> ", line);
if (!std::cin) if (quit)
{ {
break; break;
} }
webSocketChat.sendMessage(text); webSocketChat.sendMessage(line);
} }
std::cout << std::endl; spdlog::info("");
webSocketChat.stop(); webSocketChat.stop();
return 0; return 0;

View File

@ -7,7 +7,6 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <fstream> #include <fstream>
#include <iostream>
#include <ixcobra/IXCobraMetricsPublisher.h> #include <ixcobra/IXCobraMetricsPublisher.h>
#include <jsoncpp/json/json.h> #include <jsoncpp/json/json.h>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>

View File

@ -7,7 +7,6 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <condition_variable> #include <condition_variable>
#include <iostream>
#include <ixcobra/IXCobraConnection.h> #include <ixcobra/IXCobraConnection.h>
#include <ixsnake/IXRedisClient.h> #include <ixsnake/IXRedisClient.h>
#include <mutex> #include <mutex>
@ -44,8 +43,7 @@ namespace ix
auto timer = [&msgPerSeconds, &msgCount] { auto timer = [&msgPerSeconds, &msgCount] {
while (true) while (true)
{ {
std::cout << "#messages " << msgCount << " " spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds);
<< "msg/s " << msgPerSeconds << std::endl;
msgPerSeconds = 0; msgPerSeconds = 0;
auto duration = std::chrono::seconds(1); auto duration = std::chrono::seconds(1);

View File

@ -7,7 +7,6 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <fstream> #include <fstream>
#include <iostream>
#include <ixcobra/IXCobraMetricsPublisher.h> #include <ixcobra/IXCobraMetricsPublisher.h>
#include <jsoncpp/json/json.h> #include <jsoncpp/json/json.h>
#include <mutex> #include <mutex>
@ -43,7 +42,6 @@ namespace ix
rolesecret, rolesecret,
ix::WebSocketPerMessageDeflateOptions(true), ix::WebSocketPerMessageDeflateOptions(true),
tlsOptions); tlsOptions);
conn.connect();
// Display incoming messages // Display incoming messages
std::atomic<bool> authenticated(false); std::atomic<bool> authenticated(false);
@ -92,8 +90,14 @@ namespace ix
spdlog::info("Published message id {} acked", msgId); spdlog::info("Published message id {} acked", msgId);
messageAcked = true; messageAcked = true;
} }
else if (eventType == ix::CobraConnection_EventType_Pong)
{
spdlog::info("Received websocket pong");
}
}); });
conn.connect();
while (!authenticated) while (!authenticated)
; ;
while (!messageAcked) while (!messageAcked)

View File

@ -6,7 +6,6 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <iostream>
#include <ixcobra/IXCobraConnection.h> #include <ixcobra/IXCobraConnection.h>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
@ -41,8 +40,7 @@ namespace ix
auto timer = [&msgPerSeconds, &msgCount] { auto timer = [&msgPerSeconds, &msgCount] {
while (true) while (true)
{ {
std::cout << "#messages " << msgCount << " " spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds);
<< "msg/s " << msgPerSeconds << std::endl;
msgPerSeconds = 0; msgPerSeconds = 0;
auto duration = std::chrono::seconds(1); auto duration = std::chrono::seconds(1);
@ -77,7 +75,7 @@ namespace ix
[&jsonWriter, &quiet, &msgPerSeconds, &msgCount](const Json::Value& msg) { [&jsonWriter, &quiet, &msgPerSeconds, &msgCount](const Json::Value& msg) {
if (!quiet) if (!quiet)
{ {
std::cerr << jsonWriter.write(msg) << std::endl; spdlog::info(jsonWriter.write(msg));
} }
msgPerSeconds++; msgPerSeconds++;
@ -100,6 +98,10 @@ namespace ix
{ {
spdlog::error("Published message hacked: {}", msgId); spdlog::error("Published message hacked: {}", msgId);
} }
else if (eventType == ix::CobraConnection_EventType_Pong)
{
spdlog::info("Received websocket pong");
}
}); });
while (true) while (true)

View File

@ -7,7 +7,6 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <condition_variable> #include <condition_variable>
#include <iostream>
#include <ixcobra/IXCobraConnection.h> #include <ixcobra/IXCobraConnection.h>
#include <ixsentry/IXSentryClient.h> #include <ixsentry/IXSentryClient.h>
#include <mutex> #include <mutex>
@ -29,6 +28,7 @@ namespace ix
bool verbose, bool verbose,
bool strict, bool strict,
int jobs, int jobs,
size_t maxQueueSize,
const ix::SocketTLSOptions& tlsOptions) const ix::SocketTLSOptions& tlsOptions)
{ {
ix::CobraConnection conn; ix::CobraConnection conn;
@ -161,7 +161,7 @@ namespace ix
}; };
// Create a thread pool // Create a thread pool
std::cerr << "Starting " << jobs << " sentry sender jobs" << std::endl; spdlog::info("Starting {} sentry sender jobs", jobs);
std::vector<std::thread> pool; std::vector<std::thread> pool;
for (int i = 0; i < jobs; i++) for (int i = 0; i < jobs; i++)
{ {
@ -177,6 +177,7 @@ namespace ix
&receivedCount, &receivedCount,
&condition, &condition,
&conditionVariableMutex, &conditionVariableMutex,
&maxQueueSize,
&queue](ix::CobraConnectionEventType eventType, &queue](ix::CobraConnectionEventType eventType,
const std::string& errMsg, const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers, const ix::WebSocketHttpHeaders& headers,
@ -197,7 +198,7 @@ namespace ix
} }
else if (eventType == ix::CobraConnection_EventType_Authenticated) else if (eventType == ix::CobraConnection_EventType_Authenticated)
{ {
std::cerr << "Subscriber authenticated" << std::endl; spdlog::info("Subscriber authenticated");
conn.subscribe(channel, conn.subscribe(channel,
filter, filter,
[&jsonWriter, [&jsonWriter,
@ -206,6 +207,7 @@ namespace ix
&receivedCount, &receivedCount,
&condition, &condition,
&conditionVariableMutex, &conditionVariableMutex,
&maxQueueSize,
&queue](const Json::Value& msg) { &queue](const Json::Value& msg) {
if (verbose) if (verbose)
{ {
@ -223,7 +225,12 @@ namespace ix
{ {
std::unique_lock<std::mutex> lock(conditionVariableMutex); std::unique_lock<std::mutex> lock(conditionVariableMutex);
queue.push(msg); // if the sending is not fast enough there is no point
// in queuing too many events.
if (queue.size() < maxQueueSize)
{
queue.push(msg);
}
} }
condition.notify_one(); condition.notify_one();
@ -245,6 +252,10 @@ namespace ix
{ {
spdlog::error("Published message hacked: {}", msgId); spdlog::error("Published message hacked: {}", msgId);
} }
else if (eventType == ix::CobraConnection_EventType_Pong)
{
spdlog::info("Received websocket pong");
}
}); });
while (true) while (true)

View File

@ -6,7 +6,6 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <iostream>
#include <ixcobra/IXCobraConnection.h> #include <ixcobra/IXCobraConnection.h>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
@ -160,6 +159,10 @@ namespace ix
{ {
spdlog::error("Published message hacked: {}", msgId); spdlog::error("Published message hacked: {}", msgId);
} }
else if (eventType == ix::CobraConnection_EventType_Pong)
{
spdlog::info("Received websocket pong");
}
}); });
while (true) while (true)

View File

@ -5,10 +5,10 @@
*/ */
#include "linenoise.hpp" #include "linenoise.hpp"
#include <iostream>
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXSocketTLSOptions.h> #include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
@ -93,7 +93,7 @@ namespace ix
auto key = token.substr(0, pos); auto key = token.substr(0, pos);
auto val = token.substr(pos + 1); auto val = token.substr(pos + 1);
std::cerr << key << ": " << val << std::endl; spdlog::info("{}: {}", key, val);
headers[key] = val; headers[key] = val;
} }
@ -129,11 +129,11 @@ namespace ix
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
log("ws_connect: connected"); log("ws_connect: connected");
std::cout << "Uri: " << msg->openInfo.uri << std::endl; spdlog::info("Uri: {}", msg->openInfo.uri);
std::cout << "Handshake Headers:" << std::endl; spdlog::info("Headers:");
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
@ -145,7 +145,7 @@ namespace ix
} }
else if (msg->type == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl; spdlog::info("Received {} bytes", msg->wireSize);
ss << "ws_connect: received message: " << msg->str; ss << "ws_connect: received message: " << msg->str;
log(ss.str()); log(ss.str());
@ -160,15 +160,15 @@ namespace ix
} }
else if (msg->type == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
std::cerr << "Received message fragment" << std::endl; spdlog::info("Received message fragment");
} }
else if (msg->type == ix::WebSocketMessageType::Ping) else if (msg->type == ix::WebSocketMessageType::Ping)
{ {
std::cerr << "Received ping" << std::endl; spdlog::info("Received ping");
} }
else if (msg->type == ix::WebSocketMessageType::Pong) else if (msg->type == ix::WebSocketMessageType::Pong)
{ {
std::cerr << "Received pong" << std::endl; spdlog::info("Received pong");
} }
else else
{ {
@ -225,14 +225,14 @@ namespace ix
if (line == "/stop") if (line == "/stop")
{ {
std::cout << "Stopping connection..." << std::endl; spdlog::info("Stopping connection...");
webSocketChat.stop(); webSocketChat.stop();
continue; continue;
} }
if (line == "/start") if (line == "/start")
{ {
std::cout << "Starting connection..." << std::endl; spdlog::info("Starting connection...");
webSocketChat.start(); webSocketChat.start();
continue; continue;
} }
@ -243,7 +243,7 @@ namespace ix
linenoise::AddHistory(line.c_str()); linenoise::AddHistory(line.c_str());
} }
std::cout << std::endl; spdlog::info("");
webSocketChat.stop(); webSocketChat.stop();
return 0; return 0;

View File

@ -4,8 +4,8 @@
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved. * Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/ */
#include <iostream>
#include <ixwebsocket/IXWebSocketServer.h> #include <ixwebsocket/IXWebSocketServer.h>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
namespace ix namespace ix
@ -15,7 +15,7 @@ namespace ix
const std::string& hostname, const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions) const ix::SocketTLSOptions& tlsOptions)
{ {
std::cout << "Listening on " << hostname << ":" << port << std::endl; spdlog::info("Listening on {}:{}", hostname, port);
ix::WebSocketServer server(port, hostname); ix::WebSocketServer server(port, hostname);
server.setTLSOptions(tlsOptions); server.setTLSOptions(tlsOptions);
@ -27,13 +27,13 @@ namespace ix
[webSocket, connectionState, greetings](const WebSocketMessagePtr& msg) { [webSocket, connectionState, greetings](const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "New connection" << std::endl; spdlog::info("New connection");
std::cerr << "id: " << connectionState->getId() << std::endl; spdlog::info("id: {}", connectionState->getId());
std::cerr << "Uri: " << msg->openInfo.uri << std::endl; spdlog::info("Uri: {}", msg->openInfo.uri);
std::cerr << "Headers:" << std::endl; spdlog::info("Headers:");
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
if (greetings) if (greetings)
@ -43,22 +43,21 @@ namespace ix
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "Closed connection" spdlog::info("Closed connection: client id {} code {} reason {}",
<< " code " << msg->closeInfo.code << " reason " connectionState->getId(),
<< msg->closeInfo.reason << std::endl; msg->closeInfo.code,
msg->closeInfo.reason);
} }
else if (msg->type == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
std::stringstream ss; spdlog::error("Connection error: {}", msg->errorInfo.reason);
ss << "Connection error: " << msg->errorInfo.reason << std::endl; spdlog::error("#retries: {}", msg->errorInfo.retries);
ss << "#retries: " << msg->errorInfo.retries << std::endl; spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time);
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; spdlog::error("HTTP Status: {}", msg->errorInfo.http_status);
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str();
} }
else if (msg->type == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl; spdlog::info("Received {} bytes", msg->wireSize);
webSocket->send(msg->str, msg->binary); webSocket->send(msg->str, msg->binary);
} }
}); });
@ -67,7 +66,7 @@ namespace ix
auto res = server.listen(); auto res = server.listen();
if (!res.first) if (!res.first)
{ {
std::cerr << res.second << std::endl; spdlog::error(res.second);
return 1; return 1;
} }

View File

@ -5,10 +5,10 @@
*/ */
#include <fstream> #include <fstream>
#include <iostream>
#include <ixwebsocket/IXHttpClient.h> #include <ixwebsocket/IXHttpClient.h>
#include <ixwebsocket/IXSocketTLSOptions.h> #include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocketHttpHeaders.h> #include <ixwebsocket/IXWebSocketHttpHeaders.h>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
namespace ix namespace ix
@ -47,7 +47,7 @@ namespace ix
auto key = token.substr(0, pos); auto key = token.substr(0, pos);
auto val = token.substr(pos + 1); auto val = token.substr(pos + 1);
std::cerr << key << ": " << val << std::endl; spdlog::info("{}: {}", key, val);
headers[key] = val; headers[key] = val;
} }
@ -76,7 +76,7 @@ namespace ix
auto key = token.substr(0, pos); auto key = token.substr(0, pos);
auto val = token.substr(pos + 1); auto val = token.substr(pos + 1);
std::cerr << key << ": " << val << std::endl; spdlog::info("{}: {}", key, val);
httpParameters[key] = val; httpParameters[key] = val;
} }
@ -108,10 +108,9 @@ namespace ix
args->maxRedirects = maxRedirects; args->maxRedirects = maxRedirects;
args->verbose = verbose; args->verbose = verbose;
args->compress = compress; args->compress = compress;
args->logger = [](const std::string& msg) { std::cout << msg; }; args->logger = [](const std::string& msg) { spdlog::info(msg); };
args->onProgressCallback = [](int current, int total) -> bool { args->onProgressCallback = [](int current, int total) -> bool {
std::cerr << "\r" spdlog::info("Downloaded {} bytes out of {}", current, total);
<< "Downloaded " << current << " bytes out of " << total;
return true; return true;
}; };
@ -131,20 +130,20 @@ namespace ix
response = httpClient.post(url, httpParameters, args); response = httpClient.post(url, httpParameters, args);
} }
std::cerr << std::endl; spdlog::info("");
for (auto it : response->headers) for (auto it : response->headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
std::cerr << "Upload size: " << response->uploadSize << std::endl; spdlog::info("Upload size: {}", response->uploadSize);
std::cerr << "Download size: " << response->downloadSize << std::endl; spdlog::info("Download size: {}", response->downloadSize);
std::cerr << "Status: " << response->statusCode << std::endl; spdlog::info("Status: {}", response->statusCode);
if (response->errorCode != HttpErrorCode::Ok) if (response->errorCode != HttpErrorCode::Ok)
{ {
std::cerr << "error message: " << response->errorMsg << std::endl; spdlog::info("error message: ", response->errorMsg);
} }
if (!headersOnly && response->errorCode == HttpErrorCode::Ok) if (!headersOnly && response->errorCode == HttpErrorCode::Ok)
@ -158,7 +157,7 @@ namespace ix
filename = output; filename = output;
} }
std::cout << "Writing to disk: " << filename << std::endl; spdlog::info("Writing to disk: {}", filename);
std::ofstream out(filename); std::ofstream out(filename);
out.write((char*) &response->payload.front(), response->payload.size()); out.write((char*) &response->payload.front(), response->payload.size());
out.close(); out.close();
@ -167,14 +166,13 @@ namespace ix
{ {
if (response->headers["Content-Type"] != "application/octet-stream") if (response->headers["Content-Type"] != "application/octet-stream")
{ {
std::cout << "payload: " << response->payload << std::endl; spdlog::info("payload: {}", response->payload);
} }
else else
{ {
std::cerr << "Binary output can mess up your terminal." << std::endl; spdlog::info("Binary output can mess up your terminal.");
std::cerr << "Use the -O flag to save the file to disk." << std::endl; spdlog::info("Use the -O flag to save the file to disk.");
std::cerr << "You can also use the --output option to specify a filename." spdlog::info("You can also use the --output option to specify a filename.");
<< std::endl;
} }
} }
} }

View File

@ -5,7 +5,6 @@
*/ */
#include <fstream> #include <fstream>
#include <iostream>
#include <ixwebsocket/IXHttpServer.h> #include <ixwebsocket/IXHttpServer.h>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
@ -32,7 +31,7 @@ namespace ix
auto res = server.listen(); auto res = server.listen();
if (!res.first) if (!res.first)
{ {
std::cerr << res.second << std::endl; spdlog::error(res.second);
return 1; return 1;
} }

View File

@ -4,11 +4,12 @@
* Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved. * Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved.
*/ */
#include <iostream>
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXSocketTLSOptions.h> #include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
#include <iostream>
namespace ix namespace ix
{ {
@ -40,7 +41,7 @@ namespace ix
void WebSocketPingPong::log(const std::string& msg) void WebSocketPingPong::log(const std::string& msg)
{ {
std::cout << msg << std::endl; spdlog::info(msg);
} }
void WebSocketPingPong::stop() void WebSocketPingPong::stop()
@ -56,18 +57,18 @@ namespace ix
log(std::string("Connecting to url: ") + _url); log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) { _webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) {
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl; spdlog::info("Received {} bytes", msg->wireSize);
std::stringstream ss; std::stringstream ss;
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
log("ping_pong: connected"); log("ping_pong: connected");
std::cout << "Uri: " << msg->openInfo.uri << std::endl; spdlog::info("Uri: {}", msg->openInfo.uri);
std::cout << "Handshake Headers:" << std::endl; spdlog::info("Headers:");
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
@ -127,7 +128,7 @@ namespace ix
int ws_ping_pong_main(const std::string& url, const ix::SocketTLSOptions& tlsOptions) int ws_ping_pong_main(const std::string& url, const ix::SocketTLSOptions& tlsOptions)
{ {
std::cout << "Type Ctrl-D to exit prompt..." << std::endl; spdlog::info("Type Ctrl-D to exit prompt...");
WebSocketPingPong webSocketPingPong(url, tlsOptions); WebSocketPingPong webSocketPingPong(url, tlsOptions);
webSocketPingPong.start(); webSocketPingPong.start();

View File

@ -4,8 +4,8 @@
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved. * Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/ */
#include <iostream>
#include <ixwebsocket/IXWebSocketServer.h> #include <ixwebsocket/IXWebSocketServer.h>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
namespace ix namespace ix
@ -44,7 +44,7 @@ namespace ix
const std::string& remoteUrl, const std::string& remoteUrl,
bool verbose) bool verbose)
{ {
std::cout << "Listening on " << hostname << ":" << port << std::endl; spdlog::info("Listening on {}:{}", hostname, port);
ix::WebSocketServer server(port, hostname); ix::WebSocketServer server(port, hostname);
server.setTLSOptions(tlsOptions); server.setTLSOptions(tlsOptions);
@ -61,41 +61,39 @@ namespace ix
// Server connection // Server connection
state->webSocket().setOnMessageCallback([webSocket, state, verbose]( state->webSocket().setOnMessageCallback([webSocket, state, verbose](
const WebSocketMessagePtr& msg) { const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "New connection" << std::endl; spdlog::info("New connection to remote server");
std::cerr << "server id: " << state->getId() << std::endl; spdlog::info("id: {}", state->getId());
std::cerr << "Uri: " << msg->openInfo.uri << std::endl; spdlog::info("Uri: {}", msg->openInfo.uri);
std::cerr << "Headers:" << std::endl; spdlog::info("Headers:");
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "Closed connection" spdlog::info("Closed remote server connection: client id {} code {} reason {}",
<< " code " << msg->closeInfo.code << " reason " state->getId(),
<< msg->closeInfo.reason << std::endl; msg->closeInfo.code,
webSocket->close(msg->closeInfo.code, msg->closeInfo.reason); msg->closeInfo.reason);
state->setTerminated(); state->setTerminated();
} }
else if (msg->type == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
std::stringstream ss; spdlog::error("Connection error: {}", msg->errorInfo.reason);
ss << "Connection error: " << msg->errorInfo.reason << std::endl; spdlog::error("#retries: {}", msg->errorInfo.retries);
ss << "#retries: " << msg->errorInfo.retries << std::endl; spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time);
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; spdlog::error("HTTP Status: {}", msg->errorInfo.http_status);
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str();
} }
else if (msg->type == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " << msg->wireSize << " bytes from server" << std::endl; spdlog::info("Received {} bytes from server", msg->wireSize);
if (verbose) if (verbose)
{ {
std::cerr << "payload " << msg->str << std::endl; spdlog::info("payload {}", msg->str);
} }
webSocket->send(msg->str, msg->binary); webSocket->send(msg->str, msg->binary);
@ -107,53 +105,54 @@ namespace ix
const WebSocketMessagePtr& msg) { const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "New connection" << std::endl; spdlog::info("New connection from client");
std::cerr << "client id: " << state->getId() << std::endl; spdlog::info("id: {}", state->getId());
std::cerr << "Uri: " << msg->openInfo.uri << std::endl; spdlog::info("Uri: {}", msg->openInfo.uri);
std::cerr << "Headers:" << std::endl; spdlog::info("Headers:");
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
// Connect to the 'real' server // Connect to the 'real' server
std::string url(remoteUrl); std::string url(remoteUrl);
url += msg->openInfo.uri; url += msg->openInfo.uri;
state->webSocket().setUrl(url); state->webSocket().setUrl(url);
state->webSocket().disableAutomaticReconnection();
state->webSocket().start(); state->webSocket().start();
// we should sleep here for a bit until we've established the // we should sleep here for a bit until we've established the
// connection with the remote server // connection with the remote server
while (state->webSocket().getReadyState() != ReadyState::Open) while (state->webSocket().getReadyState() != ReadyState::Open)
{ {
std::cerr << "waiting for server connection establishment" << std::endl; spdlog::info("waiting for server connection establishment");
std::this_thread::sleep_for(std::chrono::milliseconds(10)); std::this_thread::sleep_for(std::chrono::milliseconds(10));
} }
std::cerr << "server connection established" << std::endl; spdlog::info("server connection established");
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "Closed connection" spdlog::info("Closed client connection: client id {} code {} reason {}",
<< " code " << msg->closeInfo.code << " reason " state->getId(),
<< msg->closeInfo.reason << std::endl; msg->closeInfo.code,
msg->closeInfo.reason);
state->webSocket().close(msg->closeInfo.code, msg->closeInfo.reason); state->webSocket().close(msg->closeInfo.code, msg->closeInfo.reason);
} }
else if (msg->type == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
std::stringstream ss; spdlog::error("Connection error: {}", msg->errorInfo.reason);
ss << "Connection error: " << msg->errorInfo.reason << std::endl; spdlog::error("#retries: {}", msg->errorInfo.retries);
ss << "#retries: " << msg->errorInfo.retries << std::endl; spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time);
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; spdlog::error("HTTP Status: {}", msg->errorInfo.http_status);
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str();
} }
else if (msg->type == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " << msg->wireSize << " bytes from client" << std::endl; spdlog::info("Received {} bytes from client", msg->wireSize);
if (verbose) if (verbose)
{ {
std::cerr << "payload " << msg->str << std::endl; spdlog::info("payload {}", msg->str);
} }
state->webSocket().send(msg->str, msg->binary); state->webSocket().send(msg->str, msg->binary);
} }
}); });
@ -162,7 +161,7 @@ namespace ix
auto res = server.listen(); auto res = server.listen();
if (!res.first) if (!res.first)
{ {
std::cerr << res.second << std::endl; spdlog::info(res.second);
return 1; return 1;
} }

View File

@ -7,7 +7,6 @@
#include <chrono> #include <chrono>
#include <condition_variable> #include <condition_variable>
#include <fstream> #include <fstream>
#include <iostream>
#include <ixcrypto/IXBase64.h> #include <ixcrypto/IXBase64.h>
#include <ixcrypto/IXHash.h> #include <ixcrypto/IXHash.h>
#include <ixcrypto/IXUuid.h> #include <ixcrypto/IXUuid.h>
@ -15,6 +14,7 @@
#include <ixwebsocket/IXSocketTLSOptions.h> #include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <msgpack11/msgpack11.hpp> #include <msgpack11/msgpack11.hpp>
#include <spdlog/spdlog.h>
#include <mutex> #include <mutex>
#include <sstream> #include <sstream>
#include <vector> #include <vector>
@ -75,12 +75,12 @@ namespace ix
void WebSocketReceiver::log(const std::string& msg) void WebSocketReceiver::log(const std::string& msg)
{ {
std::cout << msg << std::endl; spdlog::info(msg);
} }
void WebSocketReceiver::waitForConnection() void WebSocketReceiver::waitForConnection()
{ {
std::cout << "ws_receive: Connecting..." << std::endl; spdlog::info("{}: Connecting...", "ws_receive");
std::unique_lock<std::mutex> lock(_conditionVariableMutex); std::unique_lock<std::mutex> lock(_conditionVariableMutex);
_condition.wait(lock); _condition.wait(lock);
@ -88,7 +88,7 @@ namespace ix
void WebSocketReceiver::waitForMessage() void WebSocketReceiver::waitForMessage()
{ {
std::cout << "ws_receive: Waiting for message..." << std::endl; spdlog::info("{}: Waiting for message...", "ws_receive");
std::unique_lock<std::mutex> lock(_conditionVariableMutex); std::unique_lock<std::mutex> lock(_conditionVariableMutex);
_condition.wait(lock); _condition.wait(lock);
@ -124,7 +124,7 @@ namespace ix
void WebSocketReceiver::handleMessage(const std::string& str) void WebSocketReceiver::handleMessage(const std::string& str)
{ {
std::cerr << "ws_receive: Received message: " << str.size() << std::endl; spdlog::info("ws_receive: Received message: {}", str.size());
std::string errMsg; std::string errMsg;
MsgPack data = MsgPack::parse(str, errMsg); MsgPack data = MsgPack::parse(str, errMsg);
@ -134,17 +134,17 @@ namespace ix
return; return;
} }
std::cout << "id: " << data["id"].string_value() << std::endl; spdlog::info("id: {}", data["id"].string_value());
std::vector<uint8_t> content = data["content"].binary_items(); std::vector<uint8_t> content = data["content"].binary_items();
std::cout << "ws_receive: Content size: " << content.size() << std::endl; spdlog::info("ws_receive: Content size: {}", content.size());
// Validate checksum // Validate checksum
uint64_t cksum = ix::djb2Hash(content); uint64_t cksum = ix::djb2Hash(content);
auto cksumRef = data["djb2_hash"].string_value(); auto cksumRef = data["djb2_hash"].string_value();
std::cout << "ws_receive: Computed hash: " << cksum << std::endl; spdlog::info("ws_receive: Computed hash: {}", cksum);
std::cout << "ws_receive: Reference hash: " << cksumRef << std::endl; spdlog::info("ws_receive: Reference hash: {}", cksumRef);
if (std::to_string(cksum) != cksumRef) if (std::to_string(cksum) != cksumRef)
{ {
@ -157,12 +157,12 @@ namespace ix
std::string filenameTmp = filename + ".tmp"; std::string filenameTmp = filename + ".tmp";
std::cout << "ws_receive: Writing to disk: " << filenameTmp << std::endl; spdlog::info("ws_receive: Writing to disk: {}", filenameTmp);
std::ofstream out(filenameTmp); std::ofstream out(filenameTmp);
out.write((char*) &content.front(), content.size()); out.write((char*) &content.front(), content.size());
out.close(); out.close();
std::cout << "ws_receive: Renaming " << filenameTmp << " to " << filename << std::endl; spdlog::info("ws_receive: Renaming {} to {}", filenameTmp, filename);
rename(filenameTmp.c_str(), filename.c_str()); rename(filenameTmp.c_str(), filename.c_str());
std::map<MsgPack, MsgPack> pdu; std::map<MsgPack, MsgPack> pdu;
@ -170,7 +170,7 @@ namespace ix
pdu["id"] = data["id"]; pdu["id"] = data["id"];
pdu["filename"] = data["filename"]; pdu["filename"] = data["filename"];
std::cout << "Sending ack to sender" << std::endl; spdlog::info("Sending ack to sender");
MsgPack msg(pdu); MsgPack msg(pdu);
_webSocket.sendBinary(msg.dump()); _webSocket.sendBinary(msg.dump());
} }
@ -192,11 +192,11 @@ namespace ix
_condition.notify_one(); _condition.notify_one();
log("ws_receive: connected"); log("ws_receive: connected");
std::cout << "Uri: " << msg->openInfo.uri << std::endl; spdlog::info("Uri: {}", msg->openInfo.uri);
std::cout << "Handshake Headers:" << std::endl; spdlog::info("Headers:");
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
@ -259,7 +259,7 @@ namespace ix
std::chrono::duration<double, std::milli> duration(1000); std::chrono::duration<double, std::milli> duration(1000);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
std::cout << "ws_receive: Done !" << std::endl; spdlog::info("ws_receive: Done !");
webSocketReceiver.stop(); webSocketReceiver.stop();
} }

View File

@ -4,8 +4,8 @@
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved. * Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/ */
#include <iostream>
#include <ixsnake/IXRedisClient.h> #include <ixsnake/IXRedisClient.h>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
namespace ix namespace ix
@ -20,7 +20,7 @@ namespace ix
RedisClient redisClient; RedisClient redisClient;
if (!redisClient.connect(hostname, port)) if (!redisClient.connect(hostname, port))
{ {
std::cerr << "Cannot connect to redis host" << std::endl; spdlog::info("Cannot connect to redis host");
return 1; return 1;
} }
@ -30,10 +30,10 @@ namespace ix
if (!redisClient.auth(password, authResponse)) if (!redisClient.auth(password, authResponse))
{ {
std::stringstream ss; std::stringstream ss;
std::cerr << "Cannot authenticated to redis" << std::endl; spdlog::info("Cannot authenticated to redis");
return 1; return 1;
} }
std::cout << "Auth response: " << authResponse << ":" << port << std::endl; spdlog::info("Auth response: {}", authResponse);
} }
std::string errMsg; std::string errMsg;
@ -41,8 +41,7 @@ namespace ix
{ {
if (!redisClient.publish(channel, message, errMsg)) if (!redisClient.publish(channel, message, errMsg))
{ {
std::cerr << "Error publishing to channel " << channel << "error: " << errMsg spdlog::error("Error publishing to channel {} error {}", channel, errMsg);
<< std::endl;
return 1; return 1;
} }
} }

View File

@ -4,7 +4,6 @@
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved. * Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/ */
#include <iostream>
#include <ixsnake/IXRedisServer.h> #include <ixsnake/IXRedisServer.h>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
@ -20,7 +19,7 @@ namespace ix
auto res = server.listen(); auto res = server.listen();
if (!res.first) if (!res.first)
{ {
std::cerr << res.second << std::endl; spdlog::info(res.second);
return 1; return 1;
} }

View File

@ -6,8 +6,8 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <iostream>
#include <ixsnake/IXRedisClient.h> #include <ixsnake/IXRedisClient.h>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
#include <thread> #include <thread>
@ -22,7 +22,7 @@ namespace ix
RedisClient redisClient; RedisClient redisClient;
if (!redisClient.connect(hostname, port)) if (!redisClient.connect(hostname, port))
{ {
std::cerr << "Cannot connect to redis host" << std::endl; spdlog::info("Cannot connect to redis host");
return 1; return 1;
} }
@ -32,10 +32,10 @@ namespace ix
if (!redisClient.auth(password, authResponse)) if (!redisClient.auth(password, authResponse))
{ {
std::stringstream ss; std::stringstream ss;
std::cerr << "Cannot authenticated to redis" << std::endl; spdlog::info("Cannot authenticated to redis");
return 1; return 1;
} }
std::cout << "Auth response: " << authResponse << ":" << port << std::endl; spdlog::info("Auth response: {}", authResponse);
} }
std::atomic<int> msgPerSeconds(0); std::atomic<int> msgPerSeconds(0);
@ -44,7 +44,7 @@ namespace ix
auto callback = [&msgPerSeconds, &msgCount, verbose](const std::string& message) { auto callback = [&msgPerSeconds, &msgCount, verbose](const std::string& message) {
if (verbose) if (verbose)
{ {
std::cout << "received: " << message << std::endl; spdlog::info("recived: {}", message);
} }
msgPerSeconds++; msgPerSeconds++;
@ -52,14 +52,13 @@ namespace ix
}; };
auto responseCallback = [](const std::string& redisResponse) { auto responseCallback = [](const std::string& redisResponse) {
std::cout << "Redis subscribe response: " << redisResponse << std::endl; spdlog::info("Redis subscribe response: {}", redisResponse);
}; };
auto timer = [&msgPerSeconds, &msgCount] { auto timer = [&msgPerSeconds, &msgCount] {
while (true) while (true)
{ {
std::cout << "#messages " << msgCount << " " spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds);
<< "msg/s " << msgPerSeconds << std::endl;
msgPerSeconds = 0; msgPerSeconds = 0;
auto duration = std::chrono::seconds(1); auto duration = std::chrono::seconds(1);
@ -69,10 +68,10 @@ namespace ix
std::thread t(timer); std::thread t(timer);
std::cerr << "Subscribing to " << channel << "..." << std::endl; spdlog::info("Subscribing to {} ...", channel);
if (!redisClient.subscribe(channel, responseCallback, callback)) if (!redisClient.subscribe(channel, responseCallback, callback))
{ {
std::cerr << "Error subscribing to channel " << channel << std::endl; spdlog::info("Error subscribing to channel {}", channel);
return 1; return 1;
} }

View File

@ -7,7 +7,6 @@
#include <chrono> #include <chrono>
#include <condition_variable> #include <condition_variable>
#include <fstream> #include <fstream>
#include <iostream>
#include <ixcrypto/IXBase64.h> #include <ixcrypto/IXBase64.h>
#include <ixcrypto/IXHash.h> #include <ixcrypto/IXHash.h>
#include <ixcrypto/IXUuid.h> #include <ixcrypto/IXUuid.h>
@ -15,6 +14,7 @@
#include <ixwebsocket/IXSocketTLSOptions.h> #include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <msgpack11/msgpack11.hpp> #include <msgpack11/msgpack11.hpp>
#include <spdlog/spdlog.h>
#include <mutex> #include <mutex>
#include <sstream> #include <sstream>
#include <vector> #include <vector>
@ -68,12 +68,12 @@ namespace ix
void WebSocketSender::log(const std::string& msg) void WebSocketSender::log(const std::string& msg)
{ {
std::cout << msg << std::endl; spdlog::info(msg);
} }
void WebSocketSender::waitForConnection() void WebSocketSender::waitForConnection()
{ {
std::cout << "ws_send: Connecting..." << std::endl; spdlog::info("{}: Connecting...", "ws_send");
std::unique_lock<std::mutex> lock(_conditionVariableMutex); std::unique_lock<std::mutex> lock(_conditionVariableMutex);
_condition.wait(lock); _condition.wait(lock);
@ -81,7 +81,7 @@ namespace ix
void WebSocketSender::waitForAck() void WebSocketSender::waitForAck()
{ {
std::cout << "ws_send: Waiting for ack..." << std::endl; spdlog::info("{}: Waiting for ack...", "ws_send");
std::unique_lock<std::mutex> lock(_conditionVariableMutex); std::unique_lock<std::mutex> lock(_conditionVariableMutex);
_condition.wait(lock); _condition.wait(lock);
@ -122,11 +122,11 @@ namespace ix
_condition.notify_one(); _condition.notify_one();
log("ws_send: connected"); log("ws_send: connected");
std::cout << "Uri: " << msg->openInfo.uri << std::endl; spdlog::info("Uri: {}", msg->openInfo.uri);
std::cout << "Handshake Headers:" << std::endl; spdlog::info("Headers:");
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
@ -147,14 +147,14 @@ namespace ix
MsgPack data = MsgPack::parse(msg->str, errMsg); MsgPack data = MsgPack::parse(msg->str, errMsg);
if (!errMsg.empty()) if (!errMsg.empty())
{ {
std::cerr << "Invalid MsgPack response" << std::endl; spdlog::info("Invalid MsgPack response");
return; return;
} }
std::string id = data["id"].string_value(); std::string id = data["id"].string_value();
if (_id != id) if (_id != id)
{ {
std::cerr << "Invalid id" << std::endl; spdlog::info("Invalid id");
} }
} }
else if (msg->type == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
@ -201,7 +201,7 @@ namespace ix
auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(now - _start); auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(now - _start);
_ms = milliseconds.count(); _ms = milliseconds.count();
std::cout << _description << " completed in " << _ms << "ms" << std::endl; spdlog::info("{} completed in {}", _description, _ms);
_reported = true; _reported = true;
} }
@ -240,7 +240,7 @@ namespace ix
Bench bench("Sending file through websocket"); Bench bench("Sending file through websocket");
_webSocket.sendBinary(msg.dump(), [throttle](int current, int total) -> bool { _webSocket.sendBinary(msg.dump(), [throttle](int current, int total) -> bool {
std::cout << "ws_send: Step " << current << " out of " << total << std::endl; spdlog::info("ws_send: Step {} out of {}", current, total);
if (throttle) if (throttle)
{ {
@ -254,7 +254,7 @@ namespace ix
do do
{ {
size_t bufferedAmount = _webSocket.bufferedAmount(); size_t bufferedAmount = _webSocket.bufferedAmount();
std::cout << "ws_send: " << bufferedAmount << " bytes left to be sent" << std::endl; spdlog::info("ws_send: {} bytes left to be sent", bufferedAmount);
std::chrono::duration<double, std::milli> duration(10); std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
@ -264,7 +264,7 @@ namespace ix
auto duration = bench.getDuration(); auto duration = bench.getDuration();
auto transferRate = 1000 * content.size() / duration; auto transferRate = 1000 * content.size() / duration;
transferRate /= (1024 * 1024); transferRate /= (1024 * 1024);
std::cout << "ws_send: Send transfer rate: " << transferRate << "MB/s" << std::endl; spdlog::info("ws_send: Send transfer rate: {} MB/s", transferRate);
} }
void wsSend(const std::string& url, void wsSend(const std::string& url,
@ -278,12 +278,12 @@ namespace ix
webSocketSender.waitForConnection(); webSocketSender.waitForConnection();
std::cout << "ws_send: Sending..." << std::endl; spdlog::info("ws_send: Sending...");
webSocketSender.sendMessage(path, throttle); webSocketSender.sendMessage(path, throttle);
webSocketSender.waitForAck(); webSocketSender.waitForAck();
std::cout << "ws_send: Done !" << std::endl; spdlog::info("ws_send: Done !");
webSocketSender.stop(); webSocketSender.stop();
} }

View File

@ -5,8 +5,8 @@
*/ */
#include <fstream> #include <fstream>
#include <iostream>
#include <ixsnake/IXSnakeServer.h> #include <ixsnake/IXSnakeServer.h>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
namespace namespace
@ -58,11 +58,11 @@ namespace ix
auto str = readAsString(appsConfigPath); auto str = readAsString(appsConfigPath);
if (str.empty()) if (str.empty())
{ {
std::cout << "Cannot read content of " << appsConfigPath << std::endl; spdlog::error("Cannot read content of {}", appsConfigPath);
return 1; return 1;
} }
std::cout << str << std::endl; spdlog::error(str);
auto apps = nlohmann::json::parse(str); auto apps = nlohmann::json::parse(str);
appConfig.apps = apps["apps"]; appConfig.apps = apps["apps"];

View File

@ -4,8 +4,8 @@
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved. * Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/ */
#include <iostream>
#include <ixwebsocket/IXWebSocketServer.h> #include <ixwebsocket/IXWebSocketServer.h>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
namespace ix namespace ix
@ -14,7 +14,7 @@ namespace ix
const std::string& hostname, const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions) const ix::SocketTLSOptions& tlsOptions)
{ {
std::cout << "ws_transfer: Listening on " << hostname << ":" << port << std::endl; spdlog::info("Listening on {}:{}", hostname, port);
ix::WebSocketServer server(port, hostname); ix::WebSocketServer server(port, hostname);
server.setTLSOptions(tlsOptions); server.setTLSOptions(tlsOptions);
@ -25,22 +25,23 @@ namespace ix
const WebSocketMessagePtr& msg) { const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "ws_transfer: New connection" << std::endl; spdlog::info("ws_transfer: New connection");
std::cerr << "id: " << connectionState->getId() << std::endl; spdlog::info("id: {}", connectionState->getId());
std::cerr << "Uri: " << msg->openInfo.uri << std::endl; spdlog::info("Uri: {}", msg->openInfo.uri);
std::cerr << "Headers:" << std::endl; spdlog::info("Headers:");
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "ws_transfer: [client " << connectionState->getId() spdlog::info("ws_transfer: Closed connection: client id {} code {} reason {}",
<< "]: Closed connection, code " << msg->closeInfo.code << " reason " connectionState->getId(),
<< msg->closeInfo.reason << std::endl; msg->closeInfo.code,
msg->closeInfo.reason);
auto remaining = server.getClients().erase(webSocket); auto remaining = server.getClients().erase(webSocket);
std::cerr << "ws_transfer: " << remaining << " remaining clients " << std::endl; spdlog::info("ws_transfer: {} remaining clients", remaining);
} }
else if (msg->type == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
@ -49,40 +50,39 @@ namespace ix
ss << "#retries: " << msg->errorInfo.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str(); spdlog::info(ss.str());
} }
else if (msg->type == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
std::cerr << "ws_transfer: Received message fragment " << std::endl; spdlog::info("ws_transfer: Received message fragment ");
} }
else if (msg->type == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "ws_transfer: Received " << msg->wireSize << " bytes" << std::endl; spdlog::info("ws_transfer: Received {} bytes", msg->wireSize);
size_t receivers = 0; size_t receivers = 0;
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client != webSocket)
{ {
auto readyState = client->getReadyState(); auto readyState = client->getReadyState();
auto id = connectionState->getId();
if (readyState == ReadyState::Open) if (readyState == ReadyState::Open)
{ {
++receivers; ++receivers;
client->send(msg->str, client->send(msg->str,
msg->binary, msg->binary,
[id = connectionState->getId()](int current, [&id](int current, int total) -> bool {
int total) -> bool { spdlog::info("{}: [client {}]: Step {} out of {}",
std::cerr << "ws_transfer: [client " << id "ws_transfer", id, current, total);
<< "]: Step " << current << " out of "
<< total << std::endl;
return true; return true;
}); });
do do
{ {
size_t bufferedAmount = client->bufferedAmount(); size_t bufferedAmount = client->bufferedAmount();
std::cerr << "ws_transfer: [client " << connectionState->getId()
<< "]: " << bufferedAmount spdlog::info("{}: [client {}]: {} bytes left to send",
<< " bytes left to be sent, " << std::endl; "ws_transfer", id, bufferedAmount);
std::this_thread::sleep_for(std::chrono::milliseconds(500)); std::this_thread::sleep_for(std::chrono::milliseconds(500));
@ -96,16 +96,15 @@ namespace ix
? "Connecting" ? "Connecting"
: readyState == ReadyState::Closing ? "Closing" : "Closed"; : readyState == ReadyState::Closing ? "Closing" : "Closed";
size_t bufferedAmount = client->bufferedAmount(); size_t bufferedAmount = client->bufferedAmount();
std::cerr << "ws_transfer: [client " << connectionState->getId()
<< "]: has readystate '" << readyStateString << "' and " spdlog::info("{}: [client {}]: has readystate {} bytes left to be sent",
<< bufferedAmount << " bytes left to be sent, " "ws_transfer", id, readyStateString, bufferedAmount);
<< std::endl;
} }
} }
} }
if (!receivers) if (!receivers)
{ {
std::cerr << "ws_transfer: no remaining receivers" << std::endl; spdlog::info("ws_transfer: no remaining receivers");
} }
} }
}); });
@ -114,7 +113,7 @@ namespace ix
auto res = server.listen(); auto res = server.listen();
if (!res.first) if (!res.first)
{ {
std::cerr << res.second << std::endl; spdlog::info(res.second);
return 1; return 1;
} }