per message deflate support (with zlib)

This commit is contained in:
Benjamin Sergeant
2018-11-09 18:23:49 -08:00
parent de8bcd36e8
commit 4fed156b90
32 changed files with 1003 additions and 257 deletions

3
examples/chat/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
build
venv
node_modules

View File

@ -77,7 +77,7 @@ namespace
void WebSocketChat::start()
{
std::string url("ws://localhost:8080/");
_webSocket.configure(url);
_webSocket.setUrl(url);
std::stringstream ss;
log(std::string("Connecting to url: ") + url);
@ -85,8 +85,10 @@ namespace
_webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::CloseInfo& closeInfo)
const ix::WebSocketCloseInfo& closeInfo,
const ix::WebSocketHttpHeaders& headers)
{
std::stringstream ss;
if (messageType == ix::WebSocket_MessageType_Open)

31
examples/chat/package-lock.json generated Normal file
View File

@ -0,0 +1,31 @@
{
"requires": true,
"lockfileVersion": 1,
"dependencies": {
"async-limiter": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.0.tgz",
"integrity": "sha512-jp/uFnooOiO+L211eZOoSyzpOITMXx1rBITauYykG3BRYPu8h0UcxsPNB04RR5vo4Tyz3+ay17tR6JVf9qzYWg=="
},
"safe-buffer": {
"version": "5.1.2",
"resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz",
"integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g=="
},
"ultron": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/ultron/-/ultron-1.1.1.tgz",
"integrity": "sha512-UIEXBNeYmKptWH6z8ZnqTeS8fV74zG0/eRU9VGkpzz+LIJNs8W/zM/L+7ctCkRrgbNnnR0xxw4bKOr0cW0N0Og=="
},
"ws": {
"version": "3.3.3",
"resolved": "https://registry.npmjs.org/ws/-/ws-3.3.3.tgz",
"integrity": "sha512-nnWLa/NwZSt4KQJu51MYlCcSQ5g7INpOrOMt4XV8j4dqTXdmlUmSHQ8/oLC069ckre0fRsgfvsKwbTdtKLCDkA==",
"requires": {
"async-limiter": "1.0.0",
"safe-buffer": "5.1.2",
"ultron": "1.1.1"
}
}
}
}

View File

@ -1 +1,2 @@
venv
build

View File

@ -48,16 +48,18 @@ namespace
void WebSocketPingPong::start()
{
_webSocket.configure(_url);
_webSocket.setUrl(_url);
std::stringstream ss;
log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType,
const std::string& str,
const ix::WebSocketErrorInfo& error,
const ix::CloseInfo& closeInfo)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketCloseInfo& closeInfo,
const ix::WebSocketHttpHeaders& headers)
{
std::stringstream ss;
if (messageType == ix::WebSocket_MessageType_Open)
@ -110,7 +112,7 @@ namespace
void WebSocketPingPong::ping(const std::string& text)
{
if (!_webSocket.ping(text))
if (!_webSocket.ping(text).success)
{
std::cerr << "Failed to send ping message. Message too long (> 125 bytes) or endpoint is disconnected"
<< std::endl;

3
examples/satori_publisher/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
venv
build
node_modules

View File

@ -6,6 +6,9 @@
cmake_minimum_required (VERSION 3.4.1)
project (satori_publisher)
# There's -Weverything too for clang
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic -Wshorten-64-to-32")
set (OPENSSL_PREFIX /usr/local/opt/openssl) # Homebrew openssl
set (CMAKE_CXX_STANDARD 11)

View File

@ -12,7 +12,6 @@
#include <cmath>
#include <cassert>
#include <cstring>
#include <iostream>
namespace ix
{
@ -36,19 +35,10 @@ namespace ix
SatoriConnection::SatoriConnection() :
_authenticated(false),
_authenticatedCallback(nullptr)
_onEventCallback(nullptr)
{
_pdu["action"] = "rtm/publish";
_webSocket.setOnMessageCallback(
[](ix::WebSocketMessageType messageType,
const std::string& str,
const ix::WebSocketErrorInfo& error,
const ix::CloseInfo& closeInfo)
{
;
}
);
resetOnMessageCallback();
}
SatoriConnection::~SatoriConnection()
@ -74,43 +64,42 @@ namespace ix
}
}
void SatoriConnection::setAuthenticatedCallback(const AuthenticatedCallback& authenticatedCallback)
void SatoriConnection::setOnEventCallback(const OnEventCallback& onEventCallback)
{
_authenticatedCallback = authenticatedCallback;
_onEventCallback = onEventCallback;
}
void SatoriConnection::invokeAuthenticatedCallback()
void SatoriConnection::invokeOnEventCallback(ix::SatoriConnectionEventType eventType,
const std::string& errorMsg,
const WebSocketHttpHeaders& headers)
{
if (_authenticatedCallback)
if (_onEventCallback)
{
_authenticatedCallback();
_onEventCallback(eventType, errorMsg, headers);
}
}
void SatoriConnection::setErrorCallback(const ErrorCallback& errorCallback)
{
_errorCallback = errorCallback;
}
void SatoriConnection::invokeErrorCallback(const std::string& errorMsg)
{
if (_errorCallback)
{
_errorCallback(errorMsg);
}
invokeOnEventCallback(ix::SatoriConnection_EventType_Error, errorMsg);
}
void SatoriConnection::disconnect()
{
_webSocket.stop();
resetOnMessageCallback();
}
void SatoriConnection::resetOnMessageCallback()
{
_webSocket.setOnMessageCallback(
[](ix::WebSocketMessageType messageType,
const std::string& str,
const ix::WebSocketErrorInfo& error,
const ix::CloseInfo& closeInfo)
[](ix::WebSocketMessageType,
const std::string&,
size_t,
const ix::WebSocketErrorInfo&,
const ix::WebSocketCloseInfo&,
const ix::WebSocketHttpHeaders&)
{
;
}
@ -120,7 +109,8 @@ namespace ix
void SatoriConnection::configure(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret)
const std::string& rolesecret,
WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions)
{
_appkey = appkey;
_endpoint = endpoint;
@ -132,22 +122,37 @@ namespace ix
ss << "/v2?appkey=";
ss << appkey;
_webSocket.configure(ss.str());
std::string url = ss.str();
_webSocket.setUrl(url);
_webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions);
_webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::CloseInfo& closeInfo)
const ix::WebSocketCloseInfo& closeInfo,
const ix::WebSocketHttpHeaders& headers)
{
SatoriConnection::invokeTrafficTrackerCallback(wireSize, true);
std::stringstream ss;
if (messageType == ix::WebSocket_MessageType_Open)
{
invokeOnEventCallback(ix::SatoriConnection_EventType_Open,
std::string(),
headers);
sendHandshakeMessage();
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
_authenticated = false;
std::stringstream ss;
ss << "Close code " << closeInfo.code;
ss << " reason " << closeInfo.reason;
invokeOnEventCallback(ix::SatoriConnection_EventType_Closed,
ss.str());
}
else if (messageType == ix::WebSocket_MessageType_Message)
{
@ -180,7 +185,7 @@ namespace ix
else if (action == "auth/authenticate/ok")
{
_authenticated = true;
invokeAuthenticatedCallback();
invokeOnEventCallback(ix::SatoriConnection_EventType_Authenticated);
flushQueue();
}
else if (action == "auth/authenticate/error")
@ -238,7 +243,7 @@ namespace ix
std::string serializedJson = writeJsonCompact(pdu);
SatoriConnection::invokeTrafficTrackerCallback(serializedJson.size(), false);
return _webSocket.send(serializedJson);
return _webSocket.send(serializedJson).success;
}
//
@ -300,7 +305,7 @@ namespace ix
std::string serializedJson = writeJsonCompact(pdu);
SatoriConnection::invokeTrafficTrackerCallback(serializedJson.size(), false);
return _webSocket.send(serializedJson);
return _webSocket.send(serializedJson).success;
}
@ -455,8 +460,10 @@ namespace ix
bool SatoriConnection::publishMessage(const std::string& serializedJson)
{
SatoriConnection::invokeTrafficTrackerCallback(serializedJson.size(), false);
return _webSocket.send(serializedJson);
auto webSocketSendInfo = _webSocket.send(serializedJson);
SatoriConnection::invokeTrafficTrackerCallback(webSocketSendInfo.wireSize,
false);
return webSocketSendInfo.success;
}
} // namespace ix

View File

@ -14,12 +14,22 @@
#include "jsoncpp/json/json.h"
#include <ixwebsocket/IXWebSocket.h>
#include <ixwebsocket/IXWebSocketPerMessageDeflateOptions.h>
namespace ix
{
enum SatoriConnectionEventType
{
SatoriConnection_EventType_Authenticated = 0,
SatoriConnection_EventType_Error = 1,
SatoriConnection_EventType_Open = 2,
SatoriConnection_EventType_Closed = 3
};
using SubscriptionCallback = std::function<void(const Json::Value&)>;
using AuthenticatedCallback = std::function<void()>;
using ErrorCallback = std::function<void(const std::string& errorMsg)>;
using OnEventCallback = std::function<void(SatoriConnectionEventType,
const std::string&,
const WebSocketHttpHeaders&)>;
using OnTrafficTrackerCallback = std::function<void(size_t size, bool incoming)>;
class SatoriConnection
@ -33,7 +43,8 @@ namespace ix
void configure(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret);
const std::string& rolesecret,
WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions);
/// Set the traffic tracker callback
static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback);
@ -41,10 +52,8 @@ namespace ix
/// Reset the traffic tracker callback to an no-op one.
static void resetTrafficTrackerCallback();
/// Set the authenticated callback
void setAuthenticatedCallback(const AuthenticatedCallback& authenticatedCallback);
/// Set the error callback
void setErrorCallback(const ErrorCallback& errorCallback);
/// Set the closed callback
void setOnEventCallback(const OnEventCallback& onEventCallback);
/// Start the worker thread, used for background publishing
void start();
@ -77,6 +86,8 @@ namespace ix
bool sendAuthMessage(const std::string& nonce);
bool handleSubscriptionData(const Json::Value& pdu);
void resetOnMessageCallback();
bool publishMessage(const std::string& serializedJson);
bool flushQueue();
void enqueue(const std::string& msg);
@ -84,8 +95,10 @@ namespace ix
/// Invoke the traffic tracker callback
static void invokeTrafficTrackerCallback(size_t size, bool incoming);
/// Invoke lifecycle callbacks
void invokeAuthenticatedCallback();
/// Invoke event callbacks
void invokeOnEventCallback(SatoriConnectionEventType eventType,
const std::string& errorMsg = std::string(),
const WebSocketHttpHeaders& headers = WebSocketHttpHeaders());
void invokeErrorCallback(const std::string& errorMsg);
///
@ -98,7 +111,6 @@ namespace ix
std::string _endpoint;
std::string _role_name;
std::string _role_secret;
uint32_t _history;
// Can be set on control+background thread, protecting with an atomic
std::atomic<bool> _authenticated;
@ -111,8 +123,7 @@ namespace ix
static OnTrafficTrackerCallback _onTrafficTrackerCallback;
/// Callbacks
AuthenticatedCallback _authenticatedCallback;
ErrorCallback _errorCallback;
OnEventCallback _onEventCallback;
/// Subscription callbacks, only one per channel
std::unordered_map<std::string, SubscriptionCallback> _cbs;

View File

@ -34,12 +34,7 @@ namespace ix
"abcdefghijklmnopqrstuvwxyz"
"0123456789+/";
static inline bool is_base64(unsigned char c)
{
return (isalnum(c) || (c == '+') || (c == '/'));
}
std::string base64_encode(const std::string& data, uint32_t len)
std::string base64_encode(const std::string& data, size_t len)
{
std::string ret;
int i = 0;

View File

@ -10,5 +10,5 @@
namespace ix
{
std::string base64_encode(const std::string& data, uint32_t len);
std::string base64_encode(const std::string& data, size_t len);
}

View File

@ -5,13 +5,14 @@
*/
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 5678, perMessageDeflate: false });
let wss = new WebSocket.Server({ port: 5678, perMessageDeflate: true })
let handshake = false
let authenticated = false
wss.on('connection', (ws) => {
wss.on('connection', function connection(ws) {
ws.on('message', function incoming(data) {
let handshake = false
let authenticated = false
ws.on('message', (data) => {
console.log(data.toString('utf-8'))
@ -41,4 +42,4 @@ wss.on('connection', function connection(ws) {
console.log(data)
}
});
});
})

View File

@ -20,6 +20,11 @@ void msleep(int ms)
int main(int argc, char* argv[])
{
if (argc != 7)
{
std::cerr << "Usage error: need 6 arguments." << std::endl;
}
std::string endpoint = argv[1];
std::string appkey = argv[2];
std::string channel = argv[3];
@ -45,51 +50,72 @@ int main(int argc, char* argv[])
bool done = false;
ix::SatoriConnection satoriConnection;
satoriConnection.configure(appkey, endpoint, rolename, rolesecret);
ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(
false, false, false, 15, 15);
satoriConnection.configure(appkey, endpoint, rolename, rolesecret,
webSocketPerMessageDeflateOptions);
satoriConnection.connect();
satoriConnection.setAuthenticatedCallback(
[&satoriConnection, channel, path, &done]()
satoriConnection.setOnEventCallback(
[&satoriConnection, channel, path, &done]
(ix::SatoriConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers)
{
std::cout << "Authenticated" << std::endl;;
std::string line;
std::ifstream f(path);
if (!f.is_open())
if (eventType == ix::SatoriConnection_EventType_Open)
{
std::cerr << "Error while opening file: " << path << std::endl;
std::cout << "Handshake Headers:" << std::endl;
for (auto it : headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
}
while (getline(f, line))
else if (eventType == ix::SatoriConnection_EventType_Authenticated)
{
Json::Value value;
Json::Reader reader;
reader.parse(line, value);
std::cout << "Authenticated" << std::endl;
satoriConnection.publish(channel, value);
std::string line;
std::ifstream f(path);
if (!f.is_open())
{
std::cerr << "Error while opening file: " << path << std::endl;
}
int n = 0;
while (getline(f, line))
{
Json::Value value;
Json::Reader reader;
reader.parse(line, value);
satoriConnection.publish(channel, value);
n++;
}
std::cerr << "#published messages: " << n << std::endl;
if (f.bad())
{
std::cerr << "Error while opening file: " << path << std::endl;
}
done = true;
}
if (f.bad())
else if (eventType == ix::SatoriConnection_EventType_Error)
{
std::cerr << "Error while opening file: " << path << std::endl;
std::cerr << "Satori Error received: " << errMsg << std::endl;
done = true;
}
else if (eventType == ix::SatoriConnection_EventType_Closed)
{
std::cerr << "Satori connection closed" << std::endl;
}
done = true;
}
);
satoriConnection.setErrorCallback(
[&done](const std::string& errMsg)
{
std::cerr << "Satori Error received: " << errMsg << std::endl;
done = true;
}
);
while (!done)
{
msleep(10);
msleep(1);
}
std::cout << incomingBytes << std::endl;
std::cout << "Incoming bytes: " << incomingBytes << std::endl;
std::cout << "Outgoing bytes: " << outgoingBytes << std::endl;

View File

@ -6,6 +6,6 @@ appkey="appkey"
channel="foo"
rolename="a_role"
rolesecret="a_secret"
path=events.jsonl
filename=${FILENAME:=events.jsonl}
build/satori_publisher $endpoint $appkey $channel $rolename $rolesecret $path
build/satori_publisher $endpoint $appkey $channel $rolename $rolesecret $filename

3
examples/ws_connect/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
build
venv
node_modules

View File

@ -47,7 +47,7 @@ namespace
void WebSocketConnect::start()
{
_webSocket.configure(_url);
_webSocket.setUrl(_url);
std::stringstream ss;
log(std::string("Connecting to url: ") + _url);
@ -55,8 +55,10 @@ namespace
_webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::CloseInfo& closeInfo)
const ix::WebSocketCloseInfo& closeInfo,
const ix::WebSocketHttpHeaders& headers)
{
std::stringstream ss;
if (messageType == ix::WebSocket_MessageType_Open)