IXWebSocket/ixwebsocket/IXSocket.cpp

355 lines
9.0 KiB
C++
Raw Normal View History

2018-09-27 23:56:48 +02:00
/*
* IXSocket.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*/
#include "IXSocket.h"
2018-12-15 01:28:11 +01:00
#include "IXSocketConnect.h"
2019-01-06 05:38:43 +01:00
#include "IXNetSystem.h"
2019-03-15 02:37:38 +01:00
#include "IXSelectInterrupt.h"
#include "IXSelectInterruptFactory.h"
2018-10-09 06:42:45 +02:00
2018-09-27 23:56:48 +02:00
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
2018-10-09 06:42:45 +02:00
#include <assert.h>
2018-09-27 23:56:48 +02:00
#include <stdint.h>
#include <fcntl.h>
2018-10-09 06:42:45 +02:00
#include <sys/types.h>
#include <algorithm>
2018-09-27 23:56:48 +02:00
#ifdef min
#undef min
#endif
namespace ix
2018-09-27 23:56:48 +02:00
{
2019-01-24 21:42:49 +01:00
// Definitions of the static data members of Socket.

// A negative timeout: select() passes a null timeval to ::select in that
// case, i.e. it blocks until a descriptor becomes ready.
const int Socket::kDefaultPollNoTimeout = -1; // No poll timeout by default
// The default poll timeout is "no timeout".
const int Socket::kDefaultPollTimeout = kDefaultPollNoTimeout;
// Wake-up codes: select() reads a value from the select interrupt and maps
// kSendRequest / kCloseRequest to PollResultType::SendRequest / CloseRequest.
const uint64_t Socket::kSendRequest = 1;
const uint64_t Socket::kCloseRequest = 2;
// Out-of-class definition of the constexpr member (its value is presumably
// given at the in-class declaration, which is not visible in this file).
// readBytes() uses it as the size of a single recv chunk.
constexpr size_t Socket::kChunkSize;
2019-01-24 21:42:49 +01:00
// Build a socket around the given descriptor and create the select
// interrupt that select() watches alongside it. connect() replaces the
// descriptor with a freshly connected one.
Socket::Socket(int fd)
    : _sockfd(fd)
    , _selectInterrupt(createSelectInterrupt())
{
}
// Release the descriptor on destruction. close() returns early when the
// descriptor is already -1, so destroying a closed socket is harmless.
Socket::~Socket()
{
close();
}
// Wait up to timeoutMs milliseconds for the socket to become readable.
// Reports PollResultType::Error straight away when there is no descriptor;
// otherwise the result comes from isReadyToRead().
PollResultType Socket::poll(int timeoutMs)
{
    const bool hasDescriptor = (_sockfd != -1);
    return hasDescriptor ? isReadyToRead(timeoutMs) : PollResultType::Error;
}
2019-03-19 01:54:51 +01:00
// Block until the socket is ready for the requested operation, the select
// interrupt fires, or the timeout expires.
//
// readyToRead: true to wait for readability, false to wait for writability.
// timeoutMs:   maximum wait in milliseconds; a negative value waits without
//              a timeout.
//
// Returns:
//   Error         - no descriptor, a descriptor that cannot be stored in an
//                   fd_set, or ::select failed
//   Timeout       - nothing became ready in time
//   SendRequest / CloseRequest
//                 - the interrupt delivered kSendRequest / kCloseRequest
//   ReadyForRead / ReadyForWrite
//                 - the socket is ready (ReadyForRead is also the fallback
//                   when the interrupt delivers an unknown code)
PollResultType Socket::select(bool readyToRead, int timeoutMs)
{
    // Read the descriptor once so that every check below sees the same value.
    const int sockfd = _sockfd;
    if (sockfd == -1)
    {
        // FD_SET must not be called with an invalid descriptor.
        return PollResultType::Error;
    }

    // File descriptor used to interrupt select when needed
    const int interruptFd = _selectInterrupt->getFd();

#ifndef _WIN32
    // On POSIX an fd_set can only hold descriptors below FD_SETSIZE;
    // FD_SET / FD_ISSET are undefined outside that range.
    const int maxFd = static_cast<int>(FD_SETSIZE);
    if (sockfd < 0 || sockfd >= maxFd || interruptFd >= maxFd)
    {
        return PollResultType::Error;
    }
#endif

    fd_set rfds;
    fd_set wfds;
    FD_ZERO(&rfds);
    FD_ZERO(&wfds);

    fd_set* socketSet = readyToRead ? &rfds : &wfds;
    FD_SET(sockfd, socketSet);

    // The interrupt descriptor is always watched for readability: that is
    // the set it is tested against below, whichever direction the socket
    // itself is waiting for.
    if (interruptFd != -1)
    {
        FD_SET(interruptFd, &rfds);
    }

    struct timeval timeout;
    timeout.tv_sec = timeoutMs / 1000;
    timeout.tv_usec = 1000 * (timeoutMs % 1000);

    // Compute the highest fd.
    const int nfds = (std::max)(sockfd, interruptFd);

    const int ret = ::select(nfds + 1, &rfds, &wfds, nullptr,
                             (timeoutMs < 0) ? nullptr : &timeout);

    PollResultType pollResult = PollResultType::ReadyForRead;
    if (ret < 0)
    {
        pollResult = PollResultType::Error;
    }
    else if (ret == 0)
    {
        pollResult = PollResultType::Timeout;
    }
    else if (interruptFd != -1 && FD_ISSET(interruptFd, &rfds))
    {
        // A wake-up takes precedence over socket readiness.
        const uint64_t value = _selectInterrupt->read();

        if (value == kSendRequest)
        {
            pollResult = PollResultType::SendRequest;
        }
        else if (value == kCloseRequest)
        {
            pollResult = PollResultType::CloseRequest;
        }
    }
    else if (readyToRead && FD_ISSET(sockfd, &rfds))
    {
        pollResult = PollResultType::ReadyForRead;
    }
    else if (!readyToRead && FD_ISSET(sockfd, &wfds))
    {
        pollResult = PollResultType::ReadyForWrite;
    }

    return pollResult;
}
2019-03-19 01:54:51 +01:00
// Wait up to timeoutMs milliseconds for the socket to become readable.
PollResultType Socket::isReadyToRead(int timeoutMs)
{
    return select(/* readyToRead = */ true, timeoutMs);
}
2019-03-19 01:54:51 +01:00
// Wait up to timeoutMs milliseconds for the socket to become writable.
PollResultType Socket::isReadyToWrite(int timeoutMs)
{
    return select(/* readyToRead = */ false, timeoutMs);
}
// Wake up from poll/select by writing to the pipe which is watched by select
bool Socket::wakeUpFromPoll(uint64_t wakeUpCode)
2018-09-27 23:56:48 +02:00
{
2019-03-15 02:37:38 +01:00
return _selectInterrupt->notify(wakeUpCode);
2018-09-27 23:56:48 +02:00
}
bool Socket::connect(const std::string& host,
int port,
std::string& errMsg,
2018-12-15 01:28:11 +01:00
const CancellationRequest& isCancellationRequested)
2018-09-27 23:56:48 +02:00
{
std::lock_guard<std::mutex> lock(_socketMutex);
2019-03-15 02:37:38 +01:00
if (!_selectInterrupt->clear()) return false;
2018-09-27 23:56:48 +02:00
_sockfd = SocketConnect::connect(host, port, errMsg, isCancellationRequested);
2018-09-27 23:56:48 +02:00
return _sockfd != -1;
}
void Socket::close()
{
std::lock_guard<std::mutex> lock(_socketMutex);
if (_sockfd == -1) return;
2018-10-09 06:42:45 +02:00
closeSocket(_sockfd);
2018-09-27 23:56:48 +02:00
_sockfd = -1;
}
ssize_t Socket::send(char* buffer, size_t length)
2018-09-27 23:56:48 +02:00
{
int flags = 0;
#ifdef MSG_NOSIGNAL
flags = MSG_NOSIGNAL;
#endif
return ::send(_sockfd, buffer, length, flags);
2018-09-27 23:56:48 +02:00
}
ssize_t Socket::send(const std::string& buffer)
2018-09-27 23:56:48 +02:00
{
return send((char*)&buffer[0], buffer.size());
}
ssize_t Socket::recv(void* buffer, size_t length)
2018-09-27 23:56:48 +02:00
{
int flags = 0;
#ifdef MSG_NOSIGNAL
flags = MSG_NOSIGNAL;
#endif
return ::recv(_sockfd, (char*) buffer, length, flags);
2018-09-27 23:56:48 +02:00
}
2019-01-05 02:28:13 +01:00
// Return the last socket error code of the calling context:
// WSAGetLastError() when _WIN32 is defined, errno otherwise.
int Socket::getErrno()
{
#ifdef _WIN32
    return WSAGetLastError();
#else
    return errno;
#endif
}
2019-05-06 18:13:42 +02:00
// Tell whether the last socket error (as reported by getErrno) is one of
// EWOULDBLOCK, EAGAIN or EINPROGRESS - the codes the read/write loops in
// this file treat as "retry later" rather than as a failure.
bool Socket::isWaitNeeded()
{
    const int err = getErrno();
    return err == EWOULDBLOCK || err == EAGAIN || err == EINPROGRESS;
}
2018-10-09 06:42:45 +02:00
// Close the given descriptor with the platform's call: closesocket() when
// _WIN32 is defined, ::close() otherwise. The result of the call is ignored.
void Socket::closeSocket(int fd)
{
#ifdef _WIN32
closesocket(fd);
#else
::close(fd);
#endif
}
2019-03-15 02:37:38 +01:00
bool Socket::init(std::string& errorMsg)
2018-10-09 06:42:45 +02:00
{
2019-03-15 02:37:38 +01:00
return _selectInterrupt->init(errorMsg);
2018-10-09 06:42:45 +02:00
}
bool Socket::writeBytes(const std::string& str,
const CancellationRequest& isCancellationRequested)
{
while (true)
{
if (isCancellationRequested && isCancellationRequested()) return false;
char* buffer = const_cast<char*>(str.c_str());
int len = (int) str.size();
ssize_t ret = send(buffer, len);
// We wrote some bytes, as needed, all good.
if (ret > 0)
{
return ret == len;
}
// There is possibly something to be writen, try again
2019-05-06 18:13:42 +02:00
else if (ret < 0 && Socket::isWaitNeeded())
{
continue;
}
// There was an error during the write, abort
else
{
return false;
}
}
}
bool Socket::readByte(void* buffer,
const CancellationRequest& isCancellationRequested)
{
while (true)
{
if (isCancellationRequested && isCancellationRequested()) return false;
2019-01-06 06:10:08 +01:00
ssize_t ret;
ret = recv(buffer, 1);
// We read one byte, as needed, all good.
if (ret == 1)
{
return true;
}
// There is possibly something to be read, try again
2019-05-06 18:13:42 +02:00
else if (ret < 0 && Socket::isWaitNeeded())
{
// Wait with a 1ms timeout until the socket is ready to read.
// This way we are not busy looping
2019-03-19 17:29:57 +01:00
if (isReadyToRead(1) == PollResultType::Error)
{
return false;
}
}
// There was an error during the read, abort
else
{
return false;
}
}
}
// Read one line from the socket, one byte at a time.
//
// At least two bytes are always read; after that the line is complete as
// soon as its last byte is '\n'. This covers the usual "\r\n" terminator and
// keeps accepting a bare '\n'. A '\r' that is not followed by '\n' is
// ordinary data and no longer ends the line one byte later.
//
// Returns (true, line) with the terminator included, or (false, partial)
// with whatever was read so far when readByte fails or is cancelled.
std::pair<bool, std::string> Socket::readLine(
    const CancellationRequest& isCancellationRequested)
{
    std::string line;
    line.reserve(64);

    while (line.size() < 2 || line.back() != '\n')
    {
        char c;
        if (!readByte(&c, isCancellationRequested))
        {
            // Return what we were able to read
            return std::make_pair(false, line);
        }

        line += c;
    }

    return std::make_pair(true, line);
}
std::pair<bool, std::string> Socket::readBytes(
size_t length,
const OnProgressCallback& onProgressCallback,
const CancellationRequest& isCancellationRequested)
{
if (_readBuffer.empty())
{
_readBuffer.resize(kChunkSize);
}
std::vector<uint8_t> output;
while (output.size() != length)
{
if (isCancellationRequested && isCancellationRequested())
{
return std::make_pair(false, std::string());
}
size_t size = std::min(kChunkSize, length - output.size());
ssize_t ret = recv((char*)&_readBuffer[0], size);
2019-05-06 18:13:42 +02:00
if (ret <= 0 && !Socket::isWaitNeeded())
{
// Error
return std::make_pair(false, std::string());
}
2019-05-06 18:13:42 +02:00
else
{
output.insert(output.end(),
_readBuffer.begin(),
_readBuffer.begin() + ret);
}
if (onProgressCallback) onProgressCallback((int) output.size(), (int) length);
// Wait with a 1ms timeout until the socket is ready to read.
// This way we are not busy looping
2019-03-19 17:29:57 +01:00
if (isReadyToRead(1) == PollResultType::Error)
{
return std::make_pair(false, std::string());
}
}
return std::make_pair(true, std::string(output.begin(),
output.end()));
}
2018-09-27 23:56:48 +02:00
}