Compare commits

...

5 Commits

9 changed files with 108 additions and 44 deletions

View File

@ -1 +1 @@
5.1.7
5.2.0

View File

@ -10,4 +10,4 @@ Interested ? Go read the [docs](https://bsergean.github.io/IXWebSocket/site/) !
IXWebSocket is actively being developed, check out the [changelog](CHANGELOG.md) to know what's cooking. If you are looking for a real time messaging service (the chat-like 'server' your websocket code will talk to) with many features such as history, backed by Redis, look at [cobra](https://github.com/machinezone/cobra).
IXWebSocket is not yet autobahn compliant, but we are working on changing this. See the current compliance [test results](https://bsergean.github.io/IXWebSocket/autobahn/index.html).
IXWebSocket is not yet autobahn compliant, but we are working on changing this. See the current compliance [test results](https://bsergean.github.io/IXWebSocket/autobahn/index.html). The only tests that are still failing are the Websocket Compression ones (see section 12 and 13).

View File

@ -1,6 +1,20 @@
# Changelog
All notable changes to this project will be documented in this file.
## [5.2.0] - 2019-09-04
- Fragmentation: for sent messages which are compressed, the continuation fragments should not have the rsv1 bit set (fix all autobahn tests for zlib compression 12.X)
- Websocket Server / do a case insensitive string search when looking for an Upgrade header whose value is websocket. (some client use WebSocket with some upper-case characters)
## [5.1.9] - 2019-09-03
- ws autobahn / report progress with spdlog::info to get timing info
- ws autobahn / use condition variables for stopping test case + add more logging on errors
## [5.1.8] - 2019-09-03
- Per message deflate/compression: handle fragmented messages (fix autobahn test: 12.1.X and probably others)
## [5.1.7] - 2019-09-03
- Receiving invalid UTF-8 TEXT message should fail and close the connection (fix remaining autobahn test: 6.X UTF-8 Handling)

View File

@ -295,7 +295,7 @@ namespace ix
return sendErrorResponse(400, "Missing Sec-WebSocket-Key value");
}
if (headers["upgrade"] != "websocket")
if (!insensitiveStringCompare(headers["upgrade"], "WebSocket"))
{
return sendErrorResponse(400, "Invalid or missing Upgrade header");
}
@ -326,6 +326,7 @@ namespace ix
ss << "Sec-WebSocket-Accept: " << std::string(output) << "\r\n";
ss << "Upgrade: websocket\r\n";
ss << "Connection: Upgrade\r\n";
ss << "Server: " << userAgent() << "\r\n";
// Parse the client headers. Does it support deflate ?
std::string header = headers["sec-websocket-extensions"];

View File

@ -77,6 +77,7 @@ namespace ix
WebSocketTransport::WebSocketTransport() :
_useMask(true),
_compressedMessage(false),
_readyState(ReadyState::CLOSED),
_closeCode(WebSocketCloseConstants::kInternalErrorCode),
_closeReason(WebSocketCloseConstants::kInternalErrorMessage),
@ -576,6 +577,8 @@ namespace ix
? MessageKind::MSG_TEXT
: MessageKind::MSG_BINARY;
_compressedMessage = _enablePerMessageDeflate && ws.rsv1;
// Continuation message needs to follow a non-fin TEXT or BINARY message
if (!_chunks.empty())
{
@ -597,8 +600,10 @@ namespace ix
{
emitMessage(_fragmentedMessageKind,
frameData,
ws,
_compressedMessage,
onMessageCallback);
_compressedMessage = false;
}
else
{
@ -614,12 +619,14 @@ namespace ix
if (ws.fin)
{
emitMessage(_fragmentedMessageKind, getMergedChunks(),
ws, onMessageCallback);
_compressedMessage, onMessageCallback);
_chunks.clear();
_compressedMessage = false;
}
else
{
emitMessage(MessageKind::FRAGMENT, std::string(), ws, onMessageCallback);
emitMessage(MessageKind::FRAGMENT, std::string(), false, onMessageCallback);
}
}
}
@ -641,14 +648,14 @@ namespace ix
sendData(wsheader_type::PONG, frameData, compress);
}
emitMessage(MessageKind::PING, frameData, ws, onMessageCallback);
emitMessage(MessageKind::PING, frameData, false, onMessageCallback);
}
else if (ws.opcode == wsheader_type::PONG)
{
std::lock_guard<std::mutex> lck(_lastReceivePongTimePointMutex);
_lastReceivePongTimePoint = std::chrono::steady_clock::now();
emitMessage(MessageKind::PONG, frameData, ws, onMessageCallback);
emitMessage(MessageKind::PONG, frameData, false, onMessageCallback);
}
else if (ws.opcode == wsheader_type::CLOSE)
{
@ -779,13 +786,13 @@ namespace ix
void WebSocketTransport::emitMessage(MessageKind messageKind,
const std::string& message,
const wsheader_type& ws,
bool compressedMessage,
const OnMessageCallback& onMessageCallback)
{
size_t wireSize = message.size();
// When the RSV1 bit is 1 it means the message is compressed
if (_enablePerMessageDeflate && ws.rsv1 && messageKind != MessageKind::FRAGMENT)
if (compressedMessage && messageKind != MessageKind::FRAGMENT)
{
std::string decompressedMessage;
bool success = _perMessageDeflate.decompress(message, decompressedMessage);
@ -859,6 +866,8 @@ namespace ix
message_end = compressedMessage.end();
}
_txbuf.reserve(wireSize);
// Common case for most message. No fragmentation required.
if (wireSize < kChunkSize)
{
@ -946,8 +955,9 @@ namespace ix
header[0] |= 0x80;
}
// This bit indicate that the frame is compressed
if (compress)
// The rsv1 bit indicate that the frame is compressed
// continuation opcodes should not set it. Autobahn 12.2.10 and others 12.X
if (compress && type != wsheader_type::CONTINUATION)
{
header[0] |= 0x40;
}

View File

@ -156,6 +156,9 @@ namespace ix
// CONTINUATION opcode and doesn't tell the full message kind
MessageKind _fragmentedMessageKind;
// Ditto for whether a message is compressed
bool _compressedMessage;
// Fragments are 32K long
static constexpr size_t kChunkSize = 1 << 15;
@ -244,7 +247,7 @@ namespace ix
void emitMessage(MessageKind messageKind,
const std::string& message,
const wsheader_type& ws,
bool compressedMessage,
const OnMessageCallback& onMessageCallback);
bool isSendBufferEmpty() const;

View File

@ -6,4 +6,4 @@
#pragma once
#define IX_WEBSOCKET_VERSION "5.1.7"
#define IX_WEBSOCKET_VERSION "5.2.0"

View File

@ -63,6 +63,9 @@ test:
ws_test: ws
(cd ws ; env DEBUG=1 PATH=../ws/build:$$PATH bash test_ws.sh)
autobahn_report:
cp -rvf ~/sandbox/reports/clients/* ../bsergean.github.io/IXWebSocket/autobahn/
# For the fork that is configured with appveyor
rebase_upstream:
git fetch upstream

View File

@ -42,6 +42,23 @@
#include <ixwebsocket/IXWebSocket.h>
#include <ixwebsocket/IXSocket.h>
#include <spdlog/spdlog.h>
namespace
{
std::string truncate(const std::string& str, size_t n)
{
if (str.size() < n)
{
return str;
}
else
{
return str.substr(0, n) + "...";
}
}
}
namespace ix
{
@ -117,7 +134,7 @@ namespace ix
ss << "Received " << msg->wireSize << " bytes" << std::endl;
ss << "autobahn: received message: "
<< msg->str
<< truncate(msg->str, 40)
<< std::endl;
_webSocket.send(msg->str, msg->binary);
@ -161,7 +178,7 @@ namespace ix
_webSocket.stop();
}
void generateReport(const std::string& url)
bool generateReport(const std::string& url)
{
ix::WebSocket webSocket;
std::string reportUrl(url);
@ -169,14 +186,16 @@ namespace ix
webSocket.setUrl(reportUrl);
webSocket.disableAutomaticReconnection();
std::atomic<bool> done(false);
std::atomic<bool> success(true);
std::condition_variable condition;
webSocket.setOnMessageCallback(
[&done](const ix::WebSocketMessagePtr& msg)
[&condition, &success](const ix::WebSocketMessagePtr& msg)
{
if (msg->type == ix::WebSocketMessageType::Close)
{
std::cerr << "Report generated" << std::endl;
done = true;
condition.notify_one();
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
@ -186,18 +205,24 @@ namespace ix
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str() << std::endl;
success = false;
}
}
);
webSocket.start();
while (!done)
webSocket.start();
std::mutex mutex;
std::unique_lock<std::mutex> lock(mutex);
condition.wait(lock);
webSocket.stop();
if (!success)
{
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
spdlog::error("Cannot generate report at url {}", reportUrl);
}
webSocket.stop();
return success;
}
int getTestCaseCount(const std::string& url)
@ -208,15 +233,15 @@ namespace ix
webSocket.setUrl(caseCountUrl);
webSocket.disableAutomaticReconnection();
int count = 0;
int count = -1;
std::condition_variable condition;
std::atomic<bool> done(false);
webSocket.setOnMessageCallback(
[&done, &count](const ix::WebSocketMessagePtr& msg)
[&condition, &count](const ix::WebSocketMessagePtr& msg)
{
if (msg->type == ix::WebSocketMessageType::Close)
{
done = true;
condition.notify_one();
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
@ -226,6 +251,8 @@ namespace ix
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str() << std::endl;
condition.notify_one();
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
@ -236,16 +263,18 @@ namespace ix
}
}
);
webSocket.start();
while (!done)
{
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
}
std::mutex mutex;
std::unique_lock<std::mutex> lock(mutex);
condition.wait(lock);
webSocket.stop();
if (count == -1)
{
spdlog::error("Cannot retrieve test case count at url {}", caseCountUrl);
}
return count;
}
@ -254,14 +283,20 @@ namespace ix
//
int ws_autobahn_main(const std::string& url, bool quiet)
{
int N = getTestCaseCount(url);
std::cerr << "Test cases count: " << N << std::endl;
int testCasesCount = getTestCaseCount(url);
std::cerr << "Test cases count: " << testCasesCount << std::endl;
N++;
for (int i = 1 ; i < N; ++i)
if (testCasesCount == -1)
{
std::cerr << "Execute test case " << i << std::endl;
spdlog::error("Cannot retrieve test case count at url {}", url);
return 1;
}
testCasesCount++;
for (int i = 1 ; i < testCasesCount; ++i)
{
spdlog::info("Execute test case {}", i);
int caseNumber = i;
@ -277,9 +312,7 @@ namespace ix
testCase.run();
}
generateReport(url);
return 0;
return generateReport(url) ? 0 : 1;
}
}