Compare commits

..

2 Commits

Author SHA1 Message Date
Benjamin Sergeant
0834198e74 do not create a kqueue everytime we call poll 2020-08-05 15:00:01 -07:00
Benjamin Sergeant
22dd32d4e9 wip kqueue, with selectinterrupt not implemented + we create a queue every time 2020-08-05 14:47:03 -07:00
12 changed files with 267 additions and 169 deletions

View File

@@ -2,14 +2,6 @@
All changes to this project will be documented in this file. All changes to this project will be documented in this file.
## [10.1.7] - 2020-08-11
(ws) -q option imply info log level, not warning log level
## [10.1.6] - 2020-08-06
(websocket server) Handle programmer error when the server callback is not registered properly (fix #227)
## [10.1.5] - 2020-08-02 ## [10.1.5] - 2020-08-02
(ws) Add a new ws sub-command, push_server. This command runs a server which sends many messages in a loop to a websocket client. We can receive above 200,000 messages per second (cf #235). (ws) Add a new ws sub-command, push_server. This command runs a server which sends many messages in a loop to a websocket client. We can receive above 200,000 messages per second (cf #235).

View File

@@ -67,28 +67,9 @@ webSocket.stop()
### Sending messages ### Sending messages
`WebSocketSendInfo result = websocket.send("foo")` will send a message. `websocket.send("foo")` will send a message.
If the connection was closed, sending will fail, and the success field of the result object will be set to false. There could also be a compression error in which case the compressError field will be set to true. The payloadSize field and wireSize fields will tell you respectively how much bytes the message weight, and how many bytes were sent on the wire (potentially compressed + counting the message header (a few bytes). If the connection was closed and sending failed, the return value will be set to false.
There is an optional progress callback that can be passed in as the second argument. If a message is large it will be fragmented into chunks which will be sent independantly. Everytime the we can write a fragment into the OS network cache, the callback will be invoked. If a user wants to cancel a slow send, false should be returned from within the callback.
Here is an example code snippet copied from the ws send sub-command. Each fragment weights 32K, so the total integer is the wireSize divided by 32K. As an example if you are sending 32M of data, uncompressed, total will be 1000. current will be set to 0 for the first fragment, then 1, 2 etc...
```
auto result =
_webSocket.sendBinary(serializedMsg, [this, throttle](int current, int total) -> bool {
spdlog::info("ws_send: Step {} out of {}", current + 1, total);
if (throttle)
{
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
}
return _connected;
});
```
### ReadyState ### ReadyState

View File

@@ -35,19 +35,122 @@ namespace ix
: _sockfd(fd) : _sockfd(fd)
, _selectInterrupt(createSelectInterrupt()) , _selectInterrupt(createSelectInterrupt())
{ {
; #if defined(__APPLE__)
_kqueuefd = kqueue();
#endif
} }
Socket::~Socket() Socket::~Socket()
{ {
close(); close();
#if defined(__APPLE__)
::close(_kqueuefd);
#endif
} }
PollResultType Socket::poll(bool readyToRead, PollResultType Socket::poll(bool readyToRead,
int timeoutMs, int timeoutMs,
int sockfd, int sockfd,
const SelectInterruptPtr& selectInterrupt) const SelectInterruptPtr& selectInterrupt,
int kqueuefd)
{ {
#if defined(__APPLE__)
// FIXME int kqueuefd = kqueue();
struct kevent ke;
EV_SET(&ke, sockfd, (readyToRead) ? EVFILT_READ : EVFILT_WRITE, EV_ADD, 0, 0, NULL);
if (kevent(kqueuefd, &ke, 1, NULL, 0, NULL) == -1) return PollResultType::Error;
int retval, numevents = 0;
int nfds = 1;
#if 0
if (selectInterrupt)
{
nfds = 2;
int interruptFd = selectInterrupt->getFd();
struct kevent ke;
EV_SET(&ke, interruptFd, EVFILT_READ, EV_ADD, 0, 0, NULL);
if (kevent(kqueuefd, &ke, 1, NULL, 0, NULL) == -1) return PollResultType::Error;
}
#endif
struct kevent *events;
events = (struct kevent*) malloc(sizeof(struct kevent) * nfds);
if (timeoutMs != 0)
{
struct timespec timeout;
timeout.tv_sec = timeoutMs / 1000;
timeout.tv_nsec = (timeoutMs % 1000) * 1000 * 1000;
retval = kevent(kqueuefd, NULL, 0, events, nfds, &timeout);
}
else
{
retval = kevent(kqueuefd, NULL, 0, events, nfds, NULL);
}
#if 0
if (retval > 0) {
int j;
numevents = retval;
for(j = 0; j < numevents; j++) {
int mask = 0;
struct kevent *e = events+j;
if (e->filter == EVFILT_READ) mask |= AE_READABLE;
if (e->filter == EVFILT_WRITE) mask |= AE_WRITABLE;
eventLoop->fired[j].fd = e->ident;
eventLoop->fired[j].mask = mask;
}
}
#else
PollResultType pollResult = PollResultType::ReadyForRead;
if (retval < 0)
{
pollResult = PollResultType::Error;
}
if (retval > 0) {
struct kevent *e = events;
if (e->filter == EVFILT_READ)
{
pollResult = PollResultType::ReadyForRead;
}
else if (e->filter == EVFILT_WRITE)
{
pollResult = PollResultType::ReadyForWrite;
int optval = -1;
socklen_t optlen = sizeof(optval);
// getsockopt() puts the errno value for connect into optval so 0
// means no-error.
if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &optval, &optlen) == -1 || optval != 0)
{
pollResult = PollResultType::Error;
// set errno to optval so that external callers can have an
// appropriate error description when calling strerror
errno = optval;
}
}
}
else
{
pollResult = PollResultType::Timeout;
}
#endif
free(events);
// ::close(kqueuefd); //FMXE
return pollResult;
#else
// //
// We used to use ::select to poll but on Android 9 we get large fds out of // We used to use ::select to poll but on Android 9 we get large fds out of
// ::connect which crash in FD_SET as they are larger than FD_SETSIZE. Switching // ::connect which crash in FD_SET as they are larger than FD_SETSIZE. Switching
@@ -142,6 +245,7 @@ namespace ix
} }
return pollResult; return pollResult;
#endif
} }
PollResultType Socket::isReadyToRead(int timeoutMs) PollResultType Socket::isReadyToRead(int timeoutMs)
@@ -152,7 +256,7 @@ namespace ix
} }
bool readyToRead = true; bool readyToRead = true;
return poll(readyToRead, timeoutMs, _sockfd, _selectInterrupt); return poll(readyToRead, timeoutMs, _sockfd, _selectInterrupt, _kqueuefd);
} }
PollResultType Socket::isReadyToWrite(int timeoutMs) PollResultType Socket::isReadyToWrite(int timeoutMs)
@@ -163,7 +267,7 @@ namespace ix
} }
bool readyToRead = false; bool readyToRead = false;
return poll(readyToRead, timeoutMs, _sockfd, _selectInterrupt); return poll(readyToRead, timeoutMs, _sockfd, _selectInterrupt, _kqueuefd);
} }
// Wake up from poll/select by writing to the pipe which is watched by select // Wake up from poll/select by writing to the pipe which is watched by select

View File

@@ -13,6 +13,13 @@
#include <string> #include <string>
#include <vector> #include <vector>
// For kqueue
#if defined(__APPLE__)
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#endif
#ifdef _WIN32 #ifdef _WIN32
#include <BaseTsd.h> #include <BaseTsd.h>
typedef SSIZE_T ssize_t; typedef SSIZE_T ssize_t;
@@ -94,7 +101,8 @@ namespace ix
static PollResultType poll(bool readyToRead, static PollResultType poll(bool readyToRead,
int timeoutMs, int timeoutMs,
int sockfd, int sockfd,
const SelectInterruptPtr& selectInterrupt); const SelectInterruptPtr& selectInterrupt,
int kqueuefd);
// Used as special codes for pipe communication // Used as special codes for pipe communication
@@ -114,5 +122,9 @@ namespace ix
static constexpr size_t kChunkSize = 1 << 15; static constexpr size_t kChunkSize = 1 << 15;
SelectInterruptPtr _selectInterrupt; SelectInterruptPtr _selectInterrupt;
#if defined(__APPLE__)
int _kqueuefd;
#endif
}; };
} // namespace ix } // namespace ix

View File

@@ -66,7 +66,10 @@ namespace ix
int timeoutMs = 10; int timeoutMs = 10;
bool readyToRead = false; bool readyToRead = false;
auto selectInterrupt = std::make_unique<SelectInterrupt>(); auto selectInterrupt = std::make_unique<SelectInterrupt>();
PollResultType pollResult = Socket::poll(readyToRead, timeoutMs, fd, selectInterrupt);
int kqueuefd = kqueue();
PollResultType pollResult = Socket::poll(readyToRead, timeoutMs, fd, selectInterrupt, kqueuefd);
::close(kqueuefd);
if (pollResult == PollResultType::Timeout) if (pollResult == PollResultType::Timeout)
{ {

View File

@@ -259,8 +259,11 @@ namespace ix
int timeoutMs = 10; int timeoutMs = 10;
bool readyToRead = true; bool readyToRead = true;
auto selectInterrupt = std::make_unique<SelectInterrupt>(); auto selectInterrupt = std::make_unique<SelectInterrupt>();
int kqueuefd = kqueue();
PollResultType pollResult = PollResultType pollResult =
Socket::poll(readyToRead, timeoutMs, _serverFd, selectInterrupt); Socket::poll(readyToRead, timeoutMs, _serverFd, selectInterrupt, kqueuefd);
::close(kqueuefd);
if (pollResult == PollResultType::Error) if (pollResult == PollResultType::Error)
{ {

View File

@@ -405,11 +405,6 @@ namespace ix
_onMessageCallback = callback; _onMessageCallback = callback;
} }
bool WebSocket::isOnMessageCallbackRegistered() const
{
return _onMessageCallback != nullptr;
}
void WebSocket::setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback) void WebSocket::setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback)
{ {
_onTrafficTrackerCallback = callback; _onTrafficTrackerCallback = callback;

View File

@@ -84,7 +84,6 @@ namespace ix
const std::string& reason = WebSocketCloseConstants::kNormalClosureMessage); const std::string& reason = WebSocketCloseConstants::kNormalClosureMessage);
void setOnMessageCallback(const OnMessageCallback& callback); void setOnMessageCallback(const OnMessageCallback& callback);
bool isOnMessageCallbackRegistered() const;
static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback); static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback);
static void resetTrafficTrackerCallback(); static void resetTrafficTrackerCallback();

View File

@@ -86,15 +86,6 @@ namespace ix
if (_onConnectionCallback) if (_onConnectionCallback)
{ {
_onConnectionCallback(webSocket, connectionState, std::move(connectionInfo)); _onConnectionCallback(webSocket, connectionState, std::move(connectionInfo));
if (!webSocket->isOnMessageCallbackRegistered())
{
logError("WebSocketServer Application developer error: Server callback improperly "
"registerered.");
logError("Missing call to setOnMessageCallback inside setOnConnectionCallback.");
connectionState->setTerminated();
return;
}
} }
else if (_onClientMessageCallback) else if (_onClientMessageCallback)
{ {

View File

@@ -6,4 +6,4 @@
#pragma once #pragma once
#define IX_WEBSOCKET_VERSION "10.1.7" #define IX_WEBSOCKET_VERSION "10.1.5"

View File

@@ -36,136 +36,154 @@
* *
*/ */
#include <atomic>
#include <iostream>
#include <libwebsockets.h> #include <libwebsockets.h>
#include <signal.h>
#include <string.h> #include <string.h>
#include <signal.h>
#include <atomic>
#include <thread> #include <thread>
#include <iostream>
static int interrupted; static int interrupted;
static struct lws* client_wsi; static struct lws *client_wsi;
std::atomic<uint64_t> receivedCount(0); std::atomic<uint64_t> receivedCount(0);
static int callback_dumb_increment( static int
struct lws* wsi, enum lws_callback_reasons reason, void* user, void* in, size_t len) callback_dumb_increment(struct lws *wsi, enum lws_callback_reasons reason,
void *user, void *in, size_t len)
{ {
switch (reason) switch (reason) {
{
/* because we are protocols[0] ... */ /* because we are protocols[0] ... */
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
lwsl_err("CLIENT_CONNECTION_ERROR: %s\n", in ? (char*) in : "(null)"); lwsl_err("CLIENT_CONNECTION_ERROR: %s\n",
client_wsi = NULL; in ? (char *)in : "(null)");
break; client_wsi = NULL;
break;
case LWS_CALLBACK_CLIENT_ESTABLISHED: lwsl_user("%s: established\n", __func__); break; case LWS_CALLBACK_CLIENT_ESTABLISHED:
lwsl_user("%s: established\n", __func__);
break;
case LWS_CALLBACK_CLIENT_RECEIVE: receivedCount++; break; case LWS_CALLBACK_CLIENT_RECEIVE:
receivedCount++;
break;
case LWS_CALLBACK_CLIENT_CLOSED: client_wsi = NULL; break; case LWS_CALLBACK_CLIENT_CLOSED:
client_wsi = NULL;
break;
default: break; default:
} break;
return lws_callback_http_dummy(wsi, reason, user, in, len);
}
static const struct lws_protocols protocols[] = {{
"dumb-increment-protocol",
callback_dumb_increment,
0,
0,
},
{NULL, NULL, 0, 0}};
static void sigint_handler(int sig)
{
interrupted = 1;
}
int main(int argc, const char** argv)
{
uint64_t receivedCountTotal(0);
uint64_t receivedCountPerSecs(0);
auto timer = [&receivedCountTotal, &receivedCountPerSecs] {
while (!interrupted)
{
std::cerr << "messages received: " << receivedCountPerSecs << " per second "
<< receivedCountTotal << " total" << std::endl;
receivedCountPerSecs = receivedCount - receivedCountTotal;
receivedCountTotal += receivedCountPerSecs;
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
} }
};
std::thread t1(timer); return lws_callback_http_dummy(wsi, reason, user, in, len);
}
struct lws_context_creation_info info;
struct lws_client_connect_info i; static const struct lws_protocols protocols[] = {
struct lws_context* context; {
const char* p; "dumb-increment-protocol",
int n = 0, logs = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE callback_dumb_increment,
/* for LLL_ verbosity above NOTICE to be built into lws, lws 0,
* must have been configured with -DCMAKE_BUILD_TYPE=DEBUG 0,
* instead of =RELEASE */ },
/* | LLL_INFO */ /* | LLL_PARSER */ /* | LLL_HEADER */ { NULL, NULL, 0, 0 }
/* | LLL_EXT */ /* | LLL_CLIENT */ /* | LLL_LATENCY */ };
/* | LLL_DEBUG */;
static void
signal(SIGINT, sigint_handler); sigint_handler(int sig)
if ((p = lws_cmdline_option(argc, argv, "-d"))) logs = atoi(p); {
interrupted = 1;
lws_set_log_level(logs, NULL); }
lwsl_user("LWS minimal ws client rx [-d <logs>] [--h2]\n");
int main(int argc, const char **argv)
memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */ {
info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */ uint64_t receivedCountTotal(0);
info.protocols = protocols; uint64_t receivedCountPerSecs(0);
info.timeout_secs = 10;
auto timer = [&receivedCountTotal, &receivedCountPerSecs] {
/* while (!interrupted)
* since we know this lws context is only ever going to be used with {
* one client wsis / fds / sockets at a time, let lws know it doesn't std::cerr << "messages received: "
* have to use the default allocations for fd tables up to ulimit -n. << receivedCountPerSecs
* It will just allocate for 1 internal and 1 (+ 1 http2 nwsi) that we << " per second "
* will use. << receivedCountTotal
*/ << " total"
info.fd_limit_per_thread = 1 + 1 + 1; << std::endl;
context = lws_create_context(&info); receivedCountPerSecs = receivedCount - receivedCountTotal;
if (!context) receivedCountTotal += receivedCountPerSecs;
{
lwsl_err("lws init failed\n"); auto duration = std::chrono::seconds(1);
return 1; std::this_thread::sleep_for(duration);
} }
};
memset(&i, 0, sizeof i); /* otherwise uninitialized garbage */
i.context = context; std::thread t1(timer);
i.port = 8008;
i.address = "127.0.0.1"; struct lws_context_creation_info info;
i.path = "/"; struct lws_client_connect_info i;
i.host = i.address; struct lws_context *context;
i.origin = i.address; const char *p;
i.protocol = protocols[0].name; /* "dumb-increment-protocol" */ int n = 0, logs = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE
i.pwsi = &client_wsi; /* for LLL_ verbosity above NOTICE to be built into lws, lws
* must have been configured with -DCMAKE_BUILD_TYPE=DEBUG
if (lws_cmdline_option(argc, argv, "--h2")) i.alpn = "h2"; * instead of =RELEASE */
/* | LLL_INFO */ /* | LLL_PARSER */ /* | LLL_HEADER */
lws_client_connect_via_info(&i); /* | LLL_EXT */ /* | LLL_CLIENT */ /* | LLL_LATENCY */
/* | LLL_DEBUG */;
while (n >= 0 && client_wsi && !interrupted)
n = lws_service(context, 0); signal(SIGINT, sigint_handler);
if ((p = lws_cmdline_option(argc, argv, "-d")))
lws_context_destroy(context); logs = atoi(p);
lwsl_user("Completed %s\n", receivedCount > 10 ? "OK" : "Failed"); lws_set_log_level(logs, NULL);
lwsl_user("LWS minimal ws client rx [-d <logs>] [--h2]\n");
t1.join();
memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
return receivedCount > 10; info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */
info.protocols = protocols;
info.timeout_secs = 10;
/*
* since we know this lws context is only ever going to be used with
* one client wsis / fds / sockets at a time, let lws know it doesn't
* have to use the default allocations for fd tables up to ulimit -n.
* It will just allocate for 1 internal and 1 (+ 1 http2 nwsi) that we
* will use.
*/
info.fd_limit_per_thread = 1 + 1 + 1;
context = lws_create_context(&info);
if (!context) {
lwsl_err("lws init failed\n");
return 1;
}
memset(&i, 0, sizeof i); /* otherwise uninitialized garbage */
i.context = context;
i.port = 8008;
i.address = "127.0.0.1";
i.path = "/";
i.host = i.address;
i.origin = i.address;
i.protocol = protocols[0].name; /* "dumb-increment-protocol" */
i.pwsi = &client_wsi;
if (lws_cmdline_option(argc, argv, "--h2"))
i.alpn = "h2";
lws_client_connect_via_info(&i);
while (n >= 0 && client_wsi && !interrupted)
n = lws_service(context, 0);
lws_context_destroy(context);
lwsl_user("Completed %s\n", receivedCount > 10 ? "OK" : "Failed");
t1.join();
return receivedCount > 10;
} }

View File

@@ -507,7 +507,7 @@ int main(int argc, char** argv)
if (quiet) if (quiet)
{ {
spdlog::set_level(spdlog::level::info); spdlog::set_level(spdlog::level::warn);
} }
// Cobra config // Cobra config