(ws + cobra bots) add a cobra_to_cobra ws subcommand to subscribe to a channel and republish received events to a different channel

This commit is contained in:
Benjamin Sergeant 2020-08-31 13:45:00 -07:00
parent 73b9c0b89b
commit 5896d3740f
7 changed files with 96 additions and 2 deletions

View File

@ -2,6 +2,10 @@
All changes to this project will be documented in this file. All changes to this project will be documented in this file.
## [10.3.2] - 2020-08-31
(ws + cobra bots) add a cobra_to_cobra ws subcommand to subscribe to a channel and republish received events to a different channel
## [10.3.1] - 2020-08-28 ## [10.3.1] - 2020-08-28
(socket servers) merge the ConnectionInfo class with the ConnectionState one, which simplify all the server apis (socket servers) merge the ConnectionInfo class with the ConnectionState one, which simplify all the server apis

View File

@ -5,6 +5,7 @@
set (IXBOTS_SOURCES set (IXBOTS_SOURCES
ixbots/IXCobraBot.cpp ixbots/IXCobraBot.cpp
ixbots/IXCobraToCobraBot.cpp
ixbots/IXCobraToSentryBot.cpp ixbots/IXCobraToSentryBot.cpp
ixbots/IXCobraToStatsdBot.cpp ixbots/IXCobraToStatsdBot.cpp
ixbots/IXCobraToStdoutBot.cpp ixbots/IXCobraToStdoutBot.cpp
@ -16,6 +17,7 @@ set (IXBOTS_SOURCES
set (IXBOTS_HEADERS set (IXBOTS_HEADERS
ixbots/IXCobraBot.h ixbots/IXCobraBot.h
ixbots/IXCobraBotConfig.h ixbots/IXCobraBotConfig.h
ixbots/IXCobraToCobraBot.h
ixbots/IXCobraToSentryBot.h ixbots/IXCobraToSentryBot.h
ixbots/IXCobraToStatsdBot.h ixbots/IXCobraToStatsdBot.h
ixbots/IXCobraToStdoutBot.h ixbots/IXCobraToStdoutBot.h

View File

@ -0,0 +1,43 @@
/*
* IXCobraToCobraBot.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#include "IXCobraToCobraBot.h"
#include "IXCobraBot.h"
#include <ixcobra/IXCobraMetricsPublisher.h>
#include <sstream>
namespace ix
{
int64_t cobra_to_cobra_bot(const ix::CobraBotConfig& cobraBotConfig,
const std::string& republishChannel,
const std::string& publisherRolename,
const std::string& publisherRolesecret)
{
CobraBot bot;
CobraMetricsPublisher cobraMetricsPublisher;
CobraConfig cobraPublisherConfig = cobraBotConfig.cobraConfig;
cobraPublisherConfig.rolename = publisherRolename;
cobraPublisherConfig.rolesecret = publisherRolesecret;
cobraMetricsPublisher.configure(cobraPublisherConfig, republishChannel);
bot.setOnBotMessageCallback(
[&republishChannel, &cobraMetricsPublisher](const Json::Value& msg,
const std::string& /*position*/,
std::atomic<bool>& /*throttled*/,
std::atomic<bool>& /*fatalCobraError*/,
std::atomic<uint64_t>& sentCount) -> void {
Json::Value msgWithNoId(msg);
msgWithNoId.removeMember("id");
cobraMetricsPublisher.push(republishChannel, msg);
sentCount++;
});
return bot.run(cobraBotConfig);
}
} // namespace ix

View File

@ -0,0 +1,20 @@
/*
* IXCobraToCobraBot.h
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <cstdint>
#include <ixbots/IXStatsdClient.h>
#include "IXCobraBotConfig.h"
#include <stddef.h>
#include <string>
namespace ix
{
int64_t cobra_to_cobra_bot(const ix::CobraBotConfig& config,
const std::string& republishChannel,
const std::string& publisherRolename,
const std::string& publisherRolesecret);
} // namespace ix

View File

@ -24,6 +24,7 @@ namespace ix
{ {
_cobra_connection.setEventCallback([](const CobraEventPtr& event) { _cobra_connection.setEventCallback([](const CobraEventPtr& event) {
std::stringstream ss; std::stringstream ss;
ix::LogLevel logLevel = LogLevel::Info;
if (event->type == ix::CobraEventType::Open) if (event->type == ix::CobraEventType::Open)
{ {
@ -41,6 +42,7 @@ namespace ix
else if (event->type == ix::CobraEventType::Error) else if (event->type == ix::CobraEventType::Error)
{ {
ss << "Error: " << event->errMsg; ss << "Error: " << event->errMsg;
logLevel = ix::LogLevel::Error;
} }
else if (event->type == ix::CobraEventType::Closed) else if (event->type == ix::CobraEventType::Closed)
{ {
@ -57,6 +59,7 @@ namespace ix
else if (event->type == ix::CobraEventType::Published) else if (event->type == ix::CobraEventType::Published)
{ {
ss << "Published message " << event->msgId << " acked"; ss << "Published message " << event->msgId << " acked";
logLevel = ix::LogLevel::Debug;
} }
else if (event->type == ix::CobraEventType::Pong) else if (event->type == ix::CobraEventType::Pong)
{ {
@ -65,17 +68,20 @@ namespace ix
else if (event->type == ix::CobraEventType::HandshakeError) else if (event->type == ix::CobraEventType::HandshakeError)
{ {
ss << "Handshake error: " << event->errMsg; ss << "Handshake error: " << event->errMsg;
logLevel = ix::LogLevel::Error;
} }
else if (event->type == ix::CobraEventType::AuthenticationError) else if (event->type == ix::CobraEventType::AuthenticationError)
{ {
ss << "Authentication error: " << event->errMsg; ss << "Authentication error: " << event->errMsg;
logLevel = ix::LogLevel::Error;
} }
else if (event->type == ix::CobraEventType::SubscriptionError) else if (event->type == ix::CobraEventType::SubscriptionError)
{ {
ss << "Subscription error: " << event->errMsg; ss << "Subscription error: " << event->errMsg;
logLevel = ix::LogLevel::Error;
} }
CoreLogger::log(ss.str().c_str()); CoreLogger::log(ss.str().c_str(), logLevel);
}); });
} }

View File

@ -6,4 +6,4 @@
#pragma once #pragma once
#define IX_WEBSOCKET_VERSION "10.3.1" #define IX_WEBSOCKET_VERSION "10.3.2"

View File

@ -18,6 +18,7 @@
#include <fstream> #include <fstream>
#include <iostream> #include <iostream>
#include <ixbots/IXCobraMetricsToRedisBot.h> #include <ixbots/IXCobraMetricsToRedisBot.h>
#include <ixbots/IXCobraToCobraBot.h>
#include <ixbots/IXCobraToPythonBot.h> #include <ixbots/IXCobraToPythonBot.h>
#include <ixbots/IXCobraToSentryBot.h> #include <ixbots/IXCobraToSentryBot.h>
#include <ixbots/IXCobraToStatsdBot.h> #include <ixbots/IXCobraToStatsdBot.h>
@ -2813,6 +2814,8 @@ int main(int argc, char** argv)
std::string logfile; std::string logfile;
std::string moduleName; std::string moduleName;
std::string republishChannel; std::string republishChannel;
std::string publisherRolename;
std::string publisherRolesecret;
std::string sendMsg("hello world"); std::string sendMsg("hello world");
ix::SocketTLSOptions tlsOptions; ix::SocketTLSOptions tlsOptions;
ix::CobraConfig cobraConfig; ix::CobraConfig cobraConfig;
@ -3077,6 +3080,17 @@ int main(int argc, char** argv)
addTLSOptions(cobra2statsd); addTLSOptions(cobra2statsd);
addCobraBotConfig(cobra2statsd); addCobraBotConfig(cobra2statsd);
CLI::App* cobra2cobra = app.add_subcommand("cobra_to_cobra", "Cobra to Cobra");
cobra2cobra->fallthrough();
cobra2cobra->add_option("--republish", republishChannel, "Republish channel");
cobra2cobra->add_option("--publisher_rolename", publisherRolename, "Publisher Role name")
->required();
cobra2cobra->add_option("--publisher_rolesecret", publisherRolesecret, "Publisher Role secret")
->required();
cobra2cobra->add_flag("-q", quiet, "Quiet");
addTLSOptions(cobra2cobra);
addCobraBotConfig(cobra2cobra);
CLI::App* cobra2python = app.add_subcommand("cobra_to_python", "Cobra to python"); CLI::App* cobra2python = app.add_subcommand("cobra_to_python", "Cobra to python");
cobra2python->fallthrough(); cobra2python->fallthrough();
cobra2python->add_option("--host", hostname, "Statsd host"); cobra2python->add_option("--host", hostname, "Statsd host");
@ -3408,6 +3422,11 @@ int main(int argc, char** argv)
ret = (int) ix::cobra_metrics_to_redis_bot(cobraBotConfig, redisClient, verbose); ret = (int) ix::cobra_metrics_to_redis_bot(cobraBotConfig, redisClient, verbose);
} }
} }
else if (app.got_subcommand("cobra_to_cobra"))
{
ret = (int) ix::cobra_to_cobra_bot(
cobraBotConfig, republishChannel, publisherRolename, publisherRolesecret);
}
else if (app.got_subcommand("snake")) else if (app.got_subcommand("snake"))
{ {
ret = ix::ws_snake_main(port, ret = ix::ws_snake_main(port,