Better ping/pong support

This commit is contained in:
Benjamin Sergeant 2018-10-25 12:01:47 -07:00
parent 9c872fcc3e
commit 977feae1d6
10 changed files with 276 additions and 10 deletions

1
examples/ping_pong/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
venv

View File

@ -0,0 +1,27 @@
#
# Author: Benjamin Sergeant
# Copyright (c) 2018 Machine Zone, Inc. All rights reserved.
#
cmake_minimum_required (VERSION 3.4.1)
project (ping_pong)
set (CMAKE_CXX_STANDARD 11)
option(USE_TLS "Add TLS support" ON)
add_subdirectory(${PROJECT_SOURCE_DIR}/../.. ixwebsocket)
add_executable(ping_pong ping_pong.cpp)
if (APPLE AND USE_TLS)
target_link_libraries(ping_pong "-framework foundation" "-framework security")
endif()
if (WIN32)
target_link_libraries(ping_pong wsock32 ws2_32)
add_definitions(-D_CRT_SECURE_NO_WARNINGS)
endif()
target_link_libraries(ping_pong ixwebsocket)
install(TARGETS ping_pong DESTINATION bin)

View File

@ -0,0 +1,17 @@
#!/usr/bin/env python
import asyncio
import websockets
async def hello(uri):
async with websockets.connect(uri) as websocket:
await websocket.send("Hello world!")
response = await websocket.recv()
print(response)
pong_waiter = await websocket.ping('coucou')
ret = await pong_waiter # only if you want to wait for the pong
print(ret)
asyncio.get_event_loop().run_until_complete(
hello('ws://localhost:5678'))

View File

@ -0,0 +1,133 @@
/*
* ws_connect.cpp
* Author: Benjamin Sergeant
* Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
*/
#include <iostream>
#include <sstream>
#include <ixwebsocket/IXWebSocket.h>
#include <ixwebsocket/IXSocket.h>
using namespace ix;
namespace
{
void log(const std::string& msg)
{
std::cout << msg << std::endl;
}
class WebSocketPingPong
{
public:
WebSocketPingPong(const std::string& _url);
void subscribe(const std::string& channel);
void start();
void stop();
void ping(const std::string& text);
private:
std::string _url;
ix::WebSocket _webSocket;
};
WebSocketPingPong::WebSocketPingPong(const std::string& url) :
_url(url)
{
;
}
void WebSocketPingPong::stop()
{
_webSocket.stop();
}
void WebSocketPingPong::start()
{
_webSocket.configure(_url);
std::stringstream ss;
log(std::string("Connecting to url: ") + _url);
_webSocket.setOnMessageCallback(
[this](ix::WebSocketMessageType messageType, const std::string& str, ix::WebSocketErrorInfo error)
{
std::stringstream ss;
if (messageType == ix::WebSocket_MessageType_Open)
{
log("ws_connect: connected");
}
else if (messageType == ix::WebSocket_MessageType_Close)
{
log("ws_connect: disconnected");
}
else if (messageType == ix::WebSocket_MessageType_Message)
{
ss << "ws_connect: received message: "
<< str;
log(ss.str());
}
else if (messageType == ix::WebSocket_MessageType_Error)
{
ss << "Connection error: " << error.reason << std::endl;
ss << "#retries: " << error.retries << std::endl;
ss << "Wait time(ms): " << error.wait_time << std::endl;
ss << "HTTP Status: " << error.http_status << std::endl;
log(ss.str());
}
else if (messageType == ix::WebSocket_MessageType_Pong)
{
ss << "Invalid ix::WebSocketMessageType";
log(ss.str());
}
});
_webSocket.start();
}
void WebSocketPingPong::ping(const std::string& text)
{
_webSocket.ping(text);
}
void interactiveMain(const std::string& url)
{
std::cout << "Type Ctrl-D to exit prompt..." << std::endl;
WebSocketPingPong webSocketPingPong(url);
webSocketPingPong.start();
while (true)
{
std::string text;
std::cout << "> " << std::flush;
std::getline(std::cin, text);
if (!std::cin)
{
break;
}
webSocketPingPong.ping(text);
}
std::cout << std::endl;
webSocketPingPong.stop();
}
}
int main(int argc, char** argv)
{
if (argc != 2)
{
std::cerr << "Usage: ping_pong <url>" << std::endl;
return 1;
}
std::string url = argv[1];
Socket::init();
interactiveMain(url);
return 0;
}

View File

@ -0,0 +1,13 @@
#!/usr/bin/env python
import asyncio
import websockets
async def echo(websocket, path):
async for message in websocket:
print(message)
await websocket.send(message)
asyncio.get_event_loop().run_until_complete(
websockets.serve(echo, 'localhost', 5678))
asyncio.get_event_loop().run_forever()

View File

@ -0,0 +1,4 @@
#!/bin/sh
(cd build ; make)
./build/ping_pong ws://localhost:5678

View File

@ -177,8 +177,28 @@ namespace ix {
// 3. Dispatch the incoming messages // 3. Dispatch the incoming messages
_ws.dispatch( _ws.dispatch(
[this](const std::string& msg) [this](const std::string& msg,
WebSocketTransport::MessageKind messageKind)
{ {
WebSocketMessageType webSocketMessageType;
switch (messageKind)
{
case WebSocketTransport::MSG:
{
webSocketMessageType = WebSocket_MessageType_Message;
} break;
case WebSocketTransport::PING:
{
webSocketMessageType = WebSocket_MessageType_Ping;
} break;
case WebSocketTransport::PONG:
{
webSocketMessageType = WebSocket_MessageType_Pong;
} break;
}
_onMessageCallback(WebSocket_MessageType_Message, msg, WebSocketErrorInfo()); _onMessageCallback(WebSocket_MessageType_Message, msg, WebSocketErrorInfo());
WebSocket::invokeTrafficTrackerCallback(msg.size(), true); WebSocket::invokeTrafficTrackerCallback(msg.size(), true);
@ -210,6 +230,20 @@ namespace ix {
} }
bool WebSocket::send(const std::string& text) bool WebSocket::send(const std::string& text)
{
return sendMessage(text, false);
}
bool WebSocket::ping(const std::string& text)
{
// Standard limit ping message size
constexpr size_t pingMaxPayloadSize = 125;
if (text.size() > pingMaxPayloadSize) return false;
return sendMessage(text, true);
}
bool WebSocket::sendMessage(const std::string& text, bool ping)
{ {
if (!isConnected()) return false; if (!isConnected()) return false;
@ -223,7 +257,15 @@ namespace ix {
// incoming messages are arriving / there's data to be received. // incoming messages are arriving / there's data to be received.
// //
std::lock_guard<std::mutex> lock(_writeMutex); std::lock_guard<std::mutex> lock(_writeMutex);
_ws.sendBinary(text);
if (ping)
{
_ws.sendPing(text);
}
else
{
_ws.sendBinary(text);
}
WebSocket::invokeTrafficTrackerCallback(text.size(), false); WebSocket::invokeTrafficTrackerCallback(text.size(), false);

View File

@ -32,7 +32,9 @@ namespace ix
WebSocket_MessageType_Message = 0, WebSocket_MessageType_Message = 0,
WebSocket_MessageType_Open = 1, WebSocket_MessageType_Open = 1,
WebSocket_MessageType_Close = 2, WebSocket_MessageType_Close = 2,
WebSocket_MessageType_Error = 3 WebSocket_MessageType_Error = 3,
WebSocket_MessageType_Ping = 4,
WebSocket_MessageType_Pong = 5
}; };
struct WebSocketErrorInfo struct WebSocketErrorInfo
@ -56,6 +58,7 @@ namespace ix
void start(); void start();
void stop(); void stop();
bool send(const std::string& text); bool send(const std::string& text);
bool ping(const std::string& text);
void close(); void close();
void setOnMessageCallback(const OnMessageCallback& callback); void setOnMessageCallback(const OnMessageCallback& callback);
@ -70,6 +73,8 @@ namespace ix
private: private:
void run(); void run();
bool sendMessage(const std::string& text, bool ping);
WebSocketInitResult connect(); WebSocketInitResult connect();
bool isConnected() const; bool isConnected() const;
bool isClosing() const; bool isClosing() const;

View File

@ -449,7 +449,7 @@ namespace ix {
// fire callback with a string message // fire callback with a string message
std::string stringMessage(_receivedData.begin(), std::string stringMessage(_receivedData.begin(),
_receivedData.end()); _receivedData.end());
onMessageCallback(stringMessage); onMessageCallback(stringMessage, MSG);
_receivedData.clear(); _receivedData.clear();
} }
@ -467,10 +467,27 @@ namespace ix {
std::string pingData(_rxbuf.begin()+ws.header_size, std::string pingData(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size + (size_t) ws.N); _rxbuf.begin()+ws.header_size + (size_t) ws.N);
// Reply back right away
sendData(wsheader_type::PONG, pingData.size(), sendData(wsheader_type::PONG, pingData.size(),
pingData.begin(), pingData.end()); pingData.begin(), pingData.end());
onMessageCallback(pingData, PING);
}
else if (ws.opcode == wsheader_type::PONG)
{
if (ws.mask)
{
for (size_t j = 0; j != ws.N; ++j)
{
_rxbuf[j+ws.header_size] ^= ws.masking_key[j&0x3];
}
}
std::string pongData(_rxbuf.begin()+ws.header_size,
_rxbuf.begin()+ws.header_size + (size_t) ws.N);
onMessageCallback(pongData, PONG);
} }
else if (ws.opcode == wsheader_type::PONG) { }
else if (ws.opcode == wsheader_type::CLOSE) { close(); } else if (ws.opcode == wsheader_type::CLOSE) { close(); }
else { close(); } else { close(); }
@ -556,10 +573,9 @@ namespace ix {
sendOnSocket(); sendOnSocket();
} }
void WebSocketTransport::sendPing() void WebSocketTransport::sendPing(const std::string& message)
{ {
std::string empty; sendData(wsheader_type::PING, message.size(), message.begin(), message.end());
sendData(wsheader_type::PING, empty.size(), empty.begin(), empty.end());
} }
void WebSocketTransport::sendBinary(const std::string& message) void WebSocketTransport::sendBinary(const std::string& message)

View File

@ -54,7 +54,15 @@ namespace ix
OPEN OPEN
}; };
using OnMessageCallback = std::function<void(const std::string&)>; enum MessageKind
{
MSG,
PING,
PONG
};
using OnMessageCallback = std::function<void(const std::string&,
MessageKind)>;
using OnStateChangeCallback = std::function<void(ReadyStateValues)>; using OnStateChangeCallback = std::function<void(ReadyStateValues)>;
WebSocketTransport(); WebSocketTransport();
@ -67,7 +75,7 @@ namespace ix
void send(const std::string& message); void send(const std::string& message);
void sendBinary(const std::string& message); void sendBinary(const std::string& message);
void sendBinary(const std::vector<uint8_t>& message); void sendBinary(const std::vector<uint8_t>& message);
void sendPing(); void sendPing(const std::string& message);
void close(); void close();
ReadyStateValues getReadyState() const; ReadyStateValues getReadyState() const;
void setReadyState(ReadyStateValues readyStateValue); void setReadyState(ReadyStateValues readyStateValue);