send can fail silently when sending would block (EWOULDBLOCK return for send) (#18)
* try to use a pipe for communication * flush send buffer on the background thread * cleanup * linux fix / linux still use event fd for now * cleanup
This commit is contained in:
committed by
GitHub
parent
dedbeb3eab
commit
08c2cdbf1d
@ -1,7 +1,31 @@
|
||||
/*
|
||||
* The MIT License (MIT)
|
||||
*
|
||||
* Copyright (c) 2012, 2013 <dhbaird@gmail.com>
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
* of this software and associated documentation files (the "Software"), to deal
|
||||
* in the Software without restriction, including without limitation the rights
|
||||
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
* copies of the Software, and to permit persons to whom the Software is
|
||||
* furnished to do so, subject to the following conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be included in
|
||||
* all copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
* THE SOFTWARE.
|
||||
*/
|
||||
|
||||
/*
|
||||
* IXWebSocketTransport.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
|
||||
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
|
||||
//
|
||||
@ -14,14 +38,6 @@
|
||||
#include "IXUrlParser.h"
|
||||
#include "IXSocketFactory.h"
|
||||
|
||||
#ifdef IXWEBSOCKET_USE_TLS
|
||||
# ifdef __APPLE__
|
||||
# include "IXSocketAppleSSL.h"
|
||||
# else
|
||||
# include "IXSocketOpenSSL.h"
|
||||
# endif
|
||||
#endif
|
||||
|
||||
#include <string.h>
|
||||
#include <stdlib.h>
|
||||
|
||||
@ -80,16 +96,6 @@ namespace ix
|
||||
std::string("Could not parse URL ") + url);
|
||||
}
|
||||
|
||||
if (protocol != "ws" && protocol != "wss")
|
||||
{
|
||||
std::stringstream ss;
|
||||
ss << "Invalid protocol: " << protocol
|
||||
<< " for url " << url
|
||||
<< " . Supported protocols are ws and wss";
|
||||
|
||||
return WebSocketInitResult(false, 0, ss.str());
|
||||
}
|
||||
|
||||
bool tls = protocol == "wss";
|
||||
std::string errorMsg;
|
||||
_socket = createSocket(tls, errorMsg);
|
||||
@ -184,38 +190,51 @@ namespace ix
|
||||
std::stringstream ss;
|
||||
ss << kHeartBeatPingMessage << "::" << _heartBeatPeriod << "s";
|
||||
sendPing(ss.str());
|
||||
return;
|
||||
}
|
||||
|
||||
while (true)
|
||||
// Make sure we send all the buffered data
|
||||
// there can be a lot of it for large messages.
|
||||
else if (pollResult == PollResultType_SendRequest)
|
||||
{
|
||||
ssize_t ret = _socket->recv((char*)&_readbuf[0], _readbuf.size());
|
||||
|
||||
if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
|
||||
_socket->getErrno() == EAGAIN))
|
||||
while (!isSendBufferEmpty() && !_requestInitCancellation)
|
||||
{
|
||||
break;
|
||||
}
|
||||
else if (ret <= 0)
|
||||
{
|
||||
_rxbuf.clear();
|
||||
_socket->close();
|
||||
setReadyState(CLOSED);
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
_rxbuf.insert(_rxbuf.end(),
|
||||
_readbuf.begin(),
|
||||
_readbuf.begin() + ret);
|
||||
sendOnSocket();
|
||||
}
|
||||
}
|
||||
else if (pollResult == PollResultType_ReadyForRead)
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
ssize_t ret = _socket->recv((char*)&_readbuf[0], _readbuf.size());
|
||||
|
||||
if (isSendBufferEmpty() && _readyState == CLOSING)
|
||||
if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
|
||||
_socket->getErrno() == EAGAIN))
|
||||
{
|
||||
break;
|
||||
}
|
||||
else if (ret <= 0)
|
||||
{
|
||||
_rxbuf.clear();
|
||||
_socket->close();
|
||||
setReadyState(CLOSED);
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
_rxbuf.insert(_rxbuf.end(),
|
||||
_readbuf.begin(),
|
||||
_readbuf.begin() + ret);
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (pollResult == PollResultType_Error)
|
||||
{
|
||||
_socket->close();
|
||||
setReadyState(CLOSED);
|
||||
}
|
||||
else if (pollResult == PollResultType_CloseRequest)
|
||||
{
|
||||
;
|
||||
}
|
||||
|
||||
},
|
||||
_heartBeatPeriod);
|
||||
}
|
||||
@ -586,11 +605,7 @@ namespace ix
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure we send all the buffered data ; there can be a lot of it
|
||||
// for large messages.
|
||||
// TODO / this will block the sending thread ; we need to eval whether
|
||||
// this is the right fix
|
||||
while (!isSendBufferEmpty()) sendOnSocket();
|
||||
_socket->wakeUpFromPoll(Socket::kSendRequest);
|
||||
|
||||
return WebSocketSendInfo(true, compressionError, payloadSize, wireSize);
|
||||
}
|
||||
@ -737,8 +752,17 @@ namespace ix
|
||||
sendData(wsheader_type::CLOSE, normalClosure, compress);
|
||||
setReadyState(CLOSING);
|
||||
|
||||
_socket->wakeUpFromPoll();
|
||||
_socket->wakeUpFromPoll(Socket::kCloseRequest);
|
||||
_socket->close();
|
||||
|
||||
_closeCode = 1000;
|
||||
setReadyState(CLOSED);
|
||||
}
|
||||
|
||||
size_t WebSocketTransport::bufferedAmount() const
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(_txbufMutex);
|
||||
return _txbuf.size();
|
||||
}
|
||||
|
||||
} // namespace ix
|
||||
|
Reference in New Issue
Block a user