capture an error code and a reason when the server closes the connection

This commit is contained in:
Benjamin Sergeant 2018-10-25 18:51:19 -07:00
parent d5c8815438
commit bb0b1836cd
9 changed files with 149 additions and 74 deletions

View File

@ -25,7 +25,10 @@ webSocket.configure(url);
// Setup a callback to be fired when a message or an event (open, close, error) is received
webSocket.setOnMessageCallback(
[](ix::WebSocketMessageType messageType, const std::string& str, ix::WebSocketErrorInfo error)
[](ix::WebSocketMessageType messageType,
const std::string& str,
const ix::WebSocketErrorInfo& error,
const ix::CloseInfo& closeInfo)
{
if (messageType == ix::WebSocket_MessageType_Message)
{
@ -124,7 +127,7 @@ The onMessage event will be fired when the connection is opened or closed. This
```
webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, const std::string& str, ix::WebSocketErrorInfo error)
[this](ix::WebSocketMessageType messageType, const std::string& str, const ix::WebSocketErrorInfo& error, const ix::CloseInfo closeInfo&)
{
if (messageType == ix::WebSocket_MessageType_Open)
{
@ -133,6 +136,11 @@ webSocket.setOnMessageCallback(
else if (messageType == ix::WebSocket_MessageType_Close)
{
puts("disconnected");
// The server can send an explicit code and reason for closing.
// This data can be accessed through the closeInfo object.
std::cout << closeInfo.code << std::endl;
std::cout << closeInfo.reason << std::endl;
}
}
);
@ -144,7 +152,7 @@ A message will be fired when there is an error with the connection. The message
```
webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, const std::string& str, ix::WebSocketErrorInfo error)
[this](ix::WebSocketMessageType messageType, const std::string& str, const ix::WebSocketErrorInfo& error, const ix::CloseInfo& closeInfo)
{
if (messageType == ix::WebSocket_MessageType_Error)
{
@ -179,7 +187,7 @@ Ping/pong messages are used to implement keep-alive. 2 message types exists to i
```
webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, const std::string& str, ix::WebSocketErrorInfo error)
[this](ix::WebSocketMessageType messageType, const std::string& str, const ix::WebSocketErrorInfo& error, const ix::CloseInfo& closeInfo)
{
if (messageType == ix::WebSocket_MessageType_Ping ||
messageType == ix::WebSocket_MessageType_Pong)

View File

@ -83,7 +83,10 @@ namespace
log(std::string("Connecting to url: ") + url);
_webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, const std::string& str, ix::WebSocketErrorInfo error)
[this](ix::WebSocketMessageType messageType,
const std::string& str,
const ix::WebSocketErrorInfo& error,
const ix::CloseInfo& closeInfo)
{
std::stringstream ss;
if (messageType == ix::WebSocket_MessageType_Open)

View File

@ -1,5 +1,5 @@
/*
* ws_connect.cpp
* ping_pong.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*/
@ -28,6 +28,7 @@ namespace
void stop();
void ping(const std::string& text);
void send(const std::string& text);
private:
std::string _url;
@ -53,32 +54,39 @@ namespace
log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, const std::string& str, ix::WebSocketErrorInfo error)
[this](ix::WebSocketMessageType messageType,
const std::string& str,
const ix::WebSocketErrorInfo& error,
const ix::CloseInfo& closeInfo)
{
std::stringstream ss;
if (messageType == ix::WebSocket_MessageType_Open)
{
log("ws_connect: connected");
log("ping_pong: connected");
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
log("ws_connect: disconnected");
ss << "ping_pong: disconnected:"
<< " code " << closeInfo.code
<< " reason " << closeInfo.reason
<< str;
log(ss.str());
}
else if (messageType == ix::WebSocket_MessageType_Message)
{
ss << "ws_connect: received message: "
ss << "ping_pong: received message: "
<< str;
log(ss.str());
}
else if (messageType == ix::WebSocket_MessageType_Ping)
{
ss << "ws_connect: received ping message: "
ss << "ping_pong: received ping message: "
<< str;
log(ss.str());
}
else if (messageType == ix::WebSocket_MessageType_Pong)
{
ss << "ws_connect: received pong message: "
ss << "ping_pong: received pong message: "
<< str;
log(ss.str());
}
@ -109,6 +117,11 @@ namespace
}
}
void WebSocketPingPong::send(const std::string& text)
{
_webSocket.send(text);
}
void interactiveMain(const std::string& url)
{
std::cout << "Type Ctrl-D to exit prompt..." << std::endl;
@ -126,7 +139,14 @@ namespace
break;
}
webSocketPingPong.ping(text);
if (text == "/close")
{
webSocketPingPong.send(text);
}
else
{
webSocketPingPong.ping(text);
}
}
std::cout << std::endl;

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python
import os
import asyncio
import websockets
@ -8,6 +9,13 @@ async def echo(websocket, path):
print(message)
await websocket.send(message)
if os.getenv('TEST_CLOSE'):
print('Closing')
# breakpoint()
await websocket.close(1001, 'close message')
# await websocket.close()
break
asyncio.get_event_loop().run_until_complete(
websockets.serve(echo, 'localhost', 5678))
asyncio.get_event_loop().run_forever()

View File

@ -53,7 +53,10 @@ namespace
log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, const std::string& str, ix::WebSocketErrorInfo error)
[this](ix::WebSocketMessageType messageType,
const std::string& str,
const ix::WebSocketErrorInfo& error,
const ix::CloseInfo& closeInfo)
{
std::stringstream ss;
if (messageType == ix::WebSocket_MessageType_Open)

View File

@ -10,9 +10,8 @@
#include <cmath>
#include <cassert>
namespace {
// FIXME: put this in a shared location, and use it in
namespace
{
uint64_t calculateRetryWaitMilliseconds(uint64_t retry_count)
{
// This will overflow quite fast for large value of retry_count
@ -24,7 +23,6 @@ namespace {
uint64_t tenSeconds = 10 * 1000;
return (wait_time > tenSeconds || retry_count > 10) ? tenSeconds : wait_time;
}
}
namespace ix {
@ -32,7 +30,6 @@ namespace ix {
OnTrafficTrackerCallback WebSocket::_onTrafficTrackerCallback = nullptr;
WebSocket::WebSocket() :
_verbose(false),
_onMessageCallback(OnMessageCallback()),
_stop(false),
_automaticReconnection(true)
@ -83,20 +80,11 @@ namespace ix {
_ws.configure(_url);
}
_ws.setOnStateChangeCallback(
[this](WebSocketTransport::ReadyStateValues readyStateValue)
_ws.setOnCloseCallback(
[this](uint16_t code, const std::string& reason)
{
if (readyStateValue == WebSocketTransport::CLOSED)
{
_onMessageCallback(WebSocket_MessageType_Close, "", WebSocketErrorInfo());
}
if (_verbose)
{
std::cout << "connection state changed -> "
<< readyStateToString(getReadyState())
<< std::endl;
}
_onMessageCallback(WebSocket_MessageType_Close, "",
WebSocketErrorInfo(), CloseInfo(code, reason));
}
);
@ -106,7 +94,8 @@ namespace ix {
return status;
}
_onMessageCallback(WebSocket_MessageType_Open, "", WebSocketErrorInfo());
_onMessageCallback(WebSocket_MessageType_Open, "",
WebSocketErrorInfo(), CloseInfo());
return status;
}
@ -150,9 +139,8 @@ namespace ix {
connectErr.wait_time = duration.count();
connectErr.reason = status.errorStr;
connectErr.http_status = status.http_status;
_onMessageCallback(WebSocket_MessageType_Error, "", connectErr);
if (_verbose) std::cout << "Sleeping for " << duration.count() << "ms" << std::endl;
_onMessageCallback(WebSocket_MessageType_Error, "",
connectErr, CloseInfo());
std::this_thread::sleep_for(duration);
}
@ -199,7 +187,8 @@ namespace ix {
} break;
}
_onMessageCallback(webSocketMessageType, msg, WebSocketErrorInfo());
_onMessageCallback(webSocketMessageType, msg,
WebSocketErrorInfo(), CloseInfo());
WebSocket::invokeTrafficTrackerCallback(msg.size(), true);
});

View File

@ -45,7 +45,28 @@ namespace ix
std::string reason;
};
using OnMessageCallback = std::function<void(WebSocketMessageType, const std::string&, const WebSocketErrorInfo)>;
struct CloseInfo
{
uint16_t code;
std::string reason;
CloseInfo(uint64_t c, const std::string& r)
{
code = c;
reason = r;
}
CloseInfo()
{
code = 0;
reason = "";
}
};
using OnMessageCallback = std::function<void(WebSocketMessageType,
const std::string&,
const WebSocketErrorInfo,
const CloseInfo)>;
using OnTrafficTrackerCallback = std::function<void(size_t size, bool incoming)>;
class WebSocket
@ -65,8 +86,6 @@ namespace ix
static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback);
static void resetTrafficTrackerCallback();
void setVerbose(bool verbose) { _verbose = verbose; }
const std::string& getUrl() const;
ReadyState getReadyState() const;
@ -86,7 +105,6 @@ namespace ix
std::string _url;
mutable std::mutex _urlMutex;
bool _verbose;
OnMessageCallback _onMessageCallback;
static OnTrafficTrackerCallback _onTrafficTrackerCallback;

View File

@ -255,13 +255,20 @@ namespace ix {
void WebSocketTransport::setReadyState(ReadyStateValues readyStateValue)
{
if (readyStateValue == CLOSED)
{
std::lock_guard<std::mutex> lock(_closeDataMutex);
_onCloseCallback(_closeCode, _closeReason);
_closeCode = 0;
_closeReason = std::string();
}
_readyState = readyStateValue;
_onStateChangeCallback(readyStateValue);
}
void WebSocketTransport::setOnStateChangeCallback(const OnStateChangeCallback& onStateChangeCallback)
void WebSocketTransport::setOnCloseCallback(const OnCloseCallback& onCloseCallback)
{
_onStateChangeCallback = onStateChangeCallback;
_onCloseCallback = onCloseCallback;
}
void WebSocketTransport::poll()
@ -334,6 +341,17 @@ namespace ix {
_txbuf.insert(_txbuf.end(), buffer.begin(), buffer.end());
}
void WebSocketTransport::unmaskReceiveBuffer(const wsheader_type& ws)
{
if (ws.mask)
{
for (size_t j = 0; j != ws.N; ++j)
{
_rxbuf[j+ws.header_size] ^= ws.masking_key[j&0x3];
}
}
}
//
// http://tools.ietf.org/html/rfc6455#section-5.2 Base Framing Protocol
//
@ -358,8 +376,8 @@ namespace ix {
//
void WebSocketTransport::dispatch(const OnMessageCallback& onMessageCallback)
{
// TODO: consider acquiring a lock on _rxbuf...
while (true) {
while (true)
{
wsheader_type ws;
if (_rxbuf.size() < 2) return; /* Need at least 2 */
const uint8_t * data = (uint8_t *) &_rxbuf[0]; // peek, but don't consume
@ -434,13 +452,7 @@ namespace ix {
|| ws.opcode == wsheader_type::BINARY_FRAME
|| ws.opcode == wsheader_type::CONTINUATION
) {
if (ws.mask)
{
for (size_t j = 0; j != ws.N; ++j)
{
_rxbuf[j+ws.header_size] ^= ws.masking_key[j&0x3];
}
}
unmaskReceiveBuffer(ws);
_receivedData.insert(_receivedData.end(),
_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t)ws.N);// just feed
@ -456,14 +468,7 @@ namespace ix {
}
else if (ws.opcode == wsheader_type::PING)
{
if (ws.mask)
{
for (size_t j = 0; j != ws.N; ++j)
{
_rxbuf[j+ws.header_size] ^= ws.masking_key[j&0x3];
}
}
unmaskReceiveBuffer(ws);
std::string pingData(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size + (size_t) ws.N);
@ -475,21 +480,37 @@ namespace ix {
}
else if (ws.opcode == wsheader_type::PONG)
{
if (ws.mask)
{
for (size_t j = 0; j != ws.N; ++j)
{
_rxbuf[j+ws.header_size] ^= ws.masking_key[j&0x3];
}
}
unmaskReceiveBuffer(ws);
std::string pongData(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size + (size_t) ws.N);
onMessageCallback(pongData, PONG);
}
else if (ws.opcode == wsheader_type::CLOSE) { close(); }
else { close(); }
else if (ws.opcode == wsheader_type::CLOSE)
{
unmaskReceiveBuffer(ws);
// Extract the close code first, available as the first 2 bytes
uint16_t code = 0;
code |= ((uint64_t) _rxbuf[ws.header_size]) << 8;
code |= ((uint64_t) _rxbuf[ws.header_size+1]) << 0;
// Get the reason.
std::string reason(_rxbuf.begin()+ws.header_size + 2,
_rxbuf.begin()+ws.header_size + 2 + (size_t) ws.N);
{
std::lock_guard<std::mutex> lock(_closeDataMutex);
_closeCode = code;
_closeReason = reason;
}
close();
}
else
{
close();
}
_rxbuf.erase(_rxbuf.begin(),
_rxbuf.begin() + ws.header_size + (size_t) ws.N);

View File

@ -63,7 +63,8 @@ namespace ix
using OnMessageCallback = std::function<void(const std::string&,
MessageKind)>;
using OnStateChangeCallback = std::function<void(ReadyStateValues)>;
using OnCloseCallback = std::function<void(uint16_t,
const std::string&)>;
WebSocketTransport();
~WebSocketTransport();
@ -79,7 +80,7 @@ namespace ix
void close();
ReadyStateValues getReadyState() const;
void setReadyState(ReadyStateValues readyStateValue);
void setOnStateChangeCallback(const OnStateChangeCallback& onStateChangeCallback);
void setOnCloseCallback(const OnCloseCallback& onCloseCallback);
void dispatch(const OnMessageCallback& onMessageCallback);
static void printUrl(const std::string& url);
@ -120,7 +121,10 @@ namespace ix
std::atomic<ReadyStateValues> _readyState;
OnStateChangeCallback _onStateChangeCallback;
OnCloseCallback _onCloseCallback;
uint16_t _closeCode;
std::string _closeReason;
mutable std::mutex _closeDataMutex;
void sendOnSocket();
void sendData(wsheader_type::opcode_type type,
@ -137,5 +141,6 @@ namespace ix
void appendToSendBuffer(const std::vector<uint8_t>& buffer);
unsigned getRandomUnsigned();
void unmaskReceiveBuffer(const wsheader_type& ws);
};
}