Compare commits

..

38 Commits

Author SHA1 Message Date
Benjamin Sergeant
27dabaaf86 tsan openssl mac ci 2020-03-26 16:38:41 -07:00
Benjamin Sergeant
dffa759f71 move IXBench code under ixwebsocker folder 2020-03-24 20:53:25 -07:00
Benjamin Sergeant
61e789d6a4 formatting 2020-03-24 20:37:55 -07:00
Benjamin Sergeant
37cb2cc266 (ws connect) display statistics about how much time it takes to stop the connection / cf #168 2020-03-24 20:29:09 -07:00
Benjamin Sergeant
179e17895d unique_ptr for sockets 2020-03-24 12:48:55 -07:00
Benjamin Sergeant
9f818c7acf (socket) selectInterrupt member is an unique_ptr instead of being a shared_ptr 2020-03-24 10:00:41 -07:00
Benjamin Sergeant
9dcc2538ae (websocket) reset per-message deflate codec everytime we connect to a server/client 2020-03-23 18:46:30 -07:00
Benjamin Sergeant
f41a54186c (websocket) fix #167, a long standing issue with sending empty messages with per-message deflate extension (and hopefully other zlib bug) 2020-03-23 15:21:53 -07:00
Benjamin Sergeant
e0733d205c fix linux linker error 2020-03-22 21:59:30 -07:00
Benjamin Sergeant
f72f845ad2 trim headers and unused code in IXUdpSocket 2020-03-22 21:51:41 -07:00
Benjamin Sergeant
b7e7837d76 fix simple compile error 2020-03-22 19:43:43 -07:00
Benjamin Sergeant
fe966b19c7 re-enable unittests 2020-03-22 19:39:28 -07:00
Benjamin Sergeant
a0ffb2ba53 cobra to statsd bot ported to windows + add unittest 2020-03-22 19:37:04 -07:00
Benjamin Sergeant
5ad54a8904 pre-commit / fix simple file trailing things 2020-03-21 19:31:38 -07:00
Benjamin Sergeant
10e132e8ef remove std::cerr in IXRedisServer which triggers a tsan error 2020-03-20 17:50:08 -07:00
Benjamin Sergeant
5ce846f48b indent files 2020-03-20 17:00:18 -07:00
Benjamin Sergeant
1d6373335c (websocket+tls) fix hang in tls handshake which could lead to ANR, discovered through unittesting. 2020-03-20 16:57:27 -07:00
Benjamin Sergeant
829751b7af (cobra) CobraMetricsPublisher can be configure with an ix::CobraConfig + more unittest use SSL in server + client 2020-03-20 12:22:00 -07:00
Benjamin Sergeant
5691b55967 (unittest) / try to run the cobra 2 sentry bot test with SSL if the platform supports it 2020-03-19 18:50:46 -07:00
Benjamin Sergeant
575bceb1ec add make target for ubsan, tsan and asan, and enable running the unittest on mac with tsan 2020-03-18 20:53:54 -07:00
Benjamin Sergeant
6085839ef7 minor refactoring 2020-03-18 11:45:28 -07:00
Benjamin Sergeant
696d802703 bump version 2020-03-18 01:15:15 -07:00
Benjamin Sergeant
b287730c19 Simplify ping/pong based heartbeat implementation 2020-03-18 01:14:08 -07:00
Benjamin Sergeant
d6f534de06 (ws) ws echo_server gains a new option (-p) to disable responding to pings with pongs 2020-03-18 00:01:57 -07:00
Benjamin Sergeant
8ec515f292 (ws) ws connect gains a new option to set the interval at which to send pings 2020-03-17 23:54:32 -07:00
Benjamin Sergeant
c6204f4d90 tweak mkdoc action 2020-03-17 10:47:06 -07:00
Benjamin Sergeant
7dfad9c0cc more docs 2020-03-17 10:41:20 -07:00
Benjamin Sergeant
21fac0be6c Create mkdocs.yaml github action from the web-ui 2020-03-17 10:22:58 -07:00
Benjamin Sergeant
0bddf5e096 remove workflow created manually 2020-03-17 10:21:51 -07:00
Benjamin Sergeant
946a8231e0 add ci to build documentation 2020-03-17 10:14:41 -07:00
Benjamin Sergeant
49d1e8493a update build badge on doc 2020-03-17 10:09:41 -07:00
Benjamin Sergeant
6198657dd6 do not trigger unittest when the docs is changed 2020-03-17 08:54:31 -07:00
Benjamin Sergeant
385d6f5f4a (doc) move tls options to its own section 2020-03-17 08:49:42 -07:00
Benjamin Sergeant
3919153a7b close #164 / add reference to tls option which must be set to true in server mode 2020-03-17 08:45:39 -07:00
Benjamin Sergeant
e8f81776f9 (cobra to sentry bot + docker) default docker file uses mbedtls + ws cobra_to_sentry pass tls options to sentryClient. 2020-03-16 10:05:21 -07:00
Benjamin Sergeant
0bb5462504 Feature/ci windows (#163)
* win only

* disable ixcrypto mbedtls search on windows

* ws cmakefile do not search for openssl

* ci builds files on top of cmaking

* ci builds files on top of cmaking / syntax tweak

* use gha-setup-vsdevenv syntax

* build fix and hacks

* try to run unittest on win

* try to run unittest on win (syntax error)

* unittest wip

* wip

* wip again

* wip again (working-directory)

* cleanup

* dumb compile error
2020-03-15 18:38:09 -07:00
Benjamin Sergeant
44f599747e (cobra client) ws cobra subscribe resubscribe at latest position after being disconnected 2020-03-13 17:30:31 -07:00
Benjamin Sergeant
9801ebdb36 (cobra client) can subscribe with a position 2020-03-13 16:06:13 -07:00
108 changed files with 1450 additions and 1417 deletions

View File

@@ -1,51 +1,44 @@
name: unittest
on:
push:
paths-ignore:
- 'docs/**'
on: [push]
# fake comment to trigger an action 1
jobs:
linux:
runs-on: ubuntu-latest
# linux:
# runs-on: ubuntu-latest
# steps:
# - uses: actions/checkout@v1
# - name: make test
# run: make test
steps:
- uses: actions/checkout@v1
- name: make test
run: make test
mac:
mac_openssl_tsan:
runs-on: macOS-latest
steps:
- uses: actions/checkout@v1
- name: install openssl
run: brew install openssl
- name: make test
run: make test
run: make test_tsan_openssl
# We don't need to have redis running anymore, as we have our fake limited one
# - name: install redis
# run: brew install redis
#
# - name: start redis server
# run: brew services start redis
# tsan:
# runs-on: macOS-latest
# steps:
# - uses: actions/checkout@v1
# - name: make test_tsan
# run: make test_tsan
# # Windows does not work yet, I'm stuck at getting CMake to run + finding vcpkg
# win:
# runs-on: windows-2016
#
# steps:
# - uses: actions/checkout@v1
#
# - name: run cmake
# run: |
# "C:\Program Files (x86)\Microsoft Visual Studio\2017\Community\VC\Auxiliary\Build\vcvars64.bat"
# mkdir build
# cd build
# cmake -DCMAKE_TOOLCHAIN_FILE=%VCPKG_INSTALLATION_ROOT%\scripts\buildsystems\vcpkg.cmake -DUSE_WS=1 -DUSE_TEST=1 -DUSE_TLS=1 -G"NMake Makefiles" ..
# - name: build
# run: |
# "C:\Program Files (x86)\Microsoft Visual Studio\2017\Community\VC\Auxiliary\Build\vcvars64.bat"
# cd build
# nmake
# - name: run tests
# run:
# cd test
# ..\build\test\ixwebsocket_unittest.exe
# win:
# runs-on: windows-latest
# steps:
# - uses: actions/checkout@v1
# - uses: seanmiddleditch/gha-setup-vsdevenv@master
# - run: |
# mkdir build
# cd build
# cmake -DCMAKE_CXX_COMPILER=cl.exe -DUSE_WS=1 -DUSE_TEST=1 ..
# - run: cmake --build build
# # Running the unittest does not work
# #- run: ../build/test/ixwebsocket_unittest.exe
# # working-directory: test

23
.github/workflows/mkdocs.yml vendored Normal file
View File

@@ -0,0 +1,23 @@
name: mkdocs
on:
push:
paths:
- 'docs/**'
jobs:
linux:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Python 3.8
uses: actions/setup-python@v1
with:
python-version: 3.8
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install mkdocs
- name: Build doc
run: |
git pull
mkdocs gh-deploy

View File

@@ -1,59 +0,0 @@
language: bash
# See https://github.com/amaiorano/vectrexy/blob/master/.travis.yml
# for ideas on installing vcpkg
matrix:
include:
# macOS
# - os: osx
# env:
# - HOMEBREW_NO_AUTO_UPDATE=1
# compiler: clang
# script:
# - brew install redis
# - brew services start redis
# - brew install mbedtls
# - python test/run.py
# - make ws
Linux
- os: linux
dist: bionic
before_install:
- sudo apt-get install -y libmbedtls-dev
- sudo apt-get install -y redis-server
script:
- python test/run.py
# - make ws
env:
- CC=gcc
- CXX=g++
# Clang + Linux disabled for now
# - os: linux
# dist: xenial
# script: python test/run.py
# env:
# - CC=clang
# - CXX=clang++
# Windows
# - os: windows
# env:
# - CMAKE_PATH="/c/Program Files/CMake/bin"
# script:
# - cd third_party/zlib
# - cmake .
# - cmake --build . --target install
# - cd ../..
# # - cd third_party/mbedtls
# # - cmake .
# # - cmake --build . --target install
# # - cd ../..
# - export PATH=$CMAKE_PATH:$PATH
# - cd test
# - cmake .
# - cmake --build --parallel .
# - ixwebsocket_unittest.exe
# # - python test/run.py

View File

@@ -21,6 +21,7 @@ if ("${CMAKE_CXX_COMPILER_ID}" MATCHES "Clang")
endif()
set( IXWEBSOCKET_SOURCES
ixwebsocket/IXBench.cpp
ixwebsocket/IXCancellationRequest.cpp
ixwebsocket/IXConnectionState.cpp
ixwebsocket/IXDNSLookup.cpp
@@ -36,6 +37,7 @@ set( IXWEBSOCKET_SOURCES
ixwebsocket/IXSocketFactory.cpp
ixwebsocket/IXSocketServer.cpp
ixwebsocket/IXSocketTLSOptions.cpp
ixwebsocket/IXUdpSocket.cpp
ixwebsocket/IXUrlParser.cpp
ixwebsocket/IXUserAgent.cpp
ixwebsocket/IXWebSocket.cpp
@@ -52,6 +54,7 @@ set( IXWEBSOCKET_SOURCES
)
set( IXWEBSOCKET_HEADERS
ixwebsocket/IXBench.h
ixwebsocket/IXCancellationRequest.h
ixwebsocket/IXConnectionState.h
ixwebsocket/IXDNSLookup.h
@@ -69,6 +72,7 @@ set( IXWEBSOCKET_HEADERS
ixwebsocket/IXSocketFactory.h
ixwebsocket/IXSocketServer.h
ixwebsocket/IXSocketTLSOptions.h
ixwebsocket/IXUdpSocket.h
ixwebsocket/IXUrlParser.h
ixwebsocket/IXUtf8Validator.h
ixwebsocket/IXUserAgent.h

View File

@@ -2,6 +2,9 @@ FROM centos:8 as build
RUN yum install -y gcc-c++ make cmake zlib-devel openssl-devel redhat-rpm-config
RUN yum install -y epel-release
RUN yum install -y mbedtls-devel
RUN groupadd app && useradd -g app app
RUN chown -R app:app /opt
RUN chown -R app:app /usr/local
@@ -12,13 +15,16 @@ COPY --chown=app:app . /opt
WORKDIR /opt
USER app
RUN [ "make", "ws_install" ]
RUN [ "make", "ws_mbedtls_install" ]
RUN [ "rm", "-rf", "build" ]
FROM centos:8 as runtime
RUN yum install -y gdb strace
RUN yum install -y epel-release
RUN yum install -y mbedtls
RUN groupadd app && useradd -g app app
COPY --chown=app:app --from=build /usr/local/bin/ws /usr/local/bin/ws
RUN chmod +x /usr/local/bin/ws

View File

@@ -1,6 +1,88 @@
# Changelog
All changes to this project will be documented in this file.
## [9.0.3] - 2020-03-24
(ws connect) display statistics about how much time it takes to stop the connection
## [9.0.2] - 2020-03-24
(socket) works with unique_ptr<Socket> instead of shared_ptr<Socket> in many places
## [9.0.1] - 2020-03-24
(socket) selectInterrupt member is an unique_ptr instead of being a shared_ptr
## [9.0.0] - 2020-03-23
(websocket) reset per-message deflate codec everytime we connect to a server/client
## [8.3.4] - 2020-03-23
(websocket) fix #167, a long standing issue with sending empty messages with per-message deflate extension (and hopefully other zlib bug)
## [8.3.3] - 2020-03-22
(cobra to statsd) port to windows and add a unittest
## [8.3.2] - 2020-03-20
(websocket+tls) fix hang in tls handshake which could lead to ANR, discovered through unittesting.
## [8.3.1] - 2020-03-20
(cobra) CobraMetricsPublisher can be configure with an ix::CobraConfig + more unittest use SSL in server + client
## [8.3.0] - 2020-03-18
(websocket) Simplify ping/pong based heartbeat implementation
## [8.2.7] - 2020-03-17
(ws) ws connect gains a new option to set the interval at which to send pings
(ws) ws echo_server gains a new option (-p) to disable responding to pings with pongs
```
IXWebSocket$ ws connect --ping_interval 2 wss://echo.websocket.org
Type Ctrl-D to exit prompt...
Connecting to url: wss://echo.websocket.org
> ws_connect: connected
[2020-03-17 23:53:02.726] [info] Uri: /
[2020-03-17 23:53:02.726] [info] Headers:
[2020-03-17 23:53:02.727] [info] Connection: Upgrade
[2020-03-17 23:53:02.727] [info] Date: Wed, 18 Mar 2020 06:45:05 GMT
[2020-03-17 23:53:02.727] [info] Sec-WebSocket-Accept: 0gtqbxW0aVL/QI/ICpLFnRaiKgA=
[2020-03-17 23:53:02.727] [info] sec-websocket-extensions:
[2020-03-17 23:53:02.727] [info] Server: Kaazing Gateway
[2020-03-17 23:53:02.727] [info] Upgrade: websocket
[2020-03-17 23:53:04.894] [info] Received pong
[2020-03-17 23:53:06.859] [info] Received pong
[2020-03-17 23:53:08.881] [info] Received pong
[2020-03-17 23:53:10.848] [info] Received pong
[2020-03-17 23:53:12.898] [info] Received pong
[2020-03-17 23:53:14.865] [info] Received pong
[2020-03-17 23:53:16.890] [info] Received pong
[2020-03-17 23:53:18.853] [info] Received pong
[2020-03-17 23:53:19.388] [info]
ws_connect: connection closed: code 1000 reason Normal closure
[2020-03-17 23:53:19.502] [info] Received 208 bytes
[2020-03-17 23:53:19.502] [info] Sent 0 bytes
```
## [8.2.6] - 2020-03-16
(cobra to sentry bot + docker) default docker file uses mbedtls + ws cobra_to_sentry pass tls options to sentryClient.
## [8.2.5] - 2020-03-13
(cobra client) ws cobra subscribe resubscribe at latest position after being disconnected
## [8.2.4] - 2020-03-13
(cobra client) can subscribe with a position
## [8.2.3] - 2020-03-13
(cobra client) pass the message position to the subscription data callback

View File

@@ -73,5 +73,3 @@ Here is a simplistic diagram which explains how the code is structured in term o
| |
+-----------------------+
```

View File

@@ -1,4 +1,4 @@
![Alt text](https://travis-ci.org/machinezone/IXWebSocket.svg?branch=master)
![Build status](https://github.com/machinezone/IXWebSocket/workflows/unittest/badge.svg)
## Introduction
@@ -44,7 +44,22 @@ webSocket.send("hello world");
There are 2 main reasons that explain why IXWebSocket got written. First, we needed a C++ cross-platform client library, which should have few dependencies. What looked like the most solid one, [websocketpp](https://github.com/zaphoyd/websocketpp) did depend on boost and this was not an option for us. Secondly, there were other available libraries with fewer dependencies (C ones), but they required calling an explicit poll routine periodically to know if a client had received data from a server, which was not elegant.
We started by solving those 2 problems, then we added server websocket code, then an HTTP client, and finally a very simple HTTP server.
We started by solving those 2 problems, then we added server websocket code, then an HTTP client, and finally a very simple HTTP server. IXWebSocket comes with a command line utility named ws which is quite handy, and is now packaged with alpine linux. You can install it with `apk add ws`.
* Few dependencies (only zlib)
* Simple to use ; uses std::string and std::function callbacks.
* Complete support of the websocket protocol, and basic http support.
* Client and Server
* TLS support
## Alternative libraries
There are plenty of great websocket libraries out there, which might work for you. Here are a couple of serious ones.
* [websocketpp](https://github.com/zaphoyd/websocketpp) - C++
* [beast](https://github.com/boostorg/beast) - C++
* [libwebsockets](https://libwebsockets.org/) - C
* [µWebSockets](https://github.com/uNetworking/uWebSockets) - C
## Contributing

View File

@@ -244,39 +244,6 @@ webSocket.setMaxWaitBetweenReconnectionRetries(5 * 1000); // 5000ms = 5s
uint32_t m = webSocket.getMaxWaitBetweenReconnectionRetries();
```
### TLS support and configuration
To leverage TLS features, the library must be compiled with the option `USE_TLS=1`.
Then, secure sockets are automatically used when connecting to a `wss://*` url.
Additional TLS options can be configured by passing a `ix::SocketTLSOptions` instance to the
`setTLSOptions` on `ix::WebSocket` (or `ix::WebSocketServer` or `ix::HttpServer`)
```cpp
webSocket.setTLSOptions({
.certFile = "path/to/cert/file.pem",
.keyFile = "path/to/key/file.pem",
.caFile = "path/to/trust/bundle/file.pem"
});
```
Specifying `certFile` and `keyFile` configures the certificate that will be used to communicate with TLS peers.
On a client, this is only necessary for connecting to servers that require a client certificate.
On a server, this is necessary for TLS support.
Specifying `caFile` configures the trusted roots bundle file (in PEM format) that will be used to verify peer certificates.
- The special value of `SYSTEM` (the default) indicates that the system-configured trust bundle should be used; this is generally what you want when connecting to any publicly exposed API/server.
- The special value of `NONE` can be used to disable peer verification; this is only recommended to rule out certificate verification when testing connectivity.
For a client, specifying `caFile` can be used if connecting to a server that uses a self-signed cert, or when using a custom CA in an internal environment.
For a server, specifying `caFile` implies that:
1. You require clients to present a certificate
1. It must be signed by one of the trusted roots in the file
## WebSocket server API
```cpp
@@ -464,3 +431,37 @@ setOnConnectionCallback(
content);
}
```
## TLS support and configuration
To leverage TLS features, the library must be compiled with the option `USE_TLS=1`.
Then, secure sockets are automatically used when connecting to a `wss://*` url.
Additional TLS options can be configured by passing a `ix::SocketTLSOptions` instance to the
`setTLSOptions` on `ix::WebSocket` (or `ix::WebSocketServer` or `ix::HttpServer`)
```cpp
webSocket.setTLSOptions({
.certFile = "path/to/cert/file.pem",
.keyFile = "path/to/key/file.pem",
.caFile = "path/to/trust/bundle/file.pem",
.tls = true // required in server mode
});
```
Specifying `certFile` and `keyFile` configures the certificate that will be used to communicate with TLS peers.
On a client, this is only necessary for connecting to servers that require a client certificate.
On a server, this is necessary for TLS support.
Specifying `caFile` configures the trusted roots bundle file (in PEM format) that will be used to verify peer certificates.
- The special value of `SYSTEM` (the default) indicates that the system-configured trust bundle should be used; this is generally what you want when connecting to any publicly exposed API/server.
- The special value of `NONE` can be used to disable peer verification; this is only recommended to rule out certificate verification when testing connectivity.
For a client, specifying `caFile` can be used if connecting to a server that uses a self-signed cert, or when using a custom CA in an internal environment.
For a server, specifying `caFile` implies that:
1. You require clients to present a certificate
1. It must be signed by one of the trusted roots in the file

View File

@@ -7,12 +7,14 @@ set (IXBOTS_SOURCES
ixbots/IXCobraToSentryBot.cpp
ixbots/IXCobraToStatsdBot.cpp
ixbots/IXQueueManager.cpp
ixbots/IXStatsdClient.cpp
)
set (IXBOTS_HEADERS
ixbots/IXCobraToSentryBot.h
ixbots/IXCobraToStatsdBot.h
ixbots/IXQueueManager.h
ixbots/IXStatsdClient.h
)
add_library(ixbots STATIC
@@ -30,16 +32,14 @@ if (NOT SPDLOG_FOUND)
set(SPDLOG_INCLUDE_DIRS ../third_party/spdlog/include)
endif()
set(STATSD_CLIENT_INCLUDE_DIRS ../third_party/statsd-client-cpp/src)
set(IXBOTS_INCLUDE_DIRS
.
..
../ixcore
../ixwebsocket
../ixcobra
../ixsentry
${JSONCPP_INCLUDE_DIRS}
${SPDLOG_INCLUDE_DIRS}
${STATSD_CLIENT_INCLUDE_DIRS})
${SPDLOG_INCLUDE_DIRS})
target_include_directories( ixbots PUBLIC ${IXBOTS_INCLUDE_DIRS} )

View File

@@ -19,6 +19,7 @@ namespace ix
int cobra_to_sentry_bot(const CobraConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& position,
SentryClient& sentryClient,
bool verbose,
bool strict,
@@ -37,7 +38,7 @@ namespace ix
std::atomic<bool> stop(false);
std::atomic<bool> throttled(false);
QueueManager queueManager(maxQueueSize, stop);
QueueManager queueManager(maxQueueSize);
auto timer = [&sentCount, &receivedCount, &stop] {
while (!stop)
@@ -173,6 +174,7 @@ namespace ix
conn.setEventCallback([&conn,
&channel,
&filter,
&position,
&jsonWriter,
verbose,
&throttled,
@@ -200,6 +202,7 @@ namespace ix
spdlog::info("Subscriber authenticated");
conn.subscribe(channel,
filter,
position,
[&jsonWriter, verbose, &throttled, &receivedCount, &queueManager](
const Json::Value& msg, const std::string& position) {
if (verbose)

View File

@@ -1,7 +1,7 @@
/*
* IXCobraToSentryBot.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
* Copyright (c) 2019-2020 Machine Zone, Inc. All rights reserved.
*/
#pragma once
@@ -14,6 +14,7 @@ namespace ix
int cobra_to_sentry_bot(const CobraConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& position,
SentryClient& sentryClient,
bool verbose,
bool strict,

View File

@@ -6,6 +6,7 @@
#include "IXCobraToStatsdBot.h"
#include "IXQueueManager.h"
#include "IXStatsdClient.h"
#include <atomic>
#include <chrono>
@@ -16,10 +17,6 @@
#include <thread>
#include <vector>
#ifndef _WIN32
#include <statsd_client.h>
#endif
namespace ix
{
// fields are command line argument that can be specified multiple times
@@ -62,11 +59,13 @@ namespace ix
int cobra_to_statsd_bot(const ix::CobraConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& host,
int port,
const std::string& prefix,
const std::string& position,
StatsdClient& statsdClient,
const std::string& fields,
bool verbose)
bool verbose,
size_t maxQueueSize,
bool enableHeartbeat,
int runtime)
{
ix::CobraConnection conn;
conn.configure(config);
@@ -79,11 +78,10 @@ namespace ix
std::atomic<uint64_t> receivedCount(0);
std::atomic<bool> stop(false);
size_t maxQueueSize = 1000;
QueueManager queueManager(maxQueueSize, stop);
QueueManager queueManager(maxQueueSize);
auto timer = [&sentCount, &receivedCount] {
while (true)
auto timer = [&sentCount, &receivedCount, &stop] {
while (!stop)
{
spdlog::info("messages received {} sent {}", receivedCount, sentCount);
@@ -94,9 +92,11 @@ namespace ix
std::thread t1(timer);
auto heartbeat = [&sentCount, &receivedCount] {
auto heartbeat = [&sentCount, &receivedCount, &enableHeartbeat] {
std::string state("na");
if (!enableHeartbeat) return;
while (true)
{
std::stringstream ss;
@@ -119,21 +119,13 @@ namespace ix
std::thread t2(heartbeat);
auto statsdSender = [&queueManager, &host, &port, &sentCount, &tokens, &prefix, &stop] {
// statsd client
// test with netcat as a server: `nc -ul 8125`
bool statsdBatch = true;
#ifndef _WIN32
statsd::StatsdClient statsdClient(host, port, prefix, statsdBatch);
#else
int statsdClient;
#endif
auto statsdSender = [&statsdClient, &queueManager, &sentCount, &tokens, &stop] {
while (true)
{
Json::Value msg = queueManager.pop();
if (msg.isNull()) continue;
if (stop) return;
if (msg.isNull()) continue;
std::string id;
for (auto&& attr : tokens)
@@ -142,18 +134,15 @@ namespace ix
id += extractAttr(attr, msg);
}
sentCount += 1;
#ifndef _WIN32
statsdClient.count(id, 1);
#endif
sentCount += 1;
}
};
std::thread t3(statsdSender);
conn.setEventCallback(
[&conn, &channel, &filter, &jsonWriter, verbose, &queueManager, &receivedCount](
[&conn, &channel, &filter, &position, &jsonWriter, verbose, &queueManager, &receivedCount](
ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
@@ -177,6 +166,7 @@ namespace ix
spdlog::info("Subscriber authenticated");
conn.subscribe(channel,
filter,
position,
[&jsonWriter, &queueManager, verbose, &receivedCount](
const Json::Value& msg, const std::string& position) {
if (verbose)
@@ -212,12 +202,38 @@ namespace ix
}
});
while (true)
// Run forever
if (runtime == -1)
{
std::chrono::duration<double, std::milli> duration(1000);
std::this_thread::sleep_for(duration);
while (true)
{
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
}
// Run for a duration, used by unittesting now
else
{
for (int i = 0 ; i < runtime; ++i)
{
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
}
return 0;
//
// Cleanup.
// join all the bg threads and stop them.
//
conn.disconnect();
stop = true;
t1.join();
if (t2.joinable()) t2.join();
spdlog::info("heartbeat thread done");
t3.join();
return (int) sentCount;
}
} // namespace ix

View File

@@ -1,11 +1,12 @@
/*
* IXCobraToStatsdBot.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
* Copyright (c) 2019-2020 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <ixcobra/IXCobraConfig.h>
#include <ixbots/IXStatsdClient.h>
#include <string>
#include <stddef.h>
@@ -14,9 +15,11 @@ namespace ix
int cobra_to_statsd_bot(const ix::CobraConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& host,
int port,
const std::string& prefix,
const std::string& position,
StatsdClient& statsdClient,
const std::string& fields,
bool verbose);
bool verbose,
size_t maxQueueSize,
bool enableHeartbeat,
int runtime);
} // namespace ix

View File

@@ -7,7 +7,6 @@
#pragma once
#include <stddef.h>
#include <atomic>
#include <json/json.h>
#include <mutex>
#include <condition_variable>
@@ -19,9 +18,8 @@ namespace ix
class QueueManager
{
public:
QueueManager(size_t maxQueueSize, std::atomic<bool>& stop)
QueueManager(size_t maxQueueSize)
: _maxQueueSize(maxQueueSize)
, _stop(stop)
{
}
@@ -33,6 +31,5 @@ namespace ix
std::mutex _mutex;
std::condition_variable _condition;
size_t _maxQueueSize;
std::atomic<bool>& _stop;
};
}

View File

@@ -0,0 +1,157 @@
/*
* Copyright (c) 2014, Rex
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* * Neither the name of the {organization} nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
/*
* IXStatsdClient.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
// Adapted from statsd-client-cpp
// test with netcat as a server: `nc -ul 8125`
#include "IXStatsdClient.h"
#include <ixwebsocket/IXNetSystem.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
namespace ix
{
const uint64_t StatsdClient::_maxQueueSize = 32768;
StatsdClient::StatsdClient(const std::string& host,
int port,
const std::string& prefix)
: _host(host)
, _port(port)
, _prefix(prefix)
, _stop(false)
{
_thread = std::thread([this] {
while (!_stop)
{
std::deque<std::string> stagedQueue;
{
std::lock_guard<std::mutex> lock(_mutex);
_queue.swap(stagedQueue);
}
while (!stagedQueue.empty())
{
auto message = stagedQueue.front();
_socket.sendto(message);
stagedQueue.pop_front();
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
});
}
StatsdClient::~StatsdClient()
{
_stop = true;
if (_thread.joinable()) _thread.join();
_socket.close();
}
bool StatsdClient::init(std::string& errMsg)
{
return _socket.init(_host, _port, errMsg);
}
/* will change the original string */
void StatsdClient::cleanup(std::string& key)
{
size_t pos = key.find_first_of(":|@");
while (pos != std::string::npos)
{
key[pos] = '_';
pos = key.find_first_of(":|@");
}
}
int StatsdClient::dec(const std::string& key)
{
return count(key, -1);
}
int StatsdClient::inc(const std::string& key)
{
return count(key, 1);
}
int StatsdClient::count(const std::string& key, size_t value)
{
return send(key, value, "c");
}
int StatsdClient::gauge(const std::string& key, size_t value)
{
return send(key, value, "g");
}
int StatsdClient::timing(const std::string& key, size_t ms)
{
return send(key, ms, "ms");
}
int StatsdClient::send(std::string key, size_t value, const std::string &type)
{
cleanup(key);
char buf[256];
snprintf(buf, sizeof(buf), "%s%s:%zd|%s",
_prefix.c_str(), key.c_str(), value, type.c_str());
return send(buf);
}
int StatsdClient::send(const std::string &message)
{
std::lock_guard<std::mutex> lock(_mutex);
if (_queue.empty() ||
_queue.back().length() > _maxQueueSize)
{
_queue.push_back(message);
}
else
{
(*_queue.rbegin()).append("\n").append(message);
}
return 0;
}
} // end namespace ix

View File

@@ -0,0 +1,62 @@
/*
* IXStatsdClient.h
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <ixwebsocket/IXUdpSocket.h>
#include <string>
#include <thread>
#include <deque>
#include <mutex>
#include <atomic>
namespace ix
{
class StatsdClient
{
public:
StatsdClient(const std::string& host="127.0.0.1",
int port=8125,
const std::string& prefix = "");
~StatsdClient();
bool init(std::string& errMsg);
int inc(const std::string& key);
int dec(const std::string& key);
int count(const std::string& key, size_t value);
int gauge(const std::string& key, size_t value);
int timing(const std::string& key, size_t ms);
private:
/**
* (Low Level Api) manually send a message
* which might be composed of several lines.
*/
int send(const std::string& message);
/* (Low Level Api) manually send a message
* type = "c", "g" or "ms"
*/
int send(std::string key, size_t value, const std::string& type);
void cleanup(std::string& key);
UdpSocket _socket;
std::string _host;
int _port;
std::string _prefix;
std::atomic<bool> _stop;
std::thread _thread;
std::mutex _mutex; // for the queue
std::deque<std::string> _queue;
static const uint64_t _maxQueueSize;
};
} // end namespace ix

View File

@@ -270,10 +270,6 @@ namespace ix
// This should keep the connection open and prevent some load balancers such as
// the Amazon one from shutting it down
_webSocket->setPingInterval(kPingIntervalSecs);
// If we don't receive a pong back, declare loss after 3 * N seconds
// (will be 90s now), and close and restart the connection
_webSocket->setPingTimeout(3 * kPingIntervalSecs);
}
void CobraConnection::configure(const ix::CobraConfig& config)
@@ -565,6 +561,7 @@ namespace ix
void CobraConnection::subscribe(const std::string& channel,
const std::string& filter,
const std::string& position,
SubscriptionCallback cb)
{
// Create and send a subscribe pdu
@@ -576,6 +573,11 @@ namespace ix
body["filter"] = filter;
}
if (!position.empty())
{
body["position"] = position;
}
Json::Value pdu;
pdu["action"] = "rtm/subscribe";
pdu["body"] = body;

View File

@@ -19,6 +19,10 @@
#include "IXCobraConfig.h"
#ifdef max
#undef max
#endif
namespace ix
{
class WebSocket;
@@ -98,6 +102,7 @@ namespace ix
// message arrives.
void subscribe(const std::string& channel,
const std::string& filter = std::string(),
const std::string& position = std::string(),
SubscriptionCallback cb = nullptr);
/// Unsubscribe from a channel

View File

@@ -27,20 +27,12 @@ namespace ix
;
}
void CobraMetricsPublisher::configure(const std::string& appkey,
const std::string& endpoint,
const std::string& channel,
const std::string& rolename,
const std::string& rolesecret,
bool enablePerMessageDeflate,
const SocketTLSOptions& socketTLSOptions)
void CobraMetricsPublisher::configure(const CobraConfig& config,
const std::string& channel)
{
// Configure the satori connection and start its publish background thread
_cobra_metrics_theaded_publisher.configure(config, channel);
_cobra_metrics_theaded_publisher.start();
_cobra_metrics_theaded_publisher.configure(appkey, endpoint, channel,
rolename, rolesecret,
enablePerMessageDeflate, socketTLSOptions);
}
Json::Value& CobraMetricsPublisher::getGenericAttributes()

View File

@@ -40,13 +40,8 @@ namespace ix
/// Configuration / set keys, etc...
/// All input data but the channel name is encrypted with rc4
void configure(const std::string& appkey,
const std::string& endpoint,
const std::string& channel,
const std::string& rolename,
const std::string& rolesecret,
bool enablePerMessageDeflate,
const SocketTLSOptions& socketTLSOptions);
void configure(const CobraConfig& config,
const std::string& channel);
/// Setter for the list of blacklisted metrics ids.
/// That list is sorted internally for fast lookups

View File

@@ -92,22 +92,14 @@ namespace ix
_thread = std::thread(&CobraMetricsThreadedPublisher::run, this);
}
void CobraMetricsThreadedPublisher::configure(const std::string& appkey,
const std::string& endpoint,
const std::string& channel,
const std::string& rolename,
const std::string& rolesecret,
bool enablePerMessageDeflate,
const SocketTLSOptions& socketTLSOptions)
void CobraMetricsThreadedPublisher::configure(const CobraConfig& config,
const std::string& channel)
{
ix::IXCoreLogger::Log(config.socketTLSOptions.getDescription().c_str());
_channel = channel;
_cobra_connection.configure(config);
ix::IXCoreLogger::Log(socketTLSOptions.getDescription().c_str());
ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(enablePerMessageDeflate);
_cobra_connection.configure(appkey, endpoint,
rolename, rolesecret,
webSocketPerMessageDeflateOptions, socketTLSOptions);
}
void CobraMetricsThreadedPublisher::pushMessage(MessageKind messageKind)

View File

@@ -27,13 +27,8 @@ namespace ix
~CobraMetricsThreadedPublisher();
/// Configuration / set keys, etc...
void configure(const std::string& appkey,
const std::string& endpoint,
const std::string& channel,
const std::string& rolename,
const std::string& rolesecret,
bool enablePerMessageDeflate,
const SocketTLSOptions& socketTLSOptions);
void configure(const CobraConfig& config,
const std::string& channel);
/// Start the worker thread, used for background publishing
void start();

View File

@@ -31,10 +31,6 @@ target_include_directories( ixcrypto PUBLIC ${IXCRYPTO_INCLUDE_DIRS} )
# hmac computation needs a crypto library
if (WIN32)
set(USE_MBED_TLS TRUE)
endif()
target_compile_definitions(ixcrypto PUBLIC IXCRYPTO_USE_TLS)
if (USE_MBED_TLS)
find_package(MbedTLS REQUIRED)
@@ -51,4 +47,3 @@ else()
target_link_libraries(ixcrypto ${OPENSSL_LIBRARIES})
target_compile_definitions(ixcrypto PUBLIC IXCRYPTO_USE_OPEN_SSL)
endif()

View File

@@ -14,7 +14,7 @@
#elif defined(IXCRYPTO_USE_OPEN_SSL)
# include <openssl/hmac.h>
#else
# error "Unsupported configuration"
# include <assert.h>
#endif
namespace ix
@@ -40,7 +40,7 @@ namespace ix
(unsigned char *) data.c_str(), (int) data.size(),
(unsigned char *) hash, nullptr);
#else
# error "Unsupported configuration"
assert(false && "hmac not implemented on this platform");
#endif
std::string hashString(reinterpret_cast<char*>(hash), hashSize);

View File

@@ -9,7 +9,6 @@
#include <cstring>
#include <iomanip>
#include <iostream>
#include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXSocketFactory.h>
#include <ixwebsocket/IXSocketTLSOptions.h>
#include <sstream>

View File

@@ -9,11 +9,12 @@
#include <atomic>
#include <functional>
#include <memory>
#include <string>
#include <ixwebsocket/IXSocket.h>
namespace ix
{
class Socket;
class RedisClient
{
public:
@@ -56,7 +57,7 @@ namespace ix
private:
std::string writeString(const std::string& str);
std::shared_ptr<Socket> _socket;
std::unique_ptr<Socket> _socket;
std::atomic<bool> _stop;
};
} // namespace ix

View File

@@ -11,7 +11,6 @@
#include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXCancellationRequest.h>
#include <fstream>
#include <iostream>
#include <sstream>
#include <vector>
@@ -44,7 +43,7 @@ namespace ix
SocketServer::stop();
}
void RedisServer::handleConnection(std::shared_ptr<Socket> socket,
void RedisServer::handleConnection(std::unique_ptr<Socket> socket,
std::shared_ptr<ConnectionState> connectionState)
{
_connectedClientsCount++;
@@ -103,13 +102,13 @@ namespace ix
_connectedClientsCount--;
}
void RedisServer::cleanupSubscribers(std::shared_ptr<Socket> socket)
void RedisServer::cleanupSubscribers(std::unique_ptr<Socket>& socket)
{
std::lock_guard<std::mutex> lock(_mutex);
for (auto&& it : _subscribers)
{
it.second.erase(socket);
it.second.erase(socket.get());
}
for (auto it : _subscribers)
@@ -146,7 +145,7 @@ namespace ix
}
bool RedisServer::parseRequest(
std::shared_ptr<Socket> socket,
std::unique_ptr<Socket>& socket,
std::vector<std::string>& tokens)
{
// Parse first line
@@ -188,17 +187,11 @@ namespace ix
tokens.push_back(readResult.second);
}
for (auto&& token : tokens)
{
std::cerr << token << " ";
}
std::cerr << std::endl;
return true;
}
bool RedisServer::handleCommand(
std::shared_ptr<Socket> socket,
std::unique_ptr<Socket>& socket,
const std::vector<std::string>& tokens)
{
if (tokens.size() != 1) return false;
@@ -237,7 +230,7 @@ namespace ix
}
bool RedisServer::handleSubscribe(
std::shared_ptr<Socket> socket,
std::unique_ptr<Socket>& socket,
const std::vector<std::string>& tokens)
{
if (tokens.size() != 2) return false;
@@ -252,13 +245,13 @@ namespace ix
socket->writeBytes(":1\r\n", cb);
std::lock_guard<std::mutex> lock(_mutex);
_subscribers[channel].insert(socket);
_subscribers[channel].insert(socket.get());
return true;
}
bool RedisServer::handlePublish(
std::shared_ptr<Socket> socket,
std::unique_ptr<Socket>& socket,
const std::vector<std::string>& tokens)
{
if (tokens.size() != 3) return false;

View File

@@ -37,13 +37,13 @@ namespace ix
// Subscribers
// We could store connection states in there, to add better debugging
// since a connection state has a readable ID
std::map<std::string, std::set<std::shared_ptr<Socket>>> _subscribers;
std::map<std::string, std::set<Socket*>> _subscribers;
std::mutex _mutex;
std::atomic<bool> _stopHandlingConnections;
// Methods
virtual void handleConnection(std::shared_ptr<Socket>,
virtual void handleConnection(std::unique_ptr<Socket>,
std::shared_ptr<ConnectionState> connectionState) final;
virtual size_t getConnectedClientsCount() final;
@@ -51,18 +51,18 @@ namespace ix
std::string writeString(const std::string& str);
bool parseRequest(
std::shared_ptr<Socket> socket,
std::unique_ptr<Socket>& socket,
std::vector<std::string>& tokens);
bool handlePublish(std::shared_ptr<Socket> socket,
bool handlePublish(std::unique_ptr<Socket>& socket,
const std::vector<std::string>& tokens);
bool handleSubscribe(std::shared_ptr<Socket> socket,
bool handleSubscribe(std::unique_ptr<Socket>& socket,
const std::vector<std::string>& tokens);
bool handleCommand(std::shared_ptr<Socket> socket,
bool handleCommand(std::unique_ptr<Socket>& socket,
const std::vector<std::string>& tokens);
void cleanupSubscribers(std::shared_ptr<Socket> socket);
void cleanupSubscribers(std::unique_ptr<Socket>& socket);
};
} // namespace ix

44
ixwebsocket/IXBench.cpp Normal file
View File

@@ -0,0 +1,44 @@
/*
* IXBench.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017-2020 Machine Zone, Inc. All rights reserved.
*/
#include "IXBench.h"
#include <iostream>
namespace ix
{
Bench::Bench(const std::string& description)
: _description(description)
, _start(std::chrono::high_resolution_clock::now())
, _reported(false)
{
;
}
Bench::~Bench()
{
if (!_reported)
{
report();
}
}
void Bench::report()
{
auto now = std::chrono::high_resolution_clock::now();
auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(now - _start);
_ms = milliseconds.count();
std::cerr << _description << " completed in " << _ms << "ms" << std::endl;
_reported = true;
}
uint64_t Bench::getDuration() const
{
return _ms;
}
} // namespace ix

28
ixwebsocket/IXBench.h Normal file
View File

@@ -0,0 +1,28 @@
/*
* IXBench.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2020 Machine Zone, Inc. All rights reserved.
*/
#include <chrono>
#include <stdint.h>
#include <string>
namespace ix
{
class Bench
{
public:
Bench(const std::string& description);
~Bench();
void report();
uint64_t getDuration() const;
private:
std::string _description;
std::chrono::time_point<std::chrono::high_resolution_clock> _start;
uint64_t _ms;
bool _reported;
};
} // namespace ix

View File

@@ -92,7 +92,8 @@ namespace ix
return std::make_tuple(method, requestUri, httpVersion);
}
std::tuple<bool, std::string, HttpRequestPtr> Http::parseRequest(std::shared_ptr<Socket> socket)
std::tuple<bool, std::string, HttpRequestPtr> Http::parseRequest(
std::unique_ptr<Socket>& socket)
{
HttpRequestPtr httpRequest;
@@ -133,7 +134,7 @@ namespace ix
return std::make_tuple(true, "", httpRequest);
}
bool Http::sendResponse(HttpResponsePtr response, std::shared_ptr<Socket> socket)
bool Http::sendResponse(HttpResponsePtr response, std::unique_ptr<Socket>& socket)
{
// Write the response to the socket
std::stringstream ss;

View File

@@ -115,8 +115,8 @@ namespace ix
{
public:
static std::tuple<bool, std::string, HttpRequestPtr> parseRequest(
std::shared_ptr<Socket> socket);
static bool sendResponse(HttpResponsePtr response, std::shared_ptr<Socket> socket);
std::unique_ptr<Socket>& socket);
static bool sendResponse(HttpResponsePtr response, std::unique_ptr<Socket>& socket);
static std::pair<std::string, int> parseStatusLine(const std::string& line);
static std::tuple<std::string, std::string, std::string> parseRequestLine(

View File

@@ -95,7 +95,7 @@ namespace ix
std::atomic<bool> _stop;
std::thread _thread;
std::shared_ptr<Socket> _socket;
std::unique_ptr<Socket> _socket;
std::mutex _mutex; // to protect accessing the _socket (only one socket per client)
SocketTLSOptions _tlsOptions;

View File

@@ -69,7 +69,7 @@ namespace ix
_onConnectionCallback = callback;
}
void HttpServer::handleConnection(std::shared_ptr<Socket> socket,
void HttpServer::handleConnection(std::unique_ptr<Socket> socket,
std::shared_ptr<ConnectionState> connectionState)
{
_connectedClientsCount++;

View File

@@ -43,7 +43,7 @@ namespace ix
std::atomic<int> _connectedClientsCount;
// Methods
virtual void handleConnection(std::shared_ptr<Socket>,
virtual void handleConnection(std::unique_ptr<Socket>,
std::shared_ptr<ConnectionState> connectionState) final;
virtual size_t getConnectedClientsCount() final;

View File

@@ -14,12 +14,12 @@
namespace ix
{
std::shared_ptr<SelectInterrupt> createSelectInterrupt()
SelectInterruptPtr createSelectInterrupt()
{
#if defined(__linux__) || defined(__APPLE__)
return std::make_shared<SelectInterruptPipe>();
return std::make_unique<SelectInterruptPipe>();
#else
return std::make_shared<SelectInterrupt>();
return std::make_unique<SelectInterrupt>();
#endif
}
} // namespace ix

View File

@@ -11,5 +11,6 @@
namespace ix
{
class SelectInterrupt;
std::shared_ptr<SelectInterrupt> createSelectInterrupt();
using SelectInterruptPtr = std::unique_ptr<SelectInterrupt>;
SelectInterruptPtr createSelectInterrupt();
} // namespace ix

View File

@@ -46,7 +46,7 @@ namespace ix
PollResultType Socket::poll(bool readyToRead,
int timeoutMs,
int sockfd,
std::shared_ptr<SelectInterrupt> selectInterrupt)
const SelectInterruptPtr& selectInterrupt)
{
//
// We used to use ::select to poll but on Android 9 we get large fds out of

View File

@@ -38,6 +38,7 @@ typedef SSIZE_T ssize_t;
namespace ix
{
class SelectInterrupt;
using SelectInterruptPtr = std::unique_ptr<SelectInterrupt>;
enum class PollResultType
{
@@ -66,7 +67,7 @@ namespace ix
// Virtual methods
virtual bool accept(std::string& errMsg);
virtual bool connect(const std::string& url,
virtual bool connect(const std::string& host,
int port,
std::string& errMsg,
const CancellationRequest& isCancellationRequested);
@@ -93,7 +94,7 @@ namespace ix
static PollResultType poll(bool readyToRead,
int timeoutMs,
int sockfd,
std::shared_ptr<SelectInterrupt> selectInterrupt = nullptr);
const SelectInterruptPtr& selectInterrupt);
// Used as special codes for pipe communication
@@ -112,6 +113,6 @@ namespace ix
std::vector<uint8_t> _readBuffer;
static constexpr size_t kChunkSize = 1 << 15;
std::shared_ptr<SelectInterrupt> _selectInterrupt;
SelectInterruptPtr _selectInterrupt;
};
} // namespace ix

View File

@@ -164,6 +164,26 @@ namespace ix
return false;
}
OSStatus SocketAppleSSL::tlsHandShake(std::string& errMsg,
const CancellationRequest& isCancellationRequested)
{
OSStatus status;
do
{
status = SSLHandshake(_sslContext);
// Interrupt the handshake
if (isCancellationRequested())
{
errMsg = "Cancellation requested";
return errSSLInternal;
}
} while (status == errSSLWouldBlock || status == errSSLServerAuthCompleted);
return status;
}
// No wait support
bool SocketAppleSSL::connect(const std::string& host,
int port,
@@ -190,26 +210,17 @@ namespace ix
Boolean option(1);
SSLSetSessionOption(_sslContext, kSSLSessionOptionBreakOnServerAuth, option);
do
{
status = SSLHandshake(_sslContext);
} while (status == errSSLWouldBlock || status == errSSLServerAuthCompleted);
status = tlsHandShake(errMsg, isCancellationRequested);
if (status == errSSLServerAuthCompleted)
{
// proceed with the handshake
do
{
status = SSLHandshake(_sslContext);
} while (status == errSSLWouldBlock || status == errSSLServerAuthCompleted);
status = tlsHandShake(errMsg, isCancellationRequested);
}
}
else
{
do
{
status = SSLHandshake(_sslContext);
} while (status == errSSLWouldBlock || status == errSSLServerAuthCompleted);
status = tlsHandShake(errMsg, isCancellationRequested);
}
}

View File

@@ -37,6 +37,9 @@ namespace ix
static OSStatus writeToSocket(SSLConnectionRef connection, const void* data, size_t* len);
static OSStatus readFromSocket(SSLConnectionRef connection, void* data, size_t* len);
OSStatus tlsHandShake(std::string& errMsg,
const CancellationRequest& isCancellationRequested);
SSLContextRef _sslContext;
mutable std::mutex _mutex; // AppleSSL routines are not thread-safe

View File

@@ -8,6 +8,7 @@
#include "IXDNSLookup.h"
#include "IXNetSystem.h"
#include "IXSelectInterrupt.h"
#include "IXSocket.h"
#include <fcntl.h>
#include <string.h>
@@ -64,7 +65,8 @@ namespace ix
int timeoutMs = 10;
bool readyToRead = false;
PollResultType pollResult = Socket::poll(readyToRead, timeoutMs, fd);
auto selectInterrupt = std::make_unique<SelectInterrupt>();
PollResultType pollResult = Socket::poll(readyToRead, timeoutMs, fd, selectInterrupt);
if (pollResult == PollResultType::Timeout)
{

View File

@@ -24,28 +24,28 @@
namespace ix
{
std::shared_ptr<Socket> createSocket(bool tls,
std::unique_ptr<Socket> createSocket(bool tls,
int fd,
std::string& errorMsg,
const SocketTLSOptions& tlsOptions)
{
(void) tlsOptions;
errorMsg.clear();
std::shared_ptr<Socket> socket;
std::unique_ptr<Socket> socket;
if (!tls)
{
socket = std::make_shared<Socket>(fd);
socket = std::make_unique<Socket>(fd);
}
else
{
#ifdef IXWEBSOCKET_USE_TLS
#if defined(IXWEBSOCKET_USE_MBED_TLS)
socket = std::make_shared<SocketMbedTLS>(tlsOptions, fd);
socket = std::make_unique<SocketMbedTLS>(tlsOptions, fd);
#elif defined(IXWEBSOCKET_USE_OPEN_SSL)
socket = std::make_shared<SocketOpenSSL>(tlsOptions, fd);
socket = std::make_unique<SocketOpenSSL>(tlsOptions, fd);
#elif defined(__APPLE__)
socket = std::make_shared<SocketAppleSSL>(tlsOptions, fd);
socket = std::make_unique<SocketAppleSSL>(tlsOptions, fd);
#endif
#else
errorMsg = "TLS support is not enabled on this platform.";

View File

@@ -14,7 +14,7 @@
namespace ix
{
class Socket;
std::shared_ptr<Socket> createSocket(bool tls,
std::unique_ptr<Socket> createSocket(bool tls,
int fd,
std::string& errorMsg,
const SocketTLSOptions& tlsOptions);

View File

@@ -195,8 +195,17 @@ namespace ix
int res;
do
{
std::lock_guard<std::mutex> lock(_mutex);
res = mbedtls_ssl_handshake(&_ssl);
{
std::lock_guard<std::mutex> lock(_mutex);
res = mbedtls_ssl_handshake(&_ssl);
}
if (isCancellationRequested())
{
errMsg = "Cancellation requested";
close();
return false;
}
} while (res == MBEDTLS_ERR_SSL_WANT_READ || res == MBEDTLS_ERR_SSL_WANT_WRITE);
if (res != 0)

View File

@@ -224,7 +224,9 @@ namespace ix
return true;
}
bool SocketOpenSSL::openSSLClientHandshake(const std::string& host, std::string& errMsg)
bool SocketOpenSSL::openSSLClientHandshake(const std::string& host,
std::string& errMsg,
const CancellationRequest& isCancellationRequested)
{
while (true)
{
@@ -233,6 +235,12 @@ namespace ix
return false;
}
if (isCancellationRequested())
{
errMsg = "Cancellation requested";
return false;
}
ERR_clear_error();
int connect_result = SSL_connect(_ssl_connection);
if (connect_result == 1)
@@ -577,7 +585,7 @@ namespace ix
X509_VERIFY_PARAM* param = SSL_get0_param(_ssl_connection);
X509_VERIFY_PARAM_set1_host(param, host.c_str(), 0);
#endif
handshakeSuccessful = openSSLClientHandshake(host, errMsg);
handshakeSuccessful = openSSLClientHandshake(host, errMsg, isCancellationRequested);
}
if (!handshakeSuccessful)

View File

@@ -39,7 +39,9 @@ namespace ix
void openSSLInitialize();
std::string getSSLError(int ret);
SSL_CTX* openSSLCreateContext(std::string& errMsg);
bool openSSLClientHandshake(const std::string& hostname, std::string& errMsg);
bool openSSLClientHandshake(const std::string& hostname,
std::string& errMsg,
const CancellationRequest& isCancellationRequested);
bool openSSLCheckServerCert(SSL* ssl, const std::string& hostname, std::string& errMsg);
bool checkHost(const std::string& host, const char* pattern);
bool handleTLSOptions(std::string& errMsg);

View File

@@ -7,6 +7,7 @@
#include "IXSocketServer.h"
#include "IXNetSystem.h"
#include "IXSelectInterrupt.h"
#include "IXSetThreadName.h"
#include "IXSocket.h"
#include "IXSocketConnect.h"
@@ -257,7 +258,9 @@ namespace ix
// Use poll to check whether a new connection is in progress
int timeoutMs = 10;
bool readyToRead = true;
PollResultType pollResult = Socket::poll(readyToRead, timeoutMs, _serverFd);
auto selectInterrupt = std::make_unique<SelectInterrupt>();
PollResultType pollResult =
Socket::poll(readyToRead, timeoutMs, _serverFd, selectInterrupt);
if (pollResult == PollResultType::Error)
{
@@ -338,7 +341,8 @@ namespace ix
std::lock_guard<std::mutex> lock(_connectionsThreadsMutex);
_connectionsThreads.push_back(std::make_pair(
connectionState,
std::thread(&SocketServer::handleConnection, this, socket, connectionState)));
std::thread(
&SocketServer::handleConnection, this, std::move(socket), connectionState)));
}
}

View File

@@ -101,7 +101,7 @@ namespace ix
// the factory to create ConnectionState objects
ConnectionStateFactory _connectionStateFactory;
virtual void handleConnection(std::shared_ptr<Socket>,
virtual void handleConnection(std::unique_ptr<Socket>,
std::shared_ptr<ConnectionState> connectionState) = 0;
virtual size_t getConnectedClientsCount() = 0;

View File

@@ -0,0 +1,96 @@
/*
* IXUdpSocket.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#include "IXUdpSocket.h"
#include "IXNetSystem.h"
#include <cstring>
#include <sstream>
namespace ix
{
UdpSocket::UdpSocket(int fd)
: _sockfd(fd)
{
;
}
UdpSocket::~UdpSocket()
{
close();
}
void UdpSocket::close()
{
if (_sockfd == -1) return;
closeSocket(_sockfd);
_sockfd = -1;
}
int UdpSocket::getErrno()
{
int err;
#ifdef _WIN32
err = WSAGetLastError();
#else
err = errno;
#endif
return err;
}
void UdpSocket::closeSocket(int fd)
{
#ifdef _WIN32
closesocket(fd);
#else
::close(fd);
#endif
}
bool UdpSocket::init(const std::string& host, int port, std::string& errMsg)
{
_sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (_sockfd < 0)
{
errMsg = "Could not create socket";
return false;
}
memset(&_server, 0, sizeof(_server));
_server.sin_family = AF_INET;
_server.sin_port = htons(port);
// DNS resolution.
struct addrinfo hints, *result = nullptr;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_DGRAM;
int ret = getaddrinfo(host.c_str(), nullptr, &hints, &result);
if (ret != 0)
{
errMsg = strerror(UdpSocket::getErrno());
freeaddrinfo(result);
close();
return false;
}
struct sockaddr_in* host_addr = (struct sockaddr_in*) result->ai_addr;
memcpy(&_server.sin_addr, &host_addr->sin_addr, sizeof(struct in_addr));
freeaddrinfo(result);
return true;
}
ssize_t UdpSocket::sendto(const std::string& buffer)
{
return (ssize_t)::sendto(
_sockfd, buffer.data(), buffer.size(), 0, (struct sockaddr*) &_server, sizeof(_server));
}
} // namespace ix

40
ixwebsocket/IXUdpSocket.h Normal file
View File

@@ -0,0 +1,40 @@
/*
* IXUdpSocket.h
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <atomic>
#include <memory>
#include <string>
#ifdef _WIN32
#include <BaseTsd.h>
typedef SSIZE_T ssize_t;
#endif
#include "IXNetSystem.h"
namespace ix
{
class UdpSocket
{
public:
UdpSocket(int fd = -1);
~UdpSocket();
// Virtual methods
bool init(const std::string& host, int port, std::string& errMsg);
ssize_t sendto(const std::string& buffer);
void close();
static int getErrno();
static void closeSocket(int fd);
private:
std::atomic<int> _sockfd;
struct sockaddr_in _server;
};
} // namespace ix

View File

@@ -19,7 +19,6 @@ namespace ix
OnTrafficTrackerCallback WebSocket::_onTrafficTrackerCallback = nullptr;
const int WebSocket::kDefaultHandShakeTimeoutSecs(60);
const int WebSocket::kDefaultPingIntervalSecs(-1);
const int WebSocket::kDefaultPingTimeoutSecs(-1);
const bool WebSocket::kDefaultEnablePong(true);
const uint32_t WebSocket::kDefaultMaxWaitBetweenReconnectionRetries(10 * 1000); // 10s
@@ -31,7 +30,6 @@ namespace ix
, _handshakeTimeoutSecs(kDefaultHandShakeTimeoutSecs)
, _enablePong(kDefaultEnablePong)
, _pingIntervalSecs(kDefaultPingIntervalSecs)
, _pingTimeoutSecs(kDefaultPingTimeoutSecs)
{
_ws.setOnCloseCallback(
[this](uint16_t code, const std::string& reason, size_t wireSize, bool remote) {
@@ -86,18 +84,6 @@ namespace ix
return _perMessageDeflateOptions;
}
void WebSocket::setHeartBeatPeriod(int heartBeatPeriodSecs)
{
std::lock_guard<std::mutex> lock(_configMutex);
_pingIntervalSecs = heartBeatPeriodSecs;
}
int WebSocket::getHeartBeatPeriod() const
{
std::lock_guard<std::mutex> lock(_configMutex);
return _pingIntervalSecs;
}
void WebSocket::setPingInterval(int pingIntervalSecs)
{
std::lock_guard<std::mutex> lock(_configMutex);
@@ -110,18 +96,6 @@ namespace ix
return _pingIntervalSecs;
}
void WebSocket::setPingTimeout(int pingTimeoutSecs)
{
std::lock_guard<std::mutex> lock(_configMutex);
_pingTimeoutSecs = pingTimeoutSecs;
}
int WebSocket::getPingTimeout() const
{
std::lock_guard<std::mutex> lock(_configMutex);
return _pingTimeoutSecs;
}
void WebSocket::enablePong()
{
std::lock_guard<std::mutex> lock(_configMutex);
@@ -186,11 +160,8 @@ namespace ix
{
{
std::lock_guard<std::mutex> lock(_configMutex);
_ws.configure(_perMessageDeflateOptions,
_socketTLSOptions,
_enablePong,
_pingIntervalSecs,
_pingTimeoutSecs);
_ws.configure(
_perMessageDeflateOptions, _socketTLSOptions, _enablePong, _pingIntervalSecs);
}
WebSocketHttpHeaders headers(_extraHeaders);
@@ -229,21 +200,25 @@ namespace ix
WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers, status.protocol),
WebSocketCloseInfo()));
if (_pingIntervalSecs > 0)
{
// Send a heart beat right away
_ws.sendHeartBeat();
}
return status;
}
WebSocketInitResult WebSocket::connectToSocket(std::shared_ptr<Socket> socket, int timeoutSecs)
WebSocketInitResult WebSocket::connectToSocket(std::unique_ptr<Socket> socket, int timeoutSecs)
{
{
std::lock_guard<std::mutex> lock(_configMutex);
_ws.configure(_perMessageDeflateOptions,
_socketTLSOptions,
_enablePong,
_pingIntervalSecs,
_pingTimeoutSecs);
_ws.configure(
_perMessageDeflateOptions, _socketTLSOptions, _enablePong, _pingIntervalSecs);
}
WebSocketInitResult status = _ws.connectToSocket(socket, timeoutSecs);
WebSocketInitResult status = _ws.connectToSocket(std::move(socket), timeoutSecs);
if (!status.success)
{
return status;
@@ -256,6 +231,13 @@ namespace ix
WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers),
WebSocketCloseInfo()));
if (_pingIntervalSecs > 0)
{
// Send a heart beat right away
_ws.sendHeartBeat();
}
return status;
}

View File

@@ -52,9 +52,7 @@ namespace ix
void setPerMessageDeflateOptions(
const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions);
void setTLSOptions(const SocketTLSOptions& socketTLSOptions);
void setHeartBeatPeriod(int heartBeatPeriodSecs);
void setPingInterval(int pingIntervalSecs); // alias of setHeartBeatPeriod
void setPingTimeout(int pingTimeoutSecs);
void setPingInterval(int pingIntervalSecs);
void enablePong();
void disablePong();
void enablePerMessageDeflate();
@@ -94,9 +92,7 @@ namespace ix
const std::string& getUrl() const;
const WebSocketPerMessageDeflateOptions& getPerMessageDeflateOptions() const;
int getHeartBeatPeriod() const;
int getPingInterval() const;
int getPingTimeout() const;
size_t bufferedAmount() const;
void enableAutomaticReconnection();
@@ -117,7 +113,7 @@ namespace ix
static void invokeTrafficTrackerCallback(size_t size, bool incoming);
// Server
WebSocketInitResult connectToSocket(std::shared_ptr<Socket>, int timeoutSecs);
WebSocketInitResult connectToSocket(std::unique_ptr<Socket>, int timeoutSecs);
WebSocketTransport _ws;

View File

@@ -21,8 +21,8 @@ namespace ix
{
WebSocketHandshake::WebSocketHandshake(
std::atomic<bool>& requestInitCancellation,
std::shared_ptr<Socket> socket,
WebSocketPerMessageDeflate& perMessageDeflate,
std::unique_ptr<Socket>& socket,
WebSocketPerMessageDeflatePtr& perMessageDeflate,
WebSocketPerMessageDeflateOptions& perMessageDeflateOptions,
std::atomic<bool>& enablePerMessageDeflate)
: _requestInitCancellation(requestInitCancellation)
@@ -230,7 +230,7 @@ namespace ix
_enablePerMessageDeflate = false;
}
// Otherwise try to initialize the deflate engine (zlib)
else if (!_perMessageDeflate.init(webSocketPerMessageDeflateOptions))
else if (!_perMessageDeflate->init(webSocketPerMessageDeflateOptions))
{
return WebSocketInitResult(
false, 0, "Failed to initialize per message deflate engine");
@@ -341,7 +341,7 @@ namespace ix
{
_enablePerMessageDeflate = true;
if (!_perMessageDeflate.init(webSocketPerMessageDeflateOptions))
if (!_perMessageDeflate->init(webSocketPerMessageDeflateOptions))
{
return WebSocketInitResult(
false, 0, "Failed to initialize per message deflate engine");

View File

@@ -23,8 +23,8 @@ namespace ix
{
public:
WebSocketHandshake(std::atomic<bool>& requestInitCancellation,
std::shared_ptr<Socket> _socket,
WebSocketPerMessageDeflate& perMessageDeflate,
std::unique_ptr<Socket>& _socket,
WebSocketPerMessageDeflatePtr& perMessageDeflate,
WebSocketPerMessageDeflateOptions& perMessageDeflateOptions,
std::atomic<bool>& enablePerMessageDeflate);
@@ -46,8 +46,8 @@ namespace ix
bool insensitiveStringCompare(const std::string& a, const std::string& b);
std::atomic<bool>& _requestInitCancellation;
std::shared_ptr<Socket> _socket;
WebSocketPerMessageDeflate& _perMessageDeflate;
std::unique_ptr<Socket>& _socket;
WebSocketPerMessageDeflatePtr& _perMessageDeflate;
WebSocketPerMessageDeflateOptions& _perMessageDeflateOptions;
std::atomic<bool>& _enablePerMessageDeflate;
};

View File

@@ -32,7 +32,7 @@ namespace ix
}
std::pair<bool, WebSocketHttpHeaders> parseHttpHeaders(
std::shared_ptr<Socket> socket, const CancellationRequest& isCancellationRequested)
std::unique_ptr<Socket>& socket, const CancellationRequest& isCancellationRequested)
{
WebSocketHttpHeaders headers;

View File

@@ -29,5 +29,5 @@ namespace ix
using WebSocketHttpHeaders = std::map<std::string, std::string, CaseInsensitiveLess>;
std::pair<bool, WebSocketHttpHeaders> parseHttpHeaders(
std::shared_ptr<Socket> socket, const CancellationRequest& isCancellationRequested);
std::unique_ptr<Socket>& socket, const CancellationRequest& isCancellationRequested);
} // namespace ix

View File

@@ -57,4 +57,6 @@ namespace ix
std::unique_ptr<WebSocketPerMessageDeflateCompressor> _compressor;
std::unique_ptr<WebSocketPerMessageDeflateDecompressor> _decompressor;
};
using WebSocketPerMessageDeflatePtr = std::unique_ptr<WebSocketPerMessageDeflate>;
} // namespace ix

View File

@@ -89,8 +89,12 @@ namespace ix
if (in.empty())
{
uint8_t buf[6] = {0x02, 0x00, 0x00, 0x00, 0xff, 0xff};
out.append((char*) (buf), 6);
// See issue #167
// The normal buffer size should be 6 but
// we remove the 4 octets from the tail (#4)
uint8_t buf[2] = {0x02, 0x00};
out.append((char*) (buf), 2);
return true;
}

View File

@@ -71,7 +71,7 @@ namespace ix
_onConnectionCallback = callback;
}
void WebSocketServer::handleConnection(std::shared_ptr<Socket> socket,
void WebSocketServer::handleConnection(std::unique_ptr<Socket> socket,
std::shared_ptr<ConnectionState> connectionState)
{
setThreadName("WebSocketServer::" + connectionState->getId());
@@ -96,7 +96,7 @@ namespace ix
_clients.insert(webSocket);
}
auto status = webSocket->connectToSocket(socket, _handshakeTimeoutSecs);
auto status = webSocket->connectToSocket(std::move(socket), _handshakeTimeoutSecs);
if (status.success)
{
// Process incoming messages and execute callbacks

View File

@@ -59,7 +59,7 @@ namespace ix
const static bool kDefaultEnablePong;
// Methods
virtual void handleConnection(std::shared_ptr<Socket> socket,
virtual void handleConnection(std::unique_ptr<Socket> socket,
std::shared_ptr<ConnectionState> connectionState) final;
virtual size_t getConnectedClientsCount() final;
};

View File

@@ -51,26 +51,10 @@
#include <vector>
namespace
{
int greatestCommonDivisor(int a, int b)
{
while (b != 0)
{
int t = b;
b = a % b;
a = t;
}
return a;
}
} // namespace
namespace ix
{
const std::string WebSocketTransport::kPingMessage("ixwebsocket::heartbeat");
const int WebSocketTransport::kDefaultPingIntervalSecs(-1);
const int WebSocketTransport::kDefaultPingTimeoutSecs(-1);
const bool WebSocketTransport::kDefaultEnablePong(true);
const int WebSocketTransport::kClosingMaximumWaitingDelayInMs(300);
constexpr size_t WebSocketTransport::kChunkSize;
@@ -89,11 +73,8 @@ namespace ix
, _closingTimePoint(std::chrono::steady_clock::now())
, _enablePong(kDefaultEnablePong)
, _pingIntervalSecs(kDefaultPingIntervalSecs)
, _pingTimeoutSecs(kDefaultPingTimeoutSecs)
, _pingIntervalOrTimeoutGCDSecs(-1)
, _nextGCDTimePoint(std::chrono::steady_clock::now())
, _pongReceived(false)
, _lastSendPingTimePoint(std::chrono::steady_clock::now())
, _lastReceivePongTimePoint(std::chrono::steady_clock::now())
{
_readbuf.resize(kChunkSize);
}
@@ -107,29 +88,13 @@ namespace ix
const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions,
const SocketTLSOptions& socketTLSOptions,
bool enablePong,
int pingIntervalSecs,
int pingTimeoutSecs)
int pingIntervalSecs)
{
_perMessageDeflateOptions = perMessageDeflateOptions;
_enablePerMessageDeflate = _perMessageDeflateOptions.enabled();
_socketTLSOptions = socketTLSOptions;
_enablePong = enablePong;
_pingIntervalSecs = pingIntervalSecs;
_pingTimeoutSecs = pingTimeoutSecs;
if (pingIntervalSecs > 0 && pingTimeoutSecs > 0)
{
_pingIntervalOrTimeoutGCDSecs =
greatestCommonDivisor(pingIntervalSecs, pingTimeoutSecs);
}
else if (_pingTimeoutSecs > 0)
{
_pingIntervalOrTimeoutGCDSecs = pingTimeoutSecs;
}
else
{
_pingIntervalOrTimeoutGCDSecs = pingIntervalSecs;
}
}
// Client
@@ -152,6 +117,7 @@ namespace ix
std::string errorMsg;
bool tls = protocol == "wss";
_socket = createSocket(tls, -1, errorMsg, _socketTLSOptions);
_perMessageDeflate = std::make_unique<WebSocketPerMessageDeflate>();
if (!_socket)
{
@@ -174,7 +140,7 @@ namespace ix
}
// Server
WebSocketInitResult WebSocketTransport::connectToSocket(std::shared_ptr<Socket> socket,
WebSocketInitResult WebSocketTransport::connectToSocket(std::unique_ptr<Socket> socket,
int timeoutSecs)
{
std::lock_guard<std::mutex> lock(_socketMutex);
@@ -183,7 +149,8 @@ namespace ix
_useMask = false;
_blockingSend = true;
_socket = socket;
_socket = std::move(socket);
_perMessageDeflate = std::make_unique<WebSocketPerMessageDeflate>();
WebSocketHandshake webSocketHandshake(_requestInitCancellation,
_socket,
@@ -220,7 +187,8 @@ namespace ix
}
else if (readyState == ReadyState::OPEN)
{
initTimePointsAndGCDAfterConnect();
initTimePointsAfterConnect();
_pongReceived = false;
}
_readyState = readyState;
@@ -231,22 +199,12 @@ namespace ix
_onCloseCallback = onCloseCallback;
}
void WebSocketTransport::initTimePointsAndGCDAfterConnect()
void WebSocketTransport::initTimePointsAfterConnect()
{
{
std::lock_guard<std::mutex> lock(_lastSendPingTimePointMutex);
_lastSendPingTimePoint = std::chrono::steady_clock::now();
}
{
std::lock_guard<std::mutex> lock(_lastReceivePongTimePointMutex);
_lastReceivePongTimePoint = std::chrono::steady_clock::now();
}
if (_pingIntervalOrTimeoutGCDSecs > 0)
{
_nextGCDTimePoint = std::chrono::steady_clock::now() +
std::chrono::seconds(_pingIntervalOrTimeoutGCDSecs);
}
}
// Only consider send PING time points for that computation.
@@ -259,13 +217,12 @@ namespace ix
return now - _lastSendPingTimePoint > std::chrono::seconds(_pingIntervalSecs);
}
bool WebSocketTransport::pingTimeoutExceeded()
WebSocketSendInfo WebSocketTransport::sendHeartBeat()
{
if (_pingTimeoutSecs <= 0) return false;
std::lock_guard<std::mutex> lock(_lastReceivePongTimePointMutex);
auto now = std::chrono::steady_clock::now();
return now - _lastReceivePongTimePoint > std::chrono::seconds(_pingTimeoutSecs);
_pongReceived = false;
std::stringstream ss;
ss << kPingMessage << "::" << _pingIntervalSecs << "s";
return sendPing(ss.str());
}
bool WebSocketTransport::closingDelayExceeded()
@@ -279,46 +236,32 @@ namespace ix
{
if (_readyState == ReadyState::OPEN)
{
// if (1) ping timeout is enabled and (2) duration since last received
// ping response (PONG) exceeds the maximum delay, then close the connection
if (pingTimeoutExceeded())
if (pingIntervalExceeded())
{
close(WebSocketCloseConstants::kInternalErrorCode,
WebSocketCloseConstants::kPingTimeoutMessage);
}
// If ping is enabled and no ping has been sent for a duration
// exceeding our ping interval, send a ping to the server.
else if (pingIntervalExceeded())
{
std::stringstream ss;
ss << kPingMessage << "::" << _pingIntervalSecs << "s";
sendPing(ss.str());
if (!_pongReceived)
{
// ping response (PONG) exceeds the maximum delay, close the connection
close(WebSocketCloseConstants::kInternalErrorCode,
WebSocketCloseConstants::kPingTimeoutMessage);
}
else
{
sendHeartBeat();
}
}
}
// No timeout if state is not OPEN, otherwise computed
// pingIntervalOrTimeoutGCD (equals to -1 if no ping and no ping timeout are set)
int lastingTimeoutDelayInMs =
(_readyState != ReadyState::OPEN) ? 0 : _pingIntervalOrTimeoutGCDSecs;
int lastingTimeoutDelayInMs = (_readyState != ReadyState::OPEN) ? 0 : _pingIntervalSecs;
if (_pingIntervalOrTimeoutGCDSecs > 0)
if (_pingIntervalSecs > 0)
{
// compute lasting delay to wait for next ping / timeout, if at least one set
auto now = std::chrono::steady_clock::now();
if (now >= _nextGCDTimePoint)
{
_nextGCDTimePoint = now + std::chrono::seconds(_pingIntervalOrTimeoutGCDSecs);
lastingTimeoutDelayInMs = _pingIntervalOrTimeoutGCDSecs * 1000;
}
else
{
lastingTimeoutDelayInMs =
(int) std::chrono::duration_cast<std::chrono::milliseconds>(_nextGCDTimePoint -
now)
.count();
}
lastingTimeoutDelayInMs = (int) std::chrono::duration_cast<std::chrono::milliseconds>(
now - _lastSendPingTimePoint)
.count();
}
#ifdef _WIN32
@@ -626,9 +569,7 @@ namespace ix
}
else if (ws.opcode == wsheader_type::PONG)
{
std::lock_guard<std::mutex> lck(_lastReceivePongTimePointMutex);
_lastReceivePongTimePoint = std::chrono::steady_clock::now();
_pongReceived = true;
emitMessage(MessageKind::PONG, frameData, false, onMessageCallback);
}
else if (ws.opcode == wsheader_type::CLOSE)
@@ -772,7 +713,7 @@ namespace ix
if (compressedMessage && messageKind != MessageKind::FRAGMENT)
{
std::string decompressedMessage;
bool success = _perMessageDeflate.decompress(message, decompressedMessage);
bool success = _perMessageDeflate->decompress(message, decompressedMessage);
if (messageKind == MessageKind::MSG_TEXT && !validateUtf8(decompressedMessage))
{
@@ -826,7 +767,7 @@ namespace ix
if (compress)
{
if (!_perMessageDeflate.compress(message, compressedMessage))
if (!_perMessageDeflate->compress(message, compressedMessage))
{
bool success = false;
compressionError = true;

View File

@@ -75,8 +75,7 @@ namespace ix
void configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions,
const SocketTLSOptions& socketTLSOptions,
bool enablePong,
int pingIntervalSecs,
int pingTimeoutSecs);
int pingIntervalSecs);
// Client
WebSocketInitResult connectToUrl(const std::string& url,
@@ -84,7 +83,7 @@ namespace ix
int timeoutSecs);
// Server
WebSocketInitResult connectToSocket(std::shared_ptr<Socket> socket, int timeoutSecs);
WebSocketInitResult connectToSocket(std::unique_ptr<Socket> socket, int timeoutSecs);
PollResult poll();
WebSocketSendInfo sendBinary(const std::string& message,
@@ -106,6 +105,9 @@ namespace ix
void dispatch(PollResult pollResult, const OnMessageCallback& onMessageCallback);
size_t bufferedAmount() const;
// internal
WebSocketSendInfo sendHeartBeat();
private:
std::string _url;
@@ -169,7 +171,7 @@ namespace ix
static constexpr size_t kChunkSize = 1 << 15;
// Underlying TCP socket
std::shared_ptr<Socket> _socket;
std::unique_ptr<Socket> _socket;
std::mutex _socketMutex;
// Hold the state of the connection (OPEN, CLOSED, etc...)
@@ -183,7 +185,7 @@ namespace ix
mutable std::mutex _closeDataMutex;
// Data used for Per Message Deflate compression (with zlib)
WebSocketPerMessageDeflate _perMessageDeflate;
WebSocketPerMessageDeflatePtr _perMessageDeflate;
WebSocketPerMessageDeflateOptions _perMessageDeflateOptions;
std::atomic<bool> _enablePerMessageDeflate;
@@ -202,39 +204,24 @@ namespace ix
static const bool kDefaultEnablePong;
// Optional ping and pong timeout
// if both ping interval and timeout are set (> 0),
// then use GCD of these value to wait for the lowest time
int _pingIntervalSecs;
int _pingTimeoutSecs;
int _pingIntervalOrTimeoutGCDSecs;
std::atomic<bool> _pongReceived;
static const int kDefaultPingIntervalSecs;
static const int kDefaultPingTimeoutSecs;
static const std::string kPingMessage;
// Record time step for ping/ ping timeout to ensure we wait for the right left duration
std::chrono::time_point<std::chrono::steady_clock> _nextGCDTimePoint;
// We record when ping are being sent so that we can know when to send the next one
// We also record when pong are being sent as a reply to pings, to close the connections
// if no pong were received sufficiently fast.
mutable std::mutex _lastSendPingTimePointMutex;
mutable std::mutex _lastReceivePongTimePointMutex;
std::chrono::time_point<std::chrono::steady_clock> _lastSendPingTimePoint;
std::chrono::time_point<std::chrono::steady_clock> _lastReceivePongTimePoint;
// If this function returns true, it is time to send a new ping
bool pingIntervalExceeded();
// No PONG data was received through the socket for longer than ping timeout delay
bool pingTimeoutExceeded();
void initTimePointsAfterConnect();
// after calling close(), if no CLOSE frame answer is received back from the remote, we
// should close the connexion
bool closingDelayExceeded();
void initTimePointsAndGCDAfterConnect();
void sendCloseFrame(uint16_t code, const std::string& reason);
void closeSocketAndSwitchToClosedState(uint16_t code,

View File

@@ -6,4 +6,4 @@
#pragma once
#define IX_WEBSOCKET_VERSION "8.2.3"
#define IX_WEBSOCKET_VERSION "9.0.3"

View File

@@ -23,6 +23,9 @@ ws_openssl:
ws_mbedtls:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .. ; make -j 4)
ws_mbedtls_install:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .. ; make -j 4 install)
ws_no_ssl:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_WS=1 .. ; make -j 4)
@@ -87,6 +90,26 @@ test:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 -DUSE_TEST=1 .. ; make -j 4)
(cd test ; python2.7 run.py -r)
test_tsan:
mkdir -p build && (cd build && cmake -GXcode -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_TEST=1 .. && xcodebuild -project ixwebsocket.xcodeproj -target ixwebsocket_unittest -enableThreadSanitizer YES)
(cd build/test ; ln -sf Debug/ixwebsocket_unittest)
(cd test ; python2.7 run.py -r)
test_ubsan:
mkdir -p build && (cd build && cmake -GXcode -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_TEST=1 .. && xcodebuild -project ixwebsocket.xcodeproj -target ixwebsocket_unittest -enableUndefinedBehaviorSanitizer YES)
(cd build/test ; ln -sf Debug/ixwebsocket_unittest)
(cd test ; python2.7 run.py -r)
test_asan:
mkdir -p build && (cd build && cmake -GXcode -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_TEST=1 .. && xcodebuild -project ixwebsocket.xcodeproj -target ixwebsocket_unittest -enableAddressSanitizer YES)
(cd build/test ; ln -sf Debug/ixwebsocket_unittest)
(cd test ; python2.7 run.py -r)
test_tsan_openssl:
mkdir -p build && (cd build && cmake -GXcode -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_TEST=1 -DUSE_OPEN_SSL=1 .. && xcodebuild -project ixwebsocket.xcodeproj -target ixwebsocket_unittest -enableThreadSanitizer YES)
(cd build/test ; ln -sf Debug/ixwebsocket_unittest)
(cd test ; python2.7 run.py -r)
test_openssl:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_OPEN_SSL=1 -DUSE_TEST=1 .. ; make -j 4)
(cd test ; python2.7 run.py -r)

View File

@@ -50,19 +50,21 @@ set (SOURCES
IXHttpServerTest.cpp
IXUnityBuildsTest.cpp
IXHttpTest.cpp
IXCobraChatTest.cpp
IXCobraMetricsPublisherTest.cpp
IXDNSLookupTest.cpp
IXWebSocketSubProtocolTest.cpp
IXSentryClientTest.cpp
IXWebSocketChatTest.cpp
IXCobraToSentryBotTest.cpp
)
# Some unittest don't work on windows yet
# Windows without TLS does not have hmac yet
if (UNIX)
list(APPEND SOURCES
IXWebSocketCloseTest.cpp
IXCobraChatTest.cpp
IXCobraMetricsPublisherTest.cpp
IXCobraToSentryBotTest.cpp
IXCobraToStatsdBotTest.cpp
)
endif()
@@ -94,13 +96,14 @@ if (JSONCPP_FOUND)
target_link_libraries(ixwebsocket_unittest ${JSONCPP_LIBRARIES})
endif()
# library with the most dependencies come first
target_link_libraries(ixwebsocket_unittest ixbots)
target_link_libraries(ixwebsocket_unittest ixsnake)
target_link_libraries(ixwebsocket_unittest ixcobra)
target_link_libraries(ixwebsocket_unittest ixsentry)
target_link_libraries(ixwebsocket_unittest ixwebsocket)
target_link_libraries(ixwebsocket_unittest ixcrypto)
target_link_libraries(ixwebsocket_unittest ixcore)
target_link_libraries(ixwebsocket_unittest ixsentry)
target_link_libraries(ixwebsocket_unittest ixbots)
target_link_libraries(ixwebsocket_unittest spdlog)

View File

@@ -14993,4 +14993,3 @@ using Catch::Detail::Approx;
// end catch_reenable_warnings.h
// end catch.hpp
#endif // TWOBLUECUBES_SINGLE_INCLUDE_CATCH_HPP_INCLUDED

View File

@@ -37,7 +37,9 @@ namespace
class CobraChat
{
public:
CobraChat(const std::string& user, const std::string& session, const std::string& endpoint);
CobraChat(const std::string& user,
const std::string& session,
const ix::CobraConfig& config);
void subscribe(const std::string& channel);
void start();
@@ -54,7 +56,7 @@ namespace
private:
std::string _user;
std::string _session;
std::string _endpoint;
ix::CobraConfig _cobraConfig;
std::queue<Json::Value> _publish_queue;
mutable std::mutex _queue_mutex;
@@ -72,10 +74,10 @@ namespace
CobraChat::CobraChat(const std::string& user,
const std::string& session,
const std::string& endpoint)
const ix::CobraConfig& config)
: _user(user)
, _session(session)
, _endpoint(endpoint)
, _cobraConfig(config)
, _stop(false)
, _connectedAndSubscribed(false)
{
@@ -122,32 +124,38 @@ namespace
void CobraChat::subscribe(const std::string& channel)
{
std::string filter;
_conn.subscribe(
channel, filter, [this](const Json::Value& msg, const std::string& /*position*/) {
spdlog::info("receive {}", msg.toStyledString());
std::string position("$");
if (!msg.isObject()) return;
if (!msg.isMember("user")) return;
if (!msg.isMember("text")) return;
if (!msg.isMember("session")) return;
_conn.subscribe(channel,
filter,
position,
[this](const Json::Value& msg, const std::string& /*position*/) {
spdlog::info("receive {}", msg.toStyledString());
std::string msg_user = msg["user"].asString();
std::string msg_text = msg["text"].asString();
std::string msg_session = msg["session"].asString();
if (!msg.isObject()) return;
if (!msg.isMember("user")) return;
if (!msg.isMember("text")) return;
if (!msg.isMember("session")) return;
// We are not interested in messages
// from a different session.
if (msg_session != _session) return;
std::string msg_user = msg["user"].asString();
std::string msg_text = msg["text"].asString();
std::string msg_session = msg["session"].asString();
// We are not interested in our own messages
if (msg_user == _user) return;
// We are not interested in messages
// from a different session.
if (msg_session != _session) return;
_receivedQueue.push(msg);
// We are not interested in our own messages
if (msg_user == _user) return;
std::stringstream ss;
ss << std::endl << msg_user << " > " << msg_text << std::endl << _user << " > ";
log(ss.str());
});
_receivedQueue.push(msg);
std::stringstream ss;
ss << std::endl
<< msg_user << " > " << msg_text << std::endl
<< _user << " > ";
log(ss.str());
});
}
void CobraChat::sendMessage(const std::string& text)
@@ -167,19 +175,9 @@ namespace
//
void CobraChat::run()
{
// "chat" conf
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
std::string channel = _session;
std::string role = "_sub";
std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
SocketTLSOptions socketTLSOptions;
_conn.configure(appkey,
_endpoint,
role,
secret,
ix::WebSocketPerMessageDeflateOptions(true),
socketTLSOptions);
_conn.configure(_cobraConfig);
_conn.connect();
_conn.setEventCallback([this, channel](ix::CobraConnectionEventType eventType,
@@ -263,7 +261,7 @@ TEST_CASE("Cobra_chat", "[cobra_chat]")
SECTION("Exchange and count sent/received messages.")
{
int port = getFreePort();
snake::AppConfig appConfig = makeSnakeServerConfig(port);
snake::AppConfig appConfig = makeSnakeServerConfig(port, true);
// Start a redis server
ix::RedisServer redisServer(appConfig.redisPort);
@@ -279,13 +277,20 @@ TEST_CASE("Cobra_chat", "[cobra_chat]")
setupTrafficTrackerCallback();
std::string session = ix::generateSessionId();
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
std::string role = "_sub";
std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
std::string endpoint = makeCobraEndpoint(port, true);
std::stringstream ss;
ss << "ws://localhost:" << port;
std::string endpoint = ss.str();
ix::CobraConfig config;
config.endpoint = endpoint;
config.appkey = appkey;
config.rolename = role;
config.rolesecret = secret;
config.socketTLSOptions = makeClientTLSOptions();
CobraChat chatA("jean", session, endpoint);
CobraChat chatB("paul", session, endpoint);
CobraChat chatA("jean", session, config);
CobraChat chatB("paul", session, config);
chatA.start();
chatB.start();

View File

@@ -33,17 +33,6 @@ namespace
});
}
//
// This project / appkey is configure on cobra to not do any batching.
// This way we can start a subscriber and receive all messages as they come in.
//
std::string APPKEY("FC2F10139A2BAc53BB72D9db967b024f");
std::string CHANNEL("unittest_channel");
std::string PUBLISHER_ROLE("_pub");
std::string PUBLISHER_SECRET("1c04DB8fFe76A4EeFE3E318C72d771db");
std::string SUBSCRIBER_ROLE("_sub");
std::string SUBSCRIBER_SECRET("66B1dA3ED5fA074EB5AE84Dd8CE3b5ba");
std::atomic<bool> gStop;
std::atomic<bool> gSubscriberConnectedAndSubscribed;
std::atomic<size_t> gUniqueMessageIdsCount;
@@ -55,28 +44,21 @@ namespace
//
// Background thread subscribe to the channel and validates what was sent
//
void startSubscriber(const std::string& endpoint)
void startSubscriber(const ix::CobraConfig& config, const std::string& channel)
{
gSubscriberConnectedAndSubscribed = false;
gUniqueMessageIdsCount = 0;
gMessageCount = 0;
ix::CobraConnection conn;
SocketTLSOptions socketTLSOptions;
conn.configure(APPKEY,
endpoint,
SUBSCRIBER_ROLE,
SUBSCRIBER_SECRET,
ix::WebSocketPerMessageDeflateOptions(true),
socketTLSOptions);
conn.configure(config);
conn.connect();
conn.setEventCallback([&conn](ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId) {
conn.setEventCallback([&conn, &channel](ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId) {
if (eventType == ix::CobraConnection_EventType_Open)
{
TLogger() << "Subscriber connected:";
@@ -93,23 +75,27 @@ namespace
{
log("Subscriber authenticated");
std::string filter;
conn.subscribe(
CHANNEL, filter, [](const Json::Value& msg, const std::string& /*position*/) {
log(msg.toStyledString());
std::string position("$");
std::string id = msg["id"].asString();
{
std::lock_guard<std::mutex> guard(gProtectIds);
gIds.insert(id);
}
conn.subscribe(channel,
filter,
position,
[](const Json::Value& msg, const std::string& /*position*/) {
log(msg.toStyledString());
gMessageCount++;
});
std::string id = msg["id"].asString();
{
std::lock_guard<std::mutex> guard(gProtectIds);
gIds.insert(id);
}
gMessageCount++;
});
}
else if (eventType == ix::CobraConnection_EventType_Subscribed)
{
TLogger() << "Subscriber: subscribed to channel " << subscriptionId;
if (subscriptionId == CHANNEL)
if (subscriptionId == channel)
{
gSubscriberConnectedAndSubscribed = true;
}
@@ -121,7 +107,7 @@ namespace
else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{
TLogger() << "Subscriber: ununexpected from channel " << subscriptionId;
if (subscriptionId != CHANNEL)
if (subscriptionId != channel)
{
TLogger() << "Subscriber: unexpected channel " << subscriptionId;
}
@@ -138,7 +124,7 @@ namespace
std::this_thread::sleep_for(duration);
}
conn.unsubscribe(CHANNEL);
conn.unsubscribe(channel);
conn.disconnect();
gUniqueMessageIdsCount = gIds.size();
@@ -163,7 +149,8 @@ namespace
TEST_CASE("Cobra_Metrics_Publisher", "[cobra]")
{
int port = getFreePort();
snake::AppConfig appConfig = makeSnakeServerConfig(port);
bool preferTLS = true;
snake::AppConfig appConfig = makeSnakeServerConfig(port, preferTLS);
// Start a redis server
ix::RedisServer redisServer(appConfig.redisPort);
@@ -177,15 +164,21 @@ TEST_CASE("Cobra_Metrics_Publisher", "[cobra]")
setupTrafficTrackerCallback();
std::stringstream ss;
ss << "ws://localhost:" << port;
std::string endpoint = ss.str();
std::string channel = ix::generateSessionId();
std::string endpoint = makeCobraEndpoint(port, preferTLS);
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
std::string role = "_sub";
std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
// Make channel name unique
CHANNEL += uuid4();
ix::CobraConfig config;
config.endpoint = endpoint;
config.appkey = appkey;
config.rolename = role;
config.rolesecret = secret;
config.socketTLSOptions = makeClientTLSOptions();
gStop = false;
std::thread bgThread(&startSubscriber, endpoint);
std::thread subscriberThread(&startSubscriber, config, channel);
int timeout = 10 * 1000; // 10s
@@ -205,18 +198,9 @@ TEST_CASE("Cobra_Metrics_Publisher", "[cobra]")
}
ix::CobraMetricsPublisher cobraMetricsPublisher;
SocketTLSOptions socketTLSOptions;
bool perMessageDeflate = true;
cobraMetricsPublisher.configure(APPKEY,
endpoint,
CHANNEL,
PUBLISHER_ROLE,
PUBLISHER_SECRET,
perMessageDeflate,
socketTLSOptions);
cobraMetricsPublisher.configure(config, channel);
cobraMetricsPublisher.setSession(uuid4());
cobraMetricsPublisher.enable(true); // disabled by default, needs to be enabled to be active
cobraMetricsPublisher.enable(true);
Json::Value data;
data["foo"] = "bar";
@@ -292,7 +276,7 @@ TEST_CASE("Cobra_Metrics_Publisher", "[cobra]")
// Now stop the thread
gStop = true;
bgThread.join();
subscriberThread.join();
//
// Validate that we received all message kinds, and the correct number of messages

View File

@@ -1,7 +1,7 @@
/*
* cmd_satori_chat.cpp
* IXCobraToSentryTest.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017 Machine Zone. All rights reserved.
* Copyright (c) 2020 Machine Zone. All rights reserved.
*/
#include "IXTest.h"
@@ -42,18 +42,9 @@ namespace
void runPublisher(const ix::CobraConfig& config, const std::string& channel)
{
ix::CobraMetricsPublisher cobraMetricsPublisher;
SocketTLSOptions socketTLSOptions;
bool perMessageDeflate = true;
cobraMetricsPublisher.configure(config.appkey,
config.endpoint,
channel,
config.rolename,
config.rolesecret,
perMessageDeflate,
socketTLSOptions);
cobraMetricsPublisher.configure(config, channel);
cobraMetricsPublisher.setSession(uuid4());
cobraMetricsPublisher.enable(true); // disabled by default, needs to be enabled to be active
cobraMetricsPublisher.enable(true);
Json::Value msg;
msg["fps"] = 60;
@@ -78,12 +69,12 @@ namespace
}
} // namespace
TEST_CASE("Cobra_to_sentry_bot", "[foo]")
TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]")
{
SECTION("Exchange and count sent/received messages.")
{
int port = getFreePort();
snake::AppConfig appConfig = makeSnakeServerConfig(port);
snake::AppConfig appConfig = makeSnakeServerConfig(port, true);
// Start a redis server
ix::RedisServer redisServer(appConfig.redisPort);
@@ -96,10 +87,7 @@ TEST_CASE("Cobra_to_sentry_bot", "[foo]")
snakeServer.run();
// Start a fake sentry http server
SocketTLSOptions tlsOptionsServer;
tlsOptionsServer.certFile = ".certs/trusted-server-crt.pem";
tlsOptionsServer.keyFile = ".certs/trusted-server-key.pem";
tlsOptionsServer.caFile = ".certs/trusted-ca-crt.pem";
SocketTLSOptions tlsOptionsServer = makeServerTLSOptions(true);
int sentryPort = getFreePort();
ix::HttpServer sentryServer(sentryPort, "127.0.0.1");
@@ -139,20 +127,19 @@ TEST_CASE("Cobra_to_sentry_bot", "[foo]")
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
std::string role = "_sub";
std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
std::stringstream ss;
ss << "ws://localhost:" << port;
std::string endpoint = ss.str();
std::string endpoint = makeCobraEndpoint(port, true);
ix::CobraConfig config;
config.endpoint = endpoint;
config.appkey = appkey;
config.rolename = role;
config.rolesecret = secret;
config.socketTLSOptions = makeClientTLSOptions();
std::thread publisherThread(runPublisher, config, channel);
std::string filter;
std::string position("$");
bool verbose = true;
bool strict = true;
size_t maxQueueSize = 10;
@@ -163,15 +150,10 @@ TEST_CASE("Cobra_to_sentry_bot", "[foo]")
// -> https://github.com/openssl/openssl/issues/7967
// https://xxxxx:yyyyyy@sentry.io/1234567
std::stringstream oss;
std::string scheme("http://");
oss << scheme << "xxxxxxx:yyyyyyy@localhost:" << sentryPort << "/1234567";
oss << getHttpScheme() << "xxxxxxx:yyyyyyy@localhost:" << sentryPort << "/1234567";
std::string dsn = oss.str();
SocketTLSOptions tlsOptionsClient;
tlsOptionsClient.certFile = ".certs/trusted-client-crt.pem";
tlsOptionsClient.keyFile = ".certs/trusted-client-key.pem";
tlsOptionsClient.caFile = ".certs/trusted-ca-crt.pem";
SocketTLSOptions tlsOptionsClient = makeClientTLSOptions();
SentryClient sentryClient(dsn);
sentryClient.setTLSOptions(tlsOptionsClient);
@@ -182,6 +164,7 @@ TEST_CASE("Cobra_to_sentry_bot", "[foo]")
int sentCount = cobra_to_sentry_bot(config,
channel,
filter,
position,
sentryClient,
verbose,
strict,

View File

@@ -0,0 +1,141 @@
/*
* IXCobraToStatsdTest.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone. All rights reserved.
*/
#include "IXTest.h"
#include "catch.hpp"
#include <chrono>
#include <iostream>
#include <ixbots/IXCobraToStatsdBot.h>
#include <ixcobra/IXCobraConnection.h>
#include <ixcobra/IXCobraMetricsPublisher.h>
#include <ixcrypto/IXUuid.h>
#include <ixsentry/IXSentryClient.h>
#include <ixsnake/IXRedisServer.h>
#include <ixsnake/IXSnakeServer.h>
#include <ixwebsocket/IXHttpServer.h>
#include <ixwebsocket/IXUserAgent.h>
using namespace ix;
namespace
{
void runPublisher(const ix::CobraConfig& config, const std::string& channel)
{
ix::CobraMetricsPublisher cobraMetricsPublisher;
cobraMetricsPublisher.configure(config, channel);
cobraMetricsPublisher.setSession(uuid4());
cobraMetricsPublisher.enable(true);
Json::Value msg;
msg["fps"] = 60;
cobraMetricsPublisher.setGenericAttributes("game", "ody");
// Wait a bit
ix::msleep(500);
// publish some messages
cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #1)
cobraMetricsPublisher.push("sms_metric_B_id", msg); // (msg #2)
ix::msleep(500);
cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #3)
cobraMetricsPublisher.push("sms_metric_D_id", msg); // (msg #4)
ix::msleep(500);
cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #4)
cobraMetricsPublisher.push("sms_metric_F_id", msg); // (msg #5)
ix::msleep(500);
}
} // namespace
TEST_CASE("Cobra_to_statsd_bot", "[cobra_bots]")
{
SECTION("Exchange and count sent/received messages.")
{
int port = getFreePort();
snake::AppConfig appConfig = makeSnakeServerConfig(port, true);
// Start a redis server
ix::RedisServer redisServer(appConfig.redisPort);
auto res = redisServer.listen();
REQUIRE(res.first);
redisServer.start();
// Start a snake server
snake::SnakeServer snakeServer(appConfig);
snakeServer.run();
// Start a fake statsd server (ultimately)
// Run the bot for a small amount of time
std::string channel = ix::generateSessionId();
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
std::string role = "_sub";
std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
std::string endpoint = makeCobraEndpoint(port, true);
ix::CobraConfig config;
config.endpoint = endpoint;
config.appkey = appkey;
config.rolename = role;
config.rolesecret = secret;
config.socketTLSOptions = makeClientTLSOptions();
std::thread publisherThread(runPublisher, config, channel);
std::string filter;
std::string position("$");
bool verbose = true;
size_t maxQueueSize = 10;
bool enableHeartbeat = false;
// Only run the bot for 3 seconds
int runtime = 3;
std::string hostname("127.0.0.1");
// std::string hostname("www.google.com");
int statsdPort = 8125;
std::string prefix("ix.test");
StatsdClient statsdClient(hostname, statsdPort, prefix);
std::string errMsg;
bool initialized = statsdClient.init(errMsg);
if (!initialized)
{
spdlog::error(errMsg);
}
REQUIRE(initialized);
std::string fields("device.game\ndevice.os_name");
int sentCount = ix::cobra_to_statsd_bot(config,
channel,
filter,
position,
statsdClient,
fields,
verbose,
maxQueueSize,
enableHeartbeat,
runtime);
//
// We want at least 2 messages to be sent
//
REQUIRE(sentCount >= 2);
// Give us 1s for all messages to be received
ix::msleep(1000);
spdlog::info("Stopping snake server...");
snakeServer.stop();
spdlog::info("Stopping redis server...");
redisServer.stop();
publisherThread.join();
}
}

View File

@@ -149,7 +149,61 @@ namespace ix
return std::string(vec.begin(), vec.end());
}
snake::AppConfig makeSnakeServerConfig(int port)
SocketTLSOptions makeClientTLSOptions()
{
SocketTLSOptions tlsOptionsClient;
tlsOptionsClient.certFile = ".certs/trusted-client-crt.pem";
tlsOptionsClient.keyFile = ".certs/trusted-client-key.pem";
tlsOptionsClient.caFile = ".certs/trusted-ca-crt.pem";
return tlsOptionsClient;
}
SocketTLSOptions makeServerTLSOptions(bool preferTLS)
{
// Start a fake sentry http server
SocketTLSOptions tlsOptionsServer;
tlsOptionsServer.certFile = ".certs/trusted-server-crt.pem";
tlsOptionsServer.keyFile = ".certs/trusted-server-key.pem";
tlsOptionsServer.caFile = ".certs/trusted-ca-crt.pem";
#if defined(IXWEBSOCKET_USE_MBED_TLS) || defined(IXWEBSOCKET_USE_OPEN_SSL)
tlsOptionsServer.tls = preferTLS;
#else
tlsOptionsServer.tls = false;
#endif
return tlsOptionsServer;
}
std::string getHttpScheme()
{
#if defined(IXWEBSOCKET_USE_MBED_TLS) || defined(IXWEBSOCKET_USE_OPEN_SSL)
std::string scheme("https://");
#else
std::string scheme("http://");
#endif
return scheme;
}
std::string getWsScheme(bool preferTLS)
{
std::string scheme;
#if defined(IXWEBSOCKET_USE_MBED_TLS) || defined(IXWEBSOCKET_USE_OPEN_SSL)
if (preferTLS)
{
scheme = "wss://";
}
else
{
scheme = "ws://";
}
#else
scheme = "ws://";
#endif
return scheme;
}
snake::AppConfig makeSnakeServerConfig(int port, bool preferTLS)
{
snake::AppConfig appConfig;
appConfig.port = port;
@@ -158,6 +212,7 @@ namespace ix
appConfig.redisPort = getFreePort();
appConfig.redisPassword = "";
appConfig.redisHosts.push_back("localhost"); // only one host supported now
appConfig.socketTLSOptions = makeServerTLSOptions(preferTLS);
std::string appsConfigPath("appsConfig.json");
@@ -178,4 +233,13 @@ namespace ix
return appConfig;
}
std::string makeCobraEndpoint(int port, bool preferTLS)
{
std::stringstream ss;
ss << getWsScheme(preferTLS) << "localhost:" << port;
std::string endpoint = ss.str();
return endpoint;
}
} // namespace ix

View File

@@ -9,6 +9,7 @@
#include "IXGetFreePort.h"
#include <iostream>
#include <ixsnake/IXAppConfig.h>
#include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocketServer.h>
#include <mutex>
#include <spdlog/spdlog.h>
@@ -50,5 +51,12 @@ namespace ix
bool startWebSocketEchoServer(ix::WebSocketServer& server);
snake::AppConfig makeSnakeServerConfig(int port);
snake::AppConfig makeSnakeServerConfig(int port, bool preferTLS);
SocketTLSOptions makeClientTLSOptions();
SocketTLSOptions makeServerTLSOptions(bool preferTLS);
std::string getHttpScheme();
std::string getWsScheme(bool preferTLS);
std::string makeCobraEndpoint(int port, bool preferTLS);
} // namespace ix

View File

@@ -33,4 +33,3 @@ start_server = websockets.serve(echo, host, 8766, max_size=2 ** 30)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

View File

@@ -39,5 +39,3 @@ start_server = websockets.serve(echo, host, 8766, max_size=2 ** 30, ssl=ssl_cont
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

4
third_party/.clang-format vendored Normal file
View File

@@ -0,0 +1,4 @@
{
"DisableFormat": true,
"SortIncludes": false
}

View File

@@ -1,13 +0,0 @@
# Compiled Object files
*.slo
*.lo
*.o
# Compiled Dynamic libraries
*.so
*.dylib
# Compiled Static libraries
*.lai
*.la
*.a

View File

@@ -1,18 +0,0 @@
cmake_minimum_required(VERSION 3.1)
project(helloCLion)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
include_directories(
src
)
add_library(statsdcppclient STATIC src/statsd_client.cpp)
add_definitions("-fPIC")
target_link_libraries(statsdcppclient pthread)
add_executable(system_monitor demo/system_monitor.cpp)
target_link_libraries(system_monitor statsdcppclient)
add_executable(test_client demo/test_client.cpp)
target_link_libraries(test_client statsdcppclient)

View File

@@ -1,27 +0,0 @@
Copyright (c) 2014, Rex
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of the {organization} nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@@ -1,34 +0,0 @@
# a client sdk for StatsD, written in C++
## API
See [header file](src/statsd_client.h) for more api detail.
** Notice: this client is not thread-safe **
## Demo
### test\_client
This simple demo shows how the use this client.
### system\_monitor
This is a daemon for monitoring a Linux system.
It'll wake up every minute and monitor the following:
* load
* cpu
* free memory
* free swap (disabled)
* received bytes
* transmitted bytes
* procs
* uptime
The stats sent to statsd will be in "host.MACAddress" namespace.
Usage:
system_monitor statsd-host interface-to-monitor
e.g.
`system_monitor 172.16.42.1 eth0`

View File

@@ -1,164 +0,0 @@
#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <unistd.h>
#include <string.h>
#include <netdb.h>
#include <sys/sysinfo.h>
#include <sys/ioctl.h>
#include <netinet/in.h>
#include <net/if.h>
#include <string>
#include <vector>
#include "statsd_client.h"
using namespace std;
static int running = 1;
void sigterm(int sig)
{
running = 0;
}
string localhost() {
struct addrinfo hints, *info, *p;
string hostname(1024, '\0');
gethostname((char*)hostname.data(), hostname.capacity());
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC; /*either IPV4 or IPV6*/
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_CANONNAME;
if ( getaddrinfo(hostname.c_str(), "http", &hints, &info) == 0) {
for(p = info; p != NULL; p = p->ai_next) {
hostname = p->ai_canonname;
}
freeaddrinfo(info);
}
string::size_type pos = hostname.find(".");
while ( pos != string::npos )
{
hostname[pos] = '_';
pos = hostname.find(".", pos);
}
return hostname;
}
vector<string>& StringSplitTrim(const string& sData,
const string& sDelim, vector<string>& vItems)
{
vItems.clear();
string::size_type bpos = 0;
string::size_type epos = 0;
string::size_type nlen = sDelim.size();
while(sData.substr(epos,nlen) == sDelim)
{
epos += nlen;
}
bpos = epos;
while ((epos=sData.find(sDelim, epos)) != string::npos)
{
vItems.push_back(sData.substr(bpos, epos-bpos));
epos += nlen;
while(sData.substr(epos,nlen) == sDelim)
{
epos += nlen;
}
bpos = epos;
}
if(bpos != sData.size())
{
vItems.push_back(sData.substr(bpos, sData.size()-bpos));
}
return vItems;
}
int main(int argc, char *argv[])
{
FILE *net, *stat;
struct sysinfo si;
char line[256];
unsigned int user, nice, sys, idle, total, busy, old_total=0, old_busy=0;
if (argc != 3) {
printf( "Usage: %s host port\n"
"Eg: %s 127.0.0.1 8125\n",
argv[0], argv[0]);
exit(1);
}
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
signal(SIGCHLD, SIG_IGN); /* will save one syscall per sleep */
signal(SIGTERM, sigterm);
if ( (net = fopen("/proc/net/dev", "r")) == NULL) {
perror("fopen");
exit(-1);
}
if ( (stat = fopen("/proc/stat", "r")) == NULL) {
perror("fopen");
exit(-1);
}
string ns = string("host.") + localhost().c_str() + ".";
statsd::StatsdClient client(argv[1], atoi(argv[2]), ns);
daemon(0,0);
printf("running in background.\n");
while(running) {
rewind(net);
vector<string> items;
while(!feof(net)) {
fgets(line, sizeof(line), net);
StringSplitTrim(line, " ", items);
if ( items.size() < 17 ) continue;
if ( items[0].find(":") == string::npos ) continue;
if ( items[1] == "0" and items[9] == "0" ) continue;
string netface = "network."+items[0].erase( items[0].find(":") );
client.count( netface+".receive.bytes", atoll(items[1].c_str()) );
client.count( netface+".receive.packets", atoll(items[2].c_str()) );
client.count( netface+".transmit.bytes", atoll(items[9].c_str()) );
client.count( netface+".transmit.packets", atoll(items[10].c_str()) );
}
sysinfo(&si);
client.gauge("system.load", 100*si.loads[0]/0x10000);
client.gauge("system.freemem", si.freeram/1024);
client.gauge("system.procs", si.procs);
client.count("system.uptime", si.uptime);
/* rewind doesn't do the trick for /proc/stat */
freopen("/proc/stat", "r", stat);
fgets(line, sizeof(line), stat);
sscanf(line, "cpu %u %u %u %u", &user, &nice, &sys, &idle);
total = user + sys + idle;
busy = user + sys;
client.send("system.cpu", 100 * (busy - old_busy)/(total - old_total), "g", 1.0);
old_total = total;
old_busy = busy;
sleep(6);
}
fclose(net);
fclose(stat);
exit(0);
}

View File

@@ -1,28 +0,0 @@
#include <iostream>
#include <unistd.h>
#include "statsd_client.h"
int main(void)
{
std::cout << "running..." << std::endl;
statsd::StatsdClient client;
statsd::StatsdClient client2("127.0.0.1", 8125, "myproject.abx.", true);
client.count("count1", 123, 1.0);
client.count("count2", 125, 1.0);
client.gauge("speed", 10);
int i;
for (i=0; i<1000; i++)
client2.timing("request", i);
sleep(1);
client.inc("count1", 1.0);
client2.dec("count2", 1.0);
// for(i=0; i<1000; i++) {
// client2.count("count3", i, 0.8);
// }
std::cout << "done" << std::endl;
return 0;
}

View File

@@ -1,245 +0,0 @@
#include <math.h>
#include <netdb.h>
#include <time.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <netinet/in.h>
#include "statsd_client.h"
using namespace std;
namespace statsd {
inline bool fequal(float a, float b)
{
const float epsilon = 0.0001;
return ( fabs(a - b) < epsilon );
}
inline bool should_send(float sample_rate)
{
if ( fequal(sample_rate, 1.0) )
{
return true;
}
float p = ((float)random() / RAND_MAX);
return sample_rate > p;
}
struct _StatsdClientData {
int sock;
struct sockaddr_in server;
string ns;
string host;
short port;
bool init;
char errmsg[1024];
};
StatsdClient::StatsdClient(const string& host,
int port,
const string& ns,
const bool batching)
: batching_(batching), exit_(false)
{
d = new _StatsdClientData;
d->sock = -1;
config(host, port, ns);
srandom((unsigned) time(NULL));
if (batching_) {
pthread_mutex_init(&batching_mutex_lock_, nullptr);
batching_thread_ = std::thread([this] {
while (!exit_) {
std::deque<std::string> staged_message_queue;
pthread_mutex_lock(&batching_mutex_lock_);
batching_message_queue_.swap(staged_message_queue);
pthread_mutex_unlock(&batching_mutex_lock_);
while(!staged_message_queue.empty()) {
send_to_daemon(staged_message_queue.front());
staged_message_queue.pop_front();
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
});
}
}
StatsdClient::~StatsdClient()
{
if (batching_) {
exit_ = true;
batching_thread_.join();
pthread_mutex_destroy(&batching_mutex_lock_);
}
// close socket
if (d->sock >= 0) {
close(d->sock);
d->sock = -1;
delete d;
d = NULL;
}
}
void StatsdClient::config(const string& host, int port, const string& ns)
{
d->ns = ns;
d->host = host;
d->port = port;
d->init = false;
if ( d->sock >= 0 ) {
close(d->sock);
}
d->sock = -1;
}
int StatsdClient::init()
{
if ( d->init ) return 0;
d->sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if ( d->sock == -1 ) {
snprintf(d->errmsg, sizeof(d->errmsg), "could not create socket, err=%s", strerror(errno));
return -1;
}
memset(&d->server, 0, sizeof(d->server));
d->server.sin_family = AF_INET;
d->server.sin_port = htons(d->port);
int ret = inet_aton(d->host.c_str(), &d->server.sin_addr);
if ( ret == 0 )
{
// host must be a domain, get it from internet
struct addrinfo hints, *result = NULL;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_DGRAM;
ret = getaddrinfo(d->host.c_str(), NULL, &hints, &result);
if ( ret ) {
close(d->sock);
d->sock = -1;
snprintf(d->errmsg, sizeof(d->errmsg),
"getaddrinfo fail, error=%d, msg=%s", ret, gai_strerror(ret) );
return -2;
}
struct sockaddr_in* host_addr = (struct sockaddr_in*)result->ai_addr;
memcpy(&d->server.sin_addr, &host_addr->sin_addr, sizeof(struct in_addr));
freeaddrinfo(result);
}
d->init = true;
return 0;
}
/* will change the original string */
void StatsdClient::cleanup(string& key)
{
size_t pos = key.find_first_of(":|@");
while ( pos != string::npos )
{
key[pos] = '_';
pos = key.find_first_of(":|@");
}
}
int StatsdClient::dec(const string& key, float sample_rate)
{
return count(key, -1, sample_rate);
}
int StatsdClient::inc(const string& key, float sample_rate)
{
return count(key, 1, sample_rate);
}
int StatsdClient::count(const string& key, size_t value, float sample_rate)
{
return send(key, value, "c", sample_rate);
}
int StatsdClient::gauge(const string& key, size_t value, float sample_rate)
{
return send(key, value, "g", sample_rate);
}
int StatsdClient::timing(const string& key, size_t ms, float sample_rate)
{
return send(key, ms, "ms", sample_rate);
}
int StatsdClient::send(string key, size_t value, const string &type, float sample_rate)
{
if (!should_send(sample_rate)) {
return 0;
}
cleanup(key);
char buf[256];
if ( fequal( sample_rate, 1.0 ) )
{
snprintf(buf, sizeof(buf), "%s%s:%zd|%s",
d->ns.c_str(), key.c_str(), value, type.c_str());
}
else
{
snprintf(buf, sizeof(buf), "%s%s:%zd|%s|@%.2f",
d->ns.c_str(), key.c_str(), value, type.c_str(), sample_rate);
}
return send(buf);
}
int StatsdClient::send(const string &message)
{
if (batching_) {
pthread_mutex_lock(&batching_mutex_lock_);
if (batching_message_queue_.empty() ||
batching_message_queue_.back().length() > max_batching_size) {
batching_message_queue_.push_back(message);
} else {
(*batching_message_queue_.rbegin()).append("\n").append(message);
}
pthread_mutex_unlock(&batching_mutex_lock_);
return 0;
} else {
return send_to_daemon(message);
}
}
int StatsdClient::send_to_daemon(const string &message) {
int ret = init();
if ( ret )
{
return ret;
}
ret = (int) sendto(d->sock, message.data(), message.size(), 0, (struct sockaddr *) &d->server, sizeof(d->server));
if ( ret == -1) {
snprintf(d->errmsg, sizeof(d->errmsg),
"sendto server fail, host=%s:%d, err=%s", d->host.c_str(), d->port, strerror(errno));
return -1;
}
return 0;
}
const char* StatsdClient::errmsg()
{
return d->errmsg;
}
}

View File

@@ -1,66 +0,0 @@
#ifndef STATSD_CLIENT_H
#define STATSD_CLIENT_H
#include <sys/types.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <pthread.h>
#include <string>
#include <thread>
#include <deque>
#include <iostream>
namespace statsd {
struct _StatsdClientData;
class StatsdClient {
public:
StatsdClient(const std::string& host="127.0.0.1", int port=8125, const std::string& ns = "", const bool batching = false);
~StatsdClient();
public:
// you can config at anytime; client will use new address (useful for Singleton)
void config(const std::string& host, int port, const std::string& ns = "");
const char* errmsg();
int send_to_daemon(const std::string &);
public:
int inc(const std::string& key, float sample_rate = 1.0);
int dec(const std::string& key, float sample_rate = 1.0);
int count(const std::string& key, size_t value, float sample_rate = 1.0);
int gauge(const std::string& key, size_t value, float sample_rate = 1.0);
int timing(const std::string& key, size_t ms, float sample_rate = 1.0);
public:
/**
* (Low Level Api) manually send a message
* which might be composed of several lines.
*/
int send(const std::string& message);
/* (Low Level Api) manually send a message
* type = "c", "g" or "ms"
*/
int send(std::string key, size_t value,
const std::string& type, float sample_rate);
protected:
int init();
void cleanup(std::string& key);
protected:
struct _StatsdClientData* d;
bool batching_;
bool exit_;
pthread_mutex_t batching_mutex_lock_;
std::thread batching_thread_;
std::deque<std::string> batching_message_queue_;
const uint64_t max_batching_size = 32768;
};
} // end namespace
#endif

View File

@@ -225,25 +225,3 @@ endif()
if(NOT SKIP_INSTALL_FILES AND NOT SKIP_INSTALL_ALL )
install(FILES ${ZLIB_PC} DESTINATION "${INSTALL_PKGCONFIG_DIR}")
endif()
#============================================================================
# Example binaries
#============================================================================
add_executable(example test/example.c)
target_link_libraries(example zlib)
add_test(example example)
add_executable(minigzip test/minigzip.c)
target_link_libraries(minigzip zlib)
if(HAVE_OFF64_T)
add_executable(example64 test/example.c)
target_link_libraries(example64 zlib)
set_target_properties(example64 PROPERTIES COMPILE_FLAGS "-D_FILE_OFFSET_BITS=64")
add_test(example64 example64)
add_executable(minigzip64 test/minigzip.c)
target_link_libraries(minigzip64 zlib)
set_target_properties(minigzip64 PROPERTIES COMPILE_FLAGS "-D_FILE_OFFSET_BITS=64")
endif()

View File

@@ -21,16 +21,11 @@ option(USE_TLS "Add TLS support" ON)
include_directories(ws .)
include_directories(ws ..)
include_directories(ws ../third_party)
include_directories(ws ../third_party/statsd-client-cpp/src)
include_directories(ws ../third_party/spdlog/include)
include_directories(ws ../third_party/cpp-linenoise)
add_definitions(-DSPDLOG_COMPILED_LIB=1)
if (UNIX)
set( STATSD_CLIENT_SOURCES ../third_party/statsd-client-cpp/src/statsd_client.cpp)
endif()
find_package(JsonCpp)
if (NOT JSONCPP_FOUND)
include_directories(../third_party/jsoncpp)
@@ -39,7 +34,6 @@ endif()
add_executable(ws
../third_party/msgpack11/msgpack11.cpp
${STATSD_CLIENT_SOURCES}
${JSONCPP_SOURCES}
ws_http_client.cpp
@@ -66,24 +60,17 @@ add_executable(ws
ws_dns_lookup.cpp
ws.cpp)
# library with the most dependencies come first
target_link_libraries(ws ixbots)
target_link_libraries(ws ixsnake)
target_link_libraries(ws ixcobra)
target_link_libraries(ws ixsentry)
target_link_libraries(ws ixwebsocket)
target_link_libraries(ws ixcrypto)
target_link_libraries(ws ixcore)
target_link_libraries(ws ixsentry)
target_link_libraries(ws ixbots)
target_link_libraries(ws spdlog)
if(NOT APPLE AND NOT USE_MBED_TLS)
find_package(OpenSSL REQUIRED)
add_definitions(${OPENSSL_DEFINITIONS})
message(STATUS "OpenSSL: " ${OPENSSL_VERSION})
include_directories(${OPENSSL_INCLUDE_DIR})
target_link_libraries(ws ${OPENSSL_LIBRARIES})
endif()
if (JSONCPP_FOUND)
target_include_directories(ws PUBLIC ${JSONCPP_INCLUDE_DIRS})
target_link_libraries(ws ${JSONCPP_LIBRARIES})

View File

@@ -65,6 +65,7 @@ int main(int argc, char** argv)
std::string pidfile;
std::string channel;
std::string filter;
std::string position;
std::string message;
std::string password;
std::string prefix("ws.test.v0");
@@ -111,6 +112,7 @@ int main(int argc, char** argv)
int count = 1;
uint32_t maxWaitBetweenReconnectionRetries;
size_t maxQueueSize = 100;
int pingIntervalSecs = 30;
auto addTLSOptions = [&tlsOptions, &verifyNone](CLI::App* app) {
app->add_option(
@@ -169,6 +171,7 @@ int main(int argc, char** argv)
connectApp->add_option("--max_wait",
maxWaitBetweenReconnectionRetries,
"Max Wait Time between reconnection retries");
connectApp->add_option("--ping_interval", pingIntervalSecs, "Interval between sending pings");
connectApp->add_option("--subprotocol", subprotocol, "Subprotocol");
addTLSOptions(connectApp);
@@ -182,6 +185,7 @@ int main(int argc, char** argv)
echoServerApp->add_flag("-g", greetings, "Verbose");
echoServerApp->add_flag("-6", ipv6, "IpV6");
echoServerApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate");
echoServerApp->add_flag("-p", disablePong, "Disable sending PONG in response to PING");
addTLSOptions(echoServerApp);
CLI::App* broadcastServerApp = app.add_subcommand("broadcast_server", "Broadcasting server");
@@ -229,6 +233,7 @@ int main(int argc, char** argv)
cobraSubscribeApp->add_option("--channel", channel, "Channel")->required();
cobraSubscribeApp->add_option("--pidfile", pidfile, "Pid file");
cobraSubscribeApp->add_option("--filter", filter, "Stream SQL Filter");
cobraSubscribeApp->add_option("--position", position, "Stream position");
cobraSubscribeApp->add_flag("-q", quiet, "Quiet / only display stats");
cobraSubscribeApp->add_flag("--fluentd", fluentd, "Write fluentd prefix");
addTLSOptions(cobraSubscribeApp);
@@ -263,6 +268,10 @@ int main(int argc, char** argv)
cobra2statsd->add_flag("-v", verbose, "Verbose");
cobra2statsd->add_option("--pidfile", pidfile, "Pid file");
cobra2statsd->add_option("--filter", filter, "Stream SQL Filter");
cobra2statsd->add_option("--position", position, "Stream position");
cobra2statsd->add_option("--queue_size",
maxQueueSize,
"Size of the queue to hold messages before they are sent to Sentry");
addTLSOptions(cobra2statsd);
addCobraConfig(cobra2statsd);
@@ -276,6 +285,7 @@ int main(int argc, char** argv)
cobra2sentry->add_flag("-s", strict, "Strict mode. Error out when sending to sentry fails");
cobra2sentry->add_option("--pidfile", pidfile, "Pid file");
cobra2sentry->add_option("--filter", filter, "Stream SQL Filter");
cobra2sentry->add_option("--position", position, "Stream position");
addTLSOptions(cobra2sentry);
addCobraConfig(cobra2sentry);
@@ -284,6 +294,7 @@ int main(int argc, char** argv)
cobra2redisApp->add_option("channel", channel, "Channel")->required();
cobra2redisApp->add_option("--pidfile", pidfile, "Pid file");
cobra2redisApp->add_option("--filter", filter, "Stream SQL Filter");
cobra2redisApp->add_option("--position", position, "Stream position");
cobra2redisApp->add_option("--hostname", hostname, "Redis hostname");
cobra2redisApp->add_option("--port", redisPort, "Redis port");
cobra2redisApp->add_flag("-q", quiet, "Quiet / only display stats");
@@ -384,7 +395,8 @@ int main(int argc, char** argv)
binaryMode,
maxWaitBetweenReconnectionRetries,
tlsOptions,
subprotocol);
subprotocol,
pingIntervalSecs);
}
else if (app.got_subcommand("chat"))
{
@@ -393,7 +405,7 @@ int main(int argc, char** argv)
else if (app.got_subcommand("echo_server"))
{
ret = ix::ws_echo_server_main(
port, greetings, hostname, tlsOptions, ipv6, disablePerMessageDeflate);
port, greetings, hostname, tlsOptions, ipv6, disablePerMessageDeflate, disablePong);
}
else if (app.got_subcommand("broadcast_server"))
{
@@ -429,7 +441,7 @@ int main(int argc, char** argv)
}
else if (app.got_subcommand("cobra_subscribe"))
{
ret = ix::ws_cobra_subscribe_main(cobraConfig, channel, filter, quiet, fluentd);
ret = ix::ws_cobra_subscribe_main(cobraConfig, channel, filter, position, quiet, fluentd);
}
else if (app.got_subcommand("cobra_publish"))
{
@@ -441,18 +453,32 @@ int main(int argc, char** argv)
}
else if (app.got_subcommand("cobra_to_statsd"))
{
ret = ix::cobra_to_statsd_bot(
cobraConfig, channel, filter, hostname, statsdPort, prefix, fields, verbose);
bool enableHeartbeat = true;
int runtime = -1;
ix::StatsdClient statsdClient(hostname, statsdPort, prefix);
ret = ix::cobra_to_statsd_bot(cobraConfig,
channel,
filter,
position,
statsdClient,
fields,
verbose,
maxQueueSize,
enableHeartbeat,
runtime);
}
else if (app.got_subcommand("cobra_to_sentry"))
{
bool enableHeartbeat = true;
int runtime = -1;
ix::SentryClient sentryClient(dsn);
sentryClient.setTLSOptions(tlsOptions);
ret = ix::cobra_to_sentry_bot(cobraConfig,
channel,
filter,
position,
sentryClient,
verbose,
strict,
@@ -462,7 +488,8 @@ int main(int argc, char** argv)
}
else if (app.got_subcommand("cobra_metrics_to_redis"))
{
ret = ix::ws_cobra_metrics_to_redis(cobraConfig, channel, filter, hostname, redisPort);
ret = ix::ws_cobra_metrics_to_redis(
cobraConfig, channel, filter, position, hostname, redisPort);
}
else if (app.got_subcommand("snake"))
{

View File

@@ -32,7 +32,8 @@ namespace ix
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions,
bool ipv6,
bool disablePerMessageDeflate);
bool disablePerMessageDeflate,
bool disablePong);
int ws_broadcast_server_main(int port,
const std::string& hostname,
@@ -50,7 +51,8 @@ namespace ix
bool binaryMode,
uint32_t maxWaitBetweenReconnectionRetries,
const ix::SocketTLSOptions& tlsOptions,
const std::string& subprotocol);
const std::string& subprotocol,
int pingIntervalSecs);
int ws_receive_main(const std::string& url,
bool enablePerMessageDeflate,
@@ -78,6 +80,7 @@ namespace ix
int ws_cobra_subscribe_main(const ix::CobraConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& position,
bool quiet,
bool fluentd);
@@ -93,6 +96,7 @@ namespace ix
int ws_cobra_metrics_to_redis(const ix::CobraConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& position,
const std::string& host,
int port);

View File

@@ -30,15 +30,7 @@ namespace ix
CobraMetricsPublisher cobraMetricsPublisher;
cobraMetricsPublisher.enable(true);
bool enablePerMessageDeflate = true;
cobraMetricsPublisher.configure(config.appkey,
config.endpoint,
channel,
config.rolename,
config.rolesecret,
enablePerMessageDeflate,
config.socketTLSOptions);
cobraMetricsPublisher.configure(config, channel);
while (!cobraMetricsPublisher.isAuthenticated())
;

View File

@@ -20,6 +20,7 @@ namespace ix
int ws_cobra_metrics_to_redis(const ix::CobraConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& position,
const std::string& host,
int port)
{
@@ -100,6 +101,7 @@ namespace ix
conn.setEventCallback([&conn,
&channel,
&filter,
&position,
&msgCount,
&msgPerSeconds,
&conditionVariableMutex,
@@ -125,6 +127,7 @@ namespace ix
conn.subscribe(
channel,
filter,
position,
[&msgPerSeconds, &msgCount, &conditionVariableMutex, &condition, &queue](
const Json::Value& msg, const std::string& /*position*/) {
{

View File

@@ -41,6 +41,7 @@ namespace ix
int ws_cobra_subscribe_main(const ix::CobraConfig& config,
const std::string& channel,
const std::string& filter,
const std::string& position,
bool quiet,
bool fluentd)
{
@@ -67,59 +68,76 @@ namespace ix
std::thread t(timer);
conn.setEventCallback(
[&conn, &channel, &jsonWriter, &filter, &msgCount, &msgPerSeconds, &quiet, &fluentd](
ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId) {
if (eventType == ix::CobraConnection_EventType_Open)
{
spdlog::info("Subscriber connected");
std::string subscriptionPosition(position);
for (auto it : headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
}
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
spdlog::info("Subscriber authenticated");
conn.subscribe(channel,
filter,
[&jsonWriter, &quiet, &msgPerSeconds, &msgCount, &fluentd](
const Json::Value& msg, const std::string& position) {
if (!quiet)
{
writeToStdout(fluentd, jsonWriter, msg, position);
}
conn.setEventCallback([&conn,
&channel,
&jsonWriter,
&filter,
&subscriptionPosition,
&msgCount,
&msgPerSeconds,
&quiet,
&fluentd](ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId) {
if (eventType == ix::CobraConnection_EventType_Open)
{
spdlog::info("Subscriber connected");
msgPerSeconds++;
msgCount++;
});
}
else if (eventType == ix::CobraConnection_EventType_Subscribed)
for (auto it : headers)
{
spdlog::info("Subscriber: subscribed to channel {}", subscriptionId);
spdlog::info("{}: {}", it.first, it.second);
}
else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{
spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId);
}
else if (eventType == ix::CobraConnection_EventType_Error)
{
spdlog::error("Subscriber: error {}", errMsg);
}
else if (eventType == ix::CobraConnection_EventType_Published)
{
spdlog::error("Published message hacked: {}", msgId);
}
else if (eventType == ix::CobraConnection_EventType_Pong)
{
spdlog::info("Received websocket pong");
}
});
}
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
spdlog::info("Subscriber authenticated");
spdlog::info("Subscribing to {} at position {}", channel, subscriptionPosition);
conn.subscribe(
channel,
filter,
subscriptionPosition,
[&jsonWriter,
&quiet,
&msgPerSeconds,
&msgCount,
&fluentd,
&subscriptionPosition](const Json::Value& msg, const std::string& position) {
if (!quiet)
{
writeToStdout(fluentd, jsonWriter, msg, position);
}
msgPerSeconds++;
msgCount++;
subscriptionPosition = position;
});
}
else if (eventType == ix::CobraConnection_EventType_Subscribed)
{
spdlog::info("Subscriber: subscribed to channel {}", subscriptionId);
}
else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{
spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId);
}
else if (eventType == ix::CobraConnection_EventType_Error)
{
spdlog::error("Subscriber: error {}", errMsg);
}
else if (eventType == ix::CobraConnection_EventType_Published)
{
spdlog::error("Published message hacked: {}", msgId);
}
else if (eventType == ix::CobraConnection_EventType_Pong)
{
spdlog::info("Received websocket pong");
}
});
while (true)
{

View File

@@ -4,6 +4,7 @@
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*/
#include "IXBench.h"
#include "linenoise.hpp"
#include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXSocketTLSOptions.h>
@@ -24,7 +25,8 @@ namespace ix
bool binaryMode,
uint32_t maxWaitBetweenReconnectionRetries,
const ix::SocketTLSOptions& tlsOptions,
const std::string& subprotocol);
const std::string& subprotocol,
int pingIntervalSecs);
void subscribe(const std::string& channel);
void start();
@@ -61,7 +63,8 @@ namespace ix
bool binaryMode,
uint32_t maxWaitBetweenReconnectionRetries,
const ix::SocketTLSOptions& tlsOptions,
const std::string& subprotocol)
const std::string& subprotocol,
int pingIntervalSecs)
: _url(url)
, _disablePerMessageDeflate(disablePerMessageDeflate)
, _binaryMode(binaryMode)
@@ -74,6 +77,7 @@ namespace ix
}
_webSocket.setMaxWaitBetweenReconnectionRetries(maxWaitBetweenReconnectionRetries);
_webSocket.setTLSOptions(tlsOptions);
_webSocket.setPingInterval(pingIntervalSecs);
_headers = parseHeaders(headers);
@@ -126,7 +130,10 @@ namespace ix
void WebSocketConnect::stop()
{
_webSocket.stop();
{
Bench bench("ws_connect: stop connection");
_webSocket.stop();
}
}
void WebSocketConnect::start()
@@ -223,7 +230,8 @@ namespace ix
bool binaryMode,
uint32_t maxWaitBetweenReconnectionRetries,
const ix::SocketTLSOptions& tlsOptions,
const std::string& subprotocol)
const std::string& subprotocol,
int pingIntervalSecs)
{
std::cout << "Type Ctrl-D to exit prompt..." << std::endl;
WebSocketConnect webSocketChat(url,
@@ -233,7 +241,8 @@ namespace ix
binaryMode,
maxWaitBetweenReconnectionRetries,
tlsOptions,
subprotocol);
subprotocol,
pingIntervalSecs);
webSocketChat.start();
while (true)

View File

@@ -16,7 +16,8 @@ namespace ix
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions,
bool ipv6,
bool disablePerMessageDeflate)
bool disablePerMessageDeflate,
bool disablePong)
{
spdlog::info("Listening on {}:{}", hostname, port);
@@ -35,6 +36,12 @@ namespace ix
server.disablePerMessageDeflate();
}
if (disablePong)
{
spdlog::info("Disable responding to PING messages with PONG");
server.disablePong();
}
server.setOnConnectionCallback(
[greetings](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState) {

View File

@@ -4,6 +4,7 @@
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*/
#include "IXBench.h"
#include <chrono>
#include <condition_variable>
#include <fstream>
@@ -195,48 +196,6 @@ namespace ix
_webSocket.start();
}
class Bench
{
public:
Bench(const std::string& description)
: _description(description)
, _start(std::chrono::system_clock::now())
, _reported(false)
{
;
}
~Bench()
{
if (!_reported)
{
report();
}
}
void report()
{
auto now = std::chrono::system_clock::now();
auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(now - _start);
_ms = milliseconds.count();
spdlog::info("{} completed in {} ms", _description, _ms);
_reported = true;
}
uint64_t getDuration() const
{
return _ms;
}
private:
std::string _description;
std::chrono::time_point<std::chrono::system_clock> _start;
uint64_t _ms;
bool _reported;
};
bool WebSocketSender::sendMessage(const std::string& filename, bool throttle)
{
std::vector<uint8_t> content;