(redis cobra bots) update the cobra to redis bot to use the bot framework, and change it to report fps metrics into redis streams.
This commit is contained in:
		
							
								
								
									
										27
									
								
								ixredis/CMakeLists.txt
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										27
									
								
								ixredis/CMakeLists.txt
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,27 @@ | ||||
| # | ||||
| # Author: Benjamin Sergeant | ||||
| # Copyright (c) 2020 Machine Zone, Inc. All rights reserved. | ||||
| # | ||||
|  | ||||
| set (IXREDIS_SOURCES | ||||
|     ixredis/IXRedisClient.cpp | ||||
|     ixredis/IXRedisServer.cpp | ||||
| ) | ||||
|  | ||||
| set (IXREDIS_HEADERS | ||||
|     ixredis/IXRedisClient.h | ||||
|     ixredis/IXRedisServer.h | ||||
| ) | ||||
|  | ||||
| add_library(ixredis STATIC | ||||
|     ${IXREDIS_SOURCES} | ||||
|     ${IXREDIS_HEADERS} | ||||
| ) | ||||
|  | ||||
| set(IXREDIS_INCLUDE_DIRS | ||||
|     . | ||||
|     .. | ||||
|     ../ixcore | ||||
|     ../ixwebsocket) | ||||
|  | ||||
| target_include_directories( ixredis PUBLIC ${IXREDIS_INCLUDE_DIRS} ) | ||||
							
								
								
									
										352
									
								
								ixredis/ixredis/IXRedisClient.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										352
									
								
								ixredis/ixredis/IXRedisClient.cpp
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,352 @@ | ||||
| /* | ||||
|  *  IXRedisClient.cpp | ||||
|  *  Author: Benjamin Sergeant | ||||
|  *  Copyright (c) 2019 Machine Zone, Inc. All rights reserved. | ||||
|  */ | ||||
|  | ||||
| #include "IXRedisClient.h" | ||||
|  | ||||
| #include <cstring> | ||||
| #include <iomanip> | ||||
| #include <iostream> | ||||
| #include <ixwebsocket/IXSocket.h> | ||||
| #include <ixwebsocket/IXSocketFactory.h> | ||||
| #include <ixwebsocket/IXSocketTLSOptions.h> | ||||
| #include <sstream> | ||||
| #include <vector> | ||||
|  | ||||
| namespace ix | ||||
| { | ||||
|     bool RedisClient::connect(const std::string& hostname, int port) | ||||
|     { | ||||
|         bool tls = false; | ||||
|         std::string errorMsg; | ||||
|         SocketTLSOptions tlsOptions; | ||||
|         _socket = createSocket(tls, -1, errorMsg, tlsOptions); | ||||
|  | ||||
|         if (!_socket) | ||||
|         { | ||||
|             return false; | ||||
|         } | ||||
|  | ||||
|         CancellationRequest cancellationRequest = []() -> bool { return false; }; | ||||
|  | ||||
|         std::string errMsg; | ||||
|         return _socket->connect(hostname, port, errMsg, cancellationRequest); | ||||
|     } | ||||
|  | ||||
|     void RedisClient::stop() | ||||
|     { | ||||
|         _stop = true; | ||||
|     } | ||||
|  | ||||
|     bool RedisClient::auth(const std::string& password, std::string& response) | ||||
|     { | ||||
|         response.clear(); | ||||
|  | ||||
|         if (!_socket) return false; | ||||
|  | ||||
|         std::stringstream ss; | ||||
|         ss << "AUTH "; | ||||
|         ss << password; | ||||
|         ss << "\r\n"; | ||||
|  | ||||
|         bool sent = _socket->writeBytes(ss.str(), nullptr); | ||||
|         if (!sent) | ||||
|         { | ||||
|             return false; | ||||
|         } | ||||
|  | ||||
|         auto pollResult = _socket->isReadyToRead(-1); | ||||
|         if (pollResult == PollResultType::Error) | ||||
|         { | ||||
|             return false; | ||||
|         } | ||||
|  | ||||
|         auto lineResult = _socket->readLine(nullptr); | ||||
|         auto lineValid = lineResult.first; | ||||
|         auto line = lineResult.second; | ||||
|  | ||||
|         response = line; | ||||
|         return lineValid; | ||||
|     } | ||||
|  | ||||
|     std::string RedisClient::writeString(const std::string& str) | ||||
|     { | ||||
|         std::stringstream ss; | ||||
|         ss << "$"; | ||||
|         ss << str.size(); | ||||
|         ss << "\r\n"; | ||||
|         ss << str; | ||||
|         ss << "\r\n"; | ||||
|  | ||||
|         return ss.str(); | ||||
|     } | ||||
|  | ||||
|     bool RedisClient::publish(const std::string& channel, | ||||
|                               const std::string& message, | ||||
|                               std::string& errMsg) | ||||
|     { | ||||
|         errMsg.clear(); | ||||
|  | ||||
|         if (!_socket) | ||||
|         { | ||||
|             errMsg = "socket is not initialized"; | ||||
|             return false; | ||||
|         } | ||||
|  | ||||
|         std::stringstream ss; | ||||
|         ss << "*3\r\n"; | ||||
|         ss << writeString("PUBLISH"); | ||||
|         ss << writeString(channel); | ||||
|         ss << writeString(message); | ||||
|  | ||||
|         bool sent = _socket->writeBytes(ss.str(), nullptr); | ||||
|         if (!sent) | ||||
|         { | ||||
|             errMsg = "Cannot write bytes to socket"; | ||||
|             return false; | ||||
|         } | ||||
|  | ||||
|         auto pollResult = _socket->isReadyToRead(-1); | ||||
|         if (pollResult == PollResultType::Error) | ||||
|         { | ||||
|             errMsg = "Error while polling for result"; | ||||
|             return false; | ||||
|         } | ||||
|  | ||||
|         auto lineResult = _socket->readLine(nullptr); | ||||
|         auto lineValid = lineResult.first; | ||||
|         auto line = lineResult.second; | ||||
|  | ||||
|         // A successful response starts with a : | ||||
|         if (line.empty() || line[0] != ':') | ||||
|         { | ||||
|             errMsg = line; | ||||
|             return false; | ||||
|         } | ||||
|  | ||||
|         return lineValid; | ||||
|     } | ||||
|  | ||||
|     // | ||||
|     // FIXME: we assume that redis never return errors... | ||||
|     // | ||||
|     bool RedisClient::subscribe(const std::string& channel, | ||||
|                                 const OnRedisSubscribeResponseCallback& responseCallback, | ||||
|                                 const OnRedisSubscribeCallback& callback) | ||||
|     { | ||||
|         _stop = false; | ||||
|  | ||||
|         if (!_socket) return false; | ||||
|  | ||||
|         std::stringstream ss; | ||||
|         ss << "*2\r\n"; | ||||
|         ss << writeString("SUBSCRIBE"); | ||||
|         ss << writeString(channel); | ||||
|  | ||||
|         bool sent = _socket->writeBytes(ss.str(), nullptr); | ||||
|         if (!sent) | ||||
|         { | ||||
|             return false; | ||||
|         } | ||||
|  | ||||
|         // Wait 1s for the response | ||||
|         auto pollResult = _socket->isReadyToRead(-1); | ||||
|         if (pollResult == PollResultType::Error) | ||||
|         { | ||||
|             return false; | ||||
|         } | ||||
|  | ||||
|         // build the response as a single string | ||||
|         std::stringstream oss; | ||||
|  | ||||
|         // Read the first line of the response | ||||
|         auto lineResult = _socket->readLine(nullptr); | ||||
|         auto lineValid = lineResult.first; | ||||
|         auto line = lineResult.second; | ||||
|         oss << line; | ||||
|  | ||||
|         if (!lineValid) return false; | ||||
|  | ||||
|         // There are 5 items for the subscribe reply | ||||
|         for (int i = 0; i < 5; ++i) | ||||
|         { | ||||
|             auto lineResult = _socket->readLine(nullptr); | ||||
|             auto lineValid = lineResult.first; | ||||
|             auto line = lineResult.second; | ||||
|             oss << line; | ||||
|  | ||||
|             if (!lineValid) return false; | ||||
|         } | ||||
|  | ||||
|         responseCallback(oss.str()); | ||||
|  | ||||
|         // Wait indefinitely for new messages | ||||
|         while (true) | ||||
|         { | ||||
|             if (_stop) break; | ||||
|  | ||||
|             // Wait until something is ready to read | ||||
|             int timeoutMs = 10; | ||||
|             auto pollResult = _socket->isReadyToRead(timeoutMs); | ||||
|             if (pollResult == PollResultType::Error) | ||||
|             { | ||||
|                 return false; | ||||
|             } | ||||
|  | ||||
|             if (pollResult == PollResultType::Timeout) | ||||
|             { | ||||
|                 continue; | ||||
|             } | ||||
|  | ||||
|             // The first line of the response describe the return type, | ||||
|             // => *3 (an array of 3 elements) | ||||
|             auto lineResult = _socket->readLine(nullptr); | ||||
|             auto lineValid = lineResult.first; | ||||
|             auto line = lineResult.second; | ||||
|  | ||||
|             if (!lineValid) return false; | ||||
|  | ||||
|             int arraySize; | ||||
|             { | ||||
|                 std::stringstream ss; | ||||
|                 ss << line.substr(1, line.size() - 1); | ||||
|                 ss >> arraySize; | ||||
|             } | ||||
|  | ||||
|             // There are 6 items for each received message | ||||
|             for (int i = 0; i < arraySize; ++i) | ||||
|             { | ||||
|                 auto lineResult = _socket->readLine(nullptr); | ||||
|                 auto lineValid = lineResult.first; | ||||
|                 auto line = lineResult.second; | ||||
|  | ||||
|                 if (!lineValid) return false; | ||||
|  | ||||
|                 // Messages are string, which start with a string size | ||||
|                 // => $7 (7 bytes) | ||||
|                 int stringSize; | ||||
|                 std::stringstream ss; | ||||
|                 ss << line.substr(1, line.size() - 1); | ||||
|                 ss >> stringSize; | ||||
|  | ||||
|                 auto readResult = _socket->readBytes(stringSize, nullptr, nullptr); | ||||
|                 if (!readResult.first) return false; | ||||
|  | ||||
|                 if (i == 2) | ||||
|                 { | ||||
|                     // The message is the 3rd element. | ||||
|                     callback(readResult.second); | ||||
|                 } | ||||
|  | ||||
|                 // read last 2 bytes (\r\n) | ||||
|                 char c; | ||||
|                 _socket->readByte(&c, nullptr); | ||||
|                 _socket->readByte(&c, nullptr); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         return true; | ||||
|     } | ||||
|  | ||||
|     std::string RedisClient::prepareXaddCommand(const std::string& stream, | ||||
|                                                 const std::string& message) | ||||
|     { | ||||
|         std::stringstream ss; | ||||
|         ss << "*5\r\n"; | ||||
|         ss << writeString("XADD"); | ||||
|         ss << writeString(stream); | ||||
|         ss << writeString("*"); | ||||
|         ss << writeString("field"); | ||||
|         ss << writeString(message); | ||||
|  | ||||
|         return ss.str(); | ||||
|     } | ||||
|  | ||||
|     std::string RedisClient::xadd(const std::string& stream, | ||||
|                                   const std::string& message, | ||||
|                                   std::string& errMsg) | ||||
|     { | ||||
|         errMsg.clear(); | ||||
|  | ||||
|         if (!_socket) | ||||
|         { | ||||
|             errMsg = "socket is not initialized"; | ||||
|             return std::string(); | ||||
|         } | ||||
|  | ||||
|         std::string command = prepareXaddCommand(stream, message); | ||||
|  | ||||
|         bool sent = _socket->writeBytes(command, nullptr); | ||||
|         if (!sent) | ||||
|         { | ||||
|             errMsg = "Cannot write bytes to socket"; | ||||
|             return std::string(); | ||||
|         } | ||||
|  | ||||
|         return readXaddReply(errMsg); | ||||
|     } | ||||
|  | ||||
|     std::string RedisClient::readXaddReply(std::string& errMsg) | ||||
|     { | ||||
|         // Read result | ||||
|         auto pollResult = _socket->isReadyToRead(-1); | ||||
|         if (pollResult == PollResultType::Error) | ||||
|         { | ||||
|             errMsg = "Error while polling for result"; | ||||
|             return std::string(); | ||||
|         } | ||||
|  | ||||
|         // First line is the string length | ||||
|         auto lineResult = _socket->readLine(nullptr); | ||||
|         auto lineValid = lineResult.first; | ||||
|         auto line = lineResult.second; | ||||
|  | ||||
|         if (!lineValid) | ||||
|         { | ||||
|             errMsg = "Error while polling for result"; | ||||
|             return std::string(); | ||||
|         } | ||||
|  | ||||
|         int stringSize; | ||||
|         { | ||||
|             std::stringstream ss; | ||||
|             ss << line.substr(1, line.size() - 1); | ||||
|             ss >> stringSize; | ||||
|         } | ||||
|  | ||||
|         // Read the result, which is the stream id computed by the redis server | ||||
|         lineResult = _socket->readLine(nullptr); | ||||
|         lineValid = lineResult.first; | ||||
|         line = lineResult.second; | ||||
|  | ||||
|         std::string streamId = line.substr(0, stringSize - 1); | ||||
|         return streamId; | ||||
|     } | ||||
|  | ||||
|     bool RedisClient::sendCommand(const std::string& commands, | ||||
|                                   int commandsCount, | ||||
|                                   std::string& errMsg) | ||||
|     { | ||||
|         bool sent = _socket->writeBytes(commands, nullptr); | ||||
|         if (!sent) | ||||
|         { | ||||
|             errMsg = "Cannot write bytes to socket"; | ||||
|             return false; | ||||
|         } | ||||
|  | ||||
|         bool success = true; | ||||
|  | ||||
|         for (int i = 0; i < commandsCount; ++i) | ||||
|         { | ||||
|             auto reply = readXaddReply(errMsg); | ||||
|             if (reply == std::string()) | ||||
|             { | ||||
|                 success = false; | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         return success; | ||||
|     } | ||||
| } // namespace ix | ||||
							
								
								
									
										59
									
								
								ixredis/ixredis/IXRedisClient.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										59
									
								
								ixredis/ixredis/IXRedisClient.h
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,59 @@ | ||||
| /* | ||||
|  *  IXRedisClient.h | ||||
|  *  Author: Benjamin Sergeant | ||||
|  *  Copyright (c) 2019 Machine Zone, Inc. All rights reserved. | ||||
|  */ | ||||
|  | ||||
| #pragma once | ||||
|  | ||||
| #include <atomic> | ||||
| #include <functional> | ||||
| #include <memory> | ||||
| #include <string> | ||||
| #include <ixwebsocket/IXSocket.h> | ||||
|  | ||||
| namespace ix | ||||
| { | ||||
|     class RedisClient | ||||
|     { | ||||
|     public: | ||||
|         using OnRedisSubscribeResponseCallback = std::function<void(const std::string&)>; | ||||
|         using OnRedisSubscribeCallback = std::function<void(const std::string&)>; | ||||
|  | ||||
|         RedisClient() | ||||
|             : _stop(false) | ||||
|         { | ||||
|         } | ||||
|         ~RedisClient() = default; | ||||
|  | ||||
|         bool connect(const std::string& hostname, int port); | ||||
|  | ||||
|         bool auth(const std::string& password, std::string& response); | ||||
|  | ||||
|         // Publish / Subscribe | ||||
|         bool publish(const std::string& channel, const std::string& message, std::string& errMsg); | ||||
|  | ||||
|         bool subscribe(const std::string& channel, | ||||
|                        const OnRedisSubscribeResponseCallback& responseCallback, | ||||
|                        const OnRedisSubscribeCallback& callback); | ||||
|  | ||||
|         // XADD | ||||
|         std::string xadd(const std::string& channel, | ||||
|                          const std::string& message, | ||||
|                          std::string& errMsg); | ||||
|  | ||||
|         std::string prepareXaddCommand(const std::string& stream, const std::string& message); | ||||
|  | ||||
|         std::string readXaddReply(std::string& errMsg); | ||||
|  | ||||
|         bool sendCommand(const std::string& commands, int commandsCount, std::string& errMsg); | ||||
|  | ||||
|         void stop(); | ||||
|  | ||||
|     private: | ||||
|         std::string writeString(const std::string& str); | ||||
|  | ||||
|         std::unique_ptr<Socket> _socket; | ||||
|         std::atomic<bool> _stop; | ||||
|     }; | ||||
| } // namespace ix | ||||
							
								
								
									
										285
									
								
								ixredis/ixredis/IXRedisServer.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										285
									
								
								ixredis/ixredis/IXRedisServer.cpp
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,285 @@ | ||||
| /* | ||||
|  *  IXRedisServer.cpp | ||||
|  *  Author: Benjamin Sergeant | ||||
|  *  Copyright (c) 2019 Machine Zone, Inc. All rights reserved. | ||||
|  */ | ||||
|  | ||||
| #include "IXRedisServer.h" | ||||
|  | ||||
| #include <fstream> | ||||
| #include <ixwebsocket/IXCancellationRequest.h> | ||||
| #include <ixwebsocket/IXNetSystem.h> | ||||
| #include <ixwebsocket/IXSocket.h> | ||||
| #include <ixwebsocket/IXSocketConnect.h> | ||||
| #include <sstream> | ||||
| #include <vector> | ||||
|  | ||||
| namespace ix | ||||
| { | ||||
|     RedisServer::RedisServer( | ||||
|         int port, const std::string& host, int backlog, size_t maxConnections, int addressFamily) | ||||
|         : SocketServer(port, host, backlog, maxConnections, addressFamily) | ||||
|         , _connectedClientsCount(0) | ||||
|         , _stopHandlingConnections(false) | ||||
|     { | ||||
|         ; | ||||
|     } | ||||
|  | ||||
|     RedisServer::~RedisServer() | ||||
|     { | ||||
|         stop(); | ||||
|     } | ||||
|  | ||||
|     void RedisServer::stop() | ||||
|     { | ||||
|         stopAcceptingConnections(); | ||||
|  | ||||
|         _stopHandlingConnections = true; | ||||
|         while (_connectedClientsCount != 0) | ||||
|         { | ||||
|             std::this_thread::sleep_for(std::chrono::milliseconds(10)); | ||||
|         } | ||||
|         _stopHandlingConnections = false; | ||||
|  | ||||
|         SocketServer::stop(); | ||||
|     } | ||||
|  | ||||
|     void RedisServer::handleConnection(std::unique_ptr<Socket> socket, | ||||
|                                        std::shared_ptr<ConnectionState> connectionState) | ||||
|     { | ||||
|         _connectedClientsCount++; | ||||
|  | ||||
|         while (!_stopHandlingConnections) | ||||
|         { | ||||
|             std::vector<std::string> tokens; | ||||
|             if (!parseRequest(socket, tokens)) | ||||
|             { | ||||
|                 if (_stopHandlingConnections) | ||||
|                 { | ||||
|                     logError("Cancellation requested"); | ||||
|                 } | ||||
|                 else | ||||
|                 { | ||||
|                     logError("Error parsing request"); | ||||
|                 } | ||||
|                 break; | ||||
|             } | ||||
|  | ||||
|             bool success = false; | ||||
|  | ||||
|             // publish | ||||
|             if (tokens[0] == "COMMAND") | ||||
|             { | ||||
|                 success = handleCommand(socket, tokens); | ||||
|             } | ||||
|             else if (tokens[0] == "PUBLISH") | ||||
|             { | ||||
|                 success = handlePublish(socket, tokens); | ||||
|             } | ||||
|             else if (tokens[0] == "SUBSCRIBE") | ||||
|             { | ||||
|                 success = handleSubscribe(socket, tokens); | ||||
|             } | ||||
|  | ||||
|             if (!success) | ||||
|             { | ||||
|                 if (_stopHandlingConnections) | ||||
|                 { | ||||
|                     logError("Cancellation requested"); | ||||
|                 } | ||||
|                 else | ||||
|                 { | ||||
|                     logError("Error processing request for command: " + tokens[0]); | ||||
|                 } | ||||
|                 break; | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         cleanupSubscribers(socket); | ||||
|  | ||||
|         logInfo("Connection closed for connection id " + connectionState->getId()); | ||||
|         connectionState->setTerminated(); | ||||
|  | ||||
|         _connectedClientsCount--; | ||||
|     } | ||||
|  | ||||
|     void RedisServer::cleanupSubscribers(std::unique_ptr<Socket>& socket) | ||||
|     { | ||||
|         std::lock_guard<std::mutex> lock(_mutex); | ||||
|  | ||||
|         for (auto&& it : _subscribers) | ||||
|         { | ||||
|             it.second.erase(socket.get()); | ||||
|         } | ||||
|  | ||||
|         for (auto it : _subscribers) | ||||
|         { | ||||
|             std::stringstream ss; | ||||
|             ss << "Subscription id: " << it.first << " #subscribers: " << it.second.size(); | ||||
|  | ||||
|             logInfo(ss.str()); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     size_t RedisServer::getConnectedClientsCount() | ||||
|     { | ||||
|         return _connectedClientsCount; | ||||
|     } | ||||
|  | ||||
|     bool RedisServer::startsWith(const std::string& str, const std::string& start) | ||||
|     { | ||||
|         return str.compare(0, start.length(), start) == 0; | ||||
|     } | ||||
|  | ||||
|     std::string RedisServer::writeString(const std::string& str) | ||||
|     { | ||||
|         std::stringstream ss; | ||||
|         ss << "$"; | ||||
|         ss << str.size(); | ||||
|         ss << "\r\n"; | ||||
|         ss << str; | ||||
|         ss << "\r\n"; | ||||
|  | ||||
|         return ss.str(); | ||||
|     } | ||||
|  | ||||
|     bool RedisServer::parseRequest(std::unique_ptr<Socket>& socket, | ||||
|                                    std::vector<std::string>& tokens) | ||||
|     { | ||||
|         // Parse first line | ||||
|         auto cb = makeCancellationRequestWithTimeout(30, _stopHandlingConnections); | ||||
|         auto lineResult = socket->readLine(cb); | ||||
|         auto lineValid = lineResult.first; | ||||
|         auto line = lineResult.second; | ||||
|  | ||||
|         if (!lineValid) return false; | ||||
|  | ||||
|         std::string str = line.substr(1); | ||||
|         std::stringstream ss; | ||||
|         ss << str; | ||||
|         int count; | ||||
|         ss >> count; | ||||
|  | ||||
|         for (int i = 0; i < count; ++i) | ||||
|         { | ||||
|             auto lineResult = socket->readLine(cb); | ||||
|             auto lineValid = lineResult.first; | ||||
|             auto line = lineResult.second; | ||||
|  | ||||
|             if (!lineValid) return false; | ||||
|  | ||||
|             int stringSize; | ||||
|             std::stringstream ss; | ||||
|             ss << line.substr(1, line.size() - 1); | ||||
|             ss >> stringSize; | ||||
|  | ||||
|             auto readResult = socket->readBytes(stringSize, nullptr, nullptr); | ||||
|  | ||||
|             if (!readResult.first) return false; | ||||
|  | ||||
|             // read last 2 bytes (\r\n) | ||||
|             char c; | ||||
|             socket->readByte(&c, nullptr); | ||||
|             socket->readByte(&c, nullptr); | ||||
|  | ||||
|             tokens.push_back(readResult.second); | ||||
|         } | ||||
|  | ||||
|         return true; | ||||
|     } | ||||
|  | ||||
|     bool RedisServer::handleCommand(std::unique_ptr<Socket>& socket, | ||||
|                                     const std::vector<std::string>& tokens) | ||||
|     { | ||||
|         if (tokens.size() != 1) return false; | ||||
|  | ||||
|         auto cb = makeCancellationRequestWithTimeout(30, _stopHandlingConnections); | ||||
|         std::stringstream ss; | ||||
|  | ||||
|         // return 2 nested arrays | ||||
|         ss << "*2\r\n"; | ||||
|  | ||||
|         // | ||||
|         // publish | ||||
|         // | ||||
|         ss << "*6\r\n"; | ||||
|         ss << writeString("publish"); // 1 | ||||
|         ss << ":3\r\n";               // 2 | ||||
|         ss << "*0\r\n";               // 3 | ||||
|         ss << ":1\r\n";               // 4 | ||||
|         ss << ":2\r\n";               // 5 | ||||
|         ss << ":1\r\n";               // 6 | ||||
|  | ||||
|         // | ||||
|         // subscribe | ||||
|         // | ||||
|         ss << "*6\r\n"; | ||||
|         ss << writeString("subscribe"); // 1 | ||||
|         ss << ":2\r\n";                 // 2 | ||||
|         ss << "*0\r\n";                 // 3 | ||||
|         ss << ":1\r\n";                 // 4 | ||||
|         ss << ":1\r\n";                 // 5 | ||||
|         ss << ":1\r\n";                 // 6 | ||||
|  | ||||
|         socket->writeBytes(ss.str(), cb); | ||||
|  | ||||
|         return true; | ||||
|     } | ||||
|  | ||||
|     bool RedisServer::handleSubscribe(std::unique_ptr<Socket>& socket, | ||||
|                                       const std::vector<std::string>& tokens) | ||||
|     { | ||||
|         if (tokens.size() != 2) return false; | ||||
|  | ||||
|         auto cb = makeCancellationRequestWithTimeout(30, _stopHandlingConnections); | ||||
|         std::string channel = tokens[1]; | ||||
|  | ||||
|         // Respond | ||||
|         socket->writeBytes("*3\r\n", cb); | ||||
|         socket->writeBytes(writeString("subscribe"), cb); | ||||
|         socket->writeBytes(writeString(channel), cb); | ||||
|         socket->writeBytes(":1\r\n", cb); | ||||
|  | ||||
|         std::lock_guard<std::mutex> lock(_mutex); | ||||
|         _subscribers[channel].insert(socket.get()); | ||||
|  | ||||
|         return true; | ||||
|     } | ||||
|  | ||||
|     bool RedisServer::handlePublish(std::unique_ptr<Socket>& socket, | ||||
|                                     const std::vector<std::string>& tokens) | ||||
|     { | ||||
|         if (tokens.size() != 3) return false; | ||||
|  | ||||
|         auto cb = makeCancellationRequestWithTimeout(30, _stopHandlingConnections); | ||||
|         std::string channel = tokens[1]; | ||||
|         std::string data = tokens[2]; | ||||
|  | ||||
|         // now dispatch the message to subscribers (write custom method) | ||||
|         std::lock_guard<std::mutex> lock(_mutex); | ||||
|         auto it = _subscribers.find(channel); | ||||
|         if (it == _subscribers.end()) | ||||
|         { | ||||
|             // return the number of clients that received the message, 0 in that case | ||||
|             socket->writeBytes(":0\r\n", cb); | ||||
|             return true; | ||||
|         } | ||||
|  | ||||
|         auto subscribers = it->second; | ||||
|         for (auto jt : subscribers) | ||||
|         { | ||||
|             jt->writeBytes("*3\r\n", cb); | ||||
|             jt->writeBytes(writeString("message"), cb); | ||||
|             jt->writeBytes(writeString(channel), cb); | ||||
|             jt->writeBytes(writeString(data), cb); | ||||
|         } | ||||
|  | ||||
|         // return the number of clients that received the message. | ||||
|         std::stringstream ss; | ||||
|         ss << ":" << std::to_string(subscribers.size()) << "\r\n"; | ||||
|         socket->writeBytes(ss.str(), cb); | ||||
|  | ||||
|         return true; | ||||
|     } | ||||
|  | ||||
| } // namespace ix | ||||
							
								
								
									
										64
									
								
								ixredis/ixredis/IXRedisServer.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										64
									
								
								ixredis/ixredis/IXRedisServer.h
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,64 @@ | ||||
| /* | ||||
|  *  IXRedisServer.h | ||||
|  *  Author: Benjamin Sergeant | ||||
|  *  Copyright (c) 2018 Machine Zone, Inc. All rights reserved. | ||||
|  */ | ||||
|  | ||||
| #pragma once | ||||
|  | ||||
| #include <ixwebsocket/IXSocket.h> | ||||
| #include <ixwebsocket/IXSocketServer.h> | ||||
| #include <functional> | ||||
| #include <map> | ||||
| #include <memory> | ||||
| #include <mutex> | ||||
| #include <set> | ||||
| #include <string> | ||||
| #include <thread> | ||||
| #include <utility> // pair | ||||
|  | ||||
| namespace ix | ||||
| { | ||||
|     class RedisServer final : public SocketServer | ||||
|     { | ||||
|     public: | ||||
|         RedisServer(int port = SocketServer::kDefaultPort, | ||||
|                     const std::string& host = SocketServer::kDefaultHost, | ||||
|                     int backlog = SocketServer::kDefaultTcpBacklog, | ||||
|                     size_t maxConnections = SocketServer::kDefaultMaxConnections, | ||||
|                     int addressFamily = SocketServer::kDefaultAddressFamily); | ||||
|         virtual ~RedisServer(); | ||||
|         virtual void stop() final; | ||||
|  | ||||
|     private: | ||||
|         // Member variables | ||||
|         std::atomic<int> _connectedClientsCount; | ||||
|  | ||||
|         // Subscribers | ||||
|         // We could store connection states in there, to add better debugging | ||||
|         // since a connection state has a readable ID | ||||
|         std::map<std::string, std::set<Socket*>> _subscribers; | ||||
|         std::mutex _mutex; | ||||
|  | ||||
|         std::atomic<bool> _stopHandlingConnections; | ||||
|  | ||||
|         // Methods | ||||
|         virtual void handleConnection(std::unique_ptr<Socket>, | ||||
|                                       std::shared_ptr<ConnectionState> connectionState) final; | ||||
|         virtual size_t getConnectedClientsCount() final; | ||||
|  | ||||
|         bool startsWith(const std::string& str, const std::string& start); | ||||
|         std::string writeString(const std::string& str); | ||||
|  | ||||
|         bool parseRequest(std::unique_ptr<Socket>& socket, std::vector<std::string>& tokens); | ||||
|  | ||||
|         bool handlePublish(std::unique_ptr<Socket>& socket, const std::vector<std::string>& tokens); | ||||
|  | ||||
|         bool handleSubscribe(std::unique_ptr<Socket>& socket, | ||||
|                              const std::vector<std::string>& tokens); | ||||
|  | ||||
|         bool handleCommand(std::unique_ptr<Socket>& socket, const std::vector<std::string>& tokens); | ||||
|  | ||||
|         void cleanupSubscribers(std::unique_ptr<Socket>& socket); | ||||
|     }; | ||||
| } // namespace ix | ||||
		Reference in New Issue
	
	Block a user