snake: stream sql mock + add republished channel option

This commit is contained in:
Benjamin Sergeant 2020-07-10 15:06:55 -07:00
parent c98959b895
commit 26a1e63626
11 changed files with 102 additions and 4 deletions

View File

@ -7,12 +7,14 @@ set (IXSNAKE_SOURCES
ixsnake/IXSnakeServer.cpp ixsnake/IXSnakeServer.cpp
ixsnake/IXSnakeProtocol.cpp ixsnake/IXSnakeProtocol.cpp
ixsnake/IXAppConfig.cpp ixsnake/IXAppConfig.cpp
ixsnake/IXStreamSql.cpp
) )
set (IXSNAKE_HEADERS set (IXSNAKE_HEADERS
ixsnake/IXSnakeServer.h ixsnake/IXSnakeServer.h
ixsnake/IXSnakeProtocol.h ixsnake/IXSnakeProtocol.h
ixsnake/IXAppConfig.h ixsnake/IXAppConfig.h
ixsnake/IXStreamSql.h
) )
add_library(ixsnake STATIC add_library(ixsnake STATIC

View File

@ -33,6 +33,9 @@ namespace snake
// Misc // Misc
bool verbose; bool verbose;
bool disablePong; bool disablePong;
// If non empty, every published message gets republished to a given channel
std::string republishChannel;
}; };
bool isAppKeyValid(const AppConfig& appConfig, std::string appkey); bool isAppKeyValid(const AppConfig& appConfig, std::string appkey);

View File

@ -30,6 +30,7 @@ namespace snake
{ {
return _appkey; return _appkey;
} }
void setAppkey(const std::string& appkey) void setAppkey(const std::string& appkey)
{ {
_appkey = appkey; _appkey = appkey;
@ -39,6 +40,7 @@ namespace snake
{ {
return _role; return _role;
} }
void setRole(const std::string& role) void setRole(const std::string& role)
{ {
_role = role; _role = role;

View File

@ -91,6 +91,7 @@ namespace snake
void handlePublish(std::shared_ptr<SnakeConnectionState> state, void handlePublish(std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws, std::shared_ptr<ix::WebSocket> ws,
const AppConfig& appConfig,
const nlohmann::json& pdu) const nlohmann::json& pdu)
{ {
std::vector<std::string> channels; std::vector<std::string> channels;
@ -115,6 +116,12 @@ namespace snake
return; return;
} }
// add an extra channel if the config has one specified
if (!appConfig.republishChannel.empty())
{
channels.push_back(appConfig.republishChannel);
}
for (auto&& channel : channels) for (auto&& channel : channels)
{ {
std::stringstream ss; std::stringstream ss;
@ -279,7 +286,7 @@ namespace snake
} }
else if (action == "rtm/publish") else if (action == "rtm/publish")
{ {
handlePublish(state, ws, pdu); handlePublish(state, ws, appConfig, pdu);
} }
else if (action == "rtm/subscribe") else if (action == "rtm/subscribe")
{ {

View File

@ -0,0 +1,21 @@
/*
* IXStreamSql.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#include "IXStreamSql.h"
namespace snake
{
StreamSql::StreamSql(const std::string& sqlFilter)
{
;
}
bool StreamSql::match(const nlohmann::json& pdu)
{
return true;
}
} // namespace snake

View File

@ -0,0 +1,24 @@
/*
* IXStreamSql.h
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <string>
#include "nlohmann/json.hpp"
namespace snake
{
class StreamSql
{
public:
StreamSql(const std::string& sqlFilter);
~StreamSql() = default;
bool match(const nlohmann::json& pdu);
private:
};
}

View File

@ -57,6 +57,7 @@ set (SOURCES
IXWebSocketChatTest.cpp IXWebSocketChatTest.cpp
IXWebSocketBroadcastTest.cpp IXWebSocketBroadcastTest.cpp
IXWebSocketPerMessageDeflateCompressorTest.cpp IXWebSocketPerMessageDeflateCompressorTest.cpp
IXStreamSqlTest.cpp
) )
# Some unittest don't work on windows yet # Some unittest don't work on windows yet

32
test/IXStreamSqlTest.cpp Normal file
View File

@ -0,0 +1,32 @@
/*
* IXStreamSqlTest.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone. All rights reserved.
*/
#include "IXTest.h"
#include "catch.hpp"
#include <iostream>
#include <ixsnake/IXStreamSql.h>
#include <string.h>
using namespace ix;
namespace ix
{
TEST_CASE("stream_sql", "[streamsql]")
{
SECTION("expression A")
{
snake::StreamSql streamSql("select * from subscriber_republished_v1_neo where session LIKE '%{self.session_id}%'");
nlohmann::json msg = {
{"action", "auth/authenticate/error"},
{"id", 1},
{"body", {{"error", "authentication_failed"}, {"reason", "invalid secret"}}}};
REQUIRE(streamSql.match(msg));
}
}
} // namespace ix

View File

@ -122,6 +122,7 @@ int main(int argc, char** argv)
std::string key; std::string key;
std::string logfile; std::string logfile;
std::string scriptPath; std::string scriptPath;
std::string republishChannel;
ix::SocketTLSOptions tlsOptions; ix::SocketTLSOptions tlsOptions;
ix::CobraConfig cobraConfig; ix::CobraConfig cobraConfig;
ix::CobraBotConfig cobraBotConfig; ix::CobraBotConfig cobraBotConfig;
@ -391,6 +392,7 @@ int main(int argc, char** argv)
snakeApp->add_option("--redis_password", redisPassword, "Redis password"); snakeApp->add_option("--redis_password", redisPassword, "Redis password");
snakeApp->add_option("--apps_config_path", appsConfigPath, "Path to auth data") snakeApp->add_option("--apps_config_path", appsConfigPath, "Path to auth data")
->check(CLI::ExistingPath); ->check(CLI::ExistingPath);
snakeApp->add_option("--republish_channel", republishChannel, "Republish channel");
snakeApp->add_flag("-v", verbose, "Verbose"); snakeApp->add_flag("-v", verbose, "Verbose");
snakeApp->add_flag("-d", disablePong, "Disable Pongs"); snakeApp->add_flag("-d", disablePong, "Disable Pongs");
addTLSOptions(snakeApp); addTLSOptions(snakeApp);
@ -637,7 +639,8 @@ int main(int argc, char** argv)
verbose, verbose,
appsConfigPath, appsConfigPath,
tlsOptions, tlsOptions,
disablePong); disablePong,
republishChannel);
} }
else if (app.got_subcommand("httpd")) else if (app.got_subcommand("httpd"))
{ {

View File

@ -103,7 +103,8 @@ namespace ix
bool verbose, bool verbose,
const std::string& appsConfigPath, const std::string& appsConfigPath,
const ix::SocketTLSOptions& tlsOptions, const ix::SocketTLSOptions& tlsOptions,
bool disablePong); bool disablePong,
const std::string& republishChannel);
int ws_httpd_main(int port, int ws_httpd_main(int port,
const std::string& hostname, const std::string& hostname,

View File

@ -45,7 +45,8 @@ namespace ix
bool verbose, bool verbose,
const std::string& appsConfigPath, const std::string& appsConfigPath,
const SocketTLSOptions& socketTLSOptions, const SocketTLSOptions& socketTLSOptions,
bool disablePong) bool disablePong,
const std::string& republishChannel)
{ {
snake::AppConfig appConfig; snake::AppConfig appConfig;
appConfig.port = port; appConfig.port = port;
@ -55,6 +56,7 @@ namespace ix
appConfig.redisPassword = redisPassword; appConfig.redisPassword = redisPassword;
appConfig.socketTLSOptions = socketTLSOptions; appConfig.socketTLSOptions = socketTLSOptions;
appConfig.disablePong = disablePong; appConfig.disablePong = disablePong;
appConfig.republishChannel = republishChannel;
// Parse config file // Parse config file
auto str = readAsString(appsConfigPath); auto str = readAsString(appsConfigPath);