Compare commits
	
		
			8 Commits
		
	
	
		
			v9.9.0
			...
			feature/no
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | f7031d0d3e | ||
|  | 595e6c57df | ||
|  | 87709c201e | ||
|  | e70d83ace1 | ||
|  | ca829a3a98 | ||
|  | 26a1e63626 | ||
|  | c98959b895 | ||
|  | baf18648e9 | 
| @@ -36,6 +36,8 @@ set( IXWEBSOCKET_SOURCES | ||||
|     ixwebsocket/IXNetSystem.cpp | ||||
|     ixwebsocket/IXSelectInterrupt.cpp | ||||
|     ixwebsocket/IXSelectInterruptFactory.cpp | ||||
|     ixwebsocket/IXSelectInterruptPipe.cpp | ||||
|     ixwebsocket/IXSetThreadName.cpp | ||||
|     ixwebsocket/IXSocket.cpp | ||||
|     ixwebsocket/IXSocketConnect.cpp | ||||
|     ixwebsocket/IXSocketFactory.cpp | ||||
| @@ -69,6 +71,7 @@ set( IXWEBSOCKET_HEADERS | ||||
|     ixwebsocket/IXProgressCallback.h | ||||
|     ixwebsocket/IXSelectInterrupt.h | ||||
|     ixwebsocket/IXSelectInterruptFactory.h | ||||
|     ixwebsocket/IXSelectInterruptPipe.h | ||||
|     ixwebsocket/IXSetThreadName.h | ||||
|     ixwebsocket/IXSocket.h | ||||
|     ixwebsocket/IXSocketConnect.h | ||||
| @@ -99,23 +102,6 @@ set( IXWEBSOCKET_HEADERS | ||||
|     ixwebsocket/IXWebSocketVersion.h | ||||
| ) | ||||
|  | ||||
| if (UNIX) | ||||
|     # Linux, Mac, iOS, Android | ||||
|     list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/IXSelectInterruptPipe.cpp ) | ||||
|     list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/IXSelectInterruptPipe.h ) | ||||
| endif() | ||||
|  | ||||
| # Platform specific code | ||||
| if (APPLE) | ||||
|     list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/apple/IXSetThreadName_apple.cpp) | ||||
| elseif (WIN32) | ||||
|     list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/windows/IXSetThreadName_windows.cpp) | ||||
| elseif (${CMAKE_SYSTEM_NAME} MATCHES "FreeBSD") | ||||
|     list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/freebsd/IXSetThreadName_freebsd.cpp) | ||||
| else() | ||||
|     list( APPEND IXWEBSOCKET_SOURCES ixwebsocket/linux/IXSetThreadName_linux.cpp) | ||||
| endif() | ||||
|  | ||||
| option(USE_TLS "Enable TLS support" FALSE) | ||||
|  | ||||
| if (USE_TLS) | ||||
|   | ||||
| @@ -1,6 +1,14 @@ | ||||
| # Changelog | ||||
| All changes to this project will be documented in this file. | ||||
|  | ||||
| ## [9.9.2] - 2020-07-10 | ||||
|  | ||||
| (socket server) bump default max connection count from 32 to 128 | ||||
|  | ||||
| ## [9.9.1] - 2020-07-10 | ||||
|  | ||||
| (snake) implement super simple stream sql expression support in snake server | ||||
|  | ||||
| ## [9.9.0] - 2020-07-08 | ||||
|  | ||||
| (socket+websocket+http+redis+snake servers) expose the remote ip and remote port when a new connection is made | ||||
|   | ||||
| @@ -7,12 +7,14 @@ set (IXSNAKE_SOURCES | ||||
|     ixsnake/IXSnakeServer.cpp | ||||
|     ixsnake/IXSnakeProtocol.cpp | ||||
|     ixsnake/IXAppConfig.cpp | ||||
|     ixsnake/IXStreamSql.cpp | ||||
| ) | ||||
|  | ||||
| set (IXSNAKE_HEADERS | ||||
|     ixsnake/IXSnakeServer.h | ||||
|     ixsnake/IXSnakeProtocol.h | ||||
|     ixsnake/IXAppConfig.h | ||||
|     ixsnake/IXStreamSql.h | ||||
| ) | ||||
|  | ||||
| add_library(ixsnake STATIC | ||||
|   | ||||
| @@ -33,6 +33,9 @@ namespace snake | ||||
|         // Misc | ||||
|         bool verbose; | ||||
|         bool disablePong; | ||||
|  | ||||
|         // If non empty, every published message gets republished to a given channel | ||||
|         std::string republishChannel; | ||||
|     }; | ||||
|  | ||||
|     bool isAppKeyValid(const AppConfig& appConfig, std::string appkey); | ||||
|   | ||||
| @@ -30,6 +30,7 @@ namespace snake | ||||
|         { | ||||
|             return _appkey; | ||||
|         } | ||||
|  | ||||
|         void setAppkey(const std::string& appkey) | ||||
|         { | ||||
|             _appkey = appkey; | ||||
| @@ -39,6 +40,7 @@ namespace snake | ||||
|         { | ||||
|             return _role; | ||||
|         } | ||||
|  | ||||
|         void setRole(const std::string& role) | ||||
|         { | ||||
|             _role = role; | ||||
|   | ||||
| @@ -8,6 +8,7 @@ | ||||
|  | ||||
| #include "IXAppConfig.h" | ||||
| #include "IXSnakeConnectionState.h" | ||||
| #include "IXStreamSql.h" | ||||
| #include "nlohmann/json.hpp" | ||||
| #include <iostream> | ||||
| #include <ixcore/utils/IXCoreLogger.h> | ||||
| @@ -91,6 +92,7 @@ namespace snake | ||||
|  | ||||
|     void handlePublish(std::shared_ptr<SnakeConnectionState> state, | ||||
|                        std::shared_ptr<ix::WebSocket> ws, | ||||
|                        const AppConfig& appConfig, | ||||
|                        const nlohmann::json& pdu) | ||||
|     { | ||||
|         std::vector<std::string> channels; | ||||
| @@ -115,6 +117,12 @@ namespace snake | ||||
|             return; | ||||
|         } | ||||
|  | ||||
|         // add an extra channel if the config has one specified | ||||
|         if (!appConfig.republishChannel.empty()) | ||||
|         { | ||||
|             channels.push_back(appConfig.republishChannel); | ||||
|         } | ||||
|  | ||||
|         for (auto&& channel : channels) | ||||
|         { | ||||
|             std::stringstream ss; | ||||
| @@ -180,12 +188,25 @@ namespace snake | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         std::string filterStr; | ||||
|         if (pdu["body"].find("filter") != pdu["body"].end()) | ||||
|         { | ||||
|             std::string filterStr = pdu["body"]["filter"]; | ||||
|         } | ||||
|  | ||||
|         std::unique_ptr<StreamSql> streamSql = std::make_unique<StreamSql>(filterStr); | ||||
|  | ||||
|         int id = 0; | ||||
|         auto callback = [ws, &id, &subscriptionId](const std::string& messageStr) { | ||||
|         auto callback = [ws, &id, &subscriptionId, &streamSql](const std::string& messageStr) { | ||||
|             auto msg = nlohmann::json::parse(messageStr); | ||||
|  | ||||
|             msg = msg["body"]["message"]; | ||||
|  | ||||
|             if (streamSql->valid() && !streamSql->match(msg)) | ||||
|             { | ||||
|                 return; | ||||
|             } | ||||
|  | ||||
|             nlohmann::json response = { | ||||
|                 {"action", "rtm/subscription/data"}, | ||||
|                 {"id", id++}, | ||||
| @@ -279,7 +300,7 @@ namespace snake | ||||
|         } | ||||
|         else if (action == "rtm/publish") | ||||
|         { | ||||
|             handlePublish(state, ws, pdu); | ||||
|             handlePublish(state, ws, appConfig, pdu); | ||||
|         } | ||||
|         else if (action == "rtm/subscribe") | ||||
|         { | ||||
|   | ||||
							
								
								
									
										63
									
								
								ixsnake/ixsnake/IXStreamSql.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										63
									
								
								ixsnake/ixsnake/IXStreamSql.cpp
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,63 @@ | ||||
| /* | ||||
|  *  IXStreamSql.cpp | ||||
|  *  Author: Benjamin Sergeant | ||||
|  *  Copyright (c) 2020 Machine Zone, Inc. All rights reserved. | ||||
|  * | ||||
|  *  Super simple hacked up version of a stream sql expression, | ||||
|  *  that only supports non nested field evaluation | ||||
|  */ | ||||
|  | ||||
| #include "IXStreamSql.h" | ||||
| #include <sstream> | ||||
| #include <iostream> | ||||
|  | ||||
| namespace snake | ||||
| { | ||||
|     StreamSql::StreamSql(const std::string& sqlFilter) | ||||
|         : _valid(false) | ||||
|     { | ||||
|         std::string token; | ||||
|         std::stringstream tokenStream(sqlFilter); | ||||
|         std::vector<std::string> tokens; | ||||
|  | ||||
|         // Split by ' ' | ||||
|         while (std::getline(tokenStream, token, ' ')) | ||||
|         { | ||||
|             tokens.push_back(token); | ||||
|         } | ||||
|  | ||||
|         _valid = tokens.size() == 8; | ||||
|         if (!_valid) return; | ||||
|  | ||||
|         _field = tokens[5]; | ||||
|         _operator = tokens[6]; | ||||
|         _value = tokens[7]; | ||||
|  | ||||
|         // remove single quotes | ||||
|         _value = _value.substr(1, _value.size() - 2); | ||||
|  | ||||
|         if (_operator == "LIKE") | ||||
|         { | ||||
|             _value = _value.substr(1, _value.size() - 2); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     bool StreamSql::valid() const | ||||
|     { | ||||
|         return _valid; | ||||
|     } | ||||
|  | ||||
|     bool StreamSql::match(const nlohmann::json& msg) | ||||
|     { | ||||
|         if (!_valid) return false; | ||||
|  | ||||
|         if (msg.find(_field) == msg.end()) | ||||
|         { | ||||
|             return false; | ||||
|         } | ||||
|  | ||||
|         std::string value = msg[_field]; | ||||
|         return value == _value; | ||||
|     } | ||||
|  | ||||
| } // namespace snake | ||||
							
								
								
									
										29
									
								
								ixsnake/ixsnake/IXStreamSql.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										29
									
								
								ixsnake/ixsnake/IXStreamSql.h
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,29 @@ | ||||
| /* | ||||
|  *  IXStreamSql.h | ||||
|  *  Author: Benjamin Sergeant | ||||
|  *  Copyright (c) 2020 Machine Zone, Inc. All rights reserved. | ||||
|  */ | ||||
|  | ||||
| #pragma once | ||||
|  | ||||
| #include <string> | ||||
| #include "nlohmann/json.hpp" | ||||
|  | ||||
| namespace snake | ||||
| { | ||||
|     class StreamSql | ||||
|     { | ||||
|     public: | ||||
|         StreamSql(const std::string& sqlFilter = std::string()); | ||||
|         ~StreamSql() = default; | ||||
|  | ||||
|         bool match(const nlohmann::json& msg); | ||||
|         bool valid() const; | ||||
|  | ||||
|     private: | ||||
|         std::string _field; | ||||
|         std::string _operator; | ||||
|         std::string _value; | ||||
|         bool _valid; | ||||
|     }; | ||||
| } | ||||
| @@ -5,8 +5,10 @@ | ||||
|  */ | ||||
|  | ||||
| // | ||||
| // On macOS we use UNIX pipes to wake up select. | ||||
| // On UNIX we use pipes to wake up select. There is no way to do that | ||||
| // on Windows so this file is compiled out on Windows. | ||||
| // | ||||
| #ifndef _WIN32 | ||||
|  | ||||
| #include "IXSelectInterruptPipe.h" | ||||
|  | ||||
| @@ -144,3 +146,5 @@ namespace ix | ||||
|         return _fildes[kPipeReadIndex]; | ||||
|     } | ||||
| } // namespace ix | ||||
|  | ||||
| #endif // !_WIN32 | ||||
|   | ||||
							
								
								
									
										81
									
								
								ixwebsocket/IXSetThreadName.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										81
									
								
								ixwebsocket/IXSetThreadName.cpp
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,81 @@ | ||||
| /* | ||||
|  *  IXSetThreadName.cpp | ||||
|  *  Author: Benjamin Sergeant | ||||
|  *  Copyright (c) 2018 2020 Machine Zone, Inc. All rights reserved. | ||||
|  */ | ||||
| #include "IXSetThreadName.h" | ||||
|  | ||||
| // unix systems | ||||
| #if defined(__APPLE__) || defined(__linux__) || defined(BSD) | ||||
| # include <pthread.h> | ||||
| #endif | ||||
|  | ||||
| // freebsd needs this header as well | ||||
| #if defined(BSD) | ||||
| # include <pthread_np.h> | ||||
| #endif | ||||
|  | ||||
| // Windows | ||||
| #ifdef _WIN32 | ||||
| # include <Windows.h> | ||||
| #endif | ||||
|  | ||||
| namespace ix | ||||
| { | ||||
| #ifdef _WIN32 | ||||
|     const DWORD MS_VC_EXCEPTION = 0x406D1388; | ||||
|  | ||||
| #pragma pack(push, 8) | ||||
|     typedef struct tagTHREADNAME_INFO | ||||
|     { | ||||
|         DWORD dwType;     // Must be 0x1000. | ||||
|         LPCSTR szName;    // Pointer to name (in user addr space). | ||||
|         DWORD dwThreadID; // Thread ID (-1=caller thread). | ||||
|         DWORD dwFlags;    // Reserved for future use, must be zero. | ||||
|     } THREADNAME_INFO; | ||||
| #pragma pack(pop) | ||||
|  | ||||
|     void SetThreadName(DWORD dwThreadID, const char* threadName) | ||||
|     { | ||||
|         THREADNAME_INFO info; | ||||
|         info.dwType = 0x1000; | ||||
|         info.szName = threadName; | ||||
|         info.dwThreadID = dwThreadID; | ||||
|         info.dwFlags = 0; | ||||
|  | ||||
|         __try | ||||
|         { | ||||
|             RaiseException( | ||||
|                 MS_VC_EXCEPTION, 0, sizeof(info) / sizeof(ULONG_PTR), (ULONG_PTR*) &info); | ||||
|         } | ||||
|         __except (EXCEPTION_EXECUTE_HANDLER) | ||||
|         { | ||||
|         } | ||||
|     } | ||||
| #endif | ||||
|  | ||||
|     void setThreadName(const std::string& name) | ||||
|     { | ||||
| #if defined(__APPLE__) | ||||
|         // | ||||
|         // Apple reserves 16 bytes for its thread names | ||||
|         // Notice that the Apple version of pthread_setname_np | ||||
|         // does not take a pthread_t argument | ||||
|         // | ||||
|         pthread_setname_np(name.substr(0, 63).c_str()); | ||||
| #elif defined(__linux__) | ||||
|         // | ||||
|         // Linux only reserves 16 bytes for its thread names | ||||
|         // See prctl and PR_SET_NAME property in | ||||
|         // http://man7.org/linux/man-pages/man2/prctl.2.html | ||||
|         // | ||||
|         pthread_setname_np(pthread_self(), name.substr(0, 15).c_str()); | ||||
| #elif defined(_WIN32) | ||||
|         SetThreadName(-1, name.c_str()); | ||||
| #elif defined(BSD) | ||||
|         pthread_set_name_np(pthread_self(), name.substr(0, 15).c_str()); | ||||
| #else | ||||
|         // ... assert here ? | ||||
| #endif | ||||
|     } | ||||
| } // namespace ix | ||||
| @@ -22,7 +22,7 @@ namespace ix | ||||
|     const int SocketServer::kDefaultPort(8080); | ||||
|     const std::string SocketServer::kDefaultHost("127.0.0.1"); | ||||
|     const int SocketServer::kDefaultTcpBacklog(5); | ||||
|     const size_t SocketServer::kDefaultMaxConnections(32); | ||||
|     const size_t SocketServer::kDefaultMaxConnections(128); | ||||
|     const int SocketServer::kDefaultAddressFamily(AF_INET); | ||||
|  | ||||
|     SocketServer::SocketServer( | ||||
|   | ||||
| @@ -6,4 +6,4 @@ | ||||
|  | ||||
| #pragma once | ||||
|  | ||||
| #define IX_WEBSOCKET_VERSION "9.9.0" | ||||
| #define IX_WEBSOCKET_VERSION "9.9.2" | ||||
|   | ||||
| @@ -1,20 +0,0 @@ | ||||
| /* | ||||
|  *  IXSetThreadName_apple.cpp | ||||
|  *  Author: Benjamin Sergeant | ||||
|  *  Copyright (c) 2018 Machine Zone, Inc. All rights reserved. | ||||
|  */ | ||||
| #include "../IXSetThreadName.h" | ||||
| #include <pthread.h> | ||||
|  | ||||
| namespace ix | ||||
| { | ||||
|     void setThreadName(const std::string& name) | ||||
|     { | ||||
|         // | ||||
|         // Apple reserves 16 bytes for its thread names | ||||
|         // Notice that the Apple version of pthread_setname_np | ||||
|         // does not take a pthread_t argument | ||||
|         // | ||||
|         pthread_setname_np(name.substr(0, 63).c_str()); | ||||
|     } | ||||
| } // namespace ix | ||||
| @@ -1,16 +0,0 @@ | ||||
| /* | ||||
|  *  IXSetThreadName_freebsd.cpp | ||||
|  *  Author: Benjamin Sergeant | ||||
|  *  Copyright (c) 2019 Machine Zone, Inc. All rights reserved. | ||||
|  */ | ||||
| #include "../IXSetThreadName.h" | ||||
| #include <pthread.h> | ||||
| #include <pthread_np.h> | ||||
|  | ||||
| namespace ix | ||||
| { | ||||
|     void setThreadName(const std::string& name) | ||||
|     { | ||||
|         pthread_set_name_np(pthread_self(), name.substr(0, 15).c_str()); | ||||
|     } | ||||
| } // namespace ix | ||||
| @@ -1,20 +0,0 @@ | ||||
| /* | ||||
|  *  IXSetThreadName_linux.cpp | ||||
|  *  Author: Benjamin Sergeant | ||||
|  *  Copyright (c) 2018 Machine Zone, Inc. All rights reserved. | ||||
|  */ | ||||
| #include "../IXSetThreadName.h" | ||||
| #include <pthread.h> | ||||
|  | ||||
| namespace ix | ||||
| { | ||||
|     void setThreadName(const std::string& name) | ||||
|     { | ||||
|         // | ||||
|         // Linux only reserves 16 bytes for its thread names | ||||
|         // See prctl and PR_SET_NAME property in | ||||
|         // http://man7.org/linux/man-pages/man2/prctl.2.html | ||||
|         // | ||||
|         pthread_setname_np(pthread_self(), name.substr(0, 15).c_str()); | ||||
|     } | ||||
| } // namespace ix | ||||
| @@ -1,46 +0,0 @@ | ||||
| /* | ||||
|  *  IXSetThreadName_windows.cpp | ||||
|  *  Author: Benjamin Sergeant | ||||
|  *  Copyright (c) 2019 Machine Zone, Inc. All rights reserved. | ||||
|  */ | ||||
| #include "../IXSetThreadName.h" | ||||
|  | ||||
| #include <Windows.h> | ||||
|  | ||||
| namespace ix | ||||
| { | ||||
|     const DWORD MS_VC_EXCEPTION = 0x406D1388; | ||||
|  | ||||
| #pragma pack(push, 8) | ||||
|     typedef struct tagTHREADNAME_INFO | ||||
|     { | ||||
|         DWORD dwType;     // Must be 0x1000. | ||||
|         LPCSTR szName;    // Pointer to name (in user addr space). | ||||
|         DWORD dwThreadID; // Thread ID (-1=caller thread). | ||||
|         DWORD dwFlags;    // Reserved for future use, must be zero. | ||||
|     } THREADNAME_INFO; | ||||
| #pragma pack(pop) | ||||
|  | ||||
|     void SetThreadName(DWORD dwThreadID, const char* threadName) | ||||
|     { | ||||
|         THREADNAME_INFO info; | ||||
|         info.dwType = 0x1000; | ||||
|         info.szName = threadName; | ||||
|         info.dwThreadID = dwThreadID; | ||||
|         info.dwFlags = 0; | ||||
|  | ||||
|         __try | ||||
|         { | ||||
|             RaiseException( | ||||
|                 MS_VC_EXCEPTION, 0, sizeof(info) / sizeof(ULONG_PTR), (ULONG_PTR*) &info); | ||||
|         } | ||||
|         __except (EXCEPTION_EXECUTE_HANDLER) | ||||
|         { | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     void setThreadName(const std::string& name) | ||||
|     { | ||||
|         SetThreadName(-1, name.c_str()); | ||||
|     } | ||||
| } // namespace ix | ||||
| @@ -42,6 +42,7 @@ set (SOURCES | ||||
|  | ||||
|   IXSocketTest.cpp | ||||
|   IXSocketConnectTest.cpp | ||||
|   # IXWebSocketLeakTest.cpp # commented until we have a fix for #224 | ||||
|   IXWebSocketServerTest.cpp | ||||
|   IXWebSocketTestConnectionDisconnection.cpp | ||||
|   IXUrlParserTest.cpp | ||||
| @@ -56,6 +57,7 @@ set (SOURCES | ||||
|   IXWebSocketChatTest.cpp | ||||
|   IXWebSocketBroadcastTest.cpp | ||||
|   IXWebSocketPerMessageDeflateCompressorTest.cpp | ||||
|   IXStreamSqlTest.cpp | ||||
| ) | ||||
|  | ||||
| # Some unittest don't work on windows yet | ||||
|   | ||||
| @@ -92,6 +92,9 @@ TEST_CASE("Cobra_to_stdout_bot", "[cobra_bots]") | ||||
|         cobraBotConfig.enableHeartbeat = false; | ||||
|         bool quiet = false; | ||||
|  | ||||
|         cobraBotConfig.filter = | ||||
|             std::string("select * from `") + channel + "` where id = 'sms_metric_A_id'"; | ||||
|  | ||||
|         // We could try to capture the output ... not sure how. | ||||
|         bool fluentd = true; | ||||
|  | ||||
|   | ||||
							
								
								
									
										42
									
								
								test/IXStreamSqlTest.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										42
									
								
								test/IXStreamSqlTest.cpp
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,42 @@ | ||||
| /* | ||||
|  *  IXStreamSqlTest.cpp | ||||
|  *  Author: Benjamin Sergeant | ||||
|  *  Copyright (c) 2020 Machine Zone. All rights reserved. | ||||
|  */ | ||||
|  | ||||
| #include "IXTest.h" | ||||
| #include "catch.hpp" | ||||
| #include <iostream> | ||||
| #include <ixsnake/IXStreamSql.h> | ||||
| #include <string.h> | ||||
|  | ||||
| using namespace ix; | ||||
|  | ||||
| namespace ix | ||||
| { | ||||
|     TEST_CASE("stream_sql", "[streamsql]") | ||||
|     { | ||||
|         SECTION("expression A") | ||||
|         { | ||||
|             snake::StreamSql streamSql( | ||||
|                 "select * from subscriber_republished_v1_neo where session LIKE '%123456%'"); | ||||
|  | ||||
|             nlohmann::json msg = {{"session", "123456"}, {"id", "foo_id"}, {"timestamp", 12}}; | ||||
|  | ||||
|             CHECK(streamSql.match(msg)); | ||||
|         } | ||||
|  | ||||
|         SECTION("expression A") | ||||
|         { | ||||
|             snake::StreamSql streamSql("select * from `subscriber_republished_v1_neo` where " | ||||
|                                        "session = '30091320ed8d4e50b758f8409b83bed7'"); | ||||
|  | ||||
|             nlohmann::json msg = {{"session", "30091320ed8d4e50b758f8409b83bed7"}, | ||||
|                                   {"id", "foo_id"}, | ||||
|                                   {"timestamp", 12}}; | ||||
|  | ||||
|             CHECK(streamSql.match(msg)); | ||||
|         } | ||||
|     } | ||||
|  | ||||
| } // namespace ix | ||||
							
								
								
									
										182
									
								
								test/IXWebSocketLeakTest.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										182
									
								
								test/IXWebSocketLeakTest.cpp
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,182 @@ | ||||
| /* | ||||
|  *  IXWebSocketServer.cpp | ||||
|  *  Author: Benjamin Sergeant, @marcelkauf | ||||
|  *  Copyright (c) 2020 Machine Zone, Inc. All rights reserved. | ||||
|  */ | ||||
|  | ||||
| #include "IXTest.h" | ||||
|  | ||||
| #include "catch.hpp" | ||||
| #include <memory> | ||||
| #include <sstream> | ||||
|  | ||||
| #include <ixwebsocket/IXWebSocket.h> | ||||
| #include <ixwebsocket/IXWebSocketServer.h> | ||||
|  | ||||
| using namespace ix; | ||||
|  | ||||
| namespace | ||||
| { | ||||
|     class WebSocketClient | ||||
|     { | ||||
|     public: | ||||
|         WebSocketClient(int port); | ||||
|         void start(); | ||||
|         void stop(); | ||||
|         bool isReady() const; | ||||
|         bool hasConnectionError() const; | ||||
|  | ||||
|     private: | ||||
|         ix::WebSocket _webSocket; | ||||
|         int _port; | ||||
|         std::atomic<bool> _connectionError; | ||||
|     }; | ||||
|  | ||||
|     WebSocketClient::WebSocketClient(int port) | ||||
|         : _port(port) | ||||
|         , _connectionError(false) | ||||
|     { | ||||
|     } | ||||
|  | ||||
|     bool WebSocketClient::hasConnectionError() const | ||||
|     { | ||||
|         return _connectionError; | ||||
|     } | ||||
|  | ||||
|     bool WebSocketClient::isReady() const | ||||
|     { | ||||
|         return _webSocket.getReadyState() == ix::ReadyState::Open; | ||||
|     } | ||||
|  | ||||
|     void WebSocketClient::stop() | ||||
|     { | ||||
|         _webSocket.stop(); | ||||
|     } | ||||
|  | ||||
|     void WebSocketClient::start() | ||||
|     { | ||||
|         std::string url; | ||||
|         { | ||||
|             std::stringstream ss; | ||||
|             ss << "ws://localhost:" << _port << "/"; | ||||
|  | ||||
|             url = ss.str(); | ||||
|         } | ||||
|  | ||||
|         _webSocket.setUrl(url); | ||||
|         _webSocket.disableAutomaticReconnection(); | ||||
|  | ||||
|         std::stringstream ss; | ||||
|         log(std::string("Connecting to url: ") + url); | ||||
|  | ||||
|         _webSocket.setOnMessageCallback([this](const ix::WebSocketMessagePtr& msg) | ||||
|         { | ||||
|             std::stringstream ss; | ||||
|             if (msg->type == ix::WebSocketMessageType::Open) | ||||
|             { | ||||
|                 log("client connected"); | ||||
|             } | ||||
|             else if (msg->type == ix::WebSocketMessageType::Close) | ||||
|             { | ||||
|                 log("client disconnected"); | ||||
|             } | ||||
|             else if (msg->type == ix::WebSocketMessageType::Error) | ||||
|             { | ||||
|                 _connectionError = true; | ||||
|                 log("error"); | ||||
|             } | ||||
|             else if (msg->type == ix::WebSocketMessageType::Pong) | ||||
|             { | ||||
|                 log("pong"); | ||||
|             } | ||||
|             else if (msg->type == ix::WebSocketMessageType::Ping) | ||||
|             { | ||||
|                 log("ping"); | ||||
|             } | ||||
|             else if (msg->type == ix::WebSocketMessageType::Message) | ||||
|             { | ||||
|                 log("message"); | ||||
|             } | ||||
|             else | ||||
|             { | ||||
|                 log("invalid type"); | ||||
|             } | ||||
|         }); | ||||
|  | ||||
|         _webSocket.start(); | ||||
|     } | ||||
| } // namespace | ||||
|  | ||||
| TEST_CASE("Websocket leak test") | ||||
| { | ||||
|     SECTION("Websocket destructor is called when closing the connection.") | ||||
|     { | ||||
|         // stores the server websocket in order to check the use_count | ||||
|         std::shared_ptr<WebSocket> webSocketPtr; | ||||
|  | ||||
|         { | ||||
|             int port = getFreePort(); | ||||
|             WebSocketServer server(port); | ||||
|  | ||||
|             server.setOnConnectionCallback([&webSocketPtr](std::shared_ptr<ix::WebSocket> webSocket, | ||||
|                                                            std::shared_ptr<ConnectionState> connectionState, | ||||
|                                                            std::unique_ptr<ConnectionInfo> connectionInfo) | ||||
|            { | ||||
|                 // original ptr in WebSocketServer::handleConnection and the callback argument | ||||
|                 REQUIRE(webSocket.use_count() == 2); | ||||
|                 webSocket->setOnMessageCallback([&webSocketPtr, webSocket, connectionState](const ix::WebSocketMessagePtr& msg) | ||||
|                 { | ||||
|                     if (msg->type == ix::WebSocketMessageType::Open) | ||||
|                     { | ||||
|                         log(std::string("New connection id: ") + connectionState->getId()); | ||||
|                         // original ptr in WebSocketServer::handleConnection, captured ptr of this callback, and ptr in WebSocketServer::_clients | ||||
|                         REQUIRE(webSocket.use_count() == 3); | ||||
|                         webSocketPtr = std::shared_ptr<WebSocket>(webSocket); | ||||
|                         REQUIRE(webSocket.use_count() == 4); | ||||
|                     } | ||||
|                     else if (msg->type == ix::WebSocketMessageType::Close) | ||||
|                     { | ||||
|                         log(std::string("Client closed connection id: ") + connectionState->getId()); | ||||
|                     } | ||||
|                     else | ||||
|                     { | ||||
|                         log(std::string(msg->str)); | ||||
|                     } | ||||
|                 }); | ||||
|                 // original ptr in WebSocketServer::handleConnection, argument of this callback, and captured ptr in websocket callback | ||||
|                 REQUIRE(webSocket.use_count() == 3); | ||||
|             }); | ||||
|  | ||||
|             server.listen(); | ||||
|             server.start(); | ||||
|  | ||||
|             WebSocketClient webSocketClient(port); | ||||
|             webSocketClient.start(); | ||||
|  | ||||
|             while (true) | ||||
|             { | ||||
|                 REQUIRE(!webSocketClient.hasConnectionError()); | ||||
|                 if (webSocketClient.isReady()) break; | ||||
|                 ix::msleep(10); | ||||
|             } | ||||
|  | ||||
|             REQUIRE(server.getClients().size() == 1); | ||||
|             // same value as in Open-handler above | ||||
|             REQUIRE(webSocketPtr.use_count() == 4); | ||||
|  | ||||
|             ix::msleep(500); | ||||
|             webSocketClient.stop(); | ||||
|             ix::msleep(500); | ||||
|             REQUIRE(server.getClients().size() == 0); | ||||
|  | ||||
|             // websocket should only be referenced by webSocketPtr but is still used by the websocket callback | ||||
|             REQUIRE(webSocketPtr.use_count() == 1); | ||||
|             webSocketPtr->setOnMessageCallback(nullptr); | ||||
|             // websocket should only be referenced by webSocketPtr | ||||
|             REQUIRE(webSocketPtr.use_count() == 1); | ||||
|             server.stop(); | ||||
|         } | ||||
|         // websocket should only be referenced by webSocketPtr | ||||
|         REQUIRE(webSocketPtr.use_count() == 1); | ||||
|     } | ||||
| } | ||||
| @@ -122,6 +122,7 @@ int main(int argc, char** argv) | ||||
|     std::string key; | ||||
|     std::string logfile; | ||||
|     std::string scriptPath; | ||||
|     std::string republishChannel; | ||||
|     ix::SocketTLSOptions tlsOptions; | ||||
|     ix::CobraConfig cobraConfig; | ||||
|     ix::CobraBotConfig cobraBotConfig; | ||||
| @@ -391,6 +392,7 @@ int main(int argc, char** argv) | ||||
|     snakeApp->add_option("--redis_password", redisPassword, "Redis password"); | ||||
|     snakeApp->add_option("--apps_config_path", appsConfigPath, "Path to auth data") | ||||
|         ->check(CLI::ExistingPath); | ||||
|     snakeApp->add_option("--republish_channel", republishChannel, "Republish channel"); | ||||
|     snakeApp->add_flag("-v", verbose, "Verbose"); | ||||
|     snakeApp->add_flag("-d", disablePong, "Disable Pongs"); | ||||
|     addTLSOptions(snakeApp); | ||||
| @@ -637,7 +639,8 @@ int main(int argc, char** argv) | ||||
|                                 verbose, | ||||
|                                 appsConfigPath, | ||||
|                                 tlsOptions, | ||||
|                                 disablePong); | ||||
|                                 disablePong, | ||||
|                                 republishChannel); | ||||
|     } | ||||
|     else if (app.got_subcommand("httpd")) | ||||
|     { | ||||
|   | ||||
							
								
								
									
										3
									
								
								ws/ws.h
									
									
									
									
									
								
							
							
						
						
									
										3
									
								
								ws/ws.h
									
									
									
									
									
								
							| @@ -103,7 +103,8 @@ namespace ix | ||||
|                       bool verbose, | ||||
|                       const std::string& appsConfigPath, | ||||
|                       const ix::SocketTLSOptions& tlsOptions, | ||||
|                       bool disablePong); | ||||
|                       bool disablePong, | ||||
|                       const std::string& republishChannel); | ||||
|  | ||||
|     int ws_httpd_main(int port, | ||||
|                       const std::string& hostname, | ||||
|   | ||||
| @@ -45,7 +45,8 @@ namespace ix | ||||
|                       bool verbose, | ||||
|                       const std::string& appsConfigPath, | ||||
|                       const SocketTLSOptions& socketTLSOptions, | ||||
|                       bool disablePong) | ||||
|                       bool disablePong, | ||||
|                       const std::string& republishChannel) | ||||
|     { | ||||
|         snake::AppConfig appConfig; | ||||
|         appConfig.port = port; | ||||
| @@ -55,6 +56,7 @@ namespace ix | ||||
|         appConfig.redisPassword = redisPassword; | ||||
|         appConfig.socketTLSOptions = socketTLSOptions; | ||||
|         appConfig.disablePong = disablePong; | ||||
|         appConfig.republishChannel = republishChannel; | ||||
|  | ||||
|         // Parse config file | ||||
|         auto str = readAsString(appsConfigPath); | ||||
|   | ||||
		Reference in New Issue
	
	Block a user