From 24b2475b11196a24d7b8f291cbabd66af9ff1490 Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Thu, 5 Sep 2019 20:27:01 -0700 Subject: [PATCH] ws snake (cobra simple server) add basic support for unsubscription + subscribe send the proper subscription data + redis client subscription can be cancelled --- docs/CHANGELOG.md | 1 + ws/IXRedisClient.cpp | 19 +++++++++++++++++-- ws/IXRedisClient.h | 5 ++++- ws/snake/IXAppConfig.h | 2 +- ws/snake/IXSnakeProtocol.cpp | 32 +++++++++++++++++++++++++++++++- ws/snake/IXSnakeServer.cpp | 23 +++++++++++++++++------ ws/snake/IXSnakeServer.h | 2 ++ ws/ws_snake.cpp | 4 +++- 8 files changed, 76 insertions(+), 12 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index fdabf42e..c1ce1b15 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -3,6 +3,7 @@ All notable changes to this project will be documented in this file. ## [6.0.1] - 2019-09-05 +- ws snake (cobra simple server) add basic support for unsubscription + subscribe send the proper subscription data + redis client subscription can be cancelled - IXCobraConnection / pdu handlers can crash if they receive json data which is not an object ## [6.0.0] - 2019-09-04 diff --git a/ws/IXRedisClient.cpp b/ws/IXRedisClient.cpp index b122c8d8..7c91f98a 100644 --- a/ws/IXRedisClient.cpp +++ b/ws/IXRedisClient.cpp @@ -128,6 +128,8 @@ namespace ix const OnRedisSubscribeResponseCallback& responseCallback, const OnRedisSubscribeCallback& callback) { + _stop = false; + if (!_socket) return false; std::stringstream ss; @@ -159,7 +161,7 @@ namespace ix if (!lineValid) return false; - // There are 5 items for the subscribe repply + // There are 5 items for the subscribe reply for (int i = 0; i < 5; ++i) { auto lineResult = _socket->readLine(nullptr); @@ -175,13 +177,21 @@ namespace ix // Wait indefinitely for new messages while (true) { + if (_stop) break; + // Wait until something is ready to read - auto pollResult = _socket->isReadyToRead(-1); + int timeoutMs = 10; + auto pollResult = _socket->isReadyToRead(timeoutMs); if (pollResult == PollResultType::Error) { return false; } + if (pollResult == PollResultType::Timeout) + { + continue; + } + // The first line of the response describe the return type, // => *3 (an array of 3 elements) auto lineResult = _socket->readLine(nullptr); @@ -231,4 +241,9 @@ namespace ix return true; } + + bool RedisClient::stop() + { + _stop = true; + } } diff --git a/ws/IXRedisClient.h b/ws/IXRedisClient.h index 25f6793b..7dff729d 100644 --- a/ws/IXRedisClient.h +++ b/ws/IXRedisClient.h @@ -19,7 +19,7 @@ namespace ix using OnRedisSubscribeResponseCallback = std::function; using OnRedisSubscribeCallback = std::function; - RedisClient() = default; + RedisClient() : _stop(false) {} ~RedisClient() = default; bool connect(const std::string& hostname, int port); @@ -32,9 +32,12 @@ namespace ix const OnRedisSubscribeResponseCallback& responseCallback, const OnRedisSubscribeCallback& callback); + bool stop(); + private: std::string writeString(const std::string& str); std::shared_ptr _socket; + std::atomic _stop; }; } // namespace ix diff --git a/ws/snake/IXAppConfig.h b/ws/snake/IXAppConfig.h index e2f9081b..e43243ba 100644 --- a/ws/snake/IXAppConfig.h +++ b/ws/snake/IXAppConfig.h @@ -6,7 +6,7 @@ #pragma once -#include "nlohmann/json.hpp" +#include #include #include diff --git a/ws/snake/IXSnakeProtocol.cpp b/ws/snake/IXSnakeProtocol.cpp index acdfc3cc..495f7beb 100644 --- a/ws/snake/IXSnakeProtocol.cpp +++ b/ws/snake/IXSnakeProtocol.cpp @@ -163,6 +163,14 @@ namespace snake return; } } + + nlohmann::json response = { + {"action", "rtm/publish/ok"}, + {"id", pdu.value("id", 1)}, + {"body", {}} + }; + + ws->sendText(response.dump()); } // @@ -220,12 +228,14 @@ namespace snake { auto msg = nlohmann::json::parse(messageStr); + msg = msg["body"]["message"]; + nlohmann::json response = { {"action", "rtm/subscription/data"}, {"id", id++}, {"body", { {"subscription_id", subscriptionId}, - {"messages", {{msg}}} + {"messages", {msg}} }} }; @@ -271,6 +281,22 @@ namespace snake pdu); } + void handleUnSubscribe( + std::shared_ptr state, + std::shared_ptr ws, + const AppConfig& appConfig, + const nlohmann::json& pdu) + { + state->redisClient().stop(); + + nlohmann::json response = { + {"action", "rtm/unsubscribe/ok"}, + {"id", pdu.value("id", 1)}, + {"body", {}} + }; + ws->sendText(response.dump()); + } + void processCobraMessage( std::shared_ptr state, std::shared_ptr ws, @@ -299,6 +325,10 @@ namespace snake { handleSubscribe(state, ws, appConfig, pdu); } + else if (action == "rtm/unsubscribe") + { + handleUnSubscribe(state, ws, appConfig, pdu); + } else { std::cerr << "Unhandled action: " << action << std::endl; diff --git a/ws/snake/IXSnakeServer.cpp b/ws/snake/IXSnakeServer.cpp index 9cf36009..c0371642 100644 --- a/ws/snake/IXSnakeServer.cpp +++ b/ws/snake/IXSnakeServer.cpp @@ -4,10 +4,10 @@ * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. */ -#include -#include -#include -#include +#include "IXSnakeServer.h" +#include "IXSnakeProtocol.h" +#include "IXSnakeConnectionState.h" +#include "IXAppConfig.h" #include #include @@ -118,8 +118,19 @@ namespace snake } _server.start(); - _server.wait(); - return true; } + + void SnakeServer::runForever() + { + if (run()) + { + _server.wait(); + } + } + + void SnakeServer::stop() + { + _server.stop(); + } } diff --git a/ws/snake/IXSnakeServer.h b/ws/snake/IXSnakeServer.h index 49fed587..4be9a06a 100644 --- a/ws/snake/IXSnakeServer.h +++ b/ws/snake/IXSnakeServer.h @@ -19,6 +19,8 @@ namespace snake ~SnakeServer() = default; bool run(); + void runForever(); + void stop(); private: std::string parseAppKey(const std::string& path); diff --git a/ws/ws_snake.cpp b/ws/ws_snake.cpp index 557d6e47..d23c18e7 100644 --- a/ws/ws_snake.cpp +++ b/ws/ws_snake.cpp @@ -76,6 +76,8 @@ namespace ix dumpConfig(appConfig); snake::SnakeServer snakeServer(appConfig); - return snakeServer.run() ? 0 : 1; + snakeServer.runForever(); + + return 0; // should never reach this } }