Compare commits
	
		
			4 Commits
		
	
	
		
			v9.9.0
			...
			feature/st
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
						 | 
					ca829a3a98 | ||
| 
						 | 
					26a1e63626 | ||
| 
						 | 
					c98959b895 | ||
| 
						 | 
					baf18648e9 | 
@@ -7,12 +7,14 @@ set (IXSNAKE_SOURCES
 | 
				
			|||||||
    ixsnake/IXSnakeServer.cpp
 | 
					    ixsnake/IXSnakeServer.cpp
 | 
				
			||||||
    ixsnake/IXSnakeProtocol.cpp
 | 
					    ixsnake/IXSnakeProtocol.cpp
 | 
				
			||||||
    ixsnake/IXAppConfig.cpp
 | 
					    ixsnake/IXAppConfig.cpp
 | 
				
			||||||
 | 
					    ixsnake/IXStreamSql.cpp
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
set (IXSNAKE_HEADERS
 | 
					set (IXSNAKE_HEADERS
 | 
				
			||||||
    ixsnake/IXSnakeServer.h
 | 
					    ixsnake/IXSnakeServer.h
 | 
				
			||||||
    ixsnake/IXSnakeProtocol.h
 | 
					    ixsnake/IXSnakeProtocol.h
 | 
				
			||||||
    ixsnake/IXAppConfig.h
 | 
					    ixsnake/IXAppConfig.h
 | 
				
			||||||
 | 
					    ixsnake/IXStreamSql.h
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
add_library(ixsnake STATIC
 | 
					add_library(ixsnake STATIC
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -33,6 +33,9 @@ namespace snake
 | 
				
			|||||||
        // Misc
 | 
					        // Misc
 | 
				
			||||||
        bool verbose;
 | 
					        bool verbose;
 | 
				
			||||||
        bool disablePong;
 | 
					        bool disablePong;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        // If non empty, every published message gets republished to a given channel
 | 
				
			||||||
 | 
					        std::string republishChannel;
 | 
				
			||||||
    };
 | 
					    };
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    bool isAppKeyValid(const AppConfig& appConfig, std::string appkey);
 | 
					    bool isAppKeyValid(const AppConfig& appConfig, std::string appkey);
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -30,6 +30,7 @@ namespace snake
 | 
				
			|||||||
        {
 | 
					        {
 | 
				
			||||||
            return _appkey;
 | 
					            return _appkey;
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        void setAppkey(const std::string& appkey)
 | 
					        void setAppkey(const std::string& appkey)
 | 
				
			||||||
        {
 | 
					        {
 | 
				
			||||||
            _appkey = appkey;
 | 
					            _appkey = appkey;
 | 
				
			||||||
@@ -39,6 +40,7 @@ namespace snake
 | 
				
			|||||||
        {
 | 
					        {
 | 
				
			||||||
            return _role;
 | 
					            return _role;
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        void setRole(const std::string& role)
 | 
					        void setRole(const std::string& role)
 | 
				
			||||||
        {
 | 
					        {
 | 
				
			||||||
            _role = role;
 | 
					            _role = role;
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -8,6 +8,7 @@
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
#include "IXAppConfig.h"
 | 
					#include "IXAppConfig.h"
 | 
				
			||||||
#include "IXSnakeConnectionState.h"
 | 
					#include "IXSnakeConnectionState.h"
 | 
				
			||||||
 | 
					#include "IXStreamSql.h"
 | 
				
			||||||
#include "nlohmann/json.hpp"
 | 
					#include "nlohmann/json.hpp"
 | 
				
			||||||
#include <iostream>
 | 
					#include <iostream>
 | 
				
			||||||
#include <ixcore/utils/IXCoreLogger.h>
 | 
					#include <ixcore/utils/IXCoreLogger.h>
 | 
				
			||||||
@@ -91,6 +92,7 @@ namespace snake
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    void handlePublish(std::shared_ptr<SnakeConnectionState> state,
 | 
					    void handlePublish(std::shared_ptr<SnakeConnectionState> state,
 | 
				
			||||||
                       std::shared_ptr<ix::WebSocket> ws,
 | 
					                       std::shared_ptr<ix::WebSocket> ws,
 | 
				
			||||||
 | 
					                       const AppConfig& appConfig,
 | 
				
			||||||
                       const nlohmann::json& pdu)
 | 
					                       const nlohmann::json& pdu)
 | 
				
			||||||
    {
 | 
					    {
 | 
				
			||||||
        std::vector<std::string> channels;
 | 
					        std::vector<std::string> channels;
 | 
				
			||||||
@@ -115,6 +117,12 @@ namespace snake
 | 
				
			|||||||
            return;
 | 
					            return;
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        // add an extra channel if the config has one specified
 | 
				
			||||||
 | 
					        if (!appConfig.republishChannel.empty())
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
 | 
					            channels.push_back(appConfig.republishChannel);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        for (auto&& channel : channels)
 | 
					        for (auto&& channel : channels)
 | 
				
			||||||
        {
 | 
					        {
 | 
				
			||||||
            std::stringstream ss;
 | 
					            std::stringstream ss;
 | 
				
			||||||
@@ -180,12 +188,25 @@ namespace snake
 | 
				
			|||||||
            }
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        std::string filterStr;
 | 
				
			||||||
 | 
					        if (pdu["body"].find("filter") != pdu["body"].end())
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
 | 
					            std::string filterStr = pdu["body"]["filter"];
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        std::unique_ptr<StreamSql> streamSql = std::make_unique<StreamSql>(filterStr);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        int id = 0;
 | 
					        int id = 0;
 | 
				
			||||||
        auto callback = [ws, &id, &subscriptionId](const std::string& messageStr) {
 | 
					        auto callback = [ws, &id, &subscriptionId, &streamSql](const std::string& messageStr) {
 | 
				
			||||||
            auto msg = nlohmann::json::parse(messageStr);
 | 
					            auto msg = nlohmann::json::parse(messageStr);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            msg = msg["body"]["message"];
 | 
					            msg = msg["body"]["message"];
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            if (streamSql->valid() && !streamSql->match(msg))
 | 
				
			||||||
 | 
					            {
 | 
				
			||||||
 | 
					                return;
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            nlohmann::json response = {
 | 
					            nlohmann::json response = {
 | 
				
			||||||
                {"action", "rtm/subscription/data"},
 | 
					                {"action", "rtm/subscription/data"},
 | 
				
			||||||
                {"id", id++},
 | 
					                {"id", id++},
 | 
				
			||||||
@@ -279,7 +300,7 @@ namespace snake
 | 
				
			|||||||
        }
 | 
					        }
 | 
				
			||||||
        else if (action == "rtm/publish")
 | 
					        else if (action == "rtm/publish")
 | 
				
			||||||
        {
 | 
					        {
 | 
				
			||||||
            handlePublish(state, ws, pdu);
 | 
					            handlePublish(state, ws, appConfig, pdu);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        else if (action == "rtm/subscribe")
 | 
					        else if (action == "rtm/subscribe")
 | 
				
			||||||
        {
 | 
					        {
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										63
									
								
								ixsnake/ixsnake/IXStreamSql.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										63
									
								
								ixsnake/ixsnake/IXStreamSql.cpp
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,63 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					 *  IXStreamSql.cpp
 | 
				
			||||||
 | 
					 *  Author: Benjamin Sergeant
 | 
				
			||||||
 | 
					 *  Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 *  Super simple hacked up version of a stream sql expression,
 | 
				
			||||||
 | 
					 *  that only supports non nested field evaluation
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#include "IXStreamSql.h"
 | 
				
			||||||
 | 
					#include <sstream>
 | 
				
			||||||
 | 
					#include <iostream>
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					namespace snake
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					    StreamSql::StreamSql(const std::string& sqlFilter)
 | 
				
			||||||
 | 
					        : _valid(false)
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        std::string token;
 | 
				
			||||||
 | 
					        std::stringstream tokenStream(sqlFilter);
 | 
				
			||||||
 | 
					        std::vector<std::string> tokens;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        // Split by ' '
 | 
				
			||||||
 | 
					        while (std::getline(tokenStream, token, ' '))
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
 | 
					            tokens.push_back(token);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        _valid = tokens.size() == 8;
 | 
				
			||||||
 | 
					        if (!_valid) return;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        _field = tokens[5];
 | 
				
			||||||
 | 
					        _operator = tokens[6];
 | 
				
			||||||
 | 
					        _value = tokens[7];
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        // remove single quotes
 | 
				
			||||||
 | 
					        _value = _value.substr(1, _value.size() - 2);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if (_operator == "LIKE")
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
 | 
					            _value = _value.substr(1, _value.size() - 2);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    bool StreamSql::valid() const
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        return _valid;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    bool StreamSql::match(const nlohmann::json& msg)
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        if (!_valid) return false;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if (msg.find(_field) == msg.end())
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
 | 
					            return false;
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        std::string value = msg[_field];
 | 
				
			||||||
 | 
					        return value == _value;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					} // namespace snake
 | 
				
			||||||
							
								
								
									
										29
									
								
								ixsnake/ixsnake/IXStreamSql.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										29
									
								
								ixsnake/ixsnake/IXStreamSql.h
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,29 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					 *  IXStreamSql.h
 | 
				
			||||||
 | 
					 *  Author: Benjamin Sergeant
 | 
				
			||||||
 | 
					 *  Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#pragma once
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#include <string>
 | 
				
			||||||
 | 
					#include "nlohmann/json.hpp"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					namespace snake
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					    class StreamSql
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					    public:
 | 
				
			||||||
 | 
					        StreamSql(const std::string& sqlFilter = std::string());
 | 
				
			||||||
 | 
					        ~StreamSql() = default;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        bool match(const nlohmann::json& msg);
 | 
				
			||||||
 | 
					        bool valid() const;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    private:
 | 
				
			||||||
 | 
					        std::string _field;
 | 
				
			||||||
 | 
					        std::string _operator;
 | 
				
			||||||
 | 
					        std::string _value;
 | 
				
			||||||
 | 
					        bool _valid;
 | 
				
			||||||
 | 
					    };
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -42,6 +42,7 @@ set (SOURCES
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
  IXSocketTest.cpp
 | 
					  IXSocketTest.cpp
 | 
				
			||||||
  IXSocketConnectTest.cpp
 | 
					  IXSocketConnectTest.cpp
 | 
				
			||||||
 | 
					  # IXWebSocketLeakTest.cpp # commented until we have a fix for #224
 | 
				
			||||||
  IXWebSocketServerTest.cpp
 | 
					  IXWebSocketServerTest.cpp
 | 
				
			||||||
  IXWebSocketTestConnectionDisconnection.cpp
 | 
					  IXWebSocketTestConnectionDisconnection.cpp
 | 
				
			||||||
  IXUrlParserTest.cpp
 | 
					  IXUrlParserTest.cpp
 | 
				
			||||||
@@ -56,6 +57,7 @@ set (SOURCES
 | 
				
			|||||||
  IXWebSocketChatTest.cpp
 | 
					  IXWebSocketChatTest.cpp
 | 
				
			||||||
  IXWebSocketBroadcastTest.cpp
 | 
					  IXWebSocketBroadcastTest.cpp
 | 
				
			||||||
  IXWebSocketPerMessageDeflateCompressorTest.cpp
 | 
					  IXWebSocketPerMessageDeflateCompressorTest.cpp
 | 
				
			||||||
 | 
					  IXStreamSqlTest.cpp
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# Some unittest don't work on windows yet
 | 
					# Some unittest don't work on windows yet
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -92,6 +92,9 @@ TEST_CASE("Cobra_to_stdout_bot", "[cobra_bots]")
 | 
				
			|||||||
        cobraBotConfig.enableHeartbeat = false;
 | 
					        cobraBotConfig.enableHeartbeat = false;
 | 
				
			||||||
        bool quiet = false;
 | 
					        bool quiet = false;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        cobraBotConfig.filter =
 | 
				
			||||||
 | 
					            std::string("select * from `") + channel + "` where id = 'sms_metric_A_id'";
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        // We could try to capture the output ... not sure how.
 | 
					        // We could try to capture the output ... not sure how.
 | 
				
			||||||
        bool fluentd = true;
 | 
					        bool fluentd = true;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										42
									
								
								test/IXStreamSqlTest.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										42
									
								
								test/IXStreamSqlTest.cpp
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,42 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					 *  IXStreamSqlTest.cpp
 | 
				
			||||||
 | 
					 *  Author: Benjamin Sergeant
 | 
				
			||||||
 | 
					 *  Copyright (c) 2020 Machine Zone. All rights reserved.
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#include "IXTest.h"
 | 
				
			||||||
 | 
					#include "catch.hpp"
 | 
				
			||||||
 | 
					#include <iostream>
 | 
				
			||||||
 | 
					#include <ixsnake/IXStreamSql.h>
 | 
				
			||||||
 | 
					#include <string.h>
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					using namespace ix;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					namespace ix
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					    TEST_CASE("stream_sql", "[streamsql]")
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        SECTION("expression A")
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
 | 
					            snake::StreamSql streamSql(
 | 
				
			||||||
 | 
					                "select * from subscriber_republished_v1_neo where session LIKE '%123456%'");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            nlohmann::json msg = {{"session", "123456"}, {"id", "foo_id"}, {"timestamp", 12}};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            CHECK(streamSql.match(msg));
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        SECTION("expression A")
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
 | 
					            snake::StreamSql streamSql("select * from `subscriber_republished_v1_neo` where "
 | 
				
			||||||
 | 
					                                       "session = '30091320ed8d4e50b758f8409b83bed7'");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            nlohmann::json msg = {{"session", "30091320ed8d4e50b758f8409b83bed7"},
 | 
				
			||||||
 | 
					                                  {"id", "foo_id"},
 | 
				
			||||||
 | 
					                                  {"timestamp", 12}};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            CHECK(streamSql.match(msg));
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					} // namespace ix
 | 
				
			||||||
							
								
								
									
										182
									
								
								test/IXWebSocketLeakTest.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										182
									
								
								test/IXWebSocketLeakTest.cpp
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,182 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					 *  IXWebSocketServer.cpp
 | 
				
			||||||
 | 
					 *  Author: Benjamin Sergeant, @marcelkauf
 | 
				
			||||||
 | 
					 *  Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#include "IXTest.h"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#include "catch.hpp"
 | 
				
			||||||
 | 
					#include <memory>
 | 
				
			||||||
 | 
					#include <sstream>
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#include <ixwebsocket/IXWebSocket.h>
 | 
				
			||||||
 | 
					#include <ixwebsocket/IXWebSocketServer.h>
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					using namespace ix;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					namespace
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					    class WebSocketClient
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					    public:
 | 
				
			||||||
 | 
					        WebSocketClient(int port);
 | 
				
			||||||
 | 
					        void start();
 | 
				
			||||||
 | 
					        void stop();
 | 
				
			||||||
 | 
					        bool isReady() const;
 | 
				
			||||||
 | 
					        bool hasConnectionError() const;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    private:
 | 
				
			||||||
 | 
					        ix::WebSocket _webSocket;
 | 
				
			||||||
 | 
					        int _port;
 | 
				
			||||||
 | 
					        std::atomic<bool> _connectionError;
 | 
				
			||||||
 | 
					    };
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    WebSocketClient::WebSocketClient(int port)
 | 
				
			||||||
 | 
					        : _port(port)
 | 
				
			||||||
 | 
					        , _connectionError(false)
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    bool WebSocketClient::hasConnectionError() const
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        return _connectionError;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    bool WebSocketClient::isReady() const
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        return _webSocket.getReadyState() == ix::ReadyState::Open;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    void WebSocketClient::stop()
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        _webSocket.stop();
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    void WebSocketClient::start()
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        std::string url;
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
 | 
					            std::stringstream ss;
 | 
				
			||||||
 | 
					            ss << "ws://localhost:" << _port << "/";
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            url = ss.str();
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        _webSocket.setUrl(url);
 | 
				
			||||||
 | 
					        _webSocket.disableAutomaticReconnection();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        std::stringstream ss;
 | 
				
			||||||
 | 
					        log(std::string("Connecting to url: ") + url);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        _webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg)
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
 | 
					            std::stringstream ss;
 | 
				
			||||||
 | 
					            if (msg->type == ix::WebSocketMessageType::Open)
 | 
				
			||||||
 | 
					            {
 | 
				
			||||||
 | 
					                log("client connected");
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					            else if (msg->type == ix::WebSocketMessageType::Close)
 | 
				
			||||||
 | 
					            {
 | 
				
			||||||
 | 
					                log("client disconnected");
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					            else if (msg->type == ix::WebSocketMessageType::Error)
 | 
				
			||||||
 | 
					            {
 | 
				
			||||||
 | 
					                _connectionError = true;
 | 
				
			||||||
 | 
					                log("error");
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					            else if (msg->type == ix::WebSocketMessageType::Pong)
 | 
				
			||||||
 | 
					            {
 | 
				
			||||||
 | 
					                log("pong");
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					            else if (msg->type == ix::WebSocketMessageType::Ping)
 | 
				
			||||||
 | 
					            {
 | 
				
			||||||
 | 
					                log("ping");
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					            else if (msg->type == ix::WebSocketMessageType::Message)
 | 
				
			||||||
 | 
					            {
 | 
				
			||||||
 | 
					                log("message");
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					            else
 | 
				
			||||||
 | 
					            {
 | 
				
			||||||
 | 
					                log("invalid type");
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        });
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        _webSocket.start();
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					} // namespace
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					TEST_CASE("Websocket leak test")
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					    SECTION("Websocket destructor is called when closing the connection.")
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        // stores the server websocket in order to check the use_count
 | 
				
			||||||
 | 
					        std::shared_ptr<WebSocket> webSocketPtr;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
 | 
					            int port = getFreePort();
 | 
				
			||||||
 | 
					            WebSocketServer server(port);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            server.setOnConnectionCallback([&webSocketPtr](std::shared_ptr<ix::WebSocket> webSocket,
 | 
				
			||||||
 | 
					                                                           std::shared_ptr<ConnectionState> connectionState,
 | 
				
			||||||
 | 
					                                                           std::unique_ptr<ConnectionInfo> connectionInfo)
 | 
				
			||||||
 | 
					           {
 | 
				
			||||||
 | 
					                // original ptr in WebSocketServer::handleConnection and the callback argument
 | 
				
			||||||
 | 
					                REQUIRE(webSocket.use_count() == 2);
 | 
				
			||||||
 | 
					                webSocket->setOnMessageCallback([&webSocketPtr, webSocket, connectionState](const ix::WebSocketMessagePtr& msg)
 | 
				
			||||||
 | 
					                {
 | 
				
			||||||
 | 
					                    if (msg->type == ix::WebSocketMessageType::Open)
 | 
				
			||||||
 | 
					                    {
 | 
				
			||||||
 | 
					                        log(std::string("New connection id: ") + connectionState->getId());
 | 
				
			||||||
 | 
					                        // original ptr in WebSocketServer::handleConnection, captured ptr of this callback, and ptr in WebSocketServer::_clients
 | 
				
			||||||
 | 
					                        REQUIRE(webSocket.use_count() == 3);
 | 
				
			||||||
 | 
					                        webSocketPtr = std::shared_ptr<WebSocket>(webSocket);
 | 
				
			||||||
 | 
					                        REQUIRE(webSocket.use_count() == 4);
 | 
				
			||||||
 | 
					                    }
 | 
				
			||||||
 | 
					                    else if (msg->type == ix::WebSocketMessageType::Close)
 | 
				
			||||||
 | 
					                    {
 | 
				
			||||||
 | 
					                        log(std::string("Client closed connection id: ") + connectionState->getId());
 | 
				
			||||||
 | 
					                    }
 | 
				
			||||||
 | 
					                    else
 | 
				
			||||||
 | 
					                    {
 | 
				
			||||||
 | 
					                        log(std::string(msg->str));
 | 
				
			||||||
 | 
					                    }
 | 
				
			||||||
 | 
					                });
 | 
				
			||||||
 | 
					                // original ptr in WebSocketServer::handleConnection, argument of this callback, and captured ptr in websocket callback
 | 
				
			||||||
 | 
					                REQUIRE(webSocket.use_count() == 3);
 | 
				
			||||||
 | 
					            });
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            server.listen();
 | 
				
			||||||
 | 
					            server.start();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            WebSocketClient webSocketClient(port);
 | 
				
			||||||
 | 
					            webSocketClient.start();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            while (true)
 | 
				
			||||||
 | 
					            {
 | 
				
			||||||
 | 
					                REQUIRE(!webSocketClient.hasConnectionError());
 | 
				
			||||||
 | 
					                if (webSocketClient.isReady()) break;
 | 
				
			||||||
 | 
					                ix::msleep(10);
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            REQUIRE(server.getClients().size() == 1);
 | 
				
			||||||
 | 
					            // same value as in Open-handler above
 | 
				
			||||||
 | 
					            REQUIRE(webSocketPtr.use_count() == 4);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            ix::msleep(500);
 | 
				
			||||||
 | 
					            webSocketClient.stop();
 | 
				
			||||||
 | 
					            ix::msleep(500);
 | 
				
			||||||
 | 
					            REQUIRE(server.getClients().size() == 0);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            // websocket should only be referenced by webSocketPtr but is still used by the websocket callback
 | 
				
			||||||
 | 
					            REQUIRE(webSocketPtr.use_count() == 1);
 | 
				
			||||||
 | 
					            webSocketPtr->setOnMessageCallback(nullptr);
 | 
				
			||||||
 | 
					            // websocket should only be referenced by webSocketPtr
 | 
				
			||||||
 | 
					            REQUIRE(webSocketPtr.use_count() == 1);
 | 
				
			||||||
 | 
					            server.stop();
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        // websocket should only be referenced by webSocketPtr
 | 
				
			||||||
 | 
					        REQUIRE(webSocketPtr.use_count() == 1);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -122,6 +122,7 @@ int main(int argc, char** argv)
 | 
				
			|||||||
    std::string key;
 | 
					    std::string key;
 | 
				
			||||||
    std::string logfile;
 | 
					    std::string logfile;
 | 
				
			||||||
    std::string scriptPath;
 | 
					    std::string scriptPath;
 | 
				
			||||||
 | 
					    std::string republishChannel;
 | 
				
			||||||
    ix::SocketTLSOptions tlsOptions;
 | 
					    ix::SocketTLSOptions tlsOptions;
 | 
				
			||||||
    ix::CobraConfig cobraConfig;
 | 
					    ix::CobraConfig cobraConfig;
 | 
				
			||||||
    ix::CobraBotConfig cobraBotConfig;
 | 
					    ix::CobraBotConfig cobraBotConfig;
 | 
				
			||||||
@@ -391,6 +392,7 @@ int main(int argc, char** argv)
 | 
				
			|||||||
    snakeApp->add_option("--redis_password", redisPassword, "Redis password");
 | 
					    snakeApp->add_option("--redis_password", redisPassword, "Redis password");
 | 
				
			||||||
    snakeApp->add_option("--apps_config_path", appsConfigPath, "Path to auth data")
 | 
					    snakeApp->add_option("--apps_config_path", appsConfigPath, "Path to auth data")
 | 
				
			||||||
        ->check(CLI::ExistingPath);
 | 
					        ->check(CLI::ExistingPath);
 | 
				
			||||||
 | 
					    snakeApp->add_option("--republish_channel", republishChannel, "Republish channel");
 | 
				
			||||||
    snakeApp->add_flag("-v", verbose, "Verbose");
 | 
					    snakeApp->add_flag("-v", verbose, "Verbose");
 | 
				
			||||||
    snakeApp->add_flag("-d", disablePong, "Disable Pongs");
 | 
					    snakeApp->add_flag("-d", disablePong, "Disable Pongs");
 | 
				
			||||||
    addTLSOptions(snakeApp);
 | 
					    addTLSOptions(snakeApp);
 | 
				
			||||||
@@ -637,7 +639,8 @@ int main(int argc, char** argv)
 | 
				
			|||||||
                                verbose,
 | 
					                                verbose,
 | 
				
			||||||
                                appsConfigPath,
 | 
					                                appsConfigPath,
 | 
				
			||||||
                                tlsOptions,
 | 
					                                tlsOptions,
 | 
				
			||||||
                                disablePong);
 | 
					                                disablePong,
 | 
				
			||||||
 | 
					                                republishChannel);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    else if (app.got_subcommand("httpd"))
 | 
					    else if (app.got_subcommand("httpd"))
 | 
				
			||||||
    {
 | 
					    {
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										3
									
								
								ws/ws.h
									
									
									
									
									
								
							
							
						
						
									
										3
									
								
								ws/ws.h
									
									
									
									
									
								
							@@ -103,7 +103,8 @@ namespace ix
 | 
				
			|||||||
                      bool verbose,
 | 
					                      bool verbose,
 | 
				
			||||||
                      const std::string& appsConfigPath,
 | 
					                      const std::string& appsConfigPath,
 | 
				
			||||||
                      const ix::SocketTLSOptions& tlsOptions,
 | 
					                      const ix::SocketTLSOptions& tlsOptions,
 | 
				
			||||||
                      bool disablePong);
 | 
					                      bool disablePong,
 | 
				
			||||||
 | 
					                      const std::string& republishChannel);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    int ws_httpd_main(int port,
 | 
					    int ws_httpd_main(int port,
 | 
				
			||||||
                      const std::string& hostname,
 | 
					                      const std::string& hostname,
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -45,7 +45,8 @@ namespace ix
 | 
				
			|||||||
                      bool verbose,
 | 
					                      bool verbose,
 | 
				
			||||||
                      const std::string& appsConfigPath,
 | 
					                      const std::string& appsConfigPath,
 | 
				
			||||||
                      const SocketTLSOptions& socketTLSOptions,
 | 
					                      const SocketTLSOptions& socketTLSOptions,
 | 
				
			||||||
                      bool disablePong)
 | 
					                      bool disablePong,
 | 
				
			||||||
 | 
					                      const std::string& republishChannel)
 | 
				
			||||||
    {
 | 
					    {
 | 
				
			||||||
        snake::AppConfig appConfig;
 | 
					        snake::AppConfig appConfig;
 | 
				
			||||||
        appConfig.port = port;
 | 
					        appConfig.port = port;
 | 
				
			||||||
@@ -55,6 +56,7 @@ namespace ix
 | 
				
			|||||||
        appConfig.redisPassword = redisPassword;
 | 
					        appConfig.redisPassword = redisPassword;
 | 
				
			||||||
        appConfig.socketTLSOptions = socketTLSOptions;
 | 
					        appConfig.socketTLSOptions = socketTLSOptions;
 | 
				
			||||||
        appConfig.disablePong = disablePong;
 | 
					        appConfig.disablePong = disablePong;
 | 
				
			||||||
 | 
					        appConfig.republishChannel = republishChannel;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        // Parse config file
 | 
					        // Parse config file
 | 
				
			||||||
        auto str = readAsString(appsConfigPath);
 | 
					        auto str = readAsString(appsConfigPath);
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user