Compare commits

..

78 Commits

Author SHA1 Message Date
5e1a4541bf play with cpp_redis 2019-03-29 11:42:45 -07:00
2e9c610ac9 Bump sleep time in test shell script 2019-03-29 09:36:56 -07:00
eb063ec60a (redis_subscribe) in verbose mode, received message gets printed with a 'received: ' header 2019-03-29 09:35:19 -07:00
37fb14646d Add clarification notice about third party modules 2019-03-29 09:34:17 -07:00
ae543518d3 offline version of remark-latest 2019-03-28 16:06:43 -07:00
c865d64608 redis conf slides 2019-03-28 14:17:19 -07:00
3004422cb6 slides 2019-03-27 16:27:52 -07:00
0c46a17443 add redis-conf slides 2019-03-27 15:53:55 -07:00
497373d976 ws redis command improvements + test script 2019-03-27 13:41:46 -07:00
91198aca0d (ws) redis_subscribe and redis_publish can take a password + display subscribe response 2019-03-26 09:33:22 -07:00
b17a5e5f0b update doc 2019-03-24 21:48:14 -07:00
3f0ef59f65 remove Formula folder
Homebrew stuff is at https://github.com/bsergean/homebrew-IXWebSocket
2019-03-24 21:43:38 -07:00
1e96edc293 (server) fix masking bug 2019-03-22 15:33:04 -07:00
0afb77393b can send TEXT message (we only support BINARY messages now) 2019-03-22 14:24:22 -07:00
7614b642bb unmasked code is broken 2019-03-22 14:24:15 -07:00
bc89580dfe remove printf + unittest fix 2019-03-22 09:56:28 -07:00
358ae13a88 (server) server should not mask data when sending to client (some python client libraries enforce that and assert) 2019-03-22 09:53:56 -07:00
ccf9dcba70 (server) HTTP response is malformed 2019-03-22 09:52:19 -07:00
94604fad61 minor cleanup 2019-03-21 13:51:25 -07:00
5c4cc7c50d HTTP/1.1 response should contains a reason (websocket server)
Fix compatibility problem with websockets python library, where the response does not contains a reason

File "/.../lib/python3.7/site-packages/websockets/http.py", line 126, in read_response
version, status_code, reason = status_line[:-2].split(b' ', 2)
ValueError: not enough values to unpack (expected 3, got 2)

The above exception was the direct cause of the following exception:

websockets.exceptions.InvalidMessage: Malformed HTTP message
2019-03-21 13:43:47 -07:00
9ed961ec06 cleanup, remove dead method 2019-03-21 10:06:59 -07:00
e6bd8cc8c4 (cmake) add a warning about 32/64 conversion problems. 2019-03-20 21:51:38 -07:00
ee25bd0f92 Feature/connection state (#25)
* (cmake) add a warning about 32/64 conversion problems.

* fix typo

* New connection state for server code + fix OpenSSL double init bug

* update README
2019-03-20 18:34:24 -07:00
e77b9176f3 Feature/redis (#23)
* Fix warning

* (cmake) add a warning about 32/64 conversion problems.

* simple redis clients

* can publish to redis

* redis subscribe

* display messages received per second

* verbose flag

* (cmake) use clang only compile option -Wshorten-64-to-32 when compiling with clang
2019-03-20 14:29:02 -07:00
afe8b966ad Fixed heartbeat typos (#22) 2019-03-19 21:31:43 -07:00
310724c961 make PollResultType an enum class 2019-03-19 09:29:57 -07:00
ceba8ae620 fix bug with isReadyToWrite 2019-03-18 22:05:04 -07:00
fead661ab7 workaround bug in Socket::isReadyToWrite 2019-03-18 20:37:33 -07:00
9c8c17f577 use milliseconds 2019-03-18 20:17:44 -07:00
a04f83930f ws / log subcommand name 2019-03-18 17:54:06 -07:00
c421d19800 disable sigpipe on osx when writing/reading into a dead pipe 2019-03-18 17:52:01 -07:00
521f02c90e edit homebrew install steps 2019-03-18 15:45:33 -07:00
c86b6074f2 add an install target 2019-03-18 15:11:08 -07:00
d5d1a2c5f4 no default parameters for isReadyToWrite and isReadyToRead 2019-03-18 14:31:21 -07:00
2a90e3f478 when trying to flush the send buffer, use select to wait until it is possible instead of using sleep to retry at a given frequency 2019-03-18 14:25:27 -07:00
1d49ba41ea Fix typo (#19) 2019-03-17 16:08:28 -07:00
e1de1f6682 remove unused gitmodule file 2019-03-17 10:38:48 -07:00
47ed5e4d4d remove unused folder 2019-03-17 10:38:19 -07:00
d77f6f5659 linux hangs when closing 2019-03-16 11:38:23 -07:00
05f0045d5d edit README 2019-03-16 11:32:46 -07:00
c4afb84f6e use pipe to abort select on Linux as well as macOS 2019-03-15 17:46:40 -07:00
b0b2f9b6d2 missing assert include on Linux 2019-03-15 11:43:27 -07:00
ee37feb489 cleanup 2019-03-15 11:41:57 -07:00
6b8337596f unittest fix 2019-03-14 18:58:16 -07:00
250665b92e linux compile fix 2019-03-14 18:55:33 -07:00
86b83c889e linux fixes 2019-03-14 18:54:47 -07:00
c9c657c07b build fix 2019-03-14 18:53:21 -07:00
4f2babaf54 select interrupt cleanup 2019-03-14 18:37:38 -07:00
1b03bf4555 linux build fix 2019-03-14 15:17:17 -07:00
977b995af9 replace uint8_t with uint64_t for the send/close requests types / use named variable to index into the _fildes array 2019-03-14 15:03:57 -07:00
310ab990bd set a default close reason string 2019-03-14 14:52:51 -07:00
d6b49b54d4 do not busy loop while sending 2019-03-14 14:48:08 -07:00
f00cf39462 remove docker folder 2019-03-14 14:48:02 -07:00
18550cf1cb send optimization + ws file transfer test 2019-03-14 14:47:53 -07:00
168918f807 Update README.md
Stop lying about Windows support ...
2019-03-13 23:10:40 -07:00
2750df8aa7 send can fail silently when sending would block (EWOULDBLOCK return for send) (#18)
* try to use a pipe for communication

* flush send buffer on the background thread

* cleanup

* linux fix / linux still use event fd for now

* cleanup
2019-03-13 23:09:45 -07:00
d6597d9f52 websocket send: make sure all data in the kernel buffer is sent 2019-03-11 22:16:55 -07:00
892ea375e3 add new message type when receiving message fragments 2019-03-11 11:12:43 -07:00
03abe77b5f ws broacast_server / can set serving hostname 2019-03-10 16:36:44 -07:00
e46eb8aa49 debian 9 unittest build fix 2019-03-10 16:07:48 -07:00
2c4862e0f1 asan test suite fix 2019-03-09 10:45:40 -08:00
fd69efa45c unittest + warning fix 2019-03-09 10:37:14 -08:00
e8aa15917f add ability to run with asan on macOS 2019-03-05 17:07:28 -08:00
b3d77f8902 fix compiler warnings in ws command line tool 2019-03-04 13:56:30 -08:00
9c3b0b08ec Socket code refactoring, plus stop polling with a 1s timeout in readBytes while we only want to poll with a 1ms timeout 2019-03-04 13:40:15 -08:00
fe7d94194c readBytes does not read bytes one by one but in chunks 2019-03-02 21:11:16 -08:00
d6c26d6aa8 create a blocking + cancellable Socket::readBytes method 2019-03-02 15:16:46 -08:00
8a74ddcd13 create a blocking + cancellable Socket::readBytes method 2019-03-02 11:01:51 -08:00
18e7189a07 more ws doc 2019-02-28 22:07:45 -08:00
785dd42c84 more ws doc 2019-02-28 22:03:48 -08:00
0cff5065d9 Feature/http (#16)
* add skeleton and broken http client code.

GET returns "Resource temporarily unavailable" errors...

* linux compile fix

* can GET some pages

* Update formatting in README.md

* unittest for sending large messages

* document bug

* Feature/send large message (#14)

* introduce send fragment

* can pass a fin frame

* can send messages which are a perfect multiple of the chunk size

* set fin only for last fragment

* cleanup

* last fragment should be of type CONTINUATION

* Add simple send and receive programs

* speedups receiving + better way to wait for thing

* receive speedup by using linked list of chunks instead of large array

* document bug

* use chunks to receive data

* trailing spaces

* Update README.md

Add note about message fragmentation.

* Feature/ws cli (#15)

* New command line tool for transfering files / still very beta.

* add readme

* use cli11 for argument parsing

* json -> msgpack

* stop using base64 and use binary which can be stored in message pack

* add target for building with homebrew

* all CMakeLists are referenced by the top level one

* add ws_chat and ws_connect sub commands to ws

* cleanup

* add echo and broadcast server as ws sub-commands

* add gitignore

* comments

* ping pong added to ws

* mv cobra_publisher under ws folder

* Update README.md

* linux build fix

* linux build fix

* move http_client to a ws sub-command

* simple HTTP post support (urlencode parameters)

* can specify extra headers

* chunk encoding / simple redirect support / -I option

* follow redirects is optional

* make README vim markdown plugin friendly

* cleanup argument parsing + add socket creation factory

* add missing file

* http gzip compression

* cleanup

* doc

* Feature/send large message (#14)

* introduce send fragment

* can pass a fin frame

* can send messages which are a perfect multiple of the chunk size

* set fin only for last fragment

* cleanup

* last fragment should be of type CONTINUATION

* Add simple send and receive programs

* speedups receiving + better way to wait for thing

* receive speedup by using linked list of chunks instead of large array

* document bug

* use chunks to receive data

* trailing spaces
2019-02-28 21:54:03 -08:00
e881b82511 Update README.md 2019-02-22 21:53:29 -08:00
d5551e5d68 mv cobra_publisher under ws folder 2019-02-22 21:51:03 -08:00
e8583000b8 ping pong added to ws 2019-02-22 21:47:57 -08:00
d642ef1a89 comments 2019-02-22 21:27:49 -08:00
2df118022d add gitignore 2019-02-22 21:26:25 -08:00
95457c8f4c add echo and broadcast server as ws sub-commands 2019-02-22 21:25:56 -08:00
0a45b7787f cleanup 2019-02-22 20:51:22 -08:00
137 changed files with 4788 additions and 1195 deletions

View File

@ -1,2 +1 @@
venv
build

0
.gitmodules vendored
View File

View File

@ -10,15 +10,20 @@ set (CMAKE_CXX_STANDARD 14)
set (CXX_STANDARD_REQUIRED ON)
set (CMAKE_CXX_EXTENSIONS OFF)
# -Wshorten-64-to-32 does not work with clang
if (NOT WIN32)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic")
endif()
if ("${CMAKE_CXX_COMPILER_ID}" MATCHES "Clang")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wshorten-64-to-32")
endif()
set( IXWEBSOCKET_SOURCES
ixwebsocket/IXEventFd.cpp
ixwebsocket/IXSocket.cpp
ixwebsocket/IXSocketServer.cpp
ixwebsocket/IXSocketConnect.cpp
ixwebsocket/IXSocketFactory.cpp
ixwebsocket/IXDNSLookup.cpp
ixwebsocket/IXCancellationRequest.cpp
ixwebsocket/IXWebSocket.cpp
@ -28,13 +33,20 @@ set( IXWEBSOCKET_SOURCES
ixwebsocket/IXWebSocketPerMessageDeflate.cpp
ixwebsocket/IXWebSocketPerMessageDeflateCodec.cpp
ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp
ixwebsocket/IXWebSocketHttpHeaders.cpp
ixwebsocket/IXHttpClient.cpp
ixwebsocket/IXUrlParser.cpp
ixwebsocket/IXSelectInterrupt.cpp
ixwebsocket/IXSelectInterruptPipe.cpp
ixwebsocket/IXSelectInterruptFactory.cpp
ixwebsocket/IXConnectionState.cpp
)
set( IXWEBSOCKET_HEADERS
ixwebsocket/IXEventFd.h
ixwebsocket/IXSocket.h
ixwebsocket/IXSocketServer.h
ixwebsocket/IXSocketConnect.h
ixwebsocket/IXSocketFactory.h
ixwebsocket/IXSetThreadName.h
ixwebsocket/IXDNSLookup.h
ixwebsocket/IXCancellationRequest.h
@ -50,6 +62,12 @@ set( IXWEBSOCKET_HEADERS
ixwebsocket/IXWebSocketPerMessageDeflateOptions.h
ixwebsocket/IXWebSocketHttpHeaders.h
ixwebsocket/libwshandshake.hpp
ixwebsocket/IXHttpClient.h
ixwebsocket/IXUrlParser.h
ixwebsocket/IXSelectInterrupt.h
ixwebsocket/IXSelectInterruptPipe.h
ixwebsocket/IXSelectInterruptFactory.h
ixwebsocket/IXConnectionState.h
)
# Platform specific code
@ -59,6 +77,8 @@ elseif (WIN32)
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/windows/IXSetThreadName_windows.cpp)
else()
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/linux/IXSetThreadName_linux.cpp)
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/IXSelectInterruptEventFd.cpp)
list( APPEND IXWEBSOCKET_HEADERS ixwebsocket/IXSelectInterruptEventFd.h)
endif()
if (USE_TLS)
@ -115,3 +135,4 @@ set( IXWEBSOCKET_INCLUDE_DIRS
target_include_directories( ixwebsocket PUBLIC ${IXWEBSOCKET_INCLUDE_DIRS} )
add_subdirectory(ws)
add_subdirectory(third_party/cpp_redis)

1
DOCKER_VERSION Normal file
View File

@ -0,0 +1 @@
1.3.2

View File

@ -1 +1 @@
docker/Dockerfile.debian
Dockerfile.dev

View File

@ -12,11 +12,20 @@ RUN apt-get -y install libz-dev
RUN apt-get -y install vim
RUN apt-get -y install make
RUN apt-get -y install cmake
RUN apt-get -y install curl
RUN apt-get -y install python
RUN apt-get -y install netcat
# debian strech cmake is too old for building with Docker
COPY makefile .
RUN ["make", "install_cmake_for_linux"]
COPY . .
WORKDIR ws
RUN ["sh", "docker_build.sh"]
ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-rc4-Linux-x86_64/bin
ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
# RUN ["make"]
EXPOSE 8765
CMD ["/ws/ws", "transfer", "8765"]
CMD ["/ws/ws", "transfer", "--port", "8765", "--host", "0.0.0.0"]

30
Dockerfile.prod Normal file
View File

@ -0,0 +1,30 @@
FROM debian:buster
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install libz-dev
RUN apt-get -y install make
RUN apt-get -y install wget
RUN mkdir -p /tmp/cmake
WORKDIR /tmp/cmake
RUN wget https://github.com/Kitware/CMake/releases/download/v3.14.0/cmake-3.14.0-Linux-x86_64.tar.gz
RUN tar zxf cmake-3.14.0-Linux-x86_64.tar.gz
RUN adduser app
COPY . .
ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-Linux-x86_64/bin
ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
RUN ["make"]
# Now run in usermode
USER app
EXPOSE 8765
CMD ["bash"]

116
README.md
View File

@ -4,18 +4,18 @@
## Introduction
[*WebSocket*](https://en.wikipedia.org/wiki/WebSocket) is a computer communications protocol, providing full-duplex
communication channels over a single TCP connection. *IXWebSocket* is a C++ library for client and server Websocket communication. The code is derived from [easywsclient](https://github.com/dhbaird/easywsclient) and from the [Satori C SDK](https://github.com/satori-com/satori-rtm-sdk-c). It has been tested on the following platforms.
[*WebSocket*](https://en.wikipedia.org/wiki/WebSocket) is a computer communications protocol, providing full-duplex and bi-directionnal communication channels over a single TCP connection. *IXWebSocket* is a C++ library for client and server Websocket communication, and for client HTTP communication. The code is derived from [easywsclient](https://github.com/dhbaird/easywsclient) and from the [Satori C SDK](https://github.com/satori-com/satori-rtm-sdk-c). It has been tested on the following platforms.
* macOS
* iOS
* Linux
* Android
* Windows (no TLS support yet)
* Android
The code was made to compile once on Windows but support is currently broken on this platform.
## Examples
The ws folder countains many interactive programs for chat and file transfers demonstrating client and server usage.
The [*ws*](https://github.com/machinezone/IXWebSocket/tree/master/ws) folder countains many interactive programs for chat, [file transfers](https://github.com/machinezone/IXWebSocket/blob/master/ws/ws_send.cpp), [curl like](https://github.com/machinezone/IXWebSocket/blob/master/ws/ws_http_client.cpp) http clients, demonstrating client and server usage.
Here is what the client API looks like.
@ -25,7 +25,7 @@ ix::WebSocket webSocket;
std::string url("ws://localhost:8080/");
webSocket.setUrl(url);
// Optional heart beat, sent every 45 seconds when there isn't any traffic
// Optional heart beat, sent every 45 seconds when there is not any traffic
// to make sure that load balancers do not kill an idle connection.
webSocket.setHeartBeatPeriod(45);
@ -47,9 +47,12 @@ webSocket.setOnMessageCallback(
// Now that our callback is setup, we can start our background thread and receive messages
webSocket.start();
// Send a message to the server
// Send a message to the server (default to BINARY mode)
webSocket.send("hello world");
// The message can be sent in TEXT mode
webSocket.sendText("hello again");
// ... finally ...
// Stop the connection
@ -64,10 +67,11 @@ Here is what the server API looks like. Note that server support is very recent
ix::WebSocketServer server(port);
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket)
[&server](std::shared_ptr<WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType,
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
@ -77,7 +81,16 @@ server.setOnConnectionCallback(
if (messageType == ix::WebSocket_MessageType_Open)
{
std::cerr << "New connection" << std::endl;
// A connection state object is available, and has a default id
// You can subclass ConnectionState and pass an alternate factory
// to override it. It is useful if you want to store custom
// attributes per connection (authenticated bool flag, attributes, etc...)
std::cerr << "id: " << connectionState->getId() << std::endl;
// The uri the client did connect to.
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)
{
@ -110,12 +123,81 @@ server.wait();
```
Here is what the HTTP client API looks like. Note that HTTP client support is very recent and subject to changes.
```
//
// Preparation
//
HttpClient httpClient;
HttpRequestArgs args;
// Custom headers can be set
WebSocketHttpHeaders headers;
headers["Foo"] = "bar";
args.extraHeaders = headers;
// Timeout options
args.connectTimeout = connectTimeout;
args.transferTimeout = transferTimeout;
// Redirect options
args.followRedirects = followRedirects;
args.maxRedirects = maxRedirects;
// Misc
args.compress = compress; // Enable gzip compression
args.verbose = verbose;
args.logger = [](const std::string& msg)
{
std::cout << msg;
};
//
// Request
//
HttpResponse out;
std::string url = "https://www.google.com";
// HEAD request
out = httpClient.head(url, args);
// GET request
out = httpClient.get(url, args);
// POST request with parameters
HttpParameters httpParameters;
httpParameters["foo"] = "bar";
out = httpClient.post(url, httpParameters, args);
// POST request with a body
out = httpClient.post(url, std::string("foo=bar"), args);
//
// Result
//
auto statusCode = std::get<0>(out);
auto errorCode = std::get<1>(out);
auto responseHeaders = std::get<2>(out);
auto payload = std::get<3>(out);
auto errorMsg = std::get<4>(out);
auto uploadSize = std::get<5>(out);
auto downloadSize = std::get<6>(out);
```
## Build
CMakefiles for the library and the examples are available. This library has few dependencies, so it is possible to just add the source files into your project.
There is a Dockerfile for running some code on Linux, and a unittest which can be executed by typing `make test`.
You can build and install the ws command line tool with Homebrew.
```
brew tap bsergean/IXWebSocket
brew install IXWebSocket
```
## Implementation details
### Per Message Deflate compression.
@ -136,29 +218,29 @@ If the remote end (server) breaks the connection, the code will try to perpetual
### Large messages
Large frames are broken up into smaller chunks or messages to avoid filling up the os tcp buffers, which is permitted thanks to WebSocket [fragmentation](https://tools.ietf.org/html/rfc6455#section-5.4). Messages up to 500M were sent and received succesfully.
Large frames are broken up into smaller chunks or messages to avoid filling up the os tcp buffers, which is permitted thanks to WebSocket [fragmentation](https://tools.ietf.org/html/rfc6455#section-5.4). Messages up to 1G were sent and received succesfully.
## Limitations
* There is no text support for sending data, only the binary protocol is supported. Sending json or text over the binary protocol works well.
* No utf-8 validation is made when sending TEXT message with sendText()
* Automatic reconnection works at the TCP socket level, and will detect remote end disconnects. However, if the device/computer network become unreachable (by turning off wifi), it is quite hard to reliably and timely detect it at the socket level using `recv` and `send` error codes. [Here](https://stackoverflow.com/questions/14782143/linux-socket-how-to-detect-disconnected-network-in-a-client-program) is a good discussion on the subject. This behavior is consistent with other runtimes such as node.js. One way to detect a disconnected device with low level C code is to do a name resolution with DNS but this can be expensive. Mobile devices have good and reliable API to do that.
* The server code is using select to detect incoming data, and creates one OS thread per connection. This isn't as scalable as strategies using epoll or kqueue.
* The server code is using select to detect incoming data, and creates one OS thread per connection. This is not as scalable as strategies using epoll or kqueue.
## C++ code organization
Here's a simplistic diagram which explains how the code is structured in term of class/modules.
Here is a simplistic diagram which explains how the code is structured in term of class/modules.
```
+-----------------------+ --- Public
| | Start the receiving Background thread. Auto reconnection. Simple websocket Ping.
| IXWebSocket | Interface used by C++ test clients. No IX dependencies.
| |
| |
+-----------------------+
| |
| IXWebSocketServer | Run a server and give each connections its own WebSocket object.
| | Each connection is handled in a new OS thread.
| |
+-----------------------+ --- Private
+-----------------------+ --- Private
| |
| IXWebSocketTransport | Low level websocket code, framing, managing raw socket. Adapted from easywsclient.
| |
@ -198,7 +280,7 @@ If the connection was closed and sending failed, the return value will be set to
1. WebSocket_ReadyState_Connecting - The connection is not yet open.
2. WebSocket_ReadyState_Open - The connection is open and ready to communicate.
3. WebSocket_ReadyState_Closing - The connection is in the process of closing.
4. WebSocket_MessageType_Close - The connection is closed or couldn't be opened.
4. WebSocket_MessageType_Close - The connection is closed or could not be opened.
### Open and Close notifications
@ -308,7 +390,7 @@ websocket.ping("ping data, optional (empty string is ok): limited to 125 bytes l
### Heartbeat.
You can configure an optional heart beat / keep-alive, sent every 45 seconds
when there isn't any traffic to make sure that load balancers do not kill an
when there is no any traffic to make sure that load balancers do not kill an
idle connection.
```

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.5 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 94 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 80 KiB

View File

@ -0,0 +1,2 @@
all:
(cd .. ; make docker && make docker_push)

Binary file not shown.

After

Width:  |  Height:  |  Size: 74 KiB

BIN
doc/redis_conf_2019/neo.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 118 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 113 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 168 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 673 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.5 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.5 MiB

File diff suppressed because one or more lines are too long

Binary file not shown.

After

Width:  |  Height:  |  Size: 90 KiB

File diff suppressed because it is too large Load Diff

Binary file not shown.

After

Width:  |  Height:  |  Size: 36 KiB

21
docker-compose.yml Normal file
View File

@ -0,0 +1,21 @@
version: "3"
services:
ws:
stdin_open: true
tty: true
image: bsergean/ws:build
ports:
- "8765:8765"
entrypoint: bash
networks:
- ws-net
depends_on:
- redis1
redis1:
image: redis:alpine
networks:
- ws-net
networks:
ws-net:

View File

@ -1,16 +0,0 @@
FROM debian:stretch
# RUN yum install -y gcc-c++ make cmake openssl-devel gdb
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install gdb
RUN apt-get -y install screen
RUN apt-get -y install procps
RUN apt-get -y install lsof
COPY . .
WORKDIR examples/ws_connect
RUN ["sh", "build_linux.sh"]

View File

@ -1,11 +0,0 @@
FROM alpine:3.8
RUN apk add --no-cache g++ musl-dev make cmake openssl-dev
COPY . .
WORKDIR examples/ws_connect
RUN ["sh", "build_linux.sh"]
EXPOSE 8765
CMD ["ws_connect"]

View File

@ -1,11 +0,0 @@
FROM alpine:3.8
RUN apk add --no-cache g++ musl-dev make cmake openssl-dev
COPY . .
WORKDIR examples/ws_connect
RUN ["sh", "build_linux.sh"]
EXPOSE 8765
CMD ["ws_connect"]

View File

@ -1,8 +0,0 @@
FROM gcc:8
# RUN yum install -y gcc-c++ make cmake openssl-devel gdb
COPY . .
WORKDIR examples/ws_connect
RUN ["sh", "build_linux.sh"]

View File

@ -1,7 +0,0 @@
add_subdirectory(broadcast_server)
add_subdirectory(ping_pong)
add_subdirectory(chat)
add_subdirectory(echo_server)
add_subdirectory(ws_connect)
# add_subdirectory(cobra_publisher)

View File

@ -1,9 +0,0 @@
CMakeCache.txt
package-lock.json
CMakeFiles
ixwebsocket_unittest
cmake_install.cmake
node_modules
ixwebsocket
Makefile
build

View File

@ -1,28 +0,0 @@
#
# Author: Benjamin Sergeant
# Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
#
cmake_minimum_required (VERSION 3.4.1)
project (broadcast_server)
# There's -Weverything too for clang
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic -Wshorten-64-to-32")
set (OPENSSL_PREFIX /usr/local/opt/openssl) # Homebrew openssl
set (CMAKE_CXX_STANDARD 14)
option(USE_TLS "Add TLS support" ON)
include_directories(broadcast_server .)
add_executable(broadcast_server
broadcast_server.cpp)
if (APPLE AND USE_TLS)
target_link_libraries(broadcast_server "-framework foundation" "-framework security")
endif()
target_link_libraries(broadcast_server ixwebsocket)
install(TARGETS broadcast_server DESTINATION bin)

View File

@ -1,75 +0,0 @@
/*
* broadcast_server.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <sstream>
#include <ixwebsocket/IXWebSocketServer.h>
int main(int argc, char** argv)
{
int port = 8080;
if (argc == 2)
{
std::stringstream ss;
ss << argv[1];
ss >> port;
}
ix::WebSocketServer server(port);
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket)
{
webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
if (messageType == ix::WebSocket_MessageType_Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
std::cerr << "Closed connection" << std::endl;
}
else if (messageType == ix::WebSocket_MessageType_Message)
{
std::cerr << "Received " << wireSize << " bytes" << std::endl;
for (auto&& client : server.getClients())
{
if (client != webSocket)
{
client->send(str);
}
}
}
}
);
}
);
auto res = server.listen();
if (!res.first)
{
std::cerr << res.second << std::endl;
return 1;
}
server.start();
server.wait();
return 0;
}

View File

@ -1,3 +0,0 @@
build
venv
node_modules

View File

@ -1,21 +0,0 @@
#
# cmd_websocket_chat.cpp
# Author: Benjamin Sergeant
# Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
#
cmake_minimum_required (VERSION 3.4.1)
project (cmd_websocket_chat)
set (CMAKE_CXX_STANDARD 14)
option(USE_TLS "Add TLS support" ON)
add_executable(cmd_websocket_chat cmd_websocket_chat.cpp)
if (APPLE AND USE_TLS)
target_link_libraries(cmd_websocket_chat "-framework foundation" "-framework security")
endif()
target_link_libraries(cmd_websocket_chat ixwebsocket)
install(TARGETS cmd_websocket_chat DESTINATION bin)

View File

@ -1,39 +0,0 @@
# Building
1. cmake -G .
2. make
## Disable TLS
chat$ cmake -DUSE_TLS=OFF .
-- Configuring done
-- Generating done
-- Build files have been written to: /Users/bsergeant/src/foss/ixwebsocket/examples/chat
chat$ make
Scanning dependencies of target ixwebsocket
[ 16%] Building CXX object ixwebsocket/CMakeFiles/ixwebsocket.dir/ixwebsocket/IXSocket.cpp.o
[ 33%] Building CXX object ixwebsocket/CMakeFiles/ixwebsocket.dir/ixwebsocket/IXWebSocket.cpp.o
[ 50%] Building CXX object ixwebsocket/CMakeFiles/ixwebsocket.dir/ixwebsocket/IXWebSocketTransport.cpp.o
[ 66%] Linking CXX static library libixwebsocket.a
[ 66%] Built target ixwebsocket
[ 83%] Linking CXX executable cmd_websocket_chat
[100%] Built target cmd_websocket_chat
## Enable TLS (default)
```
chat$ cmake -DUSE_TLS=ON .
-- Configuring done
-- Generating done
-- Build files have been written to: /Users/bsergeant/src/foss/ixwebsocket/examples/chat
(venv) chat$ make
Scanning dependencies of target ixwebsocket
[ 14%] Building CXX object ixwebsocket/CMakeFiles/ixwebsocket.dir/ixwebsocket/IXSocket.cpp.o
[ 28%] Building CXX object ixwebsocket/CMakeFiles/ixwebsocket.dir/ixwebsocket/IXWebSocket.cpp.o
[ 42%] Building CXX object ixwebsocket/CMakeFiles/ixwebsocket.dir/ixwebsocket/IXWebSocketTransport.cpp.o
[ 57%] Building CXX object ixwebsocket/CMakeFiles/ixwebsocket.dir/ixwebsocket/IXSocketAppleSSL.cpp.o
[ 71%] Linking CXX static library libixwebsocket.a
[ 71%] Built target ixwebsocket
[ 85%] Linking CXX executable cmd_websocket_chat
[100%] Built target cmd_websocket_chat
```

View File

@ -1,15 +0,0 @@
#!/bin/sh
#
# Author: Benjamin Sergeant
# Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
#
# 'manual' way of building. You can also use cmake.
g++ --std=c++11 \
../../ixwebsocket/IXSocket.cpp \
../../ixwebsocket/IXWebSocketTransport.cpp \
../../ixwebsocket/IXWebSocket.cpp \
-I ../.. \
cmd_websocket_chat.cpp \
-o cmd_websocket_chat

View File

@ -1,17 +0,0 @@
#!/bin/sh
#
# Author: Benjamin Sergeant
# Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
#
# 'manual' way of building. You can also use cmake.
clang++ --std=c++11 --stdlib=libc++ \
../../ixwebsocket/IXSocket.cpp \
../../ixwebsocket/IXWebSocketTransport.cpp \
../../ixwebsocket/IXSocketAppleSSL.cpp \
../../ixwebsocket/IXWebSocket.cpp \
cmd_websocket_chat.cpp \
-o cmd_websocket_chat \
-framework Security \
-framework Foundation

View File

@ -1,31 +0,0 @@
{
"requires": true,
"lockfileVersion": 1,
"dependencies": {
"async-limiter": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.0.tgz",
"integrity": "sha512-jp/uFnooOiO+L211eZOoSyzpOITMXx1rBITauYykG3BRYPu8h0UcxsPNB04RR5vo4Tyz3+ay17tR6JVf9qzYWg=="
},
"safe-buffer": {
"version": "5.1.2",
"resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz",
"integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g=="
},
"ultron": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/ultron/-/ultron-1.1.1.tgz",
"integrity": "sha512-UIEXBNeYmKptWH6z8ZnqTeS8fV74zG0/eRU9VGkpzz+LIJNs8W/zM/L+7ctCkRrgbNnnR0xxw4bKOr0cW0N0Og=="
},
"ws": {
"version": "3.3.3",
"resolved": "https://registry.npmjs.org/ws/-/ws-3.3.3.tgz",
"integrity": "sha512-nnWLa/NwZSt4KQJu51MYlCcSQ5g7INpOrOMt4XV8j4dqTXdmlUmSHQ8/oLC069ckre0fRsgfvsKwbTdtKLCDkA==",
"requires": {
"async-limiter": "1.0.0",
"safe-buffer": "5.1.2",
"ultron": "1.1.1"
}
}
}
}

View File

@ -1,6 +0,0 @@
{
"dependencies": {
"msgpack-js": "^0.3.0",
"ws": "^3.3.3"
}
}

View File

@ -1,28 +0,0 @@
#
# Author: Benjamin Sergeant
# Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
#
cmake_minimum_required (VERSION 3.4.1)
project (echo_server)
# There's -Weverything too for clang
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic -Wshorten-64-to-32")
set (OPENSSL_PREFIX /usr/local/opt/openssl) # Homebrew openssl
set (CMAKE_CXX_STANDARD 14)
option(USE_TLS "Add TLS support" ON)
include_directories(echo_server .)
add_executable(echo_server
echo_server.cpp)
if (APPLE AND USE_TLS)
target_link_libraries(echo_server "-framework foundation" "-framework security")
endif()
target_link_libraries(echo_server ixwebsocket)
install(TARGETS echo_server DESTINATION bin)

View File

@ -1,68 +0,0 @@
/*
* echo_server.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <sstream>
#include <ixwebsocket/IXWebSocketServer.h>
int main(int argc, char** argv)
{
int port = 8080;
if (argc == 2)
{
std::stringstream ss;
ss << argv[1];
ss >> port;
}
ix::WebSocketServer server(port);
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket)
{
webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
if (messageType == ix::WebSocket_MessageType_Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
std::cerr << "Closed connection" << std::endl;
}
else if (messageType == ix::WebSocket_MessageType_Message)
{
webSocket->send(str);
}
}
);
}
);
auto res = server.listen();
if (!res.first)
{
std::cerr << res.second << std::endl;
return 1;
}
server.start();
server.wait();
return 0;
}

View File

@ -1,25 +0,0 @@
#
# Author: Benjamin Sergeant
# Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
#
cmake_minimum_required (VERSION 3.4.1)
project (ping_pong)
set (CMAKE_CXX_STANDARD 14)
option(USE_TLS "Add TLS support" ON)
add_executable(ping_pong ping_pong.cpp)
if (APPLE AND USE_TLS)
target_link_libraries(ping_pong "-framework foundation" "-framework security")
endif()
if (WIN32)
target_link_libraries(ping_pong wsock32 ws2_32)
add_definitions(-D_CRT_SECURE_NO_WARNINGS)
endif()
target_link_libraries(ping_pong ixwebsocket)
install(TARGETS ping_pong DESTINATION bin)

View File

@ -1,15 +0,0 @@
#!/bin/sh
#
# Author: Benjamin Sergeant
# Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
#
# 'manual' way of building. You can also use cmake.
g++ --std=c++11 \
../../ixwebsocket/IXSocket.cpp \
../../ixwebsocket/IXWebSocketTransport.cpp \
../../ixwebsocket/IXWebSocket.cpp \
-I ../.. \
cmd_websocket_chat.cpp \
-o cmd_websocket_chat

View File

@ -1,17 +0,0 @@
#!/usr/bin/env python
import asyncio
import websockets
async def hello(uri):
async with websockets.connect(uri) as websocket:
await websocket.send("Hello world!")
response = await websocket.recv()
print(response)
pong_waiter = await websocket.ping('coucou')
ret = await pong_waiter # only if you want to wait for the pong
print(ret)
asyncio.get_event_loop().run_until_complete(
hello('ws://localhost:5678'))

View File

@ -1,21 +0,0 @@
#!/usr/bin/env python
import os
import asyncio
import websockets
async def echo(websocket, path):
async for message in websocket:
print(message)
await websocket.send(message)
if os.getenv('TEST_CLOSE'):
print('Closing')
# breakpoint()
await websocket.close(1001, 'close message')
# await websocket.close()
break
asyncio.get_event_loop().run_until_complete(
websockets.serve(echo, 'localhost', 5678))
asyncio.get_event_loop().run_forever()

View File

@ -1,9 +0,0 @@
#!/bin/sh
test -d build || {
mkdir -p build
cd build
cmake ..
}
(cd build ; make)
./build/ping_pong ws://localhost:5678

View File

@ -1,3 +0,0 @@
build
venv
node_modules

View File

@ -1,20 +0,0 @@
#
# Author: Benjamin Sergeant
# Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
#
cmake_minimum_required (VERSION 3.4.1)
project (ws_connect)
set (CMAKE_CXX_STANDARD 14)
option(USE_TLS "Add TLS support" ON)
add_executable(ws_connect ws_connect.cpp)
if (APPLE AND USE_TLS)
target_link_libraries(ws_connect "-framework foundation" "-framework security")
endif()
target_link_libraries(ws_connect ixwebsocket)
install(TARGETS ws_connect DESTINATION bin)

View File

@ -1,11 +0,0 @@
# Building
1. mkdir build
2. cd build
3. cmake ..
4. make
## Disable TLS
* Enable: `cmake -DUSE_TLS=OFF ..`
* Disable: `cmake -DUSE_TLS=ON ..`

View File

@ -1,25 +0,0 @@
#!/bin/sh
#
# Author: Benjamin Sergeant
# Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
#
# 'manual' way of building. You can also use cmake.
g++ --std=c++11 \
-DIXWEBSOCKET_USE_TLS \
-g \
../../ixwebsocket/IXEventFd.cpp \
../../ixwebsocket/IXSocket.cpp \
../../ixwebsocket/IXSetThreadName.cpp \
../../ixwebsocket/IXWebSocketTransport.cpp \
../../ixwebsocket/IXWebSocket.cpp \
../../ixwebsocket/IXDNSLookup.cpp \
../../ixwebsocket/IXSocketConnect.cpp \
../../ixwebsocket/IXSocketOpenSSL.cpp \
../../ixwebsocket/IXWebSocketPerMessageDeflate.cpp \
../../ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp \
-I ../.. \
ws_connect.cpp \
-o ws_connect \
-lcrypto -lssl -lz -lpthread

View File

@ -0,0 +1,33 @@
/*
* IXConnectionState.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXConnectionState.h"
namespace ix
{
std::atomic<uint64_t> ConnectionState::_globalId(0);
ConnectionState::ConnectionState()
{
computeId();
}
void ConnectionState::computeId()
{
_id = std::to_string(_globalId++);
}
const std::string& ConnectionState::getId() const
{
return _id;
}
std::shared_ptr<ConnectionState> ConnectionState::createConnectionState()
{
return std::make_shared<ConnectionState>();
}
}

View File

@ -0,0 +1,33 @@
/*
* IXConnectionState.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <stdint.h>
#include <string>
#include <atomic>
#include <memory>
namespace ix
{
class ConnectionState {
public:
ConnectionState();
virtual ~ConnectionState() = default;
virtual void computeId();
virtual const std::string& getId() const;
static std::shared_ptr<ConnectionState> createConnectionState();
protected:
std::string _id;
static std::atomic<uint64_t> _globalId;
};
}

View File

@ -73,7 +73,7 @@ namespace ix
errMsg = "no error";
// Maybe a cancellation request got in before the background thread terminated ?
if (isCancellationRequested())
if (isCancellationRequested && isCancellationRequested())
{
errMsg = "cancellation requested";
return nullptr;
@ -121,7 +121,7 @@ namespace ix
}
// Were we cancelled ?
if (isCancellationRequested())
if (isCancellationRequested && isCancellationRequested())
{
errMsg = "cancellation requested";
return nullptr;
@ -129,7 +129,7 @@ namespace ix
}
// Maybe a cancellation request got in before the bg terminated ?
if (isCancellationRequested())
if (isCancellationRequested && isCancellationRequested())
{
errMsg = "cancellation requested";
return nullptr;

View File

@ -1,82 +0,0 @@
/*
* IXEventFd.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
//
// Linux/Android has a special type of virtual files. select(2) will react
// when reading/writing to those files, unlike closing sockets.
//
// https://linux.die.net/man/2/eventfd
// http://www.sourcexr.com/articles/2013/10/26/lightweight-inter-process-signaling-with-eventfd
//
// eventfd was added in Linux kernel 2.x, and our oldest Android (Kitkat 4.4)
// is on Kernel 3.x
//
// cf Android/Kernel table here
// https://android.stackexchange.com/questions/51651/which-android-runs-which-linux-kernel
//
#include "IXEventFd.h"
#ifdef __linux__
# include <sys/eventfd.h>
#endif
#ifndef _WIN32
#include <unistd.h> // for write
#endif
namespace ix
{
EventFd::EventFd() :
_eventfd(-1)
{
#ifdef __linux__
_eventfd = eventfd(0, 0);
#endif
}
EventFd::~EventFd()
{
#ifdef __linux__
::close(_eventfd);
#endif
}
bool EventFd::notify()
{
#if defined(__linux__)
if (_eventfd == -1) return false;
// select will wake up when a non-zero value is written to our eventfd
uint64_t value = 1;
// we should write 8 bytes for an uint64_t
return write(_eventfd, &value, sizeof(value)) == 8;
#else
return true;
#endif
}
bool EventFd::clear()
{
#if defined(__linux__)
if (_eventfd == -1) return false;
// 0 is a special value ; select will not wake up
uint64_t value = 0;
// we should write 8 bytes for an uint64_t
return write(_eventfd, &value, sizeof(value)) == 8;
#else
return true;
#endif
}
int EventFd::getFd()
{
return _eventfd;
}
}

View File

@ -1,23 +0,0 @@
/*
* IXEventFd.h
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#pragma once
namespace ix
{
class EventFd {
public:
EventFd();
virtual ~EventFd();
bool notify();
bool clear();
int getFd();
private:
int _eventfd;
};
}

View File

@ -0,0 +1,467 @@
/*
* IXHttpClient.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXHttpClient.h"
#include "IXUrlParser.h"
#include "IXWebSocketHttpHeaders.h"
#include "IXSocketFactory.h"
#include <sstream>
#include <iomanip>
#include <vector>
#include <cstring>
#include <zlib.h>
namespace ix
{
const std::string HttpClient::kPost = "POST";
const std::string HttpClient::kGet = "GET";
const std::string HttpClient::kHead = "HEAD";
HttpClient::HttpClient()
{
}
HttpClient::~HttpClient()
{
}
HttpResponse HttpClient::request(
const std::string& url,
const std::string& verb,
const std::string& body,
const HttpRequestArgs& args,
int redirects)
{
uint64_t uploadSize = 0;
uint64_t downloadSize = 0;
int code = 0;
WebSocketHttpHeaders headers;
std::string payload;
std::string protocol, host, path, query;
int port;
bool websocket = false;
if (!UrlParser::parse(url, protocol, host, path, query, port, websocket))
{
std::stringstream ss;
ss << "Cannot parse url: " << url;
return std::make_tuple(code, HttpErrorCode_UrlMalformed,
headers, payload, ss.str(),
uploadSize, downloadSize);
}
bool tls = protocol == "https";
std::string errorMsg;
_socket = createSocket(tls, errorMsg);
if (!_socket)
{
return std::make_tuple(code, HttpErrorCode_CannotCreateSocket,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
// Build request string
std::stringstream ss;
ss << verb << " " << path << " HTTP/1.1\r\n";
ss << "Host: " << host << "\r\n";
ss << "User-Agent: ixwebsocket/1.0.0" << "\r\n";
ss << "Accept: */*" << "\r\n";
if (args.compress)
{
ss << "Accept-Encoding: gzip" << "\r\n";
}
// Append extra headers
for (auto&& it : args.extraHeaders)
{
ss << it.first << ": " << it.second << "\r\n";
}
if (verb == kPost)
{
ss << "Content-Length: " << body.size() << "\r\n";
// Set default Content-Type if unspecified
if (args.extraHeaders.find("Content-Type") == args.extraHeaders.end())
{
ss << "Content-Type: application/x-www-form-urlencoded" << "\r\n";
}
ss << "\r\n";
ss << body;
}
else
{
ss << "\r\n";
}
std::string req(ss.str());
std::string errMsg;
std::atomic<bool> requestInitCancellation(false);
// Make a cancellation object dealing with connection timeout
auto isCancellationRequested =
makeCancellationRequestWithTimeout(args.connectTimeout, requestInitCancellation);
bool success = _socket->connect(host, port, errMsg, isCancellationRequested);
if (!success)
{
std::stringstream ss;
ss << "Cannot connect to url: " << url;
return std::make_tuple(code, HttpErrorCode_CannotConnect,
headers, payload, ss.str(),
uploadSize, downloadSize);
}
// Make a new cancellation object dealing with transfer timeout
isCancellationRequested =
makeCancellationRequestWithTimeout(args.transferTimeout, requestInitCancellation);
if (args.verbose)
{
std::stringstream ss;
ss << "Sending " << verb << " request "
<< "to " << host << ":" << port << std::endl
<< "request size: " << req.size() << " bytes" << std::endl
<< "=============" << std::endl
<< req
<< "=============" << std::endl
<< std::endl;
log(ss.str(), args);
}
if (!_socket->writeBytes(req, isCancellationRequested))
{
std::string errorMsg("Cannot send request");
return std::make_tuple(code, HttpErrorCode_SendError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
uploadSize = req.size();
auto lineResult = _socket->readLine(isCancellationRequested);
auto lineValid = lineResult.first;
auto line = lineResult.second;
if (!lineValid)
{
std::string errorMsg("Cannot retrieve status line");
return std::make_tuple(code, HttpErrorCode_CannotReadStatusLine,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
if (args.verbose)
{
std::stringstream ss;
ss << "Status line " << line;
log(ss.str(), args);
}
if (sscanf(line.c_str(), "HTTP/1.1 %d", &code) != 1)
{
std::string errorMsg("Cannot parse response code from status line");
return std::make_tuple(code, HttpErrorCode_MissingStatus,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
auto result = parseHttpHeaders(_socket, isCancellationRequested);
auto headersValid = result.first;
headers = result.second;
if (!headersValid)
{
std::string errorMsg("Cannot parse http headers");
return std::make_tuple(code, HttpErrorCode_HeaderParsingError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
// Redirect ?
if ((code >= 301 && code <= 308) && args.followRedirects)
{
if (headers.find("Location") == headers.end())
{
std::string errorMsg("Missing location header for redirect");
return std::make_tuple(code, HttpErrorCode_MissingLocation,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
if (redirects >= args.maxRedirects)
{
std::stringstream ss;
ss << "Too many redirects: " << redirects;
return std::make_tuple(code, HttpErrorCode_TooManyRedirects,
headers, payload, ss.str(),
uploadSize, downloadSize);
}
// Recurse
std::string location = headers["Location"];
return request(location, verb, body, args, redirects+1);
}
if (verb == "HEAD")
{
return std::make_tuple(code, HttpErrorCode_Ok,
headers, payload, std::string(),
uploadSize, downloadSize);
}
// Parse response:
if (headers.find("Content-Length") != headers.end())
{
ssize_t contentLength = -1;
ss.str("");
ss << headers["Content-Length"];
ss >> contentLength;
payload.reserve(contentLength);
auto chunkResult = _socket->readBytes(contentLength,
args.onProgressCallback,
isCancellationRequested);
if (!chunkResult.first)
{
errorMsg = "Cannot read chunk";
return std::make_tuple(code, HttpErrorCode_ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
payload += chunkResult.second;
}
else if (headers.find("Transfer-Encoding") != headers.end() &&
headers["Transfer-Encoding"] == "chunked")
{
std::stringstream ss;
while (true)
{
lineResult = _socket->readLine(isCancellationRequested);
line = lineResult.second;
if (!lineResult.first)
{
return std::make_tuple(code, HttpErrorCode_ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
uint64_t chunkSize;
ss.str("");
ss << std::hex << line;
ss >> chunkSize;
if (args.verbose)
{
std::stringstream oss;
oss << "Reading " << chunkSize << " bytes"
<< std::endl;
log(oss.str(), args);
}
payload.reserve(payload.size() + chunkSize);
// Read a chunk
auto chunkResult = _socket->readBytes(chunkSize,
args.onProgressCallback,
isCancellationRequested);
if (!chunkResult.first)
{
errorMsg = "Cannot read chunk";
return std::make_tuple(code, HttpErrorCode_ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
payload += chunkResult.second;
// Read the line that terminates the chunk (\r\n)
lineResult = _socket->readLine(isCancellationRequested);
if (!lineResult.first)
{
return std::make_tuple(code, HttpErrorCode_ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
if (chunkSize == 0) break;
}
}
else if (code == 204)
{
; // 204 is NoContent response code
}
else
{
std::string errorMsg("Cannot read http body");
return std::make_tuple(code, HttpErrorCode_CannotReadBody,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
downloadSize = payload.size();
// If the content was compressed with gzip, decode it
if (headers["Content-Encoding"] == "gzip")
{
std::string decompressedPayload;
if (!gzipInflate(payload, decompressedPayload))
{
std::string errorMsg("Error decompressing payload");
return std::make_tuple(code, HttpErrorCode_Gzip,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
payload = decompressedPayload;
}
return std::make_tuple(code, HttpErrorCode_Ok,
headers, payload, std::string(),
uploadSize, downloadSize);
}
HttpResponse HttpClient::get(const std::string& url,
const HttpRequestArgs& args)
{
return request(url, kGet, std::string(), args);
}
HttpResponse HttpClient::head(const std::string& url,
const HttpRequestArgs& args)
{
return request(url, kHead, std::string(), args);
}
HttpResponse HttpClient::post(const std::string& url,
const HttpParameters& httpParameters,
const HttpRequestArgs& args)
{
return request(url, kPost, serializeHttpParameters(httpParameters), args);
}
HttpResponse HttpClient::post(const std::string& url,
const std::string& body,
const HttpRequestArgs& args)
{
return request(url, kPost, body, args);
}
std::string HttpClient::urlEncode(const std::string& value)
{
std::ostringstream escaped;
escaped.fill('0');
escaped << std::hex;
for (std::string::const_iterator i = value.begin(), n = value.end();
i != n; ++i)
{
std::string::value_type c = (*i);
// Keep alphanumeric and other accepted characters intact
if (isalnum(c) || c == '-' || c == '_' || c == '.' || c == '~')
{
escaped << c;
continue;
}
// Any other characters are percent-encoded
escaped << std::uppercase;
escaped << '%' << std::setw(2) << int((unsigned char) c);
escaped << std::nouppercase;
}
return escaped.str();
}
std::string HttpClient::serializeHttpParameters(const HttpParameters& httpParameters)
{
std::stringstream ss;
size_t count = httpParameters.size();
size_t i = 0;
for (auto&& it : httpParameters)
{
ss << urlEncode(it.first)
<< "="
<< urlEncode(it.second);
if (i++ < (count-1))
{
ss << "&";
}
}
return ss.str();
}
bool HttpClient::gzipInflate(
const std::string& in,
std::string& out)
{
z_stream inflateState;
std::memset(&inflateState, 0, sizeof(inflateState));
inflateState.zalloc = Z_NULL;
inflateState.zfree = Z_NULL;
inflateState.opaque = Z_NULL;
inflateState.avail_in = 0;
inflateState.next_in = Z_NULL;
if (inflateInit2(&inflateState, 16+MAX_WBITS) != Z_OK)
{
return false;
}
inflateState.avail_in = (uInt) in.size();
inflateState.next_in = (unsigned char *)(const_cast<char *>(in.data()));
const int kBufferSize = 1 << 14;
std::unique_ptr<unsigned char[]> compressBuffer =
std::make_unique<unsigned char[]>(kBufferSize);
do
{
inflateState.avail_out = (uInt) kBufferSize;
inflateState.next_out = compressBuffer.get();
int ret = inflate(&inflateState, Z_SYNC_FLUSH);
if (ret == Z_NEED_DICT || ret == Z_DATA_ERROR || ret == Z_MEM_ERROR)
{
inflateEnd(&inflateState);
return false;
}
out.append(
reinterpret_cast<char *>(compressBuffer.get()),
kBufferSize - inflateState.avail_out
);
} while (inflateState.avail_out == 0);
inflateEnd(&inflateState);
return true;
}
void HttpClient::log(const std::string& msg,
const HttpRequestArgs& args)
{
if (args.logger)
{
args.logger(msg);
}
}
}

107
ixwebsocket/IXHttpClient.h Normal file
View File

@ -0,0 +1,107 @@
/*
* IXHttpClient.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <algorithm>
#include <functional>
#include <mutex>
#include <atomic>
#include <tuple>
#include <memory>
#include <map>
#include "IXSocket.h"
#include "IXWebSocketHttpHeaders.h"
namespace ix
{
enum HttpErrorCode
{
HttpErrorCode_Ok = 0,
HttpErrorCode_CannotConnect = 1,
HttpErrorCode_Timeout = 2,
HttpErrorCode_Gzip = 3,
HttpErrorCode_UrlMalformed = 4,
HttpErrorCode_CannotCreateSocket = 5,
HttpErrorCode_SendError = 6,
HttpErrorCode_ReadError = 7,
HttpErrorCode_CannotReadStatusLine = 8,
HttpErrorCode_MissingStatus = 9,
HttpErrorCode_HeaderParsingError = 10,
HttpErrorCode_MissingLocation = 11,
HttpErrorCode_TooManyRedirects = 12,
HttpErrorCode_ChunkReadError = 13,
HttpErrorCode_CannotReadBody = 14
};
using HttpResponse = std::tuple<int, // status
HttpErrorCode, // error code
WebSocketHttpHeaders,
std::string, // payload
std::string, // error msg
uint64_t, // upload size
uint64_t>; // download size
using HttpParameters = std::map<std::string, std::string>;
using Logger = std::function<void(const std::string&)>;
struct HttpRequestArgs
{
std::string url;
WebSocketHttpHeaders extraHeaders;
std::string body;
int connectTimeout;
int transferTimeout;
bool followRedirects;
int maxRedirects;
bool verbose;
bool compress;
Logger logger;
OnProgressCallback onProgressCallback;
};
class HttpClient {
public:
HttpClient();
~HttpClient();
HttpResponse get(const std::string& url,
const HttpRequestArgs& args);
HttpResponse head(const std::string& url,
const HttpRequestArgs& args);
HttpResponse post(const std::string& url,
const HttpParameters& httpParameters,
const HttpRequestArgs& args);
HttpResponse post(const std::string& url,
const std::string& body,
const HttpRequestArgs& args);
private:
HttpResponse request(const std::string& url,
const std::string& verb,
const std::string& body,
const HttpRequestArgs& args,
int redirects = 0);
std::string serializeHttpParameters(const HttpParameters& httpParameters);
std::string urlEncode(const std::string& value);
void log(const std::string& msg, const HttpRequestArgs& args);
bool gzipInflate(
const std::string& in,
std::string& out);
std::shared_ptr<Socket> _socket;
const static std::string kPost;
const static std::string kGet;
const static std::string kHead;
};
}

View File

@ -0,0 +1,46 @@
/*
* IXSelectInterrupt.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXSelectInterrupt.h"
namespace ix
{
SelectInterrupt::SelectInterrupt()
{
;
}
SelectInterrupt::~SelectInterrupt()
{
;
}
bool SelectInterrupt::init(std::string& /*errorMsg*/)
{
return true;
}
bool SelectInterrupt::notify(uint64_t /*value*/)
{
return true;
}
uint64_t SelectInterrupt::read()
{
return 0;
}
bool SelectInterrupt::clear()
{
return true;
}
int SelectInterrupt::getFd() const
{
return -1;
}
}

View File

@ -0,0 +1,28 @@
/*
* IXSelectInterrupt.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <stdint.h>
#include <string>
namespace ix
{
class SelectInterrupt {
public:
SelectInterrupt();
virtual ~SelectInterrupt();
virtual bool init(std::string& errorMsg);
virtual bool notify(uint64_t value);
virtual bool clear();
virtual uint64_t read();
virtual int getFd() const;
};
}

View File

@ -0,0 +1,116 @@
/*
* IXSelectInterruptEventFd.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved.
*/
//
// On Linux we use eventd to wake up select.
//
//
// Linux/Android has a special type of virtual files. select(2) will react
// when reading/writing to those files, unlike closing sockets.
//
// https://linux.die.net/man/2/eventfd
// http://www.sourcexr.com/articles/2013/10/26/lightweight-inter-process-signaling-with-eventfd
//
// eventfd was added in Linux kernel 2.x, and our oldest Android (Kitkat 4.4)
// is on Kernel 3.x
//
// cf Android/Kernel table here
// https://android.stackexchange.com/questions/51651/which-android-runs-which-linux-kernel
//
// On macOS we use UNIX pipes to wake up select.
//
#include "IXSelectInterruptEventFd.h"
#include <sys/eventfd.h>
#include <unistd.h> // for write
#include <string.h> // for strerror
#include <fcntl.h>
#include <errno.h>
#include <assert.h>
#include <sstream>
namespace ix
{
SelectInterruptEventFd::SelectInterruptEventFd()
{
_eventfd = -1;
}
SelectInterruptEventFd::~SelectInterruptEventFd()
{
::close(_eventfd);
}
bool SelectInterruptEventFd::init(std::string& errorMsg)
{
// calling init twice is a programming error
assert(_eventfd == -1);
_eventfd = eventfd(0, 0);
if (_eventfd < 0)
{
std::stringstream ss;
ss << "SelectInterruptEventFd::init() failed in eventfd()"
<< " : " << strerror(errno);
errorMsg = ss.str();
_eventfd = -1;
return false;
}
if (fcntl(_eventfd, F_SETFL, O_NONBLOCK) == -1)
{
std::stringstream ss;
ss << "SelectInterruptEventFd::init() failed in fcntl() call"
<< " : " << strerror(errno);
errorMsg = ss.str();
_eventfd = -1;
return false;
}
return true;
}
bool SelectInterruptEventFd::notify(uint64_t value)
{
int fd = _eventfd;
if (fd == -1) return false;
// we should write 8 bytes for an uint64_t
return write(fd, &value, sizeof(value)) == 8;
}
// TODO: return max uint64_t for errors ?
uint64_t SelectInterruptEventFd::read()
{
int fd = _eventfd;
uint64_t value = 0;
::read(fd, &value, sizeof(value));
return value;
}
bool SelectInterruptEventFd::clear()
{
if (_eventfd == -1) return false;
// 0 is a special value ; select will not wake up
uint64_t value = 0;
// we should write 8 bytes for an uint64_t
return write(_eventfd, &value, sizeof(value)) == 8;
}
int SelectInterruptEventFd::getFd() const
{
return _eventfd;
}
}

View File

@ -0,0 +1,32 @@
/*
* IXSelectInterruptEventFd.h
* Author: Benjamin Sergeant
* Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include "IXSelectInterrupt.h"
#include <stdint.h>
#include <string>
namespace ix
{
class SelectInterruptEventFd : public SelectInterrupt {
public:
SelectInterruptEventFd();
virtual ~SelectInterruptEventFd();
bool init(std::string& errorMsg) final;
bool notify(uint64_t value) final;
bool clear() final;
uint64_t read() final;
int getFd() const final;
private:
int _eventfd;
};
}

View File

@ -0,0 +1,25 @@
/*
* IXSelectInterruptFactory.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXSelectInterruptFactory.h"
#if defined(__linux__) || defined(__APPLE__)
# include <ixwebsocket/IXSelectInterruptPipe.h>
#else
# include <ixwebsocket/IXSelectInterrupt.h>
#endif
namespace ix
{
std::shared_ptr<SelectInterrupt> createSelectInterrupt()
{
#if defined(__linux__) || defined(__APPLE__)
return std::make_shared<SelectInterruptPipe>();
#else
return std::make_shared<SelectInterrupt>();
#endif
}
}

View File

@ -0,0 +1,15 @@
/*
* IXSelectInterruptFactory.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <memory>
namespace ix
{
class SelectInterrupt;
std::shared_ptr<SelectInterrupt> createSelectInterrupt();
}

View File

@ -0,0 +1,138 @@
/*
* IXSelectInterruptPipe.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved.
*/
//
// On macOS we use UNIX pipes to wake up select.
//
#include "IXSelectInterruptPipe.h"
#include <unistd.h> // for write
#include <string.h> // for strerror
#include <fcntl.h>
#include <errno.h>
#include <assert.h>
#include <sstream>
namespace ix
{
// File descriptor at index 0 in _fildes is the read end of the pipe
// File descriptor at index 1 in _fildes is the write end of the pipe
const int SelectInterruptPipe::kPipeReadIndex = 0;
const int SelectInterruptPipe::kPipeWriteIndex = 1;
SelectInterruptPipe::SelectInterruptPipe()
{
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
}
SelectInterruptPipe::~SelectInterruptPipe()
{
::close(_fildes[kPipeReadIndex]);
::close(_fildes[kPipeWriteIndex]);
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
}
bool SelectInterruptPipe::init(std::string& errorMsg)
{
// calling init twice is a programming error
assert(_fildes[kPipeReadIndex] == -1);
assert(_fildes[kPipeWriteIndex] == -1);
if (pipe(_fildes) < 0)
{
std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in pipe() call"
<< " : " << strerror(errno);
errorMsg = ss.str();
return false;
}
if (fcntl(_fildes[kPipeReadIndex], F_SETFL, O_NONBLOCK) == -1)
{
std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in fcntl(..., O_NONBLOCK) call"
<< " : " << strerror(errno);
errorMsg = ss.str();
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
return false;
}
if (fcntl(_fildes[kPipeWriteIndex], F_SETFL, O_NONBLOCK) == -1)
{
std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in fcntl(..., O_NONBLOCK) call"
<< " : " << strerror(errno);
errorMsg = ss.str();
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
return false;
}
#ifdef F_SETNOSIGPIPE
if (fcntl(_fildes[kPipeWriteIndex], F_SETNOSIGPIPE, 1) == -1)
{
std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in fcntl(.... F_SETNOSIGPIPE) call"
<< " : " << strerror(errno);
errorMsg = ss.str();
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
return false;
}
if (fcntl(_fildes[kPipeWriteIndex], F_SETNOSIGPIPE, 1) == -1)
{
std::stringstream ss;
ss << "SelectInterruptPipe::init() failed in fcntl(..., F_SETNOSIGPIPE) call"
<< " : " << strerror(errno);
errorMsg = ss.str();
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
return false;
}
#endif
return true;
}
bool SelectInterruptPipe::notify(uint64_t value)
{
int fd = _fildes[kPipeWriteIndex];
if (fd == -1) return false;
// we should write 8 bytes for an uint64_t
return write(fd, &value, sizeof(value)) == 8;
}
// TODO: return max uint64_t for errors ?
uint64_t SelectInterruptPipe::read()
{
int fd = _fildes[kPipeReadIndex];
uint64_t value = 0;
::read(fd, &value, sizeof(value));
return value;
}
bool SelectInterruptPipe::clear()
{
return true;
}
int SelectInterruptPipe::getFd() const
{
return _fildes[kPipeReadIndex];
}
}

View File

@ -0,0 +1,39 @@
/*
* IXSelectInterruptPipe.h
* Author: Benjamin Sergeant
* Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include "IXSelectInterrupt.h"
#include <stdint.h>
#include <string>
namespace ix
{
class SelectInterruptPipe : public SelectInterrupt {
public:
SelectInterruptPipe();
virtual ~SelectInterruptPipe();
bool init(std::string& errorMsg) final;
bool notify(uint64_t value) final;
bool clear() final;
uint64_t read() final;
int getFd() const final;
private:
// Store file descriptors used by the communication pipe. Communication
// happens between a control thread and a background thread, which is
// blocked on select.
int _fildes[2];
// Used to identify the read/write idx
static const int kPipeReadIndex;
static const int kPipeWriteIndex;
};
}

View File

@ -7,6 +7,8 @@
#include "IXSocket.h"
#include "IXSocketConnect.h"
#include "IXNetSystem.h"
#include "IXSelectInterrupt.h"
#include "IXSelectInterruptFactory.h"
#include <stdio.h>
#include <stdlib.h>
@ -23,11 +25,15 @@ namespace ix
{
const int Socket::kDefaultPollNoTimeout = -1; // No poll timeout by default
const int Socket::kDefaultPollTimeout = kDefaultPollNoTimeout;
const uint64_t Socket::kSendRequest = 1;
const uint64_t Socket::kCloseRequest = 2;
constexpr size_t Socket::kChunkSize;
Socket::Socket(int fd) :
_sockfd(fd)
_sockfd(fd),
_selectInterrupt(createSelectInterrupt())
{
;
}
Socket::~Socket()
@ -39,44 +45,93 @@ namespace ix
{
if (_sockfd == -1)
{
onPollCallback(PollResultType_Error);
if (onPollCallback) onPollCallback(PollResultType::Error);
return;
}
fd_set rfds;
FD_ZERO(&rfds);
FD_SET(_sockfd, &rfds);
PollResultType pollResult = isReadyToRead(1000 * timeoutSecs);
#ifdef __linux__
FD_SET(_eventfd.getFd(), &rfds);
#endif
if (onPollCallback) onPollCallback(pollResult);
}
PollResultType Socket::select(bool readyToRead, int timeoutMs)
{
fd_set rfds;
fd_set wfds;
FD_ZERO(&rfds);
FD_ZERO(&wfds);
fd_set* fds = (readyToRead) ? &rfds : & wfds;
FD_SET(_sockfd, fds);
// File descriptor used to interrupt select when needed
int interruptFd = _selectInterrupt->getFd();
if (interruptFd != -1)
{
FD_SET(interruptFd, fds);
}
struct timeval timeout;
timeout.tv_sec = timeoutSecs;
timeout.tv_usec = 0;
timeout.tv_sec = timeoutMs / 1000;
timeout.tv_usec = (timeoutMs < 1000) ? 0 : 1000 * (timeoutMs % 1000);
// Compute the highest fd.
int sockfd = _sockfd;
int nfds = (std::max)(sockfd, _eventfd.getFd());
int ret = select(nfds + 1, &rfds, nullptr, nullptr,
(timeoutSecs < 0) ? nullptr : &timeout);
int nfds = (std::max)(sockfd, interruptFd);
PollResultType pollResult = PollResultType_ReadyForRead;
int ret = ::select(nfds + 1, &rfds, &wfds, nullptr,
(timeoutMs < 0) ? nullptr : &timeout);
PollResultType pollResult = PollResultType::ReadyForRead;
if (ret < 0)
{
pollResult = PollResultType_Error;
pollResult = PollResultType::Error;
}
else if (ret == 0)
{
pollResult = PollResultType_Timeout;
pollResult = PollResultType::Timeout;
}
else if (interruptFd != -1 && FD_ISSET(interruptFd, &rfds))
{
uint64_t value = _selectInterrupt->read();
if (value == kSendRequest)
{
pollResult = PollResultType::SendRequest;
}
else if (value == kCloseRequest)
{
pollResult = PollResultType::CloseRequest;
}
}
else if (sockfd != -1 && readyToRead && FD_ISSET(sockfd, &rfds))
{
pollResult = PollResultType::ReadyForRead;
}
else if (sockfd != -1 && !readyToRead && FD_ISSET(sockfd, &wfds))
{
pollResult = PollResultType::ReadyForWrite;
}
onPollCallback(pollResult);
return pollResult;
}
void Socket::wakeUpFromPoll()
PollResultType Socket::isReadyToRead(int timeoutMs)
{
// this will wake up the thread blocked on select, only needed on Linux
_eventfd.notify();
bool readyToRead = true;
return select(readyToRead, timeoutMs);
}
PollResultType Socket::isReadyToWrite(int timeoutMs)
{
bool readyToRead = false;
return select(readyToRead, timeoutMs);
}
// Wake up from poll/select by writing to the pipe which is watched by select
bool Socket::wakeUpFromPoll(uint8_t wakeUpCode)
{
return _selectInterrupt->notify(wakeUpCode);
}
bool Socket::connect(const std::string& host,
@ -86,7 +141,7 @@ namespace ix
{
std::lock_guard<std::mutex> lock(_socketMutex);
if (!_eventfd.clear()) return false;
if (!_selectInterrupt->clear()) return false;
_sockfd = SocketConnect::connect(host, port, errMsg, isCancellationRequested);
return _sockfd != -1;
@ -145,69 +200,9 @@ namespace ix
#endif
}
bool Socket::init()
bool Socket::init(std::string& errorMsg)
{
#ifdef _WIN32
INT rc;
WSADATA wsaData;
rc = WSAStartup(MAKEWORD(2, 2), &wsaData);
return rc != 0;
#else
return true;
#endif
}
void Socket::cleanup()
{
#ifdef _WIN32
WSACleanup();
#endif
}
bool Socket::readByte(void* buffer,
const CancellationRequest& isCancellationRequested)
{
while (true)
{
if (isCancellationRequested()) return false;
ssize_t ret;
ret = recv(buffer, 1);
// We read one byte, as needed, all good.
if (ret == 1)
{
return true;
}
// There is possibly something to be read, try again
else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
getErrno() == EAGAIN))
{
// Wait with a timeout until something is written.
// This way we are not busy looping
fd_set rfds;
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 1 * 1000; // 1ms timeout
FD_ZERO(&rfds);
FD_SET(_sockfd, &rfds);
if (select(_sockfd + 1, &rfds, nullptr, nullptr, &timeout) < 0 &&
(errno == EBADF || errno == EINVAL))
{
return false;
}
continue;
}
// There was an error during the read, abort
else
{
return false;
}
}
return _selectInterrupt->init(errorMsg);
}
bool Socket::writeBytes(const std::string& str,
@ -215,7 +210,7 @@ namespace ix
{
while (true)
{
if (isCancellationRequested()) return false;
if (isCancellationRequested && isCancellationRequested()) return false;
char* buffer = const_cast<char*>(str.c_str());
int len = (int) str.size();
@ -227,7 +222,7 @@ namespace ix
{
return ret == len;
}
// There is possibly something to be write, try again
// There is possibly something to be writen, try again
else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
getErrno() == EAGAIN))
{
@ -241,7 +236,42 @@ namespace ix
}
}
std::pair<bool, std::string> Socket::readLine(const CancellationRequest& isCancellationRequested)
bool Socket::readByte(void* buffer,
const CancellationRequest& isCancellationRequested)
{
while (true)
{
if (isCancellationRequested && isCancellationRequested()) return false;
ssize_t ret;
ret = recv(buffer, 1);
// We read one byte, as needed, all good.
if (ret == 1)
{
return true;
}
// There is possibly something to be read, try again
else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
getErrno() == EAGAIN))
{
// Wait with a 1ms timeout until the socket is ready to read.
// This way we are not busy looping
if (isReadyToRead(1) == PollResultType::Error)
{
return false;
}
}
// There was an error during the read, abort
else
{
return false;
}
}
}
std::pair<bool, std::string> Socket::readLine(
const CancellationRequest& isCancellationRequested)
{
char c;
std::string line;
@ -251,7 +281,8 @@ namespace ix
{
if (!readByte(&c, isCancellationRequested))
{
return std::make_pair(false, std::string());
// Return what we were able to read
return std::make_pair(false, line);
}
line += c;
@ -259,4 +290,52 @@ namespace ix
return std::make_pair(true, line);
}
std::pair<bool, std::string> Socket::readBytes(
size_t length,
const OnProgressCallback& onProgressCallback,
const CancellationRequest& isCancellationRequested)
{
if (_readBuffer.empty())
{
_readBuffer.resize(kChunkSize);
}
std::vector<uint8_t> output;
while (output.size() != length)
{
if (isCancellationRequested && isCancellationRequested())
{
return std::make_pair(false, std::string());
}
size_t size = std::min(kChunkSize, length - output.size());
ssize_t ret = recv((char*)&_readBuffer[0], size);
if (ret <= 0 && (getErrno() != EWOULDBLOCK &&
getErrno() != EAGAIN))
{
// Error
return std::make_pair(false, std::string());
}
else if (ret > 0)
{
output.insert(output.end(),
_readBuffer.begin(),
_readBuffer.begin() + ret);
}
if (onProgressCallback) onProgressCallback((int) output.size(), (int) length);
// Wait with a 1ms timeout until the socket is ready to read.
// This way we are not busy looping
if (isReadyToRead(1) == PollResultType::Error)
{
return std::make_pair(false, std::string());
}
}
return std::make_pair(true, std::string(output.begin(),
output.end()));
}
}

View File

@ -10,22 +10,29 @@
#include <functional>
#include <mutex>
#include <atomic>
#include <vector>
#include <memory>
#ifdef _WIN32
#include <BaseTsd.h>
typedef SSIZE_T ssize_t;
#endif
#include "IXEventFd.h"
#include "IXCancellationRequest.h"
#include "IXProgressCallback.h"
namespace ix
{
enum PollResultType
class SelectInterrupt;
enum class PollResultType
{
PollResultType_ReadyForRead = 0,
PollResultType_Timeout = 1,
PollResultType_Error = 2
ReadyForRead = 0,
ReadyForWrite = 1,
Timeout = 2,
Error = 3,
SendRequest = 4,
CloseRequest = 5
};
class Socket {
@ -34,12 +41,17 @@ namespace ix
Socket(int fd = -1);
virtual ~Socket();
bool init(std::string& errorMsg);
void configure();
virtual void poll(const OnPollCallback& onPollCallback,
int timeoutSecs = kDefaultPollTimeout);
virtual void wakeUpFromPoll();
// Functions to check whether there is activity on the socket
void poll(const OnPollCallback& onPollCallback,
int timeoutSecs = kDefaultPollTimeout);
bool wakeUpFromPoll(uint8_t wakeUpCode);
PollResultType isReadyToWrite(int timeoutMs);
PollResultType isReadyToRead(int timeoutMs);
// Virtual methods
virtual bool connect(const std::string& url,
@ -58,21 +70,36 @@ namespace ix
const CancellationRequest& isCancellationRequested);
bool writeBytes(const std::string& str,
const CancellationRequest& isCancellationRequested);
std::pair<bool, std::string> readLine(const CancellationRequest& isCancellationRequested);
std::pair<bool, std::string> readLine(
const CancellationRequest& isCancellationRequested);
std::pair<bool, std::string> readBytes(
size_t length,
const OnProgressCallback& onProgressCallback,
const CancellationRequest& isCancellationRequested);
static int getErrno();
static bool init(); // Required on Windows to initialize WinSocket
static void cleanup(); // Required on Windows to cleanup WinSocket
// Used as special codes for pipe communication
static const uint64_t kSendRequest;
static const uint64_t kCloseRequest;
protected:
void closeSocket(int fd);
std::atomic<int> _sockfd;
std::mutex _socketMutex;
EventFd _eventfd;
private:
PollResultType select(bool readyToRead, int timeoutMs);
static const int kDefaultPollTimeout;
static const int kDefaultPollNoTimeout;
// Buffer for reading from our socket. That buffer is never resized.
std::vector<uint8_t> _readBuffer;
static constexpr size_t kChunkSize = 1 << 15;
std::shared_ptr<SelectInterrupt> _selectInterrupt;
};
}

View File

@ -66,7 +66,7 @@ namespace ix
for (;;)
{
if (isCancellationRequested()) // Must handle timeout as well
if (isCancellationRequested && isCancellationRequested()) // Must handle timeout as well
{
closeSocket(fd);
errMsg = "Cancelled";

View File

@ -0,0 +1,64 @@
/*
* IXSocketFactory.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXSocketFactory.h"
#if defined(__APPLE__) or defined(__linux__)
# ifdef __APPLE__
# include <ixwebsocket/IXSocketAppleSSL.h>
# else
# include <ixwebsocket/IXSocketOpenSSL.h>
# endif
#endif
namespace ix
{
std::shared_ptr<Socket> createSocket(bool tls,
std::string& errorMsg)
{
errorMsg.clear();
std::shared_ptr<Socket> socket;
if (!tls)
{
socket = std::make_shared<Socket>();
}
else
{
#ifdef IXWEBSOCKET_USE_TLS
# ifdef __APPLE__
socket = std::make_shared<SocketAppleSSL>();
# else
socket = std::make_shared<SocketOpenSSL>();
# endif
#else
errorMsg = "TLS support is not enabled on this platform.";
return nullptr;
#endif
}
if (!socket->init(errorMsg))
{
socket.reset();
}
return socket;
}
std::shared_ptr<Socket> createSocket(int fd,
std::string& errorMsg)
{
errorMsg.clear();
std::shared_ptr<Socket> socket = std::make_shared<Socket>(fd);
if (!socket->init(errorMsg))
{
socket.reset();
}
return socket;
}
}

View File

@ -0,0 +1,20 @@
/*
* IXSocketFactory.h
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <memory>
namespace ix
{
class Socket;
std::shared_ptr<Socket> createSocket(bool tls,
std::string& errorMsg);
std::shared_ptr<Socket> createSocket(int fd,
std::string& errorMsg);
}

View File

@ -21,6 +21,7 @@
namespace ix
{
std::atomic<bool> SocketOpenSSL::_openSSLInitializationSuccessful(false);
std::once_flag SocketOpenSSL::_openSSLInitFlag;
SocketOpenSSL::SocketOpenSSL(int fd) : Socket(fd),
_ssl_connection(nullptr),

View File

@ -50,7 +50,7 @@ namespace ix
const SSL_METHOD* _ssl_method;
mutable std::mutex _mutex; // OpenSSL routines are not thread-safe
std::once_flag _openSSLInitFlag;
static std::once_flag _openSSLInitFlag;
static std::atomic<bool> _openSSLInitializationSuccessful;
};

View File

@ -29,7 +29,8 @@ namespace ix
_host(host),
_backlog(backlog),
_maxConnections(maxConnections),
_stop(false)
_stop(false),
_connectionStateFactory(&ConnectionState::createConnectionState)
{
}
@ -145,6 +146,12 @@ namespace ix
::close(_serverFd);
}
void SocketServer::setConnectionStateFactory(
const ConnectionStateFactory& connectionStateFactory)
{
_connectionStateFactory = connectionStateFactory;
}
void SocketServer::run()
{
// Set the socket to non blocking mode, so that accept calls are not blocking
@ -214,6 +221,12 @@ namespace ix
continue;
}
std::shared_ptr<ConnectionState> connectionState;
if (_connectionStateFactory)
{
connectionState = _connectionStateFactory();
}
// Launch the handleConnection work asynchronously in its own thread.
//
// the destructor of a future returned by std::async blocks,
@ -221,7 +234,8 @@ namespace ix
f = std::async(std::launch::async,
&SocketServer::handleConnection,
this,
clientFd);
clientFd,
connectionState);
}
}
}

View File

@ -6,6 +6,8 @@
#pragma once
#include "IXConnectionState.h"
#include <utility> // pair
#include <string>
#include <set>
@ -20,6 +22,8 @@ namespace ix
{
class SocketServer {
public:
using ConnectionStateFactory = std::function<std::shared_ptr<ConnectionState>()>;
SocketServer(int port = SocketServer::kDefaultPort,
const std::string& host = SocketServer::kDefaultHost,
int backlog = SocketServer::kDefaultTcpBacklog,
@ -27,6 +31,8 @@ namespace ix
virtual ~SocketServer();
virtual void stop();
void setConnectionStateFactory(const ConnectionStateFactory& connectionStateFactory);
const static int kDefaultPort;
const static std::string kDefaultHost;
const static int kDefaultTcpBacklog;
@ -60,9 +66,13 @@ namespace ix
std::condition_variable _conditionVariable;
std::mutex _conditionVariableMutex;
//
ConnectionStateFactory _connectionStateFactory;
// Methods
void run();
virtual void handleConnection(int fd) = 0;
virtual void handleConnection(int fd,
std::shared_ptr<ConnectionState> connectionState) = 0;
virtual size_t getConnectedClientsCount() = 0;
};
}

104
ixwebsocket/IXUrlParser.cpp Normal file
View File

@ -0,0 +1,104 @@
/*
* IXUrlParser.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXUrlParser.h"
#include <iostream>
#include <sstream>
namespace ix
{
//
// The only difference between those 2 regex is the protocol
//
std::regex UrlParser::_httpRegex("(http|https)://([^/ :]+):?([^/ ]*)(/?[^ #?]*)\\x3f?([^ #]*)#?([^ ]*)");
std::regex UrlParser::_webSocketRegex("(ws|wss)://([^/ :]+):?([^/ ]*)(/?[^ #?]*)\\x3f?([^ #]*)#?([^ ]*)");
bool UrlParser::parse(const std::string& url,
std::string& protocol,
std::string& host,
std::string& path,
std::string& query,
int& port,
bool websocket)
{
std::cmatch what;
if (!regex_match(url.c_str(), what,
websocket ? _webSocketRegex : _httpRegex))
{
return false;
}
std::string portStr;
protocol = std::string(what[1].first, what[1].second);
host = std::string(what[2].first, what[2].second);
portStr = std::string(what[3].first, what[3].second);
path = std::string(what[4].first, what[4].second);
query = std::string(what[5].first, what[5].second);
if (portStr.empty())
{
if (protocol == "ws" || protocol == "http")
{
port = 80;
}
else if (protocol == "wss" || protocol == "https")
{
port = 443;
}
else
{
// Invalid protocol. Should be caught by regex check
// but this missing branch trigger cpplint linter.
return false;
}
}
else
{
std::stringstream ss;
ss << portStr;
ss >> port;
}
if (path.empty())
{
path = "/";
}
else if (path[0] != '/')
{
path = '/' + path;
}
if (!query.empty())
{
path += "?";
path += query;
}
return true;
}
void UrlParser::printUrl(const std::string& url, bool websocket)
{
std::string protocol, host, path, query;
int port {0};
if (!parse(url, protocol, host, path, query, port, websocket))
{
return;
}
std::cout << "[" << url << "]" << std::endl;
std::cout << protocol << std::endl;
std::cout << host << std::endl;
std::cout << port << std::endl;
std::cout << path << std::endl;
std::cout << query << std::endl;
std::cout << "-------------------------------" << std::endl;
}
}

31
ixwebsocket/IXUrlParser.h Normal file
View File

@ -0,0 +1,31 @@
/*
* IXUrlParser.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <string>
#include <regex>
namespace ix
{
class UrlParser
{
public:
static bool parse(const std::string& url,
std::string& protocol,
std::string& host,
std::string& path,
std::string& query,
int& port,
bool websocket);
static void printUrl(const std::string& url, bool websocket);
private:
static std::regex _httpRegex;
static std::regex _webSocketRegex;
};
}

View File

@ -79,10 +79,10 @@ namespace ix
return _perMessageDeflateOptions;
}
void WebSocket::setHeartBeatPeriod(int hearBeatPeriod)
void WebSocket::setHeartBeatPeriod(int heartBeatPeriod)
{
std::lock_guard<std::mutex> lock(_configMutex);
_heartBeatPeriod = hearBeatPeriod;
_heartBeatPeriod = heartBeatPeriod;
}
int WebSocket::getHeartBeatPeriod() const
@ -252,6 +252,11 @@ namespace ix
{
webSocketMessageType = WebSocket_MessageType_Pong;
} break;
case WebSocketTransport::FRAGMENT:
{
webSocketMessageType = WebSocket_MessageType_Fragment;
} break;
}
WebSocketErrorInfo webSocketErrorInfo;
@ -297,7 +302,13 @@ namespace ix
WebSocketSendInfo WebSocket::send(const std::string& text,
const OnProgressCallback& onProgressCallback)
{
return sendMessage(text, false, onProgressCallback);
return sendMessage(text, SendMessageKind::Binary, onProgressCallback);
}
WebSocketSendInfo WebSocket::sendText(const std::string& text,
const OnProgressCallback& onProgressCallback)
{
return sendMessage(text, SendMessageKind::Text, onProgressCallback);
}
WebSocketSendInfo WebSocket::ping(const std::string& text)
@ -306,11 +317,11 @@ namespace ix
constexpr size_t pingMaxPayloadSize = 125;
if (text.size() > pingMaxPayloadSize) return WebSocketSendInfo(false);
return sendMessage(text, true);
return sendMessage(text, SendMessageKind::Ping);
}
WebSocketSendInfo WebSocket::sendMessage(const std::string& text,
bool ping,
SendMessageKind sendMessageKind,
const OnProgressCallback& onProgressCallback)
{
if (!isConnected()) return WebSocketSendInfo(false);
@ -327,13 +338,22 @@ namespace ix
std::lock_guard<std::mutex> lock(_writeMutex);
WebSocketSendInfo webSocketSendInfo;
if (ping)
switch (sendMessageKind)
{
webSocketSendInfo = _ws.sendPing(text);
}
else
{
webSocketSendInfo = _ws.sendBinary(text, onProgressCallback);
case SendMessageKind::Text:
{
webSocketSendInfo = _ws.sendText(text, onProgressCallback);
} break;
case SendMessageKind::Binary:
{
webSocketSendInfo = _ws.sendBinary(text, onProgressCallback);
} break;
case SendMessageKind::Ping:
{
webSocketSendInfo = _ws.sendPing(text);
} break;
}
WebSocket::invokeTrafficTrackerCallback(webSocketSendInfo.wireSize, false);
@ -374,4 +394,9 @@ namespace ix
{
_automaticReconnection = false;
}
size_t WebSocket::bufferedAmount() const
{
return _ws.bufferedAmount();
}
}

View File

@ -39,7 +39,8 @@ namespace ix
WebSocket_MessageType_Close = 2,
WebSocket_MessageType_Error = 3,
WebSocket_MessageType_Ping = 4,
WebSocket_MessageType_Pong = 5
WebSocket_MessageType_Pong = 5,
WebSocket_MessageType_Fragment = 6
};
struct WebSocketOpenInfo
@ -88,7 +89,7 @@ namespace ix
void setUrl(const std::string& url);
void setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions);
void setHandshakeTimeout(int handshakeTimeoutSecs);
void setHeartBeatPeriod(int hearBeatPeriod);
void setHeartBeatPeriod(int heartBeatPeriod);
// Run asynchronously, by calling start and stop.
void start();
@ -100,6 +101,8 @@ namespace ix
WebSocketSendInfo send(const std::string& text,
const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo sendText(const std::string& text,
const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo ping(const std::string& text);
void close();
@ -111,6 +114,7 @@ namespace ix
const std::string& getUrl() const;
const WebSocketPerMessageDeflateOptions& getPerMessageDeflateOptions() const;
int getHeartBeatPeriod() const;
size_t bufferedAmount() const;
void enableAutomaticReconnection();
void disableAutomaticReconnection();
@ -118,7 +122,7 @@ namespace ix
private:
WebSocketSendInfo sendMessage(const std::string& text,
bool ping,
SendMessageKind sendMessageKind,
const OnProgressCallback& callback = nullptr);
bool isConnected() const;

View File

@ -6,6 +6,7 @@
#include "IXWebSocketHandshake.h"
#include "IXSocketConnect.h"
#include "IXUrlParser.h"
#include "libwshandshake.hpp"
@ -32,90 +33,6 @@ namespace ix
}
bool WebSocketHandshake::parseUrl(const std::string& url,
std::string& protocol,
std::string& host,
std::string& path,
std::string& query,
int& port)
{
std::regex ex("(ws|wss)://([^/ :]+):?([^/ ]*)(/?[^ #?]*)\\x3f?([^ #]*)#?([^ ]*)");
std::cmatch what;
if (!regex_match(url.c_str(), what, ex))
{
return false;
}
std::string portStr;
protocol = std::string(what[1].first, what[1].second);
host = std::string(what[2].first, what[2].second);
portStr = std::string(what[3].first, what[3].second);
path = std::string(what[4].first, what[4].second);
query = std::string(what[5].first, what[5].second);
if (portStr.empty())
{
if (protocol == "ws")
{
port = 80;
}
else if (protocol == "wss")
{
port = 443;
}
else
{
// Invalid protocol. Should be caught by regex check
// but this missing branch trigger cpplint linter.
return false;
}
}
else
{
std::stringstream ss;
ss << portStr;
ss >> port;
}
if (path.empty())
{
path = "/";
}
else if (path[0] != '/')
{
path = '/' + path;
}
if (!query.empty())
{
path += "?";
path += query;
}
return true;
}
void WebSocketHandshake::printUrl(const std::string& url)
{
std::string protocol, host, path, query;
int port {0};
if (!WebSocketHandshake::parseUrl(url, protocol, host,
path, query, port))
{
return;
}
std::cout << "[" << url << "]" << std::endl;
std::cout << protocol << std::endl;
std::cout << host << std::endl;
std::cout << port << std::endl;
std::cout << path << std::endl;
std::cout << query << std::endl;
std::cout << "-------------------------------" << std::endl;
}
std::string WebSocketHandshake::trim(const std::string& str)
{
std::string out(str);
@ -192,67 +109,12 @@ namespace ix
return s;
}
std::pair<bool, WebSocketHttpHeaders> WebSocketHandshake::parseHttpHeaders(
const CancellationRequest& isCancellationRequested)
{
WebSocketHttpHeaders headers;
char line[256];
int i;
while (true)
{
int colon = 0;
for (i = 0;
i < 2 || (i < 255 && line[i-2] != '\r' && line[i-1] != '\n');
++i)
{
if (!_socket->readByte(line+i, isCancellationRequested))
{
return std::make_pair(false, headers);
}
if (line[i] == ':' && colon == 0)
{
colon = i;
}
}
if (line[0] == '\r' && line[1] == '\n')
{
break;
}
// line is a single header entry. split by ':', and add it to our
// header map. ignore lines with no colon.
if (colon > 0)
{
line[i] = '\0';
std::string lineStr(line);
// colon is ':', colon+1 is ' ', colon+2 is the start of the value.
// i is end of string (\0), i-colon is length of string minus key;
// subtract 1 for '\0', 1 for '\n', 1 for '\r',
// 1 for the ' ' after the ':', and total is -4
std::string name(lineStr.substr(0, colon));
std::string value(lineStr.substr(colon + 2, i - colon - 4));
// Make the name lower case.
std::transform(name.begin(), name.end(), name.begin(), ::tolower);
headers[name] = value;
}
}
return std::make_pair(true, headers);
}
WebSocketInitResult WebSocketHandshake::sendErrorResponse(int code, const std::string& reason)
{
std::stringstream ss;
ss << "HTTP/1.1 ";
ss << code;
ss << "\r\n";
ss << " ";
ss << reason;
ss << "\r\n";
@ -355,7 +217,7 @@ namespace ix
return WebSocketInitResult(false, status, ss.str());
}
auto result = parseHttpHeaders(isCancellationRequested);
auto result = parseHttpHeaders(_socket, isCancellationRequested);
auto headersValid = result.first;
auto headers = result.second;
@ -450,7 +312,7 @@ namespace ix
}
// Retrieve and validate HTTP headers
auto result = parseHttpHeaders(isCancellationRequested);
auto result = parseHttpHeaders(_socket, isCancellationRequested);
auto headersValid = result.first;
auto headers = result.second;
@ -491,7 +353,7 @@ namespace ix
WebSocketHandshakeKeyGen::generate(headers["sec-websocket-key"].c_str(), output);
std::stringstream ss;
ss << "HTTP/1.1 101\r\n";
ss << "HTTP/1.1 101 Switching Protocols\r\n";
ss << "Sec-WebSocket-Accept: " << std::string(output) << "\r\n";
ss << "Upgrade: websocket\r\n";
ss << "Connection: Upgrade\r\n";

View File

@ -59,19 +59,10 @@ namespace ix
WebSocketInitResult serverHandshake(int fd,
int timeoutSecs);
static bool parseUrl(const std::string& url,
std::string& protocol,
std::string& host,
std::string& path,
std::string& query,
int& port);
private:
static void printUrl(const std::string& url);
std::string genRandomString(const int len);
// Parse HTTP headers
std::pair<bool, WebSocketHttpHeaders> parseHttpHeaders(const CancellationRequest& isCancellationRequested);
WebSocketInitResult sendErrorResponse(int code, const std::string& reason);
std::tuple<std::string, std::string, std::string> parseRequestLine(const std::string& line);

View File

@ -0,0 +1,66 @@
/*
* IXWebSocketHttpHeaders.h
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include "IXWebSocketHttpHeaders.h"
#include "IXSocket.h"
#include <string>
#include <unordered_map>
namespace ix
{
std::pair<bool, WebSocketHttpHeaders> parseHttpHeaders(
std::shared_ptr<Socket> socket,
const CancellationRequest& isCancellationRequested)
{
WebSocketHttpHeaders headers;
char line[1024];
int i;
while (true)
{
int colon = 0;
for (i = 0;
i < 2 || (i < 1023 && line[i-2] != '\r' && line[i-1] != '\n');
++i)
{
if (!socket->readByte(line+i, isCancellationRequested))
{
return std::make_pair(false, headers);
}
if (line[i] == ':' && colon == 0)
{
colon = i;
}
}
if (line[0] == '\r' && line[1] == '\n')
{
break;
}
// line is a single header entry. split by ':', and add it to our
// header map. ignore lines with no colon.
if (colon > 0)
{
line[i] = '\0';
std::string lineStr(line);
// colon is ':', colon+1 is ' ', colon+2 is the start of the value.
// i is end of string (\0), i-colon is length of string minus key;
// subtract 1 for '\0', 1 for '\n', 1 for '\r',
// 1 for the ' ' after the ':', and total is -4
std::string name(lineStr.substr(0, colon));
std::string value(lineStr.substr(colon + 2, i - colon - 4));
headers[name] = value;
}
}
return std::make_pair(true, headers);
}
}

View File

@ -6,10 +6,40 @@
#pragma once
#include "IXCancellationRequest.h"
#include <string>
#include <unordered_map>
#include <map>
#include <memory>
#include <algorithm>
namespace ix
{
using WebSocketHttpHeaders = std::unordered_map<std::string, std::string>;
class Socket;
struct CaseInsensitiveLess
{
// Case Insensitive compare_less binary function
struct NocaseCompare
{
bool operator() (const unsigned char& c1, const unsigned char& c2) const
{
return std::tolower(c1) < std::tolower(c2);
}
};
bool operator() (const std::string & s1, const std::string & s2) const
{
return std::lexicographical_compare
(s1.begin(), s1.end(), // source range
s2.begin(), s2.end(), // dest range
NocaseCompare()); // comparison
}
};
using WebSocketHttpHeaders = std::map<std::string, std::string, CaseInsensitiveLess>;
std::pair<bool, WebSocketHttpHeaders> parseHttpHeaders(
std::shared_ptr<Socket> socket,
const CancellationRequest& isCancellationRequested);
}

View File

@ -49,10 +49,12 @@ namespace ix
_onConnectionCallback = callback;
}
void WebSocketServer::handleConnection(int fd)
void WebSocketServer::handleConnection(
int fd,
std::shared_ptr<ConnectionState> connectionState)
{
auto webSocket = std::make_shared<WebSocket>();
_onConnectionCallback(webSocket);
_onConnectionCallback(webSocket, connectionState);
webSocket->disableAutomaticReconnection();

View File

@ -20,7 +20,8 @@
namespace ix
{
using OnConnectionCallback = std::function<void(std::shared_ptr<WebSocket>)>;
using OnConnectionCallback = std::function<void(std::shared_ptr<WebSocket>,
std::shared_ptr<ConnectionState>)>;
class WebSocketServer : public SocketServer {
public:
@ -49,7 +50,8 @@ namespace ix
const static int kDefaultHandShakeTimeoutSecs;
// Methods
virtual void handleConnection(int fd) final;
virtual void handleConnection(int fd,
std::shared_ptr<ConnectionState> connectionState) final;
virtual size_t getConnectedClientsCount() final;
};
}

View File

@ -1,7 +1,31 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2012, 2013 <dhbaird@gmail.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
/*
* IXWebSocketTransport.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/
//
@ -11,14 +35,8 @@
#include "IXWebSocketTransport.h"
#include "IXWebSocketHandshake.h"
#include "IXWebSocketHttpHeaders.h"
#ifdef IXWEBSOCKET_USE_TLS
# ifdef __APPLE__
# include "IXSocketAppleSSL.h"
# else
# include "IXSocketOpenSSL.h"
# endif
#endif
#include "IXUrlParser.h"
#include "IXSocketFactory.h"
#include <string.h>
#include <stdlib.h>
@ -35,11 +53,12 @@
namespace ix
{
const std::string WebSocketTransport::kHeartBeatPingMessage("ixwebsocket::hearbeat");
const std::string WebSocketTransport::kHeartBeatPingMessage("ixwebsocket::heartbeat");
const int WebSocketTransport::kDefaultHeartBeatPeriod(-1);
constexpr size_t WebSocketTransport::kChunkSize;
WebSocketTransport::WebSocketTransport() :
_useMask(true),
_readyState(CLOSED),
_closeCode(0),
_closeWireSize(0),
@ -57,11 +76,11 @@ namespace ix
}
void WebSocketTransport::configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions,
int hearBeatPeriod)
int heartBeatPeriod)
{
_perMessageDeflateOptions = perMessageDeflateOptions;
_enablePerMessageDeflate = _perMessageDeflateOptions.enabled();
_heartBeatPeriod = hearBeatPeriod;
_heartBeatPeriod = heartBeatPeriod;
}
// Client
@ -70,31 +89,21 @@ namespace ix
{
std::string protocol, host, path, query;
int port;
bool websocket = true;
if (!WebSocketHandshake::parseUrl(url, protocol, host,
path, query, port))
if (!UrlParser::parse(url, protocol, host, path, query, port, websocket))
{
return WebSocketInitResult(false, 0,
std::string("Could not parse URL ") + url);
}
if (protocol == "wss")
bool tls = protocol == "wss";
std::string errorMsg;
_socket = createSocket(tls, errorMsg);
if (!_socket)
{
_socket.reset();
#ifdef IXWEBSOCKET_USE_TLS
# ifdef __APPLE__
_socket = std::make_shared<SocketAppleSSL>();
# else
_socket = std::make_shared<SocketOpenSSL>();
# endif
#else
return WebSocketInitResult(false, 0, "TLS is not supported.");
#endif
}
else
{
_socket.reset();
_socket = std::make_shared<Socket>();
return WebSocketInitResult(false, 0, errorMsg);
}
WebSocketHandshake webSocketHandshake(_requestInitCancellation,
@ -115,8 +124,16 @@ namespace ix
// Server
WebSocketInitResult WebSocketTransport::connectToSocket(int fd, int timeoutSecs)
{
_socket.reset();
_socket = std::make_shared<Socket>(fd);
// Server should not mask the data it sends to the client
_useMask = false;
std::string errorMsg;
_socket = createSocket(fd, errorMsg);
if (!_socket)
{
return WebSocketInitResult(false, 0, errorMsg);
}
WebSocketHandshake webSocketHandshake(_requestInitCancellation,
_socket,
@ -176,43 +193,75 @@ namespace ix
// If (1) heartbeat is enabled, and (2) no data was received or
// send for a duration exceeding our heart-beat period, send a
// ping to the server.
if (pollResult == PollResultType_Timeout &&
if (pollResult == PollResultType::Timeout &&
heartBeatPeriodExceeded())
{
std::stringstream ss;
ss << kHeartBeatPingMessage << "::" << _heartBeatPeriod << "s";
sendPing(ss.str());
return;
}
while (true)
// Make sure we send all the buffered data
// there can be a lot of it for large messages.
else if (pollResult == PollResultType::SendRequest)
{
ssize_t ret = _socket->recv((char*)&_readbuf[0], _readbuf.size());
while (!isSendBufferEmpty() && !_requestInitCancellation)
{
// Wait with a 10ms timeout until the socket is ready to write.
// This way we are not busy looping
PollResultType result = _socket->isReadyToWrite(10);
if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
_socket->getErrno() == EAGAIN))
{
break;
}
else if (ret <= 0)
{
_rxbuf.clear();
_socket->close();
setReadyState(CLOSED);
break;
}
else
{
_rxbuf.insert(_rxbuf.end(),
_readbuf.begin(),
_readbuf.begin() + ret);
if (result == PollResultType::Error)
{
_socket->close();
setReadyState(CLOSED);
break;
}
else if (result == PollResultType::ReadyForWrite)
{
sendOnSocket();
}
}
}
else if (pollResult == PollResultType::ReadyForRead)
{
while (true)
{
ssize_t ret = _socket->recv((char*)&_readbuf[0], _readbuf.size());
if (isSendBufferEmpty() && _readyState == CLOSING)
if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
_socket->getErrno() == EAGAIN))
{
break;
}
else if (ret <= 0)
{
_rxbuf.clear();
_socket->close();
setReadyState(CLOSED);
break;
}
else
{
_rxbuf.insert(_rxbuf.end(),
_readbuf.begin(),
_readbuf.begin() + ret);
}
}
}
else if (pollResult == PollResultType::Error)
{
_socket->close();
}
else if (pollResult == PollResultType::CloseRequest)
{
_socket->close();
}
// Avoid a race condition where we get stuck in select
// while closing.
if (_readyState == CLOSING)
{
_socket->close();
setReadyState(CLOSED);
}
},
_heartBeatPeriod);
@ -235,19 +284,15 @@ namespace ix
_txbuf.insert(_txbuf.end(), header.begin(), header.end());
_txbuf.insert(_txbuf.end(), begin, end);
// Masking
for (size_t i = 0; i != (size_t) message_size; ++i)
if (_useMask)
{
*(_txbuf.end() - (size_t) message_size + i) ^= masking_key[i&0x3];
for (size_t i = 0; i != (size_t) message_size; ++i)
{
*(_txbuf.end() - (size_t) message_size + i) ^= masking_key[i&0x3];
}
}
}
void WebSocketTransport::appendToSendBuffer(const std::vector<uint8_t>& buffer)
{
std::lock_guard<std::mutex> lock(_txbufMutex);
_txbuf.insert(_txbuf.end(), buffer.begin(), buffer.end());
}
void WebSocketTransport::unmaskReceiveBuffer(const wsheader_type& ws)
{
if (ws.mask)
@ -390,6 +435,10 @@ namespace ix
emitMessage(MSG, getMergedChunks(), ws, onMessageCallback);
_chunks.clear();
}
else
{
emitMessage(FRAGMENT, std::string(), ws, onMessageCallback);
}
}
}
else if (ws.opcode == wsheader_type::PING)
@ -473,7 +522,7 @@ namespace ix
size_t wireSize = message.size();
// When the RSV1 bit is 1 it means the message is compressed
if (_enablePerMessageDeflate && ws.rsv1)
if (_enablePerMessageDeflate && ws.rsv1 && messageKind != FRAGMENT)
{
std::string decompressedMessage;
bool success = _perMessageDeflate.decompress(message, decompressedMessage);
@ -571,7 +620,7 @@ namespace ix
// Send message
sendFragment(opcodeType, fin, begin, end, compress);
if (onProgressCallback && !onProgressCallback(i, steps))
if (onProgressCallback && !onProgressCallback((int)i, (int) steps))
{
break;
}
@ -580,6 +629,12 @@ namespace ix
}
}
// Request to flush the send buffer on the background thread if it isn't empty
if (!isSendBufferEmpty())
{
_socket->wakeUpFromPoll(Socket::kSendRequest);
}
return WebSocketSendInfo(true, compressionError, payloadSize, wireSize);
}
@ -601,7 +656,8 @@ namespace ix
std::vector<uint8_t> header;
header.assign(2 +
(message_size >= 126 ? 2 : 0) +
(message_size >= 65536 ? 6 : 0) + 4, 0);
(message_size >= 65536 ? 6 : 0) +
(_useMask ? 4 : 0), 0);
header[0] = type;
// The fin bit indicate that this is the last fragment. Fin is French for end.
@ -618,27 +674,33 @@ namespace ix
if (message_size < 126)
{
header[1] = (message_size & 0xff) | 0x80;
header[1] = (message_size & 0xff) | (_useMask ? 0x80 : 0);
header[2] = masking_key[0];
header[3] = masking_key[1];
header[4] = masking_key[2];
header[5] = masking_key[3];
if (_useMask)
{
header[2] = masking_key[0];
header[3] = masking_key[1];
header[4] = masking_key[2];
header[5] = masking_key[3];
}
}
else if (message_size < 65536)
{
header[1] = 126 | 0x80;
header[1] = 126 | (_useMask ? 0x80 : 0);
header[2] = (message_size >> 8) & 0xff;
header[3] = (message_size >> 0) & 0xff;
header[4] = masking_key[0];
header[5] = masking_key[1];
header[6] = masking_key[2];
header[7] = masking_key[3];
if (_useMask)
{
header[4] = masking_key[0];
header[5] = masking_key[1];
header[6] = masking_key[2];
header[7] = masking_key[3];
}
}
else
{ // TODO: run coverage testing here
header[1] = 127 | 0x80;
header[1] = 127 | (_useMask ? 0x80 : 0);
header[2] = (message_size >> 56) & 0xff;
header[3] = (message_size >> 48) & 0xff;
header[4] = (message_size >> 40) & 0xff;
@ -648,10 +710,13 @@ namespace ix
header[8] = (message_size >> 8) & 0xff;
header[9] = (message_size >> 0) & 0xff;
header[10] = masking_key[0];
header[11] = masking_key[1];
header[12] = masking_key[2];
header[13] = masking_key[3];
if (_useMask)
{
header[10] = masking_key[0];
header[11] = masking_key[1];
header[12] = masking_key[2];
header[13] = masking_key[3];
}
}
// _txbuf will keep growing until it can be transmitted over the socket:
@ -677,6 +742,15 @@ namespace ix
_enablePerMessageDeflate, onProgressCallback);
}
WebSocketSendInfo WebSocketTransport::sendText(
const std::string& message,
const OnProgressCallback& onProgressCallback)
{
return sendData(wsheader_type::TEXT_FRAME, message,
_enablePerMessageDeflate, onProgressCallback);
}
void WebSocketTransport::sendOnSocket()
{
std::lock_guard<std::mutex> lock(_txbufMutex);
@ -725,8 +799,18 @@ namespace ix
sendData(wsheader_type::CLOSE, normalClosure, compress);
setReadyState(CLOSING);
_socket->wakeUpFromPoll();
_socket->wakeUpFromPoll(Socket::kCloseRequest);
_socket->close();
_closeCode = 1000;
_closeReason = "Normal Closure";
setReadyState(CLOSED);
}
size_t WebSocketTransport::bufferedAmount() const
{
std::lock_guard<std::mutex> lock(_txbufMutex);
return _txbuf.size();
}
} // namespace ix

View File

@ -30,6 +30,13 @@ namespace ix
{
class Socket;
enum class SendMessageKind
{
Text,
Binary,
Ping
};
class WebSocketTransport
{
public:
@ -45,7 +52,8 @@ namespace ix
{
MSG,
PING,
PONG
PONG,
FRAGMENT
};
using OnMessageCallback = std::function<void(const std::string&,
@ -60,7 +68,7 @@ namespace ix
~WebSocketTransport();
void configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions,
int hearBeatPeriod);
int heartBeatPeriod);
WebSocketInitResult connectToUrl(const std::string& url, // Client
int timeoutSecs);
@ -70,12 +78,15 @@ namespace ix
void poll();
WebSocketSendInfo sendBinary(const std::string& message,
const OnProgressCallback& onProgressCallback);
WebSocketSendInfo sendText(const std::string& message,
const OnProgressCallback& onProgressCallback);
WebSocketSendInfo sendPing(const std::string& message);
void close();
ReadyStateValues getReadyState() const;
void setReadyState(ReadyStateValues readyStateValue);
void setOnCloseCallback(const OnCloseCallback& onCloseCallback);
void dispatch(const OnMessageCallback& onMessageCallback);
size_t bufferedAmount() const;
private:
std::string _url;
@ -98,6 +109,10 @@ namespace ix
uint8_t masking_key[4];
};
// Tells whether we should mask the data we send.
// client should mask but server should not
bool _useMask;
// Buffer for reading from our socket. That buffer is never resized.
std::vector<uint8_t> _readbuf;
@ -146,7 +161,7 @@ namespace ix
mutable std::mutex _lastSendTimePointMutex;
std::chrono::time_point<std::chrono::steady_clock> _lastSendTimePoint;
// No data was send through the socket for longer that the hearbeat period
// No data was send through the socket for longer than the heartbeat period
bool heartBeatPeriodExceeded();
void sendOnSocket();
@ -172,7 +187,6 @@ namespace ix
std::string::const_iterator end,
uint64_t message_size,
uint8_t masking_key[4]);
void appendToSendBuffer(const std::vector<uint8_t>& buffer);
unsigned getRandomUnsigned();
void unmaskReceiveBuffer(const wsheader_type& ws);

View File

@ -3,19 +3,33 @@
#
all: brew
install: brew
brew:
mkdir -p build && (cd build ; cmake .. ; make)
mkdir -p build && (cd build ; cmake .. ; make -j install)
.PHONY: docker
NAME := bsergean/ws
TAG := $(shell cat DOCKER_VERSION)
IMG := ${NAME}:${TAG}
LATEST := ${NAME}:latest
BUILD := ${NAME}:build
docker:
docker build -t broadcast_server:latest .
docker build -t ${IMG} .
docker tag ${IMG} ${BUILD}
docker_push:
docker tag ${IMG} ${LATEST}
docker push ${LATEST}
run:
docker run --cap-add sys_ptrace -it broadcast_server:latest bash
docker run --cap-add sys_ptrace -it ws:latest
# this is helpful to remove trailing whitespaces
trail:
sh third_party/remove_trailing_whitespaces.sh
sh third_party/remote_trailing_whitespaces.sh
build:
(cd examples/satori_publisher ; mkdir -p build ; cd build ; cmake .. ; make)
@ -36,6 +50,9 @@ test_server:
test:
python test/run.py
ws_test: all
(cd ws ; bash test_ws.sh)
# For the fork that is configured with appveyor
rebase_upstream:
git fetch upstream
@ -43,5 +60,9 @@ rebase_upstream:
git reset --hard upstream/master
git push origin master --force
install_cmake_for_linux:
mkdir -p /tmp/cmake
(cd /tmp/cmake ; curl -L -O https://github.com/Kitware/CMake/releases/download/v3.14.0-rc4/cmake-3.14.0-rc4-Linux-x86_64.tar.gz ; tar zxf cmake-3.14.0-rc4-Linux-x86_64.tar.gz)
.PHONY: test
.PHONY: build

View File

@ -29,6 +29,7 @@ set (SOURCES
IXDNSLookupTest.cpp
IXSocketTest.cpp
IXSocketConnectTest.cpp
)
# Some unittest don't work on windows yet

View File

@ -0,0 +1,43 @@
/*
* IXSocketConnectTest.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone. All rights reserved.
*/
#include "catch.hpp"
#include "IXTest.h"
#include <ixwebsocket/IXSocketConnect.h>
#include <iostream>
using namespace ix;
TEST_CASE("socket_connect", "[net]")
{
SECTION("Test connecting to a known hostname")
{
std::string errMsg;
int fd = SocketConnect::connect("www.google.com", 80, errMsg, [] { return false; });
std::cerr << "Error message: " << errMsg << std::endl;
REQUIRE(fd != -1);
}
SECTION("Test connecting to a non-existing hostname")
{
std::string errMsg;
std::string hostname("12313lhjlkjhopiupoijlkasdckljqwehrlkqjwehraospidcuaposidcasdc");
int fd = SocketConnect::connect(hostname, 80, errMsg, [] { return false; });
std::cerr << "Error message: " << errMsg << std::endl;
REQUIRE(fd == -1);
}
SECTION("Test connecting to a good hostname, with cancellation")
{
std::string errMsg;
// The callback returning true means we are requesting cancellation
int fd = SocketConnect::connect("www.google.com", 80, errMsg, [] { return true; });
std::cerr << "Error message: " << errMsg << std::endl;
REQUIRE(fd == -1);
}
}

View File

@ -5,19 +5,13 @@
*/
#include <iostream>
#include <ixwebsocket/IXSocketFactory.h>
#include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXCancellationRequest.h>
#if defined(__APPLE__) or defined(__linux__)
# ifdef __APPLE__
# include <ixwebsocket/IXSocketAppleSSL.h>
# else
# include <ixwebsocket/IXSocketOpenSSL.h>
# endif
#endif
#include "IXTest.h"
#include "catch.hpp"
#include <string.h>
using namespace ix;
@ -39,16 +33,15 @@ namespace ix
Logger() << "errMsg: " << errMsg;
REQUIRE(success);
std::cout << "Sending request: " << request
<< "to " << host << ":" << port
<< std::endl;
Logger() << "Sending request: " << request
<< "to " << host << ":" << port;
REQUIRE(socket->writeBytes(request, isCancellationRequested));
auto lineResult = socket->readLine(isCancellationRequested);
auto lineValid = lineResult.first;
auto line = lineResult.second;
std::cout << "read error: " << strerror(Socket::getErrno()) << std::endl;
Logger() << "read error: " << strerror(Socket::getErrno());
REQUIRE(lineValid);
@ -62,10 +55,18 @@ TEST_CASE("socket", "[socket]")
{
SECTION("Connect to google HTTP server. Send GET request without header. Should return 200")
{
std::shared_ptr<Socket> socket(new Socket);
std::string errMsg;
bool tls = false;
std::shared_ptr<Socket> socket = createSocket(tls, errMsg);
std::string host("www.google.com");
int port = 80;
std::string request("GET / HTTP/1.1\r\n\r\n");
std::stringstream ss;
ss << "GET / HTTP/1.1\r\n";
ss << "Host: " << host << "\r\n";
ss << "\r\n";
std::string request(ss.str());
int expectedStatus = 200;
int timeoutSecs = 3;
@ -75,11 +76,9 @@ TEST_CASE("socket", "[socket]")
#if defined(__APPLE__) or defined(__linux__)
SECTION("Connect to google HTTPS server. Send GET request without header. Should return 200")
{
# ifdef __APPLE__
std::shared_ptr<Socket> socket = std::make_shared<SocketAppleSSL>();
# else
std::shared_ptr<Socket> socket = std::make_shared<SocketOpenSSL>();
# endif
std::string errMsg;
bool tls = true;
std::shared_ptr<Socket> socket = createSocket(tls, errMsg);
std::string host("www.google.com");
int port = 443;
std::string request("GET / HTTP/1.1\r\n\r\n");

View File

@ -16,6 +16,7 @@
#include <iostream>
#include <stdlib.h>
#include <stack>
#include <iomanip>
namespace ix
{
@ -69,10 +70,15 @@ namespace ix
Logger() << msg;
}
int getAnyFreePortSimple()
{
static int defaultPort = 8090;
return defaultPort++;
}
int getAnyFreePort()
{
int defaultPort = 8090;
int sockfd;
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
@ -122,8 +128,15 @@ namespace ix
{
while (true)
{
#if defined(__has_feature)
# if __has_feature(address_sanitizer)
int port = getAnyFreePortSimple();
# else
int port = getAnyFreePort();
# endif
#else
int port = getAnyFreePort();
#endif
//
// Only port above 1024 can be used by non root users, but for some
// reason I got port 7 returned with macOS when binding on port 0...
@ -136,4 +149,21 @@ namespace ix
return -1;
}
void hexDump(const std::string& prefix,
const std::string& s)
{
std::ostringstream ss;
bool upper_case = false;
for (std::string::size_type i = 0; i < s.length(); ++i)
{
ss << std::hex
<< std::setfill('0')
<< std::setw(2)
<< (upper_case ? std::uppercase : std::nouppercase) << (int)s[i];
}
std::cout << prefix << ": " << s << " => " << ss.str() << std::endl;
}
}

View File

@ -65,7 +65,7 @@ namespace
_webSocket.setUrl(url);
// The important bit for this test.
// Set a 1 second hearbeat ; if no traffic is present on the connection for 1 second
// Set a 1 second heartbeat ; if no traffic is present on the connection for 1 second
// a ping message will be sent by the client.
_webSocket.setHeartBeatPeriod(1);
@ -128,10 +128,11 @@ namespace
{
// A dev/null server
server.setOnConnectionCallback(
[&server, &receivedPingMessages](std::shared_ptr<ix::WebSocket> webSocket)
[&server, &receivedPingMessages](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, &server, &receivedPingMessages](ix::WebSocketMessageType messageType,
[webSocket, connectionState, &server, &receivedPingMessages](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
@ -141,6 +142,7 @@ namespace
if (messageType == ix::WebSocket_MessageType_Open)
{
Logger() << "New server connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:";
for (auto it : openInfo.headers)
@ -210,6 +212,10 @@ TEST_CASE("Websocket_heartbeat", "[heartbeat]")
webSocketClientA.stop();
webSocketClientB.stop();
// Here we test heart beat period exceeded for clientA
// but it should not be exceeded for clientB which has sent data.
// -> expected ping messages == 2, but add a small buffer to make this more reliable.
REQUIRE(serverReceivedPingMessages >= 2);
REQUIRE(serverReceivedPingMessages <= 4);

View File

@ -8,6 +8,7 @@
#include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXWebSocket.h>
#include <ixwebsocket/IXWebSocketServer.h>
#include <ixwebsocket/IXSocketFactory.h>
#include "IXTest.h"
@ -17,13 +18,32 @@ using namespace ix;
namespace ix
{
bool startServer(ix::WebSocketServer& server)
// Test that we can override the connectionState impl to provide our own
class ConnectionStateCustom : public ConnectionState
{
void computeId()
{
// a very boring invariant id that we can test against in the unittest
_id = "foobarConnectionId";
}
};
bool startServer(ix::WebSocketServer& server,
std::string& connectionId)
{
auto factory = []() -> std::shared_ptr<ConnectionState>
{
return std::make_shared<ConnectionStateCustom>();
};
server.setConnectionStateFactory(factory);
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket)
[&server, &connectionId](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType,
[webSocket, connectionState,
&connectionId, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
@ -33,12 +53,16 @@ namespace ix
if (messageType == ix::WebSocket_MessageType_Open)
{
Logger() << "New connection";
connectionState->computeId();
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:";
for (auto it : openInfo.headers)
{
Logger() << it.first << ": " << it.second;
}
connectionId = connectionState->getId();
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
@ -77,19 +101,21 @@ TEST_CASE("Websocket_server", "[websocket_server]")
{
int port = getFreePort();
ix::WebSocketServer server(port);
REQUIRE(startServer(server));
std::string connectionId;
REQUIRE(startServer(server, connectionId));
Socket socket;
std::string host("localhost");
std::string errMsg;
bool tls = false;
std::shared_ptr<Socket> socket = createSocket(tls, errMsg);
std::string host("localhost");
auto isCancellationRequested = []() -> bool
{
return false;
};
bool success = socket.connect(host, port, errMsg, isCancellationRequested);
bool success = socket->connect(host, port, errMsg, isCancellationRequested);
REQUIRE(success);
auto lineResult = socket.readLine(isCancellationRequested);
auto lineResult = socket->readLine(isCancellationRequested);
auto lineValid = lineResult.first;
auto line = lineResult.second;
@ -109,22 +135,24 @@ TEST_CASE("Websocket_server", "[websocket_server]")
{
int port = getFreePort();
ix::WebSocketServer server(port);
REQUIRE(startServer(server));
std::string connectionId;
REQUIRE(startServer(server, connectionId));
Socket socket;
std::string host("localhost");
std::string errMsg;
bool tls = false;
std::shared_ptr<Socket> socket = createSocket(tls, errMsg);
std::string host("localhost");
auto isCancellationRequested = []() -> bool
{
return false;
};
bool success = socket.connect(host, port, errMsg, isCancellationRequested);
bool success = socket->connect(host, port, errMsg, isCancellationRequested);
REQUIRE(success);
Logger() << "writeBytes";
socket.writeBytes("GET /\r\n", isCancellationRequested);
socket->writeBytes("GET /\r\n", isCancellationRequested);
auto lineResult = socket.readLine(isCancellationRequested);
auto lineResult = socket->readLine(isCancellationRequested);
auto lineValid = lineResult.first;
auto line = lineResult.second;
@ -144,26 +172,28 @@ TEST_CASE("Websocket_server", "[websocket_server]")
{
int port = getFreePort();
ix::WebSocketServer server(port);
REQUIRE(startServer(server));
std::string connectionId;
REQUIRE(startServer(server, connectionId));
Socket socket;
std::string host("localhost");
std::string errMsg;
bool tls = false;
std::shared_ptr<Socket> socket = createSocket(tls, errMsg);
std::string host("localhost");
auto isCancellationRequested = []() -> bool
{
return false;
};
bool success = socket.connect(host, port, errMsg, isCancellationRequested);
bool success = socket->connect(host, port, errMsg, isCancellationRequested);
REQUIRE(success);
socket.writeBytes("GET / HTTP/1.1\r\n"
"Upgrade: websocket\r\n"
"Sec-WebSocket-Version: 13\r\n"
"Sec-WebSocket-Key: foobar\r\n"
"\r\n",
isCancellationRequested);
socket->writeBytes("GET / HTTP/1.1\r\n"
"Upgrade: websocket\r\n"
"Sec-WebSocket-Version: 13\r\n"
"Sec-WebSocket-Key: foobar\r\n"
"\r\n",
isCancellationRequested);
auto lineResult = socket.readLine(isCancellationRequested);
auto lineResult = socket->readLine(isCancellationRequested);
auto lineValid = lineResult.first;
auto line = lineResult.second;
@ -174,6 +204,8 @@ TEST_CASE("Websocket_server", "[websocket_server]")
// Give us 500ms for the server to notice that clients went away
ix::msleep(500);
REQUIRE(connectionId == "foobarConnectionId");
server.stop();
REQUIRE(server.getClients().size() == 0);
}

View File

@ -164,10 +164,21 @@ namespace
ss << "cmd_websocket_chat: Error ! " << error.reason;
log(ss.str());
}
else if (messageType == ix::WebSocket_MessageType_Ping)
{
log("cmd_websocket_chat: received ping message");
}
else if (messageType == ix::WebSocket_MessageType_Pong)
{
log("cmd_websocket_chat: received pong message");
}
else if (messageType == ix::WebSocket_MessageType_Fragment)
{
log("cmd_websocket_chat: received message fragment");
}
else
{
// FIXME: missing ping/pong messages
ss << "Invalid ix::WebSocketMessageType";
ss << "Unexpected ix::WebSocketMessageType";
log(ss.str());
}
});
@ -206,10 +217,11 @@ namespace
bool startServer(ix::WebSocketServer& server)
{
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket)
[&server](std::shared_ptr<ix::WebSocket> webSocket,
std::shared_ptr<ConnectionState> connectionState)
{
webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType,
[webSocket, connectionState, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
@ -219,6 +231,7 @@ namespace
if (messageType == ix::WebSocket_MessageType_Open)
{
Logger() << "New connection";
Logger() << "id: " << connectionState->getId();
Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:";
for (auto it : openInfo.headers)

View File

@ -2,14 +2,47 @@ import os
import platform
import shutil
import subprocess
import threading
class Command(object):
"""Run system commands with timeout
From http://www.bo-yang.net/2016/12/01/python-run-command-with-timeout
Python3 might have a builtin way to do that.
"""
def __init__(self, cmd):
self.cmd = cmd
self.process = None
def run_command(self, capture = False):
self.process = subprocess.Popen(self.cmd, shell=True)
self.process.communicate()
def run(self, timeout = 5 * 60):
'''5 minutes default timeout'''
thread = threading.Thread(target=self.run_command, args=())
thread.start()
thread.join(timeout)
if thread.is_alive():
print('Command timeout, kill it: ' + self.cmd)
self.process.terminate()
thread.join()
return False, 255
else:
return True, self.process.returncode
osName = platform.system()
print('os name = {}'.format(osName))
root = os.path.dirname(os.path.realpath(__file__))
buildDir = os.path.join(root, 'build')
buildDir = os.path.join(root, 'build', osName)
if not os.path.exists(buildDir):
os.mkdir(buildDir)
os.makedirs(buildDir)
os.chdir(buildDir)
@ -38,7 +71,7 @@ sanitizerFlags = sanitizersFlags[sanitizer]
# os.environ['CC'] = 'clang-cl'
# os.environ['CXX'] = 'clang-cl'
cmakeCmd = 'cmake -DCMAKE_BUILD_TYPE=Debug {} {} ..'.format(generator, sanitizerFlags)
cmakeCmd = 'cmake -DCMAKE_BUILD_TYPE=Debug {} {} ../..'.format(generator, sanitizerFlags)
print(cmakeCmd)
ret = os.system(cmakeCmd)
assert ret == 0, 'CMake failed, exiting'
@ -67,6 +100,7 @@ def findFiles(prefix):
# We need to copy the zlib DLL in the current work directory
shutil.copy(os.path.join(
'..',
'..',
'..',
'third_party',
@ -77,6 +111,9 @@ shutil.copy(os.path.join(
'bin',
'zlib.dll'), '.')
testCommand = '{} {}'.format(testBinary, os.getenv('TEST', ''))
ret = os.system(testCommand)
# lldb = "lldb --batch -o 'run' -k 'thread backtrace all' -k 'quit 1'"
lldb = "" # Disabled for now
testCommand = '{} {} {}'.format(lldb, testBinary, os.getenv('TEST', ''))
command = Command(testCommand)
timedout, ret = command.run()
assert ret == 0, 'Test command failed'

View File

@ -11,10 +11,6 @@
int main(int argc, char* argv[])
{
ix::Socket::init(); // for Windows
int result = Catch::Session().run(argc, argv);
ix::Socket::cleanup(); // for Windows
return result;
}

1
third_party/README.md vendored Normal file
View File

@ -0,0 +1 @@
Except ZLIB on Windows (whose port is currently broken...) all dependencies here are for the ws command line tools, not for the IXWebSockets library which is standalone.

View File

@ -1,2 +1,3 @@
find . -type f -name '*.cpp' -exec sed -i '' 's/[[:space:]]*$//' {} \+
find . -type f -name '*.h' -exec sed -i '' 's/[[:space:]]*$//' {} \+
find . -type f -name '*.md' -exec sed -i '' 's/[[:space:]]*$//' {} \+

1
ws/.gitignore vendored
View File

@ -1 +1,2 @@
build
node_modules

View File

@ -7,7 +7,7 @@ cmake_minimum_required (VERSION 3.4.1)
project (ws)
# There's -Weverything too for clang
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic -Wshorten-64-to-32")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic")
set (CMAKE_CXX_STANDARD 14)
@ -23,16 +23,24 @@ add_executable(ws
ixcrypto/IXHash.cpp
ixcrypto/IXUuid.cpp
IXRedisClient.cpp
ws_http_client.cpp
ws_ping_pong.cpp
ws_broadcast_server.cpp
ws_echo_server.cpp
ws_chat.cpp
ws_connect.cpp
ws_transfer.cpp
ws_send.cpp
ws_receive.cpp
ws_redis_publish.cpp
ws_redis_subscribe.cpp
ws.cpp)
if (APPLE AND USE_TLS)
target_link_libraries(ws "-framework foundation" "-framework security")
endif()
target_link_libraries(ws ixwebsocket)
target_link_libraries(ws ixwebsocket cpp_redis tacopie)
install(TARGETS ws RUNTIME DESTINATION bin)

Some files were not shown because too many files have changed in this diff Show More