IXWebSocket/ixwebsocket/IXWebSocket.cpp

560 lines
18 KiB
C++
Raw Permalink Normal View History

2018-09-27 23:56:48 +02:00
/*
* IXWebSocket.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*/
#include "IXWebSocket.h"
2019-09-23 19:25:23 +02:00
#include "IXExponentialBackoff.h"
2019-09-23 19:25:23 +02:00
#include "IXSetThreadName.h"
#include "IXUtf8Validator.h"
2019-09-23 19:25:23 +02:00
#include "IXWebSocketHandshake.h"
2018-09-27 23:56:48 +02:00
#include <cassert>
2019-09-23 19:25:23 +02:00
#include <cmath>
2018-09-27 23:56:48 +02:00
2018-12-23 23:14:38 +01:00
namespace ix
{
2018-09-27 23:56:48 +02:00
// Traffic tracker shared by all WebSocket objects (static member); unset until
// setTrafficTrackerCallback is called.
OnTrafficTrackerCallback WebSocket::_onTrafficTrackerCallback = nullptr;

// Default values picked up by the WebSocket constructor.
const int WebSocket::kDefaultHandShakeTimeoutSecs = 60;
const int WebSocket::kDefaultPingIntervalSecs = -1; // not > 0, so no heart beat is sent on connect
const bool WebSocket::kDefaultEnablePong = true;
const uint32_t WebSocket::kDefaultMaxWaitBetweenReconnectionRetries = 10 * 1000; // 10s
2018-09-27 23:56:48 +02:00
2019-09-23 19:25:23 +02:00
// Builds a WebSocket with the default settings and registers a close handler
// on the transport that forwards close events to the message callback.
WebSocket::WebSocket()
    : _onMessageCallback(OnMessageCallback())
    , _stop(false)
    , _automaticReconnection(true)
    , _maxWaitBetweenReconnectionRetries(kDefaultMaxWaitBetweenReconnectionRetries)
    , _handshakeTimeoutSecs(kDefaultHandShakeTimeoutSecs)
    , _enablePong(kDefaultEnablePong)
    , _pingIntervalSecs(kDefaultPingIntervalSecs)
{
    // Turn a transport-level close notification into a Close message carrying
    // the close code, reason and whether the close was remote.
    auto forwardClose =
        [this](uint16_t code, const std::string& reason, size_t wireSize, bool remote) {
            _onMessageCallback(
                std::make_unique<WebSocketMessage>(WebSocketMessageType::Close,
                                                   "",
                                                   wireSize,
                                                   WebSocketErrorInfo(),
                                                   WebSocketOpenInfo(),
                                                   WebSocketCloseInfo(code, reason, remote)));
        };

    _ws.setOnCloseCallback(forwardClose);
}
// Stops the connection (and joins the worker thread, see stop()) and then
// unregisters the close handler installed by the constructor.
WebSocket::~WebSocket()
{
    stop();

    // The handler captured `this`; clear it before the object goes away.
    _ws.setOnCloseCallback(nullptr);
}
void WebSocket::setUrl(const std::string& url)
2018-09-27 23:56:48 +02:00
{
std::lock_guard<std::mutex> lock(_configMutex);
2018-09-27 23:56:48 +02:00
_url = url;
}
void WebSocket::setExtraHeaders(const WebSocketHttpHeaders& headers)
{
std::lock_guard<std::mutex> lock(_configMutex);
_extraHeaders = headers;
}
2018-09-27 23:56:48 +02:00
const std::string& WebSocket::getUrl() const
{
std::lock_guard<std::mutex> lock(_configMutex);
return _url;
}
2019-09-23 19:25:23 +02:00
void WebSocket::setPerMessageDeflateOptions(
const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions)
{
std::lock_guard<std::mutex> lock(_configMutex);
_perMessageDeflateOptions = perMessageDeflateOptions;
}
void WebSocket::setTLSOptions(const SocketTLSOptions& socketTLSOptions)
{
std::lock_guard<std::mutex> lock(_configMutex);
_socketTLSOptions = socketTLSOptions;
}
// Returns the currently configured per-message-deflate options.
// NOTE(review): the returned reference is not protected once the lock is
// released on return.
const WebSocketPerMessageDeflateOptions& WebSocket::getPerMessageDeflateOptions() const
{
    std::lock_guard<std::mutex> guard {_configMutex};
    return _perMessageDeflateOptions;
}
void WebSocket::setPingInterval(int pingIntervalSecs)
{
std::lock_guard<std::mutex> lock(_configMutex);
_pingIntervalSecs = pingIntervalSecs;
}
int WebSocket::getPingInterval() const
{
std::lock_guard<std::mutex> lock(_configMutex);
return _pingIntervalSecs;
}
void WebSocket::enablePong()
{
std::lock_guard<std::mutex> lock(_configMutex);
_enablePong = true;
}
void WebSocket::disablePong()
{
std::lock_guard<std::mutex> lock(_configMutex);
_enablePong = false;
}
2019-04-20 01:57:38 +02:00
void WebSocket::enablePerMessageDeflate()
{
std::lock_guard<std::mutex> lock(_configMutex);
WebSocketPerMessageDeflateOptions perMessageDeflateOptions(true);
_perMessageDeflateOptions = perMessageDeflateOptions;
}
void WebSocket::disablePerMessageDeflate()
{
std::lock_guard<std::mutex> lock(_configMutex);
WebSocketPerMessageDeflateOptions perMessageDeflateOptions(false);
_perMessageDeflateOptions = perMessageDeflateOptions;
}
void WebSocket::setMaxWaitBetweenReconnectionRetries(uint32_t maxWaitBetweenReconnectionRetries)
{
std::lock_guard<std::mutex> lock(_configMutex);
_maxWaitBetweenReconnectionRetries = maxWaitBetweenReconnectionRetries;
}
uint32_t WebSocket::getMaxWaitBetweenReconnectionRetries() const
{
std::lock_guard<std::mutex> lock(_configMutex);
return _maxWaitBetweenReconnectionRetries;
}
2018-09-27 23:56:48 +02:00
// Launches the worker thread running WebSocket::run. Does nothing when a
// worker thread is already joinable, i.e. start() was called before.
void WebSocket::start()
{
    if (!_thread.joinable())
    {
        _thread = std::thread(&WebSocket::run, this);
    }
}
2019-09-23 19:25:23 +02:00
void WebSocket::stop(uint16_t code, const std::string& reason)
2018-09-27 23:56:48 +02:00
{
close(code, reason);
2018-09-27 23:56:48 +02:00
if (_thread.joinable())
2018-09-27 23:56:48 +02:00
{
// wait until working thread will exit
// it will exit after close operation is finished
_stop = true;
_sleepCondition.notify_one();
_thread.join();
_stop = false;
2018-09-27 23:56:48 +02:00
}
}
2019-01-04 03:33:08 +01:00
// Synchronously connects to the configured url.
//
// Configures the transport, builds the request headers (extra headers plus an
// optional Sec-WebSocket-Protocol header), connects, and on success emits an
// Open message to the message callback. When the ping interval is positive a
// heart beat is sent right after the Open message.
//
// @param timeoutSecs  timeout forwarded to the transport's connectToUrl
// @return the transport's init result; `success` is false on failure, in
//         which case no Open message is emitted.
WebSocketInitResult WebSocket::connect(int timeoutSecs)
{
    // Take a consistent snapshot of every setting while holding the
    // configuration mutex: the setters write these members under that mutex,
    // so reading them unlocked would be a data race.
    std::unique_lock<std::mutex> lock(_configMutex);
    _ws.configure(
        _perMessageDeflateOptions, _socketTLSOptions, _enablePong, _pingIntervalSecs);
    WebSocketHttpHeaders headers(_extraHeaders);
    const std::string url(_url);
    const std::vector<std::string> subProtocols(_subProtocols);
    const int pingIntervalSecs = _pingIntervalSecs;
    // Release before the blocking connect and before the user callback runs,
    // both of which may need the configuration mutex themselves.
    lock.unlock();

    if (!subProtocols.empty())
    {
        //
        // Sub Protocol strings are comma separated.
        // Python code to do that is:
        // >>> ','.join(['json', 'msgpack'])
        // 'json,msgpack'
        //
        std::string subProtocolsHeader;
        for (size_t i = 0; i < subProtocols.size(); ++i)
        {
            if (i != 0)
            {
                subProtocolsHeader += ",";
            }
            subProtocolsHeader += subProtocols[i];
        }
        headers["Sec-WebSocket-Protocol"] = subProtocolsHeader;
    }

    WebSocketInitResult status = _ws.connectToUrl(url, headers, timeoutSecs);
    if (!status.success)
    {
        return status;
    }

    _onMessageCallback(std::make_unique<WebSocketMessage>(
        WebSocketMessageType::Open,
        "",
        0,
        WebSocketErrorInfo(),
        WebSocketOpenInfo(status.uri, status.headers, status.protocol),
        WebSocketCloseInfo()));

    if (pingIntervalSecs > 0)
    {
        // Send a heart beat right away
        _ws.sendHeartBeat();
    }

    return status;
}
2020-03-24 20:40:58 +01:00
// Synchronously performs the connection over an already created socket.
//
// Configures the transport, hands the socket over to it, and on success emits
// an Open message to the message callback. When the ping interval is positive
// a heart beat is sent right after the Open message.
//
// @param socket       socket whose ownership is transferred to the transport
// @param timeoutSecs  timeout forwarded to the transport's connectToSocket
// @return the transport's init result; `success` is false on failure, in
//         which case no Open message is emitted.
WebSocketInitResult WebSocket::connectToSocket(std::unique_ptr<Socket> socket, int timeoutSecs)
{
    int pingIntervalSecs = 0;
    {
        std::lock_guard<std::mutex> lock(_configMutex);
        _ws.configure(
            _perMessageDeflateOptions, _socketTLSOptions, _enablePong, _pingIntervalSecs);

        // Snapshot under the lock: setPingInterval() writes this member while
        // holding the same mutex, so an unlocked read later would be a race.
        pingIntervalSecs = _pingIntervalSecs;
    }

    WebSocketInitResult status = _ws.connectToSocket(std::move(socket), timeoutSecs);
    if (!status.success)
    {
        return status;
    }

    _onMessageCallback(
        std::make_unique<WebSocketMessage>(WebSocketMessageType::Open,
                                           "",
                                           0,
                                           WebSocketErrorInfo(),
                                           WebSocketOpenInfo(status.uri, status.headers),
                                           WebSocketCloseInfo()));

    if (pingIntervalSecs > 0)
    {
        // Send a heart beat right away
        _ws.sendHeartBeat();
    }

    return status;
}
2018-09-27 23:56:48 +02:00
bool WebSocket::isConnected() const
{
return getReadyState() == ReadyState::Open;
2018-09-27 23:56:48 +02:00
}
bool WebSocket::isClosing() const
{
return getReadyState() == ReadyState::Closing;
2018-09-27 23:56:48 +02:00
}
2019-09-23 19:25:23 +02:00
void WebSocket::close(uint16_t code, const std::string& reason)
2018-09-27 23:56:48 +02:00
{
_ws.close(code, reason);
2018-09-27 23:56:48 +02:00
}
2019-05-11 19:24:28 +02:00
void WebSocket::checkConnection(bool firstConnectionAttempt)
2018-09-27 23:56:48 +02:00
{
using millis = std::chrono::duration<double, std::milli>;
uint32_t retries = 0;
millis duration(0);
2018-09-27 23:56:48 +02:00
2019-05-11 19:24:28 +02:00
// Try to connect perpertually
while (true)
2018-09-27 23:56:48 +02:00
{
if (isConnected() || isClosing() || _stop)
2018-09-27 23:56:48 +02:00
{
break;
}
2018-09-27 23:56:48 +02:00
2019-05-11 19:24:28 +02:00
if (!firstConnectionAttempt && !_automaticReconnection)
{
2019-05-11 19:24:28 +02:00
// Do not attempt to reconnect
break;
}
2019-05-11 19:24:28 +02:00
firstConnectionAttempt = false;
// Only sleep if we are retrying
if (duration.count() > 0)
{
std::unique_lock<std::mutex> lock(_sleepMutex);
_sleepCondition.wait_for(lock, duration);
}
if (_stop)
{
break;
}
2019-05-11 19:24:28 +02:00
// Try to connect synchronously
ix::WebSocketInitResult status = connect(_handshakeTimeoutSecs);
2018-09-27 23:56:48 +02:00
if (!status.success)
{
WebSocketErrorInfo connectErr;
if (_automaticReconnection)
{
2019-09-23 19:25:23 +02:00
duration = millis(calculateRetryWaitMilliseconds(
retries++, _maxWaitBetweenReconnectionRetries));
connectErr.wait_time = duration.count();
connectErr.retries = retries;
}
2019-09-23 19:25:23 +02:00
connectErr.reason = status.errorStr;
connectErr.http_status = status.http_status;
_onMessageCallback(std::make_unique<WebSocketMessage>(WebSocketMessageType::Error,
2019-09-23 19:25:23 +02:00
"",
0,
connectErr,
WebSocketOpenInfo(),
WebSocketCloseInfo()));
2018-09-27 23:56:48 +02:00
}
}
}
void WebSocket::run()
{
setThreadName(getUrl());
2018-12-23 23:14:38 +01:00
2019-05-11 19:24:28 +02:00
bool firstConnectionAttempt = true;
while (true)
2018-09-27 23:56:48 +02:00
{
// 1. Make sure we are always connected
2019-05-11 19:24:28 +02:00
checkConnection(firstConnectionAttempt);
2019-05-11 19:24:28 +02:00
firstConnectionAttempt = false;
// if here we are closed then checkConnection was not able to connect
if (getReadyState() == ReadyState::Closed)
{
break;
}
2018-09-27 23:56:48 +02:00
// We can avoid to poll if we want to stop and are not closing
if (_stop && !isClosing()) break;
// 2. Poll to see if there's any new data available
WebSocketTransport::PollResult pollResult = _ws.poll();
2018-09-27 23:56:48 +02:00
// 3. Dispatch the incoming messages
_ws.dispatch(
pollResult,
2018-10-25 21:01:47 +02:00
[this](const std::string& msg,
size_t wireSize,
2018-11-15 00:52:28 +01:00
bool decompressionError,
2019-09-23 19:25:23 +02:00
WebSocketTransport::MessageKind messageKind) {
2018-10-25 21:01:47 +02:00
WebSocketMessageType webSocketMessageType;
switch (messageKind)
{
case WebSocketTransport::MessageKind::MSG_TEXT:
case WebSocketTransport::MessageKind::MSG_BINARY:
2018-10-25 21:01:47 +02:00
{
webSocketMessageType = WebSocketMessageType::Message;
2019-09-23 19:25:23 +02:00
}
break;
2018-10-25 21:01:47 +02:00
case WebSocketTransport::MessageKind::PING:
2018-10-25 21:01:47 +02:00
{
webSocketMessageType = WebSocketMessageType::Ping;
2019-09-23 19:25:23 +02:00
}
break;
2018-10-25 21:01:47 +02:00
case WebSocketTransport::MessageKind::PONG:
2018-10-25 21:01:47 +02:00
{
webSocketMessageType = WebSocketMessageType::Pong;
2019-09-23 19:25:23 +02:00
}
break;
case WebSocketTransport::MessageKind::FRAGMENT:
{
webSocketMessageType = WebSocketMessageType::Fragment;
2019-09-23 19:25:23 +02:00
}
break;
2018-10-25 21:01:47 +02:00
}
2018-11-15 00:52:28 +01:00
WebSocketErrorInfo webSocketErrorInfo;
webSocketErrorInfo.decompressionError = decompressionError;
bool binary = messageKind == WebSocketTransport::MessageKind::MSG_BINARY;
_onMessageCallback(std::make_unique<WebSocketMessage>(webSocketMessageType,
2019-09-23 19:25:23 +02:00
msg,
wireSize,
webSocketErrorInfo,
WebSocketOpenInfo(),
WebSocketCloseInfo(),
binary));
2018-09-27 23:56:48 +02:00
WebSocket::invokeTrafficTrackerCallback(wireSize, true);
2018-09-27 23:56:48 +02:00
});
}
}
// Installs the callback that receives every message emitted by this object
// (Open, Close, Error and incoming frames).
// NOTE(review): the assignment is not guarded by a mutex — presumably meant to
// be called before start(); confirm.
void WebSocket::setOnMessageCallback(const OnMessageCallback& callback)
{
    _onMessageCallback = callback;
}
// Installs the traffic tracker stored in the static member shared by all
// WebSocket objects.
void WebSocket::setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback)
{
    _onTrafficTrackerCallback = callback;
}
// Removes the traffic tracker by installing a null callback.
void WebSocket::resetTrafficTrackerCallback()
{
    setTrafficTrackerCallback(nullptr);
}
// Reports `size` bytes of traffic to the tracker, if one is installed.
// `incoming` is true for received data and false for sent data.
void WebSocket::invokeTrafficTrackerCallback(size_t size, bool incoming)
{
    // Nothing to report to when no tracker is set.
    if (!_onTrafficTrackerCallback)
    {
        return;
    }

    _onTrafficTrackerCallback(size, incoming);
}
// Sends `data` as a binary message when `binary` is true, as a text message
// otherwise, and returns the result of the chosen send function.
WebSocketSendInfo WebSocket::send(const std::string& data,
                                  bool binary,
                                  const OnProgressCallback& onProgressCallback)
{
    if (binary)
    {
        return sendBinary(data, onProgressCallback);
    }

    return sendText(data, onProgressCallback);
}
2019-06-09 19:22:27 +02:00
// Sends the payload as a binary message; no validation is applied to it here.
WebSocketSendInfo WebSocket::sendBinary(const std::string& text,
                                        const OnProgressCallback& onProgressCallback)
{
    const SendMessageKind kind = SendMessageKind::Binary;
    return sendMessage(text, kind, onProgressCallback);
}
// Sends the payload as a text message. A payload that fails UTF-8 validation
// is not sent: the connection is closed with the "invalid frame payload data"
// code and a failed send info is returned.
WebSocketSendInfo WebSocket::sendText(const std::string& text,
                                      const OnProgressCallback& onProgressCallback)
{
    if (validateUtf8(text))
    {
        return sendMessage(text, SendMessageKind::Text, onProgressCallback);
    }

    close(WebSocketCloseConstants::kInvalidFramePayloadData,
          WebSocketCloseConstants::kInvalidFramePayloadDataMessage);
    return WebSocketSendInfo(false);
}
// Sends a ping frame carrying `text`. Payloads longer than 125 bytes are
// rejected with a failed send info and nothing is sent.
WebSocketSendInfo WebSocket::ping(const std::string& text)
{
    // Standard limit ping message size
    constexpr size_t pingMaxPayloadSize = 125;

    const bool payloadFits = text.size() <= pingMaxPayloadSize;
    if (!payloadFits)
    {
        return WebSocketSendInfo(false);
    }

    return sendMessage(text, SendMessageKind::Ping);
}
// Common send path for text, binary and ping messages.
//
// Returns a failed send info without sending when the socket is not
// connected. Otherwise the write is done under the write mutex, the bytes put
// on the wire are reported to the traffic tracker as outgoing, and the
// transport's send info is returned.
WebSocketSendInfo WebSocket::sendMessage(const std::string& text,
                                         SendMessageKind sendMessageKind,
                                         const OnProgressCallback& onProgressCallback)
{
    if (!isConnected())
    {
        return WebSocketSendInfo(false);
    }

    //
    // It is OK to read and write on the same socket in 2 different threads.
    // https://stackoverflow.com/questions/1981372/are-parallel-calls-to-send-recv-on-the-same-socket-valid
    //
    // This makes it so that messages are sent right away, and we dont need
    // a timeout while we poll to keep wake ups to a minimum (which helps
    // with battery life), and use the system select call to notify us when
    // incoming messages are arriving / there's data to be received.
    //
    std::lock_guard<std::mutex> writeGuard(_writeMutex);

    WebSocketSendInfo sendInfo;
    switch (sendMessageKind)
    {
        case SendMessageKind::Text:
            sendInfo = _ws.sendText(text, onProgressCallback);
            break;

        case SendMessageKind::Binary:
            sendInfo = _ws.sendBinary(text, onProgressCallback);
            break;

        case SendMessageKind::Ping:
            sendInfo = _ws.sendPing(text);
            break;
    }

    WebSocket::invokeTrafficTrackerCallback(sendInfo.wireSize, false);

    return sendInfo;
}
// Maps the transport's ready state onto the public ReadyState enum. Any value
// that is not OPEN, CONNECTING or CLOSING is reported as Closed.
ReadyState WebSocket::getReadyState() const
{
    using TransportState = ix::WebSocketTransport::ReadyState;

    const TransportState transportState = _ws.getReadyState();

    if (transportState == TransportState::OPEN) return ReadyState::Open;
    if (transportState == TransportState::CONNECTING) return ReadyState::Connecting;
    if (transportState == TransportState::CLOSING) return ReadyState::Closing;

    // CLOSED, and the fallback for anything unexpected.
    return ReadyState::Closed;
}
// Returns the upper-case name of a ready state, or "UNKNOWN" for a value that
// is not one of the four known states.
std::string WebSocket::readyStateToString(ReadyState readyState)
{
    if (readyState == ReadyState::Open) return "OPEN";
    if (readyState == ReadyState::Connecting) return "CONNECTING";
    if (readyState == ReadyState::Closing) return "CLOSING";
    if (readyState == ReadyState::Closed) return "CLOSED";

    return "UNKNOWN";
}
// Allows checkConnection() to retry after a failed or lost connection.
void WebSocket::enableAutomaticReconnection()
{
    _automaticReconnection = true;
}
// Restricts checkConnection() to the first connection attempt only.
void WebSocket::disableAutomaticReconnection()
{
    _automaticReconnection = false;
}
// Reports whether automatic reconnection is currently enabled.
bool WebSocket::isAutomaticReconnectionEnabled() const
{
    return _automaticReconnection;
}
// Returns the buffered amount reported by the transport.
size_t WebSocket::bufferedAmount() const
{
    return _ws.bufferedAmount();
}
void WebSocket::addSubProtocol(const std::string& subProtocol)
{
std::lock_guard<std::mutex> lock(_configMutex);
_subProtocols.push_back(subProtocol);
}
const std::vector<std::string>& WebSocket::getSubProtocols()
{
std::lock_guard<std::mutex> lock(_configMutex);
return _subProtocols;
}
2019-09-23 19:25:23 +02:00
} // namespace ix