Fix data races in DNSLookup (tsan)
This commit is contained in:
parent
26ee46b246
commit
a443bbdf80
@ -17,6 +17,7 @@ namespace ix
|
|||||||
std::atomic<uint64_t> DNSLookup::_nextId(0);
|
std::atomic<uint64_t> DNSLookup::_nextId(0);
|
||||||
std::set<uint64_t> DNSLookup::_activeJobs;
|
std::set<uint64_t> DNSLookup::_activeJobs;
|
||||||
std::mutex DNSLookup::_activeJobsMutex;
|
std::mutex DNSLookup::_activeJobsMutex;
|
||||||
|
std::mutex DNSLookup::_resMutex;
|
||||||
|
|
||||||
DNSLookup::DNSLookup(const std::string& hostname, int port, int64_t wait) :
|
DNSLookup::DNSLookup(const std::string& hostname, int port, int64_t wait) :
|
||||||
_hostname(hostname),
|
_hostname(hostname),
|
||||||
@ -36,7 +37,8 @@ namespace ix
|
|||||||
_activeJobs.erase(_id);
|
_activeJobs.erase(_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
struct addrinfo* DNSLookup::getAddrInfo(const std::string& hostname,
|
// we want hostname to be copied, not passed as a const reference
|
||||||
|
struct addrinfo* DNSLookup::getAddrInfo(std::string hostname,
|
||||||
int port,
|
int port,
|
||||||
std::string& errMsg)
|
std::string& errMsg)
|
||||||
{
|
{
|
||||||
@ -135,6 +137,7 @@ namespace ix
|
|||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::unique_lock<std::mutex> rlock(_resMutex);
|
||||||
return _res;
|
return _res;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -156,7 +159,10 @@ namespace ix
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Copy result into the member variables
|
// Copy result into the member variables
|
||||||
_res = res;
|
{
|
||||||
|
std::unique_lock<std::mutex> rlock(_resMutex);
|
||||||
|
_res = res;
|
||||||
|
}
|
||||||
_errMsg = errMsg;
|
_errMsg = errMsg;
|
||||||
_condition.notify_one();
|
_condition.notify_one();
|
||||||
_done = true;
|
_done = true;
|
||||||
|
@ -39,7 +39,7 @@ namespace ix
|
|||||||
struct addrinfo* resolveBlocking(std::string& errMsg,
|
struct addrinfo* resolveBlocking(std::string& errMsg,
|
||||||
const CancellationRequest& isCancellationRequested);
|
const CancellationRequest& isCancellationRequested);
|
||||||
|
|
||||||
static struct addrinfo* getAddrInfo(const std::string& hostname,
|
static struct addrinfo* getAddrInfo(std::string hostname,
|
||||||
int port,
|
int port,
|
||||||
std::string& errMsg);
|
std::string& errMsg);
|
||||||
|
|
||||||
@ -50,6 +50,7 @@ namespace ix
|
|||||||
int64_t _wait;
|
int64_t _wait;
|
||||||
std::string _errMsg;
|
std::string _errMsg;
|
||||||
struct addrinfo* _res;
|
struct addrinfo* _res;
|
||||||
|
static std::mutex _resMutex;
|
||||||
|
|
||||||
std::atomic<bool> _done;
|
std::atomic<bool> _done;
|
||||||
std::thread _thread;
|
std::thread _thread;
|
||||||
|
@ -144,8 +144,7 @@ namespace ix
|
|||||||
{
|
{
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
closeTerminatedThreads();
|
if (closeTerminatedThreads()) break;
|
||||||
if (_connectionsThreads.empty()) break;
|
|
||||||
|
|
||||||
// wait 10ms and try again later.
|
// wait 10ms and try again later.
|
||||||
// we could have a timeout, but if we exit of here
|
// we could have a timeout, but if we exit of here
|
||||||
@ -176,8 +175,9 @@ namespace ix
|
|||||||
// field becomes true, and we can use that to know that we can join that thread
|
// field becomes true, and we can use that to know that we can join that thread
|
||||||
// and remove it from our _connectionsThreads data structure (a list).
|
// and remove it from our _connectionsThreads data structure (a list).
|
||||||
//
|
//
|
||||||
void SocketServer::closeTerminatedThreads()
|
bool SocketServer::closeTerminatedThreads()
|
||||||
{
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(_connectionsThreadsMutex);
|
||||||
auto it = _connectionsThreads.begin();
|
auto it = _connectionsThreads.begin();
|
||||||
auto itEnd = _connectionsThreads.end();
|
auto itEnd = _connectionsThreads.end();
|
||||||
|
|
||||||
@ -195,6 +195,8 @@ namespace ix
|
|||||||
if (thread.joinable()) thread.join();
|
if (thread.joinable()) thread.join();
|
||||||
it = _connectionsThreads.erase(it);
|
it = _connectionsThreads.erase(it);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return _connectionsThreads.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
void SocketServer::run()
|
void SocketServer::run()
|
||||||
@ -278,6 +280,7 @@ namespace ix
|
|||||||
if (_stop) return;
|
if (_stop) return;
|
||||||
|
|
||||||
// Launch the handleConnection work asynchronously in its own thread.
|
// Launch the handleConnection work asynchronously in its own thread.
|
||||||
|
std::lock_guard<std::mutex> lock(_conditionVariableMutex);
|
||||||
_connectionsThreads.push_back(std::make_pair(
|
_connectionsThreads.push_back(std::make_pair(
|
||||||
connectionState,
|
connectionState,
|
||||||
std::thread(&SocketServer::handleConnection,
|
std::thread(&SocketServer::handleConnection,
|
||||||
|
@ -77,6 +77,7 @@ namespace ix
|
|||||||
|
|
||||||
// the list of (connectionState, threads) for each connections
|
// the list of (connectionState, threads) for each connections
|
||||||
ConnectionThreads _connectionsThreads;
|
ConnectionThreads _connectionsThreads;
|
||||||
|
std::mutex _connectionsThreadsMutex;
|
||||||
|
|
||||||
// used to have the main control thread for a server
|
// used to have the main control thread for a server
|
||||||
// wait for a 'terminate' notification without busy polling
|
// wait for a 'terminate' notification without busy polling
|
||||||
@ -92,6 +93,7 @@ namespace ix
|
|||||||
std::shared_ptr<ConnectionState> connectionState) = 0;
|
std::shared_ptr<ConnectionState> connectionState) = 0;
|
||||||
virtual size_t getConnectedClientsCount() = 0;
|
virtual size_t getConnectedClientsCount() = 0;
|
||||||
|
|
||||||
void closeTerminatedThreads();
|
// Returns true if all connection threads are joined
|
||||||
|
bool closeTerminatedThreads();
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user