add example websocket C++ server snake which supports basic cobra ops (publish and subscribe without stream sql

This commit is contained in:
Benjamin Sergeant 2019-04-22 17:24:01 -07:00
parent 323684efff
commit 0caf875399
19 changed files with 846 additions and 20 deletions

View File

@ -45,5 +45,8 @@ RUN ldd /usr/local/bin/ws
USER app USER app
WORKDIR /home/app WORKDIR /home/app
COPY --chown=app:app ws/snake/appsConfig.json .
COPY --chown=app:app ws/cobraMetricsSample.json .
ENTRYPOINT ["ws"] ENTRYPOINT ["ws"]
CMD ["--help"] CMD ["--help"]

View File

@ -1,11 +1,19 @@
version: "3" version: "3"
services: services:
snake:
image: bsergean/ws:build
entrypoint: ws snake --port 8765 --host 0.0.0.0 --redis_hosts redis1
ports:
- "8765:8765"
networks:
- ws-net
depends_on:
- redis1
ws: ws:
stdin_open: true stdin_open: true
tty: true tty: true
image: bsergean/ws:build image: bsergean/ws:build
ports:
- "8765:8765"
entrypoint: bash entrypoint: bash
networks: networks:
- ws-net - ws-net

View File

@ -11,8 +11,8 @@ if (NOT WIN32)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic")
endif() endif()
#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread") # set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread")
#set(CMAKE_LD_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread") # set(CMAKE_LD_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread")
set (CMAKE_CXX_STANDARD 14) set (CMAKE_CXX_STANDARD 14)
@ -22,6 +22,7 @@ include_directories(ws .)
include_directories(ws ..) include_directories(ws ..)
include_directories(ws ../third_party) include_directories(ws ../third_party)
include_directories(ws ../third_party/statsd-client-cpp/src) include_directories(ws ../third_party/statsd-client-cpp/src)
include_directories(ws snake)
add_executable(ws add_executable(ws
../third_party/msgpack11/msgpack11.cpp ../third_party/msgpack11/msgpack11.cpp
@ -37,6 +38,10 @@ add_executable(ws
ixcobra/IXCobraMetricsPublisher.cpp ixcobra/IXCobraMetricsPublisher.cpp
ixcobra/IXCobraMetricsThreadedPublisher.cpp ixcobra/IXCobraMetricsThreadedPublisher.cpp
snake/IXSnakeServer.cpp
snake/IXSnakeProtocol.cpp
snake/IXAppConfig.cpp
IXRedisClient.cpp IXRedisClient.cpp
IXSentryClient.cpp IXSentryClient.cpp
@ -55,6 +60,7 @@ add_executable(ws
ws_cobra_publish.cpp ws_cobra_publish.cpp
ws_cobra_to_statsd.cpp ws_cobra_to_statsd.cpp
ws_cobra_to_sentry.cpp ws_cobra_to_sentry.cpp
ws_snake.cpp
ws.cpp) ws.cpp)
target_link_libraries(ws ixwebsocket) target_link_libraries(ws ixwebsocket)

View File

@ -8,6 +8,7 @@
#include <ixwebsocket/IXSocketFactory.h> #include <ixwebsocket/IXSocketFactory.h>
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
#include <iostream>
#include <sstream> #include <sstream>
#include <iomanip> #include <iomanip>
#include <vector> #include <vector>
@ -62,28 +63,47 @@ namespace ix
return lineValid; return lineValid;
} }
std::string RedisClient::writeString(const std::string& str)
{
std::stringstream ss;
ss << "$";
ss << str.size();
ss << "\r\n";
ss << str;
ss << "\r\n";
return ss.str();
}
bool RedisClient::publish(const std::string& channel, bool RedisClient::publish(const std::string& channel,
const std::string& message) const std::string& message,
std::string& errMsg)
{ {
if (!_socket) return false; errMsg.clear();
if (!_socket)
{
errMsg = "socket is not initialized";
return false;
}
std::stringstream ss; std::stringstream ss;
ss << "PUBLISH "; ss << "*3\r\n";
ss << channel; ss << writeString("PUBLISH");
ss << " "; ss << writeString(channel);
ss << message; ss << writeString(message);
ss << "\r\n";
bool sent = _socket->writeBytes(ss.str(), nullptr); bool sent = _socket->writeBytes(ss.str(), nullptr);
if (!sent) if (!sent)
{ {
errMsg = "Cannot write bytes to socket";
return false; return false;
} }
auto pollResult = _socket->isReadyToRead(-1); auto pollResult = _socket->isReadyToRead(-1);
if (pollResult == PollResultType::Error) if (pollResult == PollResultType::Error)
{ {
errMsg = "Error while polling for result";
return false; return false;
} }
@ -91,6 +111,13 @@ namespace ix
auto lineValid = lineResult.first; auto lineValid = lineResult.first;
auto line = lineResult.second; auto line = lineResult.second;
// A successful response starts with a :
if (line.empty() || line[0] != ':')
{
errMsg = line;
return false;
}
return lineValid; return lineValid;
} }

View File

@ -28,13 +28,16 @@ namespace ix
std::string& response); std::string& response);
bool publish(const std::string& channel, bool publish(const std::string& channel,
const std::string& message); const std::string& message,
std::string& errMsg);
bool subscribe(const std::string& channel, bool subscribe(const std::string& channel,
const OnRedisSubscribeResponseCallback& responseCallback, const OnRedisSubscribeResponseCallback& responseCallback,
const OnRedisSubscribeCallback& callback); const OnRedisSubscribeCallback& callback);
private: private:
std::string writeString(const std::string& str);
std::shared_ptr<Socket> _socket; std::shared_ptr<Socket> _socket;
}; };
} }

View File

@ -0,0 +1 @@
{"foo": "bar", "baz": 123}

52
ws/snake/IXAppConfig.cpp Normal file
View File

@ -0,0 +1,52 @@
/*
* IXSnakeProtocol.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXSnakeProtocol.h"
#include "IXAppConfig.h"
#include <iostream>
#include <ixcrypto/IXUuid.h>
namespace snake
{
bool isAppKeyValid(
const AppConfig& appConfig,
std::string appkey)
{
return appConfig.apps.count(appkey) != 0;
}
std::string getRoleSecret(
const AppConfig& appConfig,
std::string appkey,
std::string role)
{
if (!isAppKeyValid(appConfig, appkey))
{
std::cerr << "Missing appkey " << appkey << std::endl;
return std::string();
}
auto roles = appConfig.apps[appkey]["roles"];
auto channel = roles[role]["secret"];
return channel;
}
std::string generateNonce()
{
return ix::uuid4();
}
void dumpConfig(const AppConfig& appConfig)
{
for (auto&& host : appConfig.redisHosts)
{
std::cout << "redis host: " << host << std::endl;
}
std::cout << "redis password: " << appConfig.redisPassword << std::endl;
std::cout << "redis port: " << appConfig.redisPort << std::endl;
}
}

46
ws/snake/IXAppConfig.h Normal file
View File

@ -0,0 +1,46 @@
/*
* IXAppConfig.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <string>
#include <vector>
#include "nlohmann/json.hpp"
namespace snake
{
struct AppConfig
{
// Server
std::string hostname;
int port;
// Redis
std::vector<std::string> redisHosts;
int redisPort;
std::string redisPassword;
// AppKeys
nlohmann::json apps;
// Misc
bool verbose;
};
bool isAppKeyValid(
const AppConfig& appConfig,
std::string appkey);
std::string getRoleSecret(
const AppConfig& appConfig,
std::string appkey,
std::string role);
std::string generateNonce();
void dumpConfig(const AppConfig& appConfig);
}

View File

@ -0,0 +1,40 @@
/*
* IXSnakeConnectionState.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <string>
#include <future>
#include <ixwebsocket/IXConnectionState.h>
#include "IXRedisClient.h"
namespace snake
{
class SnakeConnectionState : public ix::ConnectionState
{
public:
std::string getNonce() { return _nonce; }
void setNonce(const std::string& nonce) { _nonce = nonce; }
std::string appkey() { return _appkey; }
void setAppkey(const std::string& appkey) { _appkey = appkey; }
std::string role() { return _role; }
void setRole(const std::string& role) { _role = role; }
ix::RedisClient& redisClient() { return _redisClient; }
std::future<void> fut;
private:
std::string _nonce;
std::string _role;
std::string _appkey;
ix::RedisClient _redisClient;
};
}

View File

@ -0,0 +1,307 @@
/*
* IXSnakeProtocol.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXSnakeProtocol.h"
#include <ixwebsocket/IXWebSocket.h>
#include <ixcrypto/IXHMac.h>
#include "IXSnakeConnectionState.h"
#include "IXAppConfig.h"
#include "nlohmann/json.hpp"
#include <sstream>
namespace snake
{
void handleError(
const std::string& action,
std::shared_ptr<ix::WebSocket> ws,
nlohmann::json pdu,
const std::string& errMsg)
{
std::string actionError(action);
actionError += "/error";
nlohmann::json response = {
{"action", actionError},
{"id", pdu.value("id", 1)},
{"body", {
{"reason", errMsg}
}}
};
ws->sendText(response.dump());
}
void handleHandshake(
std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
const nlohmann::json& pdu)
{
std::string role = pdu["body"]["data"]["role"];
state->setNonce(generateNonce());
state->setRole(role);
nlohmann::json response = {
{"action", "auth/handshake/ok"},
{"id", pdu.value("id", 1)},
{"body", {
{"data", {
{"nonce", state->getNonce()},
{"connection_id", state->getId()}
}},
}}
};
auto serializedResponse = response.dump();
std::cout << "response = " << serializedResponse << std::endl;
ws->sendText(serializedResponse);
}
void handleAuth(
std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
const AppConfig& appConfig,
const nlohmann::json& pdu)
{
auto secret = getRoleSecret(appConfig, state->appkey(), state->role());
std::cout << "secret = " << secret << std::endl;
if (secret.empty())
{
nlohmann::json response = {
{"action", "auth/authenticate/error"},
{"id", pdu.value("id", 1)},
{"body", {
{"error", "authentication_failed"},
{"reason", "invalid secret"}
}}
};
ws->sendText(response.dump());
return;
}
auto nonce = state->getNonce();
auto serverHash = ix::hmac(nonce, secret);
std::string clientHash = pdu["body"]["credentials"]["hash"];
if (appConfig.verbose)
{
std::cout << serverHash << std::endl;
std::cout << clientHash << std::endl;
}
if (serverHash != clientHash)
{
nlohmann::json response = {
{"action", "auth/authenticate/error"},
{"id", pdu.value("id", 1)},
{"body", {
{"error", "authentication_failed"},
{"reason", "invalid hash"}
}}
};
ws->sendText(response.dump());
return;
}
nlohmann::json response = {
{"action", "auth/authenticate/ok"},
{"id", pdu.value("id", 1)},
{"body", {}}
};
ws->sendText(response.dump());
}
void handlePublish(
std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
const AppConfig& appConfig,
const nlohmann::json& pdu)
{
std::vector<std::string> channels;
auto body = pdu["body"];
if (body.find("channels") != body.end())
{
for (auto&& channel : body["channels"])
{
channels.push_back(channel);
}
}
else if (body.find("channel") != body.end())
{
channels.push_back(body["channel"]);
}
else
{
std::stringstream ss;
ss << "Missing channels or channel field in publish data";
handleError("rtm/publish", ws, pdu, ss.str());
return;
}
for (auto&& channel : channels)
{
std::stringstream ss;
ss << state->appkey()
<< "::"
<< channel;
std::string errMsg;
if (!state->redisClient().publish(ss.str(), pdu.dump(), errMsg))
{
std::stringstream ss;
ss << "Cannot publish to redis host " << errMsg;
handleError("rtm/publish", ws, pdu, ss.str());
return;
}
}
}
//
// FIXME: this is not cancellable. We should be able to cancel the redis subscription
//
void handleRedisSubscription(
std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
const AppConfig& appConfig,
const nlohmann::json& pdu)
{
std::string channel = pdu["body"]["channel"];
std::string subscriptionId = channel;
std::stringstream ss;
ss << state->appkey()
<< "::"
<< channel;
std::string appChannel(ss.str());
ix::RedisClient redisClient;
int port = appConfig.redisPort;
auto urls = appConfig.redisHosts;
std::string hostname(urls[0]);
// Connect to redis first
if (!redisClient.connect(hostname, port))
{
std::stringstream ss;
ss << "Cannot connect to redis host " << hostname << ":" << port;
handleError("rtm/subscribe", ws, pdu, ss.str());
return;
}
std::cout << "Connected to redis host " << hostname << ":" << port << std::endl;
// Now authenticate, if needed
if (!appConfig.redisPassword.empty())
{
std::string authResponse;
if (!redisClient.auth(appConfig.redisPassword, authResponse))
{
std::stringstream ss;
ss << "Cannot authenticated to redis";
handleError("rtm/subscribe", ws, pdu, ss.str());
return;
}
std::cout << "Auth response: " << authResponse << ":" << port << std::endl;
}
int id = 0;
auto callback = [ws, &id, &subscriptionId](const std::string& messageStr)
{
auto msg = nlohmann::json::parse(messageStr);
nlohmann::json response = {
{"action", "rtm/subscription/data"},
{"id", id++},
{"body", {
{"subscription_id", subscriptionId},
{"messages", {{msg}}}
}}
};
ws->sendText(response.dump());
};
auto responseCallback = [ws, pdu, &subscriptionId](const std::string& redisResponse)
{
std::cout << "Redis subscribe response: " << redisResponse << std::endl;
// Success
nlohmann::json response = {
{"action", "rtm/subscribe/ok"},
{"id", pdu.value("id", 1)},
{"body", {
{"subscription_id", subscriptionId}
}}
};
ws->sendText(response.dump());
};
std::cerr << "Subscribing to " << appChannel << "..." << std::endl;
if (!redisClient.subscribe(appChannel, responseCallback, callback))
{
std::stringstream ss;
ss << "Error subscribing to channel " << appChannel;
handleError("rtm/subscribe", ws, pdu, ss.str());
return;
}
}
void handleSubscribe(
std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
const AppConfig& appConfig,
const nlohmann::json& pdu)
{
state->fut = std::async(std::launch::async,
handleRedisSubscription,
state,
ws,
appConfig,
pdu);
}
void processCobraMessage(
std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
const AppConfig& appConfig,
const std::string& str)
{
auto pdu = nlohmann::json::parse(str);
std::cout << "Got " << str << std::endl;
auto action = pdu["action"];
std::cout << "action = " << action << std::endl;
if (action == "auth/handshake")
{
handleHandshake(state, ws, pdu);
}
else if (action == "auth/authenticate")
{
handleAuth(state, ws, appConfig, pdu);
}
else if (action == "rtm/publish")
{
handlePublish(state, ws, appConfig, pdu);
}
else if (action == "rtm/subscribe")
{
handleSubscribe(state, ws, appConfig, pdu);
}
else
{
std::cerr << "Unhandled action: " << action << std::endl;
}
}
}

View File

@ -0,0 +1,26 @@
/*
* IXSnakeProtocol.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <memory>
namespace ix
{
class WebSocket;
}
namespace snake
{
class SnakeConnectionState;
struct AppConfig;
void processCobraMessage(
std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
const AppConfig& appConfig,
const std::string& str);
}

130
ws/snake/IXSnakeServer.cpp Normal file
View File

@ -0,0 +1,130 @@
/*
* IXSnakeServer.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <IXSnakeServer.h>
#include <IXSnakeProtocol.h>
#include <IXSnakeConnectionState.h>
#include <IXAppConfig.h>
#include <iostream>
#include <sstream>
namespace snake
{
SnakeServer::SnakeServer(const AppConfig& appConfig) :
_appConfig(appConfig),
_server(appConfig.port, appConfig.hostname)
{
;
}
//
// Parse appkey from this uri. Won't work if multiple args are present in the uri
// Uri: /v2?appkey=FC2F10139A2BAc53BB72D9db967b024f
//
std::string SnakeServer::parseAppKey(const std::string& path)
{
std::string::size_type idx;
idx = path.rfind('=');
if (idx != std::string::npos)
{
std::string appkey = path.substr(idx+1);
return appkey;
}
else
{
return std::string();
}
}
bool SnakeServer::run()
{
std::cout << "Listening on " << _appConfig.hostname << ":" << _appConfig.port << std::endl;
auto factory = []() -> std::shared_ptr<ix::ConnectionState>
{
return std::make_shared<SnakeConnectionState>();
};
_server.setConnectionStateFactory(factory);
_server.setOnConnectionCallback(
[this](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ix::ConnectionState> connectionState)
{
auto state = std::dynamic_pointer_cast<SnakeConnectionState>(connectionState);
webSocket->setOnMessageCallback(
[this, webSocket, state](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
if (messageType == ix::WebSocket_MessageType_Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "id: " << state->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
std::string appkey = parseAppKey(openInfo.uri);
state->setAppkey(appkey);
// Connect to redis first
if (!state->redisClient().connect(_appConfig.redisHosts[0],
_appConfig.redisPort))
{
std::cerr << "Cannot connect to redis host" << std::endl;
}
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
std::cerr << "Closed connection"
<< " code " << closeInfo.code
<< " reason " << closeInfo.reason << std::endl;
}
else if (messageType == ix::WebSocket_MessageType_Error)
{
std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
std::cerr << ss.str();
}
else if (messageType == ix::WebSocket_MessageType_Fragment)
{
std::cerr << "Received message fragment" << std::endl;
}
else if (messageType == ix::WebSocket_MessageType_Message)
{
std::cerr << "Received " << wireSize << " bytes" << std::endl;
processCobraMessage(state, webSocket, _appConfig, str);
}
}
);
}
);
auto res = _server.listen();
if (!res.first)
{
std::cerr << res.second << std::endl;
return false;
}
_server.start();
_server.wait();
return true;
}
}

30
ws/snake/IXSnakeServer.h Normal file
View File

@ -0,0 +1,30 @@
/*
* IXSnakeServer.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <string>
#include <ixwebsocket/IXWebSocketServer.h>
#include "IXAppConfig.h"
namespace snake
{
class SnakeServer
{
public:
SnakeServer(const AppConfig& appConfig);
~SnakeServer() = default;
bool run();
private:
std::string parseAppKey(const std::string& path);
AppConfig _appConfig;
ix::WebSocketServer _server;
};
}

14
ws/snake/appsConfig.json Normal file
View File

@ -0,0 +1,14 @@
{
"apps": {
"FC2F10139A2BAc53BB72D9db967b024f": {
"roles": {
"_sub": {
"secret": "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba"
},
"_pub": {
"secret": "1c04DB8fFe76A4EeFE3E318C72d771db"
}
}
}
}
}

View File

@ -45,12 +45,16 @@ int main(int argc, char** argv)
std::string prefix("ws.test.v0"); std::string prefix("ws.test.v0");
std::string fields; std::string fields;
std::string dsn; std::string dsn;
std::string redisHosts("127.0.0.1");
std::string redisPassword;
std::string appsConfigPath("appsConfig.json");
bool headersOnly = false; bool headersOnly = false;
bool followRedirects = false; bool followRedirects = false;
bool verbose = false; bool verbose = false;
bool save = false; bool save = false;
bool compress = false; bool compress = false;
bool strict = false; bool strict = false;
bool stress = false;
int port = 8080; int port = 8080;
int redisPort = 6379; int redisPort = 6379;
int statsdPort = 8125; int statsdPort = 8125;
@ -144,6 +148,7 @@ int main(int argc, char** argv)
cobraPublish->add_option("--pidfile", pidfile, "Pid file"); cobraPublish->add_option("--pidfile", pidfile, "Pid file");
cobraPublish->add_option("path", path, "Path to the file to send") cobraPublish->add_option("path", path, "Path to the file to send")
->required()->check(CLI::ExistingPath); ->required()->check(CLI::ExistingPath);
cobraPublish->add_flag("--stress", stress, "Stress mode");
CLI::App* cobra2statsd = app.add_subcommand("cobra_to_statsd", "Cobra to statsd"); CLI::App* cobra2statsd = app.add_subcommand("cobra_to_statsd", "Cobra to statsd");
cobra2statsd->add_option("--appkey", appkey, "Appkey"); cobra2statsd->add_option("--appkey", appkey, "Appkey");
@ -170,6 +175,17 @@ int main(int argc, char** argv)
cobra2sentry->add_flag("-s", strict, "Strict mode. Error out when sending to sentry fails"); cobra2sentry->add_flag("-s", strict, "Strict mode. Error out when sending to sentry fails");
cobra2sentry->add_option("--pidfile", pidfile, "Pid file"); cobra2sentry->add_option("--pidfile", pidfile, "Pid file");
CLI::App* runApp = app.add_subcommand("snake", "Snake server");
runApp->add_option("--port", port, "Connection url");
runApp->add_option("--host", hostname, "Hostname");
runApp->add_option("--pidfile", pidfile, "Pid file");
runApp->add_option("--redis_hosts", redisHosts, "Redis hosts");
runApp->add_option("--redis_port", redisPort, "Redis hosts");
runApp->add_option("--redis_password", redisPassword, "Redis password");
runApp->add_option("--apps_config_path", appsConfigPath, "Path to auth data")
->check(CLI::ExistingPath);
runApp->add_flag("-v", verbose, "Verbose");
CLI11_PARSE(app, argc, argv); CLI11_PARSE(app, argc, argv);
// pid file handling // pid file handling
@ -242,7 +258,7 @@ int main(int argc, char** argv)
{ {
return ix::ws_cobra_publish_main(appkey, endpoint, return ix::ws_cobra_publish_main(appkey, endpoint,
rolename, rolesecret, rolename, rolesecret,
channel, path); channel, path, stress);
} }
else if (app.got_subcommand("cobra_to_statsd")) else if (app.got_subcommand("cobra_to_statsd"))
{ {
@ -258,6 +274,13 @@ int main(int argc, char** argv)
channel, dsn, channel, dsn,
verbose, strict, jobs); verbose, strict, jobs);
} }
else if (app.got_subcommand("snake"))
{
return ix::ws_snake_main(port, hostname,
redisHosts, redisPort,
redisPassword, verbose,
appsConfigPath);
}
return 1; return 1;
} }

11
ws/ws.h
View File

@ -64,7 +64,8 @@ namespace ix
const std::string& rolename, const std::string& rolename,
const std::string& rolesecret, const std::string& rolesecret,
const std::string& channel, const std::string& channel,
const std::string& path); const std::string& path,
bool stress);
int ws_cobra_to_statsd_main(const std::string& appkey, int ws_cobra_to_statsd_main(const std::string& appkey,
const std::string& endpoint, const std::string& endpoint,
@ -86,4 +87,12 @@ namespace ix
bool verbose, bool verbose,
bool strict, bool strict,
int jobs); int jobs);
int ws_snake_main(int port,
const std::string& hostname,
const std::string& redisHosts,
int redisPort,
const std::string& redisPassword,
bool verbose,
const std::string& appsConfigPath);
} }

View File

@ -20,7 +20,8 @@ namespace ix
const std::string& rolename, const std::string& rolename,
const std::string& rolesecret, const std::string& rolesecret,
const std::string& channel, const std::string& channel,
const std::string& path) const std::string& path,
bool stress)
{ {
CobraMetricsPublisher cobraMetricsPublisher; CobraMetricsPublisher cobraMetricsPublisher;
cobraMetricsPublisher.enable(true); cobraMetricsPublisher.enable(true);
@ -39,7 +40,25 @@ namespace ix
Json::Reader reader; Json::Reader reader;
if (!reader.parse(str, data)) return 1; if (!reader.parse(str, data)) return 1;
cobraMetricsPublisher.push(std::string("foo_id"), data); if (!stress)
{
cobraMetricsPublisher.push(channel, data);
}
else
{
// Stress mode to try to trigger server and client bugs
while (true)
{
for (int i = 0 ; i < 1000; ++i)
{
cobraMetricsPublisher.push(channel, data);
}
cobraMetricsPublisher.suspend();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
cobraMetricsPublisher.resume();
}
}
// Wait a bit for the message to get a chance to be sent // Wait a bit for the message to get a chance to be sent
// there isn't any ack on publish right now so it's the best we can do // there isn't any ack on publish right now so it's the best we can do

View File

@ -36,13 +36,14 @@ namespace ix
std::cout << "Auth response: " << authResponse << ":" << port << std::endl; std::cout << "Auth response: " << authResponse << ":" << port << std::endl;
} }
std::string errMsg;
for (int i = 0; i < count; i++) for (int i = 0; i < count; i++)
{ {
//std::cerr << "Publishing message " << message if (!redisClient.publish(channel, message, errMsg))
// << " to " << channel << "..." << std::endl;
if (!redisClient.publish(channel, message))
{ {
std::cerr << "Error publishing to channel " << channel << std::endl; std::cerr << "Error publishing to channel " << channel
<< "error: " << errMsg
<< std::endl;
return 1; return 1;
} }
} }

81
ws/ws_snake.cpp Normal file
View File

@ -0,0 +1,81 @@
/*
* snake_run.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include "IXSnakeServer.h"
#include <iostream>
#include <sstream>
#include <fstream>
namespace
{
std::vector<uint8_t> load(const std::string& path)
{
std::vector<uint8_t> memblock;
std::ifstream file(path);
if (!file.is_open()) return memblock;
file.seekg(0, file.end);
std::streamoff size = file.tellg();
file.seekg(0, file.beg);
memblock.resize(size);
file.read((char*)&memblock.front(), static_cast<std::streamsize>(size));
return memblock;
}
std::string readAsString(const std::string& path)
{
auto vec = load(path);
return std::string(vec.begin(), vec.end());
}
}
namespace ix
{
int ws_snake_main(int port,
const std::string& hostname,
const std::string& redisHosts,
int redisPort,
const std::string& redisPassword,
bool verbose,
const std::string& appsConfigPath)
{
snake::AppConfig appConfig;
appConfig.port = port;
appConfig.hostname = hostname;
appConfig.verbose = verbose;
appConfig.redisPort = redisPort;
appConfig.redisPassword = redisPassword;
// Parse config file
auto str = readAsString(appsConfigPath);
if (str.empty())
{
std::cout << "Cannot read content of " << appsConfigPath << std::endl;
return 1;
}
std::cout << str << std::endl;
auto apps = nlohmann::json::parse(str);
appConfig.apps = apps["apps"];
std::string token;
std::stringstream tokenStream(redisHosts);
while (std::getline(tokenStream, token, ';'))
{
appConfig.redisHosts.push_back(token);
}
// Display config on the terminal for debugging
dumpConfig(appConfig);
snake::SnakeServer snakeServer(appConfig);
return snakeServer.run() ? 0 : 1;
}
}