cleanup / use a websocket instead of raw websockettransport

This commit is contained in:
Benjamin Sergeant 2018-12-30 22:00:49 -08:00
parent 266cf93584
commit 379a845166
6 changed files with 116 additions and 97 deletions

View File

@ -19,11 +19,14 @@ int main(int argc, char** argv)
} }
ix::WebSocketServer server(port); ix::WebSocketServer server(port);
auto res = server.run(); auto res = server.listen();
if (!res.first) if (!res.first)
{ {
std::cerr << res.second << std::endl; std::cerr << res.second << std::endl;
return 1;
} }
server.run();
return 0; return 0;
} }

View File

@ -35,6 +35,15 @@ namespace ix
_stop(false), _stop(false),
_automaticReconnection(true) _automaticReconnection(true)
{ {
_ws.setOnCloseCallback(
[this](uint16_t code, const std::string& reason, size_t wireSize)
{
_onMessageCallback(WebSocket_MessageType_Close, "", wireSize,
WebSocketErrorInfo(),
WebSocketCloseInfo(code, reason),
WebSocketHttpHeaders());
}
);
} }
WebSocket::~WebSocket() WebSocket::~WebSocket()
@ -99,16 +108,6 @@ namespace ix
_ws.configure(_url, _perMessageDeflateOptions); _ws.configure(_url, _perMessageDeflateOptions);
} }
_ws.setOnCloseCallback(
[this](uint16_t code, const std::string& reason, size_t wireSize)
{
_onMessageCallback(WebSocket_MessageType_Close, "", wireSize,
WebSocketErrorInfo(),
WebSocketCloseInfo(code, reason),
WebSocketHttpHeaders());
}
);
WebSocketInitResult status = _ws.init(); WebSocketInitResult status = _ws.init();
if (!status.success) if (!status.success)
{ {
@ -121,6 +120,11 @@ namespace ix
return status; return status;
} }
void WebSocket::setSocketFileDescriptor(int fd)
{
_ws.initFromSocket(fd);
}
bool WebSocket::isConnected() const bool WebSocket::isConnected() const
{ {
return getReadyState() == WebSocket_ReadyState_Open; return getReadyState() == WebSocket_ReadyState_Open;

View File

@ -98,6 +98,9 @@ namespace ix
std::string readyStateToString(ReadyState readyState); std::string readyStateToString(ReadyState readyState);
static void invokeTrafficTrackerCallback(size_t size, bool incoming); static void invokeTrafficTrackerCallback(size_t size, bool incoming);
// Server
void setSocketFileDescriptor(int fd);
WebSocketTransport _ws; WebSocketTransport _ws;
std::string _url; std::string _url;
@ -111,5 +114,7 @@ namespace ix
std::atomic<bool> _automaticReconnection; std::atomic<bool> _automaticReconnection;
std::thread _thread; std::thread _thread;
std::mutex _writeMutex; std::mutex _writeMutex;
friend class WebSocketServer;
}; };
} }

View File

@ -7,7 +7,9 @@
#include "IXWebSocketServer.h" #include "IXWebSocketServer.h"
#include "IXWebSocketTransport.h" #include "IXWebSocketTransport.h"
#include "IXWebSocket.h" #include "IXWebSocket.h"
#include "IXSocketConnect.h" // for configure, cleanup, move it back to Socket
#include <sstream>
#include <iostream>
#include <netdb.h> #include <netdb.h>
#include <stdio.h> #include <stdio.h>
@ -16,8 +18,9 @@
namespace ix namespace ix
{ {
WebSocketServer::WebSocketServer(int port) : WebSocketServer::WebSocketServer(int port, int backlog) :
_port(port) _port(port),
_backlog(backlog)
{ {
} }
@ -27,19 +30,30 @@ namespace ix
} }
std::pair<bool, std::string> WebSocketServer::run() std::pair<bool, std::string> WebSocketServer::listen()
{ {
// https://www.ibm.com/support/knowledgecenter/en/SSLTBW_2.3.0/com.ibm.zos.v2r3.hala001/server.htm
struct sockaddr_in server; /* server address information */ struct sockaddr_in server; /* server address information */
int s; /* socket for accepting connections */
/* /*
* Get a socket for accepting connections. * Get a socket for accepting connections.
*/ */
if ((s = socket(AF_INET, SOCK_STREAM, 0)) < 0) if ((_serverFd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{ {
std::string errMsg = "Socket()"; std::stringstream ss;
return std::make_pair(false, errMsg); ss << "WebSocketServer::listen() error creating socket): "
<< strerror(errno);
return std::make_pair(false, ss.str());
}
int enable = 1;
if (setsockopt(_serverFd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0)
{
std::stringstream ss;
ss << "WebSocketServer::listen() error calling setsockopt(SO_REUSEADDR): "
<< strerror(errno);
return std::make_pair(false, ss.str());
} }
/* /*
@ -50,94 +64,78 @@ namespace ix
server.sin_addr.s_addr = INADDR_ANY; server.sin_addr.s_addr = INADDR_ANY;
// server.sin_addr.s_addr = INADDR_LOOPBACK; // server.sin_addr.s_addr = INADDR_LOOPBACK;
if (bind(s, (struct sockaddr *)&server, sizeof(server)) < 0) if (bind(_serverFd, (struct sockaddr *)&server, sizeof(server)) < 0)
{ {
std::string errMsg = "Bind()"; std::stringstream ss;
return std::make_pair(false, errMsg); ss << "WebSocketServer::listen() error calling bind: "
<< strerror(errno);
return std::make_pair(false, ss.str());
} }
/* /*
* Listen for connections. Specify the backlog as 1. * Listen for connections. Specify the tcp backlog.
*/ */
if (listen(s, 1) != 0) if (::listen(_serverFd, _backlog) != 0)
{ {
std::string errMsg = "Listen()"; std::stringstream ss;
return std::make_pair(false, errMsg); ss << "WebSocketServer::listen() error calling listen: "
} << strerror(errno);
for (;;) return std::make_pair(false, ss.str());
{
/*
* Accept a connection.
*/
struct sockaddr_in client; /* client address information */
int clientFd; /* socket connected to client */
socklen_t address_len = sizeof(socklen_t);
if ((clientFd = accept(s, (struct sockaddr *)&client, &address_len)) == -1)
{
std::string errMsg = "Accept()";
return std::make_pair(false, errMsg);
}
_workers.push_back(std::thread(&WebSocketServer::handleConnection, this, clientFd));
// handleConnection(clientFd);
} }
return std::make_pair(true, ""); return std::make_pair(true, "");
} }
void WebSocketServer::run()
{
for (;;)
{
/*
* Accept a connection.
*/
struct sockaddr_in client; /* client address information */
int clientFd; /* socket connected to client */
socklen_t addressLen = sizeof(socklen_t);
if ((clientFd = accept(_serverFd, (struct sockaddr *)&client, &addressLen)) == -1)
{
std::cerr << "WebSocketServer::run() error accepting connection: "
<< strerror(errno)
<< std::endl;
continue;
}
_workers.push_back(std::thread(&WebSocketServer::handleConnection, this, clientFd));
}
}
void WebSocketServer::handleConnection(int fd) void WebSocketServer::handleConnection(int fd)
{ {
// We only handle one connection so far, and we just 'print received message from it' ix::WebSocket webSocket;
ix::WebSocketTransport webSocketTransport; webSocket.setSocketFileDescriptor(fd);
SocketConnect::configure(fd); // We could/should do this inside initFromSocket
webSocketTransport.initFromSocket(fd); webSocket.setOnMessageCallback(
[&webSocket](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketCloseInfo& closeInfo,
const ix::WebSocketHttpHeaders& headers)
{
if (messageType == ix::WebSocket_MessageType_Message)
{
std::cout << str << std::endl;
webSocket.send(str);
}
}
);
webSocket.start();
for (;;) for (;;)
{ {
webSocketTransport.poll();
// 1. Dispatch the incoming messages
webSocketTransport.dispatch(
[&webSocketTransport](const std::string& msg,
size_t wireSize,
bool decompressionError,
WebSocketTransport::MessageKind messageKind)
{
WebSocketMessageType webSocketMessageType;
switch (messageKind)
{
case WebSocketTransport::MSG:
{
webSocketMessageType = WebSocket_MessageType_Message;
} break;
case WebSocketTransport::PING:
{
webSocketMessageType = WebSocket_MessageType_Ping;
} break;
case WebSocketTransport::PONG:
{
webSocketMessageType = WebSocket_MessageType_Pong;
} break;
}
WebSocketErrorInfo webSocketErrorInfo;
webSocketErrorInfo.decompressionError = decompressionError;
// _onMessageCallback(webSocketMessageType, msg, wireSize,
// webSocketErrorInfo, WebSocketCloseInfo(),
// WebSocketHttpHeaders());
// WebSocket::invokeTrafficTrackerCallback(msg.size(), true);
std::cout << "received: " << msg << std::endl;
webSocketTransport.sendBinary(msg);
});
std::chrono::duration<double, std::milli> wait(10); std::chrono::duration<double, std::milli> wait(10);
std::this_thread::sleep_for(wait); std::this_thread::sleep_for(wait);
} }

View File

@ -15,16 +15,22 @@ namespace ix
{ {
class WebSocketServer { class WebSocketServer {
public: public:
WebSocketServer(int port = 8080); WebSocketServer(int port = 8080, int backlog = 5);
virtual ~WebSocketServer(); virtual ~WebSocketServer();
std::pair<bool, std::string> run(); std::pair<bool, std::string> listen();
void run();
void handleConnection(int fd);
private: private:
int _port; void handleConnection(int fd);
int _port;
int _backlog;
// socket for accepting connections
int _serverFd;
// FIXME: we never reclaim space in this array ...
std::vector<std::thread> _workers; std::vector<std::thread> _workers;
}; };
} }

View File

@ -10,7 +10,7 @@
#include "IXWebSocketTransport.h" #include "IXWebSocketTransport.h"
#include "IXWebSocketHttpHeaders.h" #include "IXWebSocketHttpHeaders.h"
#include "IXSocketConnect.h" // for configure, cleanup, move it back to Socket #include "IXSocketConnect.h"
#include "IXSocket.h" #include "IXSocket.h"
#ifdef IXWEBSOCKET_USE_TLS #ifdef IXWEBSOCKET_USE_TLS
@ -387,6 +387,9 @@ namespace ix
{ {
_requestInitCancellation = false; _requestInitCancellation = false;
// Set the socket to non blocking mode + other tweaks
SocketConnect::configure(fd);
_socket.reset(); _socket.reset();
_socket = std::make_shared<Socket>(fd); _socket = std::make_shared<Socket>(fd);