capture an error code and a reason when the server closes the connection

This commit is contained in:
Benjamin Sergeant
2018-10-25 18:51:19 -07:00
parent 390044b716
commit a8dfd640a7
9 changed files with 149 additions and 74 deletions

View File

@ -10,9 +10,8 @@
#include <cmath>
#include <cassert>
namespace {
// FIXME: put this in a shared location, and use it in
namespace
{
uint64_t calculateRetryWaitMilliseconds(uint64_t retry_count)
{
// This will overflow quite fast for large value of retry_count
@ -24,7 +23,6 @@ namespace {
uint64_t tenSeconds = 10 * 1000;
return (wait_time > tenSeconds || retry_count > 10) ? tenSeconds : wait_time;
}
}
namespace ix {
@ -32,7 +30,6 @@ namespace ix {
OnTrafficTrackerCallback WebSocket::_onTrafficTrackerCallback = nullptr;
WebSocket::WebSocket() :
_verbose(false),
_onMessageCallback(OnMessageCallback()),
_stop(false),
_automaticReconnection(true)
@ -83,20 +80,11 @@ namespace ix {
_ws.configure(_url);
}
_ws.setOnStateChangeCallback(
[this](WebSocketTransport::ReadyStateValues readyStateValue)
_ws.setOnCloseCallback(
[this](uint16_t code, const std::string& reason)
{
if (readyStateValue == WebSocketTransport::CLOSED)
{
_onMessageCallback(WebSocket_MessageType_Close, "", WebSocketErrorInfo());
}
if (_verbose)
{
std::cout << "connection state changed -> "
<< readyStateToString(getReadyState())
<< std::endl;
}
_onMessageCallback(WebSocket_MessageType_Close, "",
WebSocketErrorInfo(), CloseInfo(code, reason));
}
);
@ -106,7 +94,8 @@ namespace ix {
return status;
}
_onMessageCallback(WebSocket_MessageType_Open, "", WebSocketErrorInfo());
_onMessageCallback(WebSocket_MessageType_Open, "",
WebSocketErrorInfo(), CloseInfo());
return status;
}
@ -150,9 +139,8 @@ namespace ix {
connectErr.wait_time = duration.count();
connectErr.reason = status.errorStr;
connectErr.http_status = status.http_status;
_onMessageCallback(WebSocket_MessageType_Error, "", connectErr);
if (_verbose) std::cout << "Sleeping for " << duration.count() << "ms" << std::endl;
_onMessageCallback(WebSocket_MessageType_Error, "",
connectErr, CloseInfo());
std::this_thread::sleep_for(duration);
}
@ -199,7 +187,8 @@ namespace ix {
} break;
}
_onMessageCallback(webSocketMessageType, msg, WebSocketErrorInfo());
_onMessageCallback(webSocketMessageType, msg,
WebSocketErrorInfo(), CloseInfo());
WebSocket::invokeTrafficTrackerCallback(msg.size(), true);
});

View File

@ -45,7 +45,28 @@ namespace ix
std::string reason;
};
using OnMessageCallback = std::function<void(WebSocketMessageType, const std::string&, const WebSocketErrorInfo)>;
struct CloseInfo
{
uint16_t code;
std::string reason;
CloseInfo(uint64_t c, const std::string& r)
{
code = c;
reason = r;
}
CloseInfo()
{
code = 0;
reason = "";
}
};
using OnMessageCallback = std::function<void(WebSocketMessageType,
const std::string&,
const WebSocketErrorInfo,
const CloseInfo)>;
using OnTrafficTrackerCallback = std::function<void(size_t size, bool incoming)>;
class WebSocket
@ -65,8 +86,6 @@ namespace ix
static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback);
static void resetTrafficTrackerCallback();
void setVerbose(bool verbose) { _verbose = verbose; }
const std::string& getUrl() const;
ReadyState getReadyState() const;
@ -86,7 +105,6 @@ namespace ix
std::string _url;
mutable std::mutex _urlMutex;
bool _verbose;
OnMessageCallback _onMessageCallback;
static OnTrafficTrackerCallback _onTrafficTrackerCallback;

View File

@ -255,13 +255,20 @@ namespace ix {
void WebSocketTransport::setReadyState(ReadyStateValues readyStateValue)
{
if (readyStateValue == CLOSED)
{
std::lock_guard<std::mutex> lock(_closeDataMutex);
_onCloseCallback(_closeCode, _closeReason);
_closeCode = 0;
_closeReason = std::string();
}
_readyState = readyStateValue;
_onStateChangeCallback(readyStateValue);
}
void WebSocketTransport::setOnStateChangeCallback(const OnStateChangeCallback& onStateChangeCallback)
void WebSocketTransport::setOnCloseCallback(const OnCloseCallback& onCloseCallback)
{
_onStateChangeCallback = onStateChangeCallback;
_onCloseCallback = onCloseCallback;
}
void WebSocketTransport::poll()
@ -334,6 +341,17 @@ namespace ix {
_txbuf.insert(_txbuf.end(), buffer.begin(), buffer.end());
}
void WebSocketTransport::unmaskReceiveBuffer(const wsheader_type& ws)
{
if (ws.mask)
{
for (size_t j = 0; j != ws.N; ++j)
{
_rxbuf[j+ws.header_size] ^= ws.masking_key[j&0x3];
}
}
}
//
// http://tools.ietf.org/html/rfc6455#section-5.2 Base Framing Protocol
//
@ -358,8 +376,8 @@ namespace ix {
//
void WebSocketTransport::dispatch(const OnMessageCallback& onMessageCallback)
{
// TODO: consider acquiring a lock on _rxbuf...
while (true) {
while (true)
{
wsheader_type ws;
if (_rxbuf.size() < 2) return; /* Need at least 2 */
const uint8_t * data = (uint8_t *) &_rxbuf[0]; // peek, but don't consume
@ -434,13 +452,7 @@ namespace ix {
|| ws.opcode == wsheader_type::BINARY_FRAME
|| ws.opcode == wsheader_type::CONTINUATION
) {
if (ws.mask)
{
for (size_t j = 0; j != ws.N; ++j)
{
_rxbuf[j+ws.header_size] ^= ws.masking_key[j&0x3];
}
}
unmaskReceiveBuffer(ws);
_receivedData.insert(_receivedData.end(),
_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t)ws.N);// just feed
@ -456,14 +468,7 @@ namespace ix {
}
else if (ws.opcode == wsheader_type::PING)
{
if (ws.mask)
{
for (size_t j = 0; j != ws.N; ++j)
{
_rxbuf[j+ws.header_size] ^= ws.masking_key[j&0x3];
}
}
unmaskReceiveBuffer(ws);
std::string pingData(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size + (size_t) ws.N);
@ -475,21 +480,37 @@ namespace ix {
}
else if (ws.opcode == wsheader_type::PONG)
{
if (ws.mask)
{
for (size_t j = 0; j != ws.N; ++j)
{
_rxbuf[j+ws.header_size] ^= ws.masking_key[j&0x3];
}
}
unmaskReceiveBuffer(ws);
std::string pongData(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size + (size_t) ws.N);
onMessageCallback(pongData, PONG);
}
else if (ws.opcode == wsheader_type::CLOSE) { close(); }
else { close(); }
else if (ws.opcode == wsheader_type::CLOSE)
{
unmaskReceiveBuffer(ws);
// Extract the close code first, available as the first 2 bytes
uint16_t code = 0;
code |= ((uint64_t) _rxbuf[ws.header_size]) << 8;
code |= ((uint64_t) _rxbuf[ws.header_size+1]) << 0;
// Get the reason.
std::string reason(_rxbuf.begin()+ws.header_size + 2,
_rxbuf.begin()+ws.header_size + 2 + (size_t) ws.N);
{
std::lock_guard<std::mutex> lock(_closeDataMutex);
_closeCode = code;
_closeReason = reason;
}
close();
}
else
{
close();
}
_rxbuf.erase(_rxbuf.begin(),
_rxbuf.begin() + ws.header_size + (size_t) ws.N);

View File

@ -63,7 +63,8 @@ namespace ix
using OnMessageCallback = std::function<void(const std::string&,
MessageKind)>;
using OnStateChangeCallback = std::function<void(ReadyStateValues)>;
using OnCloseCallback = std::function<void(uint16_t,
const std::string&)>;
WebSocketTransport();
~WebSocketTransport();
@ -79,7 +80,7 @@ namespace ix
void close();
ReadyStateValues getReadyState() const;
void setReadyState(ReadyStateValues readyStateValue);
void setOnStateChangeCallback(const OnStateChangeCallback& onStateChangeCallback);
void setOnCloseCallback(const OnCloseCallback& onCloseCallback);
void dispatch(const OnMessageCallback& onMessageCallback);
static void printUrl(const std::string& url);
@ -120,7 +121,10 @@ namespace ix
std::atomic<ReadyStateValues> _readyState;
OnStateChangeCallback _onStateChangeCallback;
OnCloseCallback _onCloseCallback;
uint16_t _closeCode;
std::string _closeReason;
mutable std::mutex _closeDataMutex;
void sendOnSocket();
void sendData(wsheader_type::opcode_type type,
@ -137,5 +141,6 @@ namespace ix
void appendToSendBuffer(const std::vector<uint8_t>& buffer);
unsigned getRandomUnsigned();
void unmaskReceiveBuffer(const wsheader_type& ws);
};
}