Add simple Redis Server which is only capable of doing publish / subscribe. New ws redis_server sub-command to use it. The server is used in the unittest, so that we can run on CI in environment where redis isn not available like github actions env.

This commit is contained in:
Benjamin Sergeant 2019-09-23 21:04:01 -07:00
parent 95722e3bbb
commit ceb0c602c9
20 changed files with 497 additions and 27 deletions

View File

@ -1 +1 @@
6.2.3
6.2.5

View File

@ -1,6 +1,10 @@
# Changelog
All notable changes to this project will be documented in this file.
## [6.2.5] - 2019-09-23
- Add simple Redis Server which is only capable of doing publish / subscribe. New ws redis_server sub-command to use it. The server is used in the unittest, so that we can run on CI in environment where redis isn not available like github actions env.
## [6.2.4] - 2019-09-22
- Add options to configure TLS ; contributed by Matt DeBoer. Only implemented for OpenSSL TLS backend for now.

View File

@ -8,6 +8,7 @@ set (IXSNAKE_SOURCES
ixsnake/IXSnakeProtocol.cpp
ixsnake/IXAppConfig.cpp
ixsnake/IXRedisClient.cpp
ixsnake/IXRedisServer.cpp
)
set (IXSNAKE_HEADERS
@ -15,6 +16,7 @@ set (IXSNAKE_HEADERS
ixsnake/IXSnakeProtocol.h
ixsnake/IXAppConfig.h
ixsnake/IXRedisClient.h
ixsnake/IXRedisServer.h
)
add_library(ixsnake STATIC

View File

@ -132,9 +132,9 @@ namespace ix
if (!_socket) return false;
std::stringstream ss;
ss << "SUBSCRIBE ";
ss << channel;
ss << "\r\n";
ss << "*2\r\n";
ss << writeString("SUBSCRIBE");
ss << writeString(channel);
bool sent = _socket->writeBytes(ss.str(), nullptr);
if (!sent)

View File

@ -0,0 +1,300 @@
/*
* IXRedisServer.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXRedisServer.h"
#include <ixwebsocket/IXNetSystem.h>
#include <ixwebsocket/IXSocketConnect.h>
#include <ixwebsocket/IXSocketFactory.h>
#include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXCancellationRequest.h>
#include <fstream>
#include <iostream>
#include <sstream>
#include <vector>
namespace ix
{
RedisServer::RedisServer(int port, const std::string& host, int backlog, size_t maxConnections)
: SocketServer(port, host, backlog, maxConnections)
, _connectedClientsCount(0)
, _stopHandlingConnections(false)
{
;
}
RedisServer::~RedisServer()
{
stop();
}
void RedisServer::stop()
{
stopAcceptingConnections();
_stopHandlingConnections = true;
while (_connectedClientsCount != 0)
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
_stopHandlingConnections = false;
SocketServer::stop();
}
void RedisServer::handleConnection(int fd, std::shared_ptr<ConnectionState> connectionState)
{
_connectedClientsCount++;
std::string errorMsg;
auto socket = createSocket(fd, errorMsg);
// Set the socket to non blocking mode + other tweaks
SocketConnect::configure(fd);
while (!_stopHandlingConnections)
{
std::vector<std::string> tokens;
if (!parseRequest(socket, tokens))
{
if (_stopHandlingConnections)
{
logError("Cancellation requested");
}
else
{
logError("Error parsing request");
}
break;
}
bool success = false;
// publish
if (tokens[0] == "COMMAND")
{
success = handleCommand(socket, tokens);
}
else if (tokens[0] == "PUBLISH")
{
success = handlePublish(socket, tokens);
}
else if (tokens[0] == "SUBSCRIBE")
{
success = handleSubscribe(socket, tokens);
}
if (!success)
{
if (_stopHandlingConnections)
{
logError("Cancellation requested");
}
else
{
logError("Error processing request for command: " + tokens[0]);
}
break;
}
}
cleanupSubscribers(socket);
logInfo("Connection closed for connection id " + connectionState->getId());
connectionState->setTerminated();
Socket::closeSocket(fd);
_connectedClientsCount--;
}
void RedisServer::cleanupSubscribers(std::shared_ptr<Socket> socket)
{
std::lock_guard<std::mutex> lock(_mutex);
for (auto&& it : _subscribers)
{
it.second.erase(socket);
}
for (auto it : _subscribers)
{
std::stringstream ss;
ss << "Subscription id: " << it.first
<< " #subscribers: " << it.second.size();
logInfo(ss.str());
}
}
size_t RedisServer::getConnectedClientsCount()
{
return _connectedClientsCount;
}
bool RedisServer::startsWith(const std::string& str,
const std::string& start)
{
return str.compare(0, start.length(), start) == 0;
}
std::string RedisServer::writeString(const std::string& str)
{
std::stringstream ss;
ss << "$";
ss << str.size();
ss << "\r\n";
ss << str;
ss << "\r\n";
return ss.str();
}
bool RedisServer::parseRequest(
std::shared_ptr<Socket> socket,
std::vector<std::string>& tokens)
{
// Parse first line
auto cb = makeCancellationRequestWithTimeout(30, _stopHandlingConnections);
auto lineResult = socket->readLine(cb);
auto lineValid = lineResult.first;
auto line = lineResult.second;
if (!lineValid) return false;
std::string str = line.substr(1);
std::stringstream ss;
ss << str;
int count;
ss >> count;
for (int i = 0; i < count; ++i)
{
auto lineResult = socket->readLine(cb);
auto lineValid = lineResult.first;
auto line = lineResult.second;
if (!lineValid) return false;
int stringSize;
std::stringstream ss;
ss << line.substr(1, line.size() - 1);
ss >> stringSize;
auto readResult = socket->readBytes(stringSize, nullptr, nullptr);
if (!readResult.first) return false;
// read last 2 bytes (\r\n)
char c;
socket->readByte(&c, nullptr);
socket->readByte(&c, nullptr);
tokens.push_back(readResult.second);
}
return true;
}
bool RedisServer::handleCommand(
std::shared_ptr<Socket> socket,
const std::vector<std::string>& tokens)
{
if (tokens.size() != 1) return false;
auto cb = makeCancellationRequestWithTimeout(30, _stopHandlingConnections);
std::stringstream ss;
// return 2 nested arrays
ss << "*2\r\n";
//
// publish
//
ss << "*6\r\n";
ss << writeString("publish"); // 1
ss << ":3\r\n"; // 2
ss << "*0\r\n"; // 3
ss << ":1\r\n"; // 4
ss << ":2\r\n"; // 5
ss << ":1\r\n"; // 6
//
// subscribe
//
ss << "*6\r\n";
ss << writeString("subscribe"); // 1
ss << ":2\r\n"; // 2
ss << "*0\r\n"; // 3
ss << ":1\r\n"; // 4
ss << ":1\r\n"; // 5
ss << ":1\r\n"; // 6
socket->writeBytes(ss.str(), cb);
return true;
}
bool RedisServer::handleSubscribe(
std::shared_ptr<Socket> socket,
const std::vector<std::string>& tokens)
{
if (tokens.size() != 2) return false;
auto cb = makeCancellationRequestWithTimeout(30, _stopHandlingConnections);
std::string channel = tokens[1];
// Respond
socket->writeBytes("*3\r\n", cb);
socket->writeBytes(writeString("subscribe"), cb);
socket->writeBytes(writeString(channel), cb);
socket->writeBytes(":1\r\n", cb);
std::lock_guard<std::mutex> lock(_mutex);
_subscribers[channel].insert(socket);
return true;
}
bool RedisServer::handlePublish(
std::shared_ptr<Socket> socket,
const std::vector<std::string>& tokens)
{
if (tokens.size() != 3) return false;
auto cb = makeCancellationRequestWithTimeout(30, _stopHandlingConnections);
std::string channel = tokens[1];
std::string data = tokens[2];
// now dispatch the message to subscribers (write custom method)
std::lock_guard<std::mutex> lock(_mutex);
auto it = _subscribers.find(channel);
if (it == _subscribers.end())
{
// return the number of clients that received the message, 0 in that case
socket->writeBytes(":0\r\n", cb);
return true;
}
auto subscribers = it->second;
for (auto jt : subscribers)
{
jt->writeBytes("*3\r\n", cb);
jt->writeBytes(writeString("message"), cb);
jt->writeBytes(writeString(channel), cb);
jt->writeBytes(writeString(data), cb);
}
// return the number of clients that received the message.
std::stringstream ss;
ss << ":"
<< std::to_string(subscribers.size())
<< "\r\n";
socket->writeBytes(ss.str(), cb);
return true;
}
} // namespace ix

View File

@ -0,0 +1,67 @@
/*
* IXRedisServer.h
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include "IXSocketServer.h"
#include "IXSocket.h"
#include <functional>
#include <memory>
#include <mutex>
#include <set>
#include <map>
#include <string>
#include <thread>
#include <utility> // pair
namespace ix
{
class RedisServer final : public SocketServer
{
public:
RedisServer(int port = SocketServer::kDefaultPort,
const std::string& host = SocketServer::kDefaultHost,
int backlog = SocketServer::kDefaultTcpBacklog,
size_t maxConnections = SocketServer::kDefaultMaxConnections);
virtual ~RedisServer();
virtual void stop() final;
private:
// Member variables
std::atomic<int> _connectedClientsCount;
// Subscribers
// We could store connection states in there, to add better debugging
// since a connection state has a readable ID
std::map<std::string, std::set<std::shared_ptr<Socket>>> _subscribers;
std::mutex _mutex;
std::atomic<bool> _stopHandlingConnections;
// Methods
virtual void handleConnection(int fd,
std::shared_ptr<ConnectionState> connectionState) final;
virtual size_t getConnectedClientsCount() final;
bool startsWith(const std::string& str, const std::string& start);
std::string writeString(const std::string& str);
bool parseRequest(
std::shared_ptr<Socket> socket,
std::vector<std::string>& tokens);
bool handlePublish(std::shared_ptr<Socket> socket,
const std::vector<std::string>& tokens);
bool handleSubscribe(std::shared_ptr<Socket> socket,
const std::vector<std::string>& tokens);
bool handleCommand(std::shared_ptr<Socket> socket,
const std::vector<std::string>& tokens);
void cleanupSubscribers(std::shared_ptr<Socket> socket);
};
} // namespace ix

View File

@ -119,6 +119,8 @@ namespace ix
void SocketServer::start()
{
_stop = false;
if (!_thread.joinable())
{
_thread = std::thread(&SocketServer::run, this);

View File

@ -58,6 +58,8 @@ namespace ix
void stopAcceptingConnections();
std::atomic<bool> _stop;
private:
// Member variables
int _port;
@ -71,7 +73,6 @@ namespace ix
std::mutex _logMutex;
// background thread to wait for incoming connections
std::atomic<bool> _stop;
std::thread _thread;
void run();

View File

@ -6,4 +6,4 @@
#pragma once
#define IX_WEBSOCKET_VERSION "6.2.3"
#define IX_WEBSOCKET_VERSION "6.2.5"

View File

@ -4,13 +4,14 @@
* Copyright (c) 2017 Machine Zone. All rights reserved.
*/
#include <ixsnake/IXSnakeServer.h>
#include "IXTest.h"
#include "catch.hpp"
#include <chrono>
#include <iostream>
#include <ixcobra/IXCobraConnection.h>
#include <ixcrypto/IXUuid.h>
#include <ixsnake/IXRedisServer.h>
#include <ixsnake/IXSnakeServer.h>
using namespace ix;
@ -166,14 +167,6 @@ namespace
//
void SatoriChat::run()
{
snake::AppConfig appConfig = makeSnakeServerConfig(8008);
// Display config on the terminal for debugging
dumpConfig(appConfig);
snake::SnakeServer snakeServer(appConfig);
snakeServer.run();
// "chat" conf
std::string appkey("FC2F10139A2BAc53BB72D9db967b024f");
std::string channel = _session;
@ -186,12 +179,16 @@ namespace
_conn.setEventCallback([this, channel](ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& /*headers*/,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId) {
if (eventType == ix::CobraConnection_EventType_Open)
{
log("Subscriber connected: " + _user);
for (auto&& it : headers)
{
log("Headers " + it.first + " " + it.second);
}
}
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
@ -253,8 +250,6 @@ namespace
const ix::WebSocketHttpHeaders& /*headers*/,
const std::string& /*subscriptionId*/,
CobraConnection::MsgId /*msgId*/) { ; });
snakeServer.stop();
}
} // namespace
@ -264,6 +259,14 @@ TEST_CASE("Cobra_chat", "[cobra_chat]")
{
int port = getFreePort();
snake::AppConfig appConfig = makeSnakeServerConfig(port);
// Start a redis server
ix::RedisServer redisServer(appConfig.redisPort);
auto res = redisServer.listen();
REQUIRE(res.first);
redisServer.start();
// Start a snake server
snake::SnakeServer snakeServer(appConfig);
snakeServer.run();
@ -293,6 +296,7 @@ TEST_CASE("Cobra_chat", "[cobra_chat]")
if (timeout <= 0)
{
snakeServer.stop();
redisServer.stop();
REQUIRE(false); // timeout
}
}
@ -317,6 +321,7 @@ TEST_CASE("Cobra_chat", "[cobra_chat]")
if (timeout <= 0)
{
snakeServer.stop();
redisServer.stop();
REQUIRE(false); // timeout
}
}
@ -327,15 +332,18 @@ TEST_CASE("Cobra_chat", "[cobra_chat]")
chatA.stop();
chatB.stop();
// FIXME: improve this and make it exact matches
// we get unreliable result set
REQUIRE(chatA.getReceivedMessagesCount() == 2);
REQUIRE(chatB.getReceivedMessagesCount() == 3);
std::cout << incomingBytes << std::endl;
std::cout << "Incoming bytes: " << incomingBytes << std::endl;
std::cout << "Outgoing bytes: " << outgoingBytes << std::endl;
std::cerr << "Stopping snake server... ";
snakeServer.stop();
std::cerr << "OK" << std::endl;
std::cerr << "Stopping redis server... ";
redisServer.stop();
std::cerr << "OK" << std::endl;
}
}

View File

@ -8,6 +8,7 @@
#include <iostream>
#include <ixcobra/IXCobraMetricsPublisher.h>
#include <ixcrypto/IXUuid.h>
#include <ixsnake/IXRedisServer.h>
#include <ixsnake/IXSnakeServer.h>
#include <set>
@ -15,6 +16,23 @@ using namespace ix;
namespace
{
std::atomic<size_t> incomingBytes(0);
std::atomic<size_t> outgoingBytes(0);
void setupTrafficTrackerCallback()
{
ix::CobraConnection::setTrafficTrackerCallback([](size_t size, bool incoming) {
if (incoming)
{
incomingBytes += size;
}
else
{
outgoingBytes += size;
}
});
}
//
// This project / appkey is configure on cobra to not do any batching.
// This way we can start a subscriber and receive all messages as they come in.
@ -53,12 +71,16 @@ namespace
conn.setEventCallback([&conn](ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& /*headers*/,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId) {
if (eventType == ix::CobraConnection_EventType_Open)
{
Logger() << "Subscriber connected:";
for (auto&& it : headers)
{
log("Headers " + it.first + " " + it.second);
}
}
if (eventType == ix::CobraConnection_EventType_Error)
{
@ -123,9 +145,19 @@ TEST_CASE("Cobra_Metrics_Publisher", "[cobra]")
{
int port = getFreePort();
snake::AppConfig appConfig = makeSnakeServerConfig(port);
// Start a redis server
ix::RedisServer redisServer(appConfig.redisPort);
auto res = redisServer.listen();
REQUIRE(res.first);
redisServer.start();
// Start a snake server
snake::SnakeServer snakeServer(appConfig);
snakeServer.run();
setupTrafficTrackerCallback();
std::stringstream ss;
ss << "ws://localhost:" << port;
std::string endpoint = ss.str();
@ -147,6 +179,8 @@ TEST_CASE("Cobra_Metrics_Publisher", "[cobra]")
timeout -= 10;
if (timeout <= 0)
{
snakeServer.stop();
redisServer.stop();
REQUIRE(false); // timeout
}
}
@ -233,5 +267,14 @@ TEST_CASE("Cobra_Metrics_Publisher", "[cobra]")
CHECK(gIds.count("sms_set_rate_control_id") == 1);
CHECK(gIds.count("sms_set_blacklist_id") == 1);
std::cout << "Incoming bytes: " << incomingBytes << std::endl;
std::cout << "Outgoing bytes: " << outgoingBytes << std::endl;
std::cerr << "Stopping snake server... ";
snakeServer.stop();
std::cerr << "OK" << std::endl;
std::cerr << "Stopping redis server... ";
redisServer.stop();
std::cerr << "OK" << std::endl;
}

View File

@ -155,7 +155,7 @@ namespace ix
appConfig.port = port;
appConfig.hostname = "127.0.0.1";
appConfig.verbose = true;
appConfig.redisPort = 6379;
appConfig.redisPort = getFreePort();
appConfig.redisPassword = "";
appConfig.redisHosts.push_back("localhost"); // only one host supported now

View File

@ -6,9 +6,9 @@
#pragma once
#include <ixsnake/IXAppConfig.h>
#include "IXGetFreePort.h"
#include <iostream>
#include <ixsnake/IXAppConfig.h>
#include <ixwebsocket/IXWebSocketServer.h>
#include <mutex>
#include <spdlog/spdlog.h>

View File

@ -47,6 +47,7 @@ add_executable(ws
ws_receive.cpp
ws_redis_publish.cpp
ws_redis_subscribe.cpp
ws_redis_server.cpp
ws_snake.cpp
ws_cobra_subscribe.cpp
ws_cobra_metrics_publish.cpp

View File

@ -243,6 +243,10 @@ int main(int argc, char** argv)
autobahnApp->add_option("--url", url, "url");
autobahnApp->add_flag("-q", quiet, "Quiet");
CLI::App* redisServerApp = app.add_subcommand("redis_server", "Redis server");
redisServerApp->add_option("--port", port, "Port");
redisServerApp->add_option("--host", hostname, "Hostname");
CLI11_PARSE(app, argc, argv);
// pid file handling
@ -364,6 +368,10 @@ int main(int argc, char** argv)
{
ret = ix::ws_autobahn_main(url, quiet);
}
else if (app.got_subcommand("redis_server"))
{
ret = ix::ws_redis_server_main(port, hostname);
}
ix::uninitNetSystem();
return ret;

View File

@ -111,4 +111,6 @@ namespace ix
int ws_httpd_main(int port, const std::string& hostname);
int ws_autobahn_main(const std::string& url, bool quiet);
int ws_redis_server_main(int port, const std::string& hostname);
} // namespace ix

View File

@ -4,8 +4,8 @@
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <ixsnake/IXRedisClient.h>
#include <iostream>
#include <ixsnake/IXRedisClient.h>
#include <sstream>
namespace ix

32
ws/ws_redis_server.cpp Normal file
View File

@ -0,0 +1,32 @@
/*
* ws_redis_publish.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <ixsnake/IXRedisServer.h>
#include <spdlog/spdlog.h>
#include <sstream>
namespace ix
{
int ws_redis_server_main(int port, const std::string& hostname)
{
spdlog::info("Listening on {}:{}", hostname, port);
ix::RedisServer server(port, hostname);
auto res = server.listen();
if (!res.first)
{
std::cerr << res.second << std::endl;
return 1;
}
server.start();
server.wait();
return 0;
}
} // namespace ix

View File

@ -4,10 +4,10 @@
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <ixsnake/IXRedisClient.h>
#include <atomic>
#include <chrono>
#include <iostream>
#include <ixsnake/IXRedisClient.h>
#include <sstream>
#include <thread>

View File

@ -4,9 +4,9 @@
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include <ixsnake/IXSnakeServer.h>
#include <fstream>
#include <iostream>
#include <ixsnake/IXSnakeServer.h>
#include <sstream>
namespace