Compare commits
	
		
			2 Commits
		
	
	
		
			v10.1.5
			...
			feature/kq
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | 0834198e74 | ||
|  | 22dd32d4e9 | 
| @@ -2,10 +2,6 @@ | ||||
|  | ||||
| All changes to this project will be documented in this file. | ||||
|  | ||||
| ## [10.1.6] - 2020-08-06 | ||||
|  | ||||
| (websocket server) Handle programmer error when the server callback is not registered properly (fix #227) | ||||
|  | ||||
| ## [10.1.5] - 2020-08-02 | ||||
|  | ||||
| (ws) Add a new ws sub-command, push_server. This command runs a server which sends many messages in a loop to a websocket client. We can receive above 200,000 messages per second (cf #235). | ||||
|   | ||||
| @@ -35,19 +35,122 @@ namespace ix | ||||
|         : _sockfd(fd) | ||||
|         , _selectInterrupt(createSelectInterrupt()) | ||||
|     { | ||||
|         ; | ||||
| #if defined(__APPLE__) | ||||
|         _kqueuefd = kqueue(); | ||||
| #endif | ||||
|     } | ||||
|  | ||||
|     Socket::~Socket() | ||||
|     { | ||||
|         close(); | ||||
|  | ||||
| #if defined(__APPLE__) | ||||
|         ::close(_kqueuefd); | ||||
| #endif | ||||
|     } | ||||
|  | ||||
|     PollResultType Socket::poll(bool readyToRead, | ||||
|                                 int timeoutMs, | ||||
|                                 int sockfd, | ||||
|                                 const SelectInterruptPtr& selectInterrupt) | ||||
|                                 const SelectInterruptPtr& selectInterrupt, | ||||
|                                 int kqueuefd) | ||||
|     { | ||||
| #if defined(__APPLE__) | ||||
|         // FIXME int kqueuefd = kqueue(); | ||||
|  | ||||
|         struct kevent ke; | ||||
|         EV_SET(&ke, sockfd, (readyToRead) ? EVFILT_READ : EVFILT_WRITE, EV_ADD, 0, 0, NULL); | ||||
|         if (kevent(kqueuefd, &ke, 1, NULL, 0, NULL) == -1) return PollResultType::Error; | ||||
|  | ||||
|         int retval, numevents = 0; | ||||
|  | ||||
|         int nfds = 1; | ||||
| #if 0 | ||||
|         if (selectInterrupt)  | ||||
|         { | ||||
|             nfds = 2; | ||||
|             int interruptFd = selectInterrupt->getFd(); | ||||
|  | ||||
|             struct kevent ke; | ||||
|             EV_SET(&ke, interruptFd, EVFILT_READ, EV_ADD, 0, 0, NULL); | ||||
|             if (kevent(kqueuefd, &ke, 1, NULL, 0, NULL) == -1) return PollResultType::Error; | ||||
|         } | ||||
| #endif | ||||
|  | ||||
|         struct kevent *events; | ||||
|         events = (struct kevent*) malloc(sizeof(struct kevent) * nfds); | ||||
|  | ||||
|         if (timeoutMs != 0) | ||||
|         { | ||||
|             struct timespec timeout; | ||||
|             timeout.tv_sec = timeoutMs / 1000; | ||||
|             timeout.tv_nsec = (timeoutMs % 1000) * 1000 * 1000; | ||||
|             retval = kevent(kqueuefd, NULL, 0, events, nfds, &timeout); | ||||
|         } | ||||
|         else | ||||
|         { | ||||
|             retval = kevent(kqueuefd, NULL, 0, events, nfds, NULL); | ||||
|         } | ||||
|  | ||||
| #if 0 | ||||
|         if (retval > 0) { | ||||
|             int j; | ||||
|  | ||||
|             numevents = retval; | ||||
|             for(j = 0; j < numevents; j++) { | ||||
|                 int mask = 0; | ||||
|                 struct kevent *e = events+j; | ||||
|  | ||||
|                 if (e->filter == EVFILT_READ) mask |= AE_READABLE; | ||||
|                 if (e->filter == EVFILT_WRITE) mask |= AE_WRITABLE; | ||||
|                 eventLoop->fired[j].fd = e->ident; | ||||
|                 eventLoop->fired[j].mask = mask; | ||||
|             } | ||||
|         } | ||||
| #else | ||||
|         PollResultType pollResult = PollResultType::ReadyForRead; | ||||
|         if (retval < 0) | ||||
|         { | ||||
|             pollResult = PollResultType::Error; | ||||
|         } | ||||
|  | ||||
|         if (retval > 0) { | ||||
|             struct kevent *e = events; | ||||
|             if (e->filter == EVFILT_READ) | ||||
|             { | ||||
|                 pollResult = PollResultType::ReadyForRead; | ||||
|             } | ||||
|             else if (e->filter == EVFILT_WRITE)  | ||||
|             { | ||||
|                 pollResult = PollResultType::ReadyForWrite; | ||||
|  | ||||
|                 int optval = -1; | ||||
|                 socklen_t optlen = sizeof(optval); | ||||
|  | ||||
|                 // getsockopt() puts the errno value for connect into optval so 0 | ||||
|                 // means no-error. | ||||
|                 if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &optval, &optlen) == -1 || optval != 0) | ||||
|                 { | ||||
|                     pollResult = PollResultType::Error; | ||||
|  | ||||
|                     // set errno to optval so that external callers can have an | ||||
|                     // appropriate error description when calling strerror | ||||
|                     errno = optval; | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|         else | ||||
|         { | ||||
|             pollResult = PollResultType::Timeout; | ||||
|         } | ||||
| #endif | ||||
|  | ||||
|         free(events); | ||||
|  | ||||
|         // ::close(kqueuefd); //FMXE | ||||
|  | ||||
|         return pollResult; | ||||
| #else | ||||
|         // | ||||
|         // We used to use ::select to poll but on Android 9 we get large fds out of | ||||
|         // ::connect which crash in FD_SET as they are larger than FD_SETSIZE. Switching | ||||
| @@ -142,6 +245,7 @@ namespace ix | ||||
|         } | ||||
|  | ||||
|         return pollResult; | ||||
| #endif | ||||
|     } | ||||
|  | ||||
|     PollResultType Socket::isReadyToRead(int timeoutMs) | ||||
| @@ -152,7 +256,7 @@ namespace ix | ||||
|         } | ||||
|  | ||||
|         bool readyToRead = true; | ||||
|         return poll(readyToRead, timeoutMs, _sockfd, _selectInterrupt); | ||||
|         return poll(readyToRead, timeoutMs, _sockfd, _selectInterrupt, _kqueuefd); | ||||
|     } | ||||
|  | ||||
|     PollResultType Socket::isReadyToWrite(int timeoutMs) | ||||
| @@ -163,7 +267,7 @@ namespace ix | ||||
|         } | ||||
|  | ||||
|         bool readyToRead = false; | ||||
|         return poll(readyToRead, timeoutMs, _sockfd, _selectInterrupt); | ||||
|         return poll(readyToRead, timeoutMs, _sockfd, _selectInterrupt, _kqueuefd); | ||||
|     } | ||||
|  | ||||
|     // Wake up from poll/select by writing to the pipe which is watched by select | ||||
|   | ||||
| @@ -13,6 +13,13 @@ | ||||
| #include <string> | ||||
| #include <vector> | ||||
|  | ||||
| // For kqueue | ||||
| #if defined(__APPLE__) | ||||
| #include <sys/types.h> | ||||
| #include <sys/event.h> | ||||
| #include <sys/time.h> | ||||
| #endif | ||||
|  | ||||
| #ifdef _WIN32 | ||||
| #include <BaseTsd.h> | ||||
| typedef SSIZE_T ssize_t; | ||||
| @@ -94,7 +101,8 @@ namespace ix | ||||
|         static PollResultType poll(bool readyToRead, | ||||
|                                    int timeoutMs, | ||||
|                                    int sockfd, | ||||
|                                    const SelectInterruptPtr& selectInterrupt); | ||||
|                                    const SelectInterruptPtr& selectInterrupt, | ||||
|                                    int kqueuefd); | ||||
|  | ||||
|  | ||||
|         // Used as special codes for pipe communication | ||||
| @@ -114,5 +122,9 @@ namespace ix | ||||
|         static constexpr size_t kChunkSize = 1 << 15; | ||||
|  | ||||
|         SelectInterruptPtr _selectInterrupt; | ||||
|  | ||||
| #if defined(__APPLE__) | ||||
|         int _kqueuefd; | ||||
| #endif | ||||
|     }; | ||||
| } // namespace ix | ||||
|   | ||||
| @@ -66,7 +66,10 @@ namespace ix | ||||
|             int timeoutMs = 10; | ||||
|             bool readyToRead = false; | ||||
|             auto selectInterrupt = std::make_unique<SelectInterrupt>(); | ||||
|             PollResultType pollResult = Socket::poll(readyToRead, timeoutMs, fd, selectInterrupt); | ||||
|  | ||||
|             int kqueuefd = kqueue(); | ||||
|             PollResultType pollResult = Socket::poll(readyToRead, timeoutMs, fd, selectInterrupt, kqueuefd); | ||||
|             ::close(kqueuefd); | ||||
|  | ||||
|             if (pollResult == PollResultType::Timeout) | ||||
|             { | ||||
|   | ||||
| @@ -259,8 +259,11 @@ namespace ix | ||||
|             int timeoutMs = 10; | ||||
|             bool readyToRead = true; | ||||
|             auto selectInterrupt = std::make_unique<SelectInterrupt>(); | ||||
|  | ||||
|             int kqueuefd = kqueue(); | ||||
|             PollResultType pollResult = | ||||
|                 Socket::poll(readyToRead, timeoutMs, _serverFd, selectInterrupt); | ||||
|                 Socket::poll(readyToRead, timeoutMs, _serverFd, selectInterrupt, kqueuefd); | ||||
|             ::close(kqueuefd); | ||||
|  | ||||
|             if (pollResult == PollResultType::Error) | ||||
|             { | ||||
|   | ||||
| @@ -405,11 +405,6 @@ namespace ix | ||||
|         _onMessageCallback = callback; | ||||
|     } | ||||
|  | ||||
|     bool WebSocket::isOnMessageCallbackRegistered() const | ||||
|     { | ||||
|         return _onMessageCallback != nullptr; | ||||
|     } | ||||
|  | ||||
|     void WebSocket::setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback) | ||||
|     { | ||||
|         _onTrafficTrackerCallback = callback; | ||||
|   | ||||
| @@ -84,7 +84,6 @@ namespace ix | ||||
|                    const std::string& reason = WebSocketCloseConstants::kNormalClosureMessage); | ||||
|  | ||||
|         void setOnMessageCallback(const OnMessageCallback& callback); | ||||
|         bool isOnMessageCallbackRegistered() const; | ||||
|         static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback); | ||||
|         static void resetTrafficTrackerCallback(); | ||||
|  | ||||
|   | ||||
| @@ -86,15 +86,6 @@ namespace ix | ||||
|         if (_onConnectionCallback) | ||||
|         { | ||||
|             _onConnectionCallback(webSocket, connectionState, std::move(connectionInfo)); | ||||
|  | ||||
|             if (!webSocket->isOnMessageCallbackRegistered()) | ||||
|             { | ||||
|                 logError("WebSocketServer Application developer error: Server callback improperly " | ||||
|                          "registerered."); | ||||
|                 logError("Missing call to setOnMessageCallback inside setOnConnectionCallback."); | ||||
|                 connectionState->setTerminated(); | ||||
|                 return; | ||||
|             } | ||||
|         } | ||||
|         else if (_onClientMessageCallback) | ||||
|         { | ||||
|   | ||||
| @@ -33,139 +33,157 @@ | ||||
|  * messages received: 200825 per second 1677933 total | ||||
|  * messages received: 183542 per second 1861475 total | ||||
|  * ^C[2020/08/02 19:22:33:4450] U: Completed OK | ||||
|  * | ||||
|  *  | ||||
|  */ | ||||
|  | ||||
| #include <atomic> | ||||
| #include <iostream> | ||||
| #include <libwebsockets.h> | ||||
| #include <signal.h> | ||||
| #include <string.h> | ||||
| #include <signal.h> | ||||
|  | ||||
| #include <atomic> | ||||
| #include <thread> | ||||
| #include <iostream> | ||||
|  | ||||
| static int interrupted; | ||||
| static struct lws* client_wsi; | ||||
| static struct lws *client_wsi; | ||||
|  | ||||
| std::atomic<uint64_t> receivedCount(0); | ||||
|  | ||||
| static int callback_dumb_increment( | ||||
|     struct lws* wsi, enum lws_callback_reasons reason, void* user, void* in, size_t len) | ||||
| static int | ||||
| callback_dumb_increment(struct lws *wsi, enum lws_callback_reasons reason, | ||||
|               void *user, void *in, size_t len) | ||||
| { | ||||
|     switch (reason) | ||||
|     { | ||||
|         switch (reason) { | ||||
|  | ||||
|         /* because we are protocols[0] ... */ | ||||
|         case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: | ||||
|             lwsl_err("CLIENT_CONNECTION_ERROR: %s\n", in ? (char*) in : "(null)"); | ||||
|             client_wsi = NULL; | ||||
|             break; | ||||
|                 lwsl_err("CLIENT_CONNECTION_ERROR: %s\n", | ||||
|                          in ? (char *)in : "(null)"); | ||||
|                 client_wsi = NULL; | ||||
|                 break; | ||||
|  | ||||
|         case LWS_CALLBACK_CLIENT_ESTABLISHED: lwsl_user("%s: established\n", __func__); break; | ||||
|         case LWS_CALLBACK_CLIENT_ESTABLISHED: | ||||
|                 lwsl_user("%s: established\n", __func__); | ||||
|                 break; | ||||
|  | ||||
|         case LWS_CALLBACK_CLIENT_RECEIVE: receivedCount++; break; | ||||
|         case LWS_CALLBACK_CLIENT_RECEIVE: | ||||
|                 receivedCount++; | ||||
|                 break; | ||||
|  | ||||
|         case LWS_CALLBACK_CLIENT_CLOSED: client_wsi = NULL; break; | ||||
|         case LWS_CALLBACK_CLIENT_CLOSED: | ||||
|                 client_wsi = NULL; | ||||
|                 break; | ||||
|  | ||||
|         default: break; | ||||
|     } | ||||
|  | ||||
|     return lws_callback_http_dummy(wsi, reason, user, in, len); | ||||
| } | ||||
|  | ||||
| static const struct lws_protocols protocols[] = {{ | ||||
|                                                      "dumb-increment-protocol", | ||||
|                                                      callback_dumb_increment, | ||||
|                                                      0, | ||||
|                                                      0, | ||||
|                                                  }, | ||||
|                                                  {NULL, NULL, 0, 0}}; | ||||
|  | ||||
| static void sigint_handler(int sig) | ||||
| { | ||||
|     interrupted = 1; | ||||
| } | ||||
|  | ||||
| int main(int argc, const char** argv) | ||||
| { | ||||
|     uint64_t receivedCountTotal(0); | ||||
|     uint64_t receivedCountPerSecs(0); | ||||
|  | ||||
|     auto timer = [&receivedCountTotal, &receivedCountPerSecs] { | ||||
|         while (!interrupted) | ||||
|         { | ||||
|             std::cerr << "messages received: " << receivedCountPerSecs << " per second " | ||||
|                       << receivedCountTotal << " total" << std::endl; | ||||
|  | ||||
|             receivedCountPerSecs = receivedCount - receivedCountTotal; | ||||
|             receivedCountTotal += receivedCountPerSecs; | ||||
|  | ||||
|             auto duration = std::chrono::seconds(1); | ||||
|             std::this_thread::sleep_for(duration); | ||||
|         default: | ||||
|                 break; | ||||
|         } | ||||
|     }; | ||||
|  | ||||
|     std::thread t1(timer); | ||||
|  | ||||
|     struct lws_context_creation_info info; | ||||
|     struct lws_client_connect_info i; | ||||
|     struct lws_context* context; | ||||
|     const char* p; | ||||
|     int n = 0, logs = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE | ||||
|         /* for LLL_ verbosity above NOTICE to be built into lws, lws | ||||
|          * must have been configured with -DCMAKE_BUILD_TYPE=DEBUG | ||||
|          * instead of =RELEASE */ | ||||
|         /* | LLL_INFO */ /* | LLL_PARSER */ /* | LLL_HEADER */ | ||||
|         /* | LLL_EXT */ /* | LLL_CLIENT */  /* | LLL_LATENCY */ | ||||
|         /* | LLL_DEBUG */; | ||||
|  | ||||
|     signal(SIGINT, sigint_handler); | ||||
|     if ((p = lws_cmdline_option(argc, argv, "-d"))) logs = atoi(p); | ||||
|  | ||||
|     lws_set_log_level(logs, NULL); | ||||
|     lwsl_user("LWS minimal ws client rx [-d <logs>] [--h2]\n"); | ||||
|  | ||||
|     memset(&info, 0, sizeof info);      /* otherwise uninitialized garbage */ | ||||
|     info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */ | ||||
|     info.protocols = protocols; | ||||
|     info.timeout_secs = 10; | ||||
|  | ||||
|     /* | ||||
|      * since we know this lws context is only ever going to be used with | ||||
|      * one client wsis / fds / sockets at a time, let lws know it doesn't | ||||
|      * have to use the default allocations for fd tables up to ulimit -n. | ||||
|      * It will just allocate for 1 internal and 1 (+ 1 http2 nwsi) that we | ||||
|      * will use. | ||||
|      */ | ||||
|     info.fd_limit_per_thread = 1 + 1 + 1; | ||||
|  | ||||
|     context = lws_create_context(&info); | ||||
|     if (!context) | ||||
|     { | ||||
|         lwsl_err("lws init failed\n"); | ||||
|         return 1; | ||||
|     } | ||||
|  | ||||
|     memset(&i, 0, sizeof i); /* otherwise uninitialized garbage */ | ||||
|     i.context = context; | ||||
|     i.port = 8008; | ||||
|     i.address = "127.0.0.1"; | ||||
|     i.path = "/"; | ||||
|     i.host = i.address; | ||||
|     i.origin = i.address; | ||||
|     i.protocol = protocols[0].name; /* "dumb-increment-protocol" */ | ||||
|     i.pwsi = &client_wsi; | ||||
|  | ||||
|     if (lws_cmdline_option(argc, argv, "--h2")) i.alpn = "h2"; | ||||
|  | ||||
|     lws_client_connect_via_info(&i); | ||||
|  | ||||
|     while (n >= 0 && client_wsi && !interrupted) | ||||
|         n = lws_service(context, 0); | ||||
|  | ||||
|     lws_context_destroy(context); | ||||
|  | ||||
|     lwsl_user("Completed %s\n", receivedCount > 10 ? "OK" : "Failed"); | ||||
|  | ||||
|     t1.join(); | ||||
|  | ||||
|     return receivedCount > 10; | ||||
|         return lws_callback_http_dummy(wsi, reason, user, in, len); | ||||
| } | ||||
|  | ||||
| static const struct lws_protocols protocols[] = { | ||||
|         { | ||||
|                 "dumb-increment-protocol", | ||||
|                 callback_dumb_increment, | ||||
|                 0, | ||||
|                 0, | ||||
|         }, | ||||
|         { NULL, NULL, 0, 0 } | ||||
| }; | ||||
|  | ||||
| static void | ||||
| sigint_handler(int sig) | ||||
| { | ||||
|         interrupted = 1; | ||||
| } | ||||
|  | ||||
| int main(int argc, const char **argv) | ||||
| { | ||||
|         uint64_t receivedCountTotal(0); | ||||
|         uint64_t receivedCountPerSecs(0); | ||||
|  | ||||
|         auto timer = [&receivedCountTotal, &receivedCountPerSecs] { | ||||
|             while (!interrupted) | ||||
|             { | ||||
|                 std::cerr << "messages received: " | ||||
|                           << receivedCountPerSecs | ||||
|                           << " per second " | ||||
|                           << receivedCountTotal  | ||||
|                           << " total" | ||||
|                           << std::endl; | ||||
|  | ||||
|                 receivedCountPerSecs = receivedCount - receivedCountTotal; | ||||
|                 receivedCountTotal += receivedCountPerSecs; | ||||
|  | ||||
|                 auto duration = std::chrono::seconds(1); | ||||
|                 std::this_thread::sleep_for(duration); | ||||
|             } | ||||
|         }; | ||||
|  | ||||
|         std::thread t1(timer); | ||||
|  | ||||
|         struct lws_context_creation_info info; | ||||
|         struct lws_client_connect_info i; | ||||
|         struct lws_context *context; | ||||
|         const char *p; | ||||
|         int n = 0, logs = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE | ||||
|                 /* for LLL_ verbosity above NOTICE to be built into lws, lws | ||||
|                  * must have been configured with -DCMAKE_BUILD_TYPE=DEBUG | ||||
|                  * instead of =RELEASE */ | ||||
|                 /* | LLL_INFO */ /* | LLL_PARSER */ /* | LLL_HEADER */ | ||||
|                 /* | LLL_EXT */ /* | LLL_CLIENT */ /* | LLL_LATENCY */ | ||||
|                 /* | LLL_DEBUG */; | ||||
|  | ||||
|         signal(SIGINT, sigint_handler); | ||||
|         if ((p = lws_cmdline_option(argc, argv, "-d"))) | ||||
|                 logs = atoi(p); | ||||
|  | ||||
|         lws_set_log_level(logs, NULL); | ||||
|         lwsl_user("LWS minimal ws client rx [-d <logs>] [--h2]\n"); | ||||
|  | ||||
|         memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */ | ||||
|         info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */ | ||||
|         info.protocols = protocols; | ||||
|         info.timeout_secs = 10; | ||||
|  | ||||
|         /* | ||||
|          * since we know this lws context is only ever going to be used with | ||||
|          * one client wsis / fds / sockets at a time, let lws know it doesn't | ||||
|          * have to use the default allocations for fd tables up to ulimit -n. | ||||
|          * It will just allocate for 1 internal and 1 (+ 1 http2 nwsi) that we | ||||
|          * will use. | ||||
|          */ | ||||
|         info.fd_limit_per_thread = 1 + 1 + 1; | ||||
|  | ||||
|         context = lws_create_context(&info); | ||||
|         if (!context) { | ||||
|                 lwsl_err("lws init failed\n"); | ||||
|                 return 1; | ||||
|         } | ||||
|  | ||||
|         memset(&i, 0, sizeof i); /* otherwise uninitialized garbage */ | ||||
|         i.context = context; | ||||
|         i.port = 8008; | ||||
|         i.address = "127.0.0.1"; | ||||
|         i.path = "/"; | ||||
|         i.host = i.address; | ||||
|         i.origin = i.address; | ||||
|         i.protocol = protocols[0].name; /* "dumb-increment-protocol" */ | ||||
|         i.pwsi = &client_wsi; | ||||
|  | ||||
|         if (lws_cmdline_option(argc, argv, "--h2")) | ||||
|                 i.alpn = "h2"; | ||||
|  | ||||
|         lws_client_connect_via_info(&i); | ||||
|  | ||||
|         while (n >= 0 && client_wsi && !interrupted) | ||||
|                 n = lws_service(context, 0); | ||||
|  | ||||
|         lws_context_destroy(context); | ||||
|  | ||||
|         lwsl_user("Completed %s\n", receivedCount > 10 ? "OK" : "Failed"); | ||||
|  | ||||
|         t1.join(); | ||||
|  | ||||
|         return receivedCount > 10; | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user