Compare commits

...

5 Commits

Author SHA1 Message Date
b8c397e180 add ws_chat and ws_connect sub commands to ws 2019-02-22 20:49:26 -08:00
90105fa2b3 all CMakeLists are referenced by the top level one 2019-02-21 22:21:29 -08:00
24859fef8a add target for building with homebrew 2019-02-21 22:05:30 -08:00
73d7280723 Feature/ws cli (#15)
* New command line tool for transfering files / still very beta.

* add readme

* use cli11 for argument parsing

* json -> msgpack

* stop using base64 and use binary which can be stored in message pack
2019-02-21 21:24:53 -08:00
262de49c3c Update README.md
Add note about message fragmentation.
2019-02-21 14:08:27 -08:00
49 changed files with 5237 additions and 404 deletions

View File

@ -113,3 +113,5 @@ set( IXWEBSOCKET_INCLUDE_DIRS
. .
../../shared/OpenSSL/include) ../../shared/OpenSSL/include)
target_include_directories( ixwebsocket PUBLIC ${IXWEBSOCKET_INCLUDE_DIRS} ) target_include_directories( ixwebsocket PUBLIC ${IXWEBSOCKET_INCLUDE_DIRS} )
add_subdirectory(ws)

View File

@ -15,7 +15,7 @@ communication channels over a single TCP connection. *IXWebSocket* is a C++ libr
## Examples ## Examples
The examples folder countains a simple chat program, using a node.js broadcast server. The ws folder countains many interactive programs for chat and file transfers demonstrating client and server usage.
Here is what the client API looks like. Here is what the client API looks like.
@ -134,23 +134,16 @@ No manual polling to fetch data is required. Data is sent and received instantly
If the remote end (server) breaks the connection, the code will try to perpetually reconnect, by using an exponential backoff strategy, capped at one retry every 10 seconds. If the remote end (server) breaks the connection, the code will try to perpetually reconnect, by using an exponential backoff strategy, capped at one retry every 10 seconds.
### Large messages
Large frames are broken up into smaller chunks or messages to avoid filling up the os tcp buffers, which is permitted thanks to WebSocket [fragmentation](https://tools.ietf.org/html/rfc6455#section-5.4). Messages up to 500M were sent and received succesfully.
## Limitations ## Limitations
* Sending large messages are not supported yet (see feature/send_large_message). This is a bug and will be fixed.
* There is no text support for sending data, only the binary protocol is supported. Sending json or text over the binary protocol works well. * There is no text support for sending data, only the binary protocol is supported. Sending json or text over the binary protocol works well.
* Automatic reconnection works at the TCP socket level, and will detect remote end disconnects. However, if the device/computer network become unreachable (by turning off wifi), it is quite hard to reliably and timely detect it at the socket level using `recv` and `send` error codes. [Here](https://stackoverflow.com/questions/14782143/linux-socket-how-to-detect-disconnected-network-in-a-client-program) is a good discussion on the subject. This behavior is consistent with other runtimes such as node.js. One way to detect a disconnected device with low level C code is to do a name resolution with DNS but this can be expensive. Mobile devices have good and reliable API to do that. * Automatic reconnection works at the TCP socket level, and will detect remote end disconnects. However, if the device/computer network become unreachable (by turning off wifi), it is quite hard to reliably and timely detect it at the socket level using `recv` and `send` error codes. [Here](https://stackoverflow.com/questions/14782143/linux-socket-how-to-detect-disconnected-network-in-a-client-program) is a good discussion on the subject. This behavior is consistent with other runtimes such as node.js. One way to detect a disconnected device with low level C code is to do a name resolution with DNS but this can be expensive. Mobile devices have good and reliable API to do that.
* The server code is using select to detect incoming data, and creates one OS thread per connection. This isn't as scalable as strategies using epoll or kqueue. * The server code is using select to detect incoming data, and creates one OS thread per connection. This isn't as scalable as strategies using epoll or kqueue.
## Examples
1. Bring up a terminal and jump to the examples folder.
2. Compile the example C++ code. `sh build.sh`
3. Install node.js from [here](https://nodejs.org/en/download/).
4. Type `npm install` to install the node.js dependencies. Then `node broadcast-server.js` to run the server.
5. Bring up a second terminal. `./cmd_websocket_chat bob`
6. Bring up a third terminal. `./cmd_websocket_chat bill`
7. Start typing things in any of those terminals. Hopefully you should see your message being received on the other end.
## C++ code organization ## C++ code organization
Here's a simplistic diagram which explains how the code is structured in term of class/modules. Here's a simplistic diagram which explains how the code is structured in term of class/modules.

View File

@ -15,5 +15,8 @@ RUN apt-get -y install cmake
COPY . . COPY . .
WORKDIR test WORKDIR ws
RUN ["sh", "build_linux.sh"] RUN ["sh", "docker_build.sh"]
EXPOSE 8765
CMD ["/ws/ws", "transfer", "8765"]

7
examples/CMakeLists.txt Normal file
View File

@ -0,0 +1,7 @@
add_subdirectory(broadcast_server)
add_subdirectory(ping_pong)
add_subdirectory(chat)
add_subdirectory(echo_server)
add_subdirectory(ws_connect)
# add_subdirectory(cobra_publisher)

View File

@ -15,8 +15,6 @@ set (CMAKE_CXX_STANDARD 14)
option(USE_TLS "Add TLS support" ON) option(USE_TLS "Add TLS support" ON)
add_subdirectory(${PROJECT_SOURCE_DIR}/../.. ixwebsocket)
include_directories(broadcast_server .) include_directories(broadcast_server .)
add_executable(broadcast_server add_executable(broadcast_server

View File

@ -47,6 +47,7 @@ int main(int argc, char** argv)
} }
else if (messageType == ix::WebSocket_MessageType_Message) else if (messageType == ix::WebSocket_MessageType_Message)
{ {
std::cerr << "Received " << wireSize << " bytes" << std::endl;
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client != webSocket)

View File

@ -11,8 +11,6 @@ set (CMAKE_CXX_STANDARD 14)
option(USE_TLS "Add TLS support" ON) option(USE_TLS "Add TLS support" ON)
add_subdirectory(${PROJECT_SOURCE_DIR}/../.. ixwebsocket)
add_executable(cmd_websocket_chat cmd_websocket_chat.cpp) add_executable(cmd_websocket_chat cmd_websocket_chat.cpp)
if (APPLE AND USE_TLS) if (APPLE AND USE_TLS)

View File

@ -15,8 +15,6 @@ set (CMAKE_CXX_STANDARD 14)
option(USE_TLS "Add TLS support" ON) option(USE_TLS "Add TLS support" ON)
add_subdirectory(${PROJECT_SOURCE_DIR}/../.. ixwebsocket)
include_directories(cobra_publisher ${OPENSSL_PREFIX}/include) include_directories(cobra_publisher ${OPENSSL_PREFIX}/include)
include_directories(cobra_publisher .) include_directories(cobra_publisher .)

View File

@ -15,8 +15,6 @@ set (CMAKE_CXX_STANDARD 14)
option(USE_TLS "Add TLS support" ON) option(USE_TLS "Add TLS support" ON)
add_subdirectory(${PROJECT_SOURCE_DIR}/../.. ixwebsocket)
include_directories(echo_server .) include_directories(echo_server .)
add_executable(echo_server add_executable(echo_server

View File

@ -10,8 +10,6 @@ set (CMAKE_CXX_STANDARD 14)
option(USE_TLS "Add TLS support" ON) option(USE_TLS "Add TLS support" ON)
add_subdirectory(${PROJECT_SOURCE_DIR}/../.. ixwebsocket)
add_executable(ping_pong ping_pong.cpp) add_executable(ping_pong ping_pong.cpp)
if (APPLE AND USE_TLS) if (APPLE AND USE_TLS)

View File

@ -10,8 +10,6 @@ set (CMAKE_CXX_STANDARD 14)
option(USE_TLS "Add TLS support" ON) option(USE_TLS "Add TLS support" ON)
add_subdirectory(${PROJECT_SOURCE_DIR}/../.. ixwebsocket)
add_executable(ws_connect ws_connect.cpp) add_executable(ws_connect ws_connect.cpp)
if (APPLE AND USE_TLS) if (APPLE AND USE_TLS)

View File

@ -1,30 +0,0 @@
#
# Author: Benjamin Sergeant
# Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
#
cmake_minimum_required (VERSION 3.4.1)
project (ws_receive)
# There's -Weverything too for clang
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic -Wshorten-64-to-32")
set (CMAKE_CXX_STANDARD 14)
option(USE_TLS "Add TLS support" ON)
add_subdirectory(${PROJECT_SOURCE_DIR}/../.. ixwebsocket)
include_directories(ws_receive .)
add_executable(ws_receive
jsoncpp/jsoncpp.cpp
ixcrypto/IXBase64.cpp
ixcrypto/IXHash.cpp
ws_receive.cpp)
if (APPLE AND USE_TLS)
target_link_libraries(ws_receive "-framework foundation" "-framework security")
endif()
target_link_libraries(ws_receive ixwebsocket)

View File

@ -1 +0,0 @@
ws_receive is a simple server upload program. It needs to be used in conjonction with ws_send.

View File

@ -1 +0,0 @@
../cobra_publisher/ixcrypto

View File

@ -1,29 +0,0 @@
{
"requires": true,
"lockfileVersion": 1,
"dependencies": {
"async-limiter": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.0.tgz",
"integrity": "sha512-jp/uFnooOiO+L211eZOoSyzpOITMXx1rBITauYykG3BRYPu8h0UcxsPNB04RR5vo4Tyz3+ay17tR6JVf9qzYWg=="
},
"base-64": {
"version": "0.1.0",
"resolved": "https://registry.npmjs.org/base-64/-/base-64-0.1.0.tgz",
"integrity": "sha1-eAqZyE59YAJgNhURxId2E78k9rs="
},
"djb2": {
"version": "0.0.2",
"resolved": "https://registry.npmjs.org/djb2/-/djb2-0.0.2.tgz",
"integrity": "sha1-RAs4kao6uBQrVNRpsXe66p6W5O8="
},
"ws": {
"version": "6.1.4",
"resolved": "https://registry.npmjs.org/ws/-/ws-6.1.4.tgz",
"integrity": "sha512-eqZfL+NE/YQc1/ZynhojeV8q+H050oR8AZ2uIev7RU10svA9ZnJUddHcOUZTJLinZ9yEfdA2kSATS2qZK5fhJA==",
"requires": {
"async-limiter": "1.0.0"
}
}
}
}

View File

@ -1,153 +0,0 @@
/*
* ws_receive.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <sstream>
#include <fstream>
#include <ixwebsocket/IXWebSocketServer.h>
#include <jsoncpp/json/json.h>
#include <ixcrypto/IXBase64.h>
#include <ixcrypto/IXHash.h>
namespace
{
// We should cleanup the file name and full path further to remove .. as well
std::string extractFilename(const std::string& path)
{
std::string filename("filename.conf");
std::string::size_type idx;
idx = path.rfind('/');
if (idx != std::string::npos)
{
std::string filename = path.substr(idx+1);
return filename;
}
else
{
return std::string();
}
}
}
namespace ix
{
void errorHandler(const std::string& errMsg,
const std::string& id,
std::shared_ptr<ix::WebSocket> webSocket)
{
Json::Value pdu;
pdu["kind"] = "error";
pdu["id"] = id;
pdu["message"] = errMsg;
webSocket->send(pdu.toStyledString());
}
void messageHandler(const std::string& str,
std::shared_ptr<ix::WebSocket> webSocket)
{
std::cerr << "Received message: " << str.size() << std::endl;
Json::Value data;
Json::Reader reader;
if (!reader.parse(str, data))
{
errorHandler("Invalid JSON", std::string(), webSocket);
return;
}
std::cout << "id: " << data["id"].asString() << std::endl;
std::string content = ix::base64_decode(data["content"].asString());
std::cout << "Content size: " << content.size() << std::endl;
// Validate checksum
uint64_t cksum = ix::djb2Hash(data["content"].asString());
uint64_t cksumRef = data["djb2_hash"].asUInt64();
std::cout << "Computed hash: " << cksum << std::endl;
std::cout << "Reference hash: " << cksumRef << std::endl;
if (cksum != cksumRef)
{
errorHandler("Hash mismatch.", std::string(), webSocket);
return;
}
std::string filename = data["filename"].asString();
filename = extractFilename(filename);
std::ofstream out(filename);
out << content;
out.close();
Json::Value pdu;
pdu["ack"] = true;
pdu["id"] = data["id"];
pdu["filename"] = data["filename"];
webSocket->send(pdu.toStyledString());
}
}
int main(int argc, char** argv)
{
int port = 8080;
if (argc == 2)
{
std::stringstream ss;
ss << argv[1];
ss >> port;
}
ix::WebSocketServer server(port);
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket)
{
webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
if (messageType == ix::WebSocket_MessageType_Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
std::cerr << "Closed connection" << std::endl;
}
else if (messageType == ix::WebSocket_MessageType_Message)
{
messageHandler(str, webSocket);
}
}
);
}
);
auto res = server.listen();
if (!res.first)
{
std::cerr << res.second << std::endl;
return 1;
}
server.start();
server.wait();
return 0;
}

View File

@ -1,43 +0,0 @@
/*
* ws_receive.js
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
const WebSocket = require('ws')
const djb2 = require('djb2')
const fs = require('fs')
const wss = new WebSocket.Server({ port: 8080,
perMessageDeflate: false,
maxPayload: 1024 * 1024 * 1024 * 1024});
wss.on('connection', function connection(ws) {
ws.on('message', function incoming(data) {
console.log('Received message')
let str = data.toString()
let obj = JSON.parse(str)
console.log(obj.id)
console.log(obj.djb2_hash)
console.log(djb2(obj.content))
var content = Buffer.from(obj.content, 'base64')
// let bytes = base64.decode(obj.content)
let path = obj.filename
fs.writeFile(path, content, function(err) {
if (err) {
throw err
} else {
console.log('wrote data to disk')
}
});
let response = {
id: obj.id
}
ws.send(JSON.stringify(response))
});
});

View File

@ -1 +0,0 @@
ws_send is a simple upload program. It needs to be used in conjonction with ws_receive.

View File

@ -1 +0,0 @@
../cobra_publisher/ixcrypto

View File

@ -1 +0,0 @@
../cobra_publisher/jsoncpp

View File

@ -1,18 +1,21 @@
# #
# This makefile is just used to easily work with docker (linux build) # This makefile is just used to easily work with docker (linux build)
# #
all: run all: brew
brew:
mkdir -p build && (cd build ; cmake .. ; make)
.PHONY: docker .PHONY: docker
docker: docker:
docker build -t ws_connect:latest . docker build -t broadcast_server:latest .
run: docker run:
docker run --cap-add sys_ptrace -it ws_connect:latest bash docker run --cap-add sys_ptrace -it broadcast_server:latest bash
# this is helpful to remove trailing whitespaces # this is helpful to remove trailing whitespaces
trail: trail:
sh third_party/remote_trailing_whitespaces.sh sh third_party/remove_trailing_whitespaces.sh
build: build:
(cd examples/satori_publisher ; mkdir -p build ; cd build ; cmake .. ; make) (cd examples/satori_publisher ; mkdir -p build ; cd build ; cmake .. ; make)

View File

@ -18,13 +18,14 @@ add_subdirectory(${PROJECT_SOURCE_DIR}/.. ixwebsocket)
include_directories( include_directories(
${PROJECT_SOURCE_DIR}/Catch2/single_include ${PROJECT_SOURCE_DIR}/Catch2/single_include
../third_party/msgpack11
) )
# Shared sources # Shared sources
set (SOURCES set (SOURCES
test_runner.cpp test_runner.cpp
IXTest.cpp IXTest.cpp
msgpack11.cpp ../third_party/msgpack11/msgpack11.cpp
IXDNSLookupTest.cpp IXDNSLookupTest.cpp
IXSocketTest.cpp IXSocketTest.cpp

4641
third_party/cli11/CLI11.hpp vendored Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,2 @@
find . -type f -name '*.cpp' -exec sed -i '' 's/[[:space:]]*$//' {} \+
find . -type f -name '*.h' -exec sed -i '' 's/[[:space:]]*$//' {} \+

View File

@ -4,7 +4,7 @@
# #
cmake_minimum_required (VERSION 3.4.1) cmake_minimum_required (VERSION 3.4.1)
project (ws_send) project (ws)
# There's -Weverything too for clang # There's -Weverything too for clang
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic -Wshorten-64-to-32") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic -Wshorten-64-to-32")
@ -13,19 +13,26 @@ set (CMAKE_CXX_STANDARD 14)
option(USE_TLS "Add TLS support" ON) option(USE_TLS "Add TLS support" ON)
add_subdirectory(${PROJECT_SOURCE_DIR}/../.. ixwebsocket) include_directories(ws .)
include_directories(ws ..)
include_directories(ws ../third_party)
include_directories(ws_send .) add_executable(ws
../third_party/msgpack11/msgpack11.cpp
add_executable(ws_send
jsoncpp/jsoncpp.cpp
ixcrypto/IXBase64.cpp ixcrypto/IXBase64.cpp
ixcrypto/IXUuid.cpp
ixcrypto/IXHash.cpp ixcrypto/IXHash.cpp
ws_send.cpp) ixcrypto/IXUuid.cpp
ws_chat.cpp
ws_connect.cpp
ws_transfer.cpp
ws_send.cpp
ws_receive.cpp
ws.cpp)
if (APPLE AND USE_TLS) if (APPLE AND USE_TLS)
target_link_libraries(ws_send "-framework foundation" "-framework security") target_link_libraries(ws "-framework foundation" "-framework security")
endif() endif()
target_link_libraries(ws_send ixwebsocket) target_link_libraries(ws ixwebsocket)
install(TARGETS ws RUNTIME DESTINATION bin)

10
ws/README.md Normal file
View File

@ -0,0 +1,10 @@
```
# Start receiver first
./ws receive ws://localhost:8080
# Sender
./ws send ws://localhost:8080 /file/to/path
# Server
./ws transfer # running on port 8080.
```

41
ws/docker_build.sh Normal file
View File

@ -0,0 +1,41 @@
#!/bin/sh
#
# Author: Benjamin Sergeant
# Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
#
# 'manual' way of building. I cannot get CMake to work to build in a container.
g++ --std=c++14 \
-DIXWEBSOCKET_USE_TLS \
-g \
../ixwebsocket/IXEventFd.cpp \
../ixwebsocket/IXSocket.cpp \
../ixwebsocket/IXSocketServer.cpp \
../ixwebsocket/IXSocketConnect.cpp \
../ixwebsocket/IXDNSLookup.cpp \
../ixwebsocket/IXCancellationRequest.cpp \
../ixwebsocket/IXWebSocket.cpp \
../ixwebsocket/IXWebSocketServer.cpp \
../ixwebsocket/IXWebSocketTransport.cpp \
../ixwebsocket/IXWebSocketHandshake.cpp \
../ixwebsocket/IXWebSocketPerMessageDeflate.cpp \
../ixwebsocket/IXWebSocketPerMessageDeflateCodec.cpp \
../ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp \
../ixwebsocket/IXSocketOpenSSL.cpp \
../ixwebsocket/linux/IXSetThreadName_linux.cpp \
../third_party/jsoncpp/jsoncpp.cpp \
ixcrypto/IXBase64.cpp \
ixcrypto/IXHash.cpp \
ixcrypto/IXUuid.cpp \
ws_chat.cpp \
ws_connect.cpp \
ws_transfer.cpp \
ws_send.cpp \
ws_receive.cpp \
ws.cpp \
-I . \
-I .. \
-I ../third_party \
-o ws \
-lcrypto -lssl -lz -lpthread

View File

@ -4,15 +4,15 @@
* Copyright (c) 2018 Machine Zone. All rights reserved. * Copyright (c) 2018 Machine Zone. All rights reserved.
*/ */
#include <string> #include "IXHash.h"
namespace ix namespace ix
{ {
uint64_t djb2Hash(const std::string& data) uint64_t djb2Hash(const std::vector<uint8_t>& data)
{ {
uint64_t hashAddress = 5381; uint64_t hashAddress = 5381;
for (auto& c : data) for (auto&& c : data)
{ {
hashAddress = ((hashAddress << 5) + hashAddress) + c; hashAddress = ((hashAddress << 5) + hashAddress) + c;
} }

View File

@ -6,10 +6,10 @@
#pragma once #pragma once
#include <string> #include <vector>
namespace ix namespace ix
{ {
uint64_t djb2Hash(const std::string& data); uint64_t djb2Hash(const std::vector<uint8_t>& data);
} }

89
ws/ws.cpp Normal file
View File

@ -0,0 +1,89 @@
/*
* ws.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*/
//
// Main drive for websocket utilities
//
#include <string>
#include <sstream>
#include <iostream>
#include <cli11/CLI11.hpp>
namespace ix
{
int ws_chat_main(const std::string& url,
const std::string& user);
int ws_connect_main(const std::string& url);
int ws_receive_main(const std::string& url,
bool enablePerMessageDeflate);
int ws_transfer_main(int port);
int ws_send_main(const std::string& url,
const std::string& path);
}
int main(int argc, char** argv)
{
CLI::App app{"ws is a websocket tool"};
app.require_subcommand();
std::string url;
std::string path;
std::string user;
int port = 8080;
CLI::App* sendApp = app.add_subcommand("send", "Send a file");
sendApp->add_option("url", url, "Connection url")->required();
sendApp->add_option("path", path, "Path to the file to send")->required();
CLI::App* receiveApp = app.add_subcommand("receive", "Receive a file");
receiveApp->add_option("url", url, "Connection url")->required();
CLI::App* transferApp = app.add_subcommand("transfer", "Broadcasting server");
transferApp->add_option("--port", port, "Connection url");
CLI::App* connectApp = app.add_subcommand("connect", "Connect to a remote server");
connectApp->add_option("url", url, "Connection url")->required();
CLI::App* chatApp = app.add_subcommand("chat", "Group chat");
chatApp->add_option("url", url, "Connection url")->required();
chatApp->add_option("user", user, "User name")->required();
CLI11_PARSE(app, argc, argv);
if (app.got_subcommand("transfer"))
{
return ix::ws_transfer_main(port);
}
else if (app.got_subcommand("send"))
{
return ix::ws_send_main(url, path);
}
else if (app.got_subcommand("receive"))
{
bool enablePerMessageDeflate = false;
return ix::ws_receive_main(url, enablePerMessageDeflate);
}
else if (app.got_subcommand("connect"))
{
return ix::ws_connect_main(url);
}
else if (app.got_subcommand("chat"))
{
return ix::ws_chat_main(url, user);
}
else
{
assert(false);
}
return 1;
}

View File

@ -1,7 +1,7 @@
/* /*
* cmd_websocket_chat.cpp * ws_chat.cpp
* Author: Benjamin Sergeant * Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved. * Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/ */
// //
@ -20,19 +20,13 @@
// for convenience // for convenience
using json = nlohmann::json; using json = nlohmann::json;
using namespace ix; namespace ix
namespace
{ {
void log(const std::string& msg)
{
std::cout << msg << std::endl;
}
class WebSocketChat class WebSocketChat
{ {
public: public:
WebSocketChat(const std::string& user); WebSocketChat(const std::string& url,
const std::string& user);
void subscribe(const std::string& channel); void subscribe(const std::string& channel);
void start(); void start();
@ -46,19 +40,27 @@ namespace
std::pair<std::string, std::string> decodeMessage(const std::string& str); std::pair<std::string, std::string> decodeMessage(const std::string& str);
private: private:
std::string _url;
std::string _user; std::string _user;
ix::WebSocket _webSocket; ix::WebSocket _webSocket;
std::queue<std::string> _receivedQueue; std::queue<std::string> _receivedQueue;
void log(const std::string& msg);
}; };
WebSocketChat::WebSocketChat(const std::string& user) : WebSocketChat::WebSocketChat(const std::string& url,
const std::string& user) :
_url(url),
_user(user) _user(user)
{ {
; ;
} }
void WebSocketChat::log(const std::string& msg)
{
std::cout << msg << std::endl;
}
size_t WebSocketChat::getReceivedMessagesCount() const size_t WebSocketChat::getReceivedMessagesCount() const
{ {
return _receivedQueue.size(); return _receivedQueue.size();
@ -76,11 +78,10 @@ namespace
void WebSocketChat::start() void WebSocketChat::start()
{ {
std::string url("ws://localhost:8080/"); _webSocket.setUrl(_url);
_webSocket.setUrl(url);
std::stringstream ss; std::stringstream ss;
log(std::string("Connecting to url: ") + url); log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback( _webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, [this](ix::WebSocketMessageType messageType,
@ -164,10 +165,11 @@ namespace
_webSocket.send(encodeMessage(text)); _webSocket.send(encodeMessage(text));
} }
void interactiveMain(const std::string& user) void interactiveMain(const std::string& url,
const std::string& user)
{ {
std::cout << "Type Ctrl-D to exit prompt..." << std::endl; std::cout << "Type Ctrl-D to exit prompt..." << std::endl;
WebSocketChat webSocketChat(user); WebSocketChat webSocketChat(url, user);
webSocketChat.start(); webSocketChat.start();
while (true) while (true)
@ -187,17 +189,13 @@ namespace
std::cout << std::endl; std::cout << std::endl;
webSocketChat.stop(); webSocketChat.stop();
} }
}
int main(int argc, char** argv) int ws_chat_main(const std::string& url,
{ const std::string& user)
std::string user("user");
if (argc == 2)
{ {
user = argv[1]; Socket::init();
interactiveMain(url, user);
return 0;
} }
Socket::init();
interactiveMain(user);
return 0;
} }

View File

@ -9,15 +9,8 @@
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
using namespace ix; namespace ix
namespace
{ {
void log(const std::string& msg)
{
std::cout << msg << std::endl;
}
class WebSocketConnect class WebSocketConnect
{ {
public: public:
@ -32,6 +25,8 @@ namespace
private: private:
std::string _url; std::string _url;
ix::WebSocket _webSocket; ix::WebSocket _webSocket;
void log(const std::string& msg);
}; };
WebSocketConnect::WebSocketConnect(const std::string& url) : WebSocketConnect::WebSocketConnect(const std::string& url) :
@ -40,6 +35,11 @@ namespace
; ;
} }
void WebSocketConnect::log(const std::string& msg)
{
std::cout << msg << std::endl;
}
void WebSocketConnect::stop() void WebSocketConnect::stop()
{ {
_webSocket.stop(); _webSocket.stop();
@ -148,18 +148,12 @@ namespace
std::cout << std::endl; std::cout << std::endl;
webSocketChat.stop(); webSocketChat.stop();
} }
}
int main(int argc, char** argv) int ws_connect_main(const std::string& url)
{
if (argc != 2)
{ {
std::cerr << "Usage: ws_connect <url>" << std::endl; Socket::init();
return 1; interactiveMain(url);
return 0;
} }
std::string url = argv[1];
Socket::init();
interactiveMain(url);
return 0;
} }

251
ws/ws_receive.cpp Normal file
View File

@ -0,0 +1,251 @@
/*
* ws_receiver.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <fstream>
#include <sstream>
#include <vector>
#include <condition_variable>
#include <mutex>
#include <chrono>
#include <ixwebsocket/IXWebSocket.h>
#include <ixwebsocket/IXSocket.h>
#include <ixcrypto/IXUuid.h>
#include <ixcrypto/IXBase64.h>
#include <ixcrypto/IXHash.h>
#include <msgpack11/msgpack11.hpp>
using msgpack11::MsgPack;
namespace ix
{
class WebSocketReceiver
{
public:
WebSocketReceiver(const std::string& _url,
bool enablePerMessageDeflate);
void subscribe(const std::string& channel);
void start();
void stop();
void waitForConnection();
void waitForMessage();
void handleMessage(const std::string& str);
private:
std::string _url;
std::string _id;
ix::WebSocket _webSocket;
bool _enablePerMessageDeflate;
std::mutex _conditionVariableMutex;
std::condition_variable _condition;
std::string extractFilename(const std::string& path);
void handleError(const std::string& errMsg, const std::string& id);
void log(const std::string& msg);
};
WebSocketReceiver::WebSocketReceiver(const std::string& url,
bool enablePerMessageDeflate) :
_url(url),
_enablePerMessageDeflate(enablePerMessageDeflate)
{
;
}
void WebSocketReceiver::stop()
{
_webSocket.stop();
}
void WebSocketReceiver::log(const std::string& msg)
{
std::cout << msg << std::endl;
}
void WebSocketReceiver::waitForConnection()
{
std::cout << "Connecting..." << std::endl;
std::unique_lock<std::mutex> lock(_conditionVariableMutex);
_condition.wait(lock);
}
void WebSocketReceiver::waitForMessage()
{
std::cout << "Waiting for message..." << std::endl;
std::unique_lock<std::mutex> lock(_conditionVariableMutex);
_condition.wait(lock);
}
// We should cleanup the file name and full path further to remove .. as well
std::string WebSocketReceiver::extractFilename(const std::string& path)
{
std::string::size_type idx;
idx = path.rfind('/');
if (idx != std::string::npos)
{
std::string filename = path.substr(idx+1);
return filename;
}
else
{
return path;
}
}
void WebSocketReceiver::handleError(const std::string& errMsg,
const std::string& id)
{
std::map<MsgPack, MsgPack> pdu;
pdu["kind"] = "error";
pdu["id"] = id;
pdu["message"] = errMsg;
MsgPack msg(pdu);
_webSocket.send(msg.dump());
}
void WebSocketReceiver::handleMessage(const std::string& str)
{
std::cerr << "Received message: " << str.size() << std::endl;
std::string errMsg;
MsgPack data = MsgPack::parse(str, errMsg);
if (!errMsg.empty())
{
handleError("Invalid MsgPack", std::string());
return;
}
std::cout << "id: " << data["id"].string_value() << std::endl;
std::vector<uint8_t> content = data["content"].binary_items();
std::cout << "Content size: " << content.size() << std::endl;
// Validate checksum
uint64_t cksum = ix::djb2Hash(content);
auto cksumRef = data["djb2_hash"].string_value();
std::cout << "Computed hash: " << cksum << std::endl;
std::cout << "Reference hash: " << cksumRef << std::endl;
if (std::to_string(cksum) != cksumRef)
{
handleError("Hash mismatch.", std::string());
return;
}
std::string filename = data["filename"].string_value();
filename = extractFilename(filename);
std::cout << "Writing to disk: " << filename << std::endl;
std::ofstream out(filename);
out.write((char*)&content.front(), content.size());
out.close();
std::map<MsgPack, MsgPack> pdu;
pdu["ack"] = true;
pdu["id"] = data["id"];
pdu["filename"] = data["filename"];
MsgPack msg(pdu);
_webSocket.send(msg.dump());
}
void WebSocketReceiver::start()
{
_webSocket.setUrl(_url);
ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(
_enablePerMessageDeflate, false, false, 15, 15);
_webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions);
std::stringstream ss;
log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
std::stringstream ss;
if (messageType == ix::WebSocket_MessageType_Open)
{
_condition.notify_one();
log("ws_receive: connected");
std::cout << "Uri: " << openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
ss << "ws_receive: connection closed:";
ss << " code " << closeInfo.code;
ss << " reason " << closeInfo.reason << std::endl;
log(ss.str());
}
else if (messageType == ix::WebSocket_MessageType_Message)
{
ss << "ws_receive: transfered " << wireSize << " bytes";
log(ss.str());
handleMessage(str);
_condition.notify_one();
}
else if (messageType == ix::WebSocket_MessageType_Error)
{
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
log(ss.str());
}
else
{
ss << "Invalid ix::WebSocketMessageType";
log(ss.str());
}
});
_webSocket.start();
}
void wsReceive(const std::string& url,
bool enablePerMessageDeflate)
{
WebSocketReceiver webSocketReceiver(url, enablePerMessageDeflate);
webSocketReceiver.start();
webSocketReceiver.waitForConnection();
webSocketReceiver.waitForMessage();
std::chrono::duration<double, std::milli> duration(1000);
std::this_thread::sleep_for(duration);
std::cout << "Done !" << std::endl;
webSocketReceiver.stop();
}
int ws_receive_main(const std::string& url,
bool enablePerMessageDeflate)
{
Socket::init();
wsReceive(url, enablePerMessageDeflate);
return 0;
}
}

View File

@ -16,17 +16,12 @@
#include <ixcrypto/IXUuid.h> #include <ixcrypto/IXUuid.h>
#include <ixcrypto/IXBase64.h> #include <ixcrypto/IXBase64.h>
#include <ixcrypto/IXHash.h> #include <ixcrypto/IXHash.h>
#include <jsoncpp/json/json.h> #include <msgpack11/msgpack11.hpp>
using namespace ix; using msgpack11::MsgPack;
namespace namespace ix
{ {
void log(const std::string& msg)
{
std::cout << msg << std::endl;
}
class WebSocketSender class WebSocketSender
{ {
public: public:
@ -50,6 +45,8 @@ namespace
std::mutex _conditionVariableMutex; std::mutex _conditionVariableMutex;
std::condition_variable _condition; std::condition_variable _condition;
void log(const std::string& msg);
}; };
WebSocketSender::WebSocketSender(const std::string& url, WebSocketSender::WebSocketSender(const std::string& url,
@ -65,6 +62,11 @@ namespace
_webSocket.stop(); _webSocket.stop();
} }
void WebSocketSender::log(const std::string& msg)
{
std::cout << msg << std::endl;
}
void WebSocketSender::waitForConnection() void WebSocketSender::waitForConnection()
{ {
std::cout << "Connecting..." << std::endl; std::cout << "Connecting..." << std::endl;
@ -81,22 +83,21 @@ namespace
_condition.wait(lock); _condition.wait(lock);
} }
std::string load(const std::string& path) std::vector<uint8_t> load(const std::string& path)
{ {
// std::vector<uint8_t> memblock; std::vector<uint8_t> memblock;
std::string str;
std::ifstream file(path); std::ifstream file(path);
if (!file.is_open()) return std::string(); if (!file.is_open()) return memblock;
file.seekg(0, file.end); file.seekg(0, file.end);
std::streamoff size = file.tellg(); std::streamoff size = file.tellg();
file.seekg(0, file.beg); file.seekg(0, file.beg);
str.resize(size); memblock.resize(size);
file.read((char*)&str.front(), static_cast<std::streamsize>(size)); file.read((char*)&memblock.front(), static_cast<std::streamsize>(size));
return str; return memblock;
} }
void WebSocketSender::start() void WebSocketSender::start()
@ -142,19 +143,18 @@ namespace
{ {
_condition.notify_one(); _condition.notify_one();
ss << "ws_send: received message: " ss << "ws_send: received message (" << wireSize << " bytes)";
<< str;
log(ss.str()); log(ss.str());
Json::Value data; std::string errMsg;
Json::Reader reader; MsgPack data = MsgPack::parse(str, errMsg);
if (!reader.parse(str, data)) if (!errMsg.empty())
{ {
std::cerr << "Invalid JSON response" << std::endl; std::cerr << "Invalid MsgPack response" << std::endl;
return; return;
} }
std::string id = data["id"].asString(); std::string id = data["id"].string_value();
if (_id != id) if (_id != id)
{ {
std::cerr << "Invalid id" << std::endl; std::cerr << "Invalid id" << std::endl;
@ -224,7 +224,7 @@ namespace
void WebSocketSender::sendMessage(const std::string& filename, void WebSocketSender::sendMessage(const std::string& filename,
bool throttle) bool throttle)
{ {
std::string content; std::vector<uint8_t> content;
{ {
Bench bench("load file from disk"); Bench bench("load file from disk");
content = load(filename); content = load(filename);
@ -232,21 +232,18 @@ namespace
_id = uuid4(); _id = uuid4();
std::string b64Content; std::map<MsgPack, MsgPack> pdu;
{
Bench bench("base 64 encode file");
b64Content = base64_encode(content, content.size());
}
Json::Value pdu;
pdu["kind"] = "send"; pdu["kind"] = "send";
pdu["id"] = _id; pdu["id"] = _id;
pdu["content"] = b64Content; pdu["content"] = content;
pdu["djb2_hash"] = djb2Hash(b64Content); auto hash = djb2Hash(content);
pdu["djb2_hash"] = std::to_string(hash);
pdu["filename"] = filename; pdu["filename"] = filename;
MsgPack msg(pdu);
Bench bench("Sending file through websocket"); Bench bench("Sending file through websocket");
_webSocket.send(pdu.toStyledString(), _webSocket.send(msg.dump(),
[throttle](int current, int total) -> bool [throttle](int current, int total) -> bool
{ {
std::cout << "Step " << current << " out of " << total << std::endl; std::cout << "Step " << current << " out of " << total << std::endl;
@ -262,7 +259,7 @@ namespace
bench.report(); bench.report();
auto duration = bench.getDuration(); auto duration = bench.getDuration();
auto transferRate = 1000 * b64Content.size() / duration; auto transferRate = 1000 * content.size() / duration;
transferRate /= (1024 * 1024); transferRate /= (1024 * 1024);
std::cout << "Send transfer rate: " << transferRate << "MB/s" << std::endl; std::cout << "Send transfer rate: " << transferRate << "MB/s" << std::endl;
} }
@ -285,22 +282,15 @@ namespace
std::cout << "Done !" << std::endl; std::cout << "Done !" << std::endl;
webSocketSender.stop(); webSocketSender.stop();
} }
}
int main(int argc, char** argv) int ws_send_main(const std::string& url,
{ const std::string& path)
if (argc != 3)
{ {
std::cerr << "Usage: ws_send <url> <path>" << std::endl; bool throttle = false;
return 1; bool enablePerMessageDeflate = false;
Socket::init();
wsSend(url, path, enablePerMessageDeflate, throttle);
return 0;
} }
std::string url = argv[1];
std::string path = argv[2];
bool throttle = false;
bool enablePerMessageDeflate = false;
Socket::init();
wsSend(url, path, enablePerMessageDeflate, throttle);
return 0;
} }

72
ws/ws_transfer.cpp Normal file
View File

@ -0,0 +1,72 @@
/*
* ws_transfer.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <sstream>
#include <ixwebsocket/IXWebSocketServer.h>
namespace ix
{
int ws_transfer_main(int port)
{
std::cout << "Listening on port " << port << std::endl;
ix::WebSocketServer server(port);
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket)
{
webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
if (messageType == ix::WebSocket_MessageType_Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
std::cerr << "Closed connection" << std::endl;
}
else if (messageType == ix::WebSocket_MessageType_Message)
{
std::cerr << "Received " << wireSize << " bytes" << std::endl;
for (auto&& client : server.getClients())
{
if (client != webSocket)
{
client->send(str);
}
}
}
}
);
}
);
auto res = server.listen();
if (!res.first)
{
std::cerr << res.second << std::endl;
return 1;
}
server.start();
server.wait();
return 0;
}
}