(satori_publisher) better error handling

This commit is contained in:
Benjamin Sergeant 2018-11-07 14:54:44 -08:00
parent 3cf44c8078
commit 32f4c8305e
6 changed files with 117 additions and 26 deletions

View File

@ -88,9 +88,18 @@ namespace ix
} }
} }
void SatoriConnection::logError(const std::string& error) void SatoriConnection::setErrorCallback(const ErrorCallback& errorCallback)
{ {
std::cerr << "SatoriConnection: " << error; _errorCallback = errorCallback;
}
void SatoriConnection::invokeErrorCallback(const std::string& errorMsg)
{
if (_errorCallback)
{
_errorCallback(errorMsg);
}
} }
void SatoriConnection::disconnect() void SatoriConnection::disconnect()
@ -145,13 +154,13 @@ namespace ix
Json::Value data; Json::Value data;
if (!parseJson(str, data)) if (!parseJson(str, data))
{ {
logError(std::string("Invalid json: ") + str); invokeErrorCallback(std::string("Invalid json: ") + str);
return; return;
} }
if (!data.isMember("action")) if (!data.isMember("action"))
{ {
logError("Missing action"); invokeErrorCallback("Missing action");
return; return;
} }
@ -161,12 +170,12 @@ namespace ix
{ {
if (!handleHandshakeResponse(data)) if (!handleHandshakeResponse(data))
{ {
logError("Error extracting nonce from handshake response"); invokeErrorCallback("Error extracting nonce from handshake response");
} }
} }
else if (action == "auth/handshake/error") else if (action == "auth/handshake/error")
{ {
logError("Handshake error."); // print full message ? invokeErrorCallback("Handshake error."); // print full message ?
} }
else if (action == "auth/authenticate/ok") else if (action == "auth/authenticate/ok")
{ {
@ -176,7 +185,7 @@ namespace ix
} }
else if (action == "auth/authenticate/error") else if (action == "auth/authenticate/error")
{ {
logError("Authentication error."); // print full message ? invokeErrorCallback("Authentication error."); // print full message ?
} }
else if (action == "rtm/subscription/data") else if (action == "rtm/subscription/data")
{ {
@ -184,7 +193,7 @@ namespace ix
} }
else else
{ {
logError(std::string("Un-handled message type: ") + action); invokeErrorCallback(std::string("Un-handled message type: ") + action);
} }
} }
else if (messageType == ix::WebSocket_MessageType_Error) else if (messageType == ix::WebSocket_MessageType_Error)
@ -194,7 +203,7 @@ namespace ix
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << error.http_status << std::endl;
logError(ss.str()); invokeErrorCallback(ss.str());
} }
}); });
} }

View File

@ -19,6 +19,7 @@ namespace ix
{ {
using SubscriptionCallback = std::function<void(const Json::Value&)>; using SubscriptionCallback = std::function<void(const Json::Value&)>;
using AuthenticatedCallback = std::function<void()>; using AuthenticatedCallback = std::function<void()>;
using ErrorCallback = std::function<void(const std::string& errorMsg)>;
using OnTrafficTrackerCallback = std::function<void(size_t size, bool incoming)>; using OnTrafficTrackerCallback = std::function<void(size_t size, bool incoming)>;
class SatoriConnection class SatoriConnection
@ -40,8 +41,10 @@ namespace ix
/// Reset the traffic tracker callback to an no-op one. /// Reset the traffic tracker callback to an no-op one.
static void resetTrafficTrackerCallback(); static void resetTrafficTrackerCallback();
/// Reset the traffic tracker callback to an no-op one. /// Set the authenticated callback
void setAuthenticatedCallback(const AuthenticatedCallback& authenticatedCallback); void setAuthenticatedCallback(const AuthenticatedCallback& authenticatedCallback);
/// Set the error callback
void setErrorCallback(const ErrorCallback& errorCallback);
/// Start the worker thread, used for background publishing /// Start the worker thread, used for background publishing
void start(); void start();
@ -67,8 +70,6 @@ namespace ix
/// Returns true only if we're connected /// Returns true only if we're connected
bool isConnected() const; bool isConnected() const;
void logError(const std::string& error);
private: private:
bool sendHandshakeMessage(); bool sendHandshakeMessage();
@ -83,8 +84,9 @@ namespace ix
/// Invoke the traffic tracker callback /// Invoke the traffic tracker callback
static void invokeTrafficTrackerCallback(size_t size, bool incoming); static void invokeTrafficTrackerCallback(size_t size, bool incoming);
/// Invoke the authenticated callback /// Invoke lifecycle callbacks
void invokeAuthenticatedCallback(); void invokeAuthenticatedCallback();
void invokeErrorCallback(const std::string& errorMsg);
/// ///
/// Member variables /// Member variables
@ -108,8 +110,9 @@ namespace ix
/// Traffic tracker callback /// Traffic tracker callback
static OnTrafficTrackerCallback _onTrafficTrackerCallback; static OnTrafficTrackerCallback _onTrafficTrackerCallback;
/// Callback invoked when we are authenticated /// Callbacks
AuthenticatedCallback _authenticatedCallback; AuthenticatedCallback _authenticatedCallback;
ErrorCallback _errorCallback;
/// Subscription callbacks, only one per channel /// Subscription callbacks, only one per channel
std::unordered_map<std::string, SubscriptionCallback> _cbs; std::unordered_map<std::string, SubscriptionCallback> _cbs;

View File

@ -0,0 +1,3 @@
{"array":[1,2,3],"boolean":true,"color":"#82b92c","null":null,"number":123,"object":{"a":"b","c":"d","e":"f"},"string":"Foo"}
{"array":[1,2,3],"boolean":true,"color":"#82b92c","null":null,"number":123,"object":{"a":"b","c":"d","e":"f"},"string":"Bar"}
{"array":[1,2,3],"boolean":true,"color":"#82b92c","null":null,"number":123,"object":{"a":"b","c":"d","e":"f"},"string":"Baz"}

View File

@ -56,7 +56,7 @@ int main(int argc, char* argv[])
std::ifstream f(path); std::ifstream f(path);
if (!f.is_open()) if (!f.is_open())
{ {
std::cerr << "error while opening file" << std::endl; std::cerr << "Error while opening file: " << path << std::endl;
} }
while (getline(f, line)) while (getline(f, line))
@ -70,16 +70,23 @@ int main(int argc, char* argv[])
if (f.bad()) if (f.bad())
{ {
std::cerr << "error while reading file" << std::endl; std::cerr << "Error while opening file: " << path << std::endl;
} }
done = true; done = true;
} }
); );
satoriConnection.setErrorCallback(
[&done](const std::string& errMsg)
{
std::cerr << "Satori Error received: " << errMsg << std::endl;
done = true;
}
);
while (!done) while (!done)
{ {
msleep(1000); msleep(10);
} }
std::cout << incomingBytes << std::endl; std::cout << incomingBytes << std::endl;

View File

@ -21,7 +21,6 @@
#include "libwshandshake.hpp" #include "libwshandshake.hpp"
// #include <unistd.h>
#include <string.h> #include <string.h>
#include <stdlib.h> #include <stdlib.h>
@ -40,9 +39,10 @@
namespace ix { namespace ix {
WebSocketTransport::WebSocketTransport() : WebSocketTransport::WebSocketTransport() :
_readyState(CLOSED) _readyState(CLOSED),
_enablePerMessageDeflate(false)
{ {
; _perMessageDeflate.init();
} }
WebSocketTransport::~WebSocketTransport() WebSocketTransport::~WebSocketTransport()
@ -206,18 +206,27 @@ namespace ix {
std::string secWebSocketKey = genRandomString(22); std::string secWebSocketKey = genRandomString(22);
secWebSocketKey += "=="; secWebSocketKey += "==";
char line[256]; std::string extensions;
if (_enablePerMessageDeflate)
{
// extensions = "Sec-WebSocket-Extensions: permessage-deflate; client_no_context_takeover; server_no_context_takeover\r\n";
extensions = "Sec-WebSocket-Extensions: permessage-deflate\r\n";
}
char line[512];
int status; int status;
int i; int i;
snprintf(line, 256, snprintf(line, 512,
"GET %s HTTP/1.1\r\n" "GET %s HTTP/1.1\r\n"
"Host: %s:%d\r\n" "Host: %s:%d\r\n"
"Upgrade: websocket\r\n" "Upgrade: websocket\r\n"
"Connection: Upgrade\r\n" "Connection: Upgrade\r\n"
"Sec-WebSocket-Key: %s\r\n" "Sec-WebSocket-Key: %s\r\n"
"Sec-WebSocket-Version: 13\r\n" "Sec-WebSocket-Version: 13\r\n"
"%s"
"\r\n", "\r\n",
path.c_str(), host.c_str(), port, secWebSocketKey.c_str()); path.c_str(), host.c_str(), port,
secWebSocketKey.c_str(), extensions.c_str());
size_t lineSize = strlen(line); size_t lineSize = strlen(line);
if (_socket->send(line, lineSize) != lineSize) if (_socket->send(line, lineSize) != lineSize)
@ -295,11 +304,13 @@ namespace ix {
// subtract 1 for '\0', 1 for '\n', 1 for '\r', // subtract 1 for '\0', 1 for '\n', 1 for '\r',
// 1 for the ' ' after the ':', and total is -4 // 1 for the ' ' after the ':', and total is -4
std::string name(lineStr.substr(0, colon)); std::string name(lineStr.substr(0, colon));
std::string value(lineStr.substr(colon + 2, i - colon - 4));
// Make the name lower case. // Make the name lower case.
std::transform(name.begin(), name.end(), name.begin(), ::tolower); std::transform(name.begin(), name.end(), name.begin(), ::tolower);
headers[name] = lineStr.substr(colon + 2, i - colon - 4); headers[name] = value;
std::cout << name << " -> " << value << std::endl;
} }
} }
@ -534,7 +545,33 @@ namespace ix {
// fire callback with a string message // fire callback with a string message
std::string stringMessage(_receivedData.begin(), std::string stringMessage(_receivedData.begin(),
_receivedData.end()); _receivedData.end());
onMessageCallback(stringMessage, MSG);
std::cout << "raw msg: " << stringMessage << std::endl;
std::cout << "raw msg size: " << stringMessage.size() << std::endl;
// ws.rsv1 means the message is compressed
// FIXME hack hack
std::string decompressedMessage;
if (_enablePerMessageDeflate && ws.rsv1)
{
if (_perMessageDeflate.decompress(stringMessage,
decompressedMessage))
{
std::cout << "decompressed msg: " << decompressedMessage << std::endl;
std::cout << "msg size: " << decompressedMessage.size() << std::endl;
onMessageCallback(decompressedMessage, MSG);
}
else
{
std::cout << "error decompressing msg !"<< std::endl;
}
}
else
{
onMessageCallback(stringMessage, MSG);
}
_receivedData.clear(); _receivedData.clear();
} }
@ -621,6 +658,13 @@ namespace ix {
(message_size >= 126 ? 2 : 0) + (message_size >= 126 ? 2 : 0) +
(message_size >= 65536 ? 6 : 0) + 4, 0); (message_size >= 65536 ? 6 : 0) + 4, 0);
header[0] = 0x80 | type; header[0] = 0x80 | type;
// This bit indicate that the frame is compressed
if (_enablePerMessageDeflate)
{
header[0] |= 0x40;
}
if (message_size < 126) if (message_size < 126)
{ {
header[1] = (message_size & 0xff) | 0x80; header[1] = (message_size & 0xff) | 0x80;
@ -674,7 +718,27 @@ namespace ix {
void WebSocketTransport::sendBinary(const std::string& message) void WebSocketTransport::sendBinary(const std::string& message)
{ {
sendData(wsheader_type::BINARY_FRAME, message.size(), message.begin(), message.end()); if (_enablePerMessageDeflate)
{
// FIXME hack hack
std::string compressedMessage;
_perMessageDeflate.compress(message, compressedMessage);
std::cout << "uncompressedMessage " << message << std::endl;
std::cout << "uncompressedMessage.size() " << message.size() << std::endl;
std::cout << "compressedMessage.size() " << compressedMessage.size()
<< std::endl;
// sendData(wsheader_type::BINARY_FRAME, message.size(), message.begin(), message.end());
sendData(wsheader_type::BINARY_FRAME,
compressedMessage.size(),
compressedMessage.begin(),
compressedMessage.end());
}
else
{
sendData(wsheader_type::BINARY_FRAME, message.size(),
message.begin(), message.end());
}
} }
void WebSocketTransport::sendOnSocket() void WebSocketTransport::sendOnSocket()

View File

@ -17,6 +17,8 @@
#include <mutex> #include <mutex>
#include <atomic> #include <atomic>
#include "IXWebSocketPerMessageDeflate.h"
namespace ix namespace ix
{ {
class Socket; class Socket;
@ -121,12 +123,15 @@ namespace ix
std::shared_ptr<Socket> _socket; std::shared_ptr<Socket> _socket;
std::atomic<ReadyStateValues> _readyState; std::atomic<ReadyStateValues> _readyState;
std::atomic<bool> _enablePerMessageDeflate;
OnCloseCallback _onCloseCallback; OnCloseCallback _onCloseCallback;
uint16_t _closeCode; uint16_t _closeCode;
std::string _closeReason; std::string _closeReason;
mutable std::mutex _closeDataMutex; mutable std::mutex _closeDataMutex;
WebSocketPerMessageDeflate _perMessageDeflate;
void sendOnSocket(); void sendOnSocket();
void sendData(wsheader_type::opcode_type type, void sendData(wsheader_type::opcode_type type,
uint64_t message_size, uint64_t message_size,