Compare commits

...

17 Commits

Author SHA1 Message Date
1af96ed4e4 try to build vcpkg on travis 2019-06-10 13:35:47 -07:00
7cb5cc05e4 Add -DUSE_VENDORED_THIRD_PARTY=1 to build ws 2019-06-10 13:26:41 -07:00
750a752ac0 - mbedtls and zlib are searched with find_package, and we use the vendored version if nothing is found 2019-06-10 11:18:27 -07:00
61e5f52286 - travis CI uses g++ on Linux 2019-06-09 14:27:45 -07:00
ce0b716f54 compile error in IXWebSocketMessageQTest 2019-06-09 12:25:36 -07:00
aae8e5ec65 fix IXWebSocketMessageQTest.cpp 2019-06-09 12:08:00 -07:00
2723e8466e fix changelog 2019-06-09 12:02:38 -07:00
f13c610352 update README to reflect the new API 2019-06-09 12:02:02 -07:00
55c65b08bf - WebSocket::send() sends message in TEXT mode by default
- WebSocketMessage sets a new binary field, which tells whether the received incoming message is binary or text
2019-06-09 11:56:47 -07:00
a11aa3e0dd WebSocket::send takes a third arg, binary which default to true (can be text too) 2019-06-09 11:35:31 -07:00
de0bf5ebcd WebSocket callback only take one object, a const ix::WebSocketMessagePtr& msg 2019-06-09 11:33:17 -07:00
15369e1ae9 ... 2019-06-09 10:22:27 -07:00
d4115880b9 Add explicite WebSocket::sendBinary
New headers + WebSocketMessage class to hold message data, still not used across the board
2019-06-09 10:10:33 -07:00
3c80c75e4a Add test/compatibility folder with small servers and clients written in different languages and different libraries to test compatibility. 2019-06-08 09:46:26 -07:00
5cb72dce4c ws echo_server has a -g option to print a greeting message on connect 2019-06-08 09:16:33 -07:00
d2747487e3 IXSocketMbedTLS: better error handling in close and connect 2019-06-06 14:59:22 -07:00
12e664fc61 add a changelog 2019-06-06 13:59:12 -07:00
39 changed files with 715 additions and 611 deletions

View File

@ -12,23 +12,23 @@ matrix:
- python test/run.py - python test/run.py
- make ws - make ws
# # Linux # Linux
# - os: linux
# dist: xenial
# script:
# - python test/run.py
# - make ws
# env:
# - CC=gcc
# - CXX=g++
# Clang + Linux disabled for now
- os: linux - os: linux
dist: xenial dist: xenial
script: python test/run.py script:
- python test/run.py
- make ws
env: env:
- CC=clang - CC=gcc
- CXX=clang++ - CXX=g++
# Clang + Linux disabled for now
# - os: linux
# dist: xenial
# script: python test/run.py
# env:
# - CC=clang
# - CXX=clang++
# Windows # Windows
- os: windows - os: windows
@ -36,6 +36,36 @@ matrix:
- CMAKE_PATH="/c/Program Files/CMake/bin" - CMAKE_PATH="/c/Program Files/CMake/bin"
script: script:
- export PATH=$CMAKE_PATH:$PATH - export PATH=$CMAKE_PATH:$PATH
- cmake -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 . - cmake -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 -DUSE_VENDORED_THIRD_PARTY=1 .
- cmake --build --parallel . - cmake --build --parallel .
- python test/run.py - python test/run.py
install:
# HACK: gcc 8.0.1 is missing movdirintrin.h so just download it. We need this for GLM and Vectrexy to build.
- sudo wget https://raw.githubusercontent.com/gcc-mirror/gcc/gcc-8-branch/gcc/config/i386/movdirintrin.h -P /usr/lib/gcc/x86_64-linux-gnu/8/include/
# Create deps dir
- mkdir -p ${DEPS_DIR}
# Set compiler vars
- export CC=${CC_COMPILER}
- export CXX=${CXX_COMPILER}
# Install vcpkg and dependencies
- |
set -e
mkdir -p ${DEPS_DIR}/vcpkg
pushd ${DEPS_DIR}/vcpkg
git init
git remote add origin https://github.com/Microsoft/vcpkg.git
git fetch origin master
git checkout -b master origin/master
./bootstrap-vcpkg.sh
# Only build release libs to save time. We inject a new line first since some cmake files don't end with one.
echo -e '\nset(VCPKG_BUILD_TYPE release)' >> ./triplets/${VCPKG_TRIPLET}.cmake
./vcpkg install sdl2 sdl2-net glew glm stb imgui
popd
cache:
directories:
- ${DEPS_DIR}/vcpkg/installed

31
CHANGELOG.md Normal file
View File

@ -0,0 +1,31 @@
# Changelog
All notable changes to this project will be documented in this file.
## [unreleased] - 2019-06-09
### Changed
- mbedtls and zlib are searched with find_package, and we use the vendored version if nothing is found
- travis CI uses g++ on Linux
## [4.0.0] - 2019-06-09
### Changed
- WebSocket::send() sends message in TEXT mode by default
- WebSocketMessage sets a new binary field, which tells whether the received incoming message is binary or text
- WebSocket::send takes a third arg, binary which default to true (can be text too)
- WebSocket callback only take one object, a const ix::WebSocketMessagePtr& msg
- Add explicit WebSocket::sendBinary method
- New headers + WebSocketMessage class to hold message data, still not used across the board
- Add test/compatibility folder with small servers and clients written in different languages and different libraries to test compatibility.
- ws echo_server has a -g option to print a greeting message on connect
- IXSocketMbedTLS: better error handling in close and connect
## [3.1.2] - 2019-06-06
### Added
- ws connect has a -x option to disable per message deflate
- Add WebSocket::disablePerMessageDeflate() option.
## [3.0.0] - 2019-06-xx
### Changed
- TLS, aka SSL works on Windows (websocket and http clients)
- ws command line tool build on Windows
- Async API for HttpClient
- HttpClient API changed to use shared_ptr for response and request

13
CMake/FindMbedTLS.cmake Normal file
View File

@ -0,0 +1,13 @@
find_path(MBEDTLS_INCLUDE_DIRS mbedtls/ssl.h)
find_library(MBEDTLS_LIBRARY mbedtls)
find_library(MBEDX509_LIBRARY mbedx509)
find_library(MBEDCRYPTO_LIBRARY mbedcrypto)
set(MBEDTLS_LIBRARIES "${MBEDTLS_LIBRARY}" "${MBEDX509_LIBRARY}" "${MBEDCRYPTO_LIBRARY}")
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(MBEDTLS DEFAULT_MSG
MBEDTLS_INCLUDE_DIRS MBEDTLS_LIBRARY MBEDX509_LIBRARY MBEDCRYPTO_LIBRARY)
mark_as_advanced(MBEDTLS_INCLUDE_DIRS MBEDTLS_LIBRARY MBEDX509_LIBRARY MBEDCRYPTO_LIBRARY)

View File

@ -4,6 +4,8 @@
# #
cmake_minimum_required(VERSION 3.4.1) cmake_minimum_required(VERSION 3.4.1)
set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/CMake;${CMAKE_MODULE_PATH}")
project(ixwebsocket C CXX) project(ixwebsocket C CXX)
set (CMAKE_CXX_STANDARD 14) set (CMAKE_CXX_STANDARD 14)
@ -20,60 +22,64 @@ if ("${CMAKE_CXX_COMPILER_ID}" MATCHES "Clang")
endif() endif()
set( IXWEBSOCKET_SOURCES set( IXWEBSOCKET_SOURCES
ixwebsocket/IXCancellationRequest.cpp
ixwebsocket/IXConnectionState.cpp
ixwebsocket/IXDNSLookup.cpp
ixwebsocket/IXHttpClient.cpp
ixwebsocket/IXNetSystem.cpp
ixwebsocket/IXSelectInterrupt.cpp
ixwebsocket/IXSelectInterruptFactory.cpp
ixwebsocket/IXSocket.cpp ixwebsocket/IXSocket.cpp
ixwebsocket/IXSocketServer.cpp
ixwebsocket/IXSocketConnect.cpp ixwebsocket/IXSocketConnect.cpp
ixwebsocket/IXSocketFactory.cpp ixwebsocket/IXSocketFactory.cpp
ixwebsocket/IXDNSLookup.cpp ixwebsocket/IXSocketServer.cpp
ixwebsocket/IXCancellationRequest.cpp ixwebsocket/IXUrlParser.cpp
ixwebsocket/IXNetSystem.cpp
ixwebsocket/IXWebSocket.cpp ixwebsocket/IXWebSocket.cpp
ixwebsocket/IXWebSocketServer.cpp ixwebsocket/IXWebSocketCloseConstants.cpp
ixwebsocket/IXWebSocketTransport.cpp
ixwebsocket/IXWebSocketHandshake.cpp ixwebsocket/IXWebSocketHandshake.cpp
ixwebsocket/IXWebSocketHttpHeaders.cpp
ixwebsocket/IXWebSocketMessageQueue.cpp
ixwebsocket/IXWebSocketPerMessageDeflate.cpp ixwebsocket/IXWebSocketPerMessageDeflate.cpp
ixwebsocket/IXWebSocketPerMessageDeflateCodec.cpp ixwebsocket/IXWebSocketPerMessageDeflateCodec.cpp
ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp
ixwebsocket/IXWebSocketHttpHeaders.cpp ixwebsocket/IXWebSocketServer.cpp
ixwebsocket/IXHttpClient.cpp ixwebsocket/IXWebSocketTransport.cpp
ixwebsocket/IXUrlParser.cpp
ixwebsocket/LUrlParser.cpp ixwebsocket/LUrlParser.cpp
ixwebsocket/IXSelectInterrupt.cpp
ixwebsocket/IXSelectInterruptFactory.cpp
ixwebsocket/IXConnectionState.cpp
ixwebsocket/IXWebSocketCloseConstants.cpp
ixwebsocket/IXWebSocketMessageQueue.cpp
) )
set( IXWEBSOCKET_HEADERS set( IXWEBSOCKET_HEADERS
ixwebsocket/IXSocket.h
ixwebsocket/IXSocketServer.h
ixwebsocket/IXSocketConnect.h
ixwebsocket/IXSocketFactory.h
ixwebsocket/IXSetThreadName.h
ixwebsocket/IXDNSLookup.h
ixwebsocket/IXCancellationRequest.h ixwebsocket/IXCancellationRequest.h
ixwebsocket/IXConnectionState.h
ixwebsocket/IXDNSLookup.h
ixwebsocket/IXHttpClient.h
ixwebsocket/IXNetSystem.h ixwebsocket/IXNetSystem.h
ixwebsocket/IXProgressCallback.h ixwebsocket/IXProgressCallback.h
ixwebsocket/IXSelectInterrupt.h
ixwebsocket/IXSelectInterruptFactory.h
ixwebsocket/IXSetThreadName.h
ixwebsocket/IXSocket.h
ixwebsocket/IXSocketConnect.h
ixwebsocket/IXSocketFactory.h
ixwebsocket/IXSocketServer.h
ixwebsocket/IXUrlParser.h
ixwebsocket/IXWebSocket.h ixwebsocket/IXWebSocket.h
ixwebsocket/IXWebSocketServer.h ixwebsocket/IXWebSocketCloseConstants.h
ixwebsocket/IXWebSocketTransport.h ixwebsocket/IXWebSocketCloseInfo.h
ixwebsocket/IXWebSocketHandshake.h
ixwebsocket/IXWebSocketSendInfo.h
ixwebsocket/IXWebSocketErrorInfo.h ixwebsocket/IXWebSocketErrorInfo.h
ixwebsocket/IXWebSocketHandshake.h
ixwebsocket/IXWebSocketHttpHeaders.h
ixwebsocket/IXWebSocketMessage.h
ixwebsocket/IXWebSocketMessageQueue.h
ixwebsocket/IXWebSocketMessageType.h
ixwebsocket/IXWebSocketOpenInfo.h
ixwebsocket/IXWebSocketPerMessageDeflate.h ixwebsocket/IXWebSocketPerMessageDeflate.h
ixwebsocket/IXWebSocketPerMessageDeflateCodec.h ixwebsocket/IXWebSocketPerMessageDeflateCodec.h
ixwebsocket/IXWebSocketPerMessageDeflateOptions.h ixwebsocket/IXWebSocketPerMessageDeflateOptions.h
ixwebsocket/IXWebSocketHttpHeaders.h ixwebsocket/IXWebSocketSendInfo.h
ixwebsocket/libwshandshake.hpp ixwebsocket/IXWebSocketServer.h
ixwebsocket/IXHttpClient.h ixwebsocket/IXWebSocketTransport.h
ixwebsocket/IXUrlParser.h
ixwebsocket/LUrlParser.h ixwebsocket/LUrlParser.h
ixwebsocket/IXSelectInterrupt.h ixwebsocket/libwshandshake.hpp
ixwebsocket/IXSelectInterruptFactory.h
ixwebsocket/IXConnectionState.h
ixwebsocket/IXWebSocketCloseConstants.h
ixwebsocket/IXWebSocketMessageQueue.h
) )
if (UNIX) if (UNIX)
@ -137,25 +143,28 @@ if (USE_OPEN_SSL)
endif() endif()
if (USE_MBED_TLS) if (USE_MBED_TLS)
set (ENABLE_PROGRAMS OFF) if (USE_VENDORED_THIRD_PARTY)
add_subdirectory(third_party/mbedtls) set (ENABLE_PROGRAMS OFF)
include_directories(third_party/mbedtls/include) add_subdirectory(third_party/mbedtls)
include_directories(third_party/mbedtls/include)
target_link_libraries(ixwebsocket mbedtls) target_link_libraries(ixwebsocket mbedtls)
else()
find_package(MbedTLS REQUIRED)
include_directories(${MBEDTLS_INCLUDE_DIRS})
target_link_libraries(ixwebsocket ${MBEDTLS_LIBRARIES})
endif()
endif() endif()
if (WIN32) find_package(ZLIB REQUIRED)
if (ZLIB_FOUND)
include_directories(${ZLIB_INCLUDE_DIRS})
target_link_libraries(ixwebsocket ${ZLIB_LIBRARIES})
else()
add_subdirectory(third_party/zlib) add_subdirectory(third_party/zlib)
include_directories(third_party/zlib ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib) include_directories(third_party/zlib ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib)
target_link_libraries(ixwebsocket zlibstatic wsock32 ws2_32) target_link_libraries(ixwebsocket zlibstatic wsock32 ws2_32)
add_definitions(-D_CRT_SECURE_NO_WARNINGS) add_definitions(-D_CRT_SECURE_NO_WARNINGS)
else()
# gcc/Linux needs -pthread
find_package(Threads)
target_link_libraries(ixwebsocket
z ${CMAKE_THREAD_LIBS_INIT})
endif() endif()
set( IXWEBSOCKET_INCLUDE_DIRS set( IXWEBSOCKET_INCLUDE_DIRS
@ -167,7 +176,7 @@ if (CMAKE_CXX_COMPILER_ID MATCHES "MSVC")
target_compile_options(ixwebsocket PRIVATE /MP) target_compile_options(ixwebsocket PRIVATE /MP)
endif() endif()
target_include_directories( ixwebsocket PUBLIC ${IXWEBSOCKET_INCLUDE_DIRS} ) target_include_directories(ixwebsocket PUBLIC ${IXWEBSOCKET_INCLUDE_DIRS})
set_target_properties(ixwebsocket PROPERTIES PUBLIC_HEADER "${IXWEBSOCKET_HEADERS}") set_target_properties(ixwebsocket PROPERTIES PUBLIC_HEADER "${IXWEBSOCKET_HEADERS}")

View File

@ -1 +1 @@
3.1.1 4.0.0

View File

@ -33,27 +33,22 @@ webSocket.disablePerMessageDeflate();
// Setup a callback to be fired when a message or an event (open, close, error) is received // Setup a callback to be fired when a message or an event (open, close, error) is received
webSocket.setOnMessageCallback( webSocket.setOnMessageCallback(
[](ix::WebSocketMessageType messageType, [](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Message) if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cout << str << std::endl; std::cout << msg->str << std::endl;
} }
}); });
// Now that our callback is setup, we can start our background thread and receive messages // Now that our callback is setup, we can start our background thread and receive messages
webSocket.start(); webSocket.start();
// Send a message to the server (default to BINARY mode) // Send a message to the server (default to TEXT mode)
webSocket.send("hello world"); webSocket.send("hello world");
// The message can be sent in TEXT mode // The message can be sent in BINARY mode (useful if you send MsgPack data for example)
webSocket.sendText("hello again"); webSocket.sendBinary("some serialized binary data");
// ... finally ... // ... finally ...
@ -73,14 +68,9 @@ server.setOnConnectionCallback(
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server](const ix::WebSocketMessagePtr msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
@ -91,19 +81,21 @@ server.setOnConnectionCallback(
std::cerr << "id: " << connectionState->getId() << std::endl; std::cerr << "id: " << connectionState->getId() << std::endl;
// The uri the client did connect to. // The uri the client did connect to.
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl; std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; std::cerr << it.first << ": " << it.second << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
// For an echo server, we just send back to the client whatever was received by the server // For an echo server, we just send back to the client whatever was received by the server
// All connected clients are available in an std::set. See the broadcast cpp example. // All connected clients are available in an std::set. See the broadcast cpp example.
webSocket->send(str); // Second parameter tells whether we are sending the message in binary or text mode.
// Here we send it in the same mode as it was received.
webSocket->send(msg->str, msg->binary);
} }
} }
); );
@ -334,32 +326,27 @@ The onMessage event will be fired when the connection is opened or closed. This
``` ```
webSocket.setOnMessageCallback( webSocket.setOnMessageCallback(
[](ix::WebSocketMessageType messageType, [](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cout << "send greetings" << std::endl; std::cout << "send greetings" << std::endl;
// Headers can be inspected (pairs of string/string) // Headers can be inspected (pairs of string/string)
std::cout << "Handshake Headers:" << std::endl; std::cout << "Handshake Headers:" << std::endl;
for (auto it : headers) for (auto it : msg->headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; std::cout << it.first << ": " << it.second << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cout << "disconnected" << std::endl; std::cout << "disconnected" << std::endl;
// The server can send an explicit code and reason for closing. // The server can send an explicit code and reason for closing.
// This data can be accessed through the closeInfo object. // This data can be accessed through the closeInfo object.
std::cout << closeInfo.code << std::endl; std::cout << msg->closeInfo.code << std::endl;
std::cout << closeInfo.reason << std::endl; std::cout << msg->closeInfo.reason << std::endl;
} }
} }
); );
@ -371,20 +358,15 @@ A message will be fired when there is an error with the connection. The message
``` ```
webSocket.setOnMessageCallback( webSocket.setOnMessageCallback(
[](ix::WebSocketMessageType messageType, [](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Error) if (msg->type == ix::WebSocketMessageType::Error)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Error: " << error.reason << std::endl; ss << "Error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << event.retries << std::endl; ss << "#retries: " << msg->eventInfo.retries << std::endl;
ss << "Wait time(ms): " << event.wait_time << std::endl; ss << "Wait time(ms): " << msg->eventInfo.wait_time << std::endl;
ss << "HTTP Status: " << event.http_status << std::endl; ss << "HTTP Status: " << msg->eventInfo.http_status << std::endl;
std::cout << ss.str() << std::endl; std::cout << ss.str() << std::endl;
} }
} }
@ -411,17 +393,12 @@ Ping/pong messages are used to implement keep-alive. 2 message types exists to i
``` ```
webSocket.setOnMessageCallback( webSocket.setOnMessageCallback(
[](ix::WebSocketMessageType messageType, [](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Ping || if (msg->type == ix::WebSocketMessageType::Ping ||
messageType == ix::WebSocketMessageType::Pong) msg->type == ix::WebSocketMessageType::Pong)
{ {
std::cout << "pong data: " << str << std::endl; std::cout << "pong data: " << msg->str << std::endl;
} }
} }
); );

View File

@ -24,6 +24,8 @@ namespace ix
bool SocketMbedTLS::init(const std::string& host, std::string& errMsg) bool SocketMbedTLS::init(const std::string& host, std::string& errMsg)
{ {
std::lock_guard<std::mutex> lock(_mutex);
mbedtls_ssl_init(&_ssl); mbedtls_ssl_init(&_ssl);
mbedtls_ssl_config_init(&_conf); mbedtls_ssl_config_init(&_conf);
mbedtls_ctr_drbg_init(&_ctr_drbg); mbedtls_ctr_drbg_init(&_ctr_drbg);
@ -75,15 +77,24 @@ namespace ix
std::string& errMsg, std::string& errMsg,
const CancellationRequest& isCancellationRequested) const CancellationRequest& isCancellationRequested)
{ {
_sockfd = SocketConnect::connect(host, port, errMsg, isCancellationRequested); {
if (_sockfd == -1) return false; std::lock_guard<std::mutex> lock(_mutex);
if (!init(host, errMsg)) return false; _sockfd = SocketConnect::connect(host, port, errMsg, isCancellationRequested);
if (_sockfd == -1) return false;
}
if (!init(host, errMsg))
{
close();
return false;
}
mbedtls_ssl_set_bio(&_ssl, &_sockfd, mbedtls_net_send, mbedtls_net_recv, NULL); mbedtls_ssl_set_bio(&_ssl, &_sockfd, mbedtls_net_send, mbedtls_net_recv, NULL);
int res; int res;
do do
{ {
std::lock_guard<std::mutex> lock(_mutex);
res = mbedtls_ssl_handshake(&_ssl); res = mbedtls_ssl_handshake(&_ssl);
} }
while (res == MBEDTLS_ERR_SSL_WANT_READ || res == MBEDTLS_ERR_SSL_WANT_WRITE); while (res == MBEDTLS_ERR_SSL_WANT_READ || res == MBEDTLS_ERR_SSL_WANT_WRITE);
@ -95,6 +106,8 @@ namespace ix
errMsg = "error in handshake : "; errMsg = "error in handshake : ";
errMsg += buf; errMsg += buf;
close();
return false; return false;
} }
@ -103,10 +116,14 @@ namespace ix
void SocketMbedTLS::close() void SocketMbedTLS::close()
{ {
std::lock_guard<std::mutex> lock(_mutex);
mbedtls_ssl_free(&_ssl); mbedtls_ssl_free(&_ssl);
mbedtls_ssl_config_free(&_conf); mbedtls_ssl_config_free(&_conf);
mbedtls_ctr_drbg_free(&_ctr_drbg); mbedtls_ctr_drbg_free(&_ctr_drbg);
mbedtls_entropy_free(&_entropy); mbedtls_entropy_free(&_entropy);
Socket::close();
} }
ssize_t SocketMbedTLS::send(char* buf, size_t nbyte) ssize_t SocketMbedTLS::send(char* buf, size_t nbyte)

View File

@ -51,9 +51,11 @@ namespace ix
_ws.setOnCloseCallback( _ws.setOnCloseCallback(
[this](uint16_t code, const std::string& reason, size_t wireSize, bool remote) [this](uint16_t code, const std::string& reason, size_t wireSize, bool remote)
{ {
_onMessageCallback(WebSocketMessageType::Close, "", wireSize, _onMessageCallback(
WebSocketErrorInfo(), WebSocketOpenInfo(), std::make_shared<WebSocketMessage>(
WebSocketCloseInfo(code, reason, remote)); WebSocketMessageType::Close, "", wireSize,
WebSocketErrorInfo(), WebSocketOpenInfo(),
WebSocketCloseInfo(code, reason, remote)));
} }
); );
} }
@ -180,10 +182,12 @@ namespace ix
return status; return status;
} }
_onMessageCallback(WebSocketMessageType::Open, "", 0, _onMessageCallback(
WebSocketErrorInfo(), std::make_shared<WebSocketMessage>(
WebSocketOpenInfo(status.uri, status.headers), WebSocketMessageType::Open, "", 0,
WebSocketCloseInfo()); WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers),
WebSocketCloseInfo()));
return status; return status;
} }
@ -203,10 +207,12 @@ namespace ix
return status; return status;
} }
_onMessageCallback(WebSocketMessageType::Open, "", 0, _onMessageCallback(
WebSocketErrorInfo(), std::make_shared<WebSocketMessage>(
WebSocketOpenInfo(status.uri, status.headers), WebSocketMessageType::Open, "", 0,
WebSocketCloseInfo()); WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers),
WebSocketCloseInfo()));
return status; return status;
} }
@ -274,9 +280,11 @@ namespace ix
connectErr.reason = status.errorStr; connectErr.reason = status.errorStr;
connectErr.http_status = status.http_status; connectErr.http_status = status.http_status;
_onMessageCallback(WebSocketMessageType::Error, "", 0, _onMessageCallback(
connectErr, WebSocketOpenInfo(), std::make_shared<WebSocketMessage>(
WebSocketCloseInfo()); WebSocketMessageType::Error, "", 0,
connectErr, WebSocketOpenInfo(),
WebSocketCloseInfo()));
} }
} }
} }
@ -317,8 +325,8 @@ namespace ix
WebSocketMessageType webSocketMessageType; WebSocketMessageType webSocketMessageType;
switch (messageKind) switch (messageKind)
{ {
default: case WebSocketTransport::MessageKind::MSG_TEXT:
case WebSocketTransport::MessageKind::MSG: case WebSocketTransport::MessageKind::MSG_BINARY:
{ {
webSocketMessageType = WebSocketMessageType::Message; webSocketMessageType = WebSocketMessageType::Message;
} break; } break;
@ -342,9 +350,13 @@ namespace ix
WebSocketErrorInfo webSocketErrorInfo; WebSocketErrorInfo webSocketErrorInfo;
webSocketErrorInfo.decompressionError = decompressionError; webSocketErrorInfo.decompressionError = decompressionError;
_onMessageCallback(webSocketMessageType, msg, wireSize, bool binary = messageKind == WebSocketTransport::MessageKind::MSG_BINARY;
webSocketErrorInfo, WebSocketOpenInfo(),
WebSocketCloseInfo()); _onMessageCallback(
std::make_shared<WebSocketMessage>(
webSocketMessageType, msg, wireSize,
webSocketErrorInfo, WebSocketOpenInfo(),
WebSocketCloseInfo(), binary));
WebSocket::invokeTrafficTrackerCallback(msg.size(), true); WebSocket::invokeTrafficTrackerCallback(msg.size(), true);
}); });
@ -375,9 +387,18 @@ namespace ix
} }
WebSocketSendInfo WebSocket::send(const std::string& data, WebSocketSendInfo WebSocket::send(const std::string& data,
bool binary,
const OnProgressCallback& onProgressCallback) const OnProgressCallback& onProgressCallback)
{ {
return sendMessage(data, SendMessageKind::Binary, onProgressCallback); return sendMessage(data,
(binary) ? SendMessageKind::Binary: SendMessageKind::Text,
onProgressCallback);
}
WebSocketSendInfo WebSocket::sendBinary(const std::string& text,
const OnProgressCallback& onProgressCallback)
{
return sendMessage(text, SendMessageKind::Binary, onProgressCallback);
} }
WebSocketSendInfo WebSocket::sendText(const std::string& text, WebSocketSendInfo WebSocket::sendText(const std::string& text,

View File

@ -13,6 +13,7 @@
#include "IXWebSocketCloseConstants.h" #include "IXWebSocketCloseConstants.h"
#include "IXWebSocketErrorInfo.h" #include "IXWebSocketErrorInfo.h"
#include "IXWebSocketHttpHeaders.h" #include "IXWebSocketHttpHeaders.h"
#include "IXWebSocketMessage.h"
#include "IXWebSocketPerMessageDeflateOptions.h" #include "IXWebSocketPerMessageDeflateOptions.h"
#include "IXWebSocketSendInfo.h" #include "IXWebSocketSendInfo.h"
#include "IXWebSocketTransport.h" #include "IXWebSocketTransport.h"
@ -32,52 +33,7 @@ namespace ix
Closed = 3 Closed = 3
}; };
enum class WebSocketMessageType using OnMessageCallback = std::function<void(const WebSocketMessagePtr&)>;
{
Message = 0,
Open = 1,
Close = 2,
Error = 3,
Ping = 4,
Pong = 5,
Fragment = 6
};
struct WebSocketOpenInfo
{
std::string uri;
WebSocketHttpHeaders headers;
WebSocketOpenInfo(const std::string& u = std::string(),
const WebSocketHttpHeaders& h = WebSocketHttpHeaders())
: uri(u)
, headers(h)
{
;
}
};
struct WebSocketCloseInfo
{
uint16_t code;
std::string reason;
bool remote;
WebSocketCloseInfo(uint16_t c = 0, const std::string& r = std::string(), bool rem = false)
: code(c)
, reason(r)
, remote(rem)
{
;
}
};
using OnMessageCallback = std::function<void(WebSocketMessageType,
const std::string&,
size_t wireSize,
const WebSocketErrorInfo&,
const WebSocketOpenInfo&,
const WebSocketCloseInfo&)>;
using OnTrafficTrackerCallback = std::function<void(size_t size, bool incoming)>; using OnTrafficTrackerCallback = std::function<void(size_t size, bool incoming)>;
@ -108,14 +64,18 @@ namespace ix
WebSocketInitResult connect(int timeoutSecs); WebSocketInitResult connect(int timeoutSecs);
void run(); void run();
// send binary data // send is in binary mode by default
WebSocketSendInfo send(const std::string& data, WebSocketSendInfo send(const std::string& data,
bool binary = false,
const OnProgressCallback& onProgressCallback = nullptr); const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo sendBinary(const std::string& text,
const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo sendText(const std::string& text, WebSocketSendInfo sendText(const std::string& text,
const OnProgressCallback& onProgressCallback = nullptr); const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo ping(const std::string& text); WebSocketSendInfo ping(const std::string& text);
void close(uint16_t code = 1000, const std::string& reason = "Normal closure"); void close(uint16_t code = WebSocketCloseConstants::kNormalClosureCode,
const std::string& reason = WebSocketCloseConstants::kNormalClosureMessage);
void setOnMessageCallback(const OnMessageCallback& callback); void setOnMessageCallback(const OnMessageCallback& callback);
static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback); static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback);
@ -169,7 +129,7 @@ namespace ix
bool _enablePong; bool _enablePong;
static const bool kDefaultEnablePong; static const bool kDefaultEnablePong;
// Optional ping and ping timeout // Optional ping and pong timeout
int _pingIntervalSecs; int _pingIntervalSecs;
int _pingTimeoutSecs; int _pingTimeoutSecs;
static const int kDefaultPingIntervalSecs; static const int kDefaultPingIntervalSecs;

View File

@ -0,0 +1,25 @@
/*
* IXWebSocketCloseInfo.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
namespace ix
{
struct WebSocketCloseInfo
{
uint16_t code;
std::string reason;
bool remote;
WebSocketCloseInfo(uint16_t c = 0, const std::string& r = std::string(), bool rem = false)
: code(c)
, reason(r)
, remote(rem)
{
;
}
};
} // namespace ix

View File

@ -0,0 +1,49 @@
/*
* IXWebSocketMessage.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include "IXWebSocketCloseInfo.h"
#include "IXWebSocketErrorInfo.h"
#include "IXWebSocketMessageType.h"
#include "IXWebSocketOpenInfo.h"
#include <memory>
#include <string>
#include <thread>
namespace ix
{
struct WebSocketMessage
{
WebSocketMessageType type;
std::string str;
size_t wireSize;
WebSocketErrorInfo errorInfo;
WebSocketOpenInfo openInfo;
WebSocketCloseInfo closeInfo;
bool binary;
WebSocketMessage(WebSocketMessageType t,
const std::string& s,
size_t w,
WebSocketErrorInfo e,
WebSocketOpenInfo o,
WebSocketCloseInfo c,
bool b = false)
: type(t)
, str(std::move(s))
, wireSize(w)
, errorInfo(e)
, openInfo(o)
, closeInfo(c)
, binary(b)
{
;
}
};
using WebSocketMessagePtr = std::shared_ptr<WebSocketMessage>;
} // namespace ix

View File

@ -32,14 +32,7 @@ namespace ix
if (_websocket) if (_websocket)
{ {
// set dummy callback just to avoid crash // set dummy callback just to avoid crash
_websocket->setOnMessageCallback([]( _websocket->setOnMessageCallback([](const WebSocketMessagePtr&) {});
WebSocketMessageType,
const std::string&,
size_t,
const WebSocketErrorInfo&,
const WebSocketOpenInfo&,
const WebSocketCloseInfo&)
{});
} }
_websocket = websocket; _websocket = websocket;
@ -47,27 +40,10 @@ namespace ix
// bind new // bind new
if (_websocket) if (_websocket)
{ {
_websocket->setOnMessageCallback([this]( _websocket->setOnMessageCallback([this](const WebSocketMessagePtr& msg)
WebSocketMessageType type,
const std::string& str,
size_t wireSize,
const WebSocketErrorInfo& errorInfo,
const WebSocketOpenInfo& openInfo,
const WebSocketCloseInfo& closeInfo)
{ {
MessagePtr message(new Message()); std::lock_guard<std::mutex> lock(_messagesMutex);
_messages.emplace_back(std::move(msg));
message->type = type;
message->str = str;
message->wireSize = wireSize;
message->errorInfo = errorInfo;
message->openInfo = openInfo;
message->closeInfo = closeInfo;
{
std::lock_guard<std::mutex> lock(_messagesMutex);
_messages.emplace_back(std::move(message));
}
}); });
} }
} }
@ -82,9 +58,9 @@ namespace ix
_onMessageUserCallback = std::move(callback); _onMessageUserCallback = std::move(callback);
} }
WebSocketMessageQueue::MessagePtr WebSocketMessageQueue::popMessage() WebSocketMessagePtr WebSocketMessageQueue::popMessage()
{ {
MessagePtr message; WebSocketMessagePtr message;
std::lock_guard<std::mutex> lock(_messagesMutex); std::lock_guard<std::mutex> lock(_messagesMutex);
if (!_messages.empty()) if (!_messages.empty())
@ -101,19 +77,11 @@ namespace ix
if (!_onMessageUserCallback) if (!_onMessageUserCallback)
return; return;
MessagePtr message; WebSocketMessagePtr message;
while (count > 0 && (message = popMessage())) while (count > 0 && (message = popMessage()))
{ {
_onMessageUserCallback( _onMessageUserCallback(message);
message->type,
message->str,
message->wireSize,
message->errorInfo,
message->openInfo,
message->closeInfo
);
--count; --count;
} }
} }

View File

@ -30,24 +30,12 @@ namespace ix
void poll(int count = 512); void poll(int count = 512);
protected: protected:
struct Message WebSocketMessagePtr popMessage();
{
WebSocketMessageType type;
std::string str;
size_t wireSize;
WebSocketErrorInfo errorInfo;
WebSocketOpenInfo openInfo;
WebSocketCloseInfo closeInfo;
};
using MessagePtr = std::shared_ptr<Message>;
MessagePtr popMessage();
private: private:
WebSocket* _websocket = nullptr; WebSocket* _websocket = nullptr;
OnMessageCallback _onMessageUserCallback; OnMessageCallback _onMessageUserCallback;
std::mutex _messagesMutex; std::mutex _messagesMutex;
std::list<MessagePtr> _messages; std::list<WebSocketMessagePtr> _messages;
}; };
} // namespace ix } // namespace ix

View File

@ -0,0 +1,21 @@
/*
* IXWebSocketMessageType.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
namespace ix
{
enum class WebSocketMessageType
{
Message = 0,
Open = 1,
Close = 2,
Error = 3,
Ping = 4,
Pong = 5,
Fragment = 6
};
}

View File

@ -0,0 +1,24 @@
/*
* IXWebSocketOpenInfo.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
namespace ix
{
struct WebSocketOpenInfo
{
std::string uri;
WebSocketHttpHeaders headers;
WebSocketOpenInfo(const std::string& u = std::string(),
const WebSocketHttpHeaders& h = WebSocketHttpHeaders())
: uri(u)
, headers(h)
{
;
}
};
} // namespace ix

View File

@ -542,12 +542,17 @@ namespace ix
) { ) {
unmaskReceiveBuffer(ws); unmaskReceiveBuffer(ws);
MessageKind messageKind =
(ws.opcode == wsheader_type::TEXT_FRAME)
? MessageKind::MSG_TEXT
: MessageKind::MSG_BINARY;
// //
// Usual case. Small unfragmented messages // Usual case. Small unfragmented messages
// //
if (ws.fin && _chunks.empty()) if (ws.fin && _chunks.empty())
{ {
emitMessage(MessageKind::MSG, emitMessage(messageKind,
std::string(_rxbuf.begin()+ws.header_size, std::string(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t) ws.N), _rxbuf.begin()+ws.header_size+(size_t) ws.N),
ws, ws,
@ -567,7 +572,7 @@ namespace ix
_rxbuf.begin()+ws.header_size+(size_t)ws.N)); _rxbuf.begin()+ws.header_size+(size_t)ws.N));
if (ws.fin) if (ws.fin)
{ {
emitMessage(MessageKind::MSG, getMergedChunks(), ws, onMessageCallback); emitMessage(messageKind, getMergedChunks(), ws, onMessageCallback);
_chunks.clear(); _chunks.clear();
} }
else else

View File

@ -50,7 +50,8 @@ namespace ix
enum class MessageKind enum class MessageKind
{ {
MSG, MSG_TEXT,
MSG_BINARY,
PING, PING,
PONG, PONG,
FRAGMENT FRAGMENT

View File

@ -12,7 +12,7 @@ brew:
mkdir -p build && (cd build ; cmake -DUSE_TLS=1 -DUSE_WS=1 .. ; make -j install) mkdir -p build && (cd build ; cmake -DUSE_TLS=1 -DUSE_WS=1 .. ; make -j install)
ws: ws:
mkdir -p build && (cd build ; cmake -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .. ; make -j) mkdir -p build && (cd build ; cmake -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 -DUSE_VENDORED_THIRD_PARTY=1 .. ; make -j)
uninstall: uninstall:
xargs rm -fv < build/install_manifest.txt xargs rm -fv < build/install_manifest.txt

View File

@ -178,34 +178,29 @@ namespace ix
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
Logger() << "New connection"; Logger() << "New connection";
Logger() << "Uri: " << openInfo.uri; Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:"; Logger() << "Headers:";
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
Logger() << it.first << ": " << it.second; Logger() << it.first << ": " << it.second;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
Logger() << "Closed connection"; Logger() << "Closed connection";
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client != webSocket)
{ {
client->send(str); client->send(msg->str, msg->binary);
} }
} }
} }

View File

@ -108,52 +108,47 @@ namespace
log(std::string("Connecting to url: ") + url); log(std::string("Connecting to url: ") + url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
log("client connected"); log("client connected");
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::stringstream ss; std::stringstream ss;
ss << "client disconnected(" ss << "client disconnected("
<< closeInfo.code << msg->closeInfo.code
<< "," << ","
<< closeInfo.reason << msg->closeInfo.reason
<< ")"; << ")";
log(ss.str()); log(ss.str());
std::lock_guard<std::mutex> lck(_mutexCloseData); std::lock_guard<std::mutex> lck(_mutexCloseData);
_closeCode = closeInfo.code; _closeCode = msg->closeInfo.code;
_closeReason = std::string(closeInfo.reason); _closeReason = std::string(msg->closeInfo.reason);
_closeRemote = closeInfo.remote; _closeRemote = msg->closeInfo.remote;
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "Error ! " << error.reason; ss << "Error ! " << msg->errorInfo.reason;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Pong) else if (msg->type == ix::WebSocketMessageType::Pong)
{ {
ss << "Received pong message " << str; ss << "Received pong message " << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Ping) else if (msg->type == ix::WebSocketMessageType::Ping)
{ {
ss << "Received ping message " << str; ss << "Received ping message " << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
ss << "Received message " << str; ss << "Received message " << msg->str;
log(ss.str()); log(ss.str());
} }
else else
@ -183,39 +178,34 @@ namespace
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, &server, &receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server, &receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
Logger() << "New server connection"; Logger() << "New server connection";
Logger() << "id: " << connectionState->getId(); Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri; Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:"; Logger() << "Headers:";
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
Logger() << it.first << ": " << it.second; Logger() << it.first << ": " << it.second;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Server closed connection(" ss << "Server closed connection("
<< closeInfo.code << msg->closeInfo.code
<< "," << ","
<< closeInfo.reason << msg->closeInfo.reason
<< ")"; << ")";
log(ss.str()); log(ss.str());
std::lock_guard<std::mutex> lck(mutexWrite); std::lock_guard<std::mutex> lck(mutexWrite);
receivedCloseCode = closeInfo.code; receivedCloseCode = msg->closeInfo.code;
receivedCloseReason = std::string(closeInfo.reason); receivedCloseReason = std::string(msg->closeInfo.reason);
receivedCloseRemote = closeInfo.remote; receivedCloseRemote = msg->closeInfo.remote;
} }
} }
); );

View File

@ -20,39 +20,34 @@ namespace
{ {
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket, [&server](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[connectionState, &server](ix::WebSocketMessageType messageType, [connectionState, &server](const WebSocketMessagePtr& msg)
const std::string & str,
size_t wireSize,
const ix::WebSocketErrorInfo & error,
const ix::WebSocketOpenInfo & openInfo,
const ix::WebSocketCloseInfo & closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
Logger() << "New connection"; Logger() << "New connection";
connectionState->computeId(); connectionState->computeId();
Logger() << "id: " << connectionState->getId(); Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri; Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:"; Logger() << "Headers:";
for (auto it : openInfo.headers) for (auto&& it : msg->openInfo.headers)
{ {
Logger() << it.first << ": " << it.second; Logger() << it.first << ": " << it.second;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
Logger() << "Closed connection"; Logger() << "Closed connection";
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
Logger() << "Message received: " << str; Logger() << "Message received: " << msg->str;
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
client->send(str); client->send(msg->str);
} }
} }
} }
@ -78,46 +73,41 @@ namespace
{ {
msgQ.bindWebsocket(&ws); msgQ.bindWebsocket(&ws);
msgQ.setOnMessageCallback([this](WebSocketMessageType messageType, msgQ.setOnMessageCallback([this](const WebSocketMessagePtr& msg)
const std::string & str,
size_t wireSize,
const WebSocketErrorInfo & error,
const WebSocketOpenInfo & openInfo,
const WebSocketCloseInfo & closeInfo)
{ {
REQUIRE(mainThreadId == std::this_thread::get_id()); REQUIRE(mainThreadId == std::this_thread::get_id());
std::stringstream ss; std::stringstream ss;
if (messageType == WebSocketMessageType::Open) if (msg->type == WebSocketMessageType::Open)
{ {
log("client connected"); log("client connected");
sendNextMessage(); sendNextMessage();
} }
else if (messageType == WebSocketMessageType::Close) else if (msg->type == WebSocketMessageType::Close)
{ {
log("client disconnected"); log("client disconnected");
} }
else if (messageType == WebSocketMessageType::Error) else if (msg->type == WebSocketMessageType::Error)
{ {
ss << "Error ! " << error.reason; ss << "Error ! " << msg->errorInfo.reason;
log(ss.str()); log(ss.str());
testDone = true; testDone = true;
} }
else if (messageType == WebSocketMessageType::Pong) else if (msg->type == WebSocketMessageType::Pong)
{ {
ss << "Received pong message " << str; ss << "Received pong message " << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == WebSocketMessageType::Ping) else if (msg->type == WebSocketMessageType::Ping)
{ {
ss << "Received ping message " << str; ss << "Received ping message " << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == WebSocketMessageType::Message) else if (msg->type == WebSocketMessageType::Message)
{ {
REQUIRE(str.compare("Hey dude!") == 0); REQUIRE(msg->str.compare("Hey dude!") == 0);
++receivedCount; ++receivedCount;
ss << "Received message " << str; ss << "Received message " << msg->str;
log(ss.str()); log(ss.str());
sendNextMessage(); sendNextMessage();
} }
@ -189,5 +179,4 @@ TEST_CASE("Websocket_message_queue", "[websocket_message_q]")
server.stop(); server.stop();
} }
} }

View File

@ -106,7 +106,7 @@ namespace
{ {
log("client disconnected"); log("client disconnected");
if (closeInfo.code == 1011) if (msg->closeInfo.code == 1011)
{ {
_closedDueToPingTimeout = true; _closedDueToPingTimeout = true;
} }

View File

@ -39,42 +39,37 @@ namespace ix
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server, &connectionId](std::shared_ptr<ix::WebSocket> webSocket, [&server, &connectionId](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, [webSocket, connectionState,
&connectionId, &server](ix::WebSocketMessageType messageType, &connectionId, &server](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
Logger() << "New connection"; Logger() << "New connection";
connectionState->computeId(); connectionState->computeId();
Logger() << "id: " << connectionState->getId(); Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri; Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:"; Logger() << "Headers:";
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
Logger() << it.first << ": " << it.second; Logger() << it.first << ": " << it.second;
} }
connectionId = connectionState->getId(); connectionId = connectionState->getId();
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
Logger() << "Closed connection"; Logger() << "Closed connection";
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client != webSocket)
{ {
client->send(str); client->send(msg->str);
} }
} }
} }

View File

@ -52,41 +52,36 @@ namespace
log(std::string("Connecting to url: ") + url); log(std::string("Connecting to url: ") + url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[](ix::WebSocketMessageType messageType, [](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
log("TestConnectionDisconnection: connected !"); log("TestConnectionDisconnection: connected !");
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
log("TestConnectionDisconnection: disconnected !"); log("TestConnectionDisconnection: disconnected !");
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "TestConnectionDisconnection: Error! "; ss << "TestConnectionDisconnection: Error! ";
ss << error.reason; ss << msg->errorInfo.reason;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
log("TestConnectionDisconnection: received message.!"); log("TestConnectionDisconnection: received message.!");
} }
else if (messageType == ix::WebSocketMessageType::Ping) else if (msg->type == ix::WebSocketMessageType::Ping)
{ {
log("TestConnectionDisconnection: received ping message.!"); log("TestConnectionDisconnection: received ping message.!");
} }
else if (messageType == ix::WebSocketMessageType::Pong) else if (msg->type == ix::WebSocketMessageType::Pong)
{ {
log("TestConnectionDisconnection: received pong message.!"); log("TestConnectionDisconnection: received pong message.!");
} }
else if (messageType == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
log("TestConnectionDisconnection: received fragment.!"); log("TestConnectionDisconnection: received fragment.!");
} }

View File

@ -114,31 +114,26 @@ namespace
log(std::string("Connecting to url: ") + url); log(std::string("Connecting to url: ") + url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
ss << "cmd_websocket_chat: user " ss << "cmd_websocket_chat: user "
<< _user << _user
<< " Connected !"; << " Connected !";
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
ss << "cmd_websocket_chat: user " ss << "cmd_websocket_chat: user "
<< _user << _user
<< " disconnected !"; << " disconnected !";
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
auto result = decodeMessage(str); auto result = decodeMessage(msg->str);
// Our "chat" / "broacast" node.js server does not send us // Our "chat" / "broacast" node.js server does not send us
// the messages we send, so we don't need to have a msg_user != user // the messages we send, so we don't need to have a msg_user != user
@ -159,20 +154,20 @@ namespace
<< _user << " > "; << _user << " > ";
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "cmd_websocket_chat: Error ! " << error.reason; ss << "cmd_websocket_chat: Error ! " << msg->errorInfo.reason;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Ping) else if (msg->type == ix::WebSocketMessageType::Ping)
{ {
log("cmd_websocket_chat: received ping message"); log("cmd_websocket_chat: received ping message");
} }
else if (messageType == ix::WebSocketMessageType::Pong) else if (msg->type == ix::WebSocketMessageType::Pong)
{ {
log("cmd_websocket_chat: received pong message"); log("cmd_websocket_chat: received pong message");
} }
else if (messageType == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
log("cmd_websocket_chat: received message fragment"); log("cmd_websocket_chat: received message fragment");
} }
@ -221,35 +216,30 @@ namespace
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
Logger() << "New connection"; Logger() << "New connection";
Logger() << "id: " << connectionState->getId(); Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri; Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:"; Logger() << "Headers:";
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
Logger() << it.first << ": " << it.second; Logger() << it.first << ": " << it.second;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
log("Closed connection"); log("Closed connection");
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client != webSocket)
{ {
client->send(str); client->send(msg->str);
} }
} }
} }

View File

@ -0,0 +1,30 @@
# Clients
## ws
```
$ ws connect ws://127.0.0.1:8765
Type Ctrl-D to exit prompt...
Connecting to url: ws://127.0.0.1:8765
> ws_connect: connected
Uri: /
Handshake Headers:
Connection: Upgrade
Date: Sat, 08 Jun 2019 16:43:29 GMT
Sec-WebSocket-Accept: kPCNwGa97y+7NWdAvHi/7/rA8AE=
Sec-WebSocket-Extensions: permessage-deflate; server_max_window_bits=15; client_max_window_bits=15
Server: Python/3.7 websockets/7.0
Upgrade: websocket
Received 13 bytes
ws_connect: received message: > Welcome !
ws_connect: connection closed: code 1006 reason Abnormal closure
```
## wscat
```
$ ./node_modules/.bin/wscat -c ws://127.0.0.1:8765
connected (press CTRL+C to quit)
< > Welcome !
disconnected (code: 1006)
```

View File

@ -0,0 +1,22 @@
#!/usr/bin/env python
# WS server example
import asyncio
import websockets
async def hello(websocket, path):
await websocket.send(f"> Welcome !")
name = await websocket.recv()
print(f"< {name}")
greeting = f"Hello {name}!"
await websocket.send(greeting)
print(f"> {greeting}")
start_server = websockets.serve(hello, 'localhost', 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

View File

@ -90,46 +90,41 @@ namespace ix
void CobraConnection::initWebSocketOnMessageCallback() void CobraConnection::initWebSocketOnMessageCallback()
{ {
_webSocket->setOnMessageCallback( _webSocket->setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
CobraConnection::invokeTrafficTrackerCallback(wireSize, true); CobraConnection::invokeTrafficTrackerCallback(msg->wireSize, true);
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
invokeEventCallback(ix::CobraConnection_EventType_Open, invokeEventCallback(ix::CobraConnection_EventType_Open,
std::string(), std::string(),
openInfo.headers); msg->openInfo.headers);
sendHandshakeMessage(); sendHandshakeMessage();
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
_authenticated = false; _authenticated = false;
std::stringstream ss; std::stringstream ss;
ss << "Close code " << closeInfo.code; ss << "Close code " << msg->closeInfo.code;
ss << " reason " << closeInfo.reason; ss << " reason " << msg->closeInfo.reason;
invokeEventCallback(ix::CobraConnection_EventType_Closed, invokeEventCallback(ix::CobraConnection_EventType_Closed,
ss.str()); ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
Json::Value data; Json::Value data;
Json::Reader reader; Json::Reader reader;
if (!reader.parse(str, data)) if (!reader.parse(msg->str, data))
{ {
invokeErrorCallback("Invalid json", str); invokeErrorCallback("Invalid json", msg->str);
return; return;
} }
if (!data.isMember("action")) if (!data.isMember("action"))
{ {
invokeErrorCallback("Missing action", str); invokeErrorCallback("Missing action", msg->str);
return; return;
} }
@ -139,12 +134,12 @@ namespace ix
{ {
if (!handleHandshakeResponse(data)) if (!handleHandshakeResponse(data))
{ {
invokeErrorCallback("Error extracting nonce from handshake response", str); invokeErrorCallback("Error extracting nonce from handshake response", msg->str);
} }
} }
else if (action == "auth/handshake/error") else if (action == "auth/handshake/error")
{ {
invokeErrorCallback("Handshake error", str); invokeErrorCallback("Handshake error", msg->str);
} }
else if (action == "auth/authenticate/ok") else if (action == "auth/authenticate/ok")
{ {
@ -154,7 +149,7 @@ namespace ix
} }
else if (action == "auth/authenticate/error") else if (action == "auth/authenticate/error")
{ {
invokeErrorCallback("Authentication error", str); invokeErrorCallback("Authentication error", msg->str);
} }
else if (action == "rtm/subscription/data") else if (action == "rtm/subscription/data")
{ {
@ -164,36 +159,36 @@ namespace ix
{ {
if (!handleSubscriptionResponse(data)) if (!handleSubscriptionResponse(data))
{ {
invokeErrorCallback("Error processing subscribe response", str); invokeErrorCallback("Error processing subscribe response", msg->str);
} }
} }
else if (action == "rtm/subscribe/error") else if (action == "rtm/subscribe/error")
{ {
invokeErrorCallback("Subscription error", str); invokeErrorCallback("Subscription error", msg->str);
} }
else if (action == "rtm/unsubscribe/ok") else if (action == "rtm/unsubscribe/ok")
{ {
if (!handleUnsubscriptionResponse(data)) if (!handleUnsubscriptionResponse(data))
{ {
invokeErrorCallback("Error processing subscribe response", str); invokeErrorCallback("Error processing subscribe response", msg->str);
} }
} }
else if (action == "rtm/unsubscribe/error") else if (action == "rtm/unsubscribe/error")
{ {
invokeErrorCallback("Unsubscription error", str); invokeErrorCallback("Unsubscription error", msg->str);
} }
else else
{ {
invokeErrorCallback("Un-handled message type", str); invokeErrorCallback("Un-handled message type", msg->str);
} }
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
invokeErrorCallback(ss.str(), std::string()); invokeErrorCallback(ss.str(), std::string());
} }
}); });

View File

@ -58,25 +58,20 @@ namespace snake
auto state = std::dynamic_pointer_cast<SnakeConnectionState>(connectionState); auto state = std::dynamic_pointer_cast<SnakeConnectionState>(connectionState);
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[this, webSocket, state](ix::WebSocketMessageType messageType, [this, webSocket, state](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
std::cerr << "id: " << state->getId() << std::endl; std::cerr << "id: " << state->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl; std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; std::cerr << it.first << ": " << it.second << std::endl;
} }
std::string appkey = parseAppKey(openInfo.uri); std::string appkey = parseAppKey(msg->openInfo.uri);
state->setAppkey(appkey); state->setAppkey(appkey);
// Connect to redis first // Connect to redis first
@ -86,29 +81,29 @@ namespace snake
std::cerr << "Cannot connect to redis host" << std::endl; std::cerr << "Cannot connect to redis host" << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "Closed connection" std::cerr << "Closed connection"
<< " code " << closeInfo.code << " code " << msg->closeInfo.code
<< " reason " << closeInfo.reason << std::endl; << " reason " << msg->closeInfo.reason << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str(); std::cerr << ss.str();
} }
else if (messageType == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
std::cerr << "Received message fragment" << std::endl; std::cerr << "Received message fragment" << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " << wireSize << " bytes" << std::endl; std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
processCobraMessage(state, webSocket, _appConfig, str); processCobraMessage(state, webSocket, _appConfig, msg->str);
} }
} }
); );

View File

@ -81,6 +81,7 @@ int main(int argc, char** argv)
bool stress = false; bool stress = false;
bool disableAutomaticReconnection = false; bool disableAutomaticReconnection = false;
bool disablePerMessageDeflate = false; bool disablePerMessageDeflate = false;
bool greetings = false;
int port = 8008; int port = 8008;
int redisPort = 6379; int redisPort = 6379;
int statsdPort = 8125; int statsdPort = 8125;
@ -120,6 +121,7 @@ int main(int argc, char** argv)
CLI::App* echoServerApp = app.add_subcommand("echo_server", "Echo server"); CLI::App* echoServerApp = app.add_subcommand("echo_server", "Echo server");
echoServerApp->add_option("--port", port, "Port"); echoServerApp->add_option("--port", port, "Port");
echoServerApp->add_option("--host", hostname, "Hostname"); echoServerApp->add_option("--host", hostname, "Hostname");
echoServerApp->add_flag("-g", greetings, "Verbose");
CLI::App* broadcastServerApp = app.add_subcommand("broadcast_server", "Broadcasting server"); CLI::App* broadcastServerApp = app.add_subcommand("broadcast_server", "Broadcasting server");
broadcastServerApp->add_option("--port", port, "Port"); broadcastServerApp->add_option("--port", port, "Port");
@ -252,7 +254,7 @@ int main(int argc, char** argv)
} }
else if (app.got_subcommand("echo_server")) else if (app.got_subcommand("echo_server"))
{ {
ret = ix::ws_echo_server_main(port, hostname); ret = ix::ws_echo_server_main(port, greetings, hostname);
} }
else if (app.got_subcommand("broadcast_server")) else if (app.got_subcommand("broadcast_server"))
{ {

View File

@ -24,7 +24,7 @@ namespace ix
int ws_ping_pong_main(const std::string& url); int ws_ping_pong_main(const std::string& url);
int ws_echo_server_main(int port, const std::string& hostname); int ws_echo_server_main(int port, bool greetings, const std::string& hostname);
int ws_broadcast_server_main(int port, const std::string& hostname); int ws_broadcast_server_main(int port, const std::string& hostname);
int ws_transfer_main(int port, const std::string& hostname); int ws_transfer_main(int port, const std::string& hostname);

View File

@ -21,52 +21,48 @@ namespace ix
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server](const WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl; std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl; std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; std::cerr << it.first << ": " << it.second << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "Closed connection" std::cerr << "Closed connection"
<< " code " << closeInfo.code << " code " << msg->closeInfo.code
<< " reason " << closeInfo.reason << std::endl; << " reason " << msg->closeInfo.reason << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str(); std::cerr << ss.str();
} }
else if (messageType == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
std::cerr << "Received message fragment" << std::endl; std::cerr << "Received message fragment" << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " << wireSize << " bytes" << std::endl; std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client != webSocket)
{ {
client->send(str, client->send(msg->str,
msg->binary,
[](int current, int total) -> bool [](int current, int total) -> bool
{ {
std::cerr << "Step " << current std::cerr << "Step " << current

View File

@ -84,20 +84,15 @@ namespace ix
log(std::string("Connecting to url: ") + _url); log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
log("ws chat: connected"); log("ws chat: connected");
std::cout << "Uri: " << openInfo.uri << std::endl; std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl; std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; std::cout << it.first << ": " << it.second << std::endl;
} }
@ -107,18 +102,18 @@ namespace ix
<< " Connected !"; << " Connected !";
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
ss << "ws chat: user " ss << "ws chat: user "
<< _user << _user
<< " disconnected !" << " disconnected !"
<< " code " << closeInfo.code << " code " << msg->closeInfo.code
<< " reason " << closeInfo.reason; << " reason " << msg->closeInfo.reason;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
auto result = decodeMessage(str); auto result = decodeMessage(msg->str);
// Our "chat" / "broacast" node.js server does not send us // Our "chat" / "broacast" node.js server does not send us
// the messages we send, so we don't have to filter it out. // the messages we send, so we don't have to filter it out.
@ -127,17 +122,17 @@ namespace ix
_receivedQueue.push(result.second); _receivedQueue.push(result.second);
ss << std::endl ss << std::endl
<< result.first << "(" << wireSize << " bytes)" << " > " << result.second << result.first << "(" << msg->wireSize << " bytes)" << " > " << result.second
<< std::endl << std::endl
<< _user << " > "; << _user << " > ";
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str()); log(ss.str());
} }
else else
@ -172,7 +167,7 @@ namespace ix
void WebSocketChat::sendMessage(const std::string& text) void WebSocketChat::sendMessage(const std::string& text)
{ {
_webSocket.send(encodeMessage(text)); _webSocket.sendText(encodeMessage(text));
} }
int ws_chat_main(const std::string& url, int ws_chat_main(const std::string& url,

View File

@ -73,56 +73,51 @@ namespace ix
log(std::string("Connecting to url: ") + _url); log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
log("ws_connect: connected"); log("ws_connect: connected");
std::cout << "Uri: " << openInfo.uri << std::endl; std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl; std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; std::cout << it.first << ": " << it.second << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
ss << "ws_connect: connection closed:"; ss << "ws_connect: connection closed:";
ss << " code " << closeInfo.code; ss << " code " << msg->closeInfo.code;
ss << " reason " << closeInfo.reason << std::endl; ss << " reason " << msg->closeInfo.reason << std::endl;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " << wireSize << " bytes" << std::endl; std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
ss << "ws_connect: received message: " ss << "ws_connect: received message: "
<< str; << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
std::cerr << "Received message fragment" << std::endl; std::cerr << "Received message fragment" << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Ping) else if (msg->type == ix::WebSocketMessageType::Ping)
{ {
std::cerr << "Received ping" << std::endl; std::cerr << "Received ping" << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Pong) else if (msg->type == ix::WebSocketMessageType::Pong)
{ {
std::cerr << "Received pong" << std::endl; std::cerr << "Received pong" << std::endl;
} }
@ -138,7 +133,7 @@ namespace ix
void WebSocketConnect::sendMessage(const std::string& text) void WebSocketConnect::sendMessage(const std::string& text)
{ {
_webSocket.send(text); _webSocket.sendText(text);
} }
int ws_connect_main(const std::string& url, int ws_connect_main(const std::string& url,

View File

@ -10,56 +10,56 @@
namespace ix namespace ix
{ {
int ws_echo_server_main(int port, const std::string& hostname) int ws_echo_server_main(int port, bool greetings, const std::string& hostname)
{ {
std::cout << "Listening on " << hostname << ":" << port << std::endl; std::cout << "Listening on " << hostname << ":" << port << std::endl;
ix::WebSocketServer server(port, hostname); ix::WebSocketServer server(port, hostname);
server.setOnConnectionCallback( server.setOnConnectionCallback(
[](std::shared_ptr<ix::WebSocket> webSocket, [greetings](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState](ix::WebSocketMessageType messageType, [webSocket, connectionState, greetings](const WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl; std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl; std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; std::cerr << it.first << ": " << it.second << std::endl;
} }
if (greetings)
{
webSocket->sendText("Welcome !");
}
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "Closed connection" std::cerr << "Closed connection"
<< " code " << closeInfo.code << " code " << msg->closeInfo.code
<< " reason " << closeInfo.reason << std::endl; << " reason " << msg->closeInfo.reason << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str(); std::cerr << ss.str();
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " std::cerr << "Received "
<< wireSize << " bytes" << msg->wireSize << " bytes"
<< std::endl; << std::endl;
webSocket->send(str); webSocket->send(msg->str, msg->binary);
} }
} }
); );

View File

@ -54,59 +54,54 @@ namespace ix
log(std::string("Connecting to url: ") + _url); log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::cerr << "Received " << wireSize << " bytes" << std::endl; std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
log("ping_pong: connected"); log("ping_pong: connected");
std::cout << "Uri: " << openInfo.uri << std::endl; std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl; std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; std::cout << it.first << ": " << it.second << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
ss << "ping_pong: disconnected:" ss << "ping_pong: disconnected:"
<< " code " << closeInfo.code << " code " << msg->closeInfo.code
<< " reason " << closeInfo.reason << " reason " << msg->closeInfo.reason
<< str; << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
ss << "ping_pong: received message: " ss << "ping_pong: received message: "
<< str; << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Ping) else if (msg->type == ix::WebSocketMessageType::Ping)
{ {
ss << "ping_pong: received ping message: " ss << "ping_pong: received ping message: "
<< str; << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Pong) else if (msg->type == ix::WebSocketMessageType::Pong)
{ {
ss << "ping_pong: received pong message: " ss << "ping_pong: received pong message: "
<< str; << msg->str;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str()); log(ss.str());
} }
else else

View File

@ -183,41 +183,36 @@ namespace ix
log(std::string("Connecting to url: ") + _url); log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const ix::WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
_condition.notify_one(); _condition.notify_one();
log("ws_receive: connected"); log("ws_receive: connected");
std::cout << "Uri: " << openInfo.uri << std::endl; std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl; std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; std::cout << it.first << ": " << it.second << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
ss << "ws_receive: connection closed:"; ss << "ws_receive: connection closed:";
ss << " code " << closeInfo.code; ss << " code " << msg->closeInfo.code;
ss << " reason " << closeInfo.reason << std::endl; ss << " reason " << msg->closeInfo.reason << std::endl;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
ss << "ws_receive: transfered " << wireSize << " bytes"; ss << "ws_receive: transfered " << msg->wireSize << " bytes";
log(ss.str()); log(ss.str());
handleMessage(str); handleMessage(msg->str);
_condition.notify_one(); _condition.notify_one();
} }
else if (messageType == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
ss << "ws_receive: received fragment " << _receivedFragmentCounter++; ss << "ws_receive: received fragment " << _receivedFragmentCounter++;
log(ss.str()); log(ss.str());
@ -229,13 +224,13 @@ namespace ix
std::this_thread::sleep_for(duration); std::this_thread::sleep_for(duration);
} }
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "ws_receive "; ss << "ws_receive ";
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str()); log(ss.str());
} }
else else

View File

@ -112,42 +112,37 @@ namespace ix
log(std::string("Connecting to url: ") + _url); log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](const WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
_condition.notify_one(); _condition.notify_one();
log("ws_send: connected"); log("ws_send: connected");
std::cout << "Uri: " << openInfo.uri << std::endl; std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl; std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; std::cout << it.first << ": " << it.second << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
ss << "ws_send: connection closed:"; ss << "ws_send: connection closed:";
ss << " code " << closeInfo.code; ss << " code " << msg->closeInfo.code;
ss << " reason " << closeInfo.reason << std::endl; ss << " reason " << msg->closeInfo.reason << std::endl;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
_condition.notify_one(); _condition.notify_one();
ss << "ws_send: received message (" << wireSize << " bytes)"; ss << "ws_send: received message (" << msg->wireSize << " bytes)";
log(ss.str()); log(ss.str());
std::string errMsg; std::string errMsg;
MsgPack data = MsgPack::parse(str, errMsg); MsgPack data = MsgPack::parse(msg->str, errMsg);
if (!errMsg.empty()) if (!errMsg.empty())
{ {
std::cerr << "Invalid MsgPack response" << std::endl; std::cerr << "Invalid MsgPack response" << std::endl;
@ -160,13 +155,13 @@ namespace ix
std::cerr << "Invalid id" << std::endl; std::cerr << "Invalid id" << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
ss << "ws_send "; ss << "ws_send ";
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str()); log(ss.str());
} }
else else
@ -244,8 +239,8 @@ namespace ix
MsgPack msg(pdu); MsgPack msg(pdu);
Bench bench("Sending file through websocket"); Bench bench("Sending file through websocket");
_webSocket.send(msg.dump(), _webSocket.sendBinary(msg.dump(),
[throttle](int current, int total) -> bool [throttle](int current, int total) -> bool
{ {
std::cout << "ws_send: Step " << current << " out of " << total << std::endl; std::cout << "ws_send: Step " << current << " out of " << total << std::endl;

View File

@ -21,52 +21,48 @@ namespace ix
std::shared_ptr<ConnectionState> connectionState) std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType, [webSocket, connectionState, &server](const WebSocketMessagePtr& msg)
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{ {
if (messageType == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl; std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl; std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; std::cerr << it.first << ": " << it.second << std::endl;
} }
} }
else if (messageType == ix::WebSocketMessageType::Close) else if (msg->type == ix::WebSocketMessageType::Close)
{ {
std::cerr << "Closed connection" std::cerr << "Closed connection"
<< " code " << closeInfo.code << " code " << msg->closeInfo.code
<< " reason " << closeInfo.reason << std::endl; << " reason " << msg->closeInfo.reason << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl; ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str(); std::cerr << ss.str();
} }
else if (messageType == ix::WebSocketMessageType::Fragment) else if (msg->type == ix::WebSocketMessageType::Fragment)
{ {
std::cerr << "Received message fragment " std::cerr << "Received message fragment "
<< std::endl; << std::endl;
} }
else if (messageType == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cerr << "Received " << wireSize << " bytes" << std::endl; std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client != webSocket)
{ {
client->send(str, client->send(msg->str,
msg->binary,
[](int current, int total) -> bool [](int current, int total) -> bool
{ {
std::cerr << "ws_transfer: Step " << current std::cerr << "ws_transfer: Step " << current