netty之SpringBoot+Netty+Elasticsearch网络日志信息数据存储

打印 上一主题 下一主题

主题 807|帖子 807|积分 2421

#开发环境

环境准备
windows安装包 下载
注意 es是以来java环境 以是需要安装jdk 支持1.7以上
es-hander下载可视化操作插件
  1. @Document(indexName = "stack", type = "group_user")
  2. public class User {
  3.     @Id
  4.     private String id;
  5.     private String name;   //姓名
  6.     private Integer age;   //年龄
  7.     private String level;  //级别
  8.     private Date entryDate;//时间
  9.     private String mobile; //电话
  10.     private String email;  //邮箱
  11.     private String address;//地址
  12.     public User(String id, String name, Integer age, String level, Date entryDate, String mobile, String email, String address) {
  13.         this.id = id;
  14.         this.name = name;
  15.         this.age = age;
  16.         this.level = level;
  17.         this.entryDate = entryDate;
  18.         this.mobile = mobile;
  19.         this.email = email;
  20.         this.address = address;
  21.     }
  22.     public String getId() {
  23.         return id;
  24.     }
  25.     public void setId(String id) {
  26.         this.id = id;
  27.     }
  28.     public String getName() {
  29.         return name;
  30.     }
  31.     public void setName(String name) {
  32.         this.name = name;
  33.     }
  34.     public Integer getAge() {
  35.         return age;
  36.     }
  37.     public void setAge(Integer age) {
  38.         this.age = age;
  39.     }
  40.     public String getLevel() {
  41.         return level;
  42.     }
  43.     public void setLevel(String level) {
  44.         this.level = level;
  45.     }
  46.     public Date getEntryDate() {
  47.         return entryDate;
  48.     }
  49.     public void setEntryDate(Date entryDate) {
  50.         this.entryDate = entryDate;
  51.     }
  52.     public String getMobile() {
  53.         return mobile;
  54.     }
  55.     public void setMobile(String mobile) {
  56.         this.mobile = mobile;
  57.     }
  58.     public String getEmail() {
  59.         return email;
  60.     }
  61.     public void setEmail(String email) {
  62.         this.email = email;
  63.     }
  64.     public String getAddress() {
  65.         return address;
  66.     }
  67.     public void setAddress(String address) {
  68.         this.address = address;
  69.     }
  70. }
复制代码
  1. @Service("myServerHandler")
  2. public class MyServerHandler extends ChannelInboundHandlerAdapter {
  3.     private Logger logger = LoggerFactory.getLogger(MyServerHandler.class);
  4.     @Autowired
  5.     private UserService userService;
  6.     /**
  7.      * 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
  8.      */
  9.     @Override
  10.     public void channelActive(ChannelHandlerContext ctx) throws Exception {
  11.         SocketChannel channel = (SocketChannel) ctx.channel();
  12.         logger.info("链接报告开始");
  13.         logger.info("链接报告信息:有一客户端链接到本服务端");
  14.         logger.info("链接报告IP:{}", channel.localAddress().getHostString());
  15.         logger.info("链接报告Port:{}", channel.localAddress().getPort());
  16.         logger.info("链接报告完毕");
  17.     }
  18.     /**
  19.      * 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
  20.      */
  21.     @Override
  22.     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  23.         logger.info("客户端断开链接{}", ctx.channel().localAddress().toString());
  24.     }
  25.     @Override
  26.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  27.         //接收msg消息{与上一章节相比,此处已经不需要自己进行解码}
  28.         logger.info(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 服务端接收到消息:" + JSON.toJSONString(msg));
  29.         //接收数据写入到Elasticsearch
  30.         TransportProtocol transportProtocol = (TransportProtocol) msg;
  31.         userService.save((User) transportProtocol.getObj());
  32.     }
  33.     /**
  34.      * 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接
  35.      */
  36.     @Override
  37.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  38.         ctx.close();
  39.         logger.info("异常信息:\r\n" + cause.getMessage());
  40.     }
  41. }
复制代码
  1. @Component("nettyServer")
  2. public class NettyServer {
  3.     private Logger logger = LoggerFactory.getLogger(NettyServer.class);
  4.     @Resource
  5.     private MyChannelInitializer myChannelInitializer;
  6.     //配置服务端NIO线程组
  7.     private final EventLoopGroup parentGroup = new NioEventLoopGroup(); //NioEventLoopGroup extends MultithreadEventLoopGroup Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
  8.     private final EventLoopGroup childGroup = new NioEventLoopGroup();
  9.     private Channel channel;
  10.     public ChannelFuture bing(InetSocketAddress address) {
  11.         ChannelFuture channelFuture = null;
  12.         try {
  13.             ServerBootstrap b = new ServerBootstrap();
  14.             b.group(parentGroup, childGroup)
  15.                     .channel(NioServerSocketChannel.class)    //非阻塞模式
  16.                     .option(ChannelOption.SO_BACKLOG, 128)
  17.                     .childHandler(myChannelInitializer);
  18.             channelFuture = b.bind(address).syncUninterruptibly();
  19.             channel = channelFuture.channel();
  20.         } catch (Exception e) {
  21.             logger.error(e.getMessage());
  22.         } finally {
  23.             if (null != channelFuture && channelFuture.isSuccess()) {
  24.                 logger.info("itstack-demo-netty server start done. {关注明哥,获取源码}");
  25.             } else {
  26.                 logger.error("itstack-demo-netty server start error. {关注明哥,获取源码}");
  27.             }
  28.         }
  29.         return channelFuture;
  30.     }
  31.     public void destroy() {
  32.         if (null == channel) return;
  33.         channel.close();
  34.         parentGroup.shutdownGracefully();
  35.         childGroup.shutdownGracefully();
  36.     }
  37.     public Channel getChannel() {
  38.         return channel;
  39.     }
  40. }
复制代码
  1. public interface UserService {
  2.     void save(User user);
  3.     void deleteById(String id);
  4.     User queryUserById(String id);
  5.     Iterable<User> queryAll();
  6.     Page<User> findByName(String name, PageRequest request);
  7. }
复制代码
提供一个可拓展的操作实体表的接口
  1. public interface UserRepository extends ElasticsearchRepository<User, String> {
  2.     Page<User> findByName(String name, Pageable pageable);
  3. }
复制代码
  1. @Service("userService")
  2. public class UserServiceImpl implements UserService {
  3.     private UserRepository dataRepository;
  4.     @Autowired
  5.     public void setDataRepository(UserRepository dataRepository) {
  6.         this.dataRepository = dataRepository;
  7.     }
  8.     @Override
  9.     public void save(User user) {
  10.         dataRepository.save(user);
  11.     }
  12.     @Override
  13.     public void deleteById(String id) {
  14.         dataRepository.deleteById(id);
  15.     }
  16.     @Override
  17.     public User queryUserById(String id) {
  18.         Optional<User> optionalUser = dataRepository.findById(id);
  19.         return optionalUser.get();
  20.     }
  21.     @Override
  22.     public Iterable<User> queryAll() {
  23.         return dataRepository.findAll();
  24.     }
  25.     @Override
  26.     public Page<User> findByName(String name, PageRequest request) {
  27.         return dataRepository.findByName(name, request);
  28.     }
  29. }
复制代码
  1. @RestController
  2. public class NettyController {
  3.     @Resource
  4.     private NettyServer nettyServer;
  5.     @RequestMapping("/localAddress")
  6.     public String localAddress() {
  7.         return "nettyServer localAddress " + nettyServer.getChannel().localAddress();
  8.     }
  9. }
复制代码
  1. @SpringBootApplication
  2. public class Application implements CommandLineRunner {
  3.     private Logger logger = LoggerFactory.getLogger(Application.class);
  4.     @Value("${netty.host}")
  5.     private String host;
  6.     @Value("${netty.port}")
  7.     private int port;
  8.     @Resource
  9.     private NettyServer nettyServer;
  10.     public static void main(String[] args) {
  11.         System.setProperty("es.set.netty.runtime.available.processors", "false");
  12.         SpringApplication.run(Application.class, args);
  13.     }
  14.     @Override
  15.     public void run(String... args) throws Exception {
  16.         InetSocketAddress address = new InetSocketAddress(host, port);
  17.         ChannelFuture channelFuture = nettyServer.bing(address);
  18.         Runtime.getRuntime().addShutdownHook(new Thread(() -> nettyServer.destroy()));
  19.         channelFuture.channel().closeFuture().syncUninterruptibly();
  20.     }
  21. }
复制代码
  1. ## 服务端口
  2. server.port = 8080
  3. ## Netty服务端配置
  4. netty.host = 127.0.0.1
  5. netty.port = 7397
  6. ## Elasticsearch配置{更换为自己的cluster-name、cluster-nodes}
  7. spring.data.elasticsearch.cluster-name=es-itstack
  8. spring.data.elasticsearch.cluster-nodes=127.0.0.1:9300
  9. spring.data.elasticsearch.repositories.enabled=true
复制代码
ApiTest.java *Netty客户端,用于向服务端发送数据
  1. public class ApiTest {
  2.     public static void main(String[] args) {
  3.         System.out.println("hi 微信公众号:关注明哥");
  4.         EventLoopGroup workerGroup = new NioEventLoopGroup();
  5.         try {
  6.             Bootstrap b = new Bootstrap();
  7.             b.group(workerGroup);
  8.             b.channel(NioSocketChannel.class);
  9.             b.option(ChannelOption.AUTO_READ, true);
  10.             b.handler(new ChannelInitializer<SocketChannel>() {
  11.                 @Override
  12.                 protected void initChannel(SocketChannel channel) throws Exception {
  13.                     //对象传输处理
  14.                     channel.pipeline().addLast(new ObjDecoder(TransportProtocol.class));
  15.                     channel.pipeline().addLast(new ObjEncoder(TransportProtocol.class));
  16.                     // 在管道中添加我们自己的接收数据实现方法
  17.                     channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
  18.                         @Override
  19.                         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  20.                         }
  21.                     });
  22.                 }
  23.             });
  24.             ChannelFuture f = b.connect("127.0.0.1", 7397).sync();
  25.             System.out.println("itstack-demo-netty client start done. {关注明哥,获取源码}");
  26.             TransportProtocol tp1 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "李小明", 1, "T0-1", new Date(), "13566668888", "184172133@qq.com", "北京"));
  27.             TransportProtocol tp2 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "张大明", 2, "T0-2", new Date(), "13566660001", "huahua@qq.com", "南京"));
  28.             TransportProtocol tp3 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "李书鹏", 2, "T1-1", new Date(), "13566660002", "xiaobai@qq.com", "榆树"));
  29.             TransportProtocol tp4 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "韩小雪", 2, "T2-1", new Date(), "13566660002", "xiaobai@qq.com", "榆树"));
  30.             TransportProtocol tp5 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "董叔飞", 2, "T4-1", new Date(), "13566660002", "xiaobai@qq.com", "河北"));
  31.             TransportProtocol tp6 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "候明相", 2, "T5-1", new Date(), "13566660002", "xiaobai@qq.com", "下花园"));
  32.             TransportProtocol tp7 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "田明明", 2, "T3-1", new Date(), "13566660002", "xiaobai@qq.com", "东平"));
  33.             TransportProtocol tp8 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "王大伟", 2, "T4-1", new Date(), "13566660002", "xiaobai@qq.com", "西湖"));
  34.             TransportProtocol tp9 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "李雪明", 2, "T1-1", new Date(), "13566660002", "xiaobai@qq.com", "南昌"));
  35.             TransportProtocol tp10 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "朱小飞", 2, "T2-1", new Date(), "13566660002", "xiaobai@qq.com", "吉林"));
  36.             TransportProtocol tp11 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "牛大明", 2, "T1-1", new Date(), "13566660002", "xiaobai@qq.com", "长春"));
  37.             TransportProtocol tp12 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "关雪儿", 2, "T2-1", new Date(), "13566660002", "xiaobai@qq.com", "深圳"));
  38.             //向服务端发送信息
  39.             f.channel().writeAndFlush(tp1);
  40.             f.channel().writeAndFlush(tp2);
  41.             f.channel().writeAndFlush(tp3);
  42.             f.channel().writeAndFlush(tp4);
  43.             f.channel().writeAndFlush(tp5);
  44.             f.channel().writeAndFlush(tp6);
  45.             f.channel().writeAndFlush(tp7);
  46.             f.channel().writeAndFlush(tp8);
  47.             f.channel().writeAndFlush(tp9);
  48.             f.channel().writeAndFlush(tp10);
  49.             f.channel().writeAndFlush(tp11);
  50.             f.channel().writeAndFlush(tp12);
  51.             f.channel().closeFuture().syncUninterruptibly();
  52.         } catch (InterruptedException e) {
  53.             e.printStackTrace();
  54.         } finally {
  55.             workerGroup.shutdownGracefully();
  56.         }
  57.     }
  58. }
复制代码
好了到这里就结束了netty之SpringBoot+Netty+Elasticsearch网络日志信息数据存储的学习,各人一定要跟着动手操作起来。需要的源码的 可si我获取;

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

天津储鑫盛钢材现货供应商

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表