Compare commits
22 Commits
Author | SHA1 | Date | |
---|---|---|---|
128bc0afa9 | |||
b04e5c5529 | |||
1e8c421d66 | |||
72d6651ded | |||
a4e5d1b47a | |||
9f51a54a83 | |||
b74f7319c6 | |||
0ad66a27f2 | |||
a40003e85a | |||
5534a7fdf9 | |||
efb245278d | |||
5896d3740f | |||
73b9c0b89b | |||
629c155044 | |||
08640d877f | |||
ed5c63144e | |||
ee69aed2b0 | |||
fcb92f862d | |||
e8e98e667d | |||
e1502017ce | |||
72472f2899 | |||
42f71364ca |
66
.github/workflows/docker.yml
vendored
Normal file
66
.github/workflows/docker.yml
vendored
Normal file
@ -0,0 +1,66 @@
|
||||
name: docker
|
||||
|
||||
# When its time to do a release do a build for amd64
|
||||
# and push all of them to Docker Hub.
|
||||
# Only trigger on semver shaped tags.
|
||||
on:
|
||||
push:
|
||||
tags:
|
||||
- "v*.*.*"
|
||||
|
||||
jobs:
|
||||
login:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Prepare
|
||||
id: prep
|
||||
run: |
|
||||
DOCKER_IMAGE=machinezone/ws
|
||||
VERSION=edge
|
||||
if [[ $GITHUB_REF == refs/tags/* ]]; then
|
||||
VERSION=${GITHUB_REF#refs/tags/v}
|
||||
fi
|
||||
if [ "${{ github.event_name }}" = "schedule" ]; then
|
||||
VERSION=nightly
|
||||
fi
|
||||
TAGS="${DOCKER_IMAGE}:${VERSION}"
|
||||
if [[ $VERSION =~ ^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$ ]]; then
|
||||
TAGS="$TAGS,${DOCKER_IMAGE}:latest"
|
||||
fi
|
||||
echo ::set-output name=tags::${TAGS}
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
id: buildx
|
||||
uses: docker/setup-buildx-action@master
|
||||
|
||||
- name: Cache Docker layers
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: /tmp/.buildx-cache
|
||||
key: ${{ runner.os }}-buildx-${{ github.sha }}
|
||||
restore-keys: |
|
||||
${{ runner.os }}-buildx-
|
||||
|
||||
- name: Login to GitHub Package Registry
|
||||
uses: docker/login-action@v1
|
||||
with:
|
||||
registry: ghcr.io
|
||||
username: ${{ github.repository_owner }}
|
||||
password: ${{ secrets.GHCR_TOKEN }}
|
||||
|
||||
- name: Build and push
|
||||
id: docker_build
|
||||
uses: docker/build-push-action@v2-build-push
|
||||
with:
|
||||
builder: ${{ steps.buildx.outputs.name }}
|
||||
context: .
|
||||
file: ./Dockerfile
|
||||
target: prod
|
||||
platforms: linux/amd64
|
||||
push: ${{ github.event_name != 'pull_request' }}
|
||||
tags: ${{ steps.prep.outputs.tags }}
|
||||
cache-from: type=local,src=/tmp/.buildx-cache
|
||||
cache-to: type=local,dest=/tmp/.buildx-cache
|
@ -62,7 +62,6 @@ set( IXWEBSOCKET_SOURCES
|
||||
set( IXWEBSOCKET_HEADERS
|
||||
ixwebsocket/IXBench.h
|
||||
ixwebsocket/IXCancellationRequest.h
|
||||
ixwebsocket/IXConnectionInfo.h
|
||||
ixwebsocket/IXConnectionState.h
|
||||
ixwebsocket/IXDNSLookup.h
|
||||
ixwebsocket/IXExponentialBackoff.h
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
IXWebSocket is a C++ library for WebSocket client and server development. It has minimal dependencies (no boost), is very simple to use and support everything you'll likely need for websocket dev (SSL, deflate compression, compiles on most platforms, etc...). HTTP client and server code is also available, but it hasn't received as much testing.
|
||||
|
||||
It is been used on big mobile video game titles sending and receiving tons of messages since 2017 (iOS and Android). It was tested on macOS, iOS, Linux, Android, Windows and FreeBSD. Two important design goals are simplicity and correctness.
|
||||
It is been used on big mobile video game titles sending and receiving tons of messages since 2017 (iOS and Android). It was tested on macOS, iOS, Linux, Android, Windows and FreeBSD. Note that the MinGW compiler is not supported at this point. Two important design goals are simplicity and correctness.
|
||||
|
||||
```cpp
|
||||
/*
|
||||
|
@ -1,67 +1,11 @@
|
||||
version: "3"
|
||||
version: "3.3"
|
||||
services:
|
||||
# snake:
|
||||
# image: bsergean/ws:build
|
||||
# entrypoint: ws snake --port 8767 --host 0.0.0.0 --redis_hosts redis1
|
||||
# ports:
|
||||
# - "8767:8767"
|
||||
# networks:
|
||||
# - ws-net
|
||||
# depends_on:
|
||||
# - redis1
|
||||
push:
|
||||
entrypoint: ws push_server --host 0.0.0.0
|
||||
image: ${DOCKER_REPO}/ws:build
|
||||
|
||||
# proxy:
|
||||
# image: bsergean/ws:build
|
||||
# entrypoint: strace ws proxy_server --remote_host 'wss://cobra.addsrv.com' --host 0.0.0.0 --port 8765 -v
|
||||
# ports:
|
||||
# - "8765:8765"
|
||||
# networks:
|
||||
# - ws-net
|
||||
|
||||
#pyproxy:
|
||||
# image: bsergean/ws_proxy:build
|
||||
# entrypoint: /usr/bin/ws_proxy.py --remote_url 'wss://cobra.addsrv.com' --host 0.0.0.0 --port 8765
|
||||
# ports:
|
||||
# - "8765:8765"
|
||||
# networks:
|
||||
# - ws-net
|
||||
|
||||
# # ws:
|
||||
# # security_opt:
|
||||
# # - seccomp:unconfined
|
||||
# # cap_add:
|
||||
# # - SYS_PTRACE
|
||||
# # stdin_open: true
|
||||
# # tty: true
|
||||
# # image: bsergean/ws:build
|
||||
# # entrypoint: sh
|
||||
# # networks:
|
||||
# # - ws-net
|
||||
# # depends_on:
|
||||
# # - redis1
|
||||
# #
|
||||
# # redis1:
|
||||
# # image: redis:alpine
|
||||
# # networks:
|
||||
# # - ws-net
|
||||
# #
|
||||
# # statsd:
|
||||
# # image: jaconel/statsd
|
||||
# # ports:
|
||||
# # - "8125:8125"
|
||||
# # environment:
|
||||
# # - STATSD_DUMP_MSG=true
|
||||
# # - GRAPHITE_HOST=127.0.0.1
|
||||
# # networks:
|
||||
# # - ws-net
|
||||
|
||||
compile:
|
||||
image: alpine
|
||||
entrypoint: sh
|
||||
stdin_open: true
|
||||
tty: true
|
||||
volumes:
|
||||
- /Users/bsergeant/src/foss:/home/bsergean/src/foss
|
||||
|
||||
networks:
|
||||
ws-net:
|
||||
autoroute:
|
||||
entrypoint: ws autoroute ws://push:8008
|
||||
image: ${DOCKER_REPO}/ws:build
|
||||
depends_on:
|
||||
- push
|
||||
|
@ -20,7 +20,7 @@ RUN make ws_mbedtls_install && \
|
||||
|
||||
FROM alpine:3.12 as runtime
|
||||
|
||||
RUN apk add --no-cache libstdc++ mbedtls ca-certificates python3 && \
|
||||
RUN apk add --no-cache libstdc++ mbedtls ca-certificates python3 strace && \
|
||||
addgroup -S app && \
|
||||
adduser -S -G app app
|
||||
|
||||
|
@ -2,6 +2,50 @@
|
||||
|
||||
All changes to this project will be documented in this file.
|
||||
|
||||
## [10.4.0] - 2020-09-12
|
||||
|
||||
(http server) read body request when the Content-Length is specified + set timeout to read the request to 30 seconds max by default, and make it configurable as a constructor parameter
|
||||
|
||||
## [10.3.5] - 2020-09-09
|
||||
|
||||
(ws) autoroute command exit on its own once all messages have been received
|
||||
|
||||
## [10.3.4] - 2020-09-04
|
||||
|
||||
(docker) ws docker file installs strace
|
||||
|
||||
## [10.3.3] - 2020-09-02
|
||||
|
||||
(ws) echo_client command renamed to autoroute. Command exit once the server close the connection. push_server commands exit once N messages have been sent.
|
||||
|
||||
## [10.3.2] - 2020-08-31
|
||||
|
||||
(ws + cobra bots) add a cobra_to_cobra ws subcommand to subscribe to a channel and republish received events to a different channel
|
||||
|
||||
## [10.3.1] - 2020-08-28
|
||||
|
||||
(socket servers) merge the ConnectionInfo class with the ConnectionState one, which simplify all the server apis
|
||||
|
||||
## [10.3.0] - 2020-08-26
|
||||
|
||||
(ws) set the main thread name, to help with debugging in XCode, gdb, lldb etc...
|
||||
|
||||
## [10.2.9] - 2020-08-19
|
||||
|
||||
(ws) cobra to python bot / take a module python name as argument foo.bar.baz instead of a path foo/bar/baz.py
|
||||
|
||||
## [10.2.8] - 2020-08-19
|
||||
|
||||
(ws) on Linux with mbedtls, when the system ca certs are specified (the default) pick up sensible OS supplied paths (tested with CentOS and Alpine)
|
||||
|
||||
## [10.2.7] - 2020-08-18
|
||||
|
||||
(ws push_server) on the server side, stop sending and close the connection when the remote end has disconnected
|
||||
|
||||
## [10.2.6] - 2020-08-17
|
||||
|
||||
(ixwebsocket) replace std::unique_ptr<unsigned char[]> with std::array for some fixed arrays (which are in C++11)
|
||||
|
||||
## [10.2.5] - 2020-08-15
|
||||
|
||||
(ws) merge all ws_*.cpp files into a single one to speedup compilation
|
||||
|
@ -280,10 +280,9 @@ ix::WebSocketServer server(port);
|
||||
|
||||
server.setOnConnectionCallback(
|
||||
[&server](std::weak_ptr<WebSocket> webSocket,
|
||||
std::shared_ptr<ConnectionState> connectionState,
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo)
|
||||
std::shared_ptr<ConnectionState> connectionState)
|
||||
{
|
||||
std::cout << "Remote ip: " << connectionInfo->remoteIp << std::endl;
|
||||
std::cout << "Remote ip: " << connectionState->remoteIp << std::endl;
|
||||
|
||||
auto ws = webSocket.lock();
|
||||
if (ws)
|
||||
@ -359,13 +358,12 @@ The webSocket reference is guaranteed to be always valid ; by design the callbac
|
||||
ix::WebSocketServer server(port);
|
||||
|
||||
server.setOnClientMessageCallback(std::shared_ptr<ConnectionState> connectionState,
|
||||
ConnectionInfo& connectionInfo,
|
||||
WebSocket& webSocket,
|
||||
const WebSocketMessagePtr& msg)
|
||||
{
|
||||
// The ConnectionInfo object contains information about the connection,
|
||||
// The ConnectionState object contains information about the connection,
|
||||
// at this point only the client ip address and the port.
|
||||
std::cout << "Remote ip: " << connectionInfo.remoteIp << std::endl;
|
||||
std::cout << "Remote ip: " << connectionState->getRemoteIp();
|
||||
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
@ -519,12 +517,11 @@ If you want to handle how requests are processed, implement the setOnConnectionC
|
||||
```cpp
|
||||
setOnConnectionCallback(
|
||||
[this](HttpRequestPtr request,
|
||||
std::shared_ptr<ConnectionState> /*connectionState*/,
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr
|
||||
std::shared_ptr<ConnectionState> connectionState) -> HttpResponsePtr
|
||||
{
|
||||
// Build a string for the response
|
||||
std::stringstream ss;
|
||||
ss << connectionInfo->remoteIp
|
||||
ss << connectionState->getRemoteIp();
|
||||
<< " "
|
||||
<< request->method
|
||||
<< " "
|
||||
|
@ -5,6 +5,7 @@
|
||||
|
||||
set (IXBOTS_SOURCES
|
||||
ixbots/IXCobraBot.cpp
|
||||
ixbots/IXCobraToCobraBot.cpp
|
||||
ixbots/IXCobraToSentryBot.cpp
|
||||
ixbots/IXCobraToStatsdBot.cpp
|
||||
ixbots/IXCobraToStdoutBot.cpp
|
||||
@ -16,6 +17,7 @@ set (IXBOTS_SOURCES
|
||||
set (IXBOTS_HEADERS
|
||||
ixbots/IXCobraBot.h
|
||||
ixbots/IXCobraBotConfig.h
|
||||
ixbots/IXCobraToCobraBot.h
|
||||
ixbots/IXCobraToSentryBot.h
|
||||
ixbots/IXCobraToStatsdBot.h
|
||||
ixbots/IXCobraToStdoutBot.h
|
||||
|
43
ixbots/ixbots/IXCobraToCobraBot.cpp
Normal file
43
ixbots/ixbots/IXCobraToCobraBot.cpp
Normal file
@ -0,0 +1,43 @@
|
||||
/*
|
||||
* IXCobraToCobraBot.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
|
||||
#include "IXCobraToCobraBot.h"
|
||||
|
||||
#include "IXCobraBot.h"
|
||||
#include <ixcobra/IXCobraMetricsPublisher.h>
|
||||
#include <sstream>
|
||||
|
||||
namespace ix
|
||||
{
|
||||
int64_t cobra_to_cobra_bot(const ix::CobraBotConfig& cobraBotConfig,
|
||||
const std::string& republishChannel,
|
||||
const std::string& publisherRolename,
|
||||
const std::string& publisherRolesecret)
|
||||
{
|
||||
CobraBot bot;
|
||||
|
||||
CobraMetricsPublisher cobraMetricsPublisher;
|
||||
CobraConfig cobraPublisherConfig = cobraBotConfig.cobraConfig;
|
||||
cobraPublisherConfig.rolename = publisherRolename;
|
||||
cobraPublisherConfig.rolesecret = publisherRolesecret;
|
||||
cobraMetricsPublisher.configure(cobraPublisherConfig, republishChannel);
|
||||
|
||||
bot.setOnBotMessageCallback(
|
||||
[&republishChannel, &cobraMetricsPublisher](const Json::Value& msg,
|
||||
const std::string& /*position*/,
|
||||
std::atomic<bool>& /*throttled*/,
|
||||
std::atomic<bool>& /*fatalCobraError*/,
|
||||
std::atomic<uint64_t>& sentCount) -> void {
|
||||
Json::Value msgWithNoId(msg);
|
||||
msgWithNoId.removeMember("id");
|
||||
|
||||
cobraMetricsPublisher.push(republishChannel, msg);
|
||||
sentCount++;
|
||||
});
|
||||
|
||||
return bot.run(cobraBotConfig);
|
||||
}
|
||||
} // namespace ix
|
20
ixbots/ixbots/IXCobraToCobraBot.h
Normal file
20
ixbots/ixbots/IXCobraToCobraBot.h
Normal file
@ -0,0 +1,20 @@
|
||||
/*
|
||||
* IXCobraToCobraBot.h
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
#include <ixbots/IXStatsdClient.h>
|
||||
#include "IXCobraBotConfig.h"
|
||||
#include <stddef.h>
|
||||
#include <string>
|
||||
|
||||
namespace ix
|
||||
{
|
||||
int64_t cobra_to_cobra_bot(const ix::CobraBotConfig& config,
|
||||
const std::string& republishChannel,
|
||||
const std::string& publisherRolename,
|
||||
const std::string& publisherRolesecret);
|
||||
} // namespace ix
|
@ -102,7 +102,7 @@ namespace ix
|
||||
{
|
||||
int64_t cobra_to_python_bot(const ix::CobraBotConfig& config,
|
||||
StatsdClient& statsdClient,
|
||||
const std::string& scriptPath)
|
||||
const std::string& moduleName)
|
||||
{
|
||||
#ifndef IXBOTS_USE_PYTHON
|
||||
CoreLogger::error("Command is disabled. "
|
||||
@ -113,10 +113,7 @@ namespace ix
|
||||
Py_InitializeEx(0); // 0 arg so that we do not install signal handlers
|
||||
// which prevent us from using Ctrl-C
|
||||
|
||||
size_t lastIndex = scriptPath.find_last_of(".");
|
||||
std::string modulePath = scriptPath.substr(0, lastIndex);
|
||||
|
||||
PyObject* pyModuleName = PyUnicode_DecodeFSDefault(modulePath.c_str());
|
||||
PyObject* pyModuleName = PyUnicode_DecodeFSDefault(moduleName.c_str());
|
||||
|
||||
if (pyModuleName == nullptr)
|
||||
{
|
||||
|
@ -15,5 +15,5 @@ namespace ix
|
||||
{
|
||||
int64_t cobra_to_python_bot(const ix::CobraBotConfig& config,
|
||||
StatsdClient& statsdClient,
|
||||
const std::string& scriptPath);
|
||||
const std::string& moduleName);
|
||||
} // namespace ix
|
||||
|
@ -24,6 +24,7 @@ namespace ix
|
||||
{
|
||||
_cobra_connection.setEventCallback([](const CobraEventPtr& event) {
|
||||
std::stringstream ss;
|
||||
ix::LogLevel logLevel = LogLevel::Info;
|
||||
|
||||
if (event->type == ix::CobraEventType::Open)
|
||||
{
|
||||
@ -41,6 +42,7 @@ namespace ix
|
||||
else if (event->type == ix::CobraEventType::Error)
|
||||
{
|
||||
ss << "Error: " << event->errMsg;
|
||||
logLevel = ix::LogLevel::Error;
|
||||
}
|
||||
else if (event->type == ix::CobraEventType::Closed)
|
||||
{
|
||||
@ -57,6 +59,7 @@ namespace ix
|
||||
else if (event->type == ix::CobraEventType::Published)
|
||||
{
|
||||
ss << "Published message " << event->msgId << " acked";
|
||||
logLevel = ix::LogLevel::Debug;
|
||||
}
|
||||
else if (event->type == ix::CobraEventType::Pong)
|
||||
{
|
||||
@ -65,17 +68,20 @@ namespace ix
|
||||
else if (event->type == ix::CobraEventType::HandshakeError)
|
||||
{
|
||||
ss << "Handshake error: " << event->errMsg;
|
||||
logLevel = ix::LogLevel::Error;
|
||||
}
|
||||
else if (event->type == ix::CobraEventType::AuthenticationError)
|
||||
{
|
||||
ss << "Authentication error: " << event->errMsg;
|
||||
logLevel = ix::LogLevel::Error;
|
||||
}
|
||||
else if (event->type == ix::CobraEventType::SubscriptionError)
|
||||
{
|
||||
ss << "Subscription error: " << event->errMsg;
|
||||
logLevel = ix::LogLevel::Error;
|
||||
}
|
||||
|
||||
CoreLogger::log(ss.str().c_str());
|
||||
CoreLogger::log(ss.str().c_str(), logLevel);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -45,10 +45,9 @@ namespace ix
|
||||
}
|
||||
|
||||
void RedisServer::handleConnection(std::unique_ptr<Socket> socket,
|
||||
std::shared_ptr<ConnectionState> connectionState,
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo)
|
||||
std::shared_ptr<ConnectionState> connectionState)
|
||||
{
|
||||
logInfo("New connection from remote ip " + connectionInfo->remoteIp);
|
||||
logInfo("New connection from remote ip " + connectionState->getRemoteIp());
|
||||
|
||||
_connectedClientsCount++;
|
||||
|
||||
|
@ -44,8 +44,7 @@ namespace ix
|
||||
|
||||
// Methods
|
||||
virtual void handleConnection(std::unique_ptr<Socket>,
|
||||
std::shared_ptr<ConnectionState> connectionState,
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo) final;
|
||||
std::shared_ptr<ConnectionState> connectionState) final;
|
||||
virtual size_t getConnectedClientsCount() final;
|
||||
|
||||
bool startsWith(const std::string& str, const std::string& start);
|
||||
|
@ -61,11 +61,10 @@ namespace snake
|
||||
|
||||
_server.setOnClientMessageCallback(
|
||||
[this](std::shared_ptr<ix::ConnectionState> connectionState,
|
||||
ix::ConnectionInfo& connectionInfo,
|
||||
ix::WebSocket& webSocket,
|
||||
const ix::WebSocketMessagePtr& msg) {
|
||||
auto state = std::dynamic_pointer_cast<SnakeConnectionState>(connectionState);
|
||||
auto remoteIp = connectionInfo.remoteIp;
|
||||
auto remoteIp = connectionState->getRemoteIp();
|
||||
|
||||
std::stringstream ss;
|
||||
ss << "[" << state->getId() << "] ";
|
||||
|
@ -12,10 +12,8 @@ namespace ix
|
||||
{
|
||||
Bench::Bench(const std::string& description)
|
||||
: _description(description)
|
||||
, _start(std::chrono::high_resolution_clock::now())
|
||||
, _reported(false)
|
||||
{
|
||||
;
|
||||
reset();
|
||||
}
|
||||
|
||||
Bench::~Bench()
|
||||
@ -26,6 +24,12 @@ namespace ix
|
||||
}
|
||||
}
|
||||
|
||||
void Bench::reset()
|
||||
{
|
||||
_start = std::chrono::high_resolution_clock::now();
|
||||
_reported = false;
|
||||
}
|
||||
|
||||
void Bench::report()
|
||||
{
|
||||
auto now = std::chrono::high_resolution_clock::now();
|
||||
|
@ -17,6 +17,7 @@ namespace ix
|
||||
Bench(const std::string& description);
|
||||
~Bench();
|
||||
|
||||
void reset();
|
||||
void report();
|
||||
uint64_t getDuration() const;
|
||||
|
||||
|
@ -1,25 +0,0 @@
|
||||
/*
|
||||
* IXConnectionInfo.h
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <string>
|
||||
|
||||
namespace ix
|
||||
{
|
||||
struct ConnectionInfo
|
||||
{
|
||||
std::string remoteIp;
|
||||
int remotePort;
|
||||
|
||||
ConnectionInfo(const std::string& r = std::string(), int p = 0)
|
||||
: remoteIp(r)
|
||||
, remotePort(p)
|
||||
{
|
||||
;
|
||||
}
|
||||
};
|
||||
} // namespace ix
|
@ -50,4 +50,24 @@ namespace ix
|
||||
_onSetTerminatedCallback();
|
||||
}
|
||||
}
|
||||
|
||||
const std::string& ConnectionState::getRemoteIp()
|
||||
{
|
||||
return _remoteIp;
|
||||
}
|
||||
|
||||
int ConnectionState::getRemotePort()
|
||||
{
|
||||
return _remotePort;
|
||||
}
|
||||
|
||||
void ConnectionState::setRemoteIp(const std::string& remoteIp)
|
||||
{
|
||||
_remoteIp = remoteIp;
|
||||
}
|
||||
|
||||
void ConnectionState::setRemotePort(int remotePort)
|
||||
{
|
||||
_remotePort = remotePort;
|
||||
}
|
||||
} // namespace ix
|
||||
|
@ -28,11 +28,17 @@ namespace ix
|
||||
void setTerminated();
|
||||
bool isTerminated() const;
|
||||
|
||||
const std::string& getRemoteIp();
|
||||
int getRemotePort();
|
||||
|
||||
static std::shared_ptr<ConnectionState> createConnectionState();
|
||||
|
||||
private:
|
||||
void setOnSetTerminatedCallback(const OnSetTerminatedCallback& callback);
|
||||
|
||||
void setRemoteIp(const std::string& remoteIp);
|
||||
void setRemotePort(int remotePort);
|
||||
|
||||
protected:
|
||||
std::atomic<bool> _terminated;
|
||||
std::string _id;
|
||||
@ -40,6 +46,9 @@ namespace ix
|
||||
|
||||
static std::atomic<uint64_t> _globalId;
|
||||
|
||||
std::string _remoteIp;
|
||||
int _remotePort;
|
||||
|
||||
friend class SocketServer;
|
||||
};
|
||||
} // namespace ix
|
||||
|
@ -93,14 +93,12 @@ namespace ix
|
||||
}
|
||||
|
||||
std::tuple<bool, std::string, HttpRequestPtr> Http::parseRequest(
|
||||
std::unique_ptr<Socket>& socket)
|
||||
std::unique_ptr<Socket>& socket, int timeoutSecs)
|
||||
{
|
||||
HttpRequestPtr httpRequest;
|
||||
|
||||
std::atomic<bool> requestInitCancellation(false);
|
||||
|
||||
int timeoutSecs = 5; // FIXME
|
||||
|
||||
auto isCancellationRequested =
|
||||
makeCancellationRequestWithTimeout(timeoutSecs, requestInitCancellation);
|
||||
|
||||
@ -130,7 +128,36 @@ namespace ix
|
||||
return std::make_tuple(false, "Error parsing HTTP headers", httpRequest);
|
||||
}
|
||||
|
||||
httpRequest = std::make_shared<HttpRequest>(uri, method, httpVersion, headers);
|
||||
std::string body;
|
||||
if (headers.find("Content-Length") != headers.end())
|
||||
{
|
||||
int contentLength = 0;
|
||||
try
|
||||
{
|
||||
contentLength = std::stoi(headers["Content-Length"]);
|
||||
}
|
||||
catch (std::exception)
|
||||
{
|
||||
return std::make_tuple(
|
||||
false, "Error parsing HTTP Header 'Content-Length'", httpRequest);
|
||||
}
|
||||
|
||||
if (contentLength < 0)
|
||||
{
|
||||
return std::make_tuple(
|
||||
false, "Error: 'Content-Length' should be a positive integer", httpRequest);
|
||||
}
|
||||
|
||||
auto res = socket->readBytes(contentLength, nullptr, isCancellationRequested);
|
||||
if (!res.first)
|
||||
{
|
||||
return std::make_tuple(
|
||||
false, std::string("Error reading request: ") + res.second, httpRequest);
|
||||
}
|
||||
body = res.second;
|
||||
}
|
||||
|
||||
httpRequest = std::make_shared<HttpRequest>(uri, method, httpVersion, body, headers);
|
||||
return std::make_tuple(true, "", httpRequest);
|
||||
}
|
||||
|
||||
|
@ -95,15 +95,18 @@ namespace ix
|
||||
std::string uri;
|
||||
std::string method;
|
||||
std::string version;
|
||||
std::string body;
|
||||
WebSocketHttpHeaders headers;
|
||||
|
||||
HttpRequest(const std::string& u,
|
||||
const std::string& m,
|
||||
const std::string& v,
|
||||
const std::string& b,
|
||||
const WebSocketHttpHeaders& h = WebSocketHttpHeaders())
|
||||
: uri(u)
|
||||
, method(m)
|
||||
, version(v)
|
||||
, body(b)
|
||||
, headers(h)
|
||||
{
|
||||
}
|
||||
@ -115,7 +118,7 @@ namespace ix
|
||||
{
|
||||
public:
|
||||
static std::tuple<bool, std::string, HttpRequestPtr> parseRequest(
|
||||
std::unique_ptr<Socket>& socket);
|
||||
std::unique_ptr<Socket>& socket, int timeoutSecs);
|
||||
static bool sendResponse(HttpResponsePtr response, std::unique_ptr<Socket>& socket);
|
||||
|
||||
static std::pair<std::string, int> parseStatusLine(const std::string& line);
|
||||
|
@ -10,6 +10,7 @@
|
||||
#include "IXUrlParser.h"
|
||||
#include "IXUserAgent.h"
|
||||
#include "IXWebSocketHttpHeaders.h"
|
||||
#include <array>
|
||||
#include <assert.h>
|
||||
#include <cstring>
|
||||
#include <iomanip>
|
||||
@ -700,14 +701,12 @@ namespace ix
|
||||
inflateState.next_in = (unsigned char*) (const_cast<char*>(in.data()));
|
||||
|
||||
const int kBufferSize = 1 << 14;
|
||||
|
||||
std::unique_ptr<unsigned char[]> compressBuffer =
|
||||
std::make_unique<unsigned char[]>(kBufferSize);
|
||||
std::array<unsigned char, kBufferSize> compressBuffer;
|
||||
|
||||
do
|
||||
{
|
||||
inflateState.avail_out = (uInt) kBufferSize;
|
||||
inflateState.next_out = compressBuffer.get();
|
||||
inflateState.next_out = &compressBuffer.front();
|
||||
|
||||
int ret = inflate(&inflateState, Z_SYNC_FLUSH);
|
||||
|
||||
@ -717,7 +716,7 @@ namespace ix
|
||||
return false;
|
||||
}
|
||||
|
||||
out.append(reinterpret_cast<char*>(compressBuffer.get()),
|
||||
out.append(reinterpret_cast<char*>(&compressBuffer.front()),
|
||||
kBufferSize - inflateState.avail_out);
|
||||
} while (inflateState.avail_out == 0);
|
||||
|
||||
|
@ -92,10 +92,17 @@ namespace
|
||||
|
||||
namespace ix
|
||||
{
|
||||
HttpServer::HttpServer(
|
||||
int port, const std::string& host, int backlog, size_t maxConnections, int addressFamily)
|
||||
const int HttpServer::kDefaultTimeoutSecs(30);
|
||||
|
||||
HttpServer::HttpServer(int port,
|
||||
const std::string& host,
|
||||
int backlog,
|
||||
size_t maxConnections,
|
||||
int addressFamily,
|
||||
int timeoutSecs)
|
||||
: SocketServer(port, host, backlog, maxConnections, addressFamily)
|
||||
, _connectedClientsCount(0)
|
||||
, _timeoutSecs(timeoutSecs)
|
||||
{
|
||||
setDefaultConnectionCallback();
|
||||
}
|
||||
@ -120,18 +127,16 @@ namespace ix
|
||||
}
|
||||
|
||||
void HttpServer::handleConnection(std::unique_ptr<Socket> socket,
|
||||
std::shared_ptr<ConnectionState> connectionState,
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo)
|
||||
std::shared_ptr<ConnectionState> connectionState)
|
||||
{
|
||||
_connectedClientsCount++;
|
||||
|
||||
auto ret = Http::parseRequest(socket);
|
||||
auto ret = Http::parseRequest(socket, _timeoutSecs);
|
||||
// FIXME: handle errors in parseRequest
|
||||
|
||||
if (std::get<0>(ret))
|
||||
{
|
||||
auto response =
|
||||
_onConnectionCallback(std::get<2>(ret), connectionState, std::move(connectionInfo));
|
||||
auto response = _onConnectionCallback(std::get<2>(ret), connectionState);
|
||||
if (!Http::sendResponse(response, socket))
|
||||
{
|
||||
logError("Cannot send response");
|
||||
@ -151,8 +156,7 @@ namespace ix
|
||||
{
|
||||
setOnConnectionCallback(
|
||||
[this](HttpRequestPtr request,
|
||||
std::shared_ptr<ConnectionState> /*connectionState*/,
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr {
|
||||
std::shared_ptr<ConnectionState> connectionState) -> HttpResponsePtr {
|
||||
std::string uri(request->uri);
|
||||
if (uri.empty() || uri == "/")
|
||||
{
|
||||
@ -184,8 +188,8 @@ namespace ix
|
||||
|
||||
// Log request
|
||||
std::stringstream ss;
|
||||
ss << connectionInfo->remoteIp << ":" << connectionInfo->remotePort << " "
|
||||
<< request->method << " " << request->headers["User-Agent"] << " "
|
||||
ss << connectionState->getRemoteIp() << ":" << connectionState->getRemotePort()
|
||||
<< " " << request->method << " " << request->headers["User-Agent"] << " "
|
||||
<< request->uri << " " << content.size();
|
||||
logInfo(ss.str());
|
||||
|
||||
@ -209,16 +213,16 @@ namespace ix
|
||||
// See https://developer.mozilla.org/en-US/docs/Web/HTTP/Redirections
|
||||
//
|
||||
setOnConnectionCallback(
|
||||
[this, redirectUrl](HttpRequestPtr request,
|
||||
std::shared_ptr<ConnectionState> /*connectionState*/,
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr {
|
||||
[this,
|
||||
redirectUrl](HttpRequestPtr request,
|
||||
std::shared_ptr<ConnectionState> connectionState) -> HttpResponsePtr {
|
||||
WebSocketHttpHeaders headers;
|
||||
headers["Server"] = userAgent();
|
||||
|
||||
// Log request
|
||||
std::stringstream ss;
|
||||
ss << connectionInfo->remoteIp << ":" << connectionInfo->remotePort << " "
|
||||
<< request->method << " " << request->headers["User-Agent"] << " "
|
||||
ss << connectionState->getRemoteIp() << ":" << connectionState->getRemotePort()
|
||||
<< " " << request->method << " " << request->headers["User-Agent"] << " "
|
||||
<< request->uri;
|
||||
logInfo(ss.str());
|
||||
|
||||
|
@ -23,15 +23,14 @@ namespace ix
|
||||
{
|
||||
public:
|
||||
using OnConnectionCallback =
|
||||
std::function<HttpResponsePtr(HttpRequestPtr,
|
||||
std::shared_ptr<ConnectionState>,
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo)>;
|
||||
std::function<HttpResponsePtr(HttpRequestPtr, std::shared_ptr<ConnectionState>)>;
|
||||
|
||||
HttpServer(int port = SocketServer::kDefaultPort,
|
||||
const std::string& host = SocketServer::kDefaultHost,
|
||||
int backlog = SocketServer::kDefaultTcpBacklog,
|
||||
size_t maxConnections = SocketServer::kDefaultMaxConnections,
|
||||
int addressFamily = SocketServer::kDefaultAddressFamily);
|
||||
int addressFamily = SocketServer::kDefaultAddressFamily,
|
||||
int timeoutSecs = HttpServer::kDefaultTimeoutSecs);
|
||||
virtual ~HttpServer();
|
||||
virtual void stop() final;
|
||||
|
||||
@ -44,10 +43,12 @@ namespace ix
|
||||
OnConnectionCallback _onConnectionCallback;
|
||||
std::atomic<int> _connectedClientsCount;
|
||||
|
||||
const static int kDefaultTimeoutSecs;
|
||||
int _timeoutSecs;
|
||||
|
||||
// Methods
|
||||
virtual void handleConnection(std::unique_ptr<Socket>,
|
||||
std::shared_ptr<ConnectionState> connectionState,
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo) final;
|
||||
std::shared_ptr<ConnectionState> connectionState) final;
|
||||
virtual size_t getConnectedClientsCount() final;
|
||||
|
||||
void setDefaultConnectionCallback();
|
||||
|
@ -332,12 +332,13 @@ namespace ix
|
||||
}
|
||||
|
||||
// Retrieve connection info, the ip address of the remote peer/client)
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo;
|
||||
std::string remoteIp;
|
||||
int remotePort;
|
||||
|
||||
if (_addressFamily == AF_INET)
|
||||
{
|
||||
char remoteIp[INET_ADDRSTRLEN];
|
||||
if (inet_ntop(AF_INET, &client.sin_addr, remoteIp, INET_ADDRSTRLEN) == nullptr)
|
||||
char remoteIp4[INET_ADDRSTRLEN];
|
||||
if (inet_ntop(AF_INET, &client.sin_addr, remoteIp4, INET_ADDRSTRLEN) == nullptr)
|
||||
{
|
||||
int err = Socket::getErrno();
|
||||
std::stringstream ss;
|
||||
@ -350,12 +351,13 @@ namespace ix
|
||||
continue;
|
||||
}
|
||||
|
||||
connectionInfo = std::make_unique<ConnectionInfo>(remoteIp, client.sin_port);
|
||||
remotePort = client.sin_port;
|
||||
remoteIp = remoteIp4;
|
||||
}
|
||||
else // AF_INET6
|
||||
{
|
||||
char remoteIp[INET6_ADDRSTRLEN];
|
||||
if (inet_ntop(AF_INET6, &client.sin_addr, remoteIp, INET6_ADDRSTRLEN) == nullptr)
|
||||
char remoteIp6[INET6_ADDRSTRLEN];
|
||||
if (inet_ntop(AF_INET6, &client.sin_addr, remoteIp6, INET6_ADDRSTRLEN) == nullptr)
|
||||
{
|
||||
int err = Socket::getErrno();
|
||||
std::stringstream ss;
|
||||
@ -368,7 +370,8 @@ namespace ix
|
||||
continue;
|
||||
}
|
||||
|
||||
connectionInfo = std::make_unique<ConnectionInfo>(remoteIp, client.sin_port);
|
||||
remotePort = client.sin_port;
|
||||
remoteIp = remoteIp6;
|
||||
}
|
||||
|
||||
std::shared_ptr<ConnectionState> connectionState;
|
||||
@ -377,6 +380,8 @@ namespace ix
|
||||
connectionState = _connectionStateFactory();
|
||||
}
|
||||
connectionState->setOnSetTerminatedCallback([this] { onSetTerminatedCallback(); });
|
||||
connectionState->setRemoteIp(remoteIp);
|
||||
connectionState->setRemotePort(remotePort);
|
||||
|
||||
if (_stop) return;
|
||||
|
||||
@ -404,13 +409,10 @@ namespace ix
|
||||
|
||||
// Launch the handleConnection work asynchronously in its own thread.
|
||||
std::lock_guard<std::mutex> lock(_connectionsThreadsMutex);
|
||||
_connectionsThreads.push_back(
|
||||
std::make_pair(connectionState,
|
||||
std::thread(&SocketServer::handleConnection,
|
||||
this,
|
||||
std::move(socket),
|
||||
connectionState,
|
||||
std::move(connectionInfo))));
|
||||
_connectionsThreads.push_back(std::make_pair(
|
||||
connectionState,
|
||||
std::thread(
|
||||
&SocketServer::handleConnection, this, std::move(socket), connectionState)));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -6,7 +6,6 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "IXConnectionInfo.h"
|
||||
#include "IXConnectionState.h"
|
||||
#include "IXSelectInterrupt.h"
|
||||
#include "IXSocketTLSOptions.h"
|
||||
@ -105,8 +104,7 @@ namespace ix
|
||||
ConnectionStateFactory _connectionStateFactory;
|
||||
|
||||
virtual void handleConnection(std::unique_ptr<Socket>,
|
||||
std::shared_ptr<ConnectionState> connectionState,
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo) = 0;
|
||||
std::shared_ptr<ConnectionState> connectionState) = 0;
|
||||
virtual size_t getConnectedClientsCount() = 0;
|
||||
|
||||
// Returns true if all connection threads are joined
|
||||
|
@ -16,8 +16,6 @@ namespace
|
||||
// is treated as a char* and the null termination (\x00) makes it
|
||||
// look like an empty string.
|
||||
const std::string kEmptyUncompressedBlock = std::string("\x00\x00\xff\xff", 4);
|
||||
|
||||
const int kBufferSize = 1 << 14;
|
||||
} // namespace
|
||||
|
||||
namespace ix
|
||||
@ -26,7 +24,6 @@ namespace ix
|
||||
// Compressor
|
||||
//
|
||||
WebSocketPerMessageDeflateCompressor::WebSocketPerMessageDeflateCompressor()
|
||||
: _compressBufferSize(kBufferSize)
|
||||
{
|
||||
#ifdef IXWEBSOCKET_USE_ZLIB
|
||||
memset(&_deflateState, 0, sizeof(_deflateState));
|
||||
@ -57,8 +54,6 @@ namespace ix
|
||||
|
||||
if (ret != Z_OK) return false;
|
||||
|
||||
_compressBuffer = std::make_unique<unsigned char[]>(_compressBufferSize);
|
||||
|
||||
_flush = (clientNoContextTakeOver) ? Z_FULL_FLUSH : Z_SYNC_FLUSH;
|
||||
|
||||
return true;
|
||||
@ -145,14 +140,14 @@ namespace ix
|
||||
do
|
||||
{
|
||||
// Output to local buffer
|
||||
_deflateState.avail_out = (uInt) _compressBufferSize;
|
||||
_deflateState.next_out = _compressBuffer.get();
|
||||
_deflateState.avail_out = (uInt) _compressBuffer.size();
|
||||
_deflateState.next_out = &_compressBuffer.front();
|
||||
|
||||
deflate(&_deflateState, _flush);
|
||||
|
||||
output = _compressBufferSize - _deflateState.avail_out;
|
||||
output = _compressBuffer.size() - _deflateState.avail_out;
|
||||
|
||||
out.insert(out.end(), _compressBuffer.get(), _compressBuffer.get() + output);
|
||||
out.insert(out.end(), _compressBuffer.begin(), _compressBuffer.begin() + output);
|
||||
} while (_deflateState.avail_out == 0);
|
||||
|
||||
if (endsWithEmptyUnCompressedBlock(out))
|
||||
@ -170,7 +165,6 @@ namespace ix
|
||||
// Decompressor
|
||||
//
|
||||
WebSocketPerMessageDeflateDecompressor::WebSocketPerMessageDeflateDecompressor()
|
||||
: _compressBufferSize(kBufferSize)
|
||||
{
|
||||
#ifdef IXWEBSOCKET_USE_ZLIB
|
||||
memset(&_inflateState, 0, sizeof(_inflateState));
|
||||
@ -198,8 +192,6 @@ namespace ix
|
||||
|
||||
if (ret != Z_OK) return false;
|
||||
|
||||
_compressBuffer = std::make_unique<unsigned char[]>(_compressBufferSize);
|
||||
|
||||
_flush = (clientNoContextTakeOver) ? Z_FULL_FLUSH : Z_SYNC_FLUSH;
|
||||
|
||||
return true;
|
||||
@ -232,8 +224,8 @@ namespace ix
|
||||
|
||||
do
|
||||
{
|
||||
_inflateState.avail_out = (uInt) _compressBufferSize;
|
||||
_inflateState.next_out = _compressBuffer.get();
|
||||
_inflateState.avail_out = (uInt) _compressBuffer.size();
|
||||
_inflateState.next_out = &_compressBuffer.front();
|
||||
|
||||
int ret = inflate(&_inflateState, Z_SYNC_FLUSH);
|
||||
|
||||
@ -242,8 +234,8 @@ namespace ix
|
||||
return false; // zlib error
|
||||
}
|
||||
|
||||
out.append(reinterpret_cast<char*>(_compressBuffer.get()),
|
||||
_compressBufferSize - _inflateState.avail_out);
|
||||
out.append(reinterpret_cast<char*>(&_compressBuffer.front()),
|
||||
_compressBuffer.size() - _inflateState.avail_out);
|
||||
} while (_inflateState.avail_out == 0);
|
||||
|
||||
return true;
|
||||
|
@ -9,7 +9,7 @@
|
||||
#ifdef IXWEBSOCKET_USE_ZLIB
|
||||
#include "zlib.h"
|
||||
#endif
|
||||
#include <memory>
|
||||
#include <array>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
@ -34,8 +34,7 @@ namespace ix
|
||||
bool endsWithEmptyUnCompressedBlock(const T& value);
|
||||
|
||||
int _flush;
|
||||
size_t _compressBufferSize;
|
||||
std::unique_ptr<unsigned char[]> _compressBuffer;
|
||||
std::array<unsigned char, 1 << 14> _compressBuffer;
|
||||
|
||||
#ifdef IXWEBSOCKET_USE_ZLIB
|
||||
z_stream _deflateState;
|
||||
@ -53,8 +52,7 @@ namespace ix
|
||||
|
||||
private:
|
||||
int _flush;
|
||||
size_t _compressBufferSize;
|
||||
std::unique_ptr<unsigned char[]> _compressBuffer;
|
||||
std::array<unsigned char, 1 << 14> _compressBuffer;
|
||||
|
||||
#ifdef IXWEBSOCKET_USE_ZLIB
|
||||
z_stream _inflateState;
|
||||
|
@ -56,10 +56,9 @@ namespace ix
|
||||
|
||||
server.setOnConnectionCallback(
|
||||
[remoteUrl, remoteUrlsMapping](std::weak_ptr<ix::WebSocket> webSocket,
|
||||
std::shared_ptr<ConnectionState> connectionState,
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo) {
|
||||
std::shared_ptr<ConnectionState> connectionState) {
|
||||
auto state = std::dynamic_pointer_cast<ProxyConnectionState>(connectionState);
|
||||
auto remoteIp = connectionInfo->remoteIp;
|
||||
auto remoteIp = connectionState->getRemoteIp();
|
||||
|
||||
// Server connection
|
||||
state->webSocket().setOnMessageCallback(
|
||||
|
@ -77,15 +77,14 @@ namespace ix
|
||||
}
|
||||
|
||||
void WebSocketServer::handleConnection(std::unique_ptr<Socket> socket,
|
||||
std::shared_ptr<ConnectionState> connectionState,
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo)
|
||||
std::shared_ptr<ConnectionState> connectionState)
|
||||
{
|
||||
setThreadName("WebSocketServer::" + connectionState->getId());
|
||||
|
||||
auto webSocket = std::make_shared<WebSocket>();
|
||||
if (_onConnectionCallback)
|
||||
{
|
||||
_onConnectionCallback(webSocket, connectionState, std::move(connectionInfo));
|
||||
_onConnectionCallback(webSocket, connectionState);
|
||||
|
||||
if (!webSocket->isOnMessageCallbackRegistered())
|
||||
{
|
||||
@ -99,9 +98,8 @@ namespace ix
|
||||
else if (_onClientMessageCallback)
|
||||
{
|
||||
webSocket->setOnMessageCallback(
|
||||
[this, &ws = *webSocket.get(), connectionState, &ci = *connectionInfo.get()](
|
||||
const WebSocketMessagePtr& msg) {
|
||||
_onClientMessageCallback(connectionState, ci, ws, msg);
|
||||
[this, &ws = *webSocket.get(), connectionState](const WebSocketMessagePtr& msg) {
|
||||
_onClientMessageCallback(connectionState, ws, msg);
|
||||
});
|
||||
}
|
||||
else
|
||||
|
@ -23,14 +23,10 @@ namespace ix
|
||||
{
|
||||
public:
|
||||
using OnConnectionCallback =
|
||||
std::function<void(std::weak_ptr<WebSocket>,
|
||||
std::shared_ptr<ConnectionState>,
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo)>;
|
||||
std::function<void(std::weak_ptr<WebSocket>, std::shared_ptr<ConnectionState>)>;
|
||||
|
||||
using OnClientMessageCallback = std::function<void(std::shared_ptr<ConnectionState>,
|
||||
ConnectionInfo&,
|
||||
WebSocket&,
|
||||
const WebSocketMessagePtr&)>;
|
||||
using OnClientMessageCallback = std::function<void(
|
||||
std::shared_ptr<ConnectionState>, WebSocket&, const WebSocketMessagePtr&)>;
|
||||
|
||||
WebSocketServer(int port = SocketServer::kDefaultPort,
|
||||
const std::string& host = SocketServer::kDefaultHost,
|
||||
@ -69,8 +65,7 @@ namespace ix
|
||||
|
||||
// Methods
|
||||
virtual void handleConnection(std::unique_ptr<Socket> socket,
|
||||
std::shared_ptr<ConnectionState> connectionState,
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo);
|
||||
std::shared_ptr<ConnectionState> connectionState);
|
||||
virtual size_t getConnectedClientsCount() final;
|
||||
};
|
||||
} // namespace ix
|
||||
|
@ -6,4 +6,4 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#define IX_WEBSOCKET_VERSION "10.2.5"
|
||||
#define IX_WEBSOCKET_VERSION "10.4.0"
|
||||
|
@ -95,15 +95,14 @@ TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]")
|
||||
|
||||
sentryServer.setOnConnectionCallback(
|
||||
[](HttpRequestPtr request,
|
||||
std::shared_ptr<ConnectionState> /*connectionState*/,
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr {
|
||||
std::shared_ptr<ConnectionState> connectionState) -> HttpResponsePtr {
|
||||
WebSocketHttpHeaders headers;
|
||||
headers["Server"] = userAgent();
|
||||
|
||||
// Log request
|
||||
std::stringstream ss;
|
||||
ss << connectionInfo->remoteIp << ":" << connectionInfo->remotePort << " "
|
||||
<< request->method << " " << request->headers["User-Agent"] << " "
|
||||
ss << connectionState->getRemoteIp() << ":" << connectionState->getRemotePort()
|
||||
<< " " << request->method << " " << request->headers["User-Agent"] << " "
|
||||
<< request->uri;
|
||||
|
||||
if (request->method == "POST")
|
||||
|
@ -63,6 +63,54 @@ TEST_CASE("http server", "[httpd]")
|
||||
|
||||
server.stop();
|
||||
}
|
||||
|
||||
SECTION("Posting plain text data to a local HTTP server")
|
||||
{
|
||||
int port = getFreePort();
|
||||
ix::HttpServer server(port, "127.0.0.1");
|
||||
|
||||
server.setOnConnectionCallback(
|
||||
[](HttpRequestPtr request, std::shared_ptr<ConnectionState>) -> HttpResponsePtr {
|
||||
if (request->method == "POST")
|
||||
{
|
||||
return std::make_shared<HttpResponse>(
|
||||
200, "OK", HttpErrorCode::Ok, WebSocketHttpHeaders(), request->body);
|
||||
}
|
||||
|
||||
return std::make_shared<HttpResponse>(400, "BAD REQUEST");
|
||||
});
|
||||
|
||||
auto res = server.listen();
|
||||
REQUIRE(res.first);
|
||||
server.start();
|
||||
|
||||
HttpClient httpClient;
|
||||
WebSocketHttpHeaders headers;
|
||||
headers["Content-Type"] = "text/plain";
|
||||
|
||||
std::string url("http://127.0.0.1:");
|
||||
url += std::to_string(port);
|
||||
auto args = httpClient.createRequest(url);
|
||||
|
||||
args->extraHeaders = headers;
|
||||
args->connectTimeout = 60;
|
||||
args->transferTimeout = 60;
|
||||
args->verbose = true;
|
||||
args->logger = [](const std::string& msg) { std::cout << msg; };
|
||||
args->body = "Hello World!";
|
||||
|
||||
auto response = httpClient.post(url, args->body, args);
|
||||
|
||||
std::cerr << "Status: " << response->statusCode << std::endl;
|
||||
std::cerr << "Error message: " << response->errorMsg << std::endl;
|
||||
std::cerr << "Payload: " << response->payload << std::endl;
|
||||
|
||||
REQUIRE(response->errorCode == HttpErrorCode::Ok);
|
||||
REQUIRE(response->statusCode == 200);
|
||||
REQUIRE(response->payload == args->body);
|
||||
|
||||
server.stop();
|
||||
}
|
||||
}
|
||||
|
||||
TEST_CASE("http server redirection", "[httpd_redirect]")
|
||||
|
@ -85,11 +85,10 @@ namespace ix
|
||||
bool startWebSocketEchoServer(ix::WebSocketServer& server)
|
||||
{
|
||||
server.setOnClientMessageCallback(
|
||||
[&server](std::shared_ptr<ConnectionState> /*connectionState*/,
|
||||
ConnectionInfo& connectionInfo,
|
||||
[&server](std::shared_ptr<ConnectionState> connectionState,
|
||||
WebSocket& webSocket,
|
||||
const ix::WebSocketMessagePtr& msg) {
|
||||
auto remoteIp = connectionInfo.remoteIp;
|
||||
auto remoteIp = connectionState->getRemoteIp();
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
TLogger() << "New connection";
|
||||
|
@ -191,11 +191,9 @@ namespace
|
||||
|
||||
server.setOnClientMessageCallback(
|
||||
[&server, &connectionId](std::shared_ptr<ConnectionState> connectionState,
|
||||
ConnectionInfo& connectionInfo,
|
||||
WebSocket& webSocket,
|
||||
const ix::WebSocketMessagePtr& msg) {
|
||||
auto remoteIp = connectionInfo.remoteIp;
|
||||
|
||||
auto remoteIp = connectionState->getRemoteIp();
|
||||
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
|
@ -195,10 +195,9 @@ namespace
|
||||
{
|
||||
server.setOnClientMessageCallback(
|
||||
[&server](std::shared_ptr<ConnectionState> connectionState,
|
||||
ConnectionInfo& connectionInfo,
|
||||
WebSocket& webSocket,
|
||||
const ix::WebSocketMessagePtr& msg) {
|
||||
auto remoteIp = connectionInfo.remoteIp;
|
||||
auto remoteIp = connectionState->getRemoteIp();
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
TLogger() << "New connection";
|
||||
@ -286,27 +285,27 @@ TEST_CASE("Websocket_chat", "[websocket_chat]")
|
||||
int attempts = 0;
|
||||
while (chatA.getReceivedMessagesCount() != 3 || chatB.getReceivedMessagesCount() != 3)
|
||||
{
|
||||
REQUIRE(attempts++ < 10);
|
||||
CHECK(attempts++ < 10);
|
||||
ix::msleep(1000);
|
||||
}
|
||||
|
||||
chatA.stop();
|
||||
chatB.stop();
|
||||
|
||||
REQUIRE(chatA.getReceivedMessagesCount() == 3);
|
||||
REQUIRE(chatB.getReceivedMessagesCount() == 3);
|
||||
CHECK(chatA.getReceivedMessagesCount() == 3);
|
||||
CHECK(chatB.getReceivedMessagesCount() == 3);
|
||||
|
||||
REQUIRE(chatB.getReceivedMessages()[0] == "from A1");
|
||||
REQUIRE(chatB.getReceivedMessages()[1] == "from A2");
|
||||
REQUIRE(chatB.getReceivedMessages()[2] == "from A3");
|
||||
CHECK(chatB.getReceivedMessages()[0] == "from A1");
|
||||
CHECK(chatB.getReceivedMessages()[1] == "from A2");
|
||||
CHECK(chatB.getReceivedMessages()[2] == "from A3");
|
||||
|
||||
REQUIRE(chatA.getReceivedMessages()[0] == "from B1");
|
||||
REQUIRE(chatA.getReceivedMessages()[1] == "from B2");
|
||||
REQUIRE(chatA.getReceivedMessages()[2].size() == bigMessage.size());
|
||||
CHECK(chatA.getReceivedMessages()[0] == "from B1");
|
||||
CHECK(chatA.getReceivedMessages()[1] == "from B2");
|
||||
CHECK(chatA.getReceivedMessages()[2].size() == bigMessage.size());
|
||||
|
||||
// Give us 1000ms for the server to notice that clients went away
|
||||
ix::msleep(1000);
|
||||
REQUIRE(server.getClients().size() == 0);
|
||||
CHECK(server.getClients().size() == 0);
|
||||
|
||||
ix::reportWebSocketTraffic();
|
||||
}
|
||||
|
@ -171,10 +171,9 @@ namespace
|
||||
server.setOnClientMessageCallback(
|
||||
[&receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite](
|
||||
std::shared_ptr<ConnectionState> connectionState,
|
||||
ConnectionInfo& connectionInfo,
|
||||
WebSocket& /*webSocket*/,
|
||||
const ix::WebSocketMessagePtr& msg) {
|
||||
auto remoteIp = connectionInfo.remoteIp;
|
||||
auto remoteIp = connectionState->getRemoteIp();
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
TLogger() << "New server connection";
|
||||
|
@ -35,11 +35,9 @@ namespace ix
|
||||
|
||||
server.setOnClientMessageCallback(
|
||||
[&server, &connectionId](std::shared_ptr<ConnectionState> connectionState,
|
||||
ConnectionInfo& connectionInfo,
|
||||
WebSocket& webSocket,
|
||||
const ix::WebSocketMessagePtr& msg) {
|
||||
auto remoteIp = connectionInfo.remoteIp;
|
||||
|
||||
auto remoteIp = connectionState->getRemoteIp();
|
||||
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
|
@ -18,10 +18,9 @@ bool startServer(ix::WebSocketServer& server, std::string& subProtocols)
|
||||
{
|
||||
server.setOnClientMessageCallback(
|
||||
[&server, &subProtocols](std::shared_ptr<ConnectionState> connectionState,
|
||||
ConnectionInfo& connectionInfo,
|
||||
WebSocket& webSocket,
|
||||
const ix::WebSocketMessagePtr& msg) {
|
||||
auto remoteIp = connectionInfo.remoteIp;
|
||||
auto remoteIp = connectionState->getRemoteIp();
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
TLogger() << "New connection";
|
||||
|
6
test/compatibility/ruby/README.md
Normal file
6
test/compatibility/ruby/README.md
Normal file
@ -0,0 +1,6 @@
|
||||
```
|
||||
export GEM_HOME=$HOME/local/gems
|
||||
bundle install faye-websocket
|
||||
```
|
||||
|
||||
https://stackoverflow.com/questions/486995/ruby-equivalent-of-virtualenv
|
59
test/compatibility/ruby/devnull_client.rb
Normal file
59
test/compatibility/ruby/devnull_client.rb
Normal file
@ -0,0 +1,59 @@
|
||||
#
|
||||
# $ ruby --version
|
||||
# ruby 2.6.3p62 (2019-04-16 revision 67580) [universal.x86_64-darwin19]
|
||||
#
|
||||
# Install a gem locally by setting GEM_HOME
|
||||
# https://stackoverflow.com/questions/486995/ruby-equivalent-of-virtualenv
|
||||
# export GEM_HOME=$HOME/local/gems
|
||||
# bundle install faye-websocket
|
||||
#
|
||||
# In a different terminal, start a push server:
|
||||
# $ ws push_server -q
|
||||
#
|
||||
# $ ruby devnull_client.rb
|
||||
# [:open]
|
||||
# Connected to server
|
||||
# messages received per second: 115926
|
||||
# messages received per second: 119156
|
||||
# messages received per second: 119156
|
||||
# messages received per second: 119157
|
||||
# messages received per second: 119156
|
||||
# messages received per second: 119156
|
||||
# messages received per second: 119157
|
||||
# messages received per second: 119156
|
||||
# messages received per second: 119156
|
||||
# messages received per second: 119157
|
||||
# messages received per second: 119156
|
||||
# messages received per second: 119157
|
||||
# messages received per second: 119156
|
||||
# ^C[:close, 1006, ""]
|
||||
#
|
||||
require 'faye/websocket'
|
||||
require 'eventmachine'
|
||||
|
||||
EM.run {
|
||||
ws = Faye::WebSocket::Client.new('ws://127.0.0.1:8008')
|
||||
|
||||
counter = 0
|
||||
|
||||
EM.add_periodic_timer(1) do
|
||||
print "messages received per second: #{counter}\n"
|
||||
counter = 0 # reset counter
|
||||
end
|
||||
|
||||
ws.on :open do |event|
|
||||
p [:open]
|
||||
print "Connected to server\n"
|
||||
end
|
||||
|
||||
ws.on :message do |event|
|
||||
# Uncomment the next line to validate that we receive something correct
|
||||
# p [:message, event.data]
|
||||
counter += 1
|
||||
end
|
||||
|
||||
ws.on :close do |event|
|
||||
p [:close, event.code, event.reason]
|
||||
ws = nil
|
||||
end
|
||||
}
|
262
ws/ws.cpp
262
ws/ws.cpp
@ -18,6 +18,7 @@
|
||||
#include <fstream>
|
||||
#include <iostream>
|
||||
#include <ixbots/IXCobraMetricsToRedisBot.h>
|
||||
#include <ixbots/IXCobraToCobraBot.h>
|
||||
#include <ixbots/IXCobraToPythonBot.h>
|
||||
#include <ixbots/IXCobraToSentryBot.h>
|
||||
#include <ixbots/IXCobraToStatsdBot.h>
|
||||
@ -120,6 +121,12 @@ namespace
|
||||
return str.substr(0, n) + "...";
|
||||
}
|
||||
}
|
||||
|
||||
bool fileExists(const std::string& fileName)
|
||||
{
|
||||
std::ifstream infile(fileName);
|
||||
return infile.good();
|
||||
}
|
||||
} // namespace
|
||||
|
||||
namespace ix
|
||||
@ -406,10 +413,9 @@ namespace ix
|
||||
|
||||
server.setOnClientMessageCallback(
|
||||
[&server](std::shared_ptr<ConnectionState> connectionState,
|
||||
ConnectionInfo& connectionInfo,
|
||||
WebSocket& webSocket,
|
||||
const WebSocketMessagePtr& msg) {
|
||||
auto remoteIp = connectionInfo.remoteIp;
|
||||
auto remoteIp = connectionState->getRemoteIp();
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
spdlog::info("New connection");
|
||||
@ -1097,21 +1103,26 @@ namespace ix
|
||||
return 0;
|
||||
}
|
||||
|
||||
int ws_echo_client(const std::string& url,
|
||||
bool disablePerMessageDeflate,
|
||||
bool binaryMode,
|
||||
const ix::SocketTLSOptions& tlsOptions,
|
||||
const std::string& subprotocol,
|
||||
int pingIntervalSecs,
|
||||
const std::string& sendMsg,
|
||||
bool noSend)
|
||||
int ws_autoroute(const std::string& url,
|
||||
bool disablePerMessageDeflate,
|
||||
const ix::SocketTLSOptions& tlsOptions,
|
||||
const std::string& subprotocol,
|
||||
int pingIntervalSecs,
|
||||
int msgCount)
|
||||
{
|
||||
Bench bench("ws_autoroute full test");
|
||||
|
||||
// Our websocket object
|
||||
ix::WebSocket webSocket;
|
||||
|
||||
webSocket.setUrl(url);
|
||||
std::string fullUrl(url);
|
||||
fullUrl += "/";
|
||||
fullUrl += std::to_string(msgCount);
|
||||
|
||||
webSocket.setUrl(fullUrl);
|
||||
webSocket.setTLSOptions(tlsOptions);
|
||||
webSocket.setPingInterval(pingIntervalSecs);
|
||||
webSocket.disableAutomaticReconnection();
|
||||
|
||||
if (disablePerMessageDeflate)
|
||||
{
|
||||
@ -1123,43 +1134,56 @@ namespace ix
|
||||
webSocket.addSubProtocol(subprotocol);
|
||||
}
|
||||
|
||||
std::atomic<uint64_t> receivedCount(0);
|
||||
uint64_t receivedCountTotal(0);
|
||||
uint64_t receivedCountPerSecs(0);
|
||||
std::atomic<uint64_t> receivedCountPerSecs(0);
|
||||
std::atomic<uint64_t> target(msgCount);
|
||||
std::mutex conditionVariableMutex;
|
||||
std::condition_variable condition;
|
||||
|
||||
// Setup a callback to be fired (in a background thread, watch out for race conditions !)
|
||||
// when a message or an event (open, close, error) is received
|
||||
webSocket.setOnMessageCallback([&webSocket, &receivedCount, &sendMsg, noSend, binaryMode](
|
||||
const ix::WebSocketMessagePtr& msg) {
|
||||
if (msg->type == ix::WebSocketMessageType::Message)
|
||||
{
|
||||
if (!noSend)
|
||||
std::atomic<bool> stop(false);
|
||||
|
||||
// Setup a callback to be fired
|
||||
// when a message or an event (open, close, ping, pong, error) is received
|
||||
webSocket.setOnMessageCallback(
|
||||
[&webSocket, &receivedCountPerSecs, &target, &stop, &condition, &bench](
|
||||
const ix::WebSocketMessagePtr& msg) {
|
||||
if (msg->type == ix::WebSocketMessageType::Message)
|
||||
{
|
||||
webSocket.send(msg->str, msg->binary);
|
||||
receivedCountPerSecs++;
|
||||
|
||||
target -= 1;
|
||||
if (target == 0)
|
||||
{
|
||||
stop = true;
|
||||
condition.notify_one();
|
||||
|
||||
bench.report();
|
||||
}
|
||||
}
|
||||
receivedCount++;
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
spdlog::info("ws_echo_client: connected");
|
||||
spdlog::info("Uri: {}", msg->openInfo.uri);
|
||||
spdlog::info("Headers:");
|
||||
for (auto it : msg->openInfo.headers)
|
||||
else if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
spdlog::info("{}: {}", it.first, it.second);
|
||||
bench.reset();
|
||||
|
||||
spdlog::info("ws_autoroute: connected");
|
||||
spdlog::info("Uri: {}", msg->openInfo.uri);
|
||||
spdlog::info("Headers:");
|
||||
for (auto it : msg->openInfo.headers)
|
||||
{
|
||||
spdlog::info("{}: {}", it.first, it.second);
|
||||
}
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Pong)
|
||||
{
|
||||
spdlog::info("Received pong {}", msg->str);
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Close)
|
||||
{
|
||||
spdlog::info("ws_autoroute: connection closed");
|
||||
}
|
||||
});
|
||||
|
||||
webSocket.send(sendMsg, binaryMode);
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Pong)
|
||||
{
|
||||
spdlog::info("Received pong {}", msg->str);
|
||||
}
|
||||
});
|
||||
|
||||
auto timer = [&receivedCount, &receivedCountTotal, &receivedCountPerSecs] {
|
||||
auto timer = [&receivedCountPerSecs, &stop] {
|
||||
setThreadName("Timer");
|
||||
while (true)
|
||||
while (!stop)
|
||||
{
|
||||
//
|
||||
// We cannot write to sentCount and receivedCount
|
||||
@ -1167,13 +1191,11 @@ namespace ix
|
||||
// our own counters
|
||||
//
|
||||
std::stringstream ss;
|
||||
ss << "messages received: " << receivedCountPerSecs << " per second "
|
||||
<< receivedCountTotal << " total";
|
||||
ss << "messages received per second: " << receivedCountPerSecs;
|
||||
|
||||
CoreLogger::info(ss.str());
|
||||
|
||||
receivedCountPerSecs = receivedCount - receivedCountTotal;
|
||||
receivedCountTotal += receivedCountPerSecs;
|
||||
receivedCountPerSecs = 0;
|
||||
|
||||
auto duration = std::chrono::seconds(1);
|
||||
std::this_thread::sleep_for(duration);
|
||||
@ -1186,17 +1208,12 @@ namespace ix
|
||||
std::cout << "Connecting to " << url << "..." << std::endl;
|
||||
webSocket.start();
|
||||
|
||||
// Send a message to the server (default to TEXT mode)
|
||||
webSocket.send("hello world");
|
||||
// Wait for all the messages to be received
|
||||
std::unique_lock<std::mutex> lock(conditionVariableMutex);
|
||||
condition.wait(lock);
|
||||
|
||||
while (true)
|
||||
{
|
||||
std::string text;
|
||||
std::cout << "> " << std::flush;
|
||||
std::getline(std::cin, text);
|
||||
|
||||
webSocket.send(text);
|
||||
}
|
||||
t1.join();
|
||||
webSocket.stop();
|
||||
|
||||
return 0;
|
||||
}
|
||||
@ -1234,10 +1251,9 @@ namespace ix
|
||||
|
||||
server.setOnClientMessageCallback(
|
||||
[greetings](std::shared_ptr<ConnectionState> connectionState,
|
||||
ConnectionInfo& connectionInfo,
|
||||
WebSocket& webSocket,
|
||||
const WebSocketMessagePtr& msg) {
|
||||
auto remoteIp = connectionInfo.remoteIp;
|
||||
auto remoteIp = connectionState->getRemoteIp();
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
spdlog::info("New connection");
|
||||
@ -1635,7 +1651,6 @@ namespace ix
|
||||
}
|
||||
|
||||
int ws_push_server(int port,
|
||||
bool greetings,
|
||||
const std::string& hostname,
|
||||
const ix::SocketTLSOptions& tlsOptions,
|
||||
bool ipv6,
|
||||
@ -1666,12 +1681,14 @@ namespace ix
|
||||
server.disablePong();
|
||||
}
|
||||
|
||||
// push one million messages
|
||||
std::atomic<bool> stop(false);
|
||||
|
||||
server.setOnClientMessageCallback(
|
||||
[greetings, &sendMsg](std::shared_ptr<ConnectionState> connectionState,
|
||||
ConnectionInfo& connectionInfo,
|
||||
WebSocket& webSocket,
|
||||
const WebSocketMessagePtr& msg) {
|
||||
auto remoteIp = connectionInfo.remoteIp;
|
||||
[&sendMsg, &stop](std::shared_ptr<ConnectionState> connectionState,
|
||||
WebSocket& webSocket,
|
||||
const WebSocketMessagePtr& msg) {
|
||||
auto remoteIp = connectionState->getRemoteIp();
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
spdlog::info("New connection");
|
||||
@ -1684,15 +1701,31 @@ namespace ix
|
||||
spdlog::info("{}: {}", it.first, it.second);
|
||||
}
|
||||
|
||||
if (greetings)
|
||||
{
|
||||
webSocket.sendText("Welcome !");
|
||||
}
|
||||
// Parse the msg count from the uri.
|
||||
int msgCount = -1;
|
||||
std::stringstream ss;
|
||||
auto uriSize = msg->openInfo.uri.size();
|
||||
ss << msg->openInfo.uri.substr(1, uriSize - 1);
|
||||
ss >> msgCount;
|
||||
|
||||
bool binary = false;
|
||||
while (true)
|
||||
if (msgCount == -1)
|
||||
{
|
||||
webSocket.send(sendMsg, binary);
|
||||
spdlog::info("Error parsing message count, closing connection");
|
||||
webSocket.close();
|
||||
}
|
||||
else
|
||||
{
|
||||
bool binary = false;
|
||||
for (int i = 0; i < msgCount; ++i)
|
||||
{
|
||||
auto sendInfo = webSocket.send(sendMsg, binary);
|
||||
if (!sendInfo.success)
|
||||
{
|
||||
spdlog::info("Error sending message, closing connection");
|
||||
webSocket.close();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Close)
|
||||
@ -1701,6 +1734,7 @@ namespace ix
|
||||
connectionState->getId(),
|
||||
msg->closeInfo.code,
|
||||
msg->closeInfo.reason);
|
||||
stop = true;
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Error)
|
||||
{
|
||||
@ -1724,7 +1758,14 @@ namespace ix
|
||||
}
|
||||
|
||||
server.start();
|
||||
server.wait();
|
||||
|
||||
while (!stop)
|
||||
{
|
||||
auto duration = std::chrono::seconds(1);
|
||||
std::this_thread::sleep_for(duration);
|
||||
}
|
||||
|
||||
server.stop();
|
||||
|
||||
return 0;
|
||||
}
|
||||
@ -2601,10 +2642,9 @@ namespace ix
|
||||
|
||||
server.setOnClientMessageCallback(
|
||||
[&server](std::shared_ptr<ConnectionState> connectionState,
|
||||
ConnectionInfo& connectionInfo,
|
||||
WebSocket& webSocket,
|
||||
const WebSocketMessagePtr& msg) {
|
||||
auto remoteIp = connectionInfo.remoteIp;
|
||||
auto remoteIp = connectionState->getRemoteIp();
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
spdlog::info("ws_transfer: New connection");
|
||||
@ -2716,6 +2756,7 @@ namespace ix
|
||||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
ix::setThreadName("ws main thread");
|
||||
ix::initNetSystem();
|
||||
|
||||
ix::CoreLogger::LogFunc logFunc = [](const char* msg, ix::LogLevel level) {
|
||||
@ -2802,8 +2843,10 @@ int main(int argc, char** argv)
|
||||
std::string project;
|
||||
std::string key;
|
||||
std::string logfile;
|
||||
std::string scriptPath;
|
||||
std::string moduleName;
|
||||
std::string republishChannel;
|
||||
std::string publisherRolename;
|
||||
std::string publisherRolesecret;
|
||||
std::string sendMsg("hello world");
|
||||
ix::SocketTLSOptions tlsOptions;
|
||||
ix::CobraConfig cobraConfig;
|
||||
@ -2827,7 +2870,6 @@ int main(int argc, char** argv)
|
||||
bool version = false;
|
||||
bool verifyNone = false;
|
||||
bool disablePong = false;
|
||||
bool noSend = false;
|
||||
int port = 8008;
|
||||
int redisPort = 6379;
|
||||
int statsdPort = 8125;
|
||||
@ -2836,6 +2878,7 @@ int main(int argc, char** argv)
|
||||
int maxRedirects = 5;
|
||||
int delayMs = -1;
|
||||
int count = 1;
|
||||
int msgCount = 1000 * 1000;
|
||||
uint32_t maxWaitBetweenReconnectionRetries;
|
||||
int pingIntervalSecs = 30;
|
||||
|
||||
@ -2929,17 +2972,14 @@ int main(int argc, char** argv)
|
||||
addGenericOptions(connectApp);
|
||||
addTLSOptions(connectApp);
|
||||
|
||||
CLI::App* echoClientApp =
|
||||
app.add_subcommand("echo_client", "Echo messages sent by a remote server");
|
||||
CLI::App* echoClientApp = app.add_subcommand("autoroute", "Test websocket client performance");
|
||||
echoClientApp->fallthrough();
|
||||
echoClientApp->add_option("url", url, "Connection url")->required();
|
||||
echoClientApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate");
|
||||
echoClientApp->add_flag("-b", binaryMode, "Send in binary mode");
|
||||
echoClientApp->add_option(
|
||||
"--ping_interval", pingIntervalSecs, "Interval between sending pings");
|
||||
echoClientApp->add_option("--subprotocol", subprotocol, "Subprotocol");
|
||||
echoClientApp->add_option("--send_msg", sendMsg, "Send message");
|
||||
echoClientApp->add_flag("-m", noSend, "Do not send messages, only receive messages");
|
||||
echoClientApp->add_option("--msg_count", msgCount, "Total message count to be sent");
|
||||
addTLSOptions(echoClientApp);
|
||||
|
||||
CLI::App* chatApp = app.add_subcommand("chat", "Group chat");
|
||||
@ -2964,7 +3004,6 @@ int main(int argc, char** argv)
|
||||
pushServerApp->add_option("--port", port, "Port");
|
||||
pushServerApp->add_option("--host", hostname, "Hostname");
|
||||
pushServerApp->add_flag("-q", quiet, "Quiet / only display warnings and errors");
|
||||
pushServerApp->add_flag("-g", greetings, "Greet");
|
||||
pushServerApp->add_flag("-6", ipv6, "IpV6");
|
||||
pushServerApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate");
|
||||
pushServerApp->add_flag("-p", disablePong, "Disable sending PONG in response to PING");
|
||||
@ -3068,13 +3107,23 @@ int main(int argc, char** argv)
|
||||
addTLSOptions(cobra2statsd);
|
||||
addCobraBotConfig(cobra2statsd);
|
||||
|
||||
CLI::App* cobra2cobra = app.add_subcommand("cobra_to_cobra", "Cobra to Cobra");
|
||||
cobra2cobra->fallthrough();
|
||||
cobra2cobra->add_option("--republish", republishChannel, "Republish channel");
|
||||
cobra2cobra->add_option("--publisher_rolename", publisherRolename, "Publisher Role name")
|
||||
->required();
|
||||
cobra2cobra->add_option("--publisher_rolesecret", publisherRolesecret, "Publisher Role secret")
|
||||
->required();
|
||||
cobra2cobra->add_flag("-q", quiet, "Quiet");
|
||||
addTLSOptions(cobra2cobra);
|
||||
addCobraBotConfig(cobra2cobra);
|
||||
|
||||
CLI::App* cobra2python = app.add_subcommand("cobra_to_python", "Cobra to python");
|
||||
cobra2python->fallthrough();
|
||||
cobra2python->add_option("--host", hostname, "Statsd host");
|
||||
cobra2python->add_option("--port", statsdPort, "Statsd port");
|
||||
cobra2python->add_option("--prefix", prefix, "Statsd prefix");
|
||||
cobra2python->add_option("--script", scriptPath, "Python script path")
|
||||
->check(CLI::ExistingPath);
|
||||
cobra2python->add_option("--module", moduleName, "Python module");
|
||||
cobra2python->add_option("--pidfile", pidfile, "Pid file");
|
||||
addTLSOptions(cobra2python);
|
||||
addCobraBotConfig(cobra2python);
|
||||
@ -3177,11 +3226,27 @@ int main(int argc, char** argv)
|
||||
|
||||
if (tlsOptions.isUsingSystemDefaults())
|
||||
{
|
||||
#ifdef __APPLE__
|
||||
#if defined(__APPLE__)
|
||||
#if defined(IXWEBSOCKET_USE_MBED_TLS) || defined(IXWEBSOCKET_USE_OPEN_SSL)
|
||||
// We could try to load some system certs as well, but this is easy enough
|
||||
tlsOptions.caFile = "/etc/ssl/cert.pem";
|
||||
#endif
|
||||
#elif defined(__linux__)
|
||||
#if defined(IXWEBSOCKET_USE_MBED_TLS)
|
||||
std::vector<std::string> caFiles = {
|
||||
"/etc/ssl/certs/ca-bundle.crt", // CentOS
|
||||
"/etc/ssl/certs/ca-certificates.crt", // Alpine
|
||||
};
|
||||
|
||||
for (auto&& caFile : caFiles)
|
||||
{
|
||||
if (fileExists(caFile))
|
||||
{
|
||||
tlsOptions.caFile = caFile;
|
||||
break;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
#endif
|
||||
}
|
||||
|
||||
@ -3229,16 +3294,10 @@ int main(int argc, char** argv)
|
||||
subprotocol,
|
||||
pingIntervalSecs);
|
||||
}
|
||||
else if (app.got_subcommand("echo_client"))
|
||||
else if (app.got_subcommand("autoroute"))
|
||||
{
|
||||
ret = ix::ws_echo_client(url,
|
||||
disablePerMessageDeflate,
|
||||
binaryMode,
|
||||
tlsOptions,
|
||||
subprotocol,
|
||||
pingIntervalSecs,
|
||||
sendMsg,
|
||||
noSend);
|
||||
ret = ix::ws_autoroute(
|
||||
url, disablePerMessageDeflate, tlsOptions, subprotocol, pingIntervalSecs, msgCount);
|
||||
}
|
||||
else if (app.got_subcommand("echo_server"))
|
||||
{
|
||||
@ -3247,14 +3306,8 @@ int main(int argc, char** argv)
|
||||
}
|
||||
else if (app.got_subcommand("push_server"))
|
||||
{
|
||||
ret = ix::ws_push_server(port,
|
||||
greetings,
|
||||
hostname,
|
||||
tlsOptions,
|
||||
ipv6,
|
||||
disablePerMessageDeflate,
|
||||
disablePong,
|
||||
sendMsg);
|
||||
ret = ix::ws_push_server(
|
||||
port, hostname, tlsOptions, ipv6, disablePerMessageDeflate, disablePong, sendMsg);
|
||||
}
|
||||
else if (app.got_subcommand("transfer"))
|
||||
{
|
||||
@ -3361,7 +3414,7 @@ int main(int argc, char** argv)
|
||||
}
|
||||
else
|
||||
{
|
||||
ret = (int) ix::cobra_to_python_bot(cobraBotConfig, statsdClient, scriptPath);
|
||||
ret = (int) ix::cobra_to_python_bot(cobraBotConfig, statsdClient, moduleName);
|
||||
}
|
||||
}
|
||||
else if (app.got_subcommand("cobra_to_sentry"))
|
||||
@ -3384,6 +3437,11 @@ int main(int argc, char** argv)
|
||||
ret = (int) ix::cobra_metrics_to_redis_bot(cobraBotConfig, redisClient, verbose);
|
||||
}
|
||||
}
|
||||
else if (app.got_subcommand("cobra_to_cobra"))
|
||||
{
|
||||
ret = (int) ix::cobra_to_cobra_bot(
|
||||
cobraBotConfig, republishChannel, publisherRolename, publisherRolesecret);
|
||||
}
|
||||
else if (app.got_subcommand("snake"))
|
||||
{
|
||||
ret = ix::ws_snake_main(port,
|
||||
|
Reference in New Issue
Block a user