Compare commits

..

1 Commits

Author SHA1 Message Date
27dabaaf86 tsan openssl mac ci 2020-03-26 16:38:41 -07:00
13 changed files with 52 additions and 165 deletions

View File

@ -5,21 +5,14 @@ on:
- 'docs/**' - 'docs/**'
jobs: jobs:
linux: # linux:
runs-on: ubuntu-latest # runs-on: ubuntu-latest
steps: # steps:
- uses: actions/checkout@v1 # - uses: actions/checkout@v1
- name: make test # - name: make test
run: make test # run: make test
mac_tsan_sectransport: mac_openssl_tsan:
runs-on: macOS-latest
steps:
- uses: actions/checkout@v1
- name: make test_tsan
run: make test_tsan
mac_tsan_openssl:
runs-on: macOS-latest runs-on: macOS-latest
steps: steps:
- uses: actions/checkout@v1 - uses: actions/checkout@v1
@ -28,26 +21,24 @@ jobs:
- name: make test - name: make test
run: make test_tsan_openssl run: make test_tsan_openssl
mac_tsan_mbedtls: # tsan:
runs-on: macOS-latest # runs-on: macOS-latest
steps: # steps:
- uses: actions/checkout@v1 # - uses: actions/checkout@v1
- name: install mbedtls # - name: make test_tsan
run: brew install mbedtls # run: make test_tsan
- name: make test
run: make test_tsan_mbedtls
win: # win:
runs-on: windows-latest # runs-on: windows-latest
steps: # steps:
- uses: actions/checkout@v1 # - uses: actions/checkout@v1
- uses: seanmiddleditch/gha-setup-vsdevenv@master # - uses: seanmiddleditch/gha-setup-vsdevenv@master
- run: | # - run: |
mkdir build # mkdir build
cd build # cd build
cmake -DCMAKE_CXX_COMPILER=cl.exe -DUSE_WS=1 -DUSE_TEST=1 .. # cmake -DCMAKE_CXX_COMPILER=cl.exe -DUSE_WS=1 -DUSE_TEST=1 ..
- run: cmake --build build # - run: cmake --build build
# Running the unittest does not work # # Running the unittest does not work
#- run: ../build/test/ixwebsocket_unittest.exe # #- run: ../build/test/ixwebsocket_unittest.exe
# working-directory: test # # working-directory: test

View File

@ -200,9 +200,7 @@ if (USE_TLS AND USE_MBED_TLS)
endif() endif()
endif() endif()
if (NOT ZLIB_FOUND) find_package(ZLIB)
find_package(ZLIB)
endif()
if (ZLIB_FOUND) if (ZLIB_FOUND)
include_directories(${ZLIB_INCLUDE_DIRS}) include_directories(${ZLIB_INCLUDE_DIRS})
target_link_libraries(ixwebsocket ${ZLIB_LIBRARIES}) target_link_libraries(ixwebsocket ${ZLIB_LIBRARIES})

View File

@ -1,18 +1,6 @@
# Changelog # Changelog
All changes to this project will be documented in this file. All changes to this project will be documented in this file.
## [9.1.2] - 2020-03-26
(mac ssl) rename DarwinSSL -> SecureTransport (see this too -> https://github.com/curl/curl/issues/3733)
## [9.1.1] - 2020-03-26
(websocket) fix data race accessing _socket object without mutex protection when calling wakeUpFromPoll in WebSocketTransport.cpp
## [9.1.0] - 2020-03-26
(ixcobra) add explicit event types for handshake, authentication and subscription failure, and handle those by exiting in ws_cobra_subcribe and friends
## [9.0.3] - 2020-03-24 ## [9.0.3] - 2020-03-24
(ws connect) display statistics about how much time it takes to stop the connection (ws connect) display statistics about how much time it takes to stop the connection

View File

@ -37,7 +37,6 @@ namespace ix
std::atomic<bool> errorSending(false); std::atomic<bool> errorSending(false);
std::atomic<bool> stop(false); std::atomic<bool> stop(false);
std::atomic<bool> throttled(false); std::atomic<bool> throttled(false);
std::atomic<bool> fatalCobraError(false);
QueueManager queueManager(maxQueueSize); QueueManager queueManager(maxQueueSize);
@ -180,7 +179,6 @@ namespace ix
verbose, verbose,
&throttled, &throttled,
&receivedCount, &receivedCount,
&fatalCobraError,
&queueManager](ix::CobraConnectionEventType eventType, &queueManager](ix::CobraConnectionEventType eventType,
const std::string& errMsg, const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers, const ix::WebSocketHttpHeaders& headers,
@ -242,21 +240,6 @@ namespace ix
{ {
spdlog::info("Received websocket pong"); spdlog::info("Received websocket pong");
} }
else if (eventType == ix::CobraConnection_EventType_Handshake_Error)
{
spdlog::error("Subscriber: Handshake error: {}", errMsg);
fatalCobraError = true;
}
else if (eventType == ix::CobraConnection_EventType_Authentication_Error)
{
spdlog::error("Subscriber: Authentication error: {}", errMsg);
fatalCobraError = true;
}
else if (eventType == ix::CobraConnection_EventType_Subscription_Error)
{
spdlog::error("Subscriber: Subscription error: {}", errMsg);
fatalCobraError = true;
}
}); });
// Run forever // Run forever
@ -268,7 +251,6 @@ namespace ix
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
if (strict && errorSending) break; if (strict && errorSending) break;
if (fatalCobraError) break;
} }
} }
// Run for a duration, used by unittesting now // Run for a duration, used by unittesting now
@ -280,7 +262,6 @@ namespace ix
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
if (strict && errorSending) break; if (strict && errorSending) break;
if (fatalCobraError) break;
} }
} }
@ -291,15 +272,12 @@ namespace ix
conn.disconnect(); conn.disconnect();
stop = true; stop = true;
// progress thread
t1.join(); t1.join();
// heartbeat thread
if (t2.joinable()) t2.join(); if (t2.joinable()) t2.join();
spdlog::info("heartbeat thread done");
// sentry sender thread
t3.join(); t3.join();
return ((strict && errorSending) || fatalCobraError) ? -1 : (int) sentCount; return (strict && errorSending) ? -1 : (int) sentCount;
} }
} // namespace ix } // namespace ix

View File

@ -77,7 +77,6 @@ namespace ix
std::atomic<uint64_t> sentCount(0); std::atomic<uint64_t> sentCount(0);
std::atomic<uint64_t> receivedCount(0); std::atomic<uint64_t> receivedCount(0);
std::atomic<bool> stop(false); std::atomic<bool> stop(false);
std::atomic<bool> fatalCobraError(false);
QueueManager queueManager(maxQueueSize); QueueManager queueManager(maxQueueSize);
@ -89,18 +88,16 @@ namespace ix
auto duration = std::chrono::seconds(1); auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
} }
spdlog::info("timer thread done");
}; };
std::thread t1(timer); std::thread t1(timer);
auto heartbeat = [&sentCount, &receivedCount, &stop, &enableHeartbeat] { auto heartbeat = [&sentCount, &receivedCount, &enableHeartbeat] {
std::string state("na"); std::string state("na");
if (!enableHeartbeat) return; if (!enableHeartbeat) return;
while (!stop) while (true)
{ {
std::stringstream ss; std::stringstream ss;
ss << "messages received " << receivedCount; ss << "messages received " << receivedCount;
@ -118,8 +115,6 @@ namespace ix
auto duration = std::chrono::minutes(1); auto duration = std::chrono::minutes(1);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
} }
spdlog::info("heartbeat thread done");
}; };
std::thread t2(heartbeat); std::thread t2(heartbeat);
@ -147,7 +142,7 @@ namespace ix
std::thread t3(statsdSender); std::thread t3(statsdSender);
conn.setEventCallback( conn.setEventCallback(
[&conn, &channel, &filter, &position, &jsonWriter, verbose, &queueManager, &receivedCount, &fatalCobraError]( [&conn, &channel, &filter, &position, &jsonWriter, verbose, &queueManager, &receivedCount](
ix::CobraConnectionEventType eventType, ix::CobraConnectionEventType eventType,
const std::string& errMsg, const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers, const ix::WebSocketHttpHeaders& headers,
@ -205,21 +200,6 @@ namespace ix
{ {
spdlog::info("Received websocket pong"); spdlog::info("Received websocket pong");
} }
else if (eventType == ix::CobraConnection_EventType_Handshake_Error)
{
spdlog::error("Subscriber: Handshake error: {}", errMsg);
fatalCobraError = true;
}
else if (eventType == ix::CobraConnection_EventType_Authentication_Error)
{
spdlog::error("Subscriber: Authentication error: {}", errMsg);
fatalCobraError = true;
}
else if (eventType == ix::CobraConnection_EventType_Subscription_Error)
{
spdlog::error("Subscriber: Subscription error: {}", errMsg);
fatalCobraError = true;
}
}); });
// Run forever // Run forever
@ -229,8 +209,6 @@ namespace ix
{ {
auto duration = std::chrono::seconds(1); auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
if (fatalCobraError) break;
} }
} }
// Run for a duration, used by unittesting now // Run for a duration, used by unittesting now
@ -240,8 +218,6 @@ namespace ix
{ {
auto duration = std::chrono::seconds(1); auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
if (fatalCobraError) break;
} }
} }
@ -252,15 +228,12 @@ namespace ix
conn.disconnect(); conn.disconnect();
stop = true; stop = true;
// progress thread
t1.join(); t1.join();
// heartbeat thread
if (t2.joinable()) t2.join(); if (t2.joinable()) t2.join();
spdlog::info("heartbeat thread done");
// statsd sender thread
t3.join(); t3.join();
return fatalCobraError ? -1 : (int) sentCount; return (int) sentCount;
} }
} // namespace ix } // namespace ix

View File

@ -166,8 +166,7 @@ namespace ix
} }
else if (action == "auth/handshake/error") else if (action == "auth/handshake/error")
{ {
invokeEventCallback(ix::CobraConnection_EventType_Handshake_Error, invokeErrorCallback("Handshake error", msg->str);
msg->str);
} }
else if (action == "auth/authenticate/ok") else if (action == "auth/authenticate/ok")
{ {
@ -177,8 +176,7 @@ namespace ix
} }
else if (action == "auth/authenticate/error") else if (action == "auth/authenticate/error")
{ {
invokeEventCallback(ix::CobraConnection_EventType_Authentication_Error, invokeErrorCallback("Authentication error", msg->str);
msg->str);
} }
else if (action == "rtm/subscription/data") else if (action == "rtm/subscription/data")
{ {
@ -193,8 +191,7 @@ namespace ix
} }
else if (action == "rtm/subscribe/error") else if (action == "rtm/subscribe/error")
{ {
invokeEventCallback(ix::CobraConnection_EventType_Subscription_Error, invokeErrorCallback("Subscription error", msg->str);
msg->str);
} }
else if (action == "rtm/unsubscribe/ok") else if (action == "rtm/unsubscribe/ok")
{ {

View File

@ -37,10 +37,7 @@ namespace ix
CobraConnection_EventType_Subscribed = 4, CobraConnection_EventType_Subscribed = 4,
CobraConnection_EventType_UnSubscribed = 5, CobraConnection_EventType_UnSubscribed = 5,
CobraConnection_EventType_Published = 6, CobraConnection_EventType_Published = 6,
CobraConnection_EventType_Pong = 7, CobraConnection_EventType_Pong = 7
CobraConnection_EventType_Handshake_Error = 8,
CobraConnection_EventType_Authentication_Error = 9,
CobraConnection_EventType_Subscription_Error = 10
}; };
enum CobraConnectionPublishMode enum CobraConnectionPublishMode

View File

@ -71,7 +71,7 @@ namespace ix
#elif defined(IXWEBSOCKET_USE_OPEN_SSL) #elif defined(IXWEBSOCKET_USE_OPEN_SSL)
ss << " ssl/OpenSSL " << OPENSSL_VERSION_TEXT; ss << " ssl/OpenSSL " << OPENSSL_VERSION_TEXT;
#elif __APPLE__ #elif __APPLE__
ss << " ssl/SecureTransport"; ss << " ssl/DarwinSSL";
#endif #endif
#else #else
ss << " nossl"; ss << " nossl";

View File

@ -625,7 +625,7 @@ namespace ix
// send back the CLOSE frame // send back the CLOSE frame
sendCloseFrame(code, reason); sendCloseFrame(code, reason);
wakeUpFromPoll(Socket::kCloseRequest); _socket->wakeUpFromPoll(Socket::kCloseRequest);
bool remote = true; bool remote = true;
closeSocketAndSwitchToClosedState(code, reason, _rxbuf.size(), remote); closeSocketAndSwitchToClosedState(code, reason, _rxbuf.size(), remote);
@ -845,7 +845,7 @@ namespace ix
// Request to flush the send buffer on the background thread if it isn't empty // Request to flush the send buffer on the background thread if it isn't empty
if (!isSendBufferEmpty()) if (!isSendBufferEmpty())
{ {
wakeUpFromPoll(Socket::kSendRequest); _socket->wakeUpFromPoll(Socket::kSendRequest);
// FIXME: we should have a timeout when sending large messages: see #131 // FIXME: we should have a timeout when sending large messages: see #131
if (_blockingSend && !flushSendBuffer()) if (_blockingSend && !flushSendBuffer())
@ -1063,12 +1063,6 @@ namespace ix
_socket->close(); _socket->close();
} }
bool WebSocketTransport::wakeUpFromPoll(uint64_t wakeUpCode)
{
std::lock_guard<std::mutex> lock(_socketMutex);
return _socket->wakeUpFromPoll(wakeUpCode);
}
void WebSocketTransport::closeSocketAndSwitchToClosedState(uint16_t code, void WebSocketTransport::closeSocketAndSwitchToClosedState(uint16_t code,
const std::string& reason, const std::string& reason,
size_t closeWireSize, size_t closeWireSize,
@ -1116,9 +1110,8 @@ namespace ix
setReadyState(ReadyState::CLOSING); setReadyState(ReadyState::CLOSING);
sendCloseFrame(code, reason); sendCloseFrame(code, reason);
// wake up the poll, but do not close yet // wake up the poll, but do not close yet
wakeUpFromPoll(Socket::kSendRequest); _socket->wakeUpFromPoll(Socket::kSendRequest);
} }
size_t WebSocketTransport::bufferedAmount() const size_t WebSocketTransport::bufferedAmount() const

View File

@ -229,8 +229,6 @@ namespace ix
size_t closeWireSize, size_t closeWireSize,
bool remote); bool remote);
bool wakeUpFromPoll(uint64_t wakeUpCode);
bool flushSendBuffer(); bool flushSendBuffer();
bool sendOnSocket(); bool sendOnSocket();
bool receiveFromSocket(); bool receiveFromSocket();

View File

@ -6,4 +6,4 @@
#pragma once #pragma once
#define IX_WEBSOCKET_VERSION "9.1.2" #define IX_WEBSOCKET_VERSION "9.0.3"

View File

@ -110,11 +110,6 @@ test_tsan_openssl:
(cd build/test ; ln -sf Debug/ixwebsocket_unittest) (cd build/test ; ln -sf Debug/ixwebsocket_unittest)
(cd test ; python2.7 run.py -r) (cd test ; python2.7 run.py -r)
test_tsan_mbedtls:
mkdir -p build && (cd build && cmake -GXcode -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_TEST=1 -DUSE_MBED_TLS=1 .. && xcodebuild -project ixwebsocket.xcodeproj -target ixwebsocket_unittest -enableThreadSanitizer YES)
(cd build/test ; ln -sf Debug/ixwebsocket_unittest)
(cd test ; python2.7 run.py -r)
test_openssl: test_openssl:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_OPEN_SSL=1 -DUSE_TEST=1 .. ; make -j 4) mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_OPEN_SSL=1 -DUSE_TEST=1 .. ; make -j 4)
(cd test ; python2.7 run.py -r) (cd test ; python2.7 run.py -r)

View File

@ -55,10 +55,8 @@ namespace ix
std::atomic<int> msgPerSeconds(0); std::atomic<int> msgPerSeconds(0);
std::atomic<int> msgCount(0); std::atomic<int> msgCount(0);
std::atomic<bool> fatalCobraError(false); auto timer = [&msgPerSeconds, &msgCount] {
while (true)
auto timer = [&msgPerSeconds, &msgCount, &fatalCobraError] {
while (!fatalCobraError)
{ {
spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds); spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds);
@ -80,12 +78,11 @@ namespace ix
&msgCount, &msgCount,
&msgPerSeconds, &msgPerSeconds,
&quiet, &quiet,
&fluentd, &fluentd](ix::CobraConnectionEventType eventType,
&fatalCobraError](ix::CobraConnectionEventType eventType, const std::string& errMsg,
const std::string& errMsg, const ix::WebSocketHttpHeaders& headers,
const ix::WebSocketHttpHeaders& headers, const std::string& subscriptionId,
const std::string& subscriptionId, CobraConnection::MsgId msgId) {
CobraConnection::MsgId msgId) {
if (eventType == ix::CobraConnection_EventType_Open) if (eventType == ix::CobraConnection_EventType_Open)
{ {
spdlog::info("Subscriber connected"); spdlog::info("Subscriber connected");
@ -140,32 +137,14 @@ namespace ix
{ {
spdlog::info("Received websocket pong"); spdlog::info("Received websocket pong");
} }
else if (eventType == ix::CobraConnection_EventType_Handshake_Error)
{
spdlog::error("Subscriber: Handshake error: {}", errMsg);
fatalCobraError = true;
}
else if (eventType == ix::CobraConnection_EventType_Authentication_Error)
{
spdlog::error("Subscriber: Authentication error: {}", errMsg);
fatalCobraError = true;
}
else if (eventType == ix::CobraConnection_EventType_Subscription_Error)
{
spdlog::error("Subscriber: Subscription error: {}", errMsg);
fatalCobraError = true;
}
}); });
while (!fatalCobraError) while (true)
{ {
auto duration = std::chrono::seconds(1); auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
} }
conn.disconnect(); return 0;
t.join();
return fatalCobraError ? 1 : 0;
} }
} // namespace ix } // namespace ix