(websocket) WebSocketMessagePtr is a unique_ptr instead of a shared_ptr
This commit is contained in:
parent
37a054723a
commit
432f0570f4
@ -44,7 +44,6 @@ set( IXWEBSOCKET_SOURCES
|
||||
ixwebsocket/IXWebSocketCloseConstants.cpp
|
||||
ixwebsocket/IXWebSocketHandshake.cpp
|
||||
ixwebsocket/IXWebSocketHttpHeaders.cpp
|
||||
ixwebsocket/IXWebSocketMessageQueue.cpp
|
||||
ixwebsocket/IXWebSocketPerMessageDeflate.cpp
|
||||
ixwebsocket/IXWebSocketPerMessageDeflateCodec.cpp
|
||||
ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp
|
||||
@ -84,7 +83,6 @@ set( IXWEBSOCKET_HEADERS
|
||||
ixwebsocket/IXWebSocketHttpHeaders.h
|
||||
ixwebsocket/IXWebSocketInitResult.h
|
||||
ixwebsocket/IXWebSocketMessage.h
|
||||
ixwebsocket/IXWebSocketMessageQueue.h
|
||||
ixwebsocket/IXWebSocketMessageType.h
|
||||
ixwebsocket/IXWebSocketOpenInfo.h
|
||||
ixwebsocket/IXWebSocketPerMessageDeflate.h
|
||||
|
@ -1,6 +1,10 @@
|
||||
# Changelog
|
||||
All changes to this project will be documented in this file.
|
||||
|
||||
## [9.2.5] - 2020-04-13
|
||||
|
||||
(websocket) WebSocketMessagePtr is a unique_ptr instead of a shared_ptr
|
||||
|
||||
## [9.2.4] - 2020-04-13
|
||||
|
||||
(websocket) use persistent member variable as temp variables to encode/decode zlib messages in order to reduce transient allocations
|
||||
|
@ -34,7 +34,7 @@ namespace ix
|
||||
_ws.setOnCloseCallback(
|
||||
[this](uint16_t code, const std::string& reason, size_t wireSize, bool remote) {
|
||||
_onMessageCallback(
|
||||
std::make_shared<WebSocketMessage>(WebSocketMessageType::Close,
|
||||
std::make_unique<WebSocketMessage>(WebSocketMessageType::Close,
|
||||
"",
|
||||
wireSize,
|
||||
WebSocketErrorInfo(),
|
||||
@ -193,7 +193,7 @@ namespace ix
|
||||
return status;
|
||||
}
|
||||
|
||||
_onMessageCallback(std::make_shared<WebSocketMessage>(
|
||||
_onMessageCallback(std::make_unique<WebSocketMessage>(
|
||||
WebSocketMessageType::Open,
|
||||
"",
|
||||
0,
|
||||
@ -225,7 +225,7 @@ namespace ix
|
||||
}
|
||||
|
||||
_onMessageCallback(
|
||||
std::make_shared<WebSocketMessage>(WebSocketMessageType::Open,
|
||||
std::make_unique<WebSocketMessage>(WebSocketMessageType::Open,
|
||||
"",
|
||||
0,
|
||||
WebSocketErrorInfo(),
|
||||
@ -310,7 +310,7 @@ namespace ix
|
||||
connectErr.reason = status.errorStr;
|
||||
connectErr.http_status = status.http_status;
|
||||
|
||||
_onMessageCallback(std::make_shared<WebSocketMessage>(WebSocketMessageType::Error,
|
||||
_onMessageCallback(std::make_unique<WebSocketMessage>(WebSocketMessageType::Error,
|
||||
"",
|
||||
0,
|
||||
connectErr,
|
||||
@ -386,7 +386,7 @@ namespace ix
|
||||
|
||||
bool binary = messageKind == WebSocketTransport::MessageKind::MSG_BINARY;
|
||||
|
||||
_onMessageCallback(std::make_shared<WebSocketMessage>(webSocketMessageType,
|
||||
_onMessageCallback(std::make_unique<WebSocketMessage>(webSocketMessageType,
|
||||
msg,
|
||||
wireSize,
|
||||
webSocketErrorInfo,
|
||||
|
@ -45,5 +45,5 @@ namespace ix
|
||||
}
|
||||
};
|
||||
|
||||
using WebSocketMessagePtr = std::shared_ptr<WebSocketMessage>;
|
||||
using WebSocketMessagePtr = std::unique_ptr<WebSocketMessage>;
|
||||
} // namespace ix
|
||||
|
@ -1,86 +0,0 @@
|
||||
/*
|
||||
* IXWebSocketMessageQueue.cpp
|
||||
* Author: Korchynskyi Dmytro
|
||||
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
|
||||
#include "IXWebSocketMessageQueue.h"
|
||||
|
||||
namespace ix
|
||||
{
|
||||
WebSocketMessageQueue::WebSocketMessageQueue(WebSocket* websocket)
|
||||
{
|
||||
bindWebsocket(websocket);
|
||||
}
|
||||
|
||||
WebSocketMessageQueue::~WebSocketMessageQueue()
|
||||
{
|
||||
if (!_messages.empty())
|
||||
{
|
||||
// not handled all messages
|
||||
}
|
||||
|
||||
bindWebsocket(nullptr);
|
||||
}
|
||||
|
||||
void WebSocketMessageQueue::bindWebsocket(WebSocket* websocket)
|
||||
{
|
||||
if (_websocket == websocket) return;
|
||||
|
||||
// unbind old
|
||||
if (_websocket)
|
||||
{
|
||||
// set dummy callback just to avoid crash
|
||||
_websocket->setOnMessageCallback([](const WebSocketMessagePtr&) {});
|
||||
}
|
||||
|
||||
_websocket = websocket;
|
||||
|
||||
// bind new
|
||||
if (_websocket)
|
||||
{
|
||||
_websocket->setOnMessageCallback([this](const WebSocketMessagePtr& msg) {
|
||||
std::lock_guard<std::mutex> lock(_messagesMutex);
|
||||
_messages.emplace_back(std::move(msg));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
void WebSocketMessageQueue::setOnMessageCallback(const OnMessageCallback& callback)
|
||||
{
|
||||
_onMessageUserCallback = callback;
|
||||
}
|
||||
|
||||
void WebSocketMessageQueue::setOnMessageCallback(OnMessageCallback&& callback)
|
||||
{
|
||||
_onMessageUserCallback = std::move(callback);
|
||||
}
|
||||
|
||||
WebSocketMessagePtr WebSocketMessageQueue::popMessage()
|
||||
{
|
||||
WebSocketMessagePtr message;
|
||||
std::lock_guard<std::mutex> lock(_messagesMutex);
|
||||
|
||||
if (!_messages.empty())
|
||||
{
|
||||
message = std::move(_messages.front());
|
||||
_messages.pop_front();
|
||||
}
|
||||
|
||||
return message;
|
||||
}
|
||||
|
||||
void WebSocketMessageQueue::poll(int count)
|
||||
{
|
||||
if (!_onMessageUserCallback) return;
|
||||
|
||||
WebSocketMessagePtr message;
|
||||
|
||||
while (count > 0 && (message = popMessage()))
|
||||
{
|
||||
_onMessageUserCallback(message);
|
||||
--count;
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace ix
|
@ -1,41 +0,0 @@
|
||||
/*
|
||||
* IXWebSocketMessageQueue.h
|
||||
* Author: Korchynskyi Dmytro
|
||||
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "IXWebSocket.h"
|
||||
#include <list>
|
||||
#include <memory>
|
||||
#include <thread>
|
||||
|
||||
namespace ix
|
||||
{
|
||||
//
|
||||
// A helper class to dispatch websocket message callbacks in your thread.
|
||||
//
|
||||
class WebSocketMessageQueue
|
||||
{
|
||||
public:
|
||||
WebSocketMessageQueue(WebSocket* websocket = nullptr);
|
||||
~WebSocketMessageQueue();
|
||||
|
||||
void bindWebsocket(WebSocket* websocket);
|
||||
|
||||
void setOnMessageCallback(const OnMessageCallback& callback);
|
||||
void setOnMessageCallback(OnMessageCallback&& callback);
|
||||
|
||||
void poll(int count = 512);
|
||||
|
||||
protected:
|
||||
WebSocketMessagePtr popMessage();
|
||||
|
||||
private:
|
||||
WebSocket* _websocket = nullptr;
|
||||
OnMessageCallback _onMessageUserCallback;
|
||||
std::mutex _messagesMutex;
|
||||
std::list<WebSocketMessagePtr> _messages;
|
||||
};
|
||||
} // namespace ix
|
@ -6,4 +6,4 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#define IX_WEBSOCKET_VERSION "9.2.4"
|
||||
#define IX_WEBSOCKET_VERSION "9.2.5"
|
||||
|
@ -1,178 +0,0 @@
|
||||
/*
|
||||
* IXWebSocketMessageQTest.cpp
|
||||
* Author: Korchynskyi Dmytro
|
||||
* Copyright (c) 2019 Machine Zone. All rights reserved.
|
||||
*/
|
||||
|
||||
#include "IXTest.h"
|
||||
#include "catch.hpp"
|
||||
#include <ixwebsocket/IXWebSocket.h>
|
||||
#include <ixwebsocket/IXWebSocketMessageQueue.h>
|
||||
#include <ixwebsocket/IXWebSocketServer.h>
|
||||
#include <thread>
|
||||
|
||||
using namespace ix;
|
||||
|
||||
namespace
|
||||
{
|
||||
bool startServer(ix::WebSocketServer& server)
|
||||
{
|
||||
server.setOnConnectionCallback([&server](std::shared_ptr<ix::WebSocket> webSocket,
|
||||
std::shared_ptr<ConnectionState> connectionState) {
|
||||
webSocket->setOnMessageCallback(
|
||||
[connectionState, &server](const WebSocketMessagePtr& msg) {
|
||||
if (msg->type == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
TLogger() << "New connection";
|
||||
connectionState->computeId();
|
||||
TLogger() << "id: " << connectionState->getId();
|
||||
TLogger() << "Uri: " << msg->openInfo.uri;
|
||||
TLogger() << "Headers:";
|
||||
for (auto&& it : msg->openInfo.headers)
|
||||
{
|
||||
TLogger() << it.first << ": " << it.second;
|
||||
}
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Close)
|
||||
{
|
||||
TLogger() << "Closed connection";
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Message)
|
||||
{
|
||||
TLogger() << "Message received: " << msg->str;
|
||||
|
||||
for (auto&& client : server.getClients())
|
||||
{
|
||||
client->send(msg->str);
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
auto res = server.listen();
|
||||
if (!res.first)
|
||||
{
|
||||
TLogger() << res.second;
|
||||
return false;
|
||||
}
|
||||
|
||||
server.start();
|
||||
return true;
|
||||
}
|
||||
|
||||
class MsgQTestClient
|
||||
{
|
||||
public:
|
||||
MsgQTestClient()
|
||||
{
|
||||
msgQ.bindWebsocket(&ws);
|
||||
|
||||
msgQ.setOnMessageCallback([this](const WebSocketMessagePtr& msg) {
|
||||
REQUIRE(mainThreadId == std::this_thread::get_id());
|
||||
|
||||
std::stringstream ss;
|
||||
if (msg->type == WebSocketMessageType::Open)
|
||||
{
|
||||
log("client connected");
|
||||
sendNextMessage();
|
||||
}
|
||||
else if (msg->type == WebSocketMessageType::Close)
|
||||
{
|
||||
log("client disconnected");
|
||||
}
|
||||
else if (msg->type == WebSocketMessageType::Error)
|
||||
{
|
||||
ss << "Error ! " << msg->errorInfo.reason;
|
||||
log(ss.str());
|
||||
testDone = true;
|
||||
}
|
||||
else if (msg->type == WebSocketMessageType::Pong)
|
||||
{
|
||||
ss << "Received pong message " << msg->str;
|
||||
log(ss.str());
|
||||
}
|
||||
else if (msg->type == WebSocketMessageType::Ping)
|
||||
{
|
||||
ss << "Received ping message " << msg->str;
|
||||
log(ss.str());
|
||||
}
|
||||
else if (msg->type == WebSocketMessageType::Message)
|
||||
{
|
||||
REQUIRE(msg->str.compare("Hey dude!") == 0);
|
||||
++receivedCount;
|
||||
ss << "Received message " << msg->str;
|
||||
log(ss.str());
|
||||
sendNextMessage();
|
||||
}
|
||||
else
|
||||
{
|
||||
ss << "Invalid WebSocketMessageType";
|
||||
log(ss.str());
|
||||
testDone = true;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void sendNextMessage()
|
||||
{
|
||||
if (receivedCount >= 3)
|
||||
{
|
||||
testDone = true;
|
||||
succeeded = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
auto info = ws.sendText("Hey dude!");
|
||||
if (info.success)
|
||||
log("sent message");
|
||||
else
|
||||
log("send failed");
|
||||
}
|
||||
}
|
||||
|
||||
void run(const std::string& url)
|
||||
{
|
||||
mainThreadId = std::this_thread::get_id();
|
||||
testDone = false;
|
||||
receivedCount = 0;
|
||||
|
||||
ws.setUrl(url);
|
||||
ws.start();
|
||||
|
||||
while (!testDone)
|
||||
{
|
||||
msgQ.poll();
|
||||
msleep(50);
|
||||
}
|
||||
}
|
||||
|
||||
bool isSucceeded() const
|
||||
{
|
||||
return succeeded;
|
||||
}
|
||||
|
||||
private:
|
||||
WebSocket ws;
|
||||
WebSocketMessageQueue msgQ;
|
||||
bool testDone = false;
|
||||
uint32_t receivedCount = 0;
|
||||
std::thread::id mainThreadId;
|
||||
bool succeeded = false;
|
||||
};
|
||||
} // namespace
|
||||
|
||||
TEST_CASE("Websocket_message_queue", "[websocket_message_q]")
|
||||
{
|
||||
SECTION("Send several messages")
|
||||
{
|
||||
int port = getFreePort();
|
||||
WebSocketServer server(port);
|
||||
REQUIRE(startServer(server));
|
||||
|
||||
MsgQTestClient testClient;
|
||||
testClient.run("ws://127.0.0.1:" + std::to_string(port));
|
||||
REQUIRE(testClient.isSucceeded());
|
||||
|
||||
server.stop();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user