Compare commits

...

10 Commits

Author SHA1 Message Date
68c1bf7017 fix Dockerfile link 2019-06-05 19:38:44 -07:00
257c901255 cobra to sentry / more error handling 2019-06-05 19:37:51 -07:00
15d8c663da cobra to sentry fixes 2019-06-05 18:47:48 -07:00
d50125c62d Feature/http async (#90)
* unittest working / uses shared_ptr for a bunch of things 🗿

* fix command line tools

* fix ws + add doc

* add more logging
2019-06-05 17:04:24 -07:00
9262880369 Fix compile error with JSON uint64_t 🚯 2019-06-04 13:45:29 -07:00
2b111e8352 HttpResponse is a struct, not a tuple 🉐 2019-06-03 22:12:52 -07:00
a35cbdfb7c http / PUT fix 🐚 2019-06-03 21:12:39 -07:00
6a41b7389f http client: stop hardcoding Accept header, and use a default value if one is passed in 👭 2019-06-03 14:02:54 -07:00
a187e69650 Add simple HTTP and HTTPS client test ㊙️ 2019-06-03 12:23:35 -07:00
fcacddbd9f (http client) / Add DEL and PUT method, make requests and other utilities public 👐 2019-06-03 11:38:56 -07:00
19 changed files with 679 additions and 224 deletions

View File

@ -129,33 +129,33 @@ Here is what the HTTP client API looks like. Note that HTTP client support is ve
// Preparation // Preparation
// //
HttpClient httpClient; HttpClient httpClient;
HttpRequestArgs args; HttpRequestArgsPtr args = httpClient.createRequest();
// Custom headers can be set // Custom headers can be set
WebSocketHttpHeaders headers; WebSocketHttpHeaders headers;
headers["Foo"] = "bar"; headers["Foo"] = "bar";
args.extraHeaders = headers; args->extraHeaders = headers;
// Timeout options // Timeout options
args.connectTimeout = connectTimeout; args->connectTimeout = connectTimeout;
args.transferTimeout = transferTimeout; args->transferTimeout = transferTimeout;
// Redirect options // Redirect options
args.followRedirects = followRedirects; args->followRedirects = followRedirects;
args.maxRedirects = maxRedirects; args->maxRedirects = maxRedirects;
// Misc // Misc
args.compress = compress; // Enable gzip compression args->compress = compress; // Enable gzip compression
args.verbose = verbose; args->verbose = verbose;
args.logger = [](const std::string& msg) args->logger = [](const std::string& msg)
{ {
std::cout << msg; std::cout << msg;
}; };
// //
// Request // Synchronous Request
// //
HttpResponse out; HttpResponsePtr out;
std::string url = "https://www.google.com"; std::string url = "https://www.google.com";
// HEAD request // HEAD request
@ -175,13 +175,30 @@ out = httpClient.post(url, std::string("foo=bar"), args);
// //
// Result // Result
// //
auto statusCode = std::get<0>(out); auto errorCode = response->errorCode; // Can be HttpErrorCode::Ok, HttpErrorCode::UrlMalformed, etc...
auto errorCode = std::get<1>(out); auto errorCode = response->errorCode; // 200, 404, etc...
auto responseHeaders = std::get<2>(out); auto responseHeaders = response->headers; // All the headers in a special case-insensitive unordered_map of (string, string)
auto payload = std::get<3>(out); auto payload = response->payload; // All the bytes from the response as an std::string
auto errorMsg = std::get<4>(out); auto errorMsg = response->errorMsg; // Descriptive error message in case of failure
auto uploadSize = std::get<5>(out); auto uploadSize = response->uploadSize; // Byte count of uploaded data
auto downloadSize = std::get<6>(out); auto downloadSize = response->downloadSize; // Byte count of downloaded data
//
// Asynchronous Request
//
bool async = true;
HttpClient httpClient(async);
auto args = httpClient.createRequest(url, HttpClient::kGet);
// Push the request to a queue,
bool ok = httpClient.performRequest(args, [](const HttpResponsePtr& response)
{
// This callback execute in a background thread. Make sure you uses appropriate protection such as mutex
auto statusCode = response->statusCode; // acess results
}
);
// ok will be false if your httpClient is not async
``` ```
## Build ## Build

View File

@ -0,0 +1,23 @@
# Build time
FROM ubuntu:bionic as build
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install wget
RUN mkdir -p /tmp/cmake
WORKDIR /tmp/cmake
RUN wget https://github.com/Kitware/CMake/releases/download/v3.14.0/cmake-3.14.0-Linux-x86_64.tar.gz
RUN tar zxf cmake-3.14.0-Linux-x86_64.tar.gz
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install libz-dev
RUN apt-get -y install make
RUN apt-get -y install python
COPY . .
ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-Linux-x86_64/bin
ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
RUN ["make", "ws"]

View File

@ -21,24 +21,98 @@ namespace ix
const std::string HttpClient::kPost = "POST"; const std::string HttpClient::kPost = "POST";
const std::string HttpClient::kGet = "GET"; const std::string HttpClient::kGet = "GET";
const std::string HttpClient::kHead = "HEAD"; const std::string HttpClient::kHead = "HEAD";
const std::string HttpClient::kDel = "DEL";
const std::string HttpClient::kPut = "PUT";
HttpClient::HttpClient() HttpClient::HttpClient(bool async) : _async(async), _stop(false)
{ {
if (!_async) return;
_thread = std::thread(&HttpClient::run, this);
} }
HttpClient::~HttpClient() HttpClient::~HttpClient()
{ {
if (!_thread.joinable()) return;
_stop = true;
_condition.notify_one();
_thread.join();
} }
HttpResponse HttpClient::request( HttpRequestArgsPtr HttpClient::createRequest(const std::string& url,
const std::string& verb)
{
auto request = std::make_shared<HttpRequestArgs>();
request->url = url;
request->verb = verb;
return request;
}
bool HttpClient::performRequest(HttpRequestArgsPtr args,
const OnResponseCallback& onResponseCallback)
{
if (!_async) return false;
// Enqueue the task
{
// acquire lock
std::unique_lock<std::mutex> lock(_queueMutex);
// add the task
_queue.push(std::make_pair(args, onResponseCallback));
} // release lock
// wake up one thread
_condition.notify_one();
return true;
}
void HttpClient::run()
{
while (true)
{
HttpRequestArgsPtr args;
OnResponseCallback onResponseCallback;
{
std::unique_lock<std::mutex> lock(_queueMutex);
while (!_stop && _queue.empty())
{
_condition.wait(lock);
}
if (_stop) return;
auto p = _queue.front();
_queue.pop();
args = p.first;
onResponseCallback = p.second;
}
if (_stop) return;
HttpResponsePtr response = request(args->url, args->verb, args->body, args);
onResponseCallback(response);
if (_stop) return;
}
}
HttpResponsePtr HttpClient::request(
const std::string& url, const std::string& url,
const std::string& verb, const std::string& verb,
const std::string& body, const std::string& body,
const HttpRequestArgs& args, HttpRequestArgsPtr args,
int redirects) int redirects)
{ {
// We only have one socket connection, so we cannot
// make multiple requests concurrently.
std::lock_guard<std::mutex> lock(_mutex);
uint64_t uploadSize = 0; uint64_t uploadSize = 0;
uint64_t downloadSize = 0; uint64_t downloadSize = 0;
int code = 0; int code = 0;
@ -52,7 +126,7 @@ namespace ix
{ {
std::stringstream ss; std::stringstream ss;
ss << "Cannot parse url: " << url; ss << "Cannot parse url: " << url;
return std::make_tuple(code, HttpErrorCode::UrlMalformed, return std::make_shared<HttpResponse>(code, HttpErrorCode::UrlMalformed,
headers, payload, ss.str(), headers, payload, ss.str(),
uploadSize, downloadSize); uploadSize, downloadSize);
} }
@ -63,7 +137,7 @@ namespace ix
if (!_socket) if (!_socket)
{ {
return std::make_tuple(code, HttpErrorCode::CannotCreateSocket, return std::make_shared<HttpResponse>(code, HttpErrorCode::CannotCreateSocket,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
@ -72,26 +146,36 @@ namespace ix
std::stringstream ss; std::stringstream ss;
ss << verb << " " << path << " HTTP/1.1\r\n"; ss << verb << " " << path << " HTTP/1.1\r\n";
ss << "Host: " << host << "\r\n"; ss << "Host: " << host << "\r\n";
ss << "User-Agent: ixwebsocket/1.0.0" << "\r\n";
ss << "Accept: */*" << "\r\n";
if (args.compress) if (args->compress)
{ {
ss << "Accept-Encoding: gzip" << "\r\n"; ss << "Accept-Encoding: gzip" << "\r\n";
} }
// Append extra headers // Append extra headers
for (auto&& it : args.extraHeaders) for (auto&& it : args->extraHeaders)
{ {
ss << it.first << ": " << it.second << "\r\n"; ss << it.first << ": " << it.second << "\r\n";
} }
if (verb == kPost) // Set a default Accept header if none is present
if (headers.find("Accept") == headers.end())
{
ss << "Accept: */*" << "\r\n";
}
// Set a default User agent if none is present
if (headers.find("User-Agent") == headers.end())
{
ss << "User-Agent: ixwebsocket" << "\r\n";
}
if (verb == kPost || verb == kPut)
{ {
ss << "Content-Length: " << body.size() << "\r\n"; ss << "Content-Length: " << body.size() << "\r\n";
// Set default Content-Type if unspecified // Set default Content-Type if unspecified
if (args.extraHeaders.find("Content-Type") == args.extraHeaders.end()) if (args->extraHeaders.find("Content-Type") == args->extraHeaders.end())
{ {
ss << "Content-Type: application/x-www-form-urlencoded" << "\r\n"; ss << "Content-Type: application/x-www-form-urlencoded" << "\r\n";
} }
@ -109,23 +193,23 @@ namespace ix
// Make a cancellation object dealing with connection timeout // Make a cancellation object dealing with connection timeout
auto isCancellationRequested = auto isCancellationRequested =
makeCancellationRequestWithTimeout(args.connectTimeout, requestInitCancellation); makeCancellationRequestWithTimeout(args->connectTimeout, requestInitCancellation);
bool success = _socket->connect(host, port, errMsg, isCancellationRequested); bool success = _socket->connect(host, port, errMsg, isCancellationRequested);
if (!success) if (!success)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Cannot connect to url: " << url << " / error : " << errMsg; ss << "Cannot connect to url: " << url << " / error : " << errMsg;
return std::make_tuple(code, HttpErrorCode::CannotConnect, return std::make_shared<HttpResponse>(code, HttpErrorCode::CannotConnect,
headers, payload, ss.str(), headers, payload, ss.str(),
uploadSize, downloadSize); uploadSize, downloadSize);
} }
// Make a new cancellation object dealing with transfer timeout // Make a new cancellation object dealing with transfer timeout
isCancellationRequested = isCancellationRequested =
makeCancellationRequestWithTimeout(args.transferTimeout, requestInitCancellation); makeCancellationRequestWithTimeout(args->transferTimeout, requestInitCancellation);
if (args.verbose) if (args->verbose)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Sending " << verb << " request " ss << "Sending " << verb << " request "
@ -142,7 +226,7 @@ namespace ix
if (!_socket->writeBytes(req, isCancellationRequested)) if (!_socket->writeBytes(req, isCancellationRequested))
{ {
std::string errorMsg("Cannot send request"); std::string errorMsg("Cannot send request");
return std::make_tuple(code, HttpErrorCode::SendError, return std::make_shared<HttpResponse>(code, HttpErrorCode::SendError,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
@ -156,12 +240,12 @@ namespace ix
if (!lineValid) if (!lineValid)
{ {
std::string errorMsg("Cannot retrieve status line"); std::string errorMsg("Cannot retrieve status line");
return std::make_tuple(code, HttpErrorCode::CannotReadStatusLine, return std::make_shared<HttpResponse>(code, HttpErrorCode::CannotReadStatusLine,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
if (args.verbose) if (args->verbose)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Status line " << line; ss << "Status line " << line;
@ -171,7 +255,7 @@ namespace ix
if (sscanf(line.c_str(), "HTTP/1.1 %d", &code) != 1) if (sscanf(line.c_str(), "HTTP/1.1 %d", &code) != 1)
{ {
std::string errorMsg("Cannot parse response code from status line"); std::string errorMsg("Cannot parse response code from status line");
return std::make_tuple(code, HttpErrorCode::MissingStatus, return std::make_shared<HttpResponse>(code, HttpErrorCode::MissingStatus,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
@ -183,27 +267,27 @@ namespace ix
if (!headersValid) if (!headersValid)
{ {
std::string errorMsg("Cannot parse http headers"); std::string errorMsg("Cannot parse http headers");
return std::make_tuple(code, HttpErrorCode::HeaderParsingError, return std::make_shared<HttpResponse>(code, HttpErrorCode::HeaderParsingError,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
// Redirect ? // Redirect ?
if ((code >= 301 && code <= 308) && args.followRedirects) if ((code >= 301 && code <= 308) && args->followRedirects)
{ {
if (headers.find("Location") == headers.end()) if (headers.find("Location") == headers.end())
{ {
std::string errorMsg("Missing location header for redirect"); std::string errorMsg("Missing location header for redirect");
return std::make_tuple(code, HttpErrorCode::MissingLocation, return std::make_shared<HttpResponse>(code, HttpErrorCode::MissingLocation,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
if (redirects >= args.maxRedirects) if (redirects >= args->maxRedirects)
{ {
std::stringstream ss; std::stringstream ss;
ss << "Too many redirects: " << redirects; ss << "Too many redirects: " << redirects;
return std::make_tuple(code, HttpErrorCode::TooManyRedirects, return std::make_shared<HttpResponse>(code, HttpErrorCode::TooManyRedirects,
headers, payload, ss.str(), headers, payload, ss.str(),
uploadSize, downloadSize); uploadSize, downloadSize);
} }
@ -215,7 +299,7 @@ namespace ix
if (verb == "HEAD") if (verb == "HEAD")
{ {
return std::make_tuple(code, HttpErrorCode::Ok, return std::make_shared<HttpResponse>(code, HttpErrorCode::Ok,
headers, payload, std::string(), headers, payload, std::string(),
uploadSize, downloadSize); uploadSize, downloadSize);
} }
@ -231,12 +315,12 @@ namespace ix
payload.reserve(contentLength); payload.reserve(contentLength);
auto chunkResult = _socket->readBytes(contentLength, auto chunkResult = _socket->readBytes(contentLength,
args.onProgressCallback, args->onProgressCallback,
isCancellationRequested); isCancellationRequested);
if (!chunkResult.first) if (!chunkResult.first)
{ {
errorMsg = "Cannot read chunk"; errorMsg = "Cannot read chunk";
return std::make_tuple(code, HttpErrorCode::ChunkReadError, return std::make_shared<HttpResponse>(code, HttpErrorCode::ChunkReadError,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
@ -254,7 +338,7 @@ namespace ix
if (!lineResult.first) if (!lineResult.first)
{ {
return std::make_tuple(code, HttpErrorCode::ChunkReadError, return std::make_shared<HttpResponse>(code, HttpErrorCode::ChunkReadError,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
@ -264,7 +348,7 @@ namespace ix
ss << std::hex << line; ss << std::hex << line;
ss >> chunkSize; ss >> chunkSize;
if (args.verbose) if (args->verbose)
{ {
std::stringstream oss; std::stringstream oss;
oss << "Reading " << chunkSize << " bytes" oss << "Reading " << chunkSize << " bytes"
@ -276,12 +360,12 @@ namespace ix
// Read a chunk // Read a chunk
auto chunkResult = _socket->readBytes((size_t) chunkSize, auto chunkResult = _socket->readBytes((size_t) chunkSize,
args.onProgressCallback, args->onProgressCallback,
isCancellationRequested); isCancellationRequested);
if (!chunkResult.first) if (!chunkResult.first)
{ {
errorMsg = "Cannot read chunk"; errorMsg = "Cannot read chunk";
return std::make_tuple(code, HttpErrorCode::ChunkReadError, return std::make_shared<HttpResponse>(code, HttpErrorCode::ChunkReadError,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
@ -292,7 +376,7 @@ namespace ix
if (!lineResult.first) if (!lineResult.first)
{ {
return std::make_tuple(code, HttpErrorCode::ChunkReadError, return std::make_shared<HttpResponse>(code, HttpErrorCode::ChunkReadError,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
@ -307,7 +391,7 @@ namespace ix
else else
{ {
std::string errorMsg("Cannot read http body"); std::string errorMsg("Cannot read http body");
return std::make_tuple(code, HttpErrorCode::CannotReadBody, return std::make_shared<HttpResponse>(code, HttpErrorCode::CannotReadBody,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
@ -321,44 +405,64 @@ namespace ix
if (!gzipInflate(payload, decompressedPayload)) if (!gzipInflate(payload, decompressedPayload))
{ {
std::string errorMsg("Error decompressing payload"); std::string errorMsg("Error decompressing payload");
return std::make_tuple(code, HttpErrorCode::Gzip, return std::make_shared<HttpResponse>(code, HttpErrorCode::Gzip,
headers, payload, errorMsg, headers, payload, errorMsg,
uploadSize, downloadSize); uploadSize, downloadSize);
} }
payload = decompressedPayload; payload = decompressedPayload;
} }
return std::make_tuple(code, HttpErrorCode::Ok, return std::make_shared<HttpResponse>(code, HttpErrorCode::Ok,
headers, payload, std::string(), headers, payload, std::string(),
uploadSize, downloadSize); uploadSize, downloadSize);
} }
HttpResponse HttpClient::get(const std::string& url, HttpResponsePtr HttpClient::get(const std::string& url,
const HttpRequestArgs& args) HttpRequestArgsPtr args)
{ {
return request(url, kGet, std::string(), args); return request(url, kGet, std::string(), args);
} }
HttpResponse HttpClient::head(const std::string& url, HttpResponsePtr HttpClient::head(const std::string& url,
const HttpRequestArgs& args) HttpRequestArgsPtr args)
{ {
return request(url, kHead, std::string(), args); return request(url, kHead, std::string(), args);
} }
HttpResponse HttpClient::post(const std::string& url, HttpResponsePtr HttpClient::del(const std::string& url,
HttpRequestArgsPtr args)
{
return request(url, kDel, std::string(), args);
}
HttpResponsePtr HttpClient::post(const std::string& url,
const HttpParameters& httpParameters, const HttpParameters& httpParameters,
const HttpRequestArgs& args) HttpRequestArgsPtr args)
{ {
return request(url, kPost, serializeHttpParameters(httpParameters), args); return request(url, kPost, serializeHttpParameters(httpParameters), args);
} }
HttpResponse HttpClient::post(const std::string& url, HttpResponsePtr HttpClient::post(const std::string& url,
const std::string& body, const std::string& body,
const HttpRequestArgs& args) HttpRequestArgsPtr args)
{ {
return request(url, kPost, body, args); return request(url, kPost, body, args);
} }
HttpResponsePtr HttpClient::put(const std::string& url,
const HttpParameters& httpParameters,
HttpRequestArgsPtr args)
{
return request(url, kPut, serializeHttpParameters(httpParameters), args);
}
HttpResponsePtr HttpClient::put(const std::string& url,
const std::string& body,
const HttpRequestArgsPtr args)
{
return request(url, kPut, body, args);
}
std::string HttpClient::urlEncode(const std::string& value) std::string HttpClient::urlEncode(const std::string& value)
{ {
std::ostringstream escaped; std::ostringstream escaped;
@ -456,11 +560,11 @@ namespace ix
} }
void HttpClient::log(const std::string& msg, void HttpClient::log(const std::string& msg,
const HttpRequestArgs& args) HttpRequestArgsPtr args)
{ {
if (args.logger) if (args->logger)
{ {
args.logger(msg); args->logger(msg);
} }
} }
} }

View File

@ -10,11 +10,13 @@
#include "IXWebSocketHttpHeaders.h" #include "IXWebSocketHttpHeaders.h"
#include <algorithm> #include <algorithm>
#include <atomic> #include <atomic>
#include <condition_variable>
#include <functional> #include <functional>
#include <map> #include <map>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <tuple> #include <queue>
#include <thread>
namespace ix namespace ix
{ {
@ -34,23 +36,48 @@ namespace ix
MissingLocation = 11, MissingLocation = 11,
TooManyRedirects = 12, TooManyRedirects = 12,
ChunkReadError = 13, ChunkReadError = 13,
CannotReadBody = 14 CannotReadBody = 14,
Invalid = 100
}; };
using HttpResponse = std::tuple<int, // status struct HttpResponse
HttpErrorCode, // error code {
WebSocketHttpHeaders, int statusCode;
std::string, // payload HttpErrorCode errorCode;
std::string, // error msg WebSocketHttpHeaders headers;
uint64_t, // upload size std::string payload;
uint64_t>; // download size std::string errorMsg;
uint64_t uploadSize;
uint64_t downloadSize;
HttpResponse(int s = 0,
const HttpErrorCode& c = HttpErrorCode::Ok,
const WebSocketHttpHeaders& h = WebSocketHttpHeaders(),
const std::string& p = std::string(),
const std::string& e = std::string(),
uint64_t u = 0,
uint64_t d = 0)
: statusCode(s)
, errorCode(c)
, headers(h)
, payload(p)
, errorMsg(e)
, uploadSize(u)
, downloadSize(d)
{
;
}
};
using HttpResponsePtr = std::shared_ptr<HttpResponse>;
using HttpParameters = std::map<std::string, std::string>; using HttpParameters = std::map<std::string, std::string>;
using Logger = std::function<void(const std::string&)>; using Logger = std::function<void(const std::string&)>;
using OnResponseCallback = std::function<void(const HttpResponsePtr&)>;
struct HttpRequestArgs struct HttpRequestArgs
{ {
std::string url; std::string url;
std::string verb;
WebSocketHttpHeaders extraHeaders; WebSocketHttpHeaders extraHeaders;
std::string body; std::string body;
int connectTimeout; int connectTimeout;
@ -63,41 +90,72 @@ namespace ix
OnProgressCallback onProgressCallback; OnProgressCallback onProgressCallback;
}; };
using HttpRequestArgsPtr = std::shared_ptr<HttpRequestArgs>;
class HttpClient class HttpClient
{ {
public: public:
HttpClient(); HttpClient(bool async = false);
~HttpClient(); ~HttpClient();
HttpResponse get(const std::string& url, const HttpRequestArgs& args); HttpResponsePtr get(const std::string& url, HttpRequestArgsPtr args);
HttpResponse head(const std::string& url, const HttpRequestArgs& args); HttpResponsePtr head(const std::string& url, HttpRequestArgsPtr args);
HttpResponsePtr del(const std::string& url, HttpRequestArgsPtr args);
HttpResponse post(const std::string& url, HttpResponsePtr post(const std::string& url,
const HttpParameters& httpParameters, const HttpParameters& httpParameters,
const HttpRequestArgs& args); HttpRequestArgsPtr args);
HttpResponse post(const std::string& url, HttpResponsePtr post(const std::string& url,
const std::string& body, const std::string& body,
const HttpRequestArgs& args); HttpRequestArgsPtr args);
private: HttpResponsePtr put(const std::string& url,
HttpResponse request(const std::string& url, const HttpParameters& httpParameters,
HttpRequestArgsPtr args);
HttpResponsePtr put(const std::string& url,
const std::string& body,
HttpRequestArgsPtr args);
HttpResponsePtr request(const std::string& url,
const std::string& verb, const std::string& verb,
const std::string& body, const std::string& body,
const HttpRequestArgs& args, HttpRequestArgsPtr args,
int redirects = 0); int redirects = 0);
// Async API
HttpRequestArgsPtr createRequest(const std::string& url = std::string(),
const std::string& verb = HttpClient::kGet);
bool performRequest(HttpRequestArgsPtr request,
const OnResponseCallback& onResponseCallback);
std::string serializeHttpParameters(const HttpParameters& httpParameters); std::string serializeHttpParameters(const HttpParameters& httpParameters);
std::string urlEncode(const std::string& value); std::string urlEncode(const std::string& value);
void log(const std::string& msg, const HttpRequestArgs& args);
bool gzipInflate(const std::string& in, std::string& out);
std::shared_ptr<Socket> _socket;
const static std::string kPost; const static std::string kPost;
const static std::string kGet; const static std::string kGet;
const static std::string kHead; const static std::string kHead;
const static std::string kDel;
const static std::string kPut;
private:
void log(const std::string& msg, HttpRequestArgsPtr args);
bool gzipInflate(const std::string& in, std::string& out);
// Async API background thread runner
void run();
// Async API
bool _async;
std::queue<std::pair<HttpRequestArgsPtr, OnResponseCallback>> _queue;
mutable std::mutex _queueMutex;
std::condition_variable _condition;
std::atomic<bool> _stop;
std::thread _thread;
std::shared_ptr<Socket> _socket;
std::mutex _mutex; // to protect accessing the _socket (only one socket per client)
}; };
} // namespace ix } // namespace ix

View File

@ -44,7 +44,7 @@ trail:
sh third_party/remote_trailing_whitespaces.sh sh third_party/remote_trailing_whitespaces.sh
format: format:
sh third_party/indent.sh find ixwebsocket ws -name '*.cpp' -o -name '*.h' -exec clang-format -i {} \;
# That target is used to start a node server, but isn't required as we have # That target is used to start a node server, but isn't required as we have
# a builtin C++ server started in the unittest now # a builtin C++ server started in the unittest now

View File

@ -37,6 +37,7 @@ set (SOURCES
IXWebSocketTestConnectionDisconnection.cpp IXWebSocketTestConnectionDisconnection.cpp
IXUrlParserTest.cpp IXUrlParserTest.cpp
IXWebSocketServerTest.cpp IXWebSocketServerTest.cpp
IXHttpClientTest.cpp
) )
# Some unittest don't work on windows yet # Some unittest don't work on windows yet

233
test/IXHttpClientTest.cpp Normal file
View File

@ -0,0 +1,233 @@
/*
* IXSocketTest.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2019 Machine Zone. All rights reserved.
*/
#include <iostream>
#include <ixwebsocket/IXHttpClient.h>
#include "catch.hpp"
using namespace ix;
TEST_CASE("http client", "[http]")
{
SECTION("Connect to a remote HTTP server")
{
HttpClient httpClient;
WebSocketHttpHeaders headers;
std::string url("http://httpbin.org/");
auto args = httpClient.createRequest(url);
args->extraHeaders = headers;
args->connectTimeout = 60;
args->transferTimeout = 60;
args->followRedirects = true;
args->maxRedirects = 10;
args->verbose = true;
args->compress = true;
args->logger = [](const std::string& msg)
{
std::cout << msg;
};
args->onProgressCallback = [](int current, int total) -> bool
{
std::cerr << "\r" << "Downloaded "
<< current << " bytes out of " << total;
return true;
};
auto response = httpClient.get(url, args);
for (auto it : response->headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
std::cerr << "Upload size: " << response->uploadSize << std::endl;
std::cerr << "Download size: " << response->downloadSize << std::endl;
std::cerr << "Status: " << response->statusCode << std::endl;
std::cerr << "Error message: " << response->errorMsg << std::endl;
REQUIRE(response->errorCode == HttpErrorCode::Ok);
REQUIRE(response->statusCode == 200);
}
SECTION("Connect to a remote HTTPS server")
{
HttpClient httpClient;
WebSocketHttpHeaders headers;
std::string url("https://httpbin.org/");
auto args = httpClient.createRequest(url);
args->extraHeaders = headers;
args->connectTimeout = 60;
args->transferTimeout = 60;
args->followRedirects = true;
args->maxRedirects = 10;
args->verbose = true;
args->compress = true;
args->logger = [](const std::string& msg)
{
std::cout << msg;
};
args->onProgressCallback = [](int current, int total) -> bool
{
std::cerr << "\r" << "Downloaded "
<< current << " bytes out of " << total;
return true;
};
auto response = httpClient.get(url, args);
for (auto it : response->headers)
{
std::cerr << it.first << ": " << it.second << std::endl;
}
std::cerr << "Upload size: " << response->uploadSize << std::endl;
std::cerr << "Download size: " << response->downloadSize << std::endl;
std::cerr << "Status: " << response->statusCode << std::endl;
std::cerr << "Error message: " << response->errorMsg << std::endl;
REQUIRE(response->errorCode == HttpErrorCode::Ok);
REQUIRE(response->statusCode == 200);
}
SECTION("Async API, one call")
{
bool async = true;
HttpClient httpClient(async);
WebSocketHttpHeaders headers;
std::string url("https://httpbin.org/");
auto args = httpClient.createRequest(url);
args->extraHeaders = headers;
args->connectTimeout = 60;
args->transferTimeout = 60;
args->followRedirects = true;
args->maxRedirects = 10;
args->verbose = true;
args->compress = true;
args->logger = [](const std::string& msg)
{
std::cout << msg;
};
args->onProgressCallback = [](int current, int total) -> bool
{
std::cerr << "\r" << "Downloaded "
<< current << " bytes out of " << total;
return true;
};
std::atomic<bool> requestCompleted(false);
std::atomic<int> statusCode(0);
httpClient.performRequest(args, [&requestCompleted, &statusCode]
(const HttpResponsePtr& response)
{
std::cerr << "Upload size: " << response->uploadSize << std::endl;
std::cerr << "Download size: " << response->downloadSize << std::endl;
std::cerr << "Status: " << response->statusCode << std::endl;
std::cerr << "Error message: " << response->errorMsg << std::endl;
// In case of failure, print response->errorMsg
statusCode = response->statusCode;
requestCompleted = true;
}
);
int wait = 0;
while (wait < 5000)
{
if (requestCompleted) break;
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
wait += 10;
}
std::cerr << "Done" << std::endl;
REQUIRE(statusCode == 200);
}
SECTION("Async API, multiple calls")
{
bool async = true;
HttpClient httpClient(async);
WebSocketHttpHeaders headers;
std::string url("http://httpbin.org/");
auto args = httpClient.createRequest(url);
args->extraHeaders = headers;
args->connectTimeout = 60;
args->transferTimeout = 60;
args->followRedirects = true;
args->maxRedirects = 10;
args->verbose = true;
args->compress = true;
args->logger = [](const std::string& msg)
{
std::cout << msg;
};
args->onProgressCallback = [](int current, int total) -> bool
{
std::cerr << "\r" << "Downloaded "
<< current << " bytes out of " << total;
return true;
};
std::atomic<bool> requestCompleted(false);
std::atomic<int> statusCode0(0);
std::atomic<int> statusCode1(0);
std::atomic<int> statusCode2(0);
for (int i = 0; i < 3; ++i)
{
httpClient.performRequest(args, [i, &requestCompleted, &statusCode0, &statusCode1, &statusCode2]
(const HttpResponsePtr& response)
{
std::cerr << "Upload size: " << response->uploadSize << std::endl;
std::cerr << "Download size: " << response->downloadSize << std::endl;
std::cerr << "Status: " << response->statusCode << std::endl;
std::cerr << "Error message: " << response->errorMsg << std::endl;
// In case of failure, print response->errorMsg
if (i == 0)
{
statusCode0 = response->statusCode;
}
else if (i == 1)
{
statusCode1 = response->statusCode;
}
else if (i == 2)
{
statusCode2 = response->statusCode;
requestCompleted = true;
}
}
);
}
int wait = 0;
while (wait < 10000)
{
if (requestCompleted) break;
std::chrono::duration<double, std::milli> duration(10);
std::this_thread::sleep_for(duration);
wait += 10;
}
std::cerr << "Done" << std::endl;
REQUIRE(statusCode0 == 200);
REQUIRE(statusCode1 == 200);
REQUIRE(statusCode2 == 200);
}
}

View File

@ -8,6 +8,7 @@
#include <chrono> #include <chrono>
#include <iostream> #include <iostream>
#include <spdlog/spdlog.h>
#include <ixwebsocket/IXWebSocketHttpHeaders.h> #include <ixwebsocket/IXWebSocketHttpHeaders.h>
@ -88,7 +89,7 @@ namespace ix
ss >> lineno; ss >> lineno;
Json::Value frame; Json::Value frame;
frame["lineno"] = lineno; frame["lineno"] = Json::UInt64(lineno);
frame["filename"] = fileName; frame["filename"] = fileName;
frame["function"] = function; frame["function"] = function;
@ -114,6 +115,7 @@ namespace ix
std::string SentryClient::computePayload(const Json::Value& msg) std::string SentryClient::computePayload(const Json::Value& msg)
{ {
Json::Value payload; Json::Value payload;
payload["platform"] = "python"; payload["platform"] = "python";
payload["sdk"]["name"] = "ws"; payload["sdk"]["name"] = "ws";
payload["sdk"]["version"] = "1.0.0"; payload["sdk"]["version"] = "1.0.0";
@ -132,59 +134,77 @@ namespace ix
Json::Value extra; Json::Value extra;
extra["cobra_event"] = msg; extra["cobra_event"] = msg;
extra["cobra_event"] = msg;
exception["extra"] = extra; //
// "tags": [
// [
// "a",
// "b"
// ],
// ]
//
Json::Value tags;
Json::Value gameTag;
gameTag.append("game");
gameTag.append(msg["device"]["game"]);
tags.append(gameTag);
Json::Value userIdTag;
userIdTag.append("userid");
userIdTag.append(msg["device"]["user_id"]);
tags.append(userIdTag);
Json::Value environmentTag;
environmentTag.append("environment");
environmentTag.append(msg["device"]["environment"]);
tags.append(environmentTag);
payload["tags"] = tags;
return _jsonWriter.write(payload); return _jsonWriter.write(payload);
} }
bool SentryClient::send(const Json::Value& msg, std::pair<HttpResponsePtr, std::string> SentryClient::send(const Json::Value& msg,
bool verbose) bool verbose)
{ {
HttpRequestArgs args; auto args = _httpClient.createRequest();
args.extraHeaders["X-Sentry-Auth"] = SentryClient::computeAuthHeader(); args->extraHeaders["X-Sentry-Auth"] = SentryClient::computeAuthHeader();
args.connectTimeout = 60; args->connectTimeout = 60;
args.transferTimeout = 5 * 60; args->transferTimeout = 5 * 60;
args.followRedirects = true; args->followRedirects = true;
args.verbose = verbose; args->verbose = verbose;
args.logger = [](const std::string& msg) args->logger = [](const std::string& msg)
{ {
std::cout << msg; spdlog::info("request logger: {}", msg);
}; };
std::string body = computePayload(msg); std::string body = computePayload(msg);
HttpResponse out = _httpClient.post(_url, body, args); HttpResponsePtr response = _httpClient.post(_url, body, args);
auto statusCode = std::get<0>(out);
auto errorCode = std::get<1>(out);
auto responseHeaders = std::get<2>(out);
auto payload = std::get<3>(out);
auto errorMsg = std::get<4>(out);
auto uploadSize = std::get<5>(out);
auto downloadSize = std::get<6>(out);
if (verbose) if (verbose)
{ {
for (auto it : responseHeaders) for (auto it : response->headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
std::cerr << "Upload size: " << uploadSize << std::endl; spdlog::info("Upload size: {}", response->uploadSize);
std::cerr << "Download size: " << downloadSize << std::endl; spdlog::info("Download size: {}", response->downloadSize);
std::cerr << "Status: " << statusCode << std::endl; spdlog::info("Status: {}", response->statusCode);
if (errorCode != HttpErrorCode::Ok) if (response->errorCode != HttpErrorCode::Ok)
{ {
std::cerr << "error message: " << errorMsg << std::endl; spdlog::info("error message: {}", response->errorMsg);
} }
if (responseHeaders["Content-Type"] != "application/octet-stream") if (response->headers["Content-Type"] != "application/octet-stream")
{ {
std::cerr << "payload: " << payload << std::endl; spdlog::info("payload: {}", response->payload);
} }
} }
return statusCode == 200; return std::make_pair(response, body);
} }
} // namespace ix } // namespace ix

View File

@ -9,6 +9,7 @@
#include <ixwebsocket/IXHttpClient.h> #include <ixwebsocket/IXHttpClient.h>
#include <jsoncpp/json/json.h> #include <jsoncpp/json/json.h>
#include <regex> #include <regex>
#include <algorithm>
namespace ix namespace ix
{ {
@ -18,7 +19,7 @@ namespace ix
SentryClient(const std::string& dsn); SentryClient(const std::string& dsn);
~SentryClient() = default; ~SentryClient() = default;
bool send(const Json::Value& msg, bool verbose); std::pair<HttpResponsePtr, std::string> send(const Json::Value& msg, bool verbose);
private: private:
int64_t getTimestamp(); int64_t getTimestamp();

View File

@ -184,7 +184,7 @@ namespace ix
msg["data"] = data; msg["data"] = data;
msg["session"] = _session; msg["session"] = _session;
msg["version"] = kVersion; msg["version"] = kVersion;
msg["timestamp"] = getMillisecondsSinceEpoch(); msg["timestamp"] = Json::UInt64(getMillisecondsSinceEpoch());
{ {
std::lock_guard<std::mutex> lock(_device_mutex); std::lock_guard<std::mutex> lock(_device_mutex);

View File

@ -99,7 +99,7 @@ namespace ix
void CobraMetricsThreadedPublisher::pushMessage(MessageKind messageKind, void CobraMetricsThreadedPublisher::pushMessage(MessageKind messageKind,
const Json::Value& msg) const Json::Value& msg)
{ {
// Now actually enqueue the task // Enqueue the task
{ {
// acquire lock // acquire lock
std::unique_lock<std::mutex> lock(_queue_mutex); std::unique_lock<std::mutex> lock(_queue_mutex);

View File

@ -25,7 +25,6 @@ namespace ix
~CobraMetricsThreadedPublisher(); ~CobraMetricsThreadedPublisher();
/// Configuration / set keys, etc... /// Configuration / set keys, etc...
/// All input data but the channel name is encrypted with rc4
void configure(const std::string& appkey, void configure(const std::string& appkey,
const std::string& endpoint, const std::string& endpoint,
const std::string& channel, const std::string& channel,

View File

@ -10,7 +10,6 @@
#if defined(IXWEBSOCKET_USE_MBED_TLS) #if defined(IXWEBSOCKET_USE_MBED_TLS)
# include <mbedtls/md.h> # include <mbedtls/md.h>
#elif defined(__APPLE__) #elif defined(__APPLE__)
# include <ixwebsocket/IXSocketMbedTLS.h>
# include <CommonCrypto/CommonHMAC.h> # include <CommonCrypto/CommonHMAC.h>
#else #else
# include <openssl/hmac.h> # include <openssl/hmac.h>

View File

@ -14,6 +14,7 @@
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
#include <ixcobra/IXCobraConnection.h> #include <ixcobra/IXCobraConnection.h>
#include <spdlog/spdlog.h>
#include "IXSentryClient.h" #include "IXSentryClient.h"
@ -50,6 +51,8 @@ namespace ix
&queue, verbose, &errorSending, &sentCount, &queue, verbose, &errorSending, &sentCount,
&stop, &dsn] &stop, &dsn]
{ {
SentryClient sentryClient(dsn);
while (true) while (true)
{ {
Json::Value msg; Json::Value msg;
@ -62,10 +65,13 @@ namespace ix
queue.pop(); queue.pop();
} }
SentryClient sc(dsn); auto ret = sentryClient.send(msg, verbose);
HttpResponsePtr response = ret.first;
if (!sc.send(msg, verbose)) if (response->statusCode != 200)
{ {
spdlog::error("Error sending data to sentry: {}", response->statusCode);
spdlog::error("Body: {}", ret.second);
spdlog::error("Response: {}", response->payload);
errorSending = true; errorSending = true;
} }
else else
@ -99,16 +105,16 @@ namespace ix
{ {
if (eventType == ix::CobraConnection_EventType_Open) if (eventType == ix::CobraConnection_EventType_Open)
{ {
std::cerr << "Subscriber: connected" << std::endl; spdlog::info("Subscriber connected");
for (auto it : headers) for (auto it : headers)
{ {
std::cerr << it.first << ": " << it.second << std::endl; spdlog::info("{}: {}", it.first, it.second);
} }
} }
if (eventType == ix::CobraConnection_EventType_Closed) if (eventType == ix::CobraConnection_EventType_Closed)
{ {
std::cerr << "Subscriber: closed" << std::endl; spdlog::info("Subscriber closed");
} }
else if (eventType == ix::CobraConnection_EventType_Authenticated) else if (eventType == ix::CobraConnection_EventType_Authenticated)
{ {
@ -122,7 +128,7 @@ namespace ix
{ {
if (verbose) if (verbose)
{ {
std::cerr << jsonWriter.write(msg) << std::endl; spdlog::info(jsonWriter.write(msg));
} }
// If we cannot send to sentry fast enough, drop the message // If we cannot send to sentry fast enough, drop the message
@ -132,8 +138,7 @@ namespace ix
receivedCount != 0 && receivedCount != 0 &&
(sentCount * scaleFactor < receivedCount)) (sentCount * scaleFactor < receivedCount))
{ {
std::cerr << "message dropped: sending is backlogged !" spdlog::warn("message dropped: sending is backlogged !");
<< std::endl;
condition.notify_one(); condition.notify_one();
progressCondition.notify_one(); progressCondition.notify_one();
@ -153,15 +158,15 @@ namespace ix
} }
else if (eventType == ix::CobraConnection_EventType_Subscribed) else if (eventType == ix::CobraConnection_EventType_Subscribed)
{ {
std::cerr << "Subscriber: subscribed to channel " << subscriptionId << std::endl; spdlog::info("Subscriber: subscribed to channel {}", subscriptionId);
} }
else if (eventType == ix::CobraConnection_EventType_UnSubscribed) else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{ {
std::cerr << "Subscriber: unsubscribed from channel " << subscriptionId << std::endl; spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId);
} }
else if (eventType == ix::CobraConnection_EventType_Error) else if (eventType == ix::CobraConnection_EventType_Error)
{ {
std::cerr << "Subscriber: error" << errMsg << std::endl; spdlog::error("Subscriber: error {}", errMsg);
} }
} }
); );
@ -172,18 +177,21 @@ namespace ix
std::unique_lock<std::mutex> lock(progressConditionVariableMutex); std::unique_lock<std::mutex> lock(progressConditionVariableMutex);
progressCondition.wait(lock); progressCondition.wait(lock);
std::cout << "messages" spdlog::info("messages received {} sent {}", receivedCount, sentCount);
<< " received " << receivedCount
<< " sent " << sentCount
<< std::endl;
if (strict && errorSending) break; if (strict && errorSending) break;
} }
conn.disconnect(); conn.disconnect();
// FIXME: join all the bg threads and stop them. // join all the bg threads and stop them.
stop = true;
for (int i = 0; i < jobs; i++)
{
spdlog::error("joining thread {}", i);
pool[i].join();
}
return 0; return (strict && errorSending) ? 1 : 0;
} }
} }

View File

@ -95,19 +95,20 @@ namespace ix
const std::string& output, const std::string& output,
bool compress) bool compress)
{ {
HttpRequestArgs args; HttpClient httpClient;
args.extraHeaders = parseHeaders(headersData); auto args = httpClient.createRequest();
args.connectTimeout = connectTimeout; args->extraHeaders = parseHeaders(headersData);
args.transferTimeout = transferTimeout; args->connectTimeout = connectTimeout;
args.followRedirects = followRedirects; args->transferTimeout = transferTimeout;
args.maxRedirects = maxRedirects; args->followRedirects = followRedirects;
args.verbose = verbose; args->maxRedirects = maxRedirects;
args.compress = compress; args->verbose = verbose;
args.logger = [](const std::string& msg) args->compress = compress;
args->logger = [](const std::string& msg)
{ {
std::cout << msg; std::cout << msg;
}; };
args.onProgressCallback = [](int current, int total) -> bool args->onProgressCallback = [](int current, int total) -> bool
{ {
std::cerr << "\r" << "Downloaded " std::cerr << "\r" << "Downloaded "
<< current << " bytes out of " << total; << current << " bytes out of " << total;
@ -116,46 +117,37 @@ namespace ix
HttpParameters httpParameters = parsePostParameters(data); HttpParameters httpParameters = parsePostParameters(data);
HttpClient httpClient; HttpResponsePtr response;
HttpResponse out;
if (headersOnly) if (headersOnly)
{ {
out = httpClient.head(url, args); response = httpClient.head(url, args);
} }
else if (data.empty()) else if (data.empty())
{ {
out = httpClient.get(url, args); response = httpClient.get(url, args);
} }
else else
{ {
out = httpClient.post(url, httpParameters, args); response = httpClient.post(url, httpParameters, args);
} }
std::cerr << std::endl; std::cerr << std::endl;
auto statusCode = std::get<0>(out); for (auto it : response->headers)
auto errorCode = std::get<1>(out);
auto responseHeaders = std::get<2>(out);
auto payload = std::get<3>(out);
auto errorMsg = std::get<4>(out);
auto uploadSize = std::get<5>(out);
auto downloadSize = std::get<6>(out);
for (auto it : responseHeaders)
{ {
std::cerr << it.first << ": " << it.second << std::endl; std::cerr << it.first << ": " << it.second << std::endl;
} }
std::cerr << "Upload size: " << uploadSize << std::endl; std::cerr << "Upload size: " << response->uploadSize << std::endl;
std::cerr << "Download size: " << downloadSize << std::endl; std::cerr << "Download size: " << response->downloadSize << std::endl;
std::cerr << "Status: " << statusCode << std::endl; std::cerr << "Status: " << response->statusCode << std::endl;
if (errorCode != HttpErrorCode::Ok) if (response->errorCode != HttpErrorCode::Ok)
{ {
std::cerr << "error message: " << errorMsg << std::endl; std::cerr << "error message: " << response->errorMsg << std::endl;
} }
if (!headersOnly && errorCode == HttpErrorCode::Ok) if (!headersOnly && response->errorCode == HttpErrorCode::Ok)
{ {
if (save || !output.empty()) if (save || !output.empty())
{ {
@ -168,14 +160,14 @@ namespace ix
std::cout << "Writing to disk: " << filename << std::endl; std::cout << "Writing to disk: " << filename << std::endl;
std::ofstream out(filename); std::ofstream out(filename);
out.write((char*)&payload.front(), payload.size()); out.write((char*)&response->payload.front(), response->payload.size());
out.close(); out.close();
} }
else else
{ {
if (responseHeaders["Content-Type"] != "application/octet-stream") if (response->headers["Content-Type"] != "application/octet-stream")
{ {
std::cout << "payload: " << payload << std::endl; std::cout << "payload: " << response->payload << std::endl;
} }
else else
{ {