diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 864438ee..6eb14c76 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,6 +1,10 @@ # Changelog All notable changes to this project will be documented in this file. +## [7.5.3] - 2019-12-12 + +(server) attempt at fixing #131 by using blocking writes in server mode + ## [7.5.2] - 2019-12-11 (ws) cobra to sentry - created events with sentry tags based on tags present in the cobra messages diff --git a/ixwebsocket/IXWebSocketTransport.cpp b/ixwebsocket/IXWebSocketTransport.cpp index 48bcfc6b..50c280df 100644 --- a/ixwebsocket/IXWebSocketTransport.cpp +++ b/ixwebsocket/IXWebSocketTransport.cpp @@ -77,6 +77,7 @@ namespace ix WebSocketTransport::WebSocketTransport() : _useMask(true) + , _blockingSend(false) , _compressedMessage(false) , _readyState(ReadyState::CLOSED) , _closeCode(WebSocketCloseConstants::kInternalErrorCode) @@ -178,6 +179,7 @@ namespace ix // Server should not mask the data it sends to the client _useMask = false; + _blockingSend = true; _socket = socket; @@ -339,22 +341,9 @@ namespace ix // there can be a lot of it for large messages. if (pollResult == PollResultType::SendRequest) { - while (!isSendBufferEmpty() && !_requestInitCancellation) + if (!flushSendBuffer()) { - // Wait with a 10ms timeout until the socket is ready to write. - // This way we are not busy looping - PollResultType result = _socket->isReadyToWrite(10); - - if (result == PollResultType::Error) - { - closeSocket(); - setReadyState(ReadyState::CLOSED); - break; - } - else if (result == PollResultType::ReadyForWrite) - { - sendOnSocket(); - } + return PollResult::CannotFlushSendBuffer; } } else if (pollResult == PollResultType::ReadyForRead) @@ -924,13 +913,21 @@ namespace ix } } + bool success = true; + // Request to flush the send buffer on the background thread if it isn't empty if (!isSendBufferEmpty()) { _socket->wakeUpFromPoll(Socket::kSendRequest); + + // FIXME: we should have a timeout when sending large messages: see #131 + if (_blockingSend && !flushSendBuffer()) + { + success = false; + } } - return WebSocketSendInfo(true, compressionError, payloadSize, wireSize); + return WebSocketSendInfo(success, compressionError, payloadSize, wireSize); } void WebSocketTransport::sendFragment(wsheader_type::opcode_type type, @@ -1168,4 +1165,27 @@ namespace ix return _txbuf.size(); } + bool WebSocketTransport::flushSendBuffer() + { + while (!isSendBufferEmpty() && !_requestInitCancellation) + { + // Wait with a 10ms timeout until the socket is ready to write. + // This way we are not busy looping + PollResultType result = _socket->isReadyToWrite(10); + + if (result == PollResultType::Error) + { + closeSocket(); + setReadyState(ReadyState::CLOSED); + return false; + } + else if (result == PollResultType::ReadyForWrite) + { + sendOnSocket(); + } + } + + return true; + } + } // namespace ix diff --git a/ixwebsocket/IXWebSocketTransport.h b/ixwebsocket/IXWebSocketTransport.h index 189fc3f8..934a376d 100644 --- a/ixwebsocket/IXWebSocketTransport.h +++ b/ixwebsocket/IXWebSocketTransport.h @@ -61,7 +61,8 @@ namespace ix enum class PollResult { Succeeded, - AbnormalClose + AbnormalClose, + CannotFlushSendBuffer }; using OnMessageCallback = @@ -135,6 +136,10 @@ namespace ix // client should mask but server should not std::atomic _useMask; + // Tells whether we should flush the send buffer before + // saying that a send is complete. This is the mode for server code. + std::atomic _blockingSend; + // Buffer for reading from our socket. That buffer is never resized. std::vector _readbuf; @@ -238,6 +243,7 @@ namespace ix size_t closeWireSize, bool remote); + bool flushSendBuffer(); void sendOnSocket(); WebSocketSendInfo sendData(wsheader_type::opcode_type type, const std::string& message, diff --git a/ixwebsocket/IXWebSocketVersion.h b/ixwebsocket/IXWebSocketVersion.h index 5b485625..c3990864 100644 --- a/ixwebsocket/IXWebSocketVersion.h +++ b/ixwebsocket/IXWebSocketVersion.h @@ -6,4 +6,4 @@ #pragma once -#define IX_WEBSOCKET_VERSION "7.5.2" +#define IX_WEBSOCKET_VERSION "7.5.3"