heartbeat correct

This commit is contained in:
Benjamin Sergeant 2019-01-25 16:11:39 -08:00
parent 75d01c0c11
commit 885d245afb
5 changed files with 56 additions and 18 deletions

View File

@ -309,4 +309,13 @@ A ping message can be sent to the server, with an optional data string.
``` ```
websocket.ping("ping data, optional (empty string is ok): limited to 125 bytes long"); websocket.ping("ping data, optional (empty string is ok): limited to 125 bytes long");
### Heartbeat.
You can configure an optional heart beat / keep-alive, sent every 45 seconds
when there isn't any traffic to make sure that load balancers do not kill an
idle connection.
```
webSocket.setHeartBeatPeriod(45);
``` ```

View File

@ -42,7 +42,8 @@ namespace ix
_closeWireSize(0), _closeWireSize(0),
_enablePerMessageDeflate(false), _enablePerMessageDeflate(false),
_requestInitCancellation(false), _requestInitCancellation(false),
_heartBeatPeriod(kDefaultHeartBeatPeriod) _heartBeatPeriod(kDefaultHeartBeatPeriod),
_lastSendTimePoint(std::chrono::steady_clock::now())
{ {
} }
@ -155,12 +156,23 @@ namespace ix
_onCloseCallback = onCloseCallback; _onCloseCallback = onCloseCallback;
} }
bool WebSocketTransport::exceedSendHeartBeatTimeOut()
{
std::lock_guard<std::mutex> lock(_lastSendTimePointMutex);
auto now = std::chrono::steady_clock::now();
return now - _lastSendTimePoint > std::chrono::seconds(_heartBeatPeriod);
}
void WebSocketTransport::poll() void WebSocketTransport::poll()
{ {
_socket->poll( _socket->poll(
[this](PollResultType pollResult) [this](PollResultType pollResult)
{ {
if (pollResult == PollResultType_Timeout) // If (1) heartbeat is enabled, and (2) no data was received or
// send for a duration exceeding our heart-beat period, send a
// ping to the server.
if (pollResult == PollResultType_Timeout &&
exceedSendHeartBeatTimeOut())
{ {
std::stringstream ss; std::stringstream ss;
ss << kHeartBeatPingMessage << "::" << _heartBeatPeriod << "s"; ss << kHeartBeatPingMessage << "::" << _heartBeatPeriod << "s";
@ -572,6 +584,9 @@ namespace ix
_txbuf.erase(_txbuf.begin(), _txbuf.begin() + ret); _txbuf.erase(_txbuf.begin(), _txbuf.begin() + ret);
} }
} }
std::lock_guard<std::mutex> lck(_lastSendTimePointMutex);
_lastSendTimePoint = std::chrono::steady_clock::now();
} }
void WebSocketTransport::close() void WebSocketTransport::close()

View File

@ -122,6 +122,11 @@ namespace ix
int _heartBeatPeriod; int _heartBeatPeriod;
static const int kDefaultHeartBeatPeriod; static const int kDefaultHeartBeatPeriod;
const static std::string kHeartBeatPingMessage; const static std::string kHeartBeatPingMessage;
mutable std::mutex _lastSendTimePointMutex;
std::chrono::time_point<std::chrono::steady_clock> _lastSendTimePoint;
// No data was send through the socket for longer that the hearbeat period
bool exceedSendHeartBeatTimeOut();
void sendOnSocket(); void sendOnSocket();
WebSocketSendInfo sendData(wsheader_type::opcode_type type, WebSocketSendInfo sendData(wsheader_type::opcode_type type,

View File

@ -24,6 +24,8 @@ test_server:
(cd test && npm i ws && node broadcast-server.js) (cd test && npm i ws && node broadcast-server.js)
# env TEST=Websocket_server make test # env TEST=Websocket_server make test
# env TEST=websocket_server make test
# env TEST=heartbeat make test
test: test:
python test/run.py python test/run.py

View File

@ -27,6 +27,7 @@ namespace
void start(); void start();
void stop(); void stop();
bool isReady() const; bool isReady() const;
void sendMessage(const std::string& text);
private: private:
ix::WebSocket _webSocket; ix::WebSocket _webSocket;
@ -103,6 +104,11 @@ namespace
ss << "Received ping message " << str; ss << "Received ping message " << str;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocket_MessageType_Message)
{
ss << "Received message " << str;
log(ss.str());
}
else else
{ {
ss << "Invalid ix::WebSocketMessageType"; ss << "Invalid ix::WebSocketMessageType";
@ -113,8 +119,14 @@ namespace
_webSocket.start(); _webSocket.start();
} }
void WebSocketClient::sendMessage(const std::string& text)
{
_webSocket.send(text);
}
bool startServer(ix::WebSocketServer& server, std::atomic<int>& receivedPingMessages) bool startServer(ix::WebSocketServer& server, std::atomic<int>& receivedPingMessages)
{ {
// A dev/null server
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server, &receivedPingMessages](std::shared_ptr<ix::WebSocket> webSocket) [&server, &receivedPingMessages](std::shared_ptr<ix::WebSocket> webSocket)
{ {
@ -128,7 +140,7 @@ namespace
{ {
if (messageType == ix::WebSocket_MessageType_Open) if (messageType == ix::WebSocket_MessageType_Open)
{ {
Logger() << "New connection"; Logger() << "New server connection";
Logger() << "Uri: " << openInfo.uri; Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:"; Logger() << "Headers:";
for (auto it : openInfo.headers) for (auto it : openInfo.headers)
@ -138,23 +150,13 @@ namespace
} }
else if (messageType == ix::WebSocket_MessageType_Close) else if (messageType == ix::WebSocket_MessageType_Close)
{ {
log("Closed connection"); log("Server closed connection");
} }
else if (messageType == ix::WebSocket_MessageType_Ping) else if (messageType == ix::WebSocket_MessageType_Ping)
{ {
log("Received a ping"); log("Server received a ping");
receivedPingMessages++; receivedPingMessages++;
} }
else if (messageType == ix::WebSocket_MessageType_Message)
{
for (auto&& client : server.getClients())
{
if (client != webSocket)
{
client->send(str);
}
}
}
} }
); );
} }
@ -178,7 +180,7 @@ TEST_CASE("Websocket_heartbeat", "[heartbeat]")
{ {
ix::setupWebSocketTrafficTrackerCallback(); ix::setupWebSocketTrafficTrackerCallback();
int port = 8092; int port = 8093;
ix::WebSocketServer server(port); ix::WebSocketServer server(port);
std::atomic<int> serverReceivedPingMessages(0); std::atomic<int> serverReceivedPingMessages(0);
REQUIRE(startServer(server, serverReceivedPingMessages)); REQUIRE(startServer(server, serverReceivedPingMessages));
@ -199,12 +201,17 @@ TEST_CASE("Websocket_heartbeat", "[heartbeat]")
REQUIRE(server.getClients().size() == 2); REQUIRE(server.getClients().size() == 2);
ix::msleep(3000); ix::msleep(900);
webSocketClientB.sendMessage("hello world");
ix::msleep(900);
webSocketClientB.sendMessage("hello world");
ix::msleep(900);
webSocketClientA.stop(); webSocketClientA.stop();
webSocketClientB.stop(); webSocketClientB.stop();
REQUIRE(serverReceivedPingMessages >= 4); REQUIRE(serverReceivedPingMessages >= 2);
REQUIRE(serverReceivedPingMessages <= 4);
// Give us 500ms for the server to notice that clients went away // Give us 500ms for the server to notice that clients went away
ix::msleep(500); ix::msleep(500);