(ws cobra to sentry) game is picked in a fair manner, so that all games get the same share of sent events

This commit is contained in:
Benjamin Sergeant 2019-12-27 19:10:15 -08:00
parent 1cd7cf340a
commit 47a3736b24
3 changed files with 51 additions and 13 deletions

View File

@ -1,6 +1,10 @@
# Changelog
All changes to this project will be documented in this file.
## [7.8.4] - 2019-12-27
(ws cobra to sentry) game is picked in a fair manner, so that all games get the same share of sent events
## [7.8.3] - 2019-12-27
(ws cobra to sentry) refactor queue related code into a class

View File

@ -6,4 +6,4 @@
#pragma once
#define IX_WEBSOCKET_VERSION "7.8.3"
#define IX_WEBSOCKET_VERSION "7.8.4"

View File

@ -15,6 +15,7 @@
#include <sstream>
#include <thread>
#include <vector>
#include <map>
namespace ix
{
@ -27,7 +28,7 @@ namespace ix
void add(Json::Value msg);
private:
std::queue<Json::Value> _queue;
std::map<std::string, std::queue<Json::Value>> _queues;
std::mutex _mutex;
std::condition_variable _condition;
size_t _maxQueueSize;
@ -38,24 +39,55 @@ namespace ix
{
std::unique_lock<std::mutex> lock(_mutex);
_condition.wait(lock, [this] { return !_queue.empty() && !_stop; });
if (_queues.empty())
{
Json::Value val;
return val;
}
auto msg = _queue.front();
_queue.pop();
std::vector<std::string> games;
for (auto it : _queues)
{
games.push_back(it.first);
}
std::random_shuffle(games.begin(), games.end());
std::string game = games[0];
spdlog::info("Sending event for game '{}'", game);
_condition.wait(lock, [this] { return !_stop; });
if (_queues[game].empty())
{
Json::Value val;
return val;
}
auto msg = _queues[game].front();
_queues[game].pop();
return msg;
}
void QueueManager::add(Json::Value msg)
{
std::unique_lock<std::mutex> lock(_mutex);
std::unique_lock<std::mutex> lock(_mutex);
// if the sending is not fast enough there is no point
// in queuing too many events.
if (_queue.size() < _maxQueueSize)
{
_queue.push(msg);
_condition.notify_one();
}
std::string game;
if (msg.isMember("device") && msg["device"].isMember("game"))
{
game = msg["device"]["game"].asString();
}
if (game.empty()) return;
// if the sending is not fast enough there is no point
// in queuing too many events.
if (_queues[game].size() < _maxQueueSize)
{
_queues[game].push(msg);
_condition.notify_one();
}
}
int ws_cobra_to_sentry_main(const std::string& appkey,
@ -114,6 +146,8 @@ namespace ix
{
Json::Value msg = queueManager.pop();
if (stop) return;
auto ret = sentryClient.send(msg, verbose);
HttpResponsePtr response = ret.first;