Feature/send large message (#14)

* introduce send fragment

* can pass a fin frame

* can send messages which are a perfect multiple of the chunk size

* set fin only for last fragment

* cleanup

* last fragment should be of type CONTINUATION

* Add simple send and receive programs

* speedups receiving + better way to wait for thing

* receive speedup by using linked list of chunks instead of large array

* document bug

* use chunks to receive data

* trailing spaces
This commit is contained in:
Benjamin Sergeant
2019-02-20 18:59:07 -08:00
committed by GitHub
parent 505dd6d50f
commit 3a77e96a05
72 changed files with 9117 additions and 260 deletions

View File

@ -29,12 +29,15 @@
#include <cstdarg>
#include <iostream>
#include <sstream>
#include <chrono>
#include <thread>
namespace ix
{
const std::string WebSocketTransport::kHeartBeatPingMessage("ixwebsocket::hearbeat");
const int WebSocketTransport::kDefaultHeartBeatPeriod(-1);
constexpr size_t WebSocketTransport::kChunkSize;
WebSocketTransport::WebSocketTransport() :
_readyState(CLOSED),
@ -45,7 +48,7 @@ namespace ix
_heartBeatPeriod(kDefaultHeartBeatPeriod),
_lastSendTimePoint(std::chrono::steady_clock::now())
{
_readbuf.resize(kChunkSize);
}
WebSocketTransport::~WebSocketTransport()
@ -129,7 +132,7 @@ namespace ix
return result;
}
WebSocketTransport::ReadyStateValues WebSocketTransport::getReadyState() const
WebSocketTransport::ReadyStateValues WebSocketTransport::getReadyState() const
{
return _readyState;
}
@ -153,7 +156,7 @@ namespace ix
void WebSocketTransport::setOnCloseCallback(const OnCloseCallback& onCloseCallback)
{
_onCloseCallback = onCloseCallback;
_onCloseCallback = onCloseCallback;
}
// Only consider send time points for that computation.
@ -173,7 +176,7 @@ namespace ix
// If (1) heartbeat is enabled, and (2) no data was received or
// send for a duration exceeding our heart-beat period, send a
// ping to the server.
if (pollResult == PollResultType_Timeout &&
if (pollResult == PollResultType_Timeout &&
heartBeatPeriodExceeded())
{
std::stringstream ss;
@ -182,33 +185,31 @@ namespace ix
return;
}
while (true)
while (true)
{
int N = (int) _rxbuf.size();
ssize_t ret = _socket->recv((char*)&_readbuf[0], _readbuf.size());
_rxbuf.resize(N + 1500);
ssize_t ret = _socket->recv((char*)&_rxbuf[0] + N, 1500);
if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
_socket->getErrno() == EAGAIN)) {
_rxbuf.resize(N);
if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
_socket->getErrno() == EAGAIN))
{
break;
}
else if (ret <= 0)
else if (ret <= 0)
{
_rxbuf.resize(N);
_rxbuf.clear();
_socket->close();
setReadyState(CLOSED);
break;
}
else
else
{
_rxbuf.resize(N + ret);
_rxbuf.insert(_rxbuf.end(),
_readbuf.begin(),
_readbuf.begin() + ret);
}
}
if (isSendBufferEmpty() && _readyState == CLOSING)
if (isSendBufferEmpty() && _readyState == CLOSING)
{
_socket->close();
setReadyState(CLOSED);
@ -282,7 +283,7 @@ namespace ix
//
void WebSocketTransport::dispatch(const OnMessageCallback& onMessageCallback)
{
while (true)
while (true)
{
wsheader_type ws;
if (_rxbuf.size() < 2) return; /* Need at least 2 */
@ -294,7 +295,7 @@ namespace ix
ws.N0 = (data[1] & 0x7f);
ws.header_size = 2 + (ws.N0 == 126? 2 : 0) + (ws.N0 == 127? 8 : 0) + (ws.mask? 4 : 0);
if (_rxbuf.size() < ws.header_size) return; /* Need: ws.header_size - _rxbuf.size() */
//
// Calculate payload length:
// 0-125 mean the payload is that long.
@ -332,7 +333,7 @@ namespace ix
// invalid payload length according to the spec. bail out
return;
}
if (ws.mask)
{
ws.masking_key[0] = ((uint8_t) data[i+0]) << 0;
@ -355,22 +356,40 @@ namespace ix
// We got a whole message, now do something with it:
if (
ws.opcode == wsheader_type::TEXT_FRAME
ws.opcode == wsheader_type::TEXT_FRAME
|| ws.opcode == wsheader_type::BINARY_FRAME
|| ws.opcode == wsheader_type::CONTINUATION
) {
unmaskReceiveBuffer(ws);
_receivedData.insert(_receivedData.end(),
_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t)ws.N);// just feed
if (ws.fin)
{
// fire callback with a string message
std::string stringMessage(_receivedData.begin(),
_receivedData.end());
emitMessage(MSG, stringMessage, ws, onMessageCallback);
_receivedData.clear();
//
// Usual case. Small unfragmented messages
//
if (ws.fin && _chunks.empty())
{
emitMessage(MSG,
std::string(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t) ws.N),
ws,
onMessageCallback);
}
else
{
//
// Add intermediary message to our chunk list.
// We use a chunk list instead of a big buffer because resizing
// large buffer can be very costly when we need to re-allocate
// the internal buffer which is slow and can let the internal OS
// receive buffer fill out.
//
_chunks.emplace_back(
std::vector<uint8_t>(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t)ws.N));
if (ws.fin)
{
emitMessage(MSG, getMergedChunks(), ws, onMessageCallback);
_chunks.clear();
}
}
}
else if (ws.opcode == wsheader_type::PING)
@ -420,12 +439,33 @@ namespace ix
close();
}
// Erase the message that has been processed from the input/read buffer
_rxbuf.erase(_rxbuf.begin(),
_rxbuf.begin() + ws.header_size + (size_t) ws.N);
}
}
void WebSocketTransport::emitMessage(MessageKind messageKind,
std::string WebSocketTransport::getMergedChunks() const
{
size_t length = 0;
for (auto&& chunk : _chunks)
{
length += chunk.size();
}
std::string msg;
msg.reserve(length);
for (auto&& chunk : _chunks)
{
std::string str(chunk.begin(), chunk.end());
msg += str;
}
return msg;
}
void WebSocketTransport::emitMessage(MessageKind messageKind,
const std::string& message,
const wsheader_type& ws,
const OnMessageCallback& onMessageCallback)
@ -448,15 +488,17 @@ namespace ix
unsigned WebSocketTransport::getRandomUnsigned()
{
auto now = std::chrono::system_clock::now();
auto seconds =
auto seconds =
std::chrono::duration_cast<std::chrono::seconds>(
now.time_since_epoch()).count();
return static_cast<unsigned>(seconds);
}
WebSocketSendInfo WebSocketTransport::sendData(wsheader_type::opcode_type type,
const std::string& message,
bool compress)
WebSocketSendInfo WebSocketTransport::sendData(
wsheader_type::opcode_type type,
const std::string& message,
bool compress,
const OnProgressCallback& onProgressCallback)
{
if (_readyState == CLOSING || _readyState == CLOSED)
{
@ -473,15 +515,81 @@ namespace ix
if (compress)
{
bool success = _perMessageDeflate.compress(message, compressedMessage);
compressionError = !success;
if (!_perMessageDeflate.compress(message, compressedMessage))
{
bool success = false;
compressionError = true;
payloadSize = 0;
wireSize = 0;
return WebSocketSendInfo(success, compressionError, payloadSize, wireSize);
}
compressionError = false;
wireSize = compressedMessage.size();
message_begin = compressedMessage.begin();
message_end = compressedMessage.end();
}
uint64_t message_size = wireSize;
// Common case for most message. No fragmentation required.
if (wireSize < kChunkSize)
{
sendFragment(type, true, message_begin, message_end, compress);
}
else
{
//
// Large messages need to be fragmented
//
// Rules:
// First message needs to specify a proper type (BINARY or TEXT)
// Intermediary and last messages need to be of type CONTINUATION
// Last message must set the fin byte.
//
auto steps = wireSize / kChunkSize;
std::string::const_iterator begin = message_begin;
std::string::const_iterator end = message_end;
for (uint64_t i = 0 ; i < steps; ++i)
{
bool firstStep = i == 0;
bool lastStep = (i+1) == steps;
bool fin = lastStep;
end = begin + kChunkSize;
if (lastStep)
{
end = message_end;
}
auto opcodeType = type;
if (!firstStep)
{
opcodeType = wsheader_type::CONTINUATION;
}
// Send message
sendFragment(opcodeType, fin, begin, end, compress);
if (onProgressCallback && !onProgressCallback(i, steps))
{
break;
}
begin += kChunkSize;
}
}
return WebSocketSendInfo(true, compressionError, payloadSize, wireSize);
}
void WebSocketTransport::sendFragment(wsheader_type::opcode_type type,
bool fin,
std::string::const_iterator message_begin,
std::string::const_iterator message_end,
bool compress)
{
auto message_size = message_end - message_begin;
unsigned x = getRandomUnsigned();
uint8_t masking_key[4] = {};
@ -494,7 +602,13 @@ namespace ix
header.assign(2 +
(message_size >= 126 ? 2 : 0) +
(message_size >= 65536 ? 6 : 0) + 4, 0);
header[0] = 0x80 | type;
header[0] = type;
// The fin bit indicate that this is the last fragment. Fin is French for end.
if (fin)
{
header[0] |= 0x80;
}
// This bit indicate that the frame is compressed
if (compress)
@ -511,7 +625,7 @@ namespace ix
header[4] = masking_key[2];
header[5] = masking_key[3];
}
else if (message_size < 65536)
else if (message_size < 65536)
{
header[1] = 126 | 0x80;
header[2] = (message_size >> 8) & 0xff;
@ -546,8 +660,6 @@ namespace ix
// Now actually send this data
sendOnSocket();
return WebSocketSendInfo(true, compressionError, payloadSize, wireSize);
}
WebSocketSendInfo WebSocketTransport::sendPing(const std::string& message)
@ -556,9 +668,13 @@ namespace ix
return sendData(wsheader_type::PING, message, compress);
}
WebSocketSendInfo WebSocketTransport::sendBinary(const std::string& message)
WebSocketSendInfo WebSocketTransport::sendBinary(
const std::string& message,
const OnProgressCallback& onProgressCallback)
{
return sendData(wsheader_type::BINARY_FRAME, message, _enablePerMessageDeflate);
return sendData(wsheader_type::BINARY_FRAME, message,
_enablePerMessageDeflate, onProgressCallback);
}
void WebSocketTransport::sendOnSocket()
@ -569,7 +685,7 @@ namespace ix
{
ssize_t ret = _socket->send((char*)&_txbuf[0], _txbuf.size());
if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
_socket->getErrno() == EAGAIN))
{
break;