Compare commits

...

30 Commits

Author SHA1 Message Date
2cbe198497 try to run the unittest 2020-04-05 12:39:19 -07:00
f9d75c9374 (windows) when using OpenSSL, the system store is used to populate the cacert. No need to ship a cacert.pem file with your app. 2020-04-04 18:33:01 -07:00
d1cd5e62ac update doc 2020-04-04 17:54:15 -07:00
f3b97097cd (windows) ci: windows build with TLS (mbedtls) + verify that we can be build with OpenSSL 2020-04-04 17:49:52 -07:00
605be72579 use default mkdocs theme 2020-04-04 11:05:14 -07:00
49ff3789b5 mkdocs / use codehilite engine for syntax highlighting 2020-03-31 23:18:47 -07:00
96d61c6e5b doc - add code block highlighting 2020-03-31 20:56:51 -07:00
9a23c5aaac (doc) use c++ instead of cpp to mark a block of C++ code 2020-03-31 20:29:40 -07:00
d81e4d4fc0 setHeartBeatPeriod -> setPingInterval (in doc + disabled unittests) 2020-03-31 18:36:50 -07:00
bd44d32fdb try the material theme for the documentation 2020-03-31 18:32:48 -07:00
b6abc12ecd Add documentation about how to make a pull request to get the latest version of the package in vcpkg (#173) 2020-03-31 15:58:01 -07:00
2268b743ae add broadcasting test where 10 clients exchange messages, to try to trigger threading errors 2020-03-30 22:27:41 -07:00
1d3db5f75b (cobra to statsd bot) add ability to extract a numerical value and send a timer event to statsd, with the --timer option 2020-03-30 16:08:47 -07:00
296762ce06 add a docker deploy makefile target to build docker and push the built container in one shot 2020-03-29 22:08:36 -07:00
e465f7af52 (cobra to statsd bot) bot init was missing + capture socket error 2020-03-29 22:03:27 -07:00
f8bf1fe7cd (cobra to statsd bot) add ability to extract a numerical value and send a gauge event to statsd 2020-03-29 19:32:43 -07:00
cfa5718e40 (ws cobra subscriber) use a Json::StreamWriter to write to std::cout, and save one std::string allocation for each message printed 2020-03-29 15:24:46 -07:00
40c619c1ec (docker) trim down docker image (300M -> 12M) / binary built without symbol and size optimization, and source code not copied over 2020-03-29 13:06:44 -07:00
22b02e0e5c update doc 2020-03-28 10:46:42 -07:00
738a3bf1c5 update bundled jsoncpp to 1.9.3
(still comment the deprecation warning, which we should eventually fix ...)
2020-03-28 10:44:05 -07:00
598fb071e3 have some make target compile in release with debug 2020-03-28 10:33:22 -07:00
686aface26 bump version to 9.1.3 2020-03-28 10:33:05 -07:00
3073dd3f06 alpine docker file installs ca-certificates (for TLS) 2020-03-28 10:32:25 -07:00
68c64f3f69 use alpine as the docker distribution 2020-03-27 17:38:35 -07:00
771ebb2a4c (mac ssl) rename DarwinSSL -> SecureTransport (see this too -> https://github.com/curl/curl/issues/3733) 2020-03-26 19:40:52 -07:00
0fffb1e894 (websocket) fix data race accessing _socket object without mutex protection when calling wakeUpFromPoll in WebSocketTransport.cpp 2020-03-26 19:31:59 -07:00
18164c0c38 New CI builder: Mac + MbedTLS + Thread Sanitizer 2020-03-26 19:16:04 -07:00
d2db7310ff (ixcobra) add explicit event types for handshake, authentication and subscription failure, and handle those by exiting in ws_cobra_subcribe and friends 2020-03-26 18:54:28 -07:00
09e4584fc8 New CI builder: Mac + OpenSSL + Thread Sanitizer 2020-03-26 16:46:47 -07:00
da36856d85 Only find zlib if it has not already been found. (#169) 2020-03-26 09:39:51 -07:00
34 changed files with 3855 additions and 2301 deletions

View File

@ -12,21 +12,32 @@ jobs:
- name: make test
run: make test
mac:
runs-on: macOS-latest
steps:
- uses: actions/checkout@v1
- name: make test
run: make test
tsan:
mac_tsan_sectransport:
runs-on: macOS-latest
steps:
- uses: actions/checkout@v1
- name: make test_tsan
run: make test_tsan
win:
mac_tsan_openssl:
runs-on: macOS-latest
steps:
- uses: actions/checkout@v1
- name: install openssl
run: brew install openssl
- name: make test
run: make test_tsan_openssl
mac_tsan_mbedtls:
runs-on: macOS-latest
steps:
- uses: actions/checkout@v1
- name: install mbedtls
run: brew install mbedtls
- name: make test
run: make test_tsan_mbedtls
windows_no_tls:
runs-on: windows-latest
steps:
- uses: actions/checkout@v1
@ -34,9 +45,48 @@ jobs:
- run: |
mkdir build
cd build
cmake -DCMAKE_CXX_COMPILER=cl.exe -DUSE_WS=1 -DUSE_TEST=1 ..
cmake -DCMAKE_CXX_COMPILER=cl.exe -DUSE_TEST=1 ..
- run: cmake --build build
- run: |
cd test
..\build\test\ixwebsocket_unittest.exe
# Running the unittest does not work
# windows_mbedtls:
# runs-on: windows-latest
# steps:
# - uses: actions/checkout@v1
# - uses: seanmiddleditch/gha-setup-vsdevenv@master
# - run: |
# vcpkg install zlib:x64-windows
# vcpkg install mbedtls:x64-windows
# - run: |
# mkdir build
# cd build
# cmake -DCMAKE_TOOLCHAIN_FILE=c:/vcpkg/scripts/buildsystems/vcpkg.cmake -DCMAKE_CXX_COMPILER=cl.exe -DUSE_MBED_TLS=1 -DUSE_TLS=1 -DUSE_WS=1 -DUSE_TEST=1 ..
# - run: cmake --build build
# Running the unittest does not work, the binary cannot be found
#- run: ../build/test/ixwebsocket_unittest.exe
# working-directory: test
#
# Windows with OpenSSL is working but disabled as it takes 13 minutes (10 for openssl) to build with vcpkg
#
# windows_openssl:
# runs-on: windows-latest
# steps:
# - uses: actions/checkout@v1
# - uses: seanmiddleditch/gha-setup-vsdevenv@master
# - run: |
# vcpkg install zlib:x64-windows
# vcpkg install openssl:x64-windows
# - run: |
# mkdir build
# cd build
# cmake -DCMAKE_TOOLCHAIN_FILE=c:/vcpkg/scripts/buildsystems/vcpkg.cmake -DCMAKE_CXX_COMPILER=cl.exe -DUSE_OPEN_SSL=1 -DUSE_TLS=1 -DUSE_WS=1 -DUSE_TEST=1 ..
# - run: cmake --build build
#
# # Running the unittest does not work, the binary cannot be found
# #- run: ../build/test/ixwebsocket_unittest.exe
# # working-directory: test

View File

@ -17,6 +17,8 @@ jobs:
run: |
python -m pip install --upgrade pip
pip install mkdocs
pip install mkdocs-material
pip install pygments
- name: Build doc
run: |
git pull

View File

@ -120,13 +120,14 @@ endif()
option(USE_TLS "Enable TLS support" FALSE)
if (USE_TLS)
if (WIN32)
option(USE_MBED_TLS "Use Mbed TLS" ON)
else()
option(USE_MBED_TLS "Use Mbed TLS" OFF)
endif()
option(USE_MBED_TLS "Use Mbed TLS" OFF)
option(USE_OPEN_SSL "Use OpenSSL" OFF)
# default to mbedtls on windows if nothing is configured
if (WIN32 AND NOT USE_OPEN_SSL AND NOT USE_MBED_TLS)
option(USE_MBED_TLS "Use Mbed TLS" ON)
endif()
if (USE_MBED_TLS)
list( APPEND IXWEBSOCKET_HEADERS ixwebsocket/IXSocketMbedTLS.h)
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/IXSocketMbedTLS.cpp)
@ -200,7 +201,9 @@ if (USE_TLS AND USE_MBED_TLS)
endif()
endif()
find_package(ZLIB)
if (NOT ZLIB_FOUND)
find_package(ZLIB)
endif()
if (ZLIB_FOUND)
include_directories(${ZLIB_INCLUDE_DIRS})
target_link_libraries(ixwebsocket ${ZLIB_LIBRARIES})

View File

@ -1 +1 @@
docker/Dockerfile.centos
docker/Dockerfile.alpine

View File

@ -1,12 +1,13 @@
FROM alpine:3.11 as build
RUN apk add --no-cache gcc g++ musl-dev linux-headers cmake openssl-dev
RUN apk add --no-cache make
RUN apk add --no-cache zlib-dev
RUN apk add --no-cache \
gcc g++ musl-dev linux-headers \
cmake mbedtls-dev make zlib-dev
RUN addgroup -S app && adduser -S -G app app
RUN chown -R app:app /opt
RUN chown -R app:app /usr/local
RUN addgroup -S app && \
adduser -S -G app app && \
chown -R app:app /opt && \
chown -R app:app /usr/local
# There is a bug in CMake where we cannot build from the root top folder
# So we build from /opt
@ -14,22 +15,21 @@ COPY --chown=app:app . /opt
WORKDIR /opt
USER app
RUN [ "make", "ws_install" ]
RUN [ "rm", "-rf", "build" ]
RUN make ws_mbedtls_install && \
sh tools/trim_repo_for_docker.sh
FROM alpine:3.11 as runtime
RUN apk add --no-cache libstdc++
RUN apk add --no-cache strace
RUN apk add --no-cache gdb
RUN apk add --no-cache libstdc++ mbedtls ca-certificates && \
addgroup -S app && \
adduser -S -G app app
RUN addgroup -S app && adduser -S -G app app
COPY --chown=app:app --from=build /usr/local/bin/ws /usr/local/bin/ws
RUN chmod +x /usr/local/bin/ws
RUN ldd /usr/local/bin/ws
# Copy source code for gcc
COPY --chown=app:app --from=build /opt /opt
# COPY --chown=app:app --from=build /opt /opt
RUN chmod +x /usr/local/bin/ws && \
ldd /usr/local/bin/ws
# Now run in usermode
USER app

View File

@ -1,6 +1,54 @@
# Changelog
All changes to this project will be documented in this file.
## [9.2.1] - 2020-04-04
(windows) when using OpenSSL, the system store is used to populate the cacert. No need to ship a cacert.pem file with your app.
## [9.2.0] - 2020-04-04
(windows) ci: windows build with TLS (mbedtls) + verify that we can be build with OpenSSL
## [9.1.9] - 2020-03-30
(cobra to statsd bot) add ability to extract a numerical value and send a timer event to statsd, with the --timer option
## [9.1.8] - 2020-03-29
(cobra to statsd bot) bot init was missing + capture socket error
## [9.1.7] - 2020-03-29
(cobra to statsd bot) add ability to extract a numerical value and send a gauge event to statsd, with the --gauge option
## [9.1.6] - 2020-03-29
(ws cobra subscriber) use a Json::StreamWriter to write to std::cout, and save one std::string allocation for each message printed
## [9.1.5] - 2020-03-29
(docker) trim down docker image (300M -> 12M) / binary built without symbol and size optimization, and source code not copied over
## [9.1.4] - 2020-03-28
(jsoncpp) update bundled copy to version 1.9.3 (at sha 3beb37ea14aec1bdce1a6d542dc464d00f4a6cec)
## [9.1.3] - 2020-03-27
(docker) alpine docker build with release with debug info, and bundle ca-certificates
## [9.1.2] - 2020-03-26
(mac ssl) rename DarwinSSL -> SecureTransport (see this too -> https://github.com/curl/curl/issues/3733)
## [9.1.1] - 2020-03-26
(websocket) fix data race accessing _socket object without mutex protection when calling wakeUpFromPoll in WebSocketTransport.cpp
## [9.1.0] - 2020-03-26
(ixcobra) add explicit event types for handshake, authentication and subscription failure, and handle those by exiting in ws_cobra_subcribe and friends
## [9.0.3] - 2020-03-24
(ws connect) display statistics about how much time it takes to stop the connection

View File

@ -8,6 +8,8 @@ The per message deflate compression option is supported. It can lead to very nic
Connections can be optionally secured and encrypted with TLS/SSL when using a wss:// endpoint, or using normal un-encrypted socket with ws:// endpoints. AppleSSL is used on iOS and macOS, OpenSSL and mbedTLS can be used on Android, Linux and Windows.
If you are using OpenSSL, try to be on a version higher than 1.1.x as there there are thread safety problems with 1.0.x.
### Polling and background thread work
No manual polling to fetch data is required. Data is sent and received instantly by using a background thread for receiving data and the select [system](http://man7.org/linux/man-pages/man2/select.2.html) call to be notified by the OS of incoming data. No timeout is used for select so that the background thread is only woken up when data is available, to optimize battery life. This is also the recommended way of using select according to the select tutorial, section [select law](https://linux.die.net/man/2/select_tut). Read and Writes to the socket are non blocking. Data is sent right away and not enqueued by writing directly to the socket, which is [possible](https://stackoverflow.com/questions/1981372/are-parallel-calls-to-send-recv-on-the-same-socket-valid) since system socket implementations allow concurrent read/writes. However concurrent writes need to be protected with mutex.
@ -26,7 +28,13 @@ The library has an interactive tool which is handy for testing compatibility ith
The unittest tries to be comprehensive, and has been running on multiple platforms, with different sanitizers such as a thread sanitizer to catch data races or the undefined behavior sanitizer.
The regression test is running after each commit on travis.
The regression test is running after each commit on github actions for multiple configurations.
* Linux
* macOS with thread sanitizer
* macOS, with OpenSSL, with thread sanitizer
* macOS, with MbedTLS, with thread sanitizer
* Windows, with MbedTLS (the unittest is not run yet)
## Limitations

View File

@ -13,7 +13,7 @@
## Example code
```cpp
```c++
// Required on Windows
ix::initNetSystem();

94
docs/packages.md Normal file
View File

@ -0,0 +1,94 @@
Notes on how we can update the different packages for ixwebsocket.
## VCPKG
Visit the [releases](https://github.com/machinezone/IXWebSocket/releases) page on Github. A tag must have been made first.
Download the latest entry.
```
$ cd /tmp
/tmp$ curl -s -O -L https://github.com/machinezone/IXWebSocket/archive/v9.1.9.tar.gz
/tmp$
/tmp$ openssl sha512 v9.1.9.tar.gz
SHA512(v9.1.9.tar.gz)= f1fd731b5f6a9ce6d6d10bee22a5d9d9baaa8ea0564d6c4cd7eb91dcb88a45c49b2c7fdb75f8640a3589c1b30cee33ef5df8dcbb55920d013394d1e33ddd3c8e
```
Now go punch those values in the vcpkg ixwebsocket port config files. Here is what the diff look like.
```
vcpkg$ git diff
diff --git a/ports/ixwebsocket/CONTROL b/ports/ixwebsocket/CONTROL
index db9c2adc9..4acae5c3f 100644
--- a/ports/ixwebsocket/CONTROL
+++ b/ports/ixwebsocket/CONTROL
@@ -1,5 +1,5 @@
Source: ixwebsocket
-Version: 8.0.5
+Version: 9.1.9
Build-Depends: zlib
Homepage: https://github.com/machinezone/IXWebSocket
Description: Lightweight WebSocket Client and Server + HTTP Client and Server
diff --git a/ports/ixwebsocket/portfile.cmake b/ports/ixwebsocket/portfile.cmake
index de082aece..68e523a05 100644
--- a/ports/ixwebsocket/portfile.cmake
+++ b/ports/ixwebsocket/portfile.cmake
@@ -1,8 +1,8 @@
vcpkg_from_github(
OUT_SOURCE_PATH SOURCE_PATH
REPO machinezone/IXWebSocket
- REF v8.0.5
- SHA512 9dcc20d9a0629b92c62a68a8bd7c8206f18dbd9e93289b0b687ec13c478ce9ad1f3563b38c399c8277b0d3812cc78ca725786ba1dedbc3445b9bdb9b689e8add
+ REF v9.1.9
+ SHA512 f1fd731b5f6a9ce6d6d10bee22a5d9d9baaa8ea0564d6c4cd7eb91dcb88a45c49b2c7fdb75f8640a3589c1b30cee33ef5df8dcbb55920d013394d1e33ddd3c8e
)
```
You will need a fork of the vcpkg repo to make a pull request.
```
git fetch upstream
git co master
git reset --hard upstream/master
git push origin master --force
```
Make the pull request (I use a new branch to do that).
```
vcpkg$ git co -b feature/ixwebsocket_9.1.9
M ports/ixwebsocket/CONTROL
M ports/ixwebsocket/portfile.cmake
Switched to a new branch 'feature/ixwebsocket_9.1.9'
vcpkg$
vcpkg$
vcpkg$ git commit -am 'ixwebsocket: update to 9.1.9'
[feature/ixwebsocket_9.1.9 8587a4881] ixwebsocket: update to 9.1.9
2 files changed, 3 insertions(+), 3 deletions(-)
vcpkg$
vcpkg$ git push
fatal: The current branch feature/ixwebsocket_9.1.9 has no upstream branch.
To push the current branch and set the remote as upstream, use
git push --set-upstream origin feature/ixwebsocket_9.1.9
vcpkg$ git push --set-upstream origin feature/ixwebsocket_9.1.9
Enumerating objects: 11, done.
Counting objects: 100% (11/11), done.
Delta compression using up to 8 threads
Compressing objects: 100% (6/6), done.
Writing objects: 100% (6/6), 621 bytes | 621.00 KiB/s, done.
Total 6 (delta 4), reused 0 (delta 0)
remote: Resolving deltas: 100% (4/4), completed with 4 local objects.
remote:
remote: Create a pull request for 'feature/ixwebsocket_9.1.9' on GitHub by visiting:
remote: https://github.com/bsergean/vcpkg/pull/new/feature/ixwebsocket_9.1.9
remote:
To https://github.com/bsergean/vcpkg.git
* [new branch] feature/ixwebsocket_9.1.9 -> feature/ixwebsocket_9.1.9
Branch 'feature/ixwebsocket_9.1.9' set up to track remote branch 'feature/ixwebsocket_9.1.9' from 'origin' by rebasing.
vcpkg$
```
Just visit this url, https://github.com/bsergean/vcpkg/pull/new/feature/ixwebsocket_9.1.9, printed on the console, to make the pull request.

View File

@ -35,7 +35,7 @@ webSocket.setUrl(url);
// Optional heart beat, sent every 45 seconds when there is not any traffic
// to make sure that load balancers do not kill an idle connection.
webSocket.setHeartBeatPeriod(45);
webSocket.setPingInterval(45);
// Per message deflate connection is enabled by default. You can tweak its parameters or disable it
webSocket.disablePerMessageDeflate();
@ -174,7 +174,7 @@ when there is no any traffic to make sure that load balancers do not kill an
idle connection.
```cpp
webSocket.setHeartBeatPeriod(45);
webSocket.setPingInterval(45);
```
### Supply extra HTTP headers.
@ -436,6 +436,8 @@ setOnConnectionCallback(
To leverage TLS features, the library must be compiled with the option `USE_TLS=1`.
If you are using OpenSSL, try to be on a version higher than 1.1.x as there there are thread safety problems with 1.0.x.
Then, secure sockets are automatically used when connecting to a `wss://*` url.
Additional TLS options can be configured by passing a `ix::SocketTLSOptions` instance to the

View File

@ -37,6 +37,7 @@ namespace ix
std::atomic<bool> errorSending(false);
std::atomic<bool> stop(false);
std::atomic<bool> throttled(false);
std::atomic<bool> fatalCobraError(false);
QueueManager queueManager(maxQueueSize);
@ -179,6 +180,7 @@ namespace ix
verbose,
&throttled,
&receivedCount,
&fatalCobraError,
&queueManager](ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
@ -240,6 +242,21 @@ namespace ix
{
spdlog::info("Received websocket pong");
}
else if (eventType == ix::CobraConnection_EventType_Handshake_Error)
{
spdlog::error("Subscriber: Handshake error: {}", errMsg);
fatalCobraError = true;
}
else if (eventType == ix::CobraConnection_EventType_Authentication_Error)
{
spdlog::error("Subscriber: Authentication error: {}", errMsg);
fatalCobraError = true;
}
else if (eventType == ix::CobraConnection_EventType_Subscription_Error)
{
spdlog::error("Subscriber: Subscription error: {}", errMsg);
fatalCobraError = true;
}
});
// Run forever
@ -251,6 +268,7 @@ namespace ix
std::this_thread::sleep_for(duration);
if (strict && errorSending) break;
if (fatalCobraError) break;
}
}
// Run for a duration, used by unittesting now
@ -262,6 +280,7 @@ namespace ix
std::this_thread::sleep_for(duration);
if (strict && errorSending) break;
if (fatalCobraError) break;
}
}
@ -272,12 +291,15 @@ namespace ix
conn.disconnect();
stop = true;
// progress thread
t1.join();
if (t2.joinable()) t2.join();
spdlog::info("heartbeat thread done");
// heartbeat thread
if (t2.joinable()) t2.join();
// sentry sender thread
t3.join();
return (strict && errorSending) ? -1 : (int) sentCount;
return ((strict && errorSending) || fatalCobraError) ? -1 : (int) sentCount;
}
} // namespace ix

View File

@ -40,7 +40,7 @@ namespace ix
// Extract an attribute from a Json Value.
// extractAttr("foo.bar", {"foo": {"bar": "baz"}}) => baz
//
std::string extractAttr(const std::string& attr, const Json::Value& jsonValue)
Json::Value extractAttr(const std::string& attr, const Json::Value& jsonValue)
{
// Split by .
std::string token;
@ -53,7 +53,7 @@ namespace ix
val = val[token];
}
return val.asString();
return val;
}
int cobra_to_statsd_bot(const ix::CobraConfig& config,
@ -62,6 +62,8 @@ namespace ix
const std::string& position,
StatsdClient& statsdClient,
const std::string& fields,
const std::string& gauge,
const std::string& timer,
bool verbose,
size_t maxQueueSize,
bool enableHeartbeat,
@ -77,10 +79,11 @@ namespace ix
std::atomic<uint64_t> sentCount(0);
std::atomic<uint64_t> receivedCount(0);
std::atomic<bool> stop(false);
std::atomic<bool> fatalCobraError(false);
QueueManager queueManager(maxQueueSize);
auto timer = [&sentCount, &receivedCount, &stop] {
auto progress = [&sentCount, &receivedCount, &stop] {
while (!stop)
{
spdlog::info("messages received {} sent {}", receivedCount, sentCount);
@ -88,16 +91,18 @@ namespace ix
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
spdlog::info("timer thread done");
};
std::thread t1(timer);
std::thread t1(progress);
auto heartbeat = [&sentCount, &receivedCount, &enableHeartbeat] {
auto heartbeat = [&sentCount, &receivedCount, &stop, &enableHeartbeat] {
std::string state("na");
if (!enableHeartbeat) return;
while (true)
while (!stop)
{
std::stringstream ss;
ss << "messages received " << receivedCount;
@ -115,11 +120,13 @@ namespace ix
auto duration = std::chrono::minutes(1);
std::this_thread::sleep_for(duration);
}
spdlog::info("heartbeat thread done");
};
std::thread t2(heartbeat);
auto statsdSender = [&statsdClient, &queueManager, &sentCount, &tokens, &stop] {
auto statsdSender = [&statsdClient, &queueManager, &sentCount, &tokens, &stop, &gauge, &timer, &fatalCobraError, &verbose] {
while (true)
{
Json::Value msg = queueManager.pop();
@ -131,10 +138,62 @@ namespace ix
for (auto&& attr : tokens)
{
id += ".";
id += extractAttr(attr, msg);
auto val = extractAttr(attr, msg);
id += val.asString();
}
if (gauge.empty() && timer.empty())
{
statsdClient.count(id, 1);
}
else
{
std::string attrName = (!gauge.empty()) ? gauge : timer;
auto val = extractAttr(attrName, msg);
size_t x;
if (val.isInt())
{
x = (size_t) val.asInt();
}
else if (val.isInt64())
{
x = (size_t) val.asInt64();
}
else if (val.isUInt())
{
x = (size_t) val.asUInt();
}
else if (val.isUInt64())
{
x = (size_t) val.asUInt64();
}
else if (val.isDouble())
{
x = (size_t) val.asUInt64();
}
else
{
spdlog::error("Gauge {} is not a numberic type", gauge);
fatalCobraError = true;
break;
}
if (verbose)
{
spdlog::info("{} - {} -> {}", id, attrName, x);
}
if (!gauge.empty())
{
statsdClient.gauge(id, x);
}
else
{
statsdClient.timing(id, x);
}
}
statsdClient.count(id, 1);
sentCount += 1;
}
};
@ -142,7 +201,7 @@ namespace ix
std::thread t3(statsdSender);
conn.setEventCallback(
[&conn, &channel, &filter, &position, &jsonWriter, verbose, &queueManager, &receivedCount](
[&conn, &channel, &filter, &position, &jsonWriter, verbose, &queueManager, &receivedCount, &fatalCobraError](
ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
@ -200,6 +259,21 @@ namespace ix
{
spdlog::info("Received websocket pong");
}
else if (eventType == ix::CobraConnection_EventType_Handshake_Error)
{
spdlog::error("Subscriber: Handshake error: {}", errMsg);
fatalCobraError = true;
}
else if (eventType == ix::CobraConnection_EventType_Authentication_Error)
{
spdlog::error("Subscriber: Authentication error: {}", errMsg);
fatalCobraError = true;
}
else if (eventType == ix::CobraConnection_EventType_Subscription_Error)
{
spdlog::error("Subscriber: Subscription error: {}", errMsg);
fatalCobraError = true;
}
});
// Run forever
@ -209,6 +283,8 @@ namespace ix
{
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
if (fatalCobraError) break;
}
}
// Run for a duration, used by unittesting now
@ -218,6 +294,8 @@ namespace ix
{
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
if (fatalCobraError) break;
}
}
@ -228,12 +306,15 @@ namespace ix
conn.disconnect();
stop = true;
// progress thread
t1.join();
if (t2.joinable()) t2.join();
spdlog::info("heartbeat thread done");
// heartbeat thread
if (t2.joinable()) t2.join();
// statsd sender thread
t3.join();
return (int) sentCount;
return fatalCobraError ? -1 : (int) sentCount;
}
} // namespace ix

View File

@ -18,6 +18,8 @@ namespace ix
const std::string& position,
StatsdClient& statsdClient,
const std::string& fields,
const std::string& gauge,
const std::string& timer,
bool verbose,
size_t maxQueueSize,
bool enableHeartbeat,

View File

@ -43,11 +43,10 @@
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <iostream>
namespace ix
{
const uint64_t StatsdClient::_maxQueueSize = 32768;
StatsdClient::StatsdClient(const std::string& host,
int port,
const std::string& prefix)
@ -56,23 +55,11 @@ namespace ix
, _prefix(prefix)
, _stop(false)
{
_thread = std::thread([this] {
_thread = std::thread([this]
{
while (!_stop)
{
std::deque<std::string> stagedQueue;
{
std::lock_guard<std::mutex> lock(_mutex);
_queue.swap(stagedQueue);
}
while (!stagedQueue.empty())
{
auto message = stagedQueue.front();
_socket.sendto(message);
stagedQueue.pop_front();
}
flushQueue();
std::this_thread::sleep_for(std::chrono::seconds(1));
}
});
@ -127,31 +114,39 @@ namespace ix
return send(key, ms, "ms");
}
int StatsdClient::send(std::string key, size_t value, const std::string &type)
int StatsdClient::send(std::string key, size_t value, const std::string& type)
{
cleanup(key);
char buf[256];
snprintf(buf, sizeof(buf), "%s%s:%zd|%s",
snprintf(buf, sizeof(buf), "%s%s:%zd|%s\n",
_prefix.c_str(), key.c_str(), value, type.c_str());
return send(buf);
enqueue(buf);
return 0;
}
int StatsdClient::send(const std::string &message)
void StatsdClient::enqueue(const std::string& message)
{
std::lock_guard<std::mutex> lock(_mutex);
_queue.push_back(message);
}
void StatsdClient::flushQueue()
{
std::lock_guard<std::mutex> lock(_mutex);
if (_queue.empty() ||
_queue.back().length() > _maxQueueSize)
while (!_queue.empty())
{
_queue.push_back(message);
auto message = _queue.front();
auto ret = _socket.sendto(message);
if (ret != 0)
{
std::cerr << "error: "
<< strerror(UdpSocket::getErrno())
<< std::endl;
}
_queue.pop_front();
}
else
{
(*_queue.rbegin()).append("\n").append(message);
}
return 0;
}
} // end namespace ix

View File

@ -32,11 +32,7 @@ namespace ix
int timing(const std::string& key, size_t ms);
private:
/**
* (Low Level Api) manually send a message
* which might be composed of several lines.
*/
int send(const std::string& message);
void enqueue(const std::string& message);
/* (Low Level Api) manually send a message
* type = "c", "g" or "ms"
@ -44,6 +40,7 @@ namespace ix
int send(std::string key, size_t value, const std::string& type);
void cleanup(std::string& key);
void flushQueue();
UdpSocket _socket;
@ -56,7 +53,6 @@ namespace ix
std::mutex _mutex; // for the queue
std::deque<std::string> _queue;
static const uint64_t _maxQueueSize;
};
} // end namespace ix

View File

@ -166,7 +166,8 @@ namespace ix
}
else if (action == "auth/handshake/error")
{
invokeErrorCallback("Handshake error", msg->str);
invokeEventCallback(ix::CobraConnection_EventType_Handshake_Error,
msg->str);
}
else if (action == "auth/authenticate/ok")
{
@ -176,7 +177,8 @@ namespace ix
}
else if (action == "auth/authenticate/error")
{
invokeErrorCallback("Authentication error", msg->str);
invokeEventCallback(ix::CobraConnection_EventType_Authentication_Error,
msg->str);
}
else if (action == "rtm/subscription/data")
{
@ -191,7 +193,8 @@ namespace ix
}
else if (action == "rtm/subscribe/error")
{
invokeErrorCallback("Subscription error", msg->str);
invokeEventCallback(ix::CobraConnection_EventType_Subscription_Error,
msg->str);
}
else if (action == "rtm/unsubscribe/ok")
{

View File

@ -37,7 +37,10 @@ namespace ix
CobraConnection_EventType_Subscribed = 4,
CobraConnection_EventType_UnSubscribed = 5,
CobraConnection_EventType_Published = 6,
CobraConnection_EventType_Pong = 7
CobraConnection_EventType_Pong = 7,
CobraConnection_EventType_Handshake_Error = 8,
CobraConnection_EventType_Authentication_Error = 9,
CobraConnection_EventType_Subscription_Error = 10
};
enum CobraConnectionPublishMode

View File

@ -21,6 +21,57 @@
#endif
#define socketerrno errno
#ifdef _WIN32
namespace
{
bool loadWindowsSystemCertificates(SSL_CTX* ssl, std::string& errorMsg)
{
DWORD flags = CERT_STORE_READONLY_FLAG | CERT_STORE_OPEN_EXISTING_FLAG |
CERT_SYSTEM_STORE_CURRENT_USER;
HCERTSTORE systemStore = CertOpenStore(CERT_STORE_PROV_SYSTEM, 0, 0, flags, L"Root");
if (!systemStore)
{
errorMsg = "CertOpenStore failed with ";
errorMsg += std::to_string(GetLastError());
return false;
}
PCCERT_CONTEXT certificateIterator = NULL;
X509_STORE* opensslStore = SSL_CTX_get_cert_store(ssl);
int certificateCount = 0;
while (certificateIterator = CertEnumCertificatesInStore(systemStore, certificateIterator))
{
X509* x509 = d2i_X509(NULL,
(const unsigned char**) &certificateIterator->pbCertEncoded,
certificateIterator->cbCertEncoded);
if (x509)
{
if (X509_STORE_add_cert(opensslStore, x509) == 1)
{
++certificateCount;
}
X509_free(x509);
}
}
CertFreeCertificateContext(certificateIterator);
CertCloseStore(systemStore, 0);
if (certificateCount == 0)
{
errorMsg = "No certificates found";
return false;
}
return true;
}
} // namespace
#endif
namespace ix
{
const std::string kDefaultCiphers =
@ -336,6 +387,12 @@ namespace ix
{
if (_tlsOptions.isUsingSystemDefaults())
{
#ifdef _WIN32
if (!loadWindowsSystemCertificates(_ssl_context, errMsg))
{
return false;
}
#else
if (SSL_CTX_set_default_verify_paths(_ssl_context) == 0)
{
auto sslErr = ERR_get_error();
@ -343,6 +400,7 @@ namespace ix
errMsg += ERR_error_string(sslErr, nullptr);
return false;
}
#endif
}
else if (SSL_CTX_load_verify_locations(
_ssl_context, _tlsOptions.caFile.c_str(), NULL) != 1)

View File

@ -71,7 +71,7 @@ namespace ix
#elif defined(IXWEBSOCKET_USE_OPEN_SSL)
ss << " ssl/OpenSSL " << OPENSSL_VERSION_TEXT;
#elif __APPLE__
ss << " ssl/DarwinSSL";
ss << " ssl/SecureTransport";
#endif
#else
ss << " nossl";

View File

@ -625,7 +625,7 @@ namespace ix
// send back the CLOSE frame
sendCloseFrame(code, reason);
_socket->wakeUpFromPoll(Socket::kCloseRequest);
wakeUpFromPoll(Socket::kCloseRequest);
bool remote = true;
closeSocketAndSwitchToClosedState(code, reason, _rxbuf.size(), remote);
@ -845,7 +845,7 @@ namespace ix
// Request to flush the send buffer on the background thread if it isn't empty
if (!isSendBufferEmpty())
{
_socket->wakeUpFromPoll(Socket::kSendRequest);
wakeUpFromPoll(Socket::kSendRequest);
// FIXME: we should have a timeout when sending large messages: see #131
if (_blockingSend && !flushSendBuffer())
@ -1063,6 +1063,12 @@ namespace ix
_socket->close();
}
bool WebSocketTransport::wakeUpFromPoll(uint64_t wakeUpCode)
{
std::lock_guard<std::mutex> lock(_socketMutex);
return _socket->wakeUpFromPoll(wakeUpCode);
}
void WebSocketTransport::closeSocketAndSwitchToClosedState(uint16_t code,
const std::string& reason,
size_t closeWireSize,
@ -1110,8 +1116,9 @@ namespace ix
setReadyState(ReadyState::CLOSING);
sendCloseFrame(code, reason);
// wake up the poll, but do not close yet
_socket->wakeUpFromPoll(Socket::kSendRequest);
wakeUpFromPoll(Socket::kSendRequest);
}
size_t WebSocketTransport::bufferedAmount() const

View File

@ -229,6 +229,8 @@ namespace ix
size_t closeWireSize,
bool remote);
bool wakeUpFromPoll(uint64_t wakeUpCode);
bool flushSendBuffer();
bool sendOnSocket();
bool receiveFromSocket();

View File

@ -6,4 +6,4 @@
#pragma once
#define IX_WEBSOCKET_VERSION "9.0.3"
#define IX_WEBSOCKET_VERSION "9.2.1"

View File

@ -1,5 +1,13 @@
#
# This makefile is just used to easily work with docker (linux build)
# This makefile is used for convenience, and wrap simple cmake commands
# You don't need to use it as an end user, it is more for developer.
#
# * work with docker (linux build)
# * execute the unittest
#
# The default target will install ws, the command line tool coming with
# IXWebSocket into /usr/local/bin
#
#
all: brew
@ -8,14 +16,23 @@ install: brew
# Use -DCMAKE_INSTALL_PREFIX= to install into another location
# on osx it is good practice to make /usr/local user writable
# sudo chown -R `whoami`/staff /usr/local
#
# Release, Debug, MinSizeRel, RelWithDebInfo are the build types
#
brew:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 -DUSE_TEST=1 .. ; make -j 4 install)
# Docker default target. We've add problem with OpenSSL and TLS 1.3 (on the
# server side ?) and I can't work-around it easily, so we're using mbedtls on
# Linux for the SSL backend, which works great.
ws_mbedtls_install:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=MinSizeRel -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .. ; make -j 4 install)
ws:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 .. ; make -j 4)
ws_install:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 .. ; make -j 4 install)
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=MinSizeRel -DUSE_TLS=1 -DUSE_WS=1 .. ; make -j 4 install)
ws_openssl:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 -DUSE_OPEN_SSL=1 .. ; make -j 4)
@ -23,9 +40,6 @@ ws_openssl:
ws_mbedtls:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .. ; make -j 4)
ws_mbedtls_install:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .. ; make -j 4 install)
ws_no_ssl:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_WS=1 .. ; make -j 4)
@ -68,6 +82,8 @@ docker_push:
docker push ${LATEST}
docker push ${IMG}
deploy: docker docker_push
run:
docker run --cap-add sys_ptrace --entrypoint=sh -it bsergean/ws:build
@ -105,12 +121,36 @@ test_asan:
(cd build/test ; ln -sf Debug/ixwebsocket_unittest)
(cd test ; python2.7 run.py -r)
test_openssl:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_OPEN_SSL=1 -DUSE_TEST=1 .. ; make -j 4)
test_tsan_openssl:
mkdir -p build && (cd build && cmake -GXcode -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_TEST=1 -DUSE_OPEN_SSL=1 .. && xcodebuild -project ixwebsocket.xcodeproj -target ixwebsocket_unittest -enableThreadSanitizer YES)
(cd build/test ; ln -sf Debug/ixwebsocket_unittest)
(cd test ; python2.7 run.py -r)
test_mbedtls:
test_ubsan_openssl:
mkdir -p build && (cd build && cmake -GXcode -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_TEST=1 -DUSE_OPEN_SSL=1 .. && xcodebuild -project ixwebsocket.xcodeproj -target ixwebsocket_unittest -enableUndefinedBehaviorSanitizer YES)
(cd build/test ; ln -sf Debug/ixwebsocket_unittest)
(cd test ; python2.7 run.py -r)
test_tsan_openssl_release:
mkdir -p build && (cd build && cmake -GXcode -DCMAKE_BUILD_TYPE=Release -DUSE_TLS=1 -DUSE_TEST=1 -DUSE_OPEN_SSL=1 .. && xcodebuild -project ixwebsocket.xcodeproj -configuration Release -target ixwebsocket_unittest -enableThreadSanitizer YES)
(cd build/test ; ln -sf Release/ixwebsocket_unittest)
(cd test ; python2.7 run.py -r)
test_tsan_mbedtls:
mkdir -p build && (cd build && cmake -GXcode -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_TEST=1 -DUSE_MBED_TLS=1 .. && xcodebuild -project ixwebsocket.xcodeproj -target ixwebsocket_unittest -enableThreadSanitizer YES)
(cd build/test ; ln -sf Debug/ixwebsocket_unittest)
(cd test ; python2.7 run.py -r)
build_test_openssl:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_OPEN_SSL=1 -DUSE_TEST=1 .. ; make -j 4)
test_openssl: build_test_openssl
(cd test ; python2.7 run.py -r)
build_test_mbedtls:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_MBED_TLS=1 -DUSE_TEST=1 .. ; make -j 4)
test_mbedtls: build_test_mbedtls
(cd test ; python2.7 run.py -r)
test_no_ssl:

View File

@ -54,6 +54,7 @@ set (SOURCES
IXWebSocketSubProtocolTest.cpp
IXSentryClientTest.cpp
IXWebSocketChatTest.cpp
IXWebSocketBroadcastTest.cpp
)
# Some unittest don't work on windows yet

View File

@ -111,6 +111,8 @@ TEST_CASE("Cobra_to_statsd_bot", "[cobra_bots]")
REQUIRE(initialized);
std::string fields("device.game\ndevice.os_name");
std::string gauge;
std::string timer;
int sentCount = ix::cobra_to_statsd_bot(config,
channel,
@ -118,6 +120,8 @@ TEST_CASE("Cobra_to_statsd_bot", "[cobra_bots]")
position,
statsdClient,
fields,
gauge,
timer,
verbose,
maxQueueSize,
enableHeartbeat,

View File

@ -0,0 +1,305 @@
/*
* IXWebSocketServerTest.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone. All rights reserved.
*/
#include "IXTest.h"
#include "catch.hpp"
#include <iostream>
#include "msgpack11.hpp"
#include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXSocketFactory.h>
#include <ixwebsocket/IXWebSocket.h>
#include <ixwebsocket/IXWebSocketServer.h>
using msgpack11::MsgPack;
using namespace ix;
namespace
{
class WebSocketChat
{
public:
WebSocketChat(const std::string& user, const std::string& session, int port);
void subscribe(const std::string& channel);
void start();
void stop();
bool isReady() const;
void sendMessage(const std::string& text);
size_t getReceivedMessagesCount() const;
const std::vector<std::string>& getReceivedMessages() const;
std::string encodeMessage(const std::string& text);
std::pair<std::string, std::string> decodeMessage(const std::string& str);
void appendMessage(const std::string& message);
private:
std::string _user;
std::string _session;
int _port;
ix::WebSocket _webSocket;
std::vector<std::string> _receivedMessages;
mutable std::mutex _mutex;
};
WebSocketChat::WebSocketChat(const std::string& user, const std::string& session, int port)
: _user(user)
, _session(session)
, _port(port)
{
_webSocket.setTLSOptions(makeClientTLSOptions());
}
size_t WebSocketChat::getReceivedMessagesCount() const
{
std::lock_guard<std::mutex> lock(_mutex);
return _receivedMessages.size();
}
const std::vector<std::string>& WebSocketChat::getReceivedMessages() const
{
std::lock_guard<std::mutex> lock(_mutex);
return _receivedMessages;
}
void WebSocketChat::appendMessage(const std::string& message)
{
std::lock_guard<std::mutex> lock(_mutex);
_receivedMessages.push_back(message);
}
bool WebSocketChat::isReady() const
{
return _webSocket.getReadyState() == ix::ReadyState::Open;
}
void WebSocketChat::stop()
{
_webSocket.stop();
}
void WebSocketChat::start()
{
std::string url;
{
bool preferTLS = true;
url = makeCobraEndpoint(_port, preferTLS);
}
_webSocket.setUrl(url);
std::stringstream ss;
log(std::string("Connecting to url: ") + url);
_webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) {
std::stringstream ss;
if (msg->type == ix::WebSocketMessageType::Open)
{
ss << "websocket_broadcast_client: " << _user << " Connected !";
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
ss << "websocket_broadcast_client: " << _user << " disconnected !";
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
auto result = decodeMessage(msg->str);
// Our "chat" / "broacast" node.js server does not send us
// the messages we send, so we don't need to have a msg_user != user
// as we do for the satori chat example.
// store text
appendMessage(result.second);
std::string payload = result.second;
if (payload.size() > 2000)
{
payload = "<message too large>";
}
ss << std::endl << result.first << " > " << payload << std::endl << _user << " > ";
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
ss << "websocket_broadcast_client: " << _user << " Error ! " << msg->errorInfo.reason;
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Ping)
{
log("websocket_broadcast_client: received ping message");
}
else if (msg->type == ix::WebSocketMessageType::Pong)
{
log("websocket_broadcast_client: received pong message");
}
else if (msg->type == ix::WebSocketMessageType::Fragment)
{
log("websocket_broadcast_client: received message fragment");
}
else
{
ss << "Unexpected ix::WebSocketMessageType";
log(ss.str());
}
});
_webSocket.start();
}
std::pair<std::string, std::string> WebSocketChat::decodeMessage(const std::string& str)
{
std::string errMsg;
MsgPack msg = MsgPack::parse(str, errMsg);
std::string msg_user = msg["user"].string_value();
std::string msg_text = msg["text"].string_value();
return std::pair<std::string, std::string>(msg_user, msg_text);
}
std::string WebSocketChat::encodeMessage(const std::string& text)
{
std::map<MsgPack, MsgPack> obj;
obj["user"] = _user;
obj["text"] = text;
MsgPack msg(obj);
std::string output = msg.dump();
return output;
}
void WebSocketChat::sendMessage(const std::string& text)
{
_webSocket.sendBinary(encodeMessage(text));
}
bool startServer(ix::WebSocketServer& server, std::string& connectionId)
{
bool preferTLS = true;
server.setTLSOptions(makeServerTLSOptions(preferTLS));
server.setOnConnectionCallback([&server, &connectionId](
std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState) {
webSocket->setOnMessageCallback([webSocket, connectionState, &connectionId, &server](
const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
TLogger() << "New connection";
connectionState->computeId();
TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";
for (auto it : msg->openInfo.headers)
{
TLogger() << it.first << ": " << it.second;
}
connectionId = connectionState->getId();
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
TLogger() << "Closed connection";
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
for (auto&& client : server.getClients())
{
if (client != webSocket)
{
client->send(msg->str, msg->binary);
}
}
}
});
});
auto res = server.listen();
if (!res.first)
{
TLogger() << res.second;
return false;
}
server.start();
return true;
}
} // namespace ix
TEST_CASE("Websocket_broadcast_server", "[websocket_server]")
{
SECTION("Connect to the server, do not send anything. Should timeout and return 400")
{
int port = getFreePort();
ix::WebSocketServer server(port);
std::string connectionId;
REQUIRE(startServer(server, connectionId));
std::string session = ix::generateSessionId();
std::vector<std::shared_ptr<WebSocketChat>> chatClients;
for (int i = 0 ; i < 10; ++i)
{
std::string user("user_" + std::to_string(i));
chatClients.push_back(std::make_shared<WebSocketChat>(user, session, port));
chatClients[i]->start();
ix::msleep(50);
}
// Wait for all chat instance to be ready
while (true)
{
bool allReady = true;
for (size_t i = 0 ; i < chatClients.size(); ++i)
{
allReady &= chatClients[i]->isReady();
}
if (allReady) break;
ix::msleep(10);
}
for (int j = 0; j < 1000; j++)
{
for (size_t i = 0 ; i < chatClients.size(); ++i)
{
chatClients[i]->sendMessage("hello world");
}
if (j == 250)
{
server.stop();
ix::msleep(100);
}
if (j == 500)
{
server.start();
ix::msleep(100);
}
}
// wait 1 second
ix::msleep(2000);
// Stop all clients
size_t messageCount = chatClients.size() * 50;
for (size_t i = 0 ; i < chatClients.size(); ++i)
{
REQUIRE(chatClients[i]->getReceivedMessagesCount() >= messageCount);
chatClients[i]->stop();
}
// Give us 500ms for the server to notice that clients went away
ix::msleep(500);
server.stop();
REQUIRE(server.getClients().size() == 0);
}
}

View File

@ -65,7 +65,7 @@ namespace
// Set a 1 second heartbeat with the setter method to test
if (_useHeartBeatMethod)
{
_webSocket.setHeartBeatPeriod(1);
_webSocket.setPingInterval(1);
}
else
{
@ -378,9 +378,9 @@ TEST_CASE("Websocket_ping_data_sent_setPingInterval_full", "[setPingInterval]")
}
}
// Using setHeartBeatPeriod
// Using setPingInterval
TEST_CASE("Websocket_ping_no_data_sent_setHeartBeatPeriod", "[setHeartBeatPeriod]")
TEST_CASE("Websocket_ping_no_data_sent_setHeartBeatPeriod", "[setPingInterval]")
{
SECTION("Make sure that ping messages are sent when no other data are sent.")
{
@ -424,7 +424,7 @@ TEST_CASE("Websocket_ping_no_data_sent_setHeartBeatPeriod", "[setHeartBeatPeriod
}
}
TEST_CASE("Websocket_ping_data_sent_setHeartBeatPeriod", "[setHeartBeatPeriod]")
TEST_CASE("Websocket_ping_data_sent_setHeartBeatPeriod", "[setPingInterval]")
{
SECTION("Make sure that ping messages are sent, even if other messages are sent")
{

View File

@ -62,7 +62,7 @@ namespace ix
{
if (client != webSocket)
{
client->send(msg->str);
client->send(msg->str, msg->binary);
}
}
}

View File

@ -1,4 +1,4 @@
/// Json-cpp amalgated forward header (http://jsoncpp.sourceforge.net/).
/// Json-cpp amalgamated forward header (http://jsoncpp.sourceforge.net/).
/// It is intended to be used with #include "json/json-forwards.h"
/// This header provides forward declaration for all JsonCpp types.
@ -11,13 +11,13 @@ The JsonCpp library's source code, including accompanying documentation,
tests and demonstration applications, are licensed under the following
conditions...
The author (Baptiste Lepilleur) explicitly disclaims copyright in all
Baptiste Lepilleur and The JsonCpp Authors explicitly disclaim copyright in all
jurisdictions which recognize such a disclaimer. In such jurisdictions,
this software is released into the Public Domain.
In jurisdictions which do not recognize Public Domain property (e.g. Germany as of
2010), this software is Copyright (c) 2007-2010 by Baptiste Lepilleur, and is
released under the terms of the MIT License (see below).
2010), this software is Copyright (c) 2007-2010 by Baptiste Lepilleur and
The JsonCpp Authors, and is released under the terms of the MIT License (see below).
In jurisdictions which recognize Public Domain property, the user of this
software may choose to accept it either as 1) Public Domain, 2) under the
@ -32,7 +32,7 @@ described in clear, concise terms at:
The full text of the MIT License follows:
========================================================================
Copyright (c) 2007-2010 Baptiste Lepilleur
Copyright (c) 2007-2010 Baptiste Lepilleur and The JsonCpp Authors
Permission is hereby granted, free of charge, to any person
obtaining a copy of this software and associated documentation
@ -73,9 +73,9 @@ license you like.
#ifndef JSON_FORWARD_AMALGATED_H_INCLUDED
# define JSON_FORWARD_AMALGATED_H_INCLUDED
/// If defined, indicates that the source file is amalgated
#ifndef JSON_FORWARD_AMALGAMATED_H_INCLUDED
# define JSON_FORWARD_AMALGAMATED_H_INCLUDED
/// If defined, indicates that the source file is amalgamated
/// to prevent private header inclusion.
#define JSON_IS_AMALGAMATION
@ -83,23 +83,21 @@ license you like.
// Beginning of content of file: include/json/config.h
// //////////////////////////////////////////////////////////////////////
// Copyright 2007-2010 Baptiste Lepilleur
// Copyright 2007-2010 Baptiste Lepilleur and The JsonCpp Authors
// Distributed under MIT license, or public domain if desired and
// recognized in your jurisdiction.
// See file LICENSE for detail or copy at http://jsoncpp.sourceforge.net/LICENSE
#ifndef JSON_CONFIG_H_INCLUDED
#define JSON_CONFIG_H_INCLUDED
/// If defined, indicates that json library is embedded in CppTL library.
//# define JSON_IN_CPPTL 1
/// If defined, indicates that json may leverage CppTL library
//# define JSON_USE_CPPTL 1
/// If defined, indicates that cpptl vector based map should be used instead of
/// std::map
/// as Value container.
//# define JSON_USE_CPPTL_SMALLMAP 1
#include <cstddef>
#include <cstdint>
#include <istream>
#include <memory>
#include <ostream>
#include <sstream>
#include <string>
#include <type_traits>
// If non-zero, the library uses exceptions to report bad input instead of C
// assertion macros. The default is to use exceptions.
@ -107,43 +105,49 @@ license you like.
#define JSON_USE_EXCEPTION 1
#endif
/// If defined, indicates that the source file is amalgated
// Temporary, tracked for removal with issue #982.
#ifndef JSON_USE_NULLREF
#define JSON_USE_NULLREF 1
#endif
/// If defined, indicates that the source file is amalgamated
/// to prevent private header inclusion.
/// Remarks: it is automatically defined in the generated amalgated header.
/// Remarks: it is automatically defined in the generated amalgamated header.
// #define JSON_IS_AMALGAMATION
#ifdef JSON_IN_CPPTL
#include <cpptl/config.h>
#ifndef JSON_USE_CPPTL
#define JSON_USE_CPPTL 1
#endif
#endif
#ifdef JSON_IN_CPPTL
#define JSON_API CPPTL_API
#elif defined(JSON_DLL_BUILD)
#if defined(_MSC_VER)
// Export macros for DLL visibility
#if defined(JSON_DLL_BUILD)
#if defined(_MSC_VER) || defined(__MINGW32__)
#define JSON_API __declspec(dllexport)
#define JSONCPP_DISABLE_DLL_INTERFACE_WARNING
#elif defined(__GNUC__) || defined(__clang__)
#define JSON_API __attribute__((visibility("default")))
#endif // if defined(_MSC_VER)
#elif defined(JSON_DLL)
#if defined(_MSC_VER)
#if defined(_MSC_VER) || defined(__MINGW32__)
#define JSON_API __declspec(dllimport)
#define JSONCPP_DISABLE_DLL_INTERFACE_WARNING
#endif // if defined(_MSC_VER)
#endif // ifdef JSON_IN_CPPTL
#endif // ifdef JSON_DLL_BUILD
#if !defined(JSON_API)
#define JSON_API
#endif
#if !defined(JSON_HAS_UNIQUE_PTR)
#if __cplusplus >= 201103L
#define JSON_HAS_UNIQUE_PTR (1)
#elif _MSC_VER >= 1600
#define JSON_HAS_UNIQUE_PTR (1)
#else
#define JSON_HAS_UNIQUE_PTR (0)
#if defined(_MSC_VER) && _MSC_VER < 1800
#error \
"ERROR: Visual Studio 12 (2013) with _MSC_VER=1800 is the oldest supported compiler with sufficient C++11 capabilities"
#endif
#if defined(_MSC_VER) && _MSC_VER < 1900
// As recommended at
// https://stackoverflow.com/questions/2915672/snprintf-and-visual-studio-2010
extern JSON_API int msvc_pre1900_c99_snprintf(char* outBuf, size_t size,
const char* format, ...);
#define jsoncpp_snprintf msvc_pre1900_c99_snprintf
#else
#define jsoncpp_snprintf std::snprintf
#endif
// If JSON_NO_INT64 is defined, then Json only support C++ "int" type for
@ -151,55 +155,96 @@ license you like.
// Storages, and 64 bits integer support is disabled.
// #define JSON_NO_INT64 1
#if defined(_MSC_VER) && _MSC_VER <= 1200 // MSVC 6
// Microsoft Visual Studio 6 only support conversion from __int64 to double
// (no conversion from unsigned __int64).
#define JSON_USE_INT64_DOUBLE_CONVERSION 1
// Disable warning 4786 for VS6 caused by STL (identifier was truncated to '255'
// characters in the debug information)
// All projects I've ever seen with VS6 were using this globally (not bothering
// with pragma push/pop).
#pragma warning(disable : 4786)
#endif // if defined(_MSC_VER) && _MSC_VER < 1200 // MSVC 6
// JSONCPP_OVERRIDE is maintained for backwards compatibility of external tools.
// C++11 should be used directly in JSONCPP.
#define JSONCPP_OVERRIDE override
#if defined(_MSC_VER) && _MSC_VER >= 1500 // MSVC 2008
/// Indicates that the following function is deprecated.
#if __cplusplus >= 201103L
#define JSONCPP_NOEXCEPT noexcept
#define JSONCPP_OP_EXPLICIT explicit
#elif defined(_MSC_VER) && _MSC_VER < 1900
#define JSONCPP_NOEXCEPT throw()
#define JSONCPP_OP_EXPLICIT explicit
#elif defined(_MSC_VER) && _MSC_VER >= 1900
#define JSONCPP_NOEXCEPT noexcept
#define JSONCPP_OP_EXPLICIT explicit
#else
#define JSONCPP_NOEXCEPT throw()
#define JSONCPP_OP_EXPLICIT
#endif
#ifdef __clang__
#if __has_extension(attribute_deprecated_with_message)
#define JSONCPP_DEPRECATED(message) __attribute__((deprecated(message)))
#endif
#elif defined(__GNUC__) // not clang (gcc comes later since clang emulates gcc)
#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 5))
#define JSONCPP_DEPRECATED(message) __attribute__((deprecated(message)))
#elif (__GNUC__ > 3 || (__GNUC__ == 3 && __GNUC_MINOR__ >= 1))
#define JSONCPP_DEPRECATED(message) __attribute__((__deprecated__))
#endif // GNUC version
#elif defined(_MSC_VER) // MSVC (after clang because clang on Windows emulates
// MSVC)
#define JSONCPP_DEPRECATED(message) __declspec(deprecated(message))
#elif defined(__clang__) && defined(__has_feature)
#if __has_feature(attribute_deprecated_with_message)
#define JSONCPP_DEPRECATED(message) __attribute__ ((deprecated(message)))
#endif
#elif defined(__GNUC__) && (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 5))
#define JSONCPP_DEPRECATED(message) __attribute__ ((deprecated(message)))
#elif defined(__GNUC__) && (__GNUC__ > 3 || (__GNUC__ == 3 && __GNUC_MINOR__ >= 1))
#define JSONCPP_DEPRECATED(message) __attribute__((__deprecated__))
#endif
#endif // __clang__ || __GNUC__ || _MSC_VER
#if !defined(JSONCPP_DEPRECATED)
#define JSONCPP_DEPRECATED(message)
#endif // if !defined(JSONCPP_DEPRECATED)
#if defined(__clang__) || (defined(__GNUC__) && (__GNUC__ >= 6))
#define JSON_USE_INT64_DOUBLE_CONVERSION 1
#endif
#if !defined(JSON_IS_AMALGAMATION)
#include "allocator.h"
#include "version.h"
#endif // if !defined(JSON_IS_AMALGAMATION)
namespace Json {
typedef int Int;
typedef unsigned int UInt;
using Int = int;
using UInt = unsigned int;
#if defined(JSON_NO_INT64)
typedef int LargestInt;
typedef unsigned int LargestUInt;
using LargestInt = int;
using LargestUInt = unsigned int;
#undef JSON_HAS_INT64
#else // if defined(JSON_NO_INT64)
// For Microsoft Visual use specific types as long long is not supported
#if defined(_MSC_VER) // Microsoft Visual Studio
typedef __int64 Int64;
typedef unsigned __int64 UInt64;
using Int64 = __int64;
using UInt64 = unsigned __int64;
#else // if defined(_MSC_VER) // Other platforms, use long long
typedef long long int Int64;
typedef unsigned long long int UInt64;
#endif // if defined(_MSC_VER)
typedef Int64 LargestInt;
typedef UInt64 LargestUInt;
using Int64 = int64_t;
using UInt64 = uint64_t;
#endif // if defined(_MSC_VER)
using LargestInt = Int64;
using LargestUInt = UInt64;
#define JSON_HAS_INT64
#endif // if defined(JSON_NO_INT64)
} // end namespace Json
template <typename T>
using Allocator =
typename std::conditional<JSONCPP_USING_SECURE_MEMORY, SecureAllocator<T>,
std::allocator<T>>::type;
using String = std::basic_string<char, std::char_traits<char>, Allocator<char>>;
using IStringStream =
std::basic_istringstream<String::value_type, String::traits_type,
String::allocator_type>;
using OStringStream =
std::basic_ostringstream<String::value_type, String::traits_type,
String::allocator_type>;
using IStream = std::istream;
using OStream = std::ostream;
} // namespace Json
// Legacy names (formerly macros).
using JSONCPP_STRING = Json::String;
using JSONCPP_ISTRINGSTREAM = Json::IStringStream;
using JSONCPP_OSTRINGSTREAM = Json::OStringStream;
using JSONCPP_ISTREAM = Json::IStream;
using JSONCPP_OSTREAM = Json::OStream;
#endif // JSON_CONFIG_H_INCLUDED
@ -216,7 +261,7 @@ typedef UInt64 LargestUInt;
// Beginning of content of file: include/json/forwards.h
// //////////////////////////////////////////////////////////////////////
// Copyright 2007-2010 Baptiste Lepilleur
// Copyright 2007-2010 Baptiste Lepilleur and The JsonCpp Authors
// Distributed under MIT license, or public domain if desired and
// recognized in your jurisdiction.
// See file LICENSE for detail or copy at http://jsoncpp.sourceforge.net/LICENSE
@ -231,17 +276,23 @@ typedef UInt64 LargestUInt;
namespace Json {
// writer.h
class StreamWriter;
class StreamWriterBuilder;
class Writer;
class FastWriter;
class StyledWriter;
class StyledStreamWriter;
// reader.h
class Reader;
class CharReader;
class CharReaderBuilder;
// features.h
// json_features.h
class Features;
// value.h
typedef unsigned int ArrayIndex;
using ArrayIndex = unsigned int;
class StaticString;
class Path;
class PathArgument;
@ -262,4 +313,4 @@ class ValueConstIterator;
#endif //ifndef JSON_FORWARD_AMALGATED_H_INCLUDED
#endif //ifndef JSON_FORWARD_AMALGAMATED_H_INCLUDED

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,6 @@
#!/bin/sh
# Used to clean up some non essential folders to make the ws container
rm -rf build
rm -rf third_party/zlib
rm -rf third_party/mbedtls

View File

@ -70,6 +70,8 @@ int main(int argc, char** argv)
std::string password;
std::string prefix("ws.test.v0");
std::string fields;
std::string gauge;
std::string timer;
std::string dsn;
std::string redisHosts("127.0.0.1");
std::string redisPassword;
@ -264,6 +266,8 @@ int main(int argc, char** argv)
cobra2statsd->add_option("--port", statsdPort, "Statsd port");
cobra2statsd->add_option("--prefix", prefix, "Statsd prefix");
cobra2statsd->add_option("--fields", fields, "Extract fields for naming the event")->join();
cobra2statsd->add_option("--gauge", gauge, "Value to extract, and use as a statsd gauge")->join();
cobra2statsd->add_option("--timer", timer, "Value to extract, and use as a statsd timer")->join();
cobra2statsd->add_option("channel", channel, "Channel")->required();
cobra2statsd->add_flag("-v", verbose, "Verbose");
cobra2statsd->add_option("--pidfile", pidfile, "Pid file");
@ -453,20 +457,41 @@ int main(int argc, char** argv)
}
else if (app.got_subcommand("cobra_to_statsd"))
{
bool enableHeartbeat = true;
int runtime = -1;
ix::StatsdClient statsdClient(hostname, statsdPort, prefix);
if (!timer.empty() && !gauge.empty())
{
spdlog::error("--gauge and --timer options are exclusive. " \
"you can only supply one");
ret = 1;
}
else
{
bool enableHeartbeat = true;
int runtime = -1; // run indefinitely
ix::StatsdClient statsdClient(hostname, statsdPort, prefix);
ret = ix::cobra_to_statsd_bot(cobraConfig,
channel,
filter,
position,
statsdClient,
fields,
verbose,
maxQueueSize,
enableHeartbeat,
runtime);
std::string errMsg;
bool initialized = statsdClient.init(errMsg);
if (!initialized)
{
spdlog::error(errMsg);
ret = 1;
}
else
{
ret = ix::cobra_to_statsd_bot(cobraConfig,
channel,
filter,
position,
statsdClient,
fields,
gauge,
timer,
verbose,
maxQueueSize,
enableHeartbeat,
runtime);
}
}
}
else if (app.got_subcommand("cobra_to_sentry"))
{

View File

@ -14,8 +14,19 @@
namespace ix
{
using StreamWriterPtr = std::unique_ptr<Json::StreamWriter>;
StreamWriterPtr makeStreamWriter()
{
Json::StreamWriterBuilder builder;
builder["commentStyle"] = "None";
builder["indentation"] = ""; // will make the JSON object compact
std::unique_ptr<Json::StreamWriter> jsonWriter(builder.newStreamWriter());
return jsonWriter;
}
void writeToStdout(bool fluentd,
Json::FastWriter& jsonWriter,
const StreamWriterPtr& jsonWriter,
const Json::Value& msg,
const std::string& position)
{
@ -29,12 +40,15 @@ namespace ix
msgWithPosition["position"] = position;
enveloppe["message"] = msgWithPosition;
std::cout << jsonWriter.write(enveloppe);
jsonWriter->write(enveloppe, &std::cout);
std::cout << std::endl; // add lf and flush
}
else
{
enveloppe = msg;
std::cout << position << " " << jsonWriter.write(enveloppe);
std::cout << position << " ";
jsonWriter->write(enveloppe, &std::cout);
std::cout << std::endl;
}
}
@ -49,14 +63,13 @@ namespace ix
conn.configure(config);
conn.connect();
Json::FastWriter jsonWriter;
// Display incoming messages
std::atomic<int> msgPerSeconds(0);
std::atomic<int> msgCount(0);
std::atomic<bool> fatalCobraError(false);
auto jsonWriter = makeStreamWriter();
auto timer = [&msgPerSeconds, &msgCount] {
while (true)
auto timer = [&msgPerSeconds, &msgCount, &fatalCobraError] {
while (!fatalCobraError)
{
spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds);
@ -78,11 +91,12 @@ namespace ix
&msgCount,
&msgPerSeconds,
&quiet,
&fluentd](ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId) {
&fluentd,
&fatalCobraError](ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId) {
if (eventType == ix::CobraConnection_EventType_Open)
{
spdlog::info("Subscriber connected");
@ -137,14 +151,32 @@ namespace ix
{
spdlog::info("Received websocket pong");
}
else if (eventType == ix::CobraConnection_EventType_Handshake_Error)
{
spdlog::error("Subscriber: Handshake error: {}", errMsg);
fatalCobraError = true;
}
else if (eventType == ix::CobraConnection_EventType_Authentication_Error)
{
spdlog::error("Subscriber: Authentication error: {}", errMsg);
fatalCobraError = true;
}
else if (eventType == ix::CobraConnection_EventType_Subscription_Error)
{
spdlog::error("Subscriber: Subscription error: {}", errMsg);
fatalCobraError = true;
}
});
while (true)
while (!fatalCobraError)
{
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
return 0;
conn.disconnect();
t.join();
return fatalCobraError ? 1 : 0;
}
} // namespace ix