Compare commits

..

5 Commits

Author SHA1 Message Date
Benjamin Sergeant
c36dc0e16a fix unittest 2019-03-20 18:50:56 -07:00
Benjamin Sergeant
7e45659377 update README 2019-03-20 18:33:55 -07:00
Benjamin Sergeant
788c92487c New connection state for server code + fix OpenSSL double init bug 2019-03-20 18:26:29 -07:00
Benjamin Sergeant
0999073435 fix typo 2019-03-20 18:25:28 -07:00
Benjamin Sergeant
2cae6f4cf8 (cmake) add a warning about 32/64 conversion problems. 2019-03-20 16:16:54 -07:00
45 changed files with 127 additions and 1629 deletions

View File

@@ -135,4 +135,3 @@ set( IXWEBSOCKET_INCLUDE_DIRS
target_include_directories( ixwebsocket PUBLIC ${IXWEBSOCKET_INCLUDE_DIRS} ) target_include_directories( ixwebsocket PUBLIC ${IXWEBSOCKET_INCLUDE_DIRS} )
add_subdirectory(ws) add_subdirectory(ws)
add_subdirectory(third_party/cpp_redis)

View File

@@ -1 +0,0 @@
1.3.2

View File

@@ -1 +0,0 @@
Dockerfile.dev

31
Dockerfile Normal file
View File

@@ -0,0 +1,31 @@
FROM debian:stretch
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install gdb
RUN apt-get -y install screen
RUN apt-get -y install procps
RUN apt-get -y install lsof
RUN apt-get -y install libz-dev
RUN apt-get -y install vim
RUN apt-get -y install make
RUN apt-get -y install cmake
RUN apt-get -y install curl
RUN apt-get -y install python
RUN apt-get -y install netcat
# debian strech cmake is too old for building with Docker
COPY makefile .
RUN ["make", "install_cmake_for_linux"]
COPY . .
ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-rc4-Linux-x86_64/bin
ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
# RUN ["make"]
EXPOSE 8765
CMD ["/ws/ws", "transfer", "--port", "8765", "--host", "0.0.0.0"]

View File

@@ -1,31 +0,0 @@
FROM debian:stretch
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install gdb
RUN apt-get -y install screen
RUN apt-get -y install procps
RUN apt-get -y install lsof
RUN apt-get -y install libz-dev
RUN apt-get -y install vim
RUN apt-get -y install make
RUN apt-get -y install cmake
RUN apt-get -y install curl
RUN apt-get -y install python
RUN apt-get -y install netcat
# debian strech cmake is too old for building with Docker
COPY makefile .
RUN ["make", "install_cmake_for_linux"]
COPY . .
ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-rc4-Linux-x86_64/bin
ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
# RUN ["make"]
EXPOSE 8765
CMD ["/ws/ws", "transfer", "--port", "8765", "--host", "0.0.0.0"]

View File

@@ -1,30 +0,0 @@
FROM debian:buster
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install libz-dev
RUN apt-get -y install make
RUN apt-get -y install wget
RUN mkdir -p /tmp/cmake
WORKDIR /tmp/cmake
RUN wget https://github.com/Kitware/CMake/releases/download/v3.14.0/cmake-3.14.0-Linux-x86_64.tar.gz
RUN tar zxf cmake-3.14.0-Linux-x86_64.tar.gz
RUN adduser app
COPY . .
ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-Linux-x86_64/bin
ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
RUN ["make"]
# Now run in usermode
USER app
EXPOSE 8765
CMD ["bash"]

View File

@@ -0,0 +1,20 @@
class Ixwebsocket < Formula
desc "WebSocket client and server, and HTTP client command-line tool"
homepage "https://github.com/machinezone/IXWebSocket"
url "https://github.com/machinezone/IXWebSocket/archive/v1.1.0.tar.gz"
sha256 "52592ce3d0a67ad0f90ac9e8a458f61724175d95a01a38d1bad3fcdc5c7b6666"
depends_on "cmake" => :build
def install
system "cmake", ".", *std_cmake_args
system "make", "install"
end
test do
system "#{bin}/ws", "--help"
system "#{bin}/ws", "send", "--help"
system "#{bin}/ws", "receive", "--help"
system "#{bin}/ws", "transfer", "--help"
system "#{bin}/ws", "curl", "--help"
end
end

View File

@@ -4,15 +4,14 @@
## Introduction ## Introduction
[*WebSocket*](https://en.wikipedia.org/wiki/WebSocket) is a computer communications protocol, providing full-duplex and bi-directionnal communication channels over a single TCP connection. *IXWebSocket* is a C++ library for client and server Websocket communication, and for client HTTP communication. The code is derived from [easywsclient](https://github.com/dhbaird/easywsclient) and from the [Satori C SDK](https://github.com/satori-com/satori-rtm-sdk-c). It has been tested on the following platforms. [*WebSocket*](https://en.wikipedia.org/wiki/WebSocket) is a computer communications protocol, providing full-duplex
communication channels over a single TCP connection. *IXWebSocket* is a C++ library for client and server Websocket communication, and for client HTTP communication. The code is derived from [easywsclient](https://github.com/dhbaird/easywsclient) and from the [Satori C SDK](https://github.com/satori-com/satori-rtm-sdk-c). It has been tested on the following platforms.
* macOS * macOS
* iOS * iOS
* Linux * Linux
* Android * Android
The code was made to compile once on Windows but support is currently broken on this platform.
## Examples ## Examples
The [*ws*](https://github.com/machinezone/IXWebSocket/tree/master/ws) folder countains many interactive programs for chat, [file transfers](https://github.com/machinezone/IXWebSocket/blob/master/ws/ws_send.cpp), [curl like](https://github.com/machinezone/IXWebSocket/blob/master/ws/ws_http_client.cpp) http clients, demonstrating client and server usage. The [*ws*](https://github.com/machinezone/IXWebSocket/tree/master/ws) folder countains many interactive programs for chat, [file transfers](https://github.com/machinezone/IXWebSocket/blob/master/ws/ws_send.cpp), [curl like](https://github.com/machinezone/IXWebSocket/blob/master/ws/ws_http_client.cpp) http clients, demonstrating client and server usage.
@@ -47,12 +46,9 @@ webSocket.setOnMessageCallback(
// Now that our callback is setup, we can start our background thread and receive messages // Now that our callback is setup, we can start our background thread and receive messages
webSocket.start(); webSocket.start();
// Send a message to the server (default to BINARY mode) // Send a message to the server
webSocket.send("hello world"); webSocket.send("hello world");
// The message can be sent in TEXT mode
webSocket.sendText("hello again");
// ... finally ... // ... finally ...
// Stop the connection // Stop the connection
@@ -218,11 +214,11 @@ If the remote end (server) breaks the connection, the code will try to perpetual
### Large messages ### Large messages
Large frames are broken up into smaller chunks or messages to avoid filling up the os tcp buffers, which is permitted thanks to WebSocket [fragmentation](https://tools.ietf.org/html/rfc6455#section-5.4). Messages up to 1G were sent and received succesfully. Large frames are broken up into smaller chunks or messages to avoid filling up the os tcp buffers, which is permitted thanks to WebSocket [fragmentation](https://tools.ietf.org/html/rfc6455#section-5.4). Messages up to 500M were sent and received succesfully.
## Limitations ## Limitations
* No utf-8 validation is made when sending TEXT message with sendText() * There is no text support for sending data, only the binary protocol is supported. Sending json or text over the binary protocol works well.
* Automatic reconnection works at the TCP socket level, and will detect remote end disconnects. However, if the device/computer network become unreachable (by turning off wifi), it is quite hard to reliably and timely detect it at the socket level using `recv` and `send` error codes. [Here](https://stackoverflow.com/questions/14782143/linux-socket-how-to-detect-disconnected-network-in-a-client-program) is a good discussion on the subject. This behavior is consistent with other runtimes such as node.js. One way to detect a disconnected device with low level C code is to do a name resolution with DNS but this can be expensive. Mobile devices have good and reliable API to do that. * Automatic reconnection works at the TCP socket level, and will detect remote end disconnects. However, if the device/computer network become unreachable (by turning off wifi), it is quite hard to reliably and timely detect it at the socket level using `recv` and `send` error codes. [Here](https://stackoverflow.com/questions/14782143/linux-socket-how-to-detect-disconnected-network-in-a-client-program) is a good discussion on the subject. This behavior is consistent with other runtimes such as node.js. One way to detect a disconnected device with low level C code is to do a name resolution with DNS but this can be expensive. Mobile devices have good and reliable API to do that.
* The server code is using select to detect incoming data, and creates one OS thread per connection. This is not as scalable as strategies using epoll or kqueue. * The server code is using select to detect incoming data, and creates one OS thread per connection. This is not as scalable as strategies using epoll or kqueue.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 5.5 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 94 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 80 KiB

View File

@@ -1,2 +0,0 @@
all:
(cd .. ; make docker && make docker_push)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 74 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 118 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 113 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 168 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 673 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.5 MiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.5 MiB

File diff suppressed because one or more lines are too long

Binary file not shown.

Before

Width:  |  Height:  |  Size: 90 KiB

File diff suppressed because it is too large Load Diff

Binary file not shown.

Before

Width:  |  Height:  |  Size: 36 KiB

View File

@@ -1,21 +0,0 @@
version: "3"
services:
ws:
stdin_open: true
tty: true
image: bsergean/ws:build
ports:
- "8765:8765"
entrypoint: bash
networks:
- ws-net
depends_on:
- redis1
redis1:
image: redis:alpine
networks:
- ws-net
networks:
ws-net:

View File

@@ -6,6 +6,8 @@
#include "IXConnectionState.h" #include "IXConnectionState.h"
#include <sstream>
namespace ix namespace ix
{ {
std::atomic<uint64_t> ConnectionState::_globalId(0); std::atomic<uint64_t> ConnectionState::_globalId(0);
@@ -17,7 +19,9 @@ namespace ix
void ConnectionState::computeId() void ConnectionState::computeId()
{ {
_id = std::to_string(_globalId++); std::stringstream ss;
ss << _globalId++;
_id = ss.str();
} }
const std::string& ConnectionState::getId() const const std::string& ConnectionState::getId() const

View File

@@ -302,13 +302,7 @@ namespace ix
WebSocketSendInfo WebSocket::send(const std::string& text, WebSocketSendInfo WebSocket::send(const std::string& text,
const OnProgressCallback& onProgressCallback) const OnProgressCallback& onProgressCallback)
{ {
return sendMessage(text, SendMessageKind::Binary, onProgressCallback); return sendMessage(text, false, onProgressCallback);
}
WebSocketSendInfo WebSocket::sendText(const std::string& text,
const OnProgressCallback& onProgressCallback)
{
return sendMessage(text, SendMessageKind::Text, onProgressCallback);
} }
WebSocketSendInfo WebSocket::ping(const std::string& text) WebSocketSendInfo WebSocket::ping(const std::string& text)
@@ -317,11 +311,11 @@ namespace ix
constexpr size_t pingMaxPayloadSize = 125; constexpr size_t pingMaxPayloadSize = 125;
if (text.size() > pingMaxPayloadSize) return WebSocketSendInfo(false); if (text.size() > pingMaxPayloadSize) return WebSocketSendInfo(false);
return sendMessage(text, SendMessageKind::Ping); return sendMessage(text, true);
} }
WebSocketSendInfo WebSocket::sendMessage(const std::string& text, WebSocketSendInfo WebSocket::sendMessage(const std::string& text,
SendMessageKind sendMessageKind, bool ping,
const OnProgressCallback& onProgressCallback) const OnProgressCallback& onProgressCallback)
{ {
if (!isConnected()) return WebSocketSendInfo(false); if (!isConnected()) return WebSocketSendInfo(false);
@@ -338,22 +332,13 @@ namespace ix
std::lock_guard<std::mutex> lock(_writeMutex); std::lock_guard<std::mutex> lock(_writeMutex);
WebSocketSendInfo webSocketSendInfo; WebSocketSendInfo webSocketSendInfo;
switch (sendMessageKind) if (ping)
{
case SendMessageKind::Text:
{
webSocketSendInfo = _ws.sendText(text, onProgressCallback);
} break;
case SendMessageKind::Binary:
{
webSocketSendInfo = _ws.sendBinary(text, onProgressCallback);
} break;
case SendMessageKind::Ping:
{ {
webSocketSendInfo = _ws.sendPing(text); webSocketSendInfo = _ws.sendPing(text);
} break; }
else
{
webSocketSendInfo = _ws.sendBinary(text, onProgressCallback);
} }
WebSocket::invokeTrafficTrackerCallback(webSocketSendInfo.wireSize, false); WebSocket::invokeTrafficTrackerCallback(webSocketSendInfo.wireSize, false);

View File

@@ -101,8 +101,6 @@ namespace ix
WebSocketSendInfo send(const std::string& text, WebSocketSendInfo send(const std::string& text,
const OnProgressCallback& onProgressCallback = nullptr); const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo sendText(const std::string& text,
const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo ping(const std::string& text); WebSocketSendInfo ping(const std::string& text);
void close(); void close();
@@ -122,7 +120,7 @@ namespace ix
private: private:
WebSocketSendInfo sendMessage(const std::string& text, WebSocketSendInfo sendMessage(const std::string& text,
SendMessageKind sendMessageKind, bool ping,
const OnProgressCallback& callback = nullptr); const OnProgressCallback& callback = nullptr);
bool isConnected() const; bool isConnected() const;

View File

@@ -114,7 +114,7 @@ namespace ix
std::stringstream ss; std::stringstream ss;
ss << "HTTP/1.1 "; ss << "HTTP/1.1 ";
ss << code; ss << code;
ss << " "; ss << "\r\n";
ss << reason; ss << reason;
ss << "\r\n"; ss << "\r\n";
@@ -353,7 +353,7 @@ namespace ix
WebSocketHandshakeKeyGen::generate(headers["sec-websocket-key"].c_str(), output); WebSocketHandshakeKeyGen::generate(headers["sec-websocket-key"].c_str(), output);
std::stringstream ss; std::stringstream ss;
ss << "HTTP/1.1 101 Switching Protocols\r\n"; ss << "HTTP/1.1 101\r\n";
ss << "Sec-WebSocket-Accept: " << std::string(output) << "\r\n"; ss << "Sec-WebSocket-Accept: " << std::string(output) << "\r\n";
ss << "Upgrade: websocket\r\n"; ss << "Upgrade: websocket\r\n";
ss << "Connection: Upgrade\r\n"; ss << "Connection: Upgrade\r\n";

View File

@@ -58,7 +58,6 @@ namespace ix
constexpr size_t WebSocketTransport::kChunkSize; constexpr size_t WebSocketTransport::kChunkSize;
WebSocketTransport::WebSocketTransport() : WebSocketTransport::WebSocketTransport() :
_useMask(true),
_readyState(CLOSED), _readyState(CLOSED),
_closeCode(0), _closeCode(0),
_closeWireSize(0), _closeWireSize(0),
@@ -124,9 +123,6 @@ namespace ix
// Server // Server
WebSocketInitResult WebSocketTransport::connectToSocket(int fd, int timeoutSecs) WebSocketInitResult WebSocketTransport::connectToSocket(int fd, int timeoutSecs)
{ {
// Server should not mask the data it sends to the client
_useMask = false;
std::string errorMsg; std::string errorMsg;
_socket = createSocket(fd, errorMsg); _socket = createSocket(fd, errorMsg);
@@ -284,13 +280,17 @@ namespace ix
_txbuf.insert(_txbuf.end(), header.begin(), header.end()); _txbuf.insert(_txbuf.end(), header.begin(), header.end());
_txbuf.insert(_txbuf.end(), begin, end); _txbuf.insert(_txbuf.end(), begin, end);
if (_useMask) // Masking
{
for (size_t i = 0; i != (size_t) message_size; ++i) for (size_t i = 0; i != (size_t) message_size; ++i)
{ {
*(_txbuf.end() - (size_t) message_size + i) ^= masking_key[i&0x3]; *(_txbuf.end() - (size_t) message_size + i) ^= masking_key[i&0x3];
} }
} }
void WebSocketTransport::appendToSendBuffer(const std::vector<uint8_t>& buffer)
{
std::lock_guard<std::mutex> lock(_txbufMutex);
_txbuf.insert(_txbuf.end(), buffer.begin(), buffer.end());
} }
void WebSocketTransport::unmaskReceiveBuffer(const wsheader_type& ws) void WebSocketTransport::unmaskReceiveBuffer(const wsheader_type& ws)
@@ -656,8 +656,7 @@ namespace ix
std::vector<uint8_t> header; std::vector<uint8_t> header;
header.assign(2 + header.assign(2 +
(message_size >= 126 ? 2 : 0) + (message_size >= 126 ? 2 : 0) +
(message_size >= 65536 ? 6 : 0) + (message_size >= 65536 ? 6 : 0) + 4, 0);
(_useMask ? 4 : 0), 0);
header[0] = type; header[0] = type;
// The fin bit indicate that this is the last fragment. Fin is French for end. // The fin bit indicate that this is the last fragment. Fin is French for end.
@@ -674,33 +673,27 @@ namespace ix
if (message_size < 126) if (message_size < 126)
{ {
header[1] = (message_size & 0xff) | (_useMask ? 0x80 : 0); header[1] = (message_size & 0xff) | 0x80;
if (_useMask)
{
header[2] = masking_key[0]; header[2] = masking_key[0];
header[3] = masking_key[1]; header[3] = masking_key[1];
header[4] = masking_key[2]; header[4] = masking_key[2];
header[5] = masking_key[3]; header[5] = masking_key[3];
} }
}
else if (message_size < 65536) else if (message_size < 65536)
{ {
header[1] = 126 | (_useMask ? 0x80 : 0); header[1] = 126 | 0x80;
header[2] = (message_size >> 8) & 0xff; header[2] = (message_size >> 8) & 0xff;
header[3] = (message_size >> 0) & 0xff; header[3] = (message_size >> 0) & 0xff;
if (_useMask)
{
header[4] = masking_key[0]; header[4] = masking_key[0];
header[5] = masking_key[1]; header[5] = masking_key[1];
header[6] = masking_key[2]; header[6] = masking_key[2];
header[7] = masking_key[3]; header[7] = masking_key[3];
} }
}
else else
{ // TODO: run coverage testing here { // TODO: run coverage testing here
header[1] = 127 | (_useMask ? 0x80 : 0); header[1] = 127 | 0x80;
header[2] = (message_size >> 56) & 0xff; header[2] = (message_size >> 56) & 0xff;
header[3] = (message_size >> 48) & 0xff; header[3] = (message_size >> 48) & 0xff;
header[4] = (message_size >> 40) & 0xff; header[4] = (message_size >> 40) & 0xff;
@@ -710,14 +703,11 @@ namespace ix
header[8] = (message_size >> 8) & 0xff; header[8] = (message_size >> 8) & 0xff;
header[9] = (message_size >> 0) & 0xff; header[9] = (message_size >> 0) & 0xff;
if (_useMask)
{
header[10] = masking_key[0]; header[10] = masking_key[0];
header[11] = masking_key[1]; header[11] = masking_key[1];
header[12] = masking_key[2]; header[12] = masking_key[2];
header[13] = masking_key[3]; header[13] = masking_key[3];
} }
}
// _txbuf will keep growing until it can be transmitted over the socket: // _txbuf will keep growing until it can be transmitted over the socket:
appendToSendBuffer(header, message_begin, message_end, appendToSendBuffer(header, message_begin, message_end,
@@ -742,15 +732,6 @@ namespace ix
_enablePerMessageDeflate, onProgressCallback); _enablePerMessageDeflate, onProgressCallback);
} }
WebSocketSendInfo WebSocketTransport::sendText(
const std::string& message,
const OnProgressCallback& onProgressCallback)
{
return sendData(wsheader_type::TEXT_FRAME, message,
_enablePerMessageDeflate, onProgressCallback);
}
void WebSocketTransport::sendOnSocket() void WebSocketTransport::sendOnSocket()
{ {
std::lock_guard<std::mutex> lock(_txbufMutex); std::lock_guard<std::mutex> lock(_txbufMutex);

View File

@@ -30,13 +30,6 @@ namespace ix
{ {
class Socket; class Socket;
enum class SendMessageKind
{
Text,
Binary,
Ping
};
class WebSocketTransport class WebSocketTransport
{ {
public: public:
@@ -78,8 +71,6 @@ namespace ix
void poll(); void poll();
WebSocketSendInfo sendBinary(const std::string& message, WebSocketSendInfo sendBinary(const std::string& message,
const OnProgressCallback& onProgressCallback); const OnProgressCallback& onProgressCallback);
WebSocketSendInfo sendText(const std::string& message,
const OnProgressCallback& onProgressCallback);
WebSocketSendInfo sendPing(const std::string& message); WebSocketSendInfo sendPing(const std::string& message);
void close(); void close();
ReadyStateValues getReadyState() const; ReadyStateValues getReadyState() const;
@@ -109,10 +100,6 @@ namespace ix
uint8_t masking_key[4]; uint8_t masking_key[4];
}; };
// Tells whether we should mask the data we send.
// client should mask but server should not
bool _useMask;
// Buffer for reading from our socket. That buffer is never resized. // Buffer for reading from our socket. That buffer is never resized.
std::vector<uint8_t> _readbuf; std::vector<uint8_t> _readbuf;
@@ -187,6 +174,7 @@ namespace ix
std::string::const_iterator end, std::string::const_iterator end,
uint64_t message_size, uint64_t message_size,
uint8_t masking_key[4]); uint8_t masking_key[4]);
void appendToSendBuffer(const std::vector<uint8_t>& buffer);
unsigned getRandomUnsigned(); unsigned getRandomUnsigned();
void unmaskReceiveBuffer(const wsheader_type& ws); void unmaskReceiveBuffer(const wsheader_type& ws);

View File

@@ -9,20 +9,8 @@ brew:
mkdir -p build && (cd build ; cmake .. ; make -j install) mkdir -p build && (cd build ; cmake .. ; make -j install)
.PHONY: docker .PHONY: docker
NAME := bsergean/ws
TAG := $(shell cat DOCKER_VERSION)
IMG := ${NAME}:${TAG}
LATEST := ${NAME}:latest
BUILD := ${NAME}:build
docker: docker:
docker build -t ${IMG} . docker build -t ws:latest .
docker tag ${IMG} ${BUILD}
docker_push:
docker tag ${IMG} ${LATEST}
docker push ${LATEST}
run: run:
docker run --cap-add sys_ptrace -it ws:latest docker run --cap-add sys_ptrace -it ws:latest

View File

@@ -29,7 +29,6 @@ set (SOURCES
IXDNSLookupTest.cpp IXDNSLookupTest.cpp
IXSocketTest.cpp IXSocketTest.cpp
IXSocketConnectTest.cpp
) )
# Some unittest don't work on windows yet # Some unittest don't work on windows yet

View File

@@ -1,43 +0,0 @@
/*
* IXSocketConnectTest.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone. All rights reserved.
*/
#include "catch.hpp"
#include "IXTest.h"
#include <ixwebsocket/IXSocketConnect.h>
#include <iostream>
using namespace ix;
TEST_CASE("socket_connect", "[net]")
{
SECTION("Test connecting to a known hostname")
{
std::string errMsg;
int fd = SocketConnect::connect("www.google.com", 80, errMsg, [] { return false; });
std::cerr << "Error message: " << errMsg << std::endl;
REQUIRE(fd != -1);
}
SECTION("Test connecting to a non-existing hostname")
{
std::string errMsg;
std::string hostname("12313lhjlkjhopiupoijlkasdckljqwehrlkqjwehraospidcuaposidcasdc");
int fd = SocketConnect::connect(hostname, 80, errMsg, [] { return false; });
std::cerr << "Error message: " << errMsg << std::endl;
REQUIRE(fd == -1);
}
SECTION("Test connecting to a good hostname, with cancellation")
{
std::string errMsg;
// The callback returning true means we are requesting cancellation
int fd = SocketConnect::connect("www.google.com", 80, errMsg, [] { return true; });
std::cerr << "Error message: " << errMsg << std::endl;
REQUIRE(fd == -1);
}
}

View File

@@ -16,7 +16,6 @@
#include <iostream> #include <iostream>
#include <stdlib.h> #include <stdlib.h>
#include <stack> #include <stack>
#include <iomanip>
namespace ix namespace ix
{ {
@@ -149,21 +148,4 @@ namespace ix
return -1; return -1;
} }
void hexDump(const std::string& prefix,
const std::string& s)
{
std::ostringstream ss;
bool upper_case = false;
for (std::string::size_type i = 0; i < s.length(); ++i)
{
ss << std::hex
<< std::setfill('0')
<< std::setw(2)
<< (upper_case ? std::uppercase : std::nouppercase) << (int)s[i];
}
std::cout << prefix << ": " << s << " => " << ss.str() << std::endl;
}
} }

View File

@@ -212,10 +212,6 @@ TEST_CASE("Websocket_heartbeat", "[heartbeat]")
webSocketClientA.stop(); webSocketClientA.stop();
webSocketClientB.stop(); webSocketClientB.stop();
// Here we test heart beat period exceeded for clientA
// but it should not be exceeded for clientB which has sent data.
// -> expected ping messages == 2, but add a small buffer to make this more reliable.
REQUIRE(serverReceivedPingMessages >= 2); REQUIRE(serverReceivedPingMessages >= 2);
REQUIRE(serverReceivedPingMessages <= 4); REQUIRE(serverReceivedPingMessages <= 4);

View File

@@ -52,8 +52,9 @@ namespace ix
{ {
if (messageType == ix::WebSocket_MessageType_Open) if (messageType == ix::WebSocket_MessageType_Open)
{ {
Logger() << "New connection";
connectionState->computeId(); connectionState->computeId();
Logger() << "New connection";
Logger() << "id: " << connectionState->getId(); Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri; Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:"; Logger() << "Headers:";

View File

@@ -1 +0,0 @@
Except ZLIB on Windows (whose port is currently broken...) all dependencies here are for the ws command line tools, not for the IXWebSockets library which is standalone.

View File

@@ -42,5 +42,5 @@ if (APPLE AND USE_TLS)
target_link_libraries(ws "-framework foundation" "-framework security") target_link_libraries(ws "-framework foundation" "-framework security")
endif() endif()
target_link_libraries(ws ixwebsocket cpp_redis tacopie) target_link_libraries(ws ixwebsocket)
install(TARGETS ws RUNTIME DESTINATION bin) install(TARGETS ws RUNTIME DESTINATION bin)

View File

@@ -7,7 +7,6 @@
#include "IXRedisClient.h" #include "IXRedisClient.h"
#include <ixwebsocket/IXSocketFactory.h> #include <ixwebsocket/IXSocketFactory.h>
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
#include <cpp_redis/cpp_redis>
#include <sstream> #include <sstream>
#include <iomanip> #include <iomanip>
@@ -18,14 +17,6 @@ namespace ix
{ {
bool RedisClient::connect(const std::string& hostname, int port) bool RedisClient::connect(const std::string& hostname, int port)
{ {
_sub.connect(hostname, port, []
(const std::string& host, std::size_t port, cpp_redis::connect_state status) {
if (status == cpp_redis::connect_state::dropped) {
std::cout << "client disconnected from " << host << ":" << port << std::endl;
}
});
// also subscribe the old way
bool tls = false; bool tls = false;
std::string errorMsg; std::string errorMsg;
_socket = createSocket(tls, errorMsg); _socket = createSocket(tls, errorMsg);
@@ -37,53 +28,8 @@ namespace ix
std::string errMsg; std::string errMsg;
return _socket->connect(hostname, port, errMsg, nullptr); return _socket->connect(hostname, port, errMsg, nullptr);
} }
bool RedisClient::auth(const std::string& password,
std::string& response)
{
// authentication if server-server requires it
// _sub.auth(password, [&response](const cpp_redis::reply& reply) {
// if (reply.is_error()) { std::cerr << "Authentication failed: " << reply.as_string() << std::endl; }
// else {
// std::cout << "successful authentication" << std::endl;
// }
// });
return true;
#if 0
response.clear();
if (!_socket) return false;
std::stringstream ss;
ss << "AUTH ";
ss << password;
ss << "\r\n";
bool sent = _socket->writeBytes(ss.str(), nullptr);
if (!sent)
{
return false;
}
auto pollResult = _socket->isReadyToRead(-1);
if (pollResult == PollResultType::Error)
{
return false;
}
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
response = line;
return lineValid;
#endif
}
bool RedisClient::publish(const std::string& channel, bool RedisClient::publish(const std::string& channel,
const std::string& message) const std::string& message)
{ {
@@ -119,22 +65,8 @@ namespace ix
// FIXME: we assume that redis never return errors... // FIXME: we assume that redis never return errors...
// //
bool RedisClient::subscribe(const std::string& channel, bool RedisClient::subscribe(const std::string& channel,
const OnRedisSubscribeResponseCallback& responseCallback,
const OnRedisSubscribeCallback& callback) const OnRedisSubscribeCallback& callback)
{ {
_sub.subscribe(channel, [&callback](const std::string& chan, const std::string& msg) {
callback(msg);
});
_sub.commit();
while (true)
{
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
return true;
#if 0
if (!_socket) return false; if (!_socket) return false;
std::stringstream ss; std::stringstream ss;
@@ -155,14 +87,10 @@ namespace ix
return false; return false;
} }
// build the response as a single string
std::stringstream oss;
// Read the first line of the response // Read the first line of the response
auto lineResult = _socket->readLine(nullptr); auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first; auto lineValid = lineResult.first;
auto line = lineResult.second; auto line = lineResult.second;
oss << line;
if (!lineValid) return false; if (!lineValid) return false;
@@ -172,13 +100,10 @@ namespace ix
auto lineResult = _socket->readLine(nullptr); auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first; auto lineValid = lineResult.first;
auto line = lineResult.second; auto line = lineResult.second;
oss << line;
if (!lineValid) return false; if (!lineValid) return false;
} }
responseCallback(oss.str());
// Wait indefinitely for new messages // Wait indefinitely for new messages
while (true) while (true)
{ {
@@ -237,6 +162,5 @@ namespace ix
} }
return true; return true;
#endif
} }
} }

View File

@@ -8,7 +8,6 @@
#include <memory> #include <memory>
#include <functional> #include <functional>
#include <cpp_redis/cpp_redis>
namespace ix namespace ix
{ {
@@ -16,7 +15,6 @@ namespace ix
class RedisClient { class RedisClient {
public: public:
using OnRedisSubscribeResponseCallback = std::function<void(const std::string&)>;
using OnRedisSubscribeCallback = std::function<void(const std::string&)>; using OnRedisSubscribeCallback = std::function<void(const std::string&)>;
RedisClient() = default; RedisClient() = default;
@@ -25,19 +23,13 @@ namespace ix
bool connect(const std::string& hostname, bool connect(const std::string& hostname,
int port); int port);
bool auth(const std::string& password,
std::string& response);
bool publish(const std::string& channel, bool publish(const std::string& channel,
const std::string& message); const std::string& message);
bool subscribe(const std::string& channel, bool subscribe(const std::string& channel,
const OnRedisSubscribeResponseCallback& responseCallback,
const OnRedisSubscribeCallback& callback); const OnRedisSubscribeCallback& callback);
private: private:
cpp_redis::subscriber _sub;
std::shared_ptr<Socket> _socket; std::shared_ptr<Socket> _socket;
}; };
} }

View File

@@ -20,8 +20,6 @@ Subcommands:
broadcast_server Broadcasting server broadcast_server Broadcasting server
ping Ping pong ping Ping pong
curl HTTP Client curl HTTP Client
redis_publish Redis publisher
redis_subscribe Redis subscriber
``` ```
## file transfer ## file transfer

View File

@@ -1,25 +0,0 @@
#!/bin/sh
# Handle Ctrl-C by killing all sub-processing AND exiting
trap cleanup INT
function cleanup {
kill `cat /tmp/pidfile.subscribe`
exit 1
}
REDIS_HOST=${REDIS_HOST:=localhost}
ws redis_subscribe --pidfile /tmp/pidfile.subscribe --host $REDIS_HOST foo &
# Wait for the subscriber to be ready
sleep 0.5
# Now publish messages
ws redis_publish -c 100000 --host ${REDIS_HOST} foo bar
# Wait a little for all messages to be received
sleep 1.5
# Cleanup
cleanup

View File

@@ -37,7 +37,6 @@ int main(int argc, char** argv)
std::string pidfile; std::string pidfile;
std::string channel; std::string channel;
std::string message; std::string message;
std::string password;
bool headersOnly = false; bool headersOnly = false;
bool followRedirects = false; bool followRedirects = false;
bool verbose = false; bool verbose = false;
@@ -49,7 +48,6 @@ int main(int argc, char** argv)
int transferTimeout = 1800; int transferTimeout = 1800;
int maxRedirects = 5; int maxRedirects = 5;
int delayMs = -1; int delayMs = -1;
int count = 1;
CLI::App* sendApp = app.add_subcommand("send", "Send a file"); CLI::App* sendApp = app.add_subcommand("send", "Send a file");
sendApp->add_option("url", url, "Connection url")->required(); sendApp->add_option("url", url, "Connection url")->required();
@@ -104,18 +102,14 @@ int main(int argc, char** argv)
CLI::App* redisPublishApp = app.add_subcommand("redis_publish", "Redis publisher"); CLI::App* redisPublishApp = app.add_subcommand("redis_publish", "Redis publisher");
redisPublishApp->add_option("--port", redisPort, "Port"); redisPublishApp->add_option("--port", redisPort, "Port");
redisPublishApp->add_option("--host", hostname, "Hostname"); redisPublishApp->add_option("--host", hostname, "Hostname");
redisPublishApp->add_option("--password", password, "Password");
redisPublishApp->add_option("channel", channel, "Channel")->required(); redisPublishApp->add_option("channel", channel, "Channel")->required();
redisPublishApp->add_option("message", message, "Message")->required(); redisPublishApp->add_option("message", message, "Message")->required();
redisPublishApp->add_option("-c", count, "Count");
CLI::App* redisSubscribeApp = app.add_subcommand("redis_subscribe", "Redis subscriber"); CLI::App* redisSubscribeApp = app.add_subcommand("redis_subscribe", "Redis subscriber");
redisSubscribeApp->add_option("--port", redisPort, "Port"); redisSubscribeApp->add_option("--port", redisPort, "Port");
redisSubscribeApp->add_option("--host", hostname, "Hostname"); redisSubscribeApp->add_option("--host", hostname, "Hostname");
redisSubscribeApp->add_option("--password", password, "Password");
redisSubscribeApp->add_option("channel", channel, "Channel")->required(); redisSubscribeApp->add_option("channel", channel, "Channel")->required();
redisSubscribeApp->add_flag("-v", verbose, "Verbose"); redisSubscribeApp->add_flag("-v", verbose, "Verbose");
redisSubscribeApp->add_option("--pidfile", pidfile, "Pid file");
CLI11_PARSE(app, argc, argv); CLI11_PARSE(app, argc, argv);
@@ -172,12 +166,11 @@ int main(int argc, char** argv)
} }
else if (app.got_subcommand("redis_publish")) else if (app.got_subcommand("redis_publish"))
{ {
return ix::ws_redis_publish_main(hostname, redisPort, password, return ix::ws_redis_publish_main(hostname, redisPort, channel, message);
channel, message, count);
} }
else if (app.got_subcommand("redis_subscribe")) else if (app.got_subcommand("redis_subscribe"))
{ {
return ix::ws_redis_subscribe_main(hostname, redisPort, password, channel, verbose); return ix::ws_redis_subscribe_main(hostname, redisPort, channel, verbose);
} }
return 1; return 1;

View File

@@ -42,14 +42,11 @@ namespace ix
int ws_redis_publish_main(const std::string& hostname, int ws_redis_publish_main(const std::string& hostname,
int port, int port,
const std::string& password,
const std::string& channel, const std::string& channel,
const std::string& message, const std::string& message);
int count);
int ws_redis_subscribe_main(const std::string& hostname, int ws_redis_subscribe_main(const std::string& hostname,
int port, int port,
const std::string& password,
const std::string& channel, const std::string& channel,
bool verbose); bool verbose);
} }

View File

@@ -12,10 +12,8 @@ namespace ix
{ {
int ws_redis_publish_main(const std::string& hostname, int ws_redis_publish_main(const std::string& hostname,
int port, int port,
const std::string& password,
const std::string& channel, const std::string& channel,
const std::string& message, const std::string& message)
int count)
{ {
RedisClient redisClient; RedisClient redisClient;
if (!redisClient.connect(hostname, port)) if (!redisClient.connect(hostname, port))
@@ -24,28 +22,13 @@ namespace ix
return 1; return 1;
} }
if (!password.empty()) std::cerr << "Publishing message " << message
{ << " to " << channel << "..." << std::endl;
std::string authResponse;
if (!redisClient.auth(password, authResponse))
{
std::stringstream ss;
std::cerr << "Cannot authenticated to redis" << std::endl;
return 1;
}
std::cout << "Auth response: " << authResponse << ":" << port << std::endl;
}
for (int i = 0; i < count; i++)
{
//std::cerr << "Publishing message " << message
// << " to " << channel << "..." << std::endl;
if (!redisClient.publish(channel, message)) if (!redisClient.publish(channel, message))
{ {
std::cerr << "Error publishing to channel " << channel << std::endl; std::cerr << "Error publishing to channel " << channel << std::endl;
return 1; return 1;
} }
}
return 0; return 0;
} }

View File

@@ -7,15 +7,12 @@
#include <iostream> #include <iostream>
#include <sstream> #include <sstream>
#include <chrono> #include <chrono>
#include <thread>
#include <atomic>
#include "IXRedisClient.h" #include "IXRedisClient.h"
namespace ix namespace ix
{ {
int ws_redis_subscribe_main(const std::string& hostname, int ws_redis_subscribe_main(const std::string& hostname,
int port, int port,
const std::string& password,
const std::string& channel, const std::string& channel,
bool verbose) bool verbose)
{ {
@@ -26,56 +23,38 @@ namespace ix
return 1; return 1;
} }
if (!password.empty()) std::chrono::time_point<std::chrono::steady_clock> lastTimePoint;
{ int msgPerSeconds = 0;
std::string authResponse; int msgCount = 0;
if (!redisClient.auth(password, authResponse))
{
std::stringstream ss;
std::cerr << "Cannot authenticated to redis" << std::endl;
return 1;
}
std::cout << "Auth response: " << authResponse << ":" << port << std::endl;
}
std::atomic<int> msgPerSeconds(0); auto callback = [&lastTimePoint, &msgPerSeconds, &msgCount, verbose]
std::atomic<int> msgCount(0);
auto callback = [&msgPerSeconds, &msgCount, verbose]
(const std::string& message) (const std::string& message)
{ {
if (verbose) if (verbose)
{ {
std::cout << "received: " << message << std::endl; std::cout << message << std::endl;
} }
msgPerSeconds++; msgPerSeconds++;
msgCount++;
};
auto responseCallback = [](const std::string& redisResponse) auto now = std::chrono::steady_clock::now();
if (now - lastTimePoint > std::chrono::seconds(1))
{ {
std::cout << "Redis subscribe response: " << redisResponse << std::endl; lastTimePoint = std::chrono::steady_clock::now();
};
auto timer = [&msgPerSeconds, &msgCount] msgCount += msgPerSeconds;
{
while (true) // #messages 901 msg/s 150
{
std::cout << "#messages " << msgCount << " " std::cout << "#messages " << msgCount << " "
<< "msg/s " << msgPerSeconds << "msg/s " << msgPerSeconds
<< std::endl; << std::endl;
msgPerSeconds = 0; msgPerSeconds = 0;
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
} }
}; };
std::thread t(timer);
std::cerr << "Subscribing to " << channel << "..." << std::endl; std::cerr << "Subscribing to " << channel << "..." << std::endl;
if (!redisClient.subscribe(channel, responseCallback, callback)) if (!redisClient.subscribe(channel, callback))
{ {
std::cerr << "Error subscribing to channel " << channel << std::endl; std::cerr << "Error subscribing to channel " << channel << std::endl;
return 1; return 1;
@@ -84,3 +63,4 @@ namespace ix
return 0; return 0;
} }
} }