From 91198aca0d297eaea521a58b95d05f10161769f6 Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Tue, 26 Mar 2019 09:33:22 -0700 Subject: [PATCH] (ws) redis_subscribe and redis_publish can take a password + display subscribe response --- ws/IXRedisClient.cpp | 41 +++++++++++++++++++++++++++++++++++++++ ws/IXRedisClient.h | 5 +++++ ws/ws.cpp | 7 +++++-- ws/ws.h | 2 ++ ws/ws_redis_publish.cpp | 13 +++++++++++++ ws/ws_redis_subscribe.cpp | 20 ++++++++++++++++++- 6 files changed, 85 insertions(+), 3 deletions(-) diff --git a/ws/IXRedisClient.cpp b/ws/IXRedisClient.cpp index d023c08b..cf48a6f6 100644 --- a/ws/IXRedisClient.cpp +++ b/ws/IXRedisClient.cpp @@ -30,6 +30,39 @@ namespace ix return _socket->connect(hostname, port, errMsg, nullptr); } + bool RedisClient::auth(const std::string& password, + std::string& response) + { + response.clear(); + + if (!_socket) return false; + + std::stringstream ss; + ss << "AUTH "; + ss << password; + ss << "\r\n"; + + bool sent = _socket->writeBytes(ss.str(), nullptr); + if (!sent) + { + return false; + } + + auto pollResult = _socket->isReadyToRead(-1); + if (pollResult == PollResultType::Error) + { + return false; + } + + auto lineResult = _socket->readLine(nullptr); + auto lineValid = lineResult.first; + auto line = lineResult.second; + + response = line; + return lineValid; + } + + bool RedisClient::publish(const std::string& channel, const std::string& message) { @@ -65,6 +98,7 @@ namespace ix // FIXME: we assume that redis never return errors... // bool RedisClient::subscribe(const std::string& channel, + const OnRedisSubscribeResponseCallback& responseCallback, const OnRedisSubscribeCallback& callback) { if (!_socket) return false; @@ -87,10 +121,14 @@ namespace ix return false; } + // build the response as a single string + std::stringstream oss; + // Read the first line of the response auto lineResult = _socket->readLine(nullptr); auto lineValid = lineResult.first; auto line = lineResult.second; + oss << line; if (!lineValid) return false; @@ -100,10 +138,13 @@ namespace ix auto lineResult = _socket->readLine(nullptr); auto lineValid = lineResult.first; auto line = lineResult.second; + oss << line; if (!lineValid) return false; } + responseCallback(oss.str()); + // Wait indefinitely for new messages while (true) { diff --git a/ws/IXRedisClient.h b/ws/IXRedisClient.h index 42f5e77f..ee53a7aa 100644 --- a/ws/IXRedisClient.h +++ b/ws/IXRedisClient.h @@ -15,6 +15,7 @@ namespace ix class RedisClient { public: + using OnRedisSubscribeResponseCallback = std::function; using OnRedisSubscribeCallback = std::function; RedisClient() = default; @@ -23,10 +24,14 @@ namespace ix bool connect(const std::string& hostname, int port); + bool auth(const std::string& password, + std::string& response); + bool publish(const std::string& channel, const std::string& message); bool subscribe(const std::string& channel, + const OnRedisSubscribeResponseCallback& responseCallback, const OnRedisSubscribeCallback& callback); private: diff --git a/ws/ws.cpp b/ws/ws.cpp index 64f86bfc..1c1e7c53 100644 --- a/ws/ws.cpp +++ b/ws/ws.cpp @@ -37,6 +37,7 @@ int main(int argc, char** argv) std::string pidfile; std::string channel; std::string message; + std::string password; bool headersOnly = false; bool followRedirects = false; bool verbose = false; @@ -102,12 +103,14 @@ int main(int argc, char** argv) CLI::App* redisPublishApp = app.add_subcommand("redis_publish", "Redis publisher"); redisPublishApp->add_option("--port", redisPort, "Port"); redisPublishApp->add_option("--host", hostname, "Hostname"); + redisPublishApp->add_option("--password", password, "Password"); redisPublishApp->add_option("channel", channel, "Channel")->required(); redisPublishApp->add_option("message", message, "Message")->required(); CLI::App* redisSubscribeApp = app.add_subcommand("redis_subscribe", "Redis subscriber"); redisSubscribeApp->add_option("--port", redisPort, "Port"); redisSubscribeApp->add_option("--host", hostname, "Hostname"); + redisSubscribeApp->add_option("--password", password, "Password"); redisSubscribeApp->add_option("channel", channel, "Channel")->required(); redisSubscribeApp->add_flag("-v", verbose, "Verbose"); @@ -166,11 +169,11 @@ int main(int argc, char** argv) } else if (app.got_subcommand("redis_publish")) { - return ix::ws_redis_publish_main(hostname, redisPort, channel, message); + return ix::ws_redis_publish_main(hostname, redisPort, password, channel, message); } else if (app.got_subcommand("redis_subscribe")) { - return ix::ws_redis_subscribe_main(hostname, redisPort, channel, verbose); + return ix::ws_redis_subscribe_main(hostname, redisPort, password, channel, verbose); } return 1; diff --git a/ws/ws.h b/ws/ws.h index d5b5637a..95ae880f 100644 --- a/ws/ws.h +++ b/ws/ws.h @@ -42,11 +42,13 @@ namespace ix int ws_redis_publish_main(const std::string& hostname, int port, + const std::string& password, const std::string& channel, const std::string& message); int ws_redis_subscribe_main(const std::string& hostname, int port, + const std::string& password, const std::string& channel, bool verbose); } diff --git a/ws/ws_redis_publish.cpp b/ws/ws_redis_publish.cpp index 592f664b..31a1ec86 100644 --- a/ws/ws_redis_publish.cpp +++ b/ws/ws_redis_publish.cpp @@ -12,6 +12,7 @@ namespace ix { int ws_redis_publish_main(const std::string& hostname, int port, + const std::string& password, const std::string& channel, const std::string& message) { @@ -22,6 +23,18 @@ namespace ix return 1; } + if (!password.empty()) + { + std::string authResponse; + if (!redisClient.auth(password, authResponse)) + { + std::stringstream ss; + std::cerr << "Cannot authenticated to redis" << std::endl; + return 1; + } + std::cout << "Auth response: " << authResponse << ":" << port << std::endl; + } + std::cerr << "Publishing message " << message << " to " << channel << "..." << std::endl; if (!redisClient.publish(channel, message)) diff --git a/ws/ws_redis_subscribe.cpp b/ws/ws_redis_subscribe.cpp index 50a76041..bc861d39 100644 --- a/ws/ws_redis_subscribe.cpp +++ b/ws/ws_redis_subscribe.cpp @@ -13,6 +13,7 @@ namespace ix { int ws_redis_subscribe_main(const std::string& hostname, int port, + const std::string& password, const std::string& channel, bool verbose) { @@ -23,6 +24,18 @@ namespace ix return 1; } + if (!password.empty()) + { + std::string authResponse; + if (!redisClient.auth(password, authResponse)) + { + std::stringstream ss; + std::cerr << "Cannot authenticated to redis" << std::endl; + return 1; + } + std::cout << "Auth response: " << authResponse << ":" << port << std::endl; + } + std::chrono::time_point lastTimePoint; int msgPerSeconds = 0; int msgCount = 0; @@ -53,8 +66,13 @@ namespace ix } }; + auto responseCallback = [](const std::string& redisResponse) + { + std::cout << "Redis subscribe response: " << redisResponse << std::endl; + }; + std::cerr << "Subscribing to " << channel << "..." << std::endl; - if (!redisClient.subscribe(channel, callback)) + if (!redisClient.subscribe(channel, responseCallback, callback)) { std::cerr << "Error subscribing to channel " << channel << std::endl; return 1;