Add simple Redis Server which is only capable of doing publish / subscribe. New ws redis_server sub-command to use it. The server is used in the unittest, so that we can run on CI in environment where redis isn not available like github actions env.

This commit is contained in:
Benjamin Sergeant
2019-09-23 21:04:01 -07:00
parent 8f8385f8f8
commit fbf80f4ab1
20 changed files with 497 additions and 27 deletions

View File

@ -8,6 +8,7 @@ set (IXSNAKE_SOURCES
ixsnake/IXSnakeProtocol.cpp
ixsnake/IXAppConfig.cpp
ixsnake/IXRedisClient.cpp
ixsnake/IXRedisServer.cpp
)
set (IXSNAKE_HEADERS
@ -15,6 +16,7 @@ set (IXSNAKE_HEADERS
ixsnake/IXSnakeProtocol.h
ixsnake/IXAppConfig.h
ixsnake/IXRedisClient.h
ixsnake/IXRedisServer.h
)
add_library(ixsnake STATIC

View File

@ -132,9 +132,9 @@ namespace ix
if (!_socket) return false;
std::stringstream ss;
ss << "SUBSCRIBE ";
ss << channel;
ss << "\r\n";
ss << "*2\r\n";
ss << writeString("SUBSCRIBE");
ss << writeString(channel);
bool sent = _socket->writeBytes(ss.str(), nullptr);
if (!sent)

View File

@ -0,0 +1,300 @@
/*
* IXRedisServer.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXRedisServer.h"
#include <ixwebsocket/IXNetSystem.h>
#include <ixwebsocket/IXSocketConnect.h>
#include <ixwebsocket/IXSocketFactory.h>
#include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXCancellationRequest.h>
#include <fstream>
#include <iostream>
#include <sstream>
#include <vector>
namespace ix
{
RedisServer::RedisServer(int port, const std::string& host, int backlog, size_t maxConnections)
: SocketServer(port, host, backlog, maxConnections)
, _connectedClientsCount(0)
, _stopHandlingConnections(false)
{
;
}
RedisServer::~RedisServer()
{
stop();
}
void RedisServer::stop()
{
stopAcceptingConnections();
_stopHandlingConnections = true;
while (_connectedClientsCount != 0)
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
_stopHandlingConnections = false;
SocketServer::stop();
}
void RedisServer::handleConnection(int fd, std::shared_ptr<ConnectionState> connectionState)
{
_connectedClientsCount++;
std::string errorMsg;
auto socket = createSocket(fd, errorMsg);
// Set the socket to non blocking mode + other tweaks
SocketConnect::configure(fd);
while (!_stopHandlingConnections)
{
std::vector<std::string> tokens;
if (!parseRequest(socket, tokens))
{
if (_stopHandlingConnections)
{
logError("Cancellation requested");
}
else
{
logError("Error parsing request");
}
break;
}
bool success = false;
// publish
if (tokens[0] == "COMMAND")
{
success = handleCommand(socket, tokens);
}
else if (tokens[0] == "PUBLISH")
{
success = handlePublish(socket, tokens);
}
else if (tokens[0] == "SUBSCRIBE")
{
success = handleSubscribe(socket, tokens);
}
if (!success)
{
if (_stopHandlingConnections)
{
logError("Cancellation requested");
}
else
{
logError("Error processing request for command: " + tokens[0]);
}
break;
}
}
cleanupSubscribers(socket);
logInfo("Connection closed for connection id " + connectionState->getId());
connectionState->setTerminated();
Socket::closeSocket(fd);
_connectedClientsCount--;
}
void RedisServer::cleanupSubscribers(std::shared_ptr<Socket> socket)
{
std::lock_guard<std::mutex> lock(_mutex);
for (auto&& it : _subscribers)
{
it.second.erase(socket);
}
for (auto it : _subscribers)
{
std::stringstream ss;
ss << "Subscription id: " << it.first
<< " #subscribers: " << it.second.size();
logInfo(ss.str());
}
}
size_t RedisServer::getConnectedClientsCount()
{
return _connectedClientsCount;
}
bool RedisServer::startsWith(const std::string& str,
const std::string& start)
{
return str.compare(0, start.length(), start) == 0;
}
std::string RedisServer::writeString(const std::string& str)
{
std::stringstream ss;
ss << "$";
ss << str.size();
ss << "\r\n";
ss << str;
ss << "\r\n";
return ss.str();
}
bool RedisServer::parseRequest(
std::shared_ptr<Socket> socket,
std::vector<std::string>& tokens)
{
// Parse first line
auto cb = makeCancellationRequestWithTimeout(30, _stopHandlingConnections);
auto lineResult = socket->readLine(cb);
auto lineValid = lineResult.first;
auto line = lineResult.second;
if (!lineValid) return false;
std::string str = line.substr(1);
std::stringstream ss;
ss << str;
int count;
ss >> count;
for (int i = 0; i < count; ++i)
{
auto lineResult = socket->readLine(cb);
auto lineValid = lineResult.first;
auto line = lineResult.second;
if (!lineValid) return false;
int stringSize;
std::stringstream ss;
ss << line.substr(1, line.size() - 1);
ss >> stringSize;
auto readResult = socket->readBytes(stringSize, nullptr, nullptr);
if (!readResult.first) return false;
// read last 2 bytes (\r\n)
char c;
socket->readByte(&c, nullptr);
socket->readByte(&c, nullptr);
tokens.push_back(readResult.second);
}
return true;
}
bool RedisServer::handleCommand(
std::shared_ptr<Socket> socket,
const std::vector<std::string>& tokens)
{
if (tokens.size() != 1) return false;
auto cb = makeCancellationRequestWithTimeout(30, _stopHandlingConnections);
std::stringstream ss;
// return 2 nested arrays
ss << "*2\r\n";
//
// publish
//
ss << "*6\r\n";
ss << writeString("publish"); // 1
ss << ":3\r\n"; // 2
ss << "*0\r\n"; // 3
ss << ":1\r\n"; // 4
ss << ":2\r\n"; // 5
ss << ":1\r\n"; // 6
//
// subscribe
//
ss << "*6\r\n";
ss << writeString("subscribe"); // 1
ss << ":2\r\n"; // 2
ss << "*0\r\n"; // 3
ss << ":1\r\n"; // 4
ss << ":1\r\n"; // 5
ss << ":1\r\n"; // 6
socket->writeBytes(ss.str(), cb);
return true;
}
bool RedisServer::handleSubscribe(
std::shared_ptr<Socket> socket,
const std::vector<std::string>& tokens)
{
if (tokens.size() != 2) return false;
auto cb = makeCancellationRequestWithTimeout(30, _stopHandlingConnections);
std::string channel = tokens[1];
// Respond
socket->writeBytes("*3\r\n", cb);
socket->writeBytes(writeString("subscribe"), cb);
socket->writeBytes(writeString(channel), cb);
socket->writeBytes(":1\r\n", cb);
std::lock_guard<std::mutex> lock(_mutex);
_subscribers[channel].insert(socket);
return true;
}
bool RedisServer::handlePublish(
std::shared_ptr<Socket> socket,
const std::vector<std::string>& tokens)
{
if (tokens.size() != 3) return false;
auto cb = makeCancellationRequestWithTimeout(30, _stopHandlingConnections);
std::string channel = tokens[1];
std::string data = tokens[2];
// now dispatch the message to subscribers (write custom method)
std::lock_guard<std::mutex> lock(_mutex);
auto it = _subscribers.find(channel);
if (it == _subscribers.end())
{
// return the number of clients that received the message, 0 in that case
socket->writeBytes(":0\r\n", cb);
return true;
}
auto subscribers = it->second;
for (auto jt : subscribers)
{
jt->writeBytes("*3\r\n", cb);
jt->writeBytes(writeString("message"), cb);
jt->writeBytes(writeString(channel), cb);
jt->writeBytes(writeString(data), cb);
}
// return the number of clients that received the message.
std::stringstream ss;
ss << ":"
<< std::to_string(subscribers.size())
<< "\r\n";
socket->writeBytes(ss.str(), cb);
return true;
}
} // namespace ix

View File

@ -0,0 +1,67 @@
/*
* IXRedisServer.h
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include "IXSocketServer.h"
#include "IXSocket.h"
#include <functional>
#include <memory>
#include <mutex>
#include <set>
#include <map>
#include <string>
#include <thread>
#include <utility> // pair
namespace ix
{
class RedisServer final : public SocketServer
{
public:
RedisServer(int port = SocketServer::kDefaultPort,
const std::string& host = SocketServer::kDefaultHost,
int backlog = SocketServer::kDefaultTcpBacklog,
size_t maxConnections = SocketServer::kDefaultMaxConnections);
virtual ~RedisServer();
virtual void stop() final;
private:
// Member variables
std::atomic<int> _connectedClientsCount;
// Subscribers
// We could store connection states in there, to add better debugging
// since a connection state has a readable ID
std::map<std::string, std::set<std::shared_ptr<Socket>>> _subscribers;
std::mutex _mutex;
std::atomic<bool> _stopHandlingConnections;
// Methods
virtual void handleConnection(int fd,
std::shared_ptr<ConnectionState> connectionState) final;
virtual size_t getConnectedClientsCount() final;
bool startsWith(const std::string& str, const std::string& start);
std::string writeString(const std::string& str);
bool parseRequest(
std::shared_ptr<Socket> socket,
std::vector<std::string>& tokens);
bool handlePublish(std::shared_ptr<Socket> socket,
const std::vector<std::string>& tokens);
bool handleSubscribe(std::shared_ptr<Socket> socket,
const std::vector<std::string>& tokens);
bool handleCommand(std::shared_ptr<Socket> socket,
const std::vector<std::string>& tokens);
void cleanupSubscribers(std::shared_ptr<Socket> socket);
};
} // namespace ix