refactoring + cancellation was buggy during http upgrade
This commit is contained in:
parent
b95e5e36dc
commit
2b136b2981
@ -162,4 +162,64 @@ namespace ix
|
||||
WSACleanup();
|
||||
#endif
|
||||
}
|
||||
|
||||
bool Socket::readByte(void* buffer,
|
||||
const CancellationRequest& isCancellationRequested)
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
if (isCancellationRequested()) return false;
|
||||
|
||||
int ret;
|
||||
ret = recv(buffer, 1);
|
||||
|
||||
// We read one byte, as needed, all good.
|
||||
if (ret == 1)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
// There is possibly something to be read, try again
|
||||
else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
|
||||
getErrno() == EAGAIN))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
// There was an error during the read, abort
|
||||
else
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool Socket::writeBytes(const std::string& str,
|
||||
const CancellationRequest& isCancellationRequested)
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
if (isCancellationRequested()) return false;
|
||||
|
||||
char* buffer = const_cast<char*>(str.c_str());
|
||||
int len = (int) str.size();
|
||||
|
||||
int ret = send(buffer, len);
|
||||
|
||||
// We wrote some bytes, as needed, all good.
|
||||
if (ret > 0)
|
||||
{
|
||||
return ret == len;
|
||||
}
|
||||
// There is possibly something to be write, try again
|
||||
else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
|
||||
getErrno() == EAGAIN))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
// There was an error during the write, abort
|
||||
else
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -14,8 +14,6 @@
|
||||
#include "IXEventFd.h"
|
||||
#include "IXCancellationRequest.h"
|
||||
|
||||
struct addrinfo;
|
||||
|
||||
namespace ix
|
||||
{
|
||||
class Socket {
|
||||
@ -41,6 +39,13 @@ namespace ix
|
||||
virtual int send(const std::string& buffer);
|
||||
virtual int recv(void* buffer, size_t length);
|
||||
|
||||
// Blocking and cancellable versions, working with socket that can be set
|
||||
// to non blocking mode. Used during HTTP upgrade.
|
||||
bool readByte(void* buffer,
|
||||
const CancellationRequest& isCancellationRequested);
|
||||
bool writeBytes(const std::string& str,
|
||||
const CancellationRequest& isCancellationRequested);
|
||||
|
||||
int getErrno() const;
|
||||
static bool init(); // Required on Windows to initialize WinSocket
|
||||
static void cleanup(); // Required on Windows to cleanup WinSocket
|
||||
|
@ -144,7 +144,7 @@ namespace ix
|
||||
std::cout << "-------------------------------" << std::endl;
|
||||
}
|
||||
|
||||
std::pair<bool, WebSocketHttpHeaders> WebSocketTransport::parseHttpHeaders()
|
||||
std::pair<bool, WebSocketHttpHeaders> WebSocketTransport::parseHttpHeaders(const CancellationRequest& isCancellationRequested)
|
||||
{
|
||||
WebSocketHttpHeaders headers;
|
||||
|
||||
@ -159,7 +159,7 @@ namespace ix
|
||||
i < 2 || (i < 255 && line[i-2] != '\r' && line[i-1] != '\n');
|
||||
++i)
|
||||
{
|
||||
if (!readByte(line+i))
|
||||
if (!_socket->readByte(line+i, isCancellationRequested))
|
||||
{
|
||||
return std::make_pair(false, headers);
|
||||
}
|
||||
@ -254,13 +254,13 @@ namespace ix
|
||||
_socket = std::make_shared<Socket>();
|
||||
}
|
||||
|
||||
auto isCancellationRequested = [this]() -> bool
|
||||
{
|
||||
return _requestInitCancellation;
|
||||
};
|
||||
|
||||
std::string errMsg;
|
||||
bool success = _socket->connect(host, port, errMsg,
|
||||
[this]() -> bool
|
||||
{
|
||||
return _requestInitCancellation;
|
||||
}
|
||||
);
|
||||
bool success = _socket->connect(host, port, errMsg, isCancellationRequested);
|
||||
if (!success)
|
||||
{
|
||||
std::stringstream ss;
|
||||
@ -295,7 +295,7 @@ namespace ix
|
||||
|
||||
ss << "\r\n";
|
||||
|
||||
if (!writeBytes(ss.str()))
|
||||
if (!_socket->writeBytes(ss.str(), isCancellationRequested))
|
||||
{
|
||||
return WebSocketInitResult(false, 0, std::string("Failed sending GET request to ") + url);
|
||||
}
|
||||
@ -305,7 +305,7 @@ namespace ix
|
||||
int i;
|
||||
for (i = 0; i < 2 || (i < 255 && line[i-2] != '\r' && line[i-1] != '\n'); ++i)
|
||||
{
|
||||
if (!readByte(line+i))
|
||||
if (!_socket->readByte(line+i, isCancellationRequested))
|
||||
{
|
||||
return WebSocketInitResult(false, 0, std::string("Failed reading HTTP status line from ") + url);
|
||||
}
|
||||
@ -339,7 +339,7 @@ namespace ix
|
||||
return WebSocketInitResult(false, status, ss.str());
|
||||
}
|
||||
|
||||
auto result = parseHttpHeaders();
|
||||
auto result = parseHttpHeaders(isCancellationRequested);
|
||||
auto headersValid = result.first;
|
||||
auto headers = result.second;
|
||||
|
||||
@ -391,6 +391,11 @@ namespace ix
|
||||
_socket.reset();
|
||||
_socket = std::make_shared<Socket>(fd);
|
||||
|
||||
auto isCancellationRequested = [this]() -> bool
|
||||
{
|
||||
return _requestInitCancellation;
|
||||
};
|
||||
|
||||
std::string remote = std::string("remote fd ") + std::to_string(fd);
|
||||
|
||||
// Read first line
|
||||
@ -398,7 +403,7 @@ namespace ix
|
||||
int i;
|
||||
for (i = 0; i < 2 || (i < 255 && line[i-2] != '\r' && line[i-1] != '\n'); ++i)
|
||||
{
|
||||
if (!readByte(line+i))
|
||||
if (!_socket->readByte(line+i, isCancellationRequested))
|
||||
{
|
||||
return WebSocketInitResult(false, 0, std::string("Failed reading HTTP status line from ") + remote);
|
||||
}
|
||||
@ -411,7 +416,7 @@ namespace ix
|
||||
|
||||
// FIXME: Validate line content (GET /)
|
||||
|
||||
auto result = parseHttpHeaders();
|
||||
auto result = parseHttpHeaders(isCancellationRequested);
|
||||
auto headersValid = result.first;
|
||||
auto headers = result.second;
|
||||
|
||||
@ -437,7 +442,7 @@ namespace ix
|
||||
ss << "Sec-WebSocket-Accept: " << std::string(output) << "\r\n";
|
||||
ss << "\r\n";
|
||||
|
||||
if (!writeBytes(ss.str()))
|
||||
if (!_socket->writeBytes(ss.str(), isCancellationRequested))
|
||||
{
|
||||
return WebSocketInitResult(false, 0, std::string("Failed sending response to ") + remote);
|
||||
}
|
||||
@ -906,62 +911,4 @@ namespace ix
|
||||
_socket->close();
|
||||
}
|
||||
|
||||
bool WebSocketTransport::readByte(void* buffer)
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
if (_readyState == CLOSING) return false;
|
||||
|
||||
int ret;
|
||||
ret = _socket->recv(buffer, 1);
|
||||
|
||||
// We read one byte, as needed, all good.
|
||||
if (ret == 1)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
// There is possibly something to be read, try again
|
||||
else if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
|
||||
_socket->getErrno() == EAGAIN))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
// There was an error during the read, abort
|
||||
else
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool WebSocketTransport::writeBytes(const std::string& str)
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
if (_readyState == CLOSING) return false;
|
||||
|
||||
char* buffer = const_cast<char*>(str.c_str());
|
||||
int len = (int) str.size();
|
||||
|
||||
int ret = _socket->send(buffer, len);
|
||||
|
||||
// We wrote some bytes, as needed, all good.
|
||||
if (ret > 0)
|
||||
{
|
||||
return ret == len;
|
||||
}
|
||||
// There is possibly something to be write, try again
|
||||
else if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
|
||||
_socket->getErrno() == EAGAIN))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
// There was an error during the write, abort
|
||||
else
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace ix
|
||||
|
@ -21,6 +21,7 @@
|
||||
#include "IXWebSocketPerMessageDeflate.h"
|
||||
#include "IXWebSocketPerMessageDeflateOptions.h"
|
||||
#include "IXWebSocketHttpHeaders.h"
|
||||
#include "IXCancellationRequest.h"
|
||||
|
||||
namespace ix
|
||||
{
|
||||
@ -162,11 +163,7 @@ namespace ix
|
||||
void unmaskReceiveBuffer(const wsheader_type& ws);
|
||||
std::string genRandomString(const int len);
|
||||
|
||||
// Non blocking versions of read/write, used during http upgrade
|
||||
bool readByte(void* buffer);
|
||||
bool writeBytes(const std::string& str);
|
||||
|
||||
// Parse HTTP headers
|
||||
std::pair<bool, WebSocketHttpHeaders> parseHttpHeaders();
|
||||
std::pair<bool, WebSocketHttpHeaders> parseHttpHeaders(const CancellationRequest& isCancellationRequested);
|
||||
};
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user