SpringBoot集成Netty服务器接收大量数据实例
依赖<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.68.Final</version>
</dependency>
创建Netty服务类
@Component
@RequiredArgsConstructor
public class NettyServerConfig {
// 定义服务器端口号
private final int port = 13030;
// 注入JsonServerHandler处理器
private final JsonServerHandler jsonServerHandler;
// 初始化方法,在Bean创建时启动Netty服务器
@PostConstruct
public void startNetty() {
new Thread(() -> {
// 创建boss和worker两个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 创建ServerBootstrap实例以配置服务器
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)// 指定使用NIO传输Channel
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 配置通道处理器,包括解码器、编码器和业务处理器
ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(jsonServerHandler);// 使用注入的JsonServerHandler
}
})
.option(ChannelOption.SO_BACKLOG, 128)// 设置队列大小
.childOption(ChannelOption.SO_KEEPALIVE, true);// 保持连接
// 绑定端口并启动服务器
ChannelFuture f = bootstrap.bind(port).sync();
// 等待服务器通道关闭
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 优雅地关闭线程组
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}).start();
}
} 创建EchoServerHandler 类接收并存储数据
@Slf4j
@ChannelHandler.Sharable
@Component
public class JsonServerHandler extends SimpleChannelInboundHandler<String> {
@Autowired
private ReceptionMapper receptionMapper;
private final ObjectMapper objectMapper = new ObjectMapper();
// 处理接收到的消息
@Override
@Transactional
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
try {
// 反序列化接收到的JSON消息
ReceptionData receptionData = null;
try {
receptionData = objectMapper.readValue(msg, ReceptionData.class);
} catch (JsonProcessingException e) {
log.error("解码数据错误", e);
}
//存入数据库
receptionMapper.saveData(receptionData );
} catch (Exception e) {
log.error("接收数据错误", e);
}
}
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]