add a way to run in blocking more, which is useful for server mode to have N*thread instead of 2N*thread for N connections

This commit is contained in:
Benjamin Sergeant 2019-01-01 21:25:15 -08:00
parent 946d7015a2
commit 1bc5bc7f1c
5 changed files with 54 additions and 18 deletions

View File

@ -84,13 +84,16 @@ namespace ix
void WebSocket::stop() void WebSocket::stop()
{ {
bool automaticReconnection = _automaticReconnection;
// This value needs to be forced when shutting down, it is restored later
_automaticReconnection = false; _automaticReconnection = false;
close(); close();
if (!_thread.joinable()) if (!_thread.joinable())
{ {
_automaticReconnection = true; _automaticReconnection = automaticReconnection;
return; return;
} }
@ -98,7 +101,7 @@ namespace ix
_thread.join(); _thread.join();
_stop = false; _stop = false;
_automaticReconnection = true; _automaticReconnection = automaticReconnection;
} }
WebSocketInitResult WebSocket::connect() WebSocketInitResult WebSocket::connect()
@ -241,6 +244,11 @@ namespace ix
WebSocket::invokeTrafficTrackerCallback(msg.size(), true); WebSocket::invokeTrafficTrackerCallback(msg.size(), true);
}); });
// 4. In blocking mode, getting out of this function is triggered by
// an explicit disconnection from the callback, or by the remote end
// closing the connection, ie isConnected() == false.
if (!_thread.joinable() && !isConnected() && !_automaticReconnection) return;
} }
} }
@ -332,4 +340,14 @@ namespace ix
case WebSocket_ReadyState_Closed: return "CLOSED"; case WebSocket_ReadyState_Closed: return "CLOSED";
} }
} }
void WebSocket::enableAutomaticReconnection()
{
_automaticReconnection = true;
}
void WebSocket::disableAutomaticReconnection()
{
_automaticReconnection = false;
}
} }

View File

@ -72,8 +72,14 @@ namespace ix
void setUrl(const std::string& url); void setUrl(const std::string& url);
void setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions); void setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions);
// Run asynchronously, by calling start and stop.
void start(); void start();
void stop(); void stop();
// Run in blocking mode, by connecting first manually, and then calling run.
WebSocketInitResult connect();
void run();
WebSocketSendInfo send(const std::string& text); WebSocketSendInfo send(const std::string& text);
WebSocketSendInfo ping(const std::string& text); WebSocketSendInfo ping(const std::string& text);
void close(); void close();
@ -86,12 +92,13 @@ namespace ix
const std::string& getUrl() const; const std::string& getUrl() const;
const WebSocketPerMessageDeflateOptions& getPerMessageDeflateOptions() const; const WebSocketPerMessageDeflateOptions& getPerMessageDeflateOptions() const;
void enableAutomaticReconnection();
void disableAutomaticReconnection();
private: private:
void run();
WebSocketSendInfo sendMessage(const std::string& text, bool ping); WebSocketSendInfo sendMessage(const std::string& text, bool ping);
WebSocketInitResult connect();
bool isConnected() const; bool isConnected() const;
bool isClosing() const; bool isClosing() const;
void reconnectPerpetuallyIfDisconnected(); void reconnectPerpetuallyIfDisconnected();

View File

@ -130,11 +130,16 @@ namespace ix
_conditionVariable.wait(lock); _conditionVariable.wait(lock);
} }
// FIXME: we should cancel all the async per connections tasks
void WebSocketServer::stop() void WebSocketServer::stop()
{ {
if (!_thread.joinable()) return; // nothing to do if (!_thread.joinable()) return; // nothing to do
auto clients = getClients();
for (auto client : clients)
{
client->close();
}
_stop = true; _stop = true;
_thread.join(); _thread.join();
_stop = false; _stop = false;
@ -187,20 +192,19 @@ namespace ix
} }
} }
//
// FIXME: make sure we never run into reconnectPerpetuallyIfDisconnected
//
void WebSocketServer::handleConnection(int fd) void WebSocketServer::handleConnection(int fd)
{ {
std::shared_ptr<WebSocket> webSocket(new WebSocket); std::shared_ptr<WebSocket> webSocket(new WebSocket);
_onConnectionCallback(webSocket); _onConnectionCallback(webSocket);
webSocket->disableAutomaticReconnection();
// Add this client to our client set
{ {
std::lock_guard<std::mutex> lock(_clientsMutex); std::lock_guard<std::mutex> lock(_clientsMutex);
_clients.insert(webSocket); _clients.insert(webSocket);
} }
webSocket->start();
auto status = webSocket->connectToSocket(fd); auto status = webSocket->connectToSocket(fd);
if (!status.success) if (!status.success)
{ {
@ -211,16 +215,17 @@ namespace ix
return; return;
} }
// We can do better than this busy loop, with a condition variable. // Process incoming messages and execute callbacks
while (webSocket->isConnected()) // until the connection is closed
{ webSocket->run();
std::chrono::duration<double, std::milli> wait(10);
std::this_thread::sleep_for(wait);
}
// Remove this client from our client set
{ {
std::lock_guard<std::mutex> lock(_clientsMutex); std::lock_guard<std::mutex> lock(_clientsMutex);
_clients.erase(webSocket); if (_clients.erase(webSocket) != 1)
{
logError("Cannot delete client");
}
} }
logInfo("WebSocketServer::handleConnection() done"); logInfo("WebSocketServer::handleConnection() done");

View File

@ -250,6 +250,8 @@ TEST_CASE("Websocket chat", "[websocket_chat]")
ix::msleep(10); ix::msleep(10);
} }
REQUIRE(server.getClients().size() == 2);
// Add a bit of extra time, for the subscription to be active // Add a bit of extra time, for the subscription to be active
ix::msleep(200); ix::msleep(200);
@ -269,6 +271,10 @@ TEST_CASE("Websocket chat", "[websocket_chat]")
REQUIRE(chatA.getReceivedMessagesCount() == 2); REQUIRE(chatA.getReceivedMessagesCount() == 2);
REQUIRE(chatB.getReceivedMessagesCount() == 3); REQUIRE(chatB.getReceivedMessagesCount() == 3);
// Give us 500ms for the server to notice that clients went away
ix::msleep(500);
REQUIRE(server.getClients().size() == 0);
ix::reportWebSocketTraffic(); ix::reportWebSocketTraffic();
} }
} }

View File

@ -2,6 +2,6 @@
mkdir build mkdir build
cd build cd build
cmake .. cmake .. || exit 1
make make || exit 1
./ixwebsocket_unittest ./ixwebsocket_unittest