Compare commits

...

23 Commits

Author SHA1 Message Date
5e1a4541bf play with cpp_redis 2019-03-29 11:42:45 -07:00
2e9c610ac9 Bump sleep time in test shell script 2019-03-29 09:36:56 -07:00
eb063ec60a (redis_subscribe) in verbose mode, received message gets printed with a 'received: ' header 2019-03-29 09:35:19 -07:00
37fb14646d Add clarification notice about third party modules 2019-03-29 09:34:17 -07:00
ae543518d3 offline version of remark-latest 2019-03-28 16:06:43 -07:00
c865d64608 redis conf slides 2019-03-28 14:17:19 -07:00
3004422cb6 slides 2019-03-27 16:27:52 -07:00
0c46a17443 add redis-conf slides 2019-03-27 15:53:55 -07:00
497373d976 ws redis command improvements + test script 2019-03-27 13:41:46 -07:00
91198aca0d (ws) redis_subscribe and redis_publish can take a password + display subscribe response 2019-03-26 09:33:22 -07:00
b17a5e5f0b update doc 2019-03-24 21:48:14 -07:00
3f0ef59f65 remove Formula folder
Homebrew stuff is at https://github.com/bsergean/homebrew-IXWebSocket
2019-03-24 21:43:38 -07:00
1e96edc293 (server) fix masking bug 2019-03-22 15:33:04 -07:00
0afb77393b can send TEXT message (we only support BINARY messages now) 2019-03-22 14:24:22 -07:00
7614b642bb unmasked code is broken 2019-03-22 14:24:15 -07:00
bc89580dfe remove printf + unittest fix 2019-03-22 09:56:28 -07:00
358ae13a88 (server) server should not mask data when sending to client (some python client libraries enforce that and assert) 2019-03-22 09:53:56 -07:00
ccf9dcba70 (server) HTTP response is malformed 2019-03-22 09:52:19 -07:00
94604fad61 minor cleanup 2019-03-21 13:51:25 -07:00
5c4cc7c50d HTTP/1.1 response should contains a reason (websocket server)
Fix compatibility problem with websockets python library, where the response does not contains a reason

File "/.../lib/python3.7/site-packages/websockets/http.py", line 126, in read_response
version, status_code, reason = status_line[:-2].split(b' ', 2)
ValueError: not enough values to unpack (expected 3, got 2)

The above exception was the direct cause of the following exception:

websockets.exceptions.InvalidMessage: Malformed HTTP message
2019-03-21 13:43:47 -07:00
9ed961ec06 cleanup, remove dead method 2019-03-21 10:06:59 -07:00
e6bd8cc8c4 (cmake) add a warning about 32/64 conversion problems. 2019-03-20 21:51:38 -07:00
ee25bd0f92 Feature/connection state (#25)
* (cmake) add a warning about 32/64 conversion problems.

* fix typo

* New connection state for server code + fix OpenSSL double init bug

* update README
2019-03-20 18:34:24 -07:00
58 changed files with 1802 additions and 152 deletions

View File

@ -39,6 +39,7 @@ set( IXWEBSOCKET_SOURCES
ixwebsocket/IXSelectInterrupt.cpp
ixwebsocket/IXSelectInterruptPipe.cpp
ixwebsocket/IXSelectInterruptFactory.cpp
ixwebsocket/IXConnectionState.cpp
)
set( IXWEBSOCKET_HEADERS
@ -66,6 +67,7 @@ set( IXWEBSOCKET_HEADERS
ixwebsocket/IXSelectInterrupt.h
ixwebsocket/IXSelectInterruptPipe.h
ixwebsocket/IXSelectInterruptFactory.h
ixwebsocket/IXConnectionState.h
)
# Platform specific code
@ -133,3 +135,4 @@ set( IXWEBSOCKET_INCLUDE_DIRS
target_include_directories( ixwebsocket PUBLIC ${IXWEBSOCKET_INCLUDE_DIRS} )
add_subdirectory(ws)
add_subdirectory(third_party/cpp_redis)

1
DOCKER_VERSION Normal file
View File

@ -0,0 +1 @@
1.3.2

View File

@ -1,31 +0,0 @@
FROM debian:stretch
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install gdb
RUN apt-get -y install screen
RUN apt-get -y install procps
RUN apt-get -y install lsof
RUN apt-get -y install libz-dev
RUN apt-get -y install vim
RUN apt-get -y install make
RUN apt-get -y install cmake
RUN apt-get -y install curl
RUN apt-get -y install python
RUN apt-get -y install netcat
# debian strech cmake is too old for building with Docker
COPY makefile .
RUN ["make", "install_cmake_for_linux"]
COPY . .
ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-rc4-Linux-x86_64/bin
ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
# RUN ["make"]
EXPOSE 8765
CMD ["/ws/ws", "transfer", "--port", "8765", "--host", "0.0.0.0"]

1
Dockerfile Symbolic link
View File

@ -0,0 +1 @@
Dockerfile.dev

31
Dockerfile.dev Normal file
View File

@ -0,0 +1,31 @@
FROM debian:stretch
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install gdb
RUN apt-get -y install screen
RUN apt-get -y install procps
RUN apt-get -y install lsof
RUN apt-get -y install libz-dev
RUN apt-get -y install vim
RUN apt-get -y install make
RUN apt-get -y install cmake
RUN apt-get -y install curl
RUN apt-get -y install python
RUN apt-get -y install netcat
# debian strech cmake is too old for building with Docker
COPY makefile .
RUN ["make", "install_cmake_for_linux"]
COPY . .
ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-rc4-Linux-x86_64/bin
ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
# RUN ["make"]
EXPOSE 8765
CMD ["/ws/ws", "transfer", "--port", "8765", "--host", "0.0.0.0"]

30
Dockerfile.prod Normal file
View File

@ -0,0 +1,30 @@
FROM debian:buster
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install libz-dev
RUN apt-get -y install make
RUN apt-get -y install wget
RUN mkdir -p /tmp/cmake
WORKDIR /tmp/cmake
RUN wget https://github.com/Kitware/CMake/releases/download/v3.14.0/cmake-3.14.0-Linux-x86_64.tar.gz
RUN tar zxf cmake-3.14.0-Linux-x86_64.tar.gz
RUN adduser app
COPY . .
ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-Linux-x86_64/bin
ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
RUN ["make"]
# Now run in usermode
USER app
EXPOSE 8765
CMD ["bash"]

View File

@ -1,20 +0,0 @@
class Ixwebsocket < Formula
desc "WebSocket client and server, and HTTP client command-line tool"
homepage "https://github.com/machinezone/IXWebSocket"
url "https://github.com/machinezone/IXWebSocket/archive/v1.1.0.tar.gz"
sha256 "52592ce3d0a67ad0f90ac9e8a458f61724175d95a01a38d1bad3fcdc5c7b6666"
depends_on "cmake" => :build
def install
system "cmake", ".", *std_cmake_args
system "make", "install"
end
test do
system "#{bin}/ws", "--help"
system "#{bin}/ws", "send", "--help"
system "#{bin}/ws", "receive", "--help"
system "#{bin}/ws", "transfer", "--help"
system "#{bin}/ws", "curl", "--help"
end
end

View File

@ -4,13 +4,14 @@
## Introduction
[*WebSocket*](https://en.wikipedia.org/wiki/WebSocket) is a computer communications protocol, providing full-duplex
communication channels over a single TCP connection. *IXWebSocket* is a C++ library for client and server Websocket communication, and for client HTTP communication. The code is derived from [easywsclient](https://github.com/dhbaird/easywsclient) and from the [Satori C SDK](https://github.com/satori-com/satori-rtm-sdk-c). It has been tested on the following platforms.
[*WebSocket*](https://en.wikipedia.org/wiki/WebSocket) is a computer communications protocol, providing full-duplex and bi-directionnal communication channels over a single TCP connection. *IXWebSocket* is a C++ library for client and server Websocket communication, and for client HTTP communication. The code is derived from [easywsclient](https://github.com/dhbaird/easywsclient) and from the [Satori C SDK](https://github.com/satori-com/satori-rtm-sdk-c). It has been tested on the following platforms.
* macOS
* iOS
* Linux
* Android
* Android
The code was made to compile once on Windows but support is currently broken on this platform.
## Examples
@ -46,9 +47,12 @@ webSocket.setOnMessageCallback(
// Now that our callback is setup, we can start our background thread and receive messages
webSocket.start();
// Send a message to the server
// Send a message to the server (default to BINARY mode)
webSocket.send("hello world");
// The message can be sent in TEXT mode
webSocket.sendText("hello again");
// ... finally ...
// Stop the connection
@ -63,10 +67,11 @@ Here is what the server API looks like. Note that server support is very recent
ix::WebSocketServer server(port);
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket)
[&server](std::shared_ptr<WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType,
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
@ -77,6 +82,12 @@ server.setOnConnectionCallback(
{
std::cerr << "New connection" << std::endl;
// A connection state object is available, and has a default id
// You can subclass ConnectionState and pass an alternate factory
// to override it. It is useful if you want to store custom
// attributes per connection (authenticated bool flag, attributes, etc...)
std::cerr << "id: " << connectionState->getId() << std::endl;
// The uri the client did connect to.
std::cerr << "Uri: " << openInfo.uri << std::endl;
@ -207,11 +218,11 @@ If the remote end (server) breaks the connection, the code will try to perpetual
### Large messages
Large frames are broken up into smaller chunks or messages to avoid filling up the os tcp buffers, which is permitted thanks to WebSocket [fragmentation](https://tools.ietf.org/html/rfc6455#section-5.4). Messages up to 500M were sent and received succesfully.
Large frames are broken up into smaller chunks or messages to avoid filling up the os tcp buffers, which is permitted thanks to WebSocket [fragmentation](https://tools.ietf.org/html/rfc6455#section-5.4). Messages up to 1G were sent and received succesfully.
## Limitations
* There is no text support for sending data, only the binary protocol is supported. Sending json or text over the binary protocol works well.
* No utf-8 validation is made when sending TEXT message with sendText()
* Automatic reconnection works at the TCP socket level, and will detect remote end disconnects. However, if the device/computer network become unreachable (by turning off wifi), it is quite hard to reliably and timely detect it at the socket level using `recv` and `send` error codes. [Here](https://stackoverflow.com/questions/14782143/linux-socket-how-to-detect-disconnected-network-in-a-client-program) is a good discussion on the subject. This behavior is consistent with other runtimes such as node.js. One way to detect a disconnected device with low level C code is to do a name resolution with DNS but this can be expensive. Mobile devices have good and reliable API to do that.
* The server code is using select to detect incoming data, and creates one OS thread per connection. This is not as scalable as strategies using epoll or kqueue.
@ -223,13 +234,13 @@ Here is a simplistic diagram which explains how the code is structured in term o
+-----------------------+ --- Public
| | Start the receiving Background thread. Auto reconnection. Simple websocket Ping.
| IXWebSocket | Interface used by C++ test clients. No IX dependencies.
| |
| |
+-----------------------+
| |
| IXWebSocketServer | Run a server and give each connections its own WebSocket object.
| | Each connection is handled in a new OS thread.
| |
+-----------------------+ --- Private
+-----------------------+ --- Private
| |
| IXWebSocketTransport | Low level websocket code, framing, managing raw socket. Adapted from easywsclient.
| |

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.5 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 94 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 80 KiB

View File

@ -0,0 +1,2 @@
all:
(cd .. ; make docker && make docker_push)

Binary file not shown.

After

Width:  |  Height:  |  Size: 74 KiB

BIN
doc/redis_conf_2019/neo.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 118 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 113 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 168 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 673 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.5 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.5 MiB

File diff suppressed because one or more lines are too long

Binary file not shown.

After

Width:  |  Height:  |  Size: 90 KiB

File diff suppressed because it is too large Load Diff

Binary file not shown.

After

Width:  |  Height:  |  Size: 36 KiB

21
docker-compose.yml Normal file
View File

@ -0,0 +1,21 @@
version: "3"
services:
ws:
stdin_open: true
tty: true
image: bsergean/ws:build
ports:
- "8765:8765"
entrypoint: bash
networks:
- ws-net
depends_on:
- redis1
redis1:
image: redis:alpine
networks:
- ws-net
networks:
ws-net:

View File

@ -0,0 +1,33 @@
/*
* IXConnectionState.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXConnectionState.h"
namespace ix
{
std::atomic<uint64_t> ConnectionState::_globalId(0);
ConnectionState::ConnectionState()
{
computeId();
}
void ConnectionState::computeId()
{
_id = std::to_string(_globalId++);
}
const std::string& ConnectionState::getId() const
{
return _id;
}
std::shared_ptr<ConnectionState> ConnectionState::createConnectionState()
{
return std::make_shared<ConnectionState>();
}
}

View File

@ -0,0 +1,33 @@
/*
* IXConnectionState.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <stdint.h>
#include <string>
#include <atomic>
#include <memory>
namespace ix
{
class ConnectionState {
public:
ConnectionState();
virtual ~ConnectionState() = default;
virtual void computeId();
virtual const std::string& getId() const;
static std::shared_ptr<ConnectionState> createConnectionState();
protected:
std::string _id;
static std::atomic<uint64_t> _globalId;
};
}

View File

@ -21,6 +21,7 @@
namespace ix
{
std::atomic<bool> SocketOpenSSL::_openSSLInitializationSuccessful(false);
std::once_flag SocketOpenSSL::_openSSLInitFlag;
SocketOpenSSL::SocketOpenSSL(int fd) : Socket(fd),
_ssl_connection(nullptr),

View File

@ -50,7 +50,7 @@ namespace ix
const SSL_METHOD* _ssl_method;
mutable std::mutex _mutex; // OpenSSL routines are not thread-safe
std::once_flag _openSSLInitFlag;
static std::once_flag _openSSLInitFlag;
static std::atomic<bool> _openSSLInitializationSuccessful;
};

View File

@ -29,7 +29,8 @@ namespace ix
_host(host),
_backlog(backlog),
_maxConnections(maxConnections),
_stop(false)
_stop(false),
_connectionStateFactory(&ConnectionState::createConnectionState)
{
}
@ -145,6 +146,12 @@ namespace ix
::close(_serverFd);
}
void SocketServer::setConnectionStateFactory(
const ConnectionStateFactory& connectionStateFactory)
{
_connectionStateFactory = connectionStateFactory;
}
void SocketServer::run()
{
// Set the socket to non blocking mode, so that accept calls are not blocking
@ -214,6 +221,12 @@ namespace ix
continue;
}
std::shared_ptr<ConnectionState> connectionState;
if (_connectionStateFactory)
{
connectionState = _connectionStateFactory();
}
// Launch the handleConnection work asynchronously in its own thread.
//
// the destructor of a future returned by std::async blocks,
@ -221,7 +234,8 @@ namespace ix
f = std::async(std::launch::async,
&SocketServer::handleConnection,
this,
clientFd);
clientFd,
connectionState);
}
}
}

View File

@ -6,6 +6,8 @@
#pragma once
#include "IXConnectionState.h"
#include <utility> // pair
#include <string>
#include <set>
@ -20,6 +22,8 @@ namespace ix
{
class SocketServer {
public:
using ConnectionStateFactory = std::function<std::shared_ptr<ConnectionState>()>;
SocketServer(int port = SocketServer::kDefaultPort,
const std::string& host = SocketServer::kDefaultHost,
int backlog = SocketServer::kDefaultTcpBacklog,
@ -27,6 +31,8 @@ namespace ix
virtual ~SocketServer();
virtual void stop();
void setConnectionStateFactory(const ConnectionStateFactory& connectionStateFactory);
const static int kDefaultPort;
const static std::string kDefaultHost;
const static int kDefaultTcpBacklog;
@ -60,9 +66,13 @@ namespace ix
std::condition_variable _conditionVariable;
std::mutex _conditionVariableMutex;
//
ConnectionStateFactory _connectionStateFactory;
// Methods
void run();
virtual void handleConnection(int fd) = 0;
virtual void handleConnection(int fd,
std::shared_ptr<ConnectionState> connectionState) = 0;
virtual size_t getConnectedClientsCount() = 0;
};
}

View File

@ -302,7 +302,13 @@ namespace ix
WebSocketSendInfo WebSocket::send(const std::string& text,
const OnProgressCallback& onProgressCallback)
{
return sendMessage(text, false, onProgressCallback);
return sendMessage(text, SendMessageKind::Binary, onProgressCallback);
}
WebSocketSendInfo WebSocket::sendText(const std::string& text,
const OnProgressCallback& onProgressCallback)
{
return sendMessage(text, SendMessageKind::Text, onProgressCallback);
}
WebSocketSendInfo WebSocket::ping(const std::string& text)
@ -311,11 +317,11 @@ namespace ix
constexpr size_t pingMaxPayloadSize = 125;
if (text.size() > pingMaxPayloadSize) return WebSocketSendInfo(false);
return sendMessage(text, true);
return sendMessage(text, SendMessageKind::Ping);
}
WebSocketSendInfo WebSocket::sendMessage(const std::string& text,
bool ping,
SendMessageKind sendMessageKind,
const OnProgressCallback& onProgressCallback)
{
if (!isConnected()) return WebSocketSendInfo(false);
@ -332,13 +338,22 @@ namespace ix
std::lock_guard<std::mutex> lock(_writeMutex);
WebSocketSendInfo webSocketSendInfo;
if (ping)
switch (sendMessageKind)
{
webSocketSendInfo = _ws.sendPing(text);
}
else
{
webSocketSendInfo = _ws.sendBinary(text, onProgressCallback);
case SendMessageKind::Text:
{
webSocketSendInfo = _ws.sendText(text, onProgressCallback);
} break;
case SendMessageKind::Binary:
{
webSocketSendInfo = _ws.sendBinary(text, onProgressCallback);
} break;
case SendMessageKind::Ping:
{
webSocketSendInfo = _ws.sendPing(text);
} break;
}
WebSocket::invokeTrafficTrackerCallback(webSocketSendInfo.wireSize, false);

View File

@ -101,6 +101,8 @@ namespace ix
WebSocketSendInfo send(const std::string& text,
const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo sendText(const std::string& text,
const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo ping(const std::string& text);
void close();
@ -120,7 +122,7 @@ namespace ix
private:
WebSocketSendInfo sendMessage(const std::string& text,
bool ping,
SendMessageKind sendMessageKind,
const OnProgressCallback& callback = nullptr);
bool isConnected() const;

View File

@ -114,7 +114,7 @@ namespace ix
std::stringstream ss;
ss << "HTTP/1.1 ";
ss << code;
ss << "\r\n";
ss << " ";
ss << reason;
ss << "\r\n";
@ -353,7 +353,7 @@ namespace ix
WebSocketHandshakeKeyGen::generate(headers["sec-websocket-key"].c_str(), output);
std::stringstream ss;
ss << "HTTP/1.1 101\r\n";
ss << "HTTP/1.1 101 Switching Protocols\r\n";
ss << "Sec-WebSocket-Accept: " << std::string(output) << "\r\n";
ss << "Upgrade: websocket\r\n";
ss << "Connection: Upgrade\r\n";

View File

@ -49,10 +49,12 @@ namespace ix
_onConnectionCallback = callback;
}
void WebSocketServer::handleConnection(int fd)
void WebSocketServer::handleConnection(
int fd,
std::shared_ptr<ConnectionState> connectionState)
{
auto webSocket = std::make_shared<WebSocket>();
_onConnectionCallback(webSocket);
_onConnectionCallback(webSocket, connectionState);
webSocket->disableAutomaticReconnection();

View File

@ -20,7 +20,8 @@
namespace ix
{
using OnConnectionCallback = std::function<void(std::shared_ptr<WebSocket>)>;
using OnConnectionCallback = std::function<void(std::shared_ptr<WebSocket>,
std::shared_ptr<ConnectionState>)>;
class WebSocketServer : public SocketServer {
public:
@ -49,7 +50,8 @@ namespace ix
const static int kDefaultHandShakeTimeoutSecs;
// Methods
virtual void handleConnection(int fd) final;
virtual void handleConnection(int fd,
std::shared_ptr<ConnectionState> connectionState) final;
virtual size_t getConnectedClientsCount() final;
};
}

View File

@ -58,6 +58,7 @@ namespace ix
constexpr size_t WebSocketTransport::kChunkSize;
WebSocketTransport::WebSocketTransport() :
_useMask(true),
_readyState(CLOSED),
_closeCode(0),
_closeWireSize(0),
@ -123,6 +124,9 @@ namespace ix
// Server
WebSocketInitResult WebSocketTransport::connectToSocket(int fd, int timeoutSecs)
{
// Server should not mask the data it sends to the client
_useMask = false;
std::string errorMsg;
_socket = createSocket(fd, errorMsg);
@ -280,19 +284,15 @@ namespace ix
_txbuf.insert(_txbuf.end(), header.begin(), header.end());
_txbuf.insert(_txbuf.end(), begin, end);
// Masking
for (size_t i = 0; i != (size_t) message_size; ++i)
if (_useMask)
{
*(_txbuf.end() - (size_t) message_size + i) ^= masking_key[i&0x3];
for (size_t i = 0; i != (size_t) message_size; ++i)
{
*(_txbuf.end() - (size_t) message_size + i) ^= masking_key[i&0x3];
}
}
}
void WebSocketTransport::appendToSendBuffer(const std::vector<uint8_t>& buffer)
{
std::lock_guard<std::mutex> lock(_txbufMutex);
_txbuf.insert(_txbuf.end(), buffer.begin(), buffer.end());
}
void WebSocketTransport::unmaskReceiveBuffer(const wsheader_type& ws)
{
if (ws.mask)
@ -656,7 +656,8 @@ namespace ix
std::vector<uint8_t> header;
header.assign(2 +
(message_size >= 126 ? 2 : 0) +
(message_size >= 65536 ? 6 : 0) + 4, 0);
(message_size >= 65536 ? 6 : 0) +
(_useMask ? 4 : 0), 0);
header[0] = type;
// The fin bit indicate that this is the last fragment. Fin is French for end.
@ -673,27 +674,33 @@ namespace ix
if (message_size < 126)
{
header[1] = (message_size & 0xff) | 0x80;
header[1] = (message_size & 0xff) | (_useMask ? 0x80 : 0);
header[2] = masking_key[0];
header[3] = masking_key[1];
header[4] = masking_key[2];
header[5] = masking_key[3];
if (_useMask)
{
header[2] = masking_key[0];
header[3] = masking_key[1];
header[4] = masking_key[2];
header[5] = masking_key[3];
}
}
else if (message_size < 65536)
{
header[1] = 126 | 0x80;
header[1] = 126 | (_useMask ? 0x80 : 0);
header[2] = (message_size >> 8) & 0xff;
header[3] = (message_size >> 0) & 0xff;
header[4] = masking_key[0];
header[5] = masking_key[1];
header[6] = masking_key[2];
header[7] = masking_key[3];
if (_useMask)
{
header[4] = masking_key[0];
header[5] = masking_key[1];
header[6] = masking_key[2];
header[7] = masking_key[3];
}
}
else
{ // TODO: run coverage testing here
header[1] = 127 | 0x80;
header[1] = 127 | (_useMask ? 0x80 : 0);
header[2] = (message_size >> 56) & 0xff;
header[3] = (message_size >> 48) & 0xff;
header[4] = (message_size >> 40) & 0xff;
@ -703,10 +710,13 @@ namespace ix
header[8] = (message_size >> 8) & 0xff;
header[9] = (message_size >> 0) & 0xff;
header[10] = masking_key[0];
header[11] = masking_key[1];
header[12] = masking_key[2];
header[13] = masking_key[3];
if (_useMask)
{
header[10] = masking_key[0];
header[11] = masking_key[1];
header[12] = masking_key[2];
header[13] = masking_key[3];
}
}
// _txbuf will keep growing until it can be transmitted over the socket:
@ -732,6 +742,15 @@ namespace ix
_enablePerMessageDeflate, onProgressCallback);
}
WebSocketSendInfo WebSocketTransport::sendText(
const std::string& message,
const OnProgressCallback& onProgressCallback)
{
return sendData(wsheader_type::TEXT_FRAME, message,
_enablePerMessageDeflate, onProgressCallback);
}
void WebSocketTransport::sendOnSocket()
{
std::lock_guard<std::mutex> lock(_txbufMutex);

View File

@ -30,6 +30,13 @@ namespace ix
{
class Socket;
enum class SendMessageKind
{
Text,
Binary,
Ping
};
class WebSocketTransport
{
public:
@ -71,6 +78,8 @@ namespace ix
void poll();
WebSocketSendInfo sendBinary(const std::string& message,
const OnProgressCallback& onProgressCallback);
WebSocketSendInfo sendText(const std::string& message,
const OnProgressCallback& onProgressCallback);
WebSocketSendInfo sendPing(const std::string& message);
void close();
ReadyStateValues getReadyState() const;
@ -100,6 +109,10 @@ namespace ix
uint8_t masking_key[4];
};
// Tells whether we should mask the data we send.
// client should mask but server should not
bool _useMask;
// Buffer for reading from our socket. That buffer is never resized.
std::vector<uint8_t> _readbuf;
@ -148,7 +161,7 @@ namespace ix
mutable std::mutex _lastSendTimePointMutex;
std::chrono::time_point<std::chrono::steady_clock> _lastSendTimePoint;
// No data was send through the socket for longer that the heartbeat period
// No data was send through the socket for longer than the heartbeat period
bool heartBeatPeriodExceeded();
void sendOnSocket();
@ -174,7 +187,6 @@ namespace ix
std::string::const_iterator end,
uint64_t message_size,
uint8_t masking_key[4]);
void appendToSendBuffer(const std::vector<uint8_t>& buffer);
unsigned getRandomUnsigned();
void unmaskReceiveBuffer(const wsheader_type& ws);

View File

@ -9,8 +9,20 @@ brew:
mkdir -p build && (cd build ; cmake .. ; make -j install)
.PHONY: docker
NAME := bsergean/ws
TAG := $(shell cat DOCKER_VERSION)
IMG := ${NAME}:${TAG}
LATEST := ${NAME}:latest
BUILD := ${NAME}:build
docker:
docker build -t ws:latest .
docker build -t ${IMG} .
docker tag ${IMG} ${BUILD}
docker_push:
docker tag ${IMG} ${LATEST}
docker push ${LATEST}
run:
docker run --cap-add sys_ptrace -it ws:latest

View File

@ -29,6 +29,7 @@ set (SOURCES
IXDNSLookupTest.cpp
IXSocketTest.cpp
IXSocketConnectTest.cpp
)
# Some unittest don't work on windows yet

View File

@ -0,0 +1,43 @@
/*
* IXSocketConnectTest.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone. All rights reserved.
*/
#include "catch.hpp"
#include "IXTest.h"
#include <ixwebsocket/IXSocketConnect.h>
#include <iostream>
using namespace ix;
TEST_CASE("socket_connect", "[net]")
{
SECTION("Test connecting to a known hostname")
{
std::string errMsg;
int fd = SocketConnect::connect("www.google.com", 80, errMsg, [] { return false; });
std::cerr << "Error message: " << errMsg << std::endl;
REQUIRE(fd != -1);
}
SECTION("Test connecting to a non-existing hostname")
{
std::string errMsg;
std::string hostname("12313lhjlkjhopiupoijlkasdckljqwehrlkqjwehraospidcuaposidcasdc");
int fd = SocketConnect::connect(hostname, 80, errMsg, [] { return false; });
std::cerr << "Error message: " << errMsg << std::endl;
REQUIRE(fd == -1);
}
SECTION("Test connecting to a good hostname, with cancellation")
{
std::string errMsg;
// The callback returning true means we are requesting cancellation
int fd = SocketConnect::connect("www.google.com", 80, errMsg, [] { return true; });
std::cerr << "Error message: " << errMsg << std::endl;
REQUIRE(fd == -1);
}
}

View File

@ -16,6 +16,7 @@
#include <iostream>
#include <stdlib.h>
#include <stack>
#include <iomanip>
namespace ix
{
@ -148,4 +149,21 @@ namespace ix
return -1;
}
void hexDump(const std::string& prefix,
const std::string& s)
{
std::ostringstream ss;
bool upper_case = false;
for (std::string::size_type i = 0; i < s.length(); ++i)
{
ss << std::hex
<< std::setfill('0')
<< std::setw(2)
<< (upper_case ? std::uppercase : std::nouppercase) << (int)s[i];
}
std::cout << prefix << ": " << s << " => " << ss.str() << std::endl;
}
}

View File

@ -128,10 +128,11 @@ namespace
{
// A dev/null server
server.setOnConnectionCallback(
[&server, &receivedPingMessages](std::shared_ptr<ix::WebSocket> webSocket)
[&server, &receivedPingMessages](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, &server, &receivedPingMessages](ix::WebSocketMessageType messageType,
[webSocket, connectionState, &server, &receivedPingMessages](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
@ -141,6 +142,7 @@ namespace
if (messageType == ix::WebSocket_MessageType_Open)
{
Logger() << "New server connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:";
for (auto it : openInfo.headers)
@ -210,6 +212,10 @@ TEST_CASE("Websocket_heartbeat", "[heartbeat]")
webSocketClientA.stop();
webSocketClientB.stop();
// Here we test heart beat period exceeded for clientA
// but it should not be exceeded for clientB which has sent data.
// -> expected ping messages == 2, but add a small buffer to make this more reliable.
REQUIRE(serverReceivedPingMessages >= 2);
REQUIRE(serverReceivedPingMessages <= 4);

View File

@ -18,13 +18,32 @@ using namespace ix;
namespace ix
{
bool startServer(ix::WebSocketServer& server)
// Test that we can override the connectionState impl to provide our own
class ConnectionStateCustom : public ConnectionState
{
void computeId()
{
// a very boring invariant id that we can test against in the unittest
_id = "foobarConnectionId";
}
};
bool startServer(ix::WebSocketServer& server,
std::string& connectionId)
{
auto factory = []() -> std::shared_ptr<ConnectionState>
{
return std::make_shared<ConnectionStateCustom>();
};
server.setConnectionStateFactory(factory);
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket)
[&server, &connectionId](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType,
[webSocket, connectionState,
&connectionId, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
@ -34,12 +53,16 @@ namespace ix
if (messageType == ix::WebSocket_MessageType_Open)
{
Logger() << "New connection";
connectionState->computeId();
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:";
for (auto it : openInfo.headers)
{
Logger() << it.first << ": " << it.second;
}
connectionId = connectionState->getId();
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
@ -78,7 +101,8 @@ TEST_CASE("Websocket_server", "[websocket_server]")
{
int port = getFreePort();
ix::WebSocketServer server(port);
REQUIRE(startServer(server));
std::string connectionId;
REQUIRE(startServer(server, connectionId));
std::string errMsg;
bool tls = false;
@ -111,7 +135,8 @@ TEST_CASE("Websocket_server", "[websocket_server]")
{
int port = getFreePort();
ix::WebSocketServer server(port);
REQUIRE(startServer(server));
std::string connectionId;
REQUIRE(startServer(server, connectionId));
std::string errMsg;
bool tls = false;
@ -147,7 +172,8 @@ TEST_CASE("Websocket_server", "[websocket_server]")
{
int port = getFreePort();
ix::WebSocketServer server(port);
REQUIRE(startServer(server));
std::string connectionId;
REQUIRE(startServer(server, connectionId));
std::string errMsg;
bool tls = false;
@ -178,6 +204,8 @@ TEST_CASE("Websocket_server", "[websocket_server]")
// Give us 500ms for the server to notice that clients went away
ix::msleep(500);
REQUIRE(connectionId == "foobarConnectionId");
server.stop();
REQUIRE(server.getClients().size() == 0);
}

View File

@ -217,10 +217,11 @@ namespace
bool startServer(ix::WebSocketServer& server)
{
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket)
[&server](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType,
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
@ -230,6 +231,7 @@ namespace
if (messageType == ix::WebSocket_MessageType_Open)
{
Logger() << "New connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:";
for (auto it : openInfo.headers)

View File

@ -27,7 +27,7 @@ class Command(object):
thread.join(timeout)
if thread.is_alive():
print 'Command timeout, kill it: ' + self.cmd
print('Command timeout, kill it: ' + self.cmd)
self.process.terminate()
thread.join()
return False, 255

1
third_party/README.md vendored Normal file
View File

@ -0,0 +1 @@
Except ZLIB on Windows (whose port is currently broken...) all dependencies here are for the ws command line tools, not for the IXWebSockets library which is standalone.

View File

@ -1,2 +1,3 @@
find . -type f -name '*.cpp' -exec sed -i '' 's/[[:space:]]*$//' {} \+
find . -type f -name '*.h' -exec sed -i '' 's/[[:space:]]*$//' {} \+
find . -type f -name '*.md' -exec sed -i '' 's/[[:space:]]*$//' {} \+

View File

@ -42,5 +42,5 @@ if (APPLE AND USE_TLS)
target_link_libraries(ws "-framework foundation" "-framework security")
endif()
target_link_libraries(ws ixwebsocket)
target_link_libraries(ws ixwebsocket cpp_redis tacopie)
install(TARGETS ws RUNTIME DESTINATION bin)

View File

@ -7,6 +7,7 @@
#include "IXRedisClient.h"
#include <ixwebsocket/IXSocketFactory.h>
#include <ixwebsocket/IXSocket.h>
#include <cpp_redis/cpp_redis>
#include <sstream>
#include <iomanip>
@ -17,6 +18,14 @@ namespace ix
{
bool RedisClient::connect(const std::string& hostname, int port)
{
_sub.connect(hostname, port, []
(const std::string& host, std::size_t port, cpp_redis::connect_state status) {
if (status == cpp_redis::connect_state::dropped) {
std::cout << "client disconnected from " << host << ":" << port << std::endl;
}
});
// also subscribe the old way
bool tls = false;
std::string errorMsg;
_socket = createSocket(tls, errorMsg);
@ -28,8 +37,53 @@ namespace ix
std::string errMsg;
return _socket->connect(hostname, port, errMsg, nullptr);
}
bool RedisClient::auth(const std::string& password,
std::string& response)
{
// authentication if server-server requires it
// _sub.auth(password, [&response](const cpp_redis::reply& reply) {
// if (reply.is_error()) { std::cerr << "Authentication failed: " << reply.as_string() << std::endl; }
// else {
// std::cout << "successful authentication" << std::endl;
// }
// });
return true;
#if 0
response.clear();
if (!_socket) return false;
std::stringstream ss;
ss << "AUTH ";
ss << password;
ss << "\r\n";
bool sent = _socket->writeBytes(ss.str(), nullptr);
if (!sent)
{
return false;
}
auto pollResult = _socket->isReadyToRead(-1);
if (pollResult == PollResultType::Error)
{
return false;
}
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
response = line;
return lineValid;
#endif
}
bool RedisClient::publish(const std::string& channel,
const std::string& message)
{
@ -65,8 +119,22 @@ namespace ix
// FIXME: we assume that redis never return errors...
//
bool RedisClient::subscribe(const std::string& channel,
const OnRedisSubscribeResponseCallback& responseCallback,
const OnRedisSubscribeCallback& callback)
{
_sub.subscribe(channel, [&callback](const std::string& chan, const std::string& msg) {
callback(msg);
});
_sub.commit();
while (true)
{
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
return true;
#if 0
if (!_socket) return false;
std::stringstream ss;
@ -87,10 +155,14 @@ namespace ix
return false;
}
// build the response as a single string
std::stringstream oss;
// Read the first line of the response
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
oss << line;
if (!lineValid) return false;
@ -100,10 +172,13 @@ namespace ix
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
oss << line;
if (!lineValid) return false;
}
responseCallback(oss.str());
// Wait indefinitely for new messages
while (true)
{
@ -114,7 +189,7 @@ namespace ix
return false;
}
// The first line of the response describe the return type,
// The first line of the response describe the return type,
// => *3 (an array of 3 elements)
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
@ -162,5 +237,6 @@ namespace ix
}
return true;
#endif
}
}

View File

@ -8,6 +8,7 @@
#include <memory>
#include <functional>
#include <cpp_redis/cpp_redis>
namespace ix
{
@ -15,6 +16,7 @@ namespace ix
class RedisClient {
public:
using OnRedisSubscribeResponseCallback = std::function<void(const std::string&)>;
using OnRedisSubscribeCallback = std::function<void(const std::string&)>;
RedisClient() = default;
@ -23,13 +25,19 @@ namespace ix
bool connect(const std::string& hostname,
int port);
bool auth(const std::string& password,
std::string& response);
bool publish(const std::string& channel,
const std::string& message);
bool subscribe(const std::string& channel,
const OnRedisSubscribeResponseCallback& responseCallback,
const OnRedisSubscribeCallback& callback);
private:
cpp_redis::subscriber _sub;
std::shared_ptr<Socket> _socket;
};
}

View File

@ -20,6 +20,8 @@ Subcommands:
broadcast_server Broadcasting server
ping Ping pong
curl HTTP Client
redis_publish Redis publisher
redis_subscribe Redis subscriber
```
## file transfer
@ -29,7 +31,7 @@ Subcommands:
ws transfer # running on port 8080.
# Start receiver first
ws receive ws://localhost:8080
ws receive ws://localhost:8080
# Then send a file. File will be received and written to disk by the receiver process
ws send ws://localhost:8080 /file/to/path

25
ws/test_ws_redis.sh Normal file
View File

@ -0,0 +1,25 @@
#!/bin/sh
# Handle Ctrl-C by killing all sub-processing AND exiting
trap cleanup INT
function cleanup {
kill `cat /tmp/pidfile.subscribe`
exit 1
}
REDIS_HOST=${REDIS_HOST:=localhost}
ws redis_subscribe --pidfile /tmp/pidfile.subscribe --host $REDIS_HOST foo &
# Wait for the subscriber to be ready
sleep 0.5
# Now publish messages
ws redis_publish -c 100000 --host ${REDIS_HOST} foo bar
# Wait a little for all messages to be received
sleep 1.5
# Cleanup
cleanup

View File

@ -37,6 +37,7 @@ int main(int argc, char** argv)
std::string pidfile;
std::string channel;
std::string message;
std::string password;
bool headersOnly = false;
bool followRedirects = false;
bool verbose = false;
@ -48,6 +49,7 @@ int main(int argc, char** argv)
int transferTimeout = 1800;
int maxRedirects = 5;
int delayMs = -1;
int count = 1;
CLI::App* sendApp = app.add_subcommand("send", "Send a file");
sendApp->add_option("url", url, "Connection url")->required();
@ -102,14 +104,18 @@ int main(int argc, char** argv)
CLI::App* redisPublishApp = app.add_subcommand("redis_publish", "Redis publisher");
redisPublishApp->add_option("--port", redisPort, "Port");
redisPublishApp->add_option("--host", hostname, "Hostname");
redisPublishApp->add_option("--password", password, "Password");
redisPublishApp->add_option("channel", channel, "Channel")->required();
redisPublishApp->add_option("message", message, "Message")->required();
redisPublishApp->add_option("-c", count, "Count");
CLI::App* redisSubscribeApp = app.add_subcommand("redis_subscribe", "Redis subscriber");
redisSubscribeApp->add_option("--port", redisPort, "Port");
redisSubscribeApp->add_option("--host", hostname, "Hostname");
redisSubscribeApp->add_option("--password", password, "Password");
redisSubscribeApp->add_option("channel", channel, "Channel")->required();
redisSubscribeApp->add_flag("-v", verbose, "Verbose");
redisSubscribeApp->add_option("--pidfile", pidfile, "Pid file");
CLI11_PARSE(app, argc, argv);
@ -166,11 +172,12 @@ int main(int argc, char** argv)
}
else if (app.got_subcommand("redis_publish"))
{
return ix::ws_redis_publish_main(hostname, redisPort, channel, message);
return ix::ws_redis_publish_main(hostname, redisPort, password,
channel, message, count);
}
else if (app.got_subcommand("redis_subscribe"))
{
return ix::ws_redis_subscribe_main(hostname, redisPort, channel, verbose);
return ix::ws_redis_subscribe_main(hostname, redisPort, password, channel, verbose);
}
return 1;

View File

@ -42,11 +42,14 @@ namespace ix
int ws_redis_publish_main(const std::string& hostname,
int port,
const std::string& password,
const std::string& channel,
const std::string& message);
const std::string& message,
int count);
int ws_redis_subscribe_main(const std::string& hostname,
int port,
const std::string& password,
const std::string& channel,
bool verbose);
}

View File

@ -17,10 +17,11 @@ namespace ix
ix::WebSocketServer server(port, hostname);
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket)
[&server](std::shared_ptr<WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType,
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
@ -30,6 +31,7 @@ namespace ix
if (messageType == ix::WebSocket_MessageType_Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)

View File

@ -17,10 +17,11 @@ namespace ix
ix::WebSocketServer server(port, hostname);
server.setOnConnectionCallback(
[](std::shared_ptr<ix::WebSocket> webSocket)
[](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket](ix::WebSocketMessageType messageType,
[webSocket, connectionState](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
@ -30,6 +31,7 @@ namespace ix
if (messageType == ix::WebSocket_MessageType_Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)

View File

@ -12,8 +12,10 @@ namespace ix
{
int ws_redis_publish_main(const std::string& hostname,
int port,
const std::string& password,
const std::string& channel,
const std::string& message)
const std::string& message,
int count)
{
RedisClient redisClient;
if (!redisClient.connect(hostname, port))
@ -22,12 +24,27 @@ namespace ix
return 1;
}
std::cerr << "Publishing message " << message
<< " to " << channel << "..." << std::endl;
if (!redisClient.publish(channel, message))
if (!password.empty())
{
std::cerr << "Error publishing to channel " << channel << std::endl;
return 1;
std::string authResponse;
if (!redisClient.auth(password, authResponse))
{
std::stringstream ss;
std::cerr << "Cannot authenticated to redis" << std::endl;
return 1;
}
std::cout << "Auth response: " << authResponse << ":" << port << std::endl;
}
for (int i = 0; i < count; i++)
{
//std::cerr << "Publishing message " << message
// << " to " << channel << "..." << std::endl;
if (!redisClient.publish(channel, message))
{
std::cerr << "Error publishing to channel " << channel << std::endl;
return 1;
}
}
return 0;

View File

@ -7,12 +7,15 @@
#include <iostream>
#include <sstream>
#include <chrono>
#include <thread>
#include <atomic>
#include "IXRedisClient.h"
namespace ix
{
int ws_redis_subscribe_main(const std::string& hostname,
int port,
const std::string& password,
const std::string& channel,
bool verbose)
{
@ -23,38 +26,56 @@ namespace ix
return 1;
}
std::chrono::time_point<std::chrono::steady_clock> lastTimePoint;
int msgPerSeconds = 0;
int msgCount = 0;
if (!password.empty())
{
std::string authResponse;
if (!redisClient.auth(password, authResponse))
{
std::stringstream ss;
std::cerr << "Cannot authenticated to redis" << std::endl;
return 1;
}
std::cout << "Auth response: " << authResponse << ":" << port << std::endl;
}
auto callback = [&lastTimePoint, &msgPerSeconds, &msgCount, verbose]
std::atomic<int> msgPerSeconds(0);
std::atomic<int> msgCount(0);
auto callback = [&msgPerSeconds, &msgCount, verbose]
(const std::string& message)
{
if (verbose)
{
std::cout << message << std::endl;
std::cout << "received: " << message << std::endl;
}
msgPerSeconds++;
msgCount++;
};
auto now = std::chrono::steady_clock::now();
if (now - lastTimePoint > std::chrono::seconds(1))
auto responseCallback = [](const std::string& redisResponse)
{
std::cout << "Redis subscribe response: " << redisResponse << std::endl;
};
auto timer = [&msgPerSeconds, &msgCount]
{
while (true)
{
lastTimePoint = std::chrono::steady_clock::now();
msgCount += msgPerSeconds;
// #messages 901 msg/s 150
std::cout << "#messages " << msgCount << " "
<< "msg/s " << msgPerSeconds
<< std::endl;
msgPerSeconds = 0;
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
};
std::thread t(timer);
std::cerr << "Subscribing to " << channel << "..." << std::endl;
if (!redisClient.subscribe(channel, callback))
if (!redisClient.subscribe(channel, responseCallback, callback))
{
std::cerr << "Error subscribing to channel " << channel << std::endl;
return 1;
@ -63,4 +84,3 @@ namespace ix
return 0;
}
}

View File

@ -17,10 +17,11 @@ namespace ix
ix::WebSocketServer server(port, hostname);
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket)
[&server](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType,
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
@ -30,6 +31,7 @@ namespace ix
if (messageType == ix::WebSocket_MessageType_Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)