ws snake (cobra simple server) add basic support for unsubscription + subscribe send the proper subscription data + redis client subscription can be cancelled

This commit is contained in:
Benjamin Sergeant 2019-09-05 20:27:01 -07:00
parent 2defe6f597
commit 24b2475b11
8 changed files with 76 additions and 12 deletions

View File

@ -3,6 +3,7 @@ All notable changes to this project will be documented in this file.
## [6.0.1] - 2019-09-05 ## [6.0.1] - 2019-09-05
- ws snake (cobra simple server) add basic support for unsubscription + subscribe send the proper subscription data + redis client subscription can be cancelled
- IXCobraConnection / pdu handlers can crash if they receive json data which is not an object - IXCobraConnection / pdu handlers can crash if they receive json data which is not an object
## [6.0.0] - 2019-09-04 ## [6.0.0] - 2019-09-04

View File

@ -128,6 +128,8 @@ namespace ix
const OnRedisSubscribeResponseCallback& responseCallback, const OnRedisSubscribeResponseCallback& responseCallback,
const OnRedisSubscribeCallback& callback) const OnRedisSubscribeCallback& callback)
{ {
_stop = false;
if (!_socket) return false; if (!_socket) return false;
std::stringstream ss; std::stringstream ss;
@ -159,7 +161,7 @@ namespace ix
if (!lineValid) return false; if (!lineValid) return false;
// There are 5 items for the subscribe repply // There are 5 items for the subscribe reply
for (int i = 0; i < 5; ++i) for (int i = 0; i < 5; ++i)
{ {
auto lineResult = _socket->readLine(nullptr); auto lineResult = _socket->readLine(nullptr);
@ -175,13 +177,21 @@ namespace ix
// Wait indefinitely for new messages // Wait indefinitely for new messages
while (true) while (true)
{ {
if (_stop) break;
// Wait until something is ready to read // Wait until something is ready to read
auto pollResult = _socket->isReadyToRead(-1); int timeoutMs = 10;
auto pollResult = _socket->isReadyToRead(timeoutMs);
if (pollResult == PollResultType::Error) if (pollResult == PollResultType::Error)
{ {
return false; return false;
} }
if (pollResult == PollResultType::Timeout)
{
continue;
}
// The first line of the response describe the return type, // The first line of the response describe the return type,
// => *3 (an array of 3 elements) // => *3 (an array of 3 elements)
auto lineResult = _socket->readLine(nullptr); auto lineResult = _socket->readLine(nullptr);
@ -231,4 +241,9 @@ namespace ix
return true; return true;
} }
bool RedisClient::stop()
{
_stop = true;
}
} }

View File

@ -19,7 +19,7 @@ namespace ix
using OnRedisSubscribeResponseCallback = std::function<void(const std::string&)>; using OnRedisSubscribeResponseCallback = std::function<void(const std::string&)>;
using OnRedisSubscribeCallback = std::function<void(const std::string&)>; using OnRedisSubscribeCallback = std::function<void(const std::string&)>;
RedisClient() = default; RedisClient() : _stop(false) {}
~RedisClient() = default; ~RedisClient() = default;
bool connect(const std::string& hostname, int port); bool connect(const std::string& hostname, int port);
@ -32,9 +32,12 @@ namespace ix
const OnRedisSubscribeResponseCallback& responseCallback, const OnRedisSubscribeResponseCallback& responseCallback,
const OnRedisSubscribeCallback& callback); const OnRedisSubscribeCallback& callback);
bool stop();
private: private:
std::string writeString(const std::string& str); std::string writeString(const std::string& str);
std::shared_ptr<Socket> _socket; std::shared_ptr<Socket> _socket;
std::atomic<bool> _stop;
}; };
} // namespace ix } // namespace ix

View File

@ -6,7 +6,7 @@
#pragma once #pragma once
#include "nlohmann/json.hpp" #include <nlohmann/json.hpp>
#include <string> #include <string>
#include <vector> #include <vector>

View File

@ -163,6 +163,14 @@ namespace snake
return; return;
} }
} }
nlohmann::json response = {
{"action", "rtm/publish/ok"},
{"id", pdu.value("id", 1)},
{"body", {}}
};
ws->sendText(response.dump());
} }
// //
@ -220,12 +228,14 @@ namespace snake
{ {
auto msg = nlohmann::json::parse(messageStr); auto msg = nlohmann::json::parse(messageStr);
msg = msg["body"]["message"];
nlohmann::json response = { nlohmann::json response = {
{"action", "rtm/subscription/data"}, {"action", "rtm/subscription/data"},
{"id", id++}, {"id", id++},
{"body", { {"body", {
{"subscription_id", subscriptionId}, {"subscription_id", subscriptionId},
{"messages", {{msg}}} {"messages", {msg}}
}} }}
}; };
@ -271,6 +281,22 @@ namespace snake
pdu); pdu);
} }
void handleUnSubscribe(
std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
const AppConfig& appConfig,
const nlohmann::json& pdu)
{
state->redisClient().stop();
nlohmann::json response = {
{"action", "rtm/unsubscribe/ok"},
{"id", pdu.value("id", 1)},
{"body", {}}
};
ws->sendText(response.dump());
}
void processCobraMessage( void processCobraMessage(
std::shared_ptr<SnakeConnectionState> state, std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws, std::shared_ptr<ix::WebSocket> ws,
@ -299,6 +325,10 @@ namespace snake
{ {
handleSubscribe(state, ws, appConfig, pdu); handleSubscribe(state, ws, appConfig, pdu);
} }
else if (action == "rtm/unsubscribe")
{
handleUnSubscribe(state, ws, appConfig, pdu);
}
else else
{ {
std::cerr << "Unhandled action: " << action << std::endl; std::cerr << "Unhandled action: " << action << std::endl;

View File

@ -4,10 +4,10 @@
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved. * Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/ */
#include <IXSnakeServer.h> #include "IXSnakeServer.h"
#include <IXSnakeProtocol.h> #include "IXSnakeProtocol.h"
#include <IXSnakeConnectionState.h> #include "IXSnakeConnectionState.h"
#include <IXAppConfig.h> #include "IXAppConfig.h"
#include <iostream> #include <iostream>
#include <sstream> #include <sstream>
@ -118,8 +118,19 @@ namespace snake
} }
_server.start(); _server.start();
_server.wait();
return true; return true;
} }
void SnakeServer::runForever()
{
if (run())
{
_server.wait();
}
}
void SnakeServer::stop()
{
_server.stop();
}
} }

View File

@ -19,6 +19,8 @@ namespace snake
~SnakeServer() = default; ~SnakeServer() = default;
bool run(); bool run();
void runForever();
void stop();
private: private:
std::string parseAppKey(const std::string& path); std::string parseAppKey(const std::string& path);

View File

@ -76,6 +76,8 @@ namespace ix
dumpConfig(appConfig); dumpConfig(appConfig);
snake::SnakeServer snakeServer(appConfig); snake::SnakeServer snakeServer(appConfig);
return snakeServer.run() ? 0 : 1; snakeServer.runForever();
return 0; // should never reach this
} }
} }