python实现sse服务器单向消息推送给客户端
SSE ( Server-sent Events )通俗解释起来就是一种基于HTTP的,以流的形式由服务端持续向客户端发送数据的技术,是 WebSocket 的一种轻量代替方案。
优点:开发简单,和传统的http开发几乎无任何差别,客户端开发简单,有标准支持(EventSource)
缺点:和websocket相比,只能单工通信,建立连接后,只能由服务端发往客户端,且占用一个连接,如需客户端向服务端通信,需额外打开一个连接
服务端代码
基于flask
import json
import time
from flask import Flask, request
from flask import Response
from flask import render_template
app = Flask(__name__)
def get_message():
"""this could be any function that blocks until data is ready"""
time.sleep(1)
s = time.ctime(time.time())
return json.dumps(['当前时间:' + s , 'a'], ensure_ascii=False)
@app.route('/')
def hello_world():
return render_template('index.html')
@app.route('/stream')
def stream():
user_id = request.args.get('user_id')
print(user_id)
def eventStream():
id = 0
while True:
id +=1
# wait for source data to be available, then push it
yield 'id: {}\nevent: add\ndata: {}\n\n'.format(id,get_message())
return Response(eventStream(), mimetype="text/event-stream")
if __name__ == '__main__':
app.run()
因为SSE是http请求,可是又限定是一个长链接,因此要设置MIME类型为text/event-stream。返回的为字符串。
消息的格式
服务器向浏览器发送的 SSE 数据,必须是 UTF-8 编码的文本;
每一次发送的信息,由若干个message组成,每一个message之间用\n\n分隔。每一个message内部由若干行组成
格式
[field]:value\n
其中在规范中为消息定义了 4 个字段
id: 表明id
event: 表明消息的事件类型
data: 消息的数据字段
retry: 客户端重连的时间。只接受整数,单位是毫秒。如果这个值不是整数则会被自动忽略需要注意的是,id字段不是必须的,服务器有可能不会在消息中带上 id 字段,这样子客户端就不会存在 Last-Event-ID这个属性。所以为了保证数据可靠,我们需要在每条消息上带上 id 字段。
一个很有意思的地方是,规范中规定以冒号开头的消息都会被当作注释,一条普通的注释(:\n\n)对于服务器来说只占 5个字符,但是发送到客户端上的时候不会触发任何事件,这对客户端来说是非常友好的。所以注释一般被用于维持服务器和客户端的长连接。
基于django
原生django
from django.http import StreamingHttpResponse
import datetime
def Sse1View(request):
def event_stream():
while True:
time.sleep(3)
yield 'data: The server time is: %s\n\n' % datetime.datetime.now()
return StreamingHttpResponse(event_stream(), content_type='text/event-stream')
代码中我们可以看到,我们只给前端推送了data,并没有指定id和event,忽略是可行的
RestFramework
上面说完django,其实再说这个django-restframework想想已经没必要了,因为并不像api接口一样携带自定义header之类的,那么所谓的数字签名、token认证等我这里是暂时转移到query参数了(没有深入了解,有其他方式可以告诉我),所以你用restframework反而麻烦了,restframework磨人的render渲染器并解不开这种流数据,需要你自定义渲染器
class SSEView(mixins.ListModelMixin, CommonGenericViewSet):
"""消息即时提醒"""
authentication_classes = ()
permission_classes = ()
renderer_classes = (EventSteamRenderer,)
def list(self, request, *args, **kwargs):
def generate_events():
a = random.randint(1, 10)
# 生成事件数据的逻辑
# 逻辑修改为自己的,我这里初衷是要根据这个用户查询有没有维度消息推送到前端的
if request.query_params.get("token") == '123' and a < 6:
yield 'event: notice\ndata: 牛逼啊\n\n'
return StreamingHttpResponse(generate_events(), content_type='text/event-stream')
EventSteamRenderer渲染器
from rest_framework import renderers
class EventSteamRenderer(renderers.BaseRenderer):
media_type = 'text/event-stream'
def render(self, data, media_type=None, renderer_context=None):
return data.encode(self.charset)
客户端代码
方式一
//判断是否支持SSE
if('EventSource' in window){
//初始化SSE
var url="http:localhost:8000/stream";
var source=new EventSource(url);
// 连接成功后会触发 open 事件
source.onopen=(event)=>{
console.log("开启SSE");
}
// 服务器发送信息到客户端时,如果没有 event 字段,默认会触发 message 事件
source.onmessage=(event)=>{
var data=event.data;
$("body").append($("<p>").text(data));
}
//监听like事件
source.addEventListener('like',function(event){
var data=event.data;
$("body").append($("<p>").text(data));
},false);
// 连接异常时会触发 error 事件并自动重连
source.onerror=(event)=>{
console.log(event);
}
方式二
使用 addEventListener 方法来添加相应的事件处理方法
if (window.EventSource) {
// 创建 EventSource 对象连接服务器
const source = new EventSource('http://localhost:2000');
// 连接成功后会触发 open 事件
source.addEventListener('open', () => {
console.log('Connected');
}, false);
// 服务器发送信息到客户端时,如果没有 event 字段,默认会触发 message 事件
source.addEventListener('message', e => {
console.log(`data: ${e.data}`);
}, false);
// 自定义 EventHandler,在收到 event 字段为 slide 的消息时触发
source.addEventListener('slide', e => {
console.log(`data: ${e.data}`); // => data: 7
}, false);
// 连接异常时会触发 error 事件并自动重连
source.addEventListener('error', e => {
if (e.target.readyState === EventSource.CLOSED) {
console.log('Disconnected');
} else if (e.target.readyState === EventSource.CONNECTING) {
console.log('Connecting...');
}
}, false);
} else {
console.error('Your browser doesn\'t support SSE');
}
EventSource从父接口 EventTarget 中继承了属性和方法,其内置了 3 个 EventHandler属性、2 个只读属性和 1 个方法:
EventHandler 属性
EventSource.onopen
在连接打开时被调用。
EventSource.onmessage
在收到一个没有 event 属性的消息时被调用。
EventSource.onerror
在连接异常时被调用。 只读属性
EventSource.readyState
一个 unsigned short 值,代表连接状态。可能值是CONNECTING (0)
,OPEN (1)
, 或者CLOSED (2)
。
EventSource.url
连接的 URL方法
EventSource.close()
关闭连接
EventSource
对象的onmessage
属性的作用类似于addEventListener( ‘ message ’ )
注意事项
SSE 如何保证数据完整性
客户端在每次接收到消息时,会把消息的 id 字段作为内部属性 Last-Event-ID 储存起来。
SSE 默认支持断线重连机制,在连接断开时会 触发 EventSource 的 error 事件,同时自动重连。再次连接成功时
EventSource 会把 Last-Event-ID 属性作为请求头发送给服务器,这样服务器就可以根据这个Last-Event-ID作出相应的处理。
这里需要注意的是,id 字段不是必须的,服务器有可能不会在消息中带上 id 字段,这样子客户端就不会存在 Last-Event-ID这个属性。所以为了保证数据可靠,我们需要在每条消息上带上 id 字段。
减少开销
在 SSE 的草案中提到,“text/event-stream” 的 MIME 类型传输应当在静置 15秒后自动断开。在实际的项目中也会有这个机制,但是断开的时间没有被列入标准中。
为了减少服务器的开销,我们也可以有目的的断开和重连。
简单的办法是服务器发送一个 关闭消息并指定一个重连的时间戳,客户端在触发关闭事件时关闭当前连接并创建 一个计时器,在重连时把计时器销毁
function connectSSE() {
if (window.EventSource) {
const source = new EventSource('http://localhost:2000/sse/');
let reconnectTimeout;
source.addEventListener('open', () => {
console.log('Connected');
clearTimeout(reconnectTimeout);
}, false);
source.addEventListener('pause', e => {
source.close();
const reconnectTime = +e.data;
const currentTime = +new Date();
reconnectTimeout = setTimeout(() => {
connectSSE();
}, reconnectTime - currentTime);
}, false);
} else {
console.error('Your browser doesn\'t support SSE');
}
}
connectSSE();
本作品采用《CC 协议》,转载必须注明作者和本文链接
推荐文章: