IXWebSocket/doc/redis_conf_2019/slides.html

1177 lines
33 KiB
HTML
Raw Normal View History

2019-03-27 23:53:55 +01:00
<!DOCTYPE html>
<html>
<head>
<title>Title</title>
<meta charset="utf-8">
<style>
@import url(https://fonts.googleapis.com/css?family=Yanone+Kaffeesatz);
@import url(https://fonts.googleapis.com/css?family=Droid+Serif:400,700,400italic);
@import url(https://fonts.googleapis.com/css?family=Ubuntu+Mono:400,700,400italic);
body { font-family: 'Droid Serif'; }
h1, h2, h3 {
font-family: 'Yanone Kaffeesatz';
font-weight: normal;
}
.remark-slide-content h1 { font-size: 3em; }
.remark-slide-content h2 { font-size: 2em; }
.remark-slide-content h3 { font-size: 1.6em; }
.remark-code, .remark-inline-code { font-family: 'Ubuntu Mono'; }
/* Two-column layout */
.left-column {
color: #777;
width: 20%;
height: 92%;
float: left;
}
.left-column h2:last-of-type, .left-column h3:last-child {
color: #000;
}
.right-column {
width: 75%;
float: right;
padding-top: 1em;
}
#slide-first {
background-image: url(redisconf_first_slide.png);
background-position: center;
background-repeat: no-repeat;
background-size: contain;
}
#slide-last {
background-image: url(redisconf_last_slide.png);
background-position: center;
background-repeat: no-repeat;
background-size: contain;
}
#slide-tenyears {
background-image: url(redisconf_10_years.png);
background-position: center;
background-repeat: no-repeat;
background-size: contain;
}
</style>
</style>
</head>
<body>
<textarea id="source">
class: center, middle
# RedisConf 2019
Real-Time Health Analytics with WebSockets, Python 3 and Redis PubSub
---
name: first
---
name: ten-years
---
class: center, middle
# Hello world
- Benjamin Sergeant
![Alt text](mz_engine.png)
???
Been working on Graphics a lot, at MZ I've worked on the Game Engine, on fast asset delivery, the game chat and monitoring a lot.
???
== 15 seconds
---
# Agenda
1. Architecture
2. Clients metrics publishing
3. Message broker Cobra
4. Bots and data sinks
---
# Using AWK (stream programming) for analytics
```
Stdin AWK Stdout
+---------------+
| |
| $1 == "fps" |
fps:60 +--->+ { print $2 } +----> 60
mem:100000 | |
... | |
+---------------+
```
```shell
$ cat /tmp/input
memory:100000
fps:60
$
$
$ awk -F: '$1 == "fps" { print $2 }' < /tmp/input
60
```
???
Awk is a very simple programming language following the UNIX principles of processing text passed in on standard-in, and writing it to standard out.
The stream is defined by multiple lines, and each lines contains multiple tokens. A field separator defines how lines are broken down into tokens.
In the common case, an awk program consists of a pattern, and of an action. Awk will search the input for a pattern, and for each line that match the pattern it will execute an action.
Let's imagine that we are building an analytic solution with awk. Each event that we care about will be a line, and we'll have fields to represent an id and other data for that event.
If we want to write a report that extracts all the information about the FPS, our awk program pattern will match the fps term, and the action will extract the value (60 because our game is perfect and always run at the ideal frame rate :). Something that awk could do is sum up those values and display an average.
---
# Using PubSub for analytics
## One subscriber
```
Publishers Broker Subscribers
+-------------------------------+
channel | | channel
{ id: fps_id, -----> | SELECT data.fps from `fps_id` +---------> 60
data: { fps: 60 } } | |
channel | |
{ id: mem_id, -----> | |
data: { mem: 100000 } } | |
| |
+-------------------------------+
```
???
== 60 seconds
---
# PubSub for analytics
## N subscribers
```
Publishers Broker Subscribers
+-------------------------------+
| |
{ id: fps_id, -----> | SELECT data.fps from `fps_id` +------> 60
data: { fps: 60 } } | |
| |
{ id: mem_id, -----> | SELECT data.mem from `mem_id` +------> 100000
data: { mem: 100000 } } | |
| |
+-------------------------------+
```
???
== 15 seconds
---
# Requirements
- Should:
- Real Time (fast)
- Work on Mobile
- Easy to write subscribers
- Should *not*:
- Guaranteed message delivery (no ack support)
- Filling hard drives
???
---
.left-column[
## Messaging protocol
## Publishers
]
---
# Publisher <-> Broker
```
Publisher Broker
+-------------------------------+ +---------------------+
| | | |
| { data: { fps: 30 }, | handshake | |
| device: +------------->+ |
| { app_version: '3.25', | | |
| game: 'wiso', | handshake/ok | |
| model: 'iPhone', +<-------------+ Cobra |
| os_language: 'en', | | |
| os_name: 'iOS', | auth | * Python |
| os_version: '10.3.3' }, +------------->+ * Redis PubSub |
| id: 'engine_fps_id', | | |
| session: '4cb5989ba', | auth/ok | |
| timestamp: 1501190629268, +<-------------+ |
| version: 1 } | | |
| | publish | |
| +------------->+ |
+-------------------------------+ +---------------------+
JSON
WebSocket Transport
```
---
# WebSocket
1. TCP based
- Persistent connection
- Bi-directional (C <-> S)
- Full duplex
2. Compatible with HTTP(S)
3. Inter-operability (browser)
4. zlib compression
5. Heart-beat
6. Large message segmentation
---
# IXWebSocket
- We open-sourced our C++ implementation (client + server + http client)
- On [GitHub](https://github.com/machinezone/IXWebSocket) (star it !)
- Why ?
- No boost dependency
- Asynchronous and Interruptible/Cancellable
- Auto-reconnection logic
---
# ws
```
$ ws --help
ws is a websocket tool
Usage: ws [OPTIONS] SUBCOMMAND
Options:
-h,--help Print this help message and exit
Subcommands:
send Send a file
receive Receive a file
transfer Broadcasting server
connect Connect to a remote server
chat Group chat
echo_server Echo server
broadcast_server Broadcasting server
ping Ping pong
curl HTTP Client
redis_publish Redis publisher
redis_subscribe Redis subscriber
```
---
.left-column[
## Messaging protocol
## Publishers
## Broker
]
---
# Features
## PubSub
## Also
1. Authentication
2. StreamSQL
3. Websocket for the transport
4. JSON serialization
---
2019-03-28 00:27:52 +01:00
# Cobra
```
,,'6''-,.
<====,.;;--.
_`---===. """==__
//""@@-\===\@@@@ ""\\
|( @@@ |===| @@@ ||
\\ @@ |===| @@ //
\\ @@ |===|@@@ //
\\ |===| //
___________\\|===| //_____,----""""""""""-----,_
""""---,__`\===`/ _________,---------,____ `,
|==|| `\ \
|==| | Python 3 + Redis ) |
|==| | _____ ______,--' '
|=| `----""" `"""""""" _,-'
`=\ __,---"""-------------"""''
""""
```
---
2019-03-27 23:53:55 +01:00
# Why Redis ?
- Redis Pub/Sub very easy to get going. Redis Stream (5.0-rc5) could be used.
- Very simple to operate, has good client support (asyncio)
- Simpler than the elephant in the room (Kafka)
---
# Why Python ?
- This is what I know :)
- Python 3.5+ has builtin asynchronous IO. async and await keywords makes code very simple and readable
- Very fast libuv based event loop, [uvloop](https://uvloop.readthedocs.io/). Claimed to be faster than node.js and comparable to Go.
- Nice libraries [websockets](http://mypy-lang.org/), fast json processing with UJson or rapidjson (Tencent).
- [mypy](http://mypy-lang.org/) type checker, memory debugging tool.
---
2019-03-28 00:27:52 +01:00
# Memory optimizations (thanks to tracemalloc module)
## A simple asyncio PUBLISH only client (see code in the annex)
2019-03-27 23:53:55 +01:00
```
2019-03-28 00:27:52 +01:00
262.5 KB new 262.5 KB total / 2994 new 2994 total memory blocks:
...
File "/src/cobra/server/pipelined_publisher.py", line 35
pipe.publish(appChannel, data)
File "/usr/local/lib/python3.7/site-packages/aioredis/commands/transaction.py", line 145
task = asyncio.ensure_future(attr(*args, **kw),
File "/usr/local/lib/python3.7/site-packages/aioredis/commands/pubsub.py", line 14
return self.execute(b'PUBLISH', channel, message)
File "/usr/local/lib/python3.7/site-packages/aioredis/commands/__init__.py", line 50
return self._pool_or_conn.execute(command, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/aioredis/commands/transaction.py", line 102
self._pipeline.append((fut, cmd, args, kw))
2019-03-27 23:53:55 +01:00
```
---
# Pipelined redis publishing
## Sacrifice some latency for throughput.
1. Incoming published messages are queued.
2. Then published in a batch mode every 100ms, using a redis pipe, to minimize RTT.
---
# Channel sharding / many redis instances
```
Cobra
+-------------+ +---------+
| | | redis-1 |
engine_fps_id +-------+ | +---------+
| | hash() |
| | | +---------+
engine_mem_id +--------------------------->+ redis-2 |
| | | +---------+
| | |
.... | | | +---------+
| +------------------->+ redis-3 |
| | +---------+
+-------------+
.....
```
???
To be able to scale horizontally we run multiple redis instances, and we select a redis instance to talk to by computing a hash on the channel name.
---
# SQL like language to process messages
- Filter data on the broker
```
Broker Subscriber
+-------------+
| |
+---------->+ | +-------------+
| SELECT ... | | few messages|
High frenquency | +---->+ received |
message | (subset | | |
| of data) | +-------------+
+---------->+ |
| |
+-------------+
```
---
# Deployment
- OpenShift (Redhat tooling around Kubernete)
- In memory redis config.
- It works (tm)
```
redis1-1-gbjvt 1/1 Running 2 145d
redis2-1-8jvj7 1/1 Running 0 52d
redis3-1-s9hxj 1/1 Running 0 52d
redis4-1-v96gx 1/1 Running 2 140d
redis5-1-z6262 1/1 Running 0 52d
```
```
cobra-44-4xfbw 1/1 Running 0 1h
cobra-44-58vch 1/1 Running 0 1h
cobra-44-5m4ff 1/1 Running 0 1h
cobra-44-6vvk5 1/1 Running 0 1h
cobra-44-7bmcx 1/1 Running 0 1h
cobra-44-85q78 1/1 Running 0 1h
cobra-subscribers-30-7bpqh 1/1 Running 0 1h
cobra-subscribers-30-fd477 1/1 Running 0 1h
cobra-subscribers-30-h5ntn 1/1 Running 0 1h
cobra-subscribers-30-hlcd6 1/1 Running 0 1h
```
???
Very interesting to notice the uptime of Redis, which is very easy to operate and super reliable.
---
.left-column[
## Publishers
## Broker
## Subscribers
]
---
# Subscribers ecosystem
```
Broker Subscribers
+----------------------+ +-------------+ http
| | ws | +--------> Sentry
| +----->+ node.js |
| Cobra | | +--------> Grafana
| * Python3 | +-------------+ udp
| * Redis PubSub |
| | +-------------+ mysql
| | ws | +--------> tableau
| +----->+ Python |
| | | +--------> "Terminal"
| | +-------------+
| |
| | +-------------+
| | ws | | websocket
| +----->+ Web Browser |--------> Neo (custom dashboard)
| +--+ | |
+----------------------+ | +-------------+
|
| +----------------+
| | |
+-->+ Java, PHP, C++ |
| |
+----------------+
```
???
Flexible and rich ecosystem of subscribers or bots, used by various individuals or teams to solve different problems. Simple WebSocket API + JSON
Good property of a flexible system.
---
# Command line tools
- Great for one-off custom things, or fast experimentation.
- Lowest barrier to entry (good for programmers)
- Does not scale well
```
+-------------+-------------+----------------+---------------+
| Frame Rate | iPad mini 2 | MacBookPro11,5 | iPhone 8 Plus |
+-------------+-------------+----------------+---------------+
| Samples | 1 | 1 | 1 |
+-------------+-------------+----------------+---------------+
| > 60 fps | 0.0 % | 1.6 % | 0.0 % |
+-------------+-------------+----------------+---------------+
| 60 fps | 69.5 % | 98.1 % | 100.0 % |
+-------------+-------------+----------------+---------------+
| 30 fps | 30.5 % | 0.2 % | 0.0 % |
+-------------+-------------+----------------+---------------+
| 15 fps | 0.0 % | 0.0 % | 0.0 % |
+-------------+-------------+----------------+---------------+
| 10 fps | 0.0 % | 0.0 % | 0.0 % |
+-------------+-------------+----------------+---------------+
| < 8 fps | 0.0 % | 0.0 % | 0.0 % |
+-------------+-------------+----------------+---------------+
| Average FPS | 50.85 | 55.80 | 59.99 |
+-------------+-------------+----------------+---------------+
```
---
# Neo
- Interactive real-time shell (good for non programmers)
- Search/Filter with StreamSQL
![Alt text](neo.png)
---
# Neo
- Recording of user sessions
- Charting system memory, lua memory FPS, annotated by user actions
- Scenes or documents visited
![Alt text](neo_session.png)
---
# Grafana
- Charting a signal
- Trends
![Alt text](grafana_critical_logs.png)
---
# Grafana
- zlib compression -> traffic reduction
![Alt text](grafana_zlib.png)
---
# Sentry
- Great to provide extra source code context, such as a stacktrace.
- Soft crashes
- Critical errors
![Alt text](sentry.png)
---
# Tableau
- Data exploration
![](tableau.png)
???
Tableau was used to let us know that our Crash Handler was not working on Android x86, as the crash rate looked bogus on this platform.
---
name: last
---
name: tenyears
---
class: center, middle
# Appendix
???
I will not go through this in the presentation
---
# Example Stream SQL expressions
## HTTP Errors
```
SELECT data.url,data.status FROM \`engine_net_request_id\`
WHERE data.status != 200 AND
data.url LIKE 'why'
{'data.url': 'https://why_are_we_still_making/this_request', 'data.status': 404}
{'data.url': 'https://why_are_we_still_making/this_request', 'data.status': 404}
{'data.url': 'https://why_are_we_still_making/this_request', 'data.status': 404}
{'data.url': 'https://why_are_we_still_making/this_request', 'data.status': 404}
{'data.url': 'https://why_are_we_still_making/this_one_too', 'data.status': 404}
```
## CDN requests
```
SELECT data.url,data.duration_ms FROM \`engine_net_request_id\`
WHERE data.url LIKE '.png'
{'data.url': 'https://big.cdn.provider/asset_1.png', 'data.duration_ms': 28}}
{'data.url': 'https://big.cdn.provider/asset_2.png', 'data.duration_ms': 1008}}
{'data.url': 'https://other.cdn.provider/asset_2.png', 'data.duration_ms': 512}}
```
???
We used this to deprecate sending low res assets to old devices, as very few of them remained in use, and it would speed up our deployment process, and reduce our CDNs costs to do so.
---
class: center, middle
# Messaging protocol
---
# Health check (messages sent / received)
Client cobra command
1. Handshake + authenticate
2. Subscribe to a channel
3. Publish a message
4. Make sure that message is received
```
$ cobra health
> {'action': 'auth/handshake', 'body': {'data': {'role': 'health_publisher_subscriber'}, 'method': 'role_secret'}, 'id': 1}
< {"action": "auth/handshake/ok", "body": {"data": {"nonce": "MTI0Njg4NTAyMjYxMzgxMzgzMg==", "version": "0.0.24"}}, "id": 1}
> {'action': 'auth/authenticate', 'body': {'method': 'role_secret', 'credentials': {'hash': '/Ec2gh0AQpAbzq7vQTvuIw=='}}, 'id': 2}
< {"action": "auth/authenticate/ok", "body": {}, "id": 2}
> {'action': 'rtm/subscribe', 'body': {'subscription_id': 'ws_subscribe', 'channel': 'sms_republished_v1_neo', 'fast_forward': True, 'filter': ''}, 'id': 3}
> {"action": "rtm/subscribe/ok", "body": {"position": "1519190184:559034812775", "subscription_id": "ws_subscribe"}, "id": 3}
Received {"action": "rtm/subscription/data", "body": {"subscription_id": "ws_subscribe", "messages": [{"position": "1519190184:547873030411", "subscription_id": "cobra_republisher", "messages": [{"device": {"game": "test", "android_id": "8dab3f6ecb2742669c8a838366be3cab"}}]}], "position": "1519190184:568807785938"}}
System is healthy
```
---
# Server health check traffic (messages sent / received)
```
$ cobra run --verbose
runServer {'host': '127.0.0.1', 'port': '8765', 'redisUrls': 'redis://localhost;redis://localhost', 'redisPassword': None, 'verbose': True, 'debugMemory': False}
node caad17054b494236aae5964743b22f7b
appkey EEEEEEEEEEEEEEEEFFFFFFFFFFFFFFFF
(open) connections 1
< {"action":"auth\/handshake","body":{"data":{"role":"health_publisher_subscriber"},"method":"role_secret"},"id":1}
> {'action': 'auth/handshake/ok', 'body': {'data': {'nonce': 'MTI0Njg4NTAyMjYxMzgxMzgzMg==', 'version': '0.0.24'}}, 'id': 1}
< {"action":"auth\/authenticate","body":{"method":"role_secret","credentials":{"hash":"\/Ec2gh0AQpAbzq7vQTvuIw=="}},"id":2}
server hash /Ec2gh0AQpAbzq7vQTvuIw==
client hash /Ec2gh0AQpAbzq7vQTvuIw==
> {'action': 'auth/authenticate/ok', 'body': {}, 'id': 2}
< {"action":"rtm\/subscribe","body":{"subscription_id":"ws_subscribe","channel":"sms_republished_v1_neo","fast_forward":true,"filter":""},"id":3}
> {'action': 'rtm/subscribe/ok', 'body': {'position': '1519190184:559034812775', 'subscription_id': 'ws_subscribe'}, 'id': 3}
< {"action": "rtm/publish", "body": {"channel": "sms_republished_v1_neo", "message": {"position": "1519190184:547873030411", "subscription_id": "cobra_republisher", "messages": [{"device": {"game": "test", "android_id": "8dab3f6ecb2742669c8a838366be3cab"}}]}}}
> {"action": "rtm/subscription/data", "body": {"subscription_id": "ws_subscribe", "messages": [{"position": "1519190184:547873030411", "subscription_id": "cobra_republisher", "messages": [{"device": {"game": "test", "android_id": "95a18ef313bb45de83f6ddfd59a964a2"}}]}], "position": "1519190184:568807785938"}}
#messages 1 msg/s 1
< {"action": "rtm/unsubscribe", "body": {"subscription_id": "sms_republished_v1_neo"}}
> {"action": "rtm/unsubscribe/ok", "id": 3}
subscribe redis connection closed
(close) connections 0
```
---
# Handshake action (C -> S)
```json
{
"action": "auth/handshake",
"body": {
"data": {
"role": "health_publisher_subscriber"
},
"method": "role_secret"
},
"id": 1
}
```
???
Each pdu (protocol data unit) has an action and a body. The handshake method specify a role name.
---
# Handshake action (S -> C)
```json
{
"action": "auth/handshake/ok",
"body": {
"data": {
"nonce": "MTI0Njg4NTAyMjYxMzgxMzgzMg==",
"version": "0.0.24"
}
},
"id": 1
}
```
???
The server respond with a method whose action is <action> slash ok, or slash error if an error happens (invalid syntax etc...). Here the client receive a nonce from the server which it will use for authentication.
---
# Authenticate action (C -> S)
```json
{
"action": "auth/authenticate",
"body": {
"method": "role_secret",
"credentials": {
"hash": "/Ec2gh0AQpAbzq7vQTvuIw=="
}
},
"id": 2
}
```
???
A hash is computed using the nonce, and passed through the authenticate pdu (or message)
---
# Authenticate action (S -> C)
```json
{
"action": "auth/authenticate/ok",
"body": {},
"id": 2
}
```
1. Lookup secret for role
2. Compute hash: `serverHash = computeHash(secret, nonce)`
3. Compare with the one the client supplied
```
server hash /Ec2gh0AQpAbzq7vQTvuIw==
client hash /Ec2gh0AQpAbzq7vQTvuIw==
```
???
When the server receive the client auth message, it does ...
---
# Publish action (C -> S)
```json
{
"action": "rtm/publish",
"body": {
"channel": "sms_republished_v1_neo",
"message": {
"data": {
"info": "test_from_minix"
},
"device": {
"foo": {
"foo": true,
"stuff": 3.0
}
},
"id": "engine_test_id",
"session": "4dacbafefc214578aa3f8c1d2975ec92",
"timestamp": 1536270895637,
"version": 1
}
}
}
```
???
The client can publish messages.
---
# Subscribe action (C -> S)
- Filter (StreamSQL) optional
```json
{
"action": "rtm/subscribe",
"body": {
"channel": "sms_republished_v1_neo",
"subscription_id": "ws_subscribe" /* optional */
},
"id": 3
}
```
---
# Subscription data (S -> C)
```json
{
"action": "rtm/subscription/data",
"body": {
"subscription_id": "ws_subscribe",
"messages": [
{
"position": "1519190184:547873030411",
"subscription_id": "cobra_republisher",
"messages": [
{
"device": {
"game": "test",
"android_id": "95a18ef313bb45de83f6ddfd59a964a2"
}
}
]
}
],
"position": "1519190184:568807785938"
}
}
```
---
# Unsubscription data (C -> S)
```json
{
"action": "rtm/unsubscribe",
"body": {
"subscription_id": "sms_republished_v1_neo"
}
}
```
---
# Unsubscription data (S -> C)
```json
{
"action": "rtm/unsubscribe/ok",
"id": 3
}
```
---
# Command line tools
```
(venv) cobra$ cobra ws_subscribe --url 'ws://foo.bar.com/v2?appkey=ZZZZZZZZZZZZZZZZ' \
--stream_sql "select data.fps from \`engine_fps_id\`" --verbose
> {'action': 'auth/handshake', 'id': 0, 'body': {'data': {'role': '_sub'}, 'method': 'role_secret'}}
< {"action": "auth/handshake/ok", "id": 0, "body": {"data": {"nonce": "MTU1NTM4NDM5Mjg3Nzk0NjIyNzA=", "version": "0.0.177", "connection_id": "794e630d", "node": "cobra-subscribers-30-7bpqh"}}}
> {'action': 'auth/authenticate', 'id': 1, 'body': {'method': 'role_secret', 'credentials': {'hash': 'xeBLmfJjUjGmrfpsH9/ksg=='}}}
< {"action": "auth/authenticate/ok", "id": 1, "body": {}}
> {'action': 'rtm/subscribe', 'body': {'subscription_id': 'engine_payload_id', 'channel': 'engine_payload_id', 'fast_forward': True, 'filter': 'select data.fps from `engine_fps_id`'}, 'id': 3}
< {"action": "rtm/subscribe/ok", "id": 3, "body": {"position": "1519190184:559034812775", "subscription_id": "engine_payload_id", "redis_node": "redis5"}}
{'data.fps': 25.0}
#messages 1 msg/s 1
{'data.fps': 19.0}
{'data.fps': 19.0}
{'data.fps': 24.0}
{'data.fps': 28.0}
{'data.fps': 27.0}
{'data.fps': 23.0}
{'data.fps': 26.0}
{'data.fps': 2.0}
{'data.fps': 29.0}
{'data.fps': 7.0}
{'data.fps': 26.0}
```
---
# Message framing with WebSocket
- Multiple opcodes (PING, PONG, CONTINUATION, TEXT, BINARY)
```
http://tools.ietf.org/html/rfc6455#section-5.2 Base Framing Protocol
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+
```
---
# Complexity of the code base
< 2000 lines of code for the server
```
$ wc -l src/cobra/server/*.py src/cobra/common/*.py src/cobra/runner/commands/run.py
0 src/cobra/server/__init__.py
234 src/cobra/server/app.py
64 src/cobra/server/apps_config.py
17 src/cobra/server/connection_state.py
56 src/cobra/server/pipelined_publisher.py
41 src/cobra/server/pipelined_publishers.py
477 src/cobra/server/protocol.py
42 src/cobra/server/redis_connections.py
92 src/cobra/server/redis_publisher.py
129 src/cobra/server/stats.py
207 src/cobra/server/stream_sql.py
74 src/cobra/server/subscribe.py
0 src/cobra/common/__init__.py
8 src/cobra/common/algorithm.py
13 src/cobra/common/auth_hash.py
4 src/cobra/common/cobra_types.py
27 src/cobra/common/memoize.py
113 src/cobra/common/memory_debugger.py
23 src/cobra/common/memory_usage.py
24 src/cobra/common/task_cleanup.py
22 src/cobra/common/throttle.py
16 src/cobra/common/version.py
75 src/cobra/runner/commands/run.py
1758 total
```
---
# "Plugins" / "Republishing"
- Help with high-throughput channels
- Can republish message from a high-traffic channel to lower traffic ones, more specialized
- We use it for our http metric id.
---
# A simple redis client (1/4)
```
import asyncio
class RedisPublisher(object):
'''
See https://redis.io/topics/mass-insert
'''
def __init__(self, host, port, password, verbose=False):
self.host = host
self.port = port
self.password = password
self.verbose = verbose
self.reset()
self.lock = asyncio.Lock()
def reset(self):
self.publishCount = 0
def writeString(self, data: bytes):
self.writer.write(b'$%d\r\n' % len(data))
self.writer.write(data)
self.writer.write(b'\r\n')
```
---
# A simple redis client (2/4)
```
async def connect(self):
async with self.lock:
print(f'Opening connection to redis at {self.host}:{self.port}')
self.reader, self.writer = await asyncio.open_connection(
self.host, self.port)
if self.password:
self.writer.write(b'*2\r\n')
self.writeString(b'AUTH')
password = self.password
if not isinstance(password, bytes):
password = password.encode('utf8')
self.writeString(password)
await self.execute()
```
---
# A simple redis client (3/4)
```
def publish(self, channel, msg):
self.publishCount += 1
if not isinstance(channel, bytes):
channel = channel.encode('utf8')
if not isinstance(msg, bytes):
msg = msg.encode('utf8')
self.writer.write(b'*3\r\n')
self.writeString(b'PUBLISH')
self.writeString(channel)
self.writeString(msg)
```
---
# A simple redis client (4/4)
```
async def execute(self):
async with self.lock:
await self.writer.drain()
results = []
# read until we get something
for i in range(self.publishCount):
line = await self.reader.readline()
results.append(line)
# FIXME: proper error handling !!!
if self.verbose:
print(f'Received: {line.decode()!r}')
if 'NOAUTH Authentication required' in line.decode():
raise ValueError('Authentication failed')
self.reset()
return results
```
---
# Pipelined redis publishing
```
class PipelinedPublisher():
def __init__(self, redis):
self.redis = redis
self.queue = asyncio.Queue()
async def run(self):
while True:
await asyncio.sleep(0.1)
N = self.queue.qsize()
if N == 0: continue
pipe = self.redis.pipeline()
for i in range(N):
item = await self.queue.get()
appkey, channel, data = item
appChannel = '{}::{}'.format(appkey, channel)
pipe.publish(appChannel, data)
self.queue.task_done()
await pipe.execute()
def enqueue(self, job):
self.queue.put_nowait(job)
```
---
# Memory optimizations
- Memory usage problematic early on.
- We ended up writing a very simple redis client, which is just doing batch publish with a pipeline.
- And we threw more hardware at it, CPU + memory (easy with 'the cloud' + containers).
```
cobra-44-7bmcx 0/1 OOMKilled 17 23h
```
---
# Architecture
1. Publishers
2. Message broker
3. Subscribers
(it's like a chat system)
```
Publishers Message Broker Subscribers
+-----------------------+
| |
+-----------> | Cobra | +-------------->
| |
+-----------> | * Python3 | +-------------->
| * Redis PubSub |
+-----------> | | +-------------->
| |
+-----------------------+
```
???
The pubsub architecture has 3 components, publishers, broker and subscribers. In our system a message leaves our games
awk '/^==/ { sum += $2} END { print sum/60 }' slides.html
</textarea>
<script src="https://remarkjs.com/downloads/remark-latest.min.js">
</script>
<script>
var slideshow = remark.create();
</script>
</body>
</html>