在 Python 中接收 SSE(服务器发送事件)的最佳方式

现在,很多在线的大语言模型 API 都使用 SSE(服务器发送事件)的方式发送数据。那么 SSE 是什么呢?接收 SSE 的最佳方式是什么?

什么是 SSE(Server-Sent Events)?

在现代 Web 应用中,有时我们希望服务器能主动“推送”消息给客户端,比如实时聊天、新消息提醒、日志流、进度更新等。SSE(Server-Sent Events) 就是专门为这种场景设计的:一种基于 HTTP 协议单向通信机制,由服务器不断将事件流推送给浏览器或客户端。

SSE 的工作方式是这样的:客户端通过一个 HTTP 请求连接到服务器,服务器使用特殊的 text/event-stream 内容类型持续推送事件,而不关闭连接。客户端监听这个连接,随时接收服务器传来的更新。

比如,一个最简单的 SSE 响应内容如下:

data: 你好,这是来自服务器的第一条消息

data: 第二条消息来了!

event: customEvent
data: {"value": 42}

在前端 JavaScript 中,只需要这样监听:

const source = new EventSource('/sse');
source.onmessage = (event) => {
    console.log('收到消息:', event.data);
};

这样就实现了“服务器自动通知客户端”的功能,非常适合需要实时更新的网页应用。

一条 SSE 响应通常包含哪些字段?

SSE 的响应内容是基于文本的,每一条“事件”由一组字段组成,以换行符分隔。常见的字段包括:

  • data:事件的主要内容(必须有);
  • id:事件的唯一标识,用于断线重连时从上次位置继续接收;
  • event:事件类型(可选);
  • retry:指定客户端尝试重连的间隔时间(单位毫秒,可选)。

一个完整的 SSE 消息例子如下:

id: 101
event: message
data: {"text": "你好,欢迎访问!"}

每条消息必须以一个空行结尾,代表该事件已经发送完成。

客户端会根据这些字段进行处理。例如,EventSource 会自动识别 id 并保存,如果连接中断,它会自动带上 Last-Event-ID 请求头重新连接,服务器就可以从上次中断的位置继续推送,确保数据不丢失。

使用 Python 的标准库接收 SSE 消息

如果你不想依赖额外库,可以使用标准库 + requests 来手动解析 SSE 流。原理很简单:SSE 本质上就是一个文本流,我们只需要逐行读取、解码,然后提取 dataevent 等字段。

一、建立流式请求连接

import requests

response = requests.get('http://example.com/sse', stream=True)

确保 stream=True,这样我们才能逐行读取服务器推送的数据。

二、逐行读取响应并解码

SSE 是 UTF-8 编码的文本流,我们按行解码、缓存字段,并在遇到空行时组成一条完整事件:

def parse_sse_line_stream(resp):
    event = {
        "event": "message",
        "data": "",
        "id": None,
        "retry": None
    }
    for raw_line in resp.iter_lines(decode_unicode=True):
        if raw_line is None:
            continue
        line = raw_line.strip()

        if line == "":
            # 空行表示一个事件发送完成
            yield event
            # 重置事件缓存
            event = {
                "event": "message",
                "data": "",
                "id": None,
                "retry": None
            }
        elif line.startswith(":"):
            # 注释,跳过
            continue
        else:
            if ":" in line:
                field, value = line.split(":", 1)
                value = value.lstrip()
            else:
                field, value = line, ""

            if field == "data":
                event["data"] += value + "\n"  # 多行data要拼接
            elif field == "event":
                event["event"] = value
            elif field == "id":
                event["id"] = value
            elif field == "retry":
                event["retry"] = value

三、使用这个生成器监听事件

for event in parse_sse_line_stream(response):
    print(f"[{event['event']}] {event['data'].strip()}")

使用 Python 的 sseclient 读取 SSE 内容

在 Python 中,我们可以使用 sseclient-py 来读取服务器推送的事件流。这个库可以将 requests 返回的流式响应转换为一个事件生成器,非常适合处理像 LLM 生成、实时日志等场景。

第一步:使用 requests 建立一个流式连接

我们需要确保请求设置了 stream=True,这样响应内容才会逐块读取,而不是一次性读取全部:

import requests

response = requests.get('http://example.com/sse', stream=True)

第二步:把响应转换为字节生成器

SSEClient 接收的是一个生成器,因此我们要写一个简单的生成器函数,把响应内容一行一行读出来:

def byte_stream(response: requests.Response):
    for chunk in response.iter_content(chunk_size=2048):
        yield chunk

第三步:创建 SSEClient 实例并读取事件

然后,就可以使用 SSEClient 来解析事件流了:

from sseclient import SSEClient

client = SSEClient(byte_stream(response))

for event in client.events():
    print(f"事件类型: {event.event}")
    print(f"事件数据: {event.data}")
    print(f"事件ID: {event.id}")

这样,程序就会持续监听服务器的事件流,并逐个输出事件内容,直到服务器关闭连接或手动中断。

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇