From 9957ec9724f01dd9a11aaabf745f7d51e301ce96 Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Fri, 24 Jul 2020 12:33:17 -0700 Subject: [PATCH] (ws) port ws snake to the new server API --- docs/CHANGELOG.md | 4 + ixsnake/ixsnake/IXAppConfig.cpp | 6 ++ ixsnake/ixsnake/IXSnakeProtocol.cpp | 42 +++++------ ixsnake/ixsnake/IXSnakeProtocol.h | 2 +- ixsnake/ixsnake/IXSnakeServer.cpp | 112 ++++++++++++++-------------- ixwebsocket/IXWebSocketVersion.h | 2 +- 6 files changed, 88 insertions(+), 80 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index bf79fd13..4de9eaaa 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,6 +1,10 @@ # Changelog All changes to this project will be documented in this file. +## [9.10.4] - 2020-07-24 + +(ws) port ws snake to the new server API + ## [9.10.3] - 2020-07-24 (ws) port ws transfer to the new server API diff --git a/ixsnake/ixsnake/IXAppConfig.cpp b/ixsnake/ixsnake/IXAppConfig.cpp index 8dd80b23..d1eb68b1 100644 --- a/ixsnake/ixsnake/IXAppConfig.cpp +++ b/ixsnake/ixsnake/IXAppConfig.cpp @@ -26,6 +26,12 @@ namespace snake } auto roles = appConfig.apps[appkey]["roles"]; + if (roles.count(role) == 0) + { + std::cerr << "Missing role " << role << std::endl; + return std::string(); + } + auto channel = roles[role]["secret"]; return channel; } diff --git a/ixsnake/ixsnake/IXSnakeProtocol.cpp b/ixsnake/ixsnake/IXSnakeProtocol.cpp index 99310546..f514dfa2 100644 --- a/ixsnake/ixsnake/IXSnakeProtocol.cpp +++ b/ixsnake/ixsnake/IXSnakeProtocol.cpp @@ -19,7 +19,7 @@ namespace snake { void handleError(const std::string& action, - std::shared_ptr ws, + ix::WebSocket& ws, nlohmann::json pdu, const std::string& errMsg) { @@ -28,11 +28,11 @@ namespace snake nlohmann::json response = { {"action", actionError}, {"id", pdu.value("id", 1)}, {"body", {{"reason", errMsg}}}}; - ws->sendText(response.dump()); + ws.sendText(response.dump()); } void handleHandshake(std::shared_ptr state, - std::shared_ptr ws, + ix::WebSocket& ws, const nlohmann::json& pdu) { std::string role = pdu["body"]["data"]["role"]; @@ -50,11 +50,11 @@ namespace snake auto serializedResponse = response.dump(); - ws->sendText(serializedResponse); + ws.sendText(serializedResponse); } void handleAuth(std::shared_ptr state, - std::shared_ptr ws, + ix::WebSocket& ws, const AppConfig& appConfig, const nlohmann::json& pdu) { @@ -66,7 +66,7 @@ namespace snake {"action", "auth/authenticate/error"}, {"id", pdu.value("id", 1)}, {"body", {{"error", "authentication_failed"}, {"reason", "invalid secret"}}}}; - ws->sendText(response.dump()); + ws.sendText(response.dump()); return; } @@ -80,18 +80,18 @@ namespace snake {"action", "auth/authenticate/error"}, {"id", pdu.value("id", 1)}, {"body", {{"error", "authentication_failed"}, {"reason", "invalid hash"}}}}; - ws->sendText(response.dump()); + ws.sendText(response.dump()); return; } nlohmann::json response = { {"action", "auth/authenticate/ok"}, {"id", pdu.value("id", 1)}, {"body", {}}}; - ws->sendText(response.dump()); + ws.sendText(response.dump()); } void handlePublish(std::shared_ptr state, - std::shared_ptr ws, + ix::WebSocket& ws, const AppConfig& appConfig, const nlohmann::json& pdu) { @@ -141,14 +141,14 @@ namespace snake nlohmann::json response = { {"action", "rtm/publish/ok"}, {"id", pdu.value("id", 1)}, {"body", {}}}; - ws->sendText(response.dump()); + ws.sendText(response.dump()); } // // FIXME: this is not cancellable. We should be able to cancel the redis subscription // void handleRedisSubscription(std::shared_ptr state, - std::shared_ptr ws, + ix::WebSocket& ws, const AppConfig& appConfig, const nlohmann::json& pdu) { @@ -197,7 +197,7 @@ namespace snake std::unique_ptr streamSql = std::make_unique(filterStr); int id = 0; - auto callback = [ws, &id, &subscriptionId, &streamSql](const std::string& messageStr) { + auto callback = [&ws, &id, &subscriptionId, &streamSql](const std::string& messageStr) { auto msg = nlohmann::json::parse(messageStr); msg = msg["body"]["message"]; @@ -213,10 +213,10 @@ namespace snake {"body", {{"subscription_id", subscriptionId}, {"position", "0-0"}, {"messages", {msg}}}}}; - ws->sendText(response.dump()); + ws.sendText(response.dump()); }; - auto responseCallback = [ws, pdu, &subscriptionId](const std::string& redisResponse) { + auto responseCallback = [&ws, pdu, &subscriptionId](const std::string& redisResponse) { std::stringstream ss; ss << "Redis Response: " << redisResponse << "..."; ix::CoreLogger::log(ss.str().c_str()); @@ -225,7 +225,7 @@ namespace snake nlohmann::json response = {{"action", "rtm/subscribe/ok"}, {"id", pdu.value("id", 1)}, {"body", {{"subscription_id", subscriptionId}}}}; - ws->sendText(response.dump()); + ws.sendText(response.dump()); }; { @@ -244,16 +244,16 @@ namespace snake } void handleSubscribe(std::shared_ptr state, - std::shared_ptr ws, + ix::WebSocket& ws, const AppConfig& appConfig, const nlohmann::json& pdu) { state->fut = - std::async(std::launch::async, handleRedisSubscription, state, ws, appConfig, pdu); + std::async(std::launch::async, handleRedisSubscription, state, std::ref(ws), appConfig, pdu); } void handleUnSubscribe(std::shared_ptr state, - std::shared_ptr ws, + ix::WebSocket& ws, const nlohmann::json& pdu) { // extract subscription_id @@ -265,11 +265,11 @@ namespace snake nlohmann::json response = {{"action", "rtm/unsubscribe/ok"}, {"id", pdu.value("id", 1)}, {"body", {{"subscription_id", subscriptionId}}}}; - ws->sendText(response.dump()); + ws.sendText(response.dump()); } void processCobraMessage(std::shared_ptr state, - std::shared_ptr ws, + ix::WebSocket& ws, const AppConfig& appConfig, const std::string& str) { @@ -284,7 +284,7 @@ namespace snake ss << "malformed json pdu: " << e.what() << " -> " << str << ""; nlohmann::json response = {{"body", {{"error", "invalid_json"}, {"reason", ss.str()}}}}; - ws->sendText(response.dump()); + ws.sendText(response.dump()); return; } diff --git a/ixsnake/ixsnake/IXSnakeProtocol.h b/ixsnake/ixsnake/IXSnakeProtocol.h index fd541d07..4f73ca7b 100644 --- a/ixsnake/ixsnake/IXSnakeProtocol.h +++ b/ixsnake/ixsnake/IXSnakeProtocol.h @@ -20,7 +20,7 @@ namespace snake struct AppConfig; void processCobraMessage(std::shared_ptr state, - std::shared_ptr ws, + ix::WebSocket& ws, const AppConfig& appConfig, const std::string& str); } // namespace snake diff --git a/ixsnake/ixsnake/IXSnakeServer.cpp b/ixsnake/ixsnake/IXSnakeServer.cpp index 0afcb0a5..60c9d08f 100644 --- a/ixsnake/ixsnake/IXSnakeServer.cpp +++ b/ixsnake/ixsnake/IXSnakeServer.cpp @@ -59,68 +59,66 @@ namespace snake }; _server.setConnectionStateFactory(factory); - _server.setOnConnectionCallback( - [this](std::shared_ptr webSocket, - std::shared_ptr connectionState, - std::unique_ptr connectionInfo) { + _server.setOnClientMessageCallback( + [this](std::shared_ptr connectionState, + ix::ConnectionInfo& connectionInfo, + ix::WebSocket& webSocket, + const ix::WebSocketMessagePtr& msg) { auto state = std::dynamic_pointer_cast(connectionState); - auto remoteIp = connectionInfo->remoteIp; + auto remoteIp = connectionInfo.remoteIp; - webSocket->setOnMessageCallback( - [this, webSocket, state, remoteIp](const ix::WebSocketMessagePtr& msg) { - std::stringstream ss; - ix::LogLevel logLevel = ix::LogLevel::Debug; - if (msg->type == ix::WebSocketMessageType::Open) - { - ss << "New connection" << std::endl; - ss << "remote ip: " << remoteIp << std::endl; - ss << "id: " << state->getId() << std::endl; - ss << "Uri: " << msg->openInfo.uri << std::endl; - ss << "Headers:" << std::endl; - for (auto it : msg->openInfo.headers) - { - ss << it.first << ": " << it.second << std::endl; - } + std::stringstream ss; + ix::LogLevel logLevel = ix::LogLevel::Debug; + if (msg->type == ix::WebSocketMessageType::Open) + { + ss << "New connection" << std::endl; + ss << "remote ip: " << remoteIp << std::endl; + ss << "id: " << state->getId() << std::endl; + ss << "Uri: " << msg->openInfo.uri << std::endl; + ss << "Headers:" << std::endl; + for (auto it : msg->openInfo.headers) + { + ss << it.first << ": " << it.second << std::endl; + } - std::string appkey = parseAppKey(msg->openInfo.uri); - state->setAppkey(appkey); + std::string appkey = parseAppKey(msg->openInfo.uri); + state->setAppkey(appkey); - // Connect to redis first - if (!state->redisClient().connect(_appConfig.redisHosts[0], - _appConfig.redisPort)) - { - ss << "Cannot connect to redis host" << std::endl; - logLevel = ix::LogLevel::Error; - } - } - else if (msg->type == ix::WebSocketMessageType::Close) - { - ss << "Closed connection" - << " code " << msg->closeInfo.code << " reason " - << msg->closeInfo.reason << std::endl; - } - else if (msg->type == ix::WebSocketMessageType::Error) - { - std::stringstream ss; - ss << "Connection error: " << msg->errorInfo.reason << std::endl; - ss << "#retries: " << msg->errorInfo.retries << std::endl; - ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; - ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; - logLevel = ix::LogLevel::Error; - } - else if (msg->type == ix::WebSocketMessageType::Fragment) - { - ss << "Received message fragment" << std::endl; - } - else if (msg->type == ix::WebSocketMessageType::Message) - { - ss << "Received " << msg->wireSize << " bytes" << std::endl; - processCobraMessage(state, webSocket, _appConfig, msg->str); - } + // Connect to redis first + if (!state->redisClient().connect(_appConfig.redisHosts[0], + _appConfig.redisPort)) + { + ss << "Cannot connect to redis host" << std::endl; + logLevel = ix::LogLevel::Error; + } + } + else if (msg->type == ix::WebSocketMessageType::Close) + { + ss << "Closed connection" + << " code " << msg->closeInfo.code << " reason " + << msg->closeInfo.reason << std::endl; + } + else if (msg->type == ix::WebSocketMessageType::Error) + { + std::stringstream ss; + ss << "Connection error: " << msg->errorInfo.reason << std::endl; + ss << "#retries: " << msg->errorInfo.retries << std::endl; + ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; + ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; + logLevel = ix::LogLevel::Error; + } + else if (msg->type == ix::WebSocketMessageType::Fragment) + { + ss << "Received message fragment" << std::endl; + } + else if (msg->type == ix::WebSocketMessageType::Message) + { + ss << "Received " << msg->wireSize << " bytes" << std::endl; + processCobraMessage(state, webSocket, _appConfig, msg->str); + } - ix::CoreLogger::log(ss.str().c_str(), logLevel); - }); - }); + ix::CoreLogger::log(ss.str().c_str(), logLevel); + }); auto res = _server.listen(); if (!res.first) diff --git a/ixwebsocket/IXWebSocketVersion.h b/ixwebsocket/IXWebSocketVersion.h index 3feb0d54..74d5589e 100644 --- a/ixwebsocket/IXWebSocketVersion.h +++ b/ixwebsocket/IXWebSocketVersion.h @@ -6,4 +6,4 @@ #pragma once -#define IX_WEBSOCKET_VERSION "9.10.3" +#define IX_WEBSOCKET_VERSION "9.10.4"