Compare commits

...

5 Commits

Author SHA1 Message Date
cbf21b4008 add an option to easily disable per message deflate compression 2019-06-06 13:48:53 -07:00
68c1bf7017 fix Dockerfile link 2019-06-05 19:38:44 -07:00
257c901255 cobra to sentry / more error handling 2019-06-05 19:37:51 -07:00
15d8c663da cobra to sentry fixes 2019-06-05 18:47:48 -07:00
d50125c62d Feature/http async (#90)
* unittest working / uses shared_ptr for a bunch of things 🗿

* fix command line tools

* fix ws + add doc

* add more logging
2019-06-05 17:04:24 -07:00
25 changed files with 615 additions and 276 deletions

View File

@ -1 +1 @@
2.2.1 3.1.1

View File

@ -1 +1 @@
docker/Dockerfile.ubuntu_artful docker/Dockerfile.alpine

View File

@ -28,6 +28,9 @@ webSocket.setUrl(url);
// to make sure that load balancers do not kill an idle connection. // to make sure that load balancers do not kill an idle connection.
webSocket.setHeartBeatPeriod(45); webSocket.setHeartBeatPeriod(45);
// Per message deflate connection is enabled by default. You can tweak its parameters or disable it
webSocket.disablePerMessageDeflate();
// Setup a callback to be fired when a message or an event (open, close, error) is received // Setup a callback to be fired when a message or an event (open, close, error) is received
webSocket.setOnMessageCallback( webSocket.setOnMessageCallback(
[](ix::WebSocketMessageType messageType, [](ix::WebSocketMessageType messageType,
@ -129,33 +132,33 @@ Here is what the HTTP client API looks like. Note that HTTP client support is ve
// Preparation // Preparation
// //
HttpClient httpClient; HttpClient httpClient;
HttpRequestArgs args; HttpRequestArgsPtr args = httpClient.createRequest();
// Custom headers can be set // Custom headers can be set
WebSocketHttpHeaders headers; WebSocketHttpHeaders headers;
headers["Foo"] = "bar"; headers["Foo"] = "bar";
args.extraHeaders = headers; args->extraHeaders = headers;
// Timeout options // Timeout options
args.connectTimeout = connectTimeout; args->connectTimeout = connectTimeout;
args.transferTimeout = transferTimeout; args->transferTimeout = transferTimeout;
// Redirect options // Redirect options
args.followRedirects = followRedirects; args->followRedirects = followRedirects;
args.maxRedirects = maxRedirects; args->maxRedirects = maxRedirects;
// Misc // Misc
args.compress = compress; // Enable gzip compression args->compress = compress; // Enable gzip compression
args.verbose = verbose; args->verbose = verbose;
args.logger = [](const std::string& msg) args->logger = [](const std::string& msg)
{ {
std::cout << msg; std::cout << msg;
}; };
// //
// Request // Synchronous Request
// //
HttpResponse out; HttpResponsePtr out;
std::string url = "https://www.google.com"; std::string url = "https://www.google.com";
// HEAD request // HEAD request
@ -175,13 +178,30 @@ out = httpClient.post(url, std::string("foo=bar"), args);
// //
// Result // Result
// //
auto statusCode = std::get<0>(out); auto errorCode = response->errorCode; // Can be HttpErrorCode::Ok, HttpErrorCode::UrlMalformed, etc...
auto errorCode = std::get<1>(out); auto errorCode = response->errorCode; // 200, 404, etc...
auto responseHeaders = std::get<2>(out); auto responseHeaders = response->headers; // All the headers in a special case-insensitive unordered_map of (string, string)
auto payload = std::get<3>(out); auto payload = response->payload; // All the bytes from the response as an std::string
auto errorMsg = std::get<4>(out); auto errorMsg = response->errorMsg; // Descriptive error message in case of failure
auto uploadSize = std::get<5>(out); auto uploadSize = response->uploadSize; // Byte count of uploaded data
auto downloadSize = std::get<6>(out); auto downloadSize = response->downloadSize; // Byte count of downloaded data
//
// Asynchronous Request
//
bool async = true;
HttpClient httpClient(async);
auto args = httpClient.createRequest(url, HttpClient::kGet);
// Push the request to a queue,
bool ok = httpClient.performRequest(args, [](const HttpResponsePtr& response)
{
// This callback execute in a background thread. Make sure you uses appropriate protection such as mutex
auto statusCode = response->statusCode; // acess results
}
);
// ok will be false if your httpClient is not async
``` ```
## Build ## Build
@ -238,7 +258,7 @@ No manual polling to fetch data is required. Data is sent and received instantly
### Automatic reconnection ### Automatic reconnection
If the remote end (server) breaks the connection, the code will try to perpetually reconnect, by using an exponential backoff strategy, capped at one retry every 10 seconds. If the remote end (server) breaks the connection, the code will try to perpetually reconnect, by using an exponential backoff strategy, capped at one retry every 10 seconds. This behavior can be disabled.
### Large messages ### Large messages

View File

@ -0,0 +1,23 @@
# Build time
FROM ubuntu:bionic as build
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install wget
RUN mkdir -p /tmp/cmake
WORKDIR /tmp/cmake
RUN wget https://github.com/Kitware/CMake/releases/download/v3.14.0/cmake-3.14.0-Linux-x86_64.tar.gz
RUN tar zxf cmake-3.14.0-Linux-x86_64.tar.gz
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install libz-dev
RUN apt-get -y install make
RUN apt-get -y install python
COPY . .
ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-Linux-x86_64/bin
ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
RUN ["make", "ws"]

View File

@ -24,23 +24,95 @@ namespace ix
const std::string HttpClient::kDel = "DEL"; const std::string HttpClient::kDel = "DEL";
const std::string HttpClient::kPut = "PUT"; const std::string HttpClient::kPut = "PUT";
HttpClient::HttpClient() HttpClient::HttpClient(bool async) : _async(async), _stop(false)
{ {
if (!_async) return;
_thread = std::thread(&HttpClient::run, this);
} }
HttpClient::~HttpClient() HttpClient::~HttpClient()
{ {
if (!_thread.joinable()) return;
_stop = true;
_condition.notify_one();
_thread.join();
} }
HttpResponse HttpClient::request( HttpRequestArgsPtr HttpClient::createRequest(const std::string& url,
const std::string& verb)
{
auto request = std::make_shared<HttpRequestArgs>();
request->url = url;
request->verb = verb;
return request;
}
bool HttpClient::performRequest(HttpRequestArgsPtr args,
const OnResponseCallback& onResponseCallback)
{
if (!_async) return false;
// Enqueue the task
{
// acquire lock
std::unique_lock<std::mutex> lock(_queueMutex);
// add the task
_queue.push(std::make_pair(args, onResponseCallback));
} // release lock
// wake up one thread
_condition.notify_one();
return true;
}
void HttpClient::run()
{
while (true)
{
HttpRequestArgsPtr args;
OnResponseCallback onResponseCallback;
{
std::unique_lock<std::mutex> lock(_queueMutex);
while (!_stop && _queue.empty())
{
_condition.wait(lock);
}
if (_stop) return;
auto p = _queue.front();
_queue.pop();
args = p.first;
onResponseCallback = p.second;
}
if (_stop) return;
HttpResponsePtr response = request(args->url, args->verb, args->body, args);
onResponseCallback(response);
if (_stop) return;
}
}
HttpResponsePtr HttpClient::request(
const std::string& url, const std::string& url,
const std::string& verb, const std::string& verb,
const std::string& body, const std::string& body,
const HttpRequestArgs& args, HttpRequestArgsPtr args,
int redirects) int redirects)
{ {
// We only have one socket connection, so we cannot
// make multiple requests concurrently.
std::lock_guard<std::mutex> lock(_mutex);
uint64_t uploadSize = 0; uint64_t uploadSize = 0;
uint64_t downloadSize = 0; uint64_t downloadSize = 0;
int code = 0; int code = 0;
@ -54,7 +126,7 @@ namespace ix
{ {
std::stringstream ss; std::stringstream ss;
ss << "Cannot parse url: " << url; ss << "Cannot parse url: " << url;
return HttpResponse(code, HttpErrorCode::UrlMalformed, return std::make_shared<HttpResponse>(code, HttpErrorCode::UrlMalformed,
headers, payload, ss.str(), headers, payload, ss.str(),
uploadSize, downloadSize); uploadSize, downloadSize);
} }
@ -65,7 +137,7 @@ namespace ix
if (!_socket) if (!_socket)
{ {
return HttpResponse(code, HttpErrorCode::CannotCreateSocket, return std::make_shared<HttpResponse>(code, HttpErrorCode::CannotCreateSocket,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
@ -75,13 +147,13 @@ namespace ix
ss << verb << " " << path << " HTTP/1.1\r\n"; ss << verb << " " << path << " HTTP/1.1\r\n";
ss << "Host: " << host << "\r\n"; ss << "Host: " << host << "\r\n";
if (args.compress) if (args->compress)
{ {
ss << "Accept-Encoding: gzip" << "\r\n"; ss << "Accept-Encoding: gzip" << "\r\n";
} }
// Append extra headers // Append extra headers
for (auto&& it : args.extraHeaders) for (auto&& it : args->extraHeaders)
{ {
ss << it.first << ": " << it.second << "\r\n"; ss << it.first << ": " << it.second << "\r\n";
} }
@ -103,7 +175,7 @@ namespace ix
ss << "Content-Length: " << body.size() << "\r\n"; ss << "Content-Length: " << body.size() << "\r\n";
// Set default Content-Type if unspecified // Set default Content-Type if unspecified
if (args.extraHeaders.find("Content-Type") == args.extraHeaders.end()) if (args->extraHeaders.find("Content-Type") == args->extraHeaders.end())
{ {
ss << "Content-Type: application/x-www-form-urlencoded" << "\r\n"; ss << "Content-Type: application/x-www-form-urlencoded" << "\r\n";
} }
@ -121,23 +193,23 @@ namespace ix
// Make a cancellation object dealing with connection timeout // Make a cancellation object dealing with connection timeout
auto isCancellationRequested = auto isCancellationRequested =
makeCancellationRequestWithTimeout(args.connectTimeout, requestInitCancellation); makeCancellationRequestWithTimeout(args->connectTimeout, requestInitCancellation);
bool success = _socket->connect(host, port, errMsg, isCancellationRequested); bool success = _socket->connect(host, port, errMsg, isCancellationRequested);
if (!success) if (!success)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Cannot connect to url: " << url << " / error : " << errMsg; ss << "Cannot connect to url: " << url << " / error : " << errMsg;
return HttpResponse(code, HttpErrorCode::CannotConnect, return std::make_shared<HttpResponse>(code, HttpErrorCode::CannotConnect,
headers, payload, ss.str(), headers, payload, ss.str(),
uploadSize, downloadSize); uploadSize, downloadSize);
} }
// Make a new cancellation object dealing with transfer timeout // Make a new cancellation object dealing with transfer timeout
isCancellationRequested = isCancellationRequested =
makeCancellationRequestWithTimeout(args.transferTimeout, requestInitCancellation); makeCancellationRequestWithTimeout(args->transferTimeout, requestInitCancellation);
if (args.verbose) if (args->verbose)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Sending " << verb << " request " ss << "Sending " << verb << " request "
@ -154,9 +226,9 @@ namespace ix
if (!_socket->writeBytes(req, isCancellationRequested)) if (!_socket->writeBytes(req, isCancellationRequested))
{ {
std::string errorMsg("Cannot send request"); std::string errorMsg("Cannot send request");
return HttpResponse(code, HttpErrorCode::SendError, return std::make_shared<HttpResponse>(code, HttpErrorCode::SendError,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
uploadSize = req.size(); uploadSize = req.size();
@ -168,12 +240,12 @@ namespace ix
if (!lineValid) if (!lineValid)
{ {
std::string errorMsg("Cannot retrieve status line"); std::string errorMsg("Cannot retrieve status line");
return HttpResponse(code, HttpErrorCode::CannotReadStatusLine, return std::make_shared<HttpResponse>(code, HttpErrorCode::CannotReadStatusLine,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
if (args.verbose) if (args->verbose)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Status line " << line; ss << "Status line " << line;
@ -183,9 +255,9 @@ namespace ix
if (sscanf(line.c_str(), "HTTP/1.1 %d", &code) != 1) if (sscanf(line.c_str(), "HTTP/1.1 %d", &code) != 1)
{ {
std::string errorMsg("Cannot parse response code from status line"); std::string errorMsg("Cannot parse response code from status line");
return HttpResponse(code, HttpErrorCode::MissingStatus, return std::make_shared<HttpResponse>(code, HttpErrorCode::MissingStatus,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
auto result = parseHttpHeaders(_socket, isCancellationRequested); auto result = parseHttpHeaders(_socket, isCancellationRequested);
@ -195,29 +267,29 @@ namespace ix
if (!headersValid) if (!headersValid)
{ {
std::string errorMsg("Cannot parse http headers"); std::string errorMsg("Cannot parse http headers");
return HttpResponse(code, HttpErrorCode::HeaderParsingError, return std::make_shared<HttpResponse>(code, HttpErrorCode::HeaderParsingError,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
// Redirect ? // Redirect ?
if ((code >= 301 && code <= 308) && args.followRedirects) if ((code >= 301 && code <= 308) && args->followRedirects)
{ {
if (headers.find("Location") == headers.end()) if (headers.find("Location") == headers.end())
{ {
std::string errorMsg("Missing location header for redirect"); std::string errorMsg("Missing location header for redirect");
return HttpResponse(code, HttpErrorCode::MissingLocation, return std::make_shared<HttpResponse>(code, HttpErrorCode::MissingLocation,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
if (redirects >= args.maxRedirects) if (redirects >= args->maxRedirects)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Too many redirects: " << redirects; ss << "Too many redirects: " << redirects;
return HttpResponse(code, HttpErrorCode::TooManyRedirects, return std::make_shared<HttpResponse>(code, HttpErrorCode::TooManyRedirects,
headers, payload, ss.str(), headers, payload, ss.str(),
uploadSize, downloadSize); uploadSize, downloadSize);
} }
// Recurse // Recurse
@ -227,9 +299,9 @@ namespace ix
if (verb == "HEAD") if (verb == "HEAD")
{ {
return HttpResponse(code, HttpErrorCode::Ok, return std::make_shared<HttpResponse>(code, HttpErrorCode::Ok,
headers, payload, std::string(), headers, payload, std::string(),
uploadSize, downloadSize); uploadSize, downloadSize);
} }
// Parse response: // Parse response:
@ -243,14 +315,14 @@ namespace ix
payload.reserve(contentLength); payload.reserve(contentLength);
auto chunkResult = _socket->readBytes(contentLength, auto chunkResult = _socket->readBytes(contentLength,
args.onProgressCallback, args->onProgressCallback,
isCancellationRequested); isCancellationRequested);
if (!chunkResult.first) if (!chunkResult.first)
{ {
errorMsg = "Cannot read chunk"; errorMsg = "Cannot read chunk";
return HttpResponse(code, HttpErrorCode::ChunkReadError, return std::make_shared<HttpResponse>(code, HttpErrorCode::ChunkReadError,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
payload += chunkResult.second; payload += chunkResult.second;
} }
@ -266,9 +338,9 @@ namespace ix
if (!lineResult.first) if (!lineResult.first)
{ {
return HttpResponse(code, HttpErrorCode::ChunkReadError, return std::make_shared<HttpResponse>(code, HttpErrorCode::ChunkReadError,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
uint64_t chunkSize; uint64_t chunkSize;
@ -276,7 +348,7 @@ namespace ix
ss << std::hex << line; ss << std::hex << line;
ss >> chunkSize; ss >> chunkSize;
if (args.verbose) if (args->verbose)
{ {
std::stringstream oss; std::stringstream oss;
oss << "Reading " << chunkSize << " bytes" oss << "Reading " << chunkSize << " bytes"
@ -288,14 +360,14 @@ namespace ix
// Read a chunk // Read a chunk
auto chunkResult = _socket->readBytes((size_t) chunkSize, auto chunkResult = _socket->readBytes((size_t) chunkSize,
args.onProgressCallback, args->onProgressCallback,
isCancellationRequested); isCancellationRequested);
if (!chunkResult.first) if (!chunkResult.first)
{ {
errorMsg = "Cannot read chunk"; errorMsg = "Cannot read chunk";
return HttpResponse(code, HttpErrorCode::ChunkReadError, return std::make_shared<HttpResponse>(code, HttpErrorCode::ChunkReadError,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
payload += chunkResult.second; payload += chunkResult.second;
@ -304,9 +376,9 @@ namespace ix
if (!lineResult.first) if (!lineResult.first)
{ {
return HttpResponse(code, HttpErrorCode::ChunkReadError, return std::make_shared<HttpResponse>(code, HttpErrorCode::ChunkReadError,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
if (chunkSize == 0) break; if (chunkSize == 0) break;
@ -319,9 +391,9 @@ namespace ix
else else
{ {
std::string errorMsg("Cannot read http body"); std::string errorMsg("Cannot read http body");
return HttpResponse(code, HttpErrorCode::CannotReadBody, return std::make_shared<HttpResponse>(code, HttpErrorCode::CannotReadBody,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
downloadSize = payload.size(); downloadSize = payload.size();
@ -333,60 +405,60 @@ namespace ix
if (!gzipInflate(payload, decompressedPayload)) if (!gzipInflate(payload, decompressedPayload))
{ {
std::string errorMsg("Error decompressing payload"); std::string errorMsg("Error decompressing payload");
return HttpResponse(code, HttpErrorCode::Gzip, return std::make_shared<HttpResponse>(code, HttpErrorCode::Gzip,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
payload = decompressedPayload; payload = decompressedPayload;
} }
return HttpResponse(code, HttpErrorCode::Ok, return std::make_shared<HttpResponse>(code, HttpErrorCode::Ok,
headers, payload, std::string(), headers, payload, std::string(),
uploadSize, downloadSize); uploadSize, downloadSize);
} }
HttpResponse HttpClient::get(const std::string& url, HttpResponsePtr HttpClient::get(const std::string& url,
const HttpRequestArgs& args) HttpRequestArgsPtr args)
{ {
return request(url, kGet, std::string(), args); return request(url, kGet, std::string(), args);
} }
HttpResponse HttpClient::head(const std::string& url, HttpResponsePtr HttpClient::head(const std::string& url,
const HttpRequestArgs& args) HttpRequestArgsPtr args)
{ {
return request(url, kHead, std::string(), args); return request(url, kHead, std::string(), args);
} }
HttpResponse HttpClient::del(const std::string& url, HttpResponsePtr HttpClient::del(const std::string& url,
const HttpRequestArgs& args) HttpRequestArgsPtr args)
{ {
return request(url, kDel, std::string(), args); return request(url, kDel, std::string(), args);
} }
HttpResponse HttpClient::post(const std::string& url, HttpResponsePtr HttpClient::post(const std::string& url,
const HttpParameters& httpParameters, const HttpParameters& httpParameters,
const HttpRequestArgs& args) HttpRequestArgsPtr args)
{ {
return request(url, kPost, serializeHttpParameters(httpParameters), args); return request(url, kPost, serializeHttpParameters(httpParameters), args);
} }
HttpResponse HttpClient::post(const std::string& url, HttpResponsePtr HttpClient::post(const std::string& url,
const std::string& body, const std::string& body,
const HttpRequestArgs& args) HttpRequestArgsPtr args)
{ {
return request(url, kPost, body, args); return request(url, kPost, body, args);
} }
HttpResponse HttpClient::put(const std::string& url, HttpResponsePtr HttpClient::put(const std::string& url,
const HttpParameters& httpParameters, const HttpParameters& httpParameters,
const HttpRequestArgs& args) HttpRequestArgsPtr args)
{ {
return request(url, kPut, serializeHttpParameters(httpParameters), args); return request(url, kPut, serializeHttpParameters(httpParameters), args);
} }
HttpResponse HttpClient::put(const std::string& url, HttpResponsePtr HttpClient::put(const std::string& url,
const std::string& body, const std::string& body,
const HttpRequestArgs& args) const HttpRequestArgsPtr args)
{ {
return request(url, kPut, body, args); return request(url, kPut, body, args);
} }
@ -488,11 +560,11 @@ namespace ix
} }
void HttpClient::log(const std::string& msg, void HttpClient::log(const std::string& msg,
const HttpRequestArgs& args) HttpRequestArgsPtr args)
{ {
if (args.logger) if (args->logger)
{ {
args.logger(msg); args->logger(msg);
} }
} }
} }

View File

@ -10,11 +10,13 @@
#include "IXWebSocketHttpHeaders.h" #include "IXWebSocketHttpHeaders.h"
#include <algorithm> #include <algorithm>
#include <atomic> #include <atomic>
#include <condition_variable>
#include <functional> #include <functional>
#include <map> #include <map>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <tuple> #include <queue>
#include <thread>
namespace ix namespace ix
{ {
@ -34,7 +36,8 @@ namespace ix
MissingLocation = 11, MissingLocation = 11,
TooManyRedirects = 12, TooManyRedirects = 12,
ChunkReadError = 13, ChunkReadError = 13,
CannotReadBody = 14 CannotReadBody = 14,
Invalid = 100
}; };
struct HttpResponse struct HttpResponse
@ -66,12 +69,15 @@ namespace ix
} }
}; };
using HttpResponsePtr = std::shared_ptr<HttpResponse>;
using HttpParameters = std::map<std::string, std::string>; using HttpParameters = std::map<std::string, std::string>;
using Logger = std::function<void(const std::string&)>; using Logger = std::function<void(const std::string&)>;
using OnResponseCallback = std::function<void(const HttpResponsePtr&)>;
struct HttpRequestArgs struct HttpRequestArgs
{ {
std::string url; std::string url;
std::string verb;
WebSocketHttpHeaders extraHeaders; WebSocketHttpHeaders extraHeaders;
std::string body; std::string body;
int connectTimeout; int connectTimeout;
@ -84,51 +90,72 @@ namespace ix
OnProgressCallback onProgressCallback; OnProgressCallback onProgressCallback;
}; };
using HttpRequestArgsPtr = std::shared_ptr<HttpRequestArgs>;
class HttpClient class HttpClient
{ {
public: public:
HttpClient(); HttpClient(bool async = false);
~HttpClient(); ~HttpClient();
HttpResponse get(const std::string& url, const HttpRequestArgs& args); HttpResponsePtr get(const std::string& url, HttpRequestArgsPtr args);
HttpResponse head(const std::string& url, const HttpRequestArgs& args); HttpResponsePtr head(const std::string& url, HttpRequestArgsPtr args);
HttpResponse del(const std::string& url, const HttpRequestArgs& args); HttpResponsePtr del(const std::string& url, HttpRequestArgsPtr args);
HttpResponse post(const std::string& url, HttpResponsePtr post(const std::string& url,
const HttpParameters& httpParameters, const HttpParameters& httpParameters,
const HttpRequestArgs& args); HttpRequestArgsPtr args);
HttpResponse post(const std::string& url, HttpResponsePtr post(const std::string& url,
const std::string& body,
const HttpRequestArgs& args);
HttpResponse put(const std::string& url,
const HttpParameters& httpParameters,
const HttpRequestArgs& args);
HttpResponse put(const std::string& url,
const std::string& body,
const HttpRequestArgs& args);
HttpResponse request(const std::string& url,
const std::string& verb,
const std::string& body, const std::string& body,
const HttpRequestArgs& args, HttpRequestArgsPtr args);
int redirects = 0);
HttpResponsePtr put(const std::string& url,
const HttpParameters& httpParameters,
HttpRequestArgsPtr args);
HttpResponsePtr put(const std::string& url,
const std::string& body,
HttpRequestArgsPtr args);
HttpResponsePtr request(const std::string& url,
const std::string& verb,
const std::string& body,
HttpRequestArgsPtr args,
int redirects = 0);
// Async API
HttpRequestArgsPtr createRequest(const std::string& url = std::string(),
const std::string& verb = HttpClient::kGet);
bool performRequest(HttpRequestArgsPtr request,
const OnResponseCallback& onResponseCallback);
std::string serializeHttpParameters(const HttpParameters& httpParameters); std::string serializeHttpParameters(const HttpParameters& httpParameters);
std::string urlEncode(const std::string& value); std::string urlEncode(const std::string& value);
private:
void log(const std::string& msg, const HttpRequestArgs& args);
bool gzipInflate(const std::string& in, std::string& out);
std::shared_ptr<Socket> _socket;
const static std::string kPost; const static std::string kPost;
const static std::string kGet; const static std::string kGet;
const static std::string kHead; const static std::string kHead;
const static std::string kDel; const static std::string kDel;
const static std::string kPut; const static std::string kPut;
private:
void log(const std::string& msg, HttpRequestArgsPtr args);
bool gzipInflate(const std::string& in, std::string& out);
// Async API background thread runner
void run();
// Async API
bool _async;
std::queue<std::pair<HttpRequestArgsPtr, OnResponseCallback>> _queue;
mutable std::mutex _queueMutex;
std::condition_variable _condition;
std::atomic<bool> _stop;
std::thread _thread;
std::shared_ptr<Socket> _socket;
std::mutex _mutex; // to protect accessing the _socket (only one socket per client)
}; };
} // namespace ix } // namespace ix

View File

@ -135,6 +135,13 @@ namespace ix
_enablePong = false; _enablePong = false;
} }
void WebSocket::disablePerMessageDeflate()
{
std::lock_guard<std::mutex> lock(_configMutex);
WebSocketPerMessageDeflateOptions perMessageDeflateOptions(false);
_perMessageDeflateOptions = perMessageDeflateOptions;
}
void WebSocket::start() void WebSocket::start()
{ {
if (_thread.joinable()) return; // we've already been started if (_thread.joinable()) return; // we've already been started

View File

@ -95,6 +95,7 @@ namespace ix
void setPingTimeout(int pingTimeoutSecs); void setPingTimeout(int pingTimeoutSecs);
void enablePong(); void enablePong();
void disablePong(); void disablePong();
void disablePerMessageDeflate();
// Run asynchronously, by calling start and stop. // Run asynchronously, by calling start and stop.
void start(); void start();

View File

@ -9,7 +9,7 @@ install: brew
# on osx it is good practice to make /usr/local user writable # on osx it is good practice to make /usr/local user writable
# sudo chown -R `whoami`/staff /usr/local # sudo chown -R `whoami`/staff /usr/local
brew: brew:
mkdir -p build && (cd build ; cmake -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .. ; make -j install) mkdir -p build && (cd build ; cmake -DUSE_TLS=1 -DUSE_WS=1 .. ; make -j install)
ws: ws:
mkdir -p build && (cd build ; cmake -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .. ; make -j) mkdir -p build && (cd build ; cmake -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .. ; make -j)
@ -44,7 +44,7 @@ trail:
sh third_party/remote_trailing_whitespaces.sh sh third_party/remote_trailing_whitespaces.sh
format: format:
find ixwebsocket ws -name '*.cpp' -o -name '*.h' -exec clang-format -i {} \; find test ixwebsocket ws -name '*.cpp' -o -name '*.h' -exec clang-format -i {} \;
# That target is used to start a node server, but isn't required as we have # That target is used to start a node server, but isn't required as we have
# a builtin C++ server started in the unittest now # a builtin C++ server started in the unittest now

View File

@ -15,85 +15,219 @@ TEST_CASE("http client", "[http]")
{ {
SECTION("Connect to a remote HTTP server") SECTION("Connect to a remote HTTP server")
{ {
std::string url("http://httpbin.org/"); HttpClient httpClient;
WebSocketHttpHeaders headers; WebSocketHttpHeaders headers;
headers["User-Agent"] = "ixwebsocket";
HttpRequestArgs args; std::string url("http://httpbin.org/");
args.extraHeaders = headers; auto args = httpClient.createRequest(url);
args.connectTimeout = 60;
args.transferTimeout = 60; args->extraHeaders = headers;
args.followRedirects = true; args->connectTimeout = 60;
args.maxRedirects = 10; args->transferTimeout = 60;
args.verbose = true; args->followRedirects = true;
args.compress = true; args->maxRedirects = 10;
args.logger = [](const std::string& msg) args->verbose = true;
args->compress = true;
args->logger = [](const std::string& msg)
{ {
std::cout << msg; std::cout << msg;
}; };
args.onProgressCallback = [](int current, int total) -> bool args->onProgressCallback = [](int current, int total) -> bool
{ {
std::cerr << "\r" << "Downloaded " std::cerr << "\r" << "Downloaded "
<< current << " bytes out of " << total; << current << " bytes out of " << total;
return true; return true;
}; };
HttpClient httpClient; auto response = httpClient.get(url, args);
HttpResponse response = httpClient.get(url, args);
for (auto it : response.headers) for (auto it : response->headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; std::cerr << it.first << ": " << it.second << std::endl;
} }
std::cerr << "Upload size: " << response.uploadSize << std::endl; std::cerr << "Upload size: " << response->uploadSize << std::endl;
std::cerr << "Download size: " << response.downloadSize << std::endl; std::cerr << "Download size: " << response->downloadSize << std::endl;
std::cerr << "Status: " << response.statusCode << std::endl; std::cerr << "Status: " << response->statusCode << std::endl;
std::cerr << "Error message: " << response->errorMsg << std::endl;
REQUIRE(response.errorCode == HttpErrorCode::Ok); REQUIRE(response->errorCode == HttpErrorCode::Ok);
REQUIRE(response->statusCode == 200);
} }
#if defined(IXWEBSOCKET_USE_TLS)
SECTION("Connect to a remote HTTPS server") SECTION("Connect to a remote HTTPS server")
{ {
std::string url("https://httpbin.org/"); HttpClient httpClient;
WebSocketHttpHeaders headers; WebSocketHttpHeaders headers;
headers["User-Agent"] = "ixwebsocket";
HttpRequestArgs args; std::string url("https://httpbin.org/");
args.extraHeaders = headers; auto args = httpClient.createRequest(url);
args.connectTimeout = 60;
args.transferTimeout = 60; args->extraHeaders = headers;
args.followRedirects = true; args->connectTimeout = 60;
args.maxRedirects = 10; args->transferTimeout = 60;
args.verbose = true; args->followRedirects = true;
args.compress = true; args->maxRedirects = 10;
args.logger = [](const std::string& msg) args->verbose = true;
args->compress = true;
args->logger = [](const std::string& msg)
{ {
std::cout << msg; std::cout << msg;
}; };
args.onProgressCallback = [](int current, int total) -> bool args->onProgressCallback = [](int current, int total) -> bool
{ {
std::cerr << "\r" << "Downloaded " std::cerr << "\r" << "Downloaded "
<< current << " bytes out of " << total; << current << " bytes out of " << total;
return true; return true;
}; };
HttpClient httpClient; auto response = httpClient.get(url, args);
HttpResponse response = httpClient.get(url, args);
for (auto it : response.headers) for (auto it : response->headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; std::cerr << it.first << ": " << it.second << std::endl;
} }
std::cerr << "Upload size: " << response.uploadSize << std::endl; std::cerr << "Upload size: " << response->uploadSize << std::endl;
std::cerr << "Download size: " << response.downloadSize << std::endl; std::cerr << "Download size: " << response->downloadSize << std::endl;
std::cerr << "Status: " << response.statusCode << std::endl; std::cerr << "Status: " << response->statusCode << std::endl;
std::cerr << "Error message: " << response->errorMsg << std::endl;
REQUIRE(response.errorCode == HttpErrorCode::Ok); REQUIRE(response->errorCode == HttpErrorCode::Ok);
REQUIRE(response->statusCode == 200);
}
SECTION("Async API, one call")
{
bool async = true;
HttpClient httpClient(async);
WebSocketHttpHeaders headers;
std::string url("https://httpbin.org/");
auto args = httpClient.createRequest(url);
args->extraHeaders = headers;
args->connectTimeout = 60;
args->transferTimeout = 60;
args->followRedirects = true;
args->maxRedirects = 10;
args->verbose = true;
args->compress = true;
args->logger = [](const std::string& msg)
{
std::cout << msg;
};
args->onProgressCallback = [](int current, int total) -> bool
{
std::cerr << "\r" << "Downloaded "
<< current << " bytes out of " << total;
return true;
};
std::atomic<bool> requestCompleted(false);
std::atomic<int> statusCode(0);
httpClient.performRequest(args, [&requestCompleted, &statusCode]
(const HttpResponsePtr& response)
{
std::cerr << "Upload size: " << response->uploadSize << std::endl;
std::cerr << "Download size: " << response->downloadSize << std::endl;
std::cerr << "Status: " << response->statusCode << std::endl;
std::cerr << "Error message: " << response->errorMsg << std::endl;
// In case of failure, print response->errorMsg
statusCode = response->statusCode;
requestCompleted = true;
}
);
int wait = 0;
while (wait < 5000)
{
if (requestCompleted) break;
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
wait += 10;
}
std::cerr << "Done" << std::endl;
REQUIRE(statusCode == 200);
}
SECTION("Async API, multiple calls")
{
bool async = true;
HttpClient httpClient(async);
WebSocketHttpHeaders headers;
std::string url("http://httpbin.org/");
auto args = httpClient.createRequest(url);
args->extraHeaders = headers;
args->connectTimeout = 60;
args->transferTimeout = 60;
args->followRedirects = true;
args->maxRedirects = 10;
args->verbose = true;
args->compress = true;
args->logger = [](const std::string& msg)
{
std::cout << msg;
};
args->onProgressCallback = [](int current, int total) -> bool
{
std::cerr << "\r" << "Downloaded "
<< current << " bytes out of " << total;
return true;
};
std::atomic<bool> requestCompleted(false);
std::atomic<int> statusCode0(0);
std::atomic<int> statusCode1(0);
std::atomic<int> statusCode2(0);
for (int i = 0; i < 3; ++i)
{
httpClient.performRequest(args, [i, &requestCompleted, &statusCode0, &statusCode1, &statusCode2]
(const HttpResponsePtr& response)
{
std::cerr << "Upload size: " << response->uploadSize << std::endl;
std::cerr << "Download size: " << response->downloadSize << std::endl;
std::cerr << "Status: " << response->statusCode << std::endl;
std::cerr << "Error message: " << response->errorMsg << std::endl;
// In case of failure, print response->errorMsg
if (i == 0)
{
statusCode0 = response->statusCode;
}
else if (i == 1)
{
statusCode1 = response->statusCode;
}
else if (i == 2)
{
statusCode2 = response->statusCode;
requestCompleted = true;
}
}
);
}
int wait = 0;
while (wait < 10000)
{
if (requestCompleted) break;
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
wait += 10;
}
std::cerr << "Done" << std::endl;
REQUIRE(statusCode0 == 200);
REQUIRE(statusCode1 == 200);
REQUIRE(statusCode2 == 200);
} }
#endif
} }

View File

@ -6,13 +6,13 @@
#pragma once #pragma once
#include <string>
#include <vector>
#include <sstream>
#include <iostream> #include <iostream>
#include <ixwebsocket/IXWebSocketServer.h>
#include <mutex> #include <mutex>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <ixwebsocket/IXWebSocketServer.h> #include <sstream>
#include <string>
#include <vector>
namespace ix namespace ix
{ {
@ -28,20 +28,20 @@ namespace ix
struct Logger struct Logger
{ {
public: public:
template <typename T> template<typename T>
Logger& operator<<(T const& obj) Logger& operator<<(T const& obj)
{ {
std::lock_guard<std::mutex> lock(_mutex); std::lock_guard<std::mutex> lock(_mutex);
std::stringstream ss; std::stringstream ss;
ss << obj; ss << obj;
spdlog::info(ss.str()); spdlog::info(ss.str());
return *this; return *this;
} }
private: private:
static std::mutex _mutex; static std::mutex _mutex;
}; };
void log(const std::string& msg); void log(const std::string& msg);
@ -49,4 +49,4 @@ namespace ix
int getFreePort(); int getFreePort();
bool startWebSocketEchoServer(ix::WebSocketServer& server); bool startWebSocketEchoServer(ix::WebSocketServer& server);
} } // namespace ix

View File

@ -8,6 +8,7 @@
#include <chrono> #include <chrono>
#include <iostream> #include <iostream>
#include <spdlog/spdlog.h>
#include <ixwebsocket/IXWebSocketHttpHeaders.h> #include <ixwebsocket/IXWebSocketHttpHeaders.h>
@ -114,6 +115,7 @@ namespace ix
std::string SentryClient::computePayload(const Json::Value& msg) std::string SentryClient::computePayload(const Json::Value& msg)
{ {
Json::Value payload; Json::Value payload;
payload["platform"] = "python"; payload["platform"] = "python";
payload["sdk"]["name"] = "ws"; payload["sdk"]["name"] = "ws";
payload["sdk"]["version"] = "1.0.0"; payload["sdk"]["version"] = "1.0.0";
@ -132,51 +134,77 @@ namespace ix
Json::Value extra; Json::Value extra;
extra["cobra_event"] = msg; extra["cobra_event"] = msg;
extra["cobra_event"] = msg;
exception["extra"] = extra; //
// "tags": [
// [
// "a",
// "b"
// ],
// ]
//
Json::Value tags;
Json::Value gameTag;
gameTag.append("game");
gameTag.append(msg["device"]["game"]);
tags.append(gameTag);
Json::Value userIdTag;
userIdTag.append("userid");
userIdTag.append(msg["device"]["user_id"]);
tags.append(userIdTag);
Json::Value environmentTag;
environmentTag.append("environment");
environmentTag.append(msg["device"]["environment"]);
tags.append(environmentTag);
payload["tags"] = tags;
return _jsonWriter.write(payload); return _jsonWriter.write(payload);
} }
bool SentryClient::send(const Json::Value& msg, std::pair<HttpResponsePtr, std::string> SentryClient::send(const Json::Value& msg,
bool verbose) bool verbose)
{ {
HttpRequestArgs args; auto args = _httpClient.createRequest();
args.extraHeaders["X-Sentry-Auth"] = SentryClient::computeAuthHeader(); args->extraHeaders["X-Sentry-Auth"] = SentryClient::computeAuthHeader();
args.connectTimeout = 60; args->connectTimeout = 60;
args.transferTimeout = 5 * 60; args->transferTimeout = 5 * 60;
args.followRedirects = true; args->followRedirects = true;
args.verbose = verbose; args->verbose = verbose;
args.logger = [](const std::string& msg) args->logger = [](const std::string& msg)
{ {
std::cout << msg; spdlog::info("request logger: {}", msg);
}; };
std::string body = computePayload(msg); std::string body = computePayload(msg);
HttpResponse response = _httpClient.post(_url, body, args); HttpResponsePtr response = _httpClient.post(_url, body, args);
if (verbose) if (verbose)
{ {
for (auto it : response.headers) for (auto it : response->headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
std::cerr << "Upload size: " << response.uploadSize << std::endl; spdlog::info("Upload size: {}", response->uploadSize);
std::cerr << "Download size: " << response.downloadSize << std::endl; spdlog::info("Download size: {}", response->downloadSize);
std::cerr << "Status: " << response.statusCode << std::endl; spdlog::info("Status: {}", response->statusCode);
if (response.errorCode != HttpErrorCode::Ok) if (response->errorCode != HttpErrorCode::Ok)
{ {
std::cerr << "error message: " << response.errorMsg << std::endl; spdlog::info("error message: {}", response->errorMsg);
} }
if (response.headers["Content-Type"] != "application/octet-stream") if (response->headers["Content-Type"] != "application/octet-stream")
{ {
std::cerr << "payload: " << response.payload << std::endl; spdlog::info("payload: {}", response->payload);
} }
} }
return response.statusCode == 200; return std::make_pair(response, body);
} }
} // namespace ix } // namespace ix

View File

@ -6,6 +6,7 @@
#pragma once #pragma once
#include <algorithm>
#include <ixwebsocket/IXHttpClient.h> #include <ixwebsocket/IXHttpClient.h>
#include <jsoncpp/json/json.h> #include <jsoncpp/json/json.h>
#include <regex> #include <regex>
@ -18,7 +19,7 @@ namespace ix
SentryClient(const std::string& dsn); SentryClient(const std::string& dsn);
~SentryClient() = default; ~SentryClient() = default;
bool send(const Json::Value& msg, bool verbose); std::pair<HttpResponsePtr, std::string> send(const Json::Value& msg, bool verbose);
private: private:
int64_t getTimestamp(); int64_t getTimestamp();

View File

@ -99,7 +99,7 @@ namespace ix
void CobraMetricsThreadedPublisher::pushMessage(MessageKind messageKind, void CobraMetricsThreadedPublisher::pushMessage(MessageKind messageKind,
const Json::Value& msg) const Json::Value& msg)
{ {
// Now actually enqueue the task // Enqueue the task
{ {
// acquire lock // acquire lock
std::unique_lock<std::mutex> lock(_queue_mutex); std::unique_lock<std::mutex> lock(_queue_mutex);

View File

@ -25,7 +25,6 @@ namespace ix
~CobraMetricsThreadedPublisher(); ~CobraMetricsThreadedPublisher();
/// Configuration / set keys, etc... /// Configuration / set keys, etc...
/// All input data but the channel name is encrypted with rc4
void configure(const std::string& appkey, void configure(const std::string& appkey,
const std::string& endpoint, const std::string& endpoint,
const std::string& channel, const std::string& channel,

View File

@ -10,7 +10,6 @@
#if defined(IXWEBSOCKET_USE_MBED_TLS) #if defined(IXWEBSOCKET_USE_MBED_TLS)
# include <mbedtls/md.h> # include <mbedtls/md.h>
#elif defined(__APPLE__) #elif defined(__APPLE__)
# include <ixwebsocket/IXSocketMbedTLS.h>
# include <CommonCrypto/CommonHMAC.h> # include <CommonCrypto/CommonHMAC.h>
#else #else
# include <openssl/hmac.h> # include <openssl/hmac.h>

View File

@ -80,6 +80,7 @@ int main(int argc, char** argv)
bool strict = false; bool strict = false;
bool stress = false; bool stress = false;
bool disableAutomaticReconnection = false; bool disableAutomaticReconnection = false;
bool disablePerMessageDeflate = false;
int port = 8008; int port = 8008;
int redisPort = 6379; int redisPort = 6379;
int statsdPort = 8125; int statsdPort = 8125;
@ -110,6 +111,7 @@ int main(int argc, char** argv)
CLI::App* connectApp = app.add_subcommand("connect", "Connect to a remote server"); CLI::App* connectApp = app.add_subcommand("connect", "Connect to a remote server");
connectApp->add_option("url", url, "Connection url")->required(); connectApp->add_option("url", url, "Connection url")->required();
connectApp->add_flag("-d", disableAutomaticReconnection, "Disable Automatic Reconnection"); connectApp->add_flag("-d", disableAutomaticReconnection, "Disable Automatic Reconnection");
connectApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate");
CLI::App* chatApp = app.add_subcommand("chat", "Group chat"); CLI::App* chatApp = app.add_subcommand("chat", "Group chat");
chatApp->add_option("url", url, "Connection url")->required(); chatApp->add_option("url", url, "Connection url")->required();
@ -241,7 +243,8 @@ int main(int argc, char** argv)
} }
else if (app.got_subcommand("connect")) else if (app.got_subcommand("connect"))
{ {
ret = ix::ws_connect_main(url, disableAutomaticReconnection); ret = ix::ws_connect_main(url, disableAutomaticReconnection,
disablePerMessageDeflate);
} }
else if (app.got_subcommand("chat")) else if (app.got_subcommand("chat"))
{ {

View File

@ -30,7 +30,9 @@ namespace ix
int ws_chat_main(const std::string& url, const std::string& user); int ws_chat_main(const std::string& url, const std::string& user);
int ws_connect_main(const std::string& url, bool disableAutomaticReconnection); int ws_connect_main(const std::string& url,
bool disableAutomaticReconnection,
bool disablePerMessageDeflate);
int ws_receive_main(const std::string& url, bool enablePerMessageDeflate, int delayMs); int ws_receive_main(const std::string& url, bool enablePerMessageDeflate, int delayMs);

View File

@ -14,6 +14,7 @@
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
#include <ixcobra/IXCobraConnection.h> #include <ixcobra/IXCobraConnection.h>
#include <spdlog/spdlog.h>
#include "IXSentryClient.h" #include "IXSentryClient.h"
@ -50,6 +51,8 @@ namespace ix
&queue, verbose, &errorSending, &sentCount, &queue, verbose, &errorSending, &sentCount,
&stop, &dsn] &stop, &dsn]
{ {
SentryClient sentryClient(dsn);
while (true) while (true)
{ {
Json::Value msg; Json::Value msg;
@ -62,10 +65,13 @@ namespace ix
queue.pop(); queue.pop();
} }
SentryClient sc(dsn); auto ret = sentryClient.send(msg, verbose);
HttpResponsePtr response = ret.first;
if (!sc.send(msg, verbose)) if (response->statusCode != 200)
{ {
spdlog::error("Error sending data to sentry: {}", response->statusCode);
spdlog::error("Body: {}", ret.second);
spdlog::error("Response: {}", response->payload);
errorSending = true; errorSending = true;
} }
else else
@ -99,16 +105,16 @@ namespace ix
{ {
if (eventType == ix::CobraConnection_EventType_Open) if (eventType == ix::CobraConnection_EventType_Open)
{ {
std::cerr << "Subscriber: connected" << std::endl; spdlog::info("Subscriber connected");
for (auto it : headers) for (auto it : headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
} }
if (eventType == ix::CobraConnection_EventType_Closed) if (eventType == ix::CobraConnection_EventType_Closed)
{ {
std::cerr << "Subscriber: closed" << std::endl; spdlog::info("Subscriber closed");
} }
else if (eventType == ix::CobraConnection_EventType_Authenticated) else if (eventType == ix::CobraConnection_EventType_Authenticated)
{ {
@ -122,7 +128,7 @@ namespace ix
{ {
if (verbose) if (verbose)
{ {
std::cerr << jsonWriter.write(msg) << std::endl; spdlog::info(jsonWriter.write(msg));
} }
// If we cannot send to sentry fast enough, drop the message // If we cannot send to sentry fast enough, drop the message
@ -132,8 +138,7 @@ namespace ix
receivedCount != 0 && receivedCount != 0 &&
(sentCount * scaleFactor < receivedCount)) (sentCount * scaleFactor < receivedCount))
{ {
std::cerr << "message dropped: sending is backlogged !" spdlog::warn("message dropped: sending is backlogged !");
<< std::endl;
condition.notify_one(); condition.notify_one();
progressCondition.notify_one(); progressCondition.notify_one();
@ -153,15 +158,15 @@ namespace ix
} }
else if (eventType == ix::CobraConnection_EventType_Subscribed) else if (eventType == ix::CobraConnection_EventType_Subscribed)
{ {
std::cerr << "Subscriber: subscribed to channel " << subscriptionId << std::endl; spdlog::info("Subscriber: subscribed to channel {}", subscriptionId);
} }
else if (eventType == ix::CobraConnection_EventType_UnSubscribed) else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{ {
std::cerr << "Subscriber: unsubscribed from channel " << subscriptionId << std::endl; spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId);
} }
else if (eventType == ix::CobraConnection_EventType_Error) else if (eventType == ix::CobraConnection_EventType_Error)
{ {
std::cerr << "Subscriber: error" << errMsg << std::endl; spdlog::error("Subscriber: error {}", errMsg);
} }
} }
); );
@ -172,18 +177,21 @@ namespace ix
std::unique_lock<std::mutex> lock(progressConditionVariableMutex); std::unique_lock<std::mutex> lock(progressConditionVariableMutex);
progressCondition.wait(lock); progressCondition.wait(lock);
std::cout << "messages" spdlog::info("messages received {} sent {}", receivedCount, sentCount);
<< " received " << receivedCount
<< " sent " << sentCount
<< std::endl;
if (strict && errorSending) break; if (strict && errorSending) break;
} }
conn.disconnect(); conn.disconnect();
// FIXME: join all the bg threads and stop them. // join all the bg threads and stop them.
stop = true;
for (int i = 0; i < jobs; i++)
{
spdlog::error("joining thread {}", i);
pool[i].join();
}
return 0; return (strict && errorSending) ? 1 : 0;
} }
} }

View File

@ -15,7 +15,8 @@ namespace ix
{ {
public: public:
WebSocketConnect(const std::string& _url, WebSocketConnect(const std::string& _url,
bool disableAutomaticReconnection); bool disableAutomaticReconnection,
bool disablePerMessageDeflate);
void subscribe(const std::string& channel); void subscribe(const std::string& channel);
void start(); void start();
@ -26,13 +27,16 @@ namespace ix
private: private:
std::string _url; std::string _url;
ix::WebSocket _webSocket; ix::WebSocket _webSocket;
bool _disablePerMessageDeflate;
void log(const std::string& msg); void log(const std::string& msg);
}; };
WebSocketConnect::WebSocketConnect(const std::string& url, WebSocketConnect::WebSocketConnect(const std::string& url,
bool disableAutomaticReconnection) : bool disableAutomaticReconnection,
_url(url) bool disablePerMessageDeflate) :
_url(url),
_disablePerMessageDeflate(disablePerMessageDeflate)
{ {
if (disableAutomaticReconnection) if (disableAutomaticReconnection)
{ {
@ -54,9 +58,16 @@ namespace ix
{ {
_webSocket.setUrl(_url); _webSocket.setUrl(_url);
ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions( if (_disablePerMessageDeflate)
true, false, false, 15, 15); {
_webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions); _webSocket.disablePerMessageDeflate();
}
else
{
ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(
true, false, false, 15, 15);
_webSocket.setPerMessageDeflateOptions(webSocketPerMessageDeflateOptions);
}
std::stringstream ss; std::stringstream ss;
log(std::string("Connecting to url: ") + _url); log(std::string("Connecting to url: ") + _url);
@ -130,10 +141,14 @@ namespace ix
_webSocket.send(text); _webSocket.send(text);
} }
int ws_connect_main(const std::string& url, bool disableAutomaticReconnection) int ws_connect_main(const std::string& url,
bool disableAutomaticReconnection,
bool disablePerMessageDeflate)
{ {
std::cout << "Type Ctrl-D to exit prompt..." << std::endl; std::cout << "Type Ctrl-D to exit prompt..." << std::endl;
WebSocketConnect webSocketChat(url, disableAutomaticReconnection); WebSocketConnect webSocketChat(url,
disableAutomaticReconnection,
disablePerMessageDeflate);
webSocketChat.start(); webSocketChat.start();
while (true) while (true)

View File

@ -95,19 +95,20 @@ namespace ix
const std::string& output, const std::string& output,
bool compress) bool compress)
{ {
HttpRequestArgs args; HttpClient httpClient;
args.extraHeaders = parseHeaders(headersData); auto args = httpClient.createRequest();
args.connectTimeout = connectTimeout; args->extraHeaders = parseHeaders(headersData);
args.transferTimeout = transferTimeout; args->connectTimeout = connectTimeout;
args.followRedirects = followRedirects; args->transferTimeout = transferTimeout;
args.maxRedirects = maxRedirects; args->followRedirects = followRedirects;
args.verbose = verbose; args->maxRedirects = maxRedirects;
args.compress = compress; args->verbose = verbose;
args.logger = [](const std::string& msg) args->compress = compress;
args->logger = [](const std::string& msg)
{ {
std::cout << msg; std::cout << msg;
}; };
args.onProgressCallback = [](int current, int total) -> bool args->onProgressCallback = [](int current, int total) -> bool
{ {
std::cerr << "\r" << "Downloaded " std::cerr << "\r" << "Downloaded "
<< current << " bytes out of " << total; << current << " bytes out of " << total;
@ -116,8 +117,7 @@ namespace ix
HttpParameters httpParameters = parsePostParameters(data); HttpParameters httpParameters = parsePostParameters(data);
HttpClient httpClient; HttpResponsePtr response;
HttpResponse response;
if (headersOnly) if (headersOnly)
{ {
response = httpClient.head(url, args); response = httpClient.head(url, args);
@ -133,21 +133,21 @@ namespace ix
std::cerr << std::endl; std::cerr << std::endl;
for (auto it : response.headers) for (auto it : response->headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; std::cerr << it.first << ": " << it.second << std::endl;
} }
std::cerr << "Upload size: " << response.uploadSize << std::endl; std::cerr << "Upload size: " << response->uploadSize << std::endl;
std::cerr << "Download size: " << response.downloadSize << std::endl; std::cerr << "Download size: " << response->downloadSize << std::endl;
std::cerr << "Status: " << response.statusCode << std::endl; std::cerr << "Status: " << response->statusCode << std::endl;
if (response.errorCode != HttpErrorCode::Ok) if (response->errorCode != HttpErrorCode::Ok)
{ {
std::cerr << "error message: " << response.errorMsg << std::endl; std::cerr << "error message: " << response->errorMsg << std::endl;
} }
if (!headersOnly && response.errorCode == HttpErrorCode::Ok) if (!headersOnly && response->errorCode == HttpErrorCode::Ok)
{ {
if (save || !output.empty()) if (save || !output.empty())
{ {
@ -160,14 +160,14 @@ namespace ix
std::cout << "Writing to disk: " << filename << std::endl; std::cout << "Writing to disk: " << filename << std::endl;
std::ofstream out(filename); std::ofstream out(filename);
out.write((char*)&response.payload.front(), response.payload.size()); out.write((char*)&response->payload.front(), response->payload.size());
out.close(); out.close();
} }
else else
{ {
if (response.headers["Content-Type"] != "application/octet-stream") if (response->headers["Content-Type"] != "application/octet-stream")
{ {
std::cout << "payload: " << response.payload << std::endl; std::cout << "payload: " << response->payload << std::endl;
} }
else else
{ {