diff --git a/DOCKER_VERSION b/DOCKER_VERSION index a3fcc712..0ee843cc 100644 --- a/DOCKER_VERSION +++ b/DOCKER_VERSION @@ -1 +1 @@ -7.1.0 +7.2.0 diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index ec0b3b28..3ee768b7 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,6 +1,10 @@ # Changelog All notable changes to this project will be documented in this file. +## [7.2.0] - 2019-10-24 + +- Add cobra_metrics_to_redis sub-command to create streams for each cobra metric event being received. + ## [7.1.0] - 2019-10-13 - Add client support for websocket subprotocol. Look for the new addSubProtocol method for details. diff --git a/ixsnake/ixsnake/IXRedisClient.cpp b/ixsnake/ixsnake/IXRedisClient.cpp index c2e3ad75..754041d8 100644 --- a/ixsnake/ixsnake/IXRedisClient.cpp +++ b/ixsnake/ixsnake/IXRedisClient.cpp @@ -33,6 +33,11 @@ namespace ix return _socket->connect(hostname, port, errMsg, nullptr); } + void RedisClient::stop() + { + _stop = true; + } + bool RedisClient::auth(const std::string& password, std::string& response) { response.clear(); @@ -243,8 +248,102 @@ namespace ix return true; } - void RedisClient::stop() + std::string RedisClient::prepareXaddCommand( + const std::string& stream, + const std::string& message) { - _stop = true; + std::stringstream ss; + ss << "*5\r\n"; + ss << writeString("XADD"); + ss << writeString(stream); + ss << writeString("*"); + ss << writeString("field"); + ss << writeString(message); + + return ss.str(); + } + + std::string RedisClient::xadd(const std::string& stream, + const std::string& message, + std::string& errMsg) + { + errMsg.clear(); + + if (!_socket) + { + errMsg = "socket is not initialized"; + return std::string(); + } + + std::string command = prepareXaddCommand(stream, message); + + bool sent = _socket->writeBytes(command, nullptr); + if (!sent) + { + errMsg = "Cannot write bytes to socket"; + return std::string(); + } + + return readXaddReply(errMsg); + } + + std::string RedisClient::readXaddReply(std::string& errMsg) + { + // Read result + auto pollResult = _socket->isReadyToRead(-1); + if (pollResult == PollResultType::Error) + { + errMsg = "Error while polling for result"; + return std::string(); + } + + // First line is the string length + auto lineResult = _socket->readLine(nullptr); + auto lineValid = lineResult.first; + auto line = lineResult.second; + + if (!lineValid) + { + errMsg = "Error while polling for result"; + return std::string(); + } + + int stringSize; + { + std::stringstream ss; + ss << line.substr(1, line.size() - 1); + ss >> stringSize; + } + + // Read the result, which is the stream id computed by the redis server + lineResult = _socket->readLine(nullptr); + lineValid = lineResult.first; + line = lineResult.second; + + std::string streamId = line.substr(0, stringSize - 1); + return streamId; + } + + bool RedisClient::sendCommand(const std::string& commands, int commandsCount, std::string& errMsg) + { + bool sent = _socket->writeBytes(commands, nullptr); + if (!sent) + { + errMsg = "Cannot write bytes to socket"; + return false; + } + + bool success = true; + + for (int i = 0; i < commandsCount; ++i) + { + auto reply = readXaddReply(errMsg); + if (reply == std::string()) + { + success = false; + } + } + + return success; } } // namespace ix diff --git a/ixsnake/ixsnake/IXRedisClient.h b/ixsnake/ixsnake/IXRedisClient.h index 453ef6f1..d965ae84 100644 --- a/ixsnake/ixsnake/IXRedisClient.h +++ b/ixsnake/ixsnake/IXRedisClient.h @@ -30,12 +30,27 @@ namespace ix bool auth(const std::string& password, std::string& response); + // Publish / Subscribe bool publish(const std::string& channel, const std::string& message, std::string& errMsg); bool subscribe(const std::string& channel, const OnRedisSubscribeResponseCallback& responseCallback, const OnRedisSubscribeCallback& callback); + // XADD + std::string xadd( + const std::string& channel, + const std::string& message, + std::string& errMsg); + + std::string prepareXaddCommand( + const std::string& stream, + const std::string& message); + + std::string readXaddReply(std::string& errMsg); + + bool sendCommand(const std::string& commands, int commandsCount, std::string& errMsg); + void stop(); private: diff --git a/ws/CMakeLists.txt b/ws/CMakeLists.txt index 717484b6..af58c5c8 100644 --- a/ws/CMakeLists.txt +++ b/ws/CMakeLists.txt @@ -54,6 +54,7 @@ add_executable(ws ws_cobra_publish.cpp ws_cobra_to_statsd.cpp ws_cobra_to_sentry.cpp + ws_cobra_metrics_to_redis.cpp ws_httpd.cpp ws_autobahn.cpp ws.cpp) diff --git a/ws/ws.cpp b/ws/ws.cpp index 5d73eaff..a4358d5d 100644 --- a/ws/ws.cpp +++ b/ws/ws.cpp @@ -205,20 +205,20 @@ int main(int argc, char** argv) redisSubscribeApp->add_option("--pidfile", pidfile, "Pid file"); CLI::App* cobraSubscribeApp = app.add_subcommand("cobra_subscribe", "Cobra subscriber"); - cobraSubscribeApp->add_option("--appkey", appkey, "Appkey"); - cobraSubscribeApp->add_option("--endpoint", endpoint, "Endpoint"); - cobraSubscribeApp->add_option("--rolename", rolename, "Role name"); - cobraSubscribeApp->add_option("--rolesecret", rolesecret, "Role secret"); + cobraSubscribeApp->add_option("--appkey", appkey, "Appkey")->required(); + cobraSubscribeApp->add_option("--endpoint", endpoint, "Endpoint")->required(); + cobraSubscribeApp->add_option("--rolename", rolename, "Role name")->required(); + cobraSubscribeApp->add_option("--rolesecret", rolesecret, "Role secret")->required(); cobraSubscribeApp->add_option("channel", channel, "Channel")->required(); cobraSubscribeApp->add_option("--pidfile", pidfile, "Pid file"); cobraSubscribeApp->add_option("--filter", filter, "Stream SQL Filter"); cobraSubscribeApp->add_flag("-q", quiet, "Quiet / only display stats"); CLI::App* cobraPublish = app.add_subcommand("cobra_publish", "Cobra publisher"); - cobraPublish->add_option("--appkey", appkey, "Appkey"); - cobraPublish->add_option("--endpoint", endpoint, "Endpoint"); - cobraPublish->add_option("--rolename", rolename, "Role name"); - cobraPublish->add_option("--rolesecret", rolesecret, "Role secret"); + cobraPublish->add_option("--appkey", appkey, "Appkey")->required(); + cobraPublish->add_option("--endpoint", endpoint, "Endpoint")->required(); + cobraPublish->add_option("--rolename", rolename, "Role name")->required(); + cobraPublish->add_option("--rolesecret", rolesecret, "Role secret")->required(); cobraPublish->add_option("channel", channel, "Channel")->required(); cobraPublish->add_option("--pidfile", pidfile, "Pid file"); cobraPublish->add_option("path", path, "Path to the file to send") @@ -238,7 +238,7 @@ int main(int argc, char** argv) ->check(CLI::ExistingPath); cobraMetricsPublish->add_flag("--stress", stress, "Stress mode"); - CLI::App* cobra2statsd = app.add_subcommand("cobra_to_statsd", "Cobra to statsd"); + CLI::App* cobra2statsd = app.add_subcommand("cobra_to_statsd", "Cobra metrics to statsd"); cobra2statsd->add_option("--appkey", appkey, "Appkey"); cobra2statsd->add_option("--endpoint", endpoint, "Endpoint"); cobra2statsd->add_option("--rolename", rolename, "Role name"); @@ -252,11 +252,11 @@ int main(int argc, char** argv) cobra2statsd->add_option("--pidfile", pidfile, "Pid file"); cobra2statsd->add_option("--filter", filter, "Stream SQL Filter"); - CLI::App* cobra2sentry = app.add_subcommand("cobra_to_sentry", "Cobra to sentry"); - cobra2sentry->add_option("--appkey", appkey, "Appkey"); - cobra2sentry->add_option("--endpoint", endpoint, "Endpoint"); - cobra2sentry->add_option("--rolename", rolename, "Role name"); - cobra2sentry->add_option("--rolesecret", rolesecret, "Role secret"); + CLI::App* cobra2sentry = app.add_subcommand("cobra_to_sentry", "Cobra metrics to sentry"); + cobra2sentry->add_option("--appkey", appkey, "Appkey")->required(); + cobra2sentry->add_option("--endpoint", endpoint, "Endpoint")->required(); + cobra2sentry->add_option("--rolename", rolename, "Role name")->required(); + cobra2sentry->add_option("--rolesecret", rolesecret, "Role secret")->required(); cobra2sentry->add_option("--dsn", dsn, "Sentry DSN"); cobra2sentry->add_option("--jobs", jobs, "Number of thread sending events to Sentry"); cobra2sentry->add_option("channel", channel, "Channel")->required(); @@ -265,6 +265,19 @@ int main(int argc, char** argv) cobra2sentry->add_option("--pidfile", pidfile, "Pid file"); cobra2sentry->add_option("--filter", filter, "Stream SQL Filter"); + CLI::App* cobra2redisApp = + app.add_subcommand("cobra_metrics_to_redis", "Cobra metrics to redis"); + cobra2redisApp->add_option("--appkey", appkey, "Appkey")->required(); + cobra2redisApp->add_option("--endpoint", endpoint, "Endpoint")->required(); + cobra2redisApp->add_option("--rolename", rolename, "Role name")->required(); + cobra2redisApp->add_option("--rolesecret", rolesecret, "Role secret")->required(); + cobra2redisApp->add_option("channel", channel, "Channel")->required(); + cobra2redisApp->add_option("--pidfile", pidfile, "Pid file"); + cobra2redisApp->add_option("--filter", filter, "Stream SQL Filter"); + cobra2redisApp->add_option("--hostname", hostname, "Redis hostname"); + cobra2redisApp->add_option("--port", redisPort, "Redis port"); + cobra2redisApp->add_flag("-q", quiet, "Quiet / only display stats"); + CLI::App* runApp = app.add_subcommand("snake", "Snake server"); runApp->add_option("--port", port, "Connection url"); runApp->add_option("--host", hostname, "Hostname"); @@ -407,6 +420,11 @@ int main(int argc, char** argv) ret = ix::ws_cobra_to_sentry_main( appkey, endpoint, rolename, rolesecret, channel, filter, dsn, verbose, strict, jobs); } + else if (app.got_subcommand("cobra_metrics_to_redis")) + { + ret = ix::ws_cobra_metrics_to_redis( + appkey, endpoint, rolename, rolesecret, channel, filter, quiet, hostname, redisPort); + } else if (app.got_subcommand("snake")) { ret = ix::ws_snake_main( diff --git a/ws/ws.h b/ws/ws.h index 30c62f18..cfa22e90 100644 --- a/ws/ws.h +++ b/ws/ws.h @@ -116,6 +116,16 @@ namespace ix bool strict, int jobs); + int ws_cobra_metrics_to_redis(const std::string& appkey, + const std::string& endpoint, + const std::string& rolename, + const std::string& rolesecret, + const std::string& channel, + const std::string& filter, + bool quiet, + const std::string& host, + int port); + int ws_snake_main(int port, const std::string& hostname, const std::string& redisHosts, diff --git a/ws/ws_cobra_metrics_to_redis.cpp b/ws/ws_cobra_metrics_to_redis.cpp new file mode 100644 index 00000000..3b39b941 --- /dev/null +++ b/ws/ws_cobra_metrics_to_redis.cpp @@ -0,0 +1,178 @@ +/* + * ws_cobra_metrics_to_redis.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ix +{ + int ws_cobra_metrics_to_redis(const std::string& appkey, + const std::string& endpoint, + const std::string& rolename, + const std::string& rolesecret, + const std::string& channel, + const std::string& filter, + bool quiet, + const std::string& host, + int port) + { + ix::CobraConnection conn; + conn.configure( + appkey, endpoint, rolename, rolesecret, ix::WebSocketPerMessageDeflateOptions(true)); + conn.connect(); + + // Display incoming messages + std::atomic msgPerSeconds(0); + std::atomic msgCount(0); + + auto timer = [&msgPerSeconds, &msgCount] { + while (true) + { + std::cout << "#messages " << msgCount << " " + << "msg/s " << msgPerSeconds << std::endl; + + msgPerSeconds = 0; + auto duration = std::chrono::seconds(1); + std::this_thread::sleep_for(duration); + } + }; + + ix::RedisClient redisClient; + if (!redisClient.connect(host, port)) + { + spdlog::error("Cannot connect to redis host {}:{}", host, port); + return 0; + } + + std::mutex conditionVariableMutex; + std::condition_variable condition; + std::queue queue; + + auto redisSender = [&condition, &queue, &conditionVariableMutex, &redisClient] { + Json::FastWriter jsonWriter; + + int batchSize = 1000; + int i = 0; + + std::stringstream pipe; + + while (true) + { + Json::Value msg; + + { + std::unique_lock lock(conditionVariableMutex); + condition.wait(lock, [&queue] { return !queue.empty(); }); + + msg = queue.front(); + queue.pop(); + } + + // compute channel + std::stringstream ss; + ss << "session=" << msg["session"].asString() << ";msgid=" << msg["id"].asString(); + + std::string channel = ss.str(); + + std::string errMsg; + pipe << redisClient.prepareXaddCommand(channel, jsonWriter.write(msg)); + + if (i++ == batchSize) + { + if (!redisClient.sendCommand(pipe.str(), batchSize, errMsg)) + { + spdlog::error("error sending command: {}", errMsg); + } + + i = 0; + } + } + }; + + std::thread t1(timer); + std::thread t2(redisSender); + + conn.setEventCallback([&conn, + &channel, + &filter, + &msgCount, + &msgPerSeconds, + &quiet, + &conditionVariableMutex, + &condition, + &queue](ix::CobraConnectionEventType eventType, + const std::string& errMsg, + const ix::WebSocketHttpHeaders& headers, + const std::string& subscriptionId, + CobraConnection::MsgId msgId) { + if (eventType == ix::CobraConnection_EventType_Open) + { + spdlog::info("Subscriber connected"); + + for (auto it : headers) + { + spdlog::info("{}: {}", it.first, it.second); + } + } + else if (eventType == ix::CobraConnection_EventType_Authenticated) + { + spdlog::info("Subscriber authenticated"); + + conn.subscribe(channel, + filter, + [&quiet, + &msgPerSeconds, + &msgCount, + &conditionVariableMutex, + &condition, + &queue](const Json::Value& msg) { + { + std::unique_lock lock(conditionVariableMutex); + queue.push(msg); + } + + condition.notify_one(); + + msgPerSeconds++; + msgCount++; + }); + } + else if (eventType == ix::CobraConnection_EventType_Subscribed) + { + spdlog::info("Subscriber: subscribed to channel {}", subscriptionId); + } + else if (eventType == ix::CobraConnection_EventType_UnSubscribed) + { + spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId); + } + else if (eventType == ix::CobraConnection_EventType_Error) + { + spdlog::error("Subscriber: error {}", errMsg); + } + else if (eventType == ix::CobraConnection_EventType_Published) + { + spdlog::error("Published message hacked: {}", msgId); + } + }); + + while (true) + { + std::chrono::duration duration(10); + std::this_thread::sleep_for(duration); + } + + return 0; + } +} // namespace ix diff --git a/ws/ws_cobra_subscribe.cpp b/ws/ws_cobra_subscribe.cpp index 25376d10..34e25900 100644 --- a/ws/ws_cobra_subscribe.cpp +++ b/ws/ws_cobra_subscribe.cpp @@ -72,7 +72,7 @@ namespace ix [&jsonWriter, &quiet, &msgPerSeconds, &msgCount](const Json::Value& msg) { if (!quiet) { - std::cout << jsonWriter.write(msg) << std::endl; + std::cerr << jsonWriter.write(msg) << std::endl; } msgPerSeconds++;