(ws) redis_subscribe and redis_publish can take a password + display subscribe response
This commit is contained in:
parent
b17a5e5f0b
commit
91198aca0d
@ -30,6 +30,39 @@ namespace ix
|
|||||||
return _socket->connect(hostname, port, errMsg, nullptr);
|
return _socket->connect(hostname, port, errMsg, nullptr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool RedisClient::auth(const std::string& password,
|
||||||
|
std::string& response)
|
||||||
|
{
|
||||||
|
response.clear();
|
||||||
|
|
||||||
|
if (!_socket) return false;
|
||||||
|
|
||||||
|
std::stringstream ss;
|
||||||
|
ss << "AUTH ";
|
||||||
|
ss << password;
|
||||||
|
ss << "\r\n";
|
||||||
|
|
||||||
|
bool sent = _socket->writeBytes(ss.str(), nullptr);
|
||||||
|
if (!sent)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto pollResult = _socket->isReadyToRead(-1);
|
||||||
|
if (pollResult == PollResultType::Error)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto lineResult = _socket->readLine(nullptr);
|
||||||
|
auto lineValid = lineResult.first;
|
||||||
|
auto line = lineResult.second;
|
||||||
|
|
||||||
|
response = line;
|
||||||
|
return lineValid;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
bool RedisClient::publish(const std::string& channel,
|
bool RedisClient::publish(const std::string& channel,
|
||||||
const std::string& message)
|
const std::string& message)
|
||||||
{
|
{
|
||||||
@ -65,6 +98,7 @@ namespace ix
|
|||||||
// FIXME: we assume that redis never return errors...
|
// FIXME: we assume that redis never return errors...
|
||||||
//
|
//
|
||||||
bool RedisClient::subscribe(const std::string& channel,
|
bool RedisClient::subscribe(const std::string& channel,
|
||||||
|
const OnRedisSubscribeResponseCallback& responseCallback,
|
||||||
const OnRedisSubscribeCallback& callback)
|
const OnRedisSubscribeCallback& callback)
|
||||||
{
|
{
|
||||||
if (!_socket) return false;
|
if (!_socket) return false;
|
||||||
@ -87,10 +121,14 @@ namespace ix
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// build the response as a single string
|
||||||
|
std::stringstream oss;
|
||||||
|
|
||||||
// Read the first line of the response
|
// Read the first line of the response
|
||||||
auto lineResult = _socket->readLine(nullptr);
|
auto lineResult = _socket->readLine(nullptr);
|
||||||
auto lineValid = lineResult.first;
|
auto lineValid = lineResult.first;
|
||||||
auto line = lineResult.second;
|
auto line = lineResult.second;
|
||||||
|
oss << line;
|
||||||
|
|
||||||
if (!lineValid) return false;
|
if (!lineValid) return false;
|
||||||
|
|
||||||
@ -100,10 +138,13 @@ namespace ix
|
|||||||
auto lineResult = _socket->readLine(nullptr);
|
auto lineResult = _socket->readLine(nullptr);
|
||||||
auto lineValid = lineResult.first;
|
auto lineValid = lineResult.first;
|
||||||
auto line = lineResult.second;
|
auto line = lineResult.second;
|
||||||
|
oss << line;
|
||||||
|
|
||||||
if (!lineValid) return false;
|
if (!lineValid) return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
responseCallback(oss.str());
|
||||||
|
|
||||||
// Wait indefinitely for new messages
|
// Wait indefinitely for new messages
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
|
@ -15,6 +15,7 @@ namespace ix
|
|||||||
|
|
||||||
class RedisClient {
|
class RedisClient {
|
||||||
public:
|
public:
|
||||||
|
using OnRedisSubscribeResponseCallback = std::function<void(const std::string&)>;
|
||||||
using OnRedisSubscribeCallback = std::function<void(const std::string&)>;
|
using OnRedisSubscribeCallback = std::function<void(const std::string&)>;
|
||||||
|
|
||||||
RedisClient() = default;
|
RedisClient() = default;
|
||||||
@ -23,10 +24,14 @@ namespace ix
|
|||||||
bool connect(const std::string& hostname,
|
bool connect(const std::string& hostname,
|
||||||
int port);
|
int port);
|
||||||
|
|
||||||
|
bool auth(const std::string& password,
|
||||||
|
std::string& response);
|
||||||
|
|
||||||
bool publish(const std::string& channel,
|
bool publish(const std::string& channel,
|
||||||
const std::string& message);
|
const std::string& message);
|
||||||
|
|
||||||
bool subscribe(const std::string& channel,
|
bool subscribe(const std::string& channel,
|
||||||
|
const OnRedisSubscribeResponseCallback& responseCallback,
|
||||||
const OnRedisSubscribeCallback& callback);
|
const OnRedisSubscribeCallback& callback);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
@ -37,6 +37,7 @@ int main(int argc, char** argv)
|
|||||||
std::string pidfile;
|
std::string pidfile;
|
||||||
std::string channel;
|
std::string channel;
|
||||||
std::string message;
|
std::string message;
|
||||||
|
std::string password;
|
||||||
bool headersOnly = false;
|
bool headersOnly = false;
|
||||||
bool followRedirects = false;
|
bool followRedirects = false;
|
||||||
bool verbose = false;
|
bool verbose = false;
|
||||||
@ -102,12 +103,14 @@ int main(int argc, char** argv)
|
|||||||
CLI::App* redisPublishApp = app.add_subcommand("redis_publish", "Redis publisher");
|
CLI::App* redisPublishApp = app.add_subcommand("redis_publish", "Redis publisher");
|
||||||
redisPublishApp->add_option("--port", redisPort, "Port");
|
redisPublishApp->add_option("--port", redisPort, "Port");
|
||||||
redisPublishApp->add_option("--host", hostname, "Hostname");
|
redisPublishApp->add_option("--host", hostname, "Hostname");
|
||||||
|
redisPublishApp->add_option("--password", password, "Password");
|
||||||
redisPublishApp->add_option("channel", channel, "Channel")->required();
|
redisPublishApp->add_option("channel", channel, "Channel")->required();
|
||||||
redisPublishApp->add_option("message", message, "Message")->required();
|
redisPublishApp->add_option("message", message, "Message")->required();
|
||||||
|
|
||||||
CLI::App* redisSubscribeApp = app.add_subcommand("redis_subscribe", "Redis subscriber");
|
CLI::App* redisSubscribeApp = app.add_subcommand("redis_subscribe", "Redis subscriber");
|
||||||
redisSubscribeApp->add_option("--port", redisPort, "Port");
|
redisSubscribeApp->add_option("--port", redisPort, "Port");
|
||||||
redisSubscribeApp->add_option("--host", hostname, "Hostname");
|
redisSubscribeApp->add_option("--host", hostname, "Hostname");
|
||||||
|
redisSubscribeApp->add_option("--password", password, "Password");
|
||||||
redisSubscribeApp->add_option("channel", channel, "Channel")->required();
|
redisSubscribeApp->add_option("channel", channel, "Channel")->required();
|
||||||
redisSubscribeApp->add_flag("-v", verbose, "Verbose");
|
redisSubscribeApp->add_flag("-v", verbose, "Verbose");
|
||||||
|
|
||||||
@ -166,11 +169,11 @@ int main(int argc, char** argv)
|
|||||||
}
|
}
|
||||||
else if (app.got_subcommand("redis_publish"))
|
else if (app.got_subcommand("redis_publish"))
|
||||||
{
|
{
|
||||||
return ix::ws_redis_publish_main(hostname, redisPort, channel, message);
|
return ix::ws_redis_publish_main(hostname, redisPort, password, channel, message);
|
||||||
}
|
}
|
||||||
else if (app.got_subcommand("redis_subscribe"))
|
else if (app.got_subcommand("redis_subscribe"))
|
||||||
{
|
{
|
||||||
return ix::ws_redis_subscribe_main(hostname, redisPort, channel, verbose);
|
return ix::ws_redis_subscribe_main(hostname, redisPort, password, channel, verbose);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
|
2
ws/ws.h
2
ws/ws.h
@ -42,11 +42,13 @@ namespace ix
|
|||||||
|
|
||||||
int ws_redis_publish_main(const std::string& hostname,
|
int ws_redis_publish_main(const std::string& hostname,
|
||||||
int port,
|
int port,
|
||||||
|
const std::string& password,
|
||||||
const std::string& channel,
|
const std::string& channel,
|
||||||
const std::string& message);
|
const std::string& message);
|
||||||
|
|
||||||
int ws_redis_subscribe_main(const std::string& hostname,
|
int ws_redis_subscribe_main(const std::string& hostname,
|
||||||
int port,
|
int port,
|
||||||
|
const std::string& password,
|
||||||
const std::string& channel,
|
const std::string& channel,
|
||||||
bool verbose);
|
bool verbose);
|
||||||
}
|
}
|
||||||
|
@ -12,6 +12,7 @@ namespace ix
|
|||||||
{
|
{
|
||||||
int ws_redis_publish_main(const std::string& hostname,
|
int ws_redis_publish_main(const std::string& hostname,
|
||||||
int port,
|
int port,
|
||||||
|
const std::string& password,
|
||||||
const std::string& channel,
|
const std::string& channel,
|
||||||
const std::string& message)
|
const std::string& message)
|
||||||
{
|
{
|
||||||
@ -22,6 +23,18 @@ namespace ix
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!password.empty())
|
||||||
|
{
|
||||||
|
std::string authResponse;
|
||||||
|
if (!redisClient.auth(password, authResponse))
|
||||||
|
{
|
||||||
|
std::stringstream ss;
|
||||||
|
std::cerr << "Cannot authenticated to redis" << std::endl;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
std::cout << "Auth response: " << authResponse << ":" << port << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
std::cerr << "Publishing message " << message
|
std::cerr << "Publishing message " << message
|
||||||
<< " to " << channel << "..." << std::endl;
|
<< " to " << channel << "..." << std::endl;
|
||||||
if (!redisClient.publish(channel, message))
|
if (!redisClient.publish(channel, message))
|
||||||
|
@ -13,6 +13,7 @@ namespace ix
|
|||||||
{
|
{
|
||||||
int ws_redis_subscribe_main(const std::string& hostname,
|
int ws_redis_subscribe_main(const std::string& hostname,
|
||||||
int port,
|
int port,
|
||||||
|
const std::string& password,
|
||||||
const std::string& channel,
|
const std::string& channel,
|
||||||
bool verbose)
|
bool verbose)
|
||||||
{
|
{
|
||||||
@ -23,6 +24,18 @@ namespace ix
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!password.empty())
|
||||||
|
{
|
||||||
|
std::string authResponse;
|
||||||
|
if (!redisClient.auth(password, authResponse))
|
||||||
|
{
|
||||||
|
std::stringstream ss;
|
||||||
|
std::cerr << "Cannot authenticated to redis" << std::endl;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
std::cout << "Auth response: " << authResponse << ":" << port << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
std::chrono::time_point<std::chrono::steady_clock> lastTimePoint;
|
std::chrono::time_point<std::chrono::steady_clock> lastTimePoint;
|
||||||
int msgPerSeconds = 0;
|
int msgPerSeconds = 0;
|
||||||
int msgCount = 0;
|
int msgCount = 0;
|
||||||
@ -53,8 +66,13 @@ namespace ix
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
auto responseCallback = [](const std::string& redisResponse)
|
||||||
|
{
|
||||||
|
std::cout << "Redis subscribe response: " << redisResponse << std::endl;
|
||||||
|
};
|
||||||
|
|
||||||
std::cerr << "Subscribing to " << channel << "..." << std::endl;
|
std::cerr << "Subscribing to " << channel << "..." << std::endl;
|
||||||
if (!redisClient.subscribe(channel, callback))
|
if (!redisClient.subscribe(channel, responseCallback, callback))
|
||||||
{
|
{
|
||||||
std::cerr << "Error subscribing to channel " << channel << std::endl;
|
std::cerr << "Error subscribing to channel " << channel << std::endl;
|
||||||
return 1;
|
return 1;
|
||||||
|
Loading…
Reference in New Issue
Block a user