CobraConnection: sets a unique id field for all messages sent to [cobra](https://github.com/machinezone/cobra).

CobraConnection: sets a counter as a field for each event published.
This commit is contained in:
Benjamin Sergeant 2019-08-26 09:51:25 -07:00
parent ac60ec4320
commit 0847e60d2a
7 changed files with 47 additions and 2 deletions

12
.pre-commit-config.yaml Normal file
View File

@ -0,0 +1,12 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v2.3.0
hooks:
- id: check-yaml
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/pocc/pre-commit-hooks
rev: ''
hooks:
- id: clang-format

View File

@ -1,6 +1,13 @@
# Changelog
All notable changes to this project will be documented in this file.
## [5.0.7] - 2019-08-23
- CobraConnection: sets a unique id field for all messages sent to [cobra](https://github.com/machinezone/cobra).
- CobraConnection: sets a counter as a field for each event published.
## [5.0.6] - 2019-08-22
- Windows: silly compile error (poll should be in the global namespace)
## [5.0.5] - 2019-08-22
- Windows: use select instead of WSAPoll, through a poll wrapper

View File

@ -1 +1 @@
5.0.4
5.0.6

View File

@ -24,7 +24,8 @@ namespace ix
_webSocket(new WebSocket()),
_publishMode(CobraConnection_PublishMode_Immediate),
_authenticated(false),
_eventCallback(nullptr)
_eventCallback(nullptr),
_id(0)
{
_pdu["action"] = "rtm/publish";
@ -244,6 +245,7 @@ namespace ix
Json::Value pdu;
pdu["action"] = "auth/handshake";
pdu["body"] = body;
pdu["id"] = _id++;
std::string serializedJson = serializeJson(pdu);
CobraConnection::invokeTrafficTrackerCallback(serializedJson.size(), false);
@ -306,6 +308,7 @@ namespace ix
Json::Value pdu;
pdu["action"] = "auth/authenticate";
pdu["body"] = body;
pdu["id"] = _id++;
std::string serializedJson = serializeJson(pdu);
CobraConnection::invokeTrafficTrackerCallback(serializedJson.size(), false);
@ -402,6 +405,7 @@ namespace ix
_body["channels"] = channels;
_body["message"] = msg;
_pdu["body"] = _body;
_pdu["id"] = _id++;
std::string serializedJson = serializeJson(_pdu);
@ -444,6 +448,7 @@ namespace ix
Json::Value pdu;
pdu["action"] = "rtm/subscribe";
pdu["body"] = body;
pdu["id"] = _id++;
_webSocket->send(pdu.toStyledString());
@ -469,6 +474,7 @@ namespace ix
Json::Value pdu;
pdu["action"] = "rtm/unsubscribe";
pdu["body"] = body;
pdu["id"] = _id++;
_webSocket->send(pdu.toStyledString());
}

View File

@ -168,6 +168,9 @@ namespace ix
// Cap the queue size (100 elems so far -> ~100k)
static constexpr size_t kQueueMaxSize = 256;
// Each pdu sent should have an incremental unique id
std::atomic<uint64_t> _id;
};
} // namespace ix

View File

@ -191,6 +191,19 @@ namespace ix
msg["device"] = _device;
}
{
//
// Bump a counter for each id
// This is used to make sure that we are not
// dropping messages, by checking that all the ids is the list of
// all natural numbers until the last value sent (0, 1, 2, ..., N)
//
std::lock_guard<std::mutex> lock(_device_mutex);
auto it = _counters.emplace(id, 0);
msg["per_id_counter"] = it.first->second;
it.first->second += 1;
}
// Now actually enqueue the task
_cobra_metrics_theaded_publisher.push(msg);
}

View File

@ -151,6 +151,10 @@ namespace ix
_last_update;
mutable std::mutex _last_update_mutex; // protect access to _last_update
/// Bump a counter for each metric type
std::unordered_map<std::string, int> _counters;
mutable std::mutex _counters_mutex; // protect access to _counters
// const strings for internal ids
static const std::string kSetRateControlId;
static const std::string kSetBlacklistId;