make a class hierarchy for server code (IXWebSocketServer <- IXSocketServer)

This commit is contained in:
Benjamin Sergeant 2019-01-06 12:01:33 -08:00
parent a5026849a3
commit 57c22cddb8
6 changed files with 296 additions and 216 deletions

View File

@ -17,6 +17,7 @@ endif()
set( IXWEBSOCKET_SOURCES set( IXWEBSOCKET_SOURCES
ixwebsocket/IXEventFd.cpp ixwebsocket/IXEventFd.cpp
ixwebsocket/IXSocket.cpp ixwebsocket/IXSocket.cpp
ixwebsocket/IXSocketServer.cpp
ixwebsocket/IXSocketConnect.cpp ixwebsocket/IXSocketConnect.cpp
ixwebsocket/IXDNSLookup.cpp ixwebsocket/IXDNSLookup.cpp
ixwebsocket/IXCancellationRequest.cpp ixwebsocket/IXCancellationRequest.cpp
@ -32,6 +33,7 @@ set( IXWEBSOCKET_SOURCES
set( IXWEBSOCKET_HEADERS set( IXWEBSOCKET_HEADERS
ixwebsocket/IXEventFd.h ixwebsocket/IXEventFd.h
ixwebsocket/IXSocket.h ixwebsocket/IXSocket.h
ixwebsocket/IXSocketServer.h
ixwebsocket/IXSocketConnect.h ixwebsocket/IXSocketConnect.h
ixwebsocket/IXSetThreadName.h ixwebsocket/IXSetThreadName.h
ixwebsocket/IXDNSLookup.h ixwebsocket/IXDNSLookup.h

View File

@ -237,7 +237,7 @@ namespace ix
status = SSLRead(_sslContext, buf, nbyte, &processed); status = SSLRead(_sslContext, buf, nbyte, &processed);
if (processed > 0) if (processed > 0)
return (int) processed; return (ssize_t) processed;
// The connection was reset, inform the caller that this // The connection was reset, inform the caller that this
// Socket should close // Socket should close

View File

@ -0,0 +1,212 @@
/*
* IXSocketServer.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include "IXSocketServer.h"
#include "IXSocket.h"
#include "IXSocketConnect.h"
#include "IXNetSystem.h"
#include <iostream>
#include <sstream>
#include <future>
#include <string.h>
namespace ix
{
const int SocketServer::kDefaultPort(8080);
const std::string SocketServer::kDefaultHost("127.0.0.1");
const int SocketServer::kDefaultTcpBacklog(5);
const size_t SocketServer::kDefaultMaxConnections(32);
SocketServer::SocketServer(int port,
const std::string& host,
int backlog,
size_t maxConnections) :
_port(port),
_host(host),
_backlog(backlog),
_maxConnections(maxConnections),
_stop(false)
{
}
SocketServer::~SocketServer()
{
stop();
}
void SocketServer::logError(const std::string& str)
{
std::lock_guard<std::mutex> lock(_logMutex);
std::cerr << str << std::endl;
}
void SocketServer::logInfo(const std::string& str)
{
std::lock_guard<std::mutex> lock(_logMutex);
std::cout << str << std::endl;
}
std::pair<bool, std::string> SocketServer::listen()
{
struct sockaddr_in server; // server address information
// Get a socket for accepting connections.
if ((_serverFd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
std::stringstream ss;
ss << "SocketServer::listen() error creating socket): "
<< strerror(Socket::getErrno());
return std::make_pair(false, ss.str());
}
// Make that socket reusable. (allow restarting this server at will)
int enable = 1;
if (setsockopt(_serverFd, SOL_SOCKET, SO_REUSEADDR,
(char*) &enable, sizeof(enable)) < 0)
{
std::stringstream ss;
ss << "SocketServer::listen() error calling setsockopt(SO_REUSEADDR): "
<< strerror(errno);
return std::make_pair(false, ss.str());
}
// Bind the socket to the server address.
server.sin_family = AF_INET;
server.sin_port = htons(_port);
// Using INADDR_ANY trigger a pop-up box as binding to any address is detected
// by the osx firewall. We need to codesign the binary with a self-signed cert
// to allow that, but this is a bit of a pain. (this is what node or python would do).
//
// Using INADDR_LOOPBACK also does not work ... while it should.
// We default to 127.0.0.1 (localhost)
//
server.sin_addr.s_addr = inet_addr(_host.c_str());
if (bind(_serverFd, (struct sockaddr *)&server, sizeof(server)) < 0)
{
std::stringstream ss;
ss << "SocketServer::listen() error calling bind: "
<< strerror(Socket::getErrno());
return std::make_pair(false, ss.str());
}
/*
* Listen for connections. Specify the tcp backlog.
*/
if (::listen(_serverFd, _backlog) != 0)
{
std::stringstream ss;
ss << "SocketServer::listen() error calling listen: "
<< strerror(Socket::getErrno());
return std::make_pair(false, ss.str());
}
return std::make_pair(true, "");
}
void SocketServer::start()
{
if (_thread.joinable()) return; // we've already been started
_thread = std::thread(&SocketServer::run, this);
}
void SocketServer::wait()
{
std::unique_lock<std::mutex> lock(_conditionVariableMutex);
_conditionVariable.wait(lock);
}
void SocketServer::stop()
{
if (!_thread.joinable()) return; // nothing to do
_stop = true;
_thread.join();
_stop = false;
_conditionVariable.notify_one();
}
void SocketServer::run()
{
// Set the socket to non blocking mode, so that accept calls are not blocking
SocketConnect::configure(_serverFd);
// Return value of std::async, ignored
std::future<void> f;
// Select arguments
fd_set rfds;
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 10 * 1000; // 10ms
for (;;)
{
if (_stop) return;
FD_ZERO(&rfds);
FD_SET(_serverFd, &rfds);
select(_serverFd + 1, &rfds, nullptr, nullptr, &timeout);
if (!FD_ISSET(_serverFd, &rfds))
{
// We reached the select timeout, and no new connections are pending
continue;
}
// Accept a connection.
struct sockaddr_in client; // client address information
int clientFd; // socket connected to client
socklen_t addressLen = sizeof(socklen_t);
memset(&client, 0, sizeof(client));
if ((clientFd = accept(_serverFd, (struct sockaddr *)&client, &addressLen)) < 0)
{
if (Socket::getErrno() != EWOULDBLOCK)
{
// FIXME: that error should be propagated
std::stringstream ss;
ss << "SocketServer::run() error accepting connection: "
<< strerror(Socket::getErrno());
logError(ss.str());
}
continue;
}
if (getConnectedClientsCount() >= _maxConnections)
{
std::stringstream ss;
ss << "SocketServer::run() reached max connections = "
<< _maxConnections << ". "
<< "Not accepting connection";
logError(ss.str());
::close(clientFd);
continue;
}
// Launch the handleConnection work asynchronously in its own thread.
//
// the destructor of a future returned by std::async blocks,
// so we need to declare it outside of this loop
f = std::async(std::launch::async,
&SocketServer::handleConnection,
this,
clientFd);
}
}
}

View File

@ -0,0 +1,67 @@
/*
* IXSocketServer.h
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <utility> // pair
#include <string>
#include <set>
#include <thread>
#include <mutex>
#include <functional>
#include <memory>
#include <condition_variable>
namespace ix
{
class SocketServer {
public:
SocketServer(int port = SocketServer::kDefaultPort,
const std::string& host = SocketServer::kDefaultHost,
int backlog = SocketServer::kDefaultTcpBacklog,
size_t maxConnections = SocketServer::kDefaultMaxConnections);
virtual ~SocketServer();
virtual void stop();
const static int kDefaultPort;
const static std::string kDefaultHost;
const static int kDefaultTcpBacklog;
const static size_t kDefaultMaxConnections;
void start();
std::pair<bool, std::string> listen();
void wait();
protected:
// Logging
void logError(const std::string& str);
void logInfo(const std::string& str);
private:
// Member variables
int _port;
std::string _host;
int _backlog;
size_t _maxConnections;
// socket for accepting connections
int _serverFd;
std::mutex _logMutex;
std::atomic<bool> _stop;
std::thread _thread;
std::condition_variable _conditionVariable;
std::mutex _conditionVariableMutex;
// Methods
void run();
virtual void handleConnection(int fd) = 0;
virtual size_t getConnectedClientsCount() = 0;
};
}

View File

@ -16,23 +16,14 @@
namespace ix namespace ix
{ {
const int WebSocketServer::kDefaultPort(8080);
const std::string WebSocketServer::kDefaultHost("127.0.0.1");
const int WebSocketServer::kDefaultTcpBacklog(5);
const size_t WebSocketServer::kDefaultMaxConnections(32);
const int WebSocketServer::kDefaultHandShakeTimeoutSecs(3); // 3 seconds const int WebSocketServer::kDefaultHandShakeTimeoutSecs(3); // 3 seconds
WebSocketServer::WebSocketServer(int port, WebSocketServer::WebSocketServer(int port,
const std::string& host, const std::string& host,
int backlog, int backlog,
size_t maxConnections, size_t maxConnections,
int handshakeTimeoutSecs) : int handshakeTimeoutSecs) : SocketServer(port, host, backlog, maxConnections),
_port(port), _handshakeTimeoutSecs(handshakeTimeoutSecs)
_host(host),
_backlog(backlog),
_maxConnections(maxConnections),
_handshakeTimeoutSecs(handshakeTimeoutSecs),
_stop(false)
{ {
} }
@ -42,185 +33,20 @@ namespace ix
stop(); stop();
} }
void WebSocketServer::setOnConnectionCallback(const OnConnectionCallback& callback)
{
_onConnectionCallback = callback;
}
void WebSocketServer::logError(const std::string& str)
{
std::lock_guard<std::mutex> lock(_logMutex);
std::cerr << str << std::endl;
}
void WebSocketServer::logInfo(const std::string& str)
{
std::lock_guard<std::mutex> lock(_logMutex);
std::cout << str << std::endl;
}
std::pair<bool, std::string> WebSocketServer::listen()
{
struct sockaddr_in server; // server address information
// Get a socket for accepting connections.
if ((_serverFd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
std::stringstream ss;
ss << "WebSocketServer::listen() error creating socket): "
<< strerror(Socket::getErrno());
return std::make_pair(false, ss.str());
}
// Make that socket reusable. (allow restarting this server at will)
int enable = 1;
if (setsockopt(_serverFd, SOL_SOCKET, SO_REUSEADDR,
(char*) &enable, sizeof(enable)) < 0)
{
std::stringstream ss;
ss << "WebSocketServer::listen() error calling setsockopt(SO_REUSEADDR): "
<< strerror(errno);
return std::make_pair(false, ss.str());
}
// Bind the socket to the server address.
server.sin_family = AF_INET;
server.sin_port = htons(_port);
// Using INADDR_ANY trigger a pop-up box as binding to any address is detected
// by the osx firewall. We need to codesign the binary with a self-signed cert
// to allow that, but this is a bit of a pain. (this is what node or python would do).
//
// Using INADDR_LOOPBACK also does not work ... while it should.
// We default to 127.0.0.1 (localhost)
//
server.sin_addr.s_addr = inet_addr(_host.c_str());
if (bind(_serverFd, (struct sockaddr *)&server, sizeof(server)) < 0)
{
std::stringstream ss;
ss << "WebSocketServer::listen() error calling bind: "
<< strerror(Socket::getErrno());
return std::make_pair(false, ss.str());
}
/*
* Listen for connections. Specify the tcp backlog.
*/
if (::listen(_serverFd, _backlog) != 0)
{
std::stringstream ss;
ss << "WebSocketServer::listen() error calling listen: "
<< strerror(Socket::getErrno());
return std::make_pair(false, ss.str());
}
return std::make_pair(true, "");
}
void WebSocketServer::start()
{
if (_thread.joinable()) return; // we've already been started
_thread = std::thread(&WebSocketServer::run, this);
}
void WebSocketServer::wait()
{
std::unique_lock<std::mutex> lock(_conditionVariableMutex);
_conditionVariable.wait(lock);
}
void WebSocketServer::stop() void WebSocketServer::stop()
{ {
if (!_thread.joinable()) return; // nothing to do
auto clients = getClients(); auto clients = getClients();
for (auto client : clients) for (auto client : clients)
{ {
client->close(); client->close();
} }
_stop = true; SocketServer::stop();
_thread.join();
_stop = false;
_conditionVariable.notify_one();
} }
void WebSocketServer::run() void WebSocketServer::setOnConnectionCallback(const OnConnectionCallback& callback)
{ {
// Set the socket to non blocking mode, so that accept calls are not blocking _onConnectionCallback = callback;
SocketConnect::configure(_serverFd);
// Return value of std::async, ignored
std::future<void> f;
// Select arguments
fd_set rfds;
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 10 * 1000; // 10ms
for (;;)
{
if (_stop) return;
FD_ZERO(&rfds);
FD_SET(_serverFd, &rfds);
select(_serverFd + 1, &rfds, nullptr, nullptr, &timeout);
if (!FD_ISSET(_serverFd, &rfds))
{
// We reached the select timeout, and no new connections are pending
continue;
}
// Accept a connection.
struct sockaddr_in client; // client address information
int clientFd; // socket connected to client
socklen_t addressLen = sizeof(socklen_t);
memset(&client, 0, sizeof(client));
if ((clientFd = accept(_serverFd, (struct sockaddr *)&client, &addressLen)) < 0)
{
if (Socket::getErrno() != EWOULDBLOCK)
{
// FIXME: that error should be propagated
std::stringstream ss;
ss << "WebSocketServer::run() error accepting connection: "
<< strerror(Socket::getErrno());
logError(ss.str());
}
continue;
}
if (getConnectedClientsCount() >= _maxConnections)
{
std::stringstream ss;
ss << "WebSocketServer::run() reached max connections = "
<< _maxConnections << ". "
<< "Not accepting connection";
logError(ss.str());
::close(clientFd);
continue;
}
// Launch the handleConnection work asynchronously in its own thread.
//
// the destructor of a future returned by std::async blocks,
// so we need to declare it outside of this loop
f = std::async(std::launch::async,
&WebSocketServer::handleConnection,
this,
clientFd);
}
} }
void WebSocketServer::handleConnection(int fd) void WebSocketServer::handleConnection(int fd)

View File

@ -16,67 +16,40 @@
#include <condition_variable> #include <condition_variable>
#include "IXWebSocket.h" #include "IXWebSocket.h"
#include "IXSocketServer.h"
namespace ix namespace ix
{ {
using OnConnectionCallback = std::function<void(std::shared_ptr<WebSocket>)>; using OnConnectionCallback = std::function<void(std::shared_ptr<WebSocket>)>;
class WebSocketServer { class WebSocketServer : public SocketServer {
public: public:
WebSocketServer(int port = WebSocketServer::kDefaultPort, WebSocketServer(int port = SocketServer::kDefaultPort,
const std::string& host = WebSocketServer::kDefaultHost, const std::string& host = SocketServer::kDefaultHost,
int backlog = WebSocketServer::kDefaultTcpBacklog, int backlog = SocketServer::kDefaultTcpBacklog,
size_t maxConnections = WebSocketServer::kDefaultMaxConnections, size_t maxConnections = SocketServer::kDefaultMaxConnections,
int handshakeTimeoutSecs = WebSocketServer::kDefaultHandShakeTimeoutSecs); int handshakeTimeoutSecs = WebSocketServer::kDefaultHandShakeTimeoutSecs);
virtual ~WebSocketServer(); virtual ~WebSocketServer();
virtual void stop() final;
void setOnConnectionCallback(const OnConnectionCallback& callback); void setOnConnectionCallback(const OnConnectionCallback& callback);
void start();
void wait();
void stop();
std::pair<bool, std::string> listen();
// Get all the connected clients // Get all the connected clients
std::set<std::shared_ptr<WebSocket>> getClients(); std::set<std::shared_ptr<WebSocket>> getClients();
private: private:
// Member variables // Member variables
int _port;
std::string _host;
int _backlog;
size_t _maxConnections;
int _handshakeTimeoutSecs; int _handshakeTimeoutSecs;
OnConnectionCallback _onConnectionCallback; OnConnectionCallback _onConnectionCallback;
// socket for accepting connections
int _serverFd;
std::mutex _clientsMutex; std::mutex _clientsMutex;
std::set<std::shared_ptr<WebSocket>> _clients; std::set<std::shared_ptr<WebSocket>> _clients;
std::mutex _logMutex;
std::atomic<bool> _stop;
std::thread _thread;
std::condition_variable _conditionVariable;
std::mutex _conditionVariableMutex;
const static int kDefaultPort;
const static std::string kDefaultHost;
const static int kDefaultTcpBacklog;
const static size_t kDefaultMaxConnections;
const static int kDefaultHandShakeTimeoutSecs; const static int kDefaultHandShakeTimeoutSecs;
// Methods // Methods
void run(); virtual void handleConnection(int fd) final;
void handleConnection(int fd); virtual size_t getConnectedClientsCount() final;
size_t getConnectedClientsCount();
// Logging
void logError(const std::string& str);
void logInfo(const std::string& str);
}; };
} }