Compare commits

..

7 Commits

Author SHA1 Message Date
Benjamin Sergeant
a788b31080 ws_cobra_to_sentry improvements 2019-06-05 18:45:31 -07:00
Benjamin Sergeant
03a2f1443b HttpClient class is not thread safe, we should protect it as we only have one socket 2019-06-05 18:43:35 -07:00
Benjamin Sergeant
6e0463c981 cobra_to_sentry / add tags 2019-06-05 17:34:33 -07:00
Benjamin Sergeant
e9399a0734 add more logging 2019-06-05 16:38:51 -07:00
Benjamin Sergeant
7b2ddb5e7c fix ws + add doc 2019-06-05 16:04:51 -07:00
Benjamin Sergeant
c7cb743a69 fix command line tools 2019-06-05 15:52:31 -07:00
Benjamin Sergeant
3257ad1363 unittest working / uses shared_ptr for a bunch of things 🗿 2019-06-05 15:26:31 -07:00
47 changed files with 667 additions and 779 deletions

View File

@@ -12,30 +12,30 @@ matrix:
- python test/run.py
- make ws
# Linux
- os: linux
dist: xenial
script:
- python test/run.py
- make ws
env:
- CC=gcc
- CXX=g++
# Clang + Linux disabled for now
# # Linux
# - os: linux
# dist: xenial
# script: python test/run.py
# script:
# - python test/run.py
# - make ws
# env:
# - CC=clang
# - CXX=clang++
# - CC=gcc
# - CXX=g++
# Clang + Linux disabled for now
- os: linux
dist: xenial
script: python test/run.py
env:
- CC=clang
- CXX=clang++
# Windows
# - os: windows
# env:
# - CMAKE_PATH="/c/Program Files/CMake/bin"
# script:
# - export PATH=$CMAKE_PATH:$PATH
# # - cmake -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 -DUSE_VENDORED_THIRD_PARTY=1 .
# # - cmake --build --parallel .
# - python test/run.py
- os: windows
env:
- CMAKE_PATH="/c/Program Files/CMake/bin"
script:
- export PATH=$CMAKE_PATH:$PATH
- cmake -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .
- cmake --build --parallel .
- python test/run.py

View File

@@ -1,32 +0,0 @@
# Changelog
All notable changes to this project will be documented in this file.
## [unreleased] - 2019-06-09
### Changed
- cobra_to_sentry / backtraces are reversed and line number is not extracted correctly
- mbedtls and zlib are searched with find_package, and we use the vendored version if nothing is found
- travis CI uses g++ on Linux
## [4.0.0] - 2019-06-09
### Changed
- WebSocket::send() sends message in TEXT mode by default
- WebSocketMessage sets a new binary field, which tells whether the received incoming message is binary or text
- WebSocket::send takes a third arg, binary which default to true (can be text too)
- WebSocket callback only take one object, a const ix::WebSocketMessagePtr& msg
- Add explicit WebSocket::sendBinary method
- New headers + WebSocketMessage class to hold message data, still not used across the board
- Add test/compatibility folder with small servers and clients written in different languages and different libraries to test compatibility.
- ws echo_server has a -g option to print a greeting message on connect
- IXSocketMbedTLS: better error handling in close and connect
## [3.1.2] - 2019-06-06
### Added
- ws connect has a -x option to disable per message deflate
- Add WebSocket::disablePerMessageDeflate() option.
## [3.0.0] - 2019-06-xx
### Changed
- TLS, aka SSL works on Windows (websocket and http clients)
- ws command line tool build on Windows
- Async API for HttpClient
- HttpClient API changed to use shared_ptr for response and request

View File

@@ -1,13 +0,0 @@
find_path(MBEDTLS_INCLUDE_DIRS mbedtls/ssl.h)
find_library(MBEDTLS_LIBRARY mbedtls)
find_library(MBEDX509_LIBRARY mbedx509)
find_library(MBEDCRYPTO_LIBRARY mbedcrypto)
set(MBEDTLS_LIBRARIES "${MBEDTLS_LIBRARY}" "${MBEDX509_LIBRARY}" "${MBEDCRYPTO_LIBRARY}")
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(MBEDTLS DEFAULT_MSG
MBEDTLS_INCLUDE_DIRS MBEDTLS_LIBRARY MBEDX509_LIBRARY MBEDCRYPTO_LIBRARY)
mark_as_advanced(MBEDTLS_INCLUDE_DIRS MBEDTLS_LIBRARY MBEDX509_LIBRARY MBEDCRYPTO_LIBRARY)

View File

@@ -4,8 +4,6 @@
#
cmake_minimum_required(VERSION 3.4.1)
set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/CMake;${CMAKE_MODULE_PATH}")
project(ixwebsocket C CXX)
set (CMAKE_CXX_STANDARD 14)
@@ -22,64 +20,60 @@ if ("${CMAKE_CXX_COMPILER_ID}" MATCHES "Clang")
endif()
set( IXWEBSOCKET_SOURCES
ixwebsocket/IXCancellationRequest.cpp
ixwebsocket/IXConnectionState.cpp
ixwebsocket/IXDNSLookup.cpp
ixwebsocket/IXHttpClient.cpp
ixwebsocket/IXNetSystem.cpp
ixwebsocket/IXSelectInterrupt.cpp
ixwebsocket/IXSelectInterruptFactory.cpp
ixwebsocket/IXSocket.cpp
ixwebsocket/IXSocketServer.cpp
ixwebsocket/IXSocketConnect.cpp
ixwebsocket/IXSocketFactory.cpp
ixwebsocket/IXSocketServer.cpp
ixwebsocket/IXUrlParser.cpp
ixwebsocket/IXDNSLookup.cpp
ixwebsocket/IXCancellationRequest.cpp
ixwebsocket/IXNetSystem.cpp
ixwebsocket/IXWebSocket.cpp
ixwebsocket/IXWebSocketCloseConstants.cpp
ixwebsocket/IXWebSocketServer.cpp
ixwebsocket/IXWebSocketTransport.cpp
ixwebsocket/IXWebSocketHandshake.cpp
ixwebsocket/IXWebSocketHttpHeaders.cpp
ixwebsocket/IXWebSocketMessageQueue.cpp
ixwebsocket/IXWebSocketPerMessageDeflate.cpp
ixwebsocket/IXWebSocketPerMessageDeflateCodec.cpp
ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp
ixwebsocket/IXWebSocketServer.cpp
ixwebsocket/IXWebSocketTransport.cpp
ixwebsocket/IXWebSocketHttpHeaders.cpp
ixwebsocket/IXHttpClient.cpp
ixwebsocket/IXUrlParser.cpp
ixwebsocket/LUrlParser.cpp
ixwebsocket/IXSelectInterrupt.cpp
ixwebsocket/IXSelectInterruptFactory.cpp
ixwebsocket/IXConnectionState.cpp
ixwebsocket/IXWebSocketCloseConstants.cpp
ixwebsocket/IXWebSocketMessageQueue.cpp
)
set( IXWEBSOCKET_HEADERS
ixwebsocket/IXCancellationRequest.h
ixwebsocket/IXConnectionState.h
ixwebsocket/IXDNSLookup.h
ixwebsocket/IXHttpClient.h
ixwebsocket/IXNetSystem.h
ixwebsocket/IXProgressCallback.h
ixwebsocket/IXSelectInterrupt.h
ixwebsocket/IXSelectInterruptFactory.h
ixwebsocket/IXSetThreadName.h
ixwebsocket/IXSocket.h
ixwebsocket/IXSocketServer.h
ixwebsocket/IXSocketConnect.h
ixwebsocket/IXSocketFactory.h
ixwebsocket/IXSocketServer.h
ixwebsocket/IXUrlParser.h
ixwebsocket/IXSetThreadName.h
ixwebsocket/IXDNSLookup.h
ixwebsocket/IXCancellationRequest.h
ixwebsocket/IXNetSystem.h
ixwebsocket/IXProgressCallback.h
ixwebsocket/IXWebSocket.h
ixwebsocket/IXWebSocketCloseConstants.h
ixwebsocket/IXWebSocketCloseInfo.h
ixwebsocket/IXWebSocketErrorInfo.h
ixwebsocket/IXWebSocketServer.h
ixwebsocket/IXWebSocketTransport.h
ixwebsocket/IXWebSocketHandshake.h
ixwebsocket/IXWebSocketHttpHeaders.h
ixwebsocket/IXWebSocketMessage.h
ixwebsocket/IXWebSocketMessageQueue.h
ixwebsocket/IXWebSocketMessageType.h
ixwebsocket/IXWebSocketOpenInfo.h
ixwebsocket/IXWebSocketSendInfo.h
ixwebsocket/IXWebSocketErrorInfo.h
ixwebsocket/IXWebSocketPerMessageDeflate.h
ixwebsocket/IXWebSocketPerMessageDeflateCodec.h
ixwebsocket/IXWebSocketPerMessageDeflateOptions.h
ixwebsocket/IXWebSocketSendInfo.h
ixwebsocket/IXWebSocketServer.h
ixwebsocket/IXWebSocketTransport.h
ixwebsocket/LUrlParser.h
ixwebsocket/IXWebSocketHttpHeaders.h
ixwebsocket/libwshandshake.hpp
ixwebsocket/IXHttpClient.h
ixwebsocket/IXUrlParser.h
ixwebsocket/LUrlParser.h
ixwebsocket/IXSelectInterrupt.h
ixwebsocket/IXSelectInterruptFactory.h
ixwebsocket/IXConnectionState.h
ixwebsocket/IXWebSocketCloseConstants.h
ixwebsocket/IXWebSocketMessageQueue.h
)
if (UNIX)
@@ -134,11 +128,6 @@ if (APPLE AND USE_TLS AND NOT USE_MBED_TLS)
target_link_libraries(ixwebsocket "-framework foundation" "-framework security")
endif()
if (UNIX)
find_package(Threads)
target_link_libraries(ixwebsocket ${CMAKE_THREAD_LIBS_INIT})
endif()
if (USE_OPEN_SSL)
find_package(OpenSSL REQUIRED)
add_definitions(${OPENSSL_DEFINITIONS})
@@ -148,28 +137,25 @@ if (USE_OPEN_SSL)
endif()
if (USE_MBED_TLS)
if (USE_VENDORED_THIRD_PARTY)
set (ENABLE_PROGRAMS OFF)
add_subdirectory(third_party/mbedtls)
include_directories(third_party/mbedtls/include)
set (ENABLE_PROGRAMS OFF)
add_subdirectory(third_party/mbedtls)
include_directories(third_party/mbedtls/include)
target_link_libraries(ixwebsocket mbedtls)
else()
find_package(MbedTLS REQUIRED)
include_directories(${MBEDTLS_INCLUDE_DIRS})
target_link_libraries(ixwebsocket ${MBEDTLS_LIBRARIES})
endif()
target_link_libraries(ixwebsocket mbedtls)
endif()
find_package(ZLIB REQUIRED)
if (ZLIB_FOUND)
include_directories(${ZLIB_INCLUDE_DIRS})
target_link_libraries(ixwebsocket ${ZLIB_LIBRARIES})
else()
if (WIN32)
add_subdirectory(third_party/zlib)
include_directories(third_party/zlib ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib)
target_link_libraries(ixwebsocket zlibstatic wsock32 ws2_32)
add_definitions(-D_CRT_SECURE_NO_WARNINGS)
else()
# gcc/Linux needs -pthread
find_package(Threads)
target_link_libraries(ixwebsocket
z ${CMAKE_THREAD_LIBS_INIT})
endif()
set( IXWEBSOCKET_INCLUDE_DIRS
@@ -181,7 +167,7 @@ if (CMAKE_CXX_COMPILER_ID MATCHES "MSVC")
target_compile_options(ixwebsocket PRIVATE /MP)
endif()
target_include_directories(ixwebsocket PUBLIC ${IXWEBSOCKET_INCLUDE_DIRS})
target_include_directories( ixwebsocket PUBLIC ${IXWEBSOCKET_INCLUDE_DIRS} )
set_target_properties(ixwebsocket PROPERTIES PUBLIC_HEADER "${IXWEBSOCKET_HEADERS}")

View File

@@ -1 +1 @@
4.0.4
2.2.1

View File

@@ -1 +1 @@
docker/Dockerfile.alpine
docker/Dockerfile.ubuntu_artful

View File

@@ -28,27 +28,29 @@ webSocket.setUrl(url);
// to make sure that load balancers do not kill an idle connection.
webSocket.setHeartBeatPeriod(45);
// Per message deflate connection is enabled by default. You can tweak its parameters or disable it
webSocket.disablePerMessageDeflate();
// Setup a callback to be fired when a message or an event (open, close, error) is received
webSocket.setOnMessageCallback(
[](const ix::WebSocketMessagePtr& msg)
[](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
if (msg->type == ix::WebSocketMessageType::Message)
if (messageType == ix::WebSocketMessageType::Message)
{
std::cout << msg->str << std::endl;
std::cout << str << std::endl;
}
});
// Now that our callback is setup, we can start our background thread and receive messages
webSocket.start();
// Send a message to the server (default to TEXT mode)
// Send a message to the server (default to BINARY mode)
webSocket.send("hello world");
// The message can be sent in BINARY mode (useful if you send MsgPack data for example)
webSocket.sendBinary("some serialized binary data");
// The message can be sent in TEXT mode
webSocket.sendText("hello again");
// ... finally ...
@@ -68,9 +70,14 @@ server.setOnConnectionCallback(
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](const ix::WebSocketMessagePtr msg)
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
if (msg->type == ix::WebSocketMessageType::Open)
if (messageType == ix::WebSocketMessageType::Open)
{
std::cerr << "New connection" << std::endl;
@@ -81,21 +88,19 @@ server.setOnConnectionCallback(
std::cerr << "id: " << connectionState->getId() << std::endl;
// The uri the client did connect to.
std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : msg->openInfo.headers)
for (auto it : openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
}
else if (msg->type == ix::WebSocketMessageType::Message)
else if (messageType == ix::WebSocketMessageType::Message)
{
// For an echo server, we just send back to the client whatever was received by the server
// All connected clients are available in an std::set. See the broadcast cpp example.
// Second parameter tells whether we are sending the message in binary or text mode.
// Here we send it in the same mode as it was received.
webSocket->send(msg->str, msg->binary);
webSocket->send(str);
}
}
);
@@ -250,7 +255,7 @@ No manual polling to fetch data is required. Data is sent and received instantly
### Automatic reconnection
If the remote end (server) breaks the connection, the code will try to perpetually reconnect, by using an exponential backoff strategy, capped at one retry every 10 seconds. This behavior can be disabled.
If the remote end (server) breaks the connection, the code will try to perpetually reconnect, by using an exponential backoff strategy, capped at one retry every 10 seconds.
### Large messages
@@ -326,27 +331,32 @@ The onMessage event will be fired when the connection is opened or closed. This
```
webSocket.setOnMessageCallback(
[](const ix::WebSocketMessagePtr& msg)
[](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
if (msg->type == ix::WebSocketMessageType::Open)
if (messageType == ix::WebSocketMessageType::Open)
{
std::cout << "send greetings" << std::endl;
// Headers can be inspected (pairs of string/string)
std::cout << "Handshake Headers:" << std::endl;
for (auto it : msg->headers)
for (auto it : headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
else if (messageType == ix::WebSocketMessageType::Close)
{
std::cout << "disconnected" << std::endl;
// The server can send an explicit code and reason for closing.
// This data can be accessed through the closeInfo object.
std::cout << msg->closeInfo.code << std::endl;
std::cout << msg->closeInfo.reason << std::endl;
std::cout << closeInfo.code << std::endl;
std::cout << closeInfo.reason << std::endl;
}
}
);
@@ -358,15 +368,20 @@ A message will be fired when there is an error with the connection. The message
```
webSocket.setOnMessageCallback(
[](const ix::WebSocketMessagePtr& msg)
[](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
if (msg->type == ix::WebSocketMessageType::Error)
if (messageType == ix::WebSocketMessageType::Error)
{
std::stringstream ss;
ss << "Error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->eventInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->eventInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->eventInfo.http_status << std::endl;
ss << "Error: " << error.reason << std::endl;
ss << "#retries: " << event.retries << std::endl;
ss << "Wait time(ms): " << event.wait_time << std::endl;
ss << "HTTP Status: " << event.http_status << std::endl;
std::cout << ss.str() << std::endl;
}
}
@@ -393,12 +408,17 @@ Ping/pong messages are used to implement keep-alive. 2 message types exists to i
```
webSocket.setOnMessageCallback(
[](const ix::WebSocketMessagePtr& msg)
[](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
if (msg->type == ix::WebSocketMessageType::Ping ||
msg->type == ix::WebSocketMessageType::Pong)
if (messageType == ix::WebSocketMessageType::Ping ||
messageType == ix::WebSocketMessageType::Pong)
{
std::cout << "pong data: " << msg->str << std::endl;
std::cout << "pong data: " << str << std::endl;
}
}
);

View File

@@ -109,7 +109,7 @@ namespace ix
HttpRequestArgsPtr args,
int redirects)
{
// We only have one socket connection, so we cannot
// We only have one socket connection, so we cannot
// make multiple requests concurrently.
std::lock_guard<std::mutex> lock(_mutex);

View File

@@ -24,8 +24,6 @@ namespace ix
bool SocketMbedTLS::init(const std::string& host, std::string& errMsg)
{
std::lock_guard<std::mutex> lock(_mutex);
mbedtls_ssl_init(&_ssl);
mbedtls_ssl_config_init(&_conf);
mbedtls_ctr_drbg_init(&_ctr_drbg);
@@ -77,24 +75,15 @@ namespace ix
std::string& errMsg,
const CancellationRequest& isCancellationRequested)
{
{
std::lock_guard<std::mutex> lock(_mutex);
_sockfd = SocketConnect::connect(host, port, errMsg, isCancellationRequested);
if (_sockfd == -1) return false;
}
if (!init(host, errMsg))
{
close();
return false;
}
_sockfd = SocketConnect::connect(host, port, errMsg, isCancellationRequested);
if (_sockfd == -1) return false;
if (!init(host, errMsg)) return false;
mbedtls_ssl_set_bio(&_ssl, &_sockfd, mbedtls_net_send, mbedtls_net_recv, NULL);
int res;
do
{
std::lock_guard<std::mutex> lock(_mutex);
res = mbedtls_ssl_handshake(&_ssl);
}
while (res == MBEDTLS_ERR_SSL_WANT_READ || res == MBEDTLS_ERR_SSL_WANT_WRITE);
@@ -106,8 +95,6 @@ namespace ix
errMsg = "error in handshake : ";
errMsg += buf;
close();
return false;
}
@@ -116,14 +103,10 @@ namespace ix
void SocketMbedTLS::close()
{
std::lock_guard<std::mutex> lock(_mutex);
mbedtls_ssl_free(&_ssl);
mbedtls_ssl_config_free(&_conf);
mbedtls_ctr_drbg_free(&_ctr_drbg);
mbedtls_entropy_free(&_entropy);
Socket::close();
}
ssize_t SocketMbedTLS::send(char* buf, size_t nbyte)

View File

@@ -51,11 +51,9 @@ namespace ix
_ws.setOnCloseCallback(
[this](uint16_t code, const std::string& reason, size_t wireSize, bool remote)
{
_onMessageCallback(
std::make_shared<WebSocketMessage>(
WebSocketMessageType::Close, "", wireSize,
WebSocketErrorInfo(), WebSocketOpenInfo(),
WebSocketCloseInfo(code, reason, remote)));
_onMessageCallback(WebSocketMessageType::Close, "", wireSize,
WebSocketErrorInfo(), WebSocketOpenInfo(),
WebSocketCloseInfo(code, reason, remote));
}
);
}
@@ -137,13 +135,6 @@ namespace ix
_enablePong = false;
}
void WebSocket::disablePerMessageDeflate()
{
std::lock_guard<std::mutex> lock(_configMutex);
WebSocketPerMessageDeflateOptions perMessageDeflateOptions(false);
_perMessageDeflateOptions = perMessageDeflateOptions;
}
void WebSocket::start()
{
if (_thread.joinable()) return; // we've already been started
@@ -182,12 +173,10 @@ namespace ix
return status;
}
_onMessageCallback(
std::make_shared<WebSocketMessage>(
WebSocketMessageType::Open, "", 0,
WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers),
WebSocketCloseInfo()));
_onMessageCallback(WebSocketMessageType::Open, "", 0,
WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers),
WebSocketCloseInfo());
return status;
}
@@ -207,12 +196,10 @@ namespace ix
return status;
}
_onMessageCallback(
std::make_shared<WebSocketMessage>(
WebSocketMessageType::Open, "", 0,
WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers),
WebSocketCloseInfo()));
_onMessageCallback(WebSocketMessageType::Open, "", 0,
WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers),
WebSocketCloseInfo());
return status;
}
@@ -280,11 +267,9 @@ namespace ix
connectErr.reason = status.errorStr;
connectErr.http_status = status.http_status;
_onMessageCallback(
std::make_shared<WebSocketMessage>(
WebSocketMessageType::Error, "", 0,
connectErr, WebSocketOpenInfo(),
WebSocketCloseInfo()));
_onMessageCallback(WebSocketMessageType::Error, "", 0,
connectErr, WebSocketOpenInfo(),
WebSocketCloseInfo());
}
}
}
@@ -325,8 +310,8 @@ namespace ix
WebSocketMessageType webSocketMessageType;
switch (messageKind)
{
case WebSocketTransport::MessageKind::MSG_TEXT:
case WebSocketTransport::MessageKind::MSG_BINARY:
default:
case WebSocketTransport::MessageKind::MSG:
{
webSocketMessageType = WebSocketMessageType::Message;
} break;
@@ -350,13 +335,9 @@ namespace ix
WebSocketErrorInfo webSocketErrorInfo;
webSocketErrorInfo.decompressionError = decompressionError;
bool binary = messageKind == WebSocketTransport::MessageKind::MSG_BINARY;
_onMessageCallback(
std::make_shared<WebSocketMessage>(
webSocketMessageType, msg, wireSize,
webSocketErrorInfo, WebSocketOpenInfo(),
WebSocketCloseInfo(), binary));
_onMessageCallback(webSocketMessageType, msg, wireSize,
webSocketErrorInfo, WebSocketOpenInfo(),
WebSocketCloseInfo());
WebSocket::invokeTrafficTrackerCallback(msg.size(), true);
});
@@ -387,18 +368,9 @@ namespace ix
}
WebSocketSendInfo WebSocket::send(const std::string& data,
bool binary,
const OnProgressCallback& onProgressCallback)
{
return sendMessage(data,
(binary) ? SendMessageKind::Binary: SendMessageKind::Text,
onProgressCallback);
}
WebSocketSendInfo WebSocket::sendBinary(const std::string& text,
const OnProgressCallback& onProgressCallback)
{
return sendMessage(text, SendMessageKind::Binary, onProgressCallback);
return sendMessage(data, SendMessageKind::Binary, onProgressCallback);
}
WebSocketSendInfo WebSocket::sendText(const std::string& text,

View File

@@ -13,7 +13,6 @@
#include "IXWebSocketCloseConstants.h"
#include "IXWebSocketErrorInfo.h"
#include "IXWebSocketHttpHeaders.h"
#include "IXWebSocketMessage.h"
#include "IXWebSocketPerMessageDeflateOptions.h"
#include "IXWebSocketSendInfo.h"
#include "IXWebSocketTransport.h"
@@ -33,7 +32,52 @@ namespace ix
Closed = 3
};
using OnMessageCallback = std::function<void(const WebSocketMessagePtr&)>;
enum class WebSocketMessageType
{
Message = 0,
Open = 1,
Close = 2,
Error = 3,
Ping = 4,
Pong = 5,
Fragment = 6
};
struct WebSocketOpenInfo
{
std::string uri;
WebSocketHttpHeaders headers;
WebSocketOpenInfo(const std::string& u = std::string(),
const WebSocketHttpHeaders& h = WebSocketHttpHeaders())
: uri(u)
, headers(h)
{
;
}
};
struct WebSocketCloseInfo
{
uint16_t code;
std::string reason;
bool remote;
WebSocketCloseInfo(uint16_t c = 0, const std::string& r = std::string(), bool rem = false)
: code(c)
, reason(r)
, remote(rem)
{
;
}
};
using OnMessageCallback = std::function<void(WebSocketMessageType,
const std::string&,
size_t wireSize,
const WebSocketErrorInfo&,
const WebSocketOpenInfo&,
const WebSocketCloseInfo&)>;
using OnTrafficTrackerCallback = std::function<void(size_t size, bool incoming)>;
@@ -51,7 +95,6 @@ namespace ix
void setPingTimeout(int pingTimeoutSecs);
void enablePong();
void disablePong();
void disablePerMessageDeflate();
// Run asynchronously, by calling start and stop.
void start();
@@ -64,18 +107,14 @@ namespace ix
WebSocketInitResult connect(int timeoutSecs);
void run();
// send is in binary mode by default
// send binary data
WebSocketSendInfo send(const std::string& data,
bool binary = false,
const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo sendBinary(const std::string& text,
const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo sendText(const std::string& text,
const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo ping(const std::string& text);
void close(uint16_t code = WebSocketCloseConstants::kNormalClosureCode,
const std::string& reason = WebSocketCloseConstants::kNormalClosureMessage);
void close(uint16_t code = 1000, const std::string& reason = "Normal closure");
void setOnMessageCallback(const OnMessageCallback& callback);
static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback);
@@ -129,7 +168,7 @@ namespace ix
bool _enablePong;
static const bool kDefaultEnablePong;
// Optional ping and pong timeout
// Optional ping and ping timeout
int _pingIntervalSecs;
int _pingTimeoutSecs;
static const int kDefaultPingIntervalSecs;

View File

@@ -1,25 +0,0 @@
/*
* IXWebSocketCloseInfo.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
namespace ix
{
struct WebSocketCloseInfo
{
uint16_t code;
std::string reason;
bool remote;
WebSocketCloseInfo(uint16_t c = 0, const std::string& r = std::string(), bool rem = false)
: code(c)
, reason(r)
, remote(rem)
{
;
}
};
} // namespace ix

View File

@@ -1,49 +0,0 @@
/*
* IXWebSocketMessage.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include "IXWebSocketCloseInfo.h"
#include "IXWebSocketErrorInfo.h"
#include "IXWebSocketMessageType.h"
#include "IXWebSocketOpenInfo.h"
#include <memory>
#include <string>
#include <thread>
namespace ix
{
struct WebSocketMessage
{
WebSocketMessageType type;
std::string str;
size_t wireSize;
WebSocketErrorInfo errorInfo;
WebSocketOpenInfo openInfo;
WebSocketCloseInfo closeInfo;
bool binary;
WebSocketMessage(WebSocketMessageType t,
const std::string& s,
size_t w,
WebSocketErrorInfo e,
WebSocketOpenInfo o,
WebSocketCloseInfo c,
bool b = false)
: type(t)
, str(std::move(s))
, wireSize(w)
, errorInfo(e)
, openInfo(o)
, closeInfo(c)
, binary(b)
{
;
}
};
using WebSocketMessagePtr = std::shared_ptr<WebSocketMessage>;
} // namespace ix

View File

@@ -32,7 +32,14 @@ namespace ix
if (_websocket)
{
// set dummy callback just to avoid crash
_websocket->setOnMessageCallback([](const WebSocketMessagePtr&) {});
_websocket->setOnMessageCallback([](
WebSocketMessageType,
const std::string&,
size_t,
const WebSocketErrorInfo&,
const WebSocketOpenInfo&,
const WebSocketCloseInfo&)
{});
}
_websocket = websocket;
@@ -40,10 +47,27 @@ namespace ix
// bind new
if (_websocket)
{
_websocket->setOnMessageCallback([this](const WebSocketMessagePtr& msg)
_websocket->setOnMessageCallback([this](
WebSocketMessageType type,
const std::string& str,
size_t wireSize,
const WebSocketErrorInfo& errorInfo,
const WebSocketOpenInfo& openInfo,
const WebSocketCloseInfo& closeInfo)
{
std::lock_guard<std::mutex> lock(_messagesMutex);
_messages.emplace_back(std::move(msg));
MessagePtr message(new Message());
message->type = type;
message->str = str;
message->wireSize = wireSize;
message->errorInfo = errorInfo;
message->openInfo = openInfo;
message->closeInfo = closeInfo;
{
std::lock_guard<std::mutex> lock(_messagesMutex);
_messages.emplace_back(std::move(message));
}
});
}
}
@@ -58,9 +82,9 @@ namespace ix
_onMessageUserCallback = std::move(callback);
}
WebSocketMessagePtr WebSocketMessageQueue::popMessage()
WebSocketMessageQueue::MessagePtr WebSocketMessageQueue::popMessage()
{
WebSocketMessagePtr message;
MessagePtr message;
std::lock_guard<std::mutex> lock(_messagesMutex);
if (!_messages.empty())
@@ -77,11 +101,19 @@ namespace ix
if (!_onMessageUserCallback)
return;
WebSocketMessagePtr message;
MessagePtr message;
while (count > 0 && (message = popMessage()))
{
_onMessageUserCallback(message);
_onMessageUserCallback(
message->type,
message->str,
message->wireSize,
message->errorInfo,
message->openInfo,
message->closeInfo
);
--count;
}
}

View File

@@ -30,12 +30,24 @@ namespace ix
void poll(int count = 512);
protected:
WebSocketMessagePtr popMessage();
struct Message
{
WebSocketMessageType type;
std::string str;
size_t wireSize;
WebSocketErrorInfo errorInfo;
WebSocketOpenInfo openInfo;
WebSocketCloseInfo closeInfo;
};
using MessagePtr = std::shared_ptr<Message>;
MessagePtr popMessage();
private:
WebSocket* _websocket = nullptr;
OnMessageCallback _onMessageUserCallback;
std::mutex _messagesMutex;
std::list<WebSocketMessagePtr> _messages;
std::list<MessagePtr> _messages;
};
} // namespace ix

View File

@@ -1,21 +0,0 @@
/*
* IXWebSocketMessageType.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
namespace ix
{
enum class WebSocketMessageType
{
Message = 0,
Open = 1,
Close = 2,
Error = 3,
Ping = 4,
Pong = 5,
Fragment = 6
};
}

View File

@@ -1,24 +0,0 @@
/*
* IXWebSocketOpenInfo.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
namespace ix
{
struct WebSocketOpenInfo
{
std::string uri;
WebSocketHttpHeaders headers;
WebSocketOpenInfo(const std::string& u = std::string(),
const WebSocketHttpHeaders& h = WebSocketHttpHeaders())
: uri(u)
, headers(h)
{
;
}
};
} // namespace ix

View File

@@ -542,17 +542,12 @@ namespace ix
) {
unmaskReceiveBuffer(ws);
MessageKind messageKind =
(ws.opcode == wsheader_type::TEXT_FRAME)
? MessageKind::MSG_TEXT
: MessageKind::MSG_BINARY;
//
// Usual case. Small unfragmented messages
//
if (ws.fin && _chunks.empty())
{
emitMessage(messageKind,
emitMessage(MessageKind::MSG,
std::string(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t) ws.N),
ws,
@@ -572,7 +567,7 @@ namespace ix
_rxbuf.begin()+ws.header_size+(size_t)ws.N));
if (ws.fin)
{
emitMessage(messageKind, getMergedChunks(), ws, onMessageCallback);
emitMessage(MessageKind::MSG, getMergedChunks(), ws, onMessageCallback);
_chunks.clear();
}
else

View File

@@ -50,8 +50,7 @@ namespace ix
enum class MessageKind
{
MSG_TEXT,
MSG_BINARY,
MSG,
PING,
PONG,
FRAGMENT

View File

@@ -9,10 +9,10 @@ install: brew
# on osx it is good practice to make /usr/local user writable
# sudo chown -R `whoami`/staff /usr/local
brew:
mkdir -p build && (cd build ; cmake -DUSE_TLS=1 -DUSE_WS=1 .. ; make -j install)
mkdir -p build && (cd build ; cmake -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .. ; make -j install)
ws:
mkdir -p build && (cd build ; cmake -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 -DUSE_VENDORED_THIRD_PARTY=1 .. ; make -j)
mkdir -p build && (cd build ; cmake -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .. ; make -j)
uninstall:
xargs rm -fv < build/install_manifest.txt
@@ -44,7 +44,7 @@ trail:
sh third_party/remote_trailing_whitespaces.sh
format:
find test ixwebsocket ws -name '*.cpp' -o -name '*.h' -exec clang-format -i {} \;
find ixwebsocket ws -name '*.cpp' -o -name '*.h' -exec clang-format -i {} \;
# That target is used to start a node server, but isn't required as we have
# a builtin C++ server started in the unittest now

View File

@@ -178,29 +178,34 @@ namespace ix
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg)
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
if (msg->type == ix::WebSocketMessageType::Open)
if (messageType == ix::WebSocketMessageType::Open)
{
Logger() << "New connection";
Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:";
for (auto it : msg->openInfo.headers)
for (auto it : openInfo.headers)
{
Logger() << it.first << ": " << it.second;
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
else if (messageType == ix::WebSocketMessageType::Close)
{
Logger() << "Closed connection";
}
else if (msg->type == ix::WebSocketMessageType::Message)
else if (messageType == ix::WebSocketMessageType::Message)
{
for (auto&& client : server.getClients())
{
if (client != webSocket)
{
client->send(msg->str, msg->binary);
client->send(str);
}
}
}

View File

@@ -6,13 +6,13 @@
#pragma once
#include <iostream>
#include <ixwebsocket/IXWebSocketServer.h>
#include <mutex>
#include <spdlog/spdlog.h>
#include <sstream>
#include <string>
#include <vector>
#include <sstream>
#include <iostream>
#include <mutex>
#include <spdlog/spdlog.h>
#include <ixwebsocket/IXWebSocketServer.h>
namespace ix
{
@@ -28,20 +28,20 @@ namespace ix
struct Logger
{
public:
template<typename T>
Logger& operator<<(T const& obj)
{
std::lock_guard<std::mutex> lock(_mutex);
public:
template <typename T>
Logger& operator<<(T const& obj)
{
std::lock_guard<std::mutex> lock(_mutex);
std::stringstream ss;
ss << obj;
spdlog::info(ss.str());
return *this;
}
std::stringstream ss;
ss << obj;
spdlog::info(ss.str());
return *this;
}
private:
static std::mutex _mutex;
private:
static std::mutex _mutex;
};
void log(const std::string& msg);
@@ -49,4 +49,4 @@ namespace ix
int getFreePort();
bool startWebSocketEchoServer(ix::WebSocketServer& server);
} // namespace ix
}

View File

@@ -108,47 +108,52 @@ namespace
log(std::string("Connecting to url: ") + url);
_webSocket.setOnMessageCallback(
[this](const ix::WebSocketMessagePtr& msg)
[this](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
std::stringstream ss;
if (msg->type == ix::WebSocketMessageType::Open)
if (messageType == ix::WebSocketMessageType::Open)
{
log("client connected");
}
else if (msg->type == ix::WebSocketMessageType::Close)
else if (messageType == ix::WebSocketMessageType::Close)
{
std::stringstream ss;
ss << "client disconnected("
<< msg->closeInfo.code
<< closeInfo.code
<< ","
<< msg->closeInfo.reason
<< closeInfo.reason
<< ")";
log(ss.str());
std::lock_guard<std::mutex> lck(_mutexCloseData);
_closeCode = msg->closeInfo.code;
_closeReason = std::string(msg->closeInfo.reason);
_closeRemote = msg->closeInfo.remote;
_closeCode = closeInfo.code;
_closeReason = std::string(closeInfo.reason);
_closeRemote = closeInfo.remote;
}
else if (msg->type == ix::WebSocketMessageType::Error)
else if (messageType == ix::WebSocketMessageType::Error)
{
ss << "Error ! " << msg->errorInfo.reason;
ss << "Error ! " << error.reason;
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Pong)
else if (messageType == ix::WebSocketMessageType::Pong)
{
ss << "Received pong message " << msg->str;
ss << "Received pong message " << str;
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Ping)
else if (messageType == ix::WebSocketMessageType::Ping)
{
ss << "Received ping message " << msg->str;
ss << "Received ping message " << str;
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Message)
else if (messageType == ix::WebSocketMessageType::Message)
{
ss << "Received message " << msg->str;
ss << "Received message " << str;
log(ss.str());
}
else
@@ -178,34 +183,39 @@ namespace
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, connectionState, &server, &receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite](const ix::WebSocketMessagePtr& msg)
[webSocket, connectionState, &server, &receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
if (msg->type == ix::WebSocketMessageType::Open)
if (messageType == ix::WebSocketMessageType::Open)
{
Logger() << "New server connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:";
for (auto it : msg->openInfo.headers)
for (auto it : openInfo.headers)
{
Logger() << it.first << ": " << it.second;
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
else if (messageType == ix::WebSocketMessageType::Close)
{
std::stringstream ss;
ss << "Server closed connection("
<< msg->closeInfo.code
<< closeInfo.code
<< ","
<< msg->closeInfo.reason
<< closeInfo.reason
<< ")";
log(ss.str());
std::lock_guard<std::mutex> lck(mutexWrite);
receivedCloseCode = msg->closeInfo.code;
receivedCloseReason = std::string(msg->closeInfo.reason);
receivedCloseRemote = msg->closeInfo.remote;
receivedCloseCode = closeInfo.code;
receivedCloseReason = std::string(closeInfo.reason);
receivedCloseRemote = closeInfo.remote;
}
}
);

View File

@@ -20,34 +20,39 @@ namespace
{
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[connectionState, &server](const WebSocketMessagePtr& msg)
[connectionState, &server](ix::WebSocketMessageType messageType,
const std::string & str,
size_t wireSize,
const ix::WebSocketErrorInfo & error,
const ix::WebSocketOpenInfo & openInfo,
const ix::WebSocketCloseInfo & closeInfo)
{
if (msg->type == ix::WebSocketMessageType::Open)
if (messageType == ix::WebSocketMessageType::Open)
{
Logger() << "New connection";
connectionState->computeId();
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:";
for (auto&& it : msg->openInfo.headers)
for (auto it : openInfo.headers)
{
Logger() << it.first << ": " << it.second;
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
else if (messageType == ix::WebSocketMessageType::Close)
{
Logger() << "Closed connection";
}
else if (msg->type == ix::WebSocketMessageType::Message)
else if (messageType == ix::WebSocketMessageType::Message)
{
Logger() << "Message received: " << msg->str;
Logger() << "Message received: " << str;
for (auto&& client : server.getClients())
{
client->send(msg->str);
client->send(str);
}
}
}
@@ -73,41 +78,46 @@ namespace
{
msgQ.bindWebsocket(&ws);
msgQ.setOnMessageCallback([this](const WebSocketMessagePtr& msg)
msgQ.setOnMessageCallback([this](WebSocketMessageType messageType,
const std::string & str,
size_t wireSize,
const WebSocketErrorInfo & error,
const WebSocketOpenInfo & openInfo,
const WebSocketCloseInfo & closeInfo)
{
REQUIRE(mainThreadId == std::this_thread::get_id());
std::stringstream ss;
if (msg->type == WebSocketMessageType::Open)
if (messageType == WebSocketMessageType::Open)
{
log("client connected");
sendNextMessage();
}
else if (msg->type == WebSocketMessageType::Close)
else if (messageType == WebSocketMessageType::Close)
{
log("client disconnected");
}
else if (msg->type == WebSocketMessageType::Error)
else if (messageType == WebSocketMessageType::Error)
{
ss << "Error ! " << msg->errorInfo.reason;
ss << "Error ! " << error.reason;
log(ss.str());
testDone = true;
}
else if (msg->type == WebSocketMessageType::Pong)
else if (messageType == WebSocketMessageType::Pong)
{
ss << "Received pong message " << msg->str;
ss << "Received pong message " << str;
log(ss.str());
}
else if (msg->type == WebSocketMessageType::Ping)
else if (messageType == WebSocketMessageType::Ping)
{
ss << "Received ping message " << msg->str;
ss << "Received ping message " << str;
log(ss.str());
}
else if (msg->type == WebSocketMessageType::Message)
else if (messageType == WebSocketMessageType::Message)
{
REQUIRE(msg->str.compare("Hey dude!") == 0);
REQUIRE(str.compare("Hey dude!") == 0);
++receivedCount;
ss << "Received message " << msg->str;
ss << "Received message " << str;
log(ss.str());
sendNextMessage();
}
@@ -179,4 +189,5 @@ TEST_CASE("Websocket_message_queue", "[websocket_message_q]")
server.stop();
}
}

View File

@@ -106,7 +106,7 @@ namespace
{
log("client disconnected");
if (msg->closeInfo.code == 1011)
if (closeInfo.code == 1011)
{
_closedDueToPingTimeout = true;
}

View File

@@ -39,37 +39,42 @@ namespace ix
server.setOnConnectionCallback(
[&server, &connectionId](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, connectionState,
&connectionId, &server](const ix::WebSocketMessagePtr& msg)
&connectionId, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
if (msg->type == ix::WebSocketMessageType::Open)
if (messageType == ix::WebSocketMessageType::Open)
{
Logger() << "New connection";
connectionState->computeId();
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:";
for (auto it : msg->openInfo.headers)
for (auto it : openInfo.headers)
{
Logger() << it.first << ": " << it.second;
}
connectionId = connectionState->getId();
}
else if (msg->type == ix::WebSocketMessageType::Close)
else if (messageType == ix::WebSocketMessageType::Close)
{
Logger() << "Closed connection";
}
else if (msg->type == ix::WebSocketMessageType::Message)
else if (messageType == ix::WebSocketMessageType::Message)
{
for (auto&& client : server.getClients())
{
if (client != webSocket)
{
client->send(msg->str);
client->send(str);
}
}
}

View File

@@ -52,36 +52,41 @@ namespace
log(std::string("Connecting to url: ") + url);
_webSocket.setOnMessageCallback(
[](const ix::WebSocketMessagePtr& msg)
[](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
std::stringstream ss;
if (msg->type == ix::WebSocketMessageType::Open)
if (messageType == ix::WebSocketMessageType::Open)
{
log("TestConnectionDisconnection: connected !");
}
else if (msg->type == ix::WebSocketMessageType::Close)
else if (messageType == ix::WebSocketMessageType::Close)
{
log("TestConnectionDisconnection: disconnected !");
}
else if (msg->type == ix::WebSocketMessageType::Error)
else if (messageType == ix::WebSocketMessageType::Error)
{
ss << "TestConnectionDisconnection: Error! ";
ss << msg->errorInfo.reason;
ss << error.reason;
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Message)
else if (messageType == ix::WebSocketMessageType::Message)
{
log("TestConnectionDisconnection: received message.!");
}
else if (msg->type == ix::WebSocketMessageType::Ping)
else if (messageType == ix::WebSocketMessageType::Ping)
{
log("TestConnectionDisconnection: received ping message.!");
}
else if (msg->type == ix::WebSocketMessageType::Pong)
else if (messageType == ix::WebSocketMessageType::Pong)
{
log("TestConnectionDisconnection: received pong message.!");
}
else if (msg->type == ix::WebSocketMessageType::Fragment)
else if (messageType == ix::WebSocketMessageType::Fragment)
{
log("TestConnectionDisconnection: received fragment.!");
}

View File

@@ -114,26 +114,31 @@ namespace
log(std::string("Connecting to url: ") + url);
_webSocket.setOnMessageCallback(
[this](const ix::WebSocketMessagePtr& msg)
[this](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
std::stringstream ss;
if (msg->type == ix::WebSocketMessageType::Open)
if (messageType == ix::WebSocketMessageType::Open)
{
ss << "cmd_websocket_chat: user "
<< _user
<< " Connected !";
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Close)
else if (messageType == ix::WebSocketMessageType::Close)
{
ss << "cmd_websocket_chat: user "
<< _user
<< " disconnected !";
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Message)
else if (messageType == ix::WebSocketMessageType::Message)
{
auto result = decodeMessage(msg->str);
auto result = decodeMessage(str);
// Our "chat" / "broacast" node.js server does not send us
// the messages we send, so we don't need to have a msg_user != user
@@ -154,20 +159,20 @@ namespace
<< _user << " > ";
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Error)
else if (messageType == ix::WebSocketMessageType::Error)
{
ss << "cmd_websocket_chat: Error ! " << msg->errorInfo.reason;
ss << "cmd_websocket_chat: Error ! " << error.reason;
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Ping)
else if (messageType == ix::WebSocketMessageType::Ping)
{
log("cmd_websocket_chat: received ping message");
}
else if (msg->type == ix::WebSocketMessageType::Pong)
else if (messageType == ix::WebSocketMessageType::Pong)
{
log("cmd_websocket_chat: received pong message");
}
else if (msg->type == ix::WebSocketMessageType::Fragment)
else if (messageType == ix::WebSocketMessageType::Fragment)
{
log("cmd_websocket_chat: received message fragment");
}
@@ -216,30 +221,35 @@ namespace
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg)
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
if (msg->type == ix::WebSocketMessageType::Open)
if (messageType == ix::WebSocketMessageType::Open)
{
Logger() << "New connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:";
for (auto it : msg->openInfo.headers)
for (auto it : openInfo.headers)
{
Logger() << it.first << ": " << it.second;
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
else if (messageType == ix::WebSocketMessageType::Close)
{
log("Closed connection");
}
else if (msg->type == ix::WebSocketMessageType::Message)
else if (messageType == ix::WebSocketMessageType::Message)
{
for (auto&& client : server.getClients())
{
if (client != webSocket)
{
client->send(msg->str);
client->send(str);
}
}
}

View File

@@ -1,30 +0,0 @@
# Clients
## ws
```
$ ws connect ws://127.0.0.1:8765
Type Ctrl-D to exit prompt...
Connecting to url: ws://127.0.0.1:8765
> ws_connect: connected
Uri: /
Handshake Headers:
Connection: Upgrade
Date: Sat, 08 Jun 2019 16:43:29 GMT
Sec-WebSocket-Accept: kPCNwGa97y+7NWdAvHi/7/rA8AE=
Sec-WebSocket-Extensions: permessage-deflate; server_max_window_bits=15; client_max_window_bits=15
Server: Python/3.7 websockets/7.0
Upgrade: websocket
Received 13 bytes
ws_connect: received message: > Welcome !
ws_connect: connection closed: code 1006 reason Abnormal closure
```
## wscat
```
$ ./node_modules/.bin/wscat -c ws://127.0.0.1:8765
connected (press CTRL+C to quit)
< > Welcome !
disconnected (code: 1006)
```

View File

@@ -1,22 +0,0 @@
#!/usr/bin/env python
# WS server example
import asyncio
import websockets
async def hello(websocket, path):
await websocket.send(f"> Welcome !")
name = await websocket.recv()
print(f"< {name}")
greeting = f"Hello {name}!"
await websocket.send(greeting)
print(f"> {greeting}")
start_server = websockets.serve(hello, 'localhost', 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

View File

@@ -99,10 +99,8 @@ def runCMake(sanitizer, buildDir):
#generator = '"NMake Makefiles"'
#generator = '"Visual Studio 16 2019"'
generator = '"Visual Studio 15 2017"'
USE_VENDORED_THIRD_PARTY = 'ON'
else:
generator = '"Unix Makefiles"'
USE_VENDORED_THIRD_PARTY = 'OFF'
CMAKE_BUILD_TYPE = BUILD_TYPE
@@ -112,7 +110,6 @@ def runCMake(sanitizer, buildDir):
-DCMAKE_BUILD_TYPE={CMAKE_BUILD_TYPE} \
-DUSE_TLS=1 \
-DCMAKE_EXPORT_COMPILE_COMMANDS=ON \
-DUSE_VENDORED_THIRD_PARTY={USE_VENDORED_THIRD_PARTY} \
-G{generator}'
cmakeCmd = fmt.format(**locals())

View File

@@ -72,6 +72,7 @@ namespace ix
std::string line;
std::stringstream tokenStream(stack);
std::stringstream ss;
std::smatch group;
while (std::getline(tokenStream, line))
@@ -83,7 +84,6 @@ namespace ix
const auto linenoStr = group.str(2);
const auto function = group.str(3);
std::stringstream ss;
ss << linenoStr;
uint64_t lineno;
ss >> lineno;
@@ -97,8 +97,6 @@ namespace ix
}
}
std::reverse(frames.begin(), frames.end());
return frames;
}
@@ -145,7 +143,7 @@ namespace ix
// "b"
// ],
// ]
//
//
Json::Value tags;
Json::Value gameTag;
@@ -171,15 +169,18 @@ namespace ix
std::pair<HttpResponsePtr, std::string> SentryClient::send(const Json::Value& msg,
bool verbose)
{
std::string log;
auto args = _httpClient.createRequest();
args->extraHeaders["X-Sentry-Auth"] = SentryClient::computeAuthHeader();
args->connectTimeout = 60;
args->transferTimeout = 5 * 60;
args->followRedirects = true;
args->verbose = verbose;
args->logger = [](const std::string& msg)
args->logger = [&log](const std::string& msg)
{
spdlog::info("request logger: {}", msg);
log += msg;
std::cout << msg;
};
std::string body = computePayload(msg);
@@ -195,7 +196,7 @@ namespace ix
spdlog::info("Upload size: {}", response->uploadSize);
spdlog::info("Download size: {}", response->downloadSize);
spdlog::info("Status: {}", response->statusCode);
std::cerr << "Status: " << response->statusCode << std::endl;
if (response->errorCode != HttpErrorCode::Ok)
{
spdlog::info("error message: {}", response->errorMsg);
@@ -207,6 +208,6 @@ namespace ix
}
}
return std::make_pair(response, body);
return std::make_pair(response, log);
}
} // namespace ix

View File

@@ -6,10 +6,10 @@
#pragma once
#include <algorithm>
#include <ixwebsocket/IXHttpClient.h>
#include <jsoncpp/json/json.h>
#include <regex>
#include <algorithm>
namespace ix
{

View File

@@ -90,41 +90,46 @@ namespace ix
void CobraConnection::initWebSocketOnMessageCallback()
{
_webSocket->setOnMessageCallback(
[this](const ix::WebSocketMessagePtr& msg)
[this](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
CobraConnection::invokeTrafficTrackerCallback(msg->wireSize, true);
CobraConnection::invokeTrafficTrackerCallback(wireSize, true);
std::stringstream ss;
if (msg->type == ix::WebSocketMessageType::Open)
if (messageType == ix::WebSocketMessageType::Open)
{
invokeEventCallback(ix::CobraConnection_EventType_Open,
std::string(),
msg->openInfo.headers);
openInfo.headers);
sendHandshakeMessage();
}
else if (msg->type == ix::WebSocketMessageType::Close)
else if (messageType == ix::WebSocketMessageType::Close)
{
_authenticated = false;
std::stringstream ss;
ss << "Close code " << msg->closeInfo.code;
ss << " reason " << msg->closeInfo.reason;
ss << "Close code " << closeInfo.code;
ss << " reason " << closeInfo.reason;
invokeEventCallback(ix::CobraConnection_EventType_Closed,
ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Message)
else if (messageType == ix::WebSocketMessageType::Message)
{
Json::Value data;
Json::Reader reader;
if (!reader.parse(msg->str, data))
if (!reader.parse(str, data))
{
invokeErrorCallback("Invalid json", msg->str);
invokeErrorCallback("Invalid json", str);
return;
}
if (!data.isMember("action"))
{
invokeErrorCallback("Missing action", msg->str);
invokeErrorCallback("Missing action", str);
return;
}
@@ -134,12 +139,12 @@ namespace ix
{
if (!handleHandshakeResponse(data))
{
invokeErrorCallback("Error extracting nonce from handshake response", msg->str);
invokeErrorCallback("Error extracting nonce from handshake response", str);
}
}
else if (action == "auth/handshake/error")
{
invokeErrorCallback("Handshake error", msg->str);
invokeErrorCallback("Handshake error", str);
}
else if (action == "auth/authenticate/ok")
{
@@ -149,7 +154,7 @@ namespace ix
}
else if (action == "auth/authenticate/error")
{
invokeErrorCallback("Authentication error", msg->str);
invokeErrorCallback("Authentication error", str);
}
else if (action == "rtm/subscription/data")
{
@@ -159,36 +164,36 @@ namespace ix
{
if (!handleSubscriptionResponse(data))
{
invokeErrorCallback("Error processing subscribe response", msg->str);
invokeErrorCallback("Error processing subscribe response", str);
}
}
else if (action == "rtm/subscribe/error")
{
invokeErrorCallback("Subscription error", msg->str);
invokeErrorCallback("Subscription error", str);
}
else if (action == "rtm/unsubscribe/ok")
{
if (!handleUnsubscriptionResponse(data))
{
invokeErrorCallback("Error processing subscribe response", msg->str);
invokeErrorCallback("Error processing subscribe response", str);
}
}
else if (action == "rtm/unsubscribe/error")
{
invokeErrorCallback("Unsubscription error", msg->str);
invokeErrorCallback("Unsubscription error", str);
}
else
{
invokeErrorCallback("Un-handled message type", msg->str);
invokeErrorCallback("Un-handled message type", str);
}
}
else if (msg->type == ix::WebSocketMessageType::Error)
else if (messageType == ix::WebSocketMessageType::Error)
{
std::stringstream ss;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
invokeErrorCallback(ss.str(), std::string());
}
});

View File

@@ -10,6 +10,7 @@
#if defined(IXWEBSOCKET_USE_MBED_TLS)
# include <mbedtls/md.h>
#elif defined(__APPLE__)
# include <ixwebsocket/IXSocketMbedTLS.h>
# include <CommonCrypto/CommonHMAC.h>
#else
# include <openssl/hmac.h>

View File

@@ -58,20 +58,25 @@ namespace snake
auto state = std::dynamic_pointer_cast<SnakeConnectionState>(connectionState);
webSocket->setOnMessageCallback(
[this, webSocket, state](const ix::WebSocketMessagePtr& msg)
[this, webSocket, state](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
if (msg->type == ix::WebSocketMessageType::Open)
if (messageType == ix::WebSocketMessageType::Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "id: " << state->getId() << std::endl;
std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : msg->openInfo.headers)
for (auto it : openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
std::string appkey = parseAppKey(msg->openInfo.uri);
std::string appkey = parseAppKey(openInfo.uri);
state->setAppkey(appkey);
// Connect to redis first
@@ -81,29 +86,29 @@ namespace snake
std::cerr << "Cannot connect to redis host" << std::endl;
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
else if (messageType == ix::WebSocketMessageType::Close)
{
std::cerr << "Closed connection"
<< " code " << msg->closeInfo.code
<< " reason " << msg->closeInfo.reason << std::endl;
<< " code " << closeInfo.code
<< " reason " << closeInfo.reason << std::endl;
}
else if (msg->type == ix::WebSocketMessageType::Error)
else if (messageType == ix::WebSocketMessageType::Error)
{
std::stringstream ss;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
std::cerr << ss.str();
}
else if (msg->type == ix::WebSocketMessageType::Fragment)
else if (messageType == ix::WebSocketMessageType::Fragment)
{
std::cerr << "Received message fragment" << std::endl;
}
else if (msg->type == ix::WebSocketMessageType::Message)
else if (messageType == ix::WebSocketMessageType::Message)
{
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
processCobraMessage(state, webSocket, _appConfig, msg->str);
std::cerr << "Received " << wireSize << " bytes" << std::endl;
processCobraMessage(state, webSocket, _appConfig, str);
}
}
);

View File

@@ -80,8 +80,6 @@ int main(int argc, char** argv)
bool strict = false;
bool stress = false;
bool disableAutomaticReconnection = false;
bool disablePerMessageDeflate = false;
bool greetings = false;
int port = 8008;
int redisPort = 6379;
int statsdPort = 8125;
@@ -112,7 +110,6 @@ int main(int argc, char** argv)
CLI::App* connectApp = app.add_subcommand("connect", "Connect to a remote server");
connectApp->add_option("url", url, "Connection url")->required();
connectApp->add_flag("-d", disableAutomaticReconnection, "Disable Automatic Reconnection");
connectApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate");
CLI::App* chatApp = app.add_subcommand("chat", "Group chat");
chatApp->add_option("url", url, "Connection url")->required();
@@ -121,7 +118,6 @@ int main(int argc, char** argv)
CLI::App* echoServerApp = app.add_subcommand("echo_server", "Echo server");
echoServerApp->add_option("--port", port, "Port");
echoServerApp->add_option("--host", hostname, "Hostname");
echoServerApp->add_flag("-g", greetings, "Verbose");
CLI::App* broadcastServerApp = app.add_subcommand("broadcast_server", "Broadcasting server");
broadcastServerApp->add_option("--port", port, "Port");
@@ -245,8 +241,7 @@ int main(int argc, char** argv)
}
else if (app.got_subcommand("connect"))
{
ret = ix::ws_connect_main(url, disableAutomaticReconnection,
disablePerMessageDeflate);
ret = ix::ws_connect_main(url, disableAutomaticReconnection);
}
else if (app.got_subcommand("chat"))
{
@@ -254,7 +249,7 @@ int main(int argc, char** argv)
}
else if (app.got_subcommand("echo_server"))
{
ret = ix::ws_echo_server_main(port, greetings, hostname);
ret = ix::ws_echo_server_main(port, hostname);
}
else if (app.got_subcommand("broadcast_server"))
{

View File

@@ -24,15 +24,13 @@ namespace ix
int ws_ping_pong_main(const std::string& url);
int ws_echo_server_main(int port, bool greetings, const std::string& hostname);
int ws_echo_server_main(int port, const std::string& hostname);
int ws_broadcast_server_main(int port, const std::string& hostname);
int ws_transfer_main(int port, const std::string& hostname);
int ws_chat_main(const std::string& url, const std::string& user);
int ws_connect_main(const std::string& url,
bool disableAutomaticReconnection,
bool disablePerMessageDeflate);
int ws_connect_main(const std::string& url, bool disableAutomaticReconnection);
int ws_receive_main(const std::string& url, bool enablePerMessageDeflate, int delayMs);

View File

@@ -21,48 +21,52 @@ namespace ix
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](const WebSocketMessagePtr& msg)
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
if (msg->type == ix::WebSocketMessageType::Open)
if (messageType == ix::WebSocketMessageType::Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : msg->openInfo.headers)
for (auto it : openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
else if (messageType == ix::WebSocketMessageType::Close)
{
std::cerr << "Closed connection"
<< " code " << msg->closeInfo.code
<< " reason " << msg->closeInfo.reason << std::endl;
<< " code " << closeInfo.code
<< " reason " << closeInfo.reason << std::endl;
}
else if (msg->type == ix::WebSocketMessageType::Error)
else if (messageType == ix::WebSocketMessageType::Error)
{
std::stringstream ss;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
std::cerr << ss.str();
}
else if (msg->type == ix::WebSocketMessageType::Fragment)
else if (messageType == ix::WebSocketMessageType::Fragment)
{
std::cerr << "Received message fragment" << std::endl;
}
else if (msg->type == ix::WebSocketMessageType::Message)
else if (messageType == ix::WebSocketMessageType::Message)
{
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
std::cerr << "Received " << wireSize << " bytes" << std::endl;
for (auto&& client : server.getClients())
{
if (client != webSocket)
{
client->send(msg->str,
msg->binary,
client->send(str,
[](int current, int total) -> bool
{
std::cerr << "Step " << current

View File

@@ -84,15 +84,20 @@ namespace ix
log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback(
[this](const WebSocketMessagePtr& msg)
[this](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
std::stringstream ss;
if (msg->type == ix::WebSocketMessageType::Open)
if (messageType == ix::WebSocketMessageType::Open)
{
log("ws chat: connected");
std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Uri: " << openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl;
for (auto it : msg->openInfo.headers)
for (auto it : openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
@@ -102,18 +107,18 @@ namespace ix
<< " Connected !";
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Close)
else if (messageType == ix::WebSocketMessageType::Close)
{
ss << "ws chat: user "
<< _user
<< " disconnected !"
<< " code " << msg->closeInfo.code
<< " reason " << msg->closeInfo.reason;
<< " code " << closeInfo.code
<< " reason " << closeInfo.reason;
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Message)
else if (messageType == ix::WebSocketMessageType::Message)
{
auto result = decodeMessage(msg->str);
auto result = decodeMessage(str);
// Our "chat" / "broacast" node.js server does not send us
// the messages we send, so we don't have to filter it out.
@@ -122,17 +127,17 @@ namespace ix
_receivedQueue.push(result.second);
ss << std::endl
<< result.first << "(" << msg->wireSize << " bytes)" << " > " << result.second
<< result.first << "(" << wireSize << " bytes)" << " > " << result.second
<< std::endl
<< _user << " > ";
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Error)
else if (messageType == ix::WebSocketMessageType::Error)
{
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
log(ss.str());
}
else
@@ -167,7 +172,7 @@ namespace ix
void WebSocketChat::sendMessage(const std::string& text)
{
_webSocket.sendText(encodeMessage(text));
_webSocket.send(encodeMessage(text));
}
int ws_chat_main(const std::string& url,

View File

@@ -47,12 +47,12 @@ namespace ix
std::condition_variable progressCondition;
std::queue<Json::Value> queue;
SentryClient sentryClient(dsn);
auto sentrySender = [&condition, &progressCondition, &conditionVariableMutex,
&queue, verbose, &errorSending, &sentCount,
&stop, &dsn]
&stop, &sentryClient]
{
SentryClient sentryClient(dsn);
while (true)
{
Json::Value msg;
@@ -70,8 +70,8 @@ namespace ix
if (response->statusCode != 200)
{
spdlog::error("Error sending data to sentry: {}", response->statusCode);
spdlog::error("Body: {}", ret.second);
spdlog::error("Response: {}", response->payload);
spdlog::error("Log: {}", ret.second);
errorSending = true;
}
else
@@ -192,6 +192,6 @@ namespace ix
pool[i].join();
}
return (strict && errorSending) ? 1 : 0;
return 0;
}
}

View File

@@ -15,8 +15,7 @@ namespace ix
{
public:
WebSocketConnect(const std::string& _url,
bool disableAutomaticReconnection,
bool disablePerMessageDeflate);
bool disableAutomaticReconnection);
void subscribe(const std::string& channel);
void start();
@@ -27,16 +26,13 @@ namespace ix
private:
std::string _url;
ix::WebSocket _webSocket;
bool _disablePerMessageDeflate;
void log(const std::string& msg);
};
WebSocketConnect::WebSocketConnect(const std::string& url,
bool disableAutomaticReconnection,
bool disablePerMessageDeflate) :
_url(url),
_disablePerMessageDeflate(disablePerMessageDeflate)
bool disableAutomaticReconnection) :
_url(url)
{
if (disableAutomaticReconnection)
{
@@ -58,66 +54,64 @@ namespace ix
{
_webSocket.setUrl(_url);
if (_disablePerMessageDeflate)
{
_webSocket.disablePerMessageDeflate();
}
else
{
ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(
true, false, false, 15, 15);
_webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions);
}
ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(
true, false, false, 15, 15);
_webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions);
std::stringstream ss;
log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback(
[this](const ix::WebSocketMessagePtr& msg)
[this](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
std::stringstream ss;
if (msg->type == ix::WebSocketMessageType::Open)
if (messageType == ix::WebSocketMessageType::Open)
{
log("ws_connect: connected");
std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Uri: " << openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl;
for (auto it : msg->openInfo.headers)
for (auto it : openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
else if (messageType == ix::WebSocketMessageType::Close)
{
ss << "ws_connect: connection closed:";
ss << " code " << msg->closeInfo.code;
ss << " reason " << msg->closeInfo.reason << std::endl;
ss << " code " << closeInfo.code;
ss << " reason " << closeInfo.reason << std::endl;
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Message)
else if (messageType == ix::WebSocketMessageType::Message)
{
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
std::cerr << "Received " << wireSize << " bytes" << std::endl;
ss << "ws_connect: received message: "
<< msg->str;
<< str;
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Error)
else if (messageType == ix::WebSocketMessageType::Error)
{
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Fragment)
else if (messageType == ix::WebSocketMessageType::Fragment)
{
std::cerr << "Received message fragment" << std::endl;
}
else if (msg->type == ix::WebSocketMessageType::Ping)
else if (messageType == ix::WebSocketMessageType::Ping)
{
std::cerr << "Received ping" << std::endl;
}
else if (msg->type == ix::WebSocketMessageType::Pong)
else if (messageType == ix::WebSocketMessageType::Pong)
{
std::cerr << "Received pong" << std::endl;
}
@@ -133,17 +127,13 @@ namespace ix
void WebSocketConnect::sendMessage(const std::string& text)
{
_webSocket.sendText(text);
_webSocket.send(text);
}
int ws_connect_main(const std::string& url,
bool disableAutomaticReconnection,
bool disablePerMessageDeflate)
int ws_connect_main(const std::string& url, bool disableAutomaticReconnection)
{
std::cout << "Type Ctrl-D to exit prompt..." << std::endl;
WebSocketConnect webSocketChat(url,
disableAutomaticReconnection,
disablePerMessageDeflate);
WebSocketConnect webSocketChat(url, disableAutomaticReconnection);
webSocketChat.start();
while (true)

View File

@@ -10,56 +10,56 @@
namespace ix
{
int ws_echo_server_main(int port, bool greetings, const std::string& hostname)
int ws_echo_server_main(int port, const std::string& hostname)
{
std::cout << "Listening on " << hostname << ":" << port << std::endl;
ix::WebSocketServer server(port, hostname);
server.setOnConnectionCallback(
[greetings](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
[](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, connectionState, greetings](const WebSocketMessagePtr& msg)
[webSocket, connectionState](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
if (msg->type == ix::WebSocketMessageType::Open)
if (messageType == ix::WebSocketMessageType::Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : msg->openInfo.headers)
for (auto it : openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
if (greetings)
{
webSocket->sendText("Welcome !");
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
else if (messageType == ix::WebSocketMessageType::Close)
{
std::cerr << "Closed connection"
<< " code " << msg->closeInfo.code
<< " reason " << msg->closeInfo.reason << std::endl;
<< " code " << closeInfo.code
<< " reason " << closeInfo.reason << std::endl;
}
else if (msg->type == ix::WebSocketMessageType::Error)
else if (messageType == ix::WebSocketMessageType::Error)
{
std::stringstream ss;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
std::cerr << ss.str();
}
else if (msg->type == ix::WebSocketMessageType::Message)
else if (messageType == ix::WebSocketMessageType::Message)
{
std::cerr << "Received "
<< msg->wireSize << " bytes"
<< wireSize << " bytes"
<< std::endl;
webSocket->send(msg->str, msg->binary);
webSocket->send(str);
}
}
);

View File

@@ -54,54 +54,59 @@ namespace ix
log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback(
[this](const ix::WebSocketMessagePtr& msg)
[this](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
std::cerr << "Received " << wireSize << " bytes" << std::endl;
std::stringstream ss;
if (msg->type == ix::WebSocketMessageType::Open)
if (messageType == ix::WebSocketMessageType::Open)
{
log("ping_pong: connected");
std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Uri: " << openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl;
for (auto it : msg->openInfo.headers)
for (auto it : openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
else if (messageType == ix::WebSocketMessageType::Close)
{
ss << "ping_pong: disconnected:"
<< " code " << msg->closeInfo.code
<< " reason " << msg->closeInfo.reason
<< msg->str;
<< " code " << closeInfo.code
<< " reason " << closeInfo.reason
<< str;
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Message)
else if (messageType == ix::WebSocketMessageType::Message)
{
ss << "ping_pong: received message: "
<< msg->str;
<< str;
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Ping)
else if (messageType == ix::WebSocketMessageType::Ping)
{
ss << "ping_pong: received ping message: "
<< msg->str;
<< str;
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Pong)
else if (messageType == ix::WebSocketMessageType::Pong)
{
ss << "ping_pong: received pong message: "
<< msg->str;
<< str;
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Error)
else if (messageType == ix::WebSocketMessageType::Error)
{
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
log(ss.str());
}
else

View File

@@ -183,36 +183,41 @@ namespace ix
log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback(
[this](const ix::WebSocketMessagePtr& msg)
[this](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
std::stringstream ss;
if (msg->type == ix::WebSocketMessageType::Open)
if (messageType == ix::WebSocketMessageType::Open)
{
_condition.notify_one();
log("ws_receive: connected");
std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Uri: " << openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl;
for (auto it : msg->openInfo.headers)
for (auto it : openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
else if (messageType == ix::WebSocketMessageType::Close)
{
ss << "ws_receive: connection closed:";
ss << " code " << msg->closeInfo.code;
ss << " reason " << msg->closeInfo.reason << std::endl;
ss << " code " << closeInfo.code;
ss << " reason " << closeInfo.reason << std::endl;
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Message)
else if (messageType == ix::WebSocketMessageType::Message)
{
ss << "ws_receive: transfered " << msg->wireSize << " bytes";
ss << "ws_receive: transfered " << wireSize << " bytes";
log(ss.str());
handleMessage(msg->str);
handleMessage(str);
_condition.notify_one();
}
else if (msg->type == ix::WebSocketMessageType::Fragment)
else if (messageType == ix::WebSocketMessageType::Fragment)
{
ss << "ws_receive: received fragment " << _receivedFragmentCounter++;
log(ss.str());
@@ -224,13 +229,13 @@ namespace ix
std::this_thread::sleep_for(duration);
}
}
else if (msg->type == ix::WebSocketMessageType::Error)
else if (messageType == ix::WebSocketMessageType::Error)
{
ss << "ws_receive ";
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
log(ss.str());
}
else

View File

@@ -112,37 +112,42 @@ namespace ix
log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback(
[this](const WebSocketMessagePtr& msg)
[this](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
std::stringstream ss;
if (msg->type == ix::WebSocketMessageType::Open)
if (messageType == ix::WebSocketMessageType::Open)
{
_condition.notify_one();
log("ws_send: connected");
std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Uri: " << openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl;
for (auto it : msg->openInfo.headers)
for (auto it : openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
else if (messageType == ix::WebSocketMessageType::Close)
{
ss << "ws_send: connection closed:";
ss << " code " << msg->closeInfo.code;
ss << " reason " << msg->closeInfo.reason << std::endl;
ss << " code " << closeInfo.code;
ss << " reason " << closeInfo.reason << std::endl;
log(ss.str());
}
else if (msg->type == ix::WebSocketMessageType::Message)
else if (messageType == ix::WebSocketMessageType::Message)
{
_condition.notify_one();
ss << "ws_send: received message (" << msg->wireSize << " bytes)";
ss << "ws_send: received message (" << wireSize << " bytes)";
log(ss.str());
std::string errMsg;
MsgPack data = MsgPack::parse(msg->str, errMsg);
MsgPack data = MsgPack::parse(str, errMsg);
if (!errMsg.empty())
{
std::cerr << "Invalid MsgPack response" << std::endl;
@@ -155,13 +160,13 @@ namespace ix
std::cerr << "Invalid id" << std::endl;
}
}
else if (msg->type == ix::WebSocketMessageType::Error)
else if (messageType == ix::WebSocketMessageType::Error)
{
ss << "ws_send ";
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
log(ss.str());
}
else
@@ -239,8 +244,8 @@ namespace ix
MsgPack msg(pdu);
Bench bench("Sending file through websocket");
_webSocket.sendBinary(msg.dump(),
[throttle](int current, int total) -> bool
_webSocket.send(msg.dump(),
[throttle](int current, int total) -> bool
{
std::cout << "ws_send: Step " << current << " out of " << total << std::endl;

View File

@@ -21,48 +21,52 @@ namespace ix
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](const WebSocketMessagePtr& msg)
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
if (msg->type == ix::WebSocketMessageType::Open)
if (messageType == ix::WebSocketMessageType::Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : msg->openInfo.headers)
for (auto it : openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
}
else if (msg->type == ix::WebSocketMessageType::Close)
else if (messageType == ix::WebSocketMessageType::Close)
{
std::cerr << "Closed connection"
<< " code " << msg->closeInfo.code
<< " reason " << msg->closeInfo.reason << std::endl;
<< " code " << closeInfo.code
<< " reason " << closeInfo.reason << std::endl;
}
else if (msg->type == ix::WebSocketMessageType::Error)
else if (messageType == ix::WebSocketMessageType::Error)
{
std::stringstream ss;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
std::cerr << ss.str();
}
else if (msg->type == ix::WebSocketMessageType::Fragment)
else if (messageType == ix::WebSocketMessageType::Fragment)
{
std::cerr << "Received message fragment "
<< std::endl;
}
else if (msg->type == ix::WebSocketMessageType::Message)
else if (messageType == ix::WebSocketMessageType::Message)
{
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
std::cerr << "Received " << wireSize << " bytes" << std::endl;
for (auto&& client : server.getClients())
{
if (client != webSocket)
{
client->send(msg->str,
msg->binary,
client->send(str,
[](int current, int total) -> bool
{
std::cerr << "ws_transfer: Step " << current