ToB企服应用市场:ToB评测及商务社交产业平台
标题:
利用 OkHttp 或者 WebClient 对接大模型的流式接口
[打印本页]
作者:
涛声依旧在
时间:
9 小时前
标题:
利用 OkHttp 或者 WebClient 对接大模型的流式接口
随着大模型(如 GPT、BERT 等)的流行,越来越多的 API 提供商开始提供流式接口,这使得我们可以或许更高效地处理大规模数据,尤其是在相应体非常大的情况下。本文将探讨怎样利用
OkHttp
和
Spring WebClient
两种方式对接大模型的流式接口,实现高效的异步请求处理。
一、流式接口概述
流式接口是一种允许客户端渐渐吸收服务器相应数据的机制。相比传统的“一次性返回”方式,流式接口通常用于处理大体积数据或者需要长时间处理的请求。客户端通过创建持久毗连,持续从服务器获取数据,而无需等待整个相应体加载完成。
大模型 API 的流式接口通常采取
HTTP 2.0
或
WebSocket
协议举行数据传播输,可以或许以流的方式发送大量的数据块或结果,这对实现低延迟、高吞吐量的请求处理至关紧张。
二、利用 OkHttp 对接流式接口
2.1 设置 OkHttp 客户端
OkHttp 是一个强大的 HTTP 客户端,支持异步请求、毗连池和流式相应等特性。下面是怎样利用 OkHttp 实现对接流式接口的示例。
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.12.0</version>在这里插入代码片
</dependency>
复制代码
package com.dolphin.bootstrap.agent.streamchat;
import com.alibaba.fastjson.JSONObject;
import com.dolphin.bootstrap.agent.BaseAgent;
import io.micrometer.common.util.StringUtils;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import okio.BufferedSource;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
/**
* 利用 OKHttp 库,实现 Stream 流式调用
*/
@Component
@Slf4j
public class OKHttpStream implements BaseAgent {
private static final Integer TIME_OUT = 5 * 60;
@Value("ai.agent.streamChatUrl")
private String streamChatUrl;
@Override
public void chatStream(SseEmitter sseEmitter, String question) {
// 1. 创建 OkHttpClient 对象,并设置超时时间
OkHttpClient client = new OkHttpClient.Builder()
.connectTimeout(TIME_OUT, TimeUnit.SECONDS)
.readTimeout(TIME_OUT, TimeUnit.SECONDS)
.writeTimeout(TIME_OUT, TimeUnit.SECONDS)
.build();
// 封装请求体参数
JSONObject params = new JSONObject();
params.put("question", question);
RequestBody requestBody = RequestBody.create(params.toJSONString(), MediaType.parse("application/json; charset=utf-8"));
// 封装请求头
Headers headers = new Headers.Builder()
.set("Content-Type","application/json")
.set("Accept","text/event-stream")
.build();
// 2. 构建 Request 对象
Request request = new Request.Builder()
.url(streamChatUrl)
.headers(headers)
.post(requestBody)
.build();
// 3. 创建 Call 对象
Call call = client.newCall(request);
// 4. 监听回调
call.enqueue(new Callback() {
@Override
public void onFailure(@NotNull Call call, @NotNull IOException e) {
log.error("进入 onFailure方法 {}", e.getMessage(), e);
sseEmitter.completeWithError(e);
}
@Override
public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException {
if (response.isSuccessful()) {
String chunkMessage = "";
try (ResponseBody responseBody = response.body();) {
BufferedSource source = responseBody.source();
while (!source.exhausted()) {
chunkMessage = source.readUtf8Line();
if (StringUtils.isBlank(chunkMessage)) {
continue;
}
JSONObject jsonObject = JSONObject.parseObject(chunkMessage);
if (null != jsonObject && null != jsonObject.getJSONObject("data")) {
String answer = jsonObject.getJSONObject("data").getString("answer");
sseEmitter.send(answer);
}
}
} catch (Exception e) {
log.error("解析失败: {}", e.getMessage(), e);
sseEmitter.completeWithError(e);
}
} else {
log.error("onResponse 方法请求失败 {}", response.message());
// TODO 重新发起请求
}
}
});
BaseAgent.super.chatStream(sseEmitter, question);
}
}
复制代码
2.2 代码解释
• call.enqueue(new Callback() {…});:异步执行请求,回调函数用于处理相应。
这种方式非常适合大模型 API,由于它可以或许及时获取并处理每一个数据块,而不需要等待整个相应体加载完成。
三、利用 WebClient 对接流式接口
Spring 5 引入的 WebClient 是基于相应式编程的 HTTP 客户端,特别适用于流式数据处理和高并发场景。WebClient 支持通过
Mono
和
Flux
来处理异步数据流,非常适合与大模型的流式接口对接。
3.1 设置 WebClient
首先,需要在 Spring Boot 项目中设置 WebClient。可以通过 WebClient.Builder 来定制客户端设置。
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webflux</artifactId>
<version>6.1.4</version>
</dependency>
复制代码
package com.dolphin.bootstrap.agent.streamchat;
import com.alibaba.fastjson.JSONObject;
import com.dolphin.bootstrap.agent.BaseAgent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import reactor.core.publisher.Flux;
import java.io.IOException;
import java.util.Collections;
/**
* 利用 WebClient 类,实现 Stream 流式调用
*/
public class WebClientStream implements BaseAgent {
private static final Logger log = LoggerFactory.getLogger(WebClientStream.class);
@Value("ai.agent.streamChatUrl")
private String streamChatUrl;
@Override
public void chatStream(SseEmitter sseEmitter, String question) {
// 创建 WebClient 客户端
WebClient webClient = WebClient.builder().baseUrl(streamChatUrl).build();
// 封装参数
JSONObject params = new JSONObject();
params.put("question", question);
// 封装请求头
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON_UTF8);
// accept 一定要设置为 TEXT_EVENT_STREAM
headers.setAccept(Collections.singletonList(MediaType.TEXT_EVENT_STREAM));
Flux<String> eventStream = webClient
.post()
.uri("/stream")
.accept(MediaType.valueOf("text/event-stream;charset=UTF-8")) // 一定要设置
.headers(httpHeaders -> httpHeaders.addAll(headers))
.bodyValue(params.toJSONString())
.retrieve()
.bodyToFlux(String.class);
eventStream.subscribe(
data -> { // data 有什么key,value。具体看你对接agent的文档
try {
JSONObject bodyJson = JSONObject.parseObject(data);
if (bodyJson.getIntValue("code") == HttpStatus.OK.value()) {
// 事件类型
String event = bodyJson.getString("event");
// agent回答
String answer = bodyJson.getJSONObject("data").getString("answer");
if ("message".equals(event)) {
// 中间一段一段的消息
sseEmitter.send(answer);
} else if ("end".equals(event)) {
// 最后会全部返回
sseEmitter.send(answer);
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
},
error -> {
sseEmitter.completeWithError(error);
log.error("报错了:{}", error.getMessage(), error);
},
sseEmitter::complete
);
}
}
复制代码
3.2 代码解释
• bodyValue :添加请求体。
• retrieve :执行请求。
• bodyToFlux :将相应体处理为 Flux,Flux 代表一个可以包含多个元素的数据流。。
• subscribe :非壅闭,注册回调函数。第一个回调:正常数据返回。第二个回调:堕落。第三个回调:请求完成。
3.3 错误处理
WebClient 还提供了完善的错误处理机制,可以通过 .onStatus() 或 .onErrorResume() 方法捕获 HTTP 错误或其他异常情况。
webClient.get()
.uri("/")
.retrieve()
.onStatus(HttpStatus::is4xxClientError, response -> Mono.error(new RuntimeException("客户端错误")))
.onStatus(HttpStatus::is5xxServerError, response -> Mono.error(new RuntimeException("服务器错误")))
.bodyToFlux(String.class)
.doOnNext(data -> System.out.println("接收到数据块:" + data))
.subscribe();
复制代码
四、OkHttp 与 WebClient 的对比
特性
OkHttp
WebClient
编程模型
壅闭或异步操作相应式编程,支持异步和流式处理
并发处理
支持异步请求,但不具备相应式特性支持高并发,且更适合流式数据处理
适用场景
传统的 HTTP 请求,适合简单的客户端高并发、相应式编程,流式接口等
集成生态
单独的 HTTP 客户端库与 Spring WebFlux 集成,适用于微服务架构
选择建议:
• 如果你已经在利用 Spring Boot,且需要处理流式接口,WebClient 是更保举的选择。它与 Spring WebFlux 共同默契,能处理高并发和复杂的数据流场景。
• 如果你只是需要处理简单的流式 HTTP 请求,OkHttp 是一个非常轻量且功能强大的库,适合用于快速集成。
五、总结
无论是
OkHttp
还是
WebClient
,它们都提供了高效的流式数据处理本领,可以或许帮助我们在与大模型 API 对接时减少等待时间,提高相应速度和吞吐量。选择哪个工具取决于你的项目需求和技能栈,如果你在利用 Spring Boot 或 WebFlux,WebClient 无疑是一个抱负的选择。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4