CobraConnection: sets a unique id field for all messages sent to [cobra](https://github.com/machinezone/cobra).
CobraConnection: sets a counter as a field for each event published.
This commit is contained in:
		| @@ -24,7 +24,8 @@ namespace ix | ||||
|         _webSocket(new WebSocket()), | ||||
|         _publishMode(CobraConnection_PublishMode_Immediate), | ||||
|         _authenticated(false), | ||||
|         _eventCallback(nullptr) | ||||
|         _eventCallback(nullptr), | ||||
|         _id(0) | ||||
|     { | ||||
|         _pdu["action"] = "rtm/publish"; | ||||
|  | ||||
| @@ -244,6 +245,7 @@ namespace ix | ||||
|         Json::Value pdu; | ||||
|         pdu["action"] = "auth/handshake"; | ||||
|         pdu["body"] = body; | ||||
|         pdu["id"] = _id++; | ||||
|  | ||||
|         std::string serializedJson = serializeJson(pdu); | ||||
|         CobraConnection::invokeTrafficTrackerCallback(serializedJson.size(), false); | ||||
| @@ -306,6 +308,7 @@ namespace ix | ||||
|         Json::Value pdu; | ||||
|         pdu["action"] = "auth/authenticate"; | ||||
|         pdu["body"] = body; | ||||
|         pdu["id"] = _id++; | ||||
|  | ||||
|         std::string serializedJson = serializeJson(pdu); | ||||
|         CobraConnection::invokeTrafficTrackerCallback(serializedJson.size(), false); | ||||
| @@ -402,6 +405,7 @@ namespace ix | ||||
|         _body["channels"] = channels; | ||||
|         _body["message"] = msg; | ||||
|         _pdu["body"] = _body; | ||||
|         _pdu["id"] = _id++; | ||||
|  | ||||
|         std::string serializedJson = serializeJson(_pdu); | ||||
|  | ||||
| @@ -444,6 +448,7 @@ namespace ix | ||||
|         Json::Value pdu; | ||||
|         pdu["action"] = "rtm/subscribe"; | ||||
|         pdu["body"] = body; | ||||
|         pdu["id"] = _id++; | ||||
|  | ||||
|         _webSocket->send(pdu.toStyledString()); | ||||
|  | ||||
| @@ -469,6 +474,7 @@ namespace ix | ||||
|         Json::Value pdu; | ||||
|         pdu["action"] = "rtm/unsubscribe"; | ||||
|         pdu["body"] = body; | ||||
|         pdu["id"] = _id++; | ||||
|  | ||||
|         _webSocket->send(pdu.toStyledString()); | ||||
|     } | ||||
|   | ||||
| @@ -168,6 +168,9 @@ namespace ix | ||||
|  | ||||
|         // Cap the queue size (100 elems so far -> ~100k) | ||||
|         static constexpr size_t kQueueMaxSize = 256; | ||||
|  | ||||
|         // Each pdu sent should have an incremental unique id | ||||
|         std::atomic<uint64_t> _id; | ||||
|     }; | ||||
|  | ||||
| } // namespace ix | ||||
|   | ||||
| @@ -191,6 +191,19 @@ namespace ix | ||||
|             msg["device"] = _device; | ||||
|         } | ||||
|  | ||||
|         { | ||||
|             // | ||||
|             // Bump a counter for each id | ||||
|             // This is used to make sure that we are not | ||||
|             // dropping messages, by checking that all the ids is the list of | ||||
|             // all natural numbers until the last value sent (0, 1, 2, ..., N) | ||||
|             // | ||||
|             std::lock_guard<std::mutex> lock(_device_mutex); | ||||
|             auto it = _counters.emplace(id, 0); | ||||
|             msg["per_id_counter"] = it.first->second; | ||||
|             it.first->second += 1; | ||||
|         } | ||||
|  | ||||
|         // Now actually enqueue the task | ||||
|         _cobra_metrics_theaded_publisher.push(msg); | ||||
|     } | ||||
|   | ||||
| @@ -151,6 +151,10 @@ namespace ix | ||||
|             _last_update; | ||||
|         mutable std::mutex _last_update_mutex; // protect access to _last_update | ||||
|  | ||||
|         /// Bump a counter for each metric type | ||||
|         std::unordered_map<std::string, int> _counters; | ||||
|         mutable std::mutex _counters_mutex; // protect access to _counters | ||||
|  | ||||
|         // const strings for internal ids | ||||
|         static const std::string kSetRateControlId; | ||||
|         static const std::string kSetBlacklistId; | ||||
|   | ||||
		Reference in New Issue
	
	Block a user