(doc) Add more doc to SocketServer

This commit is contained in:
Benjamin Sergeant 2019-04-17 20:31:34 -07:00
parent d486c72e02
commit 52f460f66d
9 changed files with 34 additions and 20 deletions

View File

@ -155,7 +155,13 @@ namespace ix
_connectionStateFactory = connectionStateFactory; _connectionStateFactory = connectionStateFactory;
} }
//
// join the threads for connections that have been closed // join the threads for connections that have been closed
//
// When a connection is closed by a client, the connection state terminated
// field becomes true, and we can use that to know that we can join that thread
// and remove it from our _connectionsThreads data structure (a list).
//
void SocketServer::closeTerminatedThreads() void SocketServer::closeTerminatedThreads()
{ {
auto it = _connectionsThreads.begin(); auto it = _connectionsThreads.begin();
@ -188,7 +194,7 @@ namespace ix
if (_stop) return; if (_stop) return;
// Garbage collection to shutdown/join threads for closed connections. // Garbage collection to shutdown/join threads for closed connections.
// We could run this in its own thread, so that we dont need to accept // We could run this in its own thread, so that we dont need to accept
// a new connection to close a thread. // a new connection to close a thread.
// We could also use a condition variable to be notify when we need to do this // We could also use a condition variable to be notify when we need to do this
closeTerminatedThreads(); closeTerminatedThreads();

View File

@ -25,6 +25,7 @@ namespace ix
public: public:
using ConnectionStateFactory = std::function<std::shared_ptr<ConnectionState>()>; using ConnectionStateFactory = std::function<std::shared_ptr<ConnectionState>()>;
// Each connection is handled by its own worker thread.
// We use a list as we only care about remove and append operations. // We use a list as we only care about remove and append operations.
using ConnectionThreads = std::list<std::pair<std::shared_ptr<ConnectionState>, using ConnectionThreads = std::list<std::pair<std::shared_ptr<ConnectionState>,
std::thread>>; std::thread>>;
@ -36,6 +37,9 @@ namespace ix
virtual ~SocketServer(); virtual ~SocketServer();
virtual void stop(); virtual void stop();
// It is possible to override ConnectionState through inheritance
// this method allows user to change the factory by returning an object
// that inherits from ConnectionState but has its own methods.
void setConnectionStateFactory(const ConnectionStateFactory& connectionStateFactory); void setConnectionStateFactory(const ConnectionStateFactory& connectionStateFactory);
const static int kDefaultPort; const static int kDefaultPort;
@ -65,15 +69,19 @@ namespace ix
std::mutex _logMutex; std::mutex _logMutex;
// background thread to wait for incoming connections
std::atomic<bool> _stop; std::atomic<bool> _stop;
std::thread _thread; std::thread _thread;
// the list of (connectionState, threads) for each connections
ConnectionThreads _connectionsThreads; ConnectionThreads _connectionsThreads;
// used to have the main control thread for a server
// wait for a 'terminate' notification without busy polling
std::condition_variable _conditionVariable; std::condition_variable _conditionVariable;
std::mutex _conditionVariableMutex; std::mutex _conditionVariableMutex;
// // the factory to create ConnectionState objects
ConnectionStateFactory _connectionStateFactory; ConnectionStateFactory _connectionStateFactory;
// Methods // Methods

View File

@ -798,7 +798,7 @@ namespace ix
_closeReason = reason; _closeReason = reason;
_closeWireSize = closeWireSize; _closeWireSize = closeWireSize;
} }
setReadyState(CLOSED); setReadyState(CLOSED);
} }

View File

@ -258,7 +258,7 @@ namespace ix
return _webSocket->send(serializedJson).success; return _webSocket->send(serializedJson).success;
} }
// //
// Extract the nonce from the handshake response // Extract the nonce from the handshake response
// use it to compute a hash during authentication // use it to compute a hash during authentication
// //
@ -357,7 +357,7 @@ namespace ix
if (!pdu.isMember("body")) return false; if (!pdu.isMember("body")) return false;
Json::Value body = pdu["body"]; Json::Value body = pdu["body"];
// Identify subscription_id, so that we can find // Identify subscription_id, so that we can find
// which callback to execute // which callback to execute
if (!body.isMember("subscription_id")) return false; if (!body.isMember("subscription_id")) return false;
Json::Value subscriptionId = body["subscription_id"]; Json::Value subscriptionId = body["subscription_id"];
@ -531,5 +531,5 @@ namespace ix
{ {
connect(); connect();
} }
} // namespace ix } // namespace ix

View File

@ -90,7 +90,7 @@ namespace ix
/// Returns true only if we're connected /// Returns true only if we're connected
bool isConnected() const; bool isConnected() const;
/// Flush the publish queue /// Flush the publish queue
bool flushQueue(); bool flushQueue();
@ -128,7 +128,7 @@ namespace ix
/// ///
/// Member variables /// Member variables
/// ///
std::unique_ptr<WebSocket> _webSocket; std::unique_ptr<WebSocket> _webSocket;
/// Configuration data /// Configuration data
@ -158,10 +158,10 @@ namespace ix
std::unordered_map<std::string, SubscriptionCallback> _cbs; std::unordered_map<std::string, SubscriptionCallback> _cbs;
mutable std::mutex _cbsMutex; mutable std::mutex _cbsMutex;
// Message Queue can be touched on control+background thread, // Message Queue can be touched on control+background thread,
// protecting with a mutex. // protecting with a mutex.
// //
// Message queue is used when there are problems sending messages so // Message queue is used when there are problems sending messages so
// that sending can be retried later. // that sending can be retried later.
std::deque<std::string> _messageQueue; std::deque<std::string> _messageQueue;
mutable std::mutex _queueMutex; mutable std::mutex _queueMutex;
@ -169,5 +169,5 @@ namespace ix
// Cap the queue size (100 elems so far -> ~100k) // Cap the queue size (100 elems so far -> ~100k)
static constexpr size_t kQueueMaxSize = 256; static constexpr size_t kQueueMaxSize = 256;
}; };
} // namespace ix } // namespace ix

View File

@ -110,10 +110,10 @@ namespace ix
Json::Value exception; Json::Value exception;
exception["value"] = msg["data"]["message"]; exception["value"] = msg["data"]["message"];
std::string stackTraceFieldName = std::string stackTraceFieldName =
(msg["id"].asString() == "game_noisytypes_id") ? "traceback" : "stack"; (msg["id"].asString() == "game_noisytypes_id") ? "traceback" : "stack";
exception["stacktrace"]["frames"] = exception["stacktrace"]["frames"] =
parseLuaStackTrace(msg["data"][stackTraceFieldName].asString()); parseLuaStackTrace(msg["data"][stackTraceFieldName].asString());
payload["exception"].append(exception); payload["exception"].append(exception);

View File

@ -43,5 +43,5 @@ namespace ix
HttpClient _httpClient; HttpClient _httpClient;
}; };
} // namespace ix } // namespace ix

View File

@ -70,7 +70,7 @@ namespace ix
const std::string& prefix, const std::string& prefix,
const std::string& fields, const std::string& fields,
bool verbose); bool verbose);
int ws_cobra_to_sentry_main(const std::string& appkey, int ws_cobra_to_sentry_main(const std::string& appkey,
const std::string& endpoint, const std::string& endpoint,
const std::string& rolename, const std::string& rolename,

View File

@ -88,9 +88,9 @@ namespace ix
} }
conn.setEventCallback( conn.setEventCallback(
[&conn, &channel, &jsonWriter, [&conn, &channel, &jsonWriter,
verbose, &receivedCount, &sentCount, verbose, &receivedCount, &sentCount,
&condition, &conditionVariableMutex, &condition, &conditionVariableMutex,
&progressCondition, &queue] &progressCondition, &queue]
(ix::CobraConnectionEventType eventType, (ix::CobraConnectionEventType eventType,
const std::string& errMsg, const std::string& errMsg,
@ -114,7 +114,7 @@ namespace ix
{ {
std::cerr << "Subscriber authenticated" << std::endl; std::cerr << "Subscriber authenticated" << std::endl;
conn.subscribe(channel, conn.subscribe(channel,
[&jsonWriter, verbose, [&jsonWriter, verbose,
&sentCount, &receivedCount, &sentCount, &receivedCount,
&condition, &conditionVariableMutex, &condition, &conditionVariableMutex,
&progressCondition, &queue] &progressCondition, &queue]
@ -132,7 +132,7 @@ namespace ix
receivedCount != 0 && receivedCount != 0 &&
(sentCount * scaleFactor < receivedCount)) (sentCount * scaleFactor < receivedCount))
{ {
std::cerr << "message dropped: sending is backlogged !" std::cerr << "message dropped: sending is backlogged !"
<< std::endl; << std::endl;
condition.notify_one(); condition.notify_one();