Per message deflate/compression: handle fragmented messages (fix autobahn test: 12.1.X and probably others)

This commit is contained in:
Benjamin Sergeant 2019-09-03 17:42:48 -07:00
parent ec55b4a82a
commit 98189c23dc
5 changed files with 24 additions and 10 deletions

View File

@ -1 +1 @@
5.1.7 5.1.8

View File

@ -1,6 +1,10 @@
# Changelog # Changelog
All notable changes to this project will be documented in this file. All notable changes to this project will be documented in this file.
## [5.1.8] - 2019-09-03
- Per message deflate/compression: handle fragmented messages (fix autobahn test: 12.1.X and probably others)
## [5.1.7] - 2019-09-03 ## [5.1.7] - 2019-09-03
- Receiving invalid UTF-8 TEXT message should fail and close the connection (fix remaining autobahn test: 6.X UTF-8 Handling) - Receiving invalid UTF-8 TEXT message should fail and close the connection (fix remaining autobahn test: 6.X UTF-8 Handling)

View File

@ -77,6 +77,7 @@ namespace ix
WebSocketTransport::WebSocketTransport() : WebSocketTransport::WebSocketTransport() :
_useMask(true), _useMask(true),
_compressedMessage(false),
_readyState(ReadyState::CLOSED), _readyState(ReadyState::CLOSED),
_closeCode(WebSocketCloseConstants::kInternalErrorCode), _closeCode(WebSocketCloseConstants::kInternalErrorCode),
_closeReason(WebSocketCloseConstants::kInternalErrorMessage), _closeReason(WebSocketCloseConstants::kInternalErrorMessage),
@ -576,6 +577,8 @@ namespace ix
? MessageKind::MSG_TEXT ? MessageKind::MSG_TEXT
: MessageKind::MSG_BINARY; : MessageKind::MSG_BINARY;
_compressedMessage = _enablePerMessageDeflate && ws.rsv1;
// Continuation message needs to follow a non-fin TEXT or BINARY message // Continuation message needs to follow a non-fin TEXT or BINARY message
if (!_chunks.empty()) if (!_chunks.empty())
{ {
@ -597,8 +600,10 @@ namespace ix
{ {
emitMessage(_fragmentedMessageKind, emitMessage(_fragmentedMessageKind,
frameData, frameData,
ws, _compressedMessage,
onMessageCallback); onMessageCallback);
_compressedMessage = false;
} }
else else
{ {
@ -614,12 +619,14 @@ namespace ix
if (ws.fin) if (ws.fin)
{ {
emitMessage(_fragmentedMessageKind, getMergedChunks(), emitMessage(_fragmentedMessageKind, getMergedChunks(),
ws, onMessageCallback); _compressedMessage, onMessageCallback);
_chunks.clear(); _chunks.clear();
_compressedMessage = false;
} }
else else
{ {
emitMessage(MessageKind::FRAGMENT, std::string(), ws, onMessageCallback); emitMessage(MessageKind::FRAGMENT, std::string(), false, onMessageCallback);
} }
} }
} }
@ -641,14 +648,14 @@ namespace ix
sendData(wsheader_type::PONG, frameData, compress); sendData(wsheader_type::PONG, frameData, compress);
} }
emitMessage(MessageKind::PING, frameData, ws, onMessageCallback); emitMessage(MessageKind::PING, frameData, false, onMessageCallback);
} }
else if (ws.opcode == wsheader_type::PONG) else if (ws.opcode == wsheader_type::PONG)
{ {
std::lock_guard<std::mutex> lck(_lastReceivePongTimePointMutex); std::lock_guard<std::mutex> lck(_lastReceivePongTimePointMutex);
_lastReceivePongTimePoint = std::chrono::steady_clock::now(); _lastReceivePongTimePoint = std::chrono::steady_clock::now();
emitMessage(MessageKind::PONG, frameData, ws, onMessageCallback); emitMessage(MessageKind::PONG, frameData, false, onMessageCallback);
} }
else if (ws.opcode == wsheader_type::CLOSE) else if (ws.opcode == wsheader_type::CLOSE)
{ {
@ -779,13 +786,13 @@ namespace ix
void WebSocketTransport::emitMessage(MessageKind messageKind, void WebSocketTransport::emitMessage(MessageKind messageKind,
const std::string& message, const std::string& message,
const wsheader_type& ws, bool compressedMessage,
const OnMessageCallback& onMessageCallback) const OnMessageCallback& onMessageCallback)
{ {
size_t wireSize = message.size(); size_t wireSize = message.size();
// When the RSV1 bit is 1 it means the message is compressed // When the RSV1 bit is 1 it means the message is compressed
if (_enablePerMessageDeflate && ws.rsv1 && messageKind != MessageKind::FRAGMENT) if (compressedMessage && messageKind != MessageKind::FRAGMENT)
{ {
std::string decompressedMessage; std::string decompressedMessage;
bool success = _perMessageDeflate.decompress(message, decompressedMessage); bool success = _perMessageDeflate.decompress(message, decompressedMessage);

View File

@ -156,6 +156,9 @@ namespace ix
// CONTINUATION opcode and doesn't tell the full message kind // CONTINUATION opcode and doesn't tell the full message kind
MessageKind _fragmentedMessageKind; MessageKind _fragmentedMessageKind;
// Ditto for whether a message is compressed
bool _compressedMessage;
// Fragments are 32K long // Fragments are 32K long
static constexpr size_t kChunkSize = 1 << 15; static constexpr size_t kChunkSize = 1 << 15;
@ -244,7 +247,7 @@ namespace ix
void emitMessage(MessageKind messageKind, void emitMessage(MessageKind messageKind,
const std::string& message, const std::string& message,
const wsheader_type& ws, bool compressedMessage,
const OnMessageCallback& onMessageCallback); const OnMessageCallback& onMessageCallback);
bool isSendBufferEmpty() const; bool isSendBufferEmpty() const;

View File

@ -6,4 +6,4 @@
#pragma once #pragma once
#define IX_WEBSOCKET_VERSION "5.1.7" #define IX_WEBSOCKET_VERSION "5.1.8"