Add client support for websocket subprotocol. Look for the new addSubProtocol method for details

This commit is contained in:
Benjamin Sergeant 2019-10-13 13:37:34 -07:00
parent 128545cc2b
commit c3a619f114
17 changed files with 231 additions and 40 deletions

View File

@ -78,6 +78,7 @@ set( IXWEBSOCKET_HEADERS
ixwebsocket/IXWebSocketErrorInfo.h
ixwebsocket/IXWebSocketHandshake.h
ixwebsocket/IXWebSocketHttpHeaders.h
ixwebsocket/IXWebSocketInitResult.h
ixwebsocket/IXWebSocketMessage.h
ixwebsocket/IXWebSocketMessageQueue.h
ixwebsocket/IXWebSocketMessageType.h

View File

@ -1 +1 @@
7.0.0
7.1.0

View File

@ -1,6 +1,10 @@
# Changelog
All notable changes to this project will be documented in this file.
## [7.1.0] - 2019-10-13
- Add client support for websocket subprotocol. Look for the new addSubProtocol method for details.
## [7.0.0] - 2019-10-01
- TLS support in server code, only implemented for the OpenSSL SSL backend for now.

View File

@ -187,6 +187,21 @@ headers["foo"] = "bar";
webSocket.setExtraHeaders(headers);
```
### Subprotocols
You can specify subprotocols to be set during the WebSocket handshake. For more info you can refer to [this doc](https://hpbn.co/websocket/#subprotocol-negotiation).
```
webSocket.addSubprotocol("appProtocol-v1");
webSocket.addSubprotocol("appProtocol-v2");
```
The protocol that the server did accept is available in the open info `protocol` field.
```
std::cout << "protocol: " << msg->openInfo.protocol << std::endl;
```
### Automatic reconnection
Automatic reconnection kicks in when the connection is disconnected without the user consent. This feature is on by default and can be turned off.

View File

@ -32,6 +32,7 @@ namespace ix
{
_pdu["action"] = "rtm/publish";
_webSocket->addSubProtocol("json");
initWebSocketOnMessageCallback();
}

View File

@ -20,9 +20,9 @@ typedef unsigned long int nfds_t;
#include <arpa/inet.h>
#include <errno.h>
#include <netdb.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <netinet/tcp.h>
#include <poll.h>
#include <sys/select.h>
#include <sys/socket.h>

View File

@ -185,19 +185,32 @@ namespace ix
_pingTimeoutSecs);
}
WebSocketInitResult status = _ws.connectToUrl(_url, _extraHeaders, timeoutSecs);
WebSocketHttpHeaders headers(_extraHeaders);
std::string subProtocolsHeader;
auto subProtocols = getSubProtocols();
if (!subProtocols.empty())
{
for (auto subProtocol : subProtocols)
{
subProtocolsHeader += ",";
subProtocolsHeader += subProtocol;
}
headers["Sec-WebSocket-Protocol"] = subProtocolsHeader;
}
WebSocketInitResult status = _ws.connectToUrl(_url, headers, timeoutSecs);
if (!status.success)
{
return status;
}
_onMessageCallback(
std::make_shared<WebSocketMessage>(WebSocketMessageType::Open,
"",
0,
WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers),
WebSocketCloseInfo()));
_onMessageCallback(std::make_shared<WebSocketMessage>(
WebSocketMessageType::Open,
"",
0,
WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers, status.protocol),
WebSocketCloseInfo()));
return status;
}
@ -525,4 +538,16 @@ namespace ix
{
return _ws.bufferedAmount();
}
void WebSocket::addSubProtocol(const std::string& subProtocol)
{
std::lock_guard<std::mutex> lock(_configMutex);
_subProtocols.push_back(subProtocol);
}
const std::vector<std::string>& WebSocket::getSubProtocols()
{
std::lock_guard<std::mutex> lock(_configMutex);
return _subProtocols;
}
} // namespace ix

View File

@ -57,6 +57,7 @@ namespace ix
void enablePong();
void disablePong();
void disablePerMessageDeflate();
void addSubProtocol(const std::string& subProtocol);
// Run asynchronously, by calling start and stop.
void start();
@ -101,6 +102,7 @@ namespace ix
bool isAutomaticReconnectionEnabled() const;
void setMaxWaitBetweenReconnectionRetries(uint32_t maxWaitBetweenReconnectionRetries);
uint32_t getMaxWaitBetweenReconnectionRetries() const;
const std::vector<std::string>& getSubProtocols();
private:
WebSocketSendInfo sendMessage(const std::string& text,
@ -151,6 +153,9 @@ namespace ix
static const int kDefaultPingIntervalSecs;
static const int kDefaultPingTimeoutSecs;
// Subprotocols
std::vector<std::string> _subProtocols;
friend class WebSocketServer;
};
} // namespace ix

View File

@ -9,6 +9,7 @@
#include "IXCancellationRequest.h"
#include "IXSocket.h"
#include "IXWebSocketHttpHeaders.h"
#include "IXWebSocketInitResult.h"
#include "IXWebSocketPerMessageDeflate.h"
#include "IXWebSocketPerMessageDeflateOptions.h"
#include <atomic>
@ -18,28 +19,6 @@
namespace ix
{
struct WebSocketInitResult
{
bool success;
int http_status;
std::string errorStr;
WebSocketHttpHeaders headers;
std::string uri;
WebSocketInitResult(bool s = false,
int status = 0,
const std::string& e = std::string(),
WebSocketHttpHeaders h = WebSocketHttpHeaders(),
const std::string& u = std::string())
{
success = s;
http_status = status;
errorStr = e;
headers = h;
uri = u;
}
};
class WebSocketHandshake
{
public:

View File

@ -0,0 +1,36 @@
/*
* IXWebSocketInitResult.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include "IXWebSocketHttpHeaders.h"
namespace ix
{
struct WebSocketInitResult
{
bool success;
int http_status;
std::string errorStr;
WebSocketHttpHeaders headers;
std::string uri;
std::string protocol;
WebSocketInitResult(bool s = false,
int status = 0,
const std::string& e = std::string(),
WebSocketHttpHeaders h = WebSocketHttpHeaders(),
const std::string& u = std::string())
{
success = s;
http_status = status;
errorStr = e;
headers = h;
uri = u;
protocol = h["Sec-WebSocket-Protocol"];
}
};
} // namespace ix

View File

@ -12,11 +12,14 @@ namespace ix
{
std::string uri;
WebSocketHttpHeaders headers;
std::string protocol;
WebSocketOpenInfo(const std::string& u = std::string(),
const WebSocketHttpHeaders& h = WebSocketHttpHeaders())
const WebSocketHttpHeaders& h = WebSocketHttpHeaders(),
const std::string& p = std::string())
: uri(u)
, headers(h)
, protocol(p)
{
;
}

View File

@ -6,4 +6,4 @@
#pragma once
#define IX_WEBSOCKET_VERSION "7.0.0"
#define IX_WEBSOCKET_VERSION "7.1.0"

View File

@ -44,6 +44,7 @@ set (SOURCES
IXCobraChatTest.cpp
IXCobraMetricsPublisherTest.cpp
IXDNSLookupTest.cpp
IXWebSocketSubProtocolTest.cpp
)
# Some unittest don't work on windows yet

View File

@ -0,0 +1,108 @@
/*
* IXWebSocketServerTest.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone. All rights reserved.
*/
#include "IXTest.h"
#include "catch.hpp"
#include <iostream>
#include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXSocketFactory.h>
#include <ixwebsocket/IXWebSocket.h>
#include <ixwebsocket/IXWebSocketServer.h>
using namespace ix;
bool startServer(ix::WebSocketServer& server, std::string& subProtocols)
{
server.setOnConnectionCallback(
[&server, &subProtocols](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState) {
webSocket->setOnMessageCallback([webSocket, connectionState, &server, &subProtocols](
const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
Logger() << "New connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:";
for (auto it : msg->openInfo.headers)
{
Logger() << it.first << ": " << it.second;
}
subProtocols = msg->openInfo.headers["Sec-WebSocket-Protocol"];
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
log("Closed connection");
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
for (auto&& client : server.getClients())
{
if (client != webSocket)
{
client->sendBinary(msg->str);
}
}
}
});
});
auto res = server.listen();
if (!res.first)
{
log(res.second);
return false;
}
server.start();
return true;
}
TEST_CASE("subprotocol", "[websocket_subprotocol]")
{
SECTION("Connect to the server")
{
int port = getFreePort();
ix::WebSocketServer server(port);
std::string subProtocols;
startServer(server, subProtocols);
std::atomic<bool> connected(false);
ix::WebSocket webSocket;
webSocket.setOnMessageCallback([&connected](const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
connected = true;
log("Client connected");
}
});
webSocket.addSubProtocol("json");
webSocket.addSubProtocol("msgpack");
std::string url;
std::stringstream ss;
ss << "ws://127.0.0.1:" << port;
url = ss.str();
webSocket.setUrl(url);
webSocket.start();
int attempts = 0;
while (!connected)
{
REQUIRE(attempts++ < 10);
ix::msleep(10);
}
webSocket.stop();
server.stop();
REQUIRE(subProtocols == "json,msgpack");
}
}

View File

@ -71,6 +71,7 @@ int main(int argc, char** argv)
std::string redisHosts("127.0.0.1");
std::string redisPassword;
std::string appsConfigPath("appsConfig.json");
std::string subprotocol;
ix::SocketTLSOptions tlsOptions;
std::string ciphers;
std::string redirectUrl;
@ -149,6 +150,7 @@ int main(int argc, char** argv)
connectApp->add_option("--max_wait",
maxWaitBetweenReconnectionRetries,
"Max Wait Time between reconnection retries");
connectApp->add_option("--subprotocol", subprotocol, "Subprotocol");
addTLSOptions(connectApp);
CLI::App* chatApp = app.add_subcommand("chat", "Group chat");
@ -329,7 +331,8 @@ int main(int argc, char** argv)
disablePerMessageDeflate,
binaryMode,
maxWaitBetweenReconnectionRetries,
tlsOptions);
tlsOptions,
subprotocol);
}
else if (app.got_subcommand("chat"))
{

View File

@ -45,7 +45,8 @@ namespace ix
bool disablePerMessageDeflate,
bool binaryMode,
uint32_t maxWaitBetweenReconnectionRetries,
const ix::SocketTLSOptions& tlsOptions);
const ix::SocketTLSOptions& tlsOptions,
const std::string& subprotocol);
int ws_receive_main(const std::string& url,
bool enablePerMessageDeflate,

View File

@ -23,7 +23,8 @@ namespace ix
bool disablePerMessageDeflate,
bool binaryMode,
uint32_t maxWaitBetweenReconnectionRetries,
const ix::SocketTLSOptions& tlsOptions);
const ix::SocketTLSOptions& tlsOptions,
const std::string& subprotocol);
void subscribe(const std::string& channel);
void start();
@ -48,7 +49,8 @@ namespace ix
bool disablePerMessageDeflate,
bool binaryMode,
uint32_t maxWaitBetweenReconnectionRetries,
const ix::SocketTLSOptions& tlsOptions)
const ix::SocketTLSOptions& tlsOptions,
const std::string& subprotocol)
: _url(url)
, _disablePerMessageDeflate(disablePerMessageDeflate)
, _binaryMode(binaryMode)
@ -61,6 +63,11 @@ namespace ix
_webSocket.setTLSOptions(tlsOptions);
_headers = parseHeaders(headers);
if (!subprotocol.empty())
{
_webSocket.addSubProtocol(subprotocol);
}
}
void WebSocketConnect::log(const std::string& msg)
@ -191,7 +198,8 @@ namespace ix
bool disablePerMessageDeflate,
bool binaryMode,
uint32_t maxWaitBetweenReconnectionRetries,
const ix::SocketTLSOptions& tlsOptions)
const ix::SocketTLSOptions& tlsOptions,
const std::string& subprotocol)
{
std::cout << "Type Ctrl-D to exit prompt..." << std::endl;
WebSocketConnect webSocketChat(url,
@ -200,7 +208,8 @@ namespace ix
disablePerMessageDeflate,
binaryMode,
maxWaitBetweenReconnectionRetries,
tlsOptions);
tlsOptions,
subprotocol);
webSocketChat.start();
while (true)