heartbeat correct

This commit is contained in:
Benjamin Sergeant 2019-01-25 16:11:39 -08:00
parent 3c9ec0aed0
commit fa7ef06f4d
5 changed files with 56 additions and 18 deletions

View File

@ -309,4 +309,13 @@ A ping message can be sent to the server, with an optional data string.
```
websocket.ping("ping data, optional (empty string is ok): limited to 125 bytes long");
### Heartbeat.
You can configure an optional heart beat / keep-alive, sent every 45 seconds
when there isn't any traffic to make sure that load balancers do not kill an
idle connection.
```
webSocket.setHeartBeatPeriod(45);
```

View File

@ -42,7 +42,8 @@ namespace ix
_closeWireSize(0),
_enablePerMessageDeflate(false),
_requestInitCancellation(false),
_heartBeatPeriod(kDefaultHeartBeatPeriod)
_heartBeatPeriod(kDefaultHeartBeatPeriod),
_lastSendTimePoint(std::chrono::steady_clock::now())
{
}
@ -155,12 +156,23 @@ namespace ix
_onCloseCallback = onCloseCallback;
}
bool WebSocketTransport::exceedSendHeartBeatTimeOut()
{
std::lock_guard<std::mutex> lock(_lastSendTimePointMutex);
auto now = std::chrono::steady_clock::now();
return now - _lastSendTimePoint > std::chrono::seconds(_heartBeatPeriod);
}
void WebSocketTransport::poll()
{
_socket->poll(
[this](PollResultType pollResult)
{
if (pollResult == PollResultType_Timeout)
// If (1) heartbeat is enabled, and (2) no data was received or
// send for a duration exceeding our heart-beat period, send a
// ping to the server.
if (pollResult == PollResultType_Timeout &&
exceedSendHeartBeatTimeOut())
{
std::stringstream ss;
ss << kHeartBeatPingMessage << "::" << _heartBeatPeriod << "s";
@ -572,6 +584,9 @@ namespace ix
_txbuf.erase(_txbuf.begin(), _txbuf.begin() + ret);
}
}
std::lock_guard<std::mutex> lck(_lastSendTimePointMutex);
_lastSendTimePoint = std::chrono::steady_clock::now();
}
void WebSocketTransport::close()

View File

@ -122,6 +122,11 @@ namespace ix
int _heartBeatPeriod;
static const int kDefaultHeartBeatPeriod;
const static std::string kHeartBeatPingMessage;
mutable std::mutex _lastSendTimePointMutex;
std::chrono::time_point<std::chrono::steady_clock> _lastSendTimePoint;
// No data was send through the socket for longer that the hearbeat period
bool exceedSendHeartBeatTimeOut();
void sendOnSocket();
WebSocketSendInfo sendData(wsheader_type::opcode_type type,

View File

@ -24,6 +24,8 @@ test_server:
(cd test && npm i ws && node broadcast-server.js)
# env TEST=Websocket_server make test
# env TEST=websocket_server make test
# env TEST=heartbeat make test
test:
python test/run.py

View File

@ -27,6 +27,7 @@ namespace
void start();
void stop();
bool isReady() const;
void sendMessage(const std::string& text);
private:
ix::WebSocket _webSocket;
@ -103,6 +104,11 @@ namespace
ss << "Received ping message " << str;
log(ss.str());
}
else if (messageType == ix::WebSocket_MessageType_Message)
{
ss << "Received message " << str;
log(ss.str());
}
else
{
ss << "Invalid ix::WebSocketMessageType";
@ -113,8 +119,14 @@ namespace
_webSocket.start();
}
void WebSocketClient::sendMessage(const std::string& text)
{
_webSocket.send(text);
}
bool startServer(ix::WebSocketServer& server, std::atomic<int>& receivedPingMessages)
{
// A dev/null server
server.setOnConnectionCallback(
[&server, &receivedPingMessages](std::shared_ptr<ix::WebSocket> webSocket)
{
@ -128,7 +140,7 @@ namespace
{
if (messageType == ix::WebSocket_MessageType_Open)
{
Logger() << "New connection";
Logger() << "New server connection";
Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:";
for (auto it : openInfo.headers)
@ -138,23 +150,13 @@ namespace
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
log("Closed connection");
log("Server closed connection");
}
else if (messageType == ix::WebSocket_MessageType_Ping)
{
log("Received a ping");
log("Server received a ping");
receivedPingMessages++;
}
else if (messageType == ix::WebSocket_MessageType_Message)
{
for (auto&& client : server.getClients())
{
if (client != webSocket)
{
client->send(str);
}
}
}
}
);
}
@ -178,7 +180,7 @@ TEST_CASE("Websocket_heartbeat", "[heartbeat]")
{
ix::setupWebSocketTrafficTrackerCallback();
int port = 8092;
int port = 8093;
ix::WebSocketServer server(port);
std::atomic<int> serverReceivedPingMessages(0);
REQUIRE(startServer(server, serverReceivedPingMessages));
@ -199,12 +201,17 @@ TEST_CASE("Websocket_heartbeat", "[heartbeat]")
REQUIRE(server.getClients().size() == 2);
ix::msleep(3000);
ix::msleep(900);
webSocketClientB.sendMessage("hello world");
ix::msleep(900);
webSocketClientB.sendMessage("hello world");
ix::msleep(900);
webSocketClientA.stop();
webSocketClientB.stop();
REQUIRE(serverReceivedPingMessages >= 4);
REQUIRE(serverReceivedPingMessages >= 2);
REQUIRE(serverReceivedPingMessages <= 4);
// Give us 500ms for the server to notice that clients went away
ix::msleep(500);