Compare commits

...

9 Commits

13 changed files with 242 additions and 173 deletions

View File

@@ -2,7 +2,7 @@
IXWebSocket is a C++ library for WebSocket client and server development. It has minimal dependencies (no boost), is very simple to use and support everything you'll likely need for websocket dev (SSL, deflate compression, compiles on most platforms, etc...). HTTP client and server code is also available, but it hasn't received as much testing.
It is been used on big mobile video game titles sending and receiving tons of messages since 2017 (iOS and Android). It was tested on macOS, iOS, Linux, Android, Windows and FreeBSD. Two important design goals are simplicity and correctness.
It is been used on big mobile video game titles sending and receiving tons of messages since 2017 (iOS and Android). It was tested on macOS, iOS, Linux, Android, Windows and FreeBSD. Note that the MinGW compiler is not supported at this point. Two important design goals are simplicity and correctness.
```cpp
/*

View File

@@ -1,67 +1,11 @@
version: "3"
version: "3.3"
services:
# snake:
# image: bsergean/ws:build
# entrypoint: ws snake --port 8767 --host 0.0.0.0 --redis_hosts redis1
# ports:
# - "8767:8767"
# networks:
# - ws-net
# depends_on:
# - redis1
push:
entrypoint: ws push_server --host 0.0.0.0
image: ${DOCKER_REPO}/ws:build
# proxy:
# image: bsergean/ws:build
# entrypoint: strace ws proxy_server --remote_host 'wss://cobra.addsrv.com' --host 0.0.0.0 --port 8765 -v
# ports:
# - "8765:8765"
# networks:
# - ws-net
#pyproxy:
# image: bsergean/ws_proxy:build
# entrypoint: /usr/bin/ws_proxy.py --remote_url 'wss://cobra.addsrv.com' --host 0.0.0.0 --port 8765
# ports:
# - "8765:8765"
# networks:
# - ws-net
# # ws:
# # security_opt:
# # - seccomp:unconfined
# # cap_add:
# # - SYS_PTRACE
# # stdin_open: true
# # tty: true
# # image: bsergean/ws:build
# # entrypoint: sh
# # networks:
# # - ws-net
# # depends_on:
# # - redis1
# #
# # redis1:
# # image: redis:alpine
# # networks:
# # - ws-net
# #
# # statsd:
# # image: jaconel/statsd
# # ports:
# # - "8125:8125"
# # environment:
# # - STATSD_DUMP_MSG=true
# # - GRAPHITE_HOST=127.0.0.1
# # networks:
# # - ws-net
compile:
image: alpine
entrypoint: sh
stdin_open: true
tty: true
volumes:
- /Users/bsergeant/src/foss:/home/bsergean/src/foss
networks:
ws-net:
autoroute:
entrypoint: ws autoroute ws://push:8008
image: ${DOCKER_REPO}/ws:build
depends_on:
- push

View File

@@ -20,7 +20,7 @@ RUN make ws_mbedtls_install && \
FROM alpine:3.12 as runtime
RUN apk add --no-cache libstdc++ mbedtls ca-certificates python3 && \
RUN apk add --no-cache libstdc++ mbedtls ca-certificates python3 strace && \
addgroup -S app && \
adduser -S -G app app

View File

@@ -2,6 +2,22 @@
All changes to this project will be documented in this file.
## [10.4.0] - 2020-09-12
(http server) read body request when the Content-Length is specified + set timeout to read the request to 30 seconds max by default, and make it configurable as a constructor parameter
## [10.3.5] - 2020-09-09
(ws) autoroute command exit on its own once all messages have been received
## [10.3.4] - 2020-09-04
(docker) ws docker file installs strace
## [10.3.3] - 2020-09-02
(ws) echo_client command renamed to autoroute. Command exit once the server close the connection. push_server commands exit once N messages have been sent.
## [10.3.2] - 2020-08-31
(ws + cobra bots) add a cobra_to_cobra ws subcommand to subscribe to a channel and republish received events to a different channel

View File

@@ -12,10 +12,8 @@ namespace ix
{
Bench::Bench(const std::string& description)
: _description(description)
, _start(std::chrono::high_resolution_clock::now())
, _reported(false)
{
;
reset();
}
Bench::~Bench()
@@ -26,6 +24,12 @@ namespace ix
}
}
void Bench::reset()
{
_start = std::chrono::high_resolution_clock::now();
_reported = false;
}
void Bench::report()
{
auto now = std::chrono::high_resolution_clock::now();

View File

@@ -17,6 +17,7 @@ namespace ix
Bench(const std::string& description);
~Bench();
void reset();
void report();
uint64_t getDuration() const;

View File

@@ -93,14 +93,12 @@ namespace ix
}
std::tuple<bool, std::string, HttpRequestPtr> Http::parseRequest(
std::unique_ptr<Socket>& socket)
std::unique_ptr<Socket>& socket, int timeoutSecs)
{
HttpRequestPtr httpRequest;
std::atomic<bool> requestInitCancellation(false);
int timeoutSecs = 5; // FIXME
auto isCancellationRequested =
makeCancellationRequestWithTimeout(timeoutSecs, requestInitCancellation);
@@ -130,7 +128,36 @@ namespace ix
return std::make_tuple(false, "Error parsing HTTP headers", httpRequest);
}
httpRequest = std::make_shared<HttpRequest>(uri, method, httpVersion, headers);
std::string body;
if (headers.find("Content-Length") != headers.end())
{
int contentLength = 0;
try
{
contentLength = std::stoi(headers["Content-Length"]);
}
catch (std::exception)
{
return std::make_tuple(
false, "Error parsing HTTP Header 'Content-Length'", httpRequest);
}
if (contentLength < 0)
{
return std::make_tuple(
false, "Error: 'Content-Length' should be a positive integer", httpRequest);
}
auto res = socket->readBytes(contentLength, nullptr, isCancellationRequested);
if (!res.first)
{
return std::make_tuple(
false, std::string("Error reading request: ") + res.second, httpRequest);
}
body = res.second;
}
httpRequest = std::make_shared<HttpRequest>(uri, method, httpVersion, body, headers);
return std::make_tuple(true, "", httpRequest);
}

View File

@@ -95,15 +95,18 @@ namespace ix
std::string uri;
std::string method;
std::string version;
std::string body;
WebSocketHttpHeaders headers;
HttpRequest(const std::string& u,
const std::string& m,
const std::string& v,
const std::string& b,
const WebSocketHttpHeaders& h = WebSocketHttpHeaders())
: uri(u)
, method(m)
, version(v)
, body(b)
, headers(h)
{
}
@@ -115,7 +118,7 @@ namespace ix
{
public:
static std::tuple<bool, std::string, HttpRequestPtr> parseRequest(
std::unique_ptr<Socket>& socket);
std::unique_ptr<Socket>& socket, int timeoutSecs);
static bool sendResponse(HttpResponsePtr response, std::unique_ptr<Socket>& socket);
static std::pair<std::string, int> parseStatusLine(const std::string& line);

View File

@@ -92,10 +92,17 @@ namespace
namespace ix
{
HttpServer::HttpServer(
int port, const std::string& host, int backlog, size_t maxConnections, int addressFamily)
const int HttpServer::kDefaultTimeoutSecs(30);
HttpServer::HttpServer(int port,
const std::string& host,
int backlog,
size_t maxConnections,
int addressFamily,
int timeoutSecs)
: SocketServer(port, host, backlog, maxConnections, addressFamily)
, _connectedClientsCount(0)
, _timeoutSecs(timeoutSecs)
{
setDefaultConnectionCallback();
}
@@ -124,7 +131,7 @@ namespace ix
{
_connectedClientsCount++;
auto ret = Http::parseRequest(socket);
auto ret = Http::parseRequest(socket, _timeoutSecs);
// FIXME: handle errors in parseRequest
if (std::get<0>(ret))

View File

@@ -29,7 +29,8 @@ namespace ix
const std::string& host = SocketServer::kDefaultHost,
int backlog = SocketServer::kDefaultTcpBacklog,
size_t maxConnections = SocketServer::kDefaultMaxConnections,
int addressFamily = SocketServer::kDefaultAddressFamily);
int addressFamily = SocketServer::kDefaultAddressFamily,
int timeoutSecs = HttpServer::kDefaultTimeoutSecs);
virtual ~HttpServer();
virtual void stop() final;
@@ -42,6 +43,9 @@ namespace ix
OnConnectionCallback _onConnectionCallback;
std::atomic<int> _connectedClientsCount;
const static int kDefaultTimeoutSecs;
int _timeoutSecs;
// Methods
virtual void handleConnection(std::unique_ptr<Socket>,
std::shared_ptr<ConnectionState> connectionState) final;

View File

@@ -6,4 +6,4 @@
#pragma once
#define IX_WEBSOCKET_VERSION "10.3.2"
#define IX_WEBSOCKET_VERSION "10.4.0"

View File

@@ -63,6 +63,54 @@ TEST_CASE("http server", "[httpd]")
server.stop();
}
SECTION("Posting plain text data to a local HTTP server")
{
int port = getFreePort();
ix::HttpServer server(port, "127.0.0.1");
server.setOnConnectionCallback(
[](HttpRequestPtr request, std::shared_ptr<ConnectionState>) -> HttpResponsePtr {
if (request->method == "POST")
{
return std::make_shared<HttpResponse>(
200, "OK", HttpErrorCode::Ok, WebSocketHttpHeaders(), request->body);
}
return std::make_shared<HttpResponse>(400, "BAD REQUEST");
});
auto res = server.listen();
REQUIRE(res.first);
server.start();
HttpClient httpClient;
WebSocketHttpHeaders headers;
headers["Content-Type"] = "text/plain";
std::string url("http://127.0.0.1:");
url += std::to_string(port);
auto args = httpClient.createRequest(url);
args->extraHeaders = headers;
args->connectTimeout = 60;
args->transferTimeout = 60;
args->verbose = true;
args->logger = [](const std::string& msg) { std::cout << msg; };
args->body = "Hello World!";
auto response = httpClient.post(url, args->body, args);
std::cerr << "Status: " << response->statusCode << std::endl;
std::cerr << "Error message: " << response->errorMsg << std::endl;
std::cerr << "Payload: " << response->payload << std::endl;
REQUIRE(response->errorCode == HttpErrorCode::Ok);
REQUIRE(response->statusCode == 200);
REQUIRE(response->payload == args->body);
server.stop();
}
}
TEST_CASE("http server redirection", "[httpd_redirect]")

201
ws/ws.cpp
View File

@@ -1103,21 +1103,26 @@ namespace ix
return 0;
}
int ws_echo_client(const std::string& url,
bool disablePerMessageDeflate,
bool binaryMode,
const ix::SocketTLSOptions& tlsOptions,
const std::string& subprotocol,
int pingIntervalSecs,
const std::string& sendMsg,
bool noSend)
int ws_autoroute(const std::string& url,
bool disablePerMessageDeflate,
const ix::SocketTLSOptions& tlsOptions,
const std::string& subprotocol,
int pingIntervalSecs,
int msgCount)
{
Bench bench("ws_autoroute full test");
// Our websocket object
ix::WebSocket webSocket;
webSocket.setUrl(url);
std::string fullUrl(url);
fullUrl += "/";
fullUrl += std::to_string(msgCount);
webSocket.setUrl(fullUrl);
webSocket.setTLSOptions(tlsOptions);
webSocket.setPingInterval(pingIntervalSecs);
webSocket.disableAutomaticReconnection();
if (disablePerMessageDeflate)
{
@@ -1129,43 +1134,56 @@ namespace ix
webSocket.addSubProtocol(subprotocol);
}
std::atomic<uint64_t> receivedCount(0);
uint64_t receivedCountTotal(0);
uint64_t receivedCountPerSecs(0);
std::atomic<uint64_t> receivedCountPerSecs(0);
std::atomic<uint64_t> target(msgCount);
std::mutex conditionVariableMutex;
std::condition_variable condition;
// Setup a callback to be fired (in a background thread, watch out for race conditions !)
// when a message or an event (open, close, error) is received
webSocket.setOnMessageCallback([&webSocket, &receivedCount, &sendMsg, noSend, binaryMode](
const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Message)
{
if (!noSend)
std::atomic<bool> stop(false);
// Setup a callback to be fired
// when a message or an event (open, close, ping, pong, error) is received
webSocket.setOnMessageCallback(
[&webSocket, &receivedCountPerSecs, &target, &stop, &condition, &bench](
const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Message)
{
webSocket.send(msg->str, msg->binary);
receivedCountPerSecs++;
target -= 1;
if (target == 0)
{
stop = true;
condition.notify_one();
bench.report();
}
}
receivedCount++;
}
else if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("ws_echo_client: connected");
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
else if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("{}: {}", it.first, it.second);
bench.reset();
spdlog::info("ws_autoroute: connected");
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");
for (auto it : msg->openInfo.headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
}
else if (msg->type == ix::WebSocketMessageType::Pong)
{
spdlog::info("Received pong {}", msg->str);
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
spdlog::info("ws_autoroute: connection closed");
}
});
webSocket.send(sendMsg, binaryMode);
}
else if (msg->type == ix::WebSocketMessageType::Pong)
{
spdlog::info("Received pong {}", msg->str);
}
});
auto timer = [&receivedCount, &receivedCountTotal, &receivedCountPerSecs] {
auto timer = [&receivedCountPerSecs, &stop] {
setThreadName("Timer");
while (true)
while (!stop)
{
//
// We cannot write to sentCount and receivedCount
@@ -1173,13 +1191,11 @@ namespace ix
// our own counters
//
std::stringstream ss;
ss << "messages received: " << receivedCountPerSecs << " per second "
<< receivedCountTotal << " total";
ss << "messages received per second: " << receivedCountPerSecs;
CoreLogger::info(ss.str());
receivedCountPerSecs = receivedCount - receivedCountTotal;
receivedCountTotal += receivedCountPerSecs;
receivedCountPerSecs = 0;
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
@@ -1192,17 +1208,12 @@ namespace ix
std::cout << "Connecting to " << url << "..." << std::endl;
webSocket.start();
// Send a message to the server (default to TEXT mode)
webSocket.send("hello world");
// Wait for all the messages to be received
std::unique_lock<std::mutex> lock(conditionVariableMutex);
condition.wait(lock);
while (true)
{
std::string text;
std::cout << "> " << std::flush;
std::getline(std::cin, text);
webSocket.send(text);
}
t1.join();
webSocket.stop();
return 0;
}
@@ -1640,7 +1651,6 @@ namespace ix
}
int ws_push_server(int port,
bool greetings,
const std::string& hostname,
const ix::SocketTLSOptions& tlsOptions,
bool ipv6,
@@ -1671,10 +1681,13 @@ namespace ix
server.disablePong();
}
// push one million messages
std::atomic<bool> stop(false);
server.setOnClientMessageCallback(
[greetings, &sendMsg](std::shared_ptr<ConnectionState> connectionState,
WebSocket& webSocket,
const WebSocketMessagePtr& msg) {
[&sendMsg, &stop](std::shared_ptr<ConnectionState> connectionState,
WebSocket& webSocket,
const WebSocketMessagePtr& msg) {
auto remoteIp = connectionState->getRemoteIp();
if (msg->type == ix::WebSocketMessageType::Open)
{
@@ -1688,20 +1701,30 @@ namespace ix
spdlog::info("{}: {}", it.first, it.second);
}
if (greetings)
{
webSocket.sendText("Welcome !");
}
// Parse the msg count from the uri.
int msgCount = -1;
std::stringstream ss;
auto uriSize = msg->openInfo.uri.size();
ss << msg->openInfo.uri.substr(1, uriSize - 1);
ss >> msgCount;
bool binary = false;
while (true)
if (msgCount == -1)
{
auto sendInfo = webSocket.send(sendMsg, binary);
if (!sendInfo.success)
spdlog::info("Error parsing message count, closing connection");
webSocket.close();
}
else
{
bool binary = false;
for (int i = 0; i < msgCount; ++i)
{
spdlog::info("Error sending message, closing connection");
webSocket.close();
break;
auto sendInfo = webSocket.send(sendMsg, binary);
if (!sendInfo.success)
{
spdlog::info("Error sending message, closing connection");
webSocket.close();
break;
}
}
}
}
@@ -1711,6 +1734,7 @@ namespace ix
connectionState->getId(),
msg->closeInfo.code,
msg->closeInfo.reason);
stop = true;
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
@@ -1734,7 +1758,14 @@ namespace ix
}
server.start();
server.wait();
while (!stop)
{
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
server.stop();
return 0;
}
@@ -2839,7 +2870,6 @@ int main(int argc, char** argv)
bool version = false;
bool verifyNone = false;
bool disablePong = false;
bool noSend = false;
int port = 8008;
int redisPort = 6379;
int statsdPort = 8125;
@@ -2848,6 +2878,7 @@ int main(int argc, char** argv)
int maxRedirects = 5;
int delayMs = -1;
int count = 1;
int msgCount = 1000 * 1000;
uint32_t maxWaitBetweenReconnectionRetries;
int pingIntervalSecs = 30;
@@ -2941,17 +2972,14 @@ int main(int argc, char** argv)
addGenericOptions(connectApp);
addTLSOptions(connectApp);
CLI::App* echoClientApp =
app.add_subcommand("echo_client", "Echo messages sent by a remote server");
CLI::App* echoClientApp = app.add_subcommand("autoroute", "Test websocket client performance");
echoClientApp->fallthrough();
echoClientApp->add_option("url", url, "Connection url")->required();
echoClientApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate");
echoClientApp->add_flag("-b", binaryMode, "Send in binary mode");
echoClientApp->add_option(
"--ping_interval", pingIntervalSecs, "Interval between sending pings");
echoClientApp->add_option("--subprotocol", subprotocol, "Subprotocol");
echoClientApp->add_option("--send_msg", sendMsg, "Send message");
echoClientApp->add_flag("-m", noSend, "Do not send messages, only receive messages");
echoClientApp->add_option("--msg_count", msgCount, "Total message count to be sent");
addTLSOptions(echoClientApp);
CLI::App* chatApp = app.add_subcommand("chat", "Group chat");
@@ -2976,7 +3004,6 @@ int main(int argc, char** argv)
pushServerApp->add_option("--port", port, "Port");
pushServerApp->add_option("--host", hostname, "Hostname");
pushServerApp->add_flag("-q", quiet, "Quiet / only display warnings and errors");
pushServerApp->add_flag("-g", greetings, "Greet");
pushServerApp->add_flag("-6", ipv6, "IpV6");
pushServerApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate");
pushServerApp->add_flag("-p", disablePong, "Disable sending PONG in response to PING");
@@ -3267,16 +3294,10 @@ int main(int argc, char** argv)
subprotocol,
pingIntervalSecs);
}
else if (app.got_subcommand("echo_client"))
else if (app.got_subcommand("autoroute"))
{
ret = ix::ws_echo_client(url,
disablePerMessageDeflate,
binaryMode,
tlsOptions,
subprotocol,
pingIntervalSecs,
sendMsg,
noSend);
ret = ix::ws_autoroute(
url, disablePerMessageDeflate, tlsOptions, subprotocol, pingIntervalSecs, msgCount);
}
else if (app.got_subcommand("echo_server"))
{
@@ -3285,14 +3306,8 @@ int main(int argc, char** argv)
}
else if (app.got_subcommand("push_server"))
{
ret = ix::ws_push_server(port,
greetings,
hostname,
tlsOptions,
ipv6,
disablePerMessageDeflate,
disablePong,
sendMsg);
ret = ix::ws_push_server(
port, hostname, tlsOptions, ipv6, disablePerMessageDeflate, disablePong, sendMsg);
}
else if (app.got_subcommand("transfer"))
{