(ixsnake) uses an std::thread to handle redis subscriptions (2 unittest still failing)
This commit is contained in:
		| @@ -7,9 +7,10 @@ | ||||
| #pragma once | ||||
|  | ||||
| #include <ixredis/IXRedisClient.h> | ||||
| #include <future> | ||||
| #include <thread> | ||||
| #include <ixwebsocket/IXConnectionState.h> | ||||
| #include <string> | ||||
| #include "IXStreamSql.h" | ||||
|  | ||||
| namespace snake | ||||
| { | ||||
| @@ -51,7 +52,23 @@ namespace snake | ||||
|             return _redisClient; | ||||
|         } | ||||
|  | ||||
|         std::future<void> fut; | ||||
|         void cleanup() | ||||
|         { | ||||
|             if (subscriptionThread.joinable()) | ||||
|             { | ||||
|                 subscriptionRedisClient.stop(); | ||||
|                 subscriptionThread.join(); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         // We could make those accessible through methods | ||||
|         std::thread subscriptionThread; | ||||
|         std::string appChannel; | ||||
|         std::string subscriptionId; | ||||
|         std::unique_ptr<StreamSql> streamSql; | ||||
|         ix::RedisClient subscriptionRedisClient; | ||||
|         ix::RedisClient::OnRedisSubscribeResponseCallback onRedisSubscribeResponseCallback; | ||||
|         ix::RedisClient::OnRedisSubscribeCallback onRedisSubscribeCallback; | ||||
|  | ||||
|     private: | ||||
|         std::string _nonce; | ||||
|   | ||||
| @@ -8,7 +8,6 @@ | ||||
|  | ||||
| #include "IXAppConfig.h" | ||||
| #include "IXSnakeConnectionState.h" | ||||
| #include "IXStreamSql.h" | ||||
| #include "nlohmann/json.hpp" | ||||
| #include <iostream> | ||||
| #include <ixcore/utils/IXCoreLogger.h> | ||||
| @@ -20,20 +19,22 @@ namespace snake | ||||
| { | ||||
|     void handleError(const std::string& action, | ||||
|                      ix::WebSocket& ws, | ||||
|                      nlohmann::json pdu, | ||||
|                      const nlohmann::json& pdu, | ||||
|                      uint64_t pduId, | ||||
|                      const std::string& errMsg) | ||||
|     { | ||||
|         std::string actionError(action); | ||||
|         actionError += "/error"; | ||||
|  | ||||
|         nlohmann::json response = { | ||||
|             {"action", actionError}, {"id", pdu.value("id", 1)}, {"body", {{"reason", errMsg}}}}; | ||||
|             {"action", actionError}, {"id", pduId}, {"body", {{"reason", errMsg}}}}; | ||||
|         ws.sendText(response.dump()); | ||||
|     } | ||||
|  | ||||
|     void handleHandshake(std::shared_ptr<SnakeConnectionState> state, | ||||
|                          ix::WebSocket& ws, | ||||
|                          const nlohmann::json& pdu) | ||||
|                          const nlohmann::json& pdu, | ||||
|                          uint64_t pduId) | ||||
|     { | ||||
|         std::string role = pdu["body"]["data"]["role"]; | ||||
|  | ||||
| @@ -42,7 +43,7 @@ namespace snake | ||||
|  | ||||
|         nlohmann::json response = { | ||||
|             {"action", "auth/handshake/ok"}, | ||||
|             {"id", pdu.value("id", 1)}, | ||||
|             {"id", pduId}, | ||||
|             {"body", | ||||
|              { | ||||
|                  {"data", {{"nonce", state->getNonce()}, {"connection_id", state->getId()}}}, | ||||
| @@ -56,7 +57,8 @@ namespace snake | ||||
|     void handleAuth(std::shared_ptr<SnakeConnectionState> state, | ||||
|                     ix::WebSocket& ws, | ||||
|                     const AppConfig& appConfig, | ||||
|                     const nlohmann::json& pdu) | ||||
|                     const nlohmann::json& pdu, | ||||
|                     uint64_t pduId) | ||||
|     { | ||||
|         auto secret = getRoleSecret(appConfig, state->appkey(), state->role()); | ||||
|  | ||||
| @@ -64,7 +66,7 @@ namespace snake | ||||
|         { | ||||
|             nlohmann::json response = { | ||||
|                 {"action", "auth/authenticate/error"}, | ||||
|                 {"id", pdu.value("id", 1)}, | ||||
|                 {"id", pduId}, | ||||
|                 {"body", {{"error", "authentication_failed"}, {"reason", "invalid secret"}}}}; | ||||
|             ws.sendText(response.dump()); | ||||
|             return; | ||||
| @@ -93,7 +95,8 @@ namespace snake | ||||
|     void handlePublish(std::shared_ptr<SnakeConnectionState> state, | ||||
|                        ix::WebSocket& ws, | ||||
|                        const AppConfig& appConfig, | ||||
|                        const nlohmann::json& pdu) | ||||
|                        const nlohmann::json& pdu, | ||||
|                        uint64_t pduId) | ||||
|     { | ||||
|         std::vector<std::string> channels; | ||||
|  | ||||
| @@ -113,7 +116,7 @@ namespace snake | ||||
|         { | ||||
|             std::stringstream ss; | ||||
|             ss << "Missing channels or channel field in publish data"; | ||||
|             handleError("rtm/publish", ws, pdu, ss.str()); | ||||
|             handleError("rtm/publish", ws, pdu, pduId, ss.str()); | ||||
|             return; | ||||
|         } | ||||
|  | ||||
| @@ -133,7 +136,7 @@ namespace snake | ||||
|             { | ||||
|                 std::stringstream ss; | ||||
|                 ss << "Cannot publish to redis host " << errMsg; | ||||
|                 handleError("rtm/publish", ws, pdu, ss.str()); | ||||
|                 handleError("rtm/publish", ws, pdu, pduId, ss.str()); | ||||
|                 return; | ||||
|             } | ||||
|         } | ||||
| @@ -147,20 +150,21 @@ namespace snake | ||||
|     // | ||||
|     // FIXME: this is not cancellable. We should be able to cancel the redis subscription | ||||
|     // | ||||
|     void handleRedisSubscription(std::shared_ptr<SnakeConnectionState> state, | ||||
|                                  ix::WebSocket& ws, | ||||
|                                  const AppConfig& appConfig, | ||||
|                                  const nlohmann::json& pdu) | ||||
|     void handleSubscribe(std::shared_ptr<SnakeConnectionState> state, | ||||
|                          ix::WebSocket& ws, | ||||
|                          const AppConfig& appConfig, | ||||
|                          const nlohmann::json& pdu, | ||||
|                          uint64_t pduId) | ||||
|     { | ||||
|         std::string channel = pdu["body"]["channel"]; | ||||
|         std::string subscriptionId = channel; | ||||
|         state->subscriptionId = channel; | ||||
|  | ||||
|         std::stringstream ss; | ||||
|         ss << state->appkey() << "::" << channel; | ||||
|  | ||||
|         std::string appChannel(ss.str()); | ||||
|         state->appChannel = ss.str(); | ||||
|  | ||||
|         ix::RedisClient redisClient; | ||||
|         ix::RedisClient& redisClient = state->subscriptionRedisClient; | ||||
|         int port = appConfig.redisPort; | ||||
|  | ||||
|         auto urls = appConfig.redisHosts; | ||||
| @@ -171,7 +175,7 @@ namespace snake | ||||
|         { | ||||
|             std::stringstream ss; | ||||
|             ss << "Cannot connect to redis host " << hostname << ":" << port; | ||||
|             handleError("rtm/subscribe", ws, pdu, ss.str()); | ||||
|             handleError("rtm/subscribe", ws, pdu, pduId, ss.str()); | ||||
|             return; | ||||
|         } | ||||
|  | ||||
| @@ -183,7 +187,7 @@ namespace snake | ||||
|             { | ||||
|                 std::stringstream ss; | ||||
|                 ss << "Cannot authenticated to redis"; | ||||
|                 handleError("rtm/subscribe", ws, pdu, ss.str()); | ||||
|                 handleError("rtm/subscribe", ws, pdu, pduId, ss.str()); | ||||
|                 return; | ||||
|             } | ||||
|         } | ||||
| @@ -193,16 +197,15 @@ namespace snake | ||||
|         { | ||||
|             std::string filterStr = pdu["body"]["filter"]; | ||||
|         } | ||||
|  | ||||
|         std::unique_ptr<StreamSql> streamSql = std::make_unique<StreamSql>(filterStr); | ||||
|         state->streamSql = std::make_unique<StreamSql>(filterStr); | ||||
|  | ||||
|         int id = 0; | ||||
|         auto callback = [&ws, &id, &subscriptionId, &streamSql](const std::string& messageStr) { | ||||
|         state->onRedisSubscribeCallback = [&ws, &id, state](const std::string& messageStr) { | ||||
|             auto msg = nlohmann::json::parse(messageStr); | ||||
|  | ||||
|             msg = msg["body"]["message"]; | ||||
|  | ||||
|             if (streamSql->valid() && !streamSql->match(msg)) | ||||
|             if (state->streamSql->valid() && !state->streamSql->match(msg)) | ||||
|             { | ||||
|                 return; | ||||
|             } | ||||
| @@ -211,50 +214,49 @@ namespace snake | ||||
|                 {"action", "rtm/subscription/data"}, | ||||
|                 {"id", id++}, | ||||
|                 {"body", | ||||
|                  {{"subscription_id", subscriptionId}, {"position", "0-0"}, {"messages", {msg}}}}}; | ||||
|                  {{"subscription_id", state->subscriptionId}, {"position", "0-0"}, {"messages", {msg}}}}}; | ||||
|  | ||||
|             ws.sendText(response.dump()); | ||||
|         }; | ||||
|  | ||||
|         auto responseCallback = [&ws, pdu, &subscriptionId](const std::string& redisResponse) { | ||||
|         state->onRedisSubscribeResponseCallback = [&ws, state, pduId](const std::string& redisResponse) { | ||||
|             std::stringstream ss; | ||||
|             ss << "Redis Response: " << redisResponse << "..."; | ||||
|             ix::CoreLogger::log(ss.str().c_str()); | ||||
|  | ||||
|             // Success | ||||
|             nlohmann::json response = {{"action", "rtm/subscribe/ok"}, | ||||
|                                        {"id", pdu.value("id", 1)}, | ||||
|                                        {"body", {{"subscription_id", subscriptionId}}}}; | ||||
|                                        {"id", pduId}, | ||||
|                                        {"body", {{"subscription_id", state->subscriptionId}}}}; | ||||
|             ws.sendText(response.dump()); | ||||
|         }; | ||||
|  | ||||
|         { | ||||
|             std::stringstream ss; | ||||
|             ss << "Subscribing to " << appChannel << "..."; | ||||
|             ss << "Subscribing to " << state->appChannel << "..."; | ||||
|             ix::CoreLogger::log(ss.str().c_str()); | ||||
|         } | ||||
|  | ||||
|         if (!redisClient.subscribe(appChannel, responseCallback, callback)) | ||||
|         auto subscription = [&redisClient, state, &ws, &pdu, pduId] | ||||
|         { | ||||
|             std::stringstream ss; | ||||
|             ss << "Error subscribing to channel " << appChannel; | ||||
|             handleError("rtm/subscribe", ws, pdu, ss.str()); | ||||
|             return; | ||||
|         } | ||||
|     } | ||||
|             if (!redisClient.subscribe(state->appChannel,  | ||||
|                                        state->onRedisSubscribeResponseCallback, | ||||
|                                        state->onRedisSubscribeCallback)) | ||||
|             { | ||||
|                 std::stringstream ss; | ||||
|                 ss << "Error subscribing to channel " << state->appChannel; | ||||
|                 handleError("rtm/subscribe", ws, pdu, pduId, ss.str()); | ||||
|                 return; | ||||
|             } | ||||
|         }; | ||||
|  | ||||
|     void handleSubscribe(std::shared_ptr<SnakeConnectionState> state, | ||||
|                          ix::WebSocket& ws, | ||||
|                          const AppConfig& appConfig, | ||||
|                          const nlohmann::json& pdu) | ||||
|     { | ||||
|         state->fut = | ||||
|             std::async(std::launch::async, handleRedisSubscription, state, std::ref(ws), appConfig, pdu); | ||||
|         state->subscriptionThread = std::thread(subscription); | ||||
|     } | ||||
|  | ||||
|     void handleUnSubscribe(std::shared_ptr<SnakeConnectionState> state, | ||||
|                            ix::WebSocket& ws, | ||||
|                            const nlohmann::json& pdu) | ||||
|                            const nlohmann::json& pdu, | ||||
|                            uint64_t pduId) | ||||
|     { | ||||
|         // extract subscription_id | ||||
|         auto body = pdu["body"]; | ||||
| @@ -263,7 +265,7 @@ namespace snake | ||||
|         state->redisClient().stop(); | ||||
|  | ||||
|         nlohmann::json response = {{"action", "rtm/unsubscribe/ok"}, | ||||
|                                    {"id", pdu.value("id", 1)}, | ||||
|                                    {"id", pduId}, | ||||
|                                    {"body", {{"subscription_id", subscriptionId}}}}; | ||||
|         ws.sendText(response.dump()); | ||||
|     } | ||||
| @@ -289,26 +291,27 @@ namespace snake | ||||
|         } | ||||
|  | ||||
|         auto action = pdu["action"]; | ||||
|         uint64_t pduId = pdu.value("id", 1); | ||||
|  | ||||
|         if (action == "auth/handshake") | ||||
|         { | ||||
|             handleHandshake(state, ws, pdu); | ||||
|             handleHandshake(state, ws, pdu, pduId); | ||||
|         } | ||||
|         else if (action == "auth/authenticate") | ||||
|         { | ||||
|             handleAuth(state, ws, appConfig, pdu); | ||||
|             handleAuth(state, ws, appConfig, pdu, pduId); | ||||
|         } | ||||
|         else if (action == "rtm/publish") | ||||
|         { | ||||
|             handlePublish(state, ws, appConfig, pdu); | ||||
|             handlePublish(state, ws, appConfig, pdu, pduId); | ||||
|         } | ||||
|         else if (action == "rtm/subscribe") | ||||
|         { | ||||
|             handleSubscribe(state, ws, appConfig, pdu); | ||||
|             handleSubscribe(state, ws, appConfig, pdu, pduId); | ||||
|         } | ||||
|         else if (action == "rtm/unsubscribe") | ||||
|         { | ||||
|             handleUnSubscribe(state, ws, pdu); | ||||
|             handleUnSubscribe(state, ws, pdu, pduId); | ||||
|         } | ||||
|         else | ||||
|         { | ||||
|   | ||||
| @@ -68,6 +68,8 @@ namespace snake | ||||
|                 auto remoteIp = connectionInfo.remoteIp; | ||||
|              | ||||
|                 std::stringstream ss; | ||||
|                 ss << "[" << state->getId() << "] "; | ||||
|  | ||||
|                 ix::LogLevel logLevel = ix::LogLevel::Debug; | ||||
|                 if (msg->type == ix::WebSocketMessageType::Open) | ||||
|                 { | ||||
| @@ -97,6 +99,8 @@ namespace snake | ||||
|                     ss << "Closed connection" | ||||
|                        << " code " << msg->closeInfo.code << " reason " | ||||
|                        << msg->closeInfo.reason << std::endl; | ||||
|  | ||||
|                     state->cleanup(); | ||||
|                 } | ||||
|                 else if (msg->type == ix::WebSocketMessageType::Error) | ||||
|                 { | ||||
| @@ -113,7 +117,7 @@ namespace snake | ||||
|                 } | ||||
|                 else if (msg->type == ix::WebSocketMessageType::Message) | ||||
|                 { | ||||
|                     ss << "Received " << msg->wireSize << " bytes" << std::endl; | ||||
|                     ss << "Received " << msg->wireSize << " bytes" << " " << msg->str << std::endl; | ||||
|                     processCobraMessage(state, webSocket, _appConfig, msg->str); | ||||
|                 } | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user