Compare commits

..

1 Commits

Author SHA1 Message Date
967d679f17 disable Linux + mac 2019-06-23 18:33:08 -07:00
44 changed files with 263 additions and 3338 deletions

View File

@ -6,26 +6,21 @@ language: bash
matrix: matrix:
include: include:
# macOS # macOS
- os: osx # - os: osx
env: # compiler: clang
- HOMEBREW_NO_AUTO_UPDATE=1 # script:
compiler: clang # - python test/run.py
script: # - make ws
- brew install mbedtls
- python test/run.py
- make ws
# Linux # # Linux
- os: linux # - os: linux
dist: bionic # dist: xenial
before_install: # script:
- sudo apt-get install -y libmbedtls-dev # - python test/run.py
script: # - make ws
- python test/run.py # env:
- make ws # - CC=gcc
env: # - CXX=g++
- CC=gcc
- CXX=g++
# Clang + Linux disabled for now # Clang + Linux disabled for now
# - os: linux # - os: linux
@ -35,22 +30,12 @@ matrix:
# - CC=clang # - CC=clang
# - CXX=clang++ # - CXX=clang++
# Windows Windows
# - os: windows - os: windows
# env: env:
# - CMAKE_PATH="/c/Program Files/CMake/bin" - CMAKE_PATH="/c/Program Files/CMake/bin"
# script: script:
# - cd third_party/zlib - export PATH=$CMAKE_PATH:$PATH
# - cmake . # - cmake -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 -DUSE_VENDORED_THIRD_PARTY=1 .
# - cmake --build . --target install # - cmake --build --parallel .
# - cd ../.. - python test/run.py
# # - cd third_party/mbedtls
# # - cmake .
# # - cmake --build . --target install
# # - cd ../..
# - export PATH=$CMAKE_PATH:$PATH
# - cd test
# - cmake .
# - cmake --build --parallel .
# - ixwebsocket_unittest.exe
# # - python test/run.py

View File

@ -1,24 +1,6 @@
# Changelog # Changelog
All notable changes to this project will be documented in this file. All notable changes to this project will be documented in this file.
## [5.0.5] - 2019-08-22
- Windows: use select instead of WSAPoll, through a poll wrapper
## [5.0.4] - 2019-08-20
- Windows build fixes (there was a problem with the use of ::poll that has a different name on Windows (WSAPoll))
## [5.0.3] - 2019-08-14
- CobraMetricThreadedPublisher _enable flag is an atomic, and CobraMetricsPublisher is enabled by default
## [5.0.2] - 2019-08-01
- ws cobra_subscribe has a new -q (quiet) option
- ws cobra_subscribe knows to and display msg stats (count and # of messages received per second)
- ws cobra_subscribe, cobra_to_statsd and cobra_to_sentry commands have a new option, --filter to restrict the events they want to receive
## [5.0.1] - 2019-07-25
- ws connect command has a new option to send in binary mode (still default to text)
- ws connect command has readline history thanks to libnoise-cpp. Now ws connect one can use using arrows to lookup previous sent messages and edit them
## [5.0.0] - 2019-06-23 ## [5.0.0] - 2019-06-23
### Changed ### Changed
- New HTTP server / still very early. ws gained a new command, httpd can run a simple webserver serving local files. - New HTTP server / still very early. ws gained a new command, httpd can run a simple webserver serving local files.

View File

@ -143,7 +143,7 @@ if (UNIX)
target_link_libraries(ixwebsocket ${CMAKE_THREAD_LIBS_INIT}) target_link_libraries(ixwebsocket ${CMAKE_THREAD_LIBS_INIT})
endif() endif()
if (USE_TLS AND USE_OPEN_SSL) if (USE_OPEN_SSL)
find_package(OpenSSL REQUIRED) find_package(OpenSSL REQUIRED)
add_definitions(${OPENSSL_DEFINITIONS}) add_definitions(${OPENSSL_DEFINITIONS})
message(STATUS "OpenSSL: " ${OPENSSL_VERSION}) message(STATUS "OpenSSL: " ${OPENSSL_VERSION})
@ -151,7 +151,7 @@ if (USE_TLS AND USE_OPEN_SSL)
target_link_libraries(ixwebsocket ${OPENSSL_LIBRARIES}) target_link_libraries(ixwebsocket ${OPENSSL_LIBRARIES})
endif() endif()
if (USE_TLS AND USE_MBED_TLS) if (USE_MBED_TLS)
if (USE_VENDORED_THIRD_PARTY) if (USE_VENDORED_THIRD_PARTY)
set (ENABLE_PROGRAMS OFF) set (ENABLE_PROGRAMS OFF)
add_subdirectory(third_party/mbedtls) add_subdirectory(third_party/mbedtls)

View File

@ -1 +1 @@
5.0.4 5.0.0

105
README.md
View File

@ -16,32 +16,9 @@
The [*ws*](https://github.com/machinezone/IXWebSocket/tree/master/ws) folder countains many interactive programs for chat, [file transfers](https://github.com/machinezone/IXWebSocket/blob/master/ws/ws_send.cpp), [curl like](https://github.com/machinezone/IXWebSocket/blob/master/ws/ws_http_client.cpp) http clients, demonstrating client and server usage. The [*ws*](https://github.com/machinezone/IXWebSocket/tree/master/ws) folder countains many interactive programs for chat, [file transfers](https://github.com/machinezone/IXWebSocket/blob/master/ws/ws_send.cpp), [curl like](https://github.com/machinezone/IXWebSocket/blob/master/ws/ws_http_client.cpp) http clients, demonstrating client and server usage.
### Windows note Here is what the client API looks like.
To use the network system on Windows, you need to initialize it once with *WSAStartup()* and clean it up with *WSACleanup()*. We have helpers for that which you can use, see below. This init would typically take place in your main function.
``` ```
#include <ixwebsocket/IXNetSystem.h>
int main()
{
ix::initNetSystem();
...
ix::uninitNetSystem();
return 0;
}
```
### WebSocket client API
```
#include <ixwebsocket/IXWebSocket.h>
...
# Our websocket object
ix::WebSocket webSocket; ix::WebSocket webSocket;
std::string url("ws://localhost:8080/"); std::string url("ws://localhost:8080/");
@ -55,14 +32,14 @@ webSocket.setHeartBeatPeriod(45);
webSocket.disablePerMessageDeflate(); webSocket.disablePerMessageDeflate();
// Setup a callback to be fired when a message or an event (open, close, error) is received // Setup a callback to be fired when a message or an event (open, close, error) is received
webSocket.setOnMessageCallback([](const ix::WebSocketMessagePtr& msg) webSocket.setOnMessageCallback(
[](const ix::WebSocketMessagePtr& msg)
{ {
if (msg->type == ix::WebSocketMessageType::Message) if (msg->type == ix::WebSocketMessageType::Message)
{ {
std::cout << msg->str << std::endl; std::cout << msg->str << std::endl;
} }
} });
);
// Now that our callback is setup, we can start our background thread and receive messages // Now that our callback is setup, we can start our background thread and receive messages
webSocket.start(); webSocket.start();
@ -79,13 +56,9 @@ webSocket.sendBinary("some serialized binary data");
webSocket.stop() webSocket.stop()
``` ```
### WebSocket server API Here is what the server API looks like. Note that server support is very recent and subject to changes.
``` ```
#include <ixwebsocket/IXWebSocketServer.h>
...
// Run a server on localhost at a given port. // Run a server on localhost at a given port.
// Bound host name, max connections and listen backlog can also be passed in as parameters. // Bound host name, max connections and listen backlog can also be passed in as parameters.
ix::WebSocketServer server(port); ix::WebSocketServer server(port);
@ -144,13 +117,9 @@ server.wait();
``` ```
### HTTP client API Here is what the HTTP client API looks like.
``` ```
#include <ixwebsocket/IXHttpClient.h>
...
// //
// Preparation // Preparation
// //
@ -201,7 +170,7 @@ out = httpClient.post(url, std::string("foo=bar"), args);
// //
// Result // Result
// //
auto statusCode = response->statusCode; // Can be HttpErrorCode::Ok, HttpErrorCode::UrlMalformed, etc... auto errorCode = response->errorCode; // Can be HttpErrorCode::Ok, HttpErrorCode::UrlMalformed, etc...
auto errorCode = response->errorCode; // 200, 404, etc... auto errorCode = response->errorCode; // 200, 404, etc...
auto responseHeaders = response->headers; // All the headers in a special case-insensitive unordered_map of (string, string) auto responseHeaders = response->headers; // All the headers in a special case-insensitive unordered_map of (string, string)
auto payload = response->payload; // All the bytes from the response as an std::string auto payload = response->payload; // All the bytes from the response as an std::string
@ -227,11 +196,9 @@ bool ok = httpClient.performRequest(args, [](const HttpResponsePtr& response)
// ok will be false if your httpClient is not async // ok will be false if your httpClient is not async
``` ```
### HTTP server API Here is what the HTTP server API looks like. Note that HTTP server support is very, very recent and subject to changes.
``` ```
#include <ixwebsocket/IXHttpServer.h>
ix::HttpServer server(port, hostname); ix::HttpServer server(port, hostname);
auto res = server.listen(); auto res = server.listen();
@ -254,7 +221,7 @@ setOnConnectionCallback(
{ {
// Build a string for the response // Build a string for the response
std::stringstream ss; std::stringstream ss;
ss << request->method ss << request->method
<< " " << " "
<< request->uri; << request->uri;
@ -269,52 +236,23 @@ setOnConnectionCallback(
## Build ## Build
### CMake
CMakefiles for the library and the examples are available. This library has few dependencies, so it is possible to just add the source files into your project. Otherwise the usual way will suffice. CMakefiles for the library and the examples are available. This library has few dependencies, so it is possible to just add the source files into your project. Otherwise the usual way will suffice.
``` ```
mkdir build # make a build dir so that you can build out of tree. mkdir build # make a build dir so that you can build out of tree.
cd build cd build
cmake -DUSE_TLS=1 .. cmake ..
make -j make -j
make install # will install to /usr/local on Unix, on macOS it is a good idea to sudo chown -R `whoami`:staff /usr/local make install # will install to /usr/local on Unix, on macOS it is a good idea to sudo chown -R `whoami`:staff /usr/local
``` ```
Headers and a static library will be installed to the target dir. Headers and a static library will be installed to the target dir.
A [conan](https://conan.io/) file is available at [conan-IXWebSocket](https://github.com/Zinnion/conan-IXWebSocket).
There is a unittest which can be executed by typing `make test`. There is a unittest which can be executed by typing `make test`.
Options for building: There is a Dockerfile for running some code on Linux. To use docker-compose you must make a docker container first.
* `-DUSE_TLS=1` will enable TLS support
* `-DUSE_MBED_TLS=1` will use [mbedlts](https://tls.mbed.org/) for the TLS support (default on Windows)
* `-DUSE_WS=1` will build the ws interactive command line tool
### vcpkg
It is possible to get IXWebSocket through Microsoft [vcpkg](https://github.com/microsoft/vcpkg).
```
vcpkg install ixwebsocket
```
### Conan
Support for building with conan was contributed by Olivia Zoe (thanks !). The package name to reference is `IXWebSocket/5.0.0@LunarWatcher/stable`. The package is in the process to be published to the official conan package repo, but in the meantime, it can be accessed by adding a new remote
```
conan remote add remote_name_here https://api.bintray.com/conan/oliviazoe0/conan-packages
```
### Docker
There is a Dockerfile for running the unittest on Linux, and to run the `ws` tool. It is also available on the docker registry.
```
docker run bsergean/ws
```
To use docker-compose you must make a docker container first.
``` ```
$ make docker $ make docker
@ -327,6 +265,12 @@ ws is a websocket tool
... ...
``` ```
Finally you can build and install the `ws command line tool` with Homebrew. The homebrew version might be slightly out of date.
```
brew tap bsergean/IXWebSocket
brew install IXWebSocket
```
## Implementation details ## Implementation details
@ -419,7 +363,8 @@ If the connection was closed and sending failed, the return value will be set to
The onMessage event will be fired when the connection is opened or closed. This is similar to the [Javascript browser API](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket), which has `open` and `close` events notification that can be registered with the browser `addEventListener`. The onMessage event will be fired when the connection is opened or closed. This is similar to the [Javascript browser API](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket), which has `open` and `close` events notification that can be registered with the browser `addEventListener`.
``` ```
webSocket.setOnMessageCallback([](const ix::WebSocketMessagePtr& msg) webSocket.setOnMessageCallback(
[](const ix::WebSocketMessagePtr& msg)
{ {
if (msg->type == ix::WebSocketMessageType::Open) if (msg->type == ix::WebSocketMessageType::Open)
{ {
@ -450,7 +395,8 @@ webSocket.setOnMessageCallback([](const ix::WebSocketMessagePtr& msg)
A message will be fired when there is an error with the connection. The message type will be `ix::WebSocketMessageType::Error`. Multiple fields will be available on the event to describe the error. A message will be fired when there is an error with the connection. The message type will be `ix::WebSocketMessageType::Error`. Multiple fields will be available on the event to describe the error.
``` ```
webSocket.setOnMessageCallback([](const ix::WebSocketMessagePtr& msg) webSocket.setOnMessageCallback(
[](const ix::WebSocketMessagePtr& msg)
{ {
if (msg->type == ix::WebSocketMessageType::Error) if (msg->type == ix::WebSocketMessageType::Error)
{ {
@ -484,7 +430,8 @@ websocket.configure(url);
Ping/pong messages are used to implement keep-alive. 2 message types exists to identify ping and pong messages. Note that when a ping message is received, a pong is instantly send back as requested by the WebSocket spec. Ping/pong messages are used to implement keep-alive. 2 message types exists to identify ping and pong messages. Note that when a ping message is received, a pong is instantly send back as requested by the WebSocket spec.
``` ```
webSocket.setOnMessageCallback([](const ix::WebSocketMessagePtr& msg) webSocket.setOnMessageCallback(
[](const ix::WebSocketMessagePtr& msg)
{ {
if (msg->type == ix::WebSocketMessageType::Ping || if (msg->type == ix::WebSocketMessageType::Ping ||
msg->type == ix::WebSocketMessageType::Pong) msg->type == ix::WebSocketMessageType::Pong)

View File

@ -1,23 +0,0 @@
# Build time
FROM ubuntu:disco as build
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get -y install wget
RUN mkdir -p /tmp/cmake
WORKDIR /tmp/cmake
RUN wget https://github.com/Kitware/CMake/releases/download/v3.14.0/cmake-3.14.0-Linux-x86_64.tar.gz
RUN tar zxf cmake-3.14.0-Linux-x86_64.tar.gz
RUN apt-get -y install g++
RUN apt-get -y install libssl-dev
RUN apt-get -y install libz-dev
RUN apt-get -y install make
RUN apt-get -y install python
COPY . .
ARG CMAKE_BIN_PATH=/tmp/cmake/cmake-3.14.0-Linux-x86_64/bin
ENV PATH="${CMAKE_BIN_PATH}:${PATH}"
RUN ["make", "test"]

View File

@ -9,20 +9,18 @@
#include <string.h> #include <string.h>
#include <chrono> #include <chrono>
#include <thread>
namespace ix namespace ix
{ {
const int64_t DNSLookup::kDefaultWait = 1; // ms const int64_t DNSLookup::kDefaultWait = 10; // ms
DNSLookup::DNSLookup(const std::string& hostname, int port, int64_t wait) : DNSLookup::DNSLookup(const std::string& hostname, int port, int64_t wait) :
_hostname(hostname),
_port(port), _port(port),
_wait(wait), _wait(wait),
_res(nullptr), _res(nullptr),
_done(false) _done(false)
{ {
; setHostname(hostname);
} }
struct addrinfo* DNSLookup::getAddrInfo(const std::string& hostname, struct addrinfo* DNSLookup::getAddrInfo(const std::string& hostname,
@ -50,14 +48,14 @@ namespace ix
struct addrinfo* DNSLookup::resolve(std::string& errMsg, struct addrinfo* DNSLookup::resolve(std::string& errMsg,
const CancellationRequest& isCancellationRequested, const CancellationRequest& isCancellationRequested,
bool cancellable) bool blocking)
{ {
return cancellable ? resolveCancellable(errMsg, isCancellationRequested) return blocking ? resolveBlocking(errMsg, isCancellationRequested)
: resolveUnCancellable(errMsg, isCancellationRequested); : resolveAsync(errMsg, isCancellationRequested);
} }
struct addrinfo* DNSLookup::resolveUnCancellable(std::string& errMsg, struct addrinfo* DNSLookup::resolveBlocking(std::string& errMsg,
const CancellationRequest& isCancellationRequested) const CancellationRequest& isCancellationRequested)
{ {
errMsg = "no error"; errMsg = "no error";
@ -68,11 +66,11 @@ namespace ix
return nullptr; return nullptr;
} }
return getAddrInfo(_hostname, _port, errMsg); return getAddrInfo(getHostname(), _port, errMsg);
} }
struct addrinfo* DNSLookup::resolveCancellable(std::string& errMsg, struct addrinfo* DNSLookup::resolveAsync(std::string& errMsg,
const CancellationRequest& isCancellationRequested) const CancellationRequest& isCancellationRequested)
{ {
errMsg = "no error"; errMsg = "no error";
@ -91,21 +89,20 @@ namespace ix
auto ptr = shared_from_this(); auto ptr = shared_from_this();
std::weak_ptr<DNSLookup> self(ptr); std::weak_ptr<DNSLookup> self(ptr);
int port = _port; _thread = std::thread(&DNSLookup::run, this, self, getHostname(), _port);
std::string hostname(_hostname); _thread.detach();
// We make the background thread doing the work a shared pointer std::unique_lock<std::mutex> lock(_conditionVariableMutex);
// instead of a member variable, because it can keep running when
// this object goes out of scope, in case of cancellation
auto t = std::make_shared<std::thread>(&DNSLookup::run, this, self, hostname, port);
t->detach();
while (!_done) while (!_done)
{ {
// Wait for 1 milliseconds, to see if the bg thread has terminated. // Wait for 10 milliseconds on the condition variable, to see
// We do not use a condition variable to wait, as destroying this one // if the bg thread has terminated.
// if the bg thread is alive can cause undefined behavior. if (_condition.wait_for(lock, std::chrono::milliseconds(_wait)) == std::cv_status::no_timeout)
std::this_thread::sleep_for(std::chrono::milliseconds(_wait)); {
// Background thread has terminated, so we can break of this loop
break;
}
// Were we cancelled ? // Were we cancelled ?
if (isCancellationRequested && isCancellationRequested()) if (isCancellationRequested && isCancellationRequested())
@ -126,7 +123,7 @@ namespace ix
return getRes(); return getRes();
} }
void DNSLookup::run(std::weak_ptr<DNSLookup> self, std::string hostname, int port) // thread runner void DNSLookup::run(std::weak_ptr<DNSLookup> self, const std::string& hostname, int port) // thread runner
{ {
// We don't want to read or write into members variables of an object that could be // We don't want to read or write into members variables of an object that could be
// gone, so we use temporary variables (res) or we pass in by copy everything that // gone, so we use temporary variables (res) or we pass in by copy everything that
@ -140,10 +137,23 @@ namespace ix
setRes(res); setRes(res);
setErrMsg(errMsg); setErrMsg(errMsg);
_condition.notify_one();
_done = true; _done = true;
} }
} }
void DNSLookup::setHostname(const std::string& hostname)
{
std::lock_guard<std::mutex> lock(_hostnameMutex);
_hostname = hostname;
}
const std::string& DNSLookup::getHostname()
{
std::lock_guard<std::mutex> lock(_hostnameMutex);
return _hostname;
}
void DNSLookup::setErrMsg(const std::string& errMsg) void DNSLookup::setErrMsg(const std::string& errMsg)
{ {
std::lock_guard<std::mutex> lock(_errMsgMutex); std::lock_guard<std::mutex> lock(_errMsgMutex);

View File

@ -12,10 +12,11 @@
#include "IXCancellationRequest.h" #include "IXCancellationRequest.h"
#include <atomic> #include <atomic>
#include <memory> #include <condition_variable>
#include <mutex>
#include <set> #include <set>
#include <string> #include <string>
#include <thread>
#include <memory>
struct addrinfo; struct addrinfo;
@ -29,19 +30,22 @@ namespace ix
struct addrinfo* resolve(std::string& errMsg, struct addrinfo* resolve(std::string& errMsg,
const CancellationRequest& isCancellationRequested, const CancellationRequest& isCancellationRequested,
bool cancellable = true); bool blocking = false);
private: private:
struct addrinfo* resolveCancellable(std::string& errMsg, struct addrinfo* resolveAsync(std::string& errMsg,
const CancellationRequest& isCancellationRequested); const CancellationRequest& isCancellationRequested);
struct addrinfo* resolveUnCancellable(std::string& errMsg, struct addrinfo* resolveBlocking(std::string& errMsg,
const CancellationRequest& isCancellationRequested); const CancellationRequest& isCancellationRequested);
static struct addrinfo* getAddrInfo(const std::string& hostname, static struct addrinfo* getAddrInfo(const std::string& hostname,
int port, int port,
std::string& errMsg); std::string& errMsg);
void run(std::weak_ptr<DNSLookup> self, std::string hostname, int port); // thread runner void run(std::weak_ptr<DNSLookup> self, const std::string& hostname, int port); // thread runner
void setHostname(const std::string& hostname);
const std::string& getHostname();
void setErrMsg(const std::string& errMsg); void setErrMsg(const std::string& errMsg);
const std::string& getErrMsg(); const std::string& getErrMsg();
@ -50,9 +54,10 @@ namespace ix
struct addrinfo* getRes(); struct addrinfo* getRes();
std::string _hostname; std::string _hostname;
std::mutex _hostnameMutex;
int _port; int _port;
int64_t _wait; int64_t _wait;
const static int64_t kDefaultWait;
struct addrinfo* _res; struct addrinfo* _res;
std::mutex _resMutex; std::mutex _resMutex;
@ -61,5 +66,10 @@ namespace ix
std::mutex _errMsgMutex; std::mutex _errMsgMutex;
std::atomic<bool> _done; std::atomic<bool> _done;
std::thread _thread;
std::condition_variable _condition;
std::mutex _conditionVariableMutex;
const static int64_t kDefaultWait;
}; };
} // namespace ix } // namespace ix

View File

@ -131,7 +131,7 @@ namespace ix
return false; return false;
} }
return response->payload.empty() return response->payload.empty()
? true ? true
: socket->writeBytes(response->payload, nullptr); : socket->writeBytes(response->payload, nullptr);
} }

View File

@ -6,8 +6,8 @@
#pragma once #pragma once
#include "IXProgressCallback.h"
#include "IXWebSocketHttpHeaders.h" #include "IXWebSocketHttpHeaders.h"
#include "IXProgressCallback.h"
#include <tuple> #include <tuple>
namespace ix namespace ix
@ -111,12 +111,10 @@ namespace ix
class Http class Http
{ {
public: public:
static std::tuple<bool, std::string, HttpRequestPtr> parseRequest( static std::tuple<bool, std::string, HttpRequestPtr> parseRequest(std::shared_ptr<Socket> socket);
std::shared_ptr<Socket> socket);
static bool sendResponse(HttpResponsePtr response, std::shared_ptr<Socket> socket); static bool sendResponse(HttpResponsePtr response, std::shared_ptr<Socket> socket);
static std::tuple<std::string, std::string, std::string> parseRequestLine( static std::tuple<std::string, std::string, std::string> parseRequestLine(const std::string& line);
const std::string& line);
static std::string trim(const std::string& str); static std::string trim(const std::string& str);
}; };
} // namespace ix }

View File

@ -14,7 +14,6 @@
#include <vector> #include <vector>
#include <cstring> #include <cstring>
#include <assert.h>
#include <zlib.h> #include <zlib.h>
namespace ix namespace ix
@ -53,8 +52,6 @@ namespace ix
bool HttpClient::performRequest(HttpRequestArgsPtr args, bool HttpClient::performRequest(HttpRequestArgsPtr args,
const OnResponseCallback& onResponseCallback) const OnResponseCallback& onResponseCallback)
{ {
assert(_async && "HttpClient needs its async parameter set to true "
"in order to call performRequest");
if (!_async) return false; if (!_async) return false;
// Enqueue the task // Enqueue the task

View File

@ -6,9 +6,9 @@
#pragma once #pragma once
#include "IXHttp.h"
#include "IXSocket.h" #include "IXSocket.h"
#include "IXWebSocketHttpHeaders.h" #include "IXWebSocketHttpHeaders.h"
#include "IXHttp.h"
#include <algorithm> #include <algorithm>
#include <atomic> #include <atomic>
#include <condition_variable> #include <condition_variable>

View File

@ -131,7 +131,7 @@ namespace ix
// Log request // Log request
std::stringstream ss; std::stringstream ss;
ss << request->method ss << request->method
<< " " << " "
<< request->uri << request->uri
<< " " << " "

View File

@ -6,9 +6,9 @@
#pragma once #pragma once
#include "IXHttp.h"
#include "IXSocketServer.h" #include "IXSocketServer.h"
#include "IXWebSocket.h" #include "IXWebSocket.h"
#include "IXHttp.h"
#include <functional> #include <functional>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
@ -47,3 +47,4 @@ namespace ix
void setDefaultConnectionCallback(); void setDefaultConnectionCallback();
}; };
} // namespace ix } // namespace ix

View File

@ -15,8 +15,9 @@ namespace ix
WSADATA wsaData; WSADATA wsaData;
int err; int err;
// Use the MAKEWORD(lowbyte, highbyte) macro declared in Windef.h /* Use the MAKEWORD(lowbyte, highbyte) macro declared in Windef.h */
wVersionRequested = MAKEWORD(2, 2); wVersionRequested = MAKEWORD(2, 2);
err = WSAStartup(wVersionRequested, &wsaData); err = WSAStartup(wVersionRequested, &wsaData);
return err == 0; return err == 0;
@ -29,83 +30,10 @@ namespace ix
{ {
#ifdef _WIN32 #ifdef _WIN32
int err = WSACleanup(); int err = WSACleanup();
return err == 0; return err == 0;
#else #else
return true; return true;
#endif #endif
} }
} }
// This function should be in the global namespace
#ifdef _WIN32
//
// That function could 'return WSAPoll(pfd, nfds, timeout);'
// but WSAPoll is said to have weird behaviors on the internet
// (the curl folks have had problems with it).
//
// So we make it a select wrapper
//
int poll(struct pollfd *fds, nfds_t nfds, int timeout)
{
int maxfd = 0;
fd_set readfds, writefds, errorfds;
FD_ZERO(&readfds);
FD_ZERO(&writefds);
FD_ZERO(&errorfds);
for (nfds_t i = 0; i < nfds; ++i)
{
struct pollfd *fd = &fds[i];
if (fd->fd > maxfd)
{
maxfd = fd->fd;
}
if ((fd->events & POLLIN))
{
FD_SET(fd->fd, &readfds);
}
if ((fd->events & POLLOUT))
{
FD_SET(fd->fd, &writefds);
}
if ((fd->events & POLLERR))
{
FD_SET(fd->fd, &errorfds);
}
}
struct timeval tv;
tv.tv_sec = timeout / 1000;
tv.tv_usec = (timeout % 1000) * 1000;
int ret = select(maxfd + 1, &readfds, &writefds, &errorfds,
timeout != -1 ? &tv : NULL);
if (ret < 0)
{
return ret;
}
for (nfds_t i = 0; i < nfds; ++i)
{
struct pollfd *fd = &fds[i];
fd->revents = 0;
if (FD_ISSET(fd->fd, &readfds))
{
fd->revents |= POLLIN;
}
if (FD_ISSET(fd->fd, &writefds))
{
fd->revents |= POLLOUT;
}
if (FD_ISSET(fd->fd, &errorfds))
{
fd->revents |= POLLERR;
}
}
return ret;
}
#endif

View File

@ -12,18 +12,11 @@
#include <basetsd.h> #include <basetsd.h>
#include <io.h> #include <io.h>
#include <ws2def.h> #include <ws2def.h>
// Define our own poll on Windows
typedef unsigned long int nfds_t;
int poll(struct pollfd* fds, nfds_t nfds, int timeout);
#else #else
#include <arpa/inet.h> #include <arpa/inet.h>
#include <errno.h> #include <errno.h>
#include <netdb.h> #include <netdb.h>
#include <netinet/tcp.h> #include <netinet/tcp.h>
#include <poll.h>
#include <sys/select.h> #include <sys/select.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/stat.h> #include <sys/stat.h>

View File

@ -44,42 +44,41 @@ namespace ix
close(); close();
} }
PollResultType Socket::poll(bool readyToRead, PollResultType Socket::poll(int timeoutMs)
int timeoutMs,
int sockfd,
std::shared_ptr<SelectInterrupt> selectInterrupt)
{ {
// return isReadyToRead(timeoutMs);
// We used to use ::select to poll but on Android 9 we get large fds out of ::connect }
// which crash in FD_SET as they are larger than FD_SETSIZE.
// Switching to ::poll does fix that.
//
// However poll isn't as portable as select and has bugs on Windows, so we should write a
// shim to fallback to select on those platforms.
// See https://github.com/mpv-player/mpv/pull/5203/files for such a select wrapper.
//
nfds_t nfds = 1;
struct pollfd fds[2];
fds[0].fd = sockfd; PollResultType Socket::select(bool readyToRead, int timeoutMs)
fds[0].events = (readyToRead) ? POLLIN : POLLOUT; {
fds[0].events |= POLLERR; fd_set rfds;
fd_set wfds;
FD_ZERO(&rfds);
FD_ZERO(&wfds);
// File descriptor used to interrupt select when needed fd_set* fds = (readyToRead) ? &rfds : & wfds;
int interruptFd = -1; if (_sockfd != -1)
if (selectInterrupt)
{ {
interruptFd = selectInterrupt->getFd(); FD_SET(_sockfd, fds);
if (interruptFd != -1)
{
nfds = 2;
fds[1].fd = interruptFd;
fds[1].events = POLLIN;
}
} }
int ret = ::poll(fds, nfds, timeoutMs); // File descriptor used to interrupt select when needed
int interruptFd = _selectInterrupt->getFd();
if (interruptFd != -1)
{
FD_SET(interruptFd, fds);
}
struct timeval timeout;
timeout.tv_sec = timeoutMs / 1000;
timeout.tv_usec = 1000 * (timeoutMs % 1000);
// Compute the highest fd.
int sockfd = _sockfd;
int nfds = (std::max)(sockfd, interruptFd);
int ret = ::select(nfds + 1, &rfds, &wfds, nullptr,
(timeoutMs < 0) ? nullptr : &timeout);
PollResultType pollResult = PollResultType::ReadyForRead; PollResultType pollResult = PollResultType::ReadyForRead;
if (ret < 0) if (ret < 0)
@ -90,9 +89,9 @@ namespace ix
{ {
pollResult = PollResultType::Timeout; pollResult = PollResultType::Timeout;
} }
else if (interruptFd != -1 && fds[1].revents & POLLIN) else if (interruptFd != -1 && FD_ISSET(interruptFd, &rfds))
{ {
uint64_t value = selectInterrupt->read(); uint64_t value = _selectInterrupt->read();
if (value == kSendRequest) if (value == kSendRequest)
{ {
@ -103,36 +102,13 @@ namespace ix
pollResult = PollResultType::CloseRequest; pollResult = PollResultType::CloseRequest;
} }
} }
else if (sockfd != -1 && readyToRead && fds[0].revents & POLLIN) else if (sockfd != -1 && readyToRead && FD_ISSET(sockfd, &rfds))
{ {
pollResult = PollResultType::ReadyForRead; pollResult = PollResultType::ReadyForRead;
} }
else if (sockfd != -1 && !readyToRead && fds[0].revents & POLLOUT) else if (sockfd != -1 && !readyToRead && FD_ISSET(sockfd, &wfds))
{ {
pollResult = PollResultType::ReadyForWrite; pollResult = PollResultType::ReadyForWrite;
#ifdef _WIN32
// On connect error, in async mode, windows will write to the exceptions fds
if (fds[0].revents & POLLERR)
{
pollResult = PollResultType::Error;
}
#else
int optval = -1;
socklen_t optlen = sizeof(optval);
// getsockopt() puts the errno value for connect into optval so 0
// means no-error.
if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &optval, &optlen) == -1 ||
optval != 0)
{
pollResult = PollResultType::Error;
// set errno to optval so that external callers can have an
// appropriate error description when calling strerror
errno = optval;
}
#endif
} }
return pollResult; return pollResult;
@ -146,7 +122,7 @@ namespace ix
} }
bool readyToRead = true; bool readyToRead = true;
return poll(readyToRead, timeoutMs, _sockfd, _selectInterrupt); return select(readyToRead, timeoutMs);
} }
PollResultType Socket::isReadyToWrite(int timeoutMs) PollResultType Socket::isReadyToWrite(int timeoutMs)
@ -157,7 +133,7 @@ namespace ix
} }
bool readyToRead = false; bool readyToRead = false;
return poll(readyToRead, timeoutMs, _sockfd, _selectInterrupt); return select(readyToRead, timeoutMs);
} }
// Wake up from poll/select by writing to the pipe which is watched by select // Wake up from poll/select by writing to the pipe which is watched by select
@ -256,14 +232,14 @@ namespace ix
bool Socket::writeBytes(const std::string& str, bool Socket::writeBytes(const std::string& str,
const CancellationRequest& isCancellationRequested) const CancellationRequest& isCancellationRequested)
{ {
int offset = 0; char* buffer = const_cast<char*>(str.c_str());
int len = (int) str.size(); int len = (int) str.size();
while (true) while (true)
{ {
if (isCancellationRequested && isCancellationRequested()) return false; if (isCancellationRequested && isCancellationRequested()) return false;
ssize_t ret = send((char*)&str[offset], len); ssize_t ret = send(buffer, len);
// We wrote some bytes, as needed, all good. // We wrote some bytes, as needed, all good.
if (ret > 0) if (ret > 0)
@ -274,8 +250,8 @@ namespace ix
} }
else else
{ {
offset += ret; buffer += ret;
len -= ret; len -= ret;
continue; continue;
} }
} }

View File

@ -88,12 +88,6 @@ namespace ix
static bool isWaitNeeded(); static bool isWaitNeeded();
static void closeSocket(int fd); static void closeSocket(int fd);
static PollResultType poll(bool readyToRead,
int timeoutMs,
int sockfd,
std::shared_ptr<SelectInterrupt> selectInterrupt = nullptr);
// Used as special codes for pipe communication // Used as special codes for pipe communication
static const uint64_t kSendRequest; static const uint64_t kSendRequest;
static const uint64_t kCloseRequest; static const uint64_t kCloseRequest;
@ -103,6 +97,8 @@ namespace ix
std::mutex _socketMutex; std::mutex _socketMutex;
private: private:
PollResultType select(bool readyToRead, int timeoutMs);
static const int kDefaultPollTimeout; static const int kDefaultPollTimeout;
static const int kDefaultPollNoTimeout; static const int kDefaultPollNoTimeout;

View File

@ -63,31 +63,55 @@ namespace ix
return -1; return -1;
} }
int timeoutMs = 10; // On Linux the timeout needs to be re-initialized everytime
bool readyToRead = false; // http://man7.org/linux/man-pages/man2/select.2.html
PollResultType pollResult = Socket::poll(readyToRead, timeoutMs, fd); struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 10 * 1000; // 10ms timeout
if (pollResult == PollResultType::Timeout) fd_set wfds;
{ fd_set efds;
continue;
} FD_ZERO(&wfds);
else if (pollResult == PollResultType::Error) FD_SET(fd, &wfds);
FD_ZERO(&efds);
FD_SET(fd, &efds);
// Use select to check the status of the new connection
res = select(fd + 1, nullptr, &wfds, &efds, &timeout);
if (res < 0 && (Socket::getErrno() == EBADF || Socket::getErrno() == EINVAL))
{ {
Socket::closeSocket(fd); Socket::closeSocket(fd);
errMsg = std::string("Connect error: ") + errMsg = std::string("Connect error, select error: ") + strerror(Socket::getErrno());
strerror(Socket::getErrno());
return -1; return -1;
} }
else if (pollResult == PollResultType::ReadyForWrite)
// Nothing was written to the socket, wait again.
if (!FD_ISSET(fd, &wfds)) continue;
// Something was written to the socket. Check for errors.
int optval = -1;
socklen_t optlen = sizeof(optval);
#ifdef _WIN32
// On connect error, in async mode, windows will write to the exceptions fds
if (FD_ISSET(fd, &efds))
#else
// getsockopt() puts the errno value for connect into optval so 0
// means no-error.
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &optval, &optlen) == -1 ||
optval != 0)
#endif
{ {
return fd; Socket::closeSocket(fd);
errMsg = strerror(optval);
return -1;
} }
else else
{ {
Socket::closeSocket(fd); // Success !
errMsg = std::string("Connect error: ") + return fd;
strerror(Socket::getErrno());
return -1;
} }
} }

View File

@ -213,12 +213,17 @@ namespace ix
{ {
if (_stop) return; if (_stop) return;
// Use poll to check whether a new connection is in progress // Use select to check whether a new connection is in progress
int timeoutMs = 10; fd_set rfds;
bool readyToRead = true; struct timeval timeout;
PollResultType pollResult = Socket::poll(readyToRead, timeoutMs, _serverFd); timeout.tv_sec = 0;
timeout.tv_usec = 10 * 1000; // 10ms timeout
if (pollResult == PollResultType::Error) FD_ZERO(&rfds);
FD_SET(_serverFd, &rfds);
if (select(_serverFd + 1, &rfds, nullptr, nullptr, &timeout) < 0 &&
(errno == EBADF || errno == EINVAL))
{ {
std::stringstream ss; std::stringstream ss;
ss << "SocketServer::run() error in select: " ss << "SocketServer::run() error in select: "
@ -227,8 +232,9 @@ namespace ix
continue; continue;
} }
if (pollResult != PollResultType::ReadyForRead) if (!FD_ISSET(_serverFd, &rfds))
{ {
// We reached the select timeout, and no new connections are pending
continue; continue;
} }

View File

@ -330,7 +330,7 @@ namespace ix
} }
// poll the socket // poll the socket
PollResultType pollResult = _socket->isReadyToRead(lastingTimeoutDelayInMs); PollResultType pollResult = _socket->poll(lastingTimeoutDelayInMs);
// Make sure we send all the buffered data // Make sure we send all the buffered data
// there can be a lot of it for large messages. // there can be a lot of it for large messages.
@ -542,7 +542,7 @@ namespace ix
) { ) {
unmaskReceiveBuffer(ws); unmaskReceiveBuffer(ws);
MessageKind messageKind = MessageKind messageKind =
(ws.opcode == wsheader_type::TEXT_FRAME) (ws.opcode == wsheader_type::TEXT_FRAME)
? MessageKind::MSG_TEXT ? MessageKind::MSG_TEXT
: MessageKind::MSG_BINARY; : MessageKind::MSG_BINARY;
@ -755,7 +755,7 @@ namespace ix
{ {
if (_readyState != ReadyState::OPEN && _readyState != ReadyState::CLOSING) if (_readyState != ReadyState::OPEN && _readyState != ReadyState::CLOSING)
{ {
return WebSocketSendInfo(false); return WebSocketSendInfo();
} }
size_t payloadSize = message.size(); size_t payloadSize = message.size();

View File

@ -12,14 +12,11 @@ brew:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 .. ; make -j install) mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 .. ; make -j install)
ws: ws:
mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 .. ; make -j) mkdir -p build && (cd build ; cmake -DCMAKE_BUILD_TYPE=Debug -DUSE_TLS=1 -DUSE_WS=1 -DUSE_MBED_TLS=1 -DUSE_VENDORED_THIRD_PARTY=1 .. ; make -j)
uninstall: uninstall:
xargs rm -fv < build/install_manifest.txt xargs rm -fv < build/install_manifest.txt
tag:
git tag v"`cat DOCKER_VERSION`"
.PHONY: docker .PHONY: docker
NAME := bsergean/ws NAME := bsergean/ws

View File

@ -6,9 +6,9 @@
#pragma once #pragma once
#include "IXGetFreePort.h"
#include <iostream> #include <iostream>
#include <ixwebsocket/IXWebSocketServer.h> #include <ixwebsocket/IXWebSocketServer.h>
#include "IXGetFreePort.h"
#include <mutex> #include <mutex>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream> #include <sstream>

View File

@ -34,8 +34,6 @@ namespace
const std::string& getCloseReason(); const std::string& getCloseReason();
bool getCloseRemote(); bool getCloseRemote();
bool hasConnectionError() const;
private: private:
ix::WebSocket _webSocket; ix::WebSocket _webSocket;
int _port; int _port;
@ -44,7 +42,6 @@ namespace
uint16_t _closeCode; uint16_t _closeCode;
std::string _closeReason; std::string _closeReason;
bool _closeRemote; bool _closeRemote;
std::atomic<bool> _connectionError;
}; };
WebSocketClient::WebSocketClient(int port) WebSocketClient::WebSocketClient(int port)
@ -52,16 +49,10 @@ namespace
, _closeCode(0) , _closeCode(0)
, _closeReason(std::string("")) , _closeReason(std::string(""))
, _closeRemote(false) , _closeRemote(false)
, _connectionError(false)
{ {
; ;
} }
bool WebSocketClient::hasConnectionError() const
{
return _connectionError;
}
bool WebSocketClient::isReady() const bool WebSocketClient::isReady() const
{ {
return _webSocket.getReadyState() == ix::ReadyState::Open; return _webSocket.getReadyState() == ix::ReadyState::Open;
@ -142,7 +133,6 @@ namespace
} }
else if (msg->type == ix::WebSocketMessageType::Error) else if (msg->type == ix::WebSocketMessageType::Error)
{ {
_connectionError = true;
ss << "Error ! " << msg->errorInfo.reason; ss << "Error ! " << msg->errorInfo.reason;
log(ss.str()); log(ss.str());
} }
@ -258,7 +248,6 @@ TEST_CASE("Websocket_client_close_default", "[close]")
// Wait for all chat instance to be ready // Wait for all chat instance to be ready
while (true) while (true)
{ {
REQUIRE(!webSocketClient.hasConnectionError());
if (webSocketClient.isReady()) break; if (webSocketClient.isReady()) break;
ix::msleep(10); ix::msleep(10);
} }

View File

@ -111,6 +111,7 @@ def runCMake(sanitizer, buildDir):
-B"{buildDir}" \ -B"{buildDir}" \
-DCMAKE_BUILD_TYPE={CMAKE_BUILD_TYPE} \ -DCMAKE_BUILD_TYPE={CMAKE_BUILD_TYPE} \
-DUSE_TLS=1 \ -DUSE_TLS=1 \
-DUSE_MBED_TLS=1 \
-DCMAKE_EXPORT_COMPILE_COMMANDS=ON \ -DCMAKE_EXPORT_COMPILE_COMMANDS=ON \
-DUSE_VENDORED_THIRD_PARTY={USE_VENDORED_THIRD_PARTY} \ -DUSE_VENDORED_THIRD_PARTY={USE_VENDORED_THIRD_PARTY} \
-G{generator}' -G{generator}'

View File

@ -1,44 +0,0 @@
# Compiled Object files
*.slo
*.lo
*.o
*.obj
# Precompiled Headers
*.gch
*.pch
# Compiled Dynamic libraries
*.so
*.dylib
*.dll
# Fortran module files
*.mod
# Compiled Static libraries
*.lai
*.la
*.a
*.lib
# Executables
*.exe
*.out
*.app
# Others
*.dSYM
*.swp
Debug
Release
*.suo
*.sdf
*.user
xcuserdata
*.xcworkspace
Makefile
CMakeFiles
CMakeCache.txt
*.cmake
history.txt

View File

@ -1,22 +0,0 @@
Copyright (c) 2015 yhirose
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -1,95 +0,0 @@
cpp-linenoise
=============
Multi-platfrom (Unix, Windows) C++ header-only linenoise-based readline library.
This library gathered code from following excellent libraries, clean it up, and put it into a C++ header file for convenience.
* `linenoise.h` and `linenose.c` ([antirez/linenoise](https://github.com/antirez/linenoise))
* `ANSI.c` ([adoxa/ansicon](https://github.com/adoxa/ansicon))
* `Win32_ANSI.h` and `Win32_ANSI.c` ([MSOpenTech/redis](https://github.com/MSOpenTech/redis))
The licenses for the libraries are included in `linenoise.hpp`.
Usage
-----
```c++
#include "linenoise.hpp"
...
const auto path = "history.txt";
// Setup completion words every time when a user types
linenoise::SetCompletionCallback([](const char* editBuffer, std::vector<std::string>& completions) {
if (editBuffer[0] == 'h') {
completions.push_back("hello");
completions.push_back("hello there");
}
});
// Enable the multi-line mode
linenoise::SetMultiLine(true);
// Set max length of the history
linenoise::SetHistoryMaxLen(4);
// Load history
linenoise::LoadHistory(path);
while (true) {
// Read line
std::string line;
auto quit = linenoise::Readline("hello> ", line);
if (quit) {
break;
}
cout << "echo: '" << line << "'" << endl;
// Add text to history
linenoise::AddHistory(line.c_str());
}
// Save history
linenoise::SaveHistory(path);
```
API
---
```c++
namespace linenoise;
std::string Readline(const char* prompt);
void SetMultiLine(bool multiLineMode);
typedef std::function<void (const char* editBuffer, std::vector<std::string>& completions)> CompletionCallback;
void SetCompletionCallback(CompletionCallback fn);
bool SetHistoryMaxLen(size_t len);
bool LoadHistory(const char* path);
bool SaveHistory(const char* path);
bool AddHistory(const char* line);
const std::vector<std::string>& GetHistory();
```
Tested compilers
----------------
* Visual Studio 2015
* Clang 3.5
* GCC 6.3.1
License
-------
BSD license (© 2015 Yuji Hirose)

View File

@ -1,5 +0,0 @@
cmake_minimum_required(VERSION 3.0)
include_directories(.)
add_definitions("-std=c++1y")
add_executable(example example.cpp)

View File

@ -1,54 +0,0 @@
#include <iostream>
#include "../linenoise.hpp"
using namespace std;
int main(int argc, const char** argv)
{
const auto path = "history.txt";
// Enable the multi-line mode
linenoise::SetMultiLine(true);
// Set max length of the history
linenoise::SetHistoryMaxLen(4);
// Setup completion words every time when a user types
linenoise::SetCompletionCallback([](const char* editBuffer, std::vector<std::string>& completions) {
if (editBuffer[0] == 'h') {
#ifdef _WIN32
completions.push_back("hello こんにちは");
completions.push_back("hello こんにちは there");
#else
completions.push_back("hello");
completions.push_back("hello there");
#endif
}
});
// Load history
linenoise::LoadHistory(path);
while (true) {
std::string line;
#ifdef _WIN32
auto quit = linenoise::Readline("hello> ", line);
#else
auto quit = linenoise::Readline("\033[32mこんにちは\x1b[0m> ", line);
#endif
if (quit) {
break;
}
cout << "echo: '" << line << "'" << endl;
// Add line to history
linenoise::AddHistory(line.c_str());
// Save history
linenoise::SaveHistory(path);
}
return 0;
}

View File

@ -1,28 +0,0 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 14
VisualStudioVersion = 14.0.23107.0
MinimumVisualStudioVersion = 10.0.40219.1
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "example", "example.vcxproj", "{563BF990-4217-439F-92A4-F8A285052772}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|x64 = Debug|x64
Debug|x86 = Debug|x86
Release|x64 = Release|x64
Release|x86 = Release|x86
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{563BF990-4217-439F-92A4-F8A285052772}.Debug|x64.ActiveCfg = Debug|x64
{563BF990-4217-439F-92A4-F8A285052772}.Debug|x64.Build.0 = Debug|x64
{563BF990-4217-439F-92A4-F8A285052772}.Debug|x86.ActiveCfg = Debug|Win32
{563BF990-4217-439F-92A4-F8A285052772}.Debug|x86.Build.0 = Debug|Win32
{563BF990-4217-439F-92A4-F8A285052772}.Release|x64.ActiveCfg = Release|x64
{563BF990-4217-439F-92A4-F8A285052772}.Release|x64.Build.0 = Release|x64
{563BF990-4217-439F-92A4-F8A285052772}.Release|x86.ActiveCfg = Release|Win32
{563BF990-4217-439F-92A4-F8A285052772}.Release|x86.Build.0 = Release|Win32
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
EndGlobal

View File

@ -1,153 +0,0 @@
<?xml version="1.0" encoding="utf-8"?>
<Project DefaultTargets="Build" ToolsVersion="14.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup Label="ProjectConfigurations">
<ProjectConfiguration Include="Debug|Win32">
<Configuration>Debug</Configuration>
<Platform>Win32</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Release|Win32">
<Configuration>Release</Configuration>
<Platform>Win32</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Debug|x64">
<Configuration>Debug</Configuration>
<Platform>x64</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Release|x64">
<Configuration>Release</Configuration>
<Platform>x64</Platform>
</ProjectConfiguration>
</ItemGroup>
<PropertyGroup Label="Globals">
<ProjectGuid>{563BF990-4217-439F-92A4-F8A285052772}</ProjectGuid>
<Keyword>Win32Proj</Keyword>
<RootNamespace>example</RootNamespace>
<WindowsTargetPlatformVersion>8.1</WindowsTargetPlatformVersion>
</PropertyGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>true</UseDebugLibraries>
<PlatformToolset>v140</PlatformToolset>
<CharacterSet>Unicode</CharacterSet>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>false</UseDebugLibraries>
<PlatformToolset>v140</PlatformToolset>
<WholeProgramOptimization>true</WholeProgramOptimization>
<CharacterSet>Unicode</CharacterSet>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>true</UseDebugLibraries>
<PlatformToolset>v140</PlatformToolset>
<CharacterSet>Unicode</CharacterSet>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>false</UseDebugLibraries>
<PlatformToolset>v140</PlatformToolset>
<WholeProgramOptimization>true</WholeProgramOptimization>
<CharacterSet>Unicode</CharacterSet>
</PropertyGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
<ImportGroup Label="ExtensionSettings">
</ImportGroup>
<ImportGroup Label="Shared">
</ImportGroup>
<ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
<ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
<ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
<ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
<PropertyGroup Label="UserMacros" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<LinkIncremental>true</LinkIncremental>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
<LinkIncremental>true</LinkIncremental>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<LinkIncremental>false</LinkIncremental>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
<LinkIncremental>false</LinkIncremental>
</PropertyGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<ClCompile>
<PrecompiledHeader>
</PrecompiledHeader>
<WarningLevel>Level3</WarningLevel>
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;_DEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
<GenerateDebugInformation>true</GenerateDebugInformation>
</Link>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
<ClCompile>
<PrecompiledHeader>
</PrecompiledHeader>
<WarningLevel>Level3</WarningLevel>
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>_DEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
<GenerateDebugInformation>true</GenerateDebugInformation>
</Link>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<ClCompile>
<WarningLevel>Level3</WarningLevel>
<PrecompiledHeader>
</PrecompiledHeader>
<Optimization>MaxSpeed</Optimization>
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<PreprocessorDefinitions>WIN32;NDEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
<GenerateDebugInformation>true</GenerateDebugInformation>
<EnableCOMDATFolding>true</EnableCOMDATFolding>
<OptimizeReferences>true</OptimizeReferences>
</Link>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
<ClCompile>
<WarningLevel>Level3</WarningLevel>
<PrecompiledHeader>
</PrecompiledHeader>
<Optimization>MaxSpeed</Optimization>
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<PreprocessorDefinitions>NDEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
<GenerateDebugInformation>true</GenerateDebugInformation>
<EnableCOMDATFolding>true</EnableCOMDATFolding>
<OptimizeReferences>true</OptimizeReferences>
</Link>
</ItemDefinitionGroup>
<ItemGroup>
<ClCompile Include="example.cpp" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\linenoise.hpp" />
</ItemGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
</Project>

File diff suppressed because it is too large Load Diff

View File

@ -23,7 +23,6 @@ include_directories(ws ..)
include_directories(ws ../third_party) include_directories(ws ../third_party)
include_directories(ws ../third_party/statsd-client-cpp/src) include_directories(ws ../third_party/statsd-client-cpp/src)
include_directories(ws ../third_party/spdlog/include) include_directories(ws ../third_party/spdlog/include)
include_directories(ws ../third_party/cpp-linenoise)
include_directories(ws snake) include_directories(ws snake)
if (UNIX) if (UNIX)

View File

@ -429,18 +429,12 @@ namespace ix
} }
void CobraConnection::subscribe(const std::string& channel, void CobraConnection::subscribe(const std::string& channel,
const std::string& filter, SubscriptionCallback cb)
SubscriptionCallback cb)
{ {
// Create and send a subscribe pdu // Create and send a subscribe pdu
Json::Value body; Json::Value body;
body["channel"] = channel; body["channel"] = channel;
if (!filter.empty())
{
body["filter"] = filter;
}
Json::Value pdu; Json::Value pdu;
pdu["action"] = "rtm/subscribe"; pdu["action"] = "rtm/subscribe";
pdu["body"] = body; pdu["body"] = body;

View File

@ -75,9 +75,7 @@ namespace ix
// Subscribe to a channel, and execute a callback when an incoming // Subscribe to a channel, and execute a callback when an incoming
// message arrives. // message arrives.
void subscribe(const std::string& channel, void subscribe(const std::string& channel, SubscriptionCallback cb);
const std::string& filter = std::string(),
SubscriptionCallback cb = nullptr);
/// Unsubscribe from a channel /// Unsubscribe from a channel
void unsubscribe(const std::string& channel); void unsubscribe(const std::string& channel);

View File

@ -17,7 +17,7 @@ namespace ix
const std::string CobraMetricsPublisher::kSetBlacklistId = "sms_set_blacklist_id"; const std::string CobraMetricsPublisher::kSetBlacklistId = "sms_set_blacklist_id";
CobraMetricsPublisher::CobraMetricsPublisher() : CobraMetricsPublisher::CobraMetricsPublisher() :
_enabled(true) _enabled(false)
{ {
} }

View File

@ -7,7 +7,6 @@
#pragma once #pragma once
#include "IXCobraMetricsThreadedPublisher.h" #include "IXCobraMetricsThreadedPublisher.h"
#include <atomic>
#include <chrono> #include <chrono>
#include <jsoncpp/json/json.h> #include <jsoncpp/json/json.h>
#include <string> #include <string>
@ -133,8 +132,8 @@ namespace ix
CobraMetricsThreadedPublisher _cobra_metrics_theaded_publisher; CobraMetricsThreadedPublisher _cobra_metrics_theaded_publisher;
/// A boolean to enable or disable this system /// A boolean to enable or disable this system
/// push becomes a no-op when _enabled is false /// push becomes a no-op when _enabled is true
std::atomic<bool> _enabled; bool _enabled;
/// A uuid used to uniquely identify a session /// A uuid used to uniquely identify a session
std::string _session; std::string _session;

View File

@ -9,10 +9,15 @@
// //
#include "ws.h" #include "ws.h"
//
// Main drive for websocket utilities
//
#include <string> #include <string>
#include <sstream> #include <sstream>
#include <iostream> #include <iostream>
#include <fstream> #include <fstream>
// #include <unistd.h>
#include <cli11/CLI11.hpp> #include <cli11/CLI11.hpp>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
@ -55,7 +60,6 @@ int main(int argc, char** argv)
std::string hostname("127.0.0.1"); std::string hostname("127.0.0.1");
std::string pidfile; std::string pidfile;
std::string channel; std::string channel;
std::string filter;
std::string message; std::string message;
std::string password; std::string password;
std::string appkey; std::string appkey;
@ -72,14 +76,12 @@ int main(int argc, char** argv)
bool followRedirects = false; bool followRedirects = false;
bool verbose = false; bool verbose = false;
bool save = false; bool save = false;
bool quiet = false;
bool compress = false; bool compress = false;
bool strict = false; bool strict = false;
bool stress = false; bool stress = false;
bool disableAutomaticReconnection = false; bool disableAutomaticReconnection = false;
bool disablePerMessageDeflate = false; bool disablePerMessageDeflate = false;
bool greetings = false; bool greetings = false;
bool binaryMode = false;
int port = 8008; int port = 8008;
int redisPort = 6379; int redisPort = 6379;
int statsdPort = 8125; int statsdPort = 8125;
@ -111,7 +113,6 @@ int main(int argc, char** argv)
connectApp->add_option("url", url, "Connection url")->required(); connectApp->add_option("url", url, "Connection url")->required();
connectApp->add_flag("-d", disableAutomaticReconnection, "Disable Automatic Reconnection"); connectApp->add_flag("-d", disableAutomaticReconnection, "Disable Automatic Reconnection");
connectApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate"); connectApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate");
connectApp->add_flag("-b", binaryMode, "Send in binary mode");
CLI::App* chatApp = app.add_subcommand("chat", "Group chat"); CLI::App* chatApp = app.add_subcommand("chat", "Group chat");
chatApp->add_option("url", url, "Connection url")->required(); chatApp->add_option("url", url, "Connection url")->required();
@ -167,8 +168,6 @@ int main(int argc, char** argv)
cobraSubscribeApp->add_option("--rolesecret", rolesecret, "Role secret"); cobraSubscribeApp->add_option("--rolesecret", rolesecret, "Role secret");
cobraSubscribeApp->add_option("channel", channel, "Channel")->required(); cobraSubscribeApp->add_option("channel", channel, "Channel")->required();
cobraSubscribeApp->add_option("--pidfile", pidfile, "Pid file"); cobraSubscribeApp->add_option("--pidfile", pidfile, "Pid file");
cobraSubscribeApp->add_option("--filter", filter, "Stream SQL Filter");
cobraSubscribeApp->add_flag("-q", quiet, "Quiet / only display stats");
CLI::App* cobraPublish = app.add_subcommand("cobra_publish", "Cobra publisher"); CLI::App* cobraPublish = app.add_subcommand("cobra_publish", "Cobra publisher");
cobraPublish->add_option("--appkey", appkey, "Appkey"); cobraPublish->add_option("--appkey", appkey, "Appkey");
@ -193,7 +192,6 @@ int main(int argc, char** argv)
cobra2statsd->add_option("channel", channel, "Channel")->required(); cobra2statsd->add_option("channel", channel, "Channel")->required();
cobra2statsd->add_flag("-v", verbose, "Verbose"); cobra2statsd->add_flag("-v", verbose, "Verbose");
cobra2statsd->add_option("--pidfile", pidfile, "Pid file"); cobra2statsd->add_option("--pidfile", pidfile, "Pid file");
cobra2statsd->add_option("--filter", filter, "Stream SQL Filter");
CLI::App* cobra2sentry = app.add_subcommand("cobra_to_sentry", "Cobra to sentry"); CLI::App* cobra2sentry = app.add_subcommand("cobra_to_sentry", "Cobra to sentry");
cobra2sentry->add_option("--appkey", appkey, "Appkey"); cobra2sentry->add_option("--appkey", appkey, "Appkey");
@ -206,7 +204,6 @@ int main(int argc, char** argv)
cobra2sentry->add_flag("-v", verbose, "Verbose"); cobra2sentry->add_flag("-v", verbose, "Verbose");
cobra2sentry->add_flag("-s", strict, "Strict mode. Error out when sending to sentry fails"); cobra2sentry->add_flag("-s", strict, "Strict mode. Error out when sending to sentry fails");
cobra2sentry->add_option("--pidfile", pidfile, "Pid file"); cobra2sentry->add_option("--pidfile", pidfile, "Pid file");
cobra2sentry->add_option("--filter", filter, "Stream SQL Filter");
CLI::App* runApp = app.add_subcommand("snake", "Snake server"); CLI::App* runApp = app.add_subcommand("snake", "Snake server");
runApp->add_option("--port", port, "Connection url"); runApp->add_option("--port", port, "Connection url");
@ -253,7 +250,7 @@ int main(int argc, char** argv)
else if (app.got_subcommand("connect")) else if (app.got_subcommand("connect"))
{ {
ret = ix::ws_connect_main(url, disableAutomaticReconnection, ret = ix::ws_connect_main(url, disableAutomaticReconnection,
disablePerMessageDeflate, binaryMode); disablePerMessageDeflate);
} }
else if (app.got_subcommand("chat")) else if (app.got_subcommand("chat"))
{ {
@ -291,7 +288,7 @@ int main(int argc, char** argv)
{ {
ret = ix::ws_cobra_subscribe_main(appkey, endpoint, ret = ix::ws_cobra_subscribe_main(appkey, endpoint,
rolename, rolesecret, rolename, rolesecret,
channel, filter, quiet); channel);
} }
else if (app.got_subcommand("cobra_publish")) else if (app.got_subcommand("cobra_publish"))
{ {
@ -303,14 +300,14 @@ int main(int argc, char** argv)
{ {
ret = ix::ws_cobra_to_statsd_main(appkey, endpoint, ret = ix::ws_cobra_to_statsd_main(appkey, endpoint,
rolename, rolesecret, rolename, rolesecret,
channel, filter, hostname, statsdPort, channel, hostname, statsdPort,
prefix, fields, verbose); prefix, fields, verbose);
} }
else if (app.got_subcommand("cobra_to_sentry")) else if (app.got_subcommand("cobra_to_sentry"))
{ {
ret = ix::ws_cobra_to_sentry_main(appkey, endpoint, ret = ix::ws_cobra_to_sentry_main(appkey, endpoint,
rolename, rolesecret, rolename, rolesecret,
channel, filter, dsn, channel, dsn,
verbose, strict, jobs); verbose, strict, jobs);
} }
else if (app.got_subcommand("snake")) else if (app.got_subcommand("snake"))

View File

@ -32,8 +32,7 @@ namespace ix
int ws_connect_main(const std::string& url, int ws_connect_main(const std::string& url,
bool disableAutomaticReconnection, bool disableAutomaticReconnection,
bool disablePerMessageDeflate, bool disablePerMessageDeflate);
bool binaryMode);
int ws_receive_main(const std::string& url, bool enablePerMessageDeflate, int delayMs); int ws_receive_main(const std::string& url, bool enablePerMessageDeflate, int delayMs);
@ -56,9 +55,7 @@ namespace ix
const std::string& endpoint, const std::string& endpoint,
const std::string& rolename, const std::string& rolename,
const std::string& rolesecret, const std::string& rolesecret,
const std::string& channel, const std::string& channel);
const std::string& filter,
bool quiet);
int ws_cobra_publish_main(const std::string& appkey, int ws_cobra_publish_main(const std::string& appkey,
const std::string& endpoint, const std::string& endpoint,
@ -73,7 +70,6 @@ namespace ix
const std::string& rolename, const std::string& rolename,
const std::string& rolesecret, const std::string& rolesecret,
const std::string& channel, const std::string& channel,
const std::string& filter,
const std::string& host, const std::string& host,
int port, int port,
const std::string& prefix, const std::string& prefix,
@ -85,7 +81,6 @@ namespace ix
const std::string& rolename, const std::string& rolename,
const std::string& rolesecret, const std::string& rolesecret,
const std::string& channel, const std::string& channel,
const std::string& filter,
const std::string& dsn, const std::string& dsn,
bool verbose, bool verbose,
bool strict, bool strict,

View File

@ -11,17 +11,13 @@
#include <atomic> #include <atomic>
#include <ixcobra/IXCobraConnection.h> #include <ixcobra/IXCobraConnection.h>
#include <spdlog/spdlog.h>
namespace ix namespace ix
{ {
int ws_cobra_subscribe_main(const std::string& appkey, int ws_cobra_subscribe_main(const std::string& appkey,
const std::string& endpoint, const std::string& endpoint,
const std::string& rolename, const std::string& rolename,
const std::string& rolesecret, const std::string& rolesecret,
const std::string& channel, const std::string& channel)
const std::string& filter,
bool quiet)
{ {
ix::CobraConnection conn; ix::CobraConnection conn;
@ -32,28 +28,8 @@ namespace ix
Json::FastWriter jsonWriter; Json::FastWriter jsonWriter;
// Display incoming messages
std::atomic<int> msgPerSeconds(0);
std::atomic<int> msgCount(0);
auto timer = [&msgPerSeconds, &msgCount]
{
while (true)
{
std::cout << "#messages " << msgCount << " "
<< "msg/s " << msgPerSeconds
<< std::endl;
msgPerSeconds = 0;
auto duration = std::chrono::seconds(1);
std::this_thread::sleep_for(duration);
}
};
std::thread t(timer);
conn.setEventCallback( conn.setEventCallback(
[&conn, &channel, &jsonWriter, &filter, &msgCount, &msgPerSeconds, &quiet] [&conn, &channel, &jsonWriter]
(ix::CobraConnectionEventType eventType, (ix::CobraConnectionEventType eventType,
const std::string& errMsg, const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers, const ix::WebSocketHttpHeaders& headers,
@ -61,40 +37,33 @@ namespace ix
{ {
if (eventType == ix::CobraConnection_EventType_Open) if (eventType == ix::CobraConnection_EventType_Open)
{ {
spdlog::info("Subscriber connected"); std::cout << "Subscriber: connected" << std::endl;
for (auto it : headers) for (auto it : headers)
{ {
spdlog::info("{}: {}", it.first, it.second); std::cerr << it.first << ": " << it.second << std::endl;
} }
} }
else if (eventType == ix::CobraConnection_EventType_Authenticated) else if (eventType == ix::CobraConnection_EventType_Authenticated)
{ {
spdlog::info("Subscriber authenticated"); std::cout << "Subscriber authenticated" << std::endl;
conn.subscribe(channel, filter, conn.subscribe(channel,
[&jsonWriter, &quiet, [&jsonWriter](const Json::Value& msg)
&msgPerSeconds, &msgCount](const Json::Value& msg)
{ {
if (!quiet) std::cout << jsonWriter.write(msg) << std::endl;
{
std::cout << jsonWriter.write(msg) << std::endl;
}
msgPerSeconds++;
msgCount++;
}); });
} }
else if (eventType == ix::CobraConnection_EventType_Subscribed) else if (eventType == ix::CobraConnection_EventType_Subscribed)
{ {
spdlog::info("Subscriber: subscribed to channel {}", subscriptionId); std::cout << "Subscriber: subscribed to channel " << subscriptionId << std::endl;
} }
else if (eventType == ix::CobraConnection_EventType_UnSubscribed) else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{ {
spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId); std::cout << "Subscriber: unsubscribed from channel " << subscriptionId << std::endl;
} }
else if (eventType == ix::CobraConnection_EventType_Error) else if (eventType == ix::CobraConnection_EventType_Error)
{ {
spdlog::error("Subscriber: error {}", errMsg); std::cout << "Subscriber: error" << errMsg << std::endl;
} }
} }
); );

View File

@ -25,7 +25,6 @@ namespace ix
const std::string& rolename, const std::string& rolename,
const std::string& rolesecret, const std::string& rolesecret,
const std::string& channel, const std::string& channel,
const std::string& filter,
const std::string& dsn, const std::string& dsn,
bool verbose, bool verbose,
bool strict, bool strict,
@ -95,7 +94,7 @@ namespace ix
} }
conn.setEventCallback( conn.setEventCallback(
[&conn, &channel, &filter, &jsonWriter, [&conn, &channel, &jsonWriter,
verbose, &receivedCount, &sentCount, verbose, &receivedCount, &sentCount,
&condition, &conditionVariableMutex, &condition, &conditionVariableMutex,
&progressCondition, &queue] &progressCondition, &queue]
@ -120,7 +119,7 @@ namespace ix
else if (eventType == ix::CobraConnection_EventType_Authenticated) else if (eventType == ix::CobraConnection_EventType_Authenticated)
{ {
std::cerr << "Subscriber authenticated" << std::endl; std::cerr << "Subscriber authenticated" << std::endl;
conn.subscribe(channel, filter, conn.subscribe(channel,
[&jsonWriter, verbose, [&jsonWriter, verbose,
&sentCount, &receivedCount, &sentCount, &receivedCount,
&condition, &conditionVariableMutex, &condition, &conditionVariableMutex,

View File

@ -63,7 +63,6 @@ namespace ix
const std::string& rolename, const std::string& rolename,
const std::string& rolesecret, const std::string& rolesecret,
const std::string& channel, const std::string& channel,
const std::string& filter,
const std::string& host, const std::string& host,
int port, int port,
const std::string& prefix, const std::string& prefix,
@ -91,7 +90,7 @@ namespace ix
uint64_t msgCount = 0; uint64_t msgCount = 0;
conn.setEventCallback( conn.setEventCallback(
[&conn, &channel, &filter, &jsonWriter, &statsdClient, verbose, &tokens, &prefix, &msgCount] [&conn, &channel, &jsonWriter, &statsdClient, verbose, &tokens, &prefix, &msgCount]
(ix::CobraConnectionEventType eventType, (ix::CobraConnectionEventType eventType,
const std::string& errMsg, const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers, const ix::WebSocketHttpHeaders& headers,
@ -113,7 +112,7 @@ namespace ix
else if (eventType == ix::CobraConnection_EventType_Authenticated) else if (eventType == ix::CobraConnection_EventType_Authenticated)
{ {
spdlog::info("Subscriber authenticated"); spdlog::info("Subscriber authenticated");
conn.subscribe(channel, filter, conn.subscribe(channel,
[&jsonWriter, &statsdClient, [&jsonWriter, &statsdClient,
verbose, &tokens, &prefix, &msgCount] verbose, &tokens, &prefix, &msgCount]
(const Json::Value& msg) (const Json::Value& msg)

View File

@ -9,9 +9,6 @@
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <ixwebsocket/IXSocket.h> #include <ixwebsocket/IXSocket.h>
#include "linenoise.hpp"
namespace ix namespace ix
{ {
class WebSocketConnect class WebSocketConnect
@ -19,8 +16,7 @@ namespace ix
public: public:
WebSocketConnect(const std::string& _url, WebSocketConnect(const std::string& _url,
bool disableAutomaticReconnection, bool disableAutomaticReconnection,
bool disablePerMessageDeflate, bool disablePerMessageDeflate);
bool binaryMode);
void subscribe(const std::string& channel); void subscribe(const std::string& channel);
void start(); void start();
@ -32,18 +28,15 @@ namespace ix
std::string _url; std::string _url;
ix::WebSocket _webSocket; ix::WebSocket _webSocket;
bool _disablePerMessageDeflate; bool _disablePerMessageDeflate;
bool _binaryMode;
void log(const std::string& msg); void log(const std::string& msg);
}; };
WebSocketConnect::WebSocketConnect(const std::string& url, WebSocketConnect::WebSocketConnect(const std::string& url,
bool disableAutomaticReconnection, bool disableAutomaticReconnection,
bool disablePerMessageDeflate, bool disablePerMessageDeflate) :
bool binaryMode) :
_url(url), _url(url),
_disablePerMessageDeflate(disablePerMessageDeflate), _disablePerMessageDeflate(disablePerMessageDeflate)
_binaryMode(binaryMode)
{ {
if (disableAutomaticReconnection) if (disableAutomaticReconnection)
{ {
@ -140,57 +133,45 @@ namespace ix
void WebSocketConnect::sendMessage(const std::string& text) void WebSocketConnect::sendMessage(const std::string& text)
{ {
if (_binaryMode) _webSocket.sendText(text);
{
_webSocket.sendBinary(text);
}
else
{
_webSocket.sendText(text);
}
} }
int ws_connect_main(const std::string& url, int ws_connect_main(const std::string& url,
bool disableAutomaticReconnection, bool disableAutomaticReconnection,
bool disablePerMessageDeflate, bool disablePerMessageDeflate)
bool binaryMode)
{ {
std::cout << "Type Ctrl-D to exit prompt..." << std::endl; std::cout << "Type Ctrl-D to exit prompt..." << std::endl;
WebSocketConnect webSocketChat(url, WebSocketConnect webSocketChat(url,
disableAutomaticReconnection, disableAutomaticReconnection,
disablePerMessageDeflate, disablePerMessageDeflate);
binaryMode);
webSocketChat.start(); webSocketChat.start();
while (true) while (true)
{ {
// Read line std::string text;
std::string line; std::cout << "> " << std::flush;
auto quit = linenoise::Readline("> ", line); std::getline(std::cin, text);
if (quit) if (text == "/stop")
{
break;
}
if (line == "/stop")
{ {
std::cout << "Stopping connection..." << std::endl; std::cout << "Stopping connection..." << std::endl;
webSocketChat.stop(); webSocketChat.stop();
continue; continue;
} }
if (line == "/start") if (text == "/start")
{ {
std::cout << "Starting connection..." << std::endl; std::cout << "Starting connection..." << std::endl;
webSocketChat.start(); webSocketChat.start();
continue; continue;
} }
webSocketChat.sendMessage(line); if (!std::cin)
{
break;
}
// Add text to history webSocketChat.sendMessage(text);
linenoise::AddHistory(line.c_str());
} }
std::cout << std::endl; std::cout << std::endl;