Compare commits
	
		
			36 Commits
		
	
	
		
			feature/no
			...
			feature/kq
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | 0834198e74 | ||
|  | 22dd32d4e9 | ||
|  | b15c4189f5 | ||
|  | 74d3278258 | ||
|  | 831152b906 | ||
|  | 7c81a98632 | ||
|  | 6e47c62c06 | ||
|  | bcae7f326d | ||
|  | d719c41e31 | ||
|  | 6f0307fb35 | ||
|  | 2e3d625c1e | ||
|  | 029289413c | ||
|  | 4d51098c86 | ||
|  | c2b05af022 | ||
|  | e85f975ab0 | ||
|  | dc77d62a5d | ||
|  | 4f41f209a2 | ||
|  | 5940e53d77 | ||
|  | 22dffd5b7e | ||
|  | af2f31045d | ||
|  | 5daa59f9f3 | ||
|  | 2ea9d06a93 | ||
|  | 847fc142d1 | ||
|  | 0388459bd0 | ||
|  | 9a47ec1217 | ||
|  | 45a40c8640 | ||
|  | e34f1c30d6 | ||
|  | c14a4c0e3e | ||
|  | b146e93a3a | ||
|  | 9957ec9724 | ||
|  | 78a42f61bd | ||
|  | e78019dad6 | ||
|  | 0f026c5da2 | ||
|  | c26a2d5d39 | ||
|  | 2798886c0b | ||
|  | ffde283a4b | 
							
								
								
									
										4
									
								
								.github/workflows/unittest_uwp.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										4
									
								
								.github/workflows/unittest_uwp.yml
									
									
									
									
										vendored
									
									
								
							| @@ -10,12 +10,10 @@ jobs: | ||||
|     steps: | ||||
|     - uses: actions/checkout@v1 | ||||
|     - uses: seanmiddleditch/gha-setup-vsdevenv@master | ||||
|     - run: | | ||||
|         vcpkg install zlib:x64-uwp | ||||
|     - run: | | ||||
|         mkdir build | ||||
|         cd build | ||||
|         cmake -DCMAKE_TOOLCHAIN_FILE=c:/vcpkg/scripts/buildsystems/vcpkg.cmake -DCMAKE_SYSTEM_NAME=WindowsStore -DCMAKE_SYSTEM_VERSION="10.0" -DCMAKE_CXX_COMPILER=cl.exe -DUSE_TEST=1 .. | ||||
|         cmake -DCMAKE_TOOLCHAIN_FILE=c:/vcpkg/scripts/buildsystems/vcpkg.cmake -DCMAKE_SYSTEM_NAME=WindowsStore -DCMAKE_SYSTEM_VERSION="10.0" -DCMAKE_CXX_COMPILER=cl.exe -DUSE_TEST=1 -DUSE_ZLIB=0 .. | ||||
|     - run: cmake --build build | ||||
|  | ||||
| # | ||||
|   | ||||
							
								
								
									
										7
									
								
								.github/workflows/unittest_windows.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										7
									
								
								.github/workflows/unittest_windows.yml
									
									
									
									
										vendored
									
									
								
							| @@ -10,10 +10,11 @@ jobs: | ||||
|     steps: | ||||
|     - uses: actions/checkout@v1 | ||||
|     - uses: seanmiddleditch/gha-setup-vsdevenv@master | ||||
|     - run: | | ||||
|         vcpkg install zlib:x64-windows | ||||
|     - run: | | ||||
|         mkdir build | ||||
|         cd build | ||||
|         cmake -DCMAKE_TOOLCHAIN_FILE=c:/vcpkg/scripts/buildsystems/vcpkg.cmake -DCMAKE_CXX_COMPILER=cl.exe -DUSE_WS=1 -DUSE_TEST=1 .. | ||||
|         cmake -DCMAKE_CXX_COMPILER=cl.exe -DUSE_WS=1 -DUSE_TEST=1 -DUSE_ZLIB=0 .. | ||||
|     - run: cmake --build build | ||||
|  | ||||
| #- run: ../build/test/ixwebsocket_unittest.exe | ||||
| # working-directory: test | ||||
|   | ||||
| @@ -30,6 +30,7 @@ set( IXWEBSOCKET_SOURCES | ||||
|     ixwebsocket/IXConnectionState.cpp | ||||
|     ixwebsocket/IXDNSLookup.cpp | ||||
|     ixwebsocket/IXExponentialBackoff.cpp | ||||
|     ixwebsocket/IXGetFreePort.cpp | ||||
|     ixwebsocket/IXHttp.cpp | ||||
|     ixwebsocket/IXHttpClient.cpp | ||||
|     ixwebsocket/IXHttpServer.cpp | ||||
| @@ -53,6 +54,7 @@ set( IXWEBSOCKET_SOURCES | ||||
|     ixwebsocket/IXWebSocketPerMessageDeflate.cpp | ||||
|     ixwebsocket/IXWebSocketPerMessageDeflateCodec.cpp | ||||
|     ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp | ||||
|     ixwebsocket/IXWebSocketProxyServer.cpp | ||||
|     ixwebsocket/IXWebSocketServer.cpp | ||||
|     ixwebsocket/IXWebSocketTransport.cpp | ||||
| ) | ||||
| @@ -64,6 +66,7 @@ set( IXWEBSOCKET_HEADERS | ||||
|     ixwebsocket/IXConnectionState.h | ||||
|     ixwebsocket/IXDNSLookup.h | ||||
|     ixwebsocket/IXExponentialBackoff.h | ||||
|     ixwebsocket/IXGetFreePort.h | ||||
|     ixwebsocket/IXHttp.h | ||||
|     ixwebsocket/IXHttpClient.h | ||||
|     ixwebsocket/IXHttpServer.h | ||||
| @@ -96,6 +99,7 @@ set( IXWEBSOCKET_HEADERS | ||||
|     ixwebsocket/IXWebSocketPerMessageDeflate.h | ||||
|     ixwebsocket/IXWebSocketPerMessageDeflateCodec.h | ||||
|     ixwebsocket/IXWebSocketPerMessageDeflateOptions.h | ||||
|     ixwebsocket/IXWebSocketProxyServer.h | ||||
|     ixwebsocket/IXWebSocketSendInfo.h | ||||
|     ixwebsocket/IXWebSocketServer.h | ||||
|     ixwebsocket/IXWebSocketTransport.h | ||||
| @@ -186,10 +190,16 @@ if (USE_TLS) | ||||
|   endif() | ||||
| endif() | ||||
|  | ||||
| # Use ZLIB_ROOT CMake variable if you need to use your own zlib | ||||
| find_package(ZLIB REQUIRED) | ||||
| include_directories(${ZLIB_INCLUDE_DIRS}) | ||||
| target_link_libraries(ixwebsocket ${ZLIB_LIBRARIES}) | ||||
| option(USE_ZLIB "Enable zlib support" TRUE) | ||||
|  | ||||
| if (USE_ZLIB) | ||||
|   # Use ZLIB_ROOT CMake variable if you need to use your own zlib | ||||
|   find_package(ZLIB REQUIRED) | ||||
|   include_directories(${ZLIB_INCLUDE_DIRS}) | ||||
|   target_link_libraries(ixwebsocket ${ZLIB_LIBRARIES}) | ||||
|  | ||||
|   target_compile_definitions(ixwebsocket PUBLIC IXWEBSOCKET_USE_ZLIB) | ||||
| endif() | ||||
|  | ||||
| if (WIN32) | ||||
|   target_link_libraries(ixwebsocket wsock32 ws2_32 shlwapi) | ||||
|   | ||||
							
								
								
									
										23
									
								
								docker/Dockerfile.ubuntu_groovy
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										23
									
								
								docker/Dockerfile.ubuntu_groovy
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,23 @@ | ||||
| # Build time | ||||
| FROM ubuntu:groovy as build | ||||
|  | ||||
| ENV DEBIAN_FRONTEND noninteractive | ||||
| RUN apt-get update | ||||
|  | ||||
| RUN apt-get -y install g++ libssl-dev libz-dev make python ninja-build | ||||
| RUN apt-get -y install cmake | ||||
| RUN apt-get -y install gdb | ||||
|  | ||||
| COPY . /opt | ||||
| WORKDIR /opt | ||||
|  | ||||
| # | ||||
| # To use the container interactively for debugging/building | ||||
| # 1. Build with | ||||
| #    CMD ["ls"] | ||||
| # 2. Run with | ||||
| #    docker run --entrypoint sh -it docker-game-eng-dev.addsrv.com/ws:9.10.6  | ||||
| # | ||||
|  | ||||
| RUN ["make", "test"] | ||||
| # CMD ["ls"] | ||||
| @@ -1,6 +1,79 @@ | ||||
| # Changelog | ||||
|  | ||||
| All changes to this project will be documented in this file. | ||||
|  | ||||
| ## [10.1.5] - 2020-08-02 | ||||
|  | ||||
| (ws) Add a new ws sub-command, push_server. This command runs a server which sends many messages in a loop to a websocket client. We can receive above 200,000 messages per second (cf #235). | ||||
|  | ||||
| ## [10.1.4] - 2020-08-02 | ||||
|  | ||||
| (ws) Add a new ws sub-command, echo_client. This command sends a message to an echo server, and send back to a server whatever message it does receive. When connecting to a local ws echo_server, on my MacBook Pro 2015 I can send/receive around 30,000 messages per second. (cf #235) | ||||
|  | ||||
| ## [10.1.3] - 2020-08-02 | ||||
|  | ||||
| (ws) ws echo_server. Add a -q option to only enable warning and error log levels. This is useful for bench-marking so that we do not print a lot of things on the console. (cf #235) | ||||
|  | ||||
| ## [10.1.2] - 2020-07-31 | ||||
|  | ||||
| (build) make using zlib optional, with the caveat that some http and websocket features are not available when zlib is absent | ||||
|  | ||||
| ## [10.1.1] - 2020-07-29 | ||||
|  | ||||
| (websocket client) onProgressCallback not called for short messages on a websocket (fix #233) | ||||
|  | ||||
| ## [10.1.0] - 2020-07-29 | ||||
|  | ||||
| (websocket client) heartbeat is not sent at the requested frequency (fix #232) | ||||
|  | ||||
| ## [10.0.3] - 2020-07-28 | ||||
|  | ||||
| compiler warning fixes | ||||
|  | ||||
| ## [10.0.2] - 2020-07-28 | ||||
|  | ||||
| (ixcobra) CobraConnection: unsubscribe from all subscriptions when disconnecting | ||||
|  | ||||
| ## [10.0.1] - 2020-07-27 | ||||
|  | ||||
| (socket utility) move ix::getFreePort to ixwebsocket library | ||||
|  | ||||
| ## [10.0.0] - 2020-07-25 | ||||
|  | ||||
| (ixwebsocket server) change legacy api with 2 nested callbacks, so that the first api takes a weak_ptr<WebSocket> as its first argument | ||||
|  | ||||
| ## [9.10.7] - 2020-07-25 | ||||
|  | ||||
| (ixwebsocket) add WebSocketProxyServer, from ws. Still need to make the interface better. | ||||
|  | ||||
| ## [9.10.6] - 2020-07-24 | ||||
|  | ||||
| (ws) port broadcast_server sub-command to the new server API | ||||
|  | ||||
| ## [9.10.5] - 2020-07-24 | ||||
|  | ||||
| (unittest) port most unittests to the new server API | ||||
|  | ||||
| ## [9.10.3] - 2020-07-24 | ||||
|  | ||||
| (ws) port ws transfer to the new server API | ||||
|  | ||||
| ## [9.10.2] - 2020-07-24 | ||||
|  | ||||
| (websocket client) reset WebSocketTransport onClose callback in the WebSocket destructor | ||||
|  | ||||
| ## [9.10.1] - 2020-07-24 | ||||
|  | ||||
| (websocket server) reset client websocket callback when the connection is closed | ||||
|  | ||||
| ## [9.10.0] - 2020-07-23 | ||||
|  | ||||
| (websocket server) add a new simpler API to handle client connections / that API does not trigger a memory leak while the previous one did | ||||
|  | ||||
| ## [9.9.3] - 2020-07-17 | ||||
|  | ||||
| (build) merge platform specific files which were used to have different implementations for setting a thread name into a single file, to make it easier to include every source files and build the ixwebsocket library (fix #226) | ||||
|  | ||||
| ## [9.9.2] - 2020-07-10 | ||||
|  | ||||
| (socket server) bump default max connection count from 32 to 128 | ||||
|   | ||||
| @@ -17,6 +17,7 @@ There is a unittest which can be executed by typing `make test`. | ||||
|  | ||||
| Options for building: | ||||
|  | ||||
| * `-DUSE_ZLIB=1` will enable zlib support, required for http client + server + websocket per message deflate extension | ||||
| * `-DUSE_TLS=1` will enable TLS support | ||||
| * `-DUSE_OPEN_SSL=1` will use [openssl](https://www.openssl.org/) for the TLS support (default on Linux and Windows) | ||||
| * `-DUSE_MBED_TLS=1` will use [mbedlts](https://tls.mbed.org/) for the TLS support | ||||
|   | ||||
							
								
								
									
										37
									
								
								docs/performance.md
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										37
									
								
								docs/performance.md
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,37 @@ | ||||
|  | ||||
| ## WebSocket Client performance | ||||
|  | ||||
| We will run a client and a server on the same machine, connecting to localhost. This bench is run on a MacBook Pro from 2015. We can receive over 200,000 (small) messages per second, another way to put it is that it takes 5 micro-second to receive and process one message. This is an indication about the minimal latency to receive messages. | ||||
|  | ||||
| ### Receiving messages | ||||
|  | ||||
| By using the push_server ws sub-command, the server will send the same message in a loop to any connected client. | ||||
|  | ||||
| ``` | ||||
| ws push_server -q --send_msg 'yo' | ||||
| ``` | ||||
|  | ||||
| By using the echo_client ws sub-command, with the -m (mute or no_send), we will display statistics on how many messages we can receive per second. | ||||
|  | ||||
| ``` | ||||
| $ ws echo_client -m ws://localhost:8008 | ||||
| [2020-08-02 12:31:17.284] [info] ws_echo_client: connected | ||||
| [2020-08-02 12:31:17.284] [info] Uri: / | ||||
| [2020-08-02 12:31:17.284] [info] Headers: | ||||
| [2020-08-02 12:31:17.284] [info] Connection: Upgrade | ||||
| [2020-08-02 12:31:17.284] [info] Sec-WebSocket-Accept: byy/pMK2d0PtRwExaaiOnXJTQHo= | ||||
| [2020-08-02 12:31:17.284] [info] Server: ixwebsocket/10.1.4 macos ssl/SecureTransport zlib 1.2.11 | ||||
| [2020-08-02 12:31:17.284] [info] Upgrade: websocket | ||||
| [2020-08-02 12:31:17.663] [info] messages received: 0 per second 2595307 total | ||||
| [2020-08-02 12:31:18.668] [info] messages received: 79679 per second 2674986 total | ||||
| [2020-08-02 12:31:19.668] [info] messages received: 207438 per second 2882424 total | ||||
| [2020-08-02 12:31:20.673] [info] messages received: 209207 per second 3091631 total | ||||
| [2020-08-02 12:31:21.676] [info] messages received: 216056 per second 3307687 total | ||||
| [2020-08-02 12:31:22.680] [info] messages received: 214927 per second 3522614 total | ||||
| [2020-08-02 12:31:23.684] [info] messages received: 216960 per second 3739574 total | ||||
| [2020-08-02 12:31:24.688] [info] messages received: 215232 per second 3954806 total | ||||
| [2020-08-02 12:31:25.691] [info] messages received: 212300 per second 4167106 total | ||||
| [2020-08-02 12:31:26.694] [info] messages received: 212501 per second 4379607 total | ||||
| [2020-08-02 12:31:27.699] [info] messages received: 212330 per second 4591937 total | ||||
| [2020-08-02 12:31:28.702] [info] messages received: 216511 per second 4808448 total | ||||
| ``` | ||||
							
								
								
									
										134
									
								
								docs/usage.md
									
									
									
									
									
								
							
							
						
						
									
										134
									
								
								docs/usage.md
									
									
									
									
									
								
							| @@ -246,6 +246,10 @@ uint32_t m = webSocket.getMaxWaitBetweenReconnectionRetries(); | ||||
|  | ||||
| ## WebSocket server API | ||||
|  | ||||
| ### Legacy api | ||||
|  | ||||
| This api was actually changed to take a weak_ptr<WebSocket> as the first argument to setOnConnectionCallback ; previously it would take a shared_ptr<WebSocket> which was creating cycles and then memory leaks problems. | ||||
|  | ||||
| ```cpp | ||||
| #include <ixwebsocket/IXWebSocketServer.h> | ||||
|  | ||||
| @@ -256,41 +260,49 @@ uint32_t m = webSocket.getMaxWaitBetweenReconnectionRetries(); | ||||
| ix::WebSocketServer server(port); | ||||
|  | ||||
| server.setOnConnectionCallback( | ||||
|     [&server](std::shared_ptr<WebSocket> webSocket, | ||||
|     [&server](std::weak_ptr<WebSocket> webSocket, | ||||
|               std::shared_ptr<ConnectionState> connectionState, | ||||
|               std::unique_ptr<ConnectionInfo> connectionInfo) | ||||
|     { | ||||
|         std::cout << "Remote ip: " << connectionInfo->remoteIp << std::endl; | ||||
|  | ||||
|         webSocket->setOnMessageCallback( | ||||
|             [webSocket, connectionState, &server](const ix::WebSocketMessagePtr msg) | ||||
|             { | ||||
|                 if (msg->type == ix::WebSocketMessageType::Open) | ||||
|         auto ws = webSocket.lock(); | ||||
|         if (ws) | ||||
|         { | ||||
|             ws->setOnMessageCallback( | ||||
|                 [webSocket, connectionState, &server](const ix::WebSocketMessagePtr msg) | ||||
|                 { | ||||
|                     std::cout << "New connection" << std::endl; | ||||
|  | ||||
|                     // A connection state object is available, and has a default id | ||||
|                     // You can subclass ConnectionState and pass an alternate factory | ||||
|                     // to override it. It is useful if you want to store custom | ||||
|                     // attributes per connection (authenticated bool flag, attributes, etc...) | ||||
|                     std::cout << "id: " << connectionState->getId() << std::endl; | ||||
|  | ||||
|                     // The uri the client did connect to. | ||||
|                     std::cout << "Uri: " << msg->openInfo.uri << std::endl; | ||||
|  | ||||
|                     std::cout << "Headers:" << std::endl; | ||||
|                     for (auto it : msg->openInfo.headers) | ||||
|                     if (msg->type == ix::WebSocketMessageType::Open) | ||||
|                     { | ||||
|                         std::cout << it.first << ": " << it.second << std::endl; | ||||
|                         std::cout << "New connection" << std::endl; | ||||
|  | ||||
|                         // A connection state object is available, and has a default id | ||||
|                         // You can subclass ConnectionState and pass an alternate factory | ||||
|                         // to override it. It is useful if you want to store custom | ||||
|                         // attributes per connection (authenticated bool flag, attributes, etc...) | ||||
|                         std::cout << "id: " << connectionState->getId() << std::endl; | ||||
|  | ||||
|                         // The uri the client did connect to. | ||||
|                         std::cout << "Uri: " << msg->openInfo.uri << std::endl; | ||||
|  | ||||
|                         std::cout << "Headers:" << std::endl; | ||||
|                         for (auto it : msg->openInfo.headers) | ||||
|                         { | ||||
|                             std::cout << it.first << ": " << it.second << std::endl; | ||||
|                         } | ||||
|                     } | ||||
|                     else if (msg->type == ix::WebSocketMessageType::Message) | ||||
|                     { | ||||
|                         // For an echo server, we just send back to the client whatever was received by the server | ||||
|                         // All connected clients are available in an std::set. See the broadcast cpp example. | ||||
|                         // Second parameter tells whether we are sending the message in binary or text mode. | ||||
|                         // Here we send it in the same mode as it was received. | ||||
|                         auto ws = webSocket.lock(); | ||||
|                         if (ws) | ||||
|                         { | ||||
|                             ws->send(msg->str, msg->binary); | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|                 else if (msg->type == ix::WebSocketMessageType::Message) | ||||
|                 { | ||||
|                     // For an echo server, we just send back to the client whatever was received by the server | ||||
|                     // All connected clients are available in an std::set. See the broadcast cpp example. | ||||
|                     // Second parameter tells whether we are sending the message in binary or text mode. | ||||
|                     // Here we send it in the same mode as it was received. | ||||
|                     webSocket->send(msg->str, msg->binary); | ||||
|                 } | ||||
|             } | ||||
|         ); | ||||
| @@ -312,6 +324,74 @@ server.wait(); | ||||
|  | ||||
| ``` | ||||
|  | ||||
| ### New api | ||||
|  | ||||
| The new API does not require to use 2 nested callbacks, which is a bit annoying. The real fix is that there was a memory leak due to a shared_ptr cycle, due to passing down a shared_ptr<WebSocket> down to the callbacks. | ||||
|  | ||||
| The webSocket reference is guaranteed to be always valid ; by design the callback will never be invoked with a null webSocket object. | ||||
|  | ||||
| ```cpp | ||||
| #include <ixwebsocket/IXWebSocketServer.h> | ||||
|  | ||||
| ... | ||||
|  | ||||
| // Run a server on localhost at a given port. | ||||
| // Bound host name, max connections and listen backlog can also be passed in as parameters. | ||||
| ix::WebSocketServer server(port); | ||||
|  | ||||
| server.setOnClientMessageCallback(std::shared_ptr<ConnectionState> connectionState, | ||||
|                                   ConnectionInfo& connectionInfo, | ||||
|                                   WebSocket& webSocket, | ||||
|                                   const WebSocketMessagePtr& msg) | ||||
| { | ||||
|     // The ConnectionInfo object contains information about the connection, | ||||
|     // at this point only the client ip address and the port. | ||||
|     std::cout << "Remote ip: " << connectionInfo.remoteIp << std::endl; | ||||
|  | ||||
|     if (msg->type == ix::WebSocketMessageType::Open) | ||||
|     { | ||||
|         std::cout << "New connection" << std::endl; | ||||
|  | ||||
|         // A connection state object is available, and has a default id | ||||
|         // You can subclass ConnectionState and pass an alternate factory | ||||
|         // to override it. It is useful if you want to store custom | ||||
|         // attributes per connection (authenticated bool flag, attributes, etc...) | ||||
|         std::cout << "id: " << connectionState->getId() << std::endl; | ||||
|  | ||||
|         // The uri the client did connect to. | ||||
|         std::cout << "Uri: " << msg->openInfo.uri << std::endl; | ||||
|  | ||||
|         std::cout << "Headers:" << std::endl; | ||||
|         for (auto it : msg->openInfo.headers) | ||||
|         { | ||||
|             std::cout << it.first << ": " << it.second << std::endl; | ||||
|         } | ||||
|     } | ||||
|     else if (msg->type == ix::WebSocketMessageType::Message) | ||||
|     { | ||||
|         // For an echo server, we just send back to the client whatever was received by the server | ||||
|         // All connected clients are available in an std::set. See the broadcast cpp example. | ||||
|         // Second parameter tells whether we are sending the message in binary or text mode. | ||||
|         // Here we send it in the same mode as it was received. | ||||
|         webSocket.send(msg->str, msg->binary); | ||||
|     } | ||||
| ); | ||||
|  | ||||
| auto res = server.listen(); | ||||
| if (!res.first) | ||||
| { | ||||
|     // Error handling | ||||
|     return 1; | ||||
| } | ||||
|  | ||||
| // Run the server in the background. Server can be stoped by calling server.stop() | ||||
| server.start(); | ||||
|  | ||||
| // Block until server.stop() is called. | ||||
| server.wait(); | ||||
|  | ||||
| ``` | ||||
|  | ||||
| ## HTTP client API | ||||
|  | ||||
| ```cpp | ||||
|   | ||||
| @@ -111,6 +111,12 @@ namespace ix | ||||
|  | ||||
|     void CobraConnection::disconnect() | ||||
|     { | ||||
|         auto subscriptionIds = getSubscriptionsIds(); | ||||
|         for (auto&& subscriptionId : subscriptionIds) | ||||
|         { | ||||
|             unsubscribe(subscriptionId); | ||||
|         } | ||||
|  | ||||
|         _authenticated = false; | ||||
|         _webSocket->stop(); | ||||
|     } | ||||
| @@ -614,6 +620,18 @@ namespace ix | ||||
|         _webSocket->send(pdu.toStyledString()); | ||||
|     } | ||||
|  | ||||
|     std::vector<std::string> CobraConnection::getSubscriptionsIds() | ||||
|     { | ||||
|         std::vector<std::string> subscriptionIds; | ||||
|         std::lock_guard<std::mutex> lock(_cbsMutex); | ||||
|  | ||||
|         for (auto&& it : _cbs) | ||||
|         { | ||||
|             subscriptionIds.push_back(it.first); | ||||
|         } | ||||
|         return subscriptionIds; | ||||
|     } | ||||
|  | ||||
|     // | ||||
|     // Enqueue strategy drops old messages when we are at full capacity | ||||
|     // | ||||
|   | ||||
| @@ -163,6 +163,9 @@ namespace ix | ||||
|         /// Tells whether the internal queue is empty or not | ||||
|         bool isQueueEmpty(); | ||||
|  | ||||
|         /// Retrieve all subscriptions ids | ||||
|         std::vector<std::string> getSubscriptionsIds(); | ||||
|  | ||||
|         /// | ||||
|         /// Member variables | ||||
|         /// | ||||
|   | ||||
| @@ -26,6 +26,12 @@ namespace snake | ||||
|         } | ||||
|  | ||||
|         auto roles = appConfig.apps[appkey]["roles"]; | ||||
|         if (roles.count(role) == 0) | ||||
|         { | ||||
|             std::cerr << "Missing role " << role << std::endl; | ||||
|             return std::string(); | ||||
|         } | ||||
|  | ||||
|         auto channel = roles[role]["secret"]; | ||||
|         return channel; | ||||
|     } | ||||
|   | ||||
| @@ -7,15 +7,21 @@ | ||||
| #pragma once | ||||
|  | ||||
| #include <ixredis/IXRedisClient.h> | ||||
| #include <future> | ||||
| #include <thread> | ||||
| #include <ixwebsocket/IXConnectionState.h> | ||||
| #include <string> | ||||
| #include "IXStreamSql.h" | ||||
|  | ||||
| namespace snake | ||||
| { | ||||
|     class SnakeConnectionState : public ix::ConnectionState | ||||
|     { | ||||
|     public: | ||||
|         virtual ~SnakeConnectionState() | ||||
|         { | ||||
|             stopSubScriptionThread(); | ||||
|         } | ||||
|  | ||||
|         std::string getNonce() | ||||
|         { | ||||
|             return _nonce; | ||||
| @@ -51,7 +57,24 @@ namespace snake | ||||
|             return _redisClient; | ||||
|         } | ||||
|  | ||||
|         std::future<void> fut; | ||||
|         void stopSubScriptionThread() | ||||
|         { | ||||
|             if (subscriptionThread.joinable()) | ||||
|             { | ||||
|                 subscriptionRedisClient.stop(); | ||||
|                 subscriptionThread.join(); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         // We could make those accessible through methods | ||||
|         std::thread subscriptionThread; | ||||
|         std::string appChannel; | ||||
|         std::string subscriptionId; | ||||
|         uint64_t id; | ||||
|         std::unique_ptr<StreamSql> streamSql; | ||||
|         ix::RedisClient subscriptionRedisClient; | ||||
|         ix::RedisClient::OnRedisSubscribeResponseCallback onRedisSubscribeResponseCallback; | ||||
|         ix::RedisClient::OnRedisSubscribeCallback onRedisSubscribeCallback; | ||||
|  | ||||
|     private: | ||||
|         std::string _nonce; | ||||
|   | ||||
| @@ -8,7 +8,6 @@ | ||||
|  | ||||
| #include "IXAppConfig.h" | ||||
| #include "IXSnakeConnectionState.h" | ||||
| #include "IXStreamSql.h" | ||||
| #include "nlohmann/json.hpp" | ||||
| #include <iostream> | ||||
| #include <ixcore/utils/IXCoreLogger.h> | ||||
| @@ -19,21 +18,22 @@ | ||||
| namespace snake | ||||
| { | ||||
|     void handleError(const std::string& action, | ||||
|                      std::shared_ptr<ix::WebSocket> ws, | ||||
|                      nlohmann::json pdu, | ||||
|                      ix::WebSocket& ws, | ||||
|                      uint64_t pduId, | ||||
|                      const std::string& errMsg) | ||||
|     { | ||||
|         std::string actionError(action); | ||||
|         actionError += "/error"; | ||||
|  | ||||
|         nlohmann::json response = { | ||||
|             {"action", actionError}, {"id", pdu.value("id", 1)}, {"body", {{"reason", errMsg}}}}; | ||||
|         ws->sendText(response.dump()); | ||||
|             {"action", actionError}, {"id", pduId}, {"body", {{"reason", errMsg}}}}; | ||||
|         ws.sendText(response.dump()); | ||||
|     } | ||||
|  | ||||
|     void handleHandshake(std::shared_ptr<SnakeConnectionState> state, | ||||
|                          std::shared_ptr<ix::WebSocket> ws, | ||||
|                          const nlohmann::json& pdu) | ||||
|                          ix::WebSocket& ws, | ||||
|                          const nlohmann::json& pdu, | ||||
|                          uint64_t pduId) | ||||
|     { | ||||
|         std::string role = pdu["body"]["data"]["role"]; | ||||
|  | ||||
| @@ -42,7 +42,7 @@ namespace snake | ||||
|  | ||||
|         nlohmann::json response = { | ||||
|             {"action", "auth/handshake/ok"}, | ||||
|             {"id", pdu.value("id", 1)}, | ||||
|             {"id", pduId}, | ||||
|             {"body", | ||||
|              { | ||||
|                  {"data", {{"nonce", state->getNonce()}, {"connection_id", state->getId()}}}, | ||||
| @@ -50,13 +50,14 @@ namespace snake | ||||
|  | ||||
|         auto serializedResponse = response.dump(); | ||||
|  | ||||
|         ws->sendText(serializedResponse); | ||||
|         ws.sendText(serializedResponse); | ||||
|     } | ||||
|  | ||||
|     void handleAuth(std::shared_ptr<SnakeConnectionState> state, | ||||
|                     std::shared_ptr<ix::WebSocket> ws, | ||||
|                     ix::WebSocket& ws, | ||||
|                     const AppConfig& appConfig, | ||||
|                     const nlohmann::json& pdu) | ||||
|                     const nlohmann::json& pdu, | ||||
|                     uint64_t pduId) | ||||
|     { | ||||
|         auto secret = getRoleSecret(appConfig, state->appkey(), state->role()); | ||||
|  | ||||
| @@ -64,9 +65,9 @@ namespace snake | ||||
|         { | ||||
|             nlohmann::json response = { | ||||
|                 {"action", "auth/authenticate/error"}, | ||||
|                 {"id", pdu.value("id", 1)}, | ||||
|                 {"id", pduId}, | ||||
|                 {"body", {{"error", "authentication_failed"}, {"reason", "invalid secret"}}}}; | ||||
|             ws->sendText(response.dump()); | ||||
|             ws.sendText(response.dump()); | ||||
|             return; | ||||
|         } | ||||
|  | ||||
| @@ -80,20 +81,21 @@ namespace snake | ||||
|                 {"action", "auth/authenticate/error"}, | ||||
|                 {"id", pdu.value("id", 1)}, | ||||
|                 {"body", {{"error", "authentication_failed"}, {"reason", "invalid hash"}}}}; | ||||
|             ws->sendText(response.dump()); | ||||
|             ws.sendText(response.dump()); | ||||
|             return; | ||||
|         } | ||||
|  | ||||
|         nlohmann::json response = { | ||||
|             {"action", "auth/authenticate/ok"}, {"id", pdu.value("id", 1)}, {"body", {}}}; | ||||
|  | ||||
|         ws->sendText(response.dump()); | ||||
|         ws.sendText(response.dump()); | ||||
|     } | ||||
|  | ||||
|     void handlePublish(std::shared_ptr<SnakeConnectionState> state, | ||||
|                        std::shared_ptr<ix::WebSocket> ws, | ||||
|                        ix::WebSocket& ws, | ||||
|                        const AppConfig& appConfig, | ||||
|                        const nlohmann::json& pdu) | ||||
|                        const nlohmann::json& pdu, | ||||
|                        uint64_t pduId) | ||||
|     { | ||||
|         std::vector<std::string> channels; | ||||
|  | ||||
| @@ -113,7 +115,7 @@ namespace snake | ||||
|         { | ||||
|             std::stringstream ss; | ||||
|             ss << "Missing channels or channel field in publish data"; | ||||
|             handleError("rtm/publish", ws, pdu, ss.str()); | ||||
|             handleError("rtm/publish", ws, pduId, ss.str()); | ||||
|             return; | ||||
|         } | ||||
|  | ||||
| @@ -133,7 +135,7 @@ namespace snake | ||||
|             { | ||||
|                 std::stringstream ss; | ||||
|                 ss << "Cannot publish to redis host " << errMsg; | ||||
|                 handleError("rtm/publish", ws, pdu, ss.str()); | ||||
|                 handleError("rtm/publish", ws, pduId, ss.str()); | ||||
|                 return; | ||||
|             } | ||||
|         } | ||||
| @@ -141,26 +143,27 @@ namespace snake | ||||
|         nlohmann::json response = { | ||||
|             {"action", "rtm/publish/ok"}, {"id", pdu.value("id", 1)}, {"body", {}}}; | ||||
|  | ||||
|         ws->sendText(response.dump()); | ||||
|         ws.sendText(response.dump()); | ||||
|     } | ||||
|  | ||||
|     // | ||||
|     // FIXME: this is not cancellable. We should be able to cancel the redis subscription | ||||
|     // | ||||
|     void handleRedisSubscription(std::shared_ptr<SnakeConnectionState> state, | ||||
|                                  std::shared_ptr<ix::WebSocket> ws, | ||||
|                                  const AppConfig& appConfig, | ||||
|                                  const nlohmann::json& pdu) | ||||
|     void handleSubscribe(std::shared_ptr<SnakeConnectionState> state, | ||||
|                          ix::WebSocket& ws, | ||||
|                          const AppConfig& appConfig, | ||||
|                          const nlohmann::json& pdu, | ||||
|                          uint64_t pduId) | ||||
|     { | ||||
|         std::string channel = pdu["body"]["channel"]; | ||||
|         std::string subscriptionId = channel; | ||||
|         state->subscriptionId = channel; | ||||
|  | ||||
|         std::stringstream ss; | ||||
|         ss << state->appkey() << "::" << channel; | ||||
|  | ||||
|         std::string appChannel(ss.str()); | ||||
|         state->appChannel = ss.str(); | ||||
|  | ||||
|         ix::RedisClient redisClient; | ||||
|         ix::RedisClient& redisClient = state->subscriptionRedisClient; | ||||
|         int port = appConfig.redisPort; | ||||
|  | ||||
|         auto urls = appConfig.redisHosts; | ||||
| @@ -171,7 +174,7 @@ namespace snake | ||||
|         { | ||||
|             std::stringstream ss; | ||||
|             ss << "Cannot connect to redis host " << hostname << ":" << port; | ||||
|             handleError("rtm/subscribe", ws, pdu, ss.str()); | ||||
|             handleError("rtm/subscribe", ws, pduId, ss.str()); | ||||
|             return; | ||||
|         } | ||||
|  | ||||
| @@ -183,7 +186,7 @@ namespace snake | ||||
|             { | ||||
|                 std::stringstream ss; | ||||
|                 ss << "Cannot authenticated to redis"; | ||||
|                 handleError("rtm/subscribe", ws, pdu, ss.str()); | ||||
|                 handleError("rtm/subscribe", ws, pduId, ss.str()); | ||||
|                 return; | ||||
|             } | ||||
|         } | ||||
| @@ -193,83 +196,80 @@ namespace snake | ||||
|         { | ||||
|             std::string filterStr = pdu["body"]["filter"]; | ||||
|         } | ||||
|  | ||||
|         std::unique_ptr<StreamSql> streamSql = std::make_unique<StreamSql>(filterStr); | ||||
|  | ||||
|         int id = 0; | ||||
|         auto callback = [ws, &id, &subscriptionId, &streamSql](const std::string& messageStr) { | ||||
|         state->streamSql = std::make_unique<StreamSql>(filterStr); | ||||
|         state->id = 0; | ||||
|         state->onRedisSubscribeCallback = [&ws, state](const std::string& messageStr) { | ||||
|             auto msg = nlohmann::json::parse(messageStr); | ||||
|  | ||||
|             msg = msg["body"]["message"]; | ||||
|  | ||||
|             if (streamSql->valid() && !streamSql->match(msg)) | ||||
|             if (state->streamSql->valid() && !state->streamSql->match(msg)) | ||||
|             { | ||||
|                 return; | ||||
|             } | ||||
|  | ||||
|             nlohmann::json response = { | ||||
|                 {"action", "rtm/subscription/data"}, | ||||
|                 {"id", id++}, | ||||
|                 {"id", state->id++}, | ||||
|                 {"body", | ||||
|                  {{"subscription_id", subscriptionId}, {"position", "0-0"}, {"messages", {msg}}}}}; | ||||
|                  {{"subscription_id", state->subscriptionId}, {"position", "0-0"}, {"messages", {msg}}}}}; | ||||
|  | ||||
|             ws->sendText(response.dump()); | ||||
|             ws.sendText(response.dump()); | ||||
|         }; | ||||
|  | ||||
|         auto responseCallback = [ws, pdu, &subscriptionId](const std::string& redisResponse) { | ||||
|         state->onRedisSubscribeResponseCallback = [&ws, state, pduId](const std::string& redisResponse) { | ||||
|             std::stringstream ss; | ||||
|             ss << "Redis Response: " << redisResponse << "..."; | ||||
|             ix::CoreLogger::log(ss.str().c_str()); | ||||
|  | ||||
|             // Success | ||||
|             nlohmann::json response = {{"action", "rtm/subscribe/ok"}, | ||||
|                                        {"id", pdu.value("id", 1)}, | ||||
|                                        {"body", {{"subscription_id", subscriptionId}}}}; | ||||
|             ws->sendText(response.dump()); | ||||
|                                        {"id", pduId}, | ||||
|                                        {"body", {{"subscription_id", state->subscriptionId}}}}; | ||||
|             ws.sendText(response.dump()); | ||||
|         }; | ||||
|  | ||||
|         { | ||||
|             std::stringstream ss; | ||||
|             ss << "Subscribing to " << appChannel << "..."; | ||||
|             ss << "Subscribing to " << state->appChannel << "..."; | ||||
|             ix::CoreLogger::log(ss.str().c_str()); | ||||
|         } | ||||
|  | ||||
|         if (!redisClient.subscribe(appChannel, responseCallback, callback)) | ||||
|         auto subscription = [&redisClient, state, &ws, pduId] | ||||
|         { | ||||
|             std::stringstream ss; | ||||
|             ss << "Error subscribing to channel " << appChannel; | ||||
|             handleError("rtm/subscribe", ws, pdu, ss.str()); | ||||
|             return; | ||||
|         } | ||||
|     } | ||||
|             if (!redisClient.subscribe(state->appChannel,  | ||||
|                                        state->onRedisSubscribeResponseCallback, | ||||
|                                        state->onRedisSubscribeCallback)) | ||||
|             { | ||||
|                 std::stringstream ss; | ||||
|                 ss << "Error subscribing to channel " << state->appChannel; | ||||
|                 handleError("rtm/subscribe", ws, pduId, ss.str()); | ||||
|                 return; | ||||
|             } | ||||
|         }; | ||||
|  | ||||
|     void handleSubscribe(std::shared_ptr<SnakeConnectionState> state, | ||||
|                          std::shared_ptr<ix::WebSocket> ws, | ||||
|                          const AppConfig& appConfig, | ||||
|                          const nlohmann::json& pdu) | ||||
|     { | ||||
|         state->fut = | ||||
|             std::async(std::launch::async, handleRedisSubscription, state, ws, appConfig, pdu); | ||||
|         state->subscriptionThread = std::thread(subscription); | ||||
|     } | ||||
|  | ||||
|     void handleUnSubscribe(std::shared_ptr<SnakeConnectionState> state, | ||||
|                            std::shared_ptr<ix::WebSocket> ws, | ||||
|                            const nlohmann::json& pdu) | ||||
|                            ix::WebSocket& ws, | ||||
|                            const nlohmann::json& pdu, | ||||
|                            uint64_t pduId) | ||||
|     { | ||||
|         // extract subscription_id | ||||
|         auto body = pdu["body"]; | ||||
|         auto subscriptionId = body["subscription_id"]; | ||||
|  | ||||
|         state->redisClient().stop(); | ||||
|         state->stopSubScriptionThread(); | ||||
|  | ||||
|         nlohmann::json response = {{"action", "rtm/unsubscribe/ok"}, | ||||
|                                    {"id", pdu.value("id", 1)}, | ||||
|                                    {"id", pduId}, | ||||
|                                    {"body", {{"subscription_id", subscriptionId}}}}; | ||||
|         ws->sendText(response.dump()); | ||||
|         ws.sendText(response.dump()); | ||||
|     } | ||||
|  | ||||
|     void processCobraMessage(std::shared_ptr<SnakeConnectionState> state, | ||||
|                              std::shared_ptr<ix::WebSocket> ws, | ||||
|                              ix::WebSocket& ws, | ||||
|                              const AppConfig& appConfig, | ||||
|                              const std::string& str) | ||||
|     { | ||||
| @@ -284,31 +284,32 @@ namespace snake | ||||
|             ss << "malformed json pdu: " << e.what() << " -> " << str << ""; | ||||
|  | ||||
|             nlohmann::json response = {{"body", {{"error", "invalid_json"}, {"reason", ss.str()}}}}; | ||||
|             ws->sendText(response.dump()); | ||||
|             ws.sendText(response.dump()); | ||||
|             return; | ||||
|         } | ||||
|  | ||||
|         auto action = pdu["action"]; | ||||
|         uint64_t pduId = pdu.value("id", 1); | ||||
|  | ||||
|         if (action == "auth/handshake") | ||||
|         { | ||||
|             handleHandshake(state, ws, pdu); | ||||
|             handleHandshake(state, ws, pdu, pduId); | ||||
|         } | ||||
|         else if (action == "auth/authenticate") | ||||
|         { | ||||
|             handleAuth(state, ws, appConfig, pdu); | ||||
|             handleAuth(state, ws, appConfig, pdu, pduId); | ||||
|         } | ||||
|         else if (action == "rtm/publish") | ||||
|         { | ||||
|             handlePublish(state, ws, appConfig, pdu); | ||||
|             handlePublish(state, ws, appConfig, pdu, pduId); | ||||
|         } | ||||
|         else if (action == "rtm/subscribe") | ||||
|         { | ||||
|             handleSubscribe(state, ws, appConfig, pdu); | ||||
|             handleSubscribe(state, ws, appConfig, pdu, pduId); | ||||
|         } | ||||
|         else if (action == "rtm/unsubscribe") | ||||
|         { | ||||
|             handleUnSubscribe(state, ws, pdu); | ||||
|             handleUnSubscribe(state, ws, pdu, pduId); | ||||
|         } | ||||
|         else | ||||
|         { | ||||
|   | ||||
| @@ -20,7 +20,7 @@ namespace snake | ||||
|     struct AppConfig; | ||||
|  | ||||
|     void processCobraMessage(std::shared_ptr<SnakeConnectionState> state, | ||||
|                              std::shared_ptr<ix::WebSocket> ws, | ||||
|                              ix::WebSocket& ws, | ||||
|                              const AppConfig& appConfig, | ||||
|                              const std::string& str); | ||||
| } // namespace snake | ||||
|   | ||||
| @@ -59,68 +59,68 @@ namespace snake | ||||
|         }; | ||||
|         _server.setConnectionStateFactory(factory); | ||||
|  | ||||
|         _server.setOnConnectionCallback( | ||||
|             [this](std::shared_ptr<ix::WebSocket> webSocket, | ||||
|                    std::shared_ptr<ix::ConnectionState> connectionState, | ||||
|                    std::unique_ptr<ix::ConnectionInfo> connectionInfo) { | ||||
|         _server.setOnClientMessageCallback( | ||||
|             [this](std::shared_ptr<ix::ConnectionState> connectionState, | ||||
|                    ix::ConnectionInfo& connectionInfo, | ||||
|                    ix::WebSocket& webSocket, | ||||
|                    const ix::WebSocketMessagePtr& msg) { | ||||
|                 auto state = std::dynamic_pointer_cast<SnakeConnectionState>(connectionState); | ||||
|                 auto remoteIp = connectionInfo->remoteIp; | ||||
|                 auto remoteIp = connectionInfo.remoteIp; | ||||
|              | ||||
|                 webSocket->setOnMessageCallback( | ||||
|                     [this, webSocket, state, remoteIp](const ix::WebSocketMessagePtr& msg) { | ||||
|                         std::stringstream ss; | ||||
|                         ix::LogLevel logLevel = ix::LogLevel::Debug; | ||||
|                         if (msg->type == ix::WebSocketMessageType::Open) | ||||
|                         { | ||||
|                             ss << "New connection" << std::endl; | ||||
|                             ss << "remote ip: " << remoteIp << std::endl; | ||||
|                             ss << "id: " << state->getId() << std::endl; | ||||
|                             ss << "Uri: " << msg->openInfo.uri << std::endl; | ||||
|                             ss << "Headers:" << std::endl; | ||||
|                             for (auto it : msg->openInfo.headers) | ||||
|                             { | ||||
|                                 ss << it.first << ": " << it.second << std::endl; | ||||
|                             } | ||||
|                 std::stringstream ss; | ||||
|                 ss << "[" << state->getId() << "] "; | ||||
|  | ||||
|                             std::string appkey = parseAppKey(msg->openInfo.uri); | ||||
|                             state->setAppkey(appkey); | ||||
|                 ix::LogLevel logLevel = ix::LogLevel::Debug; | ||||
|                 if (msg->type == ix::WebSocketMessageType::Open) | ||||
|                 { | ||||
|                     ss << "New connection" << std::endl; | ||||
|                     ss << "remote ip: " << remoteIp << std::endl; | ||||
|                     ss << "id: " << state->getId() << std::endl; | ||||
|                     ss << "Uri: " << msg->openInfo.uri << std::endl; | ||||
|                     ss << "Headers:" << std::endl; | ||||
|                     for (auto it : msg->openInfo.headers) | ||||
|                     { | ||||
|                         ss << it.first << ": " << it.second << std::endl; | ||||
|                     } | ||||
|  | ||||
|                             // Connect to redis first | ||||
|                             if (!state->redisClient().connect(_appConfig.redisHosts[0], | ||||
|                                                               _appConfig.redisPort)) | ||||
|                             { | ||||
|                                 ss << "Cannot connect to redis host" << std::endl; | ||||
|                                 logLevel = ix::LogLevel::Error; | ||||
|                             } | ||||
|                         } | ||||
|                         else if (msg->type == ix::WebSocketMessageType::Close) | ||||
|                         { | ||||
|                             ss << "Closed connection" | ||||
|                                << " code " << msg->closeInfo.code << " reason " | ||||
|                                << msg->closeInfo.reason << std::endl; | ||||
|                         } | ||||
|                         else if (msg->type == ix::WebSocketMessageType::Error) | ||||
|                         { | ||||
|                             std::stringstream ss; | ||||
|                             ss << "Connection error: " << msg->errorInfo.reason << std::endl; | ||||
|                             ss << "#retries: " << msg->errorInfo.retries << std::endl; | ||||
|                             ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; | ||||
|                             ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; | ||||
|                             logLevel = ix::LogLevel::Error; | ||||
|                         } | ||||
|                         else if (msg->type == ix::WebSocketMessageType::Fragment) | ||||
|                         { | ||||
|                             ss << "Received message fragment" << std::endl; | ||||
|                         } | ||||
|                         else if (msg->type == ix::WebSocketMessageType::Message) | ||||
|                         { | ||||
|                             ss << "Received " << msg->wireSize << " bytes" << std::endl; | ||||
|                             processCobraMessage(state, webSocket, _appConfig, msg->str); | ||||
|                         } | ||||
|                     std::string appkey = parseAppKey(msg->openInfo.uri); | ||||
|                     state->setAppkey(appkey); | ||||
|  | ||||
|                         ix::CoreLogger::log(ss.str().c_str(), logLevel); | ||||
|                     }); | ||||
|             }); | ||||
|                     // Connect to redis first | ||||
|                     if (!state->redisClient().connect(_appConfig.redisHosts[0], | ||||
|                                                       _appConfig.redisPort)) | ||||
|                     { | ||||
|                         ss << "Cannot connect to redis host" << std::endl; | ||||
|                         logLevel = ix::LogLevel::Error; | ||||
|                     } | ||||
|                 } | ||||
|                 else if (msg->type == ix::WebSocketMessageType::Close) | ||||
|                 { | ||||
|                     ss << "Closed connection" | ||||
|                        << " code " << msg->closeInfo.code << " reason " | ||||
|                        << msg->closeInfo.reason << std::endl; | ||||
|                 } | ||||
|                 else if (msg->type == ix::WebSocketMessageType::Error) | ||||
|                 { | ||||
|                     std::stringstream ss; | ||||
|                     ss << "Connection error: " << msg->errorInfo.reason << std::endl; | ||||
|                     ss << "#retries: " << msg->errorInfo.retries << std::endl; | ||||
|                     ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; | ||||
|                     ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; | ||||
|                     logLevel = ix::LogLevel::Error; | ||||
|                 } | ||||
|                 else if (msg->type == ix::WebSocketMessageType::Fragment) | ||||
|                 { | ||||
|                     ss << "Received message fragment" << std::endl; | ||||
|                 } | ||||
|                 else if (msg->type == ix::WebSocketMessageType::Message) | ||||
|                 { | ||||
|                     ss << "Received " << msg->wireSize << " bytes" << " " << msg->str << std::endl; | ||||
|                     processCobraMessage(state, webSocket, _appConfig, msg->str); | ||||
|                 } | ||||
|  | ||||
|                 ix::CoreLogger::log(ss.str().c_str(), logLevel); | ||||
|         }); | ||||
|  | ||||
|         auto res = _server.listen(); | ||||
|         if (!res.first) | ||||
|   | ||||
| @@ -16,7 +16,10 @@ | ||||
| #include <random> | ||||
| #include <sstream> | ||||
| #include <vector> | ||||
|  | ||||
| #ifdef IXWEBSOCKET_USE_ZLIB | ||||
| #include <zlib.h> | ||||
| #endif | ||||
|  | ||||
| namespace ix | ||||
| { | ||||
| @@ -174,11 +177,13 @@ namespace ix | ||||
|         ss << verb << " " << path << " HTTP/1.1\r\n"; | ||||
|         ss << "Host: " << host << "\r\n"; | ||||
|  | ||||
| #ifdef IXWEBSOCKET_USE_ZLIB | ||||
|         if (args->compress) | ||||
|         { | ||||
|             ss << "Accept-Encoding: gzip" | ||||
|                << "\r\n"; | ||||
|         } | ||||
| #endif | ||||
|  | ||||
|         // Append extra headers | ||||
|         for (auto&& it : args->extraHeaders) | ||||
| @@ -495,6 +500,7 @@ namespace ix | ||||
|  | ||||
|         downloadSize = payload.size(); | ||||
|  | ||||
| #ifdef IXWEBSOCKET_USE_ZLIB | ||||
|         // If the content was compressed with gzip, decode it | ||||
|         if (headers["Content-Encoding"] == "gzip") | ||||
|         { | ||||
| @@ -513,6 +519,7 @@ namespace ix | ||||
|             } | ||||
|             payload = decompressedPayload; | ||||
|         } | ||||
| #endif | ||||
|  | ||||
|         return std::make_shared<HttpResponse>(code, | ||||
|                                               description, | ||||
| @@ -672,6 +679,7 @@ namespace ix | ||||
|         return ss.str(); | ||||
|     } | ||||
|  | ||||
| #ifdef IXWEBSOCKET_USE_ZLIB | ||||
|     bool HttpClient::gzipInflate(const std::string& in, std::string& out) | ||||
|     { | ||||
|         z_stream inflateState; | ||||
| @@ -716,6 +724,7 @@ namespace ix | ||||
|         inflateEnd(&inflateState); | ||||
|         return true; | ||||
|     } | ||||
| #endif | ||||
|  | ||||
|     void HttpClient::log(const std::string& msg, HttpRequestArgsPtr args) | ||||
|     { | ||||
|   | ||||
| @@ -90,7 +90,9 @@ namespace ix | ||||
|     private: | ||||
|         void log(const std::string& msg, HttpRequestArgsPtr args); | ||||
|  | ||||
| #ifdef IXWEBSOCKET_USE_ZLIB | ||||
|         bool gzipInflate(const std::string& in, std::string& out); | ||||
| #endif | ||||
|  | ||||
|         // Async API background thread runner | ||||
|         void run(); | ||||
|   | ||||
| @@ -13,7 +13,10 @@ | ||||
| #include <fstream> | ||||
| #include <sstream> | ||||
| #include <vector> | ||||
|  | ||||
| #ifdef IXWEBSOCKET_USE_ZLIB | ||||
| #include <zlib.h> | ||||
| #endif | ||||
|  | ||||
| namespace | ||||
| { | ||||
| @@ -41,6 +44,7 @@ namespace | ||||
|         return std::make_pair(res.first, std::string(vec.begin(), vec.end())); | ||||
|     } | ||||
|  | ||||
| #ifdef IXWEBSOCKET_USE_ZLIB | ||||
|     std::string gzipCompress(const std::string& str) | ||||
|     { | ||||
|         z_stream zs; // z_stream is zlib's control structure | ||||
| @@ -83,6 +87,7 @@ namespace | ||||
|  | ||||
|         return outstring; | ||||
|     } | ||||
| #endif | ||||
| } // namespace | ||||
|  | ||||
| namespace ix | ||||
| @@ -125,9 +130,8 @@ namespace ix | ||||
|  | ||||
|         if (std::get<0>(ret)) | ||||
|         { | ||||
|             auto response = _onConnectionCallback(std::get<2>(ret), | ||||
|                                                   connectionState, | ||||
|                                                   std::move(connectionInfo)); | ||||
|             auto response = | ||||
|                 _onConnectionCallback(std::get<2>(ret), connectionState, std::move(connectionInfo)); | ||||
|             if (!Http::sendResponse(response, socket)) | ||||
|             { | ||||
|                 logError("Cannot send response"); | ||||
| @@ -169,12 +173,14 @@ namespace ix | ||||
|  | ||||
|                 std::string content = res.second; | ||||
|  | ||||
| #ifdef IXWEBSOCKET_USE_ZLIB | ||||
|                 std::string acceptEncoding = request->headers["Accept-encoding"]; | ||||
|                 if (acceptEncoding == "*" || acceptEncoding.find("gzip") != std::string::npos) | ||||
|                 { | ||||
|                     content = gzipCompress(content); | ||||
|                     headers["Content-Encoding"] = "gzip"; | ||||
|                 } | ||||
| #endif | ||||
|  | ||||
|                 // Log request | ||||
|                 std::stringstream ss; | ||||
| @@ -203,10 +209,9 @@ namespace ix | ||||
|         // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Redirections | ||||
|         // | ||||
|         setOnConnectionCallback( | ||||
|             [this, | ||||
|              redirectUrl](HttpRequestPtr request, | ||||
|                           std::shared_ptr<ConnectionState> /*connectionState*/, | ||||
|                           std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr { | ||||
|             [this, redirectUrl](HttpRequestPtr request, | ||||
|                                 std::shared_ptr<ConnectionState> /*connectionState*/, | ||||
|                                 std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr { | ||||
|                 WebSocketHttpHeaders headers; | ||||
|                 headers["Server"] = userAgent(); | ||||
|  | ||||
|   | ||||
| @@ -7,17 +7,17 @@ | ||||
|  | ||||
| // unix systems | ||||
| #if defined(__APPLE__) || defined(__linux__) || defined(BSD) | ||||
| # include <pthread.h> | ||||
| #include <pthread.h> | ||||
| #endif | ||||
|  | ||||
| // freebsd needs this header as well | ||||
| #if defined(BSD) | ||||
| # include <pthread_np.h> | ||||
| #include <pthread_np.h> | ||||
| #endif | ||||
|  | ||||
| // Windows | ||||
| #ifdef _WIN32 | ||||
| # include <Windows.h> | ||||
| #include <Windows.h> | ||||
| #endif | ||||
|  | ||||
| namespace ix | ||||
|   | ||||
| @@ -35,19 +35,122 @@ namespace ix | ||||
|         : _sockfd(fd) | ||||
|         , _selectInterrupt(createSelectInterrupt()) | ||||
|     { | ||||
|         ; | ||||
| #if defined(__APPLE__) | ||||
|         _kqueuefd = kqueue(); | ||||
| #endif | ||||
|     } | ||||
|  | ||||
|     Socket::~Socket() | ||||
|     { | ||||
|         close(); | ||||
|  | ||||
| #if defined(__APPLE__) | ||||
|         ::close(_kqueuefd); | ||||
| #endif | ||||
|     } | ||||
|  | ||||
|     PollResultType Socket::poll(bool readyToRead, | ||||
|                                 int timeoutMs, | ||||
|                                 int sockfd, | ||||
|                                 const SelectInterruptPtr& selectInterrupt) | ||||
|                                 const SelectInterruptPtr& selectInterrupt, | ||||
|                                 int kqueuefd) | ||||
|     { | ||||
| #if defined(__APPLE__) | ||||
|         // FIXME int kqueuefd = kqueue(); | ||||
|  | ||||
|         struct kevent ke; | ||||
|         EV_SET(&ke, sockfd, (readyToRead) ? EVFILT_READ : EVFILT_WRITE, EV_ADD, 0, 0, NULL); | ||||
|         if (kevent(kqueuefd, &ke, 1, NULL, 0, NULL) == -1) return PollResultType::Error; | ||||
|  | ||||
|         int retval, numevents = 0; | ||||
|  | ||||
|         int nfds = 1; | ||||
| #if 0 | ||||
|         if (selectInterrupt)  | ||||
|         { | ||||
|             nfds = 2; | ||||
|             int interruptFd = selectInterrupt->getFd(); | ||||
|  | ||||
|             struct kevent ke; | ||||
|             EV_SET(&ke, interruptFd, EVFILT_READ, EV_ADD, 0, 0, NULL); | ||||
|             if (kevent(kqueuefd, &ke, 1, NULL, 0, NULL) == -1) return PollResultType::Error; | ||||
|         } | ||||
| #endif | ||||
|  | ||||
|         struct kevent *events; | ||||
|         events = (struct kevent*) malloc(sizeof(struct kevent) * nfds); | ||||
|  | ||||
|         if (timeoutMs != 0) | ||||
|         { | ||||
|             struct timespec timeout; | ||||
|             timeout.tv_sec = timeoutMs / 1000; | ||||
|             timeout.tv_nsec = (timeoutMs % 1000) * 1000 * 1000; | ||||
|             retval = kevent(kqueuefd, NULL, 0, events, nfds, &timeout); | ||||
|         } | ||||
|         else | ||||
|         { | ||||
|             retval = kevent(kqueuefd, NULL, 0, events, nfds, NULL); | ||||
|         } | ||||
|  | ||||
| #if 0 | ||||
|         if (retval > 0) { | ||||
|             int j; | ||||
|  | ||||
|             numevents = retval; | ||||
|             for(j = 0; j < numevents; j++) { | ||||
|                 int mask = 0; | ||||
|                 struct kevent *e = events+j; | ||||
|  | ||||
|                 if (e->filter == EVFILT_READ) mask |= AE_READABLE; | ||||
|                 if (e->filter == EVFILT_WRITE) mask |= AE_WRITABLE; | ||||
|                 eventLoop->fired[j].fd = e->ident; | ||||
|                 eventLoop->fired[j].mask = mask; | ||||
|             } | ||||
|         } | ||||
| #else | ||||
|         PollResultType pollResult = PollResultType::ReadyForRead; | ||||
|         if (retval < 0) | ||||
|         { | ||||
|             pollResult = PollResultType::Error; | ||||
|         } | ||||
|  | ||||
|         if (retval > 0) { | ||||
|             struct kevent *e = events; | ||||
|             if (e->filter == EVFILT_READ) | ||||
|             { | ||||
|                 pollResult = PollResultType::ReadyForRead; | ||||
|             } | ||||
|             else if (e->filter == EVFILT_WRITE)  | ||||
|             { | ||||
|                 pollResult = PollResultType::ReadyForWrite; | ||||
|  | ||||
|                 int optval = -1; | ||||
|                 socklen_t optlen = sizeof(optval); | ||||
|  | ||||
|                 // getsockopt() puts the errno value for connect into optval so 0 | ||||
|                 // means no-error. | ||||
|                 if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &optval, &optlen) == -1 || optval != 0) | ||||
|                 { | ||||
|                     pollResult = PollResultType::Error; | ||||
|  | ||||
|                     // set errno to optval so that external callers can have an | ||||
|                     // appropriate error description when calling strerror | ||||
|                     errno = optval; | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|         else | ||||
|         { | ||||
|             pollResult = PollResultType::Timeout; | ||||
|         } | ||||
| #endif | ||||
|  | ||||
|         free(events); | ||||
|  | ||||
|         // ::close(kqueuefd); //FMXE | ||||
|  | ||||
|         return pollResult; | ||||
| #else | ||||
|         // | ||||
|         // We used to use ::select to poll but on Android 9 we get large fds out of | ||||
|         // ::connect which crash in FD_SET as they are larger than FD_SETSIZE. Switching | ||||
| @@ -142,6 +245,7 @@ namespace ix | ||||
|         } | ||||
|  | ||||
|         return pollResult; | ||||
| #endif | ||||
|     } | ||||
|  | ||||
|     PollResultType Socket::isReadyToRead(int timeoutMs) | ||||
| @@ -152,7 +256,7 @@ namespace ix | ||||
|         } | ||||
|  | ||||
|         bool readyToRead = true; | ||||
|         return poll(readyToRead, timeoutMs, _sockfd, _selectInterrupt); | ||||
|         return poll(readyToRead, timeoutMs, _sockfd, _selectInterrupt, _kqueuefd); | ||||
|     } | ||||
|  | ||||
|     PollResultType Socket::isReadyToWrite(int timeoutMs) | ||||
| @@ -163,7 +267,7 @@ namespace ix | ||||
|         } | ||||
|  | ||||
|         bool readyToRead = false; | ||||
|         return poll(readyToRead, timeoutMs, _sockfd, _selectInterrupt); | ||||
|         return poll(readyToRead, timeoutMs, _sockfd, _selectInterrupt, _kqueuefd); | ||||
|     } | ||||
|  | ||||
|     // Wake up from poll/select by writing to the pipe which is watched by select | ||||
|   | ||||
| @@ -13,6 +13,13 @@ | ||||
| #include <string> | ||||
| #include <vector> | ||||
|  | ||||
| // For kqueue | ||||
| #if defined(__APPLE__) | ||||
| #include <sys/types.h> | ||||
| #include <sys/event.h> | ||||
| #include <sys/time.h> | ||||
| #endif | ||||
|  | ||||
| #ifdef _WIN32 | ||||
| #include <BaseTsd.h> | ||||
| typedef SSIZE_T ssize_t; | ||||
| @@ -94,7 +101,8 @@ namespace ix | ||||
|         static PollResultType poll(bool readyToRead, | ||||
|                                    int timeoutMs, | ||||
|                                    int sockfd, | ||||
|                                    const SelectInterruptPtr& selectInterrupt); | ||||
|                                    const SelectInterruptPtr& selectInterrupt, | ||||
|                                    int kqueuefd); | ||||
|  | ||||
|  | ||||
|         // Used as special codes for pipe communication | ||||
| @@ -114,5 +122,9 @@ namespace ix | ||||
|         static constexpr size_t kChunkSize = 1 << 15; | ||||
|  | ||||
|         SelectInterruptPtr _selectInterrupt; | ||||
|  | ||||
| #if defined(__APPLE__) | ||||
|         int _kqueuefd; | ||||
| #endif | ||||
|     }; | ||||
| } // namespace ix | ||||
|   | ||||
| @@ -66,7 +66,10 @@ namespace ix | ||||
|             int timeoutMs = 10; | ||||
|             bool readyToRead = false; | ||||
|             auto selectInterrupt = std::make_unique<SelectInterrupt>(); | ||||
|             PollResultType pollResult = Socket::poll(readyToRead, timeoutMs, fd, selectInterrupt); | ||||
|  | ||||
|             int kqueuefd = kqueue(); | ||||
|             PollResultType pollResult = Socket::poll(readyToRead, timeoutMs, fd, selectInterrupt, kqueuefd); | ||||
|             ::close(kqueuefd); | ||||
|  | ||||
|             if (pollResult == PollResultType::Timeout) | ||||
|             { | ||||
|   | ||||
| @@ -259,8 +259,11 @@ namespace ix | ||||
|             int timeoutMs = 10; | ||||
|             bool readyToRead = true; | ||||
|             auto selectInterrupt = std::make_unique<SelectInterrupt>(); | ||||
|  | ||||
|             int kqueuefd = kqueue(); | ||||
|             PollResultType pollResult = | ||||
|                 Socket::poll(readyToRead, timeoutMs, _serverFd, selectInterrupt); | ||||
|                 Socket::poll(readyToRead, timeoutMs, _serverFd, selectInterrupt, kqueuefd); | ||||
|             ::close(kqueuefd); | ||||
|  | ||||
|             if (pollResult == PollResultType::Error) | ||||
|             { | ||||
| @@ -379,10 +382,13 @@ namespace ix | ||||
|  | ||||
|             // Launch the handleConnection work asynchronously in its own thread. | ||||
|             std::lock_guard<std::mutex> lock(_connectionsThreadsMutex); | ||||
|             _connectionsThreads.push_back(std::make_pair( | ||||
|                 connectionState, | ||||
|                 std::thread( | ||||
|                     &SocketServer::handleConnection, this, std::move(socket), connectionState, std::move(connectionInfo)))); | ||||
|             _connectionsThreads.push_back( | ||||
|                 std::make_pair(connectionState, | ||||
|                                std::thread(&SocketServer::handleConnection, | ||||
|                                            this, | ||||
|                                            std::move(socket), | ||||
|                                            connectionState, | ||||
|                                            std::move(connectionInfo)))); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|   | ||||
| @@ -8,7 +8,9 @@ | ||||
|  | ||||
| #include "IXWebSocketVersion.h" | ||||
| #include <sstream> | ||||
| #ifdef IXWEBSOCKET_USE_ZLIB | ||||
| #include <zlib.h> | ||||
| #endif | ||||
|  | ||||
| // Platform name | ||||
| #if defined(_WIN32) | ||||
| @@ -77,8 +79,10 @@ namespace ix | ||||
|         ss << " nossl"; | ||||
| #endif | ||||
|  | ||||
| #ifdef IXWEBSOCKET_USE_ZLIB | ||||
|         // Zlib version | ||||
|         ss << " zlib " << ZLIB_VERSION; | ||||
| #endif | ||||
|  | ||||
|         return ss.str(); | ||||
|     } | ||||
|   | ||||
| @@ -46,6 +46,7 @@ namespace ix | ||||
|     WebSocket::~WebSocket() | ||||
|     { | ||||
|         stop(); | ||||
|         _ws.setOnCloseCallback(nullptr); | ||||
|     } | ||||
|  | ||||
|     void WebSocket::setUrl(const std::string& url) | ||||
|   | ||||
| @@ -28,21 +28,26 @@ namespace ix | ||||
|     WebSocketPerMessageDeflateCompressor::WebSocketPerMessageDeflateCompressor() | ||||
|         : _compressBufferSize(kBufferSize) | ||||
|     { | ||||
| #ifdef IXWEBSOCKET_USE_ZLIB | ||||
|         memset(&_deflateState, 0, sizeof(_deflateState)); | ||||
|  | ||||
|         _deflateState.zalloc = Z_NULL; | ||||
|         _deflateState.zfree = Z_NULL; | ||||
|         _deflateState.opaque = Z_NULL; | ||||
| #endif | ||||
|     } | ||||
|  | ||||
|     WebSocketPerMessageDeflateCompressor::~WebSocketPerMessageDeflateCompressor() | ||||
|     { | ||||
| #ifdef IXWEBSOCKET_USE_ZLIB | ||||
|         deflateEnd(&_deflateState); | ||||
| #endif | ||||
|     } | ||||
|  | ||||
|     bool WebSocketPerMessageDeflateCompressor::init(uint8_t deflateBits, | ||||
|                                                     bool clientNoContextTakeOver) | ||||
|     { | ||||
| #ifdef IXWEBSOCKET_USE_ZLIB | ||||
|         int ret = deflateInit2(&_deflateState, | ||||
|                                Z_DEFAULT_COMPRESSION, | ||||
|                                Z_DEFLATED, | ||||
| @@ -57,6 +62,9 @@ namespace ix | ||||
|         _flush = (clientNoContextTakeOver) ? Z_FULL_FLUSH : Z_SYNC_FLUSH; | ||||
|  | ||||
|         return true; | ||||
| #else | ||||
|         return false; | ||||
| #endif | ||||
|     } | ||||
|  | ||||
|     template<typename T> | ||||
| @@ -96,6 +104,7 @@ namespace ix | ||||
|     template<typename T, typename S> | ||||
|     bool WebSocketPerMessageDeflateCompressor::compressData(const T& in, S& out) | ||||
|     { | ||||
| #ifdef IXWEBSOCKET_USE_ZLIB | ||||
|         // | ||||
|         // 7.2.1.  Compression | ||||
|         // | ||||
| @@ -152,6 +161,9 @@ namespace ix | ||||
|         } | ||||
|  | ||||
|         return true; | ||||
| #else | ||||
|         return false; | ||||
| #endif | ||||
|     } | ||||
|  | ||||
|     // | ||||
| @@ -160,6 +172,7 @@ namespace ix | ||||
|     WebSocketPerMessageDeflateDecompressor::WebSocketPerMessageDeflateDecompressor() | ||||
|         : _compressBufferSize(kBufferSize) | ||||
|     { | ||||
| #ifdef IXWEBSOCKET_USE_ZLIB | ||||
|         memset(&_inflateState, 0, sizeof(_inflateState)); | ||||
|  | ||||
|         _inflateState.zalloc = Z_NULL; | ||||
| @@ -167,16 +180,20 @@ namespace ix | ||||
|         _inflateState.opaque = Z_NULL; | ||||
|         _inflateState.avail_in = 0; | ||||
|         _inflateState.next_in = Z_NULL; | ||||
| #endif | ||||
|     } | ||||
|  | ||||
|     WebSocketPerMessageDeflateDecompressor::~WebSocketPerMessageDeflateDecompressor() | ||||
|     { | ||||
| #ifdef IXWEBSOCKET_USE_ZLIB | ||||
|         inflateEnd(&_inflateState); | ||||
| #endif | ||||
|     } | ||||
|  | ||||
|     bool WebSocketPerMessageDeflateDecompressor::init(uint8_t inflateBits, | ||||
|                                                       bool clientNoContextTakeOver) | ||||
|     { | ||||
| #ifdef IXWEBSOCKET_USE_ZLIB | ||||
|         int ret = inflateInit2(&_inflateState, -1 * inflateBits); | ||||
|  | ||||
|         if (ret != Z_OK) return false; | ||||
| @@ -186,10 +203,14 @@ namespace ix | ||||
|         _flush = (clientNoContextTakeOver) ? Z_FULL_FLUSH : Z_SYNC_FLUSH; | ||||
|  | ||||
|         return true; | ||||
| #else | ||||
|         return false; | ||||
| #endif | ||||
|     } | ||||
|  | ||||
|     bool WebSocketPerMessageDeflateDecompressor::decompress(const std::string& in, std::string& out) | ||||
|     { | ||||
| #ifdef IXWEBSOCKET_USE_ZLIB | ||||
|         // | ||||
|         // 7.2.2.  Decompression | ||||
|         // | ||||
| @@ -226,5 +247,8 @@ namespace ix | ||||
|         } while (_inflateState.avail_out == 0); | ||||
|  | ||||
|         return true; | ||||
| #else | ||||
|         return false; | ||||
| #endif | ||||
|     } | ||||
| } // namespace ix | ||||
|   | ||||
| @@ -6,7 +6,9 @@ | ||||
|  | ||||
| #pragma once | ||||
|  | ||||
| #ifdef IXWEBSOCKET_USE_ZLIB | ||||
| #include "zlib.h" | ||||
| #endif | ||||
| #include <memory> | ||||
| #include <string> | ||||
| #include <vector> | ||||
| @@ -34,7 +36,10 @@ namespace ix | ||||
|         int _flush; | ||||
|         size_t _compressBufferSize; | ||||
|         std::unique_ptr<unsigned char[]> _compressBuffer; | ||||
|  | ||||
| #ifdef IXWEBSOCKET_USE_ZLIB | ||||
|         z_stream _deflateState; | ||||
| #endif | ||||
|     }; | ||||
|  | ||||
|     class WebSocketPerMessageDeflateDecompressor | ||||
| @@ -50,7 +55,10 @@ namespace ix | ||||
|         int _flush; | ||||
|         size_t _compressBufferSize; | ||||
|         std::unique_ptr<unsigned char[]> _compressBuffer; | ||||
|  | ||||
| #ifdef IXWEBSOCKET_USE_ZLIB | ||||
|         z_stream _inflateState; | ||||
| #endif | ||||
|     }; | ||||
|  | ||||
| } // namespace ix | ||||
|   | ||||
| @@ -61,6 +61,7 @@ namespace ix | ||||
|         _clientMaxWindowBits = kDefaultClientMaxWindowBits; | ||||
|         _serverMaxWindowBits = kDefaultServerMaxWindowBits; | ||||
|  | ||||
| #ifdef IXWEBSOCKET_USE_ZLIB | ||||
|         // Split by ; | ||||
|         std::string token; | ||||
|         std::stringstream tokenStream(extension); | ||||
| @@ -112,6 +113,7 @@ namespace ix | ||||
|                 sanitizeClientMaxWindowBits(); | ||||
|             } | ||||
|         } | ||||
| #endif | ||||
|     } | ||||
|  | ||||
|     void WebSocketPerMessageDeflateOptions::sanitizeClientMaxWindowBits() | ||||
| @@ -126,6 +128,7 @@ namespace ix | ||||
|  | ||||
|     std::string WebSocketPerMessageDeflateOptions::generateHeader() | ||||
|     { | ||||
| #ifdef IXWEBSOCKET_USE_ZLIB | ||||
|         std::stringstream ss; | ||||
|         ss << "Sec-WebSocket-Extensions: permessage-deflate"; | ||||
|  | ||||
| @@ -138,11 +141,18 @@ namespace ix | ||||
|         ss << "\r\n"; | ||||
|  | ||||
|         return ss.str(); | ||||
| #else | ||||
|         return std::string(); | ||||
| #endif | ||||
|     } | ||||
|  | ||||
|     bool WebSocketPerMessageDeflateOptions::enabled() const | ||||
|     { | ||||
| #ifdef IXWEBSOCKET_USE_ZLIB | ||||
|         return _enabled; | ||||
| #else | ||||
|         return false; | ||||
| #endif | ||||
|     } | ||||
|  | ||||
|     bool WebSocketPerMessageDeflateOptions::getClientNoContextTakeover() const | ||||
|   | ||||
							
								
								
									
										123
									
								
								ixwebsocket/IXWebSocketProxyServer.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										123
									
								
								ixwebsocket/IXWebSocketProxyServer.cpp
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,123 @@ | ||||
| /* | ||||
|  *  IXWebSocketProxyServer.cpp | ||||
|  *  Author: Benjamin Sergeant | ||||
|  *  Copyright (c) 2018 Machine Zone, Inc. All rights reserved. | ||||
|  */ | ||||
|  | ||||
| #include "IXWebSocketProxyServer.h" | ||||
|  | ||||
| #include "IXWebSocketServer.h" | ||||
| #include <sstream> | ||||
|  | ||||
| namespace ix | ||||
| { | ||||
|     class ProxyConnectionState : public ix::ConnectionState | ||||
|     { | ||||
|     public: | ||||
|         ProxyConnectionState() | ||||
|             : _connected(false) | ||||
|         { | ||||
|         } | ||||
|  | ||||
|         ix::WebSocket& webSocket() | ||||
|         { | ||||
|             return _serverWebSocket; | ||||
|         } | ||||
|  | ||||
|         bool isConnected() | ||||
|         { | ||||
|             return _connected; | ||||
|         } | ||||
|  | ||||
|         void setConnected() | ||||
|         { | ||||
|             _connected = true; | ||||
|         } | ||||
|  | ||||
|     private: | ||||
|         ix::WebSocket _serverWebSocket; | ||||
|         bool _connected; | ||||
|     }; | ||||
|  | ||||
|     int websocket_proxy_server_main(int port, | ||||
|                                     const std::string& hostname, | ||||
|                                     const ix::SocketTLSOptions& tlsOptions, | ||||
|                                     const std::string& remoteUrl, | ||||
|                                     bool /*verbose*/) | ||||
|     { | ||||
|         ix::WebSocketServer server(port, hostname); | ||||
|         server.setTLSOptions(tlsOptions); | ||||
|  | ||||
|         auto factory = []() -> std::shared_ptr<ix::ConnectionState> { | ||||
|             return std::make_shared<ProxyConnectionState>(); | ||||
|         }; | ||||
|         server.setConnectionStateFactory(factory); | ||||
|  | ||||
|         server.setOnConnectionCallback([remoteUrl](std::weak_ptr<ix::WebSocket> webSocket, | ||||
|                                                    std::shared_ptr<ConnectionState> connectionState, | ||||
|                                                    std::unique_ptr<ConnectionInfo> connectionInfo) { | ||||
|             auto state = std::dynamic_pointer_cast<ProxyConnectionState>(connectionState); | ||||
|             auto remoteIp = connectionInfo->remoteIp; | ||||
|  | ||||
|             // Server connection | ||||
|             state->webSocket().setOnMessageCallback( | ||||
|                 [webSocket, state, remoteIp](const WebSocketMessagePtr& msg) { | ||||
|                     if (msg->type == ix::WebSocketMessageType::Close) | ||||
|                     { | ||||
|                         state->setTerminated(); | ||||
|                     } | ||||
|                     else if (msg->type == ix::WebSocketMessageType::Message) | ||||
|                     { | ||||
|                         auto ws = webSocket.lock(); | ||||
|                         if (ws) | ||||
|                         { | ||||
|                             ws->send(msg->str, msg->binary); | ||||
|                         } | ||||
|                     } | ||||
|                 }); | ||||
|  | ||||
|             // Client connection | ||||
|             auto ws = webSocket.lock(); | ||||
|             if (ws) | ||||
|             { | ||||
|                 ws->setOnMessageCallback([state, remoteUrl](const WebSocketMessagePtr& msg) { | ||||
|                     if (msg->type == ix::WebSocketMessageType::Open) | ||||
|                     { | ||||
|                         // Connect to the 'real' server | ||||
|                         std::string url(remoteUrl); | ||||
|                         url += msg->openInfo.uri; | ||||
|                         state->webSocket().setUrl(url); | ||||
|                         state->webSocket().disableAutomaticReconnection(); | ||||
|                         state->webSocket().start(); | ||||
|  | ||||
|                         // we should sleep here for a bit until we've established the | ||||
|                         // connection with the remote server | ||||
|                         while (state->webSocket().getReadyState() != ReadyState::Open) | ||||
|                         { | ||||
|                             std::this_thread::sleep_for(std::chrono::milliseconds(10)); | ||||
|                         } | ||||
|                     } | ||||
|                     else if (msg->type == ix::WebSocketMessageType::Close) | ||||
|                     { | ||||
|                         state->webSocket().close(msg->closeInfo.code, msg->closeInfo.reason); | ||||
|                     } | ||||
|                     else if (msg->type == ix::WebSocketMessageType::Message) | ||||
|                     { | ||||
|                         state->webSocket().send(msg->str, msg->binary); | ||||
|                     } | ||||
|                 }); | ||||
|             } | ||||
|         }); | ||||
|  | ||||
|         auto res = server.listen(); | ||||
|         if (!res.first) | ||||
|         { | ||||
|             return 1; | ||||
|         } | ||||
|  | ||||
|         server.start(); | ||||
|         server.wait(); | ||||
|  | ||||
|         return 0; | ||||
|     } | ||||
| } // namespace ix | ||||
							
								
								
									
										20
									
								
								ixwebsocket/IXWebSocketProxyServer.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										20
									
								
								ixwebsocket/IXWebSocketProxyServer.h
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,20 @@ | ||||
| /* | ||||
|  *  IXWebSocketProxyServer.h | ||||
|  *  Author: Benjamin Sergeant | ||||
|  *  Copyright (c) 2019-2020 Machine Zone, Inc. All rights reserved. | ||||
|  */ | ||||
| #pragma once | ||||
|  | ||||
| #include "IXSocketTLSOptions.h" | ||||
| #include <cstdint> | ||||
| #include <stddef.h> | ||||
| #include <string> | ||||
|  | ||||
| namespace ix | ||||
| { | ||||
|     int websocket_proxy_server_main(int port, | ||||
|                                     const std::string& hostname, | ||||
|                                     const ix::SocketTLSOptions& tlsOptions, | ||||
|                                     const std::string& remoteUrl, | ||||
|                                     bool verbose); | ||||
| } // namespace ix | ||||
| @@ -71,6 +71,11 @@ namespace ix | ||||
|         _onConnectionCallback = callback; | ||||
|     } | ||||
|  | ||||
|     void WebSocketServer::setOnClientMessageCallback(const OnClientMessageCallback& callback) | ||||
|     { | ||||
|         _onClientMessageCallback = callback; | ||||
|     } | ||||
|  | ||||
|     void WebSocketServer::handleConnection(std::unique_ptr<Socket> socket, | ||||
|                                            std::shared_ptr<ConnectionState> connectionState, | ||||
|                                            std::unique_ptr<ConnectionInfo> connectionInfo) | ||||
| @@ -78,7 +83,26 @@ namespace ix | ||||
|         setThreadName("WebSocketServer::" + connectionState->getId()); | ||||
|  | ||||
|         auto webSocket = std::make_shared<WebSocket>(); | ||||
|         _onConnectionCallback(webSocket, connectionState, std::move(connectionInfo)); | ||||
|         if (_onConnectionCallback) | ||||
|         { | ||||
|             _onConnectionCallback(webSocket, connectionState, std::move(connectionInfo)); | ||||
|         } | ||||
|         else if (_onClientMessageCallback) | ||||
|         { | ||||
|             webSocket->setOnMessageCallback( | ||||
|                 [this, &ws = *webSocket.get(), connectionState, &ci = *connectionInfo.get()]( | ||||
|                     const WebSocketMessagePtr& msg) { | ||||
|                     _onClientMessageCallback(connectionState, ci, ws, msg); | ||||
|                 }); | ||||
|         } | ||||
|         else | ||||
|         { | ||||
|             logError( | ||||
|                 "WebSocketServer Application developer error: No server callback is registerered."); | ||||
|             logError("Missing call to setOnConnectionCallback or setOnClientMessageCallback."); | ||||
|             connectionState->setTerminated(); | ||||
|             return; | ||||
|         } | ||||
|  | ||||
|         webSocket->disableAutomaticReconnection(); | ||||
|  | ||||
| @@ -112,6 +136,8 @@ namespace ix | ||||
|             logError(ss.str()); | ||||
|         } | ||||
|  | ||||
|         webSocket->setOnMessageCallback(nullptr); | ||||
|  | ||||
|         // Remove this client from our client set | ||||
|         { | ||||
|             std::lock_guard<std::mutex> lock(_clientsMutex); | ||||
|   | ||||
| @@ -23,9 +23,15 @@ namespace ix | ||||
|     { | ||||
|     public: | ||||
|         using OnConnectionCallback = | ||||
|             std::function<void(std::shared_ptr<WebSocket>, std::shared_ptr<ConnectionState>, | ||||
|             std::function<void(std::weak_ptr<WebSocket>, | ||||
|                                std::shared_ptr<ConnectionState>, | ||||
|                                std::unique_ptr<ConnectionInfo> connectionInfo)>; | ||||
|  | ||||
|         using OnClientMessageCallback = std::function<void(std::shared_ptr<ConnectionState>, | ||||
|                                                            ConnectionInfo&, | ||||
|                                                            WebSocket&, | ||||
|                                                            const WebSocketMessagePtr&)>; | ||||
|  | ||||
|         WebSocketServer(int port = SocketServer::kDefaultPort, | ||||
|                         const std::string& host = SocketServer::kDefaultHost, | ||||
|                         int backlog = SocketServer::kDefaultTcpBacklog, | ||||
| @@ -40,6 +46,7 @@ namespace ix | ||||
|         void disablePerMessageDeflate(); | ||||
|  | ||||
|         void setOnConnectionCallback(const OnConnectionCallback& callback); | ||||
|         void setOnClientMessageCallback(const OnClientMessageCallback& callback); | ||||
|  | ||||
|         // Get all the connected clients | ||||
|         std::set<std::shared_ptr<WebSocket>> getClients(); | ||||
| @@ -53,6 +60,7 @@ namespace ix | ||||
|         bool _enablePerMessageDeflate; | ||||
|  | ||||
|         OnConnectionCallback _onConnectionCallback; | ||||
|         OnClientMessageCallback _onClientMessageCallback; | ||||
|  | ||||
|         std::mutex _clientsMutex; | ||||
|         std::set<std::shared_ptr<WebSocket>> _clients; | ||||
|   | ||||
| @@ -65,7 +65,6 @@ namespace ix | ||||
|         , _receivedMessageCompressed(false) | ||||
|         , _readyState(ReadyState::CLOSED) | ||||
|         , _closeCode(WebSocketCloseConstants::kInternalErrorCode) | ||||
|         , _closeReason(WebSocketCloseConstants::kInternalErrorMessage) | ||||
|         , _closeWireSize(0) | ||||
|         , _closeRemote(false) | ||||
|         , _enablePerMessageDeflate(false) | ||||
| @@ -77,6 +76,7 @@ namespace ix | ||||
|         , _pingCount(0) | ||||
|         , _lastSendPingTimePoint(std::chrono::steady_clock::now()) | ||||
|     { | ||||
|         setCloseReason(WebSocketCloseConstants::kInternalErrorMessage); | ||||
|         _readbuf.resize(kChunkSize); | ||||
|     } | ||||
|  | ||||
| @@ -179,10 +179,12 @@ namespace ix | ||||
|  | ||||
|         if (readyState == ReadyState::CLOSED) | ||||
|         { | ||||
|             std::lock_guard<std::mutex> lock(_closeDataMutex); | ||||
|             _onCloseCallback(_closeCode, _closeReason, _closeWireSize, _closeRemote); | ||||
|             if (_onCloseCallback) | ||||
|             { | ||||
|                 _onCloseCallback(_closeCode, getCloseReason(), _closeWireSize, _closeRemote); | ||||
|             } | ||||
|             setCloseReason(WebSocketCloseConstants::kInternalErrorMessage); | ||||
|             _closeCode = WebSocketCloseConstants::kInternalErrorCode; | ||||
|             _closeReason = WebSocketCloseConstants::kInternalErrorMessage; | ||||
|             _closeWireSize = 0; | ||||
|             _closeRemote = false; | ||||
|         } | ||||
| @@ -261,9 +263,10 @@ namespace ix | ||||
|         { | ||||
|             // compute lasting delay to wait for next ping / timeout, if at least one set | ||||
|             auto now = std::chrono::steady_clock::now(); | ||||
|             lastingTimeoutDelayInMs = (int) std::chrono::duration_cast<std::chrono::milliseconds>( | ||||
|             int timeSinceLastPingMs = (int) std::chrono::duration_cast<std::chrono::milliseconds>( | ||||
|                                           now - _lastSendPingTimePoint) | ||||
|                                           .count(); | ||||
|             lastingTimeoutDelayInMs = (1000 * _pingIntervalSecs) - timeSinceLastPingMs; | ||||
|         } | ||||
|  | ||||
| #ifdef _WIN32 | ||||
| @@ -639,11 +642,7 @@ namespace ix | ||||
|                 { | ||||
|                     // we got the CLOSE frame answer from our close, so we can close the connection | ||||
|                     // if the code/reason are the same | ||||
|                     bool identicalReason; | ||||
|                     { | ||||
|                         std::lock_guard<std::mutex> lock(_closeDataMutex); | ||||
|                         identicalReason = _closeCode == code && _closeReason == reason; | ||||
|                     } | ||||
|                     bool identicalReason = _closeCode == code && getCloseReason() == reason; | ||||
|  | ||||
|                     if (identicalReason) | ||||
|                     { | ||||
| @@ -797,6 +796,11 @@ namespace ix | ||||
|         if (wireSize < kChunkSize) | ||||
|         { | ||||
|             success = sendFragment(type, true, message_begin, message_end, compress); | ||||
|  | ||||
|             if (onProgressCallback) | ||||
|             { | ||||
|                 onProgressCallback(0, 1); | ||||
|             } | ||||
|         } | ||||
|         else | ||||
|         { | ||||
| @@ -1081,13 +1085,10 @@ namespace ix | ||||
|     { | ||||
|         closeSocket(); | ||||
|  | ||||
|         { | ||||
|             std::lock_guard<std::mutex> lock(_closeDataMutex); | ||||
|             _closeCode = code; | ||||
|             _closeReason = reason; | ||||
|             _closeWireSize = closeWireSize; | ||||
|             _closeRemote = remote; | ||||
|         } | ||||
|         setCloseReason(reason); | ||||
|         _closeCode = code; | ||||
|         _closeWireSize = closeWireSize; | ||||
|         _closeRemote = remote; | ||||
|  | ||||
|         setReadyState(ReadyState::CLOSED); | ||||
|         _requestInitCancellation = false; | ||||
| @@ -1107,13 +1108,11 @@ namespace ix | ||||
|             closeWireSize = reason.size(); | ||||
|         } | ||||
|  | ||||
|         { | ||||
|             std::lock_guard<std::mutex> lock(_closeDataMutex); | ||||
|             _closeCode = code; | ||||
|             _closeReason = reason; | ||||
|             _closeWireSize = closeWireSize; | ||||
|             _closeRemote = remote; | ||||
|         } | ||||
|         setCloseReason(reason); | ||||
|         _closeCode = code; | ||||
|         _closeWireSize = closeWireSize; | ||||
|         _closeRemote = remote; | ||||
|  | ||||
|         { | ||||
|             std::lock_guard<std::mutex> lock(_closingTimePointMutex); | ||||
|             _closingTimePoint = std::chrono::steady_clock::now(); | ||||
| @@ -1158,4 +1157,15 @@ namespace ix | ||||
|         return true; | ||||
|     } | ||||
|  | ||||
|     void WebSocketTransport::setCloseReason(const std::string& reason) | ||||
|     { | ||||
|         std::lock_guard<std::mutex> lock(_closeReasonMutex); | ||||
|         _closeReason = reason; | ||||
|     } | ||||
|  | ||||
|     const std::string& WebSocketTransport::getCloseReason() const | ||||
|     { | ||||
|         std::lock_guard<std::mutex> lock(_closeReasonMutex); | ||||
|         return _closeReason; | ||||
|     } | ||||
| } // namespace ix | ||||
|   | ||||
| @@ -178,11 +178,11 @@ namespace ix | ||||
|         std::atomic<ReadyState> _readyState; | ||||
|  | ||||
|         OnCloseCallback _onCloseCallback; | ||||
|         uint16_t _closeCode; | ||||
|         std::string _closeReason; | ||||
|         size_t _closeWireSize; | ||||
|         bool _closeRemote; | ||||
|         mutable std::mutex _closeDataMutex; | ||||
|         mutable std::mutex _closeReasonMutex; | ||||
|         std::atomic<uint16_t> _closeCode; | ||||
|         std::atomic<size_t> _closeWireSize; | ||||
|         std::atomic<bool> _closeRemote; | ||||
|  | ||||
|         // Data used for Per Message Deflate compression (with zlib) | ||||
|         WebSocketPerMessageDeflatePtr _perMessageDeflate; | ||||
| @@ -267,5 +267,8 @@ namespace ix | ||||
|         void unmaskReceiveBuffer(const wsheader_type& ws); | ||||
|  | ||||
|         std::string getMergedChunks() const; | ||||
|  | ||||
|         void setCloseReason(const std::string& reason); | ||||
|         const std::string& getCloseReason() const; | ||||
|     }; | ||||
| } // namespace ix | ||||
|   | ||||
| @@ -6,4 +6,4 @@ | ||||
|  | ||||
| #pragma once | ||||
|  | ||||
| #define IX_WEBSOCKET_VERSION "9.9.2" | ||||
| #define IX_WEBSOCKET_VERSION "10.1.5" | ||||
|   | ||||
							
								
								
									
										18
									
								
								makefile
									
									
									
									
									
								
							
							
						
						
									
										18
									
								
								makefile
									
									
									
									
									
								
							| @@ -34,7 +34,10 @@ ws: | ||||
| 	mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 .. && ninja install) | ||||
|  | ||||
| ws_install: | ||||
| 	mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=MinSizeRel -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 .. && make -j 4 install) | ||||
| 	mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=MinSizeRel -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 .. -DUSE_TEST=0 && ninja install) | ||||
|  | ||||
| ws_install_release: | ||||
| 	mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=MinSizeRel -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 .. -DUSE_TEST=0 && ninja install) | ||||
|  | ||||
| ws_openssl: | ||||
| 	mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 -DUSE_OPEN_SSL=1 .. ; make -j 4) | ||||
| @@ -174,7 +177,7 @@ autobahn_report: | ||||
| 	cp -rvf ~/sandbox/reports/clients/* ../bsergean.github.io/IXWebSocket/autobahn/ | ||||
|  | ||||
| httpd: | ||||
| 	clang++ --std=c++14 --stdlib=libc++ httpd.cpp \ | ||||
| 	clang++ --std=c++14 --stdlib=libc++ -o ixhttpd httpd.cpp \ | ||||
| 		ixwebsocket/IXSelectInterruptFactory.cpp \ | ||||
| 		ixwebsocket/IXCancellationRequest.cpp \ | ||||
| 		ixwebsocket/IXSocketTLSOptions.cpp \ | ||||
| @@ -193,11 +196,11 @@ httpd: | ||||
| 		ixwebsocket/IXConnectionState.cpp \ | ||||
| 		ixwebsocket/IXUrlParser.cpp \ | ||||
| 		ixwebsocket/IXSelectInterrupt.cpp \ | ||||
| 		ixwebsocket/apple/IXSetThreadName_apple.cpp \ | ||||
| 		ixwebsocket/IXSetThreadName.cpp \ | ||||
| 		-lz | ||||
|  | ||||
| httpd_linux: | ||||
| 	g++ --std=c++11 -o ixhttpd httpd.cpp -Iixwebsocket \ | ||||
| 	g++ --std=c++14 -o ixhttpd httpd.cpp -Iixwebsocket \ | ||||
| 		ixwebsocket/IXSelectInterruptFactory.cpp \ | ||||
| 		ixwebsocket/IXCancellationRequest.cpp \ | ||||
| 		ixwebsocket/IXSocketTLSOptions.cpp \ | ||||
| @@ -216,7 +219,7 @@ httpd_linux: | ||||
| 		ixwebsocket/IXConnectionState.cpp \ | ||||
| 		ixwebsocket/IXUrlParser.cpp \ | ||||
| 		ixwebsocket/IXSelectInterrupt.cpp \ | ||||
| 		ixwebsocket/linux/IXSetThreadName_linux.cpp \ | ||||
| 		ixwebsocket/IXSetThreadName.cpp \ | ||||
| 		-lz -lpthread | ||||
| 	cp -f ixhttpd /usr/local/bin | ||||
|  | ||||
| @@ -238,9 +241,12 @@ install_cmake_for_linux: | ||||
| doc: | ||||
| 	mkdocs gh-deploy | ||||
|  | ||||
| change: | ||||
| change: format | ||||
| 	vim ixwebsocket/IXWebSocketVersion.h docs/CHANGELOG.md | ||||
|  | ||||
| commit: | ||||
| 	git commit -am "`sh tools/extract_latest_change.sh`" | ||||
|  | ||||
| .PHONY: test | ||||
| .PHONY: build | ||||
| .PHONY: ws | ||||
|   | ||||
| @@ -37,7 +37,6 @@ set (SOURCES | ||||
|  | ||||
|   test_runner.cpp | ||||
|   IXTest.cpp | ||||
|   IXGetFreePort.cpp | ||||
|   ../third_party/msgpack11/msgpack11.cpp | ||||
|  | ||||
|   IXSocketTest.cpp | ||||
|   | ||||
| @@ -108,7 +108,7 @@ namespace | ||||
|             } | ||||
|             else if (event->type == ix::CobraEventType::UnSubscribed) | ||||
|             { | ||||
|                 TLogger() << "Subscriber: ununexpected from channel " << event->subscriptionId; | ||||
|                 TLogger() << "Subscriber: unsubscribed from channel " << event->subscriptionId; | ||||
|                 if (event->subscriptionId != channel) | ||||
|                 { | ||||
|                     TLogger() << "Subscriber: unexpected channel " << event->subscriptionId; | ||||
|   | ||||
| @@ -4,9 +4,9 @@ | ||||
|  *  Copyright (c) 2019 Machine Zone. All rights reserved. | ||||
|  */ | ||||
|  | ||||
| #include "IXGetFreePort.h" | ||||
| #include "catch.hpp" | ||||
| #include <iostream> | ||||
| #include <ixwebsocket/IXGetFreePort.h> | ||||
| #include <ixwebsocket/IXHttpClient.h> | ||||
| #include <ixwebsocket/IXHttpServer.h> | ||||
|  | ||||
|   | ||||
| @@ -84,39 +84,38 @@ namespace ix | ||||
|  | ||||
|     bool startWebSocketEchoServer(ix::WebSocketServer& server) | ||||
|     { | ||||
|         server.setOnConnectionCallback([&server](std::shared_ptr<ix::WebSocket> webSocket, | ||||
|                                                  std::shared_ptr<ConnectionState> connectionState, | ||||
|                                                  std::unique_ptr<ConnectionInfo> connectionInfo) { | ||||
|             auto remoteIp = connectionInfo->remoteIp; | ||||
|             webSocket->setOnMessageCallback( | ||||
|                 [webSocket, connectionState, remoteIp, &server](const ix::WebSocketMessagePtr& msg) { | ||||
|                     if (msg->type == ix::WebSocketMessageType::Open) | ||||
|         server.setOnClientMessageCallback( | ||||
|             [&server](std::shared_ptr<ConnectionState> /*connectionState*/, | ||||
|                       ConnectionInfo& connectionInfo, | ||||
|                       WebSocket& webSocket, | ||||
|                       const ix::WebSocketMessagePtr& msg) { | ||||
|                 auto remoteIp = connectionInfo.remoteIp; | ||||
|                 if (msg->type == ix::WebSocketMessageType::Open) | ||||
|                 { | ||||
|                     TLogger() << "New connection"; | ||||
|                     TLogger() << "Remote ip: " << remoteIp; | ||||
|                     TLogger() << "Uri: " << msg->openInfo.uri; | ||||
|                     TLogger() << "Headers:"; | ||||
|                     for (auto it : msg->openInfo.headers) | ||||
|                     { | ||||
|                         TLogger() << "New connection"; | ||||
|                         TLogger() << "Remote ip: " << remoteIp; | ||||
|                         TLogger() << "Uri: " << msg->openInfo.uri; | ||||
|                         TLogger() << "Headers:"; | ||||
|                         for (auto it : msg->openInfo.headers) | ||||
|                         TLogger() << it.first << ": " << it.second; | ||||
|                     } | ||||
|                 } | ||||
|                 else if (msg->type == ix::WebSocketMessageType::Close) | ||||
|                 { | ||||
|                     TLogger() << "Closed connection"; | ||||
|                 } | ||||
|                 else if (msg->type == ix::WebSocketMessageType::Message) | ||||
|                 { | ||||
|                     for (auto&& client : server.getClients()) | ||||
|                     { | ||||
|                         if (client.get() != &webSocket) | ||||
|                         { | ||||
|                             TLogger() << it.first << ": " << it.second; | ||||
|                             client->send(msg->str, msg->binary); | ||||
|                         } | ||||
|                     } | ||||
|                     else if (msg->type == ix::WebSocketMessageType::Close) | ||||
|                     { | ||||
|                         TLogger() << "Closed connection"; | ||||
|                     } | ||||
|                     else if (msg->type == ix::WebSocketMessageType::Message) | ||||
|                     { | ||||
|                         for (auto&& client : server.getClients()) | ||||
|                         { | ||||
|                             if (client != webSocket) | ||||
|                             { | ||||
|                                 client->send(msg->str, msg->binary); | ||||
|                             } | ||||
|                         } | ||||
|                     } | ||||
|                 }); | ||||
|         }); | ||||
|                 } | ||||
|             }); | ||||
|  | ||||
|         auto res = server.listen(); | ||||
|         if (!res.first) | ||||
|   | ||||
| @@ -6,9 +6,9 @@ | ||||
|  | ||||
| #pragma once | ||||
|  | ||||
| #include "IXGetFreePort.h" | ||||
| #include <iostream> | ||||
| #include <ixsnake/IXAppConfig.h> | ||||
| #include <ixwebsocket/IXGetFreePort.h> | ||||
| #include <ixwebsocket/IXSocketTLSOptions.h> | ||||
| #include <ixwebsocket/IXWebSocketServer.h> | ||||
| #include <mutex> | ||||
|   | ||||
| @@ -189,13 +189,14 @@ namespace | ||||
|         bool preferTLS = true; | ||||
|         server.setTLSOptions(makeServerTLSOptions(preferTLS)); | ||||
|  | ||||
|         server.setOnConnectionCallback([&server, &connectionId]( | ||||
|                                            std::shared_ptr<ix::WebSocket> webSocket, | ||||
|                                            std::shared_ptr<ConnectionState> connectionState, | ||||
|                                            std::unique_ptr<ConnectionInfo> connectionInfo) { | ||||
|             auto remoteIp = connectionInfo->remoteIp; | ||||
|             webSocket->setOnMessageCallback([webSocket, connectionState, remoteIp, &connectionId, &server]( | ||||
|                                                 const ix::WebSocketMessagePtr& msg) { | ||||
|         server.setOnClientMessageCallback( | ||||
|             [&server, &connectionId](std::shared_ptr<ConnectionState> connectionState, | ||||
|                                      ConnectionInfo& connectionInfo, | ||||
|                                      WebSocket& webSocket, | ||||
|                                      const ix::WebSocketMessagePtr& msg) { | ||||
|                 auto remoteIp = connectionInfo.remoteIp; | ||||
|  | ||||
|  | ||||
|                 if (msg->type == ix::WebSocketMessageType::Open) | ||||
|                 { | ||||
|                     TLogger() << "New connection"; | ||||
| @@ -219,14 +220,13 @@ namespace | ||||
|                 { | ||||
|                     for (auto&& client : server.getClients()) | ||||
|                     { | ||||
|                         if (client != webSocket) | ||||
|                         if (client.get() != &webSocket) | ||||
|                         { | ||||
|                             client->send(msg->str, msg->binary); | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|             }); | ||||
|         }); | ||||
|  | ||||
|         auto res = server.listen(); | ||||
|         if (!res.first) | ||||
|   | ||||
| @@ -193,41 +193,39 @@ namespace | ||||
|  | ||||
|     bool startServer(ix::WebSocketServer& server) | ||||
|     { | ||||
|         server.setOnConnectionCallback([&server](std::shared_ptr<ix::WebSocket> webSocket, | ||||
|                                                  std::shared_ptr<ConnectionState> connectionState, | ||||
|  | ||||
|                                                  std::unique_ptr<ConnectionInfo> connectionInfo) { | ||||
|             auto remoteIp = connectionInfo->remoteIp; | ||||
|             webSocket->setOnMessageCallback( | ||||
|                 [webSocket, connectionState, remoteIp, &server](const ix::WebSocketMessagePtr& msg) { | ||||
|                     if (msg->type == ix::WebSocketMessageType::Open) | ||||
|         server.setOnClientMessageCallback( | ||||
|             [&server](std::shared_ptr<ConnectionState> connectionState, | ||||
|                       ConnectionInfo& connectionInfo, | ||||
|                       WebSocket& webSocket, | ||||
|                       const ix::WebSocketMessagePtr& msg) { | ||||
|                 auto remoteIp = connectionInfo.remoteIp; | ||||
|                 if (msg->type == ix::WebSocketMessageType::Open) | ||||
|                 { | ||||
|                     TLogger() << "New connection"; | ||||
|                     TLogger() << "remote ip: " << remoteIp; | ||||
|                     TLogger() << "id: " << connectionState->getId(); | ||||
|                     TLogger() << "Uri: " << msg->openInfo.uri; | ||||
|                     TLogger() << "Headers:"; | ||||
|                     for (auto it : msg->openInfo.headers) | ||||
|                     { | ||||
|                         TLogger() << "New connection"; | ||||
|                         TLogger() << "remote ip: " << remoteIp; | ||||
|                         TLogger() << "id: " << connectionState->getId(); | ||||
|                         TLogger() << "Uri: " << msg->openInfo.uri; | ||||
|                         TLogger() << "Headers:"; | ||||
|                         for (auto it : msg->openInfo.headers) | ||||
|                         TLogger() << it.first << ": " << it.second; | ||||
|                     } | ||||
|                 } | ||||
|                 else if (msg->type == ix::WebSocketMessageType::Close) | ||||
|                 { | ||||
|                     log("Closed connection"); | ||||
|                 } | ||||
|                 else if (msg->type == ix::WebSocketMessageType::Message) | ||||
|                 { | ||||
|                     for (auto&& client : server.getClients()) | ||||
|                     { | ||||
|                         if (client.get() != &webSocket) | ||||
|                         { | ||||
|                             TLogger() << it.first << ": " << it.second; | ||||
|                             client->sendBinary(msg->str); | ||||
|                         } | ||||
|                     } | ||||
|                     else if (msg->type == ix::WebSocketMessageType::Close) | ||||
|                     { | ||||
|                         log("Closed connection"); | ||||
|                     } | ||||
|                     else if (msg->type == ix::WebSocketMessageType::Message) | ||||
|                     { | ||||
|                         for (auto&& client : server.getClients()) | ||||
|                         { | ||||
|                             if (client != webSocket) | ||||
|                             { | ||||
|                                 client->sendBinary(msg->str); | ||||
|                             } | ||||
|                         } | ||||
|                     } | ||||
|                 }); | ||||
|         }); | ||||
|                 } | ||||
|             }); | ||||
|  | ||||
|         auto res = server.listen(); | ||||
|         if (!res.first) | ||||
|   | ||||
| @@ -168,45 +168,38 @@ namespace | ||||
|                      std::mutex& mutexWrite) | ||||
|     { | ||||
|         // A dev/null server | ||||
|         server.setOnConnectionCallback( | ||||
|         server.setOnClientMessageCallback( | ||||
|             [&receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite]( | ||||
|                 std::shared_ptr<ix::WebSocket> webSocket, | ||||
|                 std::shared_ptr<ConnectionState> connectionState, | ||||
|                 std::unique_ptr<ConnectionInfo> connectionInfo) { | ||||
|                 auto remoteIp = connectionInfo->remoteIp; | ||||
|                 webSocket->setOnMessageCallback([webSocket, | ||||
|                                                  connectionState, | ||||
|                                                  remoteIp, | ||||
|                                                  &receivedCloseCode, | ||||
|                                                  &receivedCloseReason, | ||||
|                                                  &receivedCloseRemote, | ||||
|                                                  &mutexWrite](const ix::WebSocketMessagePtr& msg) { | ||||
|                     if (msg->type == ix::WebSocketMessageType::Open) | ||||
|                 ConnectionInfo& connectionInfo, | ||||
|                 WebSocket& /*webSocket*/, | ||||
|                 const ix::WebSocketMessagePtr& msg) { | ||||
|                 auto remoteIp = connectionInfo.remoteIp; | ||||
|                 if (msg->type == ix::WebSocketMessageType::Open) | ||||
|                 { | ||||
|                     TLogger() << "New server connection"; | ||||
|                     TLogger() << "remote ip: " << remoteIp; | ||||
|                     TLogger() << "id: " << connectionState->getId(); | ||||
|                     TLogger() << "Uri: " << msg->openInfo.uri; | ||||
|                     TLogger() << "Headers:"; | ||||
|                     for (auto it : msg->openInfo.headers) | ||||
|                     { | ||||
|                         TLogger() << "New server connection"; | ||||
|                         TLogger() << "remote ip: " << remoteIp; | ||||
|                         TLogger() << "id: " << connectionState->getId(); | ||||
|                         TLogger() << "Uri: " << msg->openInfo.uri; | ||||
|                         TLogger() << "Headers:"; | ||||
|                         for (auto it : msg->openInfo.headers) | ||||
|                         { | ||||
|                             TLogger() << it.first << ": " << it.second; | ||||
|                         } | ||||
|                         TLogger() << it.first << ": " << it.second; | ||||
|                     } | ||||
|                     else if (msg->type == ix::WebSocketMessageType::Close) | ||||
|                     { | ||||
|                         std::stringstream ss; | ||||
|                         ss << "Server closed connection(" << msg->closeInfo.code << "," | ||||
|                            << msg->closeInfo.reason << ")"; | ||||
|                         log(ss.str()); | ||||
|                 } | ||||
|                 else if (msg->type == ix::WebSocketMessageType::Close) | ||||
|                 { | ||||
|                     std::stringstream ss; | ||||
|                     ss << "Server closed connection(" << msg->closeInfo.code << "," | ||||
|                        << msg->closeInfo.reason << ")"; | ||||
|                     log(ss.str()); | ||||
|  | ||||
|                         std::lock_guard<std::mutex> lck(mutexWrite); | ||||
|                     std::lock_guard<std::mutex> lck(mutexWrite); | ||||
|  | ||||
|                         receivedCloseCode = msg->closeInfo.code; | ||||
|                         receivedCloseReason = std::string(msg->closeInfo.reason); | ||||
|                         receivedCloseRemote = msg->closeInfo.remote; | ||||
|                     } | ||||
|                 }); | ||||
|                     receivedCloseCode = msg->closeInfo.code; | ||||
|                     receivedCloseReason = std::string(msg->closeInfo.reason); | ||||
|                     receivedCloseRemote = msg->closeInfo.remote; | ||||
|                 } | ||||
|             }); | ||||
|  | ||||
|         auto res = server.listen(); | ||||
|   | ||||
| @@ -5,13 +5,11 @@ | ||||
|  */ | ||||
|  | ||||
| #include "IXTest.h" | ||||
|  | ||||
| #include "catch.hpp" | ||||
| #include <memory> | ||||
| #include <sstream> | ||||
|  | ||||
| #include <ixwebsocket/IXWebSocket.h> | ||||
| #include <ixwebsocket/IXWebSocketServer.h> | ||||
| #include <memory> | ||||
| #include <sstream> | ||||
|  | ||||
| using namespace ix; | ||||
|  | ||||
| @@ -69,8 +67,7 @@ namespace | ||||
|         std::stringstream ss; | ||||
|         log(std::string("Connecting to url: ") + url); | ||||
|  | ||||
|         _webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) | ||||
|         { | ||||
|         _webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) { | ||||
|             std::stringstream ss; | ||||
|             if (msg->type == ix::WebSocketMessageType::Open) | ||||
|             { | ||||
| @@ -118,34 +115,37 @@ TEST_CASE("Websocket leak test") | ||||
|             int port = getFreePort(); | ||||
|             WebSocketServer server(port); | ||||
|  | ||||
|             server.setOnConnectionCallback([&webSocketPtr](std::shared_ptr<ix::WebSocket> webSocket, | ||||
|                                                            std::shared_ptr<ConnectionState> connectionState, | ||||
|                                                            std::unique_ptr<ConnectionInfo> connectionInfo) | ||||
|            { | ||||
|                 // original ptr in WebSocketServer::handleConnection and the callback argument | ||||
|                 REQUIRE(webSocket.use_count() == 2); | ||||
|                 webSocket->setOnMessageCallback([&webSocketPtr, webSocket, connectionState](const ix::WebSocketMessagePtr& msg) | ||||
|                 { | ||||
|                     if (msg->type == ix::WebSocketMessageType::Open) | ||||
|                     { | ||||
|                         log(std::string("New connection id: ") + connectionState->getId()); | ||||
|                         // original ptr in WebSocketServer::handleConnection, captured ptr of this callback, and ptr in WebSocketServer::_clients | ||||
|                         REQUIRE(webSocket.use_count() == 3); | ||||
|                         webSocketPtr = std::shared_ptr<WebSocket>(webSocket); | ||||
|                         REQUIRE(webSocket.use_count() == 4); | ||||
|                     } | ||||
|                     else if (msg->type == ix::WebSocketMessageType::Close) | ||||
|                     { | ||||
|                         log(std::string("Client closed connection id: ") + connectionState->getId()); | ||||
|                     } | ||||
|                     else | ||||
|                     { | ||||
|                         log(std::string(msg->str)); | ||||
|                     } | ||||
|             server.setOnConnectionCallback( | ||||
|                 [&webSocketPtr](std::shared_ptr<ix::WebSocket> webSocket, | ||||
|                                 std::shared_ptr<ConnectionState> connectionState, | ||||
|                                 std::unique_ptr<ConnectionInfo> connectionInfo) { | ||||
|                     // original ptr in WebSocketServer::handleConnection and the callback argument | ||||
|                     REQUIRE(webSocket.use_count() == 2); | ||||
|                     webSocket->setOnMessageCallback([&webSocketPtr, webSocket, connectionState]( | ||||
|                                                         const ix::WebSocketMessagePtr& msg) { | ||||
|                         if (msg->type == ix::WebSocketMessageType::Open) | ||||
|                         { | ||||
|                             log(std::string("New connection id: ") + connectionState->getId()); | ||||
|                             // original ptr in WebSocketServer::handleConnection, captured ptr of | ||||
|                             // this callback, and ptr in WebSocketServer::_clients | ||||
|                             REQUIRE(webSocket.use_count() == 3); | ||||
|                             webSocketPtr = std::shared_ptr<WebSocket>(webSocket); | ||||
|                             REQUIRE(webSocket.use_count() == 4); | ||||
|                         } | ||||
|                         else if (msg->type == ix::WebSocketMessageType::Close) | ||||
|                         { | ||||
|                             log(std::string("Client closed connection id: ") + | ||||
|                                 connectionState->getId()); | ||||
|                         } | ||||
|                         else | ||||
|                         { | ||||
|                             log(std::string(msg->str)); | ||||
|                         } | ||||
|                     }); | ||||
|                     // original ptr in WebSocketServer::handleConnection, argument of this callback, | ||||
|                     // and captured ptr in websocket callback | ||||
|                     REQUIRE(webSocket.use_count() == 3); | ||||
|                 }); | ||||
|                 // original ptr in WebSocketServer::handleConnection, argument of this callback, and captured ptr in websocket callback | ||||
|                 REQUIRE(webSocket.use_count() == 3); | ||||
|             }); | ||||
|  | ||||
|             server.listen(); | ||||
|             server.start(); | ||||
| @@ -169,7 +169,8 @@ TEST_CASE("Websocket leak test") | ||||
|             ix::msleep(500); | ||||
|             REQUIRE(server.getClients().size() == 0); | ||||
|  | ||||
|             // websocket should only be referenced by webSocketPtr but is still used by the websocket callback | ||||
|             // websocket should only be referenced by webSocketPtr but is still used by the | ||||
|             // websocket callback | ||||
|             REQUIRE(webSocketPtr.use_count() == 1); | ||||
|             webSocketPtr->setOnMessageCallback(nullptr); | ||||
|             // websocket should only be referenced by webSocketPtr | ||||
|   | ||||
| @@ -33,13 +33,14 @@ namespace ix | ||||
|         }; | ||||
|         server.setConnectionStateFactory(factory); | ||||
|  | ||||
|         server.setOnConnectionCallback([&server, &connectionId]( | ||||
|                                            std::shared_ptr<ix::WebSocket> webSocket, | ||||
|                                            std::shared_ptr<ConnectionState> connectionState, | ||||
|                                            std::unique_ptr<ConnectionInfo> connectionInfo) { | ||||
|             auto remoteIp = connectionInfo->remoteIp; | ||||
|             webSocket->setOnMessageCallback([webSocket, connectionState, remoteIp, &connectionId, &server]( | ||||
|                                                 const ix::WebSocketMessagePtr& msg) { | ||||
|         server.setOnClientMessageCallback( | ||||
|             [&server, &connectionId](std::shared_ptr<ConnectionState> connectionState, | ||||
|                                      ConnectionInfo& connectionInfo, | ||||
|                                      WebSocket& webSocket, | ||||
|                                      const ix::WebSocketMessagePtr& msg) { | ||||
|                 auto remoteIp = connectionInfo.remoteIp; | ||||
|  | ||||
|  | ||||
|                 if (msg->type == ix::WebSocketMessageType::Open) | ||||
|                 { | ||||
|                     TLogger() << "New connection"; | ||||
| @@ -63,14 +64,13 @@ namespace ix | ||||
|                 { | ||||
|                     for (auto&& client : server.getClients()) | ||||
|                     { | ||||
|                         if (client != webSocket) | ||||
|                         if (client.get() != &webSocket) | ||||
|                         { | ||||
|                             client->send(msg->str, msg->binary); | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|             }); | ||||
|         }); | ||||
|  | ||||
|         auto res = server.listen(); | ||||
|         if (!res.first) | ||||
|   | ||||
| @@ -16,42 +16,40 @@ using namespace ix; | ||||
|  | ||||
| bool startServer(ix::WebSocketServer& server, std::string& subProtocols) | ||||
| { | ||||
|     server.setOnConnectionCallback( | ||||
|         [&server, &subProtocols](std::shared_ptr<ix::WebSocket> webSocket, | ||||
|                                  std::shared_ptr<ConnectionState> connectionState, | ||||
|                                  std::unique_ptr<ConnectionInfo> connectionInfo) { | ||||
|             auto remoteIp = connectionInfo->remoteIp; | ||||
|             webSocket->setOnMessageCallback([webSocket, connectionState, remoteIp, &server, &subProtocols]( | ||||
|                                                 const ix::WebSocketMessagePtr& msg) { | ||||
|                 if (msg->type == ix::WebSocketMessageType::Open) | ||||
|     server.setOnClientMessageCallback( | ||||
|         [&server, &subProtocols](std::shared_ptr<ConnectionState> connectionState, | ||||
|                                  ConnectionInfo& connectionInfo, | ||||
|                                  WebSocket& webSocket, | ||||
|                                  const ix::WebSocketMessagePtr& msg) { | ||||
|             auto remoteIp = connectionInfo.remoteIp; | ||||
|             if (msg->type == ix::WebSocketMessageType::Open) | ||||
|             { | ||||
|                 TLogger() << "New connection"; | ||||
|                 TLogger() << "remote ip: " << remoteIp; | ||||
|                 TLogger() << "id: " << connectionState->getId(); | ||||
|                 TLogger() << "Uri: " << msg->openInfo.uri; | ||||
|                 TLogger() << "Headers:"; | ||||
|                 for (auto it : msg->openInfo.headers) | ||||
|                 { | ||||
|                     TLogger() << "New connection"; | ||||
|                     TLogger() << "remote ip: " << remoteIp; | ||||
|                     TLogger() << "id: " << connectionState->getId(); | ||||
|                     TLogger() << "Uri: " << msg->openInfo.uri; | ||||
|                     TLogger() << "Headers:"; | ||||
|                     for (auto it : msg->openInfo.headers) | ||||
|                     { | ||||
|                         TLogger() << it.first << ": " << it.second; | ||||
|                     } | ||||
|                     TLogger() << it.first << ": " << it.second; | ||||
|                 } | ||||
|  | ||||
|                     subProtocols = msg->openInfo.headers["Sec-WebSocket-Protocol"]; | ||||
|                 } | ||||
|                 else if (msg->type == ix::WebSocketMessageType::Close) | ||||
|                 subProtocols = msg->openInfo.headers["Sec-WebSocket-Protocol"]; | ||||
|             } | ||||
|             else if (msg->type == ix::WebSocketMessageType::Close) | ||||
|             { | ||||
|                 log("Closed connection"); | ||||
|             } | ||||
|             else if (msg->type == ix::WebSocketMessageType::Message) | ||||
|             { | ||||
|                 for (auto&& client : server.getClients()) | ||||
|                 { | ||||
|                     log("Closed connection"); | ||||
|                 } | ||||
|                 else if (msg->type == ix::WebSocketMessageType::Message) | ||||
|                 { | ||||
|                     for (auto&& client : server.getClients()) | ||||
|                     if (client.get() != &webSocket) | ||||
|                     { | ||||
|                         if (client != webSocket) | ||||
|                         { | ||||
|                             client->sendBinary(msg->str); | ||||
|                         } | ||||
|                         client->sendBinary(msg->str); | ||||
|                     } | ||||
|                 } | ||||
|             }); | ||||
|             } | ||||
|         }); | ||||
|  | ||||
|     auto res = server.listen(); | ||||
|   | ||||
							
								
								
									
										189
									
								
								test/compatibility/cpp/libwebsockets/devnull_client.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										189
									
								
								test/compatibility/cpp/libwebsockets/devnull_client.cpp
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,189 @@ | ||||
| /* | ||||
|  * lws-minimal-ws-client | ||||
|  * | ||||
|  * Written in 2010-2019 by Andy Green <andy@warmcat.com> | ||||
|  * | ||||
|  * This file is made available under the Creative Commons CC0 1.0 | ||||
|  * Universal Public Domain Dedication. | ||||
|  * | ||||
|  * This demonstrates the a minimal ws client using lws. | ||||
|  * | ||||
|  * Original programs connects to https://libwebsockets.org/ and makes a | ||||
|  * wss connection to the dumb-increment protocol there.  While | ||||
|  * connected, it prints the numbers it is being sent by | ||||
|  * dumb-increment protocol. | ||||
|  * | ||||
|  * This is modified to make a test client which counts how much messages | ||||
|  * per second can be received. | ||||
|  * | ||||
|  * libwebsockets$ make && ./a.out | ||||
|  * g++ --std=c++14 -I/usr/local/opt/openssl/include devnull_client.cpp -lwebsockets | ||||
|  * messages received: 0 per second 0 total | ||||
|  * [2020/08/02 19:22:21:4774] U: LWS minimal ws client rx [-d <logs>] [--h2] | ||||
|  * [2020/08/02 19:22:21:4814] U: callback_dumb_increment: established | ||||
|  * messages received: 0 per second 0 total | ||||
|  * messages received: 180015 per second 180015 total | ||||
|  * messages received: 172866 per second 352881 total | ||||
|  * messages received: 176177 per second 529058 total | ||||
|  * messages received: 174191 per second 703249 total | ||||
|  * messages received: 193397 per second 896646 total | ||||
|  * messages received: 196385 per second 1093031 total | ||||
|  * messages received: 194593 per second 1287624 total | ||||
|  * messages received: 189484 per second 1477108 total | ||||
|  * messages received: 200825 per second 1677933 total | ||||
|  * messages received: 183542 per second 1861475 total | ||||
|  * ^C[2020/08/02 19:22:33:4450] U: Completed OK | ||||
|  *  | ||||
|  */ | ||||
|  | ||||
| #include <libwebsockets.h> | ||||
| #include <string.h> | ||||
| #include <signal.h> | ||||
|  | ||||
| #include <atomic> | ||||
| #include <thread> | ||||
| #include <iostream> | ||||
|  | ||||
| static int interrupted; | ||||
| static struct lws *client_wsi; | ||||
|  | ||||
| std::atomic<uint64_t> receivedCount(0); | ||||
|  | ||||
| static int | ||||
| callback_dumb_increment(struct lws *wsi, enum lws_callback_reasons reason, | ||||
|               void *user, void *in, size_t len) | ||||
| { | ||||
|         switch (reason) { | ||||
|  | ||||
|         /* because we are protocols[0] ... */ | ||||
|         case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: | ||||
|                 lwsl_err("CLIENT_CONNECTION_ERROR: %s\n", | ||||
|                          in ? (char *)in : "(null)"); | ||||
|                 client_wsi = NULL; | ||||
|                 break; | ||||
|  | ||||
|         case LWS_CALLBACK_CLIENT_ESTABLISHED: | ||||
|                 lwsl_user("%s: established\n", __func__); | ||||
|                 break; | ||||
|  | ||||
|         case LWS_CALLBACK_CLIENT_RECEIVE: | ||||
|                 receivedCount++; | ||||
|                 break; | ||||
|  | ||||
|         case LWS_CALLBACK_CLIENT_CLOSED: | ||||
|                 client_wsi = NULL; | ||||
|                 break; | ||||
|  | ||||
|         default: | ||||
|                 break; | ||||
|         } | ||||
|  | ||||
|         return lws_callback_http_dummy(wsi, reason, user, in, len); | ||||
| } | ||||
|  | ||||
| static const struct lws_protocols protocols[] = { | ||||
|         { | ||||
|                 "dumb-increment-protocol", | ||||
|                 callback_dumb_increment, | ||||
|                 0, | ||||
|                 0, | ||||
|         }, | ||||
|         { NULL, NULL, 0, 0 } | ||||
| }; | ||||
|  | ||||
| static void | ||||
| sigint_handler(int sig) | ||||
| { | ||||
|         interrupted = 1; | ||||
| } | ||||
|  | ||||
| int main(int argc, const char **argv) | ||||
| { | ||||
|         uint64_t receivedCountTotal(0); | ||||
|         uint64_t receivedCountPerSecs(0); | ||||
|  | ||||
|         auto timer = [&receivedCountTotal, &receivedCountPerSecs] { | ||||
|             while (!interrupted) | ||||
|             { | ||||
|                 std::cerr << "messages received: " | ||||
|                           << receivedCountPerSecs | ||||
|                           << " per second " | ||||
|                           << receivedCountTotal  | ||||
|                           << " total" | ||||
|                           << std::endl; | ||||
|  | ||||
|                 receivedCountPerSecs = receivedCount - receivedCountTotal; | ||||
|                 receivedCountTotal += receivedCountPerSecs; | ||||
|  | ||||
|                 auto duration = std::chrono::seconds(1); | ||||
|                 std::this_thread::sleep_for(duration); | ||||
|             } | ||||
|         }; | ||||
|  | ||||
|         std::thread t1(timer); | ||||
|  | ||||
|         struct lws_context_creation_info info; | ||||
|         struct lws_client_connect_info i; | ||||
|         struct lws_context *context; | ||||
|         const char *p; | ||||
|         int n = 0, logs = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE | ||||
|                 /* for LLL_ verbosity above NOTICE to be built into lws, lws | ||||
|                  * must have been configured with -DCMAKE_BUILD_TYPE=DEBUG | ||||
|                  * instead of =RELEASE */ | ||||
|                 /* | LLL_INFO */ /* | LLL_PARSER */ /* | LLL_HEADER */ | ||||
|                 /* | LLL_EXT */ /* | LLL_CLIENT */ /* | LLL_LATENCY */ | ||||
|                 /* | LLL_DEBUG */; | ||||
|  | ||||
|         signal(SIGINT, sigint_handler); | ||||
|         if ((p = lws_cmdline_option(argc, argv, "-d"))) | ||||
|                 logs = atoi(p); | ||||
|  | ||||
|         lws_set_log_level(logs, NULL); | ||||
|         lwsl_user("LWS minimal ws client rx [-d <logs>] [--h2]\n"); | ||||
|  | ||||
|         memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */ | ||||
|         info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */ | ||||
|         info.protocols = protocols; | ||||
|         info.timeout_secs = 10; | ||||
|  | ||||
|         /* | ||||
|          * since we know this lws context is only ever going to be used with | ||||
|          * one client wsis / fds / sockets at a time, let lws know it doesn't | ||||
|          * have to use the default allocations for fd tables up to ulimit -n. | ||||
|          * It will just allocate for 1 internal and 1 (+ 1 http2 nwsi) that we | ||||
|          * will use. | ||||
|          */ | ||||
|         info.fd_limit_per_thread = 1 + 1 + 1; | ||||
|  | ||||
|         context = lws_create_context(&info); | ||||
|         if (!context) { | ||||
|                 lwsl_err("lws init failed\n"); | ||||
|                 return 1; | ||||
|         } | ||||
|  | ||||
|         memset(&i, 0, sizeof i); /* otherwise uninitialized garbage */ | ||||
|         i.context = context; | ||||
|         i.port = 8008; | ||||
|         i.address = "127.0.0.1"; | ||||
|         i.path = "/"; | ||||
|         i.host = i.address; | ||||
|         i.origin = i.address; | ||||
|         i.protocol = protocols[0].name; /* "dumb-increment-protocol" */ | ||||
|         i.pwsi = &client_wsi; | ||||
|  | ||||
|         if (lws_cmdline_option(argc, argv, "--h2")) | ||||
|                 i.alpn = "h2"; | ||||
|  | ||||
|         lws_client_connect_via_info(&i); | ||||
|  | ||||
|         while (n >= 0 && client_wsi && !interrupted) | ||||
|                 n = lws_service(context, 0); | ||||
|  | ||||
|         lws_context_destroy(context); | ||||
|  | ||||
|         lwsl_user("Completed %s\n", receivedCount > 10 ? "OK" : "Failed"); | ||||
|  | ||||
|         t1.join(); | ||||
|  | ||||
|         return receivedCount > 10; | ||||
| } | ||||
							
								
								
									
										2
									
								
								test/compatibility/csharp/.gitignore
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										2
									
								
								test/compatibility/csharp/.gitignore
									
									
									
									
										vendored
									
									
										Normal file
									
								
							| @@ -0,0 +1,2 @@ | ||||
| bin | ||||
| obj | ||||
							
								
								
									
										99
									
								
								test/compatibility/csharp/Main.cs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										99
									
								
								test/compatibility/csharp/Main.cs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,99 @@ | ||||
| // | ||||
| // Main.cs | ||||
| // Author: Benjamin Sergeant | ||||
| // Copyright (c) 2020 Machine Zone, Inc. All rights reserved. | ||||
| // | ||||
| // In a different terminal, start a push server: | ||||
| // $ ws push_server -q | ||||
| // | ||||
| // $ dotnet run | ||||
| // messages received per second: 145157 | ||||
| // messages received per second: 141405 | ||||
| // messages received per second: 152202 | ||||
| // messages received per second: 157149 | ||||
| // messages received per second: 157673 | ||||
| // messages received per second: 153594 | ||||
| // messages received per second: 157830 | ||||
| // messages received per second: 158422 | ||||
| // | ||||
|  | ||||
| using System; | ||||
| using System.Net.WebSockets; | ||||
| using System.Threading; | ||||
| using System.Threading.Tasks; | ||||
|  | ||||
| public class DevNullClientCli | ||||
| { | ||||
|     private static int receivedMessage = 0; | ||||
|  | ||||
|     public static async Task<byte[]> ReceiveAsync(ClientWebSocket ws, CancellationToken token) | ||||
|     { | ||||
|         int bufferSize = 8192; // 8K | ||||
|         var buffer = new byte[bufferSize]; | ||||
|         var offset = 0; | ||||
|         var free = buffer.Length; | ||||
|  | ||||
|         while (true) | ||||
|         { | ||||
|             var result = await ws.ReceiveAsync(new ArraySegment<byte>(buffer, offset, free), token).ConfigureAwait(false); | ||||
|  | ||||
|             offset += result.Count; | ||||
|             free -= result.Count; | ||||
|             if (result.EndOfMessage) break; | ||||
|  | ||||
|             if (free == 0) | ||||
|             { | ||||
|                 // No free space | ||||
|                 // Resize the outgoing buffer | ||||
|                 var newSize = buffer.Length + bufferSize; | ||||
|  | ||||
|                 var newBuffer = new byte[newSize]; | ||||
|                 Array.Copy(buffer, 0, newBuffer, 0, offset); | ||||
|                 buffer = newBuffer; | ||||
|                 free = buffer.Length - offset; | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         return buffer; | ||||
|     } | ||||
|  | ||||
|     private static void OnTimedEvent(object source, EventArgs e) | ||||
|     { | ||||
|         Console.WriteLine($"messages received per second: {receivedMessage}"); | ||||
|         receivedMessage = 0; | ||||
|     } | ||||
|  | ||||
|     public static async Task ReceiveMessagesAsync(string url) | ||||
|     { | ||||
|         var ws = new ClientWebSocket(); | ||||
|  | ||||
|         System.Uri uri = new System.Uri(url); | ||||
|         var cancellationToken = CancellationToken.None; | ||||
|  | ||||
|         try  | ||||
|         { | ||||
|             await ws.ConnectAsync(uri, cancellationToken).ConfigureAwait(false); | ||||
|             while (true) | ||||
|             { | ||||
|                 var data = await DevNullClientCli.ReceiveAsync(ws, cancellationToken); | ||||
|                 receivedMessage += 1; | ||||
|             } | ||||
|         } | ||||
|         catch (System.Net.WebSockets.WebSocketException e) | ||||
|         { | ||||
|             Console.WriteLine($"WebSocket error: {e}"); | ||||
|             return; | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     public static async Task Main() | ||||
|     { | ||||
|         var timer = new System.Timers.Timer(1000); | ||||
|         timer.Elapsed += OnTimedEvent; | ||||
|         timer.Enabled = true; | ||||
|         timer.Start(); | ||||
|  | ||||
|         var url = "ws://localhost:8008"; | ||||
|         await ReceiveMessagesAsync(url); | ||||
|     } | ||||
| } | ||||
							
								
								
									
										6
									
								
								test/compatibility/csharp/devnull_client.csproj
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										6
									
								
								test/compatibility/csharp/devnull_client.csproj
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,6 @@ | ||||
| <Project Sdk="Microsoft.NET.Sdk"> | ||||
|   <PropertyGroup> | ||||
|     <OutputType>Exe</OutputType> | ||||
|     <TargetFramework>netcoreapp3.1</TargetFramework> | ||||
|   </PropertyGroup> | ||||
| </Project> | ||||
							
								
								
									
										42
									
								
								test/compatibility/node/devnull_client.js
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										42
									
								
								test/compatibility/node/devnull_client.js
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,42 @@ | ||||
| // | ||||
| // With ws@7.3.1 | ||||
| // and  | ||||
| // node --version | ||||
| // v13.11.0 | ||||
| // | ||||
| // In a different terminal, start a push server: | ||||
| // $ ws push_server -q | ||||
| // | ||||
| // $ node devnull_client.js | ||||
| // messages received per second: 16643 | ||||
| // messages received per second: 28065 | ||||
| // messages received per second: 28432 | ||||
| // messages received per second: 22207 | ||||
| // messages received per second: 28805 | ||||
| // messages received per second: 28694 | ||||
| // messages received per second: 28180 | ||||
| // messages received per second: 28601 | ||||
| // messages received per second: 28698 | ||||
| // messages received per second: 28931 | ||||
| // messages received per second: 27975 | ||||
| // | ||||
| const WebSocket = require('ws'); | ||||
|  | ||||
| const ws = new WebSocket('ws://localhost:8008'); | ||||
|  | ||||
| ws.on('open', function open() { | ||||
|   ws.send('hello from node'); | ||||
| }); | ||||
|  | ||||
| var receivedMessages = 0; | ||||
|  | ||||
| setInterval(function timeout() { | ||||
|   console.log(`messages received per second: ${receivedMessages}`) | ||||
|   receivedMessages = 0; | ||||
| }, 1000); | ||||
|  | ||||
| ws.on('message', function incoming(data) { | ||||
|   receivedMessages += 1; | ||||
| }); | ||||
|  | ||||
|  | ||||
							
								
								
									
										44
									
								
								test/compatibility/python/websockets/devnull_client.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										44
									
								
								test/compatibility/python/websockets/devnull_client.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,44 @@ | ||||
| #!/usr/bin/env python3 | ||||
|  | ||||
| # websocket send client | ||||
|  | ||||
| import argparse | ||||
| import asyncio | ||||
| import websockets | ||||
|  | ||||
| try: | ||||
|     import uvloop | ||||
|     uvloop.install() | ||||
| except ImportError: | ||||
|     print('uvloop not available') | ||||
|     pass | ||||
|  | ||||
| msgCount = 0 | ||||
|  | ||||
| async def timer(): | ||||
|     global msgCount | ||||
|  | ||||
|     while True: | ||||
|         print(f'Received messages: {msgCount}') | ||||
|         msgCount = 0 | ||||
|  | ||||
|         await asyncio.sleep(1) | ||||
|  | ||||
|  | ||||
| async def client(url): | ||||
|     global msgCount | ||||
|  | ||||
|     asyncio.ensure_future(timer()) | ||||
|  | ||||
|     async with websockets.connect(url) as ws: | ||||
|         async for message in ws: | ||||
|             msgCount += 1 | ||||
|  | ||||
|  | ||||
| if __name__ == '__main__': | ||||
|     parser = argparse.ArgumentParser(description='websocket proxy.') | ||||
|     parser.add_argument('--url', help='Remote websocket url', | ||||
|                         default='wss://echo.websocket.org') | ||||
|     args = parser.parse_args() | ||||
|  | ||||
|     asyncio.get_event_loop().run_until_complete(client(args.url)) | ||||
| @@ -10,7 +10,7 @@ import websockets | ||||
| async def echo(websocket, path): | ||||
|     while True: | ||||
|         msg = await websocket.recv() | ||||
|         print(f'Received {len(msg)} bytes') | ||||
|         # print(f'Received {len(msg)} bytes') | ||||
|         await websocket.send(msg) | ||||
|  | ||||
| host = os.getenv('BIND_HOST', 'localhost') | ||||
|   | ||||
| @@ -10,10 +10,18 @@ | ||||
| #include <ixwebsocket/IXNetSystem.h> | ||||
| #include <spdlog/spdlog.h> | ||||
|  | ||||
| #ifndef _WIN32 | ||||
| #include <signal.h> | ||||
| #endif | ||||
|  | ||||
| int main(int argc, char* argv[]) | ||||
| { | ||||
|     ix::initNetSystem(); | ||||
|  | ||||
| #ifndef _WIN32 | ||||
|     signal(SIGPIPE, SIG_IGN); | ||||
| #endif | ||||
|  | ||||
|     ix::CoreLogger::LogFunc logFunc = [](const char* msg, ix::LogLevel level) { | ||||
|         switch (level) | ||||
|         { | ||||
| @@ -49,6 +57,7 @@ int main(int argc, char* argv[]) | ||||
|         } | ||||
|     }; | ||||
|     ix::CoreLogger::setLogFunction(logFunc); | ||||
|     spdlog::set_level(spdlog::level::debug); | ||||
|  | ||||
|     int result = Catch::Session().run(argc, argv); | ||||
|  | ||||
|   | ||||
							
								
								
									
										3
									
								
								tools/extract_latest_change.sh
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										3
									
								
								tools/extract_latest_change.sh
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,3 @@ | ||||
| #!/bin/sh | ||||
|  | ||||
| grep -A 3 '^##' docs/CHANGELOG.md | head -n 3 | tail -n 1 | ||||
| @@ -20,7 +20,6 @@ option(USE_TLS "Add TLS support" ON) | ||||
|  | ||||
| include_directories(ws .) | ||||
| include_directories(ws ..) | ||||
| include_directories(ws ../third_party) | ||||
| include_directories(ws ../third_party/spdlog/include) | ||||
| include_directories(ws ../third_party/cpp-linenoise) | ||||
|  | ||||
| @@ -51,7 +50,9 @@ add_executable(ws | ||||
|   ws_http_client.cpp | ||||
|   ws_ping_pong.cpp | ||||
|   ws_broadcast_server.cpp | ||||
|   ws_push_server.cpp | ||||
|   ws_echo_server.cpp | ||||
|   ws_echo_client.cpp | ||||
|   ws_chat.cpp | ||||
|   ws_connect.cpp | ||||
|   ws_transfer.cpp | ||||
| @@ -66,7 +67,6 @@ add_executable(ws | ||||
|   ws_cobra_publish.cpp | ||||
|   ws_httpd.cpp | ||||
|   ws_autobahn.cpp | ||||
|   ws_proxy_server.cpp | ||||
|   ws_sentry_minidump_upload.cpp | ||||
|   ws_dns_lookup.cpp | ||||
|   ws.cpp) | ||||
|   | ||||
| @@ -14,6 +14,7 @@ function cleanup_and_exit { | ||||
| } | ||||
|  | ||||
| WITH_TLS=${WITH_TLS:-0} | ||||
| BLOCKS=${BLOCKS:-20000} | ||||
|  | ||||
| rm -rf /tmp/ws_test | ||||
| mkdir -p /tmp/ws_test | ||||
| @@ -57,7 +58,7 @@ ws receive "${protocol}127.0.0.1:8090" ${delay} --pidfile /tmp/ws_test/pidfile.r | ||||
|  | ||||
| mkdir -p /tmp/ws_test/send | ||||
| cd /tmp/ws_test/send | ||||
| dd if=/dev/urandom of=/tmp/ws_test/send/20M_file count=20000 bs=1024 | ||||
| dd if=/dev/urandom of=/tmp/ws_test/send/20M_file count=$BLOCKS bs=1024 | ||||
|  | ||||
| # Start the sender job | ||||
| ws send ${client_tls} --pidfile /tmp/ws_test/pidfile.send "${protocol}127.0.0.1:8090" /tmp/ws_test/send/20M_file | ||||
|   | ||||
							
								
								
									
										97
									
								
								ws/ws.cpp
									
									
									
									
									
								
							
							
						
						
									
										97
									
								
								ws/ws.cpp
									
									
									
									
									
								
							| @@ -22,6 +22,7 @@ | ||||
| #include <ixwebsocket/IXNetSystem.h> | ||||
| #include <ixwebsocket/IXSocket.h> | ||||
| #include <ixwebsocket/IXUserAgent.h> | ||||
| #include <ixwebsocket/IXWebSocketProxyServer.h> | ||||
| #include <spdlog/sinks/basic_file_sink.h> | ||||
| #include <spdlog/spdlog.h> | ||||
| #include <sstream> | ||||
| @@ -74,6 +75,7 @@ int main(int argc, char** argv) | ||||
|         } | ||||
|     }; | ||||
|     ix::CoreLogger::setLogFunction(logFunc); | ||||
|     spdlog::set_level(spdlog::level::debug); | ||||
|  | ||||
| #ifndef _WIN32 | ||||
|     signal(SIGPIPE, SIG_IGN); | ||||
| @@ -123,6 +125,7 @@ int main(int argc, char** argv) | ||||
|     std::string logfile; | ||||
|     std::string scriptPath; | ||||
|     std::string republishChannel; | ||||
|     std::string sendMsg("hello world"); | ||||
|     ix::SocketTLSOptions tlsOptions; | ||||
|     ix::CobraConfig cobraConfig; | ||||
|     ix::CobraBotConfig cobraBotConfig; | ||||
| @@ -145,6 +148,7 @@ int main(int argc, char** argv) | ||||
|     bool version = false; | ||||
|     bool verifyNone = false; | ||||
|     bool disablePong = false; | ||||
|     bool noSend = false; | ||||
|     int port = 8008; | ||||
|     int redisPort = 6379; | ||||
|     int statsdPort = 8125; | ||||
| @@ -241,6 +245,19 @@ int main(int argc, char** argv) | ||||
|     connectApp->add_option("--subprotocol", subprotocol, "Subprotocol"); | ||||
|     addTLSOptions(connectApp); | ||||
|  | ||||
|     CLI::App* echoClientApp = | ||||
|         app.add_subcommand("echo_client", "Echo messages sent by a remote server"); | ||||
|     echoClientApp->fallthrough(); | ||||
|     echoClientApp->add_option("url", url, "Connection url")->required(); | ||||
|     echoClientApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate"); | ||||
|     echoClientApp->add_flag("-b", binaryMode, "Send in binary mode"); | ||||
|     echoClientApp->add_option( | ||||
|         "--ping_interval", pingIntervalSecs, "Interval between sending pings"); | ||||
|     echoClientApp->add_option("--subprotocol", subprotocol, "Subprotocol"); | ||||
|     echoClientApp->add_option("--send_msg", sendMsg, "Send message"); | ||||
|     echoClientApp->add_flag("-m", noSend, "Do not send messages, only receive messages"); | ||||
|     addTLSOptions(echoClientApp); | ||||
|  | ||||
|     CLI::App* chatApp = app.add_subcommand("chat", "Group chat"); | ||||
|     chatApp->fallthrough(); | ||||
|     chatApp->add_option("url", url, "Connection url")->required(); | ||||
| @@ -250,12 +267,25 @@ int main(int argc, char** argv) | ||||
|     echoServerApp->fallthrough(); | ||||
|     echoServerApp->add_option("--port", port, "Port"); | ||||
|     echoServerApp->add_option("--host", hostname, "Hostname"); | ||||
|     echoServerApp->add_flag("-g", greetings, "Verbose"); | ||||
|     echoServerApp->add_flag("-q", quiet, "Quiet / only display warnings and errors"); | ||||
|     echoServerApp->add_flag("-g", greetings, "Greet"); | ||||
|     echoServerApp->add_flag("-6", ipv6, "IpV6"); | ||||
|     echoServerApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate"); | ||||
|     echoServerApp->add_flag("-p", disablePong, "Disable sending PONG in response to PING"); | ||||
|     addTLSOptions(echoServerApp); | ||||
|  | ||||
|     CLI::App* pushServerApp = app.add_subcommand("push_server", "Push server"); | ||||
|     pushServerApp->fallthrough(); | ||||
|     pushServerApp->add_option("--port", port, "Port"); | ||||
|     pushServerApp->add_option("--host", hostname, "Hostname"); | ||||
|     pushServerApp->add_flag("-q", quiet, "Quiet / only display warnings and errors"); | ||||
|     pushServerApp->add_flag("-g", greetings, "Greet"); | ||||
|     pushServerApp->add_flag("-6", ipv6, "IpV6"); | ||||
|     pushServerApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate"); | ||||
|     pushServerApp->add_flag("-p", disablePong, "Disable sending PONG in response to PING"); | ||||
|     pushServerApp->add_option("--send_msg", sendMsg, "Send message"); | ||||
|     addTLSOptions(pushServerApp); | ||||
|  | ||||
|     CLI::App* broadcastServerApp = app.add_subcommand("broadcast_server", "Broadcasting server"); | ||||
|     broadcastServerApp->fallthrough(); | ||||
|     broadcastServerApp->add_option("--port", port, "Port"); | ||||
| @@ -475,6 +505,11 @@ int main(int argc, char** argv) | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     if (quiet) | ||||
|     { | ||||
|         spdlog::set_level(spdlog::level::warn); | ||||
|     } | ||||
|  | ||||
|     // Cobra config | ||||
|     cobraConfig.webSocketPerMessageDeflateOptions = ix::WebSocketPerMessageDeflateOptions(true); | ||||
|     cobraConfig.socketTLSOptions = tlsOptions; | ||||
| @@ -484,7 +519,46 @@ int main(int argc, char** argv) | ||||
|     cobraBotConfig.cobraConfig.socketTLSOptions = tlsOptions; | ||||
|  | ||||
|     int ret = 1; | ||||
|     if (app.got_subcommand("transfer")) | ||||
|     if (app.got_subcommand("connect")) | ||||
|     { | ||||
|         ret = ix::ws_connect_main(url, | ||||
|                                   headers, | ||||
|                                   disableAutomaticReconnection, | ||||
|                                   disablePerMessageDeflate, | ||||
|                                   binaryMode, | ||||
|                                   maxWaitBetweenReconnectionRetries, | ||||
|                                   tlsOptions, | ||||
|                                   subprotocol, | ||||
|                                   pingIntervalSecs); | ||||
|     } | ||||
|     else if (app.got_subcommand("echo_client")) | ||||
|     { | ||||
|         ret = ix::ws_echo_client(url, | ||||
|                                  disablePerMessageDeflate, | ||||
|                                  binaryMode, | ||||
|                                  tlsOptions, | ||||
|                                  subprotocol, | ||||
|                                  pingIntervalSecs, | ||||
|                                  sendMsg, | ||||
|                                  noSend); | ||||
|     } | ||||
|     else if (app.got_subcommand("echo_server")) | ||||
|     { | ||||
|         ret = ix::ws_echo_server_main( | ||||
|             port, greetings, hostname, tlsOptions, ipv6, disablePerMessageDeflate, disablePong); | ||||
|     } | ||||
|     else if (app.got_subcommand("push_server")) | ||||
|     { | ||||
|         ret = ix::ws_push_server(port, | ||||
|                                  greetings, | ||||
|                                  hostname, | ||||
|                                  tlsOptions, | ||||
|                                  ipv6, | ||||
|                                  disablePerMessageDeflate, | ||||
|                                  disablePong, | ||||
|                                  sendMsg); | ||||
|     } | ||||
|     else if (app.got_subcommand("transfer")) | ||||
|     { | ||||
|         ret = ix::ws_transfer_main(port, hostname, tlsOptions); | ||||
|     } | ||||
| @@ -497,27 +571,10 @@ int main(int argc, char** argv) | ||||
|         bool enablePerMessageDeflate = false; | ||||
|         ret = ix::ws_receive_main(url, enablePerMessageDeflate, delayMs, tlsOptions); | ||||
|     } | ||||
|     else if (app.got_subcommand("connect")) | ||||
|     { | ||||
|         ret = ix::ws_connect_main(url, | ||||
|                                   headers, | ||||
|                                   disableAutomaticReconnection, | ||||
|                                   disablePerMessageDeflate, | ||||
|                                   binaryMode, | ||||
|                                   maxWaitBetweenReconnectionRetries, | ||||
|                                   tlsOptions, | ||||
|                                   subprotocol, | ||||
|                                   pingIntervalSecs); | ||||
|     } | ||||
|     else if (app.got_subcommand("chat")) | ||||
|     { | ||||
|         ret = ix::ws_chat_main(url, user); | ||||
|     } | ||||
|     else if (app.got_subcommand("echo_server")) | ||||
|     { | ||||
|         ret = ix::ws_echo_server_main( | ||||
|             port, greetings, hostname, tlsOptions, ipv6, disablePerMessageDeflate, disablePong); | ||||
|     } | ||||
|     else if (app.got_subcommand("broadcast_server")) | ||||
|     { | ||||
|         ret = ix::ws_broadcast_server_main(port, hostname, tlsOptions); | ||||
| @@ -656,7 +713,7 @@ int main(int argc, char** argv) | ||||
|     } | ||||
|     else if (app.got_subcommand("proxy_server")) | ||||
|     { | ||||
|         ret = ix::ws_proxy_server_main(port, hostname, tlsOptions, remoteHost, verbose); | ||||
|         ret = ix::websocket_proxy_server_main(port, hostname, tlsOptions, remoteHost, verbose); | ||||
|     } | ||||
|     else if (app.got_subcommand("upload_minidump")) | ||||
|     { | ||||
|   | ||||
							
								
								
									
										24
									
								
								ws/ws.h
									
									
									
									
									
								
							
							
						
						
									
										24
									
								
								ws/ws.h
									
									
									
									
									
								
							| @@ -35,6 +35,15 @@ namespace ix | ||||
|                             bool disablePerMessageDeflate, | ||||
|                             bool disablePong); | ||||
|  | ||||
|     int ws_push_server(int port, | ||||
|                        bool greetings, | ||||
|                        const std::string& hostname, | ||||
|                        const ix::SocketTLSOptions& tlsOptions, | ||||
|                        bool ipv6, | ||||
|                        bool disablePerMessageDeflate, | ||||
|                        bool disablePong, | ||||
|                        const std::string& sendMsg); | ||||
|  | ||||
|     int ws_broadcast_server_main(int port, | ||||
|                                  const std::string& hostname, | ||||
|                                  const ix::SocketTLSOptions& tlsOptions); | ||||
| @@ -54,6 +63,15 @@ namespace ix | ||||
|                         const std::string& subprotocol, | ||||
|                         int pingIntervalSecs); | ||||
|  | ||||
|     int ws_echo_client(const std::string& url, | ||||
|                        bool disablePerMessageDeflate, | ||||
|                        bool binaryMode, | ||||
|                        const ix::SocketTLSOptions& tlsOptions, | ||||
|                        const std::string& subprotocol, | ||||
|                        int pingIntervalSecs, | ||||
|                        const std::string& sendMsg, | ||||
|                        bool noSend); | ||||
|  | ||||
|     int ws_receive_main(const std::string& url, | ||||
|                         bool enablePerMessageDeflate, | ||||
|                         int delayMs, | ||||
| @@ -116,12 +134,6 @@ namespace ix | ||||
|  | ||||
|     int ws_redis_server_main(int port, const std::string& hostname); | ||||
|  | ||||
|     int ws_proxy_server_main(int port, | ||||
|                              const std::string& hostname, | ||||
|                              const ix::SocketTLSOptions& tlsOptions, | ||||
|                              const std::string& remoteHost, | ||||
|                              bool verbose); | ||||
|  | ||||
|     int ws_sentry_minidump_upload(const std::string& metadataPath, | ||||
|                                   const std::string& minidump, | ||||
|                                   const std::string& project, | ||||
|   | ||||
| @@ -20,12 +20,12 @@ namespace ix | ||||
|         ix::WebSocketServer server(port, hostname); | ||||
|         server.setTLSOptions(tlsOptions); | ||||
|  | ||||
|         server.setOnConnectionCallback([&server](std::shared_ptr<WebSocket> webSocket, | ||||
|                                                  std::shared_ptr<ConnectionState> connectionState, | ||||
|                                                  std::unique_ptr<ConnectionInfo> connectionInfo) { | ||||
|             auto remoteIp = connectionInfo->remoteIp; | ||||
|             webSocket->setOnMessageCallback([webSocket, connectionState, remoteIp, &server]( | ||||
|                                                 const WebSocketMessagePtr& msg) { | ||||
|         server.setOnClientMessageCallback( | ||||
|             [&server](std::shared_ptr<ConnectionState> connectionState, | ||||
|                       ConnectionInfo& connectionInfo, | ||||
|                       WebSocket& webSocket, | ||||
|                       const WebSocketMessagePtr& msg) { | ||||
|                 auto remoteIp = connectionInfo.remoteIp; | ||||
|                 if (msg->type == ix::WebSocketMessageType::Open) | ||||
|                 { | ||||
|                     spdlog::info("New connection"); | ||||
| @@ -63,7 +63,7 @@ namespace ix | ||||
|  | ||||
|                     for (auto&& client : server.getClients()) | ||||
|                     { | ||||
|                         if (client != webSocket) | ||||
|                         if (client.get() != &webSocket) | ||||
|                         { | ||||
|                             client->send(msg->str, msg->binary, [](int current, int total) -> bool { | ||||
|                                 spdlog::info("Step {} out of {}", current, total); | ||||
| @@ -82,7 +82,6 @@ namespace ix | ||||
|                     } | ||||
|                 } | ||||
|             }); | ||||
|         }); | ||||
|  | ||||
|         auto res = server.listen(); | ||||
|         if (!res.first) | ||||
|   | ||||
| @@ -8,7 +8,6 @@ | ||||
| #include <chrono> | ||||
| #include <fstream> | ||||
| #include <ixcobra/IXCobraMetricsPublisher.h> | ||||
| #include <jsoncpp/json/json.h> | ||||
| #include <spdlog/spdlog.h> | ||||
| #include <sstream> | ||||
| #include <thread> | ||||
|   | ||||
| @@ -8,7 +8,6 @@ | ||||
| #include <chrono> | ||||
| #include <fstream> | ||||
| #include <ixcobra/IXCobraMetricsPublisher.h> | ||||
| #include <jsoncpp/json/json.h> | ||||
| #include <mutex> | ||||
| #include <spdlog/spdlog.h> | ||||
| #include <sstream> | ||||
|   | ||||
| @@ -160,7 +160,7 @@ namespace ix | ||||
|             std::stringstream ss; | ||||
|             if (msg->type == ix::WebSocketMessageType::Open) | ||||
|             { | ||||
|                 log("ws_connect: connected"); | ||||
|                 spdlog::info("ws_connect: connected"); | ||||
|                 spdlog::info("Uri: {}", msg->openInfo.uri); | ||||
|                 spdlog::info("Headers:"); | ||||
|                 for (auto it : msg->openInfo.headers) | ||||
| @@ -200,7 +200,7 @@ namespace ix | ||||
|             } | ||||
|             else if (msg->type == ix::WebSocketMessageType::Pong) | ||||
|             { | ||||
|                 spdlog::info("Received pong"); | ||||
|                 spdlog::info("Received pong {}", msg->str); | ||||
|             } | ||||
|             else | ||||
|             { | ||||
|   | ||||
							
								
								
									
										121
									
								
								ws/ws_echo_client.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										121
									
								
								ws/ws_echo_client.cpp
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,121 @@ | ||||
| /* | ||||
|  *  ws_echo_client.cpp | ||||
|  *  Author: Benjamin Sergeant | ||||
|  *  Copyright (c) 2020 Machine Zone, Inc. All rights reserved. | ||||
|  */ | ||||
|  | ||||
| #include <iostream> | ||||
| #include <ixcore/utils/IXCoreLogger.h> | ||||
| #include <ixwebsocket/IXNetSystem.h> | ||||
| #include <ixwebsocket/IXSetThreadName.h> | ||||
| #include <ixwebsocket/IXWebSocket.h> | ||||
| #include <spdlog/spdlog.h> | ||||
| #include <sstream> | ||||
| #include <thread> | ||||
|  | ||||
| namespace ix | ||||
| { | ||||
|     int ws_echo_client(const std::string& url, | ||||
|                        bool disablePerMessageDeflate, | ||||
|                        bool binaryMode, | ||||
|                        const ix::SocketTLSOptions& tlsOptions, | ||||
|                        const std::string& subprotocol, | ||||
|                        int pingIntervalSecs, | ||||
|                        const std::string& sendMsg, | ||||
|                        bool noSend) | ||||
|     { | ||||
|         // Our websocket object | ||||
|         ix::WebSocket webSocket; | ||||
|  | ||||
|         webSocket.setUrl(url); | ||||
|         webSocket.setTLSOptions(tlsOptions); | ||||
|         webSocket.setPingInterval(pingIntervalSecs); | ||||
|  | ||||
|         if (disablePerMessageDeflate) | ||||
|         { | ||||
|             webSocket.disablePerMessageDeflate(); | ||||
|         } | ||||
|  | ||||
|         if (!subprotocol.empty()) | ||||
|         { | ||||
|             webSocket.addSubProtocol(subprotocol); | ||||
|         } | ||||
|  | ||||
|         std::atomic<uint64_t> receivedCount(0); | ||||
|         uint64_t receivedCountTotal(0); | ||||
|         uint64_t receivedCountPerSecs(0); | ||||
|  | ||||
|         // Setup a callback to be fired (in a background thread, watch out for race conditions !) | ||||
|         // when a message or an event (open, close, error) is received | ||||
|         webSocket.setOnMessageCallback([&webSocket, &receivedCount, &sendMsg, noSend, binaryMode]( | ||||
|                                            const ix::WebSocketMessagePtr& msg) { | ||||
|             if (msg->type == ix::WebSocketMessageType::Message) | ||||
|             { | ||||
|                 if (!noSend) | ||||
|                 { | ||||
|                     webSocket.send(msg->str, msg->binary); | ||||
|                 } | ||||
|                 receivedCount++; | ||||
|             } | ||||
|             else if (msg->type == ix::WebSocketMessageType::Open) | ||||
|             { | ||||
|                 spdlog::info("ws_echo_client: connected"); | ||||
|                 spdlog::info("Uri: {}", msg->openInfo.uri); | ||||
|                 spdlog::info("Headers:"); | ||||
|                 for (auto it : msg->openInfo.headers) | ||||
|                 { | ||||
|                     spdlog::info("{}: {}", it.first, it.second); | ||||
|                 } | ||||
|  | ||||
|                 webSocket.send(sendMsg, binaryMode); | ||||
|             } | ||||
|             else if (msg->type == ix::WebSocketMessageType::Pong) | ||||
|             { | ||||
|                 spdlog::info("Received pong {}", msg->str); | ||||
|             } | ||||
|         }); | ||||
|  | ||||
|         auto timer = [&receivedCount, &receivedCountTotal, &receivedCountPerSecs] { | ||||
|             setThreadName("Timer"); | ||||
|             while (true) | ||||
|             { | ||||
|                 // | ||||
|                 // We cannot write to sentCount and receivedCount | ||||
|                 // as those are used externally, so we need to introduce | ||||
|                 // our own counters | ||||
|                 // | ||||
|                 std::stringstream ss; | ||||
|                 ss << "messages received: " << receivedCountPerSecs << " per second " | ||||
|                    << receivedCountTotal << " total"; | ||||
|  | ||||
|                 CoreLogger::info(ss.str()); | ||||
|  | ||||
|                 receivedCountPerSecs = receivedCount - receivedCountTotal; | ||||
|                 receivedCountTotal += receivedCountPerSecs; | ||||
|  | ||||
|                 auto duration = std::chrono::seconds(1); | ||||
|                 std::this_thread::sleep_for(duration); | ||||
|             } | ||||
|         }; | ||||
|  | ||||
|         std::thread t1(timer); | ||||
|  | ||||
|         // Now that our callback is setup, we can start our background thread and receive messages | ||||
|         std::cout << "Connecting to " << url << "..." << std::endl; | ||||
|         webSocket.start(); | ||||
|  | ||||
|         // Send a message to the server (default to TEXT mode) | ||||
|         webSocket.send("hello world"); | ||||
|  | ||||
|         while (true) | ||||
|         { | ||||
|             std::string text; | ||||
|             std::cout << "> " << std::flush; | ||||
|             std::getline(std::cin, text); | ||||
|  | ||||
|             webSocket.send(text); | ||||
|         } | ||||
|  | ||||
|         return 0; | ||||
|     } | ||||
| } // namespace ix | ||||
| @@ -42,50 +42,48 @@ namespace ix | ||||
|             server.disablePong(); | ||||
|         } | ||||
|  | ||||
|         server.setOnConnectionCallback( | ||||
|             [greetings](std::shared_ptr<ix::WebSocket> webSocket, | ||||
|                         std::shared_ptr<ConnectionState> connectionState, | ||||
|                         std::unique_ptr<ConnectionInfo> connectionInfo) { | ||||
|                 auto remoteIp = connectionInfo->remoteIp; | ||||
|                 webSocket->setOnMessageCallback( | ||||
|                     [webSocket, connectionState, remoteIp, greetings](const WebSocketMessagePtr& msg) { | ||||
|                         if (msg->type == ix::WebSocketMessageType::Open) | ||||
|                         { | ||||
|                             spdlog::info("New connection"); | ||||
|                             spdlog::info("remote ip: {}", remoteIp); | ||||
|                             spdlog::info("id: {}", connectionState->getId()); | ||||
|                             spdlog::info("Uri: {}", msg->openInfo.uri); | ||||
|                             spdlog::info("Headers:"); | ||||
|                             for (auto it : msg->openInfo.headers) | ||||
|                             { | ||||
|                                 spdlog::info("{}: {}", it.first, it.second); | ||||
|                             } | ||||
|         server.setOnClientMessageCallback( | ||||
|             [greetings](std::shared_ptr<ConnectionState> connectionState, | ||||
|                         ConnectionInfo& connectionInfo, | ||||
|                         WebSocket& webSocket, | ||||
|                         const WebSocketMessagePtr& msg) { | ||||
|                 auto remoteIp = connectionInfo.remoteIp; | ||||
|                 if (msg->type == ix::WebSocketMessageType::Open) | ||||
|                 { | ||||
|                     spdlog::info("New connection"); | ||||
|                     spdlog::info("remote ip: {}", remoteIp); | ||||
|                     spdlog::info("id: {}", connectionState->getId()); | ||||
|                     spdlog::info("Uri: {}", msg->openInfo.uri); | ||||
|                     spdlog::info("Headers:"); | ||||
|                     for (auto it : msg->openInfo.headers) | ||||
|                     { | ||||
|                         spdlog::info("{}: {}", it.first, it.second); | ||||
|                     } | ||||
|  | ||||
|                             if (greetings) | ||||
|                             { | ||||
|                                 webSocket->sendText("Welcome !"); | ||||
|                             } | ||||
|                         } | ||||
|                         else if (msg->type == ix::WebSocketMessageType::Close) | ||||
|                         { | ||||
|                             spdlog::info("Closed connection: client id {} code {} reason {}", | ||||
|                                          connectionState->getId(), | ||||
|                                          msg->closeInfo.code, | ||||
|                                          msg->closeInfo.reason); | ||||
|                         } | ||||
|                         else if (msg->type == ix::WebSocketMessageType::Error) | ||||
|                         { | ||||
|                             spdlog::error("Connection error: {}", msg->errorInfo.reason); | ||||
|                             spdlog::error("#retries: {}", msg->errorInfo.retries); | ||||
|                             spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time); | ||||
|                             spdlog::error("HTTP Status: {}", msg->errorInfo.http_status); | ||||
|                         } | ||||
|                         else if (msg->type == ix::WebSocketMessageType::Message) | ||||
|                         { | ||||
|                             spdlog::info("Received {} bytes", msg->wireSize); | ||||
|                             webSocket->send(msg->str, msg->binary); | ||||
|                         } | ||||
|                     }); | ||||
|                     if (greetings) | ||||
|                     { | ||||
|                         webSocket.sendText("Welcome !"); | ||||
|                     } | ||||
|                 } | ||||
|                 else if (msg->type == ix::WebSocketMessageType::Close) | ||||
|                 { | ||||
|                     spdlog::info("Closed connection: client id {} code {} reason {}", | ||||
|                                  connectionState->getId(), | ||||
|                                  msg->closeInfo.code, | ||||
|                                  msg->closeInfo.reason); | ||||
|                 } | ||||
|                 else if (msg->type == ix::WebSocketMessageType::Error) | ||||
|                 { | ||||
|                     spdlog::error("Connection error: {}", msg->errorInfo.reason); | ||||
|                     spdlog::error("#retries: {}", msg->errorInfo.retries); | ||||
|                     spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time); | ||||
|                     spdlog::error("HTTP Status: {}", msg->errorInfo.http_status); | ||||
|                 } | ||||
|                 else if (msg->type == ix::WebSocketMessageType::Message) | ||||
|                 { | ||||
|                     spdlog::info("Received {} bytes", msg->wireSize); | ||||
|                     webSocket.send(msg->str, msg->binary); | ||||
|                 } | ||||
|             }); | ||||
|  | ||||
|         auto res = server.listen(); | ||||
|   | ||||
| @@ -1,176 +0,0 @@ | ||||
| /* | ||||
|  *  ws_proxy_server.cpp | ||||
|  *  Author: Benjamin Sergeant | ||||
|  *  Copyright (c) 2018 Machine Zone, Inc. All rights reserved. | ||||
|  */ | ||||
|  | ||||
| #include <ixwebsocket/IXWebSocketServer.h> | ||||
| #include <spdlog/spdlog.h> | ||||
| #include <sstream> | ||||
|  | ||||
| namespace ix | ||||
| { | ||||
|     class ProxyConnectionState : public ix::ConnectionState | ||||
|     { | ||||
|     public: | ||||
|         ProxyConnectionState() | ||||
|             : _connected(false) | ||||
|         { | ||||
|         } | ||||
|  | ||||
|         ix::WebSocket& webSocket() | ||||
|         { | ||||
|             return _serverWebSocket; | ||||
|         } | ||||
|  | ||||
|         bool isConnected() | ||||
|         { | ||||
|             return _connected; | ||||
|         } | ||||
|  | ||||
|         void setConnected() | ||||
|         { | ||||
|             _connected = true; | ||||
|         } | ||||
|  | ||||
|     private: | ||||
|         ix::WebSocket _serverWebSocket; | ||||
|         bool _connected; | ||||
|     }; | ||||
|  | ||||
|     int ws_proxy_server_main(int port, | ||||
|                              const std::string& hostname, | ||||
|                              const ix::SocketTLSOptions& tlsOptions, | ||||
|                              const std::string& remoteUrl, | ||||
|                              bool verbose) | ||||
|     { | ||||
|         spdlog::info("Listening on {}:{}", hostname, port); | ||||
|  | ||||
|         ix::WebSocketServer server(port, hostname); | ||||
|         server.setTLSOptions(tlsOptions); | ||||
|  | ||||
|         auto factory = []() -> std::shared_ptr<ix::ConnectionState> { | ||||
|             return std::make_shared<ProxyConnectionState>(); | ||||
|         }; | ||||
|         server.setConnectionStateFactory(factory); | ||||
|  | ||||
|         server.setOnConnectionCallback([remoteUrl, | ||||
|                                         verbose](std::shared_ptr<ix::WebSocket> webSocket, | ||||
|                                                  std::shared_ptr<ConnectionState> connectionState, | ||||
|                                                  std::unique_ptr<ConnectionInfo> connectionInfo) { | ||||
|             auto state = std::dynamic_pointer_cast<ProxyConnectionState>(connectionState); | ||||
|             auto remoteIp = connectionInfo->remoteIp; | ||||
|  | ||||
|             // Server connection | ||||
|             state->webSocket().setOnMessageCallback([webSocket, state, remoteIp, verbose]( | ||||
|                                                         const WebSocketMessagePtr& msg) { | ||||
|                 if (msg->type == ix::WebSocketMessageType::Open) | ||||
|                 { | ||||
|                     spdlog::info("New connection to remote server"); | ||||
|                     spdlog::info("remote ip: {}", remoteIp); | ||||
|                     spdlog::info("id: {}", state->getId()); | ||||
|                     spdlog::info("Uri: {}", msg->openInfo.uri); | ||||
|                     spdlog::info("Headers:"); | ||||
|                     for (auto it : msg->openInfo.headers) | ||||
|                     { | ||||
|                         spdlog::info("{}: {}", it.first, it.second); | ||||
|                     } | ||||
|                 } | ||||
|                 else if (msg->type == ix::WebSocketMessageType::Close) | ||||
|                 { | ||||
|                     spdlog::info("Closed remote server connection: client id {} code {} reason {}", | ||||
|                                  state->getId(), | ||||
|                                  msg->closeInfo.code, | ||||
|                                  msg->closeInfo.reason); | ||||
|                     state->setTerminated(); | ||||
|                 } | ||||
|                 else if (msg->type == ix::WebSocketMessageType::Error) | ||||
|                 { | ||||
|                     spdlog::error("Connection error: {}", msg->errorInfo.reason); | ||||
|                     spdlog::error("#retries: {}", msg->errorInfo.retries); | ||||
|                     spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time); | ||||
|                     spdlog::error("HTTP Status: {}", msg->errorInfo.http_status); | ||||
|                 } | ||||
|                 else if (msg->type == ix::WebSocketMessageType::Message) | ||||
|                 { | ||||
|                     spdlog::info("Received {} bytes from server", msg->wireSize); | ||||
|                     if (verbose) | ||||
|                     { | ||||
|                         spdlog::info("payload {}", msg->str); | ||||
|                     } | ||||
|  | ||||
|                     webSocket->send(msg->str, msg->binary); | ||||
|                 } | ||||
|             }); | ||||
|  | ||||
|             // Client connection | ||||
|             webSocket->setOnMessageCallback( | ||||
|                 [state, remoteUrl, verbose](const WebSocketMessagePtr& msg) { | ||||
|                     if (msg->type == ix::WebSocketMessageType::Open) | ||||
|                     { | ||||
|                         spdlog::info("New connection from client"); | ||||
|                         spdlog::info("id: {}", state->getId()); | ||||
|                         spdlog::info("Uri: {}", msg->openInfo.uri); | ||||
|                         spdlog::info("Headers:"); | ||||
|                         for (auto it : msg->openInfo.headers) | ||||
|                         { | ||||
|                             spdlog::info("{}: {}", it.first, it.second); | ||||
|                         } | ||||
|  | ||||
|                         // Connect to the 'real' server | ||||
|                         std::string url(remoteUrl); | ||||
|                         url += msg->openInfo.uri; | ||||
|                         state->webSocket().setUrl(url); | ||||
|                         state->webSocket().disableAutomaticReconnection(); | ||||
|                         state->webSocket().start(); | ||||
|  | ||||
|                         // we should sleep here for a bit until we've established the | ||||
|                         // connection with the remote server | ||||
|                         while (state->webSocket().getReadyState() != ReadyState::Open) | ||||
|                         { | ||||
|                             spdlog::info("waiting for server connection establishment"); | ||||
|                             std::this_thread::sleep_for(std::chrono::milliseconds(10)); | ||||
|                         } | ||||
|                         spdlog::info("server connection established"); | ||||
|                     } | ||||
|                     else if (msg->type == ix::WebSocketMessageType::Close) | ||||
|                     { | ||||
|                         spdlog::info("Closed client connection: client id {} code {} reason {}", | ||||
|                                      state->getId(), | ||||
|                                      msg->closeInfo.code, | ||||
|                                      msg->closeInfo.reason); | ||||
|                         state->webSocket().close(msg->closeInfo.code, msg->closeInfo.reason); | ||||
|                     } | ||||
|                     else if (msg->type == ix::WebSocketMessageType::Error) | ||||
|                     { | ||||
|                         spdlog::error("Connection error: {}", msg->errorInfo.reason); | ||||
|                         spdlog::error("#retries: {}", msg->errorInfo.retries); | ||||
|                         spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time); | ||||
|                         spdlog::error("HTTP Status: {}", msg->errorInfo.http_status); | ||||
|                     } | ||||
|                     else if (msg->type == ix::WebSocketMessageType::Message) | ||||
|                     { | ||||
|                         spdlog::info("Received {} bytes from client", msg->wireSize); | ||||
|                         if (verbose) | ||||
|                         { | ||||
|                             spdlog::info("payload {}", msg->str); | ||||
|                         } | ||||
|  | ||||
|                         state->webSocket().send(msg->str, msg->binary); | ||||
|                     } | ||||
|                 }); | ||||
|         }); | ||||
|  | ||||
|         auto res = server.listen(); | ||||
|         if (!res.first) | ||||
|         { | ||||
|             spdlog::info(res.second); | ||||
|             return 1; | ||||
|         } | ||||
|  | ||||
|         server.start(); | ||||
|         server.wait(); | ||||
|  | ||||
|         return 0; | ||||
|     } | ||||
| } // namespace ix | ||||
							
								
								
									
										108
									
								
								ws/ws_push_server.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										108
									
								
								ws/ws_push_server.cpp
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,108 @@ | ||||
| /* | ||||
|  *  ws_push_server.cpp | ||||
|  *  Author: Benjamin Sergeant | ||||
|  *  Copyright (c) 2020 Machine Zone, Inc. All rights reserved. | ||||
|  */ | ||||
|  | ||||
| #include <ixwebsocket/IXNetSystem.h> | ||||
| #include <ixwebsocket/IXWebSocketServer.h> | ||||
| #include <spdlog/spdlog.h> | ||||
| #include <sstream> | ||||
|  | ||||
| namespace ix | ||||
| { | ||||
|     int ws_push_server(int port, | ||||
|                        bool greetings, | ||||
|                        const std::string& hostname, | ||||
|                        const ix::SocketTLSOptions& tlsOptions, | ||||
|                        bool ipv6, | ||||
|                        bool disablePerMessageDeflate, | ||||
|                        bool disablePong, | ||||
|                        const std::string& sendMsg) | ||||
|     { | ||||
|         spdlog::info("Listening on {}:{}", hostname, port); | ||||
|  | ||||
|         ix::WebSocketServer server(port, | ||||
|                                    hostname, | ||||
|                                    SocketServer::kDefaultTcpBacklog, | ||||
|                                    SocketServer::kDefaultMaxConnections, | ||||
|                                    WebSocketServer::kDefaultHandShakeTimeoutSecs, | ||||
|                                    (ipv6) ? AF_INET6 : AF_INET); | ||||
|  | ||||
|         server.setTLSOptions(tlsOptions); | ||||
|  | ||||
|         if (disablePerMessageDeflate) | ||||
|         { | ||||
|             spdlog::info("Disable per message deflate"); | ||||
|             server.disablePerMessageDeflate(); | ||||
|         } | ||||
|  | ||||
|         if (disablePong) | ||||
|         { | ||||
|             spdlog::info("Disable responding to PING messages with PONG"); | ||||
|             server.disablePong(); | ||||
|         } | ||||
|  | ||||
|         server.setOnClientMessageCallback( | ||||
|             [greetings, &sendMsg](std::shared_ptr<ConnectionState> connectionState, | ||||
|                                   ConnectionInfo& connectionInfo, | ||||
|                                   WebSocket& webSocket, | ||||
|                                   const WebSocketMessagePtr& msg) { | ||||
|                 auto remoteIp = connectionInfo.remoteIp; | ||||
|                 if (msg->type == ix::WebSocketMessageType::Open) | ||||
|                 { | ||||
|                     spdlog::info("New connection"); | ||||
|                     spdlog::info("remote ip: {}", remoteIp); | ||||
|                     spdlog::info("id: {}", connectionState->getId()); | ||||
|                     spdlog::info("Uri: {}", msg->openInfo.uri); | ||||
|                     spdlog::info("Headers:"); | ||||
|                     for (auto it : msg->openInfo.headers) | ||||
|                     { | ||||
|                         spdlog::info("{}: {}", it.first, it.second); | ||||
|                     } | ||||
|  | ||||
|                     if (greetings) | ||||
|                     { | ||||
|                         webSocket.sendText("Welcome !"); | ||||
|                     } | ||||
|  | ||||
|                     bool binary = false; | ||||
|                     while (true) | ||||
|                     { | ||||
|                         webSocket.send(sendMsg, binary); | ||||
|                     } | ||||
|                 } | ||||
|                 else if (msg->type == ix::WebSocketMessageType::Close) | ||||
|                 { | ||||
|                     spdlog::info("Closed connection: client id {} code {} reason {}", | ||||
|                                  connectionState->getId(), | ||||
|                                  msg->closeInfo.code, | ||||
|                                  msg->closeInfo.reason); | ||||
|                 } | ||||
|                 else if (msg->type == ix::WebSocketMessageType::Error) | ||||
|                 { | ||||
|                     spdlog::error("Connection error: {}", msg->errorInfo.reason); | ||||
|                     spdlog::error("#retries: {}", msg->errorInfo.retries); | ||||
|                     spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time); | ||||
|                     spdlog::error("HTTP Status: {}", msg->errorInfo.http_status); | ||||
|                 } | ||||
|                 else if (msg->type == ix::WebSocketMessageType::Message) | ||||
|                 { | ||||
|                     spdlog::info("Received {} bytes", msg->wireSize); | ||||
|                     webSocket.send(msg->str, msg->binary); | ||||
|                 } | ||||
|             }); | ||||
|  | ||||
|         auto res = server.listen(); | ||||
|         if (!res.first) | ||||
|         { | ||||
|             spdlog::error(res.second); | ||||
|             return 1; | ||||
|         } | ||||
|  | ||||
|         server.start(); | ||||
|         server.wait(); | ||||
|  | ||||
|         return 0; | ||||
|     } | ||||
| } // namespace ix | ||||
| @@ -6,7 +6,6 @@ | ||||
|  | ||||
| #include <fstream> | ||||
| #include <ixsentry/IXSentryClient.h> | ||||
| #include <jsoncpp/json/json.h> | ||||
| #include <spdlog/spdlog.h> | ||||
| #include <sstream> | ||||
|  | ||||
|   | ||||
| @@ -19,12 +19,12 @@ namespace ix | ||||
|         ix::WebSocketServer server(port, hostname); | ||||
|         server.setTLSOptions(tlsOptions); | ||||
|  | ||||
|         server.setOnConnectionCallback([&server](std::shared_ptr<ix::WebSocket> webSocket, | ||||
|                                                  std::shared_ptr<ConnectionState> connectionState, | ||||
|                                                  std::unique_ptr<ConnectionInfo> connectionInfo) { | ||||
|             auto remoteIp = connectionInfo->remoteIp; | ||||
|             webSocket->setOnMessageCallback([webSocket, connectionState, remoteIp, &server]( | ||||
|                                                 const WebSocketMessagePtr& msg) { | ||||
|         server.setOnClientMessageCallback( | ||||
|             [&server](std::shared_ptr<ConnectionState> connectionState, | ||||
|                       ConnectionInfo& connectionInfo, | ||||
|                       WebSocket& webSocket, | ||||
|                       const WebSocketMessagePtr& msg) { | ||||
|                 auto remoteIp = connectionInfo.remoteIp; | ||||
|                 if (msg->type == ix::WebSocketMessageType::Open) | ||||
|                 { | ||||
|                     spdlog::info("ws_transfer: New connection"); | ||||
| @@ -43,7 +43,7 @@ namespace ix | ||||
|                                  connectionState->getId(), | ||||
|                                  msg->closeInfo.code, | ||||
|                                  msg->closeInfo.reason); | ||||
|                     auto remaining = server.getClients().erase(webSocket); | ||||
|                     auto remaining = server.getClients().size() - 1; | ||||
|                     spdlog::info("ws_transfer: {} remaining clients", remaining); | ||||
|                 } | ||||
|                 else if (msg->type == ix::WebSocketMessageType::Error) | ||||
| @@ -65,7 +65,7 @@ namespace ix | ||||
|                     size_t receivers = 0; | ||||
|                     for (auto&& client : server.getClients()) | ||||
|                     { | ||||
|                         if (client != webSocket) | ||||
|                         if (client.get() != &webSocket) | ||||
|                         { | ||||
|                             auto readyState = client->getReadyState(); | ||||
|                             auto id = connectionState->getId(); | ||||
| @@ -119,7 +119,6 @@ namespace ix | ||||
|                     } | ||||
|                 } | ||||
|             }); | ||||
|         }); | ||||
|  | ||||
|         auto res = server.listen(); | ||||
|         if (!res.first) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user