unittest for sending large messages

This commit is contained in:
Benjamin Sergeant 2019-02-16 10:31:55 -08:00
parent 474fd70ec7
commit 726e66ca66
3 changed files with 56 additions and 12 deletions

View File

@ -42,13 +42,13 @@ namespace ix
void SocketServer::logError(const std::string& str) void SocketServer::logError(const std::string& str)
{ {
std::lock_guard<std::mutex> lock(_logMutex); std::lock_guard<std::mutex> lock(_logMutex);
std::cerr << str << std::endl; fprintf(stderr, "%s\n", str.c_str());
} }
void SocketServer::logInfo(const std::string& str) void SocketServer::logInfo(const std::string& str)
{ {
std::lock_guard<std::mutex> lock(_logMutex); std::lock_guard<std::mutex> lock(_logMutex);
std::cout << str << std::endl; fprintf(stderr, "%s\n", str.c_str());
} }
std::pair<bool, std::string> SocketServer::listen() std::pair<bool, std::string> SocketServer::listen()

View File

@ -24,7 +24,7 @@ test_server:
(cd test && npm i ws && node broadcast-server.js) (cd test && npm i ws && node broadcast-server.js)
# env TEST=Websocket_server make test # env TEST=Websocket_server make test
# env TEST=websocket_server make test # env TEST=Websocket_chat make test
# env TEST=heartbeat make test # env TEST=heartbeat make test
test: test:
python test/run.py python test/run.py

View File

@ -11,7 +11,8 @@
#include <iostream> #include <iostream>
#include <sstream> #include <sstream>
#include <queue> #include <vector>
#include <mutex>
#include <ixwebsocket/IXWebSocket.h> #include <ixwebsocket/IXWebSocket.h>
#include <ixwebsocket/IXWebSocketServer.h> #include <ixwebsocket/IXWebSocketServer.h>
#include "msgpack11.hpp" #include "msgpack11.hpp"
@ -39,9 +40,11 @@ namespace
void sendMessage(const std::string& text); void sendMessage(const std::string& text);
size_t getReceivedMessagesCount() const; size_t getReceivedMessagesCount() const;
const std::vector<std::string>& getReceivedMessages() const;
std::string encodeMessage(const std::string& text); std::string encodeMessage(const std::string& text);
std::pair<std::string, std::string> decodeMessage(const std::string& str); std::pair<std::string, std::string> decodeMessage(const std::string& str);
void appendMessage(const std::string& message);
private: private:
std::string _user; std::string _user;
@ -50,7 +53,8 @@ namespace
ix::WebSocket _webSocket; ix::WebSocket _webSocket;
std::queue<std::string> _receivedQueue; std::vector<std::string> _receivedMessages;
mutable std::mutex _mutex;
}; };
WebSocketChat::WebSocketChat(const std::string& user, WebSocketChat::WebSocketChat(const std::string& user,
@ -65,7 +69,20 @@ namespace
size_t WebSocketChat::getReceivedMessagesCount() const size_t WebSocketChat::getReceivedMessagesCount() const
{ {
return _receivedQueue.size(); std::lock_guard<std::mutex> lock(_mutex);
return _receivedMessages.size();
}
const std::vector<std::string>& WebSocketChat::getReceivedMessages() const
{
std::lock_guard<std::mutex> lock(_mutex);
return _receivedMessages;
}
void WebSocketChat::appendMessage(const std::string& message)
{
std::lock_guard<std::mutex> lock(_mutex);
_receivedMessages.push_back(message);
} }
bool WebSocketChat::isReady() const bool WebSocketChat::isReady() const
@ -85,7 +102,8 @@ namespace
std::stringstream ss; std::stringstream ss;
ss << "ws://localhost:" ss << "ws://localhost:"
<< _port << _port
<< "/"; << "/"
<< _user;
url = ss.str(); url = ss.str();
} }
@ -127,10 +145,16 @@ namespace
// as we do for the satori chat example. // as we do for the satori chat example.
// store text // store text
_receivedQueue.push(result.second); appendMessage(result.second);
std::string payload = result.second;
if (payload.size() > 2000)
{
payload = "<message too large>";
}
ss << std::endl ss << std::endl
<< result.first << " > " << result.second << result.first << " > " << payload
<< std::endl << std::endl
<< _user << " > "; << _user << " > ";
log(ss.str()); log(ss.str());
@ -269,15 +293,35 @@ TEST_CASE("Websocket_chat", "[websocket_chat]")
chatB.sendMessage("from B1"); chatB.sendMessage("from B1");
chatB.sendMessage("from B2"); chatB.sendMessage("from B2");
// Give us 1s for all messages to be received // FIXME: cannot handle large message, we need to break them down
ix::msleep(1000); // into small one at the websocket layer (using CONTINUATION opcode)
size_t size = 512 * 1000; // 512K is OK, larger is not !!
std::string bigMessage(size, 'a');
chatB.sendMessage(bigMessage);
// Wait until all messages are received. 10s timeout
int attempts = 0;
while (chatA.getReceivedMessagesCount() != 3 ||
chatB.getReceivedMessagesCount() != 3)
{
REQUIRE(attempts++ < 10);
ix::msleep(1000);
}
chatA.stop(); chatA.stop();
chatB.stop(); chatB.stop();
REQUIRE(chatA.getReceivedMessagesCount() == 2); REQUIRE(chatA.getReceivedMessagesCount() == 3);
REQUIRE(chatB.getReceivedMessagesCount() == 3); REQUIRE(chatB.getReceivedMessagesCount() == 3);
REQUIRE(chatB.getReceivedMessages()[0] == "from A1");
REQUIRE(chatB.getReceivedMessages()[1] == "from A2");
REQUIRE(chatB.getReceivedMessages()[2] == "from A3");
REQUIRE(chatA.getReceivedMessages()[0] == "from B1");
REQUIRE(chatA.getReceivedMessages()[1] == "from B2");
REQUIRE(chatA.getReceivedMessages()[2].size() == bigMessage.size());
// Give us 500ms for the server to notice that clients went away // Give us 500ms for the server to notice that clients went away
ix::msleep(500); ix::msleep(500);
REQUIRE(server.getClients().size() == 0); REQUIRE(server.getClients().size() == 0);