Compare commits
48 Commits
feature/zl
...
v10.4.0
Author | SHA1 | Date | |
---|---|---|---|
128bc0afa9 | |||
b04e5c5529 | |||
1e8c421d66 | |||
72d6651ded | |||
a4e5d1b47a | |||
9f51a54a83 | |||
b74f7319c6 | |||
0ad66a27f2 | |||
a40003e85a | |||
5534a7fdf9 | |||
efb245278d | |||
5896d3740f | |||
73b9c0b89b | |||
629c155044 | |||
08640d877f | |||
ed5c63144e | |||
ee69aed2b0 | |||
fcb92f862d | |||
e8e98e667d | |||
e1502017ce | |||
72472f2899 | |||
42f71364ca | |||
3dabd3a556 | |||
0498e2fa98 | |||
2aaf59651e | |||
cd4e51eacf | |||
785842de03 | |||
261095fa12 | |||
ed2ed0f7ae | |||
7ad5ead0f6 | |||
a8284e64e3 | |||
5423a31d5a | |||
53575f8d90 | |||
d3bcbdac26 | |||
8c5b28adce | |||
dcbafae35a | |||
eb197edcec | |||
b8265bf7f2 | |||
e7c4f0b171 | |||
12f36b61ff | |||
b15c4189f5 | |||
74d3278258 | |||
831152b906 | |||
7c81a98632 | |||
6e47c62c06 | |||
bcae7f326d | |||
d719c41e31 | |||
6f0307fb35 |
66
.github/workflows/docker.yml
vendored
Normal file
66
.github/workflows/docker.yml
vendored
Normal file
@ -0,0 +1,66 @@
|
||||
name: docker
|
||||
|
||||
# When its time to do a release do a build for amd64
|
||||
# and push all of them to Docker Hub.
|
||||
# Only trigger on semver shaped tags.
|
||||
on:
|
||||
push:
|
||||
tags:
|
||||
- "v*.*.*"
|
||||
|
||||
jobs:
|
||||
login:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Prepare
|
||||
id: prep
|
||||
run: |
|
||||
DOCKER_IMAGE=machinezone/ws
|
||||
VERSION=edge
|
||||
if [[ $GITHUB_REF == refs/tags/* ]]; then
|
||||
VERSION=${GITHUB_REF#refs/tags/v}
|
||||
fi
|
||||
if [ "${{ github.event_name }}" = "schedule" ]; then
|
||||
VERSION=nightly
|
||||
fi
|
||||
TAGS="${DOCKER_IMAGE}:${VERSION}"
|
||||
if [[ $VERSION =~ ^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$ ]]; then
|
||||
TAGS="$TAGS,${DOCKER_IMAGE}:latest"
|
||||
fi
|
||||
echo ::set-output name=tags::${TAGS}
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
id: buildx
|
||||
uses: docker/setup-buildx-action@master
|
||||
|
||||
- name: Cache Docker layers
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: /tmp/.buildx-cache
|
||||
key: ${{ runner.os }}-buildx-${{ github.sha }}
|
||||
restore-keys: |
|
||||
${{ runner.os }}-buildx-
|
||||
|
||||
- name: Login to GitHub Package Registry
|
||||
uses: docker/login-action@v1
|
||||
with:
|
||||
registry: ghcr.io
|
||||
username: ${{ github.repository_owner }}
|
||||
password: ${{ secrets.GHCR_TOKEN }}
|
||||
|
||||
- name: Build and push
|
||||
id: docker_build
|
||||
uses: docker/build-push-action@v2-build-push
|
||||
with:
|
||||
builder: ${{ steps.buildx.outputs.name }}
|
||||
context: .
|
||||
file: ./Dockerfile
|
||||
target: prod
|
||||
platforms: linux/amd64
|
||||
push: ${{ github.event_name != 'pull_request' }}
|
||||
tags: ${{ steps.prep.outputs.tags }}
|
||||
cache-from: type=local,src=/tmp/.buildx-cache
|
||||
cache-to: type=local,dest=/tmp/.buildx-cache
|
4
.github/workflows/unittest_uwp.yml
vendored
4
.github/workflows/unittest_uwp.yml
vendored
@ -10,12 +10,10 @@ jobs:
|
||||
steps:
|
||||
- uses: actions/checkout@v1
|
||||
- uses: seanmiddleditch/gha-setup-vsdevenv@master
|
||||
- run: |
|
||||
vcpkg install zlib:x64-uwp
|
||||
- run: |
|
||||
mkdir build
|
||||
cd build
|
||||
cmake -DCMAKE_TOOLCHAIN_FILE=c:/vcpkg/scripts/buildsystems/vcpkg.cmake -DCMAKE_SYSTEM_NAME=WindowsStore -DCMAKE_SYSTEM_VERSION="10.0" -DCMAKE_CXX_COMPILER=cl.exe -DUSE_TEST=1 ..
|
||||
cmake -DCMAKE_TOOLCHAIN_FILE=c:/vcpkg/scripts/buildsystems/vcpkg.cmake -DCMAKE_SYSTEM_NAME=WindowsStore -DCMAKE_SYSTEM_VERSION="10.0" -DCMAKE_CXX_COMPILER=cl.exe -DUSE_TEST=1 -DUSE_ZLIB=0 ..
|
||||
- run: cmake --build build
|
||||
|
||||
#
|
||||
|
7
.github/workflows/unittest_windows.yml
vendored
7
.github/workflows/unittest_windows.yml
vendored
@ -10,10 +10,11 @@ jobs:
|
||||
steps:
|
||||
- uses: actions/checkout@v1
|
||||
- uses: seanmiddleditch/gha-setup-vsdevenv@master
|
||||
- run: |
|
||||
vcpkg install zlib:x64-windows
|
||||
- run: |
|
||||
mkdir build
|
||||
cd build
|
||||
cmake -DCMAKE_TOOLCHAIN_FILE=c:/vcpkg/scripts/buildsystems/vcpkg.cmake -DCMAKE_CXX_COMPILER=cl.exe -DUSE_WS=1 -DUSE_TEST=1 ..
|
||||
cmake -DCMAKE_CXX_COMPILER=cl.exe -DUSE_WS=1 -DUSE_TEST=1 -DUSE_ZLIB=0 ..
|
||||
- run: cmake --build build
|
||||
|
||||
#- run: ../build/test/ixwebsocket_unittest.exe
|
||||
# working-directory: test
|
||||
|
@ -62,7 +62,6 @@ set( IXWEBSOCKET_SOURCES
|
||||
set( IXWEBSOCKET_HEADERS
|
||||
ixwebsocket/IXBench.h
|
||||
ixwebsocket/IXCancellationRequest.h
|
||||
ixwebsocket/IXConnectionInfo.h
|
||||
ixwebsocket/IXConnectionState.h
|
||||
ixwebsocket/IXDNSLookup.h
|
||||
ixwebsocket/IXExponentialBackoff.h
|
||||
@ -190,10 +189,16 @@ if (USE_TLS)
|
||||
endif()
|
||||
endif()
|
||||
|
||||
# Use ZLIB_ROOT CMake variable if you need to use your own zlib
|
||||
find_package(ZLIB REQUIRED)
|
||||
include_directories(${ZLIB_INCLUDE_DIRS})
|
||||
target_link_libraries(ixwebsocket ${ZLIB_LIBRARIES})
|
||||
option(USE_ZLIB "Enable zlib support" TRUE)
|
||||
|
||||
if (USE_ZLIB)
|
||||
# Use ZLIB_ROOT CMake variable if you need to use your own zlib
|
||||
find_package(ZLIB REQUIRED)
|
||||
include_directories(${ZLIB_INCLUDE_DIRS})
|
||||
target_link_libraries(ixwebsocket ${ZLIB_LIBRARIES})
|
||||
|
||||
target_compile_definitions(ixwebsocket PUBLIC IXWEBSOCKET_USE_ZLIB)
|
||||
endif()
|
||||
|
||||
if (WIN32)
|
||||
target_link_libraries(ixwebsocket wsock32 ws2_32 shlwapi)
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
IXWebSocket is a C++ library for WebSocket client and server development. It has minimal dependencies (no boost), is very simple to use and support everything you'll likely need for websocket dev (SSL, deflate compression, compiles on most platforms, etc...). HTTP client and server code is also available, but it hasn't received as much testing.
|
||||
|
||||
It is been used on big mobile video game titles sending and receiving tons of messages since 2017 (iOS and Android). It was tested on macOS, iOS, Linux, Android, Windows and FreeBSD. Two important design goals are simplicity and correctness.
|
||||
It is been used on big mobile video game titles sending and receiving tons of messages since 2017 (iOS and Android). It was tested on macOS, iOS, Linux, Android, Windows and FreeBSD. Note that the MinGW compiler is not supported at this point. Two important design goals are simplicity and correctness.
|
||||
|
||||
```cpp
|
||||
/*
|
||||
|
@ -1,67 +1,11 @@
|
||||
version: "3"
|
||||
version: "3.3"
|
||||
services:
|
||||
# snake:
|
||||
# image: bsergean/ws:build
|
||||
# entrypoint: ws snake --port 8767 --host 0.0.0.0 --redis_hosts redis1
|
||||
# ports:
|
||||
# - "8767:8767"
|
||||
# networks:
|
||||
# - ws-net
|
||||
# depends_on:
|
||||
# - redis1
|
||||
push:
|
||||
entrypoint: ws push_server --host 0.0.0.0
|
||||
image: ${DOCKER_REPO}/ws:build
|
||||
|
||||
# proxy:
|
||||
# image: bsergean/ws:build
|
||||
# entrypoint: strace ws proxy_server --remote_host 'wss://cobra.addsrv.com' --host 0.0.0.0 --port 8765 -v
|
||||
# ports:
|
||||
# - "8765:8765"
|
||||
# networks:
|
||||
# - ws-net
|
||||
|
||||
#pyproxy:
|
||||
# image: bsergean/ws_proxy:build
|
||||
# entrypoint: /usr/bin/ws_proxy.py --remote_url 'wss://cobra.addsrv.com' --host 0.0.0.0 --port 8765
|
||||
# ports:
|
||||
# - "8765:8765"
|
||||
# networks:
|
||||
# - ws-net
|
||||
|
||||
# # ws:
|
||||
# # security_opt:
|
||||
# # - seccomp:unconfined
|
||||
# # cap_add:
|
||||
# # - SYS_PTRACE
|
||||
# # stdin_open: true
|
||||
# # tty: true
|
||||
# # image: bsergean/ws:build
|
||||
# # entrypoint: sh
|
||||
# # networks:
|
||||
# # - ws-net
|
||||
# # depends_on:
|
||||
# # - redis1
|
||||
# #
|
||||
# # redis1:
|
||||
# # image: redis:alpine
|
||||
# # networks:
|
||||
# # - ws-net
|
||||
# #
|
||||
# # statsd:
|
||||
# # image: jaconel/statsd
|
||||
# # ports:
|
||||
# # - "8125:8125"
|
||||
# # environment:
|
||||
# # - STATSD_DUMP_MSG=true
|
||||
# # - GRAPHITE_HOST=127.0.0.1
|
||||
# # networks:
|
||||
# # - ws-net
|
||||
|
||||
compile:
|
||||
image: alpine
|
||||
entrypoint: sh
|
||||
stdin_open: true
|
||||
tty: true
|
||||
volumes:
|
||||
- /Users/bsergeant/src/foss:/home/bsergean/src/foss
|
||||
|
||||
networks:
|
||||
ws-net:
|
||||
autoroute:
|
||||
entrypoint: ws autoroute ws://push:8008
|
||||
image: ${DOCKER_REPO}/ws:build
|
||||
depends_on:
|
||||
- push
|
||||
|
@ -20,7 +20,7 @@ RUN make ws_mbedtls_install && \
|
||||
|
||||
FROM alpine:3.12 as runtime
|
||||
|
||||
RUN apk add --no-cache libstdc++ mbedtls ca-certificates python3 && \
|
||||
RUN apk add --no-cache libstdc++ mbedtls ca-certificates python3 strace && \
|
||||
addgroup -S app && \
|
||||
adduser -S -G app app
|
||||
|
||||
|
@ -1,6 +1,111 @@
|
||||
# Changelog
|
||||
|
||||
All changes to this project will be documented in this file.
|
||||
|
||||
## [10.4.0] - 2020-09-12
|
||||
|
||||
(http server) read body request when the Content-Length is specified + set timeout to read the request to 30 seconds max by default, and make it configurable as a constructor parameter
|
||||
|
||||
## [10.3.5] - 2020-09-09
|
||||
|
||||
(ws) autoroute command exit on its own once all messages have been received
|
||||
|
||||
## [10.3.4] - 2020-09-04
|
||||
|
||||
(docker) ws docker file installs strace
|
||||
|
||||
## [10.3.3] - 2020-09-02
|
||||
|
||||
(ws) echo_client command renamed to autoroute. Command exit once the server close the connection. push_server commands exit once N messages have been sent.
|
||||
|
||||
## [10.3.2] - 2020-08-31
|
||||
|
||||
(ws + cobra bots) add a cobra_to_cobra ws subcommand to subscribe to a channel and republish received events to a different channel
|
||||
|
||||
## [10.3.1] - 2020-08-28
|
||||
|
||||
(socket servers) merge the ConnectionInfo class with the ConnectionState one, which simplify all the server apis
|
||||
|
||||
## [10.3.0] - 2020-08-26
|
||||
|
||||
(ws) set the main thread name, to help with debugging in XCode, gdb, lldb etc...
|
||||
|
||||
## [10.2.9] - 2020-08-19
|
||||
|
||||
(ws) cobra to python bot / take a module python name as argument foo.bar.baz instead of a path foo/bar/baz.py
|
||||
|
||||
## [10.2.8] - 2020-08-19
|
||||
|
||||
(ws) on Linux with mbedtls, when the system ca certs are specified (the default) pick up sensible OS supplied paths (tested with CentOS and Alpine)
|
||||
|
||||
## [10.2.7] - 2020-08-18
|
||||
|
||||
(ws push_server) on the server side, stop sending and close the connection when the remote end has disconnected
|
||||
|
||||
## [10.2.6] - 2020-08-17
|
||||
|
||||
(ixwebsocket) replace std::unique_ptr<unsigned char[]> with std::array for some fixed arrays (which are in C++11)
|
||||
|
||||
## [10.2.5] - 2020-08-15
|
||||
|
||||
(ws) merge all ws_*.cpp files into a single one to speedup compilation
|
||||
|
||||
## [10.2.4] - 2020-08-15
|
||||
|
||||
(socket server) in the loop accepting connections, call select without a timeout on unix to avoid busy looping, and only wake up when a new connection happens
|
||||
|
||||
## [10.2.3] - 2020-08-15
|
||||
|
||||
(socket server) instead of busy looping with a sleep, only wake up the GC thread when a new thread will have to be joined, (we know that thanks to the ConnectionState OnSetTerminated callback
|
||||
|
||||
## [10.2.2] - 2020-08-15
|
||||
|
||||
(socket server) add a callback to the ConnectionState to be invoked when the connection is terminated. This will be used by the SocketServer in the future to know on time that the associated connection thread can be terminated.
|
||||
|
||||
## [10.2.1] - 2020-08-15
|
||||
|
||||
(socket server) do not create a select interrupt object everytime when polling for notifications while waiting for new connections, instead use a persistent one which is a member variable
|
||||
|
||||
## [10.2.0] - 2020-08-14
|
||||
|
||||
(ixwebsocket client) handle HTTP redirects
|
||||
|
||||
## [10.2.0] - 2020-08-13
|
||||
|
||||
(ws) upgrade to latest version of nlohmann json (3.9.1 from 3.2.0)
|
||||
|
||||
## [10.1.9] - 2020-08-13
|
||||
|
||||
(websocket proxy server) add ability to map different hosts to different websocket servers, using a json config file
|
||||
|
||||
## [10.1.8] - 2020-08-12
|
||||
|
||||
(ws) on macOS, with OpenSSL or MbedTLS, use /etc/ssl/cert.pem as the system certs
|
||||
|
||||
## [10.1.7] - 2020-08-11
|
||||
|
||||
(ws) -q option imply info log level, not warning log level
|
||||
|
||||
## [10.1.6] - 2020-08-06
|
||||
|
||||
(websocket server) Handle programmer error when the server callback is not registered properly (fix #227)
|
||||
|
||||
## [10.1.5] - 2020-08-02
|
||||
|
||||
(ws) Add a new ws sub-command, push_server. This command runs a server which sends many messages in a loop to a websocket client. We can receive above 200,000 messages per second (cf #235).
|
||||
|
||||
## [10.1.4] - 2020-08-02
|
||||
|
||||
(ws) Add a new ws sub-command, echo_client. This command sends a message to an echo server, and send back to a server whatever message it does receive. When connecting to a local ws echo_server, on my MacBook Pro 2015 I can send/receive around 30,000 messages per second. (cf #235)
|
||||
|
||||
## [10.1.3] - 2020-08-02
|
||||
|
||||
(ws) ws echo_server. Add a -q option to only enable warning and error log levels. This is useful for bench-marking so that we do not print a lot of things on the console. (cf #235)
|
||||
|
||||
## [10.1.2] - 2020-07-31
|
||||
|
||||
(build) make using zlib optional, with the caveat that some http and websocket features are not available when zlib is absent
|
||||
|
||||
## [10.1.1] - 2020-07-29
|
||||
|
||||
(websocket client) onProgressCallback not called for short messages on a websocket (fix #233)
|
||||
|
@ -17,6 +17,7 @@ There is a unittest which can be executed by typing `make test`.
|
||||
|
||||
Options for building:
|
||||
|
||||
* `-DUSE_ZLIB=1` will enable zlib support, required for http client + server + websocket per message deflate extension
|
||||
* `-DUSE_TLS=1` will enable TLS support
|
||||
* `-DUSE_OPEN_SSL=1` will use [openssl](https://www.openssl.org/) for the TLS support (default on Linux and Windows)
|
||||
* `-DUSE_MBED_TLS=1` will use [mbedlts](https://tls.mbed.org/) for the TLS support
|
||||
|
37
docs/performance.md
Normal file
37
docs/performance.md
Normal file
@ -0,0 +1,37 @@
|
||||
|
||||
## WebSocket Client performance
|
||||
|
||||
We will run a client and a server on the same machine, connecting to localhost. This bench is run on a MacBook Pro from 2015. We can receive over 200,000 (small) messages per second, another way to put it is that it takes 5 micro-second to receive and process one message. This is an indication about the minimal latency to receive messages.
|
||||
|
||||
### Receiving messages
|
||||
|
||||
By using the push_server ws sub-command, the server will send the same message in a loop to any connected client.
|
||||
|
||||
```
|
||||
ws push_server -q --send_msg 'yo'
|
||||
```
|
||||
|
||||
By using the echo_client ws sub-command, with the -m (mute or no_send), we will display statistics on how many messages we can receive per second.
|
||||
|
||||
```
|
||||
$ ws echo_client -m ws://localhost:8008
|
||||
[2020-08-02 12:31:17.284] [info] ws_echo_client: connected
|
||||
[2020-08-02 12:31:17.284] [info] Uri: /
|
||||
[2020-08-02 12:31:17.284] [info] Headers:
|
||||
[2020-08-02 12:31:17.284] [info] Connection: Upgrade
|
||||
[2020-08-02 12:31:17.284] [info] Sec-WebSocket-Accept: byy/pMK2d0PtRwExaaiOnXJTQHo=
|
||||
[2020-08-02 12:31:17.284] [info] Server: ixwebsocket/10.1.4 macos ssl/SecureTransport zlib 1.2.11
|
||||
[2020-08-02 12:31:17.284] [info] Upgrade: websocket
|
||||
[2020-08-02 12:31:17.663] [info] messages received: 0 per second 2595307 total
|
||||
[2020-08-02 12:31:18.668] [info] messages received: 79679 per second 2674986 total
|
||||
[2020-08-02 12:31:19.668] [info] messages received: 207438 per second 2882424 total
|
||||
[2020-08-02 12:31:20.673] [info] messages received: 209207 per second 3091631 total
|
||||
[2020-08-02 12:31:21.676] [info] messages received: 216056 per second 3307687 total
|
||||
[2020-08-02 12:31:22.680] [info] messages received: 214927 per second 3522614 total
|
||||
[2020-08-02 12:31:23.684] [info] messages received: 216960 per second 3739574 total
|
||||
[2020-08-02 12:31:24.688] [info] messages received: 215232 per second 3954806 total
|
||||
[2020-08-02 12:31:25.691] [info] messages received: 212300 per second 4167106 total
|
||||
[2020-08-02 12:31:26.694] [info] messages received: 212501 per second 4379607 total
|
||||
[2020-08-02 12:31:27.699] [info] messages received: 212330 per second 4591937 total
|
||||
[2020-08-02 12:31:28.702] [info] messages received: 216511 per second 4808448 total
|
||||
```
|
@ -67,9 +67,28 @@ webSocket.stop()
|
||||
|
||||
### Sending messages
|
||||
|
||||
`websocket.send("foo")` will send a message.
|
||||
`WebSocketSendInfo result = websocket.send("foo")` will send a message.
|
||||
|
||||
If the connection was closed and sending failed, the return value will be set to false.
|
||||
If the connection was closed, sending will fail, and the success field of the result object will be set to false. There could also be a compression error in which case the compressError field will be set to true. The payloadSize field and wireSize fields will tell you respectively how much bytes the message weight, and how many bytes were sent on the wire (potentially compressed + counting the message header (a few bytes).
|
||||
|
||||
There is an optional progress callback that can be passed in as the second argument. If a message is large it will be fragmented into chunks which will be sent independantly. Everytime the we can write a fragment into the OS network cache, the callback will be invoked. If a user wants to cancel a slow send, false should be returned from within the callback.
|
||||
|
||||
Here is an example code snippet copied from the ws send sub-command. Each fragment weights 32K, so the total integer is the wireSize divided by 32K. As an example if you are sending 32M of data, uncompressed, total will be 1000. current will be set to 0 for the first fragment, then 1, 2 etc...
|
||||
|
||||
```
|
||||
auto result =
|
||||
_webSocket.sendBinary(serializedMsg, [this, throttle](int current, int total) -> bool {
|
||||
spdlog::info("ws_send: Step {} out of {}", current + 1, total);
|
||||
|
||||
if (throttle)
|
||||
{
|
||||
std::chrono::duration<double, std::milli> duration(10);
|
||||
std::this_thread::sleep_for(duration);
|
||||
}
|
||||
|
||||
return _connected;
|
||||
});
|
||||
```
|
||||
|
||||
### ReadyState
|
||||
|
||||
@ -261,10 +280,9 @@ ix::WebSocketServer server(port);
|
||||
|
||||
server.setOnConnectionCallback(
|
||||
[&server](std::weak_ptr<WebSocket> webSocket,
|
||||
std::shared_ptr<ConnectionState> connectionState,
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo)
|
||||
std::shared_ptr<ConnectionState> connectionState)
|
||||
{
|
||||
std::cout << "Remote ip: " << connectionInfo->remoteIp << std::endl;
|
||||
std::cout << "Remote ip: " << connectionState->remoteIp << std::endl;
|
||||
|
||||
auto ws = webSocket.lock();
|
||||
if (ws)
|
||||
@ -340,13 +358,12 @@ The webSocket reference is guaranteed to be always valid ; by design the callbac
|
||||
ix::WebSocketServer server(port);
|
||||
|
||||
server.setOnClientMessageCallback(std::shared_ptr<ConnectionState> connectionState,
|
||||
ConnectionInfo& connectionInfo,
|
||||
WebSocket& webSocket,
|
||||
const WebSocketMessagePtr& msg)
|
||||
{
|
||||
// The ConnectionInfo object contains information about the connection,
|
||||
// The ConnectionState object contains information about the connection,
|
||||
// at this point only the client ip address and the port.
|
||||
std::cout << "Remote ip: " << connectionInfo.remoteIp << std::endl;
|
||||
std::cout << "Remote ip: " << connectionState->getRemoteIp();
|
||||
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
@ -500,12 +517,11 @@ If you want to handle how requests are processed, implement the setOnConnectionC
|
||||
```cpp
|
||||
setOnConnectionCallback(
|
||||
[this](HttpRequestPtr request,
|
||||
std::shared_ptr<ConnectionState> /*connectionState*/,
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr
|
||||
std::shared_ptr<ConnectionState> connectionState) -> HttpResponsePtr
|
||||
{
|
||||
// Build a string for the response
|
||||
std::stringstream ss;
|
||||
ss << connectionInfo->remoteIp
|
||||
ss << connectionState->getRemoteIp();
|
||||
<< " "
|
||||
<< request->method
|
||||
<< " "
|
||||
|
18
docs/ws.md
18
docs/ws.md
@ -204,6 +204,24 @@ Listening on 127.0.0.1:8008
|
||||
|
||||
If you connect to ws://127.0.0.1:8008, the proxy will connect to ws://127.0.0.1:9000 and pass all traffic to this server.
|
||||
|
||||
You can also use a more complex setup if you want to redirect to different websocket servers based on the hostname your client is trying to connect to. If you have multiple CNAME aliases that point to the same server.
|
||||
|
||||
A JSON config file is used to express that mapping ; here connecting to echo.jeanserge.com will proxy the client to ws://localhost:8008 on the local machine (which actually runs ws echo_server), while connecting to bavarde.jeanserge.com will proxy the client to ws://localhost:5678 where a cobra python server is running. As a side note you will need a wildcard SSL certificate if you want to have SSL enabled on that machine.
|
||||
|
||||
```json
|
||||
{
|
||||
"remote_urls": {
|
||||
"echo.jeanserge.com": "ws://localhost:8008",
|
||||
"bavarde.jeanserge.com": "ws://localhost:5678"
|
||||
}
|
||||
}
|
||||
```
|
||||
The --config_path option is required to instruct ws proxy_server to read that file.
|
||||
|
||||
```
|
||||
ws proxy_server --config_path proxyConfig.json --port 8765
|
||||
```
|
||||
|
||||
## File transfer
|
||||
|
||||
```
|
||||
|
@ -5,6 +5,7 @@
|
||||
|
||||
set (IXBOTS_SOURCES
|
||||
ixbots/IXCobraBot.cpp
|
||||
ixbots/IXCobraToCobraBot.cpp
|
||||
ixbots/IXCobraToSentryBot.cpp
|
||||
ixbots/IXCobraToStatsdBot.cpp
|
||||
ixbots/IXCobraToStdoutBot.cpp
|
||||
@ -16,6 +17,7 @@ set (IXBOTS_SOURCES
|
||||
set (IXBOTS_HEADERS
|
||||
ixbots/IXCobraBot.h
|
||||
ixbots/IXCobraBotConfig.h
|
||||
ixbots/IXCobraToCobraBot.h
|
||||
ixbots/IXCobraToSentryBot.h
|
||||
ixbots/IXCobraToStatsdBot.h
|
||||
ixbots/IXCobraToStdoutBot.h
|
||||
|
43
ixbots/ixbots/IXCobraToCobraBot.cpp
Normal file
43
ixbots/ixbots/IXCobraToCobraBot.cpp
Normal file
@ -0,0 +1,43 @@
|
||||
/*
|
||||
* IXCobraToCobraBot.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
|
||||
#include "IXCobraToCobraBot.h"
|
||||
|
||||
#include "IXCobraBot.h"
|
||||
#include <ixcobra/IXCobraMetricsPublisher.h>
|
||||
#include <sstream>
|
||||
|
||||
namespace ix
|
||||
{
|
||||
int64_t cobra_to_cobra_bot(const ix::CobraBotConfig& cobraBotConfig,
|
||||
const std::string& republishChannel,
|
||||
const std::string& publisherRolename,
|
||||
const std::string& publisherRolesecret)
|
||||
{
|
||||
CobraBot bot;
|
||||
|
||||
CobraMetricsPublisher cobraMetricsPublisher;
|
||||
CobraConfig cobraPublisherConfig = cobraBotConfig.cobraConfig;
|
||||
cobraPublisherConfig.rolename = publisherRolename;
|
||||
cobraPublisherConfig.rolesecret = publisherRolesecret;
|
||||
cobraMetricsPublisher.configure(cobraPublisherConfig, republishChannel);
|
||||
|
||||
bot.setOnBotMessageCallback(
|
||||
[&republishChannel, &cobraMetricsPublisher](const Json::Value& msg,
|
||||
const std::string& /*position*/,
|
||||
std::atomic<bool>& /*throttled*/,
|
||||
std::atomic<bool>& /*fatalCobraError*/,
|
||||
std::atomic<uint64_t>& sentCount) -> void {
|
||||
Json::Value msgWithNoId(msg);
|
||||
msgWithNoId.removeMember("id");
|
||||
|
||||
cobraMetricsPublisher.push(republishChannel, msg);
|
||||
sentCount++;
|
||||
});
|
||||
|
||||
return bot.run(cobraBotConfig);
|
||||
}
|
||||
} // namespace ix
|
20
ixbots/ixbots/IXCobraToCobraBot.h
Normal file
20
ixbots/ixbots/IXCobraToCobraBot.h
Normal file
@ -0,0 +1,20 @@
|
||||
/*
|
||||
* IXCobraToCobraBot.h
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
#include <ixbots/IXStatsdClient.h>
|
||||
#include "IXCobraBotConfig.h"
|
||||
#include <stddef.h>
|
||||
#include <string>
|
||||
|
||||
namespace ix
|
||||
{
|
||||
int64_t cobra_to_cobra_bot(const ix::CobraBotConfig& config,
|
||||
const std::string& republishChannel,
|
||||
const std::string& publisherRolename,
|
||||
const std::string& publisherRolesecret);
|
||||
} // namespace ix
|
@ -102,7 +102,7 @@ namespace ix
|
||||
{
|
||||
int64_t cobra_to_python_bot(const ix::CobraBotConfig& config,
|
||||
StatsdClient& statsdClient,
|
||||
const std::string& scriptPath)
|
||||
const std::string& moduleName)
|
||||
{
|
||||
#ifndef IXBOTS_USE_PYTHON
|
||||
CoreLogger::error("Command is disabled. "
|
||||
@ -113,10 +113,7 @@ namespace ix
|
||||
Py_InitializeEx(0); // 0 arg so that we do not install signal handlers
|
||||
// which prevent us from using Ctrl-C
|
||||
|
||||
size_t lastIndex = scriptPath.find_last_of(".");
|
||||
std::string modulePath = scriptPath.substr(0, lastIndex);
|
||||
|
||||
PyObject* pyModuleName = PyUnicode_DecodeFSDefault(modulePath.c_str());
|
||||
PyObject* pyModuleName = PyUnicode_DecodeFSDefault(moduleName.c_str());
|
||||
|
||||
if (pyModuleName == nullptr)
|
||||
{
|
||||
|
@ -15,5 +15,5 @@ namespace ix
|
||||
{
|
||||
int64_t cobra_to_python_bot(const ix::CobraBotConfig& config,
|
||||
StatsdClient& statsdClient,
|
||||
const std::string& scriptPath);
|
||||
const std::string& moduleName);
|
||||
} // namespace ix
|
||||
|
@ -24,6 +24,7 @@ namespace ix
|
||||
{
|
||||
_cobra_connection.setEventCallback([](const CobraEventPtr& event) {
|
||||
std::stringstream ss;
|
||||
ix::LogLevel logLevel = LogLevel::Info;
|
||||
|
||||
if (event->type == ix::CobraEventType::Open)
|
||||
{
|
||||
@ -41,6 +42,7 @@ namespace ix
|
||||
else if (event->type == ix::CobraEventType::Error)
|
||||
{
|
||||
ss << "Error: " << event->errMsg;
|
||||
logLevel = ix::LogLevel::Error;
|
||||
}
|
||||
else if (event->type == ix::CobraEventType::Closed)
|
||||
{
|
||||
@ -57,6 +59,7 @@ namespace ix
|
||||
else if (event->type == ix::CobraEventType::Published)
|
||||
{
|
||||
ss << "Published message " << event->msgId << " acked";
|
||||
logLevel = ix::LogLevel::Debug;
|
||||
}
|
||||
else if (event->type == ix::CobraEventType::Pong)
|
||||
{
|
||||
@ -65,17 +68,20 @@ namespace ix
|
||||
else if (event->type == ix::CobraEventType::HandshakeError)
|
||||
{
|
||||
ss << "Handshake error: " << event->errMsg;
|
||||
logLevel = ix::LogLevel::Error;
|
||||
}
|
||||
else if (event->type == ix::CobraEventType::AuthenticationError)
|
||||
{
|
||||
ss << "Authentication error: " << event->errMsg;
|
||||
logLevel = ix::LogLevel::Error;
|
||||
}
|
||||
else if (event->type == ix::CobraEventType::SubscriptionError)
|
||||
{
|
||||
ss << "Subscription error: " << event->errMsg;
|
||||
logLevel = ix::LogLevel::Error;
|
||||
}
|
||||
|
||||
CoreLogger::log(ss.str().c_str());
|
||||
CoreLogger::log(ss.str().c_str(), logLevel);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -45,10 +45,9 @@ namespace ix
|
||||
}
|
||||
|
||||
void RedisServer::handleConnection(std::unique_ptr<Socket> socket,
|
||||
std::shared_ptr<ConnectionState> connectionState,
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo)
|
||||
std::shared_ptr<ConnectionState> connectionState)
|
||||
{
|
||||
logInfo("New connection from remote ip " + connectionInfo->remoteIp);
|
||||
logInfo("New connection from remote ip " + connectionState->getRemoteIp());
|
||||
|
||||
_connectedClientsCount++;
|
||||
|
||||
|
@ -44,8 +44,7 @@ namespace ix
|
||||
|
||||
// Methods
|
||||
virtual void handleConnection(std::unique_ptr<Socket>,
|
||||
std::shared_ptr<ConnectionState> connectionState,
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo) final;
|
||||
std::shared_ptr<ConnectionState> connectionState) final;
|
||||
virtual size_t getConnectedClientsCount() final;
|
||||
|
||||
bool startsWith(const std::string& str, const std::string& start);
|
||||
|
@ -61,11 +61,10 @@ namespace snake
|
||||
|
||||
_server.setOnClientMessageCallback(
|
||||
[this](std::shared_ptr<ix::ConnectionState> connectionState,
|
||||
ix::ConnectionInfo& connectionInfo,
|
||||
ix::WebSocket& webSocket,
|
||||
const ix::WebSocketMessagePtr& msg) {
|
||||
auto state = std::dynamic_pointer_cast<SnakeConnectionState>(connectionState);
|
||||
auto remoteIp = connectionInfo.remoteIp;
|
||||
auto remoteIp = connectionState->getRemoteIp();
|
||||
|
||||
std::stringstream ss;
|
||||
ss << "[" << state->getId() << "] ";
|
||||
|
@ -12,10 +12,8 @@ namespace ix
|
||||
{
|
||||
Bench::Bench(const std::string& description)
|
||||
: _description(description)
|
||||
, _start(std::chrono::high_resolution_clock::now())
|
||||
, _reported(false)
|
||||
{
|
||||
;
|
||||
reset();
|
||||
}
|
||||
|
||||
Bench::~Bench()
|
||||
@ -26,6 +24,12 @@ namespace ix
|
||||
}
|
||||
}
|
||||
|
||||
void Bench::reset()
|
||||
{
|
||||
_start = std::chrono::high_resolution_clock::now();
|
||||
_reported = false;
|
||||
}
|
||||
|
||||
void Bench::report()
|
||||
{
|
||||
auto now = std::chrono::high_resolution_clock::now();
|
||||
|
@ -3,6 +3,7 @@
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2017-2020 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include <chrono>
|
||||
#include <stdint.h>
|
||||
@ -16,6 +17,7 @@ namespace ix
|
||||
Bench(const std::string& description);
|
||||
~Bench();
|
||||
|
||||
void reset();
|
||||
void report();
|
||||
uint64_t getDuration() const;
|
||||
|
||||
|
@ -1,25 +0,0 @@
|
||||
/*
|
||||
* IXConnectionInfo.h
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <string>
|
||||
|
||||
namespace ix
|
||||
{
|
||||
struct ConnectionInfo
|
||||
{
|
||||
std::string remoteIp;
|
||||
int remotePort;
|
||||
|
||||
ConnectionInfo(const std::string& r = std::string(), int p = 0)
|
||||
: remoteIp(r)
|
||||
, remotePort(p)
|
||||
{
|
||||
;
|
||||
}
|
||||
};
|
||||
} // namespace ix
|
@ -31,6 +31,11 @@ namespace ix
|
||||
return std::make_shared<ConnectionState>();
|
||||
}
|
||||
|
||||
void ConnectionState::setOnSetTerminatedCallback(const OnSetTerminatedCallback& callback)
|
||||
{
|
||||
_onSetTerminatedCallback = callback;
|
||||
}
|
||||
|
||||
bool ConnectionState::isTerminated() const
|
||||
{
|
||||
return _terminated;
|
||||
@ -39,5 +44,30 @@ namespace ix
|
||||
void ConnectionState::setTerminated()
|
||||
{
|
||||
_terminated = true;
|
||||
|
||||
if (_onSetTerminatedCallback)
|
||||
{
|
||||
_onSetTerminatedCallback();
|
||||
}
|
||||
}
|
||||
|
||||
const std::string& ConnectionState::getRemoteIp()
|
||||
{
|
||||
return _remoteIp;
|
||||
}
|
||||
|
||||
int ConnectionState::getRemotePort()
|
||||
{
|
||||
return _remotePort;
|
||||
}
|
||||
|
||||
void ConnectionState::setRemoteIp(const std::string& remoteIp)
|
||||
{
|
||||
_remoteIp = remoteIp;
|
||||
}
|
||||
|
||||
void ConnectionState::setRemotePort(int remotePort)
|
||||
{
|
||||
_remotePort = remotePort;
|
||||
}
|
||||
} // namespace ix
|
||||
|
@ -7,12 +7,15 @@
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <stdint.h>
|
||||
#include <string>
|
||||
|
||||
namespace ix
|
||||
{
|
||||
using OnSetTerminatedCallback = std::function<void()>;
|
||||
|
||||
class ConnectionState
|
||||
{
|
||||
public:
|
||||
@ -25,12 +28,27 @@ namespace ix
|
||||
void setTerminated();
|
||||
bool isTerminated() const;
|
||||
|
||||
const std::string& getRemoteIp();
|
||||
int getRemotePort();
|
||||
|
||||
static std::shared_ptr<ConnectionState> createConnectionState();
|
||||
|
||||
private:
|
||||
void setOnSetTerminatedCallback(const OnSetTerminatedCallback& callback);
|
||||
|
||||
void setRemoteIp(const std::string& remoteIp);
|
||||
void setRemotePort(int remotePort);
|
||||
|
||||
protected:
|
||||
std::atomic<bool> _terminated;
|
||||
std::string _id;
|
||||
OnSetTerminatedCallback _onSetTerminatedCallback;
|
||||
|
||||
static std::atomic<uint64_t> _globalId;
|
||||
|
||||
std::string _remoteIp;
|
||||
int _remotePort;
|
||||
|
||||
friend class SocketServer;
|
||||
};
|
||||
} // namespace ix
|
||||
|
@ -93,14 +93,12 @@ namespace ix
|
||||
}
|
||||
|
||||
std::tuple<bool, std::string, HttpRequestPtr> Http::parseRequest(
|
||||
std::unique_ptr<Socket>& socket)
|
||||
std::unique_ptr<Socket>& socket, int timeoutSecs)
|
||||
{
|
||||
HttpRequestPtr httpRequest;
|
||||
|
||||
std::atomic<bool> requestInitCancellation(false);
|
||||
|
||||
int timeoutSecs = 5; // FIXME
|
||||
|
||||
auto isCancellationRequested =
|
||||
makeCancellationRequestWithTimeout(timeoutSecs, requestInitCancellation);
|
||||
|
||||
@ -130,7 +128,36 @@ namespace ix
|
||||
return std::make_tuple(false, "Error parsing HTTP headers", httpRequest);
|
||||
}
|
||||
|
||||
httpRequest = std::make_shared<HttpRequest>(uri, method, httpVersion, headers);
|
||||
std::string body;
|
||||
if (headers.find("Content-Length") != headers.end())
|
||||
{
|
||||
int contentLength = 0;
|
||||
try
|
||||
{
|
||||
contentLength = std::stoi(headers["Content-Length"]);
|
||||
}
|
||||
catch (std::exception)
|
||||
{
|
||||
return std::make_tuple(
|
||||
false, "Error parsing HTTP Header 'Content-Length'", httpRequest);
|
||||
}
|
||||
|
||||
if (contentLength < 0)
|
||||
{
|
||||
return std::make_tuple(
|
||||
false, "Error: 'Content-Length' should be a positive integer", httpRequest);
|
||||
}
|
||||
|
||||
auto res = socket->readBytes(contentLength, nullptr, isCancellationRequested);
|
||||
if (!res.first)
|
||||
{
|
||||
return std::make_tuple(
|
||||
false, std::string("Error reading request: ") + res.second, httpRequest);
|
||||
}
|
||||
body = res.second;
|
||||
}
|
||||
|
||||
httpRequest = std::make_shared<HttpRequest>(uri, method, httpVersion, body, headers);
|
||||
return std::make_tuple(true, "", httpRequest);
|
||||
}
|
||||
|
||||
|
@ -95,15 +95,18 @@ namespace ix
|
||||
std::string uri;
|
||||
std::string method;
|
||||
std::string version;
|
||||
std::string body;
|
||||
WebSocketHttpHeaders headers;
|
||||
|
||||
HttpRequest(const std::string& u,
|
||||
const std::string& m,
|
||||
const std::string& v,
|
||||
const std::string& b,
|
||||
const WebSocketHttpHeaders& h = WebSocketHttpHeaders())
|
||||
: uri(u)
|
||||
, method(m)
|
||||
, version(v)
|
||||
, body(b)
|
||||
, headers(h)
|
||||
{
|
||||
}
|
||||
@ -115,7 +118,7 @@ namespace ix
|
||||
{
|
||||
public:
|
||||
static std::tuple<bool, std::string, HttpRequestPtr> parseRequest(
|
||||
std::unique_ptr<Socket>& socket);
|
||||
std::unique_ptr<Socket>& socket, int timeoutSecs);
|
||||
static bool sendResponse(HttpResponsePtr response, std::unique_ptr<Socket>& socket);
|
||||
|
||||
static std::pair<std::string, int> parseStatusLine(const std::string& line);
|
||||
|
@ -10,13 +10,17 @@
|
||||
#include "IXUrlParser.h"
|
||||
#include "IXUserAgent.h"
|
||||
#include "IXWebSocketHttpHeaders.h"
|
||||
#include <array>
|
||||
#include <assert.h>
|
||||
#include <cstring>
|
||||
#include <iomanip>
|
||||
#include <random>
|
||||
#include <sstream>
|
||||
#include <vector>
|
||||
|
||||
#ifdef IXWEBSOCKET_USE_ZLIB
|
||||
#include <zlib.h>
|
||||
#endif
|
||||
|
||||
namespace ix
|
||||
{
|
||||
@ -174,11 +178,13 @@ namespace ix
|
||||
ss << verb << " " << path << " HTTP/1.1\r\n";
|
||||
ss << "Host: " << host << "\r\n";
|
||||
|
||||
#ifdef IXWEBSOCKET_USE_ZLIB
|
||||
if (args->compress)
|
||||
{
|
||||
ss << "Accept-Encoding: gzip"
|
||||
<< "\r\n";
|
||||
}
|
||||
#endif
|
||||
|
||||
// Append extra headers
|
||||
for (auto&& it : args->extraHeaders)
|
||||
@ -495,6 +501,7 @@ namespace ix
|
||||
|
||||
downloadSize = payload.size();
|
||||
|
||||
#ifdef IXWEBSOCKET_USE_ZLIB
|
||||
// If the content was compressed with gzip, decode it
|
||||
if (headers["Content-Encoding"] == "gzip")
|
||||
{
|
||||
@ -513,6 +520,7 @@ namespace ix
|
||||
}
|
||||
payload = decompressedPayload;
|
||||
}
|
||||
#endif
|
||||
|
||||
return std::make_shared<HttpResponse>(code,
|
||||
description,
|
||||
@ -672,6 +680,7 @@ namespace ix
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
#ifdef IXWEBSOCKET_USE_ZLIB
|
||||
bool HttpClient::gzipInflate(const std::string& in, std::string& out)
|
||||
{
|
||||
z_stream inflateState;
|
||||
@ -692,14 +701,12 @@ namespace ix
|
||||
inflateState.next_in = (unsigned char*) (const_cast<char*>(in.data()));
|
||||
|
||||
const int kBufferSize = 1 << 14;
|
||||
|
||||
std::unique_ptr<unsigned char[]> compressBuffer =
|
||||
std::make_unique<unsigned char[]>(kBufferSize);
|
||||
std::array<unsigned char, kBufferSize> compressBuffer;
|
||||
|
||||
do
|
||||
{
|
||||
inflateState.avail_out = (uInt) kBufferSize;
|
||||
inflateState.next_out = compressBuffer.get();
|
||||
inflateState.next_out = &compressBuffer.front();
|
||||
|
||||
int ret = inflate(&inflateState, Z_SYNC_FLUSH);
|
||||
|
||||
@ -709,13 +716,14 @@ namespace ix
|
||||
return false;
|
||||
}
|
||||
|
||||
out.append(reinterpret_cast<char*>(compressBuffer.get()),
|
||||
out.append(reinterpret_cast<char*>(&compressBuffer.front()),
|
||||
kBufferSize - inflateState.avail_out);
|
||||
} while (inflateState.avail_out == 0);
|
||||
|
||||
inflateEnd(&inflateState);
|
||||
return true;
|
||||
}
|
||||
#endif
|
||||
|
||||
void HttpClient::log(const std::string& msg, HttpRequestArgsPtr args)
|
||||
{
|
||||
|
@ -90,7 +90,9 @@ namespace ix
|
||||
private:
|
||||
void log(const std::string& msg, HttpRequestArgsPtr args);
|
||||
|
||||
#ifdef IXWEBSOCKET_USE_ZLIB
|
||||
bool gzipInflate(const std::string& in, std::string& out);
|
||||
#endif
|
||||
|
||||
// Async API background thread runner
|
||||
void run();
|
||||
|
@ -13,7 +13,10 @@
|
||||
#include <fstream>
|
||||
#include <sstream>
|
||||
#include <vector>
|
||||
|
||||
#ifdef IXWEBSOCKET_USE_ZLIB
|
||||
#include <zlib.h>
|
||||
#endif
|
||||
|
||||
namespace
|
||||
{
|
||||
@ -41,6 +44,7 @@ namespace
|
||||
return std::make_pair(res.first, std::string(vec.begin(), vec.end()));
|
||||
}
|
||||
|
||||
#ifdef IXWEBSOCKET_USE_ZLIB
|
||||
std::string gzipCompress(const std::string& str)
|
||||
{
|
||||
z_stream zs; // z_stream is zlib's control structure
|
||||
@ -83,14 +87,22 @@ namespace
|
||||
|
||||
return outstring;
|
||||
}
|
||||
#endif
|
||||
} // namespace
|
||||
|
||||
namespace ix
|
||||
{
|
||||
HttpServer::HttpServer(
|
||||
int port, const std::string& host, int backlog, size_t maxConnections, int addressFamily)
|
||||
const int HttpServer::kDefaultTimeoutSecs(30);
|
||||
|
||||
HttpServer::HttpServer(int port,
|
||||
const std::string& host,
|
||||
int backlog,
|
||||
size_t maxConnections,
|
||||
int addressFamily,
|
||||
int timeoutSecs)
|
||||
: SocketServer(port, host, backlog, maxConnections, addressFamily)
|
||||
, _connectedClientsCount(0)
|
||||
, _timeoutSecs(timeoutSecs)
|
||||
{
|
||||
setDefaultConnectionCallback();
|
||||
}
|
||||
@ -115,18 +127,16 @@ namespace ix
|
||||
}
|
||||
|
||||
void HttpServer::handleConnection(std::unique_ptr<Socket> socket,
|
||||
std::shared_ptr<ConnectionState> connectionState,
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo)
|
||||
std::shared_ptr<ConnectionState> connectionState)
|
||||
{
|
||||
_connectedClientsCount++;
|
||||
|
||||
auto ret = Http::parseRequest(socket);
|
||||
auto ret = Http::parseRequest(socket, _timeoutSecs);
|
||||
// FIXME: handle errors in parseRequest
|
||||
|
||||
if (std::get<0>(ret))
|
||||
{
|
||||
auto response =
|
||||
_onConnectionCallback(std::get<2>(ret), connectionState, std::move(connectionInfo));
|
||||
auto response = _onConnectionCallback(std::get<2>(ret), connectionState);
|
||||
if (!Http::sendResponse(response, socket))
|
||||
{
|
||||
logError("Cannot send response");
|
||||
@ -146,8 +156,7 @@ namespace ix
|
||||
{
|
||||
setOnConnectionCallback(
|
||||
[this](HttpRequestPtr request,
|
||||
std::shared_ptr<ConnectionState> /*connectionState*/,
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr {
|
||||
std::shared_ptr<ConnectionState> connectionState) -> HttpResponsePtr {
|
||||
std::string uri(request->uri);
|
||||
if (uri.empty() || uri == "/")
|
||||
{
|
||||
@ -168,17 +177,19 @@ namespace ix
|
||||
|
||||
std::string content = res.second;
|
||||
|
||||
#ifdef IXWEBSOCKET_USE_ZLIB
|
||||
std::string acceptEncoding = request->headers["Accept-encoding"];
|
||||
if (acceptEncoding == "*" || acceptEncoding.find("gzip") != std::string::npos)
|
||||
{
|
||||
content = gzipCompress(content);
|
||||
headers["Content-Encoding"] = "gzip";
|
||||
}
|
||||
#endif
|
||||
|
||||
// Log request
|
||||
std::stringstream ss;
|
||||
ss << connectionInfo->remoteIp << ":" << connectionInfo->remotePort << " "
|
||||
<< request->method << " " << request->headers["User-Agent"] << " "
|
||||
ss << connectionState->getRemoteIp() << ":" << connectionState->getRemotePort()
|
||||
<< " " << request->method << " " << request->headers["User-Agent"] << " "
|
||||
<< request->uri << " " << content.size();
|
||||
logInfo(ss.str());
|
||||
|
||||
@ -202,16 +213,16 @@ namespace ix
|
||||
// See https://developer.mozilla.org/en-US/docs/Web/HTTP/Redirections
|
||||
//
|
||||
setOnConnectionCallback(
|
||||
[this, redirectUrl](HttpRequestPtr request,
|
||||
std::shared_ptr<ConnectionState> /*connectionState*/,
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr {
|
||||
[this,
|
||||
redirectUrl](HttpRequestPtr request,
|
||||
std::shared_ptr<ConnectionState> connectionState) -> HttpResponsePtr {
|
||||
WebSocketHttpHeaders headers;
|
||||
headers["Server"] = userAgent();
|
||||
|
||||
// Log request
|
||||
std::stringstream ss;
|
||||
ss << connectionInfo->remoteIp << ":" << connectionInfo->remotePort << " "
|
||||
<< request->method << " " << request->headers["User-Agent"] << " "
|
||||
ss << connectionState->getRemoteIp() << ":" << connectionState->getRemotePort()
|
||||
<< " " << request->method << " " << request->headers["User-Agent"] << " "
|
||||
<< request->uri;
|
||||
logInfo(ss.str());
|
||||
|
||||
|
@ -23,15 +23,14 @@ namespace ix
|
||||
{
|
||||
public:
|
||||
using OnConnectionCallback =
|
||||
std::function<HttpResponsePtr(HttpRequestPtr,
|
||||
std::shared_ptr<ConnectionState>,
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo)>;
|
||||
std::function<HttpResponsePtr(HttpRequestPtr, std::shared_ptr<ConnectionState>)>;
|
||||
|
||||
HttpServer(int port = SocketServer::kDefaultPort,
|
||||
const std::string& host = SocketServer::kDefaultHost,
|
||||
int backlog = SocketServer::kDefaultTcpBacklog,
|
||||
size_t maxConnections = SocketServer::kDefaultMaxConnections,
|
||||
int addressFamily = SocketServer::kDefaultAddressFamily);
|
||||
int addressFamily = SocketServer::kDefaultAddressFamily,
|
||||
int timeoutSecs = HttpServer::kDefaultTimeoutSecs);
|
||||
virtual ~HttpServer();
|
||||
virtual void stop() final;
|
||||
|
||||
@ -44,10 +43,12 @@ namespace ix
|
||||
OnConnectionCallback _onConnectionCallback;
|
||||
std::atomic<int> _connectedClientsCount;
|
||||
|
||||
const static int kDefaultTimeoutSecs;
|
||||
int _timeoutSecs;
|
||||
|
||||
// Methods
|
||||
virtual void handleConnection(std::unique_ptr<Socket>,
|
||||
std::shared_ptr<ConnectionState> connectionState,
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo) final;
|
||||
std::shared_ptr<ConnectionState> connectionState) final;
|
||||
virtual size_t getConnectedClientsCount() final;
|
||||
|
||||
void setDefaultConnectionCallback();
|
||||
|
@ -8,6 +8,9 @@
|
||||
|
||||
namespace ix
|
||||
{
|
||||
const uint64_t SelectInterrupt::kSendRequest = 1;
|
||||
const uint64_t SelectInterrupt::kCloseRequest = 2;
|
||||
|
||||
SelectInterrupt::SelectInterrupt()
|
||||
{
|
||||
;
|
||||
|
@ -6,6 +6,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
#include <stdint.h>
|
||||
#include <string>
|
||||
|
||||
@ -23,5 +24,11 @@ namespace ix
|
||||
virtual bool clear();
|
||||
virtual uint64_t read();
|
||||
virtual int getFd() const;
|
||||
|
||||
// Used as special codes for pipe communication
|
||||
static const uint64_t kSendRequest;
|
||||
static const uint64_t kCloseRequest;
|
||||
};
|
||||
|
||||
using SelectInterruptPtr = std::unique_ptr<SelectInterrupt>;
|
||||
} // namespace ix
|
||||
|
@ -27,8 +27,6 @@ namespace ix
|
||||
{
|
||||
const int Socket::kDefaultPollNoTimeout = -1; // No poll timeout by default
|
||||
const int Socket::kDefaultPollTimeout = kDefaultPollNoTimeout;
|
||||
const uint64_t Socket::kSendRequest = 1;
|
||||
const uint64_t Socket::kCloseRequest = 2;
|
||||
constexpr size_t Socket::kChunkSize;
|
||||
|
||||
Socket::Socket(int fd)
|
||||
@ -96,11 +94,11 @@ namespace ix
|
||||
{
|
||||
uint64_t value = selectInterrupt->read();
|
||||
|
||||
if (value == kSendRequest)
|
||||
if (value == SelectInterrupt::kSendRequest)
|
||||
{
|
||||
pollResult = PollResultType::SendRequest;
|
||||
}
|
||||
else if (value == kCloseRequest)
|
||||
else if (value == SelectInterrupt::kCloseRequest)
|
||||
{
|
||||
pollResult = PollResultType::CloseRequest;
|
||||
}
|
||||
|
@ -34,12 +34,10 @@ typedef SSIZE_T ssize_t;
|
||||
|
||||
#include "IXCancellationRequest.h"
|
||||
#include "IXProgressCallback.h"
|
||||
#include "IXSelectInterrupt.h"
|
||||
|
||||
namespace ix
|
||||
{
|
||||
class SelectInterrupt;
|
||||
using SelectInterruptPtr = std::unique_ptr<SelectInterrupt>;
|
||||
|
||||
enum class PollResultType
|
||||
{
|
||||
ReadyForRead = 0,
|
||||
@ -96,11 +94,6 @@ namespace ix
|
||||
int sockfd,
|
||||
const SelectInterruptPtr& selectInterrupt);
|
||||
|
||||
|
||||
// Used as special codes for pipe communication
|
||||
static const uint64_t kSendRequest;
|
||||
static const uint64_t kCloseRequest;
|
||||
|
||||
protected:
|
||||
std::atomic<int> _sockfd;
|
||||
std::mutex _socketMutex;
|
||||
|
@ -8,6 +8,7 @@
|
||||
|
||||
#include "IXNetSystem.h"
|
||||
#include "IXSelectInterrupt.h"
|
||||
#include "IXSelectInterruptFactory.h"
|
||||
#include "IXSetThreadName.h"
|
||||
#include "IXSocket.h"
|
||||
#include "IXSocketConnect.h"
|
||||
@ -36,6 +37,7 @@ namespace ix
|
||||
, _stop(false)
|
||||
, _stopGc(false)
|
||||
, _connectionStateFactory(&ConnectionState::createConnectionState)
|
||||
, _acceptSelectInterrupt(createSelectInterrupt())
|
||||
{
|
||||
}
|
||||
|
||||
@ -58,6 +60,16 @@ namespace ix
|
||||
|
||||
std::pair<bool, std::string> SocketServer::listen()
|
||||
{
|
||||
std::string acceptSelectInterruptInitErrorMsg;
|
||||
if (!_acceptSelectInterrupt->init(acceptSelectInterruptInitErrorMsg))
|
||||
{
|
||||
std::stringstream ss;
|
||||
ss << "SocketServer::listen() error in SelectInterrupt::init: "
|
||||
<< acceptSelectInterruptInitErrorMsg;
|
||||
|
||||
return std::make_pair(false, ss.str());
|
||||
}
|
||||
|
||||
if (_addressFamily != AF_INET && _addressFamily != AF_INET6)
|
||||
{
|
||||
std::string errMsg("SocketServer::listen() AF_INET and AF_INET6 are currently "
|
||||
@ -193,6 +205,12 @@ namespace ix
|
||||
if (_thread.joinable())
|
||||
{
|
||||
_stop = true;
|
||||
// Wake up select
|
||||
if (!_acceptSelectInterrupt->notify(SelectInterrupt::kCloseRequest))
|
||||
{
|
||||
logError("SocketServer::stop: Cannot wake up from select");
|
||||
}
|
||||
|
||||
_thread.join();
|
||||
_stop = false;
|
||||
}
|
||||
@ -201,6 +219,7 @@ namespace ix
|
||||
if (_gcThread.joinable())
|
||||
{
|
||||
_stopGc = true;
|
||||
_conditionVariableGC.notify_one();
|
||||
_gcThread.join();
|
||||
_stopGc = false;
|
||||
}
|
||||
@ -249,18 +268,22 @@ namespace ix
|
||||
// Set the socket to non blocking mode, so that accept calls are not blocking
|
||||
SocketConnect::configure(_serverFd);
|
||||
|
||||
setThreadName("SocketServer::listen");
|
||||
setThreadName("SocketServer::accept");
|
||||
|
||||
for (;;)
|
||||
{
|
||||
if (_stop) return;
|
||||
|
||||
// Use poll to check whether a new connection is in progress
|
||||
int timeoutMs = 10;
|
||||
int timeoutMs = -1;
|
||||
#ifdef _WIN32
|
||||
// select cannot be interrupted on Windows so we need to pass a small timeout
|
||||
timeoutMs = 10;
|
||||
#endif
|
||||
|
||||
bool readyToRead = true;
|
||||
auto selectInterrupt = std::make_unique<SelectInterrupt>();
|
||||
PollResultType pollResult =
|
||||
Socket::poll(readyToRead, timeoutMs, _serverFd, selectInterrupt);
|
||||
Socket::poll(readyToRead, timeoutMs, _serverFd, _acceptSelectInterrupt);
|
||||
|
||||
if (pollResult == PollResultType::Error)
|
||||
{
|
||||
@ -308,12 +331,14 @@ namespace ix
|
||||
continue;
|
||||
}
|
||||
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo;
|
||||
// Retrieve connection info, the ip address of the remote peer/client)
|
||||
std::string remoteIp;
|
||||
int remotePort;
|
||||
|
||||
if (_addressFamily == AF_INET)
|
||||
{
|
||||
char remoteIp[INET_ADDRSTRLEN];
|
||||
if (inet_ntop(AF_INET, &client.sin_addr, remoteIp, INET_ADDRSTRLEN) == nullptr)
|
||||
char remoteIp4[INET_ADDRSTRLEN];
|
||||
if (inet_ntop(AF_INET, &client.sin_addr, remoteIp4, INET_ADDRSTRLEN) == nullptr)
|
||||
{
|
||||
int err = Socket::getErrno();
|
||||
std::stringstream ss;
|
||||
@ -326,12 +351,13 @@ namespace ix
|
||||
continue;
|
||||
}
|
||||
|
||||
connectionInfo = std::make_unique<ConnectionInfo>(remoteIp, client.sin_port);
|
||||
remotePort = client.sin_port;
|
||||
remoteIp = remoteIp4;
|
||||
}
|
||||
else // AF_INET6
|
||||
{
|
||||
char remoteIp[INET6_ADDRSTRLEN];
|
||||
if (inet_ntop(AF_INET6, &client.sin_addr, remoteIp, INET6_ADDRSTRLEN) == nullptr)
|
||||
char remoteIp6[INET6_ADDRSTRLEN];
|
||||
if (inet_ntop(AF_INET6, &client.sin_addr, remoteIp6, INET6_ADDRSTRLEN) == nullptr)
|
||||
{
|
||||
int err = Socket::getErrno();
|
||||
std::stringstream ss;
|
||||
@ -344,7 +370,8 @@ namespace ix
|
||||
continue;
|
||||
}
|
||||
|
||||
connectionInfo = std::make_unique<ConnectionInfo>(remoteIp, client.sin_port);
|
||||
remotePort = client.sin_port;
|
||||
remoteIp = remoteIp6;
|
||||
}
|
||||
|
||||
std::shared_ptr<ConnectionState> connectionState;
|
||||
@ -352,6 +379,9 @@ namespace ix
|
||||
{
|
||||
connectionState = _connectionStateFactory();
|
||||
}
|
||||
connectionState->setOnSetTerminatedCallback([this] { onSetTerminatedCallback(); });
|
||||
connectionState->setRemoteIp(remoteIp);
|
||||
connectionState->setRemotePort(remotePort);
|
||||
|
||||
if (_stop) return;
|
||||
|
||||
@ -379,13 +409,10 @@ namespace ix
|
||||
|
||||
// Launch the handleConnection work asynchronously in its own thread.
|
||||
std::lock_guard<std::mutex> lock(_connectionsThreadsMutex);
|
||||
_connectionsThreads.push_back(
|
||||
std::make_pair(connectionState,
|
||||
std::thread(&SocketServer::handleConnection,
|
||||
this,
|
||||
std::move(socket),
|
||||
connectionState,
|
||||
std::move(connectionInfo))));
|
||||
_connectionsThreads.push_back(std::make_pair(
|
||||
connectionState,
|
||||
std::thread(
|
||||
&SocketServer::handleConnection, this, std::move(socket), connectionState)));
|
||||
}
|
||||
}
|
||||
|
||||
@ -411,8 +438,14 @@ namespace ix
|
||||
break;
|
||||
}
|
||||
|
||||
// Sleep a little bit then keep cleaning up
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
||||
// Unless we are stopping the server, wait for a connection
|
||||
// to be terminated to run the threads GC, instead of busy waiting
|
||||
// with a sleep
|
||||
if (!_stopGc)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(_conditionVariableMutexGC);
|
||||
_conditionVariableGC.wait(lock);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -420,4 +453,11 @@ namespace ix
|
||||
{
|
||||
_socketTLSOptions = socketTLSOptions;
|
||||
}
|
||||
|
||||
void SocketServer::onSetTerminatedCallback()
|
||||
{
|
||||
// a connection got terminated, we can run the connection thread GC,
|
||||
// so wake up the thread responsible for that
|
||||
_conditionVariableGC.notify_one();
|
||||
}
|
||||
} // namespace ix
|
||||
|
@ -6,8 +6,8 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "IXConnectionInfo.h"
|
||||
#include "IXConnectionState.h"
|
||||
#include "IXSelectInterrupt.h"
|
||||
#include "IXSocketTLSOptions.h"
|
||||
#include <atomic>
|
||||
#include <condition_variable>
|
||||
@ -84,6 +84,7 @@ namespace ix
|
||||
// background thread to wait for incoming connections
|
||||
std::thread _thread;
|
||||
void run();
|
||||
void onSetTerminatedCallback();
|
||||
|
||||
// background thread to cleanup (join) terminated threads
|
||||
std::atomic<bool> _stopGc;
|
||||
@ -103,8 +104,7 @@ namespace ix
|
||||
ConnectionStateFactory _connectionStateFactory;
|
||||
|
||||
virtual void handleConnection(std::unique_ptr<Socket>,
|
||||
std::shared_ptr<ConnectionState> connectionState,
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo) = 0;
|
||||
std::shared_ptr<ConnectionState> connectionState) = 0;
|
||||
virtual size_t getConnectedClientsCount() = 0;
|
||||
|
||||
// Returns true if all connection threads are joined
|
||||
@ -112,5 +112,13 @@ namespace ix
|
||||
size_t getConnectionsThreadsCount();
|
||||
|
||||
SocketTLSOptions _socketTLSOptions;
|
||||
|
||||
// to wake up from select
|
||||
SelectInterruptPtr _acceptSelectInterrupt;
|
||||
|
||||
// used by the gc thread, to know that a thread needs to be garbage collected
|
||||
// as a connection
|
||||
std::condition_variable _conditionVariableGC;
|
||||
std::mutex _conditionVariableMutexGC;
|
||||
};
|
||||
} // namespace ix
|
||||
|
@ -8,7 +8,9 @@
|
||||
|
||||
#include "IXWebSocketVersion.h"
|
||||
#include <sstream>
|
||||
#ifdef IXWEBSOCKET_USE_ZLIB
|
||||
#include <zlib.h>
|
||||
#endif
|
||||
|
||||
// Platform name
|
||||
#if defined(_WIN32)
|
||||
@ -77,8 +79,10 @@ namespace ix
|
||||
ss << " nossl";
|
||||
#endif
|
||||
|
||||
#ifdef IXWEBSOCKET_USE_ZLIB
|
||||
// Zlib version
|
||||
ss << " zlib " << ZLIB_VERSION;
|
||||
#endif
|
||||
|
||||
return ss.str();
|
||||
}
|
||||
|
@ -405,6 +405,11 @@ namespace ix
|
||||
_onMessageCallback = callback;
|
||||
}
|
||||
|
||||
bool WebSocket::isOnMessageCallbackRegistered() const
|
||||
{
|
||||
return _onMessageCallback != nullptr;
|
||||
}
|
||||
|
||||
void WebSocket::setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback)
|
||||
{
|
||||
_onTrafficTrackerCallback = callback;
|
||||
|
@ -84,6 +84,7 @@ namespace ix
|
||||
const std::string& reason = WebSocketCloseConstants::kNormalClosureMessage);
|
||||
|
||||
void setOnMessageCallback(const OnMessageCallback& callback);
|
||||
bool isOnMessageCallbackRegistered() const;
|
||||
static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback);
|
||||
static void resetTrafficTrackerCallback();
|
||||
|
||||
|
@ -170,20 +170,11 @@ namespace ix
|
||||
{
|
||||
std::stringstream ss;
|
||||
ss << "Expecting HTTP/1.1, got " << httpVersion << ". "
|
||||
<< "Rejecting connection to " << host << ":" << port << ", status: " << status
|
||||
<< "Rejecting connection to " << url << ", status: " << status
|
||||
<< ", HTTP Status line: " << line;
|
||||
return WebSocketInitResult(false, status, ss.str());
|
||||
}
|
||||
|
||||
// We want an 101 HTTP status
|
||||
if (status != 101)
|
||||
{
|
||||
std::stringstream ss;
|
||||
ss << "Expecting status 101 (Switching Protocol), got " << status
|
||||
<< " status connecting to " << host << ":" << port << ", HTTP Status line: " << line;
|
||||
return WebSocketInitResult(false, status, ss.str());
|
||||
}
|
||||
|
||||
auto result = parseHttpHeaders(_socket, isCancellationRequested);
|
||||
auto headersValid = result.first;
|
||||
auto headers = result.second;
|
||||
@ -193,6 +184,17 @@ namespace ix
|
||||
return WebSocketInitResult(false, status, "Error parsing HTTP headers");
|
||||
}
|
||||
|
||||
// We want an 101 HTTP status for websocket, otherwise it could be
|
||||
// a redirection (like 301)
|
||||
if (status != 101)
|
||||
{
|
||||
std::stringstream ss;
|
||||
ss << "Expecting status 101 (Switching Protocol), got " << status
|
||||
<< " status connecting to " << url << ", HTTP Status line: " << line;
|
||||
|
||||
return WebSocketInitResult(false, status, ss.str(), headers, path);
|
||||
}
|
||||
|
||||
// Check the presence of the connection field
|
||||
if (headers.find("connection") == headers.end())
|
||||
{
|
||||
|
@ -16,8 +16,6 @@ namespace
|
||||
// is treated as a char* and the null termination (\x00) makes it
|
||||
// look like an empty string.
|
||||
const std::string kEmptyUncompressedBlock = std::string("\x00\x00\xff\xff", 4);
|
||||
|
||||
const int kBufferSize = 1 << 14;
|
||||
} // namespace
|
||||
|
||||
namespace ix
|
||||
@ -26,23 +24,27 @@ namespace ix
|
||||
// Compressor
|
||||
//
|
||||
WebSocketPerMessageDeflateCompressor::WebSocketPerMessageDeflateCompressor()
|
||||
: _compressBufferSize(kBufferSize)
|
||||
{
|
||||
#ifdef IXWEBSOCKET_USE_ZLIB
|
||||
memset(&_deflateState, 0, sizeof(_deflateState));
|
||||
|
||||
_deflateState.zalloc = Z_NULL;
|
||||
_deflateState.zfree = Z_NULL;
|
||||
_deflateState.opaque = Z_NULL;
|
||||
#endif
|
||||
}
|
||||
|
||||
WebSocketPerMessageDeflateCompressor::~WebSocketPerMessageDeflateCompressor()
|
||||
{
|
||||
#ifdef IXWEBSOCKET_USE_ZLIB
|
||||
deflateEnd(&_deflateState);
|
||||
#endif
|
||||
}
|
||||
|
||||
bool WebSocketPerMessageDeflateCompressor::init(uint8_t deflateBits,
|
||||
bool clientNoContextTakeOver)
|
||||
{
|
||||
#ifdef IXWEBSOCKET_USE_ZLIB
|
||||
int ret = deflateInit2(&_deflateState,
|
||||
Z_DEFAULT_COMPRESSION,
|
||||
Z_DEFLATED,
|
||||
@ -52,11 +54,12 @@ namespace ix
|
||||
|
||||
if (ret != Z_OK) return false;
|
||||
|
||||
_compressBuffer = std::make_unique<unsigned char[]>(_compressBufferSize);
|
||||
|
||||
_flush = (clientNoContextTakeOver) ? Z_FULL_FLUSH : Z_SYNC_FLUSH;
|
||||
|
||||
return true;
|
||||
#else
|
||||
return false;
|
||||
#endif
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
@ -96,6 +99,7 @@ namespace ix
|
||||
template<typename T, typename S>
|
||||
bool WebSocketPerMessageDeflateCompressor::compressData(const T& in, S& out)
|
||||
{
|
||||
#ifdef IXWEBSOCKET_USE_ZLIB
|
||||
//
|
||||
// 7.2.1. Compression
|
||||
//
|
||||
@ -136,14 +140,14 @@ namespace ix
|
||||
do
|
||||
{
|
||||
// Output to local buffer
|
||||
_deflateState.avail_out = (uInt) _compressBufferSize;
|
||||
_deflateState.next_out = _compressBuffer.get();
|
||||
_deflateState.avail_out = (uInt) _compressBuffer.size();
|
||||
_deflateState.next_out = &_compressBuffer.front();
|
||||
|
||||
deflate(&_deflateState, _flush);
|
||||
|
||||
output = _compressBufferSize - _deflateState.avail_out;
|
||||
output = _compressBuffer.size() - _deflateState.avail_out;
|
||||
|
||||
out.insert(out.end(), _compressBuffer.get(), _compressBuffer.get() + output);
|
||||
out.insert(out.end(), _compressBuffer.begin(), _compressBuffer.begin() + output);
|
||||
} while (_deflateState.avail_out == 0);
|
||||
|
||||
if (endsWithEmptyUnCompressedBlock(out))
|
||||
@ -152,14 +156,17 @@ namespace ix
|
||||
}
|
||||
|
||||
return true;
|
||||
#else
|
||||
return false;
|
||||
#endif
|
||||
}
|
||||
|
||||
//
|
||||
// Decompressor
|
||||
//
|
||||
WebSocketPerMessageDeflateDecompressor::WebSocketPerMessageDeflateDecompressor()
|
||||
: _compressBufferSize(kBufferSize)
|
||||
{
|
||||
#ifdef IXWEBSOCKET_USE_ZLIB
|
||||
memset(&_inflateState, 0, sizeof(_inflateState));
|
||||
|
||||
_inflateState.zalloc = Z_NULL;
|
||||
@ -167,29 +174,35 @@ namespace ix
|
||||
_inflateState.opaque = Z_NULL;
|
||||
_inflateState.avail_in = 0;
|
||||
_inflateState.next_in = Z_NULL;
|
||||
#endif
|
||||
}
|
||||
|
||||
WebSocketPerMessageDeflateDecompressor::~WebSocketPerMessageDeflateDecompressor()
|
||||
{
|
||||
#ifdef IXWEBSOCKET_USE_ZLIB
|
||||
inflateEnd(&_inflateState);
|
||||
#endif
|
||||
}
|
||||
|
||||
bool WebSocketPerMessageDeflateDecompressor::init(uint8_t inflateBits,
|
||||
bool clientNoContextTakeOver)
|
||||
{
|
||||
#ifdef IXWEBSOCKET_USE_ZLIB
|
||||
int ret = inflateInit2(&_inflateState, -1 * inflateBits);
|
||||
|
||||
if (ret != Z_OK) return false;
|
||||
|
||||
_compressBuffer = std::make_unique<unsigned char[]>(_compressBufferSize);
|
||||
|
||||
_flush = (clientNoContextTakeOver) ? Z_FULL_FLUSH : Z_SYNC_FLUSH;
|
||||
|
||||
return true;
|
||||
#else
|
||||
return false;
|
||||
#endif
|
||||
}
|
||||
|
||||
bool WebSocketPerMessageDeflateDecompressor::decompress(const std::string& in, std::string& out)
|
||||
{
|
||||
#ifdef IXWEBSOCKET_USE_ZLIB
|
||||
//
|
||||
// 7.2.2. Decompression
|
||||
//
|
||||
@ -211,8 +224,8 @@ namespace ix
|
||||
|
||||
do
|
||||
{
|
||||
_inflateState.avail_out = (uInt) _compressBufferSize;
|
||||
_inflateState.next_out = _compressBuffer.get();
|
||||
_inflateState.avail_out = (uInt) _compressBuffer.size();
|
||||
_inflateState.next_out = &_compressBuffer.front();
|
||||
|
||||
int ret = inflate(&_inflateState, Z_SYNC_FLUSH);
|
||||
|
||||
@ -221,10 +234,13 @@ namespace ix
|
||||
return false; // zlib error
|
||||
}
|
||||
|
||||
out.append(reinterpret_cast<char*>(_compressBuffer.get()),
|
||||
_compressBufferSize - _inflateState.avail_out);
|
||||
out.append(reinterpret_cast<char*>(&_compressBuffer.front()),
|
||||
_compressBuffer.size() - _inflateState.avail_out);
|
||||
} while (_inflateState.avail_out == 0);
|
||||
|
||||
return true;
|
||||
#else
|
||||
return false;
|
||||
#endif
|
||||
}
|
||||
} // namespace ix
|
||||
|
@ -6,8 +6,10 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#ifdef IXWEBSOCKET_USE_ZLIB
|
||||
#include "zlib.h"
|
||||
#include <memory>
|
||||
#endif
|
||||
#include <array>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
@ -32,9 +34,11 @@ namespace ix
|
||||
bool endsWithEmptyUnCompressedBlock(const T& value);
|
||||
|
||||
int _flush;
|
||||
size_t _compressBufferSize;
|
||||
std::unique_ptr<unsigned char[]> _compressBuffer;
|
||||
std::array<unsigned char, 1 << 14> _compressBuffer;
|
||||
|
||||
#ifdef IXWEBSOCKET_USE_ZLIB
|
||||
z_stream _deflateState;
|
||||
#endif
|
||||
};
|
||||
|
||||
class WebSocketPerMessageDeflateDecompressor
|
||||
@ -48,9 +52,11 @@ namespace ix
|
||||
|
||||
private:
|
||||
int _flush;
|
||||
size_t _compressBufferSize;
|
||||
std::unique_ptr<unsigned char[]> _compressBuffer;
|
||||
std::array<unsigned char, 1 << 14> _compressBuffer;
|
||||
|
||||
#ifdef IXWEBSOCKET_USE_ZLIB
|
||||
z_stream _inflateState;
|
||||
#endif
|
||||
};
|
||||
|
||||
} // namespace ix
|
||||
|
@ -61,6 +61,7 @@ namespace ix
|
||||
_clientMaxWindowBits = kDefaultClientMaxWindowBits;
|
||||
_serverMaxWindowBits = kDefaultServerMaxWindowBits;
|
||||
|
||||
#ifdef IXWEBSOCKET_USE_ZLIB
|
||||
// Split by ;
|
||||
std::string token;
|
||||
std::stringstream tokenStream(extension);
|
||||
@ -112,6 +113,7 @@ namespace ix
|
||||
sanitizeClientMaxWindowBits();
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
void WebSocketPerMessageDeflateOptions::sanitizeClientMaxWindowBits()
|
||||
@ -126,6 +128,7 @@ namespace ix
|
||||
|
||||
std::string WebSocketPerMessageDeflateOptions::generateHeader()
|
||||
{
|
||||
#ifdef IXWEBSOCKET_USE_ZLIB
|
||||
std::stringstream ss;
|
||||
ss << "Sec-WebSocket-Extensions: permessage-deflate";
|
||||
|
||||
@ -138,11 +141,18 @@ namespace ix
|
||||
ss << "\r\n";
|
||||
|
||||
return ss.str();
|
||||
#else
|
||||
return std::string();
|
||||
#endif
|
||||
}
|
||||
|
||||
bool WebSocketPerMessageDeflateOptions::enabled() const
|
||||
{
|
||||
#ifdef IXWEBSOCKET_USE_ZLIB
|
||||
return _enabled;
|
||||
#else
|
||||
return false;
|
||||
#endif
|
||||
}
|
||||
|
||||
bool WebSocketPerMessageDeflateOptions::getClientNoContextTakeover() const
|
||||
|
@ -43,6 +43,7 @@ namespace ix
|
||||
const std::string& hostname,
|
||||
const ix::SocketTLSOptions& tlsOptions,
|
||||
const std::string& remoteUrl,
|
||||
const RemoteUrlsMapping& remoteUrlsMapping,
|
||||
bool /*verbose*/)
|
||||
{
|
||||
ix::WebSocketServer server(port, hostname);
|
||||
@ -53,61 +54,74 @@ namespace ix
|
||||
};
|
||||
server.setConnectionStateFactory(factory);
|
||||
|
||||
server.setOnConnectionCallback([remoteUrl](std::weak_ptr<ix::WebSocket> webSocket,
|
||||
std::shared_ptr<ConnectionState> connectionState,
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo) {
|
||||
auto state = std::dynamic_pointer_cast<ProxyConnectionState>(connectionState);
|
||||
auto remoteIp = connectionInfo->remoteIp;
|
||||
server.setOnConnectionCallback(
|
||||
[remoteUrl, remoteUrlsMapping](std::weak_ptr<ix::WebSocket> webSocket,
|
||||
std::shared_ptr<ConnectionState> connectionState) {
|
||||
auto state = std::dynamic_pointer_cast<ProxyConnectionState>(connectionState);
|
||||
auto remoteIp = connectionState->getRemoteIp();
|
||||
|
||||
// Server connection
|
||||
state->webSocket().setOnMessageCallback(
|
||||
[webSocket, state, remoteIp](const WebSocketMessagePtr& msg) {
|
||||
if (msg->type == ix::WebSocketMessageType::Close)
|
||||
{
|
||||
state->setTerminated();
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Message)
|
||||
{
|
||||
auto ws = webSocket.lock();
|
||||
if (ws)
|
||||
// Server connection
|
||||
state->webSocket().setOnMessageCallback(
|
||||
[webSocket, state, remoteIp](const WebSocketMessagePtr& msg) {
|
||||
if (msg->type == ix::WebSocketMessageType::Close)
|
||||
{
|
||||
ws->send(msg->str, msg->binary);
|
||||
state->setTerminated();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Client connection
|
||||
auto ws = webSocket.lock();
|
||||
if (ws)
|
||||
{
|
||||
ws->setOnMessageCallback([state, remoteUrl](const WebSocketMessagePtr& msg) {
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
// Connect to the 'real' server
|
||||
std::string url(remoteUrl);
|
||||
url += msg->openInfo.uri;
|
||||
state->webSocket().setUrl(url);
|
||||
state->webSocket().disableAutomaticReconnection();
|
||||
state->webSocket().start();
|
||||
|
||||
// we should sleep here for a bit until we've established the
|
||||
// connection with the remote server
|
||||
while (state->webSocket().getReadyState() != ReadyState::Open)
|
||||
else if (msg->type == ix::WebSocketMessageType::Message)
|
||||
{
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
||||
auto ws = webSocket.lock();
|
||||
if (ws)
|
||||
{
|
||||
ws->send(msg->str, msg->binary);
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Close)
|
||||
{
|
||||
state->webSocket().close(msg->closeInfo.code, msg->closeInfo.reason);
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Message)
|
||||
{
|
||||
state->webSocket().send(msg->str, msg->binary);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
// Client connection
|
||||
auto ws = webSocket.lock();
|
||||
if (ws)
|
||||
{
|
||||
ws->setOnMessageCallback([state, remoteUrl, remoteUrlsMapping](
|
||||
const WebSocketMessagePtr& msg) {
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
// Connect to the 'real' server
|
||||
std::string url(remoteUrl);
|
||||
|
||||
// maybe we want a different url based on the mapping
|
||||
std::string host = msg->openInfo.headers["Host"];
|
||||
auto it = remoteUrlsMapping.find(host);
|
||||
if (it != remoteUrlsMapping.end())
|
||||
{
|
||||
url = it->second;
|
||||
}
|
||||
|
||||
// append the uri to form the full url
|
||||
// (say ws://localhost:1234/foo/?bar=baz)
|
||||
url += msg->openInfo.uri;
|
||||
|
||||
state->webSocket().setUrl(url);
|
||||
state->webSocket().disableAutomaticReconnection();
|
||||
state->webSocket().start();
|
||||
|
||||
// we should sleep here for a bit until we've established the
|
||||
// connection with the remote server
|
||||
while (state->webSocket().getReadyState() != ReadyState::Open)
|
||||
{
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
||||
}
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Close)
|
||||
{
|
||||
state->webSocket().close(msg->closeInfo.code, msg->closeInfo.reason);
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Message)
|
||||
{
|
||||
state->webSocket().send(msg->str, msg->binary);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
auto res = server.listen();
|
||||
if (!res.first)
|
||||
|
@ -7,14 +7,18 @@
|
||||
|
||||
#include "IXSocketTLSOptions.h"
|
||||
#include <cstdint>
|
||||
#include <map>
|
||||
#include <stddef.h>
|
||||
#include <string>
|
||||
|
||||
namespace ix
|
||||
{
|
||||
using RemoteUrlsMapping = std::map<std::string, std::string>;
|
||||
|
||||
int websocket_proxy_server_main(int port,
|
||||
const std::string& hostname,
|
||||
const ix::SocketTLSOptions& tlsOptions,
|
||||
const std::string& remoteUrl,
|
||||
const RemoteUrlsMapping& remoteUrlsMapping,
|
||||
bool verbose);
|
||||
} // namespace ix
|
||||
|
@ -77,22 +77,29 @@ namespace ix
|
||||
}
|
||||
|
||||
void WebSocketServer::handleConnection(std::unique_ptr<Socket> socket,
|
||||
std::shared_ptr<ConnectionState> connectionState,
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo)
|
||||
std::shared_ptr<ConnectionState> connectionState)
|
||||
{
|
||||
setThreadName("WebSocketServer::" + connectionState->getId());
|
||||
|
||||
auto webSocket = std::make_shared<WebSocket>();
|
||||
if (_onConnectionCallback)
|
||||
{
|
||||
_onConnectionCallback(webSocket, connectionState, std::move(connectionInfo));
|
||||
_onConnectionCallback(webSocket, connectionState);
|
||||
|
||||
if (!webSocket->isOnMessageCallbackRegistered())
|
||||
{
|
||||
logError("WebSocketServer Application developer error: Server callback improperly "
|
||||
"registerered.");
|
||||
logError("Missing call to setOnMessageCallback inside setOnConnectionCallback.");
|
||||
connectionState->setTerminated();
|
||||
return;
|
||||
}
|
||||
}
|
||||
else if (_onClientMessageCallback)
|
||||
{
|
||||
webSocket->setOnMessageCallback(
|
||||
[this, &ws = *webSocket.get(), connectionState, &ci = *connectionInfo.get()](
|
||||
const WebSocketMessagePtr& msg) {
|
||||
_onClientMessageCallback(connectionState, ci, ws, msg);
|
||||
[this, &ws = *webSocket.get(), connectionState](const WebSocketMessagePtr& msg) {
|
||||
_onClientMessageCallback(connectionState, ws, msg);
|
||||
});
|
||||
}
|
||||
else
|
||||
|
@ -23,14 +23,10 @@ namespace ix
|
||||
{
|
||||
public:
|
||||
using OnConnectionCallback =
|
||||
std::function<void(std::weak_ptr<WebSocket>,
|
||||
std::shared_ptr<ConnectionState>,
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo)>;
|
||||
std::function<void(std::weak_ptr<WebSocket>, std::shared_ptr<ConnectionState>)>;
|
||||
|
||||
using OnClientMessageCallback = std::function<void(std::shared_ptr<ConnectionState>,
|
||||
ConnectionInfo&,
|
||||
WebSocket&,
|
||||
const WebSocketMessagePtr&)>;
|
||||
using OnClientMessageCallback = std::function<void(
|
||||
std::shared_ptr<ConnectionState>, WebSocket&, const WebSocketMessagePtr&)>;
|
||||
|
||||
WebSocketServer(int port = SocketServer::kDefaultPort,
|
||||
const std::string& host = SocketServer::kDefaultHost,
|
||||
@ -69,8 +65,7 @@ namespace ix
|
||||
|
||||
// Methods
|
||||
virtual void handleConnection(std::unique_ptr<Socket> socket,
|
||||
std::shared_ptr<ConnectionState> connectionState,
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo);
|
||||
std::shared_ptr<ConnectionState> connectionState);
|
||||
virtual size_t getConnectedClientsCount() final;
|
||||
};
|
||||
} // namespace ix
|
||||
|
@ -107,36 +107,62 @@ namespace ix
|
||||
|
||||
std::string protocol, host, path, query;
|
||||
int port;
|
||||
std::string remoteUrl(url);
|
||||
|
||||
if (!UrlParser::parse(url, protocol, host, path, query, port))
|
||||
WebSocketInitResult result;
|
||||
const int maxRedirections = 10;
|
||||
|
||||
for (int i = 0; i < maxRedirections; ++i)
|
||||
{
|
||||
std::stringstream ss;
|
||||
ss << "Could not parse url: '" << url << "'";
|
||||
return WebSocketInitResult(false, 0, ss.str());
|
||||
if (!UrlParser::parse(remoteUrl, protocol, host, path, query, port))
|
||||
{
|
||||
std::stringstream ss;
|
||||
ss << "Could not parse url: '" << url << "'";
|
||||
return WebSocketInitResult(false, 0, ss.str());
|
||||
}
|
||||
|
||||
std::string errorMsg;
|
||||
bool tls = protocol == "wss";
|
||||
_socket = createSocket(tls, -1, errorMsg, _socketTLSOptions);
|
||||
_perMessageDeflate = std::make_unique<WebSocketPerMessageDeflate>();
|
||||
|
||||
if (!_socket)
|
||||
{
|
||||
return WebSocketInitResult(false, 0, errorMsg);
|
||||
}
|
||||
|
||||
WebSocketHandshake webSocketHandshake(_requestInitCancellation,
|
||||
_socket,
|
||||
_perMessageDeflate,
|
||||
_perMessageDeflateOptions,
|
||||
_enablePerMessageDeflate);
|
||||
|
||||
result = webSocketHandshake.clientHandshake(
|
||||
remoteUrl, headers, host, path, port, timeoutSecs);
|
||||
|
||||
if (result.http_status >= 300 && result.http_status < 400)
|
||||
{
|
||||
auto it = result.headers.find("Location");
|
||||
if (it == result.headers.end())
|
||||
{
|
||||
std::stringstream ss;
|
||||
ss << "Missing Location Header for HTTP Redirect response. "
|
||||
<< "Rejecting connection to " << url << ", status: " << result.http_status;
|
||||
result.errorStr = ss.str();
|
||||
break;
|
||||
}
|
||||
|
||||
remoteUrl = it->second;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (result.success)
|
||||
{
|
||||
setReadyState(ReadyState::OPEN);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
std::string errorMsg;
|
||||
bool tls = protocol == "wss";
|
||||
_socket = createSocket(tls, -1, errorMsg, _socketTLSOptions);
|
||||
_perMessageDeflate = std::make_unique<WebSocketPerMessageDeflate>();
|
||||
|
||||
if (!_socket)
|
||||
{
|
||||
return WebSocketInitResult(false, 0, errorMsg);
|
||||
}
|
||||
|
||||
WebSocketHandshake webSocketHandshake(_requestInitCancellation,
|
||||
_socket,
|
||||
_perMessageDeflate,
|
||||
_perMessageDeflateOptions,
|
||||
_enablePerMessageDeflate);
|
||||
|
||||
auto result =
|
||||
webSocketHandshake.clientHandshake(url, headers, host, path, port, timeoutSecs);
|
||||
if (result.success)
|
||||
{
|
||||
setReadyState(ReadyState::OPEN);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@ -633,7 +659,7 @@ namespace ix
|
||||
// send back the CLOSE frame
|
||||
sendCloseFrame(code, reason);
|
||||
|
||||
wakeUpFromPoll(Socket::kCloseRequest);
|
||||
wakeUpFromPoll(SelectInterrupt::kCloseRequest);
|
||||
|
||||
bool remote = true;
|
||||
closeSocketAndSwitchToClosedState(code, reason, _rxbuf.size(), remote);
|
||||
@ -853,7 +879,7 @@ namespace ix
|
||||
// Request to flush the send buffer on the background thread if it isn't empty
|
||||
if (!isSendBufferEmpty())
|
||||
{
|
||||
wakeUpFromPoll(Socket::kSendRequest);
|
||||
wakeUpFromPoll(SelectInterrupt::kSendRequest);
|
||||
|
||||
// FIXME: we should have a timeout when sending large messages: see #131
|
||||
if (_blockingSend && !flushSendBuffer())
|
||||
@ -1122,7 +1148,7 @@ namespace ix
|
||||
sendCloseFrame(code, reason);
|
||||
|
||||
// wake up the poll, but do not close yet
|
||||
wakeUpFromPoll(Socket::kSendRequest);
|
||||
wakeUpFromPoll(SelectInterrupt::kSendRequest);
|
||||
}
|
||||
|
||||
size_t WebSocketTransport::bufferedAmount() const
|
||||
|
@ -6,4 +6,4 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#define IX_WEBSOCKET_VERSION "10.1.1"
|
||||
#define IX_WEBSOCKET_VERSION "10.4.0"
|
||||
|
12
makefile
12
makefile
@ -34,10 +34,13 @@ ws:
|
||||
mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 .. && ninja install)
|
||||
|
||||
ws_install:
|
||||
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=MinSizeRel -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 .. && make -j 4 install)
|
||||
mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=MinSizeRel -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 .. -DUSE_TEST=0 && ninja install)
|
||||
|
||||
ws_openssl:
|
||||
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 -DUSE_OPEN_SSL=1 .. ; make -j 4)
|
||||
ws_install_release:
|
||||
mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=MinSizeRel -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 .. -DUSE_TEST=0 && ninja install)
|
||||
|
||||
ws_openssl_install:
|
||||
mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 -DUSE_OPEN_SSL=1 .. ; ninja install)
|
||||
|
||||
ws_mbedtls:
|
||||
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .. ; make -j 4)
|
||||
@ -45,6 +48,9 @@ ws_mbedtls:
|
||||
ws_no_ssl:
|
||||
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_WS=1 .. ; make -j 4)
|
||||
|
||||
ws_no_python:
|
||||
mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=MinSizeRel -DUSE_TLS=1 -DUSE_WS=1 .. ; ninja install)
|
||||
|
||||
uninstall:
|
||||
xargs rm -fv < build/install_manifest.txt
|
||||
|
||||
|
@ -95,15 +95,14 @@ TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]")
|
||||
|
||||
sentryServer.setOnConnectionCallback(
|
||||
[](HttpRequestPtr request,
|
||||
std::shared_ptr<ConnectionState> /*connectionState*/,
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr {
|
||||
std::shared_ptr<ConnectionState> connectionState) -> HttpResponsePtr {
|
||||
WebSocketHttpHeaders headers;
|
||||
headers["Server"] = userAgent();
|
||||
|
||||
// Log request
|
||||
std::stringstream ss;
|
||||
ss << connectionInfo->remoteIp << ":" << connectionInfo->remotePort << " "
|
||||
<< request->method << " " << request->headers["User-Agent"] << " "
|
||||
ss << connectionState->getRemoteIp() << ":" << connectionState->getRemotePort()
|
||||
<< " " << request->method << " " << request->headers["User-Agent"] << " "
|
||||
<< request->uri;
|
||||
|
||||
if (request->method == "POST")
|
||||
|
@ -63,6 +63,54 @@ TEST_CASE("http server", "[httpd]")
|
||||
|
||||
server.stop();
|
||||
}
|
||||
|
||||
SECTION("Posting plain text data to a local HTTP server")
|
||||
{
|
||||
int port = getFreePort();
|
||||
ix::HttpServer server(port, "127.0.0.1");
|
||||
|
||||
server.setOnConnectionCallback(
|
||||
[](HttpRequestPtr request, std::shared_ptr<ConnectionState>) -> HttpResponsePtr {
|
||||
if (request->method == "POST")
|
||||
{
|
||||
return std::make_shared<HttpResponse>(
|
||||
200, "OK", HttpErrorCode::Ok, WebSocketHttpHeaders(), request->body);
|
||||
}
|
||||
|
||||
return std::make_shared<HttpResponse>(400, "BAD REQUEST");
|
||||
});
|
||||
|
||||
auto res = server.listen();
|
||||
REQUIRE(res.first);
|
||||
server.start();
|
||||
|
||||
HttpClient httpClient;
|
||||
WebSocketHttpHeaders headers;
|
||||
headers["Content-Type"] = "text/plain";
|
||||
|
||||
std::string url("http://127.0.0.1:");
|
||||
url += std::to_string(port);
|
||||
auto args = httpClient.createRequest(url);
|
||||
|
||||
args->extraHeaders = headers;
|
||||
args->connectTimeout = 60;
|
||||
args->transferTimeout = 60;
|
||||
args->verbose = true;
|
||||
args->logger = [](const std::string& msg) { std::cout << msg; };
|
||||
args->body = "Hello World!";
|
||||
|
||||
auto response = httpClient.post(url, args->body, args);
|
||||
|
||||
std::cerr << "Status: " << response->statusCode << std::endl;
|
||||
std::cerr << "Error message: " << response->errorMsg << std::endl;
|
||||
std::cerr << "Payload: " << response->payload << std::endl;
|
||||
|
||||
REQUIRE(response->errorCode == HttpErrorCode::Ok);
|
||||
REQUIRE(response->statusCode == 200);
|
||||
REQUIRE(response->payload == args->body);
|
||||
|
||||
server.stop();
|
||||
}
|
||||
}
|
||||
|
||||
TEST_CASE("http server redirection", "[httpd_redirect]")
|
||||
|
@ -85,11 +85,10 @@ namespace ix
|
||||
bool startWebSocketEchoServer(ix::WebSocketServer& server)
|
||||
{
|
||||
server.setOnClientMessageCallback(
|
||||
[&server](std::shared_ptr<ConnectionState> /*connectionState*/,
|
||||
ConnectionInfo& connectionInfo,
|
||||
[&server](std::shared_ptr<ConnectionState> connectionState,
|
||||
WebSocket& webSocket,
|
||||
const ix::WebSocketMessagePtr& msg) {
|
||||
auto remoteIp = connectionInfo.remoteIp;
|
||||
auto remoteIp = connectionState->getRemoteIp();
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
TLogger() << "New connection";
|
||||
|
@ -191,11 +191,9 @@ namespace
|
||||
|
||||
server.setOnClientMessageCallback(
|
||||
[&server, &connectionId](std::shared_ptr<ConnectionState> connectionState,
|
||||
ConnectionInfo& connectionInfo,
|
||||
WebSocket& webSocket,
|
||||
const ix::WebSocketMessagePtr& msg) {
|
||||
auto remoteIp = connectionInfo.remoteIp;
|
||||
|
||||
auto remoteIp = connectionState->getRemoteIp();
|
||||
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
|
@ -195,10 +195,9 @@ namespace
|
||||
{
|
||||
server.setOnClientMessageCallback(
|
||||
[&server](std::shared_ptr<ConnectionState> connectionState,
|
||||
ConnectionInfo& connectionInfo,
|
||||
WebSocket& webSocket,
|
||||
const ix::WebSocketMessagePtr& msg) {
|
||||
auto remoteIp = connectionInfo.remoteIp;
|
||||
auto remoteIp = connectionState->getRemoteIp();
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
TLogger() << "New connection";
|
||||
@ -286,27 +285,27 @@ TEST_CASE("Websocket_chat", "[websocket_chat]")
|
||||
int attempts = 0;
|
||||
while (chatA.getReceivedMessagesCount() != 3 || chatB.getReceivedMessagesCount() != 3)
|
||||
{
|
||||
REQUIRE(attempts++ < 10);
|
||||
CHECK(attempts++ < 10);
|
||||
ix::msleep(1000);
|
||||
}
|
||||
|
||||
chatA.stop();
|
||||
chatB.stop();
|
||||
|
||||
REQUIRE(chatA.getReceivedMessagesCount() == 3);
|
||||
REQUIRE(chatB.getReceivedMessagesCount() == 3);
|
||||
CHECK(chatA.getReceivedMessagesCount() == 3);
|
||||
CHECK(chatB.getReceivedMessagesCount() == 3);
|
||||
|
||||
REQUIRE(chatB.getReceivedMessages()[0] == "from A1");
|
||||
REQUIRE(chatB.getReceivedMessages()[1] == "from A2");
|
||||
REQUIRE(chatB.getReceivedMessages()[2] == "from A3");
|
||||
CHECK(chatB.getReceivedMessages()[0] == "from A1");
|
||||
CHECK(chatB.getReceivedMessages()[1] == "from A2");
|
||||
CHECK(chatB.getReceivedMessages()[2] == "from A3");
|
||||
|
||||
REQUIRE(chatA.getReceivedMessages()[0] == "from B1");
|
||||
REQUIRE(chatA.getReceivedMessages()[1] == "from B2");
|
||||
REQUIRE(chatA.getReceivedMessages()[2].size() == bigMessage.size());
|
||||
CHECK(chatA.getReceivedMessages()[0] == "from B1");
|
||||
CHECK(chatA.getReceivedMessages()[1] == "from B2");
|
||||
CHECK(chatA.getReceivedMessages()[2].size() == bigMessage.size());
|
||||
|
||||
// Give us 1000ms for the server to notice that clients went away
|
||||
ix::msleep(1000);
|
||||
REQUIRE(server.getClients().size() == 0);
|
||||
CHECK(server.getClients().size() == 0);
|
||||
|
||||
ix::reportWebSocketTraffic();
|
||||
}
|
||||
|
@ -171,10 +171,9 @@ namespace
|
||||
server.setOnClientMessageCallback(
|
||||
[&receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite](
|
||||
std::shared_ptr<ConnectionState> connectionState,
|
||||
ConnectionInfo& connectionInfo,
|
||||
WebSocket& /*webSocket*/,
|
||||
const ix::WebSocketMessagePtr& msg) {
|
||||
auto remoteIp = connectionInfo.remoteIp;
|
||||
auto remoteIp = connectionState->getRemoteIp();
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
TLogger() << "New server connection";
|
||||
|
@ -35,11 +35,9 @@ namespace ix
|
||||
|
||||
server.setOnClientMessageCallback(
|
||||
[&server, &connectionId](std::shared_ptr<ConnectionState> connectionState,
|
||||
ConnectionInfo& connectionInfo,
|
||||
WebSocket& webSocket,
|
||||
const ix::WebSocketMessagePtr& msg) {
|
||||
auto remoteIp = connectionInfo.remoteIp;
|
||||
|
||||
auto remoteIp = connectionState->getRemoteIp();
|
||||
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
|
@ -18,10 +18,9 @@ bool startServer(ix::WebSocketServer& server, std::string& subProtocols)
|
||||
{
|
||||
server.setOnClientMessageCallback(
|
||||
[&server, &subProtocols](std::shared_ptr<ConnectionState> connectionState,
|
||||
ConnectionInfo& connectionInfo,
|
||||
WebSocket& webSocket,
|
||||
const ix::WebSocketMessagePtr& msg) {
|
||||
auto remoteIp = connectionInfo.remoteIp;
|
||||
auto remoteIp = connectionState->getRemoteIp();
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
TLogger() << "New connection";
|
||||
|
171
test/compatibility/cpp/libwebsockets/devnull_client.cpp
Normal file
171
test/compatibility/cpp/libwebsockets/devnull_client.cpp
Normal file
@ -0,0 +1,171 @@
|
||||
/*
|
||||
* lws-minimal-ws-client
|
||||
*
|
||||
* Written in 2010-2019 by Andy Green <andy@warmcat.com>
|
||||
*
|
||||
* This file is made available under the Creative Commons CC0 1.0
|
||||
* Universal Public Domain Dedication.
|
||||
*
|
||||
* This demonstrates the a minimal ws client using lws.
|
||||
*
|
||||
* Original programs connects to https://libwebsockets.org/ and makes a
|
||||
* wss connection to the dumb-increment protocol there. While
|
||||
* connected, it prints the numbers it is being sent by
|
||||
* dumb-increment protocol.
|
||||
*
|
||||
* This is modified to make a test client which counts how much messages
|
||||
* per second can be received.
|
||||
*
|
||||
* libwebsockets$ make && ./a.out
|
||||
* g++ --std=c++14 -I/usr/local/opt/openssl/include devnull_client.cpp -lwebsockets
|
||||
* messages received: 0 per second 0 total
|
||||
* [2020/08/02 19:22:21:4774] U: LWS minimal ws client rx [-d <logs>] [--h2]
|
||||
* [2020/08/02 19:22:21:4814] U: callback_dumb_increment: established
|
||||
* messages received: 0 per second 0 total
|
||||
* messages received: 180015 per second 180015 total
|
||||
* messages received: 172866 per second 352881 total
|
||||
* messages received: 176177 per second 529058 total
|
||||
* messages received: 174191 per second 703249 total
|
||||
* messages received: 193397 per second 896646 total
|
||||
* messages received: 196385 per second 1093031 total
|
||||
* messages received: 194593 per second 1287624 total
|
||||
* messages received: 189484 per second 1477108 total
|
||||
* messages received: 200825 per second 1677933 total
|
||||
* messages received: 183542 per second 1861475 total
|
||||
* ^C[2020/08/02 19:22:33:4450] U: Completed OK
|
||||
*
|
||||
*/
|
||||
|
||||
#include <atomic>
|
||||
#include <iostream>
|
||||
#include <libwebsockets.h>
|
||||
#include <signal.h>
|
||||
#include <string.h>
|
||||
#include <thread>
|
||||
|
||||
static int interrupted;
|
||||
static struct lws* client_wsi;
|
||||
|
||||
std::atomic<uint64_t> receivedCount(0);
|
||||
|
||||
static int callback_dumb_increment(
|
||||
struct lws* wsi, enum lws_callback_reasons reason, void* user, void* in, size_t len)
|
||||
{
|
||||
switch (reason)
|
||||
{
|
||||
/* because we are protocols[0] ... */
|
||||
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
|
||||
lwsl_err("CLIENT_CONNECTION_ERROR: %s\n", in ? (char*) in : "(null)");
|
||||
client_wsi = NULL;
|
||||
break;
|
||||
|
||||
case LWS_CALLBACK_CLIENT_ESTABLISHED: lwsl_user("%s: established\n", __func__); break;
|
||||
|
||||
case LWS_CALLBACK_CLIENT_RECEIVE: receivedCount++; break;
|
||||
|
||||
case LWS_CALLBACK_CLIENT_CLOSED: client_wsi = NULL; break;
|
||||
|
||||
default: break;
|
||||
}
|
||||
|
||||
return lws_callback_http_dummy(wsi, reason, user, in, len);
|
||||
}
|
||||
|
||||
static const struct lws_protocols protocols[] = {{
|
||||
"dumb-increment-protocol",
|
||||
callback_dumb_increment,
|
||||
0,
|
||||
0,
|
||||
},
|
||||
{NULL, NULL, 0, 0}};
|
||||
|
||||
static void sigint_handler(int sig)
|
||||
{
|
||||
interrupted = 1;
|
||||
}
|
||||
|
||||
int main(int argc, const char** argv)
|
||||
{
|
||||
uint64_t receivedCountTotal(0);
|
||||
uint64_t receivedCountPerSecs(0);
|
||||
|
||||
auto timer = [&receivedCountTotal, &receivedCountPerSecs] {
|
||||
while (!interrupted)
|
||||
{
|
||||
std::cerr << "messages received: " << receivedCountPerSecs << " per second "
|
||||
<< receivedCountTotal << " total" << std::endl;
|
||||
|
||||
receivedCountPerSecs = receivedCount - receivedCountTotal;
|
||||
receivedCountTotal += receivedCountPerSecs;
|
||||
|
||||
auto duration = std::chrono::seconds(1);
|
||||
std::this_thread::sleep_for(duration);
|
||||
}
|
||||
};
|
||||
|
||||
std::thread t1(timer);
|
||||
|
||||
struct lws_context_creation_info info;
|
||||
struct lws_client_connect_info i;
|
||||
struct lws_context* context;
|
||||
const char* p;
|
||||
int n = 0, logs = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE
|
||||
/* for LLL_ verbosity above NOTICE to be built into lws, lws
|
||||
* must have been configured with -DCMAKE_BUILD_TYPE=DEBUG
|
||||
* instead of =RELEASE */
|
||||
/* | LLL_INFO */ /* | LLL_PARSER */ /* | LLL_HEADER */
|
||||
/* | LLL_EXT */ /* | LLL_CLIENT */ /* | LLL_LATENCY */
|
||||
/* | LLL_DEBUG */;
|
||||
|
||||
signal(SIGINT, sigint_handler);
|
||||
if ((p = lws_cmdline_option(argc, argv, "-d"))) logs = atoi(p);
|
||||
|
||||
lws_set_log_level(logs, NULL);
|
||||
lwsl_user("LWS minimal ws client rx [-d <logs>] [--h2]\n");
|
||||
|
||||
memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
|
||||
info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */
|
||||
info.protocols = protocols;
|
||||
info.timeout_secs = 10;
|
||||
|
||||
/*
|
||||
* since we know this lws context is only ever going to be used with
|
||||
* one client wsis / fds / sockets at a time, let lws know it doesn't
|
||||
* have to use the default allocations for fd tables up to ulimit -n.
|
||||
* It will just allocate for 1 internal and 1 (+ 1 http2 nwsi) that we
|
||||
* will use.
|
||||
*/
|
||||
info.fd_limit_per_thread = 1 + 1 + 1;
|
||||
|
||||
context = lws_create_context(&info);
|
||||
if (!context)
|
||||
{
|
||||
lwsl_err("lws init failed\n");
|
||||
return 1;
|
||||
}
|
||||
|
||||
memset(&i, 0, sizeof i); /* otherwise uninitialized garbage */
|
||||
i.context = context;
|
||||
i.port = 8008;
|
||||
i.address = "127.0.0.1";
|
||||
i.path = "/";
|
||||
i.host = i.address;
|
||||
i.origin = i.address;
|
||||
i.protocol = protocols[0].name; /* "dumb-increment-protocol" */
|
||||
i.pwsi = &client_wsi;
|
||||
|
||||
if (lws_cmdline_option(argc, argv, "--h2")) i.alpn = "h2";
|
||||
|
||||
lws_client_connect_via_info(&i);
|
||||
|
||||
while (n >= 0 && client_wsi && !interrupted)
|
||||
n = lws_service(context, 0);
|
||||
|
||||
lws_context_destroy(context);
|
||||
|
||||
lwsl_user("Completed %s\n", receivedCount > 10 ? "OK" : "Failed");
|
||||
|
||||
t1.join();
|
||||
|
||||
return receivedCount > 10;
|
||||
}
|
2
test/compatibility/csharp/.gitignore
vendored
Normal file
2
test/compatibility/csharp/.gitignore
vendored
Normal file
@ -0,0 +1,2 @@
|
||||
bin
|
||||
obj
|
99
test/compatibility/csharp/Main.cs
Normal file
99
test/compatibility/csharp/Main.cs
Normal file
@ -0,0 +1,99 @@
|
||||
//
|
||||
// Main.cs
|
||||
// Author: Benjamin Sergeant
|
||||
// Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
|
||||
//
|
||||
// In a different terminal, start a push server:
|
||||
// $ ws push_server -q
|
||||
//
|
||||
// $ dotnet run
|
||||
// messages received per second: 145157
|
||||
// messages received per second: 141405
|
||||
// messages received per second: 152202
|
||||
// messages received per second: 157149
|
||||
// messages received per second: 157673
|
||||
// messages received per second: 153594
|
||||
// messages received per second: 157830
|
||||
// messages received per second: 158422
|
||||
//
|
||||
|
||||
using System;
|
||||
using System.Net.WebSockets;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
public class DevNullClientCli
|
||||
{
|
||||
private static int receivedMessage = 0;
|
||||
|
||||
public static async Task<byte[]> ReceiveAsync(ClientWebSocket ws, CancellationToken token)
|
||||
{
|
||||
int bufferSize = 8192; // 8K
|
||||
var buffer = new byte[bufferSize];
|
||||
var offset = 0;
|
||||
var free = buffer.Length;
|
||||
|
||||
while (true)
|
||||
{
|
||||
var result = await ws.ReceiveAsync(new ArraySegment<byte>(buffer, offset, free), token).ConfigureAwait(false);
|
||||
|
||||
offset += result.Count;
|
||||
free -= result.Count;
|
||||
if (result.EndOfMessage) break;
|
||||
|
||||
if (free == 0)
|
||||
{
|
||||
// No free space
|
||||
// Resize the outgoing buffer
|
||||
var newSize = buffer.Length + bufferSize;
|
||||
|
||||
var newBuffer = new byte[newSize];
|
||||
Array.Copy(buffer, 0, newBuffer, 0, offset);
|
||||
buffer = newBuffer;
|
||||
free = buffer.Length - offset;
|
||||
}
|
||||
}
|
||||
|
||||
return buffer;
|
||||
}
|
||||
|
||||
private static void OnTimedEvent(object source, EventArgs e)
|
||||
{
|
||||
Console.WriteLine($"messages received per second: {receivedMessage}");
|
||||
receivedMessage = 0;
|
||||
}
|
||||
|
||||
public static async Task ReceiveMessagesAsync(string url)
|
||||
{
|
||||
var ws = new ClientWebSocket();
|
||||
|
||||
System.Uri uri = new System.Uri(url);
|
||||
var cancellationToken = CancellationToken.None;
|
||||
|
||||
try
|
||||
{
|
||||
await ws.ConnectAsync(uri, cancellationToken).ConfigureAwait(false);
|
||||
while (true)
|
||||
{
|
||||
var data = await DevNullClientCli.ReceiveAsync(ws, cancellationToken);
|
||||
receivedMessage += 1;
|
||||
}
|
||||
}
|
||||
catch (System.Net.WebSockets.WebSocketException e)
|
||||
{
|
||||
Console.WriteLine($"WebSocket error: {e}");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
public static async Task Main()
|
||||
{
|
||||
var timer = new System.Timers.Timer(1000);
|
||||
timer.Elapsed += OnTimedEvent;
|
||||
timer.Enabled = true;
|
||||
timer.Start();
|
||||
|
||||
var url = "ws://localhost:8008";
|
||||
await ReceiveMessagesAsync(url);
|
||||
}
|
||||
}
|
6
test/compatibility/csharp/devnull_client.csproj
Normal file
6
test/compatibility/csharp/devnull_client.csproj
Normal file
@ -0,0 +1,6 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
<PropertyGroup>
|
||||
<OutputType>Exe</OutputType>
|
||||
<TargetFramework>netcoreapp3.1</TargetFramework>
|
||||
</PropertyGroup>
|
||||
</Project>
|
42
test/compatibility/node/devnull_client.js
Normal file
42
test/compatibility/node/devnull_client.js
Normal file
@ -0,0 +1,42 @@
|
||||
//
|
||||
// With ws@7.3.1
|
||||
// and
|
||||
// node --version
|
||||
// v13.11.0
|
||||
//
|
||||
// In a different terminal, start a push server:
|
||||
// $ ws push_server -q
|
||||
//
|
||||
// $ node devnull_client.js
|
||||
// messages received per second: 16643
|
||||
// messages received per second: 28065
|
||||
// messages received per second: 28432
|
||||
// messages received per second: 22207
|
||||
// messages received per second: 28805
|
||||
// messages received per second: 28694
|
||||
// messages received per second: 28180
|
||||
// messages received per second: 28601
|
||||
// messages received per second: 28698
|
||||
// messages received per second: 28931
|
||||
// messages received per second: 27975
|
||||
//
|
||||
const WebSocket = require('ws');
|
||||
|
||||
const ws = new WebSocket('ws://localhost:8008');
|
||||
|
||||
ws.on('open', function open() {
|
||||
ws.send('hello from node');
|
||||
});
|
||||
|
||||
var receivedMessages = 0;
|
||||
|
||||
setInterval(function timeout() {
|
||||
console.log(`messages received per second: ${receivedMessages}`)
|
||||
receivedMessages = 0;
|
||||
}, 1000);
|
||||
|
||||
ws.on('message', function incoming(data) {
|
||||
receivedMessages += 1;
|
||||
});
|
||||
|
||||
|
44
test/compatibility/python/websockets/devnull_client.py
Normal file
44
test/compatibility/python/websockets/devnull_client.py
Normal file
@ -0,0 +1,44 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# websocket send client
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import websockets
|
||||
|
||||
try:
|
||||
import uvloop
|
||||
uvloop.install()
|
||||
except ImportError:
|
||||
print('uvloop not available')
|
||||
pass
|
||||
|
||||
msgCount = 0
|
||||
|
||||
async def timer():
|
||||
global msgCount
|
||||
|
||||
while True:
|
||||
print(f'Received messages: {msgCount}')
|
||||
msgCount = 0
|
||||
|
||||
await asyncio.sleep(1)
|
||||
|
||||
|
||||
async def client(url):
|
||||
global msgCount
|
||||
|
||||
asyncio.ensure_future(timer())
|
||||
|
||||
async with websockets.connect(url) as ws:
|
||||
async for message in ws:
|
||||
msgCount += 1
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser(description='websocket proxy.')
|
||||
parser.add_argument('--url', help='Remote websocket url',
|
||||
default='wss://echo.websocket.org')
|
||||
args = parser.parse_args()
|
||||
|
||||
asyncio.get_event_loop().run_until_complete(client(args.url))
|
@ -10,7 +10,7 @@ import websockets
|
||||
async def echo(websocket, path):
|
||||
while True:
|
||||
msg = await websocket.recv()
|
||||
print(f'Received {len(msg)} bytes')
|
||||
# print(f'Received {len(msg)} bytes')
|
||||
await websocket.send(msg)
|
||||
|
||||
host = os.getenv('BIND_HOST', 'localhost')
|
||||
|
6
test/compatibility/ruby/README.md
Normal file
6
test/compatibility/ruby/README.md
Normal file
@ -0,0 +1,6 @@
|
||||
```
|
||||
export GEM_HOME=$HOME/local/gems
|
||||
bundle install faye-websocket
|
||||
```
|
||||
|
||||
https://stackoverflow.com/questions/486995/ruby-equivalent-of-virtualenv
|
59
test/compatibility/ruby/devnull_client.rb
Normal file
59
test/compatibility/ruby/devnull_client.rb
Normal file
@ -0,0 +1,59 @@
|
||||
#
|
||||
# $ ruby --version
|
||||
# ruby 2.6.3p62 (2019-04-16 revision 67580) [universal.x86_64-darwin19]
|
||||
#
|
||||
# Install a gem locally by setting GEM_HOME
|
||||
# https://stackoverflow.com/questions/486995/ruby-equivalent-of-virtualenv
|
||||
# export GEM_HOME=$HOME/local/gems
|
||||
# bundle install faye-websocket
|
||||
#
|
||||
# In a different terminal, start a push server:
|
||||
# $ ws push_server -q
|
||||
#
|
||||
# $ ruby devnull_client.rb
|
||||
# [:open]
|
||||
# Connected to server
|
||||
# messages received per second: 115926
|
||||
# messages received per second: 119156
|
||||
# messages received per second: 119156
|
||||
# messages received per second: 119157
|
||||
# messages received per second: 119156
|
||||
# messages received per second: 119156
|
||||
# messages received per second: 119157
|
||||
# messages received per second: 119156
|
||||
# messages received per second: 119156
|
||||
# messages received per second: 119157
|
||||
# messages received per second: 119156
|
||||
# messages received per second: 119157
|
||||
# messages received per second: 119156
|
||||
# ^C[:close, 1006, ""]
|
||||
#
|
||||
require 'faye/websocket'
|
||||
require 'eventmachine'
|
||||
|
||||
EM.run {
|
||||
ws = Faye::WebSocket::Client.new('ws://127.0.0.1:8008')
|
||||
|
||||
counter = 0
|
||||
|
||||
EM.add_periodic_timer(1) do
|
||||
print "messages received per second: #{counter}\n"
|
||||
counter = 0 # reset counter
|
||||
end
|
||||
|
||||
ws.on :open do |event|
|
||||
p [:open]
|
||||
print "Connected to server\n"
|
||||
end
|
||||
|
||||
ws.on :message do |event|
|
||||
# Uncomment the next line to validate that we receive something correct
|
||||
# p [:message, event.data]
|
||||
counter += 1
|
||||
end
|
||||
|
||||
ws.on :close do |event|
|
||||
p [:close, event.code, event.reason]
|
||||
ws = nil
|
||||
end
|
||||
}
|
18379
third_party/nlohmann/json.hpp
vendored
18379
third_party/nlohmann/json.hpp
vendored
File diff suppressed because it is too large
Load Diff
@ -46,27 +46,6 @@ add_executable(ws
|
||||
../third_party/msgpack11/msgpack11.cpp
|
||||
../third_party/cpp-linenoise/linenoise.cpp
|
||||
${JSONCPP_SOURCES}
|
||||
|
||||
ws_http_client.cpp
|
||||
ws_ping_pong.cpp
|
||||
ws_broadcast_server.cpp
|
||||
ws_echo_server.cpp
|
||||
ws_chat.cpp
|
||||
ws_connect.cpp
|
||||
ws_transfer.cpp
|
||||
ws_send.cpp
|
||||
ws_receive.cpp
|
||||
ws_redis_cli.cpp
|
||||
ws_redis_publish.cpp
|
||||
ws_redis_subscribe.cpp
|
||||
ws_redis_server.cpp
|
||||
ws_snake.cpp
|
||||
ws_cobra_metrics_publish.cpp
|
||||
ws_cobra_publish.cpp
|
||||
ws_httpd.cpp
|
||||
ws_autobahn.cpp
|
||||
ws_sentry_minidump_upload.cpp
|
||||
ws_dns_lookup.cpp
|
||||
ws.cpp)
|
||||
|
||||
# library with the most dependencies come first
|
||||
|
6
ws/proxyConfig.json
Normal file
6
ws/proxyConfig.json
Normal file
@ -0,0 +1,6 @@
|
||||
{
|
||||
"remote_urls": {
|
||||
"echo.localhost:8008": "ws://localhost:8009",
|
||||
"cobra.localhost:8008": "ws://localhost:5678"
|
||||
}
|
||||
}
|
39
ws/test_ws_proxy.sh
Normal file
39
ws/test_ws_proxy.sh
Normal file
@ -0,0 +1,39 @@
|
||||
#!/bin/sh
|
||||
|
||||
# This test requires cobra to be available
|
||||
which cobra > /dev/null || {
|
||||
echo cobra is not installed on this machine.
|
||||
exit 0
|
||||
}
|
||||
|
||||
# Handle Ctrl-C by killing all sub-processing AND exiting
|
||||
trap cleanup INT
|
||||
|
||||
function cleanup {
|
||||
exit_code=${1:-1}
|
||||
echo "Killing all servers (ws and cobra)"
|
||||
echo
|
||||
kill `cat /tmp/pidfile.proxy` &>/dev/null
|
||||
kill `cat /tmp/pidfile.echo_server` &>/dev/null
|
||||
kill `cat /tmp/pidfile.cobra` &>/dev/null
|
||||
kill `cat /tmp/pidfile.connect.echo` &>/dev/null
|
||||
kill `cat /tmp/pidfile.connect.cobra` &>/dev/null
|
||||
exit ${exit_code}
|
||||
}
|
||||
|
||||
ws proxy_server --pidfile /tmp/pidfile.proxy --config_path proxyConfig.json &
|
||||
ws echo_server --pidfile /tmp/pidfile.echo_server --port 8009 &
|
||||
cobra -v run --pidfile /tmp/pidfile.cobra --port 5678 &
|
||||
|
||||
# Wait for the servers to be up
|
||||
sleep 1
|
||||
|
||||
# unbuffer comes with expect (written in tcl)
|
||||
echo 'hello' | unbuffer ws connect --pidfile /tmp/pidfile.connect.echo ws://echo.localhost:8008 &
|
||||
|
||||
echo 'hello' | unbuffer ws connect --pidfile /tmp/pidfile.connect.cobra ws://cobra.localhost:8008 &
|
||||
|
||||
# Wait
|
||||
sleep 2
|
||||
|
||||
cleanup
|
126
ws/ws.h
126
ws/ws.h
@ -1,126 +0,0 @@
|
||||
/*
|
||||
* ws.h
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include <ixcobra/IXCobraConfig.h>
|
||||
#include <ixwebsocket/IXSocketTLSOptions.h>
|
||||
#include <string>
|
||||
|
||||
namespace ix
|
||||
{
|
||||
int ws_http_client_main(const std::string& url,
|
||||
const std::string& headers,
|
||||
const std::string& data,
|
||||
bool headersOnly,
|
||||
int connectTimeout,
|
||||
int transferTimeout,
|
||||
bool followRedirects,
|
||||
int maxRedirects,
|
||||
bool verbose,
|
||||
bool save,
|
||||
const std::string& output,
|
||||
bool compress,
|
||||
const ix::SocketTLSOptions& tlsOptions);
|
||||
|
||||
int ws_ping_pong_main(const std::string& url, const ix::SocketTLSOptions& tlsOptions);
|
||||
|
||||
int ws_echo_server_main(int port,
|
||||
bool greetings,
|
||||
const std::string& hostname,
|
||||
const ix::SocketTLSOptions& tlsOptions,
|
||||
bool ipv6,
|
||||
bool disablePerMessageDeflate,
|
||||
bool disablePong);
|
||||
|
||||
int ws_broadcast_server_main(int port,
|
||||
const std::string& hostname,
|
||||
const ix::SocketTLSOptions& tlsOptions);
|
||||
int ws_transfer_main(int port,
|
||||
const std::string& hostname,
|
||||
const ix::SocketTLSOptions& tlsOptions);
|
||||
|
||||
int ws_chat_main(const std::string& url, const std::string& user);
|
||||
|
||||
int ws_connect_main(const std::string& url,
|
||||
const std::string& headers,
|
||||
bool disableAutomaticReconnection,
|
||||
bool disablePerMessageDeflate,
|
||||
bool binaryMode,
|
||||
uint32_t maxWaitBetweenReconnectionRetries,
|
||||
const ix::SocketTLSOptions& tlsOptions,
|
||||
const std::string& subprotocol,
|
||||
int pingIntervalSecs);
|
||||
|
||||
int ws_receive_main(const std::string& url,
|
||||
bool enablePerMessageDeflate,
|
||||
int delayMs,
|
||||
const ix::SocketTLSOptions& tlsOptions);
|
||||
|
||||
int ws_send_main(const std::string& url,
|
||||
const std::string& path,
|
||||
bool disablePerMessageDeflate,
|
||||
const ix::SocketTLSOptions& tlsOptions);
|
||||
|
||||
int ws_redis_cli_main(const std::string& hostname, int port, const std::string& password);
|
||||
|
||||
int ws_redis_publish_main(const std::string& hostname,
|
||||
int port,
|
||||
const std::string& password,
|
||||
const std::string& channel,
|
||||
const std::string& message,
|
||||
int count);
|
||||
|
||||
int ws_redis_subscribe_main(const std::string& hostname,
|
||||
int port,
|
||||
const std::string& password,
|
||||
const std::string& channel,
|
||||
bool verbose);
|
||||
|
||||
int ws_cobra_publish_main(const ix::CobraConfig& appkey,
|
||||
const std::string& channel,
|
||||
const std::string& path);
|
||||
|
||||
int ws_cobra_metrics_publish_main(const ix::CobraConfig& config,
|
||||
const std::string& channel,
|
||||
const std::string& path,
|
||||
bool stress);
|
||||
|
||||
int ws_cobra_metrics_to_redis(const ix::CobraConfig& config,
|
||||
const std::string& channel,
|
||||
const std::string& filter,
|
||||
const std::string& position,
|
||||
const std::string& host,
|
||||
int port);
|
||||
|
||||
int ws_snake_main(int port,
|
||||
const std::string& hostname,
|
||||
const std::string& redisHosts,
|
||||
int redisPort,
|
||||
const std::string& redisPassword,
|
||||
bool verbose,
|
||||
const std::string& appsConfigPath,
|
||||
const ix::SocketTLSOptions& tlsOptions,
|
||||
bool disablePong,
|
||||
const std::string& republishChannel);
|
||||
|
||||
int ws_httpd_main(int port,
|
||||
const std::string& hostname,
|
||||
bool redirect,
|
||||
const std::string& redirectUrl,
|
||||
const ix::SocketTLSOptions& tlsOptions);
|
||||
|
||||
int ws_autobahn_main(const std::string& url, bool quiet);
|
||||
|
||||
int ws_redis_server_main(int port, const std::string& hostname);
|
||||
|
||||
int ws_sentry_minidump_upload(const std::string& metadataPath,
|
||||
const std::string& minidump,
|
||||
const std::string& project,
|
||||
const std::string& key,
|
||||
bool verbose);
|
||||
|
||||
int ws_dns_lookup(const std::string& hostname);
|
||||
} // namespace ix
|
@ -1,298 +0,0 @@
|
||||
/*
|
||||
* ws_autobahn.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
|
||||
//
|
||||
// 1. First you need to generate a config file in a config folder,
|
||||
// which can use a white list of test to execute (with globbing),
|
||||
// or a black list of tests to ignore
|
||||
//
|
||||
// config/fuzzingserver.json
|
||||
// {
|
||||
// "url": "ws://127.0.0.1:9001",
|
||||
// "outdir": "./reports/clients",
|
||||
// "cases": ["2.*"],
|
||||
// "exclude-cases": [
|
||||
// ],
|
||||
// "exclude-agent-cases": {}
|
||||
// }
|
||||
//
|
||||
//
|
||||
// 2 Run the test server (using docker)
|
||||
// docker run -it --rm -v "${PWD}/config:/config" -v "${PWD}/reports:/reports" -p 9001:9001 --name
|
||||
// fuzzingserver crossbario/autobahn-testsuite
|
||||
//
|
||||
// 3. Run this command
|
||||
// ws autobahn -q --url ws://localhost:9001
|
||||
//
|
||||
// 4. A HTML report will be generated, you can inspect it to see if you are compliant or not
|
||||
//
|
||||
|
||||
#include <atomic>
|
||||
#include <condition_variable>
|
||||
#include <ixwebsocket/IXSocket.h>
|
||||
#include <ixwebsocket/IXWebSocket.h>
|
||||
#include <mutex>
|
||||
#include <spdlog/spdlog.h>
|
||||
#include <sstream>
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
std::string truncate(const std::string& str, size_t n)
|
||||
{
|
||||
if (str.size() < n)
|
||||
{
|
||||
return str;
|
||||
}
|
||||
else
|
||||
{
|
||||
return str.substr(0, n) + "...";
|
||||
}
|
||||
}
|
||||
} // namespace
|
||||
|
||||
namespace ix
|
||||
{
|
||||
class AutobahnTestCase
|
||||
{
|
||||
public:
|
||||
AutobahnTestCase(const std::string& _url, bool quiet);
|
||||
void run();
|
||||
|
||||
private:
|
||||
void log(const std::string& msg);
|
||||
|
||||
std::string _url;
|
||||
ix::WebSocket _webSocket;
|
||||
|
||||
bool _quiet;
|
||||
|
||||
std::mutex _mutex;
|
||||
std::condition_variable _condition;
|
||||
};
|
||||
|
||||
AutobahnTestCase::AutobahnTestCase(const std::string& url, bool quiet)
|
||||
: _url(url)
|
||||
, _quiet(quiet)
|
||||
{
|
||||
_webSocket.disableAutomaticReconnection();
|
||||
|
||||
// FIXME: this should be on by default
|
||||
ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(
|
||||
true, false, false, 15, 15);
|
||||
_webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions);
|
||||
}
|
||||
|
||||
void AutobahnTestCase::log(const std::string& msg)
|
||||
{
|
||||
if (!_quiet)
|
||||
{
|
||||
spdlog::info(msg);
|
||||
}
|
||||
}
|
||||
|
||||
void AutobahnTestCase::run()
|
||||
{
|
||||
_webSocket.setUrl(_url);
|
||||
|
||||
std::stringstream ss;
|
||||
log(std::string("Connecting to url: ") + _url);
|
||||
|
||||
_webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) {
|
||||
std::stringstream ss;
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
log("autobahn: connected");
|
||||
ss << "Uri: " << msg->openInfo.uri << std::endl;
|
||||
ss << "Handshake Headers:" << std::endl;
|
||||
for (auto it : msg->openInfo.headers)
|
||||
{
|
||||
ss << it.first << ": " << it.second << std::endl;
|
||||
}
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Close)
|
||||
{
|
||||
ss << "autobahn: connection closed:";
|
||||
ss << " code " << msg->closeInfo.code;
|
||||
ss << " reason " << msg->closeInfo.reason << std::endl;
|
||||
|
||||
_condition.notify_one();
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Message)
|
||||
{
|
||||
ss << "Received " << msg->wireSize << " bytes" << std::endl;
|
||||
|
||||
ss << "autobahn: received message: " << truncate(msg->str, 40) << std::endl;
|
||||
|
||||
_webSocket.send(msg->str, msg->binary);
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Error)
|
||||
{
|
||||
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
|
||||
ss << "#retries: " << msg->errorInfo.retries << std::endl;
|
||||
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
|
||||
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
|
||||
|
||||
// And error can happen, in which case the test-case is marked done
|
||||
_condition.notify_one();
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Fragment)
|
||||
{
|
||||
ss << "Received message fragment" << std::endl;
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Ping)
|
||||
{
|
||||
ss << "Received ping" << std::endl;
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Pong)
|
||||
{
|
||||
ss << "Received pong" << std::endl;
|
||||
}
|
||||
else
|
||||
{
|
||||
ss << "Invalid ix::WebSocketMessageType" << std::endl;
|
||||
}
|
||||
|
||||
log(ss.str());
|
||||
});
|
||||
|
||||
_webSocket.start();
|
||||
|
||||
log("Waiting for test completion ...");
|
||||
std::unique_lock<std::mutex> lock(_mutex);
|
||||
_condition.wait(lock);
|
||||
|
||||
_webSocket.stop();
|
||||
}
|
||||
|
||||
bool generateReport(const std::string& url)
|
||||
{
|
||||
ix::WebSocket webSocket;
|
||||
std::string reportUrl(url);
|
||||
reportUrl += "/updateReports?agent=ixwebsocket";
|
||||
webSocket.setUrl(reportUrl);
|
||||
webSocket.disableAutomaticReconnection();
|
||||
|
||||
std::atomic<bool> success(true);
|
||||
std::condition_variable condition;
|
||||
|
||||
webSocket.setOnMessageCallback([&condition, &success](const ix::WebSocketMessagePtr& msg) {
|
||||
if (msg->type == ix::WebSocketMessageType::Close)
|
||||
{
|
||||
spdlog::info("Report generated");
|
||||
condition.notify_one();
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Error)
|
||||
{
|
||||
std::stringstream ss;
|
||||
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
|
||||
ss << "#retries: " << msg->errorInfo.retries << std::endl;
|
||||
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
|
||||
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
|
||||
spdlog::info(ss.str());
|
||||
|
||||
success = false;
|
||||
}
|
||||
});
|
||||
|
||||
webSocket.start();
|
||||
std::mutex mutex;
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
condition.wait(lock);
|
||||
webSocket.stop();
|
||||
|
||||
if (!success)
|
||||
{
|
||||
spdlog::error("Cannot generate report at url {}", reportUrl);
|
||||
}
|
||||
|
||||
return success;
|
||||
}
|
||||
|
||||
int getTestCaseCount(const std::string& url)
|
||||
{
|
||||
ix::WebSocket webSocket;
|
||||
std::string caseCountUrl(url);
|
||||
caseCountUrl += "/getCaseCount";
|
||||
webSocket.setUrl(caseCountUrl);
|
||||
webSocket.disableAutomaticReconnection();
|
||||
|
||||
int count = -1;
|
||||
std::condition_variable condition;
|
||||
|
||||
webSocket.setOnMessageCallback([&condition, &count](const ix::WebSocketMessagePtr& msg) {
|
||||
if (msg->type == ix::WebSocketMessageType::Close)
|
||||
{
|
||||
condition.notify_one();
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Error)
|
||||
{
|
||||
std::stringstream ss;
|
||||
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
|
||||
ss << "#retries: " << msg->errorInfo.retries << std::endl;
|
||||
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
|
||||
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
|
||||
spdlog::info(ss.str());
|
||||
|
||||
condition.notify_one();
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Message)
|
||||
{
|
||||
// response is a string
|
||||
std::stringstream ss;
|
||||
ss << msg->str;
|
||||
ss >> count;
|
||||
}
|
||||
});
|
||||
|
||||
webSocket.start();
|
||||
std::mutex mutex;
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
condition.wait(lock);
|
||||
webSocket.stop();
|
||||
|
||||
if (count == -1)
|
||||
{
|
||||
spdlog::error("Cannot retrieve test case count at url {}", caseCountUrl);
|
||||
}
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
//
|
||||
// make && bench ws autobahn --url ws://localhost:9001
|
||||
//
|
||||
int ws_autobahn_main(const std::string& url, bool quiet)
|
||||
{
|
||||
int testCasesCount = getTestCaseCount(url);
|
||||
spdlog::info("Test cases count: {}", testCasesCount);
|
||||
|
||||
if (testCasesCount == -1)
|
||||
{
|
||||
spdlog::error("Cannot retrieve test case count at url {}", url);
|
||||
return 1;
|
||||
}
|
||||
|
||||
testCasesCount++;
|
||||
|
||||
for (int i = 1; i < testCasesCount; ++i)
|
||||
{
|
||||
spdlog::info("Execute test case {}", i);
|
||||
|
||||
int caseNumber = i;
|
||||
|
||||
std::stringstream ss;
|
||||
ss << url << "/runCase?case=" << caseNumber << "&agent=ixwebsocket";
|
||||
|
||||
std::string url(ss.str());
|
||||
|
||||
AutobahnTestCase testCase(url, quiet);
|
||||
testCase.run();
|
||||
}
|
||||
|
||||
return generateReport(url) ? 0 : 1;
|
||||
}
|
||||
} // namespace ix
|
@ -1,98 +0,0 @@
|
||||
/*
|
||||
* ws_broadcast_server.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
|
||||
#include <ixwebsocket/IXWebSocketServer.h>
|
||||
#include <spdlog/spdlog.h>
|
||||
#include <sstream>
|
||||
|
||||
|
||||
namespace ix
|
||||
{
|
||||
int ws_broadcast_server_main(int port,
|
||||
const std::string& hostname,
|
||||
const ix::SocketTLSOptions& tlsOptions)
|
||||
{
|
||||
spdlog::info("Listening on {}:{}", hostname, port);
|
||||
|
||||
ix::WebSocketServer server(port, hostname);
|
||||
server.setTLSOptions(tlsOptions);
|
||||
|
||||
server.setOnClientMessageCallback(
|
||||
[&server](std::shared_ptr<ConnectionState> connectionState,
|
||||
ConnectionInfo& connectionInfo,
|
||||
WebSocket& webSocket,
|
||||
const WebSocketMessagePtr& msg) {
|
||||
auto remoteIp = connectionInfo.remoteIp;
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
spdlog::info("New connection");
|
||||
spdlog::info("remote ip: {}", remoteIp);
|
||||
spdlog::info("id: {}", connectionState->getId());
|
||||
spdlog::info("Uri: {}", msg->openInfo.uri);
|
||||
spdlog::info("Headers:");
|
||||
for (auto it : msg->openInfo.headers)
|
||||
{
|
||||
spdlog::info("{}: {}", it.first, it.second);
|
||||
}
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Close)
|
||||
{
|
||||
spdlog::info("Closed connection: code {} reason {}",
|
||||
msg->closeInfo.code,
|
||||
msg->closeInfo.reason);
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Error)
|
||||
{
|
||||
std::stringstream ss;
|
||||
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
|
||||
ss << "#retries: " << msg->errorInfo.retries << std::endl;
|
||||
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
|
||||
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
|
||||
spdlog::info(ss.str());
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Fragment)
|
||||
{
|
||||
spdlog::info("Received message fragment");
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Message)
|
||||
{
|
||||
spdlog::info("Received {} bytes", msg->wireSize);
|
||||
|
||||
for (auto&& client : server.getClients())
|
||||
{
|
||||
if (client.get() != &webSocket)
|
||||
{
|
||||
client->send(msg->str, msg->binary, [](int current, int total) -> bool {
|
||||
spdlog::info("Step {} out of {}", current, total);
|
||||
return true;
|
||||
});
|
||||
|
||||
do
|
||||
{
|
||||
size_t bufferedAmount = client->bufferedAmount();
|
||||
spdlog::info("{} bytes left to be sent", bufferedAmount);
|
||||
|
||||
std::chrono::duration<double, std::milli> duration(500);
|
||||
std::this_thread::sleep_for(duration);
|
||||
} while (client->bufferedAmount() != 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
auto res = server.listen();
|
||||
if (!res.first)
|
||||
{
|
||||
spdlog::info(res.second);
|
||||
return 1;
|
||||
}
|
||||
|
||||
server.start();
|
||||
server.wait();
|
||||
|
||||
return 0;
|
||||
}
|
||||
} // namespace ix
|
191
ws/ws_chat.cpp
191
ws/ws_chat.cpp
@ -1,191 +0,0 @@
|
||||
/*
|
||||
* ws_chat.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
|
||||
//
|
||||
// Simple chat program that talks to a broadcast server
|
||||
// Broadcast server can be ran with `ws broadcast_server`
|
||||
//
|
||||
|
||||
#include "nlohmann/json.hpp"
|
||||
#include <iostream>
|
||||
#include <ixwebsocket/IXSocket.h>
|
||||
#include <ixwebsocket/IXWebSocket.h>
|
||||
#include <queue>
|
||||
#include <spdlog/spdlog.h>
|
||||
#include <sstream>
|
||||
|
||||
// for convenience
|
||||
using json = nlohmann::json;
|
||||
|
||||
namespace ix
|
||||
{
|
||||
class WebSocketChat
|
||||
{
|
||||
public:
|
||||
WebSocketChat(const std::string& url, const std::string& user);
|
||||
|
||||
void subscribe(const std::string& channel);
|
||||
void start();
|
||||
void stop();
|
||||
bool isReady() const;
|
||||
|
||||
void sendMessage(const std::string& text);
|
||||
size_t getReceivedMessagesCount() const;
|
||||
|
||||
std::string encodeMessage(const std::string& text);
|
||||
std::pair<std::string, std::string> decodeMessage(const std::string& str);
|
||||
|
||||
private:
|
||||
std::string _url;
|
||||
std::string _user;
|
||||
ix::WebSocket _webSocket;
|
||||
std::queue<std::string> _receivedQueue;
|
||||
|
||||
void log(const std::string& msg);
|
||||
};
|
||||
|
||||
WebSocketChat::WebSocketChat(const std::string& url, const std::string& user)
|
||||
: _url(url)
|
||||
, _user(user)
|
||||
{
|
||||
;
|
||||
}
|
||||
|
||||
void WebSocketChat::log(const std::string& msg)
|
||||
{
|
||||
spdlog::info(msg);
|
||||
}
|
||||
|
||||
size_t WebSocketChat::getReceivedMessagesCount() const
|
||||
{
|
||||
return _receivedQueue.size();
|
||||
}
|
||||
|
||||
bool WebSocketChat::isReady() const
|
||||
{
|
||||
return _webSocket.getReadyState() == ix::ReadyState::Open;
|
||||
}
|
||||
|
||||
void WebSocketChat::stop()
|
||||
{
|
||||
_webSocket.stop();
|
||||
}
|
||||
|
||||
void WebSocketChat::start()
|
||||
{
|
||||
_webSocket.setUrl(_url);
|
||||
|
||||
std::stringstream ss;
|
||||
log(std::string("Connecting to url: ") + _url);
|
||||
|
||||
_webSocket.setOnMessageCallback([this](const WebSocketMessagePtr& msg) {
|
||||
std::stringstream ss;
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
log("ws chat: connected");
|
||||
spdlog::info("Uri: {}", msg->openInfo.uri);
|
||||
spdlog::info("Headers:");
|
||||
for (auto it : msg->openInfo.headers)
|
||||
{
|
||||
spdlog::info("{}: {}", it.first, it.second);
|
||||
}
|
||||
|
||||
spdlog::info("ws chat: user {} connected !", _user);
|
||||
log(ss.str());
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Close)
|
||||
{
|
||||
ss << "ws chat user disconnected: " << _user;
|
||||
ss << " code " << msg->closeInfo.code;
|
||||
ss << " reason " << msg->closeInfo.reason << std::endl;
|
||||
log(ss.str());
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Message)
|
||||
{
|
||||
auto result = decodeMessage(msg->str);
|
||||
|
||||
// Our "chat" / "broacast" node.js server does not send us
|
||||
// the messages we send, so we don't have to filter it out.
|
||||
|
||||
// store text
|
||||
_receivedQueue.push(result.second);
|
||||
|
||||
ss << std::endl
|
||||
<< result.first << "(" << msg->wireSize << " bytes)"
|
||||
<< " > " << result.second << std::endl
|
||||
<< _user << " > ";
|
||||
log(ss.str());
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Error)
|
||||
{
|
||||
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
|
||||
ss << "#retries: " << msg->errorInfo.retries << std::endl;
|
||||
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
|
||||
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
|
||||
log(ss.str());
|
||||
}
|
||||
else
|
||||
{
|
||||
ss << "Invalid ix::WebSocketMessageType";
|
||||
log(ss.str());
|
||||
}
|
||||
});
|
||||
|
||||
_webSocket.start();
|
||||
}
|
||||
|
||||
std::pair<std::string, std::string> WebSocketChat::decodeMessage(const std::string& str)
|
||||
{
|
||||
auto j = json::parse(str);
|
||||
|
||||
std::string msg_user = j["user"];
|
||||
std::string msg_text = j["text"];
|
||||
|
||||
return std::pair<std::string, std::string>(msg_user, msg_text);
|
||||
}
|
||||
|
||||
std::string WebSocketChat::encodeMessage(const std::string& text)
|
||||
{
|
||||
json j;
|
||||
j["user"] = _user;
|
||||
j["text"] = text;
|
||||
|
||||
std::string output = j.dump();
|
||||
return output;
|
||||
}
|
||||
|
||||
void WebSocketChat::sendMessage(const std::string& text)
|
||||
{
|
||||
_webSocket.sendText(encodeMessage(text));
|
||||
}
|
||||
|
||||
int ws_chat_main(const std::string& url, const std::string& user)
|
||||
{
|
||||
spdlog::info("Type Ctrl-D to exit prompt...");
|
||||
WebSocketChat webSocketChat(url, user);
|
||||
webSocketChat.start();
|
||||
|
||||
while (true)
|
||||
{
|
||||
// Read line
|
||||
std::string line;
|
||||
std::cout << user << " > " << std::flush;
|
||||
std::getline(std::cin, line);
|
||||
|
||||
if (!std::cin)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
webSocketChat.sendMessage(line);
|
||||
}
|
||||
|
||||
spdlog::info("");
|
||||
webSocketChat.stop();
|
||||
|
||||
return 0;
|
||||
}
|
||||
} // namespace ix
|
@ -1,77 +0,0 @@
|
||||
/*
|
||||
* ws_cobra_metrics_publish.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <fstream>
|
||||
#include <ixcobra/IXCobraMetricsPublisher.h>
|
||||
#include <spdlog/spdlog.h>
|
||||
#include <sstream>
|
||||
#include <thread>
|
||||
|
||||
namespace ix
|
||||
{
|
||||
int ws_cobra_metrics_publish_main(const ix::CobraConfig& config,
|
||||
const std::string& channel,
|
||||
const std::string& path,
|
||||
bool stress)
|
||||
{
|
||||
std::atomic<int> sentMessages(0);
|
||||
std::atomic<int> ackedMessages(0);
|
||||
CobraConnection::setPublishTrackerCallback(
|
||||
[&sentMessages, &ackedMessages](bool sent, bool acked) {
|
||||
if (sent) sentMessages++;
|
||||
if (acked) ackedMessages++;
|
||||
});
|
||||
|
||||
CobraMetricsPublisher cobraMetricsPublisher;
|
||||
cobraMetricsPublisher.enable(true);
|
||||
cobraMetricsPublisher.configure(config, channel);
|
||||
|
||||
while (!cobraMetricsPublisher.isAuthenticated())
|
||||
;
|
||||
|
||||
std::ifstream f(path);
|
||||
std::string str((std::istreambuf_iterator<char>(f)), std::istreambuf_iterator<char>());
|
||||
|
||||
Json::Value data;
|
||||
Json::Reader reader;
|
||||
if (!reader.parse(str, data)) return 1;
|
||||
|
||||
if (!stress)
|
||||
{
|
||||
auto msgId = cobraMetricsPublisher.push(channel, data);
|
||||
spdlog::info("Sent message: {}", msgId);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Stress mode to try to trigger server and client bugs
|
||||
while (true)
|
||||
{
|
||||
for (int i = 0; i < 1000; ++i)
|
||||
{
|
||||
cobraMetricsPublisher.push(channel, data);
|
||||
}
|
||||
|
||||
cobraMetricsPublisher.suspend();
|
||||
cobraMetricsPublisher.resume();
|
||||
|
||||
// FIXME: investigate why without this check we trigger a lock
|
||||
while (!cobraMetricsPublisher.isAuthenticated())
|
||||
;
|
||||
}
|
||||
}
|
||||
|
||||
// Wait a bit for the message to get a chance to be sent
|
||||
// there isn't any ack on publish right now so it's the best we can do
|
||||
// FIXME: this comment is a lie now
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
|
||||
spdlog::info("Sent messages: {} Acked messages {}", sentMessages, ackedMessages);
|
||||
|
||||
return 0;
|
||||
}
|
||||
} // namespace ix
|
@ -1,106 +0,0 @@
|
||||
/*
|
||||
* ws_cobra_publish.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <fstream>
|
||||
#include <ixcobra/IXCobraMetricsPublisher.h>
|
||||
#include <mutex>
|
||||
#include <spdlog/spdlog.h>
|
||||
#include <sstream>
|
||||
#include <thread>
|
||||
|
||||
namespace ix
|
||||
{
|
||||
int ws_cobra_publish_main(const ix::CobraConfig& config,
|
||||
const std::string& channel,
|
||||
const std::string& path)
|
||||
{
|
||||
std::ifstream f(path);
|
||||
std::string str((std::istreambuf_iterator<char>(f)), std::istreambuf_iterator<char>());
|
||||
|
||||
Json::Value data;
|
||||
Json::Reader reader;
|
||||
if (!reader.parse(str, data))
|
||||
{
|
||||
spdlog::info("Input file is not a JSON file");
|
||||
return 1;
|
||||
}
|
||||
|
||||
ix::CobraConnection conn;
|
||||
conn.configure(config);
|
||||
|
||||
// Display incoming messages
|
||||
std::atomic<bool> authenticated(false);
|
||||
std::atomic<bool> messageAcked(false);
|
||||
|
||||
conn.setEventCallback(
|
||||
[&conn, &channel, &data, &authenticated, &messageAcked](const CobraEventPtr& event) {
|
||||
if (event->type == ix::CobraEventType::Open)
|
||||
{
|
||||
spdlog::info("Publisher connected");
|
||||
|
||||
for (auto&& it : event->headers)
|
||||
{
|
||||
spdlog::info("{}: {}", it.first, it.second);
|
||||
}
|
||||
}
|
||||
else if (event->type == ix::CobraEventType::Closed)
|
||||
{
|
||||
spdlog::info("Subscriber closed: {}", event->errMsg);
|
||||
}
|
||||
else if (event->type == ix::CobraEventType::Authenticated)
|
||||
{
|
||||
spdlog::info("Publisher authenticated");
|
||||
authenticated = true;
|
||||
|
||||
Json::Value channels;
|
||||
channels[0] = channel;
|
||||
auto msgId = conn.publish(channels, data);
|
||||
|
||||
spdlog::info("Published msg {}", msgId);
|
||||
}
|
||||
else if (event->type == ix::CobraEventType::Subscribed)
|
||||
{
|
||||
spdlog::info("Publisher: subscribed to channel {}", event->subscriptionId);
|
||||
}
|
||||
else if (event->type == ix::CobraEventType::UnSubscribed)
|
||||
{
|
||||
spdlog::info("Publisher: unsubscribed from channel {}", event->subscriptionId);
|
||||
}
|
||||
else if (event->type == ix::CobraEventType::Error)
|
||||
{
|
||||
spdlog::error("Publisher: error {}", event->errMsg);
|
||||
}
|
||||
else if (event->type == ix::CobraEventType::Published)
|
||||
{
|
||||
spdlog::info("Published message id {} acked", event->msgId);
|
||||
messageAcked = true;
|
||||
}
|
||||
else if (event->type == ix::CobraEventType::Pong)
|
||||
{
|
||||
spdlog::info("Received websocket pong");
|
||||
}
|
||||
else if (event->type == ix::CobraEventType::HandshakeError)
|
||||
{
|
||||
spdlog::error("Subscriber: Handshake error: {}", event->errMsg);
|
||||
}
|
||||
else if (event->type == ix::CobraEventType::AuthenticationError)
|
||||
{
|
||||
spdlog::error("Subscriber: Authentication error: {}", event->errMsg);
|
||||
}
|
||||
});
|
||||
|
||||
conn.connect();
|
||||
|
||||
while (!authenticated)
|
||||
;
|
||||
while (!messageAcked)
|
||||
;
|
||||
|
||||
return 0;
|
||||
}
|
||||
} // namespace ix
|
@ -1,288 +0,0 @@
|
||||
/*
|
||||
* ws_connect.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
|
||||
#include "IXBench.h"
|
||||
#include "linenoise.hpp"
|
||||
#include <iostream>
|
||||
#include <ixwebsocket/IXSocket.h>
|
||||
#include <ixwebsocket/IXSocketTLSOptions.h>
|
||||
#include <ixwebsocket/IXWebSocket.h>
|
||||
#include <spdlog/spdlog.h>
|
||||
#include <sstream>
|
||||
|
||||
|
||||
namespace ix
|
||||
{
|
||||
class WebSocketConnect
|
||||
{
|
||||
public:
|
||||
WebSocketConnect(const std::string& _url,
|
||||
const std::string& headers,
|
||||
bool disableAutomaticReconnection,
|
||||
bool disablePerMessageDeflate,
|
||||
bool binaryMode,
|
||||
uint32_t maxWaitBetweenReconnectionRetries,
|
||||
const ix::SocketTLSOptions& tlsOptions,
|
||||
const std::string& subprotocol,
|
||||
int pingIntervalSecs);
|
||||
|
||||
void subscribe(const std::string& channel);
|
||||
void start();
|
||||
void stop();
|
||||
|
||||
int getSentBytes()
|
||||
{
|
||||
return _sentBytes;
|
||||
}
|
||||
int getReceivedBytes()
|
||||
{
|
||||
return _receivedBytes;
|
||||
}
|
||||
|
||||
void sendMessage(const std::string& text);
|
||||
|
||||
private:
|
||||
std::string _url;
|
||||
WebSocketHttpHeaders _headers;
|
||||
ix::WebSocket _webSocket;
|
||||
bool _disablePerMessageDeflate;
|
||||
bool _binaryMode;
|
||||
std::atomic<int> _receivedBytes;
|
||||
std::atomic<int> _sentBytes;
|
||||
|
||||
void log(const std::string& msg);
|
||||
WebSocketHttpHeaders parseHeaders(const std::string& data);
|
||||
};
|
||||
|
||||
WebSocketConnect::WebSocketConnect(const std::string& url,
|
||||
const std::string& headers,
|
||||
bool disableAutomaticReconnection,
|
||||
bool disablePerMessageDeflate,
|
||||
bool binaryMode,
|
||||
uint32_t maxWaitBetweenReconnectionRetries,
|
||||
const ix::SocketTLSOptions& tlsOptions,
|
||||
const std::string& subprotocol,
|
||||
int pingIntervalSecs)
|
||||
: _url(url)
|
||||
, _disablePerMessageDeflate(disablePerMessageDeflate)
|
||||
, _binaryMode(binaryMode)
|
||||
, _receivedBytes(0)
|
||||
, _sentBytes(0)
|
||||
{
|
||||
if (disableAutomaticReconnection)
|
||||
{
|
||||
_webSocket.disableAutomaticReconnection();
|
||||
}
|
||||
_webSocket.setMaxWaitBetweenReconnectionRetries(maxWaitBetweenReconnectionRetries);
|
||||
_webSocket.setTLSOptions(tlsOptions);
|
||||
_webSocket.setPingInterval(pingIntervalSecs);
|
||||
|
||||
_headers = parseHeaders(headers);
|
||||
|
||||
if (!subprotocol.empty())
|
||||
{
|
||||
_webSocket.addSubProtocol(subprotocol);
|
||||
}
|
||||
|
||||
WebSocket::setTrafficTrackerCallback([this](int size, bool incoming) {
|
||||
if (incoming)
|
||||
{
|
||||
_receivedBytes += size;
|
||||
}
|
||||
else
|
||||
{
|
||||
_sentBytes += size;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void WebSocketConnect::log(const std::string& msg)
|
||||
{
|
||||
std::cout << msg << std::endl;
|
||||
}
|
||||
|
||||
WebSocketHttpHeaders WebSocketConnect::parseHeaders(const std::string& data)
|
||||
{
|
||||
WebSocketHttpHeaders headers;
|
||||
|
||||
// Split by \n
|
||||
std::string token;
|
||||
std::stringstream tokenStream(data);
|
||||
|
||||
while (std::getline(tokenStream, token))
|
||||
{
|
||||
std::size_t pos = token.rfind(':');
|
||||
|
||||
// Bail out if last '.' is found
|
||||
if (pos == std::string::npos) continue;
|
||||
|
||||
auto key = token.substr(0, pos);
|
||||
auto val = token.substr(pos + 1);
|
||||
|
||||
spdlog::info("{}: {}", key, val);
|
||||
headers[key] = val;
|
||||
}
|
||||
|
||||
return headers;
|
||||
}
|
||||
|
||||
void WebSocketConnect::stop()
|
||||
{
|
||||
{
|
||||
Bench bench("ws_connect: stop connection");
|
||||
_webSocket.stop();
|
||||
}
|
||||
}
|
||||
|
||||
void WebSocketConnect::start()
|
||||
{
|
||||
_webSocket.setUrl(_url);
|
||||
_webSocket.setExtraHeaders(_headers);
|
||||
|
||||
if (_disablePerMessageDeflate)
|
||||
{
|
||||
_webSocket.disablePerMessageDeflate();
|
||||
}
|
||||
else
|
||||
{
|
||||
ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(
|
||||
true, false, false, 15, 15);
|
||||
_webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions);
|
||||
}
|
||||
|
||||
std::stringstream ss;
|
||||
log(std::string("Connecting to url: ") + _url);
|
||||
|
||||
_webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) {
|
||||
std::stringstream ss;
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
log("ws_connect: connected");
|
||||
spdlog::info("Uri: {}", msg->openInfo.uri);
|
||||
spdlog::info("Headers:");
|
||||
for (auto it : msg->openInfo.headers)
|
||||
{
|
||||
spdlog::info("{}: {}", it.first, it.second);
|
||||
}
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Close)
|
||||
{
|
||||
ss << "ws_connect: connection closed:";
|
||||
ss << " code " << msg->closeInfo.code;
|
||||
ss << " reason " << msg->closeInfo.reason << std::endl;
|
||||
log(ss.str());
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Message)
|
||||
{
|
||||
spdlog::info("Received {} bytes", msg->wireSize);
|
||||
|
||||
ss << "ws_connect: received message: " << msg->str;
|
||||
log(ss.str());
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Error)
|
||||
{
|
||||
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
|
||||
ss << "#retries: " << msg->errorInfo.retries << std::endl;
|
||||
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
|
||||
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
|
||||
log(ss.str());
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Fragment)
|
||||
{
|
||||
spdlog::info("Received message fragment");
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Ping)
|
||||
{
|
||||
spdlog::info("Received ping");
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Pong)
|
||||
{
|
||||
spdlog::info("Received pong {}", msg->str);
|
||||
}
|
||||
else
|
||||
{
|
||||
ss << "Invalid ix::WebSocketMessageType";
|
||||
log(ss.str());
|
||||
}
|
||||
});
|
||||
|
||||
_webSocket.start();
|
||||
}
|
||||
|
||||
void WebSocketConnect::sendMessage(const std::string& text)
|
||||
{
|
||||
if (_binaryMode)
|
||||
{
|
||||
_webSocket.sendBinary(text);
|
||||
}
|
||||
else
|
||||
{
|
||||
_webSocket.sendText(text);
|
||||
}
|
||||
}
|
||||
|
||||
int ws_connect_main(const std::string& url,
|
||||
const std::string& headers,
|
||||
bool disableAutomaticReconnection,
|
||||
bool disablePerMessageDeflate,
|
||||
bool binaryMode,
|
||||
uint32_t maxWaitBetweenReconnectionRetries,
|
||||
const ix::SocketTLSOptions& tlsOptions,
|
||||
const std::string& subprotocol,
|
||||
int pingIntervalSecs)
|
||||
{
|
||||
std::cout << "Type Ctrl-D to exit prompt..." << std::endl;
|
||||
WebSocketConnect webSocketChat(url,
|
||||
headers,
|
||||
disableAutomaticReconnection,
|
||||
disablePerMessageDeflate,
|
||||
binaryMode,
|
||||
maxWaitBetweenReconnectionRetries,
|
||||
tlsOptions,
|
||||
subprotocol,
|
||||
pingIntervalSecs);
|
||||
webSocketChat.start();
|
||||
|
||||
while (true)
|
||||
{
|
||||
// Read line
|
||||
std::string line;
|
||||
auto quit = linenoise::Readline("> ", line);
|
||||
|
||||
if (quit)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
if (line == "/stop")
|
||||
{
|
||||
spdlog::info("Stopping connection...");
|
||||
webSocketChat.stop();
|
||||
continue;
|
||||
}
|
||||
|
||||
if (line == "/start")
|
||||
{
|
||||
spdlog::info("Starting connection...");
|
||||
webSocketChat.start();
|
||||
continue;
|
||||
}
|
||||
|
||||
webSocketChat.sendMessage(line);
|
||||
|
||||
// Add text to history
|
||||
linenoise::AddHistory(line.c_str());
|
||||
}
|
||||
|
||||
spdlog::info("");
|
||||
webSocketChat.stop();
|
||||
|
||||
spdlog::info("Received {} bytes", webSocketChat.getReceivedBytes());
|
||||
spdlog::info("Sent {} bytes", webSocketChat.getSentBytes());
|
||||
|
||||
return 0;
|
||||
}
|
||||
} // namespace ix
|
@ -1,34 +0,0 @@
|
||||
/*
|
||||
* ws_dns_lookup.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
|
||||
#include <atomic>
|
||||
#include <ixwebsocket/IXDNSLookup.h>
|
||||
#include <ixwebsocket/IXNetSystem.h>
|
||||
#include <spdlog/spdlog.h>
|
||||
#include <sstream>
|
||||
|
||||
|
||||
namespace ix
|
||||
{
|
||||
int ws_dns_lookup(const std::string& hostname)
|
||||
{
|
||||
auto dnsLookup = std::make_shared<DNSLookup>(hostname, 80);
|
||||
|
||||
std::string errMsg;
|
||||
struct addrinfo* res;
|
||||
|
||||
res = dnsLookup->resolve(errMsg, [] { return false; });
|
||||
|
||||
auto addr = res->ai_addr;
|
||||
|
||||
char str[INET_ADDRSTRLEN];
|
||||
inet_ntop(AF_INET, &addr, str, INET_ADDRSTRLEN);
|
||||
|
||||
spdlog::info("host: {} ip: {}", hostname, str);
|
||||
|
||||
return 0;
|
||||
}
|
||||
} // namespace ix
|
@ -1,101 +0,0 @@
|
||||
/*
|
||||
* ws_echo_server.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
|
||||
#include <ixwebsocket/IXNetSystem.h>
|
||||
#include <ixwebsocket/IXWebSocketServer.h>
|
||||
#include <spdlog/spdlog.h>
|
||||
#include <sstream>
|
||||
|
||||
namespace ix
|
||||
{
|
||||
int ws_echo_server_main(int port,
|
||||
bool greetings,
|
||||
const std::string& hostname,
|
||||
const ix::SocketTLSOptions& tlsOptions,
|
||||
bool ipv6,
|
||||
bool disablePerMessageDeflate,
|
||||
bool disablePong)
|
||||
{
|
||||
spdlog::info("Listening on {}:{}", hostname, port);
|
||||
|
||||
ix::WebSocketServer server(port,
|
||||
hostname,
|
||||
SocketServer::kDefaultTcpBacklog,
|
||||
SocketServer::kDefaultMaxConnections,
|
||||
WebSocketServer::kDefaultHandShakeTimeoutSecs,
|
||||
(ipv6) ? AF_INET6 : AF_INET);
|
||||
|
||||
server.setTLSOptions(tlsOptions);
|
||||
|
||||
if (disablePerMessageDeflate)
|
||||
{
|
||||
spdlog::info("Disable per message deflate");
|
||||
server.disablePerMessageDeflate();
|
||||
}
|
||||
|
||||
if (disablePong)
|
||||
{
|
||||
spdlog::info("Disable responding to PING messages with PONG");
|
||||
server.disablePong();
|
||||
}
|
||||
|
||||
server.setOnClientMessageCallback(
|
||||
[greetings](std::shared_ptr<ConnectionState> connectionState,
|
||||
ConnectionInfo& connectionInfo,
|
||||
WebSocket& webSocket,
|
||||
const WebSocketMessagePtr& msg) {
|
||||
auto remoteIp = connectionInfo.remoteIp;
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
spdlog::info("New connection");
|
||||
spdlog::info("remote ip: {}", remoteIp);
|
||||
spdlog::info("id: {}", connectionState->getId());
|
||||
spdlog::info("Uri: {}", msg->openInfo.uri);
|
||||
spdlog::info("Headers:");
|
||||
for (auto it : msg->openInfo.headers)
|
||||
{
|
||||
spdlog::info("{}: {}", it.first, it.second);
|
||||
}
|
||||
|
||||
if (greetings)
|
||||
{
|
||||
webSocket.sendText("Welcome !");
|
||||
}
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Close)
|
||||
{
|
||||
spdlog::info("Closed connection: client id {} code {} reason {}",
|
||||
connectionState->getId(),
|
||||
msg->closeInfo.code,
|
||||
msg->closeInfo.reason);
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Error)
|
||||
{
|
||||
spdlog::error("Connection error: {}", msg->errorInfo.reason);
|
||||
spdlog::error("#retries: {}", msg->errorInfo.retries);
|
||||
spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time);
|
||||
spdlog::error("HTTP Status: {}", msg->errorInfo.http_status);
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Message)
|
||||
{
|
||||
spdlog::info("Received {} bytes", msg->wireSize);
|
||||
webSocket.send(msg->str, msg->binary);
|
||||
}
|
||||
});
|
||||
|
||||
auto res = server.listen();
|
||||
if (!res.first)
|
||||
{
|
||||
spdlog::error(res.second);
|
||||
return 1;
|
||||
}
|
||||
|
||||
server.start();
|
||||
server.wait();
|
||||
|
||||
return 0;
|
||||
}
|
||||
} // namespace ix
|
@ -1,185 +0,0 @@
|
||||
/*
|
||||
* http_client.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
|
||||
#include <fstream>
|
||||
#include <ixwebsocket/IXHttpClient.h>
|
||||
#include <ixwebsocket/IXSocketTLSOptions.h>
|
||||
#include <ixwebsocket/IXWebSocketHttpHeaders.h>
|
||||
#include <spdlog/spdlog.h>
|
||||
#include <sstream>
|
||||
|
||||
namespace ix
|
||||
{
|
||||
std::string extractFilename(const std::string& path)
|
||||
{
|
||||
std::string::size_type idx;
|
||||
|
||||
idx = path.rfind('/');
|
||||
if (idx != std::string::npos)
|
||||
{
|
||||
std::string filename = path.substr(idx + 1);
|
||||
return filename;
|
||||
}
|
||||
else
|
||||
{
|
||||
return path;
|
||||
}
|
||||
}
|
||||
|
||||
WebSocketHttpHeaders parseHeaders(const std::string& data)
|
||||
{
|
||||
WebSocketHttpHeaders headers;
|
||||
|
||||
// Split by \n
|
||||
std::string token;
|
||||
std::stringstream tokenStream(data);
|
||||
|
||||
while (std::getline(tokenStream, token))
|
||||
{
|
||||
std::size_t pos = token.rfind(':');
|
||||
|
||||
// Bail out if last '.' is found
|
||||
if (pos == std::string::npos) continue;
|
||||
|
||||
auto key = token.substr(0, pos);
|
||||
auto val = token.substr(pos + 1);
|
||||
|
||||
spdlog::info("{}: {}", key, val);
|
||||
headers[key] = val;
|
||||
}
|
||||
|
||||
return headers;
|
||||
}
|
||||
|
||||
//
|
||||
// Useful endpoint to test HTTP post
|
||||
// https://postman-echo.com/post
|
||||
//
|
||||
HttpParameters parsePostParameters(const std::string& data)
|
||||
{
|
||||
HttpParameters httpParameters;
|
||||
|
||||
// Split by \n
|
||||
std::string token;
|
||||
std::stringstream tokenStream(data);
|
||||
|
||||
while (std::getline(tokenStream, token))
|
||||
{
|
||||
std::size_t pos = token.rfind('=');
|
||||
|
||||
// Bail out if last '.' is found
|
||||
if (pos == std::string::npos) continue;
|
||||
|
||||
auto key = token.substr(0, pos);
|
||||
auto val = token.substr(pos + 1);
|
||||
|
||||
spdlog::info("{}: {}", key, val);
|
||||
httpParameters[key] = val;
|
||||
}
|
||||
|
||||
return httpParameters;
|
||||
}
|
||||
|
||||
int ws_http_client_main(const std::string& url,
|
||||
const std::string& headersData,
|
||||
const std::string& data,
|
||||
bool headersOnly,
|
||||
int connectTimeout,
|
||||
int transferTimeout,
|
||||
bool followRedirects,
|
||||
int maxRedirects,
|
||||
bool verbose,
|
||||
bool save,
|
||||
const std::string& output,
|
||||
bool compress,
|
||||
const ix::SocketTLSOptions& tlsOptions)
|
||||
{
|
||||
HttpClient httpClient;
|
||||
httpClient.setTLSOptions(tlsOptions);
|
||||
|
||||
auto args = httpClient.createRequest();
|
||||
args->extraHeaders = parseHeaders(headersData);
|
||||
args->connectTimeout = connectTimeout;
|
||||
args->transferTimeout = transferTimeout;
|
||||
args->followRedirects = followRedirects;
|
||||
args->maxRedirects = maxRedirects;
|
||||
args->verbose = verbose;
|
||||
args->compress = compress;
|
||||
args->logger = [](const std::string& msg) { spdlog::info(msg); };
|
||||
args->onProgressCallback = [verbose](int current, int total) -> bool {
|
||||
if (verbose)
|
||||
{
|
||||
spdlog::info("Downloaded {} bytes out of {}", current, total);
|
||||
}
|
||||
return true;
|
||||
};
|
||||
|
||||
HttpParameters httpParameters = parsePostParameters(data);
|
||||
|
||||
HttpResponsePtr response;
|
||||
if (headersOnly)
|
||||
{
|
||||
response = httpClient.head(url, args);
|
||||
}
|
||||
else if (data.empty())
|
||||
{
|
||||
response = httpClient.get(url, args);
|
||||
}
|
||||
else
|
||||
{
|
||||
response = httpClient.post(url, httpParameters, args);
|
||||
}
|
||||
|
||||
spdlog::info("");
|
||||
|
||||
for (auto it : response->headers)
|
||||
{
|
||||
spdlog::info("{}: {}", it.first, it.second);
|
||||
}
|
||||
|
||||
spdlog::info("Upload size: {}", response->uploadSize);
|
||||
spdlog::info("Download size: {}", response->downloadSize);
|
||||
|
||||
spdlog::info("Status: {}", response->statusCode);
|
||||
if (response->errorCode != HttpErrorCode::Ok)
|
||||
{
|
||||
spdlog::info("error message: ", response->errorMsg);
|
||||
}
|
||||
|
||||
if (!headersOnly && response->errorCode == HttpErrorCode::Ok)
|
||||
{
|
||||
if (save || !output.empty())
|
||||
{
|
||||
// FIMXE we should decode the url first
|
||||
std::string filename = extractFilename(url);
|
||||
if (!output.empty())
|
||||
{
|
||||
filename = output;
|
||||
}
|
||||
|
||||
spdlog::info("Writing to disk: {}", filename);
|
||||
std::ofstream out(filename);
|
||||
out.write((char*) &response->payload.front(), response->payload.size());
|
||||
out.close();
|
||||
}
|
||||
else
|
||||
{
|
||||
if (response->headers["Content-Type"] != "application/octet-stream")
|
||||
{
|
||||
spdlog::info("payload: {}", response->payload);
|
||||
}
|
||||
else
|
||||
{
|
||||
spdlog::info("Binary output can mess up your terminal.");
|
||||
spdlog::info("Use the -O flag to save the file to disk.");
|
||||
spdlog::info("You can also use the --output option to specify a filename.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
} // namespace ix
|
@ -1,43 +0,0 @@
|
||||
/*
|
||||
* ws_httpd.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
|
||||
#include <fstream>
|
||||
#include <ixwebsocket/IXHttpServer.h>
|
||||
#include <spdlog/spdlog.h>
|
||||
#include <sstream>
|
||||
#include <vector>
|
||||
|
||||
namespace ix
|
||||
{
|
||||
int ws_httpd_main(int port,
|
||||
const std::string& hostname,
|
||||
bool redirect,
|
||||
const std::string& redirectUrl,
|
||||
const ix::SocketTLSOptions& tlsOptions)
|
||||
{
|
||||
spdlog::info("Listening on {}:{}", hostname, port);
|
||||
|
||||
ix::HttpServer server(port, hostname);
|
||||
server.setTLSOptions(tlsOptions);
|
||||
|
||||
if (redirect)
|
||||
{
|
||||
server.makeRedirectServer(redirectUrl);
|
||||
}
|
||||
|
||||
auto res = server.listen();
|
||||
if (!res.first)
|
||||
{
|
||||
spdlog::error(res.second);
|
||||
return 1;
|
||||
}
|
||||
|
||||
server.start();
|
||||
server.wait();
|
||||
|
||||
return 0;
|
||||
}
|
||||
} // namespace ix
|
@ -1,161 +0,0 @@
|
||||
/*
|
||||
* ws_ping_pong.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
|
||||
#include <iostream>
|
||||
#include <ixwebsocket/IXSocket.h>
|
||||
#include <ixwebsocket/IXSocketTLSOptions.h>
|
||||
#include <ixwebsocket/IXWebSocket.h>
|
||||
#include <spdlog/spdlog.h>
|
||||
#include <sstream>
|
||||
|
||||
namespace ix
|
||||
{
|
||||
class WebSocketPingPong
|
||||
{
|
||||
public:
|
||||
WebSocketPingPong(const std::string& _url, const ix::SocketTLSOptions& tlsOptions);
|
||||
|
||||
void subscribe(const std::string& channel);
|
||||
void start();
|
||||
void stop();
|
||||
|
||||
void ping(const std::string& text);
|
||||
void send(const std::string& text);
|
||||
|
||||
private:
|
||||
std::string _url;
|
||||
ix::WebSocket _webSocket;
|
||||
|
||||
void log(const std::string& msg);
|
||||
};
|
||||
|
||||
WebSocketPingPong::WebSocketPingPong(const std::string& url,
|
||||
const ix::SocketTLSOptions& tlsOptions)
|
||||
: _url(url)
|
||||
{
|
||||
_webSocket.setTLSOptions(tlsOptions);
|
||||
}
|
||||
|
||||
void WebSocketPingPong::log(const std::string& msg)
|
||||
{
|
||||
spdlog::info(msg);
|
||||
}
|
||||
|
||||
void WebSocketPingPong::stop()
|
||||
{
|
||||
_webSocket.stop();
|
||||
}
|
||||
|
||||
void WebSocketPingPong::start()
|
||||
{
|
||||
_webSocket.setUrl(_url);
|
||||
|
||||
std::stringstream ss;
|
||||
log(std::string("Connecting to url: ") + _url);
|
||||
|
||||
_webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) {
|
||||
spdlog::info("Received {} bytes", msg->wireSize);
|
||||
|
||||
std::stringstream ss;
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
log("ping_pong: connected");
|
||||
|
||||
spdlog::info("Uri: {}", msg->openInfo.uri);
|
||||
spdlog::info("Headers:");
|
||||
for (auto it : msg->openInfo.headers)
|
||||
{
|
||||
spdlog::info("{}: {}", it.first, it.second);
|
||||
}
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Close)
|
||||
{
|
||||
ss << "ping_pong: disconnected:"
|
||||
<< " code " << msg->closeInfo.code << " reason " << msg->closeInfo.reason
|
||||
<< msg->str;
|
||||
log(ss.str());
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Message)
|
||||
{
|
||||
ss << "ping_pong: received message: " << msg->str;
|
||||
log(ss.str());
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Ping)
|
||||
{
|
||||
ss << "ping_pong: received ping message: " << msg->str;
|
||||
log(ss.str());
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Pong)
|
||||
{
|
||||
ss << "ping_pong: received pong message: " << msg->str;
|
||||
log(ss.str());
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Error)
|
||||
{
|
||||
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
|
||||
ss << "#retries: " << msg->errorInfo.retries << std::endl;
|
||||
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
|
||||
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
|
||||
log(ss.str());
|
||||
}
|
||||
else
|
||||
{
|
||||
ss << "Invalid ix::WebSocketMessageType";
|
||||
log(ss.str());
|
||||
}
|
||||
});
|
||||
|
||||
_webSocket.start();
|
||||
}
|
||||
|
||||
void WebSocketPingPong::ping(const std::string& text)
|
||||
{
|
||||
if (!_webSocket.ping(text).success)
|
||||
{
|
||||
std::cerr << "Failed to send ping message. Message too long (> 125 bytes) or endpoint "
|
||||
"is disconnected"
|
||||
<< std::endl;
|
||||
}
|
||||
}
|
||||
|
||||
void WebSocketPingPong::send(const std::string& text)
|
||||
{
|
||||
_webSocket.send(text);
|
||||
}
|
||||
|
||||
int ws_ping_pong_main(const std::string& url, const ix::SocketTLSOptions& tlsOptions)
|
||||
{
|
||||
spdlog::info("Type Ctrl-D to exit prompt...");
|
||||
WebSocketPingPong webSocketPingPong(url, tlsOptions);
|
||||
webSocketPingPong.start();
|
||||
|
||||
while (true)
|
||||
{
|
||||
std::string text;
|
||||
std::cout << "> " << std::flush;
|
||||
std::getline(std::cin, text);
|
||||
|
||||
if (!std::cin)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
if (text == "/close")
|
||||
{
|
||||
webSocketPingPong.send(text);
|
||||
}
|
||||
else
|
||||
{
|
||||
webSocketPingPong.ping(text);
|
||||
}
|
||||
}
|
||||
|
||||
std::cout << std::endl;
|
||||
webSocketPingPong.stop();
|
||||
|
||||
return 0;
|
||||
}
|
||||
} // namespace ix
|
@ -1,282 +0,0 @@
|
||||
/*
|
||||
* ws_receiver.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
|
||||
#include <chrono>
|
||||
#include <condition_variable>
|
||||
#include <fstream>
|
||||
#include <ixcrypto/IXBase64.h>
|
||||
#include <ixcrypto/IXHash.h>
|
||||
#include <ixcrypto/IXUuid.h>
|
||||
#include <ixwebsocket/IXSocket.h>
|
||||
#include <ixwebsocket/IXSocketTLSOptions.h>
|
||||
#include <ixwebsocket/IXWebSocket.h>
|
||||
#include <msgpack11/msgpack11.hpp>
|
||||
#include <mutex>
|
||||
#include <spdlog/spdlog.h>
|
||||
#include <sstream>
|
||||
#include <vector>
|
||||
|
||||
using msgpack11::MsgPack;
|
||||
|
||||
namespace ix
|
||||
{
|
||||
class WebSocketReceiver
|
||||
{
|
||||
public:
|
||||
WebSocketReceiver(const std::string& _url,
|
||||
bool enablePerMessageDeflate,
|
||||
int delayMs,
|
||||
const ix::SocketTLSOptions& tlsOptions);
|
||||
|
||||
void subscribe(const std::string& channel);
|
||||
void start();
|
||||
void stop();
|
||||
|
||||
void waitForConnection();
|
||||
void waitForMessage();
|
||||
void handleMessage(const std::string& str);
|
||||
|
||||
private:
|
||||
std::string _url;
|
||||
std::string _id;
|
||||
ix::WebSocket _webSocket;
|
||||
bool _enablePerMessageDeflate;
|
||||
int _delayMs;
|
||||
int _receivedFragmentCounter;
|
||||
|
||||
std::mutex _conditionVariableMutex;
|
||||
std::condition_variable _condition;
|
||||
|
||||
std::string extractFilename(const std::string& path);
|
||||
void handleError(const std::string& errMsg, const std::string& id);
|
||||
void log(const std::string& msg);
|
||||
};
|
||||
|
||||
WebSocketReceiver::WebSocketReceiver(const std::string& url,
|
||||
bool enablePerMessageDeflate,
|
||||
int delayMs,
|
||||
const ix::SocketTLSOptions& tlsOptions)
|
||||
: _url(url)
|
||||
, _enablePerMessageDeflate(enablePerMessageDeflate)
|
||||
, _delayMs(delayMs)
|
||||
, _receivedFragmentCounter(0)
|
||||
{
|
||||
_webSocket.disableAutomaticReconnection();
|
||||
_webSocket.setTLSOptions(tlsOptions);
|
||||
}
|
||||
|
||||
void WebSocketReceiver::stop()
|
||||
{
|
||||
_webSocket.stop();
|
||||
}
|
||||
|
||||
void WebSocketReceiver::log(const std::string& msg)
|
||||
{
|
||||
spdlog::info(msg);
|
||||
}
|
||||
|
||||
void WebSocketReceiver::waitForConnection()
|
||||
{
|
||||
spdlog::info("{}: Connecting...", "ws_receive");
|
||||
|
||||
std::unique_lock<std::mutex> lock(_conditionVariableMutex);
|
||||
_condition.wait(lock);
|
||||
}
|
||||
|
||||
void WebSocketReceiver::waitForMessage()
|
||||
{
|
||||
spdlog::info("{}: Waiting for message...", "ws_receive");
|
||||
|
||||
std::unique_lock<std::mutex> lock(_conditionVariableMutex);
|
||||
_condition.wait(lock);
|
||||
}
|
||||
|
||||
// We should cleanup the file name and full path further to remove .. as well
|
||||
std::string WebSocketReceiver::extractFilename(const std::string& path)
|
||||
{
|
||||
std::string::size_type idx;
|
||||
|
||||
idx = path.rfind('/');
|
||||
if (idx != std::string::npos)
|
||||
{
|
||||
std::string filename = path.substr(idx + 1);
|
||||
return filename;
|
||||
}
|
||||
else
|
||||
{
|
||||
return path;
|
||||
}
|
||||
}
|
||||
|
||||
void WebSocketReceiver::handleError(const std::string& errMsg, const std::string& id)
|
||||
{
|
||||
std::map<MsgPack, MsgPack> pdu;
|
||||
pdu["kind"] = "error";
|
||||
pdu["id"] = id;
|
||||
pdu["message"] = errMsg;
|
||||
|
||||
MsgPack msg(pdu);
|
||||
_webSocket.sendBinary(msg.dump());
|
||||
}
|
||||
|
||||
void WebSocketReceiver::handleMessage(const std::string& str)
|
||||
{
|
||||
spdlog::info("ws_receive: Received message: {}", str.size());
|
||||
|
||||
std::string errMsg;
|
||||
MsgPack data = MsgPack::parse(str, errMsg);
|
||||
if (!errMsg.empty())
|
||||
{
|
||||
handleError("ws_receive: Invalid MsgPack", std::string());
|
||||
return;
|
||||
}
|
||||
|
||||
spdlog::info("id: {}", data["id"].string_value());
|
||||
|
||||
std::vector<uint8_t> content = data["content"].binary_items();
|
||||
spdlog::info("ws_receive: Content size: {}", content.size());
|
||||
|
||||
// Validate checksum
|
||||
uint64_t cksum = ix::djb2Hash(content);
|
||||
auto cksumRef = data["djb2_hash"].string_value();
|
||||
|
||||
spdlog::info("ws_receive: Computed hash: {}", cksum);
|
||||
spdlog::info("ws_receive: Reference hash: {}", cksumRef);
|
||||
|
||||
if (std::to_string(cksum) != cksumRef)
|
||||
{
|
||||
handleError("Hash mismatch.", std::string());
|
||||
return;
|
||||
}
|
||||
|
||||
std::string filename = data["filename"].string_value();
|
||||
filename = extractFilename(filename);
|
||||
|
||||
std::string filenameTmp = filename + ".tmp";
|
||||
|
||||
spdlog::info("ws_receive: Writing to disk: {}", filenameTmp);
|
||||
std::ofstream out(filenameTmp);
|
||||
out.write((char*) &content.front(), content.size());
|
||||
out.close();
|
||||
|
||||
spdlog::info("ws_receive: Renaming {} to {}", filenameTmp, filename);
|
||||
rename(filenameTmp.c_str(), filename.c_str());
|
||||
|
||||
std::map<MsgPack, MsgPack> pdu;
|
||||
pdu["ack"] = true;
|
||||
pdu["id"] = data["id"];
|
||||
pdu["filename"] = data["filename"];
|
||||
|
||||
spdlog::info("Sending ack to sender");
|
||||
MsgPack msg(pdu);
|
||||
_webSocket.sendBinary(msg.dump());
|
||||
}
|
||||
|
||||
void WebSocketReceiver::start()
|
||||
{
|
||||
_webSocket.setUrl(_url);
|
||||
ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(
|
||||
_enablePerMessageDeflate, false, false, 15, 15);
|
||||
_webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions);
|
||||
|
||||
std::stringstream ss;
|
||||
log(std::string("ws_receive: Connecting to url: ") + _url);
|
||||
|
||||
_webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) {
|
||||
std::stringstream ss;
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
_condition.notify_one();
|
||||
|
||||
log("ws_receive: connected");
|
||||
spdlog::info("Uri: {}", msg->openInfo.uri);
|
||||
spdlog::info("Headers:");
|
||||
for (auto it : msg->openInfo.headers)
|
||||
{
|
||||
spdlog::info("{}: {}", it.first, it.second);
|
||||
}
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Close)
|
||||
{
|
||||
ss << "ws_receive: connection closed:";
|
||||
ss << " code " << msg->closeInfo.code;
|
||||
ss << " reason " << msg->closeInfo.reason << std::endl;
|
||||
log(ss.str());
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Message)
|
||||
{
|
||||
ss << "ws_receive: transfered " << msg->wireSize << " bytes";
|
||||
log(ss.str());
|
||||
handleMessage(msg->str);
|
||||
_condition.notify_one();
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Fragment)
|
||||
{
|
||||
ss << "ws_receive: received fragment " << _receivedFragmentCounter++;
|
||||
log(ss.str());
|
||||
|
||||
if (_delayMs > 0)
|
||||
{
|
||||
// Introduce an arbitrary delay, to simulate a slow connection
|
||||
std::chrono::duration<double, std::milli> duration(_delayMs);
|
||||
std::this_thread::sleep_for(duration);
|
||||
}
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Error)
|
||||
{
|
||||
ss << "ws_receive ";
|
||||
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
|
||||
ss << "#retries: " << msg->errorInfo.retries << std::endl;
|
||||
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
|
||||
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
|
||||
log(ss.str());
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Ping)
|
||||
{
|
||||
log("ws_receive: received ping");
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Pong)
|
||||
{
|
||||
log("ws_receive: received pong");
|
||||
}
|
||||
else
|
||||
{
|
||||
ss << "ws_receive: Invalid ix::WebSocketMessageType";
|
||||
log(ss.str());
|
||||
}
|
||||
});
|
||||
|
||||
_webSocket.start();
|
||||
}
|
||||
|
||||
void wsReceive(const std::string& url,
|
||||
bool enablePerMessageDeflate,
|
||||
int delayMs,
|
||||
const ix::SocketTLSOptions& tlsOptions)
|
||||
{
|
||||
WebSocketReceiver webSocketReceiver(url, enablePerMessageDeflate, delayMs, tlsOptions);
|
||||
webSocketReceiver.start();
|
||||
|
||||
webSocketReceiver.waitForConnection();
|
||||
|
||||
webSocketReceiver.waitForMessage();
|
||||
|
||||
std::chrono::duration<double, std::milli> duration(1000);
|
||||
std::this_thread::sleep_for(duration);
|
||||
|
||||
spdlog::info("ws_receive: Done !");
|
||||
webSocketReceiver.stop();
|
||||
}
|
||||
|
||||
int ws_receive_main(const std::string& url,
|
||||
bool enablePerMessageDeflate,
|
||||
int delayMs,
|
||||
const ix::SocketTLSOptions& tlsOptions)
|
||||
{
|
||||
wsReceive(url, enablePerMessageDeflate, delayMs, tlsOptions);
|
||||
return 0;
|
||||
}
|
||||
} // namespace ix
|
@ -1,84 +0,0 @@
|
||||
/*
|
||||
* ws_redis_cli.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
|
||||
#include "linenoise.hpp"
|
||||
#include <iostream>
|
||||
#include <ixredis/IXRedisClient.h>
|
||||
#include <spdlog/spdlog.h>
|
||||
#include <sstream>
|
||||
|
||||
namespace ix
|
||||
{
|
||||
int ws_redis_cli_main(const std::string& hostname, int port, const std::string& password)
|
||||
{
|
||||
RedisClient redisClient;
|
||||
if (!redisClient.connect(hostname, port))
|
||||
{
|
||||
spdlog::info("Cannot connect to redis host");
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (!password.empty())
|
||||
{
|
||||
std::string authResponse;
|
||||
if (!redisClient.auth(password, authResponse))
|
||||
{
|
||||
std::stringstream ss;
|
||||
spdlog::info("Cannot authenticated to redis");
|
||||
return 1;
|
||||
}
|
||||
spdlog::info("Auth response: {}", authResponse);
|
||||
}
|
||||
|
||||
while (true)
|
||||
{
|
||||
// Read line
|
||||
std::string line;
|
||||
std::string prompt;
|
||||
prompt += hostname;
|
||||
prompt += ":";
|
||||
prompt += std::to_string(port);
|
||||
prompt += "> ";
|
||||
auto quit = linenoise::Readline(prompt.c_str(), line);
|
||||
|
||||
if (quit)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
std::stringstream ss(line);
|
||||
std::vector<std::string> args;
|
||||
std::string arg;
|
||||
|
||||
while (ss.good())
|
||||
{
|
||||
ss >> arg;
|
||||
args.push_back(arg);
|
||||
}
|
||||
|
||||
std::string errMsg;
|
||||
auto response = redisClient.send(args, errMsg);
|
||||
if (!errMsg.empty())
|
||||
{
|
||||
spdlog::error("(error) {}", errMsg);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (response.first != RespType::String)
|
||||
{
|
||||
std::cout << "(" << redisClient.getRespTypeDescription(response.first) << ")"
|
||||
<< " ";
|
||||
}
|
||||
|
||||
std::cout << response.second << std::endl;
|
||||
}
|
||||
|
||||
linenoise::AddHistory(line.c_str());
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
} // namespace ix
|
@ -1,51 +0,0 @@
|
||||
/*
|
||||
* ws_redis_publish.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
|
||||
#include <ixredis/IXRedisClient.h>
|
||||
#include <spdlog/spdlog.h>
|
||||
#include <sstream>
|
||||
|
||||
namespace ix
|
||||
{
|
||||
int ws_redis_publish_main(const std::string& hostname,
|
||||
int port,
|
||||
const std::string& password,
|
||||
const std::string& channel,
|
||||
const std::string& message,
|
||||
int count)
|
||||
{
|
||||
RedisClient redisClient;
|
||||
if (!redisClient.connect(hostname, port))
|
||||
{
|
||||
spdlog::info("Cannot connect to redis host");
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (!password.empty())
|
||||
{
|
||||
std::string authResponse;
|
||||
if (!redisClient.auth(password, authResponse))
|
||||
{
|
||||
std::stringstream ss;
|
||||
spdlog::info("Cannot authenticated to redis");
|
||||
return 1;
|
||||
}
|
||||
spdlog::info("Auth response: {}", authResponse);
|
||||
}
|
||||
|
||||
std::string errMsg;
|
||||
for (int i = 0; i < count; i++)
|
||||
{
|
||||
if (!redisClient.publish(channel, message, errMsg))
|
||||
{
|
||||
spdlog::error("Error publishing to channel {} error {}", channel, errMsg);
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
} // namespace ix
|
@ -1,31 +0,0 @@
|
||||
/*
|
||||
* ws_redis_publish.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
|
||||
#include <ixredis/IXRedisServer.h>
|
||||
#include <spdlog/spdlog.h>
|
||||
#include <sstream>
|
||||
|
||||
namespace ix
|
||||
{
|
||||
int ws_redis_server_main(int port, const std::string& hostname)
|
||||
{
|
||||
spdlog::info("Listening on {}:{}", hostname, port);
|
||||
|
||||
ix::RedisServer server(port, hostname);
|
||||
|
||||
auto res = server.listen();
|
||||
if (!res.first)
|
||||
{
|
||||
spdlog::info(res.second);
|
||||
return 1;
|
||||
}
|
||||
|
||||
server.start();
|
||||
server.wait();
|
||||
|
||||
return 0;
|
||||
}
|
||||
} // namespace ix
|
@ -1,80 +0,0 @@
|
||||
/*
|
||||
* ws_redis_subscribe.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <ixredis/IXRedisClient.h>
|
||||
#include <spdlog/spdlog.h>
|
||||
#include <sstream>
|
||||
#include <thread>
|
||||
|
||||
namespace ix
|
||||
{
|
||||
int ws_redis_subscribe_main(const std::string& hostname,
|
||||
int port,
|
||||
const std::string& password,
|
||||
const std::string& channel,
|
||||
bool verbose)
|
||||
{
|
||||
RedisClient redisClient;
|
||||
if (!redisClient.connect(hostname, port))
|
||||
{
|
||||
spdlog::info("Cannot connect to redis host");
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (!password.empty())
|
||||
{
|
||||
std::string authResponse;
|
||||
if (!redisClient.auth(password, authResponse))
|
||||
{
|
||||
std::stringstream ss;
|
||||
spdlog::info("Cannot authenticated to redis");
|
||||
return 1;
|
||||
}
|
||||
spdlog::info("Auth response: {}", authResponse);
|
||||
}
|
||||
|
||||
std::atomic<int> msgPerSeconds(0);
|
||||
std::atomic<int> msgCount(0);
|
||||
|
||||
auto callback = [&msgPerSeconds, &msgCount, verbose](const std::string& message) {
|
||||
if (verbose)
|
||||
{
|
||||
spdlog::info("recived: {}", message);
|
||||
}
|
||||
|
||||
msgPerSeconds++;
|
||||
msgCount++;
|
||||
};
|
||||
|
||||
auto responseCallback = [](const std::string& redisResponse) {
|
||||
spdlog::info("Redis subscribe response: {}", redisResponse);
|
||||
};
|
||||
|
||||
auto timer = [&msgPerSeconds, &msgCount] {
|
||||
while (true)
|
||||
{
|
||||
spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds);
|
||||
|
||||
msgPerSeconds = 0;
|
||||
auto duration = std::chrono::seconds(1);
|
||||
std::this_thread::sleep_for(duration);
|
||||
}
|
||||
};
|
||||
|
||||
std::thread t(timer);
|
||||
|
||||
spdlog::info("Subscribing to {} ...", channel);
|
||||
if (!redisClient.subscribe(channel, responseCallback, callback))
|
||||
{
|
||||
spdlog::info("Error subscribing to channel {}", channel);
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
} // namespace ix
|
311
ws/ws_send.cpp
311
ws/ws_send.cpp
@ -1,311 +0,0 @@
|
||||
/*
|
||||
* ws_send.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
|
||||
#include "IXBench.h"
|
||||
#include <chrono>
|
||||
#include <condition_variable>
|
||||
#include <fstream>
|
||||
#include <ixcrypto/IXBase64.h>
|
||||
#include <ixcrypto/IXHash.h>
|
||||
#include <ixcrypto/IXUuid.h>
|
||||
#include <ixwebsocket/IXSocket.h>
|
||||
#include <ixwebsocket/IXSocketTLSOptions.h>
|
||||
#include <ixwebsocket/IXWebSocket.h>
|
||||
#include <msgpack11/msgpack11.hpp>
|
||||
#include <mutex>
|
||||
#include <spdlog/spdlog.h>
|
||||
#include <sstream>
|
||||
#include <vector>
|
||||
|
||||
using msgpack11::MsgPack;
|
||||
|
||||
namespace ix
|
||||
{
|
||||
class WebSocketSender
|
||||
{
|
||||
public:
|
||||
WebSocketSender(const std::string& _url,
|
||||
bool enablePerMessageDeflate,
|
||||
const ix::SocketTLSOptions& tlsOptions);
|
||||
|
||||
void subscribe(const std::string& channel);
|
||||
void start();
|
||||
void stop();
|
||||
|
||||
void waitForConnection();
|
||||
void waitForAck();
|
||||
|
||||
bool sendMessage(const std::string& filename, bool throttle);
|
||||
|
||||
private:
|
||||
std::string _url;
|
||||
std::string _id;
|
||||
ix::WebSocket _webSocket;
|
||||
bool _enablePerMessageDeflate;
|
||||
|
||||
std::atomic<bool> _connected;
|
||||
|
||||
std::mutex _conditionVariableMutex;
|
||||
std::condition_variable _condition;
|
||||
|
||||
void log(const std::string& msg);
|
||||
};
|
||||
|
||||
WebSocketSender::WebSocketSender(const std::string& url,
|
||||
bool enablePerMessageDeflate,
|
||||
const ix::SocketTLSOptions& tlsOptions)
|
||||
: _url(url)
|
||||
, _enablePerMessageDeflate(enablePerMessageDeflate)
|
||||
, _connected(false)
|
||||
{
|
||||
_webSocket.disableAutomaticReconnection();
|
||||
_webSocket.setTLSOptions(tlsOptions);
|
||||
}
|
||||
|
||||
void WebSocketSender::stop()
|
||||
{
|
||||
_webSocket.stop();
|
||||
}
|
||||
|
||||
void WebSocketSender::log(const std::string& msg)
|
||||
{
|
||||
spdlog::info(msg);
|
||||
}
|
||||
|
||||
void WebSocketSender::waitForConnection()
|
||||
{
|
||||
spdlog::info("{}: Connecting...", "ws_send");
|
||||
|
||||
std::unique_lock<std::mutex> lock(_conditionVariableMutex);
|
||||
_condition.wait(lock);
|
||||
}
|
||||
|
||||
void WebSocketSender::waitForAck()
|
||||
{
|
||||
spdlog::info("{}: Waiting for ack...", "ws_send");
|
||||
|
||||
std::unique_lock<std::mutex> lock(_conditionVariableMutex);
|
||||
_condition.wait(lock);
|
||||
}
|
||||
|
||||
std::vector<uint8_t> load(const std::string& path)
|
||||
{
|
||||
std::vector<uint8_t> memblock;
|
||||
|
||||
std::ifstream file(path);
|
||||
if (!file.is_open()) return memblock;
|
||||
|
||||
file.seekg(0, file.end);
|
||||
std::streamoff size = file.tellg();
|
||||
file.seekg(0, file.beg);
|
||||
|
||||
memblock.resize((size_t) size);
|
||||
file.read((char*) &memblock.front(), static_cast<std::streamsize>(size));
|
||||
|
||||
return memblock;
|
||||
}
|
||||
|
||||
void WebSocketSender::start()
|
||||
{
|
||||
_webSocket.setUrl(_url);
|
||||
|
||||
ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(
|
||||
_enablePerMessageDeflate, false, false, 15, 15);
|
||||
_webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions);
|
||||
|
||||
std::stringstream ss;
|
||||
log(std::string("ws_send: Connecting to url: ") + _url);
|
||||
|
||||
_webSocket.setOnMessageCallback([this](const WebSocketMessagePtr& msg) {
|
||||
std::stringstream ss;
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
_connected = true;
|
||||
|
||||
_condition.notify_one();
|
||||
|
||||
log("ws_send: connected");
|
||||
spdlog::info("Uri: {}", msg->openInfo.uri);
|
||||
spdlog::info("Headers:");
|
||||
for (auto it : msg->openInfo.headers)
|
||||
{
|
||||
spdlog::info("{}: {}", it.first, it.second);
|
||||
}
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Close)
|
||||
{
|
||||
_connected = false;
|
||||
|
||||
ss << "ws_send: connection closed:";
|
||||
ss << " code " << msg->closeInfo.code;
|
||||
ss << " reason " << msg->closeInfo.reason << std::endl;
|
||||
log(ss.str());
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Message)
|
||||
{
|
||||
_condition.notify_one();
|
||||
|
||||
ss << "ws_send: received message (" << msg->wireSize << " bytes)";
|
||||
log(ss.str());
|
||||
|
||||
std::string errMsg;
|
||||
MsgPack data = MsgPack::parse(msg->str, errMsg);
|
||||
if (!errMsg.empty())
|
||||
{
|
||||
spdlog::info("Invalid MsgPack response");
|
||||
return;
|
||||
}
|
||||
|
||||
std::string id = data["id"].string_value();
|
||||
if (_id != id)
|
||||
{
|
||||
spdlog::info("Invalid id");
|
||||
}
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Error)
|
||||
{
|
||||
ss << "ws_send ";
|
||||
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
|
||||
ss << "#retries: " << msg->errorInfo.retries << std::endl;
|
||||
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
|
||||
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
|
||||
log(ss.str());
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Ping)
|
||||
{
|
||||
spdlog::info("ws_send: received ping");
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Pong)
|
||||
{
|
||||
spdlog::info("ws_send: received pong");
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Fragment)
|
||||
{
|
||||
spdlog::info("ws_send: received fragment");
|
||||
}
|
||||
else
|
||||
{
|
||||
ss << "ws_send: Invalid ix::WebSocketMessageType";
|
||||
log(ss.str());
|
||||
}
|
||||
});
|
||||
|
||||
_webSocket.start();
|
||||
}
|
||||
|
||||
bool WebSocketSender::sendMessage(const std::string& filename, bool throttle)
|
||||
{
|
||||
std::vector<uint8_t> content;
|
||||
{
|
||||
Bench bench("ws_send: load file from disk");
|
||||
content = load(filename);
|
||||
}
|
||||
|
||||
_id = uuid4();
|
||||
|
||||
std::map<MsgPack, MsgPack> pdu;
|
||||
pdu["kind"] = "send";
|
||||
pdu["id"] = _id;
|
||||
pdu["content"] = content;
|
||||
auto hash = djb2Hash(content);
|
||||
pdu["djb2_hash"] = std::to_string(hash);
|
||||
pdu["filename"] = filename;
|
||||
|
||||
MsgPack msg(pdu);
|
||||
|
||||
auto serializedMsg = msg.dump();
|
||||
spdlog::info("ws_send: sending {} bytes", serializedMsg.size());
|
||||
|
||||
Bench bench("ws_send: Sending file through websocket");
|
||||
auto result =
|
||||
_webSocket.sendBinary(serializedMsg, [this, throttle](int current, int total) -> bool {
|
||||
spdlog::info("ws_send: Step {} out of {}", current + 1, total);
|
||||
|
||||
if (throttle)
|
||||
{
|
||||
std::chrono::duration<double, std::milli> duration(10);
|
||||
std::this_thread::sleep_for(duration);
|
||||
}
|
||||
|
||||
return _connected;
|
||||
});
|
||||
|
||||
if (!result.success)
|
||||
{
|
||||
spdlog::error("ws_send: Error sending file.");
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!_connected)
|
||||
{
|
||||
spdlog::error("ws_send: Got disconnected from the server");
|
||||
return false;
|
||||
}
|
||||
|
||||
spdlog::info("ws_send: sent {} bytes", serializedMsg.size());
|
||||
|
||||
do
|
||||
{
|
||||
size_t bufferedAmount = _webSocket.bufferedAmount();
|
||||
spdlog::info("ws_send: {} bytes left to be sent", bufferedAmount);
|
||||
|
||||
std::chrono::duration<double, std::milli> duration(500);
|
||||
std::this_thread::sleep_for(duration);
|
||||
} while (_webSocket.bufferedAmount() != 0 && _connected);
|
||||
|
||||
if (_connected)
|
||||
{
|
||||
bench.report();
|
||||
auto duration = bench.getDuration();
|
||||
auto transferRate = 1000 * content.size() / duration;
|
||||
transferRate /= (1024 * 1024);
|
||||
spdlog::info("ws_send: Send transfer rate: {} MB/s", transferRate);
|
||||
}
|
||||
else
|
||||
{
|
||||
spdlog::error("ws_send: Got disconnected from the server");
|
||||
}
|
||||
|
||||
return _connected;
|
||||
}
|
||||
|
||||
void wsSend(const std::string& url,
|
||||
const std::string& path,
|
||||
bool enablePerMessageDeflate,
|
||||
bool throttle,
|
||||
const ix::SocketTLSOptions& tlsOptions)
|
||||
{
|
||||
WebSocketSender webSocketSender(url, enablePerMessageDeflate, tlsOptions);
|
||||
webSocketSender.start();
|
||||
|
||||
webSocketSender.waitForConnection();
|
||||
|
||||
spdlog::info("ws_send: Sending...");
|
||||
if (webSocketSender.sendMessage(path, throttle))
|
||||
{
|
||||
webSocketSender.waitForAck();
|
||||
spdlog::info("ws_send: Done !");
|
||||
}
|
||||
else
|
||||
{
|
||||
spdlog::error("ws_send: Error sending file.");
|
||||
}
|
||||
|
||||
webSocketSender.stop();
|
||||
}
|
||||
|
||||
int ws_send_main(const std::string& url,
|
||||
const std::string& path,
|
||||
bool disablePerMessageDeflate,
|
||||
const ix::SocketTLSOptions& tlsOptions)
|
||||
{
|
||||
bool throttle = false;
|
||||
bool enablePerMessageDeflate = !disablePerMessageDeflate;
|
||||
|
||||
wsSend(url, path, enablePerMessageDeflate, throttle, tlsOptions);
|
||||
return 0;
|
||||
}
|
||||
} // namespace ix
|
@ -1,112 +0,0 @@
|
||||
/*
|
||||
* ws_sentry_minidump_upload.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
|
||||
#include <fstream>
|
||||
#include <ixsentry/IXSentryClient.h>
|
||||
#include <spdlog/spdlog.h>
|
||||
#include <sstream>
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
// Assume the file exists
|
||||
std::string readBytes(const std::string& path)
|
||||
{
|
||||
std::vector<uint8_t> memblock;
|
||||
std::ifstream file(path);
|
||||
|
||||
file.seekg(0, file.end);
|
||||
std::streamoff size = file.tellg();
|
||||
file.seekg(0, file.beg);
|
||||
|
||||
memblock.resize(size);
|
||||
|
||||
file.read((char*) &memblock.front(), static_cast<std::streamsize>(size));
|
||||
|
||||
std::string bytes(memblock.begin(), memblock.end());
|
||||
return bytes;
|
||||
}
|
||||
} // namespace
|
||||
|
||||
namespace ix
|
||||
{
|
||||
int ws_sentry_minidump_upload(const std::string& metadataPath,
|
||||
const std::string& minidump,
|
||||
const std::string& project,
|
||||
const std::string& key,
|
||||
bool verbose)
|
||||
{
|
||||
SentryClient sentryClient((std::string()));
|
||||
|
||||
// Read minidump file from disk
|
||||
std::string minidumpBytes = readBytes(minidump);
|
||||
|
||||
// Read json data
|
||||
std::string sentryMetadata = readBytes(metadataPath);
|
||||
|
||||
std::atomic<bool> done(false);
|
||||
|
||||
sentryClient.uploadMinidump(
|
||||
sentryMetadata,
|
||||
minidumpBytes,
|
||||
project,
|
||||
key,
|
||||
verbose,
|
||||
[verbose, &done](const HttpResponsePtr& response) {
|
||||
if (verbose)
|
||||
{
|
||||
for (auto it : response->headers)
|
||||
{
|
||||
spdlog::info("{}: {}", it.first, it.second);
|
||||
}
|
||||
|
||||
spdlog::info("Upload size: {}", response->uploadSize);
|
||||
spdlog::info("Download size: {}", response->downloadSize);
|
||||
|
||||
spdlog::info("Status: {}", response->statusCode);
|
||||
if (response->errorCode != HttpErrorCode::Ok)
|
||||
{
|
||||
spdlog::info("error message: {}", response->errorMsg);
|
||||
}
|
||||
|
||||
if (response->headers["Content-Type"] != "application/octet-stream")
|
||||
{
|
||||
spdlog::info("payload: {}", response->payload);
|
||||
}
|
||||
}
|
||||
|
||||
if (response->statusCode != 200)
|
||||
{
|
||||
spdlog::error("Error sending data to sentry: {}", response->statusCode);
|
||||
spdlog::error("Status: {}", response->statusCode);
|
||||
spdlog::error("Response: {}", response->payload);
|
||||
}
|
||||
else
|
||||
{
|
||||
spdlog::info("Event sent to sentry");
|
||||
}
|
||||
|
||||
done = true;
|
||||
});
|
||||
|
||||
int i = 0;
|
||||
|
||||
while (!done)
|
||||
{
|
||||
std::chrono::duration<double, std::milli> duration(10);
|
||||
std::this_thread::sleep_for(duration);
|
||||
|
||||
if (i++ > 5000) break; // wait 5 seconds max
|
||||
}
|
||||
|
||||
if (!done)
|
||||
{
|
||||
spdlog::error("Error: timing out trying to sent a crash to sentry");
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
} // namespace ix
|
@ -1,88 +0,0 @@
|
||||
/*
|
||||
* snake_run.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
|
||||
#include <fstream>
|
||||
#include <ixsnake/IXSnakeServer.h>
|
||||
#include <spdlog/spdlog.h>
|
||||
#include <sstream>
|
||||
|
||||
namespace
|
||||
{
|
||||
std::vector<uint8_t> load(const std::string& path)
|
||||
{
|
||||
std::vector<uint8_t> memblock;
|
||||
|
||||
std::ifstream file(path);
|
||||
if (!file.is_open()) return memblock;
|
||||
|
||||
file.seekg(0, file.end);
|
||||
std::streamoff size = file.tellg();
|
||||
file.seekg(0, file.beg);
|
||||
|
||||
memblock.resize((size_t) size);
|
||||
file.read((char*) &memblock.front(), static_cast<std::streamsize>(size));
|
||||
|
||||
return memblock;
|
||||
}
|
||||
|
||||
std::string readAsString(const std::string& path)
|
||||
{
|
||||
auto vec = load(path);
|
||||
return std::string(vec.begin(), vec.end());
|
||||
}
|
||||
} // namespace
|
||||
|
||||
namespace ix
|
||||
{
|
||||
int ws_snake_main(int port,
|
||||
const std::string& hostname,
|
||||
const std::string& redisHosts,
|
||||
int redisPort,
|
||||
const std::string& redisPassword,
|
||||
bool verbose,
|
||||
const std::string& appsConfigPath,
|
||||
const SocketTLSOptions& socketTLSOptions,
|
||||
bool disablePong,
|
||||
const std::string& republishChannel)
|
||||
{
|
||||
snake::AppConfig appConfig;
|
||||
appConfig.port = port;
|
||||
appConfig.hostname = hostname;
|
||||
appConfig.verbose = verbose;
|
||||
appConfig.redisPort = redisPort;
|
||||
appConfig.redisPassword = redisPassword;
|
||||
appConfig.socketTLSOptions = socketTLSOptions;
|
||||
appConfig.disablePong = disablePong;
|
||||
appConfig.republishChannel = republishChannel;
|
||||
|
||||
// Parse config file
|
||||
auto str = readAsString(appsConfigPath);
|
||||
if (str.empty())
|
||||
{
|
||||
spdlog::error("Cannot read content of {}", appsConfigPath);
|
||||
return 1;
|
||||
}
|
||||
|
||||
spdlog::error(str);
|
||||
auto apps = nlohmann::json::parse(str);
|
||||
appConfig.apps = apps["apps"];
|
||||
|
||||
std::string token;
|
||||
std::stringstream tokenStream(redisHosts);
|
||||
while (std::getline(tokenStream, token, ';'))
|
||||
{
|
||||
appConfig.redisHosts.push_back(token);
|
||||
}
|
||||
|
||||
// Display config on the terminal for debugging
|
||||
dumpConfig(appConfig);
|
||||
|
||||
snake::SnakeServer snakeServer(appConfig);
|
||||
snakeServer.runForever();
|
||||
|
||||
return 0; // should never reach this
|
||||
}
|
||||
} // namespace ix
|
@ -1,135 +0,0 @@
|
||||
/*
|
||||
* ws_transfer.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
|
||||
#include <ixwebsocket/IXWebSocketServer.h>
|
||||
#include <spdlog/spdlog.h>
|
||||
#include <sstream>
|
||||
|
||||
namespace ix
|
||||
{
|
||||
int ws_transfer_main(int port,
|
||||
const std::string& hostname,
|
||||
const ix::SocketTLSOptions& tlsOptions)
|
||||
{
|
||||
spdlog::info("Listening on {}:{}", hostname, port);
|
||||
|
||||
ix::WebSocketServer server(port, hostname);
|
||||
server.setTLSOptions(tlsOptions);
|
||||
|
||||
server.setOnClientMessageCallback(
|
||||
[&server](std::shared_ptr<ConnectionState> connectionState,
|
||||
ConnectionInfo& connectionInfo,
|
||||
WebSocket& webSocket,
|
||||
const WebSocketMessagePtr& msg) {
|
||||
auto remoteIp = connectionInfo.remoteIp;
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
spdlog::info("ws_transfer: New connection");
|
||||
spdlog::info("remote ip: {}", remoteIp);
|
||||
spdlog::info("id: {}", connectionState->getId());
|
||||
spdlog::info("Uri: {}", msg->openInfo.uri);
|
||||
spdlog::info("Headers:");
|
||||
for (auto it : msg->openInfo.headers)
|
||||
{
|
||||
spdlog::info("{}: {}", it.first, it.second);
|
||||
}
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Close)
|
||||
{
|
||||
spdlog::info("ws_transfer: Closed connection: client id {} code {} reason {}",
|
||||
connectionState->getId(),
|
||||
msg->closeInfo.code,
|
||||
msg->closeInfo.reason);
|
||||
auto remaining = server.getClients().size() - 1;
|
||||
spdlog::info("ws_transfer: {} remaining clients", remaining);
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Error)
|
||||
{
|
||||
std::stringstream ss;
|
||||
ss << "ws_transfer: Connection error: " << msg->errorInfo.reason << std::endl;
|
||||
ss << "#retries: " << msg->errorInfo.retries << std::endl;
|
||||
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
|
||||
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
|
||||
spdlog::info(ss.str());
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Fragment)
|
||||
{
|
||||
spdlog::info("ws_transfer: Received message fragment ");
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Message)
|
||||
{
|
||||
spdlog::info("ws_transfer: Received {} bytes", msg->wireSize);
|
||||
size_t receivers = 0;
|
||||
for (auto&& client : server.getClients())
|
||||
{
|
||||
if (client.get() != &webSocket)
|
||||
{
|
||||
auto readyState = client->getReadyState();
|
||||
auto id = connectionState->getId();
|
||||
|
||||
if (readyState == ReadyState::Open)
|
||||
{
|
||||
++receivers;
|
||||
client->send(
|
||||
msg->str, msg->binary, [&id](int current, int total) -> bool {
|
||||
spdlog::info("{}: [client {}]: Step {} out of {}",
|
||||
"ws_transfer",
|
||||
id,
|
||||
current,
|
||||
total);
|
||||
return true;
|
||||
});
|
||||
do
|
||||
{
|
||||
size_t bufferedAmount = client->bufferedAmount();
|
||||
|
||||
spdlog::info("{}: [client {}]: {} bytes left to send",
|
||||
"ws_transfer",
|
||||
id,
|
||||
bufferedAmount);
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||
|
||||
} while (client->bufferedAmount() != 0 &&
|
||||
client->getReadyState() == ReadyState::Open);
|
||||
}
|
||||
else
|
||||
{
|
||||
std::string readyStateString =
|
||||
readyState == ReadyState::Connecting
|
||||
? "Connecting"
|
||||
: readyState == ReadyState::Closing ? "Closing" : "Closed";
|
||||
size_t bufferedAmount = client->bufferedAmount();
|
||||
|
||||
spdlog::info(
|
||||
"{}: [client {}]: has readystate {} bytes left to be sent {}",
|
||||
"ws_transfer",
|
||||
id,
|
||||
readyStateString,
|
||||
bufferedAmount);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!receivers)
|
||||
{
|
||||
spdlog::info("ws_transfer: no remaining receivers");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
auto res = server.listen();
|
||||
if (!res.first)
|
||||
{
|
||||
spdlog::info(res.second);
|
||||
return 1;
|
||||
}
|
||||
|
||||
server.start();
|
||||
server.wait();
|
||||
|
||||
return 0;
|
||||
}
|
||||
} // namespace ix
|
Reference in New Issue
Block a user