implement very very simple stream sql support

This commit is contained in:
Benjamin Sergeant 2020-07-10 16:07:51 -07:00
parent 26a1e63626
commit ca829a3a98
5 changed files with 86 additions and 12 deletions

View File

@ -8,6 +8,7 @@
#include "IXAppConfig.h"
#include "IXSnakeConnectionState.h"
#include "IXStreamSql.h"
#include "nlohmann/json.hpp"
#include <iostream>
#include <ixcore/utils/IXCoreLogger.h>
@ -187,12 +188,25 @@ namespace snake
}
}
std::string filterStr;
if (pdu["body"].find("filter") != pdu["body"].end())
{
std::string filterStr = pdu["body"]["filter"];
}
std::unique_ptr<StreamSql> streamSql = std::make_unique<StreamSql>(filterStr);
int id = 0;
auto callback = [ws, &id, &subscriptionId](const std::string& messageStr) {
auto callback = [ws, &id, &subscriptionId, &streamSql](const std::string& messageStr) {
auto msg = nlohmann::json::parse(messageStr);
msg = msg["body"]["message"];
if (streamSql->valid() && !streamSql->match(msg))
{
return;
}
nlohmann::json response = {
{"action", "rtm/subscription/data"},
{"id", id++},

View File

@ -2,20 +2,62 @@
* IXStreamSql.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*
* Super simple hacked up version of a stream sql expression,
* that only supports non nested field evaluation
*/
#include "IXStreamSql.h"
#include <sstream>
#include <iostream>
namespace snake
{
StreamSql::StreamSql(const std::string& sqlFilter)
: _valid(false)
{
;
std::string token;
std::stringstream tokenStream(sqlFilter);
std::vector<std::string> tokens;
// Split by ' '
while (std::getline(tokenStream, token, ' '))
{
tokens.push_back(token);
}
bool StreamSql::match(const nlohmann::json& pdu)
_valid = tokens.size() == 8;
if (!_valid) return;
_field = tokens[5];
_operator = tokens[6];
_value = tokens[7];
// remove single quotes
_value = _value.substr(1, _value.size() - 2);
if (_operator == "LIKE")
{
return true;
_value = _value.substr(1, _value.size() - 2);
}
}
bool StreamSql::valid() const
{
return _valid;
}
bool StreamSql::match(const nlohmann::json& msg)
{
if (!_valid) return false;
if (msg.find(_field) == msg.end())
{
return false;
}
std::string value = msg[_field];
return value == _value;
}
} // namespace snake

View File

@ -14,11 +14,16 @@ namespace snake
class StreamSql
{
public:
StreamSql(const std::string& sqlFilter);
StreamSql(const std::string& sqlFilter = std::string());
~StreamSql() = default;
bool match(const nlohmann::json& pdu);
bool match(const nlohmann::json& msg);
bool valid() const;
private:
std::string _field;
std::string _operator;
std::string _value;
bool _valid;
};
}

View File

@ -92,6 +92,9 @@ TEST_CASE("Cobra_to_stdout_bot", "[cobra_bots]")
cobraBotConfig.enableHeartbeat = false;
bool quiet = false;
cobraBotConfig.filter =
std::string("select * from `") + channel + "` where id = 'sms_metric_A_id'";
// We could try to capture the output ... not sure how.
bool fluentd = true;

View File

@ -18,14 +18,24 @@ namespace ix
{
SECTION("expression A")
{
snake::StreamSql streamSql("select * from subscriber_republished_v1_neo where session LIKE '%{self.session_id}%'");
snake::StreamSql streamSql(
"select * from subscriber_republished_v1_neo where session LIKE '%123456%'");
nlohmann::json msg = {
{"action", "auth/authenticate/error"},
{"id", 1},
{"body", {{"error", "authentication_failed"}, {"reason", "invalid secret"}}}};
nlohmann::json msg = {{"session", "123456"}, {"id", "foo_id"}, {"timestamp", 12}};
REQUIRE(streamSql.match(msg));
CHECK(streamSql.match(msg));
}
SECTION("expression A")
{
snake::StreamSql streamSql("select * from `subscriber_republished_v1_neo` where "
"session = '30091320ed8d4e50b758f8409b83bed7'");
nlohmann::json msg = {{"session", "30091320ed8d4e50b758f8409b83bed7"},
{"id", "foo_id"},
{"timestamp", 12}};
CHECK(streamSql.match(msg));
}
}