Compare commits

...

29 Commits

Author SHA1 Message Date
f7a12f52f8 On error while doing a client handshake, additionally display port number next to the host name 2019-09-17 12:08:52 -07:00
1be3b8f4b1 rename test file 2019-09-17 12:07:31 -07:00
0b844d8361 make test target does not try to install anything into /usr/local 2019-09-12 11:45:31 -07:00
57086e28d8 fix unittest warnings + remove trailing spaces 2019-09-12 11:43:52 -07:00
a55d4cdb76 update pre-commit file 2019-09-10 22:18:16 -07:00
40a45717db update clang format file 2019-09-10 22:17:08 -07:00
e853d9ac60 build fixes 2019-09-10 14:05:07 -07:00
4ec0d9b113 update appveyor file to new directory structure 2019-09-10 12:33:47 -07:00
0fde169aa4 restructure project 2019-09-10 12:19:22 -07:00
c09015e870 update ws CLI11 (our command line argument parsing library) to the latest, which fix a compiler bug about optional 2019-09-09 21:25:33 -07:00
7bfa6e8478 improve some websocket error messages + add a utility function with unittest to parse status line and stop using scanf which triggers warnings on Windows 2019-09-09 21:23:57 -07:00
983df2d8f9 improve some websocket error messages + add a utility function with unittest to parse status line and stop using scanf which triggers warnings on Windows 2019-09-09 17:34:36 -07:00
6beba16ca7 websocket and http server: server does not close the bound client socket in many cases 2019-09-09 16:48:26 -07:00
48cefe5525 move poll wrapper on top of select (only used on Windows) to the ix namespace 2019-09-08 11:15:08 -07:00
ae3856c10f Fix Windows CI with appveyor (#110)
Fix windows CI with appveyor + minor tweaks.
2019-09-07 14:07:00 -07:00
260a94d3b0 README: update link to the doc 2019-09-06 10:42:48 -07:00
88c6d6c4cb ci 2019-09-05 22:32:54 -07:00
d5a4931c92 travis linux 2019-09-05 22:29:00 -07:00
11f4e90bc6 ci tweak / install redis 2019-09-05 22:14:55 -07:00
2ce65e7a77 cobra metrics publisher test uses random free port 2019-09-05 22:05:00 -07:00
e81c2c3e5c cobra chat test uses random free port 2019-09-05 22:02:10 -07:00
e40dda7549 add cobra metrics publisher + server unittest 2019-09-05 21:57:05 -07:00
d959d73261 Add new cobra unittest, using server and client 2019-09-05 20:49:58 -07:00
07b7e37a92 snake unsubscription fixes 2019-09-05 20:47:15 -07:00
eb7888347a Fix compiler warning 2019-09-05 20:29:14 -07:00
d8664f4988 ws snake (cobra simple server) add basic support for unsubscription + subscribe send the proper subscription data + redis client subscription can be cancelled 2019-09-05 20:28:34 -07:00
5e94791b13 IXCobraConnection / pdu handlers can crash if they receive json data which is not an object 2019-09-05 20:24:42 -07:00
3e3f7171fc cobra publish fix 2019-09-05 14:31:28 -07:00
308fda0b37 Update README.md 2019-09-05 14:30:51 -07:00
76 changed files with 4174 additions and 1046 deletions

View File

@ -11,7 +11,7 @@ AlignTrailingComments: true
AllowAllParametersOfDeclarationOnNextLine: true AllowAllParametersOfDeclarationOnNextLine: true
AllowShortBlocksOnASingleLine: false AllowShortBlocksOnASingleLine: false
AllowShortCaseLabelsOnASingleLine: true AllowShortCaseLabelsOnASingleLine: true
AllowShortFunctionsOnASingleLine: InlineOnly AllowShortFunctionsOnASingleLine: false
AllowShortIfStatementsOnASingleLine: true AllowShortIfStatementsOnASingleLine: true
AllowShortLoopsOnASingleLine: false AllowShortLoopsOnASingleLine: false
AlwaysBreakTemplateDeclarations: true AlwaysBreakTemplateDeclarations: true
@ -42,5 +42,6 @@ NamespaceIndentation: All
PenaltyReturnTypeOnItsOwnLine: 1000 PenaltyReturnTypeOnItsOwnLine: 1000
PointerAlignment: Left PointerAlignment: Left
SpaceAfterTemplateKeyword: false SpaceAfterTemplateKeyword: false
SpaceAfterCStyleCast: true
Standard: Cpp11 Standard: Cpp11
UseTab: Never UseTab: Never

View File

@ -5,8 +5,3 @@ repos:
- id: check-yaml - id: check-yaml
- id: end-of-file-fixer - id: end-of-file-fixer
- id: trailing-whitespace - id: trailing-whitespace
- repo: https://github.com/pocc/pre-commit-hooks
rev: ''
hooks:
- id: clang-format

View File

@ -6,23 +6,26 @@ language: bash
matrix: matrix:
include: include:
# macOS # macOS
- os: osx # - os: osx
env: # env:
- HOMEBREW_NO_AUTO_UPDATE=1 # - HOMEBREW_NO_AUTO_UPDATE=1
compiler: clang # compiler: clang
script: # script:
- brew install mbedtls # - brew install redis
- python test/run.py # - brew services start redis
- make ws # - brew install mbedtls
# - python test/run.py
# - make ws
# Linux Linux
- os: linux - os: linux
dist: bionic dist: bionic
before_install: before_install:
- sudo apt-get install -y libmbedtls-dev - sudo apt-get install -y libmbedtls-dev
- sudo apt-get install -y redis-server
script: script:
- python test/run.py - python test/run.py
- make ws # - make ws
env: env:
- CC=gcc - CC=gcc
- CXX=g++ - CXX=g++

View File

@ -12,8 +12,7 @@ set (CMAKE_CXX_STANDARD 14)
set (CXX_STANDARD_REQUIRED ON) set (CXX_STANDARD_REQUIRED ON)
set (CMAKE_CXX_EXTENSIONS OFF) set (CMAKE_CXX_EXTENSIONS OFF)
# -Wshorten-64-to-32 does not work with clang if (UNIX)
if (NOT WIN32)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic")
endif() endif()
@ -115,10 +114,7 @@ endif()
set(USE_OPEN_SSL FALSE) set(USE_OPEN_SSL FALSE)
if (USE_TLS) if (USE_TLS)
add_definitions(-DIXWEBSOCKET_USE_TLS)
if (USE_MBED_TLS) if (USE_MBED_TLS)
add_definitions(-DIXWEBSOCKET_USE_MBED_TLS)
list( APPEND IXWEBSOCKET_HEADERS ixwebsocket/IXSocketMbedTLS.h) list( APPEND IXWEBSOCKET_HEADERS ixwebsocket/IXSocketMbedTLS.h)
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/IXSocketMbedTLS.cpp) list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/IXSocketMbedTLS.cpp)
elseif (APPLE) elseif (APPLE)
@ -128,7 +124,6 @@ if (USE_TLS)
list( APPEND IXWEBSOCKET_HEADERS ixwebsocket/IXSocketSChannel.h) list( APPEND IXWEBSOCKET_HEADERS ixwebsocket/IXSocketSChannel.h)
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/IXSocketSChannel.cpp) list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/IXSocketSChannel.cpp)
else() else()
add_definitions(-DIXWEBSOCKET_USE_OPEN_SSL)
set(USE_OPEN_SSL TRUE) set(USE_OPEN_SSL TRUE)
list( APPEND IXWEBSOCKET_HEADERS ixwebsocket/IXSocketOpenSSL.h) list( APPEND IXWEBSOCKET_HEADERS ixwebsocket/IXSocketOpenSSL.h)
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/IXSocketOpenSSL.cpp) list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/IXSocketOpenSSL.cpp)
@ -140,6 +135,17 @@ add_library( ixwebsocket STATIC
${IXWEBSOCKET_HEADERS} ${IXWEBSOCKET_HEADERS}
) )
if (USE_TLS)
target_compile_definitions(ixwebsocket PUBLIC IXWEBSOCKET_USE_TLS)
if (USE_MBED_TLS)
target_compile_definitions(ixwebsocket PUBLIC IXWEBSOCKET_USE_MBED_TLS)
elseif (APPLE)
elseif (WIN32)
else()
target_compile_definitions(ixwebsocket PUBLIC IXWEBSOCKET_USE_OPEN_SSL)
endif()
endif()
if (APPLE AND USE_TLS AND NOT USE_MBED_TLS) if (APPLE AND USE_TLS AND NOT USE_MBED_TLS)
target_link_libraries(ixwebsocket "-framework foundation" "-framework security") target_link_libraries(ixwebsocket "-framework foundation" "-framework security")
endif() endif()
@ -171,7 +177,7 @@ if (USE_TLS AND USE_MBED_TLS)
target_link_libraries(ixwebsocket mbedtls) target_link_libraries(ixwebsocket mbedtls)
else() else()
find_package(MbedTLS REQUIRED) find_package(MbedTLS REQUIRED)
include_directories(${MBEDTLS_INCLUDE_DIRS}) target_include_directories(ixwebsocket PUBLIC ${MBEDTLS_INCLUDE_DIRS})
target_link_libraries(ixwebsocket ${MBEDTLS_LIBRARIES}) target_link_libraries(ixwebsocket ${MBEDTLS_LIBRARIES})
endif() endif()
endif() endif()
@ -204,6 +210,15 @@ install(TARGETS ixwebsocket
PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_PREFIX}/include/ixwebsocket/ PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_PREFIX}/include/ixwebsocket/
) )
if (USE_WS) if (USE_WS OR USE_TEST)
add_subdirectory(ixcore)
add_subdirectory(ixcrypto)
add_subdirectory(ixcobra)
if (USE_WS)
add_subdirectory(ws) add_subdirectory(ws)
endif()
if (USE_TEST)
add_subdirectory(test)
endif()
endif() endif()

View File

@ -1 +1 @@
6.0.0 6.2.1

View File

@ -6,8 +6,8 @@ IXWebSocket is a C++ library for WebSocket client and server development. It has
It is been used on big mobile video game titles sending and receiving tons of messages since 2017 (iOS and Android). It is been used on big mobile video game titles sending and receiving tons of messages since 2017 (iOS and Android).
Interested ? Go read the [docs](https://bsergean.github.io/IXWebSocket/site/) ! If things don't work as expected, please create an issue in github, or even better a pull request if you know how to fix your problem. Interested ? Go read the [docs](https://machinezone.github.io/IXWebSocket/) ! If things don't work as expected, please create an issue in github, or even better a pull request if you know how to fix your problem.
IXWebSocket is actively being developed, check out the [changelog](CHANGELOG.md) to know what's cooking. If you are looking for a real time messaging service (the chat-like 'server' your websocket code will talk to) with many features such as history, backed by Redis, look at [cobra](https://github.com/machinezone/cobra). IXWebSocket is actively being developed, check out the [changelog](CHANGELOG.md) to know what's cooking. If you are looking for a real time messaging service (the chat-like 'server' your websocket code will talk to) with many features such as history, backed by Redis, look at [cobra](https://github.com/machinezone/cobra).
IXWebSocket is not yet autobahn compliant, but we are working on changing this. See the current compliance [test results](https://bsergean.github.io/IXWebSocket/autobahn/index.html). The only tests that are still failing are the Websocket Compression ones (see section 12 and 13). IXWebSocket client code is autobahn compliant beginning with the 6.0.0 version. See the current [test results](https://bsergean.github.io/IXWebSocket/autobahn/index.html). Some tests are still failing in the server code.

View File

@ -2,13 +2,21 @@ image:
- Visual Studio 2017 - Visual Studio 2017
install: install:
- ls -al - cd C:\Tools\vcpkg
- git pull
- .\bootstrap-vcpkg.bat
- cd %APPVEYOR_BUILD_FOLDER%
- cmd: call "C:\Program Files (x86)\Microsoft Visual Studio\2017\Community\VC\Auxiliary\Build\vcvars64.bat" - cmd: call "C:\Program Files (x86)\Microsoft Visual Studio\2017\Community\VC\Auxiliary\Build\vcvars64.bat"
- cd test - vcpkg install zlib:x64-windows
- vcpkg install mbedtls:x64-windows
- mkdir build - mkdir build
- cd build - cd build
- cmake -G"NMake Makefiles" .. - cmake -DCMAKE_TOOLCHAIN_FILE=c:/tools/vcpkg/scripts/buildsystems/vcpkg.cmake -DUSE_WS=1 -DUSE_TEST=1 -DUSE_TLS=1 -G"NMake Makefiles" ..
- nmake - nmake
- ixwebsocket_unittest.exe - cd ..
- cd test
- ..\build\test\ixwebsocket_unittest.exe
cache: c:\tools\vcpkg\installed\
build: off build: off

View File

@ -1,6 +1,28 @@
# Changelog # Changelog
All notable changes to this project will be documented in this file. All notable changes to this project will be documented in this file.
## [6.2.1] - 2019-09-17
- On error while doing a client handshake, additionally display port number next to the host name
## [6.2.0] - 2019-09-09
- websocket and http server: server does not close the bound client socket in many cases
- improve some websocket error messages
- add a utility function with unittest to parse status line and stop using scanf which triggers warnings on Windows
- update ws CLI11 (our command line argument parsing library) to the latest, which fix a compiler bug about optional
## [6.1.0] - 2019-09-08
- move poll wrapper on top of select (only used on Windows) to the ix namespace
## [6.0.1] - 2019-09-05
- add cobra metrics publisher + server unittest
- add cobra client + server unittest
- ws snake (cobra simple server) add basic support for unsubscription + subscribe send the proper subscription data + redis client subscription can be cancelled
- IXCobraConnection / pdu handlers can crash if they receive json data which is not an object
## [6.0.0] - 2019-09-04 ## [6.0.0] - 2019-09-04
- all client autobahn test should pass ! - all client autobahn test should pass !

View File

@ -21,6 +21,8 @@ Options for building:
* `-DUSE_MBED_TLS=1` will use [mbedlts](https://tls.mbed.org/) for the TLS support (default on Windows) * `-DUSE_MBED_TLS=1` will use [mbedlts](https://tls.mbed.org/) for the TLS support (default on Windows)
* `-DUSE_WS=1` will build the ws interactive command line tool * `-DUSE_WS=1` will build the ws interactive command line tool
If you are on Windows, look at the [appveyor](https://github.com/machinezone/IXWebSocket/blob/master/appveyor.yml) file that has instructions for building dependencies.
### vcpkg ### vcpkg
It is possible to get IXWebSocket through Microsoft [vcpkg](https://github.com/microsoft/vcpkg). It is possible to get IXWebSocket through Microsoft [vcpkg](https://github.com/microsoft/vcpkg).
@ -57,6 +59,3 @@ app@ca2340eb9106:~$ ws --help
ws is a websocket tool ws is a websocket tool
... ...
``` ```

30
ixcobra/CMakeLists.txt Normal file
View File

@ -0,0 +1,30 @@
#
# Author: Benjamin Sergeant
# Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
#
set (IXCOBRA_SOURCES
ixcobra/IXCobraConnection.cpp
ixcobra/IXCobraMetricsThreadedPublisher.cpp
ixcobra/IXCobraMetricsPublisher.cpp
)
set (IXCOBRA_HEADERS
ixcobra/IXCobraConnection.h
ixcobra/IXCobraMetricsThreadedPublisher.h
ixcobra/IXCobraMetricsPublisher.h
)
add_library(ixcobra STATIC
${IXCOBRA_SOURCES}
${IXCOBRA_HEADERS}
)
set(IXCOBRA_INCLUDE_DIRS
.
..
../ixcore
../ixcrypto
../third_party)
target_include_directories( ixcobra PUBLIC ${IXCOBRA_INCLUDE_DIRS} )

View File

@ -13,6 +13,7 @@
#include <cmath> #include <cmath>
#include <cassert> #include <cassert>
#include <cstring> #include <cstring>
#include <iostream>
namespace ix namespace ix
@ -191,7 +192,7 @@ namespace ix
{ {
if (!handleUnsubscriptionResponse(data)) if (!handleUnsubscriptionResponse(data))
{ {
invokeErrorCallback("Error processing subscribe response", msg->str); invokeErrorCallback("Error processing unsubscribe response", msg->str);
} }
} }
else if (action == "rtm/unsubscribe/error") else if (action == "rtm/unsubscribe/error")
@ -300,6 +301,8 @@ namespace ix
// //
bool CobraConnection::handleHandshakeResponse(const Json::Value& pdu) bool CobraConnection::handleHandshakeResponse(const Json::Value& pdu)
{ {
if (!pdu.isObject()) return false;
if (!pdu.isMember("body")) return false; if (!pdu.isMember("body")) return false;
Json::Value body = pdu["body"]; Json::Value body = pdu["body"];
@ -349,6 +352,8 @@ namespace ix
bool CobraConnection::handleSubscriptionResponse(const Json::Value& pdu) bool CobraConnection::handleSubscriptionResponse(const Json::Value& pdu)
{ {
if (!pdu.isObject()) return false;
if (!pdu.isMember("body")) return false; if (!pdu.isMember("body")) return false;
Json::Value body = pdu["body"]; Json::Value body = pdu["body"];
@ -365,6 +370,8 @@ namespace ix
bool CobraConnection::handleUnsubscriptionResponse(const Json::Value& pdu) bool CobraConnection::handleUnsubscriptionResponse(const Json::Value& pdu)
{ {
if (!pdu.isObject()) return false;
if (!pdu.isMember("body")) return false; if (!pdu.isMember("body")) return false;
Json::Value body = pdu["body"]; Json::Value body = pdu["body"];
@ -381,6 +388,8 @@ namespace ix
bool CobraConnection::handleSubscriptionData(const Json::Value& pdu) bool CobraConnection::handleSubscriptionData(const Json::Value& pdu)
{ {
if (!pdu.isObject()) return false;
if (!pdu.isMember("body")) return false; if (!pdu.isMember("body")) return false;
Json::Value body = pdu["body"]; Json::Value body = pdu["body"];
@ -407,6 +416,8 @@ namespace ix
bool CobraConnection::handlePublishResponse(const Json::Value& pdu) bool CobraConnection::handlePublishResponse(const Json::Value& pdu)
{ {
if (!pdu.isObject()) return false;
if (!pdu.isMember("id")) return false; if (!pdu.isMember("id")) return false;
Json::Value id = pdu["id"]; Json::Value id = pdu["id"];
@ -453,6 +464,8 @@ namespace ix
{ {
invokePublishTrackerCallback(true, false); invokePublishTrackerCallback(true, false);
CobraConnection::MsgId msgId = _id;
_body["channels"] = channels; _body["channels"] = channels;
_body["message"] = msg; _body["message"] = msg;
_pdu["body"] = _body; _pdu["body"] = _body;
@ -460,27 +473,22 @@ namespace ix
std::string serializedJson = serializeJson(_pdu); std::string serializedJson = serializeJson(_pdu);
if (_publishMode == CobraConnection_PublishMode_Batch) //
// 1. When we use batch mode, we just enqueue and will do the flush explicitely
// 2. When we aren't authenticated yet to the cobra server, we need to enqueue
// and retry later
// 3. If the network connection was droped (WebSocket::send will return false),
// it means the message won't be sent so we need to enqueue as well.
//
// The order of the conditionals is important.
//
if (_publishMode == CobraConnection_PublishMode_Batch || !_authenticated ||
!publishMessage(serializedJson))
{ {
enqueue(serializedJson); enqueue(serializedJson);
return _id - 1;
} }
// return msgId;
// Fast path. We are authenticated and the publishing succeed
// This should happen for 99% of the cases.
//
if (_authenticated && publishMessage(serializedJson))
{
return _id - 1;
}
else // Or else we enqueue
// Slow code path is when we haven't connected yet (startup),
// or when the connection drops for some reason.
{
enqueue(serializedJson);
return 0;
}
} }
void CobraConnection::subscribe(const std::string& channel, void CobraConnection::subscribe(const std::string& channel,

19
ixcore/CMakeLists.txt Normal file
View File

@ -0,0 +1,19 @@
#
# Author: Benjamin Sergeant
# Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
#
set (IXCORE_SOURCES
ixcore/utils/IXCoreLogger.cpp
)
set (IXCORE_HEADERS
ixcore/utils/IXCoreLogger.h
)
add_library(ixcore STATIC
${IXCORE_SOURCES}
${IXCORE_HEADERS}
)
target_include_directories( ixcore PUBLIC . )

54
ixcrypto/CMakeLists.txt Normal file
View File

@ -0,0 +1,54 @@
#
# Author: Benjamin Sergeant
# Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
#
set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/../CMake;${CMAKE_MODULE_PATH}")
set (IXCRYPTO_SOURCES
ixcrypto/IXHMac.cpp
ixcrypto/IXBase64.cpp
ixcrypto/IXUuid.cpp
ixcrypto/IXHash.cpp
)
set (IXCRYPTO_HEADERS
ixcrypto/IXHMac.h
ixcrypto/IXBase64.h
ixcrypto/IXUuid.h
ixcrypto/IXHash.h
)
add_library(ixcrypto STATIC
${IXCRYPTO_SOURCES}
${IXCRYPTO_HEADERS}
)
set(IXCRYPTO_INCLUDE_DIRS
.
../ixcore)
target_include_directories( ixcrypto PUBLIC ${IXCRYPTO_INCLUDE_DIRS} )
# hmac computation needs a crypto library
if (WIN32)
set(USE_MBED_TLS TRUE)
endif()
target_compile_definitions(ixcrypto PUBLIC IXCRYPTO_USE_TLS)
if (USE_MBED_TLS)
find_package(MbedTLS REQUIRED)
target_include_directories(ixcrypto PUBLIC ${MBEDTLS_INCLUDE_DIRS})
target_link_libraries(ixcrypto ${MBEDTLS_LIBRARIES})
target_compile_definitions(ixcrypto PUBLIC IXCRYPTO_USE_MBED_TLS)
elseif (APPLE)
elseif (WIN32)
else()
find_package(OpenSSL REQUIRED)
add_definitions(${OPENSSL_DEFINITIONS})
message(STATUS "OpenSSL: " ${OPENSSL_VERSION})
include_directories(${OPENSSL_INCLUDE_DIR})
target_link_libraries(ixcrypto ${OPENSSL_LIBRARIES})
target_compile_definitions(ixcrypto PUBLIC IXCRYPTO_USE_OPEN_SSL)
endif()

View File

@ -7,12 +7,14 @@
#include "IXHMac.h" #include "IXHMac.h"
#include "IXBase64.h" #include "IXBase64.h"
#if defined(IXWEBSOCKET_USE_MBED_TLS) #if defined(IXCRYPTO_USE_MBED_TLS)
# include <mbedtls/md.h> # include <mbedtls/md.h>
#elif defined(__APPLE__) #elif defined(__APPLE__)
# include <CommonCrypto/CommonHMAC.h> # include <CommonCrypto/CommonHMAC.h>
#else #elif defined(IXCRYPTO_USE_OPEN_SSL)
# include <openssl/hmac.h> # include <openssl/hmac.h>
#else
# error "Unsupported configuration"
#endif #endif
namespace ix namespace ix
@ -22,7 +24,7 @@ namespace ix
constexpr size_t hashSize = 16; constexpr size_t hashSize = 16;
unsigned char hash[hashSize]; unsigned char hash[hashSize];
#if defined(IXWEBSOCKET_USE_MBED_TLS) #if defined(IXCRYPTO_USE_MBED_TLS)
mbedtls_md_hmac(mbedtls_md_info_from_type(MBEDTLS_MD_MD5), mbedtls_md_hmac(mbedtls_md_info_from_type(MBEDTLS_MD_MD5),
(unsigned char *) key.c_str(), key.size(), (unsigned char *) key.c_str(), key.size(),
(unsigned char *) data.c_str(), data.size(), (unsigned char *) data.c_str(), data.size(),
@ -32,11 +34,13 @@ namespace ix
key.c_str(), key.size(), key.c_str(), key.size(),
data.c_str(), data.size(), data.c_str(), data.size(),
&hash); &hash);
#else #elif defined(IXCRYPTO_USE_OPEN_SSL)
HMAC(EVP_md5(), HMAC(EVP_md5(),
key.c_str(), (int) key.size(), key.c_str(), (int) key.size(),
(unsigned char *) data.c_str(), (int) data.size(), (unsigned char *) data.c_str(), (int) data.size(),
(unsigned char *) hash, nullptr); (unsigned char *) hash, nullptr);
#else
# error "Unsupported configuration"
#endif #endif
std::string hashString(reinterpret_cast<char*>(hash), hashSize); std::string hashString(reinterpret_cast<char*>(hash), hashSize);

View File

@ -27,6 +27,36 @@ namespace ix
return out; return out;
} }
std::pair<std::string, int> Http::parseStatusLine(const std::string& line)
{
// Request-Line = Method SP Request-URI SP HTTP-Version CRLF
std::string token;
std::stringstream tokenStream(line);
std::vector<std::string> tokens;
// Split by ' '
while (std::getline(tokenStream, token, ' '))
{
tokens.push_back(token);
}
std::string httpVersion;
if (tokens.size() >= 1)
{
httpVersion = trim(tokens[0]);
}
int statusCode = -1;
if (tokens.size() >= 2)
{
std::stringstream ss;
ss << trim(tokens[1]);
ss >> statusCode;
}
return std::make_pair(httpVersion, statusCode);
}
std::tuple<std::string, std::string, std::string> Http::parseRequestLine(const std::string& line) std::tuple<std::string, std::string, std::string> Http::parseRequestLine(const std::string& line)
{ {
// Request-Line = Method SP Request-URI SP HTTP-Version CRLF // Request-Line = Method SP Request-URI SP HTTP-Version CRLF

View File

@ -115,6 +115,8 @@ namespace ix
std::shared_ptr<Socket> socket); std::shared_ptr<Socket> socket);
static bool sendResponse(HttpResponsePtr response, std::shared_ptr<Socket> socket); static bool sendResponse(HttpResponsePtr response, std::shared_ptr<Socket> socket);
static std::pair<std::string, int> parseStatusLine(
const std::string& line);
static std::tuple<std::string, std::string, std::string> parseRequestLine( static std::tuple<std::string, std::string, std::string> parseRequestLine(
const std::string& line); const std::string& line);
static std::string trim(const std::string& str); static std::string trim(const std::string& str);

View File

@ -95,6 +95,7 @@ namespace ix
} }
} }
connectionState->setTerminated(); connectionState->setTerminated();
Socket::closeSocket(fd);
_connectedClientsCount--; _connectedClientsCount--;
} }

View File

@ -34,10 +34,7 @@ namespace ix
return true; return true;
#endif #endif
} }
}
// This function should be in the global namespace
#ifdef _WIN32
// //
// That function could 'return WSAPoll(pfd, nfds, timeout);' // That function could 'return WSAPoll(pfd, nfds, timeout);'
// but WSAPoll is said to have weird behaviors on the internet // but WSAPoll is said to have weird behaviors on the internet
@ -47,6 +44,7 @@ namespace ix
// //
int poll(struct pollfd *fds, nfds_t nfds, int timeout) int poll(struct pollfd *fds, nfds_t nfds, int timeout)
{ {
#ifdef _WIN32
int maxfd = 0; int maxfd = 0;
fd_set readfds, writefds, errorfds; fd_set readfds, writefds, errorfds;
FD_ZERO(&readfds); FD_ZERO(&readfds);
@ -107,5 +105,9 @@ namespace ix
} }
return ret; return ret;
} #else
return ::poll(fds, nfds, timeout);
#endif #endif
}
} // namespace ix

View File

@ -13,11 +13,9 @@
#include <io.h> #include <io.h>
#include <ws2def.h> #include <ws2def.h>
// Define our own poll on Windows // Define our own poll on Windows, as a wrapper on top of select
typedef unsigned long int nfds_t; typedef unsigned long int nfds_t;
int poll(struct pollfd* fds, nfds_t nfds, int timeout);
#else #else
#include <arpa/inet.h> #include <arpa/inet.h>
#include <errno.h> #include <errno.h>
@ -35,4 +33,6 @@ namespace ix
{ {
bool initNetSystem(); bool initNetSystem();
bool uninitNetSystem(); bool uninitNetSystem();
int poll(struct pollfd* fds, nfds_t nfds, int timeout);
} // namespace ix } // namespace ix

View File

@ -79,7 +79,7 @@ namespace ix
} }
} }
int ret = ::poll(fds, nfds, timeoutMs); int ret = ix::poll(fds, nfds, timeoutMs);
PollResultType pollResult = PollResultType::ReadyForRead; PollResultType pollResult = PollResultType::ReadyForRead;
if (ret < 0) if (ret < 0)

View File

@ -12,7 +12,6 @@
#include <cmath> #include <cmath>
#include <cassert> #include <cassert>
#include <iostream>
namespace ix namespace ix

View File

@ -164,23 +164,26 @@ namespace ix
} }
// Validate status // Validate status
int status; auto statusLine = Http::parseStatusLine(line);
std::string httpVersion = statusLine.first;
int status = statusLine.second;
// HTTP/1.0 is too old. // HTTP/1.0 is too old.
if (sscanf(line.c_str(), "HTTP/1.0 %d", &status) == 1) if (httpVersion != "HTTP/1.1")
{ {
std::stringstream ss; std::stringstream ss;
ss << "Server version is HTTP/1.0. Rejecting connection to " << host ss << "Expecting HTTP/1.1, got " << httpVersion << ". "
<< "Rejecting connection to " << host << ":" << port
<< ", status: " << status << ", status: " << status
<< ", HTTP Status line: " << line; << ", HTTP Status line: " << line;
return WebSocketInitResult(false, status, ss.str()); return WebSocketInitResult(false, status, ss.str());
} }
// We want an 101 HTTP status // We want an 101 HTTP status
if (sscanf(line.c_str(), "HTTP/1.1 %d", &status) != 1 || status != 101) if (status != 101)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Got bad status connecting to " << host ss << "Got bad status connecting to " << host << ":" << port
<< ", status: " << status << ", status: " << status
<< ", HTTP Status line: " << line; << ", HTTP Status line: " << line;
return WebSocketInitResult(false, status, ss.str()); return WebSocketInitResult(false, status, ss.str());
@ -295,9 +298,15 @@ namespace ix
return sendErrorResponse(400, "Missing Sec-WebSocket-Key value"); return sendErrorResponse(400, "Missing Sec-WebSocket-Key value");
} }
if (headers.find("upgrade") == headers.end())
{
return sendErrorResponse(400, "Missing Upgrade header");
}
if (!insensitiveStringCompare(headers["upgrade"], "WebSocket")) if (!insensitiveStringCompare(headers["upgrade"], "WebSocket"))
{ {
return sendErrorResponse(400, "Invalid or missing Upgrade header"); return sendErrorResponse(400, "Invalid Upgrade header, "
"need WebSocket, got " + headers["upgrade"]);
} }
if (headers.find("sec-websocket-version") == headers.end()) if (headers.find("sec-websocket-version") == headers.end())
@ -314,7 +323,7 @@ namespace ix
if (version != 13) if (version != 13)
{ {
return sendErrorResponse(400, "Invalid Sec-WebSocket-Version, " return sendErrorResponse(400, "Invalid Sec-WebSocket-Version, "
"need 13, got" + ss.str()); "need 13, got " + ss.str());
} }
} }

View File

@ -93,7 +93,7 @@ namespace ix
else else
{ {
std::stringstream ss; std::stringstream ss;
ss << "WebSocketServer::handleConnection() error: " ss << "WebSocketServer::handleConnection() HTTP status: "
<< status.http_status << status.http_status
<< " error: " << " error: "
<< status.errorStr; << status.errorStr;
@ -111,6 +111,8 @@ namespace ix
logInfo("WebSocketServer::handleConnection() done"); logInfo("WebSocketServer::handleConnection() done");
connectionState->setTerminated(); connectionState->setTerminated();
Socket::closeSocket(fd);
} }
std::set<std::shared_ptr<WebSocket>> WebSocketServer::getClients() std::set<std::shared_ptr<WebSocket>> WebSocketServer::getClients()

View File

@ -681,8 +681,12 @@ namespace ix
reason = WebSocketCloseConstants::kInvalidFramePayloadDataMessage; reason = WebSocketCloseConstants::kInvalidFramePayloadDataMessage;
} }
//
// Validate close codes. Autobahn 7.9.* // Validate close codes. Autobahn 7.9.*
// 1014, 1015 are debattable. The firefox MSDN has a description for them // 1014, 1015 are debattable. The firefox MSDN has a description for them.
// Full list of status code and status range is defined in the dedicated
// RFC section at https://tools.ietf.org/html/rfc6455#page-45
//
if (code < 1000 || code == 1004 || code == 1006 || if (code < 1000 || code == 1004 || code == 1006 ||
(code > 1013 && code < 3000)) (code > 1013 && code < 3000))
{ {

View File

@ -6,4 +6,4 @@
#pragma once #pragma once
#define IX_WEBSOCKET_VERSION "6.0.0" #define IX_WEBSOCKET_VERSION "6.2.1"

View File

@ -9,7 +9,7 @@ install: brew
# on osx it is good practice to make /usr/local user writable # on osx it is good practice to make /usr/local user writable
# sudo chown -R `whoami`/staff /usr/local # sudo chown -R `whoami`/staff /usr/local
brew: brew:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 .. ; make -j install) mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 -DUSE_TEST=1 .. ; make -j install)
ws: ws:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .. ; make -j) mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .. ; make -j)
@ -47,7 +47,7 @@ trail:
sh third_party/remote_trailing_whitespaces.sh sh third_party/remote_trailing_whitespaces.sh
format: format:
find test ixwebsocket ws -name '*.cpp' -o -name '*.h' -exec clang-format -i {} \; clang-format -i `find test ixwebsocket ws -name '*.cpp' -o -name '*.h'`
# That target is used to start a node server, but isn't required as we have # That target is used to start a node server, but isn't required as we have
# a builtin C++ server started in the unittest now # a builtin C++ server started in the unittest now
@ -58,7 +58,8 @@ test_server:
# env TEST=Websocket_chat make test # env TEST=Websocket_chat make test
# env TEST=heartbeat make test # env TEST=heartbeat make test
test: test:
python2.7 test/run.py mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 -DUSE_TEST=1 .. ; make -j)
(cd test ; python2.7 run.py -r)
ws_test: ws ws_test: ws
(cd ws ; env DEBUG=1 PATH=../ws/build:$$PATH bash test_ws.sh) (cd ws ; env DEBUG=1 PATH=../ws/build:$$PATH bash test_ws.sh)

View File

@ -15,13 +15,15 @@ if (MAC)
option(USE_TLS "Add TLS support" ON) option(USE_TLS "Add TLS support" ON)
endif() endif()
add_subdirectory(${PROJECT_SOURCE_DIR}/.. ixwebsocket) set (WS ../ws)
include_directories( include_directories(
${PROJECT_SOURCE_DIR}/Catch2/single_include ${PROJECT_SOURCE_DIR}/Catch2/single_include
../third_party
../third_party/msgpack11 ../third_party/msgpack11
../third_party/spdlog/include ../third_party/spdlog/include
../ws ../ws
../ws/snake
) )
# Shared sources # Shared sources
@ -30,7 +32,13 @@ set (SOURCES
IXTest.cpp IXTest.cpp
IXGetFreePort.cpp IXGetFreePort.cpp
../third_party/msgpack11/msgpack11.cpp ../third_party/msgpack11/msgpack11.cpp
../ws/ixcore/utils/IXCoreLogger.cpp ../third_party/jsoncpp/jsoncpp.cpp
${WS}/snake/IXSnakeServer.cpp
${WS}/snake/IXSnakeProtocol.cpp
${WS}/snake/IXAppConfig.cpp
${WS}/IXRedisClient.cpp
IXSocketTest.cpp IXSocketTest.cpp
IXSocketConnectTest.cpp IXSocketConnectTest.cpp
@ -41,14 +49,17 @@ set (SOURCES
IXHttpClientTest.cpp IXHttpClientTest.cpp
IXHttpServerTest.cpp IXHttpServerTest.cpp
IXUnityBuildsTest.cpp IXUnityBuildsTest.cpp
IXHttpTest.cpp
) )
# Some unittest don't work on windows yet # Some unittest don't work on windows yet
if (UNIX) if (UNIX)
list(APPEND SOURCES list(APPEND SOURCES
IXDNSLookupTest.cpp IXDNSLookupTest.cpp
cmd_websocket_chat.cpp IXWebSocketChatTest.cpp
IXWebSocketCloseTest.cpp IXWebSocketCloseTest.cpp
IXCobraChatTest.cpp
IXCobraMetricsPublisherTest.cpp
) )
endif() endif()
@ -75,5 +86,9 @@ if (APPLE AND USE_TLS)
target_link_libraries(ixwebsocket_unittest "-framework foundation" "-framework security") target_link_libraries(ixwebsocket_unittest "-framework foundation" "-framework security")
endif() endif()
target_link_libraries(ixwebsocket_unittest ixcore)
target_link_libraries(ixwebsocket_unittest ixcrypto)
target_link_libraries(ixwebsocket_unittest ixcobra)
target_link_libraries(ixwebsocket_unittest ixwebsocket) target_link_libraries(ixwebsocket_unittest ixwebsocket)
install(TARGETS ixwebsocket_unittest DESTINATION bin) install(TARGETS ixwebsocket_unittest DESTINATION bin)

358
test/IXCobraChatTest.cpp Normal file
View File

@ -0,0 +1,358 @@
/*
* cmd_satori_chat.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017 Machine Zone. All rights reserved.
*/
#include <iostream>
#include <chrono>
#include <ixcobra/IXCobraConnection.h>
#include <ixcrypto/IXUuid.h>
#include "IXTest.h"
#include "IXSnakeServer.h"
#include "catch.hpp"
using namespace ix;
namespace
{
std::atomic<size_t> incomingBytes(0);
std::atomic<size_t> outgoingBytes(0);
void setupTrafficTrackerCallback()
{
ix::CobraConnection::setTrafficTrackerCallback(
[](size_t size, bool incoming)
{
if (incoming)
{
incomingBytes += size;
}
else
{
outgoingBytes += size;
}
}
);
}
class SatoriChat
{
public:
SatoriChat(const std::string& user,
const std::string& session,
const std::string& endpoint);
void subscribe(const std::string& channel);
void start();
void stop();
void run();
bool isReady() const;
void sendMessage(const std::string& text);
size_t getReceivedMessagesCount() const;
bool hasPendingMessages() const;
Json::Value popMessage();
private:
std::string _user;
std::string _session;
std::string _endpoint;
std::queue<Json::Value> _publish_queue;
mutable std::mutex _queue_mutex;
std::thread _thread;
std::atomic<bool> _stop;
ix::CobraConnection _conn;
std::atomic<bool> _connectedAndSubscribed;
std::queue<Json::Value> _receivedQueue;
std::mutex _logMutex;
};
SatoriChat::SatoriChat(const std::string& user,
const std::string& session,
const std::string& endpoint) :
_connectedAndSubscribed(false),
_stop(false),
_user(user),
_session(session),
_endpoint(endpoint)
{
}
void SatoriChat::start()
{
_thread = std::thread(&SatoriChat::run, this);
}
void SatoriChat::stop()
{
_stop = true;
_thread.join();
}
bool SatoriChat::isReady() const
{
return _connectedAndSubscribed;
}
size_t SatoriChat::getReceivedMessagesCount() const
{
return _receivedQueue.size();
}
bool SatoriChat::hasPendingMessages() const
{
std::unique_lock<std::mutex> lock(_queue_mutex);
return !_publish_queue.empty();
}
Json::Value SatoriChat::popMessage()
{
std::unique_lock<std::mutex> lock(_queue_mutex);
auto msg = _publish_queue.front();
_publish_queue.pop();
return msg;
}
//
// Callback to handle received messages, that are printed on the console
//
void SatoriChat::subscribe(const std::string& channel)
{
std::string filter;
_conn.subscribe(channel, filter,
[this](const Json::Value& msg)
{
std::cout << msg.toStyledString() << std::endl;
if (!msg.isObject()) return;
if (!msg.isMember("user")) return;
if (!msg.isMember("text")) return;
if (!msg.isMember("session")) return;
std::string msg_user = msg["user"].asString();
std::string msg_text = msg["text"].asString();
std::string msg_session = msg["session"].asString();
// We are not interested in messages
// from a different session.
if (msg_session != _session) return;
// We are not interested in our own messages
if (msg_user == _user) return;
_receivedQueue.push(msg);
std::stringstream ss;
ss << std::endl
<< msg_user << " > " << msg_text
<< std::endl
<< _user << " > ";
log(ss.str());
});
}
void SatoriChat::sendMessage(const std::string& text)
{
Json::Value msg;
msg["user"] = _user;
msg["session"] = _session;
msg["text"] = text;
std::unique_lock<std::mutex> lock(_queue_mutex);
_publish_queue.push(msg);
}
//
// Do satori communication on a background thread, where we can have
// something like an event loop that publish, poll and receive data
//
void SatoriChat::run()
{
snake::AppConfig appConfig = makeSnakeServerConfig(8008);
// Display config on the terminal for debugging
dumpConfig(appConfig);
snake::SnakeServer snakeServer(appConfig);
snakeServer.run();
// "chat" conf
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
std::string channel = _session;
std::string role = "_sub";
std::string secret = "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba";
_conn.configure(appkey, _endpoint, role, secret,
ix::WebSocketPerMessageDeflateOptions(true));
_conn.connect();
_conn.setEventCallback(
[this, channel]
(ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId)
{
if (eventType == ix::CobraConnection_EventType_Open)
{
log("Subscriber connected: " + _user);
}
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
log("Subscriber authenticated: " + _user);
subscribe(channel);
}
else if (eventType == ix::CobraConnection_EventType_Error)
{
log(errMsg + _user);
}
else if (eventType == ix::CobraConnection_EventType_Closed)
{
log("Connection closed: " + _user);
}
else if (eventType == ix::CobraConnection_EventType_Subscribed)
{
log("Subscription ok: " + _user + " subscription_id " + subscriptionId);
_connectedAndSubscribed = true;
}
else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{
log("Unsubscription ok: " + _user + " subscription_id " + subscriptionId);
}
else if (eventType == ix::CobraConnection_EventType_Published)
{
Logger() << "Subscriber: published message acked: " << msgId;
}
}
);
while (!_stop)
{
{
while (hasPendingMessages())
{
auto msg = popMessage();
std::string text = msg["text"].asString();
std::stringstream ss;
ss << "Sending msg [" << text << "]";
log(ss.str());
Json::Value channels;
channels.append(channel);
_conn.publish(channels, msg);
}
}
ix::msleep(50);
}
_conn.unsubscribe(channel);
ix::msleep(50);
_conn.disconnect();
_conn.setEventCallback([]
(ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId)
{
;
});
snakeServer.stop();
}
}
TEST_CASE("Cobra_chat", "[cobra_chat]")
{
SECTION("Exchange and count sent/received messages.")
{
int port = getFreePort();
snake::AppConfig appConfig = makeSnakeServerConfig(port);
snake::SnakeServer snakeServer(appConfig);
snakeServer.run();
int timeout;
setupTrafficTrackerCallback();
std::string session = ix::generateSessionId();
std::stringstream ss;
ss << "ws://localhost:" << port;
std::string endpoint = ss.str();
SatoriChat chatA("jean", session, endpoint);
SatoriChat chatB("paul", session, endpoint);
chatA.start();
chatB.start();
// Wait for all chat instance to be ready
timeout = 10 * 1000; // 10s
while (true)
{
if (chatA.isReady() && chatB.isReady()) break;
ix::msleep(10);
timeout -= 10;
if (timeout <= 0)
{
snakeServer.stop();
REQUIRE(false); // timeout
}
}
// Add a bit of extra time, for the subscription to be active
ix::msleep(1000);
chatA.sendMessage("from A1");
chatA.sendMessage("from A2");
chatA.sendMessage("from A3");
chatB.sendMessage("from B1");
chatB.sendMessage("from B2");
// 1. Wait for all messages to be sent
timeout = 10 * 1000; // 10s
while (chatA.hasPendingMessages() || chatB.hasPendingMessages())
{
ix::msleep(10);
timeout -= 10;
if (timeout <= 0)
{
snakeServer.stop();
REQUIRE(false); // timeout
}
}
// Give us 1s for all messages to be received
ix::msleep(1000);
chatA.stop();
chatB.stop();
// FIXME: improve this and make it exact matches
// we get unreliable result set
REQUIRE(chatA.getReceivedMessagesCount() == 2);
REQUIRE(chatB.getReceivedMessagesCount() == 3);
std::cout << incomingBytes << std::endl;
std::cout << "Incoming bytes: " << incomingBytes << std::endl;
std::cout << "Outgoing bytes: " << outgoingBytes << std::endl;
snakeServer.stop();
}
}

View File

@ -0,0 +1,237 @@
/*
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone. All rights reserved.
*/
#include <iostream>
#include <set>
#include <ixcrypto/IXUuid.h>
#include <ixcobra/IXCobraMetricsPublisher.h>
#include "IXTest.h"
#include "IXSnakeServer.h"
#include "catch.hpp"
using namespace ix;
namespace
{
//
// This project / appkey is configure on cobra to not do any batching.
// This way we can start a subscriber and receive all messages as they come in.
//
std::string APPKEY("FC2F10139A2BAc53BB72D9db967b024f");
std::string CHANNEL("unittest_channel");
std::string PUBLISHER_ROLE("_pub");
std::string PUBLISHER_SECRET("1c04DB8fFe76A4EeFE3E318C72d771db");
std::string SUBSCRIBER_ROLE("_sub");
std::string SUBSCRIBER_SECRET("66B1dA3ED5fA074EB5AE84Dd8CE3b5ba");
std::atomic<bool> gStop;
std::atomic<bool> gSubscriberConnectedAndSubscribed;
std::atomic<int> gUniqueMessageIdsCount;
std::atomic<int> gMessageCount;
std::set<std::string> gIds;
std::mutex gProtectIds; // std::set is no thread-safe, so protect access with this mutex.
//
// Background thread subscribe to the channel and validates what was sent
//
void startSubscriber(const std::string& endpoint)
{
gSubscriberConnectedAndSubscribed = false;
gUniqueMessageIdsCount = 0;
gMessageCount = 0;
ix::CobraConnection conn;
conn.configure(APPKEY, endpoint, SUBSCRIBER_ROLE, SUBSCRIBER_SECRET,
ix::WebSocketPerMessageDeflateOptions(true));
conn.connect();
conn.setEventCallback(
[&conn]
(ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId)
{
if (eventType == ix::CobraConnection_EventType_Open)
{
Logger() << "Subscriber connected:";
}
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
log("Subscriber authenticated");
std::string filter;
conn.subscribe(CHANNEL, filter,
[](const Json::Value& msg)
{
log(msg.toStyledString());
std::string id = msg["id"].asString();
{
std::lock_guard<std::mutex> guard(gProtectIds);
gIds.insert(id);
}
gMessageCount++;
});
}
else if (eventType == ix::CobraConnection_EventType_Subscribed)
{
Logger() << "Subscriber: subscribed to channel " << subscriptionId;
if (subscriptionId == CHANNEL)
{
gSubscriberConnectedAndSubscribed = true;
}
else
{
Logger() << "Subscriber: unexpected channel " << subscriptionId;
}
}
else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{
Logger() << "Subscriber: ununexpected from channel " << subscriptionId;
if (subscriptionId != CHANNEL)
{
Logger() << "Subscriber: unexpected channel " << subscriptionId;
}
}
else if (eventType == ix::CobraConnection_EventType_Published)
{
Logger() << "Subscriber: published message acked: " << msgId;
}
}
);
while (!gStop)
{
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
}
conn.unsubscribe(CHANNEL);
conn.disconnect();
gUniqueMessageIdsCount = gIds.size();
}
}
TEST_CASE("Cobra_Metrics_Publisher", "[cobra]")
{
int port = getFreePort();
snake::AppConfig appConfig = makeSnakeServerConfig(port);
snake::SnakeServer snakeServer(appConfig);
snakeServer.run();
std::stringstream ss;
ss << "ws://localhost:" << port;
std::string endpoint = ss.str();
// Make channel name unique
CHANNEL += uuid4();
gStop = false;
std::thread bgThread(&startSubscriber, endpoint);
int timeout = 10 * 1000; // 10s
// Wait until the subscriber is ready (authenticated + subscription successful)
while (!gSubscriberConnectedAndSubscribed)
{
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
timeout -= 10;
if (timeout <= 0)
{
REQUIRE(false); // timeout
}
}
ix::CobraMetricsPublisher cobraMetricsPublisher;
bool perMessageDeflate = true;
cobraMetricsPublisher.configure(APPKEY, endpoint, CHANNEL,
PUBLISHER_ROLE, PUBLISHER_SECRET, perMessageDeflate);
cobraMetricsPublisher.setSession(uuid4());
cobraMetricsPublisher.enable(true); // disabled by default, needs to be enabled to be active
Json::Value data;
data["foo"] = "bar";
// (1) Publish without restrictions
cobraMetricsPublisher.push("sms_metric_A_id", data); // (msg #1)
cobraMetricsPublisher.push("sms_metric_B_id", data); // (msg #2)
// (2) Restrict what is sent using a blacklist
// Add one entry to the blacklist
// (will send msg #3)
cobraMetricsPublisher.setBlacklist({
"sms_metric_B_id" // this id will not be sent
});
// (msg #4)
cobraMetricsPublisher.push("sms_metric_A_id", data);
// ...
cobraMetricsPublisher.push("sms_metric_B_id", data); // this won't be sent
// Reset the blacklist
// (msg #5)
cobraMetricsPublisher.setBlacklist({}); // 4.
// (3) Restrict what is sent using rate control
// (msg #6)
cobraMetricsPublisher.setRateControl({
{"sms_metric_C_id", 1}, // published once per minute (60 seconds) max
});
// (msg #7)
cobraMetricsPublisher.push("sms_metric_C_id", data);
cobraMetricsPublisher.push("sms_metric_C_id", data); // this won't be sent
ix::msleep(1400);
// (msg #8)
cobraMetricsPublisher.push("sms_metric_C_id", data); // now this will be sent
ix::msleep(600); // wait a bit so that the last message is sent and can be received
log("Testing suspend/resume now, which will disconnect the cobraMetricsPublisher.");
// Test suspend + resume
for (int i = 0 ; i < 3 ; ++i)
{
cobraMetricsPublisher.suspend();
ix::msleep(500);
REQUIRE(!cobraMetricsPublisher.isConnected()); // Check that we are not connected anymore
cobraMetricsPublisher.push("sms_metric_D_id", data); // will not be sent this time
cobraMetricsPublisher.resume();
ix::msleep(2000); // give cobra 2s to connect
REQUIRE(cobraMetricsPublisher.isConnected()); // Check that we are connected now
cobraMetricsPublisher.push("sms_metric_E_id", data);
}
ix::msleep(500);
// Now stop the thread
gStop = true;
bgThread.join();
//
// Validate that we received all message kinds, and the correct number of messages
//
CHECK(gIds.count("sms_metric_A_id") == 1);
CHECK(gIds.count("sms_metric_B_id") == 1);
CHECK(gIds.count("sms_metric_C_id") == 1);
CHECK(gIds.count("sms_metric_D_id") == 1);
CHECK(gIds.count("sms_metric_E_id") == 1);
CHECK(gIds.count("sms_set_rate_control_id") == 1);
CHECK(gIds.count("sms_set_blacklist_id") == 1);
snakeServer.stop();
}

View File

@ -23,7 +23,6 @@ namespace ix
int getAnyFreePort() int getAnyFreePort()
{ {
int defaultPort = 8090;
int sockfd; int sockfd;
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{ {
@ -89,5 +88,3 @@ namespace ix
return -1; return -1;
} }
} // namespace ix } // namespace ix

55
test/IXHttpTest.cpp Normal file
View File

@ -0,0 +1,55 @@
/*
* IXHttpTest.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone. All rights reserved.
*/
#include <iostream>
#include <ixwebsocket/IXHttp.h>
#include "catch.hpp"
#include <string.h>
namespace ix
{
TEST_CASE("http", "[http]")
{
SECTION("Normal case")
{
std::string line = "HTTP/1.1 200";
auto result = Http::parseStatusLine(line);
REQUIRE(result.first == "HTTP/1.1");
REQUIRE(result.second == 200);
}
SECTION("http/1.0 case")
{
std::string line = "HTTP/1.0 200";
auto result = Http::parseStatusLine(line);
REQUIRE(result.first == "HTTP/1.0");
REQUIRE(result.second == 200);
}
SECTION("empty case")
{
std::string line = "";
auto result = Http::parseStatusLine(line);
REQUIRE(result.first == "");
REQUIRE(result.second == -1);
}
SECTION("empty case")
{
std::string line = "HTTP/1.1";
auto result = Http::parseStatusLine(line);
REQUIRE(result.first == "HTTP/1.1");
REQUIRE(result.second == -1);
}
}
}

View File

@ -137,4 +137,57 @@ namespace ix
server.start(); server.start();
return true; return true;
} }
std::vector<uint8_t> load(const std::string& path)
{
std::vector<uint8_t> memblock;
std::ifstream file(path);
if (!file.is_open()) return memblock;
file.seekg(0, file.end);
std::streamoff size = file.tellg();
file.seekg(0, file.beg);
memblock.resize((size_t) size);
file.read((char*)&memblock.front(), static_cast<std::streamsize>(size));
return memblock;
}
std::string readAsString(const std::string& path)
{
auto vec = load(path);
return std::string(vec.begin(), vec.end());
}
snake::AppConfig makeSnakeServerConfig(int port)
{
snake::AppConfig appConfig;
appConfig.port = port;
appConfig.hostname = "127.0.0.1";
appConfig.verbose = true;
appConfig.redisPort = 6379;
appConfig.redisPassword = "";
appConfig.redisHosts.push_back("localhost"); // only one host supported now
std::string appsConfigPath("appsConfig.json");
// Parse config file
auto str = readAsString(appsConfigPath);
if (str.empty())
{
std::cout << "Cannot read content of " << appsConfigPath << std::endl;
return appConfig;
}
std::cout << str << std::endl;
auto apps = nlohmann::json::parse(str);
appConfig.apps = apps["apps"];
// Display config on the terminal for debugging
dumpConfig(appConfig);
return appConfig;
}
} }

View File

@ -9,6 +9,7 @@
#include "IXGetFreePort.h" #include "IXGetFreePort.h"
#include <iostream> #include <iostream>
#include <ixwebsocket/IXWebSocketServer.h> #include <ixwebsocket/IXWebSocketServer.h>
#include "IXAppConfig.h"
#include <mutex> #include <mutex>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
@ -48,4 +49,6 @@ namespace ix
void log(const std::string& msg); void log(const std::string& msg);
bool startWebSocketEchoServer(ix::WebSocketServer& server); bool startWebSocketEchoServer(ix::WebSocketServer& server);
snake::AppConfig makeSnakeServerConfig(int port);
} // namespace ix } // namespace ix

View File

@ -28,7 +28,6 @@ namespace
void stop(); void stop();
void stop(uint16_t code, const std::string& reason); void stop(uint16_t code, const std::string& reason);
bool isReady() const; bool isReady() const;
void sendMessage(const std::string& text);
uint16_t getCloseCode(); uint16_t getCloseCode();
const std::string& getCloseReason(); const std::string& getCloseReason();
@ -171,11 +170,6 @@ namespace
_webSocket.start(); _webSocket.start();
} }
void WebSocketClient::sendMessage(const std::string& text)
{
_webSocket.send(text);
}
bool startServer(ix::WebSocketServer& server, bool startServer(ix::WebSocketServer& server,
uint16_t& receivedCloseCode, uint16_t& receivedCloseCode,
std::string& receivedCloseReason, std::string& receivedCloseReason,

View File

@ -112,6 +112,8 @@ TEST_CASE("Websocket_server", "[websocket_server]")
auto lineResult = socket->readLine(isCancellationRequested); auto lineResult = socket->readLine(isCancellationRequested);
auto lineValid = lineResult.first; auto lineValid = lineResult.first;
REQUIRE(lineValid);
auto line = lineResult.second; auto line = lineResult.second;
int status = -1; int status = -1;
@ -149,6 +151,8 @@ TEST_CASE("Websocket_server", "[websocket_server]")
auto lineResult = socket->readLine(isCancellationRequested); auto lineResult = socket->readLine(isCancellationRequested);
auto lineValid = lineResult.first; auto lineValid = lineResult.first;
REQUIRE(lineValid);
auto line = lineResult.second; auto line = lineResult.second;
int status = -1; int status = -1;
@ -190,6 +194,8 @@ TEST_CASE("Websocket_server", "[websocket_server]")
auto lineResult = socket->readLine(isCancellationRequested); auto lineResult = socket->readLine(isCancellationRequested);
auto lineValid = lineResult.first; auto lineValid = lineResult.first;
REQUIRE(lineValid);
auto line = lineResult.second; auto line = lineResult.second;
int status = -1; int status = -1;

14
test/appsConfig.json Normal file
View File

@ -0,0 +1,14 @@
{
"apps": {
"FC2F10139A2BAc53BB72D9db967b024f": {
"roles": {
"_sub": {
"secret": "66B1dA3ED5fA074EB5AE84Dd8CE3b5ba"
},
"_pub": {
"secret": "1c04DB8fFe76A4EeFE3E318C72d771db"
}
}
}
}
}

View File

@ -350,10 +350,11 @@ def generateXmlOutput(results, xmlOutput, testRunName, runTime):
def run(testName, buildDir, sanitizer, xmlOutput, def run(testName, buildDir, sanitizer, xmlOutput,
testRunName, buildOnly, useLLDB, cpuCount): testRunName, buildOnly, useLLDB, cpuCount, runOnly):
'''Main driver. Run cmake, compiles, execute and validate the testsuite.''' '''Main driver. Run cmake, compiles, execute and validate the testsuite.'''
# gen build files with CMake # gen build files with CMake
if not runOnly:
runCMake(sanitizer, buildDir) runCMake(sanitizer, buildDir)
if platform.system() == 'Linux': if platform.system() == 'Linux':
@ -454,6 +455,8 @@ def main():
help='Validate XML output.') help='Validate XML output.')
parser.add_argument('--build_only', '-b', action='store_true', parser.add_argument('--build_only', '-b', action='store_true',
help='Stop after building. Do not run the unittest.') help='Stop after building. Do not run the unittest.')
parser.add_argument('--run_only', '-r', action='store_true',
help='Only run the test, do not build anything.')
parser.add_argument('--output', '-o', help='Output XML file.') parser.add_argument('--output', '-o', help='Output XML file.')
parser.add_argument('--lldb', action='store_true', parser.add_argument('--lldb', action='store_true',
help='Run the test through lldb.') help='Run the test through lldb.')
@ -492,7 +495,7 @@ def main():
if platform.system() == 'Windows': if platform.system() == 'Windows':
TEST_EXE_PATH = os.path.join(buildDir, BUILD_TYPE, 'ixwebsocket_unittest.exe') TEST_EXE_PATH = os.path.join(buildDir, BUILD_TYPE, 'ixwebsocket_unittest.exe')
else: else:
TEST_EXE_PATH = os.path.join(buildDir, 'ixwebsocket_unittest') TEST_EXE_PATH = '../build/test/ixwebsocket_unittest'
if args.list: if args.list:
# catch2 exit with a different error code when requesting the list of files # catch2 exit with a different error code when requesting the list of files
@ -511,7 +514,7 @@ def main():
args.lldb = False args.lldb = False
return run(args.test, buildDir, sanitizer, xmlOutput, return run(args.test, buildDir, sanitizer, xmlOutput,
testRunName, args.build_only, args.lldb, args.cpu_count) testRunName, args.build_only, args.lldb, args.cpu_count, args.run_only)
if __name__ == '__main__': if __name__ == '__main__':

File diff suppressed because it is too large Load Diff

View File

@ -4945,8 +4945,3 @@ std::ostream& operator<<(std::ostream& sout, Value const& root) {
// ////////////////////////////////////////////////////////////////////// // //////////////////////////////////////////////////////////////////////
// End of content of file: src/lib_json/json_writer.cpp // End of content of file: src/lib_json/json_writer.cpp
// ////////////////////////////////////////////////////////////////////// // //////////////////////////////////////////////////////////////////////

View File

@ -41,4 +41,3 @@
[ ] Question [ ] Question
[ ] Enhancement [ ] Enhancement
[ ] Bug [ ] Bug

View File

@ -120,4 +120,3 @@ In addition to providing options for testing client-side features, the `ssl_clie
* [`x509/crl_app.c`](x509/crl_app.c): loads and dumps a certificate revocation list (CRL). * [`x509/crl_app.c`](x509/crl_app.c): loads and dumps a certificate revocation list (CRL).
* [`x509/req_app.c`](x509/req_app.c): loads and dumps a certificate signing request (CSR). * [`x509/req_app.c`](x509/req_app.c): loads and dumps a certificate signing request (CSR).

View File

@ -35,17 +35,6 @@ add_executable(ws
../third_party/jsoncpp/jsoncpp.cpp ../third_party/jsoncpp/jsoncpp.cpp
${STATSD_CLIENT_SOURCES} ${STATSD_CLIENT_SOURCES}
ixcore/utils/IXCoreLogger.cpp
ixcrypto/IXBase64.cpp
ixcrypto/IXHash.cpp
ixcrypto/IXUuid.cpp
ixcrypto/IXHMac.cpp
ixcobra/IXCobraConnection.cpp
ixcobra/IXCobraMetricsPublisher.cpp
ixcobra/IXCobraMetricsThreadedPublisher.cpp
snake/IXSnakeServer.cpp snake/IXSnakeServer.cpp
snake/IXSnakeProtocol.cpp snake/IXSnakeProtocol.cpp
snake/IXAppConfig.cpp snake/IXAppConfig.cpp
@ -73,6 +62,9 @@ add_executable(ws
ws_autobahn.cpp ws_autobahn.cpp
ws.cpp) ws.cpp)
target_link_libraries(ws ixcore)
target_link_libraries(ws ixcrypto)
target_link_libraries(ws ixcobra)
target_link_libraries(ws ixwebsocket) target_link_libraries(ws ixwebsocket)
if(NOT APPLE AND NOT USE_MBED_TLS) if(NOT APPLE AND NOT USE_MBED_TLS)

View File

@ -128,6 +128,8 @@ namespace ix
const OnRedisSubscribeResponseCallback& responseCallback, const OnRedisSubscribeResponseCallback& responseCallback,
const OnRedisSubscribeCallback& callback) const OnRedisSubscribeCallback& callback)
{ {
_stop = false;
if (!_socket) return false; if (!_socket) return false;
std::stringstream ss; std::stringstream ss;
@ -159,7 +161,7 @@ namespace ix
if (!lineValid) return false; if (!lineValid) return false;
// There are 5 items for the subscribe repply // There are 5 items for the subscribe reply
for (int i = 0; i < 5; ++i) for (int i = 0; i < 5; ++i)
{ {
auto lineResult = _socket->readLine(nullptr); auto lineResult = _socket->readLine(nullptr);
@ -175,13 +177,21 @@ namespace ix
// Wait indefinitely for new messages // Wait indefinitely for new messages
while (true) while (true)
{ {
if (_stop) break;
// Wait until something is ready to read // Wait until something is ready to read
auto pollResult = _socket->isReadyToRead(-1); int timeoutMs = 10;
auto pollResult = _socket->isReadyToRead(timeoutMs);
if (pollResult == PollResultType::Error) if (pollResult == PollResultType::Error)
{ {
return false; return false;
} }
if (pollResult == PollResultType::Timeout)
{
continue;
}
// The first line of the response describe the return type, // The first line of the response describe the return type,
// => *3 (an array of 3 elements) // => *3 (an array of 3 elements)
auto lineResult = _socket->readLine(nullptr); auto lineResult = _socket->readLine(nullptr);
@ -231,4 +241,9 @@ namespace ix
return true; return true;
} }
void RedisClient::stop()
{
_stop = true;
}
} }

View File

@ -8,6 +8,7 @@
#include <functional> #include <functional>
#include <memory> #include <memory>
#include <atomic>
namespace ix namespace ix
{ {
@ -19,7 +20,7 @@ namespace ix
using OnRedisSubscribeResponseCallback = std::function<void(const std::string&)>; using OnRedisSubscribeResponseCallback = std::function<void(const std::string&)>;
using OnRedisSubscribeCallback = std::function<void(const std::string&)>; using OnRedisSubscribeCallback = std::function<void(const std::string&)>;
RedisClient() = default; RedisClient() : _stop(false) {}
~RedisClient() = default; ~RedisClient() = default;
bool connect(const std::string& hostname, int port); bool connect(const std::string& hostname, int port);
@ -32,9 +33,12 @@ namespace ix
const OnRedisSubscribeResponseCallback& responseCallback, const OnRedisSubscribeResponseCallback& responseCallback,
const OnRedisSubscribeCallback& callback); const OnRedisSubscribeCallback& callback);
void stop();
private: private:
std::string writeString(const std::string& str); std::string writeString(const std::string& str);
std::shared_ptr<Socket> _socket; std::shared_ptr<Socket> _socket;
std::atomic<bool> _stop;
}; };
} // namespace ix } // namespace ix

View File

@ -6,7 +6,7 @@
#pragma once #pragma once
#include "nlohmann/json.hpp" #include <nlohmann/json.hpp>
#include <string> #include <string>
#include <vector> #include <vector>

View File

@ -163,6 +163,14 @@ namespace snake
return; return;
} }
} }
nlohmann::json response = {
{"action", "rtm/publish/ok"},
{"id", pdu.value("id", 1)},
{"body", {}}
};
ws->sendText(response.dump());
} }
// //
@ -220,12 +228,14 @@ namespace snake
{ {
auto msg = nlohmann::json::parse(messageStr); auto msg = nlohmann::json::parse(messageStr);
msg = msg["body"]["message"];
nlohmann::json response = { nlohmann::json response = {
{"action", "rtm/subscription/data"}, {"action", "rtm/subscription/data"},
{"id", id++}, {"id", id++},
{"body", { {"body", {
{"subscription_id", subscriptionId}, {"subscription_id", subscriptionId},
{"messages", {{msg}}} {"messages", {msg}}
}} }}
}; };
@ -271,6 +281,27 @@ namespace snake
pdu); pdu);
} }
void handleUnSubscribe(
std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
const nlohmann::json& pdu)
{
// extract subscription_id
auto body = pdu["body"];
auto subscriptionId = body["subscription_id"];
state->redisClient().stop();
nlohmann::json response = {
{"action", "rtm/unsubscribe/ok"},
{"id", pdu.value("id", 1)},
{"body", {
{"subscription_id", subscriptionId}
}}
};
ws->sendText(response.dump());
}
void processCobraMessage( void processCobraMessage(
std::shared_ptr<SnakeConnectionState> state, std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws, std::shared_ptr<ix::WebSocket> ws,
@ -299,6 +330,10 @@ namespace snake
{ {
handleSubscribe(state, ws, appConfig, pdu); handleSubscribe(state, ws, appConfig, pdu);
} }
else if (action == "rtm/unsubscribe")
{
handleUnSubscribe(state, ws, pdu);
}
else else
{ {
std::cerr << "Unhandled action: " << action << std::endl; std::cerr << "Unhandled action: " << action << std::endl;

View File

@ -4,10 +4,10 @@
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved. * Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/ */
#include <IXSnakeServer.h> #include "IXSnakeServer.h"
#include <IXSnakeProtocol.h> #include "IXSnakeProtocol.h"
#include <IXSnakeConnectionState.h> #include "IXSnakeConnectionState.h"
#include <IXAppConfig.h> #include "IXAppConfig.h"
#include <iostream> #include <iostream>
#include <sstream> #include <sstream>
@ -118,8 +118,19 @@ namespace snake
} }
_server.start(); _server.start();
_server.wait();
return true; return true;
} }
void SnakeServer::runForever()
{
if (run())
{
_server.wait();
}
}
void SnakeServer::stop()
{
_server.stop();
}
} }

View File

@ -19,6 +19,8 @@ namespace snake
~SnakeServer() = default; ~SnakeServer() = default;
bool run(); bool run();
void runForever();
void stop();
private: private:
std::string parseAppKey(const std::string& path); std::string parseAppKey(const std::string& path);

View File

@ -21,12 +21,7 @@
// //
// //
// 2 Run the test server (using docker) // 2 Run the test server (using docker)
// docker run -it --rm \ // docker run -it --rm -v "${PWD}/config:/config" -v "${PWD}/reports:/reports" -p 9001:9001 --name fuzzingserver crossbario/autobahn-testsuite
// -v "${PWD}/config:/config" \
// -v "${PWD}/reports:/reports" \
// -p 9001:9001 \
// --name fuzzingserver \
// crossbario/autobahn-testsuite
// //
// 3. Run this command // 3. Run this command
// ws autobahn -q --url ws://localhost:9001 // ws autobahn -q --url ws://localhost:9001
@ -315,4 +310,3 @@ namespace ix
return generateReport(url) ? 0 : 1; return generateReport(url) ? 0 : 1;
} }
} }

View File

@ -76,6 +76,8 @@ namespace ix
dumpConfig(appConfig); dumpConfig(appConfig);
snake::SnakeServer snakeServer(appConfig); snake::SnakeServer snakeServer(appConfig);
return snakeServer.run() ? 0 : 1; snakeServer.runForever();
return 0; // should never reach this
} }
} }