Compare commits
1 Commits
master
...
feature/we
Author | SHA1 | Date | |
---|---|---|---|
|
cc588f9c05 |
@ -44,6 +44,7 @@ set( IXWEBSOCKET_SOURCES
|
||||
ixwebsocket/IXWebSocketCloseConstants.cpp
|
||||
ixwebsocket/IXWebSocketHandshake.cpp
|
||||
ixwebsocket/IXWebSocketHttpHeaders.cpp
|
||||
ixwebsocket/IXWebSocketMessage.cpp
|
||||
ixwebsocket/IXWebSocketPerMessageDeflate.cpp
|
||||
ixwebsocket/IXWebSocketPerMessageDeflateCodec.cpp
|
||||
ixwebsocket/IXWebSocketPerMessageDeflateOptions.cpp
|
||||
|
@ -30,16 +30,16 @@ namespace ix
|
||||
, _handshakeTimeoutSecs(kDefaultHandShakeTimeoutSecs)
|
||||
, _enablePong(kDefaultEnablePong)
|
||||
, _pingIntervalSecs(kDefaultPingIntervalSecs)
|
||||
, _webSocketMessage(std::make_unique<WebSocketMessage>(WebSocketMessageType::Message))
|
||||
, _webSocketErrorMessage(std::make_unique<WebSocketMessage>(WebSocketMessageType::Error))
|
||||
, _webSocketOpenMessage(std::make_unique<WebSocketMessage>(WebSocketMessageType::Open))
|
||||
, _webSocketCloseMessage(std::make_unique<WebSocketMessage>(WebSocketMessageType::Close))
|
||||
{
|
||||
_ws.setOnCloseCallback(
|
||||
[this](uint16_t code, const std::string& reason, size_t wireSize, bool remote) {
|
||||
_onMessageCallback(
|
||||
std::make_unique<WebSocketMessage>(WebSocketMessageType::Close,
|
||||
"",
|
||||
wireSize,
|
||||
WebSocketErrorInfo(),
|
||||
WebSocketOpenInfo(),
|
||||
WebSocketCloseInfo(code, reason, remote)));
|
||||
_webSocketCloseMessage->wireSize = wireSize;
|
||||
_webSocketCloseMessage->closeInfo = WebSocketCloseInfo(code, reason, remote);
|
||||
_onMessageCallback(_webSocketCloseMessage);
|
||||
});
|
||||
}
|
||||
|
||||
@ -193,13 +193,8 @@ namespace ix
|
||||
return status;
|
||||
}
|
||||
|
||||
_onMessageCallback(std::make_unique<WebSocketMessage>(
|
||||
WebSocketMessageType::Open,
|
||||
"",
|
||||
0,
|
||||
WebSocketErrorInfo(),
|
||||
WebSocketOpenInfo(status.uri, status.headers, status.protocol),
|
||||
WebSocketCloseInfo()));
|
||||
_webSocketOpenMessage->openInfo = WebSocketOpenInfo(status.uri, status.headers, status.protocol),
|
||||
_onMessageCallback(_webSocketOpenMessage);
|
||||
|
||||
if (_pingIntervalSecs > 0)
|
||||
{
|
||||
@ -224,13 +219,8 @@ namespace ix
|
||||
return status;
|
||||
}
|
||||
|
||||
_onMessageCallback(
|
||||
std::make_unique<WebSocketMessage>(WebSocketMessageType::Open,
|
||||
"",
|
||||
0,
|
||||
WebSocketErrorInfo(),
|
||||
WebSocketOpenInfo(status.uri, status.headers),
|
||||
WebSocketCloseInfo()));
|
||||
_webSocketOpenMessage->openInfo = WebSocketOpenInfo(status.uri, status.headers);
|
||||
_onMessageCallback(_webSocketOpenMessage);
|
||||
|
||||
if (_pingIntervalSecs > 0)
|
||||
{
|
||||
@ -310,12 +300,8 @@ namespace ix
|
||||
connectErr.reason = status.errorStr;
|
||||
connectErr.http_status = status.http_status;
|
||||
|
||||
_onMessageCallback(std::make_unique<WebSocketMessage>(WebSocketMessageType::Error,
|
||||
"",
|
||||
0,
|
||||
connectErr,
|
||||
WebSocketOpenInfo(),
|
||||
WebSocketCloseInfo()));
|
||||
_webSocketErrorMessage->errorInfo = connectErr;
|
||||
_onMessageCallback(_webSocketErrorMessage);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -381,18 +367,15 @@ namespace ix
|
||||
break;
|
||||
}
|
||||
|
||||
WebSocketErrorInfo webSocketErrorInfo;
|
||||
webSocketErrorInfo.decompressionError = decompressionError;
|
||||
|
||||
bool binary = messageKind == WebSocketTransport::MessageKind::MSG_BINARY;
|
||||
|
||||
_onMessageCallback(std::make_unique<WebSocketMessage>(webSocketMessageType,
|
||||
msg,
|
||||
wireSize,
|
||||
webSocketErrorInfo,
|
||||
WebSocketOpenInfo(),
|
||||
WebSocketCloseInfo(),
|
||||
binary));
|
||||
_webSocketMessage->type = webSocketMessageType;
|
||||
_webSocketMessage->str = msg;
|
||||
_webSocketMessage->wireSize = wireSize;
|
||||
_webSocketMessage->errorInfo.decompressionError = decompressionError;
|
||||
_webSocketMessage->binary = binary;
|
||||
|
||||
_onMessageCallback(_webSocketMessage);
|
||||
|
||||
WebSocket::invokeTrafficTrackerCallback(wireSize, true);
|
||||
});
|
||||
|
@ -155,6 +155,12 @@ namespace ix
|
||||
static const int kDefaultPingIntervalSecs;
|
||||
static const int kDefaultPingTimeoutSecs;
|
||||
|
||||
// One message ptr for each message kinds
|
||||
WebSocketMessagePtr _webSocketMessage;
|
||||
WebSocketMessagePtr _webSocketErrorMessage;
|
||||
WebSocketMessagePtr _webSocketOpenMessage;
|
||||
WebSocketMessagePtr _webSocketCloseMessage;
|
||||
|
||||
// Subprotocols
|
||||
std::vector<std::string> _subProtocols;
|
||||
|
||||
|
@ -6,6 +6,9 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
#include <string>
|
||||
|
||||
namespace ix
|
||||
{
|
||||
struct WebSocketCloseInfo
|
||||
|
@ -6,6 +6,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
#include <string>
|
||||
|
||||
namespace ix
|
||||
@ -17,5 +18,10 @@ namespace ix
|
||||
int http_status = 0;
|
||||
std::string reason;
|
||||
bool decompressionError = false;
|
||||
|
||||
WebSocketErrorInfo()
|
||||
{
|
||||
;
|
||||
}
|
||||
};
|
||||
} // namespace ix
|
||||
|
12
ixwebsocket/IXWebSocketMessage.cpp
Normal file
12
ixwebsocket/IXWebSocketMessage.cpp
Normal file
@ -0,0 +1,12 @@
|
||||
/*
|
||||
* IXWebSocketMessage.cpp
|
||||
* Author: Benjamin Sergeant
|
||||
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
|
||||
*/
|
||||
|
||||
#include "IXWebSocketMessage.h"
|
||||
|
||||
namespace ix
|
||||
{
|
||||
std::string WebSocketMessage::kStr("foo");
|
||||
}
|
@ -19,22 +19,23 @@ namespace ix
|
||||
struct WebSocketMessage
|
||||
{
|
||||
WebSocketMessageType type;
|
||||
const std::string& str;
|
||||
std::string& str;
|
||||
size_t wireSize;
|
||||
WebSocketErrorInfo errorInfo;
|
||||
WebSocketOpenInfo openInfo;
|
||||
WebSocketCloseInfo closeInfo;
|
||||
bool binary;
|
||||
|
||||
static std::string kStr;
|
||||
|
||||
WebSocketMessage(WebSocketMessageType t,
|
||||
const std::string& s,
|
||||
size_t w,
|
||||
WebSocketErrorInfo e,
|
||||
WebSocketOpenInfo o,
|
||||
WebSocketCloseInfo c,
|
||||
size_t w = 0,
|
||||
WebSocketErrorInfo e = WebSocketErrorInfo(),
|
||||
WebSocketOpenInfo o = WebSocketOpenInfo(),
|
||||
WebSocketCloseInfo c = WebSocketCloseInfo(),
|
||||
bool b = false)
|
||||
: type(t)
|
||||
, str(s)
|
||||
, str(WebSocketMessage::kStr)
|
||||
, wireSize(w)
|
||||
, errorInfo(e)
|
||||
, openInfo(o)
|
||||
@ -43,6 +44,11 @@ namespace ix
|
||||
{
|
||||
;
|
||||
}
|
||||
|
||||
// void setStr(const std::string& s)
|
||||
// {
|
||||
// str = std::move(s);
|
||||
// }
|
||||
};
|
||||
|
||||
using WebSocketMessagePtr = std::unique_ptr<WebSocketMessage>;
|
||||
|
@ -6,6 +6,10 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
#include <string>
|
||||
#include "IXWebSocketHttpHeaders.h"
|
||||
|
||||
namespace ix
|
||||
{
|
||||
struct WebSocketOpenInfo
|
||||
|
@ -515,8 +515,10 @@ namespace ix
|
||||
//
|
||||
if (ws.fin && _chunks.empty())
|
||||
{
|
||||
emitMessage(
|
||||
_fragmentedMessageKind, frameData, _receivedMessageCompressed, onMessageCallback);
|
||||
emitMessage(_fragmentedMessageKind,
|
||||
frameData,
|
||||
_receivedMessageCompressed,
|
||||
onMessageCallback);
|
||||
|
||||
_receivedMessageCompressed = false;
|
||||
}
|
||||
|
@ -6,4 +6,4 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#define IX_WEBSOCKET_VERSION "9.2.5"
|
||||
#define IX_WEBSOCKET_VERSION "9.2.6"
|
||||
|
@ -6,8 +6,8 @@
|
||||
|
||||
#include "IXTest.h"
|
||||
#include "catch.hpp"
|
||||
#include <iostream>
|
||||
#include "msgpack11.hpp"
|
||||
#include <iostream>
|
||||
#include <ixwebsocket/IXSocket.h>
|
||||
#include <ixwebsocket/IXSocketFactory.h>
|
||||
#include <ixwebsocket/IXWebSocket.h>
|
||||
@ -130,7 +130,8 @@ namespace
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Error)
|
||||
{
|
||||
ss << "websocket_broadcast_client: " << _user << " Error ! " << msg->errorInfo.reason;
|
||||
ss << "websocket_broadcast_client: " << _user << " Error ! "
|
||||
<< msg->errorInfo.reason;
|
||||
log(ss.str());
|
||||
}
|
||||
else if (msg->type == ix::WebSocketMessageType::Ping)
|
||||
@ -234,7 +235,7 @@ namespace
|
||||
server.start();
|
||||
return true;
|
||||
}
|
||||
} // namespace ix
|
||||
} // namespace
|
||||
|
||||
TEST_CASE("Websocket_broadcast_server", "[websocket_server]")
|
||||
{
|
||||
@ -247,7 +248,7 @@ TEST_CASE("Websocket_broadcast_server", "[websocket_server]")
|
||||
|
||||
std::string session = ix::generateSessionId();
|
||||
std::vector<std::shared_ptr<WebSocketChat>> chatClients;
|
||||
for (int i = 0 ; i < 10; ++i)
|
||||
for (int i = 0; i < 10; ++i)
|
||||
{
|
||||
std::string user("user_" + std::to_string(i));
|
||||
chatClients.push_back(std::make_shared<WebSocketChat>(user, session, port));
|
||||
@ -259,7 +260,7 @@ TEST_CASE("Websocket_broadcast_server", "[websocket_server]")
|
||||
while (true)
|
||||
{
|
||||
bool allReady = true;
|
||||
for (size_t i = 0 ; i < chatClients.size(); ++i)
|
||||
for (size_t i = 0; i < chatClients.size(); ++i)
|
||||
{
|
||||
allReady &= chatClients[i]->isReady();
|
||||
}
|
||||
@ -269,7 +270,7 @@ TEST_CASE("Websocket_broadcast_server", "[websocket_server]")
|
||||
|
||||
for (int j = 0; j < 1000; j++)
|
||||
{
|
||||
for (size_t i = 0 ; i < chatClients.size(); ++i)
|
||||
for (size_t i = 0; i < chatClients.size(); ++i)
|
||||
{
|
||||
chatClients[i]->sendMessage("hello world");
|
||||
}
|
||||
@ -291,7 +292,7 @@ TEST_CASE("Websocket_broadcast_server", "[websocket_server]")
|
||||
|
||||
// Stop all clients
|
||||
size_t messageCount = chatClients.size() * 50;
|
||||
for (size_t i = 0 ; i < chatClients.size(); ++i)
|
||||
for (size_t i = 0; i < chatClients.size(); ++i)
|
||||
{
|
||||
REQUIRE(chatClients[i]->getReceivedMessagesCount() >= messageCount);
|
||||
chatClients[i]->stop();
|
||||
|
11
ws/ws.cpp
11
ws/ws.cpp
@ -268,8 +268,10 @@ int main(int argc, char** argv)
|
||||
cobra2statsd->add_option("--port", statsdPort, "Statsd port");
|
||||
cobra2statsd->add_option("--prefix", prefix, "Statsd prefix");
|
||||
cobra2statsd->add_option("--fields", fields, "Extract fields for naming the event")->join();
|
||||
cobra2statsd->add_option("--gauge", gauge, "Value to extract, and use as a statsd gauge")->join();
|
||||
cobra2statsd->add_option("--timer", timer, "Value to extract, and use as a statsd timer")->join();
|
||||
cobra2statsd->add_option("--gauge", gauge, "Value to extract, and use as a statsd gauge")
|
||||
->join();
|
||||
cobra2statsd->add_option("--timer", timer, "Value to extract, and use as a statsd timer")
|
||||
->join();
|
||||
cobra2statsd->add_option("channel", channel, "Channel")->required();
|
||||
cobra2statsd->add_flag("-v", verbose, "Verbose");
|
||||
cobra2statsd->add_option("--pidfile", pidfile, "Pid file");
|
||||
@ -449,7 +451,8 @@ int main(int argc, char** argv)
|
||||
}
|
||||
else if (app.got_subcommand("cobra_subscribe"))
|
||||
{
|
||||
ret = ix::ws_cobra_subscribe_main(cobraConfig, channel, filter, position, quiet, fluentd, runtime);
|
||||
ret = ix::ws_cobra_subscribe_main(
|
||||
cobraConfig, channel, filter, position, quiet, fluentd, runtime);
|
||||
}
|
||||
else if (app.got_subcommand("cobra_publish"))
|
||||
{
|
||||
@ -463,7 +466,7 @@ int main(int argc, char** argv)
|
||||
{
|
||||
if (!timer.empty() && !gauge.empty())
|
||||
{
|
||||
spdlog::error("--gauge and --timer options are exclusive. " \
|
||||
spdlog::error("--gauge and --timer options are exclusive. "
|
||||
"you can only supply one");
|
||||
ret = 1;
|
||||
}
|
||||
|
@ -41,7 +41,7 @@ namespace ix
|
||||
enveloppe["message"] = msgWithPosition;
|
||||
|
||||
jsonWriter->write(enveloppe, &std::cout);
|
||||
std::cout << std::endl; // add lf and flush
|
||||
std::cout << std::endl; // add lf and flush
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -184,7 +184,7 @@ namespace ix
|
||||
// Run for a duration, used by unittesting now
|
||||
else
|
||||
{
|
||||
for (int i = 0 ; i < runtime; ++i)
|
||||
for (int i = 0; i < runtime; ++i)
|
||||
{
|
||||
auto duration = std::chrono::seconds(1);
|
||||
std::this_thread::sleep_for(duration);
|
||||
|
Loading…
Reference in New Issue
Block a user