new cobra to python bot (still sending to statsd)

values + string building can be done in python (we are embedding it)
This commit is contained in:
Benjamin Sergeant 2020-06-24 23:21:19 -07:00
parent c45b197c85
commit 615f1778c3
10 changed files with 431 additions and 6 deletions

View File

@ -2,7 +2,7 @@ FROM alpine:3.12 as build
RUN apk add --no-cache \
gcc g++ musl-dev linux-headers \
cmake mbedtls-dev make zlib-dev ninja
cmake mbedtls-dev make zlib-dev python3-dev ninja
RUN addgroup -S app && \
adduser -S -G app app && \
@ -20,7 +20,7 @@ RUN make ws_mbedtls_install && \
FROM alpine:3.12 as runtime
RUN apk add --no-cache libstdc++ mbedtls ca-certificates && \
RUN apk add --no-cache libstdc++ mbedtls ca-certificates python3 && \
addgroup -S app && \
adduser -S -G app app

View File

@ -1,6 +1,11 @@
# Changelog
All changes to this project will be documented in this file.
=======
## [9.8.2] - 2020-06-24
(cobra bots) new cobra metrics bot to send data to statsd using Python for processing the message
## [9.8.1] - 2020-06-19
(cobra metrics to statsd bot) fps slow frame info : do not include os name

View File

@ -10,6 +10,7 @@ set (IXBOTS_SOURCES
ixbots/IXCobraToStdoutBot.cpp
ixbots/IXCobraMetricsToStatsdBot.cpp
ixbots/IXCobraMetricsToRedisBot.cpp
ixbots/IXCobraToPythonBot.cpp
ixbots/IXStatsdClient.cpp
)
@ -21,6 +22,7 @@ set (IXBOTS_HEADERS
ixbots/IXCobraToStdoutBot.h
ixbots/IXCobraMetricsToStatsdBot.h
ixbots/IXCobraMetricsToRedisBot.h
ixbots/IXCobraToPythonBot.h
ixbots/IXStatsdClient.h
)
@ -34,6 +36,8 @@ if (NOT JSONCPP_FOUND)
set(JSONCPP_INCLUDE_DIRS ../third_party/jsoncpp)
endif()
find_package(Python COMPONENTS Development)
set(IXBOTS_INCLUDE_DIRS
.
..
@ -43,6 +47,7 @@ set(IXBOTS_INCLUDE_DIRS
../ixredis
../ixsentry
${JSONCPP_INCLUDE_DIRS}
${SPDLOG_INCLUDE_DIRS})
${SPDLOG_INCLUDE_DIRS}
${Python_INCLUDE_DIRS})
target_include_directories( ixbots PUBLIC ${IXBOTS_INCLUDE_DIRS} )

View File

@ -0,0 +1,343 @@
/*
* IXCobraToPythonBot.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#include "IXCobraToPythonBot.h"
#include "IXCobraBot.h"
#include "IXStatsdClient.h"
#include <chrono>
#include <ixcobra/IXCobraConnection.h>
#include <ixcore/utils/IXCoreLogger.h>
#include <sstream>
#include <vector>
#include <algorithm>
#include <map>
#include <cctype>
//
// I cannot get Windows to easily build on CI (github action) so support
// is disabled for now. It should be a simple fix
// (linking error about missing debug build)
//
#ifndef _WIN32
#define PY_SSIZE_T_CLEAN
#include <Python.h>
#endif
#ifndef _WIN32
namespace
{
//
// This function is unused at this point. It produce a correct output,
// but triggers memory leaks when called repeateadly, as the reference counting
// Python functions are not used properly
//
PyObject* jsonToPythonObject(const Json::Value& val)
{
switch(val.type())
{
case Json::nullValue:
{
return Py_None;
}
case Json::intValue:
{
return PyLong_FromLong(val.asInt64());
}
case Json::uintValue:
{
return PyLong_FromLong(val.asUInt64());
}
case Json::realValue:
{
return PyFloat_FromDouble(val.asDouble());
}
case Json::stringValue:
{
return PyUnicode_FromString(val.asCString());
}
case Json::booleanValue:
{
return val.asBool() ? Py_True : Py_False;
}
case Json::arrayValue:
{
PyObject* list = PyList_New(val.size());
Py_ssize_t i = 0;
for (auto&& it = val.begin(); it != val.end(); ++it)
{
PyList_SetItem(list, i++, jsonToPythonObject(*it));
}
return list;
}
case Json::objectValue:
{
PyObject* dict = PyDict_New();
for (auto&& it = val.begin(); it != val.end(); ++it)
{
PyObject* key = jsonToPythonObject(it.key());
PyObject* value = jsonToPythonObject(*it);
PyDict_SetItem(dict, key, value);
}
return dict;
}
}
}
}
#endif
namespace ix
{
int64_t cobra_to_python_bot(const ix::CobraBotConfig& config,
StatsdClient& statsdClient,
const std::string& scriptPath)
{
#ifdef _WIN32
CoreLogger::error("Command is disabled on Windows.");
return -1;
#else
CobraBot bot;
Py_InitializeEx(0); // 0 arg so that we do not install signal handlers
// which prevent us from using Ctrl-C
size_t lastIndex = scriptPath.find_last_of(".");
std::string modulePath = scriptPath.substr(0, lastIndex);
PyObject* pyModuleName = PyUnicode_DecodeFSDefault(modulePath.c_str());
if (pyModuleName == nullptr)
{
CoreLogger::error("Python error: Cannot decode file system path");
PyErr_Print();
return false;
}
// Import module
PyObject* pyModule = PyImport_Import(pyModuleName);
Py_DECREF(pyModuleName);
if (pyModule == nullptr)
{
CoreLogger::error("Python error: Cannot import module.");
CoreLogger::error("Module name cannot countain dash characters.");
CoreLogger::error("Is PYTHONPATH set correctly ?");
PyErr_Print();
return false;
}
// module main funtion name is named 'run'
const std::string entryPoint("run");
PyObject* pyFunc = PyObject_GetAttrString(pyModule, entryPoint.c_str());
if (!pyFunc)
{
CoreLogger::error("run symbol is missing from module.");
PyErr_Print();
return false;
}
if (!PyCallable_Check(pyFunc))
{
CoreLogger::error("run symbol is not a function.");
PyErr_Print();
return false;
}
bot.setOnBotMessageCallback(
[&statsdClient, pyFunc]
(const Json::Value& msg,
const std::string& /*position*/,
std::atomic<bool>& /*throttled*/,
std::atomic<bool>& fatalCobraError,
std::atomic<uint64_t>& sentCount) -> void {
if (msg["device"].isNull())
{
CoreLogger::info("no device entry, skipping event");
return;
}
if (msg["id"].isNull())
{
CoreLogger::info("no id entry, skipping event");
return;
}
//
// Invoke python script here. First build function parameters, a tuple
//
const int kVersion = 1; // We can bump this and let the interface evolve
PyObject *pyArgs = PyTuple_New(2);
PyTuple_SetItem(pyArgs, 0, PyLong_FromLong(kVersion)); // First argument
//
// It would be better to create a Python object (a dictionary)
// from the json msg, but it is simpler to serialize it to a string
// and decode it on the Python side of the fence
//
PyObject* pySerializedJson = PyUnicode_FromString(msg.toStyledString().c_str());
PyTuple_SetItem(pyArgs, 1, pySerializedJson); // Second argument
// Invoke the python routine
PyObject* pyList = PyObject_CallObject(pyFunc, pyArgs);
// Error calling the function
if (pyList == nullptr)
{
fatalCobraError = true;
CoreLogger::error("run() function call failed. Input msg: ");
auto serializedMsg = msg.toStyledString();
CoreLogger::error(serializedMsg);
PyErr_Print();
CoreLogger::error("================");
return;
}
// Invalid return type
if (!PyList_Check(pyList))
{
fatalCobraError = true;
CoreLogger::error("run() return type should be a list");
return;
}
// The result is a list of dict containing sufficient info
// to send messages to statsd
auto listSize = PyList_Size(pyList);
for (Py_ssize_t i = 0 ; i < listSize; ++i)
{
PyObject* dict = PyList_GetItem(pyList, i);
// Make sure this is a dict
if (!PyDict_Check(dict))
{
fatalCobraError = true;
CoreLogger::error("list element is not a dict");
continue;
}
//
// Retrieve object kind
//
PyObject* pyKind = PyDict_GetItemString(dict, "kind");
if (!PyUnicode_Check(pyKind))
{
fatalCobraError = true;
CoreLogger::error("kind entry is not a string");
continue;
}
std::string kind(PyUnicode_AsUTF8(pyKind));
bool counter = false;
bool gauge = false;
bool timing = false;
if (kind == "counter")
{
counter = true;
}
else if (kind == "gauge")
{
gauge = true;
}
else if (kind == "timing")
{
timing = true;
}
else
{
fatalCobraError = true;
CoreLogger::error(std::string("invalid kind entry: ") + kind +
". Supported ones are counter, gauge, timing");
continue;
}
//
// Retrieve object key
//
PyObject* pyKey = PyDict_GetItemString(dict, "key");
if (!PyUnicode_Check(pyKey))
{
fatalCobraError = true;
CoreLogger::error("key entry is not a string");
continue;
}
std::string key(PyUnicode_AsUTF8(pyKey));
//
// Retrieve object value and send data to statsd
//
PyObject* pyValue = PyDict_GetItemString(dict, "value");
// Send data to statsd
if (PyFloat_Check(pyValue))
{
double value = PyFloat_AsDouble(pyValue);
if (counter)
{
statsdClient.count(key, value);
}
else if (gauge)
{
statsdClient.gauge(key, value);
}
else if (timing)
{
statsdClient.timing(key, value);
}
}
else if (PyLong_Check(pyValue))
{
long value = PyLong_AsLong(pyValue);
if (counter)
{
statsdClient.count(key, value);
}
else if (gauge)
{
statsdClient.gauge(key, value);
}
else if (timing)
{
statsdClient.timing(key, value);
}
}
else
{
fatalCobraError = true;
CoreLogger::error("value entry is neither an int or a float");
continue;
}
sentCount++; // should we update this for each statsd object sent ?
}
Py_DECREF(pyArgs);
Py_DECREF(pyList);
});
bool status = bot.run(config);
// Cleanup - we should do something similar in all exit case ...
Py_DECREF(pyFunc);
Py_DECREF(pyModule);
Py_FinalizeEx();
return status;
}
#endif
}

View File

@ -0,0 +1,19 @@
/*
* IXCobraMetricsToStatsdBot.h
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <cstdint>
#include <ixbots/IXStatsdClient.h>
#include "IXCobraBotConfig.h"
#include <stddef.h>
#include <string>
namespace ix
{
int64_t cobra_to_python_bot(const ix::CobraBotConfig& config,
StatsdClient& statsdClient,
const std::string& scriptPath);
} // namespace ix

View File

@ -6,4 +6,4 @@
#pragma once
#define IX_WEBSOCKET_VERSION "9.8.1"
#define IX_WEBSOCKET_VERSION "9.8.2"

View File

@ -29,10 +29,10 @@ ws_mbedtls_install:
mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=MinSizeRel -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .. ; ninja install)
ws:
mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 .. ; ninja install)
mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 .. && ninja install)
ws_install:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=MinSizeRel -DUSE_TLS=1 -DUSE_WS=1 .. ; make -j 4 install)
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=MinSizeRel -DUSE_TLS=1 -DUSE_WS=1 .. && make -j 4 install)
ws_openssl:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 -DUSE_OPEN_SSL=1 .. ; make -j 4)

View File

@ -91,6 +91,15 @@ if (JSONCPP_FOUND)
target_link_libraries(ixwebsocket_unittest ${JSONCPP_LIBRARIES})
endif()
find_package(Python COMPONENTS Development)
if (NOT Python_FOUND)
message(FATAL_ERROR "Python3 not found")
endif()
message("Python_FOUND:${Python_FOUND}")
message("Python_VERSION:${Python_VERSION}")
message("Python_Development_FOUND:${Python_Development_FOUND}")
message("Python_LIBRARIES:${Python_LIBRARIES}")
# library with the most dependencies come first
target_link_libraries(ixwebsocket_unittest ixbots)
target_link_libraries(ixwebsocket_unittest ixsnake)
@ -102,5 +111,8 @@ target_link_libraries(ixwebsocket_unittest ixcrypto)
target_link_libraries(ixwebsocket_unittest ixcore)
target_link_libraries(ixwebsocket_unittest spdlog)
if (NOT WIN32)
target_link_libraries(ixwebsocket_unittest ${Python_LIBRARIES})
endif()
install(TARGETS ixwebsocket_unittest DESTINATION bin)

View File

@ -32,6 +32,15 @@ if (NOT JSONCPP_FOUND)
set(JSONCPP_SOURCES ../third_party/jsoncpp/jsoncpp.cpp)
endif()
find_package(Python COMPONENTS Development)
if (NOT Python_FOUND)
message(FATAL_ERROR "Python3 not found")
endif()
message("Python_FOUND:${Python_FOUND}")
message("Python_VERSION:${Python_VERSION}")
message("Python_Development_FOUND:${Python_Development_FOUND}")
message("Python_LIBRARIES:${Python_LIBRARIES}")
add_executable(ws
../third_party/msgpack11/msgpack11.cpp
../third_party/cpp-linenoise/linenoise.cpp
@ -71,6 +80,9 @@ target_link_libraries(ws ixcrypto)
target_link_libraries(ws ixcore)
target_link_libraries(ws spdlog)
if (NOT WIN32)
target_link_libraries(ws ${Python_LIBRARIES})
endif()
if (JSONCPP_FOUND)
target_include_directories(ws PUBLIC ${JSONCPP_INCLUDE_DIRS})

View File

@ -11,6 +11,7 @@
#include <cli11/CLI11.hpp>
#include <fstream>
#include <ixbots/IXCobraToPythonBot.h>
#include <ixbots/IXCobraToSentryBot.h>
#include <ixbots/IXCobraToStatsdBot.h>
#include <ixbots/IXCobraToStdoutBot.h>
@ -121,6 +122,7 @@ int main(int argc, char** argv)
std::string project;
std::string key;
std::string logfile;
std::string scriptPath;
ix::SocketTLSOptions tlsOptions;
ix::CobraConfig cobraConfig;
ix::CobraBotConfig cobraBotConfig;
@ -352,6 +354,16 @@ int main(int argc, char** argv)
addTLSOptions(cobra2statsd);
addCobraBotConfig(cobra2statsd);
CLI::App* cobra2python = app.add_subcommand("cobra_to_python", "Cobra to python");
cobra2python->fallthrough();
cobra2python->add_option("--host", hostname, "Statsd host");
cobra2python->add_option("--port", statsdPort, "Statsd port");
cobra2python->add_option("--prefix", prefix, "Statsd prefix");
cobra2python->add_option("--script", scriptPath, "Python script path")->check(CLI::ExistingPath);
cobra2python->add_option("--pidfile", pidfile, "Pid file");
addTLSOptions(cobra2python);
addCobraBotConfig(cobra2python);
CLI::App* cobraMetrics2statsd = app.add_subcommand("cobra_metrics_to_statsd", "Cobra metrics to statsd");
cobraMetrics2statsd->fallthrough();
cobraMetrics2statsd->add_option("--host", hostname, "Statsd host");
@ -590,6 +602,23 @@ int main(int argc, char** argv)
}
}
}
else if (app.got_subcommand("cobra_to_python"))
{
ix::StatsdClient statsdClient(hostname, statsdPort, prefix, verbose);
std::string errMsg;
bool initialized = statsdClient.init(errMsg);
if (!initialized)
{
spdlog::error(errMsg);
ret = 1;
}
else
{
ret = (int) ix::cobra_to_python_bot(
cobraBotConfig, statsdClient, scriptPath);
}
}
else if (app.got_subcommand("cobra_metrics_to_statsd"))
{
ix::StatsdClient statsdClient(hostname, statsdPort, prefix, verbose);