snake server / join subscription background thread in the ConnectionState destructor + attach cobra message subscription id to the connection state instead of having it be a local reference that gets unbound
This commit is contained in:
parent
5daa59f9f3
commit
af2f31045d
@ -17,6 +17,15 @@ namespace snake
|
|||||||
class SnakeConnectionState : public ix::ConnectionState
|
class SnakeConnectionState : public ix::ConnectionState
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
virtual ~SnakeConnectionState()
|
||||||
|
{
|
||||||
|
if (subscriptionThread.joinable())
|
||||||
|
{
|
||||||
|
subscriptionRedisClient.stop();
|
||||||
|
subscriptionThread.join();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
std::string getNonce()
|
std::string getNonce()
|
||||||
{
|
{
|
||||||
return _nonce;
|
return _nonce;
|
||||||
@ -52,19 +61,11 @@ namespace snake
|
|||||||
return _redisClient;
|
return _redisClient;
|
||||||
}
|
}
|
||||||
|
|
||||||
void cleanup()
|
|
||||||
{
|
|
||||||
if (subscriptionThread.joinable())
|
|
||||||
{
|
|
||||||
subscriptionRedisClient.stop();
|
|
||||||
subscriptionThread.join();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// We could make those accessible through methods
|
// We could make those accessible through methods
|
||||||
std::thread subscriptionThread;
|
std::thread subscriptionThread;
|
||||||
std::string appChannel;
|
std::string appChannel;
|
||||||
std::string subscriptionId;
|
std::string subscriptionId;
|
||||||
|
uint64_t id;
|
||||||
std::unique_ptr<StreamSql> streamSql;
|
std::unique_ptr<StreamSql> streamSql;
|
||||||
ix::RedisClient subscriptionRedisClient;
|
ix::RedisClient subscriptionRedisClient;
|
||||||
ix::RedisClient::OnRedisSubscribeResponseCallback onRedisSubscribeResponseCallback;
|
ix::RedisClient::OnRedisSubscribeResponseCallback onRedisSubscribeResponseCallback;
|
||||||
|
@ -198,9 +198,8 @@ namespace snake
|
|||||||
std::string filterStr = pdu["body"]["filter"];
|
std::string filterStr = pdu["body"]["filter"];
|
||||||
}
|
}
|
||||||
state->streamSql = std::make_unique<StreamSql>(filterStr);
|
state->streamSql = std::make_unique<StreamSql>(filterStr);
|
||||||
|
state->id = 0;
|
||||||
int id = 0;
|
state->onRedisSubscribeCallback = [&ws, state](const std::string& messageStr) {
|
||||||
state->onRedisSubscribeCallback = [&ws, &id, state](const std::string& messageStr) {
|
|
||||||
auto msg = nlohmann::json::parse(messageStr);
|
auto msg = nlohmann::json::parse(messageStr);
|
||||||
|
|
||||||
msg = msg["body"]["message"];
|
msg = msg["body"]["message"];
|
||||||
@ -212,7 +211,7 @@ namespace snake
|
|||||||
|
|
||||||
nlohmann::json response = {
|
nlohmann::json response = {
|
||||||
{"action", "rtm/subscription/data"},
|
{"action", "rtm/subscription/data"},
|
||||||
{"id", id++},
|
{"id", state->id++},
|
||||||
{"body",
|
{"body",
|
||||||
{{"subscription_id", state->subscriptionId}, {"position", "0-0"}, {"messages", {msg}}}}};
|
{{"subscription_id", state->subscriptionId}, {"position", "0-0"}, {"messages", {msg}}}}};
|
||||||
|
|
||||||
|
@ -99,8 +99,6 @@ namespace snake
|
|||||||
ss << "Closed connection"
|
ss << "Closed connection"
|
||||||
<< " code " << msg->closeInfo.code << " reason "
|
<< " code " << msg->closeInfo.code << " reason "
|
||||||
<< msg->closeInfo.reason << std::endl;
|
<< msg->closeInfo.reason << std::endl;
|
||||||
|
|
||||||
state->cleanup();
|
|
||||||
}
|
}
|
||||||
else if (msg->type == ix::WebSocketMessageType::Error)
|
else if (msg->type == ix::WebSocketMessageType::Error)
|
||||||
{
|
{
|
||||||
|
Loading…
Reference in New Issue
Block a user