diff --git a/CMakeLists.txt b/CMakeLists.txt index cf398baf..864818f6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -15,6 +15,10 @@ if (NOT WIN32) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic") endif() +if ("${CMAKE_CXX_COMPILER_ID}" MATCHES "Clang") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wshorten-64-to-32") +endif() + set( IXWEBSOCKET_SOURCES ixwebsocket/IXSocket.cpp ixwebsocket/IXSocketServer.cpp diff --git a/ixwebsocket/IXDNSLookup.cpp b/ixwebsocket/IXDNSLookup.cpp index 624e1093..5e922088 100644 --- a/ixwebsocket/IXDNSLookup.cpp +++ b/ixwebsocket/IXDNSLookup.cpp @@ -73,7 +73,7 @@ namespace ix errMsg = "no error"; // Maybe a cancellation request got in before the background thread terminated ? - if (isCancellationRequested()) + if (isCancellationRequested && isCancellationRequested()) { errMsg = "cancellation requested"; return nullptr; @@ -121,7 +121,7 @@ namespace ix } // Were we cancelled ? - if (isCancellationRequested()) + if (isCancellationRequested && isCancellationRequested()) { errMsg = "cancellation requested"; return nullptr; @@ -129,7 +129,7 @@ namespace ix } // Maybe a cancellation request got in before the bg terminated ? - if (isCancellationRequested()) + if (isCancellationRequested && isCancellationRequested()) { errMsg = "cancellation requested"; return nullptr; diff --git a/ixwebsocket/IXSocket.cpp b/ixwebsocket/IXSocket.cpp index 76b4d234..224d8bfe 100644 --- a/ixwebsocket/IXSocket.cpp +++ b/ixwebsocket/IXSocket.cpp @@ -210,7 +210,7 @@ namespace ix { while (true) { - if (isCancellationRequested()) return false; + if (isCancellationRequested && isCancellationRequested()) return false; char* buffer = const_cast(str.c_str()); int len = (int) str.size(); @@ -222,7 +222,7 @@ namespace ix { return ret == len; } - // There is possibly something to be write, try again + // There is possibly something to be writen, try again else if (ret < 0 && (getErrno() == EWOULDBLOCK || getErrno() == EAGAIN)) { @@ -241,7 +241,7 @@ namespace ix { while (true) { - if (isCancellationRequested()) return false; + if (isCancellationRequested && isCancellationRequested()) return false; ssize_t ret; ret = recv(buffer, 1); @@ -304,9 +304,12 @@ namespace ix std::vector output; while (output.size() != length) { - if (isCancellationRequested()) return std::make_pair(false, std::string()); + if (isCancellationRequested && isCancellationRequested()) + { + return std::make_pair(false, std::string()); + } - int size = std::min(kChunkSize, length - output.size()); + size_t size = std::min(kChunkSize, length - output.size()); ssize_t ret = recv((char*)&_readBuffer[0], size); if (ret <= 0 && (getErrno() != EWOULDBLOCK && diff --git a/ixwebsocket/IXSocketConnect.cpp b/ixwebsocket/IXSocketConnect.cpp index 6d3be10a..7944fa40 100644 --- a/ixwebsocket/IXSocketConnect.cpp +++ b/ixwebsocket/IXSocketConnect.cpp @@ -66,7 +66,7 @@ namespace ix for (;;) { - if (isCancellationRequested()) // Must handle timeout as well + if (isCancellationRequested && isCancellationRequested()) // Must handle timeout as well { closeSocket(fd); errMsg = "Cancelled"; diff --git a/ws/CMakeLists.txt b/ws/CMakeLists.txt index 2a2b4ee2..fe26f0b3 100644 --- a/ws/CMakeLists.txt +++ b/ws/CMakeLists.txt @@ -23,6 +23,8 @@ add_executable(ws ixcrypto/IXHash.cpp ixcrypto/IXUuid.cpp + IXRedisClient.cpp + ws_http_client.cpp ws_ping_pong.cpp ws_broadcast_server.cpp @@ -32,6 +34,8 @@ add_executable(ws ws_transfer.cpp ws_send.cpp ws_receive.cpp + ws_redis_publish.cpp + ws_redis_subscribe.cpp ws.cpp) if (APPLE AND USE_TLS) diff --git a/ws/IXRedisClient.cpp b/ws/IXRedisClient.cpp new file mode 100644 index 00000000..95a1ad32 --- /dev/null +++ b/ws/IXRedisClient.cpp @@ -0,0 +1,166 @@ +/* + * IXRedisClient.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. + */ + +#include "IXRedisClient.h" +#include +#include + +#include +#include +#include +#include + +namespace ix +{ + bool RedisClient::connect(const std::string& hostname, int port) + { + bool tls = false; + std::string errorMsg; + _socket = createSocket(tls, errorMsg); + + if (!_socket) + { + return false; + } + + std::string errMsg; + return _socket->connect(hostname, port, errMsg, nullptr); + } + + bool RedisClient::publish(const std::string& channel, + const std::string& message) + { + if (!_socket) return false; + + std::stringstream ss; + ss << "PUBLISH "; + ss << channel; + ss << " "; + ss << message; + ss << "\r\n"; + + bool sent = _socket->writeBytes(ss.str(), nullptr); + if (!sent) + { + return false; + } + + auto pollResult = _socket->isReadyToRead(-1); + if (pollResult == PollResultType::Error) + { + return false; + } + + auto lineResult = _socket->readLine(nullptr); + auto lineValid = lineResult.first; + auto line = lineResult.second; + + return lineValid; + } + + // + // FIXME: we assume that redis never return errors... + // + bool RedisClient::subscribe(const std::string& channel, + const OnRedisSubscribeCallback& callback) + { + if (!_socket) return false; + + std::stringstream ss; + ss << "SUBSCRIBE "; + ss << channel; + ss << "\r\n"; + + bool sent = _socket->writeBytes(ss.str(), nullptr); + if (!sent) + { + return false; + } + + // Wait 1s for the response + auto pollResult = _socket->isReadyToRead(-1); + if (pollResult == PollResultType::Error) + { + return false; + } + + // Read the first line of the response + auto lineResult = _socket->readLine(nullptr); + auto lineValid = lineResult.first; + auto line = lineResult.second; + + if (!lineValid) return false; + + // There are 5 items for the subscribe repply + for (int i = 0; i < 5; ++i) + { + auto lineResult = _socket->readLine(nullptr); + auto lineValid = lineResult.first; + auto line = lineResult.second; + + if (!lineValid) return false; + } + + // Wait indefinitely for new messages + while (true) + { + // Wait until something is ready to read + auto pollResult = _socket->isReadyToRead(-1); + if (pollResult == PollResultType::Error) + { + return false; + } + + // The first line of the response describe the return type, + // => *3 (an array of 3 elements) + auto lineResult = _socket->readLine(nullptr); + auto lineValid = lineResult.first; + auto line = lineResult.second; + + if (!lineValid) return false; + + int arraySize; + { + std::stringstream ss; + ss << line.substr(1, line.size()-1); + ss >> arraySize; + } + + // There are 6 items for each received message + for (int i = 0; i < arraySize; ++i) + { + auto lineResult = _socket->readLine(nullptr); + auto lineValid = lineResult.first; + auto line = lineResult.second; + + if (!lineValid) return false; + + // Messages are string, which start with a string size + // => $7 (7 bytes) + int stringSize; + std::stringstream ss; + ss << line.substr(1, line.size()-1); + ss >> stringSize; + + auto readResult = _socket->readBytes(stringSize, nullptr, nullptr); + if (!readResult.first) return false; + + if (i == 2) + { + // The message is the 3rd element. + callback(readResult.second); + } + + // read last 2 bytes (\r\n) + char c; + _socket->readByte(&c, nullptr); + _socket->readByte(&c, nullptr); + } + } + + return true; + } +} diff --git a/ws/IXRedisClient.h b/ws/IXRedisClient.h new file mode 100644 index 00000000..42f5e77f --- /dev/null +++ b/ws/IXRedisClient.h @@ -0,0 +1,36 @@ +/* + * IXRedisClient.h + * Author: Benjamin Sergeant + * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. + */ + +#pragma once + +#include +#include + +namespace ix +{ + class Socket; + + class RedisClient { + public: + using OnRedisSubscribeCallback = std::function; + + RedisClient() = default; + ~RedisClient() = default; + + bool connect(const std::string& hostname, + int port); + + bool publish(const std::string& channel, + const std::string& message); + + bool subscribe(const std::string& channel, + const OnRedisSubscribeCallback& callback); + + private: + std::shared_ptr _socket; + }; +} + diff --git a/ws/ws.cpp b/ws/ws.cpp index 9aea1a6d..64f86bfc 100644 --- a/ws/ws.cpp +++ b/ws/ws.cpp @@ -35,12 +35,15 @@ int main(int argc, char** argv) std::string output; std::string hostname("127.0.0.1"); std::string pidfile; + std::string channel; + std::string message; bool headersOnly = false; bool followRedirects = false; bool verbose = false; bool save = false; bool compress = false; int port = 8080; + int redisPort = 6379; int connectTimeOut = 60; int transferTimeout = 1800; int maxRedirects = 5; @@ -96,6 +99,18 @@ int main(int argc, char** argv) httpClientApp->add_option("--connect-timeout", connectTimeOut, "Connection timeout"); httpClientApp->add_option("--transfer-timeout", transferTimeout, "Transfer timeout"); + CLI::App* redisPublishApp = app.add_subcommand("redis_publish", "Redis publisher"); + redisPublishApp->add_option("--port", redisPort, "Port"); + redisPublishApp->add_option("--host", hostname, "Hostname"); + redisPublishApp->add_option("channel", channel, "Channel")->required(); + redisPublishApp->add_option("message", message, "Message")->required(); + + CLI::App* redisSubscribeApp = app.add_subcommand("redis_subscribe", "Redis subscriber"); + redisSubscribeApp->add_option("--port", redisPort, "Port"); + redisSubscribeApp->add_option("--host", hostname, "Hostname"); + redisSubscribeApp->add_option("channel", channel, "Channel")->required(); + redisSubscribeApp->add_flag("-v", verbose, "Verbose"); + CLI11_PARSE(app, argc, argv); // pid file handling @@ -149,6 +164,14 @@ int main(int argc, char** argv) followRedirects, maxRedirects, verbose, save, output, compress); } + else if (app.got_subcommand("redis_publish")) + { + return ix::ws_redis_publish_main(hostname, redisPort, channel, message); + } + else if (app.got_subcommand("redis_subscribe")) + { + return ix::ws_redis_subscribe_main(hostname, redisPort, channel, verbose); + } return 1; } diff --git a/ws/ws.h b/ws/ws.h index 45cba245..d5b5637a 100644 --- a/ws/ws.h +++ b/ws/ws.h @@ -39,4 +39,14 @@ namespace ix int ws_send_main(const std::string& url, const std::string& path); + + int ws_redis_publish_main(const std::string& hostname, + int port, + const std::string& channel, + const std::string& message); + + int ws_redis_subscribe_main(const std::string& hostname, + int port, + const std::string& channel, + bool verbose); } diff --git a/ws/ws_redis_publish.cpp b/ws/ws_redis_publish.cpp new file mode 100644 index 00000000..592f664b --- /dev/null +++ b/ws/ws_redis_publish.cpp @@ -0,0 +1,35 @@ +/* + * ws_redis_publish.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. + */ + +#include +#include +#include "IXRedisClient.h" + +namespace ix +{ + int ws_redis_publish_main(const std::string& hostname, + int port, + const std::string& channel, + const std::string& message) + { + RedisClient redisClient; + if (!redisClient.connect(hostname, port)) + { + std::cerr << "Cannot connect to redis host" << std::endl; + return 1; + } + + std::cerr << "Publishing message " << message + << " to " << channel << "..." << std::endl; + if (!redisClient.publish(channel, message)) + { + std::cerr << "Error publishing to channel " << channel << std::endl; + return 1; + } + + return 0; + } +} diff --git a/ws/ws_redis_subscribe.cpp b/ws/ws_redis_subscribe.cpp new file mode 100644 index 00000000..50a76041 --- /dev/null +++ b/ws/ws_redis_subscribe.cpp @@ -0,0 +1,66 @@ +/* + * ws_redis_subscribe.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2019 Machine Zone, Inc. All rights reserved. + */ + +#include +#include +#include +#include "IXRedisClient.h" + +namespace ix +{ + int ws_redis_subscribe_main(const std::string& hostname, + int port, + const std::string& channel, + bool verbose) + { + RedisClient redisClient; + if (!redisClient.connect(hostname, port)) + { + std::cerr << "Cannot connect to redis host" << std::endl; + return 1; + } + + std::chrono::time_point lastTimePoint; + int msgPerSeconds = 0; + int msgCount = 0; + + auto callback = [&lastTimePoint, &msgPerSeconds, &msgCount, verbose] + (const std::string& message) + { + if (verbose) + { + std::cout << message << std::endl; + } + + msgPerSeconds++; + + auto now = std::chrono::steady_clock::now(); + if (now - lastTimePoint > std::chrono::seconds(1)) + { + lastTimePoint = std::chrono::steady_clock::now(); + + msgCount += msgPerSeconds; + + // #messages 901 msg/s 150 + std::cout << "#messages " << msgCount << " " + << "msg/s " << msgPerSeconds + << std::endl; + + msgPerSeconds = 0; + } + }; + + std::cerr << "Subscribing to " << channel << "..." << std::endl; + if (!redisClient.subscribe(channel, callback)) + { + std::cerr << "Error subscribing to channel " << channel << std::endl; + return 1; + } + + return 0; + } +} +