Compare commits

...

8 Commits

11 changed files with 160 additions and 70 deletions

View File

@ -1 +1 @@
5.1.6
5.2.0

View File

@ -10,4 +10,4 @@ Interested ? Go read the [docs](https://bsergean.github.io/IXWebSocket/site/) !
IXWebSocket is actively being developed, check out the [changelog](CHANGELOG.md) to know what's cooking. If you are looking for a real time messaging service (the chat-like 'server' your websocket code will talk to) with many features such as history, backed by Redis, look at [cobra](https://github.com/machinezone/cobra).
IXWebSocket is not yet autobahn compliant, but we are working on changing this. See the current compliance [test results](https://bsergean.github.io/IXWebSocket/autobahn/index.html).
IXWebSocket is not yet autobahn compliant, but we are working on changing this. See the current compliance [test results](https://bsergean.github.io/IXWebSocket/autobahn/index.html). The only tests that are still failing are the Websocket Compression ones (see section 12 and 13).

View File

@ -1,11 +1,30 @@
# Changelog
All notable changes to this project will be documented in this file.
## [5.2.0] - 2019-09-04
- Fragmentation: for sent messages which are compressed, the continuation fragments should not have the rsv1 bit set (fix all autobahn tests for zlib compression 12.X)
- Websocket Server / do a case insensitive string search when looking for an Upgrade header whose value is websocket. (some client use WebSocket with some upper-case characters)
## [5.1.9] - 2019-09-03
- ws autobahn / report progress with spdlog::info to get timing info
- ws autobahn / use condition variables for stopping test case + add more logging on errors
## [5.1.8] - 2019-09-03
- Per message deflate/compression: handle fragmented messages (fix autobahn test: 12.1.X and probably others)
## [5.1.7] - 2019-09-03
- Receiving invalid UTF-8 TEXT message should fail and close the connection (fix remaining autobahn test: 6.X UTF-8 Handling)
## [5.1.6] - 2019-09-03
- Sending invalid UTF-8 TEXT message should fail and close the connection (fix remaining autobahn test: 6.X UTF-8 Handling)
- Fix failing unittest which was sending binary data in text mode with WebSocket::send to call properly call WebSocket::sendBinary instead.
- Validate that the reason is proper utf-8. (fix autobahn test 7.5.1)
- Validate close codes. Autobahn 7.9.*
## [5.1.5] - 2019-09-03

View File

@ -27,4 +27,5 @@ namespace ix
const std::string WebSocketCloseConstants::kProtocolErrorCodeDataOpcodeOutOfSequence("Fragmentation: data message out of sequence");
const std::string WebSocketCloseConstants::kProtocolErrorCodeContinuationOpCodeOutOfSequence("Fragmentation: continuation opcode out of sequence");
const std::string WebSocketCloseConstants::kInvalidFramePayloadDataMessage("Invalid frame payload data");
const std::string WebSocketCloseConstants::kInvalidCloseCodeMessage("Invalid close code");
}

View File

@ -32,5 +32,6 @@ namespace ix
static const std::string kProtocolErrorCodeDataOpcodeOutOfSequence;
static const std::string kProtocolErrorCodeContinuationOpCodeOutOfSequence;
static const std::string kInvalidFramePayloadDataMessage;
static const std::string kInvalidCloseCodeMessage;
};
} // namespace ix

View File

@ -295,7 +295,7 @@ namespace ix
return sendErrorResponse(400, "Missing Sec-WebSocket-Key value");
}
if (headers["upgrade"] != "websocket")
if (!insensitiveStringCompare(headers["upgrade"], "WebSocket"))
{
return sendErrorResponse(400, "Invalid or missing Upgrade header");
}
@ -326,6 +326,7 @@ namespace ix
ss << "Sec-WebSocket-Accept: " << std::string(output) << "\r\n";
ss << "Upgrade: websocket\r\n";
ss << "Connection: Upgrade\r\n";
ss << "Server: " << userAgent() << "\r\n";
// Parse the client headers. Does it support deflate ?
std::string header = headers["sec-websocket-extensions"];

View File

@ -77,6 +77,7 @@ namespace ix
WebSocketTransport::WebSocketTransport() :
_useMask(true),
_compressedMessage(false),
_readyState(ReadyState::CLOSED),
_closeCode(WebSocketCloseConstants::kInternalErrorCode),
_closeReason(WebSocketCloseConstants::kInternalErrorMessage),
@ -558,13 +559,16 @@ namespace ix
return;
}
unmaskReceiveBuffer(ws);
std::string frameData(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t) ws.N);
// We got a whole message, now do something with it:
if (
ws.opcode == wsheader_type::TEXT_FRAME
|| ws.opcode == wsheader_type::BINARY_FRAME
|| ws.opcode == wsheader_type::CONTINUATION
) {
unmaskReceiveBuffer(ws);
if (ws.opcode != wsheader_type::CONTINUATION)
{
@ -573,6 +577,8 @@ namespace ix
? MessageKind::MSG_TEXT
: MessageKind::MSG_BINARY;
_compressedMessage = _enablePerMessageDeflate && ws.rsv1;
// Continuation message needs to follow a non-fin TEXT or BINARY message
if (!_chunks.empty())
{
@ -593,10 +599,11 @@ namespace ix
if (ws.fin && _chunks.empty())
{
emitMessage(_fragmentedMessageKind,
std::string(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t) ws.N),
ws,
frameData,
_compressedMessage,
onMessageCallback);
_compressedMessage = false;
}
else
{
@ -607,30 +614,26 @@ namespace ix
// the internal buffer which is slow and can let the internal OS
// receive buffer fill out.
//
_chunks.emplace_back(
std::vector<uint8_t>(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t)ws.N));
_chunks.emplace_back(frameData);
if (ws.fin)
{
emitMessage(_fragmentedMessageKind, getMergedChunks(),
ws, onMessageCallback);
_compressedMessage, onMessageCallback);
_chunks.clear();
_compressedMessage = false;
}
else
{
emitMessage(MessageKind::FRAGMENT, std::string(), ws, onMessageCallback);
emitMessage(MessageKind::FRAGMENT, std::string(), false, onMessageCallback);
}
}
}
else if (ws.opcode == wsheader_type::PING)
{
unmaskReceiveBuffer(ws);
std::string pingData(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size + (size_t) ws.N);
// too large
if (pingData.size() > 125)
if (frameData.size() > 125)
{
// Unexpected frame type
close(WebSocketCloseConstants::kProtocolErrorCode,
@ -642,29 +645,23 @@ namespace ix
{
// Reply back right away
bool compress = false;
sendData(wsheader_type::PONG, pingData, compress);
sendData(wsheader_type::PONG, frameData, compress);
}
emitMessage(MessageKind::PING, pingData, ws, onMessageCallback);
emitMessage(MessageKind::PING, frameData, false, onMessageCallback);
}
else if (ws.opcode == wsheader_type::PONG)
{
unmaskReceiveBuffer(ws);
std::string pongData(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size + (size_t) ws.N);
std::lock_guard<std::mutex> lck(_lastReceivePongTimePointMutex);
_lastReceivePongTimePoint = std::chrono::steady_clock::now();
emitMessage(MessageKind::PONG, pongData, ws, onMessageCallback);
emitMessage(MessageKind::PONG, frameData, false, onMessageCallback);
}
else if (ws.opcode == wsheader_type::CLOSE)
{
std::string reason;
uint16_t code = 0;
unmaskReceiveBuffer(ws);
if (ws.N >= 2)
{
// Extract the close code first, available as the first 2 bytes
@ -674,8 +671,7 @@ namespace ix
// Get the reason.
if (ws.N > 2)
{
reason.assign(_rxbuf.begin()+ws.header_size + 2,
_rxbuf.begin()+ws.header_size + (size_t) ws.N);
reason = frameData.substr(2, frameData.size());
}
// Validate that the reason is proper utf-8. Autobahn 7.5.1
@ -684,6 +680,20 @@ namespace ix
code = WebSocketCloseConstants::kInvalidFramePayloadData;
reason = WebSocketCloseConstants::kInvalidFramePayloadDataMessage;
}
// Validate close codes. Autobahn 7.9.*
// 1014, 1015 are debattable. The firefox MSDN has a description for them
if (code < 1000 || code == 1004 || code == 1006 ||
(code > 1013 && code < 3000))
{
// build up an error message containing the bad error code
std::stringstream ss;
ss << WebSocketCloseConstants::kInvalidCloseCodeMessage
<< ": " << code;
reason = ss.str();
code = WebSocketCloseConstants::kProtocolErrorCode;
}
}
else
{
@ -768,8 +778,7 @@ namespace ix
for (auto&& chunk : _chunks)
{
std::string str(chunk.begin(), chunk.end());
msg += str;
msg += chunk;
}
return msg;
@ -777,21 +786,38 @@ namespace ix
void WebSocketTransport::emitMessage(MessageKind messageKind,
const std::string& message,
const wsheader_type& ws,
bool compressedMessage,
const OnMessageCallback& onMessageCallback)
{
size_t wireSize = message.size();
// When the RSV1 bit is 1 it means the message is compressed
if (_enablePerMessageDeflate && ws.rsv1 && messageKind != MessageKind::FRAGMENT)
if (compressedMessage && messageKind != MessageKind::FRAGMENT)
{
std::string decompressedMessage;
bool success = _perMessageDeflate.decompress(message, decompressedMessage);
onMessageCallback(decompressedMessage, wireSize, !success, messageKind);
if (messageKind == MessageKind::MSG_TEXT && !validateUtf8(decompressedMessage))
{
close(WebSocketCloseConstants::kInvalidFramePayloadData,
WebSocketCloseConstants::kInvalidFramePayloadDataMessage);
}
else
{
onMessageCallback(decompressedMessage, wireSize, !success, messageKind);
}
}
else
{
onMessageCallback(message, wireSize, false, messageKind);
if (messageKind == MessageKind::MSG_TEXT && !validateUtf8(message))
{
close(WebSocketCloseConstants::kInvalidFramePayloadData,
WebSocketCloseConstants::kInvalidFramePayloadDataMessage);
}
else
{
onMessageCallback(message, wireSize, false, messageKind);
}
}
}
@ -840,6 +866,8 @@ namespace ix
message_end = compressedMessage.end();
}
_txbuf.reserve(wireSize);
// Common case for most message. No fragmentation required.
if (wireSize < kChunkSize)
{
@ -927,8 +955,9 @@ namespace ix
header[0] |= 0x80;
}
// This bit indicate that the frame is compressed
if (compress)
// The rsv1 bit indicate that the frame is compressed
// continuation opcodes should not set it. Autobahn 12.2.10 and others 12.X
if (compress && type != wsheader_type::CONTINUATION)
{
header[0] |= 0x40;
}

View File

@ -149,13 +149,16 @@ namespace ix
// messages (tested messages up to 700M) and we cannot put them in a single
// buffer that is resized, as this operation can be slow when a buffer has its
// size increased 2 fold, while appending to a list has a fixed cost.
std::list<std::vector<uint8_t>> _chunks;
std::list<std::string> _chunks;
// Record the message kind (will be TEXT or BINARY) for a fragmented
// message, present in the first chunk, since the final chunk will be a
// CONTINUATION opcode and doesn't tell the full message kind
MessageKind _fragmentedMessageKind;
// Ditto for whether a message is compressed
bool _compressedMessage;
// Fragments are 32K long
static constexpr size_t kChunkSize = 1 << 15;
@ -244,7 +247,7 @@ namespace ix
void emitMessage(MessageKind messageKind,
const std::string& message,
const wsheader_type& ws,
bool compressedMessage,
const OnMessageCallback& onMessageCallback);
bool isSendBufferEmpty() const;

View File

@ -6,4 +6,4 @@
#pragma once
#define IX_WEBSOCKET_VERSION "5.1.6"
#define IX_WEBSOCKET_VERSION "5.2.0"

View File

@ -63,6 +63,9 @@ test:
ws_test: ws
(cd ws ; env DEBUG=1 PATH=../ws/build:$$PATH bash test_ws.sh)
autobahn_report:
cp -rvf ~/sandbox/reports/clients/* ../bsergean.github.io/IXWebSocket/autobahn/
# For the fork that is configured with appveyor
rebase_upstream:
git fetch upstream

View File

@ -42,6 +42,23 @@
#include <ixwebsocket/IXWebSocket.h>
#include <ixwebsocket/IXSocket.h>
#include <spdlog/spdlog.h>
namespace
{
std::string truncate(const std::string& str, size_t n)
{
if (str.size() < n)
{
return str;
}
else
{
return str.substr(0, n) + "...";
}
}
}
namespace ix
{
@ -117,7 +134,7 @@ namespace ix
ss << "Received " << msg->wireSize << " bytes" << std::endl;
ss << "autobahn: received message: "
<< msg->str
<< truncate(msg->str, 40)
<< std::endl;
_webSocket.send(msg->str, msg->binary);
@ -161,7 +178,7 @@ namespace ix
_webSocket.stop();
}
void generateReport(const std::string& url)
bool generateReport(const std::string& url)
{
ix::WebSocket webSocket;
std::string reportUrl(url);
@ -169,14 +186,16 @@ namespace ix
webSocket.setUrl(reportUrl);
webSocket.disableAutomaticReconnection();
std::atomic<bool> done(false);
std::atomic<bool> success(true);
std::condition_variable condition;
webSocket.setOnMessageCallback(
[&done](const ix::WebSocketMessagePtr& msg)
[&condition, &success](const ix::WebSocketMessagePtr& msg)
{
if (msg->type == ix::WebSocketMessageType::Close)
{
std::cerr << "Report generated" << std::endl;
done = true;
condition.notify_one();
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
@ -186,18 +205,24 @@ namespace ix
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str() << std::endl;
success = false;
}
}
);
webSocket.start();
while (!done)
webSocket.start();
std::mutex mutex;
std::unique_lock<std::mutex> lock(mutex);
condition.wait(lock);
webSocket.stop();
if (!success)
{
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
spdlog::error("Cannot generate report at url {}", reportUrl);
}
webSocket.stop();
return success;
}
int getTestCaseCount(const std::string& url)
@ -208,15 +233,15 @@ namespace ix
webSocket.setUrl(caseCountUrl);
webSocket.disableAutomaticReconnection();
int count = 0;
int count = -1;
std::condition_variable condition;
std::atomic<bool> done(false);
webSocket.setOnMessageCallback(
[&done, &count](const ix::WebSocketMessagePtr& msg)
[&condition, &count](const ix::WebSocketMessagePtr& msg)
{
if (msg->type == ix::WebSocketMessageType::Close)
{
done = true;
condition.notify_one();
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
@ -226,6 +251,8 @@ namespace ix
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str() << std::endl;
condition.notify_one();
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
@ -236,16 +263,18 @@ namespace ix
}
}
);
webSocket.start();
while (!done)
{
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
}
std::mutex mutex;
std::unique_lock<std::mutex> lock(mutex);
condition.wait(lock);
webSocket.stop();
if (count == -1)
{
spdlog::error("Cannot retrieve test case count at url {}", caseCountUrl);
}
return count;
}
@ -254,14 +283,20 @@ namespace ix
//
int ws_autobahn_main(const std::string& url, bool quiet)
{
int N = getTestCaseCount(url);
std::cerr << "Test cases count: " << N << std::endl;
int testCasesCount = getTestCaseCount(url);
std::cerr << "Test cases count: " << testCasesCount << std::endl;
N++;
for (int i = 1 ; i < N; ++i)
if (testCasesCount == -1)
{
std::cerr << "Execute test case " << i << std::endl;
spdlog::error("Cannot retrieve test case count at url {}", url);
return 1;
}
testCasesCount++;
for (int i = 1 ; i < testCasesCount; ++i)
{
spdlog::info("Execute test case {}", i);
int caseNumber = i;
@ -277,9 +312,7 @@ namespace ix
testCase.run();
}
generateReport(url);
return 0;
return generateReport(url) ? 0 : 1;
}
}