IXWebSocket/ws/ws_cobra_publish.cpp

108 lines
3.6 KiB
C++
Raw Normal View History

2019-04-21 20:16:33 +02:00
/*
* ws_cobra_publish.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <atomic>
2019-09-23 19:25:23 +02:00
#include <chrono>
#include <fstream>
#include <ixcobra/IXCobraMetricsPublisher.h>
2019-09-23 19:25:23 +02:00
#include <jsoncpp/json/json.h>
#include <mutex>
#include <spdlog/spdlog.h>
2019-09-23 19:25:23 +02:00
#include <sstream>
#include <thread>
2019-04-21 20:16:33 +02:00
namespace ix
{
/// Publish the JSON document stored in a file to a Cobra channel.
///
/// Reads the whole file at `path`, parses it as JSON, connects with `config`,
/// and publishes the parsed document to `channel` as soon as the connection
/// reports that it is authenticated. The call blocks until the publish has
/// been acknowledged.
///
/// @param config  Connection settings handed to CobraConnection::configure.
/// @param channel Name of the channel the document is published to.
/// @param path    Path of the JSON file to publish.
/// @return 0 once the published message has been acked; 1 if the file cannot
///         be opened, is not valid JSON, or authentication fails.
int ws_cobra_publish_main(const ix::CobraConfig& config,
                          const std::string& channel,
                          const std::string& path)
{
    std::ifstream f(path);
    if (!f)
    {
        // Report an unreadable file explicitly rather than as a JSON error.
        spdlog::error("Cannot open input file {}", path);
        return 1;
    }

    std::string str((std::istreambuf_iterator<char>(f)), std::istreambuf_iterator<char>());

    Json::Value data;
    Json::Reader reader;
    if (!reader.parse(str, data))
    {
        spdlog::error("Input file is not a JSON file");
        return 1;
    }

    // The callback below holds references to these flags. They are declared
    // before `conn` so that they are destroyed after it (locals are destroyed
    // in reverse declaration order).
    std::atomic<bool> authenticated(false);
    std::atomic<bool> messageAcked(false);
    std::atomic<bool> authenticationFailed(false);

    ix::CobraConnection conn;
    conn.configure(config);

    // Log every connection event; publish once authenticated.
    // NOTE(review): the callback is presumably invoked from the connection's
    // own thread, which is why the flags are atomics - confirm.
    conn.setEventCallback(
        [&conn, &channel, &data, &authenticated, &messageAcked, &authenticationFailed](
            const CobraEventPtr& event) {
            if (event->type == ix::CobraEventType::Open)
            {
                spdlog::info("Publisher connected");

                for (auto&& it : event->headers)
                {
                    spdlog::info("{}: {}", it.first, it.second);
                }
            }
            else if (event->type == ix::CobraEventType::Closed)
            {
                spdlog::info("Publisher closed: {}", event->errMsg);
            }
            else if (event->type == ix::CobraEventType::Authenticated)
            {
                spdlog::info("Publisher authenticated");
                authenticated = true;

                // publish() takes a JSON array of channel names.
                Json::Value channels;
                channels[0] = channel;
                auto msgId = conn.publish(channels, data);

                spdlog::info("Published msg {}", msgId);
            }
            else if (event->type == ix::CobraEventType::Subscribed)
            {
                spdlog::info("Publisher: subscribed to channel {}", event->subscriptionId);
            }
            else if (event->type == ix::CobraEventType::UnSubscribed)
            {
                spdlog::info("Publisher: unsubscribed from channel {}", event->subscriptionId);
            }
            else if (event->type == ix::CobraEventType::Error)
            {
                spdlog::error("Publisher: error {}", event->errMsg);
            }
            else if (event->type == ix::CobraEventType::Published)
            {
                spdlog::info("Published message id {} acked", event->msgId);
                messageAcked = true;
            }
            else if (event->type == ix::CobraEventType::Pong)
            {
                spdlog::info("Received websocket pong");
            }
            else if (event->type == ix::CobraEventType::HandshakeError)
            {
                spdlog::error("Publisher: Handshake error: {}", event->errMsg);
            }
            else if (event->type == ix::CobraEventType::AuthenticationError)
            {
                spdlog::error("Publisher: Authentication error: {}", event->errMsg);
                // Without this flag the wait loops below would never end,
                // since neither `authenticated` nor `messageAcked` gets set.
                authenticationFailed = true;
            }
        });

    conn.connect();

    // Poll with a short sleep instead of spinning, so waiting does not pin a
    // CPU core at 100%.
    const auto pollInterval = std::chrono::milliseconds(10);

    while (!authenticated && !authenticationFailed)
    {
        std::this_thread::sleep_for(pollInterval);
    }

    while (!messageAcked && !authenticationFailed)
    {
        std::this_thread::sleep_for(pollInterval);
    }

    return messageAcked ? 0 : 1;
}
2019-09-23 19:25:23 +02:00
} // namespace ix