(server) attempt at fixing #131 by using blocking writes in server mode

This commit is contained in:
Benjamin Sergeant 2019-12-12 12:17:29 -08:00
parent e223f8fac2
commit cae016564e
4 changed files with 48 additions and 18 deletions

View File

@ -1,6 +1,10 @@
# Changelog # Changelog
All notable changes to this project will be documented in this file. All notable changes to this project will be documented in this file.
## [7.5.3] - 2019-12-12
(server) attempt at fixing #131 by using blocking writes in server mode
## [7.5.2] - 2019-12-11 ## [7.5.2] - 2019-12-11
(ws) cobra to sentry - created events with sentry tags based on tags present in the cobra messages (ws) cobra to sentry - created events with sentry tags based on tags present in the cobra messages

View File

@ -77,6 +77,7 @@ namespace ix
WebSocketTransport::WebSocketTransport() WebSocketTransport::WebSocketTransport()
: _useMask(true) : _useMask(true)
, _blockingSend(false)
, _compressedMessage(false) , _compressedMessage(false)
, _readyState(ReadyState::CLOSED) , _readyState(ReadyState::CLOSED)
, _closeCode(WebSocketCloseConstants::kInternalErrorCode) , _closeCode(WebSocketCloseConstants::kInternalErrorCode)
@ -178,6 +179,7 @@ namespace ix
// Server should not mask the data it sends to the client // Server should not mask the data it sends to the client
_useMask = false; _useMask = false;
_blockingSend = true;
_socket = socket; _socket = socket;
@ -339,22 +341,9 @@ namespace ix
// there can be a lot of it for large messages. // there can be a lot of it for large messages.
if (pollResult == PollResultType::SendRequest) if (pollResult == PollResultType::SendRequest)
{ {
while (!isSendBufferEmpty() && !_requestInitCancellation) if (!flushSendBuffer())
{ {
// Wait with a 10ms timeout until the socket is ready to write. return PollResult::CannotFlushSendBuffer;
// This way we are not busy looping
PollResultType result = _socket->isReadyToWrite(10);
if (result == PollResultType::Error)
{
closeSocket();
setReadyState(ReadyState::CLOSED);
break;
}
else if (result == PollResultType::ReadyForWrite)
{
sendOnSocket();
}
} }
} }
else if (pollResult == PollResultType::ReadyForRead) else if (pollResult == PollResultType::ReadyForRead)
@ -924,13 +913,21 @@ namespace ix
} }
} }
bool success = true;
// Request to flush the send buffer on the background thread if it isn't empty // Request to flush the send buffer on the background thread if it isn't empty
if (!isSendBufferEmpty()) if (!isSendBufferEmpty())
{ {
_socket->wakeUpFromPoll(Socket::kSendRequest); _socket->wakeUpFromPoll(Socket::kSendRequest);
// FIXME: we should have a timeout when sending large messages: see #131
if (_blockingSend && !flushSendBuffer())
{
success = false;
}
} }
return WebSocketSendInfo(true, compressionError, payloadSize, wireSize); return WebSocketSendInfo(success, compressionError, payloadSize, wireSize);
} }
void WebSocketTransport::sendFragment(wsheader_type::opcode_type type, void WebSocketTransport::sendFragment(wsheader_type::opcode_type type,
@ -1168,4 +1165,27 @@ namespace ix
return _txbuf.size(); return _txbuf.size();
} }
bool WebSocketTransport::flushSendBuffer()
{
while (!isSendBufferEmpty() && !_requestInitCancellation)
{
// Wait with a 10ms timeout until the socket is ready to write.
// This way we are not busy looping
PollResultType result = _socket->isReadyToWrite(10);
if (result == PollResultType::Error)
{
closeSocket();
setReadyState(ReadyState::CLOSED);
return false;
}
else if (result == PollResultType::ReadyForWrite)
{
sendOnSocket();
}
}
return true;
}
} // namespace ix } // namespace ix

View File

@ -61,7 +61,8 @@ namespace ix
enum class PollResult enum class PollResult
{ {
Succeeded, Succeeded,
AbnormalClose AbnormalClose,
CannotFlushSendBuffer
}; };
using OnMessageCallback = using OnMessageCallback =
@ -135,6 +136,10 @@ namespace ix
// client should mask but server should not // client should mask but server should not
std::atomic<bool> _useMask; std::atomic<bool> _useMask;
// Tells whether we should flush the send buffer before
// saying that a send is complete. This is the mode for server code.
std::atomic<bool> _blockingSend;
// Buffer for reading from our socket. That buffer is never resized. // Buffer for reading from our socket. That buffer is never resized.
std::vector<uint8_t> _readbuf; std::vector<uint8_t> _readbuf;
@ -238,6 +243,7 @@ namespace ix
size_t closeWireSize, size_t closeWireSize,
bool remote); bool remote);
bool flushSendBuffer();
void sendOnSocket(); void sendOnSocket();
WebSocketSendInfo sendData(wsheader_type::opcode_type type, WebSocketSendInfo sendData(wsheader_type::opcode_type type,
const std::string& message, const std::string& message,

View File

@ -6,4 +6,4 @@
#pragma once #pragma once
#define IX_WEBSOCKET_VERSION "7.5.2" #define IX_WEBSOCKET_VERSION "7.5.3"