From 379a84516646f1c8d2d0571e3d5c2bbf3af7e9b8 Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Sun, 30 Dec 2018 22:00:49 -0800 Subject: [PATCH] cleanup / use a websocket instead of raw websockettransport --- examples/echo_server/echo_server.cpp | 5 +- ixwebsocket/IXWebSocket.cpp | 24 ++-- ixwebsocket/IXWebSocket.h | 5 + ixwebsocket/IXWebSocketServer.cpp | 158 +++++++++++++-------------- ixwebsocket/IXWebSocketServer.h | 16 ++- ixwebsocket/IXWebSocketTransport.cpp | 5 +- 6 files changed, 116 insertions(+), 97 deletions(-) diff --git a/examples/echo_server/echo_server.cpp b/examples/echo_server/echo_server.cpp index c0801c69..ccee5327 100644 --- a/examples/echo_server/echo_server.cpp +++ b/examples/echo_server/echo_server.cpp @@ -19,11 +19,14 @@ int main(int argc, char** argv) } ix::WebSocketServer server(port); - auto res = server.run(); + auto res = server.listen(); if (!res.first) { std::cerr << res.second << std::endl; + return 1; } + server.run(); + return 0; } diff --git a/ixwebsocket/IXWebSocket.cpp b/ixwebsocket/IXWebSocket.cpp index a892f068..19eea273 100644 --- a/ixwebsocket/IXWebSocket.cpp +++ b/ixwebsocket/IXWebSocket.cpp @@ -35,6 +35,15 @@ namespace ix _stop(false), _automaticReconnection(true) { + _ws.setOnCloseCallback( + [this](uint16_t code, const std::string& reason, size_t wireSize) + { + _onMessageCallback(WebSocket_MessageType_Close, "", wireSize, + WebSocketErrorInfo(), + WebSocketCloseInfo(code, reason), + WebSocketHttpHeaders()); + } + ); } WebSocket::~WebSocket() @@ -99,16 +108,6 @@ namespace ix _ws.configure(_url, _perMessageDeflateOptions); } - _ws.setOnCloseCallback( - [this](uint16_t code, const std::string& reason, size_t wireSize) - { - _onMessageCallback(WebSocket_MessageType_Close, "", wireSize, - WebSocketErrorInfo(), - WebSocketCloseInfo(code, reason), - WebSocketHttpHeaders()); - } - ); - WebSocketInitResult status = _ws.init(); if (!status.success) { @@ -121,6 +120,11 @@ namespace ix return status; } + void WebSocket::setSocketFileDescriptor(int fd) + { + _ws.initFromSocket(fd); + } + bool WebSocket::isConnected() const { return getReadyState() == WebSocket_ReadyState_Open; diff --git a/ixwebsocket/IXWebSocket.h b/ixwebsocket/IXWebSocket.h index 164cf992..ed398bdf 100644 --- a/ixwebsocket/IXWebSocket.h +++ b/ixwebsocket/IXWebSocket.h @@ -98,6 +98,9 @@ namespace ix std::string readyStateToString(ReadyState readyState); static void invokeTrafficTrackerCallback(size_t size, bool incoming); + // Server + void setSocketFileDescriptor(int fd); + WebSocketTransport _ws; std::string _url; @@ -111,5 +114,7 @@ namespace ix std::atomic _automaticReconnection; std::thread _thread; std::mutex _writeMutex; + + friend class WebSocketServer; }; } diff --git a/ixwebsocket/IXWebSocketServer.cpp b/ixwebsocket/IXWebSocketServer.cpp index 3f6da4e7..0016902b 100644 --- a/ixwebsocket/IXWebSocketServer.cpp +++ b/ixwebsocket/IXWebSocketServer.cpp @@ -7,7 +7,9 @@ #include "IXWebSocketServer.h" #include "IXWebSocketTransport.h" #include "IXWebSocket.h" -#include "IXSocketConnect.h" // for configure, cleanup, move it back to Socket + +#include +#include #include #include @@ -16,8 +18,9 @@ namespace ix { - WebSocketServer::WebSocketServer(int port) : - _port(port) + WebSocketServer::WebSocketServer(int port, int backlog) : + _port(port), + _backlog(backlog) { } @@ -27,19 +30,30 @@ namespace ix } - std::pair WebSocketServer::run() + std::pair WebSocketServer::listen() { - // https://www.ibm.com/support/knowledgecenter/en/SSLTBW_2.3.0/com.ibm.zos.v2r3.hala001/server.htm struct sockaddr_in server; /* server address information */ - int s; /* socket for accepting connections */ /* * Get a socket for accepting connections. */ - if ((s = socket(AF_INET, SOCK_STREAM, 0)) < 0) + if ((_serverFd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - std::string errMsg = "Socket()"; - return std::make_pair(false, errMsg); + std::stringstream ss; + ss << "WebSocketServer::listen() error creating socket): " + << strerror(errno); + + return std::make_pair(false, ss.str()); + } + + int enable = 1; + if (setsockopt(_serverFd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0) + { + std::stringstream ss; + ss << "WebSocketServer::listen() error calling setsockopt(SO_REUSEADDR): " + << strerror(errno); + + return std::make_pair(false, ss.str()); } /* @@ -50,94 +64,78 @@ namespace ix server.sin_addr.s_addr = INADDR_ANY; // server.sin_addr.s_addr = INADDR_LOOPBACK; - if (bind(s, (struct sockaddr *)&server, sizeof(server)) < 0) + if (bind(_serverFd, (struct sockaddr *)&server, sizeof(server)) < 0) { - std::string errMsg = "Bind()"; - return std::make_pair(false, errMsg); + std::stringstream ss; + ss << "WebSocketServer::listen() error calling bind: " + << strerror(errno); + + return std::make_pair(false, ss.str()); } /* - * Listen for connections. Specify the backlog as 1. + * Listen for connections. Specify the tcp backlog. */ - if (listen(s, 1) != 0) + if (::listen(_serverFd, _backlog) != 0) { - std::string errMsg = "Listen()"; - return std::make_pair(false, errMsg); - } + std::stringstream ss; + ss << "WebSocketServer::listen() error calling listen: " + << strerror(errno); - for (;;) - { - /* - * Accept a connection. - */ - struct sockaddr_in client; /* client address information */ - int clientFd; /* socket connected to client */ - - socklen_t address_len = sizeof(socklen_t); - if ((clientFd = accept(s, (struct sockaddr *)&client, &address_len)) == -1) - { - std::string errMsg = "Accept()"; - return std::make_pair(false, errMsg); - } - - _workers.push_back(std::thread(&WebSocketServer::handleConnection, this, clientFd)); - - // handleConnection(clientFd); + return std::make_pair(false, ss.str()); } return std::make_pair(true, ""); } + void WebSocketServer::run() + { + for (;;) + { + /* + * Accept a connection. + */ + struct sockaddr_in client; /* client address information */ + int clientFd; /* socket connected to client */ + socklen_t addressLen = sizeof(socklen_t); + + if ((clientFd = accept(_serverFd, (struct sockaddr *)&client, &addressLen)) == -1) + { + std::cerr << "WebSocketServer::run() error accepting connection: " + << strerror(errno) + << std::endl; + continue; + } + + _workers.push_back(std::thread(&WebSocketServer::handleConnection, this, clientFd)); + } + } + void WebSocketServer::handleConnection(int fd) { - // We only handle one connection so far, and we just 'print received message from it' - ix::WebSocketTransport webSocketTransport; - SocketConnect::configure(fd); // We could/should do this inside initFromSocket - webSocketTransport.initFromSocket(fd); + ix::WebSocket webSocket; + webSocket.setSocketFileDescriptor(fd); + + webSocket.setOnMessageCallback( + [&webSocket](ix::WebSocketMessageType messageType, + const std::string& str, + size_t wireSize, + const ix::WebSocketErrorInfo& error, + const ix::WebSocketCloseInfo& closeInfo, + const ix::WebSocketHttpHeaders& headers) + { + if (messageType == ix::WebSocket_MessageType_Message) + { + std::cout << str << std::endl; + webSocket.send(str); + } + } + ); + + webSocket.start(); for (;;) { - webSocketTransport.poll(); - - // 1. Dispatch the incoming messages - webSocketTransport.dispatch( - [&webSocketTransport](const std::string& msg, - size_t wireSize, - bool decompressionError, - WebSocketTransport::MessageKind messageKind) - { - WebSocketMessageType webSocketMessageType; - switch (messageKind) - { - case WebSocketTransport::MSG: - { - webSocketMessageType = WebSocket_MessageType_Message; - } break; - - case WebSocketTransport::PING: - { - webSocketMessageType = WebSocket_MessageType_Ping; - } break; - - case WebSocketTransport::PONG: - { - webSocketMessageType = WebSocket_MessageType_Pong; - } break; - } - - WebSocketErrorInfo webSocketErrorInfo; - webSocketErrorInfo.decompressionError = decompressionError; - - // _onMessageCallback(webSocketMessageType, msg, wireSize, - // webSocketErrorInfo, WebSocketCloseInfo(), - // WebSocketHttpHeaders()); - - // WebSocket::invokeTrafficTrackerCallback(msg.size(), true); - - std::cout << "received: " << msg << std::endl; - webSocketTransport.sendBinary(msg); - }); - std::chrono::duration wait(10); std::this_thread::sleep_for(wait); } diff --git a/ixwebsocket/IXWebSocketServer.h b/ixwebsocket/IXWebSocketServer.h index f90d87fd..64943795 100644 --- a/ixwebsocket/IXWebSocketServer.h +++ b/ixwebsocket/IXWebSocketServer.h @@ -15,16 +15,22 @@ namespace ix { class WebSocketServer { public: - WebSocketServer(int port = 8080); + WebSocketServer(int port = 8080, int backlog = 5); virtual ~WebSocketServer(); - std::pair run(); - - void handleConnection(int fd); + std::pair listen(); + void run(); private: - int _port; + void handleConnection(int fd); + int _port; + int _backlog; + + // socket for accepting connections + int _serverFd; + + // FIXME: we never reclaim space in this array ... std::vector _workers; }; } diff --git a/ixwebsocket/IXWebSocketTransport.cpp b/ixwebsocket/IXWebSocketTransport.cpp index 837fe7e5..d534dc79 100644 --- a/ixwebsocket/IXWebSocketTransport.cpp +++ b/ixwebsocket/IXWebSocketTransport.cpp @@ -10,7 +10,7 @@ #include "IXWebSocketTransport.h" #include "IXWebSocketHttpHeaders.h" -#include "IXSocketConnect.h" // for configure, cleanup, move it back to Socket +#include "IXSocketConnect.h" #include "IXSocket.h" #ifdef IXWEBSOCKET_USE_TLS @@ -387,6 +387,9 @@ namespace ix { _requestInitCancellation = false; + // Set the socket to non blocking mode + other tweaks + SocketConnect::configure(fd); + _socket.reset(); _socket = std::make_shared(fd);