Compare commits

..

22 Commits

Author SHA1 Message Date
1b03bf4555 linux build fix 2019-03-14 15:17:17 -07:00
977b995af9 replace uint8_t with uint64_t for the send/close requests types / use named variable to index into the _fildes array 2019-03-14 15:03:57 -07:00
310ab990bd set a default close reason string 2019-03-14 14:52:51 -07:00
d6b49b54d4 do not busy loop while sending 2019-03-14 14:48:08 -07:00
f00cf39462 remove docker folder 2019-03-14 14:48:02 -07:00
18550cf1cb send optimization + ws file transfer test 2019-03-14 14:47:53 -07:00
168918f807 Update README.md
Stop lying about Windows support ...
2019-03-13 23:10:40 -07:00
2750df8aa7 send can fail silently when sending would block (EWOULDBLOCK return for send) (#18)
* try to use a pipe for communication

* flush send buffer on the background thread

* cleanup

* linux fix / linux still use event fd for now

* cleanup
2019-03-13 23:09:45 -07:00
d6597d9f52 websocket send: make sure all data in the kernel buffer is sent 2019-03-11 22:16:55 -07:00
892ea375e3 add new message type when receiving message fragments 2019-03-11 11:12:43 -07:00
03abe77b5f ws broacast_server / can set serving hostname 2019-03-10 16:36:44 -07:00
e46eb8aa49 debian 9 unittest build fix 2019-03-10 16:07:48 -07:00
2c4862e0f1 asan test suite fix 2019-03-09 10:45:40 -08:00
fd69efa45c unittest + warning fix 2019-03-09 10:37:14 -08:00
e8aa15917f add ability to run with asan on macOS 2019-03-05 17:07:28 -08:00
b3d77f8902 fix compiler warnings in ws command line tool 2019-03-04 13:56:30 -08:00
9c3b0b08ec Socket code refactoring, plus stop polling with a 1s timeout in readBytes while we only want to poll with a 1ms timeout 2019-03-04 13:40:15 -08:00
fe7d94194c readBytes does not read bytes one by one but in chunks 2019-03-02 21:11:16 -08:00
d6c26d6aa8 create a blocking + cancellable Socket::readBytes method 2019-03-02 15:16:46 -08:00
8a74ddcd13 create a blocking + cancellable Socket::readBytes method 2019-03-02 11:01:51 -08:00
18e7189a07 more ws doc 2019-02-28 22:07:45 -08:00
785dd42c84 more ws doc 2019-02-28 22:03:48 -08:00
38 changed files with 738 additions and 265 deletions

View File

@ -1 +0,0 @@
docker/Dockerfile.debian

31
Dockerfile Normal file
View File

@ -0,0 +1,31 @@
FROM debian:stretch
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install gdb
RUN apt-get -y install screen
RUN apt-get -y install procps
RUN apt-get -y install lsof
RUN apt-get -y install libz-dev
RUN apt-get -y install vim
RUN apt-get -y install make
RUN apt-get -y install cmake
RUN apt-get -y install curl
RUN apt-get -y install python
RUN apt-get -y install netcat
# debian strech cmake is too old for building with Docker
COPY makefile .
RUN ["make", "install_cmake_for_linux"]
COPY . .
ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-rc4-Linux-x86_64/bin
ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
# RUN ["make"]
EXPOSE 8765
CMD ["/ws/ws", "transfer", "--port", "8765", "--host", "0.0.0.0"]

View File

@ -11,7 +11,6 @@ communication channels over a single TCP connection. *IXWebSocket* is a C++ libr
* iOS * iOS
* Linux * Linux
* Android * Android
* Windows (no TLS support yet)
## Examples ## Examples
@ -77,7 +76,10 @@ server.setOnConnectionCallback(
if (messageType == ix::WebSocket_MessageType_Open) if (messageType == ix::WebSocket_MessageType_Open)
{ {
std::cerr << "New connection" << std::endl; std::cerr << "New connection" << std::endl;
// The uri the client did connect to.
std::cerr << "Uri: " << openInfo.uri << std::endl; std::cerr << "Uri: " << openInfo.uri << std::endl;
std::cerr << "Headers:" << std::endl; std::cerr << "Headers:" << std::endl;
for (auto it : openInfo.headers) for (auto it : openInfo.headers)
{ {
@ -178,6 +180,13 @@ CMakefiles for the library and the examples are available. This library has few
There is a Dockerfile for running some code on Linux, and a unittest which can be executed by typing `make test`. There is a Dockerfile for running some code on Linux, and a unittest which can be executed by typing `make test`.
You can build and install the ws command line tool with Homebrew.
```
brew create --cmake https://github.com/machinezone/IXWebSocket/archive/v1.1.0.tar.gz
brew install IXWebSocket
```
## Implementation details ## Implementation details
### Per Message Deflate compression. ### Per Message Deflate compression.

View File

@ -1,16 +0,0 @@
FROM debian:stretch
# RUN yum install -y gcc-c++ make cmake openssl-devel gdb
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install gdb
RUN apt-get -y install screen
RUN apt-get -y install procps
RUN apt-get -y install lsof
COPY . .
WORKDIR examples/ws_connect
RUN ["sh", "build_linux.sh"]

View File

@ -1,11 +0,0 @@
FROM alpine:3.8
RUN apk add --no-cache g++ musl-dev make cmake openssl-dev
COPY . .
WORKDIR examples/ws_connect
RUN ["sh", "build_linux.sh"]
EXPOSE 8765
CMD ["ws_connect"]

View File

@ -1,11 +0,0 @@
FROM alpine:3.8
RUN apk add --no-cache g++ musl-dev make cmake openssl-dev
COPY . .
WORKDIR examples/ws_connect
RUN ["sh", "build_linux.sh"]
EXPOSE 8765
CMD ["ws_connect"]

View File

@ -1,22 +0,0 @@
FROM debian:stretch
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install gdb
RUN apt-get -y install screen
RUN apt-get -y install procps
RUN apt-get -y install lsof
RUN apt-get -y install libz-dev
RUN apt-get -y install vim
RUN apt-get -y install make
RUN apt-get -y install cmake
COPY . .
WORKDIR ws
RUN ["sh", "docker_build.sh"]
EXPOSE 8765
CMD ["/ws/ws", "transfer", "8765"]

View File

@ -1,8 +0,0 @@
FROM gcc:8
# RUN yum install -y gcc-c++ make cmake openssl-devel gdb
COPY . .
WORKDIR examples/ws_connect
RUN ["sh", "build_linux.sh"]

View File

@ -17,6 +17,8 @@
// cf Android/Kernel table here // cf Android/Kernel table here
// https://android.stackexchange.com/questions/51651/which-android-runs-which-linux-kernel // https://android.stackexchange.com/questions/51651/which-android-runs-which-linux-kernel
// //
// On macOS we use UNIX pipes to wake up select.
//
#include "IXEventFd.h" #include "IXEventFd.h"
@ -24,17 +26,29 @@
# include <sys/eventfd.h> # include <sys/eventfd.h>
#endif #endif
#ifndef _WIN32
#include <unistd.h> // for write #include <unistd.h> // for write
#endif #include <fcntl.h>
namespace ix namespace ix
{ {
EventFd::EventFd() : // File descriptor at index 0 in _fildes is the read end of the pipe
_eventfd(-1) // File descriptor at index 1 in _fildes is the write end of the pipe
const int EventFd::kPipeReadIndex = 0;
const int EventFd::kPipeWriteIndex = 1;
EventFd::EventFd()
{ {
#ifdef __linux__ #ifdef __linux__
_eventfd = -1;
_eventfd = eventfd(0, 0); _eventfd = eventfd(0, 0);
fcntl(_eventfd, F_SETFL, O_NONBLOCK);
#else
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
pipe(_fildes);
fcntl(_fildes[kPipeReadIndex], F_SETFL, O_NONBLOCK);
fcntl(_fildes[kPipeWriteIndex], F_SETFL, O_NONBLOCK);
#endif #endif
} }
@ -42,22 +56,43 @@ namespace ix
{ {
#ifdef __linux__ #ifdef __linux__
::close(_eventfd); ::close(_eventfd);
#else
::close(_fildes[kPipeReadIndex]);
::close(_fildes[kPipeWriteIndex]);
_fildes[kPipeReadIndex] = -1;
_fildes[kPipeWriteIndex] = -1;
#endif #endif
} }
bool EventFd::notify() bool EventFd::notify(uint64_t value)
{ {
#if defined(__linux__) int fd;
if (_eventfd == -1) return false;
// select will wake up when a non-zero value is written to our eventfd #if defined(__linux__)
uint64_t value = 1; fd = _eventfd;
#else
fd = _fildes[kPipeWriteIndex];
#endif
if (fd == -1) return false;
// we should write 8 bytes for an uint64_t // we should write 8 bytes for an uint64_t
return write(_eventfd, &value, sizeof(value)) == 8; return write(fd, &value, sizeof(value)) == 8;
}
// TODO: return max uint64_t for errors ?
uint64_t EventFd::read()
{
int fd;
#if defined(__linux__)
fd = _eventfd;
#else #else
return true; fd = _fildes[kPipeReadIndex];
#endif #endif
uint64_t value = 0;
::read(fd, &value, sizeof(value));
return value;
} }
bool EventFd::clear() bool EventFd::clear()
@ -77,6 +112,10 @@ namespace ix
int EventFd::getFd() int EventFd::getFd()
{ {
#if defined(__linux__)
return _eventfd; return _eventfd;
#else
return _fildes[kPipeReadIndex];
#endif
} }
} }

View File

@ -6,6 +6,8 @@
#pragma once #pragma once
#include <stdint.h>
namespace ix namespace ix
{ {
class EventFd { class EventFd {
@ -13,11 +15,23 @@ namespace ix
EventFd(); EventFd();
virtual ~EventFd(); virtual ~EventFd();
bool notify(); bool notify(uint64_t value);
bool clear(); bool clear();
uint64_t read();
int getFd(); int getFd();
private: private:
#if defined(__linux__)
int _eventfd; int _eventfd;
#else
// Store file descriptors used by the communication pipe. Communication
// happens between a control thread and a background thread, which is
// blocked on select.
int _fildes[2];
#endif
// Used to identify the read/write idx
static const int kPipeReadIndex;
static const int kPipeWriteIndex;
}; };
} }

View File

@ -231,19 +231,17 @@ namespace ix
payload.reserve(contentLength); payload.reserve(contentLength);
// FIXME: very inefficient way to read bytes, but it works... auto chunkResult = _socket->readBytes(contentLength,
for (int i = 0; i < contentLength; ++i) args.onProgressCallback,
isCancellationRequested);
if (!chunkResult.first)
{ {
char c; errorMsg = "Cannot read chunk";
if (!_socket->readByte(&c, isCancellationRequested)) return std::make_tuple(code, HttpErrorCode_ChunkReadError,
{ headers, payload, errorMsg,
return std::make_tuple(code, HttpErrorCode_ReadError,
headers, payload, "Cannot read byte",
uploadSize, downloadSize); uploadSize, downloadSize);
} }
payload += chunkResult.second;
payload += c;
}
} }
else if (headers.find("Transfer-Encoding") != headers.end() && else if (headers.find("Transfer-Encoding") != headers.end() &&
headers["Transfer-Encoding"] == "chunked") headers["Transfer-Encoding"] == "chunked")
@ -277,22 +275,20 @@ namespace ix
payload.reserve(payload.size() + chunkSize); payload.reserve(payload.size() + chunkSize);
// Read another line // Read a chunk
auto chunkResult = _socket->readBytes(chunkSize,
for (uint64_t i = 0; i < chunkSize; ++i) args.onProgressCallback,
isCancellationRequested);
if (!chunkResult.first)
{ {
char c; errorMsg = "Cannot read chunk";
if (!_socket->readByte(&c, isCancellationRequested))
{
errorMsg = "Cannot read byte";
return std::make_tuple(code, HttpErrorCode_ChunkReadError, return std::make_tuple(code, HttpErrorCode_ChunkReadError,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
payload += chunkResult.second;
payload += c; // Read the line that terminates the chunk (\r\n)
}
lineResult = _socket->readLine(isCancellationRequested); lineResult = _socket->readLine(isCancellationRequested);
if (!lineResult.first) if (!lineResult.first)

View File

@ -61,6 +61,7 @@ namespace ix
bool verbose; bool verbose;
bool compress; bool compress;
Logger logger; Logger logger;
OnProgressCallback onProgressCallback;
}; };
class HttpClient { class HttpClient {

View File

@ -23,11 +23,14 @@ namespace ix
{ {
const int Socket::kDefaultPollNoTimeout = -1; // No poll timeout by default const int Socket::kDefaultPollNoTimeout = -1; // No poll timeout by default
const int Socket::kDefaultPollTimeout = kDefaultPollNoTimeout; const int Socket::kDefaultPollTimeout = kDefaultPollNoTimeout;
const uint64_t Socket::kSendRequest = 1;
const uint64_t Socket::kCloseRequest = 2;
constexpr size_t Socket::kChunkSize;
Socket::Socket(int fd) : Socket::Socket(int fd) :
_sockfd(fd) _sockfd(fd)
{ {
;
} }
Socket::~Socket() Socket::~Socket()
@ -39,25 +42,37 @@ namespace ix
{ {
if (_sockfd == -1) if (_sockfd == -1)
{ {
onPollCallback(PollResultType_Error); if (onPollCallback) onPollCallback(PollResultType_Error);
return; return;
} }
PollResultType pollResult = select(timeoutSecs, 0);
if (onPollCallback) onPollCallback(pollResult);
}
PollResultType Socket::select(int timeoutSecs, int timeoutMs)
{
fd_set rfds; fd_set rfds;
FD_ZERO(&rfds); FD_ZERO(&rfds);
FD_SET(_sockfd, &rfds); FD_SET(_sockfd, &rfds);
#ifdef __linux__ // File descriptor at index 0 in _fildes is the read end of the pipe
FD_SET(_eventfd.getFd(), &rfds); int eventfd = _eventfd.getFd();
#endif if (eventfd != -1)
{
FD_SET(eventfd, &rfds);
}
struct timeval timeout; struct timeval timeout;
timeout.tv_sec = timeoutSecs; timeout.tv_sec = timeoutSecs;
timeout.tv_usec = 0; timeout.tv_usec = 1000 * timeoutMs;
// Compute the highest fd.
int sockfd = _sockfd; int sockfd = _sockfd;
int nfds = (std::max)(sockfd, _eventfd.getFd()); int nfds = (std::max)(sockfd, eventfd);
int ret = select(nfds + 1, &rfds, nullptr, nullptr,
int ret = ::select(nfds + 1, &rfds, nullptr, nullptr,
(timeoutSecs < 0) ? nullptr : &timeout); (timeoutSecs < 0) ? nullptr : &timeout);
PollResultType pollResult = PollResultType_ReadyForRead; PollResultType pollResult = PollResultType_ReadyForRead;
@ -69,14 +84,27 @@ namespace ix
{ {
pollResult = PollResultType_Timeout; pollResult = PollResultType_Timeout;
} }
else if (eventfd != -1 && FD_ISSET(eventfd, &rfds))
{
uint64_t value = _eventfd.read();
onPollCallback(pollResult); if (value == kSendRequest)
{
pollResult = PollResultType_SendRequest;
}
else if (value == kCloseRequest)
{
pollResult = PollResultType_CloseRequest;
}
} }
void Socket::wakeUpFromPoll() return pollResult;
}
// Wake up from poll/select by writing to the pipe which is watched by select
bool Socket::wakeUpFromPoll(uint8_t wakeUpCode)
{ {
// this will wake up the thread blocked on select, only needed on Linux return _eventfd.notify(wakeUpCode);
_eventfd.notify();
} }
bool Socket::connect(const std::string& host, bool Socket::connect(const std::string& host,
@ -165,51 +193,6 @@ namespace ix
#endif #endif
} }
bool Socket::readByte(void* buffer,
const CancellationRequest& isCancellationRequested)
{
while (true)
{
if (isCancellationRequested()) return false;
ssize_t ret;
ret = recv(buffer, 1);
// We read one byte, as needed, all good.
if (ret == 1)
{
return true;
}
// There is possibly something to be read, try again
else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
getErrno() == EAGAIN))
{
// Wait with a timeout until something is written.
// This way we are not busy looping
fd_set rfds;
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 1 * 1000; // 1ms timeout
FD_ZERO(&rfds);
FD_SET(_sockfd, &rfds);
if (select(_sockfd + 1, &rfds, nullptr, nullptr, &timeout) < 0 &&
(errno == EBADF || errno == EINVAL))
{
return false;
}
continue;
}
// There was an error during the read, abort
else
{
return false;
}
}
}
bool Socket::writeBytes(const std::string& str, bool Socket::writeBytes(const std::string& str,
const CancellationRequest& isCancellationRequested) const CancellationRequest& isCancellationRequested)
{ {
@ -241,7 +224,43 @@ namespace ix
} }
} }
std::pair<bool, std::string> Socket::readLine(const CancellationRequest& isCancellationRequested) bool Socket::readByte(void* buffer,
const CancellationRequest& isCancellationRequested)
{
while (true)
{
if (isCancellationRequested()) return false;
ssize_t ret;
ret = recv(buffer, 1);
// We read one byte, as needed, all good.
if (ret == 1)
{
return true;
}
// There is possibly something to be read, try again
else if (ret < 0 && (getErrno() == EWOULDBLOCK ||
getErrno() == EAGAIN))
{
// Wait with a timeout until something is ready to read.
// This way we are not busy looping
int res = select(0, 1);
if (res < 0 && (errno == EBADF || errno == EINVAL))
{
return false;
}
}
// There was an error during the read, abort
else
{
return false;
}
}
}
std::pair<bool, std::string> Socket::readLine(
const CancellationRequest& isCancellationRequested)
{ {
char c; char c;
std::string line; std::string line;
@ -251,7 +270,8 @@ namespace ix
{ {
if (!readByte(&c, isCancellationRequested)) if (!readByte(&c, isCancellationRequested))
{ {
return std::make_pair(false, std::string()); // Return what we were able to read
return std::make_pair(false, line);
} }
line += c; line += c;
@ -259,4 +279,46 @@ namespace ix
return std::make_pair(true, line); return std::make_pair(true, line);
} }
std::pair<bool, std::string> Socket::readBytes(
size_t length,
const OnProgressCallback& onProgressCallback,
const CancellationRequest& isCancellationRequested)
{
if (_readBuffer.empty())
{
_readBuffer.resize(kChunkSize);
}
std::vector<uint8_t> output;
while (output.size() != length)
{
if (isCancellationRequested()) return std::make_pair(false, std::string());
int size = std::min(kChunkSize, length - output.size());
ssize_t ret = recv((char*)&_readBuffer[0], size);
if (ret <= 0 && (getErrno() != EWOULDBLOCK &&
getErrno() != EAGAIN))
{
// Error
return std::make_pair(false, std::string());
}
else if (ret > 0)
{
output.insert(output.end(),
_readBuffer.begin(),
_readBuffer.begin() + ret);
}
if (onProgressCallback) onProgressCallback((int) output.size(), (int) length);
// Wait with a timeout until something is ready to read.
// This way we are not busy looping
select(0, 1);
}
return std::make_pair(true, std::string(output.begin(),
output.end()));
}
} }

View File

@ -10,14 +10,16 @@
#include <functional> #include <functional>
#include <mutex> #include <mutex>
#include <atomic> #include <atomic>
#include <vector>
#ifdef _WIN32 #ifdef _WIN32
#include <BaseTsd.h> #include <BaseTsd.h>
typedef SSIZE_T ssize_t; typedef SSIZE_T ssize_t;
#endif #endif
#include "IXEventFd.h"
#include "IXCancellationRequest.h" #include "IXCancellationRequest.h"
#include "IXProgressCallback.h"
#include "IXEventFd.h"
namespace ix namespace ix
{ {
@ -25,7 +27,9 @@ namespace ix
{ {
PollResultType_ReadyForRead = 0, PollResultType_ReadyForRead = 0,
PollResultType_Timeout = 1, PollResultType_Timeout = 1,
PollResultType_Error = 2 PollResultType_Error = 2,
PollResultType_SendRequest = 3,
PollResultType_CloseRequest = 4
}; };
class Socket { class Socket {
@ -37,9 +41,10 @@ namespace ix
void configure(); void configure();
PollResultType select(int timeoutSecs, int timeoutMs);
virtual void poll(const OnPollCallback& onPollCallback, virtual void poll(const OnPollCallback& onPollCallback,
int timeoutSecs = kDefaultPollTimeout); int timeoutSecs = kDefaultPollTimeout);
virtual void wakeUpFromPoll(); virtual bool wakeUpFromPoll(uint8_t wakeUpCode);
// Virtual methods // Virtual methods
virtual bool connect(const std::string& url, virtual bool connect(const std::string& url,
@ -58,21 +63,36 @@ namespace ix
const CancellationRequest& isCancellationRequested); const CancellationRequest& isCancellationRequested);
bool writeBytes(const std::string& str, bool writeBytes(const std::string& str,
const CancellationRequest& isCancellationRequested); const CancellationRequest& isCancellationRequested);
std::pair<bool, std::string> readLine(const CancellationRequest& isCancellationRequested);
std::pair<bool, std::string> readLine(
const CancellationRequest& isCancellationRequested);
std::pair<bool, std::string> readBytes(
size_t length,
const OnProgressCallback& onProgressCallback,
const CancellationRequest& isCancellationRequested);
static int getErrno(); static int getErrno();
static bool init(); // Required on Windows to initialize WinSocket static bool init(); // Required on Windows to initialize WinSocket
static void cleanup(); // Required on Windows to cleanup WinSocket static void cleanup(); // Required on Windows to cleanup WinSocket
// Used as special codes for pipe communication
static const uint64_t kSendRequest;
static const uint64_t kCloseRequest;
protected: protected:
void closeSocket(int fd); void closeSocket(int fd);
std::atomic<int> _sockfd; std::atomic<int> _sockfd;
std::mutex _socketMutex; std::mutex _socketMutex;
EventFd _eventfd;
private: private:
static const int kDefaultPollTimeout; static const int kDefaultPollTimeout;
static const int kDefaultPollNoTimeout; static const int kDefaultPollNoTimeout;
// Buffer for reading from our socket. That buffer is never resized.
std::vector<uint8_t> _readBuffer;
static constexpr size_t kChunkSize = 1 << 15;
EventFd _eventfd;
}; };
} }

View File

@ -252,6 +252,11 @@ namespace ix
{ {
webSocketMessageType = WebSocket_MessageType_Pong; webSocketMessageType = WebSocket_MessageType_Pong;
} break; } break;
case WebSocketTransport::FRAGMENT:
{
webSocketMessageType = WebSocket_MessageType_Fragment;
} break;
} }
WebSocketErrorInfo webSocketErrorInfo; WebSocketErrorInfo webSocketErrorInfo;
@ -374,4 +379,9 @@ namespace ix
{ {
_automaticReconnection = false; _automaticReconnection = false;
} }
size_t WebSocket::bufferedAmount() const
{
return _ws.bufferedAmount();
}
} }

View File

@ -39,7 +39,8 @@ namespace ix
WebSocket_MessageType_Close = 2, WebSocket_MessageType_Close = 2,
WebSocket_MessageType_Error = 3, WebSocket_MessageType_Error = 3,
WebSocket_MessageType_Ping = 4, WebSocket_MessageType_Ping = 4,
WebSocket_MessageType_Pong = 5 WebSocket_MessageType_Pong = 5,
WebSocket_MessageType_Fragment = 6
}; };
struct WebSocketOpenInfo struct WebSocketOpenInfo
@ -111,6 +112,7 @@ namespace ix
const std::string& getUrl() const; const std::string& getUrl() const;
const WebSocketPerMessageDeflateOptions& getPerMessageDeflateOptions() const; const WebSocketPerMessageDeflateOptions& getPerMessageDeflateOptions() const;
int getHeartBeatPeriod() const; int getHeartBeatPeriod() const;
size_t bufferedAmount() const;
void enableAutomaticReconnection(); void enableAutomaticReconnection();
void disableAutomaticReconnection(); void disableAutomaticReconnection();

View File

@ -1,7 +1,31 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2012, 2013 <dhbaird@gmail.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
/* /*
* IXWebSocketTransport.cpp * IXWebSocketTransport.cpp
* Author: Benjamin Sergeant * Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved. * Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/ */
// //
@ -14,14 +38,6 @@
#include "IXUrlParser.h" #include "IXUrlParser.h"
#include "IXSocketFactory.h" #include "IXSocketFactory.h"
#ifdef IXWEBSOCKET_USE_TLS
# ifdef __APPLE__
# include "IXSocketAppleSSL.h"
# else
# include "IXSocketOpenSSL.h"
# endif
#endif
#include <string.h> #include <string.h>
#include <stdlib.h> #include <stdlib.h>
@ -80,16 +96,6 @@ namespace ix
std::string("Could not parse URL ") + url); std::string("Could not parse URL ") + url);
} }
if (protocol != "ws" && protocol != "wss")
{
std::stringstream ss;
ss << "Invalid protocol: " << protocol
<< " for url " << url
<< " . Supported protocols are ws and wss";
return WebSocketInitResult(false, 0, ss.str());
}
bool tls = protocol == "wss"; bool tls = protocol == "wss";
std::string errorMsg; std::string errorMsg;
_socket = createSocket(tls, errorMsg); _socket = createSocket(tls, errorMsg);
@ -184,9 +190,24 @@ namespace ix
std::stringstream ss; std::stringstream ss;
ss << kHeartBeatPingMessage << "::" << _heartBeatPeriod << "s"; ss << kHeartBeatPingMessage << "::" << _heartBeatPeriod << "s";
sendPing(ss.str()); sendPing(ss.str());
return;
} }
// Make sure we send all the buffered data
// there can be a lot of it for large messages.
else if (pollResult == PollResultType_SendRequest)
{
while (!isSendBufferEmpty() && !_requestInitCancellation)
{
sendOnSocket();
// Sleep 10ms between each send so that we dont busy loop
// A better strategy would be to select on the socket to
// check whether we can write to it without blocking
std::chrono::duration<double, std::micro> duration(10);
std::this_thread::sleep_for(duration);
}
}
else if (pollResult == PollResultType_ReadyForRead)
{
while (true) while (true)
{ {
ssize_t ret = _socket->recv((char*)&_readbuf[0], _readbuf.size()); ssize_t ret = _socket->recv((char*)&_readbuf[0], _readbuf.size());
@ -210,12 +231,16 @@ namespace ix
_readbuf.begin() + ret); _readbuf.begin() + ret);
} }
} }
}
if (isSendBufferEmpty() && _readyState == CLOSING) else if (pollResult == PollResultType_Error)
{ {
_socket->close(); _socket->close();
setReadyState(CLOSED);
} }
else if (pollResult == PollResultType_CloseRequest)
{
;
}
}, },
_heartBeatPeriod); _heartBeatPeriod);
} }
@ -392,6 +417,10 @@ namespace ix
emitMessage(MSG, getMergedChunks(), ws, onMessageCallback); emitMessage(MSG, getMergedChunks(), ws, onMessageCallback);
_chunks.clear(); _chunks.clear();
} }
else
{
emitMessage(FRAGMENT, std::string(), ws, onMessageCallback);
}
} }
} }
else if (ws.opcode == wsheader_type::PING) else if (ws.opcode == wsheader_type::PING)
@ -475,7 +504,7 @@ namespace ix
size_t wireSize = message.size(); size_t wireSize = message.size();
// When the RSV1 bit is 1 it means the message is compressed // When the RSV1 bit is 1 it means the message is compressed
if (_enablePerMessageDeflate && ws.rsv1) if (_enablePerMessageDeflate && ws.rsv1 && messageKind != FRAGMENT)
{ {
std::string decompressedMessage; std::string decompressedMessage;
bool success = _perMessageDeflate.decompress(message, decompressedMessage); bool success = _perMessageDeflate.decompress(message, decompressedMessage);
@ -573,7 +602,7 @@ namespace ix
// Send message // Send message
sendFragment(opcodeType, fin, begin, end, compress); sendFragment(opcodeType, fin, begin, end, compress);
if (onProgressCallback && !onProgressCallback(i, steps)) if (onProgressCallback && !onProgressCallback((int)i, (int) steps))
{ {
break; break;
} }
@ -582,6 +611,12 @@ namespace ix
} }
} }
// Request to flush the send buffer on the background thread if it isn't empty
if (!isSendBufferEmpty())
{
_socket->wakeUpFromPoll(Socket::kSendRequest);
}
return WebSocketSendInfo(true, compressionError, payloadSize, wireSize); return WebSocketSendInfo(true, compressionError, payloadSize, wireSize);
} }
@ -727,8 +762,18 @@ namespace ix
sendData(wsheader_type::CLOSE, normalClosure, compress); sendData(wsheader_type::CLOSE, normalClosure, compress);
setReadyState(CLOSING); setReadyState(CLOSING);
_socket->wakeUpFromPoll(); _socket->wakeUpFromPoll(Socket::kCloseRequest);
_socket->close(); _socket->close();
_closeCode = 1000;
_closeReason = "Normal Closure";
setReadyState(CLOSED);
}
size_t WebSocketTransport::bufferedAmount() const
{
std::lock_guard<std::mutex> lock(_txbufMutex);
return _txbuf.size();
} }
} // namespace ix } // namespace ix

View File

@ -45,7 +45,8 @@ namespace ix
{ {
MSG, MSG,
PING, PING,
PONG PONG,
FRAGMENT
}; };
using OnMessageCallback = std::function<void(const std::string&, using OnMessageCallback = std::function<void(const std::string&,
@ -76,6 +77,7 @@ namespace ix
void setReadyState(ReadyStateValues readyStateValue); void setReadyState(ReadyStateValues readyStateValue);
void setOnCloseCallback(const OnCloseCallback& onCloseCallback); void setOnCloseCallback(const OnCloseCallback& onCloseCallback);
void dispatch(const OnMessageCallback& onMessageCallback); void dispatch(const OnMessageCallback& onMessageCallback);
size_t bufferedAmount() const;
private: private:
std::string _url; std::string _url;

View File

@ -8,10 +8,10 @@ brew:
.PHONY: docker .PHONY: docker
docker: docker:
docker build -t broadcast_server:latest . docker build -t ws:latest .
run: run:
docker run --cap-add sys_ptrace -it broadcast_server:latest bash docker run --cap-add sys_ptrace -it ws:latest
# this is helpful to remove trailing whitespaces # this is helpful to remove trailing whitespaces
trail: trail:
@ -36,6 +36,9 @@ test_server:
test: test:
python test/run.py python test/run.py
ws_test:
(cd ws ; sh test_ws.sh)
# For the fork that is configured with appveyor # For the fork that is configured with appveyor
rebase_upstream: rebase_upstream:
git fetch upstream git fetch upstream
@ -43,5 +46,9 @@ rebase_upstream:
git reset --hard upstream/master git reset --hard upstream/master
git push origin master --force git push origin master --force
install_cmake_for_linux:
mkdir -p /tmp/cmake
(cd /tmp/cmake ; curl -L -O https://github.com/Kitware/CMake/releases/download/v3.14.0-rc4/cmake-3.14.0-rc4-Linux-x86_64.tar.gz ; tar zxf cmake-3.14.0-rc4-Linux-x86_64.tar.gz)
.PHONY: test .PHONY: test
.PHONY: build .PHONY: build

View File

@ -18,6 +18,7 @@
#include "IXTest.h" #include "IXTest.h"
#include "catch.hpp" #include "catch.hpp"
#include <string.h>
using namespace ix; using namespace ix;
@ -65,7 +66,13 @@ TEST_CASE("socket", "[socket]")
std::shared_ptr<Socket> socket(new Socket); std::shared_ptr<Socket> socket(new Socket);
std::string host("www.google.com"); std::string host("www.google.com");
int port = 80; int port = 80;
std::string request("GET / HTTP/1.1\r\n\r\n");
std::stringstream ss;
ss << "GET / HTTP/1.1\r\n";
ss << "Host: " << host << "\r\n";
ss << "\r\n";
std::string request(ss.str());
int expectedStatus = 200; int expectedStatus = 200;
int timeoutSecs = 3; int timeoutSecs = 3;

View File

@ -69,10 +69,15 @@ namespace ix
Logger() << msg; Logger() << msg;
} }
int getAnyFreePortSimple()
{
static int defaultPort = 8090;
return defaultPort++;
}
int getAnyFreePort() int getAnyFreePort()
{ {
int defaultPort = 8090; int defaultPort = 8090;
int sockfd; int sockfd;
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{ {
@ -122,8 +127,15 @@ namespace ix
{ {
while (true) while (true)
{ {
#if defined(__has_feature)
# if __has_feature(address_sanitizer)
int port = getAnyFreePortSimple();
# else
int port = getAnyFreePort(); int port = getAnyFreePort();
# endif
#else
int port = getAnyFreePort();
#endif
// //
// Only port above 1024 can be used by non root users, but for some // Only port above 1024 can be used by non root users, but for some
// reason I got port 7 returned with macOS when binding on port 0... // reason I got port 7 returned with macOS when binding on port 0...

View File

@ -164,10 +164,21 @@ namespace
ss << "cmd_websocket_chat: Error ! " << error.reason; ss << "cmd_websocket_chat: Error ! " << error.reason;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocket_MessageType_Ping)
{
log("cmd_websocket_chat: received ping message");
}
else if (messageType == ix::WebSocket_MessageType_Pong)
{
log("cmd_websocket_chat: received pong message");
}
else if (messageType == ix::WebSocket_MessageType_Fragment)
{
log("cmd_websocket_chat: received message fragment");
}
else else
{ {
// FIXME: missing ping/pong messages ss << "Unexpected ix::WebSocketMessageType";
ss << "Invalid ix::WebSocketMessageType";
log(ss.str()); log(ss.str());
} }
}); });

View File

@ -6,10 +6,10 @@ osName = platform.system()
print('os name = {}'.format(osName)) print('os name = {}'.format(osName))
root = os.path.dirname(os.path.realpath(__file__)) root = os.path.dirname(os.path.realpath(__file__))
buildDir = os.path.join(root, 'build') buildDir = os.path.join(root, 'build', osName)
if not os.path.exists(buildDir): if not os.path.exists(buildDir):
os.mkdir(buildDir) os.makedirs(buildDir)
os.chdir(buildDir) os.chdir(buildDir)
@ -38,7 +38,7 @@ sanitizerFlags = sanitizersFlags[sanitizer]
# os.environ['CC'] = 'clang-cl' # os.environ['CC'] = 'clang-cl'
# os.environ['CXX'] = 'clang-cl' # os.environ['CXX'] = 'clang-cl'
cmakeCmd = 'cmake -DCMAKE_BUILD_TYPE=Debug {} {} ..'.format(generator, sanitizerFlags) cmakeCmd = 'cmake -DCMAKE_BUILD_TYPE=Debug {} {} ../..'.format(generator, sanitizerFlags)
print(cmakeCmd) print(cmakeCmd)
ret = os.system(cmakeCmd) ret = os.system(cmakeCmd)
assert ret == 0, 'CMake failed, exiting' assert ret == 0, 'CMake failed, exiting'
@ -67,6 +67,7 @@ def findFiles(prefix):
# We need to copy the zlib DLL in the current work directory # We need to copy the zlib DLL in the current work directory
shutil.copy(os.path.join( shutil.copy(os.path.join(
'..',
'..', '..',
'..', '..',
'third_party', 'third_party',
@ -77,6 +78,8 @@ shutil.copy(os.path.join(
'bin', 'bin',
'zlib.dll'), '.') 'zlib.dll'), '.')
testCommand = '{} {}'.format(testBinary, os.getenv('TEST', '')) lldb = "lldb --batch -o 'run' -k 'thread backtrace all' -k 'quit 1'"
lldb = "" # Disabled for now
testCommand = '{} {} {}'.format(lldb, testBinary, os.getenv('TEST', ''))
ret = os.system(testCommand) ret = os.system(testCommand)
assert ret == 0, 'Test command failed' assert ret == 0, 'Test command failed'

20
third_party/homebrew_formula.rb vendored Normal file
View File

@ -0,0 +1,20 @@
class Ixwebsocket < Formula
desc "WebSocket client and server, and HTTP client command-line tool"
homepage "https://github.com/machinezone/IXWebSocket"
url "https://github.com/machinezone/IXWebSocket/archive/v1.1.0.tar.gz"
sha256 "52592ce3d0a67ad0f90ac9e8a458f61724175d95a01a38d1bad3fcdc5c7b6666"
depends_on "cmake" => :build
def install
system "cmake", ".", *std_cmake_args
system "make", "install"
end
test do
system "#{bin}/ws", "--help"
system "#{bin}/ws", "send", "--help"
system "#{bin}/ws", "receive", "--help"
system "#{bin}/ws", "transfer", "--help"
system "#{bin}/ws", "curl", "--help"
end
end

View File

@ -1,10 +1,62 @@
# General
ws is a command line tool that should exercise most of the IXWebSocket code, and provide example code.
```
$ ws --help
ws is a websocket tool
Usage: ws [OPTIONS] SUBCOMMAND
Options:
-h,--help Print this help message and exit
Subcommands:
send Send a file
receive Receive a file
transfer Broadcasting server
connect Connect to a remote server
chat Group chat
echo_server Echo server
broadcast_server Broadcasting server
ping Ping pong
curl HTTP Client
```
## file transfer
``` ```
# Start transfer server, which is just a broadcast server at this point # Start transfer server, which is just a broadcast server at this point
./ws transfer # running on port 8080. ws transfer # running on port 8080.
# Start receiver first # Start receiver first
./ws receive ws://localhost:8080 ws receive ws://localhost:8080
# Then send a file. File will be received and written to disk by the receiver process # Then send a file. File will be received and written to disk by the receiver process
./ws send ws://localhost:8080 /file/to/path ws send ws://localhost:8080 /file/to/path
```
## curl
```
$ ws curl --help
HTTP Client
Usage: ws curl [OPTIONS] url
Positionals:
url TEXT REQUIRED Connection url
Options:
-h,--help Print this help message and exit
-d TEXT Form data
-F TEXT Form data
-H TEXT Header
--output TEXT Output file
-I Send a HEAD request
-L Follow redirects
--max-redirects INT Max Redirects
-v Verbose
-O Save output to disk
--compress Enable gzip compression
--connect-timeout INT Connection timeout
--transfer-timeout INT Transfer timeout
``` ```

View File

@ -13,6 +13,7 @@ g++ --std=c++14 \
../ixwebsocket/IXSocket.cpp \ ../ixwebsocket/IXSocket.cpp \
../ixwebsocket/IXSocketServer.cpp \ ../ixwebsocket/IXSocketServer.cpp \
../ixwebsocket/IXSocketConnect.cpp \ ../ixwebsocket/IXSocketConnect.cpp \
../ixwebsocket/IXSocketFactory.cpp \
../ixwebsocket/IXDNSLookup.cpp \ ../ixwebsocket/IXDNSLookup.cpp \
../ixwebsocket/IXCancellationRequest.cpp \ ../ixwebsocket/IXCancellationRequest.cpp \
../ixwebsocket/IXWebSocket.cpp \ ../ixwebsocket/IXWebSocket.cpp \
@ -22,12 +23,16 @@ g++ --std=c++14 \
../ixwebsocket/IXWebSocketPerMessageDeflate.cpp \ ../ixwebsocket/IXWebSocketPerMessageDeflate.cpp \
../ixwebsocket/IXWebSocketPerMessageDeflateCodec.cpp \ ../ixwebsocket/IXWebSocketPerMessageDeflateCodec.cpp \
../ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp \ ../ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp \
../ixwebsocket/IXWebSocketHttpHeaders.cpp \
../ixwebsocket/IXHttpClient.cpp \
../ixwebsocket/IXUrlParser.cpp \
../ixwebsocket/IXSocketOpenSSL.cpp \ ../ixwebsocket/IXSocketOpenSSL.cpp \
../ixwebsocket/linux/IXSetThreadName_linux.cpp \ ../ixwebsocket/linux/IXSetThreadName_linux.cpp \
../third_party/jsoncpp/jsoncpp.cpp \ ../third_party/msgpack11/msgpack11.cpp \
ixcrypto/IXBase64.cpp \ ixcrypto/IXBase64.cpp \
ixcrypto/IXHash.cpp \ ixcrypto/IXHash.cpp \
ixcrypto/IXUuid.cpp \ ixcrypto/IXUuid.cpp \
ws_http_client.cpp \
ws_ping_pong.cpp \ ws_ping_pong.cpp \
ws_broadcast_server.cpp \ ws_broadcast_server.cpp \
ws_echo_server.cpp \ ws_echo_server.cpp \

52
ws/test_ws.sh Normal file
View File

@ -0,0 +1,52 @@
#!/bin/sh
rm -rf /tmp/ws_test
mkdir -p /tmp/ws_test
# Start a transport server
cd /tmp/ws_test
ws transfer --port 8090 --pidfile /tmp/ws_test/pidfile &
# Wait until the transfer server is up
while true
do
nc -zv 127.0.0.1 8090 && {
echo "Transfer server up and running"
break
}
echo "sleep ..."
sleep 0.1
done
# Start a receiver
mkdir -p /tmp/ws_test/receive
cd /tmp/ws_test/receive
ws receive ws://127.0.0.1:8090 &
mkdir /tmp/ws_test/send
cd /tmp/ws_test/send
# mkfile 10m 10M_file
dd if=/dev/urandom of=10M_file count=10000 bs=1024
# Start the sender job
ws send ws://127.0.0.1:8090 10M_file
# Wait until the file has been written to disk
while true
do
if test -f /tmp/ws_test/receive/10M_file ; then
echo "Received file does exists, exiting loop"
break
fi
echo "sleep ..."
sleep 0.1
done
cksum /tmp/ws_test/send/10M_file
cksum /tmp/ws_test/receive/10M_file
# Give some time to ws receive to terminate
sleep 2
# Cleanup
kill `cat /tmp/ws_test/pidfile`

View File

@ -16,6 +16,8 @@
#include <string> #include <string>
#include <sstream> #include <sstream>
#include <iostream> #include <iostream>
#include <fstream>
#include <unistd.h>
#include <cli11/CLI11.hpp> #include <cli11/CLI11.hpp>
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
@ -31,6 +33,8 @@ int main(int argc, char** argv)
std::string data; std::string data;
std::string headers; std::string headers;
std::string output; std::string output;
std::string hostname("127.0.0.1");
std::string pidfile;
bool headersOnly = false; bool headersOnly = false;
bool followRedirects = false; bool followRedirects = false;
bool verbose = false; bool verbose = false;
@ -50,6 +54,8 @@ int main(int argc, char** argv)
CLI::App* transferApp = app.add_subcommand("transfer", "Broadcasting server"); CLI::App* transferApp = app.add_subcommand("transfer", "Broadcasting server");
transferApp->add_option("--port", port, "Connection url"); transferApp->add_option("--port", port, "Connection url");
transferApp->add_option("--host", hostname, "Hostname");
transferApp->add_option("--pidfile", pidfile, "Pid file");
CLI::App* connectApp = app.add_subcommand("connect", "Connect to a remote server"); CLI::App* connectApp = app.add_subcommand("connect", "Connect to a remote server");
connectApp->add_option("url", url, "Connection url")->required(); connectApp->add_option("url", url, "Connection url")->required();
@ -59,10 +65,12 @@ int main(int argc, char** argv)
chatApp->add_option("user", user, "User name")->required(); chatApp->add_option("user", user, "User name")->required();
CLI::App* echoServerApp = app.add_subcommand("echo_server", "Echo server"); CLI::App* echoServerApp = app.add_subcommand("echo_server", "Echo server");
echoServerApp->add_option("--port", port, "Connection url"); echoServerApp->add_option("--port", port, "Port");
echoServerApp->add_option("--host", hostname, "Hostname");
CLI::App* broadcastServerApp = app.add_subcommand("broadcast_server", "Broadcasting server"); CLI::App* broadcastServerApp = app.add_subcommand("broadcast_server", "Broadcasting server");
broadcastServerApp->add_option("--port", port, "Connection url"); broadcastServerApp->add_option("--port", port, "Port");
broadcastServerApp->add_option("--host", hostname, "Hostname");
CLI::App* pingPongApp = app.add_subcommand("ping", "Ping pong"); CLI::App* pingPongApp = app.add_subcommand("ping", "Ping pong");
pingPongApp->add_option("url", url, "Connection url")->required(); pingPongApp->add_option("url", url, "Connection url")->required();
@ -86,9 +94,21 @@ int main(int argc, char** argv)
ix::Socket::init(); ix::Socket::init();
// pid file handling
if (app.got_subcommand("transfer")) if (app.got_subcommand("transfer"))
{ {
return ix::ws_transfer_main(port); if (!pidfile.empty())
{
unlink(pidfile.c_str());
std::ofstream f;
f.open(pidfile);
f << getpid();
f.close();
}
return ix::ws_transfer_main(port, hostname);
} }
else if (app.got_subcommand("send")) else if (app.got_subcommand("send"))
{ {
@ -109,11 +129,11 @@ int main(int argc, char** argv)
} }
else if (app.got_subcommand("echo_server")) else if (app.got_subcommand("echo_server"))
{ {
return ix::ws_echo_server_main(port); return ix::ws_echo_server_main(port, hostname);
} }
else if (app.got_subcommand("broadcast_server")) else if (app.got_subcommand("broadcast_server"))
{ {
return ix::ws_broadcast_server_main(port); return ix::ws_broadcast_server_main(port, hostname);
} }
else if (app.got_subcommand("ping")) else if (app.got_subcommand("ping"))
{ {

View File

@ -24,9 +24,9 @@ namespace ix
int ws_ping_pong_main(const std::string& url); int ws_ping_pong_main(const std::string& url);
int ws_echo_server_main(int port); int ws_echo_server_main(int port, const std::string& hostname);
int ws_broadcast_server_main(int port, const std::string& hostname);
int ws_broadcast_server_main(int port); int ws_transfer_main(int port, const std::string& hostname);
int ws_chat_main(const std::string& url, int ws_chat_main(const std::string& url,
const std::string& user); const std::string& user);
@ -36,8 +36,6 @@ namespace ix
int ws_receive_main(const std::string& url, int ws_receive_main(const std::string& url,
bool enablePerMessageDeflate); bool enablePerMessageDeflate);
int ws_transfer_main(int port);
int ws_send_main(const std::string& url, int ws_send_main(const std::string& url,
const std::string& path); const std::string& path);
} }

View File

@ -10,11 +10,11 @@
namespace ix namespace ix
{ {
int ws_broadcast_server_main(int port) int ws_broadcast_server_main(int port, const std::string& hostname)
{ {
std::cout << "Listening on port " << port << std::endl; std::cout << "Listening on " << hostname << ":" << port << std::endl;
ix::WebSocketServer server(port); ix::WebSocketServer server(port, hostname);
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket) [&server](std::shared_ptr<ix::WebSocket> webSocket)
@ -39,16 +39,47 @@ namespace ix
} }
else if (messageType == ix::WebSocket_MessageType_Close) else if (messageType == ix::WebSocket_MessageType_Close)
{ {
std::cerr << "Closed connection" << std::endl; std::cerr << "Closed connection"
<< " code " << closeInfo.code
<< " reason " << closeInfo.reason << std::endl;
}
else if (messageType == ix::WebSocket_MessageType_Error)
{
std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
std::cerr << ss.str();
}
else if (messageType == ix::WebSocket_MessageType_Fragment)
{
std::cerr << "Received message fragment" << std::endl;
} }
else if (messageType == ix::WebSocket_MessageType_Message) else if (messageType == ix::WebSocket_MessageType_Message)
{ {
std::cerr << "Received " << wireSize << " bytes" << std::endl; std::cerr << "Received " << wireSize << " bytes" << std::endl;
for (auto&& client : server.getClients()) for (auto&& client : server.getClients())
{ {
if (client != webSocket) if (client != webSocket)
{ {
client->send(str); client->send(str,
[](int current, int total) -> bool
{
std::cerr << "Step " << current
<< " out of " << total << std::endl;
return true;
});
do
{
size_t bufferedAmount = client->bufferedAmount();
std::cerr << bufferedAmount << " bytes left to be sent" << std::endl;
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
} while (client->bufferedAmount() != 0);
} }
} }
} }

View File

@ -94,16 +94,26 @@ namespace ix
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocket_MessageType_Open) if (messageType == ix::WebSocket_MessageType_Open)
{ {
ss << "cmd_websocket_chat: user " log("ws chat: connected");
std::cout << "Uri: " << openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
ss << "ws chat: user "
<< _user << _user
<< " Connected !"; << " Connected !";
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocket_MessageType_Close) else if (messageType == ix::WebSocket_MessageType_Close)
{ {
ss << "cmd_websocket_chat: user " ss << "ws chat: user "
<< _user << _user
<< " disconnected !"; << " disconnected !"
<< " code " << closeInfo.code
<< " reason " << closeInfo.reason;
log(ss.str()); log(ss.str());
} }
else if (messageType == ix::WebSocket_MessageType_Message) else if (messageType == ix::WebSocket_MessageType_Message)
@ -117,7 +127,7 @@ namespace ix
_receivedQueue.push(result.second); _receivedQueue.push(result.second);
ss << std::endl ss << std::endl
<< result.first << " > " << result.second << result.first << "(" << wireSize << " bytes)" << " > " << result.second
<< std::endl << std::endl
<< _user << " > "; << _user << " > ";
log(ss.str()); log(ss.str());
@ -188,5 +198,7 @@ namespace ix
std::cout << std::endl; std::cout << std::endl;
webSocketChat.stop(); webSocketChat.stop();
return 0;
} }
} }

View File

@ -84,6 +84,8 @@ namespace ix
} }
else if (messageType == ix::WebSocket_MessageType_Message) else if (messageType == ix::WebSocket_MessageType_Message)
{ {
std::cerr << "Received " << wireSize << " bytes" << std::endl;
ss << "ws_connect: received message: " ss << "ws_connect: received message: "
<< str; << str;
log(ss.str()); log(ss.str());

View File

@ -10,17 +10,17 @@
namespace ix namespace ix
{ {
int ws_echo_server_main(int port) int ws_echo_server_main(int port, const std::string& hostname)
{ {
std::cout << "Listening on port " << port << std::endl; std::cout << "Listening on " << hostname << ":" << port << std::endl;
ix::WebSocketServer server(port); ix::WebSocketServer server(port, hostname);
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket) [](std::shared_ptr<ix::WebSocket> webSocket)
{ {
webSocket->setOnMessageCallback( webSocket->setOnMessageCallback(
[webSocket, &server](ix::WebSocketMessageType messageType, [webSocket](ix::WebSocketMessageType messageType,
const std::string& str, const std::string& str,
size_t wireSize, size_t wireSize,
const ix::WebSocketErrorInfo& error, const ix::WebSocketErrorInfo& error,
@ -39,7 +39,18 @@ namespace ix
} }
else if (messageType == ix::WebSocket_MessageType_Close) else if (messageType == ix::WebSocket_MessageType_Close)
{ {
std::cerr << "Closed connection" << std::endl; std::cerr << "Closed connection"
<< " code " << closeInfo.code
<< " reason " << closeInfo.reason << std::endl;
}
else if (messageType == ix::WebSocket_MessageType_Error)
{
std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
std::cerr << ss.str();
} }
else if (messageType == ix::WebSocket_MessageType_Message) else if (messageType == ix::WebSocket_MessageType_Message)
{ {

View File

@ -107,6 +107,12 @@ namespace ix
{ {
std::cout << msg; std::cout << msg;
}; };
args.onProgressCallback = [](int current, int total) -> bool
{
std::cerr << "\r" << "Downloaded "
<< current << " bytes out of " << total;
return true;
};
HttpParameters httpParameters = parsePostParameters(data); HttpParameters httpParameters = parsePostParameters(data);
@ -125,6 +131,8 @@ namespace ix
out = httpClient.post(url, httpParameters, args); out = httpClient.post(url, httpParameters, args);
} }
std::cerr << std::endl;
auto statusCode = std::get<0>(out); auto statusCode = std::get<0>(out);
auto errorCode = std::get<1>(out); auto errorCode = std::get<1>(out);
auto responseHeaders = std::get<2>(out); auto responseHeaders = std::get<2>(out);

View File

@ -61,10 +61,19 @@ namespace ix
const ix::WebSocketOpenInfo& openInfo, const ix::WebSocketOpenInfo& openInfo,
const ix::WebSocketCloseInfo& closeInfo) const ix::WebSocketCloseInfo& closeInfo)
{ {
std::cerr << "Received " << wireSize << " bytes" << std::endl;
std::stringstream ss; std::stringstream ss;
if (messageType == ix::WebSocket_MessageType_Open) if (messageType == ix::WebSocket_MessageType_Open)
{ {
log("ping_pong: connected"); log("ping_pong: connected");
std::cout << "Uri: " << openInfo.uri << std::endl;
std::cout << "Handshake Headers:" << std::endl;
for (auto it : openInfo.headers)
{
std::cout << it.first << ": " << it.second << std::endl;
}
} }
else if (messageType == ix::WebSocket_MessageType_Close) else if (messageType == ix::WebSocket_MessageType_Close)
{ {
@ -153,5 +162,7 @@ namespace ix
std::cout << std::endl; std::cout << std::endl;
webSocketPingPong.stop(); webSocketPingPong.stop();
return 0;
} }
} }

View File

@ -146,11 +146,16 @@ namespace ix
std::string filename = data["filename"].string_value(); std::string filename = data["filename"].string_value();
filename = extractFilename(filename); filename = extractFilename(filename);
std::cout << "Writing to disk: " << filename << std::endl; std::string filenameTmp = filename + ".tmp";
std::ofstream out(filename);
std::cout << "Writing to disk: " << filenameTmp << std::endl;
std::ofstream out(filenameTmp);
out.write((char*)&content.front(), content.size()); out.write((char*)&content.front(), content.size());
out.close(); out.close();
std::cout << "Renaming " << filenameTmp << " to " << filename << std::endl;
rename(filenameTmp.c_str(), filename.c_str());
std::map<MsgPack, MsgPack> pdu; std::map<MsgPack, MsgPack> pdu;
pdu["ack"] = true; pdu["ack"] = true;
pdu["id"] = data["id"]; pdu["id"] = data["id"];
@ -206,6 +211,11 @@ namespace ix
handleMessage(str); handleMessage(str);
_condition.notify_one(); _condition.notify_one();
} }
else if (messageType == ix::WebSocket_MessageType_Fragment)
{
ss << "ws_receive: received fragment";
log(ss.str());
}
else if (messageType == ix::WebSocket_MessageType_Error) else if (messageType == ix::WebSocket_MessageType_Error)
{ {
ss << "Connection error: " << error.reason << std::endl; ss << "Connection error: " << error.reason << std::endl;

View File

@ -257,6 +257,15 @@ namespace ix
return true; return true;
}); });
do
{
size_t bufferedAmount = _webSocket.bufferedAmount();
std::cout << bufferedAmount << " bytes left to be sent" << std::endl;
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
} while (_webSocket.bufferedAmount() != 0);
bench.report(); bench.report();
auto duration = bench.getDuration(); auto duration = bench.getDuration();
auto transferRate = 1000 * content.size() / duration; auto transferRate = 1000 * content.size() / duration;

View File

@ -10,11 +10,11 @@
namespace ix namespace ix
{ {
int ws_transfer_main(int port) int ws_transfer_main(int port, const std::string& hostname)
{ {
std::cout << "Listening on port " << port << std::endl; std::cout << "Listening on " << hostname << ":" << port << std::endl;
ix::WebSocketServer server(port); ix::WebSocketServer server(port, hostname);
server.setOnConnectionCallback( server.setOnConnectionCallback(
[&server](std::shared_ptr<ix::WebSocket> webSocket) [&server](std::shared_ptr<ix::WebSocket> webSocket)
@ -39,7 +39,22 @@ namespace ix
} }
else if (messageType == ix::WebSocket_MessageType_Close) else if (messageType == ix::WebSocket_MessageType_Close)
{ {
std::cerr << "Closed connection" << std::endl; std::cerr << "Closed connection"
<< " code " << closeInfo.code
<< " reason " << closeInfo.reason << std::endl;
}
else if (messageType == ix::WebSocket_MessageType_Error)
{
std::stringstream ss;
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
std::cerr << ss.str();
}
else if (messageType == ix::WebSocket_MessageType_Fragment)
{
std::cerr << "Received message fragment" << std::endl;
} }
else if (messageType == ix::WebSocket_MessageType_Message) else if (messageType == ix::WebSocket_MessageType_Message)
{ {
@ -48,7 +63,22 @@ namespace ix
{ {
if (client != webSocket) if (client != webSocket)
{ {
client->send(str); client->send(str,
[](int current, int total) -> bool
{
std::cerr << "Step " << current
<< " out of " << total << std::endl;
return true;
});
do
{
size_t bufferedAmount = client->bufferedAmount();
std::cerr << bufferedAmount << " bytes left to be sent" << std::endl;
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
} while (client->bufferedAmount() != 0);
} }
} }
} }