add cobra metrics publisher + server unittest
This commit is contained in:
parent
c0f098a578
commit
37a7b362d8
@ -3,6 +3,8 @@ All notable changes to this project will be documented in this file.
|
|||||||
|
|
||||||
## [6.0.1] - 2019-09-05
|
## [6.0.1] - 2019-09-05
|
||||||
|
|
||||||
|
- add cobra metrics publisher + server unittest
|
||||||
|
- add cobra client + server unittest
|
||||||
- ws snake (cobra simple server) add basic support for unsubscription + subscribe send the proper subscription data + redis client subscription can be cancelled
|
- ws snake (cobra simple server) add basic support for unsubscription + subscribe send the proper subscription data + redis client subscription can be cancelled
|
||||||
- IXCobraConnection / pdu handlers can crash if they receive json data which is not an object
|
- IXCobraConnection / pdu handlers can crash if they receive json data which is not an object
|
||||||
|
|
||||||
|
@ -63,6 +63,7 @@ set (SOURCES
|
|||||||
IXHttpServerTest.cpp
|
IXHttpServerTest.cpp
|
||||||
IXUnityBuildsTest.cpp
|
IXUnityBuildsTest.cpp
|
||||||
IXCobraChatTest.cpp
|
IXCobraChatTest.cpp
|
||||||
|
IXCobraMetricsPublisherTest.cpp
|
||||||
)
|
)
|
||||||
|
|
||||||
# Some unittest don't work on windows yet
|
# Some unittest don't work on windows yet
|
||||||
|
@ -172,7 +172,7 @@ namespace
|
|||||||
//
|
//
|
||||||
void SatoriChat::run()
|
void SatoriChat::run()
|
||||||
{
|
{
|
||||||
snake::AppConfig appConfig = makeSnakeServerConfig();
|
snake::AppConfig appConfig = makeSnakeServerConfig(8008);
|
||||||
|
|
||||||
// Display config on the terminal for debugging
|
// Display config on the terminal for debugging
|
||||||
dumpConfig(appConfig);
|
dumpConfig(appConfig);
|
||||||
@ -182,16 +182,10 @@ namespace
|
|||||||
|
|
||||||
// "chat" conf
|
// "chat" conf
|
||||||
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
|
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
|
||||||
std::string endpoint("ws://localhost:8008");
|
|
||||||
std::string channel = _session;
|
std::string channel = _session;
|
||||||
std::string role = "_sub";
|
std::string role = "_sub";
|
||||||
std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
|
std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
|
||||||
|
std::string endpoint("ws://localhost:8008");
|
||||||
// appkey = "1121b8DfbB33E56dE1F82fC2A08cD1D7";
|
|
||||||
// endpoint = "ws://api-internal-cobra.addsrv.com";
|
|
||||||
// endpoint = "ws://localhost:8765";
|
|
||||||
// role = "unittest_subscriber";
|
|
||||||
// secret = "98B69bcdfc145C5fB7C2f4A5aFfe4fd3";
|
|
||||||
|
|
||||||
_conn.configure(appkey, endpoint, role, secret,
|
_conn.configure(appkey, endpoint, role, secret,
|
||||||
ix::WebSocketPerMessageDeflateOptions(true));
|
ix::WebSocketPerMessageDeflateOptions(true));
|
||||||
@ -231,6 +225,10 @@ namespace
|
|||||||
{
|
{
|
||||||
log("Unsubscription ok: " + _user + " subscription_id " + subscriptionId);
|
log("Unsubscription ok: " + _user + " subscription_id " + subscriptionId);
|
||||||
}
|
}
|
||||||
|
else if (eventType == ix::CobraConnection_EventType_Published)
|
||||||
|
{
|
||||||
|
Logger() << "Subscriber: published message acked: " << msgId;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
@ -279,7 +277,7 @@ TEST_CASE("Cobra_chat", "[cobra_chat]")
|
|||||||
{
|
{
|
||||||
SECTION("Exchange and count sent/received messages.")
|
SECTION("Exchange and count sent/received messages.")
|
||||||
{
|
{
|
||||||
snake::AppConfig appConfig = makeSnakeServerConfig();
|
snake::AppConfig appConfig = makeSnakeServerConfig(8008);
|
||||||
snake::SnakeServer snakeServer(appConfig);
|
snake::SnakeServer snakeServer(appConfig);
|
||||||
snakeServer.run();
|
snakeServer.run();
|
||||||
|
|
||||||
|
233
test/IXCobraMetricsPublisherTest.cpp
Normal file
233
test/IXCobraMetricsPublisherTest.cpp
Normal file
@ -0,0 +1,233 @@
|
|||||||
|
/*
|
||||||
|
* Author: Benjamin Sergeant
|
||||||
|
* Copyright (c) 2018 Machine Zone. All rights reserved.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <iostream>
|
||||||
|
#include <set>
|
||||||
|
#include <ixcrypto/IXUuid.h>
|
||||||
|
#include <ixcobra/IXCobraMetricsPublisher.h>
|
||||||
|
#include "IXTest.h"
|
||||||
|
#include "IXSnakeServer.h"
|
||||||
|
|
||||||
|
#include "catch.hpp"
|
||||||
|
|
||||||
|
using namespace ix;
|
||||||
|
|
||||||
|
namespace
|
||||||
|
{
|
||||||
|
//
|
||||||
|
// This project / appkey is configure on cobra to not do any batching.
|
||||||
|
// This way we can start a subscriber and receive all messages as they come in.
|
||||||
|
//
|
||||||
|
std::string ENDPOINT("ws://localhost:8009");
|
||||||
|
std::string APPKEY("FC2F10139A2BAc53BB72D9db967b024f");
|
||||||
|
std::string CHANNEL("unittest_channel");
|
||||||
|
std::string PUBLISHER_ROLE("_pub");
|
||||||
|
std::string PUBLISHER_SECRET("1c04DB8fFe76A4EeFE3E318C72d771db");
|
||||||
|
std::string SUBSCRIBER_ROLE("_sub");
|
||||||
|
std::string SUBSCRIBER_SECRET("66B1dA3ED5fA074EB5AE84Dd8CE3b5ba");
|
||||||
|
|
||||||
|
std::atomic<bool> gStop;
|
||||||
|
std::atomic<bool> gSubscriberConnectedAndSubscribed;
|
||||||
|
std::atomic<int> gUniqueMessageIdsCount;
|
||||||
|
std::atomic<int> gMessageCount;
|
||||||
|
|
||||||
|
std::set<std::string> gIds;
|
||||||
|
std::mutex gProtectIds; // std::set is no thread-safe, so protect access with this mutex.
|
||||||
|
|
||||||
|
//
|
||||||
|
// Background thread subscribe to the channel and validates what was sent
|
||||||
|
//
|
||||||
|
void startSubscriber()
|
||||||
|
{
|
||||||
|
gSubscriberConnectedAndSubscribed = false;
|
||||||
|
gUniqueMessageIdsCount = 0;
|
||||||
|
gMessageCount = 0;
|
||||||
|
|
||||||
|
ix::CobraConnection conn;
|
||||||
|
conn.configure(APPKEY, ENDPOINT, SUBSCRIBER_ROLE, SUBSCRIBER_SECRET,
|
||||||
|
ix::WebSocketPerMessageDeflateOptions(true));
|
||||||
|
conn.connect();
|
||||||
|
|
||||||
|
conn.setEventCallback(
|
||||||
|
[&conn]
|
||||||
|
(ix::CobraConnectionEventType eventType,
|
||||||
|
const std::string& errMsg,
|
||||||
|
const ix::WebSocketHttpHeaders& headers,
|
||||||
|
const std::string& subscriptionId,
|
||||||
|
CobraConnection::MsgId msgId)
|
||||||
|
{
|
||||||
|
if (eventType == ix::CobraConnection_EventType_Open)
|
||||||
|
{
|
||||||
|
Logger() << "Subscriber connected:";
|
||||||
|
}
|
||||||
|
else if (eventType == ix::CobraConnection_EventType_Authenticated)
|
||||||
|
{
|
||||||
|
log("Subscriber authenticated");
|
||||||
|
std::string filter;
|
||||||
|
conn.subscribe(CHANNEL, filter,
|
||||||
|
[](const Json::Value& msg)
|
||||||
|
{
|
||||||
|
log(msg.toStyledString());
|
||||||
|
|
||||||
|
std::string id = msg["id"].asString();
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> guard(gProtectIds);
|
||||||
|
gIds.insert(id);
|
||||||
|
}
|
||||||
|
|
||||||
|
gMessageCount++;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
else if (eventType == ix::CobraConnection_EventType_Subscribed)
|
||||||
|
{
|
||||||
|
Logger() << "Subscriber: subscribed to channel " << subscriptionId;
|
||||||
|
if (subscriptionId == CHANNEL)
|
||||||
|
{
|
||||||
|
gSubscriberConnectedAndSubscribed = true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
Logger() << "Subscriber: unexpected channel " << subscriptionId;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
|
||||||
|
{
|
||||||
|
Logger() << "Subscriber: ununexpected from channel " << subscriptionId;
|
||||||
|
if (subscriptionId != CHANNEL)
|
||||||
|
{
|
||||||
|
Logger() << "Subscriber: unexpected channel " << subscriptionId;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (eventType == ix::CobraConnection_EventType_Published)
|
||||||
|
{
|
||||||
|
Logger() << "Subscriber: published message acked: " << msgId;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
while (!gStop)
|
||||||
|
{
|
||||||
|
std::chrono::duration<double, std::milli> duration(10);
|
||||||
|
std::this_thread::sleep_for(duration);
|
||||||
|
}
|
||||||
|
|
||||||
|
conn.unsubscribe(CHANNEL);
|
||||||
|
conn.disconnect();
|
||||||
|
|
||||||
|
gUniqueMessageIdsCount = gIds.size();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_CASE("Cobra_Metrics_Publisher", "[cobra]")
|
||||||
|
{
|
||||||
|
snake::AppConfig appConfig = makeSnakeServerConfig(8009);
|
||||||
|
snake::SnakeServer snakeServer(appConfig);
|
||||||
|
snakeServer.run();
|
||||||
|
|
||||||
|
// Make channel name unique
|
||||||
|
CHANNEL += uuid4();
|
||||||
|
|
||||||
|
gStop = false;
|
||||||
|
std::thread bgThread(&startSubscriber);
|
||||||
|
|
||||||
|
int timeout = 10 * 1000; // 10s
|
||||||
|
|
||||||
|
// Wait until the subscriber is ready (authenticated + subscription successful)
|
||||||
|
while (!gSubscriberConnectedAndSubscribed)
|
||||||
|
{
|
||||||
|
std::chrono::duration<double, std::milli> duration(10);
|
||||||
|
std::this_thread::sleep_for(duration);
|
||||||
|
|
||||||
|
timeout -= 10;
|
||||||
|
if (timeout <= 0)
|
||||||
|
{
|
||||||
|
REQUIRE(false); // timeout
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ix::CobraMetricsPublisher cobraMetricsPublisher;
|
||||||
|
|
||||||
|
bool perMessageDeflate = true;
|
||||||
|
cobraMetricsPublisher.configure(APPKEY, ENDPOINT, CHANNEL,
|
||||||
|
PUBLISHER_ROLE, PUBLISHER_SECRET, perMessageDeflate);
|
||||||
|
cobraMetricsPublisher.setSession(uuid4());
|
||||||
|
cobraMetricsPublisher.enable(true); // disabled by default, needs to be enabled to be active
|
||||||
|
|
||||||
|
Json::Value data;
|
||||||
|
data["foo"] = "bar";
|
||||||
|
|
||||||
|
// (1) Publish without restrictions
|
||||||
|
cobraMetricsPublisher.push("sms_metric_A_id", data); // (msg #1)
|
||||||
|
cobraMetricsPublisher.push("sms_metric_B_id", data); // (msg #2)
|
||||||
|
|
||||||
|
// (2) Restrict what is sent using a blacklist
|
||||||
|
// Add one entry to the blacklist
|
||||||
|
// (will send msg #3)
|
||||||
|
cobraMetricsPublisher.setBlacklist({
|
||||||
|
"sms_metric_B_id" // this id will not be sent
|
||||||
|
});
|
||||||
|
// (msg #4)
|
||||||
|
cobraMetricsPublisher.push("sms_metric_A_id", data);
|
||||||
|
// ...
|
||||||
|
cobraMetricsPublisher.push("sms_metric_B_id", data); // this won't be sent
|
||||||
|
|
||||||
|
// Reset the blacklist
|
||||||
|
// (msg #5)
|
||||||
|
cobraMetricsPublisher.setBlacklist({}); // 4.
|
||||||
|
|
||||||
|
// (3) Restrict what is sent using rate control
|
||||||
|
|
||||||
|
// (msg #6)
|
||||||
|
cobraMetricsPublisher.setRateControl({
|
||||||
|
{"sms_metric_C_id", 1}, // published once per minute (60 seconds) max
|
||||||
|
});
|
||||||
|
// (msg #7)
|
||||||
|
cobraMetricsPublisher.push("sms_metric_C_id", data);
|
||||||
|
cobraMetricsPublisher.push("sms_metric_C_id", data); // this won't be sent
|
||||||
|
|
||||||
|
ix::msleep(1400);
|
||||||
|
|
||||||
|
// (msg #8)
|
||||||
|
cobraMetricsPublisher.push("sms_metric_C_id", data); // now this will be sent
|
||||||
|
|
||||||
|
ix::msleep(600); // wait a bit so that the last message is sent and can be received
|
||||||
|
|
||||||
|
log("Testing suspend/resume now, which will disconnect the cobraMetricsPublisher.");
|
||||||
|
|
||||||
|
// Test suspend + resume
|
||||||
|
for (int i = 0 ; i < 3 ; ++i)
|
||||||
|
{
|
||||||
|
cobraMetricsPublisher.suspend();
|
||||||
|
ix::msleep(500);
|
||||||
|
REQUIRE(!cobraMetricsPublisher.isConnected()); // Check that we are not connected anymore
|
||||||
|
|
||||||
|
cobraMetricsPublisher.push("sms_metric_D_id", data); // will not be sent this time
|
||||||
|
|
||||||
|
cobraMetricsPublisher.resume();
|
||||||
|
ix::msleep(2000); // give cobra 2s to connect
|
||||||
|
REQUIRE(cobraMetricsPublisher.isConnected()); // Check that we are connected now
|
||||||
|
|
||||||
|
cobraMetricsPublisher.push("sms_metric_E_id", data);
|
||||||
|
}
|
||||||
|
|
||||||
|
ix::msleep(500);
|
||||||
|
|
||||||
|
// Now stop the thread
|
||||||
|
gStop = true;
|
||||||
|
bgThread.join();
|
||||||
|
|
||||||
|
//
|
||||||
|
// Validate that we received all message kinds, and the correct number of messages
|
||||||
|
//
|
||||||
|
CHECK(gIds.count("sms_metric_A_id") == 1);
|
||||||
|
CHECK(gIds.count("sms_metric_B_id") == 1);
|
||||||
|
CHECK(gIds.count("sms_metric_C_id") == 1);
|
||||||
|
CHECK(gIds.count("sms_metric_D_id") == 1);
|
||||||
|
CHECK(gIds.count("sms_metric_E_id") == 1);
|
||||||
|
CHECK(gIds.count("sms_set_rate_control_id") == 1);
|
||||||
|
CHECK(gIds.count("sms_set_blacklist_id") == 1);
|
||||||
|
|
||||||
|
snakeServer.stop();
|
||||||
|
}
|
@ -161,10 +161,10 @@ namespace ix
|
|||||||
return std::string(vec.begin(), vec.end());
|
return std::string(vec.begin(), vec.end());
|
||||||
}
|
}
|
||||||
|
|
||||||
snake::AppConfig makeSnakeServerConfig()
|
snake::AppConfig makeSnakeServerConfig(int port)
|
||||||
{
|
{
|
||||||
snake::AppConfig appConfig;
|
snake::AppConfig appConfig;
|
||||||
appConfig.port = 8008;
|
appConfig.port = port;
|
||||||
appConfig.hostname = "127.0.0.1";
|
appConfig.hostname = "127.0.0.1";
|
||||||
appConfig.verbose = true;
|
appConfig.verbose = true;
|
||||||
appConfig.redisPort = 6379;
|
appConfig.redisPort = 6379;
|
||||||
|
@ -50,5 +50,5 @@ namespace ix
|
|||||||
|
|
||||||
bool startWebSocketEchoServer(ix::WebSocketServer& server);
|
bool startWebSocketEchoServer(ix::WebSocketServer& server);
|
||||||
|
|
||||||
snake::AppConfig makeSnakeServerConfig();
|
snake::AppConfig makeSnakeServerConfig(int port);
|
||||||
} // namespace ix
|
} // namespace ix
|
||||||
|
@ -284,7 +284,6 @@ namespace snake
|
|||||||
void handleUnSubscribe(
|
void handleUnSubscribe(
|
||||||
std::shared_ptr<SnakeConnectionState> state,
|
std::shared_ptr<SnakeConnectionState> state,
|
||||||
std::shared_ptr<ix::WebSocket> ws,
|
std::shared_ptr<ix::WebSocket> ws,
|
||||||
const AppConfig& appConfig,
|
|
||||||
const nlohmann::json& pdu)
|
const nlohmann::json& pdu)
|
||||||
{
|
{
|
||||||
// extract subscription_id
|
// extract subscription_id
|
||||||
@ -333,7 +332,7 @@ namespace snake
|
|||||||
}
|
}
|
||||||
else if (action == "rtm/unsubscribe")
|
else if (action == "rtm/unsubscribe")
|
||||||
{
|
{
|
||||||
handleUnSubscribe(state, ws, appConfig, pdu);
|
handleUnSubscribe(state, ws, pdu);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
Loading…
x
Reference in New Issue
Block a user