per message deflate support (with zlib)
This commit is contained in:
3
examples/satori_publisher/.gitignore
vendored
Normal file
3
examples/satori_publisher/.gitignore
vendored
Normal file
@ -0,0 +1,3 @@
|
||||
venv
|
||||
build
|
||||
node_modules
|
@ -6,6 +6,9 @@
|
||||
cmake_minimum_required (VERSION 3.4.1)
|
||||
project (satori_publisher)
|
||||
|
||||
# There's -Weverything too for clang
|
||||
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic -Wshorten-64-to-32")
|
||||
|
||||
set (OPENSSL_PREFIX /usr/local/opt/openssl) # Homebrew openssl
|
||||
|
||||
set (CMAKE_CXX_STANDARD 11)
|
||||
|
@ -12,7 +12,6 @@
|
||||
#include <cmath>
|
||||
#include <cassert>
|
||||
#include <cstring>
|
||||
#include <iostream>
|
||||
|
||||
namespace ix
|
||||
{
|
||||
@ -36,19 +35,10 @@ namespace ix
|
||||
|
||||
SatoriConnection::SatoriConnection() :
|
||||
_authenticated(false),
|
||||
_authenticatedCallback(nullptr)
|
||||
_onEventCallback(nullptr)
|
||||
{
|
||||
_pdu["action"] = "rtm/publish";
|
||||
|
||||
_webSocket.setOnMessageCallback(
|
||||
[](ix::WebSocketMessageType messageType,
|
||||
const std::string& str,
|
||||
const ix::WebSocketErrorInfo& error,
|
||||
const ix::CloseInfo& closeInfo)
|
||||
{
|
||||
;
|
||||
}
|
||||
);
|
||||
resetOnMessageCallback();
|
||||
}
|
||||
|
||||
SatoriConnection::~SatoriConnection()
|
||||
@ -74,43 +64,42 @@ namespace ix
|
||||
}
|
||||
}
|
||||
|
||||
void SatoriConnection::setAuthenticatedCallback(const AuthenticatedCallback& authenticatedCallback)
|
||||
void SatoriConnection::setOnEventCallback(const OnEventCallback& onEventCallback)
|
||||
{
|
||||
_authenticatedCallback = authenticatedCallback;
|
||||
_onEventCallback = onEventCallback;
|
||||
}
|
||||
|
||||
void SatoriConnection::invokeAuthenticatedCallback()
|
||||
void SatoriConnection::invokeOnEventCallback(ix::SatoriConnectionEventType eventType,
|
||||
const std::string& errorMsg,
|
||||
const WebSocketHttpHeaders& headers)
|
||||
{
|
||||
|
||||
if (_authenticatedCallback)
|
||||
if (_onEventCallback)
|
||||
{
|
||||
_authenticatedCallback();
|
||||
_onEventCallback(eventType, errorMsg, headers);
|
||||
}
|
||||
}
|
||||
|
||||
void SatoriConnection::setErrorCallback(const ErrorCallback& errorCallback)
|
||||
{
|
||||
_errorCallback = errorCallback;
|
||||
}
|
||||
|
||||
void SatoriConnection::invokeErrorCallback(const std::string& errorMsg)
|
||||
{
|
||||
|
||||
if (_errorCallback)
|
||||
{
|
||||
_errorCallback(errorMsg);
|
||||
}
|
||||
invokeOnEventCallback(ix::SatoriConnection_EventType_Error, errorMsg);
|
||||
}
|
||||
|
||||
void SatoriConnection::disconnect()
|
||||
{
|
||||
_webSocket.stop();
|
||||
|
||||
resetOnMessageCallback();
|
||||
}
|
||||
|
||||
void SatoriConnection::resetOnMessageCallback()
|
||||
{
|
||||
_webSocket.setOnMessageCallback(
|
||||
[](ix::WebSocketMessageType messageType,
|
||||
const std::string& str,
|
||||
const ix::WebSocketErrorInfo& error,
|
||||
const ix::CloseInfo& closeInfo)
|
||||
[](ix::WebSocketMessageType,
|
||||
const std::string&,
|
||||
size_t,
|
||||
const ix::WebSocketErrorInfo&,
|
||||
const ix::WebSocketCloseInfo&,
|
||||
const ix::WebSocketHttpHeaders&)
|
||||
{
|
||||
;
|
||||
}
|
||||
@ -120,7 +109,8 @@ namespace ix
|
||||
void SatoriConnection::configure(const std::string& appkey,
|
||||
const std::string& endpoint,
|
||||
const std::string& rolename,
|
||||
const std::string& rolesecret)
|
||||
const std::string& rolesecret,
|
||||
WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions)
|
||||
{
|
||||
_appkey = appkey;
|
||||
_endpoint = endpoint;
|
||||
@ -132,22 +122,37 @@ namespace ix
|
||||
ss << "/v2?appkey=";
|
||||
ss << appkey;
|
||||
|
||||
_webSocket.configure(ss.str());
|
||||
std::string url = ss.str();
|
||||
_webSocket.setUrl(url);
|
||||
_webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions);
|
||||
|
||||
_webSocket.setOnMessageCallback(
|
||||
[this](ix::WebSocketMessageType messageType,
|
||||
const std::string& str,
|
||||
size_t wireSize,
|
||||
const ix::WebSocketErrorInfo& error,
|
||||
const ix::CloseInfo& closeInfo)
|
||||
const ix::WebSocketCloseInfo& closeInfo,
|
||||
const ix::WebSocketHttpHeaders& headers)
|
||||
{
|
||||
SatoriConnection::invokeTrafficTrackerCallback(wireSize, true);
|
||||
|
||||
std::stringstream ss;
|
||||
if (messageType == ix::WebSocket_MessageType_Open)
|
||||
{
|
||||
invokeOnEventCallback(ix::SatoriConnection_EventType_Open,
|
||||
std::string(),
|
||||
headers);
|
||||
sendHandshakeMessage();
|
||||
}
|
||||
else if (messageType == ix::WebSocket_MessageType_Close)
|
||||
{
|
||||
_authenticated = false;
|
||||
|
||||
std::stringstream ss;
|
||||
ss << "Close code " << closeInfo.code;
|
||||
ss << " reason " << closeInfo.reason;
|
||||
invokeOnEventCallback(ix::SatoriConnection_EventType_Closed,
|
||||
ss.str());
|
||||
}
|
||||
else if (messageType == ix::WebSocket_MessageType_Message)
|
||||
{
|
||||
@ -180,7 +185,7 @@ namespace ix
|
||||
else if (action == "auth/authenticate/ok")
|
||||
{
|
||||
_authenticated = true;
|
||||
invokeAuthenticatedCallback();
|
||||
invokeOnEventCallback(ix::SatoriConnection_EventType_Authenticated);
|
||||
flushQueue();
|
||||
}
|
||||
else if (action == "auth/authenticate/error")
|
||||
@ -238,7 +243,7 @@ namespace ix
|
||||
std::string serializedJson = writeJsonCompact(pdu);
|
||||
SatoriConnection::invokeTrafficTrackerCallback(serializedJson.size(), false);
|
||||
|
||||
return _webSocket.send(serializedJson);
|
||||
return _webSocket.send(serializedJson).success;
|
||||
}
|
||||
|
||||
//
|
||||
@ -300,7 +305,7 @@ namespace ix
|
||||
std::string serializedJson = writeJsonCompact(pdu);
|
||||
SatoriConnection::invokeTrafficTrackerCallback(serializedJson.size(), false);
|
||||
|
||||
return _webSocket.send(serializedJson);
|
||||
return _webSocket.send(serializedJson).success;
|
||||
}
|
||||
|
||||
|
||||
@ -455,8 +460,10 @@ namespace ix
|
||||
|
||||
bool SatoriConnection::publishMessage(const std::string& serializedJson)
|
||||
{
|
||||
SatoriConnection::invokeTrafficTrackerCallback(serializedJson.size(), false);
|
||||
return _webSocket.send(serializedJson);
|
||||
auto webSocketSendInfo = _webSocket.send(serializedJson);
|
||||
SatoriConnection::invokeTrafficTrackerCallback(webSocketSendInfo.wireSize,
|
||||
false);
|
||||
return webSocketSendInfo.success;
|
||||
}
|
||||
|
||||
} // namespace ix
|
||||
|
@ -14,12 +14,22 @@
|
||||
|
||||
#include "jsoncpp/json/json.h"
|
||||
#include <ixwebsocket/IXWebSocket.h>
|
||||
#include <ixwebsocket/IXWebSocketPerMessageDeflateOptions.h>
|
||||
|
||||
namespace ix
|
||||
{
|
||||
enum SatoriConnectionEventType
|
||||
{
|
||||
SatoriConnection_EventType_Authenticated = 0,
|
||||
SatoriConnection_EventType_Error = 1,
|
||||
SatoriConnection_EventType_Open = 2,
|
||||
SatoriConnection_EventType_Closed = 3
|
||||
};
|
||||
|
||||
using SubscriptionCallback = std::function<void(const Json::Value&)>;
|
||||
using AuthenticatedCallback = std::function<void()>;
|
||||
using ErrorCallback = std::function<void(const std::string& errorMsg)>;
|
||||
using OnEventCallback = std::function<void(SatoriConnectionEventType,
|
||||
const std::string&,
|
||||
const WebSocketHttpHeaders&)>;
|
||||
using OnTrafficTrackerCallback = std::function<void(size_t size, bool incoming)>;
|
||||
|
||||
class SatoriConnection
|
||||
@ -33,7 +43,8 @@ namespace ix
|
||||
void configure(const std::string& appkey,
|
||||
const std::string& endpoint,
|
||||
const std::string& rolename,
|
||||
const std::string& rolesecret);
|
||||
const std::string& rolesecret,
|
||||
WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions);
|
||||
|
||||
/// Set the traffic tracker callback
|
||||
static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback);
|
||||
@ -41,10 +52,8 @@ namespace ix
|
||||
/// Reset the traffic tracker callback to an no-op one.
|
||||
static void resetTrafficTrackerCallback();
|
||||
|
||||
/// Set the authenticated callback
|
||||
void setAuthenticatedCallback(const AuthenticatedCallback& authenticatedCallback);
|
||||
/// Set the error callback
|
||||
void setErrorCallback(const ErrorCallback& errorCallback);
|
||||
/// Set the closed callback
|
||||
void setOnEventCallback(const OnEventCallback& onEventCallback);
|
||||
|
||||
/// Start the worker thread, used for background publishing
|
||||
void start();
|
||||
@ -77,6 +86,8 @@ namespace ix
|
||||
bool sendAuthMessage(const std::string& nonce);
|
||||
bool handleSubscriptionData(const Json::Value& pdu);
|
||||
|
||||
void resetOnMessageCallback();
|
||||
|
||||
bool publishMessage(const std::string& serializedJson);
|
||||
bool flushQueue();
|
||||
void enqueue(const std::string& msg);
|
||||
@ -84,8 +95,10 @@ namespace ix
|
||||
/// Invoke the traffic tracker callback
|
||||
static void invokeTrafficTrackerCallback(size_t size, bool incoming);
|
||||
|
||||
/// Invoke lifecycle callbacks
|
||||
void invokeAuthenticatedCallback();
|
||||
/// Invoke event callbacks
|
||||
void invokeOnEventCallback(SatoriConnectionEventType eventType,
|
||||
const std::string& errorMsg = std::string(),
|
||||
const WebSocketHttpHeaders& headers = WebSocketHttpHeaders());
|
||||
void invokeErrorCallback(const std::string& errorMsg);
|
||||
|
||||
///
|
||||
@ -98,7 +111,6 @@ namespace ix
|
||||
std::string _endpoint;
|
||||
std::string _role_name;
|
||||
std::string _role_secret;
|
||||
uint32_t _history;
|
||||
|
||||
// Can be set on control+background thread, protecting with an atomic
|
||||
std::atomic<bool> _authenticated;
|
||||
@ -111,8 +123,7 @@ namespace ix
|
||||
static OnTrafficTrackerCallback _onTrafficTrackerCallback;
|
||||
|
||||
/// Callbacks
|
||||
AuthenticatedCallback _authenticatedCallback;
|
||||
ErrorCallback _errorCallback;
|
||||
OnEventCallback _onEventCallback;
|
||||
|
||||
/// Subscription callbacks, only one per channel
|
||||
std::unordered_map<std::string, SubscriptionCallback> _cbs;
|
||||
|
@ -34,12 +34,7 @@ namespace ix
|
||||
"abcdefghijklmnopqrstuvwxyz"
|
||||
"0123456789+/";
|
||||
|
||||
static inline bool is_base64(unsigned char c)
|
||||
{
|
||||
return (isalnum(c) || (c == '+') || (c == '/'));
|
||||
}
|
||||
|
||||
std::string base64_encode(const std::string& data, uint32_t len)
|
||||
std::string base64_encode(const std::string& data, size_t len)
|
||||
{
|
||||
std::string ret;
|
||||
int i = 0;
|
||||
|
@ -10,5 +10,5 @@
|
||||
|
||||
namespace ix
|
||||
{
|
||||
std::string base64_encode(const std::string& data, uint32_t len);
|
||||
std::string base64_encode(const std::string& data, size_t len);
|
||||
}
|
||||
|
@ -5,13 +5,14 @@
|
||||
*/
|
||||
const WebSocket = require('ws');
|
||||
|
||||
const wss = new WebSocket.Server({ port: 5678, perMessageDeflate: false });
|
||||
let wss = new WebSocket.Server({ port: 5678, perMessageDeflate: true })
|
||||
|
||||
let handshake = false
|
||||
let authenticated = false
|
||||
wss.on('connection', (ws) => {
|
||||
|
||||
wss.on('connection', function connection(ws) {
|
||||
ws.on('message', function incoming(data) {
|
||||
let handshake = false
|
||||
let authenticated = false
|
||||
|
||||
ws.on('message', (data) => {
|
||||
|
||||
console.log(data.toString('utf-8'))
|
||||
|
||||
@ -41,4 +42,4 @@ wss.on('connection', function connection(ws) {
|
||||
console.log(data)
|
||||
}
|
||||
});
|
||||
});
|
||||
})
|
||||
|
@ -20,6 +20,11 @@ void msleep(int ms)
|
||||
|
||||
int main(int argc, char* argv[])
|
||||
{
|
||||
if (argc != 7)
|
||||
{
|
||||
std::cerr << "Usage error: need 6 arguments." << std::endl;
|
||||
}
|
||||
|
||||
std::string endpoint = argv[1];
|
||||
std::string appkey = argv[2];
|
||||
std::string channel = argv[3];
|
||||
@ -45,51 +50,72 @@ int main(int argc, char* argv[])
|
||||
|
||||
bool done = false;
|
||||
ix::SatoriConnection satoriConnection;
|
||||
satoriConnection.configure(appkey, endpoint, rolename, rolesecret);
|
||||
ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(
|
||||
false, false, false, 15, 15);
|
||||
satoriConnection.configure(appkey, endpoint, rolename, rolesecret,
|
||||
webSocketPerMessageDeflateOptions);
|
||||
satoriConnection.connect();
|
||||
satoriConnection.setAuthenticatedCallback(
|
||||
[&satoriConnection, channel, path, &done]()
|
||||
satoriConnection.setOnEventCallback(
|
||||
[&satoriConnection, channel, path, &done]
|
||||
(ix::SatoriConnectionEventType eventType,
|
||||
const std::string& errMsg,
|
||||
const ix::WebSocketHttpHeaders& headers)
|
||||
{
|
||||
std::cout << "Authenticated" << std::endl;;
|
||||
|
||||
std::string line;
|
||||
std::ifstream f(path);
|
||||
if (!f.is_open())
|
||||
if (eventType == ix::SatoriConnection_EventType_Open)
|
||||
{
|
||||
std::cerr << "Error while opening file: " << path << std::endl;
|
||||
std::cout << "Handshake Headers:" << std::endl;
|
||||
for (auto it : headers)
|
||||
{
|
||||
std::cout << it.first << ": " << it.second << std::endl;
|
||||
}
|
||||
}
|
||||
|
||||
while (getline(f, line))
|
||||
else if (eventType == ix::SatoriConnection_EventType_Authenticated)
|
||||
{
|
||||
Json::Value value;
|
||||
Json::Reader reader;
|
||||
reader.parse(line, value);
|
||||
std::cout << "Authenticated" << std::endl;
|
||||
|
||||
satoriConnection.publish(channel, value);
|
||||
std::string line;
|
||||
std::ifstream f(path);
|
||||
if (!f.is_open())
|
||||
{
|
||||
std::cerr << "Error while opening file: " << path << std::endl;
|
||||
}
|
||||
|
||||
int n = 0;
|
||||
while (getline(f, line))
|
||||
{
|
||||
Json::Value value;
|
||||
Json::Reader reader;
|
||||
reader.parse(line, value);
|
||||
|
||||
satoriConnection.publish(channel, value);
|
||||
n++;
|
||||
}
|
||||
std::cerr << "#published messages: " << n << std::endl;
|
||||
|
||||
if (f.bad())
|
||||
{
|
||||
std::cerr << "Error while opening file: " << path << std::endl;
|
||||
}
|
||||
|
||||
done = true;
|
||||
}
|
||||
|
||||
if (f.bad())
|
||||
else if (eventType == ix::SatoriConnection_EventType_Error)
|
||||
{
|
||||
std::cerr << "Error while opening file: " << path << std::endl;
|
||||
std::cerr << "Satori Error received: " << errMsg << std::endl;
|
||||
done = true;
|
||||
}
|
||||
else if (eventType == ix::SatoriConnection_EventType_Closed)
|
||||
{
|
||||
std::cerr << "Satori connection closed" << std::endl;
|
||||
}
|
||||
|
||||
done = true;
|
||||
}
|
||||
);
|
||||
satoriConnection.setErrorCallback(
|
||||
[&done](const std::string& errMsg)
|
||||
{
|
||||
std::cerr << "Satori Error received: " << errMsg << std::endl;
|
||||
done = true;
|
||||
}
|
||||
);
|
||||
|
||||
while (!done)
|
||||
{
|
||||
msleep(10);
|
||||
msleep(1);
|
||||
}
|
||||
|
||||
std::cout << incomingBytes << std::endl;
|
||||
std::cout << "Incoming bytes: " << incomingBytes << std::endl;
|
||||
std::cout << "Outgoing bytes: " << outgoingBytes << std::endl;
|
||||
|
||||
|
@ -6,6 +6,6 @@ appkey="appkey"
|
||||
channel="foo"
|
||||
rolename="a_role"
|
||||
rolesecret="a_secret"
|
||||
path=events.jsonl
|
||||
filename=${FILENAME:=events.jsonl}
|
||||
|
||||
build/satori_publisher $endpoint $appkey $channel $rolename $rolesecret $path
|
||||
build/satori_publisher $endpoint $appkey $channel $rolename $rolesecret $filename
|
||||
|
Reference in New Issue
Block a user