Feature/redis (#23)
* Fix warning * (cmake) add a warning about 32/64 conversion problems. * simple redis clients * can publish to redis * redis subscribe * display messages received per second * verbose flag * (cmake) use clang only compile option -Wshorten-64-to-32 when compiling with clang
This commit is contained in:
parent
afe8b966ad
commit
e77b9176f3
@ -15,6 +15,10 @@ if (NOT WIN32)
|
|||||||
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic")
|
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic")
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
|
if ("${CMAKE_CXX_COMPILER_ID}" MATCHES "Clang")
|
||||||
|
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wshorten-64-to-32")
|
||||||
|
endif()
|
||||||
|
|
||||||
set( IXWEBSOCKET_SOURCES
|
set( IXWEBSOCKET_SOURCES
|
||||||
ixwebsocket/IXSocket.cpp
|
ixwebsocket/IXSocket.cpp
|
||||||
ixwebsocket/IXSocketServer.cpp
|
ixwebsocket/IXSocketServer.cpp
|
||||||
|
@ -73,7 +73,7 @@ namespace ix
|
|||||||
errMsg = "no error";
|
errMsg = "no error";
|
||||||
|
|
||||||
// Maybe a cancellation request got in before the background thread terminated ?
|
// Maybe a cancellation request got in before the background thread terminated ?
|
||||||
if (isCancellationRequested())
|
if (isCancellationRequested && isCancellationRequested())
|
||||||
{
|
{
|
||||||
errMsg = "cancellation requested";
|
errMsg = "cancellation requested";
|
||||||
return nullptr;
|
return nullptr;
|
||||||
@ -121,7 +121,7 @@ namespace ix
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Were we cancelled ?
|
// Were we cancelled ?
|
||||||
if (isCancellationRequested())
|
if (isCancellationRequested && isCancellationRequested())
|
||||||
{
|
{
|
||||||
errMsg = "cancellation requested";
|
errMsg = "cancellation requested";
|
||||||
return nullptr;
|
return nullptr;
|
||||||
@ -129,7 +129,7 @@ namespace ix
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Maybe a cancellation request got in before the bg terminated ?
|
// Maybe a cancellation request got in before the bg terminated ?
|
||||||
if (isCancellationRequested())
|
if (isCancellationRequested && isCancellationRequested())
|
||||||
{
|
{
|
||||||
errMsg = "cancellation requested";
|
errMsg = "cancellation requested";
|
||||||
return nullptr;
|
return nullptr;
|
||||||
|
@ -210,7 +210,7 @@ namespace ix
|
|||||||
{
|
{
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
if (isCancellationRequested()) return false;
|
if (isCancellationRequested && isCancellationRequested()) return false;
|
||||||
|
|
||||||
char* buffer = const_cast<char*>(str.c_str());
|
char* buffer = const_cast<char*>(str.c_str());
|
||||||
int len = (int) str.size();
|
int len = (int) str.size();
|
||||||
@ -222,7 +222,7 @@ namespace ix
|
|||||||
{
|
{
|
||||||
return ret == len;
|
return ret == len;
|
||||||
}
|
}
|
||||||
// There is possibly something to be write, try again
|
// There is possibly something to be writen, try again
|
||||||
else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
|
else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
|
||||||
getErrno() == EAGAIN))
|
getErrno() == EAGAIN))
|
||||||
{
|
{
|
||||||
@ -241,7 +241,7 @@ namespace ix
|
|||||||
{
|
{
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
if (isCancellationRequested()) return false;
|
if (isCancellationRequested && isCancellationRequested()) return false;
|
||||||
|
|
||||||
ssize_t ret;
|
ssize_t ret;
|
||||||
ret = recv(buffer, 1);
|
ret = recv(buffer, 1);
|
||||||
@ -304,9 +304,12 @@ namespace ix
|
|||||||
std::vector<uint8_t> output;
|
std::vector<uint8_t> output;
|
||||||
while (output.size() != length)
|
while (output.size() != length)
|
||||||
{
|
{
|
||||||
if (isCancellationRequested()) return std::make_pair(false, std::string());
|
if (isCancellationRequested && isCancellationRequested())
|
||||||
|
{
|
||||||
|
return std::make_pair(false, std::string());
|
||||||
|
}
|
||||||
|
|
||||||
int size = std::min(kChunkSize, length - output.size());
|
size_t size = std::min(kChunkSize, length - output.size());
|
||||||
ssize_t ret = recv((char*)&_readBuffer[0], size);
|
ssize_t ret = recv((char*)&_readBuffer[0], size);
|
||||||
|
|
||||||
if (ret <= 0 && (getErrno() != EWOULDBLOCK &&
|
if (ret <= 0 && (getErrno() != EWOULDBLOCK &&
|
||||||
|
@ -66,7 +66,7 @@ namespace ix
|
|||||||
|
|
||||||
for (;;)
|
for (;;)
|
||||||
{
|
{
|
||||||
if (isCancellationRequested()) // Must handle timeout as well
|
if (isCancellationRequested && isCancellationRequested()) // Must handle timeout as well
|
||||||
{
|
{
|
||||||
closeSocket(fd);
|
closeSocket(fd);
|
||||||
errMsg = "Cancelled";
|
errMsg = "Cancelled";
|
||||||
|
@ -23,6 +23,8 @@ add_executable(ws
|
|||||||
ixcrypto/IXHash.cpp
|
ixcrypto/IXHash.cpp
|
||||||
ixcrypto/IXUuid.cpp
|
ixcrypto/IXUuid.cpp
|
||||||
|
|
||||||
|
IXRedisClient.cpp
|
||||||
|
|
||||||
ws_http_client.cpp
|
ws_http_client.cpp
|
||||||
ws_ping_pong.cpp
|
ws_ping_pong.cpp
|
||||||
ws_broadcast_server.cpp
|
ws_broadcast_server.cpp
|
||||||
@ -32,6 +34,8 @@ add_executable(ws
|
|||||||
ws_transfer.cpp
|
ws_transfer.cpp
|
||||||
ws_send.cpp
|
ws_send.cpp
|
||||||
ws_receive.cpp
|
ws_receive.cpp
|
||||||
|
ws_redis_publish.cpp
|
||||||
|
ws_redis_subscribe.cpp
|
||||||
ws.cpp)
|
ws.cpp)
|
||||||
|
|
||||||
if (APPLE AND USE_TLS)
|
if (APPLE AND USE_TLS)
|
||||||
|
166
ws/IXRedisClient.cpp
Normal file
166
ws/IXRedisClient.cpp
Normal file
@ -0,0 +1,166 @@
|
|||||||
|
/*
|
||||||
|
* IXRedisClient.cpp
|
||||||
|
* Author: Benjamin Sergeant
|
||||||
|
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "IXRedisClient.h"
|
||||||
|
#include <ixwebsocket/IXSocketFactory.h>
|
||||||
|
#include <ixwebsocket/IXSocket.h>
|
||||||
|
|
||||||
|
#include <sstream>
|
||||||
|
#include <iomanip>
|
||||||
|
#include <vector>
|
||||||
|
#include <cstring>
|
||||||
|
|
||||||
|
namespace ix
|
||||||
|
{
|
||||||
|
bool RedisClient::connect(const std::string& hostname, int port)
|
||||||
|
{
|
||||||
|
bool tls = false;
|
||||||
|
std::string errorMsg;
|
||||||
|
_socket = createSocket(tls, errorMsg);
|
||||||
|
|
||||||
|
if (!_socket)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string errMsg;
|
||||||
|
return _socket->connect(hostname, port, errMsg, nullptr);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool RedisClient::publish(const std::string& channel,
|
||||||
|
const std::string& message)
|
||||||
|
{
|
||||||
|
if (!_socket) return false;
|
||||||
|
|
||||||
|
std::stringstream ss;
|
||||||
|
ss << "PUBLISH ";
|
||||||
|
ss << channel;
|
||||||
|
ss << " ";
|
||||||
|
ss << message;
|
||||||
|
ss << "\r\n";
|
||||||
|
|
||||||
|
bool sent = _socket->writeBytes(ss.str(), nullptr);
|
||||||
|
if (!sent)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto pollResult = _socket->isReadyToRead(-1);
|
||||||
|
if (pollResult == PollResultType::Error)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto lineResult = _socket->readLine(nullptr);
|
||||||
|
auto lineValid = lineResult.first;
|
||||||
|
auto line = lineResult.second;
|
||||||
|
|
||||||
|
return lineValid;
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// FIXME: we assume that redis never return errors...
|
||||||
|
//
|
||||||
|
bool RedisClient::subscribe(const std::string& channel,
|
||||||
|
const OnRedisSubscribeCallback& callback)
|
||||||
|
{
|
||||||
|
if (!_socket) return false;
|
||||||
|
|
||||||
|
std::stringstream ss;
|
||||||
|
ss << "SUBSCRIBE ";
|
||||||
|
ss << channel;
|
||||||
|
ss << "\r\n";
|
||||||
|
|
||||||
|
bool sent = _socket->writeBytes(ss.str(), nullptr);
|
||||||
|
if (!sent)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait 1s for the response
|
||||||
|
auto pollResult = _socket->isReadyToRead(-1);
|
||||||
|
if (pollResult == PollResultType::Error)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read the first line of the response
|
||||||
|
auto lineResult = _socket->readLine(nullptr);
|
||||||
|
auto lineValid = lineResult.first;
|
||||||
|
auto line = lineResult.second;
|
||||||
|
|
||||||
|
if (!lineValid) return false;
|
||||||
|
|
||||||
|
// There are 5 items for the subscribe repply
|
||||||
|
for (int i = 0; i < 5; ++i)
|
||||||
|
{
|
||||||
|
auto lineResult = _socket->readLine(nullptr);
|
||||||
|
auto lineValid = lineResult.first;
|
||||||
|
auto line = lineResult.second;
|
||||||
|
|
||||||
|
if (!lineValid) return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait indefinitely for new messages
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
// Wait until something is ready to read
|
||||||
|
auto pollResult = _socket->isReadyToRead(-1);
|
||||||
|
if (pollResult == PollResultType::Error)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// The first line of the response describe the return type,
|
||||||
|
// => *3 (an array of 3 elements)
|
||||||
|
auto lineResult = _socket->readLine(nullptr);
|
||||||
|
auto lineValid = lineResult.first;
|
||||||
|
auto line = lineResult.second;
|
||||||
|
|
||||||
|
if (!lineValid) return false;
|
||||||
|
|
||||||
|
int arraySize;
|
||||||
|
{
|
||||||
|
std::stringstream ss;
|
||||||
|
ss << line.substr(1, line.size()-1);
|
||||||
|
ss >> arraySize;
|
||||||
|
}
|
||||||
|
|
||||||
|
// There are 6 items for each received message
|
||||||
|
for (int i = 0; i < arraySize; ++i)
|
||||||
|
{
|
||||||
|
auto lineResult = _socket->readLine(nullptr);
|
||||||
|
auto lineValid = lineResult.first;
|
||||||
|
auto line = lineResult.second;
|
||||||
|
|
||||||
|
if (!lineValid) return false;
|
||||||
|
|
||||||
|
// Messages are string, which start with a string size
|
||||||
|
// => $7 (7 bytes)
|
||||||
|
int stringSize;
|
||||||
|
std::stringstream ss;
|
||||||
|
ss << line.substr(1, line.size()-1);
|
||||||
|
ss >> stringSize;
|
||||||
|
|
||||||
|
auto readResult = _socket->readBytes(stringSize, nullptr, nullptr);
|
||||||
|
if (!readResult.first) return false;
|
||||||
|
|
||||||
|
if (i == 2)
|
||||||
|
{
|
||||||
|
// The message is the 3rd element.
|
||||||
|
callback(readResult.second);
|
||||||
|
}
|
||||||
|
|
||||||
|
// read last 2 bytes (\r\n)
|
||||||
|
char c;
|
||||||
|
_socket->readByte(&c, nullptr);
|
||||||
|
_socket->readByte(&c, nullptr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
36
ws/IXRedisClient.h
Normal file
36
ws/IXRedisClient.h
Normal file
@ -0,0 +1,36 @@
|
|||||||
|
/*
|
||||||
|
* IXRedisClient.h
|
||||||
|
* Author: Benjamin Sergeant
|
||||||
|
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <memory>
|
||||||
|
#include <functional>
|
||||||
|
|
||||||
|
namespace ix
|
||||||
|
{
|
||||||
|
class Socket;
|
||||||
|
|
||||||
|
class RedisClient {
|
||||||
|
public:
|
||||||
|
using OnRedisSubscribeCallback = std::function<void(const std::string&)>;
|
||||||
|
|
||||||
|
RedisClient() = default;
|
||||||
|
~RedisClient() = default;
|
||||||
|
|
||||||
|
bool connect(const std::string& hostname,
|
||||||
|
int port);
|
||||||
|
|
||||||
|
bool publish(const std::string& channel,
|
||||||
|
const std::string& message);
|
||||||
|
|
||||||
|
bool subscribe(const std::string& channel,
|
||||||
|
const OnRedisSubscribeCallback& callback);
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::shared_ptr<Socket> _socket;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
23
ws/ws.cpp
23
ws/ws.cpp
@ -35,12 +35,15 @@ int main(int argc, char** argv)
|
|||||||
std::string output;
|
std::string output;
|
||||||
std::string hostname("127.0.0.1");
|
std::string hostname("127.0.0.1");
|
||||||
std::string pidfile;
|
std::string pidfile;
|
||||||
|
std::string channel;
|
||||||
|
std::string message;
|
||||||
bool headersOnly = false;
|
bool headersOnly = false;
|
||||||
bool followRedirects = false;
|
bool followRedirects = false;
|
||||||
bool verbose = false;
|
bool verbose = false;
|
||||||
bool save = false;
|
bool save = false;
|
||||||
bool compress = false;
|
bool compress = false;
|
||||||
int port = 8080;
|
int port = 8080;
|
||||||
|
int redisPort = 6379;
|
||||||
int connectTimeOut = 60;
|
int connectTimeOut = 60;
|
||||||
int transferTimeout = 1800;
|
int transferTimeout = 1800;
|
||||||
int maxRedirects = 5;
|
int maxRedirects = 5;
|
||||||
@ -96,6 +99,18 @@ int main(int argc, char** argv)
|
|||||||
httpClientApp->add_option("--connect-timeout", connectTimeOut, "Connection timeout");
|
httpClientApp->add_option("--connect-timeout", connectTimeOut, "Connection timeout");
|
||||||
httpClientApp->add_option("--transfer-timeout", transferTimeout, "Transfer timeout");
|
httpClientApp->add_option("--transfer-timeout", transferTimeout, "Transfer timeout");
|
||||||
|
|
||||||
|
CLI::App* redisPublishApp = app.add_subcommand("redis_publish", "Redis publisher");
|
||||||
|
redisPublishApp->add_option("--port", redisPort, "Port");
|
||||||
|
redisPublishApp->add_option("--host", hostname, "Hostname");
|
||||||
|
redisPublishApp->add_option("channel", channel, "Channel")->required();
|
||||||
|
redisPublishApp->add_option("message", message, "Message")->required();
|
||||||
|
|
||||||
|
CLI::App* redisSubscribeApp = app.add_subcommand("redis_subscribe", "Redis subscriber");
|
||||||
|
redisSubscribeApp->add_option("--port", redisPort, "Port");
|
||||||
|
redisSubscribeApp->add_option("--host", hostname, "Hostname");
|
||||||
|
redisSubscribeApp->add_option("channel", channel, "Channel")->required();
|
||||||
|
redisSubscribeApp->add_flag("-v", verbose, "Verbose");
|
||||||
|
|
||||||
CLI11_PARSE(app, argc, argv);
|
CLI11_PARSE(app, argc, argv);
|
||||||
|
|
||||||
// pid file handling
|
// pid file handling
|
||||||
@ -149,6 +164,14 @@ int main(int argc, char** argv)
|
|||||||
followRedirects, maxRedirects, verbose,
|
followRedirects, maxRedirects, verbose,
|
||||||
save, output, compress);
|
save, output, compress);
|
||||||
}
|
}
|
||||||
|
else if (app.got_subcommand("redis_publish"))
|
||||||
|
{
|
||||||
|
return ix::ws_redis_publish_main(hostname, redisPort, channel, message);
|
||||||
|
}
|
||||||
|
else if (app.got_subcommand("redis_subscribe"))
|
||||||
|
{
|
||||||
|
return ix::ws_redis_subscribe_main(hostname, redisPort, channel, verbose);
|
||||||
|
}
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
10
ws/ws.h
10
ws/ws.h
@ -39,4 +39,14 @@ namespace ix
|
|||||||
|
|
||||||
int ws_send_main(const std::string& url,
|
int ws_send_main(const std::string& url,
|
||||||
const std::string& path);
|
const std::string& path);
|
||||||
|
|
||||||
|
int ws_redis_publish_main(const std::string& hostname,
|
||||||
|
int port,
|
||||||
|
const std::string& channel,
|
||||||
|
const std::string& message);
|
||||||
|
|
||||||
|
int ws_redis_subscribe_main(const std::string& hostname,
|
||||||
|
int port,
|
||||||
|
const std::string& channel,
|
||||||
|
bool verbose);
|
||||||
}
|
}
|
||||||
|
35
ws/ws_redis_publish.cpp
Normal file
35
ws/ws_redis_publish.cpp
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
/*
|
||||||
|
* ws_redis_publish.cpp
|
||||||
|
* Author: Benjamin Sergeant
|
||||||
|
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <iostream>
|
||||||
|
#include <sstream>
|
||||||
|
#include "IXRedisClient.h"
|
||||||
|
|
||||||
|
namespace ix
|
||||||
|
{
|
||||||
|
int ws_redis_publish_main(const std::string& hostname,
|
||||||
|
int port,
|
||||||
|
const std::string& channel,
|
||||||
|
const std::string& message)
|
||||||
|
{
|
||||||
|
RedisClient redisClient;
|
||||||
|
if (!redisClient.connect(hostname, port))
|
||||||
|
{
|
||||||
|
std::cerr << "Cannot connect to redis host" << std::endl;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::cerr << "Publishing message " << message
|
||||||
|
<< " to " << channel << "..." << std::endl;
|
||||||
|
if (!redisClient.publish(channel, message))
|
||||||
|
{
|
||||||
|
std::cerr << "Error publishing to channel " << channel << std::endl;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
66
ws/ws_redis_subscribe.cpp
Normal file
66
ws/ws_redis_subscribe.cpp
Normal file
@ -0,0 +1,66 @@
|
|||||||
|
/*
|
||||||
|
* ws_redis_subscribe.cpp
|
||||||
|
* Author: Benjamin Sergeant
|
||||||
|
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <iostream>
|
||||||
|
#include <sstream>
|
||||||
|
#include <chrono>
|
||||||
|
#include "IXRedisClient.h"
|
||||||
|
|
||||||
|
namespace ix
|
||||||
|
{
|
||||||
|
int ws_redis_subscribe_main(const std::string& hostname,
|
||||||
|
int port,
|
||||||
|
const std::string& channel,
|
||||||
|
bool verbose)
|
||||||
|
{
|
||||||
|
RedisClient redisClient;
|
||||||
|
if (!redisClient.connect(hostname, port))
|
||||||
|
{
|
||||||
|
std::cerr << "Cannot connect to redis host" << std::endl;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::chrono::time_point<std::chrono::steady_clock> lastTimePoint;
|
||||||
|
int msgPerSeconds = 0;
|
||||||
|
int msgCount = 0;
|
||||||
|
|
||||||
|
auto callback = [&lastTimePoint, &msgPerSeconds, &msgCount, verbose]
|
||||||
|
(const std::string& message)
|
||||||
|
{
|
||||||
|
if (verbose)
|
||||||
|
{
|
||||||
|
std::cout << message << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
msgPerSeconds++;
|
||||||
|
|
||||||
|
auto now = std::chrono::steady_clock::now();
|
||||||
|
if (now - lastTimePoint > std::chrono::seconds(1))
|
||||||
|
{
|
||||||
|
lastTimePoint = std::chrono::steady_clock::now();
|
||||||
|
|
||||||
|
msgCount += msgPerSeconds;
|
||||||
|
|
||||||
|
// #messages 901 msg/s 150
|
||||||
|
std::cout << "#messages " << msgCount << " "
|
||||||
|
<< "msg/s " << msgPerSeconds
|
||||||
|
<< std::endl;
|
||||||
|
|
||||||
|
msgPerSeconds = 0;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
std::cerr << "Subscribing to " << channel << "..." << std::endl;
|
||||||
|
if (!redisClient.subscribe(channel, callback))
|
||||||
|
{
|
||||||
|
std::cerr << "Error subscribing to channel " << channel << std::endl;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user