From cbadecab336a807deead62fa968325f6e69585cf Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Fri, 14 Dec 2018 16:28:11 -0800 Subject: [PATCH] non blocking dns lookup --- CMakeLists.txt | 2 + docker/Dockerfile.debian | 1 + examples/ws_connect/build_linux.sh | 1 + ixwebsocket/IXCancellationRequest.h | 15 +++ ixwebsocket/IXDNSLookup.cpp | 148 +++++++++++++++++++++++++++ ixwebsocket/IXDNSLookup.h | 61 +++++++++++ ixwebsocket/IXSocket.cpp | 3 +- ixwebsocket/IXSocket.h | 4 +- ixwebsocket/IXSocketAppleSSL.cpp | 3 +- ixwebsocket/IXSocketAppleSSL.h | 4 +- ixwebsocket/IXSocketConnect.cpp | 44 ++++---- ixwebsocket/IXSocketConnect.h | 10 +- ixwebsocket/IXSocketOpenSSL.cpp | 8 +- ixwebsocket/IXSocketOpenSSL.h | 4 +- ixwebsocket/IXWebSocketTransport.cpp | 61 +++++++++-- ixwebsocket/IXWebSocketTransport.h | 5 + makefile | 1 + 17 files changed, 323 insertions(+), 52 deletions(-) create mode 100644 ixwebsocket/IXCancellationRequest.h create mode 100644 ixwebsocket/IXDNSLookup.cpp create mode 100644 ixwebsocket/IXDNSLookup.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 53fa4713..01945b6e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -14,6 +14,7 @@ set( IXWEBSOCKET_SOURCES ixwebsocket/IXEventFd.cpp ixwebsocket/IXSocket.cpp ixwebsocket/IXSocketConnect.cpp + ixwebsocket/IXDNSLookup.cpp ixwebsocket/IXWebSocket.cpp ixwebsocket/IXWebSocketTransport.cpp ixwebsocket/IXWebSocketPerMessageDeflate.cpp @@ -24,6 +25,7 @@ set( IXWEBSOCKET_HEADERS ixwebsocket/IXEventFd.h ixwebsocket/IXSocket.h ixwebsocket/IXSocketConnect.h + ixwebsocket/IXDNSLookup.h ixwebsocket/IXWebSocket.h ixwebsocket/IXWebSocketTransport.h ixwebsocket/IXWebSocketSendInfo.h diff --git a/docker/Dockerfile.debian b/docker/Dockerfile.debian index 2c11e46f..cbf2f387 100644 --- a/docker/Dockerfile.debian +++ b/docker/Dockerfile.debian @@ -9,6 +9,7 @@ RUN apt-get -y install screen RUN apt-get -y install procps RUN apt-get -y install lsof RUN apt-get -y install libz-dev +RUN apt-get -y install vim COPY . . diff --git a/examples/ws_connect/build_linux.sh b/examples/ws_connect/build_linux.sh index 5392163d..2b7f392c 100644 --- a/examples/ws_connect/build_linux.sh +++ b/examples/ws_connect/build_linux.sh @@ -13,6 +13,7 @@ g++ --std=c++11 \ ../../ixwebsocket/IXSocket.cpp \ ../../ixwebsocket/IXWebSocketTransport.cpp \ ../../ixwebsocket/IXWebSocket.cpp \ + ../../ixwebsocket/IXDNSLookup.cpp \ ../../ixwebsocket/IXSocketConnect.cpp \ ../../ixwebsocket/IXSocketOpenSSL.cpp \ ../../ixwebsocket/IXWebSocketPerMessageDeflate.cpp \ diff --git a/ixwebsocket/IXCancellationRequest.h b/ixwebsocket/IXCancellationRequest.h new file mode 100644 index 00000000..e95bf3f7 --- /dev/null +++ b/ixwebsocket/IXCancellationRequest.h @@ -0,0 +1,15 @@ +/* + * IXCancellationRequest.h + * Author: Benjamin Sergeant + * Copyright (c) 2018 Machine Zone, Inc. All rights reserved. + */ + +#pragma once + +#include + +namespace ix +{ + using CancellationRequest = std::function; +} + diff --git a/ixwebsocket/IXDNSLookup.cpp b/ixwebsocket/IXDNSLookup.cpp new file mode 100644 index 00000000..70b4cfeb --- /dev/null +++ b/ixwebsocket/IXDNSLookup.cpp @@ -0,0 +1,148 @@ +/* + * IXDNSLookup.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2018 Machine Zone, Inc. All rights reserved. + */ + +#include "IXDNSLookup.h" + +#include +#include +#include +#include +#include + +namespace ix +{ + // 60s timeout, see IXSocketConnect.cpp + const int64_t DNSLookup::kDefaultTimeout = 60 * 1000; // ms + const int64_t DNSLookup::kDefaultWait = 10; // ms + + DNSLookup::DNSLookup(const std::string& hostname, int port, int wait) : + _hostname(hostname), + _port(port), + _res(nullptr), + _done(false), + _wait(wait) + { + + } + + DNSLookup::~DNSLookup() + { + ; + } + + struct addrinfo* DNSLookup::getAddrInfo(const std::string& hostname, + int port, + std::string& errMsg) + { + struct addrinfo hints; + memset(&hints, 0, sizeof(hints)); + hints.ai_flags = AI_ADDRCONFIG | AI_NUMERICSERV; + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + + std::string sport = std::to_string(port); + + struct addrinfo* res; + int getaddrinfo_result = getaddrinfo(hostname.c_str(), sport.c_str(), + &hints, &res); + if (getaddrinfo_result) + { + errMsg = gai_strerror(getaddrinfo_result); + res = nullptr; + } + return res; + } + + struct addrinfo* DNSLookup::resolve(std::string& errMsg, + const CancellationRequest& isCancellationRequested, + bool blocking) + { + return blocking ? resolveBlocking(errMsg, isCancellationRequested) + : resolveAsync(errMsg, isCancellationRequested); + } + + struct addrinfo* DNSLookup::resolveBlocking(std::string& errMsg, + const CancellationRequest& isCancellationRequested) + { + errMsg = "no error"; + + // Maybe a cancellation request got in before the background thread terminated ? + if (isCancellationRequested()) + { + errMsg = "cancellation requested"; + return nullptr; + } + + return getAddrInfo(_hostname, _port, errMsg); + } + + struct addrinfo* DNSLookup::resolveAsync(std::string& errMsg, + const CancellationRequest& isCancellationRequested) + { + errMsg = "no error"; + + // Can only be called once, otherwise we would have to manage a pool + // of background thread which is overkill for our usage. + if (_done) + { + return nullptr; // programming error, create a second DNSLookup instance + // if you need a second lookup. + } + + // + // Good resource on thread forced termination + // https://www.bo-yang.net/2017/11/19/cpp-kill-detached-thread + // + _thread = std::thread(&DNSLookup::run, this); + _thread.detach(); + + int64_t timeout = kDefaultTimeout; + std::unique_lock lock(_mutex); + + while (!_done) + { + // Wait for 10 milliseconds on the condition variable, to see + // if the bg thread has terminated. + if (_condition.wait_for(lock, std::chrono::milliseconds(_wait)) == std::cv_status::no_timeout) + { + // Background thread has terminated, so we can break of this loop + break; + } + + // Were we cancelled ? + if (isCancellationRequested()) + { + errMsg = "cancellation requested"; + return nullptr; + } + + // Have we exceeded the timeout ? + timeout -= _wait; + if (timeout <= 0) + { + errMsg = "dns lookup timed out after 60 seconds"; + return nullptr; + } + } + + // Maybe a cancellation request got in before the bg terminated ? + if (isCancellationRequested()) + { + errMsg = "cancellation requested"; + return nullptr; + } + + return _res; + } + + void DNSLookup::run() + { + _res = getAddrInfo(_hostname, _port, _errMsg); + _condition.notify_one(); + _done = true; + } + +} diff --git a/ixwebsocket/IXDNSLookup.h b/ixwebsocket/IXDNSLookup.h new file mode 100644 index 00000000..a3d072ae --- /dev/null +++ b/ixwebsocket/IXDNSLookup.h @@ -0,0 +1,61 @@ +/* + * IXDNSLookup.h + * Author: Benjamin Sergeant + * Copyright (c) 2018 Machine Zone, Inc. All rights reserved. + * + * Resolve a hostname+port to a struct addrinfo obtained with getaddrinfo + * Does this in a background thread so that it can be cancelled, since + * getaddrinfo is a blocking call, and we don't want to block the main thread on Mobile. + */ + +#pragma once + +#include "IXCancellationRequest.h" + +#include +#include +#include +#include + +struct addrinfo; + +namespace ix +{ + class DNSLookup { + public: + DNSLookup(const std::string& hostname, + int port, + int wait = DNSLookup::kDefaultWait); + ~DNSLookup(); + + struct addrinfo* resolve(std::string& errMsg, + const CancellationRequest& isCancellationRequested, + bool blocking = false); + + private: + struct addrinfo* resolveAsync(std::string& errMsg, + const CancellationRequest& isCancellationRequested); + struct addrinfo* resolveBlocking(std::string& errMsg, + const CancellationRequest& isCancellationRequested); + + static struct addrinfo* getAddrInfo(const std::string& hostname, + int port, + std::string& errMsg); + + void run(); // thread runner + + std::string _hostname; + int _port; + int _wait; + std::string _errMsg; + struct addrinfo* _res; + + std::atomic _done; + std::thread _thread; + std::condition_variable _condition; + std::mutex _mutex; + + const static int64_t kDefaultTimeout; + const static int64_t kDefaultWait; + }; +} diff --git a/ixwebsocket/IXSocket.cpp b/ixwebsocket/IXSocket.cpp index c86c6a94..3a2330b6 100644 --- a/ixwebsocket/IXSocket.cpp +++ b/ixwebsocket/IXSocket.cpp @@ -5,6 +5,7 @@ */ #include "IXSocket.h" +#include "IXSocketConnect.h" #ifdef _WIN32 # include @@ -79,7 +80,7 @@ namespace ix bool Socket::connect(const std::string& host, int port, std::string& errMsg, - CancellationRequest isCancellationRequested) + const CancellationRequest& isCancellationRequested) { std::lock_guard lock(_socketMutex); diff --git a/ixwebsocket/IXSocket.h b/ixwebsocket/IXSocket.h index f0876e17..ea16160d 100644 --- a/ixwebsocket/IXSocket.h +++ b/ixwebsocket/IXSocket.h @@ -12,7 +12,7 @@ #include #include "IXEventFd.h" -#include "IXSocketConnect.h" +#include "IXCancellationRequest.h" struct addrinfo; @@ -34,7 +34,7 @@ namespace ix virtual bool connect(const std::string& url, int port, std::string& errMsg, - CancellationRequest isCancellationRequested); + const CancellationRequest& isCancellationRequested); virtual void close(); virtual int send(char* buffer, size_t length); diff --git a/ixwebsocket/IXSocketAppleSSL.cpp b/ixwebsocket/IXSocketAppleSSL.cpp index 0e575113..bd60533d 100644 --- a/ixwebsocket/IXSocketAppleSSL.cpp +++ b/ixwebsocket/IXSocketAppleSSL.cpp @@ -6,6 +6,7 @@ * Adapted from Satori SDK Apple SSL code. */ #include "IXSocketAppleSSL.h" +#include "IXSocketConnect.h" #include #include @@ -157,7 +158,7 @@ namespace ix bool SocketAppleSSL::connect(const std::string& host, int port, std::string& errMsg, - CancellationRequest isCancellationRequested) + const CancellationRequest& isCancellationRequested) { OSStatus status; { diff --git a/ixwebsocket/IXSocketAppleSSL.h b/ixwebsocket/IXSocketAppleSSL.h index 19cd344a..bcf8b23f 100644 --- a/ixwebsocket/IXSocketAppleSSL.h +++ b/ixwebsocket/IXSocketAppleSSL.h @@ -7,7 +7,7 @@ #pragma once #include "IXSocket.h" -#include "IXSocketConnect.h" +#include "IXCancellationRequest.h" #include #include @@ -25,7 +25,7 @@ namespace ix virtual bool connect(const std::string& host, int port, std::string& errMsg, - CancellationRequest isCancellationRequested) final; + const CancellationRequest& isCancellationRequested) final; virtual void close() final; virtual int send(char* buffer, size_t length) final; diff --git a/ixwebsocket/IXSocketConnect.cpp b/ixwebsocket/IXSocketConnect.cpp index 8fe7123f..ee974121 100644 --- a/ixwebsocket/IXSocketConnect.cpp +++ b/ixwebsocket/IXSocketConnect.cpp @@ -5,6 +5,7 @@ */ #include "IXSocketConnect.h" +#include "IXDNSLookup.h" #ifdef _WIN32 # include @@ -47,10 +48,15 @@ namespace namespace ix { + // + // This function can be cancelled every 50 ms + // This is important so that we don't block the main UI thread when shutting down a connection which is + // already trying to reconnect, and can be blocked waiting for ::connect to respond. + // bool SocketConnect::connectToAddress(const struct addrinfo *address, int& sockfd, std::string& errMsg, - CancellationRequest isCancellationRequested) + const CancellationRequest& isCancellationRequested) { sockfd = -1; @@ -64,20 +70,17 @@ namespace ix } // Set the socket to non blocking mode, so that slow responses cannot - // block us for too long while we are trying to shut-down. + // block us for too long SocketConnect::configure(fd); if (::connect(fd, address->ai_addr, address->ai_addrlen) == -1 && errno != EINPROGRESS) { closeSocket(fd); - sockfd = -1; errMsg = strerror(errno); return false; } - // std::cout << "I WAS HERE A" << std::endl; - // // If during a connection attempt the request remains idle for longer // than the timeout interval, the request is considered to have timed @@ -95,7 +98,6 @@ namespace ix if (isCancellationRequested()) { closeSocket(fd); - sockfd = -1; errMsg = "Cancelled"; return false; } @@ -115,7 +117,7 @@ namespace ix if (!FD_ISSET(fd, &wfds)) continue; // Something was written to the socket - int optval; + int optval = -1; socklen_t optlen = sizeof(optval); // getsockopt() puts the errno value for connect into optval so 0 @@ -124,7 +126,6 @@ namespace ix optval != 0) { closeSocket(fd); - sockfd = -1; errMsg = strerror(optval); return false; } @@ -137,33 +138,22 @@ namespace ix } closeSocket(fd); - sockfd = -1; - errMsg = strerror(errno); + errMsg = "connect timed out after 60 seconds"; return false; } int SocketConnect::connect(const std::string& hostname, int port, std::string& errMsg, - CancellationRequest isCancellationRequested) + const CancellationRequest& isCancellationRequested) { // // First do DNS resolution // - struct addrinfo hints; - memset(&hints, 0, sizeof(hints)); - hints.ai_flags = AI_ADDRCONFIG | AI_NUMERICSERV; - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - - std::string sport = std::to_string(port); - - struct addrinfo *res = nullptr; - int getaddrinfo_result = getaddrinfo(hostname.c_str(), sport.c_str(), - &hints, &res); - if (getaddrinfo_result) + DNSLookup dnsLookup(hostname, port); + struct addrinfo *res = dnsLookup.resolve(errMsg, isCancellationRequested); + if (res == nullptr) { - errMsg = gai_strerror(getaddrinfo_result); return -1; } @@ -183,15 +173,18 @@ namespace ix break; } } + freeaddrinfo(res); return sockfd; } void SocketConnect::configure(int sockfd) { + // 1. disable Nagle's algorithm int flag = 1; - setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, sizeof(flag)); // Disable Nagle's algorithm + setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, sizeof(flag)); + // 2. make socket non blocking #ifdef _WIN32 unsigned long nonblocking = 1; ioctlsocket(_sockfd, FIONBIO, &nonblocking); @@ -199,6 +192,7 @@ namespace ix fcntl(sockfd, F_SETFL, O_NONBLOCK); // make socket non blocking #endif + // 3. (apple) prevent SIGPIPE from being emitted when the remote end disconnect #ifdef SO_NOSIGPIPE int value = 1; setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, diff --git a/ixwebsocket/IXSocketConnect.h b/ixwebsocket/IXSocketConnect.h index 1857c8a2..86b93602 100644 --- a/ixwebsocket/IXSocketConnect.h +++ b/ixwebsocket/IXSocketConnect.h @@ -6,26 +6,26 @@ #pragma once +#include "IXCancellationRequest.h" + #include -#include struct addrinfo; namespace ix { - using CancellationRequest = std::function; - class SocketConnect { public: static int connect(const std::string& hostname, int port, std::string& errMsg, - CancellationRequest isCancellationRequested); + const CancellationRequest& isCancellationRequested); + private: static bool connectToAddress(const struct addrinfo *address, int& sockfd, std::string& errMsg, - CancellationRequest isCancellationRequested); + const CancellationRequest& isCancellationRequested); static void configure(int sockfd); }; diff --git a/ixwebsocket/IXSocketOpenSSL.cpp b/ixwebsocket/IXSocketOpenSSL.cpp index e6d987db..b6c34617 100644 --- a/ixwebsocket/IXSocketOpenSSL.cpp +++ b/ixwebsocket/IXSocketOpenSSL.cpp @@ -7,6 +7,8 @@ */ #include "IXSocketOpenSSL.h" +#include "IXSocketConnect.h" + #include #include @@ -275,7 +277,7 @@ namespace ix bool SocketOpenSSL::connect(const std::string& host, int port, std::string& errMsg, - CancellationRequest isCancellationRequested) + const CancellationRequest& isCancellationRequested) { bool handshakeSuccessful = false; { @@ -415,10 +417,8 @@ namespace ix if (reason == SSL_ERROR_WANT_READ || reason == SSL_ERROR_WANT_WRITE) { errno = EWOULDBLOCK; - return -1; - } else { - return -1; } + return -1; } } diff --git a/ixwebsocket/IXSocketOpenSSL.h b/ixwebsocket/IXSocketOpenSSL.h index 671de936..7e360a02 100644 --- a/ixwebsocket/IXSocketOpenSSL.h +++ b/ixwebsocket/IXSocketOpenSSL.h @@ -7,7 +7,7 @@ #pragma once #include "IXSocket.h" -#include "IXSocketConnect.h" +#include "IXCancellationRequest.h" #include #include @@ -28,7 +28,7 @@ namespace ix virtual bool connect(const std::string& host, int port, std::string& errMsg, - CancellationRequest isCancellationRequested) final; + const CancellationRequest& isCancellationRequested) final; virtual void close() final; virtual int send(char* buffer, size_t length) final; diff --git a/ixwebsocket/IXWebSocketTransport.cpp b/ixwebsocket/IXWebSocketTransport.cpp index 21a83bc1..a4f06d89 100644 --- a/ixwebsocket/IXWebSocketTransport.cpp +++ b/ixwebsocket/IXWebSocketTransport.cpp @@ -40,7 +40,10 @@ namespace ix { WebSocketTransport::WebSocketTransport() : _readyState(CLOSED), - _enablePerMessageDeflate(false) + _enablePerMessageDeflate(false), + _closeCode(0), + _closeWireSize(0), + _requestInitCancellation(false) { } @@ -90,6 +93,12 @@ namespace ix { port = 443; } + else + { + // Invalid protocol. Should be caught by regex check + // but this missing branch trigger cpplint linter. + return false; + } } else { @@ -164,6 +173,8 @@ namespace ix std::string protocol, host, path, query; int port; + _requestInitCancellation = false; + if (!WebSocketTransport::parseUrl(_url, protocol, host, path, query, port)) { @@ -192,9 +203,9 @@ namespace ix std::string errMsg; bool success = _socket->connect(host, port, errMsg, - [this] + [this]() -> bool { - return _readyState == CLOSING; + return _requestInitCancellation; } ); if (!success) @@ -231,9 +242,7 @@ namespace ix ss << "\r\n"; - std::string request = ss.str(); - int requestSize = (int) request.size(); - if (_socket->send(const_cast(request.c_str()), requestSize) != requestSize) + if (!writeBytes(ss.str())) { return WebSocketInitResult(false, 0, std::string("Failed sending GET request to ") + _url); } @@ -369,6 +378,7 @@ namespace ix _onCloseCallback(_closeCode, _closeReason, _closeWireSize); _closeCode = 0; _closeReason = std::string(); + _closeWireSize = 0; } _readyState = readyStateValue; @@ -792,6 +802,8 @@ namespace ix void WebSocketTransport::close() { + _requestInitCancellation = true; + if (_readyState == CLOSING || _readyState == CLOSED) return; // See list of close events here: @@ -810,10 +822,9 @@ namespace ix _socket->close(); } - // Used by init bool WebSocketTransport::readByte(void* buffer) { - while (true) + while (true) { if (_readyState == CLOSING) return false; @@ -826,13 +837,43 @@ namespace ix return true; } // There is possibly something to be read, try again - else if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK || + else if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK || _socket->getErrno() == EAGAIN)) { continue; } // There was an error during the read, abort - else + else + { + return false; + } + } + } + + bool WebSocketTransport::writeBytes(const std::string& str) + { + while (true) + { + if (_readyState == CLOSING) return false; + + char* buffer = const_cast(str.c_str()); + int len = (int) str.size(); + + int ret = _socket->send(buffer, len); + + // We wrote some bytes, as needed, all good. + if (ret > 0) + { + return ret == len; + } + // There is possibly something to be write, try again + else if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK || + _socket->getErrno() == EAGAIN)) + { + continue; + } + // There was an error during the write, abort + else { return false; } diff --git a/ixwebsocket/IXWebSocketTransport.h b/ixwebsocket/IXWebSocketTransport.h index 541a85df..a3fe9640 100644 --- a/ixwebsocket/IXWebSocketTransport.h +++ b/ixwebsocket/IXWebSocketTransport.h @@ -136,6 +136,9 @@ namespace ix WebSocketPerMessageDeflateOptions _perMessageDeflateOptions; std::atomic _enablePerMessageDeflate; + // Used to cancel dns lookup + socket connect + http upgrade + std::atomic _requestInitCancellation; + void sendOnSocket(); WebSocketSendInfo sendData(wsheader_type::opcode_type type, const std::string& message, @@ -158,6 +161,8 @@ namespace ix void unmaskReceiveBuffer(const wsheader_type& ws); std::string genRandomString(const int len); + // Non blocking versions of read/write, used during http upgrade bool readByte(void* buffer); + bool writeBytes(const std::string& str); }; } diff --git a/makefile b/makefile index c2b068d0..3bb9fb1a 100644 --- a/makefile +++ b/makefile @@ -3,6 +3,7 @@ # all: run +.PHONY: docker docker: docker build -t ws_connect:latest .