(ws) echo_client command renamed to autoroute. Command exit once the server close the connection. push_server commands exit once N messages have been sent.
This commit is contained in:
		@@ -1,67 +1,11 @@
 | 
			
		||||
version: "3"
 | 
			
		||||
version: "3.3"
 | 
			
		||||
services:
 | 
			
		||||
  # snake:
 | 
			
		||||
  #   image: bsergean/ws:build
 | 
			
		||||
  #   entrypoint: ws snake --port 8767 --host 0.0.0.0 --redis_hosts redis1
 | 
			
		||||
  #   ports:
 | 
			
		||||
  #     - "8767:8767"
 | 
			
		||||
  #   networks:
 | 
			
		||||
  #     - ws-net
 | 
			
		||||
  #   depends_on:
 | 
			
		||||
  #     - redis1
 | 
			
		||||
  push:
 | 
			
		||||
    entrypoint: ws push_server --host 0.0.0.0
 | 
			
		||||
    image: ${DOCKER_REPO}/ws:build
 | 
			
		||||
 | 
			
		||||
  # proxy:
 | 
			
		||||
  #   image: bsergean/ws:build
 | 
			
		||||
  #   entrypoint: strace ws proxy_server --remote_host 'wss://cobra.addsrv.com' --host 0.0.0.0 --port 8765 -v
 | 
			
		||||
  #   ports:
 | 
			
		||||
  #     - "8765:8765"
 | 
			
		||||
  #   networks:
 | 
			
		||||
  #     - ws-net
 | 
			
		||||
 | 
			
		||||
  #pyproxy:
 | 
			
		||||
  #  image: bsergean/ws_proxy:build
 | 
			
		||||
  #  entrypoint: /usr/bin/ws_proxy.py --remote_url 'wss://cobra.addsrv.com' --host 0.0.0.0 --port 8765
 | 
			
		||||
  #  ports:
 | 
			
		||||
  #    - "8765:8765"
 | 
			
		||||
  #  networks:
 | 
			
		||||
  #    - ws-net
 | 
			
		||||
 | 
			
		||||
  #      #  ws:
 | 
			
		||||
  #      #    security_opt:
 | 
			
		||||
  #      #    - seccomp:unconfined
 | 
			
		||||
  #      #    cap_add:
 | 
			
		||||
  #      #    - SYS_PTRACE
 | 
			
		||||
  #      #    stdin_open: true
 | 
			
		||||
  #      #    tty: true
 | 
			
		||||
  #      #    image: bsergean/ws:build
 | 
			
		||||
  #      #    entrypoint: sh
 | 
			
		||||
  #      #    networks:
 | 
			
		||||
  #      #      - ws-net
 | 
			
		||||
  #      #    depends_on:
 | 
			
		||||
  #      #      - redis1
 | 
			
		||||
  #      #
 | 
			
		||||
  #      #  redis1:
 | 
			
		||||
  #      #    image: redis:alpine
 | 
			
		||||
  #      #    networks:
 | 
			
		||||
  #      #      - ws-net
 | 
			
		||||
  #      #
 | 
			
		||||
  #      #  statsd:
 | 
			
		||||
  #      #    image: jaconel/statsd
 | 
			
		||||
  #      #    ports:
 | 
			
		||||
  #      #      - "8125:8125"
 | 
			
		||||
  #      #    environment:
 | 
			
		||||
  #      #      - STATSD_DUMP_MSG=true
 | 
			
		||||
  #      #      - GRAPHITE_HOST=127.0.0.1
 | 
			
		||||
  #      #    networks:
 | 
			
		||||
  #      #      - ws-net
 | 
			
		||||
 | 
			
		||||
  compile:
 | 
			
		||||
    image: alpine
 | 
			
		||||
    entrypoint: sh
 | 
			
		||||
    stdin_open: true
 | 
			
		||||
    tty: true
 | 
			
		||||
    volumes:
 | 
			
		||||
      - /Users/bsergeant/src/foss:/home/bsergean/src/foss
 | 
			
		||||
 | 
			
		||||
networks:
 | 
			
		||||
  ws-net:
 | 
			
		||||
  autoroute:
 | 
			
		||||
    entrypoint: ws autoroute ws://push:8008
 | 
			
		||||
    image: ${DOCKER_REPO}/ws:build
 | 
			
		||||
    depends_on:
 | 
			
		||||
      - push
 | 
			
		||||
 
 | 
			
		||||
@@ -2,6 +2,10 @@
 | 
			
		||||
 | 
			
		||||
All changes to this project will be documented in this file.
 | 
			
		||||
 | 
			
		||||
## [10.3.3] - 2020-09-02
 | 
			
		||||
 | 
			
		||||
(ws) echo_client command renamed to autoroute. Command exit once the server close the connection. push_server commands exit once N messages have been sent.
 | 
			
		||||
 | 
			
		||||
## [10.3.2] - 2020-08-31
 | 
			
		||||
 | 
			
		||||
(ws + cobra bots) add a cobra_to_cobra ws subcommand to subscribe to a channel and republish received events to a different channel
 | 
			
		||||
 
 | 
			
		||||
@@ -12,10 +12,8 @@ namespace ix
 | 
			
		||||
{
 | 
			
		||||
    Bench::Bench(const std::string& description)
 | 
			
		||||
        : _description(description)
 | 
			
		||||
        , _start(std::chrono::high_resolution_clock::now())
 | 
			
		||||
        , _reported(false)
 | 
			
		||||
    {
 | 
			
		||||
        ;
 | 
			
		||||
        reset();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    Bench::~Bench()
 | 
			
		||||
@@ -26,6 +24,12 @@ namespace ix
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void Bench::reset()
 | 
			
		||||
    {
 | 
			
		||||
        _start = std::chrono::high_resolution_clock::now();
 | 
			
		||||
        _reported = false;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void Bench::report()
 | 
			
		||||
    {
 | 
			
		||||
        auto now = std::chrono::high_resolution_clock::now();
 | 
			
		||||
 
 | 
			
		||||
@@ -17,6 +17,7 @@ namespace ix
 | 
			
		||||
        Bench(const std::string& description);
 | 
			
		||||
        ~Bench();
 | 
			
		||||
 | 
			
		||||
        void reset();
 | 
			
		||||
        void report();
 | 
			
		||||
        uint64_t getDuration() const;
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -6,4 +6,4 @@
 | 
			
		||||
 | 
			
		||||
#pragma once
 | 
			
		||||
 | 
			
		||||
#define IX_WEBSOCKET_VERSION "10.3.2"
 | 
			
		||||
#define IX_WEBSOCKET_VERSION "10.3.3"
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										135
									
								
								ws/ws.cpp
									
									
									
									
									
								
							
							
						
						
									
										135
									
								
								ws/ws.cpp
									
									
									
									
									
								
							@@ -1103,21 +1103,26 @@ namespace ix
 | 
			
		||||
        return 0;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    int ws_echo_client(const std::string& url,
 | 
			
		||||
    int ws_autoroute(const std::string& url,
 | 
			
		||||
                     bool disablePerMessageDeflate,
 | 
			
		||||
                       bool binaryMode,
 | 
			
		||||
                     const ix::SocketTLSOptions& tlsOptions,
 | 
			
		||||
                     const std::string& subprotocol,
 | 
			
		||||
                     int pingIntervalSecs,
 | 
			
		||||
                       const std::string& sendMsg,
 | 
			
		||||
                       bool noSend)
 | 
			
		||||
                     int msgCount)
 | 
			
		||||
    {
 | 
			
		||||
        Bench bench("ws_autoroute full test");
 | 
			
		||||
 | 
			
		||||
        // Our websocket object
 | 
			
		||||
        ix::WebSocket webSocket;
 | 
			
		||||
 | 
			
		||||
        webSocket.setUrl(url);
 | 
			
		||||
        std::string fullUrl(url);
 | 
			
		||||
        fullUrl += "/";
 | 
			
		||||
        fullUrl += std::to_string(msgCount);
 | 
			
		||||
 | 
			
		||||
        webSocket.setUrl(fullUrl);
 | 
			
		||||
        webSocket.setTLSOptions(tlsOptions);
 | 
			
		||||
        webSocket.setPingInterval(pingIntervalSecs);
 | 
			
		||||
        webSocket.disableAutomaticReconnection();
 | 
			
		||||
 | 
			
		||||
        if (disablePerMessageDeflate)
 | 
			
		||||
        {
 | 
			
		||||
@@ -1129,43 +1134,52 @@ namespace ix
 | 
			
		||||
            webSocket.addSubProtocol(subprotocol);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        std::atomic<uint64_t> receivedCount(0);
 | 
			
		||||
        uint64_t receivedCountTotal(0);
 | 
			
		||||
        uint64_t receivedCountPerSecs(0);
 | 
			
		||||
        std::atomic<uint64_t> receivedCountTotal(0);
 | 
			
		||||
        std::atomic<uint64_t> receivedCountPerSecs(0);
 | 
			
		||||
        std::mutex conditionVariableMutex;
 | 
			
		||||
        std::condition_variable condition;
 | 
			
		||||
 | 
			
		||||
        // Setup a callback to be fired (in a background thread, watch out for race conditions !)
 | 
			
		||||
        // when a message or an event (open, close, error) is received
 | 
			
		||||
        webSocket.setOnMessageCallback([&webSocket, &receivedCount, &sendMsg, noSend, binaryMode](
 | 
			
		||||
        std::atomic<bool> stop(false);
 | 
			
		||||
 | 
			
		||||
        // Setup a callback to be fired
 | 
			
		||||
        // when a message or an event (open, close, ping, pong, error) is received
 | 
			
		||||
        webSocket.setOnMessageCallback(
 | 
			
		||||
            [&webSocket, &receivedCountPerSecs, &receivedCountTotal, &stop, &condition, &bench](
 | 
			
		||||
                const ix::WebSocketMessagePtr& msg) {
 | 
			
		||||
                if (msg->type == ix::WebSocketMessageType::Message)
 | 
			
		||||
                {
 | 
			
		||||
                if (!noSend)
 | 
			
		||||
                {
 | 
			
		||||
                    webSocket.send(msg->str, msg->binary);
 | 
			
		||||
                }
 | 
			
		||||
                receivedCount++;
 | 
			
		||||
                    receivedCountPerSecs++;
 | 
			
		||||
                    receivedCountTotal++;
 | 
			
		||||
                }
 | 
			
		||||
                else if (msg->type == ix::WebSocketMessageType::Open)
 | 
			
		||||
                {
 | 
			
		||||
                spdlog::info("ws_echo_client: connected");
 | 
			
		||||
                    bench.reset();
 | 
			
		||||
 | 
			
		||||
                    spdlog::info("ws_autoroute: connected");
 | 
			
		||||
                    spdlog::info("Uri: {}", msg->openInfo.uri);
 | 
			
		||||
                    spdlog::info("Headers:");
 | 
			
		||||
                    for (auto it : msg->openInfo.headers)
 | 
			
		||||
                    {
 | 
			
		||||
                        spdlog::info("{}: {}", it.first, it.second);
 | 
			
		||||
                    }
 | 
			
		||||
 | 
			
		||||
                webSocket.send(sendMsg, binaryMode);
 | 
			
		||||
                }
 | 
			
		||||
                else if (msg->type == ix::WebSocketMessageType::Pong)
 | 
			
		||||
                {
 | 
			
		||||
                    spdlog::info("Received pong {}", msg->str);
 | 
			
		||||
                }
 | 
			
		||||
                else if (msg->type == ix::WebSocketMessageType::Close)
 | 
			
		||||
                {
 | 
			
		||||
                    spdlog::info("ws_autoroute: connection closed");
 | 
			
		||||
                    stop = true;
 | 
			
		||||
                    condition.notify_one();
 | 
			
		||||
 | 
			
		||||
                    bench.report();
 | 
			
		||||
                }
 | 
			
		||||
            });
 | 
			
		||||
 | 
			
		||||
        auto timer = [&receivedCount, &receivedCountTotal, &receivedCountPerSecs] {
 | 
			
		||||
        auto timer = [&receivedCountTotal, &receivedCountPerSecs, &stop] {
 | 
			
		||||
            setThreadName("Timer");
 | 
			
		||||
            while (true)
 | 
			
		||||
            while (!stop)
 | 
			
		||||
            {
 | 
			
		||||
                //
 | 
			
		||||
                // We cannot write to sentCount and receivedCount
 | 
			
		||||
@@ -1178,8 +1192,7 @@ namespace ix
 | 
			
		||||
 | 
			
		||||
                CoreLogger::info(ss.str());
 | 
			
		||||
 | 
			
		||||
                receivedCountPerSecs = receivedCount - receivedCountTotal;
 | 
			
		||||
                receivedCountTotal += receivedCountPerSecs;
 | 
			
		||||
                receivedCountPerSecs = 0;
 | 
			
		||||
 | 
			
		||||
                auto duration = std::chrono::seconds(1);
 | 
			
		||||
                std::this_thread::sleep_for(duration);
 | 
			
		||||
@@ -1192,17 +1205,17 @@ namespace ix
 | 
			
		||||
        std::cout << "Connecting to " << url << "..." << std::endl;
 | 
			
		||||
        webSocket.start();
 | 
			
		||||
 | 
			
		||||
        // Send a message to the server (default to TEXT mode)
 | 
			
		||||
        webSocket.send("hello world");
 | 
			
		||||
        // Wait for all the messages to be received
 | 
			
		||||
        std::unique_lock<std::mutex> lock(conditionVariableMutex);
 | 
			
		||||
        condition.wait(lock);
 | 
			
		||||
 | 
			
		||||
        while (true)
 | 
			
		||||
        {
 | 
			
		||||
            std::string text;
 | 
			
		||||
            std::cout << "> " << std::flush;
 | 
			
		||||
            std::getline(std::cin, text);
 | 
			
		||||
        t1.join();
 | 
			
		||||
        webSocket.stop();
 | 
			
		||||
 | 
			
		||||
            webSocket.send(text);
 | 
			
		||||
        }
 | 
			
		||||
        std::stringstream ss;
 | 
			
		||||
        ss << "messages received: " << receivedCountTotal << " total";
 | 
			
		||||
 | 
			
		||||
        CoreLogger::info(ss.str());
 | 
			
		||||
 | 
			
		||||
        return 0;
 | 
			
		||||
    }
 | 
			
		||||
@@ -1640,7 +1653,6 @@ namespace ix
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    int ws_push_server(int port,
 | 
			
		||||
                       bool greetings,
 | 
			
		||||
                       const std::string& hostname,
 | 
			
		||||
                       const ix::SocketTLSOptions& tlsOptions,
 | 
			
		||||
                       bool ipv6,
 | 
			
		||||
@@ -1671,8 +1683,11 @@ namespace ix
 | 
			
		||||
            server.disablePong();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // push one million messages
 | 
			
		||||
        std::atomic<bool> stop(false);
 | 
			
		||||
 | 
			
		||||
        server.setOnClientMessageCallback(
 | 
			
		||||
            [greetings, &sendMsg](std::shared_ptr<ConnectionState> connectionState,
 | 
			
		||||
            [&sendMsg, &stop](std::shared_ptr<ConnectionState> connectionState,
 | 
			
		||||
                              WebSocket& webSocket,
 | 
			
		||||
                              const WebSocketMessagePtr& msg) {
 | 
			
		||||
                auto remoteIp = connectionState->getRemoteIp();
 | 
			
		||||
@@ -1688,13 +1703,22 @@ namespace ix
 | 
			
		||||
                        spdlog::info("{}: {}", it.first, it.second);
 | 
			
		||||
                    }
 | 
			
		||||
 | 
			
		||||
                    if (greetings)
 | 
			
		||||
                    {
 | 
			
		||||
                        webSocket.sendText("Welcome !");
 | 
			
		||||
                    }
 | 
			
		||||
                    // Parse the msg count from the uri.
 | 
			
		||||
                    int msgCount = -1;
 | 
			
		||||
                    std::stringstream ss;
 | 
			
		||||
                    auto uriSize = msg->openInfo.uri.size();
 | 
			
		||||
                    ss << msg->openInfo.uri.substr(1, uriSize - 1);
 | 
			
		||||
                    ss >> msgCount;
 | 
			
		||||
 | 
			
		||||
                    if (msgCount == -1)
 | 
			
		||||
                    {
 | 
			
		||||
                        spdlog::info("Error parsing message count, closing connection");
 | 
			
		||||
                        webSocket.close();
 | 
			
		||||
                    }
 | 
			
		||||
                    else
 | 
			
		||||
                    {
 | 
			
		||||
                        bool binary = false;
 | 
			
		||||
                    while (true)
 | 
			
		||||
                        for (int i = 0; i < msgCount; ++i)
 | 
			
		||||
                        {
 | 
			
		||||
                            auto sendInfo = webSocket.send(sendMsg, binary);
 | 
			
		||||
                            if (!sendInfo.success)
 | 
			
		||||
@@ -1705,12 +1729,14 @@ namespace ix
 | 
			
		||||
                            }
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
                else if (msg->type == ix::WebSocketMessageType::Close)
 | 
			
		||||
                {
 | 
			
		||||
                    spdlog::info("Closed connection: client id {} code {} reason {}",
 | 
			
		||||
                                 connectionState->getId(),
 | 
			
		||||
                                 msg->closeInfo.code,
 | 
			
		||||
                                 msg->closeInfo.reason);
 | 
			
		||||
                    stop = true;
 | 
			
		||||
                }
 | 
			
		||||
                else if (msg->type == ix::WebSocketMessageType::Error)
 | 
			
		||||
                {
 | 
			
		||||
@@ -1734,7 +1760,14 @@ namespace ix
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        server.start();
 | 
			
		||||
        server.wait();
 | 
			
		||||
 | 
			
		||||
        while (!stop)
 | 
			
		||||
        {
 | 
			
		||||
            auto duration = std::chrono::seconds(1);
 | 
			
		||||
            std::this_thread::sleep_for(duration);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        server.stop();
 | 
			
		||||
 | 
			
		||||
        return 0;
 | 
			
		||||
    }
 | 
			
		||||
@@ -2848,6 +2881,7 @@ int main(int argc, char** argv)
 | 
			
		||||
    int maxRedirects = 5;
 | 
			
		||||
    int delayMs = -1;
 | 
			
		||||
    int count = 1;
 | 
			
		||||
    int msgCount = 1000 * 1000;
 | 
			
		||||
    uint32_t maxWaitBetweenReconnectionRetries;
 | 
			
		||||
    int pingIntervalSecs = 30;
 | 
			
		||||
 | 
			
		||||
@@ -2941,17 +2975,14 @@ int main(int argc, char** argv)
 | 
			
		||||
    addGenericOptions(connectApp);
 | 
			
		||||
    addTLSOptions(connectApp);
 | 
			
		||||
 | 
			
		||||
    CLI::App* echoClientApp =
 | 
			
		||||
        app.add_subcommand("echo_client", "Echo messages sent by a remote server");
 | 
			
		||||
    CLI::App* echoClientApp = app.add_subcommand("autoroute", "Test websocket client performance");
 | 
			
		||||
    echoClientApp->fallthrough();
 | 
			
		||||
    echoClientApp->add_option("url", url, "Connection url")->required();
 | 
			
		||||
    echoClientApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate");
 | 
			
		||||
    echoClientApp->add_flag("-b", binaryMode, "Send in binary mode");
 | 
			
		||||
    echoClientApp->add_option(
 | 
			
		||||
        "--ping_interval", pingIntervalSecs, "Interval between sending pings");
 | 
			
		||||
    echoClientApp->add_option("--subprotocol", subprotocol, "Subprotocol");
 | 
			
		||||
    echoClientApp->add_option("--send_msg", sendMsg, "Send message");
 | 
			
		||||
    echoClientApp->add_flag("-m", noSend, "Do not send messages, only receive messages");
 | 
			
		||||
    echoClientApp->add_option("--msg_count", msgCount, "Total message count to be sent");
 | 
			
		||||
    addTLSOptions(echoClientApp);
 | 
			
		||||
 | 
			
		||||
    CLI::App* chatApp = app.add_subcommand("chat", "Group chat");
 | 
			
		||||
@@ -2976,7 +3007,6 @@ int main(int argc, char** argv)
 | 
			
		||||
    pushServerApp->add_option("--port", port, "Port");
 | 
			
		||||
    pushServerApp->add_option("--host", hostname, "Hostname");
 | 
			
		||||
    pushServerApp->add_flag("-q", quiet, "Quiet / only display warnings and errors");
 | 
			
		||||
    pushServerApp->add_flag("-g", greetings, "Greet");
 | 
			
		||||
    pushServerApp->add_flag("-6", ipv6, "IpV6");
 | 
			
		||||
    pushServerApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate");
 | 
			
		||||
    pushServerApp->add_flag("-p", disablePong, "Disable sending PONG in response to PING");
 | 
			
		||||
@@ -3267,16 +3297,10 @@ int main(int argc, char** argv)
 | 
			
		||||
                                  subprotocol,
 | 
			
		||||
                                  pingIntervalSecs);
 | 
			
		||||
    }
 | 
			
		||||
    else if (app.got_subcommand("echo_client"))
 | 
			
		||||
    else if (app.got_subcommand("autoroute"))
 | 
			
		||||
    {
 | 
			
		||||
        ret = ix::ws_echo_client(url,
 | 
			
		||||
                                 disablePerMessageDeflate,
 | 
			
		||||
                                 binaryMode,
 | 
			
		||||
                                 tlsOptions,
 | 
			
		||||
                                 subprotocol,
 | 
			
		||||
                                 pingIntervalSecs,
 | 
			
		||||
                                 sendMsg,
 | 
			
		||||
                                 noSend);
 | 
			
		||||
        ret = ix::ws_autoroute(
 | 
			
		||||
            url, disablePerMessageDeflate, tlsOptions, subprotocol, pingIntervalSecs, msgCount);
 | 
			
		||||
    }
 | 
			
		||||
    else if (app.got_subcommand("echo_server"))
 | 
			
		||||
    {
 | 
			
		||||
@@ -3286,7 +3310,6 @@ int main(int argc, char** argv)
 | 
			
		||||
    else if (app.got_subcommand("push_server"))
 | 
			
		||||
    {
 | 
			
		||||
        ret = ix::ws_push_server(port,
 | 
			
		||||
                                 greetings,
 | 
			
		||||
                                 hostname,
 | 
			
		||||
                                 tlsOptions,
 | 
			
		||||
                                 ipv6,
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user