Compare commits

...

18 Commits

Author SHA1 Message Date
1af96ed4e4 try to build vcpkg on travis 2019-06-10 13:35:47 -07:00
7cb5cc05e4 Add -DUSE_VENDORED_THIRD_PARTY=1 to build ws 2019-06-10 13:26:41 -07:00
750a752ac0 - mbedtls and zlib are searched with find_package, and we use the vendored version if nothing is found 2019-06-10 11:18:27 -07:00
61e5f52286 - travis CI uses g++ on Linux 2019-06-09 14:27:45 -07:00
ce0b716f54 compile error in IXWebSocketMessageQTest 2019-06-09 12:25:36 -07:00
aae8e5ec65 fix IXWebSocketMessageQTest.cpp 2019-06-09 12:08:00 -07:00
2723e8466e fix changelog 2019-06-09 12:02:38 -07:00
f13c610352 update README to reflect the new API 2019-06-09 12:02:02 -07:00
55c65b08bf - WebSocket::send() sends message in TEXT mode by default
- WebSocketMessage sets a new binary field, which tells whether the received incoming message is binary or text
2019-06-09 11:56:47 -07:00
a11aa3e0dd WebSocket::send takes a third arg, binary which default to true (can be text too) 2019-06-09 11:35:31 -07:00
de0bf5ebcd WebSocket callback only take one object, a const ix::WebSocketMessagePtr& msg 2019-06-09 11:33:17 -07:00
15369e1ae9 ... 2019-06-09 10:22:27 -07:00
d4115880b9 Add explicite WebSocket::sendBinary
New headers + WebSocketMessage class to hold message data, still not used across the board
2019-06-09 10:10:33 -07:00
3c80c75e4a Add test/compatibility folder with small servers and clients written in different languages and different libraries to test compatibility. 2019-06-08 09:46:26 -07:00
5cb72dce4c ws echo_server has a -g option to print a greeting message on connect 2019-06-08 09:16:33 -07:00
d2747487e3 IXSocketMbedTLS: better error handling in close and connect 2019-06-06 14:59:22 -07:00
12e664fc61 add a changelog 2019-06-06 13:59:12 -07:00
cbf21b4008 add an option to easily disable per message deflate compression 2019-06-06 13:48:53 -07:00
43 changed files with 779 additions and 644 deletions

View File

@ -12,23 +12,23 @@ matrix:
- python test/run.py - python test/run.py
- make ws - make ws
# # Linux # Linux
# - os: linux
# dist: xenial
# script:
# - python test/run.py
# - make ws
# env:
# - CC=gcc
# - CXX=g++
# Clang + Linux disabled for now
- os: linux - os: linux
dist: xenial dist: xenial
script: python test/run.py script:
- python test/run.py
- make ws
env: env:
- CC=clang - CC=gcc
- CXX=clang++ - CXX=g++
# Clang + Linux disabled for now
# - os: linux
# dist: xenial
# script: python test/run.py
# env:
# - CC=clang
# - CXX=clang++
# Windows # Windows
- os: windows - os: windows
@ -36,6 +36,36 @@ matrix:
- CMAKE_PATH="/c/Program Files/CMake/bin" - CMAKE_PATH="/c/Program Files/CMake/bin"
script: script:
- export PATH=$CMAKE_PATH:$PATH - export PATH=$CMAKE_PATH:$PATH
- cmake -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 . - cmake -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 -DUSE_VENDORED_THIRD_PARTY=1 .
- cmake --build --parallel . - cmake --build --parallel .
- python test/run.py - python test/run.py
install:
# HACK: gcc 8.0.1 is missing movdirintrin.h so just download it. We need this for GLM and Vectrexy to build.
- sudo wget https://raw.githubusercontent.com/gcc-mirror/gcc/gcc-8-branch/gcc/config/i386/movdirintrin.h -P /usr/lib/gcc/x86_64-linux-gnu/8/include/
# Create deps dir
- mkdir -p ${DEPS_DIR}
# Set compiler vars
- export CC=${CC_COMPILER}
- export CXX=${CXX_COMPILER}
# Install vcpkg and dependencies
- |
set -e
mkdir -p ${DEPS_DIR}/vcpkg
pushd ${DEPS_DIR}/vcpkg
git init
git remote add origin https://github.com/Microsoft/vcpkg.git
git fetch origin master
git checkout -b master origin/master
./bootstrap-vcpkg.sh
# Only build release libs to save time. We inject a new line first since some cmake files don't end with one.
echo -e '\nset(VCPKG_BUILD_TYPE release)' >> ./triplets/${VCPKG_TRIPLET}.cmake
./vcpkg install sdl2 sdl2-net glew glm stb imgui
popd
cache:
directories:
- ${DEPS_DIR}/vcpkg/installed

31
CHANGELOG.md Normal file
View File

@ -0,0 +1,31 @@
# Changelog
All notable changes to this project will be documented in this file.
## [unreleased] - 2019-06-09
### Changed
- mbedtls and zlib are searched with find_package, and we use the vendored version if nothing is found
- travis CI uses g++ on Linux
## [4.0.0] - 2019-06-09
### Changed
- WebSocket::send() sends message in TEXT mode by default
- WebSocketMessage sets a new binary field, which tells whether the received incoming message is binary or text
- WebSocket::send takes a third arg, binary which default to true (can be text too)
- WebSocket callback only take one object, a const ix::WebSocketMessagePtr& msg
- Add explicit WebSocket::sendBinary method
- New headers + WebSocketMessage class to hold message data, still not used across the board
- Add test/compatibility folder with small servers and clients written in different languages and different libraries to test compatibility.
- ws echo_server has a -g option to print a greeting message on connect
- IXSocketMbedTLS: better error handling in close and connect
## [3.1.2] - 2019-06-06
### Added
- ws connect has a -x option to disable per message deflate
- Add WebSocket::disablePerMessageDeflate() option.
## [3.0.0] - 2019-06-xx
### Changed
- TLS, aka SSL works on Windows (websocket and http clients)
- ws command line tool build on Windows
- Async API for HttpClient
- HttpClient API changed to use shared_ptr for response and request

13
CMake/FindMbedTLS.cmake Normal file
View File

@ -0,0 +1,13 @@
find_path(MBEDTLS_INCLUDE_DIRS mbedtls/ssl.h)
find_library(MBEDTLS_LIBRARY mbedtls)
find_library(MBEDX509_LIBRARY mbedx509)
find_library(MBEDCRYPTO_LIBRARY mbedcrypto)
set(MBEDTLS_LIBRARIES "${MBEDTLS_LIBRARY}" "${MBEDX509_LIBRARY}" "${MBEDCRYPTO_LIBRARY}")
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(MBEDTLS DEFAULT_MSG
MBEDTLS_INCLUDE_DIRS MBEDTLS_LIBRARY MBEDX509_LIBRARY MBEDCRYPTO_LIBRARY)
mark_as_advanced(MBEDTLS_INCLUDE_DIRS MBEDTLS_LIBRARY MBEDX509_LIBRARY MBEDCRYPTO_LIBRARY)

View File

@ -4,6 +4,8 @@
# #
cmake_minimum_required(VERSION 3.4.1) cmake_minimum_required(VERSION 3.4.1)
set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/CMake;${CMAKE_MODULE_PATH}")
project(ixwebsocket C CXX) project(ixwebsocket C CXX)
set (CMAKE_CXX_STANDARD 14) set (CMAKE_CXX_STANDARD 14)
@ -20,60 +22,64 @@ if ("${CMAKE_CXX_COMPILER_ID}" MATCHES "Clang")
endif() endif()
set( IXWEBSOCKET_SOURCES set( IXWEBSOCKET_SOURCES
ixwebsocket/IXCancellationRequest.cpp
ixwebsocket/IXConnectionState.cpp
ixwebsocket/IXDNSLookup.cpp
ixwebsocket/IXHttpClient.cpp
ixwebsocket/IXNetSystem.cpp
ixwebsocket/IXSelectInterrupt.cpp
ixwebsocket/IXSelectInterruptFactory.cpp
ixwebsocket/IXSocket.cpp ixwebsocket/IXSocket.cpp
ixwebsocket/IXSocketServer.cpp
ixwebsocket/IXSocketConnect.cpp ixwebsocket/IXSocketConnect.cpp
ixwebsocket/IXSocketFactory.cpp ixwebsocket/IXSocketFactory.cpp
ixwebsocket/IXDNSLookup.cpp ixwebsocket/IXSocketServer.cpp
ixwebsocket/IXCancellationRequest.cpp ixwebsocket/IXUrlParser.cpp
ixwebsocket/IXNetSystem.cpp
ixwebsocket/IXWebSocket.cpp ixwebsocket/IXWebSocket.cpp
ixwebsocket/IXWebSocketServer.cpp ixwebsocket/IXWebSocketCloseConstants.cpp
ixwebsocket/IXWebSocketTransport.cpp
ixwebsocket/IXWebSocketHandshake.cpp ixwebsocket/IXWebSocketHandshake.cpp
ixwebsocket/IXWebSocketHttpHeaders.cpp
ixwebsocket/IXWebSocketMessageQueue.cpp
ixwebsocket/IXWebSocketPerMessageDeflate.cpp ixwebsocket/IXWebSocketPerMessageDeflate.cpp
ixwebsocket/IXWebSocketPerMessageDeflateCodec.cpp ixwebsocket/IXWebSocketPerMessageDeflateCodec.cpp
ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp
ixwebsocket/IXWebSocketHttpHeaders.cpp ixwebsocket/IXWebSocketServer.cpp
ixwebsocket/IXHttpClient.cpp ixwebsocket/IXWebSocketTransport.cpp
ixwebsocket/IXUrlParser.cpp
ixwebsocket/LUrlParser.cpp ixwebsocket/LUrlParser.cpp
ixwebsocket/IXSelectInterrupt.cpp
ixwebsocket/IXSelectInterruptFactory.cpp
ixwebsocket/IXConnectionState.cpp
ixwebsocket/IXWebSocketCloseConstants.cpp
ixwebsocket/IXWebSocketMessageQueue.cpp
) )
set( IXWEBSOCKET_HEADERS set( IXWEBSOCKET_HEADERS
ixwebsocket/IXSocket.h
ixwebsocket/IXSocketServer.h
ixwebsocket/IXSocketConnect.h
ixwebsocket/IXSocketFactory.h
ixwebsocket/IXSetThreadName.h
ixwebsocket/IXDNSLookup.h
ixwebsocket/IXCancellationRequest.h ixwebsocket/IXCancellationRequest.h
ixwebsocket/IXConnectionState.h
ixwebsocket/IXDNSLookup.h
ixwebsocket/IXHttpClient.h
ixwebsocket/IXNetSystem.h ixwebsocket/IXNetSystem.h
ixwebsocket/IXProgressCallback.h ixwebsocket/IXProgressCallback.h
ixwebsocket/IXSelectInterrupt.h
ixwebsocket/IXSelectInterruptFactory.h
ixwebsocket/IXSetThreadName.h
ixwebsocket/IXSocket.h
ixwebsocket/IXSocketConnect.h
ixwebsocket/IXSocketFactory.h
ixwebsocket/IXSocketServer.h
ixwebsocket/IXUrlParser.h
ixwebsocket/IXWebSocket.h ixwebsocket/IXWebSocket.h
ixwebsocket/IXWebSocketServer.h ixwebsocket/IXWebSocketCloseConstants.h
ixwebsocket/IXWebSocketTransport.h ixwebsocket/IXWebSocketCloseInfo.h
ixwebsocket/IXWebSocketHandshake.h
ixwebsocket/IXWebSocketSendInfo.h
ixwebsocket/IXWebSocketErrorInfo.h ixwebsocket/IXWebSocketErrorInfo.h
ixwebsocket/IXWebSocketHandshake.h
ixwebsocket/IXWebSocketHttpHeaders.h
ixwebsocket/IXWebSocketMessage.h
ixwebsocket/IXWebSocketMessageQueue.h
ixwebsocket/IXWebSocketMessageType.h
ixwebsocket/IXWebSocketOpenInfo.h
ixwebsocket/IXWebSocketPerMessageDeflate.h ixwebsocket/IXWebSocketPerMessageDeflate.h
ixwebsocket/IXWebSocketPerMessageDeflateCodec.h ixwebsocket/IXWebSocketPerMessageDeflateCodec.h
ixwebsocket/IXWebSocketPerMessageDeflateOptions.h ixwebsocket/IXWebSocketPerMessageDeflateOptions.h
ixwebsocket/IXWebSocketHttpHeaders.h ixwebsocket/IXWebSocketSendInfo.h
ixwebsocket/libwshandshake.hpp ixwebsocket/IXWebSocketServer.h
ixwebsocket/IXHttpClient.h ixwebsocket/IXWebSocketTransport.h
ixwebsocket/IXUrlParser.h
ixwebsocket/LUrlParser.h ixwebsocket/LUrlParser.h
ixwebsocket/IXSelectInterrupt.h ixwebsocket/libwshandshake.hpp
ixwebsocket/IXSelectInterruptFactory.h
ixwebsocket/IXConnectionState.h
ixwebsocket/IXWebSocketCloseConstants.h
ixwebsocket/IXWebSocketMessageQueue.h
) )
if (UNIX) if (UNIX)
@ -137,25 +143,28 @@ if (USE_OPEN_SSL)
endif() endif()
if (USE_MBED_TLS) if (USE_MBED_TLS)
if (USE_VENDORED_THIRD_PARTY)
set (ENABLE_PROGRAMS OFF) set (ENABLE_PROGRAMS OFF)
add_subdirectory(third_party/mbedtls) add_subdirectory(third_party/mbedtls)
include_directories(third_party/mbedtls/include) include_directories(third_party/mbedtls/include)
target_link_libraries(ixwebsocket mbedtls) target_link_libraries(ixwebsocket mbedtls)
else()
find_package(MbedTLS REQUIRED)
include_directories(${MBEDTLS_INCLUDE_DIRS})
target_link_libraries(ixwebsocket ${MBEDTLS_LIBRARIES})
endif()
endif() endif()
if (WIN32) find_package(ZLIB REQUIRED)
if (ZLIB_FOUND)
include_directories(${ZLIB_INCLUDE_DIRS})
target_link_libraries(ixwebsocket ${ZLIB_LIBRARIES})
else()
add_subdirectory(third_party/zlib) add_subdirectory(third_party/zlib)
include_directories(third_party/zlib ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib) include_directories(third_party/zlib ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib)
target_link_libraries(ixwebsocket zlibstatic wsock32 ws2_32) target_link_libraries(ixwebsocket zlibstatic wsock32 ws2_32)
add_definitions(-D_CRT_SECURE_NO_WARNINGS) add_definitions(-D_CRT_SECURE_NO_WARNINGS)
else()
# gcc/Linux needs -pthread
find_package(Threads)
target_link_libraries(ixwebsocket
z ${CMAKE_THREAD_LIBS_INIT})
endif() endif()
set( IXWEBSOCKET_INCLUDE_DIRS set( IXWEBSOCKET_INCLUDE_DIRS

View File

@ -1 +1 @@
2.2.1 4.0.0

View File

@ -28,29 +28,27 @@ webSocket.setUrl(url);
// to make sure that load balancers do not kill an idle connection. // to make sure that load balancers do not kill an idle connection.
webSocket.setHeartBeatPeriod(45); webSocket.setHeartBeatPeriod(45);
// Per message deflate connection is enabled by default. You can tweak its parameters or disable it
webSocket.disablePerMessageDeflate();
// Setup a callback to be fired when a message or an event (open, close, error) is received // Setup a callback to be fired when a message or an event (open, close, error) is received
webSocket.setOnMessageCallback( webSocket.setOnMessageCallback(
[](ix::WebSocketMessageType messageType, [](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Message) if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cout << str << std::endl; std::cout << msg->str << std::endl;
} }
}); });
// Now that our callback is setup, we can start our background thread and receive messages // Now that our callback is setup, we can start our background thread and receive messages
webSocket.start(); webSocket.start();
// Send a message to the server (default to BINARY mode) // Send a message to the server (default to TEXT mode)
webSocket.send("hello world"); webSocket.send("hello world");
// The message can be sent in TEXT mode // The message can be sent in BINARY mode (useful if you send MsgPack data for example)
webSocket.sendText("hello again"); webSocket.sendBinary("some serialized binary data");
// ... finally ... // ... finally ...
@ -70,14 +68,9 @@ server.setOnConnectionCallback(
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server](const ix::WebSocketMessagePtr msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
@ -88,19 +81,21 @@ server.setOnConnectionCallback(
std::cerr << "id: " << connectionState->getId() << std::endl; std::cerr << "id: " << connectionState->getId() << std::endl;
// The uri the client did connect to. // The uri the client did connect to.
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl; std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; std::cerr << it.first << ": " << it.second << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
// For an echo server, we just send back to the client whatever was received by the server // For an echo server, we just send back to the client whatever was received by the server
// All connected clients are available in an std::set. See the broadcast cpp example. // All connected clients are available in an std::set. See the broadcast cpp example.
webSocket->send(str); // Second parameter tells whether we are sending the message in binary or text mode.
// Here we send it in the same mode as it was received.
webSocket->send(msg->str, msg->binary);
} }
} }
); );
@ -255,7 +250,7 @@ No manual polling to fetch data is required. Data is sent and received instantly
### Automatic reconnection ### Automatic reconnection
If the remote end (server) breaks the connection, the code will try to perpetually reconnect, by using an exponential backoff strategy, capped at one retry every 10 seconds. If the remote end (server) breaks the connection, the code will try to perpetually reconnect, by using an exponential backoff strategy, capped at one retry every 10 seconds. This behavior can be disabled.
### Large messages ### Large messages
@ -331,32 +326,27 @@ The onMessage event will be fired when the connection is opened or closed. This
``` ```
webSocket.setOnMessageCallback( webSocket.setOnMessageCallback(
[](ix::WebSocketMessageType messageType, [](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cout << "send greetings" << std::endl; std::cout << "send greetings" << std::endl;
// Headers can be inspected (pairs of string/string) // Headers can be inspected (pairs of string/string)
std::cout << "Handshake Headers:" << std::endl; std::cout << "Handshake Headers:" << std::endl;
for (auto it : headers) for (auto it : msg->headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; std::cout << it.first << ": " << it.second << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cout << "disconnected" << std::endl; std::cout << "disconnected" << std::endl;
// The server can send an explicit code and reason for closing. // The server can send an explicit code and reason for closing.
// This data can be accessed through the closeInfo object. // This data can be accessed through the closeInfo object.
std::cout << closeInfo.code << std::endl; std::cout << msg->closeInfo.code << std::endl;
std::cout << closeInfo.reason << std::endl; std::cout << msg->closeInfo.reason << std::endl;
} }
} }
); );
@ -368,20 +358,15 @@ A message will be fired when there is an error with the connection. The message
``` ```
webSocket.setOnMessageCallback( webSocket.setOnMessageCallback(
[](ix::WebSocketMessageType messageType, [](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Error) if (msg->type == ix::WebSocketMessageType::Error)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Error: " << error.reason << std::endl; ss << "Error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << event.retries << std::endl; ss << "#retries: " << msg->eventInfo.retries << std::endl;
ss << "Wait time(ms): " << event.wait_time << std::endl; ss << "Wait time(ms): " << msg->eventInfo.wait_time << std::endl;
ss << "HTTP Status: " << event.http_status << std::endl; ss << "HTTP Status: " << msg->eventInfo.http_status << std::endl;
std::cout << ss.str() << std::endl; std::cout << ss.str() << std::endl;
} }
} }
@ -408,17 +393,12 @@ Ping/pong messages are used to implement keep-alive. 2 message types exists to i
``` ```
webSocket.setOnMessageCallback( webSocket.setOnMessageCallback(
[](ix::WebSocketMessageType messageType, [](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Ping || if (msg->type == ix::WebSocketMessageType::Ping ||
messageType == ix::WebSocketMessageType::Pong) msg->type == ix::WebSocketMessageType::Pong)
{ {
std::cout << "pong data: " << str << std::endl; std::cout << "pong data: " << msg->str << std::endl;
} }
} }
); );

View File

@ -24,6 +24,8 @@ namespace ix
bool SocketMbedTLS::init(const std::string& host, std::string& errMsg) bool SocketMbedTLS::init(const std::string& host, std::string& errMsg)
{ {
std::lock_guard<std::mutex> lock(_mutex);
mbedtls_ssl_init(&_ssl); mbedtls_ssl_init(&_ssl);
mbedtls_ssl_config_init(&_conf); mbedtls_ssl_config_init(&_conf);
mbedtls_ctr_drbg_init(&_ctr_drbg); mbedtls_ctr_drbg_init(&_ctr_drbg);
@ -75,15 +77,24 @@ namespace ix
std::string& errMsg, std::string& errMsg,
const CancellationRequest& isCancellationRequested) const CancellationRequest& isCancellationRequested)
{ {
{
std::lock_guard<std::mutex> lock(_mutex);
_sockfd = SocketConnect::connect(host, port, errMsg, isCancellationRequested); _sockfd = SocketConnect::connect(host, port, errMsg, isCancellationRequested);
if (_sockfd == -1) return false; if (_sockfd == -1) return false;
if (!init(host, errMsg)) return false; }
if (!init(host, errMsg))
{
close();
return false;
}
mbedtls_ssl_set_bio(&_ssl, &_sockfd, mbedtls_net_send, mbedtls_net_recv, NULL); mbedtls_ssl_set_bio(&_ssl, &_sockfd, mbedtls_net_send, mbedtls_net_recv, NULL);
int res; int res;
do do
{ {
std::lock_guard<std::mutex> lock(_mutex);
res = mbedtls_ssl_handshake(&_ssl); res = mbedtls_ssl_handshake(&_ssl);
} }
while (res == MBEDTLS_ERR_SSL_WANT_READ || res == MBEDTLS_ERR_SSL_WANT_WRITE); while (res == MBEDTLS_ERR_SSL_WANT_READ || res == MBEDTLS_ERR_SSL_WANT_WRITE);
@ -95,6 +106,8 @@ namespace ix
errMsg = "error in handshake : "; errMsg = "error in handshake : ";
errMsg += buf; errMsg += buf;
close();
return false; return false;
} }
@ -103,10 +116,14 @@ namespace ix
void SocketMbedTLS::close() void SocketMbedTLS::close()
{ {
std::lock_guard<std::mutex> lock(_mutex);
mbedtls_ssl_free(&_ssl); mbedtls_ssl_free(&_ssl);
mbedtls_ssl_config_free(&_conf); mbedtls_ssl_config_free(&_conf);
mbedtls_ctr_drbg_free(&_ctr_drbg); mbedtls_ctr_drbg_free(&_ctr_drbg);
mbedtls_entropy_free(&_entropy); mbedtls_entropy_free(&_entropy);
Socket::close();
} }
ssize_t SocketMbedTLS::send(char* buf, size_t nbyte) ssize_t SocketMbedTLS::send(char* buf, size_t nbyte)

View File

@ -51,9 +51,11 @@ namespace ix
_ws.setOnCloseCallback( _ws.setOnCloseCallback(
[this](uint16_t code, const std::string& reason, size_t wireSize, bool remote) [this](uint16_t code, const std::string& reason, size_t wireSize, bool remote)
{ {
_onMessageCallback(WebSocketMessageType::Close, "", wireSize, _onMessageCallback(
std::make_shared<WebSocketMessage>(
WebSocketMessageType::Close, "", wireSize,
WebSocketErrorInfo(), WebSocketOpenInfo(), WebSocketErrorInfo(), WebSocketOpenInfo(),
WebSocketCloseInfo(code, reason, remote)); WebSocketCloseInfo(code, reason, remote)));
} }
); );
} }
@ -135,6 +137,13 @@ namespace ix
_enablePong = false; _enablePong = false;
} }
void WebSocket::disablePerMessageDeflate()
{
std::lock_guard<std::mutex> lock(_configMutex);
WebSocketPerMessageDeflateOptions perMessageDeflateOptions(false);
_perMessageDeflateOptions = perMessageDeflateOptions;
}
void WebSocket::start() void WebSocket::start()
{ {
if (_thread.joinable()) return; // we've already been started if (_thread.joinable()) return; // we've already been started
@ -173,10 +182,12 @@ namespace ix
return status; return status;
} }
_onMessageCallback(WebSocketMessageType::Open, "", 0, _onMessageCallback(
std::make_shared<WebSocketMessage>(
WebSocketMessageType::Open, "", 0,
WebSocketErrorInfo(), WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers), WebSocketOpenInfo(status.uri, status.headers),
WebSocketCloseInfo()); WebSocketCloseInfo()));
return status; return status;
} }
@ -196,10 +207,12 @@ namespace ix
return status; return status;
} }
_onMessageCallback(WebSocketMessageType::Open, "", 0, _onMessageCallback(
std::make_shared<WebSocketMessage>(
WebSocketMessageType::Open, "", 0,
WebSocketErrorInfo(), WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers), WebSocketOpenInfo(status.uri, status.headers),
WebSocketCloseInfo()); WebSocketCloseInfo()));
return status; return status;
} }
@ -267,9 +280,11 @@ namespace ix
connectErr.reason = status.errorStr; connectErr.reason = status.errorStr;
connectErr.http_status = status.http_status; connectErr.http_status = status.http_status;
_onMessageCallback(WebSocketMessageType::Error, "", 0, _onMessageCallback(
std::make_shared<WebSocketMessage>(
WebSocketMessageType::Error, "", 0,
connectErr, WebSocketOpenInfo(), connectErr, WebSocketOpenInfo(),
WebSocketCloseInfo()); WebSocketCloseInfo()));
} }
} }
} }
@ -310,8 +325,8 @@ namespace ix
WebSocketMessageType webSocketMessageType; WebSocketMessageType webSocketMessageType;
switch (messageKind) switch (messageKind)
{ {
default: case WebSocketTransport::MessageKind::MSG_TEXT:
case WebSocketTransport::MessageKind::MSG: case WebSocketTransport::MessageKind::MSG_BINARY:
{ {
webSocketMessageType = WebSocketMessageType::Message; webSocketMessageType = WebSocketMessageType::Message;
} break; } break;
@ -335,9 +350,13 @@ namespace ix
WebSocketErrorInfo webSocketErrorInfo; WebSocketErrorInfo webSocketErrorInfo;
webSocketErrorInfo.decompressionError = decompressionError; webSocketErrorInfo.decompressionError = decompressionError;
_onMessageCallback(webSocketMessageType, msg, wireSize, bool binary = messageKind == WebSocketTransport::MessageKind::MSG_BINARY;
_onMessageCallback(
std::make_shared<WebSocketMessage>(
webSocketMessageType, msg, wireSize,
webSocketErrorInfo, WebSocketOpenInfo(), webSocketErrorInfo, WebSocketOpenInfo(),
WebSocketCloseInfo()); WebSocketCloseInfo(), binary));
WebSocket::invokeTrafficTrackerCallback(msg.size(), true); WebSocket::invokeTrafficTrackerCallback(msg.size(), true);
}); });
@ -368,9 +387,18 @@ namespace ix
} }
WebSocketSendInfo WebSocket::send(const std::string& data, WebSocketSendInfo WebSocket::send(const std::string& data,
bool binary,
const OnProgressCallback& onProgressCallback) const OnProgressCallback& onProgressCallback)
{ {
return sendMessage(data, SendMessageKind::Binary, onProgressCallback); return sendMessage(data,
(binary) ? SendMessageKind::Binary: SendMessageKind::Text,
onProgressCallback);
}
WebSocketSendInfo WebSocket::sendBinary(const std::string& text,
const OnProgressCallback& onProgressCallback)
{
return sendMessage(text, SendMessageKind::Binary, onProgressCallback);
} }
WebSocketSendInfo WebSocket::sendText(const std::string& text, WebSocketSendInfo WebSocket::sendText(const std::string& text,

View File

@ -13,6 +13,7 @@
#include "IXWebSocketCloseConstants.h" #include "IXWebSocketCloseConstants.h"
#include "IXWebSocketErrorInfo.h" #include "IXWebSocketErrorInfo.h"
#include "IXWebSocketHttpHeaders.h" #include "IXWebSocketHttpHeaders.h"
#include "IXWebSocketMessage.h"
#include "IXWebSocketPerMessageDeflateOptions.h" #include "IXWebSocketPerMessageDeflateOptions.h"
#include "IXWebSocketSendInfo.h" #include "IXWebSocketSendInfo.h"
#include "IXWebSocketTransport.h" #include "IXWebSocketTransport.h"
@ -32,52 +33,7 @@ namespace ix
Closed = 3 Closed = 3
}; };
enum class WebSocketMessageType using OnMessageCallback = std::function<void(const WebSocketMessagePtr&)>;
{
Message = 0,
Open = 1,
Close = 2,
Error = 3,
Ping = 4,
Pong = 5,
Fragment = 6
};
struct WebSocketOpenInfo
{
std::string uri;
WebSocketHttpHeaders headers;
WebSocketOpenInfo(const std::string& u = std::string(),
const WebSocketHttpHeaders& h = WebSocketHttpHeaders())
: uri(u)
, headers(h)
{
;
}
};
struct WebSocketCloseInfo
{
uint16_t code;
std::string reason;
bool remote;
WebSocketCloseInfo(uint16_t c = 0, const std::string& r = std::string(), bool rem = false)
: code(c)
, reason(r)
, remote(rem)
{
;
}
};
using OnMessageCallback = std::function<void(WebSocketMessageType,
const std::string&,
size_t wireSize,
const WebSocketErrorInfo&,
const WebSocketOpenInfo&,
const WebSocketCloseInfo&)>;
using OnTrafficTrackerCallback = std::function<void(size_t size, bool incoming)>; using OnTrafficTrackerCallback = std::function<void(size_t size, bool incoming)>;
@ -95,6 +51,7 @@ namespace ix
void setPingTimeout(int pingTimeoutSecs); void setPingTimeout(int pingTimeoutSecs);
void enablePong(); void enablePong();
void disablePong(); void disablePong();
void disablePerMessageDeflate();
// Run asynchronously, by calling start and stop. // Run asynchronously, by calling start and stop.
void start(); void start();
@ -107,14 +64,18 @@ namespace ix
WebSocketInitResult connect(int timeoutSecs); WebSocketInitResult connect(int timeoutSecs);
void run(); void run();
// send binary data // send is in binary mode by default
WebSocketSendInfo send(const std::string& data, WebSocketSendInfo send(const std::string& data,
bool binary = false,
const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo sendBinary(const std::string& text,
const OnProgressCallback& onProgressCallback = nullptr); const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo sendText(const std::string& text, WebSocketSendInfo sendText(const std::string& text,
const OnProgressCallback& onProgressCallback = nullptr); const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo ping(const std::string& text); WebSocketSendInfo ping(const std::string& text);
void close(uint16_t code = 1000, const std::string& reason = "Normal closure"); void close(uint16_t code = WebSocketCloseConstants::kNormalClosureCode,
const std::string& reason = WebSocketCloseConstants::kNormalClosureMessage);
void setOnMessageCallback(const OnMessageCallback& callback); void setOnMessageCallback(const OnMessageCallback& callback);
static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback); static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback);
@ -168,7 +129,7 @@ namespace ix
bool _enablePong; bool _enablePong;
static const bool kDefaultEnablePong; static const bool kDefaultEnablePong;
// Optional ping and ping timeout // Optional ping and pong timeout
int _pingIntervalSecs; int _pingIntervalSecs;
int _pingTimeoutSecs; int _pingTimeoutSecs;
static const int kDefaultPingIntervalSecs; static const int kDefaultPingIntervalSecs;

View File

@ -0,0 +1,25 @@
/*
* IXWebSocketCloseInfo.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
namespace ix
{
struct WebSocketCloseInfo
{
uint16_t code;
std::string reason;
bool remote;
WebSocketCloseInfo(uint16_t c = 0, const std::string& r = std::string(), bool rem = false)
: code(c)
, reason(r)
, remote(rem)
{
;
}
};
} // namespace ix

View File

@ -0,0 +1,49 @@
/*
* IXWebSocketMessage.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include "IXWebSocketCloseInfo.h"
#include "IXWebSocketErrorInfo.h"
#include "IXWebSocketMessageType.h"
#include "IXWebSocketOpenInfo.h"
#include <memory>
#include <string>
#include <thread>
namespace ix
{
struct WebSocketMessage
{
WebSocketMessageType type;
std::string str;
size_t wireSize;
WebSocketErrorInfo errorInfo;
WebSocketOpenInfo openInfo;
WebSocketCloseInfo closeInfo;
bool binary;
WebSocketMessage(WebSocketMessageType t,
const std::string& s,
size_t w,
WebSocketErrorInfo e,
WebSocketOpenInfo o,
WebSocketCloseInfo c,
bool b = false)
: type(t)
, str(std::move(s))
, wireSize(w)
, errorInfo(e)
, openInfo(o)
, closeInfo(c)
, binary(b)
{
;
}
};
using WebSocketMessagePtr = std::shared_ptr<WebSocketMessage>;
} // namespace ix

View File

@ -32,14 +32,7 @@ namespace ix
if (_websocket) if (_websocket)
{ {
// set dummy callback just to avoid crash // set dummy callback just to avoid crash
_websocket->setOnMessageCallback([]( _websocket->setOnMessageCallback([](const WebSocketMessagePtr&) {});
WebSocketMessageType,
const std::string&,
size_t,
const WebSocketErrorInfo&,
const WebSocketOpenInfo&,
const WebSocketCloseInfo&)
{});
} }
_websocket = websocket; _websocket = websocket;
@ -47,27 +40,10 @@ namespace ix
// bind new // bind new
if (_websocket) if (_websocket)
{ {
_websocket->setOnMessageCallback([this]( _websocket->setOnMessageCallback([this](const WebSocketMessagePtr& msg)
WebSocketMessageType type,
const std::string& str,
size_t wireSize,
const WebSocketErrorInfo& errorInfo,
const WebSocketOpenInfo& openInfo,
const WebSocketCloseInfo& closeInfo)
{
MessagePtr message(new Message());
message->type = type;
message->str = str;
message->wireSize = wireSize;
message->errorInfo = errorInfo;
message->openInfo = openInfo;
message->closeInfo = closeInfo;
{ {
std::lock_guard<std::mutex> lock(_messagesMutex); std::lock_guard<std::mutex> lock(_messagesMutex);
_messages.emplace_back(std::move(message)); _messages.emplace_back(std::move(msg));
}
}); });
} }
} }
@ -82,9 +58,9 @@ namespace ix
_onMessageUserCallback = std::move(callback); _onMessageUserCallback = std::move(callback);
} }
WebSocketMessageQueue::MessagePtr WebSocketMessageQueue::popMessage() WebSocketMessagePtr WebSocketMessageQueue::popMessage()
{ {
MessagePtr message; WebSocketMessagePtr message;
std::lock_guard<std::mutex> lock(_messagesMutex); std::lock_guard<std::mutex> lock(_messagesMutex);
if (!_messages.empty()) if (!_messages.empty())
@ -101,19 +77,11 @@ namespace ix
if (!_onMessageUserCallback) if (!_onMessageUserCallback)
return; return;
MessagePtr message; WebSocketMessagePtr message;
while (count > 0 && (message = popMessage())) while (count > 0 && (message = popMessage()))
{ {
_onMessageUserCallback( _onMessageUserCallback(message);
message->type,
message->str,
message->wireSize,
message->errorInfo,
message->openInfo,
message->closeInfo
);
--count; --count;
} }
} }

View File

@ -30,24 +30,12 @@ namespace ix
void poll(int count = 512); void poll(int count = 512);
protected: protected:
struct Message WebSocketMessagePtr popMessage();
{
WebSocketMessageType type;
std::string str;
size_t wireSize;
WebSocketErrorInfo errorInfo;
WebSocketOpenInfo openInfo;
WebSocketCloseInfo closeInfo;
};
using MessagePtr = std::shared_ptr<Message>;
MessagePtr popMessage();
private: private:
WebSocket* _websocket = nullptr; WebSocket* _websocket = nullptr;
OnMessageCallback _onMessageUserCallback; OnMessageCallback _onMessageUserCallback;
std::mutex _messagesMutex; std::mutex _messagesMutex;
std::list<MessagePtr> _messages; std::list<WebSocketMessagePtr> _messages;
}; };
} // namespace ix } // namespace ix

View File

@ -0,0 +1,21 @@
/*
* IXWebSocketMessageType.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
namespace ix
{
enum class WebSocketMessageType
{
Message = 0,
Open = 1,
Close = 2,
Error = 3,
Ping = 4,
Pong = 5,
Fragment = 6
};
}

View File

@ -0,0 +1,24 @@
/*
* IXWebSocketOpenInfo.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
namespace ix
{
struct WebSocketOpenInfo
{
std::string uri;
WebSocketHttpHeaders headers;
WebSocketOpenInfo(const std::string& u = std::string(),
const WebSocketHttpHeaders& h = WebSocketHttpHeaders())
: uri(u)
, headers(h)
{
;
}
};
} // namespace ix

View File

@ -542,12 +542,17 @@ namespace ix
) { ) {
unmaskReceiveBuffer(ws); unmaskReceiveBuffer(ws);
MessageKind messageKind =
(ws.opcode == wsheader_type::TEXT_FRAME)
? MessageKind::MSG_TEXT
: MessageKind::MSG_BINARY;
// //
// Usual case. Small unfragmented messages // Usual case. Small unfragmented messages
// //
if (ws.fin && _chunks.empty()) if (ws.fin && _chunks.empty())
{ {
emitMessage(MessageKind::MSG, emitMessage(messageKind,
std::string(_rxbuf.begin()+ws.header_size, std::string(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t) ws.N), _rxbuf.begin()+ws.header_size+(size_t) ws.N),
ws, ws,
@ -567,7 +572,7 @@ namespace ix
_rxbuf.begin()+ws.header_size+(size_t)ws.N)); _rxbuf.begin()+ws.header_size+(size_t)ws.N));
if (ws.fin) if (ws.fin)
{ {
emitMessage(MessageKind::MSG, getMergedChunks(), ws, onMessageCallback); emitMessage(messageKind, getMergedChunks(), ws, onMessageCallback);
_chunks.clear(); _chunks.clear();
} }
else else

View File

@ -50,7 +50,8 @@ namespace ix
enum class MessageKind enum class MessageKind
{ {
MSG, MSG_TEXT,
MSG_BINARY,
PING, PING,
PONG, PONG,
FRAGMENT FRAGMENT

View File

@ -9,10 +9,10 @@ install: brew
# on osx it is good practice to make /usr/local user writable # on osx it is good practice to make /usr/local user writable
# sudo chown -R `whoami`/staff /usr/local # sudo chown -R `whoami`/staff /usr/local
brew: brew:
mkdir -p build && (cd build ; cmake -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .. ; make -j install) mkdir -p build && (cd build ; cmake -DUSE_TLS=1 -DUSE_WS=1 .. ; make -j install)
ws: ws:
mkdir -p build && (cd build ; cmake -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .. ; make -j) mkdir -p build && (cd build ; cmake -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 -DUSE_VENDORED_THIRD_PARTY=1 .. ; make -j)
uninstall: uninstall:
xargs rm -fv < build/install_manifest.txt xargs rm -fv < build/install_manifest.txt
@ -44,7 +44,7 @@ trail:
sh third_party/remote_trailing_whitespaces.sh sh third_party/remote_trailing_whitespaces.sh
format: format:
find ixwebsocket ws -name '*.cpp' -o -name '*.h' -exec clang-format -i {} \; find test ixwebsocket ws -name '*.cpp' -o -name '*.h' -exec clang-format -i {} \;
# That target is used to start a node server, but isn't required as we have # That target is used to start a node server, but isn't required as we have
# a builtin C++ server started in the unittest now # a builtin C++ server started in the unittest now

View File

@ -178,34 +178,29 @@ namespace ix
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
Logger() << "New connection"; Logger() << "New connection";
Logger() << "Uri: " << openInfo.uri; Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:"; Logger() << "Headers:";
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
Logger() << it.first << ": " << it.second; Logger() << it.first << ": " << it.second;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
Logger() << "Closed connection"; Logger() << "Closed connection";
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client != webSocket)
{ {
client->send(str); client->send(msg->str, msg->binary);
} }
} }
} }

View File

@ -6,13 +6,13 @@
#pragma once #pragma once
#include <string>
#include <vector>
#include <sstream>
#include <iostream> #include <iostream>
#include <ixwebsocket/IXWebSocketServer.h>
#include <mutex> #include <mutex>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <ixwebsocket/IXWebSocketServer.h> #include <sstream>
#include <string>
#include <vector>
namespace ix namespace ix
{ {
@ -49,4 +49,4 @@ namespace ix
int getFreePort(); int getFreePort();
bool startWebSocketEchoServer(ix::WebSocketServer& server); bool startWebSocketEchoServer(ix::WebSocketServer& server);
} } // namespace ix

View File

@ -108,52 +108,47 @@ namespace
log(std::string("Connecting to url: ") + url); log(std::string("Connecting to url: ") + url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
log("client connected"); log("client connected");
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::stringstream ss; std::stringstream ss;
ss << "client disconnected(" ss << "client disconnected("
<< closeInfo.code << msg->closeInfo.code
<< "," << ","
<< closeInfo.reason << msg->closeInfo.reason
<< ")"; << ")";
log(ss.str()); log(ss.str());
std::lock_guard<std::mutex> lck(_mutexCloseData); std::lock_guard<std::mutex> lck(_mutexCloseData);
_closeCode = closeInfo.code; _closeCode = msg->closeInfo.code;
_closeReason = std::string(closeInfo.reason); _closeReason = std::string(msg->closeInfo.reason);
_closeRemote = closeInfo.remote; _closeRemote = msg->closeInfo.remote;
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "Error ! " << error.reason; ss << "Error ! " << msg->errorInfo.reason;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Pong) else if (msg->type == ix::WebSocketMessageType::Pong)
{ {
ss << "Received pong message " << str; ss << "Received pong message " << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Ping) else if (msg->type == ix::WebSocketMessageType::Ping)
{ {
ss << "Received ping message " << str; ss << "Received ping message " << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
ss << "Received message " << str; ss << "Received message " << msg->str;
log(ss.str()); log(ss.str());
} }
else else
@ -183,39 +178,34 @@ namespace
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, &server, &receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server, &receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
Logger() << "New server connection"; Logger() << "New server connection";
Logger() << "id: " << connectionState->getId(); Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri; Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:"; Logger() << "Headers:";
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
Logger() << it.first << ": " << it.second; Logger() << it.first << ": " << it.second;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Server closed connection(" ss << "Server closed connection("
<< closeInfo.code << msg->closeInfo.code
<< "," << ","
<< closeInfo.reason << msg->closeInfo.reason
<< ")"; << ")";
log(ss.str()); log(ss.str());
std::lock_guard<std::mutex> lck(mutexWrite); std::lock_guard<std::mutex> lck(mutexWrite);
receivedCloseCode = closeInfo.code; receivedCloseCode = msg->closeInfo.code;
receivedCloseReason = std::string(closeInfo.reason); receivedCloseReason = std::string(msg->closeInfo.reason);
receivedCloseRemote = closeInfo.remote; receivedCloseRemote = msg->closeInfo.remote;
} }
} }
); );

View File

@ -23,36 +23,31 @@ namespace
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[connectionState, &server](ix::WebSocketMessageType messageType, [connectionState, &server](const WebSocketMessagePtr& msg)
const std::string & str,
size_t wireSize,
const ix::WebSocketErrorInfo & error,
const ix::WebSocketOpenInfo & openInfo,
const ix::WebSocketCloseInfo & closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
Logger() << "New connection"; Logger() << "New connection";
connectionState->computeId(); connectionState->computeId();
Logger() << "id: " << connectionState->getId(); Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri; Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:"; Logger() << "Headers:";
for (auto it : openInfo.headers) for (auto&& it : msg->openInfo.headers)
{ {
Logger() << it.first << ": " << it.second; Logger() << it.first << ": " << it.second;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
Logger() << "Closed connection"; Logger() << "Closed connection";
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
Logger() << "Message received: " << str; Logger() << "Message received: " << msg->str;
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
client->send(str); client->send(msg->str);
} }
} }
} }
@ -78,46 +73,41 @@ namespace
{ {
msgQ.bindWebsocket(&ws); msgQ.bindWebsocket(&ws);
msgQ.setOnMessageCallback([this](WebSocketMessageType messageType, msgQ.setOnMessageCallback([this](const WebSocketMessagePtr& msg)
const std::string & str,
size_t wireSize,
const WebSocketErrorInfo & error,
const WebSocketOpenInfo & openInfo,
const WebSocketCloseInfo & closeInfo)
{ {
REQUIRE(mainThreadId == std::this_thread::get_id()); REQUIRE(mainThreadId == std::this_thread::get_id());
std::stringstream ss; std::stringstream ss;
if (messageType == WebSocketMessageType::Open) if (msg->type == WebSocketMessageType::Open)
{ {
log("client connected"); log("client connected");
sendNextMessage(); sendNextMessage();
} }
else if (messageType == WebSocketMessageType::Close) else if (msg->type == WebSocketMessageType::Close)
{ {
log("client disconnected"); log("client disconnected");
} }
else if (messageType == WebSocketMessageType::Error) else if (msg->type == WebSocketMessageType::Error)
{ {
ss << "Error ! " << error.reason; ss << "Error ! " << msg->errorInfo.reason;
log(ss.str()); log(ss.str());
testDone = true; testDone = true;
} }
else if (messageType == WebSocketMessageType::Pong) else if (msg->type == WebSocketMessageType::Pong)
{ {
ss << "Received pong message " << str; ss << "Received pong message " << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == WebSocketMessageType::Ping) else if (msg->type == WebSocketMessageType::Ping)
{ {
ss << "Received ping message " << str; ss << "Received ping message " << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == WebSocketMessageType::Message) else if (msg->type == WebSocketMessageType::Message)
{ {
REQUIRE(str.compare("Hey dude!") == 0); REQUIRE(msg->str.compare("Hey dude!") == 0);
++receivedCount; ++receivedCount;
ss << "Received message " << str; ss << "Received message " << msg->str;
log(ss.str()); log(ss.str());
sendNextMessage(); sendNextMessage();
} }
@ -189,5 +179,4 @@ TEST_CASE("Websocket_message_queue", "[websocket_message_q]")
server.stop(); server.stop();
} }
} }

View File

@ -106,7 +106,7 @@ namespace
{ {
log("client disconnected"); log("client disconnected");
if (closeInfo.code == 1011) if (msg->closeInfo.code == 1011)
{ {
_closedDueToPingTimeout = true; _closedDueToPingTimeout = true;
} }

View File

@ -43,38 +43,33 @@ namespace ix
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, [webSocket, connectionState,
&connectionId, &server](ix::WebSocketMessageType messageType, &connectionId, &server](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
Logger() << "New connection"; Logger() << "New connection";
connectionState->computeId(); connectionState->computeId();
Logger() << "id: " << connectionState->getId(); Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri; Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:"; Logger() << "Headers:";
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
Logger() << it.first << ": " << it.second; Logger() << it.first << ": " << it.second;
} }
connectionId = connectionState->getId(); connectionId = connectionState->getId();
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
Logger() << "Closed connection"; Logger() << "Closed connection";
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client != webSocket)
{ {
client->send(str); client->send(msg->str);
} }
} }
} }

View File

@ -52,41 +52,36 @@ namespace
log(std::string("Connecting to url: ") + url); log(std::string("Connecting to url: ") + url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[](ix::WebSocketMessageType messageType, [](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
log("TestConnectionDisconnection: connected !"); log("TestConnectionDisconnection: connected !");
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
log("TestConnectionDisconnection: disconnected !"); log("TestConnectionDisconnection: disconnected !");
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "TestConnectionDisconnection: Error! "; ss << "TestConnectionDisconnection: Error! ";
ss << error.reason; ss << msg->errorInfo.reason;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
log("TestConnectionDisconnection: received message.!"); log("TestConnectionDisconnection: received message.!");
} }
else if (messageType == ix::WebSocketMessageType::Ping) else if (msg->type == ix::WebSocketMessageType::Ping)
{ {
log("TestConnectionDisconnection: received ping message.!"); log("TestConnectionDisconnection: received ping message.!");
} }
else if (messageType == ix::WebSocketMessageType::Pong) else if (msg->type == ix::WebSocketMessageType::Pong)
{ {
log("TestConnectionDisconnection: received pong message.!"); log("TestConnectionDisconnection: received pong message.!");
} }
else if (messageType == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
log("TestConnectionDisconnection: received fragment.!"); log("TestConnectionDisconnection: received fragment.!");
} }

View File

@ -114,31 +114,26 @@ namespace
log(std::string("Connecting to url: ") + url); log(std::string("Connecting to url: ") + url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
ss << "cmd_websocket_chat: user " ss << "cmd_websocket_chat: user "
<< _user << _user
<< " Connected !"; << " Connected !";
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
ss << "cmd_websocket_chat: user " ss << "cmd_websocket_chat: user "
<< _user << _user
<< " disconnected !"; << " disconnected !";
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
auto result = decodeMessage(str); auto result = decodeMessage(msg->str);
// Our "chat" / "broacast" node.js server does not send us // Our "chat" / "broacast" node.js server does not send us
// the messages we send, so we don't need to have a msg_user != user // the messages we send, so we don't need to have a msg_user != user
@ -159,20 +154,20 @@ namespace
<< _user << " > "; << _user << " > ";
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "cmd_websocket_chat: Error ! " << error.reason; ss << "cmd_websocket_chat: Error ! " << msg->errorInfo.reason;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Ping) else if (msg->type == ix::WebSocketMessageType::Ping)
{ {
log("cmd_websocket_chat: received ping message"); log("cmd_websocket_chat: received ping message");
} }
else if (messageType == ix::WebSocketMessageType::Pong) else if (msg->type == ix::WebSocketMessageType::Pong)
{ {
log("cmd_websocket_chat: received pong message"); log("cmd_websocket_chat: received pong message");
} }
else if (messageType == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
log("cmd_websocket_chat: received message fragment"); log("cmd_websocket_chat: received message fragment");
} }
@ -221,35 +216,30 @@ namespace
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
Logger() << "New connection"; Logger() << "New connection";
Logger() << "id: " << connectionState->getId(); Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri; Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:"; Logger() << "Headers:";
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
Logger() << it.first << ": " << it.second; Logger() << it.first << ": " << it.second;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
log("Closed connection"); log("Closed connection");
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client != webSocket)
{ {
client->send(str); client->send(msg->str);
} }
} }
} }

View File

@ -0,0 +1,30 @@
# Clients
## ws
```
$ ws connect ws://127.0.0.1:8765
Type Ctrl-D to exit prompt...
Connecting to url: ws://127.0.0.1:8765
> ws_connect: connected
Uri: /
Handshake Headers:
Connection: Upgrade
Date: Sat, 08 Jun 2019 16:43:29 GMT
Sec-WebSocket-Accept: kPCNwGa97y+7NWdAvHi/7/rA8AE=
Sec-WebSocket-Extensions: permessage-deflate; server_max_window_bits=15; client_max_window_bits=15
Server: Python/3.7 websockets/7.0
Upgrade: websocket
Received 13 bytes
ws_connect: received message: > Welcome !
ws_connect: connection closed: code 1006 reason Abnormal closure
```
## wscat
```
$ ./node_modules/.bin/wscat -c ws://127.0.0.1:8765
connected (press CTRL+C to quit)
< > Welcome !
disconnected (code: 1006)
```

View File

@ -0,0 +1,22 @@
#!/usr/bin/env python
# WS server example
import asyncio
import websockets
async def hello(websocket, path):
await websocket.send(f"> Welcome !")
name = await websocket.recv()
print(f"< {name}")
greeting = f"Hello {name}!"
await websocket.send(greeting)
print(f"> {greeting}")
start_server = websockets.serve(hello, 'localhost', 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

View File

@ -6,10 +6,10 @@
#pragma once #pragma once
#include <algorithm>
#include <ixwebsocket/IXHttpClient.h> #include <ixwebsocket/IXHttpClient.h>
#include <jsoncpp/json/json.h> #include <jsoncpp/json/json.h>
#include <regex> #include <regex>
#include <algorithm>
namespace ix namespace ix
{ {

View File

@ -90,46 +90,41 @@ namespace ix
void CobraConnection::initWebSocketOnMessageCallback() void CobraConnection::initWebSocketOnMessageCallback()
{ {
_webSocket->setOnMessageCallback( _webSocket->setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
CobraConnection::invokeTrafficTrackerCallback(wireSize, true); CobraConnection::invokeTrafficTrackerCallback(msg->wireSize, true);
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
invokeEventCallback(ix::CobraConnection_EventType_Open, invokeEventCallback(ix::CobraConnection_EventType_Open,
std::string(), std::string(),
openInfo.headers); msg->openInfo.headers);
sendHandshakeMessage(); sendHandshakeMessage();
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
_authenticated = false; _authenticated = false;
std::stringstream ss; std::stringstream ss;
ss << "Close code " << closeInfo.code; ss << "Close code " << msg->closeInfo.code;
ss << " reason " << closeInfo.reason; ss << " reason " << msg->closeInfo.reason;
invokeEventCallback(ix::CobraConnection_EventType_Closed, invokeEventCallback(ix::CobraConnection_EventType_Closed,
ss.str()); ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
Json::Value data; Json::Value data;
Json::Reader reader; Json::Reader reader;
if (!reader.parse(str, data)) if (!reader.parse(msg->str, data))
{ {
invokeErrorCallback("Invalid json", str); invokeErrorCallback("Invalid json", msg->str);
return; return;
} }
if (!data.isMember("action")) if (!data.isMember("action"))
{ {
invokeErrorCallback("Missing action", str); invokeErrorCallback("Missing action", msg->str);
return; return;
} }
@ -139,12 +134,12 @@ namespace ix
{ {
if (!handleHandshakeResponse(data)) if (!handleHandshakeResponse(data))
{ {
invokeErrorCallback("Error extracting nonce from handshake response", str); invokeErrorCallback("Error extracting nonce from handshake response", msg->str);
} }
} }
else if (action == "auth/handshake/error") else if (action == "auth/handshake/error")
{ {
invokeErrorCallback("Handshake error", str); invokeErrorCallback("Handshake error", msg->str);
} }
else if (action == "auth/authenticate/ok") else if (action == "auth/authenticate/ok")
{ {
@ -154,7 +149,7 @@ namespace ix
} }
else if (action == "auth/authenticate/error") else if (action == "auth/authenticate/error")
{ {
invokeErrorCallback("Authentication error", str); invokeErrorCallback("Authentication error", msg->str);
} }
else if (action == "rtm/subscription/data") else if (action == "rtm/subscription/data")
{ {
@ -164,36 +159,36 @@ namespace ix
{ {
if (!handleSubscriptionResponse(data)) if (!handleSubscriptionResponse(data))
{ {
invokeErrorCallback("Error processing subscribe response", str); invokeErrorCallback("Error processing subscribe response", msg->str);
} }
} }
else if (action == "rtm/subscribe/error") else if (action == "rtm/subscribe/error")
{ {
invokeErrorCallback("Subscription error", str); invokeErrorCallback("Subscription error", msg->str);
} }
else if (action == "rtm/unsubscribe/ok") else if (action == "rtm/unsubscribe/ok")
{ {
if (!handleUnsubscriptionResponse(data)) if (!handleUnsubscriptionResponse(data))
{ {
invokeErrorCallback("Error processing subscribe response", str); invokeErrorCallback("Error processing subscribe response", msg->str);
} }
} }
else if (action == "rtm/unsubscribe/error") else if (action == "rtm/unsubscribe/error")
{ {
invokeErrorCallback("Unsubscription error", str); invokeErrorCallback("Unsubscription error", msg->str);
} }
else else
{ {
invokeErrorCallback("Un-handled message type", str); invokeErrorCallback("Un-handled message type", msg->str);
} }
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
invokeErrorCallback(ss.str(), std::string()); invokeErrorCallback(ss.str(), std::string());
} }
}); });

View File

@ -58,25 +58,20 @@ namespace snake
auto state = std::dynamic_pointer_cast<SnakeConnectionState>(connectionState); auto state = std::dynamic_pointer_cast<SnakeConnectionState>(connectionState);
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[this, webSocket, state](ix::WebSocketMessageType messageType, [this, webSocket, state](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
std::cerr << "id: " << state->getId() << std::endl; std::cerr << "id: " << state->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl; std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; std::cerr << it.first << ": " << it.second << std::endl;
} }
std::string appkey = parseAppKey(openInfo.uri); std::string appkey = parseAppKey(msg->openInfo.uri);
state->setAppkey(appkey); state->setAppkey(appkey);
// Connect to redis first // Connect to redis first
@ -86,29 +81,29 @@ namespace snake
std::cerr << "Cannot connect to redis host" << std::endl; std::cerr << "Cannot connect to redis host" << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "Closed connection" std::cerr << "Closed connection"
<< " code " << closeInfo.code << " code " << msg->closeInfo.code
<< " reason " << closeInfo.reason << std::endl; << " reason " << msg->closeInfo.reason << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str(); std::cerr << ss.str();
} }
else if (messageType == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
std::cerr << "Received message fragment" << std::endl; std::cerr << "Received message fragment" << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " << wireSize << " bytes" << std::endl; std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
processCobraMessage(state, webSocket, _appConfig, str); processCobraMessage(state, webSocket, _appConfig, msg->str);
} }
} }
); );

View File

@ -80,6 +80,8 @@ int main(int argc, char** argv)
bool strict = false; bool strict = false;
bool stress = false; bool stress = false;
bool disableAutomaticReconnection = false; bool disableAutomaticReconnection = false;
bool disablePerMessageDeflate = false;
bool greetings = false;
int port = 8008; int port = 8008;
int redisPort = 6379; int redisPort = 6379;
int statsdPort = 8125; int statsdPort = 8125;
@ -110,6 +112,7 @@ int main(int argc, char** argv)
CLI::App* connectApp = app.add_subcommand("connect", "Connect to a remote server"); CLI::App* connectApp = app.add_subcommand("connect", "Connect to a remote server");
connectApp->add_option("url", url, "Connection url")->required(); connectApp->add_option("url", url, "Connection url")->required();
connectApp->add_flag("-d", disableAutomaticReconnection, "Disable Automatic Reconnection"); connectApp->add_flag("-d", disableAutomaticReconnection, "Disable Automatic Reconnection");
connectApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate");
CLI::App* chatApp = app.add_subcommand("chat", "Group chat"); CLI::App* chatApp = app.add_subcommand("chat", "Group chat");
chatApp->add_option("url", url, "Connection url")->required(); chatApp->add_option("url", url, "Connection url")->required();
@ -118,6 +121,7 @@ int main(int argc, char** argv)
CLI::App* echoServerApp = app.add_subcommand("echo_server", "Echo server"); CLI::App* echoServerApp = app.add_subcommand("echo_server", "Echo server");
echoServerApp->add_option("--port", port, "Port"); echoServerApp->add_option("--port", port, "Port");
echoServerApp->add_option("--host", hostname, "Hostname"); echoServerApp->add_option("--host", hostname, "Hostname");
echoServerApp->add_flag("-g", greetings, "Verbose");
CLI::App* broadcastServerApp = app.add_subcommand("broadcast_server", "Broadcasting server"); CLI::App* broadcastServerApp = app.add_subcommand("broadcast_server", "Broadcasting server");
broadcastServerApp->add_option("--port", port, "Port"); broadcastServerApp->add_option("--port", port, "Port");
@ -241,7 +245,8 @@ int main(int argc, char** argv)
} }
else if (app.got_subcommand("connect")) else if (app.got_subcommand("connect"))
{ {
ret = ix::ws_connect_main(url, disableAutomaticReconnection); ret = ix::ws_connect_main(url, disableAutomaticReconnection,
disablePerMessageDeflate);
} }
else if (app.got_subcommand("chat")) else if (app.got_subcommand("chat"))
{ {
@ -249,7 +254,7 @@ int main(int argc, char** argv)
} }
else if (app.got_subcommand("echo_server")) else if (app.got_subcommand("echo_server"))
{ {
ret = ix::ws_echo_server_main(port, hostname); ret = ix::ws_echo_server_main(port, greetings, hostname);
} }
else if (app.got_subcommand("broadcast_server")) else if (app.got_subcommand("broadcast_server"))
{ {

View File

@ -24,13 +24,15 @@ namespace ix
int ws_ping_pong_main(const std::string& url); int ws_ping_pong_main(const std::string& url);
int ws_echo_server_main(int port, const std::string& hostname); int ws_echo_server_main(int port, bool greetings, const std::string& hostname);
int ws_broadcast_server_main(int port, const std::string& hostname); int ws_broadcast_server_main(int port, const std::string& hostname);
int ws_transfer_main(int port, const std::string& hostname); int ws_transfer_main(int port, const std::string& hostname);
int ws_chat_main(const std::string& url, const std::string& user); int ws_chat_main(const std::string& url, const std::string& user);
int ws_connect_main(const std::string& url, bool disableAutomaticReconnection); int ws_connect_main(const std::string& url,
bool disableAutomaticReconnection,
bool disablePerMessageDeflate);
int ws_receive_main(const std::string& url, bool enablePerMessageDeflate, int delayMs); int ws_receive_main(const std::string& url, bool enablePerMessageDeflate, int delayMs);

View File

@ -21,52 +21,48 @@ namespace ix
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server](const WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl; std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl; std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; std::cerr << it.first << ": " << it.second << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "Closed connection" std::cerr << "Closed connection"
<< " code " << closeInfo.code << " code " << msg->closeInfo.code
<< " reason " << closeInfo.reason << std::endl; << " reason " << msg->closeInfo.reason << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str(); std::cerr << ss.str();
} }
else if (messageType == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
std::cerr << "Received message fragment" << std::endl; std::cerr << "Received message fragment" << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " << wireSize << " bytes" << std::endl; std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client != webSocket)
{ {
client->send(str, client->send(msg->str,
msg->binary,
[](int current, int total) -> bool [](int current, int total) -> bool
{ {
std::cerr << "Step " << current std::cerr << "Step " << current

View File

@ -84,20 +84,15 @@ namespace ix
log(std::string("Connecting to url: ") + _url); log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
log("ws chat: connected"); log("ws chat: connected");
std::cout << "Uri: " << openInfo.uri << std::endl; std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl; std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; std::cout << it.first << ": " << it.second << std::endl;
} }
@ -107,18 +102,18 @@ namespace ix
<< " Connected !"; << " Connected !";
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
ss << "ws chat: user " ss << "ws chat: user "
<< _user << _user
<< " disconnected !" << " disconnected !"
<< " code " << closeInfo.code << " code " << msg->closeInfo.code
<< " reason " << closeInfo.reason; << " reason " << msg->closeInfo.reason;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
auto result = decodeMessage(str); auto result = decodeMessage(msg->str);
// Our "chat" / "broacast" node.js server does not send us // Our "chat" / "broacast" node.js server does not send us
// the messages we send, so we don't have to filter it out. // the messages we send, so we don't have to filter it out.
@ -127,17 +122,17 @@ namespace ix
_receivedQueue.push(result.second); _receivedQueue.push(result.second);
ss << std::endl ss << std::endl
<< result.first << "(" << wireSize << " bytes)" << " > " << result.second << result.first << "(" << msg->wireSize << " bytes)" << " > " << result.second
<< std::endl << std::endl
<< _user << " > "; << _user << " > ";
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str()); log(ss.str());
} }
else else
@ -172,7 +167,7 @@ namespace ix
void WebSocketChat::sendMessage(const std::string& text) void WebSocketChat::sendMessage(const std::string& text)
{ {
_webSocket.send(encodeMessage(text)); _webSocket.sendText(encodeMessage(text));
} }
int ws_chat_main(const std::string& url, int ws_chat_main(const std::string& url,

View File

@ -15,7 +15,8 @@ namespace ix
{ {
public: public:
WebSocketConnect(const std::string& _url, WebSocketConnect(const std::string& _url,
bool disableAutomaticReconnection); bool disableAutomaticReconnection,
bool disablePerMessageDeflate);
void subscribe(const std::string& channel); void subscribe(const std::string& channel);
void start(); void start();
@ -26,13 +27,16 @@ namespace ix
private: private:
std::string _url; std::string _url;
ix::WebSocket _webSocket; ix::WebSocket _webSocket;
bool _disablePerMessageDeflate;
void log(const std::string& msg); void log(const std::string& msg);
}; };
WebSocketConnect::WebSocketConnect(const std::string& url, WebSocketConnect::WebSocketConnect(const std::string& url,
bool disableAutomaticReconnection) : bool disableAutomaticReconnection,
_url(url) bool disablePerMessageDeflate) :
_url(url),
_disablePerMessageDeflate(disablePerMessageDeflate)
{ {
if (disableAutomaticReconnection) if (disableAutomaticReconnection)
{ {
@ -54,64 +58,66 @@ namespace ix
{ {
_webSocket.setUrl(_url); _webSocket.setUrl(_url);
if (_disablePerMessageDeflate)
{
_webSocket.disablePerMessageDeflate();
}
else
{
ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions( ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(
true, false, false, 15, 15); true, false, false, 15, 15);
_webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions); _webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions);
}
std::stringstream ss; std::stringstream ss;
log(std::string("Connecting to url: ") + _url); log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
log("ws_connect: connected"); log("ws_connect: connected");
std::cout << "Uri: " << openInfo.uri << std::endl; std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl; std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; std::cout << it.first << ": " << it.second << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
ss << "ws_connect: connection closed:"; ss << "ws_connect: connection closed:";
ss << " code " << closeInfo.code; ss << " code " << msg->closeInfo.code;
ss << " reason " << closeInfo.reason << std::endl; ss << " reason " << msg->closeInfo.reason << std::endl;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " << wireSize << " bytes" << std::endl; std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
ss << "ws_connect: received message: " ss << "ws_connect: received message: "
<< str; << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
std::cerr << "Received message fragment" << std::endl; std::cerr << "Received message fragment" << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Ping) else if (msg->type == ix::WebSocketMessageType::Ping)
{ {
std::cerr << "Received ping" << std::endl; std::cerr << "Received ping" << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Pong) else if (msg->type == ix::WebSocketMessageType::Pong)
{ {
std::cerr << "Received pong" << std::endl; std::cerr << "Received pong" << std::endl;
} }
@ -127,13 +133,17 @@ namespace ix
void WebSocketConnect::sendMessage(const std::string& text) void WebSocketConnect::sendMessage(const std::string& text)
{ {
_webSocket.send(text); _webSocket.sendText(text);
} }
int ws_connect_main(const std::string& url, bool disableAutomaticReconnection) int ws_connect_main(const std::string& url,
bool disableAutomaticReconnection,
bool disablePerMessageDeflate)
{ {
std::cout << "Type Ctrl-D to exit prompt..." << std::endl; std::cout << "Type Ctrl-D to exit prompt..." << std::endl;
WebSocketConnect webSocketChat(url, disableAutomaticReconnection); WebSocketConnect webSocketChat(url,
disableAutomaticReconnection,
disablePerMessageDeflate);
webSocketChat.start(); webSocketChat.start();
while (true) while (true)

View File

@ -10,56 +10,56 @@
namespace ix namespace ix
{ {
int ws_echo_server_main(int port, const std::string& hostname) int ws_echo_server_main(int port, bool greetings, const std::string& hostname)
{ {
std::cout << "Listening on " << hostname << ":" << port << std::endl; std::cout << "Listening on " << hostname << ":" << port << std::endl;
ix::WebSocketServer server(port, hostname); ix::WebSocketServer server(port, hostname);
server.setOnConnectionCallback( server.setOnConnectionCallback(
[](std::shared_ptr<ix::WebSocket> webSocket, [greetings](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState](ix::WebSocketMessageType messageType, [webSocket, connectionState, greetings](const WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl; std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl; std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; std::cerr << it.first << ": " << it.second << std::endl;
} }
if (greetings)
{
webSocket->sendText("Welcome !");
} }
else if (messageType == ix::WebSocketMessageType::Close) }
else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "Closed connection" std::cerr << "Closed connection"
<< " code " << closeInfo.code << " code " << msg->closeInfo.code
<< " reason " << closeInfo.reason << std::endl; << " reason " << msg->closeInfo.reason << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str(); std::cerr << ss.str();
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " std::cerr << "Received "
<< wireSize << " bytes" << msg->wireSize << " bytes"
<< std::endl; << std::endl;
webSocket->send(str); webSocket->send(msg->str, msg->binary);
} }
} }
); );

View File

@ -54,59 +54,54 @@ namespace ix
log(std::string("Connecting to url: ") + _url); log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::cerr << "Received " << wireSize << " bytes" << std::endl; std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
log("ping_pong: connected"); log("ping_pong: connected");
std::cout << "Uri: " << openInfo.uri << std::endl; std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl; std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; std::cout << it.first << ": " << it.second << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
ss << "ping_pong: disconnected:" ss << "ping_pong: disconnected:"
<< " code " << closeInfo.code << " code " << msg->closeInfo.code
<< " reason " << closeInfo.reason << " reason " << msg->closeInfo.reason
<< str; << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
ss << "ping_pong: received message: " ss << "ping_pong: received message: "
<< str; << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Ping) else if (msg->type == ix::WebSocketMessageType::Ping)
{ {
ss << "ping_pong: received ping message: " ss << "ping_pong: received ping message: "
<< str; << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Pong) else if (msg->type == ix::WebSocketMessageType::Pong)
{ {
ss << "ping_pong: received pong message: " ss << "ping_pong: received pong message: "
<< str; << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str()); log(ss.str());
} }
else else

View File

@ -183,41 +183,36 @@ namespace ix
log(std::string("Connecting to url: ") + _url); log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
_condition.notify_one(); _condition.notify_one();
log("ws_receive: connected"); log("ws_receive: connected");
std::cout << "Uri: " << openInfo.uri << std::endl; std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl; std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; std::cout << it.first << ": " << it.second << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
ss << "ws_receive: connection closed:"; ss << "ws_receive: connection closed:";
ss << " code " << closeInfo.code; ss << " code " << msg->closeInfo.code;
ss << " reason " << closeInfo.reason << std::endl; ss << " reason " << msg->closeInfo.reason << std::endl;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
ss << "ws_receive: transfered " << wireSize << " bytes"; ss << "ws_receive: transfered " << msg->wireSize << " bytes";
log(ss.str()); log(ss.str());
handleMessage(str); handleMessage(msg->str);
_condition.notify_one(); _condition.notify_one();
} }
else if (messageType == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
ss << "ws_receive: received fragment " << _receivedFragmentCounter++; ss << "ws_receive: received fragment " << _receivedFragmentCounter++;
log(ss.str()); log(ss.str());
@ -229,13 +224,13 @@ namespace ix
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
} }
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "ws_receive "; ss << "ws_receive ";
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str()); log(ss.str());
} }
else else

View File

@ -112,42 +112,37 @@ namespace ix
log(std::string("Connecting to url: ") + _url); log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
_condition.notify_one(); _condition.notify_one();
log("ws_send: connected"); log("ws_send: connected");
std::cout << "Uri: " << openInfo.uri << std::endl; std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl; std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; std::cout << it.first << ": " << it.second << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
ss << "ws_send: connection closed:"; ss << "ws_send: connection closed:";
ss << " code " << closeInfo.code; ss << " code " << msg->closeInfo.code;
ss << " reason " << closeInfo.reason << std::endl; ss << " reason " << msg->closeInfo.reason << std::endl;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
_condition.notify_one(); _condition.notify_one();
ss << "ws_send: received message (" << wireSize << " bytes)"; ss << "ws_send: received message (" << msg->wireSize << " bytes)";
log(ss.str()); log(ss.str());
std::string errMsg; std::string errMsg;
MsgPack data = MsgPack::parse(str, errMsg); MsgPack data = MsgPack::parse(msg->str, errMsg);
if (!errMsg.empty()) if (!errMsg.empty())
{ {
std::cerr << "Invalid MsgPack response" << std::endl; std::cerr << "Invalid MsgPack response" << std::endl;
@ -160,13 +155,13 @@ namespace ix
std::cerr << "Invalid id" << std::endl; std::cerr << "Invalid id" << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "ws_send "; ss << "ws_send ";
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str()); log(ss.str());
} }
else else
@ -244,7 +239,7 @@ namespace ix
MsgPack msg(pdu); MsgPack msg(pdu);
Bench bench("Sending file through websocket"); Bench bench("Sending file through websocket");
_webSocket.send(msg.dump(), _webSocket.sendBinary(msg.dump(),
[throttle](int current, int total) -> bool [throttle](int current, int total) -> bool
{ {
std::cout << "ws_send: Step " << current << " out of " << total << std::endl; std::cout << "ws_send: Step " << current << " out of " << total << std::endl;

View File

@ -21,52 +21,48 @@ namespace ix
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server](const WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl; std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl; std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; std::cerr << it.first << ": " << it.second << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "Closed connection" std::cerr << "Closed connection"
<< " code " << closeInfo.code << " code " << msg->closeInfo.code
<< " reason " << closeInfo.reason << std::endl; << " reason " << msg->closeInfo.reason << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str(); std::cerr << ss.str();
} }
else if (messageType == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
std::cerr << "Received message fragment " std::cerr << "Received message fragment "
<< std::endl; << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " << wireSize << " bytes" << std::endl; std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client != webSocket)
{ {
client->send(str, client->send(msg->str,
msg->binary,
[](int current, int total) -> bool [](int current, int total) -> bool
{ {
std::cerr << "ws_transfer: Step " << current std::cerr << "ws_transfer: Step " << current