indent files

This commit is contained in:
Benjamin Sergeant 2020-03-20 17:00:18 -07:00
parent 1d6373335c
commit 5ce846f48b
12 changed files with 121 additions and 118 deletions

View File

@ -37,9 +37,8 @@ namespace ix
static OSStatus writeToSocket(SSLConnectionRef connection, const void* data, size_t* len); static OSStatus writeToSocket(SSLConnectionRef connection, const void* data, size_t* len);
static OSStatus readFromSocket(SSLConnectionRef connection, void* data, size_t* len); static OSStatus readFromSocket(SSLConnectionRef connection, void* data, size_t* len);
OSStatus tlsHandShake( OSStatus tlsHandShake(std::string& errMsg,
std::string& errMsg, const CancellationRequest& isCancellationRequested);
const CancellationRequest& isCancellationRequested);
SSLContextRef _sslContext; SSLContextRef _sslContext;
mutable std::mutex _mutex; // AppleSSL routines are not thread-safe mutable std::mutex _mutex; // AppleSSL routines are not thread-safe

View File

@ -224,10 +224,9 @@ namespace ix
return true; return true;
} }
bool SocketOpenSSL::openSSLClientHandshake( bool SocketOpenSSL::openSSLClientHandshake(const std::string& host,
const std::string& host, std::string& errMsg,
std::string& errMsg, const CancellationRequest& isCancellationRequested)
const CancellationRequest& isCancellationRequested)
{ {
while (true) while (true)
{ {

View File

@ -160,10 +160,8 @@ namespace ix
{ {
{ {
std::lock_guard<std::mutex> lock(_configMutex); std::lock_guard<std::mutex> lock(_configMutex);
_ws.configure(_perMessageDeflateOptions, _ws.configure(
_socketTLSOptions, _perMessageDeflateOptions, _socketTLSOptions, _enablePong, _pingIntervalSecs);
_enablePong,
_pingIntervalSecs);
} }
WebSocketHttpHeaders headers(_extraHeaders); WebSocketHttpHeaders headers(_extraHeaders);
@ -216,10 +214,8 @@ namespace ix
{ {
{ {
std::lock_guard<std::mutex> lock(_configMutex); std::lock_guard<std::mutex> lock(_configMutex);
_ws.configure(_perMessageDeflateOptions, _ws.configure(
_socketTLSOptions, _perMessageDeflateOptions, _socketTLSOptions, _enablePong, _pingIntervalSecs);
_enablePong,
_pingIntervalSecs);
} }
WebSocketInitResult status = _ws.connectToSocket(socket, timeoutSecs); WebSocketInitResult status = _ws.connectToSocket(socket, timeoutSecs);

View File

@ -251,16 +251,15 @@ namespace ix
// No timeout if state is not OPEN, otherwise computed // No timeout if state is not OPEN, otherwise computed
// pingIntervalOrTimeoutGCD (equals to -1 if no ping and no ping timeout are set) // pingIntervalOrTimeoutGCD (equals to -1 if no ping and no ping timeout are set)
int lastingTimeoutDelayInMs = int lastingTimeoutDelayInMs = (_readyState != ReadyState::OPEN) ? 0 : _pingIntervalSecs;
(_readyState != ReadyState::OPEN) ? 0 : _pingIntervalSecs;
if (_pingIntervalSecs > 0) if (_pingIntervalSecs > 0)
{ {
// compute lasting delay to wait for next ping / timeout, if at least one set // compute lasting delay to wait for next ping / timeout, if at least one set
auto now = std::chrono::steady_clock::now(); auto now = std::chrono::steady_clock::now();
lastingTimeoutDelayInMs = lastingTimeoutDelayInMs = (int) std::chrono::duration_cast<std::chrono::milliseconds>(
(int) std::chrono::duration_cast<std::chrono::milliseconds>(now - _lastSendPingTimePoint) now - _lastSendPingTimePoint)
.count(); .count();
} }
#ifdef _WIN32 #ifdef _WIN32

View File

@ -126,32 +126,36 @@ namespace
std::string filter; std::string filter;
std::string position("$"); std::string position("$");
_conn.subscribe( _conn.subscribe(channel,
channel, filter, position, [this](const Json::Value& msg, const std::string& /*position*/) { filter,
spdlog::info("receive {}", msg.toStyledString()); position,
[this](const Json::Value& msg, const std::string& /*position*/) {
spdlog::info("receive {}", msg.toStyledString());
if (!msg.isObject()) return; if (!msg.isObject()) return;
if (!msg.isMember("user")) return; if (!msg.isMember("user")) return;
if (!msg.isMember("text")) return; if (!msg.isMember("text")) return;
if (!msg.isMember("session")) return; if (!msg.isMember("session")) return;
std::string msg_user = msg["user"].asString(); std::string msg_user = msg["user"].asString();
std::string msg_text = msg["text"].asString(); std::string msg_text = msg["text"].asString();
std::string msg_session = msg["session"].asString(); std::string msg_session = msg["session"].asString();
// We are not interested in messages // We are not interested in messages
// from a different session. // from a different session.
if (msg_session != _session) return; if (msg_session != _session) return;
// We are not interested in our own messages // We are not interested in our own messages
if (msg_user == _user) return; if (msg_user == _user) return;
_receivedQueue.push(msg); _receivedQueue.push(msg);
std::stringstream ss; std::stringstream ss;
ss << std::endl << msg_user << " > " << msg_text << std::endl << _user << " > "; ss << std::endl
log(ss.str()); << msg_user << " > " << msg_text << std::endl
}); << _user << " > ";
log(ss.str());
});
} }
void CobraChat::sendMessage(const std::string& text) void CobraChat::sendMessage(const std::string& text)

View File

@ -77,18 +77,20 @@ namespace
std::string filter; std::string filter;
std::string position("$"); std::string position("$");
conn.subscribe( conn.subscribe(channel,
channel, filter, position, [](const Json::Value& msg, const std::string& /*position*/) { filter,
log(msg.toStyledString()); position,
[](const Json::Value& msg, const std::string& /*position*/) {
log(msg.toStyledString());
std::string id = msg["id"].asString(); std::string id = msg["id"].asString();
{ {
std::lock_guard<std::mutex> guard(gProtectIds); std::lock_guard<std::mutex> guard(gProtectIds);
gIds.insert(id); gIds.insert(id);
} }
gMessageCount++; gMessageCount++;
}); });
} }
else if (eventType == ix::CobraConnection_EventType_Subscribed) else if (eventType == ix::CobraConnection_EventType_Subscribed)
{ {

View File

@ -150,10 +150,7 @@ TEST_CASE("Cobra_to_sentry_bot", "[cobra_bots]")
// -> https://github.com/openssl/openssl/issues/7967 // -> https://github.com/openssl/openssl/issues/7967
// https://xxxxx:yyyyyy@sentry.io/1234567 // https://xxxxx:yyyyyy@sentry.io/1234567
std::stringstream oss; std::stringstream oss;
oss << getHttpScheme() oss << getHttpScheme() << "xxxxxxx:yyyyyyy@localhost:" << sentryPort << "/1234567";
<< "xxxxxxx:yyyyyyy@localhost:"
<< sentryPort
<< "/1234567";
std::string dsn = oss.str(); std::string dsn = oss.str();
SocketTLSOptions tlsOptionsClient = makeClientTLSOptions(); SocketTLSOptions tlsOptionsClient = makeClientTLSOptions();

View File

@ -148,7 +148,7 @@ namespace ix
auto vec = load(path); auto vec = load(path);
return std::string(vec.begin(), vec.end()); return std::string(vec.begin(), vec.end());
} }
SocketTLSOptions makeClientTLSOptions() SocketTLSOptions makeClientTLSOptions()
{ {
SocketTLSOptions tlsOptionsClient; SocketTLSOptions tlsOptionsClient;
@ -237,9 +237,7 @@ namespace ix
std::string makeCobraEndpoint(int port, bool preferTLS) std::string makeCobraEndpoint(int port, bool preferTLS)
{ {
std::stringstream ss; std::stringstream ss;
ss << getWsScheme(preferTLS) ss << getWsScheme(preferTLS) << "localhost:" << port;
<< "localhost:"
<< port;
std::string endpoint = ss.str(); std::string endpoint = ss.str();
return endpoint; return endpoint;

View File

@ -9,8 +9,8 @@
#include "IXGetFreePort.h" #include "IXGetFreePort.h"
#include <iostream> #include <iostream>
#include <ixsnake/IXAppConfig.h> #include <ixsnake/IXAppConfig.h>
#include <ixwebsocket/IXWebSocketServer.h>
#include <ixwebsocket/IXSocketTLSOptions.h> #include <ixwebsocket/IXSocketTLSOptions.h>
#include <ixwebsocket/IXWebSocketServer.h>
#include <mutex> #include <mutex>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <sstream> #include <sstream>

View File

@ -171,9 +171,7 @@ int main(int argc, char** argv)
connectApp->add_option("--max_wait", connectApp->add_option("--max_wait",
maxWaitBetweenReconnectionRetries, maxWaitBetweenReconnectionRetries,
"Max Wait Time between reconnection retries"); "Max Wait Time between reconnection retries");
connectApp->add_option("--ping_interval", connectApp->add_option("--ping_interval", pingIntervalSecs, "Interval between sending pings");
pingIntervalSecs,
"Interval between sending pings");
connectApp->add_option("--subprotocol", subprotocol, "Subprotocol"); connectApp->add_option("--subprotocol", subprotocol, "Subprotocol");
addTLSOptions(connectApp); addTLSOptions(connectApp);
@ -475,7 +473,8 @@ int main(int argc, char** argv)
} }
else if (app.got_subcommand("cobra_metrics_to_redis")) else if (app.got_subcommand("cobra_metrics_to_redis"))
{ {
ret = ix::ws_cobra_metrics_to_redis(cobraConfig, channel, filter, position, hostname, redisPort); ret = ix::ws_cobra_metrics_to_redis(
cobraConfig, channel, filter, position, hostname, redisPort);
} }
else if (app.got_subcommand("snake")) else if (app.got_subcommand("snake"))
{ {

View File

@ -30,8 +30,7 @@ namespace ix
CobraMetricsPublisher cobraMetricsPublisher; CobraMetricsPublisher cobraMetricsPublisher;
cobraMetricsPublisher.enable(true); cobraMetricsPublisher.enable(true);
cobraMetricsPublisher.configure(config, cobraMetricsPublisher.configure(config, channel);
channel);
while (!cobraMetricsPublisher.isAuthenticated()) while (!cobraMetricsPublisher.isAuthenticated())
; ;

View File

@ -70,63 +70,74 @@ namespace ix
std::string subscriptionPosition(position); std::string subscriptionPosition(position);
conn.setEventCallback( conn.setEventCallback([&conn,
[&conn, &channel, &jsonWriter, &filter, &subscriptionPosition, &msgCount, &msgPerSeconds, &quiet, &fluentd]( &channel,
ix::CobraConnectionEventType eventType, &jsonWriter,
const std::string& errMsg, &filter,
const ix::WebSocketHttpHeaders& headers, &subscriptionPosition,
const std::string& subscriptionId, &msgCount,
CobraConnection::MsgId msgId) { &msgPerSeconds,
if (eventType == ix::CobraConnection_EventType_Open) &quiet,
{ &fluentd](ix::CobraConnectionEventType eventType,
spdlog::info("Subscriber connected"); const std::string& errMsg,
const ix::WebSocketHttpHeaders& headers,
const std::string& subscriptionId,
CobraConnection::MsgId msgId) {
if (eventType == ix::CobraConnection_EventType_Open)
{
spdlog::info("Subscriber connected");
for (auto it : headers) for (auto it : headers)
{
spdlog::info("{}: {}", it.first, it.second);
}
}
else if (eventType == ix::CobraConnection_EventType_Authenticated)
{ {
spdlog::info("Subscriber authenticated"); spdlog::info("{}: {}", it.first, it.second);
spdlog::info("Subscribing to {} at position {}", channel, subscriptionPosition); }
conn.subscribe(channel, }
filter, else if (eventType == ix::CobraConnection_EventType_Authenticated)
subscriptionPosition, {
[&jsonWriter, &quiet, &msgPerSeconds, &msgCount, &fluentd, &subscriptionPosition]( spdlog::info("Subscriber authenticated");
const Json::Value& msg, const std::string& position) { spdlog::info("Subscribing to {} at position {}", channel, subscriptionPosition);
if (!quiet) conn.subscribe(
{ channel,
writeToStdout(fluentd, jsonWriter, msg, position); filter,
} subscriptionPosition,
[&jsonWriter,
&quiet,
&msgPerSeconds,
&msgCount,
&fluentd,
&subscriptionPosition](const Json::Value& msg, const std::string& position) {
if (!quiet)
{
writeToStdout(fluentd, jsonWriter, msg, position);
}
msgPerSeconds++; msgPerSeconds++;
msgCount++; msgCount++;
subscriptionPosition = position; subscriptionPosition = position;
}); });
} }
else if (eventType == ix::CobraConnection_EventType_Subscribed) else if (eventType == ix::CobraConnection_EventType_Subscribed)
{ {
spdlog::info("Subscriber: subscribed to channel {}", subscriptionId); spdlog::info("Subscriber: subscribed to channel {}", subscriptionId);
} }
else if (eventType == ix::CobraConnection_EventType_UnSubscribed) else if (eventType == ix::CobraConnection_EventType_UnSubscribed)
{ {
spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId); spdlog::info("Subscriber: unsubscribed from channel {}", subscriptionId);
} }
else if (eventType == ix::CobraConnection_EventType_Error) else if (eventType == ix::CobraConnection_EventType_Error)
{ {
spdlog::error("Subscriber: error {}", errMsg); spdlog::error("Subscriber: error {}", errMsg);
} }
else if (eventType == ix::CobraConnection_EventType_Published) else if (eventType == ix::CobraConnection_EventType_Published)
{ {
spdlog::error("Published message hacked: {}", msgId); spdlog::error("Published message hacked: {}", msgId);
} }
else if (eventType == ix::CobraConnection_EventType_Pong) else if (eventType == ix::CobraConnection_EventType_Pong)
{ {
spdlog::info("Received websocket pong"); spdlog::info("Received websocket pong");
} }
}); });
while (true) while (true)
{ {