Compare commits
60 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
83c261977d | ||
|
|
6ca28d96bf | ||
|
|
c4a5647b62 | ||
|
|
720d5593a5 | ||
|
|
13fa325134 | ||
|
|
773cbb4907 | ||
|
|
a696264b48 | ||
|
|
b7db5f77fb | ||
|
|
b11678e636 | ||
|
|
f746070944 | ||
|
|
3323a51ab5 | ||
|
|
0e59927384 | ||
|
|
5c4840f129 | ||
|
|
9ac02323ad | ||
|
|
cdbed26d1f | ||
|
|
23f171f34d | ||
|
|
20b625e483 | ||
|
|
f1604c6460 | ||
|
|
ba0e007c05 | ||
|
|
643e1bf20f | ||
|
|
24a32a0603 | ||
|
|
c5caf32b77 | ||
|
|
09956d7500 | ||
|
|
d91c896e46 | ||
|
|
042e6a22b8 | ||
|
|
14ec12d1f0 | ||
|
|
288b05a048 | ||
|
|
5af3096070 | ||
|
|
570fa01c04 | ||
|
|
2a69038c4c | ||
|
|
0ba127e447 | ||
|
|
7714bdf7e0 | ||
|
|
4e5e7ae50a | ||
|
|
5741b2f6c1 | ||
|
|
76172f92e9 | ||
|
|
f8b547c028 | ||
|
|
7ccd9e1709 | ||
|
|
9217b27d40 | ||
|
|
819e9025b1 | ||
|
|
53ceab9f91 | ||
|
|
a7ed4fe5c3 | ||
|
|
3190cd322d | ||
|
|
dad2b64e15 | ||
|
|
e527ab1613 | ||
|
|
d7a0bc212d | ||
|
|
a41d08343c | ||
|
|
6467f98241 | ||
|
|
b24e4334f6 | ||
|
|
bf8abcbf4a | ||
|
|
bb484414b1 | ||
|
|
fc75b13fae | ||
|
|
78f59b4207 | ||
|
|
7c5567db56 | ||
|
|
7ecaf1f982 | ||
|
|
d0a41f3894 | ||
|
|
57562b234f | ||
|
|
469d127d61 | ||
|
|
d6e9b61c8e | ||
|
|
7fb1b65ddd | ||
|
|
77c7fdc636 |
@@ -42,6 +42,7 @@ set( IXWEBSOCKET_SOURCES
|
||||
ixwebsocket/IXSelectInterruptFactory.cpp
|
||||
ixwebsocket/IXConnectionState.cpp
|
||||
ixwebsocket/IXWebSocketCloseConstants.cpp
|
||||
ixwebsocket/IXWebSocketMessageQueue.cpp
|
||||
)
|
||||
|
||||
set( IXWEBSOCKET_HEADERS
|
||||
@@ -72,6 +73,7 @@ set( IXWEBSOCKET_HEADERS
|
||||
ixwebsocket/IXSelectInterruptFactory.h
|
||||
ixwebsocket/IXConnectionState.h
|
||||
ixwebsocket/IXWebSocketCloseConstants.h
|
||||
ixwebsocket/IXWebSocketMessageQueue.h
|
||||
)
|
||||
|
||||
if (UNIX)
|
||||
|
||||
@@ -242,7 +242,7 @@ namespace ix
|
||||
}
|
||||
|
||||
char output[29] = {};
|
||||
WebSocketHandshakeKeyGen::generate(secWebSocketKey.c_str(), output);
|
||||
WebSocketHandshakeKeyGen::generate(secWebSocketKey, output);
|
||||
if (std::string(output) != headers["sec-websocket-accept"])
|
||||
{
|
||||
std::string errorMsg("Invalid Sec-WebSocket-Accept value");
|
||||
@@ -348,7 +348,7 @@ namespace ix
|
||||
}
|
||||
|
||||
char output[29] = {};
|
||||
WebSocketHandshakeKeyGen::generate(headers["sec-websocket-key"].c_str(), output);
|
||||
WebSocketHandshakeKeyGen::generate(headers["sec-websocket-key"], output);
|
||||
|
||||
std::stringstream ss;
|
||||
ss << "HTTP/1.1 101 Switching Protocols\r\n";
|
||||
|
||||
121
ixwebsocket/IXWebSocketMessageQueue.cpp
Normal file
121
ixwebsocket/IXWebSocketMessageQueue.cpp
Normal file
@@ -0,0 +1,121 @@
|
||||
/*
|
||||
* IXWebSocketMessageQueue.cpp
|
||||
* Author: Korchynskyi Dmytro
|
||||
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
|
||||
#include "IXWebSocketMessageQueue.h"
|
||||
|
||||
namespace ix
|
||||
{
|
||||
|
||||
WebSocketMessageQueue::WebSocketMessageQueue(WebSocket* websocket)
|
||||
{
|
||||
bindWebsocket(websocket);
|
||||
}
|
||||
|
||||
WebSocketMessageQueue::~WebSocketMessageQueue()
|
||||
{
|
||||
if (!_messages.empty())
|
||||
{
|
||||
// not handled all messages
|
||||
}
|
||||
|
||||
bindWebsocket(nullptr);
|
||||
}
|
||||
|
||||
void WebSocketMessageQueue::bindWebsocket(WebSocket * websocket)
|
||||
{
|
||||
if (_websocket == websocket) return;
|
||||
|
||||
// unbind old
|
||||
if (_websocket)
|
||||
{
|
||||
// set dummy callback just to avoid crash
|
||||
_websocket->setOnMessageCallback([](
|
||||
WebSocketMessageType,
|
||||
const std::string&,
|
||||
size_t,
|
||||
const WebSocketErrorInfo&,
|
||||
const WebSocketOpenInfo&,
|
||||
const WebSocketCloseInfo&)
|
||||
{});
|
||||
}
|
||||
|
||||
_websocket = websocket;
|
||||
|
||||
// bind new
|
||||
if (_websocket)
|
||||
{
|
||||
_websocket->setOnMessageCallback([this](
|
||||
WebSocketMessageType type,
|
||||
const std::string& str,
|
||||
size_t wireSize,
|
||||
const WebSocketErrorInfo& errorInfo,
|
||||
const WebSocketOpenInfo& openInfo,
|
||||
const WebSocketCloseInfo& closeInfo)
|
||||
{
|
||||
MessagePtr message(new Message());
|
||||
|
||||
message->type = type;
|
||||
message->str = str;
|
||||
message->wireSize = wireSize;
|
||||
message->errorInfo = errorInfo;
|
||||
message->openInfo = openInfo;
|
||||
message->closeInfo = closeInfo;
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(_messagesMutex);
|
||||
_messages.emplace_back(std::move(message));
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
void WebSocketMessageQueue::setOnMessageCallback(const OnMessageCallback& callback)
|
||||
{
|
||||
_onMessageUserCallback = callback;
|
||||
}
|
||||
|
||||
void WebSocketMessageQueue::setOnMessageCallback(OnMessageCallback&& callback)
|
||||
{
|
||||
_onMessageUserCallback = std::move(callback);
|
||||
}
|
||||
|
||||
WebSocketMessageQueue::MessagePtr WebSocketMessageQueue::popMessage()
|
||||
{
|
||||
MessagePtr message;
|
||||
std::lock_guard<std::mutex> lock(_messagesMutex);
|
||||
|
||||
if (!_messages.empty())
|
||||
{
|
||||
message = std::move(_messages.front());
|
||||
_messages.pop_front();
|
||||
}
|
||||
|
||||
return message;
|
||||
}
|
||||
|
||||
void WebSocketMessageQueue::poll(int count)
|
||||
{
|
||||
if (!_onMessageUserCallback)
|
||||
return;
|
||||
|
||||
MessagePtr message;
|
||||
|
||||
while (count > 0 && (message = popMessage()))
|
||||
{
|
||||
_onMessageUserCallback(
|
||||
message->type,
|
||||
message->str,
|
||||
message->wireSize,
|
||||
message->errorInfo,
|
||||
message->openInfo,
|
||||
message->closeInfo
|
||||
);
|
||||
|
||||
--count;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
53
ixwebsocket/IXWebSocketMessageQueue.h
Normal file
53
ixwebsocket/IXWebSocketMessageQueue.h
Normal file
@@ -0,0 +1,53 @@
|
||||
/*
|
||||
* IXWebSocketMessageQueue.h
|
||||
* Author: Korchynskyi Dmytro
|
||||
* Copyright (c) 2017-2019 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "IXWebSocket.h"
|
||||
#include <thread>
|
||||
#include <list>
|
||||
#include <memory>
|
||||
|
||||
namespace ix
|
||||
{
|
||||
//
|
||||
// A helper class to dispatch websocket message callbacks in your thread.
|
||||
//
|
||||
class WebSocketMessageQueue
|
||||
{
|
||||
public:
|
||||
WebSocketMessageQueue(WebSocket* websocket = nullptr);
|
||||
~WebSocketMessageQueue();
|
||||
|
||||
void bindWebsocket(WebSocket* websocket);
|
||||
|
||||
void setOnMessageCallback(const OnMessageCallback& callback);
|
||||
void setOnMessageCallback(OnMessageCallback&& callback);
|
||||
|
||||
void poll(int count = 512);
|
||||
|
||||
protected:
|
||||
struct Message
|
||||
{
|
||||
WebSocketMessageType type;
|
||||
std::string str;
|
||||
size_t wireSize;
|
||||
WebSocketErrorInfo errorInfo;
|
||||
WebSocketOpenInfo openInfo;
|
||||
WebSocketCloseInfo closeInfo;
|
||||
};
|
||||
|
||||
using MessagePtr = std::shared_ptr<Message>;
|
||||
|
||||
MessagePtr popMessage();
|
||||
|
||||
private:
|
||||
WebSocket* _websocket = nullptr;
|
||||
OnMessageCallback _onMessageUserCallback;
|
||||
std::mutex _messagesMutex;
|
||||
std::list<MessagePtr> _messages;
|
||||
};
|
||||
}
|
||||
@@ -20,6 +20,8 @@
|
||||
|
||||
#include <cstdint>
|
||||
#include <cstddef>
|
||||
#include <string>
|
||||
#include <string.h>
|
||||
|
||||
class WebSocketHandshakeKeyGen {
|
||||
template <int N, typename T>
|
||||
@@ -100,7 +102,12 @@ class WebSocketHandshakeKeyGen {
|
||||
}
|
||||
|
||||
public:
|
||||
static inline void generate(const char input[24], char output[28]) {
|
||||
static inline void generate(const std::string& inputStr, char output[28]) {
|
||||
|
||||
char input[25] = {};
|
||||
strncpy(input, inputStr.c_str(), 25 - 1);
|
||||
input[25 - 1] = '\0';
|
||||
|
||||
uint32_t b_output[5] = {
|
||||
0x67452301, 0xefcdab89, 0x98badcfe, 0x10325476, 0xc3d2e1f0
|
||||
};
|
||||
|
||||
@@ -51,6 +51,7 @@ endif()
|
||||
# IXWebSocketPingTest.cpp
|
||||
# IXWebSocketPingTimeoutTest.cpp
|
||||
# IXWebSocketCloseTest.cpp
|
||||
# IXWebSocketMessageQTest.cpp (trigger a segfault on Linux)
|
||||
|
||||
add_executable(ixwebsocket_unittest ${SOURCES})
|
||||
|
||||
|
||||
191
test/IXWebSocketMessageQTest.cpp
Normal file
191
test/IXWebSocketMessageQTest.cpp
Normal file
@@ -0,0 +1,191 @@
|
||||
/*
|
||||
* IXWebSocketMessageQTest.cpp
|
||||
* Author: Korchynskyi Dmytro
|
||||
* Copyright (c) 2019 Machine Zone. All rights reserved.
|
||||
*/
|
||||
|
||||
#include <ixwebsocket/IXWebSocket.h>
|
||||
#include <ixwebsocket/IXWebSocketServer.h>
|
||||
#include <ixwebsocket/IXWebSocketMessageQueue.h>
|
||||
|
||||
#include "IXTest.h"
|
||||
#include "catch.hpp"
|
||||
#include <thread>
|
||||
|
||||
using namespace ix;
|
||||
|
||||
namespace
|
||||
{
|
||||
bool startServer(ix::WebSocketServer& server)
|
||||
{
|
||||
server.setOnConnectionCallback(
|
||||
[&server](std::shared_ptr<ix::WebSocket> webSocket,
|
||||
std::shared_ptr<ConnectionState> connectionState)
|
||||
{
|
||||
webSocket->setOnMessageCallback(
|
||||
[connectionState, &server](ix::WebSocketMessageType messageType,
|
||||
const std::string & str,
|
||||
size_t wireSize,
|
||||
const ix::WebSocketErrorInfo & error,
|
||||
const ix::WebSocketOpenInfo & openInfo,
|
||||
const ix::WebSocketCloseInfo & closeInfo)
|
||||
{
|
||||
if (messageType == ix::WebSocketMessageType::Open)
|
||||
{
|
||||
Logger() << "New connection";
|
||||
connectionState->computeId();
|
||||
Logger() << "id: " << connectionState->getId();
|
||||
Logger() << "Uri: " << openInfo.uri;
|
||||
Logger() << "Headers:";
|
||||
for (auto it : openInfo.headers)
|
||||
{
|
||||
Logger() << it.first << ": " << it.second;
|
||||
}
|
||||
}
|
||||
else if (messageType == ix::WebSocketMessageType::Close)
|
||||
{
|
||||
Logger() << "Closed connection";
|
||||
}
|
||||
else if (messageType == ix::WebSocketMessageType::Message)
|
||||
{
|
||||
Logger() << "Message received: " << str;
|
||||
|
||||
for (auto&& client : server.getClients())
|
||||
{
|
||||
client->send(str);
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
);
|
||||
|
||||
auto res = server.listen();
|
||||
if (!res.first)
|
||||
{
|
||||
Logger() << res.second;
|
||||
return false;
|
||||
}
|
||||
|
||||
server.start();
|
||||
return true;
|
||||
}
|
||||
|
||||
class MsgQTestClient
|
||||
{
|
||||
public:
|
||||
MsgQTestClient()
|
||||
{
|
||||
msgQ.bindWebsocket(&ws);
|
||||
|
||||
msgQ.setOnMessageCallback([this](WebSocketMessageType messageType,
|
||||
const std::string & str,
|
||||
size_t wireSize,
|
||||
const WebSocketErrorInfo & error,
|
||||
const WebSocketOpenInfo & openInfo,
|
||||
const WebSocketCloseInfo & closeInfo)
|
||||
{
|
||||
REQUIRE(mainThreadId == std::this_thread::get_id());
|
||||
|
||||
std::stringstream ss;
|
||||
if (messageType == WebSocketMessageType::Open)
|
||||
{
|
||||
log("client connected");
|
||||
sendNextMessage();
|
||||
}
|
||||
else if (messageType == WebSocketMessageType::Close)
|
||||
{
|
||||
log("client disconnected");
|
||||
}
|
||||
else if (messageType == WebSocketMessageType::Error)
|
||||
{
|
||||
ss << "Error ! " << error.reason;
|
||||
log(ss.str());
|
||||
testDone = true;
|
||||
}
|
||||
else if (messageType == WebSocketMessageType::Pong)
|
||||
{
|
||||
ss << "Received pong message " << str;
|
||||
log(ss.str());
|
||||
}
|
||||
else if (messageType == WebSocketMessageType::Ping)
|
||||
{
|
||||
ss << "Received ping message " << str;
|
||||
log(ss.str());
|
||||
}
|
||||
else if (messageType == WebSocketMessageType::Message)
|
||||
{
|
||||
REQUIRE(str.compare("Hey dude!") == 0);
|
||||
++receivedCount;
|
||||
ss << "Received message " << str;
|
||||
log(ss.str());
|
||||
sendNextMessage();
|
||||
}
|
||||
else
|
||||
{
|
||||
ss << "Invalid WebSocketMessageType";
|
||||
log(ss.str());
|
||||
testDone = true;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void sendNextMessage()
|
||||
{
|
||||
if (receivedCount >= 3)
|
||||
{
|
||||
testDone = true;
|
||||
succeeded = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
auto info = ws.sendText("Hey dude!");
|
||||
if (info.success)
|
||||
log("sent message");
|
||||
else
|
||||
log("send failed");
|
||||
}
|
||||
}
|
||||
|
||||
void run(const std::string& url)
|
||||
{
|
||||
mainThreadId = std::this_thread::get_id();
|
||||
testDone = false;
|
||||
receivedCount = 0;
|
||||
|
||||
ws.setUrl(url);
|
||||
ws.start();
|
||||
|
||||
while (!testDone)
|
||||
{
|
||||
msgQ.poll();
|
||||
msleep(50);
|
||||
}
|
||||
}
|
||||
|
||||
bool isSucceeded() const { return succeeded; }
|
||||
|
||||
private:
|
||||
WebSocket ws;
|
||||
WebSocketMessageQueue msgQ;
|
||||
bool testDone = false;
|
||||
uint32_t receivedCount = 0;
|
||||
std::thread::id mainThreadId;
|
||||
bool succeeded = false;
|
||||
};
|
||||
}
|
||||
|
||||
TEST_CASE("Websocket_message_queue", "[websocket_message_q]")
|
||||
{
|
||||
SECTION("Send several messages")
|
||||
{
|
||||
int port = getFreePort();
|
||||
WebSocketServer server(port);
|
||||
REQUIRE(startServer(server));
|
||||
|
||||
MsgQTestClient testClient;
|
||||
testClient.run("ws://127.0.0.1:" + std::to_string(port));
|
||||
REQUIRE(testClient.isSucceeded());
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user