From 977feae1d6a5792d377c9009463df555e8bb4b37 Mon Sep 17 00:00:00 2001 From: Benjamin Sergeant Date: Thu, 25 Oct 2018 12:01:47 -0700 Subject: [PATCH] Better ping/pong support --- examples/ping_pong/.gitignore | 1 + examples/ping_pong/CMakeLists.txt | 27 ++++++ examples/ping_pong/client.py | 17 ++++ examples/ping_pong/ping_pong.cpp | 133 +++++++++++++++++++++++++++ examples/ping_pong/server.py | 13 +++ examples/ping_pong/test.sh | 4 + ixwebsocket/IXWebSocket.cpp | 46 ++++++++- ixwebsocket/IXWebSocket.h | 7 +- ixwebsocket/IXWebSocketTransport.cpp | 26 +++++- ixwebsocket/IXWebSocketTransport.h | 12 ++- 10 files changed, 276 insertions(+), 10 deletions(-) create mode 100644 examples/ping_pong/.gitignore create mode 100644 examples/ping_pong/CMakeLists.txt create mode 100644 examples/ping_pong/client.py create mode 100644 examples/ping_pong/ping_pong.cpp create mode 100644 examples/ping_pong/server.py create mode 100644 examples/ping_pong/test.sh diff --git a/examples/ping_pong/.gitignore b/examples/ping_pong/.gitignore new file mode 100644 index 00000000..5ceb3864 --- /dev/null +++ b/examples/ping_pong/.gitignore @@ -0,0 +1 @@ +venv diff --git a/examples/ping_pong/CMakeLists.txt b/examples/ping_pong/CMakeLists.txt new file mode 100644 index 00000000..95ad2585 --- /dev/null +++ b/examples/ping_pong/CMakeLists.txt @@ -0,0 +1,27 @@ +# +# Author: Benjamin Sergeant +# Copyright (c) 2018 Machine Zone, Inc. All rights reserved. +# + +cmake_minimum_required (VERSION 3.4.1) +project (ping_pong) + +set (CMAKE_CXX_STANDARD 11) + +option(USE_TLS "Add TLS support" ON) + +add_subdirectory(${PROJECT_SOURCE_DIR}/../.. ixwebsocket) + +add_executable(ping_pong ping_pong.cpp) + +if (APPLE AND USE_TLS) + target_link_libraries(ping_pong "-framework foundation" "-framework security") +endif() + +if (WIN32) + target_link_libraries(ping_pong wsock32 ws2_32) + add_definitions(-D_CRT_SECURE_NO_WARNINGS) +endif() + +target_link_libraries(ping_pong ixwebsocket) +install(TARGETS ping_pong DESTINATION bin) diff --git a/examples/ping_pong/client.py b/examples/ping_pong/client.py new file mode 100644 index 00000000..b495d44f --- /dev/null +++ b/examples/ping_pong/client.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python + +import asyncio +import websockets + +async def hello(uri): + async with websockets.connect(uri) as websocket: + await websocket.send("Hello world!") + response = await websocket.recv() + print(response) + + pong_waiter = await websocket.ping('coucou') + ret = await pong_waiter # only if you want to wait for the pong + print(ret) + +asyncio.get_event_loop().run_until_complete( + hello('ws://localhost:5678')) diff --git a/examples/ping_pong/ping_pong.cpp b/examples/ping_pong/ping_pong.cpp new file mode 100644 index 00000000..4ac55b4c --- /dev/null +++ b/examples/ping_pong/ping_pong.cpp @@ -0,0 +1,133 @@ +/* + * ws_connect.cpp + * Author: Benjamin Sergeant + * Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved. + */ + +#include +#include +#include +#include + +using namespace ix; + +namespace +{ + void log(const std::string& msg) + { + std::cout << msg << std::endl; + } + + class WebSocketPingPong + { + public: + WebSocketPingPong(const std::string& _url); + + void subscribe(const std::string& channel); + void start(); + void stop(); + + void ping(const std::string& text); + + private: + std::string _url; + ix::WebSocket _webSocket; + }; + + WebSocketPingPong::WebSocketPingPong(const std::string& url) : + _url(url) + { + ; + } + + void WebSocketPingPong::stop() + { + _webSocket.stop(); + } + + void WebSocketPingPong::start() + { + _webSocket.configure(_url); + + std::stringstream ss; + log(std::string("Connecting to url: ") + _url); + + _webSocket.setOnMessageCallback( + [this](ix::WebSocketMessageType messageType, const std::string& str, ix::WebSocketErrorInfo error) + { + std::stringstream ss; + if (messageType == ix::WebSocket_MessageType_Open) + { + log("ws_connect: connected"); + } + else if (messageType == ix::WebSocket_MessageType_Close) + { + log("ws_connect: disconnected"); + } + else if (messageType == ix::WebSocket_MessageType_Message) + { + ss << "ws_connect: received message: " + << str; + log(ss.str()); + } + else if (messageType == ix::WebSocket_MessageType_Error) + { + ss << "Connection error: " << error.reason << std::endl; + ss << "#retries: " << error.retries << std::endl; + ss << "Wait time(ms): " << error.wait_time << std::endl; + ss << "HTTP Status: " << error.http_status << std::endl; + log(ss.str()); + } + else if (messageType == ix::WebSocket_MessageType_Pong) + { + ss << "Invalid ix::WebSocketMessageType"; + log(ss.str()); + } + }); + + _webSocket.start(); + } + + void WebSocketPingPong::ping(const std::string& text) + { + _webSocket.ping(text); + } + + void interactiveMain(const std::string& url) + { + std::cout << "Type Ctrl-D to exit prompt..." << std::endl; + WebSocketPingPong webSocketPingPong(url); + webSocketPingPong.start(); + + while (true) + { + std::string text; + std::cout << "> " << std::flush; + std::getline(std::cin, text); + + if (!std::cin) + { + break; + } + + webSocketPingPong.ping(text); + } + + std::cout << std::endl; + webSocketPingPong.stop(); + } +} + +int main(int argc, char** argv) +{ + if (argc != 2) + { + std::cerr << "Usage: ping_pong " << std::endl; + return 1; + } + std::string url = argv[1]; + + Socket::init(); + interactiveMain(url); + return 0; +} diff --git a/examples/ping_pong/server.py b/examples/ping_pong/server.py new file mode 100644 index 00000000..c1f8b8a5 --- /dev/null +++ b/examples/ping_pong/server.py @@ -0,0 +1,13 @@ +#!/usr/bin/env python + +import asyncio +import websockets + +async def echo(websocket, path): + async for message in websocket: + print(message) + await websocket.send(message) + +asyncio.get_event_loop().run_until_complete( + websockets.serve(echo, 'localhost', 5678)) +asyncio.get_event_loop().run_forever() diff --git a/examples/ping_pong/test.sh b/examples/ping_pong/test.sh new file mode 100644 index 00000000..9504bbb4 --- /dev/null +++ b/examples/ping_pong/test.sh @@ -0,0 +1,4 @@ +#!/bin/sh + +(cd build ; make) +./build/ping_pong ws://localhost:5678 diff --git a/ixwebsocket/IXWebSocket.cpp b/ixwebsocket/IXWebSocket.cpp index 9d5c40b4..fab18b53 100644 --- a/ixwebsocket/IXWebSocket.cpp +++ b/ixwebsocket/IXWebSocket.cpp @@ -177,8 +177,28 @@ namespace ix { // 3. Dispatch the incoming messages _ws.dispatch( - [this](const std::string& msg) + [this](const std::string& msg, + WebSocketTransport::MessageKind messageKind) { + WebSocketMessageType webSocketMessageType; + switch (messageKind) + { + case WebSocketTransport::MSG: + { + webSocketMessageType = WebSocket_MessageType_Message; + } break; + + case WebSocketTransport::PING: + { + webSocketMessageType = WebSocket_MessageType_Ping; + } break; + + case WebSocketTransport::PONG: + { + webSocketMessageType = WebSocket_MessageType_Pong; + } break; + } + _onMessageCallback(WebSocket_MessageType_Message, msg, WebSocketErrorInfo()); WebSocket::invokeTrafficTrackerCallback(msg.size(), true); @@ -210,6 +230,20 @@ namespace ix { } bool WebSocket::send(const std::string& text) + { + return sendMessage(text, false); + } + + bool WebSocket::ping(const std::string& text) + { + // Standard limit ping message size + constexpr size_t pingMaxPayloadSize = 125; + if (text.size() > pingMaxPayloadSize) return false; + + return sendMessage(text, true); + } + + bool WebSocket::sendMessage(const std::string& text, bool ping) { if (!isConnected()) return false; @@ -223,7 +257,15 @@ namespace ix { // incoming messages are arriving / there's data to be received. // std::lock_guard lock(_writeMutex); - _ws.sendBinary(text); + + if (ping) + { + _ws.sendPing(text); + } + else + { + _ws.sendBinary(text); + } WebSocket::invokeTrafficTrackerCallback(text.size(), false); diff --git a/ixwebsocket/IXWebSocket.h b/ixwebsocket/IXWebSocket.h index afe03813..749e9df7 100644 --- a/ixwebsocket/IXWebSocket.h +++ b/ixwebsocket/IXWebSocket.h @@ -32,7 +32,9 @@ namespace ix WebSocket_MessageType_Message = 0, WebSocket_MessageType_Open = 1, WebSocket_MessageType_Close = 2, - WebSocket_MessageType_Error = 3 + WebSocket_MessageType_Error = 3, + WebSocket_MessageType_Ping = 4, + WebSocket_MessageType_Pong = 5 }; struct WebSocketErrorInfo @@ -56,6 +58,7 @@ namespace ix void start(); void stop(); bool send(const std::string& text); + bool ping(const std::string& text); void close(); void setOnMessageCallback(const OnMessageCallback& callback); @@ -70,6 +73,8 @@ namespace ix private: void run(); + bool sendMessage(const std::string& text, bool ping); + WebSocketInitResult connect(); bool isConnected() const; bool isClosing() const; diff --git a/ixwebsocket/IXWebSocketTransport.cpp b/ixwebsocket/IXWebSocketTransport.cpp index 9f02fa90..be113148 100644 --- a/ixwebsocket/IXWebSocketTransport.cpp +++ b/ixwebsocket/IXWebSocketTransport.cpp @@ -449,7 +449,7 @@ namespace ix { // fire callback with a string message std::string stringMessage(_receivedData.begin(), _receivedData.end()); - onMessageCallback(stringMessage); + onMessageCallback(stringMessage, MSG); _receivedData.clear(); } @@ -467,10 +467,27 @@ namespace ix { std::string pingData(_rxbuf.begin()+ws.header_size, _rxbuf.begin()+ws.header_size + (size_t) ws.N); + // Reply back right away sendData(wsheader_type::PONG, pingData.size(), pingData.begin(), pingData.end()); + + onMessageCallback(pingData, PING); + } + else if (ws.opcode == wsheader_type::PONG) + { + if (ws.mask) + { + for (size_t j = 0; j != ws.N; ++j) + { + _rxbuf[j+ws.header_size] ^= ws.masking_key[j&0x3]; + } + } + + std::string pongData(_rxbuf.begin()+ws.header_size, + _rxbuf.begin()+ws.header_size + (size_t) ws.N); + + onMessageCallback(pongData, PONG); } - else if (ws.opcode == wsheader_type::PONG) { } else if (ws.opcode == wsheader_type::CLOSE) { close(); } else { close(); } @@ -556,10 +573,9 @@ namespace ix { sendOnSocket(); } - void WebSocketTransport::sendPing() + void WebSocketTransport::sendPing(const std::string& message) { - std::string empty; - sendData(wsheader_type::PING, empty.size(), empty.begin(), empty.end()); + sendData(wsheader_type::PING, message.size(), message.begin(), message.end()); } void WebSocketTransport::sendBinary(const std::string& message) diff --git a/ixwebsocket/IXWebSocketTransport.h b/ixwebsocket/IXWebSocketTransport.h index 1fc1f2e4..08f872c4 100644 --- a/ixwebsocket/IXWebSocketTransport.h +++ b/ixwebsocket/IXWebSocketTransport.h @@ -54,7 +54,15 @@ namespace ix OPEN }; - using OnMessageCallback = std::function; + enum MessageKind + { + MSG, + PING, + PONG + }; + + using OnMessageCallback = std::function; using OnStateChangeCallback = std::function; WebSocketTransport(); @@ -67,7 +75,7 @@ namespace ix void send(const std::string& message); void sendBinary(const std::string& message); void sendBinary(const std::vector& message); - void sendPing(); + void sendPing(const std::string& message); void close(); ReadyStateValues getReadyState() const; void setReadyState(ReadyStateValues readyStateValue);