Compare commits

...

5 Commits

Author SHA1 Message Date
cbf21b4008 add an option to easily disable per message deflate compression 2019-06-06 13:48:53 -07:00
68c1bf7017 fix Dockerfile link 2019-06-05 19:38:44 -07:00
257c901255 cobra to sentry / more error handling 2019-06-05 19:37:51 -07:00
15d8c663da cobra to sentry fixes 2019-06-05 18:47:48 -07:00
d50125c62d Feature/http async (#90)
* unittest working / uses shared_ptr for a bunch of things 🗿

* fix command line tools

* fix ws + add doc

* add more logging
2019-06-05 17:04:24 -07:00
25 changed files with 615 additions and 276 deletions

View File

@ -1 +1 @@
2.2.1
3.1.1

View File

@ -1 +1 @@
docker/Dockerfile.ubuntu_artful
docker/Dockerfile.alpine

View File

@ -28,6 +28,9 @@ webSocket.setUrl(url);
// to make sure that load balancers do not kill an idle connection.
webSocket.setHeartBeatPeriod(45);
// Per message deflate connection is enabled by default. You can tweak its parameters or disable it
webSocket.disablePerMessageDeflate();
// Setup a callback to be fired when a message or an event (open, close, error) is received
webSocket.setOnMessageCallback(
[](ix::WebSocketMessageType messageType,
@ -129,33 +132,33 @@ Here is what the HTTP client API looks like. Note that HTTP client support is ve
// Preparation
//
HttpClient httpClient;
HttpRequestArgs args;
HttpRequestArgsPtr args = httpClient.createRequest();
// Custom headers can be set
WebSocketHttpHeaders headers;
headers["Foo"] = "bar";
args.extraHeaders = headers;
args->extraHeaders = headers;
// Timeout options
args.connectTimeout = connectTimeout;
args.transferTimeout = transferTimeout;
args->connectTimeout = connectTimeout;
args->transferTimeout = transferTimeout;
// Redirect options
args.followRedirects = followRedirects;
args.maxRedirects = maxRedirects;
args->followRedirects = followRedirects;
args->maxRedirects = maxRedirects;
// Misc
args.compress = compress; // Enable gzip compression
args.verbose = verbose;
args.logger = [](const std::string& msg)
args->compress = compress; // Enable gzip compression
args->verbose = verbose;
args->logger = [](const std::string& msg)
{
std::cout << msg;
};
//
// Request
// Synchronous Request
//
HttpResponse out;
HttpResponsePtr out;
std::string url = "https://www.google.com";
// HEAD request
@ -175,13 +178,30 @@ out = httpClient.post(url, std::string("foo=bar"), args);
//
// Result
//
auto statusCode = std::get<0>(out);
auto errorCode = std::get<1>(out);
auto responseHeaders = std::get<2>(out);
auto payload = std::get<3>(out);
auto errorMsg = std::get<4>(out);
auto uploadSize = std::get<5>(out);
auto downloadSize = std::get<6>(out);
auto errorCode = response->errorCode; // Can be HttpErrorCode::Ok, HttpErrorCode::UrlMalformed, etc...
auto errorCode = response->errorCode; // 200, 404, etc...
auto responseHeaders = response->headers; // All the headers in a special case-insensitive unordered_map of (string, string)
auto payload = response->payload; // All the bytes from the response as an std::string
auto errorMsg = response->errorMsg; // Descriptive error message in case of failure
auto uploadSize = response->uploadSize; // Byte count of uploaded data
auto downloadSize = response->downloadSize; // Byte count of downloaded data
//
// Asynchronous Request
//
bool async = true;
HttpClient httpClient(async);
auto args = httpClient.createRequest(url, HttpClient::kGet);
// Push the request to a queue,
bool ok = httpClient.performRequest(args, [](const HttpResponsePtr& response)
{
// This callback execute in a background thread. Make sure you uses appropriate protection such as mutex
auto statusCode = response->statusCode; // acess results
}
);
// ok will be false if your httpClient is not async
```
## Build
@ -238,7 +258,7 @@ No manual polling to fetch data is required. Data is sent and received instantly
### Automatic reconnection
If the remote end (server) breaks the connection, the code will try to perpetually reconnect, by using an exponential backoff strategy, capped at one retry every 10 seconds.
If the remote end (server) breaks the connection, the code will try to perpetually reconnect, by using an exponential backoff strategy, capped at one retry every 10 seconds. This behavior can be disabled.
### Large messages

View File

@ -0,0 +1,23 @@
# Build time
FROM ubuntu:bionic as build
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install wget
RUN mkdir -p /tmp/cmake
WORKDIR /tmp/cmake
RUN wget https://github.com/Kitware/CMake/releases/download/v3.14.0/cmake-3.14.0-Linux-x86_64.tar.gz
RUN tar zxf cmake-3.14.0-Linux-x86_64.tar.gz
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install libz-dev
RUN apt-get -y install make
RUN apt-get -y install python
COPY . .
ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-Linux-x86_64/bin
ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
RUN ["make", "ws"]

View File

@ -24,23 +24,95 @@ namespace ix
const std::string HttpClient::kDel = "DEL";
const std::string HttpClient::kPut = "PUT";
HttpClient::HttpClient()
HttpClient::HttpClient(bool async) : _async(async), _stop(false)
{
if (!_async) return;
_thread = std::thread(&HttpClient::run, this);
}
HttpClient::~HttpClient()
{
if (!_thread.joinable()) return;
_stop = true;
_condition.notify_one();
_thread.join();
}
HttpResponse HttpClient::request(
HttpRequestArgsPtr HttpClient::createRequest(const std::string& url,
const std::string& verb)
{
auto request = std::make_shared<HttpRequestArgs>();
request->url = url;
request->verb = verb;
return request;
}
bool HttpClient::performRequest(HttpRequestArgsPtr args,
const OnResponseCallback& onResponseCallback)
{
if (!_async) return false;
// Enqueue the task
{
// acquire lock
std::unique_lock<std::mutex> lock(_queueMutex);
// add the task
_queue.push(std::make_pair(args, onResponseCallback));
} // release lock
// wake up one thread
_condition.notify_one();
return true;
}
void HttpClient::run()
{
while (true)
{
HttpRequestArgsPtr args;
OnResponseCallback onResponseCallback;
{
std::unique_lock<std::mutex> lock(_queueMutex);
while (!_stop && _queue.empty())
{
_condition.wait(lock);
}
if (_stop) return;
auto p = _queue.front();
_queue.pop();
args = p.first;
onResponseCallback = p.second;
}
if (_stop) return;
HttpResponsePtr response = request(args->url, args->verb, args->body, args);
onResponseCallback(response);
if (_stop) return;
}
}
HttpResponsePtr HttpClient::request(
const std::string& url,
const std::string& verb,
const std::string& body,
const HttpRequestArgs& args,
HttpRequestArgsPtr args,
int redirects)
{
// We only have one socket connection, so we cannot
// make multiple requests concurrently.
std::lock_guard<std::mutex> lock(_mutex);
uint64_t uploadSize = 0;
uint64_t downloadSize = 0;
int code = 0;
@ -54,7 +126,7 @@ namespace ix
{
std::stringstream ss;
ss << "Cannot parse url: " << url;
return HttpResponse(code, HttpErrorCode::UrlMalformed,
return std::make_shared<HttpResponse>(code, HttpErrorCode::UrlMalformed,
headers, payload, ss.str(),
uploadSize, downloadSize);
}
@ -65,7 +137,7 @@ namespace ix
if (!_socket)
{
return HttpResponse(code, HttpErrorCode::CannotCreateSocket,
return std::make_shared<HttpResponse>(code, HttpErrorCode::CannotCreateSocket,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
@ -75,13 +147,13 @@ namespace ix
ss << verb << " " << path << " HTTP/1.1\r\n";
ss << "Host: " << host << "\r\n";
if (args.compress)
if (args->compress)
{
ss << "Accept-Encoding: gzip" << "\r\n";
}
// Append extra headers
for (auto&& it : args.extraHeaders)
for (auto&& it : args->extraHeaders)
{
ss << it.first << ": " << it.second << "\r\n";
}
@ -103,7 +175,7 @@ namespace ix
ss << "Content-Length: " << body.size() << "\r\n";
// Set default Content-Type if unspecified
if (args.extraHeaders.find("Content-Type") == args.extraHeaders.end())
if (args->extraHeaders.find("Content-Type") == args->extraHeaders.end())
{
ss << "Content-Type: application/x-www-form-urlencoded" << "\r\n";
}
@ -121,23 +193,23 @@ namespace ix
// Make a cancellation object dealing with connection timeout
auto isCancellationRequested =
makeCancellationRequestWithTimeout(args.connectTimeout, requestInitCancellation);
makeCancellationRequestWithTimeout(args->connectTimeout, requestInitCancellation);
bool success = _socket->connect(host, port, errMsg, isCancellationRequested);
if (!success)
{
std::stringstream ss;
ss << "Cannot connect to url: " << url << " / error : " << errMsg;
return HttpResponse(code, HttpErrorCode::CannotConnect,
headers, payload, ss.str(),
uploadSize, downloadSize);
return std::make_shared<HttpResponse>(code, HttpErrorCode::CannotConnect,
headers, payload, ss.str(),
uploadSize, downloadSize);
}
// Make a new cancellation object dealing with transfer timeout
isCancellationRequested =
makeCancellationRequestWithTimeout(args.transferTimeout, requestInitCancellation);
makeCancellationRequestWithTimeout(args->transferTimeout, requestInitCancellation);
if (args.verbose)
if (args->verbose)
{
std::stringstream ss;
ss << "Sending " << verb << " request "
@ -154,9 +226,9 @@ namespace ix
if (!_socket->writeBytes(req, isCancellationRequested))
{
std::string errorMsg("Cannot send request");
return HttpResponse(code, HttpErrorCode::SendError,
headers, payload, errorMsg,
uploadSize, downloadSize);
return std::make_shared<HttpResponse>(code, HttpErrorCode::SendError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
uploadSize = req.size();
@ -168,12 +240,12 @@ namespace ix
if (!lineValid)
{
std::string errorMsg("Cannot retrieve status line");
return HttpResponse(code, HttpErrorCode::CannotReadStatusLine,
headers, payload, errorMsg,
uploadSize, downloadSize);
return std::make_shared<HttpResponse>(code, HttpErrorCode::CannotReadStatusLine,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
if (args.verbose)
if (args->verbose)
{
std::stringstream ss;
ss << "Status line " << line;
@ -183,9 +255,9 @@ namespace ix
if (sscanf(line.c_str(), "HTTP/1.1 %d", &code) != 1)
{
std::string errorMsg("Cannot parse response code from status line");
return HttpResponse(code, HttpErrorCode::MissingStatus,
headers, payload, errorMsg,
uploadSize, downloadSize);
return std::make_shared<HttpResponse>(code, HttpErrorCode::MissingStatus,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
auto result = parseHttpHeaders(_socket, isCancellationRequested);
@ -195,29 +267,29 @@ namespace ix
if (!headersValid)
{
std::string errorMsg("Cannot parse http headers");
return HttpResponse(code, HttpErrorCode::HeaderParsingError,
headers, payload, errorMsg,
uploadSize, downloadSize);
return std::make_shared<HttpResponse>(code, HttpErrorCode::HeaderParsingError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
// Redirect ?
if ((code >= 301 && code <= 308) && args.followRedirects)
if ((code >= 301 && code <= 308) && args->followRedirects)
{
if (headers.find("Location") == headers.end())
{
std::string errorMsg("Missing location header for redirect");
return HttpResponse(code, HttpErrorCode::MissingLocation,
headers, payload, errorMsg,
uploadSize, downloadSize);
return std::make_shared<HttpResponse>(code, HttpErrorCode::MissingLocation,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
if (redirects >= args.maxRedirects)
if (redirects >= args->maxRedirects)
{
std::stringstream ss;
ss << "Too many redirects: " << redirects;
return HttpResponse(code, HttpErrorCode::TooManyRedirects,
headers, payload, ss.str(),
uploadSize, downloadSize);
return std::make_shared<HttpResponse>(code, HttpErrorCode::TooManyRedirects,
headers, payload, ss.str(),
uploadSize, downloadSize);
}
// Recurse
@ -227,9 +299,9 @@ namespace ix
if (verb == "HEAD")
{
return HttpResponse(code, HttpErrorCode::Ok,
headers, payload, std::string(),
uploadSize, downloadSize);
return std::make_shared<HttpResponse>(code, HttpErrorCode::Ok,
headers, payload, std::string(),
uploadSize, downloadSize);
}
// Parse response:
@ -243,14 +315,14 @@ namespace ix
payload.reserve(contentLength);
auto chunkResult = _socket->readBytes(contentLength,
args.onProgressCallback,
args->onProgressCallback,
isCancellationRequested);
if (!chunkResult.first)
{
errorMsg = "Cannot read chunk";
return HttpResponse(code, HttpErrorCode::ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
return std::make_shared<HttpResponse>(code, HttpErrorCode::ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
payload += chunkResult.second;
}
@ -266,9 +338,9 @@ namespace ix
if (!lineResult.first)
{
return HttpResponse(code, HttpErrorCode::ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
return std::make_shared<HttpResponse>(code, HttpErrorCode::ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
uint64_t chunkSize;
@ -276,7 +348,7 @@ namespace ix
ss << std::hex << line;
ss >> chunkSize;
if (args.verbose)
if (args->verbose)
{
std::stringstream oss;
oss << "Reading " << chunkSize << " bytes"
@ -288,14 +360,14 @@ namespace ix
// Read a chunk
auto chunkResult = _socket->readBytes((size_t) chunkSize,
args.onProgressCallback,
args->onProgressCallback,
isCancellationRequested);
if (!chunkResult.first)
{
errorMsg = "Cannot read chunk";
return HttpResponse(code, HttpErrorCode::ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
return std::make_shared<HttpResponse>(code, HttpErrorCode::ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
payload += chunkResult.second;
@ -304,9 +376,9 @@ namespace ix
if (!lineResult.first)
{
return HttpResponse(code, HttpErrorCode::ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
return std::make_shared<HttpResponse>(code, HttpErrorCode::ChunkReadError,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
if (chunkSize == 0) break;
@ -319,9 +391,9 @@ namespace ix
else
{
std::string errorMsg("Cannot read http body");
return HttpResponse(code, HttpErrorCode::CannotReadBody,
headers, payload, errorMsg,
uploadSize, downloadSize);
return std::make_shared<HttpResponse>(code, HttpErrorCode::CannotReadBody,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
downloadSize = payload.size();
@ -333,60 +405,60 @@ namespace ix
if (!gzipInflate(payload, decompressedPayload))
{
std::string errorMsg("Error decompressing payload");
return HttpResponse(code, HttpErrorCode::Gzip,
headers, payload, errorMsg,
uploadSize, downloadSize);
return std::make_shared<HttpResponse>(code, HttpErrorCode::Gzip,
headers, payload, errorMsg,
uploadSize, downloadSize);
}
payload = decompressedPayload;
}
return HttpResponse(code, HttpErrorCode::Ok,
headers, payload, std::string(),
uploadSize, downloadSize);
return std::make_shared<HttpResponse>(code, HttpErrorCode::Ok,
headers, payload, std::string(),
uploadSize, downloadSize);
}
HttpResponse HttpClient::get(const std::string& url,
const HttpRequestArgs& args)
HttpResponsePtr HttpClient::get(const std::string& url,
HttpRequestArgsPtr args)
{
return request(url, kGet, std::string(), args);
}
HttpResponse HttpClient::head(const std::string& url,
const HttpRequestArgs& args)
HttpResponsePtr HttpClient::head(const std::string& url,
HttpRequestArgsPtr args)
{
return request(url, kHead, std::string(), args);
}
HttpResponse HttpClient::del(const std::string& url,
const HttpRequestArgs& args)
HttpResponsePtr HttpClient::del(const std::string& url,
HttpRequestArgsPtr args)
{
return request(url, kDel, std::string(), args);
}
HttpResponse HttpClient::post(const std::string& url,
const HttpParameters& httpParameters,
const HttpRequestArgs& args)
HttpResponsePtr HttpClient::post(const std::string& url,
const HttpParameters& httpParameters,
HttpRequestArgsPtr args)
{
return request(url, kPost, serializeHttpParameters(httpParameters), args);
}
HttpResponse HttpClient::post(const std::string& url,
const std::string& body,
const HttpRequestArgs& args)
HttpResponsePtr HttpClient::post(const std::string& url,
const std::string& body,
HttpRequestArgsPtr args)
{
return request(url, kPost, body, args);
}
HttpResponse HttpClient::put(const std::string& url,
const HttpParameters& httpParameters,
const HttpRequestArgs& args)
HttpResponsePtr HttpClient::put(const std::string& url,
const HttpParameters& httpParameters,
HttpRequestArgsPtr args)
{
return request(url, kPut, serializeHttpParameters(httpParameters), args);
}
HttpResponse HttpClient::put(const std::string& url,
const std::string& body,
const HttpRequestArgs& args)
HttpResponsePtr HttpClient::put(const std::string& url,
const std::string& body,
const HttpRequestArgsPtr args)
{
return request(url, kPut, body, args);
}
@ -488,11 +560,11 @@ namespace ix
}
void HttpClient::log(const std::string& msg,
const HttpRequestArgs& args)
HttpRequestArgsPtr args)
{
if (args.logger)
if (args->logger)
{
args.logger(msg);
args->logger(msg);
}
}
}

View File

@ -10,11 +10,13 @@
#include "IXWebSocketHttpHeaders.h"
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <tuple>
#include <queue>
#include <thread>
namespace ix
{
@ -34,7 +36,8 @@ namespace ix
MissingLocation = 11,
TooManyRedirects = 12,
ChunkReadError = 13,
CannotReadBody = 14
CannotReadBody = 14,
Invalid = 100
};
struct HttpResponse
@ -66,12 +69,15 @@ namespace ix
}
};
using HttpResponsePtr = std::shared_ptr<HttpResponse>;
using HttpParameters = std::map<std::string, std::string>;
using Logger = std::function<void(const std::string&)>;
using OnResponseCallback = std::function<void(const HttpResponsePtr&)>;
struct HttpRequestArgs
{
std::string url;
std::string verb;
WebSocketHttpHeaders extraHeaders;
std::string body;
int connectTimeout;
@ -84,51 +90,72 @@ namespace ix
OnProgressCallback onProgressCallback;
};
using HttpRequestArgsPtr = std::shared_ptr<HttpRequestArgs>;
class HttpClient
{
public:
HttpClient();
HttpClient(bool async = false);
~HttpClient();
HttpResponse get(const std::string& url, const HttpRequestArgs& args);
HttpResponse head(const std::string& url, const HttpRequestArgs& args);
HttpResponse del(const std::string& url, const HttpRequestArgs& args);
HttpResponsePtr get(const std::string& url, HttpRequestArgsPtr args);
HttpResponsePtr head(const std::string& url, HttpRequestArgsPtr args);
HttpResponsePtr del(const std::string& url, HttpRequestArgsPtr args);
HttpResponse post(const std::string& url,
const HttpParameters& httpParameters,
const HttpRequestArgs& args);
HttpResponse post(const std::string& url,
const std::string& body,
const HttpRequestArgs& args);
HttpResponse put(const std::string& url,
const HttpParameters& httpParameters,
const HttpRequestArgs& args);
HttpResponse put(const std::string& url,
const std::string& body,
const HttpRequestArgs& args);
HttpResponse request(const std::string& url,
const std::string& verb,
HttpResponsePtr post(const std::string& url,
const HttpParameters& httpParameters,
HttpRequestArgsPtr args);
HttpResponsePtr post(const std::string& url,
const std::string& body,
const HttpRequestArgs& args,
int redirects = 0);
HttpRequestArgsPtr args);
HttpResponsePtr put(const std::string& url,
const HttpParameters& httpParameters,
HttpRequestArgsPtr args);
HttpResponsePtr put(const std::string& url,
const std::string& body,
HttpRequestArgsPtr args);
HttpResponsePtr request(const std::string& url,
const std::string& verb,
const std::string& body,
HttpRequestArgsPtr args,
int redirects = 0);
// Async API
HttpRequestArgsPtr createRequest(const std::string& url = std::string(),
const std::string& verb = HttpClient::kGet);
bool performRequest(HttpRequestArgsPtr request,
const OnResponseCallback& onResponseCallback);
std::string serializeHttpParameters(const HttpParameters& httpParameters);
std::string urlEncode(const std::string& value);
private:
void log(const std::string& msg, const HttpRequestArgs& args);
bool gzipInflate(const std::string& in, std::string& out);
std::shared_ptr<Socket> _socket;
const static std::string kPost;
const static std::string kGet;
const static std::string kHead;
const static std::string kDel;
const static std::string kPut;
private:
void log(const std::string& msg, HttpRequestArgsPtr args);
bool gzipInflate(const std::string& in, std::string& out);
// Async API background thread runner
void run();
// Async API
bool _async;
std::queue<std::pair<HttpRequestArgsPtr, OnResponseCallback>> _queue;
mutable std::mutex _queueMutex;
std::condition_variable _condition;
std::atomic<bool> _stop;
std::thread _thread;
std::shared_ptr<Socket> _socket;
std::mutex _mutex; // to protect accessing the _socket (only one socket per client)
};
} // namespace ix

View File

@ -82,7 +82,7 @@ namespace ix
mbedtls_ssl_set_bio(&_ssl, &_sockfd, mbedtls_net_send, mbedtls_net_recv, NULL);
int res;
do
do
{
res = mbedtls_ssl_handshake(&_ssl);
}

View File

@ -135,6 +135,13 @@ namespace ix
_enablePong = false;
}
void WebSocket::disablePerMessageDeflate()
{
std::lock_guard<std::mutex> lock(_configMutex);
WebSocketPerMessageDeflateOptions perMessageDeflateOptions(false);
_perMessageDeflateOptions = perMessageDeflateOptions;
}
void WebSocket::start()
{
if (_thread.joinable()) return; // we've already been started

View File

@ -95,6 +95,7 @@ namespace ix
void setPingTimeout(int pingTimeoutSecs);
void enablePong();
void disablePong();
void disablePerMessageDeflate();
// Run asynchronously, by calling start and stop.
void start();

View File

@ -81,7 +81,7 @@ namespace ix
{
_onMessageUserCallback = std::move(callback);
}
WebSocketMessageQueue::MessagePtr WebSocketMessageQueue::popMessage()
{
MessagePtr message;

View File

@ -1041,7 +1041,7 @@ namespace ix
_requestInitCancellation = true;
if (_readyState == ReadyState::CLOSING || _readyState == ReadyState::CLOSED) return;
{
std::lock_guard<std::mutex> lock(_closeDataMutex);
_closeCode = code;

View File

@ -9,7 +9,7 @@ install: brew
# on osx it is good practice to make /usr/local user writable
# sudo chown -R `whoami`/staff /usr/local
brew:
mkdir -p build && (cd build ; cmake -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .. ; make -j install)
mkdir -p build && (cd build ; cmake -DUSE_TLS=1 -DUSE_WS=1 .. ; make -j install)
ws:
mkdir -p build && (cd build ; cmake -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .. ; make -j)
@ -44,7 +44,7 @@ trail:
sh third_party/remote_trailing_whitespaces.sh
format:
find ixwebsocket ws -name '*.cpp' -o -name '*.h' -exec clang-format -i {} \;
find test ixwebsocket ws -name '*.cpp' -o -name '*.h' -exec clang-format -i {} \;
# That target is used to start a node server, but isn't required as we have
# a builtin C++ server started in the unittest now

View File

@ -15,85 +15,219 @@ TEST_CASE("http client", "[http]")
{
SECTION("Connect to a remote HTTP server")
{
std::string url("http://httpbin.org/");
HttpClient httpClient;
WebSocketHttpHeaders headers;
headers["User-Agent"] = "ixwebsocket";
HttpRequestArgs args;
args.extraHeaders = headers;
args.connectTimeout = 60;
args.transferTimeout = 60;
args.followRedirects = true;
args.maxRedirects = 10;
args.verbose = true;
args.compress = true;
args.logger = [](const std::string& msg)
std::string url("http://httpbin.org/");
auto args = httpClient.createRequest(url);
args->extraHeaders = headers;
args->connectTimeout = 60;
args->transferTimeout = 60;
args->followRedirects = true;
args->maxRedirects = 10;
args->verbose = true;
args->compress = true;
args->logger = [](const std::string& msg)
{
std::cout << msg;
};
args.onProgressCallback = [](int current, int total) -> bool
args->onProgressCallback = [](int current, int total) -> bool
{
std::cerr << "\r" << "Downloaded "
<< current << " bytes out of " << total;
return true;
};
HttpClient httpClient;
HttpResponse response = httpClient.get(url, args);
auto response = httpClient.get(url, args);
for (auto it : response.headers)
for (auto it : response->headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
std::cerr << "Upload size: " << response.uploadSize << std::endl;
std::cerr << "Download size: " << response.downloadSize << std::endl;
std::cerr << "Status: " << response.statusCode << std::endl;
std::cerr << "Upload size: " << response->uploadSize << std::endl;
std::cerr << "Download size: " << response->downloadSize << std::endl;
std::cerr << "Status: " << response->statusCode << std::endl;
std::cerr << "Error message: " << response->errorMsg << std::endl;
REQUIRE(response.errorCode == HttpErrorCode::Ok);
REQUIRE(response->errorCode == HttpErrorCode::Ok);
REQUIRE(response->statusCode == 200);
}
#if defined(IXWEBSOCKET_USE_TLS)
SECTION("Connect to a remote HTTPS server")
{
std::string url("https://httpbin.org/");
HttpClient httpClient;
WebSocketHttpHeaders headers;
headers["User-Agent"] = "ixwebsocket";
HttpRequestArgs args;
args.extraHeaders = headers;
args.connectTimeout = 60;
args.transferTimeout = 60;
args.followRedirects = true;
args.maxRedirects = 10;
args.verbose = true;
args.compress = true;
args.logger = [](const std::string& msg)
std::string url("https://httpbin.org/");
auto args = httpClient.createRequest(url);
args->extraHeaders = headers;
args->connectTimeout = 60;
args->transferTimeout = 60;
args->followRedirects = true;
args->maxRedirects = 10;
args->verbose = true;
args->compress = true;
args->logger = [](const std::string& msg)
{
std::cout << msg;
};
args.onProgressCallback = [](int current, int total) -> bool
args->onProgressCallback = [](int current, int total) -> bool
{
std::cerr << "\r" << "Downloaded "
<< current << " bytes out of " << total;
return true;
};
HttpClient httpClient;
HttpResponse response = httpClient.get(url, args);
auto response = httpClient.get(url, args);
for (auto it : response.headers)
for (auto it : response->headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
std::cerr << "Upload size: " << response.uploadSize << std::endl;
std::cerr << "Download size: " << response.downloadSize << std::endl;
std::cerr << "Status: " << response.statusCode << std::endl;
std::cerr << "Upload size: " << response->uploadSize << std::endl;
std::cerr << "Download size: " << response->downloadSize << std::endl;
std::cerr << "Status: " << response->statusCode << std::endl;
std::cerr << "Error message: " << response->errorMsg << std::endl;
REQUIRE(response.errorCode == HttpErrorCode::Ok);
REQUIRE(response->errorCode == HttpErrorCode::Ok);
REQUIRE(response->statusCode == 200);
}
SECTION("Async API, one call")
{
bool async = true;
HttpClient httpClient(async);
WebSocketHttpHeaders headers;
std::string url("https://httpbin.org/");
auto args = httpClient.createRequest(url);
args->extraHeaders = headers;
args->connectTimeout = 60;
args->transferTimeout = 60;
args->followRedirects = true;
args->maxRedirects = 10;
args->verbose = true;
args->compress = true;
args->logger = [](const std::string& msg)
{
std::cout << msg;
};
args->onProgressCallback = [](int current, int total) -> bool
{
std::cerr << "\r" << "Downloaded "
<< current << " bytes out of " << total;
return true;
};
std::atomic<bool> requestCompleted(false);
std::atomic<int> statusCode(0);
httpClient.performRequest(args, [&requestCompleted, &statusCode]
(const HttpResponsePtr& response)
{
std::cerr << "Upload size: " << response->uploadSize << std::endl;
std::cerr << "Download size: " << response->downloadSize << std::endl;
std::cerr << "Status: " << response->statusCode << std::endl;
std::cerr << "Error message: " << response->errorMsg << std::endl;
// In case of failure, print response->errorMsg
statusCode = response->statusCode;
requestCompleted = true;
}
);
int wait = 0;
while (wait < 5000)
{
if (requestCompleted) break;
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
wait += 10;
}
std::cerr << "Done" << std::endl;
REQUIRE(statusCode == 200);
}
SECTION("Async API, multiple calls")
{
bool async = true;
HttpClient httpClient(async);
WebSocketHttpHeaders headers;
std::string url("http://httpbin.org/");
auto args = httpClient.createRequest(url);
args->extraHeaders = headers;
args->connectTimeout = 60;
args->transferTimeout = 60;
args->followRedirects = true;
args->maxRedirects = 10;
args->verbose = true;
args->compress = true;
args->logger = [](const std::string& msg)
{
std::cout << msg;
};
args->onProgressCallback = [](int current, int total) -> bool
{
std::cerr << "\r" << "Downloaded "
<< current << " bytes out of " << total;
return true;
};
std::atomic<bool> requestCompleted(false);
std::atomic<int> statusCode0(0);
std::atomic<int> statusCode1(0);
std::atomic<int> statusCode2(0);
for (int i = 0; i < 3; ++i)
{
httpClient.performRequest(args, [i, &requestCompleted, &statusCode0, &statusCode1, &statusCode2]
(const HttpResponsePtr& response)
{
std::cerr << "Upload size: " << response->uploadSize << std::endl;
std::cerr << "Download size: " << response->downloadSize << std::endl;
std::cerr << "Status: " << response->statusCode << std::endl;
std::cerr << "Error message: " << response->errorMsg << std::endl;
// In case of failure, print response->errorMsg
if (i == 0)
{
statusCode0 = response->statusCode;
}
else if (i == 1)
{
statusCode1 = response->statusCode;
}
else if (i == 2)
{
statusCode2 = response->statusCode;
requestCompleted = true;
}
}
);
}
int wait = 0;
while (wait < 10000)
{
if (requestCompleted) break;
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
wait += 10;
}
std::cerr << "Done" << std::endl;
REQUIRE(statusCode0 == 200);
REQUIRE(statusCode1 == 200);
REQUIRE(statusCode2 == 200);
}
#endif
}

View File

@ -6,13 +6,13 @@
#pragma once
#include <string>
#include <vector>
#include <sstream>
#include <iostream>
#include <ixwebsocket/IXWebSocketServer.h>
#include <mutex>
#include <spdlog/spdlog.h>
#include <ixwebsocket/IXWebSocketServer.h>
#include <sstream>
#include <string>
#include <vector>
namespace ix
{
@ -28,20 +28,20 @@ namespace ix
struct Logger
{
public:
template <typename T>
Logger& operator<<(T const& obj)
{
std::lock_guard<std::mutex> lock(_mutex);
public:
template<typename T>
Logger& operator<<(T const& obj)
{
std::lock_guard<std::mutex> lock(_mutex);
std::stringstream ss;
ss << obj;
spdlog::info(ss.str());
return *this;
}
std::stringstream ss;
ss << obj;
spdlog::info(ss.str());
return *this;
}
private:
static std::mutex _mutex;
private:
static std::mutex _mutex;
};
void log(const std::string& msg);
@ -49,4 +49,4 @@ namespace ix
int getFreePort();
bool startWebSocketEchoServer(ix::WebSocketServer& server);
}
} // namespace ix

View File

@ -210,7 +210,7 @@ namespace
<< closeInfo.reason
<< ")";
log(ss.str());
std::lock_guard<std::mutex> lck(mutexWrite);
receivedCloseCode = closeInfo.code;

View File

@ -8,6 +8,7 @@
#include <chrono>
#include <iostream>
#include <spdlog/spdlog.h>
#include <ixwebsocket/IXWebSocketHttpHeaders.h>
@ -114,6 +115,7 @@ namespace ix
std::string SentryClient::computePayload(const Json::Value& msg)
{
Json::Value payload;
payload["platform"] = "python";
payload["sdk"]["name"] = "ws";
payload["sdk"]["version"] = "1.0.0";
@ -132,51 +134,77 @@ namespace ix
Json::Value extra;
extra["cobra_event"] = msg;
extra["cobra_event"] = msg;
exception["extra"] = extra;
//
// "tags": [
// [
// "a",
// "b"
// ],
// ]
//
Json::Value tags;
Json::Value gameTag;
gameTag.append("game");
gameTag.append(msg["device"]["game"]);
tags.append(gameTag);
Json::Value userIdTag;
userIdTag.append("userid");
userIdTag.append(msg["device"]["user_id"]);
tags.append(userIdTag);
Json::Value environmentTag;
environmentTag.append("environment");
environmentTag.append(msg["device"]["environment"]);
tags.append(environmentTag);
payload["tags"] = tags;
return _jsonWriter.write(payload);
}
bool SentryClient::send(const Json::Value& msg,
bool verbose)
std::pair<HttpResponsePtr, std::string> SentryClient::send(const Json::Value& msg,
bool verbose)
{
HttpRequestArgs args;
args.extraHeaders["X-Sentry-Auth"] = SentryClient::computeAuthHeader();
args.connectTimeout = 60;
args.transferTimeout = 5 * 60;
args.followRedirects = true;
args.verbose = verbose;
args.logger = [](const std::string& msg)
auto args = _httpClient.createRequest();
args->extraHeaders["X-Sentry-Auth"] = SentryClient::computeAuthHeader();
args->connectTimeout = 60;
args->transferTimeout = 5 * 60;
args->followRedirects = true;
args->verbose = verbose;
args->logger = [](const std::string& msg)
{
std::cout << msg;
spdlog::info("request logger: {}", msg);
};
std::string body = computePayload(msg);
HttpResponse response = _httpClient.post(_url, body, args);
HttpResponsePtr response = _httpClient.post(_url, body, args);
if (verbose)
{
for (auto it : response.headers)
for (auto it : response->headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
spdlog::info("{}: {}", it.first, it.second);
}
std::cerr << "Upload size: " << response.uploadSize << std::endl;
std::cerr << "Download size: " << response.downloadSize << std::endl;
spdlog::info("Upload size: {}", response->uploadSize);
spdlog::info("Download size: {}", response->downloadSize);
std::cerr << "Status: " << response.statusCode << std::endl;
if (response.errorCode != HttpErrorCode::Ok)
spdlog::info("Status: {}", response->statusCode);
if (response->errorCode != HttpErrorCode::Ok)
{
std::cerr << "error message: " << response.errorMsg << std::endl;
spdlog::info("error message: {}", response->errorMsg);
}
if (response.headers["Content-Type"] != "application/octet-stream")
if (response->headers["Content-Type"] != "application/octet-stream")
{
std::cerr << "payload: " << response.payload << std::endl;
spdlog::info("payload: {}", response->payload);
}
}
return response.statusCode == 200;
return std::make_pair(response, body);
}
} // namespace ix

View File

@ -6,6 +6,7 @@
#pragma once
#include <algorithm>
#include <ixwebsocket/IXHttpClient.h>
#include <jsoncpp/json/json.h>
#include <regex>
@ -18,7 +19,7 @@ namespace ix
SentryClient(const std::string& dsn);
~SentryClient() = default;
bool send(const Json::Value& msg, bool verbose);
std::pair<HttpResponsePtr, std::string> send(const Json::Value& msg, bool verbose);
private:
int64_t getTimestamp();

View File

@ -99,7 +99,7 @@ namespace ix
void CobraMetricsThreadedPublisher::pushMessage(MessageKind messageKind,
const Json::Value& msg)
{
// Now actually enqueue the task
// Enqueue the task
{
// acquire lock
std::unique_lock<std::mutex> lock(_queue_mutex);

View File

@ -25,7 +25,6 @@ namespace ix
~CobraMetricsThreadedPublisher();
/// Configuration / set keys, etc...
/// All input data but the channel name is encrypted with rc4
void configure(const std::string& appkey,
const std::string& endpoint,
const std::string& channel,

View File

@ -10,7 +10,6 @@
#if defined(IXWEBSOCKET_USE_MBED_TLS)
# include <mbedtls/md.h>
#elif defined(__APPLE__)
# include <ixwebsocket/IXSocketMbedTLS.h>
# include <CommonCrypto/CommonHMAC.h>
#else
# include <openssl/hmac.h>

View File

@ -80,6 +80,7 @@ int main(int argc, char** argv)
bool strict = false;
bool stress = false;
bool disableAutomaticReconnection = false;
bool disablePerMessageDeflate = false;
int port = 8008;
int redisPort = 6379;
int statsdPort = 8125;
@ -110,6 +111,7 @@ int main(int argc, char** argv)
CLI::App* connectApp = app.add_subcommand("connect", "Connect to a remote server");
connectApp->add_option("url", url, "Connection url")->required();
connectApp->add_flag("-d", disableAutomaticReconnection, "Disable Automatic Reconnection");
connectApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate");
CLI::App* chatApp = app.add_subcommand("chat", "Group chat");
chatApp->add_option("url", url, "Connection url")->required();
@ -241,7 +243,8 @@ int main(int argc, char** argv)
}
else if (app.got_subcommand("connect"))
{
ret = ix::ws_connect_main(url, disableAutomaticReconnection);
ret = ix::ws_connect_main(url, disableAutomaticReconnection,
disablePerMessageDeflate);
}
else if (app.got_subcommand("chat"))
{

View File

@ -30,7 +30,9 @@ namespace ix
int ws_chat_main(const std::string& url, const std::string& user);
int ws_connect_main(const std::string& url, bool disableAutomaticReconnection);
int ws_connect_main(const std::string& url,
bool disableAutomaticReconnection,
bool disablePerMessageDeflate);
int ws_receive_main(const std::string& url, bool enablePerMessageDeflate, int delayMs);

View File

@ -14,6 +14,7 @@
#include <mutex>
#include <condition_variable>
#include <ixcobra/IXCobraConnection.h>
#include <spdlog/spdlog.h>
#include "IXSentryClient.h"
@ -50,6 +51,8 @@ namespace ix
&queue, verbose, &errorSending, &sentCount,
&stop, &dsn]
{
SentryClient sentryClient(dsn);
while (true)
{
Json::Value msg;
@ -62,10 +65,13 @@ namespace ix
queue.pop();
}
SentryClient sc(dsn);
if (!sc.send(msg, verbose))
auto ret = sentryClient.send(msg, verbose);
HttpResponsePtr response = ret.first;
if (response->statusCode != 200)
{
spdlog::error("Error sending data to sentry: {}", response->statusCode);
spdlog::error("Body: {}", ret.second);
spdlog::error("Response: {}", response->payload);
errorSending = true;
}
else
@ -99,16 +105,16 @@ namespace ix
{
if (eventType == ix::CobraConnection_EventType_Open)
{
std::cerr << "Subscriber: connected" << std::endl;
spdlog::info("Subscriber connected");
for (auto it : headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
spdlog::info("{}: {}", it.first, it.second);
}
}
if (eventType == ix::CobraConnection_EventType_Closed)
{
std::cerr << "Subscriber: closed" << std::endl;
spdlog::info("Subscriber closed");
}
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
@ -122,7 +128,7 @@ namespace ix
{
if (verbose)
{
std::cerr << jsonWriter.write(msg) << std::endl;
spdlog::info(jsonWriter.write(msg));
}
// If we cannot send to sentry fast enough, drop the message
@ -132,8 +138,7 @@ namespace ix
receivedCount != 0 &&
(sentCount * scaleFactor < receivedCount))
{
std::cerr << "message dropped: sending is backlogged !"
<< std::endl;
spdlog::warn("message dropped: sending is backlogged !");
condition.notify_one();
progressCondition.notify_one();
@ -153,15 +158,15 @@ namespace ix
}
else if (eventType == ix::CobraConnection_EventType_Subscribed)
{
std::cerr << "Subscriber: subscribed to channel " << subscriptionId << std::endl;
spdlog::info("Subscriber: subscribed to channel {}", subscriptionId);
}
else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{
std::cerr << "Subscriber: unsubscribed from channel " << subscriptionId << std::endl;
spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId);
}
else if (eventType == ix::CobraConnection_EventType_Error)
{
std::cerr << "Subscriber: error" << errMsg << std::endl;
spdlog::error("Subscriber: error {}", errMsg);
}
}
);
@ -172,18 +177,21 @@ namespace ix
std::unique_lock<std::mutex> lock(progressConditionVariableMutex);
progressCondition.wait(lock);
std::cout << "messages"
<< " received " << receivedCount
<< " sent " << sentCount
<< std::endl;
spdlog::info("messages received {} sent {}", receivedCount, sentCount);
if (strict && errorSending) break;
}
conn.disconnect();
// FIXME: join all the bg threads and stop them.
// join all the bg threads and stop them.
stop = true;
for (int i = 0; i < jobs; i++)
{
spdlog::error("joining thread {}", i);
pool[i].join();
}
return 0;
return (strict && errorSending) ? 1 : 0;
}
}

View File

@ -15,7 +15,8 @@ namespace ix
{
public:
WebSocketConnect(const std::string& _url,
bool disableAutomaticReconnection);
bool disableAutomaticReconnection,
bool disablePerMessageDeflate);
void subscribe(const std::string& channel);
void start();
@ -26,13 +27,16 @@ namespace ix
private:
std::string _url;
ix::WebSocket _webSocket;
bool _disablePerMessageDeflate;
void log(const std::string& msg);
};
WebSocketConnect::WebSocketConnect(const std::string& url,
bool disableAutomaticReconnection) :
_url(url)
bool disableAutomaticReconnection,
bool disablePerMessageDeflate) :
_url(url),
_disablePerMessageDeflate(disablePerMessageDeflate)
{
if (disableAutomaticReconnection)
{
@ -54,9 +58,16 @@ namespace ix
{
_webSocket.setUrl(_url);
ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(
true, false, false, 15, 15);
_webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions);
if (_disablePerMessageDeflate)
{
_webSocket.disablePerMessageDeflate();
}
else
{
ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(
true, false, false, 15, 15);
_webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions);
}
std::stringstream ss;
log(std::string("Connecting to url: ") + _url);
@ -130,10 +141,14 @@ namespace ix
_webSocket.send(text);
}
int ws_connect_main(const std::string& url, bool disableAutomaticReconnection)
int ws_connect_main(const std::string& url,
bool disableAutomaticReconnection,
bool disablePerMessageDeflate)
{
std::cout << "Type Ctrl-D to exit prompt..." << std::endl;
WebSocketConnect webSocketChat(url, disableAutomaticReconnection);
WebSocketConnect webSocketChat(url,
disableAutomaticReconnection,
disablePerMessageDeflate);
webSocketChat.start();
while (true)

View File

@ -95,19 +95,20 @@ namespace ix
const std::string& output,
bool compress)
{
HttpRequestArgs args;
args.extraHeaders = parseHeaders(headersData);
args.connectTimeout = connectTimeout;
args.transferTimeout = transferTimeout;
args.followRedirects = followRedirects;
args.maxRedirects = maxRedirects;
args.verbose = verbose;
args.compress = compress;
args.logger = [](const std::string& msg)
HttpClient httpClient;
auto args = httpClient.createRequest();
args->extraHeaders = parseHeaders(headersData);
args->connectTimeout = connectTimeout;
args->transferTimeout = transferTimeout;
args->followRedirects = followRedirects;
args->maxRedirects = maxRedirects;
args->verbose = verbose;
args->compress = compress;
args->logger = [](const std::string& msg)
{
std::cout << msg;
};
args.onProgressCallback = [](int current, int total) -> bool
args->onProgressCallback = [](int current, int total) -> bool
{
std::cerr << "\r" << "Downloaded "
<< current << " bytes out of " << total;
@ -116,8 +117,7 @@ namespace ix
HttpParameters httpParameters = parsePostParameters(data);
HttpClient httpClient;
HttpResponse response;
HttpResponsePtr response;
if (headersOnly)
{
response = httpClient.head(url, args);
@ -133,21 +133,21 @@ namespace ix
std::cerr << std::endl;
for (auto it : response.headers)
for (auto it : response->headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
std::cerr << "Upload size: " << response.uploadSize << std::endl;
std::cerr << "Download size: " << response.downloadSize << std::endl;
std::cerr << "Upload size: " << response->uploadSize << std::endl;
std::cerr << "Download size: " << response->downloadSize << std::endl;
std::cerr << "Status: " << response.statusCode << std::endl;
if (response.errorCode != HttpErrorCode::Ok)
std::cerr << "Status: " << response->statusCode << std::endl;
if (response->errorCode != HttpErrorCode::Ok)
{
std::cerr << "error message: " << response.errorMsg << std::endl;
std::cerr << "error message: " << response->errorMsg << std::endl;
}
if (!headersOnly && response.errorCode == HttpErrorCode::Ok)
if (!headersOnly && response->errorCode == HttpErrorCode::Ok)
{
if (save || !output.empty())
{
@ -160,14 +160,14 @@ namespace ix
std::cout << "Writing to disk: " << filename << std::endl;
std::ofstream out(filename);
out.write((char*)&response.payload.front(), response.payload.size());
out.write((char*)&response->payload.front(), response->payload.size());
out.close();
}
else
{
if (response.headers["Content-Type"] != "application/octet-stream")
if (response->headers["Content-Type"] != "application/octet-stream")
{
std::cout << "payload: " << response.payload << std::endl;
std::cout << "payload: " << response->payload << std::endl;
}
else
{