Added WebSocketMessageQueue

This commit is contained in:
dimon4eg 2019-05-12 00:26:52 +03:00
commit 7c5567db56
4 changed files with 173 additions and 0 deletions

View File

@ -28,6 +28,7 @@ set( IXWEBSOCKET_SOURCES
ixwebsocket/IXCancellationRequest.cpp
ixwebsocket/IXNetSystem.cpp
ixwebsocket/IXWebSocket.cpp
ixwebsocket/IXWebSocketMessageQueue.cpp
ixwebsocket/IXWebSocketServer.cpp
ixwebsocket/IXWebSocketTransport.cpp
ixwebsocket/IXWebSocketHandshake.cpp
@ -54,6 +55,7 @@ set( IXWEBSOCKET_HEADERS
ixwebsocket/IXNetSystem.h
ixwebsocket/IXProgressCallback.h
ixwebsocket/IXWebSocket.h
ixwebsocket/IXWebSocketMessageQueue.h
ixwebsocket/IXWebSocketServer.h
ixwebsocket/IXWebSocketTransport.h
ixwebsocket/IXWebSocketHandshake.h

View File

@ -114,7 +114,10 @@ namespace ix
WebSocketSendInfo ping(const std::string& text);
void close();
// Set callback to receive websocket messages.
// Be aware: your callback will be executed from websocket's internal thread!
void setOnMessageCallback(const OnMessageCallback& callback);
static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback);
static void resetTrafficTrackerCallback();

View File

@ -0,0 +1,116 @@
/*
* IXWebSocketMessageQueue.cpp
* Author: Korchynskyi Dmytro
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/
#include "IXWebSocketMessageQueue.h"
namespace ix
{
WebSocketMessageQueue::WebSocketMessageQueue(WebSocket* websocket)
{
bindWebsocket(websocket);
}
WebSocketMessageQueue::~WebSocketMessageQueue()
{
if (!_messages.empty())
{
// not handled all messages
}
bindWebsocket(nullptr);
}
void WebSocketMessageQueue::bindWebsocket(WebSocket * websocket)
{
if (_websocket == websocket) return;
// unbind old
if (_websocket)
{
// set dummy callback just to avoid crash
_websocket->setOnMessageCallback([](
WebSocketMessageType type,
const std::string & str,
size_t wireSize,
const WebSocketErrorInfo & errorInfo,
const WebSocketOpenInfo & openInfo,
const WebSocketCloseInfo & closeInfo)
{});
}
_websocket = websocket;
// bind new
if (_websocket)
{
_websocket->setOnMessageCallback([this](
WebSocketMessageType type,
const std::string& str,
size_t wireSize,
const WebSocketErrorInfo& errorInfo,
const WebSocketOpenInfo& openInfo,
const WebSocketCloseInfo& closeInfo)
{
MessagePtr message(new Message());
message->type = type;
message->str = str;
message->wireSize = wireSize;
message->errorInfo = errorInfo;
message->openInfo = openInfo;
message->closeInfo = closeInfo;
{
std::lock_guard<std::mutex> lock(_messagesMutex);
_messages.emplace_back(std::move(message));
}
});
}
}
void WebSocketMessageQueue::setOnMessageCallback(const OnMessageCallback& callback)
{
_onMessageUserCallback = callback;
}
WebSocketMessageQueue::MessagePtr WebSocketMessageQueue::popMessage()
{
MessagePtr message;
std::lock_guard<std::mutex> lock(_messagesMutex);
if (!_messages.empty())
{
message = std::move(_messages.front());
_messages.pop_front();
}
return message;
}
void WebSocketMessageQueue::poll(int count)
{
if (!_onMessageUserCallback)
return;
MessagePtr message;
while (count > 0 && (message = popMessage()))
{
_onMessageUserCallback(
message->type,
message->str,
message->wireSize,
message->errorInfo,
message->openInfo,
message->closeInfo
);
--count;
}
}
}

View File

@ -0,0 +1,52 @@
/*
* IXWebSocketMessageQueue.h
* Author: Korchynskyi Dmytro
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include "IXWebSocket.h"
#include <thread>
#include <list>
#include <memory>
namespace ix
{
//
// A helper class to dispatch websocket message callbacks in your thread.
//
class WebSocketMessageQueue
{
public:
WebSocketMessageQueue(WebSocket* websocket = nullptr);
~WebSocketMessageQueue();
void bindWebsocket(WebSocket* websocket);
void setOnMessageCallback(const OnMessageCallback& callback);
void poll(int count = 512);
protected:
struct Message
{
WebSocketMessageType type;
std::string str;
size_t wireSize;
WebSocketErrorInfo errorInfo;
WebSocketOpenInfo openInfo;
WebSocketCloseInfo closeInfo;
};
using MessagePtr = std::shared_ptr<Message>;
MessagePtr popMessage();
private:
WebSocket* _websocket = nullptr;
OnMessageCallback _onMessageUserCallback;
std::mutex _messagesMutex;
std::list<MessagePtr> _messages;
};
}