(ws) Add a new ws sub-command, echo_client. This command send a message to an echo server, and send back to a server whatever message it does receive. When connecting to a local ws echo_server, on my MacBook Pro 2015 I can send/receive around 30,000 messages per second. (cf #235)
This commit is contained in:
		@@ -1,6 +1,14 @@
 | 
				
			|||||||
# Changelog
 | 
					# Changelog
 | 
				
			||||||
All changes to this project will be documented in this file.
 | 
					All changes to this project will be documented in this file.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					## [10.1.4] - 2020-08-02
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					(ws) Add a new ws sub-command, echo_client. This command send a message to an echo server, and send back to a server whatever message it does receive. When connecting to a local ws echo_server, on my MacBook Pro 2015 I can send/receive around 30,000 messages per second. (cf #235)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					## [10.1.3] - 2020-08-02
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					(ws) ws echo_server. Add a -q option to only enable warning and error log levels. This is useful for bench-marking so that we do not print a lot of things on the console. (cf #235)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
## [10.1.2] - 2020-07-31
 | 
					## [10.1.2] - 2020-07-31
 | 
				
			||||||
 | 
					
 | 
				
			||||||
(build) make using zlib optional, with the caveat that some http and websocket features are not available when zlib is absent
 | 
					(build) make using zlib optional, with the caveat that some http and websocket features are not available when zlib is absent
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -6,4 +6,4 @@
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
#pragma once
 | 
					#pragma once
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#define IX_WEBSOCKET_VERSION "10.1.2"
 | 
					#define IX_WEBSOCKET_VERSION "10.1.4"
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										5
									
								
								makefile
									
									
									
									
									
								
							
							
						
						
									
										5
									
								
								makefile
									
									
									
									
									
								
							@@ -34,7 +34,10 @@ ws:
 | 
				
			|||||||
	mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 .. && ninja install)
 | 
						mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 .. && ninja install)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
ws_install:
 | 
					ws_install:
 | 
				
			||||||
	mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=MinSizeRel -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 .. && make -j 4 install)
 | 
						mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=MinSizeRel -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 .. -DUSE_TEST=0 && ninja install)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					ws_install_release:
 | 
				
			||||||
 | 
						mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=MinSizeRel -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 .. -DUSE_TEST=0 && ninja install)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
ws_openssl:
 | 
					ws_openssl:
 | 
				
			||||||
	mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 -DUSE_OPEN_SSL=1 .. ; make -j 4)
 | 
						mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_PYTHON=1 -DUSE_TLS=1 -DUSE_WS=1 -DUSE_OPEN_SSL=1 .. ; make -j 4)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -10,7 +10,7 @@ import websockets
 | 
				
			|||||||
async def echo(websocket, path):
 | 
					async def echo(websocket, path):
 | 
				
			||||||
    while True:
 | 
					    while True:
 | 
				
			||||||
        msg = await websocket.recv()
 | 
					        msg = await websocket.recv()
 | 
				
			||||||
        print(f'Received {len(msg)} bytes')
 | 
					        # print(f'Received {len(msg)} bytes')
 | 
				
			||||||
        await websocket.send(msg)
 | 
					        await websocket.send(msg)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
host = os.getenv('BIND_HOST', 'localhost')
 | 
					host = os.getenv('BIND_HOST', 'localhost')
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -51,6 +51,7 @@ add_executable(ws
 | 
				
			|||||||
  ws_ping_pong.cpp
 | 
					  ws_ping_pong.cpp
 | 
				
			||||||
  ws_broadcast_server.cpp
 | 
					  ws_broadcast_server.cpp
 | 
				
			||||||
  ws_echo_server.cpp
 | 
					  ws_echo_server.cpp
 | 
				
			||||||
 | 
					  ws_echo_client.cpp
 | 
				
			||||||
  ws_chat.cpp
 | 
					  ws_chat.cpp
 | 
				
			||||||
  ws_connect.cpp
 | 
					  ws_connect.cpp
 | 
				
			||||||
  ws_transfer.cpp
 | 
					  ws_transfer.cpp
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										23
									
								
								ws/ws.cpp
									
									
									
									
									
								
							
							
						
						
									
										23
									
								
								ws/ws.cpp
									
									
									
									
									
								
							@@ -125,6 +125,7 @@ int main(int argc, char** argv)
 | 
				
			|||||||
    std::string logfile;
 | 
					    std::string logfile;
 | 
				
			||||||
    std::string scriptPath;
 | 
					    std::string scriptPath;
 | 
				
			||||||
    std::string republishChannel;
 | 
					    std::string republishChannel;
 | 
				
			||||||
 | 
					    std::string sendMsg("hello world");
 | 
				
			||||||
    ix::SocketTLSOptions tlsOptions;
 | 
					    ix::SocketTLSOptions tlsOptions;
 | 
				
			||||||
    ix::CobraConfig cobraConfig;
 | 
					    ix::CobraConfig cobraConfig;
 | 
				
			||||||
    ix::CobraBotConfig cobraBotConfig;
 | 
					    ix::CobraBotConfig cobraBotConfig;
 | 
				
			||||||
@@ -243,6 +244,18 @@ int main(int argc, char** argv)
 | 
				
			|||||||
    connectApp->add_option("--subprotocol", subprotocol, "Subprotocol");
 | 
					    connectApp->add_option("--subprotocol", subprotocol, "Subprotocol");
 | 
				
			||||||
    addTLSOptions(connectApp);
 | 
					    addTLSOptions(connectApp);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    CLI::App* echoClientApp =
 | 
				
			||||||
 | 
					        app.add_subcommand("echo_client", "Echo messages sent by a remote server");
 | 
				
			||||||
 | 
					    echoClientApp->fallthrough();
 | 
				
			||||||
 | 
					    echoClientApp->add_option("url", url, "Connection url")->required();
 | 
				
			||||||
 | 
					    echoClientApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate");
 | 
				
			||||||
 | 
					    echoClientApp->add_flag("-b", binaryMode, "Send in binary mode");
 | 
				
			||||||
 | 
					    echoClientApp->add_option(
 | 
				
			||||||
 | 
					        "--ping_interval", pingIntervalSecs, "Interval between sending pings");
 | 
				
			||||||
 | 
					    echoClientApp->add_option("--subprotocol", subprotocol, "Subprotocol");
 | 
				
			||||||
 | 
					    echoClientApp->add_option("--send_msg", sendMsg, "Send message");
 | 
				
			||||||
 | 
					    addTLSOptions(echoClientApp);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    CLI::App* chatApp = app.add_subcommand("chat", "Group chat");
 | 
					    CLI::App* chatApp = app.add_subcommand("chat", "Group chat");
 | 
				
			||||||
    chatApp->fallthrough();
 | 
					    chatApp->fallthrough();
 | 
				
			||||||
    chatApp->add_option("url", url, "Connection url")->required();
 | 
					    chatApp->add_option("url", url, "Connection url")->required();
 | 
				
			||||||
@@ -504,6 +517,16 @@ int main(int argc, char** argv)
 | 
				
			|||||||
                                  subprotocol,
 | 
					                                  subprotocol,
 | 
				
			||||||
                                  pingIntervalSecs);
 | 
					                                  pingIntervalSecs);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					    else if (app.got_subcommand("echo_client"))
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        ret = ix::ws_echo_client(url,
 | 
				
			||||||
 | 
					                                 disablePerMessageDeflate,
 | 
				
			||||||
 | 
					                                 binaryMode,
 | 
				
			||||||
 | 
					                                 tlsOptions,
 | 
				
			||||||
 | 
					                                 subprotocol,
 | 
				
			||||||
 | 
					                                 pingIntervalSecs,
 | 
				
			||||||
 | 
					                                 sendMsg);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
    else if (app.got_subcommand("echo_server"))
 | 
					    else if (app.got_subcommand("echo_server"))
 | 
				
			||||||
    {
 | 
					    {
 | 
				
			||||||
        ret = ix::ws_echo_server_main(
 | 
					        ret = ix::ws_echo_server_main(
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										8
									
								
								ws/ws.h
									
									
									
									
									
								
							
							
						
						
									
										8
									
								
								ws/ws.h
									
									
									
									
									
								
							@@ -54,6 +54,14 @@ namespace ix
 | 
				
			|||||||
                        const std::string& subprotocol,
 | 
					                        const std::string& subprotocol,
 | 
				
			||||||
                        int pingIntervalSecs);
 | 
					                        int pingIntervalSecs);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    int ws_echo_client(const std::string& url,
 | 
				
			||||||
 | 
					                       bool disablePerMessageDeflate,
 | 
				
			||||||
 | 
					                       bool binaryMode,
 | 
				
			||||||
 | 
					                       const ix::SocketTLSOptions& tlsOptions,
 | 
				
			||||||
 | 
					                       const std::string& subprotocol,
 | 
				
			||||||
 | 
					                       int pingIntervalSecs,
 | 
				
			||||||
 | 
					                       const std::string& sendMsg);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    int ws_receive_main(const std::string& url,
 | 
					    int ws_receive_main(const std::string& url,
 | 
				
			||||||
                        bool enablePerMessageDeflate,
 | 
					                        bool enablePerMessageDeflate,
 | 
				
			||||||
                        int delayMs,
 | 
					                        int delayMs,
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -160,7 +160,7 @@ namespace ix
 | 
				
			|||||||
            std::stringstream ss;
 | 
					            std::stringstream ss;
 | 
				
			||||||
            if (msg->type == ix::WebSocketMessageType::Open)
 | 
					            if (msg->type == ix::WebSocketMessageType::Open)
 | 
				
			||||||
            {
 | 
					            {
 | 
				
			||||||
                log("ws_connect: connected");
 | 
					                spdlog::info("ws_connect: connected");
 | 
				
			||||||
                spdlog::info("Uri: {}", msg->openInfo.uri);
 | 
					                spdlog::info("Uri: {}", msg->openInfo.uri);
 | 
				
			||||||
                spdlog::info("Headers:");
 | 
					                spdlog::info("Headers:");
 | 
				
			||||||
                for (auto it : msg->openInfo.headers)
 | 
					                for (auto it : msg->openInfo.headers)
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										113
									
								
								ws/ws_echo_client.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										113
									
								
								ws/ws_echo_client.cpp
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,113 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					 *  ws_echo_client.cpp
 | 
				
			||||||
 | 
					 *  Author: Benjamin Sergeant
 | 
				
			||||||
 | 
					 *  Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#include <iostream>
 | 
				
			||||||
 | 
					#include <ixcore/utils/IXCoreLogger.h>
 | 
				
			||||||
 | 
					#include <ixwebsocket/IXNetSystem.h>
 | 
				
			||||||
 | 
					#include <ixwebsocket/IXSetThreadName.h>
 | 
				
			||||||
 | 
					#include <ixwebsocket/IXWebSocket.h>
 | 
				
			||||||
 | 
					#include <spdlog/spdlog.h>
 | 
				
			||||||
 | 
					#include <sstream>
 | 
				
			||||||
 | 
					#include <thread>
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					namespace ix
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					    int ws_echo_client(const std::string& url,
 | 
				
			||||||
 | 
					                       bool disablePerMessageDeflate,
 | 
				
			||||||
 | 
					                       bool binaryMode,
 | 
				
			||||||
 | 
					                       const ix::SocketTLSOptions& tlsOptions,
 | 
				
			||||||
 | 
					                       const std::string& subprotocol,
 | 
				
			||||||
 | 
					                       int pingIntervalSecs,
 | 
				
			||||||
 | 
					                       const std::string& sendMsg)
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        // Our websocket object
 | 
				
			||||||
 | 
					        ix::WebSocket webSocket;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        webSocket.setUrl(url);
 | 
				
			||||||
 | 
					        webSocket.setTLSOptions(tlsOptions);
 | 
				
			||||||
 | 
					        webSocket.setPingInterval(pingIntervalSecs);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if (disablePerMessageDeflate)
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
 | 
					            webSocket.disablePerMessageDeflate();
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if (!subprotocol.empty())
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
 | 
					            webSocket.addSubProtocol(subprotocol);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        std::atomic<uint64_t> receivedCount(0);
 | 
				
			||||||
 | 
					        uint64_t receivedCountTotal(0);
 | 
				
			||||||
 | 
					        uint64_t receivedCountPerSecs(0);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        // Setup a callback to be fired (in a background thread, watch out for race conditions !)
 | 
				
			||||||
 | 
					        // when a message or an event (open, close, error) is received
 | 
				
			||||||
 | 
					        webSocket.setOnMessageCallback(
 | 
				
			||||||
 | 
					            [&webSocket, &receivedCount, &sendMsg, binaryMode](const ix::WebSocketMessagePtr& msg) {
 | 
				
			||||||
 | 
					                if (msg->type == ix::WebSocketMessageType::Message)
 | 
				
			||||||
 | 
					                {
 | 
				
			||||||
 | 
					                    webSocket.send(msg->str, msg->binary);
 | 
				
			||||||
 | 
					                    receivedCount++;
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
 | 
					                else if (msg->type == ix::WebSocketMessageType::Open)
 | 
				
			||||||
 | 
					                {
 | 
				
			||||||
 | 
					                    spdlog::info("ws_echo_client: connected");
 | 
				
			||||||
 | 
					                    spdlog::info("Uri: {}", msg->openInfo.uri);
 | 
				
			||||||
 | 
					                    spdlog::info("Headers:");
 | 
				
			||||||
 | 
					                    for (auto it : msg->openInfo.headers)
 | 
				
			||||||
 | 
					                    {
 | 
				
			||||||
 | 
					                        spdlog::info("{}: {}", it.first, it.second);
 | 
				
			||||||
 | 
					                    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                    webSocket.send(sendMsg, binaryMode);
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
 | 
					            });
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        auto timer = [&receivedCount, &receivedCountTotal, &receivedCountPerSecs] {
 | 
				
			||||||
 | 
					            setThreadName("Timer");
 | 
				
			||||||
 | 
					            while (true)
 | 
				
			||||||
 | 
					            {
 | 
				
			||||||
 | 
					                //
 | 
				
			||||||
 | 
					                // We cannot write to sentCount and receivedCount
 | 
				
			||||||
 | 
					                // as those are used externally, so we need to introduce
 | 
				
			||||||
 | 
					                // our own counters
 | 
				
			||||||
 | 
					                //
 | 
				
			||||||
 | 
					                std::stringstream ss;
 | 
				
			||||||
 | 
					                ss << "messages received: " << receivedCountPerSecs << " per second "
 | 
				
			||||||
 | 
					                   << receivedCountTotal << " total";
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                CoreLogger::info(ss.str());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                receivedCountPerSecs = receivedCount - receivedCountTotal;
 | 
				
			||||||
 | 
					                receivedCountTotal += receivedCountPerSecs;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                auto duration = std::chrono::seconds(1);
 | 
				
			||||||
 | 
					                std::this_thread::sleep_for(duration);
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        };
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        std::thread t1(timer);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        // Now that our callback is setup, we can start our background thread and receive messages
 | 
				
			||||||
 | 
					        std::cout << "Connecting to " << url << "..." << std::endl;
 | 
				
			||||||
 | 
					        webSocket.start();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        // Send a message to the server (default to TEXT mode)
 | 
				
			||||||
 | 
					        webSocket.send("hello world");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        while (true)
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
 | 
					            std::string text;
 | 
				
			||||||
 | 
					            std::cout << "> " << std::flush;
 | 
				
			||||||
 | 
					            std::getline(std::cin, text);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            webSocket.send(text);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        return 0;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					} // namespace ix
 | 
				
			||||||
		Reference in New Issue
	
	Block a user