Compare commits

...

17 Commits

Author SHA1 Message Date
3dabd3a556 (ws) merge all ws_*.cpp files into a single one to speedup compilation 2020-08-15 19:30:17 -07:00
0498e2fa98 IXBench.h is missing a pragma once 2020-08-15 18:58:46 -07:00
2aaf59651e (socket server) in the loop accepting connections, call select without a timeout on unix to avoid busy looping, and only wake up when a new connection happens 2020-08-15 18:32:59 -07:00
cd4e51eacf (socket server) instead of busy looping with a sleep, only wake up the GC thread when a new thread will have to be joined, (we know that thanks to the ConnectionState OnSetTerminated callback 2020-08-15 16:24:35 -07:00
785842de03 (socket server) add a callback to the ConnectionState to be invoked when the connection is terminated. This will be used by the SocketServer in the future to know on time that the associated connection thread can be terminated. 2020-08-15 16:03:40 -07:00
261095fa12 (socket server) do not create a select interrupt object everytime when polling for notifications while waiting for new connections, instead use a persistent one which is a member variable 2020-08-15 15:28:15 -07:00
ed2ed0f7ae (ixwebsocket client) handle HTTP redirects 2020-08-14 18:13:34 -07:00
7ad5ead0f6 document the --config_path option in usage.md 2020-08-14 15:15:27 -07:00
a8284e64e3 add a simple shell script to test websocket proxy 2020-08-14 15:09:34 -07:00
5423a31d5a (ws) have more subcommand handle --pidfile, to write pid to a file 2020-08-14 15:09:12 -07:00
53575f8d90 change makefile openssl target to use ninja and install ws 2020-08-14 15:08:37 -07:00
d3bcbdac26 (ws) upgrade to latest version of nlohmann json (3.9.1 from 3.2.0) 2020-08-13 22:10:38 -07:00
8c5b28adce (websocket proxy server) add ability to map different hosts to different websocket servers, using a json config file 2020-08-13 21:20:42 -07:00
dcbafae35a (ws) on macOS, with OpenSSL or MbedTLS, use /etc/ssl/cert.pem as the system certs 2020-08-12 18:55:13 -07:00
eb197edcec ws --version does not get printed with a log prefix 2020-08-12 18:44:47 -07:00
b8265bf7f2 (ws) -q option imply info log level, not warning log level 2020-08-11 15:44:06 -07:00
e7c4f0b171 add documentation for the websocket send callback and the send return type (fix #239) 2020-08-11 11:24:00 -07:00
46 changed files with 15553 additions and 9174 deletions

View File

@ -2,6 +2,46 @@
All changes to this project will be documented in this file.
## [10.2.5] - 2020-08-15
(ws) merge all ws_*.cpp files into a single one to speedup compilation
## [10.2.4] - 2020-08-15
(socket server) in the loop accepting connections, call select without a timeout on unix to avoid busy looping, and only wake up when a new connection happens
## [10.2.3] - 2020-08-15
(socket server) instead of busy looping with a sleep, only wake up the GC thread when a new thread will have to be joined, (we know that thanks to the ConnectionState OnSetTerminated callback
## [10.2.2] - 2020-08-15
(socket server) add a callback to the ConnectionState to be invoked when the connection is terminated. This will be used by the SocketServer in the future to know on time that the associated connection thread can be terminated.
## [10.2.1] - 2020-08-15
(socket server) do not create a select interrupt object everytime when polling for notifications while waiting for new connections, instead use a persistent one which is a member variable
## [10.2.0] - 2020-08-14
(ixwebsocket client) handle HTTP redirects
## [10.2.0] - 2020-08-13
(ws) upgrade to latest version of nlohmann json (3.9.1 from 3.2.0)
## [10.1.9] - 2020-08-13
(websocket proxy server) add ability to map different hosts to different websocket servers, using a json config file
## [10.1.8] - 2020-08-12
(ws) on macOS, with OpenSSL or MbedTLS, use /etc/ssl/cert.pem as the system certs
## [10.1.7] - 2020-08-11
(ws) -q option imply info log level, not warning log level
## [10.1.6] - 2020-08-06
(websocket server) Handle programmer error when the server callback is not registered properly (fix #227)

View File

@ -67,9 +67,28 @@ webSocket.stop()
### Sending messages
`websocket.send("foo")` will send a message.
`WebSocketSendInfo result = websocket.send("foo")` will send a message.
If the connection was closed and sending failed, the return value will be set to false.
If the connection was closed, sending will fail, and the success field of the result object will be set to false. There could also be a compression error in which case the compressError field will be set to true. The payloadSize field and wireSize fields will tell you respectively how much bytes the message weight, and how many bytes were sent on the wire (potentially compressed + counting the message header (a few bytes).
There is an optional progress callback that can be passed in as the second argument. If a message is large it will be fragmented into chunks which will be sent independantly. Everytime the we can write a fragment into the OS network cache, the callback will be invoked. If a user wants to cancel a slow send, false should be returned from within the callback.
Here is an example code snippet copied from the ws send sub-command. Each fragment weights 32K, so the total integer is the wireSize divided by 32K. As an example if you are sending 32M of data, uncompressed, total will be 1000. current will be set to 0 for the first fragment, then 1, 2 etc...
```
auto result =
_webSocket.sendBinary(serializedMsg, [this, throttle](int current, int total) -> bool {
spdlog::info("ws_send: Step {} out of {}", current + 1, total);
if (throttle)
{
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
}
return _connected;
});
```
### ReadyState

View File

@ -204,6 +204,24 @@ Listening on 127.0.0.1:8008
If you connect to ws://127.0.0.1:8008, the proxy will connect to ws://127.0.0.1:9000 and pass all traffic to this server.
You can also use a more complex setup if you want to redirect to different websocket servers based on the hostname your client is trying to connect to. If you have multiple CNAME aliases that point to the same server.
A JSON config file is used to express that mapping ; here connecting to echo.jeanserge.com will proxy the client to ws://localhost:8008 on the local machine (which actually runs ws echo_server), while connecting to bavarde.jeanserge.com will proxy the client to ws://localhost:5678 where a cobra python server is running. As a side note you will need a wildcard SSL certificate if you want to have SSL enabled on that machine.
```json
{
"remote_urls": {
"echo.jeanserge.com": "ws://localhost:8008",
"bavarde.jeanserge.com": "ws://localhost:5678"
}
}
```
The --config_path option is required to instruct ws proxy_server to read that file.
```
ws proxy_server --config_path proxyConfig.json --port 8765
```
## File transfer
```

View File

@ -3,6 +3,7 @@
* Author: Benjamin Sergeant
* Copyright (c) 2017-2020 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <chrono>
#include <stdint.h>

View File

@ -31,6 +31,11 @@ namespace ix
return std::make_shared<ConnectionState>();
}
void ConnectionState::setOnSetTerminatedCallback(const OnSetTerminatedCallback& callback)
{
_onSetTerminatedCallback = callback;
}
bool ConnectionState::isTerminated() const
{
return _terminated;
@ -39,5 +44,10 @@ namespace ix
void ConnectionState::setTerminated()
{
_terminated = true;
if (_onSetTerminatedCallback)
{
_onSetTerminatedCallback();
}
}
} // namespace ix

View File

@ -7,12 +7,15 @@
#pragma once
#include <atomic>
#include <functional>
#include <memory>
#include <stdint.h>
#include <string>
namespace ix
{
using OnSetTerminatedCallback = std::function<void()>;
class ConnectionState
{
public:
@ -27,10 +30,16 @@ namespace ix
static std::shared_ptr<ConnectionState> createConnectionState();
private:
void setOnSetTerminatedCallback(const OnSetTerminatedCallback& callback);
protected:
std::atomic<bool> _terminated;
std::string _id;
OnSetTerminatedCallback _onSetTerminatedCallback;
static std::atomic<uint64_t> _globalId;
friend class SocketServer;
};
} // namespace ix

View File

@ -8,6 +8,9 @@
namespace ix
{
const uint64_t SelectInterrupt::kSendRequest = 1;
const uint64_t SelectInterrupt::kCloseRequest = 2;
SelectInterrupt::SelectInterrupt()
{
;

View File

@ -6,6 +6,7 @@
#pragma once
#include <memory>
#include <stdint.h>
#include <string>
@ -23,5 +24,11 @@ namespace ix
virtual bool clear();
virtual uint64_t read();
virtual int getFd() const;
// Used as special codes for pipe communication
static const uint64_t kSendRequest;
static const uint64_t kCloseRequest;
};
using SelectInterruptPtr = std::unique_ptr<SelectInterrupt>;
} // namespace ix

View File

@ -27,8 +27,6 @@ namespace ix
{
const int Socket::kDefaultPollNoTimeout = -1; // No poll timeout by default
const int Socket::kDefaultPollTimeout = kDefaultPollNoTimeout;
const uint64_t Socket::kSendRequest = 1;
const uint64_t Socket::kCloseRequest = 2;
constexpr size_t Socket::kChunkSize;
Socket::Socket(int fd)
@ -96,11 +94,11 @@ namespace ix
{
uint64_t value = selectInterrupt->read();
if (value == kSendRequest)
if (value == SelectInterrupt::kSendRequest)
{
pollResult = PollResultType::SendRequest;
}
else if (value == kCloseRequest)
else if (value == SelectInterrupt::kCloseRequest)
{
pollResult = PollResultType::CloseRequest;
}

View File

@ -34,12 +34,10 @@ typedef SSIZE_T ssize_t;
#include "IXCancellationRequest.h"
#include "IXProgressCallback.h"
#include "IXSelectInterrupt.h"
namespace ix
{
class SelectInterrupt;
using SelectInterruptPtr = std::unique_ptr<SelectInterrupt>;
enum class PollResultType
{
ReadyForRead = 0,
@ -96,11 +94,6 @@ namespace ix
int sockfd,
const SelectInterruptPtr& selectInterrupt);
// Used as special codes for pipe communication
static const uint64_t kSendRequest;
static const uint64_t kCloseRequest;
protected:
std::atomic<int> _sockfd;
std::mutex _socketMutex;

View File

@ -8,6 +8,7 @@
#include "IXNetSystem.h"
#include "IXSelectInterrupt.h"
#include "IXSelectInterruptFactory.h"
#include "IXSetThreadName.h"
#include "IXSocket.h"
#include "IXSocketConnect.h"
@ -36,6 +37,7 @@ namespace ix
, _stop(false)
, _stopGc(false)
, _connectionStateFactory(&ConnectionState::createConnectionState)
, _acceptSelectInterrupt(createSelectInterrupt())
{
}
@ -58,6 +60,16 @@ namespace ix
std::pair<bool, std::string> SocketServer::listen()
{
std::string acceptSelectInterruptInitErrorMsg;
if (!_acceptSelectInterrupt->init(acceptSelectInterruptInitErrorMsg))
{
std::stringstream ss;
ss << "SocketServer::listen() error in SelectInterrupt::init: "
<< acceptSelectInterruptInitErrorMsg;
return std::make_pair(false, ss.str());
}
if (_addressFamily != AF_INET && _addressFamily != AF_INET6)
{
std::string errMsg("SocketServer::listen() AF_INET and AF_INET6 are currently "
@ -193,6 +205,12 @@ namespace ix
if (_thread.joinable())
{
_stop = true;
// Wake up select
if (!_acceptSelectInterrupt->notify(SelectInterrupt::kCloseRequest))
{
logError("SocketServer::stop: Cannot wake up from select");
}
_thread.join();
_stop = false;
}
@ -201,6 +219,7 @@ namespace ix
if (_gcThread.joinable())
{
_stopGc = true;
_conditionVariableGC.notify_one();
_gcThread.join();
_stopGc = false;
}
@ -249,18 +268,22 @@ namespace ix
// Set the socket to non blocking mode, so that accept calls are not blocking
SocketConnect::configure(_serverFd);
setThreadName("SocketServer::listen");
setThreadName("SocketServer::accept");
for (;;)
{
if (_stop) return;
// Use poll to check whether a new connection is in progress
int timeoutMs = 10;
int timeoutMs = -1;
#ifdef _WIN32
// select cannot be interrupted on Windows so we need to pass a small timeout
timeoutMs = 10;
#endif
bool readyToRead = true;
auto selectInterrupt = std::make_unique<SelectInterrupt>();
PollResultType pollResult =
Socket::poll(readyToRead, timeoutMs, _serverFd, selectInterrupt);
Socket::poll(readyToRead, timeoutMs, _serverFd, _acceptSelectInterrupt);
if (pollResult == PollResultType::Error)
{
@ -308,6 +331,7 @@ namespace ix
continue;
}
// Retrieve connection info, the ip address of the remote peer/client)
std::unique_ptr<ConnectionInfo> connectionInfo;
if (_addressFamily == AF_INET)
@ -352,6 +376,7 @@ namespace ix
{
connectionState = _connectionStateFactory();
}
connectionState->setOnSetTerminatedCallback([this] { onSetTerminatedCallback(); });
if (_stop) return;
@ -411,8 +436,14 @@ namespace ix
break;
}
// Sleep a little bit then keep cleaning up
std::this_thread::sleep_for(std::chrono::milliseconds(10));
// Unless we are stopping the server, wait for a connection
// to be terminated to run the threads GC, instead of busy waiting
// with a sleep
if (!_stopGc)
{
std::unique_lock<std::mutex> lock(_conditionVariableMutexGC);
_conditionVariableGC.wait(lock);
}
}
}
@ -420,4 +451,11 @@ namespace ix
{
_socketTLSOptions = socketTLSOptions;
}
void SocketServer::onSetTerminatedCallback()
{
// a connection got terminated, we can run the connection thread GC,
// so wake up the thread responsible for that
_conditionVariableGC.notify_one();
}
} // namespace ix

View File

@ -8,6 +8,7 @@
#include "IXConnectionInfo.h"
#include "IXConnectionState.h"
#include "IXSelectInterrupt.h"
#include "IXSocketTLSOptions.h"
#include <atomic>
#include <condition_variable>
@ -84,6 +85,7 @@ namespace ix
// background thread to wait for incoming connections
std::thread _thread;
void run();
void onSetTerminatedCallback();
// background thread to cleanup (join) terminated threads
std::atomic<bool> _stopGc;
@ -112,5 +114,13 @@ namespace ix
size_t getConnectionsThreadsCount();
SocketTLSOptions _socketTLSOptions;
// to wake up from select
SelectInterruptPtr _acceptSelectInterrupt;
// used by the gc thread, to know that a thread needs to be garbage collected
// as a connection
std::condition_variable _conditionVariableGC;
std::mutex _conditionVariableMutexGC;
};
} // namespace ix

View File

@ -170,20 +170,11 @@ namespace ix
{
std::stringstream ss;
ss << "Expecting HTTP/1.1, got " << httpVersion << ". "
<< "Rejecting connection to " << host << ":" << port << ", status: " << status
<< "Rejecting connection to " << url << ", status: " << status
<< ", HTTP Status line: " << line;
return WebSocketInitResult(false, status, ss.str());
}
// We want an 101 HTTP status
if (status != 101)
{
std::stringstream ss;
ss << "Expecting status 101 (Switching Protocol), got " << status
<< " status connecting to " << host << ":" << port << ", HTTP Status line: " << line;
return WebSocketInitResult(false, status, ss.str());
}
auto result = parseHttpHeaders(_socket, isCancellationRequested);
auto headersValid = result.first;
auto headers = result.second;
@ -193,6 +184,17 @@ namespace ix
return WebSocketInitResult(false, status, "Error parsing HTTP headers");
}
// We want an 101 HTTP status for websocket, otherwise it could be
// a redirection (like 301)
if (status != 101)
{
std::stringstream ss;
ss << "Expecting status 101 (Switching Protocol), got " << status
<< " status connecting to " << url << ", HTTP Status line: " << line;
return WebSocketInitResult(false, status, ss.str(), headers, path);
}
// Check the presence of the connection field
if (headers.find("connection") == headers.end())
{

View File

@ -43,6 +43,7 @@ namespace ix
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions,
const std::string& remoteUrl,
const RemoteUrlsMapping& remoteUrlsMapping,
bool /*verbose*/)
{
ix::WebSocketServer server(port, hostname);
@ -53,61 +54,75 @@ namespace ix
};
server.setConnectionStateFactory(factory);
server.setOnConnectionCallback([remoteUrl](std::weak_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) {
auto state = std::dynamic_pointer_cast<ProxyConnectionState>(connectionState);
auto remoteIp = connectionInfo->remoteIp;
server.setOnConnectionCallback(
[remoteUrl, remoteUrlsMapping](std::weak_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) {
auto state = std::dynamic_pointer_cast<ProxyConnectionState>(connectionState);
auto remoteIp = connectionInfo->remoteIp;
// Server connection
state->webSocket().setOnMessageCallback(
[webSocket, state, remoteIp](const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Close)
{
state->setTerminated();
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
auto ws = webSocket.lock();
if (ws)
// Server connection
state->webSocket().setOnMessageCallback(
[webSocket, state, remoteIp](const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Close)
{
ws->send(msg->str, msg->binary);
state->setTerminated();
}
}
});
// Client connection
auto ws = webSocket.lock();
if (ws)
{
ws->setOnMessageCallback([state, remoteUrl](const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
// Connect to the 'real' server
std::string url(remoteUrl);
url += msg->openInfo.uri;
state->webSocket().setUrl(url);
state->webSocket().disableAutomaticReconnection();
state->webSocket().start();
// we should sleep here for a bit until we've established the
// connection with the remote server
while (state->webSocket().getReadyState() != ReadyState::Open)
else if (msg->type == ix::WebSocketMessageType::Message)
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
auto ws = webSocket.lock();
if (ws)
{
ws->send(msg->str, msg->binary);
}
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
state->webSocket().close(msg->closeInfo.code, msg->closeInfo.reason);
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
state->webSocket().send(msg->str, msg->binary);
}
});
}
});
});
// Client connection
auto ws = webSocket.lock();
if (ws)
{
ws->setOnMessageCallback([state, remoteUrl, remoteUrlsMapping](
const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
// Connect to the 'real' server
std::string url(remoteUrl);
// maybe we want a different url based on the mapping
std::string host = msg->openInfo.headers["Host"];
auto it = remoteUrlsMapping.find(host);
if (it != remoteUrlsMapping.end())
{
url = it->second;
}
// append the uri to form the full url
// (say ws://localhost:1234/foo/?bar=baz)
url += msg->openInfo.uri;
state->webSocket().setUrl(url);
state->webSocket().disableAutomaticReconnection();
state->webSocket().start();
// we should sleep here for a bit until we've established the
// connection with the remote server
while (state->webSocket().getReadyState() != ReadyState::Open)
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
state->webSocket().close(msg->closeInfo.code, msg->closeInfo.reason);
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
state->webSocket().send(msg->str, msg->binary);
}
});
}
});
auto res = server.listen();
if (!res.first)

View File

@ -7,14 +7,18 @@
#include "IXSocketTLSOptions.h"
#include <cstdint>
#include <map>
#include <stddef.h>
#include <string>
namespace ix
{
using RemoteUrlsMapping = std::map<std::string, std::string>;
int websocket_proxy_server_main(int port,
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions,
const std::string& remoteUrl,
const RemoteUrlsMapping& remoteUrlsMapping,
bool verbose);
} // namespace ix

View File

@ -107,36 +107,62 @@ namespace ix
std::string protocol, host, path, query;
int port;
std::string remoteUrl(url);
if (!UrlParser::parse(url, protocol, host, path, query, port))
WebSocketInitResult result;
const int maxRedirections = 10;
for (int i = 0; i < maxRedirections; ++i)
{
std::stringstream ss;
ss << "Could not parse url: '" << url << "'";
return WebSocketInitResult(false, 0, ss.str());
if (!UrlParser::parse(remoteUrl, protocol, host, path, query, port))
{
std::stringstream ss;
ss << "Could not parse url: '" << url << "'";
return WebSocketInitResult(false, 0, ss.str());
}
std::string errorMsg;
bool tls = protocol == "wss";
_socket = createSocket(tls, -1, errorMsg, _socketTLSOptions);
_perMessageDeflate = std::make_unique<WebSocketPerMessageDeflate>();
if (!_socket)
{
return WebSocketInitResult(false, 0, errorMsg);
}
WebSocketHandshake webSocketHandshake(_requestInitCancellation,
_socket,
_perMessageDeflate,
_perMessageDeflateOptions,
_enablePerMessageDeflate);
result = webSocketHandshake.clientHandshake(
remoteUrl, headers, host, path, port, timeoutSecs);
if (result.http_status >= 300 && result.http_status < 400)
{
auto it = result.headers.find("Location");
if (it == result.headers.end())
{
std::stringstream ss;
ss << "Missing Location Header for HTTP Redirect response. "
<< "Rejecting connection to " << url << ", status: " << result.http_status;
result.errorStr = ss.str();
break;
}
remoteUrl = it->second;
continue;
}
if (result.success)
{
setReadyState(ReadyState::OPEN);
}
return result;
}
std::string errorMsg;
bool tls = protocol == "wss";
_socket = createSocket(tls, -1, errorMsg, _socketTLSOptions);
_perMessageDeflate = std::make_unique<WebSocketPerMessageDeflate>();
if (!_socket)
{
return WebSocketInitResult(false, 0, errorMsg);
}
WebSocketHandshake webSocketHandshake(_requestInitCancellation,
_socket,
_perMessageDeflate,
_perMessageDeflateOptions,
_enablePerMessageDeflate);
auto result =
webSocketHandshake.clientHandshake(url, headers, host, path, port, timeoutSecs);
if (result.success)
{
setReadyState(ReadyState::OPEN);
}
return result;
}
@ -633,7 +659,7 @@ namespace ix
// send back the CLOSE frame
sendCloseFrame(code, reason);
wakeUpFromPoll(Socket::kCloseRequest);
wakeUpFromPoll(SelectInterrupt::kCloseRequest);
bool remote = true;
closeSocketAndSwitchToClosedState(code, reason, _rxbuf.size(), remote);
@ -853,7 +879,7 @@ namespace ix
// Request to flush the send buffer on the background thread if it isn't empty
if (!isSendBufferEmpty())
{
wakeUpFromPoll(Socket::kSendRequest);
wakeUpFromPoll(SelectInterrupt::kSendRequest);
// FIXME: we should have a timeout when sending large messages: see #131
if (_blockingSend && !flushSendBuffer())
@ -1122,7 +1148,7 @@ namespace ix
sendCloseFrame(code, reason);
// wake up the poll, but do not close yet
wakeUpFromPoll(Socket::kSendRequest);
wakeUpFromPoll(SelectInterrupt::kSendRequest);
}
size_t WebSocketTransport::bufferedAmount() const

View File

@ -6,4 +6,4 @@
#pragma once
#define IX_WEBSOCKET_VERSION "10.1.5"
#define IX_WEBSOCKET_VERSION "10.2.5"

View File

@ -39,8 +39,8 @@ ws_install:
ws_install_release:
mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=MinSizeRel -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 .. -DUSE_TEST=0 && ninja install)
ws_openssl:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 -DUSE_OPEN_SSL=1 .. ; make -j 4)
ws_openssl_install:
mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 -DUSE_OPEN_SSL=1 .. ; ninja install)
ws_mbedtls:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .. ; make -j 4)
@ -48,6 +48,9 @@ ws_mbedtls:
ws_no_ssl:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_WS=1 .. ; make -j 4)
ws_no_python:
mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=MinSizeRel -DUSE_TLS=1 -DUSE_WS=1 .. ; ninja install)
uninstall:
xargs rm -fv < build/install_manifest.txt

File diff suppressed because it is too large Load Diff

View File

@ -46,29 +46,6 @@ add_executable(ws
../third_party/msgpack11/msgpack11.cpp
../third_party/cpp-linenoise/linenoise.cpp
${JSONCPP_SOURCES}
ws_http_client.cpp
ws_ping_pong.cpp
ws_broadcast_server.cpp
ws_push_server.cpp
ws_echo_server.cpp
ws_echo_client.cpp
ws_chat.cpp
ws_connect.cpp
ws_transfer.cpp
ws_send.cpp
ws_receive.cpp
ws_redis_cli.cpp
ws_redis_publish.cpp
ws_redis_subscribe.cpp
ws_redis_server.cpp
ws_snake.cpp
ws_cobra_metrics_publish.cpp
ws_cobra_publish.cpp
ws_httpd.cpp
ws_autobahn.cpp
ws_sentry_minidump_upload.cpp
ws_dns_lookup.cpp
ws.cpp)
# library with the most dependencies come first

6
ws/proxyConfig.json Normal file
View File

@ -0,0 +1,6 @@
{
"remote_urls": {
"echo.localhost:8008": "ws://localhost:8009",
"cobra.localhost:8008": "ws://localhost:5678"
}
}

39
ws/test_ws_proxy.sh Normal file
View File

@ -0,0 +1,39 @@
#!/bin/sh
# This test requires cobra to be available
which cobra > /dev/null || {
echo cobra is not installed on this machine.
exit 0
}
# Handle Ctrl-C by killing all sub-processing AND exiting
trap cleanup INT
function cleanup {
exit_code=${1:-1}
echo "Killing all servers (ws and cobra)"
echo
kill `cat /tmp/pidfile.proxy` &>/dev/null
kill `cat /tmp/pidfile.echo_server` &>/dev/null
kill `cat /tmp/pidfile.cobra` &>/dev/null
kill `cat /tmp/pidfile.connect.echo` &>/dev/null
kill `cat /tmp/pidfile.connect.cobra` &>/dev/null
exit ${exit_code}
}
ws proxy_server --pidfile /tmp/pidfile.proxy --config_path proxyConfig.json &
ws echo_server --pidfile /tmp/pidfile.echo_server --port 8009 &
cobra -v run --pidfile /tmp/pidfile.cobra --port 5678 &
# Wait for the servers to be up
sleep 1
# unbuffer comes with expect (written in tcl)
echo 'hello' | unbuffer ws connect --pidfile /tmp/pidfile.connect.echo ws://echo.localhost:8008 &
echo 'hello' | unbuffer ws connect --pidfile /tmp/pidfile.connect.cobra ws://cobra.localhost:8008 &
# Wait
sleep 2
cleanup

2731
ws/ws.cpp

File diff suppressed because it is too large Load Diff

144
ws/ws.h
View File

@ -1,144 +0,0 @@
/*
* ws.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <ixcobra/IXCobraConfig.h>
#include <ixwebsocket/IXSocketTLSOptions.h>
#include <string>
namespace ix
{
int ws_http_client_main(const std::string& url,
const std::string& headers,
const std::string& data,
bool headersOnly,
int connectTimeout,
int transferTimeout,
bool followRedirects,
int maxRedirects,
bool verbose,
bool save,
const std::string& output,
bool compress,
const ix::SocketTLSOptions& tlsOptions);
int ws_ping_pong_main(const std::string& url, const ix::SocketTLSOptions& tlsOptions);
int ws_echo_server_main(int port,
bool greetings,
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions,
bool ipv6,
bool disablePerMessageDeflate,
bool disablePong);
int ws_push_server(int port,
bool greetings,
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions,
bool ipv6,
bool disablePerMessageDeflate,
bool disablePong,
const std::string& sendMsg);
int ws_broadcast_server_main(int port,
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions);
int ws_transfer_main(int port,
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions);
int ws_chat_main(const std::string& url, const std::string& user);
int ws_connect_main(const std::string& url,
const std::string& headers,
bool disableAutomaticReconnection,
bool disablePerMessageDeflate,
bool binaryMode,
uint32_t maxWaitBetweenReconnectionRetries,
const ix::SocketTLSOptions& tlsOptions,
const std::string& subprotocol,
int pingIntervalSecs);
int ws_echo_client(const std::string& url,
bool disablePerMessageDeflate,
bool binaryMode,
const ix::SocketTLSOptions& tlsOptions,
const std::string& subprotocol,
int pingIntervalSecs,
const std::string& sendMsg,
bool noSend);
int ws_receive_main(const std::string& url,
bool enablePerMessageDeflate,
int delayMs,
const ix::SocketTLSOptions& tlsOptions);
int ws_send_main(const std::string& url,
const std::string& path,
bool disablePerMessageDeflate,
const ix::SocketTLSOptions& tlsOptions);
int ws_redis_cli_main(const std::string& hostname, int port, const std::string& password);
int ws_redis_publish_main(const std::string& hostname,
int port,
const std::string& password,
const std::string& channel,
const std::string& message,
int count);
int ws_redis_subscribe_main(const std::string& hostname,
int port,
const std::string& password,
const std::string& channel,
bool verbose);
int ws_cobra_publish_main(const ix::CobraConfig& appkey,
const std::string& channel,
const std::string& path);
int ws_cobra_metrics_publish_main(const ix::CobraConfig& config,
const std::string& channel,
const std::string& path,
bool stress);
int ws_cobra_metrics_to_redis(const ix::CobraConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& position,
const std::string& host,
int port);
int ws_snake_main(int port,
const std::string& hostname,
const std::string& redisHosts,
int redisPort,
const std::string& redisPassword,
bool verbose,
const std::string& appsConfigPath,
const ix::SocketTLSOptions& tlsOptions,
bool disablePong,
const std::string& republishChannel);
int ws_httpd_main(int port,
const std::string& hostname,
bool redirect,
const std::string& redirectUrl,
const ix::SocketTLSOptions& tlsOptions);
int ws_autobahn_main(const std::string& url, bool quiet);
int ws_redis_server_main(int port, const std::string& hostname);
int ws_sentry_minidump_upload(const std::string& metadataPath,
const std::string& minidump,
const std::string& project,
const std::string& key,
bool verbose);
int ws_dns_lookup(const std::string& hostname);
} // namespace ix

View File

@ -1,298 +0,0 @@
/*
* ws_autobahn.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
//
// 1. First you need to generate a config file in a config folder,
// which can use a white list of test to execute (with globbing),
// or a black list of tests to ignore
//
// config/fuzzingserver.json
// {
// "url": "ws://127.0.0.1:9001",
// "outdir": "./reports/clients",
// "cases": ["2.*"],
// "exclude-cases": [
// ],
// "exclude-agent-cases": {}
// }
//
//
// 2 Run the test server (using docker)
// docker run -it --rm -v "${PWD}/config:/config" -v "${PWD}/reports:/reports" -p 9001:9001 --name
// fuzzingserver crossbario/autobahn-testsuite
//
// 3. Run this command
// ws autobahn -q --url ws://localhost:9001
//
// 4. A HTML report will be generated, you can inspect it to see if you are compliant or not
//
#include <atomic>
#include <condition_variable>
#include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXWebSocket.h>
#include <mutex>
#include <spdlog/spdlog.h>
#include <sstream>
namespace
{
std::string truncate(const std::string& str, size_t n)
{
if (str.size() < n)
{
return str;
}
else
{
return str.substr(0, n) + "...";
}
}
} // namespace
namespace ix
{
class AutobahnTestCase
{
public:
AutobahnTestCase(const std::string& _url, bool quiet);
void run();
private:
void log(const std::string& msg);
std::string _url;
ix::WebSocket _webSocket;
bool _quiet;
std::mutex _mutex;
std::condition_variable _condition;
};
AutobahnTestCase::AutobahnTestCase(const std::string& url, bool quiet)
: _url(url)
, _quiet(quiet)
{
_webSocket.disableAutomaticReconnection();
// FIXME: this should be on by default
ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(
true, false, false, 15, 15);
_webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions);
}
void AutobahnTestCase::log(const std::string& msg)
{
if (!_quiet)
{
spdlog::info(msg);
}
}
void AutobahnTestCase::run()
{
_webSocket.setUrl(_url);
std::stringstream ss;
log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) {
std::stringstream ss;
if (msg->type == ix::WebSocketMessageType::Open)
{
log("autobahn: connected");
ss << "Uri: " << msg->openInfo.uri << std::endl;
ss << "Handshake Headers:" << std::endl;
for (auto it : msg->openInfo.headers)
{
ss << it.first << ": " << it.second << std::endl;
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
ss << "autobahn: connection closed:";
ss << " code " << msg->closeInfo.code;
ss << " reason " << msg->closeInfo.reason << std::endl;
_condition.notify_one();
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
ss << "Received " << msg->wireSize << " bytes" << std::endl;
ss << "autobahn: received message: " << truncate(msg->str, 40) << std::endl;
_webSocket.send(msg->str, msg->binary);
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
// And error can happen, in which case the test-case is marked done
_condition.notify_one();
}
else if (msg->type == ix::WebSocketMessageType::Fragment)
{
ss << "Received message fragment" << std::endl;
}
else if (msg->type == ix::WebSocketMessageType::Ping)
{
ss << "Received ping" << std::endl;
}
else if (msg->type == ix::WebSocketMessageType::Pong)
{
ss << "Received pong" << std::endl;
}
else
{
ss << "Invalid ix::WebSocketMessageType" << std::endl;
}
log(ss.str());
});
_webSocket.start();
log("Waiting for test completion ...");
std::unique_lock<std::mutex> lock(_mutex);
_condition.wait(lock);
_webSocket.stop();
}
bool generateReport(const std::string& url)
{
ix::WebSocket webSocket;
std::string reportUrl(url);
reportUrl += "/updateReports?agent=ixwebsocket";
webSocket.setUrl(reportUrl);
webSocket.disableAutomaticReconnection();
std::atomic<bool> success(true);
std::condition_variable condition;
webSocket.setOnMessageCallback([&condition, &success](const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Close)
{
spdlog::info("Report generated");
condition.notify_one();
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
std::stringstream ss;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
spdlog::info(ss.str());
success = false;
}
});
webSocket.start();
std::mutex mutex;
std::unique_lock<std::mutex> lock(mutex);
condition.wait(lock);
webSocket.stop();
if (!success)
{
spdlog::error("Cannot generate report at url {}", reportUrl);
}
return success;
}
int getTestCaseCount(const std::string& url)
{
ix::WebSocket webSocket;
std::string caseCountUrl(url);
caseCountUrl += "/getCaseCount";
webSocket.setUrl(caseCountUrl);
webSocket.disableAutomaticReconnection();
int count = -1;
std::condition_variable condition;
webSocket.setOnMessageCallback([&condition, &count](const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Close)
{
condition.notify_one();
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
std::stringstream ss;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
spdlog::info(ss.str());
condition.notify_one();
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
// response is a string
std::stringstream ss;
ss << msg->str;
ss >> count;
}
});
webSocket.start();
std::mutex mutex;
std::unique_lock<std::mutex> lock(mutex);
condition.wait(lock);
webSocket.stop();
if (count == -1)
{
spdlog::error("Cannot retrieve test case count at url {}", caseCountUrl);
}
return count;
}
//
// make && bench ws autobahn --url ws://localhost:9001
//
int ws_autobahn_main(const std::string& url, bool quiet)
{
int testCasesCount = getTestCaseCount(url);
spdlog::info("Test cases count: {}", testCasesCount);
if (testCasesCount == -1)
{
spdlog::error("Cannot retrieve test case count at url {}", url);
return 1;
}
testCasesCount++;
for (int i = 1; i < testCasesCount; ++i)
{
spdlog::info("Execute test case {}", i);
int caseNumber = i;
std::stringstream ss;
ss << url << "/runCase?case=" << caseNumber << "&agent=ixwebsocket";
std::string url(ss.str());
AutobahnTestCase testCase(url, quiet);
testCase.run();
}
return generateReport(url) ? 0 : 1;
}
} // namespace ix

View File

@ -1,98 +0,0 @@
/*
* ws_broadcast_server.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include <ixwebsocket/IXWebSocketServer.h>
#include <spdlog/spdlog.h>
#include <sstream>
namespace ix
{
int ws_broadcast_server_main(int port,
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions)
{
spdlog::info("Listening on {}:{}", hostname, port);
ix::WebSocketServer server(port, hostname);
server.setTLSOptions(tlsOptions);
server.setOnClientMessageCallback(
[&server](std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("New connection");
spdlog::info("remote ip: {}", remoteIp);
spdlog::info("id: {}", connectionState->getId());
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
spdlog::info("Closed connection: code {} reason {}",
msg->closeInfo.code,
msg->closeInfo.reason);
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
std::stringstream ss;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
spdlog::info(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Fragment)
{
spdlog::info("Received message fragment");
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
spdlog::info("Received {} bytes", msg->wireSize);
for (auto&& client : server.getClients())
{
if (client.get() != &webSocket)
{
client->send(msg->str, msg->binary, [](int current, int total) -> bool {
spdlog::info("Step {} out of {}", current, total);
return true;
});
do
{
size_t bufferedAmount = client->bufferedAmount();
spdlog::info("{} bytes left to be sent", bufferedAmount);
std::chrono::duration<double, std::milli> duration(500);
std::this_thread::sleep_for(duration);
} while (client->bufferedAmount() != 0);
}
}
}
});
auto res = server.listen();
if (!res.first)
{
spdlog::info(res.second);
return 1;
}
server.start();
server.wait();
return 0;
}
} // namespace ix

View File

@ -1,191 +0,0 @@
/*
* ws_chat.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/
//
// Simple chat program that talks to a broadcast server
// Broadcast server can be ran with `ws broadcast_server`
//
#include "nlohmann/json.hpp"
#include <iostream>
#include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXWebSocket.h>
#include <queue>
#include <spdlog/spdlog.h>
#include <sstream>
// for convenience
using json = nlohmann::json;
namespace ix
{
class WebSocketChat
{
public:
WebSocketChat(const std::string& url, const std::string& user);
void subscribe(const std::string& channel);
void start();
void stop();
bool isReady() const;
void sendMessage(const std::string& text);
size_t getReceivedMessagesCount() const;
std::string encodeMessage(const std::string& text);
std::pair<std::string, std::string> decodeMessage(const std::string& str);
private:
std::string _url;
std::string _user;
ix::WebSocket _webSocket;
std::queue<std::string> _receivedQueue;
void log(const std::string& msg);
};
WebSocketChat::WebSocketChat(const std::string& url, const std::string& user)
: _url(url)
, _user(user)
{
;
}
void WebSocketChat::log(const std::string& msg)
{
spdlog::info(msg);
}
size_t WebSocketChat::getReceivedMessagesCount() const
{
return _receivedQueue.size();
}
bool WebSocketChat::isReady() const
{
return _webSocket.getReadyState() == ix::ReadyState::Open;
}
void WebSocketChat::stop()
{
_webSocket.stop();
}
void WebSocketChat::start()
{
_webSocket.setUrl(_url);
std::stringstream ss;
log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback([this](const WebSocketMessagePtr& msg) {
std::stringstream ss;
if (msg->type == ix::WebSocketMessageType::Open)
{
log("ws chat: connected");
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
spdlog::info("ws chat: user {} connected !", _user);
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
ss << "ws chat user disconnected: " << _user;
ss << " code " << msg->closeInfo.code;
ss << " reason " << msg->closeInfo.reason << std::endl;
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
auto result = decodeMessage(msg->str);
// Our "chat" / "broacast" node.js server does not send us
// the messages we send, so we don't have to filter it out.
// store text
_receivedQueue.push(result.second);
ss << std::endl
<< result.first << "(" << msg->wireSize << " bytes)"
<< " > " << result.second << std::endl
<< _user << " > ";
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str());
}
else
{
ss << "Invalid ix::WebSocketMessageType";
log(ss.str());
}
});
_webSocket.start();
}
std::pair<std::string, std::string> WebSocketChat::decodeMessage(const std::string& str)
{
auto j = json::parse(str);
std::string msg_user = j["user"];
std::string msg_text = j["text"];
return std::pair<std::string, std::string>(msg_user, msg_text);
}
std::string WebSocketChat::encodeMessage(const std::string& text)
{
json j;
j["user"] = _user;
j["text"] = text;
std::string output = j.dump();
return output;
}
void WebSocketChat::sendMessage(const std::string& text)
{
_webSocket.sendText(encodeMessage(text));
}
int ws_chat_main(const std::string& url, const std::string& user)
{
spdlog::info("Type Ctrl-D to exit prompt...");
WebSocketChat webSocketChat(url, user);
webSocketChat.start();
while (true)
{
// Read line
std::string line;
std::cout << user << " > " << std::flush;
std::getline(std::cin, line);
if (!std::cin)
{
break;
}
webSocketChat.sendMessage(line);
}
spdlog::info("");
webSocketChat.stop();
return 0;
}
} // namespace ix

View File

@ -1,77 +0,0 @@
/*
* ws_cobra_metrics_publish.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <atomic>
#include <chrono>
#include <fstream>
#include <ixcobra/IXCobraMetricsPublisher.h>
#include <spdlog/spdlog.h>
#include <sstream>
#include <thread>
namespace ix
{
int ws_cobra_metrics_publish_main(const ix::CobraConfig& config,
const std::string& channel,
const std::string& path,
bool stress)
{
std::atomic<int> sentMessages(0);
std::atomic<int> ackedMessages(0);
CobraConnection::setPublishTrackerCallback(
[&sentMessages, &ackedMessages](bool sent, bool acked) {
if (sent) sentMessages++;
if (acked) ackedMessages++;
});
CobraMetricsPublisher cobraMetricsPublisher;
cobraMetricsPublisher.enable(true);
cobraMetricsPublisher.configure(config, channel);
while (!cobraMetricsPublisher.isAuthenticated())
;
std::ifstream f(path);
std::string str((std::istreambuf_iterator<char>(f)), std::istreambuf_iterator<char>());
Json::Value data;
Json::Reader reader;
if (!reader.parse(str, data)) return 1;
if (!stress)
{
auto msgId = cobraMetricsPublisher.push(channel, data);
spdlog::info("Sent message: {}", msgId);
}
else
{
// Stress mode to try to trigger server and client bugs
while (true)
{
for (int i = 0; i < 1000; ++i)
{
cobraMetricsPublisher.push(channel, data);
}
cobraMetricsPublisher.suspend();
cobraMetricsPublisher.resume();
// FIXME: investigate why without this check we trigger a lock
while (!cobraMetricsPublisher.isAuthenticated())
;
}
}
// Wait a bit for the message to get a chance to be sent
// there isn't any ack on publish right now so it's the best we can do
// FIXME: this comment is a lie now
std::this_thread::sleep_for(std::chrono::milliseconds(100));
spdlog::info("Sent messages: {} Acked messages {}", sentMessages, ackedMessages);
return 0;
}
} // namespace ix

View File

@ -1,106 +0,0 @@
/*
* ws_cobra_publish.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <atomic>
#include <chrono>
#include <fstream>
#include <ixcobra/IXCobraMetricsPublisher.h>
#include <mutex>
#include <spdlog/spdlog.h>
#include <sstream>
#include <thread>
namespace ix
{
int ws_cobra_publish_main(const ix::CobraConfig& config,
const std::string& channel,
const std::string& path)
{
std::ifstream f(path);
std::string str((std::istreambuf_iterator<char>(f)), std::istreambuf_iterator<char>());
Json::Value data;
Json::Reader reader;
if (!reader.parse(str, data))
{
spdlog::info("Input file is not a JSON file");
return 1;
}
ix::CobraConnection conn;
conn.configure(config);
// Display incoming messages
std::atomic<bool> authenticated(false);
std::atomic<bool> messageAcked(false);
conn.setEventCallback(
[&conn, &channel, &data, &authenticated, &messageAcked](const CobraEventPtr& event) {
if (event->type == ix::CobraEventType::Open)
{
spdlog::info("Publisher connected");
for (auto&& it : event->headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
}
else if (event->type == ix::CobraEventType::Closed)
{
spdlog::info("Subscriber closed: {}", event->errMsg);
}
else if (event->type == ix::CobraEventType::Authenticated)
{
spdlog::info("Publisher authenticated");
authenticated = true;
Json::Value channels;
channels[0] = channel;
auto msgId = conn.publish(channels, data);
spdlog::info("Published msg {}", msgId);
}
else if (event->type == ix::CobraEventType::Subscribed)
{
spdlog::info("Publisher: subscribed to channel {}", event->subscriptionId);
}
else if (event->type == ix::CobraEventType::UnSubscribed)
{
spdlog::info("Publisher: unsubscribed from channel {}", event->subscriptionId);
}
else if (event->type == ix::CobraEventType::Error)
{
spdlog::error("Publisher: error {}", event->errMsg);
}
else if (event->type == ix::CobraEventType::Published)
{
spdlog::info("Published message id {} acked", event->msgId);
messageAcked = true;
}
else if (event->type == ix::CobraEventType::Pong)
{
spdlog::info("Received websocket pong");
}
else if (event->type == ix::CobraEventType::HandshakeError)
{
spdlog::error("Subscriber: Handshake error: {}", event->errMsg);
}
else if (event->type == ix::CobraEventType::AuthenticationError)
{
spdlog::error("Subscriber: Authentication error: {}", event->errMsg);
}
});
conn.connect();
while (!authenticated)
;
while (!messageAcked)
;
return 0;
}
} // namespace ix

View File

@ -1,288 +0,0 @@
/*
* ws_connect.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*/
#include "IXBench.h"
#include "linenoise.hpp"
#include <iostream>
#include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocket.h>
#include <spdlog/spdlog.h>
#include <sstream>
namespace ix
{
class WebSocketConnect
{
public:
WebSocketConnect(const std::string& _url,
const std::string& headers,
bool disableAutomaticReconnection,
bool disablePerMessageDeflate,
bool binaryMode,
uint32_t maxWaitBetweenReconnectionRetries,
const ix::SocketTLSOptions& tlsOptions,
const std::string& subprotocol,
int pingIntervalSecs);
void subscribe(const std::string& channel);
void start();
void stop();
int getSentBytes()
{
return _sentBytes;
}
int getReceivedBytes()
{
return _receivedBytes;
}
void sendMessage(const std::string& text);
private:
std::string _url;
WebSocketHttpHeaders _headers;
ix::WebSocket _webSocket;
bool _disablePerMessageDeflate;
bool _binaryMode;
std::atomic<int> _receivedBytes;
std::atomic<int> _sentBytes;
void log(const std::string& msg);
WebSocketHttpHeaders parseHeaders(const std::string& data);
};
WebSocketConnect::WebSocketConnect(const std::string& url,
const std::string& headers,
bool disableAutomaticReconnection,
bool disablePerMessageDeflate,
bool binaryMode,
uint32_t maxWaitBetweenReconnectionRetries,
const ix::SocketTLSOptions& tlsOptions,
const std::string& subprotocol,
int pingIntervalSecs)
: _url(url)
, _disablePerMessageDeflate(disablePerMessageDeflate)
, _binaryMode(binaryMode)
, _receivedBytes(0)
, _sentBytes(0)
{
if (disableAutomaticReconnection)
{
_webSocket.disableAutomaticReconnection();
}
_webSocket.setMaxWaitBetweenReconnectionRetries(maxWaitBetweenReconnectionRetries);
_webSocket.setTLSOptions(tlsOptions);
_webSocket.setPingInterval(pingIntervalSecs);
_headers = parseHeaders(headers);
if (!subprotocol.empty())
{
_webSocket.addSubProtocol(subprotocol);
}
WebSocket::setTrafficTrackerCallback([this](int size, bool incoming) {
if (incoming)
{
_receivedBytes += size;
}
else
{
_sentBytes += size;
}
});
}
void WebSocketConnect::log(const std::string& msg)
{
std::cout << msg << std::endl;
}
WebSocketHttpHeaders WebSocketConnect::parseHeaders(const std::string& data)
{
WebSocketHttpHeaders headers;
// Split by \n
std::string token;
std::stringstream tokenStream(data);
while (std::getline(tokenStream, token))
{
std::size_t pos = token.rfind(':');
// Bail out if last '.' is found
if (pos == std::string::npos) continue;
auto key = token.substr(0, pos);
auto val = token.substr(pos + 1);
spdlog::info("{}: {}", key, val);
headers[key] = val;
}
return headers;
}
void WebSocketConnect::stop()
{
{
Bench bench("ws_connect: stop connection");
_webSocket.stop();
}
}
void WebSocketConnect::start()
{
_webSocket.setUrl(_url);
_webSocket.setExtraHeaders(_headers);
if (_disablePerMessageDeflate)
{
_webSocket.disablePerMessageDeflate();
}
else
{
ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(
true, false, false, 15, 15);
_webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions);
}
std::stringstream ss;
log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) {
std::stringstream ss;
if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("ws_connect: connected");
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
ss << "ws_connect: connection closed:";
ss << " code " << msg->closeInfo.code;
ss << " reason " << msg->closeInfo.reason << std::endl;
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
spdlog::info("Received {} bytes", msg->wireSize);
ss << "ws_connect: received message: " << msg->str;
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Fragment)
{
spdlog::info("Received message fragment");
}
else if (msg->type == ix::WebSocketMessageType::Ping)
{
spdlog::info("Received ping");
}
else if (msg->type == ix::WebSocketMessageType::Pong)
{
spdlog::info("Received pong {}", msg->str);
}
else
{
ss << "Invalid ix::WebSocketMessageType";
log(ss.str());
}
});
_webSocket.start();
}
void WebSocketConnect::sendMessage(const std::string& text)
{
if (_binaryMode)
{
_webSocket.sendBinary(text);
}
else
{
_webSocket.sendText(text);
}
}
int ws_connect_main(const std::string& url,
const std::string& headers,
bool disableAutomaticReconnection,
bool disablePerMessageDeflate,
bool binaryMode,
uint32_t maxWaitBetweenReconnectionRetries,
const ix::SocketTLSOptions& tlsOptions,
const std::string& subprotocol,
int pingIntervalSecs)
{
std::cout << "Type Ctrl-D to exit prompt..." << std::endl;
WebSocketConnect webSocketChat(url,
headers,
disableAutomaticReconnection,
disablePerMessageDeflate,
binaryMode,
maxWaitBetweenReconnectionRetries,
tlsOptions,
subprotocol,
pingIntervalSecs);
webSocketChat.start();
while (true)
{
// Read line
std::string line;
auto quit = linenoise::Readline("> ", line);
if (quit)
{
break;
}
if (line == "/stop")
{
spdlog::info("Stopping connection...");
webSocketChat.stop();
continue;
}
if (line == "/start")
{
spdlog::info("Starting connection...");
webSocketChat.start();
continue;
}
webSocketChat.sendMessage(line);
// Add text to history
linenoise::AddHistory(line.c_str());
}
spdlog::info("");
webSocketChat.stop();
spdlog::info("Received {} bytes", webSocketChat.getReceivedBytes());
spdlog::info("Sent {} bytes", webSocketChat.getSentBytes());
return 0;
}
} // namespace ix

View File

@ -1,34 +0,0 @@
/*
* ws_dns_lookup.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#include <atomic>
#include <ixwebsocket/IXDNSLookup.h>
#include <ixwebsocket/IXNetSystem.h>
#include <spdlog/spdlog.h>
#include <sstream>
namespace ix
{
int ws_dns_lookup(const std::string& hostname)
{
auto dnsLookup = std::make_shared<DNSLookup>(hostname, 80);
std::string errMsg;
struct addrinfo* res;
res = dnsLookup->resolve(errMsg, [] { return false; });
auto addr = res->ai_addr;
char str[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &addr, str, INET_ADDRSTRLEN);
spdlog::info("host: {} ip: {}", hostname, str);
return 0;
}
} // namespace ix

View File

@ -1,121 +0,0 @@
/*
* ws_echo_client.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <ixcore/utils/IXCoreLogger.h>
#include <ixwebsocket/IXNetSystem.h>
#include <ixwebsocket/IXSetThreadName.h>
#include <ixwebsocket/IXWebSocket.h>
#include <spdlog/spdlog.h>
#include <sstream>
#include <thread>
namespace ix
{
int ws_echo_client(const std::string& url,
bool disablePerMessageDeflate,
bool binaryMode,
const ix::SocketTLSOptions& tlsOptions,
const std::string& subprotocol,
int pingIntervalSecs,
const std::string& sendMsg,
bool noSend)
{
// Our websocket object
ix::WebSocket webSocket;
webSocket.setUrl(url);
webSocket.setTLSOptions(tlsOptions);
webSocket.setPingInterval(pingIntervalSecs);
if (disablePerMessageDeflate)
{
webSocket.disablePerMessageDeflate();
}
if (!subprotocol.empty())
{
webSocket.addSubProtocol(subprotocol);
}
std::atomic<uint64_t> receivedCount(0);
uint64_t receivedCountTotal(0);
uint64_t receivedCountPerSecs(0);
// Setup a callback to be fired (in a background thread, watch out for race conditions !)
// when a message or an event (open, close, error) is received
webSocket.setOnMessageCallback([&webSocket, &receivedCount, &sendMsg, noSend, binaryMode](
const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Message)
{
if (!noSend)
{
webSocket.send(msg->str, msg->binary);
}
receivedCount++;
}
else if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("ws_echo_client: connected");
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
webSocket.send(sendMsg, binaryMode);
}
else if (msg->type == ix::WebSocketMessageType::Pong)
{
spdlog::info("Received pong {}", msg->str);
}
});
auto timer = [&receivedCount, &receivedCountTotal, &receivedCountPerSecs] {
setThreadName("Timer");
while (true)
{
//
// We cannot write to sentCount and receivedCount
// as those are used externally, so we need to introduce
// our own counters
//
std::stringstream ss;
ss << "messages received: " << receivedCountPerSecs << " per second "
<< receivedCountTotal << " total";
CoreLogger::info(ss.str());
receivedCountPerSecs = receivedCount - receivedCountTotal;
receivedCountTotal += receivedCountPerSecs;
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
};
std::thread t1(timer);
// Now that our callback is setup, we can start our background thread and receive messages
std::cout << "Connecting to " << url << "..." << std::endl;
webSocket.start();
// Send a message to the server (default to TEXT mode)
webSocket.send("hello world");
while (true)
{
std::string text;
std::cout << "> " << std::flush;
std::getline(std::cin, text);
webSocket.send(text);
}
return 0;
}
} // namespace ix

View File

@ -1,101 +0,0 @@
/*
* ws_echo_server.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include <ixwebsocket/IXNetSystem.h>
#include <ixwebsocket/IXWebSocketServer.h>
#include <spdlog/spdlog.h>
#include <sstream>
namespace ix
{
int ws_echo_server_main(int port,
bool greetings,
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions,
bool ipv6,
bool disablePerMessageDeflate,
bool disablePong)
{
spdlog::info("Listening on {}:{}", hostname, port);
ix::WebSocketServer server(port,
hostname,
SocketServer::kDefaultTcpBacklog,
SocketServer::kDefaultMaxConnections,
WebSocketServer::kDefaultHandShakeTimeoutSecs,
(ipv6) ? AF_INET6 : AF_INET);
server.setTLSOptions(tlsOptions);
if (disablePerMessageDeflate)
{
spdlog::info("Disable per message deflate");
server.disablePerMessageDeflate();
}
if (disablePong)
{
spdlog::info("Disable responding to PING messages with PONG");
server.disablePong();
}
server.setOnClientMessageCallback(
[greetings](std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("New connection");
spdlog::info("remote ip: {}", remoteIp);
spdlog::info("id: {}", connectionState->getId());
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
if (greetings)
{
webSocket.sendText("Welcome !");
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
spdlog::info("Closed connection: client id {} code {} reason {}",
connectionState->getId(),
msg->closeInfo.code,
msg->closeInfo.reason);
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
spdlog::error("Connection error: {}", msg->errorInfo.reason);
spdlog::error("#retries: {}", msg->errorInfo.retries);
spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time);
spdlog::error("HTTP Status: {}", msg->errorInfo.http_status);
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
spdlog::info("Received {} bytes", msg->wireSize);
webSocket.send(msg->str, msg->binary);
}
});
auto res = server.listen();
if (!res.first)
{
spdlog::error(res.second);
return 1;
}
server.start();
server.wait();
return 0;
}
} // namespace ix

View File

@ -1,185 +0,0 @@
/*
* http_client.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <fstream>
#include <ixwebsocket/IXHttpClient.h>
#include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocketHttpHeaders.h>
#include <spdlog/spdlog.h>
#include <sstream>
namespace ix
{
std::string extractFilename(const std::string& path)
{
std::string::size_type idx;
idx = path.rfind('/');
if (idx != std::string::npos)
{
std::string filename = path.substr(idx + 1);
return filename;
}
else
{
return path;
}
}
WebSocketHttpHeaders parseHeaders(const std::string& data)
{
WebSocketHttpHeaders headers;
// Split by \n
std::string token;
std::stringstream tokenStream(data);
while (std::getline(tokenStream, token))
{
std::size_t pos = token.rfind(':');
// Bail out if last '.' is found
if (pos == std::string::npos) continue;
auto key = token.substr(0, pos);
auto val = token.substr(pos + 1);
spdlog::info("{}: {}", key, val);
headers[key] = val;
}
return headers;
}
//
// Useful endpoint to test HTTP post
// https://postman-echo.com/post
//
HttpParameters parsePostParameters(const std::string& data)
{
HttpParameters httpParameters;
// Split by \n
std::string token;
std::stringstream tokenStream(data);
while (std::getline(tokenStream, token))
{
std::size_t pos = token.rfind('=');
// Bail out if last '.' is found
if (pos == std::string::npos) continue;
auto key = token.substr(0, pos);
auto val = token.substr(pos + 1);
spdlog::info("{}: {}", key, val);
httpParameters[key] = val;
}
return httpParameters;
}
int ws_http_client_main(const std::string& url,
const std::string& headersData,
const std::string& data,
bool headersOnly,
int connectTimeout,
int transferTimeout,
bool followRedirects,
int maxRedirects,
bool verbose,
bool save,
const std::string& output,
bool compress,
const ix::SocketTLSOptions& tlsOptions)
{
HttpClient httpClient;
httpClient.setTLSOptions(tlsOptions);
auto args = httpClient.createRequest();
args->extraHeaders = parseHeaders(headersData);
args->connectTimeout = connectTimeout;
args->transferTimeout = transferTimeout;
args->followRedirects = followRedirects;
args->maxRedirects = maxRedirects;
args->verbose = verbose;
args->compress = compress;
args->logger = [](const std::string& msg) { spdlog::info(msg); };
args->onProgressCallback = [verbose](int current, int total) -> bool {
if (verbose)
{
spdlog::info("Downloaded {} bytes out of {}", current, total);
}
return true;
};
HttpParameters httpParameters = parsePostParameters(data);
HttpResponsePtr response;
if (headersOnly)
{
response = httpClient.head(url, args);
}
else if (data.empty())
{
response = httpClient.get(url, args);
}
else
{
response = httpClient.post(url, httpParameters, args);
}
spdlog::info("");
for (auto it : response->headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
spdlog::info("Upload size: {}", response->uploadSize);
spdlog::info("Download size: {}", response->downloadSize);
spdlog::info("Status: {}", response->statusCode);
if (response->errorCode != HttpErrorCode::Ok)
{
spdlog::info("error message: ", response->errorMsg);
}
if (!headersOnly && response->errorCode == HttpErrorCode::Ok)
{
if (save || !output.empty())
{
// FIMXE we should decode the url first
std::string filename = extractFilename(url);
if (!output.empty())
{
filename = output;
}
spdlog::info("Writing to disk: {}", filename);
std::ofstream out(filename);
out.write((char*) &response->payload.front(), response->payload.size());
out.close();
}
else
{
if (response->headers["Content-Type"] != "application/octet-stream")
{
spdlog::info("payload: {}", response->payload);
}
else
{
spdlog::info("Binary output can mess up your terminal.");
spdlog::info("Use the -O flag to save the file to disk.");
spdlog::info("You can also use the --output option to specify a filename.");
}
}
}
return 0;
}
} // namespace ix

View File

@ -1,43 +0,0 @@
/*
* ws_httpd.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include <fstream>
#include <ixwebsocket/IXHttpServer.h>
#include <spdlog/spdlog.h>
#include <sstream>
#include <vector>
namespace ix
{
int ws_httpd_main(int port,
const std::string& hostname,
bool redirect,
const std::string& redirectUrl,
const ix::SocketTLSOptions& tlsOptions)
{
spdlog::info("Listening on {}:{}", hostname, port);
ix::HttpServer server(port, hostname);
server.setTLSOptions(tlsOptions);
if (redirect)
{
server.makeRedirectServer(redirectUrl);
}
auto res = server.listen();
if (!res.first)
{
spdlog::error(res.second);
return 1;
}
server.start();
server.wait();
return 0;
}
} // namespace ix

View File

@ -1,161 +0,0 @@
/*
* ws_ping_pong.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocket.h>
#include <spdlog/spdlog.h>
#include <sstream>
namespace ix
{
class WebSocketPingPong
{
public:
WebSocketPingPong(const std::string& _url, const ix::SocketTLSOptions& tlsOptions);
void subscribe(const std::string& channel);
void start();
void stop();
void ping(const std::string& text);
void send(const std::string& text);
private:
std::string _url;
ix::WebSocket _webSocket;
void log(const std::string& msg);
};
WebSocketPingPong::WebSocketPingPong(const std::string& url,
const ix::SocketTLSOptions& tlsOptions)
: _url(url)
{
_webSocket.setTLSOptions(tlsOptions);
}
void WebSocketPingPong::log(const std::string& msg)
{
spdlog::info(msg);
}
void WebSocketPingPong::stop()
{
_webSocket.stop();
}
void WebSocketPingPong::start()
{
_webSocket.setUrl(_url);
std::stringstream ss;
log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) {
spdlog::info("Received {} bytes", msg->wireSize);
std::stringstream ss;
if (msg->type == ix::WebSocketMessageType::Open)
{
log("ping_pong: connected");
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
ss << "ping_pong: disconnected:"
<< " code " << msg->closeInfo.code << " reason " << msg->closeInfo.reason
<< msg->str;
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
ss << "ping_pong: received message: " << msg->str;
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Ping)
{
ss << "ping_pong: received ping message: " << msg->str;
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Pong)
{
ss << "ping_pong: received pong message: " << msg->str;
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str());
}
else
{
ss << "Invalid ix::WebSocketMessageType";
log(ss.str());
}
});
_webSocket.start();
}
void WebSocketPingPong::ping(const std::string& text)
{
if (!_webSocket.ping(text).success)
{
std::cerr << "Failed to send ping message. Message too long (> 125 bytes) or endpoint "
"is disconnected"
<< std::endl;
}
}
void WebSocketPingPong::send(const std::string& text)
{
_webSocket.send(text);
}
int ws_ping_pong_main(const std::string& url, const ix::SocketTLSOptions& tlsOptions)
{
spdlog::info("Type Ctrl-D to exit prompt...");
WebSocketPingPong webSocketPingPong(url, tlsOptions);
webSocketPingPong.start();
while (true)
{
std::string text;
std::cout << "> " << std::flush;
std::getline(std::cin, text);
if (!std::cin)
{
break;
}
if (text == "/close")
{
webSocketPingPong.send(text);
}
else
{
webSocketPingPong.ping(text);
}
}
std::cout << std::endl;
webSocketPingPong.stop();
return 0;
}
} // namespace ix

View File

@ -1,108 +0,0 @@
/*
* ws_push_server.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#include <ixwebsocket/IXNetSystem.h>
#include <ixwebsocket/IXWebSocketServer.h>
#include <spdlog/spdlog.h>
#include <sstream>
namespace ix
{
int ws_push_server(int port,
bool greetings,
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions,
bool ipv6,
bool disablePerMessageDeflate,
bool disablePong,
const std::string& sendMsg)
{
spdlog::info("Listening on {}:{}", hostname, port);
ix::WebSocketServer server(port,
hostname,
SocketServer::kDefaultTcpBacklog,
SocketServer::kDefaultMaxConnections,
WebSocketServer::kDefaultHandShakeTimeoutSecs,
(ipv6) ? AF_INET6 : AF_INET);
server.setTLSOptions(tlsOptions);
if (disablePerMessageDeflate)
{
spdlog::info("Disable per message deflate");
server.disablePerMessageDeflate();
}
if (disablePong)
{
spdlog::info("Disable responding to PING messages with PONG");
server.disablePong();
}
server.setOnClientMessageCallback(
[greetings, &sendMsg](std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("New connection");
spdlog::info("remote ip: {}", remoteIp);
spdlog::info("id: {}", connectionState->getId());
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
if (greetings)
{
webSocket.sendText("Welcome !");
}
bool binary = false;
while (true)
{
webSocket.send(sendMsg, binary);
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
spdlog::info("Closed connection: client id {} code {} reason {}",
connectionState->getId(),
msg->closeInfo.code,
msg->closeInfo.reason);
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
spdlog::error("Connection error: {}", msg->errorInfo.reason);
spdlog::error("#retries: {}", msg->errorInfo.retries);
spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time);
spdlog::error("HTTP Status: {}", msg->errorInfo.http_status);
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
spdlog::info("Received {} bytes", msg->wireSize);
webSocket.send(msg->str, msg->binary);
}
});
auto res = server.listen();
if (!res.first)
{
spdlog::error(res.second);
return 1;
}
server.start();
server.wait();
return 0;
}
} // namespace ix

View File

@ -1,282 +0,0 @@
/*
* ws_receiver.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*/
#include <chrono>
#include <condition_variable>
#include <fstream>
#include <ixcrypto/IXBase64.h>
#include <ixcrypto/IXHash.h>
#include <ixcrypto/IXUuid.h>
#include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocket.h>
#include <msgpack11/msgpack11.hpp>
#include <mutex>
#include <spdlog/spdlog.h>
#include <sstream>
#include <vector>
using msgpack11::MsgPack;
namespace ix
{
class WebSocketReceiver
{
public:
WebSocketReceiver(const std::string& _url,
bool enablePerMessageDeflate,
int delayMs,
const ix::SocketTLSOptions& tlsOptions);
void subscribe(const std::string& channel);
void start();
void stop();
void waitForConnection();
void waitForMessage();
void handleMessage(const std::string& str);
private:
std::string _url;
std::string _id;
ix::WebSocket _webSocket;
bool _enablePerMessageDeflate;
int _delayMs;
int _receivedFragmentCounter;
std::mutex _conditionVariableMutex;
std::condition_variable _condition;
std::string extractFilename(const std::string& path);
void handleError(const std::string& errMsg, const std::string& id);
void log(const std::string& msg);
};
WebSocketReceiver::WebSocketReceiver(const std::string& url,
bool enablePerMessageDeflate,
int delayMs,
const ix::SocketTLSOptions& tlsOptions)
: _url(url)
, _enablePerMessageDeflate(enablePerMessageDeflate)
, _delayMs(delayMs)
, _receivedFragmentCounter(0)
{
_webSocket.disableAutomaticReconnection();
_webSocket.setTLSOptions(tlsOptions);
}
void WebSocketReceiver::stop()
{
_webSocket.stop();
}
void WebSocketReceiver::log(const std::string& msg)
{
spdlog::info(msg);
}
void WebSocketReceiver::waitForConnection()
{
spdlog::info("{}: Connecting...", "ws_receive");
std::unique_lock<std::mutex> lock(_conditionVariableMutex);
_condition.wait(lock);
}
void WebSocketReceiver::waitForMessage()
{
spdlog::info("{}: Waiting for message...", "ws_receive");
std::unique_lock<std::mutex> lock(_conditionVariableMutex);
_condition.wait(lock);
}
// We should cleanup the file name and full path further to remove .. as well
std::string WebSocketReceiver::extractFilename(const std::string& path)
{
std::string::size_type idx;
idx = path.rfind('/');
if (idx != std::string::npos)
{
std::string filename = path.substr(idx + 1);
return filename;
}
else
{
return path;
}
}
void WebSocketReceiver::handleError(const std::string& errMsg, const std::string& id)
{
std::map<MsgPack, MsgPack> pdu;
pdu["kind"] = "error";
pdu["id"] = id;
pdu["message"] = errMsg;
MsgPack msg(pdu);
_webSocket.sendBinary(msg.dump());
}
void WebSocketReceiver::handleMessage(const std::string& str)
{
spdlog::info("ws_receive: Received message: {}", str.size());
std::string errMsg;
MsgPack data = MsgPack::parse(str, errMsg);
if (!errMsg.empty())
{
handleError("ws_receive: Invalid MsgPack", std::string());
return;
}
spdlog::info("id: {}", data["id"].string_value());
std::vector<uint8_t> content = data["content"].binary_items();
spdlog::info("ws_receive: Content size: {}", content.size());
// Validate checksum
uint64_t cksum = ix::djb2Hash(content);
auto cksumRef = data["djb2_hash"].string_value();
spdlog::info("ws_receive: Computed hash: {}", cksum);
spdlog::info("ws_receive: Reference hash: {}", cksumRef);
if (std::to_string(cksum) != cksumRef)
{
handleError("Hash mismatch.", std::string());
return;
}
std::string filename = data["filename"].string_value();
filename = extractFilename(filename);
std::string filenameTmp = filename + ".tmp";
spdlog::info("ws_receive: Writing to disk: {}", filenameTmp);
std::ofstream out(filenameTmp);
out.write((char*) &content.front(), content.size());
out.close();
spdlog::info("ws_receive: Renaming {} to {}", filenameTmp, filename);
rename(filenameTmp.c_str(), filename.c_str());
std::map<MsgPack, MsgPack> pdu;
pdu["ack"] = true;
pdu["id"] = data["id"];
pdu["filename"] = data["filename"];
spdlog::info("Sending ack to sender");
MsgPack msg(pdu);
_webSocket.sendBinary(msg.dump());
}
void WebSocketReceiver::start()
{
_webSocket.setUrl(_url);
ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(
_enablePerMessageDeflate, false, false, 15, 15);
_webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions);
std::stringstream ss;
log(std::string("ws_receive: Connecting to url: ") + _url);
_webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) {
std::stringstream ss;
if (msg->type == ix::WebSocketMessageType::Open)
{
_condition.notify_one();
log("ws_receive: connected");
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
ss << "ws_receive: connection closed:";
ss << " code " << msg->closeInfo.code;
ss << " reason " << msg->closeInfo.reason << std::endl;
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
ss << "ws_receive: transfered " << msg->wireSize << " bytes";
log(ss.str());
handleMessage(msg->str);
_condition.notify_one();
}
else if (msg->type == ix::WebSocketMessageType::Fragment)
{
ss << "ws_receive: received fragment " << _receivedFragmentCounter++;
log(ss.str());
if (_delayMs > 0)
{
// Introduce an arbitrary delay, to simulate a slow connection
std::chrono::duration<double, std::milli> duration(_delayMs);
std::this_thread::sleep_for(duration);
}
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
ss << "ws_receive ";
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Ping)
{
log("ws_receive: received ping");
}
else if (msg->type == ix::WebSocketMessageType::Pong)
{
log("ws_receive: received pong");
}
else
{
ss << "ws_receive: Invalid ix::WebSocketMessageType";
log(ss.str());
}
});
_webSocket.start();
}
void wsReceive(const std::string& url,
bool enablePerMessageDeflate,
int delayMs,
const ix::SocketTLSOptions& tlsOptions)
{
WebSocketReceiver webSocketReceiver(url, enablePerMessageDeflate, delayMs, tlsOptions);
webSocketReceiver.start();
webSocketReceiver.waitForConnection();
webSocketReceiver.waitForMessage();
std::chrono::duration<double, std::milli> duration(1000);
std::this_thread::sleep_for(duration);
spdlog::info("ws_receive: Done !");
webSocketReceiver.stop();
}
int ws_receive_main(const std::string& url,
bool enablePerMessageDeflate,
int delayMs,
const ix::SocketTLSOptions& tlsOptions)
{
wsReceive(url, enablePerMessageDeflate, delayMs, tlsOptions);
return 0;
}
} // namespace ix

View File

@ -1,84 +0,0 @@
/*
* ws_redis_cli.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "linenoise.hpp"
#include <iostream>
#include <ixredis/IXRedisClient.h>
#include <spdlog/spdlog.h>
#include <sstream>
namespace ix
{
int ws_redis_cli_main(const std::string& hostname, int port, const std::string& password)
{
RedisClient redisClient;
if (!redisClient.connect(hostname, port))
{
spdlog::info("Cannot connect to redis host");
return 1;
}
if (!password.empty())
{
std::string authResponse;
if (!redisClient.auth(password, authResponse))
{
std::stringstream ss;
spdlog::info("Cannot authenticated to redis");
return 1;
}
spdlog::info("Auth response: {}", authResponse);
}
while (true)
{
// Read line
std::string line;
std::string prompt;
prompt += hostname;
prompt += ":";
prompt += std::to_string(port);
prompt += "> ";
auto quit = linenoise::Readline(prompt.c_str(), line);
if (quit)
{
break;
}
std::stringstream ss(line);
std::vector<std::string> args;
std::string arg;
while (ss.good())
{
ss >> arg;
args.push_back(arg);
}
std::string errMsg;
auto response = redisClient.send(args, errMsg);
if (!errMsg.empty())
{
spdlog::error("(error) {}", errMsg);
}
else
{
if (response.first != RespType::String)
{
std::cout << "(" << redisClient.getRespTypeDescription(response.first) << ")"
<< " ";
}
std::cout << response.second << std::endl;
}
linenoise::AddHistory(line.c_str());
}
return 0;
}
} // namespace ix

View File

@ -1,51 +0,0 @@
/*
* ws_redis_publish.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <ixredis/IXRedisClient.h>
#include <spdlog/spdlog.h>
#include <sstream>
namespace ix
{
int ws_redis_publish_main(const std::string& hostname,
int port,
const std::string& password,
const std::string& channel,
const std::string& message,
int count)
{
RedisClient redisClient;
if (!redisClient.connect(hostname, port))
{
spdlog::info("Cannot connect to redis host");
return 1;
}
if (!password.empty())
{
std::string authResponse;
if (!redisClient.auth(password, authResponse))
{
std::stringstream ss;
spdlog::info("Cannot authenticated to redis");
return 1;
}
spdlog::info("Auth response: {}", authResponse);
}
std::string errMsg;
for (int i = 0; i < count; i++)
{
if (!redisClient.publish(channel, message, errMsg))
{
spdlog::error("Error publishing to channel {} error {}", channel, errMsg);
return 1;
}
}
return 0;
}
} // namespace ix

View File

@ -1,31 +0,0 @@
/*
* ws_redis_publish.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <ixredis/IXRedisServer.h>
#include <spdlog/spdlog.h>
#include <sstream>
namespace ix
{
int ws_redis_server_main(int port, const std::string& hostname)
{
spdlog::info("Listening on {}:{}", hostname, port);
ix::RedisServer server(port, hostname);
auto res = server.listen();
if (!res.first)
{
spdlog::info(res.second);
return 1;
}
server.start();
server.wait();
return 0;
}
} // namespace ix

View File

@ -1,80 +0,0 @@
/*
* ws_redis_subscribe.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <atomic>
#include <chrono>
#include <ixredis/IXRedisClient.h>
#include <spdlog/spdlog.h>
#include <sstream>
#include <thread>
namespace ix
{
int ws_redis_subscribe_main(const std::string& hostname,
int port,
const std::string& password,
const std::string& channel,
bool verbose)
{
RedisClient redisClient;
if (!redisClient.connect(hostname, port))
{
spdlog::info("Cannot connect to redis host");
return 1;
}
if (!password.empty())
{
std::string authResponse;
if (!redisClient.auth(password, authResponse))
{
std::stringstream ss;
spdlog::info("Cannot authenticated to redis");
return 1;
}
spdlog::info("Auth response: {}", authResponse);
}
std::atomic<int> msgPerSeconds(0);
std::atomic<int> msgCount(0);
auto callback = [&msgPerSeconds, &msgCount, verbose](const std::string& message) {
if (verbose)
{
spdlog::info("recived: {}", message);
}
msgPerSeconds++;
msgCount++;
};
auto responseCallback = [](const std::string& redisResponse) {
spdlog::info("Redis subscribe response: {}", redisResponse);
};
auto timer = [&msgPerSeconds, &msgCount] {
while (true)
{
spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds);
msgPerSeconds = 0;
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
};
std::thread t(timer);
spdlog::info("Subscribing to {} ...", channel);
if (!redisClient.subscribe(channel, responseCallback, callback))
{
spdlog::info("Error subscribing to channel {}", channel);
return 1;
}
return 0;
}
} // namespace ix

View File

@ -1,311 +0,0 @@
/*
* ws_send.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*/
#include "IXBench.h"
#include <chrono>
#include <condition_variable>
#include <fstream>
#include <ixcrypto/IXBase64.h>
#include <ixcrypto/IXHash.h>
#include <ixcrypto/IXUuid.h>
#include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocket.h>
#include <msgpack11/msgpack11.hpp>
#include <mutex>
#include <spdlog/spdlog.h>
#include <sstream>
#include <vector>
using msgpack11::MsgPack;
namespace ix
{
class WebSocketSender
{
public:
WebSocketSender(const std::string& _url,
bool enablePerMessageDeflate,
const ix::SocketTLSOptions& tlsOptions);
void subscribe(const std::string& channel);
void start();
void stop();
void waitForConnection();
void waitForAck();
bool sendMessage(const std::string& filename, bool throttle);
private:
std::string _url;
std::string _id;
ix::WebSocket _webSocket;
bool _enablePerMessageDeflate;
std::atomic<bool> _connected;
std::mutex _conditionVariableMutex;
std::condition_variable _condition;
void log(const std::string& msg);
};
WebSocketSender::WebSocketSender(const std::string& url,
bool enablePerMessageDeflate,
const ix::SocketTLSOptions& tlsOptions)
: _url(url)
, _enablePerMessageDeflate(enablePerMessageDeflate)
, _connected(false)
{
_webSocket.disableAutomaticReconnection();
_webSocket.setTLSOptions(tlsOptions);
}
void WebSocketSender::stop()
{
_webSocket.stop();
}
void WebSocketSender::log(const std::string& msg)
{
spdlog::info(msg);
}
void WebSocketSender::waitForConnection()
{
spdlog::info("{}: Connecting...", "ws_send");
std::unique_lock<std::mutex> lock(_conditionVariableMutex);
_condition.wait(lock);
}
void WebSocketSender::waitForAck()
{
spdlog::info("{}: Waiting for ack...", "ws_send");
std::unique_lock<std::mutex> lock(_conditionVariableMutex);
_condition.wait(lock);
}
std::vector<uint8_t> load(const std::string& path)
{
std::vector<uint8_t> memblock;
std::ifstream file(path);
if (!file.is_open()) return memblock;
file.seekg(0, file.end);
std::streamoff size = file.tellg();
file.seekg(0, file.beg);
memblock.resize((size_t) size);
file.read((char*) &memblock.front(), static_cast<std::streamsize>(size));
return memblock;
}
void WebSocketSender::start()
{
_webSocket.setUrl(_url);
ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(
_enablePerMessageDeflate, false, false, 15, 15);
_webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions);
std::stringstream ss;
log(std::string("ws_send: Connecting to url: ") + _url);
_webSocket.setOnMessageCallback([this](const WebSocketMessagePtr& msg) {
std::stringstream ss;
if (msg->type == ix::WebSocketMessageType::Open)
{
_connected = true;
_condition.notify_one();
log("ws_send: connected");
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
_connected = false;
ss << "ws_send: connection closed:";
ss << " code " << msg->closeInfo.code;
ss << " reason " << msg->closeInfo.reason << std::endl;
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
_condition.notify_one();
ss << "ws_send: received message (" << msg->wireSize << " bytes)";
log(ss.str());
std::string errMsg;
MsgPack data = MsgPack::parse(msg->str, errMsg);
if (!errMsg.empty())
{
spdlog::info("Invalid MsgPack response");
return;
}
std::string id = data["id"].string_value();
if (_id != id)
{
spdlog::info("Invalid id");
}
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
ss << "ws_send ";
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Ping)
{
spdlog::info("ws_send: received ping");
}
else if (msg->type == ix::WebSocketMessageType::Pong)
{
spdlog::info("ws_send: received pong");
}
else if (msg->type == ix::WebSocketMessageType::Fragment)
{
spdlog::info("ws_send: received fragment");
}
else
{
ss << "ws_send: Invalid ix::WebSocketMessageType";
log(ss.str());
}
});
_webSocket.start();
}
bool WebSocketSender::sendMessage(const std::string& filename, bool throttle)
{
std::vector<uint8_t> content;
{
Bench bench("ws_send: load file from disk");
content = load(filename);
}
_id = uuid4();
std::map<MsgPack, MsgPack> pdu;
pdu["kind"] = "send";
pdu["id"] = _id;
pdu["content"] = content;
auto hash = djb2Hash(content);
pdu["djb2_hash"] = std::to_string(hash);
pdu["filename"] = filename;
MsgPack msg(pdu);
auto serializedMsg = msg.dump();
spdlog::info("ws_send: sending {} bytes", serializedMsg.size());
Bench bench("ws_send: Sending file through websocket");
auto result =
_webSocket.sendBinary(serializedMsg, [this, throttle](int current, int total) -> bool {
spdlog::info("ws_send: Step {} out of {}", current + 1, total);
if (throttle)
{
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
}
return _connected;
});
if (!result.success)
{
spdlog::error("ws_send: Error sending file.");
return false;
}
if (!_connected)
{
spdlog::error("ws_send: Got disconnected from the server");
return false;
}
spdlog::info("ws_send: sent {} bytes", serializedMsg.size());
do
{
size_t bufferedAmount = _webSocket.bufferedAmount();
spdlog::info("ws_send: {} bytes left to be sent", bufferedAmount);
std::chrono::duration<double, std::milli> duration(500);
std::this_thread::sleep_for(duration);
} while (_webSocket.bufferedAmount() != 0 && _connected);
if (_connected)
{
bench.report();
auto duration = bench.getDuration();
auto transferRate = 1000 * content.size() / duration;
transferRate /= (1024 * 1024);
spdlog::info("ws_send: Send transfer rate: {} MB/s", transferRate);
}
else
{
spdlog::error("ws_send: Got disconnected from the server");
}
return _connected;
}
void wsSend(const std::string& url,
const std::string& path,
bool enablePerMessageDeflate,
bool throttle,
const ix::SocketTLSOptions& tlsOptions)
{
WebSocketSender webSocketSender(url, enablePerMessageDeflate, tlsOptions);
webSocketSender.start();
webSocketSender.waitForConnection();
spdlog::info("ws_send: Sending...");
if (webSocketSender.sendMessage(path, throttle))
{
webSocketSender.waitForAck();
spdlog::info("ws_send: Done !");
}
else
{
spdlog::error("ws_send: Error sending file.");
}
webSocketSender.stop();
}
int ws_send_main(const std::string& url,
const std::string& path,
bool disablePerMessageDeflate,
const ix::SocketTLSOptions& tlsOptions)
{
bool throttle = false;
bool enablePerMessageDeflate = !disablePerMessageDeflate;
wsSend(url, path, enablePerMessageDeflate, throttle, tlsOptions);
return 0;
}
} // namespace ix

View File

@ -1,112 +0,0 @@
/*
* ws_sentry_minidump_upload.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <fstream>
#include <ixsentry/IXSentryClient.h>
#include <spdlog/spdlog.h>
#include <sstream>
namespace
{
// Assume the file exists
std::string readBytes(const std::string& path)
{
std::vector<uint8_t> memblock;
std::ifstream file(path);
file.seekg(0, file.end);
std::streamoff size = file.tellg();
file.seekg(0, file.beg);
memblock.resize(size);
file.read((char*) &memblock.front(), static_cast<std::streamsize>(size));
std::string bytes(memblock.begin(), memblock.end());
return bytes;
}
} // namespace
namespace ix
{
int ws_sentry_minidump_upload(const std::string& metadataPath,
const std::string& minidump,
const std::string& project,
const std::string& key,
bool verbose)
{
SentryClient sentryClient((std::string()));
// Read minidump file from disk
std::string minidumpBytes = readBytes(minidump);
// Read json data
std::string sentryMetadata = readBytes(metadataPath);
std::atomic<bool> done(false);
sentryClient.uploadMinidump(
sentryMetadata,
minidumpBytes,
project,
key,
verbose,
[verbose, &done](const HttpResponsePtr& response) {
if (verbose)
{
for (auto it : response->headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
spdlog::info("Upload size: {}", response->uploadSize);
spdlog::info("Download size: {}", response->downloadSize);
spdlog::info("Status: {}", response->statusCode);
if (response->errorCode != HttpErrorCode::Ok)
{
spdlog::info("error message: {}", response->errorMsg);
}
if (response->headers["Content-Type"] != "application/octet-stream")
{
spdlog::info("payload: {}", response->payload);
}
}
if (response->statusCode != 200)
{
spdlog::error("Error sending data to sentry: {}", response->statusCode);
spdlog::error("Status: {}", response->statusCode);
spdlog::error("Response: {}", response->payload);
}
else
{
spdlog::info("Event sent to sentry");
}
done = true;
});
int i = 0;
while (!done)
{
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
if (i++ > 5000) break; // wait 5 seconds max
}
if (!done)
{
spdlog::error("Error: timing out trying to sent a crash to sentry");
}
return 0;
}
} // namespace ix

View File

@ -1,88 +0,0 @@
/*
* snake_run.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include <fstream>
#include <ixsnake/IXSnakeServer.h>
#include <spdlog/spdlog.h>
#include <sstream>
namespace
{
std::vector<uint8_t> load(const std::string& path)
{
std::vector<uint8_t> memblock;
std::ifstream file(path);
if (!file.is_open()) return memblock;
file.seekg(0, file.end);
std::streamoff size = file.tellg();
file.seekg(0, file.beg);
memblock.resize((size_t) size);
file.read((char*) &memblock.front(), static_cast<std::streamsize>(size));
return memblock;
}
std::string readAsString(const std::string& path)
{
auto vec = load(path);
return std::string(vec.begin(), vec.end());
}
} // namespace
namespace ix
{
int ws_snake_main(int port,
const std::string& hostname,
const std::string& redisHosts,
int redisPort,
const std::string& redisPassword,
bool verbose,
const std::string& appsConfigPath,
const SocketTLSOptions& socketTLSOptions,
bool disablePong,
const std::string& republishChannel)
{
snake::AppConfig appConfig;
appConfig.port = port;
appConfig.hostname = hostname;
appConfig.verbose = verbose;
appConfig.redisPort = redisPort;
appConfig.redisPassword = redisPassword;
appConfig.socketTLSOptions = socketTLSOptions;
appConfig.disablePong = disablePong;
appConfig.republishChannel = republishChannel;
// Parse config file
auto str = readAsString(appsConfigPath);
if (str.empty())
{
spdlog::error("Cannot read content of {}", appsConfigPath);
return 1;
}
spdlog::error(str);
auto apps = nlohmann::json::parse(str);
appConfig.apps = apps["apps"];
std::string token;
std::stringstream tokenStream(redisHosts);
while (std::getline(tokenStream, token, ';'))
{
appConfig.redisHosts.push_back(token);
}
// Display config on the terminal for debugging
dumpConfig(appConfig);
snake::SnakeServer snakeServer(appConfig);
snakeServer.runForever();
return 0; // should never reach this
}
} // namespace ix

View File

@ -1,135 +0,0 @@
/*
* ws_transfer.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include <ixwebsocket/IXWebSocketServer.h>
#include <spdlog/spdlog.h>
#include <sstream>
namespace ix
{
int ws_transfer_main(int port,
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions)
{
spdlog::info("Listening on {}:{}", hostname, port);
ix::WebSocketServer server(port, hostname);
server.setTLSOptions(tlsOptions);
server.setOnClientMessageCallback(
[&server](std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("ws_transfer: New connection");
spdlog::info("remote ip: {}", remoteIp);
spdlog::info("id: {}", connectionState->getId());
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
spdlog::info("ws_transfer: Closed connection: client id {} code {} reason {}",
connectionState->getId(),
msg->closeInfo.code,
msg->closeInfo.reason);
auto remaining = server.getClients().size() - 1;
spdlog::info("ws_transfer: {} remaining clients", remaining);
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
std::stringstream ss;
ss << "ws_transfer: Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
spdlog::info(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Fragment)
{
spdlog::info("ws_transfer: Received message fragment ");
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
spdlog::info("ws_transfer: Received {} bytes", msg->wireSize);
size_t receivers = 0;
for (auto&& client : server.getClients())
{
if (client.get() != &webSocket)
{
auto readyState = client->getReadyState();
auto id = connectionState->getId();
if (readyState == ReadyState::Open)
{
++receivers;
client->send(
msg->str, msg->binary, [&id](int current, int total) -> bool {
spdlog::info("{}: [client {}]: Step {} out of {}",
"ws_transfer",
id,
current,
total);
return true;
});
do
{
size_t bufferedAmount = client->bufferedAmount();
spdlog::info("{}: [client {}]: {} bytes left to send",
"ws_transfer",
id,
bufferedAmount);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
} while (client->bufferedAmount() != 0 &&
client->getReadyState() == ReadyState::Open);
}
else
{
std::string readyStateString =
readyState == ReadyState::Connecting
? "Connecting"
: readyState == ReadyState::Closing ? "Closing" : "Closed";
size_t bufferedAmount = client->bufferedAmount();
spdlog::info(
"{}: [client {}]: has readystate {} bytes left to be sent {}",
"ws_transfer",
id,
readyStateString,
bufferedAmount);
}
}
}
if (!receivers)
{
spdlog::info("ws_transfer: no remaining receivers");
}
}
});
auto res = server.listen();
if (!res.first)
{
spdlog::info(res.second);
return 1;
}
server.start();
server.wait();
return 0;
}
} // namespace ix