Merge branch 'machinezone:master' into master
This commit is contained in:
@@ -2,71 +2,45 @@
|
||||
# Author: Benjamin Sergeant
|
||||
# Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
|
||||
#
|
||||
cmake_minimum_required (VERSION 3.4.1)
|
||||
cmake_minimum_required (VERSION 3.14)
|
||||
project (ixwebsocket_unittest)
|
||||
|
||||
set (CMAKE_CXX_STANDARD 14)
|
||||
set (CMAKE_CXX_STANDARD 11)
|
||||
|
||||
if (MAC)
|
||||
set(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/../third_party/sanitizers-cmake/cmake" ${CMAKE_MODULE_PATH})
|
||||
find_package(Sanitizers)
|
||||
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread")
|
||||
set(CMAKE_LD_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread")
|
||||
option(USE_TLS "Add TLS support" ON)
|
||||
endif()
|
||||
|
||||
include_directories(
|
||||
${PROJECT_SOURCE_DIR}/Catch2/single_include
|
||||
../third_party
|
||||
../third_party/msgpack11
|
||||
../third_party/spdlog/include
|
||||
../ws
|
||||
)
|
||||
|
||||
add_definitions(-DSPDLOG_COMPILED_LIB=1)
|
||||
|
||||
find_package(JsonCpp)
|
||||
if (NOT JSONCPP_FOUND)
|
||||
include_directories(../third_party/jsoncpp)
|
||||
set(JSONCPP_SOURCES ../third_party/jsoncpp/jsoncpp.cpp)
|
||||
endif()
|
||||
option(USE_TLS "Add TLS support" ON)
|
||||
|
||||
# Shared sources
|
||||
set (SOURCES
|
||||
${JSONCPP_SOURCES}
|
||||
|
||||
test_runner.cpp
|
||||
IXTest.cpp
|
||||
IXGetFreePort.cpp
|
||||
../third_party/msgpack11/msgpack11.cpp
|
||||
|
||||
IXSocketTest.cpp
|
||||
IXSocketConnectTest.cpp
|
||||
IXWebSocketServerTest.cpp
|
||||
IXWebSocketTestConnectionDisconnection.cpp
|
||||
IXUrlParserTest.cpp
|
||||
IXWebSocketServerTest.cpp
|
||||
IXHttpClientTest.cpp
|
||||
IXHttpServerTest.cpp
|
||||
IXUnityBuildsTest.cpp
|
||||
IXHttpTest.cpp
|
||||
IXDNSLookupTest.cpp
|
||||
IXWebSocketSubProtocolTest.cpp
|
||||
IXSentryClientTest.cpp
|
||||
IXWebSocketChatTest.cpp
|
||||
IXWebSocketBroadcastTest.cpp
|
||||
set (TEST_TARGET_NAMES
|
||||
IXSocketTest
|
||||
IXSocketConnectTest
|
||||
IXWebSocketServerTest
|
||||
IXWebSocketTestConnectionDisconnection
|
||||
IXUrlParserTest
|
||||
IXHttpClientTest
|
||||
IXUnityBuildsTest
|
||||
IXHttpTest
|
||||
IXDNSLookupTest
|
||||
IXWebSocketSubProtocolTest
|
||||
# IXWebSocketBroadcastTest ## FIXME was depending on cobra / take a broadcast server from ws
|
||||
IXStrCaseCompareTest
|
||||
)
|
||||
|
||||
# Some unittest don't work on windows yet
|
||||
# Windows without TLS does not have hmac yet
|
||||
if (UNIX)
|
||||
list(APPEND SOURCES
|
||||
IXWebSocketCloseTest.cpp
|
||||
IXCobraChatTest.cpp
|
||||
IXCobraMetricsPublisherTest.cpp
|
||||
IXCobraToSentryBotTest.cpp
|
||||
IXCobraToStatsdBotTest.cpp
|
||||
IXCobraToStdoutBotTest.cpp
|
||||
list(APPEND TEST_TARGET_NAMES
|
||||
IXWebSocketCloseTest
|
||||
|
||||
# Fail on Windows in CI probably because the pathing is wrong and
|
||||
# some resource files cannot be found
|
||||
IXHttpServerTest
|
||||
IXWebSocketChatTest
|
||||
)
|
||||
endif()
|
||||
|
||||
if (USE_ZLIB)
|
||||
list(APPEND TEST_TARGET_NAMES
|
||||
IXWebSocketPerMessageDeflateCompressorTest
|
||||
)
|
||||
endif()
|
||||
|
||||
@@ -74,32 +48,51 @@ endif()
|
||||
# IXWebSocketPingTest.cpp
|
||||
# IXWebSocketPingTimeoutTest.cpp
|
||||
|
||||
# IXWebSocketLeakTest.cpp # commented until we have a fix for #224 /
|
||||
# that was was fixed but now the test does not compile
|
||||
|
||||
# Disable tests for now that are failing or not reliable
|
||||
|
||||
add_executable(ixwebsocket_unittest ${SOURCES})
|
||||
add_library(ixwebsocket_test)
|
||||
target_sources(ixwebsocket_test PRIVATE
|
||||
${JSONCPP_SOURCES}
|
||||
test_runner.cpp
|
||||
IXTest.cpp
|
||||
../third_party/msgpack11/msgpack11.cpp
|
||||
)
|
||||
target_compile_definitions(ixwebsocket_test PRIVATE ${TEST_PROGRAMS_DEFINITIONS})
|
||||
target_include_directories(ixwebsocket_test PRIVATE
|
||||
${PROJECT_SOURCE_DIR}/Catch2/single_include
|
||||
../third_party
|
||||
)
|
||||
target_link_libraries(ixwebsocket_test ixwebsocket)
|
||||
target_link_libraries(ixwebsocket_test spdlog)
|
||||
|
||||
if (MAC)
|
||||
add_sanitizers(ixwebsocket_unittest)
|
||||
endif()
|
||||
foreach(TEST_TARGET_NAME ${TEST_TARGET_NAMES})
|
||||
add_executable(${TEST_TARGET_NAME}
|
||||
${TEST_TARGET_NAME}.cpp
|
||||
)
|
||||
|
||||
if (APPLE AND USE_TLS)
|
||||
target_link_libraries(ixwebsocket_unittest "-framework foundation" "-framework security")
|
||||
endif()
|
||||
target_include_directories(${TEST_TARGET_NAME} PRIVATE
|
||||
${PROJECT_SOURCE_DIR}/Catch2/single_include
|
||||
../third_party
|
||||
../third_party/msgpack11
|
||||
)
|
||||
|
||||
if (JSONCPP_FOUND)
|
||||
target_include_directories(ixwebsocket_unittest PUBLIC ${JSONCPP_INCLUDE_DIRS})
|
||||
target_link_libraries(ixwebsocket_unittest ${JSONCPP_LIBRARIES})
|
||||
endif()
|
||||
target_compile_definitions(${TEST_TARGET_NAME} PRIVATE SPDLOG_COMPILED_LIB=1)
|
||||
|
||||
# library with the most dependencies come first
|
||||
target_link_libraries(ixwebsocket_unittest ixbots)
|
||||
target_link_libraries(ixwebsocket_unittest ixsnake)
|
||||
target_link_libraries(ixwebsocket_unittest ixcobra)
|
||||
target_link_libraries(ixwebsocket_unittest ixsentry)
|
||||
target_link_libraries(ixwebsocket_unittest ixwebsocket)
|
||||
target_link_libraries(ixwebsocket_unittest ixcrypto)
|
||||
target_link_libraries(ixwebsocket_unittest ixcore)
|
||||
if (APPLE AND USE_TLS)
|
||||
target_link_libraries(${TEST_TARGET_NAME} "-framework foundation" "-framework security")
|
||||
endif()
|
||||
|
||||
target_link_libraries(ixwebsocket_unittest spdlog)
|
||||
# library with the most dependencies come first
|
||||
target_link_libraries(${TEST_TARGET_NAME} ixwebsocket_test)
|
||||
target_link_libraries(${TEST_TARGET_NAME} ixwebsocket)
|
||||
|
||||
install(TARGETS ixwebsocket_unittest DESTINATION bin)
|
||||
target_link_libraries(${TEST_TARGET_NAME} spdlog)
|
||||
|
||||
add_test(NAME ${TEST_TARGET_NAME}
|
||||
COMMAND ${TEST_TARGET_NAME}
|
||||
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
|
||||
|
||||
endforeach()
|
||||
|
@@ -1,349 +0,0 @@
|
||||
/*
|
||||
* cmd_satori_chat.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2017 Machine Zone. All rights reserved.
|
||||
*/
|
||||
|
||||
#include "IXTest.h"
|
||||
#include "catch.hpp"
|
||||
#include <chrono>
|
||||
#include <iostream>
|
||||
#include <ixcobra/IXCobraConnection.h>
|
||||
#include <ixcrypto/IXUuid.h>
|
||||
#include <ixsnake/IXRedisServer.h>
|
||||
#include <ixsnake/IXSnakeServer.h>
|
||||
|
||||
using namespace ix;
|
||||
|
||||
namespace
|
||||
{
|
||||
std::atomic<size_t> incomingBytes(0);
|
||||
std::atomic<size_t> outgoingBytes(0);
|
||||
|
||||
void setupTrafficTrackerCallback()
|
||||
{
|
||||
ix::CobraConnection::setTrafficTrackerCallback([](size_t size, bool incoming) {
|
||||
if (incoming)
|
||||
{
|
||||
incomingBytes += size;
|
||||
}
|
||||
else
|
||||
{
|
||||
outgoingBytes += size;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
class CobraChat
|
||||
{
|
||||
public:
|
||||
CobraChat(const std::string& user,
|
||||
const std::string& session,
|
||||
const ix::CobraConfig& config);
|
||||
|
||||
void subscribe(const std::string& channel);
|
||||
void start();
|
||||
void stop();
|
||||
void run();
|
||||
bool isReady() const;
|
||||
|
||||
void sendMessage(const std::string& text);
|
||||
size_t getReceivedMessagesCount() const;
|
||||
|
||||
bool hasPendingMessages() const;
|
||||
Json::Value popMessage();
|
||||
|
||||
private:
|
||||
std::string _user;
|
||||
std::string _session;
|
||||
ix::CobraConfig _cobraConfig;
|
||||
|
||||
std::queue<Json::Value> _publish_queue;
|
||||
mutable std::mutex _queue_mutex;
|
||||
|
||||
std::thread _thread;
|
||||
std::atomic<bool> _stop;
|
||||
|
||||
ix::CobraConnection _conn;
|
||||
std::atomic<bool> _connectedAndSubscribed;
|
||||
|
||||
std::queue<Json::Value> _receivedQueue;
|
||||
|
||||
std::mutex _logMutex;
|
||||
};
|
||||
|
||||
CobraChat::CobraChat(const std::string& user,
|
||||
const std::string& session,
|
||||
const ix::CobraConfig& config)
|
||||
: _user(user)
|
||||
, _session(session)
|
||||
, _cobraConfig(config)
|
||||
, _stop(false)
|
||||
, _connectedAndSubscribed(false)
|
||||
{
|
||||
}
|
||||
|
||||
void CobraChat::start()
|
||||
{
|
||||
_thread = std::thread(&CobraChat::run, this);
|
||||
}
|
||||
|
||||
void CobraChat::stop()
|
||||
{
|
||||
_stop = true;
|
||||
_thread.join();
|
||||
}
|
||||
|
||||
bool CobraChat::isReady() const
|
||||
{
|
||||
return _connectedAndSubscribed;
|
||||
}
|
||||
|
||||
size_t CobraChat::getReceivedMessagesCount() const
|
||||
{
|
||||
return _receivedQueue.size();
|
||||
}
|
||||
|
||||
bool CobraChat::hasPendingMessages() const
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(_queue_mutex);
|
||||
return !_publish_queue.empty();
|
||||
}
|
||||
|
||||
Json::Value CobraChat::popMessage()
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(_queue_mutex);
|
||||
auto msg = _publish_queue.front();
|
||||
_publish_queue.pop();
|
||||
return msg;
|
||||
}
|
||||
|
||||
//
|
||||
// Callback to handle received messages, that are printed on the console
|
||||
//
|
||||
void CobraChat::subscribe(const std::string& channel)
|
||||
{
|
||||
std::string filter;
|
||||
std::string position("$");
|
||||
|
||||
_conn.subscribe(channel,
|
||||
filter,
|
||||
position,
|
||||
[this](const Json::Value& msg, const std::string& /*position*/) {
|
||||
spdlog::info("receive {}", msg.toStyledString());
|
||||
|
||||
if (!msg.isObject()) return;
|
||||
if (!msg.isMember("user")) return;
|
||||
if (!msg.isMember("text")) return;
|
||||
if (!msg.isMember("session")) return;
|
||||
|
||||
std::string msg_user = msg["user"].asString();
|
||||
std::string msg_text = msg["text"].asString();
|
||||
std::string msg_session = msg["session"].asString();
|
||||
|
||||
// We are not interested in messages
|
||||
// from a different session.
|
||||
if (msg_session != _session) return;
|
||||
|
||||
// We are not interested in our own messages
|
||||
if (msg_user == _user) return;
|
||||
|
||||
_receivedQueue.push(msg);
|
||||
|
||||
std::stringstream ss;
|
||||
ss << std::endl
|
||||
<< msg_user << " > " << msg_text << std::endl
|
||||
<< _user << " > ";
|
||||
log(ss.str());
|
||||
});
|
||||
}
|
||||
|
||||
void CobraChat::sendMessage(const std::string& text)
|
||||
{
|
||||
Json::Value msg;
|
||||
msg["user"] = _user;
|
||||
msg["session"] = _session;
|
||||
msg["text"] = text;
|
||||
|
||||
std::unique_lock<std::mutex> lock(_queue_mutex);
|
||||
_publish_queue.push(msg);
|
||||
}
|
||||
|
||||
//
|
||||
// Do satori communication on a background thread, where we can have
|
||||
// something like an event loop that publish, poll and receive data
|
||||
//
|
||||
void CobraChat::run()
|
||||
{
|
||||
std::string channel = _session;
|
||||
|
||||
_conn.configure(_cobraConfig);
|
||||
_conn.connect();
|
||||
|
||||
_conn.setEventCallback([this, channel](const CobraEventPtr& event) {
|
||||
if (event->type == ix::CobraEventType::Open)
|
||||
{
|
||||
log("Subscriber connected: " + _user);
|
||||
for (auto&& it : event->headers)
|
||||
{
|
||||
log("Headers " + it.first + " " + it.second);
|
||||
}
|
||||
}
|
||||
else if (event->type == ix::CobraEventType::Authenticated)
|
||||
{
|
||||
log("Subscriber authenticated: " + _user);
|
||||
subscribe(channel);
|
||||
}
|
||||
else if (event->type == ix::CobraEventType::Error)
|
||||
{
|
||||
log(event->errMsg + _user);
|
||||
}
|
||||
else if (event->type == ix::CobraEventType::Closed)
|
||||
{
|
||||
log("Connection closed: " + _user);
|
||||
}
|
||||
else if (event->type == ix::CobraEventType::Subscribed)
|
||||
{
|
||||
log("Subscription ok: " + _user + " subscription_id " + event->subscriptionId);
|
||||
_connectedAndSubscribed = true;
|
||||
}
|
||||
else if (event->type == ix::CobraEventType::UnSubscribed)
|
||||
{
|
||||
log("Unsubscription ok: " + _user + " subscription_id " + event->subscriptionId);
|
||||
}
|
||||
else if (event->type == ix::CobraEventType::Published)
|
||||
{
|
||||
TLogger() << "Subscriber: published message acked: " << event->msgId;
|
||||
}
|
||||
});
|
||||
|
||||
while (!_stop)
|
||||
{
|
||||
{
|
||||
while (hasPendingMessages())
|
||||
{
|
||||
auto msg = popMessage();
|
||||
|
||||
std::string text = msg["text"].asString();
|
||||
|
||||
std::stringstream ss;
|
||||
ss << "Sending msg [" << text << "]";
|
||||
log(ss.str());
|
||||
|
||||
Json::Value channels;
|
||||
channels.append(channel);
|
||||
_conn.publish(channels, msg);
|
||||
}
|
||||
}
|
||||
|
||||
ix::msleep(50);
|
||||
}
|
||||
|
||||
_conn.unsubscribe(channel);
|
||||
|
||||
ix::msleep(50);
|
||||
_conn.disconnect();
|
||||
|
||||
_conn.setEventCallback([](const CobraEventPtr& /*event*/) {});
|
||||
}
|
||||
} // namespace
|
||||
|
||||
TEST_CASE("Cobra_chat", "[cobra_chat]")
|
||||
{
|
||||
SECTION("Exchange and count sent/received messages.")
|
||||
{
|
||||
int port = getFreePort();
|
||||
snake::AppConfig appConfig = makeSnakeServerConfig(port, true);
|
||||
|
||||
// Start a redis server
|
||||
ix::RedisServer redisServer(appConfig.redisPort);
|
||||
auto res = redisServer.listen();
|
||||
REQUIRE(res.first);
|
||||
redisServer.start();
|
||||
|
||||
// Start a snake server
|
||||
snake::SnakeServer snakeServer(appConfig);
|
||||
snakeServer.run();
|
||||
|
||||
int timeout;
|
||||
setupTrafficTrackerCallback();
|
||||
|
||||
std::string session = ix::generateSessionId();
|
||||
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
|
||||
std::string role = "_sub";
|
||||
std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
|
||||
std::string endpoint = makeCobraEndpoint(port, true);
|
||||
|
||||
ix::CobraConfig config;
|
||||
config.endpoint = endpoint;
|
||||
config.appkey = appkey;
|
||||
config.rolename = role;
|
||||
config.rolesecret = secret;
|
||||
config.socketTLSOptions = makeClientTLSOptions();
|
||||
|
||||
CobraChat chatA("jean", session, config);
|
||||
CobraChat chatB("paul", session, config);
|
||||
|
||||
chatA.start();
|
||||
chatB.start();
|
||||
|
||||
// Wait for all chat instance to be ready
|
||||
timeout = 10 * 1000; // 10s
|
||||
while (true)
|
||||
{
|
||||
if (chatA.isReady() && chatB.isReady()) break;
|
||||
ix::msleep(10);
|
||||
|
||||
timeout -= 10;
|
||||
if (timeout <= 0)
|
||||
{
|
||||
snakeServer.stop();
|
||||
redisServer.stop();
|
||||
REQUIRE(false); // timeout
|
||||
}
|
||||
}
|
||||
|
||||
// Add a bit of extra time, for the subscription to be active
|
||||
ix::msleep(1000);
|
||||
|
||||
chatA.sendMessage("from A1");
|
||||
chatA.sendMessage("from A2");
|
||||
chatA.sendMessage("from A3");
|
||||
|
||||
chatB.sendMessage("from B1");
|
||||
chatB.sendMessage("from B2");
|
||||
|
||||
// 1. Wait for all messages to be sent
|
||||
timeout = 10 * 1000; // 10s
|
||||
while (chatA.hasPendingMessages() || chatB.hasPendingMessages())
|
||||
{
|
||||
ix::msleep(10);
|
||||
|
||||
timeout -= 10;
|
||||
if (timeout <= 0)
|
||||
{
|
||||
snakeServer.stop();
|
||||
redisServer.stop();
|
||||
REQUIRE(false); // timeout
|
||||
}
|
||||
}
|
||||
|
||||
// Give us 1s for all messages to be received
|
||||
ix::msleep(1000);
|
||||
|
||||
chatA.stop();
|
||||
chatB.stop();
|
||||
|
||||
REQUIRE(chatA.getReceivedMessagesCount() == 2);
|
||||
REQUIRE(chatB.getReceivedMessagesCount() == 3);
|
||||
|
||||
spdlog::info("Incoming bytes {}", incomingBytes);
|
||||
spdlog::info("Outgoing bytes {}", outgoingBytes);
|
||||
|
||||
spdlog::info("Stopping snake server...");
|
||||
snakeServer.stop();
|
||||
|
||||
spdlog::info("Stopping redis server...");
|
||||
redisServer.stop();
|
||||
}
|
||||
}
|
@@ -1,301 +0,0 @@
|
||||
/*
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2018 Machine Zone. All rights reserved.
|
||||
*/
|
||||
|
||||
#include "IXTest.h"
|
||||
#include "catch.hpp"
|
||||
#include <iostream>
|
||||
#include <ixcobra/IXCobraMetricsPublisher.h>
|
||||
#include <ixcrypto/IXUuid.h>
|
||||
#include <ixsnake/IXRedisServer.h>
|
||||
#include <ixsnake/IXSnakeServer.h>
|
||||
#include <set>
|
||||
|
||||
using namespace ix;
|
||||
|
||||
namespace
|
||||
{
|
||||
std::atomic<size_t> incomingBytes(0);
|
||||
std::atomic<size_t> outgoingBytes(0);
|
||||
|
||||
void setupTrafficTrackerCallback()
|
||||
{
|
||||
ix::CobraConnection::setTrafficTrackerCallback([](size_t size, bool incoming) {
|
||||
if (incoming)
|
||||
{
|
||||
incomingBytes += size;
|
||||
}
|
||||
else
|
||||
{
|
||||
outgoingBytes += size;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
std::atomic<bool> gStop;
|
||||
std::atomic<bool> gSubscriberConnectedAndSubscribed;
|
||||
std::atomic<size_t> gUniqueMessageIdsCount;
|
||||
std::atomic<int> gMessageCount;
|
||||
|
||||
std::set<std::string> gIds;
|
||||
std::mutex gProtectIds; // std::set is no thread-safe, so protect access with this mutex.
|
||||
|
||||
//
|
||||
// Background thread subscribe to the channel and validates what was sent
|
||||
//
|
||||
void startSubscriber(const ix::CobraConfig& config, const std::string& channel)
|
||||
{
|
||||
gSubscriberConnectedAndSubscribed = false;
|
||||
gUniqueMessageIdsCount = 0;
|
||||
gMessageCount = 0;
|
||||
|
||||
ix::CobraConnection conn;
|
||||
conn.configure(config);
|
||||
conn.connect();
|
||||
|
||||
conn.setEventCallback([&conn, &channel](const CobraEventPtr& event) {
|
||||
if (event->type == ix::CobraEventType::Open)
|
||||
{
|
||||
TLogger() << "Subscriber connected:";
|
||||
for (auto&& it : event->headers)
|
||||
{
|
||||
log("Headers " + it.first + " " + it.second);
|
||||
}
|
||||
}
|
||||
else if (event->type == ix::CobraEventType::Closed)
|
||||
{
|
||||
TLogger() << "Subscriber closed:" << event->errMsg;
|
||||
}
|
||||
else if (event->type == ix::CobraEventType::Error)
|
||||
{
|
||||
TLogger() << "Subscriber error:" << event->errMsg;
|
||||
}
|
||||
else if (event->type == ix::CobraEventType::Authenticated)
|
||||
{
|
||||
log("Subscriber authenticated");
|
||||
std::string filter;
|
||||
std::string position("$");
|
||||
|
||||
conn.subscribe(channel,
|
||||
filter,
|
||||
position,
|
||||
[](const Json::Value& msg, const std::string& /*position*/) {
|
||||
log(msg.toStyledString());
|
||||
|
||||
std::string id = msg["id"].asString();
|
||||
{
|
||||
std::lock_guard<std::mutex> guard(gProtectIds);
|
||||
gIds.insert(id);
|
||||
}
|
||||
|
||||
gMessageCount++;
|
||||
});
|
||||
}
|
||||
else if (event->type == ix::CobraEventType::Subscribed)
|
||||
{
|
||||
TLogger() << "Subscriber: subscribed to channel " << event->subscriptionId;
|
||||
if (event->subscriptionId == channel)
|
||||
{
|
||||
gSubscriberConnectedAndSubscribed = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
TLogger() << "Subscriber: unexpected channel " << event->subscriptionId;
|
||||
}
|
||||
}
|
||||
else if (event->type == ix::CobraEventType::UnSubscribed)
|
||||
{
|
||||
TLogger() << "Subscriber: ununexpected from channel " << event->subscriptionId;
|
||||
if (event->subscriptionId != channel)
|
||||
{
|
||||
TLogger() << "Subscriber: unexpected channel " << event->subscriptionId;
|
||||
}
|
||||
}
|
||||
else if (event->type == ix::CobraEventType::Published)
|
||||
{
|
||||
TLogger() << "Subscriber: published message acked: " << event->msgId;
|
||||
}
|
||||
});
|
||||
|
||||
while (!gStop)
|
||||
{
|
||||
std::chrono::duration<double, std::milli> duration(10);
|
||||
std::this_thread::sleep_for(duration);
|
||||
}
|
||||
|
||||
conn.unsubscribe(channel);
|
||||
conn.disconnect();
|
||||
|
||||
gUniqueMessageIdsCount = gIds.size();
|
||||
}
|
||||
|
||||
// publish 100 messages, during roughly 100ms
|
||||
// this is used to test thread safety of CobraMetricsPublisher::push
|
||||
void runAdditionalPublisher(ix::CobraMetricsPublisher* cobraMetricsPublisher)
|
||||
{
|
||||
Json::Value data;
|
||||
data["foo"] = "bar";
|
||||
|
||||
for (int i = 0; i < 100; ++i)
|
||||
{
|
||||
cobraMetricsPublisher->push("sms_metric_F_id", data);
|
||||
ix::msleep(1);
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
TEST_CASE("Cobra_Metrics_Publisher", "[cobra]")
|
||||
{
|
||||
int port = getFreePort();
|
||||
bool preferTLS = true;
|
||||
snake::AppConfig appConfig = makeSnakeServerConfig(port, preferTLS);
|
||||
|
||||
// Start a redis server
|
||||
ix::RedisServer redisServer(appConfig.redisPort);
|
||||
auto res = redisServer.listen();
|
||||
REQUIRE(res.first);
|
||||
redisServer.start();
|
||||
|
||||
// Start a snake server
|
||||
snake::SnakeServer snakeServer(appConfig);
|
||||
snakeServer.run();
|
||||
|
||||
setupTrafficTrackerCallback();
|
||||
|
||||
std::string channel = ix::generateSessionId();
|
||||
std::string endpoint = makeCobraEndpoint(port, preferTLS);
|
||||
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
|
||||
std::string role = "_sub";
|
||||
std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
|
||||
|
||||
ix::CobraConfig config;
|
||||
config.endpoint = endpoint;
|
||||
config.appkey = appkey;
|
||||
config.rolename = role;
|
||||
config.rolesecret = secret;
|
||||
config.socketTLSOptions = makeClientTLSOptions();
|
||||
|
||||
gStop = false;
|
||||
std::thread subscriberThread(&startSubscriber, config, channel);
|
||||
|
||||
int timeout = 10 * 1000; // 10s
|
||||
|
||||
// Wait until the subscriber is ready (authenticated + subscription successful)
|
||||
while (!gSubscriberConnectedAndSubscribed)
|
||||
{
|
||||
std::chrono::duration<double, std::milli> duration(10);
|
||||
std::this_thread::sleep_for(duration);
|
||||
|
||||
timeout -= 10;
|
||||
if (timeout <= 0)
|
||||
{
|
||||
snakeServer.stop();
|
||||
redisServer.stop();
|
||||
REQUIRE(false); // timeout
|
||||
}
|
||||
}
|
||||
|
||||
ix::CobraMetricsPublisher cobraMetricsPublisher;
|
||||
cobraMetricsPublisher.configure(config, channel);
|
||||
cobraMetricsPublisher.setSession(uuid4());
|
||||
cobraMetricsPublisher.enable(true);
|
||||
|
||||
Json::Value data;
|
||||
data["foo"] = "bar";
|
||||
|
||||
// (1) Publish without restrictions
|
||||
cobraMetricsPublisher.push("sms_metric_A_id", data); // (msg #1)
|
||||
cobraMetricsPublisher.push("sms_metric_B_id", data); // (msg #2)
|
||||
|
||||
// (2) Restrict what is sent using a blacklist
|
||||
// Add one entry to the blacklist
|
||||
// (will send msg #3)
|
||||
cobraMetricsPublisher.setBlacklist({
|
||||
"sms_metric_B_id" // this id will not be sent
|
||||
});
|
||||
// (msg #4)
|
||||
cobraMetricsPublisher.push("sms_metric_A_id", data);
|
||||
// ...
|
||||
cobraMetricsPublisher.push("sms_metric_B_id", data); // this won't be sent
|
||||
|
||||
// Reset the blacklist
|
||||
// (msg #5)
|
||||
cobraMetricsPublisher.setBlacklist({}); // 4.
|
||||
|
||||
// (3) Restrict what is sent using rate control
|
||||
|
||||
// (msg #6)
|
||||
cobraMetricsPublisher.setRateControl({
|
||||
{"sms_metric_C_id", 1}, // published once per minute (60 seconds) max
|
||||
});
|
||||
// (msg #7)
|
||||
cobraMetricsPublisher.push("sms_metric_C_id", data);
|
||||
cobraMetricsPublisher.push("sms_metric_C_id", data); // this won't be sent
|
||||
|
||||
ix::msleep(1400);
|
||||
|
||||
// (msg #8)
|
||||
cobraMetricsPublisher.push("sms_metric_C_id", data); // now this will be sent
|
||||
|
||||
ix::msleep(600); // wait a bit so that the last message is sent and can be received
|
||||
|
||||
log("Testing suspend/resume now, which will disconnect the cobraMetricsPublisher.");
|
||||
|
||||
// Test suspend + resume
|
||||
for (int i = 0; i < 3; ++i)
|
||||
{
|
||||
cobraMetricsPublisher.suspend();
|
||||
ix::msleep(500);
|
||||
REQUIRE(!cobraMetricsPublisher.isConnected()); // Check that we are not connected anymore
|
||||
|
||||
cobraMetricsPublisher.push("sms_metric_D_id", data); // will not be sent this time
|
||||
|
||||
cobraMetricsPublisher.resume();
|
||||
ix::msleep(2000); // give cobra 2s to connect
|
||||
REQUIRE(cobraMetricsPublisher.isConnected()); // Check that we are connected now
|
||||
|
||||
cobraMetricsPublisher.push("sms_metric_E_id", data);
|
||||
}
|
||||
|
||||
ix::msleep(500);
|
||||
|
||||
// Test multi-threaded publish
|
||||
std::thread bgPublisher1(&runAdditionalPublisher, &cobraMetricsPublisher);
|
||||
std::thread bgPublisher2(&runAdditionalPublisher, &cobraMetricsPublisher);
|
||||
std::thread bgPublisher3(&runAdditionalPublisher, &cobraMetricsPublisher);
|
||||
std::thread bgPublisher4(&runAdditionalPublisher, &cobraMetricsPublisher);
|
||||
std::thread bgPublisher5(&runAdditionalPublisher, &cobraMetricsPublisher);
|
||||
|
||||
bgPublisher1.join();
|
||||
bgPublisher2.join();
|
||||
bgPublisher3.join();
|
||||
bgPublisher4.join();
|
||||
bgPublisher5.join();
|
||||
|
||||
// Now stop the thread
|
||||
gStop = true;
|
||||
subscriberThread.join();
|
||||
|
||||
//
|
||||
// Validate that we received all message kinds, and the correct number of messages
|
||||
//
|
||||
CHECK(gIds.count("sms_metric_A_id") == 1);
|
||||
CHECK(gIds.count("sms_metric_B_id") == 1);
|
||||
CHECK(gIds.count("sms_metric_C_id") == 1);
|
||||
CHECK(gIds.count("sms_metric_D_id") == 1);
|
||||
CHECK(gIds.count("sms_metric_E_id") == 1);
|
||||
CHECK(gIds.count("sms_metric_F_id") == 1);
|
||||
CHECK(gIds.count("sms_set_rate_control_id") == 1);
|
||||
CHECK(gIds.count("sms_set_blacklist_id") == 1);
|
||||
|
||||
spdlog::info("Incoming bytes {}", incomingBytes);
|
||||
spdlog::info("Outgoing bytes {}", outgoingBytes);
|
||||
|
||||
spdlog::info("Stopping snake server...");
|
||||
snakeServer.stop();
|
||||
|
||||
spdlog::info("Stopping redis server...");
|
||||
redisServer.stop();
|
||||
}
|
@@ -1,182 +0,0 @@
|
||||
/*
|
||||
* IXCobraToSentryTest.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2020 Machine Zone. All rights reserved.
|
||||
*/
|
||||
|
||||
#include "IXTest.h"
|
||||
#include "catch.hpp"
|
||||
#include <chrono>
|
||||
#include <iostream>
|
||||
#include <ixbots/IXCobraToSentryBot.h>
|
||||
#include <ixcobra/IXCobraConnection.h>
|
||||
#include <ixcobra/IXCobraMetricsPublisher.h>
|
||||
#include <ixcrypto/IXUuid.h>
|
||||
#include <ixsentry/IXSentryClient.h>
|
||||
#include <ixsnake/IXRedisServer.h>
|
||||
#include <ixsnake/IXSnakeServer.h>
|
||||
#include <ixwebsocket/IXHttpServer.h>
|
||||
#include <ixwebsocket/IXUserAgent.h>
|
||||
|
||||
using namespace ix;
|
||||
|
||||
namespace
|
||||
{
|
||||
std::atomic<size_t> incomingBytes(0);
|
||||
std::atomic<size_t> outgoingBytes(0);
|
||||
|
||||
void setupTrafficTrackerCallback()
|
||||
{
|
||||
ix::CobraConnection::setTrafficTrackerCallback([](size_t size, bool incoming) {
|
||||
if (incoming)
|
||||
{
|
||||
incomingBytes += size;
|
||||
}
|
||||
else
|
||||
{
|
||||
outgoingBytes += size;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void runPublisher(const ix::CobraConfig& config, const std::string& channel)
|
||||
{
|
||||
ix::CobraMetricsPublisher cobraMetricsPublisher;
|
||||
cobraMetricsPublisher.configure(config, channel);
|
||||
cobraMetricsPublisher.setSession(uuid4());
|
||||
cobraMetricsPublisher.enable(true);
|
||||
|
||||
Json::Value msg;
|
||||
msg["fps"] = 60;
|
||||
|
||||
cobraMetricsPublisher.setGenericAttributes("game", "ody");
|
||||
|
||||
// Wait a bit
|
||||
ix::msleep(500);
|
||||
|
||||
// publish some messages
|
||||
cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #1)
|
||||
cobraMetricsPublisher.push("sms_metric_B_id", msg); // (msg #2)
|
||||
ix::msleep(500);
|
||||
|
||||
cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #3)
|
||||
cobraMetricsPublisher.push("sms_metric_D_id", msg); // (msg #4)
|
||||
ix::msleep(500);
|
||||
|
||||
cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #4)
|
||||
cobraMetricsPublisher.push("sms_metric_F_id", msg); // (msg #5)
|
||||
ix::msleep(500);
|
||||
}
|
||||
} // namespace
|
||||
|
||||
TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]")
|
||||
{
|
||||
SECTION("Exchange and count sent/received messages.")
|
||||
{
|
||||
int port = getFreePort();
|
||||
snake::AppConfig appConfig = makeSnakeServerConfig(port, true);
|
||||
|
||||
// Start a redis server
|
||||
ix::RedisServer redisServer(appConfig.redisPort);
|
||||
auto res = redisServer.listen();
|
||||
REQUIRE(res.first);
|
||||
redisServer.start();
|
||||
|
||||
// Start a snake server
|
||||
snake::SnakeServer snakeServer(appConfig);
|
||||
snakeServer.run();
|
||||
|
||||
// Start a fake sentry http server
|
||||
SocketTLSOptions tlsOptionsServer = makeServerTLSOptions(true);
|
||||
|
||||
int sentryPort = getFreePort();
|
||||
ix::HttpServer sentryServer(sentryPort, "127.0.0.1");
|
||||
sentryServer.setTLSOptions(tlsOptionsServer);
|
||||
|
||||
sentryServer.setOnConnectionCallback(
|
||||
[](HttpRequestPtr request,
|
||||
std::shared_ptr<ConnectionState> /*connectionState*/) -> HttpResponsePtr {
|
||||
WebSocketHttpHeaders headers;
|
||||
headers["Server"] = userAgent();
|
||||
|
||||
// Log request
|
||||
std::stringstream ss;
|
||||
ss << request->method << " " << request->headers["User-Agent"] << " "
|
||||
<< request->uri;
|
||||
|
||||
if (request->method == "POST")
|
||||
{
|
||||
return std::make_shared<HttpResponse>(
|
||||
200, "OK", HttpErrorCode::Ok, headers, std::string());
|
||||
}
|
||||
else
|
||||
{
|
||||
return std::make_shared<HttpResponse>(
|
||||
405, "OK", HttpErrorCode::Invalid, headers, std::string("Invalid method"));
|
||||
}
|
||||
});
|
||||
|
||||
res = sentryServer.listen();
|
||||
REQUIRE(res.first);
|
||||
sentryServer.start();
|
||||
|
||||
setupTrafficTrackerCallback();
|
||||
|
||||
// Run the bot for a small amount of time
|
||||
std::string channel = ix::generateSessionId();
|
||||
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
|
||||
std::string role = "_sub";
|
||||
std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
|
||||
std::string endpoint = makeCobraEndpoint(port, true);
|
||||
|
||||
ix::CobraConfig config;
|
||||
config.endpoint = endpoint;
|
||||
config.appkey = appkey;
|
||||
config.rolename = role;
|
||||
config.rolesecret = secret;
|
||||
config.socketTLSOptions = makeClientTLSOptions();
|
||||
|
||||
std::thread publisherThread(runPublisher, config, channel);
|
||||
|
||||
ix::CobraBotConfig cobraBotConfig;
|
||||
cobraBotConfig.cobraConfig = config;
|
||||
cobraBotConfig.channel = channel;
|
||||
cobraBotConfig.runtime = 3; // Only run the bot for 3 seconds
|
||||
cobraBotConfig.enableHeartbeat = false;
|
||||
bool verbose = true;
|
||||
|
||||
// FIXME: try to get this working with https instead of http
|
||||
// to regress the TLS 1.3 OpenSSL bug
|
||||
// -> https://github.com/openssl/openssl/issues/7967
|
||||
// https://xxxxx:yyyyyy@sentry.io/1234567
|
||||
std::stringstream oss;
|
||||
oss << getHttpScheme() << "xxxxxxx:yyyyyyy@localhost:" << sentryPort << "/1234567";
|
||||
std::string dsn = oss.str();
|
||||
|
||||
SocketTLSOptions tlsOptionsClient = makeClientTLSOptions();
|
||||
|
||||
SentryClient sentryClient(dsn);
|
||||
sentryClient.setTLSOptions(tlsOptionsClient);
|
||||
|
||||
int64_t sentCount = cobra_to_sentry_bot(cobraBotConfig, sentryClient, verbose);
|
||||
//
|
||||
// We want at least 2 messages to be sent
|
||||
//
|
||||
REQUIRE(sentCount >= 2);
|
||||
|
||||
// Give us 1s for all messages to be received
|
||||
ix::msleep(1000);
|
||||
|
||||
spdlog::info("Incoming bytes {}", incomingBytes);
|
||||
spdlog::info("Outgoing bytes {}", outgoingBytes);
|
||||
|
||||
spdlog::info("Stopping snake server...");
|
||||
snakeServer.stop();
|
||||
|
||||
spdlog::info("Stopping redis server...");
|
||||
redisServer.stop();
|
||||
|
||||
publisherThread.join();
|
||||
sentryServer.stop();
|
||||
}
|
||||
}
|
@@ -1,133 +0,0 @@
|
||||
/*
|
||||
* IXCobraToStatsdTest.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2020 Machine Zone. All rights reserved.
|
||||
*/
|
||||
|
||||
#include "IXTest.h"
|
||||
#include "catch.hpp"
|
||||
#include <chrono>
|
||||
#include <iostream>
|
||||
#include <ixbots/IXCobraToStatsdBot.h>
|
||||
#include <ixcobra/IXCobraConnection.h>
|
||||
#include <ixcobra/IXCobraMetricsPublisher.h>
|
||||
#include <ixcrypto/IXUuid.h>
|
||||
#include <ixsentry/IXSentryClient.h>
|
||||
#include <ixsnake/IXRedisServer.h>
|
||||
#include <ixsnake/IXSnakeServer.h>
|
||||
#include <ixwebsocket/IXHttpServer.h>
|
||||
#include <ixwebsocket/IXUserAgent.h>
|
||||
|
||||
using namespace ix;
|
||||
|
||||
namespace
|
||||
{
|
||||
void runPublisher(const ix::CobraConfig& config, const std::string& channel)
|
||||
{
|
||||
ix::CobraMetricsPublisher cobraMetricsPublisher;
|
||||
cobraMetricsPublisher.configure(config, channel);
|
||||
cobraMetricsPublisher.setSession(uuid4());
|
||||
cobraMetricsPublisher.enable(true);
|
||||
|
||||
Json::Value msg;
|
||||
msg["fps"] = 60;
|
||||
|
||||
cobraMetricsPublisher.setGenericAttributes("game", "ody");
|
||||
|
||||
// Wait a bit
|
||||
ix::msleep(500);
|
||||
|
||||
// publish some messages
|
||||
cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #1)
|
||||
cobraMetricsPublisher.push("sms_metric_B_id", msg); // (msg #2)
|
||||
ix::msleep(500);
|
||||
|
||||
cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #3)
|
||||
cobraMetricsPublisher.push("sms_metric_D_id", msg); // (msg #4)
|
||||
ix::msleep(500);
|
||||
|
||||
cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #4)
|
||||
cobraMetricsPublisher.push("sms_metric_F_id", msg); // (msg #5)
|
||||
ix::msleep(500);
|
||||
}
|
||||
} // namespace
|
||||
|
||||
TEST_CASE("Cobra_to_statsd_bot", "[cobra_bots]")
|
||||
{
|
||||
SECTION("Exchange and count sent/received messages.")
|
||||
{
|
||||
int port = getFreePort();
|
||||
snake::AppConfig appConfig = makeSnakeServerConfig(port, true);
|
||||
|
||||
// Start a redis server
|
||||
ix::RedisServer redisServer(appConfig.redisPort);
|
||||
auto res = redisServer.listen();
|
||||
REQUIRE(res.first);
|
||||
redisServer.start();
|
||||
|
||||
// Start a snake server
|
||||
snake::SnakeServer snakeServer(appConfig);
|
||||
snakeServer.run();
|
||||
|
||||
// Start a fake statsd server (ultimately)
|
||||
|
||||
// Run the bot for a small amount of time
|
||||
std::string channel = ix::generateSessionId();
|
||||
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
|
||||
std::string role = "_sub";
|
||||
std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
|
||||
std::string endpoint = makeCobraEndpoint(port, true);
|
||||
|
||||
ix::CobraConfig config;
|
||||
config.endpoint = endpoint;
|
||||
config.appkey = appkey;
|
||||
config.rolename = role;
|
||||
config.rolesecret = secret;
|
||||
config.socketTLSOptions = makeClientTLSOptions();
|
||||
|
||||
std::thread publisherThread(runPublisher, config, channel);
|
||||
|
||||
ix::CobraBotConfig cobraBotConfig;
|
||||
cobraBotConfig.cobraConfig = config;
|
||||
cobraBotConfig.channel = channel;
|
||||
cobraBotConfig.runtime = 3; // Only run the bot for 3 seconds
|
||||
cobraBotConfig.enableHeartbeat = false;
|
||||
|
||||
std::string hostname("127.0.0.1");
|
||||
// std::string hostname("www.google.com");
|
||||
int statsdPort = 8125;
|
||||
std::string prefix("ix.test");
|
||||
StatsdClient statsdClient(hostname, statsdPort, prefix);
|
||||
|
||||
std::string errMsg;
|
||||
bool initialized = statsdClient.init(errMsg);
|
||||
if (!initialized)
|
||||
{
|
||||
spdlog::error(errMsg);
|
||||
}
|
||||
REQUIRE(initialized);
|
||||
|
||||
std::string fields("device.game\ndevice.os_name");
|
||||
std::string gauge;
|
||||
std::string timer;
|
||||
bool verbose = true;
|
||||
|
||||
int64_t sentCount =
|
||||
ix::cobra_to_statsd_bot(cobraBotConfig, statsdClient, fields, gauge, timer, verbose);
|
||||
//
|
||||
// We want at least 2 messages to be sent
|
||||
//
|
||||
REQUIRE(sentCount >= 2);
|
||||
|
||||
// Give us 1s for all messages to be received
|
||||
ix::msleep(1000);
|
||||
|
||||
spdlog::info("Stopping snake server...");
|
||||
snakeServer.stop();
|
||||
|
||||
spdlog::info("Stopping redis server...");
|
||||
redisServer.stop();
|
||||
|
||||
publisherThread.join();
|
||||
}
|
||||
}
|
@@ -1,115 +0,0 @@
|
||||
/*
|
||||
* IXCobraToStdoutTest.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2020 Machine Zone. All rights reserved.
|
||||
*/
|
||||
|
||||
#include "IXTest.h"
|
||||
#include "catch.hpp"
|
||||
#include <chrono>
|
||||
#include <iostream>
|
||||
#include <ixbots/IXCobraToStdoutBot.h>
|
||||
#include <ixcobra/IXCobraConnection.h>
|
||||
#include <ixcobra/IXCobraMetricsPublisher.h>
|
||||
#include <ixcrypto/IXUuid.h>
|
||||
#include <ixsentry/IXSentryClient.h>
|
||||
#include <ixsnake/IXRedisServer.h>
|
||||
#include <ixsnake/IXSnakeServer.h>
|
||||
#include <ixwebsocket/IXHttpServer.h>
|
||||
#include <ixwebsocket/IXUserAgent.h>
|
||||
|
||||
using namespace ix;
|
||||
|
||||
namespace
|
||||
{
|
||||
void runPublisher(const ix::CobraConfig& config, const std::string& channel)
|
||||
{
|
||||
ix::CobraMetricsPublisher cobraMetricsPublisher;
|
||||
cobraMetricsPublisher.configure(config, channel);
|
||||
cobraMetricsPublisher.setSession(uuid4());
|
||||
cobraMetricsPublisher.enable(true);
|
||||
|
||||
Json::Value msg;
|
||||
msg["fps"] = 60;
|
||||
|
||||
cobraMetricsPublisher.setGenericAttributes("game", "ody");
|
||||
|
||||
// Wait a bit
|
||||
ix::msleep(500);
|
||||
|
||||
// publish some messages
|
||||
cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #1)
|
||||
cobraMetricsPublisher.push("sms_metric_B_id", msg); // (msg #2)
|
||||
ix::msleep(500);
|
||||
|
||||
cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #3)
|
||||
cobraMetricsPublisher.push("sms_metric_D_id", msg); // (msg #4)
|
||||
ix::msleep(500);
|
||||
|
||||
cobraMetricsPublisher.push("sms_metric_A_id", msg); // (msg #4)
|
||||
cobraMetricsPublisher.push("sms_metric_F_id", msg); // (msg #5)
|
||||
ix::msleep(500);
|
||||
}
|
||||
} // namespace
|
||||
|
||||
TEST_CASE("Cobra_to_stdout_bot", "[cobra_bots]")
|
||||
{
|
||||
SECTION("Exchange and count sent/received messages.")
|
||||
{
|
||||
int port = getFreePort();
|
||||
snake::AppConfig appConfig = makeSnakeServerConfig(port, true);
|
||||
|
||||
// Start a redis server
|
||||
ix::RedisServer redisServer(appConfig.redisPort);
|
||||
auto res = redisServer.listen();
|
||||
REQUIRE(res.first);
|
||||
redisServer.start();
|
||||
|
||||
// Start a snake server
|
||||
snake::SnakeServer snakeServer(appConfig);
|
||||
snakeServer.run();
|
||||
|
||||
// Run the bot for a small amount of time
|
||||
std::string channel = ix::generateSessionId();
|
||||
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
|
||||
std::string role = "_sub";
|
||||
std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
|
||||
std::string endpoint = makeCobraEndpoint(port, true);
|
||||
|
||||
ix::CobraConfig config;
|
||||
config.endpoint = endpoint;
|
||||
config.appkey = appkey;
|
||||
config.rolename = role;
|
||||
config.rolesecret = secret;
|
||||
config.socketTLSOptions = makeClientTLSOptions();
|
||||
|
||||
std::thread publisherThread(runPublisher, config, channel);
|
||||
|
||||
ix::CobraBotConfig cobraBotConfig;
|
||||
cobraBotConfig.cobraConfig = config;
|
||||
cobraBotConfig.channel = channel;
|
||||
cobraBotConfig.runtime = 3; // Only run the bot for 3 seconds
|
||||
cobraBotConfig.enableHeartbeat = false;
|
||||
bool quiet = false;
|
||||
|
||||
// We could try to capture the output ... not sure how.
|
||||
bool fluentd = true;
|
||||
|
||||
int64_t sentCount = ix::cobra_to_stdout_bot(cobraBotConfig, fluentd, quiet);
|
||||
//
|
||||
// We want at least 2 messages to be sent
|
||||
//
|
||||
REQUIRE(sentCount >= 2);
|
||||
|
||||
// Give us 1s for all messages to be received
|
||||
ix::msleep(1000);
|
||||
|
||||
spdlog::info("Stopping snake server...");
|
||||
snakeServer.stop();
|
||||
|
||||
spdlog::info("Stopping redis server...");
|
||||
redisServer.stop();
|
||||
|
||||
publisherThread.join();
|
||||
}
|
||||
}
|
@@ -24,6 +24,8 @@ TEST_CASE("dns", "[net]")
|
||||
res = dnsLookup->resolve(errMsg, [] { return false; });
|
||||
std::cerr << "Error message: " << errMsg << std::endl;
|
||||
REQUIRE(res != nullptr);
|
||||
|
||||
dnsLookup->release(res);
|
||||
}
|
||||
|
||||
SECTION("Test resolving a non-existing hostname")
|
||||
@@ -31,11 +33,7 @@ TEST_CASE("dns", "[net]")
|
||||
auto dnsLookup = std::make_shared<DNSLookup>("wwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwww", 80);
|
||||
|
||||
std::string errMsg;
|
||||
struct addrinfo* res = dnsLookup->resolve(errMsg,
|
||||
[]
|
||||
{
|
||||
return false;
|
||||
});
|
||||
struct addrinfo* res = dnsLookup->resolve(errMsg, [] { return false; });
|
||||
std::cerr << "Error message: " << errMsg << std::endl;
|
||||
REQUIRE(res == nullptr);
|
||||
}
|
||||
@@ -46,11 +44,7 @@ TEST_CASE("dns", "[net]")
|
||||
|
||||
std::string errMsg;
|
||||
// The callback returning true means we are requesting cancellation
|
||||
struct addrinfo* res = dnsLookup->resolve(errMsg,
|
||||
[]
|
||||
{
|
||||
return true;
|
||||
});
|
||||
struct addrinfo* res = dnsLookup->resolve(errMsg, [] { return true; });
|
||||
std::cerr << "Error message: " << errMsg << std::endl;
|
||||
REQUIRE(res == nullptr);
|
||||
}
|
||||
|
@@ -1,97 +0,0 @@
|
||||
/*
|
||||
* IXGetFreePort.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2019 Machine Zone. All rights reserved.
|
||||
*/
|
||||
|
||||
// Using inet_addr will trigger an error on uwp without this
|
||||
// FIXME: use a different api
|
||||
#ifdef _WIN32
|
||||
#ifndef _WINSOCK_DEPRECATED_NO_WARNINGS
|
||||
#define _WINSOCK_DEPRECATED_NO_WARNINGS
|
||||
#endif
|
||||
#endif
|
||||
|
||||
#include "IXGetFreePort.h"
|
||||
|
||||
#include <ixwebsocket/IXNetSystem.h>
|
||||
#include <ixwebsocket/IXSocket.h>
|
||||
#include <random>
|
||||
#include <string>
|
||||
|
||||
namespace ix
|
||||
{
|
||||
int getAnyFreePortRandom()
|
||||
{
|
||||
std::random_device rd;
|
||||
std::uniform_int_distribution<int> dist(1024 + 1, 65535);
|
||||
|
||||
return dist(rd);
|
||||
}
|
||||
|
||||
int getAnyFreePort()
|
||||
{
|
||||
int sockfd;
|
||||
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
|
||||
{
|
||||
return getAnyFreePortRandom();
|
||||
}
|
||||
|
||||
int enable = 1;
|
||||
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (char*) &enable, sizeof(enable)) < 0)
|
||||
{
|
||||
return getAnyFreePortRandom();
|
||||
}
|
||||
|
||||
// Bind to port 0. This is the standard way to get a free port.
|
||||
struct sockaddr_in server; // server address information
|
||||
server.sin_family = AF_INET;
|
||||
server.sin_port = htons(0);
|
||||
server.sin_addr.s_addr = inet_addr("127.0.0.1");
|
||||
|
||||
if (bind(sockfd, (struct sockaddr*) &server, sizeof(server)) < 0)
|
||||
{
|
||||
Socket::closeSocket(sockfd);
|
||||
return getAnyFreePortRandom();
|
||||
}
|
||||
|
||||
struct sockaddr_in sa; // server address information
|
||||
socklen_t len = sizeof(sa);
|
||||
if (getsockname(sockfd, (struct sockaddr*) &sa, &len) < 0)
|
||||
{
|
||||
Socket::closeSocket(sockfd);
|
||||
return getAnyFreePortRandom();
|
||||
}
|
||||
|
||||
int port = ntohs(sa.sin_port);
|
||||
Socket::closeSocket(sockfd);
|
||||
|
||||
return port;
|
||||
}
|
||||
|
||||
int getFreePort()
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
#if defined(__has_feature)
|
||||
#if __has_feature(address_sanitizer)
|
||||
int port = getAnyFreePortRandom();
|
||||
#else
|
||||
int port = getAnyFreePort();
|
||||
#endif
|
||||
#else
|
||||
int port = getAnyFreePort();
|
||||
#endif
|
||||
//
|
||||
// Only port above 1024 can be used by non root users, but for some
|
||||
// reason I got port 7 returned with macOS when binding on port 0...
|
||||
//
|
||||
if (port > 1024)
|
||||
{
|
||||
return port;
|
||||
}
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
} // namespace ix
|
@@ -1,12 +0,0 @@
|
||||
/*
|
||||
* IXGetFreePort.h
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2019 Machine Zone. All rights reserved.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
namespace ix
|
||||
{
|
||||
int getFreePort();
|
||||
} // namespace ix
|
@@ -4,9 +4,9 @@
|
||||
* Copyright (c) 2019 Machine Zone. All rights reserved.
|
||||
*/
|
||||
|
||||
#include "IXGetFreePort.h"
|
||||
#include "catch.hpp"
|
||||
#include <iostream>
|
||||
#include <ixwebsocket/IXGetFreePort.h>
|
||||
#include <ixwebsocket/IXHttpClient.h>
|
||||
#include <ixwebsocket/IXHttpServer.h>
|
||||
|
||||
@@ -63,11 +63,60 @@ TEST_CASE("http server", "[httpd]")
|
||||
|
||||
server.stop();
|
||||
}
|
||||
|
||||
SECTION("Posting plain text data to a local HTTP server")
|
||||
{
|
||||
int port = getFreePort();
|
||||
ix::HttpServer server(port, "127.0.0.1");
|
||||
|
||||
server.setOnConnectionCallback(
|
||||
[](HttpRequestPtr request, std::shared_ptr<ConnectionState>) -> HttpResponsePtr {
|
||||
if (request->method == "POST")
|
||||
{
|
||||
return std::make_shared<HttpResponse>(
|
||||
200, "OK", HttpErrorCode::Ok, WebSocketHttpHeaders(), request->body);
|
||||
}
|
||||
|
||||
return std::make_shared<HttpResponse>(400, "BAD REQUEST");
|
||||
});
|
||||
|
||||
auto res = server.listen();
|
||||
REQUIRE(res.first);
|
||||
server.start();
|
||||
|
||||
HttpClient httpClient;
|
||||
WebSocketHttpHeaders headers;
|
||||
headers["Content-Type"] = "text/plain";
|
||||
|
||||
std::string url("http://127.0.0.1:");
|
||||
url += std::to_string(port);
|
||||
auto args = httpClient.createRequest(url);
|
||||
|
||||
args->extraHeaders = headers;
|
||||
args->connectTimeout = 60;
|
||||
args->transferTimeout = 60;
|
||||
args->verbose = true;
|
||||
args->logger = [](const std::string& msg) { std::cout << msg; };
|
||||
args->body = "Hello World!";
|
||||
|
||||
auto response = httpClient.post(url, args->body, args);
|
||||
|
||||
std::cerr << "Status: " << response->statusCode << std::endl;
|
||||
std::cerr << "Error message: " << response->errorMsg << std::endl;
|
||||
std::cerr << "Body: " << response->body << std::endl;
|
||||
|
||||
REQUIRE(response->errorCode == HttpErrorCode::Ok);
|
||||
REQUIRE(response->statusCode == 200);
|
||||
REQUIRE(response->body == args->body);
|
||||
|
||||
server.stop();
|
||||
}
|
||||
}
|
||||
|
||||
TEST_CASE("http server redirection", "[httpd_redirect]")
|
||||
{
|
||||
SECTION("Connect to a local HTTP server, with redirection enabled")
|
||||
SECTION(
|
||||
"Connect to a local HTTP server, with redirection enabled, but we do not follow redirects")
|
||||
{
|
||||
int port = getFreePort();
|
||||
ix::HttpServer server(port, "127.0.0.1");
|
||||
@@ -117,4 +166,54 @@ TEST_CASE("http server redirection", "[httpd_redirect]")
|
||||
|
||||
server.stop();
|
||||
}
|
||||
|
||||
SECTION("Connect to a local HTTP server, with redirection enabled, but we do follow redirects")
|
||||
{
|
||||
int port = getFreePort();
|
||||
ix::HttpServer server(port, "127.0.0.1");
|
||||
server.makeRedirectServer("http://www.google.com");
|
||||
|
||||
auto res = server.listen();
|
||||
REQUIRE(res.first);
|
||||
server.start();
|
||||
|
||||
HttpClient httpClient;
|
||||
WebSocketHttpHeaders headers;
|
||||
|
||||
std::string url("http://127.0.0.1:");
|
||||
url += std::to_string(port);
|
||||
url += "/data/foo.txt";
|
||||
auto args = httpClient.createRequest(url);
|
||||
|
||||
args->extraHeaders = headers;
|
||||
args->connectTimeout = 60;
|
||||
args->transferTimeout = 60;
|
||||
args->followRedirects = true;
|
||||
args->maxRedirects = 10;
|
||||
args->verbose = true;
|
||||
args->compress = true;
|
||||
args->logger = [](const std::string& msg) { std::cout << msg; };
|
||||
args->onProgressCallback = [](int current, int total) -> bool {
|
||||
std::cerr << "\r"
|
||||
<< "Downloaded " << current << " bytes out of " << total;
|
||||
return true;
|
||||
};
|
||||
|
||||
auto response = httpClient.get(url, args);
|
||||
|
||||
for (auto it : response->headers)
|
||||
{
|
||||
std::cerr << it.first << ": " << it.second << std::endl;
|
||||
}
|
||||
|
||||
std::cerr << "Upload size: " << response->uploadSize << std::endl;
|
||||
std::cerr << "Download size: " << response->downloadSize << std::endl;
|
||||
std::cerr << "Status: " << response->statusCode << std::endl;
|
||||
std::cerr << "Error message: " << response->errorMsg << std::endl;
|
||||
|
||||
REQUIRE(response->errorCode == HttpErrorCode::Ok);
|
||||
REQUIRE(response->statusCode == 200);
|
||||
|
||||
server.stop();
|
||||
}
|
||||
}
|
||||
|
47
test/IXStrCaseCompareTest.cpp
Normal file
47
test/IXStrCaseCompareTest.cpp
Normal file
@@ -0,0 +1,47 @@
|
||||
/*
|
||||
* IXStrCaseCompareTest.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2020 Machine Zone. All rights reserved.
|
||||
*/
|
||||
|
||||
#include "IXTest.h"
|
||||
#include "catch.hpp"
|
||||
#include <iostream>
|
||||
#include <ixwebsocket/IXUrlParser.h>
|
||||
#include <string.h>
|
||||
|
||||
using namespace ix;
|
||||
|
||||
namespace ix
|
||||
{
|
||||
TEST_CASE("str_case_compare", "[str_case_compare]")
|
||||
{
|
||||
SECTION("1")
|
||||
{
|
||||
using HttpHeaders = std::map<std::string, std::string, CaseInsensitiveLess>;
|
||||
|
||||
HttpHeaders httpHeaders;
|
||||
|
||||
httpHeaders["foo"] = "foo";
|
||||
|
||||
REQUIRE(httpHeaders["foo"] == "foo");
|
||||
REQUIRE(httpHeaders["missing"] == "");
|
||||
|
||||
// Comparison should be case insensitive
|
||||
REQUIRE(httpHeaders["Foo"] == "foo");
|
||||
REQUIRE(httpHeaders["Foo"] != "bar");
|
||||
}
|
||||
|
||||
SECTION("2")
|
||||
{
|
||||
using HttpHeaders = std::map<std::string, std::string, CaseInsensitiveLess>;
|
||||
|
||||
HttpHeaders headers;
|
||||
|
||||
headers["Upgrade"] = "webSocket";
|
||||
|
||||
REQUIRE(!CaseInsensitiveLess::cmp(headers["upGRADE"], "webSocket"));
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace ix
|
42
test/IXStreamSqlTest.cpp
Normal file
42
test/IXStreamSqlTest.cpp
Normal file
@@ -0,0 +1,42 @@
|
||||
/*
|
||||
* IXStreamSqlTest.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2020 Machine Zone. All rights reserved.
|
||||
*/
|
||||
|
||||
#include "IXTest.h"
|
||||
#include "catch.hpp"
|
||||
#include <iostream>
|
||||
#include <ixsnake/IXStreamSql.h>
|
||||
#include <string.h>
|
||||
|
||||
using namespace ix;
|
||||
|
||||
namespace ix
|
||||
{
|
||||
TEST_CASE("stream_sql", "[streamsql]")
|
||||
{
|
||||
SECTION("expression A")
|
||||
{
|
||||
snake::StreamSql streamSql(
|
||||
"select * from subscriber_republished_v1_neo where session LIKE '%123456%'");
|
||||
|
||||
nlohmann::json msg = {{"session", "123456"}, {"id", "foo_id"}, {"timestamp", 12}};
|
||||
|
||||
CHECK(streamSql.match(msg));
|
||||
}
|
||||
|
||||
SECTION("expression A")
|
||||
{
|
||||
snake::StreamSql streamSql("select * from `subscriber_republished_v1_neo` where "
|
||||
"session = '30091320ed8d4e50b758f8409b83bed7'");
|
||||
|
||||
nlohmann::json msg = {{"session", "30091320ed8d4e50b758f8409b83bed7"},
|
||||
{"id", "foo_id"},
|
||||
{"timestamp", 12}};
|
||||
|
||||
CHECK(streamSql.match(msg));
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace ix
|
@@ -11,6 +11,7 @@
|
||||
#include <iomanip>
|
||||
#include <iostream>
|
||||
#include <ixwebsocket/IXNetSystem.h>
|
||||
#include <ixwebsocket/IXUuid.h>
|
||||
#include <ixwebsocket/IXWebSocket.h>
|
||||
#include <mutex>
|
||||
#include <random>
|
||||
@@ -84,36 +85,37 @@ namespace ix
|
||||
|
||||
bool startWebSocketEchoServer(ix::WebSocketServer& server)
|
||||
{
|
||||
server.setOnConnectionCallback([&server](std::shared_ptr<ix::WebSocket> webSocket,
|
||||
std::shared_ptr<ConnectionState> connectionState) {
|
||||
webSocket->setOnMessageCallback(
|
||||
[webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg) {
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
server.setOnClientMessageCallback(
|
||||
[&server](std::shared_ptr<ConnectionState> connectionState,
|
||||
WebSocket& webSocket,
|
||||
const ix::WebSocketMessagePtr& msg) {
|
||||
auto remoteIp = connectionState->getRemoteIp();
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
TLogger() << "New connection";
|
||||
TLogger() << "Remote ip: " << remoteIp;
|
||||
TLogger() << "Uri: " << msg->openInfo.uri;
|
||||
TLogger() << "Headers:";
|
||||
for (auto it : msg->openInfo.headers)
|
||||
{
|
||||
TLogger() << "New connection";
|
||||
TLogger() << "Uri: " << msg->openInfo.uri;
|
||||
TLogger() << "Headers:";
|
||||
for (auto it : msg->openInfo.headers)
|
||||
TLogger() << it.first << ": " << it.second;
|
||||
}
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Close)
|
||||
{
|
||||
TLogger() << "Closed connection";
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Message)
|
||||
{
|
||||
for (auto&& client : server.getClients())
|
||||
{
|
||||
if (client.get() != &webSocket)
|
||||
{
|
||||
TLogger() << it.first << ": " << it.second;
|
||||
client->send(msg->str, msg->binary);
|
||||
}
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Close)
|
||||
{
|
||||
TLogger() << "Closed connection";
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Message)
|
||||
{
|
||||
for (auto&& client : server.getClients())
|
||||
{
|
||||
if (client != webSocket)
|
||||
{
|
||||
client->send(msg->str, msg->binary);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
auto res = server.listen();
|
||||
if (!res.first)
|
||||
@@ -137,8 +139,9 @@ namespace ix
|
||||
std::streamoff size = file.tellg();
|
||||
file.seekg(0, file.beg);
|
||||
|
||||
memblock.resize((size_t) size);
|
||||
file.read((char*) &memblock.front(), static_cast<std::streamsize>(size));
|
||||
memblock.reserve((size_t) size);
|
||||
memblock.insert(
|
||||
memblock.begin(), std::istream_iterator<char>(file), std::istream_iterator<char>());
|
||||
|
||||
return memblock;
|
||||
}
|
||||
@@ -202,44 +205,4 @@ namespace ix
|
||||
#endif
|
||||
return scheme;
|
||||
}
|
||||
|
||||
snake::AppConfig makeSnakeServerConfig(int port, bool preferTLS)
|
||||
{
|
||||
snake::AppConfig appConfig;
|
||||
appConfig.port = port;
|
||||
appConfig.hostname = "127.0.0.1";
|
||||
appConfig.verbose = true;
|
||||
appConfig.redisPort = getFreePort();
|
||||
appConfig.redisPassword = "";
|
||||
appConfig.redisHosts.push_back("localhost"); // only one host supported now
|
||||
appConfig.socketTLSOptions = makeServerTLSOptions(preferTLS);
|
||||
|
||||
std::string appsConfigPath("appsConfig.json");
|
||||
|
||||
// Parse config file
|
||||
auto str = readAsString(appsConfigPath);
|
||||
if (str.empty())
|
||||
{
|
||||
std::cout << "Cannot read content of " << appsConfigPath << std::endl;
|
||||
return appConfig;
|
||||
}
|
||||
|
||||
std::cout << str << std::endl;
|
||||
auto apps = nlohmann::json::parse(str);
|
||||
appConfig.apps = apps["apps"];
|
||||
|
||||
// Display config on the terminal for debugging
|
||||
dumpConfig(appConfig);
|
||||
|
||||
return appConfig;
|
||||
}
|
||||
|
||||
std::string makeCobraEndpoint(int port, bool preferTLS)
|
||||
{
|
||||
std::stringstream ss;
|
||||
ss << getWsScheme(preferTLS) << "localhost:" << port;
|
||||
std::string endpoint = ss.str();
|
||||
|
||||
return endpoint;
|
||||
}
|
||||
} // namespace ix
|
||||
|
@@ -6,9 +6,8 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "IXGetFreePort.h"
|
||||
#include <iostream>
|
||||
#include <ixsnake/IXAppConfig.h>
|
||||
#include <ixwebsocket/IXGetFreePort.h>
|
||||
#include <ixwebsocket/IXSocketTLSOptions.h>
|
||||
#include <ixwebsocket/IXWebSocketServer.h>
|
||||
#include <mutex>
|
||||
@@ -51,12 +50,8 @@ namespace ix
|
||||
|
||||
bool startWebSocketEchoServer(ix::WebSocketServer& server);
|
||||
|
||||
snake::AppConfig makeSnakeServerConfig(int port, bool preferTLS);
|
||||
|
||||
SocketTLSOptions makeClientTLSOptions();
|
||||
SocketTLSOptions makeServerTLSOptions(bool preferTLS);
|
||||
std::string getHttpScheme();
|
||||
std::string getWsScheme(bool preferTLS);
|
||||
|
||||
std::string makeCobraEndpoint(int port, bool preferTLS);
|
||||
} // namespace ix
|
||||
|
@@ -18,10 +18,10 @@ using namespace ix;
|
||||
|
||||
namespace
|
||||
{
|
||||
class WebSocketChat
|
||||
class WebSocketBroadcastChat
|
||||
{
|
||||
public:
|
||||
WebSocketChat(const std::string& user, const std::string& session, int port);
|
||||
WebSocketBroadcastChat(const std::string& user, const std::string& session, int port);
|
||||
|
||||
void subscribe(const std::string& channel);
|
||||
void start();
|
||||
@@ -47,7 +47,9 @@ namespace
|
||||
mutable std::mutex _mutex;
|
||||
};
|
||||
|
||||
WebSocketChat::WebSocketChat(const std::string& user, const std::string& session, int port)
|
||||
WebSocketBroadcastChat::WebSocketBroadcastChat(const std::string& user,
|
||||
const std::string& session,
|
||||
int port)
|
||||
: _user(user)
|
||||
, _session(session)
|
||||
, _port(port)
|
||||
@@ -55,42 +57,40 @@ namespace
|
||||
_webSocket.setTLSOptions(makeClientTLSOptions());
|
||||
}
|
||||
|
||||
size_t WebSocketChat::getReceivedMessagesCount() const
|
||||
size_t WebSocketBroadcastChat::getReceivedMessagesCount() const
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(_mutex);
|
||||
return _receivedMessages.size();
|
||||
}
|
||||
|
||||
const std::vector<std::string>& WebSocketChat::getReceivedMessages() const
|
||||
const std::vector<std::string>& WebSocketBroadcastChat::getReceivedMessages() const
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(_mutex);
|
||||
return _receivedMessages;
|
||||
}
|
||||
|
||||
void WebSocketChat::appendMessage(const std::string& message)
|
||||
void WebSocketBroadcastChat::appendMessage(const std::string& message)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(_mutex);
|
||||
_receivedMessages.push_back(message);
|
||||
}
|
||||
|
||||
bool WebSocketChat::isReady() const
|
||||
bool WebSocketBroadcastChat::isReady() const
|
||||
{
|
||||
return _webSocket.getReadyState() == ix::ReadyState::Open;
|
||||
}
|
||||
|
||||
void WebSocketChat::stop()
|
||||
void WebSocketBroadcastChat::stop()
|
||||
{
|
||||
_webSocket.stop();
|
||||
}
|
||||
|
||||
void WebSocketChat::start()
|
||||
void WebSocketBroadcastChat::start()
|
||||
{
|
||||
//
|
||||
// Which server ??
|
||||
//
|
||||
std::string url;
|
||||
{
|
||||
bool preferTLS = true;
|
||||
url = makeCobraEndpoint(_port, preferTLS);
|
||||
}
|
||||
|
||||
_webSocket.setUrl(url);
|
||||
|
||||
std::stringstream ss;
|
||||
@@ -156,7 +156,8 @@ namespace
|
||||
_webSocket.start();
|
||||
}
|
||||
|
||||
std::pair<std::string, std::string> WebSocketChat::decodeMessage(const std::string& str)
|
||||
std::pair<std::string, std::string> WebSocketBroadcastChat::decodeMessage(
|
||||
const std::string& str)
|
||||
{
|
||||
std::string errMsg;
|
||||
MsgPack msg = MsgPack::parse(str, errMsg);
|
||||
@@ -167,7 +168,7 @@ namespace
|
||||
return std::pair<std::string, std::string>(msg_user, msg_text);
|
||||
}
|
||||
|
||||
std::string WebSocketChat::encodeMessage(const std::string& text)
|
||||
std::string WebSocketBroadcastChat::encodeMessage(const std::string& text)
|
||||
{
|
||||
std::map<MsgPack, MsgPack> obj;
|
||||
obj["user"] = _user;
|
||||
@@ -179,7 +180,7 @@ namespace
|
||||
return output;
|
||||
}
|
||||
|
||||
void WebSocketChat::sendMessage(const std::string& text)
|
||||
void WebSocketBroadcastChat::sendMessage(const std::string& text)
|
||||
{
|
||||
_webSocket.sendBinary(encodeMessage(text));
|
||||
}
|
||||
@@ -189,15 +190,17 @@ namespace
|
||||
bool preferTLS = true;
|
||||
server.setTLSOptions(makeServerTLSOptions(preferTLS));
|
||||
|
||||
server.setOnConnectionCallback([&server, &connectionId](
|
||||
std::shared_ptr<ix::WebSocket> webSocket,
|
||||
std::shared_ptr<ConnectionState> connectionState) {
|
||||
webSocket->setOnMessageCallback([webSocket, connectionState, &connectionId, &server](
|
||||
const ix::WebSocketMessagePtr& msg) {
|
||||
server.setOnClientMessageCallback(
|
||||
[&server, &connectionId](std::shared_ptr<ConnectionState> connectionState,
|
||||
WebSocket& webSocket,
|
||||
const ix::WebSocketMessagePtr& msg) {
|
||||
auto remoteIp = connectionState->getRemoteIp();
|
||||
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
TLogger() << "New connection";
|
||||
connectionState->computeId();
|
||||
TLogger() << "remote ip: " << remoteIp;
|
||||
TLogger() << "id: " << connectionState->getId();
|
||||
TLogger() << "Uri: " << msg->openInfo.uri;
|
||||
TLogger() << "Headers:";
|
||||
@@ -216,14 +219,13 @@ namespace
|
||||
{
|
||||
for (auto&& client : server.getClients())
|
||||
{
|
||||
if (client != webSocket)
|
||||
if (client.get() != &webSocket)
|
||||
{
|
||||
client->send(msg->str, msg->binary);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
auto res = server.listen();
|
||||
if (!res.first)
|
||||
@@ -247,11 +249,11 @@ TEST_CASE("Websocket_broadcast_server", "[websocket_server]")
|
||||
REQUIRE(startServer(server, connectionId));
|
||||
|
||||
std::string session = ix::generateSessionId();
|
||||
std::vector<std::shared_ptr<WebSocketChat>> chatClients;
|
||||
std::vector<std::shared_ptr<WebSocketBroadcastChat>> chatClients;
|
||||
for (int i = 0; i < 10; ++i)
|
||||
{
|
||||
std::string user("user_" + std::to_string(i));
|
||||
chatClients.push_back(std::make_shared<WebSocketChat>(user, session, port));
|
||||
chatClients.push_back(std::make_shared<WebSocketBroadcastChat>(user, session, port));
|
||||
chatClients[i]->start();
|
||||
ix::msleep(50);
|
||||
}
|
||||
|
@@ -193,37 +193,38 @@ namespace
|
||||
|
||||
bool startServer(ix::WebSocketServer& server)
|
||||
{
|
||||
server.setOnConnectionCallback([&server](std::shared_ptr<ix::WebSocket> webSocket,
|
||||
std::shared_ptr<ConnectionState> connectionState) {
|
||||
webSocket->setOnMessageCallback(
|
||||
[webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg) {
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
server.setOnClientMessageCallback(
|
||||
[&server](std::shared_ptr<ConnectionState> connectionState,
|
||||
WebSocket& webSocket,
|
||||
const ix::WebSocketMessagePtr& msg) {
|
||||
auto remoteIp = connectionState->getRemoteIp();
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
TLogger() << "New connection";
|
||||
TLogger() << "remote ip: " << remoteIp;
|
||||
TLogger() << "id: " << connectionState->getId();
|
||||
TLogger() << "Uri: " << msg->openInfo.uri;
|
||||
TLogger() << "Headers:";
|
||||
for (auto it : msg->openInfo.headers)
|
||||
{
|
||||
TLogger() << "New connection";
|
||||
TLogger() << "id: " << connectionState->getId();
|
||||
TLogger() << "Uri: " << msg->openInfo.uri;
|
||||
TLogger() << "Headers:";
|
||||
for (auto it : msg->openInfo.headers)
|
||||
TLogger() << it.first << ": " << it.second;
|
||||
}
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Close)
|
||||
{
|
||||
log("Closed connection");
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Message)
|
||||
{
|
||||
for (auto&& client : server.getClients())
|
||||
{
|
||||
if (client.get() != &webSocket)
|
||||
{
|
||||
TLogger() << it.first << ": " << it.second;
|
||||
client->sendBinary(msg->str);
|
||||
}
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Close)
|
||||
{
|
||||
log("Closed connection");
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Message)
|
||||
{
|
||||
for (auto&& client : server.getClients())
|
||||
{
|
||||
if (client != webSocket)
|
||||
{
|
||||
client->sendBinary(msg->str);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
auto res = server.listen();
|
||||
if (!res.first)
|
||||
@@ -284,27 +285,27 @@ TEST_CASE("Websocket_chat", "[websocket_chat]")
|
||||
int attempts = 0;
|
||||
while (chatA.getReceivedMessagesCount() != 3 || chatB.getReceivedMessagesCount() != 3)
|
||||
{
|
||||
REQUIRE(attempts++ < 10);
|
||||
CHECK(attempts++ < 10);
|
||||
ix::msleep(1000);
|
||||
}
|
||||
|
||||
chatA.stop();
|
||||
chatB.stop();
|
||||
|
||||
REQUIRE(chatA.getReceivedMessagesCount() == 3);
|
||||
REQUIRE(chatB.getReceivedMessagesCount() == 3);
|
||||
CHECK(chatA.getReceivedMessagesCount() == 3);
|
||||
CHECK(chatB.getReceivedMessagesCount() == 3);
|
||||
|
||||
REQUIRE(chatB.getReceivedMessages()[0] == "from A1");
|
||||
REQUIRE(chatB.getReceivedMessages()[1] == "from A2");
|
||||
REQUIRE(chatB.getReceivedMessages()[2] == "from A3");
|
||||
CHECK(chatB.getReceivedMessages()[0] == "from A1");
|
||||
CHECK(chatB.getReceivedMessages()[1] == "from A2");
|
||||
CHECK(chatB.getReceivedMessages()[2] == "from A3");
|
||||
|
||||
REQUIRE(chatA.getReceivedMessages()[0] == "from B1");
|
||||
REQUIRE(chatA.getReceivedMessages()[1] == "from B2");
|
||||
REQUIRE(chatA.getReceivedMessages()[2].size() == bigMessage.size());
|
||||
CHECK(chatA.getReceivedMessages()[0] == "from B1");
|
||||
CHECK(chatA.getReceivedMessages()[1] == "from B2");
|
||||
CHECK(chatA.getReceivedMessages()[2].size() == bigMessage.size());
|
||||
|
||||
// Give us 1000ms for the server to notice that clients went away
|
||||
ix::msleep(1000);
|
||||
REQUIRE(server.getClients().size() == 0);
|
||||
CHECK(server.getClients().size() == 0);
|
||||
|
||||
ix::reportWebSocketTraffic();
|
||||
}
|
||||
|
@@ -168,41 +168,37 @@ namespace
|
||||
std::mutex& mutexWrite)
|
||||
{
|
||||
// A dev/null server
|
||||
server.setOnConnectionCallback(
|
||||
server.setOnClientMessageCallback(
|
||||
[&receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite](
|
||||
std::shared_ptr<ix::WebSocket> webSocket,
|
||||
std::shared_ptr<ConnectionState> connectionState) {
|
||||
webSocket->setOnMessageCallback([webSocket,
|
||||
connectionState,
|
||||
&receivedCloseCode,
|
||||
&receivedCloseReason,
|
||||
&receivedCloseRemote,
|
||||
&mutexWrite](const ix::WebSocketMessagePtr& msg) {
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
std::shared_ptr<ConnectionState> connectionState,
|
||||
WebSocket& /*webSocket*/,
|
||||
const ix::WebSocketMessagePtr& msg) {
|
||||
auto remoteIp = connectionState->getRemoteIp();
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
TLogger() << "New server connection";
|
||||
TLogger() << "remote ip: " << remoteIp;
|
||||
TLogger() << "id: " << connectionState->getId();
|
||||
TLogger() << "Uri: " << msg->openInfo.uri;
|
||||
TLogger() << "Headers:";
|
||||
for (auto it : msg->openInfo.headers)
|
||||
{
|
||||
TLogger() << "New server connection";
|
||||
TLogger() << "id: " << connectionState->getId();
|
||||
TLogger() << "Uri: " << msg->openInfo.uri;
|
||||
TLogger() << "Headers:";
|
||||
for (auto it : msg->openInfo.headers)
|
||||
{
|
||||
TLogger() << it.first << ": " << it.second;
|
||||
}
|
||||
TLogger() << it.first << ": " << it.second;
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Close)
|
||||
{
|
||||
std::stringstream ss;
|
||||
ss << "Server closed connection(" << msg->closeInfo.code << ","
|
||||
<< msg->closeInfo.reason << ")";
|
||||
log(ss.str());
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Close)
|
||||
{
|
||||
std::stringstream ss;
|
||||
ss << "Server closed connection(" << msg->closeInfo.code << ","
|
||||
<< msg->closeInfo.reason << ")";
|
||||
log(ss.str());
|
||||
|
||||
std::lock_guard<std::mutex> lck(mutexWrite);
|
||||
std::lock_guard<std::mutex> lck(mutexWrite);
|
||||
|
||||
receivedCloseCode = msg->closeInfo.code;
|
||||
receivedCloseReason = std::string(msg->closeInfo.reason);
|
||||
receivedCloseRemote = msg->closeInfo.remote;
|
||||
}
|
||||
});
|
||||
receivedCloseCode = msg->closeInfo.code;
|
||||
receivedCloseReason = std::string(msg->closeInfo.reason);
|
||||
receivedCloseRemote = msg->closeInfo.remote;
|
||||
}
|
||||
});
|
||||
|
||||
auto res = server.listen();
|
||||
|
183
test/IXWebSocketLeakTest.cpp
Normal file
183
test/IXWebSocketLeakTest.cpp
Normal file
@@ -0,0 +1,183 @@
|
||||
/*
|
||||
* IXWebSocketServer.cpp
|
||||
* Author: Benjamin Sergeant, @marcelkauf
|
||||
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
|
||||
#include "IXTest.h"
|
||||
#include "catch.hpp"
|
||||
#include <ixwebsocket/IXWebSocket.h>
|
||||
#include <ixwebsocket/IXWebSocketServer.h>
|
||||
#include <memory>
|
||||
#include <sstream>
|
||||
|
||||
using namespace ix;
|
||||
|
||||
namespace
|
||||
{
|
||||
class WebSocketClient
|
||||
{
|
||||
public:
|
||||
WebSocketClient(int port);
|
||||
void start();
|
||||
void stop();
|
||||
bool isReady() const;
|
||||
bool hasConnectionError() const;
|
||||
|
||||
private:
|
||||
ix::WebSocket _webSocket;
|
||||
int _port;
|
||||
std::atomic<bool> _connectionError;
|
||||
};
|
||||
|
||||
WebSocketClient::WebSocketClient(int port)
|
||||
: _port(port)
|
||||
, _connectionError(false)
|
||||
{
|
||||
}
|
||||
|
||||
bool WebSocketClient::hasConnectionError() const
|
||||
{
|
||||
return _connectionError;
|
||||
}
|
||||
|
||||
bool WebSocketClient::isReady() const
|
||||
{
|
||||
return _webSocket.getReadyState() == ix::ReadyState::Open;
|
||||
}
|
||||
|
||||
void WebSocketClient::stop()
|
||||
{
|
||||
_webSocket.stop();
|
||||
}
|
||||
|
||||
void WebSocketClient::start()
|
||||
{
|
||||
std::string url;
|
||||
{
|
||||
std::stringstream ss;
|
||||
ss << "ws://localhost:" << _port << "/";
|
||||
|
||||
url = ss.str();
|
||||
}
|
||||
|
||||
_webSocket.setUrl(url);
|
||||
_webSocket.disableAutomaticReconnection();
|
||||
|
||||
std::stringstream ss;
|
||||
log(std::string("Connecting to url: ") + url);
|
||||
|
||||
_webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) {
|
||||
std::stringstream ss;
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
log("client connected");
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Close)
|
||||
{
|
||||
log("client disconnected");
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Error)
|
||||
{
|
||||
_connectionError = true;
|
||||
log("error");
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Pong)
|
||||
{
|
||||
log("pong");
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Ping)
|
||||
{
|
||||
log("ping");
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Message)
|
||||
{
|
||||
log("message");
|
||||
}
|
||||
else
|
||||
{
|
||||
log("invalid type");
|
||||
}
|
||||
});
|
||||
|
||||
_webSocket.start();
|
||||
}
|
||||
} // namespace
|
||||
|
||||
TEST_CASE("Websocket leak test")
|
||||
{
|
||||
SECTION("Websocket destructor is called when closing the connection.")
|
||||
{
|
||||
// stores the server websocket in order to check the use_count
|
||||
std::shared_ptr<WebSocket> webSocketPtr;
|
||||
|
||||
{
|
||||
int port = getFreePort();
|
||||
WebSocketServer server(port);
|
||||
|
||||
server.setOnConnectionCallback(
|
||||
[&webSocketPtr](std::shared_ptr<ix::WebSocket> webSocket,
|
||||
std::shared_ptr<ConnectionState> connectionState,
|
||||
std::unique_ptr<ConnectionInfo> connectionInfo) {
|
||||
// original ptr in WebSocketServer::handleConnection and the callback argument
|
||||
REQUIRE(webSocket.use_count() == 2);
|
||||
webSocket->setOnMessageCallback([&webSocketPtr, webSocket, connectionState](
|
||||
const ix::WebSocketMessagePtr& msg) {
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
log(std::string("New connection id: ") + connectionState->getId());
|
||||
// original ptr in WebSocketServer::handleConnection, captured ptr of
|
||||
// this callback, and ptr in WebSocketServer::_clients
|
||||
REQUIRE(webSocket.use_count() == 3);
|
||||
webSocketPtr = std::shared_ptr<WebSocket>(webSocket);
|
||||
REQUIRE(webSocket.use_count() == 4);
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Close)
|
||||
{
|
||||
log(std::string("Client closed connection id: ") +
|
||||
connectionState->getId());
|
||||
}
|
||||
else
|
||||
{
|
||||
log(std::string(msg->str));
|
||||
}
|
||||
});
|
||||
// original ptr in WebSocketServer::handleConnection, argument of this callback,
|
||||
// and captured ptr in websocket callback
|
||||
REQUIRE(webSocket.use_count() == 3);
|
||||
});
|
||||
|
||||
server.listen();
|
||||
server.start();
|
||||
|
||||
WebSocketClient webSocketClient(port);
|
||||
webSocketClient.start();
|
||||
|
||||
while (true)
|
||||
{
|
||||
REQUIRE(!webSocketClient.hasConnectionError());
|
||||
if (webSocketClient.isReady()) break;
|
||||
ix::msleep(10);
|
||||
}
|
||||
|
||||
REQUIRE(server.getClients().size() == 1);
|
||||
// same value as in Open-handler above
|
||||
REQUIRE(webSocketPtr.use_count() == 4);
|
||||
|
||||
ix::msleep(500);
|
||||
webSocketClient.stop();
|
||||
ix::msleep(500);
|
||||
REQUIRE(server.getClients().size() == 0);
|
||||
|
||||
// websocket should only be referenced by webSocketPtr but is still used by the
|
||||
// websocket callback
|
||||
REQUIRE(webSocketPtr.use_count() == 1);
|
||||
webSocketPtr->setOnMessageCallback(nullptr);
|
||||
// websocket should only be referenced by webSocketPtr
|
||||
REQUIRE(webSocketPtr.use_count() == 1);
|
||||
server.stop();
|
||||
}
|
||||
// websocket should only be referenced by webSocketPtr
|
||||
REQUIRE(webSocketPtr.use_count() == 1);
|
||||
}
|
||||
}
|
76
test/IXWebSocketPerMessageDeflateCompressorTest.cpp
Normal file
76
test/IXWebSocketPerMessageDeflateCompressorTest.cpp
Normal file
@@ -0,0 +1,76 @@
|
||||
/*
|
||||
* IXWebSocketPerMessageDeflateCodecTest.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2020 Machine Zone. All rights reserved.
|
||||
*
|
||||
* make build_test && build/test/ixwebsocket_unittest per-message-deflate-codec
|
||||
*/
|
||||
|
||||
#include "IXTest.h"
|
||||
#include "catch.hpp"
|
||||
#include <iostream>
|
||||
#include <ixwebsocket/IXWebSocketPerMessageDeflateCodec.h>
|
||||
#include <string.h>
|
||||
|
||||
using namespace ix;
|
||||
|
||||
namespace ix
|
||||
{
|
||||
std::string compressAndDecompress(const std::string& a)
|
||||
{
|
||||
std::string b, c;
|
||||
|
||||
WebSocketPerMessageDeflateCompressor compressor;
|
||||
compressor.init(11, true);
|
||||
compressor.compress(a, b);
|
||||
|
||||
WebSocketPerMessageDeflateDecompressor decompressor;
|
||||
decompressor.init(11, true);
|
||||
decompressor.decompress(b, c);
|
||||
|
||||
return c;
|
||||
}
|
||||
|
||||
std::string compressAndDecompressVector(const std::string& a)
|
||||
{
|
||||
std::string b, c;
|
||||
|
||||
std::vector<uint8_t> vec(a.begin(), a.end());
|
||||
|
||||
WebSocketPerMessageDeflateCompressor compressor;
|
||||
compressor.init(11, true);
|
||||
compressor.compress(vec, b);
|
||||
|
||||
WebSocketPerMessageDeflateDecompressor decompressor;
|
||||
decompressor.init(11, true);
|
||||
decompressor.decompress(b, c);
|
||||
|
||||
return c;
|
||||
}
|
||||
|
||||
TEST_CASE("per-message-deflate-codec", "[zlib]")
|
||||
{
|
||||
SECTION("string api")
|
||||
{
|
||||
REQUIRE(compressAndDecompress("") == "");
|
||||
REQUIRE(compressAndDecompress("foo") == "foo");
|
||||
REQUIRE(compressAndDecompress("bar") == "bar");
|
||||
REQUIRE(compressAndDecompress("asdcaseqw`21897dehqwed") == "asdcaseqw`21897dehqwed");
|
||||
REQUIRE(compressAndDecompress("/usr/local/include/ixwebsocket/IXSocketAppleSSL.h") ==
|
||||
"/usr/local/include/ixwebsocket/IXSocketAppleSSL.h");
|
||||
}
|
||||
|
||||
SECTION("vector api")
|
||||
{
|
||||
REQUIRE(compressAndDecompressVector("") == "");
|
||||
REQUIRE(compressAndDecompressVector("foo") == "foo");
|
||||
REQUIRE(compressAndDecompressVector("bar") == "bar");
|
||||
REQUIRE(compressAndDecompressVector("asdcaseqw`21897dehqwed") ==
|
||||
"asdcaseqw`21897dehqwed");
|
||||
REQUIRE(
|
||||
compressAndDecompressVector("/usr/local/include/ixwebsocket/IXSocketAppleSSL.h") ==
|
||||
"/usr/local/include/ixwebsocket/IXSocketAppleSSL.h");
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace ix
|
@@ -19,7 +19,7 @@ namespace
|
||||
class WebSocketClient
|
||||
{
|
||||
public:
|
||||
WebSocketClient(int port, bool useHeartBeatMethod);
|
||||
WebSocketClient(int port);
|
||||
|
||||
void start();
|
||||
void stop();
|
||||
@@ -29,12 +29,10 @@ namespace
|
||||
private:
|
||||
ix::WebSocket _webSocket;
|
||||
int _port;
|
||||
bool _useHeartBeatMethod;
|
||||
};
|
||||
|
||||
WebSocketClient::WebSocketClient(int port, bool useHeartBeatMethod)
|
||||
WebSocketClient::WebSocketClient(int port)
|
||||
: _port(port)
|
||||
, _useHeartBeatMethod(useHeartBeatMethod)
|
||||
{
|
||||
;
|
||||
}
|
||||
@@ -63,49 +61,37 @@ namespace
|
||||
|
||||
// The important bit for this test.
|
||||
// Set a 1 second heartbeat with the setter method to test
|
||||
if (_useHeartBeatMethod)
|
||||
{
|
||||
_webSocket.setPingInterval(1);
|
||||
}
|
||||
else
|
||||
{
|
||||
_webSocket.setPingInterval(1);
|
||||
}
|
||||
_webSocket.setPingInterval(1);
|
||||
|
||||
std::stringstream ss;
|
||||
log(std::string("Connecting to url: ") + url);
|
||||
|
||||
_webSocket.setOnMessageCallback([](ix::WebSocketMessageType messageType,
|
||||
const std::string& str,
|
||||
size_t wireSize,
|
||||
const ix::WebSocketErrorInfo& error,
|
||||
const ix::WebSocketOpenInfo& openInfo,
|
||||
const ix::WebSocketCloseInfo& closeInfo) {
|
||||
_webSocket.setOnMessageCallback([](const ix::WebSocketMessagePtr& msg) {
|
||||
std::stringstream ss;
|
||||
if (messageType == ix::WebSocketMessageType::Open)
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
log("client connected");
|
||||
}
|
||||
else if (messageType == ix::WebSocketMessageType::Close)
|
||||
else if (msg->type == ix::WebSocketMessageType::Close)
|
||||
{
|
||||
log("client disconnected");
|
||||
}
|
||||
else if (messageType == ix::WebSocketMessageType::Error)
|
||||
else if (msg->type == ix::WebSocketMessageType::Error)
|
||||
{
|
||||
ss << "Error ! " << error.reason;
|
||||
ss << "Error ! " << msg->errorInfo.reason;
|
||||
log(ss.str());
|
||||
}
|
||||
else if (messageType == ix::WebSocketMessageType::Pong)
|
||||
else if (msg->type == ix::WebSocketMessageType::Pong)
|
||||
{
|
||||
ss << "Received pong message " << str;
|
||||
ss << "Received pong message " << msg->str;
|
||||
log(ss.str());
|
||||
}
|
||||
else if (messageType == ix::WebSocketMessageType::Ping)
|
||||
else if (msg->type == ix::WebSocketMessageType::Ping)
|
||||
{
|
||||
ss << "Received ping message " << str;
|
||||
ss << "Received ping message " << msg->str;
|
||||
log(ss.str());
|
||||
}
|
||||
else if (messageType == ix::WebSocketMessageType::Message)
|
||||
else if (msg->type == ix::WebSocketMessageType::Message)
|
||||
{
|
||||
// too many messages to log
|
||||
}
|
||||
@@ -132,33 +118,28 @@ namespace
|
||||
std::shared_ptr<ConnectionState> connectionState) {
|
||||
webSocket->setOnMessageCallback(
|
||||
[webSocket, connectionState, &server, &receivedPingMessages](
|
||||
ix::WebSocketMessageType messageType,
|
||||
const std::string& str,
|
||||
size_t wireSize,
|
||||
const ix::WebSocketErrorInfo& error,
|
||||
const ix::WebSocketOpenInfo& openInfo,
|
||||
const ix::WebSocketCloseInfo& closeInfo) {
|
||||
if (messageType == ix::WebSocketMessageType::Open)
|
||||
const ix::WebSocketMessagePtr& msg) {
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
TLogger() << "New server connection";
|
||||
TLogger() << "id: " << connectionState->getId();
|
||||
TLogger() << "Uri: " << openInfo.uri;
|
||||
TLogger() << "Uri: " << msg->openInfo.uri;
|
||||
TLogger() << "Headers:";
|
||||
for (auto it : openInfo.headers)
|
||||
for (auto it : msg->openInfo.headers)
|
||||
{
|
||||
TLogger() << it.first << ": " << it.second;
|
||||
}
|
||||
}
|
||||
else if (messageType == ix::WebSocketMessageType::Close)
|
||||
else if (msg->type == ix::WebSocketMessageType::Close)
|
||||
{
|
||||
log("Server closed connection");
|
||||
}
|
||||
else if (messageType == ix::WebSocketMessageType::Ping)
|
||||
else if (msg->type == ix::WebSocketMessageType::Ping)
|
||||
{
|
||||
log("Server received a ping");
|
||||
receivedPingMessages++;
|
||||
}
|
||||
else if (messageType == ix::WebSocketMessageType::Message)
|
||||
else if (msg->type == ix::WebSocketMessageType::Message)
|
||||
{
|
||||
// to many messages to log
|
||||
for (auto client : server.getClients())
|
||||
@@ -193,8 +174,7 @@ TEST_CASE("Websocket_ping_no_data_sent_setPingInterval", "[setPingInterval]")
|
||||
REQUIRE(startServer(server, serverReceivedPingMessages));
|
||||
|
||||
std::string session = ix::generateSessionId();
|
||||
bool useSetHeartBeatPeriodMethod = false; // so use setPingInterval
|
||||
WebSocketClient webSocketClient(port, useSetHeartBeatPeriodMethod);
|
||||
WebSocketClient webSocketClient(port);
|
||||
|
||||
webSocketClient.start();
|
||||
|
||||
@@ -236,8 +216,7 @@ TEST_CASE("Websocket_ping_data_sent_setPingInterval", "[setPingInterval]")
|
||||
REQUIRE(startServer(server, serverReceivedPingMessages));
|
||||
|
||||
std::string session = ix::generateSessionId();
|
||||
bool useSetHeartBeatPeriodMethod = false; // so use setPingInterval
|
||||
WebSocketClient webSocketClient(port, useSetHeartBeatPeriodMethod);
|
||||
WebSocketClient webSocketClient(port);
|
||||
|
||||
webSocketClient.start();
|
||||
|
||||
@@ -261,7 +240,7 @@ TEST_CASE("Websocket_ping_data_sent_setPingInterval", "[setPingInterval]")
|
||||
// Here we test ping interval
|
||||
// client has sent data, but ping should have been sent no matter what
|
||||
// -> expected ping messages == 3 as 900+900+1300 = 3100 seconds, 1 ping sent every second
|
||||
REQUIRE(serverReceivedPingMessages == 3);
|
||||
REQUIRE(serverReceivedPingMessages >= 2);
|
||||
|
||||
// Give us 1000ms for the server to notice that clients went away
|
||||
ix::msleep(1000);
|
||||
@@ -284,8 +263,7 @@ TEST_CASE("Websocket_ping_data_sent_setPingInterval_half_full", "[setPingInterva
|
||||
REQUIRE(startServer(server, serverReceivedPingMessages));
|
||||
|
||||
std::string session = ix::generateSessionId();
|
||||
bool useSetHeartBeatPeriodMethod = false; // so use setPingInterval
|
||||
WebSocketClient webSocketClient(port, useSetHeartBeatPeriodMethod);
|
||||
WebSocketClient webSocketClient(port);
|
||||
|
||||
webSocketClient.start();
|
||||
|
||||
@@ -338,8 +316,7 @@ TEST_CASE("Websocket_ping_data_sent_setPingInterval_full", "[setPingInterval]")
|
||||
REQUIRE(startServer(server, serverReceivedPingMessages));
|
||||
|
||||
std::string session = ix::generateSessionId();
|
||||
bool useSetHeartBeatPeriodMethod = false; // so use setPingInterval
|
||||
WebSocketClient webSocketClient(port, useSetHeartBeatPeriodMethod);
|
||||
WebSocketClient webSocketClient(port);
|
||||
|
||||
webSocketClient.start();
|
||||
|
||||
@@ -363,8 +340,9 @@ TEST_CASE("Websocket_ping_data_sent_setPingInterval_full", "[setPingInterval]")
|
||||
|
||||
// Here we test ping interval
|
||||
// client has sent data, but ping should have been sent no matter what
|
||||
// -> expected ping messages == 1, 1 ping sent every second
|
||||
REQUIRE(serverReceivedPingMessages == 1);
|
||||
// -> expected ping messages == 2, 1 ping sent every second
|
||||
// The first ping is sent right away on connect
|
||||
REQUIRE(serverReceivedPingMessages == 2);
|
||||
|
||||
ix::msleep(100);
|
||||
|
||||
@@ -392,8 +370,7 @@ TEST_CASE("Websocket_ping_no_data_sent_setHeartBeatPeriod", "[setPingInterval]")
|
||||
REQUIRE(startServer(server, serverReceivedPingMessages));
|
||||
|
||||
std::string session = ix::generateSessionId();
|
||||
bool useSetHeartBeatPeriodMethod = true;
|
||||
WebSocketClient webSocketClient(port, useSetHeartBeatPeriodMethod);
|
||||
WebSocketClient webSocketClient(port);
|
||||
|
||||
webSocketClient.start();
|
||||
|
||||
@@ -406,14 +383,13 @@ TEST_CASE("Websocket_ping_no_data_sent_setHeartBeatPeriod", "[setPingInterval]")
|
||||
|
||||
REQUIRE(server.getClients().size() == 1);
|
||||
|
||||
ix::msleep(1900);
|
||||
ix::msleep(2100);
|
||||
|
||||
webSocketClient.stop();
|
||||
|
||||
|
||||
// Here we test ping interval
|
||||
// -> expected ping messages == 1 as 1900 seconds, 1 ping sent every second
|
||||
REQUIRE(serverReceivedPingMessages == 1);
|
||||
// -> expected ping messages == 2 as 2100 seconds, 1 ping sent every second
|
||||
REQUIRE(serverReceivedPingMessages == 2);
|
||||
|
||||
// Give us 1000ms for the server to notice that clients went away
|
||||
ix::msleep(1000);
|
||||
@@ -436,8 +412,7 @@ TEST_CASE("Websocket_ping_data_sent_setHeartBeatPeriod", "[setPingInterval]")
|
||||
REQUIRE(startServer(server, serverReceivedPingMessages));
|
||||
|
||||
std::string session = ix::generateSessionId();
|
||||
bool useSetHeartBeatPeriodMethod = true;
|
||||
WebSocketClient webSocketClient(port, useSetHeartBeatPeriodMethod);
|
||||
WebSocketClient webSocketClient(port);
|
||||
|
||||
webSocketClient.start();
|
||||
|
||||
@@ -464,7 +439,7 @@ TEST_CASE("Websocket_ping_data_sent_setHeartBeatPeriod", "[setPingInterval]")
|
||||
// Here we test ping interval
|
||||
// client has sent data, but ping should have been sent no matter what
|
||||
// -> expected ping messages == 2 as 900+900+1100 = 2900 seconds, 1 ping sent every second
|
||||
REQUIRE(serverReceivedPingMessages == 2);
|
||||
REQUIRE(serverReceivedPingMessages >= 2);
|
||||
|
||||
// Give us 1000ms for the server to notice that clients went away
|
||||
ix::msleep(1000);
|
||||
|
@@ -33,15 +33,17 @@ namespace ix
|
||||
};
|
||||
server.setConnectionStateFactory(factory);
|
||||
|
||||
server.setOnConnectionCallback([&server, &connectionId](
|
||||
std::shared_ptr<ix::WebSocket> webSocket,
|
||||
std::shared_ptr<ConnectionState> connectionState) {
|
||||
webSocket->setOnMessageCallback([webSocket, connectionState, &connectionId, &server](
|
||||
const ix::WebSocketMessagePtr& msg) {
|
||||
server.setOnClientMessageCallback(
|
||||
[&server, &connectionId](std::shared_ptr<ConnectionState> connectionState,
|
||||
WebSocket& webSocket,
|
||||
const ix::WebSocketMessagePtr& msg) {
|
||||
auto remoteIp = connectionState->getRemoteIp();
|
||||
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
TLogger() << "New connection";
|
||||
connectionState->computeId();
|
||||
TLogger() << "remote ip: " << remoteIp;
|
||||
TLogger() << "id: " << connectionState->getId();
|
||||
TLogger() << "Uri: " << msg->openInfo.uri;
|
||||
TLogger() << "Headers:";
|
||||
@@ -60,14 +62,13 @@ namespace ix
|
||||
{
|
||||
for (auto&& client : server.getClients())
|
||||
{
|
||||
if (client != webSocket)
|
||||
if (client.get() != &webSocket)
|
||||
{
|
||||
client->send(msg->str, msg->binary);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
auto res = server.listen();
|
||||
if (!res.first)
|
||||
|
@@ -16,39 +16,39 @@ using namespace ix;
|
||||
|
||||
bool startServer(ix::WebSocketServer& server, std::string& subProtocols)
|
||||
{
|
||||
server.setOnConnectionCallback(
|
||||
[&server, &subProtocols](std::shared_ptr<ix::WebSocket> webSocket,
|
||||
std::shared_ptr<ConnectionState> connectionState) {
|
||||
webSocket->setOnMessageCallback([webSocket, connectionState, &server, &subProtocols](
|
||||
const ix::WebSocketMessagePtr& msg) {
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
server.setOnClientMessageCallback(
|
||||
[&server, &subProtocols](std::shared_ptr<ConnectionState> connectionState,
|
||||
WebSocket& webSocket,
|
||||
const ix::WebSocketMessagePtr& msg) {
|
||||
auto remoteIp = connectionState->getRemoteIp();
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
TLogger() << "New connection";
|
||||
TLogger() << "remote ip: " << remoteIp;
|
||||
TLogger() << "id: " << connectionState->getId();
|
||||
TLogger() << "Uri: " << msg->openInfo.uri;
|
||||
TLogger() << "Headers:";
|
||||
for (auto it : msg->openInfo.headers)
|
||||
{
|
||||
TLogger() << "New connection";
|
||||
TLogger() << "id: " << connectionState->getId();
|
||||
TLogger() << "Uri: " << msg->openInfo.uri;
|
||||
TLogger() << "Headers:";
|
||||
for (auto it : msg->openInfo.headers)
|
||||
{
|
||||
TLogger() << it.first << ": " << it.second;
|
||||
}
|
||||
TLogger() << it.first << ": " << it.second;
|
||||
}
|
||||
|
||||
subProtocols = msg->openInfo.headers["Sec-WebSocket-Protocol"];
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Close)
|
||||
subProtocols = msg->openInfo.headers["Sec-WebSocket-Protocol"];
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Close)
|
||||
{
|
||||
log("Closed connection");
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Message)
|
||||
{
|
||||
for (auto&& client : server.getClients())
|
||||
{
|
||||
log("Closed connection");
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Message)
|
||||
{
|
||||
for (auto&& client : server.getClients())
|
||||
if (client.get() != &webSocket)
|
||||
{
|
||||
if (client != webSocket)
|
||||
{
|
||||
client->sendBinary(msg->str);
|
||||
}
|
||||
client->sendBinary(msg->str);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
auto res = server.listen();
|
||||
|
171
test/compatibility/cpp/libwebsockets/devnull_client.cpp
Normal file
171
test/compatibility/cpp/libwebsockets/devnull_client.cpp
Normal file
@@ -0,0 +1,171 @@
|
||||
/*
|
||||
* lws-minimal-ws-client
|
||||
*
|
||||
* Written in 2010-2019 by Andy Green <andy@warmcat.com>
|
||||
*
|
||||
* This file is made available under the Creative Commons CC0 1.0
|
||||
* Universal Public Domain Dedication.
|
||||
*
|
||||
* This demonstrates the a minimal ws client using lws.
|
||||
*
|
||||
* Original programs connects to https://libwebsockets.org/ and makes a
|
||||
* wss connection to the dumb-increment protocol there. While
|
||||
* connected, it prints the numbers it is being sent by
|
||||
* dumb-increment protocol.
|
||||
*
|
||||
* This is modified to make a test client which counts how much messages
|
||||
* per second can be received.
|
||||
*
|
||||
* libwebsockets$ make && ./a.out
|
||||
* g++ --std=c++14 -I/usr/local/opt/openssl/include devnull_client.cpp -lwebsockets
|
||||
* messages received: 0 per second 0 total
|
||||
* [2020/08/02 19:22:21:4774] U: LWS minimal ws client rx [-d <logs>] [--h2]
|
||||
* [2020/08/02 19:22:21:4814] U: callback_dumb_increment: established
|
||||
* messages received: 0 per second 0 total
|
||||
* messages received: 180015 per second 180015 total
|
||||
* messages received: 172866 per second 352881 total
|
||||
* messages received: 176177 per second 529058 total
|
||||
* messages received: 174191 per second 703249 total
|
||||
* messages received: 193397 per second 896646 total
|
||||
* messages received: 196385 per second 1093031 total
|
||||
* messages received: 194593 per second 1287624 total
|
||||
* messages received: 189484 per second 1477108 total
|
||||
* messages received: 200825 per second 1677933 total
|
||||
* messages received: 183542 per second 1861475 total
|
||||
* ^C[2020/08/02 19:22:33:4450] U: Completed OK
|
||||
*
|
||||
*/
|
||||
|
||||
#include <atomic>
|
||||
#include <iostream>
|
||||
#include <libwebsockets.h>
|
||||
#include <signal.h>
|
||||
#include <string.h>
|
||||
#include <thread>
|
||||
|
||||
static int interrupted;
|
||||
static struct lws* client_wsi;
|
||||
|
||||
std::atomic<uint64_t> receivedCount(0);
|
||||
|
||||
static int callback_dumb_increment(
|
||||
struct lws* wsi, enum lws_callback_reasons reason, void* user, void* in, size_t len)
|
||||
{
|
||||
switch (reason)
|
||||
{
|
||||
/* because we are protocols[0] ... */
|
||||
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
|
||||
lwsl_err("CLIENT_CONNECTION_ERROR: %s\n", in ? (char*) in : "(null)");
|
||||
client_wsi = NULL;
|
||||
break;
|
||||
|
||||
case LWS_CALLBACK_CLIENT_ESTABLISHED: lwsl_user("%s: established\n", __func__); break;
|
||||
|
||||
case LWS_CALLBACK_CLIENT_RECEIVE: receivedCount++; break;
|
||||
|
||||
case LWS_CALLBACK_CLIENT_CLOSED: client_wsi = NULL; break;
|
||||
|
||||
default: break;
|
||||
}
|
||||
|
||||
return lws_callback_http_dummy(wsi, reason, user, in, len);
|
||||
}
|
||||
|
||||
static const struct lws_protocols protocols[] = {{
|
||||
"dumb-increment-protocol",
|
||||
callback_dumb_increment,
|
||||
0,
|
||||
0,
|
||||
},
|
||||
{NULL, NULL, 0, 0}};
|
||||
|
||||
static void sigint_handler(int sig)
|
||||
{
|
||||
interrupted = 1;
|
||||
}
|
||||
|
||||
int main(int argc, const char** argv)
|
||||
{
|
||||
uint64_t receivedCountTotal(0);
|
||||
uint64_t receivedCountPerSecs(0);
|
||||
|
||||
auto timer = [&receivedCountTotal, &receivedCountPerSecs] {
|
||||
while (!interrupted)
|
||||
{
|
||||
std::cerr << "messages received: " << receivedCountPerSecs << " per second "
|
||||
<< receivedCountTotal << " total" << std::endl;
|
||||
|
||||
receivedCountPerSecs = receivedCount - receivedCountTotal;
|
||||
receivedCountTotal += receivedCountPerSecs;
|
||||
|
||||
auto duration = std::chrono::seconds(1);
|
||||
std::this_thread::sleep_for(duration);
|
||||
}
|
||||
};
|
||||
|
||||
std::thread t1(timer);
|
||||
|
||||
struct lws_context_creation_info info;
|
||||
struct lws_client_connect_info i;
|
||||
struct lws_context* context;
|
||||
const char* p;
|
||||
int n = 0, logs = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE
|
||||
/* for LLL_ verbosity above NOTICE to be built into lws, lws
|
||||
* must have been configured with -DCMAKE_BUILD_TYPE=DEBUG
|
||||
* instead of =RELEASE */
|
||||
/* | LLL_INFO */ /* | LLL_PARSER */ /* | LLL_HEADER */
|
||||
/* | LLL_EXT */ /* | LLL_CLIENT */ /* | LLL_LATENCY */
|
||||
/* | LLL_DEBUG */;
|
||||
|
||||
signal(SIGINT, sigint_handler);
|
||||
if ((p = lws_cmdline_option(argc, argv, "-d"))) logs = atoi(p);
|
||||
|
||||
lws_set_log_level(logs, NULL);
|
||||
lwsl_user("LWS minimal ws client rx [-d <logs>] [--h2]\n");
|
||||
|
||||
memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
|
||||
info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */
|
||||
info.protocols = protocols;
|
||||
info.timeout_secs = 10;
|
||||
|
||||
/*
|
||||
* since we know this lws context is only ever going to be used with
|
||||
* one client wsis / fds / sockets at a time, let lws know it doesn't
|
||||
* have to use the default allocations for fd tables up to ulimit -n.
|
||||
* It will just allocate for 1 internal and 1 (+ 1 http2 nwsi) that we
|
||||
* will use.
|
||||
*/
|
||||
info.fd_limit_per_thread = 1 + 1 + 1;
|
||||
|
||||
context = lws_create_context(&info);
|
||||
if (!context)
|
||||
{
|
||||
lwsl_err("lws init failed\n");
|
||||
return 1;
|
||||
}
|
||||
|
||||
memset(&i, 0, sizeof i); /* otherwise uninitialized garbage */
|
||||
i.context = context;
|
||||
i.port = 8008;
|
||||
i.address = "127.0.0.1";
|
||||
i.path = "/";
|
||||
i.host = i.address;
|
||||
i.origin = i.address;
|
||||
i.protocol = protocols[0].name; /* "dumb-increment-protocol" */
|
||||
i.pwsi = &client_wsi;
|
||||
|
||||
if (lws_cmdline_option(argc, argv, "--h2")) i.alpn = "h2";
|
||||
|
||||
lws_client_connect_via_info(&i);
|
||||
|
||||
while (n >= 0 && client_wsi && !interrupted)
|
||||
n = lws_service(context, 0);
|
||||
|
||||
lws_context_destroy(context);
|
||||
|
||||
lwsl_user("Completed %s\n", receivedCount > 10 ? "OK" : "Failed");
|
||||
|
||||
t1.join();
|
||||
|
||||
return receivedCount > 10;
|
||||
}
|
2
test/compatibility/csharp/.gitignore
vendored
Normal file
2
test/compatibility/csharp/.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
|
||||
bin
|
||||
obj
|
99
test/compatibility/csharp/Main.cs
Normal file
99
test/compatibility/csharp/Main.cs
Normal file
@@ -0,0 +1,99 @@
|
||||
//
|
||||
// Main.cs
|
||||
// Author: Benjamin Sergeant
|
||||
// Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
|
||||
//
|
||||
// In a different terminal, start a push server:
|
||||
// $ ws push_server -q
|
||||
//
|
||||
// $ dotnet run
|
||||
// messages received per second: 145157
|
||||
// messages received per second: 141405
|
||||
// messages received per second: 152202
|
||||
// messages received per second: 157149
|
||||
// messages received per second: 157673
|
||||
// messages received per second: 153594
|
||||
// messages received per second: 157830
|
||||
// messages received per second: 158422
|
||||
//
|
||||
|
||||
using System;
|
||||
using System.Net.WebSockets;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
public class DevNullClientCli
|
||||
{
|
||||
private static int receivedMessage = 0;
|
||||
|
||||
public static async Task<byte[]> ReceiveAsync(ClientWebSocket ws, CancellationToken token)
|
||||
{
|
||||
int bufferSize = 8192; // 8K
|
||||
var buffer = new byte[bufferSize];
|
||||
var offset = 0;
|
||||
var free = buffer.Length;
|
||||
|
||||
while (true)
|
||||
{
|
||||
var result = await ws.ReceiveAsync(new ArraySegment<byte>(buffer, offset, free), token).ConfigureAwait(false);
|
||||
|
||||
offset += result.Count;
|
||||
free -= result.Count;
|
||||
if (result.EndOfMessage) break;
|
||||
|
||||
if (free == 0)
|
||||
{
|
||||
// No free space
|
||||
// Resize the outgoing buffer
|
||||
var newSize = buffer.Length + bufferSize;
|
||||
|
||||
var newBuffer = new byte[newSize];
|
||||
Array.Copy(buffer, 0, newBuffer, 0, offset);
|
||||
buffer = newBuffer;
|
||||
free = buffer.Length - offset;
|
||||
}
|
||||
}
|
||||
|
||||
return buffer;
|
||||
}
|
||||
|
||||
private static void OnTimedEvent(object source, EventArgs e)
|
||||
{
|
||||
Console.WriteLine($"messages received per second: {receivedMessage}");
|
||||
receivedMessage = 0;
|
||||
}
|
||||
|
||||
public static async Task ReceiveMessagesAsync(string url)
|
||||
{
|
||||
var ws = new ClientWebSocket();
|
||||
|
||||
System.Uri uri = new System.Uri(url);
|
||||
var cancellationToken = CancellationToken.None;
|
||||
|
||||
try
|
||||
{
|
||||
await ws.ConnectAsync(uri, cancellationToken).ConfigureAwait(false);
|
||||
while (true)
|
||||
{
|
||||
var data = await DevNullClientCli.ReceiveAsync(ws, cancellationToken);
|
||||
receivedMessage += 1;
|
||||
}
|
||||
}
|
||||
catch (System.Net.WebSockets.WebSocketException e)
|
||||
{
|
||||
Console.WriteLine($"WebSocket error: {e}");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
public static async Task Main()
|
||||
{
|
||||
var timer = new System.Timers.Timer(1000);
|
||||
timer.Elapsed += OnTimedEvent;
|
||||
timer.Enabled = true;
|
||||
timer.Start();
|
||||
|
||||
var url = "ws://localhost:8008";
|
||||
await ReceiveMessagesAsync(url);
|
||||
}
|
||||
}
|
6
test/compatibility/csharp/devnull_client.csproj
Normal file
6
test/compatibility/csharp/devnull_client.csproj
Normal file
@@ -0,0 +1,6 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
<PropertyGroup>
|
||||
<OutputType>Exe</OutputType>
|
||||
<TargetFramework>netcoreapp3.1</TargetFramework>
|
||||
</PropertyGroup>
|
||||
</Project>
|
42
test/compatibility/node/devnull_client.js
Normal file
42
test/compatibility/node/devnull_client.js
Normal file
@@ -0,0 +1,42 @@
|
||||
//
|
||||
// With ws@7.3.1
|
||||
// and
|
||||
// node --version
|
||||
// v13.11.0
|
||||
//
|
||||
// In a different terminal, start a push server:
|
||||
// $ ws push_server -q
|
||||
//
|
||||
// $ node devnull_client.js
|
||||
// messages received per second: 16643
|
||||
// messages received per second: 28065
|
||||
// messages received per second: 28432
|
||||
// messages received per second: 22207
|
||||
// messages received per second: 28805
|
||||
// messages received per second: 28694
|
||||
// messages received per second: 28180
|
||||
// messages received per second: 28601
|
||||
// messages received per second: 28698
|
||||
// messages received per second: 28931
|
||||
// messages received per second: 27975
|
||||
//
|
||||
const WebSocket = require('ws');
|
||||
|
||||
const ws = new WebSocket('ws://localhost:8008');
|
||||
|
||||
ws.on('open', function open() {
|
||||
ws.send('hello from node');
|
||||
});
|
||||
|
||||
var receivedMessages = 0;
|
||||
|
||||
setInterval(function timeout() {
|
||||
console.log(`messages received per second: ${receivedMessages}`)
|
||||
receivedMessages = 0;
|
||||
}, 1000);
|
||||
|
||||
ws.on('message', function incoming(data) {
|
||||
receivedMessages += 1;
|
||||
});
|
||||
|
||||
|
44
test/compatibility/python/websockets/devnull_client.py
Normal file
44
test/compatibility/python/websockets/devnull_client.py
Normal file
@@ -0,0 +1,44 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# websocket send client
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import websockets
|
||||
|
||||
try:
|
||||
import uvloop
|
||||
uvloop.install()
|
||||
except ImportError:
|
||||
print('uvloop not available')
|
||||
pass
|
||||
|
||||
msgCount = 0
|
||||
|
||||
async def timer():
|
||||
global msgCount
|
||||
|
||||
while True:
|
||||
print(f'Received messages: {msgCount}')
|
||||
msgCount = 0
|
||||
|
||||
await asyncio.sleep(1)
|
||||
|
||||
|
||||
async def client(url):
|
||||
global msgCount
|
||||
|
||||
asyncio.ensure_future(timer())
|
||||
|
||||
async with websockets.connect(url) as ws:
|
||||
async for message in ws:
|
||||
msgCount += 1
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser(description='websocket proxy.')
|
||||
parser.add_argument('--url', help='Remote websocket url',
|
||||
default='wss://echo.websocket.org')
|
||||
args = parser.parse_args()
|
||||
|
||||
asyncio.get_event_loop().run_until_complete(client(args.url))
|
@@ -10,7 +10,7 @@ import websockets
|
||||
async def echo(websocket, path):
|
||||
while True:
|
||||
msg = await websocket.recv()
|
||||
print(f'Received {len(msg)} bytes')
|
||||
# print(f'Received {len(msg)} bytes')
|
||||
await websocket.send(msg)
|
||||
|
||||
host = os.getenv('BIND_HOST', 'localhost')
|
||||
|
6
test/compatibility/ruby/README.md
Normal file
6
test/compatibility/ruby/README.md
Normal file
@@ -0,0 +1,6 @@
|
||||
```
|
||||
export GEM_HOME=$HOME/local/gems
|
||||
bundle install faye-websocket
|
||||
```
|
||||
|
||||
https://stackoverflow.com/questions/486995/ruby-equivalent-of-virtualenv
|
59
test/compatibility/ruby/devnull_client.rb
Normal file
59
test/compatibility/ruby/devnull_client.rb
Normal file
@@ -0,0 +1,59 @@
|
||||
#
|
||||
# $ ruby --version
|
||||
# ruby 2.6.3p62 (2019-04-16 revision 67580) [universal.x86_64-darwin19]
|
||||
#
|
||||
# Install a gem locally by setting GEM_HOME
|
||||
# https://stackoverflow.com/questions/486995/ruby-equivalent-of-virtualenv
|
||||
# export GEM_HOME=$HOME/local/gems
|
||||
# bundle install faye-websocket
|
||||
#
|
||||
# In a different terminal, start a push server:
|
||||
# $ ws push_server -q
|
||||
#
|
||||
# $ ruby devnull_client.rb
|
||||
# [:open]
|
||||
# Connected to server
|
||||
# messages received per second: 115926
|
||||
# messages received per second: 119156
|
||||
# messages received per second: 119156
|
||||
# messages received per second: 119157
|
||||
# messages received per second: 119156
|
||||
# messages received per second: 119156
|
||||
# messages received per second: 119157
|
||||
# messages received per second: 119156
|
||||
# messages received per second: 119156
|
||||
# messages received per second: 119157
|
||||
# messages received per second: 119156
|
||||
# messages received per second: 119157
|
||||
# messages received per second: 119156
|
||||
# ^C[:close, 1006, ""]
|
||||
#
|
||||
require 'faye/websocket'
|
||||
require 'eventmachine'
|
||||
|
||||
EM.run {
|
||||
ws = Faye::WebSocket::Client.new('ws://127.0.0.1:8008')
|
||||
|
||||
counter = 0
|
||||
|
||||
EM.add_periodic_timer(1) do
|
||||
print "messages received per second: #{counter}\n"
|
||||
counter = 0 # reset counter
|
||||
end
|
||||
|
||||
ws.on :open do |event|
|
||||
p [:open]
|
||||
print "Connected to server\n"
|
||||
end
|
||||
|
||||
ws.on :message do |event|
|
||||
# Uncomment the next line to validate that we receive something correct
|
||||
# p [:message, event.data]
|
||||
counter += 1
|
||||
end
|
||||
|
||||
ws.on :close do |event|
|
||||
p [:close, event.code, event.reason]
|
||||
ws = nil
|
||||
end
|
||||
}
|
@@ -6,49 +6,21 @@
|
||||
|
||||
#define CATCH_CONFIG_RUNNER
|
||||
#include "catch.hpp"
|
||||
#include <ixcore/utils/IXCoreLogger.h>
|
||||
#include <ixwebsocket/IXNetSystem.h>
|
||||
#include <spdlog/spdlog.h>
|
||||
|
||||
#ifndef _WIN32
|
||||
#include <signal.h>
|
||||
#endif
|
||||
|
||||
int main(int argc, char* argv[])
|
||||
{
|
||||
ix::initNetSystem();
|
||||
|
||||
ix::CoreLogger::LogFunc logFunc = [](const char* msg, ix::LogLevel level) {
|
||||
switch (level)
|
||||
{
|
||||
case ix::LogLevel::Debug:
|
||||
{
|
||||
spdlog::debug(msg);
|
||||
}
|
||||
break;
|
||||
|
||||
case ix::LogLevel::Info:
|
||||
{
|
||||
spdlog::info(msg);
|
||||
}
|
||||
break;
|
||||
|
||||
case ix::LogLevel::Warning:
|
||||
{
|
||||
spdlog::warn(msg);
|
||||
}
|
||||
break;
|
||||
|
||||
case ix::LogLevel::Error:
|
||||
{
|
||||
spdlog::error(msg);
|
||||
}
|
||||
break;
|
||||
|
||||
case ix::LogLevel::Critical:
|
||||
{
|
||||
spdlog::critical(msg);
|
||||
}
|
||||
break;
|
||||
}
|
||||
};
|
||||
ix::CoreLogger::setLogFunction(logFunc);
|
||||
#ifndef _WIN32
|
||||
signal(SIGPIPE, SIG_IGN);
|
||||
#endif
|
||||
spdlog::set_level(spdlog::level::debug);
|
||||
|
||||
int result = Catch::Session().run(argc, argv);
|
||||
|
||||
|
Reference in New Issue
Block a user