Fix crash in the Linux unittest in the HTTP client code, in Socket::readBytes. Cobra Metrics Publisher code returns the message id of the message that got published, to be used to validated that it got sent properly when receiving an ack.

This commit is contained in:
Benjamin Sergeant 2019-09-21 09:23:58 -07:00
parent 8821183aea
commit 1769199d32
14 changed files with 135 additions and 87 deletions

View File

@ -1 +1 @@
6.2.2 6.2.3

View File

@ -1 +0,0 @@
docker/Dockerfile.alpine

View File

@ -20,4 +20,5 @@ COPY . .
ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-Linux-x86_64/bin ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-Linux-x86_64/bin
ENV PATH="${CMAKE_BIN_PATH}:${PATH}" ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
RUN ["make", "test"] # RUN ["make", "test"]
CMD ["sh"]

View File

@ -1,6 +1,11 @@
# Changelog # Changelog
All notable changes to this project will be documented in this file. All notable changes to this project will be documented in this file.
## [6.2.3] - 2019-09-21
- Fix crash in the Linux unittest in the HTTP client code, in Socket::readBytes
- Cobra Metrics Publisher code returns the message id of the message that got published, to be used to validated that it got sent properly when receiving an ack.
## [6.2.2] - 2019-09-19 ## [6.2.2] - 2019-09-19
- In DNS lookup code, make sure the weak pointer we use lives through the expected scope (if branch) - In DNS lookup code, make sure the weak pointer we use lives through the expected scope (if branch)

View File

@ -21,13 +21,14 @@ namespace ix
TrafficTrackerCallback CobraConnection::_trafficTrackerCallback = nullptr; TrafficTrackerCallback CobraConnection::_trafficTrackerCallback = nullptr;
PublishTrackerCallback CobraConnection::_publishTrackerCallback = nullptr; PublishTrackerCallback CobraConnection::_publishTrackerCallback = nullptr;
constexpr size_t CobraConnection::kQueueMaxSize; constexpr size_t CobraConnection::kQueueMaxSize;
constexpr CobraConnection::MsgId CobraConnection::kInvalidMsgId;
CobraConnection::CobraConnection() : CobraConnection::CobraConnection() :
_webSocket(new WebSocket()), _webSocket(new WebSocket()),
_publishMode(CobraConnection_PublishMode_Immediate), _publishMode(CobraConnection_PublishMode_Immediate),
_authenticated(false), _authenticated(false),
_eventCallback(nullptr), _eventCallback(nullptr),
_id(0) _id(1)
{ {
_pdu["action"] = "rtm/publish"; _pdu["action"] = "rtm/publish";
@ -456,11 +457,10 @@ namespace ix
return _jsonWriter.write(value); return _jsonWriter.write(value);
} }
// std::pair<CobraConnection::MsgId, std::string> CobraConnection::prePublish(
// publish is not thread safe as we are trying to reuse some Json objects. const Json::Value& channels,
// const Json::Value& msg,
CobraConnection::MsgId CobraConnection::publish(const Json::Value& channels, bool addToQueue)
const Json::Value& msg)
{ {
invokePublishTrackerCallback(true, false); invokePublishTrackerCallback(true, false);
@ -473,6 +473,38 @@ namespace ix
std::string serializedJson = serializeJson(_pdu); std::string serializedJson = serializeJson(_pdu);
if (addToQueue)
{
enqueue(serializedJson);
}
return std::make_pair(msgId, serializedJson);
}
bool CobraConnection::publishNext()
{
std::lock_guard<std::mutex> lock(_queueMutex);
auto&& msg = _messageQueue.back();
if (!publishMessage(msg))
{
_messageQueue.push_back(msg);
return false;
}
_messageQueue.pop_back();
return true;
}
//
// publish is not thread safe as we are trying to reuse some Json objects.
//
CobraConnection::MsgId CobraConnection::publish(const Json::Value& channels,
const Json::Value& msg)
{
auto p = prePublish(channels, msg, false);
auto msgId = p.first;
auto serializedJson = p.second;
// //
// 1. When we use batch mode, we just enqueue and will do the flush explicitely // 1. When we use batch mode, we just enqueue and will do the flush explicitely
// 2. When we aren't authenticated yet to the cobra server, we need to enqueue // 2. When we aren't authenticated yet to the cobra server, we need to enqueue
@ -567,22 +599,21 @@ namespace ix
// //
bool CobraConnection::flushQueue() bool CobraConnection::flushQueue()
{ {
std::lock_guard<std::mutex> lock(_queueMutex); while (!isQueueEmpty())
while (!_messageQueue.empty())
{ {
auto&& msg = _messageQueue.back(); bool ok = publishNext();
if (!publishMessage(msg)) if (!ok) return false;
{
_messageQueue.push_back(msg);
return false;
}
_messageQueue.pop_back();
} }
return true; return true;
} }
bool CobraConnection::isQueueEmpty()
{
std::lock_guard<std::mutex> lock(_queueMutex);
return _messageQueue.empty();
}
bool CobraConnection::publishMessage(const std::string& serializedJson) bool CobraConnection::publishMessage(const std::string& serializedJson)
{ {
auto webSocketSendInfo = _webSocket->send(serializedJson); auto webSocketSendInfo = _webSocket->send(serializedJson);

View File

@ -118,6 +118,19 @@ namespace ix
void suspend(); void suspend();
void resume(); void resume();
/// Prepare a message for transmission
/// (update the pdu, compute a msgId, serialize json to a string)
std::pair<CobraConnection::MsgId, std::string> prePublish(
const Json::Value& channels,
const Json::Value& msg,
bool addToQueue);
/// Attempt to send next message from the internal queue
bool publishNext();
// An invalid message id, signifying an error.
static constexpr MsgId kInvalidMsgId = 0;
private: private:
bool sendHandshakeMessage(); bool sendHandshakeMessage();
bool handleHandshakeResponse(const Json::Value& data); bool handleHandshakeResponse(const Json::Value& data);
@ -147,6 +160,9 @@ namespace ix
uint64_t msgId = std::numeric_limits<uint64_t>::max()); uint64_t msgId = std::numeric_limits<uint64_t>::max());
void invokeErrorCallback(const std::string& errorMsg, const std::string& serializedPdu); void invokeErrorCallback(const std::string& errorMsg, const std::string& serializedPdu);
/// Tells whether the internal queue is empty or not
bool isQueueEmpty();
/// ///
/// Member variables /// Member variables
/// ///

View File

@ -135,23 +135,23 @@ namespace ix
return ms; return ms;
} }
void CobraMetricsPublisher::push(const std::string& id, CobraConnection::MsgId CobraMetricsPublisher::push(const std::string& id,
const std::string& data, const std::string& data,
bool shouldPushTest) bool shouldPushTest)
{ {
if (!_enabled) return; if (!_enabled) return CobraConnection::kInvalidMsgId;
Json::Value root; Json::Value root;
Json::Reader reader; Json::Reader reader;
if (!reader.parse(data, root)) return; if (!reader.parse(data, root)) return CobraConnection::kInvalidMsgId;
push(id, root, shouldPushTest); return push(id, root, shouldPushTest);
} }
void CobraMetricsPublisher::push(const std::string& id, CobraConnection::MsgId CobraMetricsPublisher::push(const std::string& id,
const CobraMetricsPublisher::Message& data) const CobraMetricsPublisher::Message& data)
{ {
if (!_enabled) return; if (!_enabled) return CobraConnection::kInvalidMsgId;
Json::Value root; Json::Value root;
for (auto it : data) for (auto it : data)
@ -159,7 +159,7 @@ namespace ix
root[it.first] = it.second; root[it.first] = it.second;
} }
push(id, root); return push(id, root);
} }
bool CobraMetricsPublisher::shouldPush(const std::string& id) const bool CobraMetricsPublisher::shouldPush(const std::string& id) const
@ -171,11 +171,12 @@ namespace ix
return true; return true;
} }
void CobraMetricsPublisher::push(const std::string& id, CobraConnection::MsgId CobraMetricsPublisher::push(
const Json::Value& data, const std::string& id,
bool shouldPushTest) const Json::Value& data,
bool shouldPushTest)
{ {
if (shouldPushTest && !shouldPush(id)) return; if (shouldPushTest && !shouldPush(id)) return CobraConnection::kInvalidMsgId;
setLastUpdate(id); setLastUpdate(id);
@ -205,7 +206,7 @@ namespace ix
} }
// Now actually enqueue the task // Now actually enqueue the task
_cobra_metrics_theaded_publisher.push(msg); return _cobra_metrics_theaded_publisher.push(msg);
} }
void CobraMetricsPublisher::setPublishMode(CobraConnectionPublishMode publishMode) void CobraMetricsPublisher::setPublishMode(CobraConnectionPublishMode publishMode)

View File

@ -59,8 +59,9 @@ namespace ix
/// Simple interface, list of key value pairs where typeof(key) == typeof(value) == string /// Simple interface, list of key value pairs where typeof(key) == typeof(value) == string
typedef std::unordered_map<std::string, std::string> Message; typedef std::unordered_map<std::string, std::string> Message;
void push(const std::string& id, CobraConnection::MsgId push(
const CobraMetricsPublisher::Message& data = CobraMetricsPublisher::Message()); const std::string& id,
const CobraMetricsPublisher::Message& data = CobraMetricsPublisher::Message());
/// Richer interface using json, which supports types (bool, int, float) and hierarchies of /// Richer interface using json, which supports types (bool, int, float) and hierarchies of
/// elements /// elements
@ -69,10 +70,10 @@ namespace ix
/// shouldPush method for places where we want to be as lightweight as possible when /// shouldPush method for places where we want to be as lightweight as possible when
/// collecting metrics. When set to false, it is used so that we don't do double work when /// collecting metrics. When set to false, it is used so that we don't do double work when
/// computing whether a metrics should be sent or not. /// computing whether a metrics should be sent or not.
void push(const std::string& id, const Json::Value& data, bool shouldPushTest = true); CobraConnection::MsgId push(const std::string& id, const Json::Value& data, bool shouldPushTest = true);
/// Interface used by lua. msg is a json encoded string. /// Interface used by lua. msg is a json encoded string.
void push(const std::string& id, const std::string& data, bool shouldPushTest = true); CobraConnection::MsgId push(const std::string& id, const std::string& data, bool shouldPushTest = true);
/// Tells whether a metric can be pushed. /// Tells whether a metric can be pushed.
/// A metric can be pushed if it satisfies those conditions: /// A metric can be pushed if it satisfies those conditions:

View File

@ -101,17 +101,12 @@ namespace ix
webSocketPerMessageDeflateOptions); webSocketPerMessageDeflateOptions);
} }
void CobraMetricsThreadedPublisher::pushMessage(MessageKind messageKind, void CobraMetricsThreadedPublisher::pushMessage(MessageKind messageKind)
const Json::Value& msg)
{ {
// Enqueue the task
{ {
// acquire lock
std::unique_lock<std::mutex> lock(_queue_mutex); std::unique_lock<std::mutex> lock(_queue_mutex);
_queue.push(messageKind);
// add the task }
_queue.push(std::make_pair(messageKind, msg));
} // release lock
// wake up one thread // wake up one thread
_condition.notify_one(); _condition.notify_one();
@ -131,11 +126,6 @@ namespace ix
{ {
setThreadName("CobraMetricsPublisher"); setThreadName("CobraMetricsPublisher");
Json::Value channels;
channels.append(std::string());
channels.append(std::string());
const std::string messageIdKey("id");
_cobra_connection.connect(); _cobra_connection.connect();
while (true) while (true)
@ -156,11 +146,8 @@ namespace ix
return; return;
} }
auto item = _queue.front(); messageKind = _queue.front();
_queue.pop(); _queue.pop();
messageKind = item.first;
msg = item.second;
} }
switch (messageKind) switch (messageKind)
@ -179,37 +166,44 @@ namespace ix
case MessageKind::Message: case MessageKind::Message:
{ {
; _cobra_connection.publishNext();
}; break; }; break;
} }
//
// Publish to multiple channels. This let the consumer side
// easily subscribe to all message of a certain type, without having
// to do manipulations on the messages on the server side.
//
channels[0] = _channel;
if (msg.isMember(messageIdKey))
{
channels[1] = msg[messageIdKey];
}
_cobra_connection.publish(channels, msg);
} }
} }
void CobraMetricsThreadedPublisher::push(const Json::Value& msg) CobraConnection::MsgId CobraMetricsThreadedPublisher::push(const Json::Value& msg)
{ {
pushMessage(MessageKind::Message, msg); static const std::string messageIdKey("id");
//
// Publish to multiple channels. This let the consumer side
// easily subscribe to all message of a certain type, without having
// to do manipulations on the messages on the server side.
//
Json::Value channels;
channels.append(_channel);
if (msg.isMember(messageIdKey))
{
channels.append(msg[messageIdKey]);
}
auto res = _cobra_connection.prePublish(channels, msg, true);
auto msgId = res.first;
pushMessage(MessageKind::Message);
return msgId;
} }
void CobraMetricsThreadedPublisher::suspend() void CobraMetricsThreadedPublisher::suspend()
{ {
pushMessage(MessageKind::Suspend, Json::Value()); pushMessage(MessageKind::Suspend);
} }
void CobraMetricsThreadedPublisher::resume() void CobraMetricsThreadedPublisher::resume()
{ {
pushMessage(MessageKind::Resume, Json::Value()); pushMessage(MessageKind::Resume);
} }
bool CobraMetricsThreadedPublisher::isConnected() const bool CobraMetricsThreadedPublisher::isConnected() const

View File

@ -37,7 +37,7 @@ namespace ix
/// Push a msg to our queue of messages to be published to cobra on the background /// Push a msg to our queue of messages to be published to cobra on the background
// thread. Main user right now is the Cobra Metrics System // thread. Main user right now is the Cobra Metrics System
void push(const Json::Value& msg); CobraConnection::MsgId push(const Json::Value& msg);
/// Set cobra connection publish mode /// Set cobra connection publish mode
void setPublishMode(CobraConnectionPublishMode publishMode); void setPublishMode(CobraConnectionPublishMode publishMode);
@ -64,7 +64,7 @@ namespace ix
}; };
/// Push a message to be processed by the background thread /// Push a message to be processed by the background thread
void pushMessage(MessageKind messageKind, const Json::Value& msg); void pushMessage(MessageKind messageKind);
/// Get a wait time which is increasing exponentially based on the number of retries /// Get a wait time which is increasing exponentially based on the number of retries
uint64_t getWaitTimeExp(int retry_count); uint64_t getWaitTimeExp(int retry_count);
@ -94,7 +94,7 @@ namespace ix
/// that it should wake up and take care of publishing it to cobra /// that it should wake up and take care of publishing it to cobra
/// To shutdown the worker thread one has to set the _stop boolean to true. /// To shutdown the worker thread one has to set the _stop boolean to true.
/// This is done in the destructor /// This is done in the destructor
std::queue<std::pair<MessageKind, Json::Value>> _queue; std::queue<MessageKind> _queue;
mutable std::mutex _queue_mutex; mutable std::mutex _queue_mutex;
std::condition_variable _condition; std::condition_variable _condition;
std::atomic<bool> _stop; std::atomic<bool> _stop;

View File

@ -367,17 +367,16 @@ namespace ix
size_t size = std::min(kChunkSize, length - output.size()); size_t size = std::min(kChunkSize, length - output.size());
ssize_t ret = recv((char*)&_readBuffer[0], size); ssize_t ret = recv((char*)&_readBuffer[0], size);
if (ret <= 0 && !Socket::isWaitNeeded()) if (ret > 0)
{
// Error
return std::make_pair(false, std::string());
}
else
{ {
output.insert(output.end(), output.insert(output.end(),
_readBuffer.begin(), _readBuffer.begin(),
_readBuffer.begin() + ret); _readBuffer.begin() + ret);
} }
else if (ret <= 0 && !Socket::isWaitNeeded())
{
return std::make_pair(false, std::string());
}
if (onProgressCallback) onProgressCallback((int) output.size(), (int) length); if (onProgressCallback) onProgressCallback((int) output.size(), (int) length);

View File

@ -6,4 +6,4 @@
#pragma once #pragma once
#define IX_WEBSOCKET_VERSION "6.2.2" #define IX_WEBSOCKET_VERSION "6.2.3"

View File

@ -53,7 +53,8 @@ namespace ix
if (!stress) if (!stress)
{ {
cobraMetricsPublisher.push(channel, data); auto msgId = cobraMetricsPublisher.push(channel, data);
spdlog::info("Sent message: {}", msgId);
} }
else else
{ {

View File

@ -70,11 +70,11 @@ namespace ix
spdlog::info("Publisher authenticated"); spdlog::info("Publisher authenticated");
authenticated = true; authenticated = true;
spdlog::info("Publishing data");
Json::Value channels; Json::Value channels;
channels[0] = channel; channels[0] = channel;
conn.publish(channels, data); auto msgId = conn.publish(channels, data);
spdlog::info("Published msg {}", msgId);
} }
else if (eventType == ix::CobraConnection_EventType_Subscribed) else if (eventType == ix::CobraConnection_EventType_Subscribed)
{ {
@ -91,7 +91,7 @@ namespace ix
} }
else if (eventType == ix::CobraConnection_EventType_Published) else if (eventType == ix::CobraConnection_EventType_Published)
{ {
spdlog::info("Published message acked: {}", msgId); spdlog::info("Published message id {} acked", msgId);
messageAcked = true; messageAcked = true;
condition.notify_one(); condition.notify_one();
} }