Compare commits

..

1 Commits

Author SHA1 Message Date
27dabaaf86 tsan openssl mac ci 2020-03-26 16:38:41 -07:00
24 changed files with 2291 additions and 3306 deletions

View File

@ -5,21 +5,14 @@ on:
- 'docs/**' - 'docs/**'
jobs: jobs:
linux: # linux:
runs-on: ubuntu-latest # runs-on: ubuntu-latest
steps: # steps:
- uses: actions/checkout@v1 # - uses: actions/checkout@v1
- name: make test # - name: make test
run: make test # run: make test
mac_tsan_sectransport: mac_openssl_tsan:
runs-on: macOS-latest
steps:
- uses: actions/checkout@v1
- name: make test_tsan
run: make test_tsan
mac_tsan_openssl:
runs-on: macOS-latest runs-on: macOS-latest
steps: steps:
- uses: actions/checkout@v1 - uses: actions/checkout@v1
@ -28,26 +21,24 @@ jobs:
- name: make test - name: make test
run: make test_tsan_openssl run: make test_tsan_openssl
mac_tsan_mbedtls: # tsan:
runs-on: macOS-latest # runs-on: macOS-latest
steps: # steps:
- uses: actions/checkout@v1 # - uses: actions/checkout@v1
- name: install mbedtls # - name: make test_tsan
run: brew install mbedtls # run: make test_tsan
- name: make test
run: make test_tsan_mbedtls
win: # win:
runs-on: windows-latest # runs-on: windows-latest
steps: # steps:
- uses: actions/checkout@v1 # - uses: actions/checkout@v1
- uses: seanmiddleditch/gha-setup-vsdevenv@master # - uses: seanmiddleditch/gha-setup-vsdevenv@master
- run: | # - run: |
mkdir build # mkdir build
cd build # cd build
cmake -DCMAKE_CXX_COMPILER=cl.exe -DUSE_WS=1 -DUSE_TEST=1 .. # cmake -DCMAKE_CXX_COMPILER=cl.exe -DUSE_WS=1 -DUSE_TEST=1 ..
- run: cmake --build build # - run: cmake --build build
# Running the unittest does not work # # Running the unittest does not work
#- run: ../build/test/ixwebsocket_unittest.exe # #- run: ../build/test/ixwebsocket_unittest.exe
# working-directory: test # # working-directory: test

View File

@ -200,9 +200,7 @@ if (USE_TLS AND USE_MBED_TLS)
endif() endif()
endif() endif()
if (NOT ZLIB_FOUND)
find_package(ZLIB) find_package(ZLIB)
endif()
if (ZLIB_FOUND) if (ZLIB_FOUND)
include_directories(${ZLIB_INCLUDE_DIRS}) include_directories(${ZLIB_INCLUDE_DIRS})
target_link_libraries(ixwebsocket ${ZLIB_LIBRARIES}) target_link_libraries(ixwebsocket ${ZLIB_LIBRARIES})

View File

@ -1 +1 @@
docker/Dockerfile.alpine docker/Dockerfile.centos

View File

@ -1,13 +1,12 @@
FROM alpine:3.11 as build FROM alpine:3.11 as build
RUN apk add --no-cache \ RUN apk add --no-cache gcc g++ musl-dev linux-headers cmake openssl-dev
gcc g++ musl-dev linux-headers \ RUN apk add --no-cache make
cmake mbedtls-dev make zlib-dev RUN apk add --no-cache zlib-dev
RUN addgroup -S app && \ RUN addgroup -S app && adduser -S -G app app
adduser -S -G app app && \ RUN chown -R app:app /opt
chown -R app:app /opt && \ RUN chown -R app:app /usr/local
chown -R app:app /usr/local
# There is a bug in CMake where we cannot build from the root top folder # There is a bug in CMake where we cannot build from the root top folder
# So we build from /opt # So we build from /opt
@ -15,21 +14,22 @@ COPY --chown=app:app . /opt
WORKDIR /opt WORKDIR /opt
USER app USER app
RUN make ws_mbedtls_install && \ RUN [ "make", "ws_install" ]
sh tools/trim_repo_for_docker.sh RUN [ "rm", "-rf", "build" ]
FROM alpine:3.11 as runtime FROM alpine:3.11 as runtime
RUN apk add --no-cache libstdc++ mbedtls ca-certificates && \ RUN apk add --no-cache libstdc++
addgroup -S app && \ RUN apk add --no-cache strace
adduser -S -G app app RUN apk add --no-cache gdb
RUN addgroup -S app && adduser -S -G app app
COPY --chown=app:app --from=build /usr/local/bin/ws /usr/local/bin/ws COPY --chown=app:app --from=build /usr/local/bin/ws /usr/local/bin/ws
RUN chmod +x /usr/local/bin/ws
RUN ldd /usr/local/bin/ws
# COPY --chown=app:app --from=build /opt /opt # Copy source code for gcc
COPY --chown=app:app --from=build /opt /opt
RUN chmod +x /usr/local/bin/ws && \
ldd /usr/local/bin/ws
# Now run in usermode # Now run in usermode
USER app USER app

View File

@ -1,46 +1,6 @@
# Changelog # Changelog
All changes to this project will be documented in this file. All changes to this project will be documented in this file.
## [9.1.9] - 2020-03-30
(cobra to statsd bot) add ability to extract a numerical value and send a timer event to statsd, with the --timer option
## [9.1.8] - 2020-03-29
(cobra to statsd bot) bot init was missing + capture socket error
## [9.1.7] - 2020-03-29
(cobra to statsd bot) add ability to extract a numerical value and send a gauge event to statsd, with the --gauge option
## [9.1.6] - 2020-03-29
(ws cobra subscriber) use a Json::StreamWriter to write to std::cout, and save one std::string allocation for each message printed
## [9.1.5] - 2020-03-29
(docker) trim down docker image (300M -> 12M) / binary built without symbol and size optimization, and source code not copied over
## [9.1.4] - 2020-03-28
(jsoncpp) update bundled copy to version 1.9.3 (at sha 3beb37ea14aec1bdce1a6d542dc464d00f4a6cec)
## [9.1.3] - 2020-03-27
(docker) alpine docker build with release with debug info, and bundle ca-certificates
## [9.1.2] - 2020-03-26
(mac ssl) rename DarwinSSL -> SecureTransport (see this too -> https://github.com/curl/curl/issues/3733)
## [9.1.1] - 2020-03-26
(websocket) fix data race accessing _socket object without mutex protection when calling wakeUpFromPoll in WebSocketTransport.cpp
## [9.1.0] - 2020-03-26
(ixcobra) add explicit event types for handshake, authentication and subscription failure, and handle those by exiting in ws_cobra_subcribe and friends
## [9.0.3] - 2020-03-24 ## [9.0.3] - 2020-03-24
(ws connect) display statistics about how much time it takes to stop the connection (ws connect) display statistics about how much time it takes to stop the connection

View File

@ -37,7 +37,6 @@ namespace ix
std::atomic<bool> errorSending(false); std::atomic<bool> errorSending(false);
std::atomic<bool> stop(false); std::atomic<bool> stop(false);
std::atomic<bool> throttled(false); std::atomic<bool> throttled(false);
std::atomic<bool> fatalCobraError(false);
QueueManager queueManager(maxQueueSize); QueueManager queueManager(maxQueueSize);
@ -180,7 +179,6 @@ namespace ix
verbose, verbose,
&throttled, &throttled,
&receivedCount, &receivedCount,
&fatalCobraError,
&queueManager](ix::CobraConnectionEventType eventType, &queueManager](ix::CobraConnectionEventType eventType,
const std::string& errMsg, const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers, const ix::WebSocketHttpHeaders& headers,
@ -242,21 +240,6 @@ namespace ix
{ {
spdlog::info("Received websocket pong"); spdlog::info("Received websocket pong");
} }
else if (eventType == ix::CobraConnection_EventType_Handshake_Error)
{
spdlog::error("Subscriber: Handshake error: {}", errMsg);
fatalCobraError = true;
}
else if (eventType == ix::CobraConnection_EventType_Authentication_Error)
{
spdlog::error("Subscriber: Authentication error: {}", errMsg);
fatalCobraError = true;
}
else if (eventType == ix::CobraConnection_EventType_Subscription_Error)
{
spdlog::error("Subscriber: Subscription error: {}", errMsg);
fatalCobraError = true;
}
}); });
// Run forever // Run forever
@ -268,7 +251,6 @@ namespace ix
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
if (strict && errorSending) break; if (strict && errorSending) break;
if (fatalCobraError) break;
} }
} }
// Run for a duration, used by unittesting now // Run for a duration, used by unittesting now
@ -280,7 +262,6 @@ namespace ix
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
if (strict && errorSending) break; if (strict && errorSending) break;
if (fatalCobraError) break;
} }
} }
@ -291,15 +272,12 @@ namespace ix
conn.disconnect(); conn.disconnect();
stop = true; stop = true;
// progress thread
t1.join(); t1.join();
// heartbeat thread
if (t2.joinable()) t2.join(); if (t2.joinable()) t2.join();
spdlog::info("heartbeat thread done");
// sentry sender thread
t3.join(); t3.join();
return ((strict && errorSending) || fatalCobraError) ? -1 : (int) sentCount; return (strict && errorSending) ? -1 : (int) sentCount;
} }
} // namespace ix } // namespace ix

View File

@ -40,7 +40,7 @@ namespace ix
// Extract an attribute from a Json Value. // Extract an attribute from a Json Value.
// extractAttr("foo.bar", {"foo": {"bar": "baz"}}) => baz // extractAttr("foo.bar", {"foo": {"bar": "baz"}}) => baz
// //
Json::Value extractAttr(const std::string& attr, const Json::Value& jsonValue) std::string extractAttr(const std::string& attr, const Json::Value& jsonValue)
{ {
// Split by . // Split by .
std::string token; std::string token;
@ -53,7 +53,7 @@ namespace ix
val = val[token]; val = val[token];
} }
return val; return val.asString();
} }
int cobra_to_statsd_bot(const ix::CobraConfig& config, int cobra_to_statsd_bot(const ix::CobraConfig& config,
@ -62,8 +62,6 @@ namespace ix
const std::string& position, const std::string& position,
StatsdClient& statsdClient, StatsdClient& statsdClient,
const std::string& fields, const std::string& fields,
const std::string& gauge,
const std::string& timer,
bool verbose, bool verbose,
size_t maxQueueSize, size_t maxQueueSize,
bool enableHeartbeat, bool enableHeartbeat,
@ -79,11 +77,10 @@ namespace ix
std::atomic<uint64_t> sentCount(0); std::atomic<uint64_t> sentCount(0);
std::atomic<uint64_t> receivedCount(0); std::atomic<uint64_t> receivedCount(0);
std::atomic<bool> stop(false); std::atomic<bool> stop(false);
std::atomic<bool> fatalCobraError(false);
QueueManager queueManager(maxQueueSize); QueueManager queueManager(maxQueueSize);
auto progress = [&sentCount, &receivedCount, &stop] { auto timer = [&sentCount, &receivedCount, &stop] {
while (!stop) while (!stop)
{ {
spdlog::info("messages received {} sent {}", receivedCount, sentCount); spdlog::info("messages received {} sent {}", receivedCount, sentCount);
@ -91,18 +88,16 @@ namespace ix
auto duration = std::chrono::seconds(1); auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
} }
spdlog::info("timer thread done");
}; };
std::thread t1(progress); std::thread t1(timer);
auto heartbeat = [&sentCount, &receivedCount, &stop, &enableHeartbeat] { auto heartbeat = [&sentCount, &receivedCount, &enableHeartbeat] {
std::string state("na"); std::string state("na");
if (!enableHeartbeat) return; if (!enableHeartbeat) return;
while (!stop) while (true)
{ {
std::stringstream ss; std::stringstream ss;
ss << "messages received " << receivedCount; ss << "messages received " << receivedCount;
@ -120,13 +115,11 @@ namespace ix
auto duration = std::chrono::minutes(1); auto duration = std::chrono::minutes(1);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
} }
spdlog::info("heartbeat thread done");
}; };
std::thread t2(heartbeat); std::thread t2(heartbeat);
auto statsdSender = [&statsdClient, &queueManager, &sentCount, &tokens, &stop, &gauge, &timer, &fatalCobraError, &verbose] { auto statsdSender = [&statsdClient, &queueManager, &sentCount, &tokens, &stop] {
while (true) while (true)
{ {
Json::Value msg = queueManager.pop(); Json::Value msg = queueManager.pop();
@ -138,62 +131,10 @@ namespace ix
for (auto&& attr : tokens) for (auto&& attr : tokens)
{ {
id += "."; id += ".";
auto val = extractAttr(attr, msg); id += extractAttr(attr, msg);
id += val.asString();
} }
if (gauge.empty() && timer.empty())
{
statsdClient.count(id, 1); statsdClient.count(id, 1);
}
else
{
std::string attrName = (!gauge.empty()) ? gauge : timer;
auto val = extractAttr(attrName, msg);
size_t x;
if (val.isInt())
{
x = (size_t) val.asInt();
}
else if (val.isInt64())
{
x = (size_t) val.asInt64();
}
else if (val.isUInt())
{
x = (size_t) val.asUInt();
}
else if (val.isUInt64())
{
x = (size_t) val.asUInt64();
}
else if (val.isDouble())
{
x = (size_t) val.asUInt64();
}
else
{
spdlog::error("Gauge {} is not a numberic type", gauge);
fatalCobraError = true;
break;
}
if (verbose)
{
spdlog::info("{} - {} -> {}", id, attrName, x);
}
if (!gauge.empty())
{
statsdClient.gauge(id, x);
}
else
{
statsdClient.timing(id, x);
}
}
sentCount += 1; sentCount += 1;
} }
}; };
@ -201,7 +142,7 @@ namespace ix
std::thread t3(statsdSender); std::thread t3(statsdSender);
conn.setEventCallback( conn.setEventCallback(
[&conn, &channel, &filter, &position, &jsonWriter, verbose, &queueManager, &receivedCount, &fatalCobraError]( [&conn, &channel, &filter, &position, &jsonWriter, verbose, &queueManager, &receivedCount](
ix::CobraConnectionEventType eventType, ix::CobraConnectionEventType eventType,
const std::string& errMsg, const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers, const ix::WebSocketHttpHeaders& headers,
@ -259,21 +200,6 @@ namespace ix
{ {
spdlog::info("Received websocket pong"); spdlog::info("Received websocket pong");
} }
else if (eventType == ix::CobraConnection_EventType_Handshake_Error)
{
spdlog::error("Subscriber: Handshake error: {}", errMsg);
fatalCobraError = true;
}
else if (eventType == ix::CobraConnection_EventType_Authentication_Error)
{
spdlog::error("Subscriber: Authentication error: {}", errMsg);
fatalCobraError = true;
}
else if (eventType == ix::CobraConnection_EventType_Subscription_Error)
{
spdlog::error("Subscriber: Subscription error: {}", errMsg);
fatalCobraError = true;
}
}); });
// Run forever // Run forever
@ -283,8 +209,6 @@ namespace ix
{ {
auto duration = std::chrono::seconds(1); auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
if (fatalCobraError) break;
} }
} }
// Run for a duration, used by unittesting now // Run for a duration, used by unittesting now
@ -294,8 +218,6 @@ namespace ix
{ {
auto duration = std::chrono::seconds(1); auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
if (fatalCobraError) break;
} }
} }
@ -306,15 +228,12 @@ namespace ix
conn.disconnect(); conn.disconnect();
stop = true; stop = true;
// progress thread
t1.join(); t1.join();
// heartbeat thread
if (t2.joinable()) t2.join(); if (t2.joinable()) t2.join();
spdlog::info("heartbeat thread done");
// statsd sender thread
t3.join(); t3.join();
return fatalCobraError ? -1 : (int) sentCount; return (int) sentCount;
} }
} // namespace ix } // namespace ix

View File

@ -18,8 +18,6 @@ namespace ix
const std::string& position, const std::string& position,
StatsdClient& statsdClient, StatsdClient& statsdClient,
const std::string& fields, const std::string& fields,
const std::string& gauge,
const std::string& timer,
bool verbose, bool verbose,
size_t maxQueueSize, size_t maxQueueSize,
bool enableHeartbeat, bool enableHeartbeat,

View File

@ -43,10 +43,11 @@
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <stdio.h> #include <stdio.h>
#include <iostream>
namespace ix namespace ix
{ {
const uint64_t StatsdClient::_maxQueueSize = 32768;
StatsdClient::StatsdClient(const std::string& host, StatsdClient::StatsdClient(const std::string& host,
int port, int port,
const std::string& prefix) const std::string& prefix)
@ -55,11 +56,23 @@ namespace ix
, _prefix(prefix) , _prefix(prefix)
, _stop(false) , _stop(false)
{ {
_thread = std::thread([this] _thread = std::thread([this] {
{
while (!_stop) while (!_stop)
{ {
flushQueue(); std::deque<std::string> stagedQueue;
{
std::lock_guard<std::mutex> lock(_mutex);
_queue.swap(stagedQueue);
}
while (!stagedQueue.empty())
{
auto message = stagedQueue.front();
_socket.sendto(message);
stagedQueue.pop_front();
}
std::this_thread::sleep_for(std::chrono::seconds(1)); std::this_thread::sleep_for(std::chrono::seconds(1));
} }
}); });
@ -119,34 +132,26 @@ namespace ix
cleanup(key); cleanup(key);
char buf[256]; char buf[256];
snprintf(buf, sizeof(buf), "%s%s:%zd|%s\n", snprintf(buf, sizeof(buf), "%s%s:%zd|%s",
_prefix.c_str(), key.c_str(), value, type.c_str()); _prefix.c_str(), key.c_str(), value, type.c_str());
enqueue(buf); return send(buf);
return 0;
} }
void StatsdClient::enqueue(const std::string& message) int StatsdClient::send(const std::string &message)
{ {
std::lock_guard<std::mutex> lock(_mutex); std::lock_guard<std::mutex> lock(_mutex);
if (_queue.empty() ||
_queue.back().length() > _maxQueueSize)
{
_queue.push_back(message); _queue.push_back(message);
} }
else
void StatsdClient::flushQueue()
{ {
std::lock_guard<std::mutex> lock(_mutex); (*_queue.rbegin()).append("\n").append(message);
while (!_queue.empty())
{
auto message = _queue.front();
auto ret = _socket.sendto(message);
if (ret != 0)
{
std::cerr << "error: "
<< strerror(UdpSocket::getErrno())
<< std::endl;
}
_queue.pop_front();
} }
return 0;
} }
} // end namespace ix } // end namespace ix

View File

@ -32,7 +32,11 @@ namespace ix
int timing(const std::string& key, size_t ms); int timing(const std::string& key, size_t ms);
private: private:
void enqueue(const std::string& message); /**
* (Low Level Api) manually send a message
* which might be composed of several lines.
*/
int send(const std::string& message);
/* (Low Level Api) manually send a message /* (Low Level Api) manually send a message
* type = "c", "g" or "ms" * type = "c", "g" or "ms"
@ -40,7 +44,6 @@ namespace ix
int send(std::string key, size_t value, const std::string& type); int send(std::string key, size_t value, const std::string& type);
void cleanup(std::string& key); void cleanup(std::string& key);
void flushQueue();
UdpSocket _socket; UdpSocket _socket;
@ -53,6 +56,7 @@ namespace ix
std::mutex _mutex; // for the queue std::mutex _mutex; // for the queue
std::deque<std::string> _queue; std::deque<std::string> _queue;
static const uint64_t _maxQueueSize;
}; };
} // end namespace ix } // end namespace ix

View File

@ -166,8 +166,7 @@ namespace ix
} }
else if (action == "auth/handshake/error") else if (action == "auth/handshake/error")
{ {
invokeEventCallback(ix::CobraConnection_EventType_Handshake_Error, invokeErrorCallback("Handshake error", msg->str);
msg->str);
} }
else if (action == "auth/authenticate/ok") else if (action == "auth/authenticate/ok")
{ {
@ -177,8 +176,7 @@ namespace ix
} }
else if (action == "auth/authenticate/error") else if (action == "auth/authenticate/error")
{ {
invokeEventCallback(ix::CobraConnection_EventType_Authentication_Error, invokeErrorCallback("Authentication error", msg->str);
msg->str);
} }
else if (action == "rtm/subscription/data") else if (action == "rtm/subscription/data")
{ {
@ -193,8 +191,7 @@ namespace ix
} }
else if (action == "rtm/subscribe/error") else if (action == "rtm/subscribe/error")
{ {
invokeEventCallback(ix::CobraConnection_EventType_Subscription_Error, invokeErrorCallback("Subscription error", msg->str);
msg->str);
} }
else if (action == "rtm/unsubscribe/ok") else if (action == "rtm/unsubscribe/ok")
{ {

View File

@ -37,10 +37,7 @@ namespace ix
CobraConnection_EventType_Subscribed = 4, CobraConnection_EventType_Subscribed = 4,
CobraConnection_EventType_UnSubscribed = 5, CobraConnection_EventType_UnSubscribed = 5,
CobraConnection_EventType_Published = 6, CobraConnection_EventType_Published = 6,
CobraConnection_EventType_Pong = 7, CobraConnection_EventType_Pong = 7
CobraConnection_EventType_Handshake_Error = 8,
CobraConnection_EventType_Authentication_Error = 9,
CobraConnection_EventType_Subscription_Error = 10
}; };
enum CobraConnectionPublishMode enum CobraConnectionPublishMode

View File

@ -71,7 +71,7 @@ namespace ix
#elif defined(IXWEBSOCKET_USE_OPEN_SSL) #elif defined(IXWEBSOCKET_USE_OPEN_SSL)
ss << " ssl/OpenSSL " << OPENSSL_VERSION_TEXT; ss << " ssl/OpenSSL " << OPENSSL_VERSION_TEXT;
#elif __APPLE__ #elif __APPLE__
ss << " ssl/SecureTransport"; ss << " ssl/DarwinSSL";
#endif #endif
#else #else
ss << " nossl"; ss << " nossl";

View File

@ -625,7 +625,7 @@ namespace ix
// send back the CLOSE frame // send back the CLOSE frame
sendCloseFrame(code, reason); sendCloseFrame(code, reason);
wakeUpFromPoll(Socket::kCloseRequest); _socket->wakeUpFromPoll(Socket::kCloseRequest);
bool remote = true; bool remote = true;
closeSocketAndSwitchToClosedState(code, reason, _rxbuf.size(), remote); closeSocketAndSwitchToClosedState(code, reason, _rxbuf.size(), remote);
@ -845,7 +845,7 @@ namespace ix
// Request to flush the send buffer on the background thread if it isn't empty // Request to flush the send buffer on the background thread if it isn't empty
if (!isSendBufferEmpty()) if (!isSendBufferEmpty())
{ {
wakeUpFromPoll(Socket::kSendRequest); _socket->wakeUpFromPoll(Socket::kSendRequest);
// FIXME: we should have a timeout when sending large messages: see #131 // FIXME: we should have a timeout when sending large messages: see #131
if (_blockingSend && !flushSendBuffer()) if (_blockingSend && !flushSendBuffer())
@ -1063,12 +1063,6 @@ namespace ix
_socket->close(); _socket->close();
} }
bool WebSocketTransport::wakeUpFromPoll(uint64_t wakeUpCode)
{
std::lock_guard<std::mutex> lock(_socketMutex);
return _socket->wakeUpFromPoll(wakeUpCode);
}
void WebSocketTransport::closeSocketAndSwitchToClosedState(uint16_t code, void WebSocketTransport::closeSocketAndSwitchToClosedState(uint16_t code,
const std::string& reason, const std::string& reason,
size_t closeWireSize, size_t closeWireSize,
@ -1116,9 +1110,8 @@ namespace ix
setReadyState(ReadyState::CLOSING); setReadyState(ReadyState::CLOSING);
sendCloseFrame(code, reason); sendCloseFrame(code, reason);
// wake up the poll, but do not close yet // wake up the poll, but do not close yet
wakeUpFromPoll(Socket::kSendRequest); _socket->wakeUpFromPoll(Socket::kSendRequest);
} }
size_t WebSocketTransport::bufferedAmount() const size_t WebSocketTransport::bufferedAmount() const

View File

@ -229,8 +229,6 @@ namespace ix
size_t closeWireSize, size_t closeWireSize,
bool remote); bool remote);
bool wakeUpFromPoll(uint64_t wakeUpCode);
bool flushSendBuffer(); bool flushSendBuffer();
bool sendOnSocket(); bool sendOnSocket();
bool receiveFromSocket(); bool receiveFromSocket();

View File

@ -6,4 +6,4 @@
#pragma once #pragma once
#define IX_WEBSOCKET_VERSION "9.1.9" #define IX_WEBSOCKET_VERSION "9.0.3"

View File

@ -1,13 +1,5 @@
# #
# This makefile is used for convenience, and wrap simple cmake commands # This makefile is just used to easily work with docker (linux build)
# You don't need to use it as an end user, it is more for developer.
#
# * work with docker (linux build)
# * execute the unittest
#
# The default target will install ws, the command line tool coming with
# IXWebSocket into /usr/local/bin
#
# #
all: brew all: brew
@ -16,23 +8,14 @@ install: brew
# Use -DCMAKE_INSTALL_PREFIX= to install into another location # Use -DCMAKE_INSTALL_PREFIX= to install into another location
# on osx it is good practice to make /usr/local user writable # on osx it is good practice to make /usr/local user writable
# sudo chown -R `whoami`/staff /usr/local # sudo chown -R `whoami`/staff /usr/local
#
# Release, Debug, MinSizeRel, RelWithDebInfo are the build types
#
brew: brew:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 -DUSE_TEST=1 .. ; make -j 4 install) mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 -DUSE_TEST=1 .. ; make -j 4 install)
# Docker default target. We've add problem with OpenSSL and TLS 1.3 (on the
# server side ?) and I can't work-around it easily, so we're using mbedtls on
# Linux for the SSL backend, which works great.
ws_mbedtls_install:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=MinSizeRel -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .. ; make -j 4 install)
ws: ws:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 .. ; make -j 4) mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 .. ; make -j 4)
ws_install: ws_install:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=MinSizeRel -DUSE_TLS=1 -DUSE_WS=1 .. ; make -j 4 install) mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 .. ; make -j 4 install)
ws_openssl: ws_openssl:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 -DUSE_OPEN_SSL=1 .. ; make -j 4) mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 -DUSE_OPEN_SSL=1 .. ; make -j 4)
@ -40,6 +23,9 @@ ws_openssl:
ws_mbedtls: ws_mbedtls:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .. ; make -j 4) mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .. ; make -j 4)
ws_mbedtls_install:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .. ; make -j 4 install)
ws_no_ssl: ws_no_ssl:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_WS=1 .. ; make -j 4) mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_WS=1 .. ; make -j 4)
@ -82,8 +68,6 @@ docker_push:
docker push ${LATEST} docker push ${LATEST}
docker push ${IMG} docker push ${IMG}
deploy: docker docker_push
run: run:
docker run --cap-add sys_ptrace --entrypoint=sh -it bsergean/ws:build docker run --cap-add sys_ptrace --entrypoint=sh -it bsergean/ws:build
@ -126,11 +110,6 @@ test_tsan_openssl:
(cd build/test ; ln -sf Debug/ixwebsocket_unittest) (cd build/test ; ln -sf Debug/ixwebsocket_unittest)
(cd test ; python2.7 run.py -r) (cd test ; python2.7 run.py -r)
test_tsan_mbedtls:
mkdir -p build && (cd build && cmake -GXcode -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_TEST=1 -DUSE_MBED_TLS=1 .. && xcodebuild -project ixwebsocket.xcodeproj -target ixwebsocket_unittest -enableThreadSanitizer YES)
(cd build/test ; ln -sf Debug/ixwebsocket_unittest)
(cd test ; python2.7 run.py -r)
test_openssl: test_openssl:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_OPEN_SSL=1 -DUSE_TEST=1 .. ; make -j 4) mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_OPEN_SSL=1 -DUSE_TEST=1 .. ; make -j 4)
(cd test ; python2.7 run.py -r) (cd test ; python2.7 run.py -r)

View File

@ -111,8 +111,6 @@ TEST_CASE("Cobra_to_statsd_bot", "[cobra_bots]")
REQUIRE(initialized); REQUIRE(initialized);
std::string fields("device.game\ndevice.os_name"); std::string fields("device.game\ndevice.os_name");
std::string gauge;
std::string timer;
int sentCount = ix::cobra_to_statsd_bot(config, int sentCount = ix::cobra_to_statsd_bot(config,
channel, channel,
@ -120,8 +118,6 @@ TEST_CASE("Cobra_to_statsd_bot", "[cobra_bots]")
position, position,
statsdClient, statsdClient,
fields, fields,
gauge,
timer,
verbose, verbose,
maxQueueSize, maxQueueSize,
enableHeartbeat, enableHeartbeat,

View File

@ -1,4 +1,4 @@
/// Json-cpp amalgamated forward header (http://jsoncpp.sourceforge.net/). /// Json-cpp amalgated forward header (http://jsoncpp.sourceforge.net/).
/// It is intended to be used with #include "json/json-forwards.h" /// It is intended to be used with #include "json/json-forwards.h"
/// This header provides forward declaration for all JsonCpp types. /// This header provides forward declaration for all JsonCpp types.
@ -11,13 +11,13 @@ The JsonCpp library's source code, including accompanying documentation,
tests and demonstration applications, are licensed under the following tests and demonstration applications, are licensed under the following
conditions... conditions...
Baptiste Lepilleur and The JsonCpp Authors explicitly disclaim copyright in all The author (Baptiste Lepilleur) explicitly disclaims copyright in all
jurisdictions which recognize such a disclaimer. In such jurisdictions, jurisdictions which recognize such a disclaimer. In such jurisdictions,
this software is released into the Public Domain. this software is released into the Public Domain.
In jurisdictions which do not recognize Public Domain property (e.g. Germany as of In jurisdictions which do not recognize Public Domain property (e.g. Germany as of
2010), this software is Copyright (c) 2007-2010 by Baptiste Lepilleur and 2010), this software is Copyright (c) 2007-2010 by Baptiste Lepilleur, and is
The JsonCpp Authors, and is released under the terms of the MIT License (see below). released under the terms of the MIT License (see below).
In jurisdictions which recognize Public Domain property, the user of this In jurisdictions which recognize Public Domain property, the user of this
software may choose to accept it either as 1) Public Domain, 2) under the software may choose to accept it either as 1) Public Domain, 2) under the
@ -32,7 +32,7 @@ described in clear, concise terms at:
The full text of the MIT License follows: The full text of the MIT License follows:
======================================================================== ========================================================================
Copyright (c) 2007-2010 Baptiste Lepilleur and The JsonCpp Authors Copyright (c) 2007-2010 Baptiste Lepilleur
Permission is hereby granted, free of charge, to any person Permission is hereby granted, free of charge, to any person
obtaining a copy of this software and associated documentation obtaining a copy of this software and associated documentation
@ -73,9 +73,9 @@ license you like.
#ifndef JSON_FORWARD_AMALGAMATED_H_INCLUDED #ifndef JSON_FORWARD_AMALGATED_H_INCLUDED
# define JSON_FORWARD_AMALGAMATED_H_INCLUDED # define JSON_FORWARD_AMALGATED_H_INCLUDED
/// If defined, indicates that the source file is amalgamated /// If defined, indicates that the source file is amalgated
/// to prevent private header inclusion. /// to prevent private header inclusion.
#define JSON_IS_AMALGAMATION #define JSON_IS_AMALGAMATION
@ -83,21 +83,23 @@ license you like.
// Beginning of content of file: include/json/config.h // Beginning of content of file: include/json/config.h
// ////////////////////////////////////////////////////////////////////// // //////////////////////////////////////////////////////////////////////
// Copyright 2007-2010 Baptiste Lepilleur and The JsonCpp Authors // Copyright 2007-2010 Baptiste Lepilleur
// Distributed under MIT license, or public domain if desired and // Distributed under MIT license, or public domain if desired and
// recognized in your jurisdiction. // recognized in your jurisdiction.
// See file LICENSE for detail or copy at http://jsoncpp.sourceforge.net/LICENSE // See file LICENSE for detail or copy at http://jsoncpp.sourceforge.net/LICENSE
#ifndef JSON_CONFIG_H_INCLUDED #ifndef JSON_CONFIG_H_INCLUDED
#define JSON_CONFIG_H_INCLUDED #define JSON_CONFIG_H_INCLUDED
#include <cstddef>
#include <cstdint> /// If defined, indicates that json library is embedded in CppTL library.
#include <istream> //# define JSON_IN_CPPTL 1
#include <memory>
#include <ostream> /// If defined, indicates that json may leverage CppTL library
#include <sstream> //# define JSON_USE_CPPTL 1
#include <string> /// If defined, indicates that cpptl vector based map should be used instead of
#include <type_traits> /// std::map
/// as Value container.
//# define JSON_USE_CPPTL_SMALLMAP 1
// If non-zero, the library uses exceptions to report bad input instead of C // If non-zero, the library uses exceptions to report bad input instead of C
// assertion macros. The default is to use exceptions. // assertion macros. The default is to use exceptions.
@ -105,49 +107,43 @@ license you like.
#define JSON_USE_EXCEPTION 1 #define JSON_USE_EXCEPTION 1
#endif #endif
// Temporary, tracked for removal with issue #982. /// If defined, indicates that the source file is amalgated
#ifndef JSON_USE_NULLREF
#define JSON_USE_NULLREF 1
#endif
/// If defined, indicates that the source file is amalgamated
/// to prevent private header inclusion. /// to prevent private header inclusion.
/// Remarks: it is automatically defined in the generated amalgamated header. /// Remarks: it is automatically defined in the generated amalgated header.
// #define JSON_IS_AMALGAMATION // #define JSON_IS_AMALGAMATION
// Export macros for DLL visibility #ifdef JSON_IN_CPPTL
#if defined(JSON_DLL_BUILD) #include <cpptl/config.h>
#if defined(_MSC_VER) || defined(__MINGW32__) #ifndef JSON_USE_CPPTL
#define JSON_USE_CPPTL 1
#endif
#endif
#ifdef JSON_IN_CPPTL
#define JSON_API CPPTL_API
#elif defined(JSON_DLL_BUILD)
#if defined(_MSC_VER)
#define JSON_API __declspec(dllexport) #define JSON_API __declspec(dllexport)
#define JSONCPP_DISABLE_DLL_INTERFACE_WARNING #define JSONCPP_DISABLE_DLL_INTERFACE_WARNING
#elif defined(__GNUC__) || defined(__clang__)
#define JSON_API __attribute__((visibility("default")))
#endif // if defined(_MSC_VER) #endif // if defined(_MSC_VER)
#elif defined(JSON_DLL) #elif defined(JSON_DLL)
#if defined(_MSC_VER) || defined(__MINGW32__) #if defined(_MSC_VER)
#define JSON_API __declspec(dllimport) #define JSON_API __declspec(dllimport)
#define JSONCPP_DISABLE_DLL_INTERFACE_WARNING #define JSONCPP_DISABLE_DLL_INTERFACE_WARNING
#endif // if defined(_MSC_VER) #endif // if defined(_MSC_VER)
#endif // ifdef JSON_DLL_BUILD #endif // ifdef JSON_IN_CPPTL
#if !defined(JSON_API) #if !defined(JSON_API)
#define JSON_API #define JSON_API
#endif #endif
#if defined(_MSC_VER) && _MSC_VER < 1800 #if !defined(JSON_HAS_UNIQUE_PTR)
#error \ #if __cplusplus >= 201103L
"ERROR: Visual Studio 12 (2013) with _MSC_VER=1800 is the oldest supported compiler with sufficient C++11 capabilities" #define JSON_HAS_UNIQUE_PTR (1)
#endif #elif _MSC_VER >= 1600
#define JSON_HAS_UNIQUE_PTR (1)
#if defined(_MSC_VER) && _MSC_VER < 1900
// As recommended at
// https://stackoverflow.com/questions/2915672/snprintf-and-visual-studio-2010
extern JSON_API int msvc_pre1900_c99_snprintf(char* outBuf, size_t size,
const char* format, ...);
#define jsoncpp_snprintf msvc_pre1900_c99_snprintf
#else #else
#define jsoncpp_snprintf std::snprintf #define JSON_HAS_UNIQUE_PTR (0)
#endif
#endif #endif
// If JSON_NO_INT64 is defined, then Json only support C++ "int" type for // If JSON_NO_INT64 is defined, then Json only support C++ "int" type for
@ -155,96 +151,55 @@ extern JSON_API int msvc_pre1900_c99_snprintf(char* outBuf, size_t size,
// Storages, and 64 bits integer support is disabled. // Storages, and 64 bits integer support is disabled.
// #define JSON_NO_INT64 1 // #define JSON_NO_INT64 1
// JSONCPP_OVERRIDE is maintained for backwards compatibility of external tools. #if defined(_MSC_VER) && _MSC_VER <= 1200 // MSVC 6
// C++11 should be used directly in JSONCPP. // Microsoft Visual Studio 6 only support conversion from __int64 to double
#define JSONCPP_OVERRIDE override // (no conversion from unsigned __int64).
#define JSON_USE_INT64_DOUBLE_CONVERSION 1
// Disable warning 4786 for VS6 caused by STL (identifier was truncated to '255'
// characters in the debug information)
// All projects I've ever seen with VS6 were using this globally (not bothering
// with pragma push/pop).
#pragma warning(disable : 4786)
#endif // if defined(_MSC_VER) && _MSC_VER < 1200 // MSVC 6
#if __cplusplus >= 201103L #if defined(_MSC_VER) && _MSC_VER >= 1500 // MSVC 2008
#define JSONCPP_NOEXCEPT noexcept /// Indicates that the following function is deprecated.
#define JSONCPP_OP_EXPLICIT explicit
#elif defined(_MSC_VER) && _MSC_VER < 1900
#define JSONCPP_NOEXCEPT throw()
#define JSONCPP_OP_EXPLICIT explicit
#elif defined(_MSC_VER) && _MSC_VER >= 1900
#define JSONCPP_NOEXCEPT noexcept
#define JSONCPP_OP_EXPLICIT explicit
#else
#define JSONCPP_NOEXCEPT throw()
#define JSONCPP_OP_EXPLICIT
#endif
#ifdef __clang__
#if __has_extension(attribute_deprecated_with_message)
#define JSONCPP_DEPRECATED(message) __attribute__((deprecated(message)))
#endif
#elif defined(__GNUC__) // not clang (gcc comes later since clang emulates gcc)
#if (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 5))
#define JSONCPP_DEPRECATED(message) __attribute__((deprecated(message)))
#elif (__GNUC__ > 3 || (__GNUC__ == 3 && __GNUC_MINOR__ >= 1))
#define JSONCPP_DEPRECATED(message) __attribute__((__deprecated__))
#endif // GNUC version
#elif defined(_MSC_VER) // MSVC (after clang because clang on Windows emulates
// MSVC)
#define JSONCPP_DEPRECATED(message) __declspec(deprecated(message)) #define JSONCPP_DEPRECATED(message) __declspec(deprecated(message))
#endif // __clang__ || __GNUC__ || _MSC_VER #elif defined(__clang__) && defined(__has_feature)
#if __has_feature(attribute_deprecated_with_message)
#define JSONCPP_DEPRECATED(message) __attribute__ ((deprecated(message)))
#endif
#elif defined(__GNUC__) && (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 5))
#define JSONCPP_DEPRECATED(message) __attribute__ ((deprecated(message)))
#elif defined(__GNUC__) && (__GNUC__ > 3 || (__GNUC__ == 3 && __GNUC_MINOR__ >= 1))
#define JSONCPP_DEPRECATED(message) __attribute__((__deprecated__))
#endif
#if !defined(JSONCPP_DEPRECATED) #if !defined(JSONCPP_DEPRECATED)
#define JSONCPP_DEPRECATED(message) #define JSONCPP_DEPRECATED(message)
#endif // if !defined(JSONCPP_DEPRECATED) #endif // if !defined(JSONCPP_DEPRECATED)
#if defined(__clang__) || (defined(__GNUC__) && (__GNUC__ >= 6))
#define JSON_USE_INT64_DOUBLE_CONVERSION 1
#endif
#if !defined(JSON_IS_AMALGAMATION)
#include "allocator.h"
#include "version.h"
#endif // if !defined(JSON_IS_AMALGAMATION)
namespace Json { namespace Json {
using Int = int; typedef int Int;
using UInt = unsigned int; typedef unsigned int UInt;
#if defined(JSON_NO_INT64) #if defined(JSON_NO_INT64)
using LargestInt = int; typedef int LargestInt;
using LargestUInt = unsigned int; typedef unsigned int LargestUInt;
#undef JSON_HAS_INT64 #undef JSON_HAS_INT64
#else // if defined(JSON_NO_INT64) #else // if defined(JSON_NO_INT64)
// For Microsoft Visual use specific types as long long is not supported // For Microsoft Visual use specific types as long long is not supported
#if defined(_MSC_VER) // Microsoft Visual Studio #if defined(_MSC_VER) // Microsoft Visual Studio
using Int64 = __int64; typedef __int64 Int64;
using UInt64 = unsigned __int64; typedef unsigned __int64 UInt64;
#else // if defined(_MSC_VER) // Other platforms, use long long #else // if defined(_MSC_VER) // Other platforms, use long long
using Int64 = int64_t; typedef long long int Int64;
using UInt64 = uint64_t; typedef unsigned long long int UInt64;
#endif // if defined(_MSC_VER) #endif // if defined(_MSC_VER)
using LargestInt = Int64; typedef Int64 LargestInt;
using LargestUInt = UInt64; typedef UInt64 LargestUInt;
#define JSON_HAS_INT64 #define JSON_HAS_INT64
#endif // if defined(JSON_NO_INT64) #endif // if defined(JSON_NO_INT64)
} // end namespace Json
template <typename T>
using Allocator =
typename std::conditional<JSONCPP_USING_SECURE_MEMORY, SecureAllocator<T>,
std::allocator<T>>::type;
using String = std::basic_string<char, std::char_traits<char>, Allocator<char>>;
using IStringStream =
std::basic_istringstream<String::value_type, String::traits_type,
String::allocator_type>;
using OStringStream =
std::basic_ostringstream<String::value_type, String::traits_type,
String::allocator_type>;
using IStream = std::istream;
using OStream = std::ostream;
} // namespace Json
// Legacy names (formerly macros).
using JSONCPP_STRING = Json::String;
using JSONCPP_ISTRINGSTREAM = Json::IStringStream;
using JSONCPP_OSTRINGSTREAM = Json::OStringStream;
using JSONCPP_ISTREAM = Json::IStream;
using JSONCPP_OSTREAM = Json::OStream;
#endif // JSON_CONFIG_H_INCLUDED #endif // JSON_CONFIG_H_INCLUDED
@ -261,7 +216,7 @@ using JSONCPP_OSTREAM = Json::OStream;
// Beginning of content of file: include/json/forwards.h // Beginning of content of file: include/json/forwards.h
// ////////////////////////////////////////////////////////////////////// // //////////////////////////////////////////////////////////////////////
// Copyright 2007-2010 Baptiste Lepilleur and The JsonCpp Authors // Copyright 2007-2010 Baptiste Lepilleur
// Distributed under MIT license, or public domain if desired and // Distributed under MIT license, or public domain if desired and
// recognized in your jurisdiction. // recognized in your jurisdiction.
// See file LICENSE for detail or copy at http://jsoncpp.sourceforge.net/LICENSE // See file LICENSE for detail or copy at http://jsoncpp.sourceforge.net/LICENSE
@ -276,23 +231,17 @@ using JSONCPP_OSTREAM = Json::OStream;
namespace Json { namespace Json {
// writer.h // writer.h
class StreamWriter;
class StreamWriterBuilder;
class Writer;
class FastWriter; class FastWriter;
class StyledWriter; class StyledWriter;
class StyledStreamWriter;
// reader.h // reader.h
class Reader; class Reader;
class CharReader;
class CharReaderBuilder;
// json_features.h // features.h
class Features; class Features;
// value.h // value.h
using ArrayIndex = unsigned int; typedef unsigned int ArrayIndex;
class StaticString; class StaticString;
class Path; class Path;
class PathArgument; class PathArgument;
@ -313,4 +262,4 @@ class ValueConstIterator;
#endif //ifndef JSON_FORWARD_AMALGAMATED_H_INCLUDED #endif //ifndef JSON_FORWARD_AMALGATED_H_INCLUDED

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +0,0 @@
#!/bin/sh
# Used to clean up some non essential folders to make the ws container
rm -rf build
rm -rf third_party/zlib
rm -rf third_party/mbedtls

View File

@ -70,8 +70,6 @@ int main(int argc, char** argv)
std::string password; std::string password;
std::string prefix("ws.test.v0"); std::string prefix("ws.test.v0");
std::string fields; std::string fields;
std::string gauge;
std::string timer;
std::string dsn; std::string dsn;
std::string redisHosts("127.0.0.1"); std::string redisHosts("127.0.0.1");
std::string redisPassword; std::string redisPassword;
@ -266,8 +264,6 @@ int main(int argc, char** argv)
cobra2statsd->add_option("--port", statsdPort, "Statsd port"); cobra2statsd->add_option("--port", statsdPort, "Statsd port");
cobra2statsd->add_option("--prefix", prefix, "Statsd prefix"); cobra2statsd->add_option("--prefix", prefix, "Statsd prefix");
cobra2statsd->add_option("--fields", fields, "Extract fields for naming the event")->join(); cobra2statsd->add_option("--fields", fields, "Extract fields for naming the event")->join();
cobra2statsd->add_option("--gauge", gauge, "Value to extract, and use as a statsd gauge")->join();
cobra2statsd->add_option("--timer", timer, "Value to extract, and use as a statsd timer")->join();
cobra2statsd->add_option("channel", channel, "Channel")->required(); cobra2statsd->add_option("channel", channel, "Channel")->required();
cobra2statsd->add_flag("-v", verbose, "Verbose"); cobra2statsd->add_flag("-v", verbose, "Verbose");
cobra2statsd->add_option("--pidfile", pidfile, "Pid file"); cobra2statsd->add_option("--pidfile", pidfile, "Pid file");
@ -456,43 +452,22 @@ int main(int argc, char** argv)
ret = ix::ws_cobra_metrics_publish_main(cobraConfig, channel, path, stress); ret = ix::ws_cobra_metrics_publish_main(cobraConfig, channel, path, stress);
} }
else if (app.got_subcommand("cobra_to_statsd")) else if (app.got_subcommand("cobra_to_statsd"))
{
if (!timer.empty() && !gauge.empty())
{
spdlog::error("--gauge and --timer options are exclusive. " \
"you can only supply one");
ret = 1;
}
else
{ {
bool enableHeartbeat = true; bool enableHeartbeat = true;
int runtime = -1; // run indefinitely int runtime = -1;
ix::StatsdClient statsdClient(hostname, statsdPort, prefix); ix::StatsdClient statsdClient(hostname, statsdPort, prefix);
std::string errMsg;
bool initialized = statsdClient.init(errMsg);
if (!initialized)
{
spdlog::error(errMsg);
ret = 1;
}
else
{
ret = ix::cobra_to_statsd_bot(cobraConfig, ret = ix::cobra_to_statsd_bot(cobraConfig,
channel, channel,
filter, filter,
position, position,
statsdClient, statsdClient,
fields, fields,
gauge,
timer,
verbose, verbose,
maxQueueSize, maxQueueSize,
enableHeartbeat, enableHeartbeat,
runtime); runtime);
} }
}
}
else if (app.got_subcommand("cobra_to_sentry")) else if (app.got_subcommand("cobra_to_sentry"))
{ {
bool enableHeartbeat = true; bool enableHeartbeat = true;

View File

@ -14,19 +14,8 @@
namespace ix namespace ix
{ {
using StreamWriterPtr = std::unique_ptr<Json::StreamWriter>;
StreamWriterPtr makeStreamWriter()
{
Json::StreamWriterBuilder builder;
builder["commentStyle"] = "None";
builder["indentation"] = ""; // will make the JSON object compact
std::unique_ptr<Json::StreamWriter> jsonWriter(builder.newStreamWriter());
return jsonWriter;
}
void writeToStdout(bool fluentd, void writeToStdout(bool fluentd,
const StreamWriterPtr& jsonWriter, Json::FastWriter& jsonWriter,
const Json::Value& msg, const Json::Value& msg,
const std::string& position) const std::string& position)
{ {
@ -40,15 +29,12 @@ namespace ix
msgWithPosition["position"] = position; msgWithPosition["position"] = position;
enveloppe["message"] = msgWithPosition; enveloppe["message"] = msgWithPosition;
jsonWriter->write(enveloppe, &std::cout); std::cout << jsonWriter.write(enveloppe);
std::cout << std::endl; // add lf and flush
} }
else else
{ {
enveloppe = msg; enveloppe = msg;
std::cout << position << " "; std::cout << position << " " << jsonWriter.write(enveloppe);
jsonWriter->write(enveloppe, &std::cout);
std::cout << std::endl;
} }
} }
@ -63,13 +49,14 @@ namespace ix
conn.configure(config); conn.configure(config);
conn.connect(); conn.connect();
Json::FastWriter jsonWriter;
// Display incoming messages
std::atomic<int> msgPerSeconds(0); std::atomic<int> msgPerSeconds(0);
std::atomic<int> msgCount(0); std::atomic<int> msgCount(0);
std::atomic<bool> fatalCobraError(false);
auto jsonWriter = makeStreamWriter();
auto timer = [&msgPerSeconds, &msgCount, &fatalCobraError] { auto timer = [&msgPerSeconds, &msgCount] {
while (!fatalCobraError) while (true)
{ {
spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds); spdlog::info("#messages {} msg/s {}", msgCount, msgPerSeconds);
@ -91,8 +78,7 @@ namespace ix
&msgCount, &msgCount,
&msgPerSeconds, &msgPerSeconds,
&quiet, &quiet,
&fluentd, &fluentd](ix::CobraConnectionEventType eventType,
&fatalCobraError](ix::CobraConnectionEventType eventType,
const std::string& errMsg, const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers, const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId, const std::string& subscriptionId,
@ -151,32 +137,14 @@ namespace ix
{ {
spdlog::info("Received websocket pong"); spdlog::info("Received websocket pong");
} }
else if (eventType == ix::CobraConnection_EventType_Handshake_Error)
{
spdlog::error("Subscriber: Handshake error: {}", errMsg);
fatalCobraError = true;
}
else if (eventType == ix::CobraConnection_EventType_Authentication_Error)
{
spdlog::error("Subscriber: Authentication error: {}", errMsg);
fatalCobraError = true;
}
else if (eventType == ix::CobraConnection_EventType_Subscription_Error)
{
spdlog::error("Subscriber: Subscription error: {}", errMsg);
fatalCobraError = true;
}
}); });
while (!fatalCobraError) while (true)
{ {
auto duration = std::chrono::seconds(1); auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
} }
conn.disconnect(); return 0;
t.join();
return fatalCobraError ? 1 : 0;
} }
} // namespace ix } // namespace ix