Fix crash in the Linux unittest in the HTTP client code, in Socket::readBytes. Cobra Metrics Publisher code returns the message id of the message that got published, to be used to validated that it got sent properly when receiving an ack.

This commit is contained in:
Benjamin Sergeant 2019-09-21 09:23:58 -07:00
parent fcdb57f31d
commit ed4be773a2
14 changed files with 135 additions and 87 deletions

View File

@ -1 +1 @@
6.2.2
6.2.3

View File

@ -1 +0,0 @@
docker/Dockerfile.alpine

View File

@ -20,4 +20,5 @@ COPY . .
ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-Linux-x86_64/bin
ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
RUN ["make", "test"]
# RUN ["make", "test"]
CMD ["sh"]

View File

@ -1,6 +1,11 @@
# Changelog
All notable changes to this project will be documented in this file.
## [6.2.3] - 2019-09-21
- Fix crash in the Linux unittest in the HTTP client code, in Socket::readBytes
- Cobra Metrics Publisher code returns the message id of the message that got published, to be used to validated that it got sent properly when receiving an ack.
## [6.2.2] - 2019-09-19
- In DNS lookup code, make sure the weak pointer we use lives through the expected scope (if branch)

View File

@ -21,13 +21,14 @@ namespace ix
TrafficTrackerCallback CobraConnection::_trafficTrackerCallback = nullptr;
PublishTrackerCallback CobraConnection::_publishTrackerCallback = nullptr;
constexpr size_t CobraConnection::kQueueMaxSize;
constexpr CobraConnection::MsgId CobraConnection::kInvalidMsgId;
CobraConnection::CobraConnection() :
_webSocket(new WebSocket()),
_publishMode(CobraConnection_PublishMode_Immediate),
_authenticated(false),
_eventCallback(nullptr),
_id(0)
_id(1)
{
_pdu["action"] = "rtm/publish";
@ -456,11 +457,10 @@ namespace ix
return _jsonWriter.write(value);
}
//
// publish is not thread safe as we are trying to reuse some Json objects.
//
CobraConnection::MsgId CobraConnection::publish(const Json::Value& channels,
const Json::Value& msg)
std::pair<CobraConnection::MsgId, std::string> CobraConnection::prePublish(
const Json::Value& channels,
const Json::Value& msg,
bool addToQueue)
{
invokePublishTrackerCallback(true, false);
@ -473,6 +473,38 @@ namespace ix
std::string serializedJson = serializeJson(_pdu);
if (addToQueue)
{
enqueue(serializedJson);
}
return std::make_pair(msgId, serializedJson);
}
bool CobraConnection::publishNext()
{
std::lock_guard<std::mutex> lock(_queueMutex);
auto&& msg = _messageQueue.back();
if (!publishMessage(msg))
{
_messageQueue.push_back(msg);
return false;
}
_messageQueue.pop_back();
return true;
}
//
// publish is not thread safe as we are trying to reuse some Json objects.
//
CobraConnection::MsgId CobraConnection::publish(const Json::Value& channels,
const Json::Value& msg)
{
auto p = prePublish(channels, msg, false);
auto msgId = p.first;
auto serializedJson = p.second;
//
// 1. When we use batch mode, we just enqueue and will do the flush explicitely
// 2. When we aren't authenticated yet to the cobra server, we need to enqueue
@ -567,22 +599,21 @@ namespace ix
//
bool CobraConnection::flushQueue()
{
std::lock_guard<std::mutex> lock(_queueMutex);
while (!_messageQueue.empty())
while (!isQueueEmpty())
{
auto&& msg = _messageQueue.back();
if (!publishMessage(msg))
{
_messageQueue.push_back(msg);
return false;
}
_messageQueue.pop_back();
bool ok = publishNext();
if (!ok) return false;
}
return true;
}
bool CobraConnection::isQueueEmpty()
{
std::lock_guard<std::mutex> lock(_queueMutex);
return _messageQueue.empty();
}
bool CobraConnection::publishMessage(const std::string& serializedJson)
{
auto webSocketSendInfo = _webSocket->send(serializedJson);

View File

@ -118,6 +118,19 @@ namespace ix
void suspend();
void resume();
/// Prepare a message for transmission
/// (update the pdu, compute a msgId, serialize json to a string)
std::pair<CobraConnection::MsgId, std::string> prePublish(
const Json::Value& channels,
const Json::Value& msg,
bool addToQueue);
/// Attempt to send next message from the internal queue
bool publishNext();
// An invalid message id, signifying an error.
static constexpr MsgId kInvalidMsgId = 0;
private:
bool sendHandshakeMessage();
bool handleHandshakeResponse(const Json::Value& data);
@ -147,6 +160,9 @@ namespace ix
uint64_t msgId = std::numeric_limits<uint64_t>::max());
void invokeErrorCallback(const std::string& errorMsg, const std::string& serializedPdu);
/// Tells whether the internal queue is empty or not
bool isQueueEmpty();
///
/// Member variables
///

View File

@ -135,23 +135,23 @@ namespace ix
return ms;
}
void CobraMetricsPublisher::push(const std::string& id,
const std::string& data,
bool shouldPushTest)
CobraConnection::MsgId CobraMetricsPublisher::push(const std::string& id,
const std::string& data,
bool shouldPushTest)
{
if (!_enabled) return;
if (!_enabled) return CobraConnection::kInvalidMsgId;
Json::Value root;
Json::Reader reader;
if (!reader.parse(data, root)) return;
if (!reader.parse(data, root)) return CobraConnection::kInvalidMsgId;
push(id, root, shouldPushTest);
return push(id, root, shouldPushTest);
}
void CobraMetricsPublisher::push(const std::string& id,
const CobraMetricsPublisher::Message& data)
CobraConnection::MsgId CobraMetricsPublisher::push(const std::string& id,
const CobraMetricsPublisher::Message& data)
{
if (!_enabled) return;
if (!_enabled) return CobraConnection::kInvalidMsgId;
Json::Value root;
for (auto it : data)
@ -159,7 +159,7 @@ namespace ix
root[it.first] = it.second;
}
push(id, root);
return push(id, root);
}
bool CobraMetricsPublisher::shouldPush(const std::string& id) const
@ -171,11 +171,12 @@ namespace ix
return true;
}
void CobraMetricsPublisher::push(const std::string& id,
const Json::Value& data,
bool shouldPushTest)
CobraConnection::MsgId CobraMetricsPublisher::push(
const std::string& id,
const Json::Value& data,
bool shouldPushTest)
{
if (shouldPushTest && !shouldPush(id)) return;
if (shouldPushTest && !shouldPush(id)) return CobraConnection::kInvalidMsgId;
setLastUpdate(id);
@ -205,7 +206,7 @@ namespace ix
}
// Now actually enqueue the task
_cobra_metrics_theaded_publisher.push(msg);
return _cobra_metrics_theaded_publisher.push(msg);
}
void CobraMetricsPublisher::setPublishMode(CobraConnectionPublishMode publishMode)

View File

@ -59,8 +59,9 @@ namespace ix
/// Simple interface, list of key value pairs where typeof(key) == typeof(value) == string
typedef std::unordered_map<std::string, std::string> Message;
void push(const std::string& id,
const CobraMetricsPublisher::Message& data = CobraMetricsPublisher::Message());
CobraConnection::MsgId push(
const std::string& id,
const CobraMetricsPublisher::Message& data = CobraMetricsPublisher::Message());
/// Richer interface using json, which supports types (bool, int, float) and hierarchies of
/// elements
@ -69,10 +70,10 @@ namespace ix
/// shouldPush method for places where we want to be as lightweight as possible when
/// collecting metrics. When set to false, it is used so that we don't do double work when
/// computing whether a metrics should be sent or not.
void push(const std::string& id, const Json::Value& data, bool shouldPushTest = true);
CobraConnection::MsgId push(const std::string& id, const Json::Value& data, bool shouldPushTest = true);
/// Interface used by lua. msg is a json encoded string.
void push(const std::string& id, const std::string& data, bool shouldPushTest = true);
CobraConnection::MsgId push(const std::string& id, const std::string& data, bool shouldPushTest = true);
/// Tells whether a metric can be pushed.
/// A metric can be pushed if it satisfies those conditions:

View File

@ -101,17 +101,12 @@ namespace ix
webSocketPerMessageDeflateOptions);
}
void CobraMetricsThreadedPublisher::pushMessage(MessageKind messageKind,
const Json::Value& msg)
void CobraMetricsThreadedPublisher::pushMessage(MessageKind messageKind)
{
// Enqueue the task
{
// acquire lock
std::unique_lock<std::mutex> lock(_queue_mutex);
// add the task
_queue.push(std::make_pair(messageKind, msg));
} // release lock
_queue.push(messageKind);
}
// wake up one thread
_condition.notify_one();
@ -131,11 +126,6 @@ namespace ix
{
setThreadName("CobraMetricsPublisher");
Json::Value channels;
channels.append(std::string());
channels.append(std::string());
const std::string messageIdKey("id");
_cobra_connection.connect();
while (true)
@ -156,11 +146,8 @@ namespace ix
return;
}
auto item = _queue.front();
messageKind = _queue.front();
_queue.pop();
messageKind = item.first;
msg = item.second;
}
switch (messageKind)
@ -179,37 +166,44 @@ namespace ix
case MessageKind::Message:
{
;
_cobra_connection.publishNext();
}; break;
}
//
// Publish to multiple channels. This let the consumer side
// easily subscribe to all message of a certain type, without having
// to do manipulations on the messages on the server side.
//
channels[0] = _channel;
if (msg.isMember(messageIdKey))
{
channels[1] = msg[messageIdKey];
}
_cobra_connection.publish(channels, msg);
}
}
void CobraMetricsThreadedPublisher::push(const Json::Value& msg)
CobraConnection::MsgId CobraMetricsThreadedPublisher::push(const Json::Value& msg)
{
pushMessage(MessageKind::Message, msg);
static const std::string messageIdKey("id");
//
// Publish to multiple channels. This let the consumer side
// easily subscribe to all message of a certain type, without having
// to do manipulations on the messages on the server side.
//
Json::Value channels;
channels.append(_channel);
if (msg.isMember(messageIdKey))
{
channels.append(msg[messageIdKey]);
}
auto res = _cobra_connection.prePublish(channels, msg, true);
auto msgId = res.first;
pushMessage(MessageKind::Message);
return msgId;
}
void CobraMetricsThreadedPublisher::suspend()
{
pushMessage(MessageKind::Suspend, Json::Value());
pushMessage(MessageKind::Suspend);
}
void CobraMetricsThreadedPublisher::resume()
{
pushMessage(MessageKind::Resume, Json::Value());
pushMessage(MessageKind::Resume);
}
bool CobraMetricsThreadedPublisher::isConnected() const

View File

@ -37,7 +37,7 @@ namespace ix
/// Push a msg to our queue of messages to be published to cobra on the background
// thread. Main user right now is the Cobra Metrics System
void push(const Json::Value& msg);
CobraConnection::MsgId push(const Json::Value& msg);
/// Set cobra connection publish mode
void setPublishMode(CobraConnectionPublishMode publishMode);
@ -64,7 +64,7 @@ namespace ix
};
/// Push a message to be processed by the background thread
void pushMessage(MessageKind messageKind, const Json::Value& msg);
void pushMessage(MessageKind messageKind);
/// Get a wait time which is increasing exponentially based on the number of retries
uint64_t getWaitTimeExp(int retry_count);
@ -94,7 +94,7 @@ namespace ix
/// that it should wake up and take care of publishing it to cobra
/// To shutdown the worker thread one has to set the _stop boolean to true.
/// This is done in the destructor
std::queue<std::pair<MessageKind, Json::Value>> _queue;
std::queue<MessageKind> _queue;
mutable std::mutex _queue_mutex;
std::condition_variable _condition;
std::atomic<bool> _stop;

View File

@ -367,17 +367,16 @@ namespace ix
size_t size = std::min(kChunkSize, length - output.size());
ssize_t ret = recv((char*)&_readBuffer[0], size);
if (ret <= 0 && !Socket::isWaitNeeded())
{
// Error
return std::make_pair(false, std::string());
}
else
if (ret > 0)
{
output.insert(output.end(),
_readBuffer.begin(),
_readBuffer.begin() + ret);
}
else if (ret <= 0 && !Socket::isWaitNeeded())
{
return std::make_pair(false, std::string());
}
if (onProgressCallback) onProgressCallback((int) output.size(), (int) length);

View File

@ -6,4 +6,4 @@
#pragma once
#define IX_WEBSOCKET_VERSION "6.2.2"
#define IX_WEBSOCKET_VERSION "6.2.3"

View File

@ -53,7 +53,8 @@ namespace ix
if (!stress)
{
cobraMetricsPublisher.push(channel, data);
auto msgId = cobraMetricsPublisher.push(channel, data);
spdlog::info("Sent message: {}", msgId);
}
else
{

View File

@ -70,11 +70,11 @@ namespace ix
spdlog::info("Publisher authenticated");
authenticated = true;
spdlog::info("Publishing data");
Json::Value channels;
channels[0] = channel;
conn.publish(channels, data);
auto msgId = conn.publish(channels, data);
spdlog::info("Published msg {}", msgId);
}
else if (eventType == ix::CobraConnection_EventType_Subscribed)
{
@ -91,7 +91,7 @@ namespace ix
}
else if (eventType == ix::CobraConnection_EventType_Published)
{
spdlog::info("Published message acked: {}", msgId);
spdlog::info("Published message id {} acked", msgId);
messageAcked = true;
condition.notify_one();
}