+add utf-8 validation code, not hooked up properly yet

+ws autobahn / Add code to test websocket client compliance with the autobahn test-suite
+Ping received with a payload too large (> 125 bytes) trigger a connection closure
+cobra / add tracking about published messages
+cobra / publish returns a message id, that can be used when
+cobra / new message type in the message received handler when publish/ok is received (can be used to implement an ack system).
This commit is contained in:
Benjamin Sergeant 2019-08-31 16:46:44 -07:00
parent a95fcbbdbf
commit b5b0de2083
16 changed files with 375 additions and 21 deletions

View File

@ -1 +1 @@
5.0.7
5.1.0

View File

@ -1,8 +1,18 @@
# Changelog
All notable changes to this project will be documented in this file.
## [5.1.0] - 2019-08-31
add utf-8 validation code, not hooked up properly yet
ws autobahn / Add code to test websocket client compliance with the autobahn test-suite
Ping received with a payload too large (> 125 bytes) trigger a connection closure
cobra / add tracking about published messages
cobra / publish returns a message id, that can be used when
cobra / new message type in the message received handler when publish/ok is received (can be used to implement an ack system).
## [5.0.9] - 2019-08-30
User-Agent header is set when not specified.
New option to cap the max wait between reconnection attempts. Still default to 10s. (setMaxWaitBetweenReconnectionRetries).
```

View File

@ -12,6 +12,63 @@
#include <cmath>
#include <cassert>
namespace
{
//
// Stolen from here http://www.zedwood.com/article/cpp-is-valid-utf8-string-function
// There doesn't seem to be anything in the C++ library so far to do that.
// The closest thing is code for converting from utf-8 to utf-16 or utf-32 but
// that isn't working well for some broken input strings.
//
bool isValidUtf8(const std::string& str)
{
size_t i = 0;
size_t ix = str.length();
int c, n, j;
for (; i < ix; i++)
{
c = (unsigned char) str[i];
//if (c==0x09 || c==0x0a || c==0x0d || (0x20 <= c && c <= 0x7e) ) n = 0; // is_printable_ascii
if (0x00 <= c && c <= 0x7f)
{
n = 0; // 0bbbbbbb
}
else if ((c & 0xE0) == 0xC0)
{
n = 1; // 110bbbbb
}
else if ( c==0xed && i<(ix-1) && ((unsigned char)str[i+1] & 0xa0)==0xa0)
{
return false; //U+d800 to U+dfff
}
else if ((c & 0xF0) == 0xE0)
{
n = 2; // 1110bbbb
}
else if ((c & 0xF8) == 0xF0)
{
n = 3; // 11110bbb
}
//else if (($c & 0xFC) == 0xF8) n=4; // 111110bb //byte 5, unnecessary in 4 byte UTF-8
//else if (($c & 0xFE) == 0xFC) n=5; // 1111110b //byte 6, unnecessary in 4 byte UTF-8
else
{
return false;
}
for (j=0; j<n && i<ix; j++)
{ // n bytes matching 10bbbbbb follow ?
if ((++i == ix) || (( (unsigned char)str[i] & 0xC0) != 0x80))
{
return false;
}
}
}
return true;
}
}
namespace ix
{
OnTrafficTrackerCallback WebSocket::_onTrafficTrackerCallback = nullptr;
@ -404,6 +461,11 @@ namespace ix
WebSocketSendInfo WebSocket::sendText(const std::string& text,
const OnProgressCallback& onProgressCallback)
{
if (!isValidUtf8(text))
{
stop();
return false;
}
return sendMessage(text, SendMessageKind::Text, onProgressCallback);
}

View File

@ -590,6 +590,17 @@ namespace ix
std::string pingData(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size + (size_t) ws.N);
// too large
if (pingData.size() > 125)
{
std::string reason("reason control frame with payload length > 125 octets");
// Unexpected frame type
close(1002,
reason,
reason.size());
return;
}
if (_enablePong)
{
// Reply back right away

View File

@ -6,4 +6,4 @@
#pragma once
#define IX_WEBSOCKET_VERSION "5.0.9"
#define IX_WEBSOCKET_VERSION "5.1.0"

View File

@ -70,6 +70,7 @@ add_executable(ws
ws_cobra_to_sentry.cpp
ws_snake.cpp
ws_httpd.cpp
ws_autobahn.cpp
ws.cpp)
target_link_libraries(ws ixwebsocket)

View File

@ -18,6 +18,7 @@
namespace ix
{
TrafficTrackerCallback CobraConnection::_trafficTrackerCallback = nullptr;
PublishTrackerCallback CobraConnection::_publishTrackerCallback = nullptr;
constexpr size_t CobraConnection::kQueueMaxSize;
CobraConnection::CobraConnection() :
@ -56,6 +57,24 @@ namespace ix
}
}
void CobraConnection::setPublishTrackerCallback(const PublishTrackerCallback& callback)
{
_publishTrackerCallback = callback;
}
void CobraConnection::resetPublishTrackerCallback()
{
setPublishTrackerCallback(nullptr);
}
void CobraConnection::invokePublishTrackerCallback(bool sent, bool acked)
{
if (_publishTrackerCallback)
{
_publishTrackerCallback(sent, acked);
}
}
void CobraConnection::setEventCallback(const EventCallback& eventCallback)
{
std::lock_guard<std::mutex> lock(_eventCallbackMutex);
@ -63,19 +82,20 @@ namespace ix
}
void CobraConnection::invokeEventCallback(ix::CobraConnectionEventType eventType,
const std::string& errorMsg,
const WebSocketHttpHeaders& headers,
const std::string& subscriptionId)
const std::string& errorMsg,
const WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId)
{
std::lock_guard<std::mutex> lock(_eventCallbackMutex);
if (_eventCallback)
{
_eventCallback(eventType, errorMsg, headers, subscriptionId);
_eventCallback(eventType, errorMsg, headers, subscriptionId, msgId);
}
}
void CobraConnection::invokeErrorCallback(const std::string& errorMsg,
const std::string& serializedPdu)
const std::string& serializedPdu)
{
std::stringstream ss;
ss << errorMsg << " : received pdu => " << serializedPdu;
@ -178,6 +198,17 @@ namespace ix
{
invokeErrorCallback("Unsubscription error", msg->str);
}
else if (action == "rtm/publish/ok")
{
if (!handlePublishResponse(data))
{
invokeErrorCallback("Error processing publish response", msg->str);
}
}
else if (action == "rtm/publish/error")
{
invokeErrorCallback("Publish error", msg->str);
}
else
{
invokeErrorCallback("Un-handled message type", msg->str);
@ -374,6 +405,24 @@ namespace ix
return true;
}
bool CobraConnection::handlePublishResponse(const Json::Value& pdu)
{
if (!pdu.isMember("id")) return false;
Json::Value id = pdu["id"];
if (!id.isUInt64()) return false;
uint64_t msgId = id.asUInt64();
invokeEventCallback(ix::CobraConnection_EventType_Published,
std::string(), WebSocketHttpHeaders(),
std::string(), msgId);
invokePublishTrackerCallback(false, true);
return true;
}
bool CobraConnection::connect()
{
_webSocket->start();
@ -399,9 +448,11 @@ namespace ix
//
// publish is not thread safe as we are trying to reuse some Json objects.
//
bool CobraConnection::publish(const Json::Value& channels,
const Json::Value& msg)
CobraConnection::MsgId CobraConnection::publish(const Json::Value& channels,
const Json::Value& msg)
{
invokePublishTrackerCallback(true, false);
_body["channels"] = channels;
_body["message"] = msg;
_pdu["body"] = _body;
@ -412,7 +463,7 @@ namespace ix
if (_publishMode == CobraConnection_PublishMode_Batch)
{
enqueue(serializedJson);
return true;
return _id - 1;
}
//
@ -421,14 +472,14 @@ namespace ix
//
if (_authenticated && publishMessage(serializedJson))
{
return true;
return _id - 1;
}
else // Or else we enqueue
// Slow code path is when we haven't connected yet (startup),
// or when the connection drops for some reason.
{
enqueue(serializedJson);
return false;
return 0;
}
}
@ -528,7 +579,7 @@ namespace ix
{
auto webSocketSendInfo = _webSocket->send(serializedJson);
CobraConnection::invokeTrafficTrackerCallback(webSocketSendInfo.wireSize,
false);
false);
return webSocketSendInfo.success;
}

View File

@ -15,6 +15,7 @@
#include <string>
#include <thread>
#include <unordered_map>
#include <limits>
namespace ix
{
@ -27,7 +28,8 @@ namespace ix
CobraConnection_EventType_Open = 2,
CobraConnection_EventType_Closed = 3,
CobraConnection_EventType_Subscribed = 4,
CobraConnection_EventType_UnSubscribed = 5
CobraConnection_EventType_UnSubscribed = 5,
CobraConnection_EventType_Published = 6
};
enum CobraConnectionPublishMode
@ -40,12 +42,17 @@ namespace ix
using EventCallback = std::function<void(CobraConnectionEventType,
const std::string&,
const WebSocketHttpHeaders&,
const std::string&)>;
const std::string&,
uint64_t msgId)>;
using TrafficTrackerCallback = std::function<void(size_t size, bool incoming)>;
using PublishTrackerCallback = std::function<void(bool sent, bool acked)>;
class CobraConnection
{
public:
using MsgId = uint64_t;
CobraConnection();
~CobraConnection();
@ -57,11 +64,18 @@ namespace ix
const std::string& rolesecret,
const WebSocketPerMessageDeflateOptions& webSocketPerMessageDeflateOptions);
/// Set the traffic tracker callback
static void setTrafficTrackerCallback(const TrafficTrackerCallback& callback);
/// Reset the traffic tracker callback to an no-op one.
static void resetTrafficTrackerCallback();
/// Set the publish tracker callback
static void setPublishTrackerCallback(const PublishTrackerCallback& callback);
/// Reset the publish tracker callback to an no-op one.
static void resetPublishTrackerCallback();
/// Set the closed callback
void setEventCallback(const EventCallback& eventCallback);
@ -71,7 +85,7 @@ namespace ix
/// Publish a message to a channel
///
/// No-op if the connection is not established
bool publish(const Json::Value& channels, const Json::Value& msg);
MsgId publish(const Json::Value& channels, const Json::Value& msg);
// Subscribe to a channel, and execute a callback when an incoming
// message arrives.
@ -111,6 +125,7 @@ namespace ix
bool handleSubscriptionData(const Json::Value& pdu);
bool handleSubscriptionResponse(const Json::Value& pdu);
bool handleUnsubscriptionResponse(const Json::Value& pdu);
bool handlePublishResponse(const Json::Value& pdu);
void initWebSocketOnMessageCallback();
@ -121,11 +136,15 @@ namespace ix
/// Invoke the traffic tracker callback
static void invokeTrafficTrackerCallback(size_t size, bool incoming);
/// Invoke the publish tracker callback
static void invokePublishTrackerCallback(bool sent, bool acked);
/// Invoke event callbacks
void invokeEventCallback(CobraConnectionEventType eventType,
const std::string& errorMsg = std::string(),
const WebSocketHttpHeaders& headers = WebSocketHttpHeaders(),
const std::string& subscriptionId = std::string());
const std::string& subscriptionId = std::string(),
uint64_t msgId = std::numeric_limits<uint64_t>::max());
void invokeErrorCallback(const std::string& errorMsg, const std::string& serializedPdu);
///
@ -150,6 +169,9 @@ namespace ix
/// Traffic tracker callback
static TrafficTrackerCallback _trafficTrackerCallback;
/// Publish tracker callback
static PublishTrackerCallback _publishTrackerCallback;
/// Cobra events callbacks
EventCallback _eventCallback;
mutable std::mutex _eventCallbackMutex;

View File

@ -25,7 +25,8 @@ namespace ix
(ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId)
const std::string& subscriptionId,
CobraConnection::MsgId msgId)
{
std::stringstream ss;
@ -58,6 +59,10 @@ namespace ix
{
ss << "Unsubscribed through subscription id: " << subscriptionId;
}
else if (eventType == ix::CobraConnection_EventType_Published)
{
ss << "Published message " << msgId << " acked";
}
ix::IXCoreLogger::Log(ss.str().c_str());
});

View File

@ -226,6 +226,9 @@ int main(int argc, char** argv)
httpServerApp->add_option("--port", port, "Port");
httpServerApp->add_option("--host", hostname, "Hostname");
CLI::App* autobahnApp = app.add_subcommand("autobahn", "Test client Autobahn compliance");
autobahnApp->add_option("--url", url, "url");
CLI11_PARSE(app, argc, argv);
// pid file handling
@ -328,6 +331,10 @@ int main(int argc, char** argv)
{
ret = ix::ws_httpd_main(port, hostname);
}
else if (app.got_subcommand("autobahn"))
{
ret = ix::ws_autobahn_main(url);
}
ix::uninitNetSystem();
return ret;

View File

@ -102,4 +102,6 @@ namespace ix
const std::string& appsConfigPath);
int ws_httpd_main(int port, const std::string& hostname);
int ws_autobahn_main(const std::string& url);
} // namespace ix

154
ws/ws_autobahn.cpp Normal file
View File

@ -0,0 +1,154 @@
/*
* ws_autobahn.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <sstream>
#include <atomic>
#include <ixwebsocket/IXWebSocket.h>
#include <ixwebsocket/IXSocket.h>
namespace ix
{
class AutobahnTestCase
{
public:
AutobahnTestCase(const std::string& _url);
void run();
private:
void log(const std::string& msg);
std::string _url;
ix::WebSocket _webSocket;
std::atomic<bool> _done;
};
AutobahnTestCase::AutobahnTestCase(const std::string& url) :
_url(url),
_done(false)
{
_webSocket.disableAutomaticReconnection();
ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(
true, false, false, 15, 15);
_webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions);
}
void AutobahnTestCase::log(const std::string& msg)
{
std::cerr << msg << std::endl;
}
void AutobahnTestCase::run()
{
_webSocket.setUrl(_url);
std::stringstream ss;
log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback(
[this](const ix::WebSocketMessagePtr& msg)
{
std::stringstream ss;
if (msg->type == ix::WebSocketMessageType::Open)
{
log("autobahn: connected");
std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl;
for (auto it : msg->openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
ss << "autobahn: connection closed:";
ss << " code " << msg->closeInfo.code;
ss << " reason " << msg->closeInfo.reason << std::endl;
log(ss.str());
_done = true;
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
// ss << "autobahn: received message: "
// << msg->str;
// log(ss.str());
_webSocket.send(msg->str, msg->binary);
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str());
// And error can happen, in which case the test-case is marked done
_done = true;
}
else if (msg->type == ix::WebSocketMessageType::Fragment)
{
std::cerr << "Received message fragment" << std::endl;
}
else if (msg->type == ix::WebSocketMessageType::Ping)
{
std::cerr << "Received ping" << std::endl;
}
else if (msg->type == ix::WebSocketMessageType::Pong)
{
std::cerr << "Received pong" << std::endl;
}
else
{
ss << "Invalid ix::WebSocketMessageType";
log(ss.str());
}
});
_webSocket.start();
log("Waiting for being closed ...");
while (!_done)
{
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
}
_webSocket.stop();
}
//
// make && bench ws autobahn --url 'ws://localhost:9001/runCase?case=9&agent=ixwebsocket' && ws connect -d 'ws://localhost:9001/updateReports?agent=ixwebsocket'
//
int ws_autobahn_main(const std::string& url)
{
int N = 1; // 519;
N++;
for (int i = 1 ; i < N; ++i)
{
int caseNumber = i;
std::stringstream ss;
ss << "ws://localhost:9001/runCase?case="
<< caseNumber
<< "&agent=ixwebsocket";
std::string url(ss.str());
AutobahnTestCase testCase(url);
testCase.run();
}
return 0;
}
}

View File

@ -12,6 +12,7 @@
#include <atomic>
#include <jsoncpp/json/json.h>
#include <ixcobra/IXCobraMetricsPublisher.h>
#include <spdlog/spdlog.h>
namespace ix
{
@ -23,6 +24,16 @@ namespace ix
const std::string& path,
bool stress)
{
std::atomic<int> sentMessages(0);
std::atomic<int> ackedMessages(0);
CobraConnection::setPublishTrackerCallback(
[&sentMessages, &ackedMessages](bool sent, bool acked)
{
if (sent) sentMessages++;
if (acked) ackedMessages++;
}
);
CobraMetricsPublisher cobraMetricsPublisher;
cobraMetricsPublisher.enable(true);
@ -64,8 +75,11 @@ namespace ix
// Wait a bit for the message to get a chance to be sent
// there isn't any ack on publish right now so it's the best we can do
// FIXME: this comment is a lie now
std::this_thread::sleep_for(std::chrono::milliseconds(100));
spdlog::info("Sent messages: {} Acked messages {}", sentMessages, ackedMessages);
return 0;
}
}

View File

@ -57,7 +57,8 @@ namespace ix
(ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId)
const std::string& subscriptionId,
CobraConnection::MsgId msgId)
{
if (eventType == ix::CobraConnection_EventType_Open)
{
@ -96,6 +97,10 @@ namespace ix
{
spdlog::error("Subscriber: error {}", errMsg);
}
else if (eventType == ix::CobraConnection_EventType_Published)
{
spdlog::error("Published message hacked: {}", msgId);
}
}
);

View File

@ -102,7 +102,8 @@ namespace ix
(ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId)
const std::string& subscriptionId,
CobraConnection::MsgId msgId)
{
if (eventType == ix::CobraConnection_EventType_Open)
{
@ -169,6 +170,10 @@ namespace ix
{
spdlog::error("Subscriber: error {}", errMsg);
}
else if (eventType == ix::CobraConnection_EventType_Published)
{
spdlog::error("Published message hacked: {}", msgId);
}
}
);

View File

@ -95,7 +95,8 @@ namespace ix
(ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId)
const std::string& subscriptionId,
CobraConnection::MsgId msgId)
{
if (eventType == ix::CobraConnection_EventType_Open)
{
@ -149,6 +150,10 @@ namespace ix
{
spdlog::error("Subscriber: error {}", errMsg);
}
else if (eventType == ix::CobraConnection_EventType_Published)
{
spdlog::error("Published message hacked: {}", msgId);
}
}
);