python实现sse服务器单向消息推送给客户端

SSE ( Server-sent Events )通俗解释起来就是一种基于HTTP的,以流的形式由服务端持续向客户端发送数据的技术,是 WebSocket 的一种轻量代替方案。

优点:开发简单,和传统的http开发几乎无任何差别,客户端开发简单,有标准支持(EventSource)
缺点:和websocket相比,只能单工通信,建立连接后,只能由服务端发往客户端,且占用一个连接,如需客户端向服务端通信,需额外打开一个连接

服务端代码

基于flask

import json
import time

from flask import Flask, request
from flask import Response
from flask import render_template

app = Flask(__name__)


def get_message():
    """this could be any function that blocks until data is ready"""
    time.sleep(1)
    s = time.ctime(time.time())
    return json.dumps(['当前时间:' + s , 'a'], ensure_ascii=False)


@app.route('/')
def hello_world():
    return render_template('index.html')


@app.route('/stream')
def stream():
    user_id = request.args.get('user_id')
    print(user_id)
    def eventStream():
        id = 0
        while True:
            id +=1
            # wait for source data to be available, then push it

            yield 'id: {}\nevent: add\ndata: {}\n\n'.format(id,get_message())


    return Response(eventStream(), mimetype="text/event-stream")


if __name__ == '__main__':
    app.run()

因为SSE是http请求,可是又限定是一个长链接,因此要设置MIME类型为text/event-stream。返回的为字符串。

消息的格式

服务器向浏览器发送的 SSE 数据,必须是 UTF-8 编码的文本;

每一次发送的信息,由若干个message组成,每一个message之间用\n\n分隔。每一个message内部由若干行组成

格式

[field]:value\n

其中在规范中为消息定义了 4 个字段

id: 表明id
event: 表明消息的事件类型
data: 消息的数据字段
retry: 客户端重连的时间。只接受整数,单位是毫秒。如果这个值不是整数则会被自动忽略

需要注意的是,id字段不是必须的,服务器有可能不会在消息中带上 id 字段,这样子客户端就不会存在 Last-Event-ID这个属性。所以为了保证数据可靠,我们需要在每条消息上带上 id 字段。

一个很有意思的地方是,规范中规定以冒号开头的消息都会被当作注释,一条普通的注释(:\n\n)对于服务器来说只占 5个字符,但是发送到客户端上的时候不会触发任何事件,这对客户端来说是非常友好的。所以注释一般被用于维持服务器和客户端的长连接。

基于django

原生django

from django.http import StreamingHttpResponse
import datetime

def Sse1View(request):

    def event_stream():
        while True:
            time.sleep(3)
            yield 'data: The server time is: %s\n\n' % datetime.datetime.now()

    return StreamingHttpResponse(event_stream(), content_type='text/event-stream')

代码中我们可以看到,我们只给前端推送了data,并没有指定id和event,忽略是可行的

RestFramework

上面说完django,其实再说这个django-restframework想想已经没必要了,因为并不像api接口一样携带自定义header之类的,那么所谓的数字签名、token认证等我这里是暂时转移到query参数了(没有深入了解,有其他方式可以告诉我),所以你用restframework反而麻烦了,restframework磨人的render渲染器并解不开这种流数据,需要你自定义渲染器

class SSEView(mixins.ListModelMixin, CommonGenericViewSet):
    """消息即时提醒"""
    authentication_classes = ()
    permission_classes = ()
    renderer_classes = (EventSteamRenderer,)

    def list(self, request, *args, **kwargs):
        def generate_events():
            a = random.randint(1, 10)
            # 生成事件数据的逻辑
            # 逻辑修改为自己的,我这里初衷是要根据这个用户查询有没有维度消息推送到前端的
            if request.query_params.get("token") == '123' and a < 6:
                yield 'event: notice\ndata: 牛逼啊\n\n'

        return StreamingHttpResponse(generate_events(), content_type='text/event-stream')

EventSteamRenderer渲染器

from rest_framework import renderers


class EventSteamRenderer(renderers.BaseRenderer):
    media_type = 'text/event-stream'

    def render(self, data, media_type=None, renderer_context=None):
        return data.encode(self.charset)

客户端代码

方式一


//判断是否支持SSE
if('EventSource' in window){

//初始化SSE
var url="http:localhost:8000/stream";
var source=new EventSource(url);

// 连接成功后会触发 open 事件
source.onopen=(event)=>{

console.log("开启SSE");

}

// 服务器发送信息到客户端时,如果没有 event 字段,默认会触发 message 事件
source.onmessage=(event)=>{

var data=event.data;

$("body").append($("<p>").text(data));

}

//监听like事件
source.addEventListener('like',function(event){

var data=event.data;

$("body").append($("<p>").text(data));
},false);

// 连接异常时会触发 error 事件并自动重连
source.onerror=(event)=>{

console.log(event);

}

方式二

使用 addEventListener 方法来添加相应的事件处理方法

if (window.EventSource) {
  // 创建 EventSource 对象连接服务器
  const source = new EventSource('http://localhost:2000');

  // 连接成功后会触发 open 事件
  source.addEventListener('open', () => {
    console.log('Connected');
  }, false);

  // 服务器发送信息到客户端时,如果没有 event 字段,默认会触发 message 事件
  source.addEventListener('message', e => {
    console.log(`data: ${e.data}`);
  }, false);

  // 自定义 EventHandler,在收到 event 字段为 slide 的消息时触发
  source.addEventListener('slide', e => {
    console.log(`data: ${e.data}`); // => data: 7
  }, false);

  // 连接异常时会触发 error 事件并自动重连
  source.addEventListener('error', e => {
    if (e.target.readyState === EventSource.CLOSED) {
      console.log('Disconnected');
    } else if (e.target.readyState === EventSource.CONNECTING) {
      console.log('Connecting...');
    }
  }, false);
} else {
  console.error('Your browser doesn\'t support SSE');
}

EventSource从父接口 EventTarget 中继承了属性和方法,其内置了 3 个 EventHandler属性、2 个只读属性和 1 个方法:

EventHandler 属性

EventSource.onopen 在连接打开时被调用。

EventSource.onmessage 在收到一个没有 event 属性的消息时被调用。

EventSource.onerror 在连接异常时被调用。 只读属性

EventSource.readyState 一个 unsigned short 值,代表连接状态。可能值是 CONNECTING (0), OPEN (1), 或者 CLOSED (2)

EventSource.url 连接的 URL方法

EventSource.close() 关闭连接

EventSource 对象的 onmessage 属性的作用类似于 addEventListener( ‘ message ’ )

注意事项

SSE 如何保证数据完整性

客户端在每次接收到消息时,会把消息的 id 字段作为内部属性 Last-Event-ID 储存起来。

SSE 默认支持断线重连机制,在连接断开时会 触发 EventSource 的 error 事件,同时自动重连。再次连接成功时
EventSource 会把 Last-Event-ID 属性作为请求头发送给服务器,这样服务器就可以根据这个Last-Event-ID作出相应的处理。

这里需要注意的是,id 字段不是必须的,服务器有可能不会在消息中带上 id 字段,这样子客户端就不会存在 Last-Event-ID这个属性。所以为了保证数据可靠,我们需要在每条消息上带上 id 字段。

减少开销

在 SSE 的草案中提到,“text/event-stream” 的 MIME 类型传输应当在静置 15秒后自动断开。在实际的项目中也会有这个机制,但是断开的时间没有被列入标准中。

为了减少服务器的开销,我们也可以有目的的断开和重连。

简单的办法是服务器发送一个 关闭消息并指定一个重连的时间戳,客户端在触发关闭事件时关闭当前连接并创建 一个计时器,在重连时把计时器销毁

function connectSSE() {
  if (window.EventSource) {
    const source = new EventSource('http://localhost:2000/sse/');
    let reconnectTimeout;

    source.addEventListener('open', () => {
      console.log('Connected');
      clearTimeout(reconnectTimeout);
    }, false);

    source.addEventListener('pause', e => {
      source.close();
      const reconnectTime = +e.data;
      const currentTime = +new Date();
      reconnectTimeout = setTimeout(() => {
        connectSSE();
      }, reconnectTime - currentTime);
    }, false);
  } else {
    console.error('Your browser doesn\'t support SSE');
  }
}

connectSSE();

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!