(cobra client and bots) add batch_size subscription option for retrieving multiple messages at once
This commit is contained in:
		@@ -1,6 +1,10 @@
 | 
			
		||||
# Changelog
 | 
			
		||||
All changes to this project will be documented in this file.
 | 
			
		||||
 | 
			
		||||
## [9.7.7] - 2020-06-17
 | 
			
		||||
 | 
			
		||||
(cobra client and bots) add batch_size subscription option for retrieving multiple messages at once
 | 
			
		||||
 | 
			
		||||
## [9.7.6] - 2020-06-15
 | 
			
		||||
 | 
			
		||||
(websocket) WebSocketServer is not a final class, so that users can extend it (fix #215)
 | 
			
		||||
 
 | 
			
		||||
@@ -29,6 +29,7 @@ namespace ix
 | 
			
		||||
        auto runtime = botConfig.runtime;
 | 
			
		||||
        auto maxEventsPerMinute = botConfig.maxEventsPerMinute;
 | 
			
		||||
        auto limitReceivedEvents = botConfig.limitReceivedEvents;
 | 
			
		||||
        auto batchSize = botConfig.batchSize;
 | 
			
		||||
 | 
			
		||||
        ix::CobraConnection conn;
 | 
			
		||||
        conn.configure(config);
 | 
			
		||||
@@ -148,6 +149,7 @@ namespace ix
 | 
			
		||||
                               &receivedCountPerMinutes,
 | 
			
		||||
                               maxEventsPerMinute,
 | 
			
		||||
                               limitReceivedEvents,
 | 
			
		||||
                               batchSize,
 | 
			
		||||
                               &fatalCobraError,
 | 
			
		||||
                               &sentCount](const CobraEventPtr& event) {
 | 
			
		||||
            if (event->type == ix::CobraEventType::Open)
 | 
			
		||||
@@ -169,7 +171,7 @@ namespace ix
 | 
			
		||||
                CoreLogger::info("Subscribing to " + channel);
 | 
			
		||||
                CoreLogger::info("Subscribing at position " + subscriptionPosition);
 | 
			
		||||
                CoreLogger::info("Subscribing with filter " + filter);
 | 
			
		||||
                conn.subscribe(channel, filter, subscriptionPosition,
 | 
			
		||||
                conn.subscribe(channel, filter, subscriptionPosition, batchSize,
 | 
			
		||||
                    [&sentCount, &receivedCountPerMinutes,
 | 
			
		||||
                     maxEventsPerMinute, limitReceivedEvents,
 | 
			
		||||
                     &throttled, &receivedCount,
 | 
			
		||||
 
 | 
			
		||||
@@ -27,5 +27,6 @@ namespace ix
 | 
			
		||||
        int runtime = -1;
 | 
			
		||||
        int maxEventsPerMinute = std::numeric_limits<int>::max();
 | 
			
		||||
        bool limitReceivedEvents = false;
 | 
			
		||||
        int batchSize = 1;
 | 
			
		||||
    };
 | 
			
		||||
} // namespace ix
 | 
			
		||||
 
 | 
			
		||||
@@ -562,11 +562,13 @@ namespace ix
 | 
			
		||||
    void CobraConnection::subscribe(const std::string& channel,
 | 
			
		||||
                                    const std::string& filter,
 | 
			
		||||
                                    const std::string& position,
 | 
			
		||||
                                    int batchSize,
 | 
			
		||||
                                    SubscriptionCallback cb)
 | 
			
		||||
    {
 | 
			
		||||
        // Create and send a subscribe pdu
 | 
			
		||||
        Json::Value body;
 | 
			
		||||
        body["channel"] = channel;
 | 
			
		||||
        body["batch_size"] = batchSize;
 | 
			
		||||
 | 
			
		||||
        if (!filter.empty())
 | 
			
		||||
        {
 | 
			
		||||
 
 | 
			
		||||
@@ -88,6 +88,7 @@ namespace ix
 | 
			
		||||
        void subscribe(const std::string& channel,
 | 
			
		||||
                       const std::string& filter = std::string(),
 | 
			
		||||
                       const std::string& position = std::string(),
 | 
			
		||||
                       int batchSize = 1,
 | 
			
		||||
                       SubscriptionCallback cb = nullptr);
 | 
			
		||||
 | 
			
		||||
        /// Unsubscribe from a channel
 | 
			
		||||
 
 | 
			
		||||
@@ -6,4 +6,4 @@
 | 
			
		||||
 | 
			
		||||
#pragma once
 | 
			
		||||
 | 
			
		||||
#define IX_WEBSOCKET_VERSION "9.7.6"
 | 
			
		||||
#define IX_WEBSOCKET_VERSION "9.7.7"
 | 
			
		||||
 
 | 
			
		||||
@@ -125,10 +125,12 @@ namespace
 | 
			
		||||
    {
 | 
			
		||||
        std::string filter;
 | 
			
		||||
        std::string position("$");
 | 
			
		||||
        int batchSize = 1;
 | 
			
		||||
 | 
			
		||||
        _conn.subscribe(channel,
 | 
			
		||||
                        filter,
 | 
			
		||||
                        position,
 | 
			
		||||
                        batchSize,
 | 
			
		||||
                        [this](const Json::Value& msg, const std::string& /*position*/) {
 | 
			
		||||
                            spdlog::info("receive {}", msg.toStyledString());
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -76,10 +76,12 @@ namespace
 | 
			
		||||
                log("Subscriber authenticated");
 | 
			
		||||
                std::string filter;
 | 
			
		||||
                std::string position("$");
 | 
			
		||||
                int batchSize = 1;
 | 
			
		||||
 | 
			
		||||
                conn.subscribe(channel,
 | 
			
		||||
                               filter,
 | 
			
		||||
                               position,
 | 
			
		||||
                               batchSize,
 | 
			
		||||
                               [](const Json::Value& msg, const std::string& /*position*/) {
 | 
			
		||||
                                   log(msg.toStyledString());
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -192,6 +192,8 @@ int main(int argc, char** argv)
 | 
			
		||||
            "--limit_received_events", cobraBotConfig.limitReceivedEvents, "Max events per minute");
 | 
			
		||||
        app->add_option(
 | 
			
		||||
            "--max_events_per_minute", cobraBotConfig.maxEventsPerMinute, "Max events per minute");
 | 
			
		||||
        app->add_option(
 | 
			
		||||
            "--batch_size", cobraBotConfig.batchSize, "Subscription batch size");
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    app.add_flag("--version", version, "Print ws version");
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user