Compare commits

..

22 Commits

Author SHA1 Message Date
128bc0afa9 (http server) read body request when the Content-Length is specified + set timeout to read the request to 30 seconds max by default, and make it configurable as a constructor parameter 2020-09-12 14:17:06 -07:00
b04e5c5529 http server: use socket->readBytes which reads in bulk instead of N calls to socket->readByte 2020-09-12 14:09:25 -07:00
1e8c421d66 formatting 2020-09-12 13:55:27 -07:00
72d6651ded Read body in parseRequest for HttpServer (#244)
Co-authored-by: Jay <jasoncarr@Jasons-MacBook-Pro.local>
2020-09-12 13:53:56 -07:00
a4e5d1b47a (ws) autoroute command exit on its own once all messages have been received 2020-09-09 18:01:38 -07:00
9f51a54a83 (docker) ws docker file installs strace 2020-09-04 13:47:12 -07:00
b74f7319c6 add a note to the readme about the fact that the MinGW compiler is not supported. close #242 2020-09-03 13:50:46 -07:00
0ad66a27f2 Fix ws/ws.cpp:2875:10: warning: unused variable noSend [-Wunused-variable] 2020-09-03 09:17:52 -07:00
a40003e85a (ws) echo_client command renamed to autoroute. Command exit once the server close the connection. push_server commands exit once N messages have been sent. 2020-09-03 09:13:23 -07:00
5534a7fdf9 add a github action to publish a docker container for ws 2020-09-02 11:52:59 -07:00
efb245278d unittest / switch from using the REQUIRE macro, which halts (and usually crash) the test to the CHECK macro in IXWebSocketChatTest.cpp 2020-08-31 13:56:45 -07:00
5896d3740f (ws + cobra bots) add a cobra_to_cobra ws subcommand to subscribe to a channel and republish received events to a different channel 2020-08-31 13:45:00 -07:00
73b9c0b89b (socket servers) merge the ConnectionInfo class with the ConnectionState one, which simplify all the server apis 2020-08-28 14:55:40 -07:00
629c155044 (ws) fix silly compile error (missing ix:: namespace) 2020-08-26 14:30:58 -07:00
08640d877f (ws) set the main thread name, to help with debugging in XCode, gdb, lldb etc... 2020-08-26 13:38:45 -07:00
ed5c63144e (ws) cobra to python bot / take a module python name as argument foo.bar.baz instead of a path foo/bar/baz.py 2020-08-19 10:00:00 -07:00
ee69aed2b0 (ws) on Linux with mbedtls, when the system ca certs are specified (the default) pick up sensible OS supplied paths (tested with CentOS and Alpine) 2020-08-19 09:31:57 -07:00
fcb92f862d (ws push_server) on the server side, stop sending and close the connection when the remote end has disconnected 2020-08-18 14:09:27 -07:00
e8e98e667d add ruby websocket bencharking code using
faye-websocket-ruby to receive messages as fast as possible
2020-08-18 13:45:53 -07:00
e1502017ce (ixwebsocket) replace std::unique_ptr<unsigned char[]> with std::array for some fixed arrays (which are in C++11) 2020-08-17 16:48:26 -07:00
72472f2899 IXWebSocketPerMessageDeflateCodec: use std::array instead of std::unique_ptr for a fixed size array 2020-08-17 16:36:24 -07:00
42f71364ca IXHttpClient.cpp: use std::array instead of std::unique_ptr for a fixed size array 2020-08-17 16:25:55 -07:00
45 changed files with 641 additions and 339 deletions

66
.github/workflows/docker.yml vendored Normal file
View File

@ -0,0 +1,66 @@
name: docker
# When its time to do a release do a build for amd64
# and push all of them to Docker Hub.
# Only trigger on semver shaped tags.
on:
push:
tags:
- "v*.*.*"
jobs:
login:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Prepare
id: prep
run: |
DOCKER_IMAGE=machinezone/ws
VERSION=edge
if [[ $GITHUB_REF == refs/tags/* ]]; then
VERSION=${GITHUB_REF#refs/tags/v}
fi
if [ "${{ github.event_name }}" = "schedule" ]; then
VERSION=nightly
fi
TAGS="${DOCKER_IMAGE}:${VERSION}"
if [[ $VERSION =~ ^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$ ]]; then
TAGS="$TAGS,${DOCKER_IMAGE}:latest"
fi
echo ::set-output name=tags::${TAGS}
- name: Set up Docker Buildx
id: buildx
uses: docker/setup-buildx-action@master
- name: Cache Docker layers
uses: actions/cache@v2
with:
path: /tmp/.buildx-cache
key: ${{ runner.os }}-buildx-${{ github.sha }}
restore-keys: |
${{ runner.os }}-buildx-
- name: Login to GitHub Package Registry
uses: docker/login-action@v1
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.GHCR_TOKEN }}
- name: Build and push
id: docker_build
uses: docker/build-push-action@v2-build-push
with:
builder: ${{ steps.buildx.outputs.name }}
context: .
file: ./Dockerfile
target: prod
platforms: linux/amd64
push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.prep.outputs.tags }}
cache-from: type=local,src=/tmp/.buildx-cache
cache-to: type=local,dest=/tmp/.buildx-cache

View File

@ -62,7 +62,6 @@ set( IXWEBSOCKET_SOURCES
set( IXWEBSOCKET_HEADERS
ixwebsocket/IXBench.h
ixwebsocket/IXCancellationRequest.h
ixwebsocket/IXConnectionInfo.h
ixwebsocket/IXConnectionState.h
ixwebsocket/IXDNSLookup.h
ixwebsocket/IXExponentialBackoff.h

View File

@ -2,7 +2,7 @@
IXWebSocket is a C++ library for WebSocket client and server development. It has minimal dependencies (no boost), is very simple to use and support everything you'll likely need for websocket dev (SSL, deflate compression, compiles on most platforms, etc...). HTTP client and server code is also available, but it hasn't received as much testing.
It is been used on big mobile video game titles sending and receiving tons of messages since 2017 (iOS and Android). It was tested on macOS, iOS, Linux, Android, Windows and FreeBSD. Two important design goals are simplicity and correctness.
It is been used on big mobile video game titles sending and receiving tons of messages since 2017 (iOS and Android). It was tested on macOS, iOS, Linux, Android, Windows and FreeBSD. Note that the MinGW compiler is not supported at this point. Two important design goals are simplicity and correctness.
```cpp
/*

View File

@ -1,67 +1,11 @@
version: "3"
version: "3.3"
services:
# snake:
# image: bsergean/ws:build
# entrypoint: ws snake --port 8767 --host 0.0.0.0 --redis_hosts redis1
# ports:
# - "8767:8767"
# networks:
# - ws-net
# depends_on:
# - redis1
push:
entrypoint: ws push_server --host 0.0.0.0
image: ${DOCKER_REPO}/ws:build
# proxy:
# image: bsergean/ws:build
# entrypoint: strace ws proxy_server --remote_host 'wss://cobra.addsrv.com' --host 0.0.0.0 --port 8765 -v
# ports:
# - "8765:8765"
# networks:
# - ws-net
#pyproxy:
# image: bsergean/ws_proxy:build
# entrypoint: /usr/bin/ws_proxy.py --remote_url 'wss://cobra.addsrv.com' --host 0.0.0.0 --port 8765
# ports:
# - "8765:8765"
# networks:
# - ws-net
# # ws:
# # security_opt:
# # - seccomp:unconfined
# # cap_add:
# # - SYS_PTRACE
# # stdin_open: true
# # tty: true
# # image: bsergean/ws:build
# # entrypoint: sh
# # networks:
# # - ws-net
# # depends_on:
# # - redis1
# #
# # redis1:
# # image: redis:alpine
# # networks:
# # - ws-net
# #
# # statsd:
# # image: jaconel/statsd
# # ports:
# # - "8125:8125"
# # environment:
# # - STATSD_DUMP_MSG=true
# # - GRAPHITE_HOST=127.0.0.1
# # networks:
# # - ws-net
compile:
image: alpine
entrypoint: sh
stdin_open: true
tty: true
volumes:
- /Users/bsergeant/src/foss:/home/bsergean/src/foss
networks:
ws-net:
autoroute:
entrypoint: ws autoroute ws://push:8008
image: ${DOCKER_REPO}/ws:build
depends_on:
- push

View File

@ -20,7 +20,7 @@ RUN make ws_mbedtls_install && \
FROM alpine:3.12 as runtime
RUN apk add --no-cache libstdc++ mbedtls ca-certificates python3 && \
RUN apk add --no-cache libstdc++ mbedtls ca-certificates python3 strace && \
addgroup -S app && \
adduser -S -G app app

View File

@ -2,6 +2,50 @@
All changes to this project will be documented in this file.
## [10.4.0] - 2020-09-12
(http server) read body request when the Content-Length is specified + set timeout to read the request to 30 seconds max by default, and make it configurable as a constructor parameter
## [10.3.5] - 2020-09-09
(ws) autoroute command exit on its own once all messages have been received
## [10.3.4] - 2020-09-04
(docker) ws docker file installs strace
## [10.3.3] - 2020-09-02
(ws) echo_client command renamed to autoroute. Command exit once the server close the connection. push_server commands exit once N messages have been sent.
## [10.3.2] - 2020-08-31
(ws + cobra bots) add a cobra_to_cobra ws subcommand to subscribe to a channel and republish received events to a different channel
## [10.3.1] - 2020-08-28
(socket servers) merge the ConnectionInfo class with the ConnectionState one, which simplify all the server apis
## [10.3.0] - 2020-08-26
(ws) set the main thread name, to help with debugging in XCode, gdb, lldb etc...
## [10.2.9] - 2020-08-19
(ws) cobra to python bot / take a module python name as argument foo.bar.baz instead of a path foo/bar/baz.py
## [10.2.8] - 2020-08-19
(ws) on Linux with mbedtls, when the system ca certs are specified (the default) pick up sensible OS supplied paths (tested with CentOS and Alpine)
## [10.2.7] - 2020-08-18
(ws push_server) on the server side, stop sending and close the connection when the remote end has disconnected
## [10.2.6] - 2020-08-17
(ixwebsocket) replace std::unique_ptr<unsigned char[]> with std::array for some fixed arrays (which are in C++11)
## [10.2.5] - 2020-08-15
(ws) merge all ws_*.cpp files into a single one to speedup compilation

View File

@ -280,10 +280,9 @@ ix::WebSocketServer server(port);
server.setOnConnectionCallback(
[&server](std::weak_ptr<WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo)
std::shared_ptr<ConnectionState> connectionState)
{
std::cout << "Remote ip: " << connectionInfo->remoteIp << std::endl;
std::cout << "Remote ip: " << connectionState->remoteIp << std::endl;
auto ws = webSocket.lock();
if (ws)
@ -359,13 +358,12 @@ The webSocket reference is guaranteed to be always valid ; by design the callbac
ix::WebSocketServer server(port);
server.setOnClientMessageCallback(std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const WebSocketMessagePtr& msg)
{
// The ConnectionInfo object contains information about the connection,
// The ConnectionState object contains information about the connection,
// at this point only the client ip address and the port.
std::cout << "Remote ip: " << connectionInfo.remoteIp << std::endl;
std::cout << "Remote ip: " << connectionState->getRemoteIp();
if (msg->type == ix::WebSocketMessageType::Open)
{
@ -519,12 +517,11 @@ If you want to handle how requests are processed, implement the setOnConnectionC
```cpp
setOnConnectionCallback(
[this](HttpRequestPtr request,
std::shared_ptr<ConnectionState> /*connectionState*/,
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr
std::shared_ptr<ConnectionState> connectionState) -> HttpResponsePtr
{
// Build a string for the response
std::stringstream ss;
ss << connectionInfo->remoteIp
ss << connectionState->getRemoteIp();
<< " "
<< request->method
<< " "

View File

@ -5,6 +5,7 @@
set (IXBOTS_SOURCES
ixbots/IXCobraBot.cpp
ixbots/IXCobraToCobraBot.cpp
ixbots/IXCobraToSentryBot.cpp
ixbots/IXCobraToStatsdBot.cpp
ixbots/IXCobraToStdoutBot.cpp
@ -16,6 +17,7 @@ set (IXBOTS_SOURCES
set (IXBOTS_HEADERS
ixbots/IXCobraBot.h
ixbots/IXCobraBotConfig.h
ixbots/IXCobraToCobraBot.h
ixbots/IXCobraToSentryBot.h
ixbots/IXCobraToStatsdBot.h
ixbots/IXCobraToStdoutBot.h

View File

@ -0,0 +1,43 @@
/*
* IXCobraToCobraBot.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#include "IXCobraToCobraBot.h"
#include "IXCobraBot.h"
#include <ixcobra/IXCobraMetricsPublisher.h>
#include <sstream>
namespace ix
{
int64_t cobra_to_cobra_bot(const ix::CobraBotConfig& cobraBotConfig,
const std::string& republishChannel,
const std::string& publisherRolename,
const std::string& publisherRolesecret)
{
CobraBot bot;
CobraMetricsPublisher cobraMetricsPublisher;
CobraConfig cobraPublisherConfig = cobraBotConfig.cobraConfig;
cobraPublisherConfig.rolename = publisherRolename;
cobraPublisherConfig.rolesecret = publisherRolesecret;
cobraMetricsPublisher.configure(cobraPublisherConfig, republishChannel);
bot.setOnBotMessageCallback(
[&republishChannel, &cobraMetricsPublisher](const Json::Value& msg,
const std::string& /*position*/,
std::atomic<bool>& /*throttled*/,
std::atomic<bool>& /*fatalCobraError*/,
std::atomic<uint64_t>& sentCount) -> void {
Json::Value msgWithNoId(msg);
msgWithNoId.removeMember("id");
cobraMetricsPublisher.push(republishChannel, msg);
sentCount++;
});
return bot.run(cobraBotConfig);
}
} // namespace ix

View File

@ -0,0 +1,20 @@
/*
* IXCobraToCobraBot.h
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <cstdint>
#include <ixbots/IXStatsdClient.h>
#include "IXCobraBotConfig.h"
#include <stddef.h>
#include <string>
namespace ix
{
int64_t cobra_to_cobra_bot(const ix::CobraBotConfig& config,
const std::string& republishChannel,
const std::string& publisherRolename,
const std::string& publisherRolesecret);
} // namespace ix

View File

@ -102,7 +102,7 @@ namespace ix
{
int64_t cobra_to_python_bot(const ix::CobraBotConfig& config,
StatsdClient& statsdClient,
const std::string& scriptPath)
const std::string& moduleName)
{
#ifndef IXBOTS_USE_PYTHON
CoreLogger::error("Command is disabled. "
@ -113,10 +113,7 @@ namespace ix
Py_InitializeEx(0); // 0 arg so that we do not install signal handlers
// which prevent us from using Ctrl-C
size_t lastIndex = scriptPath.find_last_of(".");
std::string modulePath = scriptPath.substr(0, lastIndex);
PyObject* pyModuleName = PyUnicode_DecodeFSDefault(modulePath.c_str());
PyObject* pyModuleName = PyUnicode_DecodeFSDefault(moduleName.c_str());
if (pyModuleName == nullptr)
{

View File

@ -15,5 +15,5 @@ namespace ix
{
int64_t cobra_to_python_bot(const ix::CobraBotConfig& config,
StatsdClient& statsdClient,
const std::string& scriptPath);
const std::string& moduleName);
} // namespace ix

View File

@ -24,6 +24,7 @@ namespace ix
{
_cobra_connection.setEventCallback([](const CobraEventPtr& event) {
std::stringstream ss;
ix::LogLevel logLevel = LogLevel::Info;
if (event->type == ix::CobraEventType::Open)
{
@ -41,6 +42,7 @@ namespace ix
else if (event->type == ix::CobraEventType::Error)
{
ss << "Error: " << event->errMsg;
logLevel = ix::LogLevel::Error;
}
else if (event->type == ix::CobraEventType::Closed)
{
@ -57,6 +59,7 @@ namespace ix
else if (event->type == ix::CobraEventType::Published)
{
ss << "Published message " << event->msgId << " acked";
logLevel = ix::LogLevel::Debug;
}
else if (event->type == ix::CobraEventType::Pong)
{
@ -65,17 +68,20 @@ namespace ix
else if (event->type == ix::CobraEventType::HandshakeError)
{
ss << "Handshake error: " << event->errMsg;
logLevel = ix::LogLevel::Error;
}
else if (event->type == ix::CobraEventType::AuthenticationError)
{
ss << "Authentication error: " << event->errMsg;
logLevel = ix::LogLevel::Error;
}
else if (event->type == ix::CobraEventType::SubscriptionError)
{
ss << "Subscription error: " << event->errMsg;
logLevel = ix::LogLevel::Error;
}
CoreLogger::log(ss.str().c_str());
CoreLogger::log(ss.str().c_str(), logLevel);
});
}

View File

@ -45,10 +45,9 @@ namespace ix
}
void RedisServer::handleConnection(std::unique_ptr<Socket> socket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo)
std::shared_ptr<ConnectionState> connectionState)
{
logInfo("New connection from remote ip " + connectionInfo->remoteIp);
logInfo("New connection from remote ip " + connectionState->getRemoteIp());
_connectedClientsCount++;

View File

@ -44,8 +44,7 @@ namespace ix
// Methods
virtual void handleConnection(std::unique_ptr<Socket>,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) final;
std::shared_ptr<ConnectionState> connectionState) final;
virtual size_t getConnectedClientsCount() final;
bool startsWith(const std::string& str, const std::string& start);

View File

@ -61,11 +61,10 @@ namespace snake
_server.setOnClientMessageCallback(
[this](std::shared_ptr<ix::ConnectionState> connectionState,
ix::ConnectionInfo& connectionInfo,
ix::WebSocket& webSocket,
const ix::WebSocketMessagePtr& msg) {
auto state = std::dynamic_pointer_cast<SnakeConnectionState>(connectionState);
auto remoteIp = connectionInfo.remoteIp;
auto remoteIp = connectionState->getRemoteIp();
std::stringstream ss;
ss << "[" << state->getId() << "] ";

View File

@ -12,10 +12,8 @@ namespace ix
{
Bench::Bench(const std::string& description)
: _description(description)
, _start(std::chrono::high_resolution_clock::now())
, _reported(false)
{
;
reset();
}
Bench::~Bench()
@ -26,6 +24,12 @@ namespace ix
}
}
void Bench::reset()
{
_start = std::chrono::high_resolution_clock::now();
_reported = false;
}
void Bench::report()
{
auto now = std::chrono::high_resolution_clock::now();

View File

@ -17,6 +17,7 @@ namespace ix
Bench(const std::string& description);
~Bench();
void reset();
void report();
uint64_t getDuration() const;

View File

@ -1,25 +0,0 @@
/*
* IXConnectionInfo.h
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <string>
namespace ix
{
struct ConnectionInfo
{
std::string remoteIp;
int remotePort;
ConnectionInfo(const std::string& r = std::string(), int p = 0)
: remoteIp(r)
, remotePort(p)
{
;
}
};
} // namespace ix

View File

@ -50,4 +50,24 @@ namespace ix
_onSetTerminatedCallback();
}
}
const std::string& ConnectionState::getRemoteIp()
{
return _remoteIp;
}
int ConnectionState::getRemotePort()
{
return _remotePort;
}
void ConnectionState::setRemoteIp(const std::string& remoteIp)
{
_remoteIp = remoteIp;
}
void ConnectionState::setRemotePort(int remotePort)
{
_remotePort = remotePort;
}
} // namespace ix

View File

@ -28,11 +28,17 @@ namespace ix
void setTerminated();
bool isTerminated() const;
const std::string& getRemoteIp();
int getRemotePort();
static std::shared_ptr<ConnectionState> createConnectionState();
private:
void setOnSetTerminatedCallback(const OnSetTerminatedCallback& callback);
void setRemoteIp(const std::string& remoteIp);
void setRemotePort(int remotePort);
protected:
std::atomic<bool> _terminated;
std::string _id;
@ -40,6 +46,9 @@ namespace ix
static std::atomic<uint64_t> _globalId;
std::string _remoteIp;
int _remotePort;
friend class SocketServer;
};
} // namespace ix

View File

@ -93,14 +93,12 @@ namespace ix
}
std::tuple<bool, std::string, HttpRequestPtr> Http::parseRequest(
std::unique_ptr<Socket>& socket)
std::unique_ptr<Socket>& socket, int timeoutSecs)
{
HttpRequestPtr httpRequest;
std::atomic<bool> requestInitCancellation(false);
int timeoutSecs = 5; // FIXME
auto isCancellationRequested =
makeCancellationRequestWithTimeout(timeoutSecs, requestInitCancellation);
@ -130,7 +128,36 @@ namespace ix
return std::make_tuple(false, "Error parsing HTTP headers", httpRequest);
}
httpRequest = std::make_shared<HttpRequest>(uri, method, httpVersion, headers);
std::string body;
if (headers.find("Content-Length") != headers.end())
{
int contentLength = 0;
try
{
contentLength = std::stoi(headers["Content-Length"]);
}
catch (std::exception)
{
return std::make_tuple(
false, "Error parsing HTTP Header 'Content-Length'", httpRequest);
}
if (contentLength < 0)
{
return std::make_tuple(
false, "Error: 'Content-Length' should be a positive integer", httpRequest);
}
auto res = socket->readBytes(contentLength, nullptr, isCancellationRequested);
if (!res.first)
{
return std::make_tuple(
false, std::string("Error reading request: ") + res.second, httpRequest);
}
body = res.second;
}
httpRequest = std::make_shared<HttpRequest>(uri, method, httpVersion, body, headers);
return std::make_tuple(true, "", httpRequest);
}

View File

@ -95,15 +95,18 @@ namespace ix
std::string uri;
std::string method;
std::string version;
std::string body;
WebSocketHttpHeaders headers;
HttpRequest(const std::string& u,
const std::string& m,
const std::string& v,
const std::string& b,
const WebSocketHttpHeaders& h = WebSocketHttpHeaders())
: uri(u)
, method(m)
, version(v)
, body(b)
, headers(h)
{
}
@ -115,7 +118,7 @@ namespace ix
{
public:
static std::tuple<bool, std::string, HttpRequestPtr> parseRequest(
std::unique_ptr<Socket>& socket);
std::unique_ptr<Socket>& socket, int timeoutSecs);
static bool sendResponse(HttpResponsePtr response, std::unique_ptr<Socket>& socket);
static std::pair<std::string, int> parseStatusLine(const std::string& line);

View File

@ -10,6 +10,7 @@
#include "IXUrlParser.h"
#include "IXUserAgent.h"
#include "IXWebSocketHttpHeaders.h"
#include <array>
#include <assert.h>
#include <cstring>
#include <iomanip>
@ -700,14 +701,12 @@ namespace ix
inflateState.next_in = (unsigned char*) (const_cast<char*>(in.data()));
const int kBufferSize = 1 << 14;
std::unique_ptr<unsigned char[]> compressBuffer =
std::make_unique<unsigned char[]>(kBufferSize);
std::array<unsigned char, kBufferSize> compressBuffer;
do
{
inflateState.avail_out = (uInt) kBufferSize;
inflateState.next_out = compressBuffer.get();
inflateState.next_out = &compressBuffer.front();
int ret = inflate(&inflateState, Z_SYNC_FLUSH);
@ -717,7 +716,7 @@ namespace ix
return false;
}
out.append(reinterpret_cast<char*>(compressBuffer.get()),
out.append(reinterpret_cast<char*>(&compressBuffer.front()),
kBufferSize - inflateState.avail_out);
} while (inflateState.avail_out == 0);

View File

@ -92,10 +92,17 @@ namespace
namespace ix
{
HttpServer::HttpServer(
int port, const std::string& host, int backlog, size_t maxConnections, int addressFamily)
const int HttpServer::kDefaultTimeoutSecs(30);
HttpServer::HttpServer(int port,
const std::string& host,
int backlog,
size_t maxConnections,
int addressFamily,
int timeoutSecs)
: SocketServer(port, host, backlog, maxConnections, addressFamily)
, _connectedClientsCount(0)
, _timeoutSecs(timeoutSecs)
{
setDefaultConnectionCallback();
}
@ -120,18 +127,16 @@ namespace ix
}
void HttpServer::handleConnection(std::unique_ptr<Socket> socket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo)
std::shared_ptr<ConnectionState> connectionState)
{
_connectedClientsCount++;
auto ret = Http::parseRequest(socket);
auto ret = Http::parseRequest(socket, _timeoutSecs);
// FIXME: handle errors in parseRequest
if (std::get<0>(ret))
{
auto response =
_onConnectionCallback(std::get<2>(ret), connectionState, std::move(connectionInfo));
auto response = _onConnectionCallback(std::get<2>(ret), connectionState);
if (!Http::sendResponse(response, socket))
{
logError("Cannot send response");
@ -151,8 +156,7 @@ namespace ix
{
setOnConnectionCallback(
[this](HttpRequestPtr request,
std::shared_ptr<ConnectionState> /*connectionState*/,
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr {
std::shared_ptr<ConnectionState> connectionState) -> HttpResponsePtr {
std::string uri(request->uri);
if (uri.empty() || uri == "/")
{
@ -184,8 +188,8 @@ namespace ix
// Log request
std::stringstream ss;
ss << connectionInfo->remoteIp << ":" << connectionInfo->remotePort << " "
<< request->method << " " << request->headers["User-Agent"] << " "
ss << connectionState->getRemoteIp() << ":" << connectionState->getRemotePort()
<< " " << request->method << " " << request->headers["User-Agent"] << " "
<< request->uri << " " << content.size();
logInfo(ss.str());
@ -209,16 +213,16 @@ namespace ix
// See https://developer.mozilla.org/en-US/docs/Web/HTTP/Redirections
//
setOnConnectionCallback(
[this, redirectUrl](HttpRequestPtr request,
std::shared_ptr<ConnectionState> /*connectionState*/,
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr {
[this,
redirectUrl](HttpRequestPtr request,
std::shared_ptr<ConnectionState> connectionState) -> HttpResponsePtr {
WebSocketHttpHeaders headers;
headers["Server"] = userAgent();
// Log request
std::stringstream ss;
ss << connectionInfo->remoteIp << ":" << connectionInfo->remotePort << " "
<< request->method << " " << request->headers["User-Agent"] << " "
ss << connectionState->getRemoteIp() << ":" << connectionState->getRemotePort()
<< " " << request->method << " " << request->headers["User-Agent"] << " "
<< request->uri;
logInfo(ss.str());

View File

@ -23,15 +23,14 @@ namespace ix
{
public:
using OnConnectionCallback =
std::function<HttpResponsePtr(HttpRequestPtr,
std::shared_ptr<ConnectionState>,
std::unique_ptr<ConnectionInfo> connectionInfo)>;
std::function<HttpResponsePtr(HttpRequestPtr, std::shared_ptr<ConnectionState>)>;
HttpServer(int port = SocketServer::kDefaultPort,
const std::string& host = SocketServer::kDefaultHost,
int backlog = SocketServer::kDefaultTcpBacklog,
size_t maxConnections = SocketServer::kDefaultMaxConnections,
int addressFamily = SocketServer::kDefaultAddressFamily);
int addressFamily = SocketServer::kDefaultAddressFamily,
int timeoutSecs = HttpServer::kDefaultTimeoutSecs);
virtual ~HttpServer();
virtual void stop() final;
@ -44,10 +43,12 @@ namespace ix
OnConnectionCallback _onConnectionCallback;
std::atomic<int> _connectedClientsCount;
const static int kDefaultTimeoutSecs;
int _timeoutSecs;
// Methods
virtual void handleConnection(std::unique_ptr<Socket>,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) final;
std::shared_ptr<ConnectionState> connectionState) final;
virtual size_t getConnectedClientsCount() final;
void setDefaultConnectionCallback();

View File

@ -332,12 +332,13 @@ namespace ix
}
// Retrieve connection info, the ip address of the remote peer/client)
std::unique_ptr<ConnectionInfo> connectionInfo;
std::string remoteIp;
int remotePort;
if (_addressFamily == AF_INET)
{
char remoteIp[INET_ADDRSTRLEN];
if (inet_ntop(AF_INET, &client.sin_addr, remoteIp, INET_ADDRSTRLEN) == nullptr)
char remoteIp4[INET_ADDRSTRLEN];
if (inet_ntop(AF_INET, &client.sin_addr, remoteIp4, INET_ADDRSTRLEN) == nullptr)
{
int err = Socket::getErrno();
std::stringstream ss;
@ -350,12 +351,13 @@ namespace ix
continue;
}
connectionInfo = std::make_unique<ConnectionInfo>(remoteIp, client.sin_port);
remotePort = client.sin_port;
remoteIp = remoteIp4;
}
else // AF_INET6
{
char remoteIp[INET6_ADDRSTRLEN];
if (inet_ntop(AF_INET6, &client.sin_addr, remoteIp, INET6_ADDRSTRLEN) == nullptr)
char remoteIp6[INET6_ADDRSTRLEN];
if (inet_ntop(AF_INET6, &client.sin_addr, remoteIp6, INET6_ADDRSTRLEN) == nullptr)
{
int err = Socket::getErrno();
std::stringstream ss;
@ -368,7 +370,8 @@ namespace ix
continue;
}
connectionInfo = std::make_unique<ConnectionInfo>(remoteIp, client.sin_port);
remotePort = client.sin_port;
remoteIp = remoteIp6;
}
std::shared_ptr<ConnectionState> connectionState;
@ -377,6 +380,8 @@ namespace ix
connectionState = _connectionStateFactory();
}
connectionState->setOnSetTerminatedCallback([this] { onSetTerminatedCallback(); });
connectionState->setRemoteIp(remoteIp);
connectionState->setRemotePort(remotePort);
if (_stop) return;
@ -404,13 +409,10 @@ namespace ix
// Launch the handleConnection work asynchronously in its own thread.
std::lock_guard<std::mutex> lock(_connectionsThreadsMutex);
_connectionsThreads.push_back(
std::make_pair(connectionState,
std::thread(&SocketServer::handleConnection,
this,
std::move(socket),
connectionState,
std::move(connectionInfo))));
_connectionsThreads.push_back(std::make_pair(
connectionState,
std::thread(
&SocketServer::handleConnection, this, std::move(socket), connectionState)));
}
}

View File

@ -6,7 +6,6 @@
#pragma once
#include "IXConnectionInfo.h"
#include "IXConnectionState.h"
#include "IXSelectInterrupt.h"
#include "IXSocketTLSOptions.h"
@ -105,8 +104,7 @@ namespace ix
ConnectionStateFactory _connectionStateFactory;
virtual void handleConnection(std::unique_ptr<Socket>,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) = 0;
std::shared_ptr<ConnectionState> connectionState) = 0;
virtual size_t getConnectedClientsCount() = 0;
// Returns true if all connection threads are joined

View File

@ -16,8 +16,6 @@ namespace
// is treated as a char* and the null termination (\x00) makes it
// look like an empty string.
const std::string kEmptyUncompressedBlock = std::string("\x00\x00\xff\xff", 4);
const int kBufferSize = 1 << 14;
} // namespace
namespace ix
@ -26,7 +24,6 @@ namespace ix
// Compressor
//
WebSocketPerMessageDeflateCompressor::WebSocketPerMessageDeflateCompressor()
: _compressBufferSize(kBufferSize)
{
#ifdef IXWEBSOCKET_USE_ZLIB
memset(&_deflateState, 0, sizeof(_deflateState));
@ -57,8 +54,6 @@ namespace ix
if (ret != Z_OK) return false;
_compressBuffer = std::make_unique<unsigned char[]>(_compressBufferSize);
_flush = (clientNoContextTakeOver) ? Z_FULL_FLUSH : Z_SYNC_FLUSH;
return true;
@ -145,14 +140,14 @@ namespace ix
do
{
// Output to local buffer
_deflateState.avail_out = (uInt) _compressBufferSize;
_deflateState.next_out = _compressBuffer.get();
_deflateState.avail_out = (uInt) _compressBuffer.size();
_deflateState.next_out = &_compressBuffer.front();
deflate(&_deflateState, _flush);
output = _compressBufferSize - _deflateState.avail_out;
output = _compressBuffer.size() - _deflateState.avail_out;
out.insert(out.end(), _compressBuffer.get(), _compressBuffer.get() + output);
out.insert(out.end(), _compressBuffer.begin(), _compressBuffer.begin() + output);
} while (_deflateState.avail_out == 0);
if (endsWithEmptyUnCompressedBlock(out))
@ -170,7 +165,6 @@ namespace ix
// Decompressor
//
WebSocketPerMessageDeflateDecompressor::WebSocketPerMessageDeflateDecompressor()
: _compressBufferSize(kBufferSize)
{
#ifdef IXWEBSOCKET_USE_ZLIB
memset(&_inflateState, 0, sizeof(_inflateState));
@ -198,8 +192,6 @@ namespace ix
if (ret != Z_OK) return false;
_compressBuffer = std::make_unique<unsigned char[]>(_compressBufferSize);
_flush = (clientNoContextTakeOver) ? Z_FULL_FLUSH : Z_SYNC_FLUSH;
return true;
@ -232,8 +224,8 @@ namespace ix
do
{
_inflateState.avail_out = (uInt) _compressBufferSize;
_inflateState.next_out = _compressBuffer.get();
_inflateState.avail_out = (uInt) _compressBuffer.size();
_inflateState.next_out = &_compressBuffer.front();
int ret = inflate(&_inflateState, Z_SYNC_FLUSH);
@ -242,8 +234,8 @@ namespace ix
return false; // zlib error
}
out.append(reinterpret_cast<char*>(_compressBuffer.get()),
_compressBufferSize - _inflateState.avail_out);
out.append(reinterpret_cast<char*>(&_compressBuffer.front()),
_compressBuffer.size() - _inflateState.avail_out);
} while (_inflateState.avail_out == 0);
return true;

View File

@ -9,7 +9,7 @@
#ifdef IXWEBSOCKET_USE_ZLIB
#include "zlib.h"
#endif
#include <memory>
#include <array>
#include <string>
#include <vector>
@ -34,8 +34,7 @@ namespace ix
bool endsWithEmptyUnCompressedBlock(const T& value);
int _flush;
size_t _compressBufferSize;
std::unique_ptr<unsigned char[]> _compressBuffer;
std::array<unsigned char, 1 << 14> _compressBuffer;
#ifdef IXWEBSOCKET_USE_ZLIB
z_stream _deflateState;
@ -53,8 +52,7 @@ namespace ix
private:
int _flush;
size_t _compressBufferSize;
std::unique_ptr<unsigned char[]> _compressBuffer;
std::array<unsigned char, 1 << 14> _compressBuffer;
#ifdef IXWEBSOCKET_USE_ZLIB
z_stream _inflateState;

View File

@ -56,10 +56,9 @@ namespace ix
server.setOnConnectionCallback(
[remoteUrl, remoteUrlsMapping](std::weak_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) {
std::shared_ptr<ConnectionState> connectionState) {
auto state = std::dynamic_pointer_cast<ProxyConnectionState>(connectionState);
auto remoteIp = connectionInfo->remoteIp;
auto remoteIp = connectionState->getRemoteIp();
// Server connection
state->webSocket().setOnMessageCallback(

View File

@ -77,15 +77,14 @@ namespace ix
}
void WebSocketServer::handleConnection(std::unique_ptr<Socket> socket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo)
std::shared_ptr<ConnectionState> connectionState)
{
setThreadName("WebSocketServer::" + connectionState->getId());
auto webSocket = std::make_shared<WebSocket>();
if (_onConnectionCallback)
{
_onConnectionCallback(webSocket, connectionState, std::move(connectionInfo));
_onConnectionCallback(webSocket, connectionState);
if (!webSocket->isOnMessageCallbackRegistered())
{
@ -99,9 +98,8 @@ namespace ix
else if (_onClientMessageCallback)
{
webSocket->setOnMessageCallback(
[this, &ws = *webSocket.get(), connectionState, &ci = *connectionInfo.get()](
const WebSocketMessagePtr& msg) {
_onClientMessageCallback(connectionState, ci, ws, msg);
[this, &ws = *webSocket.get(), connectionState](const WebSocketMessagePtr& msg) {
_onClientMessageCallback(connectionState, ws, msg);
});
}
else

View File

@ -23,14 +23,10 @@ namespace ix
{
public:
using OnConnectionCallback =
std::function<void(std::weak_ptr<WebSocket>,
std::shared_ptr<ConnectionState>,
std::unique_ptr<ConnectionInfo> connectionInfo)>;
std::function<void(std::weak_ptr<WebSocket>, std::shared_ptr<ConnectionState>)>;
using OnClientMessageCallback = std::function<void(std::shared_ptr<ConnectionState>,
ConnectionInfo&,
WebSocket&,
const WebSocketMessagePtr&)>;
using OnClientMessageCallback = std::function<void(
std::shared_ptr<ConnectionState>, WebSocket&, const WebSocketMessagePtr&)>;
WebSocketServer(int port = SocketServer::kDefaultPort,
const std::string& host = SocketServer::kDefaultHost,
@ -69,8 +65,7 @@ namespace ix
// Methods
virtual void handleConnection(std::unique_ptr<Socket> socket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo);
std::shared_ptr<ConnectionState> connectionState);
virtual size_t getConnectedClientsCount() final;
};
} // namespace ix

View File

@ -6,4 +6,4 @@
#pragma once
#define IX_WEBSOCKET_VERSION "10.2.5"
#define IX_WEBSOCKET_VERSION "10.4.0"

View File

@ -95,15 +95,14 @@ TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]")
sentryServer.setOnConnectionCallback(
[](HttpRequestPtr request,
std::shared_ptr<ConnectionState> /*connectionState*/,
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr {
std::shared_ptr<ConnectionState> connectionState) -> HttpResponsePtr {
WebSocketHttpHeaders headers;
headers["Server"] = userAgent();
// Log request
std::stringstream ss;
ss << connectionInfo->remoteIp << ":" << connectionInfo->remotePort << " "
<< request->method << " " << request->headers["User-Agent"] << " "
ss << connectionState->getRemoteIp() << ":" << connectionState->getRemotePort()
<< " " << request->method << " " << request->headers["User-Agent"] << " "
<< request->uri;
if (request->method == "POST")

View File

@ -63,6 +63,54 @@ TEST_CASE("http server", "[httpd]")
server.stop();
}
SECTION("Posting plain text data to a local HTTP server")
{
int port = getFreePort();
ix::HttpServer server(port, "127.0.0.1");
server.setOnConnectionCallback(
[](HttpRequestPtr request, std::shared_ptr<ConnectionState>) -> HttpResponsePtr {
if (request->method == "POST")
{
return std::make_shared<HttpResponse>(
200, "OK", HttpErrorCode::Ok, WebSocketHttpHeaders(), request->body);
}
return std::make_shared<HttpResponse>(400, "BAD REQUEST");
});
auto res = server.listen();
REQUIRE(res.first);
server.start();
HttpClient httpClient;
WebSocketHttpHeaders headers;
headers["Content-Type"] = "text/plain";
std::string url("http://127.0.0.1:");
url += std::to_string(port);
auto args = httpClient.createRequest(url);
args->extraHeaders = headers;
args->connectTimeout = 60;
args->transferTimeout = 60;
args->verbose = true;
args->logger = [](const std::string& msg) { std::cout << msg; };
args->body = "Hello World!";
auto response = httpClient.post(url, args->body, args);
std::cerr << "Status: " << response->statusCode << std::endl;
std::cerr << "Error message: " << response->errorMsg << std::endl;
std::cerr << "Payload: " << response->payload << std::endl;
REQUIRE(response->errorCode == HttpErrorCode::Ok);
REQUIRE(response->statusCode == 200);
REQUIRE(response->payload == args->body);
server.stop();
}
}
TEST_CASE("http server redirection", "[httpd_redirect]")

View File

@ -85,11 +85,10 @@ namespace ix
bool startWebSocketEchoServer(ix::WebSocketServer& server)
{
server.setOnClientMessageCallback(
[&server](std::shared_ptr<ConnectionState> /*connectionState*/,
ConnectionInfo& connectionInfo,
[&server](std::shared_ptr<ConnectionState> connectionState,
WebSocket& webSocket,
const ix::WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
auto remoteIp = connectionState->getRemoteIp();
if (msg->type == ix::WebSocketMessageType::Open)
{
TLogger() << "New connection";

View File

@ -191,11 +191,9 @@ namespace
server.setOnClientMessageCallback(
[&server, &connectionId](std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const ix::WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
auto remoteIp = connectionState->getRemoteIp();
if (msg->type == ix::WebSocketMessageType::Open)
{

View File

@ -195,10 +195,9 @@ namespace
{
server.setOnClientMessageCallback(
[&server](std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const ix::WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
auto remoteIp = connectionState->getRemoteIp();
if (msg->type == ix::WebSocketMessageType::Open)
{
TLogger() << "New connection";
@ -286,27 +285,27 @@ TEST_CASE("Websocket_chat", "[websocket_chat]")
int attempts = 0;
while (chatA.getReceivedMessagesCount() != 3 || chatB.getReceivedMessagesCount() != 3)
{
REQUIRE(attempts++ < 10);
CHECK(attempts++ < 10);
ix::msleep(1000);
}
chatA.stop();
chatB.stop();
REQUIRE(chatA.getReceivedMessagesCount() == 3);
REQUIRE(chatB.getReceivedMessagesCount() == 3);
CHECK(chatA.getReceivedMessagesCount() == 3);
CHECK(chatB.getReceivedMessagesCount() == 3);
REQUIRE(chatB.getReceivedMessages()[0] == "from A1");
REQUIRE(chatB.getReceivedMessages()[1] == "from A2");
REQUIRE(chatB.getReceivedMessages()[2] == "from A3");
CHECK(chatB.getReceivedMessages()[0] == "from A1");
CHECK(chatB.getReceivedMessages()[1] == "from A2");
CHECK(chatB.getReceivedMessages()[2] == "from A3");
REQUIRE(chatA.getReceivedMessages()[0] == "from B1");
REQUIRE(chatA.getReceivedMessages()[1] == "from B2");
REQUIRE(chatA.getReceivedMessages()[2].size() == bigMessage.size());
CHECK(chatA.getReceivedMessages()[0] == "from B1");
CHECK(chatA.getReceivedMessages()[1] == "from B2");
CHECK(chatA.getReceivedMessages()[2].size() == bigMessage.size());
// Give us 1000ms for the server to notice that clients went away
ix::msleep(1000);
REQUIRE(server.getClients().size() == 0);
CHECK(server.getClients().size() == 0);
ix::reportWebSocketTraffic();
}

View File

@ -171,10 +171,9 @@ namespace
server.setOnClientMessageCallback(
[&receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite](
std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& /*webSocket*/,
const ix::WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
auto remoteIp = connectionState->getRemoteIp();
if (msg->type == ix::WebSocketMessageType::Open)
{
TLogger() << "New server connection";

View File

@ -35,11 +35,9 @@ namespace ix
server.setOnClientMessageCallback(
[&server, &connectionId](std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const ix::WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
auto remoteIp = connectionState->getRemoteIp();
if (msg->type == ix::WebSocketMessageType::Open)
{

View File

@ -18,10 +18,9 @@ bool startServer(ix::WebSocketServer& server, std::string& subProtocols)
{
server.setOnClientMessageCallback(
[&server, &subProtocols](std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const ix::WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
auto remoteIp = connectionState->getRemoteIp();
if (msg->type == ix::WebSocketMessageType::Open)
{
TLogger() << "New connection";

View File

@ -0,0 +1,6 @@
```
export GEM_HOME=$HOME/local/gems
bundle install faye-websocket
```
https://stackoverflow.com/questions/486995/ruby-equivalent-of-virtualenv

View File

@ -0,0 +1,59 @@
#
# $ ruby --version
# ruby 2.6.3p62 (2019-04-16 revision 67580) [universal.x86_64-darwin19]
#
# Install a gem locally by setting GEM_HOME
# https://stackoverflow.com/questions/486995/ruby-equivalent-of-virtualenv
# export GEM_HOME=$HOME/local/gems
# bundle install faye-websocket
#
# In a different terminal, start a push server:
# $ ws push_server -q
#
# $ ruby devnull_client.rb
# [:open]
# Connected to server
# messages received per second: 115926
# messages received per second: 119156
# messages received per second: 119156
# messages received per second: 119157
# messages received per second: 119156
# messages received per second: 119156
# messages received per second: 119157
# messages received per second: 119156
# messages received per second: 119156
# messages received per second: 119157
# messages received per second: 119156
# messages received per second: 119157
# messages received per second: 119156
# ^C[:close, 1006, ""]
#
require 'faye/websocket'
require 'eventmachine'
EM.run {
ws = Faye::WebSocket::Client.new('ws://127.0.0.1:8008')
counter = 0
EM.add_periodic_timer(1) do
print "messages received per second: #{counter}\n"
counter = 0 # reset counter
end
ws.on :open do |event|
p [:open]
print "Connected to server\n"
end
ws.on :message do |event|
# Uncomment the next line to validate that we receive something correct
# p [:message, event.data]
counter += 1
end
ws.on :close do |event|
p [:close, event.code, event.reason]
ws = nil
end
}

262
ws/ws.cpp
View File

@ -18,6 +18,7 @@
#include <fstream>
#include <iostream>
#include <ixbots/IXCobraMetricsToRedisBot.h>
#include <ixbots/IXCobraToCobraBot.h>
#include <ixbots/IXCobraToPythonBot.h>
#include <ixbots/IXCobraToSentryBot.h>
#include <ixbots/IXCobraToStatsdBot.h>
@ -120,6 +121,12 @@ namespace
return str.substr(0, n) + "...";
}
}
bool fileExists(const std::string& fileName)
{
std::ifstream infile(fileName);
return infile.good();
}
} // namespace
namespace ix
@ -406,10 +413,9 @@ namespace ix
server.setOnClientMessageCallback(
[&server](std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
auto remoteIp = connectionState->getRemoteIp();
if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("New connection");
@ -1097,21 +1103,26 @@ namespace ix
return 0;
}
int ws_echo_client(const std::string& url,
bool disablePerMessageDeflate,
bool binaryMode,
const ix::SocketTLSOptions& tlsOptions,
const std::string& subprotocol,
int pingIntervalSecs,
const std::string& sendMsg,
bool noSend)
int ws_autoroute(const std::string& url,
bool disablePerMessageDeflate,
const ix::SocketTLSOptions& tlsOptions,
const std::string& subprotocol,
int pingIntervalSecs,
int msgCount)
{
Bench bench("ws_autoroute full test");
// Our websocket object
ix::WebSocket webSocket;
webSocket.setUrl(url);
std::string fullUrl(url);
fullUrl += "/";
fullUrl += std::to_string(msgCount);
webSocket.setUrl(fullUrl);
webSocket.setTLSOptions(tlsOptions);
webSocket.setPingInterval(pingIntervalSecs);
webSocket.disableAutomaticReconnection();
if (disablePerMessageDeflate)
{
@ -1123,43 +1134,56 @@ namespace ix
webSocket.addSubProtocol(subprotocol);
}
std::atomic<uint64_t> receivedCount(0);
uint64_t receivedCountTotal(0);
uint64_t receivedCountPerSecs(0);
std::atomic<uint64_t> receivedCountPerSecs(0);
std::atomic<uint64_t> target(msgCount);
std::mutex conditionVariableMutex;
std::condition_variable condition;
// Setup a callback to be fired (in a background thread, watch out for race conditions !)
// when a message or an event (open, close, error) is received
webSocket.setOnMessageCallback([&webSocket, &receivedCount, &sendMsg, noSend, binaryMode](
const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Message)
{
if (!noSend)
std::atomic<bool> stop(false);
// Setup a callback to be fired
// when a message or an event (open, close, ping, pong, error) is received
webSocket.setOnMessageCallback(
[&webSocket, &receivedCountPerSecs, &target, &stop, &condition, &bench](
const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Message)
{
webSocket.send(msg->str, msg->binary);
receivedCountPerSecs++;
target -= 1;
if (target == 0)
{
stop = true;
condition.notify_one();
bench.report();
}
}
receivedCount++;
}
else if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("ws_echo_client: connected");
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
else if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("{}: {}", it.first, it.second);
bench.reset();
spdlog::info("ws_autoroute: connected");
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
}
else if (msg->type == ix::WebSocketMessageType::Pong)
{
spdlog::info("Received pong {}", msg->str);
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
spdlog::info("ws_autoroute: connection closed");
}
});
webSocket.send(sendMsg, binaryMode);
}
else if (msg->type == ix::WebSocketMessageType::Pong)
{
spdlog::info("Received pong {}", msg->str);
}
});
auto timer = [&receivedCount, &receivedCountTotal, &receivedCountPerSecs] {
auto timer = [&receivedCountPerSecs, &stop] {
setThreadName("Timer");
while (true)
while (!stop)
{
//
// We cannot write to sentCount and receivedCount
@ -1167,13 +1191,11 @@ namespace ix
// our own counters
//
std::stringstream ss;
ss << "messages received: " << receivedCountPerSecs << " per second "
<< receivedCountTotal << " total";
ss << "messages received per second: " << receivedCountPerSecs;
CoreLogger::info(ss.str());
receivedCountPerSecs = receivedCount - receivedCountTotal;
receivedCountTotal += receivedCountPerSecs;
receivedCountPerSecs = 0;
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
@ -1186,17 +1208,12 @@ namespace ix
std::cout << "Connecting to " << url << "..." << std::endl;
webSocket.start();
// Send a message to the server (default to TEXT mode)
webSocket.send("hello world");
// Wait for all the messages to be received
std::unique_lock<std::mutex> lock(conditionVariableMutex);
condition.wait(lock);
while (true)
{
std::string text;
std::cout << "> " << std::flush;
std::getline(std::cin, text);
webSocket.send(text);
}
t1.join();
webSocket.stop();
return 0;
}
@ -1234,10 +1251,9 @@ namespace ix
server.setOnClientMessageCallback(
[greetings](std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
auto remoteIp = connectionState->getRemoteIp();
if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("New connection");
@ -1635,7 +1651,6 @@ namespace ix
}
int ws_push_server(int port,
bool greetings,
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions,
bool ipv6,
@ -1666,12 +1681,14 @@ namespace ix
server.disablePong();
}
// push one million messages
std::atomic<bool> stop(false);
server.setOnClientMessageCallback(
[greetings, &sendMsg](std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
[&sendMsg, &stop](std::shared_ptr<ConnectionState> connectionState,
WebSocket& webSocket,
const WebSocketMessagePtr& msg) {
auto remoteIp = connectionState->getRemoteIp();
if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("New connection");
@ -1684,15 +1701,31 @@ namespace ix
spdlog::info("{}: {}", it.first, it.second);
}
if (greetings)
{
webSocket.sendText("Welcome !");
}
// Parse the msg count from the uri.
int msgCount = -1;
std::stringstream ss;
auto uriSize = msg->openInfo.uri.size();
ss << msg->openInfo.uri.substr(1, uriSize - 1);
ss >> msgCount;
bool binary = false;
while (true)
if (msgCount == -1)
{
webSocket.send(sendMsg, binary);
spdlog::info("Error parsing message count, closing connection");
webSocket.close();
}
else
{
bool binary = false;
for (int i = 0; i < msgCount; ++i)
{
auto sendInfo = webSocket.send(sendMsg, binary);
if (!sendInfo.success)
{
spdlog::info("Error sending message, closing connection");
webSocket.close();
break;
}
}
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
@ -1701,6 +1734,7 @@ namespace ix
connectionState->getId(),
msg->closeInfo.code,
msg->closeInfo.reason);
stop = true;
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
@ -1724,7 +1758,14 @@ namespace ix
}
server.start();
server.wait();
while (!stop)
{
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
server.stop();
return 0;
}
@ -2601,10 +2642,9 @@ namespace ix
server.setOnClientMessageCallback(
[&server](std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
auto remoteIp = connectionState->getRemoteIp();
if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("ws_transfer: New connection");
@ -2716,6 +2756,7 @@ namespace ix
int main(int argc, char** argv)
{
ix::setThreadName("ws main thread");
ix::initNetSystem();
ix::CoreLogger::LogFunc logFunc = [](const char* msg, ix::LogLevel level) {
@ -2802,8 +2843,10 @@ int main(int argc, char** argv)
std::string project;
std::string key;
std::string logfile;
std::string scriptPath;
std::string moduleName;
std::string republishChannel;
std::string publisherRolename;
std::string publisherRolesecret;
std::string sendMsg("hello world");
ix::SocketTLSOptions tlsOptions;
ix::CobraConfig cobraConfig;
@ -2827,7 +2870,6 @@ int main(int argc, char** argv)
bool version = false;
bool verifyNone = false;
bool disablePong = false;
bool noSend = false;
int port = 8008;
int redisPort = 6379;
int statsdPort = 8125;
@ -2836,6 +2878,7 @@ int main(int argc, char** argv)
int maxRedirects = 5;
int delayMs = -1;
int count = 1;
int msgCount = 1000 * 1000;
uint32_t maxWaitBetweenReconnectionRetries;
int pingIntervalSecs = 30;
@ -2929,17 +2972,14 @@ int main(int argc, char** argv)
addGenericOptions(connectApp);
addTLSOptions(connectApp);
CLI::App* echoClientApp =
app.add_subcommand("echo_client", "Echo messages sent by a remote server");
CLI::App* echoClientApp = app.add_subcommand("autoroute", "Test websocket client performance");
echoClientApp->fallthrough();
echoClientApp->add_option("url", url, "Connection url")->required();
echoClientApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate");
echoClientApp->add_flag("-b", binaryMode, "Send in binary mode");
echoClientApp->add_option(
"--ping_interval", pingIntervalSecs, "Interval between sending pings");
echoClientApp->add_option("--subprotocol", subprotocol, "Subprotocol");
echoClientApp->add_option("--send_msg", sendMsg, "Send message");
echoClientApp->add_flag("-m", noSend, "Do not send messages, only receive messages");
echoClientApp->add_option("--msg_count", msgCount, "Total message count to be sent");
addTLSOptions(echoClientApp);
CLI::App* chatApp = app.add_subcommand("chat", "Group chat");
@ -2964,7 +3004,6 @@ int main(int argc, char** argv)
pushServerApp->add_option("--port", port, "Port");
pushServerApp->add_option("--host", hostname, "Hostname");
pushServerApp->add_flag("-q", quiet, "Quiet / only display warnings and errors");
pushServerApp->add_flag("-g", greetings, "Greet");
pushServerApp->add_flag("-6", ipv6, "IpV6");
pushServerApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate");
pushServerApp->add_flag("-p", disablePong, "Disable sending PONG in response to PING");
@ -3068,13 +3107,23 @@ int main(int argc, char** argv)
addTLSOptions(cobra2statsd);
addCobraBotConfig(cobra2statsd);
CLI::App* cobra2cobra = app.add_subcommand("cobra_to_cobra", "Cobra to Cobra");
cobra2cobra->fallthrough();
cobra2cobra->add_option("--republish", republishChannel, "Republish channel");
cobra2cobra->add_option("--publisher_rolename", publisherRolename, "Publisher Role name")
->required();
cobra2cobra->add_option("--publisher_rolesecret", publisherRolesecret, "Publisher Role secret")
->required();
cobra2cobra->add_flag("-q", quiet, "Quiet");
addTLSOptions(cobra2cobra);
addCobraBotConfig(cobra2cobra);
CLI::App* cobra2python = app.add_subcommand("cobra_to_python", "Cobra to python");
cobra2python->fallthrough();
cobra2python->add_option("--host", hostname, "Statsd host");
cobra2python->add_option("--port", statsdPort, "Statsd port");
cobra2python->add_option("--prefix", prefix, "Statsd prefix");
cobra2python->add_option("--script", scriptPath, "Python script path")
->check(CLI::ExistingPath);
cobra2python->add_option("--module", moduleName, "Python module");
cobra2python->add_option("--pidfile", pidfile, "Pid file");
addTLSOptions(cobra2python);
addCobraBotConfig(cobra2python);
@ -3177,11 +3226,27 @@ int main(int argc, char** argv)
if (tlsOptions.isUsingSystemDefaults())
{
#ifdef __APPLE__
#if defined(__APPLE__)
#if defined(IXWEBSOCKET_USE_MBED_TLS) || defined(IXWEBSOCKET_USE_OPEN_SSL)
// We could try to load some system certs as well, but this is easy enough
tlsOptions.caFile = "/etc/ssl/cert.pem";
#endif
#elif defined(__linux__)
#if defined(IXWEBSOCKET_USE_MBED_TLS)
std::vector<std::string> caFiles = {
"/etc/ssl/certs/ca-bundle.crt", // CentOS
"/etc/ssl/certs/ca-certificates.crt", // Alpine
};
for (auto&& caFile : caFiles)
{
if (fileExists(caFile))
{
tlsOptions.caFile = caFile;
break;
}
}
#endif
#endif
}
@ -3229,16 +3294,10 @@ int main(int argc, char** argv)
subprotocol,
pingIntervalSecs);
}
else if (app.got_subcommand("echo_client"))
else if (app.got_subcommand("autoroute"))
{
ret = ix::ws_echo_client(url,
disablePerMessageDeflate,
binaryMode,
tlsOptions,
subprotocol,
pingIntervalSecs,
sendMsg,
noSend);
ret = ix::ws_autoroute(
url, disablePerMessageDeflate, tlsOptions, subprotocol, pingIntervalSecs, msgCount);
}
else if (app.got_subcommand("echo_server"))
{
@ -3247,14 +3306,8 @@ int main(int argc, char** argv)
}
else if (app.got_subcommand("push_server"))
{
ret = ix::ws_push_server(port,
greetings,
hostname,
tlsOptions,
ipv6,
disablePerMessageDeflate,
disablePong,
sendMsg);
ret = ix::ws_push_server(
port, hostname, tlsOptions, ipv6, disablePerMessageDeflate, disablePong, sendMsg);
}
else if (app.got_subcommand("transfer"))
{
@ -3361,7 +3414,7 @@ int main(int argc, char** argv)
}
else
{
ret = (int) ix::cobra_to_python_bot(cobraBotConfig, statsdClient, scriptPath);
ret = (int) ix::cobra_to_python_bot(cobraBotConfig, statsdClient, moduleName);
}
}
else if (app.got_subcommand("cobra_to_sentry"))
@ -3384,6 +3437,11 @@ int main(int argc, char** argv)
ret = (int) ix::cobra_metrics_to_redis_bot(cobraBotConfig, redisClient, verbose);
}
}
else if (app.got_subcommand("cobra_to_cobra"))
{
ret = (int) ix::cobra_to_cobra_bot(
cobraBotConfig, republishChannel, publisherRolename, publisherRolesecret);
}
else if (app.got_subcommand("snake"))
{
ret = ix::ws_snake_main(port,