diff --git a/ixsnake/CMakeLists.txt b/ixsnake/CMakeLists.txt index bf11ed8a..bd240d00 100644 --- a/ixsnake/CMakeLists.txt +++ b/ixsnake/CMakeLists.txt @@ -7,12 +7,14 @@ set (IXSNAKE_SOURCES ixsnake/IXSnakeServer.cpp ixsnake/IXSnakeProtocol.cpp ixsnake/IXAppConfig.cpp + ixsnake/IXStreamSql.cpp ) set (IXSNAKE_HEADERS ixsnake/IXSnakeServer.h ixsnake/IXSnakeProtocol.h ixsnake/IXAppConfig.h + ixsnake/IXStreamSql.h ) add_library(ixsnake STATIC diff --git a/ixsnake/ixsnake/IXAppConfig.h b/ixsnake/ixsnake/IXAppConfig.h index bad529da..bdc929df 100644 --- a/ixsnake/ixsnake/IXAppConfig.h +++ b/ixsnake/ixsnake/IXAppConfig.h @@ -33,6 +33,9 @@ namespace snake // Misc bool verbose; bool disablePong; + + // If non empty, every published message gets republished to a given channel + std::string republishChannel; }; bool isAppKeyValid(const AppConfig& appConfig, std::string appkey); diff --git a/ixsnake/ixsnake/IXSnakeConnectionState.h b/ixsnake/ixsnake/IXSnakeConnectionState.h index db6a0f35..4e995342 100644 --- a/ixsnake/ixsnake/IXSnakeConnectionState.h +++ b/ixsnake/ixsnake/IXSnakeConnectionState.h @@ -30,6 +30,7 @@ namespace snake { return _appkey; } + void setAppkey(const std::string& appkey) { _appkey = appkey; @@ -39,6 +40,7 @@ namespace snake { return _role; } + void setRole(const std::string& role) { _role = role; diff --git a/ixsnake/ixsnake/IXSnakeProtocol.cpp b/ixsnake/ixsnake/IXSnakeProtocol.cpp index d1941261..8b85c136 100644 --- a/ixsnake/ixsnake/IXSnakeProtocol.cpp +++ b/ixsnake/ixsnake/IXSnakeProtocol.cpp @@ -91,6 +91,7 @@ namespace snake void handlePublish(std::shared_ptr state, std::shared_ptr ws, + const AppConfig& appConfig, const nlohmann::json& pdu) { std::vector channels; @@ -115,6 +116,12 @@ namespace snake return; } + // add an extra channel if the config has one specified + if (!appConfig.republishChannel.empty()) + { + channels.push_back(appConfig.republishChannel); + } + for (auto&& channel : channels) { std::stringstream ss; @@ -279,7 +286,7 @@ namespace snake } else if (action == "rtm/publish") { - handlePublish(state, ws, pdu); + handlePublish(state, ws, appConfig, pdu); } else if (action == "rtm/subscribe") { diff --git a/ixsnake/ixsnake/IXStreamSql.cpp b/ixsnake/ixsnake/IXStreamSql.cpp new file mode 100644 index 00000000..49d4d24c --- /dev/null +++ b/ixsnake/ixsnake/IXStreamSql.cpp @@ -0,0 +1,21 @@ +/* + * IXStreamSql.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2020 Machine Zone, Inc. All rights reserved. + */ + +#include "IXStreamSql.h" + +namespace snake +{ + StreamSql::StreamSql(const std::string& sqlFilter) + { + ; + } + + bool StreamSql::match(const nlohmann::json& pdu) + { + return true; + } + +} // namespace snake diff --git a/ixsnake/ixsnake/IXStreamSql.h b/ixsnake/ixsnake/IXStreamSql.h new file mode 100644 index 00000000..961888fc --- /dev/null +++ b/ixsnake/ixsnake/IXStreamSql.h @@ -0,0 +1,24 @@ +/* + * IXStreamSql.h + * Author: Benjamin Sergeant + * Copyright (c) 2020 Machine Zone, Inc. All rights reserved. + */ + +#pragma once + +#include +#include "nlohmann/json.hpp" + +namespace snake +{ + class StreamSql + { + public: + StreamSql(const std::string& sqlFilter); + ~StreamSql() = default; + + bool match(const nlohmann::json& pdu); + + private: + }; +} diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 1e874073..bc223945 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -57,6 +57,7 @@ set (SOURCES IXWebSocketChatTest.cpp IXWebSocketBroadcastTest.cpp IXWebSocketPerMessageDeflateCompressorTest.cpp + IXStreamSqlTest.cpp ) # Some unittest don't work on windows yet diff --git a/test/IXStreamSqlTest.cpp b/test/IXStreamSqlTest.cpp new file mode 100644 index 00000000..6fde65a2 --- /dev/null +++ b/test/IXStreamSqlTest.cpp @@ -0,0 +1,32 @@ +/* + * IXStreamSqlTest.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2020 Machine Zone. All rights reserved. + */ + +#include "IXTest.h" +#include "catch.hpp" +#include +#include +#include + +using namespace ix; + +namespace ix +{ + TEST_CASE("stream_sql", "[streamsql]") + { + SECTION("expression A") + { + snake::StreamSql streamSql("select * from subscriber_republished_v1_neo where session LIKE '%{self.session_id}%'"); + + nlohmann::json msg = { + {"action", "auth/authenticate/error"}, + {"id", 1}, + {"body", {{"error", "authentication_failed"}, {"reason", "invalid secret"}}}}; + + REQUIRE(streamSql.match(msg)); + } + } + +} // namespace ix diff --git a/ws/ws.cpp b/ws/ws.cpp index d94d1fe7..5d3dbd4b 100644 --- a/ws/ws.cpp +++ b/ws/ws.cpp @@ -122,6 +122,7 @@ int main(int argc, char** argv) std::string key; std::string logfile; std::string scriptPath; + std::string republishChannel; ix::SocketTLSOptions tlsOptions; ix::CobraConfig cobraConfig; ix::CobraBotConfig cobraBotConfig; @@ -391,6 +392,7 @@ int main(int argc, char** argv) snakeApp->add_option("--redis_password", redisPassword, "Redis password"); snakeApp->add_option("--apps_config_path", appsConfigPath, "Path to auth data") ->check(CLI::ExistingPath); + snakeApp->add_option("--republish_channel", republishChannel, "Republish channel"); snakeApp->add_flag("-v", verbose, "Verbose"); snakeApp->add_flag("-d", disablePong, "Disable Pongs"); addTLSOptions(snakeApp); @@ -637,7 +639,8 @@ int main(int argc, char** argv) verbose, appsConfigPath, tlsOptions, - disablePong); + disablePong, + republishChannel); } else if (app.got_subcommand("httpd")) { diff --git a/ws/ws.h b/ws/ws.h index 6109660b..d5837b18 100644 --- a/ws/ws.h +++ b/ws/ws.h @@ -103,7 +103,8 @@ namespace ix bool verbose, const std::string& appsConfigPath, const ix::SocketTLSOptions& tlsOptions, - bool disablePong); + bool disablePong, + const std::string& republishChannel); int ws_httpd_main(int port, const std::string& hostname, diff --git a/ws/ws_snake.cpp b/ws/ws_snake.cpp index 6b3e31bf..2f761453 100644 --- a/ws/ws_snake.cpp +++ b/ws/ws_snake.cpp @@ -45,7 +45,8 @@ namespace ix bool verbose, const std::string& appsConfigPath, const SocketTLSOptions& socketTLSOptions, - bool disablePong) + bool disablePong, + const std::string& republishChannel) { snake::AppConfig appConfig; appConfig.port = port; @@ -55,6 +56,7 @@ namespace ix appConfig.redisPassword = redisPassword; appConfig.socketTLSOptions = socketTLSOptions; appConfig.disablePong = disablePong; + appConfig.republishChannel = republishChannel; // Parse config file auto str = readAsString(appsConfigPath);