随着大模型(如 GPT、BERT 等)的流行,越来越多的 API 提供商开始提供流式接口,这使得我们可以或许更高效地处理大规模数据,尤其是在相应体非常大的情况下。本文将探讨怎样利用 OkHttp 和 Spring WebClient 两种方式对接大模型的流式接口,实现高效的异步请求处理。
一、流式接口概述
流式接口是一种允许客户端渐渐吸收服务器相应数据的机制。相比传统的“一次性返回”方式,流式接口通常用于处理大体积数据或者需要长时间处理的请求。客户端通过创建持久毗连,持续从服务器获取数据,而无需等待整个相应体加载完成。
大模型 API 的流式接口通常采取 HTTP 2.0 或 WebSocket 协议举行数据传播输,可以或许以流的方式发送大量的数据块或结果,这对实现低延迟、高吞吐量的请求处理至关紧张。
二、利用 OkHttp 对接流式接口
2.1 设置 OkHttp 客户端
OkHttp 是一个强大的 HTTP 客户端,支持异步请求、毗连池和流式相应等特性。下面是怎样利用 OkHttp 实现对接流式接口的示例。
- <dependency>
- <groupId>com.squareup.okhttp3</groupId>
- <artifactId>okhttp</artifactId>
- <version>4.12.0</version>在这里插入代码片
- </dependency>
复制代码- package com.dolphin.bootstrap.agent.streamchat;
- import com.alibaba.fastjson.JSONObject;
- import com.dolphin.bootstrap.agent.BaseAgent;
- import io.micrometer.common.util.StringUtils;
- import lombok.extern.slf4j.Slf4j;
- import okhttp3.*;
- import okio.BufferedSource;
- import org.jetbrains.annotations.NotNull;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.http.HttpHeaders;
- import org.springframework.stereotype.Component;
- import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
- import java.io.IOException;
- import java.util.Collections;
- import java.util.concurrent.TimeUnit;
- /**
- * 利用 OKHttp 库,实现 Stream 流式调用
- */
- @Component
- @Slf4j
- public class OKHttpStream implements BaseAgent {
- private static final Integer TIME_OUT = 5 * 60;
- @Value("ai.agent.streamChatUrl")
- private String streamChatUrl;
- @Override
- public void chatStream(SseEmitter sseEmitter, String question) {
- // 1. 创建 OkHttpClient 对象,并设置超时时间
- OkHttpClient client = new OkHttpClient.Builder()
- .connectTimeout(TIME_OUT, TimeUnit.SECONDS)
- .readTimeout(TIME_OUT, TimeUnit.SECONDS)
- .writeTimeout(TIME_OUT, TimeUnit.SECONDS)
- .build();
- // 封装请求体参数
- JSONObject params = new JSONObject();
- params.put("question", question);
- RequestBody requestBody = RequestBody.create(params.toJSONString(), MediaType.parse("application/json; charset=utf-8"));
- // 封装请求头
- Headers headers = new Headers.Builder()
- .set("Content-Type","application/json")
- .set("Accept","text/event-stream")
- .build();
- // 2. 构建 Request 对象
- Request request = new Request.Builder()
- .url(streamChatUrl)
- .headers(headers)
- .post(requestBody)
- .build();
- // 3. 创建 Call 对象
- Call call = client.newCall(request);
- // 4. 监听回调
- call.enqueue(new Callback() {
- @Override
- public void onFailure(@NotNull Call call, @NotNull IOException e) {
- log.error("进入 onFailure方法 {}", e.getMessage(), e);
- sseEmitter.completeWithError(e);
- }
- @Override
- public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException {
- if (response.isSuccessful()) {
- String chunkMessage = "";
- try (ResponseBody responseBody = response.body();) {
- BufferedSource source = responseBody.source();
- while (!source.exhausted()) {
- chunkMessage = source.readUtf8Line();
- if (StringUtils.isBlank(chunkMessage)) {
- continue;
- }
- JSONObject jsonObject = JSONObject.parseObject(chunkMessage);
- if (null != jsonObject && null != jsonObject.getJSONObject("data")) {
- String answer = jsonObject.getJSONObject("data").getString("answer");
- sseEmitter.send(answer);
- }
-
- }
- } catch (Exception e) {
- log.error("解析失败: {}", e.getMessage(), e);
- sseEmitter.completeWithError(e);
- }
- } else {
- log.error("onResponse 方法请求失败 {}", response.message());
- // TODO 重新发起请求
- }
- }
- });
- BaseAgent.super.chatStream(sseEmitter, question);
- }
- }
复制代码 2.2 代码解释
• call.enqueue(new Callback() {…});:异步执行请求,回调函数用于处理相应。
这种方式非常适合大模型 API,由于它可以或许及时获取并处理每一个数据块,而不需要等待整个相应体加载完成。
三、利用 WebClient 对接流式接口
Spring 5 引入的 WebClient 是基于相应式编程的 HTTP 客户端,特别适用于流式数据处理和高并发场景。WebClient 支持通过 Mono 和 Flux 来处理异步数据流,非常适合与大模型的流式接口对接。
3.1 设置 WebClient
首先,需要在 Spring Boot 项目中设置 WebClient。可以通过 WebClient.Builder 来定制客户端设置。
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-webflux</artifactId>
- <version>6.1.4</version>
- </dependency>
复制代码- package com.dolphin.bootstrap.agent.streamchat;
- import com.alibaba.fastjson.JSONObject;
- import com.dolphin.bootstrap.agent.BaseAgent;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.http.HttpHeaders;
- import org.springframework.http.HttpStatus;
- import org.springframework.http.MediaType;
- import org.springframework.web.reactive.function.client.WebClient;
- import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
- import reactor.core.publisher.Flux;
- import java.io.IOException;
- import java.util.Collections;
- /**
- * 利用 WebClient 类,实现 Stream 流式调用
- */
- public class WebClientStream implements BaseAgent {
- private static final Logger log = LoggerFactory.getLogger(WebClientStream.class);
- @Value("ai.agent.streamChatUrl")
- private String streamChatUrl;
- @Override
- public void chatStream(SseEmitter sseEmitter, String question) {
- // 创建 WebClient 客户端
- WebClient webClient = WebClient.builder().baseUrl(streamChatUrl).build();
- // 封装参数
- JSONObject params = new JSONObject();
- params.put("question", question);
- // 封装请求头
- HttpHeaders headers = new HttpHeaders();
- headers.setContentType(MediaType.APPLICATION_JSON_UTF8);
- // accept 一定要设置为 TEXT_EVENT_STREAM
- headers.setAccept(Collections.singletonList(MediaType.TEXT_EVENT_STREAM));
- Flux<String> eventStream = webClient
- .post()
- .uri("/stream")
- .accept(MediaType.valueOf("text/event-stream;charset=UTF-8")) // 一定要设置
- .headers(httpHeaders -> httpHeaders.addAll(headers))
- .bodyValue(params.toJSONString())
- .retrieve()
- .bodyToFlux(String.class);
- eventStream.subscribe(
- data -> { // data 有什么key,value。具体看你对接agent的文档
- try {
- JSONObject bodyJson = JSONObject.parseObject(data);
- if (bodyJson.getIntValue("code") == HttpStatus.OK.value()) {
- // 事件类型
- String event = bodyJson.getString("event");
- // agent回答
- String answer = bodyJson.getJSONObject("data").getString("answer");
- if ("message".equals(event)) {
- // 中间一段一段的消息
- sseEmitter.send(answer);
- } else if ("end".equals(event)) {
- // 最后会全部返回
- sseEmitter.send(answer);
- }
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- },
- error -> {
- sseEmitter.completeWithError(error);
- log.error("报错了:{}", error.getMessage(), error);
- },
- sseEmitter::complete
- );
- }
- }
复制代码 3.2 代码解释
• bodyValue :添加请求体。
• retrieve :执行请求。
• bodyToFlux :将相应体处理为 Flux,Flux 代表一个可以包含多个元素的数据流。。
• subscribe :非壅闭,注册回调函数。第一个回调:正常数据返回。第二个回调:堕落。第三个回调:请求完成。
3.3 错误处理
WebClient 还提供了完善的错误处理机制,可以通过 .onStatus() 或 .onErrorResume() 方法捕获 HTTP 错误或其他异常情况。
- webClient.get()
- .uri("/")
- .retrieve()
- .onStatus(HttpStatus::is4xxClientError, response -> Mono.error(new RuntimeException("客户端错误")))
- .onStatus(HttpStatus::is5xxServerError, response -> Mono.error(new RuntimeException("服务器错误")))
- .bodyToFlux(String.class)
- .doOnNext(data -> System.out.println("接收到数据块:" + data))
- .subscribe();
复制代码 四、OkHttp 与 WebClient 的对比
特性OkHttpWebClient编程模型壅闭或异步操作相应式编程,支持异步和流式处理并发处理支持异步请求,但不具备相应式特性支持高并发,且更适合流式数据处理适用场景传统的 HTTP 请求,适合简单的客户端高并发、相应式编程,流式接口等集成生态单独的 HTTP 客户端库与 Spring WebFlux 集成,适用于微服务架构 选择建议:
• 如果你已经在利用 Spring Boot,且需要处理流式接口,WebClient 是更保举的选择。它与 Spring WebFlux 共同默契,能处理高并发和复杂的数据流场景。
• 如果你只是需要处理简单的流式 HTTP 请求,OkHttp 是一个非常轻量且功能强大的库,适合用于快速集成。
五、总结
无论是 OkHttp 还是 WebClient,它们都提供了高效的流式数据处理本领,可以或许帮助我们在与大模型 API 对接时减少等待时间,提高相应速度和吞吐量。选择哪个工具取决于你的项目需求和技能栈,如果你在利用 Spring Boot 或 WebFlux,WebClient 无疑是一个抱负的选择。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |