IXWebSocketTransport message processing refactoring

This commit is contained in:
Benjamin Sergeant 2019-09-03 15:48:55 -07:00
parent 57665ca825
commit 5d58982f77
2 changed files with 14 additions and 26 deletions

View File

@ -558,13 +558,16 @@ namespace ix
return;
}
unmaskReceiveBuffer(ws);
std::string frameData(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t) ws.N);
// We got a whole message, now do something with it:
if (
ws.opcode == wsheader_type::TEXT_FRAME
|| ws.opcode == wsheader_type::BINARY_FRAME
|| ws.opcode == wsheader_type::CONTINUATION
) {
unmaskReceiveBuffer(ws);
if (ws.opcode != wsheader_type::CONTINUATION)
{
@ -593,8 +596,7 @@ namespace ix
if (ws.fin && _chunks.empty())
{
emitMessage(_fragmentedMessageKind,
std::string(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t) ws.N),
frameData,
ws,
onMessageCallback);
}
@ -607,9 +609,8 @@ namespace ix
// the internal buffer which is slow and can let the internal OS
// receive buffer fill out.
//
_chunks.emplace_back(
std::vector<uint8_t>(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t)ws.N));
_chunks.emplace_back(frameData);
if (ws.fin)
{
emitMessage(_fragmentedMessageKind, getMergedChunks(),
@ -624,13 +625,8 @@ namespace ix
}
else if (ws.opcode == wsheader_type::PING)
{
unmaskReceiveBuffer(ws);
std::string pingData(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size + (size_t) ws.N);
// too large
if (pingData.size() > 125)
if (frameData.size() > 125)
{
// Unexpected frame type
close(WebSocketCloseConstants::kProtocolErrorCode,
@ -642,29 +638,23 @@ namespace ix
{
// Reply back right away
bool compress = false;
sendData(wsheader_type::PONG, pingData, compress);
sendData(wsheader_type::PONG, frameData, compress);
}
emitMessage(MessageKind::PING, pingData, ws, onMessageCallback);
emitMessage(MessageKind::PING, frameData, ws, onMessageCallback);
}
else if (ws.opcode == wsheader_type::PONG)
{
unmaskReceiveBuffer(ws);
std::string pongData(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size + (size_t) ws.N);
std::lock_guard<std::mutex> lck(_lastReceivePongTimePointMutex);
_lastReceivePongTimePoint = std::chrono::steady_clock::now();
emitMessage(MessageKind::PONG, pongData, ws, onMessageCallback);
emitMessage(MessageKind::PONG, frameData, ws, onMessageCallback);
}
else if (ws.opcode == wsheader_type::CLOSE)
{
std::string reason;
uint16_t code = 0;
unmaskReceiveBuffer(ws);
if (ws.N >= 2)
{
// Extract the close code first, available as the first 2 bytes
@ -674,8 +664,7 @@ namespace ix
// Get the reason.
if (ws.N > 2)
{
reason.assign(_rxbuf.begin()+ws.header_size + 2,
_rxbuf.begin()+ws.header_size + (size_t) ws.N);
reason = frameData.substr(2, frameData.size());
}
// Validate that the reason is proper utf-8. Autobahn 7.5.1
@ -782,8 +771,7 @@ namespace ix
for (auto&& chunk : _chunks)
{
std::string str(chunk.begin(), chunk.end());
msg += str;
msg += chunk;
}
return msg;

View File

@ -149,7 +149,7 @@ namespace ix
// messages (tested messages up to 700M) and we cannot put them in a single
// buffer that is resized, as this operation can be slow when a buffer has its
// size increased 2 fold, while appending to a list has a fixed cost.
std::list<std::vector<uint8_t>> _chunks;
std::list<std::string> _chunks;
// Record the message kind (will be TEXT or BINARY) for a fragmented
// message, present in the first chunk, since the final chunk will be a