ToB企服应用市场:ToB评测及商务社交产业平台

标题: 利用 OkHttp 或者 WebClient 对接大模型的流式接口 [打印本页]

作者: 涛声依旧在    时间: 9 小时前
标题: 利用 OkHttp 或者 WebClient 对接大模型的流式接口
随着大模型(如 GPT、BERT 等)的流行,越来越多的 API 提供商开始提供流式接口,这使得我们可以或许更高效地处理大规模数据,尤其是在相应体非常大的情况下。本文将探讨怎样利用 OkHttpSpring WebClient 两种方式对接大模型的流式接口,实现高效的异步请求处理。
一、流式接口概述
流式接口是一种允许客户端渐渐吸收服务器相应数据的机制。相比传统的“一次性返回”方式,流式接口通常用于处理大体积数据或者需要长时间处理的请求。客户端通过创建持久毗连,持续从服务器获取数据,而无需等待整个相应体加载完成。
大模型 API 的流式接口通常采取 HTTP 2.0WebSocket 协议举行数据传播输,可以或许以流的方式发送大量的数据块或结果,这对实现低延迟、高吞吐量的请求处理至关紧张。
二、利用 OkHttp 对接流式接口
2.1 设置 OkHttp 客户端
OkHttp 是一个强大的 HTTP 客户端,支持异步请求、毗连池和流式相应等特性。下面是怎样利用 OkHttp 实现对接流式接口的示例。
  1. <dependency>
  2.     <groupId>com.squareup.okhttp3</groupId>
  3.     <artifactId>okhttp</artifactId>
  4.     <version>4.12.0</version>在这里插入代码片
  5. </dependency>
复制代码
  1. package com.dolphin.bootstrap.agent.streamchat;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.dolphin.bootstrap.agent.BaseAgent;
  4. import io.micrometer.common.util.StringUtils;
  5. import lombok.extern.slf4j.Slf4j;
  6. import okhttp3.*;
  7. import okio.BufferedSource;
  8. import org.jetbrains.annotations.NotNull;
  9. import org.springframework.beans.factory.annotation.Value;
  10. import org.springframework.http.HttpHeaders;
  11. import org.springframework.stereotype.Component;
  12. import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
  13. import java.io.IOException;
  14. import java.util.Collections;
  15. import java.util.concurrent.TimeUnit;
  16. /**
  17. * 利用 OKHttp 库,实现 Stream 流式调用
  18. */
  19. @Component
  20. @Slf4j
  21. public class OKHttpStream implements BaseAgent {
  22.     private static final Integer TIME_OUT = 5 * 60;
  23.     @Value("ai.agent.streamChatUrl")
  24.     private String streamChatUrl;
  25.     @Override
  26.     public void chatStream(SseEmitter sseEmitter, String question) {
  27.         // 1. 创建 OkHttpClient 对象,并设置超时时间
  28.         OkHttpClient client = new OkHttpClient.Builder()
  29.                 .connectTimeout(TIME_OUT, TimeUnit.SECONDS)
  30.                 .readTimeout(TIME_OUT, TimeUnit.SECONDS)
  31.                 .writeTimeout(TIME_OUT, TimeUnit.SECONDS)
  32.                 .build();
  33.         //  封装请求体参数
  34.         JSONObject params = new JSONObject();
  35.         params.put("question", question);
  36.         RequestBody requestBody = RequestBody.create(params.toJSONString(), MediaType.parse("application/json; charset=utf-8"));
  37.         // 封装请求头
  38.         Headers headers = new Headers.Builder()
  39.                 .set("Content-Type","application/json")
  40.                 .set("Accept","text/event-stream")
  41.                 .build();
  42.         // 2. 构建 Request 对象
  43.         Request request = new Request.Builder()
  44.                 .url(streamChatUrl)
  45.                 .headers(headers)
  46.                 .post(requestBody)
  47.                 .build();
  48.         // 3. 创建 Call 对象
  49.         Call call = client.newCall(request);
  50.         // 4. 监听回调
  51.         call.enqueue(new Callback() {
  52.             @Override
  53.             public void onFailure(@NotNull Call call, @NotNull IOException e) {
  54.                 log.error("进入 onFailure方法 {}", e.getMessage(), e);
  55.                 sseEmitter.completeWithError(e);
  56.             }
  57.             @Override
  58.             public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException {
  59.                 if (response.isSuccessful()) {
  60.                     String chunkMessage = "";
  61.                     try (ResponseBody responseBody = response.body();) {
  62.                         BufferedSource source = responseBody.source();
  63.                         while (!source.exhausted()) {
  64.                             chunkMessage = source.readUtf8Line();
  65.                             if (StringUtils.isBlank(chunkMessage)) {
  66.                                 continue;
  67.                             }
  68.                             JSONObject jsonObject = JSONObject.parseObject(chunkMessage);
  69.                             if (null != jsonObject && null != jsonObject.getJSONObject("data")) {
  70.                                 String answer = jsonObject.getJSONObject("data").getString("answer");
  71.                                 sseEmitter.send(answer);
  72.                             }
  73.                            
  74.                         }
  75.                     } catch (Exception e) {
  76.                         log.error("解析失败: {}", e.getMessage(), e);
  77.                         sseEmitter.completeWithError(e);
  78.                     }
  79.                 } else {
  80.                     log.error("onResponse 方法请求失败 {}", response.message());
  81.                     // TODO 重新发起请求
  82.                 }
  83.             }
  84.         });
  85.         BaseAgent.super.chatStream(sseEmitter, question);
  86.     }
  87. }
复制代码
2.2 代码解释
​ • call.enqueue(new Callback() {…});:异步执行请求,回调函数用于处理相应。
这种方式非常适合大模型 API,由于它可以或许及时获取并处理每一个数据块,而不需要等待整个相应体加载完成。
三、利用 WebClient 对接流式接口
Spring 5 引入的 WebClient 是基于相应式编程的 HTTP 客户端,特别适用于流式数据处理和高并发场景。WebClient 支持通过 MonoFlux 来处理异步数据流,非常适合与大模型的流式接口对接。
3.1 设置 WebClient
首先,需要在 Spring Boot 项目中设置 WebClient。可以通过 WebClient.Builder 来定制客户端设置。
  1. <dependency>
  2.     <groupId>org.springframework</groupId>
  3.     <artifactId>spring-webflux</artifactId>
  4.     <version>6.1.4</version>
  5. </dependency>
复制代码
  1. package com.dolphin.bootstrap.agent.streamchat;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.dolphin.bootstrap.agent.BaseAgent;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.beans.factory.annotation.Value;
  7. import org.springframework.http.HttpHeaders;
  8. import org.springframework.http.HttpStatus;
  9. import org.springframework.http.MediaType;
  10. import org.springframework.web.reactive.function.client.WebClient;
  11. import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
  12. import reactor.core.publisher.Flux;
  13. import java.io.IOException;
  14. import java.util.Collections;
  15. /**
  16. * 利用 WebClient 类,实现 Stream 流式调用
  17. */
  18. public class WebClientStream implements BaseAgent {
  19.     private static final Logger log = LoggerFactory.getLogger(WebClientStream.class);
  20.     @Value("ai.agent.streamChatUrl")
  21.     private String streamChatUrl;
  22.     @Override
  23.     public void chatStream(SseEmitter sseEmitter, String question) {
  24.         // 创建 WebClient 客户端
  25.         WebClient webClient = WebClient.builder().baseUrl(streamChatUrl).build();
  26.         // 封装参数
  27.         JSONObject params = new JSONObject();
  28.         params.put("question", question);
  29.         // 封装请求头
  30.         HttpHeaders headers = new HttpHeaders();
  31.         headers.setContentType(MediaType.APPLICATION_JSON_UTF8);
  32.         // accept 一定要设置为 TEXT_EVENT_STREAM
  33.         headers.setAccept(Collections.singletonList(MediaType.TEXT_EVENT_STREAM));
  34.         Flux<String> eventStream = webClient
  35.                 .post()
  36.                 .uri("/stream")
  37.                 .accept(MediaType.valueOf("text/event-stream;charset=UTF-8")) // 一定要设置
  38.                 .headers(httpHeaders -> httpHeaders.addAll(headers))
  39.                 .bodyValue(params.toJSONString())
  40.                 .retrieve()
  41.                 .bodyToFlux(String.class);
  42.         eventStream.subscribe(
  43.             data -> { // data 有什么key,value。具体看你对接agent的文档
  44.                 try {
  45.                     JSONObject bodyJson = JSONObject.parseObject(data);
  46.                     if (bodyJson.getIntValue("code") == HttpStatus.OK.value()) {
  47.                         // 事件类型
  48.                         String event = bodyJson.getString("event");
  49.                         // agent回答
  50.                         String answer = bodyJson.getJSONObject("data").getString("answer");
  51.                         if ("message".equals(event)) {
  52.                             // 中间一段一段的消息
  53.                             sseEmitter.send(answer);
  54.                         } else if ("end".equals(event)) {
  55.                             // 最后会全部返回
  56.                             sseEmitter.send(answer);
  57.                         }
  58.                     }
  59.                 } catch (IOException e) {
  60.                     throw new RuntimeException(e);
  61.                 }
  62.             },
  63.             error -> {
  64.                 sseEmitter.completeWithError(error);
  65.                 log.error("报错了:{}", error.getMessage(), error);
  66.             },
  67.             sseEmitter::complete
  68.         );
  69.     }
  70. }
复制代码
3.2 代码解释
​ • bodyValue :添加请求体。
​ • retrieve :执行请求。
​ • bodyToFlux :将相应体处理为 Flux,Flux 代表一个可以包含多个元素的数据流。。
​ • subscribe :非壅闭,注册回调函数。第一个回调:正常数据返回。第二个回调:堕落。第三个回调:请求完成。
3.3 错误处理
WebClient 还提供了完善的错误处理机制,可以通过 .onStatus() 或 .onErrorResume() 方法捕获 HTTP 错误或其他异常情况。
  1. webClient.get()
  2.     .uri("/")
  3.     .retrieve()
  4.     .onStatus(HttpStatus::is4xxClientError, response -> Mono.error(new RuntimeException("客户端错误")))
  5.     .onStatus(HttpStatus::is5xxServerError, response -> Mono.error(new RuntimeException("服务器错误")))
  6.     .bodyToFlux(String.class)
  7.     .doOnNext(data -> System.out.println("接收到数据块:" + data))
  8.     .subscribe();
复制代码
四、OkHttp 与 WebClient 的对比
特性OkHttpWebClient编程模型壅闭或异步操作相应式编程,支持异步和流式处理并发处理支持异步请求,但不具备相应式特性支持高并发,且更适合流式数据处理适用场景传统的 HTTP 请求,适合简单的客户端高并发、相应式编程,流式接口等集成生态单独的 HTTP 客户端库与 Spring WebFlux 集成,适用于微服务架构 选择建议:
​ • 如果你已经在利用 Spring Boot,且需要处理流式接口,WebClient 是更保举的选择。它与 Spring WebFlux 共同默契,能处理高并发和复杂的数据流场景。
​ • 如果你只是需要处理简单的流式 HTTP 请求,OkHttp 是一个非常轻量且功能强大的库,适合用于快速集成。
五、总结
无论是 OkHttp 还是 WebClient,它们都提供了高效的流式数据处理本领,可以或许帮助我们在与大模型 API 对接时减少等待时间,提高相应速度和吞吐量。选择哪个工具取决于你的项目需求和技能栈,如果你在利用 Spring Boot 或 WebFlux,WebClient 无疑是一个抱负的选择。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4