From 1769199d325c986529fcd21c2cb2f585a688f97b Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Sat, 21 Sep 2019 09:23:58 -0700 Subject: [PATCH] Fix crash in the Linux unittest in the HTTP client code, in Socket::readBytes. Cobra Metrics Publisher code returns the message id of the message that got published, to be used to validated that it got sent properly when receiving an ack. --- DOCKER_VERSION | 2 +- Dockerfile | 1 - docker/Dockerfile.ubuntu_disco | 3 +- docs/CHANGELOG.md | 5 ++ ixcobra/ixcobra/IXCobraConnection.cpp | 63 ++++++++++++++----- ixcobra/ixcobra/IXCobraConnection.h | 16 +++++ ixcobra/ixcobra/IXCobraMetricsPublisher.cpp | 31 ++++----- ixcobra/ixcobra/IXCobraMetricsPublisher.h | 9 +-- .../IXCobraMetricsThreadedPublisher.cpp | 62 +++++++++--------- .../ixcobra/IXCobraMetricsThreadedPublisher.h | 6 +- ixwebsocket/IXSocket.cpp | 11 ++-- ixwebsocket/IXWebSocketVersion.h | 2 +- ws/ws_cobra_metrics_publish.cpp | 3 +- ws/ws_cobra_publish.cpp | 8 +-- 14 files changed, 135 insertions(+), 87 deletions(-) delete mode 120000 Dockerfile diff --git a/DOCKER_VERSION b/DOCKER_VERSION index ca063943..bee94338 100644 --- a/DOCKER_VERSION +++ b/DOCKER_VERSION @@ -1 +1 @@ -6.2.2 +6.2.3 diff --git a/Dockerfile b/Dockerfile deleted file mode 120000 index 197ac830..00000000 --- a/Dockerfile +++ /dev/null @@ -1 +0,0 @@ -docker/Dockerfile.alpine \ No newline at end of file diff --git a/docker/Dockerfile.ubuntu_disco b/docker/Dockerfile.ubuntu_disco index 800c5181..34804999 100644 --- a/docker/Dockerfile.ubuntu_disco +++ b/docker/Dockerfile.ubuntu_disco @@ -20,4 +20,5 @@ COPY . . ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-Linux-x86_64/bin ENV PATH="${CMAKE_BIN_PATH}:${PATH}" -RUN ["make", "test"] +# RUN ["make", "test"] +CMD ["sh"] diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 22aa945a..1bb1761c 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,6 +1,11 @@ # Changelog All notable changes to this project will be documented in this file. +## [6.2.3] - 2019-09-21 + +- Fix crash in the Linux unittest in the HTTP client code, in Socket::readBytes +- Cobra Metrics Publisher code returns the message id of the message that got published, to be used to validated that it got sent properly when receiving an ack. + ## [6.2.2] - 2019-09-19 - In DNS lookup code, make sure the weak pointer we use lives through the expected scope (if branch) diff --git a/ixcobra/ixcobra/IXCobraConnection.cpp b/ixcobra/ixcobra/IXCobraConnection.cpp index 0bccca9a..285b3845 100644 --- a/ixcobra/ixcobra/IXCobraConnection.cpp +++ b/ixcobra/ixcobra/IXCobraConnection.cpp @@ -21,13 +21,14 @@ namespace ix TrafficTrackerCallback CobraConnection::_trafficTrackerCallback = nullptr; PublishTrackerCallback CobraConnection::_publishTrackerCallback = nullptr; constexpr size_t CobraConnection::kQueueMaxSize; + constexpr CobraConnection::MsgId CobraConnection::kInvalidMsgId; CobraConnection::CobraConnection() : _webSocket(new WebSocket()), _publishMode(CobraConnection_PublishMode_Immediate), _authenticated(false), _eventCallback(nullptr), - _id(0) + _id(1) { _pdu["action"] = "rtm/publish"; @@ -456,11 +457,10 @@ namespace ix return _jsonWriter.write(value); } - // - // publish is not thread safe as we are trying to reuse some Json objects. - // - CobraConnection::MsgId CobraConnection::publish(const Json::Value& channels, - const Json::Value& msg) + std::pair CobraConnection::prePublish( + const Json::Value& channels, + const Json::Value& msg, + bool addToQueue) { invokePublishTrackerCallback(true, false); @@ -473,6 +473,38 @@ namespace ix std::string serializedJson = serializeJson(_pdu); + if (addToQueue) + { + enqueue(serializedJson); + } + + return std::make_pair(msgId, serializedJson); + } + + bool CobraConnection::publishNext() + { + std::lock_guard lock(_queueMutex); + + auto&& msg = _messageQueue.back(); + if (!publishMessage(msg)) + { + _messageQueue.push_back(msg); + return false; + } + _messageQueue.pop_back(); + return true; + } + + // + // publish is not thread safe as we are trying to reuse some Json objects. + // + CobraConnection::MsgId CobraConnection::publish(const Json::Value& channels, + const Json::Value& msg) + { + auto p = prePublish(channels, msg, false); + auto msgId = p.first; + auto serializedJson = p.second; + // // 1. When we use batch mode, we just enqueue and will do the flush explicitely // 2. When we aren't authenticated yet to the cobra server, we need to enqueue @@ -567,22 +599,21 @@ namespace ix // bool CobraConnection::flushQueue() { - std::lock_guard lock(_queueMutex); - - while (!_messageQueue.empty()) + while (!isQueueEmpty()) { - auto&& msg = _messageQueue.back(); - if (!publishMessage(msg)) - { - _messageQueue.push_back(msg); - return false; - } - _messageQueue.pop_back(); + bool ok = publishNext(); + if (!ok) return false; } return true; } + bool CobraConnection::isQueueEmpty() + { + std::lock_guard lock(_queueMutex); + return _messageQueue.empty(); + } + bool CobraConnection::publishMessage(const std::string& serializedJson) { auto webSocketSendInfo = _webSocket->send(serializedJson); diff --git a/ixcobra/ixcobra/IXCobraConnection.h b/ixcobra/ixcobra/IXCobraConnection.h index 6e7857f4..656a45e8 100644 --- a/ixcobra/ixcobra/IXCobraConnection.h +++ b/ixcobra/ixcobra/IXCobraConnection.h @@ -118,6 +118,19 @@ namespace ix void suspend(); void resume(); + /// Prepare a message for transmission + /// (update the pdu, compute a msgId, serialize json to a string) + std::pair prePublish( + const Json::Value& channels, + const Json::Value& msg, + bool addToQueue); + + /// Attempt to send next message from the internal queue + bool publishNext(); + + // An invalid message id, signifying an error. + static constexpr MsgId kInvalidMsgId = 0; + private: bool sendHandshakeMessage(); bool handleHandshakeResponse(const Json::Value& data); @@ -147,6 +160,9 @@ namespace ix uint64_t msgId = std::numeric_limits::max()); void invokeErrorCallback(const std::string& errorMsg, const std::string& serializedPdu); + /// Tells whether the internal queue is empty or not + bool isQueueEmpty(); + /// /// Member variables /// diff --git a/ixcobra/ixcobra/IXCobraMetricsPublisher.cpp b/ixcobra/ixcobra/IXCobraMetricsPublisher.cpp index 688507a8..1283b57b 100644 --- a/ixcobra/ixcobra/IXCobraMetricsPublisher.cpp +++ b/ixcobra/ixcobra/IXCobraMetricsPublisher.cpp @@ -135,23 +135,23 @@ namespace ix return ms; } - void CobraMetricsPublisher::push(const std::string& id, - const std::string& data, - bool shouldPushTest) + CobraConnection::MsgId CobraMetricsPublisher::push(const std::string& id, + const std::string& data, + bool shouldPushTest) { - if (!_enabled) return; + if (!_enabled) return CobraConnection::kInvalidMsgId; Json::Value root; Json::Reader reader; - if (!reader.parse(data, root)) return; + if (!reader.parse(data, root)) return CobraConnection::kInvalidMsgId; - push(id, root, shouldPushTest); + return push(id, root, shouldPushTest); } - void CobraMetricsPublisher::push(const std::string& id, - const CobraMetricsPublisher::Message& data) + CobraConnection::MsgId CobraMetricsPublisher::push(const std::string& id, + const CobraMetricsPublisher::Message& data) { - if (!_enabled) return; + if (!_enabled) return CobraConnection::kInvalidMsgId; Json::Value root; for (auto it : data) @@ -159,7 +159,7 @@ namespace ix root[it.first] = it.second; } - push(id, root); + return push(id, root); } bool CobraMetricsPublisher::shouldPush(const std::string& id) const @@ -171,11 +171,12 @@ namespace ix return true; } - void CobraMetricsPublisher::push(const std::string& id, - const Json::Value& data, - bool shouldPushTest) + CobraConnection::MsgId CobraMetricsPublisher::push( + const std::string& id, + const Json::Value& data, + bool shouldPushTest) { - if (shouldPushTest && !shouldPush(id)) return; + if (shouldPushTest && !shouldPush(id)) return CobraConnection::kInvalidMsgId; setLastUpdate(id); @@ -205,7 +206,7 @@ namespace ix } // Now actually enqueue the task - _cobra_metrics_theaded_publisher.push(msg); + return _cobra_metrics_theaded_publisher.push(msg); } void CobraMetricsPublisher::setPublishMode(CobraConnectionPublishMode publishMode) diff --git a/ixcobra/ixcobra/IXCobraMetricsPublisher.h b/ixcobra/ixcobra/IXCobraMetricsPublisher.h index f917a4ac..9311b26a 100644 --- a/ixcobra/ixcobra/IXCobraMetricsPublisher.h +++ b/ixcobra/ixcobra/IXCobraMetricsPublisher.h @@ -59,8 +59,9 @@ namespace ix /// Simple interface, list of key value pairs where typeof(key) == typeof(value) == string typedef std::unordered_map Message; - void push(const std::string& id, - const CobraMetricsPublisher::Message& data = CobraMetricsPublisher::Message()); + CobraConnection::MsgId push( + const std::string& id, + const CobraMetricsPublisher::Message& data = CobraMetricsPublisher::Message()); /// Richer interface using json, which supports types (bool, int, float) and hierarchies of /// elements @@ -69,10 +70,10 @@ namespace ix /// shouldPush method for places where we want to be as lightweight as possible when /// collecting metrics. When set to false, it is used so that we don't do double work when /// computing whether a metrics should be sent or not. - void push(const std::string& id, const Json::Value& data, bool shouldPushTest = true); + CobraConnection::MsgId push(const std::string& id, const Json::Value& data, bool shouldPushTest = true); /// Interface used by lua. msg is a json encoded string. - void push(const std::string& id, const std::string& data, bool shouldPushTest = true); + CobraConnection::MsgId push(const std::string& id, const std::string& data, bool shouldPushTest = true); /// Tells whether a metric can be pushed. /// A metric can be pushed if it satisfies those conditions: diff --git a/ixcobra/ixcobra/IXCobraMetricsThreadedPublisher.cpp b/ixcobra/ixcobra/IXCobraMetricsThreadedPublisher.cpp index 78ca5e40..a208ab76 100644 --- a/ixcobra/ixcobra/IXCobraMetricsThreadedPublisher.cpp +++ b/ixcobra/ixcobra/IXCobraMetricsThreadedPublisher.cpp @@ -101,17 +101,12 @@ namespace ix webSocketPerMessageDeflateOptions); } - void CobraMetricsThreadedPublisher::pushMessage(MessageKind messageKind, - const Json::Value& msg) + void CobraMetricsThreadedPublisher::pushMessage(MessageKind messageKind) { - // Enqueue the task { - // acquire lock std::unique_lock lock(_queue_mutex); - - // add the task - _queue.push(std::make_pair(messageKind, msg)); - } // release lock + _queue.push(messageKind); + } // wake up one thread _condition.notify_one(); @@ -131,11 +126,6 @@ namespace ix { setThreadName("CobraMetricsPublisher"); - Json::Value channels; - channels.append(std::string()); - channels.append(std::string()); - const std::string messageIdKey("id"); - _cobra_connection.connect(); while (true) @@ -156,11 +146,8 @@ namespace ix return; } - auto item = _queue.front(); + messageKind = _queue.front(); _queue.pop(); - - messageKind = item.first; - msg = item.second; } switch (messageKind) @@ -179,37 +166,44 @@ namespace ix case MessageKind::Message: { - ; + _cobra_connection.publishNext(); }; break; } - - // - // Publish to multiple channels. This let the consumer side - // easily subscribe to all message of a certain type, without having - // to do manipulations on the messages on the server side. - // - channels[0] = _channel; - if (msg.isMember(messageIdKey)) - { - channels[1] = msg[messageIdKey]; - } - _cobra_connection.publish(channels, msg); } } - void CobraMetricsThreadedPublisher::push(const Json::Value& msg) + CobraConnection::MsgId CobraMetricsThreadedPublisher::push(const Json::Value& msg) { - pushMessage(MessageKind::Message, msg); + static const std::string messageIdKey("id"); + + // + // Publish to multiple channels. This let the consumer side + // easily subscribe to all message of a certain type, without having + // to do manipulations on the messages on the server side. + // + Json::Value channels; + + channels.append(_channel); + if (msg.isMember(messageIdKey)) + { + channels.append(msg[messageIdKey]); + } + auto res = _cobra_connection.prePublish(channels, msg, true); + auto msgId = res.first; + + pushMessage(MessageKind::Message); + + return msgId; } void CobraMetricsThreadedPublisher::suspend() { - pushMessage(MessageKind::Suspend, Json::Value()); + pushMessage(MessageKind::Suspend); } void CobraMetricsThreadedPublisher::resume() { - pushMessage(MessageKind::Resume, Json::Value()); + pushMessage(MessageKind::Resume); } bool CobraMetricsThreadedPublisher::isConnected() const diff --git a/ixcobra/ixcobra/IXCobraMetricsThreadedPublisher.h b/ixcobra/ixcobra/IXCobraMetricsThreadedPublisher.h index 50e6e398..8f2a127f 100644 --- a/ixcobra/ixcobra/IXCobraMetricsThreadedPublisher.h +++ b/ixcobra/ixcobra/IXCobraMetricsThreadedPublisher.h @@ -37,7 +37,7 @@ namespace ix /// Push a msg to our queue of messages to be published to cobra on the background // thread. Main user right now is the Cobra Metrics System - void push(const Json::Value& msg); + CobraConnection::MsgId push(const Json::Value& msg); /// Set cobra connection publish mode void setPublishMode(CobraConnectionPublishMode publishMode); @@ -64,7 +64,7 @@ namespace ix }; /// Push a message to be processed by the background thread - void pushMessage(MessageKind messageKind, const Json::Value& msg); + void pushMessage(MessageKind messageKind); /// Get a wait time which is increasing exponentially based on the number of retries uint64_t getWaitTimeExp(int retry_count); @@ -94,7 +94,7 @@ namespace ix /// that it should wake up and take care of publishing it to cobra /// To shutdown the worker thread one has to set the _stop boolean to true. /// This is done in the destructor - std::queue> _queue; + std::queue _queue; mutable std::mutex _queue_mutex; std::condition_variable _condition; std::atomic _stop; diff --git a/ixwebsocket/IXSocket.cpp b/ixwebsocket/IXSocket.cpp index f9e68351..c77d54a7 100644 --- a/ixwebsocket/IXSocket.cpp +++ b/ixwebsocket/IXSocket.cpp @@ -367,17 +367,16 @@ namespace ix size_t size = std::min(kChunkSize, length - output.size()); ssize_t ret = recv((char*)&_readBuffer[0], size); - if (ret <= 0 && !Socket::isWaitNeeded()) - { - // Error - return std::make_pair(false, std::string()); - } - else + if (ret > 0) { output.insert(output.end(), _readBuffer.begin(), _readBuffer.begin() + ret); } + else if (ret <= 0 && !Socket::isWaitNeeded()) + { + return std::make_pair(false, std::string()); + } if (onProgressCallback) onProgressCallback((int) output.size(), (int) length); diff --git a/ixwebsocket/IXWebSocketVersion.h b/ixwebsocket/IXWebSocketVersion.h index 8d53fd2a..3c9ad005 100644 --- a/ixwebsocket/IXWebSocketVersion.h +++ b/ixwebsocket/IXWebSocketVersion.h @@ -6,4 +6,4 @@ #pragma once -#define IX_WEBSOCKET_VERSION "6.2.2" +#define IX_WEBSOCKET_VERSION "6.2.3" diff --git a/ws/ws_cobra_metrics_publish.cpp b/ws/ws_cobra_metrics_publish.cpp index d0b204c4..501beb34 100644 --- a/ws/ws_cobra_metrics_publish.cpp +++ b/ws/ws_cobra_metrics_publish.cpp @@ -53,7 +53,8 @@ namespace ix if (!stress) { - cobraMetricsPublisher.push(channel, data); + auto msgId = cobraMetricsPublisher.push(channel, data); + spdlog::info("Sent message: {}", msgId); } else { diff --git a/ws/ws_cobra_publish.cpp b/ws/ws_cobra_publish.cpp index cb473a84..d9d3fb9d 100644 --- a/ws/ws_cobra_publish.cpp +++ b/ws/ws_cobra_publish.cpp @@ -70,11 +70,11 @@ namespace ix spdlog::info("Publisher authenticated"); authenticated = true; - spdlog::info("Publishing data"); - Json::Value channels; channels[0] = channel; - conn.publish(channels, data); + auto msgId = conn.publish(channels, data); + + spdlog::info("Published msg {}", msgId); } else if (eventType == ix::CobraConnection_EventType_Subscribed) { @@ -91,7 +91,7 @@ namespace ix } else if (eventType == ix::CobraConnection_EventType_Published) { - spdlog::info("Published message acked: {}", msgId); + spdlog::info("Published message id {} acked", msgId); messageAcked = true; condition.notify_one(); }