diff --git a/ixsnake/ixsnake/IXSnakeProtocol.cpp b/ixsnake/ixsnake/IXSnakeProtocol.cpp index 8b85c136..99310546 100644 --- a/ixsnake/ixsnake/IXSnakeProtocol.cpp +++ b/ixsnake/ixsnake/IXSnakeProtocol.cpp @@ -8,6 +8,7 @@ #include "IXAppConfig.h" #include "IXSnakeConnectionState.h" +#include "IXStreamSql.h" #include "nlohmann/json.hpp" #include #include @@ -187,12 +188,25 @@ namespace snake } } + std::string filterStr; + if (pdu["body"].find("filter") != pdu["body"].end()) + { + std::string filterStr = pdu["body"]["filter"]; + } + + std::unique_ptr streamSql = std::make_unique(filterStr); + int id = 0; - auto callback = [ws, &id, &subscriptionId](const std::string& messageStr) { + auto callback = [ws, &id, &subscriptionId, &streamSql](const std::string& messageStr) { auto msg = nlohmann::json::parse(messageStr); msg = msg["body"]["message"]; + if (streamSql->valid() && !streamSql->match(msg)) + { + return; + } + nlohmann::json response = { {"action", "rtm/subscription/data"}, {"id", id++}, diff --git a/ixsnake/ixsnake/IXStreamSql.cpp b/ixsnake/ixsnake/IXStreamSql.cpp index 49d4d24c..6431b34c 100644 --- a/ixsnake/ixsnake/IXStreamSql.cpp +++ b/ixsnake/ixsnake/IXStreamSql.cpp @@ -2,20 +2,62 @@ * IXStreamSql.cpp * Author: Benjamin Sergeant * Copyright (c) 2020 Machine Zone, Inc. All rights reserved. + * + * Super simple hacked up version of a stream sql expression, + * that only supports non nested field evaluation */ #include "IXStreamSql.h" +#include +#include namespace snake { StreamSql::StreamSql(const std::string& sqlFilter) + : _valid(false) { - ; + std::string token; + std::stringstream tokenStream(sqlFilter); + std::vector tokens; + + // Split by ' ' + while (std::getline(tokenStream, token, ' ')) + { + tokens.push_back(token); + } + + _valid = tokens.size() == 8; + if (!_valid) return; + + _field = tokens[5]; + _operator = tokens[6]; + _value = tokens[7]; + + // remove single quotes + _value = _value.substr(1, _value.size() - 2); + + if (_operator == "LIKE") + { + _value = _value.substr(1, _value.size() - 2); + } } - bool StreamSql::match(const nlohmann::json& pdu) + bool StreamSql::valid() const { - return true; + return _valid; + } + + bool StreamSql::match(const nlohmann::json& msg) + { + if (!_valid) return false; + + if (msg.find(_field) == msg.end()) + { + return false; + } + + std::string value = msg[_field]; + return value == _value; } } // namespace snake diff --git a/ixsnake/ixsnake/IXStreamSql.h b/ixsnake/ixsnake/IXStreamSql.h index 961888fc..812a2ad8 100644 --- a/ixsnake/ixsnake/IXStreamSql.h +++ b/ixsnake/ixsnake/IXStreamSql.h @@ -14,11 +14,16 @@ namespace snake class StreamSql { public: - StreamSql(const std::string& sqlFilter); + StreamSql(const std::string& sqlFilter = std::string()); ~StreamSql() = default; - bool match(const nlohmann::json& pdu); + bool match(const nlohmann::json& msg); + bool valid() const; private: + std::string _field; + std::string _operator; + std::string _value; + bool _valid; }; } diff --git a/test/IXCobraToStdoutBotTest.cpp b/test/IXCobraToStdoutBotTest.cpp index a4c58304..4b864206 100644 --- a/test/IXCobraToStdoutBotTest.cpp +++ b/test/IXCobraToStdoutBotTest.cpp @@ -92,6 +92,9 @@ TEST_CASE("Cobra_to_stdout_bot", "[cobra_bots]") cobraBotConfig.enableHeartbeat = false; bool quiet = false; + cobraBotConfig.filter = + std::string("select * from `") + channel + "` where id = 'sms_metric_A_id'"; + // We could try to capture the output ... not sure how. bool fluentd = true; diff --git a/test/IXStreamSqlTest.cpp b/test/IXStreamSqlTest.cpp index 6fde65a2..e14c126d 100644 --- a/test/IXStreamSqlTest.cpp +++ b/test/IXStreamSqlTest.cpp @@ -18,14 +18,24 @@ namespace ix { SECTION("expression A") { - snake::StreamSql streamSql("select * from subscriber_republished_v1_neo where session LIKE '%{self.session_id}%'"); + snake::StreamSql streamSql( + "select * from subscriber_republished_v1_neo where session LIKE '%123456%'"); - nlohmann::json msg = { - {"action", "auth/authenticate/error"}, - {"id", 1}, - {"body", {{"error", "authentication_failed"}, {"reason", "invalid secret"}}}}; + nlohmann::json msg = {{"session", "123456"}, {"id", "foo_id"}, {"timestamp", 12}}; - REQUIRE(streamSql.match(msg)); + CHECK(streamSql.match(msg)); + } + + SECTION("expression A") + { + snake::StreamSql streamSql("select * from `subscriber_republished_v1_neo` where " + "session = '30091320ed8d4e50b758f8409b83bed7'"); + + nlohmann::json msg = {{"session", "30091320ed8d4e50b758f8409b83bed7"}, + {"id", "foo_id"}, + {"timestamp", 12}}; + + CHECK(streamSql.match(msg)); } }