Compare commits

...

11 Commits

Author SHA1 Message Date
Benjamin Sergeant
e15a2900e7 (websocket) traffic tracker received bytes is message size while it should be wire size 2020-02-26 11:24:41 -08:00
Benjamin Sergeant
140a21c8b3 (ws_connect) display sent/received bytes statistics on exit 2020-02-26 11:23:36 -08:00
Benjamin Sergeant
6d0c568aaa update doc / fix incorrect comment about sending defaultint to binary mode 2020-02-24 16:24:32 -08:00
Benjamin Sergeant
c96abcef1c build status github badge 2020-02-23 09:46:08 -08:00
Benjamin Sergeant
4a9b0b9dfd (server) give thread name to some usual worker threads / unittest is broken !! 2020-02-23 09:44:58 -08:00
Benjamin Sergeant
8837d5e784 (websocket server) fix regression from 8.1.2, where per-deflate message compression was always disabled 2020-02-22 10:15:43 -08:00
Benjamin Sergeant
242c945400 (client + server) Fix #155 / http header parser should treat the space(s) after the : delimiter as optional. Fixing this bug made us discover that websocket sub-protocols are not properly serialiazed, but start with a , 2020-02-21 14:05:38 -08:00
Benjamin Sergeant
feab4dee0f split httpd test case into 2 test cases 2020-02-21 12:24:22 -08:00
Benjamin Sergeant
8175829b4b unittest / add extra test for checking headers 2020-02-21 12:22:37 -08:00
Benjamin Sergeant
4c66a7561e (WebSocketServer) add option to disable deflate compression, exposed with the -x option to ws echo_server 2020-02-18 21:38:28 -08:00
Benjamin Sergeant
111475e65c (ws cobra to statsd and sentry sender) exit if no messages are received for one minute, which is a sign that something goes wrong on the server side. That should be changed to be configurable in the future 2020-02-18 12:43:07 -08:00
20 changed files with 212 additions and 30 deletions

View File

@@ -1,4 +1,4 @@
name: C/C++ CI
name: unittest
on: [push]

View File

@@ -1,6 +1,6 @@
## Hello world
![Build status badge](https://travis-ci.org/machinezone/IXWebSocket.svg?branch=master)
![Build status](https://github.com/machinezone/IXWebSocket/workflows/unittest/badge.svg)
IXWebSocket is a C++ library for WebSocket client and server development. It has minimal dependencies (no boost), is very simple to use and support everything you'll likely need for websocket dev (SSL, deflate compression, compiles on most platforms, etc...). HTTP client and server code is also available, but it hasn't received as much testing.

View File

@@ -1,6 +1,34 @@
# Changelog
All changes to this project will be documented in this file.
## [8.1.7] - 2020-02-26
(websocket) traffic tracker received bytes is message size while it should be wire size
## [8.1.6] - 2020-02-26
(ws_connect) display sent/received bytes statistics on exit
## [8.1.5] - 2020-02-23
(server) give thread name to some usual worker threads / unittest is broken !!
## [8.1.4] - 2020-02-22
(websocket server) fix regression from 8.1.2, where per-deflate message compression was always disabled
## [8.1.3] - 2020-02-21
(client + server) Fix #155 / http header parser should treat the space(s) after the : delimiter as optional. Fixing this bug made us discover that websocket sub-protocols are not properly serialiazed, but start with a ,
## [8.1.2] - 2020-02-18
(WebSocketServer) add option to disable deflate compression, exposed with the -x option to ws echo_server
## [8.1.1] - 2020-02-18
(ws cobra to statsd and sentry sender) exit if no messages are received for one minute, which is a sign that something goes wrong on the server side. That should be changed to be configurable in the future
## [8.1.0] - 2020-02-13
(http client + sentry minidump upload) Multipart stream closing boundary is invalid + mark some options as mandatory in the command line tools

View File

@@ -42,7 +42,8 @@ namespace
namespace ix
{
HttpServer::HttpServer(int port, const std::string& host, int backlog, size_t maxConnections, int addressFamily)
HttpServer::HttpServer(
int port, const std::string& host, int backlog, size_t maxConnections, int addressFamily)
: SocketServer(port, host, backlog, maxConnections, addressFamily)
, _connectedClientsCount(0)
{

View File

@@ -7,6 +7,7 @@
#include "IXSocketServer.h"
#include "IXNetSystem.h"
#include "IXSetThreadName.h"
#include "IXSocket.h"
#include "IXSocketConnect.h"
#include "IXSocketFactory.h"
@@ -23,11 +24,8 @@ namespace ix
const size_t SocketServer::kDefaultMaxConnections(32);
const int SocketServer::kDefaultAddressFamily(AF_INET);
SocketServer::SocketServer(int port,
const std::string& host,
int backlog,
size_t maxConnections,
int addressFamily)
SocketServer::SocketServer(
int port, const std::string& host, int backlog, size_t maxConnections, int addressFamily)
: _port(port)
, _host(host)
, _backlog(backlog)
@@ -97,7 +95,8 @@ namespace ix
{
std::stringstream ss;
ss << "SocketServer::listen() error calling inet_pton "
<< "at address " << _host << ":" << _port << " : " << strerror(Socket::getErrno());
<< "at address " << _host << ":" << _port << " : "
<< strerror(Socket::getErrno());
Socket::closeSocket(_serverFd);
return std::make_pair(false, ss.str());
@@ -108,7 +107,8 @@ namespace ix
{
std::stringstream ss;
ss << "SocketServer::listen() error calling bind "
<< "at address " << _host << ":" << _port << " : " << strerror(Socket::getErrno());
<< "at address " << _host << ":" << _port << " : "
<< strerror(Socket::getErrno());
Socket::closeSocket(_serverFd);
return std::make_pair(false, ss.str());
@@ -124,7 +124,8 @@ namespace ix
{
std::stringstream ss;
ss << "SocketServer::listen() error calling inet_pton "
<< "at address " << _host << ":" << _port << " : " << strerror(Socket::getErrno());
<< "at address " << _host << ":" << _port << " : "
<< strerror(Socket::getErrno());
Socket::closeSocket(_serverFd);
return std::make_pair(false, ss.str());
@@ -135,7 +136,8 @@ namespace ix
{
std::stringstream ss;
ss << "SocketServer::listen() error calling bind "
<< "at address " << _host << ":" << _port << " : " << strerror(Socket::getErrno());
<< "at address " << _host << ":" << _port << " : "
<< strerror(Socket::getErrno());
Socket::closeSocket(_serverFd);
return std::make_pair(false, ss.str());
@@ -246,6 +248,8 @@ namespace ix
// Set the socket to non blocking mode, so that accept calls are not blocking
SocketConnect::configure(_serverFd);
setThreadName("SocketServer::listen");
for (;;)
{
if (_stop) return;
@@ -346,6 +350,8 @@ namespace ix
void SocketServer::runGC()
{
setThreadName("SocketServer::GC");
for (;;)
{
// Garbage collection to shutdown/join threads for closed connections.

View File

@@ -134,6 +134,13 @@ namespace ix
_enablePong = false;
}
void WebSocket::enablePerMessageDeflate()
{
std::lock_guard<std::mutex> lock(_configMutex);
WebSocketPerMessageDeflateOptions perMessageDeflateOptions(true);
_perMessageDeflateOptions = perMessageDeflateOptions;
}
void WebSocket::disablePerMessageDeflate()
{
std::lock_guard<std::mutex> lock(_configMutex);
@@ -191,9 +198,19 @@ namespace ix
auto subProtocols = getSubProtocols();
if (!subProtocols.empty())
{
//
// Sub Protocol strings are comma separated.
// Python code to do that is:
// >>> ','.join(['json', 'msgpack'])
// 'json,msgpack'
//
int i = 0;
for (auto subProtocol : subProtocols)
{
subProtocolsHeader += ",";
if (i++ != 0)
{
subProtocolsHeader += ",";
}
subProtocolsHeader += subProtocol;
}
headers["Sec-WebSocket-Protocol"] = subProtocolsHeader;
@@ -395,7 +412,7 @@ namespace ix
WebSocketCloseInfo(),
binary));
WebSocket::invokeTrafficTrackerCallback(msg.size(), true);
WebSocket::invokeTrafficTrackerCallback(wireSize, true);
});
}
}

View File

@@ -19,10 +19,10 @@
#include "IXWebSocketSendInfo.h"
#include "IXWebSocketTransport.h"
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <condition_variable>
namespace ix
{
@@ -57,6 +57,7 @@ namespace ix
void setPingTimeout(int pingTimeoutSecs);
void enablePong();
void disablePong();
void enablePerMessageDeflate();
void disablePerMessageDeflate();
void addSubProtocol(const std::string& subProtocol);
@@ -71,7 +72,7 @@ namespace ix
WebSocketInitResult connect(int timeoutSecs);
void run();
// send is in binary mode by default
// send is in text mode by default
WebSocketSendInfo send(const std::string& data,
bool binary = false,
const OnProgressCallback& onProgressCallback = nullptr);

View File

@@ -12,6 +12,7 @@
#include "IXUserAgent.h"
#include "libwshandshake.hpp"
#include <algorithm>
#include <iostream>
#include <random>
#include <sstream>
@@ -335,8 +336,9 @@ namespace ix
std::string header = headers["sec-websocket-extensions"];
WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(header);
// If the client has requested that extension, enable it.
if (webSocketPerMessageDeflateOptions.enabled())
// If the client has requested that extension,
// and the server does not prevent it, enable it.
if (_enablePerMessageDeflate && webSocketPerMessageDeflateOptions.enabled())
{
_enablePerMessageDeflate = true;

View File

@@ -66,12 +66,23 @@ namespace ix
{
line[i] = '\0';
std::string lineStr(line);
// colon is ':', colon+1 is ' ', colon+2 is the start of the value.
// colon is ':', usually colon+1 is ' ', and colon+2 is the start of the value.
// some webservers do not put a space after the colon character, so
// the start of the value might be farther than colon+2.
// The spec says that space after the : should be discarded.
// i is end of string (\0), i-colon is length of string minus key;
// subtract 1 for '\0', 1 for '\n', 1 for '\r',
// 1 for the ' ' after the ':', and total is -4
// since we use an std::string later on and don't account for '\0',
// plus the optional first space, total is -2
int start = colon + 1;
while (lineStr[start] == ' ')
{
start++;
}
std::string name(lineStr.substr(0, colon));
std::string value(lineStr.substr(colon + 2, i - colon - 4));
std::string value(lineStr.substr(start, lineStr.size() - start - 2));
headers[name] = value;
}

View File

@@ -7,6 +7,7 @@
#include "IXWebSocketServer.h"
#include "IXNetSystem.h"
#include "IXSetThreadName.h"
#include "IXSocketConnect.h"
#include "IXWebSocket.h"
#include "IXWebSocketTransport.h"
@@ -28,6 +29,7 @@ namespace ix
: SocketServer(port, host, backlog, maxConnections, addressFamily)
, _handshakeTimeoutSecs(handshakeTimeoutSecs)
, _enablePong(kDefaultEnablePong)
, _enablePerMessageDeflate(true)
{
}
@@ -59,6 +61,11 @@ namespace ix
_enablePong = false;
}
void WebSocketServer::disablePerMessageDeflate()
{
_enablePerMessageDeflate = false;
}
void WebSocketServer::setOnConnectionCallback(const OnConnectionCallback& callback)
{
_onConnectionCallback = callback;
@@ -67,15 +74,30 @@ namespace ix
void WebSocketServer::handleConnection(std::shared_ptr<Socket> socket,
std::shared_ptr<ConnectionState> connectionState)
{
setThreadName("WebSocketServer::" + connectionState->getId());
auto webSocket = std::make_shared<WebSocket>();
_onConnectionCallback(webSocket, connectionState);
webSocket->disableAutomaticReconnection();
if (_enablePong)
{
webSocket->enablePong();
}
else
{
webSocket->disablePong();
}
if (_enablePerMessageDeflate)
{
webSocket->enablePerMessageDeflate();
}
else
{
webSocket->disablePerMessageDeflate();
}
// Add this client to our client set
{

View File

@@ -36,6 +36,7 @@ namespace ix
void enablePong();
void disablePong();
void disablePerMessageDeflate();
void setOnConnectionCallback(const OnConnectionCallback& callback);
@@ -48,6 +49,7 @@ namespace ix
// Member variables
int _handshakeTimeoutSecs;
bool _enablePong;
bool _enablePerMessageDeflate;
OnConnectionCallback _onConnectionCallback;

View File

@@ -6,4 +6,4 @@
#pragma once
#define IX_WEBSOCKET_VERSION "8.1.0"
#define IX_WEBSOCKET_VERSION "8.1.7"

View File

@@ -59,10 +59,14 @@ TEST_CASE("http server", "[httpd]")
REQUIRE(response->errorCode == HttpErrorCode::Ok);
REQUIRE(response->statusCode == 200);
REQUIRE(response->headers["Accept-Encoding"] == "gzip");
server.stop();
}
}
TEST_CASE("http server redirection", "[httpd_redirect]")
{
SECTION("Connect to a local HTTP server, with redirection enabled")
{
int port = getFreePort();

View File

@@ -174,6 +174,7 @@ int main(int argc, char** argv)
echoServerApp->add_option("--host", hostname, "Hostname");
echoServerApp->add_flag("-g", greetings, "Verbose");
echoServerApp->add_flag("-6", ipv6, "IpV6");
echoServerApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate");
addTLSOptions(echoServerApp);
CLI::App* broadcastServerApp = app.add_subcommand("broadcast_server", "Broadcasting server");
@@ -336,8 +337,12 @@ int main(int argc, char** argv)
addTLSOptions(proxyServerApp);
CLI::App* minidumpApp = app.add_subcommand("upload_minidump", "Upload a minidump to sentry");
minidumpApp->add_option("--minidump", minidump, "Minidump path")->required()->check(CLI::ExistingPath);
minidumpApp->add_option("--metadata", metadata, "Metadata path")->required()->check(CLI::ExistingPath);
minidumpApp->add_option("--minidump", minidump, "Minidump path")
->required()
->check(CLI::ExistingPath);
minidumpApp->add_option("--metadata", metadata, "Metadata path")
->required()
->check(CLI::ExistingPath);
minidumpApp->add_option("--project", project, "Sentry Project")->required();
minidumpApp->add_option("--key", key, "Sentry Key")->required();
minidumpApp->add_flag("-v", verbose, "Verbose");
@@ -394,7 +399,8 @@ int main(int argc, char** argv)
}
else if (app.got_subcommand("echo_server"))
{
ret = ix::ws_echo_server_main(port, greetings, hostname, tlsOptions, ipv6);
ret = ix::ws_echo_server_main(
port, greetings, hostname, tlsOptions, ipv6, disablePerMessageDeflate);
}
else if (app.got_subcommand("broadcast_server"))
{

View File

@@ -30,7 +30,8 @@ namespace ix
bool greetings,
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions,
bool ipv6);
bool ipv6,
bool disablePerMessageDeflate);
int ws_broadcast_server_main(int port,
const std::string& hostname,

View File

@@ -135,6 +135,31 @@ namespace ix
std::thread t1(timer);
auto heartbeat = [&sentCount, &receivedCount] {
std::string state("na");
while (true)
{
std::stringstream ss;
ss << "messages received " << receivedCount;
ss << "messages sent " << sentCount;
std::string currentState = ss.str();
if (currentState == state)
{
spdlog::error("no messages received or sent for 1 minute, exiting");
exit(1);
}
state = currentState;
auto duration = std::chrono::minutes(1);
std::this_thread::sleep_for(duration);
}
};
std::thread t2(heartbeat);
auto sentrySender =
[&queueManager, verbose, &errorSending, &sentCount, &stop, &throttled, &dsn] {
SentryClient sentryClient(dsn);

View File

@@ -153,6 +153,31 @@ namespace ix
std::thread t1(timer);
auto heartbeat = [&sentCount, &receivedCount] {
std::string state("na");
while (true)
{
std::stringstream ss;
ss << "messages received " << receivedCount;
ss << "messages sent " << sentCount;
std::string currentState = ss.str();
if (currentState == state)
{
spdlog::error("no messages received or sent for 1 minute, exiting");
exit(1);
}
state = currentState;
auto duration = std::chrono::minutes(1);
std::this_thread::sleep_for(duration);
}
};
std::thread t2(heartbeat);
auto statsdSender = [&queueManager, &host, &port, &sentCount, &tokens, &prefix, &stop] {
// statsd client
// test with netcat as a server: `nc -ul 8125`
@@ -184,7 +209,7 @@ namespace ix
}
};
std::thread t2(statsdSender);
std::thread t3(statsdSender);
conn.setEventCallback(
[&conn, &channel, &filter, &jsonWriter, verbose, &queueManager, &receivedCount](

View File

@@ -30,6 +30,9 @@ namespace ix
void start();
void stop();
int getSentBytes() { return _sentBytes; }
int getReceivedBytes() { return _receivedBytes; }
void sendMessage(const std::string& text);
private:
@@ -38,6 +41,8 @@ namespace ix
ix::WebSocket _webSocket;
bool _disablePerMessageDeflate;
bool _binaryMode;
std::atomic<int> _receivedBytes;
std::atomic<int> _sentBytes;
void log(const std::string& msg);
WebSocketHttpHeaders parseHeaders(const std::string& data);
@@ -54,6 +59,8 @@ namespace ix
: _url(url)
, _disablePerMessageDeflate(disablePerMessageDeflate)
, _binaryMode(binaryMode)
, _receivedBytes(0)
, _sentBytes(0)
{
if (disableAutomaticReconnection)
{
@@ -68,6 +75,20 @@ namespace ix
{
_webSocket.addSubProtocol(subprotocol);
}
WebSocket::setTrafficTrackerCallback(
[this](int size, bool incoming)
{
if (incoming)
{
_receivedBytes += size;
}
else
{
_sentBytes += size;
}
}
);
}
void WebSocketConnect::log(const std::string& msg)
@@ -246,6 +267,9 @@ namespace ix
spdlog::info("");
webSocketChat.stop();
spdlog::info("Received {} bytes", webSocketChat.getReceivedBytes());
spdlog::info("Sent {} bytes", webSocketChat.getSentBytes());
return 0;
}
} // namespace ix

View File

@@ -5,10 +5,10 @@
*/
#include <atomic>
#include <ixwebsocket/IXDNSLookup.h>
#include <ixwebsocket/IXNetSystem.h>
#include <spdlog/spdlog.h>
#include <sstream>
#include <ixwebsocket/IXNetSystem.h>
#include <ixwebsocket/IXDNSLookup.h>
namespace ix

View File

@@ -4,8 +4,8 @@
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include <ixwebsocket/IXWebSocketServer.h>
#include <ixwebsocket/IXNetSystem.h>
#include <ixwebsocket/IXWebSocketServer.h>
#include <spdlog/spdlog.h>
#include <sstream>
@@ -15,7 +15,8 @@ namespace ix
bool greetings,
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions,
bool ipv6)
bool ipv6,
bool disablePerMessageDeflate)
{
spdlog::info("Listening on {}:{}", hostname, port);
@@ -28,6 +29,12 @@ namespace ix
server.setTLSOptions(tlsOptions);
if (disablePerMessageDeflate)
{
spdlog::info("Disable per message deflate");
server.disablePerMessageDeflate();
}
server.setOnConnectionCallback(
[greetings](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState) {