linux fix / linux still use event fd for now

This commit is contained in:
Benjamin Sergeant 2019-03-13 17:23:05 -07:00
parent d1a7b9a985
commit 922d58eb59
10 changed files with 107 additions and 65 deletions

View File

@ -12,11 +12,19 @@ RUN apt-get -y install libz-dev
RUN apt-get -y install vim RUN apt-get -y install vim
RUN apt-get -y install make RUN apt-get -y install make
RUN apt-get -y install cmake RUN apt-get -y install cmake
RUN apt-get -y install curl
RUN apt-get -y install python
# debian strech cmake is too old for building with Docker
COPY makefile .
RUN ["make", "install_cmake_for_linux"]
COPY . . COPY . .
WORKDIR ws ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-rc4-Linux-x86_64/bin
RUN ["sh", "docker_build.sh"] ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
# RUN ["make"]
EXPOSE 8765 EXPOSE 8765
CMD ["/ws/ws", "transfer", "--port", "8765", "--hostname", "0.0.0.0"] CMD ["/ws/ws", "transfer", "--port", "8765", "--host", "0.0.0.0"]

View File

@ -17,6 +17,8 @@
// cf Android/Kernel table here // cf Android/Kernel table here
// https://android.stackexchange.com/questions/51651/which-android-runs-which-linux-kernel // https://android.stackexchange.com/questions/51651/which-android-runs-which-linux-kernel
// //
// On macOS we use UNIX pipes to wake up select.
//
#include "IXEventFd.h" #include "IXEventFd.h"
@ -24,17 +26,24 @@
# include <sys/eventfd.h> # include <sys/eventfd.h>
#endif #endif
#ifndef _WIN32
#include <unistd.h> // for write #include <unistd.h> // for write
#endif #include <fcntl.h>
namespace ix namespace ix
{ {
EventFd::EventFd() : EventFd::EventFd()
_eventfd(-1)
{ {
#ifdef __linux__ #ifdef __linux__
_eventfd = -1;
_eventfd = eventfd(0, 0); _eventfd = eventfd(0, 0);
fcntl(_eventfd, F_SETFL, O_NONBLOCK);
#else
_fildes[0] = -1;
_fildes[1] = -1;
pipe(_fildes);
fcntl(_fildes[0], F_SETFL, O_NONBLOCK);
fcntl(_fildes[1], F_SETFL, O_NONBLOCK);
#endif #endif
} }
@ -42,22 +51,44 @@ namespace ix
{ {
#ifdef __linux__ #ifdef __linux__
::close(_eventfd); ::close(_eventfd);
#else
::close(_fildes[0]);
::close(_fildes[1]);
_fildes[0] = -1;
_fildes[1] = -1;
#endif #endif
} }
bool EventFd::notify() bool EventFd::notify(uint64_t value)
{ {
#if defined(__linux__) int fd;
if (_eventfd == -1) return false;
// select will wake up when a non-zero value is written to our eventfd #if defined(__linux__)
uint64_t value = 1; fd = _eventfd;
#else
// File descriptor at index 1 in _fildes is the write end of the pipe
fd = _fildes[1];
#endif
if (fd == -1) return false;
// we should write 8 bytes for an uint64_t // we should write 8 bytes for an uint64_t
return write(_eventfd, &value, sizeof(value)) == 8; return write(fd, &value, sizeof(value)) == 8;
}
// TODO: return max uint64_t for errors ?
uint64_t EventFd::read()
{
int fd;
#if defined(__linux__)
fd = _eventfd;
#else #else
return true; fd = _fildes[0];
#endif #endif
uint64_t value = 0;
::read(fd, &value, sizeof(value));
return value;
} }
bool EventFd::clear() bool EventFd::clear()
@ -77,6 +108,10 @@ namespace ix
int EventFd::getFd() int EventFd::getFd()
{ {
#if defined(__linux__)
return _eventfd; return _eventfd;
#else
return _fildes[0];
#endif
} }
} }

View File

@ -6,6 +6,8 @@
#pragma once #pragma once
#include <stdint.h>
namespace ix namespace ix
{ {
class EventFd { class EventFd {
@ -13,11 +15,19 @@ namespace ix
EventFd(); EventFd();
virtual ~EventFd(); virtual ~EventFd();
bool notify(); bool notify(uint64_t value);
bool clear(); bool clear();
uint64_t read();
int getFd(); int getFd();
private: private:
#if defined(__linux__)
int _eventfd; int _eventfd;
#else
// Store file descriptors used by the communication pipe. Communication
// happens between a control thread and a background thread, which is
// blocked on select.
int _fildes[2];
#endif
}; };
} }

View File

@ -23,15 +23,14 @@ namespace ix
{ {
const int Socket::kDefaultPollNoTimeout = -1; // No poll timeout by default const int Socket::kDefaultPollNoTimeout = -1; // No poll timeout by default
const int Socket::kDefaultPollTimeout = kDefaultPollNoTimeout; const int Socket::kDefaultPollTimeout = kDefaultPollNoTimeout;
const int Socket::kSendRequest = 1; const uint8_t Socket::kSendRequest = 1;
const int Socket::kCloseRequest = 2; const uint8_t Socket::kCloseRequest = 2;
constexpr size_t Socket::kChunkSize; constexpr size_t Socket::kChunkSize;
Socket::Socket(int fd) : Socket::Socket(int fd) :
_sockfd(fd) _sockfd(fd)
{ {
_fildes[0] = -1; ;
_fildes[1] = -1;
} }
Socket::~Socket() Socket::~Socket()
@ -59,9 +58,10 @@ namespace ix
FD_SET(_sockfd, &rfds); FD_SET(_sockfd, &rfds);
// File descriptor at index 0 in _fildes is the read end of the pipe // File descriptor at index 0 in _fildes is the read end of the pipe
if (_fildes[0] != -1) int eventfd = _eventfd.getFd();
if (eventfd != -1)
{ {
FD_SET(_fildes[0], &rfds); FD_SET(eventfd, &rfds);
} }
struct timeval timeout; struct timeval timeout;
@ -70,7 +70,7 @@ namespace ix
// Compute the highest fd. // Compute the highest fd.
int sockfd = _sockfd; int sockfd = _sockfd;
int nfds = (std::max)(sockfd, _fildes[0]); int nfds = (std::max)(sockfd, eventfd);
int ret = ::select(nfds + 1, &rfds, nullptr, nullptr, int ret = ::select(nfds + 1, &rfds, nullptr, nullptr,
(timeoutSecs < 0) ? nullptr : &timeout); (timeoutSecs < 0) ? nullptr : &timeout);
@ -84,10 +84,9 @@ namespace ix
{ {
pollResult = PollResultType_Timeout; pollResult = PollResultType_Timeout;
} }
else if (_fildes[0] != -1 && FD_ISSET(_fildes[0], &rfds)) else if (eventfd != -1 && FD_ISSET(eventfd, &rfds))
{ {
uint64_t value = 0; uint8_t value = _eventfd.read();
read(_fildes[0], &value, sizeof(value));
if (value == kSendRequest) if (value == kSendRequest)
{ {
@ -102,14 +101,10 @@ namespace ix
return pollResult; return pollResult;
} }
// Wake up from poll/select by writing to the pipe which is is watched by select // Wake up from poll/select by writing to the pipe which is watched by select
bool Socket::wakeUpFromPoll(int wakeUpCode) bool Socket::wakeUpFromPoll(uint8_t wakeUpCode)
{ {
// File descriptor at index 1 in _fildes is the write end of the pipe return _eventfd.notify(wakeUpCode);
if (_fildes[1] == -1) return false;
int value = wakeUpCode;
return ::write(_fildes[1], &value, sizeof(value)) == 4;
} }
bool Socket::connect(const std::string& host, bool Socket::connect(const std::string& host,
@ -119,10 +114,7 @@ namespace ix
{ {
std::lock_guard<std::mutex> lock(_socketMutex); std::lock_guard<std::mutex> lock(_socketMutex);
if (pipe(_fildes) < 0) return false; if (!_eventfd.clear()) return false;
fcntl(_fildes[0], F_SETFL, O_NONBLOCK);
fcntl(_fildes[1], F_SETFL, O_NONBLOCK);
_sockfd = SocketConnect::connect(host, port, errMsg, isCancellationRequested); _sockfd = SocketConnect::connect(host, port, errMsg, isCancellationRequested);
return _sockfd != -1; return _sockfd != -1;
@ -136,11 +128,6 @@ namespace ix
closeSocket(_sockfd); closeSocket(_sockfd);
_sockfd = -1; _sockfd = -1;
::close(_fildes[0]);
::close(_fildes[1]);
_fildes[0] = -1;
_fildes[1] = -1;
} }
ssize_t Socket::send(char* buffer, size_t length) ssize_t Socket::send(char* buffer, size_t length)

View File

@ -20,6 +20,7 @@ typedef SSIZE_T ssize_t;
#include "IXCancellationRequest.h" #include "IXCancellationRequest.h"
#include "IXProgressCallback.h" #include "IXProgressCallback.h"
#include "IXEventFd.h"
namespace ix namespace ix
{ {
@ -44,7 +45,7 @@ namespace ix
PollResultType select(int timeoutSecs, int timeoutMs); PollResultType select(int timeoutSecs, int timeoutMs);
virtual void poll(const OnPollCallback& onPollCallback, virtual void poll(const OnPollCallback& onPollCallback,
int timeoutSecs = kDefaultPollTimeout); int timeoutSecs = kDefaultPollTimeout);
virtual bool wakeUpFromPoll(int wakeUpCode); virtual bool wakeUpFromPoll(uint8_t wakeUpCode);
// Virtual methods // Virtual methods
virtual bool connect(const std::string& url, virtual bool connect(const std::string& url,
@ -76,8 +77,8 @@ namespace ix
static void cleanup(); // Required on Windows to cleanup WinSocket static void cleanup(); // Required on Windows to cleanup WinSocket
// Used as special codes for pipe communication // Used as special codes for pipe communication
static const int kSendRequest; static const uint8_t kSendRequest;
static const int kCloseRequest; static const uint8_t kCloseRequest;
protected: protected:
void closeSocket(int fd); void closeSocket(int fd);
@ -93,9 +94,6 @@ namespace ix
std::vector<uint8_t> _readBuffer; std::vector<uint8_t> _readBuffer;
static constexpr size_t kChunkSize = 1 << 15; static constexpr size_t kChunkSize = 1 << 15;
// Store file descriptors used by the communication pipe. Communication EventFd _eventfd;
// happens between a control thread and a background thread, which is
// blocked on select.
int _fildes[2];
}; };
} }

View File

@ -8,10 +8,10 @@ brew:
.PHONY: docker .PHONY: docker
docker: docker:
docker build -t broadcast_server:latest . docker build -t ws:latest .
run: run:
docker run --cap-add sys_ptrace -it broadcast_server:latest bash docker run --cap-add sys_ptrace -it ws:latest
# this is helpful to remove trailing whitespaces # this is helpful to remove trailing whitespaces
trail: trail:
@ -43,5 +43,9 @@ rebase_upstream:
git reset --hard upstream/master git reset --hard upstream/master
git push origin master --force git push origin master --force
install_cmake_for_linux:
mkdir -p /tmp/cmake
(cd /tmp/cmake ; curl -L -O https://github.com/Kitware/CMake/releases/download/v3.14.0-rc4/cmake-3.14.0-rc4-Linux-x86_64.tar.gz ; tar zxf cmake-3.14.0-rc4-Linux-x86_64.tar.gz)
.PHONY: test .PHONY: test
.PHONY: build .PHONY: build

View File

@ -51,6 +51,7 @@ int main(int argc, char** argv)
CLI::App* transferApp = app.add_subcommand("transfer", "Broadcasting server"); CLI::App* transferApp = app.add_subcommand("transfer", "Broadcasting server");
transferApp->add_option("--port", port, "Connection url"); transferApp->add_option("--port", port, "Connection url");
transferApp->add_option("--host", hostname, "Hostname");
CLI::App* connectApp = app.add_subcommand("connect", "Connect to a remote server"); CLI::App* connectApp = app.add_subcommand("connect", "Connect to a remote server");
connectApp->add_option("url", url, "Connection url")->required(); connectApp->add_option("url", url, "Connection url")->required();
@ -60,11 +61,12 @@ int main(int argc, char** argv)
chatApp->add_option("user", user, "User name")->required(); chatApp->add_option("user", user, "User name")->required();
CLI::App* echoServerApp = app.add_subcommand("echo_server", "Echo server"); CLI::App* echoServerApp = app.add_subcommand("echo_server", "Echo server");
echoServerApp->add_option("--port", port, "Connection url"); echoServerApp->add_option("--port", port, "Port");
echoServerApp->add_option("--host", hostname, "Hostname");
CLI::App* broadcastServerApp = app.add_subcommand("broadcast_server", "Broadcasting server"); CLI::App* broadcastServerApp = app.add_subcommand("broadcast_server", "Broadcasting server");
broadcastServerApp->add_option("--port", port, "Connection url"); broadcastServerApp->add_option("--port", port, "Port");
broadcastServerApp->add_option("--hostname", hostname, "Hostname"); broadcastServerApp->add_option("--host", hostname, "Hostname");
CLI::App* pingPongApp = app.add_subcommand("ping", "Ping pong"); CLI::App* pingPongApp = app.add_subcommand("ping", "Ping pong");
pingPongApp->add_option("url", url, "Connection url")->required(); pingPongApp->add_option("url", url, "Connection url")->required();
@ -90,7 +92,7 @@ int main(int argc, char** argv)
if (app.got_subcommand("transfer")) if (app.got_subcommand("transfer"))
{ {
return ix::ws_transfer_main(port); return ix::ws_transfer_main(port, hostname);
} }
else if (app.got_subcommand("send")) else if (app.got_subcommand("send"))
{ {
@ -111,7 +113,7 @@ int main(int argc, char** argv)
} }
else if (app.got_subcommand("echo_server")) else if (app.got_subcommand("echo_server"))
{ {
return ix::ws_echo_server_main(port); return ix::ws_echo_server_main(port, hostname);
} }
else if (app.got_subcommand("broadcast_server")) else if (app.got_subcommand("broadcast_server"))
{ {

View File

@ -24,9 +24,9 @@ namespace ix
int ws_ping_pong_main(const std::string& url); int ws_ping_pong_main(const std::string& url);
int ws_echo_server_main(int port); int ws_echo_server_main(int port, const std::string& hostname);
int ws_broadcast_server_main(int port, const std::string& hostname); int ws_broadcast_server_main(int port, const std::string& hostname);
int ws_transfer_main(int port, const std::string& hostname);
int ws_chat_main(const std::string& url, int ws_chat_main(const std::string& url,
const std::string& user); const std::string& user);
@ -36,8 +36,6 @@ namespace ix
int ws_receive_main(const std::string& url, int ws_receive_main(const std::string& url,
bool enablePerMessageDeflate); bool enablePerMessageDeflate);
int ws_transfer_main(int port);
int ws_send_main(const std::string& url, int ws_send_main(const std::string& url,
const std::string& path); const std::string& path);
} }

View File

@ -10,11 +10,11 @@
namespace ix namespace ix
{ {
int ws_echo_server_main(int port) int ws_echo_server_main(int port, const std::string& hostname)
{ {
std::cout << "Listening on port " << port << std::endl; std::cout << "Listening on " << hostname << ":" << port << std::endl;
ix::WebSocketServer server(port); ix::WebSocketServer server(port, hostname);
server.setOnConnectionCallback( server.setOnConnectionCallback(
[](std::shared_ptr<ix::WebSocket> webSocket) [](std::shared_ptr<ix::WebSocket> webSocket)

View File

@ -10,11 +10,11 @@
namespace ix namespace ix
{ {
int ws_transfer_main(int port) int ws_transfer_main(int port, const std::string& hostname)
{ {
std::cout << "Listening on port " << port << std::endl; std::cout << "Listening on " << hostname << ":" << port << std::endl;
ix::WebSocketServer server(port); ix::WebSocketServer server(port, hostname);
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket) [&server](std::shared_ptr<ix::WebSocket> webSocket)