This commit is contained in:
Benjamin Sergeant 2019-03-13 12:05:17 -07:00
parent 11092027cd
commit d1a7b9a985
10 changed files with 163 additions and 77 deletions

View File

@ -23,6 +23,8 @@ namespace ix
{ {
const int Socket::kDefaultPollNoTimeout = -1; // No poll timeout by default const int Socket::kDefaultPollNoTimeout = -1; // No poll timeout by default
const int Socket::kDefaultPollTimeout = kDefaultPollNoTimeout; const int Socket::kDefaultPollTimeout = kDefaultPollNoTimeout;
const int Socket::kSendRequest = 1;
const int Socket::kCloseRequest = 2;
constexpr size_t Socket::kChunkSize; constexpr size_t Socket::kChunkSize;
Socket::Socket(int fd) : Socket::Socket(int fd) :
@ -45,31 +47,18 @@ namespace ix
return; return;
} }
int ret = select(timeoutSecs, 0); PollResultType pollResult = select(timeoutSecs, 0);
PollResultType pollResult = PollResultType_ReadyForRead;
if (ret < 0)
{
pollResult = PollResultType_Error;
}
else if (ret == 0)
{
pollResult = PollResultType_Timeout;
}
if (onPollCallback) onPollCallback(pollResult); if (onPollCallback) onPollCallback(pollResult);
} }
int Socket::select(int timeoutSecs, int timeoutMs) PollResultType Socket::select(int timeoutSecs, int timeoutMs)
{ {
fd_set rfds; fd_set rfds;
FD_ZERO(&rfds); FD_ZERO(&rfds);
FD_SET(_sockfd, &rfds); FD_SET(_sockfd, &rfds);
#ifdef __linux__ // File descriptor at index 0 in _fildes is the read end of the pipe
FD_SET(_eventfd.getFd(), &rfds);
#endif
if (_fildes[0] != -1) if (_fildes[0] != -1)
{ {
FD_SET(_fildes[0], &rfds); FD_SET(_fildes[0], &rfds);
@ -80,35 +69,47 @@ namespace ix
timeout.tv_usec = 1000 * timeoutMs; timeout.tv_usec = 1000 * timeoutMs;
// Compute the highest fd. // Compute the highest fd.
// FIXME / cleanup int sockfd = _sockfd;
std::vector<int> fds = { _sockfd, _eventfd.getFd(), _fildes[0] }; int nfds = (std::max)(sockfd, _fildes[0]);
int nfds = -1;
for (auto fd : fds)
{
if (fd >= nfds)
{
nfds = fd;
}
}
int ret = ::select(nfds + 1, &rfds, nullptr, nullptr, int ret = ::select(nfds + 1, &rfds, nullptr, nullptr,
(timeoutSecs < 0) ? nullptr : &timeout); (timeoutSecs < 0) ? nullptr : &timeout);
if (_fildes[0] != -1 && FD_ISSET(_fildes[0], &rfds)) PollResultType pollResult = PollResultType_ReadyForRead;
if (ret < 0)
{
pollResult = PollResultType_Error;
}
else if (ret == 0)
{
pollResult = PollResultType_Timeout;
}
else if (_fildes[0] != -1 && FD_ISSET(_fildes[0], &rfds))
{ {
fprintf(stderr, "something wrote to the pipe\n");
uint64_t value = 0; uint64_t value = 0;
read(_fildes[0], &value, sizeof(value)); read(_fildes[0], &value, sizeof(value));
if (value == kSendRequest)
{
pollResult = PollResultType_SendRequest;
}
else if (value == kCloseRequest)
{
pollResult = PollResultType_CloseRequest;
}
} }
return ret; return pollResult;
} }
void Socket::wakeUpFromPoll() // Wake up from poll/select by writing to the pipe which is is watched by select
bool Socket::wakeUpFromPoll(int wakeUpCode)
{ {
uint64_t value = 0; // File descriptor at index 1 in _fildes is the write end of the pipe
write(_fildes[1], &value, sizeof(value)); if (_fildes[1] == -1) return false;
int value = wakeUpCode;
return ::write(_fildes[1], &value, sizeof(value)) == 4;
} }
bool Socket::connect(const std::string& host, bool Socket::connect(const std::string& host,
@ -118,7 +119,6 @@ namespace ix
{ {
std::lock_guard<std::mutex> lock(_socketMutex); std::lock_guard<std::mutex> lock(_socketMutex);
if (!_eventfd.clear()) return false;
if (pipe(_fildes) < 0) return false; if (pipe(_fildes) < 0) return false;
fcntl(_fildes[0], F_SETFL, O_NONBLOCK); fcntl(_fildes[0], F_SETFL, O_NONBLOCK);

View File

@ -18,7 +18,6 @@
typedef SSIZE_T ssize_t; typedef SSIZE_T ssize_t;
#endif #endif
#include "IXEventFd.h"
#include "IXCancellationRequest.h" #include "IXCancellationRequest.h"
#include "IXProgressCallback.h" #include "IXProgressCallback.h"
@ -28,7 +27,9 @@ namespace ix
{ {
PollResultType_ReadyForRead = 0, PollResultType_ReadyForRead = 0,
PollResultType_Timeout = 1, PollResultType_Timeout = 1,
PollResultType_Error = 2 PollResultType_Error = 2,
PollResultType_SendRequest = 3,
PollResultType_CloseRequest = 4
}; };
class Socket { class Socket {
@ -40,10 +41,10 @@ namespace ix
void configure(); void configure();
int select(int timeoutSecs, int timeoutMs); PollResultType select(int timeoutSecs, int timeoutMs);
virtual void poll(const OnPollCallback& onPollCallback, virtual void poll(const OnPollCallback& onPollCallback,
int timeoutSecs = kDefaultPollTimeout); int timeoutSecs = kDefaultPollTimeout);
virtual void wakeUpFromPoll(); virtual bool wakeUpFromPoll(int wakeUpCode);
// Virtual methods // Virtual methods
virtual bool connect(const std::string& url, virtual bool connect(const std::string& url,
@ -74,12 +75,15 @@ namespace ix
static bool init(); // Required on Windows to initialize WinSocket static bool init(); // Required on Windows to initialize WinSocket
static void cleanup(); // Required on Windows to cleanup WinSocket static void cleanup(); // Required on Windows to cleanup WinSocket
// Used as special codes for pipe communication
static const int kSendRequest;
static const int kCloseRequest;
protected: protected:
void closeSocket(int fd); void closeSocket(int fd);
std::atomic<int> _sockfd; std::atomic<int> _sockfd;
std::mutex _socketMutex; std::mutex _socketMutex;
EventFd _eventfd;
private: private:
static const int kDefaultPollTimeout; static const int kDefaultPollTimeout;
@ -89,6 +93,9 @@ namespace ix
std::vector<uint8_t> _readBuffer; std::vector<uint8_t> _readBuffer;
static constexpr size_t kChunkSize = 1 << 15; static constexpr size_t kChunkSize = 1 << 15;
// Store file descriptors used by the communication pipe. Communication
// happens between a control thread and a background thread, which is
// blocked on select.
int _fildes[2]; int _fildes[2];
}; };
} }

View File

@ -379,4 +379,9 @@ namespace ix
{ {
_automaticReconnection = false; _automaticReconnection = false;
} }
size_t WebSocket::bufferedAmount() const
{
return _ws.bufferedAmount();
}
} }

View File

@ -112,6 +112,7 @@ namespace ix
const std::string& getUrl() const; const std::string& getUrl() const;
const WebSocketPerMessageDeflateOptions& getPerMessageDeflateOptions() const; const WebSocketPerMessageDeflateOptions& getPerMessageDeflateOptions() const;
int getHeartBeatPeriod() const; int getHeartBeatPeriod() const;
size_t bufferedAmount() const;
void enableAutomaticReconnection(); void enableAutomaticReconnection();
void disableAutomaticReconnection(); void disableAutomaticReconnection();

View File

@ -1,7 +1,31 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2012, 2013 <dhbaird@gmail.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
/* /*
* IXWebSocketTransport.cpp * IXWebSocketTransport.cpp
* Author: Benjamin Sergeant * Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved. * Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/ */
// //
@ -14,14 +38,6 @@
#include "IXUrlParser.h" #include "IXUrlParser.h"
#include "IXSocketFactory.h" #include "IXSocketFactory.h"
#ifdef IXWEBSOCKET_USE_TLS
# ifdef __APPLE__
# include "IXSocketAppleSSL.h"
# else
# include "IXSocketOpenSSL.h"
# endif
#endif
#include <string.h> #include <string.h>
#include <stdlib.h> #include <stdlib.h>
@ -184,42 +200,51 @@ namespace ix
std::stringstream ss; std::stringstream ss;
ss << kHeartBeatPingMessage << "::" << _heartBeatPeriod << "s"; ss << kHeartBeatPingMessage << "::" << _heartBeatPeriod << "s";
sendPing(ss.str()); sendPing(ss.str());
return;
} }
// Make sure we send all the buffered data // Make sure we send all the buffered data
// there can be a lot of it for large messages. // there can be a lot of it for large messages.
while (!isSendBufferEmpty() && !_requestInitCancellation) sendOnSocket(); else if (pollResult == PollResultType_SendRequest)
while (true)
{ {
ssize_t ret = _socket->recv((char*)&_readbuf[0], _readbuf.size()); while (!isSendBufferEmpty() && !_requestInitCancellation)
if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
_socket->getErrno() == EAGAIN))
{ {
break; sendOnSocket();
}
else if (ret <= 0)
{
_rxbuf.clear();
_socket->close();
setReadyState(CLOSED);
break;
}
else
{
_rxbuf.insert(_rxbuf.end(),
_readbuf.begin(),
_readbuf.begin() + ret);
} }
} }
else if (pollResult == PollResultType_ReadyForRead)
{
while (true)
{
ssize_t ret = _socket->recv((char*)&_readbuf[0], _readbuf.size());
if (isSendBufferEmpty() && _readyState == CLOSING) if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
_socket->getErrno() == EAGAIN))
{
break;
}
else if (ret <= 0)
{
_rxbuf.clear();
_socket->close();
setReadyState(CLOSED);
break;
}
else
{
_rxbuf.insert(_rxbuf.end(),
_readbuf.begin(),
_readbuf.begin() + ret);
}
}
}
else if (pollResult == PollResultType_Error)
{ {
_socket->close(); _socket->close();
setReadyState(CLOSED);
} }
else if (pollResult == PollResultType_CloseRequest)
{
;
}
}, },
_heartBeatPeriod); _heartBeatPeriod);
} }
@ -590,7 +615,7 @@ namespace ix
} }
} }
_socket->wakeUpFromPoll(); _socket->wakeUpFromPoll(Socket::kSendRequest);
return WebSocketSendInfo(true, compressionError, payloadSize, wireSize); return WebSocketSendInfo(true, compressionError, payloadSize, wireSize);
} }
@ -737,8 +762,17 @@ namespace ix
sendData(wsheader_type::CLOSE, normalClosure, compress); sendData(wsheader_type::CLOSE, normalClosure, compress);
setReadyState(CLOSING); setReadyState(CLOSING);
_socket->wakeUpFromPoll(); _socket->wakeUpFromPoll(Socket::kCloseRequest);
_socket->close(); _socket->close();
_closeCode = 1000;
setReadyState(CLOSED);
}
size_t WebSocketTransport::bufferedAmount() const
{
std::lock_guard<std::mutex> lock(_txbufMutex);
return _txbuf.size();
} }
} // namespace ix } // namespace ix

View File

@ -77,6 +77,7 @@ namespace ix
void setReadyState(ReadyStateValues readyStateValue); void setReadyState(ReadyStateValues readyStateValue);
void setOnCloseCallback(const OnCloseCallback& onCloseCallback); void setOnCloseCallback(const OnCloseCallback& onCloseCallback);
void dispatch(const OnMessageCallback& onMessageCallback); void dispatch(const OnMessageCallback& onMessageCallback);
size_t bufferedAmount() const;
private: private:
std::string _url; std::string _url;

View File

@ -164,10 +164,21 @@ namespace
ss << "cmd_websocket_chat: Error ! " << error.reason; ss << "cmd_websocket_chat: Error ! " << error.reason;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocket_MessageType_Ping)
{
log("cmd_websocket_chat: received ping message");
}
else if (messageType == ix::WebSocket_MessageType_Pong)
{
log("cmd_websocket_chat: received pong message");
}
else if (messageType == ix::WebSocket_MessageType_Fragment)
{
log("cmd_websocket_chat: received message fragment");
}
else else
{ {
// FIXME: missing ping/pong messages ss << "Unexpected ix::WebSocketMessageType";
ss << "Invalid ix::WebSocketMessageType";
log(ss.str()); log(ss.str());
} }
}); });

View File

@ -71,6 +71,15 @@ namespace ix
<< " out of " << total << std::endl; << " out of " << total << std::endl;
return true; return true;
}); });
do
{
size_t bufferedAmount = client->bufferedAmount();
std::cerr << bufferedAmount << " bytes left to be sent" << std::endl;
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
} while (client->bufferedAmount() != 0);
} }
} }
} }

View File

@ -257,6 +257,15 @@ namespace ix
return true; return true;
}); });
do
{
size_t bufferedAmount = _webSocket.bufferedAmount();
std::cout << bufferedAmount << " bytes left to be sent" << std::endl;
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
} while (_webSocket.bufferedAmount() != 0);
bench.report(); bench.report();
auto duration = bench.getDuration(); auto duration = bench.getDuration();
auto transferRate = 1000 * content.size() / duration; auto transferRate = 1000 * content.size() / duration;

View File

@ -70,6 +70,15 @@ namespace ix
<< " out of " << total << std::endl; << " out of " << total << std::endl;
return true; return true;
}); });
do
{
size_t bufferedAmount = client->bufferedAmount();
std::cerr << bufferedAmount << " bytes left to be sent" << std::endl;
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
} while (client->bufferedAmount() != 0);
} }
} }
} }