Kafka+Fink 实战+工具类

打印 上一主题 下一主题

主题 894|帖子 894|积分 2682


  • LogServiceImpl
  1. @Service
  2. @Slf4j
  3. public class LogServiceImpl implements LogService {
  4.     private static final String TOPIC_NAME = "ods_link_visit_topic";
  5.     @Autowired
  6.     private KafkaTemplate kafkaTemplate;
  7.     /**
  8.      * 记录日志
  9.      *
  10.      * @param request
  11.      * @param shortLinkCode
  12.      * @param accountNo
  13.      * @return
  14.      */
  15.     @Override
  16.     public void recodeShortLinkLog(HttpServletRequest request, String shortLinkCode, Long accountNo) {
  17.         // ip、 浏览器信息
  18.         String ip = CommonUtil.getIpAddr(request);
  19.         // 全部请求头
  20.         Map<String, String> headerMap = CommonUtil.getAllRequestHeader(request);
  21.         Map<String,String> availableMap = new HashMap<>();
  22.         availableMap.put("user-agent",headerMap.get("user-agent"));
  23.         availableMap.put("referer",headerMap.get("referer"));
  24.         availableMap.put("accountNo",accountNo.toString());
  25.         LogRecord logRecord = LogRecord.builder()
  26.                 //日志类型
  27.                 .event(LogTypeEnum.SHORT_LINK_TYPE.name())
  28.                 //日志内容
  29.                 .data(availableMap)
  30.                 //客户端ip
  31.                 .ip(ip)
  32.                 // 时间
  33.                 .ts(CommonUtil.getCurrentTimestamp())
  34.                 //业务唯一标识(短链码)
  35.                 .bizId(shortLinkCode).build();
  36.         String jsonLog = JsonUtil.obj2Json(logRecord);
  37.         //打印日志 in 控制台
  38.         log.info(jsonLog);
  39.         // 发送kafka
  40.         kafkaTemplate.send(TOPIC_NAME,jsonLog);
  41.     }
  42. }
复制代码

  • DwdShortLinkLogApp
  1. @Slf4j
  2. public class DwdShortLinkLogApp {
  3.     //定义 topic
  4.     public static final String SOURCE_TOPIC = "ods_link_visit_topic";
  5.     //定义 消费组
  6.     public static final String SINK_TOPIC = "dwd_link_visit_topic";
  7.     //定义 消费组
  8.     public static final String GROUP_ID = "dwd_short_link_group";
  9.     public static void main(String[] args) throws Exception {
  10.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  11.         env.setParallelism(1);
  12. //        DataStream<String> ds = env.socketTextStream("192.168.75.146", 8888);
  13.         FlinkKafkaConsumer<String> kafkaConsumer = KafkaUtil.getKafkaConsumer(SOURCE_TOPIC, GROUP_ID);
  14.         DataStreamSource<String> ds = env.addSource(kafkaConsumer);
  15.         ds.print();
  16.         SingleOutputStreamOperator<JSONObject> jsonDs = ds.flatMap(new FlatMapFunction<String, JSONObject>() {
  17.             @Override
  18.             public void flatMap(String value, Collector<JSONObject> out) throws Exception {
  19.                 JSONObject jsonObject = JSON.parseObject(value);
  20.                 // 生成web端设备唯一标识
  21.                 String udid = getDeviceId(jsonObject);
  22.                 jsonObject.put("udid",udid);
  23.                 String referer = getReferer(jsonObject);
  24.                 jsonObject.put("referer",referer);
  25.                 out.collect(jsonObject);
  26.             }
  27.         });
  28.         // 分组
  29.         KeyedStream<JSONObject, String> keyedStream = jsonDs.keyBy(new KeySelector<JSONObject, String>() {
  30.             @Override
  31.             public String getKey(JSONObject value) throws Exception {
  32.                 return value.getString("udid");
  33.             }
  34.         });
  35.         // 识别新老访客    richMap open函数,对状态以及日期格式进行初始化
  36.         SingleOutputStreamOperator<String> jsonDSWithVisitorState = keyedStream.map(new VisitorMapFunction());
  37.         jsonDSWithVisitorState.print("ods新老访客");
  38.         // 存储到dwd
  39.         FlinkKafkaProducer<String> kafkaProducer = KafkaUtil.getKafkaProducer(SINK_TOPIC);
  40.         jsonDSWithVisitorState.addSink(kafkaProducer);
  41.         env.execute();
  42.     }
  43.     /**
  44.      * 获取referer
  45.      * @param jsonObject
  46.      * @return
  47.      */
  48.     public static String getReferer(JSONObject jsonObject){
  49.         JSONObject dataJsonObj = jsonObject.getJSONObject("data");
  50.         if(dataJsonObj.containsKey("referer")){
  51.             String referer = dataJsonObj.getString("referer");
  52.             if(StringUtils.isNotBlank(referer)){
  53.                 try {
  54.                     URL url = new URL(referer);
  55.                     return url.getHost();
  56.                 } catch (MalformedURLException e) {
  57.                     log.error("提取referer失败:{}",e.toString());
  58.                 }
  59.             }
  60.         }
  61.         return "";
  62.     }
  63.     /**
  64.      * 生成设备唯一标识
  65.      *
  66.      * @param jsonObject
  67.      * @return
  68.      */
  69.     public static String getDeviceId(JSONObject jsonObject){
  70.         Map<String,String> map= new TreeMap<>();
  71.         try{
  72.             map.put("ip",jsonObject.getString("ip"));
  73.             map.put("event",jsonObject.getString("event"));
  74.             map.put("bizId",jsonObject.getString("bizId"));
  75.             map.put("userAgent",jsonObject.getJSONObject("data").getString("userAgent"));
  76.             return DeviceUtil.geneWebUniqueDeviceId(map);
  77.         }catch (Exception e){
  78.             log.error("生产唯一deviceId异常:{}", jsonObject);
  79.             return null;
  80.         }
  81.     }
  82. }
复制代码

  • KafkaUtil
    1. @Slf4j
    2. public class KafkaUtil {
    3.     /**
    4.      * kafka 的 broker 地址
    5.      */
    6.     private static String KAFKA_SERVER = null;
    7.     static {
    8.         Properties properties = new Properties();
    9.         InputStream in = KafkaUtil.class.getClassLoader().getResourceAsStream("application.properties");
    10.         try {
    11.             properties.load(in);
    12.         } catch (IOException e) {
    13.             e.printStackTrace();
    14.             log.error("加载Kafka配置文件失败:{}",e.getMessage());
    15.         }
    16.         //获取配置文件中的value
    17.         KAFKA_SERVER = properties.getProperty("kafka.servers");
    18.     }
    19.     /**
    20.      * 获取flink的kafka消费者
    21.      * @param topic
    22.      * @param groupId
    23.      * @return
    24.      */
    25.     public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic,String groupId){
    26.         Properties properties = new Properties();
    27.         properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupId);
    28.         properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);
    29.         return new FlinkKafkaConsumer<String>(topic,new SimpleStringSchema(),properties);
    30.     }
    31.     /**
    32.      * 获取flink的kafka生产者
    33.      * @param topic
    34.      * @return
    35.      */
    36.     public static FlinkKafkaProducer<String> getKafkaProducer(String topic){
    37.         return new FlinkKafkaProducer<String>(KAFKA_SERVER,topic,new SimpleStringSchema());
    38.     }
    39. }
    复制代码
  • TimeUtil
    1. public class TimeUtil {
    2.     /**
    3.      * 默认日期格式
    4.      */
    5.     private static final String DEFAULT_PATTERN = "yyyy-MM-dd";
    6.     /**
    7.      * 默认日期格式
    8.      */
    9.     private static final DateTimeFormatter DEFAULT_DATE_TIME_FORMATTER  = DateTimeFormatter.ofPattern(DEFAULT_PATTERN);
    10.     private static final ZoneId DEFAULT_ZONE_ID = ZoneId.systemDefault();
    11.     /**
    12.      * LocalDateTime 转 字符串,指定日期格式
    13.      * @param localDateTime
    14.      * @param pattern
    15.      * @return
    16.      */
    17.     public static String format(LocalDateTime localDateTime, String pattern){
    18.         DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
    19.         String timeStr = formatter.format(localDateTime.atZone(DEFAULT_ZONE_ID));
    20.         return timeStr;
    21.     }
    22.     /**
    23.      * Date 转 字符串, 指定日期格式
    24.      * @param time
    25.      * @param pattern
    26.      * @return
    27.      */
    28.     public static String format(Date time, String pattern){
    29.         DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
    30.         String timeStr = formatter.format(time.toInstant().atZone(DEFAULT_ZONE_ID));
    31.         return timeStr;
    32.     }
    33.     /**
    34.      *  Date 转 字符串,默认日期格式
    35.      * @param time
    36.      * @return
    37.      */
    38.     public static String format(Date time){
    39.         String timeStr = DEFAULT_DATE_TIME_FORMATTER.format(time.toInstant().atZone(DEFAULT_ZONE_ID));
    40.         return timeStr;
    41.     }
    42.     /**
    43.      * timestamp 转 字符串,默认日期格式
    44.      *
    45.      * @param timestamp
    46.      * @return
    47.      */
    48.     public static String format(long timestamp) {
    49.         String timeStr = DEFAULT_DATE_TIME_FORMATTER.format(new Date(timestamp).toInstant().atZone(DEFAULT_ZONE_ID));
    50.         return timeStr;
    51.     }
    52.     /**
    53.      * 字符串 转 Date
    54.      *
    55.      * @param time
    56.      * @return
    57.      */
    58.     public static Date strToDate(String time) {
    59.         LocalDateTime localDateTime = LocalDateTime.parse(time, DEFAULT_DATE_TIME_FORMATTER);
    60.         return Date.from(localDateTime.atZone(DEFAULT_ZONE_ID).toInstant());
    61.     }
    62. }
    复制代码

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

用户国营

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表