Added IXWebSocketPoll class

This commit is contained in:
Dimon4eg 2019-05-08 22:02:56 +03:00
parent 2732dfd0f1
commit 77c7fdc636
4 changed files with 167 additions and 0 deletions

View File

@ -28,6 +28,7 @@ set( IXWEBSOCKET_SOURCES
ixwebsocket/IXCancellationRequest.cpp
ixwebsocket/IXNetSystem.cpp
ixwebsocket/IXWebSocket.cpp
ixwebsocket/IXWebSocketPoll.cpp
ixwebsocket/IXWebSocketServer.cpp
ixwebsocket/IXWebSocketTransport.cpp
ixwebsocket/IXWebSocketHandshake.cpp
@ -54,6 +55,7 @@ set( IXWEBSOCKET_HEADERS
ixwebsocket/IXNetSystem.h
ixwebsocket/IXProgressCallback.h
ixwebsocket/IXWebSocket.h
ixwebsocket/IXWebSocketPoll.h
ixwebsocket/IXWebSocketServer.h
ixwebsocket/IXWebSocketTransport.h
ixwebsocket/IXWebSocketHandshake.h

View File

@ -113,6 +113,11 @@ namespace ix
WebSocketSendInfo ping(const std::string& text);
void close();
/**
* Set callback to receive websocket messages.
*
* Be aware: your callback will be executed from websocket's internal thread!
*/
void setOnMessageCallback(const OnMessageCallback& callback);
static void setTrafficTrackerCallback(const OnTrafficTrackerCallback& callback);
static void resetTrafficTrackerCallback();

View File

@ -0,0 +1,108 @@
/*
* IXWebSocketPoll.cpp
* Author: Korchynskyi Dmytro
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*/
#include "IXWebSocketPoll.h"
namespace ix
{
WebSocketPoll::WebSocketPoll(WebSocket* websocket)
{
}
WebSocketPoll::~WebSocketPoll()
{
if (!_messages.empty())
{
}
bindWebsocket(nullptr);
}
void WebSocketPoll::bindWebsocket(WebSocket * websocket)
{
if (_websocket != websocket)
{
// unbind old
if (_websocket)
{
_websocket->setOnMessageCallback(nullptr);
}
_websocket = websocket;
// bind new
if (_websocket)
{
_websocket->setOnMessageCallback([this](
WebSocketMessageType type,
const std::string& str,
size_t wireSize,
const WebSocketErrorInfo& errorInfo,
const WebSocketOpenInfo& openInfo,
const WebSocketCloseInfo& closeInfo)
{
MessageDataPtr message(new Message());
message->type = type;
message->str = str;
message->wireSize = wireSize;
message->errorInfo = errorInfo;
message->openInfo = openInfo;
message->closeInfo = closeInfo;
_messagesMutex.lock();
_messages.emplace_back(std::move(message));
_messagesMutex.unlock();
});
}
}
}
void WebSocketPoll::setOnMessageCallback(const OnMessageCallback& callback)
{
_onMessageUserCallback = callback;
}
WebSocketPoll::MessageDataPtr WebSocketPoll::popMessage()
{
MessageDataPtr message;
_messagesMutex.lock();
if (!_messages.empty())
{
message = std::move(_messages.front());
_messages.pop_front();
}
_messagesMutex.unlock();
return message;
}
void WebSocketPoll::poll(int count)
{
if (!_onMessageUserCallback)
return;
MessageDataPtr message;
while (count > 0 && (message = popMessage()))
{
_onMessageUserCallback(
message->type,
message->str,
message->wireSize,
message->errorInfo,
message->openInfo,
message->closeInfo
);
--count;
}
}
}

View File

@ -0,0 +1,52 @@
/*
* IXWebSocketPoll.h
* Author: Korchynskyi Dmytro
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*/
#pragma once
#include "IXWebSocket.h"
#include <thread>
#include <list>
#include <memory>
namespace ix
{
/**
* A helper class to dispatch websocket message callbacks in your thread.
*/
class WebSocketPoll
{
public:
WebSocketPoll(WebSocket* websocket = nullptr);
~WebSocketPoll();
void bindWebsocket(WebSocket* websocket);
void setOnMessageCallback(const OnMessageCallback& callback);
void poll(int count = 512);
protected:
struct Message
{
WebSocketMessageType type;
std::string str;
size_t wireSize;
WebSocketErrorInfo errorInfo;
WebSocketOpenInfo openInfo;
WebSocketCloseInfo closeInfo;
};
using MessageDataPtr = std::shared_ptr<Message>;
MessageDataPtr popMessage();
private:
WebSocket* _websocket = nullptr;
OnMessageCallback _onMessageUserCallback;
std::mutex _messagesMutex;
std::list<MessageDataPtr> _messages;
};
}