(ws) redis_subscribe and redis_publish can take a password + display subscribe response
This commit is contained in:
		| @@ -30,6 +30,39 @@ namespace ix | ||||
|         return _socket->connect(hostname, port, errMsg, nullptr); | ||||
|     } | ||||
|  | ||||
|     bool RedisClient::auth(const std::string& password, | ||||
|                            std::string& response) | ||||
|     { | ||||
|         response.clear(); | ||||
|  | ||||
|         if (!_socket) return false; | ||||
|  | ||||
|         std::stringstream ss; | ||||
|         ss << "AUTH "; | ||||
|         ss << password; | ||||
|         ss << "\r\n"; | ||||
|  | ||||
|         bool sent = _socket->writeBytes(ss.str(), nullptr); | ||||
|         if (!sent) | ||||
|         { | ||||
|             return false; | ||||
|         } | ||||
|  | ||||
|         auto pollResult = _socket->isReadyToRead(-1); | ||||
|         if (pollResult == PollResultType::Error) | ||||
|         { | ||||
|             return false; | ||||
|         } | ||||
|  | ||||
|         auto lineResult = _socket->readLine(nullptr); | ||||
|         auto lineValid = lineResult.first; | ||||
|         auto line = lineResult.second; | ||||
|  | ||||
|         response = line; | ||||
|         return lineValid; | ||||
|     } | ||||
|  | ||||
|  | ||||
|     bool RedisClient::publish(const std::string& channel, | ||||
|                               const std::string& message) | ||||
|     { | ||||
| @@ -65,6 +98,7 @@ namespace ix | ||||
|     // FIXME: we assume that redis never return errors... | ||||
|     // | ||||
|     bool RedisClient::subscribe(const std::string& channel, | ||||
|                                 const OnRedisSubscribeResponseCallback& responseCallback, | ||||
|                                 const OnRedisSubscribeCallback& callback) | ||||
|     { | ||||
|         if (!_socket) return false; | ||||
| @@ -87,10 +121,14 @@ namespace ix | ||||
|             return false; | ||||
|         } | ||||
|  | ||||
|         // build the response as a single string | ||||
|         std::stringstream oss; | ||||
|  | ||||
|         // Read the first line of the response | ||||
|         auto lineResult = _socket->readLine(nullptr); | ||||
|         auto lineValid = lineResult.first; | ||||
|         auto line = lineResult.second; | ||||
|         oss << line; | ||||
|  | ||||
|         if (!lineValid) return false; | ||||
|  | ||||
| @@ -100,10 +138,13 @@ namespace ix | ||||
|             auto lineResult = _socket->readLine(nullptr); | ||||
|             auto lineValid = lineResult.first; | ||||
|             auto line = lineResult.second; | ||||
|             oss << line; | ||||
|  | ||||
|             if (!lineValid) return false; | ||||
|         } | ||||
|  | ||||
|         responseCallback(oss.str()); | ||||
|  | ||||
|         // Wait indefinitely for new messages | ||||
|         while (true) | ||||
|         { | ||||
|   | ||||
| @@ -15,6 +15,7 @@ namespace ix | ||||
|  | ||||
|     class RedisClient { | ||||
|     public: | ||||
|         using OnRedisSubscribeResponseCallback = std::function<void(const std::string&)>; | ||||
|         using OnRedisSubscribeCallback = std::function<void(const std::string&)>; | ||||
|  | ||||
|         RedisClient() = default; | ||||
| @@ -23,10 +24,14 @@ namespace ix | ||||
|         bool connect(const std::string& hostname, | ||||
|                      int port); | ||||
|  | ||||
|         bool auth(const std::string& password, | ||||
|                   std::string& response); | ||||
|  | ||||
|         bool publish(const std::string& channel, | ||||
|                      const std::string& message); | ||||
|  | ||||
|         bool subscribe(const std::string& channel, | ||||
|                        const OnRedisSubscribeResponseCallback& responseCallback, | ||||
|                        const OnRedisSubscribeCallback& callback); | ||||
|  | ||||
|     private: | ||||
|   | ||||
| @@ -37,6 +37,7 @@ int main(int argc, char** argv) | ||||
|     std::string pidfile; | ||||
|     std::string channel; | ||||
|     std::string message; | ||||
|     std::string password; | ||||
|     bool headersOnly = false; | ||||
|     bool followRedirects = false; | ||||
|     bool verbose = false; | ||||
| @@ -102,12 +103,14 @@ int main(int argc, char** argv) | ||||
|     CLI::App* redisPublishApp = app.add_subcommand("redis_publish", "Redis publisher"); | ||||
|     redisPublishApp->add_option("--port", redisPort, "Port"); | ||||
|     redisPublishApp->add_option("--host", hostname, "Hostname"); | ||||
|     redisPublishApp->add_option("--password", password, "Password"); | ||||
|     redisPublishApp->add_option("channel", channel, "Channel")->required(); | ||||
|     redisPublishApp->add_option("message", message, "Message")->required(); | ||||
|  | ||||
|     CLI::App* redisSubscribeApp = app.add_subcommand("redis_subscribe", "Redis subscriber"); | ||||
|     redisSubscribeApp->add_option("--port", redisPort, "Port"); | ||||
|     redisSubscribeApp->add_option("--host", hostname, "Hostname"); | ||||
|     redisSubscribeApp->add_option("--password", password, "Password"); | ||||
|     redisSubscribeApp->add_option("channel", channel, "Channel")->required(); | ||||
|     redisSubscribeApp->add_flag("-v", verbose, "Verbose"); | ||||
|  | ||||
| @@ -166,11 +169,11 @@ int main(int argc, char** argv) | ||||
|     } | ||||
|     else if (app.got_subcommand("redis_publish")) | ||||
|     { | ||||
|         return ix::ws_redis_publish_main(hostname, redisPort, channel, message); | ||||
|         return ix::ws_redis_publish_main(hostname, redisPort, password, channel, message); | ||||
|     } | ||||
|     else if (app.got_subcommand("redis_subscribe")) | ||||
|     { | ||||
|         return ix::ws_redis_subscribe_main(hostname, redisPort, channel, verbose); | ||||
|         return ix::ws_redis_subscribe_main(hostname, redisPort, password, channel, verbose); | ||||
|     } | ||||
|  | ||||
|     return 1; | ||||
|   | ||||
							
								
								
									
										2
									
								
								ws/ws.h
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								ws/ws.h
									
									
									
									
									
								
							| @@ -42,11 +42,13 @@ namespace ix | ||||
|  | ||||
|     int ws_redis_publish_main(const std::string& hostname, | ||||
|                               int port, | ||||
|                               const std::string& password, | ||||
|                               const std::string& channel, | ||||
|                               const std::string& message); | ||||
|  | ||||
|     int ws_redis_subscribe_main(const std::string& hostname, | ||||
|                                 int port, | ||||
|                                 const std::string& password, | ||||
|                                 const std::string& channel, | ||||
|                                 bool verbose); | ||||
| } | ||||
|   | ||||
| @@ -12,6 +12,7 @@ namespace ix | ||||
| { | ||||
|     int ws_redis_publish_main(const std::string& hostname, | ||||
|                               int port, | ||||
|                               const std::string& password, | ||||
|                               const std::string& channel, | ||||
|                               const std::string& message) | ||||
|     { | ||||
| @@ -22,6 +23,18 @@ namespace ix | ||||
|             return 1; | ||||
|         } | ||||
|  | ||||
|         if (!password.empty()) | ||||
|         { | ||||
|             std::string authResponse; | ||||
|             if (!redisClient.auth(password, authResponse)) | ||||
|             { | ||||
|                 std::stringstream ss; | ||||
|                 std::cerr << "Cannot authenticated to redis" << std::endl; | ||||
|                 return 1; | ||||
|             } | ||||
|             std::cout << "Auth response: " << authResponse << ":" << port << std::endl; | ||||
|         } | ||||
|  | ||||
|         std::cerr << "Publishing message " << message | ||||
|                   << " to " << channel << "..." << std::endl; | ||||
|         if (!redisClient.publish(channel, message)) | ||||
|   | ||||
| @@ -13,6 +13,7 @@ namespace ix | ||||
| { | ||||
|     int ws_redis_subscribe_main(const std::string& hostname, | ||||
|                                 int port, | ||||
|                                 const std::string& password, | ||||
|                                 const std::string& channel, | ||||
|                                 bool verbose) | ||||
|     { | ||||
| @@ -23,6 +24,18 @@ namespace ix | ||||
|             return 1; | ||||
|         } | ||||
|  | ||||
|         if (!password.empty()) | ||||
|         { | ||||
|             std::string authResponse; | ||||
|             if (!redisClient.auth(password, authResponse)) | ||||
|             { | ||||
|                 std::stringstream ss; | ||||
|                 std::cerr << "Cannot authenticated to redis" << std::endl; | ||||
|                 return 1; | ||||
|             } | ||||
|             std::cout << "Auth response: " << authResponse << ":" << port << std::endl; | ||||
|         } | ||||
|  | ||||
|         std::chrono::time_point<std::chrono::steady_clock> lastTimePoint; | ||||
|         int msgPerSeconds = 0; | ||||
|         int msgCount = 0; | ||||
| @@ -53,8 +66,13 @@ namespace ix | ||||
|             } | ||||
|         }; | ||||
|  | ||||
|         auto responseCallback = [](const std::string& redisResponse) | ||||
|         { | ||||
|             std::cout << "Redis subscribe response: " << redisResponse << std::endl; | ||||
|         }; | ||||
|  | ||||
|         std::cerr << "Subscribing to " << channel << "..." << std::endl; | ||||
|         if (!redisClient.subscribe(channel, callback)) | ||||
|         if (!redisClient.subscribe(channel, responseCallback, callback)) | ||||
|         { | ||||
|             std::cerr << "Error subscribing to channel " << channel << std::endl; | ||||
|             return 1; | ||||
|   | ||||
		Reference in New Issue
	
	Block a user