Compare commits

...

26 Commits

Author SHA1 Message Date
5e1a4541bf play with cpp_redis 2019-03-29 11:42:45 -07:00
2e9c610ac9 Bump sleep time in test shell script 2019-03-29 09:36:56 -07:00
eb063ec60a (redis_subscribe) in verbose mode, received message gets printed with a 'received: ' header 2019-03-29 09:35:19 -07:00
37fb14646d Add clarification notice about third party modules 2019-03-29 09:34:17 -07:00
ae543518d3 offline version of remark-latest 2019-03-28 16:06:43 -07:00
c865d64608 redis conf slides 2019-03-28 14:17:19 -07:00
3004422cb6 slides 2019-03-27 16:27:52 -07:00
0c46a17443 add redis-conf slides 2019-03-27 15:53:55 -07:00
497373d976 ws redis command improvements + test script 2019-03-27 13:41:46 -07:00
91198aca0d (ws) redis_subscribe and redis_publish can take a password + display subscribe response 2019-03-26 09:33:22 -07:00
b17a5e5f0b update doc 2019-03-24 21:48:14 -07:00
3f0ef59f65 remove Formula folder
Homebrew stuff is at https://github.com/bsergean/homebrew-IXWebSocket
2019-03-24 21:43:38 -07:00
1e96edc293 (server) fix masking bug 2019-03-22 15:33:04 -07:00
0afb77393b can send TEXT message (we only support BINARY messages now) 2019-03-22 14:24:22 -07:00
7614b642bb unmasked code is broken 2019-03-22 14:24:15 -07:00
bc89580dfe remove printf + unittest fix 2019-03-22 09:56:28 -07:00
358ae13a88 (server) server should not mask data when sending to client (some python client libraries enforce that and assert) 2019-03-22 09:53:56 -07:00
ccf9dcba70 (server) HTTP response is malformed 2019-03-22 09:52:19 -07:00
94604fad61 minor cleanup 2019-03-21 13:51:25 -07:00
5c4cc7c50d HTTP/1.1 response should contains a reason (websocket server)
Fix compatibility problem with websockets python library, where the response does not contains a reason

File "/.../lib/python3.7/site-packages/websockets/http.py", line 126, in read_response
version, status_code, reason = status_line[:-2].split(b' ', 2)
ValueError: not enough values to unpack (expected 3, got 2)

The above exception was the direct cause of the following exception:

websockets.exceptions.InvalidMessage: Malformed HTTP message
2019-03-21 13:43:47 -07:00
9ed961ec06 cleanup, remove dead method 2019-03-21 10:06:59 -07:00
e6bd8cc8c4 (cmake) add a warning about 32/64 conversion problems. 2019-03-20 21:51:38 -07:00
ee25bd0f92 Feature/connection state (#25)
* (cmake) add a warning about 32/64 conversion problems.

* fix typo

* New connection state for server code + fix OpenSSL double init bug

* update README
2019-03-20 18:34:24 -07:00
e77b9176f3 Feature/redis (#23)
* Fix warning

* (cmake) add a warning about 32/64 conversion problems.

* simple redis clients

* can publish to redis

* redis subscribe

* display messages received per second

* verbose flag

* (cmake) use clang only compile option -Wshorten-64-to-32 when compiling with clang
2019-03-20 14:29:02 -07:00
afe8b966ad Fixed heartbeat typos (#22) 2019-03-19 21:31:43 -07:00
310724c961 make PollResultType an enum class 2019-03-19 09:29:57 -07:00
62 changed files with 2167 additions and 170 deletions

View File

@ -15,6 +15,10 @@ if (NOT WIN32)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic")
endif()
if ("${CMAKE_CXX_COMPILER_ID}" MATCHES "Clang")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wshorten-64-to-32")
endif()
set( IXWEBSOCKET_SOURCES
ixwebsocket/IXSocket.cpp
ixwebsocket/IXSocketServer.cpp
@ -35,6 +39,7 @@ set( IXWEBSOCKET_SOURCES
ixwebsocket/IXSelectInterrupt.cpp
ixwebsocket/IXSelectInterruptPipe.cpp
ixwebsocket/IXSelectInterruptFactory.cpp
ixwebsocket/IXConnectionState.cpp
)
set( IXWEBSOCKET_HEADERS
@ -62,6 +67,7 @@ set( IXWEBSOCKET_HEADERS
ixwebsocket/IXSelectInterrupt.h
ixwebsocket/IXSelectInterruptPipe.h
ixwebsocket/IXSelectInterruptFactory.h
ixwebsocket/IXConnectionState.h
)
# Platform specific code
@ -129,3 +135,4 @@ set( IXWEBSOCKET_INCLUDE_DIRS
target_include_directories( ixwebsocket PUBLIC ${IXWEBSOCKET_INCLUDE_DIRS} )
add_subdirectory(ws)
add_subdirectory(third_party/cpp_redis)

1
DOCKER_VERSION Normal file
View File

@ -0,0 +1 @@
1.3.2

View File

@ -1,31 +0,0 @@
FROM debian:stretch
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install gdb
RUN apt-get -y install screen
RUN apt-get -y install procps
RUN apt-get -y install lsof
RUN apt-get -y install libz-dev
RUN apt-get -y install vim
RUN apt-get -y install make
RUN apt-get -y install cmake
RUN apt-get -y install curl
RUN apt-get -y install python
RUN apt-get -y install netcat
# debian strech cmake is too old for building with Docker
COPY makefile .
RUN ["make", "install_cmake_for_linux"]
COPY . .
ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-rc4-Linux-x86_64/bin
ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
# RUN ["make"]
EXPOSE 8765
CMD ["/ws/ws", "transfer", "--port", "8765", "--host", "0.0.0.0"]

1
Dockerfile Symbolic link
View File

@ -0,0 +1 @@
Dockerfile.dev

31
Dockerfile.dev Normal file
View File

@ -0,0 +1,31 @@
FROM debian:stretch
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install gdb
RUN apt-get -y install screen
RUN apt-get -y install procps
RUN apt-get -y install lsof
RUN apt-get -y install libz-dev
RUN apt-get -y install vim
RUN apt-get -y install make
RUN apt-get -y install cmake
RUN apt-get -y install curl
RUN apt-get -y install python
RUN apt-get -y install netcat
# debian strech cmake is too old for building with Docker
COPY makefile .
RUN ["make", "install_cmake_for_linux"]
COPY . .
ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-rc4-Linux-x86_64/bin
ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
# RUN ["make"]
EXPOSE 8765
CMD ["/ws/ws", "transfer", "--port", "8765", "--host", "0.0.0.0"]

30
Dockerfile.prod Normal file
View File

@ -0,0 +1,30 @@
FROM debian:buster
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install libz-dev
RUN apt-get -y install make
RUN apt-get -y install wget
RUN mkdir -p /tmp/cmake
WORKDIR /tmp/cmake
RUN wget https://github.com/Kitware/CMake/releases/download/v3.14.0/cmake-3.14.0-Linux-x86_64.tar.gz
RUN tar zxf cmake-3.14.0-Linux-x86_64.tar.gz
RUN adduser app
COPY . .
ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-Linux-x86_64/bin
ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
RUN ["make"]
# Now run in usermode
USER app
EXPOSE 8765
CMD ["bash"]

View File

@ -4,14 +4,15 @@
## Introduction
[*WebSocket*](https://en.wikipedia.org/wiki/WebSocket) is a computer communications protocol, providing full-duplex
communication channels over a single TCP connection. *IXWebSocket* is a C++ library for client and server Websocket communication, and for client HTTP communication. The code is derived from [easywsclient](https://github.com/dhbaird/easywsclient) and from the [Satori C SDK](https://github.com/satori-com/satori-rtm-sdk-c). It has been tested on the following platforms.
[*WebSocket*](https://en.wikipedia.org/wiki/WebSocket) is a computer communications protocol, providing full-duplex and bi-directionnal communication channels over a single TCP connection. *IXWebSocket* is a C++ library for client and server Websocket communication, and for client HTTP communication. The code is derived from [easywsclient](https://github.com/dhbaird/easywsclient) and from the [Satori C SDK](https://github.com/satori-com/satori-rtm-sdk-c). It has been tested on the following platforms.
* macOS
* iOS
* Linux
* Android
The code was made to compile once on Windows but support is currently broken on this platform.
## Examples
The [*ws*](https://github.com/machinezone/IXWebSocket/tree/master/ws) folder countains many interactive programs for chat, [file transfers](https://github.com/machinezone/IXWebSocket/blob/master/ws/ws_send.cpp), [curl like](https://github.com/machinezone/IXWebSocket/blob/master/ws/ws_http_client.cpp) http clients, demonstrating client and server usage.
@ -46,9 +47,12 @@ webSocket.setOnMessageCallback(
// Now that our callback is setup, we can start our background thread and receive messages
webSocket.start();
// Send a message to the server
// Send a message to the server (default to BINARY mode)
webSocket.send("hello world");
// The message can be sent in TEXT mode
webSocket.sendText("hello again");
// ... finally ...
// Stop the connection
@ -63,10 +67,11 @@ Here is what the server API looks like. Note that server support is very recent
ix::WebSocketServer server(port);
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket)
[&server](std::shared_ptr<WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType,
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
@ -77,6 +82,12 @@ server.setOnConnectionCallback(
{
std::cerr << "New connection" << std::endl;
// A connection state object is available, and has a default id
// You can subclass ConnectionState and pass an alternate factory
// to override it. It is useful if you want to store custom
// attributes per connection (authenticated bool flag, attributes, etc...)
std::cerr << "id: " << connectionState->getId() << std::endl;
// The uri the client did connect to.
std::cerr << "Uri: " << openInfo.uri << std::endl;
@ -207,11 +218,11 @@ If the remote end (server) breaks the connection, the code will try to perpetual
### Large messages
Large frames are broken up into smaller chunks or messages to avoid filling up the os tcp buffers, which is permitted thanks to WebSocket [fragmentation](https://tools.ietf.org/html/rfc6455#section-5.4). Messages up to 500M were sent and received succesfully.
Large frames are broken up into smaller chunks or messages to avoid filling up the os tcp buffers, which is permitted thanks to WebSocket [fragmentation](https://tools.ietf.org/html/rfc6455#section-5.4). Messages up to 1G were sent and received succesfully.
## Limitations
* There is no text support for sending data, only the binary protocol is supported. Sending json or text over the binary protocol works well.
* No utf-8 validation is made when sending TEXT message with sendText()
* Automatic reconnection works at the TCP socket level, and will detect remote end disconnects. However, if the device/computer network become unreachable (by turning off wifi), it is quite hard to reliably and timely detect it at the socket level using `recv` and `send` error codes. [Here](https://stackoverflow.com/questions/14782143/linux-socket-how-to-detect-disconnected-network-in-a-client-program) is a good discussion on the subject. This behavior is consistent with other runtimes such as node.js. One way to detect a disconnected device with low level C code is to do a name resolution with DNS but this can be expensive. Mobile devices have good and reliable API to do that.
* The server code is using select to detect incoming data, and creates one OS thread per connection. This is not as scalable as strategies using epoll or kqueue.

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.5 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 94 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 80 KiB

View File

@ -0,0 +1,2 @@
all:
(cd .. ; make docker && make docker_push)

Binary file not shown.

After

Width:  |  Height:  |  Size: 74 KiB

BIN
doc/redis_conf_2019/neo.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 118 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 113 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 168 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 673 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.5 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.5 MiB

File diff suppressed because one or more lines are too long

Binary file not shown.

After

Width:  |  Height:  |  Size: 90 KiB

File diff suppressed because it is too large Load Diff

Binary file not shown.

After

Width:  |  Height:  |  Size: 36 KiB

21
docker-compose.yml Normal file
View File

@ -0,0 +1,21 @@
version: "3"
services:
ws:
stdin_open: true
tty: true
image: bsergean/ws:build
ports:
- "8765:8765"
entrypoint: bash
networks:
- ws-net
depends_on:
- redis1
redis1:
image: redis:alpine
networks:
- ws-net
networks:
ws-net:

View File

@ -0,0 +1,33 @@
/*
* IXConnectionState.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXConnectionState.h"
namespace ix
{
std::atomic<uint64_t> ConnectionState::_globalId(0);
ConnectionState::ConnectionState()
{
computeId();
}
void ConnectionState::computeId()
{
_id = std::to_string(_globalId++);
}
const std::string& ConnectionState::getId() const
{
return _id;
}
std::shared_ptr<ConnectionState> ConnectionState::createConnectionState()
{
return std::make_shared<ConnectionState>();
}
}

View File

@ -0,0 +1,33 @@
/*
* IXConnectionState.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <stdint.h>
#include <string>
#include <atomic>
#include <memory>
namespace ix
{
class ConnectionState {
public:
ConnectionState();
virtual ~ConnectionState() = default;
virtual void computeId();
virtual const std::string& getId() const;
static std::shared_ptr<ConnectionState> createConnectionState();
protected:
std::string _id;
static std::atomic<uint64_t> _globalId;
};
}

View File

@ -73,7 +73,7 @@ namespace ix
errMsg = "no error";
// Maybe a cancellation request got in before the background thread terminated ?
if (isCancellationRequested())
if (isCancellationRequested && isCancellationRequested())
{
errMsg = "cancellation requested";
return nullptr;
@ -121,7 +121,7 @@ namespace ix
}
// Were we cancelled ?
if (isCancellationRequested())
if (isCancellationRequested && isCancellationRequested())
{
errMsg = "cancellation requested";
return nullptr;
@ -129,7 +129,7 @@ namespace ix
}
// Maybe a cancellation request got in before the bg terminated ?
if (isCancellationRequested())
if (isCancellationRequested && isCancellationRequested())
{
errMsg = "cancellation requested";
return nullptr;

View File

@ -45,7 +45,7 @@ namespace ix
{
if (_sockfd == -1)
{
if (onPollCallback) onPollCallback(PollResultType_Error);
if (onPollCallback) onPollCallback(PollResultType::Error);
return;
}
@ -82,14 +82,14 @@ namespace ix
int ret = ::select(nfds + 1, &rfds, &wfds, nullptr,
(timeoutMs < 0) ? nullptr : &timeout);
PollResultType pollResult = PollResultType_ReadyForRead;
PollResultType pollResult = PollResultType::ReadyForRead;
if (ret < 0)
{
pollResult = PollResultType_Error;
pollResult = PollResultType::Error;
}
else if (ret == 0)
{
pollResult = PollResultType_Timeout;
pollResult = PollResultType::Timeout;
}
else if (interruptFd != -1 && FD_ISSET(interruptFd, &rfds))
{
@ -97,20 +97,20 @@ namespace ix
if (value == kSendRequest)
{
pollResult = PollResultType_SendRequest;
pollResult = PollResultType::SendRequest;
}
else if (value == kCloseRequest)
{
pollResult = PollResultType_CloseRequest;
pollResult = PollResultType::CloseRequest;
}
}
else if (sockfd != -1 && readyToRead && FD_ISSET(sockfd, &rfds))
{
pollResult = PollResultType_ReadyForRead;
pollResult = PollResultType::ReadyForRead;
}
else if (sockfd != -1 && !readyToRead && FD_ISSET(sockfd, &wfds))
{
pollResult = PollResultType_ReadyForWrite;
pollResult = PollResultType::ReadyForWrite;
}
return pollResult;
@ -210,7 +210,7 @@ namespace ix
{
while (true)
{
if (isCancellationRequested()) return false;
if (isCancellationRequested && isCancellationRequested()) return false;
char* buffer = const_cast<char*>(str.c_str());
int len = (int) str.size();
@ -222,7 +222,7 @@ namespace ix
{
return ret == len;
}
// There is possibly something to be write, try again
// There is possibly something to be writen, try again
else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
getErrno() == EAGAIN))
{
@ -241,7 +241,7 @@ namespace ix
{
while (true)
{
if (isCancellationRequested()) return false;
if (isCancellationRequested && isCancellationRequested()) return false;
ssize_t ret;
ret = recv(buffer, 1);
@ -257,7 +257,7 @@ namespace ix
{
// Wait with a 1ms timeout until the socket is ready to read.
// This way we are not busy looping
if (isReadyToRead(1) == PollResultType_Error)
if (isReadyToRead(1) == PollResultType::Error)
{
return false;
}
@ -304,9 +304,12 @@ namespace ix
std::vector<uint8_t> output;
while (output.size() != length)
{
if (isCancellationRequested()) return std::make_pair(false, std::string());
if (isCancellationRequested && isCancellationRequested())
{
return std::make_pair(false, std::string());
}
int size = std::min(kChunkSize, length - output.size());
size_t size = std::min(kChunkSize, length - output.size());
ssize_t ret = recv((char*)&_readBuffer[0], size);
if (ret <= 0 && (getErrno() != EWOULDBLOCK &&
@ -326,7 +329,7 @@ namespace ix
// Wait with a 1ms timeout until the socket is ready to read.
// This way we are not busy looping
if (isReadyToRead(1) == PollResultType_Error)
if (isReadyToRead(1) == PollResultType::Error)
{
return std::make_pair(false, std::string());
}

View File

@ -25,14 +25,14 @@ namespace ix
{
class SelectInterrupt;
enum PollResultType
enum class PollResultType
{
PollResultType_ReadyForRead = 0,
PollResultType_ReadyForWrite = 1,
PollResultType_Timeout = 2,
PollResultType_Error = 3,
PollResultType_SendRequest = 4,
PollResultType_CloseRequest = 5
ReadyForRead = 0,
ReadyForWrite = 1,
Timeout = 2,
Error = 3,
SendRequest = 4,
CloseRequest = 5
};
class Socket {

View File

@ -66,7 +66,7 @@ namespace ix
for (;;)
{
if (isCancellationRequested()) // Must handle timeout as well
if (isCancellationRequested && isCancellationRequested()) // Must handle timeout as well
{
closeSocket(fd);
errMsg = "Cancelled";

View File

@ -21,6 +21,7 @@
namespace ix
{
std::atomic<bool> SocketOpenSSL::_openSSLInitializationSuccessful(false);
std::once_flag SocketOpenSSL::_openSSLInitFlag;
SocketOpenSSL::SocketOpenSSL(int fd) : Socket(fd),
_ssl_connection(nullptr),

View File

@ -50,7 +50,7 @@ namespace ix
const SSL_METHOD* _ssl_method;
mutable std::mutex _mutex; // OpenSSL routines are not thread-safe
std::once_flag _openSSLInitFlag;
static std::once_flag _openSSLInitFlag;
static std::atomic<bool> _openSSLInitializationSuccessful;
};

View File

@ -29,7 +29,8 @@ namespace ix
_host(host),
_backlog(backlog),
_maxConnections(maxConnections),
_stop(false)
_stop(false),
_connectionStateFactory(&ConnectionState::createConnectionState)
{
}
@ -145,6 +146,12 @@ namespace ix
::close(_serverFd);
}
void SocketServer::setConnectionStateFactory(
const ConnectionStateFactory& connectionStateFactory)
{
_connectionStateFactory = connectionStateFactory;
}
void SocketServer::run()
{
// Set the socket to non blocking mode, so that accept calls are not blocking
@ -214,6 +221,12 @@ namespace ix
continue;
}
std::shared_ptr<ConnectionState> connectionState;
if (_connectionStateFactory)
{
connectionState = _connectionStateFactory();
}
// Launch the handleConnection work asynchronously in its own thread.
//
// the destructor of a future returned by std::async blocks,
@ -221,7 +234,8 @@ namespace ix
f = std::async(std::launch::async,
&SocketServer::handleConnection,
this,
clientFd);
clientFd,
connectionState);
}
}
}

View File

@ -6,6 +6,8 @@
#pragma once
#include "IXConnectionState.h"
#include <utility> // pair
#include <string>
#include <set>
@ -20,6 +22,8 @@ namespace ix
{
class SocketServer {
public:
using ConnectionStateFactory = std::function<std::shared_ptr<ConnectionState>()>;
SocketServer(int port = SocketServer::kDefaultPort,
const std::string& host = SocketServer::kDefaultHost,
int backlog = SocketServer::kDefaultTcpBacklog,
@ -27,6 +31,8 @@ namespace ix
virtual ~SocketServer();
virtual void stop();
void setConnectionStateFactory(const ConnectionStateFactory& connectionStateFactory);
const static int kDefaultPort;
const static std::string kDefaultHost;
const static int kDefaultTcpBacklog;
@ -60,9 +66,13 @@ namespace ix
std::condition_variable _conditionVariable;
std::mutex _conditionVariableMutex;
//
ConnectionStateFactory _connectionStateFactory;
// Methods
void run();
virtual void handleConnection(int fd) = 0;
virtual void handleConnection(int fd,
std::shared_ptr<ConnectionState> connectionState) = 0;
virtual size_t getConnectedClientsCount() = 0;
};
}

View File

@ -79,10 +79,10 @@ namespace ix
return _perMessageDeflateOptions;
}
void WebSocket::setHeartBeatPeriod(int hearBeatPeriod)
void WebSocket::setHeartBeatPeriod(int heartBeatPeriod)
{
std::lock_guard<std::mutex> lock(_configMutex);
_heartBeatPeriod = hearBeatPeriod;
_heartBeatPeriod = heartBeatPeriod;
}
int WebSocket::getHeartBeatPeriod() const
@ -302,7 +302,13 @@ namespace ix
WebSocketSendInfo WebSocket::send(const std::string& text,
const OnProgressCallback& onProgressCallback)
{
return sendMessage(text, false, onProgressCallback);
return sendMessage(text, SendMessageKind::Binary, onProgressCallback);
}
WebSocketSendInfo WebSocket::sendText(const std::string& text,
const OnProgressCallback& onProgressCallback)
{
return sendMessage(text, SendMessageKind::Text, onProgressCallback);
}
WebSocketSendInfo WebSocket::ping(const std::string& text)
@ -311,11 +317,11 @@ namespace ix
constexpr size_t pingMaxPayloadSize = 125;
if (text.size() > pingMaxPayloadSize) return WebSocketSendInfo(false);
return sendMessage(text, true);
return sendMessage(text, SendMessageKind::Ping);
}
WebSocketSendInfo WebSocket::sendMessage(const std::string& text,
bool ping,
SendMessageKind sendMessageKind,
const OnProgressCallback& onProgressCallback)
{
if (!isConnected()) return WebSocketSendInfo(false);
@ -332,13 +338,22 @@ namespace ix
std::lock_guard<std::mutex> lock(_writeMutex);
WebSocketSendInfo webSocketSendInfo;
if (ping)
switch (sendMessageKind)
{
webSocketSendInfo = _ws.sendPing(text);
}
else
{
webSocketSendInfo = _ws.sendBinary(text, onProgressCallback);
case SendMessageKind::Text:
{
webSocketSendInfo = _ws.sendText(text, onProgressCallback);
} break;
case SendMessageKind::Binary:
{
webSocketSendInfo = _ws.sendBinary(text, onProgressCallback);
} break;
case SendMessageKind::Ping:
{
webSocketSendInfo = _ws.sendPing(text);
} break;
}
WebSocket::invokeTrafficTrackerCallback(webSocketSendInfo.wireSize, false);

View File

@ -89,7 +89,7 @@ namespace ix
void setUrl(const std::string& url);
void setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions);
void setHandshakeTimeout(int handshakeTimeoutSecs);
void setHeartBeatPeriod(int hearBeatPeriod);
void setHeartBeatPeriod(int heartBeatPeriod);
// Run asynchronously, by calling start and stop.
void start();
@ -101,6 +101,8 @@ namespace ix
WebSocketSendInfo send(const std::string& text,
const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo sendText(const std::string& text,
const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo ping(const std::string& text);
void close();
@ -120,7 +122,7 @@ namespace ix
private:
WebSocketSendInfo sendMessage(const std::string& text,
bool ping,
SendMessageKind sendMessageKind,
const OnProgressCallback& callback = nullptr);
bool isConnected() const;

View File

@ -114,7 +114,7 @@ namespace ix
std::stringstream ss;
ss << "HTTP/1.1 ";
ss << code;
ss << "\r\n";
ss << " ";
ss << reason;
ss << "\r\n";
@ -353,7 +353,7 @@ namespace ix
WebSocketHandshakeKeyGen::generate(headers["sec-websocket-key"].c_str(), output);
std::stringstream ss;
ss << "HTTP/1.1 101\r\n";
ss << "HTTP/1.1 101 Switching Protocols\r\n";
ss << "Sec-WebSocket-Accept: " << std::string(output) << "\r\n";
ss << "Upgrade: websocket\r\n";
ss << "Connection: Upgrade\r\n";

View File

@ -49,10 +49,12 @@ namespace ix
_onConnectionCallback = callback;
}
void WebSocketServer::handleConnection(int fd)
void WebSocketServer::handleConnection(
int fd,
std::shared_ptr<ConnectionState> connectionState)
{
auto webSocket = std::make_shared<WebSocket>();
_onConnectionCallback(webSocket);
_onConnectionCallback(webSocket, connectionState);
webSocket->disableAutomaticReconnection();

View File

@ -20,7 +20,8 @@
namespace ix
{
using OnConnectionCallback = std::function<void(std::shared_ptr<WebSocket>)>;
using OnConnectionCallback = std::function<void(std::shared_ptr<WebSocket>,
std::shared_ptr<ConnectionState>)>;
class WebSocketServer : public SocketServer {
public:
@ -49,7 +50,8 @@ namespace ix
const static int kDefaultHandShakeTimeoutSecs;
// Methods
virtual void handleConnection(int fd) final;
virtual void handleConnection(int fd,
std::shared_ptr<ConnectionState> connectionState) final;
virtual size_t getConnectedClientsCount() final;
};
}

View File

@ -53,11 +53,12 @@
namespace ix
{
const std::string WebSocketTransport::kHeartBeatPingMessage("ixwebsocket::hearbeat");
const std::string WebSocketTransport::kHeartBeatPingMessage("ixwebsocket::heartbeat");
const int WebSocketTransport::kDefaultHeartBeatPeriod(-1);
constexpr size_t WebSocketTransport::kChunkSize;
WebSocketTransport::WebSocketTransport() :
_useMask(true),
_readyState(CLOSED),
_closeCode(0),
_closeWireSize(0),
@ -75,11 +76,11 @@ namespace ix
}
void WebSocketTransport::configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions,
int hearBeatPeriod)
int heartBeatPeriod)
{
_perMessageDeflateOptions = perMessageDeflateOptions;
_enablePerMessageDeflate = _perMessageDeflateOptions.enabled();
_heartBeatPeriod = hearBeatPeriod;
_heartBeatPeriod = heartBeatPeriod;
}
// Client
@ -123,6 +124,9 @@ namespace ix
// Server
WebSocketInitResult WebSocketTransport::connectToSocket(int fd, int timeoutSecs)
{
// Server should not mask the data it sends to the client
_useMask = false;
std::string errorMsg;
_socket = createSocket(fd, errorMsg);
@ -189,7 +193,7 @@ namespace ix
// If (1) heartbeat is enabled, and (2) no data was received or
// send for a duration exceeding our heart-beat period, send a
// ping to the server.
if (pollResult == PollResultType_Timeout &&
if (pollResult == PollResultType::Timeout &&
heartBeatPeriodExceeded())
{
std::stringstream ss;
@ -198,7 +202,7 @@ namespace ix
}
// Make sure we send all the buffered data
// there can be a lot of it for large messages.
else if (pollResult == PollResultType_SendRequest)
else if (pollResult == PollResultType::SendRequest)
{
while (!isSendBufferEmpty() && !_requestInitCancellation)
{
@ -206,19 +210,19 @@ namespace ix
// This way we are not busy looping
PollResultType result = _socket->isReadyToWrite(10);
if (result == PollResultType_Error)
if (result == PollResultType::Error)
{
_socket->close();
setReadyState(CLOSED);
break;
}
else if (result == PollResultType_ReadyForWrite)
else if (result == PollResultType::ReadyForWrite)
{
sendOnSocket();
}
}
}
else if (pollResult == PollResultType_ReadyForRead)
else if (pollResult == PollResultType::ReadyForRead)
{
while (true)
{
@ -244,11 +248,11 @@ namespace ix
}
}
}
else if (pollResult == PollResultType_Error)
else if (pollResult == PollResultType::Error)
{
_socket->close();
}
else if (pollResult == PollResultType_CloseRequest)
else if (pollResult == PollResultType::CloseRequest)
{
_socket->close();
}
@ -280,19 +284,15 @@ namespace ix
_txbuf.insert(_txbuf.end(), header.begin(), header.end());
_txbuf.insert(_txbuf.end(), begin, end);
// Masking
for (size_t i = 0; i != (size_t) message_size; ++i)
if (_useMask)
{
*(_txbuf.end() - (size_t) message_size + i) ^= masking_key[i&0x3];
for (size_t i = 0; i != (size_t) message_size; ++i)
{
*(_txbuf.end() - (size_t) message_size + i) ^= masking_key[i&0x3];
}
}
}
void WebSocketTransport::appendToSendBuffer(const std::vector<uint8_t>& buffer)
{
std::lock_guard<std::mutex> lock(_txbufMutex);
_txbuf.insert(_txbuf.end(), buffer.begin(), buffer.end());
}
void WebSocketTransport::unmaskReceiveBuffer(const wsheader_type& ws)
{
if (ws.mask)
@ -656,7 +656,8 @@ namespace ix
std::vector<uint8_t> header;
header.assign(2 +
(message_size >= 126 ? 2 : 0) +
(message_size >= 65536 ? 6 : 0) + 4, 0);
(message_size >= 65536 ? 6 : 0) +
(_useMask ? 4 : 0), 0);
header[0] = type;
// The fin bit indicate that this is the last fragment. Fin is French for end.
@ -673,27 +674,33 @@ namespace ix
if (message_size < 126)
{
header[1] = (message_size & 0xff) | 0x80;
header[1] = (message_size & 0xff) | (_useMask ? 0x80 : 0);
header[2] = masking_key[0];
header[3] = masking_key[1];
header[4] = masking_key[2];
header[5] = masking_key[3];
if (_useMask)
{
header[2] = masking_key[0];
header[3] = masking_key[1];
header[4] = masking_key[2];
header[5] = masking_key[3];
}
}
else if (message_size < 65536)
{
header[1] = 126 | 0x80;
header[1] = 126 | (_useMask ? 0x80 : 0);
header[2] = (message_size >> 8) & 0xff;
header[3] = (message_size >> 0) & 0xff;
header[4] = masking_key[0];
header[5] = masking_key[1];
header[6] = masking_key[2];
header[7] = masking_key[3];
if (_useMask)
{
header[4] = masking_key[0];
header[5] = masking_key[1];
header[6] = masking_key[2];
header[7] = masking_key[3];
}
}
else
{ // TODO: run coverage testing here
header[1] = 127 | 0x80;
header[1] = 127 | (_useMask ? 0x80 : 0);
header[2] = (message_size >> 56) & 0xff;
header[3] = (message_size >> 48) & 0xff;
header[4] = (message_size >> 40) & 0xff;
@ -703,10 +710,13 @@ namespace ix
header[8] = (message_size >> 8) & 0xff;
header[9] = (message_size >> 0) & 0xff;
header[10] = masking_key[0];
header[11] = masking_key[1];
header[12] = masking_key[2];
header[13] = masking_key[3];
if (_useMask)
{
header[10] = masking_key[0];
header[11] = masking_key[1];
header[12] = masking_key[2];
header[13] = masking_key[3];
}
}
// _txbuf will keep growing until it can be transmitted over the socket:
@ -732,6 +742,15 @@ namespace ix
_enablePerMessageDeflate, onProgressCallback);
}
WebSocketSendInfo WebSocketTransport::sendText(
const std::string& message,
const OnProgressCallback& onProgressCallback)
{
return sendData(wsheader_type::TEXT_FRAME, message,
_enablePerMessageDeflate, onProgressCallback);
}
void WebSocketTransport::sendOnSocket()
{
std::lock_guard<std::mutex> lock(_txbufMutex);

View File

@ -30,6 +30,13 @@ namespace ix
{
class Socket;
enum class SendMessageKind
{
Text,
Binary,
Ping
};
class WebSocketTransport
{
public:
@ -61,7 +68,7 @@ namespace ix
~WebSocketTransport();
void configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions,
int hearBeatPeriod);
int heartBeatPeriod);
WebSocketInitResult connectToUrl(const std::string& url, // Client
int timeoutSecs);
@ -71,6 +78,8 @@ namespace ix
void poll();
WebSocketSendInfo sendBinary(const std::string& message,
const OnProgressCallback& onProgressCallback);
WebSocketSendInfo sendText(const std::string& message,
const OnProgressCallback& onProgressCallback);
WebSocketSendInfo sendPing(const std::string& message);
void close();
ReadyStateValues getReadyState() const;
@ -100,6 +109,10 @@ namespace ix
uint8_t masking_key[4];
};
// Tells whether we should mask the data we send.
// client should mask but server should not
bool _useMask;
// Buffer for reading from our socket. That buffer is never resized.
std::vector<uint8_t> _readbuf;
@ -148,7 +161,7 @@ namespace ix
mutable std::mutex _lastSendTimePointMutex;
std::chrono::time_point<std::chrono::steady_clock> _lastSendTimePoint;
// No data was send through the socket for longer that the hearbeat period
// No data was send through the socket for longer than the heartbeat period
bool heartBeatPeriodExceeded();
void sendOnSocket();
@ -174,7 +187,6 @@ namespace ix
std::string::const_iterator end,
uint64_t message_size,
uint8_t masking_key[4]);
void appendToSendBuffer(const std::vector<uint8_t>& buffer);
unsigned getRandomUnsigned();
void unmaskReceiveBuffer(const wsheader_type& ws);

View File

@ -9,8 +9,20 @@ brew:
mkdir -p build && (cd build ; cmake .. ; make -j install)
.PHONY: docker
NAME := bsergean/ws
TAG := $(shell cat DOCKER_VERSION)
IMG := ${NAME}:${TAG}
LATEST := ${NAME}:latest
BUILD := ${NAME}:build
docker:
docker build -t ws:latest .
docker build -t ${IMG} .
docker tag ${IMG} ${BUILD}
docker_push:
docker tag ${IMG} ${LATEST}
docker push ${LATEST}
run:
docker run --cap-add sys_ptrace -it ws:latest
@ -39,7 +51,7 @@ test:
python test/run.py
ws_test: all
(cd ws ; sh test_ws.sh)
(cd ws ; bash test_ws.sh)
# For the fork that is configured with appveyor
rebase_upstream:

View File

@ -29,6 +29,7 @@ set (SOURCES
IXDNSLookupTest.cpp
IXSocketTest.cpp
IXSocketConnectTest.cpp
)
# Some unittest don't work on windows yet

View File

@ -0,0 +1,43 @@
/*
* IXSocketConnectTest.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone. All rights reserved.
*/
#include "catch.hpp"
#include "IXTest.h"
#include <ixwebsocket/IXSocketConnect.h>
#include <iostream>
using namespace ix;
TEST_CASE("socket_connect", "[net]")
{
SECTION("Test connecting to a known hostname")
{
std::string errMsg;
int fd = SocketConnect::connect("www.google.com", 80, errMsg, [] { return false; });
std::cerr << "Error message: " << errMsg << std::endl;
REQUIRE(fd != -1);
}
SECTION("Test connecting to a non-existing hostname")
{
std::string errMsg;
std::string hostname("12313lhjlkjhopiupoijlkasdckljqwehrlkqjwehraospidcuaposidcasdc");
int fd = SocketConnect::connect(hostname, 80, errMsg, [] { return false; });
std::cerr << "Error message: " << errMsg << std::endl;
REQUIRE(fd == -1);
}
SECTION("Test connecting to a good hostname, with cancellation")
{
std::string errMsg;
// The callback returning true means we are requesting cancellation
int fd = SocketConnect::connect("www.google.com", 80, errMsg, [] { return true; });
std::cerr << "Error message: " << errMsg << std::endl;
REQUIRE(fd == -1);
}
}

View File

@ -16,6 +16,7 @@
#include <iostream>
#include <stdlib.h>
#include <stack>
#include <iomanip>
namespace ix
{
@ -148,4 +149,21 @@ namespace ix
return -1;
}
void hexDump(const std::string& prefix,
const std::string& s)
{
std::ostringstream ss;
bool upper_case = false;
for (std::string::size_type i = 0; i < s.length(); ++i)
{
ss << std::hex
<< std::setfill('0')
<< std::setw(2)
<< (upper_case ? std::uppercase : std::nouppercase) << (int)s[i];
}
std::cout << prefix << ": " << s << " => " << ss.str() << std::endl;
}
}

View File

@ -65,7 +65,7 @@ namespace
_webSocket.setUrl(url);
// The important bit for this test.
// Set a 1 second hearbeat ; if no traffic is present on the connection for 1 second
// Set a 1 second heartbeat ; if no traffic is present on the connection for 1 second
// a ping message will be sent by the client.
_webSocket.setHeartBeatPeriod(1);
@ -128,10 +128,11 @@ namespace
{
// A dev/null server
server.setOnConnectionCallback(
[&server, &receivedPingMessages](std::shared_ptr<ix::WebSocket> webSocket)
[&server, &receivedPingMessages](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, &server, &receivedPingMessages](ix::WebSocketMessageType messageType,
[webSocket, connectionState, &server, &receivedPingMessages](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
@ -141,6 +142,7 @@ namespace
if (messageType == ix::WebSocket_MessageType_Open)
{
Logger() << "New server connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:";
for (auto it : openInfo.headers)
@ -210,6 +212,10 @@ TEST_CASE("Websocket_heartbeat", "[heartbeat]")
webSocketClientA.stop();
webSocketClientB.stop();
// Here we test heart beat period exceeded for clientA
// but it should not be exceeded for clientB which has sent data.
// -> expected ping messages == 2, but add a small buffer to make this more reliable.
REQUIRE(serverReceivedPingMessages >= 2);
REQUIRE(serverReceivedPingMessages <= 4);

View File

@ -18,13 +18,32 @@ using namespace ix;
namespace ix
{
bool startServer(ix::WebSocketServer& server)
// Test that we can override the connectionState impl to provide our own
class ConnectionStateCustom : public ConnectionState
{
void computeId()
{
// a very boring invariant id that we can test against in the unittest
_id = "foobarConnectionId";
}
};
bool startServer(ix::WebSocketServer& server,
std::string& connectionId)
{
auto factory = []() -> std::shared_ptr<ConnectionState>
{
return std::make_shared<ConnectionStateCustom>();
};
server.setConnectionStateFactory(factory);
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket)
[&server, &connectionId](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType,
[webSocket, connectionState,
&connectionId, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
@ -34,12 +53,16 @@ namespace ix
if (messageType == ix::WebSocket_MessageType_Open)
{
Logger() << "New connection";
connectionState->computeId();
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:";
for (auto it : openInfo.headers)
{
Logger() << it.first << ": " << it.second;
}
connectionId = connectionState->getId();
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
@ -78,7 +101,8 @@ TEST_CASE("Websocket_server", "[websocket_server]")
{
int port = getFreePort();
ix::WebSocketServer server(port);
REQUIRE(startServer(server));
std::string connectionId;
REQUIRE(startServer(server, connectionId));
std::string errMsg;
bool tls = false;
@ -111,7 +135,8 @@ TEST_CASE("Websocket_server", "[websocket_server]")
{
int port = getFreePort();
ix::WebSocketServer server(port);
REQUIRE(startServer(server));
std::string connectionId;
REQUIRE(startServer(server, connectionId));
std::string errMsg;
bool tls = false;
@ -147,7 +172,8 @@ TEST_CASE("Websocket_server", "[websocket_server]")
{
int port = getFreePort();
ix::WebSocketServer server(port);
REQUIRE(startServer(server));
std::string connectionId;
REQUIRE(startServer(server, connectionId));
std::string errMsg;
bool tls = false;
@ -178,6 +204,8 @@ TEST_CASE("Websocket_server", "[websocket_server]")
// Give us 500ms for the server to notice that clients went away
ix::msleep(500);
REQUIRE(connectionId == "foobarConnectionId");
server.stop();
REQUIRE(server.getClients().size() == 0);
}

View File

@ -217,10 +217,11 @@ namespace
bool startServer(ix::WebSocketServer& server)
{
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket)
[&server](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType,
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
@ -230,6 +231,7 @@ namespace
if (messageType == ix::WebSocket_MessageType_Open)
{
Logger() << "New connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:";
for (auto it : openInfo.headers)

View File

@ -27,7 +27,7 @@ class Command(object):
thread.join(timeout)
if thread.is_alive():
print 'Command timeout, kill it: ' + self.cmd
print('Command timeout, kill it: ' + self.cmd)
self.process.terminate()
thread.join()
return False, 255

1
third_party/README.md vendored Normal file
View File

@ -0,0 +1 @@
Except ZLIB on Windows (whose port is currently broken...) all dependencies here are for the ws command line tools, not for the IXWebSockets library which is standalone.

View File

@ -1,20 +0,0 @@
class Ixwebsocket < Formula
desc "WebSocket client and server, and HTTP client command-line tool"
homepage "https://github.com/machinezone/IXWebSocket"
url "https://github.com/machinezone/IXWebSocket/archive/v1.1.0.tar.gz"
sha256 "52592ce3d0a67ad0f90ac9e8a458f61724175d95a01a38d1bad3fcdc5c7b6666"
depends_on "cmake" => :build
def install
system "cmake", ".", *std_cmake_args
system "make", "install"
end
test do
system "#{bin}/ws", "--help"
system "#{bin}/ws", "send", "--help"
system "#{bin}/ws", "receive", "--help"
system "#{bin}/ws", "transfer", "--help"
system "#{bin}/ws", "curl", "--help"
end
end

View File

@ -1,2 +1,3 @@
find . -type f -name '*.cpp' -exec sed -i '' 's/[[:space:]]*$//' {} \+
find . -type f -name '*.h' -exec sed -i '' 's/[[:space:]]*$//' {} \+
find . -type f -name '*.md' -exec sed -i '' 's/[[:space:]]*$//' {} \+

View File

@ -23,6 +23,8 @@ add_executable(ws
ixcrypto/IXHash.cpp
ixcrypto/IXUuid.cpp
IXRedisClient.cpp
ws_http_client.cpp
ws_ping_pong.cpp
ws_broadcast_server.cpp
@ -32,11 +34,13 @@ add_executable(ws
ws_transfer.cpp
ws_send.cpp
ws_receive.cpp
ws_redis_publish.cpp
ws_redis_subscribe.cpp
ws.cpp)
if (APPLE AND USE_TLS)
target_link_libraries(ws "-framework foundation" "-framework security")
endif()
target_link_libraries(ws ixwebsocket)
target_link_libraries(ws ixwebsocket cpp_redis tacopie)
install(TARGETS ws RUNTIME DESTINATION bin)

242
ws/IXRedisClient.cpp Normal file
View File

@ -0,0 +1,242 @@
/*
* IXRedisClient.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXRedisClient.h"
#include <ixwebsocket/IXSocketFactory.h>
#include <ixwebsocket/IXSocket.h>
#include <cpp_redis/cpp_redis>
#include <sstream>
#include <iomanip>
#include <vector>
#include <cstring>
namespace ix
{
bool RedisClient::connect(const std::string& hostname, int port)
{
_sub.connect(hostname, port, []
(const std::string& host, std::size_t port, cpp_redis::connect_state status) {
if (status == cpp_redis::connect_state::dropped) {
std::cout << "client disconnected from " << host << ":" << port << std::endl;
}
});
// also subscribe the old way
bool tls = false;
std::string errorMsg;
_socket = createSocket(tls, errorMsg);
if (!_socket)
{
return false;
}
std::string errMsg;
return _socket->connect(hostname, port, errMsg, nullptr);
}
bool RedisClient::auth(const std::string& password,
std::string& response)
{
// authentication if server-server requires it
// _sub.auth(password, [&response](const cpp_redis::reply& reply) {
// if (reply.is_error()) { std::cerr << "Authentication failed: " << reply.as_string() << std::endl; }
// else {
// std::cout << "successful authentication" << std::endl;
// }
// });
return true;
#if 0
response.clear();
if (!_socket) return false;
std::stringstream ss;
ss << "AUTH ";
ss << password;
ss << "\r\n";
bool sent = _socket->writeBytes(ss.str(), nullptr);
if (!sent)
{
return false;
}
auto pollResult = _socket->isReadyToRead(-1);
if (pollResult == PollResultType::Error)
{
return false;
}
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
response = line;
return lineValid;
#endif
}
bool RedisClient::publish(const std::string& channel,
const std::string& message)
{
if (!_socket) return false;
std::stringstream ss;
ss << "PUBLISH ";
ss << channel;
ss << " ";
ss << message;
ss << "\r\n";
bool sent = _socket->writeBytes(ss.str(), nullptr);
if (!sent)
{
return false;
}
auto pollResult = _socket->isReadyToRead(-1);
if (pollResult == PollResultType::Error)
{
return false;
}
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
return lineValid;
}
//
// FIXME: we assume that redis never return errors...
//
bool RedisClient::subscribe(const std::string& channel,
const OnRedisSubscribeResponseCallback& responseCallback,
const OnRedisSubscribeCallback& callback)
{
_sub.subscribe(channel, [&callback](const std::string& chan, const std::string& msg) {
callback(msg);
});
_sub.commit();
while (true)
{
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
return true;
#if 0
if (!_socket) return false;
std::stringstream ss;
ss << "SUBSCRIBE ";
ss << channel;
ss << "\r\n";
bool sent = _socket->writeBytes(ss.str(), nullptr);
if (!sent)
{
return false;
}
// Wait 1s for the response
auto pollResult = _socket->isReadyToRead(-1);
if (pollResult == PollResultType::Error)
{
return false;
}
// build the response as a single string
std::stringstream oss;
// Read the first line of the response
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
oss << line;
if (!lineValid) return false;
// There are 5 items for the subscribe repply
for (int i = 0; i < 5; ++i)
{
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
oss << line;
if (!lineValid) return false;
}
responseCallback(oss.str());
// Wait indefinitely for new messages
while (true)
{
// Wait until something is ready to read
auto pollResult = _socket->isReadyToRead(-1);
if (pollResult == PollResultType::Error)
{
return false;
}
// The first line of the response describe the return type,
// => *3 (an array of 3 elements)
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
if (!lineValid) return false;
int arraySize;
{
std::stringstream ss;
ss << line.substr(1, line.size()-1);
ss >> arraySize;
}
// There are 6 items for each received message
for (int i = 0; i < arraySize; ++i)
{
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
if (!lineValid) return false;
// Messages are string, which start with a string size
// => $7 (7 bytes)
int stringSize;
std::stringstream ss;
ss << line.substr(1, line.size()-1);
ss >> stringSize;
auto readResult = _socket->readBytes(stringSize, nullptr, nullptr);
if (!readResult.first) return false;
if (i == 2)
{
// The message is the 3rd element.
callback(readResult.second);
}
// read last 2 bytes (\r\n)
char c;
_socket->readByte(&c, nullptr);
_socket->readByte(&c, nullptr);
}
}
return true;
#endif
}
}

44
ws/IXRedisClient.h Normal file
View File

@ -0,0 +1,44 @@
/*
* IXRedisClient.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <memory>
#include <functional>
#include <cpp_redis/cpp_redis>
namespace ix
{
class Socket;
class RedisClient {
public:
using OnRedisSubscribeResponseCallback = std::function<void(const std::string&)>;
using OnRedisSubscribeCallback = std::function<void(const std::string&)>;
RedisClient() = default;
~RedisClient() = default;
bool connect(const std::string& hostname,
int port);
bool auth(const std::string& password,
std::string& response);
bool publish(const std::string& channel,
const std::string& message);
bool subscribe(const std::string& channel,
const OnRedisSubscribeResponseCallback& responseCallback,
const OnRedisSubscribeCallback& callback);
private:
cpp_redis::subscriber _sub;
std::shared_ptr<Socket> _socket;
};
}

View File

@ -20,6 +20,8 @@ Subcommands:
broadcast_server Broadcasting server
ping Ping pong
curl HTTP Client
redis_publish Redis publisher
redis_subscribe Redis subscriber
```
## file transfer

25
ws/test_ws_redis.sh Normal file
View File

@ -0,0 +1,25 @@
#!/bin/sh
# Handle Ctrl-C by killing all sub-processing AND exiting
trap cleanup INT
function cleanup {
kill `cat /tmp/pidfile.subscribe`
exit 1
}
REDIS_HOST=${REDIS_HOST:=localhost}
ws redis_subscribe --pidfile /tmp/pidfile.subscribe --host $REDIS_HOST foo &
# Wait for the subscriber to be ready
sleep 0.5
# Now publish messages
ws redis_publish -c 100000 --host ${REDIS_HOST} foo bar
# Wait a little for all messages to be received
sleep 1.5
# Cleanup
cleanup

View File

@ -35,16 +35,21 @@ int main(int argc, char** argv)
std::string output;
std::string hostname("127.0.0.1");
std::string pidfile;
std::string channel;
std::string message;
std::string password;
bool headersOnly = false;
bool followRedirects = false;
bool verbose = false;
bool save = false;
bool compress = false;
int port = 8080;
int redisPort = 6379;
int connectTimeOut = 60;
int transferTimeout = 1800;
int maxRedirects = 5;
int delayMs = -1;
int count = 1;
CLI::App* sendApp = app.add_subcommand("send", "Send a file");
sendApp->add_option("url", url, "Connection url")->required();
@ -96,6 +101,22 @@ int main(int argc, char** argv)
httpClientApp->add_option("--connect-timeout", connectTimeOut, "Connection timeout");
httpClientApp->add_option("--transfer-timeout", transferTimeout, "Transfer timeout");
CLI::App* redisPublishApp = app.add_subcommand("redis_publish", "Redis publisher");
redisPublishApp->add_option("--port", redisPort, "Port");
redisPublishApp->add_option("--host", hostname, "Hostname");
redisPublishApp->add_option("--password", password, "Password");
redisPublishApp->add_option("channel", channel, "Channel")->required();
redisPublishApp->add_option("message", message, "Message")->required();
redisPublishApp->add_option("-c", count, "Count");
CLI::App* redisSubscribeApp = app.add_subcommand("redis_subscribe", "Redis subscriber");
redisSubscribeApp->add_option("--port", redisPort, "Port");
redisSubscribeApp->add_option("--host", hostname, "Hostname");
redisSubscribeApp->add_option("--password", password, "Password");
redisSubscribeApp->add_option("channel", channel, "Channel")->required();
redisSubscribeApp->add_flag("-v", verbose, "Verbose");
redisSubscribeApp->add_option("--pidfile", pidfile, "Pid file");
CLI11_PARSE(app, argc, argv);
// pid file handling
@ -149,6 +170,15 @@ int main(int argc, char** argv)
followRedirects, maxRedirects, verbose,
save, output, compress);
}
else if (app.got_subcommand("redis_publish"))
{
return ix::ws_redis_publish_main(hostname, redisPort, password,
channel, message, count);
}
else if (app.got_subcommand("redis_subscribe"))
{
return ix::ws_redis_subscribe_main(hostname, redisPort, password, channel, verbose);
}
return 1;
}

13
ws/ws.h
View File

@ -39,4 +39,17 @@ namespace ix
int ws_send_main(const std::string& url,
const std::string& path);
int ws_redis_publish_main(const std::string& hostname,
int port,
const std::string& password,
const std::string& channel,
const std::string& message,
int count);
int ws_redis_subscribe_main(const std::string& hostname,
int port,
const std::string& password,
const std::string& channel,
bool verbose);
}

View File

@ -17,10 +17,11 @@ namespace ix
ix::WebSocketServer server(port, hostname);
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket)
[&server](std::shared_ptr<WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType,
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
@ -30,6 +31,7 @@ namespace ix
if (messageType == ix::WebSocket_MessageType_Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)

View File

@ -17,10 +17,11 @@ namespace ix
ix::WebSocketServer server(port, hostname);
server.setOnConnectionCallback(
[](std::shared_ptr<ix::WebSocket> webSocket)
[](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket](ix::WebSocketMessageType messageType,
[webSocket, connectionState](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
@ -30,6 +31,7 @@ namespace ix
if (messageType == ix::WebSocket_MessageType_Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)

52
ws/ws_redis_publish.cpp Normal file
View File

@ -0,0 +1,52 @@
/*
* ws_redis_publish.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <sstream>
#include "IXRedisClient.h"
namespace ix
{
int ws_redis_publish_main(const std::string& hostname,
int port,
const std::string& password,
const std::string& channel,
const std::string& message,
int count)
{
RedisClient redisClient;
if (!redisClient.connect(hostname, port))
{
std::cerr << "Cannot connect to redis host" << std::endl;
return 1;
}
if (!password.empty())
{
std::string authResponse;
if (!redisClient.auth(password, authResponse))
{
std::stringstream ss;
std::cerr << "Cannot authenticated to redis" << std::endl;
return 1;
}
std::cout << "Auth response: " << authResponse << ":" << port << std::endl;
}
for (int i = 0; i < count; i++)
{
//std::cerr << "Publishing message " << message
// << " to " << channel << "..." << std::endl;
if (!redisClient.publish(channel, message))
{
std::cerr << "Error publishing to channel " << channel << std::endl;
return 1;
}
}
return 0;
}
}

86
ws/ws_redis_subscribe.cpp Normal file
View File

@ -0,0 +1,86 @@
/*
* ws_redis_subscribe.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <sstream>
#include <chrono>
#include <thread>
#include <atomic>
#include "IXRedisClient.h"
namespace ix
{
int ws_redis_subscribe_main(const std::string& hostname,
int port,
const std::string& password,
const std::string& channel,
bool verbose)
{
RedisClient redisClient;
if (!redisClient.connect(hostname, port))
{
std::cerr << "Cannot connect to redis host" << std::endl;
return 1;
}
if (!password.empty())
{
std::string authResponse;
if (!redisClient.auth(password, authResponse))
{
std::stringstream ss;
std::cerr << "Cannot authenticated to redis" << std::endl;
return 1;
}
std::cout << "Auth response: " << authResponse << ":" << port << std::endl;
}
std::atomic<int> msgPerSeconds(0);
std::atomic<int> msgCount(0);
auto callback = [&msgPerSeconds, &msgCount, verbose]
(const std::string& message)
{
if (verbose)
{
std::cout << "received: " << message << std::endl;
}
msgPerSeconds++;
msgCount++;
};
auto responseCallback = [](const std::string& redisResponse)
{
std::cout << "Redis subscribe response: " << redisResponse << std::endl;
};
auto timer = [&msgPerSeconds, &msgCount]
{
while (true)
{
std::cout << "#messages " << msgCount << " "
<< "msg/s " << msgPerSeconds
<< std::endl;
msgPerSeconds = 0;
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
};
std::thread t(timer);
std::cerr << "Subscribing to " << channel << "..." << std::endl;
if (!redisClient.subscribe(channel, responseCallback, callback))
{
std::cerr << "Error subscribing to channel " << channel << std::endl;
return 1;
}
return 0;
}
}

View File

@ -17,10 +17,11 @@ namespace ix
ix::WebSocketServer server(port, hostname);
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket)
[&server](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType,
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
@ -30,6 +31,7 @@ namespace ix
if (messageType == ix::WebSocket_MessageType_Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)