SpringBoot集成Netty服务器接收大量数据实例

打印 上一主题 下一主题

主题 891|帖子 891|积分 2673

依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.68.Final</version>
</dependency>
 
创建Netty服务类

  1. @Component
  2. @RequiredArgsConstructor
  3. public class NettyServerConfig {
  4.     // 定义服务器端口号
  5.     private final int port = 13030;
  6.     // 注入JsonServerHandler处理器
  7.     private final JsonServerHandler jsonServerHandler;
  8.     // 初始化方法,在Bean创建时启动Netty服务器
  9.     @PostConstruct
  10.     public void startNetty() {
  11.         new Thread(() -> {
  12.             // 创建boss和worker两个线程组
  13.             EventLoopGroup bossGroup = new NioEventLoopGroup();
  14.             EventLoopGroup workerGroup = new NioEventLoopGroup();
  15.             try {
  16.                 // 创建ServerBootstrap实例以配置服务器
  17.                 ServerBootstrap bootstrap = new ServerBootstrap();
  18.                 bootstrap.group(bossGroup, workerGroup)
  19.                         .channel(NioServerSocketChannel.class)  // 指定使用NIO传输Channel
  20.                         .childHandler(new ChannelInitializer<SocketChannel>() {
  21.                             @Override
  22.                             protected void initChannel(SocketChannel ch) throws Exception {
  23.                                 // 配置通道处理器,包括解码器、编码器和业务处理器
  24.                                 ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
  25.                                 ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
  26.                                 ch.pipeline().addLast(jsonServerHandler);  // 使用注入的JsonServerHandler
  27.                             }
  28.                         })
  29.                         .option(ChannelOption.SO_BACKLOG, 128)  // 设置队列大小
  30.                         .childOption(ChannelOption.SO_KEEPALIVE, true);  // 保持连接
  31.                 // 绑定端口并启动服务器
  32.                 ChannelFuture f = bootstrap.bind(port).sync();
  33.                 // 等待服务器通道关闭
  34.                 f.channel().closeFuture().sync();
  35.             } catch (InterruptedException e) {
  36.                 e.printStackTrace();
  37.             } finally {
  38.                 // 优雅地关闭线程组
  39.                 workerGroup.shutdownGracefully();
  40.                 bossGroup.shutdownGracefully();
  41.             }
  42.         }).start();
  43.     }
  44. }
复制代码
创建EchoServerHandler 类接收并存储数据

  1. @Slf4j
  2. @ChannelHandler.Sharable
  3. @Component
  4. public class JsonServerHandler extends SimpleChannelInboundHandler<String> {
  5.     @Autowired
  6.     private ReceptionMapper receptionMapper;
  7.     private final ObjectMapper objectMapper = new ObjectMapper();
  8.     // 处理接收到的消息
  9.     @Override
  10.     @Transactional
  11.     protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
  12.         try {
  13.             // 反序列化接收到的JSON消息
  14.             ReceptionData receptionData = null;
  15.             try {
  16.                 receptionData = objectMapper.readValue(msg, ReceptionData.class);
  17.             } catch (JsonProcessingException e) {
  18.                 log.error("解码数据错误", e);
  19.             }
  20.             //存入数据库
  21.             receptionMapper.saveData(receptionData );
  22.         } catch (Exception e) {
  23.             log.error("接收数据错误", e);
  24.         }
  25.     }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

杀鸡焉用牛刀

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表