diff --git a/CMakeLists.txt b/CMakeLists.txt index e43f1987..81419813 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -17,6 +17,7 @@ endif() set( IXWEBSOCKET_SOURCES ixwebsocket/IXEventFd.cpp ixwebsocket/IXSocket.cpp + ixwebsocket/IXSocketServer.cpp ixwebsocket/IXSocketConnect.cpp ixwebsocket/IXDNSLookup.cpp ixwebsocket/IXCancellationRequest.cpp @@ -32,6 +33,7 @@ set( IXWEBSOCKET_SOURCES set( IXWEBSOCKET_HEADERS ixwebsocket/IXEventFd.h ixwebsocket/IXSocket.h + ixwebsocket/IXSocketServer.h ixwebsocket/IXSocketConnect.h ixwebsocket/IXSetThreadName.h ixwebsocket/IXDNSLookup.h diff --git a/ixwebsocket/IXSocketAppleSSL.cpp b/ixwebsocket/IXSocketAppleSSL.cpp index 01c4ce2c..7cd2faff 100644 --- a/ixwebsocket/IXSocketAppleSSL.cpp +++ b/ixwebsocket/IXSocketAppleSSL.cpp @@ -237,7 +237,7 @@ namespace ix status = SSLRead(_sslContext, buf, nbyte, &processed); if (processed > 0) - return (int) processed; + return (ssize_t) processed; // The connection was reset, inform the caller that this // Socket should close diff --git a/ixwebsocket/IXSocketServer.cpp b/ixwebsocket/IXSocketServer.cpp new file mode 100644 index 00000000..317b3d8a --- /dev/null +++ b/ixwebsocket/IXSocketServer.cpp @@ -0,0 +1,212 @@ +/* + * IXSocketServer.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2018 Machine Zone, Inc. All rights reserved. + */ + +#include "IXSocketServer.h" +#include "IXSocket.h" +#include "IXSocketConnect.h" +#include "IXNetSystem.h" + +#include +#include +#include +#include + +namespace ix +{ + const int SocketServer::kDefaultPort(8080); + const std::string SocketServer::kDefaultHost("127.0.0.1"); + const int SocketServer::kDefaultTcpBacklog(5); + const size_t SocketServer::kDefaultMaxConnections(32); + + SocketServer::SocketServer(int port, + const std::string& host, + int backlog, + size_t maxConnections) : + _port(port), + _host(host), + _backlog(backlog), + _maxConnections(maxConnections), + _stop(false) + { + + } + + SocketServer::~SocketServer() + { + stop(); + } + + void SocketServer::logError(const std::string& str) + { + std::lock_guard lock(_logMutex); + std::cerr << str << std::endl; + } + + void SocketServer::logInfo(const std::string& str) + { + std::lock_guard lock(_logMutex); + std::cout << str << std::endl; + } + + std::pair SocketServer::listen() + { + struct sockaddr_in server; // server address information + + // Get a socket for accepting connections. + if ((_serverFd = socket(AF_INET, SOCK_STREAM, 0)) < 0) + { + std::stringstream ss; + ss << "SocketServer::listen() error creating socket): " + << strerror(Socket::getErrno()); + + return std::make_pair(false, ss.str()); + } + + // Make that socket reusable. (allow restarting this server at will) + int enable = 1; + if (setsockopt(_serverFd, SOL_SOCKET, SO_REUSEADDR, + (char*) &enable, sizeof(enable)) < 0) + { + std::stringstream ss; + ss << "SocketServer::listen() error calling setsockopt(SO_REUSEADDR): " + << strerror(errno); + + return std::make_pair(false, ss.str()); + } + + // Bind the socket to the server address. + server.sin_family = AF_INET; + server.sin_port = htons(_port); + + // Using INADDR_ANY trigger a pop-up box as binding to any address is detected + // by the osx firewall. We need to codesign the binary with a self-signed cert + // to allow that, but this is a bit of a pain. (this is what node or python would do). + // + // Using INADDR_LOOPBACK also does not work ... while it should. + // We default to 127.0.0.1 (localhost) + // + server.sin_addr.s_addr = inet_addr(_host.c_str()); + + if (bind(_serverFd, (struct sockaddr *)&server, sizeof(server)) < 0) + { + std::stringstream ss; + ss << "SocketServer::listen() error calling bind: " + << strerror(Socket::getErrno()); + + return std::make_pair(false, ss.str()); + } + + /* + * Listen for connections. Specify the tcp backlog. + */ + if (::listen(_serverFd, _backlog) != 0) + { + std::stringstream ss; + ss << "SocketServer::listen() error calling listen: " + << strerror(Socket::getErrno()); + + return std::make_pair(false, ss.str()); + } + + return std::make_pair(true, ""); + } + + void SocketServer::start() + { + if (_thread.joinable()) return; // we've already been started + + _thread = std::thread(&SocketServer::run, this); + } + + void SocketServer::wait() + { + std::unique_lock lock(_conditionVariableMutex); + _conditionVariable.wait(lock); + } + + void SocketServer::stop() + { + if (!_thread.joinable()) return; // nothing to do + + _stop = true; + _thread.join(); + _stop = false; + + _conditionVariable.notify_one(); + } + + void SocketServer::run() + { + // Set the socket to non blocking mode, so that accept calls are not blocking + SocketConnect::configure(_serverFd); + + // Return value of std::async, ignored + std::future f; + + // Select arguments + fd_set rfds; + struct timeval timeout; + timeout.tv_sec = 0; + timeout.tv_usec = 10 * 1000; // 10ms + + for (;;) + { + if (_stop) return; + + FD_ZERO(&rfds); + FD_SET(_serverFd, &rfds); + select(_serverFd + 1, &rfds, nullptr, nullptr, &timeout); + + if (!FD_ISSET(_serverFd, &rfds)) + { + // We reached the select timeout, and no new connections are pending + continue; + } + + // Accept a connection. + struct sockaddr_in client; // client address information + int clientFd; // socket connected to client + socklen_t addressLen = sizeof(socklen_t); + memset(&client, 0, sizeof(client)); + + if ((clientFd = accept(_serverFd, (struct sockaddr *)&client, &addressLen)) < 0) + { + if (Socket::getErrno() != EWOULDBLOCK) + { + // FIXME: that error should be propagated + std::stringstream ss; + ss << "SocketServer::run() error accepting connection: " + << strerror(Socket::getErrno()); + logError(ss.str()); + } + continue; + } + + if (getConnectedClientsCount() >= _maxConnections) + { + std::stringstream ss; + ss << "SocketServer::run() reached max connections = " + << _maxConnections << ". " + << "Not accepting connection"; + logError(ss.str()); + + ::close(clientFd); + + continue; + } + + // Launch the handleConnection work asynchronously in its own thread. + // + // the destructor of a future returned by std::async blocks, + // so we need to declare it outside of this loop + f = std::async(std::launch::async, + &SocketServer::handleConnection, + this, + clientFd); + } + } +} + diff --git a/ixwebsocket/IXSocketServer.h b/ixwebsocket/IXSocketServer.h new file mode 100644 index 00000000..c8e2ba01 --- /dev/null +++ b/ixwebsocket/IXSocketServer.h @@ -0,0 +1,67 @@ +/* + * IXSocketServer.h + * Author: Benjamin Sergeant + * Copyright (c) 2018 Machine Zone, Inc. All rights reserved. + */ + +#pragma once + +#include // pair +#include +#include +#include +#include +#include +#include +#include + +namespace ix +{ + class SocketServer { + public: + SocketServer(int port = SocketServer::kDefaultPort, + const std::string& host = SocketServer::kDefaultHost, + int backlog = SocketServer::kDefaultTcpBacklog, + size_t maxConnections = SocketServer::kDefaultMaxConnections); + virtual ~SocketServer(); + virtual void stop(); + + const static int kDefaultPort; + const static std::string kDefaultHost; + const static int kDefaultTcpBacklog; + const static size_t kDefaultMaxConnections; + + void start(); + std::pair listen(); + void wait(); + + protected: + + // Logging + void logError(const std::string& str); + void logInfo(const std::string& str); + + private: + // Member variables + int _port; + std::string _host; + int _backlog; + size_t _maxConnections; + + // socket for accepting connections + int _serverFd; + + std::mutex _logMutex; + + std::atomic _stop; + std::thread _thread; + + std::condition_variable _conditionVariable; + std::mutex _conditionVariableMutex; + + // Methods + void run(); + virtual void handleConnection(int fd) = 0; + virtual size_t getConnectedClientsCount() = 0; + }; +} diff --git a/ixwebsocket/IXWebSocketServer.cpp b/ixwebsocket/IXWebSocketServer.cpp index f6da46e8..a6c07490 100644 --- a/ixwebsocket/IXWebSocketServer.cpp +++ b/ixwebsocket/IXWebSocketServer.cpp @@ -16,23 +16,14 @@ namespace ix { - const int WebSocketServer::kDefaultPort(8080); - const std::string WebSocketServer::kDefaultHost("127.0.0.1"); - const int WebSocketServer::kDefaultTcpBacklog(5); - const size_t WebSocketServer::kDefaultMaxConnections(32); const int WebSocketServer::kDefaultHandShakeTimeoutSecs(3); // 3 seconds WebSocketServer::WebSocketServer(int port, const std::string& host, int backlog, size_t maxConnections, - int handshakeTimeoutSecs) : - _port(port), - _host(host), - _backlog(backlog), - _maxConnections(maxConnections), - _handshakeTimeoutSecs(handshakeTimeoutSecs), - _stop(false) + int handshakeTimeoutSecs) : SocketServer(port, host, backlog, maxConnections), + _handshakeTimeoutSecs(handshakeTimeoutSecs) { } @@ -42,185 +33,20 @@ namespace ix stop(); } - void WebSocketServer::setOnConnectionCallback(const OnConnectionCallback& callback) - { - _onConnectionCallback = callback; - } - - void WebSocketServer::logError(const std::string& str) - { - std::lock_guard lock(_logMutex); - std::cerr << str << std::endl; - } - - void WebSocketServer::logInfo(const std::string& str) - { - std::lock_guard lock(_logMutex); - std::cout << str << std::endl; - } - - std::pair WebSocketServer::listen() - { - struct sockaddr_in server; // server address information - - // Get a socket for accepting connections. - if ((_serverFd = socket(AF_INET, SOCK_STREAM, 0)) < 0) - { - std::stringstream ss; - ss << "WebSocketServer::listen() error creating socket): " - << strerror(Socket::getErrno()); - - return std::make_pair(false, ss.str()); - } - - // Make that socket reusable. (allow restarting this server at will) - int enable = 1; - if (setsockopt(_serverFd, SOL_SOCKET, SO_REUSEADDR, - (char*) &enable, sizeof(enable)) < 0) - { - std::stringstream ss; - ss << "WebSocketServer::listen() error calling setsockopt(SO_REUSEADDR): " - << strerror(errno); - - return std::make_pair(false, ss.str()); - } - - // Bind the socket to the server address. - server.sin_family = AF_INET; - server.sin_port = htons(_port); - - // Using INADDR_ANY trigger a pop-up box as binding to any address is detected - // by the osx firewall. We need to codesign the binary with a self-signed cert - // to allow that, but this is a bit of a pain. (this is what node or python would do). - // - // Using INADDR_LOOPBACK also does not work ... while it should. - // We default to 127.0.0.1 (localhost) - // - server.sin_addr.s_addr = inet_addr(_host.c_str()); - - if (bind(_serverFd, (struct sockaddr *)&server, sizeof(server)) < 0) - { - std::stringstream ss; - ss << "WebSocketServer::listen() error calling bind: " - << strerror(Socket::getErrno()); - - return std::make_pair(false, ss.str()); - } - - /* - * Listen for connections. Specify the tcp backlog. - */ - if (::listen(_serverFd, _backlog) != 0) - { - std::stringstream ss; - ss << "WebSocketServer::listen() error calling listen: " - << strerror(Socket::getErrno()); - - return std::make_pair(false, ss.str()); - } - - return std::make_pair(true, ""); - } - - void WebSocketServer::start() - { - if (_thread.joinable()) return; // we've already been started - - _thread = std::thread(&WebSocketServer::run, this); - } - - void WebSocketServer::wait() - { - std::unique_lock lock(_conditionVariableMutex); - _conditionVariable.wait(lock); - } - void WebSocketServer::stop() { - if (!_thread.joinable()) return; // nothing to do - auto clients = getClients(); for (auto client : clients) { client->close(); } - _stop = true; - _thread.join(); - _stop = false; - - _conditionVariable.notify_one(); + SocketServer::stop(); } - void WebSocketServer::run() + void WebSocketServer::setOnConnectionCallback(const OnConnectionCallback& callback) { - // Set the socket to non blocking mode, so that accept calls are not blocking - SocketConnect::configure(_serverFd); - - // Return value of std::async, ignored - std::future f; - - // Select arguments - fd_set rfds; - struct timeval timeout; - timeout.tv_sec = 0; - timeout.tv_usec = 10 * 1000; // 10ms - - for (;;) - { - if (_stop) return; - - FD_ZERO(&rfds); - FD_SET(_serverFd, &rfds); - select(_serverFd + 1, &rfds, nullptr, nullptr, &timeout); - - if (!FD_ISSET(_serverFd, &rfds)) - { - // We reached the select timeout, and no new connections are pending - continue; - } - - // Accept a connection. - struct sockaddr_in client; // client address information - int clientFd; // socket connected to client - socklen_t addressLen = sizeof(socklen_t); - memset(&client, 0, sizeof(client)); - - if ((clientFd = accept(_serverFd, (struct sockaddr *)&client, &addressLen)) < 0) - { - if (Socket::getErrno() != EWOULDBLOCK) - { - // FIXME: that error should be propagated - std::stringstream ss; - ss << "WebSocketServer::run() error accepting connection: " - << strerror(Socket::getErrno()); - logError(ss.str()); - } - continue; - } - - if (getConnectedClientsCount() >= _maxConnections) - { - std::stringstream ss; - ss << "WebSocketServer::run() reached max connections = " - << _maxConnections << ". " - << "Not accepting connection"; - logError(ss.str()); - - ::close(clientFd); - - continue; - } - - // Launch the handleConnection work asynchronously in its own thread. - // - // the destructor of a future returned by std::async blocks, - // so we need to declare it outside of this loop - f = std::async(std::launch::async, - &WebSocketServer::handleConnection, - this, - clientFd); - } + _onConnectionCallback = callback; } void WebSocketServer::handleConnection(int fd) diff --git a/ixwebsocket/IXWebSocketServer.h b/ixwebsocket/IXWebSocketServer.h index b15e34ca..f4b161cb 100644 --- a/ixwebsocket/IXWebSocketServer.h +++ b/ixwebsocket/IXWebSocketServer.h @@ -16,67 +16,40 @@ #include #include "IXWebSocket.h" +#include "IXSocketServer.h" namespace ix { using OnConnectionCallback = std::function)>; - class WebSocketServer { + class WebSocketServer : public SocketServer { public: - WebSocketServer(int port = WebSocketServer::kDefaultPort, - const std::string& host = WebSocketServer::kDefaultHost, - int backlog = WebSocketServer::kDefaultTcpBacklog, - size_t maxConnections = WebSocketServer::kDefaultMaxConnections, + WebSocketServer(int port = SocketServer::kDefaultPort, + const std::string& host = SocketServer::kDefaultHost, + int backlog = SocketServer::kDefaultTcpBacklog, + size_t maxConnections = SocketServer::kDefaultMaxConnections, int handshakeTimeoutSecs = WebSocketServer::kDefaultHandShakeTimeoutSecs); virtual ~WebSocketServer(); + virtual void stop() final; void setOnConnectionCallback(const OnConnectionCallback& callback); - void start(); - void wait(); - void stop(); - - std::pair listen(); // Get all the connected clients std::set> getClients(); private: // Member variables - int _port; - std::string _host; - int _backlog; - size_t _maxConnections; int _handshakeTimeoutSecs; OnConnectionCallback _onConnectionCallback; - // socket for accepting connections - int _serverFd; - std::mutex _clientsMutex; std::set> _clients; - std::mutex _logMutex; - - std::atomic _stop; - std::thread _thread; - - std::condition_variable _conditionVariable; - std::mutex _conditionVariableMutex; - - const static int kDefaultPort; - const static std::string kDefaultHost; - const static int kDefaultTcpBacklog; - const static size_t kDefaultMaxConnections; const static int kDefaultHandShakeTimeoutSecs; // Methods - void run(); - void handleConnection(int fd); - size_t getConnectedClientsCount(); - - // Logging - void logError(const std::string& str); - void logInfo(const std::string& str); + virtual void handleConnection(int fd) final; + virtual size_t getConnectedClientsCount() final; }; }