IXWebSocket/ws/ws_cobra_publish.cpp

86 lines
2.7 KiB
C++
Raw Normal View History

2019-04-21 20:16:33 +02:00
/*
* ws_cobra_publish.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <fstream>
#include <sstream>
#include <chrono>
#include <thread>
#include <atomic>
#include <jsoncpp/json/json.h>
#include <ixcobra/IXCobraMetricsPublisher.h>
#include <spdlog/spdlog.h>
2019-04-21 20:16:33 +02:00
namespace ix
{
int ws_cobra_publish_main(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
const std::string& channel,
const std::string& path,
bool stress)
2019-04-21 20:16:33 +02:00
{
std::atomic<int> sentMessages(0);
std::atomic<int> ackedMessages(0);
CobraConnection::setPublishTrackerCallback(
[&sentMessages, &ackedMessages](bool sent, bool acked)
{
if (sent) sentMessages++;
if (acked) ackedMessages++;
}
);
2019-04-21 20:16:33 +02:00
CobraMetricsPublisher cobraMetricsPublisher;
cobraMetricsPublisher.enable(true);
bool enablePerMessageDeflate = true;
cobraMetricsPublisher.configure(appkey, endpoint, channel,
rolename, rolesecret, enablePerMessageDeflate);
while (!cobraMetricsPublisher.isAuthenticated()) ;
std::ifstream f(path);
std::string str((std::istreambuf_iterator<char>(f)),
std::istreambuf_iterator<char>());
Json::Value data;
Json::Reader reader;
if (!reader.parse(str, data)) return 1;
if (!stress)
{
cobraMetricsPublisher.push(channel, data);
}
else
{
// Stress mode to try to trigger server and client bugs
while (true)
{
for (int i = 0 ; i < 1000; ++i)
{
cobraMetricsPublisher.push(channel, data);
}
2019-04-24 01:24:10 +02:00
cobraMetricsPublisher.suspend();
cobraMetricsPublisher.resume();
2019-04-24 01:24:10 +02:00
// FIXME: investigate why without this check we trigger a lock
while (!cobraMetricsPublisher.isAuthenticated()) ;
}
}
2019-04-21 20:16:33 +02:00
// Wait a bit for the message to get a chance to be sent
// there isn't any ack on publish right now so it's the best we can do
// FIXME: this comment is a lie now
2019-04-21 20:16:33 +02:00
std::this_thread::sleep_for(std::chrono::milliseconds(100));
spdlog::info("Sent messages: {} Acked messages {}", sentMessages, ackedMessages);
2019-04-21 20:16:33 +02:00
return 0;
}
}