(ws) port ws snake to the new server API

This commit is contained in:
Benjamin Sergeant 2020-07-24 12:33:17 -07:00
parent 78a42f61bd
commit 9957ec9724
6 changed files with 88 additions and 80 deletions

View File

@ -1,6 +1,10 @@
# Changelog
All changes to this project will be documented in this file.
## [9.10.4] - 2020-07-24
(ws) port ws snake to the new server API
## [9.10.3] - 2020-07-24
(ws) port ws transfer to the new server API

View File

@ -26,6 +26,12 @@ namespace snake
}
auto roles = appConfig.apps[appkey]["roles"];
if (roles.count(role) == 0)
{
std::cerr << "Missing role " << role << std::endl;
return std::string();
}
auto channel = roles[role]["secret"];
return channel;
}

View File

@ -19,7 +19,7 @@
namespace snake
{
void handleError(const std::string& action,
std::shared_ptr<ix::WebSocket> ws,
ix::WebSocket& ws,
nlohmann::json pdu,
const std::string& errMsg)
{
@ -28,11 +28,11 @@ namespace snake
nlohmann::json response = {
{"action", actionError}, {"id", pdu.value("id", 1)}, {"body", {{"reason", errMsg}}}};
ws->sendText(response.dump());
ws.sendText(response.dump());
}
void handleHandshake(std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
ix::WebSocket& ws,
const nlohmann::json& pdu)
{
std::string role = pdu["body"]["data"]["role"];
@ -50,11 +50,11 @@ namespace snake
auto serializedResponse = response.dump();
ws->sendText(serializedResponse);
ws.sendText(serializedResponse);
}
void handleAuth(std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
ix::WebSocket& ws,
const AppConfig& appConfig,
const nlohmann::json& pdu)
{
@ -66,7 +66,7 @@ namespace snake
{"action", "auth/authenticate/error"},
{"id", pdu.value("id", 1)},
{"body", {{"error", "authentication_failed"}, {"reason", "invalid secret"}}}};
ws->sendText(response.dump());
ws.sendText(response.dump());
return;
}
@ -80,18 +80,18 @@ namespace snake
{"action", "auth/authenticate/error"},
{"id", pdu.value("id", 1)},
{"body", {{"error", "authentication_failed"}, {"reason", "invalid hash"}}}};
ws->sendText(response.dump());
ws.sendText(response.dump());
return;
}
nlohmann::json response = {
{"action", "auth/authenticate/ok"}, {"id", pdu.value("id", 1)}, {"body", {}}};
ws->sendText(response.dump());
ws.sendText(response.dump());
}
void handlePublish(std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
ix::WebSocket& ws,
const AppConfig& appConfig,
const nlohmann::json& pdu)
{
@ -141,14 +141,14 @@ namespace snake
nlohmann::json response = {
{"action", "rtm/publish/ok"}, {"id", pdu.value("id", 1)}, {"body", {}}};
ws->sendText(response.dump());
ws.sendText(response.dump());
}
//
// FIXME: this is not cancellable. We should be able to cancel the redis subscription
//
void handleRedisSubscription(std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
ix::WebSocket& ws,
const AppConfig& appConfig,
const nlohmann::json& pdu)
{
@ -197,7 +197,7 @@ namespace snake
std::unique_ptr<StreamSql> streamSql = std::make_unique<StreamSql>(filterStr);
int id = 0;
auto callback = [ws, &id, &subscriptionId, &streamSql](const std::string& messageStr) {
auto callback = [&ws, &id, &subscriptionId, &streamSql](const std::string& messageStr) {
auto msg = nlohmann::json::parse(messageStr);
msg = msg["body"]["message"];
@ -213,10 +213,10 @@ namespace snake
{"body",
{{"subscription_id", subscriptionId}, {"position", "0-0"}, {"messages", {msg}}}}};
ws->sendText(response.dump());
ws.sendText(response.dump());
};
auto responseCallback = [ws, pdu, &subscriptionId](const std::string& redisResponse) {
auto responseCallback = [&ws, pdu, &subscriptionId](const std::string& redisResponse) {
std::stringstream ss;
ss << "Redis Response: " << redisResponse << "...";
ix::CoreLogger::log(ss.str().c_str());
@ -225,7 +225,7 @@ namespace snake
nlohmann::json response = {{"action", "rtm/subscribe/ok"},
{"id", pdu.value("id", 1)},
{"body", {{"subscription_id", subscriptionId}}}};
ws->sendText(response.dump());
ws.sendText(response.dump());
};
{
@ -244,16 +244,16 @@ namespace snake
}
void handleSubscribe(std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
ix::WebSocket& ws,
const AppConfig& appConfig,
const nlohmann::json& pdu)
{
state->fut =
std::async(std::launch::async, handleRedisSubscription, state, ws, appConfig, pdu);
std::async(std::launch::async, handleRedisSubscription, state, std::ref(ws), appConfig, pdu);
}
void handleUnSubscribe(std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
ix::WebSocket& ws,
const nlohmann::json& pdu)
{
// extract subscription_id
@ -265,11 +265,11 @@ namespace snake
nlohmann::json response = {{"action", "rtm/unsubscribe/ok"},
{"id", pdu.value("id", 1)},
{"body", {{"subscription_id", subscriptionId}}}};
ws->sendText(response.dump());
ws.sendText(response.dump());
}
void processCobraMessage(std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
ix::WebSocket& ws,
const AppConfig& appConfig,
const std::string& str)
{
@ -284,7 +284,7 @@ namespace snake
ss << "malformed json pdu: " << e.what() << " -> " << str << "";
nlohmann::json response = {{"body", {{"error", "invalid_json"}, {"reason", ss.str()}}}};
ws->sendText(response.dump());
ws.sendText(response.dump());
return;
}

View File

@ -20,7 +20,7 @@ namespace snake
struct AppConfig;
void processCobraMessage(std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
ix::WebSocket& ws,
const AppConfig& appConfig,
const std::string& str);
} // namespace snake

View File

@ -59,15 +59,14 @@ namespace snake
};
_server.setConnectionStateFactory(factory);
_server.setOnConnectionCallback(
[this](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ix::ConnectionState> connectionState,
std::unique_ptr<ix::ConnectionInfo> connectionInfo) {
_server.setOnClientMessageCallback(
[this](std::shared_ptr<ix::ConnectionState> connectionState,
ix::ConnectionInfo& connectionInfo,
ix::WebSocket& webSocket,
const ix::WebSocketMessagePtr& msg) {
auto state = std::dynamic_pointer_cast<SnakeConnectionState>(connectionState);
auto remoteIp = connectionInfo->remoteIp;
auto remoteIp = connectionInfo.remoteIp;
webSocket->setOnMessageCallback(
[this, webSocket, state, remoteIp](const ix::WebSocketMessagePtr& msg) {
std::stringstream ss;
ix::LogLevel logLevel = ix::LogLevel::Debug;
if (msg->type == ix::WebSocketMessageType::Open)
@ -120,7 +119,6 @@ namespace snake
ix::CoreLogger::log(ss.str().c_str(), logLevel);
});
});
auto res = _server.listen();
if (!res.first)

View File

@ -6,4 +6,4 @@
#pragma once
#define IX_WEBSOCKET_VERSION "9.10.3"
#define IX_WEBSOCKET_VERSION "9.10.4"