Compare commits

..

4 Commits

Author SHA1 Message Date
Benjamin Sergeant
7a4a84d6e0 project builds / gross hack to disable compression code path / ws connect -x works but test fails 2020-07-07 19:25:18 -07:00
Benjamin Sergeant
fbe7b0b020 WebSocketPerMessageDeflateCompressor can work with vector or std::string 2020-07-07 18:17:44 -07:00
Benjamin Sergeant
afd9ef7d6f more templates in WebSocketTransport 2020-07-07 11:07:01 -07:00
Benjamin Sergeant
f772e40ad8 WebSocketPerMessageDeflateCompressor 2020-07-07 10:59:59 -07:00
44 changed files with 175 additions and 327 deletions

View File

@@ -58,7 +58,6 @@ set( IXWEBSOCKET_SOURCES
set( IXWEBSOCKET_HEADERS set( IXWEBSOCKET_HEADERS
ixwebsocket/IXBench.h ixwebsocket/IXBench.h
ixwebsocket/IXCancellationRequest.h ixwebsocket/IXCancellationRequest.h
ixwebsocket/IXConnectionInfo.h
ixwebsocket/IXConnectionState.h ixwebsocket/IXConnectionState.h
ixwebsocket/IXDNSLookup.h ixwebsocket/IXDNSLookup.h
ixwebsocket/IXExponentialBackoff.h ixwebsocket/IXExponentialBackoff.h

View File

@@ -1,10 +1,6 @@
# Changelog # Changelog
All changes to this project will be documented in this file. All changes to this project will be documented in this file.
## [9.9.0] - 2020-07-08
(socket+websocket+http+redis+snake servers) expose the remote ip and remote port when a new connection is made
## [9.8.6] - 2020-07-06 ## [9.8.6] - 2020-07-06
(cmake) change the way zlib and openssl are searched (cmake) change the way zlib and openssl are searched

View File

@@ -257,31 +257,28 @@ ix::WebSocketServer server(port);
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server](std::shared_ptr<WebSocket> webSocket, [&server](std::shared_ptr<WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState, std::shared_ptr<ConnectionState> connectionState)
std::unique_ptr<ConnectionInfo> connectionInfo)
{ {
std::cout << "Remote ip: " << connectionInfo->remoteIp << std::endl;
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](const ix::WebSocketMessagePtr msg) [webSocket, connectionState, &server](const ix::WebSocketMessagePtr msg)
{ {
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
std::cout << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
// A connection state object is available, and has a default id // A connection state object is available, and has a default id
// You can subclass ConnectionState and pass an alternate factory // You can subclass ConnectionState and pass an alternate factory
// to override it. It is useful if you want to store custom // to override it. It is useful if you want to store custom
// attributes per connection (authenticated bool flag, attributes, etc...) // attributes per connection (authenticated bool flag, attributes, etc...)
std::cout << "id: " << connectionState->getId() << std::endl; std::cerr << "id: " << connectionState->getId() << std::endl;
// The uri the client did connect to. // The uri the client did connect to.
std::cout << "Uri: " << msg->openInfo.uri << std::endl; std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Headers:" << std::endl; std::cerr << "Headers:" << std::endl;
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)
{ {
std::cout << it.first << ": " << it.second << std::endl; std::cerr << it.first << ": " << it.second << std::endl;
} }
} }
else if (msg->type == ix::WebSocketMessageType::Message) else if (msg->type == ix::WebSocketMessageType::Message)
@@ -420,14 +417,11 @@ If you want to handle how requests are processed, implement the setOnConnectionC
```cpp ```cpp
setOnConnectionCallback( setOnConnectionCallback(
[this](HttpRequestPtr request, [this](HttpRequestPtr request,
std::shared_ptr<ConnectionState> /*connectionState*/, std::shared_ptr<ConnectionState> /*connectionState*/) -> HttpResponsePtr
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr
{ {
// Build a string for the response // Build a string for the response
std::stringstream ss; std::stringstream ss;
ss << connectionInfo->remoteIp ss << request->method
<< " "
<< request->method
<< " " << " "
<< request->uri; << request->uri;

View File

@@ -45,11 +45,8 @@ namespace ix
} }
void RedisServer::handleConnection(std::unique_ptr<Socket> socket, void RedisServer::handleConnection(std::unique_ptr<Socket> socket,
std::shared_ptr<ConnectionState> connectionState, std::shared_ptr<ConnectionState> connectionState)
std::unique_ptr<ConnectionInfo> connectionInfo)
{ {
logInfo("New connection from remote ip " + connectionInfo->remoteIp);
_connectedClientsCount++; _connectedClientsCount++;
while (!_stopHandlingConnections) while (!_stopHandlingConnections)

View File

@@ -44,8 +44,7 @@ namespace ix
// Methods // Methods
virtual void handleConnection(std::unique_ptr<Socket>, virtual void handleConnection(std::unique_ptr<Socket>,
std::shared_ptr<ConnectionState> connectionState, std::shared_ptr<ConnectionState> connectionState) final;
std::unique_ptr<ConnectionInfo> connectionInfo) final;
virtual size_t getConnectedClientsCount() final; virtual size_t getConnectedClientsCount() final;
bool startsWith(const std::string& str, const std::string& start); bool startsWith(const std::string& str, const std::string& start);

View File

@@ -61,19 +61,16 @@ namespace snake
_server.setOnConnectionCallback( _server.setOnConnectionCallback(
[this](std::shared_ptr<ix::WebSocket> webSocket, [this](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ix::ConnectionState> connectionState, std::shared_ptr<ix::ConnectionState> connectionState) {
std::unique_ptr<ix::ConnectionInfo> connectionInfo) {
auto state = std::dynamic_pointer_cast<SnakeConnectionState>(connectionState); auto state = std::dynamic_pointer_cast<SnakeConnectionState>(connectionState);
auto remoteIp = connectionInfo->remoteIp;
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[this, webSocket, state, remoteIp](const ix::WebSocketMessagePtr& msg) { [this, webSocket, state](const ix::WebSocketMessagePtr& msg) {
std::stringstream ss; std::stringstream ss;
ix::LogLevel logLevel = ix::LogLevel::Debug; ix::LogLevel logLevel = ix::LogLevel::Debug;
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
ss << "New connection" << std::endl; ss << "New connection" << std::endl;
ss << "remote ip: " << remoteIp << std::endl;
ss << "id: " << state->getId() << std::endl; ss << "id: " << state->getId() << std::endl;
ss << "Uri: " << msg->openInfo.uri << std::endl; ss << "Uri: " << msg->openInfo.uri << std::endl;
ss << "Headers:" << std::endl; ss << "Headers:" << std::endl;

View File

@@ -1,25 +0,0 @@
/*
* IXConnectionInfo.h
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <string>
namespace ix
{
struct ConnectionInfo
{
std::string remoteIp;
int remotePort;
ConnectionInfo(const std::string& r = std::string(), int p = 0)
: remoteIp(r)
, remotePort(p)
{
;
}
};
} // namespace ix

View File

@@ -103,9 +103,9 @@ namespace ix
std::thread _thread; std::thread _thread;
std::unique_ptr<Socket> _socket; std::unique_ptr<Socket> _socket;
std::recursive_mutex _mutex; // to protect accessing the _socket (only one socket per std::recursive_mutex _mutex; // to protect accessing the _socket (only one socket per client)
// client) the mutex needs to be recursive as this function // the mutex needs to be recursive as this function might
// might be called recursively to follow HTTP redirections // be called recursively to follow HTTP redirections
SocketTLSOptions _tlsOptions; SocketTLSOptions _tlsOptions;

View File

@@ -9,11 +9,11 @@
#include "IXNetSystem.h" #include "IXNetSystem.h"
#include "IXSocketConnect.h" #include "IXSocketConnect.h"
#include "IXUserAgent.h" #include "IXUserAgent.h"
#include <cstring>
#include <fstream> #include <fstream>
#include <sstream> #include <sstream>
#include <vector> #include <vector>
#include <zlib.h> #include <zlib.h>
#include <cstring>
namespace namespace
{ {
@@ -50,11 +50,8 @@ namespace
const int windowBits = 15; const int windowBits = 15;
const int GZIP_ENCODING = 16; const int GZIP_ENCODING = 16;
deflateInit2(&zs, deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
Z_DEFAULT_COMPRESSION, windowBits | GZIP_ENCODING, 8,
Z_DEFLATED,
windowBits | GZIP_ENCODING,
8,
Z_DEFAULT_STRATEGY); Z_DEFAULT_STRATEGY);
zs.next_in = (Bytef*) str.data(); zs.next_in = (Bytef*) str.data();
@@ -72,12 +69,13 @@ namespace
ret = deflate(&zs, Z_FINISH); ret = deflate(&zs, Z_FINISH);
if (outstring.size() < zs.total_out) if(outstring.size() < zs.total_out)
{ {
// append the block to the output string // append the block to the output string
outstring.append(outbuffer, zs.total_out - outstring.size()); outstring.append(outbuffer,
zs.total_out - outstring.size());
} }
} while (ret == Z_OK); } while(ret == Z_OK);
deflateEnd(&zs); deflateEnd(&zs);
@@ -115,8 +113,7 @@ namespace ix
} }
void HttpServer::handleConnection(std::unique_ptr<Socket> socket, void HttpServer::handleConnection(std::unique_ptr<Socket> socket,
std::shared_ptr<ConnectionState> connectionState, std::shared_ptr<ConnectionState> connectionState)
std::unique_ptr<ConnectionInfo> connectionInfo)
{ {
_connectedClientsCount++; _connectedClientsCount++;
@@ -125,9 +122,7 @@ namespace ix
if (std::get<0>(ret)) if (std::get<0>(ret))
{ {
auto response = _onConnectionCallback(std::get<2>(ret), auto response = _onConnectionCallback(std::get<2>(ret), connectionState);
connectionState,
std::move(connectionInfo));
if (!Http::sendResponse(response, socket)) if (!Http::sendResponse(response, socket))
{ {
logError("Cannot send response"); logError("Cannot send response");
@@ -147,8 +142,7 @@ namespace ix
{ {
setOnConnectionCallback( setOnConnectionCallback(
[this](HttpRequestPtr request, [this](HttpRequestPtr request,
std::shared_ptr<ConnectionState> /*connectionState*/, std::shared_ptr<ConnectionState> /*connectionState*/) -> HttpResponsePtr {
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr {
std::string uri(request->uri); std::string uri(request->uri);
if (uri.empty() || uri == "/") if (uri.empty() || uri == "/")
{ {
@@ -178,8 +172,7 @@ namespace ix
// Log request // Log request
std::stringstream ss; std::stringstream ss;
ss << connectionInfo->remoteIp << ":" << connectionInfo->remotePort << " " ss << request->method << " " << request->headers["User-Agent"] << " "
<< request->method << " " << request->headers["User-Agent"] << " "
<< request->uri << " " << content.size(); << request->uri << " " << content.size();
logInfo(ss.str()); logInfo(ss.str());
@@ -205,15 +198,13 @@ namespace ix
setOnConnectionCallback( setOnConnectionCallback(
[this, [this,
redirectUrl](HttpRequestPtr request, redirectUrl](HttpRequestPtr request,
std::shared_ptr<ConnectionState> /*connectionState*/, std::shared_ptr<ConnectionState> /*connectionState*/) -> HttpResponsePtr {
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr {
WebSocketHttpHeaders headers; WebSocketHttpHeaders headers;
headers["Server"] = userAgent(); headers["Server"] = userAgent();
// Log request // Log request
std::stringstream ss; std::stringstream ss;
ss << connectionInfo->remoteIp << ":" << connectionInfo->remotePort << " " ss << request->method << " " << request->headers["User-Agent"] << " "
<< request->method << " " << request->headers["User-Agent"] << " "
<< request->uri; << request->uri;
logInfo(ss.str()); logInfo(ss.str());

View File

@@ -23,9 +23,7 @@ namespace ix
{ {
public: public:
using OnConnectionCallback = using OnConnectionCallback =
std::function<HttpResponsePtr(HttpRequestPtr, std::function<HttpResponsePtr(HttpRequestPtr, std::shared_ptr<ConnectionState>)>;
std::shared_ptr<ConnectionState>,
std::unique_ptr<ConnectionInfo> connectionInfo)>;
HttpServer(int port = SocketServer::kDefaultPort, HttpServer(int port = SocketServer::kDefaultPort,
const std::string& host = SocketServer::kDefaultHost, const std::string& host = SocketServer::kDefaultHost,
@@ -46,8 +44,7 @@ namespace ix
// Methods // Methods
virtual void handleConnection(std::unique_ptr<Socket>, virtual void handleConnection(std::unique_ptr<Socket>,
std::shared_ptr<ConnectionState> connectionState, std::shared_ptr<ConnectionState> connectionState) final;
std::unique_ptr<ConnectionInfo> connectionInfo) final;
virtual size_t getConnectedClientsCount() final; virtual size_t getConnectedClientsCount() final;
void setDefaultConnectionCallback(); void setDefaultConnectionCallback();

View File

@@ -276,7 +276,6 @@ namespace ix
} }
// Accept a connection. // Accept a connection.
// FIXME: Is this working for ipv6 ?
struct sockaddr_in client; // client address information struct sockaddr_in client; // client address information
int clientFd; // socket connected to client int clientFd; // socket connected to client
socklen_t addressLen = sizeof(client); socklen_t addressLen = sizeof(client);
@@ -308,45 +307,6 @@ namespace ix
continue; continue;
} }
std::unique_ptr<ConnectionInfo> connectionInfo;
if (_addressFamily == AF_INET)
{
char remoteIp[INET_ADDRSTRLEN];
if (inet_ntop(AF_INET, &client.sin_addr, remoteIp, INET_ADDRSTRLEN) == nullptr)
{
int err = Socket::getErrno();
std::stringstream ss;
ss << "SocketServer::run() error calling inet_ntop (ipv4): " << err << ", "
<< strerror(err);
logError(ss.str());
Socket::closeSocket(clientFd);
continue;
}
connectionInfo = std::make_unique<ConnectionInfo>(remoteIp, client.sin_port);
}
else // AF_INET6
{
char remoteIp[INET6_ADDRSTRLEN];
if (inet_ntop(AF_INET6, &client.sin_addr, remoteIp, INET6_ADDRSTRLEN) == nullptr)
{
int err = Socket::getErrno();
std::stringstream ss;
ss << "SocketServer::run() error calling inet_ntop (ipv6): " << err << ", "
<< strerror(err);
logError(ss.str());
Socket::closeSocket(clientFd);
continue;
}
connectionInfo = std::make_unique<ConnectionInfo>(remoteIp, client.sin_port);
}
std::shared_ptr<ConnectionState> connectionState; std::shared_ptr<ConnectionState> connectionState;
if (_connectionStateFactory) if (_connectionStateFactory)
{ {
@@ -382,7 +342,7 @@ namespace ix
_connectionsThreads.push_back(std::make_pair( _connectionsThreads.push_back(std::make_pair(
connectionState, connectionState,
std::thread( std::thread(
&SocketServer::handleConnection, this, std::move(socket), connectionState, std::move(connectionInfo)))); &SocketServer::handleConnection, this, std::move(socket), connectionState)));
} }
} }

View File

@@ -6,7 +6,6 @@
#pragma once #pragma once
#include "IXConnectionInfo.h"
#include "IXConnectionState.h" #include "IXConnectionState.h"
#include "IXSocketTLSOptions.h" #include "IXSocketTLSOptions.h"
#include <atomic> #include <atomic>
@@ -103,8 +102,7 @@ namespace ix
ConnectionStateFactory _connectionStateFactory; ConnectionStateFactory _connectionStateFactory;
virtual void handleConnection(std::unique_ptr<Socket>, virtual void handleConnection(std::unique_ptr<Socket>,
std::shared_ptr<ConnectionState> connectionState, std::shared_ptr<ConnectionState> connectionState) = 0;
std::unique_ptr<ConnectionInfo> connectionInfo) = 0;
virtual size_t getConnectedClientsCount() = 0; virtual size_t getConnectedClientsCount() = 0;
// Returns true if all connection threads are joined // Returns true if all connection threads are joined

View File

@@ -32,8 +32,8 @@
#include "IXUrlParser.h" #include "IXUrlParser.h"
#include <algorithm> #include <algorithm>
#include <cstdlib>
#include <cstring> #include <cstring>
#include <cstdlib>
namespace namespace
{ {

View File

@@ -429,6 +429,16 @@ namespace ix
return (binary) ? sendBinary(data, onProgressCallback) : sendText(data, onProgressCallback); return (binary) ? sendBinary(data, onProgressCallback) : sendText(data, onProgressCallback);
} }
WebSocketSendInfo WebSocket::sendBinary(const std::vector<uint8_t>& data,
const OnProgressCallback& onProgressCallback)
{
if (!isConnected()) return WebSocketSendInfo(false);
std::lock_guard<std::mutex> lock(_writeMutex);
auto webSocketSendInfo = _ws.sendBinary(data, onProgressCallback);
WebSocket::invokeTrafficTrackerCallback(webSocketSendInfo.wireSize, false);
return webSocketSendInfo;
}
WebSocketSendInfo WebSocket::sendBinary(const std::string& text, WebSocketSendInfo WebSocket::sendBinary(const std::string& text,
const OnProgressCallback& onProgressCallback) const OnProgressCallback& onProgressCallback)
{ {

View File

@@ -74,8 +74,11 @@ namespace ix
WebSocketSendInfo send(const std::string& data, WebSocketSendInfo send(const std::string& data,
bool binary = false, bool binary = false,
const OnProgressCallback& onProgressCallback = nullptr); const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo sendBinary(const std::string& text, WebSocketSendInfo sendBinary(const std::string& text,
const OnProgressCallback& onProgressCallback = nullptr); const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo sendBinary(const std::vector<uint8_t>& data,
const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo sendText(const std::string& text, WebSocketSendInfo sendText(const std::string& text,
const OnProgressCallback& onProgressCallback = nullptr); const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo ping(const std::string& text); WebSocketSendInfo ping(const std::string& text);

View File

@@ -75,26 +75,22 @@ namespace ix
return compressData(in, out); return compressData(in, out);
} }
bool WebSocketPerMessageDeflateCompressor::compress(const std::string& in, bool WebSocketPerMessageDeflateCompressor::compress(const std::string& in, std::vector<uint8_t>& out)
std::vector<uint8_t>& out)
{ {
return compressData(in, out); return compressData(in, out);
} }
bool WebSocketPerMessageDeflateCompressor::compress(const std::vector<uint8_t>& in, bool WebSocketPerMessageDeflateCompressor::compress(const std::vector<uint8_t>& in, std::string& out)
std::string& out)
{ {
return compressData(in, out); return compressData(in, out);
} }
bool WebSocketPerMessageDeflateCompressor::compress(const std::vector<uint8_t>& in, bool WebSocketPerMessageDeflateCompressor::compress(const std::vector<uint8_t>& in, std::vector<uint8_t>& out)
std::vector<uint8_t>& out)
{ {
return compressData(in, out); return compressData(in, out);
} }
template<typename T, typename S> template<typename T, typename S> bool WebSocketPerMessageDeflateCompressor::compressData(const T& in, S& out)
bool WebSocketPerMessageDeflateCompressor::compressData(const T& in, S& out)
{ {
// //
// 7.2.1. Compression // 7.2.1. Compression

View File

@@ -26,10 +26,8 @@ namespace ix
bool compress(const std::vector<uint8_t>& in, std::vector<uint8_t>& out); bool compress(const std::vector<uint8_t>& in, std::vector<uint8_t>& out);
private: private:
template<typename T, typename S> template<typename T, typename S> bool compressData(const T& in, S& out);
bool compressData(const T& in, S& out); template<typename T> bool endsWithEmptyUnCompressedBlock(const T& value);
template<typename T>
bool endsWithEmptyUnCompressedBlock(const T& value);
int _flush; int _flush;
size_t _compressBufferSize; size_t _compressBufferSize;

View File

@@ -72,13 +72,12 @@ namespace ix
} }
void WebSocketServer::handleConnection(std::unique_ptr<Socket> socket, void WebSocketServer::handleConnection(std::unique_ptr<Socket> socket,
std::shared_ptr<ConnectionState> connectionState, std::shared_ptr<ConnectionState> connectionState)
std::unique_ptr<ConnectionInfo> connectionInfo)
{ {
setThreadName("WebSocketServer::" + connectionState->getId()); setThreadName("WebSocketServer::" + connectionState->getId());
auto webSocket = std::make_shared<WebSocket>(); auto webSocket = std::make_shared<WebSocket>();
_onConnectionCallback(webSocket, connectionState, std::move(connectionInfo)); _onConnectionCallback(webSocket, connectionState);
webSocket->disableAutomaticReconnection(); webSocket->disableAutomaticReconnection();

View File

@@ -23,8 +23,7 @@ namespace ix
{ {
public: public:
using OnConnectionCallback = using OnConnectionCallback =
std::function<void(std::shared_ptr<WebSocket>, std::shared_ptr<ConnectionState>, std::function<void(std::shared_ptr<WebSocket>, std::shared_ptr<ConnectionState>)>;
std::unique_ptr<ConnectionInfo> connectionInfo)>;
WebSocketServer(int port = SocketServer::kDefaultPort, WebSocketServer(int port = SocketServer::kDefaultPort,
const std::string& host = SocketServer::kDefaultHost, const std::string& host = SocketServer::kDefaultHost,
@@ -61,8 +60,7 @@ namespace ix
// Methods // Methods
virtual void handleConnection(std::unique_ptr<Socket> socket, virtual void handleConnection(std::unique_ptr<Socket> socket,
std::shared_ptr<ConnectionState> connectionState, std::shared_ptr<ConnectionState> connectionState) final;
std::unique_ptr<ConnectionInfo> connectionInfo);
virtual size_t getConnectedClientsCount() final; virtual size_t getConnectedClientsCount() final;
}; };
} // namespace ix } // namespace ix

View File

@@ -751,11 +751,28 @@ namespace ix
return static_cast<unsigned>(seconds); return static_cast<unsigned>(seconds);
} }
template<class T>
WebSocketSendInfo WebSocketTransport::sendData(wsheader_type::opcode_type type, WebSocketSendInfo WebSocketTransport::sendData(wsheader_type::opcode_type type,
const T& message, const std::string& message,
bool compress, bool compress,
const OnProgressCallback& onProgressCallback) const OnProgressCallback& onProgressCallback)
{
return sendRawData(type, message, compress, onProgressCallback);
}
WebSocketSendInfo WebSocketTransport::sendData(wsheader_type::opcode_type type,
const std::vector<uint8_t>& message,
bool compress,
const OnProgressCallback& onProgressCallback)
{
return sendRawData(type, message, compress, onProgressCallback);
}
template<class T>
WebSocketSendInfo WebSocketTransport::sendRawData(wsheader_type::opcode_type type,
const T& message,
bool compress,
const OnProgressCallback& onProgressCallback)
{ {
if (_readyState != ReadyState::OPEN && _readyState != ReadyState::CLOSING) if (_readyState != ReadyState::OPEN && _readyState != ReadyState::CLOSING)
{ {
@@ -766,12 +783,14 @@ namespace ix
size_t wireSize = message.size(); size_t wireSize = message.size();
bool compressionError = false; bool compressionError = false;
auto message_begin = message.cbegin(); auto message_begin = message.begin();
auto message_end = message.cend(); auto message_end = message.end();
#if 0
if (compress) if (compress)
{ {
if (!_perMessageDeflate->compress(message, _compressedMessage)) T compressedMessage;
if (!_perMessageDeflate->compress(message, compressedMessage))
{ {
bool success = false; bool success = false;
compressionError = true; compressionError = true;
@@ -780,11 +799,12 @@ namespace ix
return WebSocketSendInfo(success, compressionError, payloadSize, wireSize); return WebSocketSendInfo(success, compressionError, payloadSize, wireSize);
} }
compressionError = false; compressionError = false;
wireSize = _compressedMessage.size(); wireSize = compressedMessage.size();
message_begin = _compressedMessage.cbegin(); message_begin = compressedMessage.begin();
message_end = _compressedMessage.cend(); message_end = compressedMessage.end();
} }
#endif
{ {
std::lock_guard<std::mutex> lock(_txbufMutex); std::lock_guard<std::mutex> lock(_txbufMutex);
@@ -810,8 +830,8 @@ namespace ix
// //
auto steps = wireSize / kChunkSize; auto steps = wireSize / kChunkSize;
std::string::const_iterator begin = message_begin; auto begin = message_begin;
std::string::const_iterator end = message_end; auto end = message_end;
for (uint64_t i = 0; i < steps; ++i) for (uint64_t i = 0; i < steps; ++i)
{ {
@@ -964,6 +984,14 @@ namespace ix
return info; return info;
} }
WebSocketSendInfo WebSocketTransport::sendBinary(const std::vector<uint8_t>& message,
const OnProgressCallback& onProgressCallback)
{
return sendData(
wsheader_type::BINARY_FRAME, message, _enablePerMessageDeflate, onProgressCallback);
}
WebSocketSendInfo WebSocketTransport::sendBinary(const std::string& message, WebSocketSendInfo WebSocketTransport::sendBinary(const std::string& message,
const OnProgressCallback& onProgressCallback) const OnProgressCallback& onProgressCallback)

View File

@@ -86,6 +86,9 @@ namespace ix
WebSocketInitResult connectToSocket(std::unique_ptr<Socket> socket, int timeoutSecs); WebSocketInitResult connectToSocket(std::unique_ptr<Socket> socket, int timeoutSecs);
PollResult poll(); PollResult poll();
WebSocketSendInfo sendBinary(const std::vector<uint8_t>& message,
const OnProgressCallback& onProgressCallback);
WebSocketSendInfo sendBinary(const std::string& message, WebSocketSendInfo sendBinary(const std::string& message,
const OnProgressCallback& onProgressCallback); const OnProgressCallback& onProgressCallback);
WebSocketSendInfo sendText(const std::string& message, WebSocketSendInfo sendText(const std::string& message,
@@ -190,7 +193,7 @@ namespace ix
std::atomic<bool> _enablePerMessageDeflate; std::atomic<bool> _enablePerMessageDeflate;
std::string _decompressedMessage; std::string _decompressedMessage;
std::string _compressedMessage; std::vector<uint8_t> _compressedMessage;
// Used to control TLS connection behavior // Used to control TLS connection behavior
SocketTLSOptions _socketTLSOptions; SocketTLSOptions _socketTLSOptions;
@@ -239,15 +242,28 @@ namespace ix
bool sendOnSocket(); bool sendOnSocket();
bool receiveFromSocket(); bool receiveFromSocket();
template<class T>
WebSocketSendInfo sendData(wsheader_type::opcode_type type, WebSocketSendInfo sendData(wsheader_type::opcode_type type,
const T& message, const std::string& message,
bool compress, bool compress,
const OnProgressCallback& onProgressCallback = nullptr); const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo sendData(wsheader_type::opcode_type type,
const std::vector<uint8_t>& message,
bool compress,
const OnProgressCallback& onProgressCallback = nullptr);
template<class T>
WebSocketSendInfo sendRawData(wsheader_type::opcode_type type,
const T& message,
bool compress,
const OnProgressCallback& onProgressCallback = nullptr);
template<class Iterator> template<class Iterator>
bool sendFragment( bool sendFragment(wsheader_type::opcode_type type,
wsheader_type::opcode_type type, bool fin, Iterator begin, Iterator end, bool compress); bool fin,
Iterator begin,
Iterator end,
bool compress);
void emitMessage(MessageKind messageKind, void emitMessage(MessageKind messageKind,
const std::string& message, const std::string& message,

View File

@@ -6,4 +6,4 @@
#pragma once #pragma once
#define IX_WEBSOCKET_VERSION "9.9.0" #define IX_WEBSOCKET_VERSION "9.8.6"

View File

@@ -104,10 +104,8 @@ test_server:
# env TEST=Websocket_server make test # env TEST=Websocket_server make test
# env TEST=Websocket_chat make test # env TEST=Websocket_chat make test
# env TEST=heartbeat make test # env TEST=heartbeat make test
build_test: test:
mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_TEST=1 .. ; ninja install) mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 -DUSE_TEST=1 .. ; ninja install)
test: build_test
(cd test ; python2.7 run.py -r) (cd test ; python2.7 run.py -r)
test_make: test_make:

View File

@@ -55,7 +55,6 @@ set (SOURCES
IXSentryClientTest.cpp IXSentryClientTest.cpp
IXWebSocketChatTest.cpp IXWebSocketChatTest.cpp
IXWebSocketBroadcastTest.cpp IXWebSocketBroadcastTest.cpp
IXWebSocketPerMessageDeflateCompressorTest.cpp
) )
# Some unittest don't work on windows yet # Some unittest don't work on windows yet

View File

@@ -12,8 +12,8 @@
#include <ixcobra/IXCobraConnection.h> #include <ixcobra/IXCobraConnection.h>
#include <ixcobra/IXCobraMetricsPublisher.h> #include <ixcobra/IXCobraMetricsPublisher.h>
#include <ixcrypto/IXUuid.h> #include <ixcrypto/IXUuid.h>
#include <ixredis/IXRedisServer.h>
#include <ixsentry/IXSentryClient.h> #include <ixsentry/IXSentryClient.h>
#include <ixredis/IXRedisServer.h>
#include <ixsnake/IXSnakeServer.h> #include <ixsnake/IXSnakeServer.h>
#include <ixwebsocket/IXHttpServer.h> #include <ixwebsocket/IXHttpServer.h>
#include <ixwebsocket/IXUserAgent.h> #include <ixwebsocket/IXUserAgent.h>
@@ -95,15 +95,13 @@ TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]")
sentryServer.setOnConnectionCallback( sentryServer.setOnConnectionCallback(
[](HttpRequestPtr request, [](HttpRequestPtr request,
std::shared_ptr<ConnectionState> /*connectionState*/, std::shared_ptr<ConnectionState> /*connectionState*/) -> HttpResponsePtr {
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr {
WebSocketHttpHeaders headers; WebSocketHttpHeaders headers;
headers["Server"] = userAgent(); headers["Server"] = userAgent();
// Log request // Log request
std::stringstream ss; std::stringstream ss;
ss << connectionInfo->remoteIp << ":" << connectionInfo->remotePort << " " ss << request->method << " " << request->headers["User-Agent"] << " "
<< request->method << " " << request->headers["User-Agent"] << " "
<< request->uri; << request->uri;
if (request->method == "POST") if (request->method == "POST")

View File

@@ -12,8 +12,8 @@
#include <ixcobra/IXCobraConnection.h> #include <ixcobra/IXCobraConnection.h>
#include <ixcobra/IXCobraMetricsPublisher.h> #include <ixcobra/IXCobraMetricsPublisher.h>
#include <ixcrypto/IXUuid.h> #include <ixcrypto/IXUuid.h>
#include <ixredis/IXRedisServer.h>
#include <ixsentry/IXSentryClient.h> #include <ixsentry/IXSentryClient.h>
#include <ixredis/IXRedisServer.h>
#include <ixsnake/IXSnakeServer.h> #include <ixsnake/IXSnakeServer.h>
#include <ixwebsocket/IXHttpServer.h> #include <ixwebsocket/IXHttpServer.h>
#include <ixwebsocket/IXUserAgent.h> #include <ixwebsocket/IXUserAgent.h>

View File

@@ -12,8 +12,8 @@
#include <ixcobra/IXCobraConnection.h> #include <ixcobra/IXCobraConnection.h>
#include <ixcobra/IXCobraMetricsPublisher.h> #include <ixcobra/IXCobraMetricsPublisher.h>
#include <ixcrypto/IXUuid.h> #include <ixcrypto/IXUuid.h>
#include <ixredis/IXRedisServer.h>
#include <ixsentry/IXSentryClient.h> #include <ixsentry/IXSentryClient.h>
#include <ixredis/IXRedisServer.h>
#include <ixsnake/IXSnakeServer.h> #include <ixsnake/IXSnakeServer.h>
#include <ixwebsocket/IXHttpServer.h> #include <ixwebsocket/IXHttpServer.h>
#include <ixwebsocket/IXUserAgent.h> #include <ixwebsocket/IXUserAgent.h>

View File

@@ -67,8 +67,7 @@ TEST_CASE("http server", "[httpd]")
TEST_CASE("http server redirection", "[httpd_redirect]") TEST_CASE("http server redirection", "[httpd_redirect]")
{ {
SECTION( SECTION("Connect to a local HTTP server, with redirection enabled, but we do not follow redirects")
"Connect to a local HTTP server, with redirection enabled, but we do not follow redirects")
{ {
int port = getFreePort(); int port = getFreePort();
ix::HttpServer server(port, "127.0.0.1"); ix::HttpServer server(port, "127.0.0.1");

View File

@@ -85,15 +85,12 @@ namespace ix
bool startWebSocketEchoServer(ix::WebSocketServer& server) bool startWebSocketEchoServer(ix::WebSocketServer& server)
{ {
server.setOnConnectionCallback([&server](std::shared_ptr<ix::WebSocket> webSocket, server.setOnConnectionCallback([&server](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState, std::shared_ptr<ConnectionState> connectionState) {
std::unique_ptr<ConnectionInfo> connectionInfo) {
auto remoteIp = connectionInfo->remoteIp;
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, remoteIp, &server](const ix::WebSocketMessagePtr& msg) { [webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
TLogger() << "New connection"; TLogger() << "New connection";
TLogger() << "Remote ip: " << remoteIp;
TLogger() << "Uri: " << msg->openInfo.uri; TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:"; TLogger() << "Headers:";
for (auto it : msg->openInfo.headers) for (auto it : msg->openInfo.headers)

View File

@@ -191,16 +191,13 @@ namespace
server.setOnConnectionCallback([&server, &connectionId]( server.setOnConnectionCallback([&server, &connectionId](
std::shared_ptr<ix::WebSocket> webSocket, std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState, std::shared_ptr<ConnectionState> connectionState) {
std::unique_ptr<ConnectionInfo> connectionInfo) { webSocket->setOnMessageCallback([webSocket, connectionState, &connectionId, &server](
auto remoteIp = connectionInfo->remoteIp;
webSocket->setOnMessageCallback([webSocket, connectionState, remoteIp, &connectionId, &server](
const ix::WebSocketMessagePtr& msg) { const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
TLogger() << "New connection"; TLogger() << "New connection";
connectionState->computeId(); connectionState->computeId();
TLogger() << "remote ip: " << remoteIp;
TLogger() << "id: " << connectionState->getId(); TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri; TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:"; TLogger() << "Headers:";

View File

@@ -100,6 +100,7 @@ namespace
} }
_webSocket.setUrl(url); _webSocket.setUrl(url);
_webSocket.disablePerMessageDeflate();
std::stringstream ss; std::stringstream ss;
log(std::string("Connecting to url: ") + url); log(std::string("Connecting to url: ") + url);
@@ -188,22 +189,20 @@ namespace
void WebSocketChat::sendMessage(const std::string& text) void WebSocketChat::sendMessage(const std::string& text)
{ {
_webSocket.sendBinary(encodeMessage(text)); auto msg = encodeMessage(text);
std::vector<uint8_t> data(text.begin(), text.end());
_webSocket.sendBinary(data);
} }
bool startServer(ix::WebSocketServer& server) bool startServer(ix::WebSocketServer& server)
{ {
server.setOnConnectionCallback([&server](std::shared_ptr<ix::WebSocket> webSocket, server.setOnConnectionCallback([&server](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState, std::shared_ptr<ConnectionState> connectionState) {
std::unique_ptr<ConnectionInfo> connectionInfo) {
auto remoteIp = connectionInfo->remoteIp;
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, remoteIp, &server](const ix::WebSocketMessagePtr& msg) { [webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
TLogger() << "New connection"; TLogger() << "New connection";
TLogger() << "remote ip: " << remoteIp;
TLogger() << "id: " << connectionState->getId(); TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri; TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:"; TLogger() << "Headers:";

View File

@@ -171,12 +171,9 @@ namespace
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite]( [&receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite](
std::shared_ptr<ix::WebSocket> webSocket, std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState, std::shared_ptr<ConnectionState> connectionState) {
std::unique_ptr<ConnectionInfo> connectionInfo) {
auto remoteIp = connectionInfo->remoteIp;
webSocket->setOnMessageCallback([webSocket, webSocket->setOnMessageCallback([webSocket,
connectionState, connectionState,
remoteIp,
&receivedCloseCode, &receivedCloseCode,
&receivedCloseReason, &receivedCloseReason,
&receivedCloseRemote, &receivedCloseRemote,
@@ -184,7 +181,6 @@ namespace
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
TLogger() << "New server connection"; TLogger() << "New server connection";
TLogger() << "remote ip: " << remoteIp;
TLogger() << "id: " << connectionState->getId(); TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri; TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:"; TLogger() << "Headers:";

View File

@@ -1,76 +0,0 @@
/*
* IXWebSocketPerMessageDeflateCodecTest.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone. All rights reserved.
*
* make build_test && build/test/ixwebsocket_unittest per-message-deflate-codec
*/
#include "IXTest.h"
#include "catch.hpp"
#include <iostream>
#include <ixwebsocket/IXWebSocketPerMessageDeflateCodec.h>
#include <string.h>
using namespace ix;
namespace ix
{
std::string compressAndDecompress(const std::string& a)
{
std::string b, c;
WebSocketPerMessageDeflateCompressor compressor;
compressor.init(11, true);
compressor.compress(a, b);
WebSocketPerMessageDeflateDecompressor decompressor;
decompressor.init(11, true);
decompressor.decompress(b, c);
return c;
}
std::string compressAndDecompressVector(const std::string& a)
{
std::string b, c;
std::vector<uint8_t> vec(a.begin(), a.end());
WebSocketPerMessageDeflateCompressor compressor;
compressor.init(11, true);
compressor.compress(vec, b);
WebSocketPerMessageDeflateDecompressor decompressor;
decompressor.init(11, true);
decompressor.decompress(b, c);
return c;
}
TEST_CASE("per-message-deflate-codec", "[zlib]")
{
SECTION("string api")
{
REQUIRE(compressAndDecompress("") == "");
REQUIRE(compressAndDecompress("foo") == "foo");
REQUIRE(compressAndDecompress("bar") == "bar");
REQUIRE(compressAndDecompress("asdcaseqw`21897dehqwed") == "asdcaseqw`21897dehqwed");
REQUIRE(compressAndDecompress("/usr/local/include/ixwebsocket/IXSocketAppleSSL.h") ==
"/usr/local/include/ixwebsocket/IXSocketAppleSSL.h");
}
SECTION("vector api")
{
REQUIRE(compressAndDecompressVector("") == "");
REQUIRE(compressAndDecompressVector("foo") == "foo");
REQUIRE(compressAndDecompressVector("bar") == "bar");
REQUIRE(compressAndDecompressVector("asdcaseqw`21897dehqwed") ==
"asdcaseqw`21897dehqwed");
REQUIRE(
compressAndDecompressVector("/usr/local/include/ixwebsocket/IXSocketAppleSSL.h") ==
"/usr/local/include/ixwebsocket/IXSocketAppleSSL.h");
}
}
} // namespace ix

View File

@@ -35,16 +35,13 @@ namespace ix
server.setOnConnectionCallback([&server, &connectionId]( server.setOnConnectionCallback([&server, &connectionId](
std::shared_ptr<ix::WebSocket> webSocket, std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState, std::shared_ptr<ConnectionState> connectionState) {
std::unique_ptr<ConnectionInfo> connectionInfo) { webSocket->setOnMessageCallback([webSocket, connectionState, &connectionId, &server](
auto remoteIp = connectionInfo->remoteIp;
webSocket->setOnMessageCallback([webSocket, connectionState, remoteIp, &connectionId, &server](
const ix::WebSocketMessagePtr& msg) { const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
TLogger() << "New connection"; TLogger() << "New connection";
connectionState->computeId(); connectionState->computeId();
TLogger() << "remote ip: " << remoteIp;
TLogger() << "id: " << connectionState->getId(); TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri; TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:"; TLogger() << "Headers:";

View File

@@ -18,15 +18,12 @@ bool startServer(ix::WebSocketServer& server, std::string& subProtocols)
{ {
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server, &subProtocols](std::shared_ptr<ix::WebSocket> webSocket, [&server, &subProtocols](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState, std::shared_ptr<ConnectionState> connectionState) {
std::unique_ptr<ConnectionInfo> connectionInfo) { webSocket->setOnMessageCallback([webSocket, connectionState, &server, &subProtocols](
auto remoteIp = connectionInfo->remoteIp;
webSocket->setOnMessageCallback([webSocket, connectionState, remoteIp, &server, &subProtocols](
const ix::WebSocketMessagePtr& msg) { const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
TLogger() << "New connection"; TLogger() << "New connection";
TLogger() << "remote ip: " << remoteIp;
TLogger() << "id: " << connectionState->getId(); TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri; TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:"; TLogger() << "Headers:";

View File

@@ -122,8 +122,6 @@
#pragma once #pragma once
#include <string>
namespace linenoise namespace linenoise
{ {
bool Readline(const char *prompt, std::string& line); bool Readline(const char *prompt, std::string& line);

View File

@@ -11,13 +11,13 @@
#include <cli11/CLI11.hpp> #include <cli11/CLI11.hpp>
#include <fstream> #include <fstream>
#include <ixbots/IXCobraMetricsToRedisBot.h>
#include <ixbots/IXCobraToPythonBot.h> #include <ixbots/IXCobraToPythonBot.h>
#include <ixbots/IXCobraToSentryBot.h> #include <ixbots/IXCobraToSentryBot.h>
#include <ixbots/IXCobraToStatsdBot.h> #include <ixbots/IXCobraToStatsdBot.h>
#include <ixbots/IXCobraToStdoutBot.h> #include <ixbots/IXCobraToStdoutBot.h>
#include <ixcore/utils/IXCoreLogger.h> #include <ixbots/IXCobraMetricsToRedisBot.h>
#include <ixredis/IXRedisClient.h> #include <ixredis/IXRedisClient.h>
#include <ixcore/utils/IXCoreLogger.h>
#include <ixsentry/IXSentryClient.h> #include <ixsentry/IXSentryClient.h>
#include <ixwebsocket/IXNetSystem.h> #include <ixwebsocket/IXNetSystem.h>
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
@@ -193,7 +193,8 @@ int main(int argc, char** argv)
"--limit_received_events", cobraBotConfig.limitReceivedEvents, "Max events per minute"); "--limit_received_events", cobraBotConfig.limitReceivedEvents, "Max events per minute");
app->add_option( app->add_option(
"--max_events_per_minute", cobraBotConfig.maxEventsPerMinute, "Max events per minute"); "--max_events_per_minute", cobraBotConfig.maxEventsPerMinute, "Max events per minute");
app->add_option("--batch_size", cobraBotConfig.batchSize, "Subscription batch size"); app->add_option(
"--batch_size", cobraBotConfig.batchSize, "Subscription batch size");
}; };
app.add_flag("--version", version, "Print ws version"); app.add_flag("--version", version, "Print ws version");
@@ -357,8 +358,7 @@ int main(int argc, char** argv)
cobra2python->add_option("--host", hostname, "Statsd host"); cobra2python->add_option("--host", hostname, "Statsd host");
cobra2python->add_option("--port", statsdPort, "Statsd port"); cobra2python->add_option("--port", statsdPort, "Statsd port");
cobra2python->add_option("--prefix", prefix, "Statsd prefix"); cobra2python->add_option("--prefix", prefix, "Statsd prefix");
cobra2python->add_option("--script", scriptPath, "Python script path") cobra2python->add_option("--script", scriptPath, "Python script path")->check(CLI::ExistingPath);
->check(CLI::ExistingPath);
cobra2python->add_option("--pidfile", pidfile, "Pid file"); cobra2python->add_option("--pidfile", pidfile, "Pid file");
addTLSOptions(cobra2python); addTLSOptions(cobra2python);
addCobraBotConfig(cobra2python); addCobraBotConfig(cobra2python);
@@ -604,7 +604,8 @@ int main(int argc, char** argv)
} }
else else
{ {
ret = (int) ix::cobra_to_python_bot(cobraBotConfig, statsdClient, scriptPath); ret = (int) ix::cobra_to_python_bot(
cobraBotConfig, statsdClient, scriptPath);
} }
} }
else if (app.got_subcommand("cobra_to_sentry")) else if (app.got_subcommand("cobra_to_sentry"))
@@ -619,12 +620,14 @@ int main(int argc, char** argv)
ix::RedisClient redisClient; ix::RedisClient redisClient;
if (!redisClient.connect(hostname, redisPort)) if (!redisClient.connect(hostname, redisPort))
{ {
spdlog::error("Cannot connect to redis host {}:{}", redisHosts, redisPort); spdlog::error("Cannot connect to redis host {}:{}",
redisHosts, redisPort);
return 1; return 1;
} }
else else
{ {
ret = (int) ix::cobra_metrics_to_redis_bot(cobraBotConfig, redisClient, verbose); ret = (int) ix::cobra_metrics_to_redis_bot(
cobraBotConfig, redisClient, verbose);
} }
} }
else if (app.got_subcommand("snake")) else if (app.got_subcommand("snake"))

View File

@@ -64,7 +64,9 @@ namespace ix
bool disablePerMessageDeflate, bool disablePerMessageDeflate,
const ix::SocketTLSOptions& tlsOptions); const ix::SocketTLSOptions& tlsOptions);
int ws_redis_cli_main(const std::string& hostname, int port, const std::string& password); int ws_redis_cli_main(const std::string& hostname,
int port,
const std::string& password);
int ws_redis_publish_main(const std::string& hostname, int ws_redis_publish_main(const std::string& hostname,
int port, int port,

View File

@@ -21,15 +21,12 @@ namespace ix
server.setTLSOptions(tlsOptions); server.setTLSOptions(tlsOptions);
server.setOnConnectionCallback([&server](std::shared_ptr<WebSocket> webSocket, server.setOnConnectionCallback([&server](std::shared_ptr<WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState, std::shared_ptr<ConnectionState> connectionState) {
std::unique_ptr<ConnectionInfo> connectionInfo) { webSocket->setOnMessageCallback([webSocket, connectionState, &server](
auto remoteIp = connectionInfo->remoteIp;
webSocket->setOnMessageCallback([webSocket, connectionState, remoteIp, &server](
const WebSocketMessagePtr& msg) { const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
spdlog::info("New connection"); spdlog::info("New connection");
spdlog::info("remote ip: {}", remoteIp);
spdlog::info("id: {}", connectionState->getId()); spdlog::info("id: {}", connectionState->getId());
spdlog::info("Uri: {}", msg->openInfo.uri); spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:"); spdlog::info("Headers:");

View File

@@ -6,12 +6,12 @@
#include "IXBench.h" #include "IXBench.h"
#include "linenoise.hpp" #include "linenoise.hpp"
#include <iostream>
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXSocketTLSOptions.h> #include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
#include <iostream>
namespace ix namespace ix
@@ -216,7 +216,8 @@ namespace ix
{ {
if (_binaryMode) if (_binaryMode)
{ {
_webSocket.sendBinary(text); std::vector<uint8_t> data(text.begin(), text.end());
_webSocket.sendBinary(data);
} }
else else
{ {

View File

@@ -44,15 +44,12 @@ namespace ix
server.setOnConnectionCallback( server.setOnConnectionCallback(
[greetings](std::shared_ptr<ix::WebSocket> webSocket, [greetings](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState, std::shared_ptr<ConnectionState> connectionState) {
std::unique_ptr<ConnectionInfo> connectionInfo) {
auto remoteIp = connectionInfo->remoteIp;
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, remoteIp, greetings](const WebSocketMessagePtr& msg) { [webSocket, connectionState, greetings](const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
spdlog::info("New connection"); spdlog::info("New connection");
spdlog::info("remote ip: {}", remoteIp);
spdlog::info("id: {}", connectionState->getId()); spdlog::info("id: {}", connectionState->getId());
spdlog::info("Uri: {}", msg->openInfo.uri); spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:"); spdlog::info("Headers:");

View File

@@ -56,18 +56,15 @@ namespace ix
server.setOnConnectionCallback([remoteUrl, server.setOnConnectionCallback([remoteUrl,
verbose](std::shared_ptr<ix::WebSocket> webSocket, verbose](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState, std::shared_ptr<ConnectionState> connectionState) {
std::unique_ptr<ConnectionInfo> connectionInfo) {
auto state = std::dynamic_pointer_cast<ProxyConnectionState>(connectionState); auto state = std::dynamic_pointer_cast<ProxyConnectionState>(connectionState);
auto remoteIp = connectionInfo->remoteIp;
// Server connection // Server connection
state->webSocket().setOnMessageCallback([webSocket, state, remoteIp, verbose]( state->webSocket().setOnMessageCallback([webSocket, state, verbose](
const WebSocketMessagePtr& msg) { const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
spdlog::info("New connection to remote server"); spdlog::info("New connection to remote server");
spdlog::info("remote ip: {}", remoteIp);
spdlog::info("id: {}", state->getId()); spdlog::info("id: {}", state->getId());
spdlog::info("Uri: {}", msg->openInfo.uri); spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:"); spdlog::info("Headers:");

View File

@@ -4,15 +4,17 @@
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved. * Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/ */
#include "linenoise.hpp"
#include <iostream>
#include <ixredis/IXRedisClient.h> #include <ixredis/IXRedisClient.h>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream> #include <sstream>
#include <iostream>
#include "linenoise.hpp"
namespace ix namespace ix
{ {
int ws_redis_cli_main(const std::string& hostname, int port, const std::string& password) int ws_redis_cli_main(const std::string& hostname,
int port,
const std::string& password)
{ {
RedisClient redisClient; RedisClient redisClient;
if (!redisClient.connect(hostname, port)) if (!redisClient.connect(hostname, port))
@@ -69,7 +71,9 @@ namespace ix
{ {
if (response.first != RespType::String) if (response.first != RespType::String)
{ {
std::cout << "(" << redisClient.getRespTypeDescription(response.first) << ")" std::cout << "("
<< redisClient.getRespTypeDescription(response.first)
<< ")"
<< " "; << " ";
} }

View File

@@ -20,15 +20,12 @@ namespace ix
server.setTLSOptions(tlsOptions); server.setTLSOptions(tlsOptions);
server.setOnConnectionCallback([&server](std::shared_ptr<ix::WebSocket> webSocket, server.setOnConnectionCallback([&server](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState, std::shared_ptr<ConnectionState> connectionState) {
std::unique_ptr<ConnectionInfo> connectionInfo) { webSocket->setOnMessageCallback([webSocket, connectionState, &server](
auto remoteIp = connectionInfo->remoteIp;
webSocket->setOnMessageCallback([webSocket, connectionState, remoteIp, &server](
const WebSocketMessagePtr& msg) { const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
spdlog::info("ws_transfer: New connection"); spdlog::info("ws_transfer: New connection");
spdlog::info("remote ip: {}", remoteIp);
spdlog::info("id: {}", connectionState->getId()); spdlog::info("id: {}", connectionState->getId());
spdlog::info("Uri: {}", msg->openInfo.uri); spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:"); spdlog::info("Headers:");