Compare commits

...

4 Commits

Author SHA1 Message Date
Benjamin Sergeant
7a4a84d6e0 project builds / gross hack to disable compression code path / ws connect -x works but test fails 2020-07-07 19:25:18 -07:00
Benjamin Sergeant
fbe7b0b020 WebSocketPerMessageDeflateCompressor can work with vector or std::string 2020-07-07 18:17:44 -07:00
Benjamin Sergeant
afd9ef7d6f more templates in WebSocketTransport 2020-07-07 11:07:01 -07:00
Benjamin Sergeant
f772e40ad8 WebSocketPerMessageDeflateCompressor 2020-07-07 10:59:59 -07:00
8 changed files with 126 additions and 31 deletions

View File

@ -429,6 +429,16 @@ namespace ix
return (binary) ? sendBinary(data, onProgressCallback) : sendText(data, onProgressCallback); return (binary) ? sendBinary(data, onProgressCallback) : sendText(data, onProgressCallback);
} }
WebSocketSendInfo WebSocket::sendBinary(const std::vector<uint8_t>& data,
const OnProgressCallback& onProgressCallback)
{
if (!isConnected()) return WebSocketSendInfo(false);
std::lock_guard<std::mutex> lock(_writeMutex);
auto webSocketSendInfo = _ws.sendBinary(data, onProgressCallback);
WebSocket::invokeTrafficTrackerCallback(webSocketSendInfo.wireSize, false);
return webSocketSendInfo;
}
WebSocketSendInfo WebSocket::sendBinary(const std::string& text, WebSocketSendInfo WebSocket::sendBinary(const std::string& text,
const OnProgressCallback& onProgressCallback) const OnProgressCallback& onProgressCallback)
{ {

View File

@ -74,8 +74,11 @@ namespace ix
WebSocketSendInfo send(const std::string& data, WebSocketSendInfo send(const std::string& data,
bool binary = false, bool binary = false,
const OnProgressCallback& onProgressCallback = nullptr); const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo sendBinary(const std::string& text, WebSocketSendInfo sendBinary(const std::string& text,
const OnProgressCallback& onProgressCallback = nullptr); const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo sendBinary(const std::vector<uint8_t>& data,
const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo sendText(const std::string& text, WebSocketSendInfo sendText(const std::string& text,
const OnProgressCallback& onProgressCallback = nullptr); const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo ping(const std::string& text); WebSocketSendInfo ping(const std::string& text);

View File

@ -59,14 +59,38 @@ namespace ix
return true; return true;
} }
bool WebSocketPerMessageDeflateCompressor::endsWith(const std::string& value, template<typename T>
const std::string& ending) bool WebSocketPerMessageDeflateCompressor::endsWithEmptyUnCompressedBlock(const T& value)
{ {
if (ending.size() > value.size()) return false; if (kEmptyUncompressedBlock.size() > value.size()) return false;
return std::equal(ending.rbegin(), ending.rend(), value.rbegin()); auto N = value.size();
return value[N - 1] == kEmptyUncompressedBlock[3] &&
value[N - 2] == kEmptyUncompressedBlock[2] &&
value[N - 3] == kEmptyUncompressedBlock[1] &&
value[N - 4] == kEmptyUncompressedBlock[0];
} }
bool WebSocketPerMessageDeflateCompressor::compress(const std::string& in, std::string& out) bool WebSocketPerMessageDeflateCompressor::compress(const std::string& in, std::string& out)
{
return compressData(in, out);
}
bool WebSocketPerMessageDeflateCompressor::compress(const std::string& in, std::vector<uint8_t>& out)
{
return compressData(in, out);
}
bool WebSocketPerMessageDeflateCompressor::compress(const std::vector<uint8_t>& in, std::string& out)
{
return compressData(in, out);
}
bool WebSocketPerMessageDeflateCompressor::compress(const std::vector<uint8_t>& in, std::vector<uint8_t>& out)
{
return compressData(in, out);
}
template<typename T, typename S> bool WebSocketPerMessageDeflateCompressor::compressData(const T& in, S& out)
{ {
// //
// 7.2.1. Compression // 7.2.1. Compression
@ -96,7 +120,8 @@ namespace ix
// The normal buffer size should be 6 but // The normal buffer size should be 6 but
// we remove the 4 octets from the tail (#4) // we remove the 4 octets from the tail (#4)
uint8_t buf[2] = {0x02, 0x00}; uint8_t buf[2] = {0x02, 0x00};
out.append((char*) (buf), 2); out.push_back(buf[0]);
out.push_back(buf[1]);
return true; return true;
} }
@ -114,10 +139,10 @@ namespace ix
output = _compressBufferSize - _deflateState.avail_out; output = _compressBufferSize - _deflateState.avail_out;
out.append((char*) (_compressBuffer.get()), output); out.insert(out.end(), _compressBuffer.get(), _compressBuffer.get() + output);
} while (_deflateState.avail_out == 0); } while (_deflateState.avail_out == 0);
if (endsWith(out, kEmptyUncompressedBlock)) if (endsWithEmptyUnCompressedBlock(out))
{ {
out.resize(out.size() - 4); out.resize(out.size() - 4);
} }

View File

@ -9,6 +9,7 @@
#include "zlib.h" #include "zlib.h"
#include <memory> #include <memory>
#include <string> #include <string>
#include <vector>
namespace ix namespace ix
{ {
@ -20,9 +21,13 @@ namespace ix
bool init(uint8_t deflateBits, bool clientNoContextTakeOver); bool init(uint8_t deflateBits, bool clientNoContextTakeOver);
bool compress(const std::string& in, std::string& out); bool compress(const std::string& in, std::string& out);
bool compress(const std::string& in, std::vector<uint8_t>& out);
bool compress(const std::vector<uint8_t>& in, std::string& out);
bool compress(const std::vector<uint8_t>& in, std::vector<uint8_t>& out);
private: private:
static bool endsWith(const std::string& value, const std::string& ending); template<typename T, typename S> bool compressData(const T& in, S& out);
template<typename T> bool endsWithEmptyUnCompressedBlock(const T& value);
int _flush; int _flush;
size_t _compressBufferSize; size_t _compressBufferSize;

View File

@ -326,9 +326,10 @@ namespace ix
return _txbuf.empty(); return _txbuf.empty();
} }
template<class Iterator>
void WebSocketTransport::appendToSendBuffer(const std::vector<uint8_t>& header, void WebSocketTransport::appendToSendBuffer(const std::vector<uint8_t>& header,
std::string::const_iterator begin, Iterator begin,
std::string::const_iterator end, Iterator end,
uint64_t message_size, uint64_t message_size,
uint8_t masking_key[4]) uint8_t masking_key[4])
{ {
@ -751,9 +752,27 @@ namespace ix
} }
WebSocketSendInfo WebSocketTransport::sendData(wsheader_type::opcode_type type, WebSocketSendInfo WebSocketTransport::sendData(wsheader_type::opcode_type type,
const std::string& message, const std::string& message,
bool compress, bool compress,
const OnProgressCallback& onProgressCallback) const OnProgressCallback& onProgressCallback)
{
return sendRawData(type, message, compress, onProgressCallback);
}
WebSocketSendInfo WebSocketTransport::sendData(wsheader_type::opcode_type type,
const std::vector<uint8_t>& message,
bool compress,
const OnProgressCallback& onProgressCallback)
{
return sendRawData(type, message, compress, onProgressCallback);
}
template<class T>
WebSocketSendInfo WebSocketTransport::sendRawData(wsheader_type::opcode_type type,
const T& message,
bool compress,
const OnProgressCallback& onProgressCallback)
{ {
if (_readyState != ReadyState::OPEN && _readyState != ReadyState::CLOSING) if (_readyState != ReadyState::OPEN && _readyState != ReadyState::CLOSING)
{ {
@ -764,12 +783,14 @@ namespace ix
size_t wireSize = message.size(); size_t wireSize = message.size();
bool compressionError = false; bool compressionError = false;
std::string::const_iterator message_begin = message.begin(); auto message_begin = message.begin();
std::string::const_iterator message_end = message.end(); auto message_end = message.end();
#if 0
if (compress) if (compress)
{ {
if (!_perMessageDeflate->compress(message, _compressedMessage)) T compressedMessage;
if (!_perMessageDeflate->compress(message, compressedMessage))
{ {
bool success = false; bool success = false;
compressionError = true; compressionError = true;
@ -778,11 +799,12 @@ namespace ix
return WebSocketSendInfo(success, compressionError, payloadSize, wireSize); return WebSocketSendInfo(success, compressionError, payloadSize, wireSize);
} }
compressionError = false; compressionError = false;
wireSize = _compressedMessage.size(); wireSize = compressedMessage.size();
message_begin = _compressedMessage.begin(); message_begin = compressedMessage.begin();
message_end = _compressedMessage.end(); message_end = compressedMessage.end();
} }
#endif
{ {
std::lock_guard<std::mutex> lock(_txbufMutex); std::lock_guard<std::mutex> lock(_txbufMutex);
@ -808,8 +830,8 @@ namespace ix
// //
auto steps = wireSize / kChunkSize; auto steps = wireSize / kChunkSize;
std::string::const_iterator begin = message_begin; auto begin = message_begin;
std::string::const_iterator end = message_end; auto end = message_end;
for (uint64_t i = 0; i < steps; ++i) for (uint64_t i = 0; i < steps; ++i)
{ {
@ -859,10 +881,11 @@ namespace ix
return WebSocketSendInfo(success, compressionError, payloadSize, wireSize); return WebSocketSendInfo(success, compressionError, payloadSize, wireSize);
} }
template<class Iterator>
bool WebSocketTransport::sendFragment(wsheader_type::opcode_type type, bool WebSocketTransport::sendFragment(wsheader_type::opcode_type type,
bool fin, bool fin,
std::string::const_iterator message_begin, Iterator message_begin,
std::string::const_iterator message_end, Iterator message_end,
bool compress) bool compress)
{ {
uint64_t message_size = static_cast<uint64_t>(message_end - message_begin); uint64_t message_size = static_cast<uint64_t>(message_end - message_begin);
@ -961,6 +984,14 @@ namespace ix
return info; return info;
} }
WebSocketSendInfo WebSocketTransport::sendBinary(const std::vector<uint8_t>& message,
const OnProgressCallback& onProgressCallback)
{
return sendData(
wsheader_type::BINARY_FRAME, message, _enablePerMessageDeflate, onProgressCallback);
}
WebSocketSendInfo WebSocketTransport::sendBinary(const std::string& message, WebSocketSendInfo WebSocketTransport::sendBinary(const std::string& message,
const OnProgressCallback& onProgressCallback) const OnProgressCallback& onProgressCallback)
@ -1055,7 +1086,7 @@ namespace ix
else else
{ {
// no close code/reason set // no close code/reason set
sendData(wsheader_type::CLOSE, "", compress); sendData(wsheader_type::CLOSE, std::string(""), compress);
} }
} }

View File

@ -86,6 +86,9 @@ namespace ix
WebSocketInitResult connectToSocket(std::unique_ptr<Socket> socket, int timeoutSecs); WebSocketInitResult connectToSocket(std::unique_ptr<Socket> socket, int timeoutSecs);
PollResult poll(); PollResult poll();
WebSocketSendInfo sendBinary(const std::vector<uint8_t>& message,
const OnProgressCallback& onProgressCallback);
WebSocketSendInfo sendBinary(const std::string& message, WebSocketSendInfo sendBinary(const std::string& message,
const OnProgressCallback& onProgressCallback); const OnProgressCallback& onProgressCallback);
WebSocketSendInfo sendText(const std::string& message, WebSocketSendInfo sendText(const std::string& message,
@ -190,7 +193,7 @@ namespace ix
std::atomic<bool> _enablePerMessageDeflate; std::atomic<bool> _enablePerMessageDeflate;
std::string _decompressedMessage; std::string _decompressedMessage;
std::string _compressedMessage; std::vector<uint8_t> _compressedMessage;
// Used to control TLS connection behavior // Used to control TLS connection behavior
SocketTLSOptions _socketTLSOptions; SocketTLSOptions _socketTLSOptions;
@ -244,10 +247,22 @@ namespace ix
bool compress, bool compress,
const OnProgressCallback& onProgressCallback = nullptr); const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo sendData(wsheader_type::opcode_type type,
const std::vector<uint8_t>& message,
bool compress,
const OnProgressCallback& onProgressCallback = nullptr);
template<class T>
WebSocketSendInfo sendRawData(wsheader_type::opcode_type type,
const T& message,
bool compress,
const OnProgressCallback& onProgressCallback = nullptr);
template<class Iterator>
bool sendFragment(wsheader_type::opcode_type type, bool sendFragment(wsheader_type::opcode_type type,
bool fin, bool fin,
std::string::const_iterator begin, Iterator begin,
std::string::const_iterator end, Iterator end,
bool compress); bool compress);
void emitMessage(MessageKind messageKind, void emitMessage(MessageKind messageKind,
@ -256,9 +271,11 @@ namespace ix
const OnMessageCallback& onMessageCallback); const OnMessageCallback& onMessageCallback);
bool isSendBufferEmpty() const; bool isSendBufferEmpty() const;
template<class Iterator>
void appendToSendBuffer(const std::vector<uint8_t>& header, void appendToSendBuffer(const std::vector<uint8_t>& header,
std::string::const_iterator begin, Iterator begin,
std::string::const_iterator end, Iterator end,
uint64_t message_size, uint64_t message_size,
uint8_t masking_key[4]); uint8_t masking_key[4]);

View File

@ -100,6 +100,7 @@ namespace
} }
_webSocket.setUrl(url); _webSocket.setUrl(url);
_webSocket.disablePerMessageDeflate();
std::stringstream ss; std::stringstream ss;
log(std::string("Connecting to url: ") + url); log(std::string("Connecting to url: ") + url);
@ -188,7 +189,9 @@ namespace
void WebSocketChat::sendMessage(const std::string& text) void WebSocketChat::sendMessage(const std::string& text)
{ {
_webSocket.sendBinary(encodeMessage(text)); auto msg = encodeMessage(text);
std::vector<uint8_t> data(text.begin(), text.end());
_webSocket.sendBinary(data);
} }
bool startServer(ix::WebSocketServer& server) bool startServer(ix::WebSocketServer& server)

View File

@ -216,7 +216,8 @@ namespace ix
{ {
if (_binaryMode) if (_binaryMode)
{ {
_webSocket.sendBinary(text); std::vector<uint8_t> data(text.begin(), text.end());
_webSocket.sendBinary(data);
} }
else else
{ {