Compare commits

...

2 Commits

Author SHA1 Message Date
73d7280723 Feature/ws cli (#15)
* New command line tool for transfering files / still very beta.

* add readme

* use cli11 for argument parsing

* json -> msgpack

* stop using base64 and use binary which can be stored in message pack
2019-02-21 21:24:53 -08:00
262de49c3c Update README.md
Add note about message fragmentation.
2019-02-21 14:08:27 -08:00
38 changed files with 5157 additions and 331 deletions

View File

@ -134,9 +134,12 @@ No manual polling to fetch data is required. Data is sent and received instantly
If the remote end (server) breaks the connection, the code will try to perpetually reconnect, by using an exponential backoff strategy, capped at one retry every 10 seconds.
### Large messages
Large frames are broken up into smaller chunks or messages to avoid filling up the os tcp buffers, which is permitted thanks to WebSocket [fragmentation](https://tools.ietf.org/html/rfc6455#section-5.4). Messages up to 500M were sent and received succesfully.
## Limitations
* Sending large messages are not supported yet (see feature/send_large_message). This is a bug and will be fixed.
* There is no text support for sending data, only the binary protocol is supported. Sending json or text over the binary protocol works well.
* Automatic reconnection works at the TCP socket level, and will detect remote end disconnects. However, if the device/computer network become unreachable (by turning off wifi), it is quite hard to reliably and timely detect it at the socket level using `recv` and `send` error codes. [Here](https://stackoverflow.com/questions/14782143/linux-socket-how-to-detect-disconnected-network-in-a-client-program) is a good discussion on the subject. This behavior is consistent with other runtimes such as node.js. One way to detect a disconnected device with low level C code is to do a name resolution with DNS but this can be expensive. Mobile devices have good and reliable API to do that.
* The server code is using select to detect incoming data, and creates one OS thread per connection. This isn't as scalable as strategies using epoll or kqueue.

View File

@ -15,5 +15,8 @@ RUN apt-get -y install cmake
COPY . .
WORKDIR test
RUN ["sh", "build_linux.sh"]
WORKDIR ws
RUN ["sh", "docker_build.sh"]
EXPOSE 8765
CMD ["/ws/ws", "transfer", "8765"]

View File

@ -47,6 +47,7 @@ int main(int argc, char** argv)
}
else if (messageType == ix::WebSocket_MessageType_Message)
{
std::cerr << "Received " << wireSize << " bytes" << std::endl;
for (auto&& client : server.getClients())
{
if (client != webSocket)

View File

@ -1,30 +0,0 @@
#
# Author: Benjamin Sergeant
# Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
#
cmake_minimum_required (VERSION 3.4.1)
project (ws_receive)
# There's -Weverything too for clang
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic -Wshorten-64-to-32")
set (CMAKE_CXX_STANDARD 14)
option(USE_TLS "Add TLS support" ON)
add_subdirectory(${PROJECT_SOURCE_DIR}/../.. ixwebsocket)
include_directories(ws_receive .)
add_executable(ws_receive
jsoncpp/jsoncpp.cpp
ixcrypto/IXBase64.cpp
ixcrypto/IXHash.cpp
ws_receive.cpp)
if (APPLE AND USE_TLS)
target_link_libraries(ws_receive "-framework foundation" "-framework security")
endif()
target_link_libraries(ws_receive ixwebsocket)

View File

@ -1 +0,0 @@
ws_receive is a simple server upload program. It needs to be used in conjonction with ws_send.

View File

@ -1 +0,0 @@
../cobra_publisher/ixcrypto

View File

@ -1,29 +0,0 @@
{
"requires": true,
"lockfileVersion": 1,
"dependencies": {
"async-limiter": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.0.tgz",
"integrity": "sha512-jp/uFnooOiO+L211eZOoSyzpOITMXx1rBITauYykG3BRYPu8h0UcxsPNB04RR5vo4Tyz3+ay17tR6JVf9qzYWg=="
},
"base-64": {
"version": "0.1.0",
"resolved": "https://registry.npmjs.org/base-64/-/base-64-0.1.0.tgz",
"integrity": "sha1-eAqZyE59YAJgNhURxId2E78k9rs="
},
"djb2": {
"version": "0.0.2",
"resolved": "https://registry.npmjs.org/djb2/-/djb2-0.0.2.tgz",
"integrity": "sha1-RAs4kao6uBQrVNRpsXe66p6W5O8="
},
"ws": {
"version": "6.1.4",
"resolved": "https://registry.npmjs.org/ws/-/ws-6.1.4.tgz",
"integrity": "sha512-eqZfL+NE/YQc1/ZynhojeV8q+H050oR8AZ2uIev7RU10svA9ZnJUddHcOUZTJLinZ9yEfdA2kSATS2qZK5fhJA==",
"requires": {
"async-limiter": "1.0.0"
}
}
}
}

View File

@ -1,153 +0,0 @@
/*
* ws_receive.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <sstream>
#include <fstream>
#include <ixwebsocket/IXWebSocketServer.h>
#include <jsoncpp/json/json.h>
#include <ixcrypto/IXBase64.h>
#include <ixcrypto/IXHash.h>
namespace
{
// We should cleanup the file name and full path further to remove .. as well
std::string extractFilename(const std::string& path)
{
std::string filename("filename.conf");
std::string::size_type idx;
idx = path.rfind('/');
if (idx != std::string::npos)
{
std::string filename = path.substr(idx+1);
return filename;
}
else
{
return std::string();
}
}
}
namespace ix
{
void errorHandler(const std::string& errMsg,
const std::string& id,
std::shared_ptr<ix::WebSocket> webSocket)
{
Json::Value pdu;
pdu["kind"] = "error";
pdu["id"] = id;
pdu["message"] = errMsg;
webSocket->send(pdu.toStyledString());
}
void messageHandler(const std::string& str,
std::shared_ptr<ix::WebSocket> webSocket)
{
std::cerr << "Received message: " << str.size() << std::endl;
Json::Value data;
Json::Reader reader;
if (!reader.parse(str, data))
{
errorHandler("Invalid JSON", std::string(), webSocket);
return;
}
std::cout << "id: " << data["id"].asString() << std::endl;
std::string content = ix::base64_decode(data["content"].asString());
std::cout << "Content size: " << content.size() << std::endl;
// Validate checksum
uint64_t cksum = ix::djb2Hash(data["content"].asString());
uint64_t cksumRef = data["djb2_hash"].asUInt64();
std::cout << "Computed hash: " << cksum << std::endl;
std::cout << "Reference hash: " << cksumRef << std::endl;
if (cksum != cksumRef)
{
errorHandler("Hash mismatch.", std::string(), webSocket);
return;
}
std::string filename = data["filename"].asString();
filename = extractFilename(filename);
std::ofstream out(filename);
out << content;
out.close();
Json::Value pdu;
pdu["ack"] = true;
pdu["id"] = data["id"];
pdu["filename"] = data["filename"];
webSocket->send(pdu.toStyledString());
}
}
int main(int argc, char** argv)
{
int port = 8080;
if (argc == 2)
{
std::stringstream ss;
ss << argv[1];
ss >> port;
}
ix::WebSocketServer server(port);
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket)
{
webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
if (messageType == ix::WebSocket_MessageType_Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
std::cerr << "Closed connection" << std::endl;
}
else if (messageType == ix::WebSocket_MessageType_Message)
{
messageHandler(str, webSocket);
}
}
);
}
);
auto res = server.listen();
if (!res.first)
{
std::cerr << res.second << std::endl;
return 1;
}
server.start();
server.wait();
return 0;
}

View File

@ -1,43 +0,0 @@
/*
* ws_receive.js
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone, Inc. All rights reserved.
*/
const WebSocket = require('ws')
const djb2 = require('djb2')
const fs = require('fs')
const wss = new WebSocket.Server({ port: 8080,
perMessageDeflate: false,
maxPayload: 1024 * 1024 * 1024 * 1024});
wss.on('connection', function connection(ws) {
ws.on('message', function incoming(data) {
console.log('Received message')
let str = data.toString()
let obj = JSON.parse(str)
console.log(obj.id)
console.log(obj.djb2_hash)
console.log(djb2(obj.content))
var content = Buffer.from(obj.content, 'base64')
// let bytes = base64.decode(obj.content)
let path = obj.filename
fs.writeFile(path, content, function(err) {
if (err) {
throw err
} else {
console.log('wrote data to disk')
}
});
let response = {
id: obj.id
}
ws.send(JSON.stringify(response))
});
});

View File

@ -1 +0,0 @@
ws_send is a simple upload program. It needs to be used in conjonction with ws_receive.

View File

@ -1 +0,0 @@
../cobra_publisher/ixcrypto

View File

@ -1 +0,0 @@
../cobra_publisher/jsoncpp

View File

@ -5,14 +5,14 @@ all: run
.PHONY: docker
docker:
docker build -t ws_connect:latest .
docker build -t broadcast_server:latest .
run: docker
docker run --cap-add sys_ptrace -it ws_connect:latest bash
run:
docker run --cap-add sys_ptrace -it broadcast_server:latest bash
# this is helpful to remove trailing whitespaces
trail:
sh third_party/remote_trailing_whitespaces.sh
sh third_party/remove_trailing_whitespaces.sh
build:
(cd examples/satori_publisher ; mkdir -p build ; cd build ; cmake .. ; make)

View File

@ -18,13 +18,14 @@ add_subdirectory(${PROJECT_SOURCE_DIR}/.. ixwebsocket)
include_directories(
${PROJECT_SOURCE_DIR}/Catch2/single_include
../third_party/msgpack11
)
# Shared sources
set (SOURCES
test_runner.cpp
IXTest.cpp
msgpack11.cpp
../third_party/msgpack11/msgpack11.cpp
IXDNSLookupTest.cpp
IXSocketTest.cpp

4641
third_party/cli11/CLI11.hpp vendored Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,2 @@
find . -type f -name '*.cpp' -exec sed -i '' 's/[[:space:]]*$//' {} \+
find . -type f -name '*.h' -exec sed -i '' 's/[[:space:]]*$//' {} \+

View File

@ -4,7 +4,7 @@
#
cmake_minimum_required (VERSION 3.4.1)
project (ws_send)
project (ws)
# There's -Weverything too for clang
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic -Wshorten-64-to-32")
@ -13,19 +13,24 @@ set (CMAKE_CXX_STANDARD 14)
option(USE_TLS "Add TLS support" ON)
add_subdirectory(${PROJECT_SOURCE_DIR}/../.. ixwebsocket)
add_subdirectory(${PROJECT_SOURCE_DIR}/.. ixwebsocket)
include_directories(ws_send .)
include_directories(ws .)
include_directories(ws ../third_party)
add_executable(ws_send
jsoncpp/jsoncpp.cpp
add_executable(ws
../third_party/msgpack11/msgpack11.cpp
ixcrypto/IXBase64.cpp
ixcrypto/IXUuid.cpp
ixcrypto/IXHash.cpp
ws_send.cpp)
ixcrypto/IXUuid.cpp
ws_transfer.cpp
ws_send.cpp
ws_receive.cpp
ws.cpp)
if (APPLE AND USE_TLS)
target_link_libraries(ws_send "-framework foundation" "-framework security")
target_link_libraries(ws "-framework foundation" "-framework security")
endif()
target_link_libraries(ws_send ixwebsocket)
target_link_libraries(ws ixwebsocket)

10
ws/README.md Normal file
View File

@ -0,0 +1,10 @@
```
# Start receiver first
./ws receive ws://localhost:8080
# Sender
./ws send ws://localhost:8080 /file/to/path
# Server
./ws transfer # running on port 8080.
```

39
ws/docker_build.sh Normal file
View File

@ -0,0 +1,39 @@
#!/bin/sh
#
# Author: Benjamin Sergeant
# Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
#
# 'manual' way of building. I cannot get CMake to work to build in a container.
g++ --std=c++14 \
-DIXWEBSOCKET_USE_TLS \
-g \
../ixwebsocket/IXEventFd.cpp \
../ixwebsocket/IXSocket.cpp \
../ixwebsocket/IXSocketServer.cpp \
../ixwebsocket/IXSocketConnect.cpp \
../ixwebsocket/IXDNSLookup.cpp \
../ixwebsocket/IXCancellationRequest.cpp \
../ixwebsocket/IXWebSocket.cpp \
../ixwebsocket/IXWebSocketServer.cpp \
../ixwebsocket/IXWebSocketTransport.cpp \
../ixwebsocket/IXWebSocketHandshake.cpp \
../ixwebsocket/IXWebSocketPerMessageDeflate.cpp \
../ixwebsocket/IXWebSocketPerMessageDeflateCodec.cpp \
../ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp \
../ixwebsocket/IXSocketOpenSSL.cpp \
../ixwebsocket/linux/IXSetThreadName_linux.cpp \
../third_party/jsoncpp/jsoncpp.cpp \
ixcrypto/IXBase64.cpp \
ixcrypto/IXHash.cpp \
ixcrypto/IXUuid.cpp \
ws_transfer.cpp \
ws_send.cpp \
ws_receive.cpp \
ws.cpp \
-I . \
-I .. \
-I ../third_party \
-o ws \
-lcrypto -lssl -lz -lpthread

View File

@ -4,15 +4,15 @@
* Copyright (c) 2018 Machine Zone. All rights reserved.
*/
#include <string>
#include "IXHash.h"
namespace ix
{
uint64_t djb2Hash(const std::string& data)
uint64_t djb2Hash(const std::vector<uint8_t>& data)
{
uint64_t hashAddress = 5381;
for (auto& c : data)
for (auto&& c : data)
{
hashAddress = ((hashAddress << 5) + hashAddress) + c;
}

View File

@ -6,10 +6,10 @@
#pragma once
#include <string>
#include <vector>
namespace ix
{
uint64_t djb2Hash(const std::string& data);
uint64_t djb2Hash(const std::vector<uint8_t>& data);
}

68
ws/ws.cpp Normal file
View File

@ -0,0 +1,68 @@
/*
* ws.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*/
//
// Main drive for websocket utilities
//
#include <string>
#include <sstream>
#include <iostream>
#include <cli11/CLI11.hpp>
namespace ix
{
int ws_receive_main(const std::string& url,
bool enablePerMessageDeflate);
extern int ws_transfer_main(int port);
extern int ws_send_main(const std::string& url,
const std::string& path);
}
int main(int argc, char** argv)
{
CLI::App app{"ws is a websocket tool"};
app.require_subcommand();
std::string url;
std::string path;
int port = 8080;
CLI::App* sendApp = app.add_subcommand("send", "Send a file");
sendApp->add_option("url", url, "Connection url")->required();
sendApp->add_option("path", path, "Path to the file to send")->required();
CLI::App* receiveApp = app.add_subcommand("receive", "Receive a file");
receiveApp->add_option("url", url, "Connection url")->required();
CLI::App* transferApp = app.add_subcommand("transfer", "Broadcasting server");
transferApp->add_option("--port", port, "Connection url");
CLI11_PARSE(app, argc, argv);
if (app.got_subcommand("transfer"))
{
return ix::ws_transfer_main(port);
}
else if (app.got_subcommand("send"))
{
return ix::ws_send_main(url, path);
}
else if (app.got_subcommand("receive"))
{
bool enablePerMessageDeflate = false;
return ix::ws_receive_main(url, enablePerMessageDeflate);
}
else
{
assert(false);
}
return 1;
}

251
ws/ws_receive.cpp Normal file
View File

@ -0,0 +1,251 @@
/*
* ws_receiver.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <fstream>
#include <sstream>
#include <vector>
#include <condition_variable>
#include <mutex>
#include <chrono>
#include <ixwebsocket/IXWebSocket.h>
#include <ixwebsocket/IXSocket.h>
#include <ixcrypto/IXUuid.h>
#include <ixcrypto/IXBase64.h>
#include <ixcrypto/IXHash.h>
#include <msgpack11/msgpack11.hpp>
using msgpack11::MsgPack;
namespace ix
{
class WebSocketReceiver
{
public:
WebSocketReceiver(const std::string& _url,
bool enablePerMessageDeflate);
void subscribe(const std::string& channel);
void start();
void stop();
void waitForConnection();
void waitForMessage();
void handleMessage(const std::string& str);
private:
std::string _url;
std::string _id;
ix::WebSocket _webSocket;
bool _enablePerMessageDeflate;
std::mutex _conditionVariableMutex;
std::condition_variable _condition;
std::string extractFilename(const std::string& path);
void handleError(const std::string& errMsg, const std::string& id);
void log(const std::string& msg);
};
WebSocketReceiver::WebSocketReceiver(const std::string& url,
bool enablePerMessageDeflate) :
_url(url),
_enablePerMessageDeflate(enablePerMessageDeflate)
{
;
}
void WebSocketReceiver::stop()
{
_webSocket.stop();
}
void WebSocketReceiver::log(const std::string& msg)
{
std::cout << msg << std::endl;
}
void WebSocketReceiver::waitForConnection()
{
std::cout << "Connecting..." << std::endl;
std::unique_lock<std::mutex> lock(_conditionVariableMutex);
_condition.wait(lock);
}
void WebSocketReceiver::waitForMessage()
{
std::cout << "Waiting for message..." << std::endl;
std::unique_lock<std::mutex> lock(_conditionVariableMutex);
_condition.wait(lock);
}
// We should cleanup the file name and full path further to remove .. as well
std::string WebSocketReceiver::extractFilename(const std::string& path)
{
std::string::size_type idx;
idx = path.rfind('/');
if (idx != std::string::npos)
{
std::string filename = path.substr(idx+1);
return filename;
}
else
{
return path;
}
}
void WebSocketReceiver::handleError(const std::string& errMsg,
const std::string& id)
{
std::map<MsgPack, MsgPack> pdu;
pdu["kind"] = "error";
pdu["id"] = id;
pdu["message"] = errMsg;
MsgPack msg(pdu);
_webSocket.send(msg.dump());
}
void WebSocketReceiver::handleMessage(const std::string& str)
{
std::cerr << "Received message: " << str.size() << std::endl;
std::string errMsg;
MsgPack data = MsgPack::parse(str, errMsg);
if (!errMsg.empty())
{
handleError("Invalid MsgPack", std::string());
return;
}
std::cout << "id: " << data["id"].string_value() << std::endl;
std::vector<uint8_t> content = data["content"].binary_items();
std::cout << "Content size: " << content.size() << std::endl;
// Validate checksum
uint64_t cksum = ix::djb2Hash(content);
auto cksumRef = data["djb2_hash"].string_value();
std::cout << "Computed hash: " << cksum << std::endl;
std::cout << "Reference hash: " << cksumRef << std::endl;
if (std::to_string(cksum) != cksumRef)
{
handleError("Hash mismatch.", std::string());
return;
}
std::string filename = data["filename"].string_value();
filename = extractFilename(filename);
std::cout << "Writing to disk: " << filename << std::endl;
std::ofstream out(filename);
out.write((char*)&content.front(), content.size());
out.close();
std::map<MsgPack, MsgPack> pdu;
pdu["ack"] = true;
pdu["id"] = data["id"];
pdu["filename"] = data["filename"];
MsgPack msg(pdu);
_webSocket.send(msg.dump());
}
void WebSocketReceiver::start()
{
_webSocket.setUrl(_url);
ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(
_enablePerMessageDeflate, false, false, 15, 15);
_webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions);
std::stringstream ss;
log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
std::stringstream ss;
if (messageType == ix::WebSocket_MessageType_Open)
{
_condition.notify_one();
log("ws_receive: connected");
std::cout << "Uri: " << openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
ss << "ws_receive: connection closed:";
ss << " code " << closeInfo.code;
ss << " reason " << closeInfo.reason << std::endl;
log(ss.str());
}
else if (messageType == ix::WebSocket_MessageType_Message)
{
ss << "ws_receive: transfered " << wireSize << " bytes";
log(ss.str());
handleMessage(str);
_condition.notify_one();
}
else if (messageType == ix::WebSocket_MessageType_Error)
{
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
log(ss.str());
}
else
{
ss << "Invalid ix::WebSocketMessageType";
log(ss.str());
}
});
_webSocket.start();
}
void wsReceive(const std::string& url,
bool enablePerMessageDeflate)
{
WebSocketReceiver webSocketReceiver(url, enablePerMessageDeflate);
webSocketReceiver.start();
webSocketReceiver.waitForConnection();
webSocketReceiver.waitForMessage();
std::chrono::duration<double, std::milli> duration(1000);
std::this_thread::sleep_for(duration);
std::cout << "Done !" << std::endl;
webSocketReceiver.stop();
}
int ws_receive_main(const std::string& url,
bool enablePerMessageDeflate)
{
Socket::init();
wsReceive(url, enablePerMessageDeflate);
return 0;
}
}

View File

@ -16,17 +16,12 @@
#include <ixcrypto/IXUuid.h>
#include <ixcrypto/IXBase64.h>
#include <ixcrypto/IXHash.h>
#include <jsoncpp/json/json.h>
#include <msgpack11/msgpack11.hpp>
using namespace ix;
using msgpack11::MsgPack;
namespace
namespace ix
{
void log(const std::string& msg)
{
std::cout << msg << std::endl;
}
class WebSocketSender
{
public:
@ -50,6 +45,8 @@ namespace
std::mutex _conditionVariableMutex;
std::condition_variable _condition;
void log(const std::string& msg);
};
WebSocketSender::WebSocketSender(const std::string& url,
@ -65,6 +62,11 @@ namespace
_webSocket.stop();
}
void WebSocketSender::log(const std::string& msg)
{
std::cout << msg << std::endl;
}
void WebSocketSender::waitForConnection()
{
std::cout << "Connecting..." << std::endl;
@ -81,22 +83,21 @@ namespace
_condition.wait(lock);
}
std::string load(const std::string& path)
std::vector<uint8_t> load(const std::string& path)
{
// std::vector<uint8_t> memblock;
std::string str;
std::vector<uint8_t> memblock;
std::ifstream file(path);
if (!file.is_open()) return std::string();
if (!file.is_open()) return memblock;
file.seekg(0, file.end);
std::streamoff size = file.tellg();
file.seekg(0, file.beg);
str.resize(size);
file.read((char*)&str.front(), static_cast<std::streamsize>(size));
memblock.resize(size);
file.read((char*)&memblock.front(), static_cast<std::streamsize>(size));
return str;
return memblock;
}
void WebSocketSender::start()
@ -142,19 +143,18 @@ namespace
{
_condition.notify_one();
ss << "ws_send: received message: "
<< str;
ss << "ws_send: received message (" << wireSize << " bytes)";
log(ss.str());
Json::Value data;
Json::Reader reader;
if (!reader.parse(str, data))
std::string errMsg;
MsgPack data = MsgPack::parse(str, errMsg);
if (!errMsg.empty())
{
std::cerr << "Invalid JSON response" << std::endl;
std::cerr << "Invalid MsgPack response" << std::endl;
return;
}
std::string id = data["id"].asString();
std::string id = data["id"].string_value();
if (_id != id)
{
std::cerr << "Invalid id" << std::endl;
@ -224,7 +224,7 @@ namespace
void WebSocketSender::sendMessage(const std::string& filename,
bool throttle)
{
std::string content;
std::vector<uint8_t> content;
{
Bench bench("load file from disk");
content = load(filename);
@ -232,21 +232,18 @@ namespace
_id = uuid4();
std::string b64Content;
{
Bench bench("base 64 encode file");
b64Content = base64_encode(content, content.size());
}
Json::Value pdu;
std::map<MsgPack, MsgPack> pdu;
pdu["kind"] = "send";
pdu["id"] = _id;
pdu["content"] = b64Content;
pdu["djb2_hash"] = djb2Hash(b64Content);
pdu["content"] = content;
auto hash = djb2Hash(content);
pdu["djb2_hash"] = std::to_string(hash);
pdu["filename"] = filename;
MsgPack msg(pdu);
Bench bench("Sending file through websocket");
_webSocket.send(pdu.toStyledString(),
_webSocket.send(msg.dump(),
[throttle](int current, int total) -> bool
{
std::cout << "Step " << current << " out of " << total << std::endl;
@ -262,7 +259,7 @@ namespace
bench.report();
auto duration = bench.getDuration();
auto transferRate = 1000 * b64Content.size() / duration;
auto transferRate = 1000 * content.size() / duration;
transferRate /= (1024 * 1024);
std::cout << "Send transfer rate: " << transferRate << "MB/s" << std::endl;
}
@ -285,22 +282,15 @@ namespace
std::cout << "Done !" << std::endl;
webSocketSender.stop();
}
}
int main(int argc, char** argv)
{
if (argc != 3)
int ws_send_main(const std::string& url,
const std::string& path)
{
std::cerr << "Usage: ws_send <url> <path>" << std::endl;
return 1;
bool throttle = false;
bool enablePerMessageDeflate = false;
Socket::init();
wsSend(url, path, enablePerMessageDeflate, throttle);
return 0;
}
std::string url = argv[1];
std::string path = argv[2];
bool throttle = false;
bool enablePerMessageDeflate = false;
Socket::init();
wsSend(url, path, enablePerMessageDeflate, throttle);
return 0;
}

72
ws/ws_transfer.cpp Normal file
View File

@ -0,0 +1,72 @@
/*
* ws_transfer.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <sstream>
#include <ixwebsocket/IXWebSocketServer.h>
namespace ix
{
int ws_transfer_main(int port)
{
std::cout << "Listening on port " << port << std::endl;
ix::WebSocketServer server(port);
server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket)
{
webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType,
const std::string& str,
size_t wireSize,
const ix::WebSocketErrorInfo& error,
const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo)
{
if (messageType == ix::WebSocket_MessageType_Open)
{
std::cerr << "New connection" << std::endl;
std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
std::cerr << "Closed connection" << std::endl;
}
else if (messageType == ix::WebSocket_MessageType_Message)
{
std::cerr << "Received " << wireSize << " bytes" << std::endl;
for (auto&& client : server.getClients())
{
if (client != webSocket)
{
client->send(str);
}
}
}
}
);
}
);
auto res = server.listen();
if (!res.first)
{
std::cerr << res.second << std::endl;
return 1;
}
server.start();
server.wait();
return 0;
}
}