Compare commits

...

78 Commits

Author SHA1 Message Date
e8583000b8 ping pong added to ws 2019-02-22 21:47:57 -08:00
d642ef1a89 comments 2019-02-22 21:27:49 -08:00
2df118022d add gitignore 2019-02-22 21:26:25 -08:00
95457c8f4c add echo and broadcast server as ws sub-commands 2019-02-22 21:25:56 -08:00
0a45b7787f cleanup 2019-02-22 20:51:22 -08:00
b8c397e180 add ws_chat and ws_connect sub commands to ws 2019-02-22 20:49:26 -08:00
90105fa2b3 all CMakeLists are referenced by the top level one 2019-02-21 22:21:29 -08:00
24859fef8a add target for building with homebrew 2019-02-21 22:05:30 -08:00
73d7280723 Feature/ws cli (#15)
* New command line tool for transfering files / still very beta.

* add readme

* use cli11 for argument parsing

* json -> msgpack

* stop using base64 and use binary which can be stored in message pack
2019-02-21 21:24:53 -08:00
262de49c3c Update README.md
Add note about message fragmentation.
2019-02-21 14:08:27 -08:00
3a77e96a05 Feature/send large message (#14)
* introduce send fragment

* can pass a fin frame

* can send messages which are a perfect multiple of the chunk size

* set fin only for last fragment

* cleanup

* last fragment should be of type CONTINUATION

* Add simple send and receive programs

* speedups receiving + better way to wait for thing

* receive speedup by using linked list of chunks instead of large array

* document bug

* use chunks to receive data

* trailing spaces
2019-02-20 18:59:07 -08:00
505dd6d50f document bug 2019-02-16 10:33:37 -08:00
3f8027b65c unittest for sending large messages 2019-02-16 10:32:02 -08:00
0f2c765f45 Update formatting in README.md 2019-02-05 23:04:45 -08:00
49077f8f44 more conf in CI 2019-01-29 17:50:19 -08:00
6a23b8530f get free port that can be used by non root users (> 1024) 2019-01-28 15:24:19 -08:00
ae841af91a use dynamically generated port number to configure servers in unittest 2019-01-28 15:24:19 -08:00
44f38849b2 Merge pull request #13 from machinezone/user/bsergeant/poll
User/bsergeant/poll
2019-01-27 10:47:38 -08:00
ee12fbdb5f windows build fix 2019-01-27 10:46:02 -08:00
316c630830 constexpr to declare number of fds 2019-01-26 21:01:36 -08:00
1ea5db6110 linux fix 2019-01-26 20:57:48 -08:00
986d9a00c0 remove shutdown call 2019-01-26 20:54:23 -08:00
7a05a11014 rebase poll branch 2019-01-26 20:50:25 -08:00
f09434263c insensitive string compare when validating server connection header 2019-01-25 16:17:51 -08:00
335f594165 Merge pull request #12 from machinezone/user/bsergeant/heart-beat
Add an optional heartbeat
2019-01-25 16:14:28 -08:00
fa7ef06f4d heartbeat correct 2019-01-25 16:11:39 -08:00
3c9ec0aed0 close server socket on exit 2019-01-24 21:16:32 -08:00
c665d65cba unittest fix 2019-01-24 19:54:10 -08:00
5d4e897cc4 add an heartbeat test 2019-01-24 18:50:07 -08:00
05033714bf hearbeat 2019-01-24 12:42:49 -08:00
a02bd3f25c Update README.md 2019-01-15 09:36:43 -08:00
fdbd213fa2 check and validate the Connection: Upgrade header in client/server 2019-01-15 09:31:37 -08:00
da64d349c8 Merge pull request #10 from tonylin0826/master
Fix missing "Upgrade" header error
2019-01-15 09:22:11 -08:00
17b01a8c66 Fix missing upgrade header error 2019-01-15 15:35:37 +08:00
79dd766fab C++14 + use make_unique and make_shared to make shared pointers 2019-01-11 21:25:06 -08:00
8375b28747 add travis badge 2019-01-08 10:13:23 -08:00
e12551f309 travis -> osx 2019-01-08 10:04:47 -08:00
6102f81710 Revert "Revert "try asan on Linux"" [Back to asan on Linux]
This reverts commit 02a704a8c7.
2019-01-07 21:13:48 -08:00
9f678e5962 travis-ci: try to use clang on Linux 2019-01-07 20:49:03 -08:00
02a704a8c7 Revert "try asan on Linux"
This reverts commit dd2360ed70.
2019-01-07 20:47:25 -08:00
dd2360ed70 try asan on Linux 2019-01-07 18:29:44 -08:00
c4ab996470 build with osx on travis 2019-01-07 18:16:29 -08:00
6c54b07d92 fix simple compile error in test/IXTest.h 2019-01-07 18:08:11 -08:00
7f9bef3b8d add a travis file for real 2019-01-07 18:05:55 -08:00
12d1c5d956 add a travis file 2019-01-07 18:04:28 -08:00
e9a4bd5617 update test remote ws url 2019-01-07 11:28:53 -08:00
f34ccbfdb5 remove cmake sanitizer submodule 2019-01-07 11:26:23 -08:00
1fa75d7fb2 check select errors better 2019-01-07 11:18:00 -08:00
39140ef98c sanitizer cmake stuff 2019-01-06 18:54:16 -08:00
e30ef4a87c DNSLookup _id member does not need to be an atomic 2019-01-06 18:32:19 -08:00
9fc94f0487 DNSLookup: fix #8 2019-01-06 18:27:26 -08:00
121acdab6f DNSLookup: copy hostname and port instead of accessing member 2019-01-06 18:17:12 -08:00
6deaa03114 return false -> return -1 2019-01-06 18:10:39 -08:00
f4f30686c5 add new unittest 2019-01-06 15:14:13 -08:00
a21aae521f remove dead file 2019-01-06 14:26:11 -08:00
aed2356fc1 remove openssl testing bits for apple build 2019-01-06 14:21:49 -08:00
a478f734f6 gcc linux compile fix 2019-01-06 12:12:39 -08:00
98c579da03 make a class hierarchy for server code (IXWebSocketServer <- IXSocketServer) 2019-01-06 12:09:31 -08:00
e80def0cd0 add log 2019-01-05 21:16:13 -08:00
cc8a9e883e unittest + compiler warnings 2019-01-05 21:10:08 -08:00
4d587e35d8 windows compile fix 2019-01-05 21:02:55 -08:00
50f4fd1115 int -> ssize_t for socker recv and send 2019-01-05 20:53:50 -08:00
06d2b68696 header refactoring 2019-01-05 20:38:43 -08:00
bf6f057777 windows connect (compile fix) 2019-01-05 17:35:50 -08:00
b57c1d69f2 windows connect potential fix 2019-01-05 17:32:21 -08:00
ff265d83f9 more accurate description of errors 2019-01-05 17:18:43 -08:00
5b1c97b774 SocketTest / more debug info 2019-01-05 17:10:01 -08:00
c8c81366f7 windows (compile) fix 2019-01-05 17:04:09 -08:00
9a37fd56d1 windows fix 2019-01-05 17:02:39 -08:00
7ecaff8c5d test failure is not noticed 2019-01-05 16:30:22 -08:00
e4b0286a25 fix gcc warning 2019-01-05 16:26:11 -08:00
7ae6972306 makefile tweak 2019-01-05 14:43:21 -08:00
59cea0372b add dns lookup test 2019-01-05 14:40:17 -08:00
78d88a8520 openssl cleanup 2019-01-05 11:42:25 -08:00
273af25d57 Merge pull request #7 from bsergean/user/bsergeant/appveyor_first
unittest on appveyor
2019-01-04 17:29:23 -08:00
46d00360a8 unittest on appveyor 2019-01-04 17:28:13 -08:00
3f5935a284 windows fixes 2019-01-04 15:23:57 -08:00
c236ff66e9 Merge pull request #6 from machinezone/user/bsergeant/server
Add support for writing websocket servers (IXWebSocketServer)
2019-01-03 18:47:30 -08:00
152 changed files with 21735 additions and 1512 deletions

1
.dockerignore Normal file
View File

@ -0,0 +1 @@
build

View File

@ -1,2 +1 @@
venv
build

0
.gitmodules vendored Normal file
View File

17
.travis.yml Normal file
View File

@ -0,0 +1,17 @@
language: cpp
dist: xenial
compiler:
- gcc
- clang
os:
- linux
- osx
matrix:
exclude:
# GCC fails on recent Travis OSX images.
- compiler: gcc
os: osx
script: python test/run.py

View File

@ -6,15 +6,19 @@
cmake_minimum_required(VERSION 3.4.1)
project(ixwebsocket C CXX)
set (CMAKE_CXX_STANDARD 11)
set (CMAKE_CXX_STANDARD 14)
set (CXX_STANDARD_REQUIRED ON)
set (CMAKE_CXX_EXTENSIONS OFF)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic -Wshorten-64-to-32")
# -Wshorten-64-to-32 does not work with clang
if (NOT WIN32)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic")
endif()
set( IXWEBSOCKET_SOURCES
ixwebsocket/IXEventFd.cpp
ixwebsocket/IXSocket.cpp
ixwebsocket/IXSocketServer.cpp
ixwebsocket/IXSocketConnect.cpp
ixwebsocket/IXDNSLookup.cpp
ixwebsocket/IXCancellationRequest.cpp
@ -23,16 +27,19 @@ set( IXWEBSOCKET_SOURCES
ixwebsocket/IXWebSocketTransport.cpp
ixwebsocket/IXWebSocketHandshake.cpp
ixwebsocket/IXWebSocketPerMessageDeflate.cpp
ixwebsocket/IXWebSocketPerMessageDeflateCodec.cpp
ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp
)
set( IXWEBSOCKET_HEADERS
ixwebsocket/IXEventFd.h
ixwebsocket/IXSocket.h
ixwebsocket/IXSocketServer.h
ixwebsocket/IXSocketConnect.h
ixwebsocket/IXSetThreadName.h
ixwebsocket/IXDNSLookup.h
ixwebsocket/IXCancellationRequest.h
ixwebsocket/IXProgressCallback.h
ixwebsocket/IXWebSocket.h
ixwebsocket/IXWebSocketServer.h
ixwebsocket/IXWebSocketTransport.h
@ -40,18 +47,21 @@ set( IXWEBSOCKET_HEADERS
ixwebsocket/IXWebSocketSendInfo.h
ixwebsocket/IXWebSocketErrorInfo.h
ixwebsocket/IXWebSocketPerMessageDeflate.h
ixwebsocket/IXWebSocketPerMessageDeflateCodec.h
ixwebsocket/IXWebSocketPerMessageDeflateOptions.h
ixwebsocket/IXWebSocketHttpHeaders.h
ixwebsocket/libwshandshake.hpp
)
# Platform specific code
if (APPLE)
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/apple/IXSetThreadName_apple.cpp)
elseif (WIN32)
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/windows/IXSetThreadName_windows.cpp)
else()
list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/linux/IXSetThreadName_linux.cpp)
endif()
if (USE_TLS)
add_definitions(-DIXWEBSOCKET_USE_TLS)
@ -72,9 +82,37 @@ add_library( ixwebsocket STATIC
${IXWEBSOCKET_HEADERS}
)
target_link_libraries(ixwebsocket "z")
# gcc/Linux needs -pthread
find_package(Threads)
if(UNIX AND NOT APPLE)
find_package(OpenSSL REQUIRED)
add_definitions(${OPENSSL_DEFINITIONS})
message(STATUS "OpenSSL: " ${OPENSSL_VERSION})
include_directories(${OPENSSL_INCLUDE_DIR})
endif()
if (WIN32)
get_filename_component(libz_path
${PROJECT_SOURCE_DIR}/third_party/ZLIB-Windows/zlib-1.2.11_deploy_v140/release_dynamic/x64/lib/zlib.lib
ABSOLUTE)
add_library(libz STATIC IMPORTED)
set_target_properties(libz PROPERTIES IMPORTED_LOCATION
${libz_path})
include_directories(${PROJECT_SOURCE_DIR}/third_party/ZLIB-Windows/zlib-1.2.11_deploy_v140/include)
target_link_libraries(ixwebsocket libz wsock32 ws2_32)
add_definitions(-D_CRT_SECURE_NO_WARNINGS)
else()
target_link_libraries(ixwebsocket
z ${OPENSSL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
endif()
set( IXWEBSOCKET_INCLUDE_DIRS
.
../../shared/OpenSSL/include)
target_include_directories( ixwebsocket PUBLIC ${IXWEBSOCKET_INCLUDE_DIRS} )
add_subdirectory(ws)

View File

@ -1,5 +1,7 @@
# General
![Alt text](https://travis-ci.org/machinezone/IXWebSocket.svg?branch=master)
## Introduction
[*WebSocket*](https://en.wikipedia.org/wiki/WebSocket) is a computer communications protocol, providing full-duplex
@ -13,7 +15,7 @@ communication channels over a single TCP connection. *IXWebSocket* is a C++ libr
## Examples
The examples folder countains a simple chat program, using a node.js broadcast server.
The ws folder countains many interactive programs for chat and file transfers demonstrating client and server usage.
Here is what the client API looks like.
@ -21,7 +23,11 @@ Here is what the client API looks like.
ix::WebSocket webSocket;
std::string url("ws://localhost:8080/");
webSocket.configure(url);
webSocket.setUrl(url);
// Optional heart beat, sent every 45 seconds when there isn't any traffic
// to make sure that load balancers do not kill an idle connection.
webSocket.setHeartBeatPeriod(45);
// Setup a callback to be fired when a message or an event (open, close, error) is received
webSocket.setOnMessageCallback(
@ -80,7 +86,7 @@ server.setOnConnectionCallback(
}
else if (messageType == ix::WebSocket_MessageType_Message)
{
// For an echo server, we just send back to the client whatever was received by the client
// For an echo server, we just send back to the client whatever was received by the server
// All connected clients are available in an std::set. See the broadcast cpp example.
webSocket->send(str);
}
@ -128,22 +134,16 @@ No manual polling to fetch data is required. Data is sent and received instantly
If the remote end (server) breaks the connection, the code will try to perpetually reconnect, by using an exponential backoff strategy, capped at one retry every 10 seconds.
### Large messages
Large frames are broken up into smaller chunks or messages to avoid filling up the os tcp buffers, which is permitted thanks to WebSocket [fragmentation](https://tools.ietf.org/html/rfc6455#section-5.4). Messages up to 500M were sent and received succesfully.
## Limitations
* There is no text support for sending data, only the binary protocol is supported. Sending json or text over the binary protocol works well.
* Automatic reconnection works at the TCP socket level, and will detect remote end disconnects. However, if the device/computer network become unreachable (by turning off wifi), it is quite hard to reliably and timely detect it at the socket level using `recv` and `send` error codes. [Here](https://stackoverflow.com/questions/14782143/linux-socket-how-to-detect-disconnected-network-in-a-client-program) is a good discussion on the subject. This behavior is consistent with other runtimes such as node.js. One way to detect a disconnected device with low level C code is to do a name resolution with DNS but this can be expensive. Mobile devices have good and reliable API to do that.
* The server code is using select to detect incoming data, and creates one OS thread per connection. This isn't as scalable as strategies using epoll or kqueue.
## Examples
1. Bring up a terminal and jump to the examples folder.
2. Compile the example C++ code. `sh build.sh`
3. Install node.js from [here](https://nodejs.org/en/download/).
4. Type `npm install` to install the node.js dependencies. Then `node broadcast-server.js` to run the server.
5. Bring up a second terminal. `./cmd_websocket_chat bob`
6. Bring up a third terminal. `./cmd_websocket_chat bill`
7. Start typing things in any of those terminals. Hopefully you should see your message being received on the other end.
## C++ code organization
Here's a simplistic diagram which explains how the code is structured in term of class/modules.
@ -304,3 +304,13 @@ A ping message can be sent to the server, with an optional data string.
```
websocket.ping("ping data, optional (empty string is ok): limited to 125 bytes long");
```
### Heartbeat.
You can configure an optional heart beat / keep-alive, sent every 45 seconds
when there isn't any traffic to make sure that load balancers do not kill an
idle connection.
```
webSocket.setHeartBeatPeriod(45);
```

10
appveyor.yml Normal file
View File

@ -0,0 +1,10 @@
image:
- Visual Studio 2017
- Ubuntu
install:
- ls -al
- cmd: call "C:\Program Files (x86)\Microsoft Visual Studio\2017\Community\VC\Auxiliary\Build\vcvars64.bat"
- python test/run.py
build: off

View File

@ -15,5 +15,8 @@ RUN apt-get -y install cmake
COPY . .
WORKDIR test
RUN ["sh", "build_linux.sh"]
WORKDIR ws
RUN ["sh", "docker_build.sh"]
EXPOSE 8765
CMD ["/ws/ws", "transfer", "8765"]

7
examples/CMakeLists.txt Normal file
View File

@ -0,0 +1,7 @@
add_subdirectory(broadcast_server)
add_subdirectory(ping_pong)
add_subdirectory(chat)
add_subdirectory(echo_server)
add_subdirectory(ws_connect)
# add_subdirectory(cobra_publisher)

View File

@ -1,9 +0,0 @@
CMakeCache.txt
package-lock.json
CMakeFiles
ixwebsocket_unittest
cmake_install.cmake
node_modules
ixwebsocket
Makefile
build

View File

@ -1,30 +0,0 @@
#
# Author: Benjamin Sergeant
# Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
#
cmake_minimum_required (VERSION 3.4.1)
project (broadcast_server)
# There's -Weverything too for clang
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic -Wshorten-64-to-32")
set (OPENSSL_PREFIX /usr/local/opt/openssl) # Homebrew openssl
set (CMAKE_CXX_STANDARD 11)
option(USE_TLS "Add TLS support" ON)
add_subdirectory(${PROJECT_SOURCE_DIR}/../.. ixwebsocket)
include_directories(broadcast_server .)
add_executable(broadcast_server
broadcast_server.cpp)
if (APPLE AND USE_TLS)
target_link_libraries(broadcast_server "-framework foundation" "-framework security")
endif()
target_link_libraries(broadcast_server ixwebsocket)
install(TARGETS broadcast_server DESTINATION bin)

View File

@ -1,74 +0,0 @@
/*
* broadcast_server.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <sstream>
#include <ixwebsocket/IXWebSocketServer.h>
int main(int argc, char** argv)
{
int port = 8080;
if (argc == 2)
{
std::stringstream ss;
ss << argv[1];
ss >> port;
}
ix::WebSocketServer server(port);
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket)
{
webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
if (messageType == ix::WebSocket_MessageType_Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
std::cerr << "Closed connection" << std::endl;
}
else if (messageType == ix::WebSocket_MessageType_Message)
{
for (auto&& client : server.getClients())
{
if (client != webSocket)
{
client->send(str);
}
}
}
}
);
}
);
auto res = server.listen();
if (!res.first)
{
std::cerr << res.second << std::endl;
return 1;
}
server.start();
server.wait();
return 0;
}

View File

@ -1,3 +0,0 @@
build
venv
node_modules

View File

@ -1,23 +0,0 @@
#
# cmd_websocket_chat.cpp
# Author: Benjamin Sergeant
# Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
#
cmake_minimum_required (VERSION 3.4.1)
project (cmd_websocket_chat)
set (CMAKE_CXX_STANDARD 11)
option(USE_TLS "Add TLS support" ON)
add_subdirectory(${PROJECT_SOURCE_DIR}/../.. ixwebsocket)
add_executable(cmd_websocket_chat cmd_websocket_chat.cpp)
if (APPLE AND USE_TLS)
target_link_libraries(cmd_websocket_chat "-framework foundation" "-framework security")
endif()
target_link_libraries(cmd_websocket_chat ixwebsocket)
install(TARGETS cmd_websocket_chat DESTINATION bin)

View File

@ -1,39 +0,0 @@
# Building
1. cmake -G .
2. make
## Disable TLS
chat$ cmake -DUSE_TLS=OFF .
-- Configuring done
-- Generating done
-- Build files have been written to: /Users/bsergeant/src/foss/ixwebsocket/examples/chat
chat$ make
Scanning dependencies of target ixwebsocket
[ 16%] Building CXX object ixwebsocket/CMakeFiles/ixwebsocket.dir/ixwebsocket/IXSocket.cpp.o
[ 33%] Building CXX object ixwebsocket/CMakeFiles/ixwebsocket.dir/ixwebsocket/IXWebSocket.cpp.o
[ 50%] Building CXX object ixwebsocket/CMakeFiles/ixwebsocket.dir/ixwebsocket/IXWebSocketTransport.cpp.o
[ 66%] Linking CXX static library libixwebsocket.a
[ 66%] Built target ixwebsocket
[ 83%] Linking CXX executable cmd_websocket_chat
[100%] Built target cmd_websocket_chat
## Enable TLS (default)
```
chat$ cmake -DUSE_TLS=ON .
-- Configuring done
-- Generating done
-- Build files have been written to: /Users/bsergeant/src/foss/ixwebsocket/examples/chat
(venv) chat$ make
Scanning dependencies of target ixwebsocket
[ 14%] Building CXX object ixwebsocket/CMakeFiles/ixwebsocket.dir/ixwebsocket/IXSocket.cpp.o
[ 28%] Building CXX object ixwebsocket/CMakeFiles/ixwebsocket.dir/ixwebsocket/IXWebSocket.cpp.o
[ 42%] Building CXX object ixwebsocket/CMakeFiles/ixwebsocket.dir/ixwebsocket/IXWebSocketTransport.cpp.o
[ 57%] Building CXX object ixwebsocket/CMakeFiles/ixwebsocket.dir/ixwebsocket/IXSocketAppleSSL.cpp.o
[ 71%] Linking CXX static library libixwebsocket.a
[ 71%] Built target ixwebsocket
[ 85%] Linking CXX executable cmd_websocket_chat
[100%] Built target cmd_websocket_chat
```

View File

@ -1,15 +0,0 @@
#!/bin/sh
#
# Author: Benjamin Sergeant
# Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
#
# 'manual' way of building. You can also use cmake.
g++ --std=c++11 \
../../ixwebsocket/IXSocket.cpp \
../../ixwebsocket/IXWebSocketTransport.cpp \
../../ixwebsocket/IXWebSocket.cpp \
-I ../.. \
cmd_websocket_chat.cpp \
-o cmd_websocket_chat

View File

@ -1,17 +0,0 @@
#!/bin/sh
#
# Author: Benjamin Sergeant
# Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
#
# 'manual' way of building. You can also use cmake.
clang++ --std=c++11 --stdlib=libc++ \
../../ixwebsocket/IXSocket.cpp \
../../ixwebsocket/IXWebSocketTransport.cpp \
../../ixwebsocket/IXSocketAppleSSL.cpp \
../../ixwebsocket/IXWebSocket.cpp \
cmd_websocket_chat.cpp \
-o cmd_websocket_chat \
-framework Security \
-framework Foundation

View File

@ -1,31 +0,0 @@
{
"requires": true,
"lockfileVersion": 1,
"dependencies": {
"async-limiter": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.0.tgz",
"integrity": "sha512-jp/uFnooOiO+L211eZOoSyzpOITMXx1rBITauYykG3BRYPu8h0UcxsPNB04RR5vo4Tyz3+ay17tR6JVf9qzYWg=="
},
"safe-buffer": {
"version": "5.1.2",
"resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz",
"integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g=="
},
"ultron": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/ultron/-/ultron-1.1.1.tgz",
"integrity": "sha512-UIEXBNeYmKptWH6z8ZnqTeS8fV74zG0/eRU9VGkpzz+LIJNs8W/zM/L+7ctCkRrgbNnnR0xxw4bKOr0cW0N0Og=="
},
"ws": {
"version": "3.3.3",
"resolved": "https://registry.npmjs.org/ws/-/ws-3.3.3.tgz",
"integrity": "sha512-nnWLa/NwZSt4KQJu51MYlCcSQ5g7INpOrOMt4XV8j4dqTXdmlUmSHQ8/oLC069ckre0fRsgfvsKwbTdtKLCDkA==",
"requires": {
"async-limiter": "1.0.0",
"safe-buffer": "5.1.2",
"ultron": "1.1.1"
}
}
}
}

View File

@ -1,6 +0,0 @@
{
"dependencies": {
"msgpack-js": "^0.3.0",
"ws": "^3.3.3"
}
}

View File

@ -11,12 +11,10 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic -Wshorten-64-to-
set (OPENSSL_PREFIX /usr/local/opt/openssl) # Homebrew openssl
set (CMAKE_CXX_STANDARD 11)
set (CMAKE_CXX_STANDARD 14)
option(USE_TLS "Add TLS support" ON)
add_subdirectory(${PROJECT_SOURCE_DIR}/../.. ixwebsocket)
include_directories(cobra_publisher ${OPENSSL_PREFIX}/include)
include_directories(cobra_publisher .)

View File

@ -59,8 +59,8 @@ namespace ix
}
void CobraConnection::invokeEventCallback(ix::CobraConnectionEventType eventType,
const std::string& errorMsg,
const WebSocketHttpHeaders& headers)
const std::string& errorMsg,
const WebSocketHttpHeaders& headers)
{
std::lock_guard<std::mutex> lock(_eventCallbackMutex);
if (_eventCallback)
@ -176,10 +176,10 @@ namespace ix
}
void CobraConnection::configure(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions)
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions)
{
_appkey = appkey;
_endpoint = endpoint;
@ -229,7 +229,7 @@ namespace ix
return _webSocket.send(serializedJson).success;
}
//
//
// Extract the nonce from the handshake response
// use it to compute a hash during authentication
//
@ -297,7 +297,7 @@ namespace ix
if (!pdu.isMember("body")) return false;
Json::Value body = pdu["body"];
// Identify subscription_id, so that we can find
// Identify subscription_id, so that we can find
// which callback to execute
if (!body.isMember("subscription_id")) return false;
Json::Value subscriptionId = body["subscription_id"];
@ -339,7 +339,7 @@ namespace ix
// publish is not thread safe as we are trying to reuse some Json objects.
//
bool CobraConnection::publish(const Json::Value& channels,
const Json::Value& msg)
const Json::Value& msg)
{
_body["channels"] = channels;
_body["message"] = msg;
@ -371,7 +371,7 @@ namespace ix
}
void CobraConnection::subscribe(const std::string& channel,
SubscriptionCallback cb)
SubscriptionCallback cb)
{
// Create and send a subscribe pdu
Json::Value body;
@ -471,5 +471,5 @@ namespace ix
{
connect();
}
} // namespace ix

View File

@ -84,7 +84,7 @@ namespace ix
/// Returns true only if we're connected
bool isConnected() const;
/// Flush the publish queue
bool flushQueue();
@ -118,7 +118,7 @@ namespace ix
///
/// Member variables
///
///
WebSocket _webSocket;
/// Configuration data
@ -148,10 +148,10 @@ namespace ix
std::unordered_map<std::string, SubscriptionCallback> _cbs;
mutable std::mutex _cbsMutex;
// Message Queue can be touched on control+background thread,
// Message Queue can be touched on control+background thread,
// protecting with a mutex.
//
// Message queue is used when there are problems sending messages so
// Message queue is used when there are problems sending messages so
// that sending can be retried later.
std::deque<std::string> _messageQueue;
mutable std::mutex _queueMutex;
@ -159,5 +159,5 @@ namespace ix
// Cap the queue size (100 elems so far -> ~100k)
static constexpr size_t kQueueMaxSize = 256;
};
} // namespace ix

View File

@ -7,28 +7,28 @@
// //////////////////////////////////////////////////////////////////////
/*
The JsonCpp library's source code, including accompanying documentation,
The JsonCpp library's source code, including accompanying documentation,
tests and demonstration applications, are licensed under the following
conditions...
Baptiste Lepilleur and The JsonCpp Authors explicitly disclaim copyright in all
jurisdictions which recognize such a disclaimer. In such jurisdictions,
Baptiste Lepilleur and The JsonCpp Authors explicitly disclaim copyright in all
jurisdictions which recognize such a disclaimer. In such jurisdictions,
this software is released into the Public Domain.
In jurisdictions which do not recognize Public Domain property (e.g. Germany as of
2010), this software is Copyright (c) 2007-2010 by Baptiste Lepilleur and
The JsonCpp Authors, and is released under the terms of the MIT License (see below).
In jurisdictions which recognize Public Domain property, the user of this
software may choose to accept it either as 1) Public Domain, 2) under the
conditions of the MIT License (see below), or 3) under the terms of dual
In jurisdictions which recognize Public Domain property, the user of this
software may choose to accept it either as 1) Public Domain, 2) under the
conditions of the MIT License (see below), or 3) under the terms of dual
Public Domain/MIT License conditions described here, as they choose.
The MIT License is about as close to Public Domain as a license can get, and is
described in clear, concise terms at:
http://en.wikipedia.org/wiki/MIT_License
The full text of the MIT License follows:
========================================================================

View File

@ -6,28 +6,28 @@
// //////////////////////////////////////////////////////////////////////
/*
The JsonCpp library's source code, including accompanying documentation,
The JsonCpp library's source code, including accompanying documentation,
tests and demonstration applications, are licensed under the following
conditions...
Baptiste Lepilleur and The JsonCpp Authors explicitly disclaim copyright in all
jurisdictions which recognize such a disclaimer. In such jurisdictions,
Baptiste Lepilleur and The JsonCpp Authors explicitly disclaim copyright in all
jurisdictions which recognize such a disclaimer. In such jurisdictions,
this software is released into the Public Domain.
In jurisdictions which do not recognize Public Domain property (e.g. Germany as of
2010), this software is Copyright (c) 2007-2010 by Baptiste Lepilleur and
The JsonCpp Authors, and is released under the terms of the MIT License (see below).
In jurisdictions which recognize Public Domain property, the user of this
software may choose to accept it either as 1) Public Domain, 2) under the
conditions of the MIT License (see below), or 3) under the terms of dual
In jurisdictions which recognize Public Domain property, the user of this
software may choose to accept it either as 1) Public Domain, 2) under the
conditions of the MIT License (see below), or 3) under the terms of dual
Public Domain/MIT License conditions described here, as they choose.
The MIT License is about as close to Public Domain as a license can get, and is
described in clear, concise terms at:
http://en.wikipedia.org/wiki/MIT_License
The full text of the MIT License follows:
========================================================================
@ -1673,7 +1673,7 @@ public:
- `"rejectDupKeys": false or true`
- If true, `parse()` returns false when a key is duplicated within an object.
- `"allowSpecialFloats": false or true`
- If true, special float values (NaNs and infinities) are allowed
- If true, special float values (NaNs and infinities) are allowed
and their values are lossfree restorable.
You can examine 'settings_` yourself

View File

@ -6,28 +6,28 @@
// //////////////////////////////////////////////////////////////////////
/*
The JsonCpp library's source code, including accompanying documentation,
The JsonCpp library's source code, including accompanying documentation,
tests and demonstration applications, are licensed under the following
conditions...
Baptiste Lepilleur and The JsonCpp Authors explicitly disclaim copyright in all
jurisdictions which recognize such a disclaimer. In such jurisdictions,
Baptiste Lepilleur and The JsonCpp Authors explicitly disclaim copyright in all
jurisdictions which recognize such a disclaimer. In such jurisdictions,
this software is released into the Public Domain.
In jurisdictions which do not recognize Public Domain property (e.g. Germany as of
2010), this software is Copyright (c) 2007-2010 by Baptiste Lepilleur and
The JsonCpp Authors, and is released under the terms of the MIT License (see below).
In jurisdictions which recognize Public Domain property, the user of this
software may choose to accept it either as 1) Public Domain, 2) under the
conditions of the MIT License (see below), or 3) under the terms of dual
In jurisdictions which recognize Public Domain property, the user of this
software may choose to accept it either as 1) Public Domain, 2) under the
conditions of the MIT License (see below), or 3) under the terms of dual
Public Domain/MIT License conditions described here, as they choose.
The MIT License is about as close to Public Domain as a license can get, and is
described in clear, concise terms at:
http://en.wikipedia.org/wiki/MIT_License
The full text of the MIT License follows:
========================================================================
@ -238,7 +238,7 @@ static inline void fixNumericLocaleInput(char* begin, char* end) {
#include <limits>
#if defined(_MSC_VER)
#if !defined(WINCE) && defined(__STDC_SECURE_LIB__) && _MSC_VER >= 1500 // VC++ 9.0 and above
#if !defined(WINCE) && defined(__STDC_SECURE_LIB__) && _MSC_VER >= 1500 // VC++ 9.0 and above
#define snprintf sprintf_s
#elif _MSC_VER >= 1900 // VC++ 14.0 and above
#define snprintf std::snprintf
@ -383,7 +383,7 @@ bool Reader::parse(const char* beginDoc,
bool Reader::readValue() {
// readValue() may call itself only if it calls readObject() or ReadArray().
// These methods execute nodes_.push() just before and nodes_.pop)() just after calling readValue().
// These methods execute nodes_.push() just before and nodes_.pop)() just after calling readValue().
// parse() executes one nodes_.push(), so > instead of >=.
if (nodes_.size() > stackLimit_g) throwRuntimeError("Exceeded stackLimit in readValue().");
@ -4215,7 +4215,7 @@ Value& Path::make(Value& root) const {
#endif
#endif
#if defined(__BORLANDC__)
#if defined(__BORLANDC__)
#include <float.h>
#define isfinite _finite
#define snprintf _snprintf
@ -5290,7 +5290,7 @@ StreamWriter* StreamWriterBuilder::newStreamWriter() const
JSONCPP_STRING cs_str = settings_["commentStyle"].asString();
bool eyc = settings_["enableYAMLCompatibility"].asBool();
bool dnp = settings_["dropNullPlaceholders"].asBool();
bool usf = settings_["useSpecialFloats"].asBool();
bool usf = settings_["useSpecialFloats"].asBool();
unsigned int pre = settings_["precision"].asUInt();
CommentStyle::Enum cs = CommentStyle::All;
if (cs_str == "All") {

View File

@ -1,30 +0,0 @@
#
# Author: Benjamin Sergeant
# Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
#
cmake_minimum_required (VERSION 3.4.1)
project (echo_server)
# There's -Weverything too for clang
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic -Wshorten-64-to-32")
set (OPENSSL_PREFIX /usr/local/opt/openssl) # Homebrew openssl
set (CMAKE_CXX_STANDARD 11)
option(USE_TLS "Add TLS support" ON)
add_subdirectory(${PROJECT_SOURCE_DIR}/../.. ixwebsocket)
include_directories(echo_server .)
add_executable(echo_server
echo_server.cpp)
if (APPLE AND USE_TLS)
target_link_libraries(echo_server "-framework foundation" "-framework security")
endif()
target_link_libraries(echo_server ixwebsocket)
install(TARGETS echo_server DESTINATION bin)

View File

@ -1,68 +0,0 @@
/*
* echo_server.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <sstream>
#include <ixwebsocket/IXWebSocketServer.h>
int main(int argc, char** argv)
{
int port = 8080;
if (argc == 2)
{
std::stringstream ss;
ss << argv[1];
ss >> port;
}
ix::WebSocketServer server(port);
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket)
{
webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
if (messageType == ix::WebSocket_MessageType_Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
std::cerr << "Closed connection" << std::endl;
}
else if (messageType == ix::WebSocket_MessageType_Message)
{
webSocket->send(str);
}
}
);
}
);
auto res = server.listen();
if (!res.first)
{
std::cerr << res.second << std::endl;
return 1;
}
server.start();
server.wait();
return 0;
}

View File

@ -1,27 +0,0 @@
#
# Author: Benjamin Sergeant
# Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
#
cmake_minimum_required (VERSION 3.4.1)
project (ping_pong)
set (CMAKE_CXX_STANDARD 11)
option(USE_TLS "Add TLS support" ON)
add_subdirectory(${PROJECT_SOURCE_DIR}/../.. ixwebsocket)
add_executable(ping_pong ping_pong.cpp)
if (APPLE AND USE_TLS)
target_link_libraries(ping_pong "-framework foundation" "-framework security")
endif()
if (WIN32)
target_link_libraries(ping_pong wsock32 ws2_32)
add_definitions(-D_CRT_SECURE_NO_WARNINGS)
endif()
target_link_libraries(ping_pong ixwebsocket)
install(TARGETS ping_pong DESTINATION bin)

View File

@ -1,15 +0,0 @@
#!/bin/sh
#
# Author: Benjamin Sergeant
# Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
#
# 'manual' way of building. You can also use cmake.
g++ --std=c++11 \
../../ixwebsocket/IXSocket.cpp \
../../ixwebsocket/IXWebSocketTransport.cpp \
../../ixwebsocket/IXWebSocket.cpp \
-I ../.. \
cmd_websocket_chat.cpp \
-o cmd_websocket_chat

View File

@ -1,17 +0,0 @@
#!/usr/bin/env python
import asyncio
import websockets
async def hello(uri):
async with websockets.connect(uri) as websocket:
await websocket.send("Hello world!")
response = await websocket.recv()
print(response)
pong_waiter = await websocket.ping('coucou')
ret = await pong_waiter # only if you want to wait for the pong
print(ret)
asyncio.get_event_loop().run_until_complete(
hello('ws://localhost:5678'))

View File

@ -1,21 +0,0 @@
#!/usr/bin/env python
import os
import asyncio
import websockets
async def echo(websocket, path):
async for message in websocket:
print(message)
await websocket.send(message)
if os.getenv('TEST_CLOSE'):
print('Closing')
# breakpoint()
await websocket.close(1001, 'close message')
# await websocket.close()
break
asyncio.get_event_loop().run_until_complete(
websockets.serve(echo, 'localhost', 5678))
asyncio.get_event_loop().run_forever()

View File

@ -1,9 +0,0 @@
#!/bin/sh
test -d build || {
mkdir -p build
cd build
cmake ..
}
(cd build ; make)
./build/ping_pong ws://localhost:5678

View File

@ -1,3 +0,0 @@
build
venv
node_modules

View File

@ -1,22 +0,0 @@
#
# Author: Benjamin Sergeant
# Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
#
cmake_minimum_required (VERSION 3.4.1)
project (ws_connect)
set (CMAKE_CXX_STANDARD 11)
option(USE_TLS "Add TLS support" ON)
add_subdirectory(${PROJECT_SOURCE_DIR}/../.. ixwebsocket)
add_executable(ws_connect ws_connect.cpp)
if (APPLE AND USE_TLS)
target_link_libraries(ws_connect "-framework foundation" "-framework security")
endif()
target_link_libraries(ws_connect ixwebsocket)
install(TARGETS ws_connect DESTINATION bin)

View File

@ -1,11 +0,0 @@
# Building
1. mkdir build
2. cd build
3. cmake ..
4. make
## Disable TLS
* Enable: `cmake -DUSE_TLS=OFF ..`
* Disable: `cmake -DUSE_TLS=ON ..`

View File

@ -1,25 +0,0 @@
#!/bin/sh
#
# Author: Benjamin Sergeant
# Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
#
# 'manual' way of building. You can also use cmake.
g++ --std=c++11 \
-DIXWEBSOCKET_USE_TLS \
-g \
../../ixwebsocket/IXEventFd.cpp \
../../ixwebsocket/IXSocket.cpp \
../../ixwebsocket/IXSetThreadName.cpp \
../../ixwebsocket/IXWebSocketTransport.cpp \
../../ixwebsocket/IXWebSocket.cpp \
../../ixwebsocket/IXDNSLookup.cpp \
../../ixwebsocket/IXSocketConnect.cpp \
../../ixwebsocket/IXSocketOpenSSL.cpp \
../../ixwebsocket/IXWebSocketPerMessageDeflate.cpp \
../../ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp \
-I ../.. \
ws_connect.cpp \
-o ws_connect \
-lcrypto -lssl -lz -lpthread

View File

@ -8,7 +8,7 @@
#include <chrono>
namespace ix
namespace ix
{
CancellationRequest makeCancellationRequestWithTimeout(int secs,
std::atomic<bool>& requestInitCancellation)
@ -20,7 +20,7 @@ namespace ix
{
// Was an explicit cancellation requested ?
if (requestInitCancellation) return true;
auto now = std::chrono::system_clock::now();
if ((now - start) > timeout) return true;

View File

@ -9,7 +9,7 @@
#include <functional>
#include <atomic>
namespace ix
namespace ix
{
using CancellationRequest = std::function<bool()>;

View File

@ -5,14 +5,12 @@
*/
#include "IXDNSLookup.h"
#include "IXNetSystem.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <string.h>
#include <chrono>
namespace ix
namespace ix
{
const int64_t DNSLookup::kDefaultWait = 10; // ms
@ -28,7 +26,7 @@ namespace ix
_done(false),
_id(_nextId++)
{
}
DNSLookup::~DNSLookup()
@ -38,7 +36,7 @@ namespace ix
_activeJobs.erase(_id);
}
struct addrinfo* DNSLookup::getAddrInfo(const std::string& hostname,
struct addrinfo* DNSLookup::getAddrInfo(const std::string& hostname,
int port,
std::string& errMsg)
{
@ -51,7 +49,7 @@ namespace ix
std::string sport = std::to_string(port);
struct addrinfo* res;
int getaddrinfo_result = getaddrinfo(hostname.c_str(), sport.c_str(),
int getaddrinfo_result = getaddrinfo(hostname.c_str(), sport.c_str(),
&hints, &res);
if (getaddrinfo_result)
{
@ -103,11 +101,11 @@ namespace ix
_activeJobs.insert(_id);
}
//
//
// Good resource on thread forced termination
// https://www.bo-yang.net/2017/11/19/cpp-kill-detached-thread
//
_thread = std::thread(&DNSLookup::run, this);
_thread = std::thread(&DNSLookup::run, this, _id, _hostname, _port);
_thread.detach();
std::unique_lock<std::mutex> lock(_conditionVariableMutex);
@ -140,11 +138,13 @@ namespace ix
return _res;
}
void DNSLookup::run()
void DNSLookup::run(uint64_t id, const std::string& hostname, int port) // thread runner
{
uint64_t id = _id;
// We don't want to read or write into members variables of an object that could be
// gone, so we use temporary variables (res) or we pass in by copy everything that
// getAddrInfo needs to work.
std::string errMsg;
_res = getAddrInfo(_hostname, _port, errMsg);
struct addrinfo* res = getAddrInfo(hostname, port, errMsg);
// if this isn't an active job, and the control thread is gone
// there is not thing to do, and we don't want to touch the defunct
@ -155,9 +155,10 @@ namespace ix
return;
}
// Copy result into the member variables
_res = res;
_errMsg = errMsg;
_condition.notify_one();
_done = true;
}
}

View File

@ -3,7 +3,7 @@
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*
* Resolve a hostname+port to a struct addrinfo obtained with getaddrinfo
* Resolve a hostname+port to a struct addrinfo obtained with getaddrinfo
* Does this in a background thread so that it can be cancelled, since
* getaddrinfo is a blocking call, and we don't want to block the main thread on Mobile.
*/
@ -20,7 +20,7 @@
struct addrinfo;
namespace ix
namespace ix
{
class DNSLookup {
public:
@ -39,11 +39,11 @@ namespace ix
struct addrinfo* resolveBlocking(std::string& errMsg,
const CancellationRequest& isCancellationRequested);
static struct addrinfo* getAddrInfo(const std::string& hostname,
static struct addrinfo* getAddrInfo(const std::string& hostname,
int port,
std::string& errMsg);
void run(); // thread runner
void run(uint64_t id, const std::string& hostname, int port); // thread runner
std::string _hostname;
int _port;
@ -56,7 +56,7 @@ namespace ix
std::condition_variable _condition;
std::mutex _conditionVariableMutex;
std::atomic<uint64_t> _id;
uint64_t _id;
static std::atomic<uint64_t> _nextId;
static std::set<uint64_t> _activeJobs;
static std::mutex _activeJobsMutex;

View File

@ -14,7 +14,7 @@
// eventfd was added in Linux kernel 2.x, and our oldest Android (Kitkat 4.4)
// is on Kernel 3.x
//
// cf Android/Kernel table here
// cf Android/Kernel table here
// https://android.stackexchange.com/questions/51651/which-android-runs-which-linux-kernel
//
@ -28,9 +28,9 @@
#include <unistd.h> // for write
#endif
namespace ix
namespace ix
{
EventFd::EventFd() :
EventFd::EventFd() :
_eventfd(-1)
{
#ifdef __linux__
@ -65,7 +65,7 @@ namespace ix
#if defined(__linux__)
if (_eventfd == -1) return false;
// 0 is a special value ; select will not wake up
// 0 is a special value ; select will not wake up
uint64_t value = 0;
// we should write 8 bytes for an uint64_t

View File

@ -6,7 +6,7 @@
#pragma once
namespace ix
namespace ix
{
class EventFd {
public:

25
ixwebsocket/IXNetSystem.h Normal file
View File

@ -0,0 +1,25 @@
/*
* IXNetSystem.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone. All rights reserved.
*/
#pragma once
#ifdef _WIN32
# include <WS2tcpip.h>
# include <WinSock2.h>
# include <basetsd.h>
# include <io.h>
# include <ws2def.h>
#else
# include <arpa/inet.h>
# include <errno.h>
# include <netdb.h>
# include <netinet/tcp.h>
# include <sys/select.h>
# include <sys/socket.h>
# include <sys/stat.h>
# include <sys/time.h>
# include <unistd.h>
#endif

View File

@ -0,0 +1,14 @@
/*
* IXProgressCallback.h
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <functional>
namespace ix
{
using OnProgressCallback = std::function<bool(int current, int total)>;
}

View File

@ -1,31 +0,0 @@
/*
* IXSetThreadName.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include "IXSetThreadName.h"
#include <pthread.h>
namespace ix
{
void setThreadName(const std::string& name)
{
#if defined(__linux__)
//
// Linux only reserve 16 bytes for its thread names
// See prctl and PR_SET_NAME property in
// http://man7.org/linux/man-pages/man2/prctl.2.html
//
pthread_setname_np(pthread_self(),
name.substr(0, 15).c_str());
#elif defined(__APPLE__)
//
// Apple is more generous with 64 chars.
// notice how the Apple version does not take a pthread_t argument
//
pthread_setname_np(name.substr(0, 63).c_str());
#elif
#error("Unsupported platform");
#endif
}
}

View File

@ -6,23 +6,7 @@
#include "IXSocket.h"
#include "IXSocketConnect.h"
#ifdef _WIN32
# include <basetsd.h>
# include <WinSock2.h>
# include <ws2def.h>
# include <WS2tcpip.h>
# include <io.h>
#else
# include <unistd.h>
# include <errno.h>
# include <netdb.h>
# include <netinet/tcp.h>
# include <sys/socket.h>
# include <sys/time.h>
# include <sys/select.h>
# include <sys/stat.h>
#endif
#include "IXNetSystem.h"
#include <stdio.h>
#include <stdlib.h>
@ -35,9 +19,12 @@
#include <algorithm>
#include <iostream>
namespace ix
namespace ix
{
Socket::Socket(int fd) :
const int Socket::kDefaultPollNoTimeout = -1; // No poll timeout by default
const int Socket::kDefaultPollTimeout = kDefaultPollNoTimeout;
Socket::Socket(int fd) :
_sockfd(fd)
{
@ -48,11 +35,11 @@ namespace ix
close();
}
void Socket::poll(const OnPollCallback& onPollCallback)
void Socket::poll(const OnPollCallback& onPollCallback, int timeoutSecs)
{
if (_sockfd == -1)
{
onPollCallback();
onPollCallback(PollResultType_Error);
return;
}
@ -64,11 +51,26 @@ namespace ix
FD_SET(_eventfd.getFd(), &rfds);
#endif
struct timeval timeout;
timeout.tv_sec = timeoutSecs;
timeout.tv_usec = 0;
int sockfd = _sockfd;
int nfds = (std::max)(sockfd, _eventfd.getFd());
select(nfds + 1, &rfds, nullptr, nullptr, nullptr);
int ret = select(nfds + 1, &rfds, nullptr, nullptr,
(timeoutSecs < 0) ? nullptr : &timeout);
onPollCallback();
PollResultType pollResult = PollResultType_ReadyForRead;
if (ret < 0)
{
pollResult = PollResultType_Error;
}
else if (ret == 0)
{
pollResult = PollResultType_Timeout;
}
onPollCallback(pollResult);
}
void Socket::wakeUpFromPoll()
@ -100,32 +102,32 @@ namespace ix
_sockfd = -1;
}
int Socket::send(char* buffer, size_t length)
ssize_t Socket::send(char* buffer, size_t length)
{
int flags = 0;
#ifdef MSG_NOSIGNAL
flags = MSG_NOSIGNAL;
#endif
return (int) ::send(_sockfd, buffer, length, flags);
return ::send(_sockfd, buffer, length, flags);
}
int Socket::send(const std::string& buffer)
ssize_t Socket::send(const std::string& buffer)
{
return send((char*)&buffer[0], buffer.size());
}
int Socket::recv(void* buffer, size_t length)
ssize_t Socket::recv(void* buffer, size_t length)
{
int flags = 0;
#ifdef MSG_NOSIGNAL
flags = MSG_NOSIGNAL;
#endif
return (int) ::recv(_sockfd, (char*) buffer, length, flags);
return ::recv(_sockfd, (char*) buffer, length, flags);
}
int Socket::getErrno() const
int Socket::getErrno()
{
#ifdef _WIN32
return WSAGetLastError();
@ -148,7 +150,7 @@ namespace ix
#ifdef _WIN32
INT rc;
WSADATA wsaData;
rc = WSAStartup(MAKEWORD(2, 2), &wsaData);
return rc != 0;
#else
@ -170,7 +172,7 @@ namespace ix
{
if (isCancellationRequested()) return false;
int ret;
ssize_t ret;
ret = recv(buffer, 1);
// We read one byte, as needed, all good.
@ -187,11 +189,16 @@ namespace ix
fd_set rfds;
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 1 * 1000; // 1ms
timeout.tv_usec = 1 * 1000; // 1ms timeout
FD_ZERO(&rfds);
FD_SET(_sockfd, &rfds);
select(_sockfd + 1, &rfds, nullptr, nullptr, &timeout);
if (select(_sockfd + 1, &rfds, nullptr, nullptr, &timeout) < 0 &&
(errno == EBADF || errno == EINVAL))
{
return false;
}
continue;
}
@ -213,7 +220,7 @@ namespace ix
char* buffer = const_cast<char*>(str.c_str());
int len = (int) str.size();
int ret = send(buffer, len);
ssize_t ret = send(buffer, len);
// We wrote some bytes, as needed, all good.
if (ret > 0)

View File

@ -11,33 +11,46 @@
#include <mutex>
#include <atomic>
#ifdef _WIN32
#include <BaseTsd.h>
typedef SSIZE_T ssize_t;
#endif
#include "IXEventFd.h"
#include "IXCancellationRequest.h"
namespace ix
namespace ix
{
enum PollResultType
{
PollResultType_ReadyForRead = 0,
PollResultType_Timeout = 1,
PollResultType_Error = 2
};
class Socket {
public:
using OnPollCallback = std::function<void()>;
using OnPollCallback = std::function<void(PollResultType)>;
Socket(int fd = -1);
virtual ~Socket();
void configure();
virtual void poll(const OnPollCallback& onPollCallback);
virtual void poll(const OnPollCallback& onPollCallback,
int timeoutSecs = kDefaultPollTimeout);
virtual void wakeUpFromPoll();
// Virtual methods
virtual bool connect(const std::string& url,
virtual bool connect(const std::string& url,
int port,
std::string& errMsg,
const CancellationRequest& isCancellationRequested);
virtual void close();
virtual int send(char* buffer, size_t length);
virtual int send(const std::string& buffer);
virtual int recv(void* buffer, size_t length);
virtual ssize_t send(char* buffer, size_t length);
virtual ssize_t send(const std::string& buffer);
virtual ssize_t recv(void* buffer, size_t length);
// Blocking and cancellable versions, working with socket that can be set
// to non blocking mode. Used during HTTP upgrade.
@ -47,7 +60,7 @@ namespace ix
const CancellationRequest& isCancellationRequested);
std::pair<bool, std::string> readLine(const CancellationRequest& isCancellationRequested);
int getErrno() const;
static int getErrno();
static bool init(); // Required on Windows to initialize WinSocket
static void cleanup(); // Required on Windows to cleanup WinSocket
@ -57,5 +70,9 @@ namespace ix
std::atomic<int> _sockfd;
std::mutex _socketMutex;
EventFd _eventfd;
private:
static const int kDefaultPollTimeout;
static const int kDefaultPollNoTimeout;
};
}

View File

@ -50,7 +50,7 @@ OSStatus read_from_socket(SSLConnectionRef connection, void *data, size_t *len)
else
return noErr;
}
else if (0 == status)
else if (0 == status)
{
*len = 0;
return errSSLClosedGraceful;
@ -102,7 +102,7 @@ OSStatus write_to_socket(SSLConnectionRef connection, const void *data, size_t *
else
{
*len = 0;
if (EAGAIN == errno)
if (EAGAIN == errno)
{
return errSSLWouldBlock;
}
@ -141,7 +141,7 @@ std::string getSSLErrorDescription(OSStatus status)
} // anonymous namespace
namespace ix
namespace ix
{
SocketAppleSSL::SocketAppleSSL(int fd) : Socket(fd),
_sslContext(nullptr)
@ -176,11 +176,11 @@ namespace ix
do {
status = SSLHandshake(_sslContext);
} while (errSSLWouldBlock == status ||
} while (errSSLWouldBlock == status ||
errSSLServerAuthCompleted == status);
}
if (noErr != status)
if (noErr != status)
{
errMsg = getSSLErrorDescription(status);
close();
@ -203,7 +203,7 @@ namespace ix
Socket::close();
}
int SocketAppleSSL::send(char* buf, size_t nbyte)
ssize_t SocketAppleSSL::send(char* buf, size_t nbyte)
{
ssize_t ret = 0;
OSStatus status;
@ -218,28 +218,28 @@ namespace ix
if (ret == 0 && errSSLClosedAbort != status)
ret = -1;
return (int) ret;
return ret;
}
int SocketAppleSSL::send(const std::string& buffer)
ssize_t SocketAppleSSL::send(const std::string& buffer)
{
return send((char*)&buffer[0], buffer.size());
}
// No wait support
int SocketAppleSSL::recv(void* buf, size_t nbyte)
ssize_t SocketAppleSSL::recv(void* buf, size_t nbyte)
{
OSStatus status = errSSLWouldBlock;
while (errSSLWouldBlock == status)
while (errSSLWouldBlock == status)
{
size_t processed = 0;
std::lock_guard<std::mutex> lock(_mutex);
status = SSLRead(_sslContext, buf, nbyte, &processed);
if (processed > 0)
return (int) processed;
return (ssize_t) processed;
// The connection was reset, inform the caller that this
// The connection was reset, inform the caller that this
// Socket should close
if (status == errSSLClosedGraceful ||
status == errSSLClosedNoNotify ||

View File

@ -14,23 +14,23 @@
#include <mutex>
namespace ix
namespace ix
{
class SocketAppleSSL : public Socket
class SocketAppleSSL : public Socket
{
public:
SocketAppleSSL(int fd = -1);
~SocketAppleSSL();
virtual bool connect(const std::string& host,
virtual bool connect(const std::string& host,
int port,
std::string& errMsg,
const CancellationRequest& isCancellationRequested) final;
virtual void close() final;
virtual int send(char* buffer, size_t length) final;
virtual int send(const std::string& buffer) final;
virtual int recv(void* buffer, size_t length) final;
virtual ssize_t send(char* buffer, size_t length) final;
virtual ssize_t send(const std::string& buffer) final;
virtual ssize_t recv(void* buffer, size_t length) final;
private:
SSLContextRef _sslContext;

View File

@ -6,23 +6,7 @@
#include "IXSocketConnect.h"
#include "IXDNSLookup.h"
#ifdef _WIN32
# include <basetsd.h>
# include <WinSock2.h>
# include <ws2def.h>
# include <WS2tcpip.h>
# include <io.h>
#else
# include <unistd.h>
# include <errno.h>
# include <netdb.h>
# include <netinet/tcp.h>
# include <sys/socket.h>
# include <sys/time.h>
# include <sys/select.h>
# include <sys/stat.h>
#endif
#include "IXNetSystem.h"
#include <string.h>
#include <fcntl.h>
@ -46,7 +30,7 @@ namespace
}
}
namespace ix
namespace ix
{
//
// This function can be cancelled every 50 ms
@ -58,7 +42,7 @@ namespace ix
const CancellationRequest& isCancellationRequested)
{
errMsg = "no error";
int fd = socket(address->ai_family,
address->ai_socktype,
address->ai_protocol);
@ -86,31 +70,45 @@ namespace ix
{
closeSocket(fd);
errMsg = "Cancelled";
return false;
return -1;
}
fd_set wfds;
FD_ZERO(&wfds);
FD_SET(fd, &wfds);
// 50ms select timeout
// Use select to check the status of the new connection
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 50 * 1000;
timeout.tv_usec = 10 * 1000; // 10ms timeout
fd_set wfds;
fd_set efds;
select(fd + 1, nullptr, &wfds, nullptr, &timeout);
FD_ZERO(&wfds);
FD_SET(fd, &wfds);
FD_ZERO(&efds);
FD_SET(fd, &efds);
if (select(fd + 1, nullptr, &wfds, &efds, &timeout) < 0 &&
(errno == EBADF || errno == EINVAL))
{
closeSocket(fd);
errMsg = std::string("Connect error, select error: ") + strerror(errno);
return -1;
}
// Nothing was written to the socket, wait again.
if (!FD_ISSET(fd, &wfds)) continue;
// Something was written to the socket
// Something was written to the socket. Check for errors.
int optval = -1;
socklen_t optlen = sizeof(optval);
#ifdef _WIN32
// On connect error, in async mode, windows will write to the exceptions fds
if (FD_ISSET(fd, &efds))
#else
// getsockopt() puts the errno value for connect into optval so 0
// means no-error.
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &optval, &optlen) == -1 ||
optval != 0)
#endif
{
closeSocket(fd);
errMsg = strerror(optval);
@ -173,7 +171,7 @@ namespace ix
// 2. make socket non blocking
#ifdef _WIN32
unsigned long nonblocking = 1;
ioctlsocket(_sockfd, FIONBIO, &nonblocking);
ioctlsocket(sockfd, FIONBIO, &nonblocking);
#else
fcntl(sockfd, F_SETFL, O_NONBLOCK); // make socket non blocking
#endif
@ -181,7 +179,7 @@ namespace ix
// 3. (apple) prevent SIGPIPE from being emitted when the remote end disconnect
#ifdef SO_NOSIGPIPE
int value = 1;
setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE,
setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE,
(void *)&value, sizeof(value));
#endif
}

View File

@ -12,7 +12,7 @@
struct addrinfo;
namespace ix
namespace ix
{
class SocketConnect {
public:

View File

@ -18,67 +18,15 @@
#include <errno.h>
#define socketerrno errno
namespace {
std::mutex initMutex;
bool openSSLInitialized = false;
bool openSSLInitializationSuccessful = false;
bool openSSLInitialize(std::string& errMsg)
namespace ix
{
std::lock_guard<std::mutex> lock(initMutex);
std::atomic<bool> SocketOpenSSL::_openSSLInitializationSuccessful(false);
if (openSSLInitialized)
{
return openSSLInitializationSuccessful;
}
#if OPENSSL_VERSION_NUMBER >= 0x10100000L
if (!OPENSSL_init_ssl(OPENSSL_INIT_LOAD_CONFIG, nullptr))
{
errMsg = "OPENSSL_init_ssl failure";
openSSLInitializationSuccessful = false;
openSSLInitialized = true;
return false;
}
#else
(void) OPENSSL_config(nullptr);
#endif
(void) OpenSSL_add_ssl_algorithms();
(void) SSL_load_error_strings();
openSSLInitializationSuccessful = true;
return true;
}
int openssl_verify_callback(int preverify, X509_STORE_CTX *x509_ctx)
{
return preverify;
}
/* create new SSL connection state object */
SSL *openssl_create_connection(SSL_CTX *ctx, int socket)
{
assert(ctx != nullptr);
assert(socket > 0);
SSL *ssl = SSL_new(ctx);
if (ssl)
SSL_set_fd(ssl, socket);
return ssl;
}
} // anonymous namespace
namespace ix
{
SocketOpenSSL::SocketOpenSSL(int fd) : Socket(fd),
_ssl_connection(nullptr),
_ssl_connection(nullptr),
_ssl_context(nullptr)
{
;
std::call_once(_openSSLInitFlag, &SocketOpenSSL::openSSLInitialize, this);
}
SocketOpenSSL::~SocketOpenSSL()
@ -86,6 +34,20 @@ namespace ix
SocketOpenSSL::close();
}
void SocketOpenSSL::openSSLInitialize()
{
#if OPENSSL_VERSION_NUMBER >= 0x10100000L
if (!OPENSSL_init_ssl(OPENSSL_INIT_LOAD_CONFIG, nullptr)) return;
#else
(void) OPENSSL_config(nullptr);
#endif
(void) OpenSSL_add_ssl_algorithms();
(void) SSL_load_error_strings();
_openSSLInitializationSuccessful = true;
}
std::string SocketOpenSSL::getSSLError(int ret)
{
unsigned long e;
@ -118,7 +80,7 @@ namespace ix
return "OpenSSL failed - underlying BIO reported an I/O error";
}
}
else if (err == SSL_ERROR_SSL)
else if (err == SSL_ERROR_SSL)
{
e = ERR_get_error();
std::string errMsg("OpenSSL failed - ");
@ -153,7 +115,12 @@ namespace ix
if (ctx)
{
// To skip verification, pass in SSL_VERIFY_NONE
SSL_CTX_set_verify(ctx, SSL_VERIFY_PEER, openssl_verify_callback);
SSL_CTX_set_verify(ctx, SSL_VERIFY_PEER,
[](int preverify, X509_STORE_CTX*) -> int
{
return preverify;
});
SSL_CTX_set_verify_depth(ctx, 4);
SSL_CTX_set_options(ctx, SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3);
}
@ -182,7 +149,7 @@ namespace ix
#if OPENSSL_VERSION_NUMBER < 0x10100000L
// Check server name
bool hostname_verifies_ok = false;
STACK_OF(GENERAL_NAME) *san_names =
STACK_OF(GENERAL_NAME) *san_names =
(STACK_OF(GENERAL_NAME)*) X509_get_ext_d2i((X509 *)server_cert,
NID_subject_alt_name, NULL, NULL);
if (san_names)
@ -193,8 +160,8 @@ namespace ix
if (sk_name->type == GEN_DNS)
{
char *name = (char *)ASN1_STRING_data(sk_name->d.dNSName);
if ((size_t)ASN1_STRING_length(sk_name->d.dNSName) == strlen(name) &&
checkHost(hostname, name))
if ((size_t)ASN1_STRING_length(sk_name->d.dNSName) == strlen(name) &&
checkHost(hostname, name))
{
hostname_verifies_ok = true;
break;
@ -218,8 +185,8 @@ namespace ix
ASN1_STRING *cn_asn1 = X509_NAME_ENTRY_get_data(cn_entry);
char *cn = (char *)ASN1_STRING_data(cn_asn1);
if ((size_t)ASN1_STRING_length(cn_asn1) == strlen(cn) &&
checkHost(hostname, cn))
if ((size_t)ASN1_STRING_length(cn_asn1) == strlen(cn) &&
checkHost(hostname, cn))
{
hostname_verifies_ok = true;
}
@ -238,7 +205,7 @@ namespace ix
return true;
}
bool SocketOpenSSL::openSSLHandshake(const std::string& host, std::string& errMsg)
bool SocketOpenSSL::openSSLHandshake(const std::string& host, std::string& errMsg)
{
while (true)
{
@ -283,8 +250,9 @@ namespace ix
{
std::lock_guard<std::mutex> lock(_mutex);
if (!openSSLInitialize(errMsg))
if (!_openSSLInitializationSuccessful)
{
errMsg = "OPENSSL_init_ssl failure";
return false;
}
@ -306,14 +274,15 @@ namespace ix
errMsg += ERR_error_string(ssl_err, nullptr);
}
_ssl_connection = openssl_create_connection(_ssl_context, _sockfd);
if (nullptr == _ssl_connection)
_ssl_connection = SSL_new(_ssl_context);
if (_ssl_connection == nullptr)
{
errMsg = "OpenSSL failed to connect";
SSL_CTX_free(_ssl_context);
_ssl_context = nullptr;
return false;
}
SSL_set_fd(_ssl_connection, _sockfd);
// SNI support
SSL_set_tlsext_host_name(_ssl_connection, host.c_str());
@ -357,7 +326,7 @@ namespace ix
Socket::close();
}
int SocketOpenSSL::send(char* buf, size_t nbyte)
ssize_t SocketOpenSSL::send(char* buf, size_t nbyte)
{
ssize_t sent = 0;
@ -371,7 +340,7 @@ namespace ix
}
ERR_clear_error();
int write_result = SSL_write(_ssl_connection, buf + sent, (int) nbyte);
ssize_t write_result = SSL_write(_ssl_connection, buf + sent, (int) nbyte);
int reason = SSL_get_error(_ssl_connection, write_result);
if (reason == SSL_ERROR_NONE) {
@ -384,16 +353,16 @@ namespace ix
return -1;
}
}
return (int) sent;
return sent;
}
int SocketOpenSSL::send(const std::string& buffer)
ssize_t SocketOpenSSL::send(const std::string& buffer)
{
return send((char*)&buffer[0], buffer.size());
}
// No wait support
int SocketOpenSSL::recv(void* buf, size_t nbyte)
ssize_t SocketOpenSSL::recv(void* buf, size_t nbyte)
{
while (true)
{
@ -405,7 +374,7 @@ namespace ix
}
ERR_clear_error();
int read_result = SSL_read(_ssl_connection, buf, (int) nbyte);
ssize_t read_result = SSL_read(_ssl_connection, buf, (int) nbyte);
if (read_result > 0)
{

View File

@ -17,25 +17,26 @@
#include <mutex>
namespace ix
namespace ix
{
class SocketOpenSSL : public Socket
class SocketOpenSSL : public Socket
{
public:
SocketOpenSSL(int fd = -1);
~SocketOpenSSL();
virtual bool connect(const std::string& host,
virtual bool connect(const std::string& host,
int port,
std::string& errMsg,
const CancellationRequest& isCancellationRequested) final;
virtual void close() final;
virtual int send(char* buffer, size_t length) final;
virtual int send(const std::string& buffer) final;
virtual int recv(void* buffer, size_t length) final;
virtual ssize_t send(char* buffer, size_t length) final;
virtual ssize_t send(const std::string& buffer) final;
virtual ssize_t recv(void* buffer, size_t length) final;
private:
void openSSLInitialize();
std::string getSSLError(int ret);
SSL_CTX* openSSLCreateContext(std::string& errMsg);
bool openSSLHandshake(const std::string& hostname, std::string& errMsg);
@ -44,10 +45,13 @@ namespace ix
std::string& errMsg);
bool checkHost(const std::string& host, const char *pattern);
SSL_CTX* _ssl_context;
SSL* _ssl_connection;
SSL_CTX* _ssl_context;
const SSL_METHOD* _ssl_method;
mutable std::mutex _mutex; // OpenSSL routines are not thread-safe
std::once_flag _openSSLInitFlag;
static std::atomic<bool> _openSSLInitializationSuccessful;
};
}

View File

@ -47,7 +47,7 @@
// link with ntdsapi.lib for DsMakeSpn function
#pragma comment(lib, "ntdsapi.lib")
// The following function assumes that Winsock
// The following function assumes that Winsock
// has already been initialized
@ -59,7 +59,7 @@
# error("This file should only be built on Windows")
#endif
namespace ix
namespace ix
{
SocketSChannel::SocketSChannel()
{
@ -68,7 +68,7 @@ namespace ix
SocketSChannel::~SocketSChannel()
{
}
bool SocketSChannel::connect(const std::string& host,
@ -78,7 +78,7 @@ namespace ix
return Socket::connect(host, port, errMsg);
}
void SocketSChannel::secureSocket()
{
// there will be a lot to do here ...

View File

@ -8,15 +8,15 @@
#include "IXSocket.h"
namespace ix
namespace ix
{
class SocketSChannel : public Socket
class SocketSChannel : public Socket
{
public:
SocketSChannel();
~SocketSChannel();
virtual bool connect(const std::string& host,
virtual bool connect(const std::string& host,
int port,
std::string& errMsg) final;
virtual void close() final;

View File

@ -0,0 +1,228 @@
/*
* IXSocketServer.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include "IXSocketServer.h"
#include "IXSocket.h"
#include "IXSocketConnect.h"
#include "IXNetSystem.h"
#include <iostream>
#include <sstream>
#include <future>
#include <string.h>
namespace ix
{
const int SocketServer::kDefaultPort(8080);
const std::string SocketServer::kDefaultHost("127.0.0.1");
const int SocketServer::kDefaultTcpBacklog(5);
const size_t SocketServer::kDefaultMaxConnections(32);
SocketServer::SocketServer(int port,
const std::string& host,
int backlog,
size_t maxConnections) :
_port(port),
_host(host),
_backlog(backlog),
_maxConnections(maxConnections),
_stop(false)
{
}
SocketServer::~SocketServer()
{
stop();
}
void SocketServer::logError(const std::string& str)
{
std::lock_guard<std::mutex> lock(_logMutex);
std::cerr << str << std::endl;
}
void SocketServer::logInfo(const std::string& str)
{
std::lock_guard<std::mutex> lock(_logMutex);
std::cout << str << std::endl;
}
std::pair<bool, std::string> SocketServer::listen()
{
struct sockaddr_in server; // server address information
// Get a socket for accepting connections.
if ((_serverFd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
std::stringstream ss;
ss << "SocketServer::listen() error creating socket): "
<< strerror(Socket::getErrno());
return std::make_pair(false, ss.str());
}
// Make that socket reusable. (allow restarting this server at will)
int enable = 1;
if (setsockopt(_serverFd, SOL_SOCKET, SO_REUSEADDR,
(char*) &enable, sizeof(enable)) < 0)
{
std::stringstream ss;
ss << "SocketServer::listen() error calling setsockopt(SO_REUSEADDR) "
<< "at address " << _host << ":" << _port
<< " : " << strerror(Socket::getErrno());
::close(_serverFd);
return std::make_pair(false, ss.str());
}
// Bind the socket to the server address.
server.sin_family = AF_INET;
server.sin_port = htons(_port);
// Using INADDR_ANY trigger a pop-up box as binding to any address is detected
// by the osx firewall. We need to codesign the binary with a self-signed cert
// to allow that, but this is a bit of a pain. (this is what node or python would do).
//
// Using INADDR_LOOPBACK also does not work ... while it should.
// We default to 127.0.0.1 (localhost)
//
server.sin_addr.s_addr = inet_addr(_host.c_str());
if (bind(_serverFd, (struct sockaddr *)&server, sizeof(server)) < 0)
{
std::stringstream ss;
ss << "SocketServer::listen() error calling bind "
<< "at address " << _host << ":" << _port
<< " : " << strerror(Socket::getErrno());
::close(_serverFd);
return std::make_pair(false, ss.str());
}
//
// Listen for connections. Specify the tcp backlog.
//
if (::listen(_serverFd, _backlog) < 0)
{
std::stringstream ss;
ss << "SocketServer::listen() error calling listen "
<< "at address " << _host << ":" << _port
<< " : " << strerror(Socket::getErrno());
::close(_serverFd);
return std::make_pair(false, ss.str());
}
return std::make_pair(true, "");
}
void SocketServer::start()
{
if (_thread.joinable()) return; // we've already been started
_thread = std::thread(&SocketServer::run, this);
}
void SocketServer::wait()
{
std::unique_lock<std::mutex> lock(_conditionVariableMutex);
_conditionVariable.wait(lock);
}
void SocketServer::stop()
{
if (!_thread.joinable()) return; // nothing to do
_stop = true;
_thread.join();
_stop = false;
_conditionVariable.notify_one();
::close(_serverFd);
}
void SocketServer::run()
{
// Set the socket to non blocking mode, so that accept calls are not blocking
SocketConnect::configure(_serverFd);
// Return value of std::async, ignored
std::future<void> f;
for (;;)
{
if (_stop) return;
// Use select to check whether a new connection is in progress
fd_set rfds;
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 10 * 1000; // 10ms timeout
FD_ZERO(&rfds);
FD_SET(_serverFd, &rfds);
if (select(_serverFd + 1, &rfds, nullptr, nullptr, &timeout) < 0 &&
(errno == EBADF || errno == EINVAL))
{
std::stringstream ss;
ss << "SocketServer::run() error in select: "
<< strerror(Socket::getErrno());
logError(ss.str());
continue;
}
if (!FD_ISSET(_serverFd, &rfds))
{
// We reached the select timeout, and no new connections are pending
continue;
}
// Accept a connection.
struct sockaddr_in client; // client address information
int clientFd; // socket connected to client
socklen_t addressLen = sizeof(socklen_t);
memset(&client, 0, sizeof(client));
if ((clientFd = accept(_serverFd, (struct sockaddr *)&client, &addressLen)) < 0)
{
if (Socket::getErrno() != EWOULDBLOCK)
{
// FIXME: that error should be propagated
std::stringstream ss;
ss << "SocketServer::run() error accepting connection: "
<< strerror(Socket::getErrno());
logError(ss.str());
}
continue;
}
if (getConnectedClientsCount() >= _maxConnections)
{
std::stringstream ss;
ss << "SocketServer::run() reached max connections = "
<< _maxConnections << ". "
<< "Not accepting connection";
logError(ss.str());
::close(clientFd);
continue;
}
// Launch the handleConnection work asynchronously in its own thread.
//
// the destructor of a future returned by std::async blocks,
// so we need to declare it outside of this loop
f = std::async(std::launch::async,
&SocketServer::handleConnection,
this,
clientFd);
}
}
}

View File

@ -0,0 +1,68 @@
/*
* IXSocketServer.h
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <utility> // pair
#include <string>
#include <set>
#include <thread>
#include <mutex>
#include <functional>
#include <memory>
#include <atomic>
#include <condition_variable>
namespace ix
{
class SocketServer {
public:
SocketServer(int port = SocketServer::kDefaultPort,
const std::string& host = SocketServer::kDefaultHost,
int backlog = SocketServer::kDefaultTcpBacklog,
size_t maxConnections = SocketServer::kDefaultMaxConnections);
virtual ~SocketServer();
virtual void stop();
const static int kDefaultPort;
const static std::string kDefaultHost;
const static int kDefaultTcpBacklog;
const static size_t kDefaultMaxConnections;
void start();
std::pair<bool, std::string> listen();
void wait();
protected:
// Logging
void logError(const std::string& str);
void logInfo(const std::string& str);
private:
// Member variables
int _port;
std::string _host;
int _backlog;
size_t _maxConnections;
// socket for accepting connections
int _serverFd;
std::mutex _logMutex;
std::atomic<bool> _stop;
std::thread _thread;
std::condition_variable _conditionVariable;
std::mutex _conditionVariableMutex;
// Methods
void run();
virtual void handleConnection(int fd) = 0;
virtual size_t getConnectedClientsCount() = 0;
};
}

View File

@ -31,12 +31,14 @@ namespace ix
{
OnTrafficTrackerCallback WebSocket::_onTrafficTrackerCallback = nullptr;
const int WebSocket::kDefaultHandShakeTimeoutSecs(60);
const int WebSocket::kDefaultHeartBeatPeriod(-1);
WebSocket::WebSocket() :
_onMessageCallback(OnMessageCallback()),
_stop(false),
_automaticReconnection(true),
_handshakeTimeoutSecs(kDefaultHandShakeTimeoutSecs)
_handshakeTimeoutSecs(kDefaultHandShakeTimeoutSecs),
_heartBeatPeriod(kDefaultHeartBeatPeriod)
{
_ws.setOnCloseCallback(
[this](uint16_t code, const std::string& reason, size_t wireSize)
@ -48,7 +50,7 @@ namespace ix
);
}
WebSocket::~WebSocket()
WebSocket::~WebSocket()
{
stop();
}
@ -77,6 +79,18 @@ namespace ix
return _perMessageDeflateOptions;
}
void WebSocket::setHeartBeatPeriod(int hearBeatPeriod)
{
std::lock_guard<std::mutex> lock(_configMutex);
_heartBeatPeriod = hearBeatPeriod;
}
int WebSocket::getHeartBeatPeriod() const
{
std::lock_guard<std::mutex> lock(_configMutex);
return _heartBeatPeriod;
}
void WebSocket::start()
{
if (_thread.joinable()) return; // we've already been started
@ -110,7 +124,8 @@ namespace ix
{
{
std::lock_guard<std::mutex> lock(_configMutex);
_ws.configure(_perMessageDeflateOptions);
_ws.configure(_perMessageDeflateOptions,
_heartBeatPeriod);
}
WebSocketInitResult status = _ws.connectToUrl(_url, timeoutSecs);
@ -120,7 +135,7 @@ namespace ix
}
_onMessageCallback(WebSocket_MessageType_Open, "", 0,
WebSocketErrorInfo(),
WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers),
WebSocketCloseInfo());
return status;
@ -130,7 +145,7 @@ namespace ix
{
{
std::lock_guard<std::mutex> lock(_configMutex);
_ws.configure(_perMessageDeflateOptions);
_ws.configure(_perMessageDeflateOptions, _heartBeatPeriod);
}
WebSocketInitResult status = _ws.connectToSocket(fd, timeoutSecs);
@ -140,7 +155,7 @@ namespace ix
}
_onMessageCallback(WebSocket_MessageType_Open, "", 0,
WebSocketErrorInfo(),
WebSocketErrorInfo(),
WebSocketOpenInfo(status.uri, status.headers),
WebSocketCloseInfo());
return status;
@ -169,7 +184,7 @@ namespace ix
using millis = std::chrono::duration<double, std::milli>;
millis duration;
while (true)
while (true)
{
if (isConnected() || isClosing() || _stop || !_automaticReconnection)
{
@ -199,7 +214,7 @@ namespace ix
{
setThreadName(_url);
while (true)
while (true)
{
if (_stop) return;
@ -208,7 +223,7 @@ namespace ix
if (_stop) return;
// 2. Poll to see if there's any new data available
// 2. Poll to see if there's any new data available
_ws.poll();
if (_stop) return;
@ -258,7 +273,7 @@ namespace ix
void WebSocket::setOnMessageCallback(const OnMessageCallback& callback)
{
_onMessageCallback = callback;
_onMessageCallback = callback;
}
void WebSocket::setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback)
@ -279,9 +294,10 @@ namespace ix
}
}
WebSocketSendInfo WebSocket::send(const std::string& text)
WebSocketSendInfo WebSocket::send(const std::string& text,
const OnProgressCallback& onProgressCallback)
{
return sendMessage(text, false);
return sendMessage(text, false, onProgressCallback);
}
WebSocketSendInfo WebSocket::ping(const std::string& text)
@ -293,7 +309,9 @@ namespace ix
return sendMessage(text, true);
}
WebSocketSendInfo WebSocket::sendMessage(const std::string& text, bool ping)
WebSocketSendInfo WebSocket::sendMessage(const std::string& text,
bool ping,
const OnProgressCallback& onProgressCallback)
{
if (!isConnected()) return WebSocketSendInfo(false);
@ -315,7 +333,7 @@ namespace ix
}
else
{
webSocketSendInfo = _ws.sendBinary(text);
webSocketSendInfo = _ws.sendBinary(text, onProgressCallback);
}
WebSocket::invokeTrafficTrackerCallback(webSocketSendInfo.wireSize, false);
@ -325,12 +343,13 @@ namespace ix
ReadyState WebSocket::getReadyState() const
{
switch (_ws.getReadyState())
switch (_ws.getReadyState())
{
case ix::WebSocketTransport::OPEN: return WebSocket_ReadyState_Open;
case ix::WebSocketTransport::CONNECTING: return WebSocket_ReadyState_Connecting;
case ix::WebSocketTransport::CLOSING: return WebSocket_ReadyState_Closing;
case ix::WebSocketTransport::CLOSED: return WebSocket_ReadyState_Closed;
default: return WebSocket_ReadyState_Closed;
}
}
@ -342,6 +361,7 @@ namespace ix
case WebSocket_ReadyState_Connecting: return "CONNECTING";
case WebSocket_ReadyState_Closing: return "CLOSING";
case WebSocket_ReadyState_Closed: return "CLOSED";
default: return "CLOSED";
}
}

View File

@ -19,11 +19,12 @@
#include "IXWebSocketSendInfo.h"
#include "IXWebSocketPerMessageDeflateOptions.h"
#include "IXWebSocketHttpHeaders.h"
#include "IXProgressCallback.h"
namespace ix
{
// https://developer.mozilla.org/en-US/docs/Web/API/WebSocket#Ready_state_constants
enum ReadyState
enum ReadyState
{
WebSocket_ReadyState_Connecting = 0,
WebSocket_ReadyState_Open = 1,
@ -60,7 +61,7 @@ namespace ix
uint16_t code;
std::string reason;
WebSocketCloseInfo(uint64_t c = 0,
WebSocketCloseInfo(uint16_t c = 0,
const std::string& r = std::string())
: code(c)
, reason(r)
@ -78,7 +79,7 @@ namespace ix
using OnTrafficTrackerCallback = std::function<void(size_t size, bool incoming)>;
class WebSocket
class WebSocket
{
public:
WebSocket();
@ -86,7 +87,8 @@ namespace ix
void setUrl(const std::string& url);
void setPerMessageDeflateOptions(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions);
void setHandshakeTimeout(int _handshakeTimeoutSecs);
void setHandshakeTimeout(int handshakeTimeoutSecs);
void setHeartBeatPeriod(int hearBeatPeriod);
// Run asynchronously, by calling start and stop.
void start();
@ -96,7 +98,8 @@ namespace ix
WebSocketInitResult connect(int timeoutSecs);
void run();
WebSocketSendInfo send(const std::string& text);
WebSocketSendInfo send(const std::string& text,
const OnProgressCallback& onProgressCallback = nullptr);
WebSocketSendInfo ping(const std::string& text);
void close();
@ -107,13 +110,16 @@ namespace ix
ReadyState getReadyState() const;
const std::string& getUrl() const;
const WebSocketPerMessageDeflateOptions& getPerMessageDeflateOptions() const;
int getHeartBeatPeriod() const;
void enableAutomaticReconnection();
void disableAutomaticReconnection();
private:
WebSocketSendInfo sendMessage(const std::string& text, bool ping);
WebSocketSendInfo sendMessage(const std::string& text,
bool ping,
const OnProgressCallback& callback = nullptr);
bool isConnected() const;
bool isClosing() const;
@ -142,6 +148,10 @@ namespace ix
std::atomic<int> _handshakeTimeoutSecs;
static const int kDefaultHandShakeTimeoutSecs;
// Optional Heartbeat
int _heartBeatPeriod;
static const int kDefaultHeartBeatPeriod;
friend class WebSocketServer;
};
}

View File

@ -8,7 +8,7 @@
#include <string>
namespace ix
namespace ix
{
struct WebSocketErrorInfo
{

View File

@ -16,7 +16,7 @@
#include <algorithm>
namespace ix
namespace ix
{
WebSocketHandshake::WebSocketHandshake(std::atomic<bool>& requestInitCancellation,
std::shared_ptr<Socket> socket,
@ -116,7 +116,7 @@ namespace ix
std::cout << "-------------------------------" << std::endl;
}
std::string trim(const std::string& str)
std::string WebSocketHandshake::trim(const std::string& str)
{
std::string out(str);
out.erase(std::remove(out.begin(), out.end(), ' '), out.end());
@ -125,6 +125,16 @@ namespace ix
return out;
}
bool WebSocketHandshake::insensitiveStringCompare(const std::string& a, const std::string& b)
{
return std::equal(a.begin(), a.end(),
b.begin(), b.end(),
[](char a, char b)
{
return tolower(a) == tolower(b);
});
}
std::tuple<std::string, std::string, std::string> WebSocketHandshake::parseRequestLine(const std::string& line)
{
// Request-Line = Method SP Request-URI SP HTTP-Version CRLF
@ -161,7 +171,7 @@ namespace ix
std::string WebSocketHandshake::genRandomString(const int len)
{
std::string alphanum =
std::string alphanum =
"0123456789"
"ABCDEFGH"
"abcdefgh";
@ -191,7 +201,7 @@ namespace ix
char line[256];
int i;
while (true)
while (true)
{
int colon = 0;
@ -267,7 +277,7 @@ namespace ix
{
_requestInitCancellation = false;
auto isCancellationRequested =
auto isCancellationRequested =
makeCancellationRequestWithTimeout(timeoutSecs, _requestInitCancellation);
std::string errMsg;
@ -354,6 +364,23 @@ namespace ix
return WebSocketInitResult(false, status, "Error parsing HTTP headers");
}
// Check the presence of the connection field
if (headers.find("connection") == headers.end())
{
std::string errorMsg("Missing connection value");
return WebSocketInitResult(false, status, errorMsg);
}
// Check the value of the connection field
// Some websocket servers (Go/Gorilla?) send lowercase values for the
// connection header, so do a case insensitive comparison
if (!insensitiveStringCompare(headers["connection"], "Upgrade"))
{
std::stringstream ss;
ss << "Invalid connection value: " << headers["connection"];
return WebSocketInitResult(false, status, ss.str());
}
char output[29] = {};
WebSocketHandshakeKeyGen::generate(secWebSocketKey.c_str(), output);
if (std::string(output) != headers["sec-websocket-accept"])
@ -391,7 +418,7 @@ namespace ix
// Set the socket to non blocking mode + other tweaks
SocketConnect::configure(fd);
auto isCancellationRequested =
auto isCancellationRequested =
makeCancellationRequestWithTimeout(timeoutSecs, _requestInitCancellation);
std::string remote = std::string("remote fd ") + std::to_string(fd);
@ -405,7 +432,7 @@ namespace ix
{
return sendErrorResponse(400, "Error reading HTTP request line");
}
// Validate request line (GET /foo HTTP/1.1\r\n)
auto requestLine = parseRequestLine(line);
auto method = std::get<0>(requestLine);
@ -466,6 +493,8 @@ namespace ix
std::stringstream ss;
ss << "HTTP/1.1 101\r\n";
ss << "Sec-WebSocket-Accept: " << std::string(output) << "\r\n";
ss << "Upgrade: websocket\r\n";
ss << "Connection: Upgrade\r\n";
// Parse the client headers. Does it support deflate ?
std::string header = headers["sec-websocket-extensions"];

View File

@ -18,7 +18,7 @@
#include <memory>
#include <tuple>
namespace ix
namespace ix
{
struct WebSocketInitResult
{
@ -75,6 +75,8 @@ namespace ix
WebSocketInitResult sendErrorResponse(int code, const std::string& reason);
std::tuple<std::string, std::string, std::string> parseRequestLine(const std::string& line);
std::string trim(const std::string& str);
bool insensitiveStringCompare(const std::string& a, const std::string& b);
std::atomic<bool>& _requestInitCancellation;
std::shared_ptr<Socket> _socket;

View File

@ -9,7 +9,7 @@
#include <string>
#include <unordered_map>
namespace ix
namespace ix
{
using WebSocketHttpHeaders = std::unordered_map<std::string, std::string>;
}

View File

@ -34,7 +34,7 @@
* - Reused zlib compression + decompression bits.
* - Refactored to have 2 class for compression and decompression, to allow multi-threading
* and make sure that _compressBuffer is not shared between threads.
* - Original code wasn't working for some reason, I had to add checks
* - Original code wasn't working for some reason, I had to add checks
* for the presence of the kEmptyUncompressedBlock at the end of buffer so that servers
* would start accepting receiving/decoding compressed messages. Original code was probably
* modifying the passed in buffers before processing in enabled.hpp ?
@ -47,222 +47,31 @@
#include "IXWebSocketPerMessageDeflate.h"
#include "IXWebSocketPerMessageDeflateOptions.h"
#include <iostream>
#include <cassert>
#include <string.h>
namespace
{
// The passed in size (4) is important, without it the string litteral
// is treated as a char* and the null termination (\x00) makes it
// look like an empty string.
const std::string kEmptyUncompressedBlock = std::string("\x00\x00\xff\xff", 4);
const int kBufferSize = 1 << 14;
}
#include "IXWebSocketPerMessageDeflateCodec.h"
namespace ix
{
//
// Compressor
//
WebSocketPerMessageDeflateCompressor::WebSocketPerMessageDeflateCompressor()
: _compressBufferSize(kBufferSize)
WebSocketPerMessageDeflate::WebSocketPerMessageDeflate() :
_compressor(std::make_unique<WebSocketPerMessageDeflateCompressor>()),
_decompressor(std::make_unique<WebSocketPerMessageDeflateDecompressor>())
{
memset(&_deflateState, 0, sizeof(_deflateState));
_deflateState.zalloc = Z_NULL;
_deflateState.zfree = Z_NULL;
_deflateState.opaque = Z_NULL;
}
WebSocketPerMessageDeflateCompressor::~WebSocketPerMessageDeflateCompressor()
{
deflateEnd(&_deflateState);
}
bool WebSocketPerMessageDeflateCompressor::init(uint8_t deflateBits,
bool clientNoContextTakeOver)
{
int ret = deflateInit2(
&_deflateState,
Z_DEFAULT_COMPRESSION,
Z_DEFLATED,
-1*deflateBits,
4, // memory level 1-9
Z_DEFAULT_STRATEGY
);
if (ret != Z_OK) return false;
_compressBuffer.reset(new unsigned char[_compressBufferSize]);
_flush = (clientNoContextTakeOver)
? Z_FULL_FLUSH
: Z_SYNC_FLUSH;
return true;
}
bool WebSocketPerMessageDeflateCompressor::endsWith(const std::string& value,
const std::string& ending)
{
if (ending.size() > value.size()) return false;
return std::equal(ending.rbegin(), ending.rend(), value.rbegin());
}
bool WebSocketPerMessageDeflateCompressor::compress(const std::string& in,
std::string& out)
{
//
// 7.2.1. Compression
//
// An endpoint uses the following algorithm to compress a message.
//
// 1. Compress all the octets of the payload of the message using
// DEFLATE.
//
// 2. If the resulting data does not end with an empty DEFLATE block
// with no compression (the "BTYPE" bits are set to 00), append an
// empty DEFLATE block with no compression to the tail end.
//
// 3. Remove 4 octets (that are 0x00 0x00 0xff 0xff) from the tail end.
// After this step, the last octet of the compressed data contains
// (possibly part of) the DEFLATE header bits with the "BTYPE" bits
// set to 00.
//
size_t output;
if (in.empty())
{
uint8_t buf[6] = {0x02, 0x00, 0x00, 0x00, 0xff, 0xff};
out.append((char *)(buf), 6);
return true;
}
_deflateState.avail_in = (uInt) in.size();
_deflateState.next_in = (Bytef*) in.data();
do
{
// Output to local buffer
_deflateState.avail_out = (uInt) _compressBufferSize;
_deflateState.next_out = _compressBuffer.get();
deflate(&_deflateState, _flush);
output = _compressBufferSize - _deflateState.avail_out;
out.append((char *)(_compressBuffer.get()),output);
} while (_deflateState.avail_out == 0);
if (endsWith(out, kEmptyUncompressedBlock))
{
out.resize(out.size() - 4);
}
return true;
}
//
// Decompressor
//
WebSocketPerMessageDeflateDecompressor::WebSocketPerMessageDeflateDecompressor()
: _compressBufferSize(kBufferSize)
{
memset(&_inflateState, 0, sizeof(_inflateState));
_inflateState.zalloc = Z_NULL;
_inflateState.zfree = Z_NULL;
_inflateState.opaque = Z_NULL;
_inflateState.avail_in = 0;
_inflateState.next_in = Z_NULL;
}
WebSocketPerMessageDeflateDecompressor::~WebSocketPerMessageDeflateDecompressor()
{
inflateEnd(&_inflateState);
}
bool WebSocketPerMessageDeflateDecompressor::init(uint8_t inflateBits,
bool clientNoContextTakeOver)
{
int ret = inflateInit2(
&_inflateState,
-1*inflateBits
);
if (ret != Z_OK) return false;
_compressBuffer.reset(new unsigned char[_compressBufferSize]);
_flush = (clientNoContextTakeOver)
? Z_FULL_FLUSH
: Z_SYNC_FLUSH;
return true;
}
bool WebSocketPerMessageDeflateDecompressor::decompress(const std::string& in,
std::string& out)
{
//
// 7.2.2. Decompression
//
// An endpoint uses the following algorithm to decompress a message.
//
// 1. Append 4 octets of 0x00 0x00 0xff 0xff to the tail end of the
// payload of the message.
//
// 2. Decompress the resulting data using DEFLATE.
//
std::string inFixed(in);
inFixed += kEmptyUncompressedBlock;
_inflateState.avail_in = (uInt) inFixed.size();
_inflateState.next_in = (unsigned char *)(const_cast<char *>(inFixed.data()));
do
{
_inflateState.avail_out = (uInt) _compressBufferSize;
_inflateState.next_out = _compressBuffer.get();
int ret = inflate(&_inflateState, Z_SYNC_FLUSH);
if (ret == Z_NEED_DICT || ret == Z_DATA_ERROR || ret == Z_MEM_ERROR)
{
return false; // zlib error
}
out.append(
reinterpret_cast<char *>(_compressBuffer.get()),
_compressBufferSize - _inflateState.avail_out
);
} while (_inflateState.avail_out == 0);
return true;
}
WebSocketPerMessageDeflate::WebSocketPerMessageDeflate()
{
_compressor.reset(new WebSocketPerMessageDeflateCompressor());
_decompressor.reset(new WebSocketPerMessageDeflateDecompressor());
;
}
WebSocketPerMessageDeflate::~WebSocketPerMessageDeflate()
{
_compressor.reset();
_decompressor.reset();
;
}
bool WebSocketPerMessageDeflate::init(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions)
{
bool clientNoContextTakeover =
bool clientNoContextTakeover =
perMessageDeflateOptions.getClientNoContextTakeover();
uint8_t deflateBits = perMessageDeflateOptions.getClientMaxWindowBits();
uint8_t inflateBits = perMessageDeflateOptions.getServerMaxWindowBits();
return _compressor->init(deflateBits, clientNoContextTakeover) &&
return _compressor->init(deflateBits, clientNoContextTakeover) &&
_decompressor->init(inflateBits, clientNoContextTakeover);
}

View File

@ -34,47 +34,14 @@
#pragma once
#include "zlib.h"
#include <string>
#include <memory>
namespace ix
namespace ix
{
class WebSocketPerMessageDeflateOptions;
class WebSocketPerMessageDeflateCompressor
{
public:
WebSocketPerMessageDeflateCompressor();
~WebSocketPerMessageDeflateCompressor();
bool init(uint8_t deflateBits, bool clientNoContextTakeOver);
bool compress(const std::string& in, std::string& out);
private:
static bool endsWith(const std::string& value, const std::string& ending);
int _flush;
size_t _compressBufferSize;
std::unique_ptr<unsigned char[]> _compressBuffer;
z_stream _deflateState;
};
class WebSocketPerMessageDeflateDecompressor
{
public:
WebSocketPerMessageDeflateDecompressor();
~WebSocketPerMessageDeflateDecompressor();
bool init(uint8_t inflateBits, bool clientNoContextTakeOver);
bool decompress(const std::string& in, std::string& out);
private:
int _flush;
size_t _compressBufferSize;
std::unique_ptr<unsigned char[]> _compressBuffer;
z_stream _inflateState;
};
class WebSocketPerMessageDeflateCompressor;
class WebSocketPerMessageDeflateDecompressor;
class WebSocketPerMessageDeflate
{
@ -87,7 +54,7 @@ namespace ix
bool decompress(const std::string& in, std::string& out);
private:
std::shared_ptr<WebSocketPerMessageDeflateCompressor> _compressor;
std::shared_ptr<WebSocketPerMessageDeflateDecompressor> _decompressor;
std::unique_ptr<WebSocketPerMessageDeflateCompressor> _compressor;
std::unique_ptr<WebSocketPerMessageDeflateDecompressor> _decompressor;
};
}

View File

@ -0,0 +1,206 @@
/*
* IXWebSocketPerMessageDeflateCodec.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXWebSocketPerMessageDeflateCodec.h"
#include "IXWebSocketPerMessageDeflateOptions.h"
#include <iostream>
#include <cassert>
#include <string.h>
namespace
{
// The passed in size (4) is important, without it the string litteral
// is treated as a char* and the null termination (\x00) makes it
// look like an empty string.
const std::string kEmptyUncompressedBlock = std::string("\x00\x00\xff\xff", 4);
const int kBufferSize = 1 << 14;
}
namespace ix
{
//
// Compressor
//
WebSocketPerMessageDeflateCompressor::WebSocketPerMessageDeflateCompressor()
: _compressBufferSize(kBufferSize)
{
memset(&_deflateState, 0, sizeof(_deflateState));
_deflateState.zalloc = Z_NULL;
_deflateState.zfree = Z_NULL;
_deflateState.opaque = Z_NULL;
}
WebSocketPerMessageDeflateCompressor::~WebSocketPerMessageDeflateCompressor()
{
deflateEnd(&_deflateState);
}
bool WebSocketPerMessageDeflateCompressor::init(uint8_t deflateBits,
bool clientNoContextTakeOver)
{
int ret = deflateInit2(
&_deflateState,
Z_DEFAULT_COMPRESSION,
Z_DEFLATED,
-1*deflateBits,
4, // memory level 1-9
Z_DEFAULT_STRATEGY
);
if (ret != Z_OK) return false;
_compressBuffer = std::make_unique<unsigned char[]>(_compressBufferSize);
_flush = (clientNoContextTakeOver)
? Z_FULL_FLUSH
: Z_SYNC_FLUSH;
return true;
}
bool WebSocketPerMessageDeflateCompressor::endsWith(const std::string& value,
const std::string& ending)
{
if (ending.size() > value.size()) return false;
return std::equal(ending.rbegin(), ending.rend(), value.rbegin());
}
bool WebSocketPerMessageDeflateCompressor::compress(const std::string& in,
std::string& out)
{
//
// 7.2.1. Compression
//
// An endpoint uses the following algorithm to compress a message.
//
// 1. Compress all the octets of the payload of the message using
// DEFLATE.
//
// 2. If the resulting data does not end with an empty DEFLATE block
// with no compression (the "BTYPE" bits are set to 00), append an
// empty DEFLATE block with no compression to the tail end.
//
// 3. Remove 4 octets (that are 0x00 0x00 0xff 0xff) from the tail end.
// After this step, the last octet of the compressed data contains
// (possibly part of) the DEFLATE header bits with the "BTYPE" bits
// set to 00.
//
size_t output;
if (in.empty())
{
uint8_t buf[6] = {0x02, 0x00, 0x00, 0x00, 0xff, 0xff};
out.append((char *)(buf), 6);
return true;
}
_deflateState.avail_in = (uInt) in.size();
_deflateState.next_in = (Bytef*) in.data();
do
{
// Output to local buffer
_deflateState.avail_out = (uInt) _compressBufferSize;
_deflateState.next_out = _compressBuffer.get();
deflate(&_deflateState, _flush);
output = _compressBufferSize - _deflateState.avail_out;
out.append((char *)(_compressBuffer.get()),output);
} while (_deflateState.avail_out == 0);
if (endsWith(out, kEmptyUncompressedBlock))
{
out.resize(out.size() - 4);
}
return true;
}
//
// Decompressor
//
WebSocketPerMessageDeflateDecompressor::WebSocketPerMessageDeflateDecompressor()
: _compressBufferSize(kBufferSize)
{
memset(&_inflateState, 0, sizeof(_inflateState));
_inflateState.zalloc = Z_NULL;
_inflateState.zfree = Z_NULL;
_inflateState.opaque = Z_NULL;
_inflateState.avail_in = 0;
_inflateState.next_in = Z_NULL;
}
WebSocketPerMessageDeflateDecompressor::~WebSocketPerMessageDeflateDecompressor()
{
inflateEnd(&_inflateState);
}
bool WebSocketPerMessageDeflateDecompressor::init(uint8_t inflateBits,
bool clientNoContextTakeOver)
{
int ret = inflateInit2(
&_inflateState,
-1*inflateBits
);
if (ret != Z_OK) return false;
_compressBuffer = std::make_unique<unsigned char[]>(_compressBufferSize);
_flush = (clientNoContextTakeOver)
? Z_FULL_FLUSH
: Z_SYNC_FLUSH;
return true;
}
bool WebSocketPerMessageDeflateDecompressor::decompress(const std::string& in,
std::string& out)
{
//
// 7.2.2. Decompression
//
// An endpoint uses the following algorithm to decompress a message.
//
// 1. Append 4 octets of 0x00 0x00 0xff 0xff to the tail end of the
// payload of the message.
//
// 2. Decompress the resulting data using DEFLATE.
//
std::string inFixed(in);
inFixed += kEmptyUncompressedBlock;
_inflateState.avail_in = (uInt) inFixed.size();
_inflateState.next_in = (unsigned char *)(const_cast<char *>(inFixed.data()));
do
{
_inflateState.avail_out = (uInt) _compressBufferSize;
_inflateState.next_out = _compressBuffer.get();
int ret = inflate(&_inflateState, Z_SYNC_FLUSH);
if (ret == Z_NEED_DICT || ret == Z_DATA_ERROR || ret == Z_MEM_ERROR)
{
return false; // zlib error
}
out.append(
reinterpret_cast<char *>(_compressBuffer.get()),
_compressBufferSize - _inflateState.avail_out
);
} while (_inflateState.avail_out == 0);
return true;
}
}

View File

@ -0,0 +1,50 @@
/*
* IXWebSocketPerMessageDeflateCodec.h
* Author: Benjamin Sergeant
* Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include "zlib.h"
#include <string>
#include <memory>
namespace ix
{
class WebSocketPerMessageDeflateCompressor
{
public:
WebSocketPerMessageDeflateCompressor();
~WebSocketPerMessageDeflateCompressor();
bool init(uint8_t deflateBits, bool clientNoContextTakeOver);
bool compress(const std::string& in, std::string& out);
private:
static bool endsWith(const std::string& value, const std::string& ending);
int _flush;
size_t _compressBufferSize;
std::unique_ptr<unsigned char[]> _compressBuffer;
z_stream _deflateState;
};
class WebSocketPerMessageDeflateDecompressor
{
public:
WebSocketPerMessageDeflateDecompressor();
~WebSocketPerMessageDeflateDecompressor();
bool init(uint8_t inflateBits, bool clientNoContextTakeOver);
bool decompress(const std::string& in, std::string& out);
private:
int _flush;
size_t _compressBufferSize;
std::unique_ptr<unsigned char[]> _compressBuffer;
z_stream _inflateState;
};
}

View File

@ -9,6 +9,7 @@
#include <sstream>
#include <iostream>
#include <algorithm>
#include <cctype>
namespace ix
{
@ -35,7 +36,7 @@ namespace ix
_serverMaxWindowBits = serverMaxWindowBits;
}
//
//
// Four extension parameters are defined for "permessage-deflate" to
// help endpoints manage per-connection resource usage.
//
@ -87,9 +88,9 @@ namespace ix
int x;
ss >> x;
// Sanitize values to be in the proper range [8, 15] in
// Sanitize values to be in the proper range [8, 15] in
// case a server would give us bogus values
_serverMaxWindowBits =
_serverMaxWindowBits =
std::min(maxServerMaxWindowBits,
std::max(x, minServerMaxWindowBits));
}
@ -102,9 +103,9 @@ namespace ix
int x;
ss >> x;
// Sanitize values to be in the proper range [8, 15] in
// Sanitize values to be in the proper range [8, 15] in
// case a server would give us bogus values
_clientMaxWindowBits =
_clientMaxWindowBits =
std::min(maxClientMaxWindowBits,
std::max(x, minClientMaxWindowBits));
}
@ -161,7 +162,7 @@ namespace ix
std::string WebSocketPerMessageDeflateOptions::removeSpaces(const std::string& str)
{
std::string out(str);
out.erase(std::remove_if(out.begin(),
out.erase(std::remove_if(out.begin(),
out.end(),
[](unsigned char x){ return std::isspace(x); }),
out.end());

View File

@ -8,7 +8,7 @@
#include <string>
namespace ix
namespace ix
{
class WebSocketPerMessageDeflateOptions
{

View File

@ -9,7 +9,7 @@
#include <string>
#include <iostream>
namespace ix
namespace ix
{
struct WebSocketSendInfo
{

View File

@ -8,35 +8,22 @@
#include "IXWebSocketTransport.h"
#include "IXWebSocket.h"
#include "IXSocketConnect.h"
#include "IXNetSystem.h"
#include <sstream>
#include <future>
#include <netdb.h>
#include <stdio.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <string.h>
namespace ix
namespace ix
{
const int WebSocketServer::kDefaultPort(8080);
const std::string WebSocketServer::kDefaultHost("127.0.0.1");
const int WebSocketServer::kDefaultTcpBacklog(5);
const size_t WebSocketServer::kDefaultMaxConnections(32);
const int WebSocketServer::kDefaultHandShakeTimeoutSecs(3); // 3 seconds
WebSocketServer::WebSocketServer(int port,
const std::string& host,
int backlog,
size_t maxConnections,
int handshakeTimeoutSecs) :
_port(port),
_host(host),
_backlog(backlog),
_maxConnections(maxConnections),
_handshakeTimeoutSecs(handshakeTimeoutSecs),
_stop(false)
int handshakeTimeoutSecs) : SocketServer(port, host, backlog, maxConnections),
_handshakeTimeoutSecs(handshakeTimeoutSecs)
{
}
@ -46,189 +33,25 @@ namespace ix
stop();
}
void WebSocketServer::setOnConnectionCallback(const OnConnectionCallback& callback)
{
_onConnectionCallback = callback;
}
void WebSocketServer::logError(const std::string& str)
{
std::lock_guard<std::mutex> lock(_logMutex);
std::cerr << str << std::endl;
}
void WebSocketServer::logInfo(const std::string& str)
{
std::lock_guard<std::mutex> lock(_logMutex);
std::cout << str << std::endl;
}
std::pair<bool, std::string> WebSocketServer::listen()
{
struct sockaddr_in server; // server address information
// Get a socket for accepting connections.
if ((_serverFd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
std::stringstream ss;
ss << "WebSocketServer::listen() error creating socket): "
<< strerror(errno);
return std::make_pair(false, ss.str());
}
// Make that socket reusable. (allow restarting this server at will)
int enable = 1;
if (setsockopt(_serverFd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0)
{
std::stringstream ss;
ss << "WebSocketServer::listen() error calling setsockopt(SO_REUSEADDR): "
<< strerror(errno);
return std::make_pair(false, ss.str());
}
// Bind the socket to the server address.
server.sin_family = AF_INET;
server.sin_port = htons(_port);
// Using INADDR_ANY trigger a pop-up box as binding to any address is detected
// by the osx firewall. We need to codesign the binary with a self-signed cert
// to allow that, but this is a bit of a pain. (this is what node or python would do).
//
// Using INADDR_LOOPBACK also does not work ... while it should.
// We default to 127.0.0.1 (localhost)
//
server.sin_addr.s_addr = inet_addr(_host.c_str());
if (bind(_serverFd, (struct sockaddr *)&server, sizeof(server)) < 0)
{
std::stringstream ss;
ss << "WebSocketServer::listen() error calling bind: "
<< strerror(errno);
return std::make_pair(false, ss.str());
}
/*
* Listen for connections. Specify the tcp backlog.
*/
if (::listen(_serverFd, _backlog) != 0)
{
std::stringstream ss;
ss << "WebSocketServer::listen() error calling listen: "
<< strerror(errno);
return std::make_pair(false, ss.str());
}
return std::make_pair(true, "");
}
void WebSocketServer::start()
{
if (_thread.joinable()) return; // we've already been started
_thread = std::thread(&WebSocketServer::run, this);
}
void WebSocketServer::wait()
{
std::unique_lock<std::mutex> lock(_conditionVariableMutex);
_conditionVariable.wait(lock);
}
void WebSocketServer::stop()
{
if (!_thread.joinable()) return; // nothing to do
auto clients = getClients();
for (auto client : clients)
{
client->close();
}
_stop = true;
_thread.join();
_stop = false;
_conditionVariable.notify_one();
SocketServer::stop();
}
void WebSocketServer::run()
void WebSocketServer::setOnConnectionCallback(const OnConnectionCallback& callback)
{
// Set the socket to non blocking mode, so that accept calls are not blocking
SocketConnect::configure(_serverFd);
// Return value of std::async, ignored
std::future<void> f;
// Select arguments
fd_set rfds;
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 10 * 1000; // 10ms
for (;;)
{
if (_stop) return;
FD_ZERO(&rfds);
FD_SET(_serverFd, &rfds);
select(_serverFd + 1, &rfds, nullptr, nullptr, &timeout);
if (!FD_ISSET(_serverFd, &rfds))
{
// We reached the select timeout, and no new connections are pending
continue;
}
// Accept a connection.
struct sockaddr_in client; // client address information
int clientFd; // socket connected to client
socklen_t addressLen = sizeof(socklen_t);
memset(&client, 0, sizeof(client));
if ((clientFd = accept(_serverFd, (struct sockaddr *)&client, &addressLen)) < 0)
{
if (errno != EWOULDBLOCK)
{
// FIXME: that error should be propagated
std::stringstream ss;
ss << "WebSocketServer::run() error accepting connection: "
<< strerror(errno);
logError(ss.str());
}
continue;
}
if (getConnectedClientsCount() >= _maxConnections)
{
std::stringstream ss;
ss << "WebSocketServer::run() reached max connections = "
<< _maxConnections << ". "
<< "Not accepting connection";
logError(ss.str());
::close(clientFd);
continue;
}
// Launch the handleConnection work asynchronously in its own thread.
//
// the destructor of a future returned by std::async blocks,
// so we need to declare it outside of this loop
f = std::async(std::launch::async,
&WebSocketServer::handleConnection,
this,
clientFd);
}
_onConnectionCallback = callback;
}
void WebSocketServer::handleConnection(int fd)
{
std::shared_ptr<WebSocket> webSocket(new WebSocket);
auto webSocket = std::make_shared<WebSocket>();
_onConnectionCallback(webSocket);
webSocket->disableAutomaticReconnection();
@ -242,7 +65,7 @@ namespace ix
auto status = webSocket->connectToSocket(fd, _handshakeTimeoutSecs);
if (status.success)
{
// Process incoming messages and execute callbacks
// Process incoming messages and execute callbacks
// until the connection is closed
webSocket->run();
}
@ -276,6 +99,7 @@ namespace ix
size_t WebSocketServer::getConnectedClientsCount()
{
return getClients().size();
std::lock_guard<std::mutex> lock(_clientsMutex);
return _clients.size();
}
}

View File

@ -16,67 +16,40 @@
#include <condition_variable>
#include "IXWebSocket.h"
#include "IXSocketServer.h"
namespace ix
namespace ix
{
using OnConnectionCallback = std::function<void(std::shared_ptr<WebSocket>)>;
class WebSocketServer {
class WebSocketServer : public SocketServer {
public:
WebSocketServer(int port = WebSocketServer::kDefaultPort,
const std::string& host = WebSocketServer::kDefaultHost,
int backlog = WebSocketServer::kDefaultTcpBacklog,
size_t maxConnections = WebSocketServer::kDefaultMaxConnections,
WebSocketServer(int port = SocketServer::kDefaultPort,
const std::string& host = SocketServer::kDefaultHost,
int backlog = SocketServer::kDefaultTcpBacklog,
size_t maxConnections = SocketServer::kDefaultMaxConnections,
int handshakeTimeoutSecs = WebSocketServer::kDefaultHandShakeTimeoutSecs);
virtual ~WebSocketServer();
virtual void stop() final;
void setOnConnectionCallback(const OnConnectionCallback& callback);
void start();
void wait();
void stop();
std::pair<bool, std::string> listen();
// Get all the connected clients
std::set<std::shared_ptr<WebSocket>> getClients();
private:
// Member variables
int _port;
std::string _host;
int _backlog;
size_t _maxConnections;
int _handshakeTimeoutSecs;
OnConnectionCallback _onConnectionCallback;
// socket for accepting connections
int _serverFd;
std::mutex _clientsMutex;
std::set<std::shared_ptr<WebSocket>> _clients;
std::mutex _logMutex;
std::atomic<bool> _stop;
std::thread _thread;
std::condition_variable _conditionVariable;
std::mutex _conditionVariableMutex;
const static int kDefaultPort;
const static std::string kDefaultHost;
const static int kDefaultTcpBacklog;
const static size_t kDefaultMaxConnections;
const static int kDefaultHandShakeTimeoutSecs;
// Methods
void run();
void handleConnection(int fd);
size_t getConnectedClientsCount();
// Logging
void logError(const std::string& str);
void logInfo(const std::string& str);
virtual void handleConnection(int fd) final;
virtual size_t getConnectedClientsCount() final;
};
}

View File

@ -29,18 +29,26 @@
#include <cstdarg>
#include <iostream>
#include <sstream>
#include <chrono>
#include <thread>
namespace ix
{
const std::string WebSocketTransport::kHeartBeatPingMessage("ixwebsocket::hearbeat");
const int WebSocketTransport::kDefaultHeartBeatPeriod(-1);
constexpr size_t WebSocketTransport::kChunkSize;
WebSocketTransport::WebSocketTransport() :
_readyState(CLOSED),
_closeCode(0),
_closeWireSize(0),
_enablePerMessageDeflate(false),
_requestInitCancellation(false)
_requestInitCancellation(false),
_heartBeatPeriod(kDefaultHeartBeatPeriod),
_lastSendTimePoint(std::chrono::steady_clock::now())
{
_readbuf.resize(kChunkSize);
}
WebSocketTransport::~WebSocketTransport()
@ -48,10 +56,12 @@ namespace ix
;
}
void WebSocketTransport::configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions)
void WebSocketTransport::configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions,
int hearBeatPeriod)
{
_perMessageDeflateOptions = perMessageDeflateOptions;
_enablePerMessageDeflate = _perMessageDeflateOptions.enabled();
_heartBeatPeriod = hearBeatPeriod;
}
// Client
@ -122,7 +132,7 @@ namespace ix
return result;
}
WebSocketTransport::ReadyStateValues WebSocketTransport::getReadyState() const
WebSocketTransport::ReadyStateValues WebSocketTransport::getReadyState() const
{
return _readyState;
}
@ -146,47 +156,66 @@ namespace ix
void WebSocketTransport::setOnCloseCallback(const OnCloseCallback& onCloseCallback)
{
_onCloseCallback = onCloseCallback;
_onCloseCallback = onCloseCallback;
}
// Only consider send time points for that computation.
// The receive time points is taken into account in Socket::poll (second parameter).
bool WebSocketTransport::heartBeatPeriodExceeded()
{
std::lock_guard<std::mutex> lock(_lastSendTimePointMutex);
auto now = std::chrono::steady_clock::now();
return now - _lastSendTimePoint > std::chrono::seconds(_heartBeatPeriod);
}
void WebSocketTransport::poll()
{
_socket->poll(
[this]()
[this](PollResultType pollResult)
{
while (true)
// If (1) heartbeat is enabled, and (2) no data was received or
// send for a duration exceeding our heart-beat period, send a
// ping to the server.
if (pollResult == PollResultType_Timeout &&
heartBeatPeriodExceeded())
{
int N = (int) _rxbuf.size();
std::stringstream ss;
ss << kHeartBeatPingMessage << "::" << _heartBeatPeriod << "s";
sendPing(ss.str());
return;
}
int ret;
_rxbuf.resize(N + 1500);
ret = _socket->recv((char*)&_rxbuf[0] + N, 1500);
while (true)
{
ssize_t ret = _socket->recv((char*)&_readbuf[0], _readbuf.size());
if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
_socket->getErrno() == EAGAIN)) {
_rxbuf.resize(N);
if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
_socket->getErrno() == EAGAIN))
{
break;
}
else if (ret <= 0)
else if (ret <= 0)
{
_rxbuf.resize(N);
_rxbuf.clear();
_socket->close();
setReadyState(CLOSED);
break;
}
else
else
{
_rxbuf.resize(N + ret);
_rxbuf.insert(_rxbuf.end(),
_readbuf.begin(),
_readbuf.begin() + ret);
}
}
if (isSendBufferEmpty() && _readyState == CLOSING)
if (isSendBufferEmpty() && _readyState == CLOSING)
{
_socket->close();
setReadyState(CLOSED);
}
});
},
_heartBeatPeriod);
}
bool WebSocketTransport::isSendBufferEmpty() const
@ -254,7 +283,7 @@ namespace ix
//
void WebSocketTransport::dispatch(const OnMessageCallback& onMessageCallback)
{
while (true)
while (true)
{
wsheader_type ws;
if (_rxbuf.size() < 2) return; /* Need at least 2 */
@ -266,7 +295,7 @@ namespace ix
ws.N0 = (data[1] & 0x7f);
ws.header_size = 2 + (ws.N0 == 126? 2 : 0) + (ws.N0 == 127? 8 : 0) + (ws.mask? 4 : 0);
if (_rxbuf.size() < ws.header_size) return; /* Need: ws.header_size - _rxbuf.size() */
//
// Calculate payload length:
// 0-125 mean the payload is that long.
@ -304,7 +333,7 @@ namespace ix
// invalid payload length according to the spec. bail out
return;
}
if (ws.mask)
{
ws.masking_key[0] = ((uint8_t) data[i+0]) << 0;
@ -327,22 +356,40 @@ namespace ix
// We got a whole message, now do something with it:
if (
ws.opcode == wsheader_type::TEXT_FRAME
ws.opcode == wsheader_type::TEXT_FRAME
|| ws.opcode == wsheader_type::BINARY_FRAME
|| ws.opcode == wsheader_type::CONTINUATION
) {
unmaskReceiveBuffer(ws);
_receivedData.insert(_receivedData.end(),
_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t)ws.N);// just feed
if (ws.fin)
{
// fire callback with a string message
std::string stringMessage(_receivedData.begin(),
_receivedData.end());
emitMessage(MSG, stringMessage, ws, onMessageCallback);
_receivedData.clear();
//
// Usual case. Small unfragmented messages
//
if (ws.fin && _chunks.empty())
{
emitMessage(MSG,
std::string(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t) ws.N),
ws,
onMessageCallback);
}
else
{
//
// Add intermediary message to our chunk list.
// We use a chunk list instead of a big buffer because resizing
// large buffer can be very costly when we need to re-allocate
// the internal buffer which is slow and can let the internal OS
// receive buffer fill out.
//
_chunks.emplace_back(
std::vector<uint8_t>(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t)ws.N));
if (ws.fin)
{
emitMessage(MSG, getMergedChunks(), ws, onMessageCallback);
_chunks.clear();
}
}
}
else if (ws.opcode == wsheader_type::PING)
@ -392,12 +439,33 @@ namespace ix
close();
}
// Erase the message that has been processed from the input/read buffer
_rxbuf.erase(_rxbuf.begin(),
_rxbuf.begin() + ws.header_size + (size_t) ws.N);
}
}
void WebSocketTransport::emitMessage(MessageKind messageKind,
std::string WebSocketTransport::getMergedChunks() const
{
size_t length = 0;
for (auto&& chunk : _chunks)
{
length += chunk.size();
}
std::string msg;
msg.reserve(length);
for (auto&& chunk : _chunks)
{
std::string str(chunk.begin(), chunk.end());
msg += str;
}
return msg;
}
void WebSocketTransport::emitMessage(MessageKind messageKind,
const std::string& message,
const wsheader_type& ws,
const OnMessageCallback& onMessageCallback)
@ -409,7 +477,7 @@ namespace ix
{
std::string decompressedMessage;
bool success = _perMessageDeflate.decompress(message, decompressedMessage);
onMessageCallback(decompressedMessage, wireSize, not success, messageKind);
onMessageCallback(decompressedMessage, wireSize, !success, messageKind);
}
else
{
@ -420,15 +488,17 @@ namespace ix
unsigned WebSocketTransport::getRandomUnsigned()
{
auto now = std::chrono::system_clock::now();
auto seconds =
auto seconds =
std::chrono::duration_cast<std::chrono::seconds>(
now.time_since_epoch()).count();
return static_cast<unsigned>(seconds);
}
WebSocketSendInfo WebSocketTransport::sendData(wsheader_type::opcode_type type,
const std::string& message,
bool compress)
WebSocketSendInfo WebSocketTransport::sendData(
wsheader_type::opcode_type type,
const std::string& message,
bool compress,
const OnProgressCallback& onProgressCallback)
{
if (_readyState == CLOSING || _readyState == CLOSED)
{
@ -445,15 +515,81 @@ namespace ix
if (compress)
{
bool success = _perMessageDeflate.compress(message, compressedMessage);
compressionError = !success;
if (!_perMessageDeflate.compress(message, compressedMessage))
{
bool success = false;
compressionError = true;
payloadSize = 0;
wireSize = 0;
return WebSocketSendInfo(success, compressionError, payloadSize, wireSize);
}
compressionError = false;
wireSize = compressedMessage.size();
message_begin = compressedMessage.begin();
message_end = compressedMessage.end();
}
uint64_t message_size = wireSize;
// Common case for most message. No fragmentation required.
if (wireSize < kChunkSize)
{
sendFragment(type, true, message_begin, message_end, compress);
}
else
{
//
// Large messages need to be fragmented
//
// Rules:
// First message needs to specify a proper type (BINARY or TEXT)
// Intermediary and last messages need to be of type CONTINUATION
// Last message must set the fin byte.
//
auto steps = wireSize / kChunkSize;
std::string::const_iterator begin = message_begin;
std::string::const_iterator end = message_end;
for (uint64_t i = 0 ; i < steps; ++i)
{
bool firstStep = i == 0;
bool lastStep = (i+1) == steps;
bool fin = lastStep;
end = begin + kChunkSize;
if (lastStep)
{
end = message_end;
}
auto opcodeType = type;
if (!firstStep)
{
opcodeType = wsheader_type::CONTINUATION;
}
// Send message
sendFragment(opcodeType, fin, begin, end, compress);
if (onProgressCallback && !onProgressCallback(i, steps))
{
break;
}
begin += kChunkSize;
}
}
return WebSocketSendInfo(true, compressionError, payloadSize, wireSize);
}
void WebSocketTransport::sendFragment(wsheader_type::opcode_type type,
bool fin,
std::string::const_iterator message_begin,
std::string::const_iterator message_end,
bool compress)
{
auto message_size = message_end - message_begin;
unsigned x = getRandomUnsigned();
uint8_t masking_key[4] = {};
@ -466,7 +602,13 @@ namespace ix
header.assign(2 +
(message_size >= 126 ? 2 : 0) +
(message_size >= 65536 ? 6 : 0) + 4, 0);
header[0] = 0x80 | type;
header[0] = type;
// The fin bit indicate that this is the last fragment. Fin is French for end.
if (fin)
{
header[0] |= 0x80;
}
// This bit indicate that the frame is compressed
if (compress)
@ -483,7 +625,7 @@ namespace ix
header[4] = masking_key[2];
header[5] = masking_key[3];
}
else if (message_size < 65536)
else if (message_size < 65536)
{
header[1] = 126 | 0x80;
header[2] = (message_size >> 8) & 0xff;
@ -518,8 +660,6 @@ namespace ix
// Now actually send this data
sendOnSocket();
return WebSocketSendInfo(true, compressionError, payloadSize, wireSize);
}
WebSocketSendInfo WebSocketTransport::sendPing(const std::string& message)
@ -528,9 +668,13 @@ namespace ix
return sendData(wsheader_type::PING, message, compress);
}
WebSocketSendInfo WebSocketTransport::sendBinary(const std::string& message)
WebSocketSendInfo WebSocketTransport::sendBinary(
const std::string& message,
const OnProgressCallback& onProgressCallback)
{
return sendData(wsheader_type::BINARY_FRAME, message, _enablePerMessageDeflate);
return sendData(wsheader_type::BINARY_FRAME, message,
_enablePerMessageDeflate, onProgressCallback);
}
void WebSocketTransport::sendOnSocket()
@ -539,9 +683,9 @@ namespace ix
while (_txbuf.size())
{
int ret = _socket->send((char*)&_txbuf[0], _txbuf.size());
ssize_t ret = _socket->send((char*)&_txbuf[0], _txbuf.size());
if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
_socket->getErrno() == EAGAIN))
{
break;
@ -558,6 +702,9 @@ namespace ix
_txbuf.erase(_txbuf.begin(), _txbuf.begin() + ret);
}
}
std::lock_guard<std::mutex> lck(_lastSendTimePointMutex);
_lastSendTimePoint = std::chrono::steady_clock::now();
}
void WebSocketTransport::close()

View File

@ -16,6 +16,7 @@
#include <memory>
#include <mutex>
#include <atomic>
#include <list>
#include "IXWebSocketSendInfo.h"
#include "IXWebSocketPerMessageDeflate.h"
@ -23,8 +24,9 @@
#include "IXWebSocketHttpHeaders.h"
#include "IXCancellationRequest.h"
#include "IXWebSocketHandshake.h"
#include "IXProgressCallback.h"
namespace ix
namespace ix
{
class Socket;
@ -57,7 +59,8 @@ namespace ix
WebSocketTransport();
~WebSocketTransport();
void configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions);
void configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions,
int hearBeatPeriod);
WebSocketInitResult connectToUrl(const std::string& url, // Client
int timeoutSecs);
@ -65,7 +68,8 @@ namespace ix
int timeoutSecs);
void poll();
WebSocketSendInfo sendBinary(const std::string& message);
WebSocketSendInfo sendBinary(const std::string& message,
const OnProgressCallback& onProgressCallback);
WebSocketSendInfo sendPing(const std::string& message);
void close();
ReadyStateValues getReadyState() const;
@ -75,7 +79,6 @@ namespace ix
private:
std::string _url;
std::string _origin;
struct wsheader_type {
unsigned header_size;
@ -95,13 +98,31 @@ namespace ix
uint8_t masking_key[4];
};
// Buffer for reading from our socket. That buffer is never resized.
std::vector<uint8_t> _readbuf;
// Contains all messages that were fetched in the last socket read.
// This could be a mix of control messages (Close, Ping, etc...) and
// data messages. That buffer
std::vector<uint8_t> _rxbuf;
// Contains all messages that are waiting to be sent
std::vector<uint8_t> _txbuf;
mutable std::mutex _txbufMutex;
std::vector<uint8_t> _receivedData;
// Hold fragments for multi-fragments messages in a list. We support receiving very large
// messages (tested messages up to 700M) and we cannot put them in a single
// buffer that is resized, as this operation can be slow when a buffer has its
// size increased 2 fold, while appending to a list has a fixed cost.
std::list<std::vector<uint8_t>> _chunks;
// Fragments are 32K long
static constexpr size_t kChunkSize = 1 << 15;
// Underlying TCP socket
std::shared_ptr<Socket> _socket;
// Hold the state of the connection (OPEN, CLOSED, etc...)
std::atomic<ReadyStateValues> _readyState;
OnCloseCallback _onCloseCallback;
@ -110,6 +131,7 @@ namespace ix
size_t _closeWireSize;
mutable std::mutex _closeDataMutex;
// Data used for Per Message Deflate compression (with zlib)
WebSocketPerMessageDeflate _perMessageDeflate;
WebSocketPerMessageDeflateOptions _perMessageDeflateOptions;
std::atomic<bool> _enablePerMessageDeflate;
@ -117,12 +139,29 @@ namespace ix
// Used to cancel dns lookup + socket connect + http upgrade
std::atomic<bool> _requestInitCancellation;
void sendOnSocket();
WebSocketSendInfo sendData(wsheader_type::opcode_type type,
const std::string& message,
bool compress);
// Optional Heartbeat
int _heartBeatPeriod;
static const int kDefaultHeartBeatPeriod;
const static std::string kHeartBeatPingMessage;
mutable std::mutex _lastSendTimePointMutex;
std::chrono::time_point<std::chrono::steady_clock> _lastSendTimePoint;
void emitMessage(MessageKind messageKind,
// No data was send through the socket for longer that the hearbeat period
bool heartBeatPeriodExceeded();
void sendOnSocket();
WebSocketSendInfo sendData(wsheader_type::opcode_type type,
const std::string& message,
bool compress,
const OnProgressCallback& onProgressCallback = nullptr);
void sendFragment(wsheader_type::opcode_type type,
bool fin,
std::string::const_iterator begin,
std::string::const_iterator end,
bool compress);
void emitMessage(MessageKind messageKind,
const std::string& message,
const wsheader_type& ws,
const OnMessageCallback& onMessageCallback);
@ -137,5 +176,7 @@ namespace ix
unsigned getRandomUnsigned();
void unmaskReceiveBuffer(const wsheader_type& ws);
std::string getMergedChunks() const;
};
}

View File

@ -0,0 +1,16 @@
/*
* IXSetThreadName_windows.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
#include "../IXSetThreadName.h"
#include <iostream>
namespace ix
{
void setThreadName(const std::string& name)
{
// FIXME
std::cerr << "setThreadName not implemented on Windows yet" << std::endl;
}
}

View File

@ -1,14 +1,21 @@
#
# This makefile is just used to easily work with docker (linux build)
#
all: run
all: brew
brew:
mkdir -p build && (cd build ; cmake .. ; make -j install)
.PHONY: docker
docker:
docker build -t ws_connect:latest .
docker build -t broadcast_server:latest .
run: docker
docker run --cap-add sys_ptrace -it ws_connect:latest bash
run:
docker run --cap-add sys_ptrace -it broadcast_server:latest bash
# this is helpful to remove trailing whitespaces
trail:
sh third_party/remove_trailing_whitespaces.sh
build:
(cd examples/satori_publisher ; mkdir -p build ; cd build ; cmake .. ; make)
@ -24,8 +31,17 @@ test_server:
(cd test && npm i ws && node broadcast-server.js)
# env TEST=Websocket_server make test
# env TEST=Websocket_chat make test
# env TEST=heartbeat make test
test:
(cd test && sh run.sh)
python test/run.py
# For the fork that is configured with appveyor
rebase_upstream:
git fetch upstream
git checkout master
git reset --hard upstream/master
git push origin master --force
.PHONY: test
.PHONY: build

View File

@ -2,28 +2,48 @@
# Author: Benjamin Sergeant
# Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
#
cmake_minimum_required (VERSION 3.4.1)
project (ixwebsocket_unittest)
set (CMAKE_CXX_STANDARD 11)
set(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/../third_party/sanitizers-cmake/cmake" ${CMAKE_MODULE_PATH})
find_package(Sanitizers)
option(USE_TLS "Add TLS support" ON)
set (CMAKE_CXX_STANDARD 14)
if (NOT WIN32)
option(USE_TLS "Add TLS support" ON)
endif()
add_subdirectory(${PROJECT_SOURCE_DIR}/.. ixwebsocket)
include_directories(
${PROJECT_SOURCE_DIR}/Catch2/single_include
../third_party/msgpack11
)
add_executable(ixwebsocket_unittest
# Shared sources
set (SOURCES
test_runner.cpp
cmd_websocket_chat.cpp
IXWebSocketServerTest.cpp
IXTest.cpp
msgpack11.cpp
../third_party/msgpack11/msgpack11.cpp
IXDNSLookupTest.cpp
IXSocketTest.cpp
)
# Some unittest don't work on windows yet
if (NOT WIN32)
list(APPEND SOURCES
IXWebSocketServerTest.cpp
IXWebSocketHeartBeatTest.cpp
cmd_websocket_chat.cpp
IXWebSocketTestConnectionDisconnection.cpp
)
endif()
add_executable(ixwebsocket_unittest ${SOURCES})
add_sanitizers(ixwebsocket_unittest)
if (APPLE AND USE_TLS)
target_link_libraries(ixwebsocket_unittest "-framework foundation" "-framework security")
endif()

50
test/IXDNSLookupTest.cpp Normal file
View File

@ -0,0 +1,50 @@
/*
* IXDNSLookupTest.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone. All rights reserved.
*/
#include "catch.hpp"
#include "IXTest.h"
#include <ixwebsocket/IXDNSLookup.h>
#include <iostream>
using namespace ix;
TEST_CASE("dns", "[net]")
{
SECTION("Test resolving a known hostname")
{
DNSLookup dnsLookup("www.google.com", 80);
std::string errMsg;
struct addrinfo* res;
res = dnsLookup.resolve(errMsg, [] { return false; });
std::cerr << "Error message: " << errMsg << std::endl;
REQUIRE(res != nullptr);
}
SECTION("Test resolving a non-existing hostname")
{
DNSLookup dnsLookup("wwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwww", 80);
std::string errMsg;
struct addrinfo* res = dnsLookup.resolve(errMsg, [] { return false; });
std::cerr << "Error message: " << errMsg << std::endl;
REQUIRE(res == nullptr);
}
SECTION("Test resolving a good hostname, with cancellation")
{
DNSLookup dnsLookup("www.google.com", 80, 1);
std::string errMsg;
// The callback returning true means we are requesting cancellation
struct addrinfo* res = dnsLookup.resolve(errMsg, [] { return true; });
std::cerr << "Error message: " << errMsg << std::endl;
REQUIRE(res == nullptr);
}
}

92
test/IXSocketTest.cpp Normal file
View File

@ -0,0 +1,92 @@
/*
* IXSocketTest.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone. All rights reserved.
*/
#include <iostream>
#include <ixwebsocket/IXSocket.h>
#include <ixwebsocket/IXCancellationRequest.h>
#if defined(__APPLE__) or defined(__linux__)
# ifdef __APPLE__
# include <ixwebsocket/IXSocketAppleSSL.h>
# else
# include <ixwebsocket/IXSocketOpenSSL.h>
# endif
#endif
#include "IXTest.h"
#include "catch.hpp"
using namespace ix;
namespace ix
{
void testSocket(const std::string& host,
int port,
const std::string& request,
std::shared_ptr<Socket> socket,
int expectedStatus,
int timeoutSecs)
{
std::string errMsg;
static std::atomic<bool> requestInitCancellation(false);
auto isCancellationRequested =
makeCancellationRequestWithTimeout(timeoutSecs, requestInitCancellation);
bool success = socket->connect(host, port, errMsg, isCancellationRequested);
Logger() << "errMsg: " << errMsg;
REQUIRE(success);
std::cout << "Sending request: " << request
<< "to " << host << ":" << port
<< std::endl;
REQUIRE(socket->writeBytes(request, isCancellationRequested));
auto lineResult = socket->readLine(isCancellationRequested);
auto lineValid = lineResult.first;
auto line = lineResult.second;
std::cout << "read error: " << strerror(Socket::getErrno()) << std::endl;
REQUIRE(lineValid);
int status = -1;
REQUIRE(sscanf(line.c_str(), "HTTP/1.1 %d", &status) == 1);
REQUIRE(status == expectedStatus);
}
}
TEST_CASE("socket", "[socket]")
{
SECTION("Connect to google HTTP server. Send GET request without header. Should return 200")
{
std::shared_ptr<Socket> socket(new Socket);
std::string host("www.google.com");
int port = 80;
std::string request("GET / HTTP/1.1\r\n\r\n");
int expectedStatus = 200;
int timeoutSecs = 3;
testSocket(host, port, request, socket, expectedStatus, timeoutSecs);
}
#if defined(__APPLE__) or defined(__linux__)
SECTION("Connect to google HTTPS server. Send GET request without header. Should return 200")
{
# ifdef __APPLE__
std::shared_ptr<Socket> socket = std::make_shared<SocketAppleSSL>();
# else
std::shared_ptr<Socket> socket = std::make_shared<SocketOpenSSL>();
# endif
std::string host("www.google.com");
int port = 443;
std::string request("GET / HTTP/1.1\r\n\r\n");
int expectedStatus = 200;
int timeoutSecs = 3;
testSocket(host, port, request, socket, expectedStatus, timeoutSecs);
}
#endif
}

View File

@ -6,18 +6,23 @@
#include "IXTest.h"
#include <ixwebsocket/IXWebSocket.h>
#include <ixwebsocket/IXNetSystem.h>
#include <chrono>
#include <thread>
#include <mutex>
#include <string>
#include <fstream>
#include <iostream>
#include <stdlib.h>
#include <stack>
namespace ix
{
std::atomic<size_t> incomingBytes(0);
std::atomic<size_t> outgoingBytes(0);
std::mutex Logger::_mutex;
std::stack<int> freePorts;
void setupWebSocketTrafficTrackerCallback()
{
@ -38,9 +43,9 @@ namespace ix
void reportWebSocketTraffic()
{
std::cout << incomingBytes << std::endl;
std::cout << "Incoming bytes: " << incomingBytes << std::endl;
std::cout << "Outgoing bytes: " << outgoingBytes << std::endl;
Logger() << incomingBytes;
Logger() << "Incoming bytes: " << incomingBytes;
Logger() << "Outgoing bytes: " << outgoingBytes;
}
void msleep(int ms)
@ -52,10 +57,83 @@ namespace ix
std::string generateSessionId()
{
auto now = std::chrono::system_clock::now();
auto seconds =
auto seconds =
std::chrono::duration_cast<std::chrono::seconds>(
now.time_since_epoch()).count();
return std::to_string(seconds);
}
void log(const std::string& msg)
{
Logger() << msg;
}
int getAnyFreePort()
{
int defaultPort = 8090;
int sockfd;
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
log("Cannot compute a free port. socket error.");
return defaultPort;
}
int enable = 1;
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR,
(char*) &enable, sizeof(enable)) < 0)
{
log("Cannot compute a free port. setsockopt error.");
return defaultPort;
}
// Bind to port 0. This is the standard way to get a free port.
struct sockaddr_in server; // server address information
server.sin_family = AF_INET;
server.sin_port = htons(0);
server.sin_addr.s_addr = inet_addr("127.0.0.1");
if (bind(sockfd, (struct sockaddr *)&server, sizeof(server)) < 0)
{
log("Cannot compute a free port. bind error.");
::close(sockfd);
return defaultPort;
}
struct sockaddr_in sa; // server address information
unsigned int len;
if (getsockname(sockfd, (struct sockaddr *) &sa, &len) < 0)
{
log("Cannot compute a free port. getsockname error.");
::close(sockfd);
return defaultPort;
}
int port = ntohs(sa.sin_port);
::close(sockfd);
return port;
}
int getFreePort()
{
while (true)
{
int port = getAnyFreePort();
//
// Only port above 1024 can be used by non root users, but for some
// reason I got port 7 returned with macOS when binding on port 0...
//
if (port > 1024)
{
return port;
}
}
return -1;
}
}

View File

@ -9,6 +9,8 @@
#include <string>
#include <vector>
#include <sstream>
#include <iostream>
#include <mutex>
namespace ix
{
@ -21,4 +23,35 @@ namespace ix
// Record and report websocket traffic
void setupWebSocketTrafficTrackerCallback();
void reportWebSocketTraffic();
struct Logger
{
public:
Logger& operator<<(const std::string& msg)
{
std::lock_guard<std::mutex> lock(_mutex);
std::cerr << msg;
std::cerr << std::endl;
return *this;
}
template <typename T>
Logger& operator<<(T const& obj)
{
std::lock_guard<std::mutex> lock(_mutex);
std::cerr << obj;
std::cerr << std::endl;
return *this;
}
private:
static std::mutex _mutex;
};
void log(const std::string& msg);
bool computeFreePorts(int count);
int getFreePort();
}

View File

@ -0,0 +1,222 @@
/*
* IXWebSocketHeartBeatTest.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone. All rights reserved.
*/
#include <iostream>
#include <sstream>
#include <queue>
#include <ixwebsocket/IXWebSocket.h>
#include <ixwebsocket/IXWebSocketServer.h>
#include "IXTest.h"
#include "catch.hpp"
using namespace ix;
namespace
{
class WebSocketClient
{
public:
WebSocketClient(int port);
void subscribe(const std::string& channel);
void start();
void stop();
bool isReady() const;
void sendMessage(const std::string& text);
private:
ix::WebSocket _webSocket;
int _port;
};
WebSocketClient::WebSocketClient(int port)
: _port(port)
{
;
}
bool WebSocketClient::isReady() const
{
return _webSocket.getReadyState() == ix::WebSocket_ReadyState_Open;
}
void WebSocketClient::stop()
{
_webSocket.stop();
}
void WebSocketClient::start()
{
std::string url;
{
std::stringstream ss;
ss << "ws://localhost:"
<< _port
<< "/";
url = ss.str();
}
_webSocket.setUrl(url);
// The important bit for this test.
// Set a 1 second hearbeat ; if no traffic is present on the connection for 1 second
// a ping message will be sent by the client.
_webSocket.setHeartBeatPeriod(1);
std::stringstream ss;
log(std::string("Connecting to url: ") + url);
_webSocket.setOnMessageCallback(
[](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
std::stringstream ss;
if (messageType == ix::WebSocket_MessageType_Open)
{
log("client connected");
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
log("client disconnected");
}
else if (messageType == ix::WebSocket_MessageType_Error)
{
ss << "Error ! " << error.reason;
log(ss.str());
}
else if (messageType == ix::WebSocket_MessageType_Pong)
{
ss << "Received pong message " << str;
log(ss.str());
}
else if (messageType == ix::WebSocket_MessageType_Ping)
{
ss << "Received ping message " << str;
log(ss.str());
}
else if (messageType == ix::WebSocket_MessageType_Message)
{
ss << "Received message " << str;
log(ss.str());
}
else
{
ss << "Invalid ix::WebSocketMessageType";
log(ss.str());
}
});
_webSocket.start();
}
void WebSocketClient::sendMessage(const std::string& text)
{
_webSocket.send(text);
}
bool startServer(ix::WebSocketServer& server, std::atomic<int>& receivedPingMessages)
{
// A dev/null server
server.setOnConnectionCallback(
[&server, &receivedPingMessages](std::shared_ptr<ix::WebSocket> webSocket)
{
webSocket->setOnMessageCallback(
[webSocket, &server, &receivedPingMessages](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
if (messageType == ix::WebSocket_MessageType_Open)
{
Logger() << "New server connection";
Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:";
for (auto it : openInfo.headers)
{
Logger() << it.first << ": " << it.second;
}
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
log("Server closed connection");
}
else if (messageType == ix::WebSocket_MessageType_Ping)
{
log("Server received a ping");
receivedPingMessages++;
}
}
);
}
);
auto res = server.listen();
if (!res.first)
{
log(res.second);
return false;
}
server.start();
return true;
}
}
TEST_CASE("Websocket_heartbeat", "[heartbeat]")
{
SECTION("Make sure that ping messages are sent during heartbeat.")
{
ix::setupWebSocketTrafficTrackerCallback();
int port = getFreePort();
ix::WebSocketServer server(port);
std::atomic<int> serverReceivedPingMessages(0);
REQUIRE(startServer(server, serverReceivedPingMessages));
std::string session = ix::generateSessionId();
WebSocketClient webSocketClientA(port);
WebSocketClient webSocketClientB(port);
webSocketClientA.start();
webSocketClientB.start();
// Wait for all chat instance to be ready
while (true)
{
if (webSocketClientA.isReady() && webSocketClientB.isReady()) break;
ix::msleep(10);
}
REQUIRE(server.getClients().size() == 2);
ix::msleep(900);
webSocketClientB.sendMessage("hello world");
ix::msleep(900);
webSocketClientB.sendMessage("hello world");
ix::msleep(900);
webSocketClientA.stop();
webSocketClientB.stop();
REQUIRE(serverReceivedPingMessages >= 2);
REQUIRE(serverReceivedPingMessages <= 4);
// Give us 500ms for the server to notice that clients went away
ix::msleep(500);
REQUIRE(server.getClients().size() == 0);
ix::reportWebSocketTraffic();
}
}

View File

@ -32,17 +32,17 @@ namespace ix
{
if (messageType == ix::WebSocket_MessageType_Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
Logger() << "New connection";
Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:";
for (auto it : openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
Logger() << it.first << ": " << it.second;
}
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
std::cerr << "Closed connection" << std::endl;
Logger() << "Closed connection";
}
else if (messageType == ix::WebSocket_MessageType_Message)
{
@ -62,7 +62,7 @@ namespace ix
auto res = server.listen();
if (!res.first)
{
std::cerr << res.second << std::endl;
Logger() << res.second;
return false;
}
@ -75,7 +75,7 @@ TEST_CASE("Websocket_server", "[websocket_server]")
{
SECTION("Connect to the server, do not send anything. Should timeout and return 400")
{
int port = 8091;
int port = getFreePort();
ix::WebSocketServer server(port);
REQUIRE(startServer(server));
@ -107,7 +107,7 @@ TEST_CASE("Websocket_server", "[websocket_server]")
SECTION("Connect to the server. Send GET request without header. Should return 400")
{
int port = 8092;
int port = getFreePort();
ix::WebSocketServer server(port);
REQUIRE(startServer(server));
@ -121,7 +121,7 @@ TEST_CASE("Websocket_server", "[websocket_server]")
bool success = socket.connect(host, port, errMsg, isCancellationRequested);
REQUIRE(success);
std::cout << "writeBytes" << std::endl;
Logger() << "writeBytes";
socket.writeBytes("GET /\r\n", isCancellationRequested);
auto lineResult = socket.readLine(isCancellationRequested);
@ -142,7 +142,7 @@ TEST_CASE("Websocket_server", "[websocket_server]")
SECTION("Connect to the server. Send GET request with correct header")
{
int port = 8093;
int port = getFreePort();
ix::WebSocketServer server(port);
REQUIRE(startServer(server));

View File

@ -0,0 +1,128 @@
/*
* IXWebSocketTestConnectionDisconnection.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017 Machine Zone. All rights reserved.
*/
#include <iostream>
#include <sstream>
#include <set>
#include <ixwebsocket/IXWebSocket.h>
#include "IXTest.h"
#include "catch.hpp"
using namespace ix;
namespace
{
const std::string WEBSOCKET_DOT_ORG_URL("wss://echo.websocket.org");
const std::string GOOGLE_URL("wss://google.com");
const std::string UNKNOWN_URL("wss://asdcasdcaasdcasdcasdcasdcasdcasdcasassdd.com");
}
namespace
{
class IXWebSocketTestConnectionDisconnection
{
public:
IXWebSocketTestConnectionDisconnection();
void start(const std::string& url);
void stop();
private:
ix::WebSocket _webSocket;
};
IXWebSocketTestConnectionDisconnection::IXWebSocketTestConnectionDisconnection()
{
;
}
void IXWebSocketTestConnectionDisconnection::stop()
{
_webSocket.stop();
}
void IXWebSocketTestConnectionDisconnection::start(const std::string& url)
{
_webSocket.setUrl(url);
std::stringstream ss;
log(std::string("Connecting to url: ") + url);
_webSocket.setOnMessageCallback(
[](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
std::stringstream ss;
if (messageType == ix::WebSocket_MessageType_Open)
{
log("cmd_websocket_satori_chat: connected !");
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
log("cmd_websocket_satori_chat: disconnected !");
}
else if (messageType == ix::WebSocket_MessageType_Error)
{
log("cmd_websocket_satori_chat: Error!");
}
else if (messageType == ix::WebSocket_MessageType_Message)
{
log("cmd_websocket_satori_chat: received message.!");
}
else if (messageType == ix::WebSocket_MessageType_Ping)
{
log("cmd_websocket_satori_chat: received ping message.!");
}
else if (messageType == ix::WebSocket_MessageType_Pong)
{
log("cmd_websocket_satori_chat: received pong message.!");
}
else
{
log("Invalid ix::WebSocketMessageType");
}
});
// Start the connection
_webSocket.start();
}
}
//
// We try to connect to different servers, and make sure there are no crashes.
// FIXME: We could do more checks (make sure that we were not able to connect to unknown servers, etc...)
//
TEST_CASE("websocket_connections", "[websocket]")
{
SECTION("Try to connect to invalid servers.")
{
IXWebSocketTestConnectionDisconnection chatA;
chatA.start(GOOGLE_URL);
ix::msleep(1000);
chatA.stop();
chatA.start(UNKNOWN_URL);
ix::msleep(1000);
chatA.stop();
}
SECTION("Try to connect and disconnect with different timing.")
{
IXWebSocketTestConnectionDisconnection chatA;
for (int i = 0; i < 50; ++i)
{
log(std::string("Run: ") + std::to_string(i));
chatA.start(WEBSOCKET_DOT_ORG_URL);
ix::msleep(i);
chatA.stop();
}
}
}

View File

@ -11,7 +11,8 @@
#include <iostream>
#include <sstream>
#include <queue>
#include <vector>
#include <mutex>
#include <ixwebsocket/IXWebSocket.h>
#include <ixwebsocket/IXWebSocketServer.h>
#include "msgpack11.hpp"
@ -29,7 +30,8 @@ namespace
{
public:
WebSocketChat(const std::string& user,
const std::string& session);
const std::string& session,
int port);
void subscribe(const std::string& channel);
void start();
@ -38,32 +40,49 @@ namespace
void sendMessage(const std::string& text);
size_t getReceivedMessagesCount() const;
const std::vector<std::string>& getReceivedMessages() const;
std::string encodeMessage(const std::string& text);
std::pair<std::string, std::string> decodeMessage(const std::string& str);
void appendMessage(const std::string& message);
private:
void log(const std::string& msg);
std::string _user;
std::string _session;
int _port;
ix::WebSocket _webSocket;
std::queue<std::string> _receivedQueue;
std::vector<std::string> _receivedMessages;
mutable std::mutex _mutex;
};
WebSocketChat::WebSocketChat(const std::string& user,
const std::string& session) :
const std::string& session,
int port) :
_user(user),
_session(session)
_session(session),
_port(port)
{
;
}
size_t WebSocketChat::getReceivedMessagesCount() const
{
return _receivedQueue.size();
std::lock_guard<std::mutex> lock(_mutex);
return _receivedMessages.size();
}
const std::vector<std::string>& WebSocketChat::getReceivedMessages() const
{
std::lock_guard<std::mutex> lock(_mutex);
return _receivedMessages;
}
void WebSocketChat::appendMessage(const std::string& message)
{
std::lock_guard<std::mutex> lock(_mutex);
_receivedMessages.push_back(message);
}
bool WebSocketChat::isReady() const
@ -76,14 +95,19 @@ namespace
_webSocket.stop();
}
void WebSocketChat::log(const std::string& msg)
{
std::cerr << msg << std::endl;
}
void WebSocketChat::start()
{
std::string url("ws://localhost:8090/");
std::string url;
{
std::stringstream ss;
ss << "ws://localhost:"
<< _port
<< "/"
<< _user;
url = ss.str();
}
_webSocket.setUrl(url);
std::stringstream ss;
@ -121,10 +145,16 @@ namespace
// as we do for the satori chat example.
// store text
_receivedQueue.push(result.second);
appendMessage(result.second);
ss << std::endl
<< result.first << " > " << result.second
std::string payload = result.second;
if (payload.size() > 2000)
{
payload = "<message too large>";
}
ss << std::endl
<< result.first << " > " << payload
<< std::endl
<< _user << " > ";
log(ss.str());
@ -188,17 +218,17 @@ namespace
{
if (messageType == ix::WebSocket_MessageType_Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
Logger() << "New connection";
Logger() << "Uri: " << openInfo.uri;
Logger() << "Headers:";
for (auto it : openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
Logger() << it.first << ": " << it.second;
}
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
std::cerr << "Closed connection" << std::endl;
log("Closed connection");
}
else if (messageType == ix::WebSocket_MessageType_Message)
{
@ -218,7 +248,7 @@ namespace
auto res = server.listen();
if (!res.first)
{
std::cerr << res.second << std::endl;
log(res.second);
return false;
}
@ -238,8 +268,8 @@ TEST_CASE("Websocket_chat", "[websocket_chat]")
REQUIRE(startServer(server));
std::string session = ix::generateSessionId();
WebSocketChat chatA("jean", session);
WebSocketChat chatB("paul", session);
WebSocketChat chatA("jean", session, port);
WebSocketChat chatB("paul", session, port);
chatA.start();
chatB.start();
@ -263,15 +293,36 @@ TEST_CASE("Websocket_chat", "[websocket_chat]")
chatB.sendMessage("from B1");
chatB.sendMessage("from B2");
// Give us 1s for all messages to be received
ix::msleep(1000);
// Test large messages that needs to be broken into small fragments
size_t size = 1 * 1024 * 1024; // ~1Mb
std::string bigMessage(size, 'a');
chatB.sendMessage(bigMessage);
log("Sent all messages");
// Wait until all messages are received. 10s timeout
int attempts = 0;
while (chatA.getReceivedMessagesCount() != 3 ||
chatB.getReceivedMessagesCount() != 3)
{
REQUIRE(attempts++ < 10);
ix::msleep(1000);
}
chatA.stop();
chatB.stop();
REQUIRE(chatA.getReceivedMessagesCount() == 2);
REQUIRE(chatA.getReceivedMessagesCount() == 3);
REQUIRE(chatB.getReceivedMessagesCount() == 3);
REQUIRE(chatB.getReceivedMessages()[0] == "from A1");
REQUIRE(chatB.getReceivedMessages()[1] == "from A2");
REQUIRE(chatB.getReceivedMessages()[2] == "from A3");
REQUIRE(chatA.getReceivedMessages()[0] == "from B1");
REQUIRE(chatA.getReceivedMessages()[1] == "from B2");
REQUIRE(chatA.getReceivedMessages()[2].size() == bigMessage.size());
// Give us 500ms for the server to notice that clients went away
ix::msleep(500);
REQUIRE(server.getClients().size() == 0);

82
test/run.py Normal file
View File

@ -0,0 +1,82 @@
import os
import platform
import shutil
osName = platform.system()
print('os name = {}'.format(osName))
root = os.path.dirname(os.path.realpath(__file__))
buildDir = os.path.join(root, 'build')
if not os.path.exists(buildDir):
os.mkdir(buildDir)
os.chdir(buildDir)
if osName == 'Windows':
generator = '-G"NMake Makefiles"'
make = 'nmake'
testBinary ='ixwebsocket_unittest.exe'
else:
generator = ''
make = 'make -j6'
testBinary ='./ixwebsocket_unittest'
sanitizersFlags = {
'asan': '-DSANITIZE_ADDRESS=On',
'ubsan': '-DSANITIZE_UNDEFINED=On',
'tsan': '-DSANITIZE_THREAD=On',
'none': ''
}
sanitizer = 'tsan'
if osName == 'Linux':
sanitizer = 'none'
sanitizerFlags = sanitizersFlags[sanitizer]
# if osName == 'Windows':
# os.environ['CC'] = 'clang-cl'
# os.environ['CXX'] = 'clang-cl'
cmakeCmd = 'cmake -DCMAKE_BUILD_TYPE=Debug {} {} ..'.format(generator, sanitizerFlags)
print(cmakeCmd)
ret = os.system(cmakeCmd)
assert ret == 0, 'CMake failed, exiting'
ret = os.system(make)
assert ret == 0, 'Make failed, exiting'
def findFiles(prefix):
'''Find all files under a given directory'''
paths = []
for root, _, files in os.walk(prefix):
for path in files:
fullPath = os.path.join(root, path)
if os.path.islink(fullPath):
continue
paths.append(fullPath)
return paths
#for path in findFiles('.'):
# print(path)
# We need to copy the zlib DLL in the current work directory
shutil.copy(os.path.join(
'..',
'..',
'third_party',
'ZLIB-Windows',
'zlib-1.2.11_deploy_v140',
'release_dynamic',
'x64',
'bin',
'zlib.dll'), '.')
testCommand = '{} {}'.format(testBinary, os.getenv('TEST', ''))
ret = os.system(testCommand)
assert ret == 0, 'Test command failed'

View File

@ -7,8 +7,14 @@
#define CATCH_CONFIG_RUNNER
#include "catch.hpp"
int main(int argc, char* argv[])
#include <ixwebsocket/IXSocket.h>
int main(int argc, char* argv[])
{
ix::Socket::init(); // for Windows
int result = Catch::Session().run(argc, argv);
ix::Socket::cleanup(); // for Windows
return result;
}

115
third_party/ZLIB-Windows/README.md vendored Normal file
View File

@ -0,0 +1,115 @@
ZLIB DATA COMPRESSION LIBRARY
zlib 1.2.11 is a general purpose data compression library. All the code is
thread safe. The data format used by the zlib library is described by RFCs
(Request for Comments) 1950 to 1952 in the files
http://tools.ietf.org/html/rfc1950 (zlib format), rfc1951 (deflate format) and
rfc1952 (gzip format).
All functions of the compression library are documented in the file zlib.h
(volunteer to write man pages welcome, contact zlib@gzip.org). A usage example
of the library is given in the file test/example.c which also tests that
the library is working correctly. Another example is given in the file
test/minigzip.c. The compression library itself is composed of all source
files in the root directory.
To compile all files and run the test program, follow the instructions given at
the top of Makefile.in. In short "./configure; make test", and if that goes
well, "make install" should work for most flavors of Unix. For Windows, use
one of the special makefiles in win32/ or contrib/vstudio/ . For VMS, use
make_vms.com.
Questions about zlib should be sent to <zlib@gzip.org>, or to Gilles Vollant
<info@winimage.com> for the Windows DLL version. The zlib home page is
http://zlib.net/ . Before reporting a problem, please check this site to
verify that you have the latest version of zlib; otherwise get the latest
version and check whether the problem still exists or not.
PLEASE read the zlib FAQ http://zlib.net/zlib_faq.html before asking for help.
Mark Nelson <markn@ieee.org> wrote an article about zlib for the Jan. 1997
issue of Dr. Dobb's Journal; a copy of the article is available at
http://marknelson.us/1997/01/01/zlib-engine/ .
The changes made in version 1.2.11 are documented in the file ChangeLog.
Unsupported third party contributions are provided in directory contrib/ .
zlib is available in Java using the java.util.zip package, documented at
http://java.sun.com/developer/technicalArticles/Programming/compression/ .
A Perl interface to zlib written by Paul Marquess <pmqs@cpan.org> is available
at CPAN (Comprehensive Perl Archive Network) sites, including
http://search.cpan.org/~pmqs/IO-Compress-Zlib/ .
A Python interface to zlib written by A.M. Kuchling <amk@amk.ca> is
available in Python 1.5 and later versions, see
http://docs.python.org/library/zlib.html .
zlib is built into tcl: http://wiki.tcl.tk/4610 .
An experimental package to read and write files in .zip format, written on top
of zlib by Gilles Vollant <info@winimage.com>, is available in the
contrib/minizip directory of zlib.
Notes for some targets:
- For Windows DLL versions, please see win32/DLL_FAQ.txt
- For 64-bit Irix, deflate.c must be compiled without any optimization. With
-O, one libpng test fails. The test works in 32 bit mode (with the -n32
compiler flag). The compiler bug has been reported to SGI.
- zlib doesn't work with gcc 2.6.3 on a DEC 3000/300LX under OSF/1 2.1 it works
when compiled with cc.
- On Digital Unix 4.0D (formely OSF/1) on AlphaServer, the cc option -std1 is
necessary to get gzprintf working correctly. This is done by configure.
- zlib doesn't work on HP-UX 9.05 with some versions of /bin/cc. It works with
other compilers. Use "make test" to check your compiler.
- gzdopen is not supported on RISCOS or BEOS.
- For PalmOs, see http://palmzlib.sourceforge.net/
Acknowledgments:
The deflate format used by zlib was defined by Phil Katz. The deflate and
zlib specifications were written by L. Peter Deutsch. Thanks to all the
people who reported problems and suggested various improvements in zlib; they
are too numerous to cite here.
Copyright notice:
(C) 1995-2017 Jean-loup Gailly and Mark Adler
This software is provided 'as-is', without any express or implied
warranty. In no event will the authors be held liable for any damages
arising from the use of this software.
Permission is granted to anyone to use this software for any purpose,
including commercial applications, and to alter it and redistribute it
freely, subject to the following restrictions:
1. The origin of this software must not be misrepresented; you must not
claim that you wrote the original software. If you use this software
in a product, an acknowledgment in the product documentation would be
appreciated but is not required.
2. Altered source versions must be plainly marked as such, and must not be
misrepresented as being the original software.
3. This notice may not be removed or altered from any source distribution.
Jean-loup Gailly Mark Adler
jloup@gzip.org madler@alumni.caltech.edu
If you use the zlib library in a product, we would appreciate *not* receiving
lengthy legal documents to sign. The sources are provided for free but without
warranty of any kind. The library has been entirely written by Jean-loup
Gailly and Mark Adler; it does not include third-party code.
If you redistribute modified sources, we would appreciate that you include in
the file ChangeLog history information documenting your changes. Please read
the FAQ for more information on the distribution of modified source versions.

View File

@ -0,0 +1,536 @@
/* zconf.h -- configuration of the zlib compression library
* Copyright (C) 1995-2016 Jean-loup Gailly, Mark Adler
* For conditions of distribution and use, see copyright notice in zlib.h
*/
/* @(#) $Id$ */
#ifndef ZCONF_H
#define ZCONF_H
/* #undef Z_PREFIX */
/* #undef Z_HAVE_UNISTD_H */
/*
* If you *really* need a unique prefix for all types and library functions,
* compile with -DZ_PREFIX. The "standard" zlib should be compiled without it.
* Even better than compiling with -DZ_PREFIX would be to use configure to set
* this permanently in zconf.h using "./configure --zprefix".
*/
#ifdef Z_PREFIX /* may be set to #if 1 by ./configure */
# define Z_PREFIX_SET
/* all linked symbols and init macros */
# define _dist_code z__dist_code
# define _length_code z__length_code
# define _tr_align z__tr_align
# define _tr_flush_bits z__tr_flush_bits
# define _tr_flush_block z__tr_flush_block
# define _tr_init z__tr_init
# define _tr_stored_block z__tr_stored_block
# define _tr_tally z__tr_tally
# define adler32 z_adler32
# define adler32_combine z_adler32_combine
# define adler32_combine64 z_adler32_combine64
# define adler32_z z_adler32_z
# ifndef Z_SOLO
# define compress z_compress
# define compress2 z_compress2
# define compressBound z_compressBound
# endif
# define crc32 z_crc32
# define crc32_combine z_crc32_combine
# define crc32_combine64 z_crc32_combine64
# define crc32_z z_crc32_z
# define deflate z_deflate
# define deflateBound z_deflateBound
# define deflateCopy z_deflateCopy
# define deflateEnd z_deflateEnd
# define deflateGetDictionary z_deflateGetDictionary
# define deflateInit z_deflateInit
# define deflateInit2 z_deflateInit2
# define deflateInit2_ z_deflateInit2_
# define deflateInit_ z_deflateInit_
# define deflateParams z_deflateParams
# define deflatePending z_deflatePending
# define deflatePrime z_deflatePrime
# define deflateReset z_deflateReset
# define deflateResetKeep z_deflateResetKeep
# define deflateSetDictionary z_deflateSetDictionary
# define deflateSetHeader z_deflateSetHeader
# define deflateTune z_deflateTune
# define deflate_copyright z_deflate_copyright
# define get_crc_table z_get_crc_table
# ifndef Z_SOLO
# define gz_error z_gz_error
# define gz_intmax z_gz_intmax
# define gz_strwinerror z_gz_strwinerror
# define gzbuffer z_gzbuffer
# define gzclearerr z_gzclearerr
# define gzclose z_gzclose
# define gzclose_r z_gzclose_r
# define gzclose_w z_gzclose_w
# define gzdirect z_gzdirect
# define gzdopen z_gzdopen
# define gzeof z_gzeof
# define gzerror z_gzerror
# define gzflush z_gzflush
# define gzfread z_gzfread
# define gzfwrite z_gzfwrite
# define gzgetc z_gzgetc
# define gzgetc_ z_gzgetc_
# define gzgets z_gzgets
# define gzoffset z_gzoffset
# define gzoffset64 z_gzoffset64
# define gzopen z_gzopen
# define gzopen64 z_gzopen64
# ifdef _WIN32
# define gzopen_w z_gzopen_w
# endif
# define gzprintf z_gzprintf
# define gzputc z_gzputc
# define gzputs z_gzputs
# define gzread z_gzread
# define gzrewind z_gzrewind
# define gzseek z_gzseek
# define gzseek64 z_gzseek64
# define gzsetparams z_gzsetparams
# define gztell z_gztell
# define gztell64 z_gztell64
# define gzungetc z_gzungetc
# define gzvprintf z_gzvprintf
# define gzwrite z_gzwrite
# endif
# define inflate z_inflate
# define inflateBack z_inflateBack
# define inflateBackEnd z_inflateBackEnd
# define inflateBackInit z_inflateBackInit
# define inflateBackInit_ z_inflateBackInit_
# define inflateCodesUsed z_inflateCodesUsed
# define inflateCopy z_inflateCopy
# define inflateEnd z_inflateEnd
# define inflateGetDictionary z_inflateGetDictionary
# define inflateGetHeader z_inflateGetHeader
# define inflateInit z_inflateInit
# define inflateInit2 z_inflateInit2
# define inflateInit2_ z_inflateInit2_
# define inflateInit_ z_inflateInit_
# define inflateMark z_inflateMark
# define inflatePrime z_inflatePrime
# define inflateReset z_inflateReset
# define inflateReset2 z_inflateReset2
# define inflateResetKeep z_inflateResetKeep
# define inflateSetDictionary z_inflateSetDictionary
# define inflateSync z_inflateSync
# define inflateSyncPoint z_inflateSyncPoint
# define inflateUndermine z_inflateUndermine
# define inflateValidate z_inflateValidate
# define inflate_copyright z_inflate_copyright
# define inflate_fast z_inflate_fast
# define inflate_table z_inflate_table
# ifndef Z_SOLO
# define uncompress z_uncompress
# define uncompress2 z_uncompress2
# endif
# define zError z_zError
# ifndef Z_SOLO
# define zcalloc z_zcalloc
# define zcfree z_zcfree
# endif
# define zlibCompileFlags z_zlibCompileFlags
# define zlibVersion z_zlibVersion
/* all zlib typedefs in zlib.h and zconf.h */
# define Byte z_Byte
# define Bytef z_Bytef
# define alloc_func z_alloc_func
# define charf z_charf
# define free_func z_free_func
# ifndef Z_SOLO
# define gzFile z_gzFile
# endif
# define gz_header z_gz_header
# define gz_headerp z_gz_headerp
# define in_func z_in_func
# define intf z_intf
# define out_func z_out_func
# define uInt z_uInt
# define uIntf z_uIntf
# define uLong z_uLong
# define uLongf z_uLongf
# define voidp z_voidp
# define voidpc z_voidpc
# define voidpf z_voidpf
/* all zlib structs in zlib.h and zconf.h */
# define gz_header_s z_gz_header_s
# define internal_state z_internal_state
#endif
#if defined(__MSDOS__) && !defined(MSDOS)
# define MSDOS
#endif
#if (defined(OS_2) || defined(__OS2__)) && !defined(OS2)
# define OS2
#endif
#if defined(_WINDOWS) && !defined(WINDOWS)
# define WINDOWS
#endif
#if defined(_WIN32) || defined(_WIN32_WCE) || defined(__WIN32__)
# ifndef WIN32
# define WIN32
# endif
#endif
#if (defined(MSDOS) || defined(OS2) || defined(WINDOWS)) && !defined(WIN32)
# if !defined(__GNUC__) && !defined(__FLAT__) && !defined(__386__)
# ifndef SYS16BIT
# define SYS16BIT
# endif
# endif
#endif
/*
* Compile with -DMAXSEG_64K if the alloc function cannot allocate more
* than 64k bytes at a time (needed on systems with 16-bit int).
*/
#ifdef SYS16BIT
# define MAXSEG_64K
#endif
#ifdef MSDOS
# define UNALIGNED_OK
#endif
#ifdef __STDC_VERSION__
# ifndef STDC
# define STDC
# endif
# if __STDC_VERSION__ >= 199901L
# ifndef STDC99
# define STDC99
# endif
# endif
#endif
#if !defined(STDC) && (defined(__STDC__) || defined(__cplusplus))
# define STDC
#endif
#if !defined(STDC) && (defined(__GNUC__) || defined(__BORLANDC__))
# define STDC
#endif
#if !defined(STDC) && (defined(MSDOS) || defined(WINDOWS) || defined(WIN32))
# define STDC
#endif
#if !defined(STDC) && (defined(OS2) || defined(__HOS_AIX__))
# define STDC
#endif
#if defined(__OS400__) && !defined(STDC) /* iSeries (formerly AS/400). */
# define STDC
#endif
#ifndef STDC
# ifndef const /* cannot use !defined(STDC) && !defined(const) on Mac */
# define const /* note: need a more gentle solution here */
# endif
#endif
#if defined(ZLIB_CONST) && !defined(z_const)
# define z_const const
#else
# define z_const
#endif
#ifdef Z_SOLO
typedef unsigned long z_size_t;
#else
# define z_longlong long long
# if defined(NO_SIZE_T)
typedef unsigned NO_SIZE_T z_size_t;
# elif defined(STDC)
# include <stddef.h>
typedef size_t z_size_t;
# else
typedef unsigned long z_size_t;
# endif
# undef z_longlong
#endif
/* Maximum value for memLevel in deflateInit2 */
#ifndef MAX_MEM_LEVEL
# ifdef MAXSEG_64K
# define MAX_MEM_LEVEL 8
# else
# define MAX_MEM_LEVEL 9
# endif
#endif
/* Maximum value for windowBits in deflateInit2 and inflateInit2.
* WARNING: reducing MAX_WBITS makes minigzip unable to extract .gz files
* created by gzip. (Files created by minigzip can still be extracted by
* gzip.)
*/
#ifndef MAX_WBITS
# define MAX_WBITS 15 /* 32K LZ77 window */
#endif
/* The memory requirements for deflate are (in bytes):
(1 << (windowBits+2)) + (1 << (memLevel+9))
that is: 128K for windowBits=15 + 128K for memLevel = 8 (default values)
plus a few kilobytes for small objects. For example, if you want to reduce
the default memory requirements from 256K to 128K, compile with
make CFLAGS="-O -DMAX_WBITS=14 -DMAX_MEM_LEVEL=7"
Of course this will generally degrade compression (there's no free lunch).
The memory requirements for inflate are (in bytes) 1 << windowBits
that is, 32K for windowBits=15 (default value) plus about 7 kilobytes
for small objects.
*/
/* Type declarations */
#ifndef OF /* function prototypes */
# ifdef STDC
# define OF(args) args
# else
# define OF(args) ()
# endif
#endif
#ifndef Z_ARG /* function prototypes for stdarg */
# if defined(STDC) || defined(Z_HAVE_STDARG_H)
# define Z_ARG(args) args
# else
# define Z_ARG(args) ()
# endif
#endif
/* The following definitions for FAR are needed only for MSDOS mixed
* model programming (small or medium model with some far allocations).
* This was tested only with MSC; for other MSDOS compilers you may have
* to define NO_MEMCPY in zutil.h. If you don't need the mixed model,
* just define FAR to be empty.
*/
#ifdef SYS16BIT
# if defined(M_I86SM) || defined(M_I86MM)
/* MSC small or medium model */
# define SMALL_MEDIUM
# ifdef _MSC_VER
# define FAR _far
# else
# define FAR far
# endif
# endif
# if (defined(__SMALL__) || defined(__MEDIUM__))
/* Turbo C small or medium model */
# define SMALL_MEDIUM
# ifdef __BORLANDC__
# define FAR _far
# else
# define FAR far
# endif
# endif
#endif
#if defined(WINDOWS) || defined(WIN32)
/* If building or using zlib as a DLL, define ZLIB_DLL.
* This is not mandatory, but it offers a little performance increase.
*/
# ifdef ZLIB_DLL
# if defined(WIN32) && (!defined(__BORLANDC__) || (__BORLANDC__ >= 0x500))
# ifdef ZLIB_INTERNAL
# define ZEXTERN extern __declspec(dllexport)
# else
# define ZEXTERN extern __declspec(dllimport)
# endif
# endif
# endif /* ZLIB_DLL */
/* If building or using zlib with the WINAPI/WINAPIV calling convention,
* define ZLIB_WINAPI.
* Caution: the standard ZLIB1.DLL is NOT compiled using ZLIB_WINAPI.
*/
# ifdef ZLIB_WINAPI
# ifdef FAR
# undef FAR
# endif
# include <windows.h>
/* No need for _export, use ZLIB.DEF instead. */
/* For complete Windows compatibility, use WINAPI, not __stdcall. */
# define ZEXPORT WINAPI
# ifdef WIN32
# define ZEXPORTVA WINAPIV
# else
# define ZEXPORTVA FAR CDECL
# endif
# endif
#endif
#if defined (__BEOS__)
# ifdef ZLIB_DLL
# ifdef ZLIB_INTERNAL
# define ZEXPORT __declspec(dllexport)
# define ZEXPORTVA __declspec(dllexport)
# else
# define ZEXPORT __declspec(dllimport)
# define ZEXPORTVA __declspec(dllimport)
# endif
# endif
#endif
#ifndef ZEXTERN
# define ZEXTERN extern
#endif
#ifndef ZEXPORT
# define ZEXPORT
#endif
#ifndef ZEXPORTVA
# define ZEXPORTVA
#endif
#ifndef FAR
# define FAR
#endif
#if !defined(__MACTYPES__)
typedef unsigned char Byte; /* 8 bits */
#endif
typedef unsigned int uInt; /* 16 bits or more */
typedef unsigned long uLong; /* 32 bits or more */
#ifdef SMALL_MEDIUM
/* Borland C/C++ and some old MSC versions ignore FAR inside typedef */
# define Bytef Byte FAR
#else
typedef Byte FAR Bytef;
#endif
typedef char FAR charf;
typedef int FAR intf;
typedef uInt FAR uIntf;
typedef uLong FAR uLongf;
#ifdef STDC
typedef void const *voidpc;
typedef void FAR *voidpf;
typedef void *voidp;
#else
typedef Byte const *voidpc;
typedef Byte FAR *voidpf;
typedef Byte *voidp;
#endif
#if !defined(Z_U4) && !defined(Z_SOLO) && defined(STDC)
# include <limits.h>
# if (UINT_MAX == 0xffffffffUL)
# define Z_U4 unsigned
# elif (ULONG_MAX == 0xffffffffUL)
# define Z_U4 unsigned long
# elif (USHRT_MAX == 0xffffffffUL)
# define Z_U4 unsigned short
# endif
#endif
#ifdef Z_U4
typedef Z_U4 z_crc_t;
#else
typedef unsigned long z_crc_t;
#endif
#ifdef HAVE_UNISTD_H /* may be set to #if 1 by ./configure */
# define Z_HAVE_UNISTD_H
#endif
#ifdef HAVE_STDARG_H /* may be set to #if 1 by ./configure */
# define Z_HAVE_STDARG_H
#endif
#ifdef STDC
# ifndef Z_SOLO
# include <sys/types.h> /* for off_t */
# endif
#endif
#if defined(STDC) || defined(Z_HAVE_STDARG_H)
# ifndef Z_SOLO
# include <stdarg.h> /* for va_list */
# endif
#endif
#ifdef _WIN32
# ifndef Z_SOLO
# include <stddef.h> /* for wchar_t */
# endif
#endif
/* a little trick to accommodate both "#define _LARGEFILE64_SOURCE" and
* "#define _LARGEFILE64_SOURCE 1" as requesting 64-bit operations, (even
* though the former does not conform to the LFS document), but considering
* both "#undef _LARGEFILE64_SOURCE" and "#define _LARGEFILE64_SOURCE 0" as
* equivalently requesting no 64-bit operations
*/
#if defined(_LARGEFILE64_SOURCE) && -_LARGEFILE64_SOURCE - -1 == 1
# undef _LARGEFILE64_SOURCE
#endif
#if defined(__WATCOMC__) && !defined(Z_HAVE_UNISTD_H)
# define Z_HAVE_UNISTD_H
#endif
#ifndef Z_SOLO
# if defined(Z_HAVE_UNISTD_H) || defined(_LARGEFILE64_SOURCE)
# include <unistd.h> /* for SEEK_*, off_t, and _LFS64_LARGEFILE */
# ifdef VMS
# include <unixio.h> /* for off_t */
# endif
# ifndef z_off_t
# define z_off_t off_t
# endif
# endif
#endif
#if defined(_LFS64_LARGEFILE) && _LFS64_LARGEFILE-0
# define Z_LFS64
#endif
#if defined(_LARGEFILE64_SOURCE) && defined(Z_LFS64)
# define Z_LARGE64
#endif
#if defined(_FILE_OFFSET_BITS) && _FILE_OFFSET_BITS-0 == 64 && defined(Z_LFS64)
# define Z_WANT64
#endif
#if !defined(SEEK_SET) && !defined(Z_SOLO)
# define SEEK_SET 0 /* Seek from beginning of file. */
# define SEEK_CUR 1 /* Seek from current position. */
# define SEEK_END 2 /* Set file pointer to EOF plus "offset" */
#endif
#ifndef z_off_t
# define z_off_t long
#endif
#if !defined(_WIN32) && defined(Z_LARGE64)
# define z_off64_t off64_t
#else
# if defined(_WIN32) && !defined(__GNUC__) && !defined(Z_SOLO)
# define z_off64_t __int64
# else
# define z_off64_t z_off_t
# endif
#endif
/* MVS linker does not support external names larger than 8 bytes */
#if defined(__MVS__)
#pragma map(deflateInit_,"DEIN")
#pragma map(deflateInit2_,"DEIN2")
#pragma map(deflateEnd,"DEEND")
#pragma map(deflateBound,"DEBND")
#pragma map(inflateInit_,"ININ")
#pragma map(inflateInit2_,"ININ2")
#pragma map(inflateEnd,"INEND")
#pragma map(inflateSync,"INSY")
#pragma map(inflateSetDictionary,"INSEDI")
#pragma map(compressBound,"CMBND")
#pragma map(inflate_table,"INTABL")
#pragma map(inflate_fast,"INFA")
#pragma map(inflate_copyright,"INCOPY")
#endif
#endif /* ZCONF_H */

File diff suppressed because it is too large Load Diff

Some files were not shown because too many files have changed in this diff Show More