深入探索Spring AI:源码分析流式回答

打印 上一主题 下一主题

主题 884|帖子 884|积分 2652

在上一章节中,我们深入分析了Spring AI的阻塞式哀求与响应机制,并探究了如何加强其影象能力。今天,我们将重点讲解流式响应的概念与实现。毕竟,AI的流式回答功能与其交互体验密切干系,是提升用户满意度的重要组成部门。
基本用法

基本用法非常简单,只需增加一个 stream 方法即可实现所需功能。接下来,我们将通过代码示例来展示这一过程,资助您更清楚地理解如何在现实应用中举行操作。请看以下代码:
  1. @GetMapping(value = "/ai-stream",produces = MediaType.APPLICATION_OCTET_STREAM_VALUE + ";charset=UTF-8")
  2. Flux<String> generationByStream(@RequestParam("userInput") String userInput) {
  3.     Flux<String> output = chatClient.prompt()
  4.             .user(userInput)
  5.             .stream()
  6.             .content();
  7.     return output;
  8. }
复制代码
在我们增加 stream 方法之后,返回的对象范例将不再是原来的阻塞式 CallResponseSpec,而是转换为非阻塞的 StreamResponseSpec。与此同时,返回的数据范例也由之前的 String 变动为 Flux。
在深入探究其具体应用之前,起首让我来介绍一下 Flux 的概念与特性。
Spring WebFlux的处置惩罚器实现

起首,在 WebFlux 中,处置惩罚器已经实现了非阻塞式的功能。这意味着,只要我们的代码返回一个 Flux 对象,就能轻松实现响应功能。通过这种方式,应用步伐能够高效地处置惩罚并发哀求,而不会因阻塞操作而影响整体性能。
  1.     @Override
  2.     public Mono<Void> handle(ServerWebExchange exchange) {
  3.         if (this.handlerMappings == null) {
  4.             return createNotFoundError();
  5.         }
  6.         if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {
  7.             return handlePreFlight(exchange);
  8.         }
  9.         return Flux.fromIterable(this.handlerMappings)
  10.                 .concatMap(mapping -> mapping.getHandler(exchange))
  11.                 .next()
  12.                 .switchIfEmpty(createNotFoundError())
  13.                 .onErrorResume(ex -> handleResultMono(exchange, Mono.error(ex)))
  14.                 .flatMap(handler -> handleRequestWith(exchange, handler));
  15.     }
复制代码
这里简单介绍一下 Spring WebFlux,固然这不是我们的重点,但了解其基本概念还是很有资助的。Spring WebFlux 是 Spring 框架的一部门,专为构建反应式应用而计划。它支持异步和非阻塞的编程模型,使得处置惩罚高并发哀求变得更加高效。以下是 WebFlux 的几个关键特性:

  • 反应式编程:WebFlux 基于反应式编程模型,使用 Mono 和 Flux 范例来处置惩罚数据流。Mono 表示零或一个元素,而 Flux 则表示零个或多个元素。这种模型使得我们可以轻松处置惩罚异步数据流,从而提高代码的可读性和可维护性。
  • 非阻塞 I/O:WebFlux 通过非阻塞的 I/O 操作(如 Netty 或 Servlet 3.1+ 容器)来实现高效的资源使用。与传统的阻塞 I/O 不同,WebFlux 在等候响应时能够开释线程,这样一来,就可以显著提高应用的并发能力,支持更多的同时哀求而不增加线程开销。
了解这些特性将为后续的非阻塞式响应计划奠定基础,资助我们更好地使用 WebFlux 的能力来提升应用性能。
源码分析

如今我们来详细看看我们的 content 是如何操作的。接下来的代码示例将展示具体的实现方式,资助我们理解在 WebFlux 中如那边置惩罚数据流和响应:
  1. public Flux<String> content() {
  2.     return doGetFluxChatResponse(this.request).map(r -> {
  3.         if (r.getResult() == null || r.getResult().getOutput() == null
  4.                 || r.getResult().getOutput().getContent() == null) {
  5.             return "";
  6.         }
  7.         return r.getResult().getOutput().getContent();
  8.     }).filter(StringUtils::hasLength);
  9. }
复制代码
这里的实现相对简单,主要是传入了一个函数。接下来,我们将深入分析 doGetFluxChatResponse 的代码实现,以便更好地理解其具体逻辑和运作方式:
  1. private Flux<ChatResponse> doGetFluxChatResponse2(DefaultChatClientRequestSpec inputRequest) {
  2. //此处省略重复代码
  3.     var fluxChatResponse = this.chatModel.stream(prompt);
  4. //此处省略重复代码
  5.     return advisedResponse;
  6. }
复制代码
这里的代码逻辑与阻塞回答基本相同,唯一的不同之处在于它调用了 chatModel.stream(prompt) 方法。接下来,我们将深入探究 chatModel.stream(prompt) 方法的具体实现和其背后的计划思绪:
  1. public Flux<ChatResponse> stream(Prompt prompt) {
  2.         return Flux.deferContextual(contextView -> {
  3.         //此处省略重复代码
  4.             Flux<OpenAiApi.ChatCompletionChunk> completionChunks = this.openAiApi.chatCompletionStream(request,
  5.                     getAdditionalHttpHeaders(prompt));
  6. //此处省略重复代码
  7.             Flux<ChatResponse> chatResponse = completionChunks.map(this::chunkToChatCompletion)
  8.                 .switchMap(chatCompletion -> Mono.just(chatCompletion).map(chatCompletion2 -> {
  9. //此处省略重复代码
  10.                         return new ChatResponse(generations, from(chatCompletion2, null));
  11.                     }
  12.                 }));
  13. //此处省略重复代码
  14.             return new MessageAggregator().aggregate(flux, observationContext::setResponse);
  15.         });
  16.     }
复制代码
同样的逻辑在这里就不再赘述,我们将重点关注其中的区别。在这一部门,我们使用了 chatCompletionStream,而且与之前不同的是,这里不再使用 retryTemplate,而是引入了 webClient,这是一个能够吸收事件流的工具类。
  1. public Flux<ChatCompletionChunk> chatCompletionStream(ChatCompletionRequest chatRequest,
  2.         MultiValueMap<String, String> additionalHttpHeader) {
  3.     Assert.notNull(chatRequest, "The request body can not be null.");
  4.     Assert.isTrue(chatRequest.stream(), "Request must set the stream property to true.");
  5.     AtomicBoolean isInsideTool = new AtomicBoolean(false);
  6.     return this.webClient.post()
  7.         .uri(this.completionsPath)
  8.         .headers(headers -> headers.addAll(additionalHttpHeader))
  9.         .body(Mono.just(chatRequest), ChatCompletionRequest.class)
  10.         .retrieve()
  11.         .bodyToFlux(String.class)
  12.         // cancels the flux stream after the "[DONE]" is received.
  13.         .takeUntil(SSE_DONE_PREDICATE)
  14.         // filters out the "[DONE]" message.
  15.         .filter(SSE_DONE_PREDICATE.negate())
  16.         .map(content -> ModelOptionsUtils.jsonToObject(content, ChatCompletionChunk.class))
  17. //此处省略一堆代码
复制代码
这段代码的主要目标是通过 webClient 向指定路径发起一个 POST 哀求,同时设置合适的哀求头和哀求体。在获取响应数据时,使用了事件流的方式(通过 bodyToFlux 方法)来吸收响应内容,并对数据举行过滤和转换,最终将其转化为 ChatCompletionChunk 对象。
只管其余的业务逻辑与之前相似,但有一点显著的区别,即整个流程的返回范例以及与 OpenAI API 的调用方式都是非阻塞式的。
总结

在当今的数字时代,流式响应机制不仅提升了系统的性能,还在用户体验上饰演了关键脚色。通过引入 Flux 范例,Spring WebFlux 的计划理念使得应用能够以非阻塞的方式处置惩罚并发哀求,从而有效使用资源并减少响应延迟。
我们终于全面讲解了Spring AI的基本操作,包罗阻塞式回答、流式回答以及影象加强功能。这些内容为我们深入理解其工作机制奠定了基础。接下来,我们将继续深入探索源码,重点分析回调函数、实体类映射等重要功能。
这将资助我们更好地理解Spring AI的内部运作原理,并为进一步的优化和定制化提供指导。
我是努力的小雨,一名 Java 服务端码农,潜心研究着 AI 技术的奥秘。我热爱技术交流与分享,对开源社区充满热情。同时也是一位腾讯云创作之星、阿里云专家博主、华为云云享专家、掘金优秀作者。


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

盛世宏图

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表