Add cobra_metrics_to_redis sub-command to create streams for each cobra metric event being received.

This commit is contained in:
Benjamin Sergeant 2019-10-24 14:42:25 -07:00
parent adf83f3255
commit c40033b6d9
9 changed files with 343 additions and 18 deletions

View File

@ -1 +1 @@
7.1.0 7.2.0

View File

@ -1,6 +1,10 @@
# Changelog # Changelog
All notable changes to this project will be documented in this file. All notable changes to this project will be documented in this file.
## [7.2.0] - 2019-10-24
- Add cobra_metrics_to_redis sub-command to create streams for each cobra metric event being received.
## [7.1.0] - 2019-10-13 ## [7.1.0] - 2019-10-13
- Add client support for websocket subprotocol. Look for the new addSubProtocol method for details. - Add client support for websocket subprotocol. Look for the new addSubProtocol method for details.

View File

@ -33,6 +33,11 @@ namespace ix
return _socket->connect(hostname, port, errMsg, nullptr); return _socket->connect(hostname, port, errMsg, nullptr);
} }
void RedisClient::stop()
{
_stop = true;
}
bool RedisClient::auth(const std::string& password, std::string& response) bool RedisClient::auth(const std::string& password, std::string& response)
{ {
response.clear(); response.clear();
@ -243,8 +248,102 @@ namespace ix
return true; return true;
} }
void RedisClient::stop() std::string RedisClient::prepareXaddCommand(
const std::string& stream,
const std::string& message)
{ {
_stop = true; std::stringstream ss;
ss << "*5\r\n";
ss << writeString("XADD");
ss << writeString(stream);
ss << writeString("*");
ss << writeString("field");
ss << writeString(message);
return ss.str();
}
std::string RedisClient::xadd(const std::string& stream,
const std::string& message,
std::string& errMsg)
{
errMsg.clear();
if (!_socket)
{
errMsg = "socket is not initialized";
return std::string();
}
std::string command = prepareXaddCommand(stream, message);
bool sent = _socket->writeBytes(command, nullptr);
if (!sent)
{
errMsg = "Cannot write bytes to socket";
return std::string();
}
return readXaddReply(errMsg);
}
std::string RedisClient::readXaddReply(std::string& errMsg)
{
// Read result
auto pollResult = _socket->isReadyToRead(-1);
if (pollResult == PollResultType::Error)
{
errMsg = "Error while polling for result";
return std::string();
}
// First line is the string length
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
if (!lineValid)
{
errMsg = "Error while polling for result";
return std::string();
}
int stringSize;
{
std::stringstream ss;
ss << line.substr(1, line.size() - 1);
ss >> stringSize;
}
// Read the result, which is the stream id computed by the redis server
lineResult = _socket->readLine(nullptr);
lineValid = lineResult.first;
line = lineResult.second;
std::string streamId = line.substr(0, stringSize - 1);
return streamId;
}
bool RedisClient::sendCommand(const std::string& commands, int commandsCount, std::string& errMsg)
{
bool sent = _socket->writeBytes(commands, nullptr);
if (!sent)
{
errMsg = "Cannot write bytes to socket";
return false;
}
bool success = true;
for (int i = 0; i < commandsCount; ++i)
{
auto reply = readXaddReply(errMsg);
if (reply == std::string())
{
success = false;
}
}
return success;
} }
} // namespace ix } // namespace ix

View File

@ -30,12 +30,27 @@ namespace ix
bool auth(const std::string& password, std::string& response); bool auth(const std::string& password, std::string& response);
// Publish / Subscribe
bool publish(const std::string& channel, const std::string& message, std::string& errMsg); bool publish(const std::string& channel, const std::string& message, std::string& errMsg);
bool subscribe(const std::string& channel, bool subscribe(const std::string& channel,
const OnRedisSubscribeResponseCallback& responseCallback, const OnRedisSubscribeResponseCallback& responseCallback,
const OnRedisSubscribeCallback& callback); const OnRedisSubscribeCallback& callback);
// XADD
std::string xadd(
const std::string& channel,
const std::string& message,
std::string& errMsg);
std::string prepareXaddCommand(
const std::string& stream,
const std::string& message);
std::string readXaddReply(std::string& errMsg);
bool sendCommand(const std::string& commands, int commandsCount, std::string& errMsg);
void stop(); void stop();
private: private:

View File

@ -54,6 +54,7 @@ add_executable(ws
ws_cobra_publish.cpp ws_cobra_publish.cpp
ws_cobra_to_statsd.cpp ws_cobra_to_statsd.cpp
ws_cobra_to_sentry.cpp ws_cobra_to_sentry.cpp
ws_cobra_metrics_to_redis.cpp
ws_httpd.cpp ws_httpd.cpp
ws_autobahn.cpp ws_autobahn.cpp
ws.cpp) ws.cpp)

View File

@ -205,20 +205,20 @@ int main(int argc, char** argv)
redisSubscribeApp->add_option("--pidfile", pidfile, "Pid file"); redisSubscribeApp->add_option("--pidfile", pidfile, "Pid file");
CLI::App* cobraSubscribeApp = app.add_subcommand("cobra_subscribe", "Cobra subscriber"); CLI::App* cobraSubscribeApp = app.add_subcommand("cobra_subscribe", "Cobra subscriber");
cobraSubscribeApp->add_option("--appkey", appkey, "Appkey"); cobraSubscribeApp->add_option("--appkey", appkey, "Appkey")->required();
cobraSubscribeApp->add_option("--endpoint", endpoint, "Endpoint"); cobraSubscribeApp->add_option("--endpoint", endpoint, "Endpoint")->required();
cobraSubscribeApp->add_option("--rolename", rolename, "Role name"); cobraSubscribeApp->add_option("--rolename", rolename, "Role name")->required();
cobraSubscribeApp->add_option("--rolesecret", rolesecret, "Role secret"); cobraSubscribeApp->add_option("--rolesecret", rolesecret, "Role secret")->required();
cobraSubscribeApp->add_option("channel", channel, "Channel")->required(); cobraSubscribeApp->add_option("channel", channel, "Channel")->required();
cobraSubscribeApp->add_option("--pidfile", pidfile, "Pid file"); cobraSubscribeApp->add_option("--pidfile", pidfile, "Pid file");
cobraSubscribeApp->add_option("--filter", filter, "Stream SQL Filter"); cobraSubscribeApp->add_option("--filter", filter, "Stream SQL Filter");
cobraSubscribeApp->add_flag("-q", quiet, "Quiet / only display stats"); cobraSubscribeApp->add_flag("-q", quiet, "Quiet / only display stats");
CLI::App* cobraPublish = app.add_subcommand("cobra_publish", "Cobra publisher"); CLI::App* cobraPublish = app.add_subcommand("cobra_publish", "Cobra publisher");
cobraPublish->add_option("--appkey", appkey, "Appkey"); cobraPublish->add_option("--appkey", appkey, "Appkey")->required();
cobraPublish->add_option("--endpoint", endpoint, "Endpoint"); cobraPublish->add_option("--endpoint", endpoint, "Endpoint")->required();
cobraPublish->add_option("--rolename", rolename, "Role name"); cobraPublish->add_option("--rolename", rolename, "Role name")->required();
cobraPublish->add_option("--rolesecret", rolesecret, "Role secret"); cobraPublish->add_option("--rolesecret", rolesecret, "Role secret")->required();
cobraPublish->add_option("channel", channel, "Channel")->required(); cobraPublish->add_option("channel", channel, "Channel")->required();
cobraPublish->add_option("--pidfile", pidfile, "Pid file"); cobraPublish->add_option("--pidfile", pidfile, "Pid file");
cobraPublish->add_option("path", path, "Path to the file to send") cobraPublish->add_option("path", path, "Path to the file to send")
@ -238,7 +238,7 @@ int main(int argc, char** argv)
->check(CLI::ExistingPath); ->check(CLI::ExistingPath);
cobraMetricsPublish->add_flag("--stress", stress, "Stress mode"); cobraMetricsPublish->add_flag("--stress", stress, "Stress mode");
CLI::App* cobra2statsd = app.add_subcommand("cobra_to_statsd", "Cobra to statsd"); CLI::App* cobra2statsd = app.add_subcommand("cobra_to_statsd", "Cobra metrics to statsd");
cobra2statsd->add_option("--appkey", appkey, "Appkey"); cobra2statsd->add_option("--appkey", appkey, "Appkey");
cobra2statsd->add_option("--endpoint", endpoint, "Endpoint"); cobra2statsd->add_option("--endpoint", endpoint, "Endpoint");
cobra2statsd->add_option("--rolename", rolename, "Role name"); cobra2statsd->add_option("--rolename", rolename, "Role name");
@ -252,11 +252,11 @@ int main(int argc, char** argv)
cobra2statsd->add_option("--pidfile", pidfile, "Pid file"); cobra2statsd->add_option("--pidfile", pidfile, "Pid file");
cobra2statsd->add_option("--filter", filter, "Stream SQL Filter"); cobra2statsd->add_option("--filter", filter, "Stream SQL Filter");
CLI::App* cobra2sentry = app.add_subcommand("cobra_to_sentry", "Cobra to sentry"); CLI::App* cobra2sentry = app.add_subcommand("cobra_to_sentry", "Cobra metrics to sentry");
cobra2sentry->add_option("--appkey", appkey, "Appkey"); cobra2sentry->add_option("--appkey", appkey, "Appkey")->required();
cobra2sentry->add_option("--endpoint", endpoint, "Endpoint"); cobra2sentry->add_option("--endpoint", endpoint, "Endpoint")->required();
cobra2sentry->add_option("--rolename", rolename, "Role name"); cobra2sentry->add_option("--rolename", rolename, "Role name")->required();
cobra2sentry->add_option("--rolesecret", rolesecret, "Role secret"); cobra2sentry->add_option("--rolesecret", rolesecret, "Role secret")->required();
cobra2sentry->add_option("--dsn", dsn, "Sentry DSN"); cobra2sentry->add_option("--dsn", dsn, "Sentry DSN");
cobra2sentry->add_option("--jobs", jobs, "Number of thread sending events to Sentry"); cobra2sentry->add_option("--jobs", jobs, "Number of thread sending events to Sentry");
cobra2sentry->add_option("channel", channel, "Channel")->required(); cobra2sentry->add_option("channel", channel, "Channel")->required();
@ -265,6 +265,19 @@ int main(int argc, char** argv)
cobra2sentry->add_option("--pidfile", pidfile, "Pid file"); cobra2sentry->add_option("--pidfile", pidfile, "Pid file");
cobra2sentry->add_option("--filter", filter, "Stream SQL Filter"); cobra2sentry->add_option("--filter", filter, "Stream SQL Filter");
CLI::App* cobra2redisApp =
app.add_subcommand("cobra_metrics_to_redis", "Cobra metrics to redis");
cobra2redisApp->add_option("--appkey", appkey, "Appkey")->required();
cobra2redisApp->add_option("--endpoint", endpoint, "Endpoint")->required();
cobra2redisApp->add_option("--rolename", rolename, "Role name")->required();
cobra2redisApp->add_option("--rolesecret", rolesecret, "Role secret")->required();
cobra2redisApp->add_option("channel", channel, "Channel")->required();
cobra2redisApp->add_option("--pidfile", pidfile, "Pid file");
cobra2redisApp->add_option("--filter", filter, "Stream SQL Filter");
cobra2redisApp->add_option("--hostname", hostname, "Redis hostname");
cobra2redisApp->add_option("--port", redisPort, "Redis port");
cobra2redisApp->add_flag("-q", quiet, "Quiet / only display stats");
CLI::App* runApp = app.add_subcommand("snake", "Snake server"); CLI::App* runApp = app.add_subcommand("snake", "Snake server");
runApp->add_option("--port", port, "Connection url"); runApp->add_option("--port", port, "Connection url");
runApp->add_option("--host", hostname, "Hostname"); runApp->add_option("--host", hostname, "Hostname");
@ -407,6 +420,11 @@ int main(int argc, char** argv)
ret = ix::ws_cobra_to_sentry_main( ret = ix::ws_cobra_to_sentry_main(
appkey, endpoint, rolename, rolesecret, channel, filter, dsn, verbose, strict, jobs); appkey, endpoint, rolename, rolesecret, channel, filter, dsn, verbose, strict, jobs);
} }
else if (app.got_subcommand("cobra_metrics_to_redis"))
{
ret = ix::ws_cobra_metrics_to_redis(
appkey, endpoint, rolename, rolesecret, channel, filter, quiet, hostname, redisPort);
}
else if (app.got_subcommand("snake")) else if (app.got_subcommand("snake"))
{ {
ret = ix::ws_snake_main( ret = ix::ws_snake_main(

10
ws/ws.h
View File

@ -116,6 +116,16 @@ namespace ix
bool strict, bool strict,
int jobs); int jobs);
int ws_cobra_metrics_to_redis(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
const std::string& channel,
const std::string& filter,
bool quiet,
const std::string& host,
int port);
int ws_snake_main(int port, int ws_snake_main(int port,
const std::string& hostname, const std::string& hostname,
const std::string& redisHosts, const std::string& redisHosts,

View File

@ -0,0 +1,178 @@
/*
* ws_cobra_metrics_to_redis.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <ixcobra/IXCobraConnection.h>
#include <ixsnake/IXRedisClient.h>
#include <mutex>
#include <queue>
#include <spdlog/spdlog.h>
#include <sstream>
#include <thread>
namespace ix
{
int ws_cobra_metrics_to_redis(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
const std::string& channel,
const std::string& filter,
bool quiet,
const std::string& host,
int port)
{
ix::CobraConnection conn;
conn.configure(
appkey, endpoint, rolename, rolesecret, ix::WebSocketPerMessageDeflateOptions(true));
conn.connect();
// Display incoming messages
std::atomic<int> msgPerSeconds(0);
std::atomic<int> msgCount(0);
auto timer = [&msgPerSeconds, &msgCount] {
while (true)
{
std::cout << "#messages " << msgCount << " "
<< "msg/s " << msgPerSeconds << std::endl;
msgPerSeconds = 0;
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
};
ix::RedisClient redisClient;
if (!redisClient.connect(host, port))
{
spdlog::error("Cannot connect to redis host {}:{}", host, port);
return 0;
}
std::mutex conditionVariableMutex;
std::condition_variable condition;
std::queue<Json::Value> queue;
auto redisSender = [&condition, &queue, &conditionVariableMutex, &redisClient] {
Json::FastWriter jsonWriter;
int batchSize = 1000;
int i = 0;
std::stringstream pipe;
while (true)
{
Json::Value msg;
{
std::unique_lock<std::mutex> lock(conditionVariableMutex);
condition.wait(lock, [&queue] { return !queue.empty(); });
msg = queue.front();
queue.pop();
}
// compute channel
std::stringstream ss;
ss << "session=" << msg["session"].asString() << ";msgid=" << msg["id"].asString();
std::string channel = ss.str();
std::string errMsg;
pipe << redisClient.prepareXaddCommand(channel, jsonWriter.write(msg));
if (i++ == batchSize)
{
if (!redisClient.sendCommand(pipe.str(), batchSize, errMsg))
{
spdlog::error("error sending command: {}", errMsg);
}
i = 0;
}
}
};
std::thread t1(timer);
std::thread t2(redisSender);
conn.setEventCallback([&conn,
&channel,
&filter,
&msgCount,
&msgPerSeconds,
&quiet,
&conditionVariableMutex,
&condition,
&queue](ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId) {
if (eventType == ix::CobraConnection_EventType_Open)
{
spdlog::info("Subscriber connected");
for (auto it : headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
}
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
spdlog::info("Subscriber authenticated");
conn.subscribe(channel,
filter,
[&quiet,
&msgPerSeconds,
&msgCount,
&conditionVariableMutex,
&condition,
&queue](const Json::Value& msg) {
{
std::unique_lock<std::mutex> lock(conditionVariableMutex);
queue.push(msg);
}
condition.notify_one();
msgPerSeconds++;
msgCount++;
});
}
else if (eventType == ix::CobraConnection_EventType_Subscribed)
{
spdlog::info("Subscriber: subscribed to channel {}", subscriptionId);
}
else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{
spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId);
}
else if (eventType == ix::CobraConnection_EventType_Error)
{
spdlog::error("Subscriber: error {}", errMsg);
}
else if (eventType == ix::CobraConnection_EventType_Published)
{
spdlog::error("Published message hacked: {}", msgId);
}
});
while (true)
{
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
}
return 0;
}
} // namespace ix

View File

@ -72,7 +72,7 @@ namespace ix
[&jsonWriter, &quiet, &msgPerSeconds, &msgCount](const Json::Value& msg) { [&jsonWriter, &quiet, &msgPerSeconds, &msgCount](const Json::Value& msg) {
if (!quiet) if (!quiet)
{ {
std::cout << jsonWriter.write(msg) << std::endl; std::cerr << jsonWriter.write(msg) << std::endl;
} }
msgPerSeconds++; msgPerSeconds++;