Compare commits

...

41 Commits

Author SHA1 Message Date
b8265bf7f2 (ws) -q option imply info log level, not warning log level 2020-08-11 15:44:06 -07:00
e7c4f0b171 add documentation for the websocket send callback and the send return type (fix #239) 2020-08-11 11:24:00 -07:00
12f36b61ff (websocket server) Handle programmer error when the server callback is not registered properly (fix #227) 2020-08-06 04:40:32 -07:00
b15c4189f5 add csharp/dotnet devnull client to measure througput with different runtimes 2020-08-05 13:59:26 -07:00
74d3278258 add python test file to benchmark how many messages can be received per second 2020-08-04 10:53:35 -07:00
831152b906 add a devnull like sample code using libwebsockets C library, to see how many messages per second a client library can receive (answer is about the same as IXWebSocket) 2020-08-02 19:26:19 -07:00
7c81a98632 Add a node.js benchmarking test program, to see how fast node can receive messages. 2020-08-02 14:21:11 -07:00
6e47c62c06 (ws) Add a new ws sub-command, push_server. This command runs a server which sends many messages in a loop to a websocket client. We can receive above 200,000 messages per second (cf #235). 2020-08-02 12:41:34 -07:00
bcae7f326d (ws) Add a new ws sub-command, echo_client. This command send a message to an echo server, and send back to a server whatever message it does receive. When connecting to a local ws echo_server, on my MacBook Pro 2015 I can send/receive around 30,000 messages per second. (cf #235) 2020-08-02 12:09:13 -07:00
d719c41e31 (ws) ws echo_server. Add a -q option to only enable warning and error log levels. This is useful for bench-marking so that we do not print a lot of things on the console. (cf #235) 2020-08-02 11:53:21 -07:00
6f0307fb35 (build) make using zlib optional, with the caveat that some http and websocket features are not available when zlib is absent 2020-07-31 22:54:57 -07:00
2e3d625c1e (websocket client) onProgressCallback not called for short messages on a websocket (fix #233) 2020-07-29 17:47:33 -07:00
029289413c ws test shell script / add option so tune how large sent file will be 2020-07-29 17:46:37 -07:00
4d51098c86 (websocket client) heartbeat is not sent at the requested frequency (fix #232) 2020-07-29 11:24:42 -07:00
c2b05af022 can compile on macOS against jsoncpp installed from homebrew 2020-07-28 22:00:29 -07:00
e85f975ab0 compiler warning fixes 2020-07-28 21:46:26 -07:00
dc77d62a5d (ixcobra) CobraConnection: unsubscribe from all subscriptions when disconnecting 2020-07-28 10:32:18 -07:00
4f41f209a2 (socket utility) move ix::getFreePort to ixwebsocket library 2020-07-27 18:17:13 -07:00
5940e53d77 enable cobra tests which were disabled 2020-07-27 17:39:53 -07:00
22dffd5b7e WebSocket::close is re-entrant 2020-07-27 17:38:33 -07:00
af2f31045d snake server / join subscription background thread in the ConnectionState destructor + attach cobra message subscription id to the connection state instead of having it be a local reference that gets unbound 2020-07-27 17:35:03 -07:00
5daa59f9f3 minor makefile tweaks 2020-07-27 17:19:05 -07:00
2ea9d06a93 fix typo in unittest string description: ununexpected -> unsubscribed 2020-07-27 17:16:53 -07:00
847fc142d1 (ixwebsocket server) change legacy api with 2 nested callbacks, so that the first api takes a weak_ptr<WebSocket> as its first argument 2020-07-25 11:42:07 -07:00
0388459bd0 (ixwebsocket) add WebSocketProxyServer, from ws. Still need to make the interface better. 2020-07-25 11:26:06 -07:00
9a47ec1217 (ixsnake) uses an std::thread to handle redis subscriptions (2 unittest still failing) 2020-07-24 18:12:07 -07:00
45a40c8640 new Dockerfile to run locally the test on an Ubuntu 20.04 system 2020-07-24 14:46:09 -07:00
e34f1c30d6 (ws) port broadcast_server sub-command to the new server API 2020-07-24 14:35:07 -07:00
c14a4c0e3e formatting 2020-07-24 13:04:14 -07:00
b146e93a3a (unittest) port most unittests to the new server API 2020-07-24 12:49:36 -07:00
9957ec9724 (ws) port ws snake to the new server API 2020-07-24 12:33:17 -07:00
78a42f61bd add tool to ease making commits 2020-07-24 11:53:09 -07:00
e78019dad6 (ws) port ws transfer to the new server API 2020-07-24 11:52:16 -07:00
0f026c5da2 (websocket client) reset WebSocketTransport onClose callback in the WebSocket destructor 2020-07-24 10:03:29 -07:00
c26a2d5d39 (websocket server) reset client websocket callback when the connection is closed 2020-07-24 09:41:02 -07:00
2798886c0b (websocket server) add a new simpler API to handle client connections / that API does not trigger a memory leak while the previous one did 2020-07-23 19:29:41 -07:00
ffde283a4b (build) merge platform specific files which were used to have different implementations for setting a thread name into a single file, to make it easier to include every source files and build the ixwebsocket library (fix #226) 2020-07-17 11:58:06 -07:00
f7031d0d3e set thread name in only one file 2020-07-17 11:43:50 -07:00
595e6c57df IXSelectInterruptPipe.h included in cmake on windows but compiled out 2020-07-17 11:33:02 -07:00
87709c201e (socket server) bump default max connection count from 32 to 128 2020-07-10 17:11:11 -07:00
e70d83ace1 (snake) implement super simple stream sql expression support in snake server 2020-07-10 16:10:59 -07:00
75 changed files with 1822 additions and 787 deletions

View File

@ -10,12 +10,10 @@ jobs:
steps:
- uses: actions/checkout@v1
- uses: seanmiddleditch/gha-setup-vsdevenv@master
- run: |
vcpkg install zlib:x64-uwp
- run: |
mkdir build
cd build
cmake -DCMAKE_TOOLCHAIN_FILE=c:/vcpkg/scripts/buildsystems/vcpkg.cmake -DCMAKE_SYSTEM_NAME=WindowsStore -DCMAKE_SYSTEM_VERSION="10.0" -DCMAKE_CXX_COMPILER=cl.exe -DUSE_TEST=1 ..
cmake -DCMAKE_TOOLCHAIN_FILE=c:/vcpkg/scripts/buildsystems/vcpkg.cmake -DCMAKE_SYSTEM_NAME=WindowsStore -DCMAKE_SYSTEM_VERSION="10.0" -DCMAKE_CXX_COMPILER=cl.exe -DUSE_TEST=1 -DUSE_ZLIB=0 ..
- run: cmake --build build
#

View File

@ -10,10 +10,11 @@ jobs:
steps:
- uses: actions/checkout@v1
- uses: seanmiddleditch/gha-setup-vsdevenv@master
- run: |
vcpkg install zlib:x64-windows
- run: |
mkdir build
cd build
cmake -DCMAKE_TOOLCHAIN_FILE=c:/vcpkg/scripts/buildsystems/vcpkg.cmake -DCMAKE_CXX_COMPILER=cl.exe -DUSE_WS=1 -DUSE_TEST=1 ..
cmake -DCMAKE_CXX_COMPILER=cl.exe -DUSE_WS=1 -DUSE_TEST=1 -DUSE_ZLIB=0 ..
- run: cmake --build build
#- run: ../build/test/ixwebsocket_unittest.exe
# working-directory: test

View File

@ -30,12 +30,15 @@ set( IXWEBSOCKET_SOURCES
ixwebsocket/IXConnectionState.cpp
ixwebsocket/IXDNSLookup.cpp
ixwebsocket/IXExponentialBackoff.cpp
ixwebsocket/IXGetFreePort.cpp
ixwebsocket/IXHttp.cpp
ixwebsocket/IXHttpClient.cpp
ixwebsocket/IXHttpServer.cpp
ixwebsocket/IXNetSystem.cpp
ixwebsocket/IXSelectInterrupt.cpp
ixwebsocket/IXSelectInterruptFactory.cpp
ixwebsocket/IXSelectInterruptPipe.cpp
ixwebsocket/IXSetThreadName.cpp
ixwebsocket/IXSocket.cpp
ixwebsocket/IXSocketConnect.cpp
ixwebsocket/IXSocketFactory.cpp
@ -51,6 +54,7 @@ set( IXWEBSOCKET_SOURCES
ixwebsocket/IXWebSocketPerMessageDeflate.cpp
ixwebsocket/IXWebSocketPerMessageDeflateCodec.cpp
ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp
ixwebsocket/IXWebSocketProxyServer.cpp
ixwebsocket/IXWebSocketServer.cpp
ixwebsocket/IXWebSocketTransport.cpp
)
@ -62,6 +66,7 @@ set( IXWEBSOCKET_HEADERS
ixwebsocket/IXConnectionState.h
ixwebsocket/IXDNSLookup.h
ixwebsocket/IXExponentialBackoff.h
ixwebsocket/IXGetFreePort.h
ixwebsocket/IXHttp.h
ixwebsocket/IXHttpClient.h
ixwebsocket/IXHttpServer.h
@ -69,6 +74,7 @@ set( IXWEBSOCKET_HEADERS
ixwebsocket/IXProgressCallback.h
ixwebsocket/IXSelectInterrupt.h
ixwebsocket/IXSelectInterruptFactory.h
ixwebsocket/IXSelectInterruptPipe.h
ixwebsocket/IXSetThreadName.h
ixwebsocket/IXSocket.h
ixwebsocket/IXSocketConnect.h
@ -93,29 +99,13 @@ set( IXWEBSOCKET_HEADERS
ixwebsocket/IXWebSocketPerMessageDeflate.h
ixwebsocket/IXWebSocketPerMessageDeflateCodec.h
ixwebsocket/IXWebSocketPerMessageDeflateOptions.h
ixwebsocket/IXWebSocketProxyServer.h
ixwebsocket/IXWebSocketSendInfo.h
ixwebsocket/IXWebSocketServer.h
ixwebsocket/IXWebSocketTransport.h
ixwebsocket/IXWebSocketVersion.h
)
if (UNIX)
# Linux, Mac, iOS, Android
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/IXSelectInterruptPipe.cpp )
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/IXSelectInterruptPipe.h )
endif()
# Platform specific code
if (APPLE)
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/apple/IXSetThreadName_apple.cpp)
elseif (WIN32)
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/windows/IXSetThreadName_windows.cpp)
elseif (${CMAKE_SYSTEM_NAME} MATCHES "FreeBSD")
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/freebsd/IXSetThreadName_freebsd.cpp)
else()
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/linux/IXSetThreadName_linux.cpp)
endif()
option(USE_TLS "Enable TLS support" FALSE)
if (USE_TLS)
@ -200,10 +190,16 @@ if (USE_TLS)
endif()
endif()
# Use ZLIB_ROOT CMake variable if you need to use your own zlib
find_package(ZLIB REQUIRED)
include_directories(${ZLIB_INCLUDE_DIRS})
target_link_libraries(ixwebsocket ${ZLIB_LIBRARIES})
option(USE_ZLIB "Enable zlib support" TRUE)
if (USE_ZLIB)
# Use ZLIB_ROOT CMake variable if you need to use your own zlib
find_package(ZLIB REQUIRED)
include_directories(${ZLIB_INCLUDE_DIRS})
target_link_libraries(ixwebsocket ${ZLIB_LIBRARIES})
target_compile_definitions(ixwebsocket PUBLIC IXWEBSOCKET_USE_ZLIB)
endif()
if (WIN32)
target_link_libraries(ixwebsocket wsock32 ws2_32 shlwapi)

View File

@ -0,0 +1,23 @@
# Build time
FROM ubuntu:groovy as build
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install g++ libssl-dev libz-dev make python ninja-build
RUN apt-get -y install cmake
RUN apt-get -y install gdb
COPY . /opt
WORKDIR /opt
#
# To use the container interactively for debugging/building
# 1. Build with
# CMD ["ls"]
# 2. Run with
# docker run --entrypoint sh -it docker-game-eng-dev.addsrv.com/ws:9.10.6
#
RUN ["make", "test"]
# CMD ["ls"]

View File

@ -1,6 +1,95 @@
# Changelog
All changes to this project will be documented in this file.
## [10.1.7] - 2020-08-11
(ws) -q option imply info log level, not warning log level
## [10.1.6] - 2020-08-06
(websocket server) Handle programmer error when the server callback is not registered properly (fix #227)
## [10.1.5] - 2020-08-02
(ws) Add a new ws sub-command, push_server. This command runs a server which sends many messages in a loop to a websocket client. We can receive above 200,000 messages per second (cf #235).
## [10.1.4] - 2020-08-02
(ws) Add a new ws sub-command, echo_client. This command sends a message to an echo server, and send back to a server whatever message it does receive. When connecting to a local ws echo_server, on my MacBook Pro 2015 I can send/receive around 30,000 messages per second. (cf #235)
## [10.1.3] - 2020-08-02
(ws) ws echo_server. Add a -q option to only enable warning and error log levels. This is useful for bench-marking so that we do not print a lot of things on the console. (cf #235)
## [10.1.2] - 2020-07-31
(build) make using zlib optional, with the caveat that some http and websocket features are not available when zlib is absent
## [10.1.1] - 2020-07-29
(websocket client) onProgressCallback not called for short messages on a websocket (fix #233)
## [10.1.0] - 2020-07-29
(websocket client) heartbeat is not sent at the requested frequency (fix #232)
## [10.0.3] - 2020-07-28
compiler warning fixes
## [10.0.2] - 2020-07-28
(ixcobra) CobraConnection: unsubscribe from all subscriptions when disconnecting
## [10.0.1] - 2020-07-27
(socket utility) move ix::getFreePort to ixwebsocket library
## [10.0.0] - 2020-07-25
(ixwebsocket server) change legacy api with 2 nested callbacks, so that the first api takes a weak_ptr<WebSocket> as its first argument
## [9.10.7] - 2020-07-25
(ixwebsocket) add WebSocketProxyServer, from ws. Still need to make the interface better.
## [9.10.6] - 2020-07-24
(ws) port broadcast_server sub-command to the new server API
## [9.10.5] - 2020-07-24
(unittest) port most unittests to the new server API
## [9.10.3] - 2020-07-24
(ws) port ws transfer to the new server API
## [9.10.2] - 2020-07-24
(websocket client) reset WebSocketTransport onClose callback in the WebSocket destructor
## [9.10.1] - 2020-07-24
(websocket server) reset client websocket callback when the connection is closed
## [9.10.0] - 2020-07-23
(websocket server) add a new simpler API to handle client connections / that API does not trigger a memory leak while the previous one did
## [9.9.3] - 2020-07-17
(build) merge platform specific files which were used to have different implementations for setting a thread name into a single file, to make it easier to include every source files and build the ixwebsocket library (fix #226)
## [9.9.2] - 2020-07-10
(socket server) bump default max connection count from 32 to 128
## [9.9.1] - 2020-07-10
(snake) implement super simple stream sql expression support in snake server
## [9.9.0] - 2020-07-08
(socket+websocket+http+redis+snake servers) expose the remote ip and remote port when a new connection is made

View File

@ -17,6 +17,7 @@ There is a unittest which can be executed by typing `make test`.
Options for building:
* `-DUSE_ZLIB=1` will enable zlib support, required for http client + server + websocket per message deflate extension
* `-DUSE_TLS=1` will enable TLS support
* `-DUSE_OPEN_SSL=1` will use [openssl](https://www.openssl.org/) for the TLS support (default on Linux and Windows)
* `-DUSE_MBED_TLS=1` will use [mbedlts](https://tls.mbed.org/) for the TLS support

37
docs/performance.md Normal file
View File

@ -0,0 +1,37 @@
## WebSocket Client performance
We will run a client and a server on the same machine, connecting to localhost. This bench is run on a MacBook Pro from 2015. We can receive over 200,000 (small) messages per second, another way to put it is that it takes 5 micro-second to receive and process one message. This is an indication about the minimal latency to receive messages.
### Receiving messages
By using the push_server ws sub-command, the server will send the same message in a loop to any connected client.
```
ws push_server -q --send_msg 'yo'
```
By using the echo_client ws sub-command, with the -m (mute or no_send), we will display statistics on how many messages we can receive per second.
```
$ ws echo_client -m ws://localhost:8008
[2020-08-02 12:31:17.284] [info] ws_echo_client: connected
[2020-08-02 12:31:17.284] [info] Uri: /
[2020-08-02 12:31:17.284] [info] Headers:
[2020-08-02 12:31:17.284] [info] Connection: Upgrade
[2020-08-02 12:31:17.284] [info] Sec-WebSocket-Accept: byy/pMK2d0PtRwExaaiOnXJTQHo=
[2020-08-02 12:31:17.284] [info] Server: ixwebsocket/10.1.4 macos ssl/SecureTransport zlib 1.2.11
[2020-08-02 12:31:17.284] [info] Upgrade: websocket
[2020-08-02 12:31:17.663] [info] messages received: 0 per second 2595307 total
[2020-08-02 12:31:18.668] [info] messages received: 79679 per second 2674986 total
[2020-08-02 12:31:19.668] [info] messages received: 207438 per second 2882424 total
[2020-08-02 12:31:20.673] [info] messages received: 209207 per second 3091631 total
[2020-08-02 12:31:21.676] [info] messages received: 216056 per second 3307687 total
[2020-08-02 12:31:22.680] [info] messages received: 214927 per second 3522614 total
[2020-08-02 12:31:23.684] [info] messages received: 216960 per second 3739574 total
[2020-08-02 12:31:24.688] [info] messages received: 215232 per second 3954806 total
[2020-08-02 12:31:25.691] [info] messages received: 212300 per second 4167106 total
[2020-08-02 12:31:26.694] [info] messages received: 212501 per second 4379607 total
[2020-08-02 12:31:27.699] [info] messages received: 212330 per second 4591937 total
[2020-08-02 12:31:28.702] [info] messages received: 216511 per second 4808448 total
```

View File

@ -67,9 +67,28 @@ webSocket.stop()
### Sending messages
`websocket.send("foo")` will send a message.
`WebSocketSendInfo result = websocket.send("foo")` will send a message.
If the connection was closed and sending failed, the return value will be set to false.
If the connection was closed, sending will fail, and the success field of the result object will be set to false. There could also be a compression error in which case the compressError field will be set to true. The payloadSize field and wireSize fields will tell you respectively how much bytes the message weight, and how many bytes were sent on the wire (potentially compressed + counting the message header (a few bytes).
There is an optional progress callback that can be passed in as the second argument. If a message is large it will be fragmented into chunks which will be sent independantly. Everytime the we can write a fragment into the OS network cache, the callback will be invoked. If a user wants to cancel a slow send, false should be returned from within the callback.
Here is an example code snippet copied from the ws send sub-command. Each fragment weights 32K, so the total integer is the wireSize divided by 32K. As an example if you are sending 32M of data, uncompressed, total will be 1000. current will be set to 0 for the first fragment, then 1, 2 etc...
```
auto result =
_webSocket.sendBinary(serializedMsg, [this, throttle](int current, int total) -> bool {
spdlog::info("ws_send: Step {} out of {}", current + 1, total);
if (throttle)
{
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
}
return _connected;
});
```
### ReadyState
@ -246,6 +265,10 @@ uint32_t m = webSocket.getMaxWaitBetweenReconnectionRetries();
## WebSocket server API
### Legacy api
This api was actually changed to take a weak_ptr<WebSocket> as the first argument to setOnConnectionCallback ; previously it would take a shared_ptr<WebSocket> which was creating cycles and then memory leaks problems.
```cpp
#include <ixwebsocket/IXWebSocketServer.h>
@ -256,41 +279,49 @@ uint32_t m = webSocket.getMaxWaitBetweenReconnectionRetries();
ix::WebSocketServer server(port);
server.setOnConnectionCallback(
[&server](std::shared_ptr<WebSocket> webSocket,
[&server](std::weak_ptr<WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo)
{
std::cout << "Remote ip: " << connectionInfo->remoteIp << std::endl;
webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](const ix::WebSocketMessagePtr msg)
{
if (msg->type == ix::WebSocketMessageType::Open)
auto ws = webSocket.lock();
if (ws)
{
ws->setOnMessageCallback(
[webSocket, connectionState, &server](const ix::WebSocketMessagePtr msg)
{
std::cout << "New connection" << std::endl;
// A connection state object is available, and has a default id
// You can subclass ConnectionState and pass an alternate factory
// to override it. It is useful if you want to store custom
// attributes per connection (authenticated bool flag, attributes, etc...)
std::cout << "id: " << connectionState->getId() << std::endl;
// The uri the client did connect to.
std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Headers:" << std::endl;
for (auto it : msg->openInfo.headers)
if (msg->type == ix::WebSocketMessageType::Open)
{
std::cout << it.first << ": " << it.second << std::endl;
std::cout << "New connection" << std::endl;
// A connection state object is available, and has a default id
// You can subclass ConnectionState and pass an alternate factory
// to override it. It is useful if you want to store custom
// attributes per connection (authenticated bool flag, attributes, etc...)
std::cout << "id: " << connectionState->getId() << std::endl;
// The uri the client did connect to.
std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Headers:" << std::endl;
for (auto it : msg->openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
// For an echo server, we just send back to the client whatever was received by the server
// All connected clients are available in an std::set. See the broadcast cpp example.
// Second parameter tells whether we are sending the message in binary or text mode.
// Here we send it in the same mode as it was received.
auto ws = webSocket.lock();
if (ws)
{
ws->send(msg->str, msg->binary);
}
}
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
// For an echo server, we just send back to the client whatever was received by the server
// All connected clients are available in an std::set. See the broadcast cpp example.
// Second parameter tells whether we are sending the message in binary or text mode.
// Here we send it in the same mode as it was received.
webSocket->send(msg->str, msg->binary);
}
}
);
@ -312,6 +343,74 @@ server.wait();
```
### New api
The new API does not require to use 2 nested callbacks, which is a bit annoying. The real fix is that there was a memory leak due to a shared_ptr cycle, due to passing down a shared_ptr<WebSocket> down to the callbacks.
The webSocket reference is guaranteed to be always valid ; by design the callback will never be invoked with a null webSocket object.
```cpp
#include <ixwebsocket/IXWebSocketServer.h>
...
// Run a server on localhost at a given port.
// Bound host name, max connections and listen backlog can also be passed in as parameters.
ix::WebSocketServer server(port);
server.setOnClientMessageCallback(std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const WebSocketMessagePtr& msg)
{
// The ConnectionInfo object contains information about the connection,
// at this point only the client ip address and the port.
std::cout << "Remote ip: " << connectionInfo.remoteIp << std::endl;
if (msg->type == ix::WebSocketMessageType::Open)
{
std::cout << "New connection" << std::endl;
// A connection state object is available, and has a default id
// You can subclass ConnectionState and pass an alternate factory
// to override it. It is useful if you want to store custom
// attributes per connection (authenticated bool flag, attributes, etc...)
std::cout << "id: " << connectionState->getId() << std::endl;
// The uri the client did connect to.
std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Headers:" << std::endl;
for (auto it : msg->openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
// For an echo server, we just send back to the client whatever was received by the server
// All connected clients are available in an std::set. See the broadcast cpp example.
// Second parameter tells whether we are sending the message in binary or text mode.
// Here we send it in the same mode as it was received.
webSocket.send(msg->str, msg->binary);
}
);
auto res = server.listen();
if (!res.first)
{
// Error handling
return 1;
}
// Run the server in the background. Server can be stoped by calling server.stop()
server.start();
// Block until server.stop() is called.
server.wait();
```
## HTTP client API
```cpp

View File

@ -111,6 +111,12 @@ namespace ix
void CobraConnection::disconnect()
{
auto subscriptionIds = getSubscriptionsIds();
for (auto&& subscriptionId : subscriptionIds)
{
unsubscribe(subscriptionId);
}
_authenticated = false;
_webSocket->stop();
}
@ -614,6 +620,18 @@ namespace ix
_webSocket->send(pdu.toStyledString());
}
std::vector<std::string> CobraConnection::getSubscriptionsIds()
{
std::vector<std::string> subscriptionIds;
std::lock_guard<std::mutex> lock(_cbsMutex);
for (auto&& it : _cbs)
{
subscriptionIds.push_back(it.first);
}
return subscriptionIds;
}
//
// Enqueue strategy drops old messages when we are at full capacity
//

View File

@ -163,6 +163,9 @@ namespace ix
/// Tells whether the internal queue is empty or not
bool isQueueEmpty();
/// Retrieve all subscriptions ids
std::vector<std::string> getSubscriptionsIds();
///
/// Member variables
///

View File

@ -26,6 +26,12 @@ namespace snake
}
auto roles = appConfig.apps[appkey]["roles"];
if (roles.count(role) == 0)
{
std::cerr << "Missing role " << role << std::endl;
return std::string();
}
auto channel = roles[role]["secret"];
return channel;
}

View File

@ -7,15 +7,21 @@
#pragma once
#include <ixredis/IXRedisClient.h>
#include <future>
#include <thread>
#include <ixwebsocket/IXConnectionState.h>
#include <string>
#include "IXStreamSql.h"
namespace snake
{
class SnakeConnectionState : public ix::ConnectionState
{
public:
virtual ~SnakeConnectionState()
{
stopSubScriptionThread();
}
std::string getNonce()
{
return _nonce;
@ -51,7 +57,24 @@ namespace snake
return _redisClient;
}
std::future<void> fut;
void stopSubScriptionThread()
{
if (subscriptionThread.joinable())
{
subscriptionRedisClient.stop();
subscriptionThread.join();
}
}
// We could make those accessible through methods
std::thread subscriptionThread;
std::string appChannel;
std::string subscriptionId;
uint64_t id;
std::unique_ptr<StreamSql> streamSql;
ix::RedisClient subscriptionRedisClient;
ix::RedisClient::OnRedisSubscribeResponseCallback onRedisSubscribeResponseCallback;
ix::RedisClient::OnRedisSubscribeCallback onRedisSubscribeCallback;
private:
std::string _nonce;

View File

@ -8,7 +8,6 @@
#include "IXAppConfig.h"
#include "IXSnakeConnectionState.h"
#include "IXStreamSql.h"
#include "nlohmann/json.hpp"
#include <iostream>
#include <ixcore/utils/IXCoreLogger.h>
@ -19,21 +18,22 @@
namespace snake
{
void handleError(const std::string& action,
std::shared_ptr<ix::WebSocket> ws,
nlohmann::json pdu,
ix::WebSocket& ws,
uint64_t pduId,
const std::string& errMsg)
{
std::string actionError(action);
actionError += "/error";
nlohmann::json response = {
{"action", actionError}, {"id", pdu.value("id", 1)}, {"body", {{"reason", errMsg}}}};
ws->sendText(response.dump());
{"action", actionError}, {"id", pduId}, {"body", {{"reason", errMsg}}}};
ws.sendText(response.dump());
}
void handleHandshake(std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
const nlohmann::json& pdu)
ix::WebSocket& ws,
const nlohmann::json& pdu,
uint64_t pduId)
{
std::string role = pdu["body"]["data"]["role"];
@ -42,7 +42,7 @@ namespace snake
nlohmann::json response = {
{"action", "auth/handshake/ok"},
{"id", pdu.value("id", 1)},
{"id", pduId},
{"body",
{
{"data", {{"nonce", state->getNonce()}, {"connection_id", state->getId()}}},
@ -50,13 +50,14 @@ namespace snake
auto serializedResponse = response.dump();
ws->sendText(serializedResponse);
ws.sendText(serializedResponse);
}
void handleAuth(std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
ix::WebSocket& ws,
const AppConfig& appConfig,
const nlohmann::json& pdu)
const nlohmann::json& pdu,
uint64_t pduId)
{
auto secret = getRoleSecret(appConfig, state->appkey(), state->role());
@ -64,9 +65,9 @@ namespace snake
{
nlohmann::json response = {
{"action", "auth/authenticate/error"},
{"id", pdu.value("id", 1)},
{"id", pduId},
{"body", {{"error", "authentication_failed"}, {"reason", "invalid secret"}}}};
ws->sendText(response.dump());
ws.sendText(response.dump());
return;
}
@ -80,20 +81,21 @@ namespace snake
{"action", "auth/authenticate/error"},
{"id", pdu.value("id", 1)},
{"body", {{"error", "authentication_failed"}, {"reason", "invalid hash"}}}};
ws->sendText(response.dump());
ws.sendText(response.dump());
return;
}
nlohmann::json response = {
{"action", "auth/authenticate/ok"}, {"id", pdu.value("id", 1)}, {"body", {}}};
ws->sendText(response.dump());
ws.sendText(response.dump());
}
void handlePublish(std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
ix::WebSocket& ws,
const AppConfig& appConfig,
const nlohmann::json& pdu)
const nlohmann::json& pdu,
uint64_t pduId)
{
std::vector<std::string> channels;
@ -113,7 +115,7 @@ namespace snake
{
std::stringstream ss;
ss << "Missing channels or channel field in publish data";
handleError("rtm/publish", ws, pdu, ss.str());
handleError("rtm/publish", ws, pduId, ss.str());
return;
}
@ -133,7 +135,7 @@ namespace snake
{
std::stringstream ss;
ss << "Cannot publish to redis host " << errMsg;
handleError("rtm/publish", ws, pdu, ss.str());
handleError("rtm/publish", ws, pduId, ss.str());
return;
}
}
@ -141,26 +143,27 @@ namespace snake
nlohmann::json response = {
{"action", "rtm/publish/ok"}, {"id", pdu.value("id", 1)}, {"body", {}}};
ws->sendText(response.dump());
ws.sendText(response.dump());
}
//
// FIXME: this is not cancellable. We should be able to cancel the redis subscription
//
void handleRedisSubscription(std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
const AppConfig& appConfig,
const nlohmann::json& pdu)
void handleSubscribe(std::shared_ptr<SnakeConnectionState> state,
ix::WebSocket& ws,
const AppConfig& appConfig,
const nlohmann::json& pdu,
uint64_t pduId)
{
std::string channel = pdu["body"]["channel"];
std::string subscriptionId = channel;
state->subscriptionId = channel;
std::stringstream ss;
ss << state->appkey() << "::" << channel;
std::string appChannel(ss.str());
state->appChannel = ss.str();
ix::RedisClient redisClient;
ix::RedisClient& redisClient = state->subscriptionRedisClient;
int port = appConfig.redisPort;
auto urls = appConfig.redisHosts;
@ -171,7 +174,7 @@ namespace snake
{
std::stringstream ss;
ss << "Cannot connect to redis host " << hostname << ":" << port;
handleError("rtm/subscribe", ws, pdu, ss.str());
handleError("rtm/subscribe", ws, pduId, ss.str());
return;
}
@ -183,7 +186,7 @@ namespace snake
{
std::stringstream ss;
ss << "Cannot authenticated to redis";
handleError("rtm/subscribe", ws, pdu, ss.str());
handleError("rtm/subscribe", ws, pduId, ss.str());
return;
}
}
@ -193,83 +196,80 @@ namespace snake
{
std::string filterStr = pdu["body"]["filter"];
}
std::unique_ptr<StreamSql> streamSql = std::make_unique<StreamSql>(filterStr);
int id = 0;
auto callback = [ws, &id, &subscriptionId, &streamSql](const std::string& messageStr) {
state->streamSql = std::make_unique<StreamSql>(filterStr);
state->id = 0;
state->onRedisSubscribeCallback = [&ws, state](const std::string& messageStr) {
auto msg = nlohmann::json::parse(messageStr);
msg = msg["body"]["message"];
if (streamSql->valid() && !streamSql->match(msg))
if (state->streamSql->valid() && !state->streamSql->match(msg))
{
return;
}
nlohmann::json response = {
{"action", "rtm/subscription/data"},
{"id", id++},
{"id", state->id++},
{"body",
{{"subscription_id", subscriptionId}, {"position", "0-0"}, {"messages", {msg}}}}};
{{"subscription_id", state->subscriptionId}, {"position", "0-0"}, {"messages", {msg}}}}};
ws->sendText(response.dump());
ws.sendText(response.dump());
};
auto responseCallback = [ws, pdu, &subscriptionId](const std::string& redisResponse) {
state->onRedisSubscribeResponseCallback = [&ws, state, pduId](const std::string& redisResponse) {
std::stringstream ss;
ss << "Redis Response: " << redisResponse << "...";
ix::CoreLogger::log(ss.str().c_str());
// Success
nlohmann::json response = {{"action", "rtm/subscribe/ok"},
{"id", pdu.value("id", 1)},
{"body", {{"subscription_id", subscriptionId}}}};
ws->sendText(response.dump());
{"id", pduId},
{"body", {{"subscription_id", state->subscriptionId}}}};
ws.sendText(response.dump());
};
{
std::stringstream ss;
ss << "Subscribing to " << appChannel << "...";
ss << "Subscribing to " << state->appChannel << "...";
ix::CoreLogger::log(ss.str().c_str());
}
if (!redisClient.subscribe(appChannel, responseCallback, callback))
auto subscription = [&redisClient, state, &ws, pduId]
{
std::stringstream ss;
ss << "Error subscribing to channel " << appChannel;
handleError("rtm/subscribe", ws, pdu, ss.str());
return;
}
}
if (!redisClient.subscribe(state->appChannel,
state->onRedisSubscribeResponseCallback,
state->onRedisSubscribeCallback))
{
std::stringstream ss;
ss << "Error subscribing to channel " << state->appChannel;
handleError("rtm/subscribe", ws, pduId, ss.str());
return;
}
};
void handleSubscribe(std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
const AppConfig& appConfig,
const nlohmann::json& pdu)
{
state->fut =
std::async(std::launch::async, handleRedisSubscription, state, ws, appConfig, pdu);
state->subscriptionThread = std::thread(subscription);
}
void handleUnSubscribe(std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
const nlohmann::json& pdu)
ix::WebSocket& ws,
const nlohmann::json& pdu,
uint64_t pduId)
{
// extract subscription_id
auto body = pdu["body"];
auto subscriptionId = body["subscription_id"];
state->redisClient().stop();
state->stopSubScriptionThread();
nlohmann::json response = {{"action", "rtm/unsubscribe/ok"},
{"id", pdu.value("id", 1)},
{"id", pduId},
{"body", {{"subscription_id", subscriptionId}}}};
ws->sendText(response.dump());
ws.sendText(response.dump());
}
void processCobraMessage(std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
ix::WebSocket& ws,
const AppConfig& appConfig,
const std::string& str)
{
@ -284,31 +284,32 @@ namespace snake
ss << "malformed json pdu: " << e.what() << " -> " << str << "";
nlohmann::json response = {{"body", {{"error", "invalid_json"}, {"reason", ss.str()}}}};
ws->sendText(response.dump());
ws.sendText(response.dump());
return;
}
auto action = pdu["action"];
uint64_t pduId = pdu.value("id", 1);
if (action == "auth/handshake")
{
handleHandshake(state, ws, pdu);
handleHandshake(state, ws, pdu, pduId);
}
else if (action == "auth/authenticate")
{
handleAuth(state, ws, appConfig, pdu);
handleAuth(state, ws, appConfig, pdu, pduId);
}
else if (action == "rtm/publish")
{
handlePublish(state, ws, appConfig, pdu);
handlePublish(state, ws, appConfig, pdu, pduId);
}
else if (action == "rtm/subscribe")
{
handleSubscribe(state, ws, appConfig, pdu);
handleSubscribe(state, ws, appConfig, pdu, pduId);
}
else if (action == "rtm/unsubscribe")
{
handleUnSubscribe(state, ws, pdu);
handleUnSubscribe(state, ws, pdu, pduId);
}
else
{

View File

@ -20,7 +20,7 @@ namespace snake
struct AppConfig;
void processCobraMessage(std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
ix::WebSocket& ws,
const AppConfig& appConfig,
const std::string& str);
} // namespace snake

View File

@ -59,68 +59,68 @@ namespace snake
};
_server.setConnectionStateFactory(factory);
_server.setOnConnectionCallback(
[this](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ix::ConnectionState> connectionState,
std::unique_ptr<ix::ConnectionInfo> connectionInfo) {
_server.setOnClientMessageCallback(
[this](std::shared_ptr<ix::ConnectionState> connectionState,
ix::ConnectionInfo& connectionInfo,
ix::WebSocket& webSocket,
const ix::WebSocketMessagePtr& msg) {
auto state = std::dynamic_pointer_cast<SnakeConnectionState>(connectionState);
auto remoteIp = connectionInfo->remoteIp;
auto remoteIp = connectionInfo.remoteIp;
webSocket->setOnMessageCallback(
[this, webSocket, state, remoteIp](const ix::WebSocketMessagePtr& msg) {
std::stringstream ss;
ix::LogLevel logLevel = ix::LogLevel::Debug;
if (msg->type == ix::WebSocketMessageType::Open)
{
ss << "New connection" << std::endl;
ss << "remote ip: " << remoteIp << std::endl;
ss << "id: " << state->getId() << std::endl;
ss << "Uri: " << msg->openInfo.uri << std::endl;
ss << "Headers:" << std::endl;
for (auto it : msg->openInfo.headers)
{
ss << it.first << ": " << it.second << std::endl;
}
std::stringstream ss;
ss << "[" << state->getId() << "] ";
std::string appkey = parseAppKey(msg->openInfo.uri);
state->setAppkey(appkey);
ix::LogLevel logLevel = ix::LogLevel::Debug;
if (msg->type == ix::WebSocketMessageType::Open)
{
ss << "New connection" << std::endl;
ss << "remote ip: " << remoteIp << std::endl;
ss << "id: " << state->getId() << std::endl;
ss << "Uri: " << msg->openInfo.uri << std::endl;
ss << "Headers:" << std::endl;
for (auto it : msg->openInfo.headers)
{
ss << it.first << ": " << it.second << std::endl;
}
// Connect to redis first
if (!state->redisClient().connect(_appConfig.redisHosts[0],
_appConfig.redisPort))
{
ss << "Cannot connect to redis host" << std::endl;
logLevel = ix::LogLevel::Error;
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
ss << "Closed connection"
<< " code " << msg->closeInfo.code << " reason "
<< msg->closeInfo.reason << std::endl;
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
std::stringstream ss;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
logLevel = ix::LogLevel::Error;
}
else if (msg->type == ix::WebSocketMessageType::Fragment)
{
ss << "Received message fragment" << std::endl;
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
ss << "Received " << msg->wireSize << " bytes" << std::endl;
processCobraMessage(state, webSocket, _appConfig, msg->str);
}
std::string appkey = parseAppKey(msg->openInfo.uri);
state->setAppkey(appkey);
ix::CoreLogger::log(ss.str().c_str(), logLevel);
});
});
// Connect to redis first
if (!state->redisClient().connect(_appConfig.redisHosts[0],
_appConfig.redisPort))
{
ss << "Cannot connect to redis host" << std::endl;
logLevel = ix::LogLevel::Error;
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
ss << "Closed connection"
<< " code " << msg->closeInfo.code << " reason "
<< msg->closeInfo.reason << std::endl;
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
std::stringstream ss;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
logLevel = ix::LogLevel::Error;
}
else if (msg->type == ix::WebSocketMessageType::Fragment)
{
ss << "Received message fragment" << std::endl;
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
ss << "Received " << msg->wireSize << " bytes" << " " << msg->str << std::endl;
processCobraMessage(state, webSocket, _appConfig, msg->str);
}
ix::CoreLogger::log(ss.str().c_str(), logLevel);
});
auto res = _server.listen();
if (!res.first)

View File

@ -16,7 +16,10 @@
#include <random>
#include <sstream>
#include <vector>
#ifdef IXWEBSOCKET_USE_ZLIB
#include <zlib.h>
#endif
namespace ix
{
@ -174,11 +177,13 @@ namespace ix
ss << verb << " " << path << " HTTP/1.1\r\n";
ss << "Host: " << host << "\r\n";
#ifdef IXWEBSOCKET_USE_ZLIB
if (args->compress)
{
ss << "Accept-Encoding: gzip"
<< "\r\n";
}
#endif
// Append extra headers
for (auto&& it : args->extraHeaders)
@ -495,6 +500,7 @@ namespace ix
downloadSize = payload.size();
#ifdef IXWEBSOCKET_USE_ZLIB
// If the content was compressed with gzip, decode it
if (headers["Content-Encoding"] == "gzip")
{
@ -513,6 +519,7 @@ namespace ix
}
payload = decompressedPayload;
}
#endif
return std::make_shared<HttpResponse>(code,
description,
@ -672,6 +679,7 @@ namespace ix
return ss.str();
}
#ifdef IXWEBSOCKET_USE_ZLIB
bool HttpClient::gzipInflate(const std::string& in, std::string& out)
{
z_stream inflateState;
@ -716,6 +724,7 @@ namespace ix
inflateEnd(&inflateState);
return true;
}
#endif
void HttpClient::log(const std::string& msg, HttpRequestArgsPtr args)
{

View File

@ -90,7 +90,9 @@ namespace ix
private:
void log(const std::string& msg, HttpRequestArgsPtr args);
#ifdef IXWEBSOCKET_USE_ZLIB
bool gzipInflate(const std::string& in, std::string& out);
#endif
// Async API background thread runner
void run();

View File

@ -13,7 +13,10 @@
#include <fstream>
#include <sstream>
#include <vector>
#ifdef IXWEBSOCKET_USE_ZLIB
#include <zlib.h>
#endif
namespace
{
@ -41,6 +44,7 @@ namespace
return std::make_pair(res.first, std::string(vec.begin(), vec.end()));
}
#ifdef IXWEBSOCKET_USE_ZLIB
std::string gzipCompress(const std::string& str)
{
z_stream zs; // z_stream is zlib's control structure
@ -83,6 +87,7 @@ namespace
return outstring;
}
#endif
} // namespace
namespace ix
@ -125,9 +130,8 @@ namespace ix
if (std::get<0>(ret))
{
auto response = _onConnectionCallback(std::get<2>(ret),
connectionState,
std::move(connectionInfo));
auto response =
_onConnectionCallback(std::get<2>(ret), connectionState, std::move(connectionInfo));
if (!Http::sendResponse(response, socket))
{
logError("Cannot send response");
@ -169,12 +173,14 @@ namespace ix
std::string content = res.second;
#ifdef IXWEBSOCKET_USE_ZLIB
std::string acceptEncoding = request->headers["Accept-encoding"];
if (acceptEncoding == "*" || acceptEncoding.find("gzip") != std::string::npos)
{
content = gzipCompress(content);
headers["Content-Encoding"] = "gzip";
}
#endif
// Log request
std::stringstream ss;
@ -203,10 +209,9 @@ namespace ix
// See https://developer.mozilla.org/en-US/docs/Web/HTTP/Redirections
//
setOnConnectionCallback(
[this,
redirectUrl](HttpRequestPtr request,
std::shared_ptr<ConnectionState> /*connectionState*/,
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr {
[this, redirectUrl](HttpRequestPtr request,
std::shared_ptr<ConnectionState> /*connectionState*/,
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr {
WebSocketHttpHeaders headers;
headers["Server"] = userAgent();

View File

@ -5,8 +5,10 @@
*/
//
// On macOS we use UNIX pipes to wake up select.
// On UNIX we use pipes to wake up select. There is no way to do that
// on Windows so this file is compiled out on Windows.
//
#ifndef _WIN32
#include "IXSelectInterruptPipe.h"
@ -144,3 +146,5 @@ namespace ix
return _fildes[kPipeReadIndex];
}
} // namespace ix
#endif // !_WIN32

View File

@ -0,0 +1,81 @@
/*
* IXSetThreadName.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 2020 Machine Zone, Inc. All rights reserved.
*/
#include "IXSetThreadName.h"
// unix systems
#if defined(__APPLE__) || defined(__linux__) || defined(BSD)
#include <pthread.h>
#endif
// freebsd needs this header as well
#if defined(BSD)
#include <pthread_np.h>
#endif
// Windows
#ifdef _WIN32
#include <Windows.h>
#endif
namespace ix
{
#ifdef _WIN32
const DWORD MS_VC_EXCEPTION = 0x406D1388;
#pragma pack(push, 8)
typedef struct tagTHREADNAME_INFO
{
DWORD dwType; // Must be 0x1000.
LPCSTR szName; // Pointer to name (in user addr space).
DWORD dwThreadID; // Thread ID (-1=caller thread).
DWORD dwFlags; // Reserved for future use, must be zero.
} THREADNAME_INFO;
#pragma pack(pop)
void SetThreadName(DWORD dwThreadID, const char* threadName)
{
THREADNAME_INFO info;
info.dwType = 0x1000;
info.szName = threadName;
info.dwThreadID = dwThreadID;
info.dwFlags = 0;
__try
{
RaiseException(
MS_VC_EXCEPTION, 0, sizeof(info) / sizeof(ULONG_PTR), (ULONG_PTR*) &info);
}
__except (EXCEPTION_EXECUTE_HANDLER)
{
}
}
#endif
void setThreadName(const std::string& name)
{
#if defined(__APPLE__)
//
// Apple reserves 16 bytes for its thread names
// Notice that the Apple version of pthread_setname_np
// does not take a pthread_t argument
//
pthread_setname_np(name.substr(0, 63).c_str());
#elif defined(__linux__)
//
// Linux only reserves 16 bytes for its thread names
// See prctl and PR_SET_NAME property in
// http://man7.org/linux/man-pages/man2/prctl.2.html
//
pthread_setname_np(pthread_self(), name.substr(0, 15).c_str());
#elif defined(_WIN32)
SetThreadName(-1, name.c_str());
#elif defined(BSD)
pthread_set_name_np(pthread_self(), name.substr(0, 15).c_str());
#else
// ... assert here ?
#endif
}
} // namespace ix

View File

@ -22,7 +22,7 @@ namespace ix
const int SocketServer::kDefaultPort(8080);
const std::string SocketServer::kDefaultHost("127.0.0.1");
const int SocketServer::kDefaultTcpBacklog(5);
const size_t SocketServer::kDefaultMaxConnections(32);
const size_t SocketServer::kDefaultMaxConnections(128);
const int SocketServer::kDefaultAddressFamily(AF_INET);
SocketServer::SocketServer(
@ -379,10 +379,13 @@ namespace ix
// Launch the handleConnection work asynchronously in its own thread.
std::lock_guard<std::mutex> lock(_connectionsThreadsMutex);
_connectionsThreads.push_back(std::make_pair(
connectionState,
std::thread(
&SocketServer::handleConnection, this, std::move(socket), connectionState, std::move(connectionInfo))));
_connectionsThreads.push_back(
std::make_pair(connectionState,
std::thread(&SocketServer::handleConnection,
this,
std::move(socket),
connectionState,
std::move(connectionInfo))));
}
}

View File

@ -8,7 +8,9 @@
#include "IXWebSocketVersion.h"
#include <sstream>
#ifdef IXWEBSOCKET_USE_ZLIB
#include <zlib.h>
#endif
// Platform name
#if defined(_WIN32)
@ -77,8 +79,10 @@ namespace ix
ss << " nossl";
#endif
#ifdef IXWEBSOCKET_USE_ZLIB
// Zlib version
ss << " zlib " << ZLIB_VERSION;
#endif
return ss.str();
}

View File

@ -46,6 +46,7 @@ namespace ix
WebSocket::~WebSocket()
{
stop();
_ws.setOnCloseCallback(nullptr);
}
void WebSocket::setUrl(const std::string& url)
@ -404,6 +405,11 @@ namespace ix
_onMessageCallback = callback;
}
bool WebSocket::isOnMessageCallbackRegistered() const
{
return _onMessageCallback != nullptr;
}
void WebSocket::setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback)
{
_onTrafficTrackerCallback = callback;

View File

@ -84,6 +84,7 @@ namespace ix
const std::string& reason = WebSocketCloseConstants::kNormalClosureMessage);
void setOnMessageCallback(const OnMessageCallback& callback);
bool isOnMessageCallbackRegistered() const;
static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback);
static void resetTrafficTrackerCallback();

View File

@ -28,21 +28,26 @@ namespace ix
WebSocketPerMessageDeflateCompressor::WebSocketPerMessageDeflateCompressor()
: _compressBufferSize(kBufferSize)
{
#ifdef IXWEBSOCKET_USE_ZLIB
memset(&_deflateState, 0, sizeof(_deflateState));
_deflateState.zalloc = Z_NULL;
_deflateState.zfree = Z_NULL;
_deflateState.opaque = Z_NULL;
#endif
}
WebSocketPerMessageDeflateCompressor::~WebSocketPerMessageDeflateCompressor()
{
#ifdef IXWEBSOCKET_USE_ZLIB
deflateEnd(&_deflateState);
#endif
}
bool WebSocketPerMessageDeflateCompressor::init(uint8_t deflateBits,
bool clientNoContextTakeOver)
{
#ifdef IXWEBSOCKET_USE_ZLIB
int ret = deflateInit2(&_deflateState,
Z_DEFAULT_COMPRESSION,
Z_DEFLATED,
@ -57,6 +62,9 @@ namespace ix
_flush = (clientNoContextTakeOver) ? Z_FULL_FLUSH : Z_SYNC_FLUSH;
return true;
#else
return false;
#endif
}
template<typename T>
@ -96,6 +104,7 @@ namespace ix
template<typename T, typename S>
bool WebSocketPerMessageDeflateCompressor::compressData(const T& in, S& out)
{
#ifdef IXWEBSOCKET_USE_ZLIB
//
// 7.2.1. Compression
//
@ -152,6 +161,9 @@ namespace ix
}
return true;
#else
return false;
#endif
}
//
@ -160,6 +172,7 @@ namespace ix
WebSocketPerMessageDeflateDecompressor::WebSocketPerMessageDeflateDecompressor()
: _compressBufferSize(kBufferSize)
{
#ifdef IXWEBSOCKET_USE_ZLIB
memset(&_inflateState, 0, sizeof(_inflateState));
_inflateState.zalloc = Z_NULL;
@ -167,16 +180,20 @@ namespace ix
_inflateState.opaque = Z_NULL;
_inflateState.avail_in = 0;
_inflateState.next_in = Z_NULL;
#endif
}
WebSocketPerMessageDeflateDecompressor::~WebSocketPerMessageDeflateDecompressor()
{
#ifdef IXWEBSOCKET_USE_ZLIB
inflateEnd(&_inflateState);
#endif
}
bool WebSocketPerMessageDeflateDecompressor::init(uint8_t inflateBits,
bool clientNoContextTakeOver)
{
#ifdef IXWEBSOCKET_USE_ZLIB
int ret = inflateInit2(&_inflateState, -1 * inflateBits);
if (ret != Z_OK) return false;
@ -186,10 +203,14 @@ namespace ix
_flush = (clientNoContextTakeOver) ? Z_FULL_FLUSH : Z_SYNC_FLUSH;
return true;
#else
return false;
#endif
}
bool WebSocketPerMessageDeflateDecompressor::decompress(const std::string& in, std::string& out)
{
#ifdef IXWEBSOCKET_USE_ZLIB
//
// 7.2.2. Decompression
//
@ -226,5 +247,8 @@ namespace ix
} while (_inflateState.avail_out == 0);
return true;
#else
return false;
#endif
}
} // namespace ix

View File

@ -6,7 +6,9 @@
#pragma once
#ifdef IXWEBSOCKET_USE_ZLIB
#include "zlib.h"
#endif
#include <memory>
#include <string>
#include <vector>
@ -34,7 +36,10 @@ namespace ix
int _flush;
size_t _compressBufferSize;
std::unique_ptr<unsigned char[]> _compressBuffer;
#ifdef IXWEBSOCKET_USE_ZLIB
z_stream _deflateState;
#endif
};
class WebSocketPerMessageDeflateDecompressor
@ -50,7 +55,10 @@ namespace ix
int _flush;
size_t _compressBufferSize;
std::unique_ptr<unsigned char[]> _compressBuffer;
#ifdef IXWEBSOCKET_USE_ZLIB
z_stream _inflateState;
#endif
};
} // namespace ix

View File

@ -61,6 +61,7 @@ namespace ix
_clientMaxWindowBits = kDefaultClientMaxWindowBits;
_serverMaxWindowBits = kDefaultServerMaxWindowBits;
#ifdef IXWEBSOCKET_USE_ZLIB
// Split by ;
std::string token;
std::stringstream tokenStream(extension);
@ -112,6 +113,7 @@ namespace ix
sanitizeClientMaxWindowBits();
}
}
#endif
}
void WebSocketPerMessageDeflateOptions::sanitizeClientMaxWindowBits()
@ -126,6 +128,7 @@ namespace ix
std::string WebSocketPerMessageDeflateOptions::generateHeader()
{
#ifdef IXWEBSOCKET_USE_ZLIB
std::stringstream ss;
ss << "Sec-WebSocket-Extensions: permessage-deflate";
@ -138,11 +141,18 @@ namespace ix
ss << "\r\n";
return ss.str();
#else
return std::string();
#endif
}
bool WebSocketPerMessageDeflateOptions::enabled() const
{
#ifdef IXWEBSOCKET_USE_ZLIB
return _enabled;
#else
return false;
#endif
}
bool WebSocketPerMessageDeflateOptions::getClientNoContextTakeover() const

View File

@ -0,0 +1,123 @@
/*
* IXWebSocketProxyServer.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include "IXWebSocketProxyServer.h"
#include "IXWebSocketServer.h"
#include <sstream>
namespace ix
{
class ProxyConnectionState : public ix::ConnectionState
{
public:
ProxyConnectionState()
: _connected(false)
{
}
ix::WebSocket& webSocket()
{
return _serverWebSocket;
}
bool isConnected()
{
return _connected;
}
void setConnected()
{
_connected = true;
}
private:
ix::WebSocket _serverWebSocket;
bool _connected;
};
int websocket_proxy_server_main(int port,
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions,
const std::string& remoteUrl,
bool /*verbose*/)
{
ix::WebSocketServer server(port, hostname);
server.setTLSOptions(tlsOptions);
auto factory = []() -> std::shared_ptr<ix::ConnectionState> {
return std::make_shared<ProxyConnectionState>();
};
server.setConnectionStateFactory(factory);
server.setOnConnectionCallback([remoteUrl](std::weak_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) {
auto state = std::dynamic_pointer_cast<ProxyConnectionState>(connectionState);
auto remoteIp = connectionInfo->remoteIp;
// Server connection
state->webSocket().setOnMessageCallback(
[webSocket, state, remoteIp](const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Close)
{
state->setTerminated();
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
auto ws = webSocket.lock();
if (ws)
{
ws->send(msg->str, msg->binary);
}
}
});
// Client connection
auto ws = webSocket.lock();
if (ws)
{
ws->setOnMessageCallback([state, remoteUrl](const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
// Connect to the 'real' server
std::string url(remoteUrl);
url += msg->openInfo.uri;
state->webSocket().setUrl(url);
state->webSocket().disableAutomaticReconnection();
state->webSocket().start();
// we should sleep here for a bit until we've established the
// connection with the remote server
while (state->webSocket().getReadyState() != ReadyState::Open)
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
state->webSocket().close(msg->closeInfo.code, msg->closeInfo.reason);
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
state->webSocket().send(msg->str, msg->binary);
}
});
}
});
auto res = server.listen();
if (!res.first)
{
return 1;
}
server.start();
server.wait();
return 0;
}
} // namespace ix

View File

@ -0,0 +1,20 @@
/*
* IXWebSocketProxyServer.h
* Author: Benjamin Sergeant
* Copyright (c) 2019-2020 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include "IXSocketTLSOptions.h"
#include <cstdint>
#include <stddef.h>
#include <string>
namespace ix
{
int websocket_proxy_server_main(int port,
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions,
const std::string& remoteUrl,
bool verbose);
} // namespace ix

View File

@ -71,6 +71,11 @@ namespace ix
_onConnectionCallback = callback;
}
void WebSocketServer::setOnClientMessageCallback(const OnClientMessageCallback& callback)
{
_onClientMessageCallback = callback;
}
void WebSocketServer::handleConnection(std::unique_ptr<Socket> socket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo)
@ -78,7 +83,35 @@ namespace ix
setThreadName("WebSocketServer::" + connectionState->getId());
auto webSocket = std::make_shared<WebSocket>();
_onConnectionCallback(webSocket, connectionState, std::move(connectionInfo));
if (_onConnectionCallback)
{
_onConnectionCallback(webSocket, connectionState, std::move(connectionInfo));
if (!webSocket->isOnMessageCallbackRegistered())
{
logError("WebSocketServer Application developer error: Server callback improperly "
"registerered.");
logError("Missing call to setOnMessageCallback inside setOnConnectionCallback.");
connectionState->setTerminated();
return;
}
}
else if (_onClientMessageCallback)
{
webSocket->setOnMessageCallback(
[this, &ws = *webSocket.get(), connectionState, &ci = *connectionInfo.get()](
const WebSocketMessagePtr& msg) {
_onClientMessageCallback(connectionState, ci, ws, msg);
});
}
else
{
logError(
"WebSocketServer Application developer error: No server callback is registerered.");
logError("Missing call to setOnConnectionCallback or setOnClientMessageCallback.");
connectionState->setTerminated();
return;
}
webSocket->disableAutomaticReconnection();
@ -112,6 +145,8 @@ namespace ix
logError(ss.str());
}
webSocket->setOnMessageCallback(nullptr);
// Remove this client from our client set
{
std::lock_guard<std::mutex> lock(_clientsMutex);

View File

@ -23,9 +23,15 @@ namespace ix
{
public:
using OnConnectionCallback =
std::function<void(std::shared_ptr<WebSocket>, std::shared_ptr<ConnectionState>,
std::function<void(std::weak_ptr<WebSocket>,
std::shared_ptr<ConnectionState>,
std::unique_ptr<ConnectionInfo> connectionInfo)>;
using OnClientMessageCallback = std::function<void(std::shared_ptr<ConnectionState>,
ConnectionInfo&,
WebSocket&,
const WebSocketMessagePtr&)>;
WebSocketServer(int port = SocketServer::kDefaultPort,
const std::string& host = SocketServer::kDefaultHost,
int backlog = SocketServer::kDefaultTcpBacklog,
@ -40,6 +46,7 @@ namespace ix
void disablePerMessageDeflate();
void setOnConnectionCallback(const OnConnectionCallback& callback);
void setOnClientMessageCallback(const OnClientMessageCallback& callback);
// Get all the connected clients
std::set<std::shared_ptr<WebSocket>> getClients();
@ -53,6 +60,7 @@ namespace ix
bool _enablePerMessageDeflate;
OnConnectionCallback _onConnectionCallback;
OnClientMessageCallback _onClientMessageCallback;
std::mutex _clientsMutex;
std::set<std::shared_ptr<WebSocket>> _clients;

View File

@ -65,7 +65,6 @@ namespace ix
, _receivedMessageCompressed(false)
, _readyState(ReadyState::CLOSED)
, _closeCode(WebSocketCloseConstants::kInternalErrorCode)
, _closeReason(WebSocketCloseConstants::kInternalErrorMessage)
, _closeWireSize(0)
, _closeRemote(false)
, _enablePerMessageDeflate(false)
@ -77,6 +76,7 @@ namespace ix
, _pingCount(0)
, _lastSendPingTimePoint(std::chrono::steady_clock::now())
{
setCloseReason(WebSocketCloseConstants::kInternalErrorMessage);
_readbuf.resize(kChunkSize);
}
@ -179,10 +179,12 @@ namespace ix
if (readyState == ReadyState::CLOSED)
{
std::lock_guard<std::mutex> lock(_closeDataMutex);
_onCloseCallback(_closeCode, _closeReason, _closeWireSize, _closeRemote);
if (_onCloseCallback)
{
_onCloseCallback(_closeCode, getCloseReason(), _closeWireSize, _closeRemote);
}
setCloseReason(WebSocketCloseConstants::kInternalErrorMessage);
_closeCode = WebSocketCloseConstants::kInternalErrorCode;
_closeReason = WebSocketCloseConstants::kInternalErrorMessage;
_closeWireSize = 0;
_closeRemote = false;
}
@ -261,9 +263,10 @@ namespace ix
{
// compute lasting delay to wait for next ping / timeout, if at least one set
auto now = std::chrono::steady_clock::now();
lastingTimeoutDelayInMs = (int) std::chrono::duration_cast<std::chrono::milliseconds>(
int timeSinceLastPingMs = (int) std::chrono::duration_cast<std::chrono::milliseconds>(
now - _lastSendPingTimePoint)
.count();
lastingTimeoutDelayInMs = (1000 * _pingIntervalSecs) - timeSinceLastPingMs;
}
#ifdef _WIN32
@ -639,11 +642,7 @@ namespace ix
{
// we got the CLOSE frame answer from our close, so we can close the connection
// if the code/reason are the same
bool identicalReason;
{
std::lock_guard<std::mutex> lock(_closeDataMutex);
identicalReason = _closeCode == code && _closeReason == reason;
}
bool identicalReason = _closeCode == code && getCloseReason() == reason;
if (identicalReason)
{
@ -797,6 +796,11 @@ namespace ix
if (wireSize < kChunkSize)
{
success = sendFragment(type, true, message_begin, message_end, compress);
if (onProgressCallback)
{
onProgressCallback(0, 1);
}
}
else
{
@ -1081,13 +1085,10 @@ namespace ix
{
closeSocket();
{
std::lock_guard<std::mutex> lock(_closeDataMutex);
_closeCode = code;
_closeReason = reason;
_closeWireSize = closeWireSize;
_closeRemote = remote;
}
setCloseReason(reason);
_closeCode = code;
_closeWireSize = closeWireSize;
_closeRemote = remote;
setReadyState(ReadyState::CLOSED);
_requestInitCancellation = false;
@ -1107,13 +1108,11 @@ namespace ix
closeWireSize = reason.size();
}
{
std::lock_guard<std::mutex> lock(_closeDataMutex);
_closeCode = code;
_closeReason = reason;
_closeWireSize = closeWireSize;
_closeRemote = remote;
}
setCloseReason(reason);
_closeCode = code;
_closeWireSize = closeWireSize;
_closeRemote = remote;
{
std::lock_guard<std::mutex> lock(_closingTimePointMutex);
_closingTimePoint = std::chrono::steady_clock::now();
@ -1158,4 +1157,15 @@ namespace ix
return true;
}
void WebSocketTransport::setCloseReason(const std::string& reason)
{
std::lock_guard<std::mutex> lock(_closeReasonMutex);
_closeReason = reason;
}
const std::string& WebSocketTransport::getCloseReason() const
{
std::lock_guard<std::mutex> lock(_closeReasonMutex);
return _closeReason;
}
} // namespace ix

View File

@ -178,11 +178,11 @@ namespace ix
std::atomic<ReadyState> _readyState;
OnCloseCallback _onCloseCallback;
uint16_t _closeCode;
std::string _closeReason;
size_t _closeWireSize;
bool _closeRemote;
mutable std::mutex _closeDataMutex;
mutable std::mutex _closeReasonMutex;
std::atomic<uint16_t> _closeCode;
std::atomic<size_t> _closeWireSize;
std::atomic<bool> _closeRemote;
// Data used for Per Message Deflate compression (with zlib)
WebSocketPerMessageDeflatePtr _perMessageDeflate;
@ -267,5 +267,8 @@ namespace ix
void unmaskReceiveBuffer(const wsheader_type& ws);
std::string getMergedChunks() const;
void setCloseReason(const std::string& reason);
const std::string& getCloseReason() const;
};
} // namespace ix

View File

@ -6,4 +6,4 @@
#pragma once
#define IX_WEBSOCKET_VERSION "9.9.0"
#define IX_WEBSOCKET_VERSION "10.1.7"

View File

@ -1,20 +0,0 @@
/*
* IXSetThreadName_apple.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include "../IXSetThreadName.h"
#include <pthread.h>
namespace ix
{
void setThreadName(const std::string& name)
{
//
// Apple reserves 16 bytes for its thread names
// Notice that the Apple version of pthread_setname_np
// does not take a pthread_t argument
//
pthread_setname_np(name.substr(0, 63).c_str());
}
} // namespace ix

View File

@ -1,16 +0,0 @@
/*
* IXSetThreadName_freebsd.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "../IXSetThreadName.h"
#include <pthread.h>
#include <pthread_np.h>
namespace ix
{
void setThreadName(const std::string& name)
{
pthread_set_name_np(pthread_self(), name.substr(0, 15).c_str());
}
} // namespace ix

View File

@ -1,20 +0,0 @@
/*
* IXSetThreadName_linux.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include "../IXSetThreadName.h"
#include <pthread.h>
namespace ix
{
void setThreadName(const std::string& name)
{
//
// Linux only reserves 16 bytes for its thread names
// See prctl and PR_SET_NAME property in
// http://man7.org/linux/man-pages/man2/prctl.2.html
//
pthread_setname_np(pthread_self(), name.substr(0, 15).c_str());
}
} // namespace ix

View File

@ -1,46 +0,0 @@
/*
* IXSetThreadName_windows.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "../IXSetThreadName.h"
#include <Windows.h>
namespace ix
{
const DWORD MS_VC_EXCEPTION = 0x406D1388;
#pragma pack(push, 8)
typedef struct tagTHREADNAME_INFO
{
DWORD dwType; // Must be 0x1000.
LPCSTR szName; // Pointer to name (in user addr space).
DWORD dwThreadID; // Thread ID (-1=caller thread).
DWORD dwFlags; // Reserved for future use, must be zero.
} THREADNAME_INFO;
#pragma pack(pop)
void SetThreadName(DWORD dwThreadID, const char* threadName)
{
THREADNAME_INFO info;
info.dwType = 0x1000;
info.szName = threadName;
info.dwThreadID = dwThreadID;
info.dwFlags = 0;
__try
{
RaiseException(
MS_VC_EXCEPTION, 0, sizeof(info) / sizeof(ULONG_PTR), (ULONG_PTR*) &info);
}
__except (EXCEPTION_EXECUTE_HANDLER)
{
}
}
void setThreadName(const std::string& name)
{
SetThreadName(-1, name.c_str());
}
} // namespace ix

View File

@ -34,7 +34,10 @@ ws:
mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 .. && ninja install)
ws_install:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=MinSizeRel -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 .. && make -j 4 install)
mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=MinSizeRel -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 .. -DUSE_TEST=0 && ninja install)
ws_install_release:
mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=MinSizeRel -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 .. -DUSE_TEST=0 && ninja install)
ws_openssl:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 -DUSE_OPEN_SSL=1 .. ; make -j 4)
@ -174,7 +177,7 @@ autobahn_report:
cp -rvf ~/sandbox/reports/clients/* ../bsergean.github.io/IXWebSocket/autobahn/
httpd:
clang++ --std=c++14 --stdlib=libc++ httpd.cpp \
clang++ --std=c++14 --stdlib=libc++ -o ixhttpd httpd.cpp \
ixwebsocket/IXSelectInterruptFactory.cpp \
ixwebsocket/IXCancellationRequest.cpp \
ixwebsocket/IXSocketTLSOptions.cpp \
@ -193,11 +196,11 @@ httpd:
ixwebsocket/IXConnectionState.cpp \
ixwebsocket/IXUrlParser.cpp \
ixwebsocket/IXSelectInterrupt.cpp \
ixwebsocket/apple/IXSetThreadName_apple.cpp \
ixwebsocket/IXSetThreadName.cpp \
-lz
httpd_linux:
g++ --std=c++11 -o ixhttpd httpd.cpp -Iixwebsocket \
g++ --std=c++14 -o ixhttpd httpd.cpp -Iixwebsocket \
ixwebsocket/IXSelectInterruptFactory.cpp \
ixwebsocket/IXCancellationRequest.cpp \
ixwebsocket/IXSocketTLSOptions.cpp \
@ -216,7 +219,7 @@ httpd_linux:
ixwebsocket/IXConnectionState.cpp \
ixwebsocket/IXUrlParser.cpp \
ixwebsocket/IXSelectInterrupt.cpp \
ixwebsocket/linux/IXSetThreadName_linux.cpp \
ixwebsocket/IXSetThreadName.cpp \
-lz -lpthread
cp -f ixhttpd /usr/local/bin
@ -238,9 +241,12 @@ install_cmake_for_linux:
doc:
mkdocs gh-deploy
change:
change: format
vim ixwebsocket/IXWebSocketVersion.h docs/CHANGELOG.md
commit:
git commit -am "`sh tools/extract_latest_change.sh`"
.PHONY: test
.PHONY: build
.PHONY: ws

View File

@ -37,7 +37,6 @@ set (SOURCES
test_runner.cpp
IXTest.cpp
IXGetFreePort.cpp
../third_party/msgpack11/msgpack11.cpp
IXSocketTest.cpp

View File

@ -108,7 +108,7 @@ namespace
}
else if (event->type == ix::CobraEventType::UnSubscribed)
{
TLogger() << "Subscriber: ununexpected from channel " << event->subscriptionId;
TLogger() << "Subscriber: unsubscribed from channel " << event->subscriptionId;
if (event->subscriptionId != channel)
{
TLogger() << "Subscriber: unexpected channel " << event->subscriptionId;

View File

@ -4,9 +4,9 @@
* Copyright (c) 2019 Machine Zone. All rights reserved.
*/
#include "IXGetFreePort.h"
#include "catch.hpp"
#include <iostream>
#include <ixwebsocket/IXGetFreePort.h>
#include <ixwebsocket/IXHttpClient.h>
#include <ixwebsocket/IXHttpServer.h>

View File

@ -84,39 +84,38 @@ namespace ix
bool startWebSocketEchoServer(ix::WebSocketServer& server)
{
server.setOnConnectionCallback([&server](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) {
auto remoteIp = connectionInfo->remoteIp;
webSocket->setOnMessageCallback(
[webSocket, connectionState, remoteIp, &server](const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
server.setOnClientMessageCallback(
[&server](std::shared_ptr<ConnectionState> /*connectionState*/,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const ix::WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
if (msg->type == ix::WebSocketMessageType::Open)
{
TLogger() << "New connection";
TLogger() << "Remote ip: " << remoteIp;
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";
for (auto it : msg->openInfo.headers)
{
TLogger() << "New connection";
TLogger() << "Remote ip: " << remoteIp;
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";
for (auto it : msg->openInfo.headers)
TLogger() << it.first << ": " << it.second;
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
TLogger() << "Closed connection";
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
for (auto&& client : server.getClients())
{
if (client.get() != &webSocket)
{
TLogger() << it.first << ": " << it.second;
client->send(msg->str, msg->binary);
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
TLogger() << "Closed connection";
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
for (auto&& client : server.getClients())
{
if (client != webSocket)
{
client->send(msg->str, msg->binary);
}
}
}
});
});
}
});
auto res = server.listen();
if (!res.first)

View File

@ -6,9 +6,9 @@
#pragma once
#include "IXGetFreePort.h"
#include <iostream>
#include <ixsnake/IXAppConfig.h>
#include <ixwebsocket/IXGetFreePort.h>
#include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocketServer.h>
#include <mutex>

View File

@ -189,13 +189,14 @@ namespace
bool preferTLS = true;
server.setTLSOptions(makeServerTLSOptions(preferTLS));
server.setOnConnectionCallback([&server, &connectionId](
std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) {
auto remoteIp = connectionInfo->remoteIp;
webSocket->setOnMessageCallback([webSocket, connectionState, remoteIp, &connectionId, &server](
const ix::WebSocketMessagePtr& msg) {
server.setOnClientMessageCallback(
[&server, &connectionId](std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const ix::WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
if (msg->type == ix::WebSocketMessageType::Open)
{
TLogger() << "New connection";
@ -219,14 +220,13 @@ namespace
{
for (auto&& client : server.getClients())
{
if (client != webSocket)
if (client.get() != &webSocket)
{
client->send(msg->str, msg->binary);
}
}
}
});
});
auto res = server.listen();
if (!res.first)

View File

@ -193,41 +193,39 @@ namespace
bool startServer(ix::WebSocketServer& server)
{
server.setOnConnectionCallback([&server](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) {
auto remoteIp = connectionInfo->remoteIp;
webSocket->setOnMessageCallback(
[webSocket, connectionState, remoteIp, &server](const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
server.setOnClientMessageCallback(
[&server](std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const ix::WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
if (msg->type == ix::WebSocketMessageType::Open)
{
TLogger() << "New connection";
TLogger() << "remote ip: " << remoteIp;
TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";
for (auto it : msg->openInfo.headers)
{
TLogger() << "New connection";
TLogger() << "remote ip: " << remoteIp;
TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";
for (auto it : msg->openInfo.headers)
TLogger() << it.first << ": " << it.second;
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
log("Closed connection");
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
for (auto&& client : server.getClients())
{
if (client.get() != &webSocket)
{
TLogger() << it.first << ": " << it.second;
client->sendBinary(msg->str);
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
log("Closed connection");
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
for (auto&& client : server.getClients())
{
if (client != webSocket)
{
client->sendBinary(msg->str);
}
}
}
});
});
}
});
auto res = server.listen();
if (!res.first)

View File

@ -168,45 +168,38 @@ namespace
std::mutex& mutexWrite)
{
// A dev/null server
server.setOnConnectionCallback(
server.setOnClientMessageCallback(
[&receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite](
std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) {
auto remoteIp = connectionInfo->remoteIp;
webSocket->setOnMessageCallback([webSocket,
connectionState,
remoteIp,
&receivedCloseCode,
&receivedCloseReason,
&receivedCloseRemote,
&mutexWrite](const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
ConnectionInfo& connectionInfo,
WebSocket& /*webSocket*/,
const ix::WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
if (msg->type == ix::WebSocketMessageType::Open)
{
TLogger() << "New server connection";
TLogger() << "remote ip: " << remoteIp;
TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";
for (auto it : msg->openInfo.headers)
{
TLogger() << "New server connection";
TLogger() << "remote ip: " << remoteIp;
TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";
for (auto it : msg->openInfo.headers)
{
TLogger() << it.first << ": " << it.second;
}
TLogger() << it.first << ": " << it.second;
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
std::stringstream ss;
ss << "Server closed connection(" << msg->closeInfo.code << ","
<< msg->closeInfo.reason << ")";
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
std::stringstream ss;
ss << "Server closed connection(" << msg->closeInfo.code << ","
<< msg->closeInfo.reason << ")";
log(ss.str());
std::lock_guard<std::mutex> lck(mutexWrite);
std::lock_guard<std::mutex> lck(mutexWrite);
receivedCloseCode = msg->closeInfo.code;
receivedCloseReason = std::string(msg->closeInfo.reason);
receivedCloseRemote = msg->closeInfo.remote;
}
});
receivedCloseCode = msg->closeInfo.code;
receivedCloseReason = std::string(msg->closeInfo.reason);
receivedCloseRemote = msg->closeInfo.remote;
}
});
auto res = server.listen();

View File

@ -5,13 +5,11 @@
*/
#include "IXTest.h"
#include "catch.hpp"
#include <memory>
#include <sstream>
#include <ixwebsocket/IXWebSocket.h>
#include <ixwebsocket/IXWebSocketServer.h>
#include <memory>
#include <sstream>
using namespace ix;
@ -69,8 +67,7 @@ namespace
std::stringstream ss;
log(std::string("Connecting to url: ") + url);
_webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg)
{
_webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) {
std::stringstream ss;
if (msg->type == ix::WebSocketMessageType::Open)
{
@ -118,34 +115,37 @@ TEST_CASE("Websocket leak test")
int port = getFreePort();
WebSocketServer server(port);
server.setOnConnectionCallback([&webSocketPtr](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo)
{
// original ptr in WebSocketServer::handleConnection and the callback argument
REQUIRE(webSocket.use_count() == 2);
webSocket->setOnMessageCallback([&webSocketPtr, webSocket, connectionState](const ix::WebSocketMessagePtr& msg)
{
if (msg->type == ix::WebSocketMessageType::Open)
{
log(std::string("New connection id: ") + connectionState->getId());
// original ptr in WebSocketServer::handleConnection, captured ptr of this callback, and ptr in WebSocketServer::_clients
REQUIRE(webSocket.use_count() == 3);
webSocketPtr = std::shared_ptr<WebSocket>(webSocket);
REQUIRE(webSocket.use_count() == 4);
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
log(std::string("Client closed connection id: ") + connectionState->getId());
}
else
{
log(std::string(msg->str));
}
server.setOnConnectionCallback(
[&webSocketPtr](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) {
// original ptr in WebSocketServer::handleConnection and the callback argument
REQUIRE(webSocket.use_count() == 2);
webSocket->setOnMessageCallback([&webSocketPtr, webSocket, connectionState](
const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
log(std::string("New connection id: ") + connectionState->getId());
// original ptr in WebSocketServer::handleConnection, captured ptr of
// this callback, and ptr in WebSocketServer::_clients
REQUIRE(webSocket.use_count() == 3);
webSocketPtr = std::shared_ptr<WebSocket>(webSocket);
REQUIRE(webSocket.use_count() == 4);
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
log(std::string("Client closed connection id: ") +
connectionState->getId());
}
else
{
log(std::string(msg->str));
}
});
// original ptr in WebSocketServer::handleConnection, argument of this callback,
// and captured ptr in websocket callback
REQUIRE(webSocket.use_count() == 3);
});
// original ptr in WebSocketServer::handleConnection, argument of this callback, and captured ptr in websocket callback
REQUIRE(webSocket.use_count() == 3);
});
server.listen();
server.start();
@ -169,7 +169,8 @@ TEST_CASE("Websocket leak test")
ix::msleep(500);
REQUIRE(server.getClients().size() == 0);
// websocket should only be referenced by webSocketPtr but is still used by the websocket callback
// websocket should only be referenced by webSocketPtr but is still used by the
// websocket callback
REQUIRE(webSocketPtr.use_count() == 1);
webSocketPtr->setOnMessageCallback(nullptr);
// websocket should only be referenced by webSocketPtr

View File

@ -33,13 +33,14 @@ namespace ix
};
server.setConnectionStateFactory(factory);
server.setOnConnectionCallback([&server, &connectionId](
std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) {
auto remoteIp = connectionInfo->remoteIp;
webSocket->setOnMessageCallback([webSocket, connectionState, remoteIp, &connectionId, &server](
const ix::WebSocketMessagePtr& msg) {
server.setOnClientMessageCallback(
[&server, &connectionId](std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const ix::WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
if (msg->type == ix::WebSocketMessageType::Open)
{
TLogger() << "New connection";
@ -63,14 +64,13 @@ namespace ix
{
for (auto&& client : server.getClients())
{
if (client != webSocket)
if (client.get() != &webSocket)
{
client->send(msg->str, msg->binary);
}
}
}
});
});
auto res = server.listen();
if (!res.first)

View File

@ -16,42 +16,40 @@ using namespace ix;
bool startServer(ix::WebSocketServer& server, std::string& subProtocols)
{
server.setOnConnectionCallback(
[&server, &subProtocols](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) {
auto remoteIp = connectionInfo->remoteIp;
webSocket->setOnMessageCallback([webSocket, connectionState, remoteIp, &server, &subProtocols](
const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
server.setOnClientMessageCallback(
[&server, &subProtocols](std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const ix::WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
if (msg->type == ix::WebSocketMessageType::Open)
{
TLogger() << "New connection";
TLogger() << "remote ip: " << remoteIp;
TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";
for (auto it : msg->openInfo.headers)
{
TLogger() << "New connection";
TLogger() << "remote ip: " << remoteIp;
TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";
for (auto it : msg->openInfo.headers)
{
TLogger() << it.first << ": " << it.second;
}
TLogger() << it.first << ": " << it.second;
}
subProtocols = msg->openInfo.headers["Sec-WebSocket-Protocol"];
}
else if (msg->type == ix::WebSocketMessageType::Close)
subProtocols = msg->openInfo.headers["Sec-WebSocket-Protocol"];
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
log("Closed connection");
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
for (auto&& client : server.getClients())
{
log("Closed connection");
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
for (auto&& client : server.getClients())
if (client.get() != &webSocket)
{
if (client != webSocket)
{
client->sendBinary(msg->str);
}
client->sendBinary(msg->str);
}
}
});
}
});
auto res = server.listen();

View File

@ -0,0 +1,171 @@
/*
* lws-minimal-ws-client
*
* Written in 2010-2019 by Andy Green <andy@warmcat.com>
*
* This file is made available under the Creative Commons CC0 1.0
* Universal Public Domain Dedication.
*
* This demonstrates the a minimal ws client using lws.
*
* Original programs connects to https://libwebsockets.org/ and makes a
* wss connection to the dumb-increment protocol there. While
* connected, it prints the numbers it is being sent by
* dumb-increment protocol.
*
* This is modified to make a test client which counts how much messages
* per second can be received.
*
* libwebsockets$ make && ./a.out
* g++ --std=c++14 -I/usr/local/opt/openssl/include devnull_client.cpp -lwebsockets
* messages received: 0 per second 0 total
* [2020/08/02 19:22:21:4774] U: LWS minimal ws client rx [-d <logs>] [--h2]
* [2020/08/02 19:22:21:4814] U: callback_dumb_increment: established
* messages received: 0 per second 0 total
* messages received: 180015 per second 180015 total
* messages received: 172866 per second 352881 total
* messages received: 176177 per second 529058 total
* messages received: 174191 per second 703249 total
* messages received: 193397 per second 896646 total
* messages received: 196385 per second 1093031 total
* messages received: 194593 per second 1287624 total
* messages received: 189484 per second 1477108 total
* messages received: 200825 per second 1677933 total
* messages received: 183542 per second 1861475 total
* ^C[2020/08/02 19:22:33:4450] U: Completed OK
*
*/
#include <atomic>
#include <iostream>
#include <libwebsockets.h>
#include <signal.h>
#include <string.h>
#include <thread>
static int interrupted;
static struct lws* client_wsi;
std::atomic<uint64_t> receivedCount(0);
static int callback_dumb_increment(
struct lws* wsi, enum lws_callback_reasons reason, void* user, void* in, size_t len)
{
switch (reason)
{
/* because we are protocols[0] ... */
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
lwsl_err("CLIENT_CONNECTION_ERROR: %s\n", in ? (char*) in : "(null)");
client_wsi = NULL;
break;
case LWS_CALLBACK_CLIENT_ESTABLISHED: lwsl_user("%s: established\n", __func__); break;
case LWS_CALLBACK_CLIENT_RECEIVE: receivedCount++; break;
case LWS_CALLBACK_CLIENT_CLOSED: client_wsi = NULL; break;
default: break;
}
return lws_callback_http_dummy(wsi, reason, user, in, len);
}
static const struct lws_protocols protocols[] = {{
"dumb-increment-protocol",
callback_dumb_increment,
0,
0,
},
{NULL, NULL, 0, 0}};
static void sigint_handler(int sig)
{
interrupted = 1;
}
int main(int argc, const char** argv)
{
uint64_t receivedCountTotal(0);
uint64_t receivedCountPerSecs(0);
auto timer = [&receivedCountTotal, &receivedCountPerSecs] {
while (!interrupted)
{
std::cerr << "messages received: " << receivedCountPerSecs << " per second "
<< receivedCountTotal << " total" << std::endl;
receivedCountPerSecs = receivedCount - receivedCountTotal;
receivedCountTotal += receivedCountPerSecs;
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
};
std::thread t1(timer);
struct lws_context_creation_info info;
struct lws_client_connect_info i;
struct lws_context* context;
const char* p;
int n = 0, logs = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE
/* for LLL_ verbosity above NOTICE to be built into lws, lws
* must have been configured with -DCMAKE_BUILD_TYPE=DEBUG
* instead of =RELEASE */
/* | LLL_INFO */ /* | LLL_PARSER */ /* | LLL_HEADER */
/* | LLL_EXT */ /* | LLL_CLIENT */ /* | LLL_LATENCY */
/* | LLL_DEBUG */;
signal(SIGINT, sigint_handler);
if ((p = lws_cmdline_option(argc, argv, "-d"))) logs = atoi(p);
lws_set_log_level(logs, NULL);
lwsl_user("LWS minimal ws client rx [-d <logs>] [--h2]\n");
memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */
info.protocols = protocols;
info.timeout_secs = 10;
/*
* since we know this lws context is only ever going to be used with
* one client wsis / fds / sockets at a time, let lws know it doesn't
* have to use the default allocations for fd tables up to ulimit -n.
* It will just allocate for 1 internal and 1 (+ 1 http2 nwsi) that we
* will use.
*/
info.fd_limit_per_thread = 1 + 1 + 1;
context = lws_create_context(&info);
if (!context)
{
lwsl_err("lws init failed\n");
return 1;
}
memset(&i, 0, sizeof i); /* otherwise uninitialized garbage */
i.context = context;
i.port = 8008;
i.address = "127.0.0.1";
i.path = "/";
i.host = i.address;
i.origin = i.address;
i.protocol = protocols[0].name; /* "dumb-increment-protocol" */
i.pwsi = &client_wsi;
if (lws_cmdline_option(argc, argv, "--h2")) i.alpn = "h2";
lws_client_connect_via_info(&i);
while (n >= 0 && client_wsi && !interrupted)
n = lws_service(context, 0);
lws_context_destroy(context);
lwsl_user("Completed %s\n", receivedCount > 10 ? "OK" : "Failed");
t1.join();
return receivedCount > 10;
}

2
test/compatibility/csharp/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
bin
obj

View File

@ -0,0 +1,99 @@
//
// Main.cs
// Author: Benjamin Sergeant
// Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
//
// In a different terminal, start a push server:
// $ ws push_server -q
//
// $ dotnet run
// messages received per second: 145157
// messages received per second: 141405
// messages received per second: 152202
// messages received per second: 157149
// messages received per second: 157673
// messages received per second: 153594
// messages received per second: 157830
// messages received per second: 158422
//
using System;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
public class DevNullClientCli
{
private static int receivedMessage = 0;
public static async Task<byte[]> ReceiveAsync(ClientWebSocket ws, CancellationToken token)
{
int bufferSize = 8192; // 8K
var buffer = new byte[bufferSize];
var offset = 0;
var free = buffer.Length;
while (true)
{
var result = await ws.ReceiveAsync(new ArraySegment<byte>(buffer, offset, free), token).ConfigureAwait(false);
offset += result.Count;
free -= result.Count;
if (result.EndOfMessage) break;
if (free == 0)
{
// No free space
// Resize the outgoing buffer
var newSize = buffer.Length + bufferSize;
var newBuffer = new byte[newSize];
Array.Copy(buffer, 0, newBuffer, 0, offset);
buffer = newBuffer;
free = buffer.Length - offset;
}
}
return buffer;
}
private static void OnTimedEvent(object source, EventArgs e)
{
Console.WriteLine($"messages received per second: {receivedMessage}");
receivedMessage = 0;
}
public static async Task ReceiveMessagesAsync(string url)
{
var ws = new ClientWebSocket();
System.Uri uri = new System.Uri(url);
var cancellationToken = CancellationToken.None;
try
{
await ws.ConnectAsync(uri, cancellationToken).ConfigureAwait(false);
while (true)
{
var data = await DevNullClientCli.ReceiveAsync(ws, cancellationToken);
receivedMessage += 1;
}
}
catch (System.Net.WebSockets.WebSocketException e)
{
Console.WriteLine($"WebSocket error: {e}");
return;
}
}
public static async Task Main()
{
var timer = new System.Timers.Timer(1000);
timer.Elapsed += OnTimedEvent;
timer.Enabled = true;
timer.Start();
var url = "ws://localhost:8008";
await ReceiveMessagesAsync(url);
}
}

View File

@ -0,0 +1,6 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
</PropertyGroup>
</Project>

View File

@ -0,0 +1,42 @@
//
// With ws@7.3.1
// and
// node --version
// v13.11.0
//
// In a different terminal, start a push server:
// $ ws push_server -q
//
// $ node devnull_client.js
// messages received per second: 16643
// messages received per second: 28065
// messages received per second: 28432
// messages received per second: 22207
// messages received per second: 28805
// messages received per second: 28694
// messages received per second: 28180
// messages received per second: 28601
// messages received per second: 28698
// messages received per second: 28931
// messages received per second: 27975
//
const WebSocket = require('ws');
const ws = new WebSocket('ws://localhost:8008');
ws.on('open', function open() {
ws.send('hello from node');
});
var receivedMessages = 0;
setInterval(function timeout() {
console.log(`messages received per second: ${receivedMessages}`)
receivedMessages = 0;
}, 1000);
ws.on('message', function incoming(data) {
receivedMessages += 1;
});

View File

@ -0,0 +1,44 @@
#!/usr/bin/env python3
# websocket send client
import argparse
import asyncio
import websockets
try:
import uvloop
uvloop.install()
except ImportError:
print('uvloop not available')
pass
msgCount = 0
async def timer():
global msgCount
while True:
print(f'Received messages: {msgCount}')
msgCount = 0
await asyncio.sleep(1)
async def client(url):
global msgCount
asyncio.ensure_future(timer())
async with websockets.connect(url) as ws:
async for message in ws:
msgCount += 1
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='websocket proxy.')
parser.add_argument('--url', help='Remote websocket url',
default='wss://echo.websocket.org')
args = parser.parse_args()
asyncio.get_event_loop().run_until_complete(client(args.url))

View File

@ -10,7 +10,7 @@ import websockets
async def echo(websocket, path):
while True:
msg = await websocket.recv()
print(f'Received {len(msg)} bytes')
# print(f'Received {len(msg)} bytes')
await websocket.send(msg)
host = os.getenv('BIND_HOST', 'localhost')

View File

@ -10,10 +10,18 @@
#include <ixwebsocket/IXNetSystem.h>
#include <spdlog/spdlog.h>
#ifndef _WIN32
#include <signal.h>
#endif
int main(int argc, char* argv[])
{
ix::initNetSystem();
#ifndef _WIN32
signal(SIGPIPE, SIG_IGN);
#endif
ix::CoreLogger::LogFunc logFunc = [](const char* msg, ix::LogLevel level) {
switch (level)
{
@ -49,6 +57,7 @@ int main(int argc, char* argv[])
}
};
ix::CoreLogger::setLogFunction(logFunc);
spdlog::set_level(spdlog::level::debug);
int result = Catch::Session().run(argc, argv);

View File

@ -0,0 +1,3 @@
#!/bin/sh
grep -A 3 '^##' docs/CHANGELOG.md | head -n 3 | tail -n 1

View File

@ -20,7 +20,6 @@ option(USE_TLS "Add TLS support" ON)
include_directories(ws .)
include_directories(ws ..)
include_directories(ws ../third_party)
include_directories(ws ../third_party/spdlog/include)
include_directories(ws ../third_party/cpp-linenoise)
@ -51,7 +50,9 @@ add_executable(ws
ws_http_client.cpp
ws_ping_pong.cpp
ws_broadcast_server.cpp
ws_push_server.cpp
ws_echo_server.cpp
ws_echo_client.cpp
ws_chat.cpp
ws_connect.cpp
ws_transfer.cpp
@ -66,7 +67,6 @@ add_executable(ws
ws_cobra_publish.cpp
ws_httpd.cpp
ws_autobahn.cpp
ws_proxy_server.cpp
ws_sentry_minidump_upload.cpp
ws_dns_lookup.cpp
ws.cpp)

View File

@ -14,6 +14,7 @@ function cleanup_and_exit {
}
WITH_TLS=${WITH_TLS:-0}
BLOCKS=${BLOCKS:-20000}
rm -rf /tmp/ws_test
mkdir -p /tmp/ws_test
@ -57,7 +58,7 @@ ws receive "${protocol}127.0.0.1:8090" ${delay} --pidfile /tmp/ws_test/pidfile.r
mkdir -p /tmp/ws_test/send
cd /tmp/ws_test/send
dd if=/dev/urandom of=/tmp/ws_test/send/20M_file count=20000 bs=1024
dd if=/dev/urandom of=/tmp/ws_test/send/20M_file count=$BLOCKS bs=1024
# Start the sender job
ws send ${client_tls} --pidfile /tmp/ws_test/pidfile.send "${protocol}127.0.0.1:8090" /tmp/ws_test/send/20M_file

View File

@ -22,6 +22,7 @@
#include <ixwebsocket/IXNetSystem.h>
#include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXUserAgent.h>
#include <ixwebsocket/IXWebSocketProxyServer.h>
#include <spdlog/sinks/basic_file_sink.h>
#include <spdlog/spdlog.h>
#include <sstream>
@ -74,6 +75,7 @@ int main(int argc, char** argv)
}
};
ix::CoreLogger::setLogFunction(logFunc);
spdlog::set_level(spdlog::level::debug);
#ifndef _WIN32
signal(SIGPIPE, SIG_IGN);
@ -123,6 +125,7 @@ int main(int argc, char** argv)
std::string logfile;
std::string scriptPath;
std::string republishChannel;
std::string sendMsg("hello world");
ix::SocketTLSOptions tlsOptions;
ix::CobraConfig cobraConfig;
ix::CobraBotConfig cobraBotConfig;
@ -145,6 +148,7 @@ int main(int argc, char** argv)
bool version = false;
bool verifyNone = false;
bool disablePong = false;
bool noSend = false;
int port = 8008;
int redisPort = 6379;
int statsdPort = 8125;
@ -241,6 +245,19 @@ int main(int argc, char** argv)
connectApp->add_option("--subprotocol", subprotocol, "Subprotocol");
addTLSOptions(connectApp);
CLI::App* echoClientApp =
app.add_subcommand("echo_client", "Echo messages sent by a remote server");
echoClientApp->fallthrough();
echoClientApp->add_option("url", url, "Connection url")->required();
echoClientApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate");
echoClientApp->add_flag("-b", binaryMode, "Send in binary mode");
echoClientApp->add_option(
"--ping_interval", pingIntervalSecs, "Interval between sending pings");
echoClientApp->add_option("--subprotocol", subprotocol, "Subprotocol");
echoClientApp->add_option("--send_msg", sendMsg, "Send message");
echoClientApp->add_flag("-m", noSend, "Do not send messages, only receive messages");
addTLSOptions(echoClientApp);
CLI::App* chatApp = app.add_subcommand("chat", "Group chat");
chatApp->fallthrough();
chatApp->add_option("url", url, "Connection url")->required();
@ -250,12 +267,25 @@ int main(int argc, char** argv)
echoServerApp->fallthrough();
echoServerApp->add_option("--port", port, "Port");
echoServerApp->add_option("--host", hostname, "Hostname");
echoServerApp->add_flag("-g", greetings, "Verbose");
echoServerApp->add_flag("-q", quiet, "Quiet / only display warnings and errors");
echoServerApp->add_flag("-g", greetings, "Greet");
echoServerApp->add_flag("-6", ipv6, "IpV6");
echoServerApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate");
echoServerApp->add_flag("-p", disablePong, "Disable sending PONG in response to PING");
addTLSOptions(echoServerApp);
CLI::App* pushServerApp = app.add_subcommand("push_server", "Push server");
pushServerApp->fallthrough();
pushServerApp->add_option("--port", port, "Port");
pushServerApp->add_option("--host", hostname, "Hostname");
pushServerApp->add_flag("-q", quiet, "Quiet / only display warnings and errors");
pushServerApp->add_flag("-g", greetings, "Greet");
pushServerApp->add_flag("-6", ipv6, "IpV6");
pushServerApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate");
pushServerApp->add_flag("-p", disablePong, "Disable sending PONG in response to PING");
pushServerApp->add_option("--send_msg", sendMsg, "Send message");
addTLSOptions(pushServerApp);
CLI::App* broadcastServerApp = app.add_subcommand("broadcast_server", "Broadcasting server");
broadcastServerApp->fallthrough();
broadcastServerApp->add_option("--port", port, "Port");
@ -475,6 +505,11 @@ int main(int argc, char** argv)
}
}
if (quiet)
{
spdlog::set_level(spdlog::level::info);
}
// Cobra config
cobraConfig.webSocketPerMessageDeflateOptions = ix::WebSocketPerMessageDeflateOptions(true);
cobraConfig.socketTLSOptions = tlsOptions;
@ -484,7 +519,46 @@ int main(int argc, char** argv)
cobraBotConfig.cobraConfig.socketTLSOptions = tlsOptions;
int ret = 1;
if (app.got_subcommand("transfer"))
if (app.got_subcommand("connect"))
{
ret = ix::ws_connect_main(url,
headers,
disableAutomaticReconnection,
disablePerMessageDeflate,
binaryMode,
maxWaitBetweenReconnectionRetries,
tlsOptions,
subprotocol,
pingIntervalSecs);
}
else if (app.got_subcommand("echo_client"))
{
ret = ix::ws_echo_client(url,
disablePerMessageDeflate,
binaryMode,
tlsOptions,
subprotocol,
pingIntervalSecs,
sendMsg,
noSend);
}
else if (app.got_subcommand("echo_server"))
{
ret = ix::ws_echo_server_main(
port, greetings, hostname, tlsOptions, ipv6, disablePerMessageDeflate, disablePong);
}
else if (app.got_subcommand("push_server"))
{
ret = ix::ws_push_server(port,
greetings,
hostname,
tlsOptions,
ipv6,
disablePerMessageDeflate,
disablePong,
sendMsg);
}
else if (app.got_subcommand("transfer"))
{
ret = ix::ws_transfer_main(port, hostname, tlsOptions);
}
@ -497,27 +571,10 @@ int main(int argc, char** argv)
bool enablePerMessageDeflate = false;
ret = ix::ws_receive_main(url, enablePerMessageDeflate, delayMs, tlsOptions);
}
else if (app.got_subcommand("connect"))
{
ret = ix::ws_connect_main(url,
headers,
disableAutomaticReconnection,
disablePerMessageDeflate,
binaryMode,
maxWaitBetweenReconnectionRetries,
tlsOptions,
subprotocol,
pingIntervalSecs);
}
else if (app.got_subcommand("chat"))
{
ret = ix::ws_chat_main(url, user);
}
else if (app.got_subcommand("echo_server"))
{
ret = ix::ws_echo_server_main(
port, greetings, hostname, tlsOptions, ipv6, disablePerMessageDeflate, disablePong);
}
else if (app.got_subcommand("broadcast_server"))
{
ret = ix::ws_broadcast_server_main(port, hostname, tlsOptions);
@ -656,7 +713,7 @@ int main(int argc, char** argv)
}
else if (app.got_subcommand("proxy_server"))
{
ret = ix::ws_proxy_server_main(port, hostname, tlsOptions, remoteHost, verbose);
ret = ix::websocket_proxy_server_main(port, hostname, tlsOptions, remoteHost, verbose);
}
else if (app.got_subcommand("upload_minidump"))
{

24
ws/ws.h
View File

@ -35,6 +35,15 @@ namespace ix
bool disablePerMessageDeflate,
bool disablePong);
int ws_push_server(int port,
bool greetings,
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions,
bool ipv6,
bool disablePerMessageDeflate,
bool disablePong,
const std::string& sendMsg);
int ws_broadcast_server_main(int port,
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions);
@ -54,6 +63,15 @@ namespace ix
const std::string& subprotocol,
int pingIntervalSecs);
int ws_echo_client(const std::string& url,
bool disablePerMessageDeflate,
bool binaryMode,
const ix::SocketTLSOptions& tlsOptions,
const std::string& subprotocol,
int pingIntervalSecs,
const std::string& sendMsg,
bool noSend);
int ws_receive_main(const std::string& url,
bool enablePerMessageDeflate,
int delayMs,
@ -116,12 +134,6 @@ namespace ix
int ws_redis_server_main(int port, const std::string& hostname);
int ws_proxy_server_main(int port,
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions,
const std::string& remoteHost,
bool verbose);
int ws_sentry_minidump_upload(const std::string& metadataPath,
const std::string& minidump,
const std::string& project,

View File

@ -20,12 +20,12 @@ namespace ix
ix::WebSocketServer server(port, hostname);
server.setTLSOptions(tlsOptions);
server.setOnConnectionCallback([&server](std::shared_ptr<WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) {
auto remoteIp = connectionInfo->remoteIp;
webSocket->setOnMessageCallback([webSocket, connectionState, remoteIp, &server](
const WebSocketMessagePtr& msg) {
server.setOnClientMessageCallback(
[&server](std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("New connection");
@ -63,7 +63,7 @@ namespace ix
for (auto&& client : server.getClients())
{
if (client != webSocket)
if (client.get() != &webSocket)
{
client->send(msg->str, msg->binary, [](int current, int total) -> bool {
spdlog::info("Step {} out of {}", current, total);
@ -82,7 +82,6 @@ namespace ix
}
}
});
});
auto res = server.listen();
if (!res.first)

View File

@ -8,7 +8,6 @@
#include <chrono>
#include <fstream>
#include <ixcobra/IXCobraMetricsPublisher.h>
#include <jsoncpp/json/json.h>
#include <spdlog/spdlog.h>
#include <sstream>
#include <thread>

View File

@ -8,7 +8,6 @@
#include <chrono>
#include <fstream>
#include <ixcobra/IXCobraMetricsPublisher.h>
#include <jsoncpp/json/json.h>
#include <mutex>
#include <spdlog/spdlog.h>
#include <sstream>

View File

@ -160,7 +160,7 @@ namespace ix
std::stringstream ss;
if (msg->type == ix::WebSocketMessageType::Open)
{
log("ws_connect: connected");
spdlog::info("ws_connect: connected");
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
@ -200,7 +200,7 @@ namespace ix
}
else if (msg->type == ix::WebSocketMessageType::Pong)
{
spdlog::info("Received pong");
spdlog::info("Received pong {}", msg->str);
}
else
{

121
ws/ws_echo_client.cpp Normal file
View File

@ -0,0 +1,121 @@
/*
* ws_echo_client.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <ixcore/utils/IXCoreLogger.h>
#include <ixwebsocket/IXNetSystem.h>
#include <ixwebsocket/IXSetThreadName.h>
#include <ixwebsocket/IXWebSocket.h>
#include <spdlog/spdlog.h>
#include <sstream>
#include <thread>
namespace ix
{
int ws_echo_client(const std::string& url,
bool disablePerMessageDeflate,
bool binaryMode,
const ix::SocketTLSOptions& tlsOptions,
const std::string& subprotocol,
int pingIntervalSecs,
const std::string& sendMsg,
bool noSend)
{
// Our websocket object
ix::WebSocket webSocket;
webSocket.setUrl(url);
webSocket.setTLSOptions(tlsOptions);
webSocket.setPingInterval(pingIntervalSecs);
if (disablePerMessageDeflate)
{
webSocket.disablePerMessageDeflate();
}
if (!subprotocol.empty())
{
webSocket.addSubProtocol(subprotocol);
}
std::atomic<uint64_t> receivedCount(0);
uint64_t receivedCountTotal(0);
uint64_t receivedCountPerSecs(0);
// Setup a callback to be fired (in a background thread, watch out for race conditions !)
// when a message or an event (open, close, error) is received
webSocket.setOnMessageCallback([&webSocket, &receivedCount, &sendMsg, noSend, binaryMode](
const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Message)
{
if (!noSend)
{
webSocket.send(msg->str, msg->binary);
}
receivedCount++;
}
else if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("ws_echo_client: connected");
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
webSocket.send(sendMsg, binaryMode);
}
else if (msg->type == ix::WebSocketMessageType::Pong)
{
spdlog::info("Received pong {}", msg->str);
}
});
auto timer = [&receivedCount, &receivedCountTotal, &receivedCountPerSecs] {
setThreadName("Timer");
while (true)
{
//
// We cannot write to sentCount and receivedCount
// as those are used externally, so we need to introduce
// our own counters
//
std::stringstream ss;
ss << "messages received: " << receivedCountPerSecs << " per second "
<< receivedCountTotal << " total";
CoreLogger::info(ss.str());
receivedCountPerSecs = receivedCount - receivedCountTotal;
receivedCountTotal += receivedCountPerSecs;
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
};
std::thread t1(timer);
// Now that our callback is setup, we can start our background thread and receive messages
std::cout << "Connecting to " << url << "..." << std::endl;
webSocket.start();
// Send a message to the server (default to TEXT mode)
webSocket.send("hello world");
while (true)
{
std::string text;
std::cout << "> " << std::flush;
std::getline(std::cin, text);
webSocket.send(text);
}
return 0;
}
} // namespace ix

View File

@ -42,50 +42,48 @@ namespace ix
server.disablePong();
}
server.setOnConnectionCallback(
[greetings](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) {
auto remoteIp = connectionInfo->remoteIp;
webSocket->setOnMessageCallback(
[webSocket, connectionState, remoteIp, greetings](const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("New connection");
spdlog::info("remote ip: {}", remoteIp);
spdlog::info("id: {}", connectionState->getId());
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
server.setOnClientMessageCallback(
[greetings](std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("New connection");
spdlog::info("remote ip: {}", remoteIp);
spdlog::info("id: {}", connectionState->getId());
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
if (greetings)
{
webSocket->sendText("Welcome !");
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
spdlog::info("Closed connection: client id {} code {} reason {}",
connectionState->getId(),
msg->closeInfo.code,
msg->closeInfo.reason);
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
spdlog::error("Connection error: {}", msg->errorInfo.reason);
spdlog::error("#retries: {}", msg->errorInfo.retries);
spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time);
spdlog::error("HTTP Status: {}", msg->errorInfo.http_status);
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
spdlog::info("Received {} bytes", msg->wireSize);
webSocket->send(msg->str, msg->binary);
}
});
if (greetings)
{
webSocket.sendText("Welcome !");
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
spdlog::info("Closed connection: client id {} code {} reason {}",
connectionState->getId(),
msg->closeInfo.code,
msg->closeInfo.reason);
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
spdlog::error("Connection error: {}", msg->errorInfo.reason);
spdlog::error("#retries: {}", msg->errorInfo.retries);
spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time);
spdlog::error("HTTP Status: {}", msg->errorInfo.http_status);
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
spdlog::info("Received {} bytes", msg->wireSize);
webSocket.send(msg->str, msg->binary);
}
});
auto res = server.listen();

View File

@ -1,176 +0,0 @@
/*
* ws_proxy_server.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include <ixwebsocket/IXWebSocketServer.h>
#include <spdlog/spdlog.h>
#include <sstream>
namespace ix
{
class ProxyConnectionState : public ix::ConnectionState
{
public:
ProxyConnectionState()
: _connected(false)
{
}
ix::WebSocket& webSocket()
{
return _serverWebSocket;
}
bool isConnected()
{
return _connected;
}
void setConnected()
{
_connected = true;
}
private:
ix::WebSocket _serverWebSocket;
bool _connected;
};
int ws_proxy_server_main(int port,
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions,
const std::string& remoteUrl,
bool verbose)
{
spdlog::info("Listening on {}:{}", hostname, port);
ix::WebSocketServer server(port, hostname);
server.setTLSOptions(tlsOptions);
auto factory = []() -> std::shared_ptr<ix::ConnectionState> {
return std::make_shared<ProxyConnectionState>();
};
server.setConnectionStateFactory(factory);
server.setOnConnectionCallback([remoteUrl,
verbose](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) {
auto state = std::dynamic_pointer_cast<ProxyConnectionState>(connectionState);
auto remoteIp = connectionInfo->remoteIp;
// Server connection
state->webSocket().setOnMessageCallback([webSocket, state, remoteIp, verbose](
const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("New connection to remote server");
spdlog::info("remote ip: {}", remoteIp);
spdlog::info("id: {}", state->getId());
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
spdlog::info("Closed remote server connection: client id {} code {} reason {}",
state->getId(),
msg->closeInfo.code,
msg->closeInfo.reason);
state->setTerminated();
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
spdlog::error("Connection error: {}", msg->errorInfo.reason);
spdlog::error("#retries: {}", msg->errorInfo.retries);
spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time);
spdlog::error("HTTP Status: {}", msg->errorInfo.http_status);
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
spdlog::info("Received {} bytes from server", msg->wireSize);
if (verbose)
{
spdlog::info("payload {}", msg->str);
}
webSocket->send(msg->str, msg->binary);
}
});
// Client connection
webSocket->setOnMessageCallback(
[state, remoteUrl, verbose](const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("New connection from client");
spdlog::info("id: {}", state->getId());
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
// Connect to the 'real' server
std::string url(remoteUrl);
url += msg->openInfo.uri;
state->webSocket().setUrl(url);
state->webSocket().disableAutomaticReconnection();
state->webSocket().start();
// we should sleep here for a bit until we've established the
// connection with the remote server
while (state->webSocket().getReadyState() != ReadyState::Open)
{
spdlog::info("waiting for server connection establishment");
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
spdlog::info("server connection established");
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
spdlog::info("Closed client connection: client id {} code {} reason {}",
state->getId(),
msg->closeInfo.code,
msg->closeInfo.reason);
state->webSocket().close(msg->closeInfo.code, msg->closeInfo.reason);
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
spdlog::error("Connection error: {}", msg->errorInfo.reason);
spdlog::error("#retries: {}", msg->errorInfo.retries);
spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time);
spdlog::error("HTTP Status: {}", msg->errorInfo.http_status);
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
spdlog::info("Received {} bytes from client", msg->wireSize);
if (verbose)
{
spdlog::info("payload {}", msg->str);
}
state->webSocket().send(msg->str, msg->binary);
}
});
});
auto res = server.listen();
if (!res.first)
{
spdlog::info(res.second);
return 1;
}
server.start();
server.wait();
return 0;
}
} // namespace ix

108
ws/ws_push_server.cpp Normal file
View File

@ -0,0 +1,108 @@
/*
* ws_push_server.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#include <ixwebsocket/IXNetSystem.h>
#include <ixwebsocket/IXWebSocketServer.h>
#include <spdlog/spdlog.h>
#include <sstream>
namespace ix
{
int ws_push_server(int port,
bool greetings,
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions,
bool ipv6,
bool disablePerMessageDeflate,
bool disablePong,
const std::string& sendMsg)
{
spdlog::info("Listening on {}:{}", hostname, port);
ix::WebSocketServer server(port,
hostname,
SocketServer::kDefaultTcpBacklog,
SocketServer::kDefaultMaxConnections,
WebSocketServer::kDefaultHandShakeTimeoutSecs,
(ipv6) ? AF_INET6 : AF_INET);
server.setTLSOptions(tlsOptions);
if (disablePerMessageDeflate)
{
spdlog::info("Disable per message deflate");
server.disablePerMessageDeflate();
}
if (disablePong)
{
spdlog::info("Disable responding to PING messages with PONG");
server.disablePong();
}
server.setOnClientMessageCallback(
[greetings, &sendMsg](std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("New connection");
spdlog::info("remote ip: {}", remoteIp);
spdlog::info("id: {}", connectionState->getId());
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
if (greetings)
{
webSocket.sendText("Welcome !");
}
bool binary = false;
while (true)
{
webSocket.send(sendMsg, binary);
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
spdlog::info("Closed connection: client id {} code {} reason {}",
connectionState->getId(),
msg->closeInfo.code,
msg->closeInfo.reason);
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
spdlog::error("Connection error: {}", msg->errorInfo.reason);
spdlog::error("#retries: {}", msg->errorInfo.retries);
spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time);
spdlog::error("HTTP Status: {}", msg->errorInfo.http_status);
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
spdlog::info("Received {} bytes", msg->wireSize);
webSocket.send(msg->str, msg->binary);
}
});
auto res = server.listen();
if (!res.first)
{
spdlog::error(res.second);
return 1;
}
server.start();
server.wait();
return 0;
}
} // namespace ix

View File

@ -6,7 +6,6 @@
#include <fstream>
#include <ixsentry/IXSentryClient.h>
#include <jsoncpp/json/json.h>
#include <spdlog/spdlog.h>
#include <sstream>

View File

@ -19,12 +19,12 @@ namespace ix
ix::WebSocketServer server(port, hostname);
server.setTLSOptions(tlsOptions);
server.setOnConnectionCallback([&server](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) {
auto remoteIp = connectionInfo->remoteIp;
webSocket->setOnMessageCallback([webSocket, connectionState, remoteIp, &server](
const WebSocketMessagePtr& msg) {
server.setOnClientMessageCallback(
[&server](std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("ws_transfer: New connection");
@ -43,7 +43,7 @@ namespace ix
connectionState->getId(),
msg->closeInfo.code,
msg->closeInfo.reason);
auto remaining = server.getClients().erase(webSocket);
auto remaining = server.getClients().size() - 1;
spdlog::info("ws_transfer: {} remaining clients", remaining);
}
else if (msg->type == ix::WebSocketMessageType::Error)
@ -65,7 +65,7 @@ namespace ix
size_t receivers = 0;
for (auto&& client : server.getClients())
{
if (client != webSocket)
if (client.get() != &webSocket)
{
auto readyState = client->getReadyState();
auto id = connectionState->getId();
@ -119,7 +119,6 @@ namespace ix
}
}
});
});
auto res = server.listen();
if (!res.first)