ws: new command to subscribe to a cobra server and send an event to a sentry server
This commit is contained in:
parent
51fcf65424
commit
8f8dd076ff
@ -1 +1 @@
|
|||||||
1.3.2
|
1.4.0
|
||||||
|
@ -1 +0,0 @@
|
|||||||
Dockerfile.dev
|
|
47
Dockerfile
Normal file
47
Dockerfile
Normal file
@ -0,0 +1,47 @@
|
|||||||
|
# Build time
|
||||||
|
FROM debian:buster as build
|
||||||
|
|
||||||
|
ENV DEBIAN_FRONTEND noninteractive
|
||||||
|
RUN apt-get update
|
||||||
|
RUN apt-get -y install wget
|
||||||
|
RUN mkdir -p /tmp/cmake
|
||||||
|
WORKDIR /tmp/cmake
|
||||||
|
RUN wget https://github.com/Kitware/CMake/releases/download/v3.14.0/cmake-3.14.0-Linux-x86_64.tar.gz
|
||||||
|
RUN tar zxf cmake-3.14.0-Linux-x86_64.tar.gz
|
||||||
|
|
||||||
|
RUN apt-get -y install g++
|
||||||
|
RUN apt-get -y install libssl-dev
|
||||||
|
RUN apt-get -y install libz-dev
|
||||||
|
RUN apt-get -y install make
|
||||||
|
|
||||||
|
COPY . .
|
||||||
|
|
||||||
|
ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-Linux-x86_64/bin
|
||||||
|
ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
|
||||||
|
|
||||||
|
RUN ["make"]
|
||||||
|
|
||||||
|
# Runtime
|
||||||
|
FROM debian:buster as runtime
|
||||||
|
|
||||||
|
ENV DEBIAN_FRONTEND noninteractive
|
||||||
|
RUN apt-get update
|
||||||
|
# Runtime
|
||||||
|
RUN apt-get install -y libssl1.1
|
||||||
|
|
||||||
|
# Debugging
|
||||||
|
RUN apt-get install -y strace
|
||||||
|
RUN apt-get install -y gdb
|
||||||
|
RUN apt-get install -y procps
|
||||||
|
RUN apt-get install -y htop
|
||||||
|
|
||||||
|
RUN adduser --disabled-password --gecos '' app
|
||||||
|
COPY --chown=app:app --from=build /usr/local/bin/ws /usr/local/bin/ws
|
||||||
|
RUN chmod +x /usr/local/bin/ws
|
||||||
|
RUN ldd /usr/local/bin/ws
|
||||||
|
|
||||||
|
# Now run in usermode
|
||||||
|
USER app
|
||||||
|
WORKDIR /home/app
|
||||||
|
|
||||||
|
CMD ["ws"]
|
@ -1,31 +0,0 @@
|
|||||||
FROM debian:stretch
|
|
||||||
|
|
||||||
ENV DEBIAN_FRONTEND noninteractive
|
|
||||||
RUN apt-get update
|
|
||||||
RUN apt-get -y install g++
|
|
||||||
RUN apt-get -y install libssl-dev
|
|
||||||
RUN apt-get -y install gdb
|
|
||||||
RUN apt-get -y install screen
|
|
||||||
RUN apt-get -y install procps
|
|
||||||
RUN apt-get -y install lsof
|
|
||||||
RUN apt-get -y install libz-dev
|
|
||||||
RUN apt-get -y install vim
|
|
||||||
RUN apt-get -y install make
|
|
||||||
RUN apt-get -y install cmake
|
|
||||||
RUN apt-get -y install curl
|
|
||||||
RUN apt-get -y install python
|
|
||||||
RUN apt-get -y install netcat
|
|
||||||
|
|
||||||
# debian strech cmake is too old for building with Docker
|
|
||||||
COPY makefile .
|
|
||||||
RUN ["make", "install_cmake_for_linux"]
|
|
||||||
|
|
||||||
COPY . .
|
|
||||||
|
|
||||||
ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-rc4-Linux-x86_64/bin
|
|
||||||
ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
|
|
||||||
|
|
||||||
# RUN ["make"]
|
|
||||||
|
|
||||||
EXPOSE 8765
|
|
||||||
CMD ["/ws/ws", "transfer", "--port", "8765", "--host", "0.0.0.0"]
|
|
@ -1,30 +0,0 @@
|
|||||||
FROM debian:buster
|
|
||||||
|
|
||||||
ENV DEBIAN_FRONTEND noninteractive
|
|
||||||
RUN apt-get update
|
|
||||||
|
|
||||||
RUN apt-get -y install g++
|
|
||||||
RUN apt-get -y install libssl-dev
|
|
||||||
RUN apt-get -y install libz-dev
|
|
||||||
RUN apt-get -y install make
|
|
||||||
|
|
||||||
RUN apt-get -y install wget
|
|
||||||
RUN mkdir -p /tmp/cmake
|
|
||||||
WORKDIR /tmp/cmake
|
|
||||||
RUN wget https://github.com/Kitware/CMake/releases/download/v3.14.0/cmake-3.14.0-Linux-x86_64.tar.gz
|
|
||||||
RUN tar zxf cmake-3.14.0-Linux-x86_64.tar.gz
|
|
||||||
|
|
||||||
RUN adduser app
|
|
||||||
|
|
||||||
COPY . .
|
|
||||||
|
|
||||||
ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-Linux-x86_64/bin
|
|
||||||
ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
|
|
||||||
|
|
||||||
RUN ["make"]
|
|
||||||
|
|
||||||
# Now run in usermode
|
|
||||||
USER app
|
|
||||||
|
|
||||||
EXPOSE 8765
|
|
||||||
CMD ["bash"]
|
|
@ -9,6 +9,9 @@ project (ws)
|
|||||||
# There's -Weverything too for clang
|
# There's -Weverything too for clang
|
||||||
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic")
|
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic")
|
||||||
|
|
||||||
|
#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread")
|
||||||
|
#set(CMAKE_LD_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread")
|
||||||
|
|
||||||
set (CMAKE_CXX_STANDARD 14)
|
set (CMAKE_CXX_STANDARD 14)
|
||||||
|
|
||||||
option(USE_TLS "Add TLS support" ON)
|
option(USE_TLS "Add TLS support" ON)
|
||||||
@ -28,6 +31,7 @@ add_executable(ws
|
|||||||
ixcrypto/IXHMac.cpp
|
ixcrypto/IXHMac.cpp
|
||||||
|
|
||||||
IXRedisClient.cpp
|
IXRedisClient.cpp
|
||||||
|
IXSentryClient.cpp
|
||||||
IXCobraConnection.cpp
|
IXCobraConnection.cpp
|
||||||
|
|
||||||
ws_http_client.cpp
|
ws_http_client.cpp
|
||||||
@ -43,6 +47,7 @@ add_executable(ws
|
|||||||
ws_redis_subscribe.cpp
|
ws_redis_subscribe.cpp
|
||||||
ws_cobra_subscribe.cpp
|
ws_cobra_subscribe.cpp
|
||||||
ws_cobra_to_statsd.cpp
|
ws_cobra_to_statsd.cpp
|
||||||
|
ws_cobra_to_sentry.cpp
|
||||||
ws.cpp)
|
ws.cpp)
|
||||||
|
|
||||||
if (APPLE AND USE_TLS)
|
if (APPLE AND USE_TLS)
|
||||||
|
178
ws/IXSentryClient.cpp
Normal file
178
ws/IXSentryClient.cpp
Normal file
@ -0,0 +1,178 @@
|
|||||||
|
/*
|
||||||
|
* IXSentryClient.cpp
|
||||||
|
* Author: Benjamin Sergeant
|
||||||
|
* Copyright (c) 2019 Machine Zone. All rights reserved.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "IXSentryClient.h"
|
||||||
|
|
||||||
|
#include <chrono>
|
||||||
|
#include <iostream>
|
||||||
|
|
||||||
|
#include <ixwebsocket/IXWebSocketHttpHeaders.h>
|
||||||
|
|
||||||
|
|
||||||
|
namespace ix
|
||||||
|
{
|
||||||
|
SentryClient::SentryClient(const std::string& dsn) :
|
||||||
|
_dsn(dsn),
|
||||||
|
_validDsn(false),
|
||||||
|
_luaFrameRegex("\t([^/]+):([0-9]+): in function '([^/]+)'")
|
||||||
|
{
|
||||||
|
const std::regex dsnRegex("(http[s]?)://([^:]+):([^@]+)@([^/]+)/([0-9]+)");
|
||||||
|
std::smatch group;
|
||||||
|
|
||||||
|
if (std::regex_match(dsn, group, dsnRegex) and group.size() == 6)
|
||||||
|
{
|
||||||
|
_validDsn = true;
|
||||||
|
|
||||||
|
const auto scheme = group.str(1);
|
||||||
|
const auto host = group.str(4);
|
||||||
|
const auto project_id = group.str(5);
|
||||||
|
_url = scheme + "://" + host + "/api/" + project_id + "/store/";
|
||||||
|
|
||||||
|
_publicKey = group.str(2);
|
||||||
|
_secretKey = group.str(3);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t SentryClient::getTimestamp()
|
||||||
|
{
|
||||||
|
const auto tp = std::chrono::system_clock::now();
|
||||||
|
const auto dur = tp.time_since_epoch();
|
||||||
|
return std::chrono::duration_cast<std::chrono::seconds>(dur).count();
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string SentryClient::getIso8601()
|
||||||
|
{
|
||||||
|
std::time_t now;
|
||||||
|
std::time(&now);
|
||||||
|
char buf[sizeof "2011-10-08T07:07:09Z"];
|
||||||
|
std::strftime(buf, sizeof buf, "%Y-%m-%dT%H:%M:%SZ", std::gmtime(&now));
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string SentryClient::computeAuthHeader()
|
||||||
|
{
|
||||||
|
std::string securityHeader("Sentry sentry_version=5");
|
||||||
|
securityHeader += ",sentry_client=ws/1.0.0";
|
||||||
|
securityHeader += ",sentry_timestamp=" + std::to_string(SentryClient::getTimestamp());
|
||||||
|
securityHeader += ",sentry_key=" + _publicKey;
|
||||||
|
securityHeader += ",sentry_secret=" + _secretKey;
|
||||||
|
|
||||||
|
return securityHeader;
|
||||||
|
}
|
||||||
|
|
||||||
|
Json::Value SentryClient::parseLuaStackTrace(const std::string& stack)
|
||||||
|
{
|
||||||
|
Json::Value frames;
|
||||||
|
|
||||||
|
// Split by lines
|
||||||
|
std::string line;
|
||||||
|
std::stringstream tokenStream(stack);
|
||||||
|
|
||||||
|
std::stringstream ss;
|
||||||
|
std::smatch group;
|
||||||
|
|
||||||
|
while (std::getline(tokenStream, line))
|
||||||
|
{
|
||||||
|
// MapScene.lua:2169: in function 'singleCB'
|
||||||
|
if (std::regex_match(line, group, _luaFrameRegex))
|
||||||
|
{
|
||||||
|
const auto fileName = group.str(1);
|
||||||
|
const auto linenoStr = group.str(2);
|
||||||
|
const auto function = group.str(3);
|
||||||
|
|
||||||
|
ss << linenoStr;
|
||||||
|
uint64_t lineno;
|
||||||
|
ss >> lineno;
|
||||||
|
|
||||||
|
Json::Value frame;
|
||||||
|
frame["lineno"] = lineno;
|
||||||
|
frame["filename"] = fileName;
|
||||||
|
frame["function"] = function;
|
||||||
|
|
||||||
|
frames.append(frame);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return frames;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string SentryClient::computePayload(const Json::Value& msg)
|
||||||
|
{
|
||||||
|
Json::Value payload;
|
||||||
|
payload["platform"] = "python";
|
||||||
|
payload["sdk"]["name"] = "ws";
|
||||||
|
payload["sdk"]["version"] = "1.0.0";
|
||||||
|
payload["timestamp"] = SentryClient::getIso8601();
|
||||||
|
|
||||||
|
Json::Value exception;
|
||||||
|
exception["value"] = msg["data"]["message"];
|
||||||
|
|
||||||
|
std::string stackTraceFieldName =
|
||||||
|
(msg["id"].asString() == "game_noisytypes_id") ? "traceback" : "stack";
|
||||||
|
|
||||||
|
exception["stacktrace"]["frames"] =
|
||||||
|
parseLuaStackTrace(msg["data"][stackTraceFieldName].asString());
|
||||||
|
|
||||||
|
payload["exception"].append(exception);
|
||||||
|
|
||||||
|
Json::Value extra;
|
||||||
|
extra["cobra_event"] = msg;
|
||||||
|
|
||||||
|
exception["extra"] = extra;
|
||||||
|
|
||||||
|
return _jsonWriter.write(payload);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool SentryClient::send(const Json::Value& msg,
|
||||||
|
bool verbose)
|
||||||
|
{
|
||||||
|
HttpRequestArgs args;
|
||||||
|
args.extraHeaders["X-Sentry-Auth"] = SentryClient::computeAuthHeader();
|
||||||
|
args.connectTimeout = 60;
|
||||||
|
args.transferTimeout = 5 * 60;
|
||||||
|
args.followRedirects = true;
|
||||||
|
args.verbose = verbose;
|
||||||
|
args.logger = [](const std::string& msg)
|
||||||
|
{
|
||||||
|
std::cout << msg;
|
||||||
|
};
|
||||||
|
|
||||||
|
std::string body = computePayload(msg);
|
||||||
|
HttpResponse out = _httpClient.post(_url, body, args);
|
||||||
|
|
||||||
|
auto statusCode = std::get<0>(out);
|
||||||
|
auto errorCode = std::get<1>(out);
|
||||||
|
auto responseHeaders = std::get<2>(out);
|
||||||
|
auto payload = std::get<3>(out);
|
||||||
|
auto errorMsg = std::get<4>(out);
|
||||||
|
auto uploadSize = std::get<5>(out);
|
||||||
|
auto downloadSize = std::get<6>(out);
|
||||||
|
|
||||||
|
if (verbose)
|
||||||
|
{
|
||||||
|
for (auto it : responseHeaders)
|
||||||
|
{
|
||||||
|
std::cerr << it.first << ": " << it.second << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::cerr << "Upload size: " << uploadSize << std::endl;
|
||||||
|
std::cerr << "Download size: " << downloadSize << std::endl;
|
||||||
|
|
||||||
|
std::cerr << "Status: " << statusCode << std::endl;
|
||||||
|
if (errorCode != HttpErrorCode_Ok)
|
||||||
|
{
|
||||||
|
std::cerr << "error message: " << errorMsg << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (responseHeaders["Content-Type"] != "application/octet-stream")
|
||||||
|
{
|
||||||
|
std::cerr << "payload: " << payload << std::endl;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return statusCode == 200;
|
||||||
|
}
|
||||||
|
} // namespace ix
|
47
ws/IXSentryClient.h
Normal file
47
ws/IXSentryClient.h
Normal file
@ -0,0 +1,47 @@
|
|||||||
|
/*
|
||||||
|
* IXSentryClient.h
|
||||||
|
* Author: Benjamin Sergeant
|
||||||
|
* Copyright (c) 2019 Machine Zone. All rights reserved.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <jsoncpp/json/json.h>
|
||||||
|
#include <regex>
|
||||||
|
|
||||||
|
#include <ixwebsocket/IXHttpClient.h>
|
||||||
|
|
||||||
|
namespace ix
|
||||||
|
{
|
||||||
|
class SentryClient
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
SentryClient(const std::string& dsn);
|
||||||
|
~SentryClient() = default;
|
||||||
|
|
||||||
|
bool send(const Json::Value& msg, bool verbose);
|
||||||
|
|
||||||
|
private:
|
||||||
|
int64_t getTimestamp();
|
||||||
|
std::string computeAuthHeader();
|
||||||
|
std::string getIso8601();
|
||||||
|
std::string computePayload(const Json::Value& msg);
|
||||||
|
|
||||||
|
Json::Value parseLuaStackTrace(const std::string& stack);
|
||||||
|
|
||||||
|
std::string _dsn;
|
||||||
|
bool _validDsn;
|
||||||
|
std::string _url;
|
||||||
|
|
||||||
|
// Used for authentication with a header
|
||||||
|
std::string _publicKey;
|
||||||
|
std::string _secretKey;
|
||||||
|
|
||||||
|
Json::FastWriter _jsonWriter;
|
||||||
|
|
||||||
|
std::regex _luaFrameRegex;
|
||||||
|
|
||||||
|
HttpClient _httpClient;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace ix
|
22
ws/ws.cpp
22
ws/ws.cpp
@ -44,11 +44,13 @@ int main(int argc, char** argv)
|
|||||||
std::string rolesecret;
|
std::string rolesecret;
|
||||||
std::string prefix("ws.test.v0");
|
std::string prefix("ws.test.v0");
|
||||||
std::string fields;
|
std::string fields;
|
||||||
|
std::string dsn;
|
||||||
bool headersOnly = false;
|
bool headersOnly = false;
|
||||||
bool followRedirects = false;
|
bool followRedirects = false;
|
||||||
bool verbose = false;
|
bool verbose = false;
|
||||||
bool save = false;
|
bool save = false;
|
||||||
bool compress = false;
|
bool compress = false;
|
||||||
|
bool strict = false;
|
||||||
int port = 8080;
|
int port = 8080;
|
||||||
int redisPort = 6379;
|
int redisPort = 6379;
|
||||||
int statsdPort = 8125;
|
int statsdPort = 8125;
|
||||||
@ -57,6 +59,7 @@ int main(int argc, char** argv)
|
|||||||
int maxRedirects = 5;
|
int maxRedirects = 5;
|
||||||
int delayMs = -1;
|
int delayMs = -1;
|
||||||
int count = 1;
|
int count = 1;
|
||||||
|
int jobs = 4;
|
||||||
|
|
||||||
CLI::App* sendApp = app.add_subcommand("send", "Send a file");
|
CLI::App* sendApp = app.add_subcommand("send", "Send a file");
|
||||||
sendApp->add_option("url", url, "Connection url")->required();
|
sendApp->add_option("url", url, "Connection url")->required();
|
||||||
@ -146,6 +149,18 @@ int main(int argc, char** argv)
|
|||||||
cobra2statsd->add_flag("-v", verbose, "Verbose");
|
cobra2statsd->add_flag("-v", verbose, "Verbose");
|
||||||
cobra2statsd->add_option("--pidfile", pidfile, "Pid file");
|
cobra2statsd->add_option("--pidfile", pidfile, "Pid file");
|
||||||
|
|
||||||
|
CLI::App* cobra2sentry = app.add_subcommand("cobra_to_sentry", "Cobra to sentry");
|
||||||
|
cobra2sentry->add_option("--appkey", appkey, "Appkey");
|
||||||
|
cobra2sentry->add_option("--endpoint", endpoint, "Endpoint");
|
||||||
|
cobra2sentry->add_option("--rolename", rolename, "Role name");
|
||||||
|
cobra2sentry->add_option("--rolesecret", rolesecret, "Role secret");
|
||||||
|
cobra2sentry->add_option("--dsn", dsn, "Sentry DSN");
|
||||||
|
cobra2sentry->add_option("--jobs", jobs, "Number of thread sending events to Sentry");
|
||||||
|
cobra2sentry->add_option("channel", channel, "Channel")->required();
|
||||||
|
cobra2sentry->add_flag("-v", verbose, "Verbose");
|
||||||
|
cobra2sentry->add_flag("-s", strict, "Strict mode. Error out when sending to sentry fails");
|
||||||
|
cobra2sentry->add_option("--pidfile", pidfile, "Pid file");
|
||||||
|
|
||||||
CLI11_PARSE(app, argc, argv);
|
CLI11_PARSE(app, argc, argv);
|
||||||
|
|
||||||
// pid file handling
|
// pid file handling
|
||||||
@ -221,6 +236,13 @@ int main(int argc, char** argv)
|
|||||||
channel, hostname, statsdPort,
|
channel, hostname, statsdPort,
|
||||||
prefix, fields, verbose);
|
prefix, fields, verbose);
|
||||||
}
|
}
|
||||||
|
else if (app.got_subcommand("cobra_to_sentry"))
|
||||||
|
{
|
||||||
|
return ix::ws_cobra_to_sentry_main(appkey, endpoint,
|
||||||
|
rolename, rolesecret,
|
||||||
|
channel, dsn,
|
||||||
|
verbose, strict, jobs);
|
||||||
|
}
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
10
ws/ws.h
10
ws/ws.h
@ -70,4 +70,14 @@ namespace ix
|
|||||||
const std::string& prefix,
|
const std::string& prefix,
|
||||||
const std::string& fields,
|
const std::string& fields,
|
||||||
bool verbose);
|
bool verbose);
|
||||||
|
|
||||||
|
int ws_cobra_to_sentry_main(const std::string& appkey,
|
||||||
|
const std::string& endpoint,
|
||||||
|
const std::string& rolename,
|
||||||
|
const std::string& rolesecret,
|
||||||
|
const std::string& channel,
|
||||||
|
const std::string& dsn,
|
||||||
|
bool verbose,
|
||||||
|
bool strict,
|
||||||
|
int jobs);
|
||||||
}
|
}
|
||||||
|
189
ws/ws_cobra_to_sentry.cpp
Normal file
189
ws/ws_cobra_to_sentry.cpp
Normal file
@ -0,0 +1,189 @@
|
|||||||
|
/*
|
||||||
|
* ws_cobra_to_sentry.cpp
|
||||||
|
* Author: Benjamin Sergeant
|
||||||
|
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <iostream>
|
||||||
|
#include <sstream>
|
||||||
|
#include <chrono>
|
||||||
|
#include <thread>
|
||||||
|
#include <atomic>
|
||||||
|
#include <vector>
|
||||||
|
#include <queue>
|
||||||
|
#include <mutex>
|
||||||
|
#include <condition_variable>
|
||||||
|
#include "IXCobraConnection.h"
|
||||||
|
|
||||||
|
#include "IXSentryClient.h"
|
||||||
|
|
||||||
|
namespace ix
|
||||||
|
{
|
||||||
|
int ws_cobra_to_sentry_main(const std::string& appkey,
|
||||||
|
const std::string& endpoint,
|
||||||
|
const std::string& rolename,
|
||||||
|
const std::string& rolesecret,
|
||||||
|
const std::string& channel,
|
||||||
|
const std::string& dsn,
|
||||||
|
bool verbose,
|
||||||
|
bool strict,
|
||||||
|
int jobs)
|
||||||
|
{
|
||||||
|
ix::CobraConnection conn;
|
||||||
|
conn.configure(appkey, endpoint,
|
||||||
|
rolename, rolesecret,
|
||||||
|
ix::WebSocketPerMessageDeflateOptions(true));
|
||||||
|
conn.connect();
|
||||||
|
|
||||||
|
Json::FastWriter jsonWriter;
|
||||||
|
std::atomic<uint64_t> sentCount(0);
|
||||||
|
std::atomic<uint64_t> receivedCount(0);
|
||||||
|
std::atomic<bool> errorSending(false);
|
||||||
|
std::atomic<bool> stop(false);
|
||||||
|
|
||||||
|
std::mutex conditionVariableMutex;
|
||||||
|
std::condition_variable condition;
|
||||||
|
std::condition_variable progressCondition;
|
||||||
|
std::queue<Json::Value> queue;
|
||||||
|
|
||||||
|
auto sentrySender = [&condition, &progressCondition, &conditionVariableMutex,
|
||||||
|
&queue, verbose, &errorSending, &sentCount,
|
||||||
|
&stop, &dsn]
|
||||||
|
{
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
Json::Value msg;
|
||||||
|
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lock(conditionVariableMutex);
|
||||||
|
condition.wait(lock, [&queue, &stop]{ return !queue.empty() && !stop; });
|
||||||
|
|
||||||
|
msg = queue.front();
|
||||||
|
queue.pop();
|
||||||
|
}
|
||||||
|
|
||||||
|
SentryClient sc(dsn);
|
||||||
|
|
||||||
|
if (!sc.send(msg, verbose))
|
||||||
|
{
|
||||||
|
errorSending = true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
++sentCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
progressCondition.notify_one();
|
||||||
|
|
||||||
|
if (stop) return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Create a thread pool
|
||||||
|
std::cerr << "Starting " << jobs << " sentry sender jobs" << std::endl;
|
||||||
|
std::vector<std::thread> pool;
|
||||||
|
for (int i = 0; i < jobs; i++)
|
||||||
|
{
|
||||||
|
pool.push_back(std::thread(sentrySender));
|
||||||
|
}
|
||||||
|
|
||||||
|
conn.setEventCallback(
|
||||||
|
[&conn, &channel, &jsonWriter,
|
||||||
|
verbose, &receivedCount, &sentCount,
|
||||||
|
&condition, &conditionVariableMutex,
|
||||||
|
&progressCondition, &queue]
|
||||||
|
(ix::CobraConnectionEventType eventType,
|
||||||
|
const std::string& errMsg,
|
||||||
|
const ix::WebSocketHttpHeaders& headers,
|
||||||
|
const std::string& subscriptionId)
|
||||||
|
{
|
||||||
|
if (eventType == ix::CobraConnection_EventType_Open)
|
||||||
|
{
|
||||||
|
std::cerr << "Subscriber: connected" << std::endl;
|
||||||
|
|
||||||
|
for (auto it : headers)
|
||||||
|
{
|
||||||
|
std::cerr << it.first << ": " << it.second << std::endl;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (eventType == ix::CobraConnection_EventType_Closed)
|
||||||
|
{
|
||||||
|
std::cerr << "Subscriber: closed" << std::endl;
|
||||||
|
}
|
||||||
|
else if (eventType == ix::CobraConnection_EventType_Authenticated)
|
||||||
|
{
|
||||||
|
std::cerr << "Subscriber authenticated" << std::endl;
|
||||||
|
conn.subscribe(channel,
|
||||||
|
[&jsonWriter, verbose,
|
||||||
|
&sentCount, &receivedCount,
|
||||||
|
&condition, &conditionVariableMutex,
|
||||||
|
&progressCondition, &queue]
|
||||||
|
(const Json::Value& msg)
|
||||||
|
{
|
||||||
|
if (verbose)
|
||||||
|
{
|
||||||
|
std::cerr << jsonWriter.write(msg) << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we cannot send to sentry fast enough, drop the message
|
||||||
|
const uint64_t scaleFactor = 2;
|
||||||
|
|
||||||
|
if (sentCount != 0 &&
|
||||||
|
receivedCount != 0 &&
|
||||||
|
(sentCount * scaleFactor < receivedCount))
|
||||||
|
{
|
||||||
|
std::cerr << "message dropped: sending is backlogged !"
|
||||||
|
<< std::endl;
|
||||||
|
|
||||||
|
condition.notify_one();
|
||||||
|
progressCondition.notify_one();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
++receivedCount;
|
||||||
|
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lock(conditionVariableMutex);
|
||||||
|
queue.push(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
condition.notify_one();
|
||||||
|
progressCondition.notify_one();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
else if (eventType == ix::CobraConnection_EventType_Subscribed)
|
||||||
|
{
|
||||||
|
std::cerr << "Subscriber: subscribed to channel " << subscriptionId << std::endl;
|
||||||
|
}
|
||||||
|
else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
|
||||||
|
{
|
||||||
|
std::cerr << "Subscriber: unsubscribed from channel " << subscriptionId << std::endl;
|
||||||
|
}
|
||||||
|
else if (eventType == ix::CobraConnection_EventType_Error)
|
||||||
|
{
|
||||||
|
std::cerr << "Subscriber: error" << errMsg << std::endl;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
std::mutex progressConditionVariableMutex;
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lock(progressConditionVariableMutex);
|
||||||
|
progressCondition.wait(lock);
|
||||||
|
|
||||||
|
std::cout << "messages"
|
||||||
|
<< " received " << receivedCount
|
||||||
|
<< " sent " << sentCount
|
||||||
|
<< std::endl;
|
||||||
|
|
||||||
|
if (strict && errorSending) break;
|
||||||
|
}
|
||||||
|
|
||||||
|
conn.disconnect();
|
||||||
|
|
||||||
|
// FIXME: join all the bg threads and stop them.
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
@ -16,6 +16,7 @@
|
|||||||
|
|
||||||
namespace ix
|
namespace ix
|
||||||
{
|
{
|
||||||
|
// fields are command line argument that can be specified multiple times
|
||||||
std::vector<std::string> parseFields(const std::string& fields)
|
std::vector<std::string> parseFields(const std::string& fields)
|
||||||
{
|
{
|
||||||
std::vector<std::string> tokens;
|
std::vector<std::string> tokens;
|
||||||
@ -32,6 +33,10 @@ namespace ix
|
|||||||
return tokens;
|
return tokens;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// Extract an attribute from a Json Value.
|
||||||
|
// extractAttr("foo.bar", {"foo": {"bar": "baz"}}) => baz
|
||||||
|
//
|
||||||
std::string extractAttr(const std::string& attr,
|
std::string extractAttr(const std::string& attr,
|
||||||
const Json::Value& jsonValue)
|
const Json::Value& jsonValue)
|
||||||
{
|
{
|
||||||
@ -71,7 +76,8 @@ namespace ix
|
|||||||
|
|
||||||
// statsd client
|
// statsd client
|
||||||
// test with netcat as a server: `nc -ul 8125`
|
// test with netcat as a server: `nc -ul 8125`
|
||||||
statsd::StatsdClient statsdClient(host, port, prefix, true);
|
bool statsdBatch = true;
|
||||||
|
statsd::StatsdClient statsdClient(host, port, prefix, statsdBatch);
|
||||||
|
|
||||||
Json::FastWriter jsonWriter;
|
Json::FastWriter jsonWriter;
|
||||||
uint64_t msgCount = 0;
|
uint64_t msgCount = 0;
|
||||||
@ -87,6 +93,10 @@ namespace ix
|
|||||||
{
|
{
|
||||||
std::cout << "Subscriber: connected" << std::endl;
|
std::cout << "Subscriber: connected" << std::endl;
|
||||||
}
|
}
|
||||||
|
if (eventType == ix::CobraConnection_EventType_Closed)
|
||||||
|
{
|
||||||
|
std::cout << "Subscriber: closed" << std::endl;
|
||||||
|
}
|
||||||
else if (eventType == ix::CobraConnection_EventType_Authenticated)
|
else if (eventType == ix::CobraConnection_EventType_Authenticated)
|
||||||
{
|
{
|
||||||
std::cout << "Subscriber authenticated" << std::endl;
|
std::cout << "Subscriber authenticated" << std::endl;
|
||||||
@ -129,7 +139,7 @@ namespace ix
|
|||||||
|
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
std::chrono::duration<double, std::milli> duration(10);
|
std::chrono::duration<double, std::milli> duration(1000);
|
||||||
std::this_thread::sleep_for(duration);
|
std::this_thread::sleep_for(duration);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user