Compare commits

...

17 Commits

Author SHA1 Message Date
1af96ed4e4 try to build vcpkg on travis 2019-06-10 13:35:47 -07:00
7cb5cc05e4 Add -DUSE_VENDORED_THIRD_PARTY=1 to build ws 2019-06-10 13:26:41 -07:00
750a752ac0 - mbedtls and zlib are searched with find_package, and we use the vendored version if nothing is found 2019-06-10 11:18:27 -07:00
61e5f52286 - travis CI uses g++ on Linux 2019-06-09 14:27:45 -07:00
ce0b716f54 compile error in IXWebSocketMessageQTest 2019-06-09 12:25:36 -07:00
aae8e5ec65 fix IXWebSocketMessageQTest.cpp 2019-06-09 12:08:00 -07:00
2723e8466e fix changelog 2019-06-09 12:02:38 -07:00
f13c610352 update README to reflect the new API 2019-06-09 12:02:02 -07:00
55c65b08bf - WebSocket::send() sends message in TEXT mode by default
- WebSocketMessage sets a new binary field, which tells whether the received incoming message is binary or text
2019-06-09 11:56:47 -07:00
a11aa3e0dd WebSocket::send takes a third arg, binary which default to true (can be text too) 2019-06-09 11:35:31 -07:00
de0bf5ebcd WebSocket callback only take one object, a const ix::WebSocketMessagePtr& msg 2019-06-09 11:33:17 -07:00
15369e1ae9 ... 2019-06-09 10:22:27 -07:00
d4115880b9 Add explicite WebSocket::sendBinary
New headers + WebSocketMessage class to hold message data, still not used across the board
2019-06-09 10:10:33 -07:00
3c80c75e4a Add test/compatibility folder with small servers and clients written in different languages and different libraries to test compatibility. 2019-06-08 09:46:26 -07:00
5cb72dce4c ws echo_server has a -g option to print a greeting message on connect 2019-06-08 09:16:33 -07:00
d2747487e3 IXSocketMbedTLS: better error handling in close and connect 2019-06-06 14:59:22 -07:00
12e664fc61 add a changelog 2019-06-06 13:59:12 -07:00
39 changed files with 715 additions and 611 deletions

View File

@ -12,23 +12,23 @@ matrix:
- python test/run.py
- make ws
# # Linux
# - os: linux
# dist: xenial
# script:
# - python test/run.py
# - make ws
# env:
# - CC=gcc
# - CXX=g++
# Clang + Linux disabled for now
# Linux
- os: linux
dist: xenial
script: python test/run.py
script:
- python test/run.py
- make ws
env:
- CC=clang
- CXX=clang++
- CC=gcc
- CXX=g++
# Clang + Linux disabled for now
# - os: linux
# dist: xenial
# script: python test/run.py
# env:
# - CC=clang
# - CXX=clang++
# Windows
- os: windows
@ -36,6 +36,36 @@ matrix:
- CMAKE_PATH="/c/Program Files/CMake/bin"
script:
- export PATH=$CMAKE_PATH:$PATH
- cmake -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .
- cmake -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 -DUSE_VENDORED_THIRD_PARTY=1 .
- cmake --build --parallel .
- python test/run.py
install:
# HACK: gcc 8.0.1 is missing movdirintrin.h so just download it. We need this for GLM and Vectrexy to build.
- sudo wget https://raw.githubusercontent.com/gcc-mirror/gcc/gcc-8-branch/gcc/config/i386/movdirintrin.h -P /usr/lib/gcc/x86_64-linux-gnu/8/include/
# Create deps dir
- mkdir -p ${DEPS_DIR}
# Set compiler vars
- export CC=${CC_COMPILER}
- export CXX=${CXX_COMPILER}
# Install vcpkg and dependencies
- |
set -e
mkdir -p ${DEPS_DIR}/vcpkg
pushd ${DEPS_DIR}/vcpkg
git init
git remote add origin https://github.com/Microsoft/vcpkg.git
git fetch origin master
git checkout -b master origin/master
./bootstrap-vcpkg.sh
# Only build release libs to save time. We inject a new line first since some cmake files don't end with one.
echo -e '\nset(VCPKG_BUILD_TYPE release)' >> ./triplets/${VCPKG_TRIPLET}.cmake
./vcpkg install sdl2 sdl2-net glew glm stb imgui
popd
cache:
directories:
- ${DEPS_DIR}/vcpkg/installed

31
CHANGELOG.md Normal file
View File

@ -0,0 +1,31 @@
# Changelog
All notable changes to this project will be documented in this file.
## [unreleased] - 2019-06-09
### Changed
- mbedtls and zlib are searched with find_package, and we use the vendored version if nothing is found
- travis CI uses g++ on Linux
## [4.0.0] - 2019-06-09
### Changed
- WebSocket::send() sends message in TEXT mode by default
- WebSocketMessage sets a new binary field, which tells whether the received incoming message is binary or text
- WebSocket::send takes a third arg, binary which default to true (can be text too)
- WebSocket callback only take one object, a const ix::WebSocketMessagePtr& msg
- Add explicit WebSocket::sendBinary method
- New headers + WebSocketMessage class to hold message data, still not used across the board
- Add test/compatibility folder with small servers and clients written in different languages and different libraries to test compatibility.
- ws echo_server has a -g option to print a greeting message on connect
- IXSocketMbedTLS: better error handling in close and connect
## [3.1.2] - 2019-06-06
### Added
- ws connect has a -x option to disable per message deflate
- Add WebSocket::disablePerMessageDeflate() option.
## [3.0.0] - 2019-06-xx
### Changed
- TLS, aka SSL works on Windows (websocket and http clients)
- ws command line tool build on Windows
- Async API for HttpClient
- HttpClient API changed to use shared_ptr for response and request

13
CMake/FindMbedTLS.cmake Normal file
View File

@ -0,0 +1,13 @@
find_path(MBEDTLS_INCLUDE_DIRS mbedtls/ssl.h)
find_library(MBEDTLS_LIBRARY mbedtls)
find_library(MBEDX509_LIBRARY mbedx509)
find_library(MBEDCRYPTO_LIBRARY mbedcrypto)
set(MBEDTLS_LIBRARIES "${MBEDTLS_LIBRARY}" "${MBEDX509_LIBRARY}" "${MBEDCRYPTO_LIBRARY}")
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(MBEDTLS DEFAULT_MSG
MBEDTLS_INCLUDE_DIRS MBEDTLS_LIBRARY MBEDX509_LIBRARY MBEDCRYPTO_LIBRARY)
mark_as_advanced(MBEDTLS_INCLUDE_DIRS MBEDTLS_LIBRARY MBEDX509_LIBRARY MBEDCRYPTO_LIBRARY)

View File

@ -4,6 +4,8 @@
#
cmake_minimum_required(VERSION 3.4.1)
set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/CMake;${CMAKE_MODULE_PATH}")
project(ixwebsocket C CXX)
set (CMAKE_CXX_STANDARD 14)
@ -20,60 +22,64 @@ if ("${CMAKE_CXX_COMPILER_ID}" MATCHES "Clang")
endif()
set( IXWEBSOCKET_SOURCES
ixwebsocket/IXCancellationRequest.cpp
ixwebsocket/IXConnectionState.cpp
ixwebsocket/IXDNSLookup.cpp
ixwebsocket/IXHttpClient.cpp
ixwebsocket/IXNetSystem.cpp
ixwebsocket/IXSelectInterrupt.cpp
ixwebsocket/IXSelectInterruptFactory.cpp
ixwebsocket/IXSocket.cpp
ixwebsocket/IXSocketServer.cpp
ixwebsocket/IXSocketConnect.cpp
ixwebsocket/IXSocketFactory.cpp
ixwebsocket/IXDNSLookup.cpp
ixwebsocket/IXCancellationRequest.cpp
ixwebsocket/IXNetSystem.cpp
ixwebsocket/IXSocketServer.cpp
ixwebsocket/IXUrlParser.cpp
ixwebsocket/IXWebSocket.cpp
ixwebsocket/IXWebSocketServer.cpp
ixwebsocket/IXWebSocketTransport.cpp
ixwebsocket/IXWebSocketCloseConstants.cpp
ixwebsocket/IXWebSocketHandshake.cpp
ixwebsocket/IXWebSocketHttpHeaders.cpp
ixwebsocket/IXWebSocketMessageQueue.cpp
ixwebsocket/IXWebSocketPerMessageDeflate.cpp
ixwebsocket/IXWebSocketPerMessageDeflateCodec.cpp
ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp
ixwebsocket/IXWebSocketHttpHeaders.cpp
ixwebsocket/IXHttpClient.cpp
ixwebsocket/IXUrlParser.cpp
ixwebsocket/IXWebSocketServer.cpp
ixwebsocket/IXWebSocketTransport.cpp
ixwebsocket/LUrlParser.cpp
ixwebsocket/IXSelectInterrupt.cpp
ixwebsocket/IXSelectInterruptFactory.cpp
ixwebsocket/IXConnectionState.cpp
ixwebsocket/IXWebSocketCloseConstants.cpp
ixwebsocket/IXWebSocketMessageQueue.cpp
)
set( IXWEBSOCKET_HEADERS
ixwebsocket/IXSocket.h
ixwebsocket/IXSocketServer.h
ixwebsocket/IXSocketConnect.h
ixwebsocket/IXSocketFactory.h
ixwebsocket/IXSetThreadName.h
ixwebsocket/IXDNSLookup.h
ixwebsocket/IXCancellationRequest.h
ixwebsocket/IXConnectionState.h
ixwebsocket/IXDNSLookup.h
ixwebsocket/IXHttpClient.h
ixwebsocket/IXNetSystem.h
ixwebsocket/IXProgressCallback.h
ixwebsocket/IXSelectInterrupt.h
ixwebsocket/IXSelectInterruptFactory.h
ixwebsocket/IXSetThreadName.h
ixwebsocket/IXSocket.h
ixwebsocket/IXSocketConnect.h
ixwebsocket/IXSocketFactory.h
ixwebsocket/IXSocketServer.h
ixwebsocket/IXUrlParser.h
ixwebsocket/IXWebSocket.h
ixwebsocket/IXWebSocketServer.h
ixwebsocket/IXWebSocketTransport.h
ixwebsocket/IXWebSocketHandshake.h
ixwebsocket/IXWebSocketSendInfo.h
ixwebsocket/IXWebSocketCloseConstants.h
ixwebsocket/IXWebSocketCloseInfo.h
ixwebsocket/IXWebSocketErrorInfo.h
ixwebsocket/IXWebSocketHandshake.h
ixwebsocket/IXWebSocketHttpHeaders.h
ixwebsocket/IXWebSocketMessage.h
ixwebsocket/IXWebSocketMessageQueue.h
ixwebsocket/IXWebSocketMessageType.h
ixwebsocket/IXWebSocketOpenInfo.h
ixwebsocket/IXWebSocketPerMessageDeflate.h
ixwebsocket/IXWebSocketPerMessageDeflateCodec.h
ixwebsocket/IXWebSocketPerMessageDeflateOptions.h
ixwebsocket/IXWebSocketHttpHeaders.h
ixwebsocket/libwshandshake.hpp
ixwebsocket/IXHttpClient.h
ixwebsocket/IXUrlParser.h
ixwebsocket/IXWebSocketSendInfo.h
ixwebsocket/IXWebSocketServer.h
ixwebsocket/IXWebSocketTransport.h
ixwebsocket/LUrlParser.h
ixwebsocket/IXSelectInterrupt.h
ixwebsocket/IXSelectInterruptFactory.h
ixwebsocket/IXConnectionState.h
ixwebsocket/IXWebSocketCloseConstants.h
ixwebsocket/IXWebSocketMessageQueue.h
ixwebsocket/libwshandshake.hpp
)
if (UNIX)
@ -137,25 +143,28 @@ if (USE_OPEN_SSL)
endif()
if (USE_MBED_TLS)
set (ENABLE_PROGRAMS OFF)
add_subdirectory(third_party/mbedtls)
include_directories(third_party/mbedtls/include)
if (USE_VENDORED_THIRD_PARTY)
set (ENABLE_PROGRAMS OFF)
add_subdirectory(third_party/mbedtls)
include_directories(third_party/mbedtls/include)
target_link_libraries(ixwebsocket mbedtls)
target_link_libraries(ixwebsocket mbedtls)
else()
find_package(MbedTLS REQUIRED)
include_directories(${MBEDTLS_INCLUDE_DIRS})
target_link_libraries(ixwebsocket ${MBEDTLS_LIBRARIES})
endif()
endif()
if (WIN32)
find_package(ZLIB REQUIRED)
if (ZLIB_FOUND)
include_directories(${ZLIB_INCLUDE_DIRS})
target_link_libraries(ixwebsocket ${ZLIB_LIBRARIES})
else()
add_subdirectory(third_party/zlib)
include_directories(third_party/zlib ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib)
target_link_libraries(ixwebsocket zlibstatic wsock32 ws2_32)
add_definitions(-D_CRT_SECURE_NO_WARNINGS)
else()
# gcc/Linux needs -pthread
find_package(Threads)
target_link_libraries(ixwebsocket
z ${CMAKE_THREAD_LIBS_INIT})
endif()
set( IXWEBSOCKET_INCLUDE_DIRS
@ -167,7 +176,7 @@ if (CMAKE_CXX_COMPILER_ID MATCHES "MSVC")
target_compile_options(ixwebsocket PRIVATE /MP)
endif()
target_include_directories( ixwebsocket PUBLIC ${IXWEBSOCKET_INCLUDE_DIRS} )
target_include_directories(ixwebsocket PUBLIC ${IXWEBSOCKET_INCLUDE_DIRS})
set_target_properties(ixwebsocket PROPERTIES PUBLIC_HEADER "${IXWEBSOCKET_HEADERS}")

View File

@ -1 +1 @@
3.1.1
4.0.0

View File

@ -33,27 +33,22 @@ webSocket.disablePerMessageDeflate();
// Setup a callback to be fired when a message or an event (open, close, error) is received
webSocket.setOnMessageCallback(
[](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[](const ix::WebSocketMessagePtr& msg)
{
if (messageType == ix::WebSocketMessageType::Message)
if (msg->type == ix::WebSocketMessageType::Message)
{
std::cout << str << std::endl;
std::cout << msg->str << std::endl;
}
});
// Now that our callback is setup, we can start our background thread and receive messages
webSocket.start();
// Send a message to the server (default to BINARY mode)
// Send a message to the server (default to TEXT mode)
webSocket.send("hello world");
// The message can be sent in TEXT mode
webSocket.sendText("hello again");
// The message can be sent in BINARY mode (useful if you send MsgPack data for example)
webSocket.sendBinary("some serialized binary data");
// ... finally ...
@ -73,14 +68,9 @@ server.setOnConnectionCallback(
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[webSocket, connectionState, &server](const ix::WebSocketMessagePtr msg)
{
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
std::cerr << "New connection" << std::endl;
@ -91,19 +81,21 @@ server.setOnConnectionCallback(
std::cerr << "id: " << connectionState->getId() << std::endl;
// The uri the client did connect to.
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)
for (auto it : msg->openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
// For an echo server, we just send back to the client whatever was received by the server
// All connected clients are available in an std::set. See the broadcast cpp example.
webSocket->send(str);
// Second parameter tells whether we are sending the message in binary or text mode.
// Here we send it in the same mode as it was received.
webSocket->send(msg->str, msg->binary);
}
}
);
@ -334,32 +326,27 @@ The onMessage event will be fired when the connection is opened or closed. This
```
webSocket.setOnMessageCallback(
[](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[](const ix::WebSocketMessagePtr& msg)
{
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
std::cout << "send greetings" << std::endl;
// Headers can be inspected (pairs of string/string)
std::cout << "Handshake Headers:" << std::endl;
for (auto it : headers)
for (auto it : msg->headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
std::cout << "disconnected" << std::endl;
// The server can send an explicit code and reason for closing.
// This data can be accessed through the closeInfo object.
std::cout << closeInfo.code << std::endl;
std::cout << closeInfo.reason << std::endl;
std::cout << msg->closeInfo.code << std::endl;
std::cout << msg->closeInfo.reason << std::endl;
}
}
);
@ -371,20 +358,15 @@ A message will be fired when there is an error with the connection. The message
```
webSocket.setOnMessageCallback(
[](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[](const ix::WebSocketMessagePtr& msg)
{
if (messageType == ix::WebSocketMessageType::Error)
if (msg->type == ix::WebSocketMessageType::Error)
{
std::stringstream ss;
ss << "Error: " << error.reason << std::endl;
ss << "#retries: " << event.retries << std::endl;
ss << "Wait time(ms): " << event.wait_time << std::endl;
ss << "HTTP Status: " << event.http_status << std::endl;
ss << "Error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->eventInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->eventInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->eventInfo.http_status << std::endl;
std::cout << ss.str() << std::endl;
}
}
@ -411,17 +393,12 @@ Ping/pong messages are used to implement keep-alive. 2 message types exists to i
```
webSocket.setOnMessageCallback(
[](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[](const ix::WebSocketMessagePtr& msg)
{
if (messageType == ix::WebSocketMessageType::Ping ||
messageType == ix::WebSocketMessageType::Pong)
if (msg->type == ix::WebSocketMessageType::Ping ||
msg->type == ix::WebSocketMessageType::Pong)
{
std::cout << "pong data: " << str << std::endl;
std::cout << "pong data: " << msg->str << std::endl;
}
}
);

View File

@ -24,6 +24,8 @@ namespace ix
bool SocketMbedTLS::init(const std::string& host, std::string& errMsg)
{
std::lock_guard<std::mutex> lock(_mutex);
mbedtls_ssl_init(&_ssl);
mbedtls_ssl_config_init(&_conf);
mbedtls_ctr_drbg_init(&_ctr_drbg);
@ -75,15 +77,24 @@ namespace ix
std::string& errMsg,
const CancellationRequest& isCancellationRequested)
{
_sockfd = SocketConnect::connect(host, port, errMsg, isCancellationRequested);
if (_sockfd == -1) return false;
if (!init(host, errMsg)) return false;
{
std::lock_guard<std::mutex> lock(_mutex);
_sockfd = SocketConnect::connect(host, port, errMsg, isCancellationRequested);
if (_sockfd == -1) return false;
}
if (!init(host, errMsg))
{
close();
return false;
}
mbedtls_ssl_set_bio(&_ssl, &_sockfd, mbedtls_net_send, mbedtls_net_recv, NULL);
int res;
do
{
std::lock_guard<std::mutex> lock(_mutex);
res = mbedtls_ssl_handshake(&_ssl);
}
while (res == MBEDTLS_ERR_SSL_WANT_READ || res == MBEDTLS_ERR_SSL_WANT_WRITE);
@ -95,6 +106,8 @@ namespace ix
errMsg = "error in handshake : ";
errMsg += buf;
close();
return false;
}
@ -103,10 +116,14 @@ namespace ix
void SocketMbedTLS::close()
{
std::lock_guard<std::mutex> lock(_mutex);
mbedtls_ssl_free(&_ssl);
mbedtls_ssl_config_free(&_conf);
mbedtls_ctr_drbg_free(&_ctr_drbg);
mbedtls_entropy_free(&_entropy);
Socket::close();
}
ssize_t SocketMbedTLS::send(char* buf, size_t nbyte)

View File

@ -51,9 +51,11 @@ namespace ix
_ws.setOnCloseCallback(
[this](uint16_t code, const std::string& reason, size_t wireSize, bool remote)
{
_onMessageCallback(WebSocketMessageType::Close, "", wireSize,
WebSocketErrorInfo(), WebSocketOpenInfo(),
WebSocketCloseInfo(code, reason, remote));
_onMessageCallback(
std::make_shared<WebSocketMessage>(
WebSocketMessageType::Close, "", wireSize,
WebSocketErrorInfo(), WebSocketOpenInfo(),
WebSocketCloseInfo(code, reason, remote)));
}
);
}
@ -180,10 +182,12 @@ namespace ix
return status;
}
_onMessageCallback(WebSocketMessageType::Open, "", 0,
WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers),
WebSocketCloseInfo());
_onMessageCallback(
std::make_shared<WebSocketMessage>(
WebSocketMessageType::Open, "", 0,
WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers),
WebSocketCloseInfo()));
return status;
}
@ -203,10 +207,12 @@ namespace ix
return status;
}
_onMessageCallback(WebSocketMessageType::Open, "", 0,
WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers),
WebSocketCloseInfo());
_onMessageCallback(
std::make_shared<WebSocketMessage>(
WebSocketMessageType::Open, "", 0,
WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers),
WebSocketCloseInfo()));
return status;
}
@ -274,9 +280,11 @@ namespace ix
connectErr.reason = status.errorStr;
connectErr.http_status = status.http_status;
_onMessageCallback(WebSocketMessageType::Error, "", 0,
connectErr, WebSocketOpenInfo(),
WebSocketCloseInfo());
_onMessageCallback(
std::make_shared<WebSocketMessage>(
WebSocketMessageType::Error, "", 0,
connectErr, WebSocketOpenInfo(),
WebSocketCloseInfo()));
}
}
}
@ -317,8 +325,8 @@ namespace ix
WebSocketMessageType webSocketMessageType;
switch (messageKind)
{
default:
case WebSocketTransport::MessageKind::MSG:
case WebSocketTransport::MessageKind::MSG_TEXT:
case WebSocketTransport::MessageKind::MSG_BINARY:
{
webSocketMessageType = WebSocketMessageType::Message;
} break;
@ -342,9 +350,13 @@ namespace ix
WebSocketErrorInfo webSocketErrorInfo;
webSocketErrorInfo.decompressionError = decompressionError;
_onMessageCallback(webSocketMessageType, msg, wireSize,
webSocketErrorInfo, WebSocketOpenInfo(),
WebSocketCloseInfo());
bool binary = messageKind == WebSocketTransport::MessageKind::MSG_BINARY;
_onMessageCallback(
std::make_shared<WebSocketMessage>(
webSocketMessageType, msg, wireSize,
webSocketErrorInfo, WebSocketOpenInfo(),
WebSocketCloseInfo(), binary));
WebSocket::invokeTrafficTrackerCallback(msg.size(), true);
});
@ -375,9 +387,18 @@ namespace ix
}
WebSocketSendInfo WebSocket::send(const std::string& data,
bool binary,
const OnProgressCallback& onProgressCallback)
{
return sendMessage(data, SendMessageKind::Binary, onProgressCallback);
return sendMessage(data,
(binary) ? SendMessageKind::Binary: SendMessageKind::Text,
onProgressCallback);
}
WebSocketSendInfo WebSocket::sendBinary(const std::string& text,
const OnProgressCallback& onProgressCallback)
{
return sendMessage(text, SendMessageKind::Binary, onProgressCallback);
}
WebSocketSendInfo WebSocket::sendText(const std::string& text,

View File

@ -13,6 +13,7 @@
#include "IXWebSocketCloseConstants.h"
#include "IXWebSocketErrorInfo.h"
#include "IXWebSocketHttpHeaders.h"
#include "IXWebSocketMessage.h"
#include "IXWebSocketPerMessageDeflateOptions.h"
#include "IXWebSocketSendInfo.h"
#include "IXWebSocketTransport.h"
@ -32,52 +33,7 @@ namespace ix
Closed = 3
};
enum class WebSocketMessageType
{
Message = 0,
Open = 1,
Close = 2,
Error = 3,
Ping = 4,
Pong = 5,
Fragment = 6
};
struct WebSocketOpenInfo
{
std::string uri;
WebSocketHttpHeaders headers;
WebSocketOpenInfo(const std::string& u = std::string(),
const WebSocketHttpHeaders& h = WebSocketHttpHeaders())
: uri(u)
, headers(h)
{
;
}
};
struct WebSocketCloseInfo
{
uint16_t code;
std::string reason;
bool remote;
WebSocketCloseInfo(uint16_t c = 0, const std::string& r = std::string(), bool rem = false)
: code(c)
, reason(r)
, remote(rem)
{
;
}
};
using OnMessageCallback = std::function<void(WebSocketMessageType,
const std::string&,
size_t wireSize,
const WebSocketErrorInfo&,
const WebSocketOpenInfo&,
const WebSocketCloseInfo&)>;
using OnMessageCallback = std::function<void(const WebSocketMessagePtr&)>;
using OnTrafficTrackerCallback = std::function<void(size_t size, bool incoming)>;
@ -108,14 +64,18 @@ namespace ix
WebSocketInitResult connect(int timeoutSecs);
void run();
// send binary data
// send is in binary mode by default
WebSocketSendInfo send(const std::string& data,
bool binary = false,
const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo sendBinary(const std::string& text,
const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo sendText(const std::string& text,
const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo ping(const std::string& text);
void close(uint16_t code = 1000, const std::string& reason = "Normal closure");
void close(uint16_t code = WebSocketCloseConstants::kNormalClosureCode,
const std::string& reason = WebSocketCloseConstants::kNormalClosureMessage);
void setOnMessageCallback(const OnMessageCallback& callback);
static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback);
@ -169,7 +129,7 @@ namespace ix
bool _enablePong;
static const bool kDefaultEnablePong;
// Optional ping and ping timeout
// Optional ping and pong timeout
int _pingIntervalSecs;
int _pingTimeoutSecs;
static const int kDefaultPingIntervalSecs;

View File

@ -0,0 +1,25 @@
/*
* IXWebSocketCloseInfo.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
namespace ix
{
struct WebSocketCloseInfo
{
uint16_t code;
std::string reason;
bool remote;
WebSocketCloseInfo(uint16_t c = 0, const std::string& r = std::string(), bool rem = false)
: code(c)
, reason(r)
, remote(rem)
{
;
}
};
} // namespace ix

View File

@ -0,0 +1,49 @@
/*
* IXWebSocketMessage.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include "IXWebSocketCloseInfo.h"
#include "IXWebSocketErrorInfo.h"
#include "IXWebSocketMessageType.h"
#include "IXWebSocketOpenInfo.h"
#include <memory>
#include <string>
#include <thread>
namespace ix
{
struct WebSocketMessage
{
WebSocketMessageType type;
std::string str;
size_t wireSize;
WebSocketErrorInfo errorInfo;
WebSocketOpenInfo openInfo;
WebSocketCloseInfo closeInfo;
bool binary;
WebSocketMessage(WebSocketMessageType t,
const std::string& s,
size_t w,
WebSocketErrorInfo e,
WebSocketOpenInfo o,
WebSocketCloseInfo c,
bool b = false)
: type(t)
, str(std::move(s))
, wireSize(w)
, errorInfo(e)
, openInfo(o)
, closeInfo(c)
, binary(b)
{
;
}
};
using WebSocketMessagePtr = std::shared_ptr<WebSocketMessage>;
} // namespace ix

View File

@ -32,14 +32,7 @@ namespace ix
if (_websocket)
{
// set dummy callback just to avoid crash
_websocket->setOnMessageCallback([](
WebSocketMessageType,
const std::string&,
size_t,
const WebSocketErrorInfo&,
const WebSocketOpenInfo&,
const WebSocketCloseInfo&)
{});
_websocket->setOnMessageCallback([](const WebSocketMessagePtr&) {});
}
_websocket = websocket;
@ -47,27 +40,10 @@ namespace ix
// bind new
if (_websocket)
{
_websocket->setOnMessageCallback([this](
WebSocketMessageType type,
const std::string& str,
size_t wireSize,
const WebSocketErrorInfo& errorInfo,
const WebSocketOpenInfo& openInfo,
const WebSocketCloseInfo& closeInfo)
_websocket->setOnMessageCallback([this](const WebSocketMessagePtr& msg)
{
MessagePtr message(new Message());
message->type = type;
message->str = str;
message->wireSize = wireSize;
message->errorInfo = errorInfo;
message->openInfo = openInfo;
message->closeInfo = closeInfo;
{
std::lock_guard<std::mutex> lock(_messagesMutex);
_messages.emplace_back(std::move(message));
}
std::lock_guard<std::mutex> lock(_messagesMutex);
_messages.emplace_back(std::move(msg));
});
}
}
@ -82,9 +58,9 @@ namespace ix
_onMessageUserCallback = std::move(callback);
}
WebSocketMessageQueue::MessagePtr WebSocketMessageQueue::popMessage()
WebSocketMessagePtr WebSocketMessageQueue::popMessage()
{
MessagePtr message;
WebSocketMessagePtr message;
std::lock_guard<std::mutex> lock(_messagesMutex);
if (!_messages.empty())
@ -101,19 +77,11 @@ namespace ix
if (!_onMessageUserCallback)
return;
MessagePtr message;
WebSocketMessagePtr message;
while (count > 0 && (message = popMessage()))
{
_onMessageUserCallback(
message->type,
message->str,
message->wireSize,
message->errorInfo,
message->openInfo,
message->closeInfo
);
_onMessageUserCallback(message);
--count;
}
}

View File

@ -30,24 +30,12 @@ namespace ix
void poll(int count = 512);
protected:
struct Message
{
WebSocketMessageType type;
std::string str;
size_t wireSize;
WebSocketErrorInfo errorInfo;
WebSocketOpenInfo openInfo;
WebSocketCloseInfo closeInfo;
};
using MessagePtr = std::shared_ptr<Message>;
MessagePtr popMessage();
WebSocketMessagePtr popMessage();
private:
WebSocket* _websocket = nullptr;
OnMessageCallback _onMessageUserCallback;
std::mutex _messagesMutex;
std::list<MessagePtr> _messages;
std::list<WebSocketMessagePtr> _messages;
};
} // namespace ix

View File

@ -0,0 +1,21 @@
/*
* IXWebSocketMessageType.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
namespace ix
{
enum class WebSocketMessageType
{
Message = 0,
Open = 1,
Close = 2,
Error = 3,
Ping = 4,
Pong = 5,
Fragment = 6
};
}

View File

@ -0,0 +1,24 @@
/*
* IXWebSocketOpenInfo.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
namespace ix
{
struct WebSocketOpenInfo
{
std::string uri;
WebSocketHttpHeaders headers;
WebSocketOpenInfo(const std::string& u = std::string(),
const WebSocketHttpHeaders& h = WebSocketHttpHeaders())
: uri(u)
, headers(h)
{
;
}
};
} // namespace ix

View File

@ -542,12 +542,17 @@ namespace ix
) {
unmaskReceiveBuffer(ws);
MessageKind messageKind =
(ws.opcode == wsheader_type::TEXT_FRAME)
? MessageKind::MSG_TEXT
: MessageKind::MSG_BINARY;
//
// Usual case. Small unfragmented messages
//
if (ws.fin && _chunks.empty())
{
emitMessage(MessageKind::MSG,
emitMessage(messageKind,
std::string(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t) ws.N),
ws,
@ -567,7 +572,7 @@ namespace ix
_rxbuf.begin()+ws.header_size+(size_t)ws.N));
if (ws.fin)
{
emitMessage(MessageKind::MSG, getMergedChunks(), ws, onMessageCallback);
emitMessage(messageKind, getMergedChunks(), ws, onMessageCallback);
_chunks.clear();
}
else

View File

@ -50,7 +50,8 @@ namespace ix
enum class MessageKind
{
MSG,
MSG_TEXT,
MSG_BINARY,
PING,
PONG,
FRAGMENT

View File

@ -12,7 +12,7 @@ brew:
mkdir -p build && (cd build ; cmake -DUSE_TLS=1 -DUSE_WS=1 .. ; make -j install)
ws:
mkdir -p build && (cd build ; cmake -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .. ; make -j)
mkdir -p build && (cd build ; cmake -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 -DUSE_VENDORED_THIRD_PARTY=1 .. ; make -j)
uninstall:
xargs rm -fv < build/install_manifest.txt

View File

@ -178,34 +178,29 @@ namespace ix
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg)
{
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
Logger() << "New connection";
Logger() << "Uri: " << openInfo.uri;
Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:";
for (auto it : openInfo.headers)
for (auto it : msg->openInfo.headers)
{
Logger() << it.first << ": " << it.second;
}
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
Logger() << "Closed connection";
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
for (auto&& client : server.getClients())
{
if (client != webSocket)
{
client->send(str);
client->send(msg->str, msg->binary);
}
}
}

View File

@ -108,52 +108,47 @@ namespace
log(std::string("Connecting to url: ") + url);
_webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[this](const ix::WebSocketMessagePtr& msg)
{
std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
log("client connected");
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
std::stringstream ss;
ss << "client disconnected("
<< closeInfo.code
<< msg->closeInfo.code
<< ","
<< closeInfo.reason
<< msg->closeInfo.reason
<< ")";
log(ss.str());
std::lock_guard<std::mutex> lck(_mutexCloseData);
_closeCode = closeInfo.code;
_closeReason = std::string(closeInfo.reason);
_closeRemote = closeInfo.remote;
_closeCode = msg->closeInfo.code;
_closeReason = std::string(msg->closeInfo.reason);
_closeRemote = msg->closeInfo.remote;
}
else if (messageType == ix::WebSocketMessageType::Error)
else if (msg->type == ix::WebSocketMessageType::Error)
{
ss << "Error ! " << error.reason;
ss << "Error ! " << msg->errorInfo.reason;
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Pong)
else if (msg->type == ix::WebSocketMessageType::Pong)
{
ss << "Received pong message " << str;
ss << "Received pong message " << msg->str;
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Ping)
else if (msg->type == ix::WebSocketMessageType::Ping)
{
ss << "Received ping message " << str;
ss << "Received ping message " << msg->str;
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
ss << "Received message " << str;
ss << "Received message " << msg->str;
log(ss.str());
}
else
@ -183,39 +178,34 @@ namespace
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, connectionState, &server, &receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[webSocket, connectionState, &server, &receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite](const ix::WebSocketMessagePtr& msg)
{
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
Logger() << "New server connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri;
Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:";
for (auto it : openInfo.headers)
for (auto it : msg->openInfo.headers)
{
Logger() << it.first << ": " << it.second;
}
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
std::stringstream ss;
ss << "Server closed connection("
<< closeInfo.code
<< msg->closeInfo.code
<< ","
<< closeInfo.reason
<< msg->closeInfo.reason
<< ")";
log(ss.str());
std::lock_guard<std::mutex> lck(mutexWrite);
receivedCloseCode = closeInfo.code;
receivedCloseReason = std::string(closeInfo.reason);
receivedCloseRemote = closeInfo.remote;
receivedCloseCode = msg->closeInfo.code;
receivedCloseReason = std::string(msg->closeInfo.reason);
receivedCloseRemote = msg->closeInfo.remote;
}
}
);

View File

@ -20,39 +20,34 @@ namespace
{
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[connectionState, &server](ix::WebSocketMessageType messageType,
const std::string & str,
size_t wireSize,
const ix::WebSocketErrorInfo & error,
const ix::WebSocketOpenInfo & openInfo,
const ix::WebSocketCloseInfo & closeInfo)
[connectionState, &server](const WebSocketMessagePtr& msg)
{
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
Logger() << "New connection";
connectionState->computeId();
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri;
Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:";
for (auto it : openInfo.headers)
for (auto&& it : msg->openInfo.headers)
{
Logger() << it.first << ": " << it.second;
}
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
Logger() << "Closed connection";
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
Logger() << "Message received: " << str;
Logger() << "Message received: " << msg->str;
for (auto&& client : server.getClients())
{
client->send(str);
client->send(msg->str);
}
}
}
@ -78,46 +73,41 @@ namespace
{
msgQ.bindWebsocket(&ws);
msgQ.setOnMessageCallback([this](WebSocketMessageType messageType,
const std::string & str,
size_t wireSize,
const WebSocketErrorInfo & error,
const WebSocketOpenInfo & openInfo,
const WebSocketCloseInfo & closeInfo)
msgQ.setOnMessageCallback([this](const WebSocketMessagePtr& msg)
{
REQUIRE(mainThreadId == std::this_thread::get_id());
std::stringstream ss;
if (messageType == WebSocketMessageType::Open)
if (msg->type == WebSocketMessageType::Open)
{
log("client connected");
sendNextMessage();
}
else if (messageType == WebSocketMessageType::Close)
else if (msg->type == WebSocketMessageType::Close)
{
log("client disconnected");
}
else if (messageType == WebSocketMessageType::Error)
else if (msg->type == WebSocketMessageType::Error)
{
ss << "Error ! " << error.reason;
ss << "Error ! " << msg->errorInfo.reason;
log(ss.str());
testDone = true;
}
else if (messageType == WebSocketMessageType::Pong)
else if (msg->type == WebSocketMessageType::Pong)
{
ss << "Received pong message " << str;
ss << "Received pong message " << msg->str;
log(ss.str());
}
else if (messageType == WebSocketMessageType::Ping)
else if (msg->type == WebSocketMessageType::Ping)
{
ss << "Received ping message " << str;
ss << "Received ping message " << msg->str;
log(ss.str());
}
else if (messageType == WebSocketMessageType::Message)
else if (msg->type == WebSocketMessageType::Message)
{
REQUIRE(str.compare("Hey dude!") == 0);
REQUIRE(msg->str.compare("Hey dude!") == 0);
++receivedCount;
ss << "Received message " << str;
ss << "Received message " << msg->str;
log(ss.str());
sendNextMessage();
}
@ -189,5 +179,4 @@ TEST_CASE("Websocket_message_queue", "[websocket_message_q]")
server.stop();
}
}

View File

@ -106,7 +106,7 @@ namespace
{
log("client disconnected");
if (closeInfo.code == 1011)
if (msg->closeInfo.code == 1011)
{
_closedDueToPingTimeout = true;
}

View File

@ -39,42 +39,37 @@ namespace ix
server.setOnConnectionCallback(
[&server, &connectionId](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, connectionState,
&connectionId, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
&connectionId, &server](const ix::WebSocketMessagePtr& msg)
{
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
Logger() << "New connection";
connectionState->computeId();
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri;
Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:";
for (auto it : openInfo.headers)
for (auto it : msg->openInfo.headers)
{
Logger() << it.first << ": " << it.second;
}
connectionId = connectionState->getId();
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
Logger() << "Closed connection";
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
for (auto&& client : server.getClients())
{
if (client != webSocket)
{
client->send(str);
client->send(msg->str);
}
}
}

View File

@ -52,41 +52,36 @@ namespace
log(std::string("Connecting to url: ") + url);
_webSocket.setOnMessageCallback(
[](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[](const ix::WebSocketMessagePtr& msg)
{
std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
log("TestConnectionDisconnection: connected !");
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
log("TestConnectionDisconnection: disconnected !");
}
else if (messageType == ix::WebSocketMessageType::Error)
else if (msg->type == ix::WebSocketMessageType::Error)
{
ss << "TestConnectionDisconnection: Error! ";
ss << error.reason;
ss << msg->errorInfo.reason;
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
log("TestConnectionDisconnection: received message.!");
}
else if (messageType == ix::WebSocketMessageType::Ping)
else if (msg->type == ix::WebSocketMessageType::Ping)
{
log("TestConnectionDisconnection: received ping message.!");
}
else if (messageType == ix::WebSocketMessageType::Pong)
else if (msg->type == ix::WebSocketMessageType::Pong)
{
log("TestConnectionDisconnection: received pong message.!");
}
else if (messageType == ix::WebSocketMessageType::Fragment)
else if (msg->type == ix::WebSocketMessageType::Fragment)
{
log("TestConnectionDisconnection: received fragment.!");
}

View File

@ -114,31 +114,26 @@ namespace
log(std::string("Connecting to url: ") + url);
_webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[this](const ix::WebSocketMessagePtr& msg)
{
std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
ss << "cmd_websocket_chat: user "
<< _user
<< " Connected !";
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
ss << "cmd_websocket_chat: user "
<< _user
<< " disconnected !";
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
auto result = decodeMessage(str);
auto result = decodeMessage(msg->str);
// Our "chat" / "broacast" node.js server does not send us
// the messages we send, so we don't need to have a msg_user != user
@ -159,20 +154,20 @@ namespace
<< _user << " > ";
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Error)
else if (msg->type == ix::WebSocketMessageType::Error)
{
ss << "cmd_websocket_chat: Error ! " << error.reason;
ss << "cmd_websocket_chat: Error ! " << msg->errorInfo.reason;
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Ping)
else if (msg->type == ix::WebSocketMessageType::Ping)
{
log("cmd_websocket_chat: received ping message");
}
else if (messageType == ix::WebSocketMessageType::Pong)
else if (msg->type == ix::WebSocketMessageType::Pong)
{
log("cmd_websocket_chat: received pong message");
}
else if (messageType == ix::WebSocketMessageType::Fragment)
else if (msg->type == ix::WebSocketMessageType::Fragment)
{
log("cmd_websocket_chat: received message fragment");
}
@ -221,35 +216,30 @@ namespace
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg)
{
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
Logger() << "New connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri;
Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:";
for (auto it : openInfo.headers)
for (auto it : msg->openInfo.headers)
{
Logger() << it.first << ": " << it.second;
}
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
log("Closed connection");
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
for (auto&& client : server.getClients())
{
if (client != webSocket)
{
client->send(str);
client->send(msg->str);
}
}
}

View File

@ -0,0 +1,30 @@
# Clients
## ws
```
$ ws connect ws://127.0.0.1:8765
Type Ctrl-D to exit prompt...
Connecting to url: ws://127.0.0.1:8765
> ws_connect: connected
Uri: /
Handshake Headers:
Connection: Upgrade
Date: Sat, 08 Jun 2019 16:43:29 GMT
Sec-WebSocket-Accept: kPCNwGa97y+7NWdAvHi/7/rA8AE=
Sec-WebSocket-Extensions: permessage-deflate; server_max_window_bits=15; client_max_window_bits=15
Server: Python/3.7 websockets/7.0
Upgrade: websocket
Received 13 bytes
ws_connect: received message: > Welcome !
ws_connect: connection closed: code 1006 reason Abnormal closure
```
## wscat
```
$ ./node_modules/.bin/wscat -c ws://127.0.0.1:8765
connected (press CTRL+C to quit)
< > Welcome !
disconnected (code: 1006)
```

View File

@ -0,0 +1,22 @@
#!/usr/bin/env python
# WS server example
import asyncio
import websockets
async def hello(websocket, path):
await websocket.send(f"> Welcome !")
name = await websocket.recv()
print(f"< {name}")
greeting = f"Hello {name}!"
await websocket.send(greeting)
print(f"> {greeting}")
start_server = websockets.serve(hello, 'localhost', 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

View File

@ -90,46 +90,41 @@ namespace ix
void CobraConnection::initWebSocketOnMessageCallback()
{
_webSocket->setOnMessageCallback(
[this](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[this](const ix::WebSocketMessagePtr& msg)
{
CobraConnection::invokeTrafficTrackerCallback(wireSize, true);
CobraConnection::invokeTrafficTrackerCallback(msg->wireSize, true);
std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
invokeEventCallback(ix::CobraConnection_EventType_Open,
std::string(),
openInfo.headers);
msg->openInfo.headers);
sendHandshakeMessage();
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
_authenticated = false;
std::stringstream ss;
ss << "Close code " << closeInfo.code;
ss << " reason " << closeInfo.reason;
ss << "Close code " << msg->closeInfo.code;
ss << " reason " << msg->closeInfo.reason;
invokeEventCallback(ix::CobraConnection_EventType_Closed,
ss.str());
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
Json::Value data;
Json::Reader reader;
if (!reader.parse(str, data))
if (!reader.parse(msg->str, data))
{
invokeErrorCallback("Invalid json", str);
invokeErrorCallback("Invalid json", msg->str);
return;
}
if (!data.isMember("action"))
{
invokeErrorCallback("Missing action", str);
invokeErrorCallback("Missing action", msg->str);
return;
}
@ -139,12 +134,12 @@ namespace ix
{
if (!handleHandshakeResponse(data))
{
invokeErrorCallback("Error extracting nonce from handshake response", str);
invokeErrorCallback("Error extracting nonce from handshake response", msg->str);
}
}
else if (action == "auth/handshake/error")
{
invokeErrorCallback("Handshake error", str);
invokeErrorCallback("Handshake error", msg->str);
}
else if (action == "auth/authenticate/ok")
{
@ -154,7 +149,7 @@ namespace ix
}
else if (action == "auth/authenticate/error")
{
invokeErrorCallback("Authentication error", str);
invokeErrorCallback("Authentication error", msg->str);
}
else if (action == "rtm/subscription/data")
{
@ -164,36 +159,36 @@ namespace ix
{
if (!handleSubscriptionResponse(data))
{
invokeErrorCallback("Error processing subscribe response", str);
invokeErrorCallback("Error processing subscribe response", msg->str);
}
}
else if (action == "rtm/subscribe/error")
{
invokeErrorCallback("Subscription error", str);
invokeErrorCallback("Subscription error", msg->str);
}
else if (action == "rtm/unsubscribe/ok")
{
if (!handleUnsubscriptionResponse(data))
{
invokeErrorCallback("Error processing subscribe response", str);
invokeErrorCallback("Error processing subscribe response", msg->str);
}
}
else if (action == "rtm/unsubscribe/error")
{
invokeErrorCallback("Unsubscription error", str);
invokeErrorCallback("Unsubscription error", msg->str);
}
else
{
invokeErrorCallback("Un-handled message type", str);
invokeErrorCallback("Un-handled message type", msg->str);
}
}
else if (messageType == ix::WebSocketMessageType::Error)
else if (msg->type == ix::WebSocketMessageType::Error)
{
std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
invokeErrorCallback(ss.str(), std::string());
}
});

View File

@ -58,25 +58,20 @@ namespace snake
auto state = std::dynamic_pointer_cast<SnakeConnectionState>(connectionState);
webSocket->setOnMessageCallback(
[this, webSocket, state](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[this, webSocket, state](const ix::WebSocketMessagePtr& msg)
{
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "id: " << state->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)
for (auto it : msg->openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
std::string appkey = parseAppKey(openInfo.uri);
std::string appkey = parseAppKey(msg->openInfo.uri);
state->setAppkey(appkey);
// Connect to redis first
@ -86,29 +81,29 @@ namespace snake
std::cerr << "Cannot connect to redis host" << std::endl;
}
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
std::cerr << "Closed connection"
<< " code " << closeInfo.code
<< " reason " << closeInfo.reason << std::endl;
<< " code " << msg->closeInfo.code
<< " reason " << msg->closeInfo.reason << std::endl;
}
else if (messageType == ix::WebSocketMessageType::Error)
else if (msg->type == ix::WebSocketMessageType::Error)
{
std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str();
}
else if (messageType == ix::WebSocketMessageType::Fragment)
else if (msg->type == ix::WebSocketMessageType::Fragment)
{
std::cerr << "Received message fragment" << std::endl;
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
std::cerr << "Received " << wireSize << " bytes" << std::endl;
processCobraMessage(state, webSocket, _appConfig, str);
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
processCobraMessage(state, webSocket, _appConfig, msg->str);
}
}
);

View File

@ -81,6 +81,7 @@ int main(int argc, char** argv)
bool stress = false;
bool disableAutomaticReconnection = false;
bool disablePerMessageDeflate = false;
bool greetings = false;
int port = 8008;
int redisPort = 6379;
int statsdPort = 8125;
@ -120,6 +121,7 @@ int main(int argc, char** argv)
CLI::App* echoServerApp = app.add_subcommand("echo_server", "Echo server");
echoServerApp->add_option("--port", port, "Port");
echoServerApp->add_option("--host", hostname, "Hostname");
echoServerApp->add_flag("-g", greetings, "Verbose");
CLI::App* broadcastServerApp = app.add_subcommand("broadcast_server", "Broadcasting server");
broadcastServerApp->add_option("--port", port, "Port");
@ -252,7 +254,7 @@ int main(int argc, char** argv)
}
else if (app.got_subcommand("echo_server"))
{
ret = ix::ws_echo_server_main(port, hostname);
ret = ix::ws_echo_server_main(port, greetings, hostname);
}
else if (app.got_subcommand("broadcast_server"))
{

View File

@ -24,7 +24,7 @@ namespace ix
int ws_ping_pong_main(const std::string& url);
int ws_echo_server_main(int port, const std::string& hostname);
int ws_echo_server_main(int port, bool greetings, const std::string& hostname);
int ws_broadcast_server_main(int port, const std::string& hostname);
int ws_transfer_main(int port, const std::string& hostname);

View File

@ -21,52 +21,48 @@ namespace ix
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[webSocket, connectionState, &server](const WebSocketMessagePtr& msg)
{
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)
for (auto it : msg->openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
std::cerr << "Closed connection"
<< " code " << closeInfo.code
<< " reason " << closeInfo.reason << std::endl;
<< " code " << msg->closeInfo.code
<< " reason " << msg->closeInfo.reason << std::endl;
}
else if (messageType == ix::WebSocketMessageType::Error)
else if (msg->type == ix::WebSocketMessageType::Error)
{
std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str();
}
else if (messageType == ix::WebSocketMessageType::Fragment)
else if (msg->type == ix::WebSocketMessageType::Fragment)
{
std::cerr << "Received message fragment" << std::endl;
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
std::cerr << "Received " << wireSize << " bytes" << std::endl;
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
for (auto&& client : server.getClients())
{
if (client != webSocket)
{
client->send(str,
client->send(msg->str,
msg->binary,
[](int current, int total) -> bool
{
std::cerr << "Step " << current

View File

@ -84,20 +84,15 @@ namespace ix
log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[this](const WebSocketMessagePtr& msg)
{
std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
log("ws chat: connected");
std::cout << "Uri: " << openInfo.uri << std::endl;
std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers)
for (auto it : msg->openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
@ -107,18 +102,18 @@ namespace ix
<< " Connected !";
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
ss << "ws chat: user "
<< _user
<< " disconnected !"
<< " code " << closeInfo.code
<< " reason " << closeInfo.reason;
<< " code " << msg->closeInfo.code
<< " reason " << msg->closeInfo.reason;
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
auto result = decodeMessage(str);
auto result = decodeMessage(msg->str);
// Our "chat" / "broacast" node.js server does not send us
// the messages we send, so we don't have to filter it out.
@ -127,17 +122,17 @@ namespace ix
_receivedQueue.push(result.second);
ss << std::endl
<< result.first << "(" << wireSize << " bytes)" << " > " << result.second
<< result.first << "(" << msg->wireSize << " bytes)" << " > " << result.second
<< std::endl
<< _user << " > ";
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Error)
else if (msg->type == ix::WebSocketMessageType::Error)
{
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str());
}
else
@ -172,7 +167,7 @@ namespace ix
void WebSocketChat::sendMessage(const std::string& text)
{
_webSocket.send(encodeMessage(text));
_webSocket.sendText(encodeMessage(text));
}
int ws_chat_main(const std::string& url,

View File

@ -73,56 +73,51 @@ namespace ix
log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[this](const ix::WebSocketMessagePtr& msg)
{
std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
log("ws_connect: connected");
std::cout << "Uri: " << openInfo.uri << std::endl;
std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers)
for (auto it : msg->openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
ss << "ws_connect: connection closed:";
ss << " code " << closeInfo.code;
ss << " reason " << closeInfo.reason << std::endl;
ss << " code " << msg->closeInfo.code;
ss << " reason " << msg->closeInfo.reason << std::endl;
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
std::cerr << "Received " << wireSize << " bytes" << std::endl;
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
ss << "ws_connect: received message: "
<< str;
<< msg->str;
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Error)
else if (msg->type == ix::WebSocketMessageType::Error)
{
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Fragment)
else if (msg->type == ix::WebSocketMessageType::Fragment)
{
std::cerr << "Received message fragment" << std::endl;
}
else if (messageType == ix::WebSocketMessageType::Ping)
else if (msg->type == ix::WebSocketMessageType::Ping)
{
std::cerr << "Received ping" << std::endl;
}
else if (messageType == ix::WebSocketMessageType::Pong)
else if (msg->type == ix::WebSocketMessageType::Pong)
{
std::cerr << "Received pong" << std::endl;
}
@ -138,7 +133,7 @@ namespace ix
void WebSocketConnect::sendMessage(const std::string& text)
{
_webSocket.send(text);
_webSocket.sendText(text);
}
int ws_connect_main(const std::string& url,

View File

@ -10,56 +10,56 @@
namespace ix
{
int ws_echo_server_main(int port, const std::string& hostname)
int ws_echo_server_main(int port, bool greetings, const std::string& hostname)
{
std::cout << "Listening on " << hostname << ":" << port << std::endl;
ix::WebSocketServer server(port, hostname);
server.setOnConnectionCallback(
[](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
[greetings](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, connectionState](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[webSocket, connectionState, greetings](const WebSocketMessagePtr& msg)
{
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)
for (auto it : msg->openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
if (greetings)
{
webSocket->sendText("Welcome !");
}
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
std::cerr << "Closed connection"
<< " code " << closeInfo.code
<< " reason " << closeInfo.reason << std::endl;
<< " code " << msg->closeInfo.code
<< " reason " << msg->closeInfo.reason << std::endl;
}
else if (messageType == ix::WebSocketMessageType::Error)
else if (msg->type == ix::WebSocketMessageType::Error)
{
std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str();
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
std::cerr << "Received "
<< wireSize << " bytes"
<< msg->wireSize << " bytes"
<< std::endl;
webSocket->send(str);
webSocket->send(msg->str, msg->binary);
}
}
);

View File

@ -54,59 +54,54 @@ namespace ix
log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[this](const ix::WebSocketMessagePtr& msg)
{
std::cerr << "Received " << wireSize << " bytes" << std::endl;
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
log("ping_pong: connected");
std::cout << "Uri: " << openInfo.uri << std::endl;
std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers)
for (auto it : msg->openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
ss << "ping_pong: disconnected:"
<< " code " << closeInfo.code
<< " reason " << closeInfo.reason
<< str;
<< " code " << msg->closeInfo.code
<< " reason " << msg->closeInfo.reason
<< msg->str;
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
ss << "ping_pong: received message: "
<< str;
<< msg->str;
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Ping)
else if (msg->type == ix::WebSocketMessageType::Ping)
{
ss << "ping_pong: received ping message: "
<< str;
<< msg->str;
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Pong)
else if (msg->type == ix::WebSocketMessageType::Pong)
{
ss << "ping_pong: received pong message: "
<< str;
<< msg->str;
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Error)
else if (msg->type == ix::WebSocketMessageType::Error)
{
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str());
}
else

View File

@ -183,41 +183,36 @@ namespace ix
log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[this](const ix::WebSocketMessagePtr& msg)
{
std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
_condition.notify_one();
log("ws_receive: connected");
std::cout << "Uri: " << openInfo.uri << std::endl;
std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers)
for (auto it : msg->openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
ss << "ws_receive: connection closed:";
ss << " code " << closeInfo.code;
ss << " reason " << closeInfo.reason << std::endl;
ss << " code " << msg->closeInfo.code;
ss << " reason " << msg->closeInfo.reason << std::endl;
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
ss << "ws_receive: transfered " << wireSize << " bytes";
ss << "ws_receive: transfered " << msg->wireSize << " bytes";
log(ss.str());
handleMessage(str);
handleMessage(msg->str);
_condition.notify_one();
}
else if (messageType == ix::WebSocketMessageType::Fragment)
else if (msg->type == ix::WebSocketMessageType::Fragment)
{
ss << "ws_receive: received fragment " << _receivedFragmentCounter++;
log(ss.str());
@ -229,13 +224,13 @@ namespace ix
std::this_thread::sleep_for(duration);
}
}
else if (messageType == ix::WebSocketMessageType::Error)
else if (msg->type == ix::WebSocketMessageType::Error)
{
ss << "ws_receive ";
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str());
}
else

View File

@ -112,42 +112,37 @@ namespace ix
log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[this](const WebSocketMessagePtr& msg)
{
std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
_condition.notify_one();
log("ws_send: connected");
std::cout << "Uri: " << openInfo.uri << std::endl;
std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers)
for (auto it : msg->openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
ss << "ws_send: connection closed:";
ss << " code " << closeInfo.code;
ss << " reason " << closeInfo.reason << std::endl;
ss << " code " << msg->closeInfo.code;
ss << " reason " << msg->closeInfo.reason << std::endl;
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
_condition.notify_one();
ss << "ws_send: received message (" << wireSize << " bytes)";
ss << "ws_send: received message (" << msg->wireSize << " bytes)";
log(ss.str());
std::string errMsg;
MsgPack data = MsgPack::parse(str, errMsg);
MsgPack data = MsgPack::parse(msg->str, errMsg);
if (!errMsg.empty())
{
std::cerr << "Invalid MsgPack response" << std::endl;
@ -160,13 +155,13 @@ namespace ix
std::cerr << "Invalid id" << std::endl;
}
}
else if (messageType == ix::WebSocketMessageType::Error)
else if (msg->type == ix::WebSocketMessageType::Error)
{
ss << "ws_send ";
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str());
}
else
@ -244,8 +239,8 @@ namespace ix
MsgPack msg(pdu);
Bench bench("Sending file through websocket");
_webSocket.send(msg.dump(),
[throttle](int current, int total) -> bool
_webSocket.sendBinary(msg.dump(),
[throttle](int current, int total) -> bool
{
std::cout << "ws_send: Step " << current << " out of " << total << std::endl;

View File

@ -21,52 +21,48 @@ namespace ix
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[webSocket, connectionState, &server](const WebSocketMessagePtr& msg)
{
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)
for (auto it : msg->openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
std::cerr << "Closed connection"
<< " code " << closeInfo.code
<< " reason " << closeInfo.reason << std::endl;
<< " code " << msg->closeInfo.code
<< " reason " << msg->closeInfo.reason << std::endl;
}
else if (messageType == ix::WebSocketMessageType::Error)
else if (msg->type == ix::WebSocketMessageType::Error)
{
std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str();
}
else if (messageType == ix::WebSocketMessageType::Fragment)
else if (msg->type == ix::WebSocketMessageType::Fragment)
{
std::cerr << "Received message fragment "
<< std::endl;
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
std::cerr << "Received " << wireSize << " bytes" << std::endl;
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
for (auto&& client : server.getClients())
{
if (client != webSocket)
{
client->send(str,
client->send(msg->str,
msg->binary,
[](int current, int total) -> bool
{
std::cerr << "ws_transfer: Step " << current