Compare commits

...

59 Commits

Author SHA1 Message Date
c86b6074f2 add an install target 2019-03-18 15:11:08 -07:00
d5d1a2c5f4 no default parameters for isReadyToWrite and isReadyToRead 2019-03-18 14:31:21 -07:00
2a90e3f478 when trying to flush the send buffer, use select to wait until it is possible instead of using sleep to retry at a given frequency 2019-03-18 14:25:27 -07:00
1d49ba41ea Fix typo (#19) 2019-03-17 16:08:28 -07:00
e1de1f6682 remove unused gitmodule file 2019-03-17 10:38:48 -07:00
47ed5e4d4d remove unused folder 2019-03-17 10:38:19 -07:00
d77f6f5659 linux hangs when closing 2019-03-16 11:38:23 -07:00
05f0045d5d edit README 2019-03-16 11:32:46 -07:00
c4afb84f6e use pipe to abort select on Linux as well as macOS 2019-03-15 17:46:40 -07:00
b0b2f9b6d2 missing assert include on Linux 2019-03-15 11:43:27 -07:00
ee37feb489 cleanup 2019-03-15 11:41:57 -07:00
6b8337596f unittest fix 2019-03-14 18:58:16 -07:00
250665b92e linux compile fix 2019-03-14 18:55:33 -07:00
86b83c889e linux fixes 2019-03-14 18:54:47 -07:00
c9c657c07b build fix 2019-03-14 18:53:21 -07:00
4f2babaf54 select interrupt cleanup 2019-03-14 18:37:38 -07:00
1b03bf4555 linux build fix 2019-03-14 15:17:17 -07:00
977b995af9 replace uint8_t with uint64_t for the send/close requests types / use named variable to index into the _fildes array 2019-03-14 15:03:57 -07:00
310ab990bd set a default close reason string 2019-03-14 14:52:51 -07:00
d6b49b54d4 do not busy loop while sending 2019-03-14 14:48:08 -07:00
f00cf39462 remove docker folder 2019-03-14 14:48:02 -07:00
18550cf1cb send optimization + ws file transfer test 2019-03-14 14:47:53 -07:00
168918f807 Update README.md
Stop lying about Windows support ...
2019-03-13 23:10:40 -07:00
2750df8aa7 send can fail silently when sending would block (EWOULDBLOCK return for send) (#18)
* try to use a pipe for communication

* flush send buffer on the background thread

* cleanup

* linux fix / linux still use event fd for now

* cleanup
2019-03-13 23:09:45 -07:00
d6597d9f52 websocket send: make sure all data in the kernel buffer is sent 2019-03-11 22:16:55 -07:00
892ea375e3 add new message type when receiving message fragments 2019-03-11 11:12:43 -07:00
03abe77b5f ws broacast_server / can set serving hostname 2019-03-10 16:36:44 -07:00
e46eb8aa49 debian 9 unittest build fix 2019-03-10 16:07:48 -07:00
2c4862e0f1 asan test suite fix 2019-03-09 10:45:40 -08:00
fd69efa45c unittest + warning fix 2019-03-09 10:37:14 -08:00
e8aa15917f add ability to run with asan on macOS 2019-03-05 17:07:28 -08:00
b3d77f8902 fix compiler warnings in ws command line tool 2019-03-04 13:56:30 -08:00
9c3b0b08ec Socket code refactoring, plus stop polling with a 1s timeout in readBytes while we only want to poll with a 1ms timeout 2019-03-04 13:40:15 -08:00
fe7d94194c readBytes does not read bytes one by one but in chunks 2019-03-02 21:11:16 -08:00
d6c26d6aa8 create a blocking + cancellable Socket::readBytes method 2019-03-02 15:16:46 -08:00
8a74ddcd13 create a blocking + cancellable Socket::readBytes method 2019-03-02 11:01:51 -08:00
18e7189a07 more ws doc 2019-02-28 22:07:45 -08:00
785dd42c84 more ws doc 2019-02-28 22:03:48 -08:00
0cff5065d9 Feature/http (#16)
* add skeleton and broken http client code.

GET returns "Resource temporarily unavailable" errors...

* linux compile fix

* can GET some pages

* Update formatting in README.md

* unittest for sending large messages

* document bug

* Feature/send large message (#14)

* introduce send fragment

* can pass a fin frame

* can send messages which are a perfect multiple of the chunk size

* set fin only for last fragment

* cleanup

* last fragment should be of type CONTINUATION

* Add simple send and receive programs

* speedups receiving + better way to wait for thing

* receive speedup by using linked list of chunks instead of large array

* document bug

* use chunks to receive data

* trailing spaces

* Update README.md

Add note about message fragmentation.

* Feature/ws cli (#15)

* New command line tool for transfering files / still very beta.

* add readme

* use cli11 for argument parsing

* json -> msgpack

* stop using base64 and use binary which can be stored in message pack

* add target for building with homebrew

* all CMakeLists are referenced by the top level one

* add ws_chat and ws_connect sub commands to ws

* cleanup

* add echo and broadcast server as ws sub-commands

* add gitignore

* comments

* ping pong added to ws

* mv cobra_publisher under ws folder

* Update README.md

* linux build fix

* linux build fix

* move http_client to a ws sub-command

* simple HTTP post support (urlencode parameters)

* can specify extra headers

* chunk encoding / simple redirect support / -I option

* follow redirects is optional

* make README vim markdown plugin friendly

* cleanup argument parsing + add socket creation factory

* add missing file

* http gzip compression

* cleanup

* doc

* Feature/send large message (#14)

* introduce send fragment

* can pass a fin frame

* can send messages which are a perfect multiple of the chunk size

* set fin only for last fragment

* cleanup

* last fragment should be of type CONTINUATION

* Add simple send and receive programs

* speedups receiving + better way to wait for thing

* receive speedup by using linked list of chunks instead of large array

* document bug

* use chunks to receive data

* trailing spaces
2019-02-28 21:54:03 -08:00
e881b82511 Update README.md 2019-02-22 21:53:29 -08:00
d5551e5d68 mv cobra_publisher under ws folder 2019-02-22 21:51:03 -08:00
e8583000b8 ping pong added to ws 2019-02-22 21:47:57 -08:00
d642ef1a89 comments 2019-02-22 21:27:49 -08:00
2df118022d add gitignore 2019-02-22 21:26:25 -08:00
95457c8f4c add echo and broadcast server as ws sub-commands 2019-02-22 21:25:56 -08:00
0a45b7787f cleanup 2019-02-22 20:51:22 -08:00
b8c397e180 add ws_chat and ws_connect sub commands to ws 2019-02-22 20:49:26 -08:00
90105fa2b3 all CMakeLists are referenced by the top level one 2019-02-21 22:21:29 -08:00
24859fef8a add target for building with homebrew 2019-02-21 22:05:30 -08:00
73d7280723 Feature/ws cli (#15)
* New command line tool for transfering files / still very beta.

* add readme

* use cli11 for argument parsing

* json -> msgpack

* stop using base64 and use binary which can be stored in message pack
2019-02-21 21:24:53 -08:00
262de49c3c Update README.md
Add note about message fragmentation.
2019-02-21 14:08:27 -08:00
3a77e96a05 Feature/send large message (#14)
* introduce send fragment

* can pass a fin frame

* can send messages which are a perfect multiple of the chunk size

* set fin only for last fragment

* cleanup

* last fragment should be of type CONTINUATION

* Add simple send and receive programs

* speedups receiving + better way to wait for thing

* receive speedup by using linked list of chunks instead of large array

* document bug

* use chunks to receive data

* trailing spaces
2019-02-20 18:59:07 -08:00
505dd6d50f document bug 2019-02-16 10:33:37 -08:00
3f8027b65c unittest for sending large messages 2019-02-16 10:32:02 -08:00
0f2c765f45 Update formatting in README.md 2019-02-05 23:04:45 -08:00
49077f8f44 more conf in CI 2019-01-29 17:50:19 -08:00
6a23b8530f get free port that can be used by non root users (> 1024) 2019-01-28 15:24:19 -08:00
ae841af91a use dynamically generated port number to configure servers in unittest 2019-01-28 15:24:19 -08:00
44f38849b2 Merge pull request #13 from machinezone/user/bsergeant/poll
User/bsergeant/poll
2019-01-27 10:47:38 -08:00
143 changed files with 16757 additions and 1373 deletions

1
.dockerignore Normal file
View File

@ -0,0 +1 @@
build

View File

@ -1,2 +1 @@
venv
build

0
.gitmodules vendored
View File

View File

@ -2,8 +2,16 @@ language: cpp
dist: xenial
compiler:
- gcc
- clang
# - gcc
os:
- linux
- osx
# os: osx
script: make test
matrix:
exclude:
# GCC fails on recent Travis OSX images.
- compiler: gcc
os: osx
script: python test/run.py

View File

@ -10,15 +10,16 @@ set (CMAKE_CXX_STANDARD 14)
set (CXX_STANDARD_REQUIRED ON)
set (CMAKE_CXX_EXTENSIONS OFF)
# -Wshorten-64-to-32 does not work with clang
if (NOT WIN32)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic")
endif()
set( IXWEBSOCKET_SOURCES
ixwebsocket/IXEventFd.cpp
ixwebsocket/IXSocket.cpp
ixwebsocket/IXSocketServer.cpp
ixwebsocket/IXSocketConnect.cpp
ixwebsocket/IXSocketFactory.cpp
ixwebsocket/IXDNSLookup.cpp
ixwebsocket/IXCancellationRequest.cpp
ixwebsocket/IXWebSocket.cpp
@ -28,16 +29,23 @@ set( IXWEBSOCKET_SOURCES
ixwebsocket/IXWebSocketPerMessageDeflate.cpp
ixwebsocket/IXWebSocketPerMessageDeflateCodec.cpp
ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp
ixwebsocket/IXWebSocketHttpHeaders.cpp
ixwebsocket/IXHttpClient.cpp
ixwebsocket/IXUrlParser.cpp
ixwebsocket/IXSelectInterrupt.cpp
ixwebsocket/IXSelectInterruptPipe.cpp
ixwebsocket/IXSelectInterruptFactory.cpp
)
set( IXWEBSOCKET_HEADERS
ixwebsocket/IXEventFd.h
ixwebsocket/IXSocket.h
ixwebsocket/IXSocketServer.h
ixwebsocket/IXSocketConnect.h
ixwebsocket/IXSocketFactory.h
ixwebsocket/IXSetThreadName.h
ixwebsocket/IXDNSLookup.h
ixwebsocket/IXCancellationRequest.h
ixwebsocket/IXProgressCallback.h
ixwebsocket/IXWebSocket.h
ixwebsocket/IXWebSocketServer.h
ixwebsocket/IXWebSocketTransport.h
@ -49,6 +57,11 @@ set( IXWEBSOCKET_HEADERS
ixwebsocket/IXWebSocketPerMessageDeflateOptions.h
ixwebsocket/IXWebSocketHttpHeaders.h
ixwebsocket/libwshandshake.hpp
ixwebsocket/IXHttpClient.h
ixwebsocket/IXUrlParser.h
ixwebsocket/IXSelectInterrupt.h
ixwebsocket/IXSelectInterruptPipe.h
ixwebsocket/IXSelectInterruptFactory.h
)
# Platform specific code
@ -58,6 +71,8 @@ elseif (WIN32)
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/windows/IXSetThreadName_windows.cpp)
else()
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/linux/IXSetThreadName_linux.cpp)
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/IXSelectInterruptEventFd.cpp)
list( APPEND IXWEBSOCKET_HEADERS ixwebsocket/IXSelectInterruptEventFd.h)
endif()
if (USE_TLS)
@ -112,3 +127,5 @@ set( IXWEBSOCKET_INCLUDE_DIRS
.
../../shared/OpenSSL/include)
target_include_directories( ixwebsocket PUBLIC ${IXWEBSOCKET_INCLUDE_DIRS} )
add_subdirectory(ws)

View File

@ -1 +0,0 @@
docker/Dockerfile.debian

31
Dockerfile Normal file
View File

@ -0,0 +1,31 @@
FROM debian:stretch
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install gdb
RUN apt-get -y install screen
RUN apt-get -y install procps
RUN apt-get -y install lsof
RUN apt-get -y install libz-dev
RUN apt-get -y install vim
RUN apt-get -y install make
RUN apt-get -y install cmake
RUN apt-get -y install curl
RUN apt-get -y install python
RUN apt-get -y install netcat
# debian strech cmake is too old for building with Docker
COPY makefile .
RUN ["make", "install_cmake_for_linux"]
COPY . .
ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-rc4-Linux-x86_64/bin
ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
# RUN ["make"]
EXPOSE 8765
CMD ["/ws/ws", "transfer", "--port", "8765", "--host", "0.0.0.0"]

102
README.md
View File

@ -5,17 +5,16 @@
## Introduction
[*WebSocket*](https://en.wikipedia.org/wiki/WebSocket) is a computer communications protocol, providing full-duplex
communication channels over a single TCP connection. *IXWebSocket* is a C++ library for client and server Websocket communication. The code is derived from [easywsclient](https://github.com/dhbaird/easywsclient) and from the [Satori C SDK](https://github.com/satori-com/satori-rtm-sdk-c). It has been tested on the following platforms.
communication channels over a single TCP connection. *IXWebSocket* is a C++ library for client and server Websocket communication, and for client HTTP communication. The code is derived from [easywsclient](https://github.com/dhbaird/easywsclient) and from the [Satori C SDK](https://github.com/satori-com/satori-rtm-sdk-c). It has been tested on the following platforms.
* macOS
* iOS
* Linux
* Android
* Windows (no TLS support yet)
## Examples
The examples folder countains a simple chat program, using a node.js broadcast server.
The [*ws*](https://github.com/machinezone/IXWebSocket/tree/master/ws) folder countains many interactive programs for chat, [file transfers](https://github.com/machinezone/IXWebSocket/blob/master/ws/ws_send.cpp), [curl like](https://github.com/machinezone/IXWebSocket/blob/master/ws/ws_http_client.cpp) http clients, demonstrating client and server usage.
Here is what the client API looks like.
@ -25,7 +24,7 @@ ix::WebSocket webSocket;
std::string url("ws://localhost:8080/");
webSocket.setUrl(url);
// Optional heart beat, sent every 45 seconds when there isn't any traffic
// Optional heart beat, sent every 45 seconds when there is not any traffic
// to make sure that load balancers do not kill an idle connection.
webSocket.setHeartBeatPeriod(45);
@ -77,7 +76,10 @@ server.setOnConnectionCallback(
if (messageType == ix::WebSocket_MessageType_Open)
{
std::cerr << "New connection" << std::endl;
// The uri the client did connect to.
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)
{
@ -110,12 +112,81 @@ server.wait();
```
Here is what the HTTP client API looks like. Note that HTTP client support is very recent and subject to changes.
```
//
// Preparation
//
HttpClient httpClient;
HttpRequestArgs args;
// Custom headers can be set
WebSocketHttpHeaders headers;
headers["Foo"] = "bar";
args.extraHeaders = headers;
// Timeout options
args.connectTimeout = connectTimeout;
args.transferTimeout = transferTimeout;
// Redirect options
args.followRedirects = followRedirects;
args.maxRedirects = maxRedirects;
// Misc
args.compress = compress; // Enable gzip compression
args.verbose = verbose;
args.logger = [](const std::string& msg)
{
std::cout << msg;
};
//
// Request
//
HttpResponse out;
std::string url = "https://www.google.com";
// HEAD request
out = httpClient.head(url, args);
// GET request
out = httpClient.get(url, args);
// POST request with parameters
HttpParameters httpParameters;
httpParameters["foo"] = "bar";
out = httpClient.post(url, httpParameters, args);
// POST request with a body
out = httpClient.post(url, std::string("foo=bar"), args);
//
// Result
//
auto statusCode = std::get<0>(out);
auto errorCode = std::get<1>(out);
auto responseHeaders = std::get<2>(out);
auto payload = std::get<3>(out);
auto errorMsg = std::get<4>(out);
auto uploadSize = std::get<5>(out);
auto downloadSize = std::get<6>(out);
```
## Build
CMakefiles for the library and the examples are available. This library has few dependencies, so it is possible to just add the source files into your project.
There is a Dockerfile for running some code on Linux, and a unittest which can be executed by typing `make test`.
You can build and install the ws command line tool with Homebrew.
```
brew create --cmake https://github.com/machinezone/IXWebSocket/archive/v1.1.0.tar.gz
brew install IXWebSocket
```
## Implementation details
### Per Message Deflate compression.
@ -134,25 +205,19 @@ No manual polling to fetch data is required. Data is sent and received instantly
If the remote end (server) breaks the connection, the code will try to perpetually reconnect, by using an exponential backoff strategy, capped at one retry every 10 seconds.
### Large messages
Large frames are broken up into smaller chunks or messages to avoid filling up the os tcp buffers, which is permitted thanks to WebSocket [fragmentation](https://tools.ietf.org/html/rfc6455#section-5.4). Messages up to 500M were sent and received succesfully.
## Limitations
* There is no text support for sending data, only the binary protocol is supported. Sending json or text over the binary protocol works well.
* Automatic reconnection works at the TCP socket level, and will detect remote end disconnects. However, if the device/computer network become unreachable (by turning off wifi), it is quite hard to reliably and timely detect it at the socket level using `recv` and `send` error codes. [Here](https://stackoverflow.com/questions/14782143/linux-socket-how-to-detect-disconnected-network-in-a-client-program) is a good discussion on the subject. This behavior is consistent with other runtimes such as node.js. One way to detect a disconnected device with low level C code is to do a name resolution with DNS but this can be expensive. Mobile devices have good and reliable API to do that.
* The server code is using select to detect incoming data, and creates one OS thread per connection. This isn't as scalable as strategies using epoll or kqueue.
## Examples
1. Bring up a terminal and jump to the examples folder.
2. Compile the example C++ code. `sh build.sh`
3. Install node.js from [here](https://nodejs.org/en/download/).
4. Type `npm install` to install the node.js dependencies. Then `node broadcast-server.js` to run the server.
5. Bring up a second terminal. `./cmd_websocket_chat bob`
6. Bring up a third terminal. `./cmd_websocket_chat bill`
7. Start typing things in any of those terminals. Hopefully you should see your message being received on the other end.
* The server code is using select to detect incoming data, and creates one OS thread per connection. This is not as scalable as strategies using epoll or kqueue.
## C++ code organization
Here's a simplistic diagram which explains how the code is structured in term of class/modules.
Here is a simplistic diagram which explains how the code is structured in term of class/modules.
```
+-----------------------+ --- Public
@ -204,7 +269,7 @@ If the connection was closed and sending failed, the return value will be set to
1. WebSocket_ReadyState_Connecting - The connection is not yet open.
2. WebSocket_ReadyState_Open - The connection is open and ready to communicate.
3. WebSocket_ReadyState_Closing - The connection is in the process of closing.
4. WebSocket_MessageType_Close - The connection is closed or couldn't be opened.
4. WebSocket_MessageType_Close - The connection is closed or could not be opened.
### Open and Close notifications
@ -309,11 +374,12 @@ A ping message can be sent to the server, with an optional data string.
```
websocket.ping("ping data, optional (empty string is ok): limited to 125 bytes long");
```
### Heartbeat.
You can configure an optional heart beat / keep-alive, sent every 45 seconds
when there isn't any traffic to make sure that load balancers do not kill an
when there is no any traffic to make sure that load balancers do not kill an
idle connection.
```

View File

@ -1,16 +0,0 @@
FROM debian:stretch
# RUN yum install -y gcc-c++ make cmake openssl-devel gdb
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install gdb
RUN apt-get -y install screen
RUN apt-get -y install procps
RUN apt-get -y install lsof
COPY . .
WORKDIR examples/ws_connect
RUN ["sh", "build_linux.sh"]

View File

@ -1,11 +0,0 @@
FROM alpine:3.8
RUN apk add --no-cache g++ musl-dev make cmake openssl-dev
COPY . .
WORKDIR examples/ws_connect
RUN ["sh", "build_linux.sh"]
EXPOSE 8765
CMD ["ws_connect"]

View File

@ -1,11 +0,0 @@
FROM alpine:3.8
RUN apk add --no-cache g++ musl-dev make cmake openssl-dev
COPY . .
WORKDIR examples/ws_connect
RUN ["sh", "build_linux.sh"]
EXPOSE 8765
CMD ["ws_connect"]

View File

@ -1,19 +0,0 @@
FROM debian:stretch
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install gdb
RUN apt-get -y install screen
RUN apt-get -y install procps
RUN apt-get -y install lsof
RUN apt-get -y install libz-dev
RUN apt-get -y install vim
RUN apt-get -y install make
RUN apt-get -y install cmake
COPY . .
WORKDIR test
RUN ["sh", "build_linux.sh"]

View File

@ -1,8 +0,0 @@
FROM gcc:8
# RUN yum install -y gcc-c++ make cmake openssl-devel gdb
COPY . .
WORKDIR examples/ws_connect
RUN ["sh", "build_linux.sh"]

View File

@ -1,9 +0,0 @@
CMakeCache.txt
package-lock.json
CMakeFiles
ixwebsocket_unittest
cmake_install.cmake
node_modules
ixwebsocket
Makefile
build

View File

@ -1,30 +0,0 @@
#
# Author: Benjamin Sergeant
# Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
#
cmake_minimum_required (VERSION 3.4.1)
project (broadcast_server)
# There's -Weverything too for clang
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic -Wshorten-64-to-32")
set (OPENSSL_PREFIX /usr/local/opt/openssl) # Homebrew openssl
set (CMAKE_CXX_STANDARD 14)
option(USE_TLS "Add TLS support" ON)
add_subdirectory(${PROJECT_SOURCE_DIR}/../.. ixwebsocket)
include_directories(broadcast_server .)
add_executable(broadcast_server
broadcast_server.cpp)
if (APPLE AND USE_TLS)
target_link_libraries(broadcast_server "-framework foundation" "-framework security")
endif()
target_link_libraries(broadcast_server ixwebsocket)
install(TARGETS broadcast_server DESTINATION bin)

View File

@ -1,74 +0,0 @@
/*
* broadcast_server.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <sstream>
#include <ixwebsocket/IXWebSocketServer.h>
int main(int argc, char** argv)
{
int port = 8080;
if (argc == 2)
{
std::stringstream ss;
ss << argv[1];
ss >> port;
}
ix::WebSocketServer server(port);
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket)
{
webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
if (messageType == ix::WebSocket_MessageType_Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
std::cerr << "Closed connection" << std::endl;
}
else if (messageType == ix::WebSocket_MessageType_Message)
{
for (auto&& client : server.getClients())
{
if (client != webSocket)
{
client->send(str);
}
}
}
}
);
}
);
auto res = server.listen();
if (!res.first)
{
std::cerr << res.second << std::endl;
return 1;
}
server.start();
server.wait();
return 0;
}

View File

@ -1,3 +0,0 @@
build
venv
node_modules

View File

@ -1,23 +0,0 @@
#
# cmd_websocket_chat.cpp
# Author: Benjamin Sergeant
# Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
#
cmake_minimum_required (VERSION 3.4.1)
project (cmd_websocket_chat)
set (CMAKE_CXX_STANDARD 14)
option(USE_TLS "Add TLS support" ON)
add_subdirectory(${PROJECT_SOURCE_DIR}/../.. ixwebsocket)
add_executable(cmd_websocket_chat cmd_websocket_chat.cpp)
if (APPLE AND USE_TLS)
target_link_libraries(cmd_websocket_chat "-framework foundation" "-framework security")
endif()
target_link_libraries(cmd_websocket_chat ixwebsocket)
install(TARGETS cmd_websocket_chat DESTINATION bin)

View File

@ -1,39 +0,0 @@
# Building
1. cmake -G .
2. make
## Disable TLS
chat$ cmake -DUSE_TLS=OFF .
-- Configuring done
-- Generating done
-- Build files have been written to: /Users/bsergeant/src/foss/ixwebsocket/examples/chat
chat$ make
Scanning dependencies of target ixwebsocket
[ 16%] Building CXX object ixwebsocket/CMakeFiles/ixwebsocket.dir/ixwebsocket/IXSocket.cpp.o
[ 33%] Building CXX object ixwebsocket/CMakeFiles/ixwebsocket.dir/ixwebsocket/IXWebSocket.cpp.o
[ 50%] Building CXX object ixwebsocket/CMakeFiles/ixwebsocket.dir/ixwebsocket/IXWebSocketTransport.cpp.o
[ 66%] Linking CXX static library libixwebsocket.a
[ 66%] Built target ixwebsocket
[ 83%] Linking CXX executable cmd_websocket_chat
[100%] Built target cmd_websocket_chat
## Enable TLS (default)
```
chat$ cmake -DUSE_TLS=ON .
-- Configuring done
-- Generating done
-- Build files have been written to: /Users/bsergeant/src/foss/ixwebsocket/examples/chat
(venv) chat$ make
Scanning dependencies of target ixwebsocket
[ 14%] Building CXX object ixwebsocket/CMakeFiles/ixwebsocket.dir/ixwebsocket/IXSocket.cpp.o
[ 28%] Building CXX object ixwebsocket/CMakeFiles/ixwebsocket.dir/ixwebsocket/IXWebSocket.cpp.o
[ 42%] Building CXX object ixwebsocket/CMakeFiles/ixwebsocket.dir/ixwebsocket/IXWebSocketTransport.cpp.o
[ 57%] Building CXX object ixwebsocket/CMakeFiles/ixwebsocket.dir/ixwebsocket/IXSocketAppleSSL.cpp.o
[ 71%] Linking CXX static library libixwebsocket.a
[ 71%] Built target ixwebsocket
[ 85%] Linking CXX executable cmd_websocket_chat
[100%] Built target cmd_websocket_chat
```

View File

@ -1,15 +0,0 @@
#!/bin/sh
#
# Author: Benjamin Sergeant
# Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
#
# 'manual' way of building. You can also use cmake.
g++ --std=c++11 \
../../ixwebsocket/IXSocket.cpp \
../../ixwebsocket/IXWebSocketTransport.cpp \
../../ixwebsocket/IXWebSocket.cpp \
-I ../.. \
cmd_websocket_chat.cpp \
-o cmd_websocket_chat

View File

@ -1,17 +0,0 @@
#!/bin/sh
#
# Author: Benjamin Sergeant
# Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
#
# 'manual' way of building. You can also use cmake.
clang++ --std=c++11 --stdlib=libc++ \
../../ixwebsocket/IXSocket.cpp \
../../ixwebsocket/IXWebSocketTransport.cpp \
../../ixwebsocket/IXSocketAppleSSL.cpp \
../../ixwebsocket/IXWebSocket.cpp \
cmd_websocket_chat.cpp \
-o cmd_websocket_chat \
-framework Security \
-framework Foundation

View File

@ -1,31 +0,0 @@
{
"requires": true,
"lockfileVersion": 1,
"dependencies": {
"async-limiter": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.0.tgz",
"integrity": "sha512-jp/uFnooOiO+L211eZOoSyzpOITMXx1rBITauYykG3BRYPu8h0UcxsPNB04RR5vo4Tyz3+ay17tR6JVf9qzYWg=="
},
"safe-buffer": {
"version": "5.1.2",
"resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz",
"integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g=="
},
"ultron": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/ultron/-/ultron-1.1.1.tgz",
"integrity": "sha512-UIEXBNeYmKptWH6z8ZnqTeS8fV74zG0/eRU9VGkpzz+LIJNs8W/zM/L+7ctCkRrgbNnnR0xxw4bKOr0cW0N0Og=="
},
"ws": {
"version": "3.3.3",
"resolved": "https://registry.npmjs.org/ws/-/ws-3.3.3.tgz",
"integrity": "sha512-nnWLa/NwZSt4KQJu51MYlCcSQ5g7INpOrOMt4XV8j4dqTXdmlUmSHQ8/oLC069ckre0fRsgfvsKwbTdtKLCDkA==",
"requires": {
"async-limiter": "1.0.0",
"safe-buffer": "5.1.2",
"ultron": "1.1.1"
}
}
}
}

View File

@ -1,6 +0,0 @@
{
"dependencies": {
"msgpack-js": "^0.3.0",
"ws": "^3.3.3"
}
}

View File

@ -1,30 +0,0 @@
#
# Author: Benjamin Sergeant
# Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
#
cmake_minimum_required (VERSION 3.4.1)
project (echo_server)
# There's -Weverything too for clang
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic -Wshorten-64-to-32")
set (OPENSSL_PREFIX /usr/local/opt/openssl) # Homebrew openssl
set (CMAKE_CXX_STANDARD 14)
option(USE_TLS "Add TLS support" ON)
add_subdirectory(${PROJECT_SOURCE_DIR}/../.. ixwebsocket)
include_directories(echo_server .)
add_executable(echo_server
echo_server.cpp)
if (APPLE AND USE_TLS)
target_link_libraries(echo_server "-framework foundation" "-framework security")
endif()
target_link_libraries(echo_server ixwebsocket)
install(TARGETS echo_server DESTINATION bin)

View File

@ -1,68 +0,0 @@
/*
* echo_server.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <sstream>
#include <ixwebsocket/IXWebSocketServer.h>
int main(int argc, char** argv)
{
int port = 8080;
if (argc == 2)
{
std::stringstream ss;
ss << argv[1];
ss >> port;
}
ix::WebSocketServer server(port);
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket)
{
webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
if (messageType == ix::WebSocket_MessageType_Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
std::cerr << "Closed connection" << std::endl;
}
else if (messageType == ix::WebSocket_MessageType_Message)
{
webSocket->send(str);
}
}
);
}
);
auto res = server.listen();
if (!res.first)
{
std::cerr << res.second << std::endl;
return 1;
}
server.start();
server.wait();
return 0;
}

View File

@ -1,27 +0,0 @@
#
# Author: Benjamin Sergeant
# Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
#
cmake_minimum_required (VERSION 3.4.1)
project (ping_pong)
set (CMAKE_CXX_STANDARD 14)
option(USE_TLS "Add TLS support" ON)
add_subdirectory(${PROJECT_SOURCE_DIR}/../.. ixwebsocket)
add_executable(ping_pong ping_pong.cpp)
if (APPLE AND USE_TLS)
target_link_libraries(ping_pong "-framework foundation" "-framework security")
endif()
if (WIN32)
target_link_libraries(ping_pong wsock32 ws2_32)
add_definitions(-D_CRT_SECURE_NO_WARNINGS)
endif()
target_link_libraries(ping_pong ixwebsocket)
install(TARGETS ping_pong DESTINATION bin)

View File

@ -1,15 +0,0 @@
#!/bin/sh
#
# Author: Benjamin Sergeant
# Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
#
# 'manual' way of building. You can also use cmake.
g++ --std=c++11 \
../../ixwebsocket/IXSocket.cpp \
../../ixwebsocket/IXWebSocketTransport.cpp \
../../ixwebsocket/IXWebSocket.cpp \
-I ../.. \
cmd_websocket_chat.cpp \
-o cmd_websocket_chat

View File

@ -1,17 +0,0 @@
#!/usr/bin/env python
import asyncio
import websockets
async def hello(uri):
async with websockets.connect(uri) as websocket:
await websocket.send("Hello world!")
response = await websocket.recv()
print(response)
pong_waiter = await websocket.ping('coucou')
ret = await pong_waiter # only if you want to wait for the pong
print(ret)
asyncio.get_event_loop().run_until_complete(
hello('ws://localhost:5678'))

View File

@ -1,21 +0,0 @@
#!/usr/bin/env python
import os
import asyncio
import websockets
async def echo(websocket, path):
async for message in websocket:
print(message)
await websocket.send(message)
if os.getenv('TEST_CLOSE'):
print('Closing')
# breakpoint()
await websocket.close(1001, 'close message')
# await websocket.close()
break
asyncio.get_event_loop().run_until_complete(
websockets.serve(echo, 'localhost', 5678))
asyncio.get_event_loop().run_forever()

View File

@ -1,9 +0,0 @@
#!/bin/sh
test -d build || {
mkdir -p build
cd build
cmake ..
}
(cd build ; make)
./build/ping_pong ws://localhost:5678

View File

@ -1,3 +0,0 @@
build
venv
node_modules

View File

@ -1,22 +0,0 @@
#
# Author: Benjamin Sergeant
# Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
#
cmake_minimum_required (VERSION 3.4.1)
project (ws_connect)
set (CMAKE_CXX_STANDARD 14)
option(USE_TLS "Add TLS support" ON)
add_subdirectory(${PROJECT_SOURCE_DIR}/../.. ixwebsocket)
add_executable(ws_connect ws_connect.cpp)
if (APPLE AND USE_TLS)
target_link_libraries(ws_connect "-framework foundation" "-framework security")
endif()
target_link_libraries(ws_connect ixwebsocket)
install(TARGETS ws_connect DESTINATION bin)

View File

@ -1,11 +0,0 @@
# Building
1. mkdir build
2. cd build
3. cmake ..
4. make
## Disable TLS
* Enable: `cmake -DUSE_TLS=OFF ..`
* Disable: `cmake -DUSE_TLS=ON ..`

View File

@ -1,25 +0,0 @@
#!/bin/sh
#
# Author: Benjamin Sergeant
# Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
#
# 'manual' way of building. You can also use cmake.
g++ --std=c++11 \
-DIXWEBSOCKET_USE_TLS \
-g \
../../ixwebsocket/IXEventFd.cpp \
../../ixwebsocket/IXSocket.cpp \
../../ixwebsocket/IXSetThreadName.cpp \
../../ixwebsocket/IXWebSocketTransport.cpp \
../../ixwebsocket/IXWebSocket.cpp \
../../ixwebsocket/IXDNSLookup.cpp \
../../ixwebsocket/IXSocketConnect.cpp \
../../ixwebsocket/IXSocketOpenSSL.cpp \
../../ixwebsocket/IXWebSocketPerMessageDeflate.cpp \
../../ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp \
-I ../.. \
ws_connect.cpp \
-o ws_connect \
-lcrypto -lssl -lz -lpthread

View File

@ -8,7 +8,7 @@
#include <chrono>
namespace ix
namespace ix
{
CancellationRequest makeCancellationRequestWithTimeout(int secs,
std::atomic<bool>& requestInitCancellation)
@ -20,7 +20,7 @@ namespace ix
{
// Was an explicit cancellation requested ?
if (requestInitCancellation) return true;
auto now = std::chrono::system_clock::now();
if ((now - start) > timeout) return true;

View File

@ -9,7 +9,7 @@
#include <functional>
#include <atomic>
namespace ix
namespace ix
{
using CancellationRequest = std::function<bool()>;

View File

@ -10,7 +10,7 @@
#include <string.h>
#include <chrono>
namespace ix
namespace ix
{
const int64_t DNSLookup::kDefaultWait = 10; // ms
@ -26,7 +26,7 @@ namespace ix
_done(false),
_id(_nextId++)
{
}
DNSLookup::~DNSLookup()
@ -36,7 +36,7 @@ namespace ix
_activeJobs.erase(_id);
}
struct addrinfo* DNSLookup::getAddrInfo(const std::string& hostname,
struct addrinfo* DNSLookup::getAddrInfo(const std::string& hostname,
int port,
std::string& errMsg)
{
@ -49,7 +49,7 @@ namespace ix
std::string sport = std::to_string(port);
struct addrinfo* res;
int getaddrinfo_result = getaddrinfo(hostname.c_str(), sport.c_str(),
int getaddrinfo_result = getaddrinfo(hostname.c_str(), sport.c_str(),
&hints, &res);
if (getaddrinfo_result)
{
@ -101,7 +101,7 @@ namespace ix
_activeJobs.insert(_id);
}
//
//
// Good resource on thread forced termination
// https://www.bo-yang.net/2017/11/19/cpp-kill-detached-thread
//
@ -141,7 +141,7 @@ namespace ix
void DNSLookup::run(uint64_t id, const std::string& hostname, int port) // thread runner
{
// We don't want to read or write into members variables of an object that could be
// gone, so we use temporary variables (res) or we pass in by copy everything that
// gone, so we use temporary variables (res) or we pass in by copy everything that
// getAddrInfo needs to work.
std::string errMsg;
struct addrinfo* res = getAddrInfo(hostname, port, errMsg);

View File

@ -3,7 +3,7 @@
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*
* Resolve a hostname+port to a struct addrinfo obtained with getaddrinfo
* Resolve a hostname+port to a struct addrinfo obtained with getaddrinfo
* Does this in a background thread so that it can be cancelled, since
* getaddrinfo is a blocking call, and we don't want to block the main thread on Mobile.
*/
@ -20,7 +20,7 @@
struct addrinfo;
namespace ix
namespace ix
{
class DNSLookup {
public:
@ -39,7 +39,7 @@ namespace ix
struct addrinfo* resolveBlocking(std::string& errMsg,
const CancellationRequest& isCancellationRequested);
static struct addrinfo* getAddrInfo(const std::string& hostname,
static struct addrinfo* getAddrInfo(const std::string& hostname,
int port,
std::string& errMsg);

View File

@ -1,82 +0,0 @@
/*
* IXEventFd.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
//
// Linux/Android has a special type of virtual files. select(2) will react
// when reading/writing to those files, unlike closing sockets.
//
// https://linux.die.net/man/2/eventfd
// http://www.sourcexr.com/articles/2013/10/26/lightweight-inter-process-signaling-with-eventfd
//
// eventfd was added in Linux kernel 2.x, and our oldest Android (Kitkat 4.4)
// is on Kernel 3.x
//
// cf Android/Kernel table here
// https://android.stackexchange.com/questions/51651/which-android-runs-which-linux-kernel
//
#include "IXEventFd.h"
#ifdef __linux__
# include <sys/eventfd.h>
#endif
#ifndef _WIN32
#include <unistd.h> // for write
#endif
namespace ix
{
EventFd::EventFd() :
_eventfd(-1)
{
#ifdef __linux__
_eventfd = eventfd(0, 0);
#endif
}
EventFd::~EventFd()
{
#ifdef __linux__
::close(_eventfd);
#endif
}
bool EventFd::notify()
{
#if defined(__linux__)
if (_eventfd == -1) return false;
// select will wake up when a non-zero value is written to our eventfd
uint64_t value = 1;
// we should write 8 bytes for an uint64_t
return write(_eventfd, &value, sizeof(value)) == 8;
#else
return true;
#endif
}
bool EventFd::clear()
{
#if defined(__linux__)
if (_eventfd == -1) return false;
// 0 is a special value ; select will not wake up
uint64_t value = 0;
// we should write 8 bytes for an uint64_t
return write(_eventfd, &value, sizeof(value)) == 8;
#else
return true;
#endif
}
int EventFd::getFd()
{
return _eventfd;
}
}

View File

@ -1,23 +0,0 @@
/*
* IXEventFd.h
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#pragma once
namespace ix
{
class EventFd {
public:
EventFd();
virtual ~EventFd();
bool notify();
bool clear();
int getFd();
private:
int _eventfd;
};
}

View File

@ -0,0 +1,467 @@
/*
* IXHttpClient.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXHttpClient.h"
#include "IXUrlParser.h"
#include "IXWebSocketHttpHeaders.h"
#include "IXSocketFactory.h"
#include <sstream>
#include <iomanip>
#include <vector>
#include <cstring>
#include <zlib.h>
namespace ix
{
const std::string HttpClient::kPost = "POST";
const std::string HttpClient::kGet = "GET";
const std::string HttpClient::kHead = "HEAD";
HttpClient::HttpClient()
{
}
HttpClient::~HttpClient()
{
}
HttpResponse HttpClient::request(
const std::string& url,
const std::string& verb,
const std::string& body,
const HttpRequestArgs& args,
int redirects)
{
uint64_t uploadSize = 0;
uint64_t downloadSize = 0;
int code = 0;
WebSocketHttpHeaders headers;
std::string payload;
std::string protocol, host, path, query;
int port;
bool websocket = false;
if (!UrlParser::parse(url, protocol, host, path, query, port, websocket))
{
std::stringstream ss;
ss << "Cannot parse url: " << url;
return std::make_tuple(code, HttpErrorCode_UrlMalformed,
headers, payload, ss.str(),
uploadSize, downloadSize);
}
bool tls = protocol == "https";
std::string errorMsg;
_socket = createSocket(tls, errorMsg);
if (!_socket)
{
return std::make_tuple(code, HttpErrorCode_CannotCreateSocket,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
// Build request string
std::stringstream ss;
ss << verb << " " << path << " HTTP/1.1\r\n";
ss << "Host: " << host << "\r\n";
ss << "User-Agent: ixwebsocket/1.0.0" << "\r\n";
ss << "Accept: */*" << "\r\n";
if (args.compress)
{
ss << "Accept-Encoding: gzip" << "\r\n";
}
// Append extra headers
for (auto&& it : args.extraHeaders)
{
ss << it.first << ": " << it.second << "\r\n";
}
if (verb == kPost)
{
ss << "Content-Length: " << body.size() << "\r\n";
// Set default Content-Type if unspecified
if (args.extraHeaders.find("Content-Type") == args.extraHeaders.end())
{
ss << "Content-Type: application/x-www-form-urlencoded" << "\r\n";
}
ss << "\r\n";
ss << body;
}
else
{
ss << "\r\n";
}
std::string req(ss.str());
std::string errMsg;
std::atomic<bool> requestInitCancellation(false);
// Make a cancellation object dealing with connection timeout
auto isCancellationRequested =
makeCancellationRequestWithTimeout(args.connectTimeout, requestInitCancellation);
bool success = _socket->connect(host, port, errMsg, isCancellationRequested);
if (!success)
{
std::stringstream ss;
ss << "Cannot connect to url: " << url;
return std::make_tuple(code, HttpErrorCode_CannotConnect,
headers, payload, ss.str(),
uploadSize, downloadSize);
}
// Make a new cancellation object dealing with transfer timeout
isCancellationRequested =
makeCancellationRequestWithTimeout(args.transferTimeout, requestInitCancellation);
if (args.verbose)
{
std::stringstream ss;
ss << "Sending " << verb << " request "
<< "to " << host << ":" << port << std::endl
<< "request size: " << req.size() << " bytes" << std::endl
<< "=============" << std::endl
<< req
<< "=============" << std::endl
<< std::endl;
log(ss.str(), args);
}
if (!_socket->writeBytes(req, isCancellationRequested))
{
std::string errorMsg("Cannot send request");
return std::make_tuple(code, HttpErrorCode_SendError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
uploadSize = req.size();
auto lineResult = _socket->readLine(isCancellationRequested);
auto lineValid = lineResult.first;
auto line = lineResult.second;
if (!lineValid)
{
std::string errorMsg("Cannot retrieve status line");
return std::make_tuple(code, HttpErrorCode_CannotReadStatusLine,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
if (args.verbose)
{
std::stringstream ss;
ss << "Status line " << line;
log(ss.str(), args);
}
if (sscanf(line.c_str(), "HTTP/1.1 %d", &code) != 1)
{
std::string errorMsg("Cannot parse response code from status line");
return std::make_tuple(code, HttpErrorCode_MissingStatus,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
auto result = parseHttpHeaders(_socket, isCancellationRequested);
auto headersValid = result.first;
headers = result.second;
if (!headersValid)
{
std::string errorMsg("Cannot parse http headers");
return std::make_tuple(code, HttpErrorCode_HeaderParsingError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
// Redirect ?
if ((code >= 301 && code <= 308) && args.followRedirects)
{
if (headers.find("Location") == headers.end())
{
std::string errorMsg("Missing location header for redirect");
return std::make_tuple(code, HttpErrorCode_MissingLocation,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
if (redirects >= args.maxRedirects)
{
std::stringstream ss;
ss << "Too many redirects: " << redirects;
return std::make_tuple(code, HttpErrorCode_TooManyRedirects,
headers, payload, ss.str(),
uploadSize, downloadSize);
}
// Recurse
std::string location = headers["Location"];
return request(location, verb, body, args, redirects+1);
}
if (verb == "HEAD")
{
return std::make_tuple(code, HttpErrorCode_Ok,
headers, payload, std::string(),
uploadSize, downloadSize);
}
// Parse response:
if (headers.find("Content-Length") != headers.end())
{
ssize_t contentLength = -1;
ss.str("");
ss << headers["Content-Length"];
ss >> contentLength;
payload.reserve(contentLength);
auto chunkResult = _socket->readBytes(contentLength,
args.onProgressCallback,
isCancellationRequested);
if (!chunkResult.first)
{
errorMsg = "Cannot read chunk";
return std::make_tuple(code, HttpErrorCode_ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
payload += chunkResult.second;
}
else if (headers.find("Transfer-Encoding") != headers.end() &&
headers["Transfer-Encoding"] == "chunked")
{
std::stringstream ss;
while (true)
{
lineResult = _socket->readLine(isCancellationRequested);
line = lineResult.second;
if (!lineResult.first)
{
return std::make_tuple(code, HttpErrorCode_ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
uint64_t chunkSize;
ss.str("");
ss << std::hex << line;
ss >> chunkSize;
if (args.verbose)
{
std::stringstream oss;
oss << "Reading " << chunkSize << " bytes"
<< std::endl;
log(oss.str(), args);
}
payload.reserve(payload.size() + chunkSize);
// Read a chunk
auto chunkResult = _socket->readBytes(chunkSize,
args.onProgressCallback,
isCancellationRequested);
if (!chunkResult.first)
{
errorMsg = "Cannot read chunk";
return std::make_tuple(code, HttpErrorCode_ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
payload += chunkResult.second;
// Read the line that terminates the chunk (\r\n)
lineResult = _socket->readLine(isCancellationRequested);
if (!lineResult.first)
{
return std::make_tuple(code, HttpErrorCode_ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
if (chunkSize == 0) break;
}
}
else if (code == 204)
{
; // 204 is NoContent response code
}
else
{
std::string errorMsg("Cannot read http body");
return std::make_tuple(code, HttpErrorCode_CannotReadBody,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
downloadSize = payload.size();
// If the content was compressed with gzip, decode it
if (headers["Content-Encoding"] == "gzip")
{
std::string decompressedPayload;
if (!gzipInflate(payload, decompressedPayload))
{
std::string errorMsg("Error decompressing payload");
return std::make_tuple(code, HttpErrorCode_Gzip,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
payload = decompressedPayload;
}
return std::make_tuple(code, HttpErrorCode_Ok,
headers, payload, std::string(),
uploadSize, downloadSize);
}
HttpResponse HttpClient::get(const std::string& url,
const HttpRequestArgs& args)
{
return request(url, kGet, std::string(), args);
}
HttpResponse HttpClient::head(const std::string& url,
const HttpRequestArgs& args)
{
return request(url, kHead, std::string(), args);
}
HttpResponse HttpClient::post(const std::string& url,
const HttpParameters& httpParameters,
const HttpRequestArgs& args)
{
return request(url, kPost, serializeHttpParameters(httpParameters), args);
}
HttpResponse HttpClient::post(const std::string& url,
const std::string& body,
const HttpRequestArgs& args)
{
return request(url, kPost, body, args);
}
std::string HttpClient::urlEncode(const std::string& value)
{
std::ostringstream escaped;
escaped.fill('0');
escaped << std::hex;
for (std::string::const_iterator i = value.begin(), n = value.end();
i != n; ++i)
{
std::string::value_type c = (*i);
// Keep alphanumeric and other accepted characters intact
if (isalnum(c) || c == '-' || c == '_' || c == '.' || c == '~')
{
escaped << c;
continue;
}
// Any other characters are percent-encoded
escaped << std::uppercase;
escaped << '%' << std::setw(2) << int((unsigned char) c);
escaped << std::nouppercase;
}
return escaped.str();
}
std::string HttpClient::serializeHttpParameters(const HttpParameters& httpParameters)
{
std::stringstream ss;
size_t count = httpParameters.size();
size_t i = 0;
for (auto&& it : httpParameters)
{
ss << urlEncode(it.first)
<< "="
<< urlEncode(it.second);
if (i++ < (count-1))
{
ss << "&";
}
}
return ss.str();
}
bool HttpClient::gzipInflate(
const std::string& in,
std::string& out)
{
z_stream inflateState;
std::memset(&inflateState, 0, sizeof(inflateState));
inflateState.zalloc = Z_NULL;
inflateState.zfree = Z_NULL;
inflateState.opaque = Z_NULL;
inflateState.avail_in = 0;
inflateState.next_in = Z_NULL;
if (inflateInit2(&inflateState, 16+MAX_WBITS) != Z_OK)
{
return false;
}
inflateState.avail_in = (uInt) in.size();
inflateState.next_in = (unsigned char *)(const_cast<char *>(in.data()));
const int kBufferSize = 1 << 14;
std::unique_ptr<unsigned char[]> compressBuffer =
std::make_unique<unsigned char[]>(kBufferSize);
do
{
inflateState.avail_out = (uInt) kBufferSize;
inflateState.next_out = compressBuffer.get();
int ret = inflate(&inflateState, Z_SYNC_FLUSH);
if (ret == Z_NEED_DICT || ret == Z_DATA_ERROR || ret == Z_MEM_ERROR)
{
inflateEnd(&inflateState);
return false;
}
out.append(
reinterpret_cast<char *>(compressBuffer.get()),
kBufferSize - inflateState.avail_out
);
} while (inflateState.avail_out == 0);
inflateEnd(&inflateState);
return true;
}
void HttpClient::log(const std::string& msg,
const HttpRequestArgs& args)
{
if (args.logger)
{
args.logger(msg);
}
}
}

107
ixwebsocket/IXHttpClient.h Normal file
View File

@ -0,0 +1,107 @@
/*
* IXHttpClient.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <algorithm>
#include <functional>
#include <mutex>
#include <atomic>
#include <tuple>
#include <memory>
#include <map>
#include "IXSocket.h"
#include "IXWebSocketHttpHeaders.h"
namespace ix
{
enum HttpErrorCode
{
HttpErrorCode_Ok = 0,
HttpErrorCode_CannotConnect = 1,
HttpErrorCode_Timeout = 2,
HttpErrorCode_Gzip = 3,
HttpErrorCode_UrlMalformed = 4,
HttpErrorCode_CannotCreateSocket = 5,
HttpErrorCode_SendError = 6,
HttpErrorCode_ReadError = 7,
HttpErrorCode_CannotReadStatusLine = 8,
HttpErrorCode_MissingStatus = 9,
HttpErrorCode_HeaderParsingError = 10,
HttpErrorCode_MissingLocation = 11,
HttpErrorCode_TooManyRedirects = 12,
HttpErrorCode_ChunkReadError = 13,
HttpErrorCode_CannotReadBody = 14
};
using HttpResponse = std::tuple<int, // status
HttpErrorCode, // error code
WebSocketHttpHeaders,
std::string, // payload
std::string, // error msg
uint64_t, // upload size
uint64_t>; // download size
using HttpParameters = std::map<std::string, std::string>;
using Logger = std::function<void(const std::string&)>;
struct HttpRequestArgs
{
std::string url;
WebSocketHttpHeaders extraHeaders;
std::string body;
int connectTimeout;
int transferTimeout;
bool followRedirects;
int maxRedirects;
bool verbose;
bool compress;
Logger logger;
OnProgressCallback onProgressCallback;
};
class HttpClient {
public:
HttpClient();
~HttpClient();
HttpResponse get(const std::string& url,
const HttpRequestArgs& args);
HttpResponse head(const std::string& url,
const HttpRequestArgs& args);
HttpResponse post(const std::string& url,
const HttpParameters& httpParameters,
const HttpRequestArgs& args);
HttpResponse post(const std::string& url,
const std::string& body,
const HttpRequestArgs& args);
private:
HttpResponse request(const std::string& url,
const std::string& verb,
const std::string& body,
const HttpRequestArgs& args,
int redirects = 0);
std::string serializeHttpParameters(const HttpParameters& httpParameters);
std::string urlEncode(const std::string& value);
void log(const std::string& msg, const HttpRequestArgs& args);
bool gzipInflate(
const std::string& in,
std::string& out);
std::shared_ptr<Socket> _socket;
const static std::string kPost;
const static std::string kGet;
const static std::string kHead;
};
}

View File

@ -0,0 +1,14 @@
/*
* IXProgressCallback.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <functional>
namespace ix
{
using OnProgressCallback = std::function<bool(int current, int total)>;
}

View File

@ -0,0 +1,46 @@
/*
* IXSelectInterrupt.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXSelectInterrupt.h"
namespace ix
{
SelectInterrupt::SelectInterrupt()
{
;
}
SelectInterrupt::~SelectInterrupt()
{
;
}
bool SelectInterrupt::init(std::string& /*errorMsg*/)
{
return true;
}
bool SelectInterrupt::notify(uint64_t /*value*/)
{
return true;
}
uint64_t SelectInterrupt::read()
{
return 0;
}
bool SelectInterrupt::clear()
{
return true;
}
int SelectInterrupt::getFd() const
{
return -1;
}
}

View File

@ -0,0 +1,28 @@
/*
* IXSelectInterrupt.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <stdint.h>
#include <string>
namespace ix
{
class SelectInterrupt {
public:
SelectInterrupt();
virtual ~SelectInterrupt();
virtual bool init(std::string& errorMsg);
virtual bool notify(uint64_t value);
virtual bool clear();
virtual uint64_t read();
virtual int getFd() const;
};
}

View File

@ -0,0 +1,116 @@
/*
* IXSelectInterruptEventFd.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved.
*/
//
// On Linux we use eventd to wake up select.
//
//
// Linux/Android has a special type of virtual files. select(2) will react
// when reading/writing to those files, unlike closing sockets.
//
// https://linux.die.net/man/2/eventfd
// http://www.sourcexr.com/articles/2013/10/26/lightweight-inter-process-signaling-with-eventfd
//
// eventfd was added in Linux kernel 2.x, and our oldest Android (Kitkat 4.4)
// is on Kernel 3.x
//
// cf Android/Kernel table here
// https://android.stackexchange.com/questions/51651/which-android-runs-which-linux-kernel
//
// On macOS we use UNIX pipes to wake up select.
//
#include "IXSelectInterruptEventFd.h"
#include <sys/eventfd.h>
#include <unistd.h> // for write
#include <string.h> // for strerror
#include <fcntl.h>
#include <errno.h>
#include <assert.h>
#include <sstream>
namespace ix
{
SelectInterruptEventFd::SelectInterruptEventFd()
{
_eventfd = -1;
}
SelectInterruptEventFd::~SelectInterruptEventFd()
{
::close(_eventfd);
}
bool SelectInterruptEventFd::init(std::string& errorMsg)
{
// calling init twice is a programming error
assert(_eventfd == -1);
_eventfd = eventfd(0, 0);
if (_eventfd < 0)
{
std::stringstream ss;
ss << "SelectInterruptEventFd::init() failed in eventfd()"
<< " : " << strerror(errno);
errorMsg = ss.str();
_eventfd = -1;
return false;
}
if (fcntl(_eventfd, F_SETFL, O_NONBLOCK) == -1)
{
std::stringstream ss;
ss << "SelectInterruptEventFd::init() failed in fcntl() call"
<< " : " << strerror(errno);
errorMsg = ss.str();
_eventfd = -1;
return false;
}
return true;
}
bool SelectInterruptEventFd::notify(uint64_t value)
{
int fd = _eventfd;
if (fd == -1) return false;
// we should write 8 bytes for an uint64_t
return write(fd, &value, sizeof(value)) == 8;
}
// TODO: return max uint64_t for errors ?
uint64_t SelectInterruptEventFd::read()
{
int fd = _eventfd;
uint64_t value = 0;
::read(fd, &value, sizeof(value));
return value;
}
bool SelectInterruptEventFd::clear()
{
if (_eventfd == -1) return false;
// 0 is a special value ; select will not wake up
uint64_t value = 0;
// we should write 8 bytes for an uint64_t
return write(_eventfd, &value, sizeof(value)) == 8;
}
int SelectInterruptEventFd::getFd() const
{
return _eventfd;
}
}

View File

@ -0,0 +1,32 @@
/*
* IXSelectInterruptEventFd.h
* Author: Benjamin Sergeant
* Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include "IXSelectInterrupt.h"
#include <stdint.h>
#include <string>
namespace ix
{
class SelectInterruptEventFd : public SelectInterrupt {
public:
SelectInterruptEventFd();
virtual ~SelectInterruptEventFd();
bool init(std::string& errorMsg) final;
bool notify(uint64_t value) final;
bool clear() final;
uint64_t read() final;
int getFd() const final;
private:
int _eventfd;
};
}

View File

@ -0,0 +1,25 @@
/*
* IXSelectInterruptFactory.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXSelectInterruptFactory.h"
#if defined(__linux__) || defined(__APPLE__)
# include <ixwebsocket/IXSelectInterruptPipe.h>
#else
# include <ixwebsocket/IXSelectInterrupt.h>
#endif
namespace ix
{
std::shared_ptr<SelectInterrupt> createSelectInterrupt()
{
#if defined(__linux__) || defined(__APPLE__)
return std::make_shared<SelectInterruptPipe>();
#else
return std::make_shared<SelectInterrupt>();
#endif
}
}

View File

@ -0,0 +1,15 @@
/*
* IXSelectInterruptFactory.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <memory>
namespace ix
{
class SelectInterrupt;
std::shared_ptr<SelectInterrupt> createSelectInterrupt();
}

View File

@ -0,0 +1,119 @@
/*
* IXSelectInterruptPipe.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved.
*/
//
// On macOS we use UNIX pipes to wake up select.
//
#include "IXSelectInterruptPipe.h"
#include <unistd.h> // for write
#include <string.h> // for strerror
#include <fcntl.h>
#include <errno.h>
#include <assert.h>
#include <sstream>
namespace ix
{
// File descriptor at index 0 in _fildes is the read end of the pipe
// File descriptor at index 1 in _fildes is the write end of the pipe
const int SelectInterruptPipe::kPipeReadIndex = 0;
const int SelectInterruptPipe::kPipeWriteIndex = 1;
SelectInterruptPipe::SelectInterruptPipe()
{
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
}
SelectInterruptPipe::~SelectInterruptPipe()
{
::close(_fildes[kPipeReadIndex]);
::close(_fildes[kPipeWriteIndex]);
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
}
bool SelectInterruptPipe::init(std::string& errorMsg)
{
// calling init twice is a programming error
assert(_fildes[kPipeReadIndex] == -1);
assert(_fildes[kPipeWriteIndex] == -1);
if (pipe(_fildes) < 0)
{
std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in pipe() call"
<< " : " << strerror(errno);
errorMsg = ss.str();
return false;
}
if (fcntl(_fildes[kPipeReadIndex], F_SETFL, O_NONBLOCK) == -1)
{
std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in fcntl() call"
<< " : " << strerror(errno);
errorMsg = ss.str();
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
return false;
}
if (fcntl(_fildes[kPipeWriteIndex], F_SETFL, O_NONBLOCK) == -1)
{
std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in fcntl() call"
<< " : " << strerror(errno);
errorMsg = ss.str();
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
return false;
}
//
// FIXME: on macOS we should configure the pipe to not trigger SIGPIPE
// on reads/writes to a closed fd
//
// The generation of the SIGPIPE signal can be suppressed using the
// F_SETNOSIGPIPE fcntl command.
//
return true;
}
bool SelectInterruptPipe::notify(uint64_t value)
{
int fd = _fildes[kPipeWriteIndex];
if (fd == -1) return false;
// we should write 8 bytes for an uint64_t
return write(fd, &value, sizeof(value)) == 8;
}
// TODO: return max uint64_t for errors ?
uint64_t SelectInterruptPipe::read()
{
int fd = _fildes[kPipeReadIndex];
uint64_t value = 0;
::read(fd, &value, sizeof(value));
return value;
}
bool SelectInterruptPipe::clear()
{
return true;
}
int SelectInterruptPipe::getFd() const
{
return _fildes[kPipeReadIndex];
}
}

View File

@ -0,0 +1,39 @@
/*
* IXSelectInterruptPipe.h
* Author: Benjamin Sergeant
* Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include "IXSelectInterrupt.h"
#include <stdint.h>
#include <string>
namespace ix
{
class SelectInterruptPipe : public SelectInterrupt {
public:
SelectInterruptPipe();
virtual ~SelectInterruptPipe();
bool init(std::string& errorMsg) final;
bool notify(uint64_t value) final;
bool clear() final;
uint64_t read() final;
int getFd() const final;
private:
// Store file descriptors used by the communication pipe. Communication
// happens between a control thread and a background thread, which is
// blocked on select.
int _fildes[2];
// Used to identify the read/write idx
static const int kPipeReadIndex;
static const int kPipeWriteIndex;
};
}

View File

@ -7,6 +7,8 @@
#include "IXSocket.h"
#include "IXSocketConnect.h"
#include "IXNetSystem.h"
#include "IXSelectInterrupt.h"
#include "IXSelectInterruptFactory.h"
#include <stdio.h>
#include <stdlib.h>
@ -15,20 +17,23 @@
#include <stdint.h>
#include <fcntl.h>
#include <sys/types.h>
#include <poll.h>
#include <algorithm>
#include <iostream>
namespace ix
namespace ix
{
const int Socket::kDefaultPollNoTimeout = -1; // No poll timeout by default
const int Socket::kDefaultPollTimeout = kDefaultPollNoTimeout;
const uint64_t Socket::kSendRequest = 1;
const uint64_t Socket::kCloseRequest = 2;
constexpr size_t Socket::kChunkSize;
Socket::Socket(int fd) :
_sockfd(fd)
Socket::Socket(int fd) :
_sockfd(fd),
_selectInterrupt(createSelectInterrupt())
{
;
}
Socket::~Socket()
@ -40,25 +45,43 @@ namespace ix
{
if (_sockfd == -1)
{
onPollCallback(PollResultType_Error);
if (onPollCallback) onPollCallback(PollResultType_Error);
return;
}
#ifdef __linux__
constexpr int nfds = 2;
#else
constexpr int nfds = 1;
#endif
PollResultType pollResult = isReadyToRead(timeoutSecs, 0);
struct pollfd fds[nfds];
fds[0].fd = _sockfd;
fds[0].events = POLLIN;
if (onPollCallback) onPollCallback(pollResult);
}
#ifdef __linux__
fds[1].fd = _eventfd.getFd();
fds[1].events = POLLIN;
#endif
int ret = ::poll(fds, nfds, timeoutSecs * 1000);
PollResultType Socket::select(bool readyToRead, int timeoutSecs, int timeoutMs)
{
fd_set rfds;
fd_set wfds;
FD_ZERO(&rfds);
FD_ZERO(&wfds);
fd_set* fds = (readyToRead) ? &rfds : & wfds;
FD_SET(_sockfd, fds);
// File descriptor used to interrupt select when needed
int interruptFd = _selectInterrupt->getFd();
if (interruptFd != -1)
{
FD_SET(interruptFd, fds);
}
struct timeval timeout;
timeout.tv_sec = timeoutSecs;
timeout.tv_usec = 1000 * timeoutMs;
// Compute the highest fd.
int sockfd = _sockfd;
int nfds = (std::max)(sockfd, interruptFd);
int ret = ::select(nfds + 1, &rfds, &wfds, nullptr,
(timeoutSecs < 0) ? nullptr : &timeout);
PollResultType pollResult = PollResultType_ReadyForRead;
if (ret < 0)
@ -69,15 +92,51 @@ namespace ix
{
pollResult = PollResultType_Timeout;
}
else if (interruptFd != -1 && FD_ISSET(interruptFd, fds))
{
uint64_t value = _selectInterrupt->read();
onPollCallback(pollResult);
if (value == kSendRequest)
{
pollResult = PollResultType_SendRequest;
}
else if (value == kCloseRequest)
{
pollResult = PollResultType_CloseRequest;
}
}
else if (sockfd != -1 && FD_ISSET(sockfd, fds))
{
if (readyToRead)
{
pollResult = PollResultType_ReadyForRead;
}
else
{
pollResult = PollResultType_ReadyForWrite;
}
}
return pollResult;
}
void Socket::wakeUpFromPoll()
PollResultType Socket::isReadyToRead(int timeoutSecs, int timeoutMs)
{
// this will wake up the thread blocked on select, only needed on Linux
_eventfd.notify();
bool readyToRead = true;
return select(readyToRead, timeoutSecs, timeoutMs);
}
PollResultType Socket::isReadyToWrite(int timeoutSecs, int timeoutMs)
{
bool readyToRead = false;
return select(readyToRead, timeoutSecs, timeoutMs);
}
// Wake up from poll/select by writing to the pipe which is watched by select
bool Socket::wakeUpFromPoll(uint8_t wakeUpCode)
{
return _selectInterrupt->notify(wakeUpCode);
}
bool Socket::connect(const std::string& host,
@ -87,7 +146,7 @@ namespace ix
{
std::lock_guard<std::mutex> lock(_socketMutex);
if (!_eventfd.clear()) return false;
if (!_selectInterrupt->clear()) return false;
_sockfd = SocketConnect::connect(host, port, errMsg, isCancellationRequested);
return _sockfd != -1;
@ -146,69 +205,9 @@ namespace ix
#endif
}
bool Socket::init()
bool Socket::init(std::string& errorMsg)
{
#ifdef _WIN32
INT rc;
WSADATA wsaData;
rc = WSAStartup(MAKEWORD(2, 2), &wsaData);
return rc != 0;
#else
return true;
#endif
}
void Socket::cleanup()
{
#ifdef _WIN32
WSACleanup();
#endif
}
bool Socket::readByte(void* buffer,
const CancellationRequest& isCancellationRequested)
{
while (true)
{
if (isCancellationRequested()) return false;
ssize_t ret;
ret = recv(buffer, 1);
// We read one byte, as needed, all good.
if (ret == 1)
{
return true;
}
// There is possibly something to be read, try again
else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
getErrno() == EAGAIN))
{
// Wait with a timeout until something is written.
// This way we are not busy looping
fd_set rfds;
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 1 * 1000; // 1ms timeout
FD_ZERO(&rfds);
FD_SET(_sockfd, &rfds);
if (select(_sockfd + 1, &rfds, nullptr, nullptr, &timeout) < 0 &&
(errno == EBADF || errno == EINVAL))
{
return false;
}
continue;
}
// There was an error during the read, abort
else
{
return false;
}
}
return _selectInterrupt->init(errorMsg);
}
bool Socket::writeBytes(const std::string& str,
@ -242,7 +241,42 @@ namespace ix
}
}
std::pair<bool, std::string> Socket::readLine(const CancellationRequest& isCancellationRequested)
bool Socket::readByte(void* buffer,
const CancellationRequest& isCancellationRequested)
{
while (true)
{
if (isCancellationRequested()) return false;
ssize_t ret;
ret = recv(buffer, 1);
// We read one byte, as needed, all good.
if (ret == 1)
{
return true;
}
// There is possibly something to be read, try again
else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
getErrno() == EAGAIN))
{
// Wait with a 1ms timeout until the socket is ready to read.
// This way we are not busy looping
if (isReadyToRead(0, 1) == PollResultType_Error)
{
return false;
}
}
// There was an error during the read, abort
else
{
return false;
}
}
}
std::pair<bool, std::string> Socket::readLine(
const CancellationRequest& isCancellationRequested)
{
char c;
std::string line;
@ -252,7 +286,8 @@ namespace ix
{
if (!readByte(&c, isCancellationRequested))
{
return std::make_pair(false, std::string());
// Return what we were able to read
return std::make_pair(false, line);
}
line += c;
@ -260,4 +295,49 @@ namespace ix
return std::make_pair(true, line);
}
std::pair<bool, std::string> Socket::readBytes(
size_t length,
const OnProgressCallback& onProgressCallback,
const CancellationRequest& isCancellationRequested)
{
if (_readBuffer.empty())
{
_readBuffer.resize(kChunkSize);
}
std::vector<uint8_t> output;
while (output.size() != length)
{
if (isCancellationRequested()) return std::make_pair(false, std::string());
int size = std::min(kChunkSize, length - output.size());
ssize_t ret = recv((char*)&_readBuffer[0], size);
if (ret <= 0 && (getErrno() != EWOULDBLOCK &&
getErrno() != EAGAIN))
{
// Error
return std::make_pair(false, std::string());
}
else if (ret > 0)
{
output.insert(output.end(),
_readBuffer.begin(),
_readBuffer.begin() + ret);
}
if (onProgressCallback) onProgressCallback((int) output.size(), (int) length);
// Wait with a 1ms timeout until the socket is ready to read.
// This way we are not busy looping
if (isReadyToRead(0, 1) == PollResultType_Error)
{
return std::make_pair(false, std::string());
}
}
return std::make_pair(true, std::string(output.begin(),
output.end()));
}
}

View File

@ -10,22 +10,29 @@
#include <functional>
#include <mutex>
#include <atomic>
#include <vector>
#include <memory>
#ifdef _WIN32
#include <BaseTsd.h>
typedef SSIZE_T ssize_t;
#endif
#include "IXEventFd.h"
#include "IXCancellationRequest.h"
#include "IXProgressCallback.h"
namespace ix
namespace ix
{
class SelectInterrupt;
enum PollResultType
{
PollResultType_ReadyForRead = 0,
PollResultType_Timeout = 1,
PollResultType_Error = 2
PollResultType_ReadyForWrite = 1,
PollResultType_Timeout = 2,
PollResultType_Error = 3,
PollResultType_SendRequest = 4,
PollResultType_CloseRequest = 5
};
class Socket {
@ -34,15 +41,21 @@ namespace ix
Socket(int fd = -1);
virtual ~Socket();
bool init(std::string& errorMsg);
void configure();
virtual void poll(const OnPollCallback& onPollCallback,
int timeoutSecs = kDefaultPollTimeout);
virtual void wakeUpFromPoll();
// Functions to check whether there is activity on the socket
void poll(const OnPollCallback& onPollCallback,
int timeoutSecs = kDefaultPollTimeout);
bool wakeUpFromPoll(uint8_t wakeUpCode);
PollResultType select(bool readyToRead, int timeoutSecs, int timeoutMs);
PollResultType isReadyToWrite(int timeoutSecs, int timeoutMs);
PollResultType isReadyToRead(int timeoutSecs, int timeoutMs);
// Virtual methods
virtual bool connect(const std::string& url,
virtual bool connect(const std::string& url,
int port,
std::string& errMsg,
const CancellationRequest& isCancellationRequested);
@ -58,21 +71,34 @@ namespace ix
const CancellationRequest& isCancellationRequested);
bool writeBytes(const std::string& str,
const CancellationRequest& isCancellationRequested);
std::pair<bool, std::string> readLine(const CancellationRequest& isCancellationRequested);
std::pair<bool, std::string> readLine(
const CancellationRequest& isCancellationRequested);
std::pair<bool, std::string> readBytes(
size_t length,
const OnProgressCallback& onProgressCallback,
const CancellationRequest& isCancellationRequested);
static int getErrno();
static bool init(); // Required on Windows to initialize WinSocket
static void cleanup(); // Required on Windows to cleanup WinSocket
// Used as special codes for pipe communication
static const uint64_t kSendRequest;
static const uint64_t kCloseRequest;
protected:
void closeSocket(int fd);
std::atomic<int> _sockfd;
std::mutex _socketMutex;
EventFd _eventfd;
private:
static const int kDefaultPollTimeout;
static const int kDefaultPollNoTimeout;
// Buffer for reading from our socket. That buffer is never resized.
std::vector<uint8_t> _readBuffer;
static constexpr size_t kChunkSize = 1 << 15;
std::shared_ptr<SelectInterrupt> _selectInterrupt;
};
}

View File

@ -50,7 +50,7 @@ OSStatus read_from_socket(SSLConnectionRef connection, void *data, size_t *len)
else
return noErr;
}
else if (0 == status)
else if (0 == status)
{
*len = 0;
return errSSLClosedGraceful;
@ -102,7 +102,7 @@ OSStatus write_to_socket(SSLConnectionRef connection, const void *data, size_t *
else
{
*len = 0;
if (EAGAIN == errno)
if (EAGAIN == errno)
{
return errSSLWouldBlock;
}
@ -141,7 +141,7 @@ std::string getSSLErrorDescription(OSStatus status)
} // anonymous namespace
namespace ix
namespace ix
{
SocketAppleSSL::SocketAppleSSL(int fd) : Socket(fd),
_sslContext(nullptr)
@ -176,11 +176,11 @@ namespace ix
do {
status = SSLHandshake(_sslContext);
} while (errSSLWouldBlock == status ||
} while (errSSLWouldBlock == status ||
errSSLServerAuthCompleted == status);
}
if (noErr != status)
if (noErr != status)
{
errMsg = getSSLErrorDescription(status);
close();
@ -230,7 +230,7 @@ namespace ix
ssize_t SocketAppleSSL::recv(void* buf, size_t nbyte)
{
OSStatus status = errSSLWouldBlock;
while (errSSLWouldBlock == status)
while (errSSLWouldBlock == status)
{
size_t processed = 0;
std::lock_guard<std::mutex> lock(_mutex);
@ -239,7 +239,7 @@ namespace ix
if (processed > 0)
return (ssize_t) processed;
// The connection was reset, inform the caller that this
// The connection was reset, inform the caller that this
// Socket should close
if (status == errSSLClosedGraceful ||
status == errSSLClosedNoNotify ||

View File

@ -14,15 +14,15 @@
#include <mutex>
namespace ix
namespace ix
{
class SocketAppleSSL : public Socket
class SocketAppleSSL : public Socket
{
public:
SocketAppleSSL(int fd = -1);
~SocketAppleSSL();
virtual bool connect(const std::string& host,
virtual bool connect(const std::string& host,
int port,
std::string& errMsg,
const CancellationRequest& isCancellationRequested) final;

View File

@ -30,7 +30,7 @@ namespace
}
}
namespace ix
namespace ix
{
//
// This function can be cancelled every 50 ms
@ -42,7 +42,7 @@ namespace ix
const CancellationRequest& isCancellationRequested)
{
errMsg = "no error";
int fd = socket(address->ai_family,
address->ai_socktype,
address->ai_protocol);
@ -72,7 +72,7 @@ namespace ix
errMsg = "Cancelled";
return -1;
}
// Use select to check the status of the new connection
struct timeval timeout;
timeout.tv_sec = 0;
@ -179,7 +179,7 @@ namespace ix
// 3. (apple) prevent SIGPIPE from being emitted when the remote end disconnect
#ifdef SO_NOSIGPIPE
int value = 1;
setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE,
setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE,
(void *)&value, sizeof(value));
#endif
}

View File

@ -12,7 +12,7 @@
struct addrinfo;
namespace ix
namespace ix
{
class SocketConnect {
public:

View File

@ -0,0 +1,64 @@
/*
* IXSocketFactory.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXSocketFactory.h"
#if defined(__APPLE__) or defined(__linux__)
# ifdef __APPLE__
# include <ixwebsocket/IXSocketAppleSSL.h>
# else
# include <ixwebsocket/IXSocketOpenSSL.h>
# endif
#endif
namespace ix
{
std::shared_ptr<Socket> createSocket(bool tls,
std::string& errorMsg)
{
errorMsg.clear();
std::shared_ptr<Socket> socket;
if (!tls)
{
socket = std::make_shared<Socket>();
}
else
{
#ifdef IXWEBSOCKET_USE_TLS
# ifdef __APPLE__
socket = std::make_shared<SocketAppleSSL>();
# else
socket = std::make_shared<SocketOpenSSL>();
# endif
#else
errorMsg = "TLS support is not enabled on this platform.";
return nullptr;
#endif
}
if (!socket->init(errorMsg))
{
socket.reset();
}
return socket;
}
std::shared_ptr<Socket> createSocket(int fd,
std::string& errorMsg)
{
errorMsg.clear();
std::shared_ptr<Socket> socket = std::make_shared<Socket>(fd);
if (!socket->init(errorMsg))
{
socket.reset();
}
return socket;
}
}

View File

@ -0,0 +1,20 @@
/*
* IXSocketFactory.h
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <memory>
namespace ix
{
class Socket;
std::shared_ptr<Socket> createSocket(bool tls,
std::string& errorMsg);
std::shared_ptr<Socket> createSocket(int fd,
std::string& errorMsg);
}

View File

@ -18,12 +18,12 @@
#include <errno.h>
#define socketerrno errno
namespace ix
namespace ix
{
std::atomic<bool> SocketOpenSSL::_openSSLInitializationSuccessful(false);
SocketOpenSSL::SocketOpenSSL(int fd) : Socket(fd),
_ssl_connection(nullptr),
_ssl_connection(nullptr),
_ssl_context(nullptr)
{
std::call_once(_openSSLInitFlag, &SocketOpenSSL::openSSLInitialize, this);
@ -80,7 +80,7 @@ namespace ix
return "OpenSSL failed - underlying BIO reported an I/O error";
}
}
else if (err == SSL_ERROR_SSL)
else if (err == SSL_ERROR_SSL)
{
e = ERR_get_error();
std::string errMsg("OpenSSL failed - ");
@ -149,7 +149,7 @@ namespace ix
#if OPENSSL_VERSION_NUMBER < 0x10100000L
// Check server name
bool hostname_verifies_ok = false;
STACK_OF(GENERAL_NAME) *san_names =
STACK_OF(GENERAL_NAME) *san_names =
(STACK_OF(GENERAL_NAME)*) X509_get_ext_d2i((X509 *)server_cert,
NID_subject_alt_name, NULL, NULL);
if (san_names)
@ -160,8 +160,8 @@ namespace ix
if (sk_name->type == GEN_DNS)
{
char *name = (char *)ASN1_STRING_data(sk_name->d.dNSName);
if ((size_t)ASN1_STRING_length(sk_name->d.dNSName) == strlen(name) &&
checkHost(hostname, name))
if ((size_t)ASN1_STRING_length(sk_name->d.dNSName) == strlen(name) &&
checkHost(hostname, name))
{
hostname_verifies_ok = true;
break;
@ -185,8 +185,8 @@ namespace ix
ASN1_STRING *cn_asn1 = X509_NAME_ENTRY_get_data(cn_entry);
char *cn = (char *)ASN1_STRING_data(cn_asn1);
if ((size_t)ASN1_STRING_length(cn_asn1) == strlen(cn) &&
checkHost(hostname, cn))
if ((size_t)ASN1_STRING_length(cn_asn1) == strlen(cn) &&
checkHost(hostname, cn))
{
hostname_verifies_ok = true;
}
@ -205,7 +205,7 @@ namespace ix
return true;
}
bool SocketOpenSSL::openSSLHandshake(const std::string& host, std::string& errMsg)
bool SocketOpenSSL::openSSLHandshake(const std::string& host, std::string& errMsg)
{
while (true)
{

View File

@ -17,15 +17,15 @@
#include <mutex>
namespace ix
namespace ix
{
class SocketOpenSSL : public Socket
class SocketOpenSSL : public Socket
{
public:
SocketOpenSSL(int fd = -1);
~SocketOpenSSL();
virtual bool connect(const std::string& host,
virtual bool connect(const std::string& host,
int port,
std::string& errMsg,
const CancellationRequest& isCancellationRequested) final;

View File

@ -47,7 +47,7 @@
// link with ntdsapi.lib for DsMakeSpn function
#pragma comment(lib, "ntdsapi.lib")
// The following function assumes that Winsock
// The following function assumes that Winsock
// has already been initialized
@ -59,7 +59,7 @@
# error("This file should only be built on Windows")
#endif
namespace ix
namespace ix
{
SocketSChannel::SocketSChannel()
{
@ -68,7 +68,7 @@ namespace ix
SocketSChannel::~SocketSChannel()
{
}
bool SocketSChannel::connect(const std::string& host,
@ -78,7 +78,7 @@ namespace ix
return Socket::connect(host, port, errMsg);
}
void SocketSChannel::secureSocket()
{
// there will be a lot to do here ...

View File

@ -8,15 +8,15 @@
#include "IXSocket.h"
namespace ix
namespace ix
{
class SocketSChannel : public Socket
class SocketSChannel : public Socket
{
public:
SocketSChannel();
~SocketSChannel();
virtual bool connect(const std::string& host,
virtual bool connect(const std::string& host,
int port,
std::string& errMsg) final;
virtual void close() final;

View File

@ -14,7 +14,7 @@
#include <future>
#include <string.h>
namespace ix
namespace ix
{
const int SocketServer::kDefaultPort(8080);
const std::string SocketServer::kDefaultHost("127.0.0.1");
@ -83,7 +83,7 @@ namespace ix
server.sin_family = AF_INET;
server.sin_port = htons(_port);
// Using INADDR_ANY trigger a pop-up box as binding to any address is detected
// Using INADDR_ANY trigger a pop-up box as binding to any address is detected
// by the osx firewall. We need to codesign the binary with a self-signed cert
// to allow that, but this is a bit of a pain. (this is what node or python would do).
//
@ -216,7 +216,7 @@ namespace ix
// Launch the handleConnection work asynchronously in its own thread.
//
// the destructor of a future returned by std::async blocks,
// the destructor of a future returned by std::async blocks,
// so we need to declare it outside of this loop
f = std::async(std::launch::async,
&SocketServer::handleConnection,

View File

@ -16,7 +16,7 @@
#include <atomic>
#include <condition_variable>
namespace ix
namespace ix
{
class SocketServer {
public:

104
ixwebsocket/IXUrlParser.cpp Normal file
View File

@ -0,0 +1,104 @@
/*
* IXUrlParser.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXUrlParser.h"
#include <iostream>
#include <sstream>
namespace ix
{
//
// The only difference between those 2 regex is the protocol
//
std::regex UrlParser::_httpRegex("(http|https)://([^/ :]+):?([^/ ]*)(/?[^ #?]*)\\x3f?([^ #]*)#?([^ ]*)");
std::regex UrlParser::_webSocketRegex("(ws|wss)://([^/ :]+):?([^/ ]*)(/?[^ #?]*)\\x3f?([^ #]*)#?([^ ]*)");
bool UrlParser::parse(const std::string& url,
std::string& protocol,
std::string& host,
std::string& path,
std::string& query,
int& port,
bool websocket)
{
std::cmatch what;
if (!regex_match(url.c_str(), what,
websocket ? _webSocketRegex : _httpRegex))
{
return false;
}
std::string portStr;
protocol = std::string(what[1].first, what[1].second);
host = std::string(what[2].first, what[2].second);
portStr = std::string(what[3].first, what[3].second);
path = std::string(what[4].first, what[4].second);
query = std::string(what[5].first, what[5].second);
if (portStr.empty())
{
if (protocol == "ws" || protocol == "http")
{
port = 80;
}
else if (protocol == "wss" || protocol == "https")
{
port = 443;
}
else
{
// Invalid protocol. Should be caught by regex check
// but this missing branch trigger cpplint linter.
return false;
}
}
else
{
std::stringstream ss;
ss << portStr;
ss >> port;
}
if (path.empty())
{
path = "/";
}
else if (path[0] != '/')
{
path = '/' + path;
}
if (!query.empty())
{
path += "?";
path += query;
}
return true;
}
void UrlParser::printUrl(const std::string& url, bool websocket)
{
std::string protocol, host, path, query;
int port {0};
if (!parse(url, protocol, host, path, query, port, websocket))
{
return;
}
std::cout << "[" << url << "]" << std::endl;
std::cout << protocol << std::endl;
std::cout << host << std::endl;
std::cout << port << std::endl;
std::cout << path << std::endl;
std::cout << query << std::endl;
std::cout << "-------------------------------" << std::endl;
}
}

31
ixwebsocket/IXUrlParser.h Normal file
View File

@ -0,0 +1,31 @@
/*
* IXUrlParser.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <string>
#include <regex>
namespace ix
{
class UrlParser
{
public:
static bool parse(const std::string& url,
std::string& protocol,
std::string& host,
std::string& path,
std::string& query,
int& port,
bool websocket);
static void printUrl(const std::string& url, bool websocket);
private:
static std::regex _httpRegex;
static std::regex _webSocketRegex;
};
}

View File

@ -50,7 +50,7 @@ namespace ix
);
}
WebSocket::~WebSocket()
WebSocket::~WebSocket()
{
stop();
}
@ -135,7 +135,7 @@ namespace ix
}
_onMessageCallback(WebSocket_MessageType_Open, "", 0,
WebSocketErrorInfo(),
WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers),
WebSocketCloseInfo());
return status;
@ -155,7 +155,7 @@ namespace ix
}
_onMessageCallback(WebSocket_MessageType_Open, "", 0,
WebSocketErrorInfo(),
WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers),
WebSocketCloseInfo());
return status;
@ -184,7 +184,7 @@ namespace ix
using millis = std::chrono::duration<double, std::milli>;
millis duration;
while (true)
while (true)
{
if (isConnected() || isClosing() || _stop || !_automaticReconnection)
{
@ -214,7 +214,7 @@ namespace ix
{
setThreadName(_url);
while (true)
while (true)
{
if (_stop) return;
@ -223,7 +223,7 @@ namespace ix
if (_stop) return;
// 2. Poll to see if there's any new data available
// 2. Poll to see if there's any new data available
_ws.poll();
if (_stop) return;
@ -252,6 +252,11 @@ namespace ix
{
webSocketMessageType = WebSocket_MessageType_Pong;
} break;
case WebSocketTransport::FRAGMENT:
{
webSocketMessageType = WebSocket_MessageType_Fragment;
} break;
}
WebSocketErrorInfo webSocketErrorInfo;
@ -273,7 +278,7 @@ namespace ix
void WebSocket::setOnMessageCallback(const OnMessageCallback& callback)
{
_onMessageCallback = callback;
_onMessageCallback = callback;
}
void WebSocket::setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback)
@ -294,9 +299,10 @@ namespace ix
}
}
WebSocketSendInfo WebSocket::send(const std::string& text)
WebSocketSendInfo WebSocket::send(const std::string& text,
const OnProgressCallback& onProgressCallback)
{
return sendMessage(text, false);
return sendMessage(text, false, onProgressCallback);
}
WebSocketSendInfo WebSocket::ping(const std::string& text)
@ -308,7 +314,9 @@ namespace ix
return sendMessage(text, true);
}
WebSocketSendInfo WebSocket::sendMessage(const std::string& text, bool ping)
WebSocketSendInfo WebSocket::sendMessage(const std::string& text,
bool ping,
const OnProgressCallback& onProgressCallback)
{
if (!isConnected()) return WebSocketSendInfo(false);
@ -330,7 +338,7 @@ namespace ix
}
else
{
webSocketSendInfo = _ws.sendBinary(text);
webSocketSendInfo = _ws.sendBinary(text, onProgressCallback);
}
WebSocket::invokeTrafficTrackerCallback(webSocketSendInfo.wireSize, false);
@ -340,7 +348,7 @@ namespace ix
ReadyState WebSocket::getReadyState() const
{
switch (_ws.getReadyState())
switch (_ws.getReadyState())
{
case ix::WebSocketTransport::OPEN: return WebSocket_ReadyState_Open;
case ix::WebSocketTransport::CONNECTING: return WebSocket_ReadyState_Connecting;
@ -371,4 +379,9 @@ namespace ix
{
_automaticReconnection = false;
}
size_t WebSocket::bufferedAmount() const
{
return _ws.bufferedAmount();
}
}

View File

@ -19,11 +19,12 @@
#include "IXWebSocketSendInfo.h"
#include "IXWebSocketPerMessageDeflateOptions.h"
#include "IXWebSocketHttpHeaders.h"
#include "IXProgressCallback.h"
namespace ix
{
// https://developer.mozilla.org/en-US/docs/Web/API/WebSocket#Ready_state_constants
enum ReadyState
enum ReadyState
{
WebSocket_ReadyState_Connecting = 0,
WebSocket_ReadyState_Open = 1,
@ -38,7 +39,8 @@ namespace ix
WebSocket_MessageType_Close = 2,
WebSocket_MessageType_Error = 3,
WebSocket_MessageType_Ping = 4,
WebSocket_MessageType_Pong = 5
WebSocket_MessageType_Pong = 5,
WebSocket_MessageType_Fragment = 6
};
struct WebSocketOpenInfo
@ -78,7 +80,7 @@ namespace ix
using OnTrafficTrackerCallback = std::function<void(size_t size, bool incoming)>;
class WebSocket
class WebSocket
{
public:
WebSocket();
@ -97,7 +99,8 @@ namespace ix
WebSocketInitResult connect(int timeoutSecs);
void run();
WebSocketSendInfo send(const std::string& text);
WebSocketSendInfo send(const std::string& text,
const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo ping(const std::string& text);
void close();
@ -109,13 +112,16 @@ namespace ix
const std::string& getUrl() const;
const WebSocketPerMessageDeflateOptions& getPerMessageDeflateOptions() const;
int getHeartBeatPeriod() const;
size_t bufferedAmount() const;
void enableAutomaticReconnection();
void disableAutomaticReconnection();
private:
WebSocketSendInfo sendMessage(const std::string& text, bool ping);
WebSocketSendInfo sendMessage(const std::string& text,
bool ping,
const OnProgressCallback& callback = nullptr);
bool isConnected() const;
bool isClosing() const;

View File

@ -8,7 +8,7 @@
#include <string>
namespace ix
namespace ix
{
struct WebSocketErrorInfo
{

View File

@ -6,6 +6,7 @@
#include "IXWebSocketHandshake.h"
#include "IXSocketConnect.h"
#include "IXUrlParser.h"
#include "libwshandshake.hpp"
@ -16,7 +17,7 @@
#include <algorithm>
namespace ix
namespace ix
{
WebSocketHandshake::WebSocketHandshake(std::atomic<bool>& requestInitCancellation,
std::shared_ptr<Socket> socket,
@ -32,90 +33,6 @@ namespace ix
}
bool WebSocketHandshake::parseUrl(const std::string& url,
std::string& protocol,
std::string& host,
std::string& path,
std::string& query,
int& port)
{
std::regex ex("(ws|wss)://([^/ :]+):?([^/ ]*)(/?[^ #?]*)\\x3f?([^ #]*)#?([^ ]*)");
std::cmatch what;
if (!regex_match(url.c_str(), what, ex))
{
return false;
}
std::string portStr;
protocol = std::string(what[1].first, what[1].second);
host = std::string(what[2].first, what[2].second);
portStr = std::string(what[3].first, what[3].second);
path = std::string(what[4].first, what[4].second);
query = std::string(what[5].first, what[5].second);
if (portStr.empty())
{
if (protocol == "ws")
{
port = 80;
}
else if (protocol == "wss")
{
port = 443;
}
else
{
// Invalid protocol. Should be caught by regex check
// but this missing branch trigger cpplint linter.
return false;
}
}
else
{
std::stringstream ss;
ss << portStr;
ss >> port;
}
if (path.empty())
{
path = "/";
}
else if (path[0] != '/')
{
path = '/' + path;
}
if (!query.empty())
{
path += "?";
path += query;
}
return true;
}
void WebSocketHandshake::printUrl(const std::string& url)
{
std::string protocol, host, path, query;
int port {0};
if (!WebSocketHandshake::parseUrl(url, protocol, host,
path, query, port))
{
return;
}
std::cout << "[" << url << "]" << std::endl;
std::cout << protocol << std::endl;
std::cout << host << std::endl;
std::cout << port << std::endl;
std::cout << path << std::endl;
std::cout << query << std::endl;
std::cout << "-------------------------------" << std::endl;
}
std::string WebSocketHandshake::trim(const std::string& str)
{
std::string out(str);
@ -171,7 +88,7 @@ namespace ix
std::string WebSocketHandshake::genRandomString(const int len)
{
std::string alphanum =
std::string alphanum =
"0123456789"
"ABCDEFGH"
"abcdefgh";
@ -192,61 +109,6 @@ namespace ix
return s;
}
std::pair<bool, WebSocketHttpHeaders> WebSocketHandshake::parseHttpHeaders(
const CancellationRequest& isCancellationRequested)
{
WebSocketHttpHeaders headers;
char line[256];
int i;
while (true)
{
int colon = 0;
for (i = 0;
i < 2 || (i < 255 && line[i-2] != '\r' && line[i-1] != '\n');
++i)
{
if (!_socket->readByte(line+i, isCancellationRequested))
{
return std::make_pair(false, headers);
}
if (line[i] == ':' && colon == 0)
{
colon = i;
}
}
if (line[0] == '\r' && line[1] == '\n')
{
break;
}
// line is a single header entry. split by ':', and add it to our
// header map. ignore lines with no colon.
if (colon > 0)
{
line[i] = '\0';
std::string lineStr(line);
// colon is ':', colon+1 is ' ', colon+2 is the start of the value.
// i is end of string (\0), i-colon is length of string minus key;
// subtract 1 for '\0', 1 for '\n', 1 for '\r',
// 1 for the ' ' after the ':', and total is -4
std::string name(lineStr.substr(0, colon));
std::string value(lineStr.substr(colon + 2, i - colon - 4));
// Make the name lower case.
std::transform(name.begin(), name.end(), name.begin(), ::tolower);
headers[name] = value;
}
}
return std::make_pair(true, headers);
}
WebSocketInitResult WebSocketHandshake::sendErrorResponse(int code, const std::string& reason)
{
std::stringstream ss;
@ -277,7 +139,7 @@ namespace ix
{
_requestInitCancellation = false;
auto isCancellationRequested =
auto isCancellationRequested =
makeCancellationRequestWithTimeout(timeoutSecs, _requestInitCancellation);
std::string errMsg;
@ -355,7 +217,7 @@ namespace ix
return WebSocketInitResult(false, status, ss.str());
}
auto result = parseHttpHeaders(isCancellationRequested);
auto result = parseHttpHeaders(_socket, isCancellationRequested);
auto headersValid = result.first;
auto headers = result.second;
@ -372,7 +234,7 @@ namespace ix
}
// Check the value of the connection field
// Some websocket servers (Go/Gorilla?) send lowercase values for the
// Some websocket servers (Go/Gorilla?) send lowercase values for the
// connection header, so do a case insensitive comparison
if (!insensitiveStringCompare(headers["connection"], "Upgrade"))
{
@ -418,7 +280,7 @@ namespace ix
// Set the socket to non blocking mode + other tweaks
SocketConnect::configure(fd);
auto isCancellationRequested =
auto isCancellationRequested =
makeCancellationRequestWithTimeout(timeoutSecs, _requestInitCancellation);
std::string remote = std::string("remote fd ") + std::to_string(fd);
@ -432,7 +294,7 @@ namespace ix
{
return sendErrorResponse(400, "Error reading HTTP request line");
}
// Validate request line (GET /foo HTTP/1.1\r\n)
auto requestLine = parseRequestLine(line);
auto method = std::get<0>(requestLine);
@ -450,7 +312,7 @@ namespace ix
}
// Retrieve and validate HTTP headers
auto result = parseHttpHeaders(isCancellationRequested);
auto result = parseHttpHeaders(_socket, isCancellationRequested);
auto headersValid = result.first;
auto headers = result.second;

View File

@ -18,7 +18,7 @@
#include <memory>
#include <tuple>
namespace ix
namespace ix
{
struct WebSocketInitResult
{
@ -59,19 +59,10 @@ namespace ix
WebSocketInitResult serverHandshake(int fd,
int timeoutSecs);
static bool parseUrl(const std::string& url,
std::string& protocol,
std::string& host,
std::string& path,
std::string& query,
int& port);
private:
static void printUrl(const std::string& url);
std::string genRandomString(const int len);
// Parse HTTP headers
std::pair<bool, WebSocketHttpHeaders> parseHttpHeaders(const CancellationRequest& isCancellationRequested);
WebSocketInitResult sendErrorResponse(int code, const std::string& reason);
std::tuple<std::string, std::string, std::string> parseRequestLine(const std::string& line);

View File

@ -0,0 +1,66 @@
/*
* IXWebSocketHttpHeaders.h
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include "IXWebSocketHttpHeaders.h"
#include "IXSocket.h"
#include <string>
#include <unordered_map>
namespace ix
{
std::pair<bool, WebSocketHttpHeaders> parseHttpHeaders(
std::shared_ptr<Socket> socket,
const CancellationRequest& isCancellationRequested)
{
WebSocketHttpHeaders headers;
char line[1024];
int i;
while (true)
{
int colon = 0;
for (i = 0;
i < 2 || (i < 1023 && line[i-2] != '\r' && line[i-1] != '\n');
++i)
{
if (!socket->readByte(line+i, isCancellationRequested))
{
return std::make_pair(false, headers);
}
if (line[i] == ':' && colon == 0)
{
colon = i;
}
}
if (line[0] == '\r' && line[1] == '\n')
{
break;
}
// line is a single header entry. split by ':', and add it to our
// header map. ignore lines with no colon.
if (colon > 0)
{
line[i] = '\0';
std::string lineStr(line);
// colon is ':', colon+1 is ' ', colon+2 is the start of the value.
// i is end of string (\0), i-colon is length of string minus key;
// subtract 1 for '\0', 1 for '\n', 1 for '\r',
// 1 for the ' ' after the ':', and total is -4
std::string name(lineStr.substr(0, colon));
std::string value(lineStr.substr(colon + 2, i - colon - 4));
headers[name] = value;
}
}
return std::make_pair(true, headers);
}
}

View File

@ -6,10 +6,40 @@
#pragma once
#include <string>
#include <unordered_map>
#include "IXCancellationRequest.h"
namespace ix
#include <string>
#include <map>
#include <memory>
#include <algorithm>
namespace ix
{
using WebSocketHttpHeaders = std::unordered_map<std::string, std::string>;
class Socket;
struct CaseInsensitiveLess
{
// Case Insensitive compare_less binary function
struct NocaseCompare
{
bool operator() (const unsigned char& c1, const unsigned char& c2) const
{
return std::tolower(c1) < std::tolower(c2);
}
};
bool operator() (const std::string & s1, const std::string & s2) const
{
return std::lexicographical_compare
(s1.begin(), s1.end(), // source range
s2.begin(), s2.end(), // dest range
NocaseCompare()); // comparison
}
};
using WebSocketHttpHeaders = std::map<std::string, std::string, CaseInsensitiveLess>;
std::pair<bool, WebSocketHttpHeaders> parseHttpHeaders(
std::shared_ptr<Socket> socket,
const CancellationRequest& isCancellationRequested);
}

View File

@ -34,7 +34,7 @@
* - Reused zlib compression + decompression bits.
* - Refactored to have 2 class for compression and decompression, to allow multi-threading
* and make sure that _compressBuffer is not shared between threads.
* - Original code wasn't working for some reason, I had to add checks
* - Original code wasn't working for some reason, I had to add checks
* for the presence of the kEmptyUncompressedBlock at the end of buffer so that servers
* would start accepting receiving/decoding compressed messages. Original code was probably
* modifying the passed in buffers before processing in enabled.hpp ?
@ -65,13 +65,13 @@ namespace ix
bool WebSocketPerMessageDeflate::init(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions)
{
bool clientNoContextTakeover =
bool clientNoContextTakeover =
perMessageDeflateOptions.getClientNoContextTakeover();
uint8_t deflateBits = perMessageDeflateOptions.getClientMaxWindowBits();
uint8_t inflateBits = perMessageDeflateOptions.getServerMaxWindowBits();
return _compressor->init(deflateBits, clientNoContextTakeover) &&
return _compressor->init(deflateBits, clientNoContextTakeover) &&
_decompressor->init(inflateBits, clientNoContextTakeover);
}

View File

@ -37,7 +37,7 @@
#include <string>
#include <memory>
namespace ix
namespace ix
{
class WebSocketPerMessageDeflateOptions;
class WebSocketPerMessageDeflateCompressor;

View File

@ -14,7 +14,7 @@
namespace
{
// The passed in size (4) is important, without it the string litteral
// is treated as a char* and the null termination (\x00) makes it
// is treated as a char* and the null termination (\x00) makes it
// look like an empty string.
const std::string kEmptyUncompressedBlock = std::string("\x00\x00\xff\xff", 4);
@ -76,16 +76,16 @@ namespace ix
{
//
// 7.2.1. Compression
//
//
// An endpoint uses the following algorithm to compress a message.
//
//
// 1. Compress all the octets of the payload of the message using
// DEFLATE.
//
//
// 2. If the resulting data does not end with an empty DEFLATE block
// with no compression (the "BTYPE" bits are set to 00), append an
// empty DEFLATE block with no compression to the tail end.
//
//
// 3. Remove 4 octets (that are 0x00 0x00 0xff 0xff) from the tail end.
// After this step, the last octet of the compressed data contains
// (possibly part of) the DEFLATE header bits with the "BTYPE" bits
@ -168,14 +168,14 @@ namespace ix
{
//
// 7.2.2. Decompression
//
//
// An endpoint uses the following algorithm to decompress a message.
//
//
// 1. Append 4 octets of 0x00 0x00 0xff 0xff to the tail end of the
// payload of the message.
//
//
// 2. Decompress the resulting data using DEFLATE.
//
//
std::string inFixed(in);
inFixed += kEmptyUncompressedBlock;

View File

@ -10,7 +10,7 @@
#include <string>
#include <memory>
namespace ix
namespace ix
{
class WebSocketPerMessageDeflateCompressor
{

View File

@ -36,7 +36,7 @@ namespace ix
_serverMaxWindowBits = serverMaxWindowBits;
}
//
//
// Four extension parameters are defined for "permessage-deflate" to
// help endpoints manage per-connection resource usage.
//
@ -88,9 +88,9 @@ namespace ix
int x;
ss >> x;
// Sanitize values to be in the proper range [8, 15] in
// Sanitize values to be in the proper range [8, 15] in
// case a server would give us bogus values
_serverMaxWindowBits =
_serverMaxWindowBits =
std::min(maxServerMaxWindowBits,
std::max(x, minServerMaxWindowBits));
}
@ -103,9 +103,9 @@ namespace ix
int x;
ss >> x;
// Sanitize values to be in the proper range [8, 15] in
// Sanitize values to be in the proper range [8, 15] in
// case a server would give us bogus values
_clientMaxWindowBits =
_clientMaxWindowBits =
std::min(maxClientMaxWindowBits,
std::max(x, minClientMaxWindowBits));
}
@ -162,7 +162,7 @@ namespace ix
std::string WebSocketPerMessageDeflateOptions::removeSpaces(const std::string& str)
{
std::string out(str);
out.erase(std::remove_if(out.begin(),
out.erase(std::remove_if(out.begin(),
out.end(),
[](unsigned char x){ return std::isspace(x); }),
out.end());

View File

@ -8,7 +8,7 @@
#include <string>
namespace ix
namespace ix
{
class WebSocketPerMessageDeflateOptions
{

View File

@ -9,7 +9,7 @@
#include <string>
#include <iostream>
namespace ix
namespace ix
{
struct WebSocketSendInfo
{

View File

@ -14,7 +14,7 @@
#include <future>
#include <string.h>
namespace ix
namespace ix
{
const int WebSocketServer::kDefaultHandShakeTimeoutSecs(3); // 3 seconds
@ -65,7 +65,7 @@ namespace ix
auto status = webSocket->connectToSocket(fd, _handshakeTimeoutSecs);
if (status.success)
{
// Process incoming messages and execute callbacks
// Process incoming messages and execute callbacks
// until the connection is closed
webSocket->run();
}

View File

@ -18,7 +18,7 @@
#include "IXWebSocket.h"
#include "IXSocketServer.h"
namespace ix
namespace ix
{
using OnConnectionCallback = std::function<void(std::shared_ptr<WebSocket>)>;

View File

@ -1,7 +1,31 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2012, 2013 <dhbaird@gmail.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
/*
* IXWebSocketTransport.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/
//
@ -11,14 +35,8 @@
#include "IXWebSocketTransport.h"
#include "IXWebSocketHandshake.h"
#include "IXWebSocketHttpHeaders.h"
#ifdef IXWEBSOCKET_USE_TLS
# ifdef __APPLE__
# include "IXSocketAppleSSL.h"
# else
# include "IXSocketOpenSSL.h"
# endif
#endif
#include "IXUrlParser.h"
#include "IXSocketFactory.h"
#include <string.h>
#include <stdlib.h>
@ -29,12 +47,15 @@
#include <cstdarg>
#include <iostream>
#include <sstream>
#include <chrono>
#include <thread>
namespace ix
{
const std::string WebSocketTransport::kHeartBeatPingMessage("ixwebsocket::hearbeat");
const int WebSocketTransport::kDefaultHeartBeatPeriod(-1);
constexpr size_t WebSocketTransport::kChunkSize;
WebSocketTransport::WebSocketTransport() :
_readyState(CLOSED),
@ -45,7 +66,7 @@ namespace ix
_heartBeatPeriod(kDefaultHeartBeatPeriod),
_lastSendTimePoint(std::chrono::steady_clock::now())
{
_readbuf.resize(kChunkSize);
}
WebSocketTransport::~WebSocketTransport()
@ -67,31 +88,21 @@ namespace ix
{
std::string protocol, host, path, query;
int port;
bool websocket = true;
if (!WebSocketHandshake::parseUrl(url, protocol, host,
path, query, port))
if (!UrlParser::parse(url, protocol, host, path, query, port, websocket))
{
return WebSocketInitResult(false, 0,
std::string("Could not parse URL ") + url);
}
if (protocol == "wss")
bool tls = protocol == "wss";
std::string errorMsg;
_socket = createSocket(tls, errorMsg);
if (!_socket)
{
_socket.reset();
#ifdef IXWEBSOCKET_USE_TLS
# ifdef __APPLE__
_socket = std::make_shared<SocketAppleSSL>();
# else
_socket = std::make_shared<SocketOpenSSL>();
# endif
#else
return WebSocketInitResult(false, 0, "TLS is not supported.");
#endif
}
else
{
_socket.reset();
_socket = std::make_shared<Socket>();
return WebSocketInitResult(false, 0, errorMsg);
}
WebSocketHandshake webSocketHandshake(_requestInitCancellation,
@ -112,8 +123,13 @@ namespace ix
// Server
WebSocketInitResult WebSocketTransport::connectToSocket(int fd, int timeoutSecs)
{
_socket.reset();
_socket = std::make_shared<Socket>(fd);
std::string errorMsg;
_socket = createSocket(fd, errorMsg);
if (!_socket)
{
return WebSocketInitResult(false, 0, errorMsg);
}
WebSocketHandshake webSocketHandshake(_requestInitCancellation,
_socket,
@ -129,7 +145,7 @@ namespace ix
return result;
}
WebSocketTransport::ReadyStateValues WebSocketTransport::getReadyState() const
WebSocketTransport::ReadyStateValues WebSocketTransport::getReadyState() const
{
return _readyState;
}
@ -153,10 +169,12 @@ namespace ix
void WebSocketTransport::setOnCloseCallback(const OnCloseCallback& onCloseCallback)
{
_onCloseCallback = onCloseCallback;
_onCloseCallback = onCloseCallback;
}
bool WebSocketTransport::exceedSendHeartBeatTimeOut()
// Only consider send time points for that computation.
// The receive time points is taken into account in Socket::poll (second parameter).
bool WebSocketTransport::heartBeatPeriodExceeded()
{
std::lock_guard<std::mutex> lock(_lastSendTimePointMutex);
auto now = std::chrono::steady_clock::now();
@ -171,45 +189,74 @@ namespace ix
// If (1) heartbeat is enabled, and (2) no data was received or
// send for a duration exceeding our heart-beat period, send a
// ping to the server.
if (pollResult == PollResultType_Timeout &&
exceedSendHeartBeatTimeOut())
if (pollResult == PollResultType_Timeout &&
heartBeatPeriodExceeded())
{
std::stringstream ss;
ss << kHeartBeatPingMessage << "::" << _heartBeatPeriod << "s";
sendPing(ss.str());
return;
}
while (true)
// Make sure we send all the buffered data
// there can be a lot of it for large messages.
else if (pollResult == PollResultType_SendRequest)
{
int N = (int) _rxbuf.size();
_rxbuf.resize(N + 1500);
ssize_t ret = _socket->recv((char*)&_rxbuf[0] + N, 1500);
if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
_socket->getErrno() == EAGAIN)) {
_rxbuf.resize(N);
break;
}
else if (ret <= 0)
while (!isSendBufferEmpty() && !_requestInitCancellation)
{
_rxbuf.resize(N);
_socket->close();
setReadyState(CLOSED);
break;
}
else
{
_rxbuf.resize(N + ret);
// Wait with a 10ms timeout until the socket is ready to write.
// This way we are not busy looping
PollResultType result = _socket->isReadyToWrite(0, 10);
if (result == PollResultType_Error)
{
_socket->close();
setReadyState(CLOSED);
break;
}
else if (result == PollResultType_ReadyForWrite)
{
sendOnSocket();
}
}
}
else if (pollResult == PollResultType_ReadyForRead)
{
while (true)
{
ssize_t ret = _socket->recv((char*)&_readbuf[0], _readbuf.size());
if (isSendBufferEmpty() && _readyState == CLOSING)
if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
_socket->getErrno() == EAGAIN))
{
break;
}
else if (ret <= 0)
{
_rxbuf.clear();
_socket->close();
setReadyState(CLOSED);
break;
}
else
{
_rxbuf.insert(_rxbuf.end(),
_readbuf.begin(),
_readbuf.begin() + ret);
}
}
}
else if (pollResult == PollResultType_Error)
{
_socket->close();
}
else if (pollResult == PollResultType_CloseRequest)
{
_socket->close();
}
// Avoid a race condition where we get stuck in select
// while closing.
if (_readyState == CLOSING)
{
_socket->close();
setReadyState(CLOSED);
}
},
_heartBeatPeriod);
@ -280,7 +327,7 @@ namespace ix
//
void WebSocketTransport::dispatch(const OnMessageCallback& onMessageCallback)
{
while (true)
while (true)
{
wsheader_type ws;
if (_rxbuf.size() < 2) return; /* Need at least 2 */
@ -292,7 +339,7 @@ namespace ix
ws.N0 = (data[1] & 0x7f);
ws.header_size = 2 + (ws.N0 == 126? 2 : 0) + (ws.N0 == 127? 8 : 0) + (ws.mask? 4 : 0);
if (_rxbuf.size() < ws.header_size) return; /* Need: ws.header_size - _rxbuf.size() */
//
// Calculate payload length:
// 0-125 mean the payload is that long.
@ -330,7 +377,7 @@ namespace ix
// invalid payload length according to the spec. bail out
return;
}
if (ws.mask)
{
ws.masking_key[0] = ((uint8_t) data[i+0]) << 0;
@ -353,22 +400,44 @@ namespace ix
// We got a whole message, now do something with it:
if (
ws.opcode == wsheader_type::TEXT_FRAME
ws.opcode == wsheader_type::TEXT_FRAME
|| ws.opcode == wsheader_type::BINARY_FRAME
|| ws.opcode == wsheader_type::CONTINUATION
) {
unmaskReceiveBuffer(ws);
_receivedData.insert(_receivedData.end(),
_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t)ws.N);// just feed
if (ws.fin)
{
// fire callback with a string message
std::string stringMessage(_receivedData.begin(),
_receivedData.end());
emitMessage(MSG, stringMessage, ws, onMessageCallback);
_receivedData.clear();
//
// Usual case. Small unfragmented messages
//
if (ws.fin && _chunks.empty())
{
emitMessage(MSG,
std::string(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t) ws.N),
ws,
onMessageCallback);
}
else
{
//
// Add intermediary message to our chunk list.
// We use a chunk list instead of a big buffer because resizing
// large buffer can be very costly when we need to re-allocate
// the internal buffer which is slow and can let the internal OS
// receive buffer fill out.
//
_chunks.emplace_back(
std::vector<uint8_t>(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t)ws.N));
if (ws.fin)
{
emitMessage(MSG, getMergedChunks(), ws, onMessageCallback);
_chunks.clear();
}
else
{
emitMessage(FRAGMENT, std::string(), ws, onMessageCallback);
}
}
}
else if (ws.opcode == wsheader_type::PING)
@ -418,12 +487,33 @@ namespace ix
close();
}
// Erase the message that has been processed from the input/read buffer
_rxbuf.erase(_rxbuf.begin(),
_rxbuf.begin() + ws.header_size + (size_t) ws.N);
}
}
void WebSocketTransport::emitMessage(MessageKind messageKind,
std::string WebSocketTransport::getMergedChunks() const
{
size_t length = 0;
for (auto&& chunk : _chunks)
{
length += chunk.size();
}
std::string msg;
msg.reserve(length);
for (auto&& chunk : _chunks)
{
std::string str(chunk.begin(), chunk.end());
msg += str;
}
return msg;
}
void WebSocketTransport::emitMessage(MessageKind messageKind,
const std::string& message,
const wsheader_type& ws,
const OnMessageCallback& onMessageCallback)
@ -431,7 +521,7 @@ namespace ix
size_t wireSize = message.size();
// When the RSV1 bit is 1 it means the message is compressed
if (_enablePerMessageDeflate && ws.rsv1)
if (_enablePerMessageDeflate && ws.rsv1 && messageKind != FRAGMENT)
{
std::string decompressedMessage;
bool success = _perMessageDeflate.decompress(message, decompressedMessage);
@ -446,15 +536,17 @@ namespace ix
unsigned WebSocketTransport::getRandomUnsigned()
{
auto now = std::chrono::system_clock::now();
auto seconds =
auto seconds =
std::chrono::duration_cast<std::chrono::seconds>(
now.time_since_epoch()).count();
return static_cast<unsigned>(seconds);
}
WebSocketSendInfo WebSocketTransport::sendData(wsheader_type::opcode_type type,
const std::string& message,
bool compress)
WebSocketSendInfo WebSocketTransport::sendData(
wsheader_type::opcode_type type,
const std::string& message,
bool compress,
const OnProgressCallback& onProgressCallback)
{
if (_readyState == CLOSING || _readyState == CLOSED)
{
@ -471,15 +563,87 @@ namespace ix
if (compress)
{
bool success = _perMessageDeflate.compress(message, compressedMessage);
compressionError = !success;
if (!_perMessageDeflate.compress(message, compressedMessage))
{
bool success = false;
compressionError = true;
payloadSize = 0;
wireSize = 0;
return WebSocketSendInfo(success, compressionError, payloadSize, wireSize);
}
compressionError = false;
wireSize = compressedMessage.size();
message_begin = compressedMessage.begin();
message_end = compressedMessage.end();
}
uint64_t message_size = wireSize;
// Common case for most message. No fragmentation required.
if (wireSize < kChunkSize)
{
sendFragment(type, true, message_begin, message_end, compress);
}
else
{
//
// Large messages need to be fragmented
//
// Rules:
// First message needs to specify a proper type (BINARY or TEXT)
// Intermediary and last messages need to be of type CONTINUATION
// Last message must set the fin byte.
//
auto steps = wireSize / kChunkSize;
std::string::const_iterator begin = message_begin;
std::string::const_iterator end = message_end;
for (uint64_t i = 0 ; i < steps; ++i)
{
bool firstStep = i == 0;
bool lastStep = (i+1) == steps;
bool fin = lastStep;
end = begin + kChunkSize;
if (lastStep)
{
end = message_end;
}
auto opcodeType = type;
if (!firstStep)
{
opcodeType = wsheader_type::CONTINUATION;
}
// Send message
sendFragment(opcodeType, fin, begin, end, compress);
if (onProgressCallback && !onProgressCallback((int)i, (int) steps))
{
break;
}
begin += kChunkSize;
}
}
// Request to flush the send buffer on the background thread if it isn't empty
if (!isSendBufferEmpty())
{
_socket->wakeUpFromPoll(Socket::kSendRequest);
}
return WebSocketSendInfo(true, compressionError, payloadSize, wireSize);
}
void WebSocketTransport::sendFragment(wsheader_type::opcode_type type,
bool fin,
std::string::const_iterator message_begin,
std::string::const_iterator message_end,
bool compress)
{
auto message_size = message_end - message_begin;
unsigned x = getRandomUnsigned();
uint8_t masking_key[4] = {};
@ -492,7 +656,13 @@ namespace ix
header.assign(2 +
(message_size >= 126 ? 2 : 0) +
(message_size >= 65536 ? 6 : 0) + 4, 0);
header[0] = 0x80 | type;
header[0] = type;
// The fin bit indicate that this is the last fragment. Fin is French for end.
if (fin)
{
header[0] |= 0x80;
}
// This bit indicate that the frame is compressed
if (compress)
@ -509,7 +679,7 @@ namespace ix
header[4] = masking_key[2];
header[5] = masking_key[3];
}
else if (message_size < 65536)
else if (message_size < 65536)
{
header[1] = 126 | 0x80;
header[2] = (message_size >> 8) & 0xff;
@ -544,8 +714,6 @@ namespace ix
// Now actually send this data
sendOnSocket();
return WebSocketSendInfo(true, compressionError, payloadSize, wireSize);
}
WebSocketSendInfo WebSocketTransport::sendPing(const std::string& message)
@ -554,9 +722,13 @@ namespace ix
return sendData(wsheader_type::PING, message, compress);
}
WebSocketSendInfo WebSocketTransport::sendBinary(const std::string& message)
WebSocketSendInfo WebSocketTransport::sendBinary(
const std::string& message,
const OnProgressCallback& onProgressCallback)
{
return sendData(wsheader_type::BINARY_FRAME, message, _enablePerMessageDeflate);
return sendData(wsheader_type::BINARY_FRAME, message,
_enablePerMessageDeflate, onProgressCallback);
}
void WebSocketTransport::sendOnSocket()
@ -567,7 +739,7 @@ namespace ix
{
ssize_t ret = _socket->send((char*)&_txbuf[0], _txbuf.size());
if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
_socket->getErrno() == EAGAIN))
{
break;
@ -607,8 +779,18 @@ namespace ix
sendData(wsheader_type::CLOSE, normalClosure, compress);
setReadyState(CLOSING);
_socket->wakeUpFromPoll();
_socket->wakeUpFromPoll(Socket::kCloseRequest);
_socket->close();
_closeCode = 1000;
_closeReason = "Normal Closure";
setReadyState(CLOSED);
}
size_t WebSocketTransport::bufferedAmount() const
{
std::lock_guard<std::mutex> lock(_txbufMutex);
return _txbuf.size();
}
} // namespace ix

View File

@ -16,6 +16,7 @@
#include <memory>
#include <mutex>
#include <atomic>
#include <list>
#include "IXWebSocketSendInfo.h"
#include "IXWebSocketPerMessageDeflate.h"
@ -23,8 +24,9 @@
#include "IXWebSocketHttpHeaders.h"
#include "IXCancellationRequest.h"
#include "IXWebSocketHandshake.h"
#include "IXProgressCallback.h"
namespace ix
namespace ix
{
class Socket;
@ -43,7 +45,8 @@ namespace ix
{
MSG,
PING,
PONG
PONG,
FRAGMENT
};
using OnMessageCallback = std::function<void(const std::string&,
@ -66,17 +69,18 @@ namespace ix
int timeoutSecs);
void poll();
WebSocketSendInfo sendBinary(const std::string& message);
WebSocketSendInfo sendBinary(const std::string& message,
const OnProgressCallback& onProgressCallback);
WebSocketSendInfo sendPing(const std::string& message);
void close();
ReadyStateValues getReadyState() const;
void setReadyState(ReadyStateValues readyStateValue);
void setOnCloseCallback(const OnCloseCallback& onCloseCallback);
void dispatch(const OnMessageCallback& onMessageCallback);
size_t bufferedAmount() const;
private:
std::string _url;
std::string _origin;
struct wsheader_type {
unsigned header_size;
@ -96,13 +100,31 @@ namespace ix
uint8_t masking_key[4];
};
// Buffer for reading from our socket. That buffer is never resized.
std::vector<uint8_t> _readbuf;
// Contains all messages that were fetched in the last socket read.
// This could be a mix of control messages (Close, Ping, etc...) and
// data messages. That buffer
std::vector<uint8_t> _rxbuf;
// Contains all messages that are waiting to be sent
std::vector<uint8_t> _txbuf;
mutable std::mutex _txbufMutex;
std::vector<uint8_t> _receivedData;
// Hold fragments for multi-fragments messages in a list. We support receiving very large
// messages (tested messages up to 700M) and we cannot put them in a single
// buffer that is resized, as this operation can be slow when a buffer has its
// size increased 2 fold, while appending to a list has a fixed cost.
std::list<std::vector<uint8_t>> _chunks;
// Fragments are 32K long
static constexpr size_t kChunkSize = 1 << 15;
// Underlying TCP socket
std::shared_ptr<Socket> _socket;
// Hold the state of the connection (OPEN, CLOSED, etc...)
std::atomic<ReadyStateValues> _readyState;
OnCloseCallback _onCloseCallback;
@ -111,13 +133,14 @@ namespace ix
size_t _closeWireSize;
mutable std::mutex _closeDataMutex;
// Data used for Per Message Deflate compression (with zlib)
WebSocketPerMessageDeflate _perMessageDeflate;
WebSocketPerMessageDeflateOptions _perMessageDeflateOptions;
std::atomic<bool> _enablePerMessageDeflate;
// Used to cancel dns lookup + socket connect + http upgrade
std::atomic<bool> _requestInitCancellation;
// Optional Heartbeat
int _heartBeatPeriod;
static const int kDefaultHeartBeatPeriod;
@ -126,14 +149,21 @@ namespace ix
std::chrono::time_point<std::chrono::steady_clock> _lastSendTimePoint;
// No data was send through the socket for longer that the hearbeat period
bool exceedSendHeartBeatTimeOut();
bool heartBeatPeriodExceeded();
void sendOnSocket();
WebSocketSendInfo sendData(wsheader_type::opcode_type type,
WebSocketSendInfo sendData(wsheader_type::opcode_type type,
const std::string& message,
bool compress);
bool compress,
const OnProgressCallback& onProgressCallback = nullptr);
void emitMessage(MessageKind messageKind,
void sendFragment(wsheader_type::opcode_type type,
bool fin,
std::string::const_iterator begin,
std::string::const_iterator end,
bool compress);
void emitMessage(MessageKind messageKind,
const std::string& message,
const wsheader_type& ws,
const OnMessageCallback& onMessageCallback);
@ -148,5 +178,7 @@ namespace ix
unsigned getRandomUnsigned();
void unmaskReceiveBuffer(const wsheader_type& ws);
std::string getMergedChunks() const;
};
}

View File

@ -1,14 +1,23 @@
#
# This makefile is just used to easily work with docker (linux build)
#
all: run
all: brew
install: brew
brew:
mkdir -p build && (cd build ; cmake .. ; make -j install)
.PHONY: docker
docker:
docker build -t ws_connect:latest .
docker build -t ws:latest .
run: docker
docker run --cap-add sys_ptrace -it ws_connect:latest bash
run:
docker run --cap-add sys_ptrace -it ws:latest
# this is helpful to remove trailing whitespaces
trail:
sh third_party/remote_trailing_whitespaces.sh
build:
(cd examples/satori_publisher ; mkdir -p build ; cd build ; cmake .. ; make)
@ -24,11 +33,14 @@ test_server:
(cd test && npm i ws && node broadcast-server.js)
# env TEST=Websocket_server make test
# env TEST=websocket_server make test
# env TEST=Websocket_chat make test
# env TEST=heartbeat make test
test:
python test/run.py
ws_test: all
(cd ws ; sh test_ws.sh)
# For the fork that is configured with appveyor
rebase_upstream:
git fetch upstream
@ -36,5 +48,9 @@ rebase_upstream:
git reset --hard upstream/master
git push origin master --force
install_cmake_for_linux:
mkdir -p /tmp/cmake
(cd /tmp/cmake ; curl -L -O https://github.com/Kitware/CMake/releases/download/v3.14.0-rc4/cmake-3.14.0-rc4-Linux-x86_64.tar.gz ; tar zxf cmake-3.14.0-rc4-Linux-x86_64.tar.gz)
.PHONY: test
.PHONY: build

View File

@ -18,13 +18,14 @@ add_subdirectory(${PROJECT_SOURCE_DIR}/.. ixwebsocket)
include_directories(
${PROJECT_SOURCE_DIR}/Catch2/single_include
../third_party/msgpack11
)
# Shared sources
set (SOURCES
test_runner.cpp
IXTest.cpp
msgpack11.cpp
../third_party/msgpack11/msgpack11.cpp
IXDNSLookupTest.cpp
IXSocketTest.cpp

View File

@ -5,19 +5,13 @@
*/
#include <iostream>
#include <ixwebsocket/IXSocketFactory.h>
#include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXCancellationRequest.h>
#if defined(__APPLE__) or defined(__linux__)
# ifdef __APPLE__
# include <ixwebsocket/IXSocketAppleSSL.h>
# else
# include <ixwebsocket/IXSocketOpenSSL.h>
# endif
#endif
#include "IXTest.h"
#include "IXTest.h"
#include "catch.hpp"
#include <string.h>
using namespace ix;
@ -39,16 +33,15 @@ namespace ix
Logger() << "errMsg: " << errMsg;
REQUIRE(success);
std::cout << "Sending request: " << request
<< "to " << host << ":" << port
<< std::endl;
Logger() << "Sending request: " << request
<< "to " << host << ":" << port;
REQUIRE(socket->writeBytes(request, isCancellationRequested));
auto lineResult = socket->readLine(isCancellationRequested);
auto lineValid = lineResult.first;
auto line = lineResult.second;
std::cout << "read error: " << strerror(Socket::getErrno()) << std::endl;
Logger() << "read error: " << strerror(Socket::getErrno());
REQUIRE(lineValid);
@ -62,10 +55,18 @@ TEST_CASE("socket", "[socket]")
{
SECTION("Connect to google HTTP server. Send GET request without header. Should return 200")
{
std::shared_ptr<Socket> socket(new Socket);
std::string errMsg;
bool tls = false;
std::shared_ptr<Socket> socket = createSocket(tls, errMsg);
std::string host("www.google.com");
int port = 80;
std::string request("GET / HTTP/1.1\r\n\r\n");
std::stringstream ss;
ss << "GET / HTTP/1.1\r\n";
ss << "Host: " << host << "\r\n";
ss << "\r\n";
std::string request(ss.str());
int expectedStatus = 200;
int timeoutSecs = 3;
@ -75,11 +76,9 @@ TEST_CASE("socket", "[socket]")
#if defined(__APPLE__) or defined(__linux__)
SECTION("Connect to google HTTPS server. Send GET request without header. Should return 200")
{
# ifdef __APPLE__
std::shared_ptr<Socket> socket = std::make_shared<SocketAppleSSL>();
# else
std::shared_ptr<Socket> socket = std::make_shared<SocketOpenSSL>();
# endif
std::string errMsg;
bool tls = true;
std::shared_ptr<Socket> socket = createSocket(tls, errMsg);
std::string host("www.google.com");
int port = 443;
std::string request("GET / HTTP/1.1\r\n\r\n");

View File

@ -6,6 +6,7 @@
#include "IXTest.h"
#include <ixwebsocket/IXWebSocket.h>
#include <ixwebsocket/IXNetSystem.h>
#include <chrono>
#include <thread>
@ -14,12 +15,14 @@
#include <fstream>
#include <iostream>
#include <stdlib.h>
#include <stack>
namespace ix
{
std::atomic<size_t> incomingBytes(0);
std::atomic<size_t> outgoingBytes(0);
std::mutex Logger::_mutex;
std::stack<int> freePorts;
void setupWebSocketTrafficTrackerCallback()
{
@ -54,7 +57,7 @@ namespace ix
std::string generateSessionId()
{
auto now = std::chrono::system_clock::now();
auto seconds =
auto seconds =
std::chrono::duration_cast<std::chrono::seconds>(
now.time_since_epoch()).count();
@ -66,4 +69,83 @@ namespace ix
Logger() << msg;
}
int getAnyFreePortSimple()
{
static int defaultPort = 8090;
return defaultPort++;
}
int getAnyFreePort()
{
int defaultPort = 8090;
int sockfd;
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
log("Cannot compute a free port. socket error.");
return defaultPort;
}
int enable = 1;
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR,
(char*) &enable, sizeof(enable)) < 0)
{
log("Cannot compute a free port. setsockopt error.");
return defaultPort;
}
// Bind to port 0. This is the standard way to get a free port.
struct sockaddr_in server; // server address information
server.sin_family = AF_INET;
server.sin_port = htons(0);
server.sin_addr.s_addr = inet_addr("127.0.0.1");
if (bind(sockfd, (struct sockaddr *)&server, sizeof(server)) < 0)
{
log("Cannot compute a free port. bind error.");
::close(sockfd);
return defaultPort;
}
struct sockaddr_in sa; // server address information
unsigned int len;
if (getsockname(sockfd, (struct sockaddr *) &sa, &len) < 0)
{
log("Cannot compute a free port. getsockname error.");
::close(sockfd);
return defaultPort;
}
int port = ntohs(sa.sin_port);
::close(sockfd);
return port;
}
int getFreePort()
{
while (true)
{
#if defined(__has_feature)
# if __has_feature(address_sanitizer)
int port = getAnyFreePortSimple();
# else
int port = getAnyFreePort();
# endif
#else
int port = getAnyFreePort();
#endif
//
// Only port above 1024 can be used by non root users, but for some
// reason I got port 7 returned with macOS when binding on port 0...
//
if (port > 1024)
{
return port;
}
}
return -1;
}
}

View File

@ -51,4 +51,7 @@ namespace ix
};
void log(const std::string& msg);
bool computeFreePorts(int count);
int getFreePort();
}

View File

@ -34,7 +34,7 @@ namespace
int _port;
};
WebSocketClient::WebSocketClient(int port)
WebSocketClient::WebSocketClient(int port)
: _port(port)
{
;
@ -56,7 +56,7 @@ namespace
{
std::stringstream ss;
ss << "ws://localhost:"
<< _port
<< _port
<< "/";
url = ss.str();
@ -64,7 +64,7 @@ namespace
_webSocket.setUrl(url);
// The important bit for this test.
// The important bit for this test.
// Set a 1 second hearbeat ; if no traffic is present on the connection for 1 second
// a ping message will be sent by the client.
_webSocket.setHeartBeatPeriod(1);
@ -180,7 +180,7 @@ TEST_CASE("Websocket_heartbeat", "[heartbeat]")
{
ix::setupWebSocketTrafficTrackerCallback();
int port = 8093;
int port = getFreePort();
ix::WebSocketServer server(port);
std::atomic<int> serverReceivedPingMessages(0);
REQUIRE(startServer(server, serverReceivedPingMessages));

View File

@ -8,6 +8,7 @@
#include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXWebSocket.h>
#include <ixwebsocket/IXWebSocketServer.h>
#include <ixwebsocket/IXSocketFactory.h>
#include "IXTest.h"
@ -75,21 +76,22 @@ TEST_CASE("Websocket_server", "[websocket_server]")
{
SECTION("Connect to the server, do not send anything. Should timeout and return 400")
{
int port = 8091;
int port = getFreePort();
ix::WebSocketServer server(port);
REQUIRE(startServer(server));
Socket socket;
std::string host("localhost");
std::string errMsg;
bool tls = false;
std::shared_ptr<Socket> socket = createSocket(tls, errMsg);
std::string host("localhost");
auto isCancellationRequested = []() -> bool
{
return false;
};
bool success = socket.connect(host, port, errMsg, isCancellationRequested);
bool success = socket->connect(host, port, errMsg, isCancellationRequested);
REQUIRE(success);
auto lineResult = socket.readLine(isCancellationRequested);
auto lineResult = socket->readLine(isCancellationRequested);
auto lineValid = lineResult.first;
auto line = lineResult.second;
@ -107,24 +109,25 @@ TEST_CASE("Websocket_server", "[websocket_server]")
SECTION("Connect to the server. Send GET request without header. Should return 400")
{
int port = 8092;
int port = getFreePort();
ix::WebSocketServer server(port);
REQUIRE(startServer(server));
Socket socket;
std::string host("localhost");
std::string errMsg;
bool tls = false;
std::shared_ptr<Socket> socket = createSocket(tls, errMsg);
std::string host("localhost");
auto isCancellationRequested = []() -> bool
{
return false;
};
bool success = socket.connect(host, port, errMsg, isCancellationRequested);
bool success = socket->connect(host, port, errMsg, isCancellationRequested);
REQUIRE(success);
Logger() << "writeBytes";
socket.writeBytes("GET /\r\n", isCancellationRequested);
socket->writeBytes("GET /\r\n", isCancellationRequested);
auto lineResult = socket.readLine(isCancellationRequested);
auto lineResult = socket->readLine(isCancellationRequested);
auto lineValid = lineResult.first;
auto line = lineResult.second;
@ -142,28 +145,29 @@ TEST_CASE("Websocket_server", "[websocket_server]")
SECTION("Connect to the server. Send GET request with correct header")
{
int port = 8093;
int port = getFreePort();
ix::WebSocketServer server(port);
REQUIRE(startServer(server));
Socket socket;
std::string host("localhost");
std::string errMsg;
bool tls = false;
std::shared_ptr<Socket> socket = createSocket(tls, errMsg);
std::string host("localhost");
auto isCancellationRequested = []() -> bool
{
return false;
};
bool success = socket.connect(host, port, errMsg, isCancellationRequested);
bool success = socket->connect(host, port, errMsg, isCancellationRequested);
REQUIRE(success);
socket.writeBytes("GET / HTTP/1.1\r\n"
"Upgrade: websocket\r\n"
"Sec-WebSocket-Version: 13\r\n"
"Sec-WebSocket-Key: foobar\r\n"
"\r\n",
isCancellationRequested);
socket->writeBytes("GET / HTTP/1.1\r\n"
"Upgrade: websocket\r\n"
"Sec-WebSocket-Version: 13\r\n"
"Sec-WebSocket-Key: foobar\r\n"
"\r\n",
isCancellationRequested);
auto lineResult = socket.readLine(isCancellationRequested);
auto lineResult = socket->readLine(isCancellationRequested);
auto lineValid = lineResult.first;
auto line = lineResult.second;

View File

@ -4,9 +4,15 @@
* Copyright (c) 2017 Machine Zone. All rights reserved.
*/
//
// Simple chat program that talks to the node.js server at
// websocket_chat_server/broacast-server.js
//
#include <iostream>
#include <sstream>
#include <queue>
#include <vector>
#include <mutex>
#include <ixwebsocket/IXWebSocket.h>
#include <ixwebsocket/IXWebSocketServer.h>
#include "msgpack11.hpp"
@ -24,7 +30,8 @@ namespace
{
public:
WebSocketChat(const std::string& user,
const std::string& session);
const std::string& session,
int port);
void subscribe(const std::string& channel);
void start();
@ -33,30 +40,49 @@ namespace
void sendMessage(const std::string& text);
size_t getReceivedMessagesCount() const;
const std::vector<std::string>& getReceivedMessages() const;
std::string encodeMessage(const std::string& text);
std::pair<std::string, std::string> decodeMessage(const std::string& str);
void appendMessage(const std::string& message);
private:
std::string _user;
std::string _session;
int _port;
ix::WebSocket _webSocket;
std::queue<std::string> _receivedQueue;
std::vector<std::string> _receivedMessages;
mutable std::mutex _mutex;
};
WebSocketChat::WebSocketChat(const std::string& user,
const std::string& session) :
const std::string& session,
int port) :
_user(user),
_session(session)
_session(session),
_port(port)
{
;
}
size_t WebSocketChat::getReceivedMessagesCount() const
{
return _receivedQueue.size();
std::lock_guard<std::mutex> lock(_mutex);
return _receivedMessages.size();
}
const std::vector<std::string>& WebSocketChat::getReceivedMessages() const
{
std::lock_guard<std::mutex> lock(_mutex);
return _receivedMessages;
}
void WebSocketChat::appendMessage(const std::string& message)
{
std::lock_guard<std::mutex> lock(_mutex);
_receivedMessages.push_back(message);
}
bool WebSocketChat::isReady() const
@ -71,7 +97,17 @@ namespace
void WebSocketChat::start()
{
std::string url("ws://localhost:8090/");
std::string url;
{
std::stringstream ss;
ss << "ws://localhost:"
<< _port
<< "/"
<< _user;
url = ss.str();
}
_webSocket.setUrl(url);
std::stringstream ss;
@ -109,10 +145,16 @@ namespace
// as we do for the satori chat example.
// store text
_receivedQueue.push(result.second);
appendMessage(result.second);
ss << std::endl
<< result.first << " > " << result.second
std::string payload = result.second;
if (payload.size() > 2000)
{
payload = "<message too large>";
}
ss << std::endl
<< result.first << " > " << payload
<< std::endl
<< _user << " > ";
log(ss.str());
@ -122,10 +164,21 @@ namespace
ss << "cmd_websocket_chat: Error ! " << error.reason;
log(ss.str());
}
else if (messageType == ix::WebSocket_MessageType_Ping)
{
log("cmd_websocket_chat: received ping message");
}
else if (messageType == ix::WebSocket_MessageType_Pong)
{
log("cmd_websocket_chat: received pong message");
}
else if (messageType == ix::WebSocket_MessageType_Fragment)
{
log("cmd_websocket_chat: received message fragment");
}
else
{
// FIXME: missing ping/pong messages
ss << "Invalid ix::WebSocketMessageType";
ss << "Unexpected ix::WebSocketMessageType";
log(ss.str());
}
});
@ -226,8 +279,8 @@ TEST_CASE("Websocket_chat", "[websocket_chat]")
REQUIRE(startServer(server));
std::string session = ix::generateSessionId();
WebSocketChat chatA("jean", session);
WebSocketChat chatB("paul", session);
WebSocketChat chatA("jean", session, port);
WebSocketChat chatB("paul", session, port);
chatA.start();
chatB.start();
@ -251,15 +304,36 @@ TEST_CASE("Websocket_chat", "[websocket_chat]")
chatB.sendMessage("from B1");
chatB.sendMessage("from B2");
// Give us 1s for all messages to be received
ix::msleep(1000);
// Test large messages that needs to be broken into small fragments
size_t size = 1 * 1024 * 1024; // ~1Mb
std::string bigMessage(size, 'a');
chatB.sendMessage(bigMessage);
log("Sent all messages");
// Wait until all messages are received. 10s timeout
int attempts = 0;
while (chatA.getReceivedMessagesCount() != 3 ||
chatB.getReceivedMessagesCount() != 3)
{
REQUIRE(attempts++ < 10);
ix::msleep(1000);
}
chatA.stop();
chatB.stop();
REQUIRE(chatA.getReceivedMessagesCount() == 2);
REQUIRE(chatA.getReceivedMessagesCount() == 3);
REQUIRE(chatB.getReceivedMessagesCount() == 3);
REQUIRE(chatB.getReceivedMessages()[0] == "from A1");
REQUIRE(chatB.getReceivedMessages()[1] == "from A2");
REQUIRE(chatB.getReceivedMessages()[2] == "from A3");
REQUIRE(chatA.getReceivedMessages()[0] == "from B1");
REQUIRE(chatA.getReceivedMessages()[1] == "from B2");
REQUIRE(chatA.getReceivedMessages()[2].size() == bigMessage.size());
// Give us 500ms for the server to notice that clients went away
ix::msleep(500);
REQUIRE(server.getClients().size() == 0);

View File

@ -2,14 +2,47 @@ import os
import platform
import shutil
import subprocess
import threading
class Command(object):
"""Run system commands with timeout
From http://www.bo-yang.net/2016/12/01/python-run-command-with-timeout
Python3 might have a builtin way to do that.
"""
def __init__(self, cmd):
self.cmd = cmd
self.process = None
def run_command(self, capture = False):
self.process = subprocess.Popen(self.cmd, shell=True)
self.process.communicate()
def run(self, timeout = 5 * 60):
'''5 minutes default timeout'''
thread = threading.Thread(target=self.run_command, args=())
thread.start()
thread.join(timeout)
if thread.is_alive():
print 'Command timeout, kill it: ' + self.cmd
self.process.terminate()
thread.join()
return False, 255
else:
return True, self.process.returncode
osName = platform.system()
print('os name = {}'.format(osName))
root = os.path.dirname(os.path.realpath(__file__))
buildDir = os.path.join(root, 'build')
buildDir = os.path.join(root, 'build', osName)
if not os.path.exists(buildDir):
os.mkdir(buildDir)
os.makedirs(buildDir)
os.chdir(buildDir)
@ -20,7 +53,7 @@ if osName == 'Windows':
else:
generator = ''
make = 'make -j6'
testBinary ='./ixwebsocket_unittest'
testBinary ='./ixwebsocket_unittest'
sanitizersFlags = {
'asan': '-DSANITIZE_ADDRESS=On',
@ -34,7 +67,11 @@ if osName == 'Linux':
sanitizerFlags = sanitizersFlags[sanitizer]
cmakeCmd = 'cmake -DCMAKE_BUILD_TYPE=Debug {} {} ..'.format(generator, sanitizerFlags)
# if osName == 'Windows':
# os.environ['CC'] = 'clang-cl'
# os.environ['CXX'] = 'clang-cl'
cmakeCmd = 'cmake -DCMAKE_BUILD_TYPE=Debug {} {} ../..'.format(generator, sanitizerFlags)
print(cmakeCmd)
ret = os.system(cmakeCmd)
assert ret == 0, 'CMake failed, exiting'
@ -63,6 +100,7 @@ def findFiles(prefix):
# We need to copy the zlib DLL in the current work directory
shutil.copy(os.path.join(
'..',
'..',
'..',
'third_party',
@ -73,6 +111,9 @@ shutil.copy(os.path.join(
'bin',
'zlib.dll'), '.')
testCommand = '{} {}'.format(testBinary, os.getenv('TEST', ''))
ret = os.system(testCommand)
# lldb = "lldb --batch -o 'run' -k 'thread backtrace all' -k 'quit 1'"
lldb = "" # Disabled for now
testCommand = '{} {} {}'.format(lldb, testBinary, os.getenv('TEST', ''))
command = Command(testCommand)
timedout, ret = command.run()
assert ret == 0, 'Test command failed'

View File

@ -9,12 +9,8 @@
#include <ixwebsocket/IXSocket.h>
int main(int argc, char* argv[])
int main(int argc, char* argv[])
{
ix::Socket::init(); // for Windows
int result = Catch::Session().run(argc, argv);
ix::Socket::cleanup(); // for Windows
return result;
}

4641
third_party/cli11/CLI11.hpp vendored Normal file

File diff suppressed because it is too large Load Diff

20
third_party/homebrew_formula.rb vendored Normal file
View File

@ -0,0 +1,20 @@
class Ixwebsocket < Formula
desc "WebSocket client and server, and HTTP client command-line tool"
homepage "https://github.com/machinezone/IXWebSocket"
url "https://github.com/machinezone/IXWebSocket/archive/v1.1.0.tar.gz"
sha256 "52592ce3d0a67ad0f90ac9e8a458f61724175d95a01a38d1bad3fcdc5c7b6666"
depends_on "cmake" => :build
def install
system "cmake", ".", *std_cmake_args
system "make", "install"
end
test do
system "#{bin}/ws", "--help"
system "#{bin}/ws", "send", "--help"
system "#{bin}/ws", "receive", "--help"
system "#{bin}/ws", "transfer", "--help"
system "#{bin}/ws", "curl", "--help"
end
end

View File

@ -7,28 +7,28 @@
// //////////////////////////////////////////////////////////////////////
/*
The JsonCpp library's source code, including accompanying documentation,
The JsonCpp library's source code, including accompanying documentation,
tests and demonstration applications, are licensed under the following
conditions...
Baptiste Lepilleur and The JsonCpp Authors explicitly disclaim copyright in all
jurisdictions which recognize such a disclaimer. In such jurisdictions,
Baptiste Lepilleur and The JsonCpp Authors explicitly disclaim copyright in all
jurisdictions which recognize such a disclaimer. In such jurisdictions,
this software is released into the Public Domain.
In jurisdictions which do not recognize Public Domain property (e.g. Germany as of
2010), this software is Copyright (c) 2007-2010 by Baptiste Lepilleur and
The JsonCpp Authors, and is released under the terms of the MIT License (see below).
In jurisdictions which recognize Public Domain property, the user of this
software may choose to accept it either as 1) Public Domain, 2) under the
conditions of the MIT License (see below), or 3) under the terms of dual
In jurisdictions which recognize Public Domain property, the user of this
software may choose to accept it either as 1) Public Domain, 2) under the
conditions of the MIT License (see below), or 3) under the terms of dual
Public Domain/MIT License conditions described here, as they choose.
The MIT License is about as close to Public Domain as a license can get, and is
described in clear, concise terms at:
http://en.wikipedia.org/wiki/MIT_License
The full text of the MIT License follows:
========================================================================

View File

@ -6,28 +6,28 @@
// //////////////////////////////////////////////////////////////////////
/*
The JsonCpp library's source code, including accompanying documentation,
The JsonCpp library's source code, including accompanying documentation,
tests and demonstration applications, are licensed under the following
conditions...
Baptiste Lepilleur and The JsonCpp Authors explicitly disclaim copyright in all
jurisdictions which recognize such a disclaimer. In such jurisdictions,
Baptiste Lepilleur and The JsonCpp Authors explicitly disclaim copyright in all
jurisdictions which recognize such a disclaimer. In such jurisdictions,
this software is released into the Public Domain.
In jurisdictions which do not recognize Public Domain property (e.g. Germany as of
2010), this software is Copyright (c) 2007-2010 by Baptiste Lepilleur and
The JsonCpp Authors, and is released under the terms of the MIT License (see below).
In jurisdictions which recognize Public Domain property, the user of this
software may choose to accept it either as 1) Public Domain, 2) under the
conditions of the MIT License (see below), or 3) under the terms of dual
In jurisdictions which recognize Public Domain property, the user of this
software may choose to accept it either as 1) Public Domain, 2) under the
conditions of the MIT License (see below), or 3) under the terms of dual
Public Domain/MIT License conditions described here, as they choose.
The MIT License is about as close to Public Domain as a license can get, and is
described in clear, concise terms at:
http://en.wikipedia.org/wiki/MIT_License
The full text of the MIT License follows:
========================================================================
@ -1673,7 +1673,7 @@ public:
- `"rejectDupKeys": false or true`
- If true, `parse()` returns false when a key is duplicated within an object.
- `"allowSpecialFloats": false or true`
- If true, special float values (NaNs and infinities) are allowed
- If true, special float values (NaNs and infinities) are allowed
and their values are lossfree restorable.
You can examine 'settings_` yourself

View File

@ -6,28 +6,28 @@
// //////////////////////////////////////////////////////////////////////
/*
The JsonCpp library's source code, including accompanying documentation,
The JsonCpp library's source code, including accompanying documentation,
tests and demonstration applications, are licensed under the following
conditions...
Baptiste Lepilleur and The JsonCpp Authors explicitly disclaim copyright in all
jurisdictions which recognize such a disclaimer. In such jurisdictions,
Baptiste Lepilleur and The JsonCpp Authors explicitly disclaim copyright in all
jurisdictions which recognize such a disclaimer. In such jurisdictions,
this software is released into the Public Domain.
In jurisdictions which do not recognize Public Domain property (e.g. Germany as of
2010), this software is Copyright (c) 2007-2010 by Baptiste Lepilleur and
The JsonCpp Authors, and is released under the terms of the MIT License (see below).
In jurisdictions which recognize Public Domain property, the user of this
software may choose to accept it either as 1) Public Domain, 2) under the
conditions of the MIT License (see below), or 3) under the terms of dual
In jurisdictions which recognize Public Domain property, the user of this
software may choose to accept it either as 1) Public Domain, 2) under the
conditions of the MIT License (see below), or 3) under the terms of dual
Public Domain/MIT License conditions described here, as they choose.
The MIT License is about as close to Public Domain as a license can get, and is
described in clear, concise terms at:
http://en.wikipedia.org/wiki/MIT_License
The full text of the MIT License follows:
========================================================================
@ -238,7 +238,7 @@ static inline void fixNumericLocaleInput(char* begin, char* end) {
#include <limits>
#if defined(_MSC_VER)
#if !defined(WINCE) && defined(__STDC_SECURE_LIB__) && _MSC_VER >= 1500 // VC++ 9.0 and above
#if !defined(WINCE) && defined(__STDC_SECURE_LIB__) && _MSC_VER >= 1500 // VC++ 9.0 and above
#define snprintf sprintf_s
#elif _MSC_VER >= 1900 // VC++ 14.0 and above
#define snprintf std::snprintf
@ -383,7 +383,7 @@ bool Reader::parse(const char* beginDoc,
bool Reader::readValue() {
// readValue() may call itself only if it calls readObject() or ReadArray().
// These methods execute nodes_.push() just before and nodes_.pop)() just after calling readValue().
// These methods execute nodes_.push() just before and nodes_.pop)() just after calling readValue().
// parse() executes one nodes_.push(), so > instead of >=.
if (nodes_.size() > stackLimit_g) throwRuntimeError("Exceeded stackLimit in readValue().");
@ -4215,7 +4215,7 @@ Value& Path::make(Value& root) const {
#endif
#endif
#if defined(__BORLANDC__)
#if defined(__BORLANDC__)
#include <float.h>
#define isfinite _finite
#define snprintf _snprintf
@ -5290,7 +5290,7 @@ StreamWriter* StreamWriterBuilder::newStreamWriter() const
JSONCPP_STRING cs_str = settings_["commentStyle"].asString();
bool eyc = settings_["enableYAMLCompatibility"].asBool();
bool dnp = settings_["dropNullPlaceholders"].asBool();
bool usf = settings_["useSpecialFloats"].asBool();
bool usf = settings_["useSpecialFloats"].asBool();
unsigned int pre = settings_["precision"].asUInt();
CommentStyle::Enum cs = CommentStyle::All;
if (cs_str == "All") {

Some files were not shown because too many files have changed in this diff Show More