Compare commits

..

5 Commits

Author SHA1 Message Date
Benjamin Sergeant
c36dc0e16a fix unittest 2019-03-20 18:50:56 -07:00
Benjamin Sergeant
7e45659377 update README 2019-03-20 18:33:55 -07:00
Benjamin Sergeant
788c92487c New connection state for server code + fix OpenSSL double init bug 2019-03-20 18:26:29 -07:00
Benjamin Sergeant
0999073435 fix typo 2019-03-20 18:25:28 -07:00
Benjamin Sergeant
2cae6f4cf8 (cmake) add a warning about 32/64 conversion problems. 2019-03-20 16:16:54 -07:00
45 changed files with 127 additions and 1629 deletions

View File

@@ -135,4 +135,3 @@ set( IXWEBSOCKET_INCLUDE_DIRS
target_include_directories( ixwebsocket PUBLIC ${IXWEBSOCKET_INCLUDE_DIRS} )
add_subdirectory(ws)
add_subdirectory(third_party/cpp_redis)

View File

@@ -1 +0,0 @@
1.3.2

View File

@@ -1 +0,0 @@
Dockerfile.dev

31
Dockerfile Normal file
View File

@@ -0,0 +1,31 @@
FROM debian:stretch
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install gdb
RUN apt-get -y install screen
RUN apt-get -y install procps
RUN apt-get -y install lsof
RUN apt-get -y install libz-dev
RUN apt-get -y install vim
RUN apt-get -y install make
RUN apt-get -y install cmake
RUN apt-get -y install curl
RUN apt-get -y install python
RUN apt-get -y install netcat
# debian strech cmake is too old for building with Docker
COPY makefile .
RUN ["make", "install_cmake_for_linux"]
COPY . .
ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-rc4-Linux-x86_64/bin
ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
# RUN ["make"]
EXPOSE 8765
CMD ["/ws/ws", "transfer", "--port", "8765", "--host", "0.0.0.0"]

View File

@@ -1,31 +0,0 @@
FROM debian:stretch
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install gdb
RUN apt-get -y install screen
RUN apt-get -y install procps
RUN apt-get -y install lsof
RUN apt-get -y install libz-dev
RUN apt-get -y install vim
RUN apt-get -y install make
RUN apt-get -y install cmake
RUN apt-get -y install curl
RUN apt-get -y install python
RUN apt-get -y install netcat
# debian strech cmake is too old for building with Docker
COPY makefile .
RUN ["make", "install_cmake_for_linux"]
COPY . .
ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-rc4-Linux-x86_64/bin
ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
# RUN ["make"]
EXPOSE 8765
CMD ["/ws/ws", "transfer", "--port", "8765", "--host", "0.0.0.0"]

View File

@@ -1,30 +0,0 @@
FROM debian:buster
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install libz-dev
RUN apt-get -y install make
RUN apt-get -y install wget
RUN mkdir -p /tmp/cmake
WORKDIR /tmp/cmake
RUN wget https://github.com/Kitware/CMake/releases/download/v3.14.0/cmake-3.14.0-Linux-x86_64.tar.gz
RUN tar zxf cmake-3.14.0-Linux-x86_64.tar.gz
RUN adduser app
COPY . .
ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-Linux-x86_64/bin
ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
RUN ["make"]
# Now run in usermode
USER app
EXPOSE 8765
CMD ["bash"]

View File

@@ -0,0 +1,20 @@
class Ixwebsocket < Formula
desc "WebSocket client and server, and HTTP client command-line tool"
homepage "https://github.com/machinezone/IXWebSocket"
url "https://github.com/machinezone/IXWebSocket/archive/v1.1.0.tar.gz"
sha256 "52592ce3d0a67ad0f90ac9e8a458f61724175d95a01a38d1bad3fcdc5c7b6666"
depends_on "cmake" => :build
def install
system "cmake", ".", *std_cmake_args
system "make", "install"
end
test do
system "#{bin}/ws", "--help"
system "#{bin}/ws", "send", "--help"
system "#{bin}/ws", "receive", "--help"
system "#{bin}/ws", "transfer", "--help"
system "#{bin}/ws", "curl", "--help"
end
end

View File

@@ -4,15 +4,14 @@
## Introduction
[*WebSocket*](https://en.wikipedia.org/wiki/WebSocket) is a computer communications protocol, providing full-duplex and bi-directionnal communication channels over a single TCP connection. *IXWebSocket* is a C++ library for client and server Websocket communication, and for client HTTP communication. The code is derived from [easywsclient](https://github.com/dhbaird/easywsclient) and from the [Satori C SDK](https://github.com/satori-com/satori-rtm-sdk-c). It has been tested on the following platforms.
[*WebSocket*](https://en.wikipedia.org/wiki/WebSocket) is a computer communications protocol, providing full-duplex
communication channels over a single TCP connection. *IXWebSocket* is a C++ library for client and server Websocket communication, and for client HTTP communication. The code is derived from [easywsclient](https://github.com/dhbaird/easywsclient) and from the [Satori C SDK](https://github.com/satori-com/satori-rtm-sdk-c). It has been tested on the following platforms.
* macOS
* iOS
* Linux
* Android
The code was made to compile once on Windows but support is currently broken on this platform.
## Examples
The [*ws*](https://github.com/machinezone/IXWebSocket/tree/master/ws) folder countains many interactive programs for chat, [file transfers](https://github.com/machinezone/IXWebSocket/blob/master/ws/ws_send.cpp), [curl like](https://github.com/machinezone/IXWebSocket/blob/master/ws/ws_http_client.cpp) http clients, demonstrating client and server usage.
@@ -47,12 +46,9 @@ webSocket.setOnMessageCallback(
// Now that our callback is setup, we can start our background thread and receive messages
webSocket.start();
// Send a message to the server (default to BINARY mode)
// Send a message to the server
webSocket.send("hello world");
// The message can be sent in TEXT mode
webSocket.sendText("hello again");
// ... finally ...
// Stop the connection
@@ -218,11 +214,11 @@ If the remote end (server) breaks the connection, the code will try to perpetual
### Large messages
Large frames are broken up into smaller chunks or messages to avoid filling up the os tcp buffers, which is permitted thanks to WebSocket [fragmentation](https://tools.ietf.org/html/rfc6455#section-5.4). Messages up to 1G were sent and received succesfully.
Large frames are broken up into smaller chunks or messages to avoid filling up the os tcp buffers, which is permitted thanks to WebSocket [fragmentation](https://tools.ietf.org/html/rfc6455#section-5.4). Messages up to 500M were sent and received succesfully.
## Limitations
* No utf-8 validation is made when sending TEXT message with sendText()
* There is no text support for sending data, only the binary protocol is supported. Sending json or text over the binary protocol works well.
* Automatic reconnection works at the TCP socket level, and will detect remote end disconnects. However, if the device/computer network become unreachable (by turning off wifi), it is quite hard to reliably and timely detect it at the socket level using `recv` and `send` error codes. [Here](https://stackoverflow.com/questions/14782143/linux-socket-how-to-detect-disconnected-network-in-a-client-program) is a good discussion on the subject. This behavior is consistent with other runtimes such as node.js. One way to detect a disconnected device with low level C code is to do a name resolution with DNS but this can be expensive. Mobile devices have good and reliable API to do that.
* The server code is using select to detect incoming data, and creates one OS thread per connection. This is not as scalable as strategies using epoll or kqueue.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 5.5 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 94 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 80 KiB

View File

@@ -1,2 +0,0 @@
all:
(cd .. ; make docker && make docker_push)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 74 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 118 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 113 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 168 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 673 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.5 MiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.5 MiB

File diff suppressed because one or more lines are too long

Binary file not shown.

Before

Width:  |  Height:  |  Size: 90 KiB

File diff suppressed because it is too large Load Diff

Binary file not shown.

Before

Width:  |  Height:  |  Size: 36 KiB

View File

@@ -1,21 +0,0 @@
version: "3"
services:
ws:
stdin_open: true
tty: true
image: bsergean/ws:build
ports:
- "8765:8765"
entrypoint: bash
networks:
- ws-net
depends_on:
- redis1
redis1:
image: redis:alpine
networks:
- ws-net
networks:
ws-net:

View File

@@ -6,6 +6,8 @@
#include "IXConnectionState.h"
#include <sstream>
namespace ix
{
std::atomic<uint64_t> ConnectionState::_globalId(0);
@@ -17,7 +19,9 @@ namespace ix
void ConnectionState::computeId()
{
_id = std::to_string(_globalId++);
std::stringstream ss;
ss << _globalId++;
_id = ss.str();
}
const std::string& ConnectionState::getId() const

View File

@@ -302,13 +302,7 @@ namespace ix
WebSocketSendInfo WebSocket::send(const std::string& text,
const OnProgressCallback& onProgressCallback)
{
return sendMessage(text, SendMessageKind::Binary, onProgressCallback);
}
WebSocketSendInfo WebSocket::sendText(const std::string& text,
const OnProgressCallback& onProgressCallback)
{
return sendMessage(text, SendMessageKind::Text, onProgressCallback);
return sendMessage(text, false, onProgressCallback);
}
WebSocketSendInfo WebSocket::ping(const std::string& text)
@@ -317,11 +311,11 @@ namespace ix
constexpr size_t pingMaxPayloadSize = 125;
if (text.size() > pingMaxPayloadSize) return WebSocketSendInfo(false);
return sendMessage(text, SendMessageKind::Ping);
return sendMessage(text, true);
}
WebSocketSendInfo WebSocket::sendMessage(const std::string& text,
SendMessageKind sendMessageKind,
bool ping,
const OnProgressCallback& onProgressCallback)
{
if (!isConnected()) return WebSocketSendInfo(false);
@@ -338,22 +332,13 @@ namespace ix
std::lock_guard<std::mutex> lock(_writeMutex);
WebSocketSendInfo webSocketSendInfo;
switch (sendMessageKind)
if (ping)
{
case SendMessageKind::Text:
{
webSocketSendInfo = _ws.sendText(text, onProgressCallback);
} break;
case SendMessageKind::Binary:
{
webSocketSendInfo = _ws.sendBinary(text, onProgressCallback);
} break;
case SendMessageKind::Ping:
{
webSocketSendInfo = _ws.sendPing(text);
} break;
webSocketSendInfo = _ws.sendPing(text);
}
else
{
webSocketSendInfo = _ws.sendBinary(text, onProgressCallback);
}
WebSocket::invokeTrafficTrackerCallback(webSocketSendInfo.wireSize, false);

View File

@@ -101,8 +101,6 @@ namespace ix
WebSocketSendInfo send(const std::string& text,
const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo sendText(const std::string& text,
const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo ping(const std::string& text);
void close();
@@ -122,7 +120,7 @@ namespace ix
private:
WebSocketSendInfo sendMessage(const std::string& text,
SendMessageKind sendMessageKind,
bool ping,
const OnProgressCallback& callback = nullptr);
bool isConnected() const;

View File

@@ -114,7 +114,7 @@ namespace ix
std::stringstream ss;
ss << "HTTP/1.1 ";
ss << code;
ss << " ";
ss << "\r\n";
ss << reason;
ss << "\r\n";
@@ -353,7 +353,7 @@ namespace ix
WebSocketHandshakeKeyGen::generate(headers["sec-websocket-key"].c_str(), output);
std::stringstream ss;
ss << "HTTP/1.1 101 Switching Protocols\r\n";
ss << "HTTP/1.1 101\r\n";
ss << "Sec-WebSocket-Accept: " << std::string(output) << "\r\n";
ss << "Upgrade: websocket\r\n";
ss << "Connection: Upgrade\r\n";

View File

@@ -58,7 +58,6 @@ namespace ix
constexpr size_t WebSocketTransport::kChunkSize;
WebSocketTransport::WebSocketTransport() :
_useMask(true),
_readyState(CLOSED),
_closeCode(0),
_closeWireSize(0),
@@ -124,9 +123,6 @@ namespace ix
// Server
WebSocketInitResult WebSocketTransport::connectToSocket(int fd, int timeoutSecs)
{
// Server should not mask the data it sends to the client
_useMask = false;
std::string errorMsg;
_socket = createSocket(fd, errorMsg);
@@ -284,15 +280,19 @@ namespace ix
_txbuf.insert(_txbuf.end(), header.begin(), header.end());
_txbuf.insert(_txbuf.end(), begin, end);
if (_useMask)
// Masking
for (size_t i = 0; i != (size_t) message_size; ++i)
{
for (size_t i = 0; i != (size_t) message_size; ++i)
{
*(_txbuf.end() - (size_t) message_size + i) ^= masking_key[i&0x3];
}
*(_txbuf.end() - (size_t) message_size + i) ^= masking_key[i&0x3];
}
}
void WebSocketTransport::appendToSendBuffer(const std::vector<uint8_t>& buffer)
{
std::lock_guard<std::mutex> lock(_txbufMutex);
_txbuf.insert(_txbuf.end(), buffer.begin(), buffer.end());
}
void WebSocketTransport::unmaskReceiveBuffer(const wsheader_type& ws)
{
if (ws.mask)
@@ -656,8 +656,7 @@ namespace ix
std::vector<uint8_t> header;
header.assign(2 +
(message_size >= 126 ? 2 : 0) +
(message_size >= 65536 ? 6 : 0) +
(_useMask ? 4 : 0), 0);
(message_size >= 65536 ? 6 : 0) + 4, 0);
header[0] = type;
// The fin bit indicate that this is the last fragment. Fin is French for end.
@@ -674,33 +673,27 @@ namespace ix
if (message_size < 126)
{
header[1] = (message_size & 0xff) | (_useMask ? 0x80 : 0);
header[1] = (message_size & 0xff) | 0x80;
if (_useMask)
{
header[2] = masking_key[0];
header[3] = masking_key[1];
header[4] = masking_key[2];
header[5] = masking_key[3];
}
header[2] = masking_key[0];
header[3] = masking_key[1];
header[4] = masking_key[2];
header[5] = masking_key[3];
}
else if (message_size < 65536)
{
header[1] = 126 | (_useMask ? 0x80 : 0);
header[1] = 126 | 0x80;
header[2] = (message_size >> 8) & 0xff;
header[3] = (message_size >> 0) & 0xff;
if (_useMask)
{
header[4] = masking_key[0];
header[5] = masking_key[1];
header[6] = masking_key[2];
header[7] = masking_key[3];
}
header[4] = masking_key[0];
header[5] = masking_key[1];
header[6] = masking_key[2];
header[7] = masking_key[3];
}
else
{ // TODO: run coverage testing here
header[1] = 127 | (_useMask ? 0x80 : 0);
header[1] = 127 | 0x80;
header[2] = (message_size >> 56) & 0xff;
header[3] = (message_size >> 48) & 0xff;
header[4] = (message_size >> 40) & 0xff;
@@ -710,13 +703,10 @@ namespace ix
header[8] = (message_size >> 8) & 0xff;
header[9] = (message_size >> 0) & 0xff;
if (_useMask)
{
header[10] = masking_key[0];
header[11] = masking_key[1];
header[12] = masking_key[2];
header[13] = masking_key[3];
}
header[10] = masking_key[0];
header[11] = masking_key[1];
header[12] = masking_key[2];
header[13] = masking_key[3];
}
// _txbuf will keep growing until it can be transmitted over the socket:
@@ -742,15 +732,6 @@ namespace ix
_enablePerMessageDeflate, onProgressCallback);
}
WebSocketSendInfo WebSocketTransport::sendText(
const std::string& message,
const OnProgressCallback& onProgressCallback)
{
return sendData(wsheader_type::TEXT_FRAME, message,
_enablePerMessageDeflate, onProgressCallback);
}
void WebSocketTransport::sendOnSocket()
{
std::lock_guard<std::mutex> lock(_txbufMutex);

View File

@@ -30,13 +30,6 @@ namespace ix
{
class Socket;
enum class SendMessageKind
{
Text,
Binary,
Ping
};
class WebSocketTransport
{
public:
@@ -78,8 +71,6 @@ namespace ix
void poll();
WebSocketSendInfo sendBinary(const std::string& message,
const OnProgressCallback& onProgressCallback);
WebSocketSendInfo sendText(const std::string& message,
const OnProgressCallback& onProgressCallback);
WebSocketSendInfo sendPing(const std::string& message);
void close();
ReadyStateValues getReadyState() const;
@@ -109,10 +100,6 @@ namespace ix
uint8_t masking_key[4];
};
// Tells whether we should mask the data we send.
// client should mask but server should not
bool _useMask;
// Buffer for reading from our socket. That buffer is never resized.
std::vector<uint8_t> _readbuf;
@@ -187,6 +174,7 @@ namespace ix
std::string::const_iterator end,
uint64_t message_size,
uint8_t masking_key[4]);
void appendToSendBuffer(const std::vector<uint8_t>& buffer);
unsigned getRandomUnsigned();
void unmaskReceiveBuffer(const wsheader_type& ws);

View File

@@ -9,20 +9,8 @@ brew:
mkdir -p build && (cd build ; cmake .. ; make -j install)
.PHONY: docker
NAME := bsergean/ws
TAG := $(shell cat DOCKER_VERSION)
IMG := ${NAME}:${TAG}
LATEST := ${NAME}:latest
BUILD := ${NAME}:build
docker:
docker build -t ${IMG} .
docker tag ${IMG} ${BUILD}
docker_push:
docker tag ${IMG} ${LATEST}
docker push ${LATEST}
docker build -t ws:latest .
run:
docker run --cap-add sys_ptrace -it ws:latest

View File

@@ -29,7 +29,6 @@ set (SOURCES
IXDNSLookupTest.cpp
IXSocketTest.cpp
IXSocketConnectTest.cpp
)
# Some unittest don't work on windows yet

View File

@@ -1,43 +0,0 @@
/*
* IXSocketConnectTest.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone. All rights reserved.
*/
#include "catch.hpp"
#include "IXTest.h"
#include <ixwebsocket/IXSocketConnect.h>
#include <iostream>
using namespace ix;
TEST_CASE("socket_connect", "[net]")
{
SECTION("Test connecting to a known hostname")
{
std::string errMsg;
int fd = SocketConnect::connect("www.google.com", 80, errMsg, [] { return false; });
std::cerr << "Error message: " << errMsg << std::endl;
REQUIRE(fd != -1);
}
SECTION("Test connecting to a non-existing hostname")
{
std::string errMsg;
std::string hostname("12313lhjlkjhopiupoijlkasdckljqwehrlkqjwehraospidcuaposidcasdc");
int fd = SocketConnect::connect(hostname, 80, errMsg, [] { return false; });
std::cerr << "Error message: " << errMsg << std::endl;
REQUIRE(fd == -1);
}
SECTION("Test connecting to a good hostname, with cancellation")
{
std::string errMsg;
// The callback returning true means we are requesting cancellation
int fd = SocketConnect::connect("www.google.com", 80, errMsg, [] { return true; });
std::cerr << "Error message: " << errMsg << std::endl;
REQUIRE(fd == -1);
}
}

View File

@@ -16,7 +16,6 @@
#include <iostream>
#include <stdlib.h>
#include <stack>
#include <iomanip>
namespace ix
{
@@ -149,21 +148,4 @@ namespace ix
return -1;
}
void hexDump(const std::string& prefix,
const std::string& s)
{
std::ostringstream ss;
bool upper_case = false;
for (std::string::size_type i = 0; i < s.length(); ++i)
{
ss << std::hex
<< std::setfill('0')
<< std::setw(2)
<< (upper_case ? std::uppercase : std::nouppercase) << (int)s[i];
}
std::cout << prefix << ": " << s << " => " << ss.str() << std::endl;
}
}

View File

@@ -212,10 +212,6 @@ TEST_CASE("Websocket_heartbeat", "[heartbeat]")
webSocketClientA.stop();
webSocketClientB.stop();
// Here we test heart beat period exceeded for clientA
// but it should not be exceeded for clientB which has sent data.
// -> expected ping messages == 2, but add a small buffer to make this more reliable.
REQUIRE(serverReceivedPingMessages >= 2);
REQUIRE(serverReceivedPingMessages <= 4);

View File

@@ -39,7 +39,7 @@ namespace ix
server.setOnConnectionCallback(
[&server, &connectionId](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, connectionState,
@@ -52,8 +52,9 @@ namespace ix
{
if (messageType == ix::WebSocket_MessageType_Open)
{
Logger() << "New connection";
connectionState->computeId();
Logger() << "New connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:";

View File

@@ -1 +0,0 @@
Except ZLIB on Windows (whose port is currently broken...) all dependencies here are for the ws command line tools, not for the IXWebSockets library which is standalone.

View File

@@ -42,5 +42,5 @@ if (APPLE AND USE_TLS)
target_link_libraries(ws "-framework foundation" "-framework security")
endif()
target_link_libraries(ws ixwebsocket cpp_redis tacopie)
target_link_libraries(ws ixwebsocket)
install(TARGETS ws RUNTIME DESTINATION bin)

View File

@@ -7,7 +7,6 @@
#include "IXRedisClient.h"
#include <ixwebsocket/IXSocketFactory.h>
#include <ixwebsocket/IXSocket.h>
#include <cpp_redis/cpp_redis>
#include <sstream>
#include <iomanip>
@@ -18,14 +17,6 @@ namespace ix
{
bool RedisClient::connect(const std::string& hostname, int port)
{
_sub.connect(hostname, port, []
(const std::string& host, std::size_t port, cpp_redis::connect_state status) {
if (status == cpp_redis::connect_state::dropped) {
std::cout << "client disconnected from " << host << ":" << port << std::endl;
}
});
// also subscribe the old way
bool tls = false;
std::string errorMsg;
_socket = createSocket(tls, errorMsg);
@@ -37,53 +28,8 @@ namespace ix
std::string errMsg;
return _socket->connect(hostname, port, errMsg, nullptr);
}
bool RedisClient::auth(const std::string& password,
std::string& response)
{
// authentication if server-server requires it
// _sub.auth(password, [&response](const cpp_redis::reply& reply) {
// if (reply.is_error()) { std::cerr << "Authentication failed: " << reply.as_string() << std::endl; }
// else {
// std::cout << "successful authentication" << std::endl;
// }
// });
return true;
#if 0
response.clear();
if (!_socket) return false;
std::stringstream ss;
ss << "AUTH ";
ss << password;
ss << "\r\n";
bool sent = _socket->writeBytes(ss.str(), nullptr);
if (!sent)
{
return false;
}
auto pollResult = _socket->isReadyToRead(-1);
if (pollResult == PollResultType::Error)
{
return false;
}
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
response = line;
return lineValid;
#endif
}
bool RedisClient::publish(const std::string& channel,
const std::string& message)
{
@@ -119,22 +65,8 @@ namespace ix
// FIXME: we assume that redis never return errors...
//
bool RedisClient::subscribe(const std::string& channel,
const OnRedisSubscribeResponseCallback& responseCallback,
const OnRedisSubscribeCallback& callback)
{
_sub.subscribe(channel, [&callback](const std::string& chan, const std::string& msg) {
callback(msg);
});
_sub.commit();
while (true)
{
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
return true;
#if 0
if (!_socket) return false;
std::stringstream ss;
@@ -155,14 +87,10 @@ namespace ix
return false;
}
// build the response as a single string
std::stringstream oss;
// Read the first line of the response
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
oss << line;
if (!lineValid) return false;
@@ -172,13 +100,10 @@ namespace ix
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
oss << line;
if (!lineValid) return false;
}
responseCallback(oss.str());
// Wait indefinitely for new messages
while (true)
{
@@ -237,6 +162,5 @@ namespace ix
}
return true;
#endif
}
}

View File

@@ -8,7 +8,6 @@
#include <memory>
#include <functional>
#include <cpp_redis/cpp_redis>
namespace ix
{
@@ -16,7 +15,6 @@ namespace ix
class RedisClient {
public:
using OnRedisSubscribeResponseCallback = std::function<void(const std::string&)>;
using OnRedisSubscribeCallback = std::function<void(const std::string&)>;
RedisClient() = default;
@@ -25,19 +23,13 @@ namespace ix
bool connect(const std::string& hostname,
int port);
bool auth(const std::string& password,
std::string& response);
bool publish(const std::string& channel,
const std::string& message);
bool subscribe(const std::string& channel,
const OnRedisSubscribeResponseCallback& responseCallback,
const OnRedisSubscribeCallback& callback);
private:
cpp_redis::subscriber _sub;
std::shared_ptr<Socket> _socket;
};
}

View File

@@ -20,8 +20,6 @@ Subcommands:
broadcast_server Broadcasting server
ping Ping pong
curl HTTP Client
redis_publish Redis publisher
redis_subscribe Redis subscriber
```
## file transfer

View File

@@ -1,25 +0,0 @@
#!/bin/sh
# Handle Ctrl-C by killing all sub-processing AND exiting
trap cleanup INT
function cleanup {
kill `cat /tmp/pidfile.subscribe`
exit 1
}
REDIS_HOST=${REDIS_HOST:=localhost}
ws redis_subscribe --pidfile /tmp/pidfile.subscribe --host $REDIS_HOST foo &
# Wait for the subscriber to be ready
sleep 0.5
# Now publish messages
ws redis_publish -c 100000 --host ${REDIS_HOST} foo bar
# Wait a little for all messages to be received
sleep 1.5
# Cleanup
cleanup

View File

@@ -37,7 +37,6 @@ int main(int argc, char** argv)
std::string pidfile;
std::string channel;
std::string message;
std::string password;
bool headersOnly = false;
bool followRedirects = false;
bool verbose = false;
@@ -49,7 +48,6 @@ int main(int argc, char** argv)
int transferTimeout = 1800;
int maxRedirects = 5;
int delayMs = -1;
int count = 1;
CLI::App* sendApp = app.add_subcommand("send", "Send a file");
sendApp->add_option("url", url, "Connection url")->required();
@@ -104,18 +102,14 @@ int main(int argc, char** argv)
CLI::App* redisPublishApp = app.add_subcommand("redis_publish", "Redis publisher");
redisPublishApp->add_option("--port", redisPort, "Port");
redisPublishApp->add_option("--host", hostname, "Hostname");
redisPublishApp->add_option("--password", password, "Password");
redisPublishApp->add_option("channel", channel, "Channel")->required();
redisPublishApp->add_option("message", message, "Message")->required();
redisPublishApp->add_option("-c", count, "Count");
CLI::App* redisSubscribeApp = app.add_subcommand("redis_subscribe", "Redis subscriber");
redisSubscribeApp->add_option("--port", redisPort, "Port");
redisSubscribeApp->add_option("--host", hostname, "Hostname");
redisSubscribeApp->add_option("--password", password, "Password");
redisSubscribeApp->add_option("channel", channel, "Channel")->required();
redisSubscribeApp->add_flag("-v", verbose, "Verbose");
redisSubscribeApp->add_option("--pidfile", pidfile, "Pid file");
CLI11_PARSE(app, argc, argv);
@@ -172,12 +166,11 @@ int main(int argc, char** argv)
}
else if (app.got_subcommand("redis_publish"))
{
return ix::ws_redis_publish_main(hostname, redisPort, password,
channel, message, count);
return ix::ws_redis_publish_main(hostname, redisPort, channel, message);
}
else if (app.got_subcommand("redis_subscribe"))
{
return ix::ws_redis_subscribe_main(hostname, redisPort, password, channel, verbose);
return ix::ws_redis_subscribe_main(hostname, redisPort, channel, verbose);
}
return 1;

View File

@@ -42,14 +42,11 @@ namespace ix
int ws_redis_publish_main(const std::string& hostname,
int port,
const std::string& password,
const std::string& channel,
const std::string& message,
int count);
const std::string& message);
int ws_redis_subscribe_main(const std::string& hostname,
int port,
const std::string& password,
const std::string& channel,
bool verbose);
}

View File

@@ -12,10 +12,8 @@ namespace ix
{
int ws_redis_publish_main(const std::string& hostname,
int port,
const std::string& password,
const std::string& channel,
const std::string& message,
int count)
const std::string& message)
{
RedisClient redisClient;
if (!redisClient.connect(hostname, port))
@@ -24,27 +22,12 @@ namespace ix
return 1;
}
if (!password.empty())
std::cerr << "Publishing message " << message
<< " to " << channel << "..." << std::endl;
if (!redisClient.publish(channel, message))
{
std::string authResponse;
if (!redisClient.auth(password, authResponse))
{
std::stringstream ss;
std::cerr << "Cannot authenticated to redis" << std::endl;
return 1;
}
std::cout << "Auth response: " << authResponse << ":" << port << std::endl;
}
for (int i = 0; i < count; i++)
{
//std::cerr << "Publishing message " << message
// << " to " << channel << "..." << std::endl;
if (!redisClient.publish(channel, message))
{
std::cerr << "Error publishing to channel " << channel << std::endl;
return 1;
}
std::cerr << "Error publishing to channel " << channel << std::endl;
return 1;
}
return 0;

View File

@@ -7,15 +7,12 @@
#include <iostream>
#include <sstream>
#include <chrono>
#include <thread>
#include <atomic>
#include "IXRedisClient.h"
namespace ix
{
int ws_redis_subscribe_main(const std::string& hostname,
int port,
const std::string& password,
const std::string& channel,
bool verbose)
{
@@ -26,56 +23,38 @@ namespace ix
return 1;
}
if (!password.empty())
{
std::string authResponse;
if (!redisClient.auth(password, authResponse))
{
std::stringstream ss;
std::cerr << "Cannot authenticated to redis" << std::endl;
return 1;
}
std::cout << "Auth response: " << authResponse << ":" << port << std::endl;
}
std::chrono::time_point<std::chrono::steady_clock> lastTimePoint;
int msgPerSeconds = 0;
int msgCount = 0;
std::atomic<int> msgPerSeconds(0);
std::atomic<int> msgCount(0);
auto callback = [&msgPerSeconds, &msgCount, verbose]
auto callback = [&lastTimePoint, &msgPerSeconds, &msgCount, verbose]
(const std::string& message)
{
if (verbose)
{
std::cout << "received: " << message << std::endl;
std::cout << message << std::endl;
}
msgPerSeconds++;
msgCount++;
};
auto responseCallback = [](const std::string& redisResponse)
{
std::cout << "Redis subscribe response: " << redisResponse << std::endl;
};
auto timer = [&msgPerSeconds, &msgCount]
{
while (true)
auto now = std::chrono::steady_clock::now();
if (now - lastTimePoint > std::chrono::seconds(1))
{
lastTimePoint = std::chrono::steady_clock::now();
msgCount += msgPerSeconds;
// #messages 901 msg/s 150
std::cout << "#messages " << msgCount << " "
<< "msg/s " << msgPerSeconds
<< std::endl;
msgPerSeconds = 0;
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
};
std::thread t(timer);
std::cerr << "Subscribing to " << channel << "..." << std::endl;
if (!redisClient.subscribe(channel, responseCallback, callback))
if (!redisClient.subscribe(channel, callback))
{
std::cerr << "Error subscribing to channel " << channel << std::endl;
return 1;
@@ -84,3 +63,4 @@ namespace ix
return 0;
}
}