cancellation refactoring
This commit is contained in:
		@@ -17,6 +17,7 @@ set( IXWEBSOCKET_SOURCES
 | 
				
			|||||||
    ixwebsocket/IXSocket.cpp
 | 
					    ixwebsocket/IXSocket.cpp
 | 
				
			||||||
    ixwebsocket/IXSocketConnect.cpp
 | 
					    ixwebsocket/IXSocketConnect.cpp
 | 
				
			||||||
    ixwebsocket/IXDNSLookup.cpp
 | 
					    ixwebsocket/IXDNSLookup.cpp
 | 
				
			||||||
 | 
					    ixwebsocket/IXCancellationRequest.cpp
 | 
				
			||||||
    ixwebsocket/IXWebSocket.cpp
 | 
					    ixwebsocket/IXWebSocket.cpp
 | 
				
			||||||
    ixwebsocket/IXWebSocketServer.cpp
 | 
					    ixwebsocket/IXWebSocketServer.cpp
 | 
				
			||||||
    ixwebsocket/IXWebSocketTransport.cpp
 | 
					    ixwebsocket/IXWebSocketTransport.cpp
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										33
									
								
								ixwebsocket/IXCancellationRequest.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										33
									
								
								ixwebsocket/IXCancellationRequest.cpp
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,33 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					 *  IXCancellationRequest.cpp
 | 
				
			||||||
 | 
					 *  Author: Benjamin Sergeant
 | 
				
			||||||
 | 
					 *  Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#include "IXCancellationRequest.h"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#include <chrono>
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					namespace ix 
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					    CancellationRequest makeCancellationRequestWithTimeout(int secs,
 | 
				
			||||||
 | 
					                                                           std::atomic<bool>& requestInitCancellation)
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        auto start = std::chrono::system_clock::now();
 | 
				
			||||||
 | 
					        auto timeout = std::chrono::seconds(secs);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        auto isCancellationRequested = [&requestInitCancellation, start, timeout]() -> bool
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
 | 
					            // Was an explicit cancellation requested ?
 | 
				
			||||||
 | 
					            if (requestInitCancellation) return true;
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					            auto now = std::chrono::system_clock::now();
 | 
				
			||||||
 | 
					            if ((now - start) > timeout) return true;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            // No cancellation request
 | 
				
			||||||
 | 
					            return false;
 | 
				
			||||||
 | 
					        };
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        return isCancellationRequested;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -7,9 +7,13 @@
 | 
				
			|||||||
#pragma once
 | 
					#pragma once
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#include <functional>
 | 
					#include <functional>
 | 
				
			||||||
 | 
					#include <atomic>
 | 
				
			||||||
 | 
					
 | 
				
			||||||
namespace ix 
 | 
					namespace ix 
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
    using CancellationRequest = std::function<bool()>;
 | 
					    using CancellationRequest = std::function<bool()>;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    CancellationRequest makeCancellationRequestWithTimeout(int seconds,
 | 
				
			||||||
 | 
					                                                           std::atomic<bool>& requestInitCancellation);
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -14,9 +14,7 @@
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
namespace ix 
 | 
					namespace ix 
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
    // 60s timeout, see IXSocketConnect.cpp
 | 
					    const int64_t DNSLookup::kDefaultWait = 10; // ms
 | 
				
			||||||
    const int64_t DNSLookup::kDefaultTimeout = 60 * 1000; // ms
 | 
					 | 
				
			||||||
    const int64_t DNSLookup::kDefaultWait = 10;           // ms
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    std::atomic<uint64_t> DNSLookup::_nextId(0);
 | 
					    std::atomic<uint64_t> DNSLookup::_nextId(0);
 | 
				
			||||||
    std::set<uint64_t> DNSLookup::_activeJobs;
 | 
					    std::set<uint64_t> DNSLookup::_activeJobs;
 | 
				
			||||||
@@ -112,7 +110,6 @@ namespace ix
 | 
				
			|||||||
        _thread = std::thread(&DNSLookup::run, this);
 | 
					        _thread = std::thread(&DNSLookup::run, this);
 | 
				
			||||||
        _thread.detach();
 | 
					        _thread.detach();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        int64_t timeout = kDefaultTimeout;
 | 
					 | 
				
			||||||
        std::unique_lock<std::mutex> lock(_conditionVariableMutex);
 | 
					        std::unique_lock<std::mutex> lock(_conditionVariableMutex);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        while (!_done)
 | 
					        while (!_done)
 | 
				
			||||||
@@ -131,14 +128,6 @@ namespace ix
 | 
				
			|||||||
                errMsg = "cancellation requested";
 | 
					                errMsg = "cancellation requested";
 | 
				
			||||||
                return nullptr;
 | 
					                return nullptr;
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
 | 
					 | 
				
			||||||
            // Have we exceeded the timeout ?
 | 
					 | 
				
			||||||
            timeout -= _wait;
 | 
					 | 
				
			||||||
            if (timeout <= 0)
 | 
					 | 
				
			||||||
            {
 | 
					 | 
				
			||||||
                errMsg = "dns lookup timed out after 60 seconds";
 | 
					 | 
				
			||||||
                return nullptr;
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        // Maybe a cancellation request got in before the bg terminated ?
 | 
					        // Maybe a cancellation request got in before the bg terminated ?
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -61,7 +61,6 @@ namespace ix
 | 
				
			|||||||
        static std::set<uint64_t> _activeJobs;
 | 
					        static std::set<uint64_t> _activeJobs;
 | 
				
			||||||
        static std::mutex _activeJobsMutex;
 | 
					        static std::mutex _activeJobsMutex;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        const static int64_t kDefaultTimeout;
 | 
					 | 
				
			||||||
        const static int64_t kDefaultWait;
 | 
					        const static int64_t kDefaultWait;
 | 
				
			||||||
    };
 | 
					    };
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -80,21 +80,9 @@ namespace ix
 | 
				
			|||||||
            return -1;
 | 
					            return -1;
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        //
 | 
					        for (;;)
 | 
				
			||||||
        // If during a connection attempt the request remains idle for longer
 | 
					 | 
				
			||||||
        // than the timeout interval, the request is considered to have timed
 | 
					 | 
				
			||||||
        // out. The default timeout interval is 60 seconds.
 | 
					 | 
				
			||||||
        //
 | 
					 | 
				
			||||||
        // See https://developer.apple.com/documentation/foundation/nsmutableurlrequest/1414063-timeoutinterval?language=objc
 | 
					 | 
				
			||||||
        //
 | 
					 | 
				
			||||||
        // 60 seconds timeout, each time we wait for 50ms with select -> 1200 attempts
 | 
					 | 
				
			||||||
        //
 | 
					 | 
				
			||||||
        int selectTimeOut = 50 * 1000; // In micro-seconds => 50ms
 | 
					 | 
				
			||||||
        int maxRetries = 60 * 1000 * 1000 / selectTimeOut;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        for (int i = 0; i < maxRetries; ++i)
 | 
					 | 
				
			||||||
        {
 | 
					        {
 | 
				
			||||||
            if (isCancellationRequested())
 | 
					            if (isCancellationRequested()) // Must handle timeout as well
 | 
				
			||||||
            {
 | 
					            {
 | 
				
			||||||
                closeSocket(fd);
 | 
					                closeSocket(fd);
 | 
				
			||||||
                errMsg = "Cancelled";
 | 
					                errMsg = "Cancelled";
 | 
				
			||||||
@@ -105,10 +93,10 @@ namespace ix
 | 
				
			|||||||
            FD_ZERO(&wfds);
 | 
					            FD_ZERO(&wfds);
 | 
				
			||||||
            FD_SET(fd, &wfds);
 | 
					            FD_SET(fd, &wfds);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            // 50ms timeout
 | 
					            // 50ms select timeout
 | 
				
			||||||
            struct timeval timeout;
 | 
					            struct timeval timeout;
 | 
				
			||||||
            timeout.tv_sec = 0;
 | 
					            timeout.tv_sec = 0;
 | 
				
			||||||
            timeout.tv_usec = selectTimeOut;
 | 
					            timeout.tv_usec = 50 * 1000;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            select(fd + 1, nullptr, &wfds, nullptr, &timeout);
 | 
					            select(fd + 1, nullptr, &wfds, nullptr, &timeout);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -175,6 +163,7 @@ namespace ix
 | 
				
			|||||||
        return sockfd;
 | 
					        return sockfd;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    // FIXME: configure is a terrible name
 | 
				
			||||||
    void SocketConnect::configure(int sockfd)
 | 
					    void SocketConnect::configure(int sockfd)
 | 
				
			||||||
    {
 | 
					    {
 | 
				
			||||||
        // 1. disable Nagle's algorithm
 | 
					        // 1. disable Nagle's algorithm
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -203,18 +203,10 @@ namespace ix
 | 
				
			|||||||
        ss << reason;
 | 
					        ss << reason;
 | 
				
			||||||
        ss << "\r\n";
 | 
					        ss << "\r\n";
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        // FIXME refactoring
 | 
					        // Socket write can only be cancelled through a timeout here, not manually.
 | 
				
			||||||
        auto start = std::chrono::system_clock::now();
 | 
					        static std::atomic<bool> requestInitCancellation(false);
 | 
				
			||||||
        auto timeout = std::chrono::seconds(1);
 | 
					        auto isCancellationRequested =
 | 
				
			||||||
 | 
					            makeCancellationRequestWithTimeout(1, requestInitCancellation);
 | 
				
			||||||
        auto isCancellationRequested = [start, timeout]() -> bool
 | 
					 | 
				
			||||||
        {
 | 
					 | 
				
			||||||
            auto now = std::chrono::system_clock::now();
 | 
					 | 
				
			||||||
            if ((now - start) > timeout) return true;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            // No cancellation request
 | 
					 | 
				
			||||||
            return false;
 | 
					 | 
				
			||||||
        };
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if (!_socket->writeBytes(ss.str(), isCancellationRequested))
 | 
					        if (!_socket->writeBytes(ss.str(), isCancellationRequested))
 | 
				
			||||||
        {
 | 
					        {
 | 
				
			||||||
@@ -231,21 +223,8 @@ namespace ix
 | 
				
			|||||||
    {
 | 
					    {
 | 
				
			||||||
        _requestInitCancellation = false;
 | 
					        _requestInitCancellation = false;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        // FIXME: timeout should be configurable
 | 
					        auto isCancellationRequested = 
 | 
				
			||||||
        auto start = std::chrono::system_clock::now();
 | 
					            makeCancellationRequestWithTimeout(60, _requestInitCancellation);
 | 
				
			||||||
        auto timeout = std::chrono::seconds(10);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        auto isCancellationRequested = [this, start, timeout]() -> bool
 | 
					 | 
				
			||||||
        {
 | 
					 | 
				
			||||||
            // Was an explicit cancellation requested ?
 | 
					 | 
				
			||||||
            if (_requestInitCancellation) return true;
 | 
					 | 
				
			||||||
            
 | 
					 | 
				
			||||||
            auto now = std::chrono::system_clock::now();
 | 
					 | 
				
			||||||
            if ((now - start) > timeout) return true;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            // No cancellation request
 | 
					 | 
				
			||||||
            return false;
 | 
					 | 
				
			||||||
        };
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        std::string errMsg;
 | 
					        std::string errMsg;
 | 
				
			||||||
        bool success = _socket->connect(host, port, errMsg, isCancellationRequested);
 | 
					        bool success = _socket->connect(host, port, errMsg, isCancellationRequested);
 | 
				
			||||||
@@ -369,20 +348,8 @@ namespace ix
 | 
				
			|||||||
        SocketConnect::configure(fd);
 | 
					        SocketConnect::configure(fd);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        // FIXME: timeout should be configurable
 | 
					        // FIXME: timeout should be configurable
 | 
				
			||||||
        auto start = std::chrono::system_clock::now();
 | 
					        auto isCancellationRequested = 
 | 
				
			||||||
        auto timeout = std::chrono::seconds(3);
 | 
					            makeCancellationRequestWithTimeout(3, _requestInitCancellation);
 | 
				
			||||||
 | 
					 | 
				
			||||||
        auto isCancellationRequested = [this, start, timeout]() -> bool
 | 
					 | 
				
			||||||
        {
 | 
					 | 
				
			||||||
            // Was an explicit cancellation requested ?
 | 
					 | 
				
			||||||
            if (_requestInitCancellation) return true;
 | 
					 | 
				
			||||||
            
 | 
					 | 
				
			||||||
            auto now = std::chrono::system_clock::now();
 | 
					 | 
				
			||||||
            if ((now - start) > timeout) return true;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            // No cancellation request
 | 
					 | 
				
			||||||
            return false;
 | 
					 | 
				
			||||||
        };
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        std::string remote = std::string("remote fd ") + std::to_string(fd);
 | 
					        std::string remote = std::string("remote fd ") + std::to_string(fd);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user