Fix 2 race conditions detected with TSan, one in CobraMetricsPublisher::push and another one in WebSocketTransport::sendData (that one was bad).

This commit is contained in:
Benjamin Sergeant 2019-09-24 11:46:54 -07:00
parent 490fbf4cb5
commit 94c589f696
8 changed files with 45 additions and 3 deletions

View File

@ -1 +1 @@
6.2.5
6.2.6

View File

@ -1,6 +1,10 @@
# Changelog
All notable changes to this project will be documented in this file.
## [6.2.5] - 2019-09-24
- Fix 2 race conditions detected with TSan, one in CobraMetricsPublisher::push and another one in WebSocketTransport::sendData (that one was bad).
## [6.2.5] - 2019-09-23
- Add simple Redis Server which is only capable of doing publish / subscribe. New ws redis_server sub-command to use it. The server is used in the unittest, so that we can run on CI in environment where redis isn not available like github actions env.

View File

@ -462,6 +462,8 @@ namespace ix
const Json::Value& msg,
bool addToQueue)
{
std::lock_guard<std::mutex> lock(_prePublishMutex);
invokePublishTrackerCallback(true, false);
CobraConnection::MsgId msgId = _id;

View File

@ -181,6 +181,7 @@ namespace ix
Json::Value _pdu;
Json::FastWriter _jsonWriter;
mutable std::mutex _jsonWriterMutex;
std::mutex _prePublishMutex;
/// Traffic tracker callback
static TrafficTrackerCallback _trafficTrackerCallback;

View File

@ -867,7 +867,10 @@ namespace ix
message_end = compressedMessage.end();
}
_txbuf.reserve(wireSize);
{
std::lock_guard<std::mutex> lock(_socketMutex);
_txbuf.reserve(wireSize);
}
// Common case for most message. No fragmentation required.
if (wireSize < kChunkSize)

View File

@ -6,4 +6,4 @@
#pragma once
#define IX_WEBSOCKET_VERSION "6.2.5"
#define IX_WEBSOCKET_VERSION "6.2.6"

View File

@ -20,6 +20,9 @@ uninstall:
tag:
git tag v"`cat DOCKER_VERSION`"
xcode:
cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 -DUSE_TEST=1 -GXcode && open ixwebsocket.xcodeproj
.PHONY: docker
NAME := bsergean/ws

View File

@ -139,6 +139,21 @@ namespace
gUniqueMessageIdsCount = gIds.size();
}
// publish 100 messages, during roughly 100ms
// this is used to test thread safety of CobraMetricsPublisher::push
void runAdditionalPublisher(ix::CobraMetricsPublisher* cobraMetricsPublisher)
{
Json::Value data;
data["foo"] = "bar";
for (int i = 0; i < 100; ++i)
{
cobraMetricsPublisher->push("sms_metric_F_id", data);
ix::msleep(1);
}
}
} // namespace
TEST_CASE("Cobra_Metrics_Publisher", "[cobra]")
@ -252,6 +267,19 @@ TEST_CASE("Cobra_Metrics_Publisher", "[cobra]")
ix::msleep(500);
// Test multi-threaded publish
std::thread bgPublisher1(&runAdditionalPublisher, &cobraMetricsPublisher);
std::thread bgPublisher2(&runAdditionalPublisher, &cobraMetricsPublisher);
std::thread bgPublisher3(&runAdditionalPublisher, &cobraMetricsPublisher);
std::thread bgPublisher4(&runAdditionalPublisher, &cobraMetricsPublisher);
std::thread bgPublisher5(&runAdditionalPublisher, &cobraMetricsPublisher);
bgPublisher1.join();
bgPublisher2.join();
bgPublisher3.join();
bgPublisher4.join();
bgPublisher5.join();
// Now stop the thread
gStop = true;
bgThread.join();
@ -264,6 +292,7 @@ TEST_CASE("Cobra_Metrics_Publisher", "[cobra]")
CHECK(gIds.count("sms_metric_C_id") == 1);
CHECK(gIds.count("sms_metric_D_id") == 1);
CHECK(gIds.count("sms_metric_E_id") == 1);
CHECK(gIds.count("sms_metric_F_id") == 1);
CHECK(gIds.count("sms_set_rate_control_id") == 1);
CHECK(gIds.count("sms_set_blacklist_id") == 1);