(socket server) do not create a select interrupt object everytime when polling for notifications while waiting for new connections, instead use a persistent one which is a member variable

This commit is contained in:
Benjamin Sergeant 2020-08-15 15:28:15 -07:00
parent ed2ed0f7ae
commit 261095fa12
9 changed files with 31 additions and 19 deletions

View File

@ -2,6 +2,10 @@
All changes to this project will be documented in this file. All changes to this project will be documented in this file.
## [10.2.1] - 2020-08-15
(socket server) do not create a select interrupt object everytime when polling for notifications while waiting for new connections, instead use a persistent one which is a member variable
## [10.2.0] - 2020-08-14 ## [10.2.0] - 2020-08-14
(ixwebsocket client) handle HTTP redirects (ixwebsocket client) handle HTTP redirects

View File

@ -8,6 +8,9 @@
namespace ix namespace ix
{ {
const uint64_t SelectInterrupt::kSendRequest = 1;
const uint64_t SelectInterrupt::kCloseRequest = 2;
SelectInterrupt::SelectInterrupt() SelectInterrupt::SelectInterrupt()
{ {
; ;

View File

@ -6,6 +6,7 @@
#pragma once #pragma once
#include <memory>
#include <stdint.h> #include <stdint.h>
#include <string> #include <string>
@ -23,5 +24,11 @@ namespace ix
virtual bool clear(); virtual bool clear();
virtual uint64_t read(); virtual uint64_t read();
virtual int getFd() const; virtual int getFd() const;
// Used as special codes for pipe communication
static const uint64_t kSendRequest;
static const uint64_t kCloseRequest;
}; };
using SelectInterruptPtr = std::unique_ptr<SelectInterrupt>;
} // namespace ix } // namespace ix

View File

@ -27,8 +27,6 @@ namespace ix
{ {
const int Socket::kDefaultPollNoTimeout = -1; // No poll timeout by default const int Socket::kDefaultPollNoTimeout = -1; // No poll timeout by default
const int Socket::kDefaultPollTimeout = kDefaultPollNoTimeout; const int Socket::kDefaultPollTimeout = kDefaultPollNoTimeout;
const uint64_t Socket::kSendRequest = 1;
const uint64_t Socket::kCloseRequest = 2;
constexpr size_t Socket::kChunkSize; constexpr size_t Socket::kChunkSize;
Socket::Socket(int fd) Socket::Socket(int fd)
@ -96,11 +94,11 @@ namespace ix
{ {
uint64_t value = selectInterrupt->read(); uint64_t value = selectInterrupt->read();
if (value == kSendRequest) if (value == SelectInterrupt::kSendRequest)
{ {
pollResult = PollResultType::SendRequest; pollResult = PollResultType::SendRequest;
} }
else if (value == kCloseRequest) else if (value == SelectInterrupt::kCloseRequest)
{ {
pollResult = PollResultType::CloseRequest; pollResult = PollResultType::CloseRequest;
} }

View File

@ -34,12 +34,10 @@ typedef SSIZE_T ssize_t;
#include "IXCancellationRequest.h" #include "IXCancellationRequest.h"
#include "IXProgressCallback.h" #include "IXProgressCallback.h"
#include "IXSelectInterrupt.h"
namespace ix namespace ix
{ {
class SelectInterrupt;
using SelectInterruptPtr = std::unique_ptr<SelectInterrupt>;
enum class PollResultType enum class PollResultType
{ {
ReadyForRead = 0, ReadyForRead = 0,
@ -96,11 +94,6 @@ namespace ix
int sockfd, int sockfd,
const SelectInterruptPtr& selectInterrupt); const SelectInterruptPtr& selectInterrupt);
// Used as special codes for pipe communication
static const uint64_t kSendRequest;
static const uint64_t kCloseRequest;
protected: protected:
std::atomic<int> _sockfd; std::atomic<int> _sockfd;
std::mutex _socketMutex; std::mutex _socketMutex;

View File

@ -12,6 +12,7 @@
#include "IXSocket.h" #include "IXSocket.h"
#include "IXSocketConnect.h" #include "IXSocketConnect.h"
#include "IXSocketFactory.h" #include "IXSocketFactory.h"
#include "IXSelectInterruptFactory.h"
#include <assert.h> #include <assert.h>
#include <sstream> #include <sstream>
#include <stdio.h> #include <stdio.h>
@ -36,6 +37,7 @@ namespace ix
, _stop(false) , _stop(false)
, _stopGc(false) , _stopGc(false)
, _connectionStateFactory(&ConnectionState::createConnectionState) , _connectionStateFactory(&ConnectionState::createConnectionState)
, _acceptSelectInterrupt(createSelectInterrupt())
{ {
} }
@ -193,6 +195,7 @@ namespace ix
if (_thread.joinable()) if (_thread.joinable())
{ {
_stop = true; _stop = true;
_acceptSelectInterrupt->notify(SelectInterrupt::kCloseRequest); // Wake up select
_thread.join(); _thread.join();
_stop = false; _stop = false;
} }
@ -249,7 +252,7 @@ namespace ix
// Set the socket to non blocking mode, so that accept calls are not blocking // Set the socket to non blocking mode, so that accept calls are not blocking
SocketConnect::configure(_serverFd); SocketConnect::configure(_serverFd);
setThreadName("SocketServer::listen"); setThreadName("SocketServer::accept");
for (;;) for (;;)
{ {
@ -258,9 +261,8 @@ namespace ix
// Use poll to check whether a new connection is in progress // Use poll to check whether a new connection is in progress
int timeoutMs = 10; int timeoutMs = 10;
bool readyToRead = true; bool readyToRead = true;
auto selectInterrupt = std::make_unique<SelectInterrupt>();
PollResultType pollResult = PollResultType pollResult =
Socket::poll(readyToRead, timeoutMs, _serverFd, selectInterrupt); Socket::poll(readyToRead, timeoutMs, _serverFd, _acceptSelectInterrupt);
if (pollResult == PollResultType::Error) if (pollResult == PollResultType::Error)
{ {
@ -308,6 +310,7 @@ namespace ix
continue; continue;
} }
// Retrieve connection info, the ip address of the remote peer/client)
std::unique_ptr<ConnectionInfo> connectionInfo; std::unique_ptr<ConnectionInfo> connectionInfo;
if (_addressFamily == AF_INET) if (_addressFamily == AF_INET)

View File

@ -8,6 +8,7 @@
#include "IXConnectionInfo.h" #include "IXConnectionInfo.h"
#include "IXConnectionState.h" #include "IXConnectionState.h"
#include "IXSelectInterrupt.h"
#include "IXSocketTLSOptions.h" #include "IXSocketTLSOptions.h"
#include <atomic> #include <atomic>
#include <condition_variable> #include <condition_variable>
@ -112,5 +113,8 @@ namespace ix
size_t getConnectionsThreadsCount(); size_t getConnectionsThreadsCount();
SocketTLSOptions _socketTLSOptions; SocketTLSOptions _socketTLSOptions;
// to wake up from select
SelectInterruptPtr _acceptSelectInterrupt;
}; };
} // namespace ix } // namespace ix

View File

@ -659,7 +659,7 @@ namespace ix
// send back the CLOSE frame // send back the CLOSE frame
sendCloseFrame(code, reason); sendCloseFrame(code, reason);
wakeUpFromPoll(Socket::kCloseRequest); wakeUpFromPoll(SelectInterrupt::kCloseRequest);
bool remote = true; bool remote = true;
closeSocketAndSwitchToClosedState(code, reason, _rxbuf.size(), remote); closeSocketAndSwitchToClosedState(code, reason, _rxbuf.size(), remote);
@ -879,7 +879,7 @@ namespace ix
// Request to flush the send buffer on the background thread if it isn't empty // Request to flush the send buffer on the background thread if it isn't empty
if (!isSendBufferEmpty()) if (!isSendBufferEmpty())
{ {
wakeUpFromPoll(Socket::kSendRequest); wakeUpFromPoll(SelectInterrupt::kSendRequest);
// FIXME: we should have a timeout when sending large messages: see #131 // FIXME: we should have a timeout when sending large messages: see #131
if (_blockingSend && !flushSendBuffer()) if (_blockingSend && !flushSendBuffer())
@ -1148,7 +1148,7 @@ namespace ix
sendCloseFrame(code, reason); sendCloseFrame(code, reason);
// wake up the poll, but do not close yet // wake up the poll, but do not close yet
wakeUpFromPoll(Socket::kSendRequest); wakeUpFromPoll(SelectInterrupt::kSendRequest);
} }
size_t WebSocketTransport::bufferedAmount() const size_t WebSocketTransport::bufferedAmount() const

View File

@ -6,4 +6,4 @@
#pragma once #pragma once
#define IX_WEBSOCKET_VERSION "10.2.1" #define IX_WEBSOCKET_VERSION "10.2.2"