Compare commits

...

16 Commits

Author SHA1 Message Date
f7031d0d3e set thread name in only one file 2020-07-17 11:43:50 -07:00
595e6c57df IXSelectInterruptPipe.h included in cmake on windows but compiled out 2020-07-17 11:33:02 -07:00
87709c201e (socket server) bump default max connection count from 32 to 128 2020-07-10 17:11:11 -07:00
e70d83ace1 (snake) implement super simple stream sql expression support in snake server 2020-07-10 16:10:59 -07:00
ca829a3a98 implement very very simple stream sql support 2020-07-10 16:07:51 -07:00
26a1e63626 snake: stream sql mock + add republished channel option 2020-07-10 15:06:55 -07:00
c98959b895 comment out unittest which cannot be activated yet 2020-07-09 10:34:52 -07:00
baf18648e9 Added test for websocket leak (#225)
* Added test for websocket leak

* Fixed test
2020-07-09 10:19:44 -07:00
b21306376b uwp build fix + more ivp6 support 2020-07-08 12:38:55 -07:00
fbd17685a1 (socket+websocket+http+redis+snake servers) expose the remote ip and remote port when a new connection is made (see #222) / only ipv4 is handled 2020-07-08 12:10:35 -07:00
3a673575dd clang format 2020-07-08 10:39:46 -07:00
d5e51840ab use const iterators 2020-07-08 10:34:14 -07:00
543c2086b2 more templates in WebSocketTransport 2020-07-07 21:26:42 -07:00
95eab59c08 WebSocketPerMessageDeflateCompressor can work with vector or std::string 2020-07-07 21:26:04 -07:00
e9e768a288 better unittest for IXWebSocketPerMessageDeflateCompressor 2020-07-07 21:15:34 -07:00
e2180a1f31 add unittest for IXWebSocketPerMessageDeflateCompressor 2020-07-07 20:56:38 -07:00
57 changed files with 820 additions and 241 deletions

View File

@ -36,6 +36,8 @@ set( IXWEBSOCKET_SOURCES
ixwebsocket/IXNetSystem.cpp
ixwebsocket/IXSelectInterrupt.cpp
ixwebsocket/IXSelectInterruptFactory.cpp
ixwebsocket/IXSelectInterruptPipe.cpp
ixwebsocket/IXSetThreadName.cpp
ixwebsocket/IXSocket.cpp
ixwebsocket/IXSocketConnect.cpp
ixwebsocket/IXSocketFactory.cpp
@ -58,6 +60,7 @@ set( IXWEBSOCKET_SOURCES
set( IXWEBSOCKET_HEADERS
ixwebsocket/IXBench.h
ixwebsocket/IXCancellationRequest.h
ixwebsocket/IXConnectionInfo.h
ixwebsocket/IXConnectionState.h
ixwebsocket/IXDNSLookup.h
ixwebsocket/IXExponentialBackoff.h
@ -68,6 +71,7 @@ set( IXWEBSOCKET_HEADERS
ixwebsocket/IXProgressCallback.h
ixwebsocket/IXSelectInterrupt.h
ixwebsocket/IXSelectInterruptFactory.h
ixwebsocket/IXSelectInterruptPipe.h
ixwebsocket/IXSetThreadName.h
ixwebsocket/IXSocket.h
ixwebsocket/IXSocketConnect.h
@ -98,23 +102,6 @@ set( IXWEBSOCKET_HEADERS
ixwebsocket/IXWebSocketVersion.h
)
if (UNIX)
# Linux, Mac, iOS, Android
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/IXSelectInterruptPipe.cpp )
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/IXSelectInterruptPipe.h )
endif()
# Platform specific code
if (APPLE)
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/apple/IXSetThreadName_apple.cpp)
elseif (WIN32)
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/windows/IXSetThreadName_windows.cpp)
elseif (${CMAKE_SYSTEM_NAME} MATCHES "FreeBSD")
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/freebsd/IXSetThreadName_freebsd.cpp)
else()
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/linux/IXSetThreadName_linux.cpp)
endif()
option(USE_TLS "Enable TLS support" FALSE)
if (USE_TLS)

View File

@ -1,6 +1,18 @@
# Changelog
All changes to this project will be documented in this file.
## [9.9.2] - 2020-07-10
(socket server) bump default max connection count from 32 to 128
## [9.9.1] - 2020-07-10
(snake) implement super simple stream sql expression support in snake server
## [9.9.0] - 2020-07-08
(socket+websocket+http+redis+snake servers) expose the remote ip and remote port when a new connection is made
## [9.8.6] - 2020-07-06
(cmake) change the way zlib and openssl are searched

View File

@ -257,28 +257,31 @@ ix::WebSocketServer server(port);
server.setOnConnectionCallback(
[&server](std::shared_ptr<WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo)
{
std::cout << "Remote ip: " << connectionInfo->remoteIp << std::endl;
webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](const ix::WebSocketMessagePtr msg)
{
if (msg->type == ix::WebSocketMessageType::Open)
{
std::cerr << "New connection" << std::endl;
std::cout << "New connection" << std::endl;
// A connection state object is available, and has a default id
// You can subclass ConnectionState and pass an alternate factory
// to override it. It is useful if you want to store custom
// attributes per connection (authenticated bool flag, attributes, etc...)
std::cerr << "id: " << connectionState->getId() << std::endl;
std::cout << "id: " << connectionState->getId() << std::endl;
// The uri the client did connect to.
std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
std::cout << "Uri: " << msg->openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
std::cout << "Headers:" << std::endl;
for (auto it : msg->openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
std::cout << it.first << ": " << it.second << std::endl;
}
}
else if (msg->type == ix::WebSocketMessageType::Message)
@ -417,11 +420,14 @@ If you want to handle how requests are processed, implement the setOnConnectionC
```cpp
setOnConnectionCallback(
[this](HttpRequestPtr request,
std::shared_ptr<ConnectionState> /*connectionState*/) -> HttpResponsePtr
std::shared_ptr<ConnectionState> /*connectionState*/,
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr
{
// Build a string for the response
std::stringstream ss;
ss << request->method
ss << connectionInfo->remoteIp
<< " "
<< request->method
<< " "
<< request->uri;

View File

@ -45,8 +45,11 @@ namespace ix
}
void RedisServer::handleConnection(std::unique_ptr<Socket> socket,
std::shared_ptr<ConnectionState> connectionState)
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo)
{
logInfo("New connection from remote ip " + connectionInfo->remoteIp);
_connectedClientsCount++;
while (!_stopHandlingConnections)

View File

@ -44,7 +44,8 @@ namespace ix
// Methods
virtual void handleConnection(std::unique_ptr<Socket>,
std::shared_ptr<ConnectionState> connectionState) final;
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) final;
virtual size_t getConnectedClientsCount() final;
bool startsWith(const std::string& str, const std::string& start);

View File

@ -7,12 +7,14 @@ set (IXSNAKE_SOURCES
ixsnake/IXSnakeServer.cpp
ixsnake/IXSnakeProtocol.cpp
ixsnake/IXAppConfig.cpp
ixsnake/IXStreamSql.cpp
)
set (IXSNAKE_HEADERS
ixsnake/IXSnakeServer.h
ixsnake/IXSnakeProtocol.h
ixsnake/IXAppConfig.h
ixsnake/IXStreamSql.h
)
add_library(ixsnake STATIC

View File

@ -33,6 +33,9 @@ namespace snake
// Misc
bool verbose;
bool disablePong;
// If non empty, every published message gets republished to a given channel
std::string republishChannel;
};
bool isAppKeyValid(const AppConfig& appConfig, std::string appkey);

View File

@ -30,6 +30,7 @@ namespace snake
{
return _appkey;
}
void setAppkey(const std::string& appkey)
{
_appkey = appkey;
@ -39,6 +40,7 @@ namespace snake
{
return _role;
}
void setRole(const std::string& role)
{
_role = role;

View File

@ -8,6 +8,7 @@
#include "IXAppConfig.h"
#include "IXSnakeConnectionState.h"
#include "IXStreamSql.h"
#include "nlohmann/json.hpp"
#include <iostream>
#include <ixcore/utils/IXCoreLogger.h>
@ -91,6 +92,7 @@ namespace snake
void handlePublish(std::shared_ptr<SnakeConnectionState> state,
std::shared_ptr<ix::WebSocket> ws,
const AppConfig& appConfig,
const nlohmann::json& pdu)
{
std::vector<std::string> channels;
@ -115,6 +117,12 @@ namespace snake
return;
}
// add an extra channel if the config has one specified
if (!appConfig.republishChannel.empty())
{
channels.push_back(appConfig.republishChannel);
}
for (auto&& channel : channels)
{
std::stringstream ss;
@ -180,12 +188,25 @@ namespace snake
}
}
std::string filterStr;
if (pdu["body"].find("filter") != pdu["body"].end())
{
std::string filterStr = pdu["body"]["filter"];
}
std::unique_ptr<StreamSql> streamSql = std::make_unique<StreamSql>(filterStr);
int id = 0;
auto callback = [ws, &id, &subscriptionId](const std::string& messageStr) {
auto callback = [ws, &id, &subscriptionId, &streamSql](const std::string& messageStr) {
auto msg = nlohmann::json::parse(messageStr);
msg = msg["body"]["message"];
if (streamSql->valid() && !streamSql->match(msg))
{
return;
}
nlohmann::json response = {
{"action", "rtm/subscription/data"},
{"id", id++},
@ -279,7 +300,7 @@ namespace snake
}
else if (action == "rtm/publish")
{
handlePublish(state, ws, pdu);
handlePublish(state, ws, appConfig, pdu);
}
else if (action == "rtm/subscribe")
{

View File

@ -61,16 +61,19 @@ namespace snake
_server.setOnConnectionCallback(
[this](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ix::ConnectionState> connectionState) {
std::shared_ptr<ix::ConnectionState> connectionState,
std::unique_ptr<ix::ConnectionInfo> connectionInfo) {
auto state = std::dynamic_pointer_cast<SnakeConnectionState>(connectionState);
auto remoteIp = connectionInfo->remoteIp;
webSocket->setOnMessageCallback(
[this, webSocket, state](const ix::WebSocketMessagePtr& msg) {
[this, webSocket, state, remoteIp](const ix::WebSocketMessagePtr& msg) {
std::stringstream ss;
ix::LogLevel logLevel = ix::LogLevel::Debug;
if (msg->type == ix::WebSocketMessageType::Open)
{
ss << "New connection" << std::endl;
ss << "remote ip: " << remoteIp << std::endl;
ss << "id: " << state->getId() << std::endl;
ss << "Uri: " << msg->openInfo.uri << std::endl;
ss << "Headers:" << std::endl;

View File

@ -0,0 +1,63 @@
/*
* IXStreamSql.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*
* Super simple hacked up version of a stream sql expression,
* that only supports non nested field evaluation
*/
#include "IXStreamSql.h"
#include <sstream>
#include <iostream>
namespace snake
{
StreamSql::StreamSql(const std::string& sqlFilter)
: _valid(false)
{
std::string token;
std::stringstream tokenStream(sqlFilter);
std::vector<std::string> tokens;
// Split by ' '
while (std::getline(tokenStream, token, ' '))
{
tokens.push_back(token);
}
_valid = tokens.size() == 8;
if (!_valid) return;
_field = tokens[5];
_operator = tokens[6];
_value = tokens[7];
// remove single quotes
_value = _value.substr(1, _value.size() - 2);
if (_operator == "LIKE")
{
_value = _value.substr(1, _value.size() - 2);
}
}
bool StreamSql::valid() const
{
return _valid;
}
bool StreamSql::match(const nlohmann::json& msg)
{
if (!_valid) return false;
if (msg.find(_field) == msg.end())
{
return false;
}
std::string value = msg[_field];
return value == _value;
}
} // namespace snake

View File

@ -0,0 +1,29 @@
/*
* IXStreamSql.h
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <string>
#include "nlohmann/json.hpp"
namespace snake
{
class StreamSql
{
public:
StreamSql(const std::string& sqlFilter = std::string());
~StreamSql() = default;
bool match(const nlohmann::json& msg);
bool valid() const;
private:
std::string _field;
std::string _operator;
std::string _value;
bool _valid;
};
}

View File

@ -0,0 +1,25 @@
/*
* IXConnectionInfo.h
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <string>
namespace ix
{
struct ConnectionInfo
{
std::string remoteIp;
int remotePort;
ConnectionInfo(const std::string& r = std::string(), int p = 0)
: remoteIp(r)
, remotePort(p)
{
;
}
};
} // namespace ix

View File

@ -103,9 +103,9 @@ namespace ix
std::thread _thread;
std::unique_ptr<Socket> _socket;
std::recursive_mutex _mutex; // to protect accessing the _socket (only one socket per client)
// the mutex needs to be recursive as this function might
// be called recursively to follow HTTP redirections
std::recursive_mutex _mutex; // to protect accessing the _socket (only one socket per
// client) the mutex needs to be recursive as this function
// might be called recursively to follow HTTP redirections
SocketTLSOptions _tlsOptions;

View File

@ -9,11 +9,11 @@
#include "IXNetSystem.h"
#include "IXSocketConnect.h"
#include "IXUserAgent.h"
#include <cstring>
#include <fstream>
#include <sstream>
#include <vector>
#include <zlib.h>
#include <cstring>
namespace
{
@ -50,8 +50,11 @@ namespace
const int windowBits = 15;
const int GZIP_ENCODING = 16;
deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
windowBits | GZIP_ENCODING, 8,
deflateInit2(&zs,
Z_DEFAULT_COMPRESSION,
Z_DEFLATED,
windowBits | GZIP_ENCODING,
8,
Z_DEFAULT_STRATEGY);
zs.next_in = (Bytef*) str.data();
@ -69,13 +72,12 @@ namespace
ret = deflate(&zs, Z_FINISH);
if(outstring.size() < zs.total_out)
if (outstring.size() < zs.total_out)
{
// append the block to the output string
outstring.append(outbuffer,
zs.total_out - outstring.size());
outstring.append(outbuffer, zs.total_out - outstring.size());
}
} while(ret == Z_OK);
} while (ret == Z_OK);
deflateEnd(&zs);
@ -113,7 +115,8 @@ namespace ix
}
void HttpServer::handleConnection(std::unique_ptr<Socket> socket,
std::shared_ptr<ConnectionState> connectionState)
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo)
{
_connectedClientsCount++;
@ -122,7 +125,9 @@ namespace ix
if (std::get<0>(ret))
{
auto response = _onConnectionCallback(std::get<2>(ret), connectionState);
auto response = _onConnectionCallback(std::get<2>(ret),
connectionState,
std::move(connectionInfo));
if (!Http::sendResponse(response, socket))
{
logError("Cannot send response");
@ -142,7 +147,8 @@ namespace ix
{
setOnConnectionCallback(
[this](HttpRequestPtr request,
std::shared_ptr<ConnectionState> /*connectionState*/) -> HttpResponsePtr {
std::shared_ptr<ConnectionState> /*connectionState*/,
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr {
std::string uri(request->uri);
if (uri.empty() || uri == "/")
{
@ -172,7 +178,8 @@ namespace ix
// Log request
std::stringstream ss;
ss << request->method << " " << request->headers["User-Agent"] << " "
ss << connectionInfo->remoteIp << ":" << connectionInfo->remotePort << " "
<< request->method << " " << request->headers["User-Agent"] << " "
<< request->uri << " " << content.size();
logInfo(ss.str());
@ -198,13 +205,15 @@ namespace ix
setOnConnectionCallback(
[this,
redirectUrl](HttpRequestPtr request,
std::shared_ptr<ConnectionState> /*connectionState*/) -> HttpResponsePtr {
std::shared_ptr<ConnectionState> /*connectionState*/,
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr {
WebSocketHttpHeaders headers;
headers["Server"] = userAgent();
// Log request
std::stringstream ss;
ss << request->method << " " << request->headers["User-Agent"] << " "
ss << connectionInfo->remoteIp << ":" << connectionInfo->remotePort << " "
<< request->method << " " << request->headers["User-Agent"] << " "
<< request->uri;
logInfo(ss.str());

View File

@ -23,7 +23,9 @@ namespace ix
{
public:
using OnConnectionCallback =
std::function<HttpResponsePtr(HttpRequestPtr, std::shared_ptr<ConnectionState>)>;
std::function<HttpResponsePtr(HttpRequestPtr,
std::shared_ptr<ConnectionState>,
std::unique_ptr<ConnectionInfo> connectionInfo)>;
HttpServer(int port = SocketServer::kDefaultPort,
const std::string& host = SocketServer::kDefaultHost,
@ -44,7 +46,8 @@ namespace ix
// Methods
virtual void handleConnection(std::unique_ptr<Socket>,
std::shared_ptr<ConnectionState> connectionState) final;
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) final;
virtual size_t getConnectedClientsCount() final;
void setDefaultConnectionCallback();

View File

@ -5,8 +5,10 @@
*/
//
// On macOS we use UNIX pipes to wake up select.
// On UNIX we use pipes to wake up select. There is no way to do that
// on Windows so this file is compiled out on Windows.
//
#ifndef _WIN32
#include "IXSelectInterruptPipe.h"
@ -144,3 +146,5 @@ namespace ix
return _fildes[kPipeReadIndex];
}
} // namespace ix
#endif // !_WIN32

View File

@ -0,0 +1,81 @@
/*
* IXSetThreadName.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 2020 Machine Zone, Inc. All rights reserved.
*/
#include "IXSetThreadName.h"
// unix systems
#if defined(__APPLE__) || defined(__linux__) || defined(BSD)
# include <pthread.h>
#endif
// freebsd needs this header as well
#if defined(BSD)
# include <pthread_np.h>
#endif
// Windows
#ifdef _WIN32
# include <Windows.h>
#endif
namespace ix
{
#ifdef _WIN32
const DWORD MS_VC_EXCEPTION = 0x406D1388;
#pragma pack(push, 8)
typedef struct tagTHREADNAME_INFO
{
DWORD dwType; // Must be 0x1000.
LPCSTR szName; // Pointer to name (in user addr space).
DWORD dwThreadID; // Thread ID (-1=caller thread).
DWORD dwFlags; // Reserved for future use, must be zero.
} THREADNAME_INFO;
#pragma pack(pop)
void SetThreadName(DWORD dwThreadID, const char* threadName)
{
THREADNAME_INFO info;
info.dwType = 0x1000;
info.szName = threadName;
info.dwThreadID = dwThreadID;
info.dwFlags = 0;
__try
{
RaiseException(
MS_VC_EXCEPTION, 0, sizeof(info) / sizeof(ULONG_PTR), (ULONG_PTR*) &info);
}
__except (EXCEPTION_EXECUTE_HANDLER)
{
}
}
#endif
void setThreadName(const std::string& name)
{
#if defined(__APPLE__)
//
// Apple reserves 16 bytes for its thread names
// Notice that the Apple version of pthread_setname_np
// does not take a pthread_t argument
//
pthread_setname_np(name.substr(0, 63).c_str());
#elif defined(__linux__)
//
// Linux only reserves 16 bytes for its thread names
// See prctl and PR_SET_NAME property in
// http://man7.org/linux/man-pages/man2/prctl.2.html
//
pthread_setname_np(pthread_self(), name.substr(0, 15).c_str());
#elif defined(_WIN32)
SetThreadName(-1, name.c_str());
#elif defined(BSD)
pthread_set_name_np(pthread_self(), name.substr(0, 15).c_str());
#else
// ... assert here ?
#endif
}
} // namespace ix

View File

@ -22,7 +22,7 @@ namespace ix
const int SocketServer::kDefaultPort(8080);
const std::string SocketServer::kDefaultHost("127.0.0.1");
const int SocketServer::kDefaultTcpBacklog(5);
const size_t SocketServer::kDefaultMaxConnections(32);
const size_t SocketServer::kDefaultMaxConnections(128);
const int SocketServer::kDefaultAddressFamily(AF_INET);
SocketServer::SocketServer(
@ -276,6 +276,7 @@ namespace ix
}
// Accept a connection.
// FIXME: Is this working for ipv6 ?
struct sockaddr_in client; // client address information
int clientFd; // socket connected to client
socklen_t addressLen = sizeof(client);
@ -307,6 +308,45 @@ namespace ix
continue;
}
std::unique_ptr<ConnectionInfo> connectionInfo;
if (_addressFamily == AF_INET)
{
char remoteIp[INET_ADDRSTRLEN];
if (inet_ntop(AF_INET, &client.sin_addr, remoteIp, INET_ADDRSTRLEN) == nullptr)
{
int err = Socket::getErrno();
std::stringstream ss;
ss << "SocketServer::run() error calling inet_ntop (ipv4): " << err << ", "
<< strerror(err);
logError(ss.str());
Socket::closeSocket(clientFd);
continue;
}
connectionInfo = std::make_unique<ConnectionInfo>(remoteIp, client.sin_port);
}
else // AF_INET6
{
char remoteIp[INET6_ADDRSTRLEN];
if (inet_ntop(AF_INET6, &client.sin_addr, remoteIp, INET6_ADDRSTRLEN) == nullptr)
{
int err = Socket::getErrno();
std::stringstream ss;
ss << "SocketServer::run() error calling inet_ntop (ipv6): " << err << ", "
<< strerror(err);
logError(ss.str());
Socket::closeSocket(clientFd);
continue;
}
connectionInfo = std::make_unique<ConnectionInfo>(remoteIp, client.sin_port);
}
std::shared_ptr<ConnectionState> connectionState;
if (_connectionStateFactory)
{
@ -342,7 +382,7 @@ namespace ix
_connectionsThreads.push_back(std::make_pair(
connectionState,
std::thread(
&SocketServer::handleConnection, this, std::move(socket), connectionState)));
&SocketServer::handleConnection, this, std::move(socket), connectionState, std::move(connectionInfo))));
}
}

View File

@ -6,6 +6,7 @@
#pragma once
#include "IXConnectionInfo.h"
#include "IXConnectionState.h"
#include "IXSocketTLSOptions.h"
#include <atomic>
@ -102,7 +103,8 @@ namespace ix
ConnectionStateFactory _connectionStateFactory;
virtual void handleConnection(std::unique_ptr<Socket>,
std::shared_ptr<ConnectionState> connectionState) = 0;
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) = 0;
virtual size_t getConnectedClientsCount() = 0;
// Returns true if all connection threads are joined

View File

@ -32,8 +32,8 @@
#include "IXUrlParser.h"
#include <algorithm>
#include <cstring>
#include <cstdlib>
#include <cstring>
namespace
{

View File

@ -59,14 +59,42 @@ namespace ix
return true;
}
bool WebSocketPerMessageDeflateCompressor::endsWith(const std::string& value,
const std::string& ending)
template<typename T>
bool WebSocketPerMessageDeflateCompressor::endsWithEmptyUnCompressedBlock(const T& value)
{
if (ending.size() > value.size()) return false;
return std::equal(ending.rbegin(), ending.rend(), value.rbegin());
if (kEmptyUncompressedBlock.size() > value.size()) return false;
auto N = value.size();
return value[N - 1] == kEmptyUncompressedBlock[3] &&
value[N - 2] == kEmptyUncompressedBlock[2] &&
value[N - 3] == kEmptyUncompressedBlock[1] &&
value[N - 4] == kEmptyUncompressedBlock[0];
}
bool WebSocketPerMessageDeflateCompressor::compress(const std::string& in, std::string& out)
{
return compressData(in, out);
}
bool WebSocketPerMessageDeflateCompressor::compress(const std::string& in,
std::vector<uint8_t>& out)
{
return compressData(in, out);
}
bool WebSocketPerMessageDeflateCompressor::compress(const std::vector<uint8_t>& in,
std::string& out)
{
return compressData(in, out);
}
bool WebSocketPerMessageDeflateCompressor::compress(const std::vector<uint8_t>& in,
std::vector<uint8_t>& out)
{
return compressData(in, out);
}
template<typename T, typename S>
bool WebSocketPerMessageDeflateCompressor::compressData(const T& in, S& out)
{
//
// 7.2.1. Compression
@ -96,7 +124,8 @@ namespace ix
// The normal buffer size should be 6 but
// we remove the 4 octets from the tail (#4)
uint8_t buf[2] = {0x02, 0x00};
out.append((char*) (buf), 2);
out.push_back(buf[0]);
out.push_back(buf[1]);
return true;
}
@ -114,10 +143,10 @@ namespace ix
output = _compressBufferSize - _deflateState.avail_out;
out.append((char*) (_compressBuffer.get()), output);
out.insert(out.end(), _compressBuffer.get(), _compressBuffer.get() + output);
} while (_deflateState.avail_out == 0);
if (endsWith(out, kEmptyUncompressedBlock))
if (endsWithEmptyUnCompressedBlock(out))
{
out.resize(out.size() - 4);
}

View File

@ -9,6 +9,7 @@
#include "zlib.h"
#include <memory>
#include <string>
#include <vector>
namespace ix
{
@ -20,9 +21,15 @@ namespace ix
bool init(uint8_t deflateBits, bool clientNoContextTakeOver);
bool compress(const std::string& in, std::string& out);
bool compress(const std::string& in, std::vector<uint8_t>& out);
bool compress(const std::vector<uint8_t>& in, std::string& out);
bool compress(const std::vector<uint8_t>& in, std::vector<uint8_t>& out);
private:
static bool endsWith(const std::string& value, const std::string& ending);
template<typename T, typename S>
bool compressData(const T& in, S& out);
template<typename T>
bool endsWithEmptyUnCompressedBlock(const T& value);
int _flush;
size_t _compressBufferSize;

View File

@ -72,12 +72,13 @@ namespace ix
}
void WebSocketServer::handleConnection(std::unique_ptr<Socket> socket,
std::shared_ptr<ConnectionState> connectionState)
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo)
{
setThreadName("WebSocketServer::" + connectionState->getId());
auto webSocket = std::make_shared<WebSocket>();
_onConnectionCallback(webSocket, connectionState);
_onConnectionCallback(webSocket, connectionState, std::move(connectionInfo));
webSocket->disableAutomaticReconnection();

View File

@ -23,7 +23,8 @@ namespace ix
{
public:
using OnConnectionCallback =
std::function<void(std::shared_ptr<WebSocket>, std::shared_ptr<ConnectionState>)>;
std::function<void(std::shared_ptr<WebSocket>, std::shared_ptr<ConnectionState>,
std::unique_ptr<ConnectionInfo> connectionInfo)>;
WebSocketServer(int port = SocketServer::kDefaultPort,
const std::string& host = SocketServer::kDefaultHost,
@ -60,7 +61,8 @@ namespace ix
// Methods
virtual void handleConnection(std::unique_ptr<Socket> socket,
std::shared_ptr<ConnectionState> connectionState) final;
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo);
virtual size_t getConnectedClientsCount() final;
};
} // namespace ix

View File

@ -326,9 +326,10 @@ namespace ix
return _txbuf.empty();
}
template<class Iterator>
void WebSocketTransport::appendToSendBuffer(const std::vector<uint8_t>& header,
std::string::const_iterator begin,
std::string::const_iterator end,
Iterator begin,
Iterator end,
uint64_t message_size,
uint8_t masking_key[4])
{
@ -750,8 +751,9 @@ namespace ix
return static_cast<unsigned>(seconds);
}
template<class T>
WebSocketSendInfo WebSocketTransport::sendData(wsheader_type::opcode_type type,
const std::string& message,
const T& message,
bool compress,
const OnProgressCallback& onProgressCallback)
{
@ -764,8 +766,8 @@ namespace ix
size_t wireSize = message.size();
bool compressionError = false;
std::string::const_iterator message_begin = message.begin();
std::string::const_iterator message_end = message.end();
auto message_begin = message.cbegin();
auto message_end = message.cend();
if (compress)
{
@ -780,8 +782,8 @@ namespace ix
compressionError = false;
wireSize = _compressedMessage.size();
message_begin = _compressedMessage.begin();
message_end = _compressedMessage.end();
message_begin = _compressedMessage.cbegin();
message_end = _compressedMessage.cend();
}
{
@ -859,10 +861,11 @@ namespace ix
return WebSocketSendInfo(success, compressionError, payloadSize, wireSize);
}
template<class Iterator>
bool WebSocketTransport::sendFragment(wsheader_type::opcode_type type,
bool fin,
std::string::const_iterator message_begin,
std::string::const_iterator message_end,
Iterator message_begin,
Iterator message_end,
bool compress)
{
uint64_t message_size = static_cast<uint64_t>(message_end - message_begin);
@ -1055,7 +1058,7 @@ namespace ix
else
{
// no close code/reason set
sendData(wsheader_type::CLOSE, "", compress);
sendData(wsheader_type::CLOSE, std::string(""), compress);
}
}

View File

@ -239,16 +239,15 @@ namespace ix
bool sendOnSocket();
bool receiveFromSocket();
template<class T>
WebSocketSendInfo sendData(wsheader_type::opcode_type type,
const std::string& message,
const T& message,
bool compress,
const OnProgressCallback& onProgressCallback = nullptr);
bool sendFragment(wsheader_type::opcode_type type,
bool fin,
std::string::const_iterator begin,
std::string::const_iterator end,
bool compress);
template<class Iterator>
bool sendFragment(
wsheader_type::opcode_type type, bool fin, Iterator begin, Iterator end, bool compress);
void emitMessage(MessageKind messageKind,
const std::string& message,
@ -256,9 +255,11 @@ namespace ix
const OnMessageCallback& onMessageCallback);
bool isSendBufferEmpty() const;
template<class Iterator>
void appendToSendBuffer(const std::vector<uint8_t>& header,
std::string::const_iterator begin,
std::string::const_iterator end,
Iterator begin,
Iterator end,
uint64_t message_size,
uint8_t masking_key[4]);

View File

@ -6,4 +6,4 @@
#pragma once
#define IX_WEBSOCKET_VERSION "9.8.6"
#define IX_WEBSOCKET_VERSION "9.9.2"

View File

@ -1,20 +0,0 @@
/*
* IXSetThreadName_apple.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include "../IXSetThreadName.h"
#include <pthread.h>
namespace ix
{
void setThreadName(const std::string& name)
{
//
// Apple reserves 16 bytes for its thread names
// Notice that the Apple version of pthread_setname_np
// does not take a pthread_t argument
//
pthread_setname_np(name.substr(0, 63).c_str());
}
} // namespace ix

View File

@ -1,16 +0,0 @@
/*
* IXSetThreadName_freebsd.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "../IXSetThreadName.h"
#include <pthread.h>
#include <pthread_np.h>
namespace ix
{
void setThreadName(const std::string& name)
{
pthread_set_name_np(pthread_self(), name.substr(0, 15).c_str());
}
} // namespace ix

View File

@ -1,20 +0,0 @@
/*
* IXSetThreadName_linux.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include "../IXSetThreadName.h"
#include <pthread.h>
namespace ix
{
void setThreadName(const std::string& name)
{
//
// Linux only reserves 16 bytes for its thread names
// See prctl and PR_SET_NAME property in
// http://man7.org/linux/man-pages/man2/prctl.2.html
//
pthread_setname_np(pthread_self(), name.substr(0, 15).c_str());
}
} // namespace ix

View File

@ -1,46 +0,0 @@
/*
* IXSetThreadName_windows.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "../IXSetThreadName.h"
#include <Windows.h>
namespace ix
{
const DWORD MS_VC_EXCEPTION = 0x406D1388;
#pragma pack(push, 8)
typedef struct tagTHREADNAME_INFO
{
DWORD dwType; // Must be 0x1000.
LPCSTR szName; // Pointer to name (in user addr space).
DWORD dwThreadID; // Thread ID (-1=caller thread).
DWORD dwFlags; // Reserved for future use, must be zero.
} THREADNAME_INFO;
#pragma pack(pop)
void SetThreadName(DWORD dwThreadID, const char* threadName)
{
THREADNAME_INFO info;
info.dwType = 0x1000;
info.szName = threadName;
info.dwThreadID = dwThreadID;
info.dwFlags = 0;
__try
{
RaiseException(
MS_VC_EXCEPTION, 0, sizeof(info) / sizeof(ULONG_PTR), (ULONG_PTR*) &info);
}
__except (EXCEPTION_EXECUTE_HANDLER)
{
}
}
void setThreadName(const std::string& name)
{
SetThreadName(-1, name.c_str());
}
} // namespace ix

View File

@ -104,8 +104,10 @@ test_server:
# env TEST=Websocket_server make test
# env TEST=Websocket_chat make test
# env TEST=heartbeat make test
test:
mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 -DUSE_TEST=1 .. ; ninja install)
build_test:
mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_TEST=1 .. ; ninja install)
test: build_test
(cd test ; python2.7 run.py -r)
test_make:

View File

@ -42,6 +42,7 @@ set (SOURCES
IXSocketTest.cpp
IXSocketConnectTest.cpp
# IXWebSocketLeakTest.cpp # commented until we have a fix for #224
IXWebSocketServerTest.cpp
IXWebSocketTestConnectionDisconnection.cpp
IXUrlParserTest.cpp
@ -55,6 +56,8 @@ set (SOURCES
IXSentryClientTest.cpp
IXWebSocketChatTest.cpp
IXWebSocketBroadcastTest.cpp
IXWebSocketPerMessageDeflateCompressorTest.cpp
IXStreamSqlTest.cpp
)
# Some unittest don't work on windows yet

View File

@ -12,8 +12,8 @@
#include <ixcobra/IXCobraConnection.h>
#include <ixcobra/IXCobraMetricsPublisher.h>
#include <ixcrypto/IXUuid.h>
#include <ixsentry/IXSentryClient.h>
#include <ixredis/IXRedisServer.h>
#include <ixsentry/IXSentryClient.h>
#include <ixsnake/IXSnakeServer.h>
#include <ixwebsocket/IXHttpServer.h>
#include <ixwebsocket/IXUserAgent.h>
@ -95,13 +95,15 @@ TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]")
sentryServer.setOnConnectionCallback(
[](HttpRequestPtr request,
std::shared_ptr<ConnectionState> /*connectionState*/) -> HttpResponsePtr {
std::shared_ptr<ConnectionState> /*connectionState*/,
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr {
WebSocketHttpHeaders headers;
headers["Server"] = userAgent();
// Log request
std::stringstream ss;
ss << request->method << " " << request->headers["User-Agent"] << " "
ss << connectionInfo->remoteIp << ":" << connectionInfo->remotePort << " "
<< request->method << " " << request->headers["User-Agent"] << " "
<< request->uri;
if (request->method == "POST")

View File

@ -12,8 +12,8 @@
#include <ixcobra/IXCobraConnection.h>
#include <ixcobra/IXCobraMetricsPublisher.h>
#include <ixcrypto/IXUuid.h>
#include <ixsentry/IXSentryClient.h>
#include <ixredis/IXRedisServer.h>
#include <ixsentry/IXSentryClient.h>
#include <ixsnake/IXSnakeServer.h>
#include <ixwebsocket/IXHttpServer.h>
#include <ixwebsocket/IXUserAgent.h>

View File

@ -12,8 +12,8 @@
#include <ixcobra/IXCobraConnection.h>
#include <ixcobra/IXCobraMetricsPublisher.h>
#include <ixcrypto/IXUuid.h>
#include <ixsentry/IXSentryClient.h>
#include <ixredis/IXRedisServer.h>
#include <ixsentry/IXSentryClient.h>
#include <ixsnake/IXSnakeServer.h>
#include <ixwebsocket/IXHttpServer.h>
#include <ixwebsocket/IXUserAgent.h>
@ -92,6 +92,9 @@ TEST_CASE("Cobra_to_stdout_bot", "[cobra_bots]")
cobraBotConfig.enableHeartbeat = false;
bool quiet = false;
cobraBotConfig.filter =
std::string("select * from `") + channel + "` where id = 'sms_metric_A_id'";
// We could try to capture the output ... not sure how.
bool fluentd = true;

View File

@ -67,7 +67,8 @@ TEST_CASE("http server", "[httpd]")
TEST_CASE("http server redirection", "[httpd_redirect]")
{
SECTION("Connect to a local HTTP server, with redirection enabled, but we do not follow redirects")
SECTION(
"Connect to a local HTTP server, with redirection enabled, but we do not follow redirects")
{
int port = getFreePort();
ix::HttpServer server(port, "127.0.0.1");

42
test/IXStreamSqlTest.cpp Normal file
View File

@ -0,0 +1,42 @@
/*
* IXStreamSqlTest.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone. All rights reserved.
*/
#include "IXTest.h"
#include "catch.hpp"
#include <iostream>
#include <ixsnake/IXStreamSql.h>
#include <string.h>
using namespace ix;
namespace ix
{
TEST_CASE("stream_sql", "[streamsql]")
{
SECTION("expression A")
{
snake::StreamSql streamSql(
"select * from subscriber_republished_v1_neo where session LIKE '%123456%'");
nlohmann::json msg = {{"session", "123456"}, {"id", "foo_id"}, {"timestamp", 12}};
CHECK(streamSql.match(msg));
}
SECTION("expression A")
{
snake::StreamSql streamSql("select * from `subscriber_republished_v1_neo` where "
"session = '30091320ed8d4e50b758f8409b83bed7'");
nlohmann::json msg = {{"session", "30091320ed8d4e50b758f8409b83bed7"},
{"id", "foo_id"},
{"timestamp", 12}};
CHECK(streamSql.match(msg));
}
}
} // namespace ix

View File

@ -85,12 +85,15 @@ namespace ix
bool startWebSocketEchoServer(ix::WebSocketServer& server)
{
server.setOnConnectionCallback([&server](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState) {
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) {
auto remoteIp = connectionInfo->remoteIp;
webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg) {
[webSocket, connectionState, remoteIp, &server](const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
TLogger() << "New connection";
TLogger() << "Remote ip: " << remoteIp;
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";
for (auto it : msg->openInfo.headers)

View File

@ -191,13 +191,16 @@ namespace
server.setOnConnectionCallback([&server, &connectionId](
std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState) {
webSocket->setOnMessageCallback([webSocket, connectionState, &connectionId, &server](
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) {
auto remoteIp = connectionInfo->remoteIp;
webSocket->setOnMessageCallback([webSocket, connectionState, remoteIp, &connectionId, &server](
const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
TLogger() << "New connection";
connectionState->computeId();
TLogger() << "remote ip: " << remoteIp;
TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";

View File

@ -194,12 +194,16 @@ namespace
bool startServer(ix::WebSocketServer& server)
{
server.setOnConnectionCallback([&server](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState) {
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) {
auto remoteIp = connectionInfo->remoteIp;
webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg) {
[webSocket, connectionState, remoteIp, &server](const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
TLogger() << "New connection";
TLogger() << "remote ip: " << remoteIp;
TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";

View File

@ -171,9 +171,12 @@ namespace
server.setOnConnectionCallback(
[&receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite](
std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState) {
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) {
auto remoteIp = connectionInfo->remoteIp;
webSocket->setOnMessageCallback([webSocket,
connectionState,
remoteIp,
&receivedCloseCode,
&receivedCloseReason,
&receivedCloseRemote,
@ -181,6 +184,7 @@ namespace
if (msg->type == ix::WebSocketMessageType::Open)
{
TLogger() << "New server connection";
TLogger() << "remote ip: " << remoteIp;
TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";

View File

@ -0,0 +1,182 @@
/*
* IXWebSocketServer.cpp
* Author: Benjamin Sergeant, @marcelkauf
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#include "IXTest.h"
#include "catch.hpp"
#include <memory>
#include <sstream>
#include <ixwebsocket/IXWebSocket.h>
#include <ixwebsocket/IXWebSocketServer.h>
using namespace ix;
namespace
{
class WebSocketClient
{
public:
WebSocketClient(int port);
void start();
void stop();
bool isReady() const;
bool hasConnectionError() const;
private:
ix::WebSocket _webSocket;
int _port;
std::atomic<bool> _connectionError;
};
WebSocketClient::WebSocketClient(int port)
: _port(port)
, _connectionError(false)
{
}
bool WebSocketClient::hasConnectionError() const
{
return _connectionError;
}
bool WebSocketClient::isReady() const
{
return _webSocket.getReadyState() == ix::ReadyState::Open;
}
void WebSocketClient::stop()
{
_webSocket.stop();
}
void WebSocketClient::start()
{
std::string url;
{
std::stringstream ss;
ss << "ws://localhost:" << _port << "/";
url = ss.str();
}
_webSocket.setUrl(url);
_webSocket.disableAutomaticReconnection();
std::stringstream ss;
log(std::string("Connecting to url: ") + url);
_webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg)
{
std::stringstream ss;
if (msg->type == ix::WebSocketMessageType::Open)
{
log("client connected");
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
log("client disconnected");
}
else if (msg->type == ix::WebSocketMessageType::Error)
{
_connectionError = true;
log("error");
}
else if (msg->type == ix::WebSocketMessageType::Pong)
{
log("pong");
}
else if (msg->type == ix::WebSocketMessageType::Ping)
{
log("ping");
}
else if (msg->type == ix::WebSocketMessageType::Message)
{
log("message");
}
else
{
log("invalid type");
}
});
_webSocket.start();
}
} // namespace
TEST_CASE("Websocket leak test")
{
SECTION("Websocket destructor is called when closing the connection.")
{
// stores the server websocket in order to check the use_count
std::shared_ptr<WebSocket> webSocketPtr;
{
int port = getFreePort();
WebSocketServer server(port);
server.setOnConnectionCallback([&webSocketPtr](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo)
{
// original ptr in WebSocketServer::handleConnection and the callback argument
REQUIRE(webSocket.use_count() == 2);
webSocket->setOnMessageCallback([&webSocketPtr, webSocket, connectionState](const ix::WebSocketMessagePtr& msg)
{
if (msg->type == ix::WebSocketMessageType::Open)
{
log(std::string("New connection id: ") + connectionState->getId());
// original ptr in WebSocketServer::handleConnection, captured ptr of this callback, and ptr in WebSocketServer::_clients
REQUIRE(webSocket.use_count() == 3);
webSocketPtr = std::shared_ptr<WebSocket>(webSocket);
REQUIRE(webSocket.use_count() == 4);
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
log(std::string("Client closed connection id: ") + connectionState->getId());
}
else
{
log(std::string(msg->str));
}
});
// original ptr in WebSocketServer::handleConnection, argument of this callback, and captured ptr in websocket callback
REQUIRE(webSocket.use_count() == 3);
});
server.listen();
server.start();
WebSocketClient webSocketClient(port);
webSocketClient.start();
while (true)
{
REQUIRE(!webSocketClient.hasConnectionError());
if (webSocketClient.isReady()) break;
ix::msleep(10);
}
REQUIRE(server.getClients().size() == 1);
// same value as in Open-handler above
REQUIRE(webSocketPtr.use_count() == 4);
ix::msleep(500);
webSocketClient.stop();
ix::msleep(500);
REQUIRE(server.getClients().size() == 0);
// websocket should only be referenced by webSocketPtr but is still used by the websocket callback
REQUIRE(webSocketPtr.use_count() == 1);
webSocketPtr->setOnMessageCallback(nullptr);
// websocket should only be referenced by webSocketPtr
REQUIRE(webSocketPtr.use_count() == 1);
server.stop();
}
// websocket should only be referenced by webSocketPtr
REQUIRE(webSocketPtr.use_count() == 1);
}
}

View File

@ -0,0 +1,76 @@
/*
* IXWebSocketPerMessageDeflateCodecTest.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone. All rights reserved.
*
* make build_test && build/test/ixwebsocket_unittest per-message-deflate-codec
*/
#include "IXTest.h"
#include "catch.hpp"
#include <iostream>
#include <ixwebsocket/IXWebSocketPerMessageDeflateCodec.h>
#include <string.h>
using namespace ix;
namespace ix
{
std::string compressAndDecompress(const std::string& a)
{
std::string b, c;
WebSocketPerMessageDeflateCompressor compressor;
compressor.init(11, true);
compressor.compress(a, b);
WebSocketPerMessageDeflateDecompressor decompressor;
decompressor.init(11, true);
decompressor.decompress(b, c);
return c;
}
std::string compressAndDecompressVector(const std::string& a)
{
std::string b, c;
std::vector<uint8_t> vec(a.begin(), a.end());
WebSocketPerMessageDeflateCompressor compressor;
compressor.init(11, true);
compressor.compress(vec, b);
WebSocketPerMessageDeflateDecompressor decompressor;
decompressor.init(11, true);
decompressor.decompress(b, c);
return c;
}
TEST_CASE("per-message-deflate-codec", "[zlib]")
{
SECTION("string api")
{
REQUIRE(compressAndDecompress("") == "");
REQUIRE(compressAndDecompress("foo") == "foo");
REQUIRE(compressAndDecompress("bar") == "bar");
REQUIRE(compressAndDecompress("asdcaseqw`21897dehqwed") == "asdcaseqw`21897dehqwed");
REQUIRE(compressAndDecompress("/usr/local/include/ixwebsocket/IXSocketAppleSSL.h") ==
"/usr/local/include/ixwebsocket/IXSocketAppleSSL.h");
}
SECTION("vector api")
{
REQUIRE(compressAndDecompressVector("") == "");
REQUIRE(compressAndDecompressVector("foo") == "foo");
REQUIRE(compressAndDecompressVector("bar") == "bar");
REQUIRE(compressAndDecompressVector("asdcaseqw`21897dehqwed") ==
"asdcaseqw`21897dehqwed");
REQUIRE(
compressAndDecompressVector("/usr/local/include/ixwebsocket/IXSocketAppleSSL.h") ==
"/usr/local/include/ixwebsocket/IXSocketAppleSSL.h");
}
}
} // namespace ix

View File

@ -35,13 +35,16 @@ namespace ix
server.setOnConnectionCallback([&server, &connectionId](
std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState) {
webSocket->setOnMessageCallback([webSocket, connectionState, &connectionId, &server](
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) {
auto remoteIp = connectionInfo->remoteIp;
webSocket->setOnMessageCallback([webSocket, connectionState, remoteIp, &connectionId, &server](
const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
TLogger() << "New connection";
connectionState->computeId();
TLogger() << "remote ip: " << remoteIp;
TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";

View File

@ -18,12 +18,15 @@ bool startServer(ix::WebSocketServer& server, std::string& subProtocols)
{
server.setOnConnectionCallback(
[&server, &subProtocols](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState) {
webSocket->setOnMessageCallback([webSocket, connectionState, &server, &subProtocols](
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) {
auto remoteIp = connectionInfo->remoteIp;
webSocket->setOnMessageCallback([webSocket, connectionState, remoteIp, &server, &subProtocols](
const ix::WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
TLogger() << "New connection";
TLogger() << "remote ip: " << remoteIp;
TLogger() << "id: " << connectionState->getId();
TLogger() << "Uri: " << msg->openInfo.uri;
TLogger() << "Headers:";

View File

@ -122,6 +122,8 @@
#pragma once
#include <string>
namespace linenoise
{
bool Readline(const char *prompt, std::string& line);

View File

@ -11,13 +11,13 @@
#include <cli11/CLI11.hpp>
#include <fstream>
#include <ixbots/IXCobraMetricsToRedisBot.h>
#include <ixbots/IXCobraToPythonBot.h>
#include <ixbots/IXCobraToSentryBot.h>
#include <ixbots/IXCobraToStatsdBot.h>
#include <ixbots/IXCobraToStdoutBot.h>
#include <ixbots/IXCobraMetricsToRedisBot.h>
#include <ixredis/IXRedisClient.h>
#include <ixcore/utils/IXCoreLogger.h>
#include <ixredis/IXRedisClient.h>
#include <ixsentry/IXSentryClient.h>
#include <ixwebsocket/IXNetSystem.h>
#include <ixwebsocket/IXSocket.h>
@ -122,6 +122,7 @@ int main(int argc, char** argv)
std::string key;
std::string logfile;
std::string scriptPath;
std::string republishChannel;
ix::SocketTLSOptions tlsOptions;
ix::CobraConfig cobraConfig;
ix::CobraBotConfig cobraBotConfig;
@ -193,8 +194,7 @@ int main(int argc, char** argv)
"--limit_received_events", cobraBotConfig.limitReceivedEvents, "Max events per minute");
app->add_option(
"--max_events_per_minute", cobraBotConfig.maxEventsPerMinute, "Max events per minute");
app->add_option(
"--batch_size", cobraBotConfig.batchSize, "Subscription batch size");
app->add_option("--batch_size", cobraBotConfig.batchSize, "Subscription batch size");
};
app.add_flag("--version", version, "Print ws version");
@ -358,7 +358,8 @@ int main(int argc, char** argv)
cobra2python->add_option("--host", hostname, "Statsd host");
cobra2python->add_option("--port", statsdPort, "Statsd port");
cobra2python->add_option("--prefix", prefix, "Statsd prefix");
cobra2python->add_option("--script", scriptPath, "Python script path")->check(CLI::ExistingPath);
cobra2python->add_option("--script", scriptPath, "Python script path")
->check(CLI::ExistingPath);
cobra2python->add_option("--pidfile", pidfile, "Pid file");
addTLSOptions(cobra2python);
addCobraBotConfig(cobra2python);
@ -391,6 +392,7 @@ int main(int argc, char** argv)
snakeApp->add_option("--redis_password", redisPassword, "Redis password");
snakeApp->add_option("--apps_config_path", appsConfigPath, "Path to auth data")
->check(CLI::ExistingPath);
snakeApp->add_option("--republish_channel", republishChannel, "Republish channel");
snakeApp->add_flag("-v", verbose, "Verbose");
snakeApp->add_flag("-d", disablePong, "Disable Pongs");
addTLSOptions(snakeApp);
@ -604,8 +606,7 @@ int main(int argc, char** argv)
}
else
{
ret = (int) ix::cobra_to_python_bot(
cobraBotConfig, statsdClient, scriptPath);
ret = (int) ix::cobra_to_python_bot(cobraBotConfig, statsdClient, scriptPath);
}
}
else if (app.got_subcommand("cobra_to_sentry"))
@ -620,14 +621,12 @@ int main(int argc, char** argv)
ix::RedisClient redisClient;
if (!redisClient.connect(hostname, redisPort))
{
spdlog::error("Cannot connect to redis host {}:{}",
redisHosts, redisPort);
spdlog::error("Cannot connect to redis host {}:{}", redisHosts, redisPort);
return 1;
}
else
{
ret = (int) ix::cobra_metrics_to_redis_bot(
cobraBotConfig, redisClient, verbose);
ret = (int) ix::cobra_metrics_to_redis_bot(cobraBotConfig, redisClient, verbose);
}
}
else if (app.got_subcommand("snake"))
@ -640,7 +639,8 @@ int main(int argc, char** argv)
verbose,
appsConfigPath,
tlsOptions,
disablePong);
disablePong,
republishChannel);
}
else if (app.got_subcommand("httpd"))
{

View File

@ -64,9 +64,7 @@ namespace ix
bool disablePerMessageDeflate,
const ix::SocketTLSOptions& tlsOptions);
int ws_redis_cli_main(const std::string& hostname,
int port,
const std::string& password);
int ws_redis_cli_main(const std::string& hostname, int port, const std::string& password);
int ws_redis_publish_main(const std::string& hostname,
int port,
@ -105,7 +103,8 @@ namespace ix
bool verbose,
const std::string& appsConfigPath,
const ix::SocketTLSOptions& tlsOptions,
bool disablePong);
bool disablePong,
const std::string& republishChannel);
int ws_httpd_main(int port,
const std::string& hostname,

View File

@ -21,12 +21,15 @@ namespace ix
server.setTLSOptions(tlsOptions);
server.setOnConnectionCallback([&server](std::shared_ptr<WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState) {
webSocket->setOnMessageCallback([webSocket, connectionState, &server](
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) {
auto remoteIp = connectionInfo->remoteIp;
webSocket->setOnMessageCallback([webSocket, connectionState, remoteIp, &server](
const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("New connection");
spdlog::info("remote ip: {}", remoteIp);
spdlog::info("id: {}", connectionState->getId());
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");

View File

@ -6,12 +6,12 @@
#include "IXBench.h"
#include "linenoise.hpp"
#include <iostream>
#include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocket.h>
#include <spdlog/spdlog.h>
#include <sstream>
#include <iostream>
namespace ix

View File

@ -44,12 +44,15 @@ namespace ix
server.setOnConnectionCallback(
[greetings](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState) {
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) {
auto remoteIp = connectionInfo->remoteIp;
webSocket->setOnMessageCallback(
[webSocket, connectionState, greetings](const WebSocketMessagePtr& msg) {
[webSocket, connectionState, remoteIp, greetings](const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("New connection");
spdlog::info("remote ip: {}", remoteIp);
spdlog::info("id: {}", connectionState->getId());
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");

View File

@ -56,15 +56,18 @@ namespace ix
server.setOnConnectionCallback([remoteUrl,
verbose](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState) {
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) {
auto state = std::dynamic_pointer_cast<ProxyConnectionState>(connectionState);
auto remoteIp = connectionInfo->remoteIp;
// Server connection
state->webSocket().setOnMessageCallback([webSocket, state, verbose](
state->webSocket().setOnMessageCallback([webSocket, state, remoteIp, verbose](
const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("New connection to remote server");
spdlog::info("remote ip: {}", remoteIp);
spdlog::info("id: {}", state->getId());
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");

View File

@ -4,17 +4,15 @@
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "linenoise.hpp"
#include <iostream>
#include <ixredis/IXRedisClient.h>
#include <spdlog/spdlog.h>
#include <sstream>
#include <iostream>
#include "linenoise.hpp"
namespace ix
{
int ws_redis_cli_main(const std::string& hostname,
int port,
const std::string& password)
int ws_redis_cli_main(const std::string& hostname, int port, const std::string& password)
{
RedisClient redisClient;
if (!redisClient.connect(hostname, port))
@ -71,9 +69,7 @@ namespace ix
{
if (response.first != RespType::String)
{
std::cout << "("
<< redisClient.getRespTypeDescription(response.first)
<< ")"
std::cout << "(" << redisClient.getRespTypeDescription(response.first) << ")"
<< " ";
}

View File

@ -45,7 +45,8 @@ namespace ix
bool verbose,
const std::string& appsConfigPath,
const SocketTLSOptions& socketTLSOptions,
bool disablePong)
bool disablePong,
const std::string& republishChannel)
{
snake::AppConfig appConfig;
appConfig.port = port;
@ -55,6 +56,7 @@ namespace ix
appConfig.redisPassword = redisPassword;
appConfig.socketTLSOptions = socketTLSOptions;
appConfig.disablePong = disablePong;
appConfig.republishChannel = republishChannel;
// Parse config file
auto str = readAsString(appsConfigPath);

View File

@ -20,12 +20,15 @@ namespace ix
server.setTLSOptions(tlsOptions);
server.setOnConnectionCallback([&server](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState) {
webSocket->setOnMessageCallback([webSocket, connectionState, &server](
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) {
auto remoteIp = connectionInfo->remoteIp;
webSocket->setOnMessageCallback([webSocket, connectionState, remoteIp, &server](
const WebSocketMessagePtr& msg) {
if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("ws_transfer: New connection");
spdlog::info("remote ip: {}", remoteIp);
spdlog::info("id: {}", connectionState->getId());
spdlog::info("Uri: {}", msg->openInfo.uri);
spdlog::info("Headers:");