From 2268b743aeb2b852d163c6b4f0d124c3f20f1fb5 Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Mon, 30 Mar 2020 22:27:41 -0700 Subject: [PATCH] add broadcasting test where 10 clients exchange messages, to try to trigger threading errors --- makefile | 18 +- test/CMakeLists.txt | 1 + test/IXWebSocketBroadcastTest.cpp | 305 ++++++++++++++++++++++++++++++ test/IXWebSocketServerTest.cpp | 2 +- 4 files changed, 323 insertions(+), 3 deletions(-) create mode 100644 test/IXWebSocketBroadcastTest.cpp diff --git a/makefile b/makefile index a339ffe2..d462d597 100644 --- a/makefile +++ b/makefile @@ -126,17 +126,31 @@ test_tsan_openssl: (cd build/test ; ln -sf Debug/ixwebsocket_unittest) (cd test ; python2.7 run.py -r) +test_ubsan_openssl: + mkdir -p build && (cd build && cmake -GXcode -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_TEST=1 -DUSE_OPEN_SSL=1 .. && xcodebuild -project ixwebsocket.xcodeproj -target ixwebsocket_unittest -enableUndefinedBehaviorSanitizer YES) + (cd build/test ; ln -sf Debug/ixwebsocket_unittest) + (cd test ; python2.7 run.py -r) + +test_tsan_openssl_release: + mkdir -p build && (cd build && cmake -GXcode -DCMAKE_BUILD_TYPE=Release -DUSE_TLS=1 -DUSE_TEST=1 -DUSE_OPEN_SSL=1 .. && xcodebuild -project ixwebsocket.xcodeproj -configuration Release -target ixwebsocket_unittest -enableThreadSanitizer YES) + (cd build/test ; ln -sf Release/ixwebsocket_unittest) + (cd test ; python2.7 run.py -r) + test_tsan_mbedtls: mkdir -p build && (cd build && cmake -GXcode -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_TEST=1 -DUSE_MBED_TLS=1 .. && xcodebuild -project ixwebsocket.xcodeproj -target ixwebsocket_unittest -enableThreadSanitizer YES) (cd build/test ; ln -sf Debug/ixwebsocket_unittest) (cd test ; python2.7 run.py -r) -test_openssl: +build_test_openssl: mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_OPEN_SSL=1 -DUSE_TEST=1 .. ; make -j 4) + +test_openssl: build_test_openssl (cd test ; python2.7 run.py -r) -test_mbedtls: +build_test_mbedtls: mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_MBED_TLS=1 -DUSE_TEST=1 .. ; make -j 4) + +test_mbedtls: build_test_mbedtls (cd test ; python2.7 run.py -r) test_no_ssl: diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 9cf82bc2..a623d27e 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -54,6 +54,7 @@ set (SOURCES IXWebSocketSubProtocolTest.cpp IXSentryClientTest.cpp IXWebSocketChatTest.cpp + IXWebSocketBroadcastTest.cpp ) # Some unittest don't work on windows yet diff --git a/test/IXWebSocketBroadcastTest.cpp b/test/IXWebSocketBroadcastTest.cpp new file mode 100644 index 00000000..fed38830 --- /dev/null +++ b/test/IXWebSocketBroadcastTest.cpp @@ -0,0 +1,305 @@ +/* + * IXWebSocketServerTest.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2019 Machine Zone. All rights reserved. + */ + +#include "IXTest.h" +#include "catch.hpp" +#include +#include "msgpack11.hpp" +#include +#include +#include +#include + +using msgpack11::MsgPack; +using namespace ix; + +namespace +{ + class WebSocketChat + { + public: + WebSocketChat(const std::string& user, const std::string& session, int port); + + void subscribe(const std::string& channel); + void start(); + void stop(); + bool isReady() const; + + void sendMessage(const std::string& text); + size_t getReceivedMessagesCount() const; + const std::vector& getReceivedMessages() const; + + std::string encodeMessage(const std::string& text); + std::pair decodeMessage(const std::string& str); + void appendMessage(const std::string& message); + + private: + std::string _user; + std::string _session; + int _port; + + ix::WebSocket _webSocket; + + std::vector _receivedMessages; + mutable std::mutex _mutex; + }; + + WebSocketChat::WebSocketChat(const std::string& user, const std::string& session, int port) + : _user(user) + , _session(session) + , _port(port) + { + _webSocket.setTLSOptions(makeClientTLSOptions()); + } + + size_t WebSocketChat::getReceivedMessagesCount() const + { + std::lock_guard lock(_mutex); + return _receivedMessages.size(); + } + + const std::vector& WebSocketChat::getReceivedMessages() const + { + std::lock_guard lock(_mutex); + return _receivedMessages; + } + + void WebSocketChat::appendMessage(const std::string& message) + { + std::lock_guard lock(_mutex); + _receivedMessages.push_back(message); + } + + bool WebSocketChat::isReady() const + { + return _webSocket.getReadyState() == ix::ReadyState::Open; + } + + void WebSocketChat::stop() + { + _webSocket.stop(); + } + + void WebSocketChat::start() + { + std::string url; + { + bool preferTLS = true; + url = makeCobraEndpoint(_port, preferTLS); + } + + _webSocket.setUrl(url); + + std::stringstream ss; + log(std::string("Connecting to url: ") + url); + + _webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) { + std::stringstream ss; + if (msg->type == ix::WebSocketMessageType::Open) + { + ss << "websocket_broadcast_client: " << _user << " Connected !"; + log(ss.str()); + } + else if (msg->type == ix::WebSocketMessageType::Close) + { + ss << "websocket_broadcast_client: " << _user << " disconnected !"; + log(ss.str()); + } + else if (msg->type == ix::WebSocketMessageType::Message) + { + auto result = decodeMessage(msg->str); + + // Our "chat" / "broacast" node.js server does not send us + // the messages we send, so we don't need to have a msg_user != user + // as we do for the satori chat example. + + // store text + appendMessage(result.second); + + std::string payload = result.second; + if (payload.size() > 2000) + { + payload = ""; + } + + ss << std::endl << result.first << " > " << payload << std::endl << _user << " > "; + log(ss.str()); + } + else if (msg->type == ix::WebSocketMessageType::Error) + { + ss << "websocket_broadcast_client: " << _user << " Error ! " << msg->errorInfo.reason; + log(ss.str()); + } + else if (msg->type == ix::WebSocketMessageType::Ping) + { + log("websocket_broadcast_client: received ping message"); + } + else if (msg->type == ix::WebSocketMessageType::Pong) + { + log("websocket_broadcast_client: received pong message"); + } + else if (msg->type == ix::WebSocketMessageType::Fragment) + { + log("websocket_broadcast_client: received message fragment"); + } + else + { + ss << "Unexpected ix::WebSocketMessageType"; + log(ss.str()); + } + }); + + _webSocket.start(); + } + + std::pair WebSocketChat::decodeMessage(const std::string& str) + { + std::string errMsg; + MsgPack msg = MsgPack::parse(str, errMsg); + + std::string msg_user = msg["user"].string_value(); + std::string msg_text = msg["text"].string_value(); + + return std::pair(msg_user, msg_text); + } + + std::string WebSocketChat::encodeMessage(const std::string& text) + { + std::map obj; + obj["user"] = _user; + obj["text"] = text; + + MsgPack msg(obj); + + std::string output = msg.dump(); + return output; + } + + void WebSocketChat::sendMessage(const std::string& text) + { + _webSocket.sendBinary(encodeMessage(text)); + } + + bool startServer(ix::WebSocketServer& server, std::string& connectionId) + { + bool preferTLS = true; + server.setTLSOptions(makeServerTLSOptions(preferTLS)); + + server.setOnConnectionCallback([&server, &connectionId]( + std::shared_ptr webSocket, + std::shared_ptr connectionState) { + webSocket->setOnMessageCallback([webSocket, connectionState, &connectionId, &server]( + const ix::WebSocketMessagePtr& msg) { + if (msg->type == ix::WebSocketMessageType::Open) + { + TLogger() << "New connection"; + connectionState->computeId(); + TLogger() << "id: " << connectionState->getId(); + TLogger() << "Uri: " << msg->openInfo.uri; + TLogger() << "Headers:"; + for (auto it : msg->openInfo.headers) + { + TLogger() << it.first << ": " << it.second; + } + + connectionId = connectionState->getId(); + } + else if (msg->type == ix::WebSocketMessageType::Close) + { + TLogger() << "Closed connection"; + } + else if (msg->type == ix::WebSocketMessageType::Message) + { + for (auto&& client : server.getClients()) + { + if (client != webSocket) + { + client->send(msg->str, msg->binary); + } + } + } + }); + }); + + auto res = server.listen(); + if (!res.first) + { + TLogger() << res.second; + return false; + } + + server.start(); + return true; + } +} // namespace ix + +TEST_CASE("Websocket_broadcast_server", "[websocket_server]") +{ + SECTION("Connect to the server, do not send anything. Should timeout and return 400") + { + int port = getFreePort(); + ix::WebSocketServer server(port); + std::string connectionId; + REQUIRE(startServer(server, connectionId)); + + std::string session = ix::generateSessionId(); + std::vector> chatClients; + for (int i = 0 ; i < 10; ++i) + { + std::string user("user_" + std::to_string(i)); + chatClients.push_back(std::make_shared(user, session, port)); + chatClients[i]->start(); + ix::msleep(50); + } + + // Wait for all chat instance to be ready + while (true) + { + bool allReady = true; + for (size_t i = 0 ; i < chatClients.size(); ++i) + { + allReady &= chatClients[i]->isReady(); + } + if (allReady) break; + ix::msleep(10); + } + + for (int j = 0; j < 1000; j++) + { + for (size_t i = 0 ; i < chatClients.size(); ++i) + { + chatClients[i]->sendMessage("hello world"); + } + + if (j == 250) + { + server.stop(); + ix::msleep(100); + } + if (j == 500) + { + server.start(); + ix::msleep(100); + } + } + + // wait 1 second + ix::msleep(2000); + + // Stop all clients + size_t messageCount = chatClients.size() * 50; + for (size_t i = 0 ; i < chatClients.size(); ++i) + { + REQUIRE(chatClients[i]->getReceivedMessagesCount() >= messageCount); + chatClients[i]->stop(); + } + + // Give us 500ms for the server to notice that clients went away + ix::msleep(500); + server.stop(); + REQUIRE(server.getClients().size() == 0); + } +} diff --git a/test/IXWebSocketServerTest.cpp b/test/IXWebSocketServerTest.cpp index c51304bf..a53c697d 100644 --- a/test/IXWebSocketServerTest.cpp +++ b/test/IXWebSocketServerTest.cpp @@ -62,7 +62,7 @@ namespace ix { if (client != webSocket) { - client->send(msg->str); + client->send(msg->str, msg->binary); } } }