(ws cobra to sentry) game is picked in a fair manner, so that all games get the same share of sent events

This commit is contained in:
Benjamin Sergeant 2019-12-27 19:10:15 -08:00
parent 50bea7dffa
commit 6522bc06ba
3 changed files with 51 additions and 13 deletions

View File

@ -1,6 +1,10 @@
# Changelog # Changelog
All changes to this project will be documented in this file. All changes to this project will be documented in this file.
## [7.8.4] - 2019-12-27
(ws cobra to sentry) game is picked in a fair manner, so that all games get the same share of sent events
## [7.8.3] - 2019-12-27 ## [7.8.3] - 2019-12-27
(ws cobra to sentry) refactor queue related code into a class (ws cobra to sentry) refactor queue related code into a class

View File

@ -6,4 +6,4 @@
#pragma once #pragma once
#define IX_WEBSOCKET_VERSION "7.8.3" #define IX_WEBSOCKET_VERSION "7.8.4"

View File

@ -15,6 +15,7 @@
#include <sstream> #include <sstream>
#include <thread> #include <thread>
#include <vector> #include <vector>
#include <map>
namespace ix namespace ix
{ {
@ -27,7 +28,7 @@ namespace ix
void add(Json::Value msg); void add(Json::Value msg);
private: private:
std::queue<Json::Value> _queue; std::map<std::string, std::queue<Json::Value>> _queues;
std::mutex _mutex; std::mutex _mutex;
std::condition_variable _condition; std::condition_variable _condition;
size_t _maxQueueSize; size_t _maxQueueSize;
@ -38,24 +39,55 @@ namespace ix
{ {
std::unique_lock<std::mutex> lock(_mutex); std::unique_lock<std::mutex> lock(_mutex);
_condition.wait(lock, [this] { return !_queue.empty() && !_stop; }); if (_queues.empty())
{
Json::Value val;
return val;
}
auto msg = _queue.front(); std::vector<std::string> games;
_queue.pop(); for (auto it : _queues)
{
games.push_back(it.first);
}
std::random_shuffle(games.begin(), games.end());
std::string game = games[0];
spdlog::info("Sending event for game '{}'", game);
_condition.wait(lock, [this] { return !_stop; });
if (_queues[game].empty())
{
Json::Value val;
return val;
}
auto msg = _queues[game].front();
_queues[game].pop();
return msg; return msg;
} }
void QueueManager::add(Json::Value msg) void QueueManager::add(Json::Value msg)
{ {
std::unique_lock<std::mutex> lock(_mutex); std::unique_lock<std::mutex> lock(_mutex);
// if the sending is not fast enough there is no point std::string game;
// in queuing too many events. if (msg.isMember("device") && msg["device"].isMember("game"))
if (_queue.size() < _maxQueueSize) {
{ game = msg["device"]["game"].asString();
_queue.push(msg); }
_condition.notify_one();
} if (game.empty()) return;
// if the sending is not fast enough there is no point
// in queuing too many events.
if (_queues[game].size() < _maxQueueSize)
{
_queues[game].push(msg);
_condition.notify_one();
}
} }
int ws_cobra_to_sentry_main(const std::string& appkey, int ws_cobra_to_sentry_main(const std::string& appkey,
@ -114,6 +146,8 @@ namespace ix
{ {
Json::Value msg = queueManager.pop(); Json::Value msg = queueManager.pop();
if (stop) return;
auto ret = sentryClient.send(msg, verbose); auto ret = sentryClient.send(msg, verbose);
HttpResponsePtr response = ret.first; HttpResponsePtr response = ret.first;