Compare commits

..

6 Commits

148 changed files with 1406 additions and 17258 deletions

View File

@ -1 +0,0 @@
build

0
.gitmodules vendored Normal file
View File

View File

@ -2,16 +2,10 @@ language: cpp
dist: xenial dist: xenial
compiler: compiler:
- gcc
- clang - clang
os: # - gcc
- linux
- osx
matrix:
exclude:
# GCC fails on recent Travis OSX images.
- compiler: gcc
os: osx os: osx
# os: windows
# script: make test
script: python test/run.py script: python test/run.py

View File

@ -10,20 +10,15 @@ set (CMAKE_CXX_STANDARD 14)
set (CXX_STANDARD_REQUIRED ON) set (CXX_STANDARD_REQUIRED ON)
set (CMAKE_CXX_EXTENSIONS OFF) set (CMAKE_CXX_EXTENSIONS OFF)
# -Wshorten-64-to-32 does not work with clang
if (NOT WIN32) if (NOT WIN32)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic")
endif() endif()
if ("${CMAKE_CXX_COMPILER_ID}" MATCHES "Clang")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wshorten-64-to-32")
endif()
set( IXWEBSOCKET_SOURCES set( IXWEBSOCKET_SOURCES
ixwebsocket/IXEventFd.cpp
ixwebsocket/IXSocket.cpp ixwebsocket/IXSocket.cpp
ixwebsocket/IXSocketServer.cpp ixwebsocket/IXSocketServer.cpp
ixwebsocket/IXSocketConnect.cpp ixwebsocket/IXSocketConnect.cpp
ixwebsocket/IXSocketFactory.cpp
ixwebsocket/IXDNSLookup.cpp ixwebsocket/IXDNSLookup.cpp
ixwebsocket/IXCancellationRequest.cpp ixwebsocket/IXCancellationRequest.cpp
ixwebsocket/IXWebSocket.cpp ixwebsocket/IXWebSocket.cpp
@ -33,24 +28,16 @@ set( IXWEBSOCKET_SOURCES
ixwebsocket/IXWebSocketPerMessageDeflate.cpp ixwebsocket/IXWebSocketPerMessageDeflate.cpp
ixwebsocket/IXWebSocketPerMessageDeflateCodec.cpp ixwebsocket/IXWebSocketPerMessageDeflateCodec.cpp
ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp
ixwebsocket/IXWebSocketHttpHeaders.cpp
ixwebsocket/IXHttpClient.cpp
ixwebsocket/IXUrlParser.cpp
ixwebsocket/IXSelectInterrupt.cpp
ixwebsocket/IXSelectInterruptPipe.cpp
ixwebsocket/IXSelectInterruptFactory.cpp
ixwebsocket/IXConnectionState.cpp
) )
set( IXWEBSOCKET_HEADERS set( IXWEBSOCKET_HEADERS
ixwebsocket/IXEventFd.h
ixwebsocket/IXSocket.h ixwebsocket/IXSocket.h
ixwebsocket/IXSocketServer.h ixwebsocket/IXSocketServer.h
ixwebsocket/IXSocketConnect.h ixwebsocket/IXSocketConnect.h
ixwebsocket/IXSocketFactory.h
ixwebsocket/IXSetThreadName.h ixwebsocket/IXSetThreadName.h
ixwebsocket/IXDNSLookup.h ixwebsocket/IXDNSLookup.h
ixwebsocket/IXCancellationRequest.h ixwebsocket/IXCancellationRequest.h
ixwebsocket/IXProgressCallback.h
ixwebsocket/IXWebSocket.h ixwebsocket/IXWebSocket.h
ixwebsocket/IXWebSocketServer.h ixwebsocket/IXWebSocketServer.h
ixwebsocket/IXWebSocketTransport.h ixwebsocket/IXWebSocketTransport.h
@ -62,12 +49,6 @@ set( IXWEBSOCKET_HEADERS
ixwebsocket/IXWebSocketPerMessageDeflateOptions.h ixwebsocket/IXWebSocketPerMessageDeflateOptions.h
ixwebsocket/IXWebSocketHttpHeaders.h ixwebsocket/IXWebSocketHttpHeaders.h
ixwebsocket/libwshandshake.hpp ixwebsocket/libwshandshake.hpp
ixwebsocket/IXHttpClient.h
ixwebsocket/IXUrlParser.h
ixwebsocket/IXSelectInterrupt.h
ixwebsocket/IXSelectInterruptPipe.h
ixwebsocket/IXSelectInterruptFactory.h
ixwebsocket/IXConnectionState.h
) )
# Platform specific code # Platform specific code
@ -77,8 +58,6 @@ elseif (WIN32)
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/windows/IXSetThreadName_windows.cpp) list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/windows/IXSetThreadName_windows.cpp)
else() else()
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/linux/IXSetThreadName_linux.cpp) list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/linux/IXSetThreadName_linux.cpp)
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/IXSelectInterruptEventFd.cpp)
list( APPEND IXWEBSOCKET_HEADERS ixwebsocket/IXSelectInterruptEventFd.h)
endif() endif()
if (USE_TLS) if (USE_TLS)
@ -133,5 +112,3 @@ set( IXWEBSOCKET_INCLUDE_DIRS
. .
../../shared/OpenSSL/include) ../../shared/OpenSSL/include)
target_include_directories( ixwebsocket PUBLIC ${IXWEBSOCKET_INCLUDE_DIRS} ) target_include_directories( ixwebsocket PUBLIC ${IXWEBSOCKET_INCLUDE_DIRS} )
add_subdirectory(ws)

View File

@ -1,31 +0,0 @@
FROM debian:stretch
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install gdb
RUN apt-get -y install screen
RUN apt-get -y install procps
RUN apt-get -y install lsof
RUN apt-get -y install libz-dev
RUN apt-get -y install vim
RUN apt-get -y install make
RUN apt-get -y install cmake
RUN apt-get -y install curl
RUN apt-get -y install python
RUN apt-get -y install netcat
# debian strech cmake is too old for building with Docker
COPY makefile .
RUN ["make", "install_cmake_for_linux"]
COPY . .
ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-rc4-Linux-x86_64/bin
ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
# RUN ["make"]
EXPOSE 8765
CMD ["/ws/ws", "transfer", "--port", "8765", "--host", "0.0.0.0"]

1
Dockerfile Symbolic link
View File

@ -0,0 +1 @@
docker/Dockerfile.debian

View File

@ -1,20 +0,0 @@
class Ixwebsocket < Formula
desc "WebSocket client and server, and HTTP client command-line tool"
homepage "https://github.com/machinezone/IXWebSocket"
url "https://github.com/machinezone/IXWebSocket/archive/v1.1.0.tar.gz"
sha256 "52592ce3d0a67ad0f90ac9e8a458f61724175d95a01a38d1bad3fcdc5c7b6666"
depends_on "cmake" => :build
def install
system "cmake", ".", *std_cmake_args
system "make", "install"
end
test do
system "#{bin}/ws", "--help"
system "#{bin}/ws", "send", "--help"
system "#{bin}/ws", "receive", "--help"
system "#{bin}/ws", "transfer", "--help"
system "#{bin}/ws", "curl", "--help"
end
end

113
README.md
View File

@ -5,16 +5,17 @@
## Introduction ## Introduction
[*WebSocket*](https://en.wikipedia.org/wiki/WebSocket) is a computer communications protocol, providing full-duplex [*WebSocket*](https://en.wikipedia.org/wiki/WebSocket) is a computer communications protocol, providing full-duplex
communication channels over a single TCP connection. *IXWebSocket* is a C++ library for client and server Websocket communication, and for client HTTP communication. The code is derived from [easywsclient](https://github.com/dhbaird/easywsclient) and from the [Satori C SDK](https://github.com/satori-com/satori-rtm-sdk-c). It has been tested on the following platforms. communication channels over a single TCP connection. *IXWebSocket* is a C++ library for client and server Websocket communication. The code is derived from [easywsclient](https://github.com/dhbaird/easywsclient) and from the [Satori C SDK](https://github.com/satori-com/satori-rtm-sdk-c). It has been tested on the following platforms.
* macOS * macOS
* iOS * iOS
* Linux * Linux
* Android * Android
* Windows (no TLS support yet)
## Examples ## Examples
The [*ws*](https://github.com/machinezone/IXWebSocket/tree/master/ws) folder countains many interactive programs for chat, [file transfers](https://github.com/machinezone/IXWebSocket/blob/master/ws/ws_send.cpp), [curl like](https://github.com/machinezone/IXWebSocket/blob/master/ws/ws_http_client.cpp) http clients, demonstrating client and server usage. The examples folder countains a simple chat program, using a node.js broadcast server.
Here is what the client API looks like. Here is what the client API looks like.
@ -24,7 +25,7 @@ ix::WebSocket webSocket;
std::string url("ws://localhost:8080/"); std::string url("ws://localhost:8080/");
webSocket.setUrl(url); webSocket.setUrl(url);
// Optional heart beat, sent every 45 seconds when there is not any traffic // Optional heart beat, sent every 45 seconds when there isn't any traffic
// to make sure that load balancers do not kill an idle connection. // to make sure that load balancers do not kill an idle connection.
webSocket.setHeartBeatPeriod(45); webSocket.setHeartBeatPeriod(45);
@ -63,11 +64,10 @@ Here is what the server API looks like. Note that server support is very recent
ix::WebSocketServer server(port); ix::WebSocketServer server(port);
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server](std::shared_ptr<WebSocket> webSocket, [&server](std::shared_ptr<ix::WebSocket> webSocket)
std::shared_ptr<ConnectionState> connectionState)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType, [webSocket, &server](ix::WebSocketMessageType messageType,
const std::string& str, const std::string& str,
size_t wireSize, size_t wireSize,
const ix::WebSocketErrorInfo& error, const ix::WebSocketErrorInfo& error,
@ -77,16 +77,7 @@ server.setOnConnectionCallback(
if (messageType == ix::WebSocket_MessageType_Open) if (messageType == ix::WebSocket_MessageType_Open)
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
// A connection state object is available, and has a default id
// You can subclass ConnectionState and pass an alternate factory
// to override it. It is useful if you want to store custom
// attributes per connection (authenticated bool flag, attributes, etc...)
std::cerr << "id: " << connectionState->getId() << std::endl;
// The uri the client did connect to.
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl; std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : openInfo.headers)
{ {
@ -119,81 +110,12 @@ server.wait();
``` ```
Here is what the HTTP client API looks like. Note that HTTP client support is very recent and subject to changes.
```
//
// Preparation
//
HttpClient httpClient;
HttpRequestArgs args;
// Custom headers can be set
WebSocketHttpHeaders headers;
headers["Foo"] = "bar";
args.extraHeaders = headers;
// Timeout options
args.connectTimeout = connectTimeout;
args.transferTimeout = transferTimeout;
// Redirect options
args.followRedirects = followRedirects;
args.maxRedirects = maxRedirects;
// Misc
args.compress = compress; // Enable gzip compression
args.verbose = verbose;
args.logger = [](const std::string& msg)
{
std::cout << msg;
};
//
// Request
//
HttpResponse out;
std::string url = "https://www.google.com";
// HEAD request
out = httpClient.head(url, args);
// GET request
out = httpClient.get(url, args);
// POST request with parameters
HttpParameters httpParameters;
httpParameters["foo"] = "bar";
out = httpClient.post(url, httpParameters, args);
// POST request with a body
out = httpClient.post(url, std::string("foo=bar"), args);
//
// Result
//
auto statusCode = std::get<0>(out);
auto errorCode = std::get<1>(out);
auto responseHeaders = std::get<2>(out);
auto payload = std::get<3>(out);
auto errorMsg = std::get<4>(out);
auto uploadSize = std::get<5>(out);
auto downloadSize = std::get<6>(out);
```
## Build ## Build
CMakefiles for the library and the examples are available. This library has few dependencies, so it is possible to just add the source files into your project. CMakefiles for the library and the examples are available. This library has few dependencies, so it is possible to just add the source files into your project.
There is a Dockerfile for running some code on Linux, and a unittest which can be executed by typing `make test`. There is a Dockerfile for running some code on Linux, and a unittest which can be executed by typing `make test`.
You can build and install the ws command line tool with Homebrew.
```
brew tap bsergean/IXWebSocket
brew install IXWebSocket
```
## Implementation details ## Implementation details
### Per Message Deflate compression. ### Per Message Deflate compression.
@ -212,19 +134,25 @@ No manual polling to fetch data is required. Data is sent and received instantly
If the remote end (server) breaks the connection, the code will try to perpetually reconnect, by using an exponential backoff strategy, capped at one retry every 10 seconds. If the remote end (server) breaks the connection, the code will try to perpetually reconnect, by using an exponential backoff strategy, capped at one retry every 10 seconds.
### Large messages
Large frames are broken up into smaller chunks or messages to avoid filling up the os tcp buffers, which is permitted thanks to WebSocket [fragmentation](https://tools.ietf.org/html/rfc6455#section-5.4). Messages up to 500M were sent and received succesfully.
## Limitations ## Limitations
* There is no text support for sending data, only the binary protocol is supported. Sending json or text over the binary protocol works well. * There is no text support for sending data, only the binary protocol is supported. Sending json or text over the binary protocol works well.
* Automatic reconnection works at the TCP socket level, and will detect remote end disconnects. However, if the device/computer network become unreachable (by turning off wifi), it is quite hard to reliably and timely detect it at the socket level using `recv` and `send` error codes. [Here](https://stackoverflow.com/questions/14782143/linux-socket-how-to-detect-disconnected-network-in-a-client-program) is a good discussion on the subject. This behavior is consistent with other runtimes such as node.js. One way to detect a disconnected device with low level C code is to do a name resolution with DNS but this can be expensive. Mobile devices have good and reliable API to do that. * Automatic reconnection works at the TCP socket level, and will detect remote end disconnects. However, if the device/computer network become unreachable (by turning off wifi), it is quite hard to reliably and timely detect it at the socket level using `recv` and `send` error codes. [Here](https://stackoverflow.com/questions/14782143/linux-socket-how-to-detect-disconnected-network-in-a-client-program) is a good discussion on the subject. This behavior is consistent with other runtimes such as node.js. One way to detect a disconnected device with low level C code is to do a name resolution with DNS but this can be expensive. Mobile devices have good and reliable API to do that.
* The server code is using select to detect incoming data, and creates one OS thread per connection. This is not as scalable as strategies using epoll or kqueue. * The server code is using select to detect incoming data, and creates one OS thread per connection. This isn't as scalable as strategies using epoll or kqueue.
## Examples
1. Bring up a terminal and jump to the examples folder.
2. Compile the example C++ code. `sh build.sh`
3. Install node.js from [here](https://nodejs.org/en/download/).
4. Type `npm install` to install the node.js dependencies. Then `node broadcast-server.js` to run the server.
5. Bring up a second terminal. `./cmd_websocket_chat bob`
6. Bring up a third terminal. `./cmd_websocket_chat bill`
7. Start typing things in any of those terminals. Hopefully you should see your message being received on the other end.
## C++ code organization ## C++ code organization
Here is a simplistic diagram which explains how the code is structured in term of class/modules. Here's a simplistic diagram which explains how the code is structured in term of class/modules.
``` ```
+-----------------------+ --- Public +-----------------------+ --- Public
@ -276,7 +204,7 @@ If the connection was closed and sending failed, the return value will be set to
1. WebSocket_ReadyState_Connecting - The connection is not yet open. 1. WebSocket_ReadyState_Connecting - The connection is not yet open.
2. WebSocket_ReadyState_Open - The connection is open and ready to communicate. 2. WebSocket_ReadyState_Open - The connection is open and ready to communicate.
3. WebSocket_ReadyState_Closing - The connection is in the process of closing. 3. WebSocket_ReadyState_Closing - The connection is in the process of closing.
4. WebSocket_MessageType_Close - The connection is closed or could not be opened. 4. WebSocket_MessageType_Close - The connection is closed or couldn't be opened.
### Open and Close notifications ### Open and Close notifications
@ -381,12 +309,11 @@ A ping message can be sent to the server, with an optional data string.
``` ```
websocket.ping("ping data, optional (empty string is ok): limited to 125 bytes long"); websocket.ping("ping data, optional (empty string is ok): limited to 125 bytes long");
```
### Heartbeat. ### Heartbeat.
You can configure an optional heart beat / keep-alive, sent every 45 seconds You can configure an optional heart beat / keep-alive, sent every 45 seconds
when there is no any traffic to make sure that load balancers do not kill an when there isn't any traffic to make sure that load balancers do not kill an
idle connection. idle connection.
``` ```

16
docker/Dockerfile Normal file
View File

@ -0,0 +1,16 @@
FROM debian:stretch
# RUN yum install -y gcc-c++ make cmake openssl-devel gdb
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install gdb
RUN apt-get -y install screen
RUN apt-get -y install procps
RUN apt-get -y install lsof
COPY . .
WORKDIR examples/ws_connect
RUN ["sh", "build_linux.sh"]

11
docker/Dockerfile.alpine Normal file
View File

@ -0,0 +1,11 @@
FROM alpine:3.8
RUN apk add --no-cache g++ musl-dev make cmake openssl-dev
COPY . .
WORKDIR examples/ws_connect
RUN ["sh", "build_linux.sh"]
EXPOSE 8765
CMD ["ws_connect"]

11
docker/Dockerfile.centos Normal file
View File

@ -0,0 +1,11 @@
FROM alpine:3.8
RUN apk add --no-cache g++ musl-dev make cmake openssl-dev
COPY . .
WORKDIR examples/ws_connect
RUN ["sh", "build_linux.sh"]
EXPOSE 8765
CMD ["ws_connect"]

19
docker/Dockerfile.debian Normal file
View File

@ -0,0 +1,19 @@
FROM debian:stretch
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install gdb
RUN apt-get -y install screen
RUN apt-get -y install procps
RUN apt-get -y install lsof
RUN apt-get -y install libz-dev
RUN apt-get -y install vim
RUN apt-get -y install make
RUN apt-get -y install cmake
COPY . .
WORKDIR test
RUN ["sh", "build_linux.sh"]

8
docker/Dockerfile.gcc Normal file
View File

@ -0,0 +1,8 @@
FROM gcc:8
# RUN yum install -y gcc-c++ make cmake openssl-devel gdb
COPY . .
WORKDIR examples/ws_connect
RUN ["sh", "build_linux.sh"]

9
examples/broadcast_server/.gitignore vendored Normal file
View File

@ -0,0 +1,9 @@
CMakeCache.txt
package-lock.json
CMakeFiles
ixwebsocket_unittest
cmake_install.cmake
node_modules
ixwebsocket
Makefile
build

View File

@ -0,0 +1,30 @@
#
# Author: Benjamin Sergeant
# Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
#
cmake_minimum_required (VERSION 3.4.1)
project (broadcast_server)
# There's -Weverything too for clang
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic -Wshorten-64-to-32")
set (OPENSSL_PREFIX /usr/local/opt/openssl) # Homebrew openssl
set (CMAKE_CXX_STANDARD 14)
option(USE_TLS "Add TLS support" ON)
add_subdirectory(${PROJECT_SOURCE_DIR}/../.. ixwebsocket)
include_directories(broadcast_server .)
add_executable(broadcast_server
broadcast_server.cpp)
if (APPLE AND USE_TLS)
target_link_libraries(broadcast_server "-framework foundation" "-framework security")
endif()
target_link_libraries(broadcast_server ixwebsocket)
install(TARGETS broadcast_server DESTINATION bin)

View File

@ -0,0 +1,74 @@
/*
* broadcast_server.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <sstream>
#include <ixwebsocket/IXWebSocketServer.h>
int main(int argc, char** argv)
{
int port = 8080;
if (argc == 2)
{
std::stringstream ss;
ss << argv[1];
ss >> port;
}
ix::WebSocketServer server(port);
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket)
{
webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
if (messageType == ix::WebSocket_MessageType_Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
std::cerr << "Closed connection" << std::endl;
}
else if (messageType == ix::WebSocket_MessageType_Message)
{
for (auto&& client : server.getClients())
{
if (client != webSocket)
{
client->send(str);
}
}
}
}
);
}
);
auto res = server.listen();
if (!res.first)
{
std::cerr << res.second << std::endl;
return 1;
}
server.start();
server.wait();
return 0;
}

View File

@ -1,2 +1,3 @@
build build
venv
node_modules node_modules

View File

@ -0,0 +1,23 @@
#
# cmd_websocket_chat.cpp
# Author: Benjamin Sergeant
# Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
#
cmake_minimum_required (VERSION 3.4.1)
project (cmd_websocket_chat)
set (CMAKE_CXX_STANDARD 14)
option(USE_TLS "Add TLS support" ON)
add_subdirectory(${PROJECT_SOURCE_DIR}/../.. ixwebsocket)
add_executable(cmd_websocket_chat cmd_websocket_chat.cpp)
if (APPLE AND USE_TLS)
target_link_libraries(cmd_websocket_chat "-framework foundation" "-framework security")
endif()
target_link_libraries(cmd_websocket_chat ixwebsocket)
install(TARGETS cmd_websocket_chat DESTINATION bin)

39
examples/chat/README.md Normal file
View File

@ -0,0 +1,39 @@
# Building
1. cmake -G .
2. make
## Disable TLS
chat$ cmake -DUSE_TLS=OFF .
-- Configuring done
-- Generating done
-- Build files have been written to: /Users/bsergeant/src/foss/ixwebsocket/examples/chat
chat$ make
Scanning dependencies of target ixwebsocket
[ 16%] Building CXX object ixwebsocket/CMakeFiles/ixwebsocket.dir/ixwebsocket/IXSocket.cpp.o
[ 33%] Building CXX object ixwebsocket/CMakeFiles/ixwebsocket.dir/ixwebsocket/IXWebSocket.cpp.o
[ 50%] Building CXX object ixwebsocket/CMakeFiles/ixwebsocket.dir/ixwebsocket/IXWebSocketTransport.cpp.o
[ 66%] Linking CXX static library libixwebsocket.a
[ 66%] Built target ixwebsocket
[ 83%] Linking CXX executable cmd_websocket_chat
[100%] Built target cmd_websocket_chat
## Enable TLS (default)
```
chat$ cmake -DUSE_TLS=ON .
-- Configuring done
-- Generating done
-- Build files have been written to: /Users/bsergeant/src/foss/ixwebsocket/examples/chat
(venv) chat$ make
Scanning dependencies of target ixwebsocket
[ 14%] Building CXX object ixwebsocket/CMakeFiles/ixwebsocket.dir/ixwebsocket/IXSocket.cpp.o
[ 28%] Building CXX object ixwebsocket/CMakeFiles/ixwebsocket.dir/ixwebsocket/IXWebSocket.cpp.o
[ 42%] Building CXX object ixwebsocket/CMakeFiles/ixwebsocket.dir/ixwebsocket/IXWebSocketTransport.cpp.o
[ 57%] Building CXX object ixwebsocket/CMakeFiles/ixwebsocket.dir/ixwebsocket/IXSocketAppleSSL.cpp.o
[ 71%] Linking CXX static library libixwebsocket.a
[ 71%] Built target ixwebsocket
[ 85%] Linking CXX executable cmd_websocket_chat
[100%] Built target cmd_websocket_chat
```

View File

@ -0,0 +1,15 @@
#!/bin/sh
#
# Author: Benjamin Sergeant
# Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
#
# 'manual' way of building. You can also use cmake.
g++ --std=c++11 \
../../ixwebsocket/IXSocket.cpp \
../../ixwebsocket/IXWebSocketTransport.cpp \
../../ixwebsocket/IXWebSocket.cpp \
-I ../.. \
cmd_websocket_chat.cpp \
-o cmd_websocket_chat

View File

@ -0,0 +1,17 @@
#!/bin/sh
#
# Author: Benjamin Sergeant
# Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
#
# 'manual' way of building. You can also use cmake.
clang++ --std=c++11 --stdlib=libc++ \
../../ixwebsocket/IXSocket.cpp \
../../ixwebsocket/IXWebSocketTransport.cpp \
../../ixwebsocket/IXSocketAppleSSL.cpp \
../../ixwebsocket/IXWebSocket.cpp \
cmd_websocket_chat.cpp \
-o cmd_websocket_chat \
-framework Security \
-framework Foundation

View File

@ -1,12 +1,12 @@
/* /*
* ws_chat.cpp * cmd_websocket_chat.cpp
* Author: Benjamin Sergeant * Author: Benjamin Sergeant
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved. * Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*/ */
// //
// Simple chat program that talks to a broadcast server // Simple chat program that talks to the node.js server at
// Broadcast server can be ran with `ws broadcast_server` // websocket_chat_server/broacast-server.js
// //
#include <iostream> #include <iostream>
@ -20,13 +20,19 @@
// for convenience // for convenience
using json = nlohmann::json; using json = nlohmann::json;
namespace ix using namespace ix;
namespace
{ {
void log(const std::string& msg)
{
std::cout << msg << std::endl;
}
class WebSocketChat class WebSocketChat
{ {
public: public:
WebSocketChat(const std::string& url, WebSocketChat(const std::string& user);
const std::string& user);
void subscribe(const std::string& channel); void subscribe(const std::string& channel);
void start(); void start();
@ -40,27 +46,19 @@ namespace ix
std::pair<std::string, std::string> decodeMessage(const std::string& str); std::pair<std::string, std::string> decodeMessage(const std::string& str);
private: private:
std::string _url;
std::string _user; std::string _user;
ix::WebSocket _webSocket;
std::queue<std::string> _receivedQueue;
void log(const std::string& msg); ix::WebSocket _webSocket;
std::queue<std::string> _receivedQueue;
}; };
WebSocketChat::WebSocketChat(const std::string& url, WebSocketChat::WebSocketChat(const std::string& user) :
const std::string& user) :
_url(url),
_user(user) _user(user)
{ {
; ;
} }
void WebSocketChat::log(const std::string& msg)
{
std::cout << msg << std::endl;
}
size_t WebSocketChat::getReceivedMessagesCount() const size_t WebSocketChat::getReceivedMessagesCount() const
{ {
return _receivedQueue.size(); return _receivedQueue.size();
@ -78,10 +76,11 @@ namespace ix
void WebSocketChat::start() void WebSocketChat::start()
{ {
_webSocket.setUrl(_url); std::string url("ws://localhost:8080/");
_webSocket.setUrl(url);
std::stringstream ss; std::stringstream ss;
log(std::string("Connecting to url: ") + _url); log(std::string("Connecting to url: ") + url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](ix::WebSocketMessageType messageType,
@ -94,26 +93,16 @@ namespace ix
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocket_MessageType_Open) if (messageType == ix::WebSocket_MessageType_Open)
{ {
log("ws chat: connected"); ss << "cmd_websocket_chat: user "
std::cout << "Uri: " << openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
ss << "ws chat: user "
<< _user << _user
<< " Connected !"; << " Connected !";
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocket_MessageType_Close) else if (messageType == ix::WebSocket_MessageType_Close)
{ {
ss << "ws chat: user " ss << "cmd_websocket_chat: user "
<< _user << _user
<< " disconnected !" << " disconnected !";
<< " code " << closeInfo.code
<< " reason " << closeInfo.reason;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocket_MessageType_Message) else if (messageType == ix::WebSocket_MessageType_Message)
@ -127,7 +116,7 @@ namespace ix
_receivedQueue.push(result.second); _receivedQueue.push(result.second);
ss << std::endl ss << std::endl
<< result.first << "(" << wireSize << " bytes)" << " > " << result.second << result.first << " > " << result.second
<< std::endl << std::endl
<< _user << " > "; << _user << " > ";
log(ss.str()); log(ss.str());
@ -175,11 +164,10 @@ namespace ix
_webSocket.send(encodeMessage(text)); _webSocket.send(encodeMessage(text));
} }
int ws_chat_main(const std::string& url, void interactiveMain(const std::string& user)
const std::string& user)
{ {
std::cout << "Type Ctrl-D to exit prompt..." << std::endl; std::cout << "Type Ctrl-D to exit prompt..." << std::endl;
WebSocketChat webSocketChat(url, user); WebSocketChat webSocketChat(user);
webSocketChat.start(); webSocketChat.start();
while (true) while (true)
@ -198,7 +186,18 @@ namespace ix
std::cout << std::endl; std::cout << std::endl;
webSocketChat.stop(); webSocketChat.stop();
}
}
int main(int argc, char** argv)
{
std::string user("user");
if (argc == 2)
{
user = argv[1];
}
Socket::init();
interactiveMain(user);
return 0; return 0;
} }
}

31
examples/chat/package-lock.json generated Normal file
View File

@ -0,0 +1,31 @@
{
"requires": true,
"lockfileVersion": 1,
"dependencies": {
"async-limiter": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.0.tgz",
"integrity": "sha512-jp/uFnooOiO+L211eZOoSyzpOITMXx1rBITauYykG3BRYPu8h0UcxsPNB04RR5vo4Tyz3+ay17tR6JVf9qzYWg=="
},
"safe-buffer": {
"version": "5.1.2",
"resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz",
"integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g=="
},
"ultron": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/ultron/-/ultron-1.1.1.tgz",
"integrity": "sha512-UIEXBNeYmKptWH6z8ZnqTeS8fV74zG0/eRU9VGkpzz+LIJNs8W/zM/L+7ctCkRrgbNnnR0xxw4bKOr0cW0N0Og=="
},
"ws": {
"version": "3.3.3",
"resolved": "https://registry.npmjs.org/ws/-/ws-3.3.3.tgz",
"integrity": "sha512-nnWLa/NwZSt4KQJu51MYlCcSQ5g7INpOrOMt4XV8j4dqTXdmlUmSHQ8/oLC069ckre0fRsgfvsKwbTdtKLCDkA==",
"requires": {
"async-limiter": "1.0.0",
"safe-buffer": "5.1.2",
"ultron": "1.1.1"
}
}
}
}

View File

@ -0,0 +1,6 @@
{
"dependencies": {
"msgpack-js": "^0.3.0",
"ws": "^3.3.3"
}
}

View File

@ -15,6 +15,8 @@ set (CMAKE_CXX_STANDARD 14)
option(USE_TLS "Add TLS support" ON) option(USE_TLS "Add TLS support" ON)
add_subdirectory(${PROJECT_SOURCE_DIR}/../.. ixwebsocket)
include_directories(cobra_publisher ${OPENSSL_PREFIX}/include) include_directories(cobra_publisher ${OPENSSL_PREFIX}/include)
include_directories(cobra_publisher .) include_directories(cobra_publisher .)

View File

@ -81,55 +81,4 @@ namespace ix
return ret; return ret;
} }
static inline bool is_base64(unsigned char c)
{
return (isalnum(c) || (c == '+') || (c == '/'));
}
std::string base64_decode(const std::string& encoded_string)
{
int in_len = (int)encoded_string.size();
int i = 0;
int j = 0;
int in_ = 0;
unsigned char char_array_4[4], char_array_3[3];
std::string ret;
while(in_len-- && ( encoded_string[in_] != '=') && is_base64(encoded_string[in_]))
{
char_array_4[i++] = encoded_string[in_]; in_++;
if(i ==4)
{
for(i = 0; i <4; i++)
char_array_4[i] = base64_chars.find(char_array_4[i]);
char_array_3[0] = (char_array_4[0] << 2) + ((char_array_4[1] & 0x30) >> 4);
char_array_3[1] = ((char_array_4[1] & 0xf) << 4) + ((char_array_4[2] & 0x3c) >> 2);
char_array_3[2] = ((char_array_4[2] & 0x3) << 6) + char_array_4[3];
for(i = 0; (i < 3); i++)
ret += char_array_3[i];
i = 0;
}
}
if(i)
{
for(j = i; j <4; j++)
char_array_4[j] = 0;
for(j = 0; j <4; j++)
char_array_4[j] = base64_chars.find(char_array_4[j]);
char_array_3[0] = (char_array_4[0] << 2) + ((char_array_4[1] & 0x30) >> 4);
char_array_3[1] = ((char_array_4[1] & 0xf) << 4) + ((char_array_4[2] & 0x3c) >> 2);
char_array_3[2] = ((char_array_4[2] & 0x3) << 6) + char_array_4[3];
for(j = 0; (j < i - 1); j++) ret += char_array_3[j];
}
return ret;
}
} }

View File

@ -11,5 +11,4 @@
namespace ix namespace ix
{ {
std::string base64_encode(const std::string& data, size_t len); std::string base64_encode(const std::string& data, size_t len);
std::string base64_decode(const std::string& encoded_string);
} }

View File

@ -0,0 +1,30 @@
#
# Author: Benjamin Sergeant
# Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
#
cmake_minimum_required (VERSION 3.4.1)
project (echo_server)
# There's -Weverything too for clang
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic -Wshorten-64-to-32")
set (OPENSSL_PREFIX /usr/local/opt/openssl) # Homebrew openssl
set (CMAKE_CXX_STANDARD 14)
option(USE_TLS "Add TLS support" ON)
add_subdirectory(${PROJECT_SOURCE_DIR}/../.. ixwebsocket)
include_directories(echo_server .)
add_executable(echo_server
echo_server.cpp)
if (APPLE AND USE_TLS)
target_link_libraries(echo_server "-framework foundation" "-framework security")
endif()
target_link_libraries(echo_server ixwebsocket)
install(TARGETS echo_server DESTINATION bin)

View File

@ -0,0 +1,68 @@
/*
* echo_server.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <sstream>
#include <ixwebsocket/IXWebSocketServer.h>
int main(int argc, char** argv)
{
int port = 8080;
if (argc == 2)
{
std::stringstream ss;
ss << argv[1];
ss >> port;
}
ix::WebSocketServer server(port);
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket)
{
webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
if (messageType == ix::WebSocket_MessageType_Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
std::cerr << "Closed connection" << std::endl;
}
else if (messageType == ix::WebSocket_MessageType_Message)
{
webSocket->send(str);
}
}
);
}
);
auto res = server.listen();
if (!res.first)
{
std::cerr << res.second << std::endl;
return 1;
}
server.start();
server.wait();
return 0;
}

View File

@ -1 +1,2 @@
venv
build build

View File

@ -0,0 +1,27 @@
#
# Author: Benjamin Sergeant
# Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
#
cmake_minimum_required (VERSION 3.4.1)
project (ping_pong)
set (CMAKE_CXX_STANDARD 14)
option(USE_TLS "Add TLS support" ON)
add_subdirectory(${PROJECT_SOURCE_DIR}/../.. ixwebsocket)
add_executable(ping_pong ping_pong.cpp)
if (APPLE AND USE_TLS)
target_link_libraries(ping_pong "-framework foundation" "-framework security")
endif()
if (WIN32)
target_link_libraries(ping_pong wsock32 ws2_32)
add_definitions(-D_CRT_SECURE_NO_WARNINGS)
endif()
target_link_libraries(ping_pong ixwebsocket)
install(TARGETS ping_pong DESTINATION bin)

View File

@ -0,0 +1,15 @@
#!/bin/sh
#
# Author: Benjamin Sergeant
# Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
#
# 'manual' way of building. You can also use cmake.
g++ --std=c++11 \
../../ixwebsocket/IXSocket.cpp \
../../ixwebsocket/IXWebSocketTransport.cpp \
../../ixwebsocket/IXWebSocket.cpp \
-I ../.. \
cmd_websocket_chat.cpp \
-o cmd_websocket_chat

View File

@ -0,0 +1,17 @@
#!/usr/bin/env python
import asyncio
import websockets
async def hello(uri):
async with websockets.connect(uri) as websocket:
await websocket.send("Hello world!")
response = await websocket.recv()
print(response)
pong_waiter = await websocket.ping('coucou')
ret = await pong_waiter # only if you want to wait for the pong
print(ret)
asyncio.get_event_loop().run_until_complete(
hello('ws://localhost:5678'))

View File

@ -1,7 +1,7 @@
/* /*
* ws_ping_pong.cpp * ping_pong.cpp
* Author: Benjamin Sergeant * Author: Benjamin Sergeant
* Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved. * Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*/ */
#include <iostream> #include <iostream>
@ -9,8 +9,15 @@
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
namespace ix using namespace ix;
namespace
{ {
void log(const std::string& msg)
{
std::cout << msg << std::endl;
}
class WebSocketPingPong class WebSocketPingPong
{ {
public: public:
@ -26,8 +33,6 @@ namespace ix
private: private:
std::string _url; std::string _url;
ix::WebSocket _webSocket; ix::WebSocket _webSocket;
void log(const std::string& msg);
}; };
WebSocketPingPong::WebSocketPingPong(const std::string& url) : WebSocketPingPong::WebSocketPingPong(const std::string& url) :
@ -36,11 +41,6 @@ namespace ix
; ;
} }
void WebSocketPingPong::log(const std::string& msg)
{
std::cout << msg << std::endl;
}
void WebSocketPingPong::stop() void WebSocketPingPong::stop()
{ {
_webSocket.stop(); _webSocket.stop();
@ -61,19 +61,10 @@ namespace ix
const ix::WebSocketOpenInfo& openInfo, const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo) const ix::WebSocketCloseInfo& closeInfo)
{ {
std::cerr << "Received " << wireSize << " bytes" << std::endl;
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocket_MessageType_Open) if (messageType == ix::WebSocket_MessageType_Open)
{ {
log("ping_pong: connected"); log("ping_pong: connected");
std::cout << "Uri: " << openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
} }
else if (messageType == ix::WebSocket_MessageType_Close) else if (messageType == ix::WebSocket_MessageType_Close)
{ {
@ -133,7 +124,7 @@ namespace ix
_webSocket.send(text); _webSocket.send(text);
} }
int ws_ping_pong_main(const std::string& url) void interactiveMain(const std::string& url)
{ {
std::cout << "Type Ctrl-D to exit prompt..." << std::endl; std::cout << "Type Ctrl-D to exit prompt..." << std::endl;
WebSocketPingPong webSocketPingPong(url); WebSocketPingPong webSocketPingPong(url);
@ -162,7 +153,19 @@ namespace ix
std::cout << std::endl; std::cout << std::endl;
webSocketPingPong.stop(); webSocketPingPong.stop();
}
}
int main(int argc, char** argv)
{
if (argc != 2)
{
std::cerr << "Usage: ping_pong <url>" << std::endl;
return 1;
}
std::string url = argv[1];
Socket::init();
interactiveMain(url);
return 0; return 0;
} }
}

View File

@ -0,0 +1,21 @@
#!/usr/bin/env python
import os
import asyncio
import websockets
async def echo(websocket, path):
async for message in websocket:
print(message)
await websocket.send(message)
if os.getenv('TEST_CLOSE'):
print('Closing')
# breakpoint()
await websocket.close(1001, 'close message')
# await websocket.close()
break
asyncio.get_event_loop().run_until_complete(
websockets.serve(echo, 'localhost', 5678))
asyncio.get_event_loop().run_forever()

View File

@ -0,0 +1,9 @@
#!/bin/sh
test -d build || {
mkdir -p build
cd build
cmake ..
}
(cd build ; make)
./build/ping_pong ws://localhost:5678

3
examples/ws_connect/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
build
venv
node_modules

View File

@ -0,0 +1,22 @@
#
# Author: Benjamin Sergeant
# Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
#
cmake_minimum_required (VERSION 3.4.1)
project (ws_connect)
set (CMAKE_CXX_STANDARD 14)
option(USE_TLS "Add TLS support" ON)
add_subdirectory(${PROJECT_SOURCE_DIR}/../.. ixwebsocket)
add_executable(ws_connect ws_connect.cpp)
if (APPLE AND USE_TLS)
target_link_libraries(ws_connect "-framework foundation" "-framework security")
endif()
target_link_libraries(ws_connect ixwebsocket)
install(TARGETS ws_connect DESTINATION bin)

View File

@ -0,0 +1,11 @@
# Building
1. mkdir build
2. cd build
3. cmake ..
4. make
## Disable TLS
* Enable: `cmake -DUSE_TLS=OFF ..`
* Disable: `cmake -DUSE_TLS=ON ..`

View File

@ -0,0 +1,25 @@
#!/bin/sh
#
# Author: Benjamin Sergeant
# Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
#
# 'manual' way of building. You can also use cmake.
g++ --std=c++11 \
-DIXWEBSOCKET_USE_TLS \
-g \
../../ixwebsocket/IXEventFd.cpp \
../../ixwebsocket/IXSocket.cpp \
../../ixwebsocket/IXSetThreadName.cpp \
../../ixwebsocket/IXWebSocketTransport.cpp \
../../ixwebsocket/IXWebSocket.cpp \
../../ixwebsocket/IXDNSLookup.cpp \
../../ixwebsocket/IXSocketConnect.cpp \
../../ixwebsocket/IXSocketOpenSSL.cpp \
../../ixwebsocket/IXWebSocketPerMessageDeflate.cpp \
../../ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp \
-I ../.. \
ws_connect.cpp \
-o ws_connect \
-lcrypto -lssl -lz -lpthread

View File

@ -9,8 +9,15 @@
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
namespace ix using namespace ix;
namespace
{ {
void log(const std::string& msg)
{
std::cout << msg << std::endl;
}
class WebSocketConnect class WebSocketConnect
{ {
public: public:
@ -25,8 +32,6 @@ namespace ix
private: private:
std::string _url; std::string _url;
ix::WebSocket _webSocket; ix::WebSocket _webSocket;
void log(const std::string& msg);
}; };
WebSocketConnect::WebSocketConnect(const std::string& url) : WebSocketConnect::WebSocketConnect(const std::string& url) :
@ -35,11 +40,6 @@ namespace ix
; ;
} }
void WebSocketConnect::log(const std::string& msg)
{
std::cout << msg << std::endl;
}
void WebSocketConnect::stop() void WebSocketConnect::stop()
{ {
_webSocket.stop(); _webSocket.stop();
@ -84,8 +84,6 @@ namespace ix
} }
else if (messageType == ix::WebSocket_MessageType_Message) else if (messageType == ix::WebSocket_MessageType_Message)
{ {
std::cerr << "Received " << wireSize << " bytes" << std::endl;
ss << "ws_connect: received message: " ss << "ws_connect: received message: "
<< str; << str;
log(ss.str()); log(ss.str());
@ -150,11 +148,18 @@ namespace ix
std::cout << std::endl; std::cout << std::endl;
webSocketChat.stop(); webSocketChat.stop();
} }
}
int ws_connect_main(const std::string& url) int main(int argc, char** argv)
{ {
if (argc != 2)
{
std::cerr << "Usage: ws_connect <url>" << std::endl;
return 1;
}
std::string url = argv[1];
Socket::init();
interactiveMain(url); interactiveMain(url);
return 0; return 0;
} }
}

View File

@ -1,37 +0,0 @@
/*
* IXConnectionState.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXConnectionState.h"
#include <sstream>
namespace ix
{
std::atomic<uint64_t> ConnectionState::_globalId(0);
ConnectionState::ConnectionState()
{
computeId();
}
void ConnectionState::computeId()
{
std::stringstream ss;
ss << _globalId++;
_id = ss.str();
}
const std::string& ConnectionState::getId() const
{
return _id;
}
std::shared_ptr<ConnectionState> ConnectionState::createConnectionState()
{
return std::make_shared<ConnectionState>();
}
}

View File

@ -1,33 +0,0 @@
/*
* IXConnectionState.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <stdint.h>
#include <string>
#include <atomic>
#include <memory>
namespace ix
{
class ConnectionState {
public:
ConnectionState();
virtual ~ConnectionState() = default;
virtual void computeId();
virtual const std::string& getId() const;
static std::shared_ptr<ConnectionState> createConnectionState();
protected:
std::string _id;
static std::atomic<uint64_t> _globalId;
};
}

View File

@ -73,7 +73,7 @@ namespace ix
errMsg = "no error"; errMsg = "no error";
// Maybe a cancellation request got in before the background thread terminated ? // Maybe a cancellation request got in before the background thread terminated ?
if (isCancellationRequested && isCancellationRequested()) if (isCancellationRequested())
{ {
errMsg = "cancellation requested"; errMsg = "cancellation requested";
return nullptr; return nullptr;
@ -121,7 +121,7 @@ namespace ix
} }
// Were we cancelled ? // Were we cancelled ?
if (isCancellationRequested && isCancellationRequested()) if (isCancellationRequested())
{ {
errMsg = "cancellation requested"; errMsg = "cancellation requested";
return nullptr; return nullptr;
@ -129,7 +129,7 @@ namespace ix
} }
// Maybe a cancellation request got in before the bg terminated ? // Maybe a cancellation request got in before the bg terminated ?
if (isCancellationRequested && isCancellationRequested()) if (isCancellationRequested())
{ {
errMsg = "cancellation requested"; errMsg = "cancellation requested";
return nullptr; return nullptr;

82
ixwebsocket/IXEventFd.cpp Normal file
View File

@ -0,0 +1,82 @@
/*
* IXEventFd.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
//
// Linux/Android has a special type of virtual files. select(2) will react
// when reading/writing to those files, unlike closing sockets.
//
// https://linux.die.net/man/2/eventfd
// http://www.sourcexr.com/articles/2013/10/26/lightweight-inter-process-signaling-with-eventfd
//
// eventfd was added in Linux kernel 2.x, and our oldest Android (Kitkat 4.4)
// is on Kernel 3.x
//
// cf Android/Kernel table here
// https://android.stackexchange.com/questions/51651/which-android-runs-which-linux-kernel
//
#include "IXEventFd.h"
#ifdef __linux__
# include <sys/eventfd.h>
#endif
#ifndef _WIN32
#include <unistd.h> // for write
#endif
namespace ix
{
EventFd::EventFd() :
_eventfd(-1)
{
#ifdef __linux__
_eventfd = eventfd(0, 0);
#endif
}
EventFd::~EventFd()
{
#ifdef __linux__
::close(_eventfd);
#endif
}
bool EventFd::notify()
{
#if defined(__linux__)
if (_eventfd == -1) return false;
// select will wake up when a non-zero value is written to our eventfd
uint64_t value = 1;
// we should write 8 bytes for an uint64_t
return write(_eventfd, &value, sizeof(value)) == 8;
#else
return true;
#endif
}
bool EventFd::clear()
{
#if defined(__linux__)
if (_eventfd == -1) return false;
// 0 is a special value ; select will not wake up
uint64_t value = 0;
// we should write 8 bytes for an uint64_t
return write(_eventfd, &value, sizeof(value)) == 8;
#else
return true;
#endif
}
int EventFd::getFd()
{
return _eventfd;
}
}

23
ixwebsocket/IXEventFd.h Normal file
View File

@ -0,0 +1,23 @@
/*
* IXEventFd.h
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#pragma once
namespace ix
{
class EventFd {
public:
EventFd();
virtual ~EventFd();
bool notify();
bool clear();
int getFd();
private:
int _eventfd;
};
}

View File

@ -1,467 +0,0 @@
/*
* IXHttpClient.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXHttpClient.h"
#include "IXUrlParser.h"
#include "IXWebSocketHttpHeaders.h"
#include "IXSocketFactory.h"
#include <sstream>
#include <iomanip>
#include <vector>
#include <cstring>
#include <zlib.h>
namespace ix
{
const std::string HttpClient::kPost = "POST";
const std::string HttpClient::kGet = "GET";
const std::string HttpClient::kHead = "HEAD";
HttpClient::HttpClient()
{
}
HttpClient::~HttpClient()
{
}
HttpResponse HttpClient::request(
const std::string& url,
const std::string& verb,
const std::string& body,
const HttpRequestArgs& args,
int redirects)
{
uint64_t uploadSize = 0;
uint64_t downloadSize = 0;
int code = 0;
WebSocketHttpHeaders headers;
std::string payload;
std::string protocol, host, path, query;
int port;
bool websocket = false;
if (!UrlParser::parse(url, protocol, host, path, query, port, websocket))
{
std::stringstream ss;
ss << "Cannot parse url: " << url;
return std::make_tuple(code, HttpErrorCode_UrlMalformed,
headers, payload, ss.str(),
uploadSize, downloadSize);
}
bool tls = protocol == "https";
std::string errorMsg;
_socket = createSocket(tls, errorMsg);
if (!_socket)
{
return std::make_tuple(code, HttpErrorCode_CannotCreateSocket,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
// Build request string
std::stringstream ss;
ss << verb << " " << path << " HTTP/1.1\r\n";
ss << "Host: " << host << "\r\n";
ss << "User-Agent: ixwebsocket/1.0.0" << "\r\n";
ss << "Accept: */*" << "\r\n";
if (args.compress)
{
ss << "Accept-Encoding: gzip" << "\r\n";
}
// Append extra headers
for (auto&& it : args.extraHeaders)
{
ss << it.first << ": " << it.second << "\r\n";
}
if (verb == kPost)
{
ss << "Content-Length: " << body.size() << "\r\n";
// Set default Content-Type if unspecified
if (args.extraHeaders.find("Content-Type") == args.extraHeaders.end())
{
ss << "Content-Type: application/x-www-form-urlencoded" << "\r\n";
}
ss << "\r\n";
ss << body;
}
else
{
ss << "\r\n";
}
std::string req(ss.str());
std::string errMsg;
std::atomic<bool> requestInitCancellation(false);
// Make a cancellation object dealing with connection timeout
auto isCancellationRequested =
makeCancellationRequestWithTimeout(args.connectTimeout, requestInitCancellation);
bool success = _socket->connect(host, port, errMsg, isCancellationRequested);
if (!success)
{
std::stringstream ss;
ss << "Cannot connect to url: " << url;
return std::make_tuple(code, HttpErrorCode_CannotConnect,
headers, payload, ss.str(),
uploadSize, downloadSize);
}
// Make a new cancellation object dealing with transfer timeout
isCancellationRequested =
makeCancellationRequestWithTimeout(args.transferTimeout, requestInitCancellation);
if (args.verbose)
{
std::stringstream ss;
ss << "Sending " << verb << " request "
<< "to " << host << ":" << port << std::endl
<< "request size: " << req.size() << " bytes" << std::endl
<< "=============" << std::endl
<< req
<< "=============" << std::endl
<< std::endl;
log(ss.str(), args);
}
if (!_socket->writeBytes(req, isCancellationRequested))
{
std::string errorMsg("Cannot send request");
return std::make_tuple(code, HttpErrorCode_SendError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
uploadSize = req.size();
auto lineResult = _socket->readLine(isCancellationRequested);
auto lineValid = lineResult.first;
auto line = lineResult.second;
if (!lineValid)
{
std::string errorMsg("Cannot retrieve status line");
return std::make_tuple(code, HttpErrorCode_CannotReadStatusLine,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
if (args.verbose)
{
std::stringstream ss;
ss << "Status line " << line;
log(ss.str(), args);
}
if (sscanf(line.c_str(), "HTTP/1.1 %d", &code) != 1)
{
std::string errorMsg("Cannot parse response code from status line");
return std::make_tuple(code, HttpErrorCode_MissingStatus,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
auto result = parseHttpHeaders(_socket, isCancellationRequested);
auto headersValid = result.first;
headers = result.second;
if (!headersValid)
{
std::string errorMsg("Cannot parse http headers");
return std::make_tuple(code, HttpErrorCode_HeaderParsingError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
// Redirect ?
if ((code >= 301 && code <= 308) && args.followRedirects)
{
if (headers.find("Location") == headers.end())
{
std::string errorMsg("Missing location header for redirect");
return std::make_tuple(code, HttpErrorCode_MissingLocation,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
if (redirects >= args.maxRedirects)
{
std::stringstream ss;
ss << "Too many redirects: " << redirects;
return std::make_tuple(code, HttpErrorCode_TooManyRedirects,
headers, payload, ss.str(),
uploadSize, downloadSize);
}
// Recurse
std::string location = headers["Location"];
return request(location, verb, body, args, redirects+1);
}
if (verb == "HEAD")
{
return std::make_tuple(code, HttpErrorCode_Ok,
headers, payload, std::string(),
uploadSize, downloadSize);
}
// Parse response:
if (headers.find("Content-Length") != headers.end())
{
ssize_t contentLength = -1;
ss.str("");
ss << headers["Content-Length"];
ss >> contentLength;
payload.reserve(contentLength);
auto chunkResult = _socket->readBytes(contentLength,
args.onProgressCallback,
isCancellationRequested);
if (!chunkResult.first)
{
errorMsg = "Cannot read chunk";
return std::make_tuple(code, HttpErrorCode_ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
payload += chunkResult.second;
}
else if (headers.find("Transfer-Encoding") != headers.end() &&
headers["Transfer-Encoding"] == "chunked")
{
std::stringstream ss;
while (true)
{
lineResult = _socket->readLine(isCancellationRequested);
line = lineResult.second;
if (!lineResult.first)
{
return std::make_tuple(code, HttpErrorCode_ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
uint64_t chunkSize;
ss.str("");
ss << std::hex << line;
ss >> chunkSize;
if (args.verbose)
{
std::stringstream oss;
oss << "Reading " << chunkSize << " bytes"
<< std::endl;
log(oss.str(), args);
}
payload.reserve(payload.size() + chunkSize);
// Read a chunk
auto chunkResult = _socket->readBytes(chunkSize,
args.onProgressCallback,
isCancellationRequested);
if (!chunkResult.first)
{
errorMsg = "Cannot read chunk";
return std::make_tuple(code, HttpErrorCode_ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
payload += chunkResult.second;
// Read the line that terminates the chunk (\r\n)
lineResult = _socket->readLine(isCancellationRequested);
if (!lineResult.first)
{
return std::make_tuple(code, HttpErrorCode_ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
if (chunkSize == 0) break;
}
}
else if (code == 204)
{
; // 204 is NoContent response code
}
else
{
std::string errorMsg("Cannot read http body");
return std::make_tuple(code, HttpErrorCode_CannotReadBody,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
downloadSize = payload.size();
// If the content was compressed with gzip, decode it
if (headers["Content-Encoding"] == "gzip")
{
std::string decompressedPayload;
if (!gzipInflate(payload, decompressedPayload))
{
std::string errorMsg("Error decompressing payload");
return std::make_tuple(code, HttpErrorCode_Gzip,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
payload = decompressedPayload;
}
return std::make_tuple(code, HttpErrorCode_Ok,
headers, payload, std::string(),
uploadSize, downloadSize);
}
HttpResponse HttpClient::get(const std::string& url,
const HttpRequestArgs& args)
{
return request(url, kGet, std::string(), args);
}
HttpResponse HttpClient::head(const std::string& url,
const HttpRequestArgs& args)
{
return request(url, kHead, std::string(), args);
}
HttpResponse HttpClient::post(const std::string& url,
const HttpParameters& httpParameters,
const HttpRequestArgs& args)
{
return request(url, kPost, serializeHttpParameters(httpParameters), args);
}
HttpResponse HttpClient::post(const std::string& url,
const std::string& body,
const HttpRequestArgs& args)
{
return request(url, kPost, body, args);
}
std::string HttpClient::urlEncode(const std::string& value)
{
std::ostringstream escaped;
escaped.fill('0');
escaped << std::hex;
for (std::string::const_iterator i = value.begin(), n = value.end();
i != n; ++i)
{
std::string::value_type c = (*i);
// Keep alphanumeric and other accepted characters intact
if (isalnum(c) || c == '-' || c == '_' || c == '.' || c == '~')
{
escaped << c;
continue;
}
// Any other characters are percent-encoded
escaped << std::uppercase;
escaped << '%' << std::setw(2) << int((unsigned char) c);
escaped << std::nouppercase;
}
return escaped.str();
}
std::string HttpClient::serializeHttpParameters(const HttpParameters& httpParameters)
{
std::stringstream ss;
size_t count = httpParameters.size();
size_t i = 0;
for (auto&& it : httpParameters)
{
ss << urlEncode(it.first)
<< "="
<< urlEncode(it.second);
if (i++ < (count-1))
{
ss << "&";
}
}
return ss.str();
}
bool HttpClient::gzipInflate(
const std::string& in,
std::string& out)
{
z_stream inflateState;
std::memset(&inflateState, 0, sizeof(inflateState));
inflateState.zalloc = Z_NULL;
inflateState.zfree = Z_NULL;
inflateState.opaque = Z_NULL;
inflateState.avail_in = 0;
inflateState.next_in = Z_NULL;
if (inflateInit2(&inflateState, 16+MAX_WBITS) != Z_OK)
{
return false;
}
inflateState.avail_in = (uInt) in.size();
inflateState.next_in = (unsigned char *)(const_cast<char *>(in.data()));
const int kBufferSize = 1 << 14;
std::unique_ptr<unsigned char[]> compressBuffer =
std::make_unique<unsigned char[]>(kBufferSize);
do
{
inflateState.avail_out = (uInt) kBufferSize;
inflateState.next_out = compressBuffer.get();
int ret = inflate(&inflateState, Z_SYNC_FLUSH);
if (ret == Z_NEED_DICT || ret == Z_DATA_ERROR || ret == Z_MEM_ERROR)
{
inflateEnd(&inflateState);
return false;
}
out.append(
reinterpret_cast<char *>(compressBuffer.get()),
kBufferSize - inflateState.avail_out
);
} while (inflateState.avail_out == 0);
inflateEnd(&inflateState);
return true;
}
void HttpClient::log(const std::string& msg,
const HttpRequestArgs& args)
{
if (args.logger)
{
args.logger(msg);
}
}
}

View File

@ -1,107 +0,0 @@
/*
* IXHttpClient.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <algorithm>
#include <functional>
#include <mutex>
#include <atomic>
#include <tuple>
#include <memory>
#include <map>
#include "IXSocket.h"
#include "IXWebSocketHttpHeaders.h"
namespace ix
{
enum HttpErrorCode
{
HttpErrorCode_Ok = 0,
HttpErrorCode_CannotConnect = 1,
HttpErrorCode_Timeout = 2,
HttpErrorCode_Gzip = 3,
HttpErrorCode_UrlMalformed = 4,
HttpErrorCode_CannotCreateSocket = 5,
HttpErrorCode_SendError = 6,
HttpErrorCode_ReadError = 7,
HttpErrorCode_CannotReadStatusLine = 8,
HttpErrorCode_MissingStatus = 9,
HttpErrorCode_HeaderParsingError = 10,
HttpErrorCode_MissingLocation = 11,
HttpErrorCode_TooManyRedirects = 12,
HttpErrorCode_ChunkReadError = 13,
HttpErrorCode_CannotReadBody = 14
};
using HttpResponse = std::tuple<int, // status
HttpErrorCode, // error code
WebSocketHttpHeaders,
std::string, // payload
std::string, // error msg
uint64_t, // upload size
uint64_t>; // download size
using HttpParameters = std::map<std::string, std::string>;
using Logger = std::function<void(const std::string&)>;
struct HttpRequestArgs
{
std::string url;
WebSocketHttpHeaders extraHeaders;
std::string body;
int connectTimeout;
int transferTimeout;
bool followRedirects;
int maxRedirects;
bool verbose;
bool compress;
Logger logger;
OnProgressCallback onProgressCallback;
};
class HttpClient {
public:
HttpClient();
~HttpClient();
HttpResponse get(const std::string& url,
const HttpRequestArgs& args);
HttpResponse head(const std::string& url,
const HttpRequestArgs& args);
HttpResponse post(const std::string& url,
const HttpParameters& httpParameters,
const HttpRequestArgs& args);
HttpResponse post(const std::string& url,
const std::string& body,
const HttpRequestArgs& args);
private:
HttpResponse request(const std::string& url,
const std::string& verb,
const std::string& body,
const HttpRequestArgs& args,
int redirects = 0);
std::string serializeHttpParameters(const HttpParameters& httpParameters);
std::string urlEncode(const std::string& value);
void log(const std::string& msg, const HttpRequestArgs& args);
bool gzipInflate(
const std::string& in,
std::string& out);
std::shared_ptr<Socket> _socket;
const static std::string kPost;
const static std::string kGet;
const static std::string kHead;
};
}

View File

@ -1,14 +0,0 @@
/*
* IXProgressCallback.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <functional>
namespace ix
{
using OnProgressCallback = std::function<bool(int current, int total)>;
}

View File

@ -1,46 +0,0 @@
/*
* IXSelectInterrupt.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXSelectInterrupt.h"
namespace ix
{
SelectInterrupt::SelectInterrupt()
{
;
}
SelectInterrupt::~SelectInterrupt()
{
;
}
bool SelectInterrupt::init(std::string& /*errorMsg*/)
{
return true;
}
bool SelectInterrupt::notify(uint64_t /*value*/)
{
return true;
}
uint64_t SelectInterrupt::read()
{
return 0;
}
bool SelectInterrupt::clear()
{
return true;
}
int SelectInterrupt::getFd() const
{
return -1;
}
}

View File

@ -1,28 +0,0 @@
/*
* IXSelectInterrupt.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <stdint.h>
#include <string>
namespace ix
{
class SelectInterrupt {
public:
SelectInterrupt();
virtual ~SelectInterrupt();
virtual bool init(std::string& errorMsg);
virtual bool notify(uint64_t value);
virtual bool clear();
virtual uint64_t read();
virtual int getFd() const;
};
}

View File

@ -1,116 +0,0 @@
/*
* IXSelectInterruptEventFd.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved.
*/
//
// On Linux we use eventd to wake up select.
//
//
// Linux/Android has a special type of virtual files. select(2) will react
// when reading/writing to those files, unlike closing sockets.
//
// https://linux.die.net/man/2/eventfd
// http://www.sourcexr.com/articles/2013/10/26/lightweight-inter-process-signaling-with-eventfd
//
// eventfd was added in Linux kernel 2.x, and our oldest Android (Kitkat 4.4)
// is on Kernel 3.x
//
// cf Android/Kernel table here
// https://android.stackexchange.com/questions/51651/which-android-runs-which-linux-kernel
//
// On macOS we use UNIX pipes to wake up select.
//
#include "IXSelectInterruptEventFd.h"
#include <sys/eventfd.h>
#include <unistd.h> // for write
#include <string.h> // for strerror
#include <fcntl.h>
#include <errno.h>
#include <assert.h>
#include <sstream>
namespace ix
{
SelectInterruptEventFd::SelectInterruptEventFd()
{
_eventfd = -1;
}
SelectInterruptEventFd::~SelectInterruptEventFd()
{
::close(_eventfd);
}
bool SelectInterruptEventFd::init(std::string& errorMsg)
{
// calling init twice is a programming error
assert(_eventfd == -1);
_eventfd = eventfd(0, 0);
if (_eventfd < 0)
{
std::stringstream ss;
ss << "SelectInterruptEventFd::init() failed in eventfd()"
<< " : " << strerror(errno);
errorMsg = ss.str();
_eventfd = -1;
return false;
}
if (fcntl(_eventfd, F_SETFL, O_NONBLOCK) == -1)
{
std::stringstream ss;
ss << "SelectInterruptEventFd::init() failed in fcntl() call"
<< " : " << strerror(errno);
errorMsg = ss.str();
_eventfd = -1;
return false;
}
return true;
}
bool SelectInterruptEventFd::notify(uint64_t value)
{
int fd = _eventfd;
if (fd == -1) return false;
// we should write 8 bytes for an uint64_t
return write(fd, &value, sizeof(value)) == 8;
}
// TODO: return max uint64_t for errors ?
uint64_t SelectInterruptEventFd::read()
{
int fd = _eventfd;
uint64_t value = 0;
::read(fd, &value, sizeof(value));
return value;
}
bool SelectInterruptEventFd::clear()
{
if (_eventfd == -1) return false;
// 0 is a special value ; select will not wake up
uint64_t value = 0;
// we should write 8 bytes for an uint64_t
return write(_eventfd, &value, sizeof(value)) == 8;
}
int SelectInterruptEventFd::getFd() const
{
return _eventfd;
}
}

View File

@ -1,32 +0,0 @@
/*
* IXSelectInterruptEventFd.h
* Author: Benjamin Sergeant
* Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include "IXSelectInterrupt.h"
#include <stdint.h>
#include <string>
namespace ix
{
class SelectInterruptEventFd : public SelectInterrupt {
public:
SelectInterruptEventFd();
virtual ~SelectInterruptEventFd();
bool init(std::string& errorMsg) final;
bool notify(uint64_t value) final;
bool clear() final;
uint64_t read() final;
int getFd() const final;
private:
int _eventfd;
};
}

View File

@ -1,25 +0,0 @@
/*
* IXSelectInterruptFactory.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXSelectInterruptFactory.h"
#if defined(__linux__) || defined(__APPLE__)
# include <ixwebsocket/IXSelectInterruptPipe.h>
#else
# include <ixwebsocket/IXSelectInterrupt.h>
#endif
namespace ix
{
std::shared_ptr<SelectInterrupt> createSelectInterrupt()
{
#if defined(__linux__) || defined(__APPLE__)
return std::make_shared<SelectInterruptPipe>();
#else
return std::make_shared<SelectInterrupt>();
#endif
}
}

View File

@ -1,15 +0,0 @@
/*
* IXSelectInterruptFactory.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <memory>
namespace ix
{
class SelectInterrupt;
std::shared_ptr<SelectInterrupt> createSelectInterrupt();
}

View File

@ -1,138 +0,0 @@
/*
* IXSelectInterruptPipe.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved.
*/
//
// On macOS we use UNIX pipes to wake up select.
//
#include "IXSelectInterruptPipe.h"
#include <unistd.h> // for write
#include <string.h> // for strerror
#include <fcntl.h>
#include <errno.h>
#include <assert.h>
#include <sstream>
namespace ix
{
// File descriptor at index 0 in _fildes is the read end of the pipe
// File descriptor at index 1 in _fildes is the write end of the pipe
const int SelectInterruptPipe::kPipeReadIndex = 0;
const int SelectInterruptPipe::kPipeWriteIndex = 1;
SelectInterruptPipe::SelectInterruptPipe()
{
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
}
SelectInterruptPipe::~SelectInterruptPipe()
{
::close(_fildes[kPipeReadIndex]);
::close(_fildes[kPipeWriteIndex]);
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
}
bool SelectInterruptPipe::init(std::string& errorMsg)
{
// calling init twice is a programming error
assert(_fildes[kPipeReadIndex] == -1);
assert(_fildes[kPipeWriteIndex] == -1);
if (pipe(_fildes) < 0)
{
std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in pipe() call"
<< " : " << strerror(errno);
errorMsg = ss.str();
return false;
}
if (fcntl(_fildes[kPipeReadIndex], F_SETFL, O_NONBLOCK) == -1)
{
std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in fcntl(..., O_NONBLOCK) call"
<< " : " << strerror(errno);
errorMsg = ss.str();
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
return false;
}
if (fcntl(_fildes[kPipeWriteIndex], F_SETFL, O_NONBLOCK) == -1)
{
std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in fcntl(..., O_NONBLOCK) call"
<< " : " << strerror(errno);
errorMsg = ss.str();
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
return false;
}
#ifdef F_SETNOSIGPIPE
if (fcntl(_fildes[kPipeWriteIndex], F_SETNOSIGPIPE, 1) == -1)
{
std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in fcntl(.... F_SETNOSIGPIPE) call"
<< " : " << strerror(errno);
errorMsg = ss.str();
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
return false;
}
if (fcntl(_fildes[kPipeWriteIndex], F_SETNOSIGPIPE, 1) == -1)
{
std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in fcntl(..., F_SETNOSIGPIPE) call"
<< " : " << strerror(errno);
errorMsg = ss.str();
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
return false;
}
#endif
return true;
}
bool SelectInterruptPipe::notify(uint64_t value)
{
int fd = _fildes[kPipeWriteIndex];
if (fd == -1) return false;
// we should write 8 bytes for an uint64_t
return write(fd, &value, sizeof(value)) == 8;
}
// TODO: return max uint64_t for errors ?
uint64_t SelectInterruptPipe::read()
{
int fd = _fildes[kPipeReadIndex];
uint64_t value = 0;
::read(fd, &value, sizeof(value));
return value;
}
bool SelectInterruptPipe::clear()
{
return true;
}
int SelectInterruptPipe::getFd() const
{
return _fildes[kPipeReadIndex];
}
}

View File

@ -1,39 +0,0 @@
/*
* IXSelectInterruptPipe.h
* Author: Benjamin Sergeant
* Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include "IXSelectInterrupt.h"
#include <stdint.h>
#include <string>
namespace ix
{
class SelectInterruptPipe : public SelectInterrupt {
public:
SelectInterruptPipe();
virtual ~SelectInterruptPipe();
bool init(std::string& errorMsg) final;
bool notify(uint64_t value) final;
bool clear() final;
uint64_t read() final;
int getFd() const final;
private:
// Store file descriptors used by the communication pipe. Communication
// happens between a control thread and a background thread, which is
// blocked on select.
int _fildes[2];
// Used to identify the read/write idx
static const int kPipeReadIndex;
static const int kPipeWriteIndex;
};
}

View File

@ -7,8 +7,6 @@
#include "IXSocket.h" #include "IXSocket.h"
#include "IXSocketConnect.h" #include "IXSocketConnect.h"
#include "IXNetSystem.h" #include "IXNetSystem.h"
#include "IXSelectInterrupt.h"
#include "IXSelectInterruptFactory.h"
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
@ -17,6 +15,7 @@
#include <stdint.h> #include <stdint.h>
#include <fcntl.h> #include <fcntl.h>
#include <sys/types.h> #include <sys/types.h>
#include <poll.h>
#include <algorithm> #include <algorithm>
#include <iostream> #include <iostream>
@ -25,15 +24,11 @@ namespace ix
{ {
const int Socket::kDefaultPollNoTimeout = -1; // No poll timeout by default const int Socket::kDefaultPollNoTimeout = -1; // No poll timeout by default
const int Socket::kDefaultPollTimeout = kDefaultPollNoTimeout; const int Socket::kDefaultPollTimeout = kDefaultPollNoTimeout;
const uint64_t Socket::kSendRequest = 1;
const uint64_t Socket::kCloseRequest = 2;
constexpr size_t Socket::kChunkSize;
Socket::Socket(int fd) : Socket::Socket(int fd) :
_sockfd(fd), _sockfd(fd)
_selectInterrupt(createSelectInterrupt())
{ {
;
} }
Socket::~Socket() Socket::~Socket()
@ -45,93 +40,44 @@ namespace ix
{ {
if (_sockfd == -1) if (_sockfd == -1)
{ {
if (onPollCallback) onPollCallback(PollResultType::Error); onPollCallback(PollResultType_Error);
return; return;
} }
PollResultType pollResult = isReadyToRead(1000 * timeoutSecs); #ifdef __linux__
constexpr int nfds = 2;
#else
constexpr int nfds = 1;
#endif
if (onPollCallback) onPollCallback(pollResult); struct pollfd fds[nfds];
} fds[0].fd = _sockfd;
fds[0].events = POLLIN;
PollResultType Socket::select(bool readyToRead, int timeoutMs) #ifdef __linux__
{ fds[1].fd = _eventfd.getFd();
fd_set rfds; fds[1].events = POLLIN;
fd_set wfds; #endif
FD_ZERO(&rfds); int ret = ::poll(fds, nfds, timeoutSecs * 1000);
FD_ZERO(&wfds);
fd_set* fds = (readyToRead) ? &rfds : & wfds; PollResultType pollResult = PollResultType_ReadyForRead;
FD_SET(_sockfd, fds);
// File descriptor used to interrupt select when needed
int interruptFd = _selectInterrupt->getFd();
if (interruptFd != -1)
{
FD_SET(interruptFd, fds);
}
struct timeval timeout;
timeout.tv_sec = timeoutMs / 1000;
timeout.tv_usec = (timeoutMs < 1000) ? 0 : 1000 * (timeoutMs % 1000);
// Compute the highest fd.
int sockfd = _sockfd;
int nfds = (std::max)(sockfd, interruptFd);
int ret = ::select(nfds + 1, &rfds, &wfds, nullptr,
(timeoutMs < 0) ? nullptr : &timeout);
PollResultType pollResult = PollResultType::ReadyForRead;
if (ret < 0) if (ret < 0)
{ {
pollResult = PollResultType::Error; pollResult = PollResultType_Error;
} }
else if (ret == 0) else if (ret == 0)
{ {
pollResult = PollResultType::Timeout; pollResult = PollResultType_Timeout;
}
else if (interruptFd != -1 && FD_ISSET(interruptFd, &rfds))
{
uint64_t value = _selectInterrupt->read();
if (value == kSendRequest)
{
pollResult = PollResultType::SendRequest;
}
else if (value == kCloseRequest)
{
pollResult = PollResultType::CloseRequest;
}
}
else if (sockfd != -1 && readyToRead && FD_ISSET(sockfd, &rfds))
{
pollResult = PollResultType::ReadyForRead;
}
else if (sockfd != -1 && !readyToRead && FD_ISSET(sockfd, &wfds))
{
pollResult = PollResultType::ReadyForWrite;
} }
return pollResult; onPollCallback(pollResult);
} }
PollResultType Socket::isReadyToRead(int timeoutMs) void Socket::wakeUpFromPoll()
{ {
bool readyToRead = true; // this will wake up the thread blocked on select, only needed on Linux
return select(readyToRead, timeoutMs); _eventfd.notify();
}
PollResultType Socket::isReadyToWrite(int timeoutMs)
{
bool readyToRead = false;
return select(readyToRead, timeoutMs);
}
// Wake up from poll/select by writing to the pipe which is watched by select
bool Socket::wakeUpFromPoll(uint8_t wakeUpCode)
{
return _selectInterrupt->notify(wakeUpCode);
} }
bool Socket::connect(const std::string& host, bool Socket::connect(const std::string& host,
@ -141,7 +87,7 @@ namespace ix
{ {
std::lock_guard<std::mutex> lock(_socketMutex); std::lock_guard<std::mutex> lock(_socketMutex);
if (!_selectInterrupt->clear()) return false; if (!_eventfd.clear()) return false;
_sockfd = SocketConnect::connect(host, port, errMsg, isCancellationRequested); _sockfd = SocketConnect::connect(host, port, errMsg, isCancellationRequested);
return _sockfd != -1; return _sockfd != -1;
@ -200,40 +146,24 @@ namespace ix
#endif #endif
} }
bool Socket::init(std::string& errorMsg) bool Socket::init()
{ {
return _selectInterrupt->init(errorMsg); #ifdef _WIN32
INT rc;
WSADATA wsaData;
rc = WSAStartup(MAKEWORD(2, 2), &wsaData);
return rc != 0;
#else
return true;
#endif
} }
bool Socket::writeBytes(const std::string& str, void Socket::cleanup()
const CancellationRequest& isCancellationRequested)
{ {
while (true) #ifdef _WIN32
{ WSACleanup();
if (isCancellationRequested && isCancellationRequested()) return false; #endif
char* buffer = const_cast<char*>(str.c_str());
int len = (int) str.size();
ssize_t ret = send(buffer, len);
// We wrote some bytes, as needed, all good.
if (ret > 0)
{
return ret == len;
}
// There is possibly something to be writen, try again
else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
getErrno() == EAGAIN))
{
continue;
}
// There was an error during the write, abort
else
{
return false;
}
}
} }
bool Socket::readByte(void* buffer, bool Socket::readByte(void* buffer,
@ -241,7 +171,7 @@ namespace ix
{ {
while (true) while (true)
{ {
if (isCancellationRequested && isCancellationRequested()) return false; if (isCancellationRequested()) return false;
ssize_t ret; ssize_t ret;
ret = recv(buffer, 1); ret = recv(buffer, 1);
@ -255,12 +185,23 @@ namespace ix
else if (ret < 0 && (getErrno() == EWOULDBLOCK || else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
getErrno() == EAGAIN)) getErrno() == EAGAIN))
{ {
// Wait with a 1ms timeout until the socket is ready to read. // Wait with a timeout until something is written.
// This way we are not busy looping // This way we are not busy looping
if (isReadyToRead(1) == PollResultType::Error) fd_set rfds;
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 1 * 1000; // 1ms timeout
FD_ZERO(&rfds);
FD_SET(_sockfd, &rfds);
if (select(_sockfd + 1, &rfds, nullptr, nullptr, &timeout) < 0 &&
(errno == EBADF || errno == EINVAL))
{ {
return false; return false;
} }
continue;
} }
// There was an error during the read, abort // There was an error during the read, abort
else else
@ -270,8 +211,38 @@ namespace ix
} }
} }
std::pair<bool, std::string> Socket::readLine( bool Socket::writeBytes(const std::string& str,
const CancellationRequest& isCancellationRequested) const CancellationRequest& isCancellationRequested)
{
while (true)
{
if (isCancellationRequested()) return false;
char* buffer = const_cast<char*>(str.c_str());
int len = (int) str.size();
ssize_t ret = send(buffer, len);
// We wrote some bytes, as needed, all good.
if (ret > 0)
{
return ret == len;
}
// There is possibly something to be write, try again
else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
getErrno() == EAGAIN))
{
continue;
}
// There was an error during the write, abort
else
{
return false;
}
}
}
std::pair<bool, std::string> Socket::readLine(const CancellationRequest& isCancellationRequested)
{ {
char c; char c;
std::string line; std::string line;
@ -281,8 +252,7 @@ namespace ix
{ {
if (!readByte(&c, isCancellationRequested)) if (!readByte(&c, isCancellationRequested))
{ {
// Return what we were able to read return std::make_pair(false, std::string());
return std::make_pair(false, line);
} }
line += c; line += c;
@ -290,52 +260,4 @@ namespace ix
return std::make_pair(true, line); return std::make_pair(true, line);
} }
std::pair<bool, std::string> Socket::readBytes(
size_t length,
const OnProgressCallback& onProgressCallback,
const CancellationRequest& isCancellationRequested)
{
if (_readBuffer.empty())
{
_readBuffer.resize(kChunkSize);
}
std::vector<uint8_t> output;
while (output.size() != length)
{
if (isCancellationRequested && isCancellationRequested())
{
return std::make_pair(false, std::string());
}
size_t size = std::min(kChunkSize, length - output.size());
ssize_t ret = recv((char*)&_readBuffer[0], size);
if (ret <= 0 && (getErrno() != EWOULDBLOCK &&
getErrno() != EAGAIN))
{
// Error
return std::make_pair(false, std::string());
}
else if (ret > 0)
{
output.insert(output.end(),
_readBuffer.begin(),
_readBuffer.begin() + ret);
}
if (onProgressCallback) onProgressCallback((int) output.size(), (int) length);
// Wait with a 1ms timeout until the socket is ready to read.
// This way we are not busy looping
if (isReadyToRead(1) == PollResultType::Error)
{
return std::make_pair(false, std::string());
}
}
return std::make_pair(true, std::string(output.begin(),
output.end()));
}
} }

View File

@ -10,29 +10,22 @@
#include <functional> #include <functional>
#include <mutex> #include <mutex>
#include <atomic> #include <atomic>
#include <vector>
#include <memory>
#ifdef _WIN32 #ifdef _WIN32
#include <BaseTsd.h> #include <BaseTsd.h>
typedef SSIZE_T ssize_t; typedef SSIZE_T ssize_t;
#endif #endif
#include "IXEventFd.h"
#include "IXCancellationRequest.h" #include "IXCancellationRequest.h"
#include "IXProgressCallback.h"
namespace ix namespace ix
{ {
class SelectInterrupt; enum PollResultType
enum class PollResultType
{ {
ReadyForRead = 0, PollResultType_ReadyForRead = 0,
ReadyForWrite = 1, PollResultType_Timeout = 1,
Timeout = 2, PollResultType_Error = 2
Error = 3,
SendRequest = 4,
CloseRequest = 5
}; };
class Socket { class Socket {
@ -41,17 +34,12 @@ namespace ix
Socket(int fd = -1); Socket(int fd = -1);
virtual ~Socket(); virtual ~Socket();
bool init(std::string& errorMsg);
void configure(); void configure();
// Functions to check whether there is activity on the socket virtual void poll(const OnPollCallback& onPollCallback,
void poll(const OnPollCallback& onPollCallback,
int timeoutSecs = kDefaultPollTimeout); int timeoutSecs = kDefaultPollTimeout);
bool wakeUpFromPoll(uint8_t wakeUpCode); virtual void wakeUpFromPoll();
PollResultType isReadyToWrite(int timeoutMs);
PollResultType isReadyToRead(int timeoutMs);
// Virtual methods // Virtual methods
virtual bool connect(const std::string& url, virtual bool connect(const std::string& url,
@ -70,36 +58,21 @@ namespace ix
const CancellationRequest& isCancellationRequested); const CancellationRequest& isCancellationRequested);
bool writeBytes(const std::string& str, bool writeBytes(const std::string& str,
const CancellationRequest& isCancellationRequested); const CancellationRequest& isCancellationRequested);
std::pair<bool, std::string> readLine(const CancellationRequest& isCancellationRequested);
std::pair<bool, std::string> readLine(
const CancellationRequest& isCancellationRequested);
std::pair<bool, std::string> readBytes(
size_t length,
const OnProgressCallback& onProgressCallback,
const CancellationRequest& isCancellationRequested);
static int getErrno(); static int getErrno();
static bool init(); // Required on Windows to initialize WinSocket
// Used as special codes for pipe communication static void cleanup(); // Required on Windows to cleanup WinSocket
static const uint64_t kSendRequest;
static const uint64_t kCloseRequest;
protected: protected:
void closeSocket(int fd); void closeSocket(int fd);
std::atomic<int> _sockfd; std::atomic<int> _sockfd;
std::mutex _socketMutex; std::mutex _socketMutex;
EventFd _eventfd;
private: private:
PollResultType select(bool readyToRead, int timeoutMs);
static const int kDefaultPollTimeout; static const int kDefaultPollTimeout;
static const int kDefaultPollNoTimeout; static const int kDefaultPollNoTimeout;
// Buffer for reading from our socket. That buffer is never resized.
std::vector<uint8_t> _readBuffer;
static constexpr size_t kChunkSize = 1 << 15;
std::shared_ptr<SelectInterrupt> _selectInterrupt;
}; };
} }

View File

@ -66,7 +66,7 @@ namespace ix
for (;;) for (;;)
{ {
if (isCancellationRequested && isCancellationRequested()) // Must handle timeout as well if (isCancellationRequested()) // Must handle timeout as well
{ {
closeSocket(fd); closeSocket(fd);
errMsg = "Cancelled"; errMsg = "Cancelled";

View File

@ -1,64 +0,0 @@
/*
* IXSocketFactory.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXSocketFactory.h"
#if defined(__APPLE__) or defined(__linux__)
# ifdef __APPLE__
# include <ixwebsocket/IXSocketAppleSSL.h>
# else
# include <ixwebsocket/IXSocketOpenSSL.h>
# endif
#endif
namespace ix
{
std::shared_ptr<Socket> createSocket(bool tls,
std::string& errorMsg)
{
errorMsg.clear();
std::shared_ptr<Socket> socket;
if (!tls)
{
socket = std::make_shared<Socket>();
}
else
{
#ifdef IXWEBSOCKET_USE_TLS
# ifdef __APPLE__
socket = std::make_shared<SocketAppleSSL>();
# else
socket = std::make_shared<SocketOpenSSL>();
# endif
#else
errorMsg = "TLS support is not enabled on this platform.";
return nullptr;
#endif
}
if (!socket->init(errorMsg))
{
socket.reset();
}
return socket;
}
std::shared_ptr<Socket> createSocket(int fd,
std::string& errorMsg)
{
errorMsg.clear();
std::shared_ptr<Socket> socket = std::make_shared<Socket>(fd);
if (!socket->init(errorMsg))
{
socket.reset();
}
return socket;
}
}

View File

@ -1,20 +0,0 @@
/*
* IXSocketFactory.h
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <memory>
namespace ix
{
class Socket;
std::shared_ptr<Socket> createSocket(bool tls,
std::string& errorMsg);
std::shared_ptr<Socket> createSocket(int fd,
std::string& errorMsg);
}

View File

@ -21,7 +21,6 @@
namespace ix namespace ix
{ {
std::atomic<bool> SocketOpenSSL::_openSSLInitializationSuccessful(false); std::atomic<bool> SocketOpenSSL::_openSSLInitializationSuccessful(false);
std::once_flag SocketOpenSSL::_openSSLInitFlag;
SocketOpenSSL::SocketOpenSSL(int fd) : Socket(fd), SocketOpenSSL::SocketOpenSSL(int fd) : Socket(fd),
_ssl_connection(nullptr), _ssl_connection(nullptr),

View File

@ -50,7 +50,7 @@ namespace ix
const SSL_METHOD* _ssl_method; const SSL_METHOD* _ssl_method;
mutable std::mutex _mutex; // OpenSSL routines are not thread-safe mutable std::mutex _mutex; // OpenSSL routines are not thread-safe
static std::once_flag _openSSLInitFlag; std::once_flag _openSSLInitFlag;
static std::atomic<bool> _openSSLInitializationSuccessful; static std::atomic<bool> _openSSLInitializationSuccessful;
}; };

View File

@ -29,8 +29,7 @@ namespace ix
_host(host), _host(host),
_backlog(backlog), _backlog(backlog),
_maxConnections(maxConnections), _maxConnections(maxConnections),
_stop(false), _stop(false)
_connectionStateFactory(&ConnectionState::createConnectionState)
{ {
} }
@ -146,12 +145,6 @@ namespace ix
::close(_serverFd); ::close(_serverFd);
} }
void SocketServer::setConnectionStateFactory(
const ConnectionStateFactory& connectionStateFactory)
{
_connectionStateFactory = connectionStateFactory;
}
void SocketServer::run() void SocketServer::run()
{ {
// Set the socket to non blocking mode, so that accept calls are not blocking // Set the socket to non blocking mode, so that accept calls are not blocking
@ -221,12 +214,6 @@ namespace ix
continue; continue;
} }
std::shared_ptr<ConnectionState> connectionState;
if (_connectionStateFactory)
{
connectionState = _connectionStateFactory();
}
// Launch the handleConnection work asynchronously in its own thread. // Launch the handleConnection work asynchronously in its own thread.
// //
// the destructor of a future returned by std::async blocks, // the destructor of a future returned by std::async blocks,
@ -234,8 +221,7 @@ namespace ix
f = std::async(std::launch::async, f = std::async(std::launch::async,
&SocketServer::handleConnection, &SocketServer::handleConnection,
this, this,
clientFd, clientFd);
connectionState);
} }
} }
} }

View File

@ -6,8 +6,6 @@
#pragma once #pragma once
#include "IXConnectionState.h"
#include <utility> // pair #include <utility> // pair
#include <string> #include <string>
#include <set> #include <set>
@ -22,8 +20,6 @@ namespace ix
{ {
class SocketServer { class SocketServer {
public: public:
using ConnectionStateFactory = std::function<std::shared_ptr<ConnectionState>()>;
SocketServer(int port = SocketServer::kDefaultPort, SocketServer(int port = SocketServer::kDefaultPort,
const std::string& host = SocketServer::kDefaultHost, const std::string& host = SocketServer::kDefaultHost,
int backlog = SocketServer::kDefaultTcpBacklog, int backlog = SocketServer::kDefaultTcpBacklog,
@ -31,8 +27,6 @@ namespace ix
virtual ~SocketServer(); virtual ~SocketServer();
virtual void stop(); virtual void stop();
void setConnectionStateFactory(const ConnectionStateFactory& connectionStateFactory);
const static int kDefaultPort; const static int kDefaultPort;
const static std::string kDefaultHost; const static std::string kDefaultHost;
const static int kDefaultTcpBacklog; const static int kDefaultTcpBacklog;
@ -66,13 +60,9 @@ namespace ix
std::condition_variable _conditionVariable; std::condition_variable _conditionVariable;
std::mutex _conditionVariableMutex; std::mutex _conditionVariableMutex;
//
ConnectionStateFactory _connectionStateFactory;
// Methods // Methods
void run(); void run();
virtual void handleConnection(int fd, virtual void handleConnection(int fd) = 0;
std::shared_ptr<ConnectionState> connectionState) = 0;
virtual size_t getConnectedClientsCount() = 0; virtual size_t getConnectedClientsCount() = 0;
}; };
} }

View File

@ -1,104 +0,0 @@
/*
* IXUrlParser.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXUrlParser.h"
#include <iostream>
#include <sstream>
namespace ix
{
//
// The only difference between those 2 regex is the protocol
//
std::regex UrlParser::_httpRegex("(http|https)://([^/ :]+):?([^/ ]*)(/?[^ #?]*)\\x3f?([^ #]*)#?([^ ]*)");
std::regex UrlParser::_webSocketRegex("(ws|wss)://([^/ :]+):?([^/ ]*)(/?[^ #?]*)\\x3f?([^ #]*)#?([^ ]*)");
bool UrlParser::parse(const std::string& url,
std::string& protocol,
std::string& host,
std::string& path,
std::string& query,
int& port,
bool websocket)
{
std::cmatch what;
if (!regex_match(url.c_str(), what,
websocket ? _webSocketRegex : _httpRegex))
{
return false;
}
std::string portStr;
protocol = std::string(what[1].first, what[1].second);
host = std::string(what[2].first, what[2].second);
portStr = std::string(what[3].first, what[3].second);
path = std::string(what[4].first, what[4].second);
query = std::string(what[5].first, what[5].second);
if (portStr.empty())
{
if (protocol == "ws" || protocol == "http")
{
port = 80;
}
else if (protocol == "wss" || protocol == "https")
{
port = 443;
}
else
{
// Invalid protocol. Should be caught by regex check
// but this missing branch trigger cpplint linter.
return false;
}
}
else
{
std::stringstream ss;
ss << portStr;
ss >> port;
}
if (path.empty())
{
path = "/";
}
else if (path[0] != '/')
{
path = '/' + path;
}
if (!query.empty())
{
path += "?";
path += query;
}
return true;
}
void UrlParser::printUrl(const std::string& url, bool websocket)
{
std::string protocol, host, path, query;
int port {0};
if (!parse(url, protocol, host, path, query, port, websocket))
{
return;
}
std::cout << "[" << url << "]" << std::endl;
std::cout << protocol << std::endl;
std::cout << host << std::endl;
std::cout << port << std::endl;
std::cout << path << std::endl;
std::cout << query << std::endl;
std::cout << "-------------------------------" << std::endl;
}
}

View File

@ -1,31 +0,0 @@
/*
* IXUrlParser.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <string>
#include <regex>
namespace ix
{
class UrlParser
{
public:
static bool parse(const std::string& url,
std::string& protocol,
std::string& host,
std::string& path,
std::string& query,
int& port,
bool websocket);
static void printUrl(const std::string& url, bool websocket);
private:
static std::regex _httpRegex;
static std::regex _webSocketRegex;
};
}

View File

@ -79,10 +79,10 @@ namespace ix
return _perMessageDeflateOptions; return _perMessageDeflateOptions;
} }
void WebSocket::setHeartBeatPeriod(int heartBeatPeriod) void WebSocket::setHeartBeatPeriod(int hearBeatPeriod)
{ {
std::lock_guard<std::mutex> lock(_configMutex); std::lock_guard<std::mutex> lock(_configMutex);
_heartBeatPeriod = heartBeatPeriod; _heartBeatPeriod = hearBeatPeriod;
} }
int WebSocket::getHeartBeatPeriod() const int WebSocket::getHeartBeatPeriod() const
@ -252,11 +252,6 @@ namespace ix
{ {
webSocketMessageType = WebSocket_MessageType_Pong; webSocketMessageType = WebSocket_MessageType_Pong;
} break; } break;
case WebSocketTransport::FRAGMENT:
{
webSocketMessageType = WebSocket_MessageType_Fragment;
} break;
} }
WebSocketErrorInfo webSocketErrorInfo; WebSocketErrorInfo webSocketErrorInfo;
@ -299,10 +294,9 @@ namespace ix
} }
} }
WebSocketSendInfo WebSocket::send(const std::string& text, WebSocketSendInfo WebSocket::send(const std::string& text)
const OnProgressCallback& onProgressCallback)
{ {
return sendMessage(text, false, onProgressCallback); return sendMessage(text, false);
} }
WebSocketSendInfo WebSocket::ping(const std::string& text) WebSocketSendInfo WebSocket::ping(const std::string& text)
@ -314,9 +308,7 @@ namespace ix
return sendMessage(text, true); return sendMessage(text, true);
} }
WebSocketSendInfo WebSocket::sendMessage(const std::string& text, WebSocketSendInfo WebSocket::sendMessage(const std::string& text, bool ping)
bool ping,
const OnProgressCallback& onProgressCallback)
{ {
if (!isConnected()) return WebSocketSendInfo(false); if (!isConnected()) return WebSocketSendInfo(false);
@ -338,7 +330,7 @@ namespace ix
} }
else else
{ {
webSocketSendInfo = _ws.sendBinary(text, onProgressCallback); webSocketSendInfo = _ws.sendBinary(text);
} }
WebSocket::invokeTrafficTrackerCallback(webSocketSendInfo.wireSize, false); WebSocket::invokeTrafficTrackerCallback(webSocketSendInfo.wireSize, false);
@ -379,9 +371,4 @@ namespace ix
{ {
_automaticReconnection = false; _automaticReconnection = false;
} }
size_t WebSocket::bufferedAmount() const
{
return _ws.bufferedAmount();
}
} }

View File

@ -19,7 +19,6 @@
#include "IXWebSocketSendInfo.h" #include "IXWebSocketSendInfo.h"
#include "IXWebSocketPerMessageDeflateOptions.h" #include "IXWebSocketPerMessageDeflateOptions.h"
#include "IXWebSocketHttpHeaders.h" #include "IXWebSocketHttpHeaders.h"
#include "IXProgressCallback.h"
namespace ix namespace ix
{ {
@ -39,8 +38,7 @@ namespace ix
WebSocket_MessageType_Close = 2, WebSocket_MessageType_Close = 2,
WebSocket_MessageType_Error = 3, WebSocket_MessageType_Error = 3,
WebSocket_MessageType_Ping = 4, WebSocket_MessageType_Ping = 4,
WebSocket_MessageType_Pong = 5, WebSocket_MessageType_Pong = 5
WebSocket_MessageType_Fragment = 6
}; };
struct WebSocketOpenInfo struct WebSocketOpenInfo
@ -89,7 +87,7 @@ namespace ix
void setUrl(const std::string& url); void setUrl(const std::string& url);
void setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions); void setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions);
void setHandshakeTimeout(int handshakeTimeoutSecs); void setHandshakeTimeout(int handshakeTimeoutSecs);
void setHeartBeatPeriod(int heartBeatPeriod); void setHeartBeatPeriod(int hearBeatPeriod);
// Run asynchronously, by calling start and stop. // Run asynchronously, by calling start and stop.
void start(); void start();
@ -99,8 +97,7 @@ namespace ix
WebSocketInitResult connect(int timeoutSecs); WebSocketInitResult connect(int timeoutSecs);
void run(); void run();
WebSocketSendInfo send(const std::string& text, WebSocketSendInfo send(const std::string& text);
const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo ping(const std::string& text); WebSocketSendInfo ping(const std::string& text);
void close(); void close();
@ -112,16 +109,13 @@ namespace ix
const std::string& getUrl() const; const std::string& getUrl() const;
const WebSocketPerMessageDeflateOptions& getPerMessageDeflateOptions() const; const WebSocketPerMessageDeflateOptions& getPerMessageDeflateOptions() const;
int getHeartBeatPeriod() const; int getHeartBeatPeriod() const;
size_t bufferedAmount() const;
void enableAutomaticReconnection(); void enableAutomaticReconnection();
void disableAutomaticReconnection(); void disableAutomaticReconnection();
private: private:
WebSocketSendInfo sendMessage(const std::string& text, WebSocketSendInfo sendMessage(const std::string& text, bool ping);
bool ping,
const OnProgressCallback& callback = nullptr);
bool isConnected() const; bool isConnected() const;
bool isClosing() const; bool isClosing() const;

View File

@ -6,7 +6,6 @@
#include "IXWebSocketHandshake.h" #include "IXWebSocketHandshake.h"
#include "IXSocketConnect.h" #include "IXSocketConnect.h"
#include "IXUrlParser.h"
#include "libwshandshake.hpp" #include "libwshandshake.hpp"
@ -33,6 +32,90 @@ namespace ix
} }
bool WebSocketHandshake::parseUrl(const std::string& url,
std::string& protocol,
std::string& host,
std::string& path,
std::string& query,
int& port)
{
std::regex ex("(ws|wss)://([^/ :]+):?([^/ ]*)(/?[^ #?]*)\\x3f?([^ #]*)#?([^ ]*)");
std::cmatch what;
if (!regex_match(url.c_str(), what, ex))
{
return false;
}
std::string portStr;
protocol = std::string(what[1].first, what[1].second);
host = std::string(what[2].first, what[2].second);
portStr = std::string(what[3].first, what[3].second);
path = std::string(what[4].first, what[4].second);
query = std::string(what[5].first, what[5].second);
if (portStr.empty())
{
if (protocol == "ws")
{
port = 80;
}
else if (protocol == "wss")
{
port = 443;
}
else
{
// Invalid protocol. Should be caught by regex check
// but this missing branch trigger cpplint linter.
return false;
}
}
else
{
std::stringstream ss;
ss << portStr;
ss >> port;
}
if (path.empty())
{
path = "/";
}
else if (path[0] != '/')
{
path = '/' + path;
}
if (!query.empty())
{
path += "?";
path += query;
}
return true;
}
void WebSocketHandshake::printUrl(const std::string& url)
{
std::string protocol, host, path, query;
int port {0};
if (!WebSocketHandshake::parseUrl(url, protocol, host,
path, query, port))
{
return;
}
std::cout << "[" << url << "]" << std::endl;
std::cout << protocol << std::endl;
std::cout << host << std::endl;
std::cout << port << std::endl;
std::cout << path << std::endl;
std::cout << query << std::endl;
std::cout << "-------------------------------" << std::endl;
}
std::string WebSocketHandshake::trim(const std::string& str) std::string WebSocketHandshake::trim(const std::string& str)
{ {
std::string out(str); std::string out(str);
@ -109,6 +192,61 @@ namespace ix
return s; return s;
} }
std::pair<bool, WebSocketHttpHeaders> WebSocketHandshake::parseHttpHeaders(
const CancellationRequest& isCancellationRequested)
{
WebSocketHttpHeaders headers;
char line[256];
int i;
while (true)
{
int colon = 0;
for (i = 0;
i < 2 || (i < 255 && line[i-2] != '\r' && line[i-1] != '\n');
++i)
{
if (!_socket->readByte(line+i, isCancellationRequested))
{
return std::make_pair(false, headers);
}
if (line[i] == ':' && colon == 0)
{
colon = i;
}
}
if (line[0] == '\r' && line[1] == '\n')
{
break;
}
// line is a single header entry. split by ':', and add it to our
// header map. ignore lines with no colon.
if (colon > 0)
{
line[i] = '\0';
std::string lineStr(line);
// colon is ':', colon+1 is ' ', colon+2 is the start of the value.
// i is end of string (\0), i-colon is length of string minus key;
// subtract 1 for '\0', 1 for '\n', 1 for '\r',
// 1 for the ' ' after the ':', and total is -4
std::string name(lineStr.substr(0, colon));
std::string value(lineStr.substr(colon + 2, i - colon - 4));
// Make the name lower case.
std::transform(name.begin(), name.end(), name.begin(), ::tolower);
headers[name] = value;
}
}
return std::make_pair(true, headers);
}
WebSocketInitResult WebSocketHandshake::sendErrorResponse(int code, const std::string& reason) WebSocketInitResult WebSocketHandshake::sendErrorResponse(int code, const std::string& reason)
{ {
std::stringstream ss; std::stringstream ss;
@ -217,7 +355,7 @@ namespace ix
return WebSocketInitResult(false, status, ss.str()); return WebSocketInitResult(false, status, ss.str());
} }
auto result = parseHttpHeaders(_socket, isCancellationRequested); auto result = parseHttpHeaders(isCancellationRequested);
auto headersValid = result.first; auto headersValid = result.first;
auto headers = result.second; auto headers = result.second;
@ -312,7 +450,7 @@ namespace ix
} }
// Retrieve and validate HTTP headers // Retrieve and validate HTTP headers
auto result = parseHttpHeaders(_socket, isCancellationRequested); auto result = parseHttpHeaders(isCancellationRequested);
auto headersValid = result.first; auto headersValid = result.first;
auto headers = result.second; auto headers = result.second;

View File

@ -59,10 +59,19 @@ namespace ix
WebSocketInitResult serverHandshake(int fd, WebSocketInitResult serverHandshake(int fd,
int timeoutSecs); int timeoutSecs);
static bool parseUrl(const std::string& url,
std::string& protocol,
std::string& host,
std::string& path,
std::string& query,
int& port);
private: private:
static void printUrl(const std::string& url);
std::string genRandomString(const int len); std::string genRandomString(const int len);
// Parse HTTP headers // Parse HTTP headers
std::pair<bool, WebSocketHttpHeaders> parseHttpHeaders(const CancellationRequest& isCancellationRequested);
WebSocketInitResult sendErrorResponse(int code, const std::string& reason); WebSocketInitResult sendErrorResponse(int code, const std::string& reason);
std::tuple<std::string, std::string, std::string> parseRequestLine(const std::string& line); std::tuple<std::string, std::string, std::string> parseRequestLine(const std::string& line);

View File

@ -1,66 +0,0 @@
/*
* IXWebSocketHttpHeaders.h
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include "IXWebSocketHttpHeaders.h"
#include "IXSocket.h"
#include <string>
#include <unordered_map>
namespace ix
{
std::pair<bool, WebSocketHttpHeaders> parseHttpHeaders(
std::shared_ptr<Socket> socket,
const CancellationRequest& isCancellationRequested)
{
WebSocketHttpHeaders headers;
char line[1024];
int i;
while (true)
{
int colon = 0;
for (i = 0;
i < 2 || (i < 1023 && line[i-2] != '\r' && line[i-1] != '\n');
++i)
{
if (!socket->readByte(line+i, isCancellationRequested))
{
return std::make_pair(false, headers);
}
if (line[i] == ':' && colon == 0)
{
colon = i;
}
}
if (line[0] == '\r' && line[1] == '\n')
{
break;
}
// line is a single header entry. split by ':', and add it to our
// header map. ignore lines with no colon.
if (colon > 0)
{
line[i] = '\0';
std::string lineStr(line);
// colon is ':', colon+1 is ' ', colon+2 is the start of the value.
// i is end of string (\0), i-colon is length of string minus key;
// subtract 1 for '\0', 1 for '\n', 1 for '\r',
// 1 for the ' ' after the ':', and total is -4
std::string name(lineStr.substr(0, colon));
std::string value(lineStr.substr(colon + 2, i - colon - 4));
headers[name] = value;
}
}
return std::make_pair(true, headers);
}
}

View File

@ -6,40 +6,10 @@
#pragma once #pragma once
#include "IXCancellationRequest.h"
#include <string> #include <string>
#include <map> #include <unordered_map>
#include <memory>
#include <algorithm>
namespace ix namespace ix
{ {
class Socket; using WebSocketHttpHeaders = std::unordered_map<std::string, std::string>;
struct CaseInsensitiveLess
{
// Case Insensitive compare_less binary function
struct NocaseCompare
{
bool operator() (const unsigned char& c1, const unsigned char& c2) const
{
return std::tolower(c1) < std::tolower(c2);
}
};
bool operator() (const std::string & s1, const std::string & s2) const
{
return std::lexicographical_compare
(s1.begin(), s1.end(), // source range
s2.begin(), s2.end(), // dest range
NocaseCompare()); // comparison
}
};
using WebSocketHttpHeaders = std::map<std::string, std::string, CaseInsensitiveLess>;
std::pair<bool, WebSocketHttpHeaders> parseHttpHeaders(
std::shared_ptr<Socket> socket,
const CancellationRequest& isCancellationRequested);
} }

View File

@ -49,12 +49,10 @@ namespace ix
_onConnectionCallback = callback; _onConnectionCallback = callback;
} }
void WebSocketServer::handleConnection( void WebSocketServer::handleConnection(int fd)
int fd,
std::shared_ptr<ConnectionState> connectionState)
{ {
auto webSocket = std::make_shared<WebSocket>(); auto webSocket = std::make_shared<WebSocket>();
_onConnectionCallback(webSocket, connectionState); _onConnectionCallback(webSocket);
webSocket->disableAutomaticReconnection(); webSocket->disableAutomaticReconnection();

View File

@ -20,8 +20,7 @@
namespace ix namespace ix
{ {
using OnConnectionCallback = std::function<void(std::shared_ptr<WebSocket>, using OnConnectionCallback = std::function<void(std::shared_ptr<WebSocket>)>;
std::shared_ptr<ConnectionState>)>;
class WebSocketServer : public SocketServer { class WebSocketServer : public SocketServer {
public: public:
@ -50,8 +49,7 @@ namespace ix
const static int kDefaultHandShakeTimeoutSecs; const static int kDefaultHandShakeTimeoutSecs;
// Methods // Methods
virtual void handleConnection(int fd, virtual void handleConnection(int fd) final;
std::shared_ptr<ConnectionState> connectionState) final;
virtual size_t getConnectedClientsCount() final; virtual size_t getConnectedClientsCount() final;
}; };
} }

View File

@ -1,31 +1,7 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2012, 2013 <dhbaird@gmail.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
/* /*
* IXWebSocketTransport.cpp * IXWebSocketTransport.cpp
* Author: Benjamin Sergeant * Author: Benjamin Sergeant
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved. * Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*/ */
// //
@ -35,8 +11,14 @@
#include "IXWebSocketTransport.h" #include "IXWebSocketTransport.h"
#include "IXWebSocketHandshake.h" #include "IXWebSocketHandshake.h"
#include "IXWebSocketHttpHeaders.h" #include "IXWebSocketHttpHeaders.h"
#include "IXUrlParser.h"
#include "IXSocketFactory.h" #ifdef IXWEBSOCKET_USE_TLS
# ifdef __APPLE__
# include "IXSocketAppleSSL.h"
# else
# include "IXSocketOpenSSL.h"
# endif
#endif
#include <string.h> #include <string.h>
#include <stdlib.h> #include <stdlib.h>
@ -47,15 +29,12 @@
#include <cstdarg> #include <cstdarg>
#include <iostream> #include <iostream>
#include <sstream> #include <sstream>
#include <chrono>
#include <thread>
namespace ix namespace ix
{ {
const std::string WebSocketTransport::kHeartBeatPingMessage("ixwebsocket::heartbeat"); const std::string WebSocketTransport::kHeartBeatPingMessage("ixwebsocket::hearbeat");
const int WebSocketTransport::kDefaultHeartBeatPeriod(-1); const int WebSocketTransport::kDefaultHeartBeatPeriod(-1);
constexpr size_t WebSocketTransport::kChunkSize;
WebSocketTransport::WebSocketTransport() : WebSocketTransport::WebSocketTransport() :
_readyState(CLOSED), _readyState(CLOSED),
@ -66,7 +45,7 @@ namespace ix
_heartBeatPeriod(kDefaultHeartBeatPeriod), _heartBeatPeriod(kDefaultHeartBeatPeriod),
_lastSendTimePoint(std::chrono::steady_clock::now()) _lastSendTimePoint(std::chrono::steady_clock::now())
{ {
_readbuf.resize(kChunkSize);
} }
WebSocketTransport::~WebSocketTransport() WebSocketTransport::~WebSocketTransport()
@ -75,11 +54,11 @@ namespace ix
} }
void WebSocketTransport::configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions, void WebSocketTransport::configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions,
int heartBeatPeriod) int hearBeatPeriod)
{ {
_perMessageDeflateOptions = perMessageDeflateOptions; _perMessageDeflateOptions = perMessageDeflateOptions;
_enablePerMessageDeflate = _perMessageDeflateOptions.enabled(); _enablePerMessageDeflate = _perMessageDeflateOptions.enabled();
_heartBeatPeriod = heartBeatPeriod; _heartBeatPeriod = hearBeatPeriod;
} }
// Client // Client
@ -88,21 +67,31 @@ namespace ix
{ {
std::string protocol, host, path, query; std::string protocol, host, path, query;
int port; int port;
bool websocket = true;
if (!UrlParser::parse(url, protocol, host, path, query, port, websocket)) if (!WebSocketHandshake::parseUrl(url, protocol, host,
path, query, port))
{ {
return WebSocketInitResult(false, 0, return WebSocketInitResult(false, 0,
std::string("Could not parse URL ") + url); std::string("Could not parse URL ") + url);
} }
bool tls = protocol == "wss"; if (protocol == "wss")
std::string errorMsg;
_socket = createSocket(tls, errorMsg);
if (!_socket)
{ {
return WebSocketInitResult(false, 0, errorMsg); _socket.reset();
#ifdef IXWEBSOCKET_USE_TLS
# ifdef __APPLE__
_socket = std::make_shared<SocketAppleSSL>();
# else
_socket = std::make_shared<SocketOpenSSL>();
# endif
#else
return WebSocketInitResult(false, 0, "TLS is not supported.");
#endif
}
else
{
_socket.reset();
_socket = std::make_shared<Socket>();
} }
WebSocketHandshake webSocketHandshake(_requestInitCancellation, WebSocketHandshake webSocketHandshake(_requestInitCancellation,
@ -123,13 +112,8 @@ namespace ix
// Server // Server
WebSocketInitResult WebSocketTransport::connectToSocket(int fd, int timeoutSecs) WebSocketInitResult WebSocketTransport::connectToSocket(int fd, int timeoutSecs)
{ {
std::string errorMsg; _socket.reset();
_socket = createSocket(fd, errorMsg); _socket = std::make_shared<Socket>(fd);
if (!_socket)
{
return WebSocketInitResult(false, 0, errorMsg);
}
WebSocketHandshake webSocketHandshake(_requestInitCancellation, WebSocketHandshake webSocketHandshake(_requestInitCancellation,
_socket, _socket,
@ -189,75 +173,45 @@ namespace ix
// If (1) heartbeat is enabled, and (2) no data was received or // If (1) heartbeat is enabled, and (2) no data was received or
// send for a duration exceeding our heart-beat period, send a // send for a duration exceeding our heart-beat period, send a
// ping to the server. // ping to the server.
if (pollResult == PollResultType::Timeout && if (pollResult == PollResultType_Timeout &&
heartBeatPeriodExceeded()) heartBeatPeriodExceeded())
{ {
std::stringstream ss; std::stringstream ss;
ss << kHeartBeatPingMessage << "::" << _heartBeatPeriod << "s"; ss << kHeartBeatPingMessage << "::" << _heartBeatPeriod << "s";
sendPing(ss.str()); sendPing(ss.str());
return;
} }
// Make sure we send all the buffered data
// there can be a lot of it for large messages.
else if (pollResult == PollResultType::SendRequest)
{
while (!isSendBufferEmpty() && !_requestInitCancellation)
{
// Wait with a 10ms timeout until the socket is ready to write.
// This way we are not busy looping
PollResultType result = _socket->isReadyToWrite(10);
if (result == PollResultType::Error)
{
_socket->close();
setReadyState(CLOSED);
break;
}
else if (result == PollResultType::ReadyForWrite)
{
sendOnSocket();
}
}
}
else if (pollResult == PollResultType::ReadyForRead)
{
while (true) while (true)
{ {
ssize_t ret = _socket->recv((char*)&_readbuf[0], _readbuf.size()); int N = (int) _rxbuf.size();
_rxbuf.resize(N + 1500);
ssize_t ret = _socket->recv((char*)&_rxbuf[0] + N, 1500);
if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK || if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
_socket->getErrno() == EAGAIN)) _socket->getErrno() == EAGAIN)) {
{ _rxbuf.resize(N);
break; break;
} }
else if (ret <= 0) else if (ret <= 0)
{ {
_rxbuf.clear(); _rxbuf.resize(N);
_socket->close(); _socket->close();
setReadyState(CLOSED); setReadyState(CLOSED);
break; break;
} }
else else
{ {
_rxbuf.insert(_rxbuf.end(), _rxbuf.resize(N + ret);
_readbuf.begin(),
_readbuf.begin() + ret);
} }
} }
}
else if (pollResult == PollResultType::Error)
{
_socket->close();
}
else if (pollResult == PollResultType::CloseRequest)
{
_socket->close();
}
// Avoid a race condition where we get stuck in select if (isSendBufferEmpty() && _readyState == CLOSING)
// while closing.
if (_readyState == CLOSING)
{ {
_socket->close(); _socket->close();
setReadyState(CLOSED);
} }
}, },
_heartBeatPeriod); _heartBeatPeriod);
@ -406,39 +360,17 @@ namespace ix
|| ws.opcode == wsheader_type::CONTINUATION || ws.opcode == wsheader_type::CONTINUATION
) { ) {
unmaskReceiveBuffer(ws); unmaskReceiveBuffer(ws);
_receivedData.insert(_receivedData.end(),
// _rxbuf.begin()+ws.header_size,
// Usual case. Small unfragmented messages _rxbuf.begin()+ws.header_size+(size_t)ws.N);// just feed
//
if (ws.fin && _chunks.empty())
{
emitMessage(MSG,
std::string(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t) ws.N),
ws,
onMessageCallback);
}
else
{
//
// Add intermediary message to our chunk list.
// We use a chunk list instead of a big buffer because resizing
// large buffer can be very costly when we need to re-allocate
// the internal buffer which is slow and can let the internal OS
// receive buffer fill out.
//
_chunks.emplace_back(
std::vector<uint8_t>(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t)ws.N));
if (ws.fin) if (ws.fin)
{ {
emitMessage(MSG, getMergedChunks(), ws, onMessageCallback); // fire callback with a string message
_chunks.clear(); std::string stringMessage(_receivedData.begin(),
} _receivedData.end());
else
{ emitMessage(MSG, stringMessage, ws, onMessageCallback);
emitMessage(FRAGMENT, std::string(), ws, onMessageCallback); _receivedData.clear();
}
} }
} }
else if (ws.opcode == wsheader_type::PING) else if (ws.opcode == wsheader_type::PING)
@ -488,32 +420,11 @@ namespace ix
close(); close();
} }
// Erase the message that has been processed from the input/read buffer
_rxbuf.erase(_rxbuf.begin(), _rxbuf.erase(_rxbuf.begin(),
_rxbuf.begin() + ws.header_size + (size_t) ws.N); _rxbuf.begin() + ws.header_size + (size_t) ws.N);
} }
} }
std::string WebSocketTransport::getMergedChunks() const
{
size_t length = 0;
for (auto&& chunk : _chunks)
{
length += chunk.size();
}
std::string msg;
msg.reserve(length);
for (auto&& chunk : _chunks)
{
std::string str(chunk.begin(), chunk.end());
msg += str;
}
return msg;
}
void WebSocketTransport::emitMessage(MessageKind messageKind, void WebSocketTransport::emitMessage(MessageKind messageKind,
const std::string& message, const std::string& message,
const wsheader_type& ws, const wsheader_type& ws,
@ -522,7 +433,7 @@ namespace ix
size_t wireSize = message.size(); size_t wireSize = message.size();
// When the RSV1 bit is 1 it means the message is compressed // When the RSV1 bit is 1 it means the message is compressed
if (_enablePerMessageDeflate && ws.rsv1 && messageKind != FRAGMENT) if (_enablePerMessageDeflate && ws.rsv1)
{ {
std::string decompressedMessage; std::string decompressedMessage;
bool success = _perMessageDeflate.decompress(message, decompressedMessage); bool success = _perMessageDeflate.decompress(message, decompressedMessage);
@ -543,11 +454,9 @@ namespace ix
return static_cast<unsigned>(seconds); return static_cast<unsigned>(seconds);
} }
WebSocketSendInfo WebSocketTransport::sendData( WebSocketSendInfo WebSocketTransport::sendData(wsheader_type::opcode_type type,
wsheader_type::opcode_type type,
const std::string& message, const std::string& message,
bool compress, bool compress)
const OnProgressCallback& onProgressCallback)
{ {
if (_readyState == CLOSING || _readyState == CLOSED) if (_readyState == CLOSING || _readyState == CLOSED)
{ {
@ -564,87 +473,15 @@ namespace ix
if (compress) if (compress)
{ {
if (!_perMessageDeflate.compress(message, compressedMessage)) bool success = _perMessageDeflate.compress(message, compressedMessage);
{ compressionError = !success;
bool success = false;
compressionError = true;
payloadSize = 0;
wireSize = 0;
return WebSocketSendInfo(success, compressionError, payloadSize, wireSize);
}
compressionError = false;
wireSize = compressedMessage.size(); wireSize = compressedMessage.size();
message_begin = compressedMessage.begin(); message_begin = compressedMessage.begin();
message_end = compressedMessage.end(); message_end = compressedMessage.end();
} }
// Common case for most message. No fragmentation required. uint64_t message_size = wireSize;
if (wireSize < kChunkSize)
{
sendFragment(type, true, message_begin, message_end, compress);
}
else
{
//
// Large messages need to be fragmented
//
// Rules:
// First message needs to specify a proper type (BINARY or TEXT)
// Intermediary and last messages need to be of type CONTINUATION
// Last message must set the fin byte.
//
auto steps = wireSize / kChunkSize;
std::string::const_iterator begin = message_begin;
std::string::const_iterator end = message_end;
for (uint64_t i = 0 ; i < steps; ++i)
{
bool firstStep = i == 0;
bool lastStep = (i+1) == steps;
bool fin = lastStep;
end = begin + kChunkSize;
if (lastStep)
{
end = message_end;
}
auto opcodeType = type;
if (!firstStep)
{
opcodeType = wsheader_type::CONTINUATION;
}
// Send message
sendFragment(opcodeType, fin, begin, end, compress);
if (onProgressCallback && !onProgressCallback((int)i, (int) steps))
{
break;
}
begin += kChunkSize;
}
}
// Request to flush the send buffer on the background thread if it isn't empty
if (!isSendBufferEmpty())
{
_socket->wakeUpFromPoll(Socket::kSendRequest);
}
return WebSocketSendInfo(true, compressionError, payloadSize, wireSize);
}
void WebSocketTransport::sendFragment(wsheader_type::opcode_type type,
bool fin,
std::string::const_iterator message_begin,
std::string::const_iterator message_end,
bool compress)
{
auto message_size = message_end - message_begin;
unsigned x = getRandomUnsigned(); unsigned x = getRandomUnsigned();
uint8_t masking_key[4] = {}; uint8_t masking_key[4] = {};
@ -657,13 +494,7 @@ namespace ix
header.assign(2 + header.assign(2 +
(message_size >= 126 ? 2 : 0) + (message_size >= 126 ? 2 : 0) +
(message_size >= 65536 ? 6 : 0) + 4, 0); (message_size >= 65536 ? 6 : 0) + 4, 0);
header[0] = type; header[0] = 0x80 | type;
// The fin bit indicate that this is the last fragment. Fin is French for end.
if (fin)
{
header[0] |= 0x80;
}
// This bit indicate that the frame is compressed // This bit indicate that the frame is compressed
if (compress) if (compress)
@ -715,6 +546,8 @@ namespace ix
// Now actually send this data // Now actually send this data
sendOnSocket(); sendOnSocket();
return WebSocketSendInfo(true, compressionError, payloadSize, wireSize);
} }
WebSocketSendInfo WebSocketTransport::sendPing(const std::string& message) WebSocketSendInfo WebSocketTransport::sendPing(const std::string& message)
@ -723,13 +556,9 @@ namespace ix
return sendData(wsheader_type::PING, message, compress); return sendData(wsheader_type::PING, message, compress);
} }
WebSocketSendInfo WebSocketTransport::sendBinary( WebSocketSendInfo WebSocketTransport::sendBinary(const std::string& message)
const std::string& message,
const OnProgressCallback& onProgressCallback)
{ {
return sendData(wsheader_type::BINARY_FRAME, message, return sendData(wsheader_type::BINARY_FRAME, message, _enablePerMessageDeflate);
_enablePerMessageDeflate, onProgressCallback);
} }
void WebSocketTransport::sendOnSocket() void WebSocketTransport::sendOnSocket()
@ -780,18 +609,8 @@ namespace ix
sendData(wsheader_type::CLOSE, normalClosure, compress); sendData(wsheader_type::CLOSE, normalClosure, compress);
setReadyState(CLOSING); setReadyState(CLOSING);
_socket->wakeUpFromPoll(Socket::kCloseRequest); _socket->wakeUpFromPoll();
_socket->close(); _socket->close();
_closeCode = 1000;
_closeReason = "Normal Closure";
setReadyState(CLOSED);
}
size_t WebSocketTransport::bufferedAmount() const
{
std::lock_guard<std::mutex> lock(_txbufMutex);
return _txbuf.size();
} }
} // namespace ix } // namespace ix

View File

@ -16,7 +16,6 @@
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <atomic> #include <atomic>
#include <list>
#include "IXWebSocketSendInfo.h" #include "IXWebSocketSendInfo.h"
#include "IXWebSocketPerMessageDeflate.h" #include "IXWebSocketPerMessageDeflate.h"
@ -24,7 +23,6 @@
#include "IXWebSocketHttpHeaders.h" #include "IXWebSocketHttpHeaders.h"
#include "IXCancellationRequest.h" #include "IXCancellationRequest.h"
#include "IXWebSocketHandshake.h" #include "IXWebSocketHandshake.h"
#include "IXProgressCallback.h"
namespace ix namespace ix
{ {
@ -45,8 +43,7 @@ namespace ix
{ {
MSG, MSG,
PING, PING,
PONG, PONG
FRAGMENT
}; };
using OnMessageCallback = std::function<void(const std::string&, using OnMessageCallback = std::function<void(const std::string&,
@ -61,7 +58,7 @@ namespace ix
~WebSocketTransport(); ~WebSocketTransport();
void configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions, void configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions,
int heartBeatPeriod); int hearBeatPeriod);
WebSocketInitResult connectToUrl(const std::string& url, // Client WebSocketInitResult connectToUrl(const std::string& url, // Client
int timeoutSecs); int timeoutSecs);
@ -69,18 +66,17 @@ namespace ix
int timeoutSecs); int timeoutSecs);
void poll(); void poll();
WebSocketSendInfo sendBinary(const std::string& message, WebSocketSendInfo sendBinary(const std::string& message);
const OnProgressCallback& onProgressCallback);
WebSocketSendInfo sendPing(const std::string& message); WebSocketSendInfo sendPing(const std::string& message);
void close(); void close();
ReadyStateValues getReadyState() const; ReadyStateValues getReadyState() const;
void setReadyState(ReadyStateValues readyStateValue); void setReadyState(ReadyStateValues readyStateValue);
void setOnCloseCallback(const OnCloseCallback& onCloseCallback); void setOnCloseCallback(const OnCloseCallback& onCloseCallback);
void dispatch(const OnMessageCallback& onMessageCallback); void dispatch(const OnMessageCallback& onMessageCallback);
size_t bufferedAmount() const;
private: private:
std::string _url; std::string _url;
std::string _origin;
struct wsheader_type { struct wsheader_type {
unsigned header_size; unsigned header_size;
@ -100,31 +96,13 @@ namespace ix
uint8_t masking_key[4]; uint8_t masking_key[4];
}; };
// Buffer for reading from our socket. That buffer is never resized.
std::vector<uint8_t> _readbuf;
// Contains all messages that were fetched in the last socket read.
// This could be a mix of control messages (Close, Ping, etc...) and
// data messages. That buffer
std::vector<uint8_t> _rxbuf; std::vector<uint8_t> _rxbuf;
// Contains all messages that are waiting to be sent
std::vector<uint8_t> _txbuf; std::vector<uint8_t> _txbuf;
mutable std::mutex _txbufMutex; mutable std::mutex _txbufMutex;
std::vector<uint8_t> _receivedData;
// Hold fragments for multi-fragments messages in a list. We support receiving very large
// messages (tested messages up to 700M) and we cannot put them in a single
// buffer that is resized, as this operation can be slow when a buffer has its
// size increased 2 fold, while appending to a list has a fixed cost.
std::list<std::vector<uint8_t>> _chunks;
// Fragments are 32K long
static constexpr size_t kChunkSize = 1 << 15;
// Underlying TCP socket
std::shared_ptr<Socket> _socket; std::shared_ptr<Socket> _socket;
// Hold the state of the connection (OPEN, CLOSED, etc...)
std::atomic<ReadyStateValues> _readyState; std::atomic<ReadyStateValues> _readyState;
OnCloseCallback _onCloseCallback; OnCloseCallback _onCloseCallback;
@ -133,7 +111,6 @@ namespace ix
size_t _closeWireSize; size_t _closeWireSize;
mutable std::mutex _closeDataMutex; mutable std::mutex _closeDataMutex;
// Data used for Per Message Deflate compression (with zlib)
WebSocketPerMessageDeflate _perMessageDeflate; WebSocketPerMessageDeflate _perMessageDeflate;
WebSocketPerMessageDeflateOptions _perMessageDeflateOptions; WebSocketPerMessageDeflateOptions _perMessageDeflateOptions;
std::atomic<bool> _enablePerMessageDeflate; std::atomic<bool> _enablePerMessageDeflate;
@ -148,19 +125,12 @@ namespace ix
mutable std::mutex _lastSendTimePointMutex; mutable std::mutex _lastSendTimePointMutex;
std::chrono::time_point<std::chrono::steady_clock> _lastSendTimePoint; std::chrono::time_point<std::chrono::steady_clock> _lastSendTimePoint;
// No data was send through the socket for longer than the heartbeat period // No data was send through the socket for longer that the hearbeat period
bool heartBeatPeriodExceeded(); bool heartBeatPeriodExceeded();
void sendOnSocket(); void sendOnSocket();
WebSocketSendInfo sendData(wsheader_type::opcode_type type, WebSocketSendInfo sendData(wsheader_type::opcode_type type,
const std::string& message, const std::string& message,
bool compress,
const OnProgressCallback& onProgressCallback = nullptr);
void sendFragment(wsheader_type::opcode_type type,
bool fin,
std::string::const_iterator begin,
std::string::const_iterator end,
bool compress); bool compress);
void emitMessage(MessageKind messageKind, void emitMessage(MessageKind messageKind,
@ -178,7 +148,5 @@ namespace ix
unsigned getRandomUnsigned(); unsigned getRandomUnsigned();
void unmaskReceiveBuffer(const wsheader_type& ws); void unmaskReceiveBuffer(const wsheader_type& ws);
std::string getMergedChunks() const;
}; };
} }

View File

@ -1,23 +1,14 @@
# #
# This makefile is just used to easily work with docker (linux build) # This makefile is just used to easily work with docker (linux build)
# #
all: brew all: run
install: brew
brew:
mkdir -p build && (cd build ; cmake .. ; make -j install)
.PHONY: docker .PHONY: docker
docker: docker:
docker build -t ws:latest . docker build -t ws_connect:latest .
run: run: docker
docker run --cap-add sys_ptrace -it ws:latest docker run --cap-add sys_ptrace -it ws_connect:latest bash
# this is helpful to remove trailing whitespaces
trail:
sh third_party/remote_trailing_whitespaces.sh
build: build:
(cd examples/satori_publisher ; mkdir -p build ; cd build ; cmake .. ; make) (cd examples/satori_publisher ; mkdir -p build ; cd build ; cmake .. ; make)
@ -33,14 +24,11 @@ test_server:
(cd test && npm i ws && node broadcast-server.js) (cd test && npm i ws && node broadcast-server.js)
# env TEST=Websocket_server make test # env TEST=Websocket_server make test
# env TEST=Websocket_chat make test # env TEST=websocket_server make test
# env TEST=heartbeat make test # env TEST=heartbeat make test
test: test:
python test/run.py python test/run.py
ws_test: all
(cd ws ; bash test_ws.sh)
# For the fork that is configured with appveyor # For the fork that is configured with appveyor
rebase_upstream: rebase_upstream:
git fetch upstream git fetch upstream
@ -48,9 +36,5 @@ rebase_upstream:
git reset --hard upstream/master git reset --hard upstream/master
git push origin master --force git push origin master --force
install_cmake_for_linux:
mkdir -p /tmp/cmake
(cd /tmp/cmake ; curl -L -O https://github.com/Kitware/CMake/releases/download/v3.14.0-rc4/cmake-3.14.0-rc4-Linux-x86_64.tar.gz ; tar zxf cmake-3.14.0-rc4-Linux-x86_64.tar.gz)
.PHONY: test .PHONY: test
.PHONY: build .PHONY: build

View File

@ -18,14 +18,13 @@ add_subdirectory(${PROJECT_SOURCE_DIR}/.. ixwebsocket)
include_directories( include_directories(
${PROJECT_SOURCE_DIR}/Catch2/single_include ${PROJECT_SOURCE_DIR}/Catch2/single_include
../third_party/msgpack11
) )
# Shared sources # Shared sources
set (SOURCES set (SOURCES
test_runner.cpp test_runner.cpp
IXTest.cpp IXTest.cpp
../third_party/msgpack11/msgpack11.cpp msgpack11.cpp
IXDNSLookupTest.cpp IXDNSLookupTest.cpp
IXSocketTest.cpp IXSocketTest.cpp

View File

@ -5,13 +5,19 @@
*/ */
#include <iostream> #include <iostream>
#include <ixwebsocket/IXSocketFactory.h>
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXCancellationRequest.h> #include <ixwebsocket/IXCancellationRequest.h>
#if defined(__APPLE__) or defined(__linux__)
# ifdef __APPLE__
# include <ixwebsocket/IXSocketAppleSSL.h>
# else
# include <ixwebsocket/IXSocketOpenSSL.h>
# endif
#endif
#include "IXTest.h" #include "IXTest.h"
#include "catch.hpp" #include "catch.hpp"
#include <string.h>
using namespace ix; using namespace ix;
@ -33,15 +39,16 @@ namespace ix
Logger() << "errMsg: " << errMsg; Logger() << "errMsg: " << errMsg;
REQUIRE(success); REQUIRE(success);
Logger() << "Sending request: " << request std::cout << "Sending request: " << request
<< "to " << host << ":" << port; << "to " << host << ":" << port
<< std::endl;
REQUIRE(socket->writeBytes(request, isCancellationRequested)); REQUIRE(socket->writeBytes(request, isCancellationRequested));
auto lineResult = socket->readLine(isCancellationRequested); auto lineResult = socket->readLine(isCancellationRequested);
auto lineValid = lineResult.first; auto lineValid = lineResult.first;
auto line = lineResult.second; auto line = lineResult.second;
Logger() << "read error: " << strerror(Socket::getErrno()); std::cout << "read error: " << strerror(Socket::getErrno()) << std::endl;
REQUIRE(lineValid); REQUIRE(lineValid);
@ -55,18 +62,10 @@ TEST_CASE("socket", "[socket]")
{ {
SECTION("Connect to google HTTP server. Send GET request without header. Should return 200") SECTION("Connect to google HTTP server. Send GET request without header. Should return 200")
{ {
std::string errMsg; std::shared_ptr<Socket> socket(new Socket);
bool tls = false;
std::shared_ptr<Socket> socket = createSocket(tls, errMsg);
std::string host("www.google.com"); std::string host("www.google.com");
int port = 80; int port = 80;
std::string request("GET / HTTP/1.1\r\n\r\n");
std::stringstream ss;
ss << "GET / HTTP/1.1\r\n";
ss << "Host: " << host << "\r\n";
ss << "\r\n";
std::string request(ss.str());
int expectedStatus = 200; int expectedStatus = 200;
int timeoutSecs = 3; int timeoutSecs = 3;
@ -76,9 +75,11 @@ TEST_CASE("socket", "[socket]")
#if defined(__APPLE__) or defined(__linux__) #if defined(__APPLE__) or defined(__linux__)
SECTION("Connect to google HTTPS server. Send GET request without header. Should return 200") SECTION("Connect to google HTTPS server. Send GET request without header. Should return 200")
{ {
std::string errMsg; # ifdef __APPLE__
bool tls = true; std::shared_ptr<Socket> socket = std::make_shared<SocketAppleSSL>();
std::shared_ptr<Socket> socket = createSocket(tls, errMsg); # else
std::shared_ptr<Socket> socket = std::make_shared<SocketOpenSSL>();
# endif
std::string host("www.google.com"); std::string host("www.google.com");
int port = 443; int port = 443;
std::string request("GET / HTTP/1.1\r\n\r\n"); std::string request("GET / HTTP/1.1\r\n\r\n");

View File

@ -69,15 +69,10 @@ namespace ix
Logger() << msg; Logger() << msg;
} }
int getAnyFreePortSimple() int getFreePort()
{
static int defaultPort = 8090;
return defaultPort++;
}
int getAnyFreePort()
{ {
int defaultPort = 8090; int defaultPort = 8090;
int sockfd; int sockfd;
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{ {
@ -122,30 +117,4 @@ namespace ix
return port; return port;
} }
int getFreePort()
{
while (true)
{
#if defined(__has_feature)
# if __has_feature(address_sanitizer)
int port = getAnyFreePortSimple();
# else
int port = getAnyFreePort();
# endif
#else
int port = getAnyFreePort();
#endif
//
// Only port above 1024 can be used by non root users, but for some
// reason I got port 7 returned with macOS when binding on port 0...
//
if (port > 1024)
{
return port;
}
}
return -1;
}
} }

Some files were not shown because too many files have changed in this diff Show More