From 08c2cdbf1d50582358c166a960b3806b85258c18 Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Wed, 13 Mar 2019 23:09:45 -0700 Subject: [PATCH] send can fail silently when sending would block (EWOULDBLOCK return for send) (#18) * try to use a pipe for communication * flush send buffer on the background thread * cleanup * linux fix / linux still use event fd for now * cleanup --- docker/Dockerfile.debian | 14 +++- ixwebsocket/IXEventFd.cpp | 57 ++++++++++--- ixwebsocket/IXEventFd.h | 12 ++- ixwebsocket/IXSocket.cpp | 73 ++++++++++------ ixwebsocket/IXSocket.h | 17 ++-- ixwebsocket/IXWebSocket.cpp | 5 ++ ixwebsocket/IXWebSocket.h | 1 + ixwebsocket/IXWebSocketTransport.cpp | 120 ++++++++++++++++----------- ixwebsocket/IXWebSocketTransport.h | 1 + makefile | 8 +- test/IXSocketTest.cpp | 8 +- test/cmd_websocket_chat.cpp | 15 +++- test/run.py | 4 +- ws/ws.cpp | 12 +-- ws/ws.h | 6 +- ws/ws_broadcast_server.cpp | 9 ++ ws/ws_echo_server.cpp | 6 +- ws/ws_send.cpp | 9 ++ ws/ws_transfer.cpp | 15 +++- 19 files changed, 277 insertions(+), 115 deletions(-) diff --git a/docker/Dockerfile.debian b/docker/Dockerfile.debian index 2fb37db9..56d2b75c 100644 --- a/docker/Dockerfile.debian +++ b/docker/Dockerfile.debian @@ -12,11 +12,19 @@ RUN apt-get -y install libz-dev RUN apt-get -y install vim RUN apt-get -y install make RUN apt-get -y install cmake +RUN apt-get -y install curl +RUN apt-get -y install python + +# debian strech cmake is too old for building with Docker +COPY makefile . +RUN ["make", "install_cmake_for_linux"] COPY . . -WORKDIR ws -RUN ["sh", "docker_build.sh"] +ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-rc4-Linux-x86_64/bin +ENV PATH="${CMAKE_BIN_PATH}:${PATH}" + +# RUN ["make"] EXPOSE 8765 -CMD ["/ws/ws", "transfer", "--port", "8765", "--hostname", "0.0.0.0"] +CMD ["/ws/ws", "transfer", "--port", "8765", "--host", "0.0.0.0"] diff --git a/ixwebsocket/IXEventFd.cpp b/ixwebsocket/IXEventFd.cpp index 1e13e826..658ba51b 100644 --- a/ixwebsocket/IXEventFd.cpp +++ b/ixwebsocket/IXEventFd.cpp @@ -17,6 +17,8 @@ // cf Android/Kernel table here // https://android.stackexchange.com/questions/51651/which-android-runs-which-linux-kernel // +// On macOS we use UNIX pipes to wake up select. +// #include "IXEventFd.h" @@ -24,17 +26,24 @@ # include #endif -#ifndef _WIN32 #include // for write -#endif +#include namespace ix { - EventFd::EventFd() : - _eventfd(-1) + EventFd::EventFd() { #ifdef __linux__ + _eventfd = -1; _eventfd = eventfd(0, 0); + fcntl(_eventfd, F_SETFL, O_NONBLOCK); +#else + _fildes[0] = -1; + _fildes[1] = -1; + + pipe(_fildes); + fcntl(_fildes[0], F_SETFL, O_NONBLOCK); + fcntl(_fildes[1], F_SETFL, O_NONBLOCK); #endif } @@ -42,22 +51,44 @@ namespace ix { #ifdef __linux__ ::close(_eventfd); +#else + ::close(_fildes[0]); + ::close(_fildes[1]); + _fildes[0] = -1; + _fildes[1] = -1; #endif } - bool EventFd::notify() + bool EventFd::notify(uint64_t value) { -#if defined(__linux__) - if (_eventfd == -1) return false; + int fd; - // select will wake up when a non-zero value is written to our eventfd - uint64_t value = 1; +#if defined(__linux__) + fd = _eventfd; +#else + // File descriptor at index 1 in _fildes is the write end of the pipe + fd = _fildes[1]; +#endif + + if (fd == -1) return false; // we should write 8 bytes for an uint64_t - return write(_eventfd, &value, sizeof(value)) == 8; + return write(fd, &value, sizeof(value)) == 8; + } + + // TODO: return max uint64_t for errors ? + uint64_t EventFd::read() + { + int fd; + +#if defined(__linux__) + fd = _eventfd; #else - return true; + fd = _fildes[0]; #endif + uint64_t value = 0; + ::read(fd, &value, sizeof(value)); + return value; } bool EventFd::clear() @@ -77,6 +108,10 @@ namespace ix int EventFd::getFd() { +#if defined(__linux__) return _eventfd; +#else + return _fildes[0]; +#endif } } diff --git a/ixwebsocket/IXEventFd.h b/ixwebsocket/IXEventFd.h index 4986b6f5..56db41b2 100644 --- a/ixwebsocket/IXEventFd.h +++ b/ixwebsocket/IXEventFd.h @@ -6,6 +6,8 @@ #pragma once +#include + namespace ix { class EventFd { @@ -13,11 +15,19 @@ namespace ix EventFd(); virtual ~EventFd(); - bool notify(); + bool notify(uint64_t value); bool clear(); + uint64_t read(); int getFd(); private: +#if defined(__linux__) int _eventfd; +#else + // Store file descriptors used by the communication pipe. Communication + // happens between a control thread and a background thread, which is + // blocked on select. + int _fildes[2]; +#endif }; } diff --git a/ixwebsocket/IXSocket.cpp b/ixwebsocket/IXSocket.cpp index d5593655..94219e6e 100644 --- a/ixwebsocket/IXSocket.cpp +++ b/ixwebsocket/IXSocket.cpp @@ -23,6 +23,8 @@ namespace ix { const int Socket::kDefaultPollNoTimeout = -1; // No poll timeout by default const int Socket::kDefaultPollTimeout = kDefaultPollNoTimeout; + const uint8_t Socket::kSendRequest = 1; + const uint8_t Socket::kCloseRequest = 2; constexpr size_t Socket::kChunkSize; Socket::Socket(int fd) : @@ -44,7 +46,34 @@ namespace ix return; } - int ret = select(timeoutSecs, 0); + PollResultType pollResult = select(timeoutSecs, 0); + + if (onPollCallback) onPollCallback(pollResult); + } + + PollResultType Socket::select(int timeoutSecs, int timeoutMs) + { + fd_set rfds; + FD_ZERO(&rfds); + FD_SET(_sockfd, &rfds); + + // File descriptor at index 0 in _fildes is the read end of the pipe + int eventfd = _eventfd.getFd(); + if (eventfd != -1) + { + FD_SET(eventfd, &rfds); + } + + struct timeval timeout; + timeout.tv_sec = timeoutSecs; + timeout.tv_usec = 1000 * timeoutMs; + + // Compute the highest fd. + int sockfd = _sockfd; + int nfds = (std::max)(sockfd, eventfd); + + int ret = ::select(nfds + 1, &rfds, nullptr, nullptr, + (timeoutSecs < 0) ? nullptr : &timeout); PollResultType pollResult = PollResultType_ReadyForRead; if (ret < 0) @@ -55,35 +84,27 @@ namespace ix { pollResult = PollResultType_Timeout; } + else if (eventfd != -1 && FD_ISSET(eventfd, &rfds)) + { + uint8_t value = _eventfd.read(); - if (onPollCallback) onPollCallback(pollResult); + if (value == kSendRequest) + { + pollResult = PollResultType_SendRequest; + } + else if (value == kCloseRequest) + { + pollResult = PollResultType_CloseRequest; + } + } + + return pollResult; } - int Socket::select(int timeoutSecs, int timeoutMs) + // Wake up from poll/select by writing to the pipe which is watched by select + bool Socket::wakeUpFromPoll(uint8_t wakeUpCode) { - fd_set rfds; - FD_ZERO(&rfds); - FD_SET(_sockfd, &rfds); - -#ifdef __linux__ - FD_SET(_eventfd.getFd(), &rfds); -#endif - - struct timeval timeout; - timeout.tv_sec = timeoutSecs; - timeout.tv_usec = 1000 * timeoutMs; - - int sockfd = _sockfd; - int nfds = (std::max)(sockfd, _eventfd.getFd()); - int ret = ::select(nfds + 1, &rfds, nullptr, nullptr, - (timeoutSecs < 0) ? nullptr : &timeout); - return ret; - } - - void Socket::wakeUpFromPoll() - { - // this will wake up the thread blocked on select, only needed on Linux - _eventfd.notify(); + return _eventfd.notify(wakeUpCode); } bool Socket::connect(const std::string& host, diff --git a/ixwebsocket/IXSocket.h b/ixwebsocket/IXSocket.h index 7998a4e7..88f8725a 100644 --- a/ixwebsocket/IXSocket.h +++ b/ixwebsocket/IXSocket.h @@ -17,9 +17,9 @@ typedef SSIZE_T ssize_t; #endif -#include "IXEventFd.h" #include "IXCancellationRequest.h" #include "IXProgressCallback.h" +#include "IXEventFd.h" namespace ix { @@ -27,7 +27,9 @@ namespace ix { PollResultType_ReadyForRead = 0, PollResultType_Timeout = 1, - PollResultType_Error = 2 + PollResultType_Error = 2, + PollResultType_SendRequest = 3, + PollResultType_CloseRequest = 4 }; class Socket { @@ -39,10 +41,10 @@ namespace ix void configure(); - int select(int timeoutSecs, int timeoutMs); + PollResultType select(int timeoutSecs, int timeoutMs); virtual void poll(const OnPollCallback& onPollCallback, int timeoutSecs = kDefaultPollTimeout); - virtual void wakeUpFromPoll(); + virtual bool wakeUpFromPoll(uint8_t wakeUpCode); // Virtual methods virtual bool connect(const std::string& url, @@ -73,12 +75,15 @@ namespace ix static bool init(); // Required on Windows to initialize WinSocket static void cleanup(); // Required on Windows to cleanup WinSocket + // Used as special codes for pipe communication + static const uint8_t kSendRequest; + static const uint8_t kCloseRequest; + protected: void closeSocket(int fd); std::atomic _sockfd; std::mutex _socketMutex; - EventFd _eventfd; private: static const int kDefaultPollTimeout; @@ -87,5 +92,7 @@ namespace ix // Buffer for reading from our socket. That buffer is never resized. std::vector _readBuffer; static constexpr size_t kChunkSize = 1 << 15; + + EventFd _eventfd; }; } diff --git a/ixwebsocket/IXWebSocket.cpp b/ixwebsocket/IXWebSocket.cpp index 19f8f0c3..d0cdee5b 100644 --- a/ixwebsocket/IXWebSocket.cpp +++ b/ixwebsocket/IXWebSocket.cpp @@ -379,4 +379,9 @@ namespace ix { _automaticReconnection = false; } + + size_t WebSocket::bufferedAmount() const + { + return _ws.bufferedAmount(); + } } diff --git a/ixwebsocket/IXWebSocket.h b/ixwebsocket/IXWebSocket.h index 94ea047c..1c7fa5c1 100644 --- a/ixwebsocket/IXWebSocket.h +++ b/ixwebsocket/IXWebSocket.h @@ -112,6 +112,7 @@ namespace ix const std::string& getUrl() const; const WebSocketPerMessageDeflateOptions& getPerMessageDeflateOptions() const; int getHeartBeatPeriod() const; + size_t bufferedAmount() const; void enableAutomaticReconnection(); void disableAutomaticReconnection(); diff --git a/ixwebsocket/IXWebSocketTransport.cpp b/ixwebsocket/IXWebSocketTransport.cpp index 070bd4f6..0029e803 100644 --- a/ixwebsocket/IXWebSocketTransport.cpp +++ b/ixwebsocket/IXWebSocketTransport.cpp @@ -1,7 +1,31 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) 2012, 2013 + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + /* * IXWebSocketTransport.cpp * Author: Benjamin Sergeant - * Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved. + * Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved. */ // @@ -14,14 +38,6 @@ #include "IXUrlParser.h" #include "IXSocketFactory.h" -#ifdef IXWEBSOCKET_USE_TLS -# ifdef __APPLE__ -# include "IXSocketAppleSSL.h" -# else -# include "IXSocketOpenSSL.h" -# endif -#endif - #include #include @@ -80,16 +96,6 @@ namespace ix std::string("Could not parse URL ") + url); } - if (protocol != "ws" && protocol != "wss") - { - std::stringstream ss; - ss << "Invalid protocol: " << protocol - << " for url " << url - << " . Supported protocols are ws and wss"; - - return WebSocketInitResult(false, 0, ss.str()); - } - bool tls = protocol == "wss"; std::string errorMsg; _socket = createSocket(tls, errorMsg); @@ -184,38 +190,51 @@ namespace ix std::stringstream ss; ss << kHeartBeatPingMessage << "::" << _heartBeatPeriod << "s"; sendPing(ss.str()); - return; } - - while (true) + // Make sure we send all the buffered data + // there can be a lot of it for large messages. + else if (pollResult == PollResultType_SendRequest) { - ssize_t ret = _socket->recv((char*)&_readbuf[0], _readbuf.size()); - - if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK || - _socket->getErrno() == EAGAIN)) + while (!isSendBufferEmpty() && !_requestInitCancellation) { - break; - } - else if (ret <= 0) - { - _rxbuf.clear(); - _socket->close(); - setReadyState(CLOSED); - break; - } - else - { - _rxbuf.insert(_rxbuf.end(), - _readbuf.begin(), - _readbuf.begin() + ret); + sendOnSocket(); } } + else if (pollResult == PollResultType_ReadyForRead) + { + while (true) + { + ssize_t ret = _socket->recv((char*)&_readbuf[0], _readbuf.size()); - if (isSendBufferEmpty() && _readyState == CLOSING) + if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK || + _socket->getErrno() == EAGAIN)) + { + break; + } + else if (ret <= 0) + { + _rxbuf.clear(); + _socket->close(); + setReadyState(CLOSED); + break; + } + else + { + _rxbuf.insert(_rxbuf.end(), + _readbuf.begin(), + _readbuf.begin() + ret); + } + } + } + else if (pollResult == PollResultType_Error) { _socket->close(); - setReadyState(CLOSED); } + else if (pollResult == PollResultType_CloseRequest) + { + ; + } + }, _heartBeatPeriod); } @@ -586,11 +605,7 @@ namespace ix } } - // Make sure we send all the buffered data ; there can be a lot of it - // for large messages. - // TODO / this will block the sending thread ; we need to eval whether - // this is the right fix - while (!isSendBufferEmpty()) sendOnSocket(); + _socket->wakeUpFromPoll(Socket::kSendRequest); return WebSocketSendInfo(true, compressionError, payloadSize, wireSize); } @@ -737,8 +752,17 @@ namespace ix sendData(wsheader_type::CLOSE, normalClosure, compress); setReadyState(CLOSING); - _socket->wakeUpFromPoll(); + _socket->wakeUpFromPoll(Socket::kCloseRequest); _socket->close(); + + _closeCode = 1000; + setReadyState(CLOSED); + } + + size_t WebSocketTransport::bufferedAmount() const + { + std::lock_guard lock(_txbufMutex); + return _txbuf.size(); } } // namespace ix diff --git a/ixwebsocket/IXWebSocketTransport.h b/ixwebsocket/IXWebSocketTransport.h index eb56e275..6c2c2853 100644 --- a/ixwebsocket/IXWebSocketTransport.h +++ b/ixwebsocket/IXWebSocketTransport.h @@ -77,6 +77,7 @@ namespace ix void setReadyState(ReadyStateValues readyStateValue); void setOnCloseCallback(const OnCloseCallback& onCloseCallback); void dispatch(const OnMessageCallback& onMessageCallback); + size_t bufferedAmount() const; private: std::string _url; diff --git a/makefile b/makefile index 5fbbe3a7..094dc145 100644 --- a/makefile +++ b/makefile @@ -8,10 +8,10 @@ brew: .PHONY: docker docker: - docker build -t broadcast_server:latest . + docker build -t ws:latest . run: - docker run --cap-add sys_ptrace -it broadcast_server:latest bash + docker run --cap-add sys_ptrace -it ws:latest # this is helpful to remove trailing whitespaces trail: @@ -43,5 +43,9 @@ rebase_upstream: git reset --hard upstream/master git push origin master --force +install_cmake_for_linux: + mkdir -p /tmp/cmake + (cd /tmp/cmake ; curl -L -O https://github.com/Kitware/CMake/releases/download/v3.14.0-rc4/cmake-3.14.0-rc4-Linux-x86_64.tar.gz ; tar zxf cmake-3.14.0-rc4-Linux-x86_64.tar.gz) + .PHONY: test .PHONY: build diff --git a/test/IXSocketTest.cpp b/test/IXSocketTest.cpp index a024d999..0c409705 100644 --- a/test/IXSocketTest.cpp +++ b/test/IXSocketTest.cpp @@ -66,7 +66,13 @@ TEST_CASE("socket", "[socket]") std::shared_ptr socket(new Socket); std::string host("www.google.com"); int port = 80; - std::string request("GET / HTTP/1.1\r\n\r\n"); + + std::stringstream ss; + ss << "GET / HTTP/1.1\r\n"; + ss << "Host: " << host << "\r\n"; + ss << "\r\n"; + std::string request(ss.str()); + int expectedStatus = 200; int timeoutSecs = 3; diff --git a/test/cmd_websocket_chat.cpp b/test/cmd_websocket_chat.cpp index 8a929524..e8dc177d 100644 --- a/test/cmd_websocket_chat.cpp +++ b/test/cmd_websocket_chat.cpp @@ -164,10 +164,21 @@ namespace ss << "cmd_websocket_chat: Error ! " << error.reason; log(ss.str()); } + else if (messageType == ix::WebSocket_MessageType_Ping) + { + log("cmd_websocket_chat: received ping message"); + } + else if (messageType == ix::WebSocket_MessageType_Pong) + { + log("cmd_websocket_chat: received pong message"); + } + else if (messageType == ix::WebSocket_MessageType_Fragment) + { + log("cmd_websocket_chat: received message fragment"); + } else { - // FIXME: missing ping/pong messages - ss << "Invalid ix::WebSocketMessageType"; + ss << "Unexpected ix::WebSocketMessageType"; log(ss.str()); } }); diff --git a/test/run.py b/test/run.py index 664dee4c..79fdacea 100644 --- a/test/run.py +++ b/test/run.py @@ -77,6 +77,8 @@ shutil.copy(os.path.join( 'bin', 'zlib.dll'), '.') -testCommand = '{} {}'.format(testBinary, os.getenv('TEST', '')) +lldb = "lldb --batch -o 'run' -k 'thread backtrace all' -k 'quit 1'" +lldb = "" # Disabled for now +testCommand = '{} {} {}'.format(lldb, testBinary, os.getenv('TEST', '')) ret = os.system(testCommand) assert ret == 0, 'Test command failed' diff --git a/ws/ws.cpp b/ws/ws.cpp index 029ec84c..4f90b8e9 100644 --- a/ws/ws.cpp +++ b/ws/ws.cpp @@ -51,6 +51,7 @@ int main(int argc, char** argv) CLI::App* transferApp = app.add_subcommand("transfer", "Broadcasting server"); transferApp->add_option("--port", port, "Connection url"); + transferApp->add_option("--host", hostname, "Hostname"); CLI::App* connectApp = app.add_subcommand("connect", "Connect to a remote server"); connectApp->add_option("url", url, "Connection url")->required(); @@ -60,11 +61,12 @@ int main(int argc, char** argv) chatApp->add_option("user", user, "User name")->required(); CLI::App* echoServerApp = app.add_subcommand("echo_server", "Echo server"); - echoServerApp->add_option("--port", port, "Connection url"); + echoServerApp->add_option("--port", port, "Port"); + echoServerApp->add_option("--host", hostname, "Hostname"); CLI::App* broadcastServerApp = app.add_subcommand("broadcast_server", "Broadcasting server"); - broadcastServerApp->add_option("--port", port, "Connection url"); - broadcastServerApp->add_option("--hostname", hostname, "Hostname"); + broadcastServerApp->add_option("--port", port, "Port"); + broadcastServerApp->add_option("--host", hostname, "Hostname"); CLI::App* pingPongApp = app.add_subcommand("ping", "Ping pong"); pingPongApp->add_option("url", url, "Connection url")->required(); @@ -90,7 +92,7 @@ int main(int argc, char** argv) if (app.got_subcommand("transfer")) { - return ix::ws_transfer_main(port); + return ix::ws_transfer_main(port, hostname); } else if (app.got_subcommand("send")) { @@ -111,7 +113,7 @@ int main(int argc, char** argv) } else if (app.got_subcommand("echo_server")) { - return ix::ws_echo_server_main(port); + return ix::ws_echo_server_main(port, hostname); } else if (app.got_subcommand("broadcast_server")) { diff --git a/ws/ws.h b/ws/ws.h index c9171bca..5730c262 100644 --- a/ws/ws.h +++ b/ws/ws.h @@ -24,9 +24,9 @@ namespace ix int ws_ping_pong_main(const std::string& url); - int ws_echo_server_main(int port); - + int ws_echo_server_main(int port, const std::string& hostname); int ws_broadcast_server_main(int port, const std::string& hostname); + int ws_transfer_main(int port, const std::string& hostname); int ws_chat_main(const std::string& url, const std::string& user); @@ -36,8 +36,6 @@ namespace ix int ws_receive_main(const std::string& url, bool enablePerMessageDeflate); - int ws_transfer_main(int port); - int ws_send_main(const std::string& url, const std::string& path); } diff --git a/ws/ws_broadcast_server.cpp b/ws/ws_broadcast_server.cpp index 3fad7315..12e7d068 100644 --- a/ws/ws_broadcast_server.cpp +++ b/ws/ws_broadcast_server.cpp @@ -71,6 +71,15 @@ namespace ix << " out of " << total << std::endl; return true; }); + + do + { + size_t bufferedAmount = client->bufferedAmount(); + std::cerr << bufferedAmount << " bytes left to be sent" << std::endl; + + std::chrono::duration duration(10); + std::this_thread::sleep_for(duration); + } while (client->bufferedAmount() != 0); } } } diff --git a/ws/ws_echo_server.cpp b/ws/ws_echo_server.cpp index 55394200..ac778196 100644 --- a/ws/ws_echo_server.cpp +++ b/ws/ws_echo_server.cpp @@ -10,11 +10,11 @@ namespace ix { - int ws_echo_server_main(int port) + int ws_echo_server_main(int port, const std::string& hostname) { - std::cout << "Listening on port " << port << std::endl; + std::cout << "Listening on " << hostname << ":" << port << std::endl; - ix::WebSocketServer server(port); + ix::WebSocketServer server(port, hostname); server.setOnConnectionCallback( [](std::shared_ptr webSocket) diff --git a/ws/ws_send.cpp b/ws/ws_send.cpp index cd869662..2c61478b 100644 --- a/ws/ws_send.cpp +++ b/ws/ws_send.cpp @@ -257,6 +257,15 @@ namespace ix return true; }); + do + { + size_t bufferedAmount = _webSocket.bufferedAmount(); + std::cout << bufferedAmount << " bytes left to be sent" << std::endl; + + std::chrono::duration duration(10); + std::this_thread::sleep_for(duration); + } while (_webSocket.bufferedAmount() != 0); + bench.report(); auto duration = bench.getDuration(); auto transferRate = 1000 * content.size() / duration; diff --git a/ws/ws_transfer.cpp b/ws/ws_transfer.cpp index cba06ed8..460f5262 100644 --- a/ws/ws_transfer.cpp +++ b/ws/ws_transfer.cpp @@ -10,11 +10,11 @@ namespace ix { - int ws_transfer_main(int port) + int ws_transfer_main(int port, const std::string& hostname) { - std::cout << "Listening on port " << port << std::endl; + std::cout << "Listening on " << hostname << ":" << port << std::endl; - ix::WebSocketServer server(port); + ix::WebSocketServer server(port, hostname); server.setOnConnectionCallback( [&server](std::shared_ptr webSocket) @@ -70,6 +70,15 @@ namespace ix << " out of " << total << std::endl; return true; }); + + do + { + size_t bufferedAmount = client->bufferedAmount(); + std::cerr << bufferedAmount << " bytes left to be sent" << std::endl; + + std::chrono::duration duration(10); + std::this_thread::sleep_for(duration); + } while (client->bufferedAmount() != 0); } } }