Compare commits

..

2 Commits

Author SHA1 Message Date
e8bcde1cc1 linux build fix 2019-02-22 22:01:37 -08:00
d39ae478a2 linux build fix 2019-02-22 21:56:22 -08:00
76 changed files with 653 additions and 3088 deletions

0
.gitmodules vendored Normal file
View File

View File

@ -15,15 +15,11 @@ if (NOT WIN32)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic")
endif() endif()
if ("${CMAKE_CXX_COMPILER_ID}" MATCHES "Clang")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wshorten-64-to-32")
endif()
set( IXWEBSOCKET_SOURCES set( IXWEBSOCKET_SOURCES
ixwebsocket/IXEventFd.cpp
ixwebsocket/IXSocket.cpp ixwebsocket/IXSocket.cpp
ixwebsocket/IXSocketServer.cpp ixwebsocket/IXSocketServer.cpp
ixwebsocket/IXSocketConnect.cpp ixwebsocket/IXSocketConnect.cpp
ixwebsocket/IXSocketFactory.cpp
ixwebsocket/IXDNSLookup.cpp ixwebsocket/IXDNSLookup.cpp
ixwebsocket/IXCancellationRequest.cpp ixwebsocket/IXCancellationRequest.cpp
ixwebsocket/IXWebSocket.cpp ixwebsocket/IXWebSocket.cpp
@ -33,20 +29,13 @@ set( IXWEBSOCKET_SOURCES
ixwebsocket/IXWebSocketPerMessageDeflate.cpp ixwebsocket/IXWebSocketPerMessageDeflate.cpp
ixwebsocket/IXWebSocketPerMessageDeflateCodec.cpp ixwebsocket/IXWebSocketPerMessageDeflateCodec.cpp
ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp
ixwebsocket/IXWebSocketHttpHeaders.cpp
ixwebsocket/IXHttpClient.cpp
ixwebsocket/IXUrlParser.cpp
ixwebsocket/IXSelectInterrupt.cpp
ixwebsocket/IXSelectInterruptPipe.cpp
ixwebsocket/IXSelectInterruptFactory.cpp
ixwebsocket/IXConnectionState.cpp
) )
set( IXWEBSOCKET_HEADERS set( IXWEBSOCKET_HEADERS
ixwebsocket/IXEventFd.h
ixwebsocket/IXSocket.h ixwebsocket/IXSocket.h
ixwebsocket/IXSocketServer.h ixwebsocket/IXSocketServer.h
ixwebsocket/IXSocketConnect.h ixwebsocket/IXSocketConnect.h
ixwebsocket/IXSocketFactory.h
ixwebsocket/IXSetThreadName.h ixwebsocket/IXSetThreadName.h
ixwebsocket/IXDNSLookup.h ixwebsocket/IXDNSLookup.h
ixwebsocket/IXCancellationRequest.h ixwebsocket/IXCancellationRequest.h
@ -62,12 +51,6 @@ set( IXWEBSOCKET_HEADERS
ixwebsocket/IXWebSocketPerMessageDeflateOptions.h ixwebsocket/IXWebSocketPerMessageDeflateOptions.h
ixwebsocket/IXWebSocketHttpHeaders.h ixwebsocket/IXWebSocketHttpHeaders.h
ixwebsocket/libwshandshake.hpp ixwebsocket/libwshandshake.hpp
ixwebsocket/IXHttpClient.h
ixwebsocket/IXUrlParser.h
ixwebsocket/IXSelectInterrupt.h
ixwebsocket/IXSelectInterruptPipe.h
ixwebsocket/IXSelectInterruptFactory.h
ixwebsocket/IXConnectionState.h
) )
# Platform specific code # Platform specific code
@ -77,8 +60,6 @@ elseif (WIN32)
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/windows/IXSetThreadName_windows.cpp) list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/windows/IXSetThreadName_windows.cpp)
else() else()
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/linux/IXSetThreadName_linux.cpp) list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/linux/IXSetThreadName_linux.cpp)
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/IXSelectInterruptEventFd.cpp)
list( APPEND IXWEBSOCKET_HEADERS ixwebsocket/IXSelectInterruptEventFd.h)
endif() endif()
if (USE_TLS) if (USE_TLS)

View File

@ -1,31 +0,0 @@
FROM debian:stretch
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install gdb
RUN apt-get -y install screen
RUN apt-get -y install procps
RUN apt-get -y install lsof
RUN apt-get -y install libz-dev
RUN apt-get -y install vim
RUN apt-get -y install make
RUN apt-get -y install cmake
RUN apt-get -y install curl
RUN apt-get -y install python
RUN apt-get -y install netcat
# debian strech cmake is too old for building with Docker
COPY makefile .
RUN ["make", "install_cmake_for_linux"]
COPY . .
ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-rc4-Linux-x86_64/bin
ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
# RUN ["make"]
EXPOSE 8765
CMD ["/ws/ws", "transfer", "--port", "8765", "--host", "0.0.0.0"]

1
Dockerfile Symbolic link
View File

@ -0,0 +1 @@
docker/Dockerfile.debian

View File

@ -1,20 +0,0 @@
class Ixwebsocket < Formula
desc "WebSocket client and server, and HTTP client command-line tool"
homepage "https://github.com/machinezone/IXWebSocket"
url "https://github.com/machinezone/IXWebSocket/archive/v1.1.0.tar.gz"
sha256 "52592ce3d0a67ad0f90ac9e8a458f61724175d95a01a38d1bad3fcdc5c7b6666"
depends_on "cmake" => :build
def install
system "cmake", ".", *std_cmake_args
system "make", "install"
end
test do
system "#{bin}/ws", "--help"
system "#{bin}/ws", "send", "--help"
system "#{bin}/ws", "receive", "--help"
system "#{bin}/ws", "transfer", "--help"
system "#{bin}/ws", "curl", "--help"
end
end

View File

@ -5,16 +5,17 @@
## Introduction ## Introduction
[*WebSocket*](https://en.wikipedia.org/wiki/WebSocket) is a computer communications protocol, providing full-duplex [*WebSocket*](https://en.wikipedia.org/wiki/WebSocket) is a computer communications protocol, providing full-duplex
communication channels over a single TCP connection. *IXWebSocket* is a C++ library for client and server Websocket communication, and for client HTTP communication. The code is derived from [easywsclient](https://github.com/dhbaird/easywsclient) and from the [Satori C SDK](https://github.com/satori-com/satori-rtm-sdk-c). It has been tested on the following platforms. communication channels over a single TCP connection. *IXWebSocket* is a C++ library for client and server Websocket communication. The code is derived from [easywsclient](https://github.com/dhbaird/easywsclient) and from the [Satori C SDK](https://github.com/satori-com/satori-rtm-sdk-c). It has been tested on the following platforms.
* macOS * macOS
* iOS * iOS
* Linux * Linux
* Android * Android
* Windows (no TLS support yet)
## Examples ## Examples
The [*ws*](https://github.com/machinezone/IXWebSocket/tree/master/ws) folder countains many interactive programs for chat, [file transfers](https://github.com/machinezone/IXWebSocket/blob/master/ws/ws_send.cpp), [curl like](https://github.com/machinezone/IXWebSocket/blob/master/ws/ws_http_client.cpp) http clients, demonstrating client and server usage. The ws folder countains many interactive programs for chat and file transfers demonstrating client and server usage.
Here is what the client API looks like. Here is what the client API looks like.
@ -24,7 +25,7 @@ ix::WebSocket webSocket;
std::string url("ws://localhost:8080/"); std::string url("ws://localhost:8080/");
webSocket.setUrl(url); webSocket.setUrl(url);
// Optional heart beat, sent every 45 seconds when there is not any traffic // Optional heart beat, sent every 45 seconds when there isn't any traffic
// to make sure that load balancers do not kill an idle connection. // to make sure that load balancers do not kill an idle connection.
webSocket.setHeartBeatPeriod(45); webSocket.setHeartBeatPeriod(45);
@ -63,11 +64,10 @@ Here is what the server API looks like. Note that server support is very recent
ix::WebSocketServer server(port); ix::WebSocketServer server(port);
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server](std::shared_ptr<WebSocket> webSocket, [&server](std::shared_ptr<ix::WebSocket> webSocket)
std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType, [webSocket, &server](ix::WebSocketMessageType messageType,
const std::string& str, const std::string& str,
size_t wireSize, size_t wireSize,
const ix::WebSocketErrorInfo& error, const ix::WebSocketErrorInfo& error,
@ -77,16 +77,7 @@ server.setOnConnectionCallback(
if (messageType == ix::WebSocket_MessageType_Open) if (messageType == ix::WebSocket_MessageType_Open)
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
// A connection state object is available, and has a default id
// You can subclass ConnectionState and pass an alternate factory
// to override it. It is useful if you want to store custom
// attributes per connection (authenticated bool flag, attributes, etc...)
std::cerr << "id: " << connectionState->getId() << std::endl;
// The uri the client did connect to.
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl; std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : openInfo.headers)
{ {
@ -119,81 +110,12 @@ server.wait();
``` ```
Here is what the HTTP client API looks like. Note that HTTP client support is very recent and subject to changes.
```
//
// Preparation
//
HttpClient httpClient;
HttpRequestArgs args;
// Custom headers can be set
WebSocketHttpHeaders headers;
headers["Foo"] = "bar";
args.extraHeaders = headers;
// Timeout options
args.connectTimeout = connectTimeout;
args.transferTimeout = transferTimeout;
// Redirect options
args.followRedirects = followRedirects;
args.maxRedirects = maxRedirects;
// Misc
args.compress = compress; // Enable gzip compression
args.verbose = verbose;
args.logger = [](const std::string& msg)
{
std::cout << msg;
};
//
// Request
//
HttpResponse out;
std::string url = "https://www.google.com";
// HEAD request
out = httpClient.head(url, args);
// GET request
out = httpClient.get(url, args);
// POST request with parameters
HttpParameters httpParameters;
httpParameters["foo"] = "bar";
out = httpClient.post(url, httpParameters, args);
// POST request with a body
out = httpClient.post(url, std::string("foo=bar"), args);
//
// Result
//
auto statusCode = std::get<0>(out);
auto errorCode = std::get<1>(out);
auto responseHeaders = std::get<2>(out);
auto payload = std::get<3>(out);
auto errorMsg = std::get<4>(out);
auto uploadSize = std::get<5>(out);
auto downloadSize = std::get<6>(out);
```
## Build ## Build
CMakefiles for the library and the examples are available. This library has few dependencies, so it is possible to just add the source files into your project. CMakefiles for the library and the examples are available. This library has few dependencies, so it is possible to just add the source files into your project.
There is a Dockerfile for running some code on Linux, and a unittest which can be executed by typing `make test`. There is a Dockerfile for running some code on Linux, and a unittest which can be executed by typing `make test`.
You can build and install the ws command line tool with Homebrew.
```
brew tap bsergean/IXWebSocket
brew install IXWebSocket
```
## Implementation details ## Implementation details
### Per Message Deflate compression. ### Per Message Deflate compression.
@ -220,11 +142,11 @@ Large frames are broken up into smaller chunks or messages to avoid filling up t
* There is no text support for sending data, only the binary protocol is supported. Sending json or text over the binary protocol works well. * There is no text support for sending data, only the binary protocol is supported. Sending json or text over the binary protocol works well.
* Automatic reconnection works at the TCP socket level, and will detect remote end disconnects. However, if the device/computer network become unreachable (by turning off wifi), it is quite hard to reliably and timely detect it at the socket level using `recv` and `send` error codes. [Here](https://stackoverflow.com/questions/14782143/linux-socket-how-to-detect-disconnected-network-in-a-client-program) is a good discussion on the subject. This behavior is consistent with other runtimes such as node.js. One way to detect a disconnected device with low level C code is to do a name resolution with DNS but this can be expensive. Mobile devices have good and reliable API to do that. * Automatic reconnection works at the TCP socket level, and will detect remote end disconnects. However, if the device/computer network become unreachable (by turning off wifi), it is quite hard to reliably and timely detect it at the socket level using `recv` and `send` error codes. [Here](https://stackoverflow.com/questions/14782143/linux-socket-how-to-detect-disconnected-network-in-a-client-program) is a good discussion on the subject. This behavior is consistent with other runtimes such as node.js. One way to detect a disconnected device with low level C code is to do a name resolution with DNS but this can be expensive. Mobile devices have good and reliable API to do that.
* The server code is using select to detect incoming data, and creates one OS thread per connection. This is not as scalable as strategies using epoll or kqueue. * The server code is using select to detect incoming data, and creates one OS thread per connection. This isn't as scalable as strategies using epoll or kqueue.
## C++ code organization ## C++ code organization
Here is a simplistic diagram which explains how the code is structured in term of class/modules. Here's a simplistic diagram which explains how the code is structured in term of class/modules.
``` ```
+-----------------------+ --- Public +-----------------------+ --- Public
@ -276,7 +198,7 @@ If the connection was closed and sending failed, the return value will be set to
1. WebSocket_ReadyState_Connecting - The connection is not yet open. 1. WebSocket_ReadyState_Connecting - The connection is not yet open.
2. WebSocket_ReadyState_Open - The connection is open and ready to communicate. 2. WebSocket_ReadyState_Open - The connection is open and ready to communicate.
3. WebSocket_ReadyState_Closing - The connection is in the process of closing. 3. WebSocket_ReadyState_Closing - The connection is in the process of closing.
4. WebSocket_MessageType_Close - The connection is closed or could not be opened. 4. WebSocket_MessageType_Close - The connection is closed or couldn't be opened.
### Open and Close notifications ### Open and Close notifications
@ -386,7 +308,7 @@ websocket.ping("ping data, optional (empty string is ok): limited to 125 bytes l
### Heartbeat. ### Heartbeat.
You can configure an optional heart beat / keep-alive, sent every 45 seconds You can configure an optional heart beat / keep-alive, sent every 45 seconds
when there is no any traffic to make sure that load balancers do not kill an when there isn't any traffic to make sure that load balancers do not kill an
idle connection. idle connection.
``` ```

16
docker/Dockerfile Normal file
View File

@ -0,0 +1,16 @@
FROM debian:stretch
# RUN yum install -y gcc-c++ make cmake openssl-devel gdb
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install gdb
RUN apt-get -y install screen
RUN apt-get -y install procps
RUN apt-get -y install lsof
COPY . .
WORKDIR examples/ws_connect
RUN ["sh", "build_linux.sh"]

11
docker/Dockerfile.alpine Normal file
View File

@ -0,0 +1,11 @@
FROM alpine:3.8
RUN apk add --no-cache g++ musl-dev make cmake openssl-dev
COPY . .
WORKDIR examples/ws_connect
RUN ["sh", "build_linux.sh"]
EXPOSE 8765
CMD ["ws_connect"]

11
docker/Dockerfile.centos Normal file
View File

@ -0,0 +1,11 @@
FROM alpine:3.8
RUN apk add --no-cache g++ musl-dev make cmake openssl-dev
COPY . .
WORKDIR examples/ws_connect
RUN ["sh", "build_linux.sh"]
EXPOSE 8765
CMD ["ws_connect"]

22
docker/Dockerfile.debian Normal file
View File

@ -0,0 +1,22 @@
FROM debian:stretch
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install gdb
RUN apt-get -y install screen
RUN apt-get -y install procps
RUN apt-get -y install lsof
RUN apt-get -y install libz-dev
RUN apt-get -y install vim
RUN apt-get -y install make
RUN apt-get -y install cmake
COPY . .
WORKDIR ws
RUN ["sh", "docker_build.sh"]
EXPOSE 8765
CMD ["/ws/ws", "transfer", "8765"]

8
docker/Dockerfile.gcc Normal file
View File

@ -0,0 +1,8 @@
FROM gcc:8
# RUN yum install -y gcc-c++ make cmake openssl-devel gdb
COPY . .
WORKDIR examples/ws_connect
RUN ["sh", "build_linux.sh"]

View File

@ -1,37 +0,0 @@
/*
* IXConnectionState.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXConnectionState.h"
#include <sstream>
namespace ix
{
std::atomic<uint64_t> ConnectionState::_globalId(0);
ConnectionState::ConnectionState()
{
computeId();
}
void ConnectionState::computeId()
{
std::stringstream ss;
ss << _globalId++;
_id = ss.str();
}
const std::string& ConnectionState::getId() const
{
return _id;
}
std::shared_ptr<ConnectionState> ConnectionState::createConnectionState()
{
return std::make_shared<ConnectionState>();
}
}

View File

@ -1,33 +0,0 @@
/*
* IXConnectionState.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <stdint.h>
#include <string>
#include <atomic>
#include <memory>
namespace ix
{
class ConnectionState {
public:
ConnectionState();
virtual ~ConnectionState() = default;
virtual void computeId();
virtual const std::string& getId() const;
static std::shared_ptr<ConnectionState> createConnectionState();
protected:
std::string _id;
static std::atomic<uint64_t> _globalId;
};
}

View File

@ -73,7 +73,7 @@ namespace ix
errMsg = "no error"; errMsg = "no error";
// Maybe a cancellation request got in before the background thread terminated ? // Maybe a cancellation request got in before the background thread terminated ?
if (isCancellationRequested && isCancellationRequested()) if (isCancellationRequested())
{ {
errMsg = "cancellation requested"; errMsg = "cancellation requested";
return nullptr; return nullptr;
@ -121,7 +121,7 @@ namespace ix
} }
// Were we cancelled ? // Were we cancelled ?
if (isCancellationRequested && isCancellationRequested()) if (isCancellationRequested())
{ {
errMsg = "cancellation requested"; errMsg = "cancellation requested";
return nullptr; return nullptr;
@ -129,7 +129,7 @@ namespace ix
} }
// Maybe a cancellation request got in before the bg terminated ? // Maybe a cancellation request got in before the bg terminated ?
if (isCancellationRequested && isCancellationRequested()) if (isCancellationRequested())
{ {
errMsg = "cancellation requested"; errMsg = "cancellation requested";
return nullptr; return nullptr;

82
ixwebsocket/IXEventFd.cpp Normal file
View File

@ -0,0 +1,82 @@
/*
* IXEventFd.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
//
// Linux/Android has a special type of virtual files. select(2) will react
// when reading/writing to those files, unlike closing sockets.
//
// https://linux.die.net/man/2/eventfd
// http://www.sourcexr.com/articles/2013/10/26/lightweight-inter-process-signaling-with-eventfd
//
// eventfd was added in Linux kernel 2.x, and our oldest Android (Kitkat 4.4)
// is on Kernel 3.x
//
// cf Android/Kernel table here
// https://android.stackexchange.com/questions/51651/which-android-runs-which-linux-kernel
//
#include "IXEventFd.h"
#ifdef __linux__
# include <sys/eventfd.h>
#endif
#ifndef _WIN32
#include <unistd.h> // for write
#endif
namespace ix
{
EventFd::EventFd() :
_eventfd(-1)
{
#ifdef __linux__
_eventfd = eventfd(0, 0);
#endif
}
EventFd::~EventFd()
{
#ifdef __linux__
::close(_eventfd);
#endif
}
bool EventFd::notify()
{
#if defined(__linux__)
if (_eventfd == -1) return false;
// select will wake up when a non-zero value is written to our eventfd
uint64_t value = 1;
// we should write 8 bytes for an uint64_t
return write(_eventfd, &value, sizeof(value)) == 8;
#else
return true;
#endif
}
bool EventFd::clear()
{
#if defined(__linux__)
if (_eventfd == -1) return false;
// 0 is a special value ; select will not wake up
uint64_t value = 0;
// we should write 8 bytes for an uint64_t
return write(_eventfd, &value, sizeof(value)) == 8;
#else
return true;
#endif
}
int EventFd::getFd()
{
return _eventfd;
}
}

23
ixwebsocket/IXEventFd.h Normal file
View File

@ -0,0 +1,23 @@
/*
* IXEventFd.h
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#pragma once
namespace ix
{
class EventFd {
public:
EventFd();
virtual ~EventFd();
bool notify();
bool clear();
int getFd();
private:
int _eventfd;
};
}

View File

@ -1,467 +0,0 @@
/*
* IXHttpClient.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXHttpClient.h"
#include "IXUrlParser.h"
#include "IXWebSocketHttpHeaders.h"
#include "IXSocketFactory.h"
#include <sstream>
#include <iomanip>
#include <vector>
#include <cstring>
#include <zlib.h>
namespace ix
{
const std::string HttpClient::kPost = "POST";
const std::string HttpClient::kGet = "GET";
const std::string HttpClient::kHead = "HEAD";
HttpClient::HttpClient()
{
}
HttpClient::~HttpClient()
{
}
HttpResponse HttpClient::request(
const std::string& url,
const std::string& verb,
const std::string& body,
const HttpRequestArgs& args,
int redirects)
{
uint64_t uploadSize = 0;
uint64_t downloadSize = 0;
int code = 0;
WebSocketHttpHeaders headers;
std::string payload;
std::string protocol, host, path, query;
int port;
bool websocket = false;
if (!UrlParser::parse(url, protocol, host, path, query, port, websocket))
{
std::stringstream ss;
ss << "Cannot parse url: " << url;
return std::make_tuple(code, HttpErrorCode_UrlMalformed,
headers, payload, ss.str(),
uploadSize, downloadSize);
}
bool tls = protocol == "https";
std::string errorMsg;
_socket = createSocket(tls, errorMsg);
if (!_socket)
{
return std::make_tuple(code, HttpErrorCode_CannotCreateSocket,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
// Build request string
std::stringstream ss;
ss << verb << " " << path << " HTTP/1.1\r\n";
ss << "Host: " << host << "\r\n";
ss << "User-Agent: ixwebsocket/1.0.0" << "\r\n";
ss << "Accept: */*" << "\r\n";
if (args.compress)
{
ss << "Accept-Encoding: gzip" << "\r\n";
}
// Append extra headers
for (auto&& it : args.extraHeaders)
{
ss << it.first << ": " << it.second << "\r\n";
}
if (verb == kPost)
{
ss << "Content-Length: " << body.size() << "\r\n";
// Set default Content-Type if unspecified
if (args.extraHeaders.find("Content-Type") == args.extraHeaders.end())
{
ss << "Content-Type: application/x-www-form-urlencoded" << "\r\n";
}
ss << "\r\n";
ss << body;
}
else
{
ss << "\r\n";
}
std::string req(ss.str());
std::string errMsg;
std::atomic<bool> requestInitCancellation(false);
// Make a cancellation object dealing with connection timeout
auto isCancellationRequested =
makeCancellationRequestWithTimeout(args.connectTimeout, requestInitCancellation);
bool success = _socket->connect(host, port, errMsg, isCancellationRequested);
if (!success)
{
std::stringstream ss;
ss << "Cannot connect to url: " << url;
return std::make_tuple(code, HttpErrorCode_CannotConnect,
headers, payload, ss.str(),
uploadSize, downloadSize);
}
// Make a new cancellation object dealing with transfer timeout
isCancellationRequested =
makeCancellationRequestWithTimeout(args.transferTimeout, requestInitCancellation);
if (args.verbose)
{
std::stringstream ss;
ss << "Sending " << verb << " request "
<< "to " << host << ":" << port << std::endl
<< "request size: " << req.size() << " bytes" << std::endl
<< "=============" << std::endl
<< req
<< "=============" << std::endl
<< std::endl;
log(ss.str(), args);
}
if (!_socket->writeBytes(req, isCancellationRequested))
{
std::string errorMsg("Cannot send request");
return std::make_tuple(code, HttpErrorCode_SendError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
uploadSize = req.size();
auto lineResult = _socket->readLine(isCancellationRequested);
auto lineValid = lineResult.first;
auto line = lineResult.second;
if (!lineValid)
{
std::string errorMsg("Cannot retrieve status line");
return std::make_tuple(code, HttpErrorCode_CannotReadStatusLine,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
if (args.verbose)
{
std::stringstream ss;
ss << "Status line " << line;
log(ss.str(), args);
}
if (sscanf(line.c_str(), "HTTP/1.1 %d", &code) != 1)
{
std::string errorMsg("Cannot parse response code from status line");
return std::make_tuple(code, HttpErrorCode_MissingStatus,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
auto result = parseHttpHeaders(_socket, isCancellationRequested);
auto headersValid = result.first;
headers = result.second;
if (!headersValid)
{
std::string errorMsg("Cannot parse http headers");
return std::make_tuple(code, HttpErrorCode_HeaderParsingError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
// Redirect ?
if ((code >= 301 && code <= 308) && args.followRedirects)
{
if (headers.find("Location") == headers.end())
{
std::string errorMsg("Missing location header for redirect");
return std::make_tuple(code, HttpErrorCode_MissingLocation,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
if (redirects >= args.maxRedirects)
{
std::stringstream ss;
ss << "Too many redirects: " << redirects;
return std::make_tuple(code, HttpErrorCode_TooManyRedirects,
headers, payload, ss.str(),
uploadSize, downloadSize);
}
// Recurse
std::string location = headers["Location"];
return request(location, verb, body, args, redirects+1);
}
if (verb == "HEAD")
{
return std::make_tuple(code, HttpErrorCode_Ok,
headers, payload, std::string(),
uploadSize, downloadSize);
}
// Parse response:
if (headers.find("Content-Length") != headers.end())
{
ssize_t contentLength = -1;
ss.str("");
ss << headers["Content-Length"];
ss >> contentLength;
payload.reserve(contentLength);
auto chunkResult = _socket->readBytes(contentLength,
args.onProgressCallback,
isCancellationRequested);
if (!chunkResult.first)
{
errorMsg = "Cannot read chunk";
return std::make_tuple(code, HttpErrorCode_ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
payload += chunkResult.second;
}
else if (headers.find("Transfer-Encoding") != headers.end() &&
headers["Transfer-Encoding"] == "chunked")
{
std::stringstream ss;
while (true)
{
lineResult = _socket->readLine(isCancellationRequested);
line = lineResult.second;
if (!lineResult.first)
{
return std::make_tuple(code, HttpErrorCode_ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
uint64_t chunkSize;
ss.str("");
ss << std::hex << line;
ss >> chunkSize;
if (args.verbose)
{
std::stringstream oss;
oss << "Reading " << chunkSize << " bytes"
<< std::endl;
log(oss.str(), args);
}
payload.reserve(payload.size() + chunkSize);
// Read a chunk
auto chunkResult = _socket->readBytes(chunkSize,
args.onProgressCallback,
isCancellationRequested);
if (!chunkResult.first)
{
errorMsg = "Cannot read chunk";
return std::make_tuple(code, HttpErrorCode_ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
payload += chunkResult.second;
// Read the line that terminates the chunk (\r\n)
lineResult = _socket->readLine(isCancellationRequested);
if (!lineResult.first)
{
return std::make_tuple(code, HttpErrorCode_ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
if (chunkSize == 0) break;
}
}
else if (code == 204)
{
; // 204 is NoContent response code
}
else
{
std::string errorMsg("Cannot read http body");
return std::make_tuple(code, HttpErrorCode_CannotReadBody,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
downloadSize = payload.size();
// If the content was compressed with gzip, decode it
if (headers["Content-Encoding"] == "gzip")
{
std::string decompressedPayload;
if (!gzipInflate(payload, decompressedPayload))
{
std::string errorMsg("Error decompressing payload");
return std::make_tuple(code, HttpErrorCode_Gzip,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
payload = decompressedPayload;
}
return std::make_tuple(code, HttpErrorCode_Ok,
headers, payload, std::string(),
uploadSize, downloadSize);
}
HttpResponse HttpClient::get(const std::string& url,
const HttpRequestArgs& args)
{
return request(url, kGet, std::string(), args);
}
HttpResponse HttpClient::head(const std::string& url,
const HttpRequestArgs& args)
{
return request(url, kHead, std::string(), args);
}
HttpResponse HttpClient::post(const std::string& url,
const HttpParameters& httpParameters,
const HttpRequestArgs& args)
{
return request(url, kPost, serializeHttpParameters(httpParameters), args);
}
HttpResponse HttpClient::post(const std::string& url,
const std::string& body,
const HttpRequestArgs& args)
{
return request(url, kPost, body, args);
}
std::string HttpClient::urlEncode(const std::string& value)
{
std::ostringstream escaped;
escaped.fill('0');
escaped << std::hex;
for (std::string::const_iterator i = value.begin(), n = value.end();
i != n; ++i)
{
std::string::value_type c = (*i);
// Keep alphanumeric and other accepted characters intact
if (isalnum(c) || c == '-' || c == '_' || c == '.' || c == '~')
{
escaped << c;
continue;
}
// Any other characters are percent-encoded
escaped << std::uppercase;
escaped << '%' << std::setw(2) << int((unsigned char) c);
escaped << std::nouppercase;
}
return escaped.str();
}
std::string HttpClient::serializeHttpParameters(const HttpParameters& httpParameters)
{
std::stringstream ss;
size_t count = httpParameters.size();
size_t i = 0;
for (auto&& it : httpParameters)
{
ss << urlEncode(it.first)
<< "="
<< urlEncode(it.second);
if (i++ < (count-1))
{
ss << "&";
}
}
return ss.str();
}
bool HttpClient::gzipInflate(
const std::string& in,
std::string& out)
{
z_stream inflateState;
std::memset(&inflateState, 0, sizeof(inflateState));
inflateState.zalloc = Z_NULL;
inflateState.zfree = Z_NULL;
inflateState.opaque = Z_NULL;
inflateState.avail_in = 0;
inflateState.next_in = Z_NULL;
if (inflateInit2(&inflateState, 16+MAX_WBITS) != Z_OK)
{
return false;
}
inflateState.avail_in = (uInt) in.size();
inflateState.next_in = (unsigned char *)(const_cast<char *>(in.data()));
const int kBufferSize = 1 << 14;
std::unique_ptr<unsigned char[]> compressBuffer =
std::make_unique<unsigned char[]>(kBufferSize);
do
{
inflateState.avail_out = (uInt) kBufferSize;
inflateState.next_out = compressBuffer.get();
int ret = inflate(&inflateState, Z_SYNC_FLUSH);
if (ret == Z_NEED_DICT || ret == Z_DATA_ERROR || ret == Z_MEM_ERROR)
{
inflateEnd(&inflateState);
return false;
}
out.append(
reinterpret_cast<char *>(compressBuffer.get()),
kBufferSize - inflateState.avail_out
);
} while (inflateState.avail_out == 0);
inflateEnd(&inflateState);
return true;
}
void HttpClient::log(const std::string& msg,
const HttpRequestArgs& args)
{
if (args.logger)
{
args.logger(msg);
}
}
}

View File

@ -1,107 +0,0 @@
/*
* IXHttpClient.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <algorithm>
#include <functional>
#include <mutex>
#include <atomic>
#include <tuple>
#include <memory>
#include <map>
#include "IXSocket.h"
#include "IXWebSocketHttpHeaders.h"
namespace ix
{
enum HttpErrorCode
{
HttpErrorCode_Ok = 0,
HttpErrorCode_CannotConnect = 1,
HttpErrorCode_Timeout = 2,
HttpErrorCode_Gzip = 3,
HttpErrorCode_UrlMalformed = 4,
HttpErrorCode_CannotCreateSocket = 5,
HttpErrorCode_SendError = 6,
HttpErrorCode_ReadError = 7,
HttpErrorCode_CannotReadStatusLine = 8,
HttpErrorCode_MissingStatus = 9,
HttpErrorCode_HeaderParsingError = 10,
HttpErrorCode_MissingLocation = 11,
HttpErrorCode_TooManyRedirects = 12,
HttpErrorCode_ChunkReadError = 13,
HttpErrorCode_CannotReadBody = 14
};
using HttpResponse = std::tuple<int, // status
HttpErrorCode, // error code
WebSocketHttpHeaders,
std::string, // payload
std::string, // error msg
uint64_t, // upload size
uint64_t>; // download size
using HttpParameters = std::map<std::string, std::string>;
using Logger = std::function<void(const std::string&)>;
struct HttpRequestArgs
{
std::string url;
WebSocketHttpHeaders extraHeaders;
std::string body;
int connectTimeout;
int transferTimeout;
bool followRedirects;
int maxRedirects;
bool verbose;
bool compress;
Logger logger;
OnProgressCallback onProgressCallback;
};
class HttpClient {
public:
HttpClient();
~HttpClient();
HttpResponse get(const std::string& url,
const HttpRequestArgs& args);
HttpResponse head(const std::string& url,
const HttpRequestArgs& args);
HttpResponse post(const std::string& url,
const HttpParameters& httpParameters,
const HttpRequestArgs& args);
HttpResponse post(const std::string& url,
const std::string& body,
const HttpRequestArgs& args);
private:
HttpResponse request(const std::string& url,
const std::string& verb,
const std::string& body,
const HttpRequestArgs& args,
int redirects = 0);
std::string serializeHttpParameters(const HttpParameters& httpParameters);
std::string urlEncode(const std::string& value);
void log(const std::string& msg, const HttpRequestArgs& args);
bool gzipInflate(
const std::string& in,
std::string& out);
std::shared_ptr<Socket> _socket;
const static std::string kPost;
const static std::string kGet;
const static std::string kHead;
};
}

View File

@ -1,46 +0,0 @@
/*
* IXSelectInterrupt.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXSelectInterrupt.h"
namespace ix
{
SelectInterrupt::SelectInterrupt()
{
;
}
SelectInterrupt::~SelectInterrupt()
{
;
}
bool SelectInterrupt::init(std::string& /*errorMsg*/)
{
return true;
}
bool SelectInterrupt::notify(uint64_t /*value*/)
{
return true;
}
uint64_t SelectInterrupt::read()
{
return 0;
}
bool SelectInterrupt::clear()
{
return true;
}
int SelectInterrupt::getFd() const
{
return -1;
}
}

View File

@ -1,28 +0,0 @@
/*
* IXSelectInterrupt.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <stdint.h>
#include <string>
namespace ix
{
class SelectInterrupt {
public:
SelectInterrupt();
virtual ~SelectInterrupt();
virtual bool init(std::string& errorMsg);
virtual bool notify(uint64_t value);
virtual bool clear();
virtual uint64_t read();
virtual int getFd() const;
};
}

View File

@ -1,116 +0,0 @@
/*
* IXSelectInterruptEventFd.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved.
*/
//
// On Linux we use eventd to wake up select.
//
//
// Linux/Android has a special type of virtual files. select(2) will react
// when reading/writing to those files, unlike closing sockets.
//
// https://linux.die.net/man/2/eventfd
// http://www.sourcexr.com/articles/2013/10/26/lightweight-inter-process-signaling-with-eventfd
//
// eventfd was added in Linux kernel 2.x, and our oldest Android (Kitkat 4.4)
// is on Kernel 3.x
//
// cf Android/Kernel table here
// https://android.stackexchange.com/questions/51651/which-android-runs-which-linux-kernel
//
// On macOS we use UNIX pipes to wake up select.
//
#include "IXSelectInterruptEventFd.h"
#include <sys/eventfd.h>
#include <unistd.h> // for write
#include <string.h> // for strerror
#include <fcntl.h>
#include <errno.h>
#include <assert.h>
#include <sstream>
namespace ix
{
SelectInterruptEventFd::SelectInterruptEventFd()
{
_eventfd = -1;
}
SelectInterruptEventFd::~SelectInterruptEventFd()
{
::close(_eventfd);
}
bool SelectInterruptEventFd::init(std::string& errorMsg)
{
// calling init twice is a programming error
assert(_eventfd == -1);
_eventfd = eventfd(0, 0);
if (_eventfd < 0)
{
std::stringstream ss;
ss << "SelectInterruptEventFd::init() failed in eventfd()"
<< " : " << strerror(errno);
errorMsg = ss.str();
_eventfd = -1;
return false;
}
if (fcntl(_eventfd, F_SETFL, O_NONBLOCK) == -1)
{
std::stringstream ss;
ss << "SelectInterruptEventFd::init() failed in fcntl() call"
<< " : " << strerror(errno);
errorMsg = ss.str();
_eventfd = -1;
return false;
}
return true;
}
bool SelectInterruptEventFd::notify(uint64_t value)
{
int fd = _eventfd;
if (fd == -1) return false;
// we should write 8 bytes for an uint64_t
return write(fd, &value, sizeof(value)) == 8;
}
// TODO: return max uint64_t for errors ?
uint64_t SelectInterruptEventFd::read()
{
int fd = _eventfd;
uint64_t value = 0;
::read(fd, &value, sizeof(value));
return value;
}
bool SelectInterruptEventFd::clear()
{
if (_eventfd == -1) return false;
// 0 is a special value ; select will not wake up
uint64_t value = 0;
// we should write 8 bytes for an uint64_t
return write(_eventfd, &value, sizeof(value)) == 8;
}
int SelectInterruptEventFd::getFd() const
{
return _eventfd;
}
}

View File

@ -1,32 +0,0 @@
/*
* IXSelectInterruptEventFd.h
* Author: Benjamin Sergeant
* Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include "IXSelectInterrupt.h"
#include <stdint.h>
#include <string>
namespace ix
{
class SelectInterruptEventFd : public SelectInterrupt {
public:
SelectInterruptEventFd();
virtual ~SelectInterruptEventFd();
bool init(std::string& errorMsg) final;
bool notify(uint64_t value) final;
bool clear() final;
uint64_t read() final;
int getFd() const final;
private:
int _eventfd;
};
}

View File

@ -1,25 +0,0 @@
/*
* IXSelectInterruptFactory.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXSelectInterruptFactory.h"
#if defined(__linux__) || defined(__APPLE__)
# include <ixwebsocket/IXSelectInterruptPipe.h>
#else
# include <ixwebsocket/IXSelectInterrupt.h>
#endif
namespace ix
{
std::shared_ptr<SelectInterrupt> createSelectInterrupt()
{
#if defined(__linux__) || defined(__APPLE__)
return std::make_shared<SelectInterruptPipe>();
#else
return std::make_shared<SelectInterrupt>();
#endif
}
}

View File

@ -1,15 +0,0 @@
/*
* IXSelectInterruptFactory.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <memory>
namespace ix
{
class SelectInterrupt;
std::shared_ptr<SelectInterrupt> createSelectInterrupt();
}

View File

@ -1,138 +0,0 @@
/*
* IXSelectInterruptPipe.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved.
*/
//
// On macOS we use UNIX pipes to wake up select.
//
#include "IXSelectInterruptPipe.h"
#include <unistd.h> // for write
#include <string.h> // for strerror
#include <fcntl.h>
#include <errno.h>
#include <assert.h>
#include <sstream>
namespace ix
{
// File descriptor at index 0 in _fildes is the read end of the pipe
// File descriptor at index 1 in _fildes is the write end of the pipe
const int SelectInterruptPipe::kPipeReadIndex = 0;
const int SelectInterruptPipe::kPipeWriteIndex = 1;
SelectInterruptPipe::SelectInterruptPipe()
{
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
}
SelectInterruptPipe::~SelectInterruptPipe()
{
::close(_fildes[kPipeReadIndex]);
::close(_fildes[kPipeWriteIndex]);
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
}
bool SelectInterruptPipe::init(std::string& errorMsg)
{
// calling init twice is a programming error
assert(_fildes[kPipeReadIndex] == -1);
assert(_fildes[kPipeWriteIndex] == -1);
if (pipe(_fildes) < 0)
{
std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in pipe() call"
<< " : " << strerror(errno);
errorMsg = ss.str();
return false;
}
if (fcntl(_fildes[kPipeReadIndex], F_SETFL, O_NONBLOCK) == -1)
{
std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in fcntl(..., O_NONBLOCK) call"
<< " : " << strerror(errno);
errorMsg = ss.str();
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
return false;
}
if (fcntl(_fildes[kPipeWriteIndex], F_SETFL, O_NONBLOCK) == -1)
{
std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in fcntl(..., O_NONBLOCK) call"
<< " : " << strerror(errno);
errorMsg = ss.str();
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
return false;
}
#ifdef F_SETNOSIGPIPE
if (fcntl(_fildes[kPipeWriteIndex], F_SETNOSIGPIPE, 1) == -1)
{
std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in fcntl(.... F_SETNOSIGPIPE) call"
<< " : " << strerror(errno);
errorMsg = ss.str();
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
return false;
}
if (fcntl(_fildes[kPipeWriteIndex], F_SETNOSIGPIPE, 1) == -1)
{
std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in fcntl(..., F_SETNOSIGPIPE) call"
<< " : " << strerror(errno);
errorMsg = ss.str();
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
return false;
}
#endif
return true;
}
bool SelectInterruptPipe::notify(uint64_t value)
{
int fd = _fildes[kPipeWriteIndex];
if (fd == -1) return false;
// we should write 8 bytes for an uint64_t
return write(fd, &value, sizeof(value)) == 8;
}
// TODO: return max uint64_t for errors ?
uint64_t SelectInterruptPipe::read()
{
int fd = _fildes[kPipeReadIndex];
uint64_t value = 0;
::read(fd, &value, sizeof(value));
return value;
}
bool SelectInterruptPipe::clear()
{
return true;
}
int SelectInterruptPipe::getFd() const
{
return _fildes[kPipeReadIndex];
}
}

View File

@ -1,39 +0,0 @@
/*
* IXSelectInterruptPipe.h
* Author: Benjamin Sergeant
* Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include "IXSelectInterrupt.h"
#include <stdint.h>
#include <string>
namespace ix
{
class SelectInterruptPipe : public SelectInterrupt {
public:
SelectInterruptPipe();
virtual ~SelectInterruptPipe();
bool init(std::string& errorMsg) final;
bool notify(uint64_t value) final;
bool clear() final;
uint64_t read() final;
int getFd() const final;
private:
// Store file descriptors used by the communication pipe. Communication
// happens between a control thread and a background thread, which is
// blocked on select.
int _fildes[2];
// Used to identify the read/write idx
static const int kPipeReadIndex;
static const int kPipeWriteIndex;
};
}

View File

@ -7,8 +7,6 @@
#include "IXSocket.h" #include "IXSocket.h"
#include "IXSocketConnect.h" #include "IXSocketConnect.h"
#include "IXNetSystem.h" #include "IXNetSystem.h"
#include "IXSelectInterrupt.h"
#include "IXSelectInterruptFactory.h"
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
@ -25,15 +23,11 @@ namespace ix
{ {
const int Socket::kDefaultPollNoTimeout = -1; // No poll timeout by default const int Socket::kDefaultPollNoTimeout = -1; // No poll timeout by default
const int Socket::kDefaultPollTimeout = kDefaultPollNoTimeout; const int Socket::kDefaultPollTimeout = kDefaultPollNoTimeout;
const uint64_t Socket::kSendRequest = 1;
const uint64_t Socket::kCloseRequest = 2;
constexpr size_t Socket::kChunkSize;
Socket::Socket(int fd) : Socket::Socket(int fd) :
_sockfd(fd), _sockfd(fd)
_selectInterrupt(createSelectInterrupt())
{ {
;
} }
Socket::~Socket() Socket::~Socket()
@ -45,93 +39,44 @@ namespace ix
{ {
if (_sockfd == -1) if (_sockfd == -1)
{ {
if (onPollCallback) onPollCallback(PollResultType::Error); onPollCallback(PollResultType_Error);
return; return;
} }
PollResultType pollResult = isReadyToRead(1000 * timeoutSecs);
if (onPollCallback) onPollCallback(pollResult);
}
PollResultType Socket::select(bool readyToRead, int timeoutMs)
{
fd_set rfds; fd_set rfds;
fd_set wfds;
FD_ZERO(&rfds); FD_ZERO(&rfds);
FD_ZERO(&wfds); FD_SET(_sockfd, &rfds);
fd_set* fds = (readyToRead) ? &rfds : & wfds; #ifdef __linux__
FD_SET(_sockfd, fds); FD_SET(_eventfd.getFd(), &rfds);
#endif
// File descriptor used to interrupt select when needed
int interruptFd = _selectInterrupt->getFd();
if (interruptFd != -1)
{
FD_SET(interruptFd, fds);
}
struct timeval timeout; struct timeval timeout;
timeout.tv_sec = timeoutMs / 1000; timeout.tv_sec = timeoutSecs;
timeout.tv_usec = (timeoutMs < 1000) ? 0 : 1000 * (timeoutMs % 1000); timeout.tv_usec = 0;
// Compute the highest fd.
int sockfd = _sockfd; int sockfd = _sockfd;
int nfds = (std::max)(sockfd, interruptFd); int nfds = (std::max)(sockfd, _eventfd.getFd());
int ret = select(nfds + 1, &rfds, nullptr, nullptr,
(timeoutSecs < 0) ? nullptr : &timeout);
int ret = ::select(nfds + 1, &rfds, &wfds, nullptr, PollResultType pollResult = PollResultType_ReadyForRead;
(timeoutMs < 0) ? nullptr : &timeout);
PollResultType pollResult = PollResultType::ReadyForRead;
if (ret < 0) if (ret < 0)
{ {
pollResult = PollResultType::Error; pollResult = PollResultType_Error;
} }
else if (ret == 0) else if (ret == 0)
{ {
pollResult = PollResultType::Timeout; pollResult = PollResultType_Timeout;
}
else if (interruptFd != -1 && FD_ISSET(interruptFd, &rfds))
{
uint64_t value = _selectInterrupt->read();
if (value == kSendRequest)
{
pollResult = PollResultType::SendRequest;
}
else if (value == kCloseRequest)
{
pollResult = PollResultType::CloseRequest;
}
}
else if (sockfd != -1 && readyToRead && FD_ISSET(sockfd, &rfds))
{
pollResult = PollResultType::ReadyForRead;
}
else if (sockfd != -1 && !readyToRead && FD_ISSET(sockfd, &wfds))
{
pollResult = PollResultType::ReadyForWrite;
} }
return pollResult; onPollCallback(pollResult);
} }
PollResultType Socket::isReadyToRead(int timeoutMs) void Socket::wakeUpFromPoll()
{ {
bool readyToRead = true; // this will wake up the thread blocked on select, only needed on Linux
return select(readyToRead, timeoutMs); _eventfd.notify();
}
PollResultType Socket::isReadyToWrite(int timeoutMs)
{
bool readyToRead = false;
return select(readyToRead, timeoutMs);
}
// Wake up from poll/select by writing to the pipe which is watched by select
bool Socket::wakeUpFromPoll(uint8_t wakeUpCode)
{
return _selectInterrupt->notify(wakeUpCode);
} }
bool Socket::connect(const std::string& host, bool Socket::connect(const std::string& host,
@ -141,7 +86,7 @@ namespace ix
{ {
std::lock_guard<std::mutex> lock(_socketMutex); std::lock_guard<std::mutex> lock(_socketMutex);
if (!_selectInterrupt->clear()) return false; if (!_eventfd.clear()) return false;
_sockfd = SocketConnect::connect(host, port, errMsg, isCancellationRequested); _sockfd = SocketConnect::connect(host, port, errMsg, isCancellationRequested);
return _sockfd != -1; return _sockfd != -1;
@ -200,40 +145,24 @@ namespace ix
#endif #endif
} }
bool Socket::init(std::string& errorMsg) bool Socket::init()
{ {
return _selectInterrupt->init(errorMsg); #ifdef _WIN32
INT rc;
WSADATA wsaData;
rc = WSAStartup(MAKEWORD(2, 2), &wsaData);
return rc != 0;
#else
return true;
#endif
} }
bool Socket::writeBytes(const std::string& str, void Socket::cleanup()
const CancellationRequest& isCancellationRequested)
{ {
while (true) #ifdef _WIN32
{ WSACleanup();
if (isCancellationRequested && isCancellationRequested()) return false; #endif
char* buffer = const_cast<char*>(str.c_str());
int len = (int) str.size();
ssize_t ret = send(buffer, len);
// We wrote some bytes, as needed, all good.
if (ret > 0)
{
return ret == len;
}
// There is possibly something to be writen, try again
else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
getErrno() == EAGAIN))
{
continue;
}
// There was an error during the write, abort
else
{
return false;
}
}
} }
bool Socket::readByte(void* buffer, bool Socket::readByte(void* buffer,
@ -241,7 +170,7 @@ namespace ix
{ {
while (true) while (true)
{ {
if (isCancellationRequested && isCancellationRequested()) return false; if (isCancellationRequested()) return false;
ssize_t ret; ssize_t ret;
ret = recv(buffer, 1); ret = recv(buffer, 1);
@ -255,12 +184,23 @@ namespace ix
else if (ret < 0 && (getErrno() == EWOULDBLOCK || else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
getErrno() == EAGAIN)) getErrno() == EAGAIN))
{ {
// Wait with a 1ms timeout until the socket is ready to read. // Wait with a timeout until something is written.
// This way we are not busy looping // This way we are not busy looping
if (isReadyToRead(1) == PollResultType::Error) fd_set rfds;
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 1 * 1000; // 1ms timeout
FD_ZERO(&rfds);
FD_SET(_sockfd, &rfds);
if (select(_sockfd + 1, &rfds, nullptr, nullptr, &timeout) < 0 &&
(errno == EBADF || errno == EINVAL))
{ {
return false; return false;
} }
continue;
} }
// There was an error during the read, abort // There was an error during the read, abort
else else
@ -270,8 +210,38 @@ namespace ix
} }
} }
std::pair<bool, std::string> Socket::readLine( bool Socket::writeBytes(const std::string& str,
const CancellationRequest& isCancellationRequested) const CancellationRequest& isCancellationRequested)
{
while (true)
{
if (isCancellationRequested()) return false;
char* buffer = const_cast<char*>(str.c_str());
int len = (int) str.size();
ssize_t ret = send(buffer, len);
// We wrote some bytes, as needed, all good.
if (ret > 0)
{
return ret == len;
}
// There is possibly something to be write, try again
else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
getErrno() == EAGAIN))
{
continue;
}
// There was an error during the write, abort
else
{
return false;
}
}
}
std::pair<bool, std::string> Socket::readLine(const CancellationRequest& isCancellationRequested)
{ {
char c; char c;
std::string line; std::string line;
@ -281,8 +251,7 @@ namespace ix
{ {
if (!readByte(&c, isCancellationRequested)) if (!readByte(&c, isCancellationRequested))
{ {
// Return what we were able to read return std::make_pair(false, std::string());
return std::make_pair(false, line);
} }
line += c; line += c;
@ -290,52 +259,4 @@ namespace ix
return std::make_pair(true, line); return std::make_pair(true, line);
} }
std::pair<bool, std::string> Socket::readBytes(
size_t length,
const OnProgressCallback& onProgressCallback,
const CancellationRequest& isCancellationRequested)
{
if (_readBuffer.empty())
{
_readBuffer.resize(kChunkSize);
}
std::vector<uint8_t> output;
while (output.size() != length)
{
if (isCancellationRequested && isCancellationRequested())
{
return std::make_pair(false, std::string());
}
size_t size = std::min(kChunkSize, length - output.size());
ssize_t ret = recv((char*)&_readBuffer[0], size);
if (ret <= 0 && (getErrno() != EWOULDBLOCK &&
getErrno() != EAGAIN))
{
// Error
return std::make_pair(false, std::string());
}
else if (ret > 0)
{
output.insert(output.end(),
_readBuffer.begin(),
_readBuffer.begin() + ret);
}
if (onProgressCallback) onProgressCallback((int) output.size(), (int) length);
// Wait with a 1ms timeout until the socket is ready to read.
// This way we are not busy looping
if (isReadyToRead(1) == PollResultType::Error)
{
return std::make_pair(false, std::string());
}
}
return std::make_pair(true, std::string(output.begin(),
output.end()));
}
} }

View File

@ -10,29 +10,22 @@
#include <functional> #include <functional>
#include <mutex> #include <mutex>
#include <atomic> #include <atomic>
#include <vector>
#include <memory>
#ifdef _WIN32 #ifdef _WIN32
#include <BaseTsd.h> #include <BaseTsd.h>
typedef SSIZE_T ssize_t; typedef SSIZE_T ssize_t;
#endif #endif
#include "IXEventFd.h"
#include "IXCancellationRequest.h" #include "IXCancellationRequest.h"
#include "IXProgressCallback.h"
namespace ix namespace ix
{ {
class SelectInterrupt; enum PollResultType
enum class PollResultType
{ {
ReadyForRead = 0, PollResultType_ReadyForRead = 0,
ReadyForWrite = 1, PollResultType_Timeout = 1,
Timeout = 2, PollResultType_Error = 2
Error = 3,
SendRequest = 4,
CloseRequest = 5
}; };
class Socket { class Socket {
@ -41,17 +34,12 @@ namespace ix
Socket(int fd = -1); Socket(int fd = -1);
virtual ~Socket(); virtual ~Socket();
bool init(std::string& errorMsg);
void configure(); void configure();
// Functions to check whether there is activity on the socket virtual void poll(const OnPollCallback& onPollCallback,
void poll(const OnPollCallback& onPollCallback, int timeoutSecs = kDefaultPollTimeout);
int timeoutSecs = kDefaultPollTimeout); virtual void wakeUpFromPoll();
bool wakeUpFromPoll(uint8_t wakeUpCode);
PollResultType isReadyToWrite(int timeoutMs);
PollResultType isReadyToRead(int timeoutMs);
// Virtual methods // Virtual methods
virtual bool connect(const std::string& url, virtual bool connect(const std::string& url,
@ -70,36 +58,21 @@ namespace ix
const CancellationRequest& isCancellationRequested); const CancellationRequest& isCancellationRequested);
bool writeBytes(const std::string& str, bool writeBytes(const std::string& str,
const CancellationRequest& isCancellationRequested); const CancellationRequest& isCancellationRequested);
std::pair<bool, std::string> readLine(const CancellationRequest& isCancellationRequested);
std::pair<bool, std::string> readLine(
const CancellationRequest& isCancellationRequested);
std::pair<bool, std::string> readBytes(
size_t length,
const OnProgressCallback& onProgressCallback,
const CancellationRequest& isCancellationRequested);
static int getErrno(); static int getErrno();
static bool init(); // Required on Windows to initialize WinSocket
// Used as special codes for pipe communication static void cleanup(); // Required on Windows to cleanup WinSocket
static const uint64_t kSendRequest;
static const uint64_t kCloseRequest;
protected: protected:
void closeSocket(int fd); void closeSocket(int fd);
std::atomic<int> _sockfd; std::atomic<int> _sockfd;
std::mutex _socketMutex; std::mutex _socketMutex;
EventFd _eventfd;
private: private:
PollResultType select(bool readyToRead, int timeoutMs);
static const int kDefaultPollTimeout; static const int kDefaultPollTimeout;
static const int kDefaultPollNoTimeout; static const int kDefaultPollNoTimeout;
// Buffer for reading from our socket. That buffer is never resized.
std::vector<uint8_t> _readBuffer;
static constexpr size_t kChunkSize = 1 << 15;
std::shared_ptr<SelectInterrupt> _selectInterrupt;
}; };
} }

View File

@ -66,7 +66,7 @@ namespace ix
for (;;) for (;;)
{ {
if (isCancellationRequested && isCancellationRequested()) // Must handle timeout as well if (isCancellationRequested()) // Must handle timeout as well
{ {
closeSocket(fd); closeSocket(fd);
errMsg = "Cancelled"; errMsg = "Cancelled";

View File

@ -1,64 +0,0 @@
/*
* IXSocketFactory.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXSocketFactory.h"
#if defined(__APPLE__) or defined(__linux__)
# ifdef __APPLE__
# include <ixwebsocket/IXSocketAppleSSL.h>
# else
# include <ixwebsocket/IXSocketOpenSSL.h>
# endif
#endif
namespace ix
{
std::shared_ptr<Socket> createSocket(bool tls,
std::string& errorMsg)
{
errorMsg.clear();
std::shared_ptr<Socket> socket;
if (!tls)
{
socket = std::make_shared<Socket>();
}
else
{
#ifdef IXWEBSOCKET_USE_TLS
# ifdef __APPLE__
socket = std::make_shared<SocketAppleSSL>();
# else
socket = std::make_shared<SocketOpenSSL>();
# endif
#else
errorMsg = "TLS support is not enabled on this platform.";
return nullptr;
#endif
}
if (!socket->init(errorMsg))
{
socket.reset();
}
return socket;
}
std::shared_ptr<Socket> createSocket(int fd,
std::string& errorMsg)
{
errorMsg.clear();
std::shared_ptr<Socket> socket = std::make_shared<Socket>(fd);
if (!socket->init(errorMsg))
{
socket.reset();
}
return socket;
}
}

View File

@ -1,20 +0,0 @@
/*
* IXSocketFactory.h
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <memory>
namespace ix
{
class Socket;
std::shared_ptr<Socket> createSocket(bool tls,
std::string& errorMsg);
std::shared_ptr<Socket> createSocket(int fd,
std::string& errorMsg);
}

View File

@ -21,7 +21,6 @@
namespace ix namespace ix
{ {
std::atomic<bool> SocketOpenSSL::_openSSLInitializationSuccessful(false); std::atomic<bool> SocketOpenSSL::_openSSLInitializationSuccessful(false);
std::once_flag SocketOpenSSL::_openSSLInitFlag;
SocketOpenSSL::SocketOpenSSL(int fd) : Socket(fd), SocketOpenSSL::SocketOpenSSL(int fd) : Socket(fd),
_ssl_connection(nullptr), _ssl_connection(nullptr),

View File

@ -50,7 +50,7 @@ namespace ix
const SSL_METHOD* _ssl_method; const SSL_METHOD* _ssl_method;
mutable std::mutex _mutex; // OpenSSL routines are not thread-safe mutable std::mutex _mutex; // OpenSSL routines are not thread-safe
static std::once_flag _openSSLInitFlag; std::once_flag _openSSLInitFlag;
static std::atomic<bool> _openSSLInitializationSuccessful; static std::atomic<bool> _openSSLInitializationSuccessful;
}; };

View File

@ -29,8 +29,7 @@ namespace ix
_host(host), _host(host),
_backlog(backlog), _backlog(backlog),
_maxConnections(maxConnections), _maxConnections(maxConnections),
_stop(false), _stop(false)
_connectionStateFactory(&ConnectionState::createConnectionState)
{ {
} }
@ -146,12 +145,6 @@ namespace ix
::close(_serverFd); ::close(_serverFd);
} }
void SocketServer::setConnectionStateFactory(
const ConnectionStateFactory& connectionStateFactory)
{
_connectionStateFactory = connectionStateFactory;
}
void SocketServer::run() void SocketServer::run()
{ {
// Set the socket to non blocking mode, so that accept calls are not blocking // Set the socket to non blocking mode, so that accept calls are not blocking
@ -221,12 +214,6 @@ namespace ix
continue; continue;
} }
std::shared_ptr<ConnectionState> connectionState;
if (_connectionStateFactory)
{
connectionState = _connectionStateFactory();
}
// Launch the handleConnection work asynchronously in its own thread. // Launch the handleConnection work asynchronously in its own thread.
// //
// the destructor of a future returned by std::async blocks, // the destructor of a future returned by std::async blocks,
@ -234,8 +221,7 @@ namespace ix
f = std::async(std::launch::async, f = std::async(std::launch::async,
&SocketServer::handleConnection, &SocketServer::handleConnection,
this, this,
clientFd, clientFd);
connectionState);
} }
} }
} }

View File

@ -6,8 +6,6 @@
#pragma once #pragma once
#include "IXConnectionState.h"
#include <utility> // pair #include <utility> // pair
#include <string> #include <string>
#include <set> #include <set>
@ -22,8 +20,6 @@ namespace ix
{ {
class SocketServer { class SocketServer {
public: public:
using ConnectionStateFactory = std::function<std::shared_ptr<ConnectionState>()>;
SocketServer(int port = SocketServer::kDefaultPort, SocketServer(int port = SocketServer::kDefaultPort,
const std::string& host = SocketServer::kDefaultHost, const std::string& host = SocketServer::kDefaultHost,
int backlog = SocketServer::kDefaultTcpBacklog, int backlog = SocketServer::kDefaultTcpBacklog,
@ -31,8 +27,6 @@ namespace ix
virtual ~SocketServer(); virtual ~SocketServer();
virtual void stop(); virtual void stop();
void setConnectionStateFactory(const ConnectionStateFactory& connectionStateFactory);
const static int kDefaultPort; const static int kDefaultPort;
const static std::string kDefaultHost; const static std::string kDefaultHost;
const static int kDefaultTcpBacklog; const static int kDefaultTcpBacklog;
@ -66,13 +60,9 @@ namespace ix
std::condition_variable _conditionVariable; std::condition_variable _conditionVariable;
std::mutex _conditionVariableMutex; std::mutex _conditionVariableMutex;
//
ConnectionStateFactory _connectionStateFactory;
// Methods // Methods
void run(); void run();
virtual void handleConnection(int fd, virtual void handleConnection(int fd) = 0;
std::shared_ptr<ConnectionState> connectionState) = 0;
virtual size_t getConnectedClientsCount() = 0; virtual size_t getConnectedClientsCount() = 0;
}; };
} }

View File

@ -1,104 +0,0 @@
/*
* IXUrlParser.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXUrlParser.h"
#include <iostream>
#include <sstream>
namespace ix
{
//
// The only difference between those 2 regex is the protocol
//
std::regex UrlParser::_httpRegex("(http|https)://([^/ :]+):?([^/ ]*)(/?[^ #?]*)\\x3f?([^ #]*)#?([^ ]*)");
std::regex UrlParser::_webSocketRegex("(ws|wss)://([^/ :]+):?([^/ ]*)(/?[^ #?]*)\\x3f?([^ #]*)#?([^ ]*)");
bool UrlParser::parse(const std::string& url,
std::string& protocol,
std::string& host,
std::string& path,
std::string& query,
int& port,
bool websocket)
{
std::cmatch what;
if (!regex_match(url.c_str(), what,
websocket ? _webSocketRegex : _httpRegex))
{
return false;
}
std::string portStr;
protocol = std::string(what[1].first, what[1].second);
host = std::string(what[2].first, what[2].second);
portStr = std::string(what[3].first, what[3].second);
path = std::string(what[4].first, what[4].second);
query = std::string(what[5].first, what[5].second);
if (portStr.empty())
{
if (protocol == "ws" || protocol == "http")
{
port = 80;
}
else if (protocol == "wss" || protocol == "https")
{
port = 443;
}
else
{
// Invalid protocol. Should be caught by regex check
// but this missing branch trigger cpplint linter.
return false;
}
}
else
{
std::stringstream ss;
ss << portStr;
ss >> port;
}
if (path.empty())
{
path = "/";
}
else if (path[0] != '/')
{
path = '/' + path;
}
if (!query.empty())
{
path += "?";
path += query;
}
return true;
}
void UrlParser::printUrl(const std::string& url, bool websocket)
{
std::string protocol, host, path, query;
int port {0};
if (!parse(url, protocol, host, path, query, port, websocket))
{
return;
}
std::cout << "[" << url << "]" << std::endl;
std::cout << protocol << std::endl;
std::cout << host << std::endl;
std::cout << port << std::endl;
std::cout << path << std::endl;
std::cout << query << std::endl;
std::cout << "-------------------------------" << std::endl;
}
}

View File

@ -1,31 +0,0 @@
/*
* IXUrlParser.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <string>
#include <regex>
namespace ix
{
class UrlParser
{
public:
static bool parse(const std::string& url,
std::string& protocol,
std::string& host,
std::string& path,
std::string& query,
int& port,
bool websocket);
static void printUrl(const std::string& url, bool websocket);
private:
static std::regex _httpRegex;
static std::regex _webSocketRegex;
};
}

View File

@ -79,10 +79,10 @@ namespace ix
return _perMessageDeflateOptions; return _perMessageDeflateOptions;
} }
void WebSocket::setHeartBeatPeriod(int heartBeatPeriod) void WebSocket::setHeartBeatPeriod(int hearBeatPeriod)
{ {
std::lock_guard<std::mutex> lock(_configMutex); std::lock_guard<std::mutex> lock(_configMutex);
_heartBeatPeriod = heartBeatPeriod; _heartBeatPeriod = hearBeatPeriod;
} }
int WebSocket::getHeartBeatPeriod() const int WebSocket::getHeartBeatPeriod() const
@ -252,11 +252,6 @@ namespace ix
{ {
webSocketMessageType = WebSocket_MessageType_Pong; webSocketMessageType = WebSocket_MessageType_Pong;
} break; } break;
case WebSocketTransport::FRAGMENT:
{
webSocketMessageType = WebSocket_MessageType_Fragment;
} break;
} }
WebSocketErrorInfo webSocketErrorInfo; WebSocketErrorInfo webSocketErrorInfo;
@ -379,9 +374,4 @@ namespace ix
{ {
_automaticReconnection = false; _automaticReconnection = false;
} }
size_t WebSocket::bufferedAmount() const
{
return _ws.bufferedAmount();
}
} }

View File

@ -39,8 +39,7 @@ namespace ix
WebSocket_MessageType_Close = 2, WebSocket_MessageType_Close = 2,
WebSocket_MessageType_Error = 3, WebSocket_MessageType_Error = 3,
WebSocket_MessageType_Ping = 4, WebSocket_MessageType_Ping = 4,
WebSocket_MessageType_Pong = 5, WebSocket_MessageType_Pong = 5
WebSocket_MessageType_Fragment = 6
}; };
struct WebSocketOpenInfo struct WebSocketOpenInfo
@ -89,7 +88,7 @@ namespace ix
void setUrl(const std::string& url); void setUrl(const std::string& url);
void setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions); void setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions);
void setHandshakeTimeout(int handshakeTimeoutSecs); void setHandshakeTimeout(int handshakeTimeoutSecs);
void setHeartBeatPeriod(int heartBeatPeriod); void setHeartBeatPeriod(int hearBeatPeriod);
// Run asynchronously, by calling start and stop. // Run asynchronously, by calling start and stop.
void start(); void start();
@ -112,7 +111,6 @@ namespace ix
const std::string& getUrl() const; const std::string& getUrl() const;
const WebSocketPerMessageDeflateOptions& getPerMessageDeflateOptions() const; const WebSocketPerMessageDeflateOptions& getPerMessageDeflateOptions() const;
int getHeartBeatPeriod() const; int getHeartBeatPeriod() const;
size_t bufferedAmount() const;
void enableAutomaticReconnection(); void enableAutomaticReconnection();
void disableAutomaticReconnection(); void disableAutomaticReconnection();

View File

@ -6,7 +6,6 @@
#include "IXWebSocketHandshake.h" #include "IXWebSocketHandshake.h"
#include "IXSocketConnect.h" #include "IXSocketConnect.h"
#include "IXUrlParser.h"
#include "libwshandshake.hpp" #include "libwshandshake.hpp"
@ -33,6 +32,90 @@ namespace ix
} }
bool WebSocketHandshake::parseUrl(const std::string& url,
std::string& protocol,
std::string& host,
std::string& path,
std::string& query,
int& port)
{
std::regex ex("(ws|wss)://([^/ :]+):?([^/ ]*)(/?[^ #?]*)\\x3f?([^ #]*)#?([^ ]*)");
std::cmatch what;
if (!regex_match(url.c_str(), what, ex))
{
return false;
}
std::string portStr;
protocol = std::string(what[1].first, what[1].second);
host = std::string(what[2].first, what[2].second);
portStr = std::string(what[3].first, what[3].second);
path = std::string(what[4].first, what[4].second);
query = std::string(what[5].first, what[5].second);
if (portStr.empty())
{
if (protocol == "ws")
{
port = 80;
}
else if (protocol == "wss")
{
port = 443;
}
else
{
// Invalid protocol. Should be caught by regex check
// but this missing branch trigger cpplint linter.
return false;
}
}
else
{
std::stringstream ss;
ss << portStr;
ss >> port;
}
if (path.empty())
{
path = "/";
}
else if (path[0] != '/')
{
path = '/' + path;
}
if (!query.empty())
{
path += "?";
path += query;
}
return true;
}
void WebSocketHandshake::printUrl(const std::string& url)
{
std::string protocol, host, path, query;
int port {0};
if (!WebSocketHandshake::parseUrl(url, protocol, host,
path, query, port))
{
return;
}
std::cout << "[" << url << "]" << std::endl;
std::cout << protocol << std::endl;
std::cout << host << std::endl;
std::cout << port << std::endl;
std::cout << path << std::endl;
std::cout << query << std::endl;
std::cout << "-------------------------------" << std::endl;
}
std::string WebSocketHandshake::trim(const std::string& str) std::string WebSocketHandshake::trim(const std::string& str)
{ {
std::string out(str); std::string out(str);
@ -109,6 +192,61 @@ namespace ix
return s; return s;
} }
std::pair<bool, WebSocketHttpHeaders> WebSocketHandshake::parseHttpHeaders(
const CancellationRequest& isCancellationRequested)
{
WebSocketHttpHeaders headers;
char line[256];
int i;
while (true)
{
int colon = 0;
for (i = 0;
i < 2 || (i < 255 && line[i-2] != '\r' && line[i-1] != '\n');
++i)
{
if (!_socket->readByte(line+i, isCancellationRequested))
{
return std::make_pair(false, headers);
}
if (line[i] == ':' && colon == 0)
{
colon = i;
}
}
if (line[0] == '\r' && line[1] == '\n')
{
break;
}
// line is a single header entry. split by ':', and add it to our
// header map. ignore lines with no colon.
if (colon > 0)
{
line[i] = '\0';
std::string lineStr(line);
// colon is ':', colon+1 is ' ', colon+2 is the start of the value.
// i is end of string (\0), i-colon is length of string minus key;
// subtract 1 for '\0', 1 for '\n', 1 for '\r',
// 1 for the ' ' after the ':', and total is -4
std::string name(lineStr.substr(0, colon));
std::string value(lineStr.substr(colon + 2, i - colon - 4));
// Make the name lower case.
std::transform(name.begin(), name.end(), name.begin(), ::tolower);
headers[name] = value;
}
}
return std::make_pair(true, headers);
}
WebSocketInitResult WebSocketHandshake::sendErrorResponse(int code, const std::string& reason) WebSocketInitResult WebSocketHandshake::sendErrorResponse(int code, const std::string& reason)
{ {
std::stringstream ss; std::stringstream ss;
@ -217,7 +355,7 @@ namespace ix
return WebSocketInitResult(false, status, ss.str()); return WebSocketInitResult(false, status, ss.str());
} }
auto result = parseHttpHeaders(_socket, isCancellationRequested); auto result = parseHttpHeaders(isCancellationRequested);
auto headersValid = result.first; auto headersValid = result.first;
auto headers = result.second; auto headers = result.second;
@ -312,7 +450,7 @@ namespace ix
} }
// Retrieve and validate HTTP headers // Retrieve and validate HTTP headers
auto result = parseHttpHeaders(_socket, isCancellationRequested); auto result = parseHttpHeaders(isCancellationRequested);
auto headersValid = result.first; auto headersValid = result.first;
auto headers = result.second; auto headers = result.second;

View File

@ -59,10 +59,19 @@ namespace ix
WebSocketInitResult serverHandshake(int fd, WebSocketInitResult serverHandshake(int fd,
int timeoutSecs); int timeoutSecs);
static bool parseUrl(const std::string& url,
std::string& protocol,
std::string& host,
std::string& path,
std::string& query,
int& port);
private: private:
static void printUrl(const std::string& url);
std::string genRandomString(const int len); std::string genRandomString(const int len);
// Parse HTTP headers // Parse HTTP headers
std::pair<bool, WebSocketHttpHeaders> parseHttpHeaders(const CancellationRequest& isCancellationRequested);
WebSocketInitResult sendErrorResponse(int code, const std::string& reason); WebSocketInitResult sendErrorResponse(int code, const std::string& reason);
std::tuple<std::string, std::string, std::string> parseRequestLine(const std::string& line); std::tuple<std::string, std::string, std::string> parseRequestLine(const std::string& line);

View File

@ -1,66 +0,0 @@
/*
* IXWebSocketHttpHeaders.h
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include "IXWebSocketHttpHeaders.h"
#include "IXSocket.h"
#include <string>
#include <unordered_map>
namespace ix
{
std::pair<bool, WebSocketHttpHeaders> parseHttpHeaders(
std::shared_ptr<Socket> socket,
const CancellationRequest& isCancellationRequested)
{
WebSocketHttpHeaders headers;
char line[1024];
int i;
while (true)
{
int colon = 0;
for (i = 0;
i < 2 || (i < 1023 && line[i-2] != '\r' && line[i-1] != '\n');
++i)
{
if (!socket->readByte(line+i, isCancellationRequested))
{
return std::make_pair(false, headers);
}
if (line[i] == ':' && colon == 0)
{
colon = i;
}
}
if (line[0] == '\r' && line[1] == '\n')
{
break;
}
// line is a single header entry. split by ':', and add it to our
// header map. ignore lines with no colon.
if (colon > 0)
{
line[i] = '\0';
std::string lineStr(line);
// colon is ':', colon+1 is ' ', colon+2 is the start of the value.
// i is end of string (\0), i-colon is length of string minus key;
// subtract 1 for '\0', 1 for '\n', 1 for '\r',
// 1 for the ' ' after the ':', and total is -4
std::string name(lineStr.substr(0, colon));
std::string value(lineStr.substr(colon + 2, i - colon - 4));
headers[name] = value;
}
}
return std::make_pair(true, headers);
}
}

View File

@ -6,40 +6,10 @@
#pragma once #pragma once
#include "IXCancellationRequest.h"
#include <string> #include <string>
#include <map> #include <unordered_map>
#include <memory>
#include <algorithm>
namespace ix namespace ix
{ {
class Socket; using WebSocketHttpHeaders = std::unordered_map<std::string, std::string>;
struct CaseInsensitiveLess
{
// Case Insensitive compare_less binary function
struct NocaseCompare
{
bool operator() (const unsigned char& c1, const unsigned char& c2) const
{
return std::tolower(c1) < std::tolower(c2);
}
};
bool operator() (const std::string & s1, const std::string & s2) const
{
return std::lexicographical_compare
(s1.begin(), s1.end(), // source range
s2.begin(), s2.end(), // dest range
NocaseCompare()); // comparison
}
};
using WebSocketHttpHeaders = std::map<std::string, std::string, CaseInsensitiveLess>;
std::pair<bool, WebSocketHttpHeaders> parseHttpHeaders(
std::shared_ptr<Socket> socket,
const CancellationRequest& isCancellationRequested);
} }

View File

@ -49,12 +49,10 @@ namespace ix
_onConnectionCallback = callback; _onConnectionCallback = callback;
} }
void WebSocketServer::handleConnection( void WebSocketServer::handleConnection(int fd)
int fd,
std::shared_ptr<ConnectionState> connectionState)
{ {
auto webSocket = std::make_shared<WebSocket>(); auto webSocket = std::make_shared<WebSocket>();
_onConnectionCallback(webSocket, connectionState); _onConnectionCallback(webSocket);
webSocket->disableAutomaticReconnection(); webSocket->disableAutomaticReconnection();

View File

@ -20,8 +20,7 @@
namespace ix namespace ix
{ {
using OnConnectionCallback = std::function<void(std::shared_ptr<WebSocket>, using OnConnectionCallback = std::function<void(std::shared_ptr<WebSocket>)>;
std::shared_ptr<ConnectionState>)>;
class WebSocketServer : public SocketServer { class WebSocketServer : public SocketServer {
public: public:
@ -50,8 +49,7 @@ namespace ix
const static int kDefaultHandShakeTimeoutSecs; const static int kDefaultHandShakeTimeoutSecs;
// Methods // Methods
virtual void handleConnection(int fd, virtual void handleConnection(int fd) final;
std::shared_ptr<ConnectionState> connectionState) final;
virtual size_t getConnectedClientsCount() final; virtual size_t getConnectedClientsCount() final;
}; };
} }

View File

@ -1,31 +1,7 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2012, 2013 <dhbaird@gmail.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
/* /*
* IXWebSocketTransport.cpp * IXWebSocketTransport.cpp
* Author: Benjamin Sergeant * Author: Benjamin Sergeant
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved. * Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*/ */
// //
@ -35,8 +11,14 @@
#include "IXWebSocketTransport.h" #include "IXWebSocketTransport.h"
#include "IXWebSocketHandshake.h" #include "IXWebSocketHandshake.h"
#include "IXWebSocketHttpHeaders.h" #include "IXWebSocketHttpHeaders.h"
#include "IXUrlParser.h"
#include "IXSocketFactory.h" #ifdef IXWEBSOCKET_USE_TLS
# ifdef __APPLE__
# include "IXSocketAppleSSL.h"
# else
# include "IXSocketOpenSSL.h"
# endif
#endif
#include <string.h> #include <string.h>
#include <stdlib.h> #include <stdlib.h>
@ -53,7 +35,7 @@
namespace ix namespace ix
{ {
const std::string WebSocketTransport::kHeartBeatPingMessage("ixwebsocket::heartbeat"); const std::string WebSocketTransport::kHeartBeatPingMessage("ixwebsocket::hearbeat");
const int WebSocketTransport::kDefaultHeartBeatPeriod(-1); const int WebSocketTransport::kDefaultHeartBeatPeriod(-1);
constexpr size_t WebSocketTransport::kChunkSize; constexpr size_t WebSocketTransport::kChunkSize;
@ -75,11 +57,11 @@ namespace ix
} }
void WebSocketTransport::configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions, void WebSocketTransport::configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions,
int heartBeatPeriod) int hearBeatPeriod)
{ {
_perMessageDeflateOptions = perMessageDeflateOptions; _perMessageDeflateOptions = perMessageDeflateOptions;
_enablePerMessageDeflate = _perMessageDeflateOptions.enabled(); _enablePerMessageDeflate = _perMessageDeflateOptions.enabled();
_heartBeatPeriod = heartBeatPeriod; _heartBeatPeriod = hearBeatPeriod;
} }
// Client // Client
@ -88,21 +70,31 @@ namespace ix
{ {
std::string protocol, host, path, query; std::string protocol, host, path, query;
int port; int port;
bool websocket = true;
if (!UrlParser::parse(url, protocol, host, path, query, port, websocket)) if (!WebSocketHandshake::parseUrl(url, protocol, host,
path, query, port))
{ {
return WebSocketInitResult(false, 0, return WebSocketInitResult(false, 0,
std::string("Could not parse URL ") + url); std::string("Could not parse URL ") + url);
} }
bool tls = protocol == "wss"; if (protocol == "wss")
std::string errorMsg;
_socket = createSocket(tls, errorMsg);
if (!_socket)
{ {
return WebSocketInitResult(false, 0, errorMsg); _socket.reset();
#ifdef IXWEBSOCKET_USE_TLS
# ifdef __APPLE__
_socket = std::make_shared<SocketAppleSSL>();
# else
_socket = std::make_shared<SocketOpenSSL>();
# endif
#else
return WebSocketInitResult(false, 0, "TLS is not supported.");
#endif
}
else
{
_socket.reset();
_socket = std::make_shared<Socket>();
} }
WebSocketHandshake webSocketHandshake(_requestInitCancellation, WebSocketHandshake webSocketHandshake(_requestInitCancellation,
@ -123,13 +115,8 @@ namespace ix
// Server // Server
WebSocketInitResult WebSocketTransport::connectToSocket(int fd, int timeoutSecs) WebSocketInitResult WebSocketTransport::connectToSocket(int fd, int timeoutSecs)
{ {
std::string errorMsg; _socket.reset();
_socket = createSocket(fd, errorMsg); _socket = std::make_shared<Socket>(fd);
if (!_socket)
{
return WebSocketInitResult(false, 0, errorMsg);
}
WebSocketHandshake webSocketHandshake(_requestInitCancellation, WebSocketHandshake webSocketHandshake(_requestInitCancellation,
_socket, _socket,
@ -189,75 +176,43 @@ namespace ix
// If (1) heartbeat is enabled, and (2) no data was received or // If (1) heartbeat is enabled, and (2) no data was received or
// send for a duration exceeding our heart-beat period, send a // send for a duration exceeding our heart-beat period, send a
// ping to the server. // ping to the server.
if (pollResult == PollResultType::Timeout && if (pollResult == PollResultType_Timeout &&
heartBeatPeriodExceeded()) heartBeatPeriodExceeded())
{ {
std::stringstream ss; std::stringstream ss;
ss << kHeartBeatPingMessage << "::" << _heartBeatPeriod << "s"; ss << kHeartBeatPingMessage << "::" << _heartBeatPeriod << "s";
sendPing(ss.str()); sendPing(ss.str());
return;
} }
// Make sure we send all the buffered data
// there can be a lot of it for large messages.
else if (pollResult == PollResultType::SendRequest)
{
while (!isSendBufferEmpty() && !_requestInitCancellation)
{
// Wait with a 10ms timeout until the socket is ready to write.
// This way we are not busy looping
PollResultType result = _socket->isReadyToWrite(10);
if (result == PollResultType::Error) while (true)
{ {
_socket->close(); ssize_t ret = _socket->recv((char*)&_readbuf[0], _readbuf.size());
setReadyState(CLOSED);
break; if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
} _socket->getErrno() == EAGAIN))
else if (result == PollResultType::ReadyForWrite) {
{ break;
sendOnSocket(); }
} else if (ret <= 0)
{
_rxbuf.clear();
_socket->close();
setReadyState(CLOSED);
break;
}
else
{
_rxbuf.insert(_rxbuf.end(),
_readbuf.begin(),
_readbuf.begin() + ret);
} }
} }
else if (pollResult == PollResultType::ReadyForRead)
{
while (true)
{
ssize_t ret = _socket->recv((char*)&_readbuf[0], _readbuf.size());
if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK || if (isSendBufferEmpty() && _readyState == CLOSING)
_socket->getErrno() == EAGAIN))
{
break;
}
else if (ret <= 0)
{
_rxbuf.clear();
_socket->close();
setReadyState(CLOSED);
break;
}
else
{
_rxbuf.insert(_rxbuf.end(),
_readbuf.begin(),
_readbuf.begin() + ret);
}
}
}
else if (pollResult == PollResultType::Error)
{
_socket->close();
}
else if (pollResult == PollResultType::CloseRequest)
{
_socket->close();
}
// Avoid a race condition where we get stuck in select
// while closing.
if (_readyState == CLOSING)
{ {
_socket->close(); _socket->close();
setReadyState(CLOSED);
} }
}, },
_heartBeatPeriod); _heartBeatPeriod);
@ -435,10 +390,6 @@ namespace ix
emitMessage(MSG, getMergedChunks(), ws, onMessageCallback); emitMessage(MSG, getMergedChunks(), ws, onMessageCallback);
_chunks.clear(); _chunks.clear();
} }
else
{
emitMessage(FRAGMENT, std::string(), ws, onMessageCallback);
}
} }
} }
else if (ws.opcode == wsheader_type::PING) else if (ws.opcode == wsheader_type::PING)
@ -522,7 +473,7 @@ namespace ix
size_t wireSize = message.size(); size_t wireSize = message.size();
// When the RSV1 bit is 1 it means the message is compressed // When the RSV1 bit is 1 it means the message is compressed
if (_enablePerMessageDeflate && ws.rsv1 && messageKind != FRAGMENT) if (_enablePerMessageDeflate && ws.rsv1)
{ {
std::string decompressedMessage; std::string decompressedMessage;
bool success = _perMessageDeflate.decompress(message, decompressedMessage); bool success = _perMessageDeflate.decompress(message, decompressedMessage);
@ -620,7 +571,7 @@ namespace ix
// Send message // Send message
sendFragment(opcodeType, fin, begin, end, compress); sendFragment(opcodeType, fin, begin, end, compress);
if (onProgressCallback && !onProgressCallback((int)i, (int) steps)) if (onProgressCallback && !onProgressCallback(i, steps))
{ {
break; break;
} }
@ -629,12 +580,6 @@ namespace ix
} }
} }
// Request to flush the send buffer on the background thread if it isn't empty
if (!isSendBufferEmpty())
{
_socket->wakeUpFromPoll(Socket::kSendRequest);
}
return WebSocketSendInfo(true, compressionError, payloadSize, wireSize); return WebSocketSendInfo(true, compressionError, payloadSize, wireSize);
} }
@ -780,18 +725,8 @@ namespace ix
sendData(wsheader_type::CLOSE, normalClosure, compress); sendData(wsheader_type::CLOSE, normalClosure, compress);
setReadyState(CLOSING); setReadyState(CLOSING);
_socket->wakeUpFromPoll(Socket::kCloseRequest); _socket->wakeUpFromPoll();
_socket->close(); _socket->close();
_closeCode = 1000;
_closeReason = "Normal Closure";
setReadyState(CLOSED);
}
size_t WebSocketTransport::bufferedAmount() const
{
std::lock_guard<std::mutex> lock(_txbufMutex);
return _txbuf.size();
} }
} // namespace ix } // namespace ix

View File

@ -45,8 +45,7 @@ namespace ix
{ {
MSG, MSG,
PING, PING,
PONG, PONG
FRAGMENT
}; };
using OnMessageCallback = std::function<void(const std::string&, using OnMessageCallback = std::function<void(const std::string&,
@ -61,7 +60,7 @@ namespace ix
~WebSocketTransport(); ~WebSocketTransport();
void configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions, void configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions,
int heartBeatPeriod); int hearBeatPeriod);
WebSocketInitResult connectToUrl(const std::string& url, // Client WebSocketInitResult connectToUrl(const std::string& url, // Client
int timeoutSecs); int timeoutSecs);
@ -77,7 +76,6 @@ namespace ix
void setReadyState(ReadyStateValues readyStateValue); void setReadyState(ReadyStateValues readyStateValue);
void setOnCloseCallback(const OnCloseCallback& onCloseCallback); void setOnCloseCallback(const OnCloseCallback& onCloseCallback);
void dispatch(const OnMessageCallback& onMessageCallback); void dispatch(const OnMessageCallback& onMessageCallback);
size_t bufferedAmount() const;
private: private:
std::string _url; std::string _url;
@ -148,7 +146,7 @@ namespace ix
mutable std::mutex _lastSendTimePointMutex; mutable std::mutex _lastSendTimePointMutex;
std::chrono::time_point<std::chrono::steady_clock> _lastSendTimePoint; std::chrono::time_point<std::chrono::steady_clock> _lastSendTimePoint;
// No data was send through the socket for longer than the heartbeat period // No data was send through the socket for longer that the hearbeat period
bool heartBeatPeriodExceeded(); bool heartBeatPeriodExceeded();
void sendOnSocket(); void sendOnSocket();

View File

@ -3,21 +3,19 @@
# #
all: brew all: brew
install: brew
brew: brew:
mkdir -p build && (cd build ; cmake .. ; make -j install) mkdir -p build && (cd build ; cmake .. ; make -j install)
.PHONY: docker .PHONY: docker
docker: docker:
docker build -t ws:latest . docker build -t broadcast_server:latest .
run: run:
docker run --cap-add sys_ptrace -it ws:latest docker run --cap-add sys_ptrace -it broadcast_server:latest bash
# this is helpful to remove trailing whitespaces # this is helpful to remove trailing whitespaces
trail: trail:
sh third_party/remote_trailing_whitespaces.sh sh third_party/remove_trailing_whitespaces.sh
build: build:
(cd examples/satori_publisher ; mkdir -p build ; cd build ; cmake .. ; make) (cd examples/satori_publisher ; mkdir -p build ; cd build ; cmake .. ; make)
@ -38,9 +36,6 @@ test_server:
test: test:
python test/run.py python test/run.py
ws_test: all
(cd ws ; bash test_ws.sh)
# For the fork that is configured with appveyor # For the fork that is configured with appveyor
rebase_upstream: rebase_upstream:
git fetch upstream git fetch upstream
@ -48,9 +43,5 @@ rebase_upstream:
git reset --hard upstream/master git reset --hard upstream/master
git push origin master --force git push origin master --force
install_cmake_for_linux:
mkdir -p /tmp/cmake
(cd /tmp/cmake ; curl -L -O https://github.com/Kitware/CMake/releases/download/v3.14.0-rc4/cmake-3.14.0-rc4-Linux-x86_64.tar.gz ; tar zxf cmake-3.14.0-rc4-Linux-x86_64.tar.gz)
.PHONY: test .PHONY: test
.PHONY: build .PHONY: build

View File

@ -5,13 +5,19 @@
*/ */
#include <iostream> #include <iostream>
#include <ixwebsocket/IXSocketFactory.h>
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXCancellationRequest.h> #include <ixwebsocket/IXCancellationRequest.h>
#if defined(__APPLE__) or defined(__linux__)
# ifdef __APPLE__
# include <ixwebsocket/IXSocketAppleSSL.h>
# else
# include <ixwebsocket/IXSocketOpenSSL.h>
# endif
#endif
#include "IXTest.h" #include "IXTest.h"
#include "catch.hpp" #include "catch.hpp"
#include <string.h>
using namespace ix; using namespace ix;
@ -33,15 +39,16 @@ namespace ix
Logger() << "errMsg: " << errMsg; Logger() << "errMsg: " << errMsg;
REQUIRE(success); REQUIRE(success);
Logger() << "Sending request: " << request std::cout << "Sending request: " << request
<< "to " << host << ":" << port; << "to " << host << ":" << port
<< std::endl;
REQUIRE(socket->writeBytes(request, isCancellationRequested)); REQUIRE(socket->writeBytes(request, isCancellationRequested));
auto lineResult = socket->readLine(isCancellationRequested); auto lineResult = socket->readLine(isCancellationRequested);
auto lineValid = lineResult.first; auto lineValid = lineResult.first;
auto line = lineResult.second; auto line = lineResult.second;
Logger() << "read error: " << strerror(Socket::getErrno()); std::cout << "read error: " << strerror(Socket::getErrno()) << std::endl;
REQUIRE(lineValid); REQUIRE(lineValid);
@ -55,18 +62,10 @@ TEST_CASE("socket", "[socket]")
{ {
SECTION("Connect to google HTTP server. Send GET request without header. Should return 200") SECTION("Connect to google HTTP server. Send GET request without header. Should return 200")
{ {
std::string errMsg; std::shared_ptr<Socket> socket(new Socket);
bool tls = false;
std::shared_ptr<Socket> socket = createSocket(tls, errMsg);
std::string host("www.google.com"); std::string host("www.google.com");
int port = 80; int port = 80;
std::string request("GET / HTTP/1.1\r\n\r\n");
std::stringstream ss;
ss << "GET / HTTP/1.1\r\n";
ss << "Host: " << host << "\r\n";
ss << "\r\n";
std::string request(ss.str());
int expectedStatus = 200; int expectedStatus = 200;
int timeoutSecs = 3; int timeoutSecs = 3;
@ -76,9 +75,11 @@ TEST_CASE("socket", "[socket]")
#if defined(__APPLE__) or defined(__linux__) #if defined(__APPLE__) or defined(__linux__)
SECTION("Connect to google HTTPS server. Send GET request without header. Should return 200") SECTION("Connect to google HTTPS server. Send GET request without header. Should return 200")
{ {
std::string errMsg; # ifdef __APPLE__
bool tls = true; std::shared_ptr<Socket> socket = std::make_shared<SocketAppleSSL>();
std::shared_ptr<Socket> socket = createSocket(tls, errMsg); # else
std::shared_ptr<Socket> socket = std::make_shared<SocketOpenSSL>();
# endif
std::string host("www.google.com"); std::string host("www.google.com");
int port = 443; int port = 443;
std::string request("GET / HTTP/1.1\r\n\r\n"); std::string request("GET / HTTP/1.1\r\n\r\n");

View File

@ -69,15 +69,10 @@ namespace ix
Logger() << msg; Logger() << msg;
} }
int getAnyFreePortSimple()
{
static int defaultPort = 8090;
return defaultPort++;
}
int getAnyFreePort() int getAnyFreePort()
{ {
int defaultPort = 8090; int defaultPort = 8090;
int sockfd; int sockfd;
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{ {
@ -127,15 +122,8 @@ namespace ix
{ {
while (true) while (true)
{ {
#if defined(__has_feature)
# if __has_feature(address_sanitizer)
int port = getAnyFreePortSimple();
# else
int port = getAnyFreePort(); int port = getAnyFreePort();
# endif
#else
int port = getAnyFreePort();
#endif
// //
// Only port above 1024 can be used by non root users, but for some // Only port above 1024 can be used by non root users, but for some
// reason I got port 7 returned with macOS when binding on port 0... // reason I got port 7 returned with macOS when binding on port 0...

View File

@ -65,7 +65,7 @@ namespace
_webSocket.setUrl(url); _webSocket.setUrl(url);
// The important bit for this test. // The important bit for this test.
// Set a 1 second heartbeat ; if no traffic is present on the connection for 1 second // Set a 1 second hearbeat ; if no traffic is present on the connection for 1 second
// a ping message will be sent by the client. // a ping message will be sent by the client.
_webSocket.setHeartBeatPeriod(1); _webSocket.setHeartBeatPeriod(1);
@ -128,11 +128,10 @@ namespace
{ {
// A dev/null server // A dev/null server
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server, &receivedPingMessages](std::shared_ptr<ix::WebSocket> webSocket, [&server, &receivedPingMessages](std::shared_ptr<ix::WebSocket> webSocket)
std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, &server, &receivedPingMessages](ix::WebSocketMessageType messageType, [webSocket, &server, &receivedPingMessages](ix::WebSocketMessageType messageType,
const std::string& str, const std::string& str,
size_t wireSize, size_t wireSize,
const ix::WebSocketErrorInfo& error, const ix::WebSocketErrorInfo& error,
@ -142,7 +141,6 @@ namespace
if (messageType == ix::WebSocket_MessageType_Open) if (messageType == ix::WebSocket_MessageType_Open)
{ {
Logger() << "New server connection"; Logger() << "New server connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri; Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:"; Logger() << "Headers:";
for (auto it : openInfo.headers) for (auto it : openInfo.headers)

View File

@ -8,7 +8,6 @@
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <ixwebsocket/IXWebSocketServer.h> #include <ixwebsocket/IXWebSocketServer.h>
#include <ixwebsocket/IXSocketFactory.h>
#include "IXTest.h" #include "IXTest.h"
@ -18,32 +17,13 @@ using namespace ix;
namespace ix namespace ix
{ {
// Test that we can override the connectionState impl to provide our own bool startServer(ix::WebSocketServer& server)
class ConnectionStateCustom : public ConnectionState
{ {
void computeId()
{
// a very boring invariant id that we can test against in the unittest
_id = "foobarConnectionId";
}
};
bool startServer(ix::WebSocketServer& server,
std::string& connectionId)
{
auto factory = []() -> std::shared_ptr<ConnectionState>
{
return std::make_shared<ConnectionStateCustom>();
};
server.setConnectionStateFactory(factory);
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server, &connectionId](std::shared_ptr<ix::WebSocket> webSocket, [&server](std::shared_ptr<ix::WebSocket> webSocket)
std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, [webSocket, &server](ix::WebSocketMessageType messageType,
&connectionId, &server](ix::WebSocketMessageType messageType,
const std::string& str, const std::string& str,
size_t wireSize, size_t wireSize,
const ix::WebSocketErrorInfo& error, const ix::WebSocketErrorInfo& error,
@ -52,18 +32,13 @@ namespace ix
{ {
if (messageType == ix::WebSocket_MessageType_Open) if (messageType == ix::WebSocket_MessageType_Open)
{ {
connectionState->computeId();
Logger() << "New connection"; Logger() << "New connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri; Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:"; Logger() << "Headers:";
for (auto it : openInfo.headers) for (auto it : openInfo.headers)
{ {
Logger() << it.first << ": " << it.second; Logger() << it.first << ": " << it.second;
} }
connectionId = connectionState->getId();
} }
else if (messageType == ix::WebSocket_MessageType_Close) else if (messageType == ix::WebSocket_MessageType_Close)
{ {
@ -102,21 +77,19 @@ TEST_CASE("Websocket_server", "[websocket_server]")
{ {
int port = getFreePort(); int port = getFreePort();
ix::WebSocketServer server(port); ix::WebSocketServer server(port);
std::string connectionId; REQUIRE(startServer(server));
REQUIRE(startServer(server, connectionId));
std::string errMsg; Socket socket;
bool tls = false;
std::shared_ptr<Socket> socket = createSocket(tls, errMsg);
std::string host("localhost"); std::string host("localhost");
std::string errMsg;
auto isCancellationRequested = []() -> bool auto isCancellationRequested = []() -> bool
{ {
return false; return false;
}; };
bool success = socket->connect(host, port, errMsg, isCancellationRequested); bool success = socket.connect(host, port, errMsg, isCancellationRequested);
REQUIRE(success); REQUIRE(success);
auto lineResult = socket->readLine(isCancellationRequested); auto lineResult = socket.readLine(isCancellationRequested);
auto lineValid = lineResult.first; auto lineValid = lineResult.first;
auto line = lineResult.second; auto line = lineResult.second;
@ -136,24 +109,22 @@ TEST_CASE("Websocket_server", "[websocket_server]")
{ {
int port = getFreePort(); int port = getFreePort();
ix::WebSocketServer server(port); ix::WebSocketServer server(port);
std::string connectionId; REQUIRE(startServer(server));
REQUIRE(startServer(server, connectionId));
std::string errMsg; Socket socket;
bool tls = false;
std::shared_ptr<Socket> socket = createSocket(tls, errMsg);
std::string host("localhost"); std::string host("localhost");
std::string errMsg;
auto isCancellationRequested = []() -> bool auto isCancellationRequested = []() -> bool
{ {
return false; return false;
}; };
bool success = socket->connect(host, port, errMsg, isCancellationRequested); bool success = socket.connect(host, port, errMsg, isCancellationRequested);
REQUIRE(success); REQUIRE(success);
Logger() << "writeBytes"; Logger() << "writeBytes";
socket->writeBytes("GET /\r\n", isCancellationRequested); socket.writeBytes("GET /\r\n", isCancellationRequested);
auto lineResult = socket->readLine(isCancellationRequested); auto lineResult = socket.readLine(isCancellationRequested);
auto lineValid = lineResult.first; auto lineValid = lineResult.first;
auto line = lineResult.second; auto line = lineResult.second;
@ -173,28 +144,26 @@ TEST_CASE("Websocket_server", "[websocket_server]")
{ {
int port = getFreePort(); int port = getFreePort();
ix::WebSocketServer server(port); ix::WebSocketServer server(port);
std::string connectionId; REQUIRE(startServer(server));
REQUIRE(startServer(server, connectionId));
std::string errMsg; Socket socket;
bool tls = false;
std::shared_ptr<Socket> socket = createSocket(tls, errMsg);
std::string host("localhost"); std::string host("localhost");
std::string errMsg;
auto isCancellationRequested = []() -> bool auto isCancellationRequested = []() -> bool
{ {
return false; return false;
}; };
bool success = socket->connect(host, port, errMsg, isCancellationRequested); bool success = socket.connect(host, port, errMsg, isCancellationRequested);
REQUIRE(success); REQUIRE(success);
socket->writeBytes("GET / HTTP/1.1\r\n" socket.writeBytes("GET / HTTP/1.1\r\n"
"Upgrade: websocket\r\n" "Upgrade: websocket\r\n"
"Sec-WebSocket-Version: 13\r\n" "Sec-WebSocket-Version: 13\r\n"
"Sec-WebSocket-Key: foobar\r\n" "Sec-WebSocket-Key: foobar\r\n"
"\r\n", "\r\n",
isCancellationRequested); isCancellationRequested);
auto lineResult = socket->readLine(isCancellationRequested); auto lineResult = socket.readLine(isCancellationRequested);
auto lineValid = lineResult.first; auto lineValid = lineResult.first;
auto line = lineResult.second; auto line = lineResult.second;
@ -205,8 +174,6 @@ TEST_CASE("Websocket_server", "[websocket_server]")
// Give us 500ms for the server to notice that clients went away // Give us 500ms for the server to notice that clients went away
ix::msleep(500); ix::msleep(500);
REQUIRE(connectionId == "foobarConnectionId");
server.stop(); server.stop();
REQUIRE(server.getClients().size() == 0); REQUIRE(server.getClients().size() == 0);
} }

View File

@ -164,21 +164,10 @@ namespace
ss << "cmd_websocket_chat: Error ! " << error.reason; ss << "cmd_websocket_chat: Error ! " << error.reason;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocket_MessageType_Ping)
{
log("cmd_websocket_chat: received ping message");
}
else if (messageType == ix::WebSocket_MessageType_Pong)
{
log("cmd_websocket_chat: received pong message");
}
else if (messageType == ix::WebSocket_MessageType_Fragment)
{
log("cmd_websocket_chat: received message fragment");
}
else else
{ {
ss << "Unexpected ix::WebSocketMessageType"; // FIXME: missing ping/pong messages
ss << "Invalid ix::WebSocketMessageType";
log(ss.str()); log(ss.str());
} }
}); });
@ -217,11 +206,10 @@ namespace
bool startServer(ix::WebSocketServer& server) bool startServer(ix::WebSocketServer& server)
{ {
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket, [&server](std::shared_ptr<ix::WebSocket> webSocket)
std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType, [webSocket, &server](ix::WebSocketMessageType messageType,
const std::string& str, const std::string& str,
size_t wireSize, size_t wireSize,
const ix::WebSocketErrorInfo& error, const ix::WebSocketErrorInfo& error,
@ -231,7 +219,6 @@ namespace
if (messageType == ix::WebSocket_MessageType_Open) if (messageType == ix::WebSocket_MessageType_Open)
{ {
Logger() << "New connection"; Logger() << "New connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri; Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:"; Logger() << "Headers:";
for (auto it : openInfo.headers) for (auto it : openInfo.headers)

View File

@ -2,47 +2,14 @@ import os
import platform import platform
import shutil import shutil
import subprocess
import threading
class Command(object):
"""Run system commands with timeout
From http://www.bo-yang.net/2016/12/01/python-run-command-with-timeout
Python3 might have a builtin way to do that.
"""
def __init__(self, cmd):
self.cmd = cmd
self.process = None
def run_command(self, capture = False):
self.process = subprocess.Popen(self.cmd, shell=True)
self.process.communicate()
def run(self, timeout = 5 * 60):
'''5 minutes default timeout'''
thread = threading.Thread(target=self.run_command, args=())
thread.start()
thread.join(timeout)
if thread.is_alive():
print('Command timeout, kill it: ' + self.cmd)
self.process.terminate()
thread.join()
return False, 255
else:
return True, self.process.returncode
osName = platform.system() osName = platform.system()
print('os name = {}'.format(osName)) print('os name = {}'.format(osName))
root = os.path.dirname(os.path.realpath(__file__)) root = os.path.dirname(os.path.realpath(__file__))
buildDir = os.path.join(root, 'build', osName) buildDir = os.path.join(root, 'build')
if not os.path.exists(buildDir): if not os.path.exists(buildDir):
os.makedirs(buildDir) os.mkdir(buildDir)
os.chdir(buildDir) os.chdir(buildDir)
@ -71,7 +38,7 @@ sanitizerFlags = sanitizersFlags[sanitizer]
# os.environ['CC'] = 'clang-cl' # os.environ['CC'] = 'clang-cl'
# os.environ['CXX'] = 'clang-cl' # os.environ['CXX'] = 'clang-cl'
cmakeCmd = 'cmake -DCMAKE_BUILD_TYPE=Debug {} {} ../..'.format(generator, sanitizerFlags) cmakeCmd = 'cmake -DCMAKE_BUILD_TYPE=Debug {} {} ..'.format(generator, sanitizerFlags)
print(cmakeCmd) print(cmakeCmd)
ret = os.system(cmakeCmd) ret = os.system(cmakeCmd)
assert ret == 0, 'CMake failed, exiting' assert ret == 0, 'CMake failed, exiting'
@ -100,7 +67,6 @@ def findFiles(prefix):
# We need to copy the zlib DLL in the current work directory # We need to copy the zlib DLL in the current work directory
shutil.copy(os.path.join( shutil.copy(os.path.join(
'..',
'..', '..',
'..', '..',
'third_party', 'third_party',
@ -111,9 +77,6 @@ shutil.copy(os.path.join(
'bin', 'bin',
'zlib.dll'), '.') 'zlib.dll'), '.')
# lldb = "lldb --batch -o 'run' -k 'thread backtrace all' -k 'quit 1'" testCommand = '{} {}'.format(testBinary, os.getenv('TEST', ''))
lldb = "" # Disabled for now ret = os.system(testCommand)
testCommand = '{} {} {}'.format(lldb, testBinary, os.getenv('TEST', ''))
command = Command(testCommand)
timedout, ret = command.run()
assert ret == 0, 'Test command failed' assert ret == 0, 'Test command failed'

View File

@ -11,6 +11,10 @@
int main(int argc, char* argv[]) int main(int argc, char* argv[])
{ {
ix::Socket::init(); // for Windows
int result = Catch::Session().run(argc, argv); int result = Catch::Session().run(argc, argv);
ix::Socket::cleanup(); // for Windows
return result; return result;
} }

View File

@ -1,3 +1,2 @@
find . -type f -name '*.cpp' -exec sed -i '' 's/[[:space:]]*$//' {} \+ find . -type f -name '*.cpp' -exec sed -i '' 's/[[:space:]]*$//' {} \+
find . -type f -name '*.h' -exec sed -i '' 's/[[:space:]]*$//' {} \+ find . -type f -name '*.h' -exec sed -i '' 's/[[:space:]]*$//' {} \+
find . -type f -name '*.md' -exec sed -i '' 's/[[:space:]]*$//' {} \+

1
ws/.gitignore vendored
View File

@ -1,2 +1 @@
build build
node_modules

View File

@ -23,9 +23,6 @@ add_executable(ws
ixcrypto/IXHash.cpp ixcrypto/IXHash.cpp
ixcrypto/IXUuid.cpp ixcrypto/IXUuid.cpp
IXRedisClient.cpp
ws_http_client.cpp
ws_ping_pong.cpp ws_ping_pong.cpp
ws_broadcast_server.cpp ws_broadcast_server.cpp
ws_echo_server.cpp ws_echo_server.cpp
@ -34,8 +31,6 @@ add_executable(ws
ws_transfer.cpp ws_transfer.cpp
ws_send.cpp ws_send.cpp
ws_receive.cpp ws_receive.cpp
ws_redis_publish.cpp
ws_redis_subscribe.cpp
ws.cpp) ws.cpp)
if (APPLE AND USE_TLS) if (APPLE AND USE_TLS)

View File

@ -1,166 +0,0 @@
/*
* IXRedisClient.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXRedisClient.h"
#include <ixwebsocket/IXSocketFactory.h>
#include <ixwebsocket/IXSocket.h>
#include <sstream>
#include <iomanip>
#include <vector>
#include <cstring>
namespace ix
{
bool RedisClient::connect(const std::string& hostname, int port)
{
bool tls = false;
std::string errorMsg;
_socket = createSocket(tls, errorMsg);
if (!_socket)
{
return false;
}
std::string errMsg;
return _socket->connect(hostname, port, errMsg, nullptr);
}
bool RedisClient::publish(const std::string& channel,
const std::string& message)
{
if (!_socket) return false;
std::stringstream ss;
ss << "PUBLISH ";
ss << channel;
ss << " ";
ss << message;
ss << "\r\n";
bool sent = _socket->writeBytes(ss.str(), nullptr);
if (!sent)
{
return false;
}
auto pollResult = _socket->isReadyToRead(-1);
if (pollResult == PollResultType::Error)
{
return false;
}
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
return lineValid;
}
//
// FIXME: we assume that redis never return errors...
//
bool RedisClient::subscribe(const std::string& channel,
const OnRedisSubscribeCallback& callback)
{
if (!_socket) return false;
std::stringstream ss;
ss << "SUBSCRIBE ";
ss << channel;
ss << "\r\n";
bool sent = _socket->writeBytes(ss.str(), nullptr);
if (!sent)
{
return false;
}
// Wait 1s for the response
auto pollResult = _socket->isReadyToRead(-1);
if (pollResult == PollResultType::Error)
{
return false;
}
// Read the first line of the response
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
if (!lineValid) return false;
// There are 5 items for the subscribe repply
for (int i = 0; i < 5; ++i)
{
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
if (!lineValid) return false;
}
// Wait indefinitely for new messages
while (true)
{
// Wait until something is ready to read
auto pollResult = _socket->isReadyToRead(-1);
if (pollResult == PollResultType::Error)
{
return false;
}
// The first line of the response describe the return type,
// => *3 (an array of 3 elements)
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
if (!lineValid) return false;
int arraySize;
{
std::stringstream ss;
ss << line.substr(1, line.size()-1);
ss >> arraySize;
}
// There are 6 items for each received message
for (int i = 0; i < arraySize; ++i)
{
auto lineResult = _socket->readLine(nullptr);
auto lineValid = lineResult.first;
auto line = lineResult.second;
if (!lineValid) return false;
// Messages are string, which start with a string size
// => $7 (7 bytes)
int stringSize;
std::stringstream ss;
ss << line.substr(1, line.size()-1);
ss >> stringSize;
auto readResult = _socket->readBytes(stringSize, nullptr, nullptr);
if (!readResult.first) return false;
if (i == 2)
{
// The message is the 3rd element.
callback(readResult.second);
}
// read last 2 bytes (\r\n)
char c;
_socket->readByte(&c, nullptr);
_socket->readByte(&c, nullptr);
}
}
return true;
}
}

View File

@ -1,36 +0,0 @@
/*
* IXRedisClient.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <memory>
#include <functional>
namespace ix
{
class Socket;
class RedisClient {
public:
using OnRedisSubscribeCallback = std::function<void(const std::string&)>;
RedisClient() = default;
~RedisClient() = default;
bool connect(const std::string& hostname,
int port);
bool publish(const std::string& channel,
const std::string& message);
bool subscribe(const std::string& channel,
const OnRedisSubscribeCallback& callback);
private:
std::shared_ptr<Socket> _socket;
};
}

View File

@ -1,62 +1,10 @@
# General
ws is a command line tool that should exercise most of the IXWebSocket code, and provide example code.
```
$ ws --help
ws is a websocket tool
Usage: ws [OPTIONS] SUBCOMMAND
Options:
-h,--help Print this help message and exit
Subcommands:
send Send a file
receive Receive a file
transfer Broadcasting server
connect Connect to a remote server
chat Group chat
echo_server Echo server
broadcast_server Broadcasting server
ping Ping pong
curl HTTP Client
```
## file transfer
``` ```
# Start transfer server, which is just a broadcast server at this point # Start transfer server, which is just a broadcast server at this point
ws transfer # running on port 8080. ./ws transfer # running on port 8080.
# Start receiver first # Start receiver first
ws receive ws://localhost:8080 ./ws receive ws://localhost:8080
# Then send a file. File will be received and written to disk by the receiver process # Then send a file. File will be received and written to disk by the receiver process
ws send ws://localhost:8080 /file/to/path ./ws send ws://localhost:8080 /file/to/path
```
## curl
```
$ ws curl --help
HTTP Client
Usage: ws curl [OPTIONS] url
Positionals:
url TEXT REQUIRED Connection url
Options:
-h,--help Print this help message and exit
-d TEXT Form data
-F TEXT Form data
-H TEXT Header
--output TEXT Output file
-I Send a HEAD request
-L Follow redirects
--max-redirects INT Max Redirects
-v Verbose
-O Save output to disk
--compress Enable gzip compression
--connect-timeout INT Connection timeout
--transfer-timeout INT Transfer timeout
``` ```

View File

@ -13,7 +13,6 @@ g++ --std=c++14 \
../ixwebsocket/IXSocket.cpp \ ../ixwebsocket/IXSocket.cpp \
../ixwebsocket/IXSocketServer.cpp \ ../ixwebsocket/IXSocketServer.cpp \
../ixwebsocket/IXSocketConnect.cpp \ ../ixwebsocket/IXSocketConnect.cpp \
../ixwebsocket/IXSocketFactory.cpp \
../ixwebsocket/IXDNSLookup.cpp \ ../ixwebsocket/IXDNSLookup.cpp \
../ixwebsocket/IXCancellationRequest.cpp \ ../ixwebsocket/IXCancellationRequest.cpp \
../ixwebsocket/IXWebSocket.cpp \ ../ixwebsocket/IXWebSocket.cpp \
@ -23,16 +22,12 @@ g++ --std=c++14 \
../ixwebsocket/IXWebSocketPerMessageDeflate.cpp \ ../ixwebsocket/IXWebSocketPerMessageDeflate.cpp \
../ixwebsocket/IXWebSocketPerMessageDeflateCodec.cpp \ ../ixwebsocket/IXWebSocketPerMessageDeflateCodec.cpp \
../ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp \ ../ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp \
../ixwebsocket/IXWebSocketHttpHeaders.cpp \
../ixwebsocket/IXHttpClient.cpp \
../ixwebsocket/IXUrlParser.cpp \
../ixwebsocket/IXSocketOpenSSL.cpp \ ../ixwebsocket/IXSocketOpenSSL.cpp \
../ixwebsocket/linux/IXSetThreadName_linux.cpp \ ../ixwebsocket/linux/IXSetThreadName_linux.cpp \
../third_party/msgpack11/msgpack11.cpp \ ../third_party/jsoncpp/jsoncpp.cpp \
ixcrypto/IXBase64.cpp \ ixcrypto/IXBase64.cpp \
ixcrypto/IXHash.cpp \ ixcrypto/IXHash.cpp \
ixcrypto/IXUuid.cpp \ ixcrypto/IXUuid.cpp \
ws_http_client.cpp \
ws_ping_pong.cpp \ ws_ping_pong.cpp \
ws_broadcast_server.cpp \ ws_broadcast_server.cpp \
ws_echo_server.cpp \ ws_echo_server.cpp \

19
ws/package-lock.json generated
View File

@ -1,19 +0,0 @@
{
"requires": true,
"lockfileVersion": 1,
"dependencies": {
"async-limiter": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.0.tgz",
"integrity": "sha512-jp/uFnooOiO+L211eZOoSyzpOITMXx1rBITauYykG3BRYPu8h0UcxsPNB04RR5vo4Tyz3+ay17tR6JVf9qzYWg=="
},
"ws": {
"version": "6.2.0",
"resolved": "https://registry.npmjs.org/ws/-/ws-6.2.0.tgz",
"integrity": "sha512-deZYUNlt2O4buFCa3t5bKLf8A7FPP/TVjwOeVNpw818Ma5nk4MLXls2eoEGS39o8119QIYxTrTDoPQ5B/gTD6w==",
"requires": {
"async-limiter": "1.0.0"
}
}
}
}

View File

@ -1,64 +0,0 @@
#!/bin/sh
# Handle Ctrl-C by killing all sub-processing AND exiting
trap cleanup INT
function cleanup {
kill `cat /tmp/ws_test/pidfile.transfer`
kill `cat /tmp/ws_test/pidfile.receive`
kill `cat /tmp/ws_test/pidfile.send`
exit 1
}
rm -rf /tmp/ws_test
mkdir -p /tmp/ws_test
# Start a transport server
cd /tmp/ws_test
ws transfer --port 8090 --pidfile /tmp/ws_test/pidfile.transfer &
# Wait until the transfer server is up
while true
do
nc -zv 127.0.0.1 8090 && {
echo "Transfer server up and running"
break
}
echo "sleep ... wait for transfer server"
sleep 0.1
done
# Start a receiver
mkdir -p /tmp/ws_test/receive
cd /tmp/ws_test/receive
ws receive --delay 10 ws://127.0.0.1:8090 --pidfile /tmp/ws_test/pidfile.receive &
mkdir /tmp/ws_test/send
cd /tmp/ws_test/send
dd if=/dev/urandom of=20M_file count=20000 bs=1024
# Start the sender job
ws send --pidfile /tmp/ws_test/pidfile.send ws://127.0.0.1:8090 20M_file
# Wait until the file has been written to disk
while true
do
if test -f /tmp/ws_test/receive/20M_file ; then
echo "Received file does exists, exiting loop"
break
fi
echo "sleep ... wait for output file"
sleep 0.1
done
cksum /tmp/ws_test/send/20M_file
cksum /tmp/ws_test/receive/20M_file
# Give some time to ws receive to terminate
sleep 2
# Cleanup
kill `cat /tmp/ws_test/pidfile.transfer`
kill `cat /tmp/ws_test/pidfile.receive`
kill `cat /tmp/ws_test/pidfile.send`

124
ws/ws.cpp
View File

@ -1,14 +1,9 @@
/* /*
* ws.cpp * ws.cpp
* Author: Benjamin Sergeant * Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved. * Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*/ */
//
// Main driver for websocket utilities
//
#include "ws.h"
// //
// Main drive for websocket utilities // Main drive for websocket utilities
// //
@ -16,12 +11,32 @@
#include <string> #include <string>
#include <sstream> #include <sstream>
#include <iostream> #include <iostream>
#include <fstream>
#include <unistd.h>
#include <cli11/CLI11.hpp> #include <cli11/CLI11.hpp>
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
namespace ix
{
int ws_ping_pong_main(const std::string& url);
int ws_echo_server_main(int port);
int ws_broadcast_server_main(int port);
int ws_chat_main(const std::string& url,
const std::string& user);
int ws_connect_main(const std::string& url);
int ws_receive_main(const std::string& url,
bool enablePerMessageDeflate);
int ws_transfer_main(int port);
int ws_send_main(const std::string& url,
const std::string& path);
}
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
CLI::App app{"ws is a websocket tool"}; CLI::App app{"ws is a websocket tool"};
@ -30,41 +45,17 @@ int main(int argc, char** argv)
std::string url("ws://127.0.0.1:8080"); std::string url("ws://127.0.0.1:8080");
std::string path; std::string path;
std::string user; std::string user;
std::string data;
std::string headers;
std::string output;
std::string hostname("127.0.0.1");
std::string pidfile;
std::string channel;
std::string message;
bool headersOnly = false;
bool followRedirects = false;
bool verbose = false;
bool save = false;
bool compress = false;
int port = 8080; int port = 8080;
int redisPort = 6379;
int connectTimeOut = 60;
int transferTimeout = 1800;
int maxRedirects = 5;
int delayMs = -1;
CLI::App* sendApp = app.add_subcommand("send", "Send a file"); CLI::App* sendApp = app.add_subcommand("send", "Send a file");
sendApp->add_option("url", url, "Connection url")->required(); sendApp->add_option("url", url, "Connection url")->required();
sendApp->add_option("path", path, "Path to the file to send") sendApp->add_option("path", path, "Path to the file to send")->required();
->required()->check(CLI::ExistingPath);
sendApp->add_option("--pidfile", pidfile, "Pid file");
CLI::App* receiveApp = app.add_subcommand("receive", "Receive a file"); CLI::App* receiveApp = app.add_subcommand("receive", "Receive a file");
receiveApp->add_option("url", url, "Connection url")->required(); receiveApp->add_option("url", url, "Connection url")->required();
receiveApp->add_option("--delay", delayMs, "Delay (ms) to wait after receiving a fragment"
" to artificially slow down the receiver");
receiveApp->add_option("--pidfile", pidfile, "Pid file");
CLI::App* transferApp = app.add_subcommand("transfer", "Broadcasting server"); CLI::App* transferApp = app.add_subcommand("transfer", "Broadcasting server");
transferApp->add_option("--port", port, "Connection url"); transferApp->add_option("--port", port, "Connection url");
transferApp->add_option("--host", hostname, "Hostname");
transferApp->add_option("--pidfile", pidfile, "Pid file");
CLI::App* connectApp = app.add_subcommand("connect", "Connect to a remote server"); CLI::App* connectApp = app.add_subcommand("connect", "Connect to a remote server");
connectApp->add_option("url", url, "Connection url")->required(); connectApp->add_option("url", url, "Connection url")->required();
@ -74,59 +65,21 @@ int main(int argc, char** argv)
chatApp->add_option("user", user, "User name")->required(); chatApp->add_option("user", user, "User name")->required();
CLI::App* echoServerApp = app.add_subcommand("echo_server", "Echo server"); CLI::App* echoServerApp = app.add_subcommand("echo_server", "Echo server");
echoServerApp->add_option("--port", port, "Port"); echoServerApp->add_option("--port", port, "Connection url");
echoServerApp->add_option("--host", hostname, "Hostname");
CLI::App* broadcastServerApp = app.add_subcommand("broadcast_server", "Broadcasting server"); CLI::App* broadcastServerApp = app.add_subcommand("broadcast_server", "Broadcasting server");
broadcastServerApp->add_option("--port", port, "Port"); broadcastServerApp->add_option("--port", port, "Connection url");
broadcastServerApp->add_option("--host", hostname, "Hostname");
CLI::App* pingPongApp = app.add_subcommand("ping", "Ping pong"); CLI::App* pingPongApp = app.add_subcommand("ping", "Ping pong");
pingPongApp->add_option("url", url, "Connection url")->required(); pingPongApp->add_option("url", url, "Connection url")->required();
CLI::App* httpClientApp = app.add_subcommand("curl", "HTTP Client");
httpClientApp->add_option("url", url, "Connection url")->required();
httpClientApp->add_option("-d", data, "Form data")->join();
httpClientApp->add_option("-F", data, "Form data")->join();
httpClientApp->add_option("-H", headers, "Header")->join();
httpClientApp->add_option("--output", output, "Output file");
httpClientApp->add_flag("-I", headersOnly, "Send a HEAD request");
httpClientApp->add_flag("-L", followRedirects, "Follow redirects");
httpClientApp->add_option("--max-redirects", maxRedirects, "Max Redirects");
httpClientApp->add_flag("-v", verbose, "Verbose");
httpClientApp->add_flag("-O", save, "Save output to disk");
httpClientApp->add_flag("--compress", compress, "Enable gzip compression");
httpClientApp->add_option("--connect-timeout", connectTimeOut, "Connection timeout");
httpClientApp->add_option("--transfer-timeout", transferTimeout, "Transfer timeout");
CLI::App* redisPublishApp = app.add_subcommand("redis_publish", "Redis publisher");
redisPublishApp->add_option("--port", redisPort, "Port");
redisPublishApp->add_option("--host", hostname, "Hostname");
redisPublishApp->add_option("channel", channel, "Channel")->required();
redisPublishApp->add_option("message", message, "Message")->required();
CLI::App* redisSubscribeApp = app.add_subcommand("redis_subscribe", "Redis subscriber");
redisSubscribeApp->add_option("--port", redisPort, "Port");
redisSubscribeApp->add_option("--host", hostname, "Hostname");
redisSubscribeApp->add_option("channel", channel, "Channel")->required();
redisSubscribeApp->add_flag("-v", verbose, "Verbose");
CLI11_PARSE(app, argc, argv); CLI11_PARSE(app, argc, argv);
// pid file handling ix::Socket::init();
if (!pidfile.empty())
{
unlink(pidfile.c_str());
std::ofstream f;
f.open(pidfile);
f << getpid();
f.close();
}
if (app.got_subcommand("transfer")) if (app.got_subcommand("transfer"))
{ {
return ix::ws_transfer_main(port, hostname); return ix::ws_transfer_main(port);
} }
else if (app.got_subcommand("send")) else if (app.got_subcommand("send"))
{ {
@ -135,7 +88,7 @@ int main(int argc, char** argv)
else if (app.got_subcommand("receive")) else if (app.got_subcommand("receive"))
{ {
bool enablePerMessageDeflate = false; bool enablePerMessageDeflate = false;
return ix::ws_receive_main(url, enablePerMessageDeflate, delayMs); return ix::ws_receive_main(url, enablePerMessageDeflate);
} }
else if (app.got_subcommand("connect")) else if (app.got_subcommand("connect"))
{ {
@ -147,31 +100,16 @@ int main(int argc, char** argv)
} }
else if (app.got_subcommand("echo_server")) else if (app.got_subcommand("echo_server"))
{ {
return ix::ws_echo_server_main(port, hostname); return ix::ws_echo_server_main(port);
} }
else if (app.got_subcommand("broadcast_server")) else if (app.got_subcommand("broadcast_server"))
{ {
return ix::ws_broadcast_server_main(port, hostname); return ix::ws_broadcast_server_main(port);
} }
else if (app.got_subcommand("ping")) else if (app.got_subcommand("ping"))
{ {
return ix::ws_ping_pong_main(url); return ix::ws_ping_pong_main(url);
} }
else if (app.got_subcommand("curl"))
{
return ix::ws_http_client_main(url, headers, data, headersOnly,
connectTimeOut, transferTimeout,
followRedirects, maxRedirects, verbose,
save, output, compress);
}
else if (app.got_subcommand("redis_publish"))
{
return ix::ws_redis_publish_main(hostname, redisPort, channel, message);
}
else if (app.got_subcommand("redis_subscribe"))
{
return ix::ws_redis_subscribe_main(hostname, redisPort, channel, verbose);
}
return 1; return 1;
} }

52
ws/ws.h
View File

@ -1,52 +0,0 @@
/*
* ws.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <string>
namespace ix
{
int ws_http_client_main(const std::string& url,
const std::string& headers,
const std::string& data,
bool headersOnly,
int connectTimeout,
int transferTimeout,
bool followRedirects,
int maxRedirects,
bool verbose,
bool save,
const std::string& output,
bool compress);
int ws_ping_pong_main(const std::string& url);
int ws_echo_server_main(int port, const std::string& hostname);
int ws_broadcast_server_main(int port, const std::string& hostname);
int ws_transfer_main(int port, const std::string& hostname);
int ws_chat_main(const std::string& url,
const std::string& user);
int ws_connect_main(const std::string& url);
int ws_receive_main(const std::string& url,
bool enablePerMessageDeflate,
int delayMs);
int ws_send_main(const std::string& url,
const std::string& path);
int ws_redis_publish_main(const std::string& hostname,
int port,
const std::string& channel,
const std::string& message);
int ws_redis_subscribe_main(const std::string& hostname,
int port,
const std::string& channel,
bool verbose);
}

View File

@ -10,18 +10,17 @@
namespace ix namespace ix
{ {
int ws_broadcast_server_main(int port, const std::string& hostname) int ws_broadcast_server_main(int port)
{ {
std::cout << "Listening on " << hostname << ":" << port << std::endl; std::cout << "Listening on port " << port << std::endl;
ix::WebSocketServer server(port, hostname); ix::WebSocketServer server(port);
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server](std::shared_ptr<WebSocket> webSocket, [&server](std::shared_ptr<ix::WebSocket> webSocket)
std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType, [webSocket, &server](ix::WebSocketMessageType messageType,
const std::string& str, const std::string& str,
size_t wireSize, size_t wireSize,
const ix::WebSocketErrorInfo& error, const ix::WebSocketErrorInfo& error,
@ -31,7 +30,6 @@ namespace ix
if (messageType == ix::WebSocket_MessageType_Open) if (messageType == ix::WebSocket_MessageType_Open)
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl; std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : openInfo.headers)
@ -41,47 +39,16 @@ namespace ix
} }
else if (messageType == ix::WebSocket_MessageType_Close) else if (messageType == ix::WebSocket_MessageType_Close)
{ {
std::cerr << "Closed connection" std::cerr << "Closed connection" << std::endl;
<< " code " << closeInfo.code
<< " reason " << closeInfo.reason << std::endl;
}
else if (messageType == ix::WebSocket_MessageType_Error)
{
std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
std::cerr << ss.str();
}
else if (messageType == ix::WebSocket_MessageType_Fragment)
{
std::cerr << "Received message fragment" << std::endl;
} }
else if (messageType == ix::WebSocket_MessageType_Message) else if (messageType == ix::WebSocket_MessageType_Message)
{ {
std::cerr << "Received " << wireSize << " bytes" << std::endl; std::cerr << "Received " << wireSize << " bytes" << std::endl;
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client != webSocket)
{ {
client->send(str, client->send(str);
[](int current, int total) -> bool
{
std::cerr << "Step " << current
<< " out of " << total << std::endl;
return true;
});
do
{
size_t bufferedAmount = client->bufferedAmount();
std::cerr << bufferedAmount << " bytes left to be sent" << std::endl;
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
} while (client->bufferedAmount() != 0);
} }
} }
} }

View File

@ -5,10 +5,9 @@
*/ */
// //
// Simple chat program that talks to a broadcast server // Simple chat program that talks to the node.js server at
// Broadcast server can be ran with `ws broadcast_server` // websocket_chat_server/broacast-server.js
// //
#include <iostream> #include <iostream>
#include <sstream> #include <sstream>
#include <queue> #include <queue>
@ -94,26 +93,16 @@ namespace ix
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocket_MessageType_Open) if (messageType == ix::WebSocket_MessageType_Open)
{ {
log("ws chat: connected"); ss << "cmd_websocket_chat: user "
std::cout << "Uri: " << openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
ss << "ws chat: user "
<< _user << _user
<< " Connected !"; << " Connected !";
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocket_MessageType_Close) else if (messageType == ix::WebSocket_MessageType_Close)
{ {
ss << "ws chat: user " ss << "cmd_websocket_chat: user "
<< _user << _user
<< " disconnected !" << " disconnected !";
<< " code " << closeInfo.code
<< " reason " << closeInfo.reason;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocket_MessageType_Message) else if (messageType == ix::WebSocket_MessageType_Message)
@ -127,7 +116,7 @@ namespace ix
_receivedQueue.push(result.second); _receivedQueue.push(result.second);
ss << std::endl ss << std::endl
<< result.first << "(" << wireSize << " bytes)" << " > " << result.second << result.first << " > " << result.second
<< std::endl << std::endl
<< _user << " > "; << _user << " > ";
log(ss.str()); log(ss.str());
@ -198,7 +187,5 @@ namespace ix
std::cout << std::endl; std::cout << std::endl;
webSocketChat.stop(); webSocketChat.stop();
return 0;
} }
} }

View File

@ -84,8 +84,6 @@ namespace ix
} }
else if (messageType == ix::WebSocket_MessageType_Message) else if (messageType == ix::WebSocket_MessageType_Message)
{ {
std::cerr << "Received " << wireSize << " bytes" << std::endl;
ss << "ws_connect: received message: " ss << "ws_connect: received message: "
<< str; << str;
log(ss.str()); log(ss.str());
@ -153,6 +151,7 @@ namespace ix
int ws_connect_main(const std::string& url) int ws_connect_main(const std::string& url)
{ {
Socket::init();
interactiveMain(url); interactiveMain(url);
return 0; return 0;
} }

View File

@ -10,18 +10,17 @@
namespace ix namespace ix
{ {
int ws_echo_server_main(int port, const std::string& hostname) int ws_echo_server_main(int port)
{ {
std::cout << "Listening on " << hostname << ":" << port << std::endl; std::cout << "Listening on port " << port << std::endl;
ix::WebSocketServer server(port, hostname); ix::WebSocketServer server(port);
server.setOnConnectionCallback( server.setOnConnectionCallback(
[](std::shared_ptr<ix::WebSocket> webSocket, [&server](std::shared_ptr<ix::WebSocket> webSocket)
std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState](ix::WebSocketMessageType messageType, [webSocket, &server](ix::WebSocketMessageType messageType,
const std::string& str, const std::string& str,
size_t wireSize, size_t wireSize,
const ix::WebSocketErrorInfo& error, const ix::WebSocketErrorInfo& error,
@ -31,7 +30,6 @@ namespace ix
if (messageType == ix::WebSocket_MessageType_Open) if (messageType == ix::WebSocket_MessageType_Open)
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl; std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : openInfo.headers)
@ -41,18 +39,7 @@ namespace ix
} }
else if (messageType == ix::WebSocket_MessageType_Close) else if (messageType == ix::WebSocket_MessageType_Close)
{ {
std::cerr << "Closed connection" std::cerr << "Closed connection" << std::endl;
<< " code " << closeInfo.code
<< " reason " << closeInfo.reason << std::endl;
}
else if (messageType == ix::WebSocket_MessageType_Error)
{
std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
std::cerr << ss.str();
} }
else if (messageType == ix::WebSocket_MessageType_Message) else if (messageType == ix::WebSocket_MessageType_Message)
{ {

View File

@ -1,191 +0,0 @@
/*
* http_client.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <sstream>
#include <fstream>
#include <ixwebsocket/IXHttpClient.h>
#include <ixwebsocket/IXWebSocketHttpHeaders.h>
namespace ix
{
std::string extractFilename(const std::string& path)
{
std::string::size_type idx;
idx = path.rfind('/');
if (idx != std::string::npos)
{
std::string filename = path.substr(idx+1);
return filename;
}
else
{
return path;
}
}
WebSocketHttpHeaders parseHeaders(const std::string& data)
{
WebSocketHttpHeaders headers;
// Split by \n
std::string token;
std::stringstream tokenStream(data);
while (std::getline(tokenStream, token))
{
std::size_t pos = token.rfind(':');
// Bail out if last '.' is found
if (pos == std::string::npos) continue;
auto key = token.substr(0, pos);
auto val = token.substr(pos+2);
std::cerr << key << ": " << val << std::endl;
headers[key] = val;
}
return headers;
}
//
// Useful endpoint to test HTTP post
// https://postman-echo.com/post
//
HttpParameters parsePostParameters(const std::string& data)
{
HttpParameters httpParameters;
// Split by \n
std::string token;
std::stringstream tokenStream(data);
while (std::getline(tokenStream, token))
{
std::size_t pos = token.rfind('=');
// Bail out if last '.' is found
if (pos == std::string::npos) continue;
auto key = token.substr(0, pos);
auto val = token.substr(pos+1);
std::cerr << key << ": " << val << std::endl;
httpParameters[key] = val;
}
return httpParameters;
}
int ws_http_client_main(const std::string& url,
const std::string& headersData,
const std::string& data,
bool headersOnly,
int connectTimeout,
int transferTimeout,
bool followRedirects,
int maxRedirects,
bool verbose,
bool save,
const std::string& output,
bool compress)
{
HttpRequestArgs args;
args.extraHeaders = parseHeaders(headersData);
args.connectTimeout = connectTimeout;
args.transferTimeout = transferTimeout;
args.followRedirects = followRedirects;
args.maxRedirects = maxRedirects;
args.verbose = verbose;
args.compress = compress;
args.logger = [](const std::string& msg)
{
std::cout << msg;
};
args.onProgressCallback = [](int current, int total) -> bool
{
std::cerr << "\r" << "Downloaded "
<< current << " bytes out of " << total;
return true;
};
HttpParameters httpParameters = parsePostParameters(data);
HttpClient httpClient;
HttpResponse out;
if (headersOnly)
{
out = httpClient.head(url, args);
}
else if (data.empty())
{
out = httpClient.get(url, args);
}
else
{
out = httpClient.post(url, httpParameters, args);
}
std::cerr << std::endl;
auto statusCode = std::get<0>(out);
auto errorCode = std::get<1>(out);
auto responseHeaders = std::get<2>(out);
auto payload = std::get<3>(out);
auto errorMsg = std::get<4>(out);
auto uploadSize = std::get<5>(out);
auto downloadSize = std::get<6>(out);
for (auto it : responseHeaders)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
std::cerr << "Upload size: " << uploadSize << std::endl;
std::cerr << "Download size: " << downloadSize << std::endl;
std::cerr << "Status: " << statusCode << std::endl;
if (errorCode != HttpErrorCode_Ok)
{
std::cerr << "error message: " << errorMsg << std::endl;
}
if (!headersOnly && errorCode == HttpErrorCode_Ok)
{
if (save || !output.empty())
{
// FIMXE we should decode the url first
std::string filename = extractFilename(url);
if (!output.empty())
{
filename = output;
}
std::cout << "Writing to disk: " << filename << std::endl;
std::ofstream out(filename);
out.write((char*)&payload.front(), payload.size());
out.close();
}
else
{
if (responseHeaders["Content-Type"] != "application/octet-stream")
{
std::cout << "payload: " << payload << std::endl;
}
else
{
std::cerr << "Binary output can mess up your terminal." << std::endl;
std::cerr << "Use the -O flag to save the file to disk." << std::endl;
std::cerr << "You can also use the --output option to specify a filename." << std::endl;
}
}
}
return 0;
}
}

View File

@ -61,19 +61,10 @@ namespace ix
const ix::WebSocketOpenInfo& openInfo, const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo) const ix::WebSocketCloseInfo& closeInfo)
{ {
std::cerr << "Received " << wireSize << " bytes" << std::endl;
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocket_MessageType_Open) if (messageType == ix::WebSocket_MessageType_Open)
{ {
log("ping_pong: connected"); log("ping_pong: connected");
std::cout << "Uri: " << openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
} }
else if (messageType == ix::WebSocket_MessageType_Close) else if (messageType == ix::WebSocket_MessageType_Close)
{ {
@ -162,7 +153,5 @@ namespace ix
std::cout << std::endl; std::cout << std::endl;
webSocketPingPong.stop(); webSocketPingPong.stop();
return 0;
} }
} }

View File

@ -26,8 +26,7 @@ namespace ix
{ {
public: public:
WebSocketReceiver(const std::string& _url, WebSocketReceiver(const std::string& _url,
bool enablePerMessageDeflate, bool enablePerMessageDeflate);
int delayMs);
void subscribe(const std::string& channel); void subscribe(const std::string& channel);
void start(); void start();
@ -42,8 +41,6 @@ namespace ix
std::string _id; std::string _id;
ix::WebSocket _webSocket; ix::WebSocket _webSocket;
bool _enablePerMessageDeflate; bool _enablePerMessageDeflate;
int _delayMs;
int _receivedFragmentCounter;
std::mutex _conditionVariableMutex; std::mutex _conditionVariableMutex;
std::condition_variable _condition; std::condition_variable _condition;
@ -54,12 +51,9 @@ namespace ix
}; };
WebSocketReceiver::WebSocketReceiver(const std::string& url, WebSocketReceiver::WebSocketReceiver(const std::string& url,
bool enablePerMessageDeflate, bool enablePerMessageDeflate) :
int delayMs) :
_url(url), _url(url),
_enablePerMessageDeflate(enablePerMessageDeflate), _enablePerMessageDeflate(enablePerMessageDeflate)
_delayMs(delayMs),
_receivedFragmentCounter(0)
{ {
; ;
} }
@ -152,16 +146,11 @@ namespace ix
std::string filename = data["filename"].string_value(); std::string filename = data["filename"].string_value();
filename = extractFilename(filename); filename = extractFilename(filename);
std::string filenameTmp = filename + ".tmp"; std::cout << "Writing to disk: " << filename << std::endl;
std::ofstream out(filename);
std::cout << "Writing to disk: " << filenameTmp << std::endl;
std::ofstream out(filenameTmp);
out.write((char*)&content.front(), content.size()); out.write((char*)&content.front(), content.size());
out.close(); out.close();
std::cout << "Renaming " << filenameTmp << " to " << filename << std::endl;
rename(filenameTmp.c_str(), filename.c_str());
std::map<MsgPack, MsgPack> pdu; std::map<MsgPack, MsgPack> pdu;
pdu["ack"] = true; pdu["ack"] = true;
pdu["id"] = data["id"]; pdu["id"] = data["id"];
@ -217,21 +206,8 @@ namespace ix
handleMessage(str); handleMessage(str);
_condition.notify_one(); _condition.notify_one();
} }
else if (messageType == ix::WebSocket_MessageType_Fragment)
{
ss << "ws_receive: received fragment " << _receivedFragmentCounter++;
log(ss.str());
if (_delayMs > 0)
{
// Introduce an arbitrary delay, to simulate a slow connection
std::chrono::duration<double, std::milli> duration(_delayMs);
std::this_thread::sleep_for(duration);
}
}
else if (messageType == ix::WebSocket_MessageType_Error) else if (messageType == ix::WebSocket_MessageType_Error)
{ {
ss << "ws_receive ";
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << error.wait_time << std::endl;
@ -249,10 +225,9 @@ namespace ix
} }
void wsReceive(const std::string& url, void wsReceive(const std::string& url,
bool enablePerMessageDeflate, bool enablePerMessageDeflate)
int delayMs)
{ {
WebSocketReceiver webSocketReceiver(url, enablePerMessageDeflate, delayMs); WebSocketReceiver webSocketReceiver(url, enablePerMessageDeflate);
webSocketReceiver.start(); webSocketReceiver.start();
webSocketReceiver.waitForConnection(); webSocketReceiver.waitForConnection();
@ -267,10 +242,10 @@ namespace ix
} }
int ws_receive_main(const std::string& url, int ws_receive_main(const std::string& url,
bool enablePerMessageDeflate, bool enablePerMessageDeflate)
int delayMs)
{ {
wsReceive(url, enablePerMessageDeflate, delayMs); Socket::init();
wsReceive(url, enablePerMessageDeflate);
return 0; return 0;
} }
} }

View File

@ -1,35 +0,0 @@
/*
* ws_redis_publish.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <sstream>
#include "IXRedisClient.h"
namespace ix
{
int ws_redis_publish_main(const std::string& hostname,
int port,
const std::string& channel,
const std::string& message)
{
RedisClient redisClient;
if (!redisClient.connect(hostname, port))
{
std::cerr << "Cannot connect to redis host" << std::endl;
return 1;
}
std::cerr << "Publishing message " << message
<< " to " << channel << "..." << std::endl;
if (!redisClient.publish(channel, message))
{
std::cerr << "Error publishing to channel " << channel << std::endl;
return 1;
}
return 0;
}
}

View File

@ -1,66 +0,0 @@
/*
* ws_redis_subscribe.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <sstream>
#include <chrono>
#include "IXRedisClient.h"
namespace ix
{
int ws_redis_subscribe_main(const std::string& hostname,
int port,
const std::string& channel,
bool verbose)
{
RedisClient redisClient;
if (!redisClient.connect(hostname, port))
{
std::cerr << "Cannot connect to redis host" << std::endl;
return 1;
}
std::chrono::time_point<std::chrono::steady_clock> lastTimePoint;
int msgPerSeconds = 0;
int msgCount = 0;
auto callback = [&lastTimePoint, &msgPerSeconds, &msgCount, verbose]
(const std::string& message)
{
if (verbose)
{
std::cout << message << std::endl;
}
msgPerSeconds++;
auto now = std::chrono::steady_clock::now();
if (now - lastTimePoint > std::chrono::seconds(1))
{
lastTimePoint = std::chrono::steady_clock::now();
msgCount += msgPerSeconds;
// #messages 901 msg/s 150
std::cout << "#messages " << msgCount << " "
<< "msg/s " << msgPerSeconds
<< std::endl;
msgPerSeconds = 0;
}
};
std::cerr << "Subscribing to " << channel << "..." << std::endl;
if (!redisClient.subscribe(channel, callback))
{
std::cerr << "Error subscribing to channel " << channel << std::endl;
return 1;
}
return 0;
}
}

View File

@ -162,7 +162,6 @@ namespace ix
} }
else if (messageType == ix::WebSocket_MessageType_Error) else if (messageType == ix::WebSocket_MessageType_Error)
{ {
ss << "ws_send ";
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl; ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl; ss << "Wait time(ms): " << error.wait_time << std::endl;
@ -247,7 +246,7 @@ namespace ix
_webSocket.send(msg.dump(), _webSocket.send(msg.dump(),
[throttle](int current, int total) -> bool [throttle](int current, int total) -> bool
{ {
std::cout << "ws_send: Step " << current << " out of " << total << std::endl; std::cout << "Step " << current << " out of " << total << std::endl;
if (throttle) if (throttle)
{ {
@ -258,16 +257,6 @@ namespace ix
return true; return true;
}); });
do
{
size_t bufferedAmount = _webSocket.bufferedAmount();
std::cout << "ws_send: " << bufferedAmount
<< " bytes left to be sent" << std::endl;
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
} while (_webSocket.bufferedAmount() != 0);
bench.report(); bench.report();
auto duration = bench.getDuration(); auto duration = bench.getDuration();
auto transferRate = 1000 * content.size() / duration; auto transferRate = 1000 * content.size() / duration;
@ -300,6 +289,7 @@ namespace ix
bool throttle = false; bool throttle = false;
bool enablePerMessageDeflate = false; bool enablePerMessageDeflate = false;
Socket::init();
wsSend(url, path, enablePerMessageDeflate, throttle); wsSend(url, path, enablePerMessageDeflate, throttle);
return 0; return 0;
} }

View File

@ -10,18 +10,17 @@
namespace ix namespace ix
{ {
int ws_transfer_main(int port, const std::string& hostname) int ws_transfer_main(int port)
{ {
std::cout << "Listening on " << hostname << ":" << port << std::endl; std::cout << "Listening on port " << port << std::endl;
ix::WebSocketServer server(port, hostname); ix::WebSocketServer server(port);
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket, [&server](std::shared_ptr<ix::WebSocket> webSocket)
std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType, [webSocket, &server](ix::WebSocketMessageType messageType,
const std::string& str, const std::string& str,
size_t wireSize, size_t wireSize,
const ix::WebSocketErrorInfo& error, const ix::WebSocketErrorInfo& error,
@ -31,7 +30,6 @@ namespace ix
if (messageType == ix::WebSocket_MessageType_Open) if (messageType == ix::WebSocket_MessageType_Open)
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
std::cerr << "id: " << connectionState->getId() << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl; std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : openInfo.headers)
@ -41,23 +39,7 @@ namespace ix
} }
else if (messageType == ix::WebSocket_MessageType_Close) else if (messageType == ix::WebSocket_MessageType_Close)
{ {
std::cerr << "Closed connection" std::cerr << "Closed connection" << std::endl;
<< " code " << closeInfo.code
<< " reason " << closeInfo.reason << std::endl;
}
else if (messageType == ix::WebSocket_MessageType_Error)
{
std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
std::cerr << ss.str();
}
else if (messageType == ix::WebSocket_MessageType_Fragment)
{
std::cerr << "Received message fragment "
<< std::endl;
} }
else if (messageType == ix::WebSocket_MessageType_Message) else if (messageType == ix::WebSocket_MessageType_Message)
{ {
@ -66,23 +48,7 @@ namespace ix
{ {
if (client != webSocket) if (client != webSocket)
{ {
client->send(str, client->send(str);
[](int current, int total) -> bool
{
std::cerr << "ws_transfer: Step " << current
<< " out of " << total << std::endl;
return true;
});
do
{
size_t bufferedAmount = client->bufferedAmount();
std::cerr << "ws_transfer: " << bufferedAmount
<< " bytes left to be sent" << std::endl;
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
} while (client->bufferedAmount() != 0);
} }
} }
} }