(ws) ws connect has a -g option to gzip decompress messages for API such as the websocket Huobi Global.

This commit is contained in:
Benjamin Sergeant 2021-06-08 09:49:27 -07:00
parent 47fd04e210
commit 30bcddb99f
4 changed files with 36 additions and 11 deletions

View File

@ -2,6 +2,10 @@
All changes to this project will be documented in this file. All changes to this project will be documented in this file.
## [11.2.9] - 2021-06-08
(ws) ws connect has a -g option to gzip decompress messages for API such as the websocket Huobi Global.
## [11.2.8] - 2021-06-03 ## [11.2.8] - 2021-06-03
(websocket client + server) WebSocketMessage class tweak to fix unsafe patterns (websocket client + server) WebSocketMessage class tweak to fix unsafe patterns

View File

@ -41,8 +41,7 @@ namespace ix
, _pingIntervalSecs(kDefaultPingIntervalSecs) , _pingIntervalSecs(kDefaultPingIntervalSecs)
{ {
_ws.setOnCloseCallback( _ws.setOnCloseCallback(
[this](uint16_t code, const std::string& reason, size_t wireSize, bool remote) [this](uint16_t code, const std::string& reason, size_t wireSize, bool remote) {
{
_onMessageCallback( _onMessageCallback(
ix::make_unique<WebSocketMessage>(WebSocketMessageType::Close, ix::make_unique<WebSocketMessage>(WebSocketMessageType::Close,
emptyMsg, emptyMsg,
@ -385,8 +384,7 @@ namespace ix
[this](const std::string& msg, [this](const std::string& msg,
size_t wireSize, size_t wireSize,
bool decompressionError, bool decompressionError,
WebSocketTransport::MessageKind messageKind) WebSocketTransport::MessageKind messageKind) {
{
WebSocketMessageType webSocketMessageType; WebSocketMessageType webSocketMessageType;
switch (messageKind) switch (messageKind)
{ {

View File

@ -6,4 +6,4 @@
#pragma once #pragma once
#define IX_WEBSOCKET_VERSION "11.2.8" #define IX_WEBSOCKET_VERSION "11.2.9"

View File

@ -632,7 +632,8 @@ namespace ix
uint32_t maxWaitBetweenReconnectionRetries, uint32_t maxWaitBetweenReconnectionRetries,
const ix::SocketTLSOptions& tlsOptions, const ix::SocketTLSOptions& tlsOptions,
const std::string& subprotocol, const std::string& subprotocol,
int pingIntervalSecs); int pingIntervalSecs,
bool decompressGzipMessages);
void subscribe(const std::string& channel); void subscribe(const std::string& channel);
void start(); void start();
@ -657,6 +658,7 @@ namespace ix
bool _binaryMode; bool _binaryMode;
std::atomic<int> _receivedBytes; std::atomic<int> _receivedBytes;
std::atomic<int> _sentBytes; std::atomic<int> _sentBytes;
bool _decompressGzipMessages;
void log(const std::string& msg); void log(const std::string& msg);
WebSocketHttpHeaders parseHeaders(const std::string& data); WebSocketHttpHeaders parseHeaders(const std::string& data);
@ -670,12 +672,14 @@ namespace ix
uint32_t maxWaitBetweenReconnectionRetries, uint32_t maxWaitBetweenReconnectionRetries,
const ix::SocketTLSOptions& tlsOptions, const ix::SocketTLSOptions& tlsOptions,
const std::string& subprotocol, const std::string& subprotocol,
int pingIntervalSecs) int pingIntervalSecs,
bool decompressGzipMessages)
: _url(url) : _url(url)
, _disablePerMessageDeflate(disablePerMessageDeflate) , _disablePerMessageDeflate(disablePerMessageDeflate)
, _binaryMode(binaryMode) , _binaryMode(binaryMode)
, _receivedBytes(0) , _receivedBytes(0)
, _sentBytes(0) , _sentBytes(0)
, _decompressGzipMessages(decompressGzipMessages)
{ {
if (disableAutomaticReconnection) if (disableAutomaticReconnection)
{ {
@ -784,7 +788,21 @@ namespace ix
{ {
spdlog::info("Received {} bytes", msg->wireSize); spdlog::info("Received {} bytes", msg->wireSize);
ss << "ws_connect: received message: " << msg->str; std::string payload = msg->str;
if (_decompressGzipMessages)
{
std::string decompressedBytes;
if (gzipDecompress(payload, decompressedBytes))
{
payload = decompressedBytes;
}
else
{
spdlog::error("Error decompressing: {}", payload);
}
}
ss << "ws_connect: received message: " << payload;
log(ss.str()); log(ss.str());
} }
else if (msg->type == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
@ -837,7 +855,8 @@ namespace ix
uint32_t maxWaitBetweenReconnectionRetries, uint32_t maxWaitBetweenReconnectionRetries,
const ix::SocketTLSOptions& tlsOptions, const ix::SocketTLSOptions& tlsOptions,
const std::string& subprotocol, const std::string& subprotocol,
int pingIntervalSecs) int pingIntervalSecs,
bool decompressGzipMessages)
{ {
std::cout << "Type Ctrl-D to exit prompt..." << std::endl; std::cout << "Type Ctrl-D to exit prompt..." << std::endl;
WebSocketConnect webSocketChat(url, WebSocketConnect webSocketChat(url,
@ -848,7 +867,8 @@ namespace ix
maxWaitBetweenReconnectionRetries, maxWaitBetweenReconnectionRetries,
tlsOptions, tlsOptions,
subprotocol, subprotocol,
pingIntervalSecs); pingIntervalSecs,
decompressGzipMessages);
webSocketChat.start(); webSocketChat.start();
while (true) while (true)
@ -2490,6 +2510,7 @@ int main(int argc, char** argv)
uint32_t maxWaitBetweenReconnectionRetries = 10 * 1000; // 10 seconds uint32_t maxWaitBetweenReconnectionRetries = 10 * 1000; // 10 seconds
int pingIntervalSecs = 30; int pingIntervalSecs = 30;
int runCount = 1; int runCount = 1;
bool decompressGzipMessages = false;
auto addGenericOptions = [&pidfile](CLI::App* app) { auto addGenericOptions = [&pidfile](CLI::App* app) {
app->add_option("--pidfile", pidfile, "Pid file"); app->add_option("--pidfile", pidfile, "Pid file");
@ -2552,6 +2573,7 @@ int main(int argc, char** argv)
"Max Wait Time between reconnection retries"); "Max Wait Time between reconnection retries");
connectApp->add_option("--ping_interval", pingIntervalSecs, "Interval between sending pings"); connectApp->add_option("--ping_interval", pingIntervalSecs, "Interval between sending pings");
connectApp->add_option("--subprotocol", subprotocol, "Subprotocol"); connectApp->add_option("--subprotocol", subprotocol, "Subprotocol");
connectApp->add_flag("-g", decompressGzipMessages, "Decompress gziped messages");
addGenericOptions(connectApp); addGenericOptions(connectApp);
addTLSOptions(connectApp); addTLSOptions(connectApp);
@ -2740,7 +2762,8 @@ int main(int argc, char** argv)
maxWaitBetweenReconnectionRetries, maxWaitBetweenReconnectionRetries,
tlsOptions, tlsOptions,
subprotocol, subprotocol,
pingIntervalSecs); pingIntervalSecs,
decompressGzipMessages);
} }
else if (app.got_subcommand("autoroute")) else if (app.got_subcommand("autoroute"))
{ {