diff --git a/Dockerfile b/Dockerfile index 1dc53df3..d24abf9e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -45,5 +45,8 @@ RUN ldd /usr/local/bin/ws USER app WORKDIR /home/app +COPY --chown=app:app ws/snake/appsConfig.json . +COPY --chown=app:app ws/cobraMetricsSample.json . + ENTRYPOINT ["ws"] CMD ["--help"] diff --git a/docker-compose.yml b/docker-compose.yml index c84f68f5..c1c7eb13 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,11 +1,19 @@ version: "3" services: + snake: + image: bsergean/ws:build + entrypoint: ws snake --port 8765 --host 0.0.0.0 --redis_hosts redis1 + ports: + - "8765:8765" + networks: + - ws-net + depends_on: + - redis1 + ws: stdin_open: true tty: true image: bsergean/ws:build - ports: - - "8765:8765" entrypoint: bash networks: - ws-net diff --git a/ws/CMakeLists.txt b/ws/CMakeLists.txt index 94c7673d..fff630ab 100644 --- a/ws/CMakeLists.txt +++ b/ws/CMakeLists.txt @@ -11,8 +11,8 @@ if (NOT WIN32) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic") endif() -#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread") -#set(CMAKE_LD_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread") +# set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread") +# set(CMAKE_LD_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread") set (CMAKE_CXX_STANDARD 14) @@ -22,6 +22,7 @@ include_directories(ws .) include_directories(ws ..) include_directories(ws ../third_party) include_directories(ws ../third_party/statsd-client-cpp/src) +include_directories(ws snake) add_executable(ws ../third_party/msgpack11/msgpack11.cpp @@ -37,6 +38,10 @@ add_executable(ws ixcobra/IXCobraMetricsPublisher.cpp ixcobra/IXCobraMetricsThreadedPublisher.cpp + snake/IXSnakeServer.cpp + snake/IXSnakeProtocol.cpp + snake/IXAppConfig.cpp + IXRedisClient.cpp IXSentryClient.cpp @@ -55,6 +60,7 @@ add_executable(ws ws_cobra_publish.cpp ws_cobra_to_statsd.cpp ws_cobra_to_sentry.cpp + ws_snake.cpp ws.cpp) target_link_libraries(ws ixwebsocket) diff --git a/ws/IXRedisClient.cpp b/ws/IXRedisClient.cpp index cf48a6f6..b122c8d8 100644 --- a/ws/IXRedisClient.cpp +++ b/ws/IXRedisClient.cpp @@ -8,6 +8,7 @@ #include #include +#include #include #include #include @@ -62,28 +63,47 @@ namespace ix return lineValid; } + std::string RedisClient::writeString(const std::string& str) + { + std::stringstream ss; + ss << "$"; + ss << str.size(); + ss << "\r\n"; + ss << str; + ss << "\r\n"; + + return ss.str(); + } bool RedisClient::publish(const std::string& channel, - const std::string& message) + const std::string& message, + std::string& errMsg) { - if (!_socket) return false; + errMsg.clear(); + + if (!_socket) + { + errMsg = "socket is not initialized"; + return false; + } std::stringstream ss; - ss << "PUBLISH "; - ss << channel; - ss << " "; - ss << message; - ss << "\r\n"; + ss << "*3\r\n"; + ss << writeString("PUBLISH"); + ss << writeString(channel); + ss << writeString(message); bool sent = _socket->writeBytes(ss.str(), nullptr); if (!sent) { + errMsg = "Cannot write bytes to socket"; return false; } auto pollResult = _socket->isReadyToRead(-1); if (pollResult == PollResultType::Error) { + errMsg = "Error while polling for result"; return false; } @@ -91,6 +111,13 @@ namespace ix auto lineValid = lineResult.first; auto line = lineResult.second; + // A successful response starts with a : + if (line.empty() || line[0] != ':') + { + errMsg = line; + return false; + } + return lineValid; } diff --git a/ws/IXRedisClient.h b/ws/IXRedisClient.h index ee53a7aa..8148f803 100644 --- a/ws/IXRedisClient.h +++ b/ws/IXRedisClient.h @@ -28,13 +28,16 @@ namespace ix std::string& response); bool publish(const std::string& channel, - const std::string& message); + const std::string& message, + std::string& errMsg); bool subscribe(const std::string& channel, const OnRedisSubscribeResponseCallback& responseCallback, const OnRedisSubscribeCallback& callback); private: + std::string writeString(const std::string& str); + std::shared_ptr _socket; }; } diff --git a/ws/cobraMetricsSample.json b/ws/cobraMetricsSample.json new file mode 100644 index 00000000..1cc41eb1 --- /dev/null +++ b/ws/cobraMetricsSample.json @@ -0,0 +1 @@ +{"foo": "bar", "baz": 123} diff --git a/ws/snake/IXAppConfig.cpp b/ws/snake/IXAppConfig.cpp new file mode 100644 index 00000000..7b1a15dd --- /dev/null +++ b/ws/snake/IXAppConfig.cpp @@ -0,0 +1,52 @@ +/* + * IXSnakeProtocol.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. + */ + +#include "IXSnakeProtocol.h" +#include "IXAppConfig.h" +#include +#include + +namespace snake +{ + bool isAppKeyValid( + const AppConfig& appConfig, + std::string appkey) + { + return appConfig.apps.count(appkey) != 0; + } + + std::string getRoleSecret( + const AppConfig& appConfig, + std::string appkey, + std::string role) + { + if (!isAppKeyValid(appConfig, appkey)) + { + std::cerr << "Missing appkey " << appkey << std::endl; + return std::string(); + } + + auto roles = appConfig.apps[appkey]["roles"]; + auto channel = roles[role]["secret"]; + return channel; + } + + std::string generateNonce() + { + return ix::uuid4(); + } + + void dumpConfig(const AppConfig& appConfig) + { + for (auto&& host : appConfig.redisHosts) + { + std::cout << "redis host: " << host << std::endl; + } + + std::cout << "redis password: " << appConfig.redisPassword << std::endl; + std::cout << "redis port: " << appConfig.redisPort << std::endl; + } +} diff --git a/ws/snake/IXAppConfig.h b/ws/snake/IXAppConfig.h new file mode 100644 index 00000000..6d8b345c --- /dev/null +++ b/ws/snake/IXAppConfig.h @@ -0,0 +1,46 @@ +/* + * IXAppConfig.h + * Author: Benjamin Sergeant + * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. + */ + +#pragma once + +#include +#include + +#include "nlohmann/json.hpp" + +namespace snake +{ + struct AppConfig + { + // Server + std::string hostname; + int port; + + // Redis + std::vector redisHosts; + int redisPort; + std::string redisPassword; + + // AppKeys + nlohmann::json apps; + + // Misc + bool verbose; + }; + + bool isAppKeyValid( + const AppConfig& appConfig, + std::string appkey); + + std::string getRoleSecret( + const AppConfig& appConfig, + std::string appkey, + std::string role); + + std::string generateNonce(); + + void dumpConfig(const AppConfig& appConfig); +} diff --git a/ws/snake/IXSnakeConnectionState.h b/ws/snake/IXSnakeConnectionState.h new file mode 100644 index 00000000..72772b42 --- /dev/null +++ b/ws/snake/IXSnakeConnectionState.h @@ -0,0 +1,40 @@ +/* + * IXSnakeConnectionState.h + * Author: Benjamin Sergeant + * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. + */ + +#pragma once + +#include +#include + +#include +#include "IXRedisClient.h" + +namespace snake +{ + class SnakeConnectionState : public ix::ConnectionState + { + public: + std::string getNonce() { return _nonce; } + void setNonce(const std::string& nonce) { _nonce = nonce; } + + std::string appkey() { return _appkey; } + void setAppkey(const std::string& appkey) { _appkey = appkey; } + + std::string role() { return _role; } + void setRole(const std::string& role) { _role = role; } + + ix::RedisClient& redisClient() { return _redisClient; } + + std::future fut; + + private: + std::string _nonce; + std::string _role; + std::string _appkey; + + ix::RedisClient _redisClient; + }; +} diff --git a/ws/snake/IXSnakeProtocol.cpp b/ws/snake/IXSnakeProtocol.cpp new file mode 100644 index 00000000..033b5119 --- /dev/null +++ b/ws/snake/IXSnakeProtocol.cpp @@ -0,0 +1,307 @@ +/* + * IXSnakeProtocol.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. + */ + +#include "IXSnakeProtocol.h" + +#include +#include + +#include "IXSnakeConnectionState.h" +#include "IXAppConfig.h" + +#include "nlohmann/json.hpp" +#include + +namespace snake +{ + void handleError( + const std::string& action, + std::shared_ptr ws, + nlohmann::json pdu, + const std::string& errMsg) + { + std::string actionError(action); + actionError += "/error"; + + nlohmann::json response = { + {"action", actionError}, + {"id", pdu.value("id", 1)}, + {"body", { + {"reason", errMsg} + }} + }; + ws->sendText(response.dump()); + } + + void handleHandshake( + std::shared_ptr state, + std::shared_ptr ws, + const nlohmann::json& pdu) + { + std::string role = pdu["body"]["data"]["role"]; + + state->setNonce(generateNonce()); + state->setRole(role); + + nlohmann::json response = { + {"action", "auth/handshake/ok"}, + {"id", pdu.value("id", 1)}, + {"body", { + {"data", { + {"nonce", state->getNonce()}, + {"connection_id", state->getId()} + }}, + }} + }; + + auto serializedResponse = response.dump(); + std::cout << "response = " << serializedResponse << std::endl; + + ws->sendText(serializedResponse); + } + + void handleAuth( + std::shared_ptr state, + std::shared_ptr ws, + const AppConfig& appConfig, + const nlohmann::json& pdu) + { + auto secret = getRoleSecret(appConfig, state->appkey(), state->role()); + std::cout << "secret = " << secret << std::endl; + + if (secret.empty()) + { + nlohmann::json response = { + {"action", "auth/authenticate/error"}, + {"id", pdu.value("id", 1)}, + {"body", { + {"error", "authentication_failed"}, + {"reason", "invalid secret"} + }} + }; + ws->sendText(response.dump()); + return; + } + + auto nonce = state->getNonce(); + auto serverHash = ix::hmac(nonce, secret); + std::string clientHash = pdu["body"]["credentials"]["hash"]; + + if (appConfig.verbose) + { + std::cout << serverHash << std::endl; + std::cout << clientHash << std::endl; + } + + if (serverHash != clientHash) + { + nlohmann::json response = { + {"action", "auth/authenticate/error"}, + {"id", pdu.value("id", 1)}, + {"body", { + {"error", "authentication_failed"}, + {"reason", "invalid hash"} + }} + }; + ws->sendText(response.dump()); + return; + } + + nlohmann::json response = { + {"action", "auth/authenticate/ok"}, + {"id", pdu.value("id", 1)}, + {"body", {}} + }; + + ws->sendText(response.dump()); + } + + void handlePublish( + std::shared_ptr state, + std::shared_ptr ws, + const AppConfig& appConfig, + const nlohmann::json& pdu) + { + std::vector channels; + + auto body = pdu["body"]; + if (body.find("channels") != body.end()) + { + for (auto&& channel : body["channels"]) + { + channels.push_back(channel); + } + } + else if (body.find("channel") != body.end()) + { + channels.push_back(body["channel"]); + } + else + { + std::stringstream ss; + ss << "Missing channels or channel field in publish data"; + handleError("rtm/publish", ws, pdu, ss.str()); + return; + } + + for (auto&& channel : channels) + { + std::stringstream ss; + ss << state->appkey() + << "::" + << channel; + + std::string errMsg; + if (!state->redisClient().publish(ss.str(), pdu.dump(), errMsg)) + { + std::stringstream ss; + ss << "Cannot publish to redis host " << errMsg; + handleError("rtm/publish", ws, pdu, ss.str()); + return; + } + } + } + + // + // FIXME: this is not cancellable. We should be able to cancel the redis subscription + // + void handleRedisSubscription( + std::shared_ptr state, + std::shared_ptr ws, + const AppConfig& appConfig, + const nlohmann::json& pdu) + { + std::string channel = pdu["body"]["channel"]; + std::string subscriptionId = channel; + + std::stringstream ss; + ss << state->appkey() + << "::" + << channel; + + std::string appChannel(ss.str()); + + ix::RedisClient redisClient; + int port = appConfig.redisPort; + + auto urls = appConfig.redisHosts; + std::string hostname(urls[0]); + + // Connect to redis first + if (!redisClient.connect(hostname, port)) + { + std::stringstream ss; + ss << "Cannot connect to redis host " << hostname << ":" << port; + handleError("rtm/subscribe", ws, pdu, ss.str()); + return; + } + + std::cout << "Connected to redis host " << hostname << ":" << port << std::endl; + + // Now authenticate, if needed + if (!appConfig.redisPassword.empty()) + { + std::string authResponse; + if (!redisClient.auth(appConfig.redisPassword, authResponse)) + { + std::stringstream ss; + ss << "Cannot authenticated to redis"; + handleError("rtm/subscribe", ws, pdu, ss.str()); + return; + } + std::cout << "Auth response: " << authResponse << ":" << port << std::endl; + } + + int id = 0; + auto callback = [ws, &id, &subscriptionId](const std::string& messageStr) + { + auto msg = nlohmann::json::parse(messageStr); + + nlohmann::json response = { + {"action", "rtm/subscription/data"}, + {"id", id++}, + {"body", { + {"subscription_id", subscriptionId}, + {"messages", {{msg}}} + }} + }; + + ws->sendText(response.dump()); + }; + + auto responseCallback = [ws, pdu, &subscriptionId](const std::string& redisResponse) + { + std::cout << "Redis subscribe response: " << redisResponse << std::endl; + + // Success + nlohmann::json response = { + {"action", "rtm/subscribe/ok"}, + {"id", pdu.value("id", 1)}, + {"body", { + {"subscription_id", subscriptionId} + }} + }; + ws->sendText(response.dump()); + }; + + std::cerr << "Subscribing to " << appChannel << "..." << std::endl; + if (!redisClient.subscribe(appChannel, responseCallback, callback)) + { + std::stringstream ss; + ss << "Error subscribing to channel " << appChannel; + handleError("rtm/subscribe", ws, pdu, ss.str()); + return; + } + } + + void handleSubscribe( + std::shared_ptr state, + std::shared_ptr ws, + const AppConfig& appConfig, + const nlohmann::json& pdu) + { + state->fut = std::async(std::launch::async, + handleRedisSubscription, + state, + ws, + appConfig, + pdu); + } + + void processCobraMessage( + std::shared_ptr state, + std::shared_ptr ws, + const AppConfig& appConfig, + const std::string& str) + { + auto pdu = nlohmann::json::parse(str); + std::cout << "Got " << str << std::endl; + + auto action = pdu["action"]; + std::cout << "action = " << action << std::endl; + + if (action == "auth/handshake") + { + handleHandshake(state, ws, pdu); + } + else if (action == "auth/authenticate") + { + handleAuth(state, ws, appConfig, pdu); + } + else if (action == "rtm/publish") + { + handlePublish(state, ws, appConfig, pdu); + } + else if (action == "rtm/subscribe") + { + handleSubscribe(state, ws, appConfig, pdu); + } + else + { + std::cerr << "Unhandled action: " << action << std::endl; + } + } +} diff --git a/ws/snake/IXSnakeProtocol.h b/ws/snake/IXSnakeProtocol.h new file mode 100644 index 00000000..636eabfa --- /dev/null +++ b/ws/snake/IXSnakeProtocol.h @@ -0,0 +1,26 @@ +/* + * IXSnakeProtocol.h + * Author: Benjamin Sergeant + * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. + */ + +#pragma once + +#include + +namespace ix +{ + class WebSocket; +} + +namespace snake +{ + class SnakeConnectionState; + struct AppConfig; + + void processCobraMessage( + std::shared_ptr state, + std::shared_ptr ws, + const AppConfig& appConfig, + const std::string& str); +} diff --git a/ws/snake/IXSnakeServer.cpp b/ws/snake/IXSnakeServer.cpp new file mode 100644 index 00000000..93c514e5 --- /dev/null +++ b/ws/snake/IXSnakeServer.cpp @@ -0,0 +1,130 @@ +/* + * IXSnakeServer.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. + */ + +#include +#include +#include +#include + +#include +#include + +namespace snake +{ + SnakeServer::SnakeServer(const AppConfig& appConfig) : + _appConfig(appConfig), + _server(appConfig.port, appConfig.hostname) + { + ; + } + + // + // Parse appkey from this uri. Won't work if multiple args are present in the uri + // Uri: /v2?appkey=FC2F10139A2BAc53BB72D9db967b024f + // + std::string SnakeServer::parseAppKey(const std::string& path) + { + std::string::size_type idx; + + idx = path.rfind('='); + if (idx != std::string::npos) + { + std::string appkey = path.substr(idx+1); + return appkey; + } + else + { + return std::string(); + } + } + + bool SnakeServer::run() + { + std::cout << "Listening on " << _appConfig.hostname << ":" << _appConfig.port << std::endl; + + auto factory = []() -> std::shared_ptr + { + return std::make_shared(); + }; + _server.setConnectionStateFactory(factory); + + _server.setOnConnectionCallback( + [this](std::shared_ptr webSocket, + std::shared_ptr connectionState) + { + auto state = std::dynamic_pointer_cast(connectionState); + + webSocket->setOnMessageCallback( + [this, webSocket, state](ix::WebSocketMessageType messageType, + const std::string& str, + size_t wireSize, + const ix::WebSocketErrorInfo& error, + const ix::WebSocketOpenInfo& openInfo, + const ix::WebSocketCloseInfo& closeInfo) + { + if (messageType == ix::WebSocket_MessageType_Open) + { + std::cerr << "New connection" << std::endl; + std::cerr << "id: " << state->getId() << std::endl; + std::cerr << "Uri: " << openInfo.uri << std::endl; + std::cerr << "Headers:" << std::endl; + for (auto it : openInfo.headers) + { + std::cerr << it.first << ": " << it.second << std::endl; + } + + std::string appkey = parseAppKey(openInfo.uri); + state->setAppkey(appkey); + + // Connect to redis first + if (!state->redisClient().connect(_appConfig.redisHosts[0], + _appConfig.redisPort)) + { + std::cerr << "Cannot connect to redis host" << std::endl; + } + } + else if (messageType == ix::WebSocket_MessageType_Close) + { + std::cerr << "Closed connection" + << " code " << closeInfo.code + << " reason " << closeInfo.reason << std::endl; + } + else if (messageType == ix::WebSocket_MessageType_Error) + { + std::stringstream ss; + ss << "Connection error: " << error.reason << std::endl; + ss << "#retries: " << error.retries << std::endl; + ss << "Wait time(ms): " << error.wait_time << std::endl; + ss << "HTTP Status: " << error.http_status << std::endl; + std::cerr << ss.str(); + } + else if (messageType == ix::WebSocket_MessageType_Fragment) + { + std::cerr << "Received message fragment" << std::endl; + } + else if (messageType == ix::WebSocket_MessageType_Message) + { + std::cerr << "Received " << wireSize << " bytes" << std::endl; + processCobraMessage(state, webSocket, _appConfig, str); + } + } + ); + } + ); + + auto res = _server.listen(); + if (!res.first) + { + std::cerr << res.second << std::endl; + return false; + } + + _server.start(); + _server.wait(); + + return true; + } +} diff --git a/ws/snake/IXSnakeServer.h b/ws/snake/IXSnakeServer.h new file mode 100644 index 00000000..0cc7f1dc --- /dev/null +++ b/ws/snake/IXSnakeServer.h @@ -0,0 +1,30 @@ +/* + * IXSnakeServer.h + * Author: Benjamin Sergeant + * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. + */ + +#pragma once + +#include + +#include +#include "IXAppConfig.h" + +namespace snake +{ + class SnakeServer + { + public: + SnakeServer(const AppConfig& appConfig); + ~SnakeServer() = default; + + bool run(); + + private: + std::string parseAppKey(const std::string& path); + + AppConfig _appConfig; + ix::WebSocketServer _server; + }; +} diff --git a/ws/snake/appsConfig.json b/ws/snake/appsConfig.json new file mode 100644 index 00000000..14f8f48b --- /dev/null +++ b/ws/snake/appsConfig.json @@ -0,0 +1,14 @@ +{ + "apps": { + "FC2F10139A2BAc53BB72D9db967b024f": { + "roles": { + "_sub": { + "secret": "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba" + }, + "_pub": { + "secret": "1c04DB8fFe76A4EeFE3E318C72d771db" + } + } + } + } +} diff --git a/ws/ws.cpp b/ws/ws.cpp index 8a8ba8c6..99b16b76 100644 --- a/ws/ws.cpp +++ b/ws/ws.cpp @@ -45,12 +45,16 @@ int main(int argc, char** argv) std::string prefix("ws.test.v0"); std::string fields; std::string dsn; + std::string redisHosts("127.0.0.1"); + std::string redisPassword; + std::string appsConfigPath("appsConfig.json"); bool headersOnly = false; bool followRedirects = false; bool verbose = false; bool save = false; bool compress = false; bool strict = false; + bool stress = false; int port = 8080; int redisPort = 6379; int statsdPort = 8125; @@ -144,6 +148,7 @@ int main(int argc, char** argv) cobraPublish->add_option("--pidfile", pidfile, "Pid file"); cobraPublish->add_option("path", path, "Path to the file to send") ->required()->check(CLI::ExistingPath); + cobraPublish->add_flag("--stress", stress, "Stress mode"); CLI::App* cobra2statsd = app.add_subcommand("cobra_to_statsd", "Cobra to statsd"); cobra2statsd->add_option("--appkey", appkey, "Appkey"); @@ -170,6 +175,17 @@ int main(int argc, char** argv) cobra2sentry->add_flag("-s", strict, "Strict mode. Error out when sending to sentry fails"); cobra2sentry->add_option("--pidfile", pidfile, "Pid file"); + CLI::App* runApp = app.add_subcommand("snake", "Snake server"); + runApp->add_option("--port", port, "Connection url"); + runApp->add_option("--host", hostname, "Hostname"); + runApp->add_option("--pidfile", pidfile, "Pid file"); + runApp->add_option("--redis_hosts", redisHosts, "Redis hosts"); + runApp->add_option("--redis_port", redisPort, "Redis hosts"); + runApp->add_option("--redis_password", redisPassword, "Redis password"); + runApp->add_option("--apps_config_path", appsConfigPath, "Path to auth data") + ->check(CLI::ExistingPath); + runApp->add_flag("-v", verbose, "Verbose"); + CLI11_PARSE(app, argc, argv); // pid file handling @@ -242,7 +258,7 @@ int main(int argc, char** argv) { return ix::ws_cobra_publish_main(appkey, endpoint, rolename, rolesecret, - channel, path); + channel, path, stress); } else if (app.got_subcommand("cobra_to_statsd")) { @@ -258,6 +274,13 @@ int main(int argc, char** argv) channel, dsn, verbose, strict, jobs); } + else if (app.got_subcommand("snake")) + { + return ix::ws_snake_main(port, hostname, + redisHosts, redisPort, + redisPassword, verbose, + appsConfigPath); + } return 1; } diff --git a/ws/ws.h b/ws/ws.h index 050c1d9b..216d7859 100644 --- a/ws/ws.h +++ b/ws/ws.h @@ -64,7 +64,8 @@ namespace ix const std::string& rolename, const std::string& rolesecret, const std::string& channel, - const std::string& path); + const std::string& path, + bool stress); int ws_cobra_to_statsd_main(const std::string& appkey, const std::string& endpoint, @@ -86,4 +87,12 @@ namespace ix bool verbose, bool strict, int jobs); + + int ws_snake_main(int port, + const std::string& hostname, + const std::string& redisHosts, + int redisPort, + const std::string& redisPassword, + bool verbose, + const std::string& appsConfigPath); } diff --git a/ws/ws_cobra_publish.cpp b/ws/ws_cobra_publish.cpp index 3e3fa211..e42bbeef 100644 --- a/ws/ws_cobra_publish.cpp +++ b/ws/ws_cobra_publish.cpp @@ -20,7 +20,8 @@ namespace ix const std::string& rolename, const std::string& rolesecret, const std::string& channel, - const std::string& path) + const std::string& path, + bool stress) { CobraMetricsPublisher cobraMetricsPublisher; cobraMetricsPublisher.enable(true); @@ -39,7 +40,25 @@ namespace ix Json::Reader reader; if (!reader.parse(str, data)) return 1; - cobraMetricsPublisher.push(std::string("foo_id"), data); + if (!stress) + { + cobraMetricsPublisher.push(channel, data); + } + else + { + // Stress mode to try to trigger server and client bugs + while (true) + { + for (int i = 0 ; i < 1000; ++i) + { + cobraMetricsPublisher.push(channel, data); + } + cobraMetricsPublisher.suspend(); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + cobraMetricsPublisher.resume(); + } + } // Wait a bit for the message to get a chance to be sent // there isn't any ack on publish right now so it's the best we can do diff --git a/ws/ws_redis_publish.cpp b/ws/ws_redis_publish.cpp index 298eb657..c5c4139b 100644 --- a/ws/ws_redis_publish.cpp +++ b/ws/ws_redis_publish.cpp @@ -36,13 +36,14 @@ namespace ix std::cout << "Auth response: " << authResponse << ":" << port << std::endl; } + std::string errMsg; for (int i = 0; i < count; i++) { - //std::cerr << "Publishing message " << message - // << " to " << channel << "..." << std::endl; - if (!redisClient.publish(channel, message)) + if (!redisClient.publish(channel, message, errMsg)) { - std::cerr << "Error publishing to channel " << channel << std::endl; + std::cerr << "Error publishing to channel " << channel + << "error: " << errMsg + << std::endl; return 1; } } diff --git a/ws/ws_snake.cpp b/ws/ws_snake.cpp new file mode 100644 index 00000000..4debc650 --- /dev/null +++ b/ws/ws_snake.cpp @@ -0,0 +1,81 @@ +/* + * snake_run.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2018 Machine Zone, Inc. All rights reserved. + */ + +#include "IXSnakeServer.h" + +#include +#include +#include + +namespace +{ + std::vector load(const std::string& path) + { + std::vector memblock; + + std::ifstream file(path); + if (!file.is_open()) return memblock; + + file.seekg(0, file.end); + std::streamoff size = file.tellg(); + file.seekg(0, file.beg); + + memblock.resize(size); + file.read((char*)&memblock.front(), static_cast(size)); + + return memblock; + } + + std::string readAsString(const std::string& path) + { + auto vec = load(path); + return std::string(vec.begin(), vec.end()); + } +} + +namespace ix +{ + int ws_snake_main(int port, + const std::string& hostname, + const std::string& redisHosts, + int redisPort, + const std::string& redisPassword, + bool verbose, + const std::string& appsConfigPath) + { + snake::AppConfig appConfig; + appConfig.port = port; + appConfig.hostname = hostname; + appConfig.verbose = verbose; + appConfig.redisPort = redisPort; + appConfig.redisPassword = redisPassword; + + // Parse config file + auto str = readAsString(appsConfigPath); + if (str.empty()) + { + std::cout << "Cannot read content of " << appsConfigPath << std::endl; + return 1; + } + + std::cout << str << std::endl; + auto apps = nlohmann::json::parse(str); + appConfig.apps = apps["apps"]; + + std::string token; + std::stringstream tokenStream(redisHosts); + while (std::getline(tokenStream, token, ';')) + { + appConfig.redisHosts.push_back(token); + } + + // Display config on the terminal for debugging + dumpConfig(appConfig); + + snake::SnakeServer snakeServer(appConfig); + return snakeServer.run() ? 0 : 1; + } +}