Compare commits

...

24 Commits

Author SHA1 Message Date
6a3ffcb098 move poll wrapper on top of select (only used on Windows) to the ix namespace 2019-09-08 11:14:49 -07:00
260a94d3b0 README: update link to the doc 2019-09-06 10:42:48 -07:00
88c6d6c4cb ci 2019-09-05 22:32:54 -07:00
d5a4931c92 travis linux 2019-09-05 22:29:00 -07:00
11f4e90bc6 ci tweak / install redis 2019-09-05 22:14:55 -07:00
2ce65e7a77 cobra metrics publisher test uses random free port 2019-09-05 22:05:00 -07:00
e81c2c3e5c cobra chat test uses random free port 2019-09-05 22:02:10 -07:00
e40dda7549 add cobra metrics publisher + server unittest 2019-09-05 21:57:05 -07:00
d959d73261 Add new cobra unittest, using server and client 2019-09-05 20:49:58 -07:00
07b7e37a92 snake unsubscription fixes 2019-09-05 20:47:15 -07:00
eb7888347a Fix compiler warning 2019-09-05 20:29:14 -07:00
d8664f4988 ws snake (cobra simple server) add basic support for unsubscription + subscribe send the proper subscription data + redis client subscription can be cancelled 2019-09-05 20:28:34 -07:00
5e94791b13 IXCobraConnection / pdu handlers can crash if they receive json data which is not an object 2019-09-05 20:24:42 -07:00
3e3f7171fc cobra publish fix 2019-09-05 14:31:28 -07:00
308fda0b37 Update README.md 2019-09-05 14:30:51 -07:00
66ed7577b1 all client autobahn test should pass ! last failing one was ...
+- zlib/deflate has a bug with windowsbits == 8, so we silently upgrade it to 9/ (fix autobahn test 13.X which uses 8 for the windows size)
2019-09-04 21:01:30 -07:00
cae23c764f Fragmentation: for sent messages which are compressed, the continuation fragments should not have the rsv1 bit set (fix all autobahn tests for zlib compression 12.X)
Websocket Server / do a case insensitive string search when looking for an Upgrade header whose value is websocket. (some client use WebSocket with some upper-case characters)
2019-09-04 18:23:56 -07:00
f25b2af6eb ws autobahn / use condition variables for stopping test case + add more logging on errors 2019-09-04 12:21:54 -07:00
508d372df1 ws autobahn / report progress with spdlog::info to get timing info 2019-09-04 10:16:32 -07:00
12c3275c36 truncate module 2019-09-03 20:14:35 -07:00
98189c23dc Per message deflate/compression: handle fragmented messages (fix autobahn test: 12.1.X and probably others) 2019-09-03 17:42:48 -07:00
ec55b4a82a Receiving invalid UTF-8 TEXT message should fail and close the connection (fix remaining autobahn test: 6.X UTF-8 Handling) 2019-09-03 16:07:48 -07:00
5d58982f77 IXWebSocketTransport message processing refactoring 2019-09-03 15:48:55 -07:00
57665ca825 Validate close codes. Autobahn 7.9.* 2019-09-03 15:43:16 -07:00
32 changed files with 1010 additions and 122 deletions

View File

@ -6,23 +6,26 @@ language: bash
matrix:
include:
# macOS
- os: osx
env:
- HOMEBREW_NO_AUTO_UPDATE=1
compiler: clang
script:
- brew install mbedtls
- python test/run.py
- make ws
# - os: osx
# env:
# - HOMEBREW_NO_AUTO_UPDATE=1
# compiler: clang
# script:
# - brew install redis
# - brew services start redis
# - brew install mbedtls
# - python test/run.py
# - make ws
# Linux
Linux
- os: linux
dist: bionic
before_install:
- sudo apt-get install -y libmbedtls-dev
- sudo apt-get install -y redis-server
script:
- python test/run.py
- make ws
- python test/run.py
# - make ws
env:
- CC=gcc
- CXX=g++

View File

@ -1 +1 @@
5.1.6
6.1.0

View File

@ -6,8 +6,8 @@ IXWebSocket is a C++ library for WebSocket client and server development. It has
It is been used on big mobile video game titles sending and receiving tons of messages since 2017 (iOS and Android).
Interested ? Go read the [docs](https://bsergean.github.io/IXWebSocket/site/) ! If things don't work as expected, please create an issue in github, or even better a pull request if you know how to fix your problem.
Interested ? Go read the [docs](https://machinezone.github.io/IXWebSocket/) ! If things don't work as expected, please create an issue in github, or even better a pull request if you know how to fix your problem.
IXWebSocket is actively being developed, check out the [changelog](CHANGELOG.md) to know what's cooking. If you are looking for a real time messaging service (the chat-like 'server' your websocket code will talk to) with many features such as history, backed by Redis, look at [cobra](https://github.com/machinezone/cobra).
IXWebSocket is not yet autobahn compliant, but we are working on changing this. See the current compliance [test results](https://bsergean.github.io/IXWebSocket/autobahn/index.html).
IXWebSocket client code is autobahn compliant beginning with the 6.0.0 version. See the current [test results](https://bsergean.github.io/IXWebSocket/autobahn/index.html). Some tests are still failing in the server code.

View File

@ -1,11 +1,46 @@
# Changelog
All notable changes to this project will be documented in this file.
## [6.1.0] - 2019-09-08
- move poll wrapper on top of select (only used on Windows) to the ix namespace
## [6.0.1] - 2019-09-05
- add cobra metrics publisher + server unittest
- add cobra client + server unittest
- ws snake (cobra simple server) add basic support for unsubscription + subscribe send the proper subscription data + redis client subscription can be cancelled
- IXCobraConnection / pdu handlers can crash if they receive json data which is not an object
## [6.0.0] - 2019-09-04
- all client autobahn test should pass !
- zlib/deflate has a bug with windowsbits == 8, so we silently upgrade it to 9/ (fix autobahn test 13.X which uses 8 for the windows size)
## [5.2.0] - 2019-09-04
- Fragmentation: for sent messages which are compressed, the continuation fragments should not have the rsv1 bit set (fix all autobahn tests for zlib compression 12.X)
- Websocket Server / do a case insensitive string search when looking for an Upgrade header whose value is websocket. (some client use WebSocket with some upper-case characters)
## [5.1.9] - 2019-09-03
- ws autobahn / report progress with spdlog::info to get timing info
- ws autobahn / use condition variables for stopping test case + add more logging on errors
## [5.1.8] - 2019-09-03
- Per message deflate/compression: handle fragmented messages (fix autobahn test: 12.1.X and probably others)
## [5.1.7] - 2019-09-03
- Receiving invalid UTF-8 TEXT message should fail and close the connection (fix remaining autobahn test: 6.X UTF-8 Handling)
## [5.1.6] - 2019-09-03
- Sending invalid UTF-8 TEXT message should fail and close the connection (fix remaining autobahn test: 6.X UTF-8 Handling)
- Fix failing unittest which was sending binary data in text mode with WebSocket::send to call properly call WebSocket::sendBinary instead.
- Validate that the reason is proper utf-8. (fix autobahn test 7.5.1)
- Validate close codes. Autobahn 7.9.*
## [5.1.5] - 2019-09-03

View File

@ -34,10 +34,7 @@ namespace ix
return true;
#endif
}
}
// This function should be in the global namespace
#ifdef _WIN32
//
// That function could 'return WSAPoll(pfd, nfds, timeout);'
// but WSAPoll is said to have weird behaviors on the internet
@ -47,6 +44,7 @@ namespace ix
//
int poll(struct pollfd *fds, nfds_t nfds, int timeout)
{
#ifdef _WIN32
int maxfd = 0;
fd_set readfds, writefds, errorfds;
FD_ZERO(&readfds);
@ -107,5 +105,9 @@ namespace ix
}
return ret;
}
#else
return ::poll(fds, nfds, timeout);
#endif
}
} // namespace ix

View File

@ -13,11 +13,9 @@
#include <io.h>
#include <ws2def.h>
// Define our own poll on Windows
// Define our own poll on Windows, as a wrapper on top of select
typedef unsigned long int nfds_t;
int poll(struct pollfd* fds, nfds_t nfds, int timeout);
#else
#include <arpa/inet.h>
#include <errno.h>
@ -35,4 +33,6 @@ namespace ix
{
bool initNetSystem();
bool uninitNetSystem();
int poll(struct pollfd* fds, nfds_t nfds, int timeout);
} // namespace ix

View File

@ -79,7 +79,7 @@ namespace ix
}
}
int ret = ::poll(fds, nfds, timeoutMs);
int ret = ix::poll(fds, nfds, timeoutMs);
PollResultType pollResult = PollResultType::ReadyForRead;
if (ret < 0)

View File

@ -12,7 +12,6 @@
#include <cmath>
#include <cassert>
#include <iostream>
namespace ix

View File

@ -27,4 +27,5 @@ namespace ix
const std::string WebSocketCloseConstants::kProtocolErrorCodeDataOpcodeOutOfSequence("Fragmentation: data message out of sequence");
const std::string WebSocketCloseConstants::kProtocolErrorCodeContinuationOpCodeOutOfSequence("Fragmentation: continuation opcode out of sequence");
const std::string WebSocketCloseConstants::kInvalidFramePayloadDataMessage("Invalid frame payload data");
const std::string WebSocketCloseConstants::kInvalidCloseCodeMessage("Invalid close code");
}

View File

@ -32,5 +32,6 @@ namespace ix
static const std::string kProtocolErrorCodeDataOpcodeOutOfSequence;
static const std::string kProtocolErrorCodeContinuationOpCodeOutOfSequence;
static const std::string kInvalidFramePayloadDataMessage;
static const std::string kInvalidCloseCodeMessage;
};
} // namespace ix

View File

@ -295,7 +295,7 @@ namespace ix
return sendErrorResponse(400, "Missing Sec-WebSocket-Key value");
}
if (headers["upgrade"] != "websocket")
if (!insensitiveStringCompare(headers["upgrade"], "WebSocket"))
{
return sendErrorResponse(400, "Invalid or missing Upgrade header");
}
@ -326,6 +326,7 @@ namespace ix
ss << "Sec-WebSocket-Accept: " << std::string(output) << "\r\n";
ss << "Upgrade: websocket\r\n";
ss << "Connection: Upgrade\r\n";
ss << "Server: " << userAgent() << "\r\n";
// Parse the client headers. Does it support deflate ?
std::string header = headers["sec-websocket-extensions"];

View File

@ -33,6 +33,8 @@ namespace ix
_serverNoContextTakeover = serverNoContextTakeover;
_clientMaxWindowBits = clientMaxWindowBits;
_serverMaxWindowBits = serverMaxWindowBits;
sanitizeClientMaxWindowBits();
}
//
@ -107,10 +109,22 @@ namespace ix
_clientMaxWindowBits =
std::min(maxClientMaxWindowBits,
std::max(x, minClientMaxWindowBits));
sanitizeClientMaxWindowBits();
}
}
}
void WebSocketPerMessageDeflateOptions::sanitizeClientMaxWindowBits()
{
// zlib/deflate has a bug with windowsbits == 8, so we silently upgrade it to 9
// See https://bugs.chromium.org/p/chromium/issues/detail?id=691074
if (_clientMaxWindowBits == 8)
{
_clientMaxWindowBits = 9;
}
}
std::string WebSocketPerMessageDeflateOptions::generateHeader()
{
std::stringstream ss;

View File

@ -41,5 +41,7 @@ namespace ix
bool _serverNoContextTakeover;
int _clientMaxWindowBits;
int _serverMaxWindowBits;
void sanitizeClientMaxWindowBits();
};
} // namespace ix

View File

@ -77,6 +77,7 @@ namespace ix
WebSocketTransport::WebSocketTransport() :
_useMask(true),
_compressedMessage(false),
_readyState(ReadyState::CLOSED),
_closeCode(WebSocketCloseConstants::kInternalErrorCode),
_closeReason(WebSocketCloseConstants::kInternalErrorMessage),
@ -558,13 +559,16 @@ namespace ix
return;
}
unmaskReceiveBuffer(ws);
std::string frameData(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t) ws.N);
// We got a whole message, now do something with it:
if (
ws.opcode == wsheader_type::TEXT_FRAME
|| ws.opcode == wsheader_type::BINARY_FRAME
|| ws.opcode == wsheader_type::CONTINUATION
) {
unmaskReceiveBuffer(ws);
if (ws.opcode != wsheader_type::CONTINUATION)
{
@ -573,6 +577,8 @@ namespace ix
? MessageKind::MSG_TEXT
: MessageKind::MSG_BINARY;
_compressedMessage = _enablePerMessageDeflate && ws.rsv1;
// Continuation message needs to follow a non-fin TEXT or BINARY message
if (!_chunks.empty())
{
@ -593,10 +599,11 @@ namespace ix
if (ws.fin && _chunks.empty())
{
emitMessage(_fragmentedMessageKind,
std::string(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t) ws.N),
ws,
frameData,
_compressedMessage,
onMessageCallback);
_compressedMessage = false;
}
else
{
@ -607,30 +614,26 @@ namespace ix
// the internal buffer which is slow and can let the internal OS
// receive buffer fill out.
//
_chunks.emplace_back(
std::vector<uint8_t>(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t)ws.N));
_chunks.emplace_back(frameData);
if (ws.fin)
{
emitMessage(_fragmentedMessageKind, getMergedChunks(),
ws, onMessageCallback);
_compressedMessage, onMessageCallback);
_chunks.clear();
_compressedMessage = false;
}
else
{
emitMessage(MessageKind::FRAGMENT, std::string(), ws, onMessageCallback);
emitMessage(MessageKind::FRAGMENT, std::string(), false, onMessageCallback);
}
}
}
else if (ws.opcode == wsheader_type::PING)
{
unmaskReceiveBuffer(ws);
std::string pingData(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size + (size_t) ws.N);
// too large
if (pingData.size() > 125)
if (frameData.size() > 125)
{
// Unexpected frame type
close(WebSocketCloseConstants::kProtocolErrorCode,
@ -642,29 +645,23 @@ namespace ix
{
// Reply back right away
bool compress = false;
sendData(wsheader_type::PONG, pingData, compress);
sendData(wsheader_type::PONG, frameData, compress);
}
emitMessage(MessageKind::PING, pingData, ws, onMessageCallback);
emitMessage(MessageKind::PING, frameData, false, onMessageCallback);
}
else if (ws.opcode == wsheader_type::PONG)
{
unmaskReceiveBuffer(ws);
std::string pongData(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size + (size_t) ws.N);
std::lock_guard<std::mutex> lck(_lastReceivePongTimePointMutex);
_lastReceivePongTimePoint = std::chrono::steady_clock::now();
emitMessage(MessageKind::PONG, pongData, ws, onMessageCallback);
emitMessage(MessageKind::PONG, frameData, false, onMessageCallback);
}
else if (ws.opcode == wsheader_type::CLOSE)
{
std::string reason;
uint16_t code = 0;
unmaskReceiveBuffer(ws);
if (ws.N >= 2)
{
// Extract the close code first, available as the first 2 bytes
@ -674,8 +671,7 @@ namespace ix
// Get the reason.
if (ws.N > 2)
{
reason.assign(_rxbuf.begin()+ws.header_size + 2,
_rxbuf.begin()+ws.header_size + (size_t) ws.N);
reason = frameData.substr(2, frameData.size());
}
// Validate that the reason is proper utf-8. Autobahn 7.5.1
@ -684,6 +680,20 @@ namespace ix
code = WebSocketCloseConstants::kInvalidFramePayloadData;
reason = WebSocketCloseConstants::kInvalidFramePayloadDataMessage;
}
// Validate close codes. Autobahn 7.9.*
// 1014, 1015 are debattable. The firefox MSDN has a description for them
if (code < 1000 || code == 1004 || code == 1006 ||
(code > 1013 && code < 3000))
{
// build up an error message containing the bad error code
std::stringstream ss;
ss << WebSocketCloseConstants::kInvalidCloseCodeMessage
<< ": " << code;
reason = ss.str();
code = WebSocketCloseConstants::kProtocolErrorCode;
}
}
else
{
@ -768,8 +778,7 @@ namespace ix
for (auto&& chunk : _chunks)
{
std::string str(chunk.begin(), chunk.end());
msg += str;
msg += chunk;
}
return msg;
@ -777,21 +786,38 @@ namespace ix
void WebSocketTransport::emitMessage(MessageKind messageKind,
const std::string& message,
const wsheader_type& ws,
bool compressedMessage,
const OnMessageCallback& onMessageCallback)
{
size_t wireSize = message.size();
// When the RSV1 bit is 1 it means the message is compressed
if (_enablePerMessageDeflate && ws.rsv1 && messageKind != MessageKind::FRAGMENT)
if (compressedMessage && messageKind != MessageKind::FRAGMENT)
{
std::string decompressedMessage;
bool success = _perMessageDeflate.decompress(message, decompressedMessage);
onMessageCallback(decompressedMessage, wireSize, !success, messageKind);
if (messageKind == MessageKind::MSG_TEXT && !validateUtf8(decompressedMessage))
{
close(WebSocketCloseConstants::kInvalidFramePayloadData,
WebSocketCloseConstants::kInvalidFramePayloadDataMessage);
}
else
{
onMessageCallback(decompressedMessage, wireSize, !success, messageKind);
}
}
else
{
onMessageCallback(message, wireSize, false, messageKind);
if (messageKind == MessageKind::MSG_TEXT && !validateUtf8(message))
{
close(WebSocketCloseConstants::kInvalidFramePayloadData,
WebSocketCloseConstants::kInvalidFramePayloadDataMessage);
}
else
{
onMessageCallback(message, wireSize, false, messageKind);
}
}
}
@ -840,6 +866,8 @@ namespace ix
message_end = compressedMessage.end();
}
_txbuf.reserve(wireSize);
// Common case for most message. No fragmentation required.
if (wireSize < kChunkSize)
{
@ -927,8 +955,9 @@ namespace ix
header[0] |= 0x80;
}
// This bit indicate that the frame is compressed
if (compress)
// The rsv1 bit indicate that the frame is compressed
// continuation opcodes should not set it. Autobahn 12.2.10 and others 12.X
if (compress && type != wsheader_type::CONTINUATION)
{
header[0] |= 0x40;
}

View File

@ -149,13 +149,16 @@ namespace ix
// messages (tested messages up to 700M) and we cannot put them in a single
// buffer that is resized, as this operation can be slow when a buffer has its
// size increased 2 fold, while appending to a list has a fixed cost.
std::list<std::vector<uint8_t>> _chunks;
std::list<std::string> _chunks;
// Record the message kind (will be TEXT or BINARY) for a fragmented
// message, present in the first chunk, since the final chunk will be a
// CONTINUATION opcode and doesn't tell the full message kind
MessageKind _fragmentedMessageKind;
// Ditto for whether a message is compressed
bool _compressedMessage;
// Fragments are 32K long
static constexpr size_t kChunkSize = 1 << 15;
@ -244,7 +247,7 @@ namespace ix
void emitMessage(MessageKind messageKind,
const std::string& message,
const wsheader_type& ws,
bool compressedMessage,
const OnMessageCallback& onMessageCallback);
bool isSendBufferEmpty() const;

View File

@ -6,4 +6,4 @@
#pragma once
#define IX_WEBSOCKET_VERSION "5.1.6"
#define IX_WEBSOCKET_VERSION "6.1.0"

View File

@ -63,6 +63,9 @@ test:
ws_test: ws
(cd ws ; env DEBUG=1 PATH=../ws/build:$$PATH bash test_ws.sh)
autobahn_report:
cp -rvf ~/sandbox/reports/clients/* ../bsergean.github.io/IXWebSocket/autobahn/
# For the fork that is configured with appveyor
rebase_upstream:
git fetch upstream

View File

@ -17,11 +17,15 @@ endif()
add_subdirectory(${PROJECT_SOURCE_DIR}/.. ixwebsocket)
set (WS ../ws)
include_directories(
${PROJECT_SOURCE_DIR}/Catch2/single_include
../third_party
../third_party/msgpack11
../third_party/spdlog/include
../ws
../ws/snake
)
# Shared sources
@ -30,7 +34,24 @@ set (SOURCES
IXTest.cpp
IXGetFreePort.cpp
../third_party/msgpack11/msgpack11.cpp
../ws/ixcore/utils/IXCoreLogger.cpp
../third_party/jsoncpp/jsoncpp.cpp
${WS}/ixcore/utils/IXCoreLogger.cpp
${WS}/ixcrypto/IXBase64.cpp
${WS}/ixcrypto/IXHash.cpp
${WS}/ixcrypto/IXUuid.cpp
${WS}/ixcrypto/IXHMac.cpp
${WS}/ixcobra/IXCobraConnection.cpp
${WS}/ixcobra/IXCobraMetricsPublisher.cpp
${WS}/ixcobra/IXCobraMetricsThreadedPublisher.cpp
${WS}/snake/IXSnakeServer.cpp
${WS}/snake/IXSnakeProtocol.cpp
${WS}/snake/IXAppConfig.cpp
${WS}/IXRedisClient.cpp
IXSocketTest.cpp
IXSocketConnectTest.cpp
@ -41,6 +62,8 @@ set (SOURCES
IXHttpClientTest.cpp
IXHttpServerTest.cpp
IXUnityBuildsTest.cpp
IXCobraChatTest.cpp
IXCobraMetricsPublisherTest.cpp
)
# Some unittest don't work on windows yet

356
test/IXCobraChatTest.cpp Normal file
View File

@ -0,0 +1,356 @@
/*
* cmd_satori_chat.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017 Machine Zone. All rights reserved.
*/
#include <iostream>
#include <chrono>
#include <ixcobra/IXCobraConnection.h>
#include <ixcrypto/IXUuid.h>
#include "IXTest.h"
#include "IXSnakeServer.h"
#include "catch.hpp"
using namespace ix;
namespace
{
std::atomic<size_t> incomingBytes(0);
std::atomic<size_t> outgoingBytes(0);
void setupTrafficTrackerCallback()
{
ix::CobraConnection::setTrafficTrackerCallback(
[](size_t size, bool incoming)
{
if (incoming)
{
incomingBytes += size;
}
else
{
outgoingBytes += size;
}
}
);
}
class SatoriChat
{
public:
SatoriChat(const std::string& user,
const std::string& session,
const std::string& endpoint);
void subscribe(const std::string& channel);
void start();
void stop();
void run();
bool isReady() const;
void sendMessage(const std::string& text);
size_t getReceivedMessagesCount() const;
bool hasPendingMessages() const;
Json::Value popMessage();
private:
std::string _user;
std::string _session;
std::string _endpoint;
std::queue<Json::Value> _publish_queue;
mutable std::mutex _queue_mutex;
std::thread _thread;
std::atomic<bool> _stop;
ix::CobraConnection _conn;
std::atomic<bool> _connectedAndSubscribed;
std::queue<Json::Value> _receivedQueue;
std::mutex _logMutex;
};
SatoriChat::SatoriChat(const std::string& user,
const std::string& session,
const std::string& endpoint) :
_connectedAndSubscribed(false),
_stop(false),
_user(user),
_session(session),
_endpoint(endpoint)
{
}
void SatoriChat::start()
{
_thread = std::thread(&SatoriChat::run, this);
}
void SatoriChat::stop()
{
_stop = true;
_thread.join();
}
bool SatoriChat::isReady() const
{
return _connectedAndSubscribed;
}
size_t SatoriChat::getReceivedMessagesCount() const
{
return _receivedQueue.size();
}
bool SatoriChat::hasPendingMessages() const
{
std::unique_lock<std::mutex> lock(_queue_mutex);
return !_publish_queue.empty();
}
Json::Value SatoriChat::popMessage()
{
std::unique_lock<std::mutex> lock(_queue_mutex);
auto msg = _publish_queue.front();
_publish_queue.pop();
return msg;
}
//
// Callback to handle received messages, that are printed on the console
//
void SatoriChat::subscribe(const std::string& channel)
{
std::string filter;
_conn.subscribe(channel, filter,
[this](const Json::Value& msg)
{
std::cout << msg.toStyledString() << std::endl;
if (!msg.isObject()) return;
if (!msg.isMember("user")) return;
if (!msg.isMember("text")) return;
if (!msg.isMember("session")) return;
std::string msg_user = msg["user"].asString();
std::string msg_text = msg["text"].asString();
std::string msg_session = msg["session"].asString();
// We are not interested in messages
// from a different session.
if (msg_session != _session) return;
// We are not interested in our own messages
if (msg_user == _user) return;
_receivedQueue.push(msg);
std::stringstream ss;
ss << std::endl
<< msg_user << " > " << msg_text
<< std::endl
<< _user << " > ";
log(ss.str());
});
}
void SatoriChat::sendMessage(const std::string& text)
{
Json::Value msg;
msg["user"] = _user;
msg["session"] = _session;
msg["text"] = text;
std::unique_lock<std::mutex> lock(_queue_mutex);
_publish_queue.push(msg);
}
//
// Do satori communication on a background thread, where we can have
// something like an event loop that publish, poll and receive data
//
void SatoriChat::run()
{
snake::AppConfig appConfig = makeSnakeServerConfig(8008);
// Display config on the terminal for debugging
dumpConfig(appConfig);
snake::SnakeServer snakeServer(appConfig);
snakeServer.run();
// "chat" conf
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
std::string channel = _session;
std::string role = "_sub";
std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
_conn.configure(appkey, _endpoint, role, secret,
ix::WebSocketPerMessageDeflateOptions(true));
_conn.connect();
_conn.setEventCallback(
[this, channel]
(ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId)
{
if (eventType == ix::CobraConnection_EventType_Open)
{
log("Subscriber connected: " + _user);
}
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
log("Subscriber authenticated: " + _user);
subscribe(channel);
}
else if (eventType == ix::CobraConnection_EventType_Error)
{
log(errMsg + _user);
}
else if (eventType == ix::CobraConnection_EventType_Closed)
{
log("Connection closed: " + _user);
}
else if (eventType == ix::CobraConnection_EventType_Subscribed)
{
log("Subscription ok: " + _user + " subscription_id " + subscriptionId);
_connectedAndSubscribed = true;
}
else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{
log("Unsubscription ok: " + _user + " subscription_id " + subscriptionId);
}
else if (eventType == ix::CobraConnection_EventType_Published)
{
Logger() << "Subscriber: published message acked: " << msgId;
}
}
);
while (!_stop)
{
{
while (hasPendingMessages())
{
auto msg = popMessage();
std::string text = msg["text"].asString();
std::stringstream ss;
ss << "Sending msg [" << text << "]";
log(ss.str());
Json::Value channels;
channels.append(channel);
_conn.publish(channels, msg);
}
}
ix::msleep(50);
}
_conn.unsubscribe(channel);
ix::msleep(50);
_conn.disconnect();
_conn.setEventCallback([]
(ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId)
{
;
});
snakeServer.stop();
}
}
TEST_CASE("Cobra_chat", "[cobra_chat]")
{
SECTION("Exchange and count sent/received messages.")
{
int port = getFreePort();
snake::AppConfig appConfig = makeSnakeServerConfig(port);
snake::SnakeServer snakeServer(appConfig);
snakeServer.run();
int timeout;
setupTrafficTrackerCallback();
std::string session = ix::generateSessionId();
std::stringstream ss;
ss << "ws://localhost:" << port;
std::string endpoint = ss.str();
SatoriChat chatA("jean", session, endpoint);
SatoriChat chatB("paul", session, endpoint);
chatA.start();
chatB.start();
// Wait for all chat instance to be ready
timeout = 10 * 1000; // 10s
while (true)
{
if (chatA.isReady() && chatB.isReady()) break;
ix::msleep(10);
timeout -= 10;
if (timeout <= 0)
{
REQUIRE(false); // timeout
}
}
// Add a bit of extra time, for the subscription to be active
ix::msleep(1000);
chatA.sendMessage("from A1");
chatA.sendMessage("from A2");
chatA.sendMessage("from A3");
chatB.sendMessage("from B1");
chatB.sendMessage("from B2");
// 1. Wait for all messages to be sent
timeout = 10 * 1000; // 10s
while (chatA.hasPendingMessages() || chatB.hasPendingMessages())
{
ix::msleep(10);
timeout -= 10;
if (timeout <= 0)
{
REQUIRE(false); // timeout
}
}
// Give us 1s for all messages to be received
ix::msleep(1000);
chatA.stop();
chatB.stop();
// FIXME: improve this and make it exact matches
// we get unreliable result set
REQUIRE(chatA.getReceivedMessagesCount() == 2);
REQUIRE(chatB.getReceivedMessagesCount() == 3);
std::cout << incomingBytes << std::endl;
std::cout << "Incoming bytes: " << incomingBytes << std::endl;
std::cout << "Outgoing bytes: " << outgoingBytes << std::endl;
snakeServer.stop();
}
}

View File

@ -0,0 +1,237 @@
/*
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone. All rights reserved.
*/
#include <iostream>
#include <set>
#include <ixcrypto/IXUuid.h>
#include <ixcobra/IXCobraMetricsPublisher.h>
#include "IXTest.h"
#include "IXSnakeServer.h"
#include "catch.hpp"
using namespace ix;
namespace
{
//
// This project / appkey is configure on cobra to not do any batching.
// This way we can start a subscriber and receive all messages as they come in.
//
std::string APPKEY("FC2F10139A2BAc53BB72D9db967b024f");
std::string CHANNEL("unittest_channel");
std::string PUBLISHER_ROLE("_pub");
std::string PUBLISHER_SECRET("1c04DB8fFe76A4EeFE3E318C72d771db");
std::string SUBSCRIBER_ROLE("_sub");
std::string SUBSCRIBER_SECRET("66B1dA3ED5fA074EB5AE84Dd8CE3b5ba");
std::atomic<bool> gStop;
std::atomic<bool> gSubscriberConnectedAndSubscribed;
std::atomic<int> gUniqueMessageIdsCount;
std::atomic<int> gMessageCount;
std::set<std::string> gIds;
std::mutex gProtectIds; // std::set is no thread-safe, so protect access with this mutex.
//
// Background thread subscribe to the channel and validates what was sent
//
void startSubscriber(const std::string& endpoint)
{
gSubscriberConnectedAndSubscribed = false;
gUniqueMessageIdsCount = 0;
gMessageCount = 0;
ix::CobraConnection conn;
conn.configure(APPKEY, endpoint, SUBSCRIBER_ROLE, SUBSCRIBER_SECRET,
ix::WebSocketPerMessageDeflateOptions(true));
conn.connect();
conn.setEventCallback(
[&conn]
(ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId)
{
if (eventType == ix::CobraConnection_EventType_Open)
{
Logger() << "Subscriber connected:";
}
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
log("Subscriber authenticated");
std::string filter;
conn.subscribe(CHANNEL, filter,
[](const Json::Value& msg)
{
log(msg.toStyledString());
std::string id = msg["id"].asString();
{
std::lock_guard<std::mutex> guard(gProtectIds);
gIds.insert(id);
}
gMessageCount++;
});
}
else if (eventType == ix::CobraConnection_EventType_Subscribed)
{
Logger() << "Subscriber: subscribed to channel " << subscriptionId;
if (subscriptionId == CHANNEL)
{
gSubscriberConnectedAndSubscribed = true;
}
else
{
Logger() << "Subscriber: unexpected channel " << subscriptionId;
}
}
else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{
Logger() << "Subscriber: ununexpected from channel " << subscriptionId;
if (subscriptionId != CHANNEL)
{
Logger() << "Subscriber: unexpected channel " << subscriptionId;
}
}
else if (eventType == ix::CobraConnection_EventType_Published)
{
Logger() << "Subscriber: published message acked: " << msgId;
}
}
);
while (!gStop)
{
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
}
conn.unsubscribe(CHANNEL);
conn.disconnect();
gUniqueMessageIdsCount = gIds.size();
}
}
TEST_CASE("Cobra_Metrics_Publisher", "[cobra]")
{
int port = getFreePort();
snake::AppConfig appConfig = makeSnakeServerConfig(port);
snake::SnakeServer snakeServer(appConfig);
snakeServer.run();
std::stringstream ss;
ss << "ws://localhost:" << port;
std::string endpoint = ss.str();
// Make channel name unique
CHANNEL += uuid4();
gStop = false;
std::thread bgThread(&startSubscriber, endpoint);
int timeout = 10 * 1000; // 10s
// Wait until the subscriber is ready (authenticated + subscription successful)
while (!gSubscriberConnectedAndSubscribed)
{
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
timeout -= 10;
if (timeout <= 0)
{
REQUIRE(false); // timeout
}
}
ix::CobraMetricsPublisher cobraMetricsPublisher;
bool perMessageDeflate = true;
cobraMetricsPublisher.configure(APPKEY, endpoint, CHANNEL,
PUBLISHER_ROLE, PUBLISHER_SECRET, perMessageDeflate);
cobraMetricsPublisher.setSession(uuid4());
cobraMetricsPublisher.enable(true); // disabled by default, needs to be enabled to be active
Json::Value data;
data["foo"] = "bar";
// (1) Publish without restrictions
cobraMetricsPublisher.push("sms_metric_A_id", data); // (msg #1)
cobraMetricsPublisher.push("sms_metric_B_id", data); // (msg #2)
// (2) Restrict what is sent using a blacklist
// Add one entry to the blacklist
// (will send msg #3)
cobraMetricsPublisher.setBlacklist({
"sms_metric_B_id" // this id will not be sent
});
// (msg #4)
cobraMetricsPublisher.push("sms_metric_A_id", data);
// ...
cobraMetricsPublisher.push("sms_metric_B_id", data); // this won't be sent
// Reset the blacklist
// (msg #5)
cobraMetricsPublisher.setBlacklist({}); // 4.
// (3) Restrict what is sent using rate control
// (msg #6)
cobraMetricsPublisher.setRateControl({
{"sms_metric_C_id", 1}, // published once per minute (60 seconds) max
});
// (msg #7)
cobraMetricsPublisher.push("sms_metric_C_id", data);
cobraMetricsPublisher.push("sms_metric_C_id", data); // this won't be sent
ix::msleep(1400);
// (msg #8)
cobraMetricsPublisher.push("sms_metric_C_id", data); // now this will be sent
ix::msleep(600); // wait a bit so that the last message is sent and can be received
log("Testing suspend/resume now, which will disconnect the cobraMetricsPublisher.");
// Test suspend + resume
for (int i = 0 ; i < 3 ; ++i)
{
cobraMetricsPublisher.suspend();
ix::msleep(500);
REQUIRE(!cobraMetricsPublisher.isConnected()); // Check that we are not connected anymore
cobraMetricsPublisher.push("sms_metric_D_id", data); // will not be sent this time
cobraMetricsPublisher.resume();
ix::msleep(2000); // give cobra 2s to connect
REQUIRE(cobraMetricsPublisher.isConnected()); // Check that we are connected now
cobraMetricsPublisher.push("sms_metric_E_id", data);
}
ix::msleep(500);
// Now stop the thread
gStop = true;
bgThread.join();
//
// Validate that we received all message kinds, and the correct number of messages
//
CHECK(gIds.count("sms_metric_A_id") == 1);
CHECK(gIds.count("sms_metric_B_id") == 1);
CHECK(gIds.count("sms_metric_C_id") == 1);
CHECK(gIds.count("sms_metric_D_id") == 1);
CHECK(gIds.count("sms_metric_E_id") == 1);
CHECK(gIds.count("sms_set_rate_control_id") == 1);
CHECK(gIds.count("sms_set_blacklist_id") == 1);
snakeServer.stop();
}

View File

@ -137,4 +137,57 @@ namespace ix
server.start();
return true;
}
std::vector<uint8_t> load(const std::string& path)
{
std::vector<uint8_t> memblock;
std::ifstream file(path);
if (!file.is_open()) return memblock;
file.seekg(0, file.end);
std::streamoff size = file.tellg();
file.seekg(0, file.beg);
memblock.resize((size_t) size);
file.read((char*)&memblock.front(), static_cast<std::streamsize>(size));
return memblock;
}
std::string readAsString(const std::string& path)
{
auto vec = load(path);
return std::string(vec.begin(), vec.end());
}
snake::AppConfig makeSnakeServerConfig(int port)
{
snake::AppConfig appConfig;
appConfig.port = port;
appConfig.hostname = "127.0.0.1";
appConfig.verbose = true;
appConfig.redisPort = 6379;
appConfig.redisPassword = "";
appConfig.redisHosts.push_back("localhost"); // only one host supported now
std::string appsConfigPath("appsConfig.json");
// Parse config file
auto str = readAsString(appsConfigPath);
if (str.empty())
{
std::cout << "Cannot read content of " << appsConfigPath << std::endl;
return appConfig;
}
std::cout << str << std::endl;
auto apps = nlohmann::json::parse(str);
appConfig.apps = apps["apps"];
// Display config on the terminal for debugging
dumpConfig(appConfig);
return appConfig;
}
}

View File

@ -9,6 +9,7 @@
#include "IXGetFreePort.h"
#include <iostream>
#include <ixwebsocket/IXWebSocketServer.h>
#include "IXAppConfig.h"
#include <mutex>
#include <spdlog/spdlog.h>
#include <sstream>
@ -48,4 +49,6 @@ namespace ix
void log(const std::string& msg);
bool startWebSocketEchoServer(ix::WebSocketServer& server);
snake::AppConfig makeSnakeServerConfig(int port);
} // namespace ix

14
test/appsConfig.json Normal file
View File

@ -0,0 +1,14 @@
{
"apps": {
"FC2F10139A2BAc53BB72D9db967b024f": {
"roles": {
"_sub": {
"secret": "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba"
},
"_pub": {
"secret": "1c04DB8fFe76A4EeFE3E318C72d771db"
}
}
}
}
}

View File

@ -128,6 +128,8 @@ namespace ix
const OnRedisSubscribeResponseCallback& responseCallback,
const OnRedisSubscribeCallback& callback)
{
_stop = false;
if (!_socket) return false;
std::stringstream ss;
@ -159,7 +161,7 @@ namespace ix
if (!lineValid) return false;
// There are 5 items for the subscribe repply
// There are 5 items for the subscribe reply
for (int i = 0; i < 5; ++i)
{
auto lineResult = _socket->readLine(nullptr);
@ -175,13 +177,21 @@ namespace ix
// Wait indefinitely for new messages
while (true)
{
if (_stop) break;
// Wait until something is ready to read
auto pollResult = _socket->isReadyToRead(-1);
int timeoutMs = 10;
auto pollResult = _socket->isReadyToRead(timeoutMs);
if (pollResult == PollResultType::Error)
{
return false;
}
if (pollResult == PollResultType::Timeout)
{
continue;
}
// The first line of the response describe the return type,
// => *3 (an array of 3 elements)
auto lineResult = _socket->readLine(nullptr);
@ -231,4 +241,9 @@ namespace ix
return true;
}
void RedisClient::stop()
{
_stop = true;
}
}

View File

@ -19,7 +19,7 @@ namespace ix
using OnRedisSubscribeResponseCallback = std::function<void(const std::string&)>;
using OnRedisSubscribeCallback = std::function<void(const std::string&)>;
RedisClient() = default;
RedisClient() : _stop(false) {}
~RedisClient() = default;
bool connect(const std::string& hostname, int port);
@ -32,9 +32,12 @@ namespace ix
const OnRedisSubscribeResponseCallback& responseCallback,
const OnRedisSubscribeCallback& callback);
void stop();
private:
std::string writeString(const std::string& str);
std::shared_ptr<Socket> _socket;
std::atomic<bool> _stop;
};
} // namespace ix

View File

@ -13,6 +13,7 @@
#include <cmath>
#include <cassert>
#include <cstring>
#include <iostream>
namespace ix
@ -191,7 +192,7 @@ namespace ix
{
if (!handleUnsubscriptionResponse(data))
{
invokeErrorCallback("Error processing subscribe response", msg->str);
invokeErrorCallback("Error processing unsubscribe response", msg->str);
}
}
else if (action == "rtm/unsubscribe/error")
@ -300,6 +301,8 @@ namespace ix
//
bool CobraConnection::handleHandshakeResponse(const Json::Value& pdu)
{
if (!pdu.isObject()) return false;
if (!pdu.isMember("body")) return false;
Json::Value body = pdu["body"];
@ -349,6 +352,8 @@ namespace ix
bool CobraConnection::handleSubscriptionResponse(const Json::Value& pdu)
{
if (!pdu.isObject()) return false;
if (!pdu.isMember("body")) return false;
Json::Value body = pdu["body"];
@ -365,6 +370,8 @@ namespace ix
bool CobraConnection::handleUnsubscriptionResponse(const Json::Value& pdu)
{
if (!pdu.isObject()) return false;
if (!pdu.isMember("body")) return false;
Json::Value body = pdu["body"];
@ -381,6 +388,8 @@ namespace ix
bool CobraConnection::handleSubscriptionData(const Json::Value& pdu)
{
if (!pdu.isObject()) return false;
if (!pdu.isMember("body")) return false;
Json::Value body = pdu["body"];
@ -407,6 +416,8 @@ namespace ix
bool CobraConnection::handlePublishResponse(const Json::Value& pdu)
{
if (!pdu.isObject()) return false;
if (!pdu.isMember("id")) return false;
Json::Value id = pdu["id"];
@ -453,6 +464,8 @@ namespace ix
{
invokePublishTrackerCallback(true, false);
CobraConnection::MsgId msgId = _id;
_body["channels"] = channels;
_body["message"] = msg;
_pdu["body"] = _body;
@ -460,27 +473,22 @@ namespace ix
std::string serializedJson = serializeJson(_pdu);
if (_publishMode == CobraConnection_PublishMode_Batch)
//
// 1. When we use batch mode, we just enqueue and will do the flush explicitely
// 2. When we aren't authenticated yet to the cobra server, we need to enqueue
// and retry later
// 3. If the network connection was droped (WebSocket::send will return false),
// it means the message won't be sent so we need to enqueue as well.
//
// The order of the conditionals is important.
//
if (_publishMode == CobraConnection_PublishMode_Batch || !_authenticated ||
!publishMessage(serializedJson))
{
enqueue(serializedJson);
return _id - 1;
}
//
// Fast path. We are authenticated and the publishing succeed
// This should happen for 99% of the cases.
//
if (_authenticated && publishMessage(serializedJson))
{
return _id - 1;
}
else // Or else we enqueue
// Slow code path is when we haven't connected yet (startup),
// or when the connection drops for some reason.
{
enqueue(serializedJson);
return 0;
}
return msgId;
}
void CobraConnection::subscribe(const std::string& channel,

View File

@ -6,7 +6,7 @@
#pragma once
#include "nlohmann/json.hpp"
#include <nlohmann/json.hpp>
#include <string>
#include <vector>

View File

@ -163,6 +163,14 @@ namespace snake
return;
}
}
nlohmann::json response = {
{"action", "rtm/publish/ok"},
{"id", pdu.value("id", 1)},
{"body", {}}
};
ws->sendText(response.dump());
}
//
@ -220,12 +228,14 @@ namespace snake
{
auto msg = nlohmann::json::parse(messageStr);
msg = msg["body"]["message"];
nlohmann::json response = {
{"action", "rtm/subscription/data"},
{"id", id++},
{"body", {
{"subscription_id", subscriptionId},
{"messages", {{msg}}}
{"messages", {msg}}
}}
};
@ -271,6 +281,27 @@ namespace snake
pdu);
}
void handleUnSubscribe(
std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
const nlohmann::json& pdu)
{
// extract subscription_id
auto body = pdu["body"];
auto subscriptionId = body["subscription_id"];
state->redisClient().stop();
nlohmann::json response = {
{"action", "rtm/unsubscribe/ok"},
{"id", pdu.value("id", 1)},
{"body", {
{"subscription_id", subscriptionId}
}}
};
ws->sendText(response.dump());
}
void processCobraMessage(
std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
@ -299,6 +330,10 @@ namespace snake
{
handleSubscribe(state, ws, appConfig, pdu);
}
else if (action == "rtm/unsubscribe")
{
handleUnSubscribe(state, ws, pdu);
}
else
{
std::cerr << "Unhandled action: " << action << std::endl;

View File

@ -4,10 +4,10 @@
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <IXSnakeServer.h>
#include <IXSnakeProtocol.h>
#include <IXSnakeConnectionState.h>
#include <IXAppConfig.h>
#include "IXSnakeServer.h"
#include "IXSnakeProtocol.h"
#include "IXSnakeConnectionState.h"
#include "IXAppConfig.h"
#include <iostream>
#include <sstream>
@ -118,8 +118,19 @@ namespace snake
}
_server.start();
_server.wait();
return true;
}
void SnakeServer::runForever()
{
if (run())
{
_server.wait();
}
}
void SnakeServer::stop()
{
_server.stop();
}
}

View File

@ -19,6 +19,8 @@ namespace snake
~SnakeServer() = default;
bool run();
void runForever();
void stop();
private:
std::string parseAppKey(const std::string& path);

View File

@ -42,6 +42,23 @@
#include <ixwebsocket/IXWebSocket.h>
#include <ixwebsocket/IXSocket.h>
#include <spdlog/spdlog.h>
namespace
{
std::string truncate(const std::string& str, size_t n)
{
if (str.size() < n)
{
return str;
}
else
{
return str.substr(0, n) + "...";
}
}
}
namespace ix
{
@ -117,7 +134,7 @@ namespace ix
ss << "Received " << msg->wireSize << " bytes" << std::endl;
ss << "autobahn: received message: "
<< msg->str
<< truncate(msg->str, 40)
<< std::endl;
_webSocket.send(msg->str, msg->binary);
@ -161,7 +178,7 @@ namespace ix
_webSocket.stop();
}
void generateReport(const std::string& url)
bool generateReport(const std::string& url)
{
ix::WebSocket webSocket;
std::string reportUrl(url);
@ -169,14 +186,16 @@ namespace ix
webSocket.setUrl(reportUrl);
webSocket.disableAutomaticReconnection();
std::atomic<bool> done(false);
std::atomic<bool> success(true);
std::condition_variable condition;
webSocket.setOnMessageCallback(
[&done](const ix::WebSocketMessagePtr& msg)
[&condition, &success](const ix::WebSocketMessagePtr& msg)
{
if (msg->type == ix::WebSocketMessageType::Close)
{
std::cerr << "Report generated" << std::endl;
done = true;
condition.notify_one();
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
@ -186,18 +205,24 @@ namespace ix
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str() << std::endl;
success = false;
}
}
);
webSocket.start();
while (!done)
webSocket.start();
std::mutex mutex;
std::unique_lock<std::mutex> lock(mutex);
condition.wait(lock);
webSocket.stop();
if (!success)
{
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
spdlog::error("Cannot generate report at url {}", reportUrl);
}
webSocket.stop();
return success;
}
int getTestCaseCount(const std::string& url)
@ -208,15 +233,15 @@ namespace ix
webSocket.setUrl(caseCountUrl);
webSocket.disableAutomaticReconnection();
int count = 0;
int count = -1;
std::condition_variable condition;
std::atomic<bool> done(false);
webSocket.setOnMessageCallback(
[&done, &count](const ix::WebSocketMessagePtr& msg)
[&condition, &count](const ix::WebSocketMessagePtr& msg)
{
if (msg->type == ix::WebSocketMessageType::Close)
{
done = true;
condition.notify_one();
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
@ -226,6 +251,8 @@ namespace ix
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str() << std::endl;
condition.notify_one();
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
@ -236,16 +263,18 @@ namespace ix
}
}
);
webSocket.start();
while (!done)
{
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
}
std::mutex mutex;
std::unique_lock<std::mutex> lock(mutex);
condition.wait(lock);
webSocket.stop();
if (count == -1)
{
spdlog::error("Cannot retrieve test case count at url {}", caseCountUrl);
}
return count;
}
@ -254,14 +283,20 @@ namespace ix
//
int ws_autobahn_main(const std::string& url, bool quiet)
{
int N = getTestCaseCount(url);
std::cerr << "Test cases count: " << N << std::endl;
int testCasesCount = getTestCaseCount(url);
std::cerr << "Test cases count: " << testCasesCount << std::endl;
N++;
for (int i = 1 ; i < N; ++i)
if (testCasesCount == -1)
{
std::cerr << "Execute test case " << i << std::endl;
spdlog::error("Cannot retrieve test case count at url {}", url);
return 1;
}
testCasesCount++;
for (int i = 1 ; i < testCasesCount; ++i)
{
spdlog::info("Execute test case {}", i);
int caseNumber = i;
@ -277,9 +312,7 @@ namespace ix
testCase.run();
}
generateReport(url);
return 0;
return generateReport(url) ? 0 : 1;
}
}

View File

@ -76,6 +76,8 @@ namespace ix
dumpConfig(appConfig);
snake::SnakeServer snakeServer(appConfig);
return snakeServer.run() ? 0 : 1;
snakeServer.runForever();
return 0; // should never reach this
}
}