Compare commits
13 Commits
v9.8.6
...
feature/st
Author | SHA1 | Date | |
---|---|---|---|
|
ca829a3a98 | ||
|
26a1e63626 | ||
|
c98959b895 | ||
|
baf18648e9 | ||
|
b21306376b | ||
|
fbd17685a1 | ||
|
3a673575dd | ||
|
d5e51840ab | ||
|
543c2086b2 | ||
|
95eab59c08 | ||
|
e9e768a288 | ||
|
e2180a1f31 | ||
|
7c1b57c8cd |
@@ -58,6 +58,7 @@ set( IXWEBSOCKET_SOURCES
|
|||||||
set( IXWEBSOCKET_HEADERS
|
set( IXWEBSOCKET_HEADERS
|
||||||
ixwebsocket/IXBench.h
|
ixwebsocket/IXBench.h
|
||||||
ixwebsocket/IXCancellationRequest.h
|
ixwebsocket/IXCancellationRequest.h
|
||||||
|
ixwebsocket/IXConnectionInfo.h
|
||||||
ixwebsocket/IXConnectionState.h
|
ixwebsocket/IXConnectionState.h
|
||||||
ixwebsocket/IXDNSLookup.h
|
ixwebsocket/IXDNSLookup.h
|
||||||
ixwebsocket/IXExponentialBackoff.h
|
ixwebsocket/IXExponentialBackoff.h
|
||||||
|
@@ -1,6 +1,14 @@
|
|||||||
# Changelog
|
# Changelog
|
||||||
All changes to this project will be documented in this file.
|
All changes to this project will be documented in this file.
|
||||||
|
|
||||||
|
## [9.9.0] - 2020-07-08
|
||||||
|
|
||||||
|
(socket+websocket+http+redis+snake servers) expose the remote ip and remote port when a new connection is made
|
||||||
|
|
||||||
|
## [9.8.6] - 2020-07-06
|
||||||
|
|
||||||
|
(cmake) change the way zlib and openssl are searched
|
||||||
|
|
||||||
## [9.8.5] - 2020-07-06
|
## [9.8.5] - 2020-07-06
|
||||||
|
|
||||||
(cobra python bots) remove the test which stop the bot when events do not follow cobra metrics system schema with an id and a device entry
|
(cobra python bots) remove the test which stop the bot when events do not follow cobra metrics system schema with an id and a device entry
|
||||||
|
@@ -257,28 +257,31 @@ ix::WebSocketServer server(port);
|
|||||||
|
|
||||||
server.setOnConnectionCallback(
|
server.setOnConnectionCallback(
|
||||||
[&server](std::shared_ptr<WebSocket> webSocket,
|
[&server](std::shared_ptr<WebSocket> webSocket,
|
||||||
std::shared_ptr<ConnectionState> connectionState)
|
std::shared_ptr<ConnectionState> connectionState,
|
||||||
|
std::unique_ptr<ConnectionInfo> connectionInfo)
|
||||||
{
|
{
|
||||||
|
std::cout << "Remote ip: " << connectionInfo->remoteIp << std::endl;
|
||||||
|
|
||||||
webSocket->setOnMessageCallback(
|
webSocket->setOnMessageCallback(
|
||||||
[webSocket, connectionState, &server](const ix::WebSocketMessagePtr msg)
|
[webSocket, connectionState, &server](const ix::WebSocketMessagePtr msg)
|
||||||
{
|
{
|
||||||
if (msg->type == ix::WebSocketMessageType::Open)
|
if (msg->type == ix::WebSocketMessageType::Open)
|
||||||
{
|
{
|
||||||
std::cerr << "New connection" << std::endl;
|
std::cout << "New connection" << std::endl;
|
||||||
|
|
||||||
// A connection state object is available, and has a default id
|
// A connection state object is available, and has a default id
|
||||||
// You can subclass ConnectionState and pass an alternate factory
|
// You can subclass ConnectionState and pass an alternate factory
|
||||||
// to override it. It is useful if you want to store custom
|
// to override it. It is useful if you want to store custom
|
||||||
// attributes per connection (authenticated bool flag, attributes, etc...)
|
// attributes per connection (authenticated bool flag, attributes, etc...)
|
||||||
std::cerr << "id: " << connectionState->getId() << std::endl;
|
std::cout << "id: " << connectionState->getId() << std::endl;
|
||||||
|
|
||||||
// The uri the client did connect to.
|
// The uri the client did connect to.
|
||||||
std::cerr << "Uri: " << msg->openInfo.uri << std::endl;
|
std::cout << "Uri: " << msg->openInfo.uri << std::endl;
|
||||||
|
|
||||||
std::cerr << "Headers:" << std::endl;
|
std::cout << "Headers:" << std::endl;
|
||||||
for (auto it : msg->openInfo.headers)
|
for (auto it : msg->openInfo.headers)
|
||||||
{
|
{
|
||||||
std::cerr << it.first << ": " << it.second << std::endl;
|
std::cout << it.first << ": " << it.second << std::endl;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (msg->type == ix::WebSocketMessageType::Message)
|
else if (msg->type == ix::WebSocketMessageType::Message)
|
||||||
@@ -417,11 +420,14 @@ If you want to handle how requests are processed, implement the setOnConnectionC
|
|||||||
```cpp
|
```cpp
|
||||||
setOnConnectionCallback(
|
setOnConnectionCallback(
|
||||||
[this](HttpRequestPtr request,
|
[this](HttpRequestPtr request,
|
||||||
std::shared_ptr<ConnectionState> /*connectionState*/) -> HttpResponsePtr
|
std::shared_ptr<ConnectionState> /*connectionState*/,
|
||||||
|
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr
|
||||||
{
|
{
|
||||||
// Build a string for the response
|
// Build a string for the response
|
||||||
std::stringstream ss;
|
std::stringstream ss;
|
||||||
ss << request->method
|
ss << connectionInfo->remoteIp
|
||||||
|
<< " "
|
||||||
|
<< request->method
|
||||||
<< " "
|
<< " "
|
||||||
<< request->uri;
|
<< request->uri;
|
||||||
|
|
||||||
|
@@ -45,8 +45,11 @@ namespace ix
|
|||||||
}
|
}
|
||||||
|
|
||||||
void RedisServer::handleConnection(std::unique_ptr<Socket> socket,
|
void RedisServer::handleConnection(std::unique_ptr<Socket> socket,
|
||||||
std::shared_ptr<ConnectionState> connectionState)
|
std::shared_ptr<ConnectionState> connectionState,
|
||||||
|
std::unique_ptr<ConnectionInfo> connectionInfo)
|
||||||
{
|
{
|
||||||
|
logInfo("New connection from remote ip " + connectionInfo->remoteIp);
|
||||||
|
|
||||||
_connectedClientsCount++;
|
_connectedClientsCount++;
|
||||||
|
|
||||||
while (!_stopHandlingConnections)
|
while (!_stopHandlingConnections)
|
||||||
|
@@ -44,7 +44,8 @@ namespace ix
|
|||||||
|
|
||||||
// Methods
|
// Methods
|
||||||
virtual void handleConnection(std::unique_ptr<Socket>,
|
virtual void handleConnection(std::unique_ptr<Socket>,
|
||||||
std::shared_ptr<ConnectionState> connectionState) final;
|
std::shared_ptr<ConnectionState> connectionState,
|
||||||
|
std::unique_ptr<ConnectionInfo> connectionInfo) final;
|
||||||
virtual size_t getConnectedClientsCount() final;
|
virtual size_t getConnectedClientsCount() final;
|
||||||
|
|
||||||
bool startsWith(const std::string& str, const std::string& start);
|
bool startsWith(const std::string& str, const std::string& start);
|
||||||
|
@@ -7,12 +7,14 @@ set (IXSNAKE_SOURCES
|
|||||||
ixsnake/IXSnakeServer.cpp
|
ixsnake/IXSnakeServer.cpp
|
||||||
ixsnake/IXSnakeProtocol.cpp
|
ixsnake/IXSnakeProtocol.cpp
|
||||||
ixsnake/IXAppConfig.cpp
|
ixsnake/IXAppConfig.cpp
|
||||||
|
ixsnake/IXStreamSql.cpp
|
||||||
)
|
)
|
||||||
|
|
||||||
set (IXSNAKE_HEADERS
|
set (IXSNAKE_HEADERS
|
||||||
ixsnake/IXSnakeServer.h
|
ixsnake/IXSnakeServer.h
|
||||||
ixsnake/IXSnakeProtocol.h
|
ixsnake/IXSnakeProtocol.h
|
||||||
ixsnake/IXAppConfig.h
|
ixsnake/IXAppConfig.h
|
||||||
|
ixsnake/IXStreamSql.h
|
||||||
)
|
)
|
||||||
|
|
||||||
add_library(ixsnake STATIC
|
add_library(ixsnake STATIC
|
||||||
|
@@ -33,6 +33,9 @@ namespace snake
|
|||||||
// Misc
|
// Misc
|
||||||
bool verbose;
|
bool verbose;
|
||||||
bool disablePong;
|
bool disablePong;
|
||||||
|
|
||||||
|
// If non empty, every published message gets republished to a given channel
|
||||||
|
std::string republishChannel;
|
||||||
};
|
};
|
||||||
|
|
||||||
bool isAppKeyValid(const AppConfig& appConfig, std::string appkey);
|
bool isAppKeyValid(const AppConfig& appConfig, std::string appkey);
|
||||||
|
@@ -30,6 +30,7 @@ namespace snake
|
|||||||
{
|
{
|
||||||
return _appkey;
|
return _appkey;
|
||||||
}
|
}
|
||||||
|
|
||||||
void setAppkey(const std::string& appkey)
|
void setAppkey(const std::string& appkey)
|
||||||
{
|
{
|
||||||
_appkey = appkey;
|
_appkey = appkey;
|
||||||
@@ -39,6 +40,7 @@ namespace snake
|
|||||||
{
|
{
|
||||||
return _role;
|
return _role;
|
||||||
}
|
}
|
||||||
|
|
||||||
void setRole(const std::string& role)
|
void setRole(const std::string& role)
|
||||||
{
|
{
|
||||||
_role = role;
|
_role = role;
|
||||||
|
@@ -8,6 +8,7 @@
|
|||||||
|
|
||||||
#include "IXAppConfig.h"
|
#include "IXAppConfig.h"
|
||||||
#include "IXSnakeConnectionState.h"
|
#include "IXSnakeConnectionState.h"
|
||||||
|
#include "IXStreamSql.h"
|
||||||
#include "nlohmann/json.hpp"
|
#include "nlohmann/json.hpp"
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
#include <ixcore/utils/IXCoreLogger.h>
|
#include <ixcore/utils/IXCoreLogger.h>
|
||||||
@@ -91,6 +92,7 @@ namespace snake
|
|||||||
|
|
||||||
void handlePublish(std::shared_ptr<SnakeConnectionState> state,
|
void handlePublish(std::shared_ptr<SnakeConnectionState> state,
|
||||||
std::shared_ptr<ix::WebSocket> ws,
|
std::shared_ptr<ix::WebSocket> ws,
|
||||||
|
const AppConfig& appConfig,
|
||||||
const nlohmann::json& pdu)
|
const nlohmann::json& pdu)
|
||||||
{
|
{
|
||||||
std::vector<std::string> channels;
|
std::vector<std::string> channels;
|
||||||
@@ -115,6 +117,12 @@ namespace snake
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// add an extra channel if the config has one specified
|
||||||
|
if (!appConfig.republishChannel.empty())
|
||||||
|
{
|
||||||
|
channels.push_back(appConfig.republishChannel);
|
||||||
|
}
|
||||||
|
|
||||||
for (auto&& channel : channels)
|
for (auto&& channel : channels)
|
||||||
{
|
{
|
||||||
std::stringstream ss;
|
std::stringstream ss;
|
||||||
@@ -180,12 +188,25 @@ namespace snake
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::string filterStr;
|
||||||
|
if (pdu["body"].find("filter") != pdu["body"].end())
|
||||||
|
{
|
||||||
|
std::string filterStr = pdu["body"]["filter"];
|
||||||
|
}
|
||||||
|
|
||||||
|
std::unique_ptr<StreamSql> streamSql = std::make_unique<StreamSql>(filterStr);
|
||||||
|
|
||||||
int id = 0;
|
int id = 0;
|
||||||
auto callback = [ws, &id, &subscriptionId](const std::string& messageStr) {
|
auto callback = [ws, &id, &subscriptionId, &streamSql](const std::string& messageStr) {
|
||||||
auto msg = nlohmann::json::parse(messageStr);
|
auto msg = nlohmann::json::parse(messageStr);
|
||||||
|
|
||||||
msg = msg["body"]["message"];
|
msg = msg["body"]["message"];
|
||||||
|
|
||||||
|
if (streamSql->valid() && !streamSql->match(msg))
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
nlohmann::json response = {
|
nlohmann::json response = {
|
||||||
{"action", "rtm/subscription/data"},
|
{"action", "rtm/subscription/data"},
|
||||||
{"id", id++},
|
{"id", id++},
|
||||||
@@ -279,7 +300,7 @@ namespace snake
|
|||||||
}
|
}
|
||||||
else if (action == "rtm/publish")
|
else if (action == "rtm/publish")
|
||||||
{
|
{
|
||||||
handlePublish(state, ws, pdu);
|
handlePublish(state, ws, appConfig, pdu);
|
||||||
}
|
}
|
||||||
else if (action == "rtm/subscribe")
|
else if (action == "rtm/subscribe")
|
||||||
{
|
{
|
||||||
|
@@ -61,16 +61,19 @@ namespace snake
|
|||||||
|
|
||||||
_server.setOnConnectionCallback(
|
_server.setOnConnectionCallback(
|
||||||
[this](std::shared_ptr<ix::WebSocket> webSocket,
|
[this](std::shared_ptr<ix::WebSocket> webSocket,
|
||||||
std::shared_ptr<ix::ConnectionState> connectionState) {
|
std::shared_ptr<ix::ConnectionState> connectionState,
|
||||||
|
std::unique_ptr<ix::ConnectionInfo> connectionInfo) {
|
||||||
auto state = std::dynamic_pointer_cast<SnakeConnectionState>(connectionState);
|
auto state = std::dynamic_pointer_cast<SnakeConnectionState>(connectionState);
|
||||||
|
auto remoteIp = connectionInfo->remoteIp;
|
||||||
|
|
||||||
webSocket->setOnMessageCallback(
|
webSocket->setOnMessageCallback(
|
||||||
[this, webSocket, state](const ix::WebSocketMessagePtr& msg) {
|
[this, webSocket, state, remoteIp](const ix::WebSocketMessagePtr& msg) {
|
||||||
std::stringstream ss;
|
std::stringstream ss;
|
||||||
ix::LogLevel logLevel = ix::LogLevel::Debug;
|
ix::LogLevel logLevel = ix::LogLevel::Debug;
|
||||||
if (msg->type == ix::WebSocketMessageType::Open)
|
if (msg->type == ix::WebSocketMessageType::Open)
|
||||||
{
|
{
|
||||||
ss << "New connection" << std::endl;
|
ss << "New connection" << std::endl;
|
||||||
|
ss << "remote ip: " << remoteIp << std::endl;
|
||||||
ss << "id: " << state->getId() << std::endl;
|
ss << "id: " << state->getId() << std::endl;
|
||||||
ss << "Uri: " << msg->openInfo.uri << std::endl;
|
ss << "Uri: " << msg->openInfo.uri << std::endl;
|
||||||
ss << "Headers:" << std::endl;
|
ss << "Headers:" << std::endl;
|
||||||
|
63
ixsnake/ixsnake/IXStreamSql.cpp
Normal file
63
ixsnake/ixsnake/IXStreamSql.cpp
Normal file
@@ -0,0 +1,63 @@
|
|||||||
|
/*
|
||||||
|
* IXStreamSql.cpp
|
||||||
|
* Author: Benjamin Sergeant
|
||||||
|
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
|
||||||
|
*
|
||||||
|
* Super simple hacked up version of a stream sql expression,
|
||||||
|
* that only supports non nested field evaluation
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "IXStreamSql.h"
|
||||||
|
#include <sstream>
|
||||||
|
#include <iostream>
|
||||||
|
|
||||||
|
namespace snake
|
||||||
|
{
|
||||||
|
StreamSql::StreamSql(const std::string& sqlFilter)
|
||||||
|
: _valid(false)
|
||||||
|
{
|
||||||
|
std::string token;
|
||||||
|
std::stringstream tokenStream(sqlFilter);
|
||||||
|
std::vector<std::string> tokens;
|
||||||
|
|
||||||
|
// Split by ' '
|
||||||
|
while (std::getline(tokenStream, token, ' '))
|
||||||
|
{
|
||||||
|
tokens.push_back(token);
|
||||||
|
}
|
||||||
|
|
||||||
|
_valid = tokens.size() == 8;
|
||||||
|
if (!_valid) return;
|
||||||
|
|
||||||
|
_field = tokens[5];
|
||||||
|
_operator = tokens[6];
|
||||||
|
_value = tokens[7];
|
||||||
|
|
||||||
|
// remove single quotes
|
||||||
|
_value = _value.substr(1, _value.size() - 2);
|
||||||
|
|
||||||
|
if (_operator == "LIKE")
|
||||||
|
{
|
||||||
|
_value = _value.substr(1, _value.size() - 2);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool StreamSql::valid() const
|
||||||
|
{
|
||||||
|
return _valid;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool StreamSql::match(const nlohmann::json& msg)
|
||||||
|
{
|
||||||
|
if (!_valid) return false;
|
||||||
|
|
||||||
|
if (msg.find(_field) == msg.end())
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string value = msg[_field];
|
||||||
|
return value == _value;
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace snake
|
29
ixsnake/ixsnake/IXStreamSql.h
Normal file
29
ixsnake/ixsnake/IXStreamSql.h
Normal file
@@ -0,0 +1,29 @@
|
|||||||
|
/*
|
||||||
|
* IXStreamSql.h
|
||||||
|
* Author: Benjamin Sergeant
|
||||||
|
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
#include "nlohmann/json.hpp"
|
||||||
|
|
||||||
|
namespace snake
|
||||||
|
{
|
||||||
|
class StreamSql
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
StreamSql(const std::string& sqlFilter = std::string());
|
||||||
|
~StreamSql() = default;
|
||||||
|
|
||||||
|
bool match(const nlohmann::json& msg);
|
||||||
|
bool valid() const;
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::string _field;
|
||||||
|
std::string _operator;
|
||||||
|
std::string _value;
|
||||||
|
bool _valid;
|
||||||
|
};
|
||||||
|
}
|
25
ixwebsocket/IXConnectionInfo.h
Normal file
25
ixwebsocket/IXConnectionInfo.h
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
/*
|
||||||
|
* IXConnectionInfo.h
|
||||||
|
* Author: Benjamin Sergeant
|
||||||
|
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
|
||||||
|
namespace ix
|
||||||
|
{
|
||||||
|
struct ConnectionInfo
|
||||||
|
{
|
||||||
|
std::string remoteIp;
|
||||||
|
int remotePort;
|
||||||
|
|
||||||
|
ConnectionInfo(const std::string& r = std::string(), int p = 0)
|
||||||
|
: remoteIp(r)
|
||||||
|
, remotePort(p)
|
||||||
|
{
|
||||||
|
;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
} // namespace ix
|
@@ -103,9 +103,9 @@ namespace ix
|
|||||||
std::thread _thread;
|
std::thread _thread;
|
||||||
|
|
||||||
std::unique_ptr<Socket> _socket;
|
std::unique_ptr<Socket> _socket;
|
||||||
std::recursive_mutex _mutex; // to protect accessing the _socket (only one socket per client)
|
std::recursive_mutex _mutex; // to protect accessing the _socket (only one socket per
|
||||||
// the mutex needs to be recursive as this function might
|
// client) the mutex needs to be recursive as this function
|
||||||
// be called recursively to follow HTTP redirections
|
// might be called recursively to follow HTTP redirections
|
||||||
|
|
||||||
SocketTLSOptions _tlsOptions;
|
SocketTLSOptions _tlsOptions;
|
||||||
|
|
||||||
|
@@ -9,11 +9,11 @@
|
|||||||
#include "IXNetSystem.h"
|
#include "IXNetSystem.h"
|
||||||
#include "IXSocketConnect.h"
|
#include "IXSocketConnect.h"
|
||||||
#include "IXUserAgent.h"
|
#include "IXUserAgent.h"
|
||||||
|
#include <cstring>
|
||||||
#include <fstream>
|
#include <fstream>
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include <zlib.h>
|
#include <zlib.h>
|
||||||
#include <cstring>
|
|
||||||
|
|
||||||
namespace
|
namespace
|
||||||
{
|
{
|
||||||
@@ -50,8 +50,11 @@ namespace
|
|||||||
const int windowBits = 15;
|
const int windowBits = 15;
|
||||||
const int GZIP_ENCODING = 16;
|
const int GZIP_ENCODING = 16;
|
||||||
|
|
||||||
deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
|
deflateInit2(&zs,
|
||||||
windowBits | GZIP_ENCODING, 8,
|
Z_DEFAULT_COMPRESSION,
|
||||||
|
Z_DEFLATED,
|
||||||
|
windowBits | GZIP_ENCODING,
|
||||||
|
8,
|
||||||
Z_DEFAULT_STRATEGY);
|
Z_DEFAULT_STRATEGY);
|
||||||
|
|
||||||
zs.next_in = (Bytef*) str.data();
|
zs.next_in = (Bytef*) str.data();
|
||||||
@@ -69,13 +72,12 @@ namespace
|
|||||||
|
|
||||||
ret = deflate(&zs, Z_FINISH);
|
ret = deflate(&zs, Z_FINISH);
|
||||||
|
|
||||||
if(outstring.size() < zs.total_out)
|
if (outstring.size() < zs.total_out)
|
||||||
{
|
{
|
||||||
// append the block to the output string
|
// append the block to the output string
|
||||||
outstring.append(outbuffer,
|
outstring.append(outbuffer, zs.total_out - outstring.size());
|
||||||
zs.total_out - outstring.size());
|
|
||||||
}
|
}
|
||||||
} while(ret == Z_OK);
|
} while (ret == Z_OK);
|
||||||
|
|
||||||
deflateEnd(&zs);
|
deflateEnd(&zs);
|
||||||
|
|
||||||
@@ -113,7 +115,8 @@ namespace ix
|
|||||||
}
|
}
|
||||||
|
|
||||||
void HttpServer::handleConnection(std::unique_ptr<Socket> socket,
|
void HttpServer::handleConnection(std::unique_ptr<Socket> socket,
|
||||||
std::shared_ptr<ConnectionState> connectionState)
|
std::shared_ptr<ConnectionState> connectionState,
|
||||||
|
std::unique_ptr<ConnectionInfo> connectionInfo)
|
||||||
{
|
{
|
||||||
_connectedClientsCount++;
|
_connectedClientsCount++;
|
||||||
|
|
||||||
@@ -122,7 +125,9 @@ namespace ix
|
|||||||
|
|
||||||
if (std::get<0>(ret))
|
if (std::get<0>(ret))
|
||||||
{
|
{
|
||||||
auto response = _onConnectionCallback(std::get<2>(ret), connectionState);
|
auto response = _onConnectionCallback(std::get<2>(ret),
|
||||||
|
connectionState,
|
||||||
|
std::move(connectionInfo));
|
||||||
if (!Http::sendResponse(response, socket))
|
if (!Http::sendResponse(response, socket))
|
||||||
{
|
{
|
||||||
logError("Cannot send response");
|
logError("Cannot send response");
|
||||||
@@ -142,7 +147,8 @@ namespace ix
|
|||||||
{
|
{
|
||||||
setOnConnectionCallback(
|
setOnConnectionCallback(
|
||||||
[this](HttpRequestPtr request,
|
[this](HttpRequestPtr request,
|
||||||
std::shared_ptr<ConnectionState> /*connectionState*/) -> HttpResponsePtr {
|
std::shared_ptr<ConnectionState> /*connectionState*/,
|
||||||
|
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr {
|
||||||
std::string uri(request->uri);
|
std::string uri(request->uri);
|
||||||
if (uri.empty() || uri == "/")
|
if (uri.empty() || uri == "/")
|
||||||
{
|
{
|
||||||
@@ -172,7 +178,8 @@ namespace ix
|
|||||||
|
|
||||||
// Log request
|
// Log request
|
||||||
std::stringstream ss;
|
std::stringstream ss;
|
||||||
ss << request->method << " " << request->headers["User-Agent"] << " "
|
ss << connectionInfo->remoteIp << ":" << connectionInfo->remotePort << " "
|
||||||
|
<< request->method << " " << request->headers["User-Agent"] << " "
|
||||||
<< request->uri << " " << content.size();
|
<< request->uri << " " << content.size();
|
||||||
logInfo(ss.str());
|
logInfo(ss.str());
|
||||||
|
|
||||||
@@ -198,13 +205,15 @@ namespace ix
|
|||||||
setOnConnectionCallback(
|
setOnConnectionCallback(
|
||||||
[this,
|
[this,
|
||||||
redirectUrl](HttpRequestPtr request,
|
redirectUrl](HttpRequestPtr request,
|
||||||
std::shared_ptr<ConnectionState> /*connectionState*/) -> HttpResponsePtr {
|
std::shared_ptr<ConnectionState> /*connectionState*/,
|
||||||
|
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr {
|
||||||
WebSocketHttpHeaders headers;
|
WebSocketHttpHeaders headers;
|
||||||
headers["Server"] = userAgent();
|
headers["Server"] = userAgent();
|
||||||
|
|
||||||
// Log request
|
// Log request
|
||||||
std::stringstream ss;
|
std::stringstream ss;
|
||||||
ss << request->method << " " << request->headers["User-Agent"] << " "
|
ss << connectionInfo->remoteIp << ":" << connectionInfo->remotePort << " "
|
||||||
|
<< request->method << " " << request->headers["User-Agent"] << " "
|
||||||
<< request->uri;
|
<< request->uri;
|
||||||
logInfo(ss.str());
|
logInfo(ss.str());
|
||||||
|
|
||||||
|
@@ -23,7 +23,9 @@ namespace ix
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
using OnConnectionCallback =
|
using OnConnectionCallback =
|
||||||
std::function<HttpResponsePtr(HttpRequestPtr, std::shared_ptr<ConnectionState>)>;
|
std::function<HttpResponsePtr(HttpRequestPtr,
|
||||||
|
std::shared_ptr<ConnectionState>,
|
||||||
|
std::unique_ptr<ConnectionInfo> connectionInfo)>;
|
||||||
|
|
||||||
HttpServer(int port = SocketServer::kDefaultPort,
|
HttpServer(int port = SocketServer::kDefaultPort,
|
||||||
const std::string& host = SocketServer::kDefaultHost,
|
const std::string& host = SocketServer::kDefaultHost,
|
||||||
@@ -44,7 +46,8 @@ namespace ix
|
|||||||
|
|
||||||
// Methods
|
// Methods
|
||||||
virtual void handleConnection(std::unique_ptr<Socket>,
|
virtual void handleConnection(std::unique_ptr<Socket>,
|
||||||
std::shared_ptr<ConnectionState> connectionState) final;
|
std::shared_ptr<ConnectionState> connectionState,
|
||||||
|
std::unique_ptr<ConnectionInfo> connectionInfo) final;
|
||||||
virtual size_t getConnectedClientsCount() final;
|
virtual size_t getConnectedClientsCount() final;
|
||||||
|
|
||||||
void setDefaultConnectionCallback();
|
void setDefaultConnectionCallback();
|
||||||
|
@@ -276,6 +276,7 @@ namespace ix
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Accept a connection.
|
// Accept a connection.
|
||||||
|
// FIXME: Is this working for ipv6 ?
|
||||||
struct sockaddr_in client; // client address information
|
struct sockaddr_in client; // client address information
|
||||||
int clientFd; // socket connected to client
|
int clientFd; // socket connected to client
|
||||||
socklen_t addressLen = sizeof(client);
|
socklen_t addressLen = sizeof(client);
|
||||||
@@ -307,6 +308,45 @@ namespace ix
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::unique_ptr<ConnectionInfo> connectionInfo;
|
||||||
|
|
||||||
|
if (_addressFamily == AF_INET)
|
||||||
|
{
|
||||||
|
char remoteIp[INET_ADDRSTRLEN];
|
||||||
|
if (inet_ntop(AF_INET, &client.sin_addr, remoteIp, INET_ADDRSTRLEN) == nullptr)
|
||||||
|
{
|
||||||
|
int err = Socket::getErrno();
|
||||||
|
std::stringstream ss;
|
||||||
|
ss << "SocketServer::run() error calling inet_ntop (ipv4): " << err << ", "
|
||||||
|
<< strerror(err);
|
||||||
|
logError(ss.str());
|
||||||
|
|
||||||
|
Socket::closeSocket(clientFd);
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
connectionInfo = std::make_unique<ConnectionInfo>(remoteIp, client.sin_port);
|
||||||
|
}
|
||||||
|
else // AF_INET6
|
||||||
|
{
|
||||||
|
char remoteIp[INET6_ADDRSTRLEN];
|
||||||
|
if (inet_ntop(AF_INET6, &client.sin_addr, remoteIp, INET6_ADDRSTRLEN) == nullptr)
|
||||||
|
{
|
||||||
|
int err = Socket::getErrno();
|
||||||
|
std::stringstream ss;
|
||||||
|
ss << "SocketServer::run() error calling inet_ntop (ipv6): " << err << ", "
|
||||||
|
<< strerror(err);
|
||||||
|
logError(ss.str());
|
||||||
|
|
||||||
|
Socket::closeSocket(clientFd);
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
connectionInfo = std::make_unique<ConnectionInfo>(remoteIp, client.sin_port);
|
||||||
|
}
|
||||||
|
|
||||||
std::shared_ptr<ConnectionState> connectionState;
|
std::shared_ptr<ConnectionState> connectionState;
|
||||||
if (_connectionStateFactory)
|
if (_connectionStateFactory)
|
||||||
{
|
{
|
||||||
@@ -342,7 +382,7 @@ namespace ix
|
|||||||
_connectionsThreads.push_back(std::make_pair(
|
_connectionsThreads.push_back(std::make_pair(
|
||||||
connectionState,
|
connectionState,
|
||||||
std::thread(
|
std::thread(
|
||||||
&SocketServer::handleConnection, this, std::move(socket), connectionState)));
|
&SocketServer::handleConnection, this, std::move(socket), connectionState, std::move(connectionInfo))));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -6,6 +6,7 @@
|
|||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include "IXConnectionInfo.h"
|
||||||
#include "IXConnectionState.h"
|
#include "IXConnectionState.h"
|
||||||
#include "IXSocketTLSOptions.h"
|
#include "IXSocketTLSOptions.h"
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
@@ -102,7 +103,8 @@ namespace ix
|
|||||||
ConnectionStateFactory _connectionStateFactory;
|
ConnectionStateFactory _connectionStateFactory;
|
||||||
|
|
||||||
virtual void handleConnection(std::unique_ptr<Socket>,
|
virtual void handleConnection(std::unique_ptr<Socket>,
|
||||||
std::shared_ptr<ConnectionState> connectionState) = 0;
|
std::shared_ptr<ConnectionState> connectionState,
|
||||||
|
std::unique_ptr<ConnectionInfo> connectionInfo) = 0;
|
||||||
virtual size_t getConnectedClientsCount() = 0;
|
virtual size_t getConnectedClientsCount() = 0;
|
||||||
|
|
||||||
// Returns true if all connection threads are joined
|
// Returns true if all connection threads are joined
|
||||||
|
@@ -32,8 +32,8 @@
|
|||||||
#include "IXUrlParser.h"
|
#include "IXUrlParser.h"
|
||||||
|
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
#include <cstring>
|
|
||||||
#include <cstdlib>
|
#include <cstdlib>
|
||||||
|
#include <cstring>
|
||||||
|
|
||||||
namespace
|
namespace
|
||||||
{
|
{
|
||||||
|
@@ -59,14 +59,42 @@ namespace ix
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool WebSocketPerMessageDeflateCompressor::endsWith(const std::string& value,
|
template<typename T>
|
||||||
const std::string& ending)
|
bool WebSocketPerMessageDeflateCompressor::endsWithEmptyUnCompressedBlock(const T& value)
|
||||||
{
|
{
|
||||||
if (ending.size() > value.size()) return false;
|
if (kEmptyUncompressedBlock.size() > value.size()) return false;
|
||||||
return std::equal(ending.rbegin(), ending.rend(), value.rbegin());
|
auto N = value.size();
|
||||||
|
return value[N - 1] == kEmptyUncompressedBlock[3] &&
|
||||||
|
value[N - 2] == kEmptyUncompressedBlock[2] &&
|
||||||
|
value[N - 3] == kEmptyUncompressedBlock[1] &&
|
||||||
|
value[N - 4] == kEmptyUncompressedBlock[0];
|
||||||
}
|
}
|
||||||
|
|
||||||
bool WebSocketPerMessageDeflateCompressor::compress(const std::string& in, std::string& out)
|
bool WebSocketPerMessageDeflateCompressor::compress(const std::string& in, std::string& out)
|
||||||
|
{
|
||||||
|
return compressData(in, out);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool WebSocketPerMessageDeflateCompressor::compress(const std::string& in,
|
||||||
|
std::vector<uint8_t>& out)
|
||||||
|
{
|
||||||
|
return compressData(in, out);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool WebSocketPerMessageDeflateCompressor::compress(const std::vector<uint8_t>& in,
|
||||||
|
std::string& out)
|
||||||
|
{
|
||||||
|
return compressData(in, out);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool WebSocketPerMessageDeflateCompressor::compress(const std::vector<uint8_t>& in,
|
||||||
|
std::vector<uint8_t>& out)
|
||||||
|
{
|
||||||
|
return compressData(in, out);
|
||||||
|
}
|
||||||
|
|
||||||
|
template<typename T, typename S>
|
||||||
|
bool WebSocketPerMessageDeflateCompressor::compressData(const T& in, S& out)
|
||||||
{
|
{
|
||||||
//
|
//
|
||||||
// 7.2.1. Compression
|
// 7.2.1. Compression
|
||||||
@@ -96,7 +124,8 @@ namespace ix
|
|||||||
// The normal buffer size should be 6 but
|
// The normal buffer size should be 6 but
|
||||||
// we remove the 4 octets from the tail (#4)
|
// we remove the 4 octets from the tail (#4)
|
||||||
uint8_t buf[2] = {0x02, 0x00};
|
uint8_t buf[2] = {0x02, 0x00};
|
||||||
out.append((char*) (buf), 2);
|
out.push_back(buf[0]);
|
||||||
|
out.push_back(buf[1]);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@@ -114,10 +143,10 @@ namespace ix
|
|||||||
|
|
||||||
output = _compressBufferSize - _deflateState.avail_out;
|
output = _compressBufferSize - _deflateState.avail_out;
|
||||||
|
|
||||||
out.append((char*) (_compressBuffer.get()), output);
|
out.insert(out.end(), _compressBuffer.get(), _compressBuffer.get() + output);
|
||||||
} while (_deflateState.avail_out == 0);
|
} while (_deflateState.avail_out == 0);
|
||||||
|
|
||||||
if (endsWith(out, kEmptyUncompressedBlock))
|
if (endsWithEmptyUnCompressedBlock(out))
|
||||||
{
|
{
|
||||||
out.resize(out.size() - 4);
|
out.resize(out.size() - 4);
|
||||||
}
|
}
|
||||||
|
@@ -9,6 +9,7 @@
|
|||||||
#include "zlib.h"
|
#include "zlib.h"
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
namespace ix
|
namespace ix
|
||||||
{
|
{
|
||||||
@@ -20,9 +21,15 @@ namespace ix
|
|||||||
|
|
||||||
bool init(uint8_t deflateBits, bool clientNoContextTakeOver);
|
bool init(uint8_t deflateBits, bool clientNoContextTakeOver);
|
||||||
bool compress(const std::string& in, std::string& out);
|
bool compress(const std::string& in, std::string& out);
|
||||||
|
bool compress(const std::string& in, std::vector<uint8_t>& out);
|
||||||
|
bool compress(const std::vector<uint8_t>& in, std::string& out);
|
||||||
|
bool compress(const std::vector<uint8_t>& in, std::vector<uint8_t>& out);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
static bool endsWith(const std::string& value, const std::string& ending);
|
template<typename T, typename S>
|
||||||
|
bool compressData(const T& in, S& out);
|
||||||
|
template<typename T>
|
||||||
|
bool endsWithEmptyUnCompressedBlock(const T& value);
|
||||||
|
|
||||||
int _flush;
|
int _flush;
|
||||||
size_t _compressBufferSize;
|
size_t _compressBufferSize;
|
||||||
|
@@ -72,12 +72,13 @@ namespace ix
|
|||||||
}
|
}
|
||||||
|
|
||||||
void WebSocketServer::handleConnection(std::unique_ptr<Socket> socket,
|
void WebSocketServer::handleConnection(std::unique_ptr<Socket> socket,
|
||||||
std::shared_ptr<ConnectionState> connectionState)
|
std::shared_ptr<ConnectionState> connectionState,
|
||||||
|
std::unique_ptr<ConnectionInfo> connectionInfo)
|
||||||
{
|
{
|
||||||
setThreadName("WebSocketServer::" + connectionState->getId());
|
setThreadName("WebSocketServer::" + connectionState->getId());
|
||||||
|
|
||||||
auto webSocket = std::make_shared<WebSocket>();
|
auto webSocket = std::make_shared<WebSocket>();
|
||||||
_onConnectionCallback(webSocket, connectionState);
|
_onConnectionCallback(webSocket, connectionState, std::move(connectionInfo));
|
||||||
|
|
||||||
webSocket->disableAutomaticReconnection();
|
webSocket->disableAutomaticReconnection();
|
||||||
|
|
||||||
|
@@ -23,7 +23,8 @@ namespace ix
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
using OnConnectionCallback =
|
using OnConnectionCallback =
|
||||||
std::function<void(std::shared_ptr<WebSocket>, std::shared_ptr<ConnectionState>)>;
|
std::function<void(std::shared_ptr<WebSocket>, std::shared_ptr<ConnectionState>,
|
||||||
|
std::unique_ptr<ConnectionInfo> connectionInfo)>;
|
||||||
|
|
||||||
WebSocketServer(int port = SocketServer::kDefaultPort,
|
WebSocketServer(int port = SocketServer::kDefaultPort,
|
||||||
const std::string& host = SocketServer::kDefaultHost,
|
const std::string& host = SocketServer::kDefaultHost,
|
||||||
@@ -60,7 +61,8 @@ namespace ix
|
|||||||
|
|
||||||
// Methods
|
// Methods
|
||||||
virtual void handleConnection(std::unique_ptr<Socket> socket,
|
virtual void handleConnection(std::unique_ptr<Socket> socket,
|
||||||
std::shared_ptr<ConnectionState> connectionState) final;
|
std::shared_ptr<ConnectionState> connectionState,
|
||||||
|
std::unique_ptr<ConnectionInfo> connectionInfo);
|
||||||
virtual size_t getConnectedClientsCount() final;
|
virtual size_t getConnectedClientsCount() final;
|
||||||
};
|
};
|
||||||
} // namespace ix
|
} // namespace ix
|
||||||
|
@@ -326,9 +326,10 @@ namespace ix
|
|||||||
return _txbuf.empty();
|
return _txbuf.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template<class Iterator>
|
||||||
void WebSocketTransport::appendToSendBuffer(const std::vector<uint8_t>& header,
|
void WebSocketTransport::appendToSendBuffer(const std::vector<uint8_t>& header,
|
||||||
std::string::const_iterator begin,
|
Iterator begin,
|
||||||
std::string::const_iterator end,
|
Iterator end,
|
||||||
uint64_t message_size,
|
uint64_t message_size,
|
||||||
uint8_t masking_key[4])
|
uint8_t masking_key[4])
|
||||||
{
|
{
|
||||||
@@ -750,8 +751,9 @@ namespace ix
|
|||||||
return static_cast<unsigned>(seconds);
|
return static_cast<unsigned>(seconds);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template<class T>
|
||||||
WebSocketSendInfo WebSocketTransport::sendData(wsheader_type::opcode_type type,
|
WebSocketSendInfo WebSocketTransport::sendData(wsheader_type::opcode_type type,
|
||||||
const std::string& message,
|
const T& message,
|
||||||
bool compress,
|
bool compress,
|
||||||
const OnProgressCallback& onProgressCallback)
|
const OnProgressCallback& onProgressCallback)
|
||||||
{
|
{
|
||||||
@@ -764,8 +766,8 @@ namespace ix
|
|||||||
size_t wireSize = message.size();
|
size_t wireSize = message.size();
|
||||||
bool compressionError = false;
|
bool compressionError = false;
|
||||||
|
|
||||||
std::string::const_iterator message_begin = message.begin();
|
auto message_begin = message.cbegin();
|
||||||
std::string::const_iterator message_end = message.end();
|
auto message_end = message.cend();
|
||||||
|
|
||||||
if (compress)
|
if (compress)
|
||||||
{
|
{
|
||||||
@@ -780,8 +782,8 @@ namespace ix
|
|||||||
compressionError = false;
|
compressionError = false;
|
||||||
wireSize = _compressedMessage.size();
|
wireSize = _compressedMessage.size();
|
||||||
|
|
||||||
message_begin = _compressedMessage.begin();
|
message_begin = _compressedMessage.cbegin();
|
||||||
message_end = _compressedMessage.end();
|
message_end = _compressedMessage.cend();
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
@@ -859,10 +861,11 @@ namespace ix
|
|||||||
return WebSocketSendInfo(success, compressionError, payloadSize, wireSize);
|
return WebSocketSendInfo(success, compressionError, payloadSize, wireSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template<class Iterator>
|
||||||
bool WebSocketTransport::sendFragment(wsheader_type::opcode_type type,
|
bool WebSocketTransport::sendFragment(wsheader_type::opcode_type type,
|
||||||
bool fin,
|
bool fin,
|
||||||
std::string::const_iterator message_begin,
|
Iterator message_begin,
|
||||||
std::string::const_iterator message_end,
|
Iterator message_end,
|
||||||
bool compress)
|
bool compress)
|
||||||
{
|
{
|
||||||
uint64_t message_size = static_cast<uint64_t>(message_end - message_begin);
|
uint64_t message_size = static_cast<uint64_t>(message_end - message_begin);
|
||||||
@@ -1055,7 +1058,7 @@ namespace ix
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
// no close code/reason set
|
// no close code/reason set
|
||||||
sendData(wsheader_type::CLOSE, "", compress);
|
sendData(wsheader_type::CLOSE, std::string(""), compress);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -239,16 +239,15 @@ namespace ix
|
|||||||
bool sendOnSocket();
|
bool sendOnSocket();
|
||||||
bool receiveFromSocket();
|
bool receiveFromSocket();
|
||||||
|
|
||||||
|
template<class T>
|
||||||
WebSocketSendInfo sendData(wsheader_type::opcode_type type,
|
WebSocketSendInfo sendData(wsheader_type::opcode_type type,
|
||||||
const std::string& message,
|
const T& message,
|
||||||
bool compress,
|
bool compress,
|
||||||
const OnProgressCallback& onProgressCallback = nullptr);
|
const OnProgressCallback& onProgressCallback = nullptr);
|
||||||
|
|
||||||
bool sendFragment(wsheader_type::opcode_type type,
|
template<class Iterator>
|
||||||
bool fin,
|
bool sendFragment(
|
||||||
std::string::const_iterator begin,
|
wsheader_type::opcode_type type, bool fin, Iterator begin, Iterator end, bool compress);
|
||||||
std::string::const_iterator end,
|
|
||||||
bool compress);
|
|
||||||
|
|
||||||
void emitMessage(MessageKind messageKind,
|
void emitMessage(MessageKind messageKind,
|
||||||
const std::string& message,
|
const std::string& message,
|
||||||
@@ -256,9 +255,11 @@ namespace ix
|
|||||||
const OnMessageCallback& onMessageCallback);
|
const OnMessageCallback& onMessageCallback);
|
||||||
|
|
||||||
bool isSendBufferEmpty() const;
|
bool isSendBufferEmpty() const;
|
||||||
|
|
||||||
|
template<class Iterator>
|
||||||
void appendToSendBuffer(const std::vector<uint8_t>& header,
|
void appendToSendBuffer(const std::vector<uint8_t>& header,
|
||||||
std::string::const_iterator begin,
|
Iterator begin,
|
||||||
std::string::const_iterator end,
|
Iterator end,
|
||||||
uint64_t message_size,
|
uint64_t message_size,
|
||||||
uint8_t masking_key[4]);
|
uint8_t masking_key[4]);
|
||||||
|
|
||||||
|
@@ -6,4 +6,4 @@
|
|||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#define IX_WEBSOCKET_VERSION "9.8.5"
|
#define IX_WEBSOCKET_VERSION "9.9.0"
|
||||||
|
6
makefile
6
makefile
@@ -104,8 +104,10 @@ test_server:
|
|||||||
# env TEST=Websocket_server make test
|
# env TEST=Websocket_server make test
|
||||||
# env TEST=Websocket_chat make test
|
# env TEST=Websocket_chat make test
|
||||||
# env TEST=heartbeat make test
|
# env TEST=heartbeat make test
|
||||||
test:
|
build_test:
|
||||||
mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 -DUSE_TEST=1 .. ; ninja install)
|
mkdir -p build && (cd build ; cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_TEST=1 .. ; ninja install)
|
||||||
|
|
||||||
|
test: build_test
|
||||||
(cd test ; python2.7 run.py -r)
|
(cd test ; python2.7 run.py -r)
|
||||||
|
|
||||||
test_make:
|
test_make:
|
||||||
|
@@ -42,6 +42,7 @@ set (SOURCES
|
|||||||
|
|
||||||
IXSocketTest.cpp
|
IXSocketTest.cpp
|
||||||
IXSocketConnectTest.cpp
|
IXSocketConnectTest.cpp
|
||||||
|
# IXWebSocketLeakTest.cpp # commented until we have a fix for #224
|
||||||
IXWebSocketServerTest.cpp
|
IXWebSocketServerTest.cpp
|
||||||
IXWebSocketTestConnectionDisconnection.cpp
|
IXWebSocketTestConnectionDisconnection.cpp
|
||||||
IXUrlParserTest.cpp
|
IXUrlParserTest.cpp
|
||||||
@@ -55,6 +56,8 @@ set (SOURCES
|
|||||||
IXSentryClientTest.cpp
|
IXSentryClientTest.cpp
|
||||||
IXWebSocketChatTest.cpp
|
IXWebSocketChatTest.cpp
|
||||||
IXWebSocketBroadcastTest.cpp
|
IXWebSocketBroadcastTest.cpp
|
||||||
|
IXWebSocketPerMessageDeflateCompressorTest.cpp
|
||||||
|
IXStreamSqlTest.cpp
|
||||||
)
|
)
|
||||||
|
|
||||||
# Some unittest don't work on windows yet
|
# Some unittest don't work on windows yet
|
||||||
|
@@ -12,8 +12,8 @@
|
|||||||
#include <ixcobra/IXCobraConnection.h>
|
#include <ixcobra/IXCobraConnection.h>
|
||||||
#include <ixcobra/IXCobraMetricsPublisher.h>
|
#include <ixcobra/IXCobraMetricsPublisher.h>
|
||||||
#include <ixcrypto/IXUuid.h>
|
#include <ixcrypto/IXUuid.h>
|
||||||
#include <ixsentry/IXSentryClient.h>
|
|
||||||
#include <ixredis/IXRedisServer.h>
|
#include <ixredis/IXRedisServer.h>
|
||||||
|
#include <ixsentry/IXSentryClient.h>
|
||||||
#include <ixsnake/IXSnakeServer.h>
|
#include <ixsnake/IXSnakeServer.h>
|
||||||
#include <ixwebsocket/IXHttpServer.h>
|
#include <ixwebsocket/IXHttpServer.h>
|
||||||
#include <ixwebsocket/IXUserAgent.h>
|
#include <ixwebsocket/IXUserAgent.h>
|
||||||
@@ -95,13 +95,15 @@ TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]")
|
|||||||
|
|
||||||
sentryServer.setOnConnectionCallback(
|
sentryServer.setOnConnectionCallback(
|
||||||
[](HttpRequestPtr request,
|
[](HttpRequestPtr request,
|
||||||
std::shared_ptr<ConnectionState> /*connectionState*/) -> HttpResponsePtr {
|
std::shared_ptr<ConnectionState> /*connectionState*/,
|
||||||
|
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr {
|
||||||
WebSocketHttpHeaders headers;
|
WebSocketHttpHeaders headers;
|
||||||
headers["Server"] = userAgent();
|
headers["Server"] = userAgent();
|
||||||
|
|
||||||
// Log request
|
// Log request
|
||||||
std::stringstream ss;
|
std::stringstream ss;
|
||||||
ss << request->method << " " << request->headers["User-Agent"] << " "
|
ss << connectionInfo->remoteIp << ":" << connectionInfo->remotePort << " "
|
||||||
|
<< request->method << " " << request->headers["User-Agent"] << " "
|
||||||
<< request->uri;
|
<< request->uri;
|
||||||
|
|
||||||
if (request->method == "POST")
|
if (request->method == "POST")
|
||||||
|
@@ -12,8 +12,8 @@
|
|||||||
#include <ixcobra/IXCobraConnection.h>
|
#include <ixcobra/IXCobraConnection.h>
|
||||||
#include <ixcobra/IXCobraMetricsPublisher.h>
|
#include <ixcobra/IXCobraMetricsPublisher.h>
|
||||||
#include <ixcrypto/IXUuid.h>
|
#include <ixcrypto/IXUuid.h>
|
||||||
#include <ixsentry/IXSentryClient.h>
|
|
||||||
#include <ixredis/IXRedisServer.h>
|
#include <ixredis/IXRedisServer.h>
|
||||||
|
#include <ixsentry/IXSentryClient.h>
|
||||||
#include <ixsnake/IXSnakeServer.h>
|
#include <ixsnake/IXSnakeServer.h>
|
||||||
#include <ixwebsocket/IXHttpServer.h>
|
#include <ixwebsocket/IXHttpServer.h>
|
||||||
#include <ixwebsocket/IXUserAgent.h>
|
#include <ixwebsocket/IXUserAgent.h>
|
||||||
|
@@ -12,8 +12,8 @@
|
|||||||
#include <ixcobra/IXCobraConnection.h>
|
#include <ixcobra/IXCobraConnection.h>
|
||||||
#include <ixcobra/IXCobraMetricsPublisher.h>
|
#include <ixcobra/IXCobraMetricsPublisher.h>
|
||||||
#include <ixcrypto/IXUuid.h>
|
#include <ixcrypto/IXUuid.h>
|
||||||
#include <ixsentry/IXSentryClient.h>
|
|
||||||
#include <ixredis/IXRedisServer.h>
|
#include <ixredis/IXRedisServer.h>
|
||||||
|
#include <ixsentry/IXSentryClient.h>
|
||||||
#include <ixsnake/IXSnakeServer.h>
|
#include <ixsnake/IXSnakeServer.h>
|
||||||
#include <ixwebsocket/IXHttpServer.h>
|
#include <ixwebsocket/IXHttpServer.h>
|
||||||
#include <ixwebsocket/IXUserAgent.h>
|
#include <ixwebsocket/IXUserAgent.h>
|
||||||
@@ -92,6 +92,9 @@ TEST_CASE("Cobra_to_stdout_bot", "[cobra_bots]")
|
|||||||
cobraBotConfig.enableHeartbeat = false;
|
cobraBotConfig.enableHeartbeat = false;
|
||||||
bool quiet = false;
|
bool quiet = false;
|
||||||
|
|
||||||
|
cobraBotConfig.filter =
|
||||||
|
std::string("select * from `") + channel + "` where id = 'sms_metric_A_id'";
|
||||||
|
|
||||||
// We could try to capture the output ... not sure how.
|
// We could try to capture the output ... not sure how.
|
||||||
bool fluentd = true;
|
bool fluentd = true;
|
||||||
|
|
||||||
|
@@ -67,7 +67,8 @@ TEST_CASE("http server", "[httpd]")
|
|||||||
|
|
||||||
TEST_CASE("http server redirection", "[httpd_redirect]")
|
TEST_CASE("http server redirection", "[httpd_redirect]")
|
||||||
{
|
{
|
||||||
SECTION("Connect to a local HTTP server, with redirection enabled, but we do not follow redirects")
|
SECTION(
|
||||||
|
"Connect to a local HTTP server, with redirection enabled, but we do not follow redirects")
|
||||||
{
|
{
|
||||||
int port = getFreePort();
|
int port = getFreePort();
|
||||||
ix::HttpServer server(port, "127.0.0.1");
|
ix::HttpServer server(port, "127.0.0.1");
|
||||||
|
42
test/IXStreamSqlTest.cpp
Normal file
42
test/IXStreamSqlTest.cpp
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
/*
|
||||||
|
* IXStreamSqlTest.cpp
|
||||||
|
* Author: Benjamin Sergeant
|
||||||
|
* Copyright (c) 2020 Machine Zone. All rights reserved.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "IXTest.h"
|
||||||
|
#include "catch.hpp"
|
||||||
|
#include <iostream>
|
||||||
|
#include <ixsnake/IXStreamSql.h>
|
||||||
|
#include <string.h>
|
||||||
|
|
||||||
|
using namespace ix;
|
||||||
|
|
||||||
|
namespace ix
|
||||||
|
{
|
||||||
|
TEST_CASE("stream_sql", "[streamsql]")
|
||||||
|
{
|
||||||
|
SECTION("expression A")
|
||||||
|
{
|
||||||
|
snake::StreamSql streamSql(
|
||||||
|
"select * from subscriber_republished_v1_neo where session LIKE '%123456%'");
|
||||||
|
|
||||||
|
nlohmann::json msg = {{"session", "123456"}, {"id", "foo_id"}, {"timestamp", 12}};
|
||||||
|
|
||||||
|
CHECK(streamSql.match(msg));
|
||||||
|
}
|
||||||
|
|
||||||
|
SECTION("expression A")
|
||||||
|
{
|
||||||
|
snake::StreamSql streamSql("select * from `subscriber_republished_v1_neo` where "
|
||||||
|
"session = '30091320ed8d4e50b758f8409b83bed7'");
|
||||||
|
|
||||||
|
nlohmann::json msg = {{"session", "30091320ed8d4e50b758f8409b83bed7"},
|
||||||
|
{"id", "foo_id"},
|
||||||
|
{"timestamp", 12}};
|
||||||
|
|
||||||
|
CHECK(streamSql.match(msg));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace ix
|
@@ -85,12 +85,15 @@ namespace ix
|
|||||||
bool startWebSocketEchoServer(ix::WebSocketServer& server)
|
bool startWebSocketEchoServer(ix::WebSocketServer& server)
|
||||||
{
|
{
|
||||||
server.setOnConnectionCallback([&server](std::shared_ptr<ix::WebSocket> webSocket,
|
server.setOnConnectionCallback([&server](std::shared_ptr<ix::WebSocket> webSocket,
|
||||||
std::shared_ptr<ConnectionState> connectionState) {
|
std::shared_ptr<ConnectionState> connectionState,
|
||||||
|
std::unique_ptr<ConnectionInfo> connectionInfo) {
|
||||||
|
auto remoteIp = connectionInfo->remoteIp;
|
||||||
webSocket->setOnMessageCallback(
|
webSocket->setOnMessageCallback(
|
||||||
[webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg) {
|
[webSocket, connectionState, remoteIp, &server](const ix::WebSocketMessagePtr& msg) {
|
||||||
if (msg->type == ix::WebSocketMessageType::Open)
|
if (msg->type == ix::WebSocketMessageType::Open)
|
||||||
{
|
{
|
||||||
TLogger() << "New connection";
|
TLogger() << "New connection";
|
||||||
|
TLogger() << "Remote ip: " << remoteIp;
|
||||||
TLogger() << "Uri: " << msg->openInfo.uri;
|
TLogger() << "Uri: " << msg->openInfo.uri;
|
||||||
TLogger() << "Headers:";
|
TLogger() << "Headers:";
|
||||||
for (auto it : msg->openInfo.headers)
|
for (auto it : msg->openInfo.headers)
|
||||||
|
@@ -191,13 +191,16 @@ namespace
|
|||||||
|
|
||||||
server.setOnConnectionCallback([&server, &connectionId](
|
server.setOnConnectionCallback([&server, &connectionId](
|
||||||
std::shared_ptr<ix::WebSocket> webSocket,
|
std::shared_ptr<ix::WebSocket> webSocket,
|
||||||
std::shared_ptr<ConnectionState> connectionState) {
|
std::shared_ptr<ConnectionState> connectionState,
|
||||||
webSocket->setOnMessageCallback([webSocket, connectionState, &connectionId, &server](
|
std::unique_ptr<ConnectionInfo> connectionInfo) {
|
||||||
|
auto remoteIp = connectionInfo->remoteIp;
|
||||||
|
webSocket->setOnMessageCallback([webSocket, connectionState, remoteIp, &connectionId, &server](
|
||||||
const ix::WebSocketMessagePtr& msg) {
|
const ix::WebSocketMessagePtr& msg) {
|
||||||
if (msg->type == ix::WebSocketMessageType::Open)
|
if (msg->type == ix::WebSocketMessageType::Open)
|
||||||
{
|
{
|
||||||
TLogger() << "New connection";
|
TLogger() << "New connection";
|
||||||
connectionState->computeId();
|
connectionState->computeId();
|
||||||
|
TLogger() << "remote ip: " << remoteIp;
|
||||||
TLogger() << "id: " << connectionState->getId();
|
TLogger() << "id: " << connectionState->getId();
|
||||||
TLogger() << "Uri: " << msg->openInfo.uri;
|
TLogger() << "Uri: " << msg->openInfo.uri;
|
||||||
TLogger() << "Headers:";
|
TLogger() << "Headers:";
|
||||||
|
@@ -194,12 +194,16 @@ namespace
|
|||||||
bool startServer(ix::WebSocketServer& server)
|
bool startServer(ix::WebSocketServer& server)
|
||||||
{
|
{
|
||||||
server.setOnConnectionCallback([&server](std::shared_ptr<ix::WebSocket> webSocket,
|
server.setOnConnectionCallback([&server](std::shared_ptr<ix::WebSocket> webSocket,
|
||||||
std::shared_ptr<ConnectionState> connectionState) {
|
std::shared_ptr<ConnectionState> connectionState,
|
||||||
|
|
||||||
|
std::unique_ptr<ConnectionInfo> connectionInfo) {
|
||||||
|
auto remoteIp = connectionInfo->remoteIp;
|
||||||
webSocket->setOnMessageCallback(
|
webSocket->setOnMessageCallback(
|
||||||
[webSocket, connectionState, &server](const ix::WebSocketMessagePtr& msg) {
|
[webSocket, connectionState, remoteIp, &server](const ix::WebSocketMessagePtr& msg) {
|
||||||
if (msg->type == ix::WebSocketMessageType::Open)
|
if (msg->type == ix::WebSocketMessageType::Open)
|
||||||
{
|
{
|
||||||
TLogger() << "New connection";
|
TLogger() << "New connection";
|
||||||
|
TLogger() << "remote ip: " << remoteIp;
|
||||||
TLogger() << "id: " << connectionState->getId();
|
TLogger() << "id: " << connectionState->getId();
|
||||||
TLogger() << "Uri: " << msg->openInfo.uri;
|
TLogger() << "Uri: " << msg->openInfo.uri;
|
||||||
TLogger() << "Headers:";
|
TLogger() << "Headers:";
|
||||||
|
@@ -171,9 +171,12 @@ namespace
|
|||||||
server.setOnConnectionCallback(
|
server.setOnConnectionCallback(
|
||||||
[&receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite](
|
[&receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite](
|
||||||
std::shared_ptr<ix::WebSocket> webSocket,
|
std::shared_ptr<ix::WebSocket> webSocket,
|
||||||
std::shared_ptr<ConnectionState> connectionState) {
|
std::shared_ptr<ConnectionState> connectionState,
|
||||||
|
std::unique_ptr<ConnectionInfo> connectionInfo) {
|
||||||
|
auto remoteIp = connectionInfo->remoteIp;
|
||||||
webSocket->setOnMessageCallback([webSocket,
|
webSocket->setOnMessageCallback([webSocket,
|
||||||
connectionState,
|
connectionState,
|
||||||
|
remoteIp,
|
||||||
&receivedCloseCode,
|
&receivedCloseCode,
|
||||||
&receivedCloseReason,
|
&receivedCloseReason,
|
||||||
&receivedCloseRemote,
|
&receivedCloseRemote,
|
||||||
@@ -181,6 +184,7 @@ namespace
|
|||||||
if (msg->type == ix::WebSocketMessageType::Open)
|
if (msg->type == ix::WebSocketMessageType::Open)
|
||||||
{
|
{
|
||||||
TLogger() << "New server connection";
|
TLogger() << "New server connection";
|
||||||
|
TLogger() << "remote ip: " << remoteIp;
|
||||||
TLogger() << "id: " << connectionState->getId();
|
TLogger() << "id: " << connectionState->getId();
|
||||||
TLogger() << "Uri: " << msg->openInfo.uri;
|
TLogger() << "Uri: " << msg->openInfo.uri;
|
||||||
TLogger() << "Headers:";
|
TLogger() << "Headers:";
|
||||||
|
182
test/IXWebSocketLeakTest.cpp
Normal file
182
test/IXWebSocketLeakTest.cpp
Normal file
@@ -0,0 +1,182 @@
|
|||||||
|
/*
|
||||||
|
* IXWebSocketServer.cpp
|
||||||
|
* Author: Benjamin Sergeant, @marcelkauf
|
||||||
|
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "IXTest.h"
|
||||||
|
|
||||||
|
#include "catch.hpp"
|
||||||
|
#include <memory>
|
||||||
|
#include <sstream>
|
||||||
|
|
||||||
|
#include <ixwebsocket/IXWebSocket.h>
|
||||||
|
#include <ixwebsocket/IXWebSocketServer.h>
|
||||||
|
|
||||||
|
using namespace ix;
|
||||||
|
|
||||||
|
namespace
|
||||||
|
{
|
||||||
|
class WebSocketClient
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
WebSocketClient(int port);
|
||||||
|
void start();
|
||||||
|
void stop();
|
||||||
|
bool isReady() const;
|
||||||
|
bool hasConnectionError() const;
|
||||||
|
|
||||||
|
private:
|
||||||
|
ix::WebSocket _webSocket;
|
||||||
|
int _port;
|
||||||
|
std::atomic<bool> _connectionError;
|
||||||
|
};
|
||||||
|
|
||||||
|
WebSocketClient::WebSocketClient(int port)
|
||||||
|
: _port(port)
|
||||||
|
, _connectionError(false)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
bool WebSocketClient::hasConnectionError() const
|
||||||
|
{
|
||||||
|
return _connectionError;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool WebSocketClient::isReady() const
|
||||||
|
{
|
||||||
|
return _webSocket.getReadyState() == ix::ReadyState::Open;
|
||||||
|
}
|
||||||
|
|
||||||
|
void WebSocketClient::stop()
|
||||||
|
{
|
||||||
|
_webSocket.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
void WebSocketClient::start()
|
||||||
|
{
|
||||||
|
std::string url;
|
||||||
|
{
|
||||||
|
std::stringstream ss;
|
||||||
|
ss << "ws://localhost:" << _port << "/";
|
||||||
|
|
||||||
|
url = ss.str();
|
||||||
|
}
|
||||||
|
|
||||||
|
_webSocket.setUrl(url);
|
||||||
|
_webSocket.disableAutomaticReconnection();
|
||||||
|
|
||||||
|
std::stringstream ss;
|
||||||
|
log(std::string("Connecting to url: ") + url);
|
||||||
|
|
||||||
|
_webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg)
|
||||||
|
{
|
||||||
|
std::stringstream ss;
|
||||||
|
if (msg->type == ix::WebSocketMessageType::Open)
|
||||||
|
{
|
||||||
|
log("client connected");
|
||||||
|
}
|
||||||
|
else if (msg->type == ix::WebSocketMessageType::Close)
|
||||||
|
{
|
||||||
|
log("client disconnected");
|
||||||
|
}
|
||||||
|
else if (msg->type == ix::WebSocketMessageType::Error)
|
||||||
|
{
|
||||||
|
_connectionError = true;
|
||||||
|
log("error");
|
||||||
|
}
|
||||||
|
else if (msg->type == ix::WebSocketMessageType::Pong)
|
||||||
|
{
|
||||||
|
log("pong");
|
||||||
|
}
|
||||||
|
else if (msg->type == ix::WebSocketMessageType::Ping)
|
||||||
|
{
|
||||||
|
log("ping");
|
||||||
|
}
|
||||||
|
else if (msg->type == ix::WebSocketMessageType::Message)
|
||||||
|
{
|
||||||
|
log("message");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
log("invalid type");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
_webSocket.start();
|
||||||
|
}
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
TEST_CASE("Websocket leak test")
|
||||||
|
{
|
||||||
|
SECTION("Websocket destructor is called when closing the connection.")
|
||||||
|
{
|
||||||
|
// stores the server websocket in order to check the use_count
|
||||||
|
std::shared_ptr<WebSocket> webSocketPtr;
|
||||||
|
|
||||||
|
{
|
||||||
|
int port = getFreePort();
|
||||||
|
WebSocketServer server(port);
|
||||||
|
|
||||||
|
server.setOnConnectionCallback([&webSocketPtr](std::shared_ptr<ix::WebSocket> webSocket,
|
||||||
|
std::shared_ptr<ConnectionState> connectionState,
|
||||||
|
std::unique_ptr<ConnectionInfo> connectionInfo)
|
||||||
|
{
|
||||||
|
// original ptr in WebSocketServer::handleConnection and the callback argument
|
||||||
|
REQUIRE(webSocket.use_count() == 2);
|
||||||
|
webSocket->setOnMessageCallback([&webSocketPtr, webSocket, connectionState](const ix::WebSocketMessagePtr& msg)
|
||||||
|
{
|
||||||
|
if (msg->type == ix::WebSocketMessageType::Open)
|
||||||
|
{
|
||||||
|
log(std::string("New connection id: ") + connectionState->getId());
|
||||||
|
// original ptr in WebSocketServer::handleConnection, captured ptr of this callback, and ptr in WebSocketServer::_clients
|
||||||
|
REQUIRE(webSocket.use_count() == 3);
|
||||||
|
webSocketPtr = std::shared_ptr<WebSocket>(webSocket);
|
||||||
|
REQUIRE(webSocket.use_count() == 4);
|
||||||
|
}
|
||||||
|
else if (msg->type == ix::WebSocketMessageType::Close)
|
||||||
|
{
|
||||||
|
log(std::string("Client closed connection id: ") + connectionState->getId());
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
log(std::string(msg->str));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
// original ptr in WebSocketServer::handleConnection, argument of this callback, and captured ptr in websocket callback
|
||||||
|
REQUIRE(webSocket.use_count() == 3);
|
||||||
|
});
|
||||||
|
|
||||||
|
server.listen();
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
WebSocketClient webSocketClient(port);
|
||||||
|
webSocketClient.start();
|
||||||
|
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
REQUIRE(!webSocketClient.hasConnectionError());
|
||||||
|
if (webSocketClient.isReady()) break;
|
||||||
|
ix::msleep(10);
|
||||||
|
}
|
||||||
|
|
||||||
|
REQUIRE(server.getClients().size() == 1);
|
||||||
|
// same value as in Open-handler above
|
||||||
|
REQUIRE(webSocketPtr.use_count() == 4);
|
||||||
|
|
||||||
|
ix::msleep(500);
|
||||||
|
webSocketClient.stop();
|
||||||
|
ix::msleep(500);
|
||||||
|
REQUIRE(server.getClients().size() == 0);
|
||||||
|
|
||||||
|
// websocket should only be referenced by webSocketPtr but is still used by the websocket callback
|
||||||
|
REQUIRE(webSocketPtr.use_count() == 1);
|
||||||
|
webSocketPtr->setOnMessageCallback(nullptr);
|
||||||
|
// websocket should only be referenced by webSocketPtr
|
||||||
|
REQUIRE(webSocketPtr.use_count() == 1);
|
||||||
|
server.stop();
|
||||||
|
}
|
||||||
|
// websocket should only be referenced by webSocketPtr
|
||||||
|
REQUIRE(webSocketPtr.use_count() == 1);
|
||||||
|
}
|
||||||
|
}
|
76
test/IXWebSocketPerMessageDeflateCompressorTest.cpp
Normal file
76
test/IXWebSocketPerMessageDeflateCompressorTest.cpp
Normal file
@@ -0,0 +1,76 @@
|
|||||||
|
/*
|
||||||
|
* IXWebSocketPerMessageDeflateCodecTest.cpp
|
||||||
|
* Author: Benjamin Sergeant
|
||||||
|
* Copyright (c) 2020 Machine Zone. All rights reserved.
|
||||||
|
*
|
||||||
|
* make build_test && build/test/ixwebsocket_unittest per-message-deflate-codec
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "IXTest.h"
|
||||||
|
#include "catch.hpp"
|
||||||
|
#include <iostream>
|
||||||
|
#include <ixwebsocket/IXWebSocketPerMessageDeflateCodec.h>
|
||||||
|
#include <string.h>
|
||||||
|
|
||||||
|
using namespace ix;
|
||||||
|
|
||||||
|
namespace ix
|
||||||
|
{
|
||||||
|
std::string compressAndDecompress(const std::string& a)
|
||||||
|
{
|
||||||
|
std::string b, c;
|
||||||
|
|
||||||
|
WebSocketPerMessageDeflateCompressor compressor;
|
||||||
|
compressor.init(11, true);
|
||||||
|
compressor.compress(a, b);
|
||||||
|
|
||||||
|
WebSocketPerMessageDeflateDecompressor decompressor;
|
||||||
|
decompressor.init(11, true);
|
||||||
|
decompressor.decompress(b, c);
|
||||||
|
|
||||||
|
return c;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string compressAndDecompressVector(const std::string& a)
|
||||||
|
{
|
||||||
|
std::string b, c;
|
||||||
|
|
||||||
|
std::vector<uint8_t> vec(a.begin(), a.end());
|
||||||
|
|
||||||
|
WebSocketPerMessageDeflateCompressor compressor;
|
||||||
|
compressor.init(11, true);
|
||||||
|
compressor.compress(vec, b);
|
||||||
|
|
||||||
|
WebSocketPerMessageDeflateDecompressor decompressor;
|
||||||
|
decompressor.init(11, true);
|
||||||
|
decompressor.decompress(b, c);
|
||||||
|
|
||||||
|
return c;
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_CASE("per-message-deflate-codec", "[zlib]")
|
||||||
|
{
|
||||||
|
SECTION("string api")
|
||||||
|
{
|
||||||
|
REQUIRE(compressAndDecompress("") == "");
|
||||||
|
REQUIRE(compressAndDecompress("foo") == "foo");
|
||||||
|
REQUIRE(compressAndDecompress("bar") == "bar");
|
||||||
|
REQUIRE(compressAndDecompress("asdcaseqw`21897dehqwed") == "asdcaseqw`21897dehqwed");
|
||||||
|
REQUIRE(compressAndDecompress("/usr/local/include/ixwebsocket/IXSocketAppleSSL.h") ==
|
||||||
|
"/usr/local/include/ixwebsocket/IXSocketAppleSSL.h");
|
||||||
|
}
|
||||||
|
|
||||||
|
SECTION("vector api")
|
||||||
|
{
|
||||||
|
REQUIRE(compressAndDecompressVector("") == "");
|
||||||
|
REQUIRE(compressAndDecompressVector("foo") == "foo");
|
||||||
|
REQUIRE(compressAndDecompressVector("bar") == "bar");
|
||||||
|
REQUIRE(compressAndDecompressVector("asdcaseqw`21897dehqwed") ==
|
||||||
|
"asdcaseqw`21897dehqwed");
|
||||||
|
REQUIRE(
|
||||||
|
compressAndDecompressVector("/usr/local/include/ixwebsocket/IXSocketAppleSSL.h") ==
|
||||||
|
"/usr/local/include/ixwebsocket/IXSocketAppleSSL.h");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace ix
|
@@ -35,13 +35,16 @@ namespace ix
|
|||||||
|
|
||||||
server.setOnConnectionCallback([&server, &connectionId](
|
server.setOnConnectionCallback([&server, &connectionId](
|
||||||
std::shared_ptr<ix::WebSocket> webSocket,
|
std::shared_ptr<ix::WebSocket> webSocket,
|
||||||
std::shared_ptr<ConnectionState> connectionState) {
|
std::shared_ptr<ConnectionState> connectionState,
|
||||||
webSocket->setOnMessageCallback([webSocket, connectionState, &connectionId, &server](
|
std::unique_ptr<ConnectionInfo> connectionInfo) {
|
||||||
|
auto remoteIp = connectionInfo->remoteIp;
|
||||||
|
webSocket->setOnMessageCallback([webSocket, connectionState, remoteIp, &connectionId, &server](
|
||||||
const ix::WebSocketMessagePtr& msg) {
|
const ix::WebSocketMessagePtr& msg) {
|
||||||
if (msg->type == ix::WebSocketMessageType::Open)
|
if (msg->type == ix::WebSocketMessageType::Open)
|
||||||
{
|
{
|
||||||
TLogger() << "New connection";
|
TLogger() << "New connection";
|
||||||
connectionState->computeId();
|
connectionState->computeId();
|
||||||
|
TLogger() << "remote ip: " << remoteIp;
|
||||||
TLogger() << "id: " << connectionState->getId();
|
TLogger() << "id: " << connectionState->getId();
|
||||||
TLogger() << "Uri: " << msg->openInfo.uri;
|
TLogger() << "Uri: " << msg->openInfo.uri;
|
||||||
TLogger() << "Headers:";
|
TLogger() << "Headers:";
|
||||||
|
@@ -18,12 +18,15 @@ bool startServer(ix::WebSocketServer& server, std::string& subProtocols)
|
|||||||
{
|
{
|
||||||
server.setOnConnectionCallback(
|
server.setOnConnectionCallback(
|
||||||
[&server, &subProtocols](std::shared_ptr<ix::WebSocket> webSocket,
|
[&server, &subProtocols](std::shared_ptr<ix::WebSocket> webSocket,
|
||||||
std::shared_ptr<ConnectionState> connectionState) {
|
std::shared_ptr<ConnectionState> connectionState,
|
||||||
webSocket->setOnMessageCallback([webSocket, connectionState, &server, &subProtocols](
|
std::unique_ptr<ConnectionInfo> connectionInfo) {
|
||||||
|
auto remoteIp = connectionInfo->remoteIp;
|
||||||
|
webSocket->setOnMessageCallback([webSocket, connectionState, remoteIp, &server, &subProtocols](
|
||||||
const ix::WebSocketMessagePtr& msg) {
|
const ix::WebSocketMessagePtr& msg) {
|
||||||
if (msg->type == ix::WebSocketMessageType::Open)
|
if (msg->type == ix::WebSocketMessageType::Open)
|
||||||
{
|
{
|
||||||
TLogger() << "New connection";
|
TLogger() << "New connection";
|
||||||
|
TLogger() << "remote ip: " << remoteIp;
|
||||||
TLogger() << "id: " << connectionState->getId();
|
TLogger() << "id: " << connectionState->getId();
|
||||||
TLogger() << "Uri: " << msg->openInfo.uri;
|
TLogger() << "Uri: " << msg->openInfo.uri;
|
||||||
TLogger() << "Headers:";
|
TLogger() << "Headers:";
|
||||||
|
2
third_party/cpp-linenoise/linenoise.hpp
vendored
2
third_party/cpp-linenoise/linenoise.hpp
vendored
@@ -122,6 +122,8 @@
|
|||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
|
||||||
namespace linenoise
|
namespace linenoise
|
||||||
{
|
{
|
||||||
bool Readline(const char *prompt, std::string& line);
|
bool Readline(const char *prompt, std::string& line);
|
||||||
|
24
ws/ws.cpp
24
ws/ws.cpp
@@ -11,13 +11,13 @@
|
|||||||
|
|
||||||
#include <cli11/CLI11.hpp>
|
#include <cli11/CLI11.hpp>
|
||||||
#include <fstream>
|
#include <fstream>
|
||||||
|
#include <ixbots/IXCobraMetricsToRedisBot.h>
|
||||||
#include <ixbots/IXCobraToPythonBot.h>
|
#include <ixbots/IXCobraToPythonBot.h>
|
||||||
#include <ixbots/IXCobraToSentryBot.h>
|
#include <ixbots/IXCobraToSentryBot.h>
|
||||||
#include <ixbots/IXCobraToStatsdBot.h>
|
#include <ixbots/IXCobraToStatsdBot.h>
|
||||||
#include <ixbots/IXCobraToStdoutBot.h>
|
#include <ixbots/IXCobraToStdoutBot.h>
|
||||||
#include <ixbots/IXCobraMetricsToRedisBot.h>
|
|
||||||
#include <ixredis/IXRedisClient.h>
|
|
||||||
#include <ixcore/utils/IXCoreLogger.h>
|
#include <ixcore/utils/IXCoreLogger.h>
|
||||||
|
#include <ixredis/IXRedisClient.h>
|
||||||
#include <ixsentry/IXSentryClient.h>
|
#include <ixsentry/IXSentryClient.h>
|
||||||
#include <ixwebsocket/IXNetSystem.h>
|
#include <ixwebsocket/IXNetSystem.h>
|
||||||
#include <ixwebsocket/IXSocket.h>
|
#include <ixwebsocket/IXSocket.h>
|
||||||
@@ -122,6 +122,7 @@ int main(int argc, char** argv)
|
|||||||
std::string key;
|
std::string key;
|
||||||
std::string logfile;
|
std::string logfile;
|
||||||
std::string scriptPath;
|
std::string scriptPath;
|
||||||
|
std::string republishChannel;
|
||||||
ix::SocketTLSOptions tlsOptions;
|
ix::SocketTLSOptions tlsOptions;
|
||||||
ix::CobraConfig cobraConfig;
|
ix::CobraConfig cobraConfig;
|
||||||
ix::CobraBotConfig cobraBotConfig;
|
ix::CobraBotConfig cobraBotConfig;
|
||||||
@@ -193,8 +194,7 @@ int main(int argc, char** argv)
|
|||||||
"--limit_received_events", cobraBotConfig.limitReceivedEvents, "Max events per minute");
|
"--limit_received_events", cobraBotConfig.limitReceivedEvents, "Max events per minute");
|
||||||
app->add_option(
|
app->add_option(
|
||||||
"--max_events_per_minute", cobraBotConfig.maxEventsPerMinute, "Max events per minute");
|
"--max_events_per_minute", cobraBotConfig.maxEventsPerMinute, "Max events per minute");
|
||||||
app->add_option(
|
app->add_option("--batch_size", cobraBotConfig.batchSize, "Subscription batch size");
|
||||||
"--batch_size", cobraBotConfig.batchSize, "Subscription batch size");
|
|
||||||
};
|
};
|
||||||
|
|
||||||
app.add_flag("--version", version, "Print ws version");
|
app.add_flag("--version", version, "Print ws version");
|
||||||
@@ -358,7 +358,8 @@ int main(int argc, char** argv)
|
|||||||
cobra2python->add_option("--host", hostname, "Statsd host");
|
cobra2python->add_option("--host", hostname, "Statsd host");
|
||||||
cobra2python->add_option("--port", statsdPort, "Statsd port");
|
cobra2python->add_option("--port", statsdPort, "Statsd port");
|
||||||
cobra2python->add_option("--prefix", prefix, "Statsd prefix");
|
cobra2python->add_option("--prefix", prefix, "Statsd prefix");
|
||||||
cobra2python->add_option("--script", scriptPath, "Python script path")->check(CLI::ExistingPath);
|
cobra2python->add_option("--script", scriptPath, "Python script path")
|
||||||
|
->check(CLI::ExistingPath);
|
||||||
cobra2python->add_option("--pidfile", pidfile, "Pid file");
|
cobra2python->add_option("--pidfile", pidfile, "Pid file");
|
||||||
addTLSOptions(cobra2python);
|
addTLSOptions(cobra2python);
|
||||||
addCobraBotConfig(cobra2python);
|
addCobraBotConfig(cobra2python);
|
||||||
@@ -391,6 +392,7 @@ int main(int argc, char** argv)
|
|||||||
snakeApp->add_option("--redis_password", redisPassword, "Redis password");
|
snakeApp->add_option("--redis_password", redisPassword, "Redis password");
|
||||||
snakeApp->add_option("--apps_config_path", appsConfigPath, "Path to auth data")
|
snakeApp->add_option("--apps_config_path", appsConfigPath, "Path to auth data")
|
||||||
->check(CLI::ExistingPath);
|
->check(CLI::ExistingPath);
|
||||||
|
snakeApp->add_option("--republish_channel", republishChannel, "Republish channel");
|
||||||
snakeApp->add_flag("-v", verbose, "Verbose");
|
snakeApp->add_flag("-v", verbose, "Verbose");
|
||||||
snakeApp->add_flag("-d", disablePong, "Disable Pongs");
|
snakeApp->add_flag("-d", disablePong, "Disable Pongs");
|
||||||
addTLSOptions(snakeApp);
|
addTLSOptions(snakeApp);
|
||||||
@@ -604,8 +606,7 @@ int main(int argc, char** argv)
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
ret = (int) ix::cobra_to_python_bot(
|
ret = (int) ix::cobra_to_python_bot(cobraBotConfig, statsdClient, scriptPath);
|
||||||
cobraBotConfig, statsdClient, scriptPath);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (app.got_subcommand("cobra_to_sentry"))
|
else if (app.got_subcommand("cobra_to_sentry"))
|
||||||
@@ -620,14 +621,12 @@ int main(int argc, char** argv)
|
|||||||
ix::RedisClient redisClient;
|
ix::RedisClient redisClient;
|
||||||
if (!redisClient.connect(hostname, redisPort))
|
if (!redisClient.connect(hostname, redisPort))
|
||||||
{
|
{
|
||||||
spdlog::error("Cannot connect to redis host {}:{}",
|
spdlog::error("Cannot connect to redis host {}:{}", redisHosts, redisPort);
|
||||||
redisHosts, redisPort);
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
ret = (int) ix::cobra_metrics_to_redis_bot(
|
ret = (int) ix::cobra_metrics_to_redis_bot(cobraBotConfig, redisClient, verbose);
|
||||||
cobraBotConfig, redisClient, verbose);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (app.got_subcommand("snake"))
|
else if (app.got_subcommand("snake"))
|
||||||
@@ -640,7 +639,8 @@ int main(int argc, char** argv)
|
|||||||
verbose,
|
verbose,
|
||||||
appsConfigPath,
|
appsConfigPath,
|
||||||
tlsOptions,
|
tlsOptions,
|
||||||
disablePong);
|
disablePong,
|
||||||
|
republishChannel);
|
||||||
}
|
}
|
||||||
else if (app.got_subcommand("httpd"))
|
else if (app.got_subcommand("httpd"))
|
||||||
{
|
{
|
||||||
|
7
ws/ws.h
7
ws/ws.h
@@ -64,9 +64,7 @@ namespace ix
|
|||||||
bool disablePerMessageDeflate,
|
bool disablePerMessageDeflate,
|
||||||
const ix::SocketTLSOptions& tlsOptions);
|
const ix::SocketTLSOptions& tlsOptions);
|
||||||
|
|
||||||
int ws_redis_cli_main(const std::string& hostname,
|
int ws_redis_cli_main(const std::string& hostname, int port, const std::string& password);
|
||||||
int port,
|
|
||||||
const std::string& password);
|
|
||||||
|
|
||||||
int ws_redis_publish_main(const std::string& hostname,
|
int ws_redis_publish_main(const std::string& hostname,
|
||||||
int port,
|
int port,
|
||||||
@@ -105,7 +103,8 @@ namespace ix
|
|||||||
bool verbose,
|
bool verbose,
|
||||||
const std::string& appsConfigPath,
|
const std::string& appsConfigPath,
|
||||||
const ix::SocketTLSOptions& tlsOptions,
|
const ix::SocketTLSOptions& tlsOptions,
|
||||||
bool disablePong);
|
bool disablePong,
|
||||||
|
const std::string& republishChannel);
|
||||||
|
|
||||||
int ws_httpd_main(int port,
|
int ws_httpd_main(int port,
|
||||||
const std::string& hostname,
|
const std::string& hostname,
|
||||||
|
@@ -21,12 +21,15 @@ namespace ix
|
|||||||
server.setTLSOptions(tlsOptions);
|
server.setTLSOptions(tlsOptions);
|
||||||
|
|
||||||
server.setOnConnectionCallback([&server](std::shared_ptr<WebSocket> webSocket,
|
server.setOnConnectionCallback([&server](std::shared_ptr<WebSocket> webSocket,
|
||||||
std::shared_ptr<ConnectionState> connectionState) {
|
std::shared_ptr<ConnectionState> connectionState,
|
||||||
webSocket->setOnMessageCallback([webSocket, connectionState, &server](
|
std::unique_ptr<ConnectionInfo> connectionInfo) {
|
||||||
|
auto remoteIp = connectionInfo->remoteIp;
|
||||||
|
webSocket->setOnMessageCallback([webSocket, connectionState, remoteIp, &server](
|
||||||
const WebSocketMessagePtr& msg) {
|
const WebSocketMessagePtr& msg) {
|
||||||
if (msg->type == ix::WebSocketMessageType::Open)
|
if (msg->type == ix::WebSocketMessageType::Open)
|
||||||
{
|
{
|
||||||
spdlog::info("New connection");
|
spdlog::info("New connection");
|
||||||
|
spdlog::info("remote ip: {}", remoteIp);
|
||||||
spdlog::info("id: {}", connectionState->getId());
|
spdlog::info("id: {}", connectionState->getId());
|
||||||
spdlog::info("Uri: {}", msg->openInfo.uri);
|
spdlog::info("Uri: {}", msg->openInfo.uri);
|
||||||
spdlog::info("Headers:");
|
spdlog::info("Headers:");
|
||||||
|
@@ -6,12 +6,12 @@
|
|||||||
|
|
||||||
#include "IXBench.h"
|
#include "IXBench.h"
|
||||||
#include "linenoise.hpp"
|
#include "linenoise.hpp"
|
||||||
|
#include <iostream>
|
||||||
#include <ixwebsocket/IXSocket.h>
|
#include <ixwebsocket/IXSocket.h>
|
||||||
#include <ixwebsocket/IXSocketTLSOptions.h>
|
#include <ixwebsocket/IXSocketTLSOptions.h>
|
||||||
#include <ixwebsocket/IXWebSocket.h>
|
#include <ixwebsocket/IXWebSocket.h>
|
||||||
#include <spdlog/spdlog.h>
|
#include <spdlog/spdlog.h>
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
#include <iostream>
|
|
||||||
|
|
||||||
|
|
||||||
namespace ix
|
namespace ix
|
||||||
|
@@ -44,12 +44,15 @@ namespace ix
|
|||||||
|
|
||||||
server.setOnConnectionCallback(
|
server.setOnConnectionCallback(
|
||||||
[greetings](std::shared_ptr<ix::WebSocket> webSocket,
|
[greetings](std::shared_ptr<ix::WebSocket> webSocket,
|
||||||
std::shared_ptr<ConnectionState> connectionState) {
|
std::shared_ptr<ConnectionState> connectionState,
|
||||||
|
std::unique_ptr<ConnectionInfo> connectionInfo) {
|
||||||
|
auto remoteIp = connectionInfo->remoteIp;
|
||||||
webSocket->setOnMessageCallback(
|
webSocket->setOnMessageCallback(
|
||||||
[webSocket, connectionState, greetings](const WebSocketMessagePtr& msg) {
|
[webSocket, connectionState, remoteIp, greetings](const WebSocketMessagePtr& msg) {
|
||||||
if (msg->type == ix::WebSocketMessageType::Open)
|
if (msg->type == ix::WebSocketMessageType::Open)
|
||||||
{
|
{
|
||||||
spdlog::info("New connection");
|
spdlog::info("New connection");
|
||||||
|
spdlog::info("remote ip: {}", remoteIp);
|
||||||
spdlog::info("id: {}", connectionState->getId());
|
spdlog::info("id: {}", connectionState->getId());
|
||||||
spdlog::info("Uri: {}", msg->openInfo.uri);
|
spdlog::info("Uri: {}", msg->openInfo.uri);
|
||||||
spdlog::info("Headers:");
|
spdlog::info("Headers:");
|
||||||
|
@@ -56,15 +56,18 @@ namespace ix
|
|||||||
|
|
||||||
server.setOnConnectionCallback([remoteUrl,
|
server.setOnConnectionCallback([remoteUrl,
|
||||||
verbose](std::shared_ptr<ix::WebSocket> webSocket,
|
verbose](std::shared_ptr<ix::WebSocket> webSocket,
|
||||||
std::shared_ptr<ConnectionState> connectionState) {
|
std::shared_ptr<ConnectionState> connectionState,
|
||||||
|
std::unique_ptr<ConnectionInfo> connectionInfo) {
|
||||||
auto state = std::dynamic_pointer_cast<ProxyConnectionState>(connectionState);
|
auto state = std::dynamic_pointer_cast<ProxyConnectionState>(connectionState);
|
||||||
|
auto remoteIp = connectionInfo->remoteIp;
|
||||||
|
|
||||||
// Server connection
|
// Server connection
|
||||||
state->webSocket().setOnMessageCallback([webSocket, state, verbose](
|
state->webSocket().setOnMessageCallback([webSocket, state, remoteIp, verbose](
|
||||||
const WebSocketMessagePtr& msg) {
|
const WebSocketMessagePtr& msg) {
|
||||||
if (msg->type == ix::WebSocketMessageType::Open)
|
if (msg->type == ix::WebSocketMessageType::Open)
|
||||||
{
|
{
|
||||||
spdlog::info("New connection to remote server");
|
spdlog::info("New connection to remote server");
|
||||||
|
spdlog::info("remote ip: {}", remoteIp);
|
||||||
spdlog::info("id: {}", state->getId());
|
spdlog::info("id: {}", state->getId());
|
||||||
spdlog::info("Uri: {}", msg->openInfo.uri);
|
spdlog::info("Uri: {}", msg->openInfo.uri);
|
||||||
spdlog::info("Headers:");
|
spdlog::info("Headers:");
|
||||||
|
@@ -4,17 +4,15 @@
|
|||||||
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
|
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include "linenoise.hpp"
|
||||||
|
#include <iostream>
|
||||||
#include <ixredis/IXRedisClient.h>
|
#include <ixredis/IXRedisClient.h>
|
||||||
#include <spdlog/spdlog.h>
|
#include <spdlog/spdlog.h>
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
#include <iostream>
|
|
||||||
#include "linenoise.hpp"
|
|
||||||
|
|
||||||
namespace ix
|
namespace ix
|
||||||
{
|
{
|
||||||
int ws_redis_cli_main(const std::string& hostname,
|
int ws_redis_cli_main(const std::string& hostname, int port, const std::string& password)
|
||||||
int port,
|
|
||||||
const std::string& password)
|
|
||||||
{
|
{
|
||||||
RedisClient redisClient;
|
RedisClient redisClient;
|
||||||
if (!redisClient.connect(hostname, port))
|
if (!redisClient.connect(hostname, port))
|
||||||
@@ -71,9 +69,7 @@ namespace ix
|
|||||||
{
|
{
|
||||||
if (response.first != RespType::String)
|
if (response.first != RespType::String)
|
||||||
{
|
{
|
||||||
std::cout << "("
|
std::cout << "(" << redisClient.getRespTypeDescription(response.first) << ")"
|
||||||
<< redisClient.getRespTypeDescription(response.first)
|
|
||||||
<< ")"
|
|
||||||
<< " ";
|
<< " ";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -45,7 +45,8 @@ namespace ix
|
|||||||
bool verbose,
|
bool verbose,
|
||||||
const std::string& appsConfigPath,
|
const std::string& appsConfigPath,
|
||||||
const SocketTLSOptions& socketTLSOptions,
|
const SocketTLSOptions& socketTLSOptions,
|
||||||
bool disablePong)
|
bool disablePong,
|
||||||
|
const std::string& republishChannel)
|
||||||
{
|
{
|
||||||
snake::AppConfig appConfig;
|
snake::AppConfig appConfig;
|
||||||
appConfig.port = port;
|
appConfig.port = port;
|
||||||
@@ -55,6 +56,7 @@ namespace ix
|
|||||||
appConfig.redisPassword = redisPassword;
|
appConfig.redisPassword = redisPassword;
|
||||||
appConfig.socketTLSOptions = socketTLSOptions;
|
appConfig.socketTLSOptions = socketTLSOptions;
|
||||||
appConfig.disablePong = disablePong;
|
appConfig.disablePong = disablePong;
|
||||||
|
appConfig.republishChannel = republishChannel;
|
||||||
|
|
||||||
// Parse config file
|
// Parse config file
|
||||||
auto str = readAsString(appsConfigPath);
|
auto str = readAsString(appsConfigPath);
|
||||||
|
@@ -20,12 +20,15 @@ namespace ix
|
|||||||
server.setTLSOptions(tlsOptions);
|
server.setTLSOptions(tlsOptions);
|
||||||
|
|
||||||
server.setOnConnectionCallback([&server](std::shared_ptr<ix::WebSocket> webSocket,
|
server.setOnConnectionCallback([&server](std::shared_ptr<ix::WebSocket> webSocket,
|
||||||
std::shared_ptr<ConnectionState> connectionState) {
|
std::shared_ptr<ConnectionState> connectionState,
|
||||||
webSocket->setOnMessageCallback([webSocket, connectionState, &server](
|
std::unique_ptr<ConnectionInfo> connectionInfo) {
|
||||||
|
auto remoteIp = connectionInfo->remoteIp;
|
||||||
|
webSocket->setOnMessageCallback([webSocket, connectionState, remoteIp, &server](
|
||||||
const WebSocketMessagePtr& msg) {
|
const WebSocketMessagePtr& msg) {
|
||||||
if (msg->type == ix::WebSocketMessageType::Open)
|
if (msg->type == ix::WebSocketMessageType::Open)
|
||||||
{
|
{
|
||||||
spdlog::info("ws_transfer: New connection");
|
spdlog::info("ws_transfer: New connection");
|
||||||
|
spdlog::info("remote ip: {}", remoteIp);
|
||||||
spdlog::info("id: {}", connectionState->getId());
|
spdlog::info("id: {}", connectionState->getId());
|
||||||
spdlog::info("Uri: {}", msg->openInfo.uri);
|
spdlog::info("Uri: {}", msg->openInfo.uri);
|
||||||
spdlog::info("Headers:");
|
spdlog::info("Headers:");
|
||||||
|
Reference in New Issue
Block a user