cancellation refactoring
This commit is contained in:
parent
df6a17dcc2
commit
ed3a50d9b5
@ -17,6 +17,7 @@ set( IXWEBSOCKET_SOURCES
|
|||||||
ixwebsocket/IXSocket.cpp
|
ixwebsocket/IXSocket.cpp
|
||||||
ixwebsocket/IXSocketConnect.cpp
|
ixwebsocket/IXSocketConnect.cpp
|
||||||
ixwebsocket/IXDNSLookup.cpp
|
ixwebsocket/IXDNSLookup.cpp
|
||||||
|
ixwebsocket/IXCancellationRequest.cpp
|
||||||
ixwebsocket/IXWebSocket.cpp
|
ixwebsocket/IXWebSocket.cpp
|
||||||
ixwebsocket/IXWebSocketServer.cpp
|
ixwebsocket/IXWebSocketServer.cpp
|
||||||
ixwebsocket/IXWebSocketTransport.cpp
|
ixwebsocket/IXWebSocketTransport.cpp
|
||||||
|
33
ixwebsocket/IXCancellationRequest.cpp
Normal file
33
ixwebsocket/IXCancellationRequest.cpp
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
/*
|
||||||
|
* IXCancellationRequest.cpp
|
||||||
|
* Author: Benjamin Sergeant
|
||||||
|
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "IXCancellationRequest.h"
|
||||||
|
|
||||||
|
#include <chrono>
|
||||||
|
|
||||||
|
namespace ix
|
||||||
|
{
|
||||||
|
CancellationRequest makeCancellationRequestWithTimeout(int secs,
|
||||||
|
std::atomic<bool>& requestInitCancellation)
|
||||||
|
{
|
||||||
|
auto start = std::chrono::system_clock::now();
|
||||||
|
auto timeout = std::chrono::seconds(secs);
|
||||||
|
|
||||||
|
auto isCancellationRequested = [&requestInitCancellation, start, timeout]() -> bool
|
||||||
|
{
|
||||||
|
// Was an explicit cancellation requested ?
|
||||||
|
if (requestInitCancellation) return true;
|
||||||
|
|
||||||
|
auto now = std::chrono::system_clock::now();
|
||||||
|
if ((now - start) > timeout) return true;
|
||||||
|
|
||||||
|
// No cancellation request
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
|
||||||
|
return isCancellationRequested;
|
||||||
|
}
|
||||||
|
}
|
@ -7,9 +7,13 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <functional>
|
#include <functional>
|
||||||
|
#include <atomic>
|
||||||
|
|
||||||
namespace ix
|
namespace ix
|
||||||
{
|
{
|
||||||
using CancellationRequest = std::function<bool()>;
|
using CancellationRequest = std::function<bool()>;
|
||||||
|
|
||||||
|
CancellationRequest makeCancellationRequestWithTimeout(int seconds,
|
||||||
|
std::atomic<bool>& requestInitCancellation);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -14,9 +14,7 @@
|
|||||||
|
|
||||||
namespace ix
|
namespace ix
|
||||||
{
|
{
|
||||||
// 60s timeout, see IXSocketConnect.cpp
|
const int64_t DNSLookup::kDefaultWait = 10; // ms
|
||||||
const int64_t DNSLookup::kDefaultTimeout = 60 * 1000; // ms
|
|
||||||
const int64_t DNSLookup::kDefaultWait = 10; // ms
|
|
||||||
|
|
||||||
std::atomic<uint64_t> DNSLookup::_nextId(0);
|
std::atomic<uint64_t> DNSLookup::_nextId(0);
|
||||||
std::set<uint64_t> DNSLookup::_activeJobs;
|
std::set<uint64_t> DNSLookup::_activeJobs;
|
||||||
@ -112,7 +110,6 @@ namespace ix
|
|||||||
_thread = std::thread(&DNSLookup::run, this);
|
_thread = std::thread(&DNSLookup::run, this);
|
||||||
_thread.detach();
|
_thread.detach();
|
||||||
|
|
||||||
int64_t timeout = kDefaultTimeout;
|
|
||||||
std::unique_lock<std::mutex> lock(_conditionVariableMutex);
|
std::unique_lock<std::mutex> lock(_conditionVariableMutex);
|
||||||
|
|
||||||
while (!_done)
|
while (!_done)
|
||||||
@ -131,14 +128,6 @@ namespace ix
|
|||||||
errMsg = "cancellation requested";
|
errMsg = "cancellation requested";
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Have we exceeded the timeout ?
|
|
||||||
timeout -= _wait;
|
|
||||||
if (timeout <= 0)
|
|
||||||
{
|
|
||||||
errMsg = "dns lookup timed out after 60 seconds";
|
|
||||||
return nullptr;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Maybe a cancellation request got in before the bg terminated ?
|
// Maybe a cancellation request got in before the bg terminated ?
|
||||||
|
@ -61,7 +61,6 @@ namespace ix
|
|||||||
static std::set<uint64_t> _activeJobs;
|
static std::set<uint64_t> _activeJobs;
|
||||||
static std::mutex _activeJobsMutex;
|
static std::mutex _activeJobsMutex;
|
||||||
|
|
||||||
const static int64_t kDefaultTimeout;
|
|
||||||
const static int64_t kDefaultWait;
|
const static int64_t kDefaultWait;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -80,21 +80,9 @@ namespace ix
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
for (;;)
|
||||||
// If during a connection attempt the request remains idle for longer
|
|
||||||
// than the timeout interval, the request is considered to have timed
|
|
||||||
// out. The default timeout interval is 60 seconds.
|
|
||||||
//
|
|
||||||
// See https://developer.apple.com/documentation/foundation/nsmutableurlrequest/1414063-timeoutinterval?language=objc
|
|
||||||
//
|
|
||||||
// 60 seconds timeout, each time we wait for 50ms with select -> 1200 attempts
|
|
||||||
//
|
|
||||||
int selectTimeOut = 50 * 1000; // In micro-seconds => 50ms
|
|
||||||
int maxRetries = 60 * 1000 * 1000 / selectTimeOut;
|
|
||||||
|
|
||||||
for (int i = 0; i < maxRetries; ++i)
|
|
||||||
{
|
{
|
||||||
if (isCancellationRequested())
|
if (isCancellationRequested()) // Must handle timeout as well
|
||||||
{
|
{
|
||||||
closeSocket(fd);
|
closeSocket(fd);
|
||||||
errMsg = "Cancelled";
|
errMsg = "Cancelled";
|
||||||
@ -105,10 +93,10 @@ namespace ix
|
|||||||
FD_ZERO(&wfds);
|
FD_ZERO(&wfds);
|
||||||
FD_SET(fd, &wfds);
|
FD_SET(fd, &wfds);
|
||||||
|
|
||||||
// 50ms timeout
|
// 50ms select timeout
|
||||||
struct timeval timeout;
|
struct timeval timeout;
|
||||||
timeout.tv_sec = 0;
|
timeout.tv_sec = 0;
|
||||||
timeout.tv_usec = selectTimeOut;
|
timeout.tv_usec = 50 * 1000;
|
||||||
|
|
||||||
select(fd + 1, nullptr, &wfds, nullptr, &timeout);
|
select(fd + 1, nullptr, &wfds, nullptr, &timeout);
|
||||||
|
|
||||||
@ -175,6 +163,7 @@ namespace ix
|
|||||||
return sockfd;
|
return sockfd;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FIXME: configure is a terrible name
|
||||||
void SocketConnect::configure(int sockfd)
|
void SocketConnect::configure(int sockfd)
|
||||||
{
|
{
|
||||||
// 1. disable Nagle's algorithm
|
// 1. disable Nagle's algorithm
|
||||||
|
@ -203,18 +203,10 @@ namespace ix
|
|||||||
ss << reason;
|
ss << reason;
|
||||||
ss << "\r\n";
|
ss << "\r\n";
|
||||||
|
|
||||||
// FIXME refactoring
|
// Socket write can only be cancelled through a timeout here, not manually.
|
||||||
auto start = std::chrono::system_clock::now();
|
static std::atomic<bool> requestInitCancellation(false);
|
||||||
auto timeout = std::chrono::seconds(1);
|
auto isCancellationRequested =
|
||||||
|
makeCancellationRequestWithTimeout(1, requestInitCancellation);
|
||||||
auto isCancellationRequested = [start, timeout]() -> bool
|
|
||||||
{
|
|
||||||
auto now = std::chrono::system_clock::now();
|
|
||||||
if ((now - start) > timeout) return true;
|
|
||||||
|
|
||||||
// No cancellation request
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
|
|
||||||
if (!_socket->writeBytes(ss.str(), isCancellationRequested))
|
if (!_socket->writeBytes(ss.str(), isCancellationRequested))
|
||||||
{
|
{
|
||||||
@ -231,21 +223,8 @@ namespace ix
|
|||||||
{
|
{
|
||||||
_requestInitCancellation = false;
|
_requestInitCancellation = false;
|
||||||
|
|
||||||
// FIXME: timeout should be configurable
|
auto isCancellationRequested =
|
||||||
auto start = std::chrono::system_clock::now();
|
makeCancellationRequestWithTimeout(60, _requestInitCancellation);
|
||||||
auto timeout = std::chrono::seconds(10);
|
|
||||||
|
|
||||||
auto isCancellationRequested = [this, start, timeout]() -> bool
|
|
||||||
{
|
|
||||||
// Was an explicit cancellation requested ?
|
|
||||||
if (_requestInitCancellation) return true;
|
|
||||||
|
|
||||||
auto now = std::chrono::system_clock::now();
|
|
||||||
if ((now - start) > timeout) return true;
|
|
||||||
|
|
||||||
// No cancellation request
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
|
|
||||||
std::string errMsg;
|
std::string errMsg;
|
||||||
bool success = _socket->connect(host, port, errMsg, isCancellationRequested);
|
bool success = _socket->connect(host, port, errMsg, isCancellationRequested);
|
||||||
@ -369,20 +348,8 @@ namespace ix
|
|||||||
SocketConnect::configure(fd);
|
SocketConnect::configure(fd);
|
||||||
|
|
||||||
// FIXME: timeout should be configurable
|
// FIXME: timeout should be configurable
|
||||||
auto start = std::chrono::system_clock::now();
|
auto isCancellationRequested =
|
||||||
auto timeout = std::chrono::seconds(3);
|
makeCancellationRequestWithTimeout(3, _requestInitCancellation);
|
||||||
|
|
||||||
auto isCancellationRequested = [this, start, timeout]() -> bool
|
|
||||||
{
|
|
||||||
// Was an explicit cancellation requested ?
|
|
||||||
if (_requestInitCancellation) return true;
|
|
||||||
|
|
||||||
auto now = std::chrono::system_clock::now();
|
|
||||||
if ((now - start) > timeout) return true;
|
|
||||||
|
|
||||||
// No cancellation request
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
|
|
||||||
std::string remote = std::string("remote fd ") + std::to_string(fd);
|
std::string remote = std::string("remote fd ") + std::to_string(fd);
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user