set thread name / rename example

This commit is contained in:
Benjamin Sergeant 2018-12-23 14:14:38 -08:00
parent bd04b28b9e
commit a79f4c10a1
24 changed files with 103 additions and 97 deletions

View File

@ -14,6 +14,7 @@ set( IXWEBSOCKET_SOURCES
ixwebsocket/IXEventFd.cpp
ixwebsocket/IXSocket.cpp
ixwebsocket/IXSocketConnect.cpp
ixwebsocket/IXSetThreadName.cpp
ixwebsocket/IXDNSLookup.cpp
ixwebsocket/IXWebSocket.cpp
ixwebsocket/IXWebSocketTransport.cpp
@ -25,6 +26,7 @@ set( IXWEBSOCKET_HEADERS
ixwebsocket/IXEventFd.h
ixwebsocket/IXSocket.h
ixwebsocket/IXSocketConnect.h
ixwebsocket/IXSetThreadName.h
ixwebsocket/IXDNSLookup.h
ixwebsocket/IXWebSocket.h
ixwebsocket/IXWebSocketTransport.h

View File

@ -4,7 +4,7 @@
#
cmake_minimum_required (VERSION 3.4.1)
project (satori_publisher)
project (cobra_publisher)
# There's -Weverything too for clang
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic -Wshorten-64-to-32")
@ -17,18 +17,18 @@ option(USE_TLS "Add TLS support" ON)
add_subdirectory(${PROJECT_SOURCE_DIR}/../.. ixwebsocket)
include_directories(satori_publisher ${OPENSSL_PREFIX}/include)
include_directories(satori_publisher .)
include_directories(cobra_publisher ${OPENSSL_PREFIX}/include)
include_directories(cobra_publisher .)
add_executable(satori_publisher
add_executable(cobra_publisher
jsoncpp/jsoncpp.cpp
ixcrypto/IXHMac.cpp
ixcrypto/IXBase64.cpp
IXSatoriConnection.cpp
satori_publisher.cpp)
IXCobraConnection.cpp
cobra_publisher.cpp)
if (APPLE AND USE_TLS)
target_link_libraries(satori_publisher "-framework foundation" "-framework security")
target_link_libraries(cobra_publisher "-framework foundation" "-framework security")
endif()
get_filename_component(crypto_lib_path ${OPENSSL_PREFIX}/lib/libcrypto.a ABSOLUTE)
@ -36,5 +36,5 @@ add_library(lib_crypto STATIC IMPORTED)
set_target_properties(lib_crypto PROPERTIES IMPORTED_LOCATION ${crypto_lib_path})
link_directories(/usr/local/opt/openssl/lib)
target_link_libraries(satori_publisher ixwebsocket lib_crypto)
install(TARGETS satori_publisher DESTINATION bin)
target_link_libraries(cobra_publisher ixwebsocket lib_crypto)
install(TARGETS cobra_publisher DESTINATION bin)

View File

@ -1,10 +1,10 @@
/*
* IXSatoriConnection.cpp
* IXCobraConnection.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone. All rights reserved.
*/
#include "IXSatoriConnection.h"
#include "IXCobraConnection.h"
#include <ixcrypto/IXHMac.h>
#include <algorithm>
@ -16,35 +16,35 @@
namespace ix
{
TrafficTrackerCallback SatoriConnection::_trafficTrackerCallback = nullptr;
constexpr size_t SatoriConnection::kQueueMaxSize;
TrafficTrackerCallback CobraConnection::_trafficTrackerCallback = nullptr;
constexpr size_t CobraConnection::kQueueMaxSize;
SatoriConnection::SatoriConnection() :
CobraConnection::CobraConnection() :
_authenticated(false),
_eventCallback(nullptr),
_publishMode(SatoriConnection_PublishMode_Immediate)
_publishMode(CobraConnection_PublishMode_Immediate)
{
_pdu["action"] = "rtm/publish";
initWebSocketOnMessageCallback();
}
SatoriConnection::~SatoriConnection()
CobraConnection::~CobraConnection()
{
disconnect();
}
void SatoriConnection::setTrafficTrackerCallback(const TrafficTrackerCallback& callback)
void CobraConnection::setTrafficTrackerCallback(const TrafficTrackerCallback& callback)
{
_trafficTrackerCallback = callback;
}
void SatoriConnection::resetTrafficTrackerCallback()
void CobraConnection::resetTrafficTrackerCallback()
{
setTrafficTrackerCallback(nullptr);
}
void SatoriConnection::invokeTrafficTrackerCallback(size_t size, bool incoming)
void CobraConnection::invokeTrafficTrackerCallback(size_t size, bool incoming)
{
if (_trafficTrackerCallback)
{
@ -52,13 +52,13 @@ namespace ix
}
}
void SatoriConnection::setEventCallback(const EventCallback& eventCallback)
void CobraConnection::setEventCallback(const EventCallback& eventCallback)
{
std::lock_guard<std::mutex> lock(_eventCallbackMutex);
_eventCallback = eventCallback;
}
void SatoriConnection::invokeEventCallback(ix::SatoriConnectionEventType eventType,
void CobraConnection::invokeEventCallback(ix::CobraConnectionEventType eventType,
const std::string& errorMsg,
const WebSocketHttpHeaders& headers)
{
@ -69,18 +69,18 @@ namespace ix
}
}
void SatoriConnection::invokeErrorCallback(const std::string& errorMsg)
void CobraConnection::invokeErrorCallback(const std::string& errorMsg)
{
invokeEventCallback(ix::SatoriConnection_EventType_Error, errorMsg);
invokeEventCallback(ix::CobraConnection_EventType_Error, errorMsg);
}
void SatoriConnection::disconnect()
void CobraConnection::disconnect()
{
_authenticated = false;
_webSocket.stop();
}
void SatoriConnection::initWebSocketOnMessageCallback()
void CobraConnection::initWebSocketOnMessageCallback()
{
_webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType,
@ -90,12 +90,12 @@ namespace ix
const ix::WebSocketCloseInfo& closeInfo,
const ix::WebSocketHttpHeaders& headers)
{
SatoriConnection::invokeTrafficTrackerCallback(wireSize, true);
CobraConnection::invokeTrafficTrackerCallback(wireSize, true);
std::stringstream ss;
if (messageType == ix::WebSocket_MessageType_Open)
{
invokeEventCallback(ix::SatoriConnection_EventType_Open,
invokeEventCallback(ix::CobraConnection_EventType_Open,
std::string(),
headers);
sendHandshakeMessage();
@ -107,7 +107,7 @@ namespace ix
std::stringstream ss;
ss << "Close code " << closeInfo.code;
ss << " reason " << closeInfo.reason;
invokeEventCallback(ix::SatoriConnection_EventType_Closed,
invokeEventCallback(ix::CobraConnection_EventType_Closed,
ss.str());
}
else if (messageType == ix::WebSocket_MessageType_Message)
@ -142,7 +142,7 @@ namespace ix
else if (action == "auth/authenticate/ok")
{
_authenticated = true;
invokeEventCallback(ix::SatoriConnection_EventType_Authenticated);
invokeEventCallback(ix::CobraConnection_EventType_Authenticated);
flushQueue();
}
else if (action == "auth/authenticate/error")
@ -170,12 +170,12 @@ namespace ix
});
}
void SatoriConnection::setPublishMode(SatoriConnectionPublishMode publishMode)
void CobraConnection::setPublishMode(CobraConnectionPublishMode publishMode)
{
_publishMode = publishMode;
}
void SatoriConnection::configure(const std::string& appkey,
void CobraConnection::configure(const std::string& appkey,
const std::string& endpoint,
const std::string& rolename,
const std::string& rolesecret,
@ -210,7 +210,7 @@ namespace ix
// }
//
//
bool SatoriConnection::sendHandshakeMessage()
bool CobraConnection::sendHandshakeMessage()
{
Json::Value data;
data["role"] = _role_name;
@ -224,7 +224,7 @@ namespace ix
pdu["body"] = body;
std::string serializedJson = serializeJson(pdu);
SatoriConnection::invokeTrafficTrackerCallback(serializedJson.size(), false);
CobraConnection::invokeTrafficTrackerCallback(serializedJson.size(), false);
return _webSocket.send(serializedJson).success;
}
@ -243,7 +243,7 @@ namespace ix
// }
// }
//
bool SatoriConnection::handleHandshakeResponse(const Json::Value& pdu)
bool CobraConnection::handleHandshakeResponse(const Json::Value& pdu)
{
if (!pdu.isMember("body")) return false;
Json::Value body = pdu["body"];
@ -272,7 +272,7 @@ namespace ix
// },
// }
//
bool SatoriConnection::sendAuthMessage(const std::string& nonce)
bool CobraConnection::sendAuthMessage(const std::string& nonce)
{
Json::Value credentials;
credentials["hash"] = hmac(nonce, _role_secret);
@ -286,13 +286,13 @@ namespace ix
pdu["body"] = body;
std::string serializedJson = serializeJson(pdu);
SatoriConnection::invokeTrafficTrackerCallback(serializedJson.size(), false);
CobraConnection::invokeTrafficTrackerCallback(serializedJson.size(), false);
return _webSocket.send(serializedJson).success;
}
bool SatoriConnection::handleSubscriptionData(const Json::Value& pdu)
bool CobraConnection::handleSubscriptionData(const Json::Value& pdu)
{
if (!pdu.isMember("body")) return false;
Json::Value body = pdu["body"];
@ -318,18 +318,18 @@ namespace ix
return true;
}
bool SatoriConnection::connect()
bool CobraConnection::connect()
{
_webSocket.start();
return true;
}
bool SatoriConnection::isConnected() const
bool CobraConnection::isConnected() const
{
return _webSocket.getReadyState() == ix::WebSocket_ReadyState_Open;
}
std::string SatoriConnection::serializeJson(const Json::Value& value)
std::string CobraConnection::serializeJson(const Json::Value& value)
{
std::lock_guard<std::mutex> lock(_jsonWriterMutex);
return _jsonWriter.write(value);
@ -338,7 +338,7 @@ namespace ix
//
// publish is not thread safe as we are trying to reuse some Json objects.
//
bool SatoriConnection::publish(const Json::Value& channels,
bool CobraConnection::publish(const Json::Value& channels,
const Json::Value& msg)
{
_body["channels"] = channels;
@ -347,7 +347,7 @@ namespace ix
std::string serializedJson = serializeJson(_pdu);
if (_publishMode == SatoriConnection_PublishMode_Batch)
if (_publishMode == CobraConnection_PublishMode_Batch)
{
enqueue(serializedJson);
return true;
@ -370,7 +370,7 @@ namespace ix
}
}
void SatoriConnection::subscribe(const std::string& channel,
void CobraConnection::subscribe(const std::string& channel,
SubscriptionCallback cb)
{
// Create and send a subscribe pdu
@ -388,7 +388,7 @@ namespace ix
_cbs[channel] = cb;
}
void SatoriConnection::unsubscribe(const std::string& channel)
void CobraConnection::unsubscribe(const std::string& channel)
{
{
std::lock_guard<std::mutex> lock(_cbsMutex);
@ -420,11 +420,11 @@ namespace ix
// enqueue(D) -> [D, C, B] -- now we drop A, the oldest message,
// -- and keep the 'fresh ones'
//
void SatoriConnection::enqueue(const std::string& msg)
void CobraConnection::enqueue(const std::string& msg)
{
std::lock_guard<std::mutex> lock(_queueMutex);
if (_messageQueue.size() == SatoriConnection::kQueueMaxSize)
if (_messageQueue.size() == CobraConnection::kQueueMaxSize)
{
_messageQueue.pop_back();
}
@ -436,7 +436,7 @@ namespace ix
// when sending them. If we fail to send something, we put it back in the queue
// at the end we picked it up originally (at the end).
//
bool SatoriConnection::flushQueue()
bool CobraConnection::flushQueue()
{
std::lock_guard<std::mutex> lock(_queueMutex);
@ -454,20 +454,20 @@ namespace ix
return true;
}
bool SatoriConnection::publishMessage(const std::string& serializedJson)
bool CobraConnection::publishMessage(const std::string& serializedJson)
{
auto webSocketSendInfo = _webSocket.send(serializedJson);
SatoriConnection::invokeTrafficTrackerCallback(webSocketSendInfo.wireSize,
CobraConnection::invokeTrafficTrackerCallback(webSocketSendInfo.wireSize,
false);
return webSocketSendInfo.success;
}
void SatoriConnection::suspend()
void CobraConnection::suspend()
{
disconnect();
}
void SatoriConnection::resume()
void CobraConnection::resume()
{
connect();
}

View File

@ -1,5 +1,5 @@
/*
* IXSatoriConnection.h
* IXCobraConnection.h
* Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone. All rights reserved.
*/
@ -18,31 +18,31 @@
namespace ix
{
enum SatoriConnectionEventType
enum CobraConnectionEventType
{
SatoriConnection_EventType_Authenticated = 0,
SatoriConnection_EventType_Error = 1,
SatoriConnection_EventType_Open = 2,
SatoriConnection_EventType_Closed = 3
CobraConnection_EventType_Authenticated = 0,
CobraConnection_EventType_Error = 1,
CobraConnection_EventType_Open = 2,
CobraConnection_EventType_Closed = 3
};
enum SatoriConnectionPublishMode
enum CobraConnectionPublishMode
{
SatoriConnection_PublishMode_Immediate = 0,
SatoriConnection_PublishMode_Batch = 1
CobraConnection_PublishMode_Immediate = 0,
CobraConnection_PublishMode_Batch = 1
};
using SubscriptionCallback = std::function<void(const Json::Value&)>;
using EventCallback = std::function<void(SatoriConnectionEventType,
using EventCallback = std::function<void(CobraConnectionEventType,
const std::string&,
const WebSocketHttpHeaders&)>;
using TrafficTrackerCallback = std::function<void(size_t size, bool incoming)>;
class SatoriConnection
class CobraConnection
{
public:
SatoriConnection();
~SatoriConnection();
CobraConnection();
~CobraConnection();
/// Configuration / set keys, etc...
/// All input data but the channel name is encrypted with rc4
@ -79,7 +79,7 @@ namespace ix
/// Close the connection
void disconnect();
/// Connect to Satori and authenticate the connection
/// Connect to Cobra and authenticate the connection
bool connect();
/// Returns true only if we're connected
@ -89,7 +89,7 @@ namespace ix
bool flushQueue();
/// Set the publish mode
void setPublishMode(SatoriConnectionPublishMode publishMode);
void setPublishMode(CobraConnectionPublishMode publishMode);
/// Lifecycle management. Free resources when backgrounding
void suspend();
@ -111,7 +111,7 @@ namespace ix
static void invokeTrafficTrackerCallback(size_t size, bool incoming);
/// Invoke event callbacks
void invokeEventCallback(SatoriConnectionEventType eventType,
void invokeEventCallback(CobraConnectionEventType eventType,
const std::string& errorMsg = std::string(),
const WebSocketHttpHeaders& headers = WebSocketHttpHeaders());
void invokeErrorCallback(const std::string& errorMsg);
@ -126,7 +126,7 @@ namespace ix
std::string _endpoint;
std::string _role_name;
std::string _role_secret;
std::atomic<SatoriConnectionPublishMode> _publishMode;
std::atomic<CobraConnectionPublishMode> _publishMode;
// Can be set on control+background thread, protecting with an atomic
std::atomic<bool> _authenticated;
@ -140,7 +140,7 @@ namespace ix
/// Traffic tracker callback
static TrafficTrackerCallback _trafficTrackerCallback;
/// Satori events callbacks
/// Cobra events callbacks
EventCallback _eventCallback;
mutable std::mutex _eventCallbackMutex;

View File

@ -0,0 +1,6 @@
```
mkdir build
cd build
cmake ..
make && (cd .. ; sh cobra_publisher.sh)
```

View File

@ -1,5 +1,5 @@
/*
* satori_publisher.cpp
* cobra_publisher.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
*/
@ -9,7 +9,7 @@
#include <fstream>
#include <atomic>
#include <ixwebsocket/IXWebSocket.h>
#include "IXSatoriConnection.h"
#include "IXCobraConnection.h"
#include "jsoncpp/json/json.h"
void msleep(int ms)
@ -34,7 +34,7 @@ int main(int argc, char* argv[])
std::atomic<size_t> incomingBytes(0);
std::atomic<size_t> outgoingBytes(0);
ix::SatoriConnection::setTrafficTrackerCallback(
ix::CobraConnection::setTrafficTrackerCallback(
[&incomingBytes, &outgoingBytes](size_t size, bool incoming)
{
if (incoming)
@ -49,19 +49,19 @@ int main(int argc, char* argv[])
);
bool done = false;
ix::SatoriConnection satoriConnection;
ix::CobraConnection cobraConnection;
ix::WebSocketPerMessageDeflateOptions webSocketPerMessageDeflateOptions(
true, false, false, 15, 15);
satoriConnection.configure(appkey, endpoint, rolename, rolesecret,
cobraConnection.configure(appkey, endpoint, rolename, rolesecret,
webSocketPerMessageDeflateOptions);
satoriConnection.connect();
satoriConnection.setEventCallback(
[&satoriConnection, channel, path, &done]
(ix::SatoriConnectionEventType eventType,
cobraConnection.connect();
cobraConnection.setEventCallback(
[&cobraConnection, channel, path, &done]
(ix::CobraConnectionEventType eventType,
const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers)
{
if (eventType == ix::SatoriConnection_EventType_Open)
if (eventType == ix::CobraConnection_EventType_Open)
{
std::cout << "Handshake Headers:" << std::endl;
for (auto it : headers)
@ -69,7 +69,7 @@ int main(int argc, char* argv[])
std::cout << it.first << ": " << it.second << std::endl;
}
}
else if (eventType == ix::SatoriConnection_EventType_Authenticated)
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{
std::cout << "Authenticated" << std::endl;
@ -87,7 +87,7 @@ int main(int argc, char* argv[])
Json::Reader reader;
reader.parse(line, value);
satoriConnection.publish(channel, value);
cobraConnection.publish(channel, value);
n++;
}
std::cerr << "#published messages: " << n << std::endl;
@ -99,14 +99,14 @@ int main(int argc, char* argv[])
done = true;
}
else if (eventType == ix::SatoriConnection_EventType_Error)
else if (eventType == ix::CobraConnection_EventType_Error)
{
std::cerr << "Satori Error received: " << errMsg << std::endl;
std::cerr << "Cobra Error received: " << errMsg << std::endl;
done = true;
}
else if (eventType == ix::SatoriConnection_EventType_Closed)
else if (eventType == ix::CobraConnection_EventType_Closed)
{
std::cerr << "Satori connection closed" << std::endl;
std::cerr << "Cobra connection closed" << std::endl;
}
}
);

View File

@ -8,4 +8,4 @@ rolename="a_role"
rolesecret="a_secret"
filename=${FILENAME:=events.jsonl}
build/satori_publisher $endpoint $appkey $channel $rolename $rolesecret $filename
build/cobra_publisher $endpoint $appkey $channel $rolename $rolesecret $filename

View File

@ -1,6 +0,0 @@
```
mkdir build
cd build
cmake ..
make && (cd .. ; sh satori_publisher.sh)
```

View File

@ -11,6 +11,7 @@ g++ --std=c++11 \
-g \
../../ixwebsocket/IXEventFd.cpp \
../../ixwebsocket/IXSocket.cpp \
../../ixwebsocket/IXSetThreadName.cpp \
../../ixwebsocket/IXWebSocketTransport.cpp \
../../ixwebsocket/IXWebSocket.cpp \
../../ixwebsocket/IXDNSLookup.cpp \

View File

@ -22,7 +22,7 @@ namespace ix
std::set<uint64_t> DNSLookup::_activeJobs;
std::mutex DNSLookup::_activeJobsMutex;
DNSLookup::DNSLookup(const std::string& hostname, int port, int wait) :
DNSLookup::DNSLookup(const std::string& hostname, int port, int64_t wait) :
_hostname(hostname),
_port(port),
_res(nullptr),

View File

@ -26,7 +26,7 @@ namespace ix
public:
DNSLookup(const std::string& hostname,
int port,
int wait = DNSLookup::kDefaultWait);
int64_t wait = DNSLookup::kDefaultWait);
~DNSLookup();
struct addrinfo* resolve(std::string& errMsg,
@ -47,7 +47,7 @@ namespace ix
std::string _hostname;
int _port;
int _wait;
int64_t _wait;
std::string _errMsg;
struct addrinfo* _res;

View File

@ -5,6 +5,7 @@
*/
#include "IXWebSocket.h"
#include "IXSetThreadName.h"
#include <iostream>
#include <cmath>
@ -25,8 +26,8 @@ namespace
}
}
namespace ix {
namespace ix
{
OnTrafficTrackerCallback WebSocket::_onTrafficTrackerCallback = nullptr;
WebSocket::WebSocket() :
@ -171,6 +172,8 @@ namespace ix {
void WebSocket::run()
{
setThreadName(_url);
while (true)
{
if (_stop) return;