IXWebSocket/ixwebsocket/IXWebSocket.cpp

318 lines
9.3 KiB
C++
Raw Normal View History

2018-09-27 23:56:48 +02:00
/*
* IXWebSocket.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*/
#include "IXWebSocket.h"
2018-12-23 23:14:38 +01:00
#include "IXSetThreadName.h"
2018-09-27 23:56:48 +02:00
#include <iostream>
#include <cmath>
#include <cassert>
namespace
{
2018-09-27 23:56:48 +02:00
/// Compute how long to wait before the next reconnection attempt.
///
/// The delay starts at 100ms and doubles with every failed attempt
/// (100, 200, 400, ... 6400), then stays capped at 10 seconds.
///
/// @param retry_count number of connection attempts that already failed
/// @return the delay in milliseconds, never more than 10000
uint64_t calculateRetryWaitMilliseconds(uint64_t retry_count)
{
    constexpr uint64_t baseWaitMs = 100;
    constexpr uint64_t maxWaitMs = 10 * 1000;

    // 100 * 2^6 = 6400 is the last value below the cap; 100 * 2^7 = 12800
    // already exceeds it. Checking the exponent first keeps the computation
    // in integer arithmetic and makes it impossible for a large retry_count
    // to overflow (which would otherwise wrap to 0 and cause busy retries).
    constexpr uint64_t maxUncappedExponent = 6;
    if (retry_count > maxUncappedExponent)
    {
        return maxWaitMs;
    }

    return baseWaitMs << retry_count;
}
}
2018-12-23 23:14:38 +01:00
namespace ix
{
2018-09-27 23:56:48 +02:00
// Definition of the static traffic tracker callback member; it starts out
// unset (nullptr) until setTrafficTrackerCallback() installs one.
OnTrafficTrackerCallback WebSocket::_onTrafficTrackerCallback = nullptr;
// Build a WebSocket with a default-constructed message callback, the stop
// flag cleared and automatic reconnection enabled.
WebSocket::WebSocket() :
_onMessageCallback(OnMessageCallback()),
_stop(false),
_automaticReconnection(true)
{
}
// Tear down through stop(), which closes the connection and joins the
// background thread when one is running.
WebSocket::~WebSocket()
{
stop();
}
void WebSocket::setUrl(const std::string& url)
2018-09-27 23:56:48 +02:00
{
std::lock_guard<std::mutex> lock(_configMutex);
2018-09-27 23:56:48 +02:00
_url = url;
}
// Return the configured url; _configMutex is held while the member is accessed.
// NOTE(review): the result is a reference to the _url member and the lock is
// released when this function returns, so the caller reads the string with
// no lock held. Callers that may race with setUrl() should be checked.
const std::string& WebSocket::getUrl() const
{
std::lock_guard<std::mutex> lock(_configMutex);
return _url;
}
// Store the per-message deflate options handed to the transport by the next
// connect(). The member is written while holding _configMutex.
void WebSocket::setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions)
{
std::lock_guard<std::mutex> lock(_configMutex);
_perMessageDeflateOptions = perMessageDeflateOptions;
}
// Return the configured per-message deflate options; _configMutex is held
// while the member is accessed.
// NOTE(review): the result is a reference to the member and the lock is
// released when this function returns, so the caller reads it with no lock
// held. Callers that may race with setPerMessageDeflateOptions() should be
// checked.
const WebSocketPerMessageDeflateOptions& WebSocket::getPerMessageDeflateOptions() const
{
std::lock_guard<std::mutex> lock(_configMutex);
return _perMessageDeflateOptions;
}
2018-09-27 23:56:48 +02:00
// Launch the background thread executing run(). Calling start() again while
// that thread is joinable is a no-op.
void WebSocket::start()
{
    const bool alreadyStarted = _thread.joinable();
    if (!alreadyStarted)
    {
        _thread = std::thread(&WebSocket::run, this);
    }
}
// Close the connection and, when the background thread is running, ask it to
// exit and wait for it. Automatic reconnection is switched off for the
// duration of the call so the closed connection is not reopened, and is
// switched back on before returning.
void WebSocket::stop()
{
    _automaticReconnection = false;
    close();

    if (_thread.joinable())
    {
        // Raise the stop flag only for the time it takes the thread to exit
        _stop = true;
        _thread.join();
        _stop = false;
    }

    _automaticReconnection = true;
}
// Configure the transport with the current settings, register the close
// handler and initialise the connection.
//
// On success an Open message carrying the response headers is delivered to
// the message callback. The init result is returned in both cases.
//
// NOTE(review): _onMessageCallback is invoked unconditionally; confirm that
// a callback is always installed before connect() can run.
WebSocketInitResult WebSocket::connect()
{
    // The settings are handed to the transport while holding the config lock
    {
        std::lock_guard<std::mutex> guard(_configMutex);
        _ws.configure(_url, _perMessageDeflateOptions);
    }

    // Forward close notifications from the transport as Close messages
    auto onClose = [this](uint16_t code, const std::string& reason, size_t wireSize)
    {
        _onMessageCallback(WebSocket_MessageType_Close, "", wireSize,
                           WebSocketErrorInfo(),
                           WebSocketCloseInfo(code, reason),
                           WebSocketHttpHeaders());
    };
    _ws.setOnCloseCallback(onClose);

    WebSocketInitResult status = _ws.init();
    if (status.success)
    {
        _onMessageCallback(WebSocket_MessageType_Open, "", 0,
                           WebSocketErrorInfo(), WebSocketCloseInfo(),
                           status.headers);
    }

    return status;
}
// True when the ready state reported by getReadyState() is Open.
bool WebSocket::isConnected() const
{
return getReadyState() == WebSocket_ReadyState_Open;
}
// True when the ready state reported by getReadyState() is Closing.
bool WebSocket::isClosing() const
{
return getReadyState() == WebSocket_ReadyState_Closing;
}
// Ask the underlying transport to close. The background thread, if any, is
// left alone; stop() is the call that also joins it.
void WebSocket::close()
{
_ws.close();
}
void WebSocket::reconnectPerpetuallyIfDisconnected()
{
uint64_t retries = 0;
WebSocketErrorInfo connectErr;
ix::WebSocketInitResult status;
using millis = std::chrono::duration<double, std::milli>;
millis duration;
while (true)
{
if (isConnected() || isClosing() || _stop || !_automaticReconnection)
{
break;
}
status = connect();
if (!status.success && !_stop)
{
duration = millis(calculateRetryWaitMilliseconds(retries++));
connectErr.retries = retries;
connectErr.wait_time = duration.count();
connectErr.reason = status.errorStr;
connectErr.http_status = status.http_status;
_onMessageCallback(WebSocket_MessageType_Error, "", 0,
connectErr, WebSocketCloseInfo(),
WebSocketHttpHeaders());
2018-09-27 23:56:48 +02:00
std::this_thread::sleep_for(duration);
}
}
}
void WebSocket::run()
{
2018-12-23 23:14:38 +01:00
setThreadName(_url);
2018-09-27 23:56:48 +02:00
while (true)
{
if (_stop) return;
// 1. Make sure we are always connected
reconnectPerpetuallyIfDisconnected();
if (_stop) return;
// 2. Poll to see if there's any new data available
_ws.poll();
if (_stop) return;
// 3. Dispatch the incoming messages
_ws.dispatch(
2018-10-25 21:01:47 +02:00
[this](const std::string& msg,
size_t wireSize,
2018-11-15 00:52:28 +01:00
bool decompressionError,
2018-10-25 21:01:47 +02:00
WebSocketTransport::MessageKind messageKind)
2018-09-27 23:56:48 +02:00
{
2018-10-25 21:01:47 +02:00
WebSocketMessageType webSocketMessageType;
switch (messageKind)
{
case WebSocketTransport::MSG:
{
webSocketMessageType = WebSocket_MessageType_Message;
} break;
case WebSocketTransport::PING:
{
webSocketMessageType = WebSocket_MessageType_Ping;
} break;
case WebSocketTransport::PONG:
{
webSocketMessageType = WebSocket_MessageType_Pong;
} break;
}
2018-11-15 00:52:28 +01:00
WebSocketErrorInfo webSocketErrorInfo;
webSocketErrorInfo.decompressionError = decompressionError;
_onMessageCallback(webSocketMessageType, msg, wireSize,
2018-11-15 00:52:28 +01:00
webSocketErrorInfo, WebSocketCloseInfo(),
WebSocketHttpHeaders());
2018-09-27 23:56:48 +02:00
WebSocket::invokeTrafficTrackerCallback(msg.size(), true);
});
}
}
// Install the callback that receives every message emitted by this socket
// (Open, Close, Error, Message, Ping and Pong), replacing the previous one.
void WebSocket::setOnMessageCallback(const OnMessageCallback& callback)
{
_onMessageCallback = callback;
}
// Install the traffic tracker callback. It is stored in a static member, so
// it applies to all WebSocket instances.
void WebSocket::setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback)
{
_onTrafficTrackerCallback = callback;
}
// Remove the traffic tracker callback by setting it back to nullptr.
void WebSocket::resetTrafficTrackerCallback()
{
setTrafficTrackerCallback(nullptr);
}
// Report `size` bytes of traffic to the tracker callback, if one is set.
// `incoming` is true for received data and false for sent data.
void WebSocket::invokeTrafficTrackerCallback(size_t size, bool incoming)
{
    // No tracker installed: nothing to report
    if (!_onTrafficTrackerCallback) return;

    _onTrafficTrackerCallback(size, incoming);
}
// Send `text` as a regular (non-ping) message and return the send result.
WebSocketSendInfo WebSocket::send(const std::string& text)
{
    constexpr bool isPing = false;
    return sendMessage(text, isPing);
}
// Send a ping carrying `text` as payload. A payload larger than 125 bytes is
// rejected with a failed WebSocketSendInfo and nothing is sent.
WebSocketSendInfo WebSocket::ping(const std::string& text)
{
    // Standard limit ping message size
    constexpr size_t pingMaxPayloadSize = 125;

    const bool payloadFits = text.size() <= pingMaxPayloadSize;
    return payloadFits ? sendMessage(text, true)
                       : WebSocketSendInfo(false);
}
// Send `text` through the transport, as a ping frame when `ping` is true and
// with sendBinary() otherwise. Returns a failed WebSocketSendInfo without
// sending anything when the socket is not connected. The number of bytes put
// on the wire is reported to the traffic tracker.
WebSocketSendInfo WebSocket::sendMessage(const std::string& text, bool ping)
{
    if (!isConnected())
    {
        return WebSocketSendInfo(false);
    }

    //
    // It is OK to read and write on the same socket in 2 different threads.
    // https://stackoverflow.com/questions/1981372/are-parallel-calls-to-send-recv-on-the-same-socket-valid
    //
    // This makes it so that messages are sent right away, and we dont need
    // a timeout while we poll to keep wake ups to a minimum (which helps
    // with battery life), and use the system select call to notify us when
    // incoming messages are arriving / there's data to be received.
    //
    // Writers are serialized with each other through _writeMutex.
    //
    std::lock_guard<std::mutex> guard(_writeMutex);

    WebSocketSendInfo sendInfo = ping ? _ws.sendPing(text)
                                      : _ws.sendBinary(text);

    WebSocket::invokeTrafficTrackerCallback(sendInfo.wireSize, false);

    return sendInfo;
}
// Translate the transport ready state into the public ReadyState value.
//
// A transport state that matches none of the known values is reported as
// Closed rather than letting control run off the end of the function.
ReadyState WebSocket::getReadyState() const
{
    switch (_ws.getReadyState())
    {
        case ix::WebSocketTransport::OPEN: return WebSocket_ReadyState_Open;
        case ix::WebSocketTransport::CONNECTING: return WebSocket_ReadyState_Connecting;
        case ix::WebSocketTransport::CLOSING: return WebSocket_ReadyState_Closing;
        case ix::WebSocketTransport::CLOSED: return WebSocket_ReadyState_Closed;
    }

    // Kept outside of the switch (instead of a default label) so compilers
    // can still warn when a new transport state is not handled above.
    return WebSocket_ReadyState_Closed;
}
// Return the upper-case name of a ready state ("OPEN", "CONNECTING",
// "CLOSING" or "CLOSED").
//
// A value matching none of the known states yields "UNKNOWN" rather than
// letting control run off the end of the function.
std::string WebSocket::readyStateToString(ReadyState readyState)
{
    switch (readyState)
    {
        case WebSocket_ReadyState_Open: return "OPEN";
        case WebSocket_ReadyState_Connecting: return "CONNECTING";
        case WebSocket_ReadyState_Closing: return "CLOSING";
        case WebSocket_ReadyState_Closed: return "CLOSED";
    }

    // Kept outside of the switch (instead of a default label) so compilers
    // can still warn when a new ready state is not handled above.
    return "UNKNOWN";
}
}