Compare commits
6 Commits
feature/ts
...
v9.1.2
Author | SHA1 | Date | |
---|---|---|---|
771ebb2a4c | |||
0fffb1e894 | |||
18164c0c38 | |||
d2db7310ff | |||
09e4584fc8 | |||
da36856d85 |
61
.github/workflows/ccpp.yml
vendored
61
.github/workflows/ccpp.yml
vendored
@ -5,14 +5,21 @@ on:
|
|||||||
- 'docs/**'
|
- 'docs/**'
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
# linux:
|
linux:
|
||||||
# runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
# steps:
|
steps:
|
||||||
# - uses: actions/checkout@v1
|
- uses: actions/checkout@v1
|
||||||
# - name: make test
|
- name: make test
|
||||||
# run: make test
|
run: make test
|
||||||
|
|
||||||
mac_openssl_tsan:
|
mac_tsan_sectransport:
|
||||||
|
runs-on: macOS-latest
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v1
|
||||||
|
- name: make test_tsan
|
||||||
|
run: make test_tsan
|
||||||
|
|
||||||
|
mac_tsan_openssl:
|
||||||
runs-on: macOS-latest
|
runs-on: macOS-latest
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v1
|
- uses: actions/checkout@v1
|
||||||
@ -21,24 +28,26 @@ jobs:
|
|||||||
- name: make test
|
- name: make test
|
||||||
run: make test_tsan_openssl
|
run: make test_tsan_openssl
|
||||||
|
|
||||||
# tsan:
|
mac_tsan_mbedtls:
|
||||||
# runs-on: macOS-latest
|
runs-on: macOS-latest
|
||||||
# steps:
|
steps:
|
||||||
# - uses: actions/checkout@v1
|
- uses: actions/checkout@v1
|
||||||
# - name: make test_tsan
|
- name: install mbedtls
|
||||||
# run: make test_tsan
|
run: brew install mbedtls
|
||||||
|
- name: make test
|
||||||
|
run: make test_tsan_mbedtls
|
||||||
|
|
||||||
# win:
|
win:
|
||||||
# runs-on: windows-latest
|
runs-on: windows-latest
|
||||||
# steps:
|
steps:
|
||||||
# - uses: actions/checkout@v1
|
- uses: actions/checkout@v1
|
||||||
# - uses: seanmiddleditch/gha-setup-vsdevenv@master
|
- uses: seanmiddleditch/gha-setup-vsdevenv@master
|
||||||
# - run: |
|
- run: |
|
||||||
# mkdir build
|
mkdir build
|
||||||
# cd build
|
cd build
|
||||||
# cmake -DCMAKE_CXX_COMPILER=cl.exe -DUSE_WS=1 -DUSE_TEST=1 ..
|
cmake -DCMAKE_CXX_COMPILER=cl.exe -DUSE_WS=1 -DUSE_TEST=1 ..
|
||||||
# - run: cmake --build build
|
- run: cmake --build build
|
||||||
|
|
||||||
# # Running the unittest does not work
|
# Running the unittest does not work
|
||||||
# #- run: ../build/test/ixwebsocket_unittest.exe
|
#- run: ../build/test/ixwebsocket_unittest.exe
|
||||||
# # working-directory: test
|
# working-directory: test
|
||||||
|
@ -200,7 +200,9 @@ if (USE_TLS AND USE_MBED_TLS)
|
|||||||
endif()
|
endif()
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
find_package(ZLIB)
|
if (NOT ZLIB_FOUND)
|
||||||
|
find_package(ZLIB)
|
||||||
|
endif()
|
||||||
if (ZLIB_FOUND)
|
if (ZLIB_FOUND)
|
||||||
include_directories(${ZLIB_INCLUDE_DIRS})
|
include_directories(${ZLIB_INCLUDE_DIRS})
|
||||||
target_link_libraries(ixwebsocket ${ZLIB_LIBRARIES})
|
target_link_libraries(ixwebsocket ${ZLIB_LIBRARIES})
|
||||||
|
@ -1,6 +1,18 @@
|
|||||||
# Changelog
|
# Changelog
|
||||||
All changes to this project will be documented in this file.
|
All changes to this project will be documented in this file.
|
||||||
|
|
||||||
|
## [9.1.2] - 2020-03-26
|
||||||
|
|
||||||
|
(mac ssl) rename DarwinSSL -> SecureTransport (see this too -> https://github.com/curl/curl/issues/3733)
|
||||||
|
|
||||||
|
## [9.1.1] - 2020-03-26
|
||||||
|
|
||||||
|
(websocket) fix data race accessing _socket object without mutex protection when calling wakeUpFromPoll in WebSocketTransport.cpp
|
||||||
|
|
||||||
|
## [9.1.0] - 2020-03-26
|
||||||
|
|
||||||
|
(ixcobra) add explicit event types for handshake, authentication and subscription failure, and handle those by exiting in ws_cobra_subcribe and friends
|
||||||
|
|
||||||
## [9.0.3] - 2020-03-24
|
## [9.0.3] - 2020-03-24
|
||||||
|
|
||||||
(ws connect) display statistics about how much time it takes to stop the connection
|
(ws connect) display statistics about how much time it takes to stop the connection
|
||||||
|
@ -37,6 +37,7 @@ namespace ix
|
|||||||
std::atomic<bool> errorSending(false);
|
std::atomic<bool> errorSending(false);
|
||||||
std::atomic<bool> stop(false);
|
std::atomic<bool> stop(false);
|
||||||
std::atomic<bool> throttled(false);
|
std::atomic<bool> throttled(false);
|
||||||
|
std::atomic<bool> fatalCobraError(false);
|
||||||
|
|
||||||
QueueManager queueManager(maxQueueSize);
|
QueueManager queueManager(maxQueueSize);
|
||||||
|
|
||||||
@ -179,6 +180,7 @@ namespace ix
|
|||||||
verbose,
|
verbose,
|
||||||
&throttled,
|
&throttled,
|
||||||
&receivedCount,
|
&receivedCount,
|
||||||
|
&fatalCobraError,
|
||||||
&queueManager](ix::CobraConnectionEventType eventType,
|
&queueManager](ix::CobraConnectionEventType eventType,
|
||||||
const std::string& errMsg,
|
const std::string& errMsg,
|
||||||
const ix::WebSocketHttpHeaders& headers,
|
const ix::WebSocketHttpHeaders& headers,
|
||||||
@ -240,6 +242,21 @@ namespace ix
|
|||||||
{
|
{
|
||||||
spdlog::info("Received websocket pong");
|
spdlog::info("Received websocket pong");
|
||||||
}
|
}
|
||||||
|
else if (eventType == ix::CobraConnection_EventType_Handshake_Error)
|
||||||
|
{
|
||||||
|
spdlog::error("Subscriber: Handshake error: {}", errMsg);
|
||||||
|
fatalCobraError = true;
|
||||||
|
}
|
||||||
|
else if (eventType == ix::CobraConnection_EventType_Authentication_Error)
|
||||||
|
{
|
||||||
|
spdlog::error("Subscriber: Authentication error: {}", errMsg);
|
||||||
|
fatalCobraError = true;
|
||||||
|
}
|
||||||
|
else if (eventType == ix::CobraConnection_EventType_Subscription_Error)
|
||||||
|
{
|
||||||
|
spdlog::error("Subscriber: Subscription error: {}", errMsg);
|
||||||
|
fatalCobraError = true;
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// Run forever
|
// Run forever
|
||||||
@ -251,6 +268,7 @@ namespace ix
|
|||||||
std::this_thread::sleep_for(duration);
|
std::this_thread::sleep_for(duration);
|
||||||
|
|
||||||
if (strict && errorSending) break;
|
if (strict && errorSending) break;
|
||||||
|
if (fatalCobraError) break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Run for a duration, used by unittesting now
|
// Run for a duration, used by unittesting now
|
||||||
@ -262,6 +280,7 @@ namespace ix
|
|||||||
std::this_thread::sleep_for(duration);
|
std::this_thread::sleep_for(duration);
|
||||||
|
|
||||||
if (strict && errorSending) break;
|
if (strict && errorSending) break;
|
||||||
|
if (fatalCobraError) break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -272,12 +291,15 @@ namespace ix
|
|||||||
conn.disconnect();
|
conn.disconnect();
|
||||||
stop = true;
|
stop = true;
|
||||||
|
|
||||||
|
// progress thread
|
||||||
t1.join();
|
t1.join();
|
||||||
if (t2.joinable()) t2.join();
|
|
||||||
spdlog::info("heartbeat thread done");
|
|
||||||
|
|
||||||
|
// heartbeat thread
|
||||||
|
if (t2.joinable()) t2.join();
|
||||||
|
|
||||||
|
// sentry sender thread
|
||||||
t3.join();
|
t3.join();
|
||||||
|
|
||||||
return (strict && errorSending) ? -1 : (int) sentCount;
|
return ((strict && errorSending) || fatalCobraError) ? -1 : (int) sentCount;
|
||||||
}
|
}
|
||||||
} // namespace ix
|
} // namespace ix
|
||||||
|
@ -77,6 +77,7 @@ namespace ix
|
|||||||
std::atomic<uint64_t> sentCount(0);
|
std::atomic<uint64_t> sentCount(0);
|
||||||
std::atomic<uint64_t> receivedCount(0);
|
std::atomic<uint64_t> receivedCount(0);
|
||||||
std::atomic<bool> stop(false);
|
std::atomic<bool> stop(false);
|
||||||
|
std::atomic<bool> fatalCobraError(false);
|
||||||
|
|
||||||
QueueManager queueManager(maxQueueSize);
|
QueueManager queueManager(maxQueueSize);
|
||||||
|
|
||||||
@ -88,16 +89,18 @@ namespace ix
|
|||||||
auto duration = std::chrono::seconds(1);
|
auto duration = std::chrono::seconds(1);
|
||||||
std::this_thread::sleep_for(duration);
|
std::this_thread::sleep_for(duration);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
spdlog::info("timer thread done");
|
||||||
};
|
};
|
||||||
|
|
||||||
std::thread t1(timer);
|
std::thread t1(timer);
|
||||||
|
|
||||||
auto heartbeat = [&sentCount, &receivedCount, &enableHeartbeat] {
|
auto heartbeat = [&sentCount, &receivedCount, &stop, &enableHeartbeat] {
|
||||||
std::string state("na");
|
std::string state("na");
|
||||||
|
|
||||||
if (!enableHeartbeat) return;
|
if (!enableHeartbeat) return;
|
||||||
|
|
||||||
while (true)
|
while (!stop)
|
||||||
{
|
{
|
||||||
std::stringstream ss;
|
std::stringstream ss;
|
||||||
ss << "messages received " << receivedCount;
|
ss << "messages received " << receivedCount;
|
||||||
@ -115,6 +118,8 @@ namespace ix
|
|||||||
auto duration = std::chrono::minutes(1);
|
auto duration = std::chrono::minutes(1);
|
||||||
std::this_thread::sleep_for(duration);
|
std::this_thread::sleep_for(duration);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
spdlog::info("heartbeat thread done");
|
||||||
};
|
};
|
||||||
|
|
||||||
std::thread t2(heartbeat);
|
std::thread t2(heartbeat);
|
||||||
@ -142,7 +147,7 @@ namespace ix
|
|||||||
std::thread t3(statsdSender);
|
std::thread t3(statsdSender);
|
||||||
|
|
||||||
conn.setEventCallback(
|
conn.setEventCallback(
|
||||||
[&conn, &channel, &filter, &position, &jsonWriter, verbose, &queueManager, &receivedCount](
|
[&conn, &channel, &filter, &position, &jsonWriter, verbose, &queueManager, &receivedCount, &fatalCobraError](
|
||||||
ix::CobraConnectionEventType eventType,
|
ix::CobraConnectionEventType eventType,
|
||||||
const std::string& errMsg,
|
const std::string& errMsg,
|
||||||
const ix::WebSocketHttpHeaders& headers,
|
const ix::WebSocketHttpHeaders& headers,
|
||||||
@ -200,6 +205,21 @@ namespace ix
|
|||||||
{
|
{
|
||||||
spdlog::info("Received websocket pong");
|
spdlog::info("Received websocket pong");
|
||||||
}
|
}
|
||||||
|
else if (eventType == ix::CobraConnection_EventType_Handshake_Error)
|
||||||
|
{
|
||||||
|
spdlog::error("Subscriber: Handshake error: {}", errMsg);
|
||||||
|
fatalCobraError = true;
|
||||||
|
}
|
||||||
|
else if (eventType == ix::CobraConnection_EventType_Authentication_Error)
|
||||||
|
{
|
||||||
|
spdlog::error("Subscriber: Authentication error: {}", errMsg);
|
||||||
|
fatalCobraError = true;
|
||||||
|
}
|
||||||
|
else if (eventType == ix::CobraConnection_EventType_Subscription_Error)
|
||||||
|
{
|
||||||
|
spdlog::error("Subscriber: Subscription error: {}", errMsg);
|
||||||
|
fatalCobraError = true;
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// Run forever
|
// Run forever
|
||||||
@ -209,6 +229,8 @@ namespace ix
|
|||||||
{
|
{
|
||||||
auto duration = std::chrono::seconds(1);
|
auto duration = std::chrono::seconds(1);
|
||||||
std::this_thread::sleep_for(duration);
|
std::this_thread::sleep_for(duration);
|
||||||
|
|
||||||
|
if (fatalCobraError) break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Run for a duration, used by unittesting now
|
// Run for a duration, used by unittesting now
|
||||||
@ -218,6 +240,8 @@ namespace ix
|
|||||||
{
|
{
|
||||||
auto duration = std::chrono::seconds(1);
|
auto duration = std::chrono::seconds(1);
|
||||||
std::this_thread::sleep_for(duration);
|
std::this_thread::sleep_for(duration);
|
||||||
|
|
||||||
|
if (fatalCobraError) break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -228,12 +252,15 @@ namespace ix
|
|||||||
conn.disconnect();
|
conn.disconnect();
|
||||||
stop = true;
|
stop = true;
|
||||||
|
|
||||||
|
// progress thread
|
||||||
t1.join();
|
t1.join();
|
||||||
if (t2.joinable()) t2.join();
|
|
||||||
spdlog::info("heartbeat thread done");
|
|
||||||
|
|
||||||
|
// heartbeat thread
|
||||||
|
if (t2.joinable()) t2.join();
|
||||||
|
|
||||||
|
// statsd sender thread
|
||||||
t3.join();
|
t3.join();
|
||||||
|
|
||||||
return (int) sentCount;
|
return fatalCobraError ? -1 : (int) sentCount;
|
||||||
}
|
}
|
||||||
} // namespace ix
|
} // namespace ix
|
||||||
|
@ -166,7 +166,8 @@ namespace ix
|
|||||||
}
|
}
|
||||||
else if (action == "auth/handshake/error")
|
else if (action == "auth/handshake/error")
|
||||||
{
|
{
|
||||||
invokeErrorCallback("Handshake error", msg->str);
|
invokeEventCallback(ix::CobraConnection_EventType_Handshake_Error,
|
||||||
|
msg->str);
|
||||||
}
|
}
|
||||||
else if (action == "auth/authenticate/ok")
|
else if (action == "auth/authenticate/ok")
|
||||||
{
|
{
|
||||||
@ -176,7 +177,8 @@ namespace ix
|
|||||||
}
|
}
|
||||||
else if (action == "auth/authenticate/error")
|
else if (action == "auth/authenticate/error")
|
||||||
{
|
{
|
||||||
invokeErrorCallback("Authentication error", msg->str);
|
invokeEventCallback(ix::CobraConnection_EventType_Authentication_Error,
|
||||||
|
msg->str);
|
||||||
}
|
}
|
||||||
else if (action == "rtm/subscription/data")
|
else if (action == "rtm/subscription/data")
|
||||||
{
|
{
|
||||||
@ -191,7 +193,8 @@ namespace ix
|
|||||||
}
|
}
|
||||||
else if (action == "rtm/subscribe/error")
|
else if (action == "rtm/subscribe/error")
|
||||||
{
|
{
|
||||||
invokeErrorCallback("Subscription error", msg->str);
|
invokeEventCallback(ix::CobraConnection_EventType_Subscription_Error,
|
||||||
|
msg->str);
|
||||||
}
|
}
|
||||||
else if (action == "rtm/unsubscribe/ok")
|
else if (action == "rtm/unsubscribe/ok")
|
||||||
{
|
{
|
||||||
|
@ -37,7 +37,10 @@ namespace ix
|
|||||||
CobraConnection_EventType_Subscribed = 4,
|
CobraConnection_EventType_Subscribed = 4,
|
||||||
CobraConnection_EventType_UnSubscribed = 5,
|
CobraConnection_EventType_UnSubscribed = 5,
|
||||||
CobraConnection_EventType_Published = 6,
|
CobraConnection_EventType_Published = 6,
|
||||||
CobraConnection_EventType_Pong = 7
|
CobraConnection_EventType_Pong = 7,
|
||||||
|
CobraConnection_EventType_Handshake_Error = 8,
|
||||||
|
CobraConnection_EventType_Authentication_Error = 9,
|
||||||
|
CobraConnection_EventType_Subscription_Error = 10
|
||||||
};
|
};
|
||||||
|
|
||||||
enum CobraConnectionPublishMode
|
enum CobraConnectionPublishMode
|
||||||
|
@ -71,7 +71,7 @@ namespace ix
|
|||||||
#elif defined(IXWEBSOCKET_USE_OPEN_SSL)
|
#elif defined(IXWEBSOCKET_USE_OPEN_SSL)
|
||||||
ss << " ssl/OpenSSL " << OPENSSL_VERSION_TEXT;
|
ss << " ssl/OpenSSL " << OPENSSL_VERSION_TEXT;
|
||||||
#elif __APPLE__
|
#elif __APPLE__
|
||||||
ss << " ssl/DarwinSSL";
|
ss << " ssl/SecureTransport";
|
||||||
#endif
|
#endif
|
||||||
#else
|
#else
|
||||||
ss << " nossl";
|
ss << " nossl";
|
||||||
|
@ -625,7 +625,7 @@ namespace ix
|
|||||||
// send back the CLOSE frame
|
// send back the CLOSE frame
|
||||||
sendCloseFrame(code, reason);
|
sendCloseFrame(code, reason);
|
||||||
|
|
||||||
_socket->wakeUpFromPoll(Socket::kCloseRequest);
|
wakeUpFromPoll(Socket::kCloseRequest);
|
||||||
|
|
||||||
bool remote = true;
|
bool remote = true;
|
||||||
closeSocketAndSwitchToClosedState(code, reason, _rxbuf.size(), remote);
|
closeSocketAndSwitchToClosedState(code, reason, _rxbuf.size(), remote);
|
||||||
@ -845,7 +845,7 @@ namespace ix
|
|||||||
// Request to flush the send buffer on the background thread if it isn't empty
|
// Request to flush the send buffer on the background thread if it isn't empty
|
||||||
if (!isSendBufferEmpty())
|
if (!isSendBufferEmpty())
|
||||||
{
|
{
|
||||||
_socket->wakeUpFromPoll(Socket::kSendRequest);
|
wakeUpFromPoll(Socket::kSendRequest);
|
||||||
|
|
||||||
// FIXME: we should have a timeout when sending large messages: see #131
|
// FIXME: we should have a timeout when sending large messages: see #131
|
||||||
if (_blockingSend && !flushSendBuffer())
|
if (_blockingSend && !flushSendBuffer())
|
||||||
@ -1063,6 +1063,12 @@ namespace ix
|
|||||||
_socket->close();
|
_socket->close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool WebSocketTransport::wakeUpFromPoll(uint64_t wakeUpCode)
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(_socketMutex);
|
||||||
|
return _socket->wakeUpFromPoll(wakeUpCode);
|
||||||
|
}
|
||||||
|
|
||||||
void WebSocketTransport::closeSocketAndSwitchToClosedState(uint16_t code,
|
void WebSocketTransport::closeSocketAndSwitchToClosedState(uint16_t code,
|
||||||
const std::string& reason,
|
const std::string& reason,
|
||||||
size_t closeWireSize,
|
size_t closeWireSize,
|
||||||
@ -1110,8 +1116,9 @@ namespace ix
|
|||||||
setReadyState(ReadyState::CLOSING);
|
setReadyState(ReadyState::CLOSING);
|
||||||
|
|
||||||
sendCloseFrame(code, reason);
|
sendCloseFrame(code, reason);
|
||||||
|
|
||||||
// wake up the poll, but do not close yet
|
// wake up the poll, but do not close yet
|
||||||
_socket->wakeUpFromPoll(Socket::kSendRequest);
|
wakeUpFromPoll(Socket::kSendRequest);
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t WebSocketTransport::bufferedAmount() const
|
size_t WebSocketTransport::bufferedAmount() const
|
||||||
|
@ -229,6 +229,8 @@ namespace ix
|
|||||||
size_t closeWireSize,
|
size_t closeWireSize,
|
||||||
bool remote);
|
bool remote);
|
||||||
|
|
||||||
|
bool wakeUpFromPoll(uint64_t wakeUpCode);
|
||||||
|
|
||||||
bool flushSendBuffer();
|
bool flushSendBuffer();
|
||||||
bool sendOnSocket();
|
bool sendOnSocket();
|
||||||
bool receiveFromSocket();
|
bool receiveFromSocket();
|
||||||
|
@ -6,4 +6,4 @@
|
|||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#define IX_WEBSOCKET_VERSION "9.0.3"
|
#define IX_WEBSOCKET_VERSION "9.1.2"
|
||||||
|
5
makefile
5
makefile
@ -110,6 +110,11 @@ test_tsan_openssl:
|
|||||||
(cd build/test ; ln -sf Debug/ixwebsocket_unittest)
|
(cd build/test ; ln -sf Debug/ixwebsocket_unittest)
|
||||||
(cd test ; python2.7 run.py -r)
|
(cd test ; python2.7 run.py -r)
|
||||||
|
|
||||||
|
test_tsan_mbedtls:
|
||||||
|
mkdir -p build && (cd build && cmake -GXcode -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_TEST=1 -DUSE_MBED_TLS=1 .. && xcodebuild -project ixwebsocket.xcodeproj -target ixwebsocket_unittest -enableThreadSanitizer YES)
|
||||||
|
(cd build/test ; ln -sf Debug/ixwebsocket_unittest)
|
||||||
|
(cd test ; python2.7 run.py -r)
|
||||||
|
|
||||||
test_openssl:
|
test_openssl:
|
||||||
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_OPEN_SSL=1 -DUSE_TEST=1 .. ; make -j 4)
|
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_OPEN_SSL=1 -DUSE_TEST=1 .. ; make -j 4)
|
||||||
(cd test ; python2.7 run.py -r)
|
(cd test ; python2.7 run.py -r)
|
||||||
|
@ -55,8 +55,10 @@ namespace ix
|
|||||||
std::atomic<int> msgPerSeconds(0);
|
std::atomic<int> msgPerSeconds(0);
|
||||||
std::atomic<int> msgCount(0);
|
std::atomic<int> msgCount(0);
|
||||||
|
|
||||||
auto timer = [&msgPerSeconds, &msgCount] {
|
std::atomic<bool> fatalCobraError(false);
|
||||||
while (true)
|
|
||||||
|
auto timer = [&msgPerSeconds, &msgCount, &fatalCobraError] {
|
||||||
|
while (!fatalCobraError)
|
||||||
{
|
{
|
||||||
spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds);
|
spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds);
|
||||||
|
|
||||||
@ -78,11 +80,12 @@ namespace ix
|
|||||||
&msgCount,
|
&msgCount,
|
||||||
&msgPerSeconds,
|
&msgPerSeconds,
|
||||||
&quiet,
|
&quiet,
|
||||||
&fluentd](ix::CobraConnectionEventType eventType,
|
&fluentd,
|
||||||
const std::string& errMsg,
|
&fatalCobraError](ix::CobraConnectionEventType eventType,
|
||||||
const ix::WebSocketHttpHeaders& headers,
|
const std::string& errMsg,
|
||||||
const std::string& subscriptionId,
|
const ix::WebSocketHttpHeaders& headers,
|
||||||
CobraConnection::MsgId msgId) {
|
const std::string& subscriptionId,
|
||||||
|
CobraConnection::MsgId msgId) {
|
||||||
if (eventType == ix::CobraConnection_EventType_Open)
|
if (eventType == ix::CobraConnection_EventType_Open)
|
||||||
{
|
{
|
||||||
spdlog::info("Subscriber connected");
|
spdlog::info("Subscriber connected");
|
||||||
@ -137,14 +140,32 @@ namespace ix
|
|||||||
{
|
{
|
||||||
spdlog::info("Received websocket pong");
|
spdlog::info("Received websocket pong");
|
||||||
}
|
}
|
||||||
|
else if (eventType == ix::CobraConnection_EventType_Handshake_Error)
|
||||||
|
{
|
||||||
|
spdlog::error("Subscriber: Handshake error: {}", errMsg);
|
||||||
|
fatalCobraError = true;
|
||||||
|
}
|
||||||
|
else if (eventType == ix::CobraConnection_EventType_Authentication_Error)
|
||||||
|
{
|
||||||
|
spdlog::error("Subscriber: Authentication error: {}", errMsg);
|
||||||
|
fatalCobraError = true;
|
||||||
|
}
|
||||||
|
else if (eventType == ix::CobraConnection_EventType_Subscription_Error)
|
||||||
|
{
|
||||||
|
spdlog::error("Subscriber: Subscription error: {}", errMsg);
|
||||||
|
fatalCobraError = true;
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
while (true)
|
while (!fatalCobraError)
|
||||||
{
|
{
|
||||||
auto duration = std::chrono::seconds(1);
|
auto duration = std::chrono::seconds(1);
|
||||||
std::this_thread::sleep_for(duration);
|
std::this_thread::sleep_for(duration);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
conn.disconnect();
|
||||||
|
t.join();
|
||||||
|
|
||||||
|
return fatalCobraError ? 1 : 0;
|
||||||
}
|
}
|
||||||
} // namespace ix
|
} // namespace ix
|
||||||
|
Reference in New Issue
Block a user