add example websocket C++ server snake which supports basic cobra ops (publish and subscribe without stream sql

This commit is contained in:
Benjamin Sergeant
2019-04-22 17:24:01 -07:00
parent 9ab7bc652a
commit c85d5da111
19 changed files with 846 additions and 20 deletions

52
ws/snake/IXAppConfig.cpp Normal file
View File

@ -0,0 +1,52 @@
/*
* IXSnakeProtocol.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXSnakeProtocol.h"
#include "IXAppConfig.h"
#include <iostream>
#include <ixcrypto/IXUuid.h>
namespace snake
{
bool isAppKeyValid(
const AppConfig& appConfig,
std::string appkey)
{
return appConfig.apps.count(appkey) != 0;
}
std::string getRoleSecret(
const AppConfig& appConfig,
std::string appkey,
std::string role)
{
if (!isAppKeyValid(appConfig, appkey))
{
std::cerr << "Missing appkey " << appkey << std::endl;
return std::string();
}
auto roles = appConfig.apps[appkey]["roles"];
auto channel = roles[role]["secret"];
return channel;
}
std::string generateNonce()
{
return ix::uuid4();
}
void dumpConfig(const AppConfig& appConfig)
{
for (auto&& host : appConfig.redisHosts)
{
std::cout << "redis host: " << host << std::endl;
}
std::cout << "redis password: " << appConfig.redisPassword << std::endl;
std::cout << "redis port: " << appConfig.redisPort << std::endl;
}
}

46
ws/snake/IXAppConfig.h Normal file
View File

@ -0,0 +1,46 @@
/*
* IXAppConfig.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <string>
#include <vector>
#include "nlohmann/json.hpp"
namespace snake
{
struct AppConfig
{
// Server
std::string hostname;
int port;
// Redis
std::vector<std::string> redisHosts;
int redisPort;
std::string redisPassword;
// AppKeys
nlohmann::json apps;
// Misc
bool verbose;
};
bool isAppKeyValid(
const AppConfig& appConfig,
std::string appkey);
std::string getRoleSecret(
const AppConfig& appConfig,
std::string appkey,
std::string role);
std::string generateNonce();
void dumpConfig(const AppConfig& appConfig);
}

View File

@ -0,0 +1,40 @@
/*
* IXSnakeConnectionState.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <string>
#include <future>
#include <ixwebsocket/IXConnectionState.h>
#include "IXRedisClient.h"
namespace snake
{
class SnakeConnectionState : public ix::ConnectionState
{
public:
std::string getNonce() { return _nonce; }
void setNonce(const std::string& nonce) { _nonce = nonce; }
std::string appkey() { return _appkey; }
void setAppkey(const std::string& appkey) { _appkey = appkey; }
std::string role() { return _role; }
void setRole(const std::string& role) { _role = role; }
ix::RedisClient& redisClient() { return _redisClient; }
std::future<void> fut;
private:
std::string _nonce;
std::string _role;
std::string _appkey;
ix::RedisClient _redisClient;
};
}

View File

@ -0,0 +1,307 @@
/*
* IXSnakeProtocol.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXSnakeProtocol.h"
#include <ixwebsocket/IXWebSocket.h>
#include <ixcrypto/IXHMac.h>
#include "IXSnakeConnectionState.h"
#include "IXAppConfig.h"
#include "nlohmann/json.hpp"
#include <sstream>
namespace snake
{
void handleError(
const std::string& action,
std::shared_ptr<ix::WebSocket> ws,
nlohmann::json pdu,
const std::string& errMsg)
{
std::string actionError(action);
actionError += "/error";
nlohmann::json response = {
{"action", actionError},
{"id", pdu.value("id", 1)},
{"body", {
{"reason", errMsg}
}}
};
ws->sendText(response.dump());
}
void handleHandshake(
std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
const nlohmann::json& pdu)
{
std::string role = pdu["body"]["data"]["role"];
state->setNonce(generateNonce());
state->setRole(role);
nlohmann::json response = {
{"action", "auth/handshake/ok"},
{"id", pdu.value("id", 1)},
{"body", {
{"data", {
{"nonce", state->getNonce()},
{"connection_id", state->getId()}
}},
}}
};
auto serializedResponse = response.dump();
std::cout << "response = " << serializedResponse << std::endl;
ws->sendText(serializedResponse);
}
void handleAuth(
std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
const AppConfig& appConfig,
const nlohmann::json& pdu)
{
auto secret = getRoleSecret(appConfig, state->appkey(), state->role());
std::cout << "secret = " << secret << std::endl;
if (secret.empty())
{
nlohmann::json response = {
{"action", "auth/authenticate/error"},
{"id", pdu.value("id", 1)},
{"body", {
{"error", "authentication_failed"},
{"reason", "invalid secret"}
}}
};
ws->sendText(response.dump());
return;
}
auto nonce = state->getNonce();
auto serverHash = ix::hmac(nonce, secret);
std::string clientHash = pdu["body"]["credentials"]["hash"];
if (appConfig.verbose)
{
std::cout << serverHash << std::endl;
std::cout << clientHash << std::endl;
}
if (serverHash != clientHash)
{
nlohmann::json response = {
{"action", "auth/authenticate/error"},
{"id", pdu.value("id", 1)},
{"body", {
{"error", "authentication_failed"},
{"reason", "invalid hash"}
}}
};
ws->sendText(response.dump());
return;
}
nlohmann::json response = {
{"action", "auth/authenticate/ok"},
{"id", pdu.value("id", 1)},
{"body", {}}
};
ws->sendText(response.dump());
}
void handlePublish(
std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
const AppConfig& appConfig,
const nlohmann::json& pdu)
{
std::vector<std::string> channels;
auto body = pdu["body"];
if (body.find("channels") != body.end())
{
for (auto&& channel : body["channels"])
{
channels.push_back(channel);
}
}
else if (body.find("channel") != body.end())
{
channels.push_back(body["channel"]);
}
else
{
std::stringstream ss;
ss << "Missing channels or channel field in publish data";
handleError("rtm/publish", ws, pdu, ss.str());
return;
}
for (auto&& channel : channels)
{
std::stringstream ss;
ss << state->appkey()
<< "::"
<< channel;
std::string errMsg;
if (!state->redisClient().publish(ss.str(), pdu.dump(), errMsg))
{
std::stringstream ss;
ss << "Cannot publish to redis host " << errMsg;
handleError("rtm/publish", ws, pdu, ss.str());
return;
}
}
}
//
// FIXME: this is not cancellable. We should be able to cancel the redis subscription
//
void handleRedisSubscription(
std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
const AppConfig& appConfig,
const nlohmann::json& pdu)
{
std::string channel = pdu["body"]["channel"];
std::string subscriptionId = channel;
std::stringstream ss;
ss << state->appkey()
<< "::"
<< channel;
std::string appChannel(ss.str());
ix::RedisClient redisClient;
int port = appConfig.redisPort;
auto urls = appConfig.redisHosts;
std::string hostname(urls[0]);
// Connect to redis first
if (!redisClient.connect(hostname, port))
{
std::stringstream ss;
ss << "Cannot connect to redis host " << hostname << ":" << port;
handleError("rtm/subscribe", ws, pdu, ss.str());
return;
}
std::cout << "Connected to redis host " << hostname << ":" << port << std::endl;
// Now authenticate, if needed
if (!appConfig.redisPassword.empty())
{
std::string authResponse;
if (!redisClient.auth(appConfig.redisPassword, authResponse))
{
std::stringstream ss;
ss << "Cannot authenticated to redis";
handleError("rtm/subscribe", ws, pdu, ss.str());
return;
}
std::cout << "Auth response: " << authResponse << ":" << port << std::endl;
}
int id = 0;
auto callback = [ws, &id, &subscriptionId](const std::string& messageStr)
{
auto msg = nlohmann::json::parse(messageStr);
nlohmann::json response = {
{"action", "rtm/subscription/data"},
{"id", id++},
{"body", {
{"subscription_id", subscriptionId},
{"messages", {{msg}}}
}}
};
ws->sendText(response.dump());
};
auto responseCallback = [ws, pdu, &subscriptionId](const std::string& redisResponse)
{
std::cout << "Redis subscribe response: " << redisResponse << std::endl;
// Success
nlohmann::json response = {
{"action", "rtm/subscribe/ok"},
{"id", pdu.value("id", 1)},
{"body", {
{"subscription_id", subscriptionId}
}}
};
ws->sendText(response.dump());
};
std::cerr << "Subscribing to " << appChannel << "..." << std::endl;
if (!redisClient.subscribe(appChannel, responseCallback, callback))
{
std::stringstream ss;
ss << "Error subscribing to channel " << appChannel;
handleError("rtm/subscribe", ws, pdu, ss.str());
return;
}
}
void handleSubscribe(
std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
const AppConfig& appConfig,
const nlohmann::json& pdu)
{
state->fut = std::async(std::launch::async,
handleRedisSubscription,
state,
ws,
appConfig,
pdu);
}
void processCobraMessage(
std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
const AppConfig& appConfig,
const std::string& str)
{
auto pdu = nlohmann::json::parse(str);
std::cout << "Got " << str << std::endl;
auto action = pdu["action"];
std::cout << "action = " << action << std::endl;
if (action == "auth/handshake")
{
handleHandshake(state, ws, pdu);
}
else if (action == "auth/authenticate")
{
handleAuth(state, ws, appConfig, pdu);
}
else if (action == "rtm/publish")
{
handlePublish(state, ws, appConfig, pdu);
}
else if (action == "rtm/subscribe")
{
handleSubscribe(state, ws, appConfig, pdu);
}
else
{
std::cerr << "Unhandled action: " << action << std::endl;
}
}
}

View File

@ -0,0 +1,26 @@
/*
* IXSnakeProtocol.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <memory>
namespace ix
{
class WebSocket;
}
namespace snake
{
class SnakeConnectionState;
struct AppConfig;
void processCobraMessage(
std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
const AppConfig& appConfig,
const std::string& str);
}

130
ws/snake/IXSnakeServer.cpp Normal file
View File

@ -0,0 +1,130 @@
/*
* IXSnakeServer.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <IXSnakeServer.h>
#include <IXSnakeProtocol.h>
#include <IXSnakeConnectionState.h>
#include <IXAppConfig.h>
#include <iostream>
#include <sstream>
namespace snake
{
SnakeServer::SnakeServer(const AppConfig& appConfig) :
_appConfig(appConfig),
_server(appConfig.port, appConfig.hostname)
{
;
}
//
// Parse appkey from this uri. Won't work if multiple args are present in the uri
// Uri: /v2?appkey=FC2F10139A2BAc53BB72D9db967b024f
//
std::string SnakeServer::parseAppKey(const std::string& path)
{
std::string::size_type idx;
idx = path.rfind('=');
if (idx != std::string::npos)
{
std::string appkey = path.substr(idx+1);
return appkey;
}
else
{
return std::string();
}
}
bool SnakeServer::run()
{
std::cout << "Listening on " << _appConfig.hostname << ":" << _appConfig.port << std::endl;
auto factory = []() -> std::shared_ptr<ix::ConnectionState>
{
return std::make_shared<SnakeConnectionState>();
};
_server.setConnectionStateFactory(factory);
_server.setOnConnectionCallback(
[this](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ix::ConnectionState> connectionState)
{
auto state = std::dynamic_pointer_cast<SnakeConnectionState>(connectionState);
webSocket->setOnMessageCallback(
[this, webSocket, state](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
if (messageType == ix::WebSocket_MessageType_Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "id: " << state->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
std::string appkey = parseAppKey(openInfo.uri);
state->setAppkey(appkey);
// Connect to redis first
if (!state->redisClient().connect(_appConfig.redisHosts[0],
_appConfig.redisPort))
{
std::cerr << "Cannot connect to redis host" << std::endl;
}
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
std::cerr << "Closed connection"
<< " code " << closeInfo.code
<< " reason " << closeInfo.reason << std::endl;
}
else if (messageType == ix::WebSocket_MessageType_Error)
{
std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
std::cerr << ss.str();
}
else if (messageType == ix::WebSocket_MessageType_Fragment)
{
std::cerr << "Received message fragment" << std::endl;
}
else if (messageType == ix::WebSocket_MessageType_Message)
{
std::cerr << "Received " << wireSize << " bytes" << std::endl;
processCobraMessage(state, webSocket, _appConfig, str);
}
}
);
}
);
auto res = _server.listen();
if (!res.first)
{
std::cerr << res.second << std::endl;
return false;
}
_server.start();
_server.wait();
return true;
}
}

30
ws/snake/IXSnakeServer.h Normal file
View File

@ -0,0 +1,30 @@
/*
* IXSnakeServer.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <string>
#include <ixwebsocket/IXWebSocketServer.h>
#include "IXAppConfig.h"
namespace snake
{
class SnakeServer
{
public:
SnakeServer(const AppConfig& appConfig);
~SnakeServer() = default;
bool run();
private:
std::string parseAppKey(const std::string& path);
AppConfig _appConfig;
ix::WebSocketServer _server;
};
}

14
ws/snake/appsConfig.json Normal file
View File

@ -0,0 +1,14 @@
{
"apps": {
"FC2F10139A2BAc53BB72D9db967b024f": {
"roles": {
"_sub": {
"secret": "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba"
},
"_pub": {
"secret": "1c04DB8fFe76A4EeFE3E318C72d771db"
}
}
}
}
}