天津储鑫盛钢材现货供应商 发表于 2024-10-11 17:53:29

netty之SpringBoot+Netty+Elasticsearch网络日志信息数据存储

#开发环境
https://i-blog.csdnimg.cn/direct/a8fe8d4be2d14fcaa41ac8d4c967c54d.png
环境准备
windows安装包 下载
注意:es 依赖 Java 环境,所以需要安装 JDK(支持 1.7 以上)
es-head 下载(可视化操作插件)
@Document(indexName = "stack", type = "group_user")
public class User {

    @Id
    private String id;
    private String name;   //姓名
    private Integer age;   //年龄
    private String level;//级别
    private Date entryDate;//时间
    private String mobile; //电话
    private String email;//邮箱
    private String address;//地址


    public User(String id, String name, Integer age, String level, Date entryDate, String mobile, String email, String address) {
      this.id = id;
      this.name = name;
      this.age = age;
      this.level = level;
      this.entryDate = entryDate;
      this.mobile = mobile;
      this.email = email;
      this.address = address;

    }

    public String getId() {
      return id;
    }

    public void setId(String id) {
      this.id = id;
    }

    public String getName() {
      return name;
    }

    public void setName(String name) {
      this.name = name;
    }

    public Integer getAge() {
      return age;
    }

    public void setAge(Integer age) {
      this.age = age;
    }

    public String getLevel() {
      return level;
    }

    public void setLevel(String level) {
      this.level = level;
    }

    public Date getEntryDate() {
      return entryDate;
    }

    public void setEntryDate(Date entryDate) {
      this.entryDate = entryDate;
    }

    public String getMobile() {
      return mobile;
    }

    public void setMobile(String mobile) {
      this.mobile = mobile;
    }

    public String getEmail() {
      return email;
    }

    public void setEmail(String email) {
      this.email = email;
    }

    public String getAddress() {
      return address;
    }

    public void setAddress(String address) {
      this.address = address;
    }
}
@Service("myServerHandler")
public class MyServerHandler extends ChannelInboundHandlerAdapter {

    private Logger logger = LoggerFactory.getLogger(MyServerHandler.class);

    @Autowired
    private UserService userService;

    /**
   * 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
   */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
      SocketChannel channel = (SocketChannel) ctx.channel();
      logger.info("链接报告开始");
      logger.info("链接报告信息:有一客户端链接到本服务端");
      logger.info("链接报告IP:{}", channel.localAddress().getHostString());
      logger.info("链接报告Port:{}", channel.localAddress().getPort());
      logger.info("链接报告完毕");
    }

    /**
   * 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
   */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
      logger.info("客户端断开链接{}", ctx.channel().localAddress().toString());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      //接收msg消息{与上一章节相比,此处已经不需要自己进行解码}
      logger.info(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 服务端接收到消息:" + JSON.toJSONString(msg));
      //接收数据写入到Elasticsearch
      TransportProtocol transportProtocol = (TransportProtocol) msg;
      userService.save((User) transportProtocol.getObj());
    }

    /**
   * 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接
   */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      ctx.close();
      logger.info("异常信息:\r\n" + cause.getMessage());
    }

}

@Component("nettyServer")
public class NettyServer {

    private Logger logger = LoggerFactory.getLogger(NettyServer.class);

    @Resource
    private MyChannelInitializer myChannelInitializer;

    //配置服务端NIO线程组
    private final EventLoopGroup parentGroup = new NioEventLoopGroup(); //NioEventLoopGroup extends MultithreadEventLoopGroup Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
    private final EventLoopGroup childGroup = new NioEventLoopGroup();
    private Channel channel;

    public ChannelFuture bing(InetSocketAddress address) {
      ChannelFuture channelFuture = null;
      try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(parentGroup, childGroup)
                  .channel(NioServerSocketChannel.class)    //非阻塞模式
                  .option(ChannelOption.SO_BACKLOG, 128)
                  .childHandler(myChannelInitializer);

            channelFuture = b.bind(address).syncUninterruptibly();
            channel = channelFuture.channel();
      } catch (Exception e) {
            logger.error(e.getMessage());
      } finally {
            if (null != channelFuture && channelFuture.isSuccess()) {
                logger.info("itstack-demo-netty server start done. {关注明哥,获取源码}");
            } else {
                logger.error("itstack-demo-netty server start error. {关注明哥,获取源码}");
            }
      }
      return channelFuture;
    }

    public void destroy() {
      if (null == channel) return;
      channel.close();
      parentGroup.shutdownGracefully();
      childGroup.shutdownGracefully();
    }

    public Channel getChannel() {
      return channel;
    }

}


/**
 * Service-level operations on {@link User} records.
 */
public interface UserService {

    /**
     * Stores the given user.
     *
     * @param user the user to store
     */
    void save(User user);

    /**
     * Deletes the user with the given id.
     *
     * @param id identifier of the user to delete
     */
    void deleteById(String id);

    /**
     * Looks up a single user by id.
     *
     * @param id identifier of the user
     * @return the matching user
     */
    User queryUserById(String id);

    /**
     * @return all stored users
     */
    Iterable<User> queryAll();

    /**
     * Returns one page of users selected by name.
     *
     * @param name    name to search for
     * @param request paging information
     * @return the requested page of users
     */
    Page<User> findByName(String name, PageRequest request);

}
提供一个可拓展的操作实体表的接口

/**
 * Repository for {@link User} documents with {@code String} ids, extending
 * {@code ElasticsearchRepository} with a name-based paged query.
 */
public interface UserRepository extends ElasticsearchRepository<User, String> {

    /**
     * Returns one page of users selected by name.
     * NOTE(review): no implementation is visible here; presumably the query is derived by
     * Spring Data from the method name - confirm.
     *
     * @param name     name to search for
     * @param pageable paging information
     * @return the requested page of users
     */
    Page<User> findByName(String name, Pageable pageable);

}

@Service("userService")
public class UserServiceImpl implements UserService {

    private UserRepository dataRepository;

    @Autowired
    public void setDataRepository(UserRepository dataRepository) {
      this.dataRepository = dataRepository;
    }

    @Override
    public void save(User user) {
      dataRepository.save(user);
    }

    @Override
    public void deleteById(String id) {
      dataRepository.deleteById(id);
    }

    @Override
    public User queryUserById(String id) {
      Optional<User> optionalUser = dataRepository.findById(id);
      return optionalUser.get();
    }

    @Override
    public Iterable<User> queryAll() {
      return dataRepository.findAll();
    }

    @Override
    public Page<User> findByName(String name, PageRequest request) {
      return dataRepository.findByName(name, request);
    }

}

@RestController
public class NettyController {

    @Resource
    private NettyServer nettyServer;

    @RequestMapping("/localAddress")
    public String localAddress() {
      return "nettyServer localAddress " + nettyServer.getChannel().localAddress();
    }

}

@SpringBootApplication
public class Application implements CommandLineRunner {

    private Logger logger = LoggerFactory.getLogger(Application.class);

    @Value("${netty.host}")
    private String host;
    @Value("${netty.port}")
    private int port;
    @Resource
    private NettyServer nettyServer;

    public static void main(String[] args) {
      System.setProperty("es.set.netty.runtime.available.processors", "false");
      SpringApplication.run(Application.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
      InetSocketAddress address = new InetSocketAddress(host, port);
      ChannelFuture channelFuture = nettyServer.bing(address);
      Runtime.getRuntime().addShutdownHook(new Thread(() -> nettyServer.destroy()));
      channelFuture.channel().closeFuture().syncUninterruptibly();
    }

}

## 服务端口
server.port = 8080

## Netty服务端配置
netty.host = 127.0.0.1
netty.port = 7397

## Elasticsearch配置{更换为自己的cluster-name、cluster-nodes}
spring.data.elasticsearch.cluster-name=es-itstack
spring.data.elasticsearch.cluster-nodes=127.0.0.1:9300
spring.data.elasticsearch.repositories.enabled=true

ApiTest.java *Netty客户端,用于向服务端发送数据
public class ApiTest {

    public static void main(String[] args) {
      System.out.println("hi 微信公众号:关注明哥");
      EventLoopGroup workerGroup = new NioEventLoopGroup();
      try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.AUTO_READ, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel channel) throws Exception {
                  //对象传输处理
                  channel.pipeline().addLast(new ObjDecoder(TransportProtocol.class));
                  channel.pipeline().addLast(new ObjEncoder(TransportProtocol.class));
                  // 在管道中添加我们自己的接收数据实现方法
                  channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

                        }
                  });
                }
            });
            ChannelFuture f = b.connect("127.0.0.1", 7397).sync();
            System.out.println("itstack-demo-netty client start done. {关注明哥,获取源码}");

            TransportProtocol tp1 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "李小明", 1, "T0-1", new Date(), "13566668888", "184172133@qq.com", "北京"));
            TransportProtocol tp2 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "张大明", 2, "T0-2", new Date(), "13566660001", "huahua@qq.com", "南京"));
            TransportProtocol tp3 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "李书鹏", 2, "T1-1", new Date(), "13566660002", "xiaobai@qq.com", "榆树"));
            TransportProtocol tp4 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "韩小雪", 2, "T2-1", new Date(), "13566660002", "xiaobai@qq.com", "榆树"));
            TransportProtocol tp5 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "董叔飞", 2, "T4-1", new Date(), "13566660002", "xiaobai@qq.com", "河北"));
            TransportProtocol tp6 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "候明相", 2, "T5-1", new Date(), "13566660002", "xiaobai@qq.com", "下花园"));
            TransportProtocol tp7 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "田明明", 2, "T3-1", new Date(), "13566660002", "xiaobai@qq.com", "东平"));
            TransportProtocol tp8 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "王大伟", 2, "T4-1", new Date(), "13566660002", "xiaobai@qq.com", "西湖"));
            TransportProtocol tp9 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "李雪明", 2, "T1-1", new Date(), "13566660002", "xiaobai@qq.com", "南昌"));
            TransportProtocol tp10 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "朱小飞", 2, "T2-1", new Date(), "13566660002", "xiaobai@qq.com", "吉林"));
            TransportProtocol tp11 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "牛大明", 2, "T1-1", new Date(), "13566660002", "xiaobai@qq.com", "长春"));
            TransportProtocol tp12 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "关雪儿", 2, "T2-1", new Date(), "13566660002", "xiaobai@qq.com", "深圳"));

            //向服务端发送信息
            f.channel().writeAndFlush(tp1);
            f.channel().writeAndFlush(tp2);
            f.channel().writeAndFlush(tp3);
            f.channel().writeAndFlush(tp4);
            f.channel().writeAndFlush(tp5);
            f.channel().writeAndFlush(tp6);
            f.channel().writeAndFlush(tp7);
            f.channel().writeAndFlush(tp8);
            f.channel().writeAndFlush(tp9);
            f.channel().writeAndFlush(tp10);
            f.channel().writeAndFlush(tp11);
            f.channel().writeAndFlush(tp12);

            f.channel().closeFuture().syncUninterruptibly();
      } catch (InterruptedException e) {
            e.printStackTrace();
      } finally {
            workerGroup.shutdownGracefully();
      }
    }

}

好了,到这里就结束了 netty 之 SpringBoot+Netty+Elasticsearch 网络日志信息数据存储的学习,大家一定要跟着动手操作起来。需要源码的可私信我获取。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息请访问主页:qidao123.com(ToB企服之家,中国第一个企服评测及商务社交产业平台)。
页: [1]
查看完整版本: netty之SpringBoot+Netty+Elasticsearch网络日志信息数据存储