(cobra client and bots) add batch_size subscription option for retrieving multiple messages at once

This commit is contained in:
Benjamin Sergeant 2020-06-17 17:13:45 -07:00
parent bf0f11fd65
commit 565a08b229
9 changed files with 18 additions and 2 deletions

View File

@ -1,6 +1,10 @@
# Changelog # Changelog
All changes to this project will be documented in this file. All changes to this project will be documented in this file.
## [9.7.7] - 2020-06-17
(cobra client and bots) add batch_size subscription option for retrieving multiple messages at once
## [9.7.6] - 2020-06-15 ## [9.7.6] - 2020-06-15
(websocket) WebSocketServer is not a final class, so that users can extend it (fix #215) (websocket) WebSocketServer is not a final class, so that users can extend it (fix #215)

View File

@ -29,6 +29,7 @@ namespace ix
auto runtime = botConfig.runtime; auto runtime = botConfig.runtime;
auto maxEventsPerMinute = botConfig.maxEventsPerMinute; auto maxEventsPerMinute = botConfig.maxEventsPerMinute;
auto limitReceivedEvents = botConfig.limitReceivedEvents; auto limitReceivedEvents = botConfig.limitReceivedEvents;
auto batchSize = botConfig.batchSize;
ix::CobraConnection conn; ix::CobraConnection conn;
conn.configure(config); conn.configure(config);
@ -148,6 +149,7 @@ namespace ix
&receivedCountPerMinutes, &receivedCountPerMinutes,
maxEventsPerMinute, maxEventsPerMinute,
limitReceivedEvents, limitReceivedEvents,
batchSize,
&fatalCobraError, &fatalCobraError,
&sentCount](const CobraEventPtr& event) { &sentCount](const CobraEventPtr& event) {
if (event->type == ix::CobraEventType::Open) if (event->type == ix::CobraEventType::Open)
@ -169,7 +171,7 @@ namespace ix
CoreLogger::info("Subscribing to " + channel); CoreLogger::info("Subscribing to " + channel);
CoreLogger::info("Subscribing at position " + subscriptionPosition); CoreLogger::info("Subscribing at position " + subscriptionPosition);
CoreLogger::info("Subscribing with filter " + filter); CoreLogger::info("Subscribing with filter " + filter);
conn.subscribe(channel, filter, subscriptionPosition, conn.subscribe(channel, filter, subscriptionPosition, batchSize,
[&sentCount, &receivedCountPerMinutes, [&sentCount, &receivedCountPerMinutes,
maxEventsPerMinute, limitReceivedEvents, maxEventsPerMinute, limitReceivedEvents,
&throttled, &receivedCount, &throttled, &receivedCount,

View File

@ -27,5 +27,6 @@ namespace ix
int runtime = -1; int runtime = -1;
int maxEventsPerMinute = std::numeric_limits<int>::max(); int maxEventsPerMinute = std::numeric_limits<int>::max();
bool limitReceivedEvents = false; bool limitReceivedEvents = false;
int batchSize = 1;
}; };
} // namespace ix } // namespace ix

View File

@ -562,11 +562,13 @@ namespace ix
void CobraConnection::subscribe(const std::string& channel, void CobraConnection::subscribe(const std::string& channel,
const std::string& filter, const std::string& filter,
const std::string& position, const std::string& position,
int batchSize,
SubscriptionCallback cb) SubscriptionCallback cb)
{ {
// Create and send a subscribe pdu // Create and send a subscribe pdu
Json::Value body; Json::Value body;
body["channel"] = channel; body["channel"] = channel;
body["batch_size"] = batchSize;
if (!filter.empty()) if (!filter.empty())
{ {

View File

@ -88,6 +88,7 @@ namespace ix
void subscribe(const std::string& channel, void subscribe(const std::string& channel,
const std::string& filter = std::string(), const std::string& filter = std::string(),
const std::string& position = std::string(), const std::string& position = std::string(),
int batchSize = 1,
SubscriptionCallback cb = nullptr); SubscriptionCallback cb = nullptr);
/// Unsubscribe from a channel /// Unsubscribe from a channel

View File

@ -6,4 +6,4 @@
#pragma once #pragma once
#define IX_WEBSOCKET_VERSION "9.7.6" #define IX_WEBSOCKET_VERSION "9.7.7"

View File

@ -125,10 +125,12 @@ namespace
{ {
std::string filter; std::string filter;
std::string position("$"); std::string position("$");
int batchSize = 1;
_conn.subscribe(channel, _conn.subscribe(channel,
filter, filter,
position, position,
batchSize,
[this](const Json::Value& msg, const std::string& /*position*/) { [this](const Json::Value& msg, const std::string& /*position*/) {
spdlog::info("receive {}", msg.toStyledString()); spdlog::info("receive {}", msg.toStyledString());

View File

@ -76,10 +76,12 @@ namespace
log("Subscriber authenticated"); log("Subscriber authenticated");
std::string filter; std::string filter;
std::string position("$"); std::string position("$");
int batchSize = 1;
conn.subscribe(channel, conn.subscribe(channel,
filter, filter,
position, position,
batchSize,
[](const Json::Value& msg, const std::string& /*position*/) { [](const Json::Value& msg, const std::string& /*position*/) {
log(msg.toStyledString()); log(msg.toStyledString());

View File

@ -192,6 +192,8 @@ int main(int argc, char** argv)
"--limit_received_events", cobraBotConfig.limitReceivedEvents, "Max events per minute"); "--limit_received_events", cobraBotConfig.limitReceivedEvents, "Max events per minute");
app->add_option( app->add_option(
"--max_events_per_minute", cobraBotConfig.maxEventsPerMinute, "Max events per minute"); "--max_events_per_minute", cobraBotConfig.maxEventsPerMinute, "Max events per minute");
app->add_option(
"--batch_size", cobraBotConfig.batchSize, "Subscription batch size");
}; };
app.add_flag("--version", version, "Print ws version"); app.add_flag("--version", version, "Print ws version");