Compare commits

...

23 Commits

Author SHA1 Message Date
c4e9abfe80 (ws cobra to sentry) bound the queue size used to hold up cobra messages before they are sent to sentry. Default queue size is a 100 messages. Without such limit the program runs out of memory when a subscriber receive a lot of messages that cannot make it to sentry 2019-12-25 22:15:57 -08:00
a805270d02 (ws client) use correct compilation defines so that spdlog is not used as a header only library (reduce binary size and increase compilation speed) 2019-12-25 09:03:57 -08:00
e13b57c73b (ws client) all commands use spdlog instead of std::cerr or std::cout for logging 2019-12-24 21:55:34 -08:00
5be84926ef (cobra client) send a websocket ping every 30s to keep the connection opened 2019-12-24 17:16:41 -08:00
33e7271b85 (tls / apple) minor refactoring, move functions out of the anonymous namespace to become static member functions 2019-12-23 16:30:38 -08:00
d72e5e70f6 socket tls options: display ciphers 2019-12-23 12:25:25 -08:00
e2c5f751bd (doc) fix typo 2019-12-22 20:33:14 -08:00
351b86e266 v7.6.4 2019-12-22 20:32:10 -08:00
d0cbff4f4e (client) error handling, quote url in error case when failing to parse on 2019-12-22 20:30:29 -08:00
cbfc9b9f94 (ws) ws_cobra_publish: register callbacks before connecting 2019-12-22 20:29:37 -08:00
ca816d801f (doc) mention mbedtls in supported ssl server backend 2019-12-22 20:28:44 -08:00
2f354d31eb update gitignore file 2019-12-20 15:21:36 -08:00
2c6c1edd37 (tls) add a simple description of the TLS configuration routine for debugging 2019-12-20 15:18:04 -08:00
9799e7e84b (mbedtls) correct support for using own certificate and private key 2019-12-20 15:13:26 -08:00
81be970679 (ws commands) in websocket proxy, disable automatic reconnections + in Dockerfile, use alpine 3.11 2019-12-20 09:51:21 -08:00
52221906f6 (cobra) Add TLS options to all cobra commands and classes. Add example to the doc. 2019-12-19 20:49:28 -08:00
3e786fe23a formatting 2019-12-19 19:13:55 -08:00
de24aac7d5 (cobra-to-sentry) capture application version from device field 2019-12-18 15:41:59 -08:00
cd4b0ccf6f IXSentryClient: remove duplicated line 2019-12-18 15:29:53 -08:00
4e1888ac19 (tls) Experimental TLS server support with mbedtls (windows) + process cert tlsoption (client + server) 2019-12-18 11:51:02 -08:00
237ede56aa (tls servers) Make it clear that apple ssl and mbedtls backends do not support SSL in server mode 2019-12-18 10:43:05 -08:00
ba3b1c1a0f (tls options client) TLSOptions struct _validated member should be initialized to false 2019-12-17 14:10:28 -08:00
c60c606e0f (websocket client) improve the error message when connecting to a non websocket server 2019-12-16 17:57:43 -08:00
53 changed files with 807 additions and 380 deletions

4
.gitignore vendored
View File

@ -1,3 +1,7 @@
build build
*.pyc *.pyc
venv venv
ixsnake/ixsnake/.certs/
site/
ws/.certs/
ws/.srl

View File

@ -1,4 +1,4 @@
FROM alpine:edge as build FROM alpine:3.11 as build
RUN apk add --no-cache gcc g++ musl-dev linux-headers cmake openssl-dev RUN apk add --no-cache gcc g++ musl-dev linux-headers cmake openssl-dev
RUN apk add --no-cache make RUN apk add --no-cache make
@ -16,7 +16,7 @@ WORKDIR /opt
USER app USER app
RUN [ "make", "ws_install" ] RUN [ "make", "ws_install" ]
FROM alpine:edge as runtime FROM alpine:3.11 as runtime
RUN apk add --no-cache libstdc++ RUN apk add --no-cache libstdc++
RUN apk add --no-cache strace RUN apk add --no-cache strace

View File

@ -1,5 +1,75 @@
# Changelog # Changelog
All notable changes to this project will be documented in this file. All changes to this project will be documented in this file.
## [7.8.2] - 2019-12-25
(ws cobra to sentry) bound the queue size used to hold up cobra messages before they are sent to sentry. Default queue size is a 100 messages. Without such limit the program runs out of memory when a subscriber receive a lot of messages that cannot make it to sentry
## [7.8.1] - 2019-12-25
(ws client) use correct compilation defines so that spdlog is not used as a header only library (reduce binary size and increase compilation speed)
## [7.8.0] - 2019-12-24
(ws client) all commands use spdlog instead of std::cerr or std::cout for logging
## [7.6.5] - 2019-12-24
(cobra client) send a websocket ping every 30s to keep the connection opened
## [7.6.4] - 2019-12-22
(client) error handling, quote url in error case when failing to parse one
(ws) ws_cobra_publish: register callbacks before connecting
(doc) mention mbedtls in supported ssl server backend
## [7.6.3] - 2019-12-20
(tls) add a simple description of the TLS configuration routine for debugging
## [7.6.2] - 2019-12-20
(mbedtls) correct support for using own certificate and private key
## [7.6.1] - 2019-12-20
(ws commands) in websocket proxy, disable automatic reconnections + in Dockerfile, use alpine 3.11
## [7.6.0] - 2019-12-19
(cobra) Add TLS options to all cobra commands and classes. Add example to the doc.
## [7.5.8] - 2019-12-18
(cobra-to-sentry) capture application version from device field
## [7.5.7] - 2019-12-18
(tls) Experimental TLS server support with mbedtls (windows) + process cert tlsoption (client + server)
## [7.5.6] - 2019-12-18
(tls servers) Make it clear that apple ssl and mbedtls backends do not support SSL in server mode
## [7.5.5] - 2019-12-17
(tls options client) TLSOptions struct _validated member should be initialized to false
## [7.5.4] - 2019-12-16
(websocket client) improve the error message when connecting to a non websocket server
Before:
```
Connection error: Got bad status connecting to example.com:443, status: 200, HTTP Status line: HTTP/1.1 200 OK
```
After:
```
Connection error: Expecting status 101 (Switching Protocol), got 200 status connecting to example.com:443, HTTP Status line: HTTP/1.1 200 OK
```
## [7.5.3] - 2019-12-12 ## [7.5.3] - 2019-12-12

View File

@ -243,6 +243,127 @@ Options:
--transfer-timeout INT Transfer timeout --transfer-timeout INT Transfer timeout
``` ```
## Cobra Client ## Cobra client and server
[cobra](https://github.com/machinezone/cobra) is a real time messenging server. ws has a sub-command to interact with cobra. [cobra](https://github.com/machinezone/cobra) is a real time messenging server. ws has several sub-command to interact with cobra. There is also a minimal cobra compatible server named snake available.
Below are examples on running a snake server and clients with TLS enabled (the server only works with the OpenSSL and the Mbed TLS backend for now).
First, generate certificates.
```
$ cd /path/to/IXWebSocket
$ cd ixsnake/ixsnake
$ bash ../../ws/generate_certs.sh
Generating RSA private key, 2048 bit long modulus
.....+++
.................+++
e is 65537 (0x10001)
generated ./.certs/trusted-ca-key.pem
generated ./.certs/trusted-ca-crt.pem
Generating RSA private key, 2048 bit long modulus
..+++
.......................................+++
e is 65537 (0x10001)
generated ./.certs/trusted-server-key.pem
Signature ok
subject=/O=machinezone/O=IXWebSocket/CN=trusted-server
Getting CA Private Key
generated ./.certs/trusted-server-crt.pem
Generating RSA private key, 2048 bit long modulus
...................................+++
..................................................+++
e is 65537 (0x10001)
generated ./.certs/trusted-client-key.pem
Signature ok
subject=/O=machinezone/O=IXWebSocket/CN=trusted-client
Getting CA Private Key
generated ./.certs/trusted-client-crt.pem
Generating RSA private key, 2048 bit long modulus
..............+++
.......................................+++
e is 65537 (0x10001)
generated ./.certs/untrusted-ca-key.pem
generated ./.certs/untrusted-ca-crt.pem
Generating RSA private key, 2048 bit long modulus
..........+++
................................................+++
e is 65537 (0x10001)
generated ./.certs/untrusted-client-key.pem
Signature ok
subject=/O=machinezone/O=IXWebSocket/CN=untrusted-client
Getting CA Private Key
generated ./.certs/untrusted-client-crt.pem
Generating RSA private key, 2048 bit long modulus
.....................................................................................+++
...........+++
e is 65537 (0x10001)
generated ./.certs/selfsigned-client-key.pem
Signature ok
subject=/O=machinezone/O=IXWebSocket/CN=selfsigned-client
Getting Private key
generated ./.certs/selfsigned-client-crt.pem
```
Now run the snake server.
```
$ export certs=.certs
$ ws snake --tls --port 8765 --cert-file ${certs}/trusted-server-crt.pem --key-file ${certs}/trusted-server-key.pem --ca-file ${certs}/trusted-ca-crt.pem
{
"apps": {
"FC2F10139A2BAc53BB72D9db967b024f": {
"roles": {
"_sub": {
"secret": "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba"
},
"_pub": {
"secret": "1c04DB8fFe76A4EeFE3E318C72d771db"
}
}
}
}
}
redis host: 127.0.0.1
redis password:
redis port: 6379
```
As a new connection comes in, such output should be printed
```
[2019-12-19 20:27:19.724] [info] New connection
id: 0
Uri: /v2?appkey=_health
Headers:
Connection: Upgrade
Host: 127.0.0.1:8765
Sec-WebSocket-Extensions: permessage-deflate; server_max_window_bits=15; client_max_window_bits=15
Sec-WebSocket-Key: d747B0fE61Db73f7Eh47c0==
Sec-WebSocket-Protocol: json
Sec-WebSocket-Version: 13
Upgrade: websocket
User-Agent: ixwebsocket/7.5.8 macos ssl/OpenSSL OpenSSL 1.0.2q 20 Nov 2018 zlib 1.2.11
```
To connect and publish a message, do:
```
$ export certs=.certs
$ cd /path/to/ws/folder
$ ls cobraMetricsSample.json
cobraMetricsSample.json
$ ws cobra_publish --endpoint wss://127.0.0.1:8765 --appkey FC2F10139A2BAc53BB72D9db967b024f --rolename _pub --rolesecret 1c04DB8fFe76A4EeFE3E318C72d771db --channel foo --cert-file ${certs}/trusted-client-crt.pem --key-file ${certs}/trusted-client-key.pem --ca-file ${certs}/trusted-ca-crt.pem cobraMetricsSample.json
[2019-12-19 20:46:42.656] [info] Publisher connected
[2019-12-19 20:46:42.657] [info] Connection: Upgrade
[2019-12-19 20:46:42.657] [info] Sec-WebSocket-Accept: rs99IFThoBrhSg+k8G4ixH9yaq4=
[2019-12-19 20:46:42.657] [info] Sec-WebSocket-Extensions: permessage-deflate; server_max_window_bits=15; client_max_window_bits=15
[2019-12-19 20:46:42.657] [info] Server: ixwebsocket/7.5.8 macos ssl/OpenSSL OpenSSL 1.0.2q 20 Nov 2018 zlib 1.2.11
[2019-12-19 20:46:42.657] [info] Upgrade: websocket
[2019-12-19 20:46:42.658] [info] Publisher authenticated
[2019-12-19 20:46:42.658] [info] Published msg 3
[2019-12-19 20:46:42.659] [info] Published message id 3 acked
```
To use OpenSSL on macOS, compile with `make ws_openssl`. First you will have to install OpenSSL libraries, which can be done with Homebrew. Use `make ws_mbedtls` accordingly to use MbedTLS.

View File

@ -7,6 +7,7 @@
#include "IXCobraConnection.h" #include "IXCobraConnection.h"
#include <ixcrypto/IXHMac.h> #include <ixcrypto/IXHMac.h>
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <ixwebsocket/IXSocketTLSOptions.h>
#include <algorithm> #include <algorithm>
#include <stdexcept> #include <stdexcept>
@ -23,6 +24,7 @@ namespace ix
PublishTrackerCallback CobraConnection::_publishTrackerCallback = nullptr; PublishTrackerCallback CobraConnection::_publishTrackerCallback = nullptr;
constexpr size_t CobraConnection::kQueueMaxSize; constexpr size_t CobraConnection::kQueueMaxSize;
constexpr CobraConnection::MsgId CobraConnection::kInvalidMsgId; constexpr CobraConnection::MsgId CobraConnection::kInvalidMsgId;
constexpr int CobraConnection::kPingIntervalSecs;
CobraConnection::CobraConnection() : CobraConnection::CobraConnection() :
_webSocket(new WebSocket()), _webSocket(new WebSocket()),
@ -227,6 +229,10 @@ namespace ix
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
invokeErrorCallback(ss.str(), std::string()); invokeErrorCallback(ss.str(), std::string());
} }
else if (msg->type == ix::WebSocketMessageType::Pong)
{
invokeEventCallback(ix::CobraConnection_EventType_Pong);
}
}); });
} }
@ -244,7 +250,8 @@ namespace ix
const std::string& endpoint, const std::string& endpoint,
const std::string& rolename, const std::string& rolename,
const std::string& rolesecret, const std::string& rolesecret,
const WebSocketPerMessageDeflateOptions& webSocketPerMessageDeflateOptions) const WebSocketPerMessageDeflateOptions& webSocketPerMessageDeflateOptions,
const SocketTLSOptions& socketTLSOptions)
{ {
_roleName = rolename; _roleName = rolename;
_roleSecret = rolesecret; _roleSecret = rolesecret;
@ -257,6 +264,8 @@ namespace ix
std::string url = ss.str(); std::string url = ss.str();
_webSocket->setUrl(url); _webSocket->setUrl(url);
_webSocket->setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions); _webSocket->setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions);
_webSocket->setTLSOptions(socketTLSOptions);
_webSocket->setPingInterval(kPingIntervalSecs);
} }
// //

View File

@ -20,6 +20,7 @@
namespace ix namespace ix
{ {
class WebSocket; class WebSocket;
struct SocketTLSOptions;
enum CobraConnectionEventType enum CobraConnectionEventType
{ {
@ -29,7 +30,8 @@ namespace ix
CobraConnection_EventType_Closed = 3, CobraConnection_EventType_Closed = 3,
CobraConnection_EventType_Subscribed = 4, CobraConnection_EventType_Subscribed = 4,
CobraConnection_EventType_UnSubscribed = 5, CobraConnection_EventType_UnSubscribed = 5,
CobraConnection_EventType_Published = 6 CobraConnection_EventType_Published = 6,
CobraConnection_EventType_Pong = 7
}; };
enum CobraConnectionPublishMode enum CobraConnectionPublishMode
@ -62,7 +64,8 @@ namespace ix
const std::string& endpoint, const std::string& endpoint,
const std::string& rolename, const std::string& rolename,
const std::string& rolesecret, const std::string& rolesecret,
const WebSocketPerMessageDeflateOptions& webSocketPerMessageDeflateOptions); const WebSocketPerMessageDeflateOptions& webSocketPerMessageDeflateOptions,
const SocketTLSOptions& socketTLSOptions);
/// Set the traffic tracker callback /// Set the traffic tracker callback
static void setTrafficTrackerCallback(const TrafficTrackerCallback& callback); static void setTrafficTrackerCallback(const TrafficTrackerCallback& callback);
@ -213,6 +216,9 @@ namespace ix
// Each pdu sent should have an incremental unique id // Each pdu sent should have an incremental unique id
std::atomic<uint64_t> _id; std::atomic<uint64_t> _id;
// Frequency at which we send a websocket ping to the backing cobra connection
static constexpr int kPingIntervalSecs = 30;
}; };
} // namespace ix } // namespace ix

View File

@ -5,6 +5,7 @@
*/ */
#include "IXCobraMetricsPublisher.h" #include "IXCobraMetricsPublisher.h"
#include <ixwebsocket/IXSocketTLSOptions.h>
#include <algorithm> #include <algorithm>
#include <stdexcept> #include <stdexcept>
@ -31,14 +32,15 @@ namespace ix
const std::string& channel, const std::string& channel,
const std::string& rolename, const std::string& rolename,
const std::string& rolesecret, const std::string& rolesecret,
bool enablePerMessageDeflate) bool enablePerMessageDeflate,
const SocketTLSOptions& socketTLSOptions)
{ {
// Configure the satori connection and start its publish background thread // Configure the satori connection and start its publish background thread
_cobra_metrics_theaded_publisher.start(); _cobra_metrics_theaded_publisher.start();
_cobra_metrics_theaded_publisher.configure(appkey, endpoint, channel, _cobra_metrics_theaded_publisher.configure(appkey, endpoint, channel,
rolename, rolesecret, rolename, rolesecret,
enablePerMessageDeflate); enablePerMessageDeflate, socketTLSOptions);
} }
Json::Value& CobraMetricsPublisher::getGenericAttributes() Json::Value& CobraMetricsPublisher::getGenericAttributes()

View File

@ -15,6 +15,8 @@
namespace ix namespace ix
{ {
struct SocketTLSOptions;
class CobraMetricsPublisher class CobraMetricsPublisher
{ {
public: public:
@ -43,7 +45,8 @@ namespace ix
const std::string& channel, const std::string& channel,
const std::string& rolename, const std::string& rolename,
const std::string& rolesecret, const std::string& rolesecret,
bool enablePerMessageDeflate); bool enablePerMessageDeflate,
const SocketTLSOptions& socketTLSOptions);
/// Setter for the list of blacklisted metrics ids. /// Setter for the list of blacklisted metrics ids.
/// That list is sorted internally for fast lookups /// That list is sorted internally for fast lookups

View File

@ -6,6 +6,7 @@
#include "IXCobraMetricsThreadedPublisher.h" #include "IXCobraMetricsThreadedPublisher.h"
#include <ixwebsocket/IXSetThreadName.h> #include <ixwebsocket/IXSetThreadName.h>
#include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixcore/utils/IXCoreLogger.h> #include <ixcore/utils/IXCoreLogger.h>
#include <algorithm> #include <algorithm>
@ -64,6 +65,10 @@ namespace ix
{ {
ss << "Published message " << msgId << " acked"; ss << "Published message " << msgId << " acked";
} }
else if (eventType == ix::CobraConnection_EventType_Pong)
{
ss << "Received websocket pong";
}
ix::IXCoreLogger::Log(ss.str().c_str()); ix::IXCoreLogger::Log(ss.str().c_str());
}); });
@ -92,14 +97,17 @@ namespace ix
const std::string& channel, const std::string& channel,
const std::string& rolename, const std::string& rolename,
const std::string& rolesecret, const std::string& rolesecret,
bool enablePerMessageDeflate) bool enablePerMessageDeflate,
const SocketTLSOptions& socketTLSOptions)
{ {
_channel = channel; _channel = channel;
ix::IXCoreLogger::Log(socketTLSOptions.getDescription().c_str());
ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(enablePerMessageDeflate); ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(enablePerMessageDeflate);
_cobra_connection.configure(appkey, endpoint, _cobra_connection.configure(appkey, endpoint,
rolename, rolesecret, rolename, rolesecret,
webSocketPerMessageDeflateOptions); webSocketPerMessageDeflateOptions, socketTLSOptions);
} }
void CobraMetricsThreadedPublisher::pushMessage(MessageKind messageKind) void CobraMetricsThreadedPublisher::pushMessage(MessageKind messageKind)

View File

@ -18,6 +18,8 @@
namespace ix namespace ix
{ {
struct SocketTLSOptions;
class CobraMetricsThreadedPublisher class CobraMetricsThreadedPublisher
{ {
public: public:
@ -30,7 +32,8 @@ namespace ix
const std::string& channel, const std::string& channel,
const std::string& rolename, const std::string& rolename,
const std::string& rolesecret, const std::string& rolesecret,
bool enablePerMessageDeflate); bool enablePerMessageDeflate,
const SocketTLSOptions& socketTLSOptions);
/// Start the worker thread, used for background publishing /// Start the worker thread, used for background publishing
void start(); void start();

View File

@ -182,7 +182,6 @@ namespace ix
Json::Value extra; Json::Value extra;
extra["cobra_event"] = msg; extra["cobra_event"] = msg;
extra["cobra_event"] = msg;
// Builtin tags // Builtin tags
Json::Value gameTag; Json::Value gameTag;
@ -200,6 +199,11 @@ namespace ix
environmentTag.append(msg["device"]["environment"]); environmentTag.append(msg["device"]["environment"]);
tags.append(environmentTag); tags.append(environmentTag);
Json::Value clientVersionTag;
clientVersionTag.append("client_version");
clientVersionTag.append(msg["device"]["app_version"]);
tags.append(clientVersionTag);
payload["tags"] = tags; payload["tags"] = tags;
return _jsonWriter.write(payload); return _jsonWriter.write(payload);

View File

@ -9,6 +9,7 @@
#include <nlohmann/json.hpp> #include <nlohmann/json.hpp>
#include <string> #include <string>
#include <vector> #include <vector>
#include <ixwebsocket/IXSocketTLSOptions.h>
namespace snake namespace snake
{ {
@ -26,6 +27,9 @@ namespace snake
// AppKeys // AppKeys
nlohmann::json apps; nlohmann::json apps;
// TLS options
ix::SocketTLSOptions socketTLSOptions;
// Misc // Misc
bool verbose; bool verbose;
}; };

View File

@ -20,7 +20,7 @@ namespace snake
: _appConfig(appConfig) : _appConfig(appConfig)
, _server(appConfig.port, appConfig.hostname) , _server(appConfig.port, appConfig.hostname)
{ {
; _server.setTLSOptions(appConfig.socketTLSOptions);
} }
// //

View File

@ -10,7 +10,6 @@
#include "IXSocketConnect.h" #include "IXSocketConnect.h"
#include "IXUserAgent.h" #include "IXUserAgent.h"
#include <fstream> #include <fstream>
#include <iostream>
#include <sstream> #include <sstream>
#include <vector> #include <vector>

View File

@ -24,9 +24,47 @@
#include <Security/SecureTransport.h> #include <Security/SecureTransport.h>
namespace namespace ix
{ {
OSStatus read_from_socket(SSLConnectionRef connection, void* data, size_t* len) SocketAppleSSL::SocketAppleSSL(const SocketTLSOptions& tlsOptions, int fd)
: Socket(fd)
, _sslContext(nullptr)
, _tlsOptions(tlsOptions)
{
;
}
SocketAppleSSL::~SocketAppleSSL()
{
SocketAppleSSL::close();
}
std::string SocketAppleSSL::getSSLErrorDescription(OSStatus status)
{
std::string errMsg("Unknown SSL error.");
CFErrorRef error = CFErrorCreate(kCFAllocatorDefault, kCFErrorDomainOSStatus, status, NULL);
if (error)
{
CFStringRef message = CFErrorCopyDescription(error);
if (message)
{
char localBuffer[128];
Boolean success;
success = CFStringGetCString(message, localBuffer, 128, kCFStringEncodingUTF8);
if (success)
{
errMsg = localBuffer;
}
CFRelease(message);
}
CFRelease(error);
}
return errMsg;
}
OSStatus SocketAppleSSL::readFromSocket(SSLConnectionRef connection, void* data, size_t* len)
{ {
int fd = (int) (long) connection; int fd = (int) (long) connection;
if (fd < 0) return errSSLInternal; if (fd < 0) return errSSLInternal;
@ -67,7 +105,7 @@ namespace
} }
} }
OSStatus write_to_socket(SSLConnectionRef connection, const void* data, size_t* len) OSStatus SocketAppleSSL::writeToSocket(SSLConnectionRef connection, const void* data, size_t* len)
{ {
int fd = (int) (long) connection; int fd = (int) (long) connection;
if (fd < 0) return errSSLInternal; if (fd < 0) return errSSLInternal;
@ -105,47 +143,11 @@ namespace
} }
} }
std::string getSSLErrorDescription(OSStatus status)
bool SocketAppleSSL::accept(std::string& errMsg)
{ {
std::string errMsg("Unknown SSL error."); errMsg = "TLS not supported yet in server mode with apple ssl backend";
return false;
CFErrorRef error = CFErrorCreate(kCFAllocatorDefault, kCFErrorDomainOSStatus, status, NULL);
if (error)
{
CFStringRef message = CFErrorCopyDescription(error);
if (message)
{
char localBuffer[128];
Boolean success;
success =
CFStringGetCString(message, localBuffer, 128, kCFStringEncodingUTF8);
if (success)
{
errMsg = localBuffer;
}
CFRelease(message);
}
CFRelease(error);
}
return errMsg;
}
} // anonymous namespace
namespace ix
{
SocketAppleSSL::SocketAppleSSL(const SocketTLSOptions& tlsOptions, int fd)
: Socket(fd)
, _sslContext(nullptr)
, _tlsOptions(tlsOptions)
{
;
}
SocketAppleSSL::~SocketAppleSSL()
{
SocketAppleSSL::close();
} }
// No wait support // No wait support
@ -163,7 +165,7 @@ namespace ix
_sslContext = SSLCreateContext(kCFAllocatorDefault, kSSLClientSide, kSSLStreamType); _sslContext = SSLCreateContext(kCFAllocatorDefault, kSSLClientSide, kSSLStreamType);
SSLSetIOFuncs(_sslContext, read_from_socket, write_to_socket); SSLSetIOFuncs(_sslContext, SocketAppleSSL::readFromSocket, SocketAppleSSL::writeToSocket);
SSLSetConnection(_sslContext, (SSLConnectionRef)(long) _sockfd); SSLSetConnection(_sslContext, (SSLConnectionRef)(long) _sockfd);
SSLSetProtocolVersionMin(_sslContext, kTLSProtocol12); SSLSetProtocolVersionMin(_sslContext, kTLSProtocol12);
SSLSetPeerDomainName(_sslContext, host.c_str(), host.size()); SSLSetPeerDomainName(_sslContext, host.c_str(), host.size());

View File

@ -21,6 +21,8 @@ namespace ix
SocketAppleSSL(const SocketTLSOptions& tlsOptions, int fd = -1); SocketAppleSSL(const SocketTLSOptions& tlsOptions, int fd = -1);
~SocketAppleSSL(); ~SocketAppleSSL();
virtual bool accept(std::string& errMsg) final;
virtual bool connect(const std::string& host, virtual bool connect(const std::string& host,
int port, int port,
std::string& errMsg, std::string& errMsg,
@ -32,6 +34,10 @@ namespace ix
virtual ssize_t recv(void* buffer, size_t length) final; virtual ssize_t recv(void* buffer, size_t length) final;
private: private:
static std::string getSSLErrorDescription(OSStatus status);
static OSStatus writeToSocket(SSLConnectionRef connection, const void* data, size_t* len);
static OSStatus readFromSocket(SSLConnectionRef connection, void* data, size_t* len);
SSLContextRef _sslContext; SSLContextRef _sslContext;
mutable std::mutex _mutex; // AppleSSL routines are not thread-safe mutable std::mutex _mutex; // AppleSSL routines are not thread-safe

View File

@ -38,9 +38,10 @@ namespace ix
mbedtls_ctr_drbg_init(&_ctr_drbg); mbedtls_ctr_drbg_init(&_ctr_drbg);
mbedtls_entropy_init(&_entropy); mbedtls_entropy_init(&_entropy);
mbedtls_x509_crt_init(&_cacert); mbedtls_x509_crt_init(&_cacert);
mbedtls_x509_crt_init(&_cert);
} }
bool SocketMbedTLS::init(const std::string& host, std::string& errMsg) bool SocketMbedTLS::init(const std::string& host, bool isClient, std::string& errMsg)
{ {
initMBedTLS(); initMBedTLS();
std::lock_guard<std::mutex> lock(_mutex); std::lock_guard<std::mutex> lock(_mutex);
@ -58,7 +59,7 @@ namespace ix
} }
if (mbedtls_ssl_config_defaults(&_conf, if (mbedtls_ssl_config_defaults(&_conf,
MBEDTLS_SSL_IS_CLIENT, (isClient) ? MBEDTLS_SSL_IS_CLIENT : MBEDTLS_SSL_IS_SERVER,
MBEDTLS_SSL_TRANSPORT_STREAM, MBEDTLS_SSL_TRANSPORT_STREAM,
MBEDTLS_SSL_PRESET_DEFAULT) != 0) MBEDTLS_SSL_PRESET_DEFAULT) != 0)
{ {
@ -68,13 +69,27 @@ namespace ix
mbedtls_ssl_conf_rng(&_conf, mbedtls_ctr_drbg_random, &_ctr_drbg); mbedtls_ssl_conf_rng(&_conf, mbedtls_ctr_drbg_random, &_ctr_drbg);
if (_tlsOptions.hasCertAndKey())
{
if (mbedtls_x509_crt_parse_file(&_cert, _tlsOptions.certFile.c_str()) < 0)
{
errMsg = "Cannot parse cert file '" + _tlsOptions.certFile + "'";
return false;
}
if (mbedtls_pk_parse_keyfile(&_pkey, _tlsOptions.keyFile.c_str(), "") < 0)
{
errMsg = "Cannot parse key file '" + _tlsOptions.keyFile + "'";
return false;
}
}
if (_tlsOptions.isPeerVerifyDisabled()) if (_tlsOptions.isPeerVerifyDisabled())
{ {
mbedtls_ssl_conf_authmode(&_conf, MBEDTLS_SSL_VERIFY_NONE); mbedtls_ssl_conf_authmode(&_conf, MBEDTLS_SSL_VERIFY_NONE);
} }
else else
{ {
mbedtls_ssl_conf_ca_chain(&_conf, &_cacert, NULL); mbedtls_ssl_conf_authmode(&_conf, MBEDTLS_SSL_VERIFY_REQUIRED);
// FIXME: should we call mbedtls_ssl_conf_verify ? // FIXME: should we call mbedtls_ssl_conf_verify ?
@ -87,7 +102,13 @@ namespace ix
errMsg = "Cannot parse CA file '" + _tlsOptions.caFile + "'"; errMsg = "Cannot parse CA file '" + _tlsOptions.caFile + "'";
return false; return false;
} }
mbedtls_ssl_conf_authmode(&_conf, MBEDTLS_SSL_VERIFY_REQUIRED);
mbedtls_ssl_conf_ca_chain(&_conf, &_cacert, NULL);
if (_tlsOptions.hasCertAndKey())
{
mbedtls_ssl_conf_own_cert(&_conf, &_cert, &_pkey);
}
} }
if (mbedtls_ssl_setup(&_ssl, &_conf) != 0) if (mbedtls_ssl_setup(&_ssl, &_conf) != 0)
@ -96,7 +117,7 @@ namespace ix
return false; return false;
} }
if (mbedtls_ssl_set_hostname(&_ssl, host.c_str()) != 0) if (!host.empty() && mbedtls_ssl_set_hostname(&_ssl, host.c_str()) != 0)
{ {
errMsg = "SNI setup failed"; errMsg = "SNI setup failed";
return false; return false;
@ -105,6 +126,50 @@ namespace ix
return true; return true;
} }
bool SocketMbedTLS::accept(std::string& errMsg)
{
bool isClient = false;
bool initialized = init(std::string(), isClient, errMsg);
if (!initialized)
{
close();
return false;
}
mbedtls_ssl_set_bio(&_ssl, &_sockfd, mbedtls_net_send, mbedtls_net_recv, NULL);
int res;
do
{
std::lock_guard<std::mutex> lock(_mutex);
res = mbedtls_ssl_handshake(&_ssl);
} while (res == MBEDTLS_ERR_SSL_WANT_READ || res == MBEDTLS_ERR_SSL_WANT_WRITE);
if (res != 0)
{
char buf[256];
mbedtls_strerror(res, buf, sizeof(buf));
errMsg = "error in handshake : ";
errMsg += buf;
if (res == MBEDTLS_ERR_X509_CERT_VERIFY_FAILED)
{
char verifyBuf[512];
uint32_t flags = mbedtls_ssl_get_verify_result(&_ssl);
mbedtls_x509_crt_verify_info(verifyBuf, sizeof(verifyBuf), " ! ", flags);
errMsg += " : ";
errMsg += verifyBuf;
}
close();
return false;
}
return true;
}
bool SocketMbedTLS::connect(const std::string& host, bool SocketMbedTLS::connect(const std::string& host,
int port, int port,
std::string& errMsg, std::string& errMsg,
@ -116,7 +181,8 @@ namespace ix
if (_sockfd == -1) return false; if (_sockfd == -1) return false;
} }
bool initialized = init(host, errMsg); bool isClient = true;
bool initialized = init(host, isClient, errMsg);
if (!initialized) if (!initialized)
{ {
close(); close();
@ -156,6 +222,7 @@ namespace ix
mbedtls_ctr_drbg_free(&_ctr_drbg); mbedtls_ctr_drbg_free(&_ctr_drbg);
mbedtls_entropy_free(&_entropy); mbedtls_entropy_free(&_entropy);
mbedtls_x509_crt_free(&_cacert); mbedtls_x509_crt_free(&_cacert);
mbedtls_x509_crt_free(&_cert);
Socket::close(); Socket::close();
} }

View File

@ -26,6 +26,8 @@ namespace ix
SocketMbedTLS(const SocketTLSOptions& tlsOptions, int fd = -1); SocketMbedTLS(const SocketTLSOptions& tlsOptions, int fd = -1);
~SocketMbedTLS(); ~SocketMbedTLS();
virtual bool accept(std::string& errMsg) final;
virtual bool connect(const std::string& host, virtual bool connect(const std::string& host,
int port, int port,
std::string& errMsg, std::string& errMsg,
@ -42,11 +44,13 @@ namespace ix
mbedtls_entropy_context _entropy; mbedtls_entropy_context _entropy;
mbedtls_ctr_drbg_context _ctr_drbg; mbedtls_ctr_drbg_context _ctr_drbg;
mbedtls_x509_crt _cacert; mbedtls_x509_crt _cacert;
mbedtls_x509_crt _cert;
mbedtls_pk_context _pkey;
std::mutex _mutex; std::mutex _mutex;
SocketTLSOptions _tlsOptions; SocketTLSOptions _tlsOptions;
bool init(const std::string& host, std::string& errMsg); bool init(const std::string& host, bool isClient, std::string& errMsg);
void initMBedTLS(); void initMBedTLS();
}; };

View File

@ -11,7 +11,7 @@
#include "IXSocketConnect.h" #include "IXSocketConnect.h"
#include "IXSocketFactory.h" #include "IXSocketFactory.h"
#include <assert.h> #include <assert.h>
#include <iostream> #include <stdio.h>
#include <sstream> #include <sstream>
#include <string.h> #include <string.h>
@ -45,13 +45,13 @@ namespace ix
void SocketServer::logError(const std::string& str) void SocketServer::logError(const std::string& str)
{ {
std::lock_guard<std::mutex> lock(_logMutex); std::lock_guard<std::mutex> lock(_logMutex);
std::cerr << str << std::endl; fprintf(stderr, "%s\n", str.c_str());
} }
void SocketServer::logInfo(const std::string& str) void SocketServer::logInfo(const std::string& str)
{ {
std::lock_guard<std::mutex> lock(_logMutex); std::lock_guard<std::mutex> lock(_logMutex);
std::cout << str << std::endl; fprintf(stdout, "%s\n", str.c_str());
} }
std::pair<bool, std::string> SocketServer::listen() std::pair<bool, std::string> SocketServer::listen()

View File

@ -8,6 +8,7 @@
#include <assert.h> #include <assert.h>
#include <fstream> #include <fstream>
#include <sstream>
namespace ix namespace ix
{ {
@ -71,4 +72,16 @@ namespace ix
{ {
return _errMsg; return _errMsg;
} }
std::string SocketTLSOptions::getDescription() const
{
std::stringstream ss;
ss << "TLS Options:" << std::endl;
ss << " certFile = " << certFile << std::endl;
ss << " keyFile = " << keyFile << std::endl;
ss << " caFile = " << caFile << std::endl;
ss << " ciphers = " << ciphers << std::endl;
ss << " ciphers = " << ciphers << std::endl;
return ss.str();
}
} // namespace ix } // namespace ix

View File

@ -43,8 +43,10 @@ namespace ix
const std::string& getErrorMsg() const; const std::string& getErrorMsg() const;
std::string getDescription() const;
private: private:
mutable std::string _errMsg; mutable std::string _errMsg;
mutable bool _validated; mutable bool _validated = false;
}; };
} // namespace ix } // namespace ix

View File

@ -178,8 +178,8 @@ namespace ix
if (status != 101) if (status != 101)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Got bad status connecting to " << host << ":" << port << ", status: " << status ss << "Expecting status 101 (Switching Protocol), got " << status
<< ", HTTP Status line: " << line; << " status connecting to " << host << ":" << port << ", HTTP Status line: " << line;
return WebSocketInitResult(false, status, ss.str()); return WebSocketInitResult(false, status, ss.str());
} }

View File

@ -144,7 +144,9 @@ namespace ix
if (!UrlParser::parse(url, protocol, host, path, query, port)) if (!UrlParser::parse(url, protocol, host, path, query, port))
{ {
return WebSocketInitResult(false, 0, std::string("Could not parse URL ") + url); std::stringstream ss;
ss << "Could not parse url: '" << url << "'";
return WebSocketInitResult(false, 0, ss.str());
} }
std::string errorMsg; std::string errorMsg;

View File

@ -6,4 +6,4 @@
#pragma once #pragma once
#define IX_WEBSOCKET_VERSION "7.5.3" #define IX_WEBSOCKET_VERSION "7.8.2"

View File

@ -23,6 +23,8 @@ include_directories(
../ws ../ws
) )
add_definitions(-DSPDLOG_COMPILED_LIB=1)
find_package(JsonCpp) find_package(JsonCpp)
if (NOT JSONCPP_FOUND) if (NOT JSONCPP_FOUND)
include_directories(../third_party/jsoncpp) include_directories(../third_party/jsoncpp)
@ -98,4 +100,6 @@ target_link_libraries(ixwebsocket_unittest ixcrypto)
target_link_libraries(ixwebsocket_unittest ixcore) target_link_libraries(ixwebsocket_unittest ixcore)
target_link_libraries(ixwebsocket_unittest ixsentry) target_link_libraries(ixwebsocket_unittest ixsentry)
target_link_libraries(ixwebsocket_unittest spdlog)
install(TARGETS ixwebsocket_unittest DESTINATION bin) install(TARGETS ixwebsocket_unittest DESTINATION bin)

View File

@ -34,10 +34,10 @@ namespace
}); });
} }
class SatoriChat class CobraChat
{ {
public: public:
SatoriChat(const std::string& user, CobraChat(const std::string& user,
const std::string& session, const std::string& session,
const std::string& endpoint); const std::string& endpoint);
@ -72,9 +72,9 @@ namespace
std::mutex _logMutex; std::mutex _logMutex;
}; };
SatoriChat::SatoriChat(const std::string& user, CobraChat::CobraChat(const std::string& user,
const std::string& session, const std::string& session,
const std::string& endpoint) const std::string& endpoint)
: _user(user) : _user(user)
, _session(session) , _session(session)
, _endpoint(endpoint) , _endpoint(endpoint)
@ -83,34 +83,34 @@ namespace
{ {
} }
void SatoriChat::start() void CobraChat::start()
{ {
_thread = std::thread(&SatoriChat::run, this); _thread = std::thread(&CobraChat::run, this);
} }
void SatoriChat::stop() void CobraChat::stop()
{ {
_stop = true; _stop = true;
_thread.join(); _thread.join();
} }
bool SatoriChat::isReady() const bool CobraChat::isReady() const
{ {
return _connectedAndSubscribed; return _connectedAndSubscribed;
} }
size_t SatoriChat::getReceivedMessagesCount() const size_t CobraChat::getReceivedMessagesCount() const
{ {
return _receivedQueue.size(); return _receivedQueue.size();
} }
bool SatoriChat::hasPendingMessages() const bool CobraChat::hasPendingMessages() const
{ {
std::unique_lock<std::mutex> lock(_queue_mutex); std::unique_lock<std::mutex> lock(_queue_mutex);
return !_publish_queue.empty(); return !_publish_queue.empty();
} }
Json::Value SatoriChat::popMessage() Json::Value CobraChat::popMessage()
{ {
std::unique_lock<std::mutex> lock(_queue_mutex); std::unique_lock<std::mutex> lock(_queue_mutex);
auto msg = _publish_queue.front(); auto msg = _publish_queue.front();
@ -121,7 +121,7 @@ namespace
// //
// Callback to handle received messages, that are printed on the console // Callback to handle received messages, that are printed on the console
// //
void SatoriChat::subscribe(const std::string& channel) void CobraChat::subscribe(const std::string& channel)
{ {
std::string filter; std::string filter;
_conn.subscribe(channel, filter, [this](const Json::Value& msg) { _conn.subscribe(channel, filter, [this](const Json::Value& msg) {
@ -151,7 +151,7 @@ namespace
}); });
} }
void SatoriChat::sendMessage(const std::string& text) void CobraChat::sendMessage(const std::string& text)
{ {
Json::Value msg; Json::Value msg;
msg["user"] = _user; msg["user"] = _user;
@ -166,16 +166,21 @@ namespace
// Do satori communication on a background thread, where we can have // Do satori communication on a background thread, where we can have
// something like an event loop that publish, poll and receive data // something like an event loop that publish, poll and receive data
// //
void SatoriChat::run() void CobraChat::run()
{ {
// "chat" conf // "chat" conf
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f"); std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
std::string channel = _session; std::string channel = _session;
std::string role = "_sub"; std::string role = "_sub";
std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba"; std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
SocketTLSOptions socketTLSOptions;
_conn.configure( _conn.configure(appkey,
appkey, _endpoint, role, secret, ix::WebSocketPerMessageDeflateOptions(true)); _endpoint,
role,
secret,
ix::WebSocketPerMessageDeflateOptions(true),
socketTLSOptions);
_conn.connect(); _conn.connect();
_conn.setEventCallback([this, channel](ix::CobraConnectionEventType eventType, _conn.setEventCallback([this, channel](ix::CobraConnectionEventType eventType,
@ -280,8 +285,8 @@ TEST_CASE("Cobra_chat", "[cobra_chat]")
ss << "ws://localhost:" << port; ss << "ws://localhost:" << port;
std::string endpoint = ss.str(); std::string endpoint = ss.str();
SatoriChat chatA("jean", session, endpoint); CobraChat chatA("jean", session, endpoint);
SatoriChat chatB("paul", session, endpoint); CobraChat chatB("paul", session, endpoint);
chatA.start(); chatA.start();
chatB.start(); chatB.start();

View File

@ -62,11 +62,14 @@ namespace
gMessageCount = 0; gMessageCount = 0;
ix::CobraConnection conn; ix::CobraConnection conn;
SocketTLSOptions socketTLSOptions;
conn.configure(APPKEY, conn.configure(APPKEY,
endpoint, endpoint,
SUBSCRIBER_ROLE, SUBSCRIBER_ROLE,
SUBSCRIBER_SECRET, SUBSCRIBER_SECRET,
ix::WebSocketPerMessageDeflateOptions(true)); ix::WebSocketPerMessageDeflateOptions(true),
socketTLSOptions);
conn.connect(); conn.connect();
conn.setEventCallback([&conn](ix::CobraConnectionEventType eventType, conn.setEventCallback([&conn](ix::CobraConnectionEventType eventType,
@ -202,9 +205,15 @@ TEST_CASE("Cobra_Metrics_Publisher", "[cobra]")
ix::CobraMetricsPublisher cobraMetricsPublisher; ix::CobraMetricsPublisher cobraMetricsPublisher;
SocketTLSOptions socketTLSOptions;
bool perMessageDeflate = true; bool perMessageDeflate = true;
cobraMetricsPublisher.configure( cobraMetricsPublisher.configure(APPKEY,
APPKEY, endpoint, CHANNEL, PUBLISHER_ROLE, PUBLISHER_SECRET, perMessageDeflate); endpoint,
CHANNEL,
PUBLISHER_ROLE,
PUBLISHER_SECRET,
perMessageDeflate,
socketTLSOptions);
cobraMetricsPublisher.setSession(uuid4()); cobraMetricsPublisher.setSession(uuid4());
cobraMetricsPublisher.enable(true); // disabled by default, needs to be enabled to be active cobraMetricsPublisher.enable(true); // disabled by default, needs to be enabled to be active

View File

@ -25,6 +25,8 @@ include_directories(ws ../third_party/statsd-client-cpp/src)
include_directories(ws ../third_party/spdlog/include) include_directories(ws ../third_party/spdlog/include)
include_directories(ws ../third_party/cpp-linenoise) include_directories(ws ../third_party/cpp-linenoise)
add_definitions(-DSPDLOG_COMPILED_LIB=1)
if (UNIX) if (UNIX)
set( STATSD_CLIENT_SOURCES ../third_party/statsd-client-cpp/src/statsd_client.cpp) set( STATSD_CLIENT_SOURCES ../third_party/statsd-client-cpp/src/statsd_client.cpp)
endif() endif()
@ -72,6 +74,8 @@ target_link_libraries(ws ixcrypto)
target_link_libraries(ws ixcore) target_link_libraries(ws ixcore)
target_link_libraries(ws ixsentry) target_link_libraries(ws ixsentry)
target_link_libraries(ws spdlog)
if(NOT APPLE AND NOT USE_MBED_TLS) if(NOT APPLE AND NOT USE_MBED_TLS)
find_package(OpenSSL REQUIRED) find_package(OpenSSL REQUIRED)
add_definitions(${OPENSSL_DEFINITIONS}) add_definitions(${OPENSSL_DEFINITIONS})

View File

@ -53,7 +53,7 @@ done
# Start a receiver # Start a receiver
mkdir -p /tmp/ws_test/receive mkdir -p /tmp/ws_test/receive
cd /tmp/ws_test/receive cd /tmp/ws_test/receive
ws receive "${protocol}127.0.0.1:8090" ${delay} --pidfile /tmp/ws_test/pidfile.receive ${server_tls} & ws receive "${protocol}127.0.0.1:8090" ${delay} --pidfile /tmp/ws_test/pidfile.receive ${client_tls} &
mkdir -p /tmp/ws_test/send mkdir -p /tmp/ws_test/send
cd /tmp/ws_test/send cd /tmp/ws_test/send

View File

@ -11,7 +11,6 @@
#include <cli11/CLI11.hpp> #include <cli11/CLI11.hpp>
#include <fstream> #include <fstream>
#include <iostream>
#include <ixcore/utils/IXCoreLogger.h> #include <ixcore/utils/IXCoreLogger.h>
#include <ixwebsocket/IXNetSystem.h> #include <ixwebsocket/IXNetSystem.h>
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
@ -39,12 +38,13 @@ int main(int argc, char** argv)
// Display command. // Display command.
if (getenv("DEBUG")) if (getenv("DEBUG"))
{ {
std::cout << "Command: "; std::stringstream ss;
ss << "Command: ";
for (int i = 0; i < argc; ++i) for (int i = 0; i < argc; ++i)
{ {
std::cout << argv[i] << " "; ss << argv[i] << " ";
} }
std::cout << std::endl; spdlog::info(ss.str());
} }
CLI::App app {"ws is a websocket tool"}; CLI::App app {"ws is a websocket tool"};
@ -105,6 +105,7 @@ int main(int argc, char** argv)
int count = 1; int count = 1;
int jobs = 4; int jobs = 4;
uint32_t maxWaitBetweenReconnectionRetries; uint32_t maxWaitBetweenReconnectionRetries;
size_t maxQueueSize = 100;
auto addTLSOptions = [&tlsOptions, &verifyNone](CLI::App* app) { auto addTLSOptions = [&tlsOptions, &verifyNone](CLI::App* app) {
app->add_option( app->add_option(
@ -218,6 +219,7 @@ int main(int argc, char** argv)
cobraSubscribeApp->add_option("--pidfile", pidfile, "Pid file"); cobraSubscribeApp->add_option("--pidfile", pidfile, "Pid file");
cobraSubscribeApp->add_option("--filter", filter, "Stream SQL Filter"); cobraSubscribeApp->add_option("--filter", filter, "Stream SQL Filter");
cobraSubscribeApp->add_flag("-q", quiet, "Quiet / only display stats"); cobraSubscribeApp->add_flag("-q", quiet, "Quiet / only display stats");
addTLSOptions(cobraSubscribeApp);
CLI::App* cobraPublish = app.add_subcommand("cobra_publish", "Cobra publisher"); CLI::App* cobraPublish = app.add_subcommand("cobra_publish", "Cobra publisher");
cobraPublish->add_option("--appkey", appkey, "Appkey")->required(); cobraPublish->add_option("--appkey", appkey, "Appkey")->required();
@ -229,6 +231,7 @@ int main(int argc, char** argv)
cobraPublish->add_option("path", path, "Path to the file to send") cobraPublish->add_option("path", path, "Path to the file to send")
->required() ->required()
->check(CLI::ExistingPath); ->check(CLI::ExistingPath);
addTLSOptions(cobraPublish);
CLI::App* cobraMetricsPublish = CLI::App* cobraMetricsPublish =
app.add_subcommand("cobra_metrics_publish", "Cobra metrics publisher"); app.add_subcommand("cobra_metrics_publish", "Cobra metrics publisher");
@ -242,6 +245,7 @@ int main(int argc, char** argv)
->required() ->required()
->check(CLI::ExistingPath); ->check(CLI::ExistingPath);
cobraMetricsPublish->add_flag("--stress", stress, "Stress mode"); cobraMetricsPublish->add_flag("--stress", stress, "Stress mode");
addTLSOptions(cobraMetricsPublish);
CLI::App* cobra2statsd = app.add_subcommand("cobra_to_statsd", "Cobra metrics to statsd"); CLI::App* cobra2statsd = app.add_subcommand("cobra_to_statsd", "Cobra metrics to statsd");
cobra2statsd->add_option("--appkey", appkey, "Appkey"); cobra2statsd->add_option("--appkey", appkey, "Appkey");
@ -256,6 +260,7 @@ int main(int argc, char** argv)
cobra2statsd->add_flag("-v", verbose, "Verbose"); cobra2statsd->add_flag("-v", verbose, "Verbose");
cobra2statsd->add_option("--pidfile", pidfile, "Pid file"); cobra2statsd->add_option("--pidfile", pidfile, "Pid file");
cobra2statsd->add_option("--filter", filter, "Stream SQL Filter"); cobra2statsd->add_option("--filter", filter, "Stream SQL Filter");
addTLSOptions(cobra2statsd);
CLI::App* cobra2sentry = app.add_subcommand("cobra_to_sentry", "Cobra metrics to sentry"); CLI::App* cobra2sentry = app.add_subcommand("cobra_to_sentry", "Cobra metrics to sentry");
cobra2sentry->add_option("--appkey", appkey, "Appkey")->required(); cobra2sentry->add_option("--appkey", appkey, "Appkey")->required();
@ -264,11 +269,13 @@ int main(int argc, char** argv)
cobra2sentry->add_option("--rolesecret", rolesecret, "Role secret")->required(); cobra2sentry->add_option("--rolesecret", rolesecret, "Role secret")->required();
cobra2sentry->add_option("--dsn", dsn, "Sentry DSN"); cobra2sentry->add_option("--dsn", dsn, "Sentry DSN");
cobra2sentry->add_option("--jobs", jobs, "Number of thread sending events to Sentry"); cobra2sentry->add_option("--jobs", jobs, "Number of thread sending events to Sentry");
cobra2sentry->add_option("--queue_size", maxQueueSize, "Size of the queue to hold messages before they are sent to Sentry");
cobra2sentry->add_option("channel", channel, "Channel")->required(); cobra2sentry->add_option("channel", channel, "Channel")->required();
cobra2sentry->add_flag("-v", verbose, "Verbose"); cobra2sentry->add_flag("-v", verbose, "Verbose");
cobra2sentry->add_flag("-s", strict, "Strict mode. Error out when sending to sentry fails"); cobra2sentry->add_flag("-s", strict, "Strict mode. Error out when sending to sentry fails");
cobra2sentry->add_option("--pidfile", pidfile, "Pid file"); cobra2sentry->add_option("--pidfile", pidfile, "Pid file");
cobra2sentry->add_option("--filter", filter, "Stream SQL Filter"); cobra2sentry->add_option("--filter", filter, "Stream SQL Filter");
addTLSOptions(cobra2sentry);
CLI::App* cobra2redisApp = CLI::App* cobra2redisApp =
app.add_subcommand("cobra_metrics_to_redis", "Cobra metrics to redis"); app.add_subcommand("cobra_metrics_to_redis", "Cobra metrics to redis");
@ -282,17 +289,19 @@ int main(int argc, char** argv)
cobra2redisApp->add_option("--hostname", hostname, "Redis hostname"); cobra2redisApp->add_option("--hostname", hostname, "Redis hostname");
cobra2redisApp->add_option("--port", redisPort, "Redis port"); cobra2redisApp->add_option("--port", redisPort, "Redis port");
cobra2redisApp->add_flag("-q", quiet, "Quiet / only display stats"); cobra2redisApp->add_flag("-q", quiet, "Quiet / only display stats");
addTLSOptions(cobra2redisApp);
CLI::App* runApp = app.add_subcommand("snake", "Snake server"); CLI::App* snakeApp = app.add_subcommand("snake", "Snake server");
runApp->add_option("--port", port, "Connection url"); snakeApp->add_option("--port", port, "Connection url");
runApp->add_option("--host", hostname, "Hostname"); snakeApp->add_option("--host", hostname, "Hostname");
runApp->add_option("--pidfile", pidfile, "Pid file"); snakeApp->add_option("--pidfile", pidfile, "Pid file");
runApp->add_option("--redis_hosts", redisHosts, "Redis hosts"); snakeApp->add_option("--redis_hosts", redisHosts, "Redis hosts");
runApp->add_option("--redis_port", redisPort, "Redis hosts"); snakeApp->add_option("--redis_port", redisPort, "Redis hosts");
runApp->add_option("--redis_password", redisPassword, "Redis password"); snakeApp->add_option("--redis_password", redisPassword, "Redis password");
runApp->add_option("--apps_config_path", appsConfigPath, "Path to auth data") snakeApp->add_option("--apps_config_path", appsConfigPath, "Path to auth data")
->check(CLI::ExistingPath); ->check(CLI::ExistingPath);
runApp->add_flag("-v", verbose, "Verbose"); snakeApp->add_flag("-v", verbose, "Verbose");
addTLSOptions(snakeApp);
CLI::App* httpServerApp = app.add_subcommand("httpd", "HTTP server"); CLI::App* httpServerApp = app.add_subcommand("httpd", "HTTP server");
httpServerApp->add_option("--port", port, "Port"); httpServerApp->add_option("--port", port, "Port");
@ -314,6 +323,7 @@ int main(int argc, char** argv)
proxyServerApp->add_option("--host", hostname, "Hostname"); proxyServerApp->add_option("--host", hostname, "Hostname");
proxyServerApp->add_option("--remote_host", remoteHost, "Remote Hostname"); proxyServerApp->add_option("--remote_host", remoteHost, "Remote Hostname");
proxyServerApp->add_flag("-v", verbose, "Verbose"); proxyServerApp->add_flag("-v", verbose, "Verbose");
addTLSOptions(proxyServerApp);
CLI::App* minidumpApp = app.add_subcommand("upload_minidump", "Upload a minidump to sentry"); CLI::App* minidumpApp = app.add_subcommand("upload_minidump", "Upload a minidump to sentry");
minidumpApp->add_option("--minidump", minidump, "Minidump path")->check(CLI::ExistingPath); minidumpApp->add_option("--minidump", minidump, "Minidump path")->check(CLI::ExistingPath);
@ -408,16 +418,17 @@ int main(int argc, char** argv)
else if (app.got_subcommand("cobra_subscribe")) else if (app.got_subcommand("cobra_subscribe"))
{ {
ret = ix::ws_cobra_subscribe_main( ret = ix::ws_cobra_subscribe_main(
appkey, endpoint, rolename, rolesecret, channel, filter, quiet); appkey, endpoint, rolename, rolesecret, channel, filter, quiet, tlsOptions);
} }
else if (app.got_subcommand("cobra_publish")) else if (app.got_subcommand("cobra_publish"))
{ {
ret = ix::ws_cobra_publish_main(appkey, endpoint, rolename, rolesecret, channel, path); ret = ix::ws_cobra_publish_main(
appkey, endpoint, rolename, rolesecret, channel, path, tlsOptions);
} }
else if (app.got_subcommand("cobra_metrics_publish")) else if (app.got_subcommand("cobra_metrics_publish"))
{ {
ret = ix::ws_cobra_metrics_publish_main( ret = ix::ws_cobra_metrics_publish_main(
appkey, endpoint, rolename, rolesecret, channel, path, stress); appkey, endpoint, rolename, rolesecret, channel, path, stress, tlsOptions);
} }
else if (app.got_subcommand("cobra_to_statsd")) else if (app.got_subcommand("cobra_to_statsd"))
{ {
@ -431,22 +442,40 @@ int main(int argc, char** argv)
statsdPort, statsdPort,
prefix, prefix,
fields, fields,
verbose); verbose,
tlsOptions);
} }
else if (app.got_subcommand("cobra_to_sentry")) else if (app.got_subcommand("cobra_to_sentry"))
{ {
ret = ix::ws_cobra_to_sentry_main( ret = ix::ws_cobra_to_sentry_main(appkey,
appkey, endpoint, rolename, rolesecret, channel, filter, dsn, verbose, strict, jobs); endpoint,
rolename,
rolesecret,
channel,
filter,
dsn,
verbose,
strict,
jobs,
maxQueueSize,
tlsOptions);
} }
else if (app.got_subcommand("cobra_metrics_to_redis")) else if (app.got_subcommand("cobra_metrics_to_redis"))
{ {
ret = ix::ws_cobra_metrics_to_redis( ret = ix::ws_cobra_metrics_to_redis(appkey,
appkey, endpoint, rolename, rolesecret, channel, filter, hostname, redisPort); endpoint,
rolename,
rolesecret,
channel,
filter,
hostname,
redisPort,
tlsOptions);
} }
else if (app.got_subcommand("snake")) else if (app.got_subcommand("snake"))
{ {
ret = ix::ws_snake_main( ret = ix::ws_snake_main(
port, hostname, redisHosts, redisPort, redisPassword, verbose, appsConfigPath); port, hostname, redisHosts, redisPort, redisPassword, verbose, appsConfigPath, tlsOptions);
} }
else if (app.got_subcommand("httpd")) else if (app.got_subcommand("httpd"))
{ {
@ -470,11 +499,11 @@ int main(int argc, char** argv)
} }
else if (version) else if (version)
{ {
std::cout << "ws " << ix::userAgent() << std::endl; spdlog::info("ws {}", ix::userAgent());
} }
else else
{ {
std::cerr << "A subcommand or --version is required" << std::endl; spdlog::error("A subcommand or --version is required");
} }
ix::uninitNetSystem(); ix::uninitNetSystem();

22
ws/ws.h
View File

@ -76,14 +76,16 @@ namespace ix
const std::string& rolesecret, const std::string& rolesecret,
const std::string& channel, const std::string& channel,
const std::string& filter, const std::string& filter,
bool quiet); bool quiet,
const ix::SocketTLSOptions& tlsOptions);
int ws_cobra_publish_main(const std::string& appkey, int ws_cobra_publish_main(const std::string& appkey,
const std::string& endpoint, const std::string& endpoint,
const std::string& rolename, const std::string& rolename,
const std::string& rolesecret, const std::string& rolesecret,
const std::string& channel, const std::string& channel,
const std::string& path); const std::string& path,
const ix::SocketTLSOptions& tlsOptions);
int ws_cobra_metrics_publish_main(const std::string& appkey, int ws_cobra_metrics_publish_main(const std::string& appkey,
const std::string& endpoint, const std::string& endpoint,
@ -91,7 +93,8 @@ namespace ix
const std::string& rolesecret, const std::string& rolesecret,
const std::string& channel, const std::string& channel,
const std::string& path, const std::string& path,
bool stress); bool stress,
const ix::SocketTLSOptions& tlsOptions);
int ws_cobra_to_statsd_main(const std::string& appkey, int ws_cobra_to_statsd_main(const std::string& appkey,
const std::string& endpoint, const std::string& endpoint,
@ -103,7 +106,8 @@ namespace ix
int port, int port,
const std::string& prefix, const std::string& prefix,
const std::string& fields, const std::string& fields,
bool verbose); bool verbose,
const ix::SocketTLSOptions& tlsOptions);
int ws_cobra_to_sentry_main(const std::string& appkey, int ws_cobra_to_sentry_main(const std::string& appkey,
const std::string& endpoint, const std::string& endpoint,
@ -114,7 +118,9 @@ namespace ix
const std::string& dsn, const std::string& dsn,
bool verbose, bool verbose,
bool strict, bool strict,
int jobs); int jobs,
size_t maxQueueSize,
const ix::SocketTLSOptions& tlsOptions);
int ws_cobra_metrics_to_redis(const std::string& appkey, int ws_cobra_metrics_to_redis(const std::string& appkey,
const std::string& endpoint, const std::string& endpoint,
@ -123,7 +129,8 @@ namespace ix
const std::string& channel, const std::string& channel,
const std::string& filter, const std::string& filter,
const std::string& host, const std::string& host,
int port); int port,
const ix::SocketTLSOptions& tlsOptions);
int ws_snake_main(int port, int ws_snake_main(int port,
const std::string& hostname, const std::string& hostname,
@ -131,7 +138,8 @@ namespace ix
int redisPort, int redisPort,
const std::string& redisPassword, const std::string& redisPassword,
bool verbose, bool verbose,
const std::string& appsConfigPath); const std::string& appsConfigPath,
const ix::SocketTLSOptions& tlsOptions);
int ws_httpd_main(int port, int ws_httpd_main(int port,
const std::string& hostname, const std::string& hostname,

View File

@ -32,7 +32,6 @@
#include <atomic> #include <atomic>
#include <condition_variable> #include <condition_variable>
#include <iostream>
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <mutex> #include <mutex>
@ -91,7 +90,7 @@ namespace ix
{ {
if (!_quiet) if (!_quiet)
{ {
std::cerr << msg; spdlog::info(msg);
} }
} }
@ -183,7 +182,7 @@ namespace ix
webSocket.setOnMessageCallback([&condition, &success](const ix::WebSocketMessagePtr& msg) { webSocket.setOnMessageCallback([&condition, &success](const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Close) if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "Report generated" << std::endl; spdlog::info("Report generated");
condition.notify_one(); condition.notify_one();
} }
else if (msg->type == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
@ -193,7 +192,7 @@ namespace ix
ss << "#retries: " << msg->errorInfo.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str() << std::endl; spdlog::info(ss.str());
success = false; success = false;
} }
@ -236,7 +235,7 @@ namespace ix
ss << "#retries: " << msg->errorInfo.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str() << std::endl; spdlog::info(ss.str());
condition.notify_one(); condition.notify_one();
} }
@ -269,7 +268,7 @@ namespace ix
int ws_autobahn_main(const std::string& url, bool quiet) int ws_autobahn_main(const std::string& url, bool quiet)
{ {
int testCasesCount = getTestCaseCount(url); int testCasesCount = getTestCaseCount(url);
std::cerr << "Test cases count: " << testCasesCount << std::endl; spdlog::info("Test cases count: {}", testCasesCount);
if (testCasesCount == -1) if (testCasesCount == -1)
{ {

View File

@ -4,9 +4,10 @@
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved. * Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/ */
#include <iostream>
#include <ixwebsocket/IXWebSocketServer.h> #include <ixwebsocket/IXWebSocketServer.h>
#include <sstream> #include <sstream>
#include <spdlog/spdlog.h>
namespace ix namespace ix
{ {
@ -14,7 +15,7 @@ namespace ix
const std::string& hostname, const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions) const ix::SocketTLSOptions& tlsOptions)
{ {
std::cout << "Listening on " << hostname << ":" << port << std::endl; spdlog::info("Listening on {}:{}", hostname, port);
ix::WebSocketServer server(port, hostname); ix::WebSocketServer server(port, hostname);
server.setTLSOptions(tlsOptions); server.setTLSOptions(tlsOptions);
@ -25,20 +26,19 @@ namespace ix
const WebSocketMessagePtr& msg) { const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "New connection" << std::endl; spdlog::info("New connection");
std::cerr << "id: " << connectionState->getId() << std::endl; spdlog::info("id: {}", connectionState->getId());
std::cerr << "Uri: " << msg->openInfo.uri << std::endl; spdlog::info("Uri: {}", msg->openInfo.uri);
std::cerr << "Headers:" << std::endl; spdlog::info("Headers:");
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "Closed connection" spdlog::info("Closed connection: code {} reason {}",
<< " code " << msg->closeInfo.code << " reason " msg->closeInfo.code, msg->closeInfo.reason);
<< msg->closeInfo.reason << std::endl;
} }
else if (msg->type == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
@ -47,30 +47,29 @@ namespace ix
ss << "#retries: " << msg->errorInfo.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str(); spdlog::info(ss.str());
} }
else if (msg->type == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
std::cerr << "Received message fragment" << std::endl; spdlog::info("Received message fragment");
} }
else if (msg->type == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl; spdlog::info("Received {} bytes", msg->wireSize);
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client != webSocket)
{ {
client->send(msg->str, msg->binary, [](int current, int total) -> bool { client->send(msg->str, msg->binary, [](int current, int total) -> bool {
std::cerr << "Step " << current << " out of " << total << std::endl; spdlog::info("Step {} out of {}", current, total);
return true; return true;
}); });
do do
{ {
size_t bufferedAmount = client->bufferedAmount(); size_t bufferedAmount = client->bufferedAmount();
std::cerr << bufferedAmount << " bytes left to be sent" spdlog::info("{} bytes left to be sent", bufferedAmount);
<< std::endl;
std::chrono::duration<double, std::milli> duration(10); std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
@ -84,7 +83,7 @@ namespace ix
auto res = server.listen(); auto res = server.listen();
if (!res.first) if (!res.first)
{ {
std::cerr << res.second << std::endl; spdlog::info(res.second);
return 1; return 1;
} }

View File

@ -9,12 +9,13 @@
// Broadcast server can be ran with `ws broadcast_server` // Broadcast server can be ran with `ws broadcast_server`
// //
#include "linenoise.hpp"
#include "nlohmann/json.hpp" #include "nlohmann/json.hpp"
#include <iostream>
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <queue> #include <queue>
#include <sstream> #include <sstream>
#include <spdlog/spdlog.h>
// for convenience // for convenience
using json = nlohmann::json; using json = nlohmann::json;
@ -55,7 +56,7 @@ namespace ix
void WebSocketChat::log(const std::string& msg) void WebSocketChat::log(const std::string& msg)
{ {
std::cout << msg << std::endl; spdlog::info(msg);
} }
size_t WebSocketChat::getReceivedMessagesCount() const size_t WebSocketChat::getReceivedMessagesCount() const
@ -85,20 +86,21 @@ namespace ix
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
log("ws chat: connected"); log("ws chat: connected");
std::cout << "Uri: " << msg->openInfo.uri << std::endl; spdlog::info("Uri: {}", msg->openInfo.uri);
std::cout << "Handshake Headers:" << std::endl; spdlog::info("Headers:");
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
ss << "ws chat: user " << _user << " Connected !"; spdlog::info("ws chat: user {} connected !", _user);
log(ss.str()); log(ss.str());
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
ss << "ws chat: user " << _user << " disconnected !" ss << "ws chat user disconnected: " << _user;
<< " code " << msg->closeInfo.code << " reason " << msg->closeInfo.reason; ss << " code " << msg->closeInfo.code;
ss << " reason " << msg->closeInfo.reason << std::endl;
log(ss.str()); log(ss.str());
} }
else if (msg->type == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
@ -162,25 +164,25 @@ namespace ix
int ws_chat_main(const std::string& url, const std::string& user) int ws_chat_main(const std::string& url, const std::string& user)
{ {
std::cout << "Type Ctrl-D to exit prompt..." << std::endl; spdlog::info("Type Ctrl-D to exit prompt...");
WebSocketChat webSocketChat(url, user); WebSocketChat webSocketChat(url, user);
webSocketChat.start(); webSocketChat.start();
while (true) while (true)
{ {
std::string text; // Read line
std::cout << user << " > " << std::flush; std::string line;
std::getline(std::cin, text); auto quit = linenoise::Readline("> ", line);
if (!std::cin) if (quit)
{ {
break; break;
} }
webSocketChat.sendMessage(text); webSocketChat.sendMessage(line);
} }
std::cout << std::endl; spdlog::info("");
webSocketChat.stop(); webSocketChat.stop();
return 0; return 0;

View File

@ -7,7 +7,6 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <fstream> #include <fstream>
#include <iostream>
#include <ixcobra/IXCobraMetricsPublisher.h> #include <ixcobra/IXCobraMetricsPublisher.h>
#include <jsoncpp/json/json.h> #include <jsoncpp/json/json.h>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
@ -22,7 +21,8 @@ namespace ix
const std::string& rolesecret, const std::string& rolesecret,
const std::string& channel, const std::string& channel,
const std::string& path, const std::string& path,
bool stress) bool stress,
const ix::SocketTLSOptions& tlsOptions)
{ {
std::atomic<int> sentMessages(0); std::atomic<int> sentMessages(0);
std::atomic<int> ackedMessages(0); std::atomic<int> ackedMessages(0);
@ -37,7 +37,7 @@ namespace ix
bool enablePerMessageDeflate = true; bool enablePerMessageDeflate = true;
cobraMetricsPublisher.configure( cobraMetricsPublisher.configure(
appkey, endpoint, channel, rolename, rolesecret, enablePerMessageDeflate); appkey, endpoint, channel, rolename, rolesecret, enablePerMessageDeflate, tlsOptions);
while (!cobraMetricsPublisher.isAuthenticated()) while (!cobraMetricsPublisher.isAuthenticated())
; ;

View File

@ -7,7 +7,6 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <condition_variable> #include <condition_variable>
#include <iostream>
#include <ixcobra/IXCobraConnection.h> #include <ixcobra/IXCobraConnection.h>
#include <ixsnake/IXRedisClient.h> #include <ixsnake/IXRedisClient.h>
#include <mutex> #include <mutex>
@ -25,11 +24,16 @@ namespace ix
const std::string& channel, const std::string& channel,
const std::string& filter, const std::string& filter,
const std::string& host, const std::string& host,
int port) int port,
const ix::SocketTLSOptions& tlsOptions)
{ {
ix::CobraConnection conn; ix::CobraConnection conn;
conn.configure( conn.configure(appkey,
appkey, endpoint, rolename, rolesecret, ix::WebSocketPerMessageDeflateOptions(true)); endpoint,
rolename,
rolesecret,
ix::WebSocketPerMessageDeflateOptions(true),
tlsOptions);
conn.connect(); conn.connect();
// Display incoming messages // Display incoming messages
@ -39,8 +43,7 @@ namespace ix
auto timer = [&msgPerSeconds, &msgCount] { auto timer = [&msgPerSeconds, &msgCount] {
while (true) while (true)
{ {
std::cout << "#messages " << msgCount << " " spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds);
<< "msg/s " << msgPerSeconds << std::endl;
msgPerSeconds = 0; msgPerSeconds = 0;
auto duration = std::chrono::seconds(1); auto duration = std::chrono::seconds(1);

View File

@ -7,7 +7,6 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <fstream> #include <fstream>
#include <iostream>
#include <ixcobra/IXCobraMetricsPublisher.h> #include <ixcobra/IXCobraMetricsPublisher.h>
#include <jsoncpp/json/json.h> #include <jsoncpp/json/json.h>
#include <mutex> #include <mutex>
@ -22,7 +21,8 @@ namespace ix
const std::string& rolename, const std::string& rolename,
const std::string& rolesecret, const std::string& rolesecret,
const std::string& channel, const std::string& channel,
const std::string& path) const std::string& path,
const ix::SocketTLSOptions& tlsOptions)
{ {
std::ifstream f(path); std::ifstream f(path);
std::string str((std::istreambuf_iterator<char>(f)), std::istreambuf_iterator<char>()); std::string str((std::istreambuf_iterator<char>(f)), std::istreambuf_iterator<char>());
@ -36,9 +36,12 @@ namespace ix
} }
ix::CobraConnection conn; ix::CobraConnection conn;
conn.configure( conn.configure(appkey,
appkey, endpoint, rolename, rolesecret, ix::WebSocketPerMessageDeflateOptions(true)); endpoint,
conn.connect(); rolename,
rolesecret,
ix::WebSocketPerMessageDeflateOptions(true),
tlsOptions);
// Display incoming messages // Display incoming messages
std::atomic<bool> authenticated(false); std::atomic<bool> authenticated(false);
@ -87,8 +90,14 @@ namespace ix
spdlog::info("Published message id {} acked", msgId); spdlog::info("Published message id {} acked", msgId);
messageAcked = true; messageAcked = true;
} }
else if (eventType == ix::CobraConnection_EventType_Pong)
{
spdlog::info("Received websocket pong");
}
}); });
conn.connect();
while (!authenticated) while (!authenticated)
; ;
while (!messageAcked) while (!messageAcked)

View File

@ -6,7 +6,6 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <iostream>
#include <ixcobra/IXCobraConnection.h> #include <ixcobra/IXCobraConnection.h>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
@ -20,11 +19,16 @@ namespace ix
const std::string& rolesecret, const std::string& rolesecret,
const std::string& channel, const std::string& channel,
const std::string& filter, const std::string& filter,
bool quiet) bool quiet,
const ix::SocketTLSOptions& tlsOptions)
{ {
ix::CobraConnection conn; ix::CobraConnection conn;
conn.configure( conn.configure(appkey,
appkey, endpoint, rolename, rolesecret, ix::WebSocketPerMessageDeflateOptions(true)); endpoint,
rolename,
rolesecret,
ix::WebSocketPerMessageDeflateOptions(true),
tlsOptions);
conn.connect(); conn.connect();
Json::FastWriter jsonWriter; Json::FastWriter jsonWriter;
@ -36,8 +40,7 @@ namespace ix
auto timer = [&msgPerSeconds, &msgCount] { auto timer = [&msgPerSeconds, &msgCount] {
while (true) while (true)
{ {
std::cout << "#messages " << msgCount << " " spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds);
<< "msg/s " << msgPerSeconds << std::endl;
msgPerSeconds = 0; msgPerSeconds = 0;
auto duration = std::chrono::seconds(1); auto duration = std::chrono::seconds(1);
@ -72,7 +75,7 @@ namespace ix
[&jsonWriter, &quiet, &msgPerSeconds, &msgCount](const Json::Value& msg) { [&jsonWriter, &quiet, &msgPerSeconds, &msgCount](const Json::Value& msg) {
if (!quiet) if (!quiet)
{ {
std::cerr << jsonWriter.write(msg) << std::endl; spdlog::info(jsonWriter.write(msg));
} }
msgPerSeconds++; msgPerSeconds++;
@ -95,6 +98,10 @@ namespace ix
{ {
spdlog::error("Published message hacked: {}", msgId); spdlog::error("Published message hacked: {}", msgId);
} }
else if (eventType == ix::CobraConnection_EventType_Pong)
{
spdlog::info("Received websocket pong");
}
}); });
while (true) while (true)

View File

@ -7,7 +7,6 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <condition_variable> #include <condition_variable>
#include <iostream>
#include <ixcobra/IXCobraConnection.h> #include <ixcobra/IXCobraConnection.h>
#include <ixsentry/IXSentryClient.h> #include <ixsentry/IXSentryClient.h>
#include <mutex> #include <mutex>
@ -28,11 +27,17 @@ namespace ix
const std::string& dsn, const std::string& dsn,
bool verbose, bool verbose,
bool strict, bool strict,
int jobs) int jobs,
size_t maxQueueSize,
const ix::SocketTLSOptions& tlsOptions)
{ {
ix::CobraConnection conn; ix::CobraConnection conn;
conn.configure( conn.configure(appkey,
appkey, endpoint, rolename, rolesecret, ix::WebSocketPerMessageDeflateOptions(true)); endpoint,
rolename,
rolesecret,
ix::WebSocketPerMessageDeflateOptions(true),
tlsOptions);
conn.connect(); conn.connect();
Json::FastWriter jsonWriter; Json::FastWriter jsonWriter;
@ -132,11 +137,13 @@ namespace ix
{ {
seconds = 30; seconds = 30;
spdlog::warn("Error parsing Retry-After header. " spdlog::warn("Error parsing Retry-After header. "
"Using {} for the sleep duration", seconds); "Using {} for the sleep duration",
seconds);
} }
spdlog::warn("Error 429 - Too Many Requests. ws will sleep " spdlog::warn("Error 429 - Too Many Requests. ws will sleep "
"and retry after {} seconds", retryAfter); "and retry after {} seconds",
retryAfter);
throttled = true; throttled = true;
auto duration = std::chrono::seconds(seconds); auto duration = std::chrono::seconds(seconds);
@ -154,7 +161,7 @@ namespace ix
}; };
// Create a thread pool // Create a thread pool
std::cerr << "Starting " << jobs << " sentry sender jobs" << std::endl; spdlog::info("Starting {} sentry sender jobs", jobs);
std::vector<std::thread> pool; std::vector<std::thread> pool;
for (int i = 0; i < jobs; i++) for (int i = 0; i < jobs; i++)
{ {
@ -170,6 +177,7 @@ namespace ix
&receivedCount, &receivedCount,
&condition, &condition,
&conditionVariableMutex, &conditionVariableMutex,
&maxQueueSize,
&queue](ix::CobraConnectionEventType eventType, &queue](ix::CobraConnectionEventType eventType,
const std::string& errMsg, const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers, const ix::WebSocketHttpHeaders& headers,
@ -190,7 +198,7 @@ namespace ix
} }
else if (eventType == ix::CobraConnection_EventType_Authenticated) else if (eventType == ix::CobraConnection_EventType_Authenticated)
{ {
std::cerr << "Subscriber authenticated" << std::endl; spdlog::info("Subscriber authenticated");
conn.subscribe(channel, conn.subscribe(channel,
filter, filter,
[&jsonWriter, [&jsonWriter,
@ -199,6 +207,7 @@ namespace ix
&receivedCount, &receivedCount,
&condition, &condition,
&conditionVariableMutex, &conditionVariableMutex,
&maxQueueSize,
&queue](const Json::Value& msg) { &queue](const Json::Value& msg) {
if (verbose) if (verbose)
{ {
@ -216,7 +225,12 @@ namespace ix
{ {
std::unique_lock<std::mutex> lock(conditionVariableMutex); std::unique_lock<std::mutex> lock(conditionVariableMutex);
queue.push(msg); // if the sending is not fast enough there is no point
// in queuing too many events.
if (queue.size() < maxQueueSize)
{
queue.push(msg);
}
} }
condition.notify_one(); condition.notify_one();
@ -238,6 +252,10 @@ namespace ix
{ {
spdlog::error("Published message hacked: {}", msgId); spdlog::error("Published message hacked: {}", msgId);
} }
else if (eventType == ix::CobraConnection_EventType_Pong)
{
spdlog::info("Received websocket pong");
}
}); });
while (true) while (true)

View File

@ -6,7 +6,6 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <iostream>
#include <ixcobra/IXCobraConnection.h> #include <ixcobra/IXCobraConnection.h>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
@ -66,11 +65,16 @@ namespace ix
int port, int port,
const std::string& prefix, const std::string& prefix,
const std::string& fields, const std::string& fields,
bool verbose) bool verbose,
const ix::SocketTLSOptions& tlsOptions)
{ {
ix::CobraConnection conn; ix::CobraConnection conn;
conn.configure( conn.configure(appkey,
appkey, endpoint, rolename, rolesecret, ix::WebSocketPerMessageDeflateOptions(true)); endpoint,
rolename,
rolesecret,
ix::WebSocketPerMessageDeflateOptions(true),
tlsOptions);
conn.connect(); conn.connect();
auto tokens = parseFields(fields); auto tokens = parseFields(fields);
@ -155,6 +159,10 @@ namespace ix
{ {
spdlog::error("Published message hacked: {}", msgId); spdlog::error("Published message hacked: {}", msgId);
} }
else if (eventType == ix::CobraConnection_EventType_Pong)
{
spdlog::info("Received websocket pong");
}
}); });
while (true) while (true)

View File

@ -5,10 +5,10 @@
*/ */
#include "linenoise.hpp" #include "linenoise.hpp"
#include <iostream>
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXSocketTLSOptions.h> #include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
@ -93,7 +93,7 @@ namespace ix
auto key = token.substr(0, pos); auto key = token.substr(0, pos);
auto val = token.substr(pos + 1); auto val = token.substr(pos + 1);
std::cerr << key << ": " << val << std::endl; spdlog::info("{}: {}", key, val);
headers[key] = val; headers[key] = val;
} }
@ -129,11 +129,11 @@ namespace ix
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
log("ws_connect: connected"); log("ws_connect: connected");
std::cout << "Uri: " << msg->openInfo.uri << std::endl; spdlog::info("Uri: {}", msg->openInfo.uri);
std::cout << "Handshake Headers:" << std::endl; spdlog::info("Headers:");
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
@ -145,7 +145,7 @@ namespace ix
} }
else if (msg->type == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl; spdlog::info("Received {} bytes", msg->wireSize);
ss << "ws_connect: received message: " << msg->str; ss << "ws_connect: received message: " << msg->str;
log(ss.str()); log(ss.str());
@ -160,15 +160,15 @@ namespace ix
} }
else if (msg->type == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
std::cerr << "Received message fragment" << std::endl; spdlog::info("Received message fragment");
} }
else if (msg->type == ix::WebSocketMessageType::Ping) else if (msg->type == ix::WebSocketMessageType::Ping)
{ {
std::cerr << "Received ping" << std::endl; spdlog::info("Received ping");
} }
else if (msg->type == ix::WebSocketMessageType::Pong) else if (msg->type == ix::WebSocketMessageType::Pong)
{ {
std::cerr << "Received pong" << std::endl; spdlog::info("Received pong");
} }
else else
{ {
@ -225,14 +225,14 @@ namespace ix
if (line == "/stop") if (line == "/stop")
{ {
std::cout << "Stopping connection..." << std::endl; spdlog::info("Stopping connection...");
webSocketChat.stop(); webSocketChat.stop();
continue; continue;
} }
if (line == "/start") if (line == "/start")
{ {
std::cout << "Starting connection..." << std::endl; spdlog::info("Starting connection...");
webSocketChat.start(); webSocketChat.start();
continue; continue;
} }
@ -243,7 +243,7 @@ namespace ix
linenoise::AddHistory(line.c_str()); linenoise::AddHistory(line.c_str());
} }
std::cout << std::endl; spdlog::info("");
webSocketChat.stop(); webSocketChat.stop();
return 0; return 0;

View File

@ -4,8 +4,8 @@
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved. * Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/ */
#include <iostream>
#include <ixwebsocket/IXWebSocketServer.h> #include <ixwebsocket/IXWebSocketServer.h>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
namespace ix namespace ix
@ -15,7 +15,7 @@ namespace ix
const std::string& hostname, const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions) const ix::SocketTLSOptions& tlsOptions)
{ {
std::cout << "Listening on " << hostname << ":" << port << std::endl; spdlog::info("Listening on {}:{}", hostname, port);
ix::WebSocketServer server(port, hostname); ix::WebSocketServer server(port, hostname);
server.setTLSOptions(tlsOptions); server.setTLSOptions(tlsOptions);
@ -27,13 +27,13 @@ namespace ix
[webSocket, connectionState, greetings](const WebSocketMessagePtr& msg) { [webSocket, connectionState, greetings](const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "New connection" << std::endl; spdlog::info("New connection");
std::cerr << "id: " << connectionState->getId() << std::endl; spdlog::info("id: {}", connectionState->getId());
std::cerr << "Uri: " << msg->openInfo.uri << std::endl; spdlog::info("Uri: {}", msg->openInfo.uri);
std::cerr << "Headers:" << std::endl; spdlog::info("Headers:");
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
if (greetings) if (greetings)
@ -43,22 +43,21 @@ namespace ix
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "Closed connection" spdlog::info("Closed connection: client id {} code {} reason {}",
<< " code " << msg->closeInfo.code << " reason " connectionState->getId(),
<< msg->closeInfo.reason << std::endl; msg->closeInfo.code,
msg->closeInfo.reason);
} }
else if (msg->type == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
std::stringstream ss; spdlog::error("Connection error: {}", msg->errorInfo.reason);
ss << "Connection error: " << msg->errorInfo.reason << std::endl; spdlog::error("#retries: {}", msg->errorInfo.retries);
ss << "#retries: " << msg->errorInfo.retries << std::endl; spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time);
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; spdlog::error("HTTP Status: {}", msg->errorInfo.http_status);
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str();
} }
else if (msg->type == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl; spdlog::info("Received {} bytes", msg->wireSize);
webSocket->send(msg->str, msg->binary); webSocket->send(msg->str, msg->binary);
} }
}); });
@ -67,7 +66,7 @@ namespace ix
auto res = server.listen(); auto res = server.listen();
if (!res.first) if (!res.first)
{ {
std::cerr << res.second << std::endl; spdlog::error(res.second);
return 1; return 1;
} }

View File

@ -5,10 +5,10 @@
*/ */
#include <fstream> #include <fstream>
#include <iostream>
#include <ixwebsocket/IXHttpClient.h> #include <ixwebsocket/IXHttpClient.h>
#include <ixwebsocket/IXSocketTLSOptions.h> #include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocketHttpHeaders.h> #include <ixwebsocket/IXWebSocketHttpHeaders.h>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
namespace ix namespace ix
@ -47,7 +47,7 @@ namespace ix
auto key = token.substr(0, pos); auto key = token.substr(0, pos);
auto val = token.substr(pos + 1); auto val = token.substr(pos + 1);
std::cerr << key << ": " << val << std::endl; spdlog::info("{}: {}", key, val);
headers[key] = val; headers[key] = val;
} }
@ -76,7 +76,7 @@ namespace ix
auto key = token.substr(0, pos); auto key = token.substr(0, pos);
auto val = token.substr(pos + 1); auto val = token.substr(pos + 1);
std::cerr << key << ": " << val << std::endl; spdlog::info("{}: {}", key, val);
httpParameters[key] = val; httpParameters[key] = val;
} }
@ -108,10 +108,9 @@ namespace ix
args->maxRedirects = maxRedirects; args->maxRedirects = maxRedirects;
args->verbose = verbose; args->verbose = verbose;
args->compress = compress; args->compress = compress;
args->logger = [](const std::string& msg) { std::cout << msg; }; args->logger = [](const std::string& msg) { spdlog::info(msg); };
args->onProgressCallback = [](int current, int total) -> bool { args->onProgressCallback = [](int current, int total) -> bool {
std::cerr << "\r" spdlog::info("Downloaded {} bytes out of {}", current, total);
<< "Downloaded " << current << " bytes out of " << total;
return true; return true;
}; };
@ -131,20 +130,20 @@ namespace ix
response = httpClient.post(url, httpParameters, args); response = httpClient.post(url, httpParameters, args);
} }
std::cerr << std::endl; spdlog::info("");
for (auto it : response->headers) for (auto it : response->headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
std::cerr << "Upload size: " << response->uploadSize << std::endl; spdlog::info("Upload size: {}", response->uploadSize);
std::cerr << "Download size: " << response->downloadSize << std::endl; spdlog::info("Download size: {}", response->downloadSize);
std::cerr << "Status: " << response->statusCode << std::endl; spdlog::info("Status: {}", response->statusCode);
if (response->errorCode != HttpErrorCode::Ok) if (response->errorCode != HttpErrorCode::Ok)
{ {
std::cerr << "error message: " << response->errorMsg << std::endl; spdlog::info("error message: ", response->errorMsg);
} }
if (!headersOnly && response->errorCode == HttpErrorCode::Ok) if (!headersOnly && response->errorCode == HttpErrorCode::Ok)
@ -158,7 +157,7 @@ namespace ix
filename = output; filename = output;
} }
std::cout << "Writing to disk: " << filename << std::endl; spdlog::info("Writing to disk: {}", filename);
std::ofstream out(filename); std::ofstream out(filename);
out.write((char*) &response->payload.front(), response->payload.size()); out.write((char*) &response->payload.front(), response->payload.size());
out.close(); out.close();
@ -167,14 +166,13 @@ namespace ix
{ {
if (response->headers["Content-Type"] != "application/octet-stream") if (response->headers["Content-Type"] != "application/octet-stream")
{ {
std::cout << "payload: " << response->payload << std::endl; spdlog::info("payload: {}", response->payload);
} }
else else
{ {
std::cerr << "Binary output can mess up your terminal." << std::endl; spdlog::info("Binary output can mess up your terminal.");
std::cerr << "Use the -O flag to save the file to disk." << std::endl; spdlog::info("Use the -O flag to save the file to disk.");
std::cerr << "You can also use the --output option to specify a filename." spdlog::info("You can also use the --output option to specify a filename.");
<< std::endl;
} }
} }
} }

View File

@ -5,7 +5,6 @@
*/ */
#include <fstream> #include <fstream>
#include <iostream>
#include <ixwebsocket/IXHttpServer.h> #include <ixwebsocket/IXHttpServer.h>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
@ -32,7 +31,7 @@ namespace ix
auto res = server.listen(); auto res = server.listen();
if (!res.first) if (!res.first)
{ {
std::cerr << res.second << std::endl; spdlog::error(res.second);
return 1; return 1;
} }

View File

@ -4,11 +4,12 @@
* Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved. * Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved.
*/ */
#include <iostream>
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXSocketTLSOptions.h> #include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
#include <iostream>
namespace ix namespace ix
{ {
@ -40,7 +41,7 @@ namespace ix
void WebSocketPingPong::log(const std::string& msg) void WebSocketPingPong::log(const std::string& msg)
{ {
std::cout << msg << std::endl; spdlog::info(msg);
} }
void WebSocketPingPong::stop() void WebSocketPingPong::stop()
@ -56,18 +57,18 @@ namespace ix
log(std::string("Connecting to url: ") + _url); log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) { _webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) {
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl; spdlog::info("Received {} bytes", msg->wireSize);
std::stringstream ss; std::stringstream ss;
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
log("ping_pong: connected"); log("ping_pong: connected");
std::cout << "Uri: " << msg->openInfo.uri << std::endl; spdlog::info("Uri: {}", msg->openInfo.uri);
std::cout << "Handshake Headers:" << std::endl; spdlog::info("Headers:");
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
@ -127,7 +128,7 @@ namespace ix
int ws_ping_pong_main(const std::string& url, const ix::SocketTLSOptions& tlsOptions) int ws_ping_pong_main(const std::string& url, const ix::SocketTLSOptions& tlsOptions)
{ {
std::cout << "Type Ctrl-D to exit prompt..." << std::endl; spdlog::info("Type Ctrl-D to exit prompt...");
WebSocketPingPong webSocketPingPong(url, tlsOptions); WebSocketPingPong webSocketPingPong(url, tlsOptions);
webSocketPingPong.start(); webSocketPingPong.start();

View File

@ -4,8 +4,8 @@
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved. * Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/ */
#include <iostream>
#include <ixwebsocket/IXWebSocketServer.h> #include <ixwebsocket/IXWebSocketServer.h>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
namespace ix namespace ix
@ -44,7 +44,7 @@ namespace ix
const std::string& remoteUrl, const std::string& remoteUrl,
bool verbose) bool verbose)
{ {
std::cout << "Listening on " << hostname << ":" << port << std::endl; spdlog::info("Listening on {}:{}", hostname, port);
ix::WebSocketServer server(port, hostname); ix::WebSocketServer server(port, hostname);
server.setTLSOptions(tlsOptions); server.setTLSOptions(tlsOptions);
@ -61,41 +61,39 @@ namespace ix
// Server connection // Server connection
state->webSocket().setOnMessageCallback([webSocket, state, verbose]( state->webSocket().setOnMessageCallback([webSocket, state, verbose](
const WebSocketMessagePtr& msg) { const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "New connection" << std::endl; spdlog::info("New connection to remote server");
std::cerr << "server id: " << state->getId() << std::endl; spdlog::info("id: {}", state->getId());
std::cerr << "Uri: " << msg->openInfo.uri << std::endl; spdlog::info("Uri: {}", msg->openInfo.uri);
std::cerr << "Headers:" << std::endl; spdlog::info("Headers:");
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "Closed connection" spdlog::info("Closed remote server connection: client id {} code {} reason {}",
<< " code " << msg->closeInfo.code << " reason " state->getId(),
<< msg->closeInfo.reason << std::endl; msg->closeInfo.code,
webSocket->close(msg->closeInfo.code, msg->closeInfo.reason); msg->closeInfo.reason);
state->setTerminated(); state->setTerminated();
} }
else if (msg->type == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
std::stringstream ss; spdlog::error("Connection error: {}", msg->errorInfo.reason);
ss << "Connection error: " << msg->errorInfo.reason << std::endl; spdlog::error("#retries: {}", msg->errorInfo.retries);
ss << "#retries: " << msg->errorInfo.retries << std::endl; spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time);
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; spdlog::error("HTTP Status: {}", msg->errorInfo.http_status);
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str();
} }
else if (msg->type == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " << msg->wireSize << " bytes from server" << std::endl; spdlog::info("Received {} bytes from server", msg->wireSize);
if (verbose) if (verbose)
{ {
std::cerr << "payload " << msg->str << std::endl; spdlog::info("payload {}", msg->str);
} }
webSocket->send(msg->str, msg->binary); webSocket->send(msg->str, msg->binary);
@ -107,53 +105,54 @@ namespace ix
const WebSocketMessagePtr& msg) { const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "New connection" << std::endl; spdlog::info("New connection from client");
std::cerr << "client id: " << state->getId() << std::endl; spdlog::info("id: {}", state->getId());
std::cerr << "Uri: " << msg->openInfo.uri << std::endl; spdlog::info("Uri: {}", msg->openInfo.uri);
std::cerr << "Headers:" << std::endl; spdlog::info("Headers:");
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
// Connect to the 'real' server // Connect to the 'real' server
std::string url(remoteUrl); std::string url(remoteUrl);
url += msg->openInfo.uri; url += msg->openInfo.uri;
state->webSocket().setUrl(url); state->webSocket().setUrl(url);
state->webSocket().disableAutomaticReconnection();
state->webSocket().start(); state->webSocket().start();
// we should sleep here for a bit until we've established the // we should sleep here for a bit until we've established the
// connection with the remote server // connection with the remote server
while (state->webSocket().getReadyState() != ReadyState::Open) while (state->webSocket().getReadyState() != ReadyState::Open)
{ {
std::cerr << "waiting for server connection establishment" << std::endl; spdlog::info("waiting for server connection establishment");
std::this_thread::sleep_for(std::chrono::milliseconds(10)); std::this_thread::sleep_for(std::chrono::milliseconds(10));
} }
std::cerr << "server connection established" << std::endl; spdlog::info("server connection established");
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "Closed connection" spdlog::info("Closed client connection: client id {} code {} reason {}",
<< " code " << msg->closeInfo.code << " reason " state->getId(),
<< msg->closeInfo.reason << std::endl; msg->closeInfo.code,
msg->closeInfo.reason);
state->webSocket().close(msg->closeInfo.code, msg->closeInfo.reason); state->webSocket().close(msg->closeInfo.code, msg->closeInfo.reason);
} }
else if (msg->type == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
std::stringstream ss; spdlog::error("Connection error: {}", msg->errorInfo.reason);
ss << "Connection error: " << msg->errorInfo.reason << std::endl; spdlog::error("#retries: {}", msg->errorInfo.retries);
ss << "#retries: " << msg->errorInfo.retries << std::endl; spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time);
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; spdlog::error("HTTP Status: {}", msg->errorInfo.http_status);
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str();
} }
else if (msg->type == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " << msg->wireSize << " bytes from client" << std::endl; spdlog::info("Received {} bytes from client", msg->wireSize);
if (verbose) if (verbose)
{ {
std::cerr << "payload " << msg->str << std::endl; spdlog::info("payload {}", msg->str);
} }
state->webSocket().send(msg->str, msg->binary); state->webSocket().send(msg->str, msg->binary);
} }
}); });
@ -162,7 +161,7 @@ namespace ix
auto res = server.listen(); auto res = server.listen();
if (!res.first) if (!res.first)
{ {
std::cerr << res.second << std::endl; spdlog::info(res.second);
return 1; return 1;
} }

View File

@ -7,7 +7,6 @@
#include <chrono> #include <chrono>
#include <condition_variable> #include <condition_variable>
#include <fstream> #include <fstream>
#include <iostream>
#include <ixcrypto/IXBase64.h> #include <ixcrypto/IXBase64.h>
#include <ixcrypto/IXHash.h> #include <ixcrypto/IXHash.h>
#include <ixcrypto/IXUuid.h> #include <ixcrypto/IXUuid.h>
@ -15,6 +14,7 @@
#include <ixwebsocket/IXSocketTLSOptions.h> #include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <msgpack11/msgpack11.hpp> #include <msgpack11/msgpack11.hpp>
#include <spdlog/spdlog.h>
#include <mutex> #include <mutex>
#include <sstream> #include <sstream>
#include <vector> #include <vector>
@ -75,12 +75,12 @@ namespace ix
void WebSocketReceiver::log(const std::string& msg) void WebSocketReceiver::log(const std::string& msg)
{ {
std::cout << msg << std::endl; spdlog::info(msg);
} }
void WebSocketReceiver::waitForConnection() void WebSocketReceiver::waitForConnection()
{ {
std::cout << "ws_receive: Connecting..." << std::endl; spdlog::info("{}: Connecting...", "ws_receive");
std::unique_lock<std::mutex> lock(_conditionVariableMutex); std::unique_lock<std::mutex> lock(_conditionVariableMutex);
_condition.wait(lock); _condition.wait(lock);
@ -88,7 +88,7 @@ namespace ix
void WebSocketReceiver::waitForMessage() void WebSocketReceiver::waitForMessage()
{ {
std::cout << "ws_receive: Waiting for message..." << std::endl; spdlog::info("{}: Waiting for message...", "ws_receive");
std::unique_lock<std::mutex> lock(_conditionVariableMutex); std::unique_lock<std::mutex> lock(_conditionVariableMutex);
_condition.wait(lock); _condition.wait(lock);
@ -124,7 +124,7 @@ namespace ix
void WebSocketReceiver::handleMessage(const std::string& str) void WebSocketReceiver::handleMessage(const std::string& str)
{ {
std::cerr << "ws_receive: Received message: " << str.size() << std::endl; spdlog::info("ws_receive: Received message: {}", str.size());
std::string errMsg; std::string errMsg;
MsgPack data = MsgPack::parse(str, errMsg); MsgPack data = MsgPack::parse(str, errMsg);
@ -134,17 +134,17 @@ namespace ix
return; return;
} }
std::cout << "id: " << data["id"].string_value() << std::endl; spdlog::info("id: {}", data["id"].string_value());
std::vector<uint8_t> content = data["content"].binary_items(); std::vector<uint8_t> content = data["content"].binary_items();
std::cout << "ws_receive: Content size: " << content.size() << std::endl; spdlog::info("ws_receive: Content size: {}", content.size());
// Validate checksum // Validate checksum
uint64_t cksum = ix::djb2Hash(content); uint64_t cksum = ix::djb2Hash(content);
auto cksumRef = data["djb2_hash"].string_value(); auto cksumRef = data["djb2_hash"].string_value();
std::cout << "ws_receive: Computed hash: " << cksum << std::endl; spdlog::info("ws_receive: Computed hash: {}", cksum);
std::cout << "ws_receive: Reference hash: " << cksumRef << std::endl; spdlog::info("ws_receive: Reference hash: {}", cksumRef);
if (std::to_string(cksum) != cksumRef) if (std::to_string(cksum) != cksumRef)
{ {
@ -157,12 +157,12 @@ namespace ix
std::string filenameTmp = filename + ".tmp"; std::string filenameTmp = filename + ".tmp";
std::cout << "ws_receive: Writing to disk: " << filenameTmp << std::endl; spdlog::info("ws_receive: Writing to disk: {}", filenameTmp);
std::ofstream out(filenameTmp); std::ofstream out(filenameTmp);
out.write((char*) &content.front(), content.size()); out.write((char*) &content.front(), content.size());
out.close(); out.close();
std::cout << "ws_receive: Renaming " << filenameTmp << " to " << filename << std::endl; spdlog::info("ws_receive: Renaming {} to {}", filenameTmp, filename);
rename(filenameTmp.c_str(), filename.c_str()); rename(filenameTmp.c_str(), filename.c_str());
std::map<MsgPack, MsgPack> pdu; std::map<MsgPack, MsgPack> pdu;
@ -170,7 +170,7 @@ namespace ix
pdu["id"] = data["id"]; pdu["id"] = data["id"];
pdu["filename"] = data["filename"]; pdu["filename"] = data["filename"];
std::cout << "Sending ack to sender" << std::endl; spdlog::info("Sending ack to sender");
MsgPack msg(pdu); MsgPack msg(pdu);
_webSocket.sendBinary(msg.dump()); _webSocket.sendBinary(msg.dump());
} }
@ -192,11 +192,11 @@ namespace ix
_condition.notify_one(); _condition.notify_one();
log("ws_receive: connected"); log("ws_receive: connected");
std::cout << "Uri: " << msg->openInfo.uri << std::endl; spdlog::info("Uri: {}", msg->openInfo.uri);
std::cout << "Handshake Headers:" << std::endl; spdlog::info("Headers:");
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
@ -259,7 +259,7 @@ namespace ix
std::chrono::duration<double, std::milli> duration(1000); std::chrono::duration<double, std::milli> duration(1000);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
std::cout << "ws_receive: Done !" << std::endl; spdlog::info("ws_receive: Done !");
webSocketReceiver.stop(); webSocketReceiver.stop();
} }

View File

@ -4,8 +4,8 @@
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved. * Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/ */
#include <iostream>
#include <ixsnake/IXRedisClient.h> #include <ixsnake/IXRedisClient.h>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
namespace ix namespace ix
@ -20,7 +20,7 @@ namespace ix
RedisClient redisClient; RedisClient redisClient;
if (!redisClient.connect(hostname, port)) if (!redisClient.connect(hostname, port))
{ {
std::cerr << "Cannot connect to redis host" << std::endl; spdlog::info("Cannot connect to redis host");
return 1; return 1;
} }
@ -30,10 +30,10 @@ namespace ix
if (!redisClient.auth(password, authResponse)) if (!redisClient.auth(password, authResponse))
{ {
std::stringstream ss; std::stringstream ss;
std::cerr << "Cannot authenticated to redis" << std::endl; spdlog::info("Cannot authenticated to redis");
return 1; return 1;
} }
std::cout << "Auth response: " << authResponse << ":" << port << std::endl; spdlog::info("Auth response: {}", authResponse);
} }
std::string errMsg; std::string errMsg;
@ -41,8 +41,7 @@ namespace ix
{ {
if (!redisClient.publish(channel, message, errMsg)) if (!redisClient.publish(channel, message, errMsg))
{ {
std::cerr << "Error publishing to channel " << channel << "error: " << errMsg spdlog::error("Error publishing to channel {} error {}", channel, errMsg);
<< std::endl;
return 1; return 1;
} }
} }

View File

@ -4,7 +4,6 @@
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved. * Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/ */
#include <iostream>
#include <ixsnake/IXRedisServer.h> #include <ixsnake/IXRedisServer.h>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
@ -20,7 +19,7 @@ namespace ix
auto res = server.listen(); auto res = server.listen();
if (!res.first) if (!res.first)
{ {
std::cerr << res.second << std::endl; spdlog::info(res.second);
return 1; return 1;
} }

View File

@ -6,8 +6,8 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <iostream>
#include <ixsnake/IXRedisClient.h> #include <ixsnake/IXRedisClient.h>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
#include <thread> #include <thread>
@ -22,7 +22,7 @@ namespace ix
RedisClient redisClient; RedisClient redisClient;
if (!redisClient.connect(hostname, port)) if (!redisClient.connect(hostname, port))
{ {
std::cerr << "Cannot connect to redis host" << std::endl; spdlog::info("Cannot connect to redis host");
return 1; return 1;
} }
@ -32,10 +32,10 @@ namespace ix
if (!redisClient.auth(password, authResponse)) if (!redisClient.auth(password, authResponse))
{ {
std::stringstream ss; std::stringstream ss;
std::cerr << "Cannot authenticated to redis" << std::endl; spdlog::info("Cannot authenticated to redis");
return 1; return 1;
} }
std::cout << "Auth response: " << authResponse << ":" << port << std::endl; spdlog::info("Auth response: {}", authResponse);
} }
std::atomic<int> msgPerSeconds(0); std::atomic<int> msgPerSeconds(0);
@ -44,7 +44,7 @@ namespace ix
auto callback = [&msgPerSeconds, &msgCount, verbose](const std::string& message) { auto callback = [&msgPerSeconds, &msgCount, verbose](const std::string& message) {
if (verbose) if (verbose)
{ {
std::cout << "received: " << message << std::endl; spdlog::info("recived: {}", message);
} }
msgPerSeconds++; msgPerSeconds++;
@ -52,14 +52,13 @@ namespace ix
}; };
auto responseCallback = [](const std::string& redisResponse) { auto responseCallback = [](const std::string& redisResponse) {
std::cout << "Redis subscribe response: " << redisResponse << std::endl; spdlog::info("Redis subscribe response: {}", redisResponse);
}; };
auto timer = [&msgPerSeconds, &msgCount] { auto timer = [&msgPerSeconds, &msgCount] {
while (true) while (true)
{ {
std::cout << "#messages " << msgCount << " " spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds);
<< "msg/s " << msgPerSeconds << std::endl;
msgPerSeconds = 0; msgPerSeconds = 0;
auto duration = std::chrono::seconds(1); auto duration = std::chrono::seconds(1);
@ -69,10 +68,10 @@ namespace ix
std::thread t(timer); std::thread t(timer);
std::cerr << "Subscribing to " << channel << "..." << std::endl; spdlog::info("Subscribing to {} ...", channel);
if (!redisClient.subscribe(channel, responseCallback, callback)) if (!redisClient.subscribe(channel, responseCallback, callback))
{ {
std::cerr << "Error subscribing to channel " << channel << std::endl; spdlog::info("Error subscribing to channel {}", channel);
return 1; return 1;
} }

View File

@ -7,7 +7,6 @@
#include <chrono> #include <chrono>
#include <condition_variable> #include <condition_variable>
#include <fstream> #include <fstream>
#include <iostream>
#include <ixcrypto/IXBase64.h> #include <ixcrypto/IXBase64.h>
#include <ixcrypto/IXHash.h> #include <ixcrypto/IXHash.h>
#include <ixcrypto/IXUuid.h> #include <ixcrypto/IXUuid.h>
@ -15,6 +14,7 @@
#include <ixwebsocket/IXSocketTLSOptions.h> #include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <msgpack11/msgpack11.hpp> #include <msgpack11/msgpack11.hpp>
#include <spdlog/spdlog.h>
#include <mutex> #include <mutex>
#include <sstream> #include <sstream>
#include <vector> #include <vector>
@ -68,12 +68,12 @@ namespace ix
void WebSocketSender::log(const std::string& msg) void WebSocketSender::log(const std::string& msg)
{ {
std::cout << msg << std::endl; spdlog::info(msg);
} }
void WebSocketSender::waitForConnection() void WebSocketSender::waitForConnection()
{ {
std::cout << "ws_send: Connecting..." << std::endl; spdlog::info("{}: Connecting...", "ws_send");
std::unique_lock<std::mutex> lock(_conditionVariableMutex); std::unique_lock<std::mutex> lock(_conditionVariableMutex);
_condition.wait(lock); _condition.wait(lock);
@ -81,7 +81,7 @@ namespace ix
void WebSocketSender::waitForAck() void WebSocketSender::waitForAck()
{ {
std::cout << "ws_send: Waiting for ack..." << std::endl; spdlog::info("{}: Waiting for ack...", "ws_send");
std::unique_lock<std::mutex> lock(_conditionVariableMutex); std::unique_lock<std::mutex> lock(_conditionVariableMutex);
_condition.wait(lock); _condition.wait(lock);
@ -122,11 +122,11 @@ namespace ix
_condition.notify_one(); _condition.notify_one();
log("ws_send: connected"); log("ws_send: connected");
std::cout << "Uri: " << msg->openInfo.uri << std::endl; spdlog::info("Uri: {}", msg->openInfo.uri);
std::cout << "Handshake Headers:" << std::endl; spdlog::info("Headers:");
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
@ -147,14 +147,14 @@ namespace ix
MsgPack data = MsgPack::parse(msg->str, errMsg); MsgPack data = MsgPack::parse(msg->str, errMsg);
if (!errMsg.empty()) if (!errMsg.empty())
{ {
std::cerr << "Invalid MsgPack response" << std::endl; spdlog::info("Invalid MsgPack response");
return; return;
} }
std::string id = data["id"].string_value(); std::string id = data["id"].string_value();
if (_id != id) if (_id != id)
{ {
std::cerr << "Invalid id" << std::endl; spdlog::info("Invalid id");
} }
} }
else if (msg->type == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
@ -201,7 +201,7 @@ namespace ix
auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(now - _start); auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(now - _start);
_ms = milliseconds.count(); _ms = milliseconds.count();
std::cout << _description << " completed in " << _ms << "ms" << std::endl; spdlog::info("{} completed in {}", _description, _ms);
_reported = true; _reported = true;
} }
@ -240,7 +240,7 @@ namespace ix
Bench bench("Sending file through websocket"); Bench bench("Sending file through websocket");
_webSocket.sendBinary(msg.dump(), [throttle](int current, int total) -> bool { _webSocket.sendBinary(msg.dump(), [throttle](int current, int total) -> bool {
std::cout << "ws_send: Step " << current << " out of " << total << std::endl; spdlog::info("ws_send: Step {} out of {}", current, total);
if (throttle) if (throttle)
{ {
@ -254,7 +254,7 @@ namespace ix
do do
{ {
size_t bufferedAmount = _webSocket.bufferedAmount(); size_t bufferedAmount = _webSocket.bufferedAmount();
std::cout << "ws_send: " << bufferedAmount << " bytes left to be sent" << std::endl; spdlog::info("ws_send: {} bytes left to be sent", bufferedAmount);
std::chrono::duration<double, std::milli> duration(10); std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
@ -264,7 +264,7 @@ namespace ix
auto duration = bench.getDuration(); auto duration = bench.getDuration();
auto transferRate = 1000 * content.size() / duration; auto transferRate = 1000 * content.size() / duration;
transferRate /= (1024 * 1024); transferRate /= (1024 * 1024);
std::cout << "ws_send: Send transfer rate: " << transferRate << "MB/s" << std::endl; spdlog::info("ws_send: Send transfer rate: {} MB/s", transferRate);
} }
void wsSend(const std::string& url, void wsSend(const std::string& url,
@ -278,12 +278,12 @@ namespace ix
webSocketSender.waitForConnection(); webSocketSender.waitForConnection();
std::cout << "ws_send: Sending..." << std::endl; spdlog::info("ws_send: Sending...");
webSocketSender.sendMessage(path, throttle); webSocketSender.sendMessage(path, throttle);
webSocketSender.waitForAck(); webSocketSender.waitForAck();
std::cout << "ws_send: Done !" << std::endl; spdlog::info("ws_send: Done !");
webSocketSender.stop(); webSocketSender.stop();
} }

View File

@ -5,8 +5,8 @@
*/ */
#include <fstream> #include <fstream>
#include <iostream>
#include <ixsnake/IXSnakeServer.h> #include <ixsnake/IXSnakeServer.h>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
namespace namespace
@ -43,7 +43,8 @@ namespace ix
int redisPort, int redisPort,
const std::string& redisPassword, const std::string& redisPassword,
bool verbose, bool verbose,
const std::string& appsConfigPath) const std::string& appsConfigPath,
const SocketTLSOptions& socketTLSOptions)
{ {
snake::AppConfig appConfig; snake::AppConfig appConfig;
appConfig.port = port; appConfig.port = port;
@ -51,16 +52,17 @@ namespace ix
appConfig.verbose = verbose; appConfig.verbose = verbose;
appConfig.redisPort = redisPort; appConfig.redisPort = redisPort;
appConfig.redisPassword = redisPassword; appConfig.redisPassword = redisPassword;
appConfig.socketTLSOptions = socketTLSOptions;
// Parse config file // Parse config file
auto str = readAsString(appsConfigPath); auto str = readAsString(appsConfigPath);
if (str.empty()) if (str.empty())
{ {
std::cout << "Cannot read content of " << appsConfigPath << std::endl; spdlog::error("Cannot read content of {}", appsConfigPath);
return 1; return 1;
} }
std::cout << str << std::endl; spdlog::error(str);
auto apps = nlohmann::json::parse(str); auto apps = nlohmann::json::parse(str);
appConfig.apps = apps["apps"]; appConfig.apps = apps["apps"];

View File

@ -4,8 +4,8 @@
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved. * Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/ */
#include <iostream>
#include <ixwebsocket/IXWebSocketServer.h> #include <ixwebsocket/IXWebSocketServer.h>
#include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
namespace ix namespace ix
@ -14,7 +14,7 @@ namespace ix
const std::string& hostname, const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions) const ix::SocketTLSOptions& tlsOptions)
{ {
std::cout << "ws_transfer: Listening on " << hostname << ":" << port << std::endl; spdlog::info("Listening on {}:{}", hostname, port);
ix::WebSocketServer server(port, hostname); ix::WebSocketServer server(port, hostname);
server.setTLSOptions(tlsOptions); server.setTLSOptions(tlsOptions);
@ -25,22 +25,23 @@ namespace ix
const WebSocketMessagePtr& msg) { const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "ws_transfer: New connection" << std::endl; spdlog::info("ws_transfer: New connection");
std::cerr << "id: " << connectionState->getId() << std::endl; spdlog::info("id: {}", connectionState->getId());
std::cerr << "Uri: " << msg->openInfo.uri << std::endl; spdlog::info("Uri: {}", msg->openInfo.uri);
std::cerr << "Headers:" << std::endl; spdlog::info("Headers:");
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "ws_transfer: [client " << connectionState->getId() spdlog::info("ws_transfer: Closed connection: client id {} code {} reason {}",
<< "]: Closed connection, code " << msg->closeInfo.code << " reason " connectionState->getId(),
<< msg->closeInfo.reason << std::endl; msg->closeInfo.code,
msg->closeInfo.reason);
auto remaining = server.getClients().erase(webSocket); auto remaining = server.getClients().erase(webSocket);
std::cerr << "ws_transfer: " << remaining << " remaining clients " << std::endl; spdlog::info("ws_transfer: {} remaining clients", remaining);
} }
else if (msg->type == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
@ -49,40 +50,39 @@ namespace ix
ss << "#retries: " << msg->errorInfo.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str(); spdlog::info(ss.str());
} }
else if (msg->type == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
std::cerr << "ws_transfer: Received message fragment " << std::endl; spdlog::info("ws_transfer: Received message fragment ");
} }
else if (msg->type == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "ws_transfer: Received " << msg->wireSize << " bytes" << std::endl; spdlog::info("ws_transfer: Received {} bytes", msg->wireSize);
size_t receivers = 0; size_t receivers = 0;
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client != webSocket)
{ {
auto readyState = client->getReadyState(); auto readyState = client->getReadyState();
auto id = connectionState->getId();
if (readyState == ReadyState::Open) if (readyState == ReadyState::Open)
{ {
++receivers; ++receivers;
client->send(msg->str, client->send(msg->str,
msg->binary, msg->binary,
[id = connectionState->getId()](int current, [&id](int current, int total) -> bool {
int total) -> bool { spdlog::info("{}: [client {}]: Step {} out of {}",
std::cerr << "ws_transfer: [client " << id "ws_transfer", id, current, total);
<< "]: Step " << current << " out of "
<< total << std::endl;
return true; return true;
}); });
do do
{ {
size_t bufferedAmount = client->bufferedAmount(); size_t bufferedAmount = client->bufferedAmount();
std::cerr << "ws_transfer: [client " << connectionState->getId()
<< "]: " << bufferedAmount spdlog::info("{}: [client {}]: {} bytes left to send",
<< " bytes left to be sent, " << std::endl; "ws_transfer", id, bufferedAmount);
std::this_thread::sleep_for(std::chrono::milliseconds(500)); std::this_thread::sleep_for(std::chrono::milliseconds(500));
@ -96,16 +96,15 @@ namespace ix
? "Connecting" ? "Connecting"
: readyState == ReadyState::Closing ? "Closing" : "Closed"; : readyState == ReadyState::Closing ? "Closing" : "Closed";
size_t bufferedAmount = client->bufferedAmount(); size_t bufferedAmount = client->bufferedAmount();
std::cerr << "ws_transfer: [client " << connectionState->getId()
<< "]: has readystate '" << readyStateString << "' and " spdlog::info("{}: [client {}]: has readystate {} bytes left to be sent",
<< bufferedAmount << " bytes left to be sent, " "ws_transfer", id, readyStateString, bufferedAmount);
<< std::endl;
} }
} }
} }
if (!receivers) if (!receivers)
{ {
std::cerr << "ws_transfer: no remaining receivers" << std::endl; spdlog::info("ws_transfer: no remaining receivers");
} }
} }
}); });
@ -114,7 +113,7 @@ namespace ix
auto res = server.listen(); auto res = server.listen();
if (!res.first) if (!res.first)
{ {
std::cerr << res.second << std::endl; spdlog::info(res.second);
return 1; return 1;
} }