Compare commits
8 Commits
Author | SHA1 | Date | |
---|---|---|---|
cae23c764f | |||
f25b2af6eb | |||
508d372df1 | |||
12c3275c36 | |||
98189c23dc | |||
ec55b4a82a | |||
5d58982f77 | |||
57665ca825 |
@ -1 +1 @@
|
||||
5.1.6
|
||||
5.2.0
|
||||
|
@ -10,4 +10,4 @@ Interested ? Go read the [docs](https://bsergean.github.io/IXWebSocket/site/) !
|
||||
|
||||
IXWebSocket is actively being developed, check out the [changelog](CHANGELOG.md) to know what's cooking. If you are looking for a real time messaging service (the chat-like 'server' your websocket code will talk to) with many features such as history, backed by Redis, look at [cobra](https://github.com/machinezone/cobra).
|
||||
|
||||
IXWebSocket is not yet autobahn compliant, but we are working on changing this. See the current compliance [test results](https://bsergean.github.io/IXWebSocket/autobahn/index.html).
|
||||
IXWebSocket is not yet autobahn compliant, but we are working on changing this. See the current compliance [test results](https://bsergean.github.io/IXWebSocket/autobahn/index.html). The only tests that are still failing are the Websocket Compression ones (see section 12 and 13).
|
||||
|
@ -1,11 +1,30 @@
|
||||
# Changelog
|
||||
All notable changes to this project will be documented in this file.
|
||||
|
||||
## [5.2.0] - 2019-09-04
|
||||
|
||||
- Fragmentation: for sent messages which are compressed, the continuation fragments should not have the rsv1 bit set (fix all autobahn tests for zlib compression 12.X)
|
||||
- Websocket Server / do a case insensitive string search when looking for an Upgrade header whose value is websocket. (some client use WebSocket with some upper-case characters)
|
||||
|
||||
## [5.1.9] - 2019-09-03
|
||||
|
||||
- ws autobahn / report progress with spdlog::info to get timing info
|
||||
- ws autobahn / use condition variables for stopping test case + add more logging on errors
|
||||
|
||||
## [5.1.8] - 2019-09-03
|
||||
|
||||
- Per message deflate/compression: handle fragmented messages (fix autobahn test: 12.1.X and probably others)
|
||||
|
||||
## [5.1.7] - 2019-09-03
|
||||
|
||||
- Receiving invalid UTF-8 TEXT message should fail and close the connection (fix remaining autobahn test: 6.X UTF-8 Handling)
|
||||
|
||||
## [5.1.6] - 2019-09-03
|
||||
|
||||
- Sending invalid UTF-8 TEXT message should fail and close the connection (fix remaining autobahn test: 6.X UTF-8 Handling)
|
||||
- Fix failing unittest which was sending binary data in text mode with WebSocket::send to call properly call WebSocket::sendBinary instead.
|
||||
- Validate that the reason is proper utf-8. (fix autobahn test 7.5.1)
|
||||
- Validate close codes. Autobahn 7.9.*
|
||||
|
||||
## [5.1.5] - 2019-09-03
|
||||
|
||||
|
@ -27,4 +27,5 @@ namespace ix
|
||||
const std::string WebSocketCloseConstants::kProtocolErrorCodeDataOpcodeOutOfSequence("Fragmentation: data message out of sequence");
|
||||
const std::string WebSocketCloseConstants::kProtocolErrorCodeContinuationOpCodeOutOfSequence("Fragmentation: continuation opcode out of sequence");
|
||||
const std::string WebSocketCloseConstants::kInvalidFramePayloadDataMessage("Invalid frame payload data");
|
||||
const std::string WebSocketCloseConstants::kInvalidCloseCodeMessage("Invalid close code");
|
||||
}
|
||||
|
@ -32,5 +32,6 @@ namespace ix
|
||||
static const std::string kProtocolErrorCodeDataOpcodeOutOfSequence;
|
||||
static const std::string kProtocolErrorCodeContinuationOpCodeOutOfSequence;
|
||||
static const std::string kInvalidFramePayloadDataMessage;
|
||||
static const std::string kInvalidCloseCodeMessage;
|
||||
};
|
||||
} // namespace ix
|
||||
|
@ -295,7 +295,7 @@ namespace ix
|
||||
return sendErrorResponse(400, "Missing Sec-WebSocket-Key value");
|
||||
}
|
||||
|
||||
if (headers["upgrade"] != "websocket")
|
||||
if (!insensitiveStringCompare(headers["upgrade"], "WebSocket"))
|
||||
{
|
||||
return sendErrorResponse(400, "Invalid or missing Upgrade header");
|
||||
}
|
||||
@ -326,6 +326,7 @@ namespace ix
|
||||
ss << "Sec-WebSocket-Accept: " << std::string(output) << "\r\n";
|
||||
ss << "Upgrade: websocket\r\n";
|
||||
ss << "Connection: Upgrade\r\n";
|
||||
ss << "Server: " << userAgent() << "\r\n";
|
||||
|
||||
// Parse the client headers. Does it support deflate ?
|
||||
std::string header = headers["sec-websocket-extensions"];
|
||||
|
@ -77,6 +77,7 @@ namespace ix
|
||||
|
||||
WebSocketTransport::WebSocketTransport() :
|
||||
_useMask(true),
|
||||
_compressedMessage(false),
|
||||
_readyState(ReadyState::CLOSED),
|
||||
_closeCode(WebSocketCloseConstants::kInternalErrorCode),
|
||||
_closeReason(WebSocketCloseConstants::kInternalErrorMessage),
|
||||
@ -558,13 +559,16 @@ namespace ix
|
||||
return;
|
||||
}
|
||||
|
||||
unmaskReceiveBuffer(ws);
|
||||
std::string frameData(_rxbuf.begin()+ws.header_size,
|
||||
_rxbuf.begin()+ws.header_size+(size_t) ws.N);
|
||||
|
||||
// We got a whole message, now do something with it:
|
||||
if (
|
||||
ws.opcode == wsheader_type::TEXT_FRAME
|
||||
|| ws.opcode == wsheader_type::BINARY_FRAME
|
||||
|| ws.opcode == wsheader_type::CONTINUATION
|
||||
) {
|
||||
unmaskReceiveBuffer(ws);
|
||||
|
||||
if (ws.opcode != wsheader_type::CONTINUATION)
|
||||
{
|
||||
@ -573,6 +577,8 @@ namespace ix
|
||||
? MessageKind::MSG_TEXT
|
||||
: MessageKind::MSG_BINARY;
|
||||
|
||||
_compressedMessage = _enablePerMessageDeflate && ws.rsv1;
|
||||
|
||||
// Continuation message needs to follow a non-fin TEXT or BINARY message
|
||||
if (!_chunks.empty())
|
||||
{
|
||||
@ -593,10 +599,11 @@ namespace ix
|
||||
if (ws.fin && _chunks.empty())
|
||||
{
|
||||
emitMessage(_fragmentedMessageKind,
|
||||
std::string(_rxbuf.begin()+ws.header_size,
|
||||
_rxbuf.begin()+ws.header_size+(size_t) ws.N),
|
||||
ws,
|
||||
frameData,
|
||||
_compressedMessage,
|
||||
onMessageCallback);
|
||||
|
||||
_compressedMessage = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -607,30 +614,26 @@ namespace ix
|
||||
// the internal buffer which is slow and can let the internal OS
|
||||
// receive buffer fill out.
|
||||
//
|
||||
_chunks.emplace_back(
|
||||
std::vector<uint8_t>(_rxbuf.begin()+ws.header_size,
|
||||
_rxbuf.begin()+ws.header_size+(size_t)ws.N));
|
||||
_chunks.emplace_back(frameData);
|
||||
|
||||
if (ws.fin)
|
||||
{
|
||||
emitMessage(_fragmentedMessageKind, getMergedChunks(),
|
||||
ws, onMessageCallback);
|
||||
_compressedMessage, onMessageCallback);
|
||||
|
||||
_chunks.clear();
|
||||
_compressedMessage = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
emitMessage(MessageKind::FRAGMENT, std::string(), ws, onMessageCallback);
|
||||
emitMessage(MessageKind::FRAGMENT, std::string(), false, onMessageCallback);
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (ws.opcode == wsheader_type::PING)
|
||||
{
|
||||
unmaskReceiveBuffer(ws);
|
||||
|
||||
std::string pingData(_rxbuf.begin()+ws.header_size,
|
||||
_rxbuf.begin()+ws.header_size + (size_t) ws.N);
|
||||
|
||||
// too large
|
||||
if (pingData.size() > 125)
|
||||
if (frameData.size() > 125)
|
||||
{
|
||||
// Unexpected frame type
|
||||
close(WebSocketCloseConstants::kProtocolErrorCode,
|
||||
@ -642,29 +645,23 @@ namespace ix
|
||||
{
|
||||
// Reply back right away
|
||||
bool compress = false;
|
||||
sendData(wsheader_type::PONG, pingData, compress);
|
||||
sendData(wsheader_type::PONG, frameData, compress);
|
||||
}
|
||||
|
||||
emitMessage(MessageKind::PING, pingData, ws, onMessageCallback);
|
||||
emitMessage(MessageKind::PING, frameData, false, onMessageCallback);
|
||||
}
|
||||
else if (ws.opcode == wsheader_type::PONG)
|
||||
{
|
||||
unmaskReceiveBuffer(ws);
|
||||
std::string pongData(_rxbuf.begin()+ws.header_size,
|
||||
_rxbuf.begin()+ws.header_size + (size_t) ws.N);
|
||||
|
||||
std::lock_guard<std::mutex> lck(_lastReceivePongTimePointMutex);
|
||||
_lastReceivePongTimePoint = std::chrono::steady_clock::now();
|
||||
|
||||
emitMessage(MessageKind::PONG, pongData, ws, onMessageCallback);
|
||||
emitMessage(MessageKind::PONG, frameData, false, onMessageCallback);
|
||||
}
|
||||
else if (ws.opcode == wsheader_type::CLOSE)
|
||||
{
|
||||
std::string reason;
|
||||
uint16_t code = 0;
|
||||
|
||||
unmaskReceiveBuffer(ws);
|
||||
|
||||
if (ws.N >= 2)
|
||||
{
|
||||
// Extract the close code first, available as the first 2 bytes
|
||||
@ -674,8 +671,7 @@ namespace ix
|
||||
// Get the reason.
|
||||
if (ws.N > 2)
|
||||
{
|
||||
reason.assign(_rxbuf.begin()+ws.header_size + 2,
|
||||
_rxbuf.begin()+ws.header_size + (size_t) ws.N);
|
||||
reason = frameData.substr(2, frameData.size());
|
||||
}
|
||||
|
||||
// Validate that the reason is proper utf-8. Autobahn 7.5.1
|
||||
@ -684,6 +680,20 @@ namespace ix
|
||||
code = WebSocketCloseConstants::kInvalidFramePayloadData;
|
||||
reason = WebSocketCloseConstants::kInvalidFramePayloadDataMessage;
|
||||
}
|
||||
|
||||
// Validate close codes. Autobahn 7.9.*
|
||||
// 1014, 1015 are debattable. The firefox MSDN has a description for them
|
||||
if (code < 1000 || code == 1004 || code == 1006 ||
|
||||
(code > 1013 && code < 3000))
|
||||
{
|
||||
// build up an error message containing the bad error code
|
||||
std::stringstream ss;
|
||||
ss << WebSocketCloseConstants::kInvalidCloseCodeMessage
|
||||
<< ": " << code;
|
||||
reason = ss.str();
|
||||
|
||||
code = WebSocketCloseConstants::kProtocolErrorCode;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -768,8 +778,7 @@ namespace ix
|
||||
|
||||
for (auto&& chunk : _chunks)
|
||||
{
|
||||
std::string str(chunk.begin(), chunk.end());
|
||||
msg += str;
|
||||
msg += chunk;
|
||||
}
|
||||
|
||||
return msg;
|
||||
@ -777,21 +786,38 @@ namespace ix
|
||||
|
||||
void WebSocketTransport::emitMessage(MessageKind messageKind,
|
||||
const std::string& message,
|
||||
const wsheader_type& ws,
|
||||
bool compressedMessage,
|
||||
const OnMessageCallback& onMessageCallback)
|
||||
{
|
||||
size_t wireSize = message.size();
|
||||
|
||||
// When the RSV1 bit is 1 it means the message is compressed
|
||||
if (_enablePerMessageDeflate && ws.rsv1 && messageKind != MessageKind::FRAGMENT)
|
||||
if (compressedMessage && messageKind != MessageKind::FRAGMENT)
|
||||
{
|
||||
std::string decompressedMessage;
|
||||
bool success = _perMessageDeflate.decompress(message, decompressedMessage);
|
||||
onMessageCallback(decompressedMessage, wireSize, !success, messageKind);
|
||||
|
||||
if (messageKind == MessageKind::MSG_TEXT && !validateUtf8(decompressedMessage))
|
||||
{
|
||||
close(WebSocketCloseConstants::kInvalidFramePayloadData,
|
||||
WebSocketCloseConstants::kInvalidFramePayloadDataMessage);
|
||||
}
|
||||
else
|
||||
{
|
||||
onMessageCallback(decompressedMessage, wireSize, !success, messageKind);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
onMessageCallback(message, wireSize, false, messageKind);
|
||||
if (messageKind == MessageKind::MSG_TEXT && !validateUtf8(message))
|
||||
{
|
||||
close(WebSocketCloseConstants::kInvalidFramePayloadData,
|
||||
WebSocketCloseConstants::kInvalidFramePayloadDataMessage);
|
||||
}
|
||||
else
|
||||
{
|
||||
onMessageCallback(message, wireSize, false, messageKind);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -840,6 +866,8 @@ namespace ix
|
||||
message_end = compressedMessage.end();
|
||||
}
|
||||
|
||||
_txbuf.reserve(wireSize);
|
||||
|
||||
// Common case for most message. No fragmentation required.
|
||||
if (wireSize < kChunkSize)
|
||||
{
|
||||
@ -927,8 +955,9 @@ namespace ix
|
||||
header[0] |= 0x80;
|
||||
}
|
||||
|
||||
// This bit indicate that the frame is compressed
|
||||
if (compress)
|
||||
// The rsv1 bit indicate that the frame is compressed
|
||||
// continuation opcodes should not set it. Autobahn 12.2.10 and others 12.X
|
||||
if (compress && type != wsheader_type::CONTINUATION)
|
||||
{
|
||||
header[0] |= 0x40;
|
||||
}
|
||||
|
@ -149,13 +149,16 @@ namespace ix
|
||||
// messages (tested messages up to 700M) and we cannot put them in a single
|
||||
// buffer that is resized, as this operation can be slow when a buffer has its
|
||||
// size increased 2 fold, while appending to a list has a fixed cost.
|
||||
std::list<std::vector<uint8_t>> _chunks;
|
||||
std::list<std::string> _chunks;
|
||||
|
||||
// Record the message kind (will be TEXT or BINARY) for a fragmented
|
||||
// message, present in the first chunk, since the final chunk will be a
|
||||
// CONTINUATION opcode and doesn't tell the full message kind
|
||||
MessageKind _fragmentedMessageKind;
|
||||
|
||||
// Ditto for whether a message is compressed
|
||||
bool _compressedMessage;
|
||||
|
||||
// Fragments are 32K long
|
||||
static constexpr size_t kChunkSize = 1 << 15;
|
||||
|
||||
@ -244,7 +247,7 @@ namespace ix
|
||||
|
||||
void emitMessage(MessageKind messageKind,
|
||||
const std::string& message,
|
||||
const wsheader_type& ws,
|
||||
bool compressedMessage,
|
||||
const OnMessageCallback& onMessageCallback);
|
||||
|
||||
bool isSendBufferEmpty() const;
|
||||
|
@ -6,4 +6,4 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#define IX_WEBSOCKET_VERSION "5.1.6"
|
||||
#define IX_WEBSOCKET_VERSION "5.2.0"
|
||||
|
3
makefile
3
makefile
@ -63,6 +63,9 @@ test:
|
||||
ws_test: ws
|
||||
(cd ws ; env DEBUG=1 PATH=../ws/build:$$PATH bash test_ws.sh)
|
||||
|
||||
autobahn_report:
|
||||
cp -rvf ~/sandbox/reports/clients/* ../bsergean.github.io/IXWebSocket/autobahn/
|
||||
|
||||
# For the fork that is configured with appveyor
|
||||
rebase_upstream:
|
||||
git fetch upstream
|
||||
|
@ -42,6 +42,23 @@
|
||||
#include <ixwebsocket/IXWebSocket.h>
|
||||
#include <ixwebsocket/IXSocket.h>
|
||||
|
||||
#include <spdlog/spdlog.h>
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
std::string truncate(const std::string& str, size_t n)
|
||||
{
|
||||
if (str.size() < n)
|
||||
{
|
||||
return str;
|
||||
}
|
||||
else
|
||||
{
|
||||
return str.substr(0, n) + "...";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
namespace ix
|
||||
{
|
||||
@ -117,7 +134,7 @@ namespace ix
|
||||
ss << "Received " << msg->wireSize << " bytes" << std::endl;
|
||||
|
||||
ss << "autobahn: received message: "
|
||||
<< msg->str
|
||||
<< truncate(msg->str, 40)
|
||||
<< std::endl;
|
||||
|
||||
_webSocket.send(msg->str, msg->binary);
|
||||
@ -161,7 +178,7 @@ namespace ix
|
||||
_webSocket.stop();
|
||||
}
|
||||
|
||||
void generateReport(const std::string& url)
|
||||
bool generateReport(const std::string& url)
|
||||
{
|
||||
ix::WebSocket webSocket;
|
||||
std::string reportUrl(url);
|
||||
@ -169,14 +186,16 @@ namespace ix
|
||||
webSocket.setUrl(reportUrl);
|
||||
webSocket.disableAutomaticReconnection();
|
||||
|
||||
std::atomic<bool> done(false);
|
||||
std::atomic<bool> success(true);
|
||||
std::condition_variable condition;
|
||||
|
||||
webSocket.setOnMessageCallback(
|
||||
[&done](const ix::WebSocketMessagePtr& msg)
|
||||
[&condition, &success](const ix::WebSocketMessagePtr& msg)
|
||||
{
|
||||
if (msg->type == ix::WebSocketMessageType::Close)
|
||||
{
|
||||
std::cerr << "Report generated" << std::endl;
|
||||
done = true;
|
||||
condition.notify_one();
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Error)
|
||||
{
|
||||
@ -186,18 +205,24 @@ namespace ix
|
||||
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
|
||||
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
|
||||
std::cerr << ss.str() << std::endl;
|
||||
|
||||
success = false;
|
||||
}
|
||||
}
|
||||
);
|
||||
webSocket.start();
|
||||
|
||||
while (!done)
|
||||
webSocket.start();
|
||||
std::mutex mutex;
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
condition.wait(lock);
|
||||
webSocket.stop();
|
||||
|
||||
if (!success)
|
||||
{
|
||||
std::chrono::duration<double, std::milli> duration(10);
|
||||
std::this_thread::sleep_for(duration);
|
||||
spdlog::error("Cannot generate report at url {}", reportUrl);
|
||||
}
|
||||
|
||||
webSocket.stop();
|
||||
return success;
|
||||
}
|
||||
|
||||
int getTestCaseCount(const std::string& url)
|
||||
@ -208,15 +233,15 @@ namespace ix
|
||||
webSocket.setUrl(caseCountUrl);
|
||||
webSocket.disableAutomaticReconnection();
|
||||
|
||||
int count = 0;
|
||||
int count = -1;
|
||||
std::condition_variable condition;
|
||||
|
||||
std::atomic<bool> done(false);
|
||||
webSocket.setOnMessageCallback(
|
||||
[&done, &count](const ix::WebSocketMessagePtr& msg)
|
||||
[&condition, &count](const ix::WebSocketMessagePtr& msg)
|
||||
{
|
||||
if (msg->type == ix::WebSocketMessageType::Close)
|
||||
{
|
||||
done = true;
|
||||
condition.notify_one();
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Error)
|
||||
{
|
||||
@ -226,6 +251,8 @@ namespace ix
|
||||
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
|
||||
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
|
||||
std::cerr << ss.str() << std::endl;
|
||||
|
||||
condition.notify_one();
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Message)
|
||||
{
|
||||
@ -236,16 +263,18 @@ namespace ix
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
webSocket.start();
|
||||
|
||||
while (!done)
|
||||
{
|
||||
std::chrono::duration<double, std::milli> duration(10);
|
||||
std::this_thread::sleep_for(duration);
|
||||
}
|
||||
|
||||
std::mutex mutex;
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
condition.wait(lock);
|
||||
webSocket.stop();
|
||||
|
||||
if (count == -1)
|
||||
{
|
||||
spdlog::error("Cannot retrieve test case count at url {}", caseCountUrl);
|
||||
}
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
@ -254,14 +283,20 @@ namespace ix
|
||||
//
|
||||
int ws_autobahn_main(const std::string& url, bool quiet)
|
||||
{
|
||||
int N = getTestCaseCount(url);
|
||||
std::cerr << "Test cases count: " << N << std::endl;
|
||||
int testCasesCount = getTestCaseCount(url);
|
||||
std::cerr << "Test cases count: " << testCasesCount << std::endl;
|
||||
|
||||
N++;
|
||||
|
||||
for (int i = 1 ; i < N; ++i)
|
||||
if (testCasesCount == -1)
|
||||
{
|
||||
std::cerr << "Execute test case " << i << std::endl;
|
||||
spdlog::error("Cannot retrieve test case count at url {}", url);
|
||||
return 1;
|
||||
}
|
||||
|
||||
testCasesCount++;
|
||||
|
||||
for (int i = 1 ; i < testCasesCount; ++i)
|
||||
{
|
||||
spdlog::info("Execute test case {}", i);
|
||||
|
||||
int caseNumber = i;
|
||||
|
||||
@ -277,9 +312,7 @@ namespace ix
|
||||
testCase.run();
|
||||
}
|
||||
|
||||
generateReport(url);
|
||||
|
||||
return 0;
|
||||
return generateReport(url) ? 0 : 1;
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user