diff --git a/ixwebsocket/IXWebSocketServer.cpp b/ixwebsocket/IXWebSocketServer.cpp index 5c16c3b4..78aaf8f6 100644 --- a/ixwebsocket/IXWebSocketServer.cpp +++ b/ixwebsocket/IXWebSocketServer.cpp @@ -168,4 +168,45 @@ namespace ix std::lock_guard lock(_clientsMutex); return _clients.size(); } + + // + // Classic servers + // + void WebSocketServer::makeBroadcastServer() + { + setOnClientMessageCallback( + [this](std::shared_ptr connectionState, + WebSocket& webSocket, + const WebSocketMessagePtr& msg) { + auto remoteIp = connectionState->getRemoteIp(); + if (msg->type == ix::WebSocketMessageType::Message) + { + for (auto&& client : getClients()) + { + if (client.get() != &webSocket) + { + client->send(msg->str, msg->binary); + + do + { + size_t bufferedAmount = client->bufferedAmount(); + std::chrono::duration duration(500); + std::this_thread::sleep_for(duration); + } while (client->bufferedAmount() != 0); + } + } + } + }); + } + + int WebSocketServer::listenAndStart() + { + auto res = listen(); + if (!res.first) + { + return 1; + } + + start(); + } } // namespace ix diff --git a/ixwebsocket/IXWebSocketServer.h b/ixwebsocket/IXWebSocketServer.h index 86b173e4..c864f403 100644 --- a/ixwebsocket/IXWebSocketServer.h +++ b/ixwebsocket/IXWebSocketServer.h @@ -47,6 +47,9 @@ namespace ix // Get all the connected clients std::set> getClients(); + void makeBroadcastServer(); + int listenAndStart(); + const static int kDefaultHandShakeTimeoutSecs; private: diff --git a/ws/ws.cpp b/ws/ws.cpp index 8df93d6c..843bb904 100644 --- a/ws/ws.cpp +++ b/ws/ws.cpp @@ -439,93 +439,6 @@ namespace ix return generateReport(url) ? 0 : 1; } - // - // broadcast server - // - int ws_broadcast_server_main(int port, - const std::string& hostname, - const ix::SocketTLSOptions& tlsOptions) - { - spdlog::info("Listening on {}:{}", hostname, port); - - ix::WebSocketServer server(port, hostname); - server.setTLSOptions(tlsOptions); - - server.setOnClientMessageCallback( - [&server](std::shared_ptr connectionState, - WebSocket& webSocket, - const WebSocketMessagePtr& msg) { - auto remoteIp = connectionState->getRemoteIp(); - if (msg->type == ix::WebSocketMessageType::Open) - { - spdlog::info("New connection"); - spdlog::info("remote ip: {}", remoteIp); - spdlog::info("id: {}", connectionState->getId()); - spdlog::info("Uri: {}", msg->openInfo.uri); - spdlog::info("Headers:"); - for (auto it : msg->openInfo.headers) - { - spdlog::info("{}: {}", it.first, it.second); - } - } - else if (msg->type == ix::WebSocketMessageType::Close) - { - spdlog::info("Closed connection: code {} reason {}", - msg->closeInfo.code, - msg->closeInfo.reason); - } - else if (msg->type == ix::WebSocketMessageType::Error) - { - std::stringstream ss; - ss << "Connection error: " << msg->errorInfo.reason << std::endl; - ss << "#retries: " << msg->errorInfo.retries << std::endl; - ss << "Wait time(ms): " << msg->errorInfo.wait_time << std::endl; - ss << "HTTP Status: " << msg->errorInfo.http_status << std::endl; - spdlog::info(ss.str()); - } - else if (msg->type == ix::WebSocketMessageType::Fragment) - { - spdlog::info("Received message fragment"); - } - else if (msg->type == ix::WebSocketMessageType::Message) - { - spdlog::info("Received {} bytes", msg->wireSize); - - for (auto&& client : server.getClients()) - { - if (client.get() != &webSocket) - { - client->send(msg->str, msg->binary, [](int current, int total) -> bool { - spdlog::info("Step {} out of {}", current, total); - return true; - }); - - do - { - size_t bufferedAmount = client->bufferedAmount(); - spdlog::info("{} bytes left to be sent", bufferedAmount); - - std::chrono::duration duration(500); - std::this_thread::sleep_for(duration); - } while (client->bufferedAmount() != 0); - } - } - } - }); - - auto res = server.listen(); - if (!res.first) - { - spdlog::info(res.second); - return 1; - } - - server.start(); - server.wait(); - - return 0; - } - /* * ws_chat.cpp * Author: Benjamin Sergeant @@ -2853,9 +2766,13 @@ int main(int argc, char** argv) ret = ix::ws_push_server( port, hostname, tlsOptions, ipv6, disablePerMessageDeflate, disablePong, sendMsg); } - else if (app.got_subcommand("transfer")) + else if (app.got_subcommand("transfer") || app.got_subcommand("broadcast_server")) { - ret = ix::ws_transfer_main(port, hostname, tlsOptions); + ix::WebSocketServer server(port, hostname); + server.setTLSOptions(tlsOptions); + server.makeBroadcastServer(); + server.listenAndStart(); + server.wait(); } else if (app.got_subcommand("send")) { @@ -2870,10 +2787,6 @@ int main(int argc, char** argv) { ret = ix::ws_chat_main(url, user); } - else if (app.got_subcommand("broadcast_server")) - { - ret = ix::ws_broadcast_server_main(port, hostname, tlsOptions); - } else if (app.got_subcommand("ping")) { ret = ix::ws_ping_pong_main(url, tlsOptions);