(socket servers) merge the ConnectionInfo class with the ConnectionState one, which simplify all the server apis

This commit is contained in:
Benjamin Sergeant 2020-08-28 14:55:40 -07:00
parent 629c155044
commit 73b9c0b89b
25 changed files with 97 additions and 123 deletions

View File

@ -62,7 +62,6 @@ set( IXWEBSOCKET_SOURCES
set( IXWEBSOCKET_HEADERS
ixwebsocket/IXBench.h
ixwebsocket/IXCancellationRequest.h
ixwebsocket/IXConnectionInfo.h
ixwebsocket/IXConnectionState.h
ixwebsocket/IXDNSLookup.h
ixwebsocket/IXExponentialBackoff.h

View File

@ -2,6 +2,10 @@
All changes to this project will be documented in this file.
## [10.3.1] - 2020-08-28
(socket servers) merge the ConnectionInfo class with the ConnectionState one, which simplify all the server apis
## [10.3.0] - 2020-08-26
(ws) set the main thread name, to help with debugging in XCode, gdb, lldb etc...

View File

@ -280,10 +280,9 @@ ix::WebSocketServer server(port);
server.setOnConnectionCallback(
[&server](std::weak_ptr<WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo)
std::shared_ptr<ConnectionState> connectionState)
{
std::cout << "Remote ip: " << connectionInfo->remoteIp << std::endl;
std::cout << "Remote ip: " << connectionState->remoteIp << std::endl;
auto ws = webSocket.lock();
if (ws)
@ -359,13 +358,12 @@ The webSocket reference is guaranteed to be always valid ; by design the callbac
ix::WebSocketServer server(port);
server.setOnClientMessageCallback(std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const WebSocketMessagePtr& msg)
{
// The ConnectionInfo object contains information about the connection,
// The ConnectionState object contains information about the connection,
// at this point only the client ip address and the port.
std::cout << "Remote ip: " << connectionInfo.remoteIp << std::endl;
std::cout << "Remote ip: " << connectionState->getRemoteIp();
if (msg->type == ix::WebSocketMessageType::Open)
{
@ -519,12 +517,11 @@ If you want to handle how requests are processed, implement the setOnConnectionC
```cpp
setOnConnectionCallback(
[this](HttpRequestPtr request,
std::shared_ptr<ConnectionState> /*connectionState*/,
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr
std::shared_ptr<ConnectionState> connectionState) -> HttpResponsePtr
{
// Build a string for the response
std::stringstream ss;
ss << connectionInfo->remoteIp
ss << connectionState->getRemoteIp();
<< " "
<< request->method
<< " "

View File

@ -45,10 +45,9 @@ namespace ix
}
void RedisServer::handleConnection(std::unique_ptr<Socket> socket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo)
std::shared_ptr<ConnectionState> connectionState)
{
logInfo("New connection from remote ip " + connectionInfo->remoteIp);
logInfo("New connection from remote ip " + connectionState->getRemoteIp());
_connectedClientsCount++;

View File

@ -44,8 +44,7 @@ namespace ix
// Methods
virtual void handleConnection(std::unique_ptr<Socket>,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) final;
std::shared_ptr<ConnectionState> connectionState) final;
virtual size_t getConnectedClientsCount() final;
bool startsWith(const std::string& str, const std::string& start);

View File

@ -61,11 +61,10 @@ namespace snake
_server.setOnClientMessageCallback(
[this](std::shared_ptr<ix::ConnectionState> connectionState,
ix::ConnectionInfo& connectionInfo,
ix::WebSocket& webSocket,
const ix::WebSocketMessagePtr& msg) {
auto state = std::dynamic_pointer_cast<SnakeConnectionState>(connectionState);
auto remoteIp = connectionInfo.remoteIp;
auto remoteIp = connectionState->getRemoteIp();
std::stringstream ss;
ss << "[" << state->getId() << "] ";

View File

@ -1,25 +0,0 @@
/*
* IXConnectionInfo.h
* Author: Benjamin Sergeant
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <string>
namespace ix
{
struct ConnectionInfo
{
std::string remoteIp;
int remotePort;
ConnectionInfo(const std::string& r = std::string(), int p = 0)
: remoteIp(r)
, remotePort(p)
{
;
}
};
} // namespace ix

View File

@ -50,4 +50,24 @@ namespace ix
_onSetTerminatedCallback();
}
}
const std::string& ConnectionState::getRemoteIp()
{
return _remoteIp;
}
int ConnectionState::getRemotePort()
{
return _remotePort;
}
void ConnectionState::setRemoteIp(const std::string& remoteIp)
{
_remoteIp = remoteIp;
}
void ConnectionState::setRemotePort(int remotePort)
{
_remotePort = remotePort;
}
} // namespace ix

View File

@ -28,11 +28,17 @@ namespace ix
void setTerminated();
bool isTerminated() const;
const std::string& getRemoteIp();
int getRemotePort();
static std::shared_ptr<ConnectionState> createConnectionState();
private:
void setOnSetTerminatedCallback(const OnSetTerminatedCallback& callback);
void setRemoteIp(const std::string& remoteIp);
void setRemotePort(int remotePort);
protected:
std::atomic<bool> _terminated;
std::string _id;
@ -40,6 +46,9 @@ namespace ix
static std::atomic<uint64_t> _globalId;
std::string _remoteIp;
int _remotePort;
friend class SocketServer;
};
} // namespace ix

View File

@ -120,8 +120,7 @@ namespace ix
}
void HttpServer::handleConnection(std::unique_ptr<Socket> socket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo)
std::shared_ptr<ConnectionState> connectionState)
{
_connectedClientsCount++;
@ -130,8 +129,7 @@ namespace ix
if (std::get<0>(ret))
{
auto response =
_onConnectionCallback(std::get<2>(ret), connectionState, std::move(connectionInfo));
auto response = _onConnectionCallback(std::get<2>(ret), connectionState);
if (!Http::sendResponse(response, socket))
{
logError("Cannot send response");
@ -151,8 +149,7 @@ namespace ix
{
setOnConnectionCallback(
[this](HttpRequestPtr request,
std::shared_ptr<ConnectionState> /*connectionState*/,
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr {
std::shared_ptr<ConnectionState> connectionState) -> HttpResponsePtr {
std::string uri(request->uri);
if (uri.empty() || uri == "/")
{
@ -184,8 +181,8 @@ namespace ix
// Log request
std::stringstream ss;
ss << connectionInfo->remoteIp << ":" << connectionInfo->remotePort << " "
<< request->method << " " << request->headers["User-Agent"] << " "
ss << connectionState->getRemoteIp() << ":" << connectionState->getRemotePort()
<< " " << request->method << " " << request->headers["User-Agent"] << " "
<< request->uri << " " << content.size();
logInfo(ss.str());
@ -209,16 +206,16 @@ namespace ix
// See https://developer.mozilla.org/en-US/docs/Web/HTTP/Redirections
//
setOnConnectionCallback(
[this, redirectUrl](HttpRequestPtr request,
std::shared_ptr<ConnectionState> /*connectionState*/,
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr {
[this,
redirectUrl](HttpRequestPtr request,
std::shared_ptr<ConnectionState> connectionState) -> HttpResponsePtr {
WebSocketHttpHeaders headers;
headers["Server"] = userAgent();
// Log request
std::stringstream ss;
ss << connectionInfo->remoteIp << ":" << connectionInfo->remotePort << " "
<< request->method << " " << request->headers["User-Agent"] << " "
ss << connectionState->getRemoteIp() << ":" << connectionState->getRemotePort()
<< " " << request->method << " " << request->headers["User-Agent"] << " "
<< request->uri;
logInfo(ss.str());

View File

@ -23,9 +23,7 @@ namespace ix
{
public:
using OnConnectionCallback =
std::function<HttpResponsePtr(HttpRequestPtr,
std::shared_ptr<ConnectionState>,
std::unique_ptr<ConnectionInfo> connectionInfo)>;
std::function<HttpResponsePtr(HttpRequestPtr, std::shared_ptr<ConnectionState>)>;
HttpServer(int port = SocketServer::kDefaultPort,
const std::string& host = SocketServer::kDefaultHost,
@ -46,8 +44,7 @@ namespace ix
// Methods
virtual void handleConnection(std::unique_ptr<Socket>,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) final;
std::shared_ptr<ConnectionState> connectionState) final;
virtual size_t getConnectedClientsCount() final;
void setDefaultConnectionCallback();

View File

@ -332,12 +332,13 @@ namespace ix
}
// Retrieve connection info, the ip address of the remote peer/client)
std::unique_ptr<ConnectionInfo> connectionInfo;
std::string remoteIp;
int remotePort;
if (_addressFamily == AF_INET)
{
char remoteIp[INET_ADDRSTRLEN];
if (inet_ntop(AF_INET, &client.sin_addr, remoteIp, INET_ADDRSTRLEN) == nullptr)
char remoteIp4[INET_ADDRSTRLEN];
if (inet_ntop(AF_INET, &client.sin_addr, remoteIp4, INET_ADDRSTRLEN) == nullptr)
{
int err = Socket::getErrno();
std::stringstream ss;
@ -350,12 +351,13 @@ namespace ix
continue;
}
connectionInfo = std::make_unique<ConnectionInfo>(remoteIp, client.sin_port);
remotePort = client.sin_port;
remoteIp = remoteIp4;
}
else // AF_INET6
{
char remoteIp[INET6_ADDRSTRLEN];
if (inet_ntop(AF_INET6, &client.sin_addr, remoteIp, INET6_ADDRSTRLEN) == nullptr)
char remoteIp6[INET6_ADDRSTRLEN];
if (inet_ntop(AF_INET6, &client.sin_addr, remoteIp6, INET6_ADDRSTRLEN) == nullptr)
{
int err = Socket::getErrno();
std::stringstream ss;
@ -368,7 +370,8 @@ namespace ix
continue;
}
connectionInfo = std::make_unique<ConnectionInfo>(remoteIp, client.sin_port);
remotePort = client.sin_port;
remoteIp = remoteIp6;
}
std::shared_ptr<ConnectionState> connectionState;
@ -377,6 +380,8 @@ namespace ix
connectionState = _connectionStateFactory();
}
connectionState->setOnSetTerminatedCallback([this] { onSetTerminatedCallback(); });
connectionState->setRemoteIp(remoteIp);
connectionState->setRemotePort(remotePort);
if (_stop) return;
@ -404,13 +409,10 @@ namespace ix
// Launch the handleConnection work asynchronously in its own thread.
std::lock_guard<std::mutex> lock(_connectionsThreadsMutex);
_connectionsThreads.push_back(
std::make_pair(connectionState,
std::thread(&SocketServer::handleConnection,
this,
std::move(socket),
_connectionsThreads.push_back(std::make_pair(
connectionState,
std::move(connectionInfo))));
std::thread(
&SocketServer::handleConnection, this, std::move(socket), connectionState)));
}
}

View File

@ -6,7 +6,6 @@
#pragma once
#include "IXConnectionInfo.h"
#include "IXConnectionState.h"
#include "IXSelectInterrupt.h"
#include "IXSocketTLSOptions.h"
@ -105,8 +104,7 @@ namespace ix
ConnectionStateFactory _connectionStateFactory;
virtual void handleConnection(std::unique_ptr<Socket>,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) = 0;
std::shared_ptr<ConnectionState> connectionState) = 0;
virtual size_t getConnectedClientsCount() = 0;
// Returns true if all connection threads are joined

View File

@ -56,10 +56,9 @@ namespace ix
server.setOnConnectionCallback(
[remoteUrl, remoteUrlsMapping](std::weak_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo) {
std::shared_ptr<ConnectionState> connectionState) {
auto state = std::dynamic_pointer_cast<ProxyConnectionState>(connectionState);
auto remoteIp = connectionInfo->remoteIp;
auto remoteIp = connectionState->getRemoteIp();
// Server connection
state->webSocket().setOnMessageCallback(

View File

@ -77,15 +77,14 @@ namespace ix
}
void WebSocketServer::handleConnection(std::unique_ptr<Socket> socket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo)
std::shared_ptr<ConnectionState> connectionState)
{
setThreadName("WebSocketServer::" + connectionState->getId());
auto webSocket = std::make_shared<WebSocket>();
if (_onConnectionCallback)
{
_onConnectionCallback(webSocket, connectionState, std::move(connectionInfo));
_onConnectionCallback(webSocket, connectionState);
if (!webSocket->isOnMessageCallbackRegistered())
{
@ -99,9 +98,8 @@ namespace ix
else if (_onClientMessageCallback)
{
webSocket->setOnMessageCallback(
[this, &ws = *webSocket.get(), connectionState, &ci = *connectionInfo.get()](
const WebSocketMessagePtr& msg) {
_onClientMessageCallback(connectionState, ci, ws, msg);
[this, &ws = *webSocket.get(), connectionState](const WebSocketMessagePtr& msg) {
_onClientMessageCallback(connectionState, ws, msg);
});
}
else

View File

@ -23,14 +23,10 @@ namespace ix
{
public:
using OnConnectionCallback =
std::function<void(std::weak_ptr<WebSocket>,
std::shared_ptr<ConnectionState>,
std::unique_ptr<ConnectionInfo> connectionInfo)>;
std::function<void(std::weak_ptr<WebSocket>, std::shared_ptr<ConnectionState>)>;
using OnClientMessageCallback = std::function<void(std::shared_ptr<ConnectionState>,
ConnectionInfo&,
WebSocket&,
const WebSocketMessagePtr&)>;
using OnClientMessageCallback = std::function<void(
std::shared_ptr<ConnectionState>, WebSocket&, const WebSocketMessagePtr&)>;
WebSocketServer(int port = SocketServer::kDefaultPort,
const std::string& host = SocketServer::kDefaultHost,
@ -69,8 +65,7 @@ namespace ix
// Methods
virtual void handleConnection(std::unique_ptr<Socket> socket,
std::shared_ptr<ConnectionState> connectionState,
std::unique_ptr<ConnectionInfo> connectionInfo);
std::shared_ptr<ConnectionState> connectionState);
virtual size_t getConnectedClientsCount() final;
};
} // namespace ix

View File

@ -6,4 +6,4 @@
#pragma once
#define IX_WEBSOCKET_VERSION "10.3.0"
#define IX_WEBSOCKET_VERSION "10.3.1"

View File

@ -95,15 +95,14 @@ TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]")
sentryServer.setOnConnectionCallback(
[](HttpRequestPtr request,
std::shared_ptr<ConnectionState> /*connectionState*/,
std::unique_ptr<ConnectionInfo> connectionInfo) -> HttpResponsePtr {
std::shared_ptr<ConnectionState> connectionState) -> HttpResponsePtr {
WebSocketHttpHeaders headers;
headers["Server"] = userAgent();
// Log request
std::stringstream ss;
ss << connectionInfo->remoteIp << ":" << connectionInfo->remotePort << " "
<< request->method << " " << request->headers["User-Agent"] << " "
ss << connectionState->getRemoteIp() << ":" << connectionState->getRemotePort()
<< " " << request->method << " " << request->headers["User-Agent"] << " "
<< request->uri;
if (request->method == "POST")

View File

@ -85,11 +85,10 @@ namespace ix
bool startWebSocketEchoServer(ix::WebSocketServer& server)
{
server.setOnClientMessageCallback(
[&server](std::shared_ptr<ConnectionState> /*connectionState*/,
ConnectionInfo& connectionInfo,
[&server](std::shared_ptr<ConnectionState> connectionState,
WebSocket& webSocket,
const ix::WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
auto remoteIp = connectionState->getRemoteIp();
if (msg->type == ix::WebSocketMessageType::Open)
{
TLogger() << "New connection";

View File

@ -191,11 +191,9 @@ namespace
server.setOnClientMessageCallback(
[&server, &connectionId](std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const ix::WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
auto remoteIp = connectionState->getRemoteIp();
if (msg->type == ix::WebSocketMessageType::Open)
{

View File

@ -195,10 +195,9 @@ namespace
{
server.setOnClientMessageCallback(
[&server](std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const ix::WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
auto remoteIp = connectionState->getRemoteIp();
if (msg->type == ix::WebSocketMessageType::Open)
{
TLogger() << "New connection";

View File

@ -171,10 +171,9 @@ namespace
server.setOnClientMessageCallback(
[&receivedCloseCode, &receivedCloseReason, &receivedCloseRemote, &mutexWrite](
std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& /*webSocket*/,
const ix::WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
auto remoteIp = connectionState->getRemoteIp();
if (msg->type == ix::WebSocketMessageType::Open)
{
TLogger() << "New server connection";

View File

@ -35,11 +35,9 @@ namespace ix
server.setOnClientMessageCallback(
[&server, &connectionId](std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const ix::WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
auto remoteIp = connectionState->getRemoteIp();
if (msg->type == ix::WebSocketMessageType::Open)
{

View File

@ -18,10 +18,9 @@ bool startServer(ix::WebSocketServer& server, std::string& subProtocols)
{
server.setOnClientMessageCallback(
[&server, &subProtocols](std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const ix::WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
auto remoteIp = connectionState->getRemoteIp();
if (msg->type == ix::WebSocketMessageType::Open)
{
TLogger() << "New connection";

View File

@ -412,10 +412,9 @@ namespace ix
server.setOnClientMessageCallback(
[&server](std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
auto remoteIp = connectionState->getRemoteIp();
if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("New connection");
@ -1240,10 +1239,9 @@ namespace ix
server.setOnClientMessageCallback(
[greetings](std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
auto remoteIp = connectionState->getRemoteIp();
if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("New connection");
@ -1674,10 +1672,9 @@ namespace ix
server.setOnClientMessageCallback(
[greetings, &sendMsg](std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
auto remoteIp = connectionState->getRemoteIp();
if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("New connection");
@ -2613,10 +2610,9 @@ namespace ix
server.setOnClientMessageCallback(
[&server](std::shared_ptr<ConnectionState> connectionState,
ConnectionInfo& connectionInfo,
WebSocket& webSocket,
const WebSocketMessagePtr& msg) {
auto remoteIp = connectionInfo.remoteIp;
auto remoteIp = connectionState->getRemoteIp();
if (msg->type == ix::WebSocketMessageType::Open)
{
spdlog::info("ws_transfer: New connection");