Compare commits

...

53 Commits

Author SHA1 Message Date
b8265bf7f2 (ws) -q option imply info log level, not warning log level 2020-08-11 15:44:06 -07:00
e7c4f0b171 add documentation for the websocket send callback and the send return type (fix #239) 2020-08-11 11:24:00 -07:00
12f36b61ff (websocket server) Handle programmer error when the server callback is not registered properly (fix #227) 2020-08-06 04:40:32 -07:00
b15c4189f5 add csharp/dotnet devnull client to measure througput with different runtimes 2020-08-05 13:59:26 -07:00
74d3278258 add python test file to benchmark how many messages can be received per second 2020-08-04 10:53:35 -07:00
831152b906 add a devnull like sample code using libwebsockets C library, to see how many messages per second a client library can receive (answer is about the same as IXWebSocket) 2020-08-02 19:26:19 -07:00
7c81a98632 Add a node.js benchmarking test program, to see how fast node can receive messages. 2020-08-02 14:21:11 -07:00
6e47c62c06 (ws) Add a new ws sub-command, push_server. This command runs a server which sends many messages in a loop to a websocket client. We can receive above 200,000 messages per second (cf #235). 2020-08-02 12:41:34 -07:00
bcae7f326d (ws) Add a new ws sub-command, echo_client. This command send a message to an echo server, and send back to a server whatever message it does receive. When connecting to a local ws echo_server, on my MacBook Pro 2015 I can send/receive around 30,000 messages per second. (cf #235) 2020-08-02 12:09:13 -07:00
d719c41e31 (ws) ws echo_server. Add a -q option to only enable warning and error log levels. This is useful for bench-marking so that we do not print a lot of things on the console. (cf #235) 2020-08-02 11:53:21 -07:00
6f0307fb35 (build) make using zlib optional, with the caveat that some http and websocket features are not available when zlib is absent 2020-07-31 22:54:57 -07:00
2e3d625c1e (websocket client) onProgressCallback not called for short messages on a websocket (fix #233) 2020-07-29 17:47:33 -07:00
029289413c ws test shell script / add option so tune how large sent file will be 2020-07-29 17:46:37 -07:00
4d51098c86 (websocket client) heartbeat is not sent at the requested frequency (fix #232) 2020-07-29 11:24:42 -07:00
c2b05af022 can compile on macOS against jsoncpp installed from homebrew 2020-07-28 22:00:29 -07:00
e85f975ab0 compiler warning fixes 2020-07-28 21:46:26 -07:00
dc77d62a5d (ixcobra) CobraConnection: unsubscribe from all subscriptions when disconnecting 2020-07-28 10:32:18 -07:00
4f41f209a2 (socket utility) move ix::getFreePort to ixwebsocket library 2020-07-27 18:17:13 -07:00
5940e53d77 enable cobra tests which were disabled 2020-07-27 17:39:53 -07:00
22dffd5b7e WebSocket::close is re-entrant 2020-07-27 17:38:33 -07:00
af2f31045d snake server / join subscription background thread in the ConnectionState destructor + attach cobra message subscription id to the connection state instead of having it be a local reference that gets unbound 2020-07-27 17:35:03 -07:00
5daa59f9f3 minor makefile tweaks 2020-07-27 17:19:05 -07:00
2ea9d06a93 fix typo in unittest string description: ununexpected -> unsubscribed 2020-07-27 17:16:53 -07:00
847fc142d1 (ixwebsocket server) change legacy api with 2 nested callbacks, so that the first api takes a weak_ptr<WebSocket> as its first argument 2020-07-25 11:42:07 -07:00
0388459bd0 (ixwebsocket) add WebSocketProxyServer, from ws. Still need to make the interface better. 2020-07-25 11:26:06 -07:00
9a47ec1217 (ixsnake) uses an std::thread to handle redis subscriptions (2 unittest still failing) 2020-07-24 18:12:07 -07:00
45a40c8640 new Dockerfile to run locally the test on an Ubuntu 20.04 system 2020-07-24 14:46:09 -07:00
e34f1c30d6 (ws) port broadcast_server sub-command to the new server API 2020-07-24 14:35:07 -07:00
c14a4c0e3e formatting 2020-07-24 13:04:14 -07:00
b146e93a3a (unittest) port most unittests to the new server API 2020-07-24 12:49:36 -07:00
9957ec9724 (ws) port ws snake to the new server API 2020-07-24 12:33:17 -07:00
78a42f61bd add tool to ease making commits 2020-07-24 11:53:09 -07:00
e78019dad6 (ws) port ws transfer to the new server API 2020-07-24 11:52:16 -07:00
0f026c5da2 (websocket client) reset WebSocketTransport onClose callback in the WebSocket destructor 2020-07-24 10:03:29 -07:00
c26a2d5d39 (websocket server) reset client websocket callback when the connection is closed 2020-07-24 09:41:02 -07:00
2798886c0b (websocket server) add a new simpler API to handle client connections / that API does not trigger a memory leak while the previous one did 2020-07-23 19:29:41 -07:00
ffde283a4b (build) merge platform specific files which were used to have different implementations for setting a thread name into a single file, to make it easier to include every source files and build the ixwebsocket library (fix #226) 2020-07-17 11:58:06 -07:00
f7031d0d3e set thread name in only one file 2020-07-17 11:43:50 -07:00
595e6c57df IXSelectInterruptPipe.h included in cmake on windows but compiled out 2020-07-17 11:33:02 -07:00
87709c201e (socket server) bump default max connection count from 32 to 128 2020-07-10 17:11:11 -07:00
e70d83ace1 (snake) implement super simple stream sql expression support in snake server 2020-07-10 16:10:59 -07:00
ca829a3a98 implement very very simple stream sql support 2020-07-10 16:07:51 -07:00
26a1e63626 snake: stream sql mock + add republished channel option 2020-07-10 15:06:55 -07:00
c98959b895 comment out unittest which cannot be activated yet 2020-07-09 10:34:52 -07:00
baf18648e9 Added test for websocket leak (#225)
* Added test for websocket leak

* Fixed test
2020-07-09 10:19:44 -07:00
b21306376b uwp build fix + more ivp6 support 2020-07-08 12:38:55 -07:00
fbd17685a1 (socket+websocket+http+redis+snake servers) expose the remote ip and remote port when a new connection is made (see #222) / only ipv4 is handled 2020-07-08 12:10:35 -07:00
3a673575dd clang format 2020-07-08 10:39:46 -07:00
d5e51840ab use const iterators 2020-07-08 10:34:14 -07:00
543c2086b2 more templates in WebSocketTransport 2020-07-07 21:26:42 -07:00
95eab59c08 WebSocketPerMessageDeflateCompressor can work with vector or std::string 2020-07-07 21:26:04 -07:00
e9e768a288 better unittest for IXWebSocketPerMessageDeflateCompressor 2020-07-07 21:15:34 -07:00
e2180a1f31 add unittest for IXWebSocketPerMessageDeflateCompressor 2020-07-07 20:56:38 -07:00
93 changed files with 2437 additions and 800 deletions

View File

@ -10,12 +10,10 @@ jobs:
steps: steps:
- uses: actions/checkout@v1 - uses: actions/checkout@v1
- uses: seanmiddleditch/gha-setup-vsdevenv@master - uses: seanmiddleditch/gha-setup-vsdevenv@master
- run: |
vcpkg install zlib:x64-uwp
- run: | - run: |
mkdir build mkdir build
cd build cd build
cmake -DCMAKE_TOOLCHAIN_FILE=c:/vcpkg/scripts/buildsystems/vcpkg.cmake -DCMAKE_SYSTEM_NAME=WindowsStore -DCMAKE_SYSTEM_VERSION="10.0" -DCMAKE_CXX_COMPILER=cl.exe -DUSE_TEST=1 .. cmake -DCMAKE_TOOLCHAIN_FILE=c:/vcpkg/scripts/buildsystems/vcpkg.cmake -DCMAKE_SYSTEM_NAME=WindowsStore -DCMAKE_SYSTEM_VERSION="10.0" -DCMAKE_CXX_COMPILER=cl.exe -DUSE_TEST=1 -DUSE_ZLIB=0 ..
- run: cmake --build build - run: cmake --build build
# #

View File

@ -10,10 +10,11 @@ jobs:
steps: steps:
- uses: actions/checkout@v1 - uses: actions/checkout@v1
- uses: seanmiddleditch/gha-setup-vsdevenv@master - uses: seanmiddleditch/gha-setup-vsdevenv@master
- run: |
vcpkg install zlib:x64-windows
- run: | - run: |
mkdir build mkdir build
cd build cd build
cmake -DCMAKE_TOOLCHAIN_FILE=c:/vcpkg/scripts/buildsystems/vcpkg.cmake -DCMAKE_CXX_COMPILER=cl.exe -DUSE_WS=1 -DUSE_TEST=1 .. cmake -DCMAKE_CXX_COMPILER=cl.exe -DUSE_WS=1 -DUSE_TEST=1 -DUSE_ZLIB=0 ..
- run: cmake --build build - run: cmake --build build
#- run: ../build/test/ixwebsocket_unittest.exe
# working-directory: test

View File

@ -30,12 +30,15 @@ set( IXWEBSOCKET_SOURCES
ixwebsocket/IXConnectionState.cpp ixwebsocket/IXConnectionState.cpp
ixwebsocket/IXDNSLookup.cpp ixwebsocket/IXDNSLookup.cpp
ixwebsocket/IXExponentialBackoff.cpp ixwebsocket/IXExponentialBackoff.cpp
ixwebsocket/IXGetFreePort.cpp
ixwebsocket/IXHttp.cpp ixwebsocket/IXHttp.cpp
ixwebsocket/IXHttpClient.cpp ixwebsocket/IXHttpClient.cpp
ixwebsocket/IXHttpServer.cpp ixwebsocket/IXHttpServer.cpp
ixwebsocket/IXNetSystem.cpp ixwebsocket/IXNetSystem.cpp
ixwebsocket/IXSelectInterrupt.cpp ixwebsocket/IXSelectInterrupt.cpp
ixwebsocket/IXSelectInterruptFactory.cpp ixwebsocket/IXSelectInterruptFactory.cpp
ixwebsocket/IXSelectInterruptPipe.cpp
ixwebsocket/IXSetThreadName.cpp
ixwebsocket/IXSocket.cpp ixwebsocket/IXSocket.cpp
ixwebsocket/IXSocketConnect.cpp ixwebsocket/IXSocketConnect.cpp
ixwebsocket/IXSocketFactory.cpp ixwebsocket/IXSocketFactory.cpp
@ -51,6 +54,7 @@ set( IXWEBSOCKET_SOURCES
ixwebsocket/IXWebSocketPerMessageDeflate.cpp ixwebsocket/IXWebSocketPerMessageDeflate.cpp
ixwebsocket/IXWebSocketPerMessageDeflateCodec.cpp ixwebsocket/IXWebSocketPerMessageDeflateCodec.cpp
ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp
ixwebsocket/IXWebSocketProxyServer.cpp
ixwebsocket/IXWebSocketServer.cpp ixwebsocket/IXWebSocketServer.cpp
ixwebsocket/IXWebSocketTransport.cpp ixwebsocket/IXWebSocketTransport.cpp
) )
@ -58,9 +62,11 @@ set( IXWEBSOCKET_SOURCES
set( IXWEBSOCKET_HEADERS set( IXWEBSOCKET_HEADERS
ixwebsocket/IXBench.h ixwebsocket/IXBench.h
ixwebsocket/IXCancellationRequest.h ixwebsocket/IXCancellationRequest.h
ixwebsocket/IXConnectionInfo.h
ixwebsocket/IXConnectionState.h ixwebsocket/IXConnectionState.h
ixwebsocket/IXDNSLookup.h ixwebsocket/IXDNSLookup.h
ixwebsocket/IXExponentialBackoff.h ixwebsocket/IXExponentialBackoff.h
ixwebsocket/IXGetFreePort.h
ixwebsocket/IXHttp.h ixwebsocket/IXHttp.h
ixwebsocket/IXHttpClient.h ixwebsocket/IXHttpClient.h
ixwebsocket/IXHttpServer.h ixwebsocket/IXHttpServer.h
@ -68,6 +74,7 @@ set( IXWEBSOCKET_HEADERS
ixwebsocket/IXProgressCallback.h ixwebsocket/IXProgressCallback.h
ixwebsocket/IXSelectInterrupt.h ixwebsocket/IXSelectInterrupt.h
ixwebsocket/IXSelectInterruptFactory.h ixwebsocket/IXSelectInterruptFactory.h
ixwebsocket/IXSelectInterruptPipe.h
ixwebsocket/IXSetThreadName.h ixwebsocket/IXSetThreadName.h
ixwebsocket/IXSocket.h ixwebsocket/IXSocket.h
ixwebsocket/IXSocketConnect.h ixwebsocket/IXSocketConnect.h
@ -92,29 +99,13 @@ set( IXWEBSOCKET_HEADERS
ixwebsocket/IXWebSocketPerMessageDeflate.h ixwebsocket/IXWebSocketPerMessageDeflate.h
ixwebsocket/IXWebSocketPerMessageDeflateCodec.h ixwebsocket/IXWebSocketPerMessageDeflateCodec.h
ixwebsocket/IXWebSocketPerMessageDeflateOptions.h ixwebsocket/IXWebSocketPerMessageDeflateOptions.h
ixwebsocket/IXWebSocketProxyServer.h
ixwebsocket/IXWebSocketSendInfo.h ixwebsocket/IXWebSocketSendInfo.h
ixwebsocket/IXWebSocketServer.h ixwebsocket/IXWebSocketServer.h
ixwebsocket/IXWebSocketTransport.h ixwebsocket/IXWebSocketTransport.h
ixwebsocket/IXWebSocketVersion.h ixwebsocket/IXWebSocketVersion.h
) )
if (UNIX)
# Linux, Mac, iOS, Android
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/IXSelectInterruptPipe.cpp )
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/IXSelectInterruptPipe.h )
endif()
# Platform specific code
if (APPLE)
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/apple/IXSetThreadName_apple.cpp)
elseif (WIN32)
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/windows/IXSetThreadName_windows.cpp)
elseif (${CMAKE_SYSTEM_NAME} MATCHES "FreeBSD")
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/freebsd/IXSetThreadName_freebsd.cpp)
else()
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/linux/IXSetThreadName_linux.cpp)
endif()
option(USE_TLS "Enable TLS support" FALSE) option(USE_TLS "Enable TLS support" FALSE)
if (USE_TLS) if (USE_TLS)
@ -199,10 +190,16 @@ if (USE_TLS)
endif() endif()
endif() endif()
# Use ZLIB_ROOT CMake variable if you need to use your own zlib option(USE_ZLIB "Enable zlib support" TRUE)
find_package(ZLIB REQUIRED)
include_directories(${ZLIB_INCLUDE_DIRS}) if (USE_ZLIB)
target_link_libraries(ixwebsocket ${ZLIB_LIBRARIES}) # Use ZLIB_ROOT CMake variable if you need to use your own zlib
find_package(ZLIB REQUIRED)
include_directories(${ZLIB_INCLUDE_DIRS})
target_link_libraries(ixwebsocket ${ZLIB_LIBRARIES})
target_compile_definitions(ixwebsocket PUBLIC IXWEBSOCKET_USE_ZLIB)
endif()
if (WIN32) if (WIN32)
target_link_libraries(ixwebsocket wsock32 ws2_32 shlwapi) target_link_libraries(ixwebsocket wsock32 ws2_32 shlwapi)

View File

@ -0,0 +1,23 @@
# Build time
FROM ubuntu:groovy as build
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install g++ libssl-dev libz-dev make python ninja-build
RUN apt-get -y install cmake
RUN apt-get -y install gdb
COPY . /opt
WORKDIR /opt
#
# To use the container interactively for debugging/building
# 1. Build with
# CMD ["ls"]
# 2. Run with
# docker run --entrypoint sh -it docker-game-eng-dev.addsrv.com/ws:9.10.6
#
RUN ["make", "test"]
# CMD ["ls"]

View File

@ -1,6 +1,99 @@
# Changelog # Changelog
All changes to this project will be documented in this file. All changes to this project will be documented in this file.
## [10.1.7] - 2020-08-11
(ws) -q option imply info log level, not warning log level
## [10.1.6] - 2020-08-06
(websocket server) Handle programmer error when the server callback is not registered properly (fix #227)
## [10.1.5] - 2020-08-02
(ws) Add a new ws sub-command, push_server. This command runs a server which sends many messages in a loop to a websocket client. We can receive above 200,000 messages per second (cf #235).
## [10.1.4] - 2020-08-02
(ws) Add a new ws sub-command, echo_client. This command sends a message to an echo server, and send back to a server whatever message it does receive. When connecting to a local ws echo_server, on my MacBook Pro 2015 I can send/receive around 30,000 messages per second. (cf #235)
## [10.1.3] - 2020-08-02
(ws) ws echo_server. Add a -q option to only enable warning and error log levels. This is useful for bench-marking so that we do not print a lot of things on the console. (cf #235)
## [10.1.2] - 2020-07-31
(build) make using zlib optional, with the caveat that some http and websocket features are not available when zlib is absent
## [10.1.1] - 2020-07-29
(websocket client) onProgressCallback not called for short messages on a websocket (fix #233)
## [10.1.0] - 2020-07-29
(websocket client) heartbeat is not sent at the requested frequency (fix #232)
## [10.0.3] - 2020-07-28
compiler warning fixes
## [10.0.2] - 2020-07-28
(ixcobra) CobraConnection: unsubscribe from all subscriptions when disconnecting
## [10.0.1] - 2020-07-27
(socket utility) move ix::getFreePort to ixwebsocket library
## [10.0.0] - 2020-07-25
(ixwebsocket server) change legacy api with 2 nested callbacks, so that the first api takes a weak_ptr<WebSocket> as its first argument
## [9.10.7] - 2020-07-25
(ixwebsocket) add WebSocketProxyServer, from ws. Still need to make the interface better.
## [9.10.6] - 2020-07-24
(ws) port broadcast_server sub-command to the new server API
## [9.10.5] - 2020-07-24
(unittest) port most unittests to the new server API
## [9.10.3] - 2020-07-24
(ws) port ws transfer to the new server API
## [9.10.2] - 2020-07-24
(websocket client) reset WebSocketTransport onClose callback in the WebSocket destructor
## [9.10.1] - 2020-07-24
(websocket server) reset client websocket callback when the connection is closed
## [9.10.0] - 2020-07-23
(websocket server) add a new simpler API to handle client connections / that API does not trigger a memory leak while the previous one did
## [9.9.3] - 2020-07-17
(build) merge platform specific files which were used to have different implementations for setting a thread name into a single file, to make it easier to include every source files and build the ixwebsocket library (fix #226)
## [9.9.2] - 2020-07-10
(socket server) bump default max connection count from 32 to 128
## [9.9.1] - 2020-07-10
(snake) implement super simple stream sql expression support in snake server
## [9.9.0] - 2020-07-08
(socket+websocket+http+redis+snake servers) expose the remote ip and remote port when a new connection is made
## [9.8.6] - 2020-07-06 ## [9.8.6] - 2020-07-06
(cmake) change the way zlib and openssl are searched (cmake) change the way zlib and openssl are searched

View File

@ -17,6 +17,7 @@ There is a unittest which can be executed by typing `make test`.
Options for building: Options for building:
* `-DUSE_ZLIB=1` will enable zlib support, required for http client + server + websocket per message deflate extension
* `-DUSE_TLS=1` will enable TLS support * `-DUSE_TLS=1` will enable TLS support
* `-DUSE_OPEN_SSL=1` will use [openssl](https://www.openssl.org/) for the TLS support (default on Linux and Windows) * `-DUSE_OPEN_SSL=1` will use [openssl](https://www.openssl.org/) for the TLS support (default on Linux and Windows)
* `-DUSE_MBED_TLS=1` will use [mbedlts](https://tls.mbed.org/) for the TLS support * `-DUSE_MBED_TLS=1` will use [mbedlts](https://tls.mbed.org/) for the TLS support

37
docs/performance.md Normal file
View File

@ -0,0 +1,37 @@
## WebSocket Client performance
We will run a client and a server on the same machine, connecting to localhost. This bench is run on a MacBook Pro from 2015. We can receive over 200,000 (small) messages per second, another way to put it is that it takes 5 micro-second to receive and process one message. This is an indication about the minimal latency to receive messages.
### Receiving messages
By using the push_server ws sub-command, the server will send the same message in a loop to any connected client.
```
ws push_server -q --send_msg 'yo'
```
By using the echo_client ws sub-command, with the -m (mute or no_send), we will display statistics on how many messages we can receive per second.
```
$ ws echo_client -m ws://localhost:8008
[2020-08-02 12:31:17.284] [info] ws_echo_client: connected
[2020-08-02 12:31:17.284] [info] Uri: /
[2020-08-02 12:31:17.284] [info] Headers:
[2020-08-02 12:31:17.284] [info] Connection: Upgrade
[2020-08-02 12:31:17.284] [info] Sec-WebSocket-Accept: byy/pMK2d0PtRwExaaiOnXJTQHo=
[2020-08-02 12:31:17.284] [info] Server: ixwebsocket/10.1.4 macos ssl/SecureTransport zlib 1.2.11
[2020-08-02 12:31:17.284] [info] Upgrade: websocket
[2020-08-02 12:31:17.663] [info] messages received: 0 per second 2595307 total
[2020-08-02 12:31:18.668] [info] messages received: 79679 per second 2674986 total
[2020-08-02 12:31:19.668] [info] messages received: 207438 per second 2882424 total
[2020-08-02 12:31:20.673] [info] messages received: 209207 per second 3091631 total
[2020-08-02 12:31:21.676] [info] messages received: 216056 per second 3307687 total
[2020-08-02 12:31:22.680] [info] messages received: 214927 per second 3522614 total
[2020-08-02 12:31:23.684] [info] messages received: 216960 per second 3739574 total
[2020-08-02 12:31:24.688] [info] messages received: 215232 per second 3954806 total
[2020-08-02 12:31:25.691] [info] messages received: 212300 per second 4167106 total
[2020-08-02 12:31:26.694] [info] messages received: 212501 per second 4379607 total
[2020-08-02 12:31:27.699] [info] messages received: 212330 per second 4591937 total
[2020-08-02 12:31:28.702] [info] messages received: 216511 per second 4808448 total
```

View File

@ -67,9 +67,28 @@ webSocket.stop()
### Sending messages ### Sending messages
`websocket.send("foo")` will send a message. `WebSocketSendInfo result = websocket.send("foo")` will send a message.
If the connection was closed and sending failed, the return value will be set to false. If the connection was closed, sending will fail, and the success field of the result object will be set to false. There could also be a compression error in which case the compressError field will be set to true. The payloadSize field and wireSize fields will tell you respectively how much bytes the message weight, and how many bytes were sent on the wire (potentially compressed + counting the message header (a few bytes).
There is an optional progress callback that can be passed in as the second argument. If a message is large it will be fragmented into chunks which will be sent independantly. Everytime the we can write a fragment into the OS network cache, the callback will be invoked. If a user wants to cancel a slow send, false should be returned from within the callback.
Here is an example code snippet copied from the ws send sub-command. Each fragment weights 32K, so the total integer is the wireSize divided by 32K. As an example if you are sending 32M of data, uncompressed, total will be 1000. current will be set to 0 for the first fragment, then 1, 2 etc...
```
auto result =
_webSocket.sendBinary(serializedMsg, [this, throttle](int current, int total) -> bool {
spdlog::info("ws_send: Step {} out of {}", current + 1, total);
if (throttle)
{
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
}
return _connected;
});
```
### ReadyState ### ReadyState
@ -246,6 +265,10 @@ uint32_t m = webSocket.getMaxWaitBetweenReconnectionRetries();
## WebSocket server API ## WebSocket server API
### Legacy api
This api was actually changed to take a weak_ptr<WebSocket> as the first argument to setOnConnectionCallback ; previously it would take a shared_ptr<WebSocket> which was creating cycles and then memory leaks problems.
```cpp ```cpp
#include <ixwebsocket/IXWebSocketServer.h> #include <ixwebsocket/IXWebSocketServer.h>
@ -256,38 +279,49 @@ uint32_t m = webSocket.getMaxWaitBetweenReconnectionRetries();
ix::WebSocketServer server(port); ix::WebSocketServer server(port);
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server](std::shared_ptr<WebSocket> webSocket, [&server](std::weak_ptr<WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo)
{ {
webSocket->setOnMessageCallback( std::cout << "Remote ip: " << connectionInfo->remoteIp << std::endl;
[webSocket, connectionState, &server](const ix::WebSocketMessagePtr msg)
{ auto ws = webSocket.lock();
if (msg->type == ix::WebSocketMessageType::Open) if (ws)
{
ws->setOnMessageCallback(
[webSocket, connectionState, &server](const ix::WebSocketMessagePtr msg)
{ {
std::cerr << "New connection" << std::endl; if (msg->type == ix::WebSocketMessageType::Open)
// A connection state object is available, and has a default id
// You can subclass ConnectionState and pass an alternate factory
// to override it. It is useful if you want to store custom
// attributes per connection (authenticated bool flag, attributes, etc...)
std::cerr << "id: " << connectionState->getId() << std::endl;
// The uri the client did connect to.
std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; std::cout << "New connection" << std::endl;
// A connection state object is available, and has a default id
// You can subclass ConnectionState and pass an alternate factory
// to override it. It is useful if you want to store custom
// attributes per connection (authenticated bool flag, attributes, etc...)
std::cout << "id: " << connectionState->getId() << std::endl;
// The uri the client did connect to.
std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Headers:" << std::endl;
for (auto it : msg->openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
// For an echo server, we just send back to the client whatever was received by the server
// All connected clients are available in an std::set. See the broadcast cpp example.
// Second parameter tells whether we are sending the message in binary or text mode.
// Here we send it in the same mode as it was received.
auto ws = webSocket.lock();
if (ws)
{
ws->send(msg->str, msg->binary);
}
} }
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
// For an echo server, we just send back to the client whatever was received by the server
// All connected clients are available in an std::set. See the broadcast cpp example.
// Second parameter tells whether we are sending the message in binary or text mode.
// Here we send it in the same mode as it was received.
webSocket->send(msg->str, msg->binary);
} }
} }
); );
@ -309,6 +343,74 @@ server.wait();
``` ```
### New api
The new API does not require to use 2 nested callbacks, which is a bit annoying. The real fix is that there was a memory leak due to a shared_ptr cycle, due to passing down a shared_ptr<WebSocket> down to the callbacks.
The webSocket reference is guaranteed to be always valid ; by design the callback will never be invoked with a null webSocket object.
```cpp
#include <ixwebsocket/IXWebSocketServer.h>
...
// Run a server on localhost at a given port.
// Bound host name, max connections and listen backlog can also be passed in as parameters.
ix::WebSocketServer server(port);
server.setOnClientMessageCallback(std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const WebSocketMessagePtr& msg)
{
// The ConnectionInfo object contains information about the connection,
// at this point only the client ip address and the port.
std::cout << "Remote ip: " << connectionInfo.remoteIp << std::endl;
if (msg->type == ix::WebSocketMessageType::Open)
{
std::cout << "New connection" << std::endl;
// A connection state object is available, and has a default id
// You can subclass ConnectionState and pass an alternate factory
// to override it. It is useful if you want to store custom
// attributes per connection (authenticated bool flag, attributes, etc...)
std::cout << "id: " << connectionState->getId() << std::endl;
// The uri the client did connect to.
std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Headers:" << std::endl;
for (auto it : msg->openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
// For an echo server, we just send back to the client whatever was received by the server
// All connected clients are available in an std::set. See the broadcast cpp example.
// Second parameter tells whether we are sending the message in binary or text mode.
// Here we send it in the same mode as it was received.
webSocket.send(msg->str, msg->binary);
}
);
auto res = server.listen();
if (!res.first)
{
// Error handling
return 1;
}
// Run the server in the background. Server can be stoped by calling server.stop()
server.start();
// Block until server.stop() is called.
server.wait();
```
## HTTP client API ## HTTP client API
```cpp ```cpp
@ -417,11 +519,14 @@ If you want to handle how requests are processed, implement the setOnConnectionC
```cpp ```cpp
setOnConnectionCallback( setOnConnectionCallback(
[this](HttpRequestPtr request, [this](HttpRequestPtr request,
std::shared_ptr<ConnectionState> /*connectionState*/) -> HttpResponsePtr std::shared_ptr<ConnectionState> /*connectionState*/,
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr
{ {
// Build a string for the response // Build a string for the response
std::stringstream ss; std::stringstream ss;
ss << request->method ss << connectionInfo->remoteIp
<< " "
<< request->method
<< " " << " "
<< request->uri; << request->uri;

View File

@ -111,6 +111,12 @@ namespace ix
void CobraConnection::disconnect() void CobraConnection::disconnect()
{ {
auto subscriptionIds = getSubscriptionsIds();
for (auto&& subscriptionId : subscriptionIds)
{
unsubscribe(subscriptionId);
}
_authenticated = false; _authenticated = false;
_webSocket->stop(); _webSocket->stop();
} }
@ -614,6 +620,18 @@ namespace ix
_webSocket->send(pdu.toStyledString()); _webSocket->send(pdu.toStyledString());
} }
std::vector<std::string> CobraConnection::getSubscriptionsIds()
{
std::vector<std::string> subscriptionIds;
std::lock_guard<std::mutex> lock(_cbsMutex);
for (auto&& it : _cbs)
{
subscriptionIds.push_back(it.first);
}
return subscriptionIds;
}
// //
// Enqueue strategy drops old messages when we are at full capacity // Enqueue strategy drops old messages when we are at full capacity
// //

View File

@ -163,6 +163,9 @@ namespace ix
/// Tells whether the internal queue is empty or not /// Tells whether the internal queue is empty or not
bool isQueueEmpty(); bool isQueueEmpty();
/// Retrieve all subscriptions ids
std::vector<std::string> getSubscriptionsIds();
/// ///
/// Member variables /// Member variables
/// ///

View File

@ -45,8 +45,11 @@ namespace ix
} }
void RedisServer::handleConnection(std::unique_ptr<Socket> socket, void RedisServer::handleConnection(std::unique_ptr<Socket> socket,
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo)
{ {
logInfo("New connection from remote ip " + connectionInfo->remoteIp);
_connectedClientsCount++; _connectedClientsCount++;
while (!_stopHandlingConnections) while (!_stopHandlingConnections)

View File

@ -44,7 +44,8 @@ namespace ix
// Methods // Methods
virtual void handleConnection(std::unique_ptr<Socket>, virtual void handleConnection(std::unique_ptr<Socket>,
std::shared_ptr<ConnectionState> connectionState) final; std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) final;
virtual size_t getConnectedClientsCount() final; virtual size_t getConnectedClientsCount() final;
bool startsWith(const std::string& str, const std::string& start); bool startsWith(const std::string& str, const std::string& start);

View File

@ -7,12 +7,14 @@ set (IXSNAKE_SOURCES
ixsnake/IXSnakeServer.cpp ixsnake/IXSnakeServer.cpp
ixsnake/IXSnakeProtocol.cpp ixsnake/IXSnakeProtocol.cpp
ixsnake/IXAppConfig.cpp ixsnake/IXAppConfig.cpp
ixsnake/IXStreamSql.cpp
) )
set (IXSNAKE_HEADERS set (IXSNAKE_HEADERS
ixsnake/IXSnakeServer.h ixsnake/IXSnakeServer.h
ixsnake/IXSnakeProtocol.h ixsnake/IXSnakeProtocol.h
ixsnake/IXAppConfig.h ixsnake/IXAppConfig.h
ixsnake/IXStreamSql.h
) )
add_library(ixsnake STATIC add_library(ixsnake STATIC

View File

@ -26,6 +26,12 @@ namespace snake
} }
auto roles = appConfig.apps[appkey]["roles"]; auto roles = appConfig.apps[appkey]["roles"];
if (roles.count(role) == 0)
{
std::cerr << "Missing role " << role << std::endl;
return std::string();
}
auto channel = roles[role]["secret"]; auto channel = roles[role]["secret"];
return channel; return channel;
} }

View File

@ -33,6 +33,9 @@ namespace snake
// Misc // Misc
bool verbose; bool verbose;
bool disablePong; bool disablePong;
// If non empty, every published message gets republished to a given channel
std::string republishChannel;
}; };
bool isAppKeyValid(const AppConfig& appConfig, std::string appkey); bool isAppKeyValid(const AppConfig& appConfig, std::string appkey);

View File

@ -7,15 +7,21 @@
#pragma once #pragma once
#include <ixredis/IXRedisClient.h> #include <ixredis/IXRedisClient.h>
#include <future> #include <thread>
#include <ixwebsocket/IXConnectionState.h> #include <ixwebsocket/IXConnectionState.h>
#include <string> #include <string>
#include "IXStreamSql.h"
namespace snake namespace snake
{ {
class SnakeConnectionState : public ix::ConnectionState class SnakeConnectionState : public ix::ConnectionState
{ {
public: public:
virtual ~SnakeConnectionState()
{
stopSubScriptionThread();
}
std::string getNonce() std::string getNonce()
{ {
return _nonce; return _nonce;
@ -30,6 +36,7 @@ namespace snake
{ {
return _appkey; return _appkey;
} }
void setAppkey(const std::string& appkey) void setAppkey(const std::string& appkey)
{ {
_appkey = appkey; _appkey = appkey;
@ -39,6 +46,7 @@ namespace snake
{ {
return _role; return _role;
} }
void setRole(const std::string& role) void setRole(const std::string& role)
{ {
_role = role; _role = role;
@ -49,7 +57,24 @@ namespace snake
return _redisClient; return _redisClient;
} }
std::future<void> fut; void stopSubScriptionThread()
{
if (subscriptionThread.joinable())
{
subscriptionRedisClient.stop();
subscriptionThread.join();
}
}
// We could make those accessible through methods
std::thread subscriptionThread;
std::string appChannel;
std::string subscriptionId;
uint64_t id;
std::unique_ptr<StreamSql> streamSql;
ix::RedisClient subscriptionRedisClient;
ix::RedisClient::OnRedisSubscribeResponseCallback onRedisSubscribeResponseCallback;
ix::RedisClient::OnRedisSubscribeCallback onRedisSubscribeCallback;
private: private:
std::string _nonce; std::string _nonce;

View File

@ -18,21 +18,22 @@
namespace snake namespace snake
{ {
void handleError(const std::string& action, void handleError(const std::string& action,
std::shared_ptr<ix::WebSocket> ws, ix::WebSocket& ws,
nlohmann::json pdu, uint64_t pduId,
const std::string& errMsg) const std::string& errMsg)
{ {
std::string actionError(action); std::string actionError(action);
actionError += "/error"; actionError += "/error";
nlohmann::json response = { nlohmann::json response = {
{"action", actionError}, {"id", pdu.value("id", 1)}, {"body", {{"reason", errMsg}}}}; {"action", actionError}, {"id", pduId}, {"body", {{"reason", errMsg}}}};
ws->sendText(response.dump()); ws.sendText(response.dump());
} }
void handleHandshake(std::shared_ptr<SnakeConnectionState> state, void handleHandshake(std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws, ix::WebSocket& ws,
const nlohmann::json& pdu) const nlohmann::json& pdu,
uint64_t pduId)
{ {
std::string role = pdu["body"]["data"]["role"]; std::string role = pdu["body"]["data"]["role"];
@ -41,7 +42,7 @@ namespace snake
nlohmann::json response = { nlohmann::json response = {
{"action", "auth/handshake/ok"}, {"action", "auth/handshake/ok"},
{"id", pdu.value("id", 1)}, {"id", pduId},
{"body", {"body",
{ {
{"data", {{"nonce", state->getNonce()}, {"connection_id", state->getId()}}}, {"data", {{"nonce", state->getNonce()}, {"connection_id", state->getId()}}},
@ -49,13 +50,14 @@ namespace snake
auto serializedResponse = response.dump(); auto serializedResponse = response.dump();
ws->sendText(serializedResponse); ws.sendText(serializedResponse);
} }
void handleAuth(std::shared_ptr<SnakeConnectionState> state, void handleAuth(std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws, ix::WebSocket& ws,
const AppConfig& appConfig, const AppConfig& appConfig,
const nlohmann::json& pdu) const nlohmann::json& pdu,
uint64_t pduId)
{ {
auto secret = getRoleSecret(appConfig, state->appkey(), state->role()); auto secret = getRoleSecret(appConfig, state->appkey(), state->role());
@ -63,9 +65,9 @@ namespace snake
{ {
nlohmann::json response = { nlohmann::json response = {
{"action", "auth/authenticate/error"}, {"action", "auth/authenticate/error"},
{"id", pdu.value("id", 1)}, {"id", pduId},
{"body", {{"error", "authentication_failed"}, {"reason", "invalid secret"}}}}; {"body", {{"error", "authentication_failed"}, {"reason", "invalid secret"}}}};
ws->sendText(response.dump()); ws.sendText(response.dump());
return; return;
} }
@ -79,19 +81,21 @@ namespace snake
{"action", "auth/authenticate/error"}, {"action", "auth/authenticate/error"},
{"id", pdu.value("id", 1)}, {"id", pdu.value("id", 1)},
{"body", {{"error", "authentication_failed"}, {"reason", "invalid hash"}}}}; {"body", {{"error", "authentication_failed"}, {"reason", "invalid hash"}}}};
ws->sendText(response.dump()); ws.sendText(response.dump());
return; return;
} }
nlohmann::json response = { nlohmann::json response = {
{"action", "auth/authenticate/ok"}, {"id", pdu.value("id", 1)}, {"body", {}}}; {"action", "auth/authenticate/ok"}, {"id", pdu.value("id", 1)}, {"body", {}}};
ws->sendText(response.dump()); ws.sendText(response.dump());
} }
void handlePublish(std::shared_ptr<SnakeConnectionState> state, void handlePublish(std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws, ix::WebSocket& ws,
const nlohmann::json& pdu) const AppConfig& appConfig,
const nlohmann::json& pdu,
uint64_t pduId)
{ {
std::vector<std::string> channels; std::vector<std::string> channels;
@ -111,10 +115,16 @@ namespace snake
{ {
std::stringstream ss; std::stringstream ss;
ss << "Missing channels or channel field in publish data"; ss << "Missing channels or channel field in publish data";
handleError("rtm/publish", ws, pdu, ss.str()); handleError("rtm/publish", ws, pduId, ss.str());
return; return;
} }
// add an extra channel if the config has one specified
if (!appConfig.republishChannel.empty())
{
channels.push_back(appConfig.republishChannel);
}
for (auto&& channel : channels) for (auto&& channel : channels)
{ {
std::stringstream ss; std::stringstream ss;
@ -125,7 +135,7 @@ namespace snake
{ {
std::stringstream ss; std::stringstream ss;
ss << "Cannot publish to redis host " << errMsg; ss << "Cannot publish to redis host " << errMsg;
handleError("rtm/publish", ws, pdu, ss.str()); handleError("rtm/publish", ws, pduId, ss.str());
return; return;
} }
} }
@ -133,26 +143,27 @@ namespace snake
nlohmann::json response = { nlohmann::json response = {
{"action", "rtm/publish/ok"}, {"id", pdu.value("id", 1)}, {"body", {}}}; {"action", "rtm/publish/ok"}, {"id", pdu.value("id", 1)}, {"body", {}}};
ws->sendText(response.dump()); ws.sendText(response.dump());
} }
// //
// FIXME: this is not cancellable. We should be able to cancel the redis subscription // FIXME: this is not cancellable. We should be able to cancel the redis subscription
// //
void handleRedisSubscription(std::shared_ptr<SnakeConnectionState> state, void handleSubscribe(std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws, ix::WebSocket& ws,
const AppConfig& appConfig, const AppConfig& appConfig,
const nlohmann::json& pdu) const nlohmann::json& pdu,
uint64_t pduId)
{ {
std::string channel = pdu["body"]["channel"]; std::string channel = pdu["body"]["channel"];
std::string subscriptionId = channel; state->subscriptionId = channel;
std::stringstream ss; std::stringstream ss;
ss << state->appkey() << "::" << channel; ss << state->appkey() << "::" << channel;
std::string appChannel(ss.str()); state->appChannel = ss.str();
ix::RedisClient redisClient; ix::RedisClient& redisClient = state->subscriptionRedisClient;
int port = appConfig.redisPort; int port = appConfig.redisPort;
auto urls = appConfig.redisHosts; auto urls = appConfig.redisHosts;
@ -163,7 +174,7 @@ namespace snake
{ {
std::stringstream ss; std::stringstream ss;
ss << "Cannot connect to redis host " << hostname << ":" << port; ss << "Cannot connect to redis host " << hostname << ":" << port;
handleError("rtm/subscribe", ws, pdu, ss.str()); handleError("rtm/subscribe", ws, pduId, ss.str());
return; return;
} }
@ -175,80 +186,90 @@ namespace snake
{ {
std::stringstream ss; std::stringstream ss;
ss << "Cannot authenticated to redis"; ss << "Cannot authenticated to redis";
handleError("rtm/subscribe", ws, pdu, ss.str()); handleError("rtm/subscribe", ws, pduId, ss.str());
return; return;
} }
} }
int id = 0; std::string filterStr;
auto callback = [ws, &id, &subscriptionId](const std::string& messageStr) { if (pdu["body"].find("filter") != pdu["body"].end())
{
std::string filterStr = pdu["body"]["filter"];
}
state->streamSql = std::make_unique<StreamSql>(filterStr);
state->id = 0;
state->onRedisSubscribeCallback = [&ws, state](const std::string& messageStr) {
auto msg = nlohmann::json::parse(messageStr); auto msg = nlohmann::json::parse(messageStr);
msg = msg["body"]["message"]; msg = msg["body"]["message"];
if (state->streamSql->valid() && !state->streamSql->match(msg))
{
return;
}
nlohmann::json response = { nlohmann::json response = {
{"action", "rtm/subscription/data"}, {"action", "rtm/subscription/data"},
{"id", id++}, {"id", state->id++},
{"body", {"body",
{{"subscription_id", subscriptionId}, {"position", "0-0"}, {"messages", {msg}}}}}; {{"subscription_id", state->subscriptionId}, {"position", "0-0"}, {"messages", {msg}}}}};
ws->sendText(response.dump()); ws.sendText(response.dump());
}; };
auto responseCallback = [ws, pdu, &subscriptionId](const std::string& redisResponse) { state->onRedisSubscribeResponseCallback = [&ws, state, pduId](const std::string& redisResponse) {
std::stringstream ss; std::stringstream ss;
ss << "Redis Response: " << redisResponse << "..."; ss << "Redis Response: " << redisResponse << "...";
ix::CoreLogger::log(ss.str().c_str()); ix::CoreLogger::log(ss.str().c_str());
// Success // Success
nlohmann::json response = {{"action", "rtm/subscribe/ok"}, nlohmann::json response = {{"action", "rtm/subscribe/ok"},
{"id", pdu.value("id", 1)}, {"id", pduId},
{"body", {{"subscription_id", subscriptionId}}}}; {"body", {{"subscription_id", state->subscriptionId}}}};
ws->sendText(response.dump()); ws.sendText(response.dump());
}; };
{ {
std::stringstream ss; std::stringstream ss;
ss << "Subscribing to " << appChannel << "..."; ss << "Subscribing to " << state->appChannel << "...";
ix::CoreLogger::log(ss.str().c_str()); ix::CoreLogger::log(ss.str().c_str());
} }
if (!redisClient.subscribe(appChannel, responseCallback, callback)) auto subscription = [&redisClient, state, &ws, pduId]
{ {
std::stringstream ss; if (!redisClient.subscribe(state->appChannel,
ss << "Error subscribing to channel " << appChannel; state->onRedisSubscribeResponseCallback,
handleError("rtm/subscribe", ws, pdu, ss.str()); state->onRedisSubscribeCallback))
return; {
} std::stringstream ss;
} ss << "Error subscribing to channel " << state->appChannel;
handleError("rtm/subscribe", ws, pduId, ss.str());
return;
}
};
void handleSubscribe(std::shared_ptr<SnakeConnectionState> state, state->subscriptionThread = std::thread(subscription);
std::shared_ptr<ix::WebSocket> ws,
const AppConfig& appConfig,
const nlohmann::json& pdu)
{
state->fut =
std::async(std::launch::async, handleRedisSubscription, state, ws, appConfig, pdu);
} }
void handleUnSubscribe(std::shared_ptr<SnakeConnectionState> state, void handleUnSubscribe(std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws, ix::WebSocket& ws,
const nlohmann::json& pdu) const nlohmann::json& pdu,
uint64_t pduId)
{ {
// extract subscription_id // extract subscription_id
auto body = pdu["body"]; auto body = pdu["body"];
auto subscriptionId = body["subscription_id"]; auto subscriptionId = body["subscription_id"];
state->redisClient().stop(); state->stopSubScriptionThread();
nlohmann::json response = {{"action", "rtm/unsubscribe/ok"}, nlohmann::json response = {{"action", "rtm/unsubscribe/ok"},
{"id", pdu.value("id", 1)}, {"id", pduId},
{"body", {{"subscription_id", subscriptionId}}}}; {"body", {{"subscription_id", subscriptionId}}}};
ws->sendText(response.dump()); ws.sendText(response.dump());
} }
void processCobraMessage(std::shared_ptr<SnakeConnectionState> state, void processCobraMessage(std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws, ix::WebSocket& ws,
const AppConfig& appConfig, const AppConfig& appConfig,
const std::string& str) const std::string& str)
{ {
@ -263,31 +284,32 @@ namespace snake
ss << "malformed json pdu: " << e.what() << " -> " << str << ""; ss << "malformed json pdu: " << e.what() << " -> " << str << "";
nlohmann::json response = {{"body", {{"error", "invalid_json"}, {"reason", ss.str()}}}}; nlohmann::json response = {{"body", {{"error", "invalid_json"}, {"reason", ss.str()}}}};
ws->sendText(response.dump()); ws.sendText(response.dump());
return; return;
} }
auto action = pdu["action"]; auto action = pdu["action"];
uint64_t pduId = pdu.value("id", 1);
if (action == "auth/handshake") if (action == "auth/handshake")
{ {
handleHandshake(state, ws, pdu); handleHandshake(state, ws, pdu, pduId);
} }
else if (action == "auth/authenticate") else if (action == "auth/authenticate")
{ {
handleAuth(state, ws, appConfig, pdu); handleAuth(state, ws, appConfig, pdu, pduId);
} }
else if (action == "rtm/publish") else if (action == "rtm/publish")
{ {
handlePublish(state, ws, pdu); handlePublish(state, ws, appConfig, pdu, pduId);
} }
else if (action == "rtm/subscribe") else if (action == "rtm/subscribe")
{ {
handleSubscribe(state, ws, appConfig, pdu); handleSubscribe(state, ws, appConfig, pdu, pduId);
} }
else if (action == "rtm/unsubscribe") else if (action == "rtm/unsubscribe")
{ {
handleUnSubscribe(state, ws, pdu); handleUnSubscribe(state, ws, pdu, pduId);
} }
else else
{ {

View File

@ -20,7 +20,7 @@ namespace snake
struct AppConfig; struct AppConfig;
void processCobraMessage(std::shared_ptr<SnakeConnectionState> state, void processCobraMessage(std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws, ix::WebSocket& ws,
const AppConfig& appConfig, const AppConfig& appConfig,
const std::string& str); const std::string& str);
} // namespace snake } // namespace snake

View File

@ -59,65 +59,68 @@ namespace snake
}; };
_server.setConnectionStateFactory(factory); _server.setConnectionStateFactory(factory);
_server.setOnConnectionCallback( _server.setOnClientMessageCallback(
[this](std::shared_ptr<ix::WebSocket> webSocket, [this](std::shared_ptr<ix::ConnectionState> connectionState,
std::shared_ptr<ix::ConnectionState> connectionState) { ix::ConnectionInfo& connectionInfo,
ix::WebSocket& webSocket,
const ix::WebSocketMessagePtr& msg) {
auto state = std::dynamic_pointer_cast<SnakeConnectionState>(connectionState); auto state = std::dynamic_pointer_cast<SnakeConnectionState>(connectionState);
auto remoteIp = connectionInfo.remoteIp;
std::stringstream ss;
ss << "[" << state->getId() << "] ";
webSocket->setOnMessageCallback( ix::LogLevel logLevel = ix::LogLevel::Debug;
[this, webSocket, state](const ix::WebSocketMessagePtr& msg) { if (msg->type == ix::WebSocketMessageType::Open)
std::stringstream ss; {
ix::LogLevel logLevel = ix::LogLevel::Debug; ss << "New connection" << std::endl;
if (msg->type == ix::WebSocketMessageType::Open) ss << "remote ip: " << remoteIp << std::endl;
{ ss << "id: " << state->getId() << std::endl;
ss << "New connection" << std::endl; ss << "Uri: " << msg->openInfo.uri << std::endl;
ss << "id: " << state->getId() << std::endl; ss << "Headers:" << std::endl;
ss << "Uri: " << msg->openInfo.uri << std::endl; for (auto it : msg->openInfo.headers)
ss << "Headers:" << std::endl; {
for (auto it : msg->openInfo.headers) ss << it.first << ": " << it.second << std::endl;
{ }
ss << it.first << ": " << it.second << std::endl;
}
std::string appkey = parseAppKey(msg->openInfo.uri); std::string appkey = parseAppKey(msg->openInfo.uri);
state->setAppkey(appkey); state->setAppkey(appkey);
// Connect to redis first // Connect to redis first
if (!state->redisClient().connect(_appConfig.redisHosts[0], if (!state->redisClient().connect(_appConfig.redisHosts[0],
_appConfig.redisPort)) _appConfig.redisPort))
{ {
ss << "Cannot connect to redis host" << std::endl; ss << "Cannot connect to redis host" << std::endl;
logLevel = ix::LogLevel::Error; logLevel = ix::LogLevel::Error;
} }
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
ss << "Closed connection" ss << "Closed connection"
<< " code " << msg->closeInfo.code << " reason " << " code " << msg->closeInfo.code << " reason "
<< msg->closeInfo.reason << std::endl; << msg->closeInfo.reason << std::endl;
} }
else if (msg->type == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Connection error: " << msg->errorInfo.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
logLevel = ix::LogLevel::Error; logLevel = ix::LogLevel::Error;
} }
else if (msg->type == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
ss << "Received message fragment" << std::endl; ss << "Received message fragment" << std::endl;
} }
else if (msg->type == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
ss << "Received " << msg->wireSize << " bytes" << std::endl; ss << "Received " << msg->wireSize << " bytes" << " " << msg->str << std::endl;
processCobraMessage(state, webSocket, _appConfig, msg->str); processCobraMessage(state, webSocket, _appConfig, msg->str);
} }
ix::CoreLogger::log(ss.str().c_str(), logLevel); ix::CoreLogger::log(ss.str().c_str(), logLevel);
}); });
});
auto res = _server.listen(); auto res = _server.listen();
if (!res.first) if (!res.first)

View File

@ -0,0 +1,63 @@
/*
* IXStreamSql.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*
* Super simple hacked up version of a stream sql expression,
* that only supports non nested field evaluation
*/
#include "IXStreamSql.h"
#include <sstream>
#include <iostream>
namespace snake
{
StreamSql::StreamSql(const std::string& sqlFilter)
: _valid(false)
{
std::string token;
std::stringstream tokenStream(sqlFilter);
std::vector<std::string> tokens;
// Split by ' '
while (std::getline(tokenStream, token, ' '))
{
tokens.push_back(token);
}
_valid = tokens.size() == 8;
if (!_valid) return;
_field = tokens[5];
_operator = tokens[6];
_value = tokens[7];
// remove single quotes
_value = _value.substr(1, _value.size() - 2);
if (_operator == "LIKE")
{
_value = _value.substr(1, _value.size() - 2);
}
}
bool StreamSql::valid() const
{
return _valid;
}
bool StreamSql::match(const nlohmann::json& msg)
{
if (!_valid) return false;
if (msg.find(_field) == msg.end())
{
return false;
}
std::string value = msg[_field];
return value == _value;
}
} // namespace snake

View File

@ -0,0 +1,29 @@
/*
* IXStreamSql.h
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <string>
#include "nlohmann/json.hpp"
namespace snake
{
class StreamSql
{
public:
StreamSql(const std::string& sqlFilter = std::string());
~StreamSql() = default;
bool match(const nlohmann::json& msg);
bool valid() const;
private:
std::string _field;
std::string _operator;
std::string _value;
bool _valid;
};
}

View File

@ -0,0 +1,25 @@
/*
* IXConnectionInfo.h
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <string>
namespace ix
{
struct ConnectionInfo
{
std::string remoteIp;
int remotePort;
ConnectionInfo(const std::string& r = std::string(), int p = 0)
: remoteIp(r)
, remotePort(p)
{
;
}
};
} // namespace ix

View File

@ -16,7 +16,10 @@
#include <random> #include <random>
#include <sstream> #include <sstream>
#include <vector> #include <vector>
#ifdef IXWEBSOCKET_USE_ZLIB
#include <zlib.h> #include <zlib.h>
#endif
namespace ix namespace ix
{ {
@ -174,11 +177,13 @@ namespace ix
ss << verb << " " << path << " HTTP/1.1\r\n"; ss << verb << " " << path << " HTTP/1.1\r\n";
ss << "Host: " << host << "\r\n"; ss << "Host: " << host << "\r\n";
#ifdef IXWEBSOCKET_USE_ZLIB
if (args->compress) if (args->compress)
{ {
ss << "Accept-Encoding: gzip" ss << "Accept-Encoding: gzip"
<< "\r\n"; << "\r\n";
} }
#endif
// Append extra headers // Append extra headers
for (auto&& it : args->extraHeaders) for (auto&& it : args->extraHeaders)
@ -495,6 +500,7 @@ namespace ix
downloadSize = payload.size(); downloadSize = payload.size();
#ifdef IXWEBSOCKET_USE_ZLIB
// If the content was compressed with gzip, decode it // If the content was compressed with gzip, decode it
if (headers["Content-Encoding"] == "gzip") if (headers["Content-Encoding"] == "gzip")
{ {
@ -513,6 +519,7 @@ namespace ix
} }
payload = decompressedPayload; payload = decompressedPayload;
} }
#endif
return std::make_shared<HttpResponse>(code, return std::make_shared<HttpResponse>(code,
description, description,
@ -672,6 +679,7 @@ namespace ix
return ss.str(); return ss.str();
} }
#ifdef IXWEBSOCKET_USE_ZLIB
bool HttpClient::gzipInflate(const std::string& in, std::string& out) bool HttpClient::gzipInflate(const std::string& in, std::string& out)
{ {
z_stream inflateState; z_stream inflateState;
@ -716,6 +724,7 @@ namespace ix
inflateEnd(&inflateState); inflateEnd(&inflateState);
return true; return true;
} }
#endif
void HttpClient::log(const std::string& msg, HttpRequestArgsPtr args) void HttpClient::log(const std::string& msg, HttpRequestArgsPtr args)
{ {

View File

@ -90,7 +90,9 @@ namespace ix
private: private:
void log(const std::string& msg, HttpRequestArgsPtr args); void log(const std::string& msg, HttpRequestArgsPtr args);
#ifdef IXWEBSOCKET_USE_ZLIB
bool gzipInflate(const std::string& in, std::string& out); bool gzipInflate(const std::string& in, std::string& out);
#endif
// Async API background thread runner // Async API background thread runner
void run(); void run();
@ -103,9 +105,9 @@ namespace ix
std::thread _thread; std::thread _thread;
std::unique_ptr<Socket> _socket; std::unique_ptr<Socket> _socket;
std::recursive_mutex _mutex; // to protect accessing the _socket (only one socket per client) std::recursive_mutex _mutex; // to protect accessing the _socket (only one socket per
// the mutex needs to be recursive as this function might // client) the mutex needs to be recursive as this function
// be called recursively to follow HTTP redirections // might be called recursively to follow HTTP redirections
SocketTLSOptions _tlsOptions; SocketTLSOptions _tlsOptions;

View File

@ -9,11 +9,14 @@
#include "IXNetSystem.h" #include "IXNetSystem.h"
#include "IXSocketConnect.h" #include "IXSocketConnect.h"
#include "IXUserAgent.h" #include "IXUserAgent.h"
#include <cstring>
#include <fstream> #include <fstream>
#include <sstream> #include <sstream>
#include <vector> #include <vector>
#ifdef IXWEBSOCKET_USE_ZLIB
#include <zlib.h> #include <zlib.h>
#include <cstring> #endif
namespace namespace
{ {
@ -41,6 +44,7 @@ namespace
return std::make_pair(res.first, std::string(vec.begin(), vec.end())); return std::make_pair(res.first, std::string(vec.begin(), vec.end()));
} }
#ifdef IXWEBSOCKET_USE_ZLIB
std::string gzipCompress(const std::string& str) std::string gzipCompress(const std::string& str)
{ {
z_stream zs; // z_stream is zlib's control structure z_stream zs; // z_stream is zlib's control structure
@ -50,8 +54,11 @@ namespace
const int windowBits = 15; const int windowBits = 15;
const int GZIP_ENCODING = 16; const int GZIP_ENCODING = 16;
deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, deflateInit2(&zs,
windowBits | GZIP_ENCODING, 8, Z_DEFAULT_COMPRESSION,
Z_DEFLATED,
windowBits | GZIP_ENCODING,
8,
Z_DEFAULT_STRATEGY); Z_DEFAULT_STRATEGY);
zs.next_in = (Bytef*) str.data(); zs.next_in = (Bytef*) str.data();
@ -69,18 +76,18 @@ namespace
ret = deflate(&zs, Z_FINISH); ret = deflate(&zs, Z_FINISH);
if(outstring.size() < zs.total_out) if (outstring.size() < zs.total_out)
{ {
// append the block to the output string // append the block to the output string
outstring.append(outbuffer, outstring.append(outbuffer, zs.total_out - outstring.size());
zs.total_out - outstring.size());
} }
} while(ret == Z_OK); } while (ret == Z_OK);
deflateEnd(&zs); deflateEnd(&zs);
return outstring; return outstring;
} }
#endif
} // namespace } // namespace
namespace ix namespace ix
@ -113,7 +120,8 @@ namespace ix
} }
void HttpServer::handleConnection(std::unique_ptr<Socket> socket, void HttpServer::handleConnection(std::unique_ptr<Socket> socket,
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo)
{ {
_connectedClientsCount++; _connectedClientsCount++;
@ -122,7 +130,8 @@ namespace ix
if (std::get<0>(ret)) if (std::get<0>(ret))
{ {
auto response = _onConnectionCallback(std::get<2>(ret), connectionState); auto response =
_onConnectionCallback(std::get<2>(ret), connectionState, std::move(connectionInfo));
if (!Http::sendResponse(response, socket)) if (!Http::sendResponse(response, socket))
{ {
logError("Cannot send response"); logError("Cannot send response");
@ -142,7 +151,8 @@ namespace ix
{ {
setOnConnectionCallback( setOnConnectionCallback(
[this](HttpRequestPtr request, [this](HttpRequestPtr request,
std::shared_ptr<ConnectionState> /*connectionState*/) -> HttpResponsePtr { std::shared_ptr<ConnectionState> /*connectionState*/,
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr {
std::string uri(request->uri); std::string uri(request->uri);
if (uri.empty() || uri == "/") if (uri.empty() || uri == "/")
{ {
@ -163,16 +173,19 @@ namespace ix
std::string content = res.second; std::string content = res.second;
#ifdef IXWEBSOCKET_USE_ZLIB
std::string acceptEncoding = request->headers["Accept-encoding"]; std::string acceptEncoding = request->headers["Accept-encoding"];
if (acceptEncoding == "*" || acceptEncoding.find("gzip") != std::string::npos) if (acceptEncoding == "*" || acceptEncoding.find("gzip") != std::string::npos)
{ {
content = gzipCompress(content); content = gzipCompress(content);
headers["Content-Encoding"] = "gzip"; headers["Content-Encoding"] = "gzip";
} }
#endif
// Log request // Log request
std::stringstream ss; std::stringstream ss;
ss << request->method << " " << request->headers["User-Agent"] << " " ss << connectionInfo->remoteIp << ":" << connectionInfo->remotePort << " "
<< request->method << " " << request->headers["User-Agent"] << " "
<< request->uri << " " << content.size(); << request->uri << " " << content.size();
logInfo(ss.str()); logInfo(ss.str());
@ -196,15 +209,16 @@ namespace ix
// See https://developer.mozilla.org/en-US/docs/Web/HTTP/Redirections // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Redirections
// //
setOnConnectionCallback( setOnConnectionCallback(
[this, [this, redirectUrl](HttpRequestPtr request,
redirectUrl](HttpRequestPtr request, std::shared_ptr<ConnectionState> /*connectionState*/,
std::shared_ptr<ConnectionState> /*connectionState*/) -> HttpResponsePtr { std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr {
WebSocketHttpHeaders headers; WebSocketHttpHeaders headers;
headers["Server"] = userAgent(); headers["Server"] = userAgent();
// Log request // Log request
std::stringstream ss; std::stringstream ss;
ss << request->method << " " << request->headers["User-Agent"] << " " ss << connectionInfo->remoteIp << ":" << connectionInfo->remotePort << " "
<< request->method << " " << request->headers["User-Agent"] << " "
<< request->uri; << request->uri;
logInfo(ss.str()); logInfo(ss.str());

View File

@ -23,7 +23,9 @@ namespace ix
{ {
public: public:
using OnConnectionCallback = using OnConnectionCallback =
std::function<HttpResponsePtr(HttpRequestPtr, std::shared_ptr<ConnectionState>)>; std::function<HttpResponsePtr(HttpRequestPtr,
std::shared_ptr<ConnectionState>,
std::unique_ptr<ConnectionInfo> connectionInfo)>;
HttpServer(int port = SocketServer::kDefaultPort, HttpServer(int port = SocketServer::kDefaultPort,
const std::string& host = SocketServer::kDefaultHost, const std::string& host = SocketServer::kDefaultHost,
@ -44,7 +46,8 @@ namespace ix
// Methods // Methods
virtual void handleConnection(std::unique_ptr<Socket>, virtual void handleConnection(std::unique_ptr<Socket>,
std::shared_ptr<ConnectionState> connectionState) final; std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) final;
virtual size_t getConnectedClientsCount() final; virtual size_t getConnectedClientsCount() final;
void setDefaultConnectionCallback(); void setDefaultConnectionCallback();

View File

@ -5,8 +5,10 @@
*/ */
// //
// On macOS we use UNIX pipes to wake up select. // On UNIX we use pipes to wake up select. There is no way to do that
// on Windows so this file is compiled out on Windows.
// //
#ifndef _WIN32
#include "IXSelectInterruptPipe.h" #include "IXSelectInterruptPipe.h"
@ -144,3 +146,5 @@ namespace ix
return _fildes[kPipeReadIndex]; return _fildes[kPipeReadIndex];
} }
} // namespace ix } // namespace ix
#endif // !_WIN32

View File

@ -0,0 +1,81 @@
/*
* IXSetThreadName.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 2020 Machine Zone, Inc. All rights reserved.
*/
#include "IXSetThreadName.h"
// unix systems
#if defined(__APPLE__) || defined(__linux__) || defined(BSD)
#include <pthread.h>
#endif
// freebsd needs this header as well
#if defined(BSD)
#include <pthread_np.h>
#endif
// Windows
#ifdef _WIN32
#include <Windows.h>
#endif
namespace ix
{
#ifdef _WIN32
const DWORD MS_VC_EXCEPTION = 0x406D1388;
#pragma pack(push, 8)
typedef struct tagTHREADNAME_INFO
{
DWORD dwType; // Must be 0x1000.
LPCSTR szName; // Pointer to name (in user addr space).
DWORD dwThreadID; // Thread ID (-1=caller thread).
DWORD dwFlags; // Reserved for future use, must be zero.
} THREADNAME_INFO;
#pragma pack(pop)
void SetThreadName(DWORD dwThreadID, const char* threadName)
{
THREADNAME_INFO info;
info.dwType = 0x1000;
info.szName = threadName;
info.dwThreadID = dwThreadID;
info.dwFlags = 0;
__try
{
RaiseException(
MS_VC_EXCEPTION, 0, sizeof(info) / sizeof(ULONG_PTR), (ULONG_PTR*) &info);
}
__except (EXCEPTION_EXECUTE_HANDLER)
{
}
}
#endif
void setThreadName(const std::string& name)
{
#if defined(__APPLE__)
//
// Apple reserves 16 bytes for its thread names
// Notice that the Apple version of pthread_setname_np
// does not take a pthread_t argument
//
pthread_setname_np(name.substr(0, 63).c_str());
#elif defined(__linux__)
//
// Linux only reserves 16 bytes for its thread names
// See prctl and PR_SET_NAME property in
// http://man7.org/linux/man-pages/man2/prctl.2.html
//
pthread_setname_np(pthread_self(), name.substr(0, 15).c_str());
#elif defined(_WIN32)
SetThreadName(-1, name.c_str());
#elif defined(BSD)
pthread_set_name_np(pthread_self(), name.substr(0, 15).c_str());
#else
// ... assert here ?
#endif
}
} // namespace ix

View File

@ -22,7 +22,7 @@ namespace ix
const int SocketServer::kDefaultPort(8080); const int SocketServer::kDefaultPort(8080);
const std::string SocketServer::kDefaultHost("127.0.0.1"); const std::string SocketServer::kDefaultHost("127.0.0.1");
const int SocketServer::kDefaultTcpBacklog(5); const int SocketServer::kDefaultTcpBacklog(5);
const size_t SocketServer::kDefaultMaxConnections(32); const size_t SocketServer::kDefaultMaxConnections(128);
const int SocketServer::kDefaultAddressFamily(AF_INET); const int SocketServer::kDefaultAddressFamily(AF_INET);
SocketServer::SocketServer( SocketServer::SocketServer(
@ -276,6 +276,7 @@ namespace ix
} }
// Accept a connection. // Accept a connection.
// FIXME: Is this working for ipv6 ?
struct sockaddr_in client; // client address information struct sockaddr_in client; // client address information
int clientFd; // socket connected to client int clientFd; // socket connected to client
socklen_t addressLen = sizeof(client); socklen_t addressLen = sizeof(client);
@ -307,6 +308,45 @@ namespace ix
continue; continue;
} }
std::unique_ptr<ConnectionInfo> connectionInfo;
if (_addressFamily == AF_INET)
{
char remoteIp[INET_ADDRSTRLEN];
if (inet_ntop(AF_INET, &client.sin_addr, remoteIp, INET_ADDRSTRLEN) == nullptr)
{
int err = Socket::getErrno();
std::stringstream ss;
ss << "SocketServer::run() error calling inet_ntop (ipv4): " << err << ", "
<< strerror(err);
logError(ss.str());
Socket::closeSocket(clientFd);
continue;
}
connectionInfo = std::make_unique<ConnectionInfo>(remoteIp, client.sin_port);
}
else // AF_INET6
{
char remoteIp[INET6_ADDRSTRLEN];
if (inet_ntop(AF_INET6, &client.sin_addr, remoteIp, INET6_ADDRSTRLEN) == nullptr)
{
int err = Socket::getErrno();
std::stringstream ss;
ss << "SocketServer::run() error calling inet_ntop (ipv6): " << err << ", "
<< strerror(err);
logError(ss.str());
Socket::closeSocket(clientFd);
continue;
}
connectionInfo = std::make_unique<ConnectionInfo>(remoteIp, client.sin_port);
}
std::shared_ptr<ConnectionState> connectionState; std::shared_ptr<ConnectionState> connectionState;
if (_connectionStateFactory) if (_connectionStateFactory)
{ {
@ -339,10 +379,13 @@ namespace ix
// Launch the handleConnection work asynchronously in its own thread. // Launch the handleConnection work asynchronously in its own thread.
std::lock_guard<std::mutex> lock(_connectionsThreadsMutex); std::lock_guard<std::mutex> lock(_connectionsThreadsMutex);
_connectionsThreads.push_back(std::make_pair( _connectionsThreads.push_back(
connectionState, std::make_pair(connectionState,
std::thread( std::thread(&SocketServer::handleConnection,
&SocketServer::handleConnection, this, std::move(socket), connectionState))); this,
std::move(socket),
connectionState,
std::move(connectionInfo))));
} }
} }

View File

@ -6,6 +6,7 @@
#pragma once #pragma once
#include "IXConnectionInfo.h"
#include "IXConnectionState.h" #include "IXConnectionState.h"
#include "IXSocketTLSOptions.h" #include "IXSocketTLSOptions.h"
#include <atomic> #include <atomic>
@ -102,7 +103,8 @@ namespace ix
ConnectionStateFactory _connectionStateFactory; ConnectionStateFactory _connectionStateFactory;
virtual void handleConnection(std::unique_ptr<Socket>, virtual void handleConnection(std::unique_ptr<Socket>,
std::shared_ptr<ConnectionState> connectionState) = 0; std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) = 0;
virtual size_t getConnectedClientsCount() = 0; virtual size_t getConnectedClientsCount() = 0;
// Returns true if all connection threads are joined // Returns true if all connection threads are joined

View File

@ -32,8 +32,8 @@
#include "IXUrlParser.h" #include "IXUrlParser.h"
#include <algorithm> #include <algorithm>
#include <cstring>
#include <cstdlib> #include <cstdlib>
#include <cstring>
namespace namespace
{ {

View File

@ -8,7 +8,9 @@
#include "IXWebSocketVersion.h" #include "IXWebSocketVersion.h"
#include <sstream> #include <sstream>
#ifdef IXWEBSOCKET_USE_ZLIB
#include <zlib.h> #include <zlib.h>
#endif
// Platform name // Platform name
#if defined(_WIN32) #if defined(_WIN32)
@ -77,8 +79,10 @@ namespace ix
ss << " nossl"; ss << " nossl";
#endif #endif
#ifdef IXWEBSOCKET_USE_ZLIB
// Zlib version // Zlib version
ss << " zlib " << ZLIB_VERSION; ss << " zlib " << ZLIB_VERSION;
#endif
return ss.str(); return ss.str();
} }

View File

@ -46,6 +46,7 @@ namespace ix
WebSocket::~WebSocket() WebSocket::~WebSocket()
{ {
stop(); stop();
_ws.setOnCloseCallback(nullptr);
} }
void WebSocket::setUrl(const std::string& url) void WebSocket::setUrl(const std::string& url)
@ -404,6 +405,11 @@ namespace ix
_onMessageCallback = callback; _onMessageCallback = callback;
} }
bool WebSocket::isOnMessageCallbackRegistered() const
{
return _onMessageCallback != nullptr;
}
void WebSocket::setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback) void WebSocket::setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback)
{ {
_onTrafficTrackerCallback = callback; _onTrafficTrackerCallback = callback;

View File

@ -84,6 +84,7 @@ namespace ix
const std::string& reason = WebSocketCloseConstants::kNormalClosureMessage); const std::string& reason = WebSocketCloseConstants::kNormalClosureMessage);
void setOnMessageCallback(const OnMessageCallback& callback); void setOnMessageCallback(const OnMessageCallback& callback);
bool isOnMessageCallbackRegistered() const;
static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback); static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback);
static void resetTrafficTrackerCallback(); static void resetTrafficTrackerCallback();

View File

@ -28,21 +28,26 @@ namespace ix
WebSocketPerMessageDeflateCompressor::WebSocketPerMessageDeflateCompressor() WebSocketPerMessageDeflateCompressor::WebSocketPerMessageDeflateCompressor()
: _compressBufferSize(kBufferSize) : _compressBufferSize(kBufferSize)
{ {
#ifdef IXWEBSOCKET_USE_ZLIB
memset(&_deflateState, 0, sizeof(_deflateState)); memset(&_deflateState, 0, sizeof(_deflateState));
_deflateState.zalloc = Z_NULL; _deflateState.zalloc = Z_NULL;
_deflateState.zfree = Z_NULL; _deflateState.zfree = Z_NULL;
_deflateState.opaque = Z_NULL; _deflateState.opaque = Z_NULL;
#endif
} }
WebSocketPerMessageDeflateCompressor::~WebSocketPerMessageDeflateCompressor() WebSocketPerMessageDeflateCompressor::~WebSocketPerMessageDeflateCompressor()
{ {
#ifdef IXWEBSOCKET_USE_ZLIB
deflateEnd(&_deflateState); deflateEnd(&_deflateState);
#endif
} }
bool WebSocketPerMessageDeflateCompressor::init(uint8_t deflateBits, bool WebSocketPerMessageDeflateCompressor::init(uint8_t deflateBits,
bool clientNoContextTakeOver) bool clientNoContextTakeOver)
{ {
#ifdef IXWEBSOCKET_USE_ZLIB
int ret = deflateInit2(&_deflateState, int ret = deflateInit2(&_deflateState,
Z_DEFAULT_COMPRESSION, Z_DEFAULT_COMPRESSION,
Z_DEFLATED, Z_DEFLATED,
@ -57,17 +62,49 @@ namespace ix
_flush = (clientNoContextTakeOver) ? Z_FULL_FLUSH : Z_SYNC_FLUSH; _flush = (clientNoContextTakeOver) ? Z_FULL_FLUSH : Z_SYNC_FLUSH;
return true; return true;
#else
return false;
#endif
} }
bool WebSocketPerMessageDeflateCompressor::endsWith(const std::string& value, template<typename T>
const std::string& ending) bool WebSocketPerMessageDeflateCompressor::endsWithEmptyUnCompressedBlock(const T& value)
{ {
if (ending.size() > value.size()) return false; if (kEmptyUncompressedBlock.size() > value.size()) return false;
return std::equal(ending.rbegin(), ending.rend(), value.rbegin()); auto N = value.size();
return value[N - 1] == kEmptyUncompressedBlock[3] &&
value[N - 2] == kEmptyUncompressedBlock[2] &&
value[N - 3] == kEmptyUncompressedBlock[1] &&
value[N - 4] == kEmptyUncompressedBlock[0];
} }
bool WebSocketPerMessageDeflateCompressor::compress(const std::string& in, std::string& out) bool WebSocketPerMessageDeflateCompressor::compress(const std::string& in, std::string& out)
{ {
return compressData(in, out);
}
bool WebSocketPerMessageDeflateCompressor::compress(const std::string& in,
std::vector<uint8_t>& out)
{
return compressData(in, out);
}
bool WebSocketPerMessageDeflateCompressor::compress(const std::vector<uint8_t>& in,
std::string& out)
{
return compressData(in, out);
}
bool WebSocketPerMessageDeflateCompressor::compress(const std::vector<uint8_t>& in,
std::vector<uint8_t>& out)
{
return compressData(in, out);
}
template<typename T, typename S>
bool WebSocketPerMessageDeflateCompressor::compressData(const T& in, S& out)
{
#ifdef IXWEBSOCKET_USE_ZLIB
// //
// 7.2.1. Compression // 7.2.1. Compression
// //
@ -96,7 +133,8 @@ namespace ix
// The normal buffer size should be 6 but // The normal buffer size should be 6 but
// we remove the 4 octets from the tail (#4) // we remove the 4 octets from the tail (#4)
uint8_t buf[2] = {0x02, 0x00}; uint8_t buf[2] = {0x02, 0x00};
out.append((char*) (buf), 2); out.push_back(buf[0]);
out.push_back(buf[1]);
return true; return true;
} }
@ -114,15 +152,18 @@ namespace ix
output = _compressBufferSize - _deflateState.avail_out; output = _compressBufferSize - _deflateState.avail_out;
out.append((char*) (_compressBuffer.get()), output); out.insert(out.end(), _compressBuffer.get(), _compressBuffer.get() + output);
} while (_deflateState.avail_out == 0); } while (_deflateState.avail_out == 0);
if (endsWith(out, kEmptyUncompressedBlock)) if (endsWithEmptyUnCompressedBlock(out))
{ {
out.resize(out.size() - 4); out.resize(out.size() - 4);
} }
return true; return true;
#else
return false;
#endif
} }
// //
@ -131,6 +172,7 @@ namespace ix
WebSocketPerMessageDeflateDecompressor::WebSocketPerMessageDeflateDecompressor() WebSocketPerMessageDeflateDecompressor::WebSocketPerMessageDeflateDecompressor()
: _compressBufferSize(kBufferSize) : _compressBufferSize(kBufferSize)
{ {
#ifdef IXWEBSOCKET_USE_ZLIB
memset(&_inflateState, 0, sizeof(_inflateState)); memset(&_inflateState, 0, sizeof(_inflateState));
_inflateState.zalloc = Z_NULL; _inflateState.zalloc = Z_NULL;
@ -138,16 +180,20 @@ namespace ix
_inflateState.opaque = Z_NULL; _inflateState.opaque = Z_NULL;
_inflateState.avail_in = 0; _inflateState.avail_in = 0;
_inflateState.next_in = Z_NULL; _inflateState.next_in = Z_NULL;
#endif
} }
WebSocketPerMessageDeflateDecompressor::~WebSocketPerMessageDeflateDecompressor() WebSocketPerMessageDeflateDecompressor::~WebSocketPerMessageDeflateDecompressor()
{ {
#ifdef IXWEBSOCKET_USE_ZLIB
inflateEnd(&_inflateState); inflateEnd(&_inflateState);
#endif
} }
bool WebSocketPerMessageDeflateDecompressor::init(uint8_t inflateBits, bool WebSocketPerMessageDeflateDecompressor::init(uint8_t inflateBits,
bool clientNoContextTakeOver) bool clientNoContextTakeOver)
{ {
#ifdef IXWEBSOCKET_USE_ZLIB
int ret = inflateInit2(&_inflateState, -1 * inflateBits); int ret = inflateInit2(&_inflateState, -1 * inflateBits);
if (ret != Z_OK) return false; if (ret != Z_OK) return false;
@ -157,10 +203,14 @@ namespace ix
_flush = (clientNoContextTakeOver) ? Z_FULL_FLUSH : Z_SYNC_FLUSH; _flush = (clientNoContextTakeOver) ? Z_FULL_FLUSH : Z_SYNC_FLUSH;
return true; return true;
#else
return false;
#endif
} }
bool WebSocketPerMessageDeflateDecompressor::decompress(const std::string& in, std::string& out) bool WebSocketPerMessageDeflateDecompressor::decompress(const std::string& in, std::string& out)
{ {
#ifdef IXWEBSOCKET_USE_ZLIB
// //
// 7.2.2. Decompression // 7.2.2. Decompression
// //
@ -197,5 +247,8 @@ namespace ix
} while (_inflateState.avail_out == 0); } while (_inflateState.avail_out == 0);
return true; return true;
#else
return false;
#endif
} }
} // namespace ix } // namespace ix

View File

@ -6,9 +6,12 @@
#pragma once #pragma once
#ifdef IXWEBSOCKET_USE_ZLIB
#include "zlib.h" #include "zlib.h"
#endif
#include <memory> #include <memory>
#include <string> #include <string>
#include <vector>
namespace ix namespace ix
{ {
@ -20,14 +23,23 @@ namespace ix
bool init(uint8_t deflateBits, bool clientNoContextTakeOver); bool init(uint8_t deflateBits, bool clientNoContextTakeOver);
bool compress(const std::string& in, std::string& out); bool compress(const std::string& in, std::string& out);
bool compress(const std::string& in, std::vector<uint8_t>& out);
bool compress(const std::vector<uint8_t>& in, std::string& out);
bool compress(const std::vector<uint8_t>& in, std::vector<uint8_t>& out);
private: private:
static bool endsWith(const std::string& value, const std::string& ending); template<typename T, typename S>
bool compressData(const T& in, S& out);
template<typename T>
bool endsWithEmptyUnCompressedBlock(const T& value);
int _flush; int _flush;
size_t _compressBufferSize; size_t _compressBufferSize;
std::unique_ptr<unsigned char[]> _compressBuffer; std::unique_ptr<unsigned char[]> _compressBuffer;
#ifdef IXWEBSOCKET_USE_ZLIB
z_stream _deflateState; z_stream _deflateState;
#endif
}; };
class WebSocketPerMessageDeflateDecompressor class WebSocketPerMessageDeflateDecompressor
@ -43,7 +55,10 @@ namespace ix
int _flush; int _flush;
size_t _compressBufferSize; size_t _compressBufferSize;
std::unique_ptr<unsigned char[]> _compressBuffer; std::unique_ptr<unsigned char[]> _compressBuffer;
#ifdef IXWEBSOCKET_USE_ZLIB
z_stream _inflateState; z_stream _inflateState;
#endif
}; };
} // namespace ix } // namespace ix

View File

@ -61,6 +61,7 @@ namespace ix
_clientMaxWindowBits = kDefaultClientMaxWindowBits; _clientMaxWindowBits = kDefaultClientMaxWindowBits;
_serverMaxWindowBits = kDefaultServerMaxWindowBits; _serverMaxWindowBits = kDefaultServerMaxWindowBits;
#ifdef IXWEBSOCKET_USE_ZLIB
// Split by ; // Split by ;
std::string token; std::string token;
std::stringstream tokenStream(extension); std::stringstream tokenStream(extension);
@ -112,6 +113,7 @@ namespace ix
sanitizeClientMaxWindowBits(); sanitizeClientMaxWindowBits();
} }
} }
#endif
} }
void WebSocketPerMessageDeflateOptions::sanitizeClientMaxWindowBits() void WebSocketPerMessageDeflateOptions::sanitizeClientMaxWindowBits()
@ -126,6 +128,7 @@ namespace ix
std::string WebSocketPerMessageDeflateOptions::generateHeader() std::string WebSocketPerMessageDeflateOptions::generateHeader()
{ {
#ifdef IXWEBSOCKET_USE_ZLIB
std::stringstream ss; std::stringstream ss;
ss << "Sec-WebSocket-Extensions: permessage-deflate"; ss << "Sec-WebSocket-Extensions: permessage-deflate";
@ -138,11 +141,18 @@ namespace ix
ss << "\r\n"; ss << "\r\n";
return ss.str(); return ss.str();
#else
return std::string();
#endif
} }
bool WebSocketPerMessageDeflateOptions::enabled() const bool WebSocketPerMessageDeflateOptions::enabled() const
{ {
#ifdef IXWEBSOCKET_USE_ZLIB
return _enabled; return _enabled;
#else
return false;
#endif
} }
bool WebSocketPerMessageDeflateOptions::getClientNoContextTakeover() const bool WebSocketPerMessageDeflateOptions::getClientNoContextTakeover() const

View File

@ -0,0 +1,123 @@
/*
* IXWebSocketProxyServer.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include "IXWebSocketProxyServer.h"
#include "IXWebSocketServer.h"
#include <sstream>
namespace ix
{
class ProxyConnectionState : public ix::ConnectionState
{
public:
ProxyConnectionState()
: _connected(false)
{
}
ix::WebSocket& webSocket()
{
return _serverWebSocket;
}
bool isConnected()
{
return _connected;
}
void setConnected()
{
_connected = true;
}
private:
ix::WebSocket _serverWebSocket;
bool _connected;
};
int websocket_proxy_server_main(int port,
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions,
const std::string& remoteUrl,
bool /*verbose*/)
{
ix::WebSocketServer server(port, hostname);
server.setTLSOptions(tlsOptions);
auto factory = []() -> std::shared_ptr<ix::ConnectionState> {
return std::make_shared<ProxyConnectionState>();
};
server.setConnectionStateFactory(factory);
server.setOnConnectionCallback([remoteUrl](std::weak_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) {
auto state = std::dynamic_pointer_cast<ProxyConnectionState>(connectionState);
auto remoteIp = connectionInfo->remoteIp;
// Server connection
state->webSocket().setOnMessageCallback(
[webSocket, state, remoteIp](const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Close)
{
state->setTerminated();
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
auto ws = webSocket.lock();
if (ws)
{
ws->send(msg->str, msg->binary);
}
}
});
// Client connection
auto ws = webSocket.lock();
if (ws)
{
ws->setOnMessageCallback([state, remoteUrl](const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
// Connect to the 'real' server
std::string url(remoteUrl);
url += msg->openInfo.uri;
state->webSocket().setUrl(url);
state->webSocket().disableAutomaticReconnection();
state->webSocket().start();
// we should sleep here for a bit until we've established the
// connection with the remote server
while (state->webSocket().getReadyState() != ReadyState::Open)
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
state->webSocket().close(msg->closeInfo.code, msg->closeInfo.reason);
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
state->webSocket().send(msg->str, msg->binary);
}
});
}
});
auto res = server.listen();
if (!res.first)
{
return 1;
}
server.start();
server.wait();
return 0;
}
} // namespace ix

View File

@ -0,0 +1,20 @@
/*
* IXWebSocketProxyServer.h
* Author: Benjamin Sergeant
* Copyright (c) 2019-2020 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include "IXSocketTLSOptions.h"
#include <cstdint>
#include <stddef.h>
#include <string>
namespace ix
{
int websocket_proxy_server_main(int port,
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions,
const std::string& remoteUrl,
bool verbose);
} // namespace ix

View File

@ -71,13 +71,47 @@ namespace ix
_onConnectionCallback = callback; _onConnectionCallback = callback;
} }
void WebSocketServer::setOnClientMessageCallback(const OnClientMessageCallback& callback)
{
_onClientMessageCallback = callback;
}
void WebSocketServer::handleConnection(std::unique_ptr<Socket> socket, void WebSocketServer::handleConnection(std::unique_ptr<Socket> socket,
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo)
{ {
setThreadName("WebSocketServer::" + connectionState->getId()); setThreadName("WebSocketServer::" + connectionState->getId());
auto webSocket = std::make_shared<WebSocket>(); auto webSocket = std::make_shared<WebSocket>();
_onConnectionCallback(webSocket, connectionState); if (_onConnectionCallback)
{
_onConnectionCallback(webSocket, connectionState, std::move(connectionInfo));
if (!webSocket->isOnMessageCallbackRegistered())
{
logError("WebSocketServer Application developer error: Server callback improperly "
"registerered.");
logError("Missing call to setOnMessageCallback inside setOnConnectionCallback.");
connectionState->setTerminated();
return;
}
}
else if (_onClientMessageCallback)
{
webSocket->setOnMessageCallback(
[this, &ws = *webSocket.get(), connectionState, &ci = *connectionInfo.get()](
const WebSocketMessagePtr& msg) {
_onClientMessageCallback(connectionState, ci, ws, msg);
});
}
else
{
logError(
"WebSocketServer Application developer error: No server callback is registerered.");
logError("Missing call to setOnConnectionCallback or setOnClientMessageCallback.");
connectionState->setTerminated();
return;
}
webSocket->disableAutomaticReconnection(); webSocket->disableAutomaticReconnection();
@ -111,6 +145,8 @@ namespace ix
logError(ss.str()); logError(ss.str());
} }
webSocket->setOnMessageCallback(nullptr);
// Remove this client from our client set // Remove this client from our client set
{ {
std::lock_guard<std::mutex> lock(_clientsMutex); std::lock_guard<std::mutex> lock(_clientsMutex);

View File

@ -23,7 +23,14 @@ namespace ix
{ {
public: public:
using OnConnectionCallback = using OnConnectionCallback =
std::function<void(std::shared_ptr<WebSocket>, std::shared_ptr<ConnectionState>)>; std::function<void(std::weak_ptr<WebSocket>,
std::shared_ptr<ConnectionState>,
std::unique_ptr<ConnectionInfo> connectionInfo)>;
using OnClientMessageCallback = std::function<void(std::shared_ptr<ConnectionState>,
ConnectionInfo&,
WebSocket&,
const WebSocketMessagePtr&)>;
WebSocketServer(int port = SocketServer::kDefaultPort, WebSocketServer(int port = SocketServer::kDefaultPort,
const std::string& host = SocketServer::kDefaultHost, const std::string& host = SocketServer::kDefaultHost,
@ -39,6 +46,7 @@ namespace ix
void disablePerMessageDeflate(); void disablePerMessageDeflate();
void setOnConnectionCallback(const OnConnectionCallback& callback); void setOnConnectionCallback(const OnConnectionCallback& callback);
void setOnClientMessageCallback(const OnClientMessageCallback& callback);
// Get all the connected clients // Get all the connected clients
std::set<std::shared_ptr<WebSocket>> getClients(); std::set<std::shared_ptr<WebSocket>> getClients();
@ -52,6 +60,7 @@ namespace ix
bool _enablePerMessageDeflate; bool _enablePerMessageDeflate;
OnConnectionCallback _onConnectionCallback; OnConnectionCallback _onConnectionCallback;
OnClientMessageCallback _onClientMessageCallback;
std::mutex _clientsMutex; std::mutex _clientsMutex;
std::set<std::shared_ptr<WebSocket>> _clients; std::set<std::shared_ptr<WebSocket>> _clients;
@ -60,7 +69,8 @@ namespace ix
// Methods // Methods
virtual void handleConnection(std::unique_ptr<Socket> socket, virtual void handleConnection(std::unique_ptr<Socket> socket,
std::shared_ptr<ConnectionState> connectionState) final; std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo);
virtual size_t getConnectedClientsCount() final; virtual size_t getConnectedClientsCount() final;
}; };
} // namespace ix } // namespace ix

View File

@ -65,7 +65,6 @@ namespace ix
, _receivedMessageCompressed(false) , _receivedMessageCompressed(false)
, _readyState(ReadyState::CLOSED) , _readyState(ReadyState::CLOSED)
, _closeCode(WebSocketCloseConstants::kInternalErrorCode) , _closeCode(WebSocketCloseConstants::kInternalErrorCode)
, _closeReason(WebSocketCloseConstants::kInternalErrorMessage)
, _closeWireSize(0) , _closeWireSize(0)
, _closeRemote(false) , _closeRemote(false)
, _enablePerMessageDeflate(false) , _enablePerMessageDeflate(false)
@ -77,6 +76,7 @@ namespace ix
, _pingCount(0) , _pingCount(0)
, _lastSendPingTimePoint(std::chrono::steady_clock::now()) , _lastSendPingTimePoint(std::chrono::steady_clock::now())
{ {
setCloseReason(WebSocketCloseConstants::kInternalErrorMessage);
_readbuf.resize(kChunkSize); _readbuf.resize(kChunkSize);
} }
@ -179,10 +179,12 @@ namespace ix
if (readyState == ReadyState::CLOSED) if (readyState == ReadyState::CLOSED)
{ {
std::lock_guard<std::mutex> lock(_closeDataMutex); if (_onCloseCallback)
_onCloseCallback(_closeCode, _closeReason, _closeWireSize, _closeRemote); {
_onCloseCallback(_closeCode, getCloseReason(), _closeWireSize, _closeRemote);
}
setCloseReason(WebSocketCloseConstants::kInternalErrorMessage);
_closeCode = WebSocketCloseConstants::kInternalErrorCode; _closeCode = WebSocketCloseConstants::kInternalErrorCode;
_closeReason = WebSocketCloseConstants::kInternalErrorMessage;
_closeWireSize = 0; _closeWireSize = 0;
_closeRemote = false; _closeRemote = false;
} }
@ -261,9 +263,10 @@ namespace ix
{ {
// compute lasting delay to wait for next ping / timeout, if at least one set // compute lasting delay to wait for next ping / timeout, if at least one set
auto now = std::chrono::steady_clock::now(); auto now = std::chrono::steady_clock::now();
lastingTimeoutDelayInMs = (int) std::chrono::duration_cast<std::chrono::milliseconds>( int timeSinceLastPingMs = (int) std::chrono::duration_cast<std::chrono::milliseconds>(
now - _lastSendPingTimePoint) now - _lastSendPingTimePoint)
.count(); .count();
lastingTimeoutDelayInMs = (1000 * _pingIntervalSecs) - timeSinceLastPingMs;
} }
#ifdef _WIN32 #ifdef _WIN32
@ -326,9 +329,10 @@ namespace ix
return _txbuf.empty(); return _txbuf.empty();
} }
template<class Iterator>
void WebSocketTransport::appendToSendBuffer(const std::vector<uint8_t>& header, void WebSocketTransport::appendToSendBuffer(const std::vector<uint8_t>& header,
std::string::const_iterator begin, Iterator begin,
std::string::const_iterator end, Iterator end,
uint64_t message_size, uint64_t message_size,
uint8_t masking_key[4]) uint8_t masking_key[4])
{ {
@ -638,11 +642,7 @@ namespace ix
{ {
// we got the CLOSE frame answer from our close, so we can close the connection // we got the CLOSE frame answer from our close, so we can close the connection
// if the code/reason are the same // if the code/reason are the same
bool identicalReason; bool identicalReason = _closeCode == code && getCloseReason() == reason;
{
std::lock_guard<std::mutex> lock(_closeDataMutex);
identicalReason = _closeCode == code && _closeReason == reason;
}
if (identicalReason) if (identicalReason)
{ {
@ -750,8 +750,9 @@ namespace ix
return static_cast<unsigned>(seconds); return static_cast<unsigned>(seconds);
} }
template<class T>
WebSocketSendInfo WebSocketTransport::sendData(wsheader_type::opcode_type type, WebSocketSendInfo WebSocketTransport::sendData(wsheader_type::opcode_type type,
const std::string& message, const T& message,
bool compress, bool compress,
const OnProgressCallback& onProgressCallback) const OnProgressCallback& onProgressCallback)
{ {
@ -764,8 +765,8 @@ namespace ix
size_t wireSize = message.size(); size_t wireSize = message.size();
bool compressionError = false; bool compressionError = false;
std::string::const_iterator message_begin = message.begin(); auto message_begin = message.cbegin();
std::string::const_iterator message_end = message.end(); auto message_end = message.cend();
if (compress) if (compress)
{ {
@ -780,8 +781,8 @@ namespace ix
compressionError = false; compressionError = false;
wireSize = _compressedMessage.size(); wireSize = _compressedMessage.size();
message_begin = _compressedMessage.begin(); message_begin = _compressedMessage.cbegin();
message_end = _compressedMessage.end(); message_end = _compressedMessage.cend();
} }
{ {
@ -795,6 +796,11 @@ namespace ix
if (wireSize < kChunkSize) if (wireSize < kChunkSize)
{ {
success = sendFragment(type, true, message_begin, message_end, compress); success = sendFragment(type, true, message_begin, message_end, compress);
if (onProgressCallback)
{
onProgressCallback(0, 1);
}
} }
else else
{ {
@ -859,10 +865,11 @@ namespace ix
return WebSocketSendInfo(success, compressionError, payloadSize, wireSize); return WebSocketSendInfo(success, compressionError, payloadSize, wireSize);
} }
template<class Iterator>
bool WebSocketTransport::sendFragment(wsheader_type::opcode_type type, bool WebSocketTransport::sendFragment(wsheader_type::opcode_type type,
bool fin, bool fin,
std::string::const_iterator message_begin, Iterator message_begin,
std::string::const_iterator message_end, Iterator message_end,
bool compress) bool compress)
{ {
uint64_t message_size = static_cast<uint64_t>(message_end - message_begin); uint64_t message_size = static_cast<uint64_t>(message_end - message_begin);
@ -1055,7 +1062,7 @@ namespace ix
else else
{ {
// no close code/reason set // no close code/reason set
sendData(wsheader_type::CLOSE, "", compress); sendData(wsheader_type::CLOSE, std::string(""), compress);
} }
} }
@ -1078,13 +1085,10 @@ namespace ix
{ {
closeSocket(); closeSocket();
{ setCloseReason(reason);
std::lock_guard<std::mutex> lock(_closeDataMutex); _closeCode = code;
_closeCode = code; _closeWireSize = closeWireSize;
_closeReason = reason; _closeRemote = remote;
_closeWireSize = closeWireSize;
_closeRemote = remote;
}
setReadyState(ReadyState::CLOSED); setReadyState(ReadyState::CLOSED);
_requestInitCancellation = false; _requestInitCancellation = false;
@ -1104,13 +1108,11 @@ namespace ix
closeWireSize = reason.size(); closeWireSize = reason.size();
} }
{ setCloseReason(reason);
std::lock_guard<std::mutex> lock(_closeDataMutex); _closeCode = code;
_closeCode = code; _closeWireSize = closeWireSize;
_closeReason = reason; _closeRemote = remote;
_closeWireSize = closeWireSize;
_closeRemote = remote;
}
{ {
std::lock_guard<std::mutex> lock(_closingTimePointMutex); std::lock_guard<std::mutex> lock(_closingTimePointMutex);
_closingTimePoint = std::chrono::steady_clock::now(); _closingTimePoint = std::chrono::steady_clock::now();
@ -1155,4 +1157,15 @@ namespace ix
return true; return true;
} }
void WebSocketTransport::setCloseReason(const std::string& reason)
{
std::lock_guard<std::mutex> lock(_closeReasonMutex);
_closeReason = reason;
}
const std::string& WebSocketTransport::getCloseReason() const
{
std::lock_guard<std::mutex> lock(_closeReasonMutex);
return _closeReason;
}
} // namespace ix } // namespace ix

View File

@ -178,11 +178,11 @@ namespace ix
std::atomic<ReadyState> _readyState; std::atomic<ReadyState> _readyState;
OnCloseCallback _onCloseCallback; OnCloseCallback _onCloseCallback;
uint16_t _closeCode;
std::string _closeReason; std::string _closeReason;
size_t _closeWireSize; mutable std::mutex _closeReasonMutex;
bool _closeRemote; std::atomic<uint16_t> _closeCode;
mutable std::mutex _closeDataMutex; std::atomic<size_t> _closeWireSize;
std::atomic<bool> _closeRemote;
// Data used for Per Message Deflate compression (with zlib) // Data used for Per Message Deflate compression (with zlib)
WebSocketPerMessageDeflatePtr _perMessageDeflate; WebSocketPerMessageDeflatePtr _perMessageDeflate;
@ -239,16 +239,15 @@ namespace ix
bool sendOnSocket(); bool sendOnSocket();
bool receiveFromSocket(); bool receiveFromSocket();
template<class T>
WebSocketSendInfo sendData(wsheader_type::opcode_type type, WebSocketSendInfo sendData(wsheader_type::opcode_type type,
const std::string& message, const T& message,
bool compress, bool compress,
const OnProgressCallback& onProgressCallback = nullptr); const OnProgressCallback& onProgressCallback = nullptr);
bool sendFragment(wsheader_type::opcode_type type, template<class Iterator>
bool fin, bool sendFragment(
std::string::const_iterator begin, wsheader_type::opcode_type type, bool fin, Iterator begin, Iterator end, bool compress);
std::string::const_iterator end,
bool compress);
void emitMessage(MessageKind messageKind, void emitMessage(MessageKind messageKind,
const std::string& message, const std::string& message,
@ -256,9 +255,11 @@ namespace ix
const OnMessageCallback& onMessageCallback); const OnMessageCallback& onMessageCallback);
bool isSendBufferEmpty() const; bool isSendBufferEmpty() const;
template<class Iterator>
void appendToSendBuffer(const std::vector<uint8_t>& header, void appendToSendBuffer(const std::vector<uint8_t>& header,
std::string::const_iterator begin, Iterator begin,
std::string::const_iterator end, Iterator end,
uint64_t message_size, uint64_t message_size,
uint8_t masking_key[4]); uint8_t masking_key[4]);
@ -266,5 +267,8 @@ namespace ix
void unmaskReceiveBuffer(const wsheader_type& ws); void unmaskReceiveBuffer(const wsheader_type& ws);
std::string getMergedChunks() const; std::string getMergedChunks() const;
void setCloseReason(const std::string& reason);
const std::string& getCloseReason() const;
}; };
} // namespace ix } // namespace ix

View File

@ -6,4 +6,4 @@
#pragma once #pragma once
#define IX_WEBSOCKET_VERSION "9.8.6" #define IX_WEBSOCKET_VERSION "10.1.7"

View File

@ -1,20 +0,0 @@
/*
* IXSetThreadName_apple.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include "../IXSetThreadName.h"
#include <pthread.h>
namespace ix
{
void setThreadName(const std::string& name)
{
//
// Apple reserves 16 bytes for its thread names
// Notice that the Apple version of pthread_setname_np
// does not take a pthread_t argument
//
pthread_setname_np(name.substr(0, 63).c_str());
}
} // namespace ix

View File

@ -1,16 +0,0 @@
/*
* IXSetThreadName_freebsd.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "../IXSetThreadName.h"
#include <pthread.h>
#include <pthread_np.h>
namespace ix
{
void setThreadName(const std::string& name)
{
pthread_set_name_np(pthread_self(), name.substr(0, 15).c_str());
}
} // namespace ix

View File

@ -1,20 +0,0 @@
/*
* IXSetThreadName_linux.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include "../IXSetThreadName.h"
#include <pthread.h>
namespace ix
{
void setThreadName(const std::string& name)
{
//
// Linux only reserves 16 bytes for its thread names
// See prctl and PR_SET_NAME property in
// http://man7.org/linux/man-pages/man2/prctl.2.html
//
pthread_setname_np(pthread_self(), name.substr(0, 15).c_str());
}
} // namespace ix

View File

@ -1,46 +0,0 @@
/*
* IXSetThreadName_windows.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "../IXSetThreadName.h"
#include <Windows.h>
namespace ix
{
const DWORD MS_VC_EXCEPTION = 0x406D1388;
#pragma pack(push, 8)
typedef struct tagTHREADNAME_INFO
{
DWORD dwType; // Must be 0x1000.
LPCSTR szName; // Pointer to name (in user addr space).
DWORD dwThreadID; // Thread ID (-1=caller thread).
DWORD dwFlags; // Reserved for future use, must be zero.
} THREADNAME_INFO;
#pragma pack(pop)
void SetThreadName(DWORD dwThreadID, const char* threadName)
{
THREADNAME_INFO info;
info.dwType = 0x1000;
info.szName = threadName;
info.dwThreadID = dwThreadID;
info.dwFlags = 0;
__try
{
RaiseException(
MS_VC_EXCEPTION, 0, sizeof(info) / sizeof(ULONG_PTR), (ULONG_PTR*) &info);
}
__except (EXCEPTION_EXECUTE_HANDLER)
{
}
}
void setThreadName(const std::string& name)
{
SetThreadName(-1, name.c_str());
}
} // namespace ix

View File

@ -34,7 +34,10 @@ ws:
mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 .. && ninja install) mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 .. && ninja install)
ws_install: ws_install:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=MinSizeRel -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 .. && make -j 4 install) mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=MinSizeRel -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 .. -DUSE_TEST=0 && ninja install)
ws_install_release:
mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=MinSizeRel -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 .. -DUSE_TEST=0 && ninja install)
ws_openssl: ws_openssl:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 -DUSE_OPEN_SSL=1 .. ; make -j 4) mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 -DUSE_OPEN_SSL=1 .. ; make -j 4)
@ -104,8 +107,10 @@ test_server:
# env TEST=Websocket_server make test # env TEST=Websocket_server make test
# env TEST=Websocket_chat make test # env TEST=Websocket_chat make test
# env TEST=heartbeat make test # env TEST=heartbeat make test
test: build_test:
mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 -DUSE_TEST=1 .. ; ninja install) mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_TEST=1 .. ; ninja install)
test: build_test
(cd test ; python2.7 run.py -r) (cd test ; python2.7 run.py -r)
test_make: test_make:
@ -172,7 +177,7 @@ autobahn_report:
cp -rvf ~/sandbox/reports/clients/* ../bsergean.github.io/IXWebSocket/autobahn/ cp -rvf ~/sandbox/reports/clients/* ../bsergean.github.io/IXWebSocket/autobahn/
httpd: httpd:
clang++ --std=c++14 --stdlib=libc++ httpd.cpp \ clang++ --std=c++14 --stdlib=libc++ -o ixhttpd httpd.cpp \
ixwebsocket/IXSelectInterruptFactory.cpp \ ixwebsocket/IXSelectInterruptFactory.cpp \
ixwebsocket/IXCancellationRequest.cpp \ ixwebsocket/IXCancellationRequest.cpp \
ixwebsocket/IXSocketTLSOptions.cpp \ ixwebsocket/IXSocketTLSOptions.cpp \
@ -191,11 +196,11 @@ httpd:
ixwebsocket/IXConnectionState.cpp \ ixwebsocket/IXConnectionState.cpp \
ixwebsocket/IXUrlParser.cpp \ ixwebsocket/IXUrlParser.cpp \
ixwebsocket/IXSelectInterrupt.cpp \ ixwebsocket/IXSelectInterrupt.cpp \
ixwebsocket/apple/IXSetThreadName_apple.cpp \ ixwebsocket/IXSetThreadName.cpp \
-lz -lz
httpd_linux: httpd_linux:
g++ --std=c++11 -o ixhttpd httpd.cpp -Iixwebsocket \ g++ --std=c++14 -o ixhttpd httpd.cpp -Iixwebsocket \
ixwebsocket/IXSelectInterruptFactory.cpp \ ixwebsocket/IXSelectInterruptFactory.cpp \
ixwebsocket/IXCancellationRequest.cpp \ ixwebsocket/IXCancellationRequest.cpp \
ixwebsocket/IXSocketTLSOptions.cpp \ ixwebsocket/IXSocketTLSOptions.cpp \
@ -214,7 +219,7 @@ httpd_linux:
ixwebsocket/IXConnectionState.cpp \ ixwebsocket/IXConnectionState.cpp \
ixwebsocket/IXUrlParser.cpp \ ixwebsocket/IXUrlParser.cpp \
ixwebsocket/IXSelectInterrupt.cpp \ ixwebsocket/IXSelectInterrupt.cpp \
ixwebsocket/linux/IXSetThreadName_linux.cpp \ ixwebsocket/IXSetThreadName.cpp \
-lz -lpthread -lz -lpthread
cp -f ixhttpd /usr/local/bin cp -f ixhttpd /usr/local/bin
@ -236,9 +241,12 @@ install_cmake_for_linux:
doc: doc:
mkdocs gh-deploy mkdocs gh-deploy
change: change: format
vim ixwebsocket/IXWebSocketVersion.h docs/CHANGELOG.md vim ixwebsocket/IXWebSocketVersion.h docs/CHANGELOG.md
commit:
git commit -am "`sh tools/extract_latest_change.sh`"
.PHONY: test .PHONY: test
.PHONY: build .PHONY: build
.PHONY: ws .PHONY: ws

View File

@ -37,11 +37,11 @@ set (SOURCES
test_runner.cpp test_runner.cpp
IXTest.cpp IXTest.cpp
IXGetFreePort.cpp
../third_party/msgpack11/msgpack11.cpp ../third_party/msgpack11/msgpack11.cpp
IXSocketTest.cpp IXSocketTest.cpp
IXSocketConnectTest.cpp IXSocketConnectTest.cpp
# IXWebSocketLeakTest.cpp # commented until we have a fix for #224
IXWebSocketServerTest.cpp IXWebSocketServerTest.cpp
IXWebSocketTestConnectionDisconnection.cpp IXWebSocketTestConnectionDisconnection.cpp
IXUrlParserTest.cpp IXUrlParserTest.cpp
@ -55,6 +55,8 @@ set (SOURCES
IXSentryClientTest.cpp IXSentryClientTest.cpp
IXWebSocketChatTest.cpp IXWebSocketChatTest.cpp
IXWebSocketBroadcastTest.cpp IXWebSocketBroadcastTest.cpp
IXWebSocketPerMessageDeflateCompressorTest.cpp
IXStreamSqlTest.cpp
) )
# Some unittest don't work on windows yet # Some unittest don't work on windows yet

View File

@ -108,7 +108,7 @@ namespace
} }
else if (event->type == ix::CobraEventType::UnSubscribed) else if (event->type == ix::CobraEventType::UnSubscribed)
{ {
TLogger() << "Subscriber: ununexpected from channel " << event->subscriptionId; TLogger() << "Subscriber: unsubscribed from channel " << event->subscriptionId;
if (event->subscriptionId != channel) if (event->subscriptionId != channel)
{ {
TLogger() << "Subscriber: unexpected channel " << event->subscriptionId; TLogger() << "Subscriber: unexpected channel " << event->subscriptionId;

View File

@ -12,8 +12,8 @@
#include <ixcobra/IXCobraConnection.h> #include <ixcobra/IXCobraConnection.h>
#include <ixcobra/IXCobraMetricsPublisher.h> #include <ixcobra/IXCobraMetricsPublisher.h>
#include <ixcrypto/IXUuid.h> #include <ixcrypto/IXUuid.h>
#include <ixsentry/IXSentryClient.h>
#include <ixredis/IXRedisServer.h> #include <ixredis/IXRedisServer.h>
#include <ixsentry/IXSentryClient.h>
#include <ixsnake/IXSnakeServer.h> #include <ixsnake/IXSnakeServer.h>
#include <ixwebsocket/IXHttpServer.h> #include <ixwebsocket/IXHttpServer.h>
#include <ixwebsocket/IXUserAgent.h> #include <ixwebsocket/IXUserAgent.h>
@ -95,13 +95,15 @@ TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]")
sentryServer.setOnConnectionCallback( sentryServer.setOnConnectionCallback(
[](HttpRequestPtr request, [](HttpRequestPtr request,
std::shared_ptr<ConnectionState> /*connectionState*/) -> HttpResponsePtr { std::shared_ptr<ConnectionState> /*connectionState*/,
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr {
WebSocketHttpHeaders headers; WebSocketHttpHeaders headers;
headers["Server"] = userAgent(); headers["Server"] = userAgent();
// Log request // Log request
std::stringstream ss; std::stringstream ss;
ss << request->method << " " << request->headers["User-Agent"] << " " ss << connectionInfo->remoteIp << ":" << connectionInfo->remotePort << " "
<< request->method << " " << request->headers["User-Agent"] << " "
<< request->uri; << request->uri;
if (request->method == "POST") if (request->method == "POST")

View File

@ -12,8 +12,8 @@
#include <ixcobra/IXCobraConnection.h> #include <ixcobra/IXCobraConnection.h>
#include <ixcobra/IXCobraMetricsPublisher.h> #include <ixcobra/IXCobraMetricsPublisher.h>
#include <ixcrypto/IXUuid.h> #include <ixcrypto/IXUuid.h>
#include <ixsentry/IXSentryClient.h>
#include <ixredis/IXRedisServer.h> #include <ixredis/IXRedisServer.h>
#include <ixsentry/IXSentryClient.h>
#include <ixsnake/IXSnakeServer.h> #include <ixsnake/IXSnakeServer.h>
#include <ixwebsocket/IXHttpServer.h> #include <ixwebsocket/IXHttpServer.h>
#include <ixwebsocket/IXUserAgent.h> #include <ixwebsocket/IXUserAgent.h>

View File

@ -12,8 +12,8 @@
#include <ixcobra/IXCobraConnection.h> #include <ixcobra/IXCobraConnection.h>
#include <ixcobra/IXCobraMetricsPublisher.h> #include <ixcobra/IXCobraMetricsPublisher.h>
#include <ixcrypto/IXUuid.h> #include <ixcrypto/IXUuid.h>
#include <ixsentry/IXSentryClient.h>
#include <ixredis/IXRedisServer.h> #include <ixredis/IXRedisServer.h>
#include <ixsentry/IXSentryClient.h>
#include <ixsnake/IXSnakeServer.h> #include <ixsnake/IXSnakeServer.h>
#include <ixwebsocket/IXHttpServer.h> #include <ixwebsocket/IXHttpServer.h>
#include <ixwebsocket/IXUserAgent.h> #include <ixwebsocket/IXUserAgent.h>
@ -92,6 +92,9 @@ TEST_CASE("Cobra_to_stdout_bot", "[cobra_bots]")
cobraBotConfig.enableHeartbeat = false; cobraBotConfig.enableHeartbeat = false;
bool quiet = false; bool quiet = false;
cobraBotConfig.filter =
std::string("select * from `") + channel + "` where id = 'sms_metric_A_id'";
// We could try to capture the output ... not sure how. // We could try to capture the output ... not sure how.
bool fluentd = true; bool fluentd = true;

View File

@ -4,9 +4,9 @@
* Copyright (c) 2019 Machine Zone. All rights reserved. * Copyright (c) 2019 Machine Zone. All rights reserved.
*/ */
#include "IXGetFreePort.h"
#include "catch.hpp" #include "catch.hpp"
#include <iostream> #include <iostream>
#include <ixwebsocket/IXGetFreePort.h>
#include <ixwebsocket/IXHttpClient.h> #include <ixwebsocket/IXHttpClient.h>
#include <ixwebsocket/IXHttpServer.h> #include <ixwebsocket/IXHttpServer.h>
@ -67,7 +67,8 @@ TEST_CASE("http server", "[httpd]")
TEST_CASE("http server redirection", "[httpd_redirect]") TEST_CASE("http server redirection", "[httpd_redirect]")
{ {
SECTION("Connect to a local HTTP server, with redirection enabled, but we do not follow redirects") SECTION(
"Connect to a local HTTP server, with redirection enabled, but we do not follow redirects")
{ {
int port = getFreePort(); int port = getFreePort();
ix::HttpServer server(port, "127.0.0.1"); ix::HttpServer server(port, "127.0.0.1");

42
test/IXStreamSqlTest.cpp Normal file
View File

@ -0,0 +1,42 @@
/*
* IXStreamSqlTest.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone. All rights reserved.
*/
#include "IXTest.h"
#include "catch.hpp"
#include <iostream>
#include <ixsnake/IXStreamSql.h>
#include <string.h>
using namespace ix;
namespace ix
{
TEST_CASE("stream_sql", "[streamsql]")
{
SECTION("expression A")
{
snake::StreamSql streamSql(
"select * from subscriber_republished_v1_neo where session LIKE '%123456%'");
nlohmann::json msg = {{"session", "123456"}, {"id", "foo_id"}, {"timestamp", 12}};
CHECK(streamSql.match(msg));
}
SECTION("expression A")
{
snake::StreamSql streamSql("select * from `subscriber_republished_v1_neo` where "
"session = '30091320ed8d4e50b758f8409b83bed7'");
nlohmann::json msg = {{"session", "30091320ed8d4e50b758f8409b83bed7"},
{"id", "foo_id"},
{"timestamp", 12}};
CHECK(streamSql.match(msg));
}
}
} // namespace ix

View File

@ -84,36 +84,38 @@ namespace ix
bool startWebSocketEchoServer(ix::WebSocketServer& server) bool startWebSocketEchoServer(ix::WebSocketServer& server)
{ {
server.setOnConnectionCallback([&server](std::shared_ptr<ix::WebSocket> webSocket, server.setOnClientMessageCallback(
std::shared_ptr<ConnectionState> connectionState) { [&server](std::shared_ptr<ConnectionState> /*connectionState*/,
webSocket->setOnMessageCallback( ConnectionInfo& connectionInfo,
[webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg) { WebSocket& webSocket,
if (msg->type == ix::WebSocketMessageType::Open) const ix::WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
if (msg->type == ix::WebSocketMessageType::Open)
{
TLogger() << "New connection";
TLogger() << "Remote ip: " << remoteIp;
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";
for (auto it : msg->openInfo.headers)
{ {
TLogger() << "New connection"; TLogger() << it.first << ": " << it.second;
TLogger() << "Uri: " << msg->openInfo.uri; }
TLogger() << "Headers:"; }
for (auto it : msg->openInfo.headers) else if (msg->type == ix::WebSocketMessageType::Close)
{
TLogger() << "Closed connection";
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
for (auto&& client : server.getClients())
{
if (client.get() != &webSocket)
{ {
TLogger() << it.first << ": " << it.second; client->send(msg->str, msg->binary);
} }
} }
else if (msg->type == ix::WebSocketMessageType::Close) }
{ });
TLogger() << "Closed connection";
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
for (auto&& client : server.getClients())
{
if (client != webSocket)
{
client->send(msg->str, msg->binary);
}
}
}
});
});
auto res = server.listen(); auto res = server.listen();
if (!res.first) if (!res.first)

View File

@ -6,9 +6,9 @@
#pragma once #pragma once
#include "IXGetFreePort.h"
#include <iostream> #include <iostream>
#include <ixsnake/IXAppConfig.h> #include <ixsnake/IXAppConfig.h>
#include <ixwebsocket/IXGetFreePort.h>
#include <ixwebsocket/IXSocketTLSOptions.h> #include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocketServer.h> #include <ixwebsocket/IXWebSocketServer.h>
#include <mutex> #include <mutex>

View File

@ -189,15 +189,19 @@ namespace
bool preferTLS = true; bool preferTLS = true;
server.setTLSOptions(makeServerTLSOptions(preferTLS)); server.setTLSOptions(makeServerTLSOptions(preferTLS));
server.setOnConnectionCallback([&server, &connectionId]( server.setOnClientMessageCallback(
std::shared_ptr<ix::WebSocket> webSocket, [&server, &connectionId](std::shared_ptr<ConnectionState> connectionState,
std::shared_ptr<ConnectionState> connectionState) { ConnectionInfo& connectionInfo,
webSocket->setOnMessageCallback([webSocket, connectionState, &connectionId, &server]( WebSocket& webSocket,
const ix::WebSocketMessagePtr& msg) { const ix::WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
TLogger() << "New connection"; TLogger() << "New connection";
connectionState->computeId(); connectionState->computeId();
TLogger() << "remote ip: " << remoteIp;
TLogger() << "id: " << connectionState->getId(); TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri; TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:"; TLogger() << "Headers:";
@ -216,14 +220,13 @@ namespace
{ {
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client.get() != &webSocket)
{ {
client->send(msg->str, msg->binary); client->send(msg->str, msg->binary);
} }
} }
} }
}); });
});
auto res = server.listen(); auto res = server.listen();
if (!res.first) if (!res.first)

View File

@ -193,37 +193,39 @@ namespace
bool startServer(ix::WebSocketServer& server) bool startServer(ix::WebSocketServer& server)
{ {
server.setOnConnectionCallback([&server](std::shared_ptr<ix::WebSocket> webSocket, server.setOnClientMessageCallback(
std::shared_ptr<ConnectionState> connectionState) { [&server](std::shared_ptr<ConnectionState> connectionState,
webSocket->setOnMessageCallback( ConnectionInfo& connectionInfo,
[webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg) { WebSocket& webSocket,
if (msg->type == ix::WebSocketMessageType::Open) const ix::WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
if (msg->type == ix::WebSocketMessageType::Open)
{
TLogger() << "New connection";
TLogger() << "remote ip: " << remoteIp;
TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";
for (auto it : msg->openInfo.headers)
{ {
TLogger() << "New connection"; TLogger() << it.first << ": " << it.second;
TLogger() << "id: " << connectionState->getId(); }
TLogger() << "Uri: " << msg->openInfo.uri; }
TLogger() << "Headers:"; else if (msg->type == ix::WebSocketMessageType::Close)
for (auto it : msg->openInfo.headers) {
log("Closed connection");
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
for (auto&& client : server.getClients())
{
if (client.get() != &webSocket)
{ {
TLogger() << it.first << ": " << it.second; client->sendBinary(msg->str);
} }
} }
else if (msg->type == ix::WebSocketMessageType::Close) }
{ });
log("Closed connection");
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
for (auto&& client : server.getClients())
{
if (client != webSocket)
{
client->sendBinary(msg->str);
}
}
}
});
});
auto res = server.listen(); auto res = server.listen();
if (!res.first) if (!res.first)

View File

@ -168,41 +168,38 @@ namespace
std::mutex& mutexWrite) std::mutex& mutexWrite)
{ {
// A dev/null server // A dev/null server
server.setOnConnectionCallback( server.setOnClientMessageCallback(
[&receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite]( [&receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite](
std::shared_ptr<ix::WebSocket> webSocket, std::shared_ptr<ConnectionState> connectionState,
std::shared_ptr<ConnectionState> connectionState) { ConnectionInfo& connectionInfo,
webSocket->setOnMessageCallback([webSocket, WebSocket& /*webSocket*/,
connectionState, const ix::WebSocketMessagePtr& msg) {
&receivedCloseCode, auto remoteIp = connectionInfo.remoteIp;
&receivedCloseReason, if (msg->type == ix::WebSocketMessageType::Open)
&receivedCloseRemote, {
&mutexWrite](const ix::WebSocketMessagePtr& msg) { TLogger() << "New server connection";
if (msg->type == ix::WebSocketMessageType::Open) TLogger() << "remote ip: " << remoteIp;
TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";
for (auto it : msg->openInfo.headers)
{ {
TLogger() << "New server connection"; TLogger() << it.first << ": " << it.second;
TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";
for (auto it : msg->openInfo.headers)
{
TLogger() << it.first << ": " << it.second;
}
} }
else if (msg->type == ix::WebSocketMessageType::Close) }
{ else if (msg->type == ix::WebSocketMessageType::Close)
std::stringstream ss; {
ss << "Server closed connection(" << msg->closeInfo.code << "," std::stringstream ss;
<< msg->closeInfo.reason << ")"; ss << "Server closed connection(" << msg->closeInfo.code << ","
log(ss.str()); << msg->closeInfo.reason << ")";
log(ss.str());
std::lock_guard<std::mutex> lck(mutexWrite); std::lock_guard<std::mutex> lck(mutexWrite);
receivedCloseCode = msg->closeInfo.code; receivedCloseCode = msg->closeInfo.code;
receivedCloseReason = std::string(msg->closeInfo.reason); receivedCloseReason = std::string(msg->closeInfo.reason);
receivedCloseRemote = msg->closeInfo.remote; receivedCloseRemote = msg->closeInfo.remote;
} }
});
}); });
auto res = server.listen(); auto res = server.listen();

View File

@ -0,0 +1,183 @@
/*
* IXWebSocketServer.cpp
* Author: Benjamin Sergeant, @marcelkauf
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#include "IXTest.h"
#include "catch.hpp"
#include <ixwebsocket/IXWebSocket.h>
#include <ixwebsocket/IXWebSocketServer.h>
#include <memory>
#include <sstream>
using namespace ix;
namespace
{
class WebSocketClient
{
public:
WebSocketClient(int port);
void start();
void stop();
bool isReady() const;
bool hasConnectionError() const;
private:
ix::WebSocket _webSocket;
int _port;
std::atomic<bool> _connectionError;
};
WebSocketClient::WebSocketClient(int port)
: _port(port)
, _connectionError(false)
{
}
bool WebSocketClient::hasConnectionError() const
{
return _connectionError;
}
bool WebSocketClient::isReady() const
{
return _webSocket.getReadyState() == ix::ReadyState::Open;
}
void WebSocketClient::stop()
{
_webSocket.stop();
}
void WebSocketClient::start()
{
std::string url;
{
std::stringstream ss;
ss << "ws://localhost:" << _port << "/";
url = ss.str();
}
_webSocket.setUrl(url);
_webSocket.disableAutomaticReconnection();
std::stringstream ss;
log(std::string("Connecting to url: ") + url);
_webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) {
std::stringstream ss;
if (msg->type == ix::WebSocketMessageType::Open)
{
log("client connected");
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
log("client disconnected");
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
_connectionError = true;
log("error");
}
else if (msg->type == ix::WebSocketMessageType::Pong)
{
log("pong");
}
else if (msg->type == ix::WebSocketMessageType::Ping)
{
log("ping");
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
log("message");
}
else
{
log("invalid type");
}
});
_webSocket.start();
}
} // namespace
TEST_CASE("Websocket leak test")
{
SECTION("Websocket destructor is called when closing the connection.")
{
// stores the server websocket in order to check the use_count
std::shared_ptr<WebSocket> webSocketPtr;
{
int port = getFreePort();
WebSocketServer server(port);
server.setOnConnectionCallback(
[&webSocketPtr](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) {
// original ptr in WebSocketServer::handleConnection and the callback argument
REQUIRE(webSocket.use_count() == 2);
webSocket->setOnMessageCallback([&webSocketPtr, webSocket, connectionState](
const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
log(std::string("New connection id: ") + connectionState->getId());
// original ptr in WebSocketServer::handleConnection, captured ptr of
// this callback, and ptr in WebSocketServer::_clients
REQUIRE(webSocket.use_count() == 3);
webSocketPtr = std::shared_ptr<WebSocket>(webSocket);
REQUIRE(webSocket.use_count() == 4);
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
log(std::string("Client closed connection id: ") +
connectionState->getId());
}
else
{
log(std::string(msg->str));
}
});
// original ptr in WebSocketServer::handleConnection, argument of this callback,
// and captured ptr in websocket callback
REQUIRE(webSocket.use_count() == 3);
});
server.listen();
server.start();
WebSocketClient webSocketClient(port);
webSocketClient.start();
while (true)
{
REQUIRE(!webSocketClient.hasConnectionError());
if (webSocketClient.isReady()) break;
ix::msleep(10);
}
REQUIRE(server.getClients().size() == 1);
// same value as in Open-handler above
REQUIRE(webSocketPtr.use_count() == 4);
ix::msleep(500);
webSocketClient.stop();
ix::msleep(500);
REQUIRE(server.getClients().size() == 0);
// websocket should only be referenced by webSocketPtr but is still used by the
// websocket callback
REQUIRE(webSocketPtr.use_count() == 1);
webSocketPtr->setOnMessageCallback(nullptr);
// websocket should only be referenced by webSocketPtr
REQUIRE(webSocketPtr.use_count() == 1);
server.stop();
}
// websocket should only be referenced by webSocketPtr
REQUIRE(webSocketPtr.use_count() == 1);
}
}

View File

@ -0,0 +1,76 @@
/*
* IXWebSocketPerMessageDeflateCodecTest.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone. All rights reserved.
*
* make build_test && build/test/ixwebsocket_unittest per-message-deflate-codec
*/
#include "IXTest.h"
#include "catch.hpp"
#include <iostream>
#include <ixwebsocket/IXWebSocketPerMessageDeflateCodec.h>
#include <string.h>
using namespace ix;
namespace ix
{
std::string compressAndDecompress(const std::string& a)
{
std::string b, c;
WebSocketPerMessageDeflateCompressor compressor;
compressor.init(11, true);
compressor.compress(a, b);
WebSocketPerMessageDeflateDecompressor decompressor;
decompressor.init(11, true);
decompressor.decompress(b, c);
return c;
}
std::string compressAndDecompressVector(const std::string& a)
{
std::string b, c;
std::vector<uint8_t> vec(a.begin(), a.end());
WebSocketPerMessageDeflateCompressor compressor;
compressor.init(11, true);
compressor.compress(vec, b);
WebSocketPerMessageDeflateDecompressor decompressor;
decompressor.init(11, true);
decompressor.decompress(b, c);
return c;
}
TEST_CASE("per-message-deflate-codec", "[zlib]")
{
SECTION("string api")
{
REQUIRE(compressAndDecompress("") == "");
REQUIRE(compressAndDecompress("foo") == "foo");
REQUIRE(compressAndDecompress("bar") == "bar");
REQUIRE(compressAndDecompress("asdcaseqw`21897dehqwed") == "asdcaseqw`21897dehqwed");
REQUIRE(compressAndDecompress("/usr/local/include/ixwebsocket/IXSocketAppleSSL.h") ==
"/usr/local/include/ixwebsocket/IXSocketAppleSSL.h");
}
SECTION("vector api")
{
REQUIRE(compressAndDecompressVector("") == "");
REQUIRE(compressAndDecompressVector("foo") == "foo");
REQUIRE(compressAndDecompressVector("bar") == "bar");
REQUIRE(compressAndDecompressVector("asdcaseqw`21897dehqwed") ==
"asdcaseqw`21897dehqwed");
REQUIRE(
compressAndDecompressVector("/usr/local/include/ixwebsocket/IXSocketAppleSSL.h") ==
"/usr/local/include/ixwebsocket/IXSocketAppleSSL.h");
}
}
} // namespace ix

View File

@ -33,15 +33,19 @@ namespace ix
}; };
server.setConnectionStateFactory(factory); server.setConnectionStateFactory(factory);
server.setOnConnectionCallback([&server, &connectionId]( server.setOnClientMessageCallback(
std::shared_ptr<ix::WebSocket> webSocket, [&server, &connectionId](std::shared_ptr<ConnectionState> connectionState,
std::shared_ptr<ConnectionState> connectionState) { ConnectionInfo& connectionInfo,
webSocket->setOnMessageCallback([webSocket, connectionState, &connectionId, &server]( WebSocket& webSocket,
const ix::WebSocketMessagePtr& msg) { const ix::WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
TLogger() << "New connection"; TLogger() << "New connection";
connectionState->computeId(); connectionState->computeId();
TLogger() << "remote ip: " << remoteIp;
TLogger() << "id: " << connectionState->getId(); TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri; TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:"; TLogger() << "Headers:";
@ -60,14 +64,13 @@ namespace ix
{ {
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client.get() != &webSocket)
{ {
client->send(msg->str, msg->binary); client->send(msg->str, msg->binary);
} }
} }
} }
}); });
});
auto res = server.listen(); auto res = server.listen();
if (!res.first) if (!res.first)

View File

@ -16,39 +16,40 @@ using namespace ix;
bool startServer(ix::WebSocketServer& server, std::string& subProtocols) bool startServer(ix::WebSocketServer& server, std::string& subProtocols)
{ {
server.setOnConnectionCallback( server.setOnClientMessageCallback(
[&server, &subProtocols](std::shared_ptr<ix::WebSocket> webSocket, [&server, &subProtocols](std::shared_ptr<ConnectionState> connectionState,
std::shared_ptr<ConnectionState> connectionState) { ConnectionInfo& connectionInfo,
webSocket->setOnMessageCallback([webSocket, connectionState, &server, &subProtocols]( WebSocket& webSocket,
const ix::WebSocketMessagePtr& msg) { const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open) auto remoteIp = connectionInfo.remoteIp;
if (msg->type == ix::WebSocketMessageType::Open)
{
TLogger() << "New connection";
TLogger() << "remote ip: " << remoteIp;
TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";
for (auto it : msg->openInfo.headers)
{ {
TLogger() << "New connection"; TLogger() << it.first << ": " << it.second;
TLogger() << "id: " << connectionState->getId(); }
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";
for (auto it : msg->openInfo.headers)
{
TLogger() << it.first << ": " << it.second;
}
subProtocols = msg->openInfo.headers["Sec-WebSocket-Protocol"]; subProtocols = msg->openInfo.headers["Sec-WebSocket-Protocol"];
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{
log("Closed connection");
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
for (auto&& client : server.getClients())
{ {
log("Closed connection"); if (client.get() != &webSocket)
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
for (auto&& client : server.getClients())
{ {
if (client != webSocket) client->sendBinary(msg->str);
{
client->sendBinary(msg->str);
}
} }
} }
}); }
}); });
auto res = server.listen(); auto res = server.listen();

View File

@ -0,0 +1,171 @@
/*
* lws-minimal-ws-client
*
* Written in 2010-2019 by Andy Green <andy@warmcat.com>
*
* This file is made available under the Creative Commons CC0 1.0
* Universal Public Domain Dedication.
*
* This demonstrates the a minimal ws client using lws.
*
* Original programs connects to https://libwebsockets.org/ and makes a
* wss connection to the dumb-increment protocol there. While
* connected, it prints the numbers it is being sent by
* dumb-increment protocol.
*
* This is modified to make a test client which counts how much messages
* per second can be received.
*
* libwebsockets$ make && ./a.out
* g++ --std=c++14 -I/usr/local/opt/openssl/include devnull_client.cpp -lwebsockets
* messages received: 0 per second 0 total
* [2020/08/02 19:22:21:4774] U: LWS minimal ws client rx [-d <logs>] [--h2]
* [2020/08/02 19:22:21:4814] U: callback_dumb_increment: established
* messages received: 0 per second 0 total
* messages received: 180015 per second 180015 total
* messages received: 172866 per second 352881 total
* messages received: 176177 per second 529058 total
* messages received: 174191 per second 703249 total
* messages received: 193397 per second 896646 total
* messages received: 196385 per second 1093031 total
* messages received: 194593 per second 1287624 total
* messages received: 189484 per second 1477108 total
* messages received: 200825 per second 1677933 total
* messages received: 183542 per second 1861475 total
* ^C[2020/08/02 19:22:33:4450] U: Completed OK
*
*/
#include <atomic>
#include <iostream>
#include <libwebsockets.h>
#include <signal.h>
#include <string.h>
#include <thread>
static int interrupted;
static struct lws* client_wsi;
std::atomic<uint64_t> receivedCount(0);
static int callback_dumb_increment(
struct lws* wsi, enum lws_callback_reasons reason, void* user, void* in, size_t len)
{
switch (reason)
{
/* because we are protocols[0] ... */
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
lwsl_err("CLIENT_CONNECTION_ERROR: %s\n", in ? (char*) in : "(null)");
client_wsi = NULL;
break;
case LWS_CALLBACK_CLIENT_ESTABLISHED: lwsl_user("%s: established\n", __func__); break;
case LWS_CALLBACK_CLIENT_RECEIVE: receivedCount++; break;
case LWS_CALLBACK_CLIENT_CLOSED: client_wsi = NULL; break;
default: break;
}
return lws_callback_http_dummy(wsi, reason, user, in, len);
}
static const struct lws_protocols protocols[] = {{
"dumb-increment-protocol",
callback_dumb_increment,
0,
0,
},
{NULL, NULL, 0, 0}};
static void sigint_handler(int sig)
{
interrupted = 1;
}
int main(int argc, const char** argv)
{
uint64_t receivedCountTotal(0);
uint64_t receivedCountPerSecs(0);
auto timer = [&receivedCountTotal, &receivedCountPerSecs] {
while (!interrupted)
{
std::cerr << "messages received: " << receivedCountPerSecs << " per second "
<< receivedCountTotal << " total" << std::endl;
receivedCountPerSecs = receivedCount - receivedCountTotal;
receivedCountTotal += receivedCountPerSecs;
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
};
std::thread t1(timer);
struct lws_context_creation_info info;
struct lws_client_connect_info i;
struct lws_context* context;
const char* p;
int n = 0, logs = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE
/* for LLL_ verbosity above NOTICE to be built into lws, lws
* must have been configured with -DCMAKE_BUILD_TYPE=DEBUG
* instead of =RELEASE */
/* | LLL_INFO */ /* | LLL_PARSER */ /* | LLL_HEADER */
/* | LLL_EXT */ /* | LLL_CLIENT */ /* | LLL_LATENCY */
/* | LLL_DEBUG */;
signal(SIGINT, sigint_handler);
if ((p = lws_cmdline_option(argc, argv, "-d"))) logs = atoi(p);
lws_set_log_level(logs, NULL);
lwsl_user("LWS minimal ws client rx [-d <logs>] [--h2]\n");
memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */
info.protocols = protocols;
info.timeout_secs = 10;
/*
* since we know this lws context is only ever going to be used with
* one client wsis / fds / sockets at a time, let lws know it doesn't
* have to use the default allocations for fd tables up to ulimit -n.
* It will just allocate for 1 internal and 1 (+ 1 http2 nwsi) that we
* will use.
*/
info.fd_limit_per_thread = 1 + 1 + 1;
context = lws_create_context(&info);
if (!context)
{
lwsl_err("lws init failed\n");
return 1;
}
memset(&i, 0, sizeof i); /* otherwise uninitialized garbage */
i.context = context;
i.port = 8008;
i.address = "127.0.0.1";
i.path = "/";
i.host = i.address;
i.origin = i.address;
i.protocol = protocols[0].name; /* "dumb-increment-protocol" */
i.pwsi = &client_wsi;
if (lws_cmdline_option(argc, argv, "--h2")) i.alpn = "h2";
lws_client_connect_via_info(&i);
while (n >= 0 && client_wsi && !interrupted)
n = lws_service(context, 0);
lws_context_destroy(context);
lwsl_user("Completed %s\n", receivedCount > 10 ? "OK" : "Failed");
t1.join();
return receivedCount > 10;
}

2
test/compatibility/csharp/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
bin
obj

View File

@ -0,0 +1,99 @@
//
// Main.cs
// Author: Benjamin Sergeant
// Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
//
// In a different terminal, start a push server:
// $ ws push_server -q
//
// $ dotnet run
// messages received per second: 145157
// messages received per second: 141405
// messages received per second: 152202
// messages received per second: 157149
// messages received per second: 157673
// messages received per second: 153594
// messages received per second: 157830
// messages received per second: 158422
//
using System;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
public class DevNullClientCli
{
private static int receivedMessage = 0;
public static async Task<byte[]> ReceiveAsync(ClientWebSocket ws, CancellationToken token)
{
int bufferSize = 8192; // 8K
var buffer = new byte[bufferSize];
var offset = 0;
var free = buffer.Length;
while (true)
{
var result = await ws.ReceiveAsync(new ArraySegment<byte>(buffer, offset, free), token).ConfigureAwait(false);
offset += result.Count;
free -= result.Count;
if (result.EndOfMessage) break;
if (free == 0)
{
// No free space
// Resize the outgoing buffer
var newSize = buffer.Length + bufferSize;
var newBuffer = new byte[newSize];
Array.Copy(buffer, 0, newBuffer, 0, offset);
buffer = newBuffer;
free = buffer.Length - offset;
}
}
return buffer;
}
private static void OnTimedEvent(object source, EventArgs e)
{
Console.WriteLine($"messages received per second: {receivedMessage}");
receivedMessage = 0;
}
public static async Task ReceiveMessagesAsync(string url)
{
var ws = new ClientWebSocket();
System.Uri uri = new System.Uri(url);
var cancellationToken = CancellationToken.None;
try
{
await ws.ConnectAsync(uri, cancellationToken).ConfigureAwait(false);
while (true)
{
var data = await DevNullClientCli.ReceiveAsync(ws, cancellationToken);
receivedMessage += 1;
}
}
catch (System.Net.WebSockets.WebSocketException e)
{
Console.WriteLine($"WebSocket error: {e}");
return;
}
}
public static async Task Main()
{
var timer = new System.Timers.Timer(1000);
timer.Elapsed += OnTimedEvent;
timer.Enabled = true;
timer.Start();
var url = "ws://localhost:8008";
await ReceiveMessagesAsync(url);
}
}

View File

@ -0,0 +1,6 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
</PropertyGroup>
</Project>

View File

@ -0,0 +1,42 @@
//
// With ws@7.3.1
// and
// node --version
// v13.11.0
//
// In a different terminal, start a push server:
// $ ws push_server -q
//
// $ node devnull_client.js
// messages received per second: 16643
// messages received per second: 28065
// messages received per second: 28432
// messages received per second: 22207
// messages received per second: 28805
// messages received per second: 28694
// messages received per second: 28180
// messages received per second: 28601
// messages received per second: 28698
// messages received per second: 28931
// messages received per second: 27975
//
const WebSocket = require('ws');
const ws = new WebSocket('ws://localhost:8008');
ws.on('open', function open() {
ws.send('hello from node');
});
var receivedMessages = 0;
setInterval(function timeout() {
console.log(`messages received per second: ${receivedMessages}`)
receivedMessages = 0;
}, 1000);
ws.on('message', function incoming(data) {
receivedMessages += 1;
});

View File

@ -0,0 +1,44 @@
#!/usr/bin/env python3
# websocket send client
import argparse
import asyncio
import websockets
try:
import uvloop
uvloop.install()
except ImportError:
print('uvloop not available')
pass
msgCount = 0
async def timer():
global msgCount
while True:
print(f'Received messages: {msgCount}')
msgCount = 0
await asyncio.sleep(1)
async def client(url):
global msgCount
asyncio.ensure_future(timer())
async with websockets.connect(url) as ws:
async for message in ws:
msgCount += 1
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='websocket proxy.')
parser.add_argument('--url', help='Remote websocket url',
default='wss://echo.websocket.org')
args = parser.parse_args()
asyncio.get_event_loop().run_until_complete(client(args.url))

View File

@ -10,7 +10,7 @@ import websockets
async def echo(websocket, path): async def echo(websocket, path):
while True: while True:
msg = await websocket.recv() msg = await websocket.recv()
print(f'Received {len(msg)} bytes') # print(f'Received {len(msg)} bytes')
await websocket.send(msg) await websocket.send(msg)
host = os.getenv('BIND_HOST', 'localhost') host = os.getenv('BIND_HOST', 'localhost')

View File

@ -10,10 +10,18 @@
#include <ixwebsocket/IXNetSystem.h> #include <ixwebsocket/IXNetSystem.h>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#ifndef _WIN32
#include <signal.h>
#endif
int main(int argc, char* argv[]) int main(int argc, char* argv[])
{ {
ix::initNetSystem(); ix::initNetSystem();
#ifndef _WIN32
signal(SIGPIPE, SIG_IGN);
#endif
ix::CoreLogger::LogFunc logFunc = [](const char* msg, ix::LogLevel level) { ix::CoreLogger::LogFunc logFunc = [](const char* msg, ix::LogLevel level) {
switch (level) switch (level)
{ {
@ -49,6 +57,7 @@ int main(int argc, char* argv[])
} }
}; };
ix::CoreLogger::setLogFunction(logFunc); ix::CoreLogger::setLogFunction(logFunc);
spdlog::set_level(spdlog::level::debug);
int result = Catch::Session().run(argc, argv); int result = Catch::Session().run(argc, argv);

View File

@ -122,6 +122,8 @@
#pragma once #pragma once
#include <string>
namespace linenoise namespace linenoise
{ {
bool Readline(const char *prompt, std::string& line); bool Readline(const char *prompt, std::string& line);

View File

@ -0,0 +1,3 @@
#!/bin/sh
grep -A 3 '^##' docs/CHANGELOG.md | head -n 3 | tail -n 1

View File

@ -20,7 +20,6 @@ option(USE_TLS "Add TLS support" ON)
include_directories(ws .) include_directories(ws .)
include_directories(ws ..) include_directories(ws ..)
include_directories(ws ../third_party)
include_directories(ws ../third_party/spdlog/include) include_directories(ws ../third_party/spdlog/include)
include_directories(ws ../third_party/cpp-linenoise) include_directories(ws ../third_party/cpp-linenoise)
@ -51,7 +50,9 @@ add_executable(ws
ws_http_client.cpp ws_http_client.cpp
ws_ping_pong.cpp ws_ping_pong.cpp
ws_broadcast_server.cpp ws_broadcast_server.cpp
ws_push_server.cpp
ws_echo_server.cpp ws_echo_server.cpp
ws_echo_client.cpp
ws_chat.cpp ws_chat.cpp
ws_connect.cpp ws_connect.cpp
ws_transfer.cpp ws_transfer.cpp
@ -66,7 +67,6 @@ add_executable(ws
ws_cobra_publish.cpp ws_cobra_publish.cpp
ws_httpd.cpp ws_httpd.cpp
ws_autobahn.cpp ws_autobahn.cpp
ws_proxy_server.cpp
ws_sentry_minidump_upload.cpp ws_sentry_minidump_upload.cpp
ws_dns_lookup.cpp ws_dns_lookup.cpp
ws.cpp) ws.cpp)

View File

@ -14,6 +14,7 @@ function cleanup_and_exit {
} }
WITH_TLS=${WITH_TLS:-0} WITH_TLS=${WITH_TLS:-0}
BLOCKS=${BLOCKS:-20000}
rm -rf /tmp/ws_test rm -rf /tmp/ws_test
mkdir -p /tmp/ws_test mkdir -p /tmp/ws_test
@ -57,7 +58,7 @@ ws receive "${protocol}127.0.0.1:8090" ${delay} --pidfile /tmp/ws_test/pidfile.r
mkdir -p /tmp/ws_test/send mkdir -p /tmp/ws_test/send
cd /tmp/ws_test/send cd /tmp/ws_test/send
dd if=/dev/urandom of=/tmp/ws_test/send/20M_file count=20000 bs=1024 dd if=/dev/urandom of=/tmp/ws_test/send/20M_file count=$BLOCKS bs=1024
# Start the sender job # Start the sender job
ws send ${client_tls} --pidfile /tmp/ws_test/pidfile.send "${protocol}127.0.0.1:8090" /tmp/ws_test/send/20M_file ws send ${client_tls} --pidfile /tmp/ws_test/pidfile.send "${protocol}127.0.0.1:8090" /tmp/ws_test/send/20M_file

121
ws/ws.cpp
View File

@ -11,17 +11,18 @@
#include <cli11/CLI11.hpp> #include <cli11/CLI11.hpp>
#include <fstream> #include <fstream>
#include <ixbots/IXCobraMetricsToRedisBot.h>
#include <ixbots/IXCobraToPythonBot.h> #include <ixbots/IXCobraToPythonBot.h>
#include <ixbots/IXCobraToSentryBot.h> #include <ixbots/IXCobraToSentryBot.h>
#include <ixbots/IXCobraToStatsdBot.h> #include <ixbots/IXCobraToStatsdBot.h>
#include <ixbots/IXCobraToStdoutBot.h> #include <ixbots/IXCobraToStdoutBot.h>
#include <ixbots/IXCobraMetricsToRedisBot.h>
#include <ixredis/IXRedisClient.h>
#include <ixcore/utils/IXCoreLogger.h> #include <ixcore/utils/IXCoreLogger.h>
#include <ixredis/IXRedisClient.h>
#include <ixsentry/IXSentryClient.h> #include <ixsentry/IXSentryClient.h>
#include <ixwebsocket/IXNetSystem.h> #include <ixwebsocket/IXNetSystem.h>
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXUserAgent.h> #include <ixwebsocket/IXUserAgent.h>
#include <ixwebsocket/IXWebSocketProxyServer.h>
#include <spdlog/sinks/basic_file_sink.h> #include <spdlog/sinks/basic_file_sink.h>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
@ -74,6 +75,7 @@ int main(int argc, char** argv)
} }
}; };
ix::CoreLogger::setLogFunction(logFunc); ix::CoreLogger::setLogFunction(logFunc);
spdlog::set_level(spdlog::level::debug);
#ifndef _WIN32 #ifndef _WIN32
signal(SIGPIPE, SIG_IGN); signal(SIGPIPE, SIG_IGN);
@ -122,6 +124,8 @@ int main(int argc, char** argv)
std::string key; std::string key;
std::string logfile; std::string logfile;
std::string scriptPath; std::string scriptPath;
std::string republishChannel;
std::string sendMsg("hello world");
ix::SocketTLSOptions tlsOptions; ix::SocketTLSOptions tlsOptions;
ix::CobraConfig cobraConfig; ix::CobraConfig cobraConfig;
ix::CobraBotConfig cobraBotConfig; ix::CobraBotConfig cobraBotConfig;
@ -144,6 +148,7 @@ int main(int argc, char** argv)
bool version = false; bool version = false;
bool verifyNone = false; bool verifyNone = false;
bool disablePong = false; bool disablePong = false;
bool noSend = false;
int port = 8008; int port = 8008;
int redisPort = 6379; int redisPort = 6379;
int statsdPort = 8125; int statsdPort = 8125;
@ -193,8 +198,7 @@ int main(int argc, char** argv)
"--limit_received_events", cobraBotConfig.limitReceivedEvents, "Max events per minute"); "--limit_received_events", cobraBotConfig.limitReceivedEvents, "Max events per minute");
app->add_option( app->add_option(
"--max_events_per_minute", cobraBotConfig.maxEventsPerMinute, "Max events per minute"); "--max_events_per_minute", cobraBotConfig.maxEventsPerMinute, "Max events per minute");
app->add_option( app->add_option("--batch_size", cobraBotConfig.batchSize, "Subscription batch size");
"--batch_size", cobraBotConfig.batchSize, "Subscription batch size");
}; };
app.add_flag("--version", version, "Print ws version"); app.add_flag("--version", version, "Print ws version");
@ -241,6 +245,19 @@ int main(int argc, char** argv)
connectApp->add_option("--subprotocol", subprotocol, "Subprotocol"); connectApp->add_option("--subprotocol", subprotocol, "Subprotocol");
addTLSOptions(connectApp); addTLSOptions(connectApp);
CLI::App* echoClientApp =
app.add_subcommand("echo_client", "Echo messages sent by a remote server");
echoClientApp->fallthrough();
echoClientApp->add_option("url", url, "Connection url")->required();
echoClientApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate");
echoClientApp->add_flag("-b", binaryMode, "Send in binary mode");
echoClientApp->add_option(
"--ping_interval", pingIntervalSecs, "Interval between sending pings");
echoClientApp->add_option("--subprotocol", subprotocol, "Subprotocol");
echoClientApp->add_option("--send_msg", sendMsg, "Send message");
echoClientApp->add_flag("-m", noSend, "Do not send messages, only receive messages");
addTLSOptions(echoClientApp);
CLI::App* chatApp = app.add_subcommand("chat", "Group chat"); CLI::App* chatApp = app.add_subcommand("chat", "Group chat");
chatApp->fallthrough(); chatApp->fallthrough();
chatApp->add_option("url", url, "Connection url")->required(); chatApp->add_option("url", url, "Connection url")->required();
@ -250,12 +267,25 @@ int main(int argc, char** argv)
echoServerApp->fallthrough(); echoServerApp->fallthrough();
echoServerApp->add_option("--port", port, "Port"); echoServerApp->add_option("--port", port, "Port");
echoServerApp->add_option("--host", hostname, "Hostname"); echoServerApp->add_option("--host", hostname, "Hostname");
echoServerApp->add_flag("-g", greetings, "Verbose"); echoServerApp->add_flag("-q", quiet, "Quiet / only display warnings and errors");
echoServerApp->add_flag("-g", greetings, "Greet");
echoServerApp->add_flag("-6", ipv6, "IpV6"); echoServerApp->add_flag("-6", ipv6, "IpV6");
echoServerApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate"); echoServerApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate");
echoServerApp->add_flag("-p", disablePong, "Disable sending PONG in response to PING"); echoServerApp->add_flag("-p", disablePong, "Disable sending PONG in response to PING");
addTLSOptions(echoServerApp); addTLSOptions(echoServerApp);
CLI::App* pushServerApp = app.add_subcommand("push_server", "Push server");
pushServerApp->fallthrough();
pushServerApp->add_option("--port", port, "Port");
pushServerApp->add_option("--host", hostname, "Hostname");
pushServerApp->add_flag("-q", quiet, "Quiet / only display warnings and errors");
pushServerApp->add_flag("-g", greetings, "Greet");
pushServerApp->add_flag("-6", ipv6, "IpV6");
pushServerApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate");
pushServerApp->add_flag("-p", disablePong, "Disable sending PONG in response to PING");
pushServerApp->add_option("--send_msg", sendMsg, "Send message");
addTLSOptions(pushServerApp);
CLI::App* broadcastServerApp = app.add_subcommand("broadcast_server", "Broadcasting server"); CLI::App* broadcastServerApp = app.add_subcommand("broadcast_server", "Broadcasting server");
broadcastServerApp->fallthrough(); broadcastServerApp->fallthrough();
broadcastServerApp->add_option("--port", port, "Port"); broadcastServerApp->add_option("--port", port, "Port");
@ -358,7 +388,8 @@ int main(int argc, char** argv)
cobra2python->add_option("--host", hostname, "Statsd host"); cobra2python->add_option("--host", hostname, "Statsd host");
cobra2python->add_option("--port", statsdPort, "Statsd port"); cobra2python->add_option("--port", statsdPort, "Statsd port");
cobra2python->add_option("--prefix", prefix, "Statsd prefix"); cobra2python->add_option("--prefix", prefix, "Statsd prefix");
cobra2python->add_option("--script", scriptPath, "Python script path")->check(CLI::ExistingPath); cobra2python->add_option("--script", scriptPath, "Python script path")
->check(CLI::ExistingPath);
cobra2python->add_option("--pidfile", pidfile, "Pid file"); cobra2python->add_option("--pidfile", pidfile, "Pid file");
addTLSOptions(cobra2python); addTLSOptions(cobra2python);
addCobraBotConfig(cobra2python); addCobraBotConfig(cobra2python);
@ -391,6 +422,7 @@ int main(int argc, char** argv)
snakeApp->add_option("--redis_password", redisPassword, "Redis password"); snakeApp->add_option("--redis_password", redisPassword, "Redis password");
snakeApp->add_option("--apps_config_path", appsConfigPath, "Path to auth data") snakeApp->add_option("--apps_config_path", appsConfigPath, "Path to auth data")
->check(CLI::ExistingPath); ->check(CLI::ExistingPath);
snakeApp->add_option("--republish_channel", republishChannel, "Republish channel");
snakeApp->add_flag("-v", verbose, "Verbose"); snakeApp->add_flag("-v", verbose, "Verbose");
snakeApp->add_flag("-d", disablePong, "Disable Pongs"); snakeApp->add_flag("-d", disablePong, "Disable Pongs");
addTLSOptions(snakeApp); addTLSOptions(snakeApp);
@ -473,6 +505,11 @@ int main(int argc, char** argv)
} }
} }
if (quiet)
{
spdlog::set_level(spdlog::level::info);
}
// Cobra config // Cobra config
cobraConfig.webSocketPerMessageDeflateOptions = ix::WebSocketPerMessageDeflateOptions(true); cobraConfig.webSocketPerMessageDeflateOptions = ix::WebSocketPerMessageDeflateOptions(true);
cobraConfig.socketTLSOptions = tlsOptions; cobraConfig.socketTLSOptions = tlsOptions;
@ -482,7 +519,46 @@ int main(int argc, char** argv)
cobraBotConfig.cobraConfig.socketTLSOptions = tlsOptions; cobraBotConfig.cobraConfig.socketTLSOptions = tlsOptions;
int ret = 1; int ret = 1;
if (app.got_subcommand("transfer")) if (app.got_subcommand("connect"))
{
ret = ix::ws_connect_main(url,
headers,
disableAutomaticReconnection,
disablePerMessageDeflate,
binaryMode,
maxWaitBetweenReconnectionRetries,
tlsOptions,
subprotocol,
pingIntervalSecs);
}
else if (app.got_subcommand("echo_client"))
{
ret = ix::ws_echo_client(url,
disablePerMessageDeflate,
binaryMode,
tlsOptions,
subprotocol,
pingIntervalSecs,
sendMsg,
noSend);
}
else if (app.got_subcommand("echo_server"))
{
ret = ix::ws_echo_server_main(
port, greetings, hostname, tlsOptions, ipv6, disablePerMessageDeflate, disablePong);
}
else if (app.got_subcommand("push_server"))
{
ret = ix::ws_push_server(port,
greetings,
hostname,
tlsOptions,
ipv6,
disablePerMessageDeflate,
disablePong,
sendMsg);
}
else if (app.got_subcommand("transfer"))
{ {
ret = ix::ws_transfer_main(port, hostname, tlsOptions); ret = ix::ws_transfer_main(port, hostname, tlsOptions);
} }
@ -495,27 +571,10 @@ int main(int argc, char** argv)
bool enablePerMessageDeflate = false; bool enablePerMessageDeflate = false;
ret = ix::ws_receive_main(url, enablePerMessageDeflate, delayMs, tlsOptions); ret = ix::ws_receive_main(url, enablePerMessageDeflate, delayMs, tlsOptions);
} }
else if (app.got_subcommand("connect"))
{
ret = ix::ws_connect_main(url,
headers,
disableAutomaticReconnection,
disablePerMessageDeflate,
binaryMode,
maxWaitBetweenReconnectionRetries,
tlsOptions,
subprotocol,
pingIntervalSecs);
}
else if (app.got_subcommand("chat")) else if (app.got_subcommand("chat"))
{ {
ret = ix::ws_chat_main(url, user); ret = ix::ws_chat_main(url, user);
} }
else if (app.got_subcommand("echo_server"))
{
ret = ix::ws_echo_server_main(
port, greetings, hostname, tlsOptions, ipv6, disablePerMessageDeflate, disablePong);
}
else if (app.got_subcommand("broadcast_server")) else if (app.got_subcommand("broadcast_server"))
{ {
ret = ix::ws_broadcast_server_main(port, hostname, tlsOptions); ret = ix::ws_broadcast_server_main(port, hostname, tlsOptions);
@ -604,8 +663,7 @@ int main(int argc, char** argv)
} }
else else
{ {
ret = (int) ix::cobra_to_python_bot( ret = (int) ix::cobra_to_python_bot(cobraBotConfig, statsdClient, scriptPath);
cobraBotConfig, statsdClient, scriptPath);
} }
} }
else if (app.got_subcommand("cobra_to_sentry")) else if (app.got_subcommand("cobra_to_sentry"))
@ -620,14 +678,12 @@ int main(int argc, char** argv)
ix::RedisClient redisClient; ix::RedisClient redisClient;
if (!redisClient.connect(hostname, redisPort)) if (!redisClient.connect(hostname, redisPort))
{ {
spdlog::error("Cannot connect to redis host {}:{}", spdlog::error("Cannot connect to redis host {}:{}", redisHosts, redisPort);
redisHosts, redisPort);
return 1; return 1;
} }
else else
{ {
ret = (int) ix::cobra_metrics_to_redis_bot( ret = (int) ix::cobra_metrics_to_redis_bot(cobraBotConfig, redisClient, verbose);
cobraBotConfig, redisClient, verbose);
} }
} }
else if (app.got_subcommand("snake")) else if (app.got_subcommand("snake"))
@ -640,7 +696,8 @@ int main(int argc, char** argv)
verbose, verbose,
appsConfigPath, appsConfigPath,
tlsOptions, tlsOptions,
disablePong); disablePong,
republishChannel);
} }
else if (app.got_subcommand("httpd")) else if (app.got_subcommand("httpd"))
{ {
@ -656,7 +713,7 @@ int main(int argc, char** argv)
} }
else if (app.got_subcommand("proxy_server")) else if (app.got_subcommand("proxy_server"))
{ {
ret = ix::ws_proxy_server_main(port, hostname, tlsOptions, remoteHost, verbose); ret = ix::websocket_proxy_server_main(port, hostname, tlsOptions, remoteHost, verbose);
} }
else if (app.got_subcommand("upload_minidump")) else if (app.got_subcommand("upload_minidump"))
{ {

31
ws/ws.h
View File

@ -35,6 +35,15 @@ namespace ix
bool disablePerMessageDeflate, bool disablePerMessageDeflate,
bool disablePong); bool disablePong);
int ws_push_server(int port,
bool greetings,
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions,
bool ipv6,
bool disablePerMessageDeflate,
bool disablePong,
const std::string& sendMsg);
int ws_broadcast_server_main(int port, int ws_broadcast_server_main(int port,
const std::string& hostname, const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions); const ix::SocketTLSOptions& tlsOptions);
@ -54,6 +63,15 @@ namespace ix
const std::string& subprotocol, const std::string& subprotocol,
int pingIntervalSecs); int pingIntervalSecs);
int ws_echo_client(const std::string& url,
bool disablePerMessageDeflate,
bool binaryMode,
const ix::SocketTLSOptions& tlsOptions,
const std::string& subprotocol,
int pingIntervalSecs,
const std::string& sendMsg,
bool noSend);
int ws_receive_main(const std::string& url, int ws_receive_main(const std::string& url,
bool enablePerMessageDeflate, bool enablePerMessageDeflate,
int delayMs, int delayMs,
@ -64,9 +82,7 @@ namespace ix
bool disablePerMessageDeflate, bool disablePerMessageDeflate,
const ix::SocketTLSOptions& tlsOptions); const ix::SocketTLSOptions& tlsOptions);
int ws_redis_cli_main(const std::string& hostname, int ws_redis_cli_main(const std::string& hostname, int port, const std::string& password);
int port,
const std::string& password);
int ws_redis_publish_main(const std::string& hostname, int ws_redis_publish_main(const std::string& hostname,
int port, int port,
@ -105,7 +121,8 @@ namespace ix
bool verbose, bool verbose,
const std::string& appsConfigPath, const std::string& appsConfigPath,
const ix::SocketTLSOptions& tlsOptions, const ix::SocketTLSOptions& tlsOptions,
bool disablePong); bool disablePong,
const std::string& republishChannel);
int ws_httpd_main(int port, int ws_httpd_main(int port,
const std::string& hostname, const std::string& hostname,
@ -117,12 +134,6 @@ namespace ix
int ws_redis_server_main(int port, const std::string& hostname); int ws_redis_server_main(int port, const std::string& hostname);
int ws_proxy_server_main(int port,
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions,
const std::string& remoteHost,
bool verbose);
int ws_sentry_minidump_upload(const std::string& metadataPath, int ws_sentry_minidump_upload(const std::string& metadataPath,
const std::string& minidump, const std::string& minidump,
const std::string& project, const std::string& project,

View File

@ -20,13 +20,16 @@ namespace ix
ix::WebSocketServer server(port, hostname); ix::WebSocketServer server(port, hostname);
server.setTLSOptions(tlsOptions); server.setTLSOptions(tlsOptions);
server.setOnConnectionCallback([&server](std::shared_ptr<WebSocket> webSocket, server.setOnClientMessageCallback(
std::shared_ptr<ConnectionState> connectionState) { [&server](std::shared_ptr<ConnectionState> connectionState,
webSocket->setOnMessageCallback([webSocket, connectionState, &server]( ConnectionInfo& connectionInfo,
const WebSocketMessagePtr& msg) { WebSocket& webSocket,
const WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
spdlog::info("New connection"); spdlog::info("New connection");
spdlog::info("remote ip: {}", remoteIp);
spdlog::info("id: {}", connectionState->getId()); spdlog::info("id: {}", connectionState->getId());
spdlog::info("Uri: {}", msg->openInfo.uri); spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:"); spdlog::info("Headers:");
@ -60,7 +63,7 @@ namespace ix
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client.get() != &webSocket)
{ {
client->send(msg->str, msg->binary, [](int current, int total) -> bool { client->send(msg->str, msg->binary, [](int current, int total) -> bool {
spdlog::info("Step {} out of {}", current, total); spdlog::info("Step {} out of {}", current, total);
@ -79,7 +82,6 @@ namespace ix
} }
} }
}); });
});
auto res = server.listen(); auto res = server.listen();
if (!res.first) if (!res.first)

View File

@ -8,7 +8,6 @@
#include <chrono> #include <chrono>
#include <fstream> #include <fstream>
#include <ixcobra/IXCobraMetricsPublisher.h> #include <ixcobra/IXCobraMetricsPublisher.h>
#include <jsoncpp/json/json.h>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
#include <thread> #include <thread>

View File

@ -8,7 +8,6 @@
#include <chrono> #include <chrono>
#include <fstream> #include <fstream>
#include <ixcobra/IXCobraMetricsPublisher.h> #include <ixcobra/IXCobraMetricsPublisher.h>
#include <jsoncpp/json/json.h>
#include <mutex> #include <mutex>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream> #include <sstream>

View File

@ -6,12 +6,12 @@
#include "IXBench.h" #include "IXBench.h"
#include "linenoise.hpp" #include "linenoise.hpp"
#include <iostream>
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXSocketTLSOptions.h> #include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
#include <iostream>
namespace ix namespace ix
@ -160,7 +160,7 @@ namespace ix
std::stringstream ss; std::stringstream ss;
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
log("ws_connect: connected"); spdlog::info("ws_connect: connected");
spdlog::info("Uri: {}", msg->openInfo.uri); spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:"); spdlog::info("Headers:");
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
@ -200,7 +200,7 @@ namespace ix
} }
else if (msg->type == ix::WebSocketMessageType::Pong) else if (msg->type == ix::WebSocketMessageType::Pong)
{ {
spdlog::info("Received pong"); spdlog::info("Received pong {}", msg->str);
} }
else else
{ {

121
ws/ws_echo_client.cpp Normal file
View File

@ -0,0 +1,121 @@
/*
* ws_echo_client.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <ixcore/utils/IXCoreLogger.h>
#include <ixwebsocket/IXNetSystem.h>
#include <ixwebsocket/IXSetThreadName.h>
#include <ixwebsocket/IXWebSocket.h>
#include <spdlog/spdlog.h>
#include <sstream>
#include <thread>
namespace ix
{
int ws_echo_client(const std::string& url,
bool disablePerMessageDeflate,
bool binaryMode,
const ix::SocketTLSOptions& tlsOptions,
const std::string& subprotocol,
int pingIntervalSecs,
const std::string& sendMsg,
bool noSend)
{
// Our websocket object
ix::WebSocket webSocket;
webSocket.setUrl(url);
webSocket.setTLSOptions(tlsOptions);
webSocket.setPingInterval(pingIntervalSecs);
if (disablePerMessageDeflate)
{
webSocket.disablePerMessageDeflate();
}
if (!subprotocol.empty())
{
webSocket.addSubProtocol(subprotocol);
}
std::atomic<uint64_t> receivedCount(0);
uint64_t receivedCountTotal(0);
uint64_t receivedCountPerSecs(0);
// Setup a callback to be fired (in a background thread, watch out for race conditions !)
// when a message or an event (open, close, error) is received
webSocket.setOnMessageCallback([&webSocket, &receivedCount, &sendMsg, noSend, binaryMode](
const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Message)
{
if (!noSend)
{
webSocket.send(msg->str, msg->binary);
}
receivedCount++;
}
else if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("ws_echo_client: connected");
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
webSocket.send(sendMsg, binaryMode);
}
else if (msg->type == ix::WebSocketMessageType::Pong)
{
spdlog::info("Received pong {}", msg->str);
}
});
auto timer = [&receivedCount, &receivedCountTotal, &receivedCountPerSecs] {
setThreadName("Timer");
while (true)
{
//
// We cannot write to sentCount and receivedCount
// as those are used externally, so we need to introduce
// our own counters
//
std::stringstream ss;
ss << "messages received: " << receivedCountPerSecs << " per second "
<< receivedCountTotal << " total";
CoreLogger::info(ss.str());
receivedCountPerSecs = receivedCount - receivedCountTotal;
receivedCountTotal += receivedCountPerSecs;
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
};
std::thread t1(timer);
// Now that our callback is setup, we can start our background thread and receive messages
std::cout << "Connecting to " << url << "..." << std::endl;
webSocket.start();
// Send a message to the server (default to TEXT mode)
webSocket.send("hello world");
while (true)
{
std::string text;
std::cout << "> " << std::flush;
std::getline(std::cin, text);
webSocket.send(text);
}
return 0;
}
} // namespace ix

View File

@ -42,47 +42,48 @@ namespace ix
server.disablePong(); server.disablePong();
} }
server.setOnConnectionCallback( server.setOnClientMessageCallback(
[greetings](std::shared_ptr<ix::WebSocket> webSocket, [greetings](std::shared_ptr<ConnectionState> connectionState,
std::shared_ptr<ConnectionState> connectionState) { ConnectionInfo& connectionInfo,
webSocket->setOnMessageCallback( WebSocket& webSocket,
[webSocket, connectionState, greetings](const WebSocketMessagePtr& msg) { const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open) auto remoteIp = connectionInfo.remoteIp;
{ if (msg->type == ix::WebSocketMessageType::Open)
spdlog::info("New connection"); {
spdlog::info("id: {}", connectionState->getId()); spdlog::info("New connection");
spdlog::info("Uri: {}", msg->openInfo.uri); spdlog::info("remote ip: {}", remoteIp);
spdlog::info("Headers:"); spdlog::info("id: {}", connectionState->getId());
for (auto it : msg->openInfo.headers) spdlog::info("Uri: {}", msg->openInfo.uri);
{ spdlog::info("Headers:");
spdlog::info("{}: {}", it.first, it.second); for (auto it : msg->openInfo.headers)
} {
spdlog::info("{}: {}", it.first, it.second);
}
if (greetings) if (greetings)
{ {
webSocket->sendText("Welcome !"); webSocket.sendText("Welcome !");
} }
} }
else if (msg->type == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
spdlog::info("Closed connection: client id {} code {} reason {}", spdlog::info("Closed connection: client id {} code {} reason {}",
connectionState->getId(), connectionState->getId(),
msg->closeInfo.code, msg->closeInfo.code,
msg->closeInfo.reason); msg->closeInfo.reason);
} }
else if (msg->type == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
spdlog::error("Connection error: {}", msg->errorInfo.reason); spdlog::error("Connection error: {}", msg->errorInfo.reason);
spdlog::error("#retries: {}", msg->errorInfo.retries); spdlog::error("#retries: {}", msg->errorInfo.retries);
spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time); spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time);
spdlog::error("HTTP Status: {}", msg->errorInfo.http_status); spdlog::error("HTTP Status: {}", msg->errorInfo.http_status);
} }
else if (msg->type == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
spdlog::info("Received {} bytes", msg->wireSize); spdlog::info("Received {} bytes", msg->wireSize);
webSocket->send(msg->str, msg->binary); webSocket.send(msg->str, msg->binary);
} }
});
}); });
auto res = server.listen(); auto res = server.listen();

View File

@ -1,173 +0,0 @@
/*
* ws_proxy_server.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include <ixwebsocket/IXWebSocketServer.h>
#include <spdlog/spdlog.h>
#include <sstream>
namespace ix
{
class ProxyConnectionState : public ix::ConnectionState
{
public:
ProxyConnectionState()
: _connected(false)
{
}
ix::WebSocket& webSocket()
{
return _serverWebSocket;
}
bool isConnected()
{
return _connected;
}
void setConnected()
{
_connected = true;
}
private:
ix::WebSocket _serverWebSocket;
bool _connected;
};
int ws_proxy_server_main(int port,
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions,
const std::string& remoteUrl,
bool verbose)
{
spdlog::info("Listening on {}:{}", hostname, port);
ix::WebSocketServer server(port, hostname);
server.setTLSOptions(tlsOptions);
auto factory = []() -> std::shared_ptr<ix::ConnectionState> {
return std::make_shared<ProxyConnectionState>();
};
server.setConnectionStateFactory(factory);
server.setOnConnectionCallback([remoteUrl,
verbose](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState) {
auto state = std::dynamic_pointer_cast<ProxyConnectionState>(connectionState);
// Server connection
state->webSocket().setOnMessageCallback([webSocket, state, verbose](
const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("New connection to remote server");
spdlog::info("id: {}", state->getId());
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
spdlog::info("Closed remote server connection: client id {} code {} reason {}",
state->getId(),
msg->closeInfo.code,
msg->closeInfo.reason);
state->setTerminated();
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
spdlog::error("Connection error: {}", msg->errorInfo.reason);
spdlog::error("#retries: {}", msg->errorInfo.retries);
spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time);
spdlog::error("HTTP Status: {}", msg->errorInfo.http_status);
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
spdlog::info("Received {} bytes from server", msg->wireSize);
if (verbose)
{
spdlog::info("payload {}", msg->str);
}
webSocket->send(msg->str, msg->binary);
}
});
// Client connection
webSocket->setOnMessageCallback(
[state, remoteUrl, verbose](const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("New connection from client");
spdlog::info("id: {}", state->getId());
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
// Connect to the 'real' server
std::string url(remoteUrl);
url += msg->openInfo.uri;
state->webSocket().setUrl(url);
state->webSocket().disableAutomaticReconnection();
state->webSocket().start();
// we should sleep here for a bit until we've established the
// connection with the remote server
while (state->webSocket().getReadyState() != ReadyState::Open)
{
spdlog::info("waiting for server connection establishment");
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
spdlog::info("server connection established");
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
spdlog::info("Closed client connection: client id {} code {} reason {}",
state->getId(),
msg->closeInfo.code,
msg->closeInfo.reason);
state->webSocket().close(msg->closeInfo.code, msg->closeInfo.reason);
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
spdlog::error("Connection error: {}", msg->errorInfo.reason);
spdlog::error("#retries: {}", msg->errorInfo.retries);
spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time);
spdlog::error("HTTP Status: {}", msg->errorInfo.http_status);
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
spdlog::info("Received {} bytes from client", msg->wireSize);
if (verbose)
{
spdlog::info("payload {}", msg->str);
}
state->webSocket().send(msg->str, msg->binary);
}
});
});
auto res = server.listen();
if (!res.first)
{
spdlog::info(res.second);
return 1;
}
server.start();
server.wait();
return 0;
}
} // namespace ix

108
ws/ws_push_server.cpp Normal file
View File

@ -0,0 +1,108 @@
/*
* ws_push_server.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#include <ixwebsocket/IXNetSystem.h>
#include <ixwebsocket/IXWebSocketServer.h>
#include <spdlog/spdlog.h>
#include <sstream>
namespace ix
{
int ws_push_server(int port,
bool greetings,
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions,
bool ipv6,
bool disablePerMessageDeflate,
bool disablePong,
const std::string& sendMsg)
{
spdlog::info("Listening on {}:{}", hostname, port);
ix::WebSocketServer server(port,
hostname,
SocketServer::kDefaultTcpBacklog,
SocketServer::kDefaultMaxConnections,
WebSocketServer::kDefaultHandShakeTimeoutSecs,
(ipv6) ? AF_INET6 : AF_INET);
server.setTLSOptions(tlsOptions);
if (disablePerMessageDeflate)
{
spdlog::info("Disable per message deflate");
server.disablePerMessageDeflate();
}
if (disablePong)
{
spdlog::info("Disable responding to PING messages with PONG");
server.disablePong();
}
server.setOnClientMessageCallback(
[greetings, &sendMsg](std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("New connection");
spdlog::info("remote ip: {}", remoteIp);
spdlog::info("id: {}", connectionState->getId());
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
if (greetings)
{
webSocket.sendText("Welcome !");
}
bool binary = false;
while (true)
{
webSocket.send(sendMsg, binary);
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
spdlog::info("Closed connection: client id {} code {} reason {}",
connectionState->getId(),
msg->closeInfo.code,
msg->closeInfo.reason);
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
spdlog::error("Connection error: {}", msg->errorInfo.reason);
spdlog::error("#retries: {}", msg->errorInfo.retries);
spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time);
spdlog::error("HTTP Status: {}", msg->errorInfo.http_status);
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
spdlog::info("Received {} bytes", msg->wireSize);
webSocket.send(msg->str, msg->binary);
}
});
auto res = server.listen();
if (!res.first)
{
spdlog::error(res.second);
return 1;
}
server.start();
server.wait();
return 0;
}
} // namespace ix

View File

@ -4,17 +4,15 @@
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved. * Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/ */
#include "linenoise.hpp"
#include <iostream>
#include <ixredis/IXRedisClient.h> #include <ixredis/IXRedisClient.h>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
#include <iostream>
#include "linenoise.hpp"
namespace ix namespace ix
{ {
int ws_redis_cli_main(const std::string& hostname, int ws_redis_cli_main(const std::string& hostname, int port, const std::string& password)
int port,
const std::string& password)
{ {
RedisClient redisClient; RedisClient redisClient;
if (!redisClient.connect(hostname, port)) if (!redisClient.connect(hostname, port))
@ -71,9 +69,7 @@ namespace ix
{ {
if (response.first != RespType::String) if (response.first != RespType::String)
{ {
std::cout << "(" std::cout << "(" << redisClient.getRespTypeDescription(response.first) << ")"
<< redisClient.getRespTypeDescription(response.first)
<< ")"
<< " "; << " ";
} }

View File

@ -6,7 +6,6 @@
#include <fstream> #include <fstream>
#include <ixsentry/IXSentryClient.h> #include <ixsentry/IXSentryClient.h>
#include <jsoncpp/json/json.h>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream> #include <sstream>

View File

@ -45,7 +45,8 @@ namespace ix
bool verbose, bool verbose,
const std::string& appsConfigPath, const std::string& appsConfigPath,
const SocketTLSOptions& socketTLSOptions, const SocketTLSOptions& socketTLSOptions,
bool disablePong) bool disablePong,
const std::string& republishChannel)
{ {
snake::AppConfig appConfig; snake::AppConfig appConfig;
appConfig.port = port; appConfig.port = port;
@ -55,6 +56,7 @@ namespace ix
appConfig.redisPassword = redisPassword; appConfig.redisPassword = redisPassword;
appConfig.socketTLSOptions = socketTLSOptions; appConfig.socketTLSOptions = socketTLSOptions;
appConfig.disablePong = disablePong; appConfig.disablePong = disablePong;
appConfig.republishChannel = republishChannel;
// Parse config file // Parse config file
auto str = readAsString(appsConfigPath); auto str = readAsString(appsConfigPath);

View File

@ -19,13 +19,16 @@ namespace ix
ix::WebSocketServer server(port, hostname); ix::WebSocketServer server(port, hostname);
server.setTLSOptions(tlsOptions); server.setTLSOptions(tlsOptions);
server.setOnConnectionCallback([&server](std::shared_ptr<ix::WebSocket> webSocket, server.setOnClientMessageCallback(
std::shared_ptr<ConnectionState> connectionState) { [&server](std::shared_ptr<ConnectionState> connectionState,
webSocket->setOnMessageCallback([webSocket, connectionState, &server]( ConnectionInfo& connectionInfo,
const WebSocketMessagePtr& msg) { WebSocket& webSocket,
const WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
spdlog::info("ws_transfer: New connection"); spdlog::info("ws_transfer: New connection");
spdlog::info("remote ip: {}", remoteIp);
spdlog::info("id: {}", connectionState->getId()); spdlog::info("id: {}", connectionState->getId());
spdlog::info("Uri: {}", msg->openInfo.uri); spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:"); spdlog::info("Headers:");
@ -40,7 +43,7 @@ namespace ix
connectionState->getId(), connectionState->getId(),
msg->closeInfo.code, msg->closeInfo.code,
msg->closeInfo.reason); msg->closeInfo.reason);
auto remaining = server.getClients().erase(webSocket); auto remaining = server.getClients().size() - 1;
spdlog::info("ws_transfer: {} remaining clients", remaining); spdlog::info("ws_transfer: {} remaining clients", remaining);
} }
else if (msg->type == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
@ -62,7 +65,7 @@ namespace ix
size_t receivers = 0; size_t receivers = 0;
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client.get() != &webSocket)
{ {
auto readyState = client->getReadyState(); auto readyState = client->getReadyState();
auto id = connectionState->getId(); auto id = connectionState->getId();
@ -116,7 +119,6 @@ namespace ix
} }
} }
}); });
});
auto res = server.listen(); auto res = server.listen();
if (!res.first) if (!res.first)