基于FLink实现的实时安全检测(一段时间内连续登录失败20次后,下一次登录 ...

打印 上一主题 下一主题

主题 663|帖子 663|积分 1991

研发背景

    公司安全部目前针对内部系统的网络访问日志的安全审计,大部分都是T+1时效,每日当天,启动Python编写的定时任务,完成昨日的日志审计和检测,定时任务运行完成后,统一进行企业微信告警推送。这种方案在目前的网络环境和人员规模下,呈现两个痛点,一是面对日益频繁的网络攻击、钓鱼链接,T+1的定时任务,难以及时进行告警,因此也难以有效避免如关键信息泄露等问题,二是目前以Python为主的单机定时任务,针对不同场景的处理时效,从一小时到十几小时不等,效率低下。为解决以上问题,本人协助公司安全部同时对告警采集平台进行改造,由之前的python单机任务处理,切换到基于Flink集群的并行处理,且告警推送时效,由之前的T+1天,提升到秒级实时告警。本次改造涉及网络日志审计的多个常见场景,如端口扫描、黑名单统计、异常流量、连续恶意登录等。本次以一段时间内连续登录失败20次后,下一次登录成功场景来进行介绍。

场景描述

    针对一个内部系统,如邮件系统,公司员工的访问行为日志,存放于kafka,我们希望对于一个用户账号在同一个IP下,任意的3分钟时间内,连续登录邮件系统20次失败,下一次登录成功,这种场景能够及时获取并推送到企业微信某个指定的安全接口人。kafka中的数据,能够通过某个关键字,区分当前网络访问是否一次登录事件,且有访问时间(也就是事件时间)。在解析到符合需求的用户账号之后,第一时间进行企业微信告警推送,并将其这段时间内的访问行为,写入下游ElasticSearch。
组件版本


  • Flink-1.14.4
  • Java8
  • ElasticSearch-7.3.2
  • Kafka-2.12_2.8.1
日志结构

IP和账号皆为测试使用。
  1. {
  2.    "user": "wangxm",
  3.    "client_ip": "110.68.6.182",
  4.    "source": "login",
  5.    "loginname": "wangxm@test.com",
  6.    "IP": "110.8.148.58",
  7.    "timestamp": "17:58:12",
  8.    "@timestamp": "2022-04-20T09:58:13.647Z",
  9.    "ip": "110.7.231.25",
  10.    "clienttype": "POP3",
  11.    "result": "success",
  12.    "@version": "1"
  13. }
复制代码
 

技术方案

    上述场景,可考虑使用FlinkCEP及Flink的滑动窗口进行实现。由于本人在采用FlinkCEP的方案进行代码编写调试后,发现并不能满足,因此改用滑动窗口进行实现。
 

关键代码

主入口类

    主入口类,创建了flink环境、设置了基础参数,创建了kafkaSource,接入消息后,进行了映射、过滤,并设置了水位线,进行了分组,之后设置了滑动窗口,在窗口内进行了事件统计,将复合条件的事件收集返回并写入ElasticSearch。
    针对map、filter、keyBy、window等算子,都单独进行了编写,后面会一一列出来。
 
  1. package com.data.dev.flink.mailTopic.main;
  2. import com.data.dev.common.javabean.BaseBean;
  3. import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm;
  4. import com.data.dev.elasticsearch.ElasticSearchInfo;
  5. import com.data.dev.elasticsearch.SinkToEs;
  6. import com.data.dev.flink.FlinkEnv;
  7. import com.data.dev.flink.mailTopic.OperationForLoginFailCheck.*;
  8. import com.data.dev.kafka.KafkaSourceBuilder;
  9. import com.data.dev.key.ConfigurationKey;
  10. import com.data.dev.utils.TimeUtils;
  11. import lombok.extern.slf4j.Slf4j;
  12. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  13. import org.apache.flink.connector.kafka.source.KafkaSource;
  14. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  15. import org.apache.flink.streaming.api.datastream.KeyedStream;
  16. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  17. import org.apache.flink.streaming.api.datastream.WindowedStream;
  18. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  19. import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
  20. import org.apache.flink.streaming.api.windowing.time.Time;
  21. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  22. import java.time.Duration;
  23. /**
  24. * Flink处理在3分钟内连续登录失败20次后登录成功的场景
  25. * 采用滑动窗口来实现
  26. * @author wangxiaomin 2022-06-01
  27. */
  28. @Slf4j
  29. public class MailMsg extends BaseBean {
  30.     /**
  31.      * Flink作业名称
  32.      */
  33.     public static final  String JobName = "告警采集平台——连续登录失败后登录成功告警";
  34.     /**
  35.      * Kafka消息名
  36.      */
  37.     public static final  String KafkaSourceName = "Kafka Source for AlarmPlatform About Mail Topic";
  38.     public MailMsg(){
  39.         log.info("初始化滑动窗口场景告警程序");
  40.     }
  41.     /**
  42.      * 执行逻辑统计场景,实现告警推送
  43.      */
  44.     public static void execute(){
  45.         //① 创建Flink执行环境并设置checkpoint等必要的参数
  46.         StreamExecutionEnvironment env = FlinkEnv.getFlinkEnv();
  47.         KafkaSource<String> kafkaSource = KafkaSourceBuilder.getKafkaSource(ConfigurationKey.KAFKA_MAIL_TOPIC_NAME,ConfigurationKey.KAFKA_MAIL_CONSUMER_GROUP_ID) ;
  48.         DataStreamSource<String> kafkaMailMsg = env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(10)), KafkaSourceName);
  49.         //② 筛选登录消息,创建初始登录事件流
  50.         SingleOutputStreamOperator<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> loginMapDs = kafkaMailMsg.map(new MsgToBeanMapper()).name("Map算子加工");
  51.         SingleOutputStreamOperator<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> loginFilterDs = loginMapDs.filter(new MailMsgForLoginFilter()).name("Filter算子加工");
  52.         //③ 设置水位线
  53.         WatermarkStrategy<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> watermarkStrategy = WatermarkStrategy.<com.data.dev.common.javabean.kafkaMailTopic.MailMsg>forBoundedOutOfOrderness(Duration.ofMinutes(1))
  54.                         .withTimestampAssigner((mailMsg, timestamp) -> TimeUtils.switchUTCToBeijingTimestamp(mailMsg.getTimestamp_datetime()));
  55.         SingleOutputStreamOperator<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> loginWmDs = loginFilterDs.assignTimestampsAndWatermarks(watermarkStrategy.withIdleness(Duration.ofMinutes(3))).name("增加水位线");
  56.         //④ 设置主键
  57.         KeyedStream<com.data.dev.common.javabean.kafkaMailTopic.MailMsg, String> loginKeyedDs = loginWmDs.keyBy(new LoginKeySelector());
  58.         //⑥ 转化为滑动窗口
  59.         WindowedStream<com.data.dev.common.javabean.kafkaMailTopic.MailMsg, String, TimeWindow> loginWindowDs = loginKeyedDs.window(SlidingEventTimeWindows.of(Time.seconds(180L),Time.seconds(90L)));
  60.         //⑦ 在窗口内进行逻辑统计
  61.         SingleOutputStreamOperator<MailMsgAlarm> loginWindowsDealDs  = loginWindowDs.process(new WindowProcessFuncImpl()).name("窗口处理逻辑");
  62.         //⑧ 将结果转化为通用DataStream<String>格式
  63.         SingleOutputStreamOperator<String> resultDs  = loginWindowsDealDs.map(new AlarmMsgToStringMapper()).name("窗口结果转化为标准格式");
  64.         //⑨ 将最终结果写入ES
  65.         resultDs.addSink(SinkToEs.getEsSinkBuilder(ElasticSearchInfo.ES_LOGIN_FAIL_INDEX_NAME,ElasticSearchInfo.ES_INDEX_TYPE_DEFAULT).build());
  66.         //⑩ 提交Flink集群进行执行
  67.         FlinkEnv.envExec(env,JobName);
  68.     }
  69. }
复制代码
 
mapper算子
  1. package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;
  2. import com.alibaba.fastjson.JSON;
  3. import com.data.dev.common.javabean.BaseBean;
  4. import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.apache.flink.api.common.functions.MapFunction;
  7. /**
  8. *  逻辑统计场景告警推送ES消息体
  9. *  @author wangxiaoming-ghq 2022-06-01
  10. */
  11. @Slf4j
  12. public   class AlarmMsgToStringMapper extends BaseBean implements MapFunction<MailMsgAlarm, String> {
  13.     @Override
  14.     public String map(MailMsgAlarm mailMsgAlarm) throws Exception {
  15.         return JSON.toJSONString(mailMsgAlarm);
  16.     }
  17. }
复制代码
filter算子
  1. package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;
  2. import com.data.dev.common.javabean.BaseBean;
  3. import com.data.dev.common.javabean.kafkaMailTopic.MailMsg;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.apache.flink.api.common.functions.FilterFunction;
  6. /**
  7. * ② 消费mail主题的消息,过滤其中login的事件
  8. * @author wangxiaoming-ghq 2022-06-01
  9. */
  10. @Slf4j
  11. public class MailMsgForLoginFilter extends BaseBean implements FilterFunction<MailMsg> {
  12.     @Override
  13.     public boolean filter(MailMsg mailMsg) {
  14.         if("login".equals(mailMsg.getSource())) {
  15.             log.info("筛选原始的login事件:【" + mailMsg + "】");
  16.         }
  17.         return "login".equals(mailMsg.getSource());
  18.     }
  19. }
复制代码
keyBy算子
  1. package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;
  2. import com.data.dev.common.javabean.BaseBean;
  3. import com.data.dev.common.javabean.kafkaMailTopic.MailMsg;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.apache.flink.api.java.functions.KeySelector;
  6. /**
  7. * CEP 编程,需要进行key选取
  8. */
  9. @Slf4j
  10. public class LoginKeySelector extends BaseBean implements KeySelector<MailMsg, String> {
  11.     @Override
  12.     public String getKey(MailMsg mailMsg) {
  13.         return mailMsg.getUser() + "@" + mailMsg.getClient_ip();
  14.     }
  15. }
复制代码
窗口函数(核心代码)

    这里我们主要考虑使用一个事件列表,用来存储每一个窗口期内得到的连续登录,当检测到登陆失败的事件,即存入事件列表中,之后判断下一次登录失败事件,如果检测到登录成功事件,但此时登录失败的次数不足20次,则清空loginEventList,等待下一次检测。一旦符合窗口内连续登录失败超过20次且下一次登录成功这个事件,则清空此时的loginEventList并将当前登录成功的事件进行告警推送。
 
  1. package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;
  2. import com.data.dev.common.javabean.kafkaMailTopic.MailMsg;
  3. import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm;
  4. import com.data.dev.utils.HttpUtils;
  5. import com.data.dev.utils.IPUtils;
  6. import lombok.extern.slf4j.Slf4j;
  7. import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
  8. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  9. import org.apache.flink.util.Collector;
  10. import java.io.Serializable;
  11. import java.util.ArrayList;
  12. import java.util.List;
  13. /**
  14. *  滑动窗口内复杂事件解析逻辑实现
  15. *  @author wangxiaoming-ghq 2022-06-01
  16. */
  17. @Slf4j
  18. public   class WindowProcessFuncImpl extends  ProcessWindowFunction<MailMsg, MailMsgAlarm, String, TimeWindow> implements Serializable {
  19.     @Override
  20.     public void process(String key, ProcessWindowFunction<MailMsg, MailMsgAlarm, String, TimeWindow>.Context context, Iterable<MailMsg> iterable, Collector<MailMsgAlarm> collector) {
  21.         List<MailMsg> loginEventList = new ArrayList<>();
  22.         MailMsgAlarm mailMsgAlarm;
  23.         for (MailMsg mailMsg : iterable) {
  24.             log.info("收集到的登录事件【" + mailMsg + "】");<br>
  25.             if (mailMsg.getResult().equals("fail")) { //开始检测当前窗口内的事件,并将失败的事件收集到loginEventList
  26.                 log.info("开始检测当前窗口内的事件,并将失败的事件收集到loginEventList");
  27.                 loginEventList.add(mailMsg);
  28.             } else if (mailMsg.getResult().equals("success") && loginEventList.size() < 20) {//如果检测到登录成功事件,但此时登录失败的次数不足20次,则清空loginEventList,等待下一次检测
  29.                 log.info("检测到登录成功事件,但此时登录失败的次数为【" + loginEventList.size() + "】不足20次,清空loginEventList,等待下一次检测");
  30.                 loginEventList.clear();
  31.             } else if (mailMsg.getResult().equals("success") && loginEventList.size() >= 20) {
  32.                 mailMsgAlarm = getMailMsgAlarm(loginEventList,mailMsg);
  33.                 log.info("检测到登录成功的事件,此时窗口内连续登录失败的次数为【" + mailMsgAlarm.getFailTimes() + "】");
  34.                 //一旦符合窗口内连续登录失败超过20次且下一次登录成功这个事件,则清空此时的loginEventList并将当前登录成功的事件进行告警推送;
  35.                 loginEventList.clear();
  36.                 doAlarmPush(mailMsgAlarm);
  37.                 collector.collect(mailMsgAlarm);//将当前登录成功的事件进行收集上报
  38.             } else {
  39.                 log.info(mailMsg.getUser() + "当前已连续:【" + loginEventList.size() + "】 次登录失败");
  40.             }
  41.         }
  42.     }
  43.     /**
  44.      * 2022年6月17日15:03:06
  45.      * @param eventList:当前窗口内的事件列表
  46.      * @param eventCurrent:当前登录成功的事件
  47.      * @return mailMsgAlarm:告警消息体
  48.      */
  49.     public static MailMsgAlarm getMailMsgAlarm(List<MailMsg> eventList,MailMsg eventCurrent){
  50.         String alarmKey = eventCurrent.getUser() + "@" + eventCurrent.getClient_ip();
  51.         String loginFailStartTime = eventList.get(0).getTimestamp_datetime();
  52.         String loginSuccessTime = eventCurrent.getTimestamp_datetime();
  53.         int loginFailTimes = eventList.size();
  54.         MailMsgAlarm mailMsgAlarm = new MailMsgAlarm();
  55.         mailMsgAlarm.setMailMsg(eventCurrent);
  56.         mailMsgAlarm.setAlarmKey(alarmKey);
  57.         mailMsgAlarm.setStartTime(loginFailStartTime);
  58.         mailMsgAlarm.setEndTime(loginSuccessTime);
  59.         mailMsgAlarm.setFailTimes(loginFailTimes);
  60.         return mailMsgAlarm;
  61.     }
  62.     /**
  63.      * 2022年6月17日14:47:53
  64.      * @param mailMsgAlarm :当前构建的需要告警的事件
  65.      */
  66.     public void doAlarmPush(MailMsgAlarm mailMsgAlarm){
  67.         String userKey = mailMsgAlarm.getAlarmKey();
  68.         String clientIp = mailMsgAlarm.mailMsg.getClient_ip();
  69.         boolean isWhiteListIp = IPUtils.isWhiteListIp(clientIp);
  70.         if(isWhiteListIp){//如果是白名单IP,不告警
  71.             log.info("当前登录用户【" + userKey + "】属于白名单IP");
  72.         }else {
  73.             //IP归属查询结果、企业微信推送告警
  74.             String user = HttpUtils.getUserByClientIp(clientIp);
  75.             HttpUtils.pushAlarmMsgToWechatWork(user,mailMsgAlarm.toString());
  76.         }
  77.     }
  78. }
复制代码
最后一次map算子
  1. package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;
  2. import com.alibaba.fastjson.JSON;
  3. import com.data.dev.common.javabean.BaseBean;
  4. import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.apache.flink.api.common.functions.MapFunction;
  7. /**
  8. *  逻辑统计场景告警推送ES消息体
  9. *  @author wangxiaoming-ghq 2022-06-01
  10. */
  11. @Slf4j
  12. public   class AlarmMsgToStringMapper extends BaseBean implements MapFunction<MailMsgAlarm, String> {
  13.     @Override
  14.     public String map(MailMsgAlarm mailMsgAlarm) throws Exception {
  15.         return JSON.toJSONString(mailMsgAlarm);
  16.     }
  17. }
复制代码
ElasticSearch工具类
  1. package com.data.dev.elasticsearch;
  2. import com.data.dev.common.javabean.BaseBean;
  3. import com.data.dev.key.ConfigurationKey;
  4. import com.data.dev.key.ElasticSearchKey;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.apache.flink.api.common.functions.RuntimeContext;
  7. import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
  8. import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
  9. import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
  10. import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
  11. import org.apache.http.HttpHost;
  12. import org.apache.http.auth.AuthScope;
  13. import org.apache.http.auth.UsernamePasswordCredentials;
  14. import org.apache.http.client.CredentialsProvider;
  15. import org.apache.http.impl.client.BasicCredentialsProvider;
  16. import org.elasticsearch.action.index.IndexRequest;
  17. import org.elasticsearch.client.Requests;
  18. import java.util.ArrayList;
  19. import java.util.HashMap;
  20. import java.util.List;
  21. import java.util.Map;
  22. /**
  23. * 2022年6月17日15:15:06
  24. * @author wangxiaoming-ghq
  25. * Flink流计算结果写入ES公共方法
  26. */
  27. @Slf4j
  28. public class SinkToEs extends BaseBean {
  29.     public static final long serialVersionUID = 2L;
  30.     private static final HashMap<String,String> ES_PROPS_MAP = ConfigurationKey.getApplicationProps();
  31.     private static final String HOST = ES_PROPS_MAP.get(ConfigurationKey.ES_HOST);
  32.     private static final String PASSWORD = ES_PROPS_MAP.get(ConfigurationKey.ES_PASSWORD);
  33.     private static final String USERNAME = ES_PROPS_MAP.get(ConfigurationKey.ES_USERNAME);
  34.     private static final String PORT = ES_PROPS_MAP.get(ConfigurationKey.ES_PORT);
  35.     /**
  36.      * 2022年6月17日15:17:55
  37.      * 获取ES连接信息
  38.      * @return esInfoMap:ES连接信息持久化
  39.      */
  40.     public static HashMap<String,String > getElasticSearchInfo(){
  41.         log.info("获取ES连接信息:【 " + "HOST="+HOST + "PORT="+PORT+"USERNAME="+USERNAME+"PASSWORD=********" + " 】");
  42.         HashMap<String,String> esInfoMap = new HashMap<>();
  43.         esInfoMap.put(ElasticSearchKey.HOST,HOST);
  44.         esInfoMap.put(ElasticSearchKey.PASSWORD,PASSWORD);
  45.         esInfoMap.put(ElasticSearchKey.USERNAME,USERNAME);
  46.         esInfoMap.put(ElasticSearchKey.PORT,PORT);
  47.         return esInfoMap;
  48.     }
  49.     /**
  50.      * @param esIndexName:写入索引名称
  51.      * @param esType:写入索引类型
  52.      * @return ElasticsearchSink.Builder<String>:构建器
  53.      */
  54.     public static ElasticsearchSink.Builder<String> getEsSinkBuilder(String esIndexName,String esType){
  55.         HashMap<String, String> esInfoMap = getElasticSearchInfo();
  56.         List<HttpHost> httpHosts = new ArrayList<>();
  57.         httpHosts.add(new HttpHost(String.valueOf(esInfoMap.get(ElasticSearchKey.HOST)), Integer.parseInt(esInfoMap.get(ElasticSearchKey.PORT)), "http"));
  58.         ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
  59.                 httpHosts,
  60.                 new ElasticsearchSinkFunction<String>() {
  61.                     public IndexRequest createIndexRequest() {
  62.                         Map<String, String> json = new HashMap<>();
  63.                         //log.info("写入ES的data:【"+json+"】");
  64.                         IndexRequest index  = Requests.indexRequest()
  65.                                 .index(esIndexName)
  66.                                 .type(esType)
  67.                                 .source(json);
  68.                         return index;
  69.                     }
  70.                     @Override
  71.                     public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
  72.                         indexer.add(createIndexRequest());
  73.                     }
  74.                 }
  75.         );
  76.         //定义es的连接配置  带用户名密码
  77.         RestClientFactory restClientFactory = restClientBuilder -> {
  78.             CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
  79.             credentialsProvider.setCredentials(
  80.                     AuthScope.ANY,
  81.                     new UsernamePasswordCredentials(
  82.                             String.valueOf(esInfoMap.get(ElasticSearchKey.USERNAME)),
  83.                             String.valueOf(esInfoMap.get(ElasticSearchKey.PASSWORD))
  84.                     )
  85.             );
  86.             restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
  87.                 httpAsyncClientBuilder.disableAuthCaching();
  88.                 return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
  89.             });
  90.         };
  91.         esSinkBuilder.setRestClientFactory(restClientFactory);
  92.         return esSinkBuilder;
  93.     }
  94. }
复制代码
事件实体类
  1. package com.data.dev.common.javabean.kafkaMailTopic;
  2. import com.data.dev.common.javabean.BaseBean;
  3. import lombok.Data;
  4. import java.util.Objects;
  5. /**
  6. * @author wangxiaoming-ghq 2022-05-15
  7. * 逻辑统计场景告警事件
  8. */
  9. @Data
  10. public class MailMsgAlarm extends BaseBean {
  11.     /**
  12.      * 当前登录成功的事件
  13.      */
  14.    public  MailMsg mailMsg;
  15.     /**
  16.      * 当前捕获的告警主键:username@client_ip
  17.      */
  18.    public  String alarmKey;
  19.     /**
  20.      * 第一次登录失败的事件时间
  21.      */
  22.    public  String startTime;
  23.     /**
  24.      * 连续登录失败后下一次登录成功的事件时间
  25.      */
  26.    public  String endTime;
  27.     /**
  28.      * 连续登录失败的次数
  29.      */
  30.    public  int failTimes;
  31.     @Override
  32.     public String toString() {
  33.         return "{" +
  34.                 "  'mailMsg_login_success':'" + mailMsg + "'" +
  35.                 ", 'alarmKey':'" + alarmKey + "'" +
  36.                 ", 'start_login_time_in3min':'"  +startTime + "'" +
  37.                 ", 'end_login_time_in3min':'"  +endTime + "'" +
  38.                 ", 'login_fail_times':'"  +failTimes +  "'" +
  39.                 "}";
  40.     }
  41.     public MailMsgAlarm() {
  42.     }
  43.     @Override
  44.     public boolean equals(Object o) {
  45.         if (this == o) return true;
  46.         if (!(o instanceof MailMsgAlarm)) return false;
  47.         MailMsgAlarm that = (MailMsgAlarm) o;
  48.         return getFailTimes() == that.getFailTimes() && getMailMsg().equals(that.getMailMsg()) && getAlarmKey().equals(that.getAlarmKey()) && getStartTime().equals(that.getStartTime()) && getEndTime().equals(that.getEndTime());
  49.     }
  50.     @Override
  51.     public int hashCode() {
  52.         return Objects.hash(getMailMsg(), getAlarmKey(), getStartTime(), getEndTime(), getFailTimes());
  53.     }
  54. }
复制代码
消息实体类
  1. package com.data.dev.common.javabean.kafkaMailTopic;
  2. import com.data.dev.common.javabean.BaseBean;
  3. import lombok.Data;
  4. import java.util.Objects;
  5. /**
  6. * {
  7. *   "user": "wangxm",
  8. *   "client_ip": "110.68.6.182",
  9. *   "source": "login",
  10. *   "loginname": "wangxm@test.com",
  11. *   "IP": "110.8.148.58",
  12. *   "timestamp": "17:58:12",
  13. *   "@timestamp": "2022-04-20T09:58:13.647Z",
  14. *   "ip": "110.7.231.25",
  15. *   "clienttype": "POP3",
  16. *   "result": "success",
  17. *   "@version": "1"
  18. * }
  19. *
  20. * user登录用户
  21. * client_ip 来源ip
  22. * source 类型
  23. * loginname 登录用户邮箱地址
  24. * ip 目标前端ip
  25. * timestamp 发送时间
  26. * @timestamp  发送日期时间
  27. * IP 邮件日志发送来源IP
  28. * clienttype 客户端登录类型
  29. * result 登录状态
  30. */
  31. @Data
  32. public class MailMsg extends BaseBean {
  33.     public String user;
  34.     public String client_ip;
  35.     public String source;
  36.     public String loginName;
  37.     public String mailSenderSourceIp;
  38.     public String timestamp_time;
  39.     public String timestamp_datetime;
  40.     public String ip;
  41.     public String clientType;
  42.     public String result;
  43.     public String version;
  44.     public MailMsg() {
  45.     }
  46.     public MailMsg(String user, String client_ip, String source, String loginName, String mailSenderSourceIp, String timestamp_time, String timestamp_datetime, String ip, String clientType, String result, String version) {
  47.         this.user = user;
  48.         this.client_ip = client_ip;
  49.         this.source = source;
  50.         this.loginName = loginName;
  51.         this.mailSenderSourceIp = mailSenderSourceIp;
  52.         this.timestamp_time = timestamp_time;
  53.         this.timestamp_datetime = timestamp_datetime;
  54.         this.ip = ip;
  55.         this.clientType = clientType;
  56.         this.result = result;
  57.         this.version = version;
  58.     }
  59.     @Override
  60.     public boolean equals(Object o) {
  61.         if (this == o) return true;
  62.         if (!(o instanceof MailMsg)) return false;
  63.         MailMsg mailMsg = (MailMsg) o;
  64.         return getUser().equals(mailMsg.getUser()) && getClient_ip().equals(mailMsg.getClient_ip()) && getSource().equals(mailMsg.getSource()) && getLoginName().equals(mailMsg.getLoginName()) && getMailSenderSourceIp().equals(mailMsg.getMailSenderSourceIp()) && getTimestamp_time().equals(mailMsg.getTimestamp_time()) && getTimestamp_datetime().equals(mailMsg.getTimestamp_datetime()) && getIp().equals(mailMsg.getIp()) && getClientType().equals(mailMsg.getClientType()) && getResult().equals(mailMsg.getResult()) && getVersion().equals(mailMsg.getVersion());
  65.     }
  66.     @Override
  67.     public int hashCode() {
  68.         return Objects.hash(getUser(), getClient_ip(), getSource(), getLoginName(), getMailSenderSourceIp(), getTimestamp_time(), getTimestamp_datetime(), getIp(), getClientType(), getResult(), getVersion());
  69.     }
  70.     @Override
  71.     public String toString() {
  72.         return "{" +
  73.                 "  'user':'" + user + "'" +
  74.                 ", 'client_ip':'" + client_ip  + "'" +
  75.                 ", 'source':'" + source  + "'" +
  76.                 ", 'loginName':'" + loginName  + "'" +
  77.                 ", 'IP':'" + mailSenderSourceIp + "'" +
  78.                 ", 'timestamp':'" + timestamp_time + "'" +
  79.                 ", '@timestamp':'" + timestamp_datetime + "'" +
  80.                 ", 'ip':'"  + "'" +
  81.                 ", 'clientType':'" + clientType  + "'" +
  82.                 ", 'result':'" + result  + "'" +
  83.                 ", 'version':'" + version + "'" +
  84.                 "}";
  85.     }
  86. }
复制代码
 源代码已去掉敏感信息,地址:https://gitee.com/wangxm-2270/alarmCollectByFlink.git

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

南七星之家

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表