Compare commits

...

30 Commits

Author SHA1 Message Date
983df2d8f9 improve some websocket error messages + add a utility function with unittest to parse status line and stop using scanf which triggers warnings on Windows 2019-09-09 17:34:36 -07:00
6beba16ca7 websocket and http server: server does not close the bound client socket in many cases 2019-09-09 16:48:26 -07:00
48cefe5525 move poll wrapper on top of select (only used on Windows) to the ix namespace 2019-09-08 11:15:08 -07:00
ae3856c10f Fix Windows CI with appveyor (#110)
Fix windows CI with appveyor + minor tweaks.
2019-09-07 14:07:00 -07:00
260a94d3b0 README: update link to the doc 2019-09-06 10:42:48 -07:00
88c6d6c4cb ci 2019-09-05 22:32:54 -07:00
d5a4931c92 travis linux 2019-09-05 22:29:00 -07:00
11f4e90bc6 ci tweak / install redis 2019-09-05 22:14:55 -07:00
2ce65e7a77 cobra metrics publisher test uses random free port 2019-09-05 22:05:00 -07:00
e81c2c3e5c cobra chat test uses random free port 2019-09-05 22:02:10 -07:00
e40dda7549 add cobra metrics publisher + server unittest 2019-09-05 21:57:05 -07:00
d959d73261 Add new cobra unittest, using server and client 2019-09-05 20:49:58 -07:00
07b7e37a92 snake unsubscription fixes 2019-09-05 20:47:15 -07:00
eb7888347a Fix compiler warning 2019-09-05 20:29:14 -07:00
d8664f4988 ws snake (cobra simple server) add basic support for unsubscription + subscribe send the proper subscription data + redis client subscription can be cancelled 2019-09-05 20:28:34 -07:00
5e94791b13 IXCobraConnection / pdu handlers can crash if they receive json data which is not an object 2019-09-05 20:24:42 -07:00
3e3f7171fc cobra publish fix 2019-09-05 14:31:28 -07:00
308fda0b37 Update README.md 2019-09-05 14:30:51 -07:00
66ed7577b1 all client autobahn test should pass ! last failing one was ...
+- zlib/deflate has a bug with windowsbits == 8, so we silently upgrade it to 9/ (fix autobahn test 13.X which uses 8 for the windows size)
2019-09-04 21:01:30 -07:00
cae23c764f Fragmentation: for sent messages which are compressed, the continuation fragments should not have the rsv1 bit set (fix all autobahn tests for zlib compression 12.X)
Websocket Server / do a case insensitive string search when looking for an Upgrade header whose value is websocket. (some client use WebSocket with some upper-case characters)
2019-09-04 18:23:56 -07:00
f25b2af6eb ws autobahn / use condition variables for stopping test case + add more logging on errors 2019-09-04 12:21:54 -07:00
508d372df1 ws autobahn / report progress with spdlog::info to get timing info 2019-09-04 10:16:32 -07:00
12c3275c36 truncate module 2019-09-03 20:14:35 -07:00
98189c23dc Per message deflate/compression: handle fragmented messages (fix autobahn test: 12.1.X and probably others) 2019-09-03 17:42:48 -07:00
ec55b4a82a Receiving invalid UTF-8 TEXT message should fail and close the connection (fix remaining autobahn test: 6.X UTF-8 Handling) 2019-09-03 16:07:48 -07:00
5d58982f77 IXWebSocketTransport message processing refactoring 2019-09-03 15:48:55 -07:00
57665ca825 Validate close codes. Autobahn 7.9.* 2019-09-03 15:43:16 -07:00
deaa753657 Validate that the close reason is proper utf-8. Autobahn 7.5.1 2019-09-03 14:35:40 -07:00
7c7c877621 Sending invalid UTF-8 TEXT message should fail and close the connection (fix remaining autobahn test: 6.X UTF-8 Handling) 2019-09-03 14:12:40 -07:00
afa71a6b4b Framentation: data and continuation blocks received out of order (fix autobahn test: 5.9 through 5.20 Fragmentation) 2019-09-03 12:02:56 -07:00
41 changed files with 1321 additions and 203 deletions

View File

@ -6,23 +6,26 @@ language: bash
matrix:
include:
# macOS
- os: osx
env:
- HOMEBREW_NO_AUTO_UPDATE=1
compiler: clang
script:
- brew install mbedtls
- python test/run.py
- make ws
# - os: osx
# env:
# - HOMEBREW_NO_AUTO_UPDATE=1
# compiler: clang
# script:
# - brew install redis
# - brew services start redis
# - brew install mbedtls
# - python test/run.py
# - make ws
# Linux
Linux
- os: linux
dist: bionic
before_install:
- sudo apt-get install -y libmbedtls-dev
- sudo apt-get install -y redis-server
script:
- python test/run.py
- make ws
- python test/run.py
# - make ws
env:
- CC=gcc
- CXX=g++

View File

@ -69,6 +69,7 @@ set( IXWEBSOCKET_HEADERS
ixwebsocket/IXSocketFactory.h
ixwebsocket/IXSocketServer.h
ixwebsocket/IXUrlParser.h
ixwebsocket/IXUtf8Validator.h
ixwebsocket/IXUserAgent.h
ixwebsocket/IXWebSocket.h
ixwebsocket/IXWebSocketCloseConstants.h
@ -114,10 +115,7 @@ endif()
set(USE_OPEN_SSL FALSE)
if (USE_TLS)
add_definitions(-DIXWEBSOCKET_USE_TLS)
if (USE_MBED_TLS)
add_definitions(-DIXWEBSOCKET_USE_MBED_TLS)
list( APPEND IXWEBSOCKET_HEADERS ixwebsocket/IXSocketMbedTLS.h)
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/IXSocketMbedTLS.cpp)
elseif (APPLE)
@ -127,7 +125,6 @@ if (USE_TLS)
list( APPEND IXWEBSOCKET_HEADERS ixwebsocket/IXSocketSChannel.h)
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/IXSocketSChannel.cpp)
else()
add_definitions(-DIXWEBSOCKET_USE_OPEN_SSL)
set(USE_OPEN_SSL TRUE)
list( APPEND IXWEBSOCKET_HEADERS ixwebsocket/IXSocketOpenSSL.h)
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/IXSocketOpenSSL.cpp)
@ -139,6 +136,17 @@ add_library( ixwebsocket STATIC
${IXWEBSOCKET_HEADERS}
)
if (USE_TLS)
target_compile_definitions(ixwebsocket PUBLIC IXWEBSOCKET_USE_TLS)
if (USE_MBED_TLS)
target_compile_definitions(ixwebsocket PUBLIC IXWEBSOCKET_USE_MBED_TLS)
elseif (APPLE)
elseif (WIN32)
else()
target_compile_definitions(ixwebsocket PUBLIC IXWEBSOCKET_USE_OPEN_SSL)
endif()
endif()
if (APPLE AND USE_TLS AND NOT USE_MBED_TLS)
target_link_libraries(ixwebsocket "-framework foundation" "-framework security")
endif()
@ -170,7 +178,7 @@ if (USE_TLS AND USE_MBED_TLS)
target_link_libraries(ixwebsocket mbedtls)
else()
find_package(MbedTLS REQUIRED)
include_directories(${MBEDTLS_INCLUDE_DIRS})
target_include_directories(ixwebsocket PUBLIC ${MBEDTLS_INCLUDE_DIRS})
target_link_libraries(ixwebsocket ${MBEDTLS_LIBRARIES})
endif()
endif()

View File

@ -1 +1 @@
5.1.4
6.2.0

View File

@ -6,8 +6,8 @@ IXWebSocket is a C++ library for WebSocket client and server development. It has
It is been used on big mobile video game titles sending and receiving tons of messages since 2017 (iOS and Android).
Interested ? Go read the [docs](https://bsergean.github.io/IXWebSocket/site/) ! If things don't work as expected, please create an issue in github, or even better a pull request if you know how to fix your problem.
Interested ? Go read the [docs](https://machinezone.github.io/IXWebSocket/) ! If things don't work as expected, please create an issue in github, or even better a pull request if you know how to fix your problem.
IXWebSocket is actively being developed, check out the [changelog](CHANGELOG.md) to know what's cooking. If you are looking for a real time messaging service (the chat-like 'server' your websocket code will talk to) with many features such as history, backed by Redis, look at [cobra](https://github.com/machinezone/cobra).
IXWebSocket is not yet autobahn compliant, but we are working on changing this. See the current compliance [test results](https://bsergean.github.io/IXWebSocket/autobahn/index.html).
IXWebSocket client code is autobahn compliant beginning with the 6.0.0 version. See the current [test results](https://bsergean.github.io/IXWebSocket/autobahn/index.html). Some tests are still failing in the server code.

View File

@ -2,13 +2,21 @@ image:
- Visual Studio 2017
install:
- ls -al
- cd C:\Tools\vcpkg
- git pull
- .\bootstrap-vcpkg.bat
- cd %APPVEYOR_BUILD_FOLDER%
- cmd: call "C:\Program Files (x86)\Microsoft Visual Studio\2017\Community\VC\Auxiliary\Build\vcvars64.bat"
- vcpkg install zlib:x64-windows
- vcpkg install mbedtls:x64-windows
- cd test
- mkdir build
- cd build
- cmake -G"NMake Makefiles" ..
- cmake -DCMAKE_TOOLCHAIN_FILE=c:/tools/vcpkg/scripts/buildsystems/vcpkg.cmake -DUSE_TLS=1 -G"NMake Makefiles" ..
- nmake
- ixwebsocket_unittest.exe
- cd ..
- build\ixwebsocket_unittest.exe
cache: c:\tools\vcpkg\installed\
build: off

View File

@ -1,6 +1,55 @@
# Changelog
All notable changes to this project will be documented in this file.
## [6.2.0] - 2019-09-09
- websocket and http server: server does not close the bound client socket in many cases
## [6.1.0] - 2019-09-08
- move poll wrapper on top of select (only used on Windows) to the ix namespace
## [6.0.1] - 2019-09-05
- add cobra metrics publisher + server unittest
- add cobra client + server unittest
- ws snake (cobra simple server) add basic support for unsubscription + subscribe send the proper subscription data + redis client subscription can be cancelled
- IXCobraConnection / pdu handlers can crash if they receive json data which is not an object
## [6.0.0] - 2019-09-04
- all client autobahn test should pass !
- zlib/deflate has a bug with windowsbits == 8, so we silently upgrade it to 9/ (fix autobahn test 13.X which uses 8 for the windows size)
## [5.2.0] - 2019-09-04
- Fragmentation: for sent messages which are compressed, the continuation fragments should not have the rsv1 bit set (fix all autobahn tests for zlib compression 12.X)
- Websocket Server / do a case insensitive string search when looking for an Upgrade header whose value is websocket. (some client use WebSocket with some upper-case characters)
## [5.1.9] - 2019-09-03
- ws autobahn / report progress with spdlog::info to get timing info
- ws autobahn / use condition variables for stopping test case + add more logging on errors
## [5.1.8] - 2019-09-03
- Per message deflate/compression: handle fragmented messages (fix autobahn test: 12.1.X and probably others)
## [5.1.7] - 2019-09-03
- Receiving invalid UTF-8 TEXT message should fail and close the connection (fix remaining autobahn test: 6.X UTF-8 Handling)
## [5.1.6] - 2019-09-03
- Sending invalid UTF-8 TEXT message should fail and close the connection (fix remaining autobahn test: 6.X UTF-8 Handling)
- Fix failing unittest which was sending binary data in text mode with WebSocket::send to call properly call WebSocket::sendBinary instead.
- Validate that the reason is proper utf-8. (fix autobahn test 7.5.1)
- Validate close codes. Autobahn 7.9.*
## [5.1.5] - 2019-09-03
Framentation: data and continuation blocks received out of order (fix autobahn test: 5.9 through 5.20 Fragmentation)
## [5.1.4] - 2019-09-03
Sending invalid UTF-8 TEXT message should fail and close the connection (fix **tons** of autobahn test: 6.X UTF-8 Handling)
@ -19,17 +68,17 @@ Close connections when reserved bits are used (fix autobahn test: 3.X Reserved B
## [5.1.0] - 2019-08-31
ws autobahn / Add code to test websocket client compliance with the autobahn test-suite
add utf-8 validation code, not hooked up properly yet
Ping received with a payload too large (> 125 bytes) trigger a connection closure
cobra / add tracking about published messages
cobra / publish returns a message id, that can be used when
cobra / new message type in the message received handler when publish/ok is received (can be used to implement an ack system).
- ws autobahn / Add code to test websocket client compliance with the autobahn test-suite
- add utf-8 validation code, not hooked up properly yet
- Ping received with a payload too large (> 125 bytes) trigger a connection closure
- cobra / add tracking about published messages
- cobra / publish returns a message id, that can be used when
- cobra / new message type in the message received handler when publish/ok is received (can be used to implement an ack system).
## [5.0.9] - 2019-08-30
User-Agent header is set when not specified.
New option to cap the max wait between reconnection attempts. Still default to 10s. (setMaxWaitBetweenReconnectionRetries).
- User-Agent header is set when not specified.
- New option to cap the max wait between reconnection attempts. Still default to 10s. (setMaxWaitBetweenReconnectionRetries).
```
ws connect --max_wait 5000 ws://example.com # will only wait 5 seconds max between reconnection attempts

View File

@ -21,6 +21,8 @@ Options for building:
* `-DUSE_MBED_TLS=1` will use [mbedlts](https://tls.mbed.org/) for the TLS support (default on Windows)
* `-DUSE_WS=1` will build the ws interactive command line tool
If you are on Windows, look at the [appveyor](https://github.com/machinezone/IXWebSocket/blob/master/appveyor.yml) file that has instructions for building dependencies.
### vcpkg
It is possible to get IXWebSocket through Microsoft [vcpkg](https://github.com/microsoft/vcpkg).

View File

@ -32,7 +32,6 @@ The regression test is running after each commit on travis.
* On Windows TLS is not setup yet to validate certificates.
* There is no convenient way to embed a ca cert.
* No utf-8 validation is made when sending TEXT message with sendText()
* Automatic reconnection works at the TCP socket level, and will detect remote end disconnects. However, if the device/computer network become unreachable (by turning off wifi), it is quite hard to reliably and timely detect it at the socket level using `recv` and `send` error codes. [Here](https://stackoverflow.com/questions/14782143/linux-socket-how-to-detect-disconnected-network-in-a-client-program) is a good discussion on the subject. This behavior is consistent with other runtimes such as node.js. One way to detect a disconnected device with low level C code is to do a name resolution with DNS but this can be expensive. Mobile devices have good and reliable API to do that.
* The server code is using select to detect incoming data, and creates one OS thread per connection. This is not as scalable as strategies using epoll or kqueue.

View File

@ -95,6 +95,7 @@ namespace ix
}
}
connectionState->setTerminated();
Socket::closeSocket(fd);
_connectedClientsCount--;
}

View File

@ -34,10 +34,7 @@ namespace ix
return true;
#endif
}
}
// This function should be in the global namespace
#ifdef _WIN32
//
// That function could 'return WSAPoll(pfd, nfds, timeout);'
// but WSAPoll is said to have weird behaviors on the internet
@ -47,6 +44,7 @@ namespace ix
//
int poll(struct pollfd *fds, nfds_t nfds, int timeout)
{
#ifdef _WIN32
int maxfd = 0;
fd_set readfds, writefds, errorfds;
FD_ZERO(&readfds);
@ -107,5 +105,9 @@ namespace ix
}
return ret;
}
#else
return ::poll(fds, nfds, timeout);
#endif
}
} // namespace ix

View File

@ -13,11 +13,9 @@
#include <io.h>
#include <ws2def.h>
// Define our own poll on Windows
// Define our own poll on Windows, as a wrapper on top of select
typedef unsigned long int nfds_t;
int poll(struct pollfd* fds, nfds_t nfds, int timeout);
#else
#include <arpa/inet.h>
#include <errno.h>
@ -35,4 +33,6 @@ namespace ix
{
bool initNetSystem();
bool uninitNetSystem();
int poll(struct pollfd* fds, nfds_t nfds, int timeout);
} // namespace ix

View File

@ -79,7 +79,7 @@ namespace ix
}
}
int ret = ::poll(fds, nfds, timeoutMs);
int ret = ix::poll(fds, nfds, timeoutMs);
PollResultType pollResult = PollResultType::ReadyForRead;
if (ret < 0)

View File

@ -0,0 +1,167 @@
/*
* The following code is adapted from code originally written by Bjoern
* Hoehrmann <bjoern@hoehrmann.de>. See
* http://bjoern.hoehrmann.de/utf-8/decoder/dfa/ for details.
*
* The original license:
*
* Copyright (c) 2008-2009 Bjoern Hoehrmann <bjoern@hoehrmann.de>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
/*
* IXUtf8Validator.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*
* From websocketpp. Tiny modifications made for code style, function names etc...
*/
#pragma once
#include <cstdint>
#include <string>
namespace ix
{
/// State that represents a valid utf8 input sequence
static unsigned int const utf8_accept = 0;
/// State that represents an invalid utf8 input sequence
static unsigned int const utf8_reject = 1;
/// Lookup table for the UTF8 decode state machine
static uint8_t const utf8d[] = {
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, // 00..1f
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, // 20..3f
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, // 40..5f
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, // 60..7f
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9, // 80..9f
7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7, // a0..bf
8,8,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2, // c0..df
0xa,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x4,0x3,0x3, // e0..ef
0xb,0x6,0x6,0x6,0x5,0x8,0x8,0x8,0x8,0x8,0x8,0x8,0x8,0x8,0x8,0x8, // f0..ff
0x0,0x1,0x2,0x3,0x5,0x8,0x7,0x1,0x1,0x1,0x4,0x6,0x1,0x1,0x1,0x1, // s0..s0
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,0,1,1,1,1,1,0,1,0,1,1,1,1,1,1, // s1..s2
1,2,1,1,1,1,1,2,1,2,1,1,1,1,1,1,1,1,1,1,1,1,1,2,1,1,1,1,1,1,1,1, // s3..s4
1,2,1,1,1,1,1,1,1,2,1,1,1,1,1,1,1,1,1,1,1,1,1,3,1,3,1,1,1,1,1,1, // s5..s6
1,3,1,1,1,1,1,3,1,3,1,1,1,1,1,1,1,3,1,1,1,1,1,1,1,1,1,1,1,1,1,1, // s7..s8
};
/// Decode the next byte of a UTF8 sequence
/**
* @param [out] state The decoder state to advance
* @param [out] codep The codepoint to fill in
* @param [in] byte The byte to input
* @return The ending state of the decode operation
*/
inline uint32_t decodeNextByte(uint32_t * state, uint32_t * codep, uint8_t byte)
{
uint32_t type = utf8d[byte];
*codep = (*state != utf8_accept) ?
(byte & 0x3fu) | (*codep << 6) :
(0xff >> type) & (byte);
*state = utf8d[256 + *state*16 + type];
return *state;
}
/// Provides streaming UTF8 validation functionality
class Utf8Validator
{
public:
/// Construct and initialize the validator
Utf8Validator() : m_state(utf8_accept),m_codepoint(0) {}
/// Advance the state of the validator with the next input byte
/**
* @param byte The byte to advance the validation state with
* @return Whether or not the byte resulted in a validation error.
*/
bool consume(uint8_t byte)
{
if (decodeNextByte(&m_state,&m_codepoint,byte) == utf8_reject)
{
return false;
}
return true;
}
/// Advance Validator state with input from an iterator pair
/**
* @param begin Input iterator to the start of the input range
* @param end Input iterator to the end of the input range
* @return Whether or not decoding the bytes resulted in a validation error.
*/
template <typename iterator_type>
bool decode(iterator_type begin, iterator_type end)
{
for (iterator_type it = begin; it != end; ++it)
{
unsigned int result = decodeNextByte(
&m_state,
&m_codepoint,
static_cast<uint8_t>(*it)
);
if (result == utf8_reject)
{
return false;
}
}
return true;
}
/// Return whether the input sequence ended on a valid utf8 codepoint
/**
* @return Whether or not the input sequence ended on a valid codepoint.
*/
bool complete()
{
return m_state == utf8_accept;
}
/// Reset the Validator to decode another message
void reset()
{
m_state = utf8_accept;
m_codepoint = 0;
}
private:
uint32_t m_state;
uint32_t m_codepoint;
};
/// Validate a UTF8 string
/**
* convenience function that creates a Validator, validates a complete string
* and returns the result.
*/
inline bool validateUtf8(std::string const & s)
{
Utf8Validator v;
if (!v.decode(s.begin(),s.end()))
{
return false;
}
return v.complete();
}
} // namespace ix

View File

@ -8,66 +8,11 @@
#include "IXSetThreadName.h"
#include "IXWebSocketHandshake.h"
#include "IXExponentialBackoff.h"
#include "IXUtf8Validator.h"
#include <cmath>
#include <cassert>
namespace
{
//
// Stolen from here http://www.zedwood.com/article/cpp-is-valid-utf8-string-function
// There doesn't seem to be anything in the C++ library so far to do that.
// The closest thing is code for converting from utf-8 to utf-16 or utf-32 but
// that isn't working well for some broken input strings.
//
bool isValidUtf8(const std::string& str)
{
size_t i = 0;
size_t ix = str.length();
int c, n, j;
for (; i < ix; i++)
{
c = (unsigned char) str[i];
//if (c==0x09 || c==0x0a || c==0x0d || (0x20 <= c && c <= 0x7e) ) n = 0; // is_printable_ascii
if (0x00 <= c && c <= 0x7f)
{
n = 0; // 0bbbbbbb
}
else if ((c & 0xE0) == 0xC0)
{
n = 1; // 110bbbbb
}
else if ( c==0xed && i<(ix-1) && ((unsigned char)str[i+1] & 0xa0)==0xa0)
{
return false; //U+d800 to U+dfff
}
else if ((c & 0xF0) == 0xE0)
{
n = 2; // 1110bbbb
}
else if ((c & 0xF8) == 0xF0)
{
n = 3; // 11110bbb
}
//else if (($c & 0xFC) == 0xF8) n=4; // 111110bb //byte 5, unnecessary in 4 byte UTF-8
//else if (($c & 0xFE) == 0xFC) n=5; // 1111110b //byte 6, unnecessary in 4 byte UTF-8
else
{
return false;
}
for (j=0; j<n && i<ix; j++)
{ // n bytes matching 10bbbbbb follow ?
if ((++i == ix) || (( (unsigned char)str[i] & 0xC0) != 0x80))
{
return false;
}
}
}
return true;
}
}
namespace ix
{
@ -459,10 +404,10 @@ namespace ix
WebSocketSendInfo WebSocket::sendText(const std::string& text,
const OnProgressCallback& onProgressCallback)
{
if (!isValidUtf8(text))
if (!validateUtf8(text))
{
close(WebSocketCloseConstants::kNormalClosureCode,
WebSocketCloseConstants::kInvalidUtf8);
close(WebSocketCloseConstants::kInvalidFramePayloadData,
WebSocketCloseConstants::kInvalidFramePayloadDataMessage);
return false;
}
return sendMessage(text, SendMessageKind::Text, onProgressCallback);

View File

@ -11,6 +11,7 @@ namespace ix
const uint16_t WebSocketCloseConstants::kNormalClosureCode(1000);
const uint16_t WebSocketCloseConstants::kInternalErrorCode(1011);
const uint16_t WebSocketCloseConstants::kAbnormalCloseCode(1006);
const uint16_t WebSocketCloseConstants::kInvalidFramePayloadData(1007);
const uint16_t WebSocketCloseConstants::kProtocolErrorCode(1002);
const uint16_t WebSocketCloseConstants::kNoStatusCodeErrorCode(1005);
@ -23,5 +24,8 @@ namespace ix
const std::string WebSocketCloseConstants::kProtocolErrorReservedBitUsed("Reserved bit used");
const std::string WebSocketCloseConstants::kProtocolErrorPingPayloadOversized("Ping reason control frame with payload length > 125 octets");
const std::string WebSocketCloseConstants::kProtocolErrorCodeControlMessageFragmented("Control message fragmented");
const std::string WebSocketCloseConstants::kInvalidUtf8("Invalid UTF-8");
const std::string WebSocketCloseConstants::kProtocolErrorCodeDataOpcodeOutOfSequence("Fragmentation: data message out of sequence");
const std::string WebSocketCloseConstants::kProtocolErrorCodeContinuationOpCodeOutOfSequence("Fragmentation: continuation opcode out of sequence");
const std::string WebSocketCloseConstants::kInvalidFramePayloadDataMessage("Invalid frame payload data");
const std::string WebSocketCloseConstants::kInvalidCloseCodeMessage("Invalid close code");
}

View File

@ -18,6 +18,7 @@ namespace ix
static const uint16_t kAbnormalCloseCode;
static const uint16_t kProtocolErrorCode;
static const uint16_t kNoStatusCodeErrorCode;
static const uint16_t kInvalidFramePayloadData;
static const std::string kNormalClosureMessage;
static const std::string kInternalErrorMessage;
@ -28,6 +29,9 @@ namespace ix
static const std::string kProtocolErrorReservedBitUsed;
static const std::string kProtocolErrorPingPayloadOversized;
static const std::string kProtocolErrorCodeControlMessageFragmented;
static const std::string kInvalidUtf8;
static const std::string kProtocolErrorCodeDataOpcodeOutOfSequence;
static const std::string kProtocolErrorCodeContinuationOpCodeOutOfSequence;
static const std::string kInvalidFramePayloadDataMessage;
static const std::string kInvalidCloseCodeMessage;
};
} // namespace ix

View File

@ -295,7 +295,7 @@ namespace ix
return sendErrorResponse(400, "Missing Sec-WebSocket-Key value");
}
if (headers["upgrade"] != "websocket")
if (!insensitiveStringCompare(headers["upgrade"], "WebSocket"))
{
return sendErrorResponse(400, "Invalid or missing Upgrade header");
}
@ -326,6 +326,7 @@ namespace ix
ss << "Sec-WebSocket-Accept: " << std::string(output) << "\r\n";
ss << "Upgrade: websocket\r\n";
ss << "Connection: Upgrade\r\n";
ss << "Server: " << userAgent() << "\r\n";
// Parse the client headers. Does it support deflate ?
std::string header = headers["sec-websocket-extensions"];

View File

@ -33,6 +33,8 @@ namespace ix
_serverNoContextTakeover = serverNoContextTakeover;
_clientMaxWindowBits = clientMaxWindowBits;
_serverMaxWindowBits = serverMaxWindowBits;
sanitizeClientMaxWindowBits();
}
//
@ -107,10 +109,22 @@ namespace ix
_clientMaxWindowBits =
std::min(maxClientMaxWindowBits,
std::max(x, minClientMaxWindowBits));
sanitizeClientMaxWindowBits();
}
}
}
void WebSocketPerMessageDeflateOptions::sanitizeClientMaxWindowBits()
{
// zlib/deflate has a bug with windowsbits == 8, so we silently upgrade it to 9
// See https://bugs.chromium.org/p/chromium/issues/detail?id=691074
if (_clientMaxWindowBits == 8)
{
_clientMaxWindowBits = 9;
}
}
std::string WebSocketPerMessageDeflateOptions::generateHeader()
{
std::stringstream ss;

View File

@ -41,5 +41,7 @@ namespace ix
bool _serverNoContextTakeover;
int _clientMaxWindowBits;
int _serverMaxWindowBits;
void sanitizeClientMaxWindowBits();
};
} // namespace ix

View File

@ -93,7 +93,7 @@ namespace ix
else
{
std::stringstream ss;
ss << "WebSocketServer::handleConnection() error: "
ss << "WebSocketServer::handleConnection() HTTP status: "
<< status.http_status
<< " error: "
<< status.errorStr;
@ -111,6 +111,8 @@ namespace ix
logInfo("WebSocketServer::handleConnection() done");
connectionState->setTerminated();
Socket::closeSocket(fd);
}
std::set<std::shared_ptr<WebSocket>> WebSocketServer::getClients()

View File

@ -37,6 +37,7 @@
#include "IXWebSocketHttpHeaders.h"
#include "IXUrlParser.h"
#include "IXSocketFactory.h"
#include "IXUtf8Validator.h"
#include <string.h>
#include <stdlib.h>
@ -76,6 +77,7 @@ namespace ix
WebSocketTransport::WebSocketTransport() :
_useMask(true),
_compressedMessage(false),
_readyState(ReadyState::CLOSED),
_closeCode(WebSocketCloseConstants::kInternalErrorCode),
_closeReason(WebSocketCloseConstants::kInternalErrorMessage),
@ -551,19 +553,22 @@ namespace ix
|| ws.opcode == wsheader_type::PONG
|| ws.opcode == wsheader_type::CLOSE
)){
// Cntrol messages should not be fragmented
// Control messages should not be fragmented
close(WebSocketCloseConstants::kProtocolErrorCode,
WebSocketCloseConstants::kProtocolErrorCodeControlMessageFragmented);
return;
}
unmaskReceiveBuffer(ws);
std::string frameData(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t) ws.N);
// We got a whole message, now do something with it:
if (
ws.opcode == wsheader_type::TEXT_FRAME
|| ws.opcode == wsheader_type::BINARY_FRAME
|| ws.opcode == wsheader_type::CONTINUATION
) {
unmaskReceiveBuffer(ws);
if (ws.opcode != wsheader_type::CONTINUATION)
{
@ -571,6 +576,21 @@ namespace ix
(ws.opcode == wsheader_type::TEXT_FRAME)
? MessageKind::MSG_TEXT
: MessageKind::MSG_BINARY;
_compressedMessage = _enablePerMessageDeflate && ws.rsv1;
// Continuation message needs to follow a non-fin TEXT or BINARY message
if (!_chunks.empty())
{
close(WebSocketCloseConstants::kProtocolErrorCode,
WebSocketCloseConstants::kProtocolErrorCodeDataOpcodeOutOfSequence);
}
}
else if (_chunks.empty())
{
// Continuation message need to follow a non-fin TEXT or BINARY message
close(WebSocketCloseConstants::kProtocolErrorCode,
WebSocketCloseConstants::kProtocolErrorCodeContinuationOpCodeOutOfSequence);
}
//
@ -579,10 +599,11 @@ namespace ix
if (ws.fin && _chunks.empty())
{
emitMessage(_fragmentedMessageKind,
std::string(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t) ws.N),
ws,
frameData,
_compressedMessage,
onMessageCallback);
_compressedMessage = false;
}
else
{
@ -593,30 +614,26 @@ namespace ix
// the internal buffer which is slow and can let the internal OS
// receive buffer fill out.
//
_chunks.emplace_back(
std::vector<uint8_t>(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t)ws.N));
_chunks.emplace_back(frameData);
if (ws.fin)
{
emitMessage(_fragmentedMessageKind, getMergedChunks(),
ws, onMessageCallback);
_compressedMessage, onMessageCallback);
_chunks.clear();
_compressedMessage = false;
}
else
{
emitMessage(MessageKind::FRAGMENT, std::string(), ws, onMessageCallback);
emitMessage(MessageKind::FRAGMENT, std::string(), false, onMessageCallback);
}
}
}
else if (ws.opcode == wsheader_type::PING)
{
unmaskReceiveBuffer(ws);
std::string pingData(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size + (size_t) ws.N);
// too large
if (pingData.size() > 125)
if (frameData.size() > 125)
{
// Unexpected frame type
close(WebSocketCloseConstants::kProtocolErrorCode,
@ -628,29 +645,23 @@ namespace ix
{
// Reply back right away
bool compress = false;
sendData(wsheader_type::PONG, pingData, compress);
sendData(wsheader_type::PONG, frameData, compress);
}
emitMessage(MessageKind::PING, pingData, ws, onMessageCallback);
emitMessage(MessageKind::PING, frameData, false, onMessageCallback);
}
else if (ws.opcode == wsheader_type::PONG)
{
unmaskReceiveBuffer(ws);
std::string pongData(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size + (size_t) ws.N);
std::lock_guard<std::mutex> lck(_lastReceivePongTimePointMutex);
_lastReceivePongTimePoint = std::chrono::steady_clock::now();
emitMessage(MessageKind::PONG, pongData, ws, onMessageCallback);
emitMessage(MessageKind::PONG, frameData, false, onMessageCallback);
}
else if (ws.opcode == wsheader_type::CLOSE)
{
std::string reason;
uint16_t code = 0;
unmaskReceiveBuffer(ws);
if (ws.N >= 2)
{
// Extract the close code first, available as the first 2 bytes
@ -660,8 +671,28 @@ namespace ix
// Get the reason.
if (ws.N > 2)
{
reason.assign(_rxbuf.begin()+ws.header_size + 2,
_rxbuf.begin()+ws.header_size + (size_t) ws.N);
reason = frameData.substr(2, frameData.size());
}
// Validate that the reason is proper utf-8. Autobahn 7.5.1
if (!validateUtf8(reason))
{
code = WebSocketCloseConstants::kInvalidFramePayloadData;
reason = WebSocketCloseConstants::kInvalidFramePayloadDataMessage;
}
// Validate close codes. Autobahn 7.9.*
// 1014, 1015 are debattable. The firefox MSDN has a description for them
if (code < 1000 || code == 1004 || code == 1006 ||
(code > 1013 && code < 3000))
{
// build up an error message containing the bad error code
std::stringstream ss;
ss << WebSocketCloseConstants::kInvalidCloseCodeMessage
<< ": " << code;
reason = ss.str();
code = WebSocketCloseConstants::kProtocolErrorCode;
}
}
else
@ -747,8 +778,7 @@ namespace ix
for (auto&& chunk : _chunks)
{
std::string str(chunk.begin(), chunk.end());
msg += str;
msg += chunk;
}
return msg;
@ -756,21 +786,38 @@ namespace ix
void WebSocketTransport::emitMessage(MessageKind messageKind,
const std::string& message,
const wsheader_type& ws,
bool compressedMessage,
const OnMessageCallback& onMessageCallback)
{
size_t wireSize = message.size();
// When the RSV1 bit is 1 it means the message is compressed
if (_enablePerMessageDeflate && ws.rsv1 && messageKind != MessageKind::FRAGMENT)
if (compressedMessage && messageKind != MessageKind::FRAGMENT)
{
std::string decompressedMessage;
bool success = _perMessageDeflate.decompress(message, decompressedMessage);
onMessageCallback(decompressedMessage, wireSize, !success, messageKind);
if (messageKind == MessageKind::MSG_TEXT && !validateUtf8(decompressedMessage))
{
close(WebSocketCloseConstants::kInvalidFramePayloadData,
WebSocketCloseConstants::kInvalidFramePayloadDataMessage);
}
else
{
onMessageCallback(decompressedMessage, wireSize, !success, messageKind);
}
}
else
{
onMessageCallback(message, wireSize, false, messageKind);
if (messageKind == MessageKind::MSG_TEXT && !validateUtf8(message))
{
close(WebSocketCloseConstants::kInvalidFramePayloadData,
WebSocketCloseConstants::kInvalidFramePayloadDataMessage);
}
else
{
onMessageCallback(message, wireSize, false, messageKind);
}
}
}
@ -819,6 +866,8 @@ namespace ix
message_end = compressedMessage.end();
}
_txbuf.reserve(wireSize);
// Common case for most message. No fragmentation required.
if (wireSize < kChunkSize)
{
@ -906,8 +955,9 @@ namespace ix
header[0] |= 0x80;
}
// This bit indicate that the frame is compressed
if (compress)
// The rsv1 bit indicate that the frame is compressed
// continuation opcodes should not set it. Autobahn 12.2.10 and others 12.X
if (compress && type != wsheader_type::CONTINUATION)
{
header[0] |= 0x40;
}

View File

@ -149,13 +149,16 @@ namespace ix
// messages (tested messages up to 700M) and we cannot put them in a single
// buffer that is resized, as this operation can be slow when a buffer has its
// size increased 2 fold, while appending to a list has a fixed cost.
std::list<std::vector<uint8_t>> _chunks;
std::list<std::string> _chunks;
// Record the message kind (will be TEXT or BINARY) for a fragmented
// message, present in the first chunk, since the final chunk will be a
// CONTINUATION opcode and doesn't tell the full message kind
MessageKind _fragmentedMessageKind;
// Ditto for whether a message is compressed
bool _compressedMessage;
// Fragments are 32K long
static constexpr size_t kChunkSize = 1 << 15;
@ -244,7 +247,7 @@ namespace ix
void emitMessage(MessageKind messageKind,
const std::string& message,
const wsheader_type& ws,
bool compressedMessage,
const OnMessageCallback& onMessageCallback);
bool isSendBufferEmpty() const;

View File

@ -6,4 +6,4 @@
#pragma once
#define IX_WEBSOCKET_VERSION "5.1.4"
#define IX_WEBSOCKET_VERSION "6.2.0"

View File

@ -63,6 +63,9 @@ test:
ws_test: ws
(cd ws ; env DEBUG=1 PATH=../ws/build:$$PATH bash test_ws.sh)
autobahn_report:
cp -rvf ~/sandbox/reports/clients/* ../bsergean.github.io/IXWebSocket/autobahn/
# For the fork that is configured with appveyor
rebase_upstream:
git fetch upstream

View File

@ -17,11 +17,15 @@ endif()
add_subdirectory(${PROJECT_SOURCE_DIR}/.. ixwebsocket)
set (WS ../ws)
include_directories(
${PROJECT_SOURCE_DIR}/Catch2/single_include
../third_party
../third_party/msgpack11
../third_party/spdlog/include
../ws
../ws/snake
)
# Shared sources
@ -30,7 +34,24 @@ set (SOURCES
IXTest.cpp
IXGetFreePort.cpp
../third_party/msgpack11/msgpack11.cpp
../ws/ixcore/utils/IXCoreLogger.cpp
../third_party/jsoncpp/jsoncpp.cpp
${WS}/ixcore/utils/IXCoreLogger.cpp
${WS}/ixcrypto/IXBase64.cpp
${WS}/ixcrypto/IXHash.cpp
${WS}/ixcrypto/IXUuid.cpp
${WS}/ixcrypto/IXHMac.cpp
${WS}/ixcobra/IXCobraConnection.cpp
${WS}/ixcobra/IXCobraMetricsPublisher.cpp
${WS}/ixcobra/IXCobraMetricsThreadedPublisher.cpp
${WS}/snake/IXSnakeServer.cpp
${WS}/snake/IXSnakeProtocol.cpp
${WS}/snake/IXAppConfig.cpp
${WS}/IXRedisClient.cpp
IXSocketTest.cpp
IXSocketConnectTest.cpp
@ -49,6 +70,8 @@ if (UNIX)
IXDNSLookupTest.cpp
cmd_websocket_chat.cpp
IXWebSocketCloseTest.cpp
IXCobraChatTest.cpp
IXCobraMetricsPublisherTest.cpp
)
endif()

356
test/IXCobraChatTest.cpp Normal file
View File

@ -0,0 +1,356 @@
/*
* cmd_satori_chat.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017 Machine Zone. All rights reserved.
*/
#include <iostream>
#include <chrono>
#include <ixcobra/IXCobraConnection.h>
#include <ixcrypto/IXUuid.h>
#include "IXTest.h"
#include "IXSnakeServer.h"
#include "catch.hpp"
using namespace ix;
namespace
{
std::atomic<size_t> incomingBytes(0);
std::atomic<size_t> outgoingBytes(0);
void setupTrafficTrackerCallback()
{
ix::CobraConnection::setTrafficTrackerCallback(
[](size_t size, bool incoming)
{
if (incoming)
{
incomingBytes += size;
}
else
{
outgoingBytes += size;
}
}
);
}
class SatoriChat
{
public:
SatoriChat(const std::string& user,
const std::string& session,
const std::string& endpoint);
void subscribe(const std::string& channel);
void start();
void stop();
void run();
bool isReady() const;
void sendMessage(const std::string& text);
size_t getReceivedMessagesCount() const;
bool hasPendingMessages() const;
Json::Value popMessage();
private:
std::string _user;
std::string _session;
std::string _endpoint;
std::queue<Json::Value> _publish_queue;
mutable std::mutex _queue_mutex;
std::thread _thread;
std::atomic<bool> _stop;
ix::CobraConnection _conn;
std::atomic<bool> _connectedAndSubscribed;
std::queue<Json::Value> _receivedQueue;
std::mutex _logMutex;
};
SatoriChat::SatoriChat(const std::string& user,
const std::string& session,
const std::string& endpoint) :
_connectedAndSubscribed(false),
_stop(false),
_user(user),
_session(session),
_endpoint(endpoint)
{
}
void SatoriChat::start()
{
_thread = std::thread(&SatoriChat::run, this);
}
void SatoriChat::stop()
{
_stop = true;
_thread.join();
}
bool SatoriChat::isReady() const
{
return _connectedAndSubscribed;
}
size_t SatoriChat::getReceivedMessagesCount() const
{
return _receivedQueue.size();
}
bool SatoriChat::hasPendingMessages() const
{
std::unique_lock<std::mutex> lock(_queue_mutex);
return !_publish_queue.empty();
}
Json::Value SatoriChat::popMessage()
{
std::unique_lock<std::mutex> lock(_queue_mutex);
auto msg = _publish_queue.front();
_publish_queue.pop();
return msg;
}
//
// Callback to handle received messages, that are printed on the console
//
void SatoriChat::subscribe(const std::string& channel)
{
std::string filter;
_conn.subscribe(channel, filter,
[this](const Json::Value& msg)
{
std::cout << msg.toStyledString() << std::endl;
if (!msg.isObject()) return;
if (!msg.isMember("user")) return;
if (!msg.isMember("text")) return;
if (!msg.isMember("session")) return;
std::string msg_user = msg["user"].asString();
std::string msg_text = msg["text"].asString();
std::string msg_session = msg["session"].asString();
// We are not interested in messages
// from a different session.
if (msg_session != _session) return;
// We are not interested in our own messages
if (msg_user == _user) return;
_receivedQueue.push(msg);
std::stringstream ss;
ss << std::endl
<< msg_user << " > " << msg_text
<< std::endl
<< _user << " > ";
log(ss.str());
});
}
void SatoriChat::sendMessage(const std::string& text)
{
Json::Value msg;
msg["user"] = _user;
msg["session"] = _session;
msg["text"] = text;
std::unique_lock<std::mutex> lock(_queue_mutex);
_publish_queue.push(msg);
}
//
// Do satori communication on a background thread, where we can have
// something like an event loop that publish, poll and receive data
//
void SatoriChat::run()
{
snake::AppConfig appConfig = makeSnakeServerConfig(8008);
// Display config on the terminal for debugging
dumpConfig(appConfig);
snake::SnakeServer snakeServer(appConfig);
snakeServer.run();
// "chat" conf
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
std::string channel = _session;
std::string role = "_sub";
std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
_conn.configure(appkey, _endpoint, role, secret,
ix::WebSocketPerMessageDeflateOptions(true));
_conn.connect();
_conn.setEventCallback(
[this, channel]
(ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId)
{
if (eventType == ix::CobraConnection_EventType_Open)
{
log("Subscriber connected: " + _user);
}
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
log("Subscriber authenticated: " + _user);
subscribe(channel);
}
else if (eventType == ix::CobraConnection_EventType_Error)
{
log(errMsg + _user);
}
else if (eventType == ix::CobraConnection_EventType_Closed)
{
log("Connection closed: " + _user);
}
else if (eventType == ix::CobraConnection_EventType_Subscribed)
{
log("Subscription ok: " + _user + " subscription_id " + subscriptionId);
_connectedAndSubscribed = true;
}
else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{
log("Unsubscription ok: " + _user + " subscription_id " + subscriptionId);
}
else if (eventType == ix::CobraConnection_EventType_Published)
{
Logger() << "Subscriber: published message acked: " << msgId;
}
}
);
while (!_stop)
{
{
while (hasPendingMessages())
{
auto msg = popMessage();
std::string text = msg["text"].asString();
std::stringstream ss;
ss << "Sending msg [" << text << "]";
log(ss.str());
Json::Value channels;
channels.append(channel);
_conn.publish(channels, msg);
}
}
ix::msleep(50);
}
_conn.unsubscribe(channel);
ix::msleep(50);
_conn.disconnect();
_conn.setEventCallback([]
(ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId)
{
;
});
snakeServer.stop();
}
}
TEST_CASE("Cobra_chat", "[cobra_chat]")
{
SECTION("Exchange and count sent/received messages.")
{
int port = getFreePort();
snake::AppConfig appConfig = makeSnakeServerConfig(port);
snake::SnakeServer snakeServer(appConfig);
snakeServer.run();
int timeout;
setupTrafficTrackerCallback();
std::string session = ix::generateSessionId();
std::stringstream ss;
ss << "ws://localhost:" << port;
std::string endpoint = ss.str();
SatoriChat chatA("jean", session, endpoint);
SatoriChat chatB("paul", session, endpoint);
chatA.start();
chatB.start();
// Wait for all chat instance to be ready
timeout = 10 * 1000; // 10s
while (true)
{
if (chatA.isReady() && chatB.isReady()) break;
ix::msleep(10);
timeout -= 10;
if (timeout <= 0)
{
REQUIRE(false); // timeout
}
}
// Add a bit of extra time, for the subscription to be active
ix::msleep(1000);
chatA.sendMessage("from A1");
chatA.sendMessage("from A2");
chatA.sendMessage("from A3");
chatB.sendMessage("from B1");
chatB.sendMessage("from B2");
// 1. Wait for all messages to be sent
timeout = 10 * 1000; // 10s
while (chatA.hasPendingMessages() || chatB.hasPendingMessages())
{
ix::msleep(10);
timeout -= 10;
if (timeout <= 0)
{
REQUIRE(false); // timeout
}
}
// Give us 1s for all messages to be received
ix::msleep(1000);
chatA.stop();
chatB.stop();
// FIXME: improve this and make it exact matches
// we get unreliable result set
REQUIRE(chatA.getReceivedMessagesCount() == 2);
REQUIRE(chatB.getReceivedMessagesCount() == 3);
std::cout << incomingBytes << std::endl;
std::cout << "Incoming bytes: " << incomingBytes << std::endl;
std::cout << "Outgoing bytes: " << outgoingBytes << std::endl;
snakeServer.stop();
}
}

View File

@ -0,0 +1,237 @@
/*
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone. All rights reserved.
*/
#include <iostream>
#include <set>
#include <ixcrypto/IXUuid.h>
#include <ixcobra/IXCobraMetricsPublisher.h>
#include "IXTest.h"
#include "IXSnakeServer.h"
#include "catch.hpp"
using namespace ix;
namespace
{
//
// This project / appkey is configure on cobra to not do any batching.
// This way we can start a subscriber and receive all messages as they come in.
//
std::string APPKEY("FC2F10139A2BAc53BB72D9db967b024f");
std::string CHANNEL("unittest_channel");
std::string PUBLISHER_ROLE("_pub");
std::string PUBLISHER_SECRET("1c04DB8fFe76A4EeFE3E318C72d771db");
std::string SUBSCRIBER_ROLE("_sub");
std::string SUBSCRIBER_SECRET("66B1dA3ED5fA074EB5AE84Dd8CE3b5ba");
std::atomic<bool> gStop;
std::atomic<bool> gSubscriberConnectedAndSubscribed;
std::atomic<int> gUniqueMessageIdsCount;
std::atomic<int> gMessageCount;
std::set<std::string> gIds;
std::mutex gProtectIds; // std::set is no thread-safe, so protect access with this mutex.
//
// Background thread subscribe to the channel and validates what was sent
//
void startSubscriber(const std::string& endpoint)
{
gSubscriberConnectedAndSubscribed = false;
gUniqueMessageIdsCount = 0;
gMessageCount = 0;
ix::CobraConnection conn;
conn.configure(APPKEY, endpoint, SUBSCRIBER_ROLE, SUBSCRIBER_SECRET,
ix::WebSocketPerMessageDeflateOptions(true));
conn.connect();
conn.setEventCallback(
[&conn]
(ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId)
{
if (eventType == ix::CobraConnection_EventType_Open)
{
Logger() << "Subscriber connected:";
}
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
log("Subscriber authenticated");
std::string filter;
conn.subscribe(CHANNEL, filter,
[](const Json::Value& msg)
{
log(msg.toStyledString());
std::string id = msg["id"].asString();
{
std::lock_guard<std::mutex> guard(gProtectIds);
gIds.insert(id);
}
gMessageCount++;
});
}
else if (eventType == ix::CobraConnection_EventType_Subscribed)
{
Logger() << "Subscriber: subscribed to channel " << subscriptionId;
if (subscriptionId == CHANNEL)
{
gSubscriberConnectedAndSubscribed = true;
}
else
{
Logger() << "Subscriber: unexpected channel " << subscriptionId;
}
}
else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{
Logger() << "Subscriber: ununexpected from channel " << subscriptionId;
if (subscriptionId != CHANNEL)
{
Logger() << "Subscriber: unexpected channel " << subscriptionId;
}
}
else if (eventType == ix::CobraConnection_EventType_Published)
{
Logger() << "Subscriber: published message acked: " << msgId;
}
}
);
while (!gStop)
{
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
}
conn.unsubscribe(CHANNEL);
conn.disconnect();
gUniqueMessageIdsCount = gIds.size();
}
}
TEST_CASE("Cobra_Metrics_Publisher", "[cobra]")
{
int port = getFreePort();
snake::AppConfig appConfig = makeSnakeServerConfig(port);
snake::SnakeServer snakeServer(appConfig);
snakeServer.run();
std::stringstream ss;
ss << "ws://localhost:" << port;
std::string endpoint = ss.str();
// Make channel name unique
CHANNEL += uuid4();
gStop = false;
std::thread bgThread(&startSubscriber, endpoint);
int timeout = 10 * 1000; // 10s
// Wait until the subscriber is ready (authenticated + subscription successful)
while (!gSubscriberConnectedAndSubscribed)
{
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
timeout -= 10;
if (timeout <= 0)
{
REQUIRE(false); // timeout
}
}
ix::CobraMetricsPublisher cobraMetricsPublisher;
bool perMessageDeflate = true;
cobraMetricsPublisher.configure(APPKEY, endpoint, CHANNEL,
PUBLISHER_ROLE, PUBLISHER_SECRET, perMessageDeflate);
cobraMetricsPublisher.setSession(uuid4());
cobraMetricsPublisher.enable(true); // disabled by default, needs to be enabled to be active
Json::Value data;
data["foo"] = "bar";
// (1) Publish without restrictions
cobraMetricsPublisher.push("sms_metric_A_id", data); // (msg #1)
cobraMetricsPublisher.push("sms_metric_B_id", data); // (msg #2)
// (2) Restrict what is sent using a blacklist
// Add one entry to the blacklist
// (will send msg #3)
cobraMetricsPublisher.setBlacklist({
"sms_metric_B_id" // this id will not be sent
});
// (msg #4)
cobraMetricsPublisher.push("sms_metric_A_id", data);
// ...
cobraMetricsPublisher.push("sms_metric_B_id", data); // this won't be sent
// Reset the blacklist
// (msg #5)
cobraMetricsPublisher.setBlacklist({}); // 4.
// (3) Restrict what is sent using rate control
// (msg #6)
cobraMetricsPublisher.setRateControl({
{"sms_metric_C_id", 1}, // published once per minute (60 seconds) max
});
// (msg #7)
cobraMetricsPublisher.push("sms_metric_C_id", data);
cobraMetricsPublisher.push("sms_metric_C_id", data); // this won't be sent
ix::msleep(1400);
// (msg #8)
cobraMetricsPublisher.push("sms_metric_C_id", data); // now this will be sent
ix::msleep(600); // wait a bit so that the last message is sent and can be received
log("Testing suspend/resume now, which will disconnect the cobraMetricsPublisher.");
// Test suspend + resume
for (int i = 0 ; i < 3 ; ++i)
{
cobraMetricsPublisher.suspend();
ix::msleep(500);
REQUIRE(!cobraMetricsPublisher.isConnected()); // Check that we are not connected anymore
cobraMetricsPublisher.push("sms_metric_D_id", data); // will not be sent this time
cobraMetricsPublisher.resume();
ix::msleep(2000); // give cobra 2s to connect
REQUIRE(cobraMetricsPublisher.isConnected()); // Check that we are connected now
cobraMetricsPublisher.push("sms_metric_E_id", data);
}
ix::msleep(500);
// Now stop the thread
gStop = true;
bgThread.join();
//
// Validate that we received all message kinds, and the correct number of messages
//
CHECK(gIds.count("sms_metric_A_id") == 1);
CHECK(gIds.count("sms_metric_B_id") == 1);
CHECK(gIds.count("sms_metric_C_id") == 1);
CHECK(gIds.count("sms_metric_D_id") == 1);
CHECK(gIds.count("sms_metric_E_id") == 1);
CHECK(gIds.count("sms_set_rate_control_id") == 1);
CHECK(gIds.count("sms_set_blacklist_id") == 1);
snakeServer.stop();
}

55
test/IXHttpTest.cpp Normal file
View File

@ -0,0 +1,55 @@
/*
* IXHttpTest.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone. All rights reserved.
*/
#include <iostream>
#include <ixwebsocket/IXHttp.h>
#include "catch.hpp"
#include <string.h>
namespace ix
{
TEST_CASE("http", "[http]")
{
SECTION("Normal case")
{
std::string line = "HTTP/1.1 200";
auto result = Http::parseStatusLine(line);
REQUIRE(result.first == "HTTP/1.1");
REQUIRE(result.second == 200);
}
SECTION("http/1.0 case")
{
std::string line = "HTTP/1.0 200";
auto result = Http::parseStatusLine(line);
REQUIRE(result.first == "HTTP/1.0");
REQUIRE(result.second == 200);
}
SECTION("empty case")
{
std::string line = "";
auto result = Http::parseStatusLine(line);
REQUIRE(result.first == "");
REQUIRE(result.second == -1);
}
SECTION("empty case")
{
std::string line = "HTTP/1.1";
auto result = Http::parseStatusLine(line);
REQUIRE(result.first == "HTTP/1.1");
REQUIRE(result.second == -1);
}
}
}

View File

@ -137,4 +137,57 @@ namespace ix
server.start();
return true;
}
std::vector<uint8_t> load(const std::string& path)
{
std::vector<uint8_t> memblock;
std::ifstream file(path);
if (!file.is_open()) return memblock;
file.seekg(0, file.end);
std::streamoff size = file.tellg();
file.seekg(0, file.beg);
memblock.resize((size_t) size);
file.read((char*)&memblock.front(), static_cast<std::streamsize>(size));
return memblock;
}
std::string readAsString(const std::string& path)
{
auto vec = load(path);
return std::string(vec.begin(), vec.end());
}
snake::AppConfig makeSnakeServerConfig(int port)
{
snake::AppConfig appConfig;
appConfig.port = port;
appConfig.hostname = "127.0.0.1";
appConfig.verbose = true;
appConfig.redisPort = 6379;
appConfig.redisPassword = "";
appConfig.redisHosts.push_back("localhost"); // only one host supported now
std::string appsConfigPath("appsConfig.json");
// Parse config file
auto str = readAsString(appsConfigPath);
if (str.empty())
{
std::cout << "Cannot read content of " << appsConfigPath << std::endl;
return appConfig;
}
std::cout << str << std::endl;
auto apps = nlohmann::json::parse(str);
appConfig.apps = apps["apps"];
// Display config on the terminal for debugging
dumpConfig(appConfig);
return appConfig;
}
}

View File

@ -9,6 +9,7 @@
#include "IXGetFreePort.h"
#include <iostream>
#include <ixwebsocket/IXWebSocketServer.h>
#include "IXAppConfig.h"
#include <mutex>
#include <spdlog/spdlog.h>
#include <sstream>
@ -48,4 +49,6 @@ namespace ix
void log(const std::string& msg);
bool startWebSocketEchoServer(ix::WebSocketServer& server);
snake::AppConfig makeSnakeServerConfig(int port);
} // namespace ix

14
test/appsConfig.json Normal file
View File

@ -0,0 +1,14 @@
{
"apps": {
"FC2F10139A2BAc53BB72D9db967b024f": {
"roles": {
"_sub": {
"secret": "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba"
},
"_pub": {
"secret": "1c04DB8fFe76A4EeFE3E318C72d771db"
}
}
}
}
}

View File

@ -206,7 +206,7 @@ namespace
void WebSocketChat::sendMessage(const std::string& text)
{
_webSocket.send(encodeMessage(text));
_webSocket.sendBinary(encodeMessage(text));
}
bool startServer(ix::WebSocketServer& server)
@ -239,7 +239,7 @@ namespace
{
if (client != webSocket)
{
client->send(msg->str);
client->sendBinary(msg->str);
}
}
}

View File

@ -128,6 +128,8 @@ namespace ix
const OnRedisSubscribeResponseCallback& responseCallback,
const OnRedisSubscribeCallback& callback)
{
_stop = false;
if (!_socket) return false;
std::stringstream ss;
@ -159,7 +161,7 @@ namespace ix
if (!lineValid) return false;
// There are 5 items for the subscribe repply
// There are 5 items for the subscribe reply
for (int i = 0; i < 5; ++i)
{
auto lineResult = _socket->readLine(nullptr);
@ -175,13 +177,21 @@ namespace ix
// Wait indefinitely for new messages
while (true)
{
if (_stop) break;
// Wait until something is ready to read
auto pollResult = _socket->isReadyToRead(-1);
int timeoutMs = 10;
auto pollResult = _socket->isReadyToRead(timeoutMs);
if (pollResult == PollResultType::Error)
{
return false;
}
if (pollResult == PollResultType::Timeout)
{
continue;
}
// The first line of the response describe the return type,
// => *3 (an array of 3 elements)
auto lineResult = _socket->readLine(nullptr);
@ -231,4 +241,9 @@ namespace ix
return true;
}
void RedisClient::stop()
{
_stop = true;
}
}

View File

@ -8,6 +8,7 @@
#include <functional>
#include <memory>
#include <atomic>
namespace ix
{
@ -19,7 +20,7 @@ namespace ix
using OnRedisSubscribeResponseCallback = std::function<void(const std::string&)>;
using OnRedisSubscribeCallback = std::function<void(const std::string&)>;
RedisClient() = default;
RedisClient() : _stop(false) {}
~RedisClient() = default;
bool connect(const std::string& hostname, int port);
@ -32,9 +33,12 @@ namespace ix
const OnRedisSubscribeResponseCallback& responseCallback,
const OnRedisSubscribeCallback& callback);
void stop();
private:
std::string writeString(const std::string& str);
std::shared_ptr<Socket> _socket;
std::atomic<bool> _stop;
};
} // namespace ix

View File

@ -13,6 +13,7 @@
#include <cmath>
#include <cassert>
#include <cstring>
#include <iostream>
namespace ix
@ -191,7 +192,7 @@ namespace ix
{
if (!handleUnsubscriptionResponse(data))
{
invokeErrorCallback("Error processing subscribe response", msg->str);
invokeErrorCallback("Error processing unsubscribe response", msg->str);
}
}
else if (action == "rtm/unsubscribe/error")
@ -300,6 +301,8 @@ namespace ix
//
bool CobraConnection::handleHandshakeResponse(const Json::Value& pdu)
{
if (!pdu.isObject()) return false;
if (!pdu.isMember("body")) return false;
Json::Value body = pdu["body"];
@ -349,6 +352,8 @@ namespace ix
bool CobraConnection::handleSubscriptionResponse(const Json::Value& pdu)
{
if (!pdu.isObject()) return false;
if (!pdu.isMember("body")) return false;
Json::Value body = pdu["body"];
@ -365,6 +370,8 @@ namespace ix
bool CobraConnection::handleUnsubscriptionResponse(const Json::Value& pdu)
{
if (!pdu.isObject()) return false;
if (!pdu.isMember("body")) return false;
Json::Value body = pdu["body"];
@ -381,6 +388,8 @@ namespace ix
bool CobraConnection::handleSubscriptionData(const Json::Value& pdu)
{
if (!pdu.isObject()) return false;
if (!pdu.isMember("body")) return false;
Json::Value body = pdu["body"];
@ -407,6 +416,8 @@ namespace ix
bool CobraConnection::handlePublishResponse(const Json::Value& pdu)
{
if (!pdu.isObject()) return false;
if (!pdu.isMember("id")) return false;
Json::Value id = pdu["id"];
@ -453,6 +464,8 @@ namespace ix
{
invokePublishTrackerCallback(true, false);
CobraConnection::MsgId msgId = _id;
_body["channels"] = channels;
_body["message"] = msg;
_pdu["body"] = _body;
@ -460,27 +473,22 @@ namespace ix
std::string serializedJson = serializeJson(_pdu);
if (_publishMode == CobraConnection_PublishMode_Batch)
//
// 1. When we use batch mode, we just enqueue and will do the flush explicitely
// 2. When we aren't authenticated yet to the cobra server, we need to enqueue
// and retry later
// 3. If the network connection was droped (WebSocket::send will return false),
// it means the message won't be sent so we need to enqueue as well.
//
// The order of the conditionals is important.
//
if (_publishMode == CobraConnection_PublishMode_Batch || !_authenticated ||
!publishMessage(serializedJson))
{
enqueue(serializedJson);
return _id - 1;
}
//
// Fast path. We are authenticated and the publishing succeed
// This should happen for 99% of the cases.
//
if (_authenticated && publishMessage(serializedJson))
{
return _id - 1;
}
else // Or else we enqueue
// Slow code path is when we haven't connected yet (startup),
// or when the connection drops for some reason.
{
enqueue(serializedJson);
return 0;
}
return msgId;
}
void CobraConnection::subscribe(const std::string& channel,

View File

@ -6,7 +6,7 @@
#pragma once
#include "nlohmann/json.hpp"
#include <nlohmann/json.hpp>
#include <string>
#include <vector>

View File

@ -163,6 +163,14 @@ namespace snake
return;
}
}
nlohmann::json response = {
{"action", "rtm/publish/ok"},
{"id", pdu.value("id", 1)},
{"body", {}}
};
ws->sendText(response.dump());
}
//
@ -220,12 +228,14 @@ namespace snake
{
auto msg = nlohmann::json::parse(messageStr);
msg = msg["body"]["message"];
nlohmann::json response = {
{"action", "rtm/subscription/data"},
{"id", id++},
{"body", {
{"subscription_id", subscriptionId},
{"messages", {{msg}}}
{"messages", {msg}}
}}
};
@ -271,6 +281,27 @@ namespace snake
pdu);
}
void handleUnSubscribe(
std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
const nlohmann::json& pdu)
{
// extract subscription_id
auto body = pdu["body"];
auto subscriptionId = body["subscription_id"];
state->redisClient().stop();
nlohmann::json response = {
{"action", "rtm/unsubscribe/ok"},
{"id", pdu.value("id", 1)},
{"body", {
{"subscription_id", subscriptionId}
}}
};
ws->sendText(response.dump());
}
void processCobraMessage(
std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
@ -299,6 +330,10 @@ namespace snake
{
handleSubscribe(state, ws, appConfig, pdu);
}
else if (action == "rtm/unsubscribe")
{
handleUnSubscribe(state, ws, pdu);
}
else
{
std::cerr << "Unhandled action: " << action << std::endl;

View File

@ -4,10 +4,10 @@
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <IXSnakeServer.h>
#include <IXSnakeProtocol.h>
#include <IXSnakeConnectionState.h>
#include <IXAppConfig.h>
#include "IXSnakeServer.h"
#include "IXSnakeProtocol.h"
#include "IXSnakeConnectionState.h"
#include "IXAppConfig.h"
#include <iostream>
#include <sstream>
@ -118,8 +118,19 @@ namespace snake
}
_server.start();
_server.wait();
return true;
}
void SnakeServer::runForever()
{
if (run())
{
_server.wait();
}
}
void SnakeServer::stop()
{
_server.stop();
}
}

View File

@ -19,6 +19,8 @@ namespace snake
~SnakeServer() = default;
bool run();
void runForever();
void stop();
private:
std::string parseAppKey(const std::string& path);

View File

@ -42,6 +42,23 @@
#include <ixwebsocket/IXWebSocket.h>
#include <ixwebsocket/IXSocket.h>
#include <spdlog/spdlog.h>
namespace
{
std::string truncate(const std::string& str, size_t n)
{
if (str.size() < n)
{
return str;
}
else
{
return str.substr(0, n) + "...";
}
}
}
namespace ix
{
@ -117,7 +134,7 @@ namespace ix
ss << "Received " << msg->wireSize << " bytes" << std::endl;
ss << "autobahn: received message: "
<< msg->str
<< truncate(msg->str, 40)
<< std::endl;
_webSocket.send(msg->str, msg->binary);
@ -161,7 +178,7 @@ namespace ix
_webSocket.stop();
}
void generateReport(const std::string& url)
bool generateReport(const std::string& url)
{
ix::WebSocket webSocket;
std::string reportUrl(url);
@ -169,14 +186,16 @@ namespace ix
webSocket.setUrl(reportUrl);
webSocket.disableAutomaticReconnection();
std::atomic<bool> done(false);
std::atomic<bool> success(true);
std::condition_variable condition;
webSocket.setOnMessageCallback(
[&done](const ix::WebSocketMessagePtr& msg)
[&condition, &success](const ix::WebSocketMessagePtr& msg)
{
if (msg->type == ix::WebSocketMessageType::Close)
{
std::cerr << "Report generated" << std::endl;
done = true;
condition.notify_one();
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
@ -186,18 +205,24 @@ namespace ix
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str() << std::endl;
success = false;
}
}
);
webSocket.start();
while (!done)
webSocket.start();
std::mutex mutex;
std::unique_lock<std::mutex> lock(mutex);
condition.wait(lock);
webSocket.stop();
if (!success)
{
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
spdlog::error("Cannot generate report at url {}", reportUrl);
}
webSocket.stop();
return success;
}
int getTestCaseCount(const std::string& url)
@ -208,15 +233,15 @@ namespace ix
webSocket.setUrl(caseCountUrl);
webSocket.disableAutomaticReconnection();
int count = 0;
int count = -1;
std::condition_variable condition;
std::atomic<bool> done(false);
webSocket.setOnMessageCallback(
[&done, &count](const ix::WebSocketMessagePtr& msg)
[&condition, &count](const ix::WebSocketMessagePtr& msg)
{
if (msg->type == ix::WebSocketMessageType::Close)
{
done = true;
condition.notify_one();
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
@ -226,6 +251,8 @@ namespace ix
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str() << std::endl;
condition.notify_one();
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
@ -236,16 +263,18 @@ namespace ix
}
}
);
webSocket.start();
while (!done)
{
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
}
std::mutex mutex;
std::unique_lock<std::mutex> lock(mutex);
condition.wait(lock);
webSocket.stop();
if (count == -1)
{
spdlog::error("Cannot retrieve test case count at url {}", caseCountUrl);
}
return count;
}
@ -254,14 +283,20 @@ namespace ix
//
int ws_autobahn_main(const std::string& url, bool quiet)
{
int N = getTestCaseCount(url);
std::cerr << "Test cases count: " << N << std::endl;
int testCasesCount = getTestCaseCount(url);
std::cerr << "Test cases count: " << testCasesCount << std::endl;
N++;
for (int i = 1 ; i < N; ++i)
if (testCasesCount == -1)
{
std::cerr << "Execute test case " << i << std::endl;
spdlog::error("Cannot retrieve test case count at url {}", url);
return 1;
}
testCasesCount++;
for (int i = 1 ; i < testCasesCount; ++i)
{
spdlog::info("Execute test case {}", i);
int caseNumber = i;
@ -277,9 +312,7 @@ namespace ix
testCase.run();
}
generateReport(url);
return 0;
return generateReport(url) ? 0 : 1;
}
}

View File

@ -76,6 +76,8 @@ namespace ix
dumpConfig(appConfig);
snake::SnakeServer snakeServer(appConfig);
return snakeServer.run() ? 0 : 1;
snakeServer.runForever();
return 0; // should never reach this
}
}