From bb0b1836cd26a4af2ab137fd9d92e74aee93053e Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Thu, 25 Oct 2018 18:51:19 -0700 Subject: [PATCH] capture an error code and a reason when the server closes the connection --- README.md | 16 ++++-- examples/chat/cmd_websocket_chat.cpp | 5 +- examples/ping_pong/ping_pong.cpp | 36 ++++++++++--- examples/ping_pong/server.py | 8 +++ examples/ws_connect/ws_connect.cpp | 5 +- ixwebsocket/IXWebSocket.cpp | 35 +++++------- ixwebsocket/IXWebSocket.h | 26 +++++++-- ixwebsocket/IXWebSocketTransport.cpp | 81 +++++++++++++++++----------- ixwebsocket/IXWebSocketTransport.h | 11 ++-- 9 files changed, 149 insertions(+), 74 deletions(-) diff --git a/README.md b/README.md index 0bee3e31..88776001 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,10 @@ webSocket.configure(url); // Setup a callback to be fired when a message or an event (open, close, error) is received webSocket.setOnMessageCallback( - [](ix::WebSocketMessageType messageType, const std::string& str, ix::WebSocketErrorInfo error) + [](ix::WebSocketMessageType messageType, + const std::string& str, + const ix::WebSocketErrorInfo& error, + const ix::CloseInfo& closeInfo) { if (messageType == ix::WebSocket_MessageType_Message) { @@ -124,7 +127,7 @@ The onMessage event will be fired when the connection is opened or closed. This ``` webSocket.setOnMessageCallback( - [this](ix::WebSocketMessageType messageType, const std::string& str, ix::WebSocketErrorInfo error) + [this](ix::WebSocketMessageType messageType, const std::string& str, const ix::WebSocketErrorInfo& error, const ix::CloseInfo closeInfo&) { if (messageType == ix::WebSocket_MessageType_Open) { @@ -133,6 +136,11 @@ webSocket.setOnMessageCallback( else if (messageType == ix::WebSocket_MessageType_Close) { puts("disconnected"); + + // The server can send an explicit code and reason for closing. + // This data can be accessed through the closeInfo object. + std::cout << closeInfo.code << std::endl; + std::cout << closeInfo.reason << std::endl; } } ); @@ -144,7 +152,7 @@ A message will be fired when there is an error with the connection. The message ``` webSocket.setOnMessageCallback( - [this](ix::WebSocketMessageType messageType, const std::string& str, ix::WebSocketErrorInfo error) + [this](ix::WebSocketMessageType messageType, const std::string& str, const ix::WebSocketErrorInfo& error, const ix::CloseInfo& closeInfo) { if (messageType == ix::WebSocket_MessageType_Error) { @@ -179,7 +187,7 @@ Ping/pong messages are used to implement keep-alive. 2 message types exists to i ``` webSocket.setOnMessageCallback( - [this](ix::WebSocketMessageType messageType, const std::string& str, ix::WebSocketErrorInfo error) + [this](ix::WebSocketMessageType messageType, const std::string& str, const ix::WebSocketErrorInfo& error, const ix::CloseInfo& closeInfo) { if (messageType == ix::WebSocket_MessageType_Ping || messageType == ix::WebSocket_MessageType_Pong) diff --git a/examples/chat/cmd_websocket_chat.cpp b/examples/chat/cmd_websocket_chat.cpp index 70e29f63..dc1cc4c8 100644 --- a/examples/chat/cmd_websocket_chat.cpp +++ b/examples/chat/cmd_websocket_chat.cpp @@ -83,7 +83,10 @@ namespace log(std::string("Connecting to url: ") + url); _webSocket.setOnMessageCallback( - [this](ix::WebSocketMessageType messageType, const std::string& str, ix::WebSocketErrorInfo error) + [this](ix::WebSocketMessageType messageType, + const std::string& str, + const ix::WebSocketErrorInfo& error, + const ix::CloseInfo& closeInfo) { std::stringstream ss; if (messageType == ix::WebSocket_MessageType_Open) diff --git a/examples/ping_pong/ping_pong.cpp b/examples/ping_pong/ping_pong.cpp index 90d219c7..f5a34e10 100644 --- a/examples/ping_pong/ping_pong.cpp +++ b/examples/ping_pong/ping_pong.cpp @@ -1,5 +1,5 @@ /* - * ws_connect.cpp + * ping_pong.cpp * Author: Benjamin Sergeant * Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved. */ @@ -28,6 +28,7 @@ namespace void stop(); void ping(const std::string& text); + void send(const std::string& text); private: std::string _url; @@ -53,32 +54,39 @@ namespace log(std::string("Connecting to url: ") + _url); _webSocket.setOnMessageCallback( - [this](ix::WebSocketMessageType messageType, const std::string& str, ix::WebSocketErrorInfo error) + [this](ix::WebSocketMessageType messageType, + const std::string& str, + const ix::WebSocketErrorInfo& error, + const ix::CloseInfo& closeInfo) { std::stringstream ss; if (messageType == ix::WebSocket_MessageType_Open) { - log("ws_connect: connected"); + log("ping_pong: connected"); } else if (messageType == ix::WebSocket_MessageType_Close) { - log("ws_connect: disconnected"); + ss << "ping_pong: disconnected:" + << " code " << closeInfo.code + << " reason " << closeInfo.reason + << str; + log(ss.str()); } else if (messageType == ix::WebSocket_MessageType_Message) { - ss << "ws_connect: received message: " + ss << "ping_pong: received message: " << str; log(ss.str()); } else if (messageType == ix::WebSocket_MessageType_Ping) { - ss << "ws_connect: received ping message: " + ss << "ping_pong: received ping message: " << str; log(ss.str()); } else if (messageType == ix::WebSocket_MessageType_Pong) { - ss << "ws_connect: received pong message: " + ss << "ping_pong: received pong message: " << str; log(ss.str()); } @@ -109,6 +117,11 @@ namespace } } + void WebSocketPingPong::send(const std::string& text) + { + _webSocket.send(text); + } + void interactiveMain(const std::string& url) { std::cout << "Type Ctrl-D to exit prompt..." << std::endl; @@ -126,7 +139,14 @@ namespace break; } - webSocketPingPong.ping(text); + if (text == "/close") + { + webSocketPingPong.send(text); + } + else + { + webSocketPingPong.ping(text); + } } std::cout << std::endl; diff --git a/examples/ping_pong/server.py b/examples/ping_pong/server.py index c1f8b8a5..913332f0 100644 --- a/examples/ping_pong/server.py +++ b/examples/ping_pong/server.py @@ -1,5 +1,6 @@ #!/usr/bin/env python +import os import asyncio import websockets @@ -8,6 +9,13 @@ async def echo(websocket, path): print(message) await websocket.send(message) + if os.getenv('TEST_CLOSE'): + print('Closing') + # breakpoint() + await websocket.close(1001, 'close message') + # await websocket.close() + break + asyncio.get_event_loop().run_until_complete( websockets.serve(echo, 'localhost', 5678)) asyncio.get_event_loop().run_forever() diff --git a/examples/ws_connect/ws_connect.cpp b/examples/ws_connect/ws_connect.cpp index 7b5f8b33..dfb78505 100644 --- a/examples/ws_connect/ws_connect.cpp +++ b/examples/ws_connect/ws_connect.cpp @@ -53,7 +53,10 @@ namespace log(std::string("Connecting to url: ") + _url); _webSocket.setOnMessageCallback( - [this](ix::WebSocketMessageType messageType, const std::string& str, ix::WebSocketErrorInfo error) + [this](ix::WebSocketMessageType messageType, + const std::string& str, + const ix::WebSocketErrorInfo& error, + const ix::CloseInfo& closeInfo) { std::stringstream ss; if (messageType == ix::WebSocket_MessageType_Open) diff --git a/ixwebsocket/IXWebSocket.cpp b/ixwebsocket/IXWebSocket.cpp index f012bea7..db9b8f87 100644 --- a/ixwebsocket/IXWebSocket.cpp +++ b/ixwebsocket/IXWebSocket.cpp @@ -10,9 +10,8 @@ #include #include -namespace { - - // FIXME: put this in a shared location, and use it in +namespace +{ uint64_t calculateRetryWaitMilliseconds(uint64_t retry_count) { // This will overflow quite fast for large value of retry_count @@ -24,7 +23,6 @@ namespace { uint64_t tenSeconds = 10 * 1000; return (wait_time > tenSeconds || retry_count > 10) ? tenSeconds : wait_time; } - } namespace ix { @@ -32,7 +30,6 @@ namespace ix { OnTrafficTrackerCallback WebSocket::_onTrafficTrackerCallback = nullptr; WebSocket::WebSocket() : - _verbose(false), _onMessageCallback(OnMessageCallback()), _stop(false), _automaticReconnection(true) @@ -83,20 +80,11 @@ namespace ix { _ws.configure(_url); } - _ws.setOnStateChangeCallback( - [this](WebSocketTransport::ReadyStateValues readyStateValue) + _ws.setOnCloseCallback( + [this](uint16_t code, const std::string& reason) { - if (readyStateValue == WebSocketTransport::CLOSED) - { - _onMessageCallback(WebSocket_MessageType_Close, "", WebSocketErrorInfo()); - } - - if (_verbose) - { - std::cout << "connection state changed -> " - << readyStateToString(getReadyState()) - << std::endl; - } + _onMessageCallback(WebSocket_MessageType_Close, "", + WebSocketErrorInfo(), CloseInfo(code, reason)); } ); @@ -106,7 +94,8 @@ namespace ix { return status; } - _onMessageCallback(WebSocket_MessageType_Open, "", WebSocketErrorInfo()); + _onMessageCallback(WebSocket_MessageType_Open, "", + WebSocketErrorInfo(), CloseInfo()); return status; } @@ -150,9 +139,8 @@ namespace ix { connectErr.wait_time = duration.count(); connectErr.reason = status.errorStr; connectErr.http_status = status.http_status; - _onMessageCallback(WebSocket_MessageType_Error, "", connectErr); - - if (_verbose) std::cout << "Sleeping for " << duration.count() << "ms" << std::endl; + _onMessageCallback(WebSocket_MessageType_Error, "", + connectErr, CloseInfo()); std::this_thread::sleep_for(duration); } @@ -199,7 +187,8 @@ namespace ix { } break; } - _onMessageCallback(webSocketMessageType, msg, WebSocketErrorInfo()); + _onMessageCallback(webSocketMessageType, msg, + WebSocketErrorInfo(), CloseInfo()); WebSocket::invokeTrafficTrackerCallback(msg.size(), true); }); diff --git a/ixwebsocket/IXWebSocket.h b/ixwebsocket/IXWebSocket.h index 749e9df7..39847382 100644 --- a/ixwebsocket/IXWebSocket.h +++ b/ixwebsocket/IXWebSocket.h @@ -45,7 +45,28 @@ namespace ix std::string reason; }; - using OnMessageCallback = std::function; + struct CloseInfo + { + uint16_t code; + std::string reason; + + CloseInfo(uint64_t c, const std::string& r) + { + code = c; + reason = r; + } + + CloseInfo() + { + code = 0; + reason = ""; + } + }; + + using OnMessageCallback = std::function; using OnTrafficTrackerCallback = std::function; class WebSocket @@ -65,8 +86,6 @@ namespace ix static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback); static void resetTrafficTrackerCallback(); - void setVerbose(bool verbose) { _verbose = verbose; } - const std::string& getUrl() const; ReadyState getReadyState() const; @@ -86,7 +105,6 @@ namespace ix std::string _url; mutable std::mutex _urlMutex; - bool _verbose; OnMessageCallback _onMessageCallback; static OnTrafficTrackerCallback _onTrafficTrackerCallback; diff --git a/ixwebsocket/IXWebSocketTransport.cpp b/ixwebsocket/IXWebSocketTransport.cpp index be113148..1f413b49 100644 --- a/ixwebsocket/IXWebSocketTransport.cpp +++ b/ixwebsocket/IXWebSocketTransport.cpp @@ -255,13 +255,20 @@ namespace ix { void WebSocketTransport::setReadyState(ReadyStateValues readyStateValue) { + if (readyStateValue == CLOSED) + { + std::lock_guard lock(_closeDataMutex); + _onCloseCallback(_closeCode, _closeReason); + _closeCode = 0; + _closeReason = std::string(); + } + _readyState = readyStateValue; - _onStateChangeCallback(readyStateValue); } - void WebSocketTransport::setOnStateChangeCallback(const OnStateChangeCallback& onStateChangeCallback) + void WebSocketTransport::setOnCloseCallback(const OnCloseCallback& onCloseCallback) { - _onStateChangeCallback = onStateChangeCallback; + _onCloseCallback = onCloseCallback; } void WebSocketTransport::poll() @@ -334,6 +341,17 @@ namespace ix { _txbuf.insert(_txbuf.end(), buffer.begin(), buffer.end()); } + void WebSocketTransport::unmaskReceiveBuffer(const wsheader_type& ws) + { + if (ws.mask) + { + for (size_t j = 0; j != ws.N; ++j) + { + _rxbuf[j+ws.header_size] ^= ws.masking_key[j&0x3]; + } + } + } + // // http://tools.ietf.org/html/rfc6455#section-5.2 Base Framing Protocol // @@ -358,8 +376,8 @@ namespace ix { // void WebSocketTransport::dispatch(const OnMessageCallback& onMessageCallback) { - // TODO: consider acquiring a lock on _rxbuf... - while (true) { + while (true) + { wsheader_type ws; if (_rxbuf.size() < 2) return; /* Need at least 2 */ const uint8_t * data = (uint8_t *) &_rxbuf[0]; // peek, but don't consume @@ -434,13 +452,7 @@ namespace ix { || ws.opcode == wsheader_type::BINARY_FRAME || ws.opcode == wsheader_type::CONTINUATION ) { - if (ws.mask) - { - for (size_t j = 0; j != ws.N; ++j) - { - _rxbuf[j+ws.header_size] ^= ws.masking_key[j&0x3]; - } - } + unmaskReceiveBuffer(ws); _receivedData.insert(_receivedData.end(), _rxbuf.begin()+ws.header_size, _rxbuf.begin()+ws.header_size+(size_t)ws.N);// just feed @@ -456,14 +468,7 @@ namespace ix { } else if (ws.opcode == wsheader_type::PING) { - if (ws.mask) - { - for (size_t j = 0; j != ws.N; ++j) - { - _rxbuf[j+ws.header_size] ^= ws.masking_key[j&0x3]; - } - } - + unmaskReceiveBuffer(ws); std::string pingData(_rxbuf.begin()+ws.header_size, _rxbuf.begin()+ws.header_size + (size_t) ws.N); @@ -475,21 +480,37 @@ namespace ix { } else if (ws.opcode == wsheader_type::PONG) { - if (ws.mask) - { - for (size_t j = 0; j != ws.N; ++j) - { - _rxbuf[j+ws.header_size] ^= ws.masking_key[j&0x3]; - } - } - + unmaskReceiveBuffer(ws); std::string pongData(_rxbuf.begin()+ws.header_size, _rxbuf.begin()+ws.header_size + (size_t) ws.N); onMessageCallback(pongData, PONG); } - else if (ws.opcode == wsheader_type::CLOSE) { close(); } - else { close(); } + else if (ws.opcode == wsheader_type::CLOSE) + { + unmaskReceiveBuffer(ws); + + // Extract the close code first, available as the first 2 bytes + uint16_t code = 0; + code |= ((uint64_t) _rxbuf[ws.header_size]) << 8; + code |= ((uint64_t) _rxbuf[ws.header_size+1]) << 0; + + // Get the reason. + std::string reason(_rxbuf.begin()+ws.header_size + 2, + _rxbuf.begin()+ws.header_size + 2 + (size_t) ws.N); + + { + std::lock_guard lock(_closeDataMutex); + _closeCode = code; + _closeReason = reason; + } + + close(); + } + else + { + close(); + } _rxbuf.erase(_rxbuf.begin(), _rxbuf.begin() + ws.header_size + (size_t) ws.N); diff --git a/ixwebsocket/IXWebSocketTransport.h b/ixwebsocket/IXWebSocketTransport.h index 08f872c4..c0608786 100644 --- a/ixwebsocket/IXWebSocketTransport.h +++ b/ixwebsocket/IXWebSocketTransport.h @@ -63,7 +63,8 @@ namespace ix using OnMessageCallback = std::function; - using OnStateChangeCallback = std::function; + using OnCloseCallback = std::function; WebSocketTransport(); ~WebSocketTransport(); @@ -79,7 +80,7 @@ namespace ix void close(); ReadyStateValues getReadyState() const; void setReadyState(ReadyStateValues readyStateValue); - void setOnStateChangeCallback(const OnStateChangeCallback& onStateChangeCallback); + void setOnCloseCallback(const OnCloseCallback& onCloseCallback); void dispatch(const OnMessageCallback& onMessageCallback); static void printUrl(const std::string& url); @@ -120,7 +121,10 @@ namespace ix std::atomic _readyState; - OnStateChangeCallback _onStateChangeCallback; + OnCloseCallback _onCloseCallback; + uint16_t _closeCode; + std::string _closeReason; + mutable std::mutex _closeDataMutex; void sendOnSocket(); void sendData(wsheader_type::opcode_type type, @@ -137,5 +141,6 @@ namespace ix void appendToSendBuffer(const std::vector& buffer); unsigned getRandomUnsigned(); + void unmaskReceiveBuffer(const wsheader_type& ws); }; }