non blocking dns lookup

This commit is contained in:
Benjamin Sergeant 2018-12-14 16:28:11 -08:00
parent 8c079787f0
commit cbadecab33
17 changed files with 323 additions and 52 deletions

View File

@ -14,6 +14,7 @@ set( IXWEBSOCKET_SOURCES
ixwebsocket/IXEventFd.cpp
ixwebsocket/IXSocket.cpp
ixwebsocket/IXSocketConnect.cpp
ixwebsocket/IXDNSLookup.cpp
ixwebsocket/IXWebSocket.cpp
ixwebsocket/IXWebSocketTransport.cpp
ixwebsocket/IXWebSocketPerMessageDeflate.cpp
@ -24,6 +25,7 @@ set( IXWEBSOCKET_HEADERS
ixwebsocket/IXEventFd.h
ixwebsocket/IXSocket.h
ixwebsocket/IXSocketConnect.h
ixwebsocket/IXDNSLookup.h
ixwebsocket/IXWebSocket.h
ixwebsocket/IXWebSocketTransport.h
ixwebsocket/IXWebSocketSendInfo.h

View File

@ -9,6 +9,7 @@ RUN apt-get -y install screen
RUN apt-get -y install procps
RUN apt-get -y install lsof
RUN apt-get -y install libz-dev
RUN apt-get -y install vim
COPY . .

View File

@ -13,6 +13,7 @@ g++ --std=c++11 \
../../ixwebsocket/IXSocket.cpp \
../../ixwebsocket/IXWebSocketTransport.cpp \
../../ixwebsocket/IXWebSocket.cpp \
../../ixwebsocket/IXDNSLookup.cpp \
../../ixwebsocket/IXSocketConnect.cpp \
../../ixwebsocket/IXSocketOpenSSL.cpp \
../../ixwebsocket/IXWebSocketPerMessageDeflate.cpp \

View File

@ -0,0 +1,15 @@
/*
* IXCancellationRequest.h
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <functional>
namespace ix
{
using CancellationRequest = std::function<bool()>;
}

148
ixwebsocket/IXDNSLookup.cpp Normal file
View File

@ -0,0 +1,148 @@
/*
* IXDNSLookup.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include "IXDNSLookup.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <string.h>
#include <chrono>
namespace ix
{
// 60s timeout, see IXSocketConnect.cpp
const int64_t DNSLookup::kDefaultTimeout = 60 * 1000; // ms
const int64_t DNSLookup::kDefaultWait = 10; // ms
DNSLookup::DNSLookup(const std::string& hostname, int port, int wait) :
_hostname(hostname),
_port(port),
_res(nullptr),
_done(false),
_wait(wait)
{
}
DNSLookup::~DNSLookup()
{
;
}
struct addrinfo* DNSLookup::getAddrInfo(const std::string& hostname,
int port,
std::string& errMsg)
{
struct addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_flags = AI_ADDRCONFIG | AI_NUMERICSERV;
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
std::string sport = std::to_string(port);
struct addrinfo* res;
int getaddrinfo_result = getaddrinfo(hostname.c_str(), sport.c_str(),
&hints, &res);
if (getaddrinfo_result)
{
errMsg = gai_strerror(getaddrinfo_result);
res = nullptr;
}
return res;
}
struct addrinfo* DNSLookup::resolve(std::string& errMsg,
const CancellationRequest& isCancellationRequested,
bool blocking)
{
return blocking ? resolveBlocking(errMsg, isCancellationRequested)
: resolveAsync(errMsg, isCancellationRequested);
}
struct addrinfo* DNSLookup::resolveBlocking(std::string& errMsg,
const CancellationRequest& isCancellationRequested)
{
errMsg = "no error";
// Maybe a cancellation request got in before the background thread terminated ?
if (isCancellationRequested())
{
errMsg = "cancellation requested";
return nullptr;
}
return getAddrInfo(_hostname, _port, errMsg);
}
struct addrinfo* DNSLookup::resolveAsync(std::string& errMsg,
const CancellationRequest& isCancellationRequested)
{
errMsg = "no error";
// Can only be called once, otherwise we would have to manage a pool
// of background thread which is overkill for our usage.
if (_done)
{
return nullptr; // programming error, create a second DNSLookup instance
// if you need a second lookup.
}
//
// Good resource on thread forced termination
// https://www.bo-yang.net/2017/11/19/cpp-kill-detached-thread
//
_thread = std::thread(&DNSLookup::run, this);
_thread.detach();
int64_t timeout = kDefaultTimeout;
std::unique_lock<std::mutex> lock(_mutex);
while (!_done)
{
// Wait for 10 milliseconds on the condition variable, to see
// if the bg thread has terminated.
if (_condition.wait_for(lock, std::chrono::milliseconds(_wait)) == std::cv_status::no_timeout)
{
// Background thread has terminated, so we can break of this loop
break;
}
// Were we cancelled ?
if (isCancellationRequested())
{
errMsg = "cancellation requested";
return nullptr;
}
// Have we exceeded the timeout ?
timeout -= _wait;
if (timeout <= 0)
{
errMsg = "dns lookup timed out after 60 seconds";
return nullptr;
}
}
// Maybe a cancellation request got in before the bg terminated ?
if (isCancellationRequested())
{
errMsg = "cancellation requested";
return nullptr;
}
return _res;
}
void DNSLookup::run()
{
_res = getAddrInfo(_hostname, _port, _errMsg);
_condition.notify_one();
_done = true;
}
}

61
ixwebsocket/IXDNSLookup.h Normal file
View File

@ -0,0 +1,61 @@
/*
* IXDNSLookup.h
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*
* Resolve a hostname+port to a struct addrinfo obtained with getaddrinfo
* Does this in a background thread so that it can be cancelled, since
* getaddrinfo is a blocking call, and we don't want to block the main thread on Mobile.
*/
#pragma once
#include "IXCancellationRequest.h"
#include <string>
#include <thread>
#include <atomic>
#include <condition_variable>
struct addrinfo;
namespace ix
{
class DNSLookup {
public:
DNSLookup(const std::string& hostname,
int port,
int wait = DNSLookup::kDefaultWait);
~DNSLookup();
struct addrinfo* resolve(std::string& errMsg,
const CancellationRequest& isCancellationRequested,
bool blocking = false);
private:
struct addrinfo* resolveAsync(std::string& errMsg,
const CancellationRequest& isCancellationRequested);
struct addrinfo* resolveBlocking(std::string& errMsg,
const CancellationRequest& isCancellationRequested);
static struct addrinfo* getAddrInfo(const std::string& hostname,
int port,
std::string& errMsg);
void run(); // thread runner
std::string _hostname;
int _port;
int _wait;
std::string _errMsg;
struct addrinfo* _res;
std::atomic<bool> _done;
std::thread _thread;
std::condition_variable _condition;
std::mutex _mutex;
const static int64_t kDefaultTimeout;
const static int64_t kDefaultWait;
};
}

View File

@ -5,6 +5,7 @@
*/
#include "IXSocket.h"
#include "IXSocketConnect.h"
#ifdef _WIN32
# include <basetsd.h>
@ -79,7 +80,7 @@ namespace ix
bool Socket::connect(const std::string& host,
int port,
std::string& errMsg,
CancellationRequest isCancellationRequested)
const CancellationRequest& isCancellationRequested)
{
std::lock_guard<std::mutex> lock(_socketMutex);

View File

@ -12,7 +12,7 @@
#include <atomic>
#include "IXEventFd.h"
#include "IXSocketConnect.h"
#include "IXCancellationRequest.h"
struct addrinfo;
@ -34,7 +34,7 @@ namespace ix
virtual bool connect(const std::string& url,
int port,
std::string& errMsg,
CancellationRequest isCancellationRequested);
const CancellationRequest& isCancellationRequested);
virtual void close();
virtual int send(char* buffer, size_t length);

View File

@ -6,6 +6,7 @@
* Adapted from Satori SDK Apple SSL code.
*/
#include "IXSocketAppleSSL.h"
#include "IXSocketConnect.h"
#include <fcntl.h>
#include <netdb.h>
@ -157,7 +158,7 @@ namespace ix
bool SocketAppleSSL::connect(const std::string& host,
int port,
std::string& errMsg,
CancellationRequest isCancellationRequested)
const CancellationRequest& isCancellationRequested)
{
OSStatus status;
{

View File

@ -7,7 +7,7 @@
#pragma once
#include "IXSocket.h"
#include "IXSocketConnect.h"
#include "IXCancellationRequest.h"
#include <Security/Security.h>
#include <Security/SecureTransport.h>
@ -25,7 +25,7 @@ namespace ix
virtual bool connect(const std::string& host,
int port,
std::string& errMsg,
CancellationRequest isCancellationRequested) final;
const CancellationRequest& isCancellationRequested) final;
virtual void close() final;
virtual int send(char* buffer, size_t length) final;

View File

@ -5,6 +5,7 @@
*/
#include "IXSocketConnect.h"
#include "IXDNSLookup.h"
#ifdef _WIN32
# include <basetsd.h>
@ -47,10 +48,15 @@ namespace
namespace ix
{
//
// This function can be cancelled every 50 ms
// This is important so that we don't block the main UI thread when shutting down a connection which is
// already trying to reconnect, and can be blocked waiting for ::connect to respond.
//
bool SocketConnect::connectToAddress(const struct addrinfo *address,
int& sockfd,
std::string& errMsg,
CancellationRequest isCancellationRequested)
const CancellationRequest& isCancellationRequested)
{
sockfd = -1;
@ -64,20 +70,17 @@ namespace ix
}
// Set the socket to non blocking mode, so that slow responses cannot
// block us for too long while we are trying to shut-down.
// block us for too long
SocketConnect::configure(fd);
if (::connect(fd, address->ai_addr, address->ai_addrlen) == -1
&& errno != EINPROGRESS)
{
closeSocket(fd);
sockfd = -1;
errMsg = strerror(errno);
return false;
}
// std::cout << "I WAS HERE A" << std::endl;
//
// If during a connection attempt the request remains idle for longer
// than the timeout interval, the request is considered to have timed
@ -95,7 +98,6 @@ namespace ix
if (isCancellationRequested())
{
closeSocket(fd);
sockfd = -1;
errMsg = "Cancelled";
return false;
}
@ -115,7 +117,7 @@ namespace ix
if (!FD_ISSET(fd, &wfds)) continue;
// Something was written to the socket
int optval;
int optval = -1;
socklen_t optlen = sizeof(optval);
// getsockopt() puts the errno value for connect into optval so 0
@ -124,7 +126,6 @@ namespace ix
optval != 0)
{
closeSocket(fd);
sockfd = -1;
errMsg = strerror(optval);
return false;
}
@ -137,33 +138,22 @@ namespace ix
}
closeSocket(fd);
sockfd = -1;
errMsg = strerror(errno);
errMsg = "connect timed out after 60 seconds";
return false;
}
int SocketConnect::connect(const std::string& hostname,
int port,
std::string& errMsg,
CancellationRequest isCancellationRequested)
const CancellationRequest& isCancellationRequested)
{
//
// First do DNS resolution
//
struct addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_flags = AI_ADDRCONFIG | AI_NUMERICSERV;
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
std::string sport = std::to_string(port);
struct addrinfo *res = nullptr;
int getaddrinfo_result = getaddrinfo(hostname.c_str(), sport.c_str(),
&hints, &res);
if (getaddrinfo_result)
DNSLookup dnsLookup(hostname, port);
struct addrinfo *res = dnsLookup.resolve(errMsg, isCancellationRequested);
if (res == nullptr)
{
errMsg = gai_strerror(getaddrinfo_result);
return -1;
}
@ -183,15 +173,18 @@ namespace ix
break;
}
}
freeaddrinfo(res);
return sockfd;
}
void SocketConnect::configure(int sockfd)
{
// 1. disable Nagle's algorithm
int flag = 1;
setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, sizeof(flag)); // Disable Nagle's algorithm
setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, sizeof(flag));
// 2. make socket non blocking
#ifdef _WIN32
unsigned long nonblocking = 1;
ioctlsocket(_sockfd, FIONBIO, &nonblocking);
@ -199,6 +192,7 @@ namespace ix
fcntl(sockfd, F_SETFL, O_NONBLOCK); // make socket non blocking
#endif
// 3. (apple) prevent SIGPIPE from being emitted when the remote end disconnect
#ifdef SO_NOSIGPIPE
int value = 1;
setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE,

View File

@ -6,26 +6,26 @@
#pragma once
#include "IXCancellationRequest.h"
#include <string>
#include <functional>
struct addrinfo;
namespace ix
{
using CancellationRequest = std::function<bool()>;
class SocketConnect {
public:
static int connect(const std::string& hostname,
int port,
std::string& errMsg,
CancellationRequest isCancellationRequested);
const CancellationRequest& isCancellationRequested);
private:
static bool connectToAddress(const struct addrinfo *address,
int& sockfd,
std::string& errMsg,
CancellationRequest isCancellationRequested);
const CancellationRequest& isCancellationRequested);
static void configure(int sockfd);
};

View File

@ -7,6 +7,8 @@
*/
#include "IXSocketOpenSSL.h"
#include "IXSocketConnect.h"
#include <cassert>
#include <iostream>
@ -275,7 +277,7 @@ namespace ix
bool SocketOpenSSL::connect(const std::string& host,
int port,
std::string& errMsg,
CancellationRequest isCancellationRequested)
const CancellationRequest& isCancellationRequested)
{
bool handshakeSuccessful = false;
{
@ -415,10 +417,8 @@ namespace ix
if (reason == SSL_ERROR_WANT_READ || reason == SSL_ERROR_WANT_WRITE)
{
errno = EWOULDBLOCK;
return -1;
} else {
return -1;
}
return -1;
}
}

View File

@ -7,7 +7,7 @@
#pragma once
#include "IXSocket.h"
#include "IXSocketConnect.h"
#include "IXCancellationRequest.h"
#include <openssl/bio.h>
#include <openssl/hmac.h>
@ -28,7 +28,7 @@ namespace ix
virtual bool connect(const std::string& host,
int port,
std::string& errMsg,
CancellationRequest isCancellationRequested) final;
const CancellationRequest& isCancellationRequested) final;
virtual void close() final;
virtual int send(char* buffer, size_t length) final;

View File

@ -40,7 +40,10 @@ namespace ix
{
WebSocketTransport::WebSocketTransport() :
_readyState(CLOSED),
_enablePerMessageDeflate(false)
_enablePerMessageDeflate(false),
_closeCode(0),
_closeWireSize(0),
_requestInitCancellation(false)
{
}
@ -90,6 +93,12 @@ namespace ix
{
port = 443;
}
else
{
// Invalid protocol. Should be caught by regex check
// but this missing branch trigger cpplint linter.
return false;
}
}
else
{
@ -164,6 +173,8 @@ namespace ix
std::string protocol, host, path, query;
int port;
_requestInitCancellation = false;
if (!WebSocketTransport::parseUrl(_url, protocol, host,
path, query, port))
{
@ -192,9 +203,9 @@ namespace ix
std::string errMsg;
bool success = _socket->connect(host, port, errMsg,
[this]
[this]() -> bool
{
return _readyState == CLOSING;
return _requestInitCancellation;
}
);
if (!success)
@ -231,9 +242,7 @@ namespace ix
ss << "\r\n";
std::string request = ss.str();
int requestSize = (int) request.size();
if (_socket->send(const_cast<char*>(request.c_str()), requestSize) != requestSize)
if (!writeBytes(ss.str()))
{
return WebSocketInitResult(false, 0, std::string("Failed sending GET request to ") + _url);
}
@ -369,6 +378,7 @@ namespace ix
_onCloseCallback(_closeCode, _closeReason, _closeWireSize);
_closeCode = 0;
_closeReason = std::string();
_closeWireSize = 0;
}
_readyState = readyStateValue;
@ -792,6 +802,8 @@ namespace ix
void WebSocketTransport::close()
{
_requestInitCancellation = true;
if (_readyState == CLOSING || _readyState == CLOSED) return;
// See list of close events here:
@ -810,10 +822,9 @@ namespace ix
_socket->close();
}
// Used by init
bool WebSocketTransport::readByte(void* buffer)
{
while (true)
while (true)
{
if (_readyState == CLOSING) return false;
@ -826,13 +837,43 @@ namespace ix
return true;
}
// There is possibly something to be read, try again
else if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
else if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
_socket->getErrno() == EAGAIN))
{
continue;
}
// There was an error during the read, abort
else
else
{
return false;
}
}
}
bool WebSocketTransport::writeBytes(const std::string& str)
{
while (true)
{
if (_readyState == CLOSING) return false;
char* buffer = const_cast<char*>(str.c_str());
int len = (int) str.size();
int ret = _socket->send(buffer, len);
// We wrote some bytes, as needed, all good.
if (ret > 0)
{
return ret == len;
}
// There is possibly something to be write, try again
else if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
_socket->getErrno() == EAGAIN))
{
continue;
}
// There was an error during the write, abort
else
{
return false;
}

View File

@ -136,6 +136,9 @@ namespace ix
WebSocketPerMessageDeflateOptions _perMessageDeflateOptions;
std::atomic<bool> _enablePerMessageDeflate;
// Used to cancel dns lookup + socket connect + http upgrade
std::atomic<bool> _requestInitCancellation;
void sendOnSocket();
WebSocketSendInfo sendData(wsheader_type::opcode_type type,
const std::string& message,
@ -158,6 +161,8 @@ namespace ix
void unmaskReceiveBuffer(const wsheader_type& ws);
std::string genRandomString(const int len);
// Non blocking versions of read/write, used during http upgrade
bool readByte(void* buffer);
bool writeBytes(const std::string& str);
};
}

View File

@ -3,6 +3,7 @@
#
all: run
.PHONY: docker
docker:
docker build -t ws_connect:latest .