First import

This commit is contained in:
Benjamin Sergeant 2018-09-27 14:56:48 -07:00
parent 65bd7d5b4c
commit 9b8d6cedfe
17 changed files with 21386 additions and 1 deletions

29
LICENSE.txt Normal file
View File

@ -0,0 +1,29 @@
Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the
distribution.
3. Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

164
README.md
View File

@ -1 +1,163 @@
# IXWebSocket
# General
## Introduction
[*WebSocket*](https://en.wikipedia.org/wiki/WebSocket) is a computer communications protocol, providing full-duplex
communication channels over a single TCP connection. This library provides a C++ library for Websocket communication. The code is derived from [easywsclient](https://github.com/dhbaird/easywsclient).
## Examples
The examples folder countains a simple chat program, using a node.js broadcast server.
Here is what the API looks like.
```
ix::WebSocket webSocket;
std::string url("ws://localhost:8080/");
webSocket.configure(url);
// Setup a callback to be fired when a message or an event (open, close, error) is received
webSocket.setOnMessageCallback(
[](ix::WebSocketMessageType messageType, const std::string& str, ix::WebSocketErrorInfo error)
{
if (messageType == ix::WebSocket_MessageType_Message)
{
std::cout << str << std::endl;
}
});
// Now that our callback is setup, we can start our background thread and receive messages
webSocket.start();
// Send a message to the server
webSocket.send("hello world");
// ... finally ...
// Stop the connection
webSocket:stop()
```
## Implementation details
### TLS/SSL
Connections can be optionally secured and encrypted with TLS/SSL when using a wss:// endpoint, or using normal un-encrypted socket with ws:// endpoints. AppleSSL is used on iOS and OpenSSL is used on Android.
### Polling and background thread work
No manual polling to fetch data is required. Data is sent and received instantly by using a background thread for receiving data and the select [system](http://man7.org/linux/man-pages/man2/select.2.html) call to be notified by the OS of incoming data. No timeout is used for select so that the background thread is only woken up when data is available, to optimize battery life. This is also the recommended way of using select according to the select tutorial, section [select law](https://linux.die.net/man/2/select_tut). Read and Writes to the socket are non blocking. Data is sent right away and not enqueued by writing directly to the socket, which is [possible](https://stackoverflow.com/questions/1981372/are-parallel-calls-to-send-recv-on-the-same-socket-valid) since system socket implementations allow concurrent read/writes. However concurrent writes need to be protected with mutex.
### Automatic reconnection
If the remote end (server) breaks the connection, the code will try to perpetually reconnect, by using an exponential backoff strategy, capped at one retry every 10 seconds.
## Limitations
* There is no per message compression support. That could be useful for retrieving large messages, but could also be implemented at the application level. However that would conflict with auto-serialiasation.
* There is no text support for sending data, only the binary protocol is supported. Sending json or text over the binary protocol works well.
* Automatic reconnection works at the TCP socket level, and will detect remote end disconnects. However, if the device/computer network become unreachable (by turning off wifi), it is quite hard to reliably and timely detect it at the socket level using `recv` and `send` error codes. [Here](https://stackoverflow.com/questions/14782143/linux-socket-how-to-detect-disconnected-network-in-a-client-program) is a good discussion on the subject. This behavior is consistent with other runtimes such as node.js. One way to detect a disconnected device with low level C code is to do a name resolution with DNS but this can be expensive. Mobile devices have good and reliable API to do that.
## Examples
1. Bring up a terminal and jump to the examples folder.
2. Compile the example C++ code. `sh build.sh`
3. Install node.js from [here](https://nodejs.org/en/download/).
4. Type `npm install` to install the node.js dependencies. Then `node broadcast-server.js` to run the server.
5. Bring up a second terminal. `env USER=bob ./cmd_websocket_chat`
6. Bring up a third terminal. `env USER=bill ./cmd_websocket_chat`
7. Start typing things in any of those terminals. Hopefully you should see your message being received on the other end.
## C++ code organization
Here's a simplistic diagram which explains how the code is structured in term of class/modules.
```
+-----------------------+
| | Start the receiving Background thread. Auto reconnection. Simple websocket Ping.
| IXWebSocket | Interface used by C++ test clients. No IX dependencies.
| |
+-----------------------+
| |
| IXWebSocketTransport | Low level websocket code, framing, managing raw socket. Adapted from easywsclient.
| |
+-----------------------+
| |
| IXWebSocket | ws:// Unencrypted Socket handler
| IXWebSocketAppleSSL | wss:// TLS encrypted Socket AppleSSL handler. Used on iOS and macOS
| IXWebSocketOpenSSL | wss:// TLS encrypted Socket OpenSSL handler. Used on Android and Linux
| | Can be used on macOS too.
+-----------------------+
```
## Advanced usage
### Sending messages
`websocket:send("foo")` will send a message.
If the connection was closed and sending failed, the return value will be set to false.
### ReadyState
`getReadyState()` returns the state of the connection. There are 4 possible states.
1. WebSocket_ReadyState_Connecting - The connection is not yet open.
2. WebSocket_ReadyState_Open - The connection is open and ready to communicate.
3. WebSocket_ReadyState_Closing - The connection is in the process of closing.
4. WebSocket_MessageType_Close - The connection is closed or couldn't be opened.
### Open and Close notifications
The onMessage event will be fired when the connection is opened or closed. This is similar to the [Javascript browser API](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket), which has `open` and `close` events notification that can be registered with the browser `addEventListener`.
```
webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, const std::string& str, ix::WebSocketErrorInfo error)
{
if (messageType == ix::WebSocket_MessageType_Open)
{
puts("send greetings");
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
puts("disconnected");
}
}
);
```
### Error notification
A message will be fired when there is an error with the connection. The message type will be `ix::WebSocket_MessageType_Error`. Multiple fields will be available on the event to describe the error.
```
webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, const std::string& str, ix::WebSocketErrorInfo error)
{
if (messageType == ix::WebSocket_MessageType_Error)
{
ss << "cmd_websocket_chat: Error ! " << error.reason << std::endl;
ss << "#retries: " << event.retries << std::endl;
ss << "Wait time(ms): " << event.waitTime << std::endl;
ss << "HTTP Status: " << event.httpStatus << std::endl;
}
}
);
```
### start, stop
1. `websocket.start()` connect to the remote server and starts the message receiving background thread.
2. `websocket.stop()` disconnect from the remote server and closes the background thread.
### Configuring the remote url
The url can be set and queried after a websocket object has been created. You will have to call `stop` and `start` if you want to disconnect and connect to that new url.
```
std::string url = 'wss://example.com'
websocket.configure(url);
assert(websocket:getUrl() == url)
```

View File

@ -0,0 +1,28 @@
/*
* cmd_websocket_chat.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*/
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });
// Broadcast to all.
wss.broadcast = function broadcast(data) {
wss.clients.forEach(function each(client) {
if (client.readyState === WebSocket.OPEN) {
client.send(data);
}
});
};
wss.on('connection', function connection(ws) {
ws.on('message', function incoming(data) {
// Broadcast to everyone else.
wss.clients.forEach(function each(client) {
if (client !== ws && client.readyState === WebSocket.OPEN) {
client.send(data);
}
});
});
});

15
examples/build.sh Normal file
View File

@ -0,0 +1,15 @@
#!/bin/sh
#
# Author: Benjamin Sergeant
# Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
#
clang++ --std=c++11 --stdlib=libc++ \
../ixwebsocket/IXSocket.cpp \
../ixwebsocket/IXWebSocketTransport.cpp \
../ixwebsocket/IXSocketAppleSSL.cpp \
../ixwebsocket/IXWebSocket.cpp \
cmd_websocket_chat.cpp \
-o cmd_websocket_chat \
-framework Security \
-framework Foundation

View File

@ -0,0 +1,190 @@
/*
* cmd_websocket_chat.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*/
//
// Simple chat program that talks to the node.js server at
// websocket_chat_server/broacast-server.js
//
#include <iostream>
#include <sstream>
#include <queue>
#include "../ixwebsocket/IXWebSocket.h"
#include "nlohmann/json.hpp"
// for convenience
using json = nlohmann::json;
using namespace ix;
namespace
{
void log(const std::string& msg)
{
std::cout << msg << std::endl;
}
class WebSocketChat
{
public:
WebSocketChat(const std::string& user);
void subscribe(const std::string& channel);
void start();
void stop();
bool isReady() const;
void sendMessage(const std::string& text);
size_t getReceivedMessagesCount() const;
std::string encodeMessage(const std::string& text);
std::pair<std::string, std::string> decodeMessage(const std::string& str);
private:
std::string _user;
ix::WebSocket _webSocket;
std::queue<std::string> _receivedQueue;
};
WebSocketChat::WebSocketChat(const std::string& user) :
_user(user)
{
;
}
size_t WebSocketChat::getReceivedMessagesCount() const
{
return _receivedQueue.size();
}
bool WebSocketChat::isReady() const
{
return _webSocket.getReadyState() == ix::WebSocket_ReadyState_Open;
}
void WebSocketChat::stop()
{
_webSocket.stop();
}
void WebSocketChat::start()
{
std::string url("ws://localhost:8080/");
_webSocket.configure(url);
std::stringstream ss;
log(std::string("Connecting to url: ") + url);
_webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, const std::string& str, ix::WebSocketErrorInfo error)
{
std::stringstream ss;
if (messageType == ix::WebSocket_MessageType_Open)
{
ss << "cmd_websocket_chat: user "
<< _user
<< " Connected !";
log(ss.str());
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
ss << "cmd_websocket_chat: user "
<< _user
<< " disconnected !";
log(ss.str());
}
else if (messageType == ix::WebSocket_MessageType_Message)
{
auto result = decodeMessage(str);
// Our "chat" / "broacast" node.js server does not send us
// the messages we send, so we don't have to filter it out.
// store text
_receivedQueue.push(result.second);
ss << std::endl
<< result.first << " > " << result.second
<< std::endl
<< _user << " > ";
log(ss.str());
}
else if (messageType == ix::WebSocket_MessageType_Error)
{
ss << "cmd_websocket_chat: Error ! " << error.reason;
log(ss.str());
}
else
{
ss << "Invalid ix::WebSocketMessageType";
log(ss.str());
}
});
_webSocket.start();
}
std::pair<std::string, std::string> WebSocketChat::decodeMessage(const std::string& str)
{
auto j = json::parse(str);
std::string msg_user = j["user"];
std::string msg_text = j["text"];
return std::pair<std::string, std::string>(msg_user, msg_text);
}
std::string WebSocketChat::encodeMessage(const std::string& text)
{
json j;
j["user"] = _user;
j["text"] = text;
std::string output = j.dump();
return output;
}
void WebSocketChat::sendMessage(const std::string& text)
{
_webSocket.send(encodeMessage(text));
}
void interactiveMain()
{
std::string user(getenv("USER"));
WebSocketChat webSocketChat(user);
std::cout << "Type Ctrl-D to exit prompt..." << std::endl;
webSocketChat.start();
while (true)
{
std::string text;
std::cout << user << " > " << std::flush;
std::getline(std::cin, text);
if (!std::cin)
{
break;
}
webSocketChat.sendMessage(text);
}
std::cout << std::endl;
webSocketChat.stop();
}
}
int main()
{
interactiveMain();
return 0;
}

18878
examples/nlohmann/json.hpp Normal file

File diff suppressed because it is too large Load Diff

6
examples/package.json Normal file
View File

@ -0,0 +1,6 @@
{
"dependencies": {
"msgpack-js": "^0.3.0",
"ws": "^3.1.0"
}
}

251
ixwebsocket/IXSocket.cpp Normal file
View File

@ -0,0 +1,251 @@
/*
* IXSocket.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*/
#include "IXSocket.h"
#include <netdb.h>
#include <netinet/tcp.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdint.h>
#include <sys/select.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <assert.h>
//
// Linux/Android has a special type of virtual files. select(2) will react
// when reading/writing to those files, unlike closing sockets.
//
// https://linux.die.net/man/2/eventfd
//
// eventfd was added in Linux kernel 2.x, and our oldest Android (Kitkat 4.4)
// is on Kernel 3.x
//
// cf Android/Kernel table here
// https://android.stackexchange.com/questions/51651/which-android-runs-which-linux-kernel
//
#ifndef __APPLE__
# include <sys/eventfd.h>
#endif
// Android needs extra headers for TCP_NODELAY and IPPROTO_TCP
#ifdef ANDROID
# include <linux/in.h>
# include <linux/tcp.h>
#endif
namespace ix
{
Socket::Socket() :
_sockfd(-1),
_eventfd(-1)
{
#ifndef __APPLE__
_eventfd = eventfd(0, 0);
assert(_eventfd != -1 && "Panic - eventfd not functioning on this platform");
#endif
}
Socket::~Socket()
{
close();
#ifndef __APPLE__
::close(_eventfd);
#endif
}
bool connectToAddress(const struct addrinfo *address,
int& sockfd,
std::string& errMsg)
{
sockfd = -1;
int fd = socket(address->ai_family,
address->ai_socktype,
address->ai_protocol);
if (fd < 0)
{
errMsg = "Cannot create a socket";
return false;
}
int maxRetries = 3;
for (int i = 0; i < maxRetries; ++i)
{
if (connect(fd, address->ai_addr, address->ai_addrlen) != -1)
{
sockfd = fd;
return true;
}
// EINTR means we've been interrupted, in which case we try again.
if (errno != EINTR) break;
}
::close(fd);
sockfd = -1;
errMsg = strerror(errno);
return false;
}
int Socket::hostname_connect(const std::string& hostname,
int port,
std::string& errMsg)
{
struct addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_flags = AI_ADDRCONFIG | AI_NUMERICSERV;
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
std::string sport = std::to_string(port);
struct addrinfo *res = nullptr;
int getaddrinfo_result = getaddrinfo(hostname.c_str(), sport.c_str(),
&hints, &res);
if (getaddrinfo_result)
{
errMsg = gai_strerror(getaddrinfo_result);
return -1;
}
int sockfd = -1;
// iterate through the records to find a working peer
struct addrinfo *address;
bool success = false;
for (address = res; address != nullptr; address = address->ai_next)
{
success = connectToAddress(address, sockfd, errMsg);
if (success)
{
break;
}
}
freeaddrinfo(res);
return sockfd;
}
void Socket::configure()
{
int flag = 1;
setsockopt(_sockfd, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, sizeof(flag)); // Disable Nagle's algorithm
fcntl(_sockfd, F_SETFL, O_NONBLOCK); // make socket non blocking
#ifdef SO_NOSIGPIPE
int value = 1;
setsockopt(_sockfd, SOL_SOCKET, SO_NOSIGPIPE,
(void *)&value, sizeof(value));
#endif
}
void Socket::poll(const OnPollCallback& onPollCallback)
{
if (_sockfd == -1)
{
onPollCallback();
return;
}
fd_set rfds;
FD_ZERO(&rfds);
FD_SET(_sockfd, &rfds);
#ifndef __APPLE__
FD_SET(_eventfd, &rfds);
#endif
int sockfd = _sockfd;
int nfds = std::max(sockfd, _eventfd);
select(nfds + 1, &rfds, nullptr, nullptr, nullptr);
onPollCallback();
}
void Socket::wakeUpFromPollApple()
{
close(); // All OS but Linux will wake up select
// when closing the file descriptor watched by select
}
void Socket::wakeUpFromPollLinux()
{
std::string str("\n"); // this will wake up the thread blocked on select
const void* buf = reinterpret_cast<const void*>(str.c_str());
write(_eventfd, buf, str.size());
}
void Socket::wakeUpFromPoll()
{
#ifdef __APPLE__
wakeUpFromPollApple();
#else
wakeUpFromPollLinux();
#endif
}
bool Socket::connect(const std::string& host,
int port,
std::string& errMsg)
{
std::lock_guard<std::mutex> lock(_socketMutex);
#ifndef __APPLE__
if (_eventfd == -1)
{
return false; // impossible to use this socket if eventfd is broken
}
#endif
_sockfd = Socket::hostname_connect(host, port, errMsg);
return _sockfd != -1;
}
void Socket::close()
{
std::lock_guard<std::mutex> lock(_socketMutex);
if (_sockfd == -1) return;
::close(_sockfd);
_sockfd = -1;
}
int Socket::send(char* buffer, size_t length)
{
int flags = 0;
#ifdef MSG_NOSIGNAL
flags = MSG_NOSIGNAL;
#endif
return (int) ::send(_sockfd, buffer, length, flags);
}
int Socket::send(const std::string& buffer)
{
return send((char*)&buffer[0], buffer.size());
}
int Socket::recv(void* buffer, size_t length)
{
int flags = 0;
#ifdef MSG_NOSIGNAL
flags = MSG_NOSIGNAL;
#endif
return (int) ::recv(_sockfd, buffer, length, flags);
}
}

49
ixwebsocket/IXSocket.h Normal file
View File

@ -0,0 +1,49 @@
/*
* IXSocket.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include <string>
#include <functional>
#include <mutex>
namespace ix
{
class Socket {
public:
using OnPollCallback = std::function<void()>;
Socket();
virtual ~Socket();
static int hostname_connect(const std::string& hostname,
int port,
std::string& errMsg);
void configure();
virtual void poll(const OnPollCallback& onPollCallback);
virtual void wakeUpFromPoll();
// Virtual methods
virtual bool connect(const std::string& url,
int port,
std::string& errMsg);
virtual void close();
virtual int send(char* buffer, size_t length);
virtual int send(const std::string& buffer);
virtual int recv(void* buffer, size_t length);
protected:
void wakeUpFromPollApple();
void wakeUpFromPollLinux();
std::atomic<int> _sockfd;
int _eventfd;
std::mutex _socketMutex;
};
}

View File

@ -0,0 +1,259 @@
/*
* IXSocketAppleSSL.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*
* Adapted from Satori SDK Apple SSL code.
*/
#include "IXSocketAppleSSL.h"
#include <fcntl.h>
#include <netdb.h>
#include <netinet/tcp.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdint.h>
#include <iostream>
#include <errno.h>
#define socketerrno errno
#include <Security/SecureTransport.h>
namespace {
OSStatus read_from_socket(SSLConnectionRef connection, void *data, size_t *len)
{
int fd = (int) (long) connection;
if (fd < 0)
return errSSLInternal;
assert(data != nullptr);
assert(len != nullptr);
size_t requested_sz = *len;
ssize_t status = read(fd, data, requested_sz);
if (status > 0)
{
*len = (size_t) status;
if (requested_sz > *len)
return errSSLWouldBlock;
else
return noErr;
}
else if (0 == status)
{
*len = 0;
return errSSLClosedGraceful;
}
else
{
*len = 0;
switch (errno) {
case ENOENT:
return errSSLClosedGraceful;
case EAGAIN:
return errSSLWouldBlock;
case ECONNRESET:
return errSSLClosedAbort;
default:
return errSecIO;
}
}
}
OSStatus write_to_socket(SSLConnectionRef connection, const void *data, size_t *len)
{
int fd = (int) (long) connection;
if (fd < 0)
return errSSLInternal;
assert(data != nullptr);
assert(len != nullptr);
size_t to_write_sz = *len;
ssize_t status = write(fd, data, to_write_sz);
if (status > 0)
{
*len = (size_t) status;
if (to_write_sz > *len)
return errSSLWouldBlock;
else
return noErr;
}
else if (0 == status)
{
*len = 0;
return errSSLClosedGraceful;
}
else
{
*len = 0;
if (EAGAIN == errno)
{
return errSSLWouldBlock;
}
else
{
return errSecIO;
}
}
}
std::string getSSLErrorDescription(OSStatus status)
{
std::string errMsg("Unknown SSL error.");
CFErrorRef error = CFErrorCreate(kCFAllocatorDefault, kCFErrorDomainOSStatus, status, NULL);
if (error)
{
CFStringRef message = CFErrorCopyDescription(error);
if (message)
{
char localBuffer[128];
Boolean success;
success = CFStringGetCString(message, localBuffer, 128,
CFStringGetSystemEncoding());
if (success)
{
errMsg = localBuffer;
}
CFRelease(message);
}
CFRelease(error);
}
return errMsg;
}
} // anonymous namespace
namespace ix
{
SocketAppleSSL::SocketAppleSSL() :
_sslContext(nullptr)
{
;
}
SocketAppleSSL::~SocketAppleSSL()
{
SocketAppleSSL::close();
}
// No wait support
bool SocketAppleSSL::connect(const std::string& host,
int port,
std::string& errMsg)
{
OSStatus status;
{
std::lock_guard<std::mutex> lock(_mutex);
_sockfd = Socket::hostname_connect(host, port, errMsg);
if (_sockfd == -1) return false;
_sslContext = SSLCreateContext(kCFAllocatorDefault, kSSLClientSide, kSSLStreamType);
SSLSetIOFuncs(_sslContext, read_from_socket, write_to_socket);
SSLSetConnection(_sslContext, (SSLConnectionRef) (long) _sockfd);
SSLSetProtocolVersionMin(_sslContext, kTLSProtocol12);
SSLSetPeerDomainName(_sslContext, host.c_str(), host.size());
do {
status = SSLHandshake(_sslContext);
} while (errSSLWouldBlock == status ||
errSSLServerAuthCompleted == status);
}
if (noErr != status)
{
errMsg = getSSLErrorDescription(status);
close();
return false;
}
return true;
}
void SocketAppleSSL::close()
{
std::lock_guard<std::mutex> lock(_mutex);
if (_sslContext == nullptr) return;
SSLClose(_sslContext);
CFRelease(_sslContext);
_sslContext = nullptr;
Socket::close();
}
int SocketAppleSSL::send(char* buf, size_t nbyte)
{
ssize_t ret = 0;
OSStatus status;
do {
size_t processed = 0;
std::lock_guard<std::mutex> lock(_mutex);
status = SSLWrite(_sslContext, buf, nbyte, &processed);
ret += processed;
buf += processed;
nbyte -= processed;
} while (nbyte > 0 && errSSLWouldBlock == status);
if (ret == 0 && errSSLClosedAbort != status)
ret = -1;
return (int) ret;
}
int SocketAppleSSL::send(const std::string& buffer)
{
return send((char*)&buffer[0], buffer.size());
}
// No wait support
int SocketAppleSSL::recv(void* buf, size_t nbyte)
{
OSStatus status = errSSLWouldBlock;
while (errSSLWouldBlock == status)
{
size_t processed = 0;
std::lock_guard<std::mutex> lock(_mutex);
status = SSLRead(_sslContext, buf, nbyte, &processed);
if (processed > 0)
return (int) processed;
// The connection was reset, inform the caller that this
// Socket should close
if (status == errSSLClosedGraceful ||
status == errSSLClosedNoNotify ||
status == errSSLClosedAbort)
{
errno = ECONNRESET;
return -1;
}
if (status == errSSLWouldBlock)
{
errno = EWOULDBLOCK;
return -1;
}
}
return -1;
}
}

View File

@ -0,0 +1,38 @@
/*
* IXSocketAppleSSL.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include "IXSocket.h"
#include <Security/Security.h>
#include <Security/SecureTransport.h>
#include <mutex>
namespace ix
{
class SocketAppleSSL : public Socket
{
public:
SocketAppleSSL();
~SocketAppleSSL();
virtual bool connect(const std::string& host,
int port,
std::string& errMsg) final;
virtual void close() final;
virtual int send(char* buffer, size_t length) final;
virtual int send(const std::string& buffer) final;
virtual int recv(void* buffer, size_t length) final;
private:
SSLContextRef _sslContext;
mutable std::mutex _mutex; // AppleSSL routines are not thread-safe
};
}

View File

@ -0,0 +1,342 @@
/*
* IXSocketOpenSSL.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*
* Adapted from Satori SDK OpenSSL code.
*/
#include "IXSocketOpenSSL.h"
#include <cassert>
#include <iostream>
#include <errno.h>
#define socketerrno errno
namespace {
std::mutex initMutex;
bool openSSLInitialized = false;
bool openSSLInitializationSuccessful = false;
bool openSSLInitialize(std::string& errMsg)
{
std::lock_guard<std::mutex> lock(initMutex);
if (openSSLInitialized)
{
return openSSLInitializationSuccessful;
}
#if OPENSSL_VERSION_NUMBER >= 0x10100000L
if (!OPENSSL_init_ssl(OPENSSL_INIT_LOAD_CONFIG, nullptr))
{
errMsg = "OPENSSL_init_ssl failure";
openSSLInitializationSuccessful = false;
openSSLInitialized = true;
return false;
}
#else
(void) OPENSSL_config(nullptr);
#endif
(void) OpenSSL_add_ssl_algorithms();
(void) SSL_load_error_strings();
openSSLInitializationSuccessful = true;
return true;
}
int openssl_verify_callback(int preverify, X509_STORE_CTX *x509_ctx)
{
return preverify;
}
/* create new SSL connection state object */
SSL *openssl_create_connection(SSL_CTX *ctx, int socket)
{
assert(ctx != nullptr);
assert(socket > 0);
SSL *ssl = SSL_new(ctx);
if (ssl)
SSL_set_fd(ssl, socket);
return ssl;
}
bool openssl_check_server_cert(SSL *ssl, std::string& errMsg)
{
X509 *server_cert = SSL_get_peer_certificate(ssl);
if (server_cert == nullptr)
{
errMsg = "OpenSSL failed - peer didn't present a X509 certificate.";
return false;
}
X509_free(server_cert);
return true;
}
} // anonymous namespace
namespace ix
{
SocketOpenSSL::SocketOpenSSL() :
_ssl_connection(nullptr),
_ssl_context(nullptr)
{
;
}
SocketOpenSSL::~SocketOpenSSL()
{
SocketOpenSSL::close();
}
std::string SocketOpenSSL::getSSLError(int ret)
{
unsigned long e;
int err = SSL_get_error(_ssl_connection, ret);
if (err == SSL_ERROR_WANT_CONNECT || err == SSL_ERROR_WANT_ACCEPT)
{
return "OpenSSL failed - connection failure";
}
else if (err == SSL_ERROR_WANT_X509_LOOKUP)
{
return "OpenSSL failed - x509 error";
}
else if (err == SSL_ERROR_SYSCALL)
{
e = ERR_get_error();
if (e > 0)
{
std::string errMsg("OpenSSL failed - ");
errMsg += ERR_error_string(e, nullptr);
return errMsg;
}
else if (e == 0 && ret == 0)
{
return "OpenSSL failed - received early EOF";
}
else
{
return "OpenSSL failed - underlying BIO reported an I/O error";
}
}
else if (err == SSL_ERROR_SSL)
{
e = ERR_get_error();
std::string errMsg("OpenSSL failed - ");
errMsg += ERR_error_string(e, nullptr);
return errMsg;
}
else if (err == SSL_ERROR_NONE)
{
return "OpenSSL failed - err none";
}
else if (err == SSL_ERROR_ZERO_RETURN)
{
return "OpenSSL failed - err zero return";
}
else
{
return "OpenSSL failed - unknown error";
}
}
SSL_CTX* SocketOpenSSL::openSSLCreateContext(std::string& errMsg)
{
const SSL_METHOD* method = SSLv23_client_method();
if (method == nullptr)
{
errMsg = "SSLv23_client_method failure";
return nullptr;
}
_ssl_method = method;
SSL_CTX* ctx = SSL_CTX_new(_ssl_method);
if (ctx)
{
SSL_CTX_set_verify(ctx, SSL_VERIFY_PEER, openssl_verify_callback);
SSL_CTX_set_verify_depth(ctx, 4);
SSL_CTX_set_options(ctx, SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3);
}
return ctx;
}
bool SocketOpenSSL::openSSLHandshake(std::string& errMsg)
{
while (true)
{
if (_ssl_connection == nullptr || _ssl_context == nullptr)
{
return false;
}
ERR_clear_error();
int connect_result = SSL_connect(_ssl_connection);
if (connect_result == 1)
{
return openssl_check_server_cert(_ssl_connection, errMsg);
}
int reason = SSL_get_error(_ssl_connection, connect_result);
bool rc = false;
if (reason == SSL_ERROR_WANT_READ || reason == SSL_ERROR_WANT_WRITE)
{
rc = true;
}
else
{
errMsg = getSSLError(connect_result);
rc = false;
}
if (!rc)
{
return false;
}
}
}
// No wait support
bool SocketOpenSSL::connect(const std::string& host,
int port,
std::string& errMsg)
{
bool handshakeSuccessful = false;
{
std::lock_guard<std::mutex> lock(_mutex);
if (!openSSLInitialize(errMsg))
{
return false;
}
_sockfd = Socket::hostname_connect(host, port, errMsg);
if (_sockfd == -1) return false;
_ssl_context = openSSLCreateContext(errMsg);
if (_ssl_context == nullptr)
{
return false;
}
ERR_clear_error();
int cert_load_result = SSL_CTX_set_default_verify_paths(_ssl_context);
if (cert_load_result == 0)
{
unsigned long ssl_err = ERR_get_error();
errMsg = "OpenSSL failed - SSL_CTX_default_verify_paths loading failed: ";
errMsg += ERR_error_string(ssl_err, nullptr);
}
_ssl_connection = openssl_create_connection(_ssl_context, _sockfd);
if (nullptr == _ssl_connection)
{
errMsg = "OpenSSL failed to connect";
SSL_CTX_free(_ssl_context);
_ssl_context = nullptr;
return false;
}
handshakeSuccessful = openSSLHandshake(errMsg);
}
if (!handshakeSuccessful)
{
close();
return false;
}
return true;
}
void SocketOpenSSL::close()
{
std::lock_guard<std::mutex> lock(_mutex);
if (_ssl_connection != nullptr)
{
SSL_free(_ssl_connection);
_ssl_connection = nullptr;
}
if (_ssl_context != nullptr)
{
SSL_CTX_free(_ssl_context);
_ssl_context = nullptr;
}
Socket::close();
}
int SocketOpenSSL::send(char* buf, size_t nbyte)
{
ssize_t sent = 0;
while (nbyte > 0)
{
std::lock_guard<std::mutex> lock(_mutex);
if (_ssl_connection == nullptr || _ssl_context == nullptr)
{
return 0;
}
ERR_clear_error();
int write_result = SSL_write(_ssl_connection, buf + sent, (int) nbyte);
int reason = SSL_get_error(_ssl_connection, write_result);
if (reason == SSL_ERROR_NONE) {
nbyte -= write_result;
sent += write_result;
} else if (reason == SSL_ERROR_WANT_READ || reason == SSL_ERROR_WANT_WRITE) {
errno = EWOULDBLOCK;
return -1;
} else {
return -1;
}
}
return (int) sent;
}
int SocketOpenSSL::send(const std::string& buffer)
{
return send((char*)&buffer[0], buffer.size());
}
// No wait support
int SocketOpenSSL::recv(void* buf, size_t nbyte)
{
while (true)
{
std::lock_guard<std::mutex> lock(_mutex);
if (_ssl_connection == nullptr || _ssl_context == nullptr)
{
return 0;
}
ERR_clear_error();
int read_result = SSL_read(_ssl_connection, buf, (int) nbyte);
if (read_result > 0)
{
return read_result;
}
int reason = SSL_get_error(_ssl_connection, read_result);
if (reason == SSL_ERROR_WANT_READ || reason == SSL_ERROR_WANT_WRITE)
{
errno = EWOULDBLOCK;
return -1;
} else {
return -1;
}
}
}
}

View File

@ -0,0 +1,47 @@
/*
* IXSocketOpenSSL.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include "IXSocket.h"
#include <openssl/bio.h>
#include <openssl/hmac.h>
#include <openssl/conf.h>
#include <openssl/err.h>
#include <openssl/ssl.h>
#include <mutex>
namespace ix
{
class SocketOpenSSL : public Socket
{
public:
SocketOpenSSL();
~SocketOpenSSL();
virtual bool connect(const std::string& host,
int port,
std::string& errMsg) final;
virtual void close() final;
virtual int send(char* buffer, size_t length) final;
virtual int send(const std::string& buffer) final;
virtual int recv(void* buffer, size_t length) final;
private:
std::string getSSLError(int ret);
SSL_CTX* openSSLCreateContext(std::string& errMsg);
bool openSSLHandshake(std::string& errMsg);
SSL_CTX* _ssl_context;
SSL* _ssl_connection;
const SSL_METHOD* _ssl_method;
mutable std::mutex _mutex; // OpenSSL routines are not thread-safe
};
}

260
ixwebsocket/IXWebSocket.cpp Normal file
View File

@ -0,0 +1,260 @@
/*
* IXWebSocket.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*/
#include "IXWebSocket.h"
#include <iostream>
#include <cmath>
#include <cassert>
namespace {
// FIXME: put this in a shared location, and use it in
uint64_t calculateRetryWaitMilliseconds(uint64_t retry_count)
{
// This will overflow quite fast for large value of retry_count
// and will become 0, in which case the wait time will be none
// and we'll be constantly retrying to connect.
uint64_t wait_time = ((uint64_t) std::pow(2, retry_count) * 100L);
// cap the wait time to 10s, or to retry_count == 10 for which wait_time > 10s
uint64_t tenSeconds = 10 * 1000;
return (wait_time > tenSeconds || retry_count > 10) ? tenSeconds : wait_time;
}
}
namespace ix {
OnTrafficTrackerCallback WebSocket::_onTrafficTrackerCallback = nullptr;
WebSocket::WebSocket() :
_verbose(false),
_onMessageCallback(OnMessageCallback()),
_stop(false),
_automaticReconnection(true)
{
}
WebSocket::~WebSocket()
{
stop();
}
void WebSocket::configure(const std::string& url)
{
std::lock_guard<std::mutex> lock(_urlMutex);
_url = url;
}
void WebSocket::start()
{
if (_thread.joinable()) return; // we've already been started
_thread = std::thread(&WebSocket::run, this);
}
void WebSocket::stop()
{
_automaticReconnection = false;
close();
if (!_thread.joinable())
{
_automaticReconnection = true;
return;
}
_stop = true;
_thread.join();
_stop = false;
_automaticReconnection = true;
}
WebSocketInitResult WebSocket::connect()
{
{
std::lock_guard<std::mutex> lock(_urlMutex);
_ws.configure(_url);
}
_ws.setOnStateChangeCallback(
[this](WebSocketTransport::ReadyStateValues readyStateValue)
{
if (readyStateValue == WebSocketTransport::CLOSED)
{
_onMessageCallback(WebSocket_MessageType_Close, "", WebSocketErrorInfo());
}
if (_verbose)
{
std::cout << "connection state changed -> "
<< readyStateToString(getReadyState())
<< std::endl;
}
}
);
WebSocketInitResult status = _ws.init();
if (!status.success)
{
return status;
}
_onMessageCallback(WebSocket_MessageType_Open, "", WebSocketErrorInfo());
return status;
}
bool WebSocket::isConnected() const
{
return getReadyState() == WebSocket_ReadyState_Open;
}
bool WebSocket::isClosing() const
{
return getReadyState() == WebSocket_ReadyState_Closing;
}
void WebSocket::close()
{
_ws.close();
}
void WebSocket::reconnectPerpetuallyIfDisconnected()
{
uint64_t retries = 0;
WebSocketErrorInfo connectErr;
ix::WebSocketInitResult status;
using millis = std::chrono::duration<double, std::milli>;
millis duration;
while (true)
{
if (isConnected() || isClosing() || _stop || !_automaticReconnection)
{
break;
}
status = connect();
if (!status.success && !_stop)
{
duration = millis(calculateRetryWaitMilliseconds(retries++));
connectErr.retries = retries;
connectErr.wait_time = duration.count();
connectErr.reason = status.errorStr;
connectErr.http_status = status.http_status;
_onMessageCallback(WebSocket_MessageType_Error, "", connectErr);
if (_verbose) std::cout << "Sleeping for " << duration.count() << "ms" << std::endl;
std::this_thread::sleep_for(duration);
}
}
}
void WebSocket::run()
{
while (true)
{
if (_stop) return;
// 1. Make sure we are always connected
reconnectPerpetuallyIfDisconnected();
if (_stop) return;
// 2. Poll to see if there's any new data available
_ws.poll();
if (_stop) return;
// 3. Dispatch the incoming messages
_ws.dispatch(
[this](const std::string& msg)
{
_onMessageCallback(WebSocket_MessageType_Message, msg, WebSocketErrorInfo());
WebSocket::invokeTrafficTrackerCallback(msg.size(), true);
});
}
}
void WebSocket::setOnMessageCallback(const OnMessageCallback& callback)
{
_onMessageCallback = callback;
}
void WebSocket::setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback)
{
_onTrafficTrackerCallback = callback;
}
void WebSocket::resetTrafficTrackerCallback()
{
setTrafficTrackerCallback(nullptr);
}
void WebSocket::invokeTrafficTrackerCallback(size_t size, bool incoming)
{
if (_onTrafficTrackerCallback)
{
_onTrafficTrackerCallback(size, incoming);
}
}
bool WebSocket::send(const std::string& text)
{
if (!isConnected()) return false;
//
// It is OK to read and write on the same socket in 2 different threads.
// https://stackoverflow.com/questions/1981372/are-parallel-calls-to-send-recv-on-the-same-socket-valid
//
// This makes it so that messages are sent right away, and we dont need
// a timeout while we poll to keep wake ups to a minimum (which helps
// with battery life), and use the system select call to notify us when
// incoming messages are arriving / there's data to be received.
//
std::lock_guard<std::mutex> lock(_writeMutex);
_ws.sendBinary(text);
WebSocket::invokeTrafficTrackerCallback(text.size(), false);
return true;
}
ReadyState WebSocket::getReadyState() const
{
switch (_ws.getReadyState())
{
case ix::WebSocketTransport::OPEN: return WebSocket_ReadyState_Open;
case ix::WebSocketTransport::CONNECTING: return WebSocket_ReadyState_Connecting;
case ix::WebSocketTransport::CLOSING: return WebSocket_ReadyState_Closing;
case ix::WebSocketTransport::CLOSED: return WebSocket_ReadyState_Closed;
}
}
std::string WebSocket::readyStateToString(ReadyState readyState)
{
switch (readyState)
{
case WebSocket_ReadyState_Open: return "OPEN";
case WebSocket_ReadyState_Connecting: return "CONNECTING";
case WebSocket_ReadyState_Closing: return "CLOSING";
case WebSocket_ReadyState_Closed: return "CLOSED";
}
}
const std::string& WebSocket::getUrl() const
{
std::lock_guard<std::mutex> lock(_urlMutex);
return _url;
}
}

96
ixwebsocket/IXWebSocket.h Normal file
View File

@ -0,0 +1,96 @@
/*
* IXWebSocket.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*
* WebSocket RFC
* https://tools.ietf.org/html/rfc6455
*/
#pragma once
#include <string>
#include <thread>
#include <mutex>
#include <atomic>
#include "IXWebSocketTransport.h"
namespace ix
{
// https://developer.mozilla.org/en-US/docs/Web/API/WebSocket#Ready_state_constants
enum ReadyState
{
WebSocket_ReadyState_Connecting = 0,
WebSocket_ReadyState_Open = 1,
WebSocket_ReadyState_Closing = 2,
WebSocket_ReadyState_Closed = 3
};
enum WebSocketMessageType
{
WebSocket_MessageType_Message = 0,
WebSocket_MessageType_Open = 1,
WebSocket_MessageType_Close = 2,
WebSocket_MessageType_Error = 3
};
struct WebSocketErrorInfo
{
uint64_t retries;
double wait_time;
int http_status;
std::string reason;
};
using OnMessageCallback = std::function<void(WebSocketMessageType, const std::string&, const WebSocketErrorInfo)>;
using OnTrafficTrackerCallback = std::function<void(size_t size, bool incoming)>;
class WebSocket
{
public:
WebSocket();
~WebSocket();
void configure(const std::string& url);
void start();
void stop();
bool send(const std::string& text);
void close();
void setOnMessageCallback(const OnMessageCallback& callback);
static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback);
static void resetTrafficTrackerCallback();
void setVerbose(bool verbose) { _verbose = verbose; }
const std::string& getUrl() const;
ReadyState getReadyState() const;
private:
void run();
WebSocketInitResult connect();
bool isConnected() const;
bool isClosing() const;
void reconnectPerpetuallyIfDisconnected();
std::string readyStateToString(ReadyState readyState);
static void invokeTrafficTrackerCallback(size_t size, bool incoming);
WebSocketTransport _ws;
std::string _url;
mutable std::mutex _urlMutex;
bool _verbose;
OnMessageCallback _onMessageCallback;
static OnTrafficTrackerCallback _onTrafficTrackerCallback;
std::atomic<bool> _stop;
std::atomic<bool> _automaticReconnection;
std::thread _thread;
std::mutex _writeMutex;
static int kHeartBeatPeriod;
};
}

View File

@ -0,0 +1,603 @@
/*
* IXWebSocketTransport.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*/
//
// Adapted from https://github.com/dhbaird/easywsclient
//
#include "IXWebSocketTransport.h"
#include "IXSocket.h"
#ifdef __APPLE__
# include "IXSocketAppleSSL.h"
#else
# include "IXSocketOpenSSL.h"
#endif
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <cstdlib>
#include <vector>
#include <string>
#include <cstdarg>
#include <iostream>
#include <sstream>
#include <regex>
namespace ix {
WebSocketTransport::WebSocketTransport() :
_readyState(CLOSED)
{
;
}
WebSocketTransport::~WebSocketTransport()
{
;
}
void WebSocketTransport::configure(const std::string& url)
{
_url = url;
}
bool WebSocketTransport::parseUrl(const std::string& url,
std::string& protocol,
std::string& host,
std::string& path,
std::string& query,
int& port)
{
std::regex ex("(ws|wss)://([^/ :]+):?([^/ ]*)(/?[^ #?]*)\\x3f?([^ #]*)#?([^ ]*)");
std::cmatch what;
if (!regex_match(url.c_str(), what, ex))
{
return false;
}
std::string portStr;
protocol = std::string(what[1].first, what[1].second);
host = std::string(what[2].first, what[2].second);
portStr = std::string(what[3].first, what[3].second);
path = std::string(what[4].first, what[4].second);
query = std::string(what[5].first, what[5].second);
if (portStr.empty())
{
if (protocol == "ws")
{
port = 80;
}
else if (protocol == "wss")
{
port = 443;
}
}
else
{
std::stringstream ss;
ss << portStr;
ss >> port;
}
if (path.empty())
{
path = "/";
}
else if (path[0] != '/')
{
path = '/' + path;
}
if (!query.empty())
{
path += "?";
path += query;
}
return true;
}
void WebSocketTransport::printUrl(const std::string& url)
{
std::string protocol, host, path, query;
int port {0};
if (!WebSocketTransport::parseUrl(url, protocol, host,
path, query, port))
{
return;
}
std::cout << "[" << url << "]" << std::endl;
std::cout << protocol << std::endl;
std::cout << host << std::endl;
std::cout << port << std::endl;
std::cout << path << std::endl;
std::cout << query << std::endl;
std::cout << "-------------------------------" << std::endl;
}
WebSocketInitResult WebSocketTransport::init()
{
std::string protocol, host, path, query;
int port;
if (!WebSocketTransport::parseUrl(_url, protocol, host,
path, query, port))
{
return WebSocketInitResult(false, 0, "Could not parse URL");
}
if (protocol == "wss")
{
_socket.reset();
#ifdef __APPLE__
_socket = std::make_shared<SocketAppleSSL>();
#else
_socket = std::make_shared<SocketOpenSSL>();
#endif
}
else
{
_socket.reset();
_socket = std::make_shared<Socket>();
}
std::string errMsg;
bool success = _socket->connect(host, port, errMsg);
if (!success)
{
std::stringstream ss;
ss << "Unable to connect to " << host
<< " on port " << port
<< ", error: " << errMsg;
return WebSocketInitResult(false, 0, ss.str());
}
char line[256];
int status;
int i;
snprintf(line, 256,
"GET %s HTTP/1.1\r\n"
"Host: %s:%d\r\n"
"Upgrade: websocket\r\n"
"Connection: Upgrade\r\n"
"Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==\r\n"
"Sec-WebSocket-Version: 13\r\n"
"\r\n",
path.c_str(), host.c_str(), port);
// XXX: this should be done non-blocking,
size_t lineSize = strlen(line);
if (_socket->send(line, lineSize) != lineSize)
{
return WebSocketInitResult(false, 0, std::string("Failed sending GET request to ") + _url);
}
for (i = 0; i < 2 || (i < 255 && line[i-2] != '\r' && line[i-1] != '\n'); ++i)
{
if (_socket->recv(line+i, 1) == 0)
{
return WebSocketInitResult(false, 0, std::string("Failed reading HTTP status line from ") + _url);
}
}
line[i] = 0;
if (i == 255)
{
return WebSocketInitResult(false, 0, std::string("Got bad status line connecting to ") + _url);
}
// HTTP/1.0 is too old.
if (sscanf(line, "HTTP/1.0 %d", &status) == 1)
{
std::stringstream ss;
ss << "Server version is HTTP/1.0. Rejecting connection to " << host;
return WebSocketInitResult(false, status, ss.str());
}
// We want an 101 HTTP status
if (sscanf(line, "HTTP/1.1 %d", &status) != 1 || status != 101)
{
std::stringstream ss;
ss << "Got bad status connecting to " << host
<< ", status: " << status
<< ", HTTP Status line: " << line;
return WebSocketInitResult(false, status, ss.str());
}
// TODO: verify response headers,
while (true)
{
for (i = 0;
i < 2 || (i < 255 && line[i-2] != '\r' && line[i-1] != '\n');
++i)
{
if (_socket->recv(line+i, 1) == 0)
{
return WebSocketInitResult(false, status, std::string("Failed reading response header from ") + _url);
}
}
if (line[0] == '\r' && line[1] == '\n')
{
break;
}
}
_socket->configure();
setReadyState(OPEN);
return WebSocketInitResult(true, status, "");
}
WebSocketTransport::ReadyStateValues WebSocketTransport::getReadyState() const
{
return _readyState;
}
void WebSocketTransport::setReadyState(ReadyStateValues readyStateValue)
{
_readyState = readyStateValue;
_onStateChangeCallback(readyStateValue);
}
void WebSocketTransport::setOnStateChangeCallback(const OnStateChangeCallback& onStateChangeCallback)
{
_onStateChangeCallback = onStateChangeCallback;
}
void WebSocketTransport::poll()
{
_socket->poll(
[this]()
{
while (true)
{
int N = (int) _rxbuf.size();
ssize_t ret;
_rxbuf.resize(N + 1500);
ret = _socket->recv((char*)&_rxbuf[0] + N, 1500);
if (ret < 0 && (errno == EWOULDBLOCK ||
errno == EAGAIN)) {
_rxbuf.resize(N);
break;
}
else if (ret <= 0)
{
_rxbuf.resize(N);
_socket->close();
setReadyState(CLOSED);
break;
}
else
{
_rxbuf.resize(N + ret);
}
}
if (isSendBufferEmpty() && _readyState == CLOSING)
{
_socket->close();
setReadyState(CLOSED);
}
});
}
bool WebSocketTransport::isSendBufferEmpty() const
{
std::lock_guard<std::mutex> lock(_txbufMutex);
return _txbuf.empty();
}
void WebSocketTransport::appendToSendBuffer(const std::vector<uint8_t>& header,
std::string::const_iterator begin,
std::string::const_iterator end,
uint64_t message_size,
uint8_t masking_key[4])
{
std::lock_guard<std::mutex> lock(_txbufMutex);
_txbuf.insert(_txbuf.end(), header.begin(), header.end());
_txbuf.insert(_txbuf.end(), begin, end);
// Masking
for (size_t i = 0; i != (size_t) message_size; ++i)
{
*(_txbuf.end() - (size_t) message_size + i) ^= masking_key[i&0x3];
}
}
void WebSocketTransport::appendToSendBuffer(const std::vector<uint8_t>& buffer)
{
std::lock_guard<std::mutex> lock(_txbufMutex);
_txbuf.insert(_txbuf.end(), buffer.begin(), buffer.end());
}
//
// http://tools.ietf.org/html/rfc6455#section-5.2 Base Framing Protocol
//
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-------+-+-------------+-------------------------------+
// |F|R|R|R| opcode|M| Payload len | Extended payload length |
// |I|S|S|S| (4) |A| (7) | (16/64) |
// |N|V|V|V| |S| | (if payload len==126/127) |
// | |1|2|3| |K| | |
// +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
// | Extended payload length continued, if payload len == 127 |
// + - - - - - - - - - - - - - - - +-------------------------------+
// | |Masking-key, if MASK set to 1 |
// +-------------------------------+-------------------------------+
// | Masking-key (continued) | Payload Data |
// +-------------------------------- - - - - - - - - - - - - - - - +
// : Payload Data continued ... :
// + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
// | Payload Data continued ... |
// +---------------------------------------------------------------+
//
void WebSocketTransport::dispatch(const OnMessageCallback& onMessageCallback)
{
// TODO: consider acquiring a lock on _rxbuf...
while (true) {
wsheader_type ws;
if (_rxbuf.size() < 2) return; /* Need at least 2 */
const uint8_t * data = (uint8_t *) &_rxbuf[0]; // peek, but don't consume
ws.fin = (data[0] & 0x80) == 0x80;
ws.opcode = (wsheader_type::opcode_type) (data[0] & 0x0f);
ws.mask = (data[1] & 0x80) == 0x80;
ws.N0 = (data[1] & 0x7f);
ws.header_size = 2 + (ws.N0 == 126? 2 : 0) + (ws.N0 == 127? 8 : 0) + (ws.mask? 4 : 0);
if (_rxbuf.size() < ws.header_size) return; /* Need: ws.header_size - _rxbuf.size() */
//
// Calculate payload length:
// 0-125 mean the payload is that long.
// 126 means that the following two bytes indicate the length,
// 127 means the next 8 bytes indicate the length.
//
int i = 0;
if (ws.N0 < 126)
{
ws.N = ws.N0;
i = 2;
}
else if (ws.N0 == 126)
{
ws.N = 0;
ws.N |= ((uint64_t) data[2]) << 8;
ws.N |= ((uint64_t) data[3]) << 0;
i = 4;
}
else if (ws.N0 == 127)
{
ws.N = 0;
ws.N |= ((uint64_t) data[2]) << 56;
ws.N |= ((uint64_t) data[3]) << 48;
ws.N |= ((uint64_t) data[4]) << 40;
ws.N |= ((uint64_t) data[5]) << 32;
ws.N |= ((uint64_t) data[6]) << 24;
ws.N |= ((uint64_t) data[7]) << 16;
ws.N |= ((uint64_t) data[8]) << 8;
ws.N |= ((uint64_t) data[9]) << 0;
i = 10;
}
else
{
// invalid payload length according to the spec. bail out
return;
}
if (ws.mask)
{
ws.masking_key[0] = ((uint8_t) data[i+0]) << 0;
ws.masking_key[1] = ((uint8_t) data[i+1]) << 0;
ws.masking_key[2] = ((uint8_t) data[i+2]) << 0;
ws.masking_key[3] = ((uint8_t) data[i+3]) << 0;
}
else
{
ws.masking_key[0] = 0;
ws.masking_key[1] = 0;
ws.masking_key[2] = 0;
ws.masking_key[3] = 0;
}
if (_rxbuf.size() < ws.header_size+ws.N)
{
return; /* Need: ws.header_size+ws.N - _rxbuf.size() */
}
// We got a whole message, now do something with it:
if (
ws.opcode == wsheader_type::TEXT_FRAME
|| ws.opcode == wsheader_type::BINARY_FRAME
|| ws.opcode == wsheader_type::CONTINUATION
) {
if (ws.mask)
{
for (size_t j = 0; j != ws.N; ++j)
{
_rxbuf[j+ws.header_size] ^= ws.masking_key[j&0x3];
}
}
_receivedData.insert(_receivedData.end(),
_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size+(size_t)ws.N);// just feed
if (ws.fin)
{
// fire callback with a string message
std::string stringMessage(_receivedData.begin(),
_receivedData.end());
onMessageCallback(stringMessage);
_receivedData.clear();
}
}
else if (ws.opcode == wsheader_type::PING)
{
if (ws.mask)
{
for (size_t j = 0; j != ws.N; ++j)
{
_rxbuf[j+ws.header_size] ^= ws.masking_key[j&0x3];
}
}
std::string pingData(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size + (size_t) ws.N);
sendData(wsheader_type::PONG, pingData.size(),
pingData.begin(), pingData.end());
}
else if (ws.opcode == wsheader_type::PONG) { }
else if (ws.opcode == wsheader_type::CLOSE) { close(); }
else { close(); }
_rxbuf.erase(_rxbuf.begin(),
_rxbuf.begin() + ws.header_size + (size_t) ws.N);
}
}
unsigned WebSocketTransport::getRandomUnsigned()
{
auto now = std::chrono::system_clock::now();
auto seconds =
std::chrono::duration_cast<std::chrono::seconds>(
now.time_since_epoch()).count();
return static_cast<unsigned>(seconds);
}
void WebSocketTransport::sendData(wsheader_type::opcode_type type,
uint64_t message_size,
std::string::const_iterator message_begin,
std::string::const_iterator message_end)
{
if (_readyState == CLOSING || _readyState == CLOSED)
{
return;
}
unsigned x = getRandomUnsigned();
uint8_t masking_key[4] = {};
masking_key[0] = (x >> 24);
masking_key[1] = (x >> 16) & 0xff;
masking_key[2] = (x >> 8) & 0xff;
masking_key[3] = (x) & 0xff;
std::vector<uint8_t> header;
header.assign(2 +
(message_size >= 126 ? 2 : 0) +
(message_size >= 65536 ? 6 : 0) + 4, 0);
header[0] = 0x80 | type;
if (message_size < 126)
{
header[1] = (message_size & 0xff) | 0x80;
header[2] = masking_key[0];
header[3] = masking_key[1];
header[4] = masking_key[2];
header[5] = masking_key[3];
}
else if (message_size < 65536)
{
header[1] = 126 | 0x80;
header[2] = (message_size >> 8) & 0xff;
header[3] = (message_size >> 0) & 0xff;
header[4] = masking_key[0];
header[5] = masking_key[1];
header[6] = masking_key[2];
header[7] = masking_key[3];
}
else
{ // TODO: run coverage testing here
header[1] = 127 | 0x80;
header[2] = (message_size >> 56) & 0xff;
header[3] = (message_size >> 48) & 0xff;
header[4] = (message_size >> 40) & 0xff;
header[5] = (message_size >> 32) & 0xff;
header[6] = (message_size >> 24) & 0xff;
header[7] = (message_size >> 16) & 0xff;
header[8] = (message_size >> 8) & 0xff;
header[9] = (message_size >> 0) & 0xff;
header[10] = masking_key[0];
header[11] = masking_key[1];
header[12] = masking_key[2];
header[13] = masking_key[3];
}
// _txbuf will keep growing until it can be transmitted over the socket:
appendToSendBuffer(header, message_begin, message_end,
message_size, masking_key);
// Now actually send this data
sendOnSocket();
}
void WebSocketTransport::sendPing()
{
std::string empty;
sendData(wsheader_type::PING, empty.size(), empty.begin(), empty.end());
}
void WebSocketTransport::sendBinary(const std::string& message)
{
sendData(wsheader_type::BINARY_FRAME, message.size(), message.begin(), message.end());
}
void WebSocketTransport::sendOnSocket()
{
std::lock_guard<std::mutex> lock(_txbufMutex);
while (_txbuf.size())
{
int ret = _socket->send((char*)&_txbuf[0], _txbuf.size());
if (ret < 0 && (errno == EWOULDBLOCK ||
errno == EAGAIN))
{
break;
}
else if (ret <= 0)
{
_socket->close();
setReadyState(CLOSED);
break;
}
else
{
_txbuf.erase(_txbuf.begin(), _txbuf.begin() + ret);
}
}
}
void WebSocketTransport::close()
{
if (_readyState == CLOSING || _readyState == CLOSED) return;
setReadyState(CLOSING);
uint8_t closeFrame[6] = {0x88, 0x80, 0x00, 0x00, 0x00, 0x00}; // last 4 bytes are a masking key
std::vector<uint8_t> header(closeFrame, closeFrame+6);
appendToSendBuffer(header);
sendOnSocket();
_socket->wakeUpFromPoll();
}
} // namespace ix

View File

@ -0,0 +1,132 @@
/*
* IXWebSocketTransport.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*/
#pragma once
//
// Adapted from https://github.com/dhbaird/easywsclient
//
#include <string>
#include <vector>
#include <functional>
#include <memory>
#include <mutex>
namespace ix
{
class Socket;
struct WebSocketInitResult
{
bool success;
int http_status;
std::string errorStr;
WebSocketInitResult(bool s, int h, std::string e)
{
success = s;
http_status = h;
errorStr = e;
}
// need to define a default
WebSocketInitResult()
{
success = false;
http_status = 0;
errorStr = "";
}
};
class WebSocketTransport
{
public:
enum ReadyStateValues
{
CLOSING,
CLOSED,
CONNECTING,
OPEN
};
using OnMessageCallback = std::function<void(const std::string&)>;
using OnStateChangeCallback = std::function<void(ReadyStateValues)>;
WebSocketTransport();
~WebSocketTransport();
void configure(const std::string& url);
WebSocketInitResult init();
void poll();
void send(const std::string& message);
void sendBinary(const std::string& message);
void sendBinary(const std::vector<uint8_t>& message);
void sendPing();
void close();
ReadyStateValues getReadyState() const;
void setReadyState(ReadyStateValues readyStateValue);
void setOnStateChangeCallback(const OnStateChangeCallback& onStateChangeCallback);
void dispatch(const OnMessageCallback& onMessageCallback);
static void printUrl(const std::string& url);
static bool parseUrl(const std::string& url,
std::string& protocol,
std::string& host,
std::string& path,
std::string& query,
int& port);
private:
std::string _url;
std::string _origin;
struct wsheader_type {
unsigned header_size;
bool fin;
bool mask;
enum opcode_type {
CONTINUATION = 0x0,
TEXT_FRAME = 0x1,
BINARY_FRAME = 0x2,
CLOSE = 8,
PING = 9,
PONG = 0xa,
} opcode;
int N0;
uint64_t N;
uint8_t masking_key[4];
};
std::vector<uint8_t> _rxbuf;
std::vector<uint8_t> _txbuf;
mutable std::mutex _txbufMutex;
std::vector<uint8_t> _receivedData;
std::shared_ptr<Socket> _socket;
std::atomic<ReadyStateValues> _readyState;
OnStateChangeCallback _onStateChangeCallback;
void sendOnSocket();
void sendData(wsheader_type::opcode_type type,
uint64_t message_size,
std::string::const_iterator message_begin,
std::string::const_iterator message_end);
bool isSendBufferEmpty() const;
void appendToSendBuffer(const std::vector<uint8_t>& header,
std::string::const_iterator begin,
std::string::const_iterator end,
uint64_t message_size,
uint8_t masking_key[4]);
void appendToSendBuffer(const std::vector<uint8_t>& buffer);
unsigned getRandomUnsigned();
};
}