Compare commits

...

4 Commits

Author SHA1 Message Date
Benjamin Sergeant
7a4a84d6e0 project builds / gross hack to disable compression code path / ws connect -x works but test fails 2020-07-07 19:25:18 -07:00
Benjamin Sergeant
fbe7b0b020 WebSocketPerMessageDeflateCompressor can work with vector or std::string 2020-07-07 18:17:44 -07:00
Benjamin Sergeant
afd9ef7d6f more templates in WebSocketTransport 2020-07-07 11:07:01 -07:00
Benjamin Sergeant
f772e40ad8 WebSocketPerMessageDeflateCompressor 2020-07-07 10:59:59 -07:00
8 changed files with 126 additions and 31 deletions

View File

@ -429,6 +429,16 @@ namespace ix
return (binary) ? sendBinary(data, onProgressCallback) : sendText(data, onProgressCallback);
}
WebSocketSendInfo WebSocket::sendBinary(const std::vector<uint8_t>& data,
const OnProgressCallback& onProgressCallback)
{
if (!isConnected()) return WebSocketSendInfo(false);
std::lock_guard<std::mutex> lock(_writeMutex);
auto webSocketSendInfo = _ws.sendBinary(data, onProgressCallback);
WebSocket::invokeTrafficTrackerCallback(webSocketSendInfo.wireSize, false);
return webSocketSendInfo;
}
WebSocketSendInfo WebSocket::sendBinary(const std::string& text,
const OnProgressCallback& onProgressCallback)
{

View File

@ -74,8 +74,11 @@ namespace ix
WebSocketSendInfo send(const std::string& data,
bool binary = false,
const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo sendBinary(const std::string& text,
const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo sendBinary(const std::vector<uint8_t>& data,
const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo sendText(const std::string& text,
const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo ping(const std::string& text);

View File

@ -59,14 +59,38 @@ namespace ix
return true;
}
bool WebSocketPerMessageDeflateCompressor::endsWith(const std::string& value,
const std::string& ending)
template<typename T>
bool WebSocketPerMessageDeflateCompressor::endsWithEmptyUnCompressedBlock(const T& value)
{
if (ending.size() > value.size()) return false;
return std::equal(ending.rbegin(), ending.rend(), value.rbegin());
if (kEmptyUncompressedBlock.size() > value.size()) return false;
auto N = value.size();
return value[N - 1] == kEmptyUncompressedBlock[3] &&
value[N - 2] == kEmptyUncompressedBlock[2] &&
value[N - 3] == kEmptyUncompressedBlock[1] &&
value[N - 4] == kEmptyUncompressedBlock[0];
}
bool WebSocketPerMessageDeflateCompressor::compress(const std::string& in, std::string& out)
{
return compressData(in, out);
}
bool WebSocketPerMessageDeflateCompressor::compress(const std::string& in, std::vector<uint8_t>& out)
{
return compressData(in, out);
}
bool WebSocketPerMessageDeflateCompressor::compress(const std::vector<uint8_t>& in, std::string& out)
{
return compressData(in, out);
}
bool WebSocketPerMessageDeflateCompressor::compress(const std::vector<uint8_t>& in, std::vector<uint8_t>& out)
{
return compressData(in, out);
}
template<typename T, typename S> bool WebSocketPerMessageDeflateCompressor::compressData(const T& in, S& out)
{
//
// 7.2.1. Compression
@ -96,7 +120,8 @@ namespace ix
// The normal buffer size should be 6 but
// we remove the 4 octets from the tail (#4)
uint8_t buf[2] = {0x02, 0x00};
out.append((char*) (buf), 2);
out.push_back(buf[0]);
out.push_back(buf[1]);
return true;
}
@ -114,10 +139,10 @@ namespace ix
output = _compressBufferSize - _deflateState.avail_out;
out.append((char*) (_compressBuffer.get()), output);
out.insert(out.end(), _compressBuffer.get(), _compressBuffer.get() + output);
} while (_deflateState.avail_out == 0);
if (endsWith(out, kEmptyUncompressedBlock))
if (endsWithEmptyUnCompressedBlock(out))
{
out.resize(out.size() - 4);
}

View File

@ -9,6 +9,7 @@
#include "zlib.h"
#include <memory>
#include <string>
#include <vector>
namespace ix
{
@ -20,9 +21,13 @@ namespace ix
bool init(uint8_t deflateBits, bool clientNoContextTakeOver);
bool compress(const std::string& in, std::string& out);
bool compress(const std::string& in, std::vector<uint8_t>& out);
bool compress(const std::vector<uint8_t>& in, std::string& out);
bool compress(const std::vector<uint8_t>& in, std::vector<uint8_t>& out);
private:
static bool endsWith(const std::string& value, const std::string& ending);
template<typename T, typename S> bool compressData(const T& in, S& out);
template<typename T> bool endsWithEmptyUnCompressedBlock(const T& value);
int _flush;
size_t _compressBufferSize;

View File

@ -326,9 +326,10 @@ namespace ix
return _txbuf.empty();
}
template<class Iterator>
void WebSocketTransport::appendToSendBuffer(const std::vector<uint8_t>& header,
std::string::const_iterator begin,
std::string::const_iterator end,
Iterator begin,
Iterator end,
uint64_t message_size,
uint8_t masking_key[4])
{
@ -754,6 +755,24 @@ namespace ix
const std::string& message,
bool compress,
const OnProgressCallback& onProgressCallback)
{
return sendRawData(type, message, compress, onProgressCallback);
}
WebSocketSendInfo WebSocketTransport::sendData(wsheader_type::opcode_type type,
const std::vector<uint8_t>& message,
bool compress,
const OnProgressCallback& onProgressCallback)
{
return sendRawData(type, message, compress, onProgressCallback);
}
template<class T>
WebSocketSendInfo WebSocketTransport::sendRawData(wsheader_type::opcode_type type,
const T& message,
bool compress,
const OnProgressCallback& onProgressCallback)
{
if (_readyState != ReadyState::OPEN && _readyState != ReadyState::CLOSING)
{
@ -764,12 +783,14 @@ namespace ix
size_t wireSize = message.size();
bool compressionError = false;
std::string::const_iterator message_begin = message.begin();
std::string::const_iterator message_end = message.end();
auto message_begin = message.begin();
auto message_end = message.end();
#if 0
if (compress)
{
if (!_perMessageDeflate->compress(message, _compressedMessage))
T compressedMessage;
if (!_perMessageDeflate->compress(message, compressedMessage))
{
bool success = false;
compressionError = true;
@ -778,11 +799,12 @@ namespace ix
return WebSocketSendInfo(success, compressionError, payloadSize, wireSize);
}
compressionError = false;
wireSize = _compressedMessage.size();
wireSize = compressedMessage.size();
message_begin = _compressedMessage.begin();
message_end = _compressedMessage.end();
message_begin = compressedMessage.begin();
message_end = compressedMessage.end();
}
#endif
{
std::lock_guard<std::mutex> lock(_txbufMutex);
@ -808,8 +830,8 @@ namespace ix
//
auto steps = wireSize / kChunkSize;
std::string::const_iterator begin = message_begin;
std::string::const_iterator end = message_end;
auto begin = message_begin;
auto end = message_end;
for (uint64_t i = 0; i < steps; ++i)
{
@ -859,10 +881,11 @@ namespace ix
return WebSocketSendInfo(success, compressionError, payloadSize, wireSize);
}
template<class Iterator>
bool WebSocketTransport::sendFragment(wsheader_type::opcode_type type,
bool fin,
std::string::const_iterator message_begin,
std::string::const_iterator message_end,
Iterator message_begin,
Iterator message_end,
bool compress)
{
uint64_t message_size = static_cast<uint64_t>(message_end - message_begin);
@ -961,6 +984,14 @@ namespace ix
return info;
}
WebSocketSendInfo WebSocketTransport::sendBinary(const std::vector<uint8_t>& message,
const OnProgressCallback& onProgressCallback)
{
return sendData(
wsheader_type::BINARY_FRAME, message, _enablePerMessageDeflate, onProgressCallback);
}
WebSocketSendInfo WebSocketTransport::sendBinary(const std::string& message,
const OnProgressCallback& onProgressCallback)
@ -1055,7 +1086,7 @@ namespace ix
else
{
// no close code/reason set
sendData(wsheader_type::CLOSE, "", compress);
sendData(wsheader_type::CLOSE, std::string(""), compress);
}
}

View File

@ -86,6 +86,9 @@ namespace ix
WebSocketInitResult connectToSocket(std::unique_ptr<Socket> socket, int timeoutSecs);
PollResult poll();
WebSocketSendInfo sendBinary(const std::vector<uint8_t>& message,
const OnProgressCallback& onProgressCallback);
WebSocketSendInfo sendBinary(const std::string& message,
const OnProgressCallback& onProgressCallback);
WebSocketSendInfo sendText(const std::string& message,
@ -190,7 +193,7 @@ namespace ix
std::atomic<bool> _enablePerMessageDeflate;
std::string _decompressedMessage;
std::string _compressedMessage;
std::vector<uint8_t> _compressedMessage;
// Used to control TLS connection behavior
SocketTLSOptions _socketTLSOptions;
@ -244,10 +247,22 @@ namespace ix
bool compress,
const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo sendData(wsheader_type::opcode_type type,
const std::vector<uint8_t>& message,
bool compress,
const OnProgressCallback& onProgressCallback = nullptr);
template<class T>
WebSocketSendInfo sendRawData(wsheader_type::opcode_type type,
const T& message,
bool compress,
const OnProgressCallback& onProgressCallback = nullptr);
template<class Iterator>
bool sendFragment(wsheader_type::opcode_type type,
bool fin,
std::string::const_iterator begin,
std::string::const_iterator end,
Iterator begin,
Iterator end,
bool compress);
void emitMessage(MessageKind messageKind,
@ -256,9 +271,11 @@ namespace ix
const OnMessageCallback& onMessageCallback);
bool isSendBufferEmpty() const;
template<class Iterator>
void appendToSendBuffer(const std::vector<uint8_t>& header,
std::string::const_iterator begin,
std::string::const_iterator end,
Iterator begin,
Iterator end,
uint64_t message_size,
uint8_t masking_key[4]);

View File

@ -100,6 +100,7 @@ namespace
}
_webSocket.setUrl(url);
_webSocket.disablePerMessageDeflate();
std::stringstream ss;
log(std::string("Connecting to url: ") + url);
@ -188,7 +189,9 @@ namespace
void WebSocketChat::sendMessage(const std::string& text)
{
_webSocket.sendBinary(encodeMessage(text));
auto msg = encodeMessage(text);
std::vector<uint8_t> data(text.begin(), text.end());
_webSocket.sendBinary(data);
}
bool startServer(ix::WebSocketServer& server)

View File

@ -216,7 +216,8 @@ namespace ix
{
if (_binaryMode)
{
_webSocket.sendBinary(text);
std::vector<uint8_t> data(text.begin(), text.end());
_webSocket.sendBinary(data);
}
else
{