Compare commits

...

9 Commits

35 changed files with 562 additions and 488 deletions

26
CHANGELOG.md Normal file
View File

@ -0,0 +1,26 @@
# Changelog
All notable changes to this project will be documented in this file.
## [4.0.0] - 2019-06-09
### Changed
- WebSocket::send() sends message in TEXT mode by default
- WebSocketMessage sets a new binary field, which tells whether the received incoming message is binary or text
- WebSocket::send takes a third arg, binary which default to true (can be text too)
- WebSocket callback only take one object, a const ix::WebSocketMessagePtr& msg
- Add explicite WebSocket::sendBinary
- New headers + WebSocketMessage class to hold message data, still not used across the board
- Add test/compatibility folder with small servers and clients written in different languages and different libraries to test compatibility.
- ws echo_server has a -g option to print a greeting message on connect
- IXSocketMbedTLS: better error handling in close and connect
## [3.1.2] - 2019-06-06
### Added
- ws connect has a -x option to disable per message deflate
- Add WebSocket::disablePerMessageDeflate() option.
## [3.0.0] - 2019-06-xx
### Changed
- TLS, aka SSL works on Windows (websocket and http clients)
- ws command line tool build on Windows
- Async API for HttpClient
- HttpClient API changed to use shared_ptr for response and request

View File

@ -61,6 +61,10 @@ set( IXWEBSOCKET_HEADERS
ixwebsocket/IXWebSocketHandshake.h
ixwebsocket/IXWebSocketSendInfo.h
ixwebsocket/IXWebSocketErrorInfo.h
ixwebsocket/IXWebSocketCloseInfo.h
ixwebsocket/IXWebSocketOpenInfo.h
ixwebsocket/IXWebSocketMessageType.h
ixwebsocket/IXWebSocketMessage.h
ixwebsocket/IXWebSocketPerMessageDeflate.h
ixwebsocket/IXWebSocketPerMessageDeflateCodec.h
ixwebsocket/IXWebSocketPerMessageDeflateOptions.h

View File

@ -1 +1 @@
3.1.1
4.0.0

View File

@ -24,6 +24,8 @@ namespace ix
bool SocketMbedTLS::init(const std::string& host, std::string& errMsg)
{
std::lock_guard<std::mutex> lock(_mutex);
mbedtls_ssl_init(&_ssl);
mbedtls_ssl_config_init(&_conf);
mbedtls_ctr_drbg_init(&_ctr_drbg);
@ -75,15 +77,24 @@ namespace ix
std::string& errMsg,
const CancellationRequest& isCancellationRequested)
{
_sockfd = SocketConnect::connect(host, port, errMsg, isCancellationRequested);
if (_sockfd == -1) return false;
if (!init(host, errMsg)) return false;
{
std::lock_guard<std::mutex> lock(_mutex);
_sockfd = SocketConnect::connect(host, port, errMsg, isCancellationRequested);
if (_sockfd == -1) return false;
}
if (!init(host, errMsg))
{
close();
return false;
}
mbedtls_ssl_set_bio(&_ssl, &_sockfd, mbedtls_net_send, mbedtls_net_recv, NULL);
int res;
do
{
std::lock_guard<std::mutex> lock(_mutex);
res = mbedtls_ssl_handshake(&_ssl);
}
while (res == MBEDTLS_ERR_SSL_WANT_READ || res == MBEDTLS_ERR_SSL_WANT_WRITE);
@ -95,6 +106,8 @@ namespace ix
errMsg = "error in handshake : ";
errMsg += buf;
close();
return false;
}
@ -103,10 +116,14 @@ namespace ix
void SocketMbedTLS::close()
{
std::lock_guard<std::mutex> lock(_mutex);
mbedtls_ssl_free(&_ssl);
mbedtls_ssl_config_free(&_conf);
mbedtls_ctr_drbg_free(&_ctr_drbg);
mbedtls_entropy_free(&_entropy);
Socket::close();
}
ssize_t SocketMbedTLS::send(char* buf, size_t nbyte)

View File

@ -51,9 +51,11 @@ namespace ix
_ws.setOnCloseCallback(
[this](uint16_t code, const std::string& reason, size_t wireSize, bool remote)
{
_onMessageCallback(WebSocketMessageType::Close, "", wireSize,
WebSocketErrorInfo(), WebSocketOpenInfo(),
WebSocketCloseInfo(code, reason, remote));
_onMessageCallback(
std::make_shared<WebSocketMessage>(
WebSocketMessageType::Close, "", wireSize,
WebSocketErrorInfo(), WebSocketOpenInfo(),
WebSocketCloseInfo(code, reason, remote)));
}
);
}
@ -180,10 +182,12 @@ namespace ix
return status;
}
_onMessageCallback(WebSocketMessageType::Open, "", 0,
WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers),
WebSocketCloseInfo());
_onMessageCallback(
std::make_shared<WebSocketMessage>(
WebSocketMessageType::Open, "", 0,
WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers),
WebSocketCloseInfo()));
return status;
}
@ -203,10 +207,12 @@ namespace ix
return status;
}
_onMessageCallback(WebSocketMessageType::Open, "", 0,
WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers),
WebSocketCloseInfo());
_onMessageCallback(
std::make_shared<WebSocketMessage>(
WebSocketMessageType::Open, "", 0,
WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers),
WebSocketCloseInfo()));
return status;
}
@ -274,9 +280,11 @@ namespace ix
connectErr.reason = status.errorStr;
connectErr.http_status = status.http_status;
_onMessageCallback(WebSocketMessageType::Error, "", 0,
connectErr, WebSocketOpenInfo(),
WebSocketCloseInfo());
_onMessageCallback(
std::make_shared<WebSocketMessage>(
WebSocketMessageType::Error, "", 0,
connectErr, WebSocketOpenInfo(),
WebSocketCloseInfo()));
}
}
}
@ -317,8 +325,8 @@ namespace ix
WebSocketMessageType webSocketMessageType;
switch (messageKind)
{
default:
case WebSocketTransport::MessageKind::MSG:
case WebSocketTransport::MessageKind::MSG_TEXT:
case WebSocketTransport::MessageKind::MSG_BINARY:
{
webSocketMessageType = WebSocketMessageType::Message;
} break;
@ -342,9 +350,13 @@ namespace ix
WebSocketErrorInfo webSocketErrorInfo;
webSocketErrorInfo.decompressionError = decompressionError;
_onMessageCallback(webSocketMessageType, msg, wireSize,
webSocketErrorInfo, WebSocketOpenInfo(),
WebSocketCloseInfo());
bool binary = messageKind == WebSocketTransport::MessageKind::MSG_BINARY;
_onMessageCallback(
std::make_shared<WebSocketMessage>(
webSocketMessageType, msg, wireSize,
webSocketErrorInfo, WebSocketOpenInfo(),
WebSocketCloseInfo(), binary));
WebSocket::invokeTrafficTrackerCallback(msg.size(), true);
});
@ -375,9 +387,18 @@ namespace ix
}
WebSocketSendInfo WebSocket::send(const std::string& data,
bool binary,
const OnProgressCallback& onProgressCallback)
{
return sendMessage(data, SendMessageKind::Binary, onProgressCallback);
return sendMessage(data,
(binary) ? SendMessageKind::Binary: SendMessageKind::Text,
onProgressCallback);
}
WebSocketSendInfo WebSocket::sendBinary(const std::string& text,
const OnProgressCallback& onProgressCallback)
{
return sendMessage(text, SendMessageKind::Binary, onProgressCallback);
}
WebSocketSendInfo WebSocket::sendText(const std::string& text,

View File

@ -13,6 +13,7 @@
#include "IXWebSocketCloseConstants.h"
#include "IXWebSocketErrorInfo.h"
#include "IXWebSocketHttpHeaders.h"
#include "IXWebSocketMessage.h"
#include "IXWebSocketPerMessageDeflateOptions.h"
#include "IXWebSocketSendInfo.h"
#include "IXWebSocketTransport.h"
@ -32,52 +33,7 @@ namespace ix
Closed = 3
};
enum class WebSocketMessageType
{
Message = 0,
Open = 1,
Close = 2,
Error = 3,
Ping = 4,
Pong = 5,
Fragment = 6
};
struct WebSocketOpenInfo
{
std::string uri;
WebSocketHttpHeaders headers;
WebSocketOpenInfo(const std::string& u = std::string(),
const WebSocketHttpHeaders& h = WebSocketHttpHeaders())
: uri(u)
, headers(h)
{
;
}
};
struct WebSocketCloseInfo
{
uint16_t code;
std::string reason;
bool remote;
WebSocketCloseInfo(uint16_t c = 0, const std::string& r = std::string(), bool rem = false)
: code(c)
, reason(r)
, remote(rem)
{
;
}
};
using OnMessageCallback = std::function<void(WebSocketMessageType,
const std::string&,
size_t wireSize,
const WebSocketErrorInfo&,
const WebSocketOpenInfo&,
const WebSocketCloseInfo&)>;
using OnMessageCallback = std::function<void(const WebSocketMessagePtr&)>;
using OnTrafficTrackerCallback = std::function<void(size_t size, bool incoming)>;
@ -108,14 +64,18 @@ namespace ix
WebSocketInitResult connect(int timeoutSecs);
void run();
// send binary data
// send is in binary mode by default
WebSocketSendInfo send(const std::string& data,
bool binary = false,
const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo sendBinary(const std::string& text,
const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo sendText(const std::string& text,
const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo ping(const std::string& text);
void close(uint16_t code = 1000, const std::string& reason = "Normal closure");
void close(uint16_t code = WebSocketCloseConstants::kNormalClosureCode,
const std::string& reason = WebSocketCloseConstants::kNormalClosureMessage);
void setOnMessageCallback(const OnMessageCallback& callback);
static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback);
@ -169,7 +129,7 @@ namespace ix
bool _enablePong;
static const bool kDefaultEnablePong;
// Optional ping and ping timeout
// Optional ping and pong timeout
int _pingIntervalSecs;
int _pingTimeoutSecs;
static const int kDefaultPingIntervalSecs;

View File

@ -0,0 +1,25 @@
/*
* IXWebSocketCloseInfo.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
namespace ix
{
struct WebSocketCloseInfo
{
uint16_t code;
std::string reason;
bool remote;
WebSocketCloseInfo(uint16_t c = 0, const std::string& r = std::string(), bool rem = false)
: code(c)
, reason(r)
, remote(rem)
{
;
}
};
} // namespace ix

View File

@ -0,0 +1,49 @@
/*
* IXWebSocketMessage.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include "IXWebSocketCloseInfo.h"
#include "IXWebSocketErrorInfo.h"
#include "IXWebSocketMessageType.h"
#include "IXWebSocketOpenInfo.h"
#include <memory>
#include <string>
#include <thread>
namespace ix
{
struct WebSocketMessage
{
WebSocketMessageType type;
std::string str;
size_t wireSize;
WebSocketErrorInfo errorInfo;
WebSocketOpenInfo openInfo;
WebSocketCloseInfo closeInfo;
bool binary;
WebSocketMessage(WebSocketMessageType t,
const std::string& s,
size_t w,
WebSocketErrorInfo e,
WebSocketOpenInfo o,
WebSocketCloseInfo c,
bool b = false)
: type(t)
, str(std::move(s))
, wireSize(w)
, errorInfo(e)
, openInfo(o)
, closeInfo(c)
, binary(b)
{
;
}
};
using WebSocketMessagePtr = std::shared_ptr<WebSocketMessage>;
} // namespace ix

View File

@ -32,14 +32,7 @@ namespace ix
if (_websocket)
{
// set dummy callback just to avoid crash
_websocket->setOnMessageCallback([](
WebSocketMessageType,
const std::string&,
size_t,
const WebSocketErrorInfo&,
const WebSocketOpenInfo&,
const WebSocketCloseInfo&)
{});
_websocket->setOnMessageCallback([](const WebSocketMessagePtr&) {});
}
_websocket = websocket;
@ -47,27 +40,10 @@ namespace ix
// bind new
if (_websocket)
{
_websocket->setOnMessageCallback([this](
WebSocketMessageType type,
const std::string& str,
size_t wireSize,
const WebSocketErrorInfo& errorInfo,
const WebSocketOpenInfo& openInfo,
const WebSocketCloseInfo& closeInfo)
_websocket->setOnMessageCallback([this](const WebSocketMessagePtr& msg)
{
MessagePtr message(new Message());
message->type = type;
message->str = str;
message->wireSize = wireSize;
message->errorInfo = errorInfo;
message->openInfo = openInfo;
message->closeInfo = closeInfo;
{
std::lock_guard<std::mutex> lock(_messagesMutex);
_messages.emplace_back(std::move(message));
}
std::lock_guard<std::mutex> lock(_messagesMutex);
_messages.emplace_back(std::move(msg));
});
}
}
@ -82,9 +58,9 @@ namespace ix
_onMessageUserCallback = std::move(callback);
}
WebSocketMessageQueue::MessagePtr WebSocketMessageQueue::popMessage()
WebSocketMessagePtr WebSocketMessageQueue::popMessage()
{
MessagePtr message;
WebSocketMessagePtr message;
std::lock_guard<std::mutex> lock(_messagesMutex);
if (!_messages.empty())
@ -101,19 +77,11 @@ namespace ix
if (!_onMessageUserCallback)
return;
MessagePtr message;
WebSocketMessagePtr message;
while (count > 0 && (message = popMessage()))
{
_onMessageUserCallback(
message->type,
message->str,
message->wireSize,
message->errorInfo,
message->openInfo,
message->closeInfo
);
_onMessageUserCallback(message);
--count;
}
}

View File

@ -30,24 +30,12 @@ namespace ix
void poll(int count = 512);
protected:
struct Message
{
WebSocketMessageType type;
std::string str;
size_t wireSize;
WebSocketErrorInfo errorInfo;
WebSocketOpenInfo openInfo;
WebSocketCloseInfo closeInfo;
};
using MessagePtr = std::shared_ptr<Message>;
MessagePtr popMessage();
WebSocketMessagePtr popMessage();
private:
WebSocket* _websocket = nullptr;
OnMessageCallback _onMessageUserCallback;
std::mutex _messagesMutex;
std::list<MessagePtr> _messages;
std::list<WebSocketMessagePtr> _messages;
};
} // namespace ix

View File

@ -0,0 +1,21 @@
/*
* IXWebSocketMessageType.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
namespace ix
{
enum class WebSocketMessageType
{
Message = 0,
Open = 1,
Close = 2,
Error = 3,
Ping = 4,
Pong = 5,
Fragment = 6
};
}

View File

@ -0,0 +1,24 @@
/*
* IXWebSocketOpenInfo.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
namespace ix
{
struct WebSocketOpenInfo
{
std::string uri;
WebSocketHttpHeaders headers;
WebSocketOpenInfo(const std::string& u = std::string(),
const WebSocketHttpHeaders& h = WebSocketHttpHeaders())
: uri(u)
, headers(h)
{
;
}
};
} // namespace ix

View File

@ -542,12 +542,17 @@ namespace ix
) {
unmaskReceiveBuffer(ws);
MessageKind messageKind =
(ws.opcode == wsheader_type::TEXT_FRAME)
? MessageKind::MSG_TEXT
: MessageKind::MSG_BINARY;
//
// Usual case. Small unfragmented messages
//
if (ws.fin && _chunks.empty())
{
emitMessage(MessageKind::MSG,
emitMessage(messageKind,
std::string(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t) ws.N),
ws,
@ -567,7 +572,7 @@ namespace ix
_rxbuf.begin()+ws.header_size+(size_t)ws.N));
if (ws.fin)
{
emitMessage(MessageKind::MSG, getMergedChunks(), ws, onMessageCallback);
emitMessage(messageKind, getMergedChunks(), ws, onMessageCallback);
_chunks.clear();
}
else

View File

@ -50,7 +50,8 @@ namespace ix
enum class MessageKind
{
MSG,
MSG_TEXT,
MSG_BINARY,
PING,
PONG,
FRAGMENT

View File

@ -178,34 +178,29 @@ namespace ix
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg)
{
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
Logger() << "New connection";
Logger() << "Uri: " << openInfo.uri;
Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:";
for (auto it : openInfo.headers)
for (auto it : msg->openInfo.headers)
{
Logger() << it.first << ": " << it.second;
}
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
Logger() << "Closed connection";
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
for (auto&& client : server.getClients())
{
if (client != webSocket)
{
client->send(str);
client->send(msg->str);
}
}
}

View File

@ -108,52 +108,47 @@ namespace
log(std::string("Connecting to url: ") + url);
_webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[this](const ix::WebSocketMessagePtr& msg)
{
std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
log("client connected");
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
std::stringstream ss;
ss << "client disconnected("
<< closeInfo.code
<< msg->closeInfo.code
<< ","
<< closeInfo.reason
<< msg->closeInfo.reason
<< ")";
log(ss.str());
std::lock_guard<std::mutex> lck(_mutexCloseData);
_closeCode = closeInfo.code;
_closeReason = std::string(closeInfo.reason);
_closeRemote = closeInfo.remote;
_closeCode = msg->closeInfo.code;
_closeReason = std::string(msg->closeInfo.reason);
_closeRemote = msg->closeInfo.remote;
}
else if (messageType == ix::WebSocketMessageType::Error)
else if (msg->type == ix::WebSocketMessageType::Error)
{
ss << "Error ! " << error.reason;
ss << "Error ! " << msg->errorInfo.reason;
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Pong)
else if (msg->type == ix::WebSocketMessageType::Pong)
{
ss << "Received pong message " << str;
ss << "Received pong message " << msg->str;
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Ping)
else if (msg->type == ix::WebSocketMessageType::Ping)
{
ss << "Received ping message " << str;
ss << "Received ping message " << msg->str;
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
ss << "Received message " << str;
ss << "Received message " << msg->str;
log(ss.str());
}
else
@ -183,39 +178,34 @@ namespace
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, connectionState, &server, &receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[webSocket, connectionState, &server, &receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite](const ix::WebSocketMessagePtr& msg)
{
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
Logger() << "New server connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri;
Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:";
for (auto it : openInfo.headers)
for (auto it : msg->openInfo.headers)
{
Logger() << it.first << ": " << it.second;
}
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
std::stringstream ss;
ss << "Server closed connection("
<< closeInfo.code
<< msg->closeInfo.code
<< ","
<< closeInfo.reason
<< msg->closeInfo.reason
<< ")";
log(ss.str());
std::lock_guard<std::mutex> lck(mutexWrite);
receivedCloseCode = closeInfo.code;
receivedCloseReason = std::string(closeInfo.reason);
receivedCloseRemote = closeInfo.remote;
receivedCloseCode = msg->closeInfo.code;
receivedCloseReason = std::string(msg->closeInfo.reason);
receivedCloseRemote = msg->closeInfo.remote;
}
}
);

View File

@ -20,33 +20,28 @@ namespace
{
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[connectionState, &server](ix::WebSocketMessageType messageType,
const std::string & str,
size_t wireSize,
const ix::WebSocketErrorInfo & error,
const ix::WebSocketOpenInfo & openInfo,
const ix::WebSocketCloseInfo & closeInfo)
[connectionState, &server](const WebSocketMessagePtr& msg)
{
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
Logger() << "New connection";
connectionState->computeId();
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:";
for (auto it : openInfo.headers)
for (auto&& it : msg->openInfo.headers)
{
Logger() << it.first << ": " << it.second;
}
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
Logger() << "Closed connection";
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
Logger() << "Message received: " << str;
@ -78,42 +73,37 @@ namespace
{
msgQ.bindWebsocket(&ws);
msgQ.setOnMessageCallback([this](WebSocketMessageType messageType,
const std::string & str,
size_t wireSize,
const WebSocketErrorInfo & error,
const WebSocketOpenInfo & openInfo,
const WebSocketCloseInfo & closeInfo)
msgQ.setOnMessageCallback([this](const WebSocketMessagePtr& msg)
{
REQUIRE(mainThreadId == std::this_thread::get_id());
std::stringstream ss;
if (messageType == WebSocketMessageType::Open)
if (msg->type == WebSocketMessageType::Open)
{
log("client connected");
sendNextMessage();
}
else if (messageType == WebSocketMessageType::Close)
else if (msg->type == WebSocketMessageType::Close)
{
log("client disconnected");
}
else if (messageType == WebSocketMessageType::Error)
else if (msg->type == WebSocketMessageType::Error)
{
ss << "Error ! " << error.reason;
log(ss.str());
testDone = true;
}
else if (messageType == WebSocketMessageType::Pong)
else if (msg->type == WebSocketMessageType::Pong)
{
ss << "Received pong message " << str;
log(ss.str());
}
else if (messageType == WebSocketMessageType::Ping)
else if (msg->type == WebSocketMessageType::Ping)
{
ss << "Received ping message " << str;
log(ss.str());
}
else if (messageType == WebSocketMessageType::Message)
else if (msg->type == WebSocketMessageType::Message)
{
REQUIRE(str.compare("Hey dude!") == 0);
++receivedCount;
@ -189,5 +179,4 @@ TEST_CASE("Websocket_message_queue", "[websocket_message_q]")
server.stop();
}
}

View File

@ -106,7 +106,7 @@ namespace
{
log("client disconnected");
if (closeInfo.code == 1011)
if (msg->closeInfo.code == 1011)
{
_closedDueToPingTimeout = true;
}

View File

@ -39,42 +39,37 @@ namespace ix
server.setOnConnectionCallback(
[&server, &connectionId](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, connectionState,
&connectionId, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
&connectionId, &server](const ix::WebSocketMessagePtr& msg)
{
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
Logger() << "New connection";
connectionState->computeId();
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri;
Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:";
for (auto it : openInfo.headers)
for (auto it : msg->openInfo.headers)
{
Logger() << it.first << ": " << it.second;
}
connectionId = connectionState->getId();
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
Logger() << "Closed connection";
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
for (auto&& client : server.getClients())
{
if (client != webSocket)
{
client->send(str);
client->send(msg->str);
}
}
}

View File

@ -52,41 +52,36 @@ namespace
log(std::string("Connecting to url: ") + url);
_webSocket.setOnMessageCallback(
[](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[](const ix::WebSocketMessagePtr& msg)
{
std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
log("TestConnectionDisconnection: connected !");
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
log("TestConnectionDisconnection: disconnected !");
}
else if (messageType == ix::WebSocketMessageType::Error)
else if (msg->type == ix::WebSocketMessageType::Error)
{
ss << "TestConnectionDisconnection: Error! ";
ss << error.reason;
ss << msg->errorInfo.reason;
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
log("TestConnectionDisconnection: received message.!");
}
else if (messageType == ix::WebSocketMessageType::Ping)
else if (msg->type == ix::WebSocketMessageType::Ping)
{
log("TestConnectionDisconnection: received ping message.!");
}
else if (messageType == ix::WebSocketMessageType::Pong)
else if (msg->type == ix::WebSocketMessageType::Pong)
{
log("TestConnectionDisconnection: received pong message.!");
}
else if (messageType == ix::WebSocketMessageType::Fragment)
else if (msg->type == ix::WebSocketMessageType::Fragment)
{
log("TestConnectionDisconnection: received fragment.!");
}

View File

@ -114,31 +114,26 @@ namespace
log(std::string("Connecting to url: ") + url);
_webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[this](const ix::WebSocketMessagePtr& msg)
{
std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
ss << "cmd_websocket_chat: user "
<< _user
<< " Connected !";
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
ss << "cmd_websocket_chat: user "
<< _user
<< " disconnected !";
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
auto result = decodeMessage(str);
auto result = decodeMessage(msg->str);
// Our "chat" / "broacast" node.js server does not send us
// the messages we send, so we don't need to have a msg_user != user
@ -159,20 +154,20 @@ namespace
<< _user << " > ";
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Error)
else if (msg->type == ix::WebSocketMessageType::Error)
{
ss << "cmd_websocket_chat: Error ! " << error.reason;
ss << "cmd_websocket_chat: Error ! " << msg->errorInfo.reason;
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Ping)
else if (msg->type == ix::WebSocketMessageType::Ping)
{
log("cmd_websocket_chat: received ping message");
}
else if (messageType == ix::WebSocketMessageType::Pong)
else if (msg->type == ix::WebSocketMessageType::Pong)
{
log("cmd_websocket_chat: received pong message");
}
else if (messageType == ix::WebSocketMessageType::Fragment)
else if (msg->type == ix::WebSocketMessageType::Fragment)
{
log("cmd_websocket_chat: received message fragment");
}
@ -221,35 +216,30 @@ namespace
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg)
{
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
Logger() << "New connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri;
Logger() << "Uri: " << msg->openInfo.uri;
Logger() << "Headers:";
for (auto it : openInfo.headers)
for (auto it : msg->openInfo.headers)
{
Logger() << it.first << ": " << it.second;
}
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
log("Closed connection");
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
for (auto&& client : server.getClients())
{
if (client != webSocket)
{
client->send(str);
client->send(msg->str);
}
}
}

View File

@ -0,0 +1,30 @@
# Clients
## ws
```
$ ws connect ws://127.0.0.1:8765
Type Ctrl-D to exit prompt...
Connecting to url: ws://127.0.0.1:8765
> ws_connect: connected
Uri: /
Handshake Headers:
Connection: Upgrade
Date: Sat, 08 Jun 2019 16:43:29 GMT
Sec-WebSocket-Accept: kPCNwGa97y+7NWdAvHi/7/rA8AE=
Sec-WebSocket-Extensions: permessage-deflate; server_max_window_bits=15; client_max_window_bits=15
Server: Python/3.7 websockets/7.0
Upgrade: websocket
Received 13 bytes
ws_connect: received message: > Welcome !
ws_connect: connection closed: code 1006 reason Abnormal closure
```
## wscat
```
$ ./node_modules/.bin/wscat -c ws://127.0.0.1:8765
connected (press CTRL+C to quit)
< > Welcome !
disconnected (code: 1006)
```

View File

@ -0,0 +1,22 @@
#!/usr/bin/env python
# WS server example
import asyncio
import websockets
async def hello(websocket, path):
await websocket.send(f"> Welcome !")
name = await websocket.recv()
print(f"< {name}")
greeting = f"Hello {name}!"
await websocket.send(greeting)
print(f"> {greeting}")
start_server = websockets.serve(hello, 'localhost', 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

View File

@ -90,46 +90,41 @@ namespace ix
void CobraConnection::initWebSocketOnMessageCallback()
{
_webSocket->setOnMessageCallback(
[this](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[this](const ix::WebSocketMessagePtr& msg)
{
CobraConnection::invokeTrafficTrackerCallback(wireSize, true);
CobraConnection::invokeTrafficTrackerCallback(msg->wireSize, true);
std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
invokeEventCallback(ix::CobraConnection_EventType_Open,
std::string(),
openInfo.headers);
msg->openInfo.headers);
sendHandshakeMessage();
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
_authenticated = false;
std::stringstream ss;
ss << "Close code " << closeInfo.code;
ss << " reason " << closeInfo.reason;
ss << "Close code " << msg->closeInfo.code;
ss << " reason " << msg->closeInfo.reason;
invokeEventCallback(ix::CobraConnection_EventType_Closed,
ss.str());
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
Json::Value data;
Json::Reader reader;
if (!reader.parse(str, data))
if (!reader.parse(msg->str, data))
{
invokeErrorCallback("Invalid json", str);
invokeErrorCallback("Invalid json", msg->str);
return;
}
if (!data.isMember("action"))
{
invokeErrorCallback("Missing action", str);
invokeErrorCallback("Missing action", msg->str);
return;
}
@ -139,12 +134,12 @@ namespace ix
{
if (!handleHandshakeResponse(data))
{
invokeErrorCallback("Error extracting nonce from handshake response", str);
invokeErrorCallback("Error extracting nonce from handshake response", msg->str);
}
}
else if (action == "auth/handshake/error")
{
invokeErrorCallback("Handshake error", str);
invokeErrorCallback("Handshake error", msg->str);
}
else if (action == "auth/authenticate/ok")
{
@ -154,7 +149,7 @@ namespace ix
}
else if (action == "auth/authenticate/error")
{
invokeErrorCallback("Authentication error", str);
invokeErrorCallback("Authentication error", msg->str);
}
else if (action == "rtm/subscription/data")
{
@ -164,36 +159,36 @@ namespace ix
{
if (!handleSubscriptionResponse(data))
{
invokeErrorCallback("Error processing subscribe response", str);
invokeErrorCallback("Error processing subscribe response", msg->str);
}
}
else if (action == "rtm/subscribe/error")
{
invokeErrorCallback("Subscription error", str);
invokeErrorCallback("Subscription error", msg->str);
}
else if (action == "rtm/unsubscribe/ok")
{
if (!handleUnsubscriptionResponse(data))
{
invokeErrorCallback("Error processing subscribe response", str);
invokeErrorCallback("Error processing subscribe response", msg->str);
}
}
else if (action == "rtm/unsubscribe/error")
{
invokeErrorCallback("Unsubscription error", str);
invokeErrorCallback("Unsubscription error", msg->str);
}
else
{
invokeErrorCallback("Un-handled message type", str);
invokeErrorCallback("Un-handled message type", msg->str);
}
}
else if (messageType == ix::WebSocketMessageType::Error)
else if (msg->type == ix::WebSocketMessageType::Error)
{
std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
invokeErrorCallback(ss.str(), std::string());
}
});

View File

@ -58,25 +58,20 @@ namespace snake
auto state = std::dynamic_pointer_cast<SnakeConnectionState>(connectionState);
webSocket->setOnMessageCallback(
[this, webSocket, state](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[this, webSocket, state](const ix::WebSocketMessagePtr& msg)
{
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "id: " << state->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)
for (auto it : msg->openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
std::string appkey = parseAppKey(openInfo.uri);
std::string appkey = parseAppKey(msg->openInfo.uri);
state->setAppkey(appkey);
// Connect to redis first
@ -86,29 +81,29 @@ namespace snake
std::cerr << "Cannot connect to redis host" << std::endl;
}
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
std::cerr << "Closed connection"
<< " code " << closeInfo.code
<< " reason " << closeInfo.reason << std::endl;
<< " code " << msg->closeInfo.code
<< " reason " << msg->closeInfo.reason << std::endl;
}
else if (messageType == ix::WebSocketMessageType::Error)
else if (msg->type == ix::WebSocketMessageType::Error)
{
std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str();
}
else if (messageType == ix::WebSocketMessageType::Fragment)
else if (msg->type == ix::WebSocketMessageType::Fragment)
{
std::cerr << "Received message fragment" << std::endl;
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
std::cerr << "Received " << wireSize << " bytes" << std::endl;
processCobraMessage(state, webSocket, _appConfig, str);
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
processCobraMessage(state, webSocket, _appConfig, msg->str);
}
}
);

View File

@ -81,6 +81,7 @@ int main(int argc, char** argv)
bool stress = false;
bool disableAutomaticReconnection = false;
bool disablePerMessageDeflate = false;
bool greetings = false;
int port = 8008;
int redisPort = 6379;
int statsdPort = 8125;
@ -120,6 +121,7 @@ int main(int argc, char** argv)
CLI::App* echoServerApp = app.add_subcommand("echo_server", "Echo server");
echoServerApp->add_option("--port", port, "Port");
echoServerApp->add_option("--host", hostname, "Hostname");
echoServerApp->add_flag("-g", greetings, "Verbose");
CLI::App* broadcastServerApp = app.add_subcommand("broadcast_server", "Broadcasting server");
broadcastServerApp->add_option("--port", port, "Port");
@ -252,7 +254,7 @@ int main(int argc, char** argv)
}
else if (app.got_subcommand("echo_server"))
{
ret = ix::ws_echo_server_main(port, hostname);
ret = ix::ws_echo_server_main(port, greetings, hostname);
}
else if (app.got_subcommand("broadcast_server"))
{

View File

@ -24,7 +24,7 @@ namespace ix
int ws_ping_pong_main(const std::string& url);
int ws_echo_server_main(int port, const std::string& hostname);
int ws_echo_server_main(int port, bool greetings, const std::string& hostname);
int ws_broadcast_server_main(int port, const std::string& hostname);
int ws_transfer_main(int port, const std::string& hostname);

View File

@ -21,52 +21,48 @@ namespace ix
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[webSocket, connectionState, &server](const WebSocketMessagePtr& msg)
{
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)
for (auto it : msg->openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
std::cerr << "Closed connection"
<< " code " << closeInfo.code
<< " reason " << closeInfo.reason << std::endl;
<< " code " << msg->closeInfo.code
<< " reason " << msg->closeInfo.reason << std::endl;
}
else if (messageType == ix::WebSocketMessageType::Error)
else if (msg->type == ix::WebSocketMessageType::Error)
{
std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str();
}
else if (messageType == ix::WebSocketMessageType::Fragment)
else if (msg->type == ix::WebSocketMessageType::Fragment)
{
std::cerr << "Received message fragment" << std::endl;
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
std::cerr << "Received " << wireSize << " bytes" << std::endl;
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
for (auto&& client : server.getClients())
{
if (client != webSocket)
{
client->send(str,
client->send(msg->str,
msg->binary,
[](int current, int total) -> bool
{
std::cerr << "Step " << current

View File

@ -84,20 +84,15 @@ namespace ix
log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[this](const WebSocketMessagePtr& msg)
{
std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
log("ws chat: connected");
std::cout << "Uri: " << openInfo.uri << std::endl;
std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers)
for (auto it : msg->openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
@ -107,18 +102,18 @@ namespace ix
<< " Connected !";
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
ss << "ws chat: user "
<< _user
<< " disconnected !"
<< " code " << closeInfo.code
<< " reason " << closeInfo.reason;
<< " code " << msg->closeInfo.code
<< " reason " << msg->closeInfo.reason;
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
auto result = decodeMessage(str);
auto result = decodeMessage(msg->str);
// Our "chat" / "broacast" node.js server does not send us
// the messages we send, so we don't have to filter it out.
@ -127,17 +122,17 @@ namespace ix
_receivedQueue.push(result.second);
ss << std::endl
<< result.first << "(" << wireSize << " bytes)" << " > " << result.second
<< result.first << "(" << msg->wireSize << " bytes)" << " > " << result.second
<< std::endl
<< _user << " > ";
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Error)
else if (msg->type == ix::WebSocketMessageType::Error)
{
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str());
}
else
@ -172,7 +167,7 @@ namespace ix
void WebSocketChat::sendMessage(const std::string& text)
{
_webSocket.send(encodeMessage(text));
_webSocket.sendText(encodeMessage(text));
}
int ws_chat_main(const std::string& url,

View File

@ -73,56 +73,51 @@ namespace ix
log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[this](const ix::WebSocketMessagePtr& msg)
{
std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
log("ws_connect: connected");
std::cout << "Uri: " << openInfo.uri << std::endl;
std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers)
for (auto it : msg->openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
ss << "ws_connect: connection closed:";
ss << " code " << closeInfo.code;
ss << " reason " << closeInfo.reason << std::endl;
ss << " code " << msg->closeInfo.code;
ss << " reason " << msg->closeInfo.reason << std::endl;
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
std::cerr << "Received " << wireSize << " bytes" << std::endl;
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
ss << "ws_connect: received message: "
<< str;
<< msg->str;
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Error)
else if (msg->type == ix::WebSocketMessageType::Error)
{
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Fragment)
else if (msg->type == ix::WebSocketMessageType::Fragment)
{
std::cerr << "Received message fragment" << std::endl;
}
else if (messageType == ix::WebSocketMessageType::Ping)
else if (msg->type == ix::WebSocketMessageType::Ping)
{
std::cerr << "Received ping" << std::endl;
}
else if (messageType == ix::WebSocketMessageType::Pong)
else if (msg->type == ix::WebSocketMessageType::Pong)
{
std::cerr << "Received pong" << std::endl;
}
@ -138,7 +133,7 @@ namespace ix
void WebSocketConnect::sendMessage(const std::string& text)
{
_webSocket.send(text);
_webSocket.sendText(text);
}
int ws_connect_main(const std::string& url,

View File

@ -10,56 +10,56 @@
namespace ix
{
int ws_echo_server_main(int port, const std::string& hostname)
int ws_echo_server_main(int port, bool greetings, const std::string& hostname)
{
std::cout << "Listening on " << hostname << ":" << port << std::endl;
ix::WebSocketServer server(port, hostname);
server.setOnConnectionCallback(
[](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
[greetings](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, connectionState](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[webSocket, connectionState, greetings](const WebSocketMessagePtr& msg)
{
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)
for (auto it : msg->openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
if (greetings)
{
webSocket->sendText("Welcome !");
}
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
std::cerr << "Closed connection"
<< " code " << closeInfo.code
<< " reason " << closeInfo.reason << std::endl;
<< " code " << msg->closeInfo.code
<< " reason " << msg->closeInfo.reason << std::endl;
}
else if (messageType == ix::WebSocketMessageType::Error)
else if (msg->type == ix::WebSocketMessageType::Error)
{
std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str();
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
std::cerr << "Received "
<< wireSize << " bytes"
<< msg->wireSize << " bytes"
<< std::endl;
webSocket->send(str);
webSocket->send(msg->str, msg->binary);
}
}
);

View File

@ -54,59 +54,54 @@ namespace ix
log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[this](const ix::WebSocketMessagePtr& msg)
{
std::cerr << "Received " << wireSize << " bytes" << std::endl;
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
log("ping_pong: connected");
std::cout << "Uri: " << openInfo.uri << std::endl;
std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers)
for (auto it : msg->openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
ss << "ping_pong: disconnected:"
<< " code " << closeInfo.code
<< " reason " << closeInfo.reason
<< str;
<< " code " << msg->closeInfo.code
<< " reason " << msg->closeInfo.reason
<< msg->str;
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
ss << "ping_pong: received message: "
<< str;
<< msg->str;
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Ping)
else if (msg->type == ix::WebSocketMessageType::Ping)
{
ss << "ping_pong: received ping message: "
<< str;
<< msg->str;
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Pong)
else if (msg->type == ix::WebSocketMessageType::Pong)
{
ss << "ping_pong: received pong message: "
<< str;
<< msg->str;
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Error)
else if (msg->type == ix::WebSocketMessageType::Error)
{
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str());
}
else

View File

@ -183,41 +183,36 @@ namespace ix
log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[this](const ix::WebSocketMessagePtr& msg)
{
std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
_condition.notify_one();
log("ws_receive: connected");
std::cout << "Uri: " << openInfo.uri << std::endl;
std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers)
for (auto it : msg->openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
ss << "ws_receive: connection closed:";
ss << " code " << closeInfo.code;
ss << " reason " << closeInfo.reason << std::endl;
ss << " code " << msg->closeInfo.code;
ss << " reason " << msg->closeInfo.reason << std::endl;
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
ss << "ws_receive: transfered " << wireSize << " bytes";
ss << "ws_receive: transfered " << msg->wireSize << " bytes";
log(ss.str());
handleMessage(str);
handleMessage(msg->str);
_condition.notify_one();
}
else if (messageType == ix::WebSocketMessageType::Fragment)
else if (msg->type == ix::WebSocketMessageType::Fragment)
{
ss << "ws_receive: received fragment " << _receivedFragmentCounter++;
log(ss.str());
@ -229,13 +224,13 @@ namespace ix
std::this_thread::sleep_for(duration);
}
}
else if (messageType == ix::WebSocketMessageType::Error)
else if (msg->type == ix::WebSocketMessageType::Error)
{
ss << "ws_receive ";
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str());
}
else

View File

@ -112,42 +112,37 @@ namespace ix
log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[this](const WebSocketMessagePtr& msg)
{
std::stringstream ss;
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
_condition.notify_one();
log("ws_send: connected");
std::cout << "Uri: " << openInfo.uri << std::endl;
std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers)
for (auto it : msg->openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
ss << "ws_send: connection closed:";
ss << " code " << closeInfo.code;
ss << " reason " << closeInfo.reason << std::endl;
ss << " code " << msg->closeInfo.code;
ss << " reason " << msg->closeInfo.reason << std::endl;
log(ss.str());
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
_condition.notify_one();
ss << "ws_send: received message (" << wireSize << " bytes)";
ss << "ws_send: received message (" << msg->wireSize << " bytes)";
log(ss.str());
std::string errMsg;
MsgPack data = MsgPack::parse(str, errMsg);
MsgPack data = MsgPack::parse(msg->str, errMsg);
if (!errMsg.empty())
{
std::cerr << "Invalid MsgPack response" << std::endl;
@ -160,13 +155,13 @@ namespace ix
std::cerr << "Invalid id" << std::endl;
}
}
else if (messageType == ix::WebSocketMessageType::Error)
else if (msg->type == ix::WebSocketMessageType::Error)
{
ss << "ws_send ";
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
log(ss.str());
}
else
@ -244,8 +239,8 @@ namespace ix
MsgPack msg(pdu);
Bench bench("Sending file through websocket");
_webSocket.send(msg.dump(),
[throttle](int current, int total) -> bool
_webSocket.sendBinary(msg.dump(),
[throttle](int current, int total) -> bool
{
std::cout << "ws_send: Step " << current << " out of " << total << std::endl;

View File

@ -21,52 +21,48 @@ namespace ix
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
[webSocket, connectionState, &server](const WebSocketMessagePtr& msg)
{
if (messageType == ix::WebSocketMessageType::Open)
if (msg->type == ix::WebSocketMessageType::Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)
for (auto it : msg->openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
}
else if (messageType == ix::WebSocketMessageType::Close)
else if (msg->type == ix::WebSocketMessageType::Close)
{
std::cerr << "Closed connection"
<< " code " << closeInfo.code
<< " reason " << closeInfo.reason << std::endl;
<< " code " << msg->closeInfo.code
<< " reason " << msg->closeInfo.reason << std::endl;
}
else if (messageType == ix::WebSocketMessageType::Error)
else if (msg->type == ix::WebSocketMessageType::Error)
{
std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
ss << "Connection error: " << msg->errorInfo.reason << std::endl;
ss << "#retries: " << msg->errorInfo.retries << std::endl;
ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl;
ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl;
std::cerr << ss.str();
}
else if (messageType == ix::WebSocketMessageType::Fragment)
else if (msg->type == ix::WebSocketMessageType::Fragment)
{
std::cerr << "Received message fragment "
<< std::endl;
}
else if (messageType == ix::WebSocketMessageType::Message)
else if (msg->type == ix::WebSocketMessageType::Message)
{
std::cerr << "Received " << wireSize << " bytes" << std::endl;
std::cerr << "Received " << msg->wireSize << " bytes" << std::endl;
for (auto&& client : server.getClients())
{
if (client != webSocket)
{
client->send(str,
client->send(msg->str,
msg->binary,
[](int current, int total) -> bool
{
std::cerr << "ws_transfer: Step " << current