Compare commits

...

15 Commits

Author SHA1 Message Date
c4e9abfe80 (ws cobra to sentry) bound the queue size used to hold up cobra messages before they are sent to sentry. Default queue size is a 100 messages. Without such limit the program runs out of memory when a subscriber receive a lot of messages that cannot make it to sentry 2019-12-25 22:15:57 -08:00
a805270d02 (ws client) use correct compilation defines so that spdlog is not used as a header only library (reduce binary size and increase compilation speed) 2019-12-25 09:03:57 -08:00
e13b57c73b (ws client) all commands use spdlog instead of std::cerr or std::cout for logging 2019-12-24 21:55:34 -08:00
5be84926ef (cobra client) send a websocket ping every 30s to keep the connection opened 2019-12-24 17:16:41 -08:00
33e7271b85 (tls / apple) minor refactoring, move functions out of the anonymous namespace to become static member functions 2019-12-23 16:30:38 -08:00
d72e5e70f6 socket tls options: display ciphers 2019-12-23 12:25:25 -08:00
e2c5f751bd (doc) fix typo 2019-12-22 20:33:14 -08:00
351b86e266 v7.6.4 2019-12-22 20:32:10 -08:00
d0cbff4f4e (client) error handling, quote url in error case when failing to parse on 2019-12-22 20:30:29 -08:00
cbfc9b9f94 (ws) ws_cobra_publish: register callbacks before connecting 2019-12-22 20:29:37 -08:00
ca816d801f (doc) mention mbedtls in supported ssl server backend 2019-12-22 20:28:44 -08:00
2f354d31eb update gitignore file 2019-12-20 15:21:36 -08:00
2c6c1edd37 (tls) add a simple description of the TLS configuration routine for debugging 2019-12-20 15:18:04 -08:00
9799e7e84b (mbedtls) correct support for using own certificate and private key 2019-12-20 15:13:26 -08:00
81be970679 (ws commands) in websocket proxy, disable automatic reconnections + in Dockerfile, use alpine 3.11 2019-12-20 09:51:21 -08:00
43 changed files with 400 additions and 296 deletions

4
.gitignore vendored
View File

@ -1,3 +1,7 @@
build
*.pyc
venv
ixsnake/ixsnake/.certs/
site/
ws/.certs/
ws/.srl

View File

@ -1,4 +1,4 @@
FROM alpine:edge as build
FROM alpine:3.11 as build
RUN apk add --no-cache gcc g++ musl-dev linux-headers cmake openssl-dev
RUN apk add --no-cache make
@ -16,7 +16,7 @@ WORKDIR /opt
USER app
RUN [ "make", "ws_install" ]
FROM alpine:edge as runtime
FROM alpine:3.11 as runtime
RUN apk add --no-cache libstdc++
RUN apk add --no-cache strace

View File

@ -1,5 +1,39 @@
# Changelog
All notable changes to this project will be documented in this file.
All changes to this project will be documented in this file.
## [7.8.2] - 2019-12-25
(ws cobra to sentry) bound the queue size used to hold up cobra messages before they are sent to sentry. Default queue size is a 100 messages. Without such limit the program runs out of memory when a subscriber receive a lot of messages that cannot make it to sentry
## [7.8.1] - 2019-12-25
(ws client) use correct compilation defines so that spdlog is not used as a header only library (reduce binary size and increase compilation speed)
## [7.8.0] - 2019-12-24
(ws client) all commands use spdlog instead of std::cerr or std::cout for logging
## [7.6.5] - 2019-12-24
(cobra client) send a websocket ping every 30s to keep the connection opened
## [7.6.4] - 2019-12-22
(client) error handling, quote url in error case when failing to parse one
(ws) ws_cobra_publish: register callbacks before connecting
(doc) mention mbedtls in supported ssl server backend
## [7.6.3] - 2019-12-20
(tls) add a simple description of the TLS configuration routine for debugging
## [7.6.2] - 2019-12-20
(mbedtls) correct support for using own certificate and private key
## [7.6.1] - 2019-12-20
(ws commands) in websocket proxy, disable automatic reconnections + in Dockerfile, use alpine 3.11
## [7.6.0] - 2019-12-19

View File

@ -247,7 +247,7 @@ Options:
[cobra](https://github.com/machinezone/cobra) is a real time messenging server. ws has several sub-command to interact with cobra. There is also a minimal cobra compatible server named snake available.
Below are examples on running a snake server and clients with TLS enabled (the server only works with the OpenSSL backend for now).
Below are examples on running a snake server and clients with TLS enabled (the server only works with the OpenSSL and the Mbed TLS backend for now).
First, generate certificates.
@ -366,4 +366,4 @@ $ ws cobra_publish --endpoint wss://127.0.0.1:8765 --appkey FC2F10139A2BAc53BB72
[2019-12-19 20:46:42.659] [info] Published message id 3 acked
```
To use OpenSSL on macOS, compile with `make ws_openssl`. First you will have to install OpenSSL libraries, which can be done with Homebrew.
To use OpenSSL on macOS, compile with `make ws_openssl`. First you will have to install OpenSSL libraries, which can be done with Homebrew. Use `make ws_mbedtls` accordingly to use MbedTLS.

View File

@ -24,6 +24,7 @@ namespace ix
PublishTrackerCallback CobraConnection::_publishTrackerCallback = nullptr;
constexpr size_t CobraConnection::kQueueMaxSize;
constexpr CobraConnection::MsgId CobraConnection::kInvalidMsgId;
constexpr int CobraConnection::kPingIntervalSecs;
CobraConnection::CobraConnection() :
_webSocket(new WebSocket()),
@ -228,6 +229,10 @@ namespace ix
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
invokeErrorCallback(ss.str(), std::string());
}
else if (msg->type == ix::WebSocketMessageType::Pong)
{
invokeEventCallback(ix::CobraConnection_EventType_Pong);
}
});
}
@ -260,6 +265,7 @@ namespace ix
_webSocket->setUrl(url);
_webSocket->setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions);
_webSocket->setTLSOptions(socketTLSOptions);
_webSocket->setPingInterval(kPingIntervalSecs);
}
//

View File

@ -30,7 +30,8 @@ namespace ix
CobraConnection_EventType_Closed = 3,
CobraConnection_EventType_Subscribed = 4,
CobraConnection_EventType_UnSubscribed = 5,
CobraConnection_EventType_Published = 6
CobraConnection_EventType_Published = 6,
CobraConnection_EventType_Pong = 7
};
enum CobraConnectionPublishMode
@ -215,6 +216,9 @@ namespace ix
// Each pdu sent should have an incremental unique id
std::atomic<uint64_t> _id;
// Frequency at which we send a websocket ping to the backing cobra connection
static constexpr int kPingIntervalSecs = 30;
};
} // namespace ix

View File

@ -65,6 +65,10 @@ namespace ix
{
ss << "Published message " << msgId << " acked";
}
else if (eventType == ix::CobraConnection_EventType_Pong)
{
ss << "Received websocket pong";
}
ix::IXCoreLogger::Log(ss.str().c_str());
});
@ -98,6 +102,8 @@ namespace ix
{
_channel = channel;
ix::IXCoreLogger::Log(socketTLSOptions.getDescription().c_str());
ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(enablePerMessageDeflate);
_cobra_connection.configure(appkey, endpoint,
rolename, rolesecret,

View File

@ -10,7 +10,6 @@
#include "IXSocketConnect.h"
#include "IXUserAgent.h"
#include <fstream>
#include <iostream>
#include <sstream>
#include <vector>

View File

@ -24,9 +24,47 @@
#include <Security/SecureTransport.h>
namespace
namespace ix
{
OSStatus read_from_socket(SSLConnectionRef connection, void* data, size_t* len)
SocketAppleSSL::SocketAppleSSL(const SocketTLSOptions& tlsOptions, int fd)
: Socket(fd)
, _sslContext(nullptr)
, _tlsOptions(tlsOptions)
{
;
}
SocketAppleSSL::~SocketAppleSSL()
{
SocketAppleSSL::close();
}
std::string SocketAppleSSL::getSSLErrorDescription(OSStatus status)
{
std::string errMsg("Unknown SSL error.");
CFErrorRef error = CFErrorCreate(kCFAllocatorDefault, kCFErrorDomainOSStatus, status, NULL);
if (error)
{
CFStringRef message = CFErrorCopyDescription(error);
if (message)
{
char localBuffer[128];
Boolean success;
success = CFStringGetCString(message, localBuffer, 128, kCFStringEncodingUTF8);
if (success)
{
errMsg = localBuffer;
}
CFRelease(message);
}
CFRelease(error);
}
return errMsg;
}
OSStatus SocketAppleSSL::readFromSocket(SSLConnectionRef connection, void* data, size_t* len)
{
int fd = (int) (long) connection;
if (fd < 0) return errSSLInternal;
@ -67,7 +105,7 @@ namespace
}
}
OSStatus write_to_socket(SSLConnectionRef connection, const void* data, size_t* len)
OSStatus SocketAppleSSL::writeToSocket(SSLConnectionRef connection, const void* data, size_t* len)
{
int fd = (int) (long) connection;
if (fd < 0) return errSSLInternal;
@ -105,47 +143,6 @@ namespace
}
}
std::string getSSLErrorDescription(OSStatus status)
{
std::string errMsg("Unknown SSL error.");
CFErrorRef error = CFErrorCreate(kCFAllocatorDefault, kCFErrorDomainOSStatus, status, NULL);
if (error)
{
CFStringRef message = CFErrorCopyDescription(error);
if (message)
{
char localBuffer[128];
Boolean success;
success = CFStringGetCString(message, localBuffer, 128, kCFStringEncodingUTF8);
if (success)
{
errMsg = localBuffer;
}
CFRelease(message);
}
CFRelease(error);
}
return errMsg;
}
} // anonymous namespace
namespace ix
{
SocketAppleSSL::SocketAppleSSL(const SocketTLSOptions& tlsOptions, int fd)
: Socket(fd)
, _sslContext(nullptr)
, _tlsOptions(tlsOptions)
{
;
}
SocketAppleSSL::~SocketAppleSSL()
{
SocketAppleSSL::close();
}
bool SocketAppleSSL::accept(std::string& errMsg)
{
@ -168,7 +165,7 @@ namespace ix
_sslContext = SSLCreateContext(kCFAllocatorDefault, kSSLClientSide, kSSLStreamType);
SSLSetIOFuncs(_sslContext, read_from_socket, write_to_socket);
SSLSetIOFuncs(_sslContext, SocketAppleSSL::readFromSocket, SocketAppleSSL::writeToSocket);
SSLSetConnection(_sslContext, (SSLConnectionRef)(long) _sockfd);
SSLSetProtocolVersionMin(_sslContext, kTLSProtocol12);
SSLSetPeerDomainName(_sslContext, host.c_str(), host.size());

View File

@ -34,6 +34,10 @@ namespace ix
virtual ssize_t recv(void* buffer, size_t length) final;
private:
static std::string getSSLErrorDescription(OSStatus status);
static OSStatus writeToSocket(SSLConnectionRef connection, const void* data, size_t* len);
static OSStatus readFromSocket(SSLConnectionRef connection, void* data, size_t* len);
SSLContextRef _sslContext;
mutable std::mutex _mutex; // AppleSSL routines are not thread-safe

View File

@ -71,11 +71,16 @@ namespace ix
if (_tlsOptions.hasCertAndKey())
{
if (mbedtls_x509_crt_parse_file(&_cacert, _tlsOptions.certFile.c_str()) < 0)
if (mbedtls_x509_crt_parse_file(&_cert, _tlsOptions.certFile.c_str()) < 0)
{
errMsg = "Cannot parse cert file '" + _tlsOptions.certFile + "'";
return false;
}
if (mbedtls_pk_parse_keyfile(&_pkey, _tlsOptions.keyFile.c_str(), "") < 0)
{
errMsg = "Cannot parse key file '" + _tlsOptions.keyFile + "'";
return false;
}
}
if (_tlsOptions.isPeerVerifyDisabled())
@ -84,7 +89,7 @@ namespace ix
}
else
{
mbedtls_ssl_conf_ca_chain(&_conf, &_cacert, NULL);
mbedtls_ssl_conf_authmode(&_conf, MBEDTLS_SSL_VERIFY_REQUIRED);
// FIXME: should we call mbedtls_ssl_conf_verify ?
@ -97,7 +102,13 @@ namespace ix
errMsg = "Cannot parse CA file '" + _tlsOptions.caFile + "'";
return false;
}
mbedtls_ssl_conf_authmode(&_conf, MBEDTLS_SSL_VERIFY_REQUIRED);
mbedtls_ssl_conf_ca_chain(&_conf, &_cacert, NULL);
if (_tlsOptions.hasCertAndKey())
{
mbedtls_ssl_conf_own_cert(&_conf, &_cert, &_pkey);
}
}
if (mbedtls_ssl_setup(&_ssl, &_conf) != 0)

View File

@ -45,6 +45,7 @@ namespace ix
mbedtls_ctr_drbg_context _ctr_drbg;
mbedtls_x509_crt _cacert;
mbedtls_x509_crt _cert;
mbedtls_pk_context _pkey;
std::mutex _mutex;
SocketTLSOptions _tlsOptions;

View File

@ -11,7 +11,7 @@
#include "IXSocketConnect.h"
#include "IXSocketFactory.h"
#include <assert.h>
#include <iostream>
#include <stdio.h>
#include <sstream>
#include <string.h>
@ -45,13 +45,13 @@ namespace ix
void SocketServer::logError(const std::string& str)
{
std::lock_guard<std::mutex> lock(_logMutex);
std::cerr << str << std::endl;
fprintf(stderr, "%s\n", str.c_str());
}
void SocketServer::logInfo(const std::string& str)
{
std::lock_guard<std::mutex> lock(_logMutex);
std::cout << str << std::endl;
fprintf(stdout, "%s\n", str.c_str());
}
std::pair<bool, std::string> SocketServer::listen()

View File

@ -8,6 +8,7 @@
#include <assert.h>
#include <fstream>
#include <sstream>
namespace ix
{
@ -71,4 +72,16 @@ namespace ix
{
return _errMsg;
}
std::string SocketTLSOptions::getDescription() const
{
std::stringstream ss;
ss << "TLS Options:" << std::endl;
ss << " certFile = " << certFile << std::endl;
ss << " keyFile = " << keyFile << std::endl;
ss << " caFile = " << caFile << std::endl;
ss << " ciphers = " << ciphers << std::endl;
ss << " ciphers = " << ciphers << std::endl;
return ss.str();
}
} // namespace ix

View File

@ -43,6 +43,8 @@ namespace ix
const std::string& getErrorMsg() const;
std::string getDescription() const;
private:
mutable std::string _errMsg;
mutable bool _validated = false;

View File

@ -144,7 +144,9 @@ namespace ix
if (!UrlParser::parse(url, protocol, host, path, query, port))
{
return WebSocketInitResult(false, 0, std::string("Could not parse URL ") + url);
std::stringstream ss;
ss << "Could not parse url: '" << url << "'";
return WebSocketInitResult(false, 0, ss.str());
}
std::string errorMsg;

View File

@ -6,4 +6,4 @@
#pragma once
#define IX_WEBSOCKET_VERSION "7.5.8"
#define IX_WEBSOCKET_VERSION "7.8.2"

View File

@ -23,6 +23,8 @@ include_directories(
../ws
)
add_definitions(-DSPDLOG_COMPILED_LIB=1)
find_package(JsonCpp)
if (NOT JSONCPP_FOUND)
include_directories(../third_party/jsoncpp)
@ -98,4 +100,6 @@ target_link_libraries(ixwebsocket_unittest ixcrypto)
target_link_libraries(ixwebsocket_unittest ixcore)
target_link_libraries(ixwebsocket_unittest ixsentry)
target_link_libraries(ixwebsocket_unittest spdlog)
install(TARGETS ixwebsocket_unittest DESTINATION bin)

View File

@ -25,6 +25,8 @@ include_directories(ws ../third_party/statsd-client-cpp/src)
include_directories(ws ../third_party/spdlog/include)
include_directories(ws ../third_party/cpp-linenoise)
add_definitions(-DSPDLOG_COMPILED_LIB=1)
if (UNIX)
set( STATSD_CLIENT_SOURCES ../third_party/statsd-client-cpp/src/statsd_client.cpp)
endif()
@ -72,6 +74,8 @@ target_link_libraries(ws ixcrypto)
target_link_libraries(ws ixcore)
target_link_libraries(ws ixsentry)
target_link_libraries(ws spdlog)
if(NOT APPLE AND NOT USE_MBED_TLS)
find_package(OpenSSL REQUIRED)
add_definitions(${OPENSSL_DEFINITIONS})

View File

@ -11,7 +11,6 @@
#include <cli11/CLI11.hpp>
#include <fstream>
#include <iostream>
#include <ixcore/utils/IXCoreLogger.h>
#include <ixwebsocket/IXNetSystem.h>
#include <ixwebsocket/IXSocket.h>
@ -39,12 +38,13 @@ int main(int argc, char** argv)
// Display command.
if (getenv("DEBUG"))
{
std::cout << "Command: ";
std::stringstream ss;
ss << "Command: ";
for (int i = 0; i < argc; ++i)
{
std::cout << argv[i] << " ";
ss << argv[i] << " ";
}
std::cout << std::endl;
spdlog::info(ss.str());
}
CLI::App app {"ws is a websocket tool"};
@ -105,6 +105,7 @@ int main(int argc, char** argv)
int count = 1;
int jobs = 4;
uint32_t maxWaitBetweenReconnectionRetries;
size_t maxQueueSize = 100;
auto addTLSOptions = [&tlsOptions, &verifyNone](CLI::App* app) {
app->add_option(
@ -268,6 +269,7 @@ int main(int argc, char** argv)
cobra2sentry->add_option("--rolesecret", rolesecret, "Role secret")->required();
cobra2sentry->add_option("--dsn", dsn, "Sentry DSN");
cobra2sentry->add_option("--jobs", jobs, "Number of thread sending events to Sentry");
cobra2sentry->add_option("--queue_size", maxQueueSize, "Size of the queue to hold messages before they are sent to Sentry");
cobra2sentry->add_option("channel", channel, "Channel")->required();
cobra2sentry->add_flag("-v", verbose, "Verbose");
cobra2sentry->add_flag("-s", strict, "Strict mode. Error out when sending to sentry fails");
@ -455,6 +457,7 @@ int main(int argc, char** argv)
verbose,
strict,
jobs,
maxQueueSize,
tlsOptions);
}
else if (app.got_subcommand("cobra_metrics_to_redis"))
@ -496,11 +499,11 @@ int main(int argc, char** argv)
}
else if (version)
{
std::cout << "ws " << ix::userAgent() << std::endl;
spdlog::info("ws {}", ix::userAgent());
}
else
{
std::cerr << "A subcommand or --version is required" << std::endl;
spdlog::error("A subcommand or --version is required");
}
ix::uninitNetSystem();

View File

@ -119,6 +119,7 @@ namespace ix
bool verbose,
bool strict,
int jobs,
size_t maxQueueSize,
const ix::SocketTLSOptions& tlsOptions);
int ws_cobra_metrics_to_redis(const std::string& appkey,

View File

@ -32,7 +32,6 @@
#include <atomic>
#include <condition_variable>
#include <iostream>
#include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXWebSocket.h>
#include <mutex>
@ -91,7 +90,7 @@ namespace ix
{
if (!_quiet)
{
std::cerr << msg;
spdlog::info(msg);
}
}
@ -183,7 +182,7 @@ namespace ix
webSocket.setOnMessageCallback([&condition, &success](const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Close)
{
std::cerr << "Report generated" << std::endl;
spdlog::info("Report generated");
condition.notify_one();
}
else if (msg->type == ix::WebSocketMessageType::Error)
@ -193,7 +192,7 @@ namespace ix
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str() << std::endl;
spdlog::info(ss.str());
success = false;
}
@ -236,7 +235,7 @@ namespace ix
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str() << std::endl;
spdlog::info(ss.str());
condition.notify_one();
}
@ -269,7 +268,7 @@ namespace ix
int ws_autobahn_main(const std::string& url, bool quiet)
{
int testCasesCount = getTestCaseCount(url);
std::cerr << "Test cases count: " << testCasesCount << std::endl;
spdlog::info("Test cases count: {}", testCasesCount);
if (testCasesCount == -1)
{

View File

@ -4,9 +4,10 @@
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <ixwebsocket/IXWebSocketServer.h>
#include <sstream>
#include <spdlog/spdlog.h>
namespace ix
{
@ -14,7 +15,7 @@ namespace ix
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions)
{
std::cout << "Listening on " << hostname << ":" << port << std::endl;
spdlog::info("Listening on {}:{}", hostname, port);
ix::WebSocketServer server(port, hostname);
server.setTLSOptions(tlsOptions);
@ -25,20 +26,19 @@ namespace ix
const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
spdlog::info("New connection");
spdlog::info("id: {}", connectionState->getId());
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
spdlog::info("{}: {}", it.first, it.second);
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
std::cerr << "Closed connection"
<< " code " << msg->closeInfo.code << " reason "
<< msg->closeInfo.reason << std::endl;
spdlog::info("Closed connection: code {} reason {}",
msg->closeInfo.code, msg->closeInfo.reason);
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
@ -47,30 +47,29 @@ namespace ix
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str();
spdlog::info(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Fragment)
{
std::cerr << "Received message fragment" << std::endl;
spdlog::info("Received message fragment");
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
spdlog::info("Received {} bytes", msg->wireSize);
for (auto&& client : server.getClients())
{
if (client != webSocket)
{
client->send(msg->str, msg->binary, [](int current, int total) -> bool {
std::cerr << "Step " << current << " out of " << total << std::endl;
spdlog::info("Step {} out of {}", current, total);
return true;
});
do
{
size_t bufferedAmount = client->bufferedAmount();
std::cerr << bufferedAmount << " bytes left to be sent"
<< std::endl;
spdlog::info("{} bytes left to be sent", bufferedAmount);
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
@ -84,7 +83,7 @@ namespace ix
auto res = server.listen();
if (!res.first)
{
std::cerr << res.second << std::endl;
spdlog::info(res.second);
return 1;
}

View File

@ -9,12 +9,13 @@
// Broadcast server can be ran with `ws broadcast_server`
//
#include "linenoise.hpp"
#include "nlohmann/json.hpp"
#include <iostream>
#include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXWebSocket.h>
#include <queue>
#include <sstream>
#include <spdlog/spdlog.h>
// for convenience
using json = nlohmann::json;
@ -55,7 +56,7 @@ namespace ix
void WebSocketChat::log(const std::string& msg)
{
std::cout << msg << std::endl;
spdlog::info(msg);
}
size_t WebSocketChat::getReceivedMessagesCount() const
@ -85,20 +86,21 @@ namespace ix
if (msg->type == ix::WebSocketMessageType::Open)
{
log("ws chat: connected");
std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl;
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
spdlog::info("{}: {}", it.first, it.second);
}
ss << "ws chat: user " << _user << " Connected !";
spdlog::info("ws chat: user {} connected !", _user);
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
ss << "ws chat: user " << _user << " disconnected !"
<< " code " << msg->closeInfo.code << " reason " << msg->closeInfo.reason;
ss << "ws chat user disconnected: " << _user;
ss << " code " << msg->closeInfo.code;
ss << " reason " << msg->closeInfo.reason << std::endl;
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Message)
@ -162,25 +164,25 @@ namespace ix
int ws_chat_main(const std::string& url, const std::string& user)
{
std::cout << "Type Ctrl-D to exit prompt..." << std::endl;
spdlog::info("Type Ctrl-D to exit prompt...");
WebSocketChat webSocketChat(url, user);
webSocketChat.start();
while (true)
{
std::string text;
std::cout << user << " > " << std::flush;
std::getline(std::cin, text);
// Read line
std::string line;
auto quit = linenoise::Readline("> ", line);
if (!std::cin)
if (quit)
{
break;
}
webSocketChat.sendMessage(text);
webSocketChat.sendMessage(line);
}
std::cout << std::endl;
spdlog::info("");
webSocketChat.stop();
return 0;

View File

@ -7,7 +7,6 @@
#include <atomic>
#include <chrono>
#include <fstream>
#include <iostream>
#include <ixcobra/IXCobraMetricsPublisher.h>
#include <jsoncpp/json/json.h>
#include <spdlog/spdlog.h>

View File

@ -7,7 +7,6 @@
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <ixcobra/IXCobraConnection.h>
#include <ixsnake/IXRedisClient.h>
#include <mutex>
@ -44,8 +43,7 @@ namespace ix
auto timer = [&msgPerSeconds, &msgCount] {
while (true)
{
std::cout << "#messages " << msgCount << " "
<< "msg/s " << msgPerSeconds << std::endl;
spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds);
msgPerSeconds = 0;
auto duration = std::chrono::seconds(1);

View File

@ -7,7 +7,6 @@
#include <atomic>
#include <chrono>
#include <fstream>
#include <iostream>
#include <ixcobra/IXCobraMetricsPublisher.h>
#include <jsoncpp/json/json.h>
#include <mutex>
@ -43,7 +42,6 @@ namespace ix
rolesecret,
ix::WebSocketPerMessageDeflateOptions(true),
tlsOptions);
conn.connect();
// Display incoming messages
std::atomic<bool> authenticated(false);
@ -92,8 +90,14 @@ namespace ix
spdlog::info("Published message id {} acked", msgId);
messageAcked = true;
}
else if (eventType == ix::CobraConnection_EventType_Pong)
{
spdlog::info("Received websocket pong");
}
});
conn.connect();
while (!authenticated)
;
while (!messageAcked)

View File

@ -6,7 +6,6 @@
#include <atomic>
#include <chrono>
#include <iostream>
#include <ixcobra/IXCobraConnection.h>
#include <spdlog/spdlog.h>
#include <sstream>
@ -41,8 +40,7 @@ namespace ix
auto timer = [&msgPerSeconds, &msgCount] {
while (true)
{
std::cout << "#messages " << msgCount << " "
<< "msg/s " << msgPerSeconds << std::endl;
spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds);
msgPerSeconds = 0;
auto duration = std::chrono::seconds(1);
@ -77,7 +75,7 @@ namespace ix
[&jsonWriter, &quiet, &msgPerSeconds, &msgCount](const Json::Value& msg) {
if (!quiet)
{
std::cerr << jsonWriter.write(msg) << std::endl;
spdlog::info(jsonWriter.write(msg));
}
msgPerSeconds++;
@ -100,6 +98,10 @@ namespace ix
{
spdlog::error("Published message hacked: {}", msgId);
}
else if (eventType == ix::CobraConnection_EventType_Pong)
{
spdlog::info("Received websocket pong");
}
});
while (true)

View File

@ -7,7 +7,6 @@
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <ixcobra/IXCobraConnection.h>
#include <ixsentry/IXSentryClient.h>
#include <mutex>
@ -29,6 +28,7 @@ namespace ix
bool verbose,
bool strict,
int jobs,
size_t maxQueueSize,
const ix::SocketTLSOptions& tlsOptions)
{
ix::CobraConnection conn;
@ -161,7 +161,7 @@ namespace ix
};
// Create a thread pool
std::cerr << "Starting " << jobs << " sentry sender jobs" << std::endl;
spdlog::info("Starting {} sentry sender jobs", jobs);
std::vector<std::thread> pool;
for (int i = 0; i < jobs; i++)
{
@ -177,6 +177,7 @@ namespace ix
&receivedCount,
&condition,
&conditionVariableMutex,
&maxQueueSize,
&queue](ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
@ -197,7 +198,7 @@ namespace ix
}
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
std::cerr << "Subscriber authenticated" << std::endl;
spdlog::info("Subscriber authenticated");
conn.subscribe(channel,
filter,
[&jsonWriter,
@ -206,6 +207,7 @@ namespace ix
&receivedCount,
&condition,
&conditionVariableMutex,
&maxQueueSize,
&queue](const Json::Value& msg) {
if (verbose)
{
@ -223,7 +225,12 @@ namespace ix
{
std::unique_lock<std::mutex> lock(conditionVariableMutex);
queue.push(msg);
// if the sending is not fast enough there is no point
// in queuing too many events.
if (queue.size() < maxQueueSize)
{
queue.push(msg);
}
}
condition.notify_one();
@ -245,6 +252,10 @@ namespace ix
{
spdlog::error("Published message hacked: {}", msgId);
}
else if (eventType == ix::CobraConnection_EventType_Pong)
{
spdlog::info("Received websocket pong");
}
});
while (true)

View File

@ -6,7 +6,6 @@
#include <atomic>
#include <chrono>
#include <iostream>
#include <ixcobra/IXCobraConnection.h>
#include <spdlog/spdlog.h>
#include <sstream>
@ -160,6 +159,10 @@ namespace ix
{
spdlog::error("Published message hacked: {}", msgId);
}
else if (eventType == ix::CobraConnection_EventType_Pong)
{
spdlog::info("Received websocket pong");
}
});
while (true)

View File

@ -5,10 +5,10 @@
*/
#include "linenoise.hpp"
#include <iostream>
#include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocket.h>
#include <spdlog/spdlog.h>
#include <sstream>
@ -93,7 +93,7 @@ namespace ix
auto key = token.substr(0, pos);
auto val = token.substr(pos + 1);
std::cerr << key << ": " << val << std::endl;
spdlog::info("{}: {}", key, val);
headers[key] = val;
}
@ -129,11 +129,11 @@ namespace ix
if (msg->type == ix::WebSocketMessageType::Open)
{
log("ws_connect: connected");
std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl;
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
spdlog::info("{}: {}", it.first, it.second);
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
@ -145,7 +145,7 @@ namespace ix
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
spdlog::info("Received {} bytes", msg->wireSize);
ss << "ws_connect: received message: " << msg->str;
log(ss.str());
@ -160,15 +160,15 @@ namespace ix
}
else if (msg->type == ix::WebSocketMessageType::Fragment)
{
std::cerr << "Received message fragment" << std::endl;
spdlog::info("Received message fragment");
}
else if (msg->type == ix::WebSocketMessageType::Ping)
{
std::cerr << "Received ping" << std::endl;
spdlog::info("Received ping");
}
else if (msg->type == ix::WebSocketMessageType::Pong)
{
std::cerr << "Received pong" << std::endl;
spdlog::info("Received pong");
}
else
{
@ -225,14 +225,14 @@ namespace ix
if (line == "/stop")
{
std::cout << "Stopping connection..." << std::endl;
spdlog::info("Stopping connection...");
webSocketChat.stop();
continue;
}
if (line == "/start")
{
std::cout << "Starting connection..." << std::endl;
spdlog::info("Starting connection...");
webSocketChat.start();
continue;
}
@ -243,7 +243,7 @@ namespace ix
linenoise::AddHistory(line.c_str());
}
std::cout << std::endl;
spdlog::info("");
webSocketChat.stop();
return 0;

View File

@ -4,8 +4,8 @@
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <ixwebsocket/IXWebSocketServer.h>
#include <spdlog/spdlog.h>
#include <sstream>
namespace ix
@ -15,7 +15,7 @@ namespace ix
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions)
{
std::cout << "Listening on " << hostname << ":" << port << std::endl;
spdlog::info("Listening on {}:{}", hostname, port);
ix::WebSocketServer server(port, hostname);
server.setTLSOptions(tlsOptions);
@ -27,13 +27,13 @@ namespace ix
[webSocket, connectionState, greetings](const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
spdlog::info("New connection");
spdlog::info("id: {}", connectionState->getId());
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
spdlog::info("{}: {}", it.first, it.second);
}
if (greetings)
@ -43,22 +43,21 @@ namespace ix
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
std::cerr << "Closed connection"
<< " code " << msg->closeInfo.code << " reason "
<< msg->closeInfo.reason << std::endl;
spdlog::info("Closed connection: client id {} code {} reason {}",
connectionState->getId(),
msg->closeInfo.code,
msg->closeInfo.reason);
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
std::stringstream ss;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str();
spdlog::error("Connection error: {}", msg->errorInfo.reason);
spdlog::error("#retries: {}", msg->errorInfo.retries);
spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time);
spdlog::error("HTTP Status: {}", msg->errorInfo.http_status);
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
spdlog::info("Received {} bytes", msg->wireSize);
webSocket->send(msg->str, msg->binary);
}
});
@ -67,7 +66,7 @@ namespace ix
auto res = server.listen();
if (!res.first)
{
std::cerr << res.second << std::endl;
spdlog::error(res.second);
return 1;
}

View File

@ -5,10 +5,10 @@
*/
#include <fstream>
#include <iostream>
#include <ixwebsocket/IXHttpClient.h>
#include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocketHttpHeaders.h>
#include <spdlog/spdlog.h>
#include <sstream>
namespace ix
@ -47,7 +47,7 @@ namespace ix
auto key = token.substr(0, pos);
auto val = token.substr(pos + 1);
std::cerr << key << ": " << val << std::endl;
spdlog::info("{}: {}", key, val);
headers[key] = val;
}
@ -76,7 +76,7 @@ namespace ix
auto key = token.substr(0, pos);
auto val = token.substr(pos + 1);
std::cerr << key << ": " << val << std::endl;
spdlog::info("{}: {}", key, val);
httpParameters[key] = val;
}
@ -108,10 +108,9 @@ namespace ix
args->maxRedirects = maxRedirects;
args->verbose = verbose;
args->compress = compress;
args->logger = [](const std::string& msg) { std::cout << msg; };
args->logger = [](const std::string& msg) { spdlog::info(msg); };
args->onProgressCallback = [](int current, int total) -> bool {
std::cerr << "\r"
<< "Downloaded " << current << " bytes out of " << total;
spdlog::info("Downloaded {} bytes out of {}", current, total);
return true;
};
@ -131,20 +130,20 @@ namespace ix
response = httpClient.post(url, httpParameters, args);
}
std::cerr << std::endl;
spdlog::info("");
for (auto it : response->headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
spdlog::info("{}: {}", it.first, it.second);
}
std::cerr << "Upload size: " << response->uploadSize << std::endl;
std::cerr << "Download size: " << response->downloadSize << std::endl;
spdlog::info("Upload size: {}", response->uploadSize);
spdlog::info("Download size: {}", response->downloadSize);
std::cerr << "Status: " << response->statusCode << std::endl;
spdlog::info("Status: {}", response->statusCode);
if (response->errorCode != HttpErrorCode::Ok)
{
std::cerr << "error message: " << response->errorMsg << std::endl;
spdlog::info("error message: ", response->errorMsg);
}
if (!headersOnly && response->errorCode == HttpErrorCode::Ok)
@ -158,7 +157,7 @@ namespace ix
filename = output;
}
std::cout << "Writing to disk: " << filename << std::endl;
spdlog::info("Writing to disk: {}", filename);
std::ofstream out(filename);
out.write((char*) &response->payload.front(), response->payload.size());
out.close();
@ -167,14 +166,13 @@ namespace ix
{
if (response->headers["Content-Type"] != "application/octet-stream")
{
std::cout << "payload: " << response->payload << std::endl;
spdlog::info("payload: {}", response->payload);
}
else
{
std::cerr << "Binary output can mess up your terminal." << std::endl;
std::cerr << "Use the -O flag to save the file to disk." << std::endl;
std::cerr << "You can also use the --output option to specify a filename."
<< std::endl;
spdlog::info("Binary output can mess up your terminal.");
spdlog::info("Use the -O flag to save the file to disk.");
spdlog::info("You can also use the --output option to specify a filename.");
}
}
}

View File

@ -5,7 +5,6 @@
*/
#include <fstream>
#include <iostream>
#include <ixwebsocket/IXHttpServer.h>
#include <spdlog/spdlog.h>
#include <sstream>
@ -32,7 +31,7 @@ namespace ix
auto res = server.listen();
if (!res.first)
{
std::cerr << res.second << std::endl;
spdlog::error(res.second);
return 1;
}

View File

@ -4,11 +4,12 @@
* Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocket.h>
#include <spdlog/spdlog.h>
#include <sstream>
#include <iostream>
namespace ix
{
@ -40,7 +41,7 @@ namespace ix
void WebSocketPingPong::log(const std::string& msg)
{
std::cout << msg << std::endl;
spdlog::info(msg);
}
void WebSocketPingPong::stop()
@ -56,18 +57,18 @@ namespace ix
log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) {
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
spdlog::info("Received {} bytes", msg->wireSize);
std::stringstream ss;
if (msg->type == ix::WebSocketMessageType::Open)
{
log("ping_pong: connected");
std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl;
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
spdlog::info("{}: {}", it.first, it.second);
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
@ -127,7 +128,7 @@ namespace ix
int ws_ping_pong_main(const std::string& url, const ix::SocketTLSOptions& tlsOptions)
{
std::cout << "Type Ctrl-D to exit prompt..." << std::endl;
spdlog::info("Type Ctrl-D to exit prompt...");
WebSocketPingPong webSocketPingPong(url, tlsOptions);
webSocketPingPong.start();

View File

@ -4,8 +4,8 @@
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <ixwebsocket/IXWebSocketServer.h>
#include <spdlog/spdlog.h>
#include <sstream>
namespace ix
@ -44,7 +44,7 @@ namespace ix
const std::string& remoteUrl,
bool verbose)
{
std::cout << "Listening on " << hostname << ":" << port << std::endl;
spdlog::info("Listening on {}:{}", hostname, port);
ix::WebSocketServer server(port, hostname);
server.setTLSOptions(tlsOptions);
@ -61,41 +61,39 @@ namespace ix
// Server connection
state->webSocket().setOnMessageCallback([webSocket, state, verbose](
const WebSocketMessagePtr& msg) {
const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "server id: " << state->getId() << std::endl;
std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
spdlog::info("New connection to remote server");
spdlog::info("id: {}", state->getId());
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
spdlog::info("{}: {}", it.first, it.second);
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
std::cerr << "Closed connection"
<< " code " << msg->closeInfo.code << " reason "
<< msg->closeInfo.reason << std::endl;
webSocket->close(msg->closeInfo.code, msg->closeInfo.reason);
spdlog::info("Closed remote server connection: client id {} code {} reason {}",
state->getId(),
msg->closeInfo.code,
msg->closeInfo.reason);
state->setTerminated();
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
std::stringstream ss;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str();
spdlog::error("Connection error: {}", msg->errorInfo.reason);
spdlog::error("#retries: {}", msg->errorInfo.retries);
spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time);
spdlog::error("HTTP Status: {}", msg->errorInfo.http_status);
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
std::cerr << "Received " << msg->wireSize << " bytes from server" << std::endl;
spdlog::info("Received {} bytes from server", msg->wireSize);
if (verbose)
{
std::cerr << "payload " << msg->str << std::endl;
spdlog::info("payload {}", msg->str);
}
webSocket->send(msg->str, msg->binary);
@ -107,53 +105,54 @@ namespace ix
const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "client id: " << state->getId() << std::endl;
std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
spdlog::info("New connection from client");
spdlog::info("id: {}", state->getId());
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
spdlog::info("{}: {}", it.first, it.second);
}
// Connect to the 'real' server
std::string url(remoteUrl);
url += msg->openInfo.uri;
state->webSocket().setUrl(url);
state->webSocket().disableAutomaticReconnection();
state->webSocket().start();
// we should sleep here for a bit until we've established the
// connection with the remote server
while (state->webSocket().getReadyState() != ReadyState::Open)
{
std::cerr << "waiting for server connection establishment" << std::endl;
spdlog::info("waiting for server connection establishment");
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
std::cerr << "server connection established" << std::endl;
spdlog::info("server connection established");
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
std::cerr << "Closed connection"
<< " code " << msg->closeInfo.code << " reason "
<< msg->closeInfo.reason << std::endl;
spdlog::info("Closed client connection: client id {} code {} reason {}",
state->getId(),
msg->closeInfo.code,
msg->closeInfo.reason);
state->webSocket().close(msg->closeInfo.code, msg->closeInfo.reason);
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
std::stringstream ss;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str();
spdlog::error("Connection error: {}", msg->errorInfo.reason);
spdlog::error("#retries: {}", msg->errorInfo.retries);
spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time);
spdlog::error("HTTP Status: {}", msg->errorInfo.http_status);
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
std::cerr << "Received " << msg->wireSize << " bytes from client" << std::endl;
spdlog::info("Received {} bytes from client", msg->wireSize);
if (verbose)
{
std::cerr << "payload " << msg->str << std::endl;
spdlog::info("payload {}", msg->str);
}
state->webSocket().send(msg->str, msg->binary);
}
});
@ -162,7 +161,7 @@ namespace ix
auto res = server.listen();
if (!res.first)
{
std::cerr << res.second << std::endl;
spdlog::info(res.second);
return 1;
}

View File

@ -7,7 +7,6 @@
#include <chrono>
#include <condition_variable>
#include <fstream>
#include <iostream>
#include <ixcrypto/IXBase64.h>
#include <ixcrypto/IXHash.h>
#include <ixcrypto/IXUuid.h>
@ -15,6 +14,7 @@
#include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocket.h>
#include <msgpack11/msgpack11.hpp>
#include <spdlog/spdlog.h>
#include <mutex>
#include <sstream>
#include <vector>
@ -75,12 +75,12 @@ namespace ix
void WebSocketReceiver::log(const std::string& msg)
{
std::cout << msg << std::endl;
spdlog::info(msg);
}
void WebSocketReceiver::waitForConnection()
{
std::cout << "ws_receive: Connecting..." << std::endl;
spdlog::info("{}: Connecting...", "ws_receive");
std::unique_lock<std::mutex> lock(_conditionVariableMutex);
_condition.wait(lock);
@ -88,7 +88,7 @@ namespace ix
void WebSocketReceiver::waitForMessage()
{
std::cout << "ws_receive: Waiting for message..." << std::endl;
spdlog::info("{}: Waiting for message...", "ws_receive");
std::unique_lock<std::mutex> lock(_conditionVariableMutex);
_condition.wait(lock);
@ -124,7 +124,7 @@ namespace ix
void WebSocketReceiver::handleMessage(const std::string& str)
{
std::cerr << "ws_receive: Received message: " << str.size() << std::endl;
spdlog::info("ws_receive: Received message: {}", str.size());
std::string errMsg;
MsgPack data = MsgPack::parse(str, errMsg);
@ -134,17 +134,17 @@ namespace ix
return;
}
std::cout << "id: " << data["id"].string_value() << std::endl;
spdlog::info("id: {}", data["id"].string_value());
std::vector<uint8_t> content = data["content"].binary_items();
std::cout << "ws_receive: Content size: " << content.size() << std::endl;
spdlog::info("ws_receive: Content size: {}", content.size());
// Validate checksum
uint64_t cksum = ix::djb2Hash(content);
auto cksumRef = data["djb2_hash"].string_value();
std::cout << "ws_receive: Computed hash: " << cksum << std::endl;
std::cout << "ws_receive: Reference hash: " << cksumRef << std::endl;
spdlog::info("ws_receive: Computed hash: {}", cksum);
spdlog::info("ws_receive: Reference hash: {}", cksumRef);
if (std::to_string(cksum) != cksumRef)
{
@ -157,12 +157,12 @@ namespace ix
std::string filenameTmp = filename + ".tmp";
std::cout << "ws_receive: Writing to disk: " << filenameTmp << std::endl;
spdlog::info("ws_receive: Writing to disk: {}", filenameTmp);
std::ofstream out(filenameTmp);
out.write((char*) &content.front(), content.size());
out.close();
std::cout << "ws_receive: Renaming " << filenameTmp << " to " << filename << std::endl;
spdlog::info("ws_receive: Renaming {} to {}", filenameTmp, filename);
rename(filenameTmp.c_str(), filename.c_str());
std::map<MsgPack, MsgPack> pdu;
@ -170,7 +170,7 @@ namespace ix
pdu["id"] = data["id"];
pdu["filename"] = data["filename"];
std::cout << "Sending ack to sender" << std::endl;
spdlog::info("Sending ack to sender");
MsgPack msg(pdu);
_webSocket.sendBinary(msg.dump());
}
@ -192,11 +192,11 @@ namespace ix
_condition.notify_one();
log("ws_receive: connected");
std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl;
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
spdlog::info("{}: {}", it.first, it.second);
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
@ -259,7 +259,7 @@ namespace ix
std::chrono::duration<double, std::milli> duration(1000);
std::this_thread::sleep_for(duration);
std::cout << "ws_receive: Done !" << std::endl;
spdlog::info("ws_receive: Done !");
webSocketReceiver.stop();
}

View File

@ -4,8 +4,8 @@
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <ixsnake/IXRedisClient.h>
#include <spdlog/spdlog.h>
#include <sstream>
namespace ix
@ -20,7 +20,7 @@ namespace ix
RedisClient redisClient;
if (!redisClient.connect(hostname, port))
{
std::cerr << "Cannot connect to redis host" << std::endl;
spdlog::info("Cannot connect to redis host");
return 1;
}
@ -30,10 +30,10 @@ namespace ix
if (!redisClient.auth(password, authResponse))
{
std::stringstream ss;
std::cerr << "Cannot authenticated to redis" << std::endl;
spdlog::info("Cannot authenticated to redis");
return 1;
}
std::cout << "Auth response: " << authResponse << ":" << port << std::endl;
spdlog::info("Auth response: {}", authResponse);
}
std::string errMsg;
@ -41,8 +41,7 @@ namespace ix
{
if (!redisClient.publish(channel, message, errMsg))
{
std::cerr << "Error publishing to channel " << channel << "error: " << errMsg
<< std::endl;
spdlog::error("Error publishing to channel {} error {}", channel, errMsg);
return 1;
}
}

View File

@ -4,7 +4,6 @@
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <ixsnake/IXRedisServer.h>
#include <spdlog/spdlog.h>
#include <sstream>
@ -20,7 +19,7 @@ namespace ix
auto res = server.listen();
if (!res.first)
{
std::cerr << res.second << std::endl;
spdlog::info(res.second);
return 1;
}

View File

@ -6,8 +6,8 @@
#include <atomic>
#include <chrono>
#include <iostream>
#include <ixsnake/IXRedisClient.h>
#include <spdlog/spdlog.h>
#include <sstream>
#include <thread>
@ -22,7 +22,7 @@ namespace ix
RedisClient redisClient;
if (!redisClient.connect(hostname, port))
{
std::cerr << "Cannot connect to redis host" << std::endl;
spdlog::info("Cannot connect to redis host");
return 1;
}
@ -32,10 +32,10 @@ namespace ix
if (!redisClient.auth(password, authResponse))
{
std::stringstream ss;
std::cerr << "Cannot authenticated to redis" << std::endl;
spdlog::info("Cannot authenticated to redis");
return 1;
}
std::cout << "Auth response: " << authResponse << ":" << port << std::endl;
spdlog::info("Auth response: {}", authResponse);
}
std::atomic<int> msgPerSeconds(0);
@ -44,7 +44,7 @@ namespace ix
auto callback = [&msgPerSeconds, &msgCount, verbose](const std::string& message) {
if (verbose)
{
std::cout << "received: " << message << std::endl;
spdlog::info("recived: {}", message);
}
msgPerSeconds++;
@ -52,14 +52,13 @@ namespace ix
};
auto responseCallback = [](const std::string& redisResponse) {
std::cout << "Redis subscribe response: " << redisResponse << std::endl;
spdlog::info("Redis subscribe response: {}", redisResponse);
};
auto timer = [&msgPerSeconds, &msgCount] {
while (true)
{
std::cout << "#messages " << msgCount << " "
<< "msg/s " << msgPerSeconds << std::endl;
spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds);
msgPerSeconds = 0;
auto duration = std::chrono::seconds(1);
@ -69,10 +68,10 @@ namespace ix
std::thread t(timer);
std::cerr << "Subscribing to " << channel << "..." << std::endl;
spdlog::info("Subscribing to {} ...", channel);
if (!redisClient.subscribe(channel, responseCallback, callback))
{
std::cerr << "Error subscribing to channel " << channel << std::endl;
spdlog::info("Error subscribing to channel {}", channel);
return 1;
}

View File

@ -7,7 +7,6 @@
#include <chrono>
#include <condition_variable>
#include <fstream>
#include <iostream>
#include <ixcrypto/IXBase64.h>
#include <ixcrypto/IXHash.h>
#include <ixcrypto/IXUuid.h>
@ -15,6 +14,7 @@
#include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocket.h>
#include <msgpack11/msgpack11.hpp>
#include <spdlog/spdlog.h>
#include <mutex>
#include <sstream>
#include <vector>
@ -68,12 +68,12 @@ namespace ix
void WebSocketSender::log(const std::string& msg)
{
std::cout << msg << std::endl;
spdlog::info(msg);
}
void WebSocketSender::waitForConnection()
{
std::cout << "ws_send: Connecting..." << std::endl;
spdlog::info("{}: Connecting...", "ws_send");
std::unique_lock<std::mutex> lock(_conditionVariableMutex);
_condition.wait(lock);
@ -81,7 +81,7 @@ namespace ix
void WebSocketSender::waitForAck()
{
std::cout << "ws_send: Waiting for ack..." << std::endl;
spdlog::info("{}: Waiting for ack...", "ws_send");
std::unique_lock<std::mutex> lock(_conditionVariableMutex);
_condition.wait(lock);
@ -122,11 +122,11 @@ namespace ix
_condition.notify_one();
log("ws_send: connected");
std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl;
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
spdlog::info("{}: {}", it.first, it.second);
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
@ -147,14 +147,14 @@ namespace ix
MsgPack data = MsgPack::parse(msg->str, errMsg);
if (!errMsg.empty())
{
std::cerr << "Invalid MsgPack response" << std::endl;
spdlog::info("Invalid MsgPack response");
return;
}
std::string id = data["id"].string_value();
if (_id != id)
{
std::cerr << "Invalid id" << std::endl;
spdlog::info("Invalid id");
}
}
else if (msg->type == ix::WebSocketMessageType::Error)
@ -201,7 +201,7 @@ namespace ix
auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(now - _start);
_ms = milliseconds.count();
std::cout << _description << " completed in " << _ms << "ms" << std::endl;
spdlog::info("{} completed in {}", _description, _ms);
_reported = true;
}
@ -240,7 +240,7 @@ namespace ix
Bench bench("Sending file through websocket");
_webSocket.sendBinary(msg.dump(), [throttle](int current, int total) -> bool {
std::cout << "ws_send: Step " << current << " out of " << total << std::endl;
spdlog::info("ws_send: Step {} out of {}", current, total);
if (throttle)
{
@ -254,7 +254,7 @@ namespace ix
do
{
size_t bufferedAmount = _webSocket.bufferedAmount();
std::cout << "ws_send: " << bufferedAmount << " bytes left to be sent" << std::endl;
spdlog::info("ws_send: {} bytes left to be sent", bufferedAmount);
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
@ -264,7 +264,7 @@ namespace ix
auto duration = bench.getDuration();
auto transferRate = 1000 * content.size() / duration;
transferRate /= (1024 * 1024);
std::cout << "ws_send: Send transfer rate: " << transferRate << "MB/s" << std::endl;
spdlog::info("ws_send: Send transfer rate: {} MB/s", transferRate);
}
void wsSend(const std::string& url,
@ -278,12 +278,12 @@ namespace ix
webSocketSender.waitForConnection();
std::cout << "ws_send: Sending..." << std::endl;
spdlog::info("ws_send: Sending...");
webSocketSender.sendMessage(path, throttle);
webSocketSender.waitForAck();
std::cout << "ws_send: Done !" << std::endl;
spdlog::info("ws_send: Done !");
webSocketSender.stop();
}

View File

@ -5,8 +5,8 @@
*/
#include <fstream>
#include <iostream>
#include <ixsnake/IXSnakeServer.h>
#include <spdlog/spdlog.h>
#include <sstream>
namespace
@ -58,11 +58,11 @@ namespace ix
auto str = readAsString(appsConfigPath);
if (str.empty())
{
std::cout << "Cannot read content of " << appsConfigPath << std::endl;
spdlog::error("Cannot read content of {}", appsConfigPath);
return 1;
}
std::cout << str << std::endl;
spdlog::error(str);
auto apps = nlohmann::json::parse(str);
appConfig.apps = apps["apps"];

View File

@ -4,8 +4,8 @@
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <ixwebsocket/IXWebSocketServer.h>
#include <spdlog/spdlog.h>
#include <sstream>
namespace ix
@ -14,7 +14,7 @@ namespace ix
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions)
{
std::cout << "ws_transfer: Listening on " << hostname << ":" << port << std::endl;
spdlog::info("Listening on {}:{}", hostname, port);
ix::WebSocketServer server(port, hostname);
server.setTLSOptions(tlsOptions);
@ -25,22 +25,23 @@ namespace ix
const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
std::cerr << "ws_transfer: New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
spdlog::info("ws_transfer: New connection");
spdlog::info("id: {}", connectionState->getId());
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
spdlog::info("{}: {}", it.first, it.second);
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
std::cerr << "ws_transfer: [client " << connectionState->getId()
<< "]: Closed connection, code " << msg->closeInfo.code << " reason "
<< msg->closeInfo.reason << std::endl;
spdlog::info("ws_transfer: Closed connection: client id {} code {} reason {}",
connectionState->getId(),
msg->closeInfo.code,
msg->closeInfo.reason);
auto remaining = server.getClients().erase(webSocket);
std::cerr << "ws_transfer: " << remaining << " remaining clients " << std::endl;
spdlog::info("ws_transfer: {} remaining clients", remaining);
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
@ -49,40 +50,39 @@ namespace ix
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str();
spdlog::info(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Fragment)
{
std::cerr << "ws_transfer: Received message fragment " << std::endl;
spdlog::info("ws_transfer: Received message fragment ");
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
std::cerr << "ws_transfer: Received " << msg->wireSize << " bytes" << std::endl;
spdlog::info("ws_transfer: Received {} bytes", msg->wireSize);
size_t receivers = 0;
for (auto&& client : server.getClients())
{
if (client != webSocket)
{
auto readyState = client->getReadyState();
auto id = connectionState->getId();
if (readyState == ReadyState::Open)
{
++receivers;
client->send(msg->str,
msg->binary,
[id = connectionState->getId()](int current,
int total) -> bool {
std::cerr << "ws_transfer: [client " << id
<< "]: Step " << current << " out of "
<< total << std::endl;
[&id](int current, int total) -> bool {
spdlog::info("{}: [client {}]: Step {} out of {}",
"ws_transfer", id, current, total);
return true;
});
do
{
size_t bufferedAmount = client->bufferedAmount();
std::cerr << "ws_transfer: [client " << connectionState->getId()
<< "]: " << bufferedAmount
<< " bytes left to be sent, " << std::endl;
spdlog::info("{}: [client {}]: {} bytes left to send",
"ws_transfer", id, bufferedAmount);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
@ -96,16 +96,15 @@ namespace ix
? "Connecting"
: readyState == ReadyState::Closing ? "Closing" : "Closed";
size_t bufferedAmount = client->bufferedAmount();
std::cerr << "ws_transfer: [client " << connectionState->getId()
<< "]: has readystate '" << readyStateString << "' and "
<< bufferedAmount << " bytes left to be sent, "
<< std::endl;
spdlog::info("{}: [client {}]: has readystate {} bytes left to be sent",
"ws_transfer", id, readyStateString, bufferedAmount);
}
}
}
if (!receivers)
{
std::cerr << "ws_transfer: no remaining receivers" << std::endl;
spdlog::info("ws_transfer: no remaining receivers");
}
}
});
@ -114,7 +113,7 @@ namespace ix
auto res = server.listen();
if (!res.first)
{
std::cerr << res.second << std::endl;
spdlog::info(res.second);
return 1;
}