Compare commits
2 Commits
v10.1.7
...
feature/kq
Author | SHA1 | Date | |
---|---|---|---|
|
0834198e74 | ||
|
22dd32d4e9 |
@@ -2,14 +2,6 @@
|
||||
|
||||
All changes to this project will be documented in this file.
|
||||
|
||||
## [10.1.7] - 2020-08-11
|
||||
|
||||
(ws) -q option imply info log level, not warning log level
|
||||
|
||||
## [10.1.6] - 2020-08-06
|
||||
|
||||
(websocket server) Handle programmer error when the server callback is not registered properly (fix #227)
|
||||
|
||||
## [10.1.5] - 2020-08-02
|
||||
|
||||
(ws) Add a new ws sub-command, push_server. This command runs a server which sends many messages in a loop to a websocket client. We can receive above 200,000 messages per second (cf #235).
|
||||
|
@@ -67,28 +67,9 @@ webSocket.stop()
|
||||
|
||||
### Sending messages
|
||||
|
||||
`WebSocketSendInfo result = websocket.send("foo")` will send a message.
|
||||
`websocket.send("foo")` will send a message.
|
||||
|
||||
If the connection was closed, sending will fail, and the success field of the result object will be set to false. There could also be a compression error in which case the compressError field will be set to true. The payloadSize field and wireSize fields will tell you respectively how much bytes the message weight, and how many bytes were sent on the wire (potentially compressed + counting the message header (a few bytes).
|
||||
|
||||
There is an optional progress callback that can be passed in as the second argument. If a message is large it will be fragmented into chunks which will be sent independantly. Everytime the we can write a fragment into the OS network cache, the callback will be invoked. If a user wants to cancel a slow send, false should be returned from within the callback.
|
||||
|
||||
Here is an example code snippet copied from the ws send sub-command. Each fragment weights 32K, so the total integer is the wireSize divided by 32K. As an example if you are sending 32M of data, uncompressed, total will be 1000. current will be set to 0 for the first fragment, then 1, 2 etc...
|
||||
|
||||
```
|
||||
auto result =
|
||||
_webSocket.sendBinary(serializedMsg, [this, throttle](int current, int total) -> bool {
|
||||
spdlog::info("ws_send: Step {} out of {}", current + 1, total);
|
||||
|
||||
if (throttle)
|
||||
{
|
||||
std::chrono::duration<double, std::milli> duration(10);
|
||||
std::this_thread::sleep_for(duration);
|
||||
}
|
||||
|
||||
return _connected;
|
||||
});
|
||||
```
|
||||
If the connection was closed and sending failed, the return value will be set to false.
|
||||
|
||||
### ReadyState
|
||||
|
||||
|
@@ -35,19 +35,122 @@ namespace ix
|
||||
: _sockfd(fd)
|
||||
, _selectInterrupt(createSelectInterrupt())
|
||||
{
|
||||
;
|
||||
#if defined(__APPLE__)
|
||||
_kqueuefd = kqueue();
|
||||
#endif
|
||||
}
|
||||
|
||||
Socket::~Socket()
|
||||
{
|
||||
close();
|
||||
|
||||
#if defined(__APPLE__)
|
||||
::close(_kqueuefd);
|
||||
#endif
|
||||
}
|
||||
|
||||
PollResultType Socket::poll(bool readyToRead,
|
||||
int timeoutMs,
|
||||
int sockfd,
|
||||
const SelectInterruptPtr& selectInterrupt)
|
||||
const SelectInterruptPtr& selectInterrupt,
|
||||
int kqueuefd)
|
||||
{
|
||||
#if defined(__APPLE__)
|
||||
// FIXME int kqueuefd = kqueue();
|
||||
|
||||
struct kevent ke;
|
||||
EV_SET(&ke, sockfd, (readyToRead) ? EVFILT_READ : EVFILT_WRITE, EV_ADD, 0, 0, NULL);
|
||||
if (kevent(kqueuefd, &ke, 1, NULL, 0, NULL) == -1) return PollResultType::Error;
|
||||
|
||||
int retval, numevents = 0;
|
||||
|
||||
int nfds = 1;
|
||||
#if 0
|
||||
if (selectInterrupt)
|
||||
{
|
||||
nfds = 2;
|
||||
int interruptFd = selectInterrupt->getFd();
|
||||
|
||||
struct kevent ke;
|
||||
EV_SET(&ke, interruptFd, EVFILT_READ, EV_ADD, 0, 0, NULL);
|
||||
if (kevent(kqueuefd, &ke, 1, NULL, 0, NULL) == -1) return PollResultType::Error;
|
||||
}
|
||||
#endif
|
||||
|
||||
struct kevent *events;
|
||||
events = (struct kevent*) malloc(sizeof(struct kevent) * nfds);
|
||||
|
||||
if (timeoutMs != 0)
|
||||
{
|
||||
struct timespec timeout;
|
||||
timeout.tv_sec = timeoutMs / 1000;
|
||||
timeout.tv_nsec = (timeoutMs % 1000) * 1000 * 1000;
|
||||
retval = kevent(kqueuefd, NULL, 0, events, nfds, &timeout);
|
||||
}
|
||||
else
|
||||
{
|
||||
retval = kevent(kqueuefd, NULL, 0, events, nfds, NULL);
|
||||
}
|
||||
|
||||
#if 0
|
||||
if (retval > 0) {
|
||||
int j;
|
||||
|
||||
numevents = retval;
|
||||
for(j = 0; j < numevents; j++) {
|
||||
int mask = 0;
|
||||
struct kevent *e = events+j;
|
||||
|
||||
if (e->filter == EVFILT_READ) mask |= AE_READABLE;
|
||||
if (e->filter == EVFILT_WRITE) mask |= AE_WRITABLE;
|
||||
eventLoop->fired[j].fd = e->ident;
|
||||
eventLoop->fired[j].mask = mask;
|
||||
}
|
||||
}
|
||||
#else
|
||||
PollResultType pollResult = PollResultType::ReadyForRead;
|
||||
if (retval < 0)
|
||||
{
|
||||
pollResult = PollResultType::Error;
|
||||
}
|
||||
|
||||
if (retval > 0) {
|
||||
struct kevent *e = events;
|
||||
if (e->filter == EVFILT_READ)
|
||||
{
|
||||
pollResult = PollResultType::ReadyForRead;
|
||||
}
|
||||
else if (e->filter == EVFILT_WRITE)
|
||||
{
|
||||
pollResult = PollResultType::ReadyForWrite;
|
||||
|
||||
int optval = -1;
|
||||
socklen_t optlen = sizeof(optval);
|
||||
|
||||
// getsockopt() puts the errno value for connect into optval so 0
|
||||
// means no-error.
|
||||
if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &optval, &optlen) == -1 || optval != 0)
|
||||
{
|
||||
pollResult = PollResultType::Error;
|
||||
|
||||
// set errno to optval so that external callers can have an
|
||||
// appropriate error description when calling strerror
|
||||
errno = optval;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
pollResult = PollResultType::Timeout;
|
||||
}
|
||||
#endif
|
||||
|
||||
free(events);
|
||||
|
||||
// ::close(kqueuefd); //FMXE
|
||||
|
||||
return pollResult;
|
||||
#else
|
||||
//
|
||||
// We used to use ::select to poll but on Android 9 we get large fds out of
|
||||
// ::connect which crash in FD_SET as they are larger than FD_SETSIZE. Switching
|
||||
@@ -142,6 +245,7 @@ namespace ix
|
||||
}
|
||||
|
||||
return pollResult;
|
||||
#endif
|
||||
}
|
||||
|
||||
PollResultType Socket::isReadyToRead(int timeoutMs)
|
||||
@@ -152,7 +256,7 @@ namespace ix
|
||||
}
|
||||
|
||||
bool readyToRead = true;
|
||||
return poll(readyToRead, timeoutMs, _sockfd, _selectInterrupt);
|
||||
return poll(readyToRead, timeoutMs, _sockfd, _selectInterrupt, _kqueuefd);
|
||||
}
|
||||
|
||||
PollResultType Socket::isReadyToWrite(int timeoutMs)
|
||||
@@ -163,7 +267,7 @@ namespace ix
|
||||
}
|
||||
|
||||
bool readyToRead = false;
|
||||
return poll(readyToRead, timeoutMs, _sockfd, _selectInterrupt);
|
||||
return poll(readyToRead, timeoutMs, _sockfd, _selectInterrupt, _kqueuefd);
|
||||
}
|
||||
|
||||
// Wake up from poll/select by writing to the pipe which is watched by select
|
||||
|
@@ -13,6 +13,13 @@
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
// For kqueue
|
||||
#if defined(__APPLE__)
|
||||
#include <sys/types.h>
|
||||
#include <sys/event.h>
|
||||
#include <sys/time.h>
|
||||
#endif
|
||||
|
||||
#ifdef _WIN32
|
||||
#include <BaseTsd.h>
|
||||
typedef SSIZE_T ssize_t;
|
||||
@@ -94,7 +101,8 @@ namespace ix
|
||||
static PollResultType poll(bool readyToRead,
|
||||
int timeoutMs,
|
||||
int sockfd,
|
||||
const SelectInterruptPtr& selectInterrupt);
|
||||
const SelectInterruptPtr& selectInterrupt,
|
||||
int kqueuefd);
|
||||
|
||||
|
||||
// Used as special codes for pipe communication
|
||||
@@ -114,5 +122,9 @@ namespace ix
|
||||
static constexpr size_t kChunkSize = 1 << 15;
|
||||
|
||||
SelectInterruptPtr _selectInterrupt;
|
||||
|
||||
#if defined(__APPLE__)
|
||||
int _kqueuefd;
|
||||
#endif
|
||||
};
|
||||
} // namespace ix
|
||||
|
@@ -66,7 +66,10 @@ namespace ix
|
||||
int timeoutMs = 10;
|
||||
bool readyToRead = false;
|
||||
auto selectInterrupt = std::make_unique<SelectInterrupt>();
|
||||
PollResultType pollResult = Socket::poll(readyToRead, timeoutMs, fd, selectInterrupt);
|
||||
|
||||
int kqueuefd = kqueue();
|
||||
PollResultType pollResult = Socket::poll(readyToRead, timeoutMs, fd, selectInterrupt, kqueuefd);
|
||||
::close(kqueuefd);
|
||||
|
||||
if (pollResult == PollResultType::Timeout)
|
||||
{
|
||||
|
@@ -259,8 +259,11 @@ namespace ix
|
||||
int timeoutMs = 10;
|
||||
bool readyToRead = true;
|
||||
auto selectInterrupt = std::make_unique<SelectInterrupt>();
|
||||
|
||||
int kqueuefd = kqueue();
|
||||
PollResultType pollResult =
|
||||
Socket::poll(readyToRead, timeoutMs, _serverFd, selectInterrupt);
|
||||
Socket::poll(readyToRead, timeoutMs, _serverFd, selectInterrupt, kqueuefd);
|
||||
::close(kqueuefd);
|
||||
|
||||
if (pollResult == PollResultType::Error)
|
||||
{
|
||||
|
@@ -405,11 +405,6 @@ namespace ix
|
||||
_onMessageCallback = callback;
|
||||
}
|
||||
|
||||
bool WebSocket::isOnMessageCallbackRegistered() const
|
||||
{
|
||||
return _onMessageCallback != nullptr;
|
||||
}
|
||||
|
||||
void WebSocket::setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback)
|
||||
{
|
||||
_onTrafficTrackerCallback = callback;
|
||||
|
@@ -84,7 +84,6 @@ namespace ix
|
||||
const std::string& reason = WebSocketCloseConstants::kNormalClosureMessage);
|
||||
|
||||
void setOnMessageCallback(const OnMessageCallback& callback);
|
||||
bool isOnMessageCallbackRegistered() const;
|
||||
static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback);
|
||||
static void resetTrafficTrackerCallback();
|
||||
|
||||
|
@@ -86,15 +86,6 @@ namespace ix
|
||||
if (_onConnectionCallback)
|
||||
{
|
||||
_onConnectionCallback(webSocket, connectionState, std::move(connectionInfo));
|
||||
|
||||
if (!webSocket->isOnMessageCallbackRegistered())
|
||||
{
|
||||
logError("WebSocketServer Application developer error: Server callback improperly "
|
||||
"registerered.");
|
||||
logError("Missing call to setOnMessageCallback inside setOnConnectionCallback.");
|
||||
connectionState->setTerminated();
|
||||
return;
|
||||
}
|
||||
}
|
||||
else if (_onClientMessageCallback)
|
||||
{
|
||||
|
@@ -6,4 +6,4 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#define IX_WEBSOCKET_VERSION "10.1.7"
|
||||
#define IX_WEBSOCKET_VERSION "10.1.5"
|
||||
|
@@ -36,50 +36,63 @@
|
||||
*
|
||||
*/
|
||||
|
||||
#include <atomic>
|
||||
#include <iostream>
|
||||
#include <libwebsockets.h>
|
||||
#include <signal.h>
|
||||
#include <string.h>
|
||||
#include <signal.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <thread>
|
||||
#include <iostream>
|
||||
|
||||
static int interrupted;
|
||||
static struct lws *client_wsi;
|
||||
|
||||
std::atomic<uint64_t> receivedCount(0);
|
||||
|
||||
static int callback_dumb_increment(
|
||||
struct lws* wsi, enum lws_callback_reasons reason, void* user, void* in, size_t len)
|
||||
{
|
||||
switch (reason)
|
||||
static int
|
||||
callback_dumb_increment(struct lws *wsi, enum lws_callback_reasons reason,
|
||||
void *user, void *in, size_t len)
|
||||
{
|
||||
switch (reason) {
|
||||
|
||||
/* because we are protocols[0] ... */
|
||||
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
|
||||
lwsl_err("CLIENT_CONNECTION_ERROR: %s\n", in ? (char*) in : "(null)");
|
||||
lwsl_err("CLIENT_CONNECTION_ERROR: %s\n",
|
||||
in ? (char *)in : "(null)");
|
||||
client_wsi = NULL;
|
||||
break;
|
||||
|
||||
case LWS_CALLBACK_CLIENT_ESTABLISHED: lwsl_user("%s: established\n", __func__); break;
|
||||
case LWS_CALLBACK_CLIENT_ESTABLISHED:
|
||||
lwsl_user("%s: established\n", __func__);
|
||||
break;
|
||||
|
||||
case LWS_CALLBACK_CLIENT_RECEIVE: receivedCount++; break;
|
||||
case LWS_CALLBACK_CLIENT_RECEIVE:
|
||||
receivedCount++;
|
||||
break;
|
||||
|
||||
case LWS_CALLBACK_CLIENT_CLOSED: client_wsi = NULL; break;
|
||||
case LWS_CALLBACK_CLIENT_CLOSED:
|
||||
client_wsi = NULL;
|
||||
break;
|
||||
|
||||
default: break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
return lws_callback_http_dummy(wsi, reason, user, in, len);
|
||||
}
|
||||
|
||||
static const struct lws_protocols protocols[] = {{
|
||||
static const struct lws_protocols protocols[] = {
|
||||
{
|
||||
"dumb-increment-protocol",
|
||||
callback_dumb_increment,
|
||||
0,
|
||||
0,
|
||||
},
|
||||
{NULL, NULL, 0, 0}};
|
||||
{ NULL, NULL, 0, 0 }
|
||||
};
|
||||
|
||||
static void sigint_handler(int sig)
|
||||
static void
|
||||
sigint_handler(int sig)
|
||||
{
|
||||
interrupted = 1;
|
||||
}
|
||||
@@ -92,8 +105,12 @@ int main(int argc, const char** argv)
|
||||
auto timer = [&receivedCountTotal, &receivedCountPerSecs] {
|
||||
while (!interrupted)
|
||||
{
|
||||
std::cerr << "messages received: " << receivedCountPerSecs << " per second "
|
||||
<< receivedCountTotal << " total" << std::endl;
|
||||
std::cerr << "messages received: "
|
||||
<< receivedCountPerSecs
|
||||
<< " per second "
|
||||
<< receivedCountTotal
|
||||
<< " total"
|
||||
<< std::endl;
|
||||
|
||||
receivedCountPerSecs = receivedCount - receivedCountTotal;
|
||||
receivedCountTotal += receivedCountPerSecs;
|
||||
@@ -118,7 +135,8 @@ int main(int argc, const char** argv)
|
||||
/* | LLL_DEBUG */;
|
||||
|
||||
signal(SIGINT, sigint_handler);
|
||||
if ((p = lws_cmdline_option(argc, argv, "-d"))) logs = atoi(p);
|
||||
if ((p = lws_cmdline_option(argc, argv, "-d")))
|
||||
logs = atoi(p);
|
||||
|
||||
lws_set_log_level(logs, NULL);
|
||||
lwsl_user("LWS minimal ws client rx [-d <logs>] [--h2]\n");
|
||||
@@ -138,8 +156,7 @@ int main(int argc, const char** argv)
|
||||
info.fd_limit_per_thread = 1 + 1 + 1;
|
||||
|
||||
context = lws_create_context(&info);
|
||||
if (!context)
|
||||
{
|
||||
if (!context) {
|
||||
lwsl_err("lws init failed\n");
|
||||
return 1;
|
||||
}
|
||||
@@ -154,7 +171,8 @@ int main(int argc, const char** argv)
|
||||
i.protocol = protocols[0].name; /* "dumb-increment-protocol" */
|
||||
i.pwsi = &client_wsi;
|
||||
|
||||
if (lws_cmdline_option(argc, argv, "--h2")) i.alpn = "h2";
|
||||
if (lws_cmdline_option(argc, argv, "--h2"))
|
||||
i.alpn = "h2";
|
||||
|
||||
lws_client_connect_via_info(&i);
|
||||
|
||||
|
Reference in New Issue
Block a user