IXWebSocket/ixwebsocket/IXWebSocketMessageQueue.cpp

122 lines
3.1 KiB
C++
Raw Normal View History

2019-05-08 21:02:56 +02:00
/*
 *  IXWebSocketMessageQueue.cpp
 *  Author: Korchynskyi Dmytro
 *  Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
 */
2019-05-08 23:09:51 +02:00
#include "IXWebSocketMessageQueue.h"
2019-05-08 21:02:56 +02:00
namespace ix
{
2019-05-08 23:09:51 +02:00
// Builds a queue and immediately attaches it to `websocket` by delegating
// to bindWebsocket(). bindWebsocket() returns early when the pointer it is
// given equals `_websocket`, so passing nullptr presumably leaves the queue
// unbound.
// NOTE(review): bindWebsocket() compares against `_websocket` before this
// constructor has assigned it; this assumes the header gives `_websocket`
// a default initializer (nullptr) -- confirm.
WebSocketMessageQueue::WebSocketMessageQueue(WebSocket* websocket)
{
    this->bindWebsocket(websocket);
}
2019-05-08 23:09:51 +02:00
// Detaches the queue from its websocket.
//
// The callback installed by bindWebsocket() captures `this`, so it is
// swapped out here (bindWebsocket(nullptr) installs a no-op callback on the
// current socket) before the queue goes away. This destructor does not
// deliver messages that are still queued to the user callback.
WebSocketMessageQueue::~WebSocketMessageQueue()
{
    // Deliberately no `_messages.empty()` check: nothing is done with
    // leftover messages, and reading the container without holding
    // `_messagesMutex` could race with the enqueueing callback, which stays
    // installed until the rebind below.
    bindWebsocket(nullptr);
}
2019-05-08 23:09:51 +02:00
// Attaches this queue to `websocket`, detaching from the previous one.
//
// - Binding to the socket that is already bound is a no-op.
// - The previously bound socket (if any) gets a do-nothing message callback
//   so that it no longer calls into this queue.
// - The new socket (if non-null) gets a callback that copies every incoming
//   event into a Message and appends it to `_messages` under
//   `_messagesMutex`; the events are handed to the user later, from poll().
//
// Passing nullptr only detaches.
void WebSocketMessageQueue::bindWebsocket(WebSocket * websocket)
{
    if (websocket == _websocket)
    {
        return;
    }

    // Unbind the old socket.
    if (_websocket != nullptr)
    {
        // Dummy callback, installed just to avoid a crash: the old socket
        // must not keep a callback that captures this queue.
        auto ignoreEvent = [](WebSocketMessageType,
                              const std::string&,
                              size_t,
                              const WebSocketErrorInfo&,
                              const WebSocketOpenInfo&,
                              const WebSocketCloseInfo&)
        {
        };
        _websocket->setOnMessageCallback(ignoreEvent);
    }

    _websocket = websocket;

    // Nothing to bind when the caller only wanted to detach.
    if (_websocket == nullptr)
    {
        return;
    }

    // Bind the new socket: snapshot each event and queue it.
    auto enqueueEvent = [this](WebSocketMessageType type,
                               const std::string& str,
                               size_t wireSize,
                               const WebSocketErrorInfo& errorInfo,
                               const WebSocketOpenInfo& openInfo,
                               const WebSocketCloseInfo& closeInfo)
    {
        // Build the message before taking the lock so the critical section
        // covers only the container insertion.
        MessagePtr incoming(new Message());
        incoming->type = type;
        incoming->str = str;
        incoming->wireSize = wireSize;
        incoming->errorInfo = errorInfo;
        incoming->openInfo = openInfo;
        incoming->closeInfo = closeInfo;

        std::lock_guard<std::mutex> guard(_messagesMutex);
        _messages.emplace_back(std::move(incoming));
    };
    _websocket->setOnMessageCallback(enqueueEvent);
}
2019-05-08 23:09:51 +02:00
void WebSocketMessageQueue::setOnMessageCallback(const OnMessageCallback& callback)
2019-05-08 21:02:56 +02:00
{
_onMessageUserCallback = callback;
}
2019-05-12 19:59:18 +02:00
// Rvalue overload: moves `callback` into the stored user callback instead of
// copying it. poll() invokes the stored callback for each queued message.
void WebSocketMessageQueue::setOnMessageCallback(OnMessageCallback&& callback)
{
    this->_onMessageUserCallback = std::move(callback);
}
2019-05-08 21:02:56 +02:00
2019-05-09 00:05:47 +02:00
// Removes and returns the oldest queued message.
//
// Returns a default-constructed MessagePtr when the queue is empty (poll()
// relies on that value testing false). `_messagesMutex` is held for the
// whole call.
WebSocketMessageQueue::MessagePtr WebSocketMessageQueue::popMessage()
{
    MessagePtr head;

    std::lock_guard<std::mutex> guard(_messagesMutex);
    if (_messages.empty())
    {
        return head;
    }

    head = std::move(_messages.front());
    _messages.pop_front();
    return head;
}
2019-05-08 23:09:51 +02:00
void WebSocketMessageQueue::poll(int count)
2019-05-08 21:02:56 +02:00
{
if (!_onMessageUserCallback)
return;
2019-05-09 00:05:47 +02:00
MessagePtr message;
2019-05-08 21:02:56 +02:00
while (count > 0 && (message = popMessage()))
{
_onMessageUserCallback(
message->type,
message->str,
message->wireSize,
message->errorInfo,
message->openInfo,
message->closeInfo
);
--count;
}
}
}