# 前言

最近由于一些乱七八糟的原因,接触到了国内开发的一些类 ChatGPT 的 API 的前端调用与功能集成。概括的来说,就是有一个需求,需要在 Web 前端页面中集成类似于 AI 聊天助手的功能,而怎么去跟这个 AI 助手一起聊天呢?这时候就需要调这个 GPT 助手的 API 来实现了。

不过我们都知道,在平常和 ChatGPT 一起聊天的时候,问他问题,他不是一下子全部加载好回答出来的,他是会将所有的返回信息处理成一个 信息流 的方式进行返回,所以我们是可以看到 ChatGPT 在回答消息的时候是逐字逐行来输出的。而浏览器本身也是通过 EventSource 这一个内置的 API 来接收这 流式 SSE 的数据并处理。

不过这个 EventSource 有一个非常致命的缺点,那就是 只支持 GET 类型的请求,并且不支持任何自定义的头部 。这也就意味着,如果想要和 ChatGPT 双向聊天,发给他消息,他以信息流的方式返回给数据,再在这个消息的基础之上再发给它消息,这时使用 EventSource 就是行不通的。

而这个时候就得使用我们今天的主角了,微软开发的一个专门用于处理双向 SSE 数据流的 npm 库:fetch-event-source。这个包的主要作用是提供一个遵循 WHATWG Fetch 标准的 API 来处理 SSE,不但允许我们可以和对应的 url 地址简历持久连接,并且允许我们在接收数据流信息的同时将我们想要发送的消息也通过相同的 url 进行发送。实际上目前的 ChatGPT 实现的双向信息流也是基于这个库进行开发的。

# 使用方法

先来讲一讲这个库的一些基本使用方法(主要是举我自己的使用例子)。

# 安装

输入以下的命令即可安装:

npm install --save @microsoft/fetch-event-source

# 使用步骤

其实 fetch-event-source 的使用非常非常的简单纯粹,不过前提是得有那种使用的场景。使用 @microsoft/fetch-event-source 的步骤可以这样拆分:

  1. 导入模块:在编写 TS/JS 代码的部分导入 fetchEventSource 函数,我是在 Vue3 中的 TS 环境使用的。

    import { fetchEventSource } from "@microsoft/fetch-event-source";
  2. 配置请求:配置的请求,包括指定要连接的服务器端点 URL 和其他任何需要的 HTTP 请求头或设置。

    const requestOptions = {
      method: "GET", // 请求方法,SSE 通常是 GET 请求。如果涉及到双向通信,需要改为 POST。
      headers: {
        "Content-Type": "text/event-stream", // 设置内容类型为 SSE,即
      },
      // 可以添加其他需要的配置:
      // 如果为 POST 请求,需要携带 body 参数以传递请求体;
      // 如果希望用户切换到另一个页面后仍能保持 SSE 连接,可以配置 openWhenHidden 属性为 true;
      // 如果需要使用 AbortController 来实现检测到问题终止连接的,可以配置 signal 属性等。
    };
  3. 处理事件:使用 fetchEventSource 发起请求并处理不同的事件。可以定义 onmessageonopenonerroronclose 回调来处理相应的事件。

    fetchEventSource("/path/to/sse", {
      ...requestOptions,
      onopen(response) {
        console.log("Connection opened!", response);
      },
      onmessage(event) {
        console.log("Received message:", event.data);
      },
      onerror(error) {
        console.error("Error:", error);
      },
      onclose() {
        console.log("Connection closed!");
      },
    });
  4. 处理消息:在 onmessage 回调中,会接收到服务器发送的所有消息。这些消息通过 event.data 访问。可以根据应用的需求来处理这些消息。比如目前的返回数据是以流式的形式一点点逐渐接收的,那么这个 onmessage 回调就会 一直被不停的触发 ,每一次触发都会调用其内部的逻辑。

  5. 错误处理:在 onerror 回调中,可以处理任何在连接或接收数据过程中发生的错误,这种错误也是由与建立连接的服务端进行定义并发送。接收到服务端传来的错误后,会触发这个回调进行自定义的错误处理。 一般地,如果有引入 signal 作为断开连接的机制,在这个回调中会使用 signal.abort() 方法来终止连接。

  6. 关闭连接:在 onclose 回调中,可以处理连接关闭的事件。这个回调会在连接关闭时被调用,可以在这里进行一些清理工作。

# 实际例子

我先举一个我在使用的例子,目前我有一个类 GPT 的人工智能流式对话接口 url,即下方的 /api/chat/sseResponse ,我可以将我自己的消息发给它,然后来接收它返回的数据流,组装拼接成一条完整的消息之后插到整个消息列表中作为一个 item。

以下是提交对话的具体函数,环境为 Vue3 当中的 <script setup lang="ts"> 脚本当中:

// 提交对话
const submitChat = async () => {
  if (loading.value) return;
  if (!inputContent.value) {
    ElMessage.warning("请输入内容");
    return;
  }
  loading.value = true;
  const chatItem: ChatMessage = {
    id: String(chatsList.value.length + 1),
    content: inputContent.value,
    isMe: true,
  };
  chatsList.value.push(chatItem);
  inputContent.value = "";
  const ctrl = new AbortController(); // 创建 AbortController 实例,以便中止请求
  await fetchEventSource("/api/chat/sseResponse", {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
    },
    body: JSON.stringify(chatMessageList.value),
    openWhenHidden: true, // 取消 visibilityChange 事件
    signal: ctrl.signal, // AbortSignal
    async onmessage(ev) {
      const data = JSON.parse(ev.data);
      if (data.sceneUuid !== sceneUuid.value) sceneUuid.value = data.sceneUuid;
      if (data.token !== "") answer.value += data.token;
      await nextTick();
      if (main.value) {
        const { scrollHeight } = main.value;
        main.value.scrollTop = scrollHeight;
      }
    },
    onclose() {
      if (answer.value) {
        const chatItem: ChatMessage = {
          id: String(chatsList.value.length + 1),
          content: answer.value,
          isMe: false,
        };
        chatsList.value.push(chatItem);
        answer.value = "";
      }
      loading.value = false;
    },
    onerror(err) {
      chatsList.value.pop();
      answer.value = "";
      loading.value = false;
      ElMessage.error("对话请求发生网络错误或涉及违规话题");
      ctrl.abort();
      throw err; // 直接抛出错误,避免反复调用
    },
  });
};

在这个函数中,由于是涉及到双向的数据传递,我启用了 POST 方法,并且将维护的消息列表传递给这个接口作为沟通对话的上下文。

在最重要的 onmessage 回调中,我使用 event 来接收返回的数据,在服务端数据流每发生一次变化的时候都会触发这个回调,然后我将 event 中携带的 token 来一点点添加到 answer 这个响应式字符串当中,然后在 template 中使用这个变量以实现和 ChatGPT 类似的逐字逐句文本输出的效果。至于这里为什么要使用 await nextTick() ,是为了确保每次对话 GPT 返回的消息过长而导致换行的时候总是将整个窗口移动到滚动条最下方,确保用户的良好交互。

最后,在 onclose 回调当中将这个消息 item 添加到维护的消息列表当中,为下一次对话的上下文做准备。

# 简介、原理与构造

那么 fetchEventSource 究竟是怎么实现数据的双向通信,并且能够使用不同的请求方式如 POST 的呢?

fetchEventSource 本身的源码并不多,主要的实现代码在 fetch.tsparse.ts 中,并且后者是工具函数的集成。

源码地址:https://github.com/Azure/fetch-event-source

# parse.ts

这里面最最主要的就是三个函数: getBytesgetLinesgetMessages

# getBytes

export async function getBytes(
  stream: ReadableStream<Uint8Array>,
  onChunk: (arr: Uint8Array) => void
) {
  const reader = stream.getReader();
  let result: ReadableStreamDefaultReadResult<Uint8Array>;
  while (!(result = await reader.read()).done) {
    onChunk(result.value);
  }
}

它的作用是从一个 ReadableStream<Uint8Array> 流中读取数据。这个函数接收两个参数: streamonChunk

  • stream 是一个 ReadableStream<Uint8Array> 对象,代表一个可读取的二进制数据流。
  • onChunk 是一个回调函数,每当从流中读取到一块数据时,就会调用这个函数,并将读取到的数据( Uint8Array 类型)作为参数传递给这个函数。

函数的工作流程如下:

  1. 通过调用 stream.getReader() 创建一个流的阅读器 reader
  2. 使用 while 循环来持续从流中读取数据。在循环内部,调用 reader.read() 异步读取流中的数据块。这个调用会返回一个 Promise ,它解析为一个对象,这个对象包含两个属性: donevalue
    • done 是一个布尔值,表示是否已经读取完所有数据。
    • value 是一个 Uint8Array 类型的对象,表示从流中读取的数据块。
  3. 如果 donetrue ,表示没有更多的数据可以读取,循环结束。
  4. 如果 donefalse ,则表示成功读取到数据块。此时,会调用 onChunk 回调函数,并将读取到的数据块 value 作为参数传递给 onChunk
  5. 循环继续,直到读取完所有数据。

简单来说,这个函数就是负责把整个数据流转为 bytes chunk,然后再传递给回调函数 onChunk 里面进行下一步处理。

# getLines

export function getLines(
  onLine: (line: Uint8Array, fieldLength: number) => void
) {
  let buffer: Uint8Array | undefined;
  let position: number; // current read position
  let fieldLength: number; // length of the `field` portion of the line
  let discardTrailingNewline = false;
  // return a function that can process each incoming byte chunk:
  return function onChunk(arr: Uint8Array) {
    if (buffer === undefined) {
      buffer = arr;
      position = 0;
      fieldLength = -1;
    } else {
      // we're still parsing the old line. Append the new bytes into buffer:
      buffer = concat(buffer, arr);
    }
    const bufLength = buffer.length;
    let lineStart = 0; // index where the current line starts
    while (position < bufLength) {
      if (discardTrailingNewline) {
        if (buffer[position] === ControlChars.NewLine) {
          lineStart = ++position; // skip to next char
        }
        discardTrailingNewline = false;
      }
      // start looking forward till the end of line:
      let lineEnd = -1; // index of the \r or \n char
      for (; position < bufLength && lineEnd === -1; ++position) {
        switch (buffer[position]) {
          case ControlChars.Colon:
            if (fieldLength === -1) {
              // first colon in line
              fieldLength = position - lineStart;
            }
            break;
          // @ts-ignore:7029 \r case below should fallthrough to \n:
          case ControlChars.CarriageReturn:
            discardTrailingNewline = true;
          case ControlChars.NewLine:
            lineEnd = position;
            break;
        }
      }
      if (lineEnd === -1) {
        // We reached the end of the buffer but the line hasn't ended.
        // Wait for the next arr and then continue parsing:
        break;
      }
      // we've reached the line end, send it out:
      onLine(buffer.subarray(lineStart, lineEnd), fieldLength);
      lineStart = position; // we're now on the next line
      fieldLength = -1;
    }
    if (lineStart === bufLength) {
      buffer = undefined; // we've finished reading it
    } else if (lineStart !== 0) {
      // Create a new view into buffer beginning at lineStart so we don't
      // need to copy over the previous lines when we get the new arr:
      buffer = buffer.subarray(lineStart);
      position -= lineStart;
    }
  };
}

getLines 接收一个回调函数 onLine 作为参数,并返回一个新的函数 onChunk ,这个结构用来处理 逐块 接收的数据流。

onLine 是一个回调函数,每当检测到一行数据时,就会调用它。它接收两个参数: line (这一行的数据, Uint8Array 类型)和 fieldLength (这一行中 field 部分的长度)。

getLines 函数的返回值 onChunk 是一个用来处理字节块( Uint8Array 类型)的函数。这个函数的作用是逐个解析传入的字节块,以找到数据中的行结束符,并在每找到一个完整的行时调用 onLine 函数

下面是 onChunk 函数的详细工作流程:

  1. 如果 buffer 未定义(即这是第一次调用或者前一个缓冲区已完全处理完毕),则初始化 bufferpositionfieldLength
  2. 如果 buffer 已定义(即正在处理一个较大的数据块或连续的数据块),则将新的数据块 arr 追加到现有的 buffer 上。
  3. 遍历 buffer ,使用 position 来追踪当前读取的位置。
  4. 如果设置了 discardTrailingNewline ,则跳过行结束符之后的新行字符。
  5. 通过循环查找行结束符( \n\r ),同时识别字段长度(第一个冒号 : 出现的位置)。
  6. 当找到行结束符时,使用 buffer.subarray(lineStart, lineEnd) 获取完整的行,并调用 onLine(buffer.subarray(lineStart, lineEnd), fieldLength) ,处理这一行数据。
  7. 更新 lineStartfieldLength 为下一行的开始位置和初始值。
  8. 如果处理完当前缓冲区中的所有行,根据需要调整 buffer (如果还有未处理的数据,将 buffer 设置为剩余的部分)。

总之这个函数的功能是将一连串的字节流分割成单独的行,并对每一行执行某些操作,将 byte chunk 转换成 eventsource buffer 并进行下一步的处理。

# getMessages

export function getMessages(
  onId: (id: string) => void,
  onRetry: (retry: number) => void,
  onMessage?: (msg: EventSourceMessage) => void
) {
  let message = newMessage();
  const decoder = new TextDecoder();
  // return a function that can process each incoming line buffer:
  return function onLine(line: Uint8Array, fieldLength: number) {
    if (line.length === 0) {
      // empty line denotes end of message. Trigger the callback and start a new message:
      onMessage?.(message);
      message = newMessage();
    } else if (fieldLength > 0) {
      // exclude comments and lines with no values
      // line is of format "<field>:<value>" or "<field>: <value>"
      // https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation
      const field = decoder.decode(line.subarray(0, fieldLength));
      const valueOffset =
        fieldLength + (line[fieldLength + 1] === ControlChars.Space ? 2 : 1);
      const value = decoder.decode(line.subarray(valueOffset));
      switch (field) {
        case "data":
          // if this message already has data, append the new value to the old.
          // otherwise, just set to the new value:
          message.data = message.data ? message.data + "\n" + value : value; // otherwise,
          break;
        case "event":
          message.event = value;
          break;
        case "id":
          onId((message.id = value));
          break;
        case "retry":
          const retry = parseInt(value, 10);
          if (!isNaN(retry)) {
            // per spec, ignore non-integers
            onRetry((message.retry = retry));
          }
          break;
      }
    }
  };
}

getMessages 用于处理 Server-Sent Events (SSE) 类型的数据。它接收三个回调函数作为参数: onIdonRetryonMessage ,并返回一个函数 onLine 用于处理数据行。

  • onId 回调在每次检测到消息 ID 时调用,传递 ID 字符串作为参数。
  • onRetry 回调在每次检测到重试时间时调用,传递重试时间的数值作为参数。
  • onMessage (可选)回调在每次消息结束时调用,传递完整的消息对象作为参数。

onLine 函数的工作流程如下:

  1. 当接收到一个空行时,表示一个消息的结束。此时,如果定义了 onMessage 回调,就会使用当前积累的消息数据调用它,然后开始一个新的消息。
  2. 如果行不是空的,且 fieldLength 大于 0,说明这一行包含有效的数据。行的格式遵循 <field>:<value><field>: <value>
  3. 使用 TextDecoderUint8Array 类型的行数据解码为字符串。
  4. 根据字段名( field )的不同,处理并更新消息对象( message )的相应属性:
    • data : 追加数据字符串到 message.data
    • event : 设置 message.event
    • id : 调用 onId 回调,并设置 message.id
    • retry : 解析重试时间,调用 onRetry 回调,并设置 message.retry

这个函数主要用于处理从服务器接收的事件流数据,通过逐行解析,从而能够将数据组装成完整的消息,并通过回调函数处理消息的不同部分,也就是将 getLine 处理好的 eventsource buffer 转换成 EventSourceMessage 类型的数据。

# fetch.ts

上述的这三个函数互相配合,能够将整个数据流转为 EventSourceMessage 类型的数据。在 fetch.ts 定义了它们具体的配合方法:

const response = await fetch(input, {
  ...rest,
  headers,
  signal: curRequestController.signal,
});
await onopen(response);
await getBytes(
  response.body!,
  getLines(
    getMessages(
      (id) => {
        if (id) {
          // store the id and send it back on the next retry:
          headers[LastEventId] = id;
        } else {
          // don't send the last-event-id header anymore:
          delete headers[LastEventId];
        }
      },
      (retry) => {
        retryInterval = retry;
      },
      onmessage
    )
  )
);
onclose?.();
dispose();
resolve();

在此处,三个的组合使用体现了一种流式处理数据的模式,逐层解析数据,从字节流到数据行,再到具体的消息。

  1. 外层 - getBytes 函数:它接收 response.body (一个 ReadableStream<Uint8Array> 类型的对象)和一个回调函数。这个回调函数是通过 getLines 函数生成的。 getBytes 负责从 response.body 中读取数据块,并将每个数据块传递给 getLines 生成的函数。 这个 response 是通过 fetch 来拿到的数据

    const response = await fetch(input, {
      ...rest,
      headers,
      signal: curRequestController.signal,
    });
  2. 中层 - getLines 函数:它创建并返回一个 onLine 函数,这个函数能够处理由 getBytes 传入的字节块,并将这些块分割成单独的行。对于每一行,它调用 getMessages 生成的函数进行处理。

  3. 内层 - getMessages 函数:它创建一个用于处理单独行的函数。这个函数会解析每行的内容,根据内容更新消息对象,并在适当的时候触发回调函数( onIdonRetryonmessage ):

    • onId 回调用于处理消息中的 ID 字段。如果 ID 存在,将其存储并准备在下次重试时发送回服务器。如果 ID 不存在,则从头中删除 LastEventId
    • onRetry 回调用于处理消息中的重试时间。
    • onmessage 回调在每个消息结束时调用,处理完整的消息内容。

# 结语

经过上述的解析,这一切看似非常的自然与逻辑自洽,可是为什么我们 fetch 的时候拿到的 response.data 是 ReadableStream<Uint8Array> 类型的对象呢?换句话说,为什么 fetch 可以建立 SSE 的链接呢?

其实这很简单,因为 SSE 本身就是基于 HTTP 协议的一种实现而已 ,而 fetch 这个 api 本身被设计就是用来处理 HTTP 响应的。而之所以能够源源不断地接收服务端返回的数据,其实也只是 SSE 返回的数据格式为 text/event-stream ,而我们一般的一次性获取数据的返回格式一般为 application/json