In DNS lookup code, make sure the weak pointer we use lives through the expected scope (if branch)
This commit is contained in:
@ -10,6 +10,8 @@
|
||||
#include <chrono>
|
||||
#include <thread>
|
||||
#include <atomic>
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
#include <jsoncpp/json/json.h>
|
||||
#include <ixcobra/IXCobraMetricsPublisher.h>
|
||||
#include <spdlog/spdlog.h>
|
||||
@ -21,64 +23,83 @@ namespace ix
|
||||
const std::string& rolename,
|
||||
const std::string& rolesecret,
|
||||
const std::string& channel,
|
||||
const std::string& path,
|
||||
bool stress)
|
||||
const std::string& path)
|
||||
{
|
||||
std::atomic<int> sentMessages(0);
|
||||
std::atomic<int> ackedMessages(0);
|
||||
CobraConnection::setPublishTrackerCallback(
|
||||
[&sentMessages, &ackedMessages](bool sent, bool acked)
|
||||
{
|
||||
if (sent) sentMessages++;
|
||||
if (acked) ackedMessages++;
|
||||
}
|
||||
);
|
||||
|
||||
CobraMetricsPublisher cobraMetricsPublisher;
|
||||
cobraMetricsPublisher.enable(true);
|
||||
|
||||
bool enablePerMessageDeflate = true;
|
||||
cobraMetricsPublisher.configure(appkey, endpoint, channel,
|
||||
rolename, rolesecret, enablePerMessageDeflate);
|
||||
|
||||
while (!cobraMetricsPublisher.isAuthenticated()) ;
|
||||
|
||||
std::ifstream f(path);
|
||||
std::string str((std::istreambuf_iterator<char>(f)),
|
||||
std::istreambuf_iterator<char>());
|
||||
|
||||
Json::Value data;
|
||||
Json::Reader reader;
|
||||
if (!reader.parse(str, data)) return 1;
|
||||
|
||||
if (!stress)
|
||||
if (!reader.parse(str, data))
|
||||
{
|
||||
cobraMetricsPublisher.push(channel, data);
|
||||
spdlog::info("Input file is not a JSON file");
|
||||
return 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Stress mode to try to trigger server and client bugs
|
||||
while (true)
|
||||
|
||||
ix::CobraConnection conn;
|
||||
conn.configure(appkey, endpoint,
|
||||
rolename, rolesecret,
|
||||
ix::WebSocketPerMessageDeflateOptions(true));
|
||||
conn.connect();
|
||||
|
||||
// Display incoming messages
|
||||
std::atomic<bool> authenticated(false);
|
||||
std::atomic<bool> messageAcked(false);
|
||||
std::condition_variable condition;
|
||||
|
||||
conn.setEventCallback(
|
||||
[&conn, &channel, &data, &authenticated, &messageAcked, &condition]
|
||||
(ix::CobraConnectionEventType eventType,
|
||||
const std::string& errMsg,
|
||||
const ix::WebSocketHttpHeaders& headers,
|
||||
const std::string& subscriptionId,
|
||||
CobraConnection::MsgId msgId)
|
||||
{
|
||||
for (int i = 0 ; i < 1000; ++i)
|
||||
if (eventType == ix::CobraConnection_EventType_Open)
|
||||
{
|
||||
cobraMetricsPublisher.push(channel, data);
|
||||
spdlog::info("Publisher connected");
|
||||
|
||||
for (auto it : headers)
|
||||
{
|
||||
spdlog::info("{}: {}", it.first, it.second);
|
||||
}
|
||||
}
|
||||
else if (eventType == ix::CobraConnection_EventType_Authenticated)
|
||||
{
|
||||
spdlog::info("Publisher authenticated");
|
||||
authenticated = true;
|
||||
|
||||
cobraMetricsPublisher.suspend();
|
||||
cobraMetricsPublisher.resume();
|
||||
spdlog::info("Publishing data");
|
||||
|
||||
// FIXME: investigate why without this check we trigger a lock
|
||||
while (!cobraMetricsPublisher.isAuthenticated()) ;
|
||||
Json::Value channels;
|
||||
channels[0] = channel;
|
||||
conn.publish(channels, data);
|
||||
}
|
||||
else if (eventType == ix::CobraConnection_EventType_Subscribed)
|
||||
{
|
||||
spdlog::info("Publisher: subscribed to channel {}", subscriptionId);
|
||||
}
|
||||
else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
|
||||
{
|
||||
spdlog::info("Publisher: unsubscribed from channel {}", subscriptionId);
|
||||
}
|
||||
else if (eventType == ix::CobraConnection_EventType_Error)
|
||||
{
|
||||
spdlog::error("Publisher: error {}", errMsg);
|
||||
condition.notify_one();
|
||||
}
|
||||
else if (eventType == ix::CobraConnection_EventType_Published)
|
||||
{
|
||||
spdlog::info("Published message acked: {}", msgId);
|
||||
messageAcked = true;
|
||||
condition.notify_one();
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
// Wait a bit for the message to get a chance to be sent
|
||||
// there isn't any ack on publish right now so it's the best we can do
|
||||
// FIXME: this comment is a lie now
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
|
||||
spdlog::info("Sent messages: {} Acked messages {}", sentMessages, ackedMessages);
|
||||
while (!authenticated) ;
|
||||
while (!messageAcked) ;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
Reference in New Issue
Block a user