cleanup / remove printf, add mutex, remove hardcoded values, can pass in a binding host

This commit is contained in:
Benjamin Sergeant 2019-01-01 14:28:41 -08:00
parent 67de0fc8da
commit d279aecb87
3 changed files with 72 additions and 31 deletions

View File

@ -18,8 +18,11 @@
namespace ix namespace ix
{ {
WebSocketServer::WebSocketServer(int port, int backlog) : const std::string WebSocketServer::kDefaultHost("127.0.0.1");
WebSocketServer::WebSocketServer(int port, const std::string& host, int backlog) :
_port(port), _port(port),
_host(host),
_backlog(backlog) _backlog(backlog)
{ {
@ -35,13 +38,23 @@ namespace ix
_onConnectionCallback = callback; _onConnectionCallback = callback;
} }
void WebSocketServer::logError(const std::string& str)
{
std::lock_guard<std::mutex> lock(_logMutex);
std::cerr << str << std::endl;
}
void WebSocketServer::logInfo(const std::string& str)
{
std::lock_guard<std::mutex> lock(_logMutex);
std::cout << str << std::endl;
}
std::pair<bool, std::string> WebSocketServer::listen() std::pair<bool, std::string> WebSocketServer::listen()
{ {
struct sockaddr_in server; /* server address information */ struct sockaddr_in server; // server address information
/* // Get a socket for accepting connections.
* Get a socket for accepting connections.
*/
if ((_serverFd = socket(AF_INET, SOCK_STREAM, 0)) < 0) if ((_serverFd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{ {
std::stringstream ss; std::stringstream ss;
@ -51,6 +64,7 @@ namespace ix
return std::make_pair(false, ss.str()); return std::make_pair(false, ss.str());
} }
// Make that socket reusable. (allow restarting this server at will)
int enable = 1; int enable = 1;
if (setsockopt(_serverFd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0) if (setsockopt(_serverFd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0)
{ {
@ -61,9 +75,7 @@ namespace ix
return std::make_pair(false, ss.str()); return std::make_pair(false, ss.str());
} }
/* // Bind the socket to the server address.
* Bind the socket to the server address.
*/
server.sin_family = AF_INET; server.sin_family = AF_INET;
server.sin_port = htons(_port); server.sin_port = htons(_port);
@ -72,8 +84,9 @@ namespace ix
// to allow that, but this is a bit of a pain. (this is what node or python would do). // to allow that, but this is a bit of a pain. (this is what node or python would do).
// //
// Using INADDR_LOOPBACK also does not work ... while it should. // Using INADDR_LOOPBACK also does not work ... while it should.
// We default to 127.0.0.1 (localhost)
// //
server.sin_addr.s_addr = inet_addr("127.0.0.1"); server.sin_addr.s_addr = inet_addr(_host.c_str());
if (bind(_serverFd, (struct sockaddr *)&server, sizeof(server)) < 0) if (bind(_serverFd, (struct sockaddr *)&server, sizeof(server)) < 0)
{ {
@ -113,9 +126,10 @@ namespace ix
if ((clientFd = accept(_serverFd, (struct sockaddr *)&client, &addressLen)) == -1) if ((clientFd = accept(_serverFd, (struct sockaddr *)&client, &addressLen)) == -1)
{ {
// FIXME: that error should be propagated // FIXME: that error should be propagated
std::cerr << "WebSocketServer::run() error accepting connection: " std::stringstream ss;
<< strerror(errno) ss << "WebSocketServer::run() error accepting connection: "
<< std::endl; << strerror(errno);
logError(ss.str());
continue; continue;
} }
@ -123,7 +137,10 @@ namespace ix
// //
// the destructor of a future returned by std::async blocks, // the destructor of a future returned by std::async blocks,
// so we need to declare it outside of this loop // so we need to declare it outside of this loop
f = std::async(std::launch::async, &WebSocketServer::handleConnection, this, clientFd); f = std::async(std::launch::async,
&WebSocketServer::handleConnection,
this,
clientFd);
} }
} }
@ -135,27 +152,40 @@ namespace ix
std::shared_ptr<WebSocket> webSocket(new WebSocket); std::shared_ptr<WebSocket> webSocket(new WebSocket);
_onConnectionCallback(webSocket); _onConnectionCallback(webSocket);
{
std::lock_guard<std::mutex> lock(_clientsMutex);
_clients.insert(webSocket); _clients.insert(webSocket);
}
webSocket->start(); webSocket->start();
auto status = webSocket->connectToSocket(fd); auto status = webSocket->connectToSocket(fd);
if (!status.success) if (!status.success)
{ {
std::cerr << "WebSocketServer::handleConnection() error: " std::stringstream ss;
<< status.errorStr ss << "WebSocketServer::handleConnection() error: "
<< std::endl; << status.errorStr;
logError(ss.str());
return; return;
} }
// We can probably do better than this busy loop, with a condition variable. // We can do better than this busy loop, with a condition variable.
while (webSocket->isConnected()) while (webSocket->isConnected())
{ {
std::chrono::duration<double, std::milli> wait(10); std::chrono::duration<double, std::milli> wait(10);
std::this_thread::sleep_for(wait); std::this_thread::sleep_for(wait);
} }
{
std::lock_guard<std::mutex> lock(_clientsMutex);
_clients.erase(webSocket); _clients.erase(webSocket);
}
std::cerr << "WebSocketServer::handleConnection() done" << std::endl; logInfo("WebSocketServer::handleConnection() done");
}
std::set<std::shared_ptr<WebSocket>> WebSocketServer::getClients()
{
std::lock_guard<std::mutex> lock(_clientsMutex);
return _clients;
} }
} }

View File

@ -22,7 +22,9 @@ namespace ix
class WebSocketServer { class WebSocketServer {
public: public:
WebSocketServer(int port = 8080, int backlog = 5); WebSocketServer(int port = 8080,
const std::string& host = WebSocketServer::kDefaultHost,
int backlog = 5);
virtual ~WebSocketServer(); virtual ~WebSocketServer();
void setOnConnectionCallback(const OnConnectionCallback& callback); void setOnConnectionCallback(const OnConnectionCallback& callback);
@ -30,13 +32,13 @@ namespace ix
std::pair<bool, std::string> listen(); std::pair<bool, std::string> listen();
void run(); void run();
// FIXME: need mutex // Get all the connected clients
std::set<std::shared_ptr<WebSocket>> getClients() { return _clients; } std::set<std::shared_ptr<WebSocket>> getClients();
private: private:
void handleConnection(int fd); // Member variables
int _port; int _port;
std::string _host;
int _backlog; int _backlog;
OnConnectionCallback _onConnectionCallback; OnConnectionCallback _onConnectionCallback;
@ -44,6 +46,18 @@ namespace ix
// socket for accepting connections // socket for accepting connections
int _serverFd; int _serverFd;
std::mutex _clientsMutex;
std::set<std::shared_ptr<WebSocket>> _clients; std::set<std::shared_ptr<WebSocket>> _clients;
std::mutex _logMutex;
const static std::string kDefaultHost;
// Methods
void handleConnection(int fd);
// Logging
void logError(const std::string& str);
void logInfo(const std::string& str);
}; };
} }

View File

@ -409,8 +409,7 @@ namespace ix
return WebSocketInitResult(false, 0, std::string("Got bad status line connecting to ") + remote); return WebSocketInitResult(false, 0, std::string("Got bad status line connecting to ") + remote);
} }
std::cout << "initFromSocket::start" << std::endl; // FIXME: Validate line content (GET /)
std::cout << line << std::endl;
auto result = parseHttpHeaders(); auto result = parseHttpHeaders();
auto headersValid = result.first; auto headersValid = result.first;
@ -427,7 +426,8 @@ namespace ix
return WebSocketInitResult(false, 401, errorMsg); return WebSocketInitResult(false, 401, errorMsg);
} }
std::cout << "FIXME perMessageDeflateOptions" << std::endl; // FIXME perMessageDeflate support.
_enablePerMessageDeflate = false;
char output[29] = {}; char output[29] = {};
WebSocketHandshake::generate(headers["sec-websocket-key"].c_str(), output); WebSocketHandshake::generate(headers["sec-websocket-key"].c_str(), output);
@ -435,7 +435,6 @@ namespace ix
std::stringstream ss; std::stringstream ss;
ss << "HTTP/1.1 101\r\n"; ss << "HTTP/1.1 101\r\n";
ss << "Sec-WebSocket-Accept: " << std::string(output) << "\r\n"; ss << "Sec-WebSocket-Accept: " << std::string(output) << "\r\n";
ss << "\r\n"; ss << "\r\n";
if (!writeBytes(ss.str())) if (!writeBytes(ss.str()))
@ -444,8 +443,6 @@ namespace ix
} }
setReadyState(OPEN); setReadyState(OPEN);
std::cout << "initFromSocket::end" << std::endl;
return WebSocketInitResult(true, 200, "", headers); return WebSocketInitResult(true, 200, "", headers);
} }