用户国营 发表于 2023-8-30 06:47:19

Kafka+Fink 实战+工具类


[*]LogServiceImpl
@Service
@Slf4j
public class LogServiceImpl implements LogService {

    private static final String TOPIC_NAME = "ods_link_visit_topic";

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
   * 记录日志
   *
   * @param request
   * @param shortLinkCode
   * @param accountNo
   * @return
   */
    @Override
    public void recodeShortLinkLog(HttpServletRequest request, String shortLinkCode, Long accountNo) {
      // ip、 浏览器信息
      String ip = CommonUtil.getIpAddr(request);
      // 全部请求头
      Map<String, String> headerMap = CommonUtil.getAllRequestHeader(request);

      Map<String,String> availableMap = new HashMap<>();
      availableMap.put("user-agent",headerMap.get("user-agent"));
      availableMap.put("referer",headerMap.get("referer"));
      availableMap.put("accountNo",accountNo.toString());

      LogRecord logRecord = LogRecord.builder()
                //日志类型
                .event(LogTypeEnum.SHORT_LINK_TYPE.name())
                //日志内容
                .data(availableMap)
                //客户端ip
                .ip(ip)
                // 时间
                .ts(CommonUtil.getCurrentTimestamp())
                //业务唯一标识(短链码)
                .bizId(shortLinkCode).build();

      String jsonLog = JsonUtil.obj2Json(logRecord);

      //打印日志 in 控制台
      log.info(jsonLog);

      // 发送kafka
      kafkaTemplate.send(TOPIC_NAME,jsonLog);


    }
}

[*]DwdShortLinkLogApp
@Slf4j
public class DwdShortLinkLogApp {
    //定义 topic
    public static final String SOURCE_TOPIC = "ods_link_visit_topic";

    //定义 消费组
    public static final String SINK_TOPIC = "dwd_link_visit_topic";

    //定义 消费组
    public static final String GROUP_ID = "dwd_short_link_group";


    public static void main(String[] args) throws Exception {

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      env.setParallelism(1);

//      DataStream<String> ds = env.socketTextStream("192.168.75.146", 8888);

      FlinkKafkaConsumer<String> kafkaConsumer = KafkaUtil.getKafkaConsumer(SOURCE_TOPIC, GROUP_ID);

      DataStreamSource<String> ds = env.addSource(kafkaConsumer);

      ds.print();

      SingleOutputStreamOperator<JSONObject> jsonDs = ds.flatMap(new FlatMapFunction<String, JSONObject>() {

            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                JSONObject jsonObject = JSON.parseObject(value);
                // 生成web端设备唯一标识
                String udid = getDeviceId(jsonObject);
                jsonObject.put("udid",udid);

                String referer = getReferer(jsonObject);
                jsonObject.put("referer",referer);

                out.collect(jsonObject);

            }
      });

      // 分组
      KeyedStream<JSONObject, String> keyedStream = jsonDs.keyBy(new KeySelector<JSONObject, String>() {

            @Override
            public String getKey(JSONObject value) throws Exception {
                return value.getString("udid");

            }
      });


      // 识别新老访客    richMap open函数,对状态以及日期格式进行初始化

      SingleOutputStreamOperator<String> jsonDSWithVisitorState = keyedStream.map(new VisitorMapFunction());

      jsonDSWithVisitorState.print("ods新老访客");

      // 存储到dwd
      FlinkKafkaProducer<String> kafkaProducer = KafkaUtil.getKafkaProducer(SINK_TOPIC);

      jsonDSWithVisitorState.addSink(kafkaProducer);


      env.execute();
    }

    /**
   * 获取referer
   * @param jsonObject
   * @return
   */
    public static String getReferer(JSONObject jsonObject){
      JSONObject dataJsonObj = jsonObject.getJSONObject("data");
      if(dataJsonObj.containsKey("referer")){

            String referer = dataJsonObj.getString("referer");
            if(StringUtils.isNotBlank(referer)){
                try {
                  URL url = new URL(referer);
                  return url.getHost();
                } catch (MalformedURLException e) {
                  log.error("提取referer失败:{}",e.toString());
                }
            }
      }

      return "";

    }

    /**
   * 生成设备唯一标识
   *
   * @param jsonObject
   * @return
   */
    public static String getDeviceId(JSONObject jsonObject){
      Map<String,String> map= new TreeMap<>();

      try{
            map.put("ip",jsonObject.getString("ip"));
            map.put("event",jsonObject.getString("event"));
            map.put("bizId",jsonObject.getString("bizId"));
            map.put("userAgent",jsonObject.getJSONObject("data").getString("userAgent"));

            return DeviceUtil.geneWebUniqueDeviceId(map);

      }catch (Exception e){
            log.error("生产唯一deviceId异常:{}", jsonObject);
            return null;
      }


    }


}

[*]KafkaUtil
@Slf4j
public class KafkaUtil {

    /**
   * kafka 的 broker 地址
   */
    private static String KAFKA_SERVER = null;

    static {
      Properties properties = new Properties();

      InputStream in = KafkaUtil.class.getClassLoader().getResourceAsStream("application.properties");

      try {
            properties.load(in);
      } catch (IOException e) {
            e.printStackTrace();
            log.error("加载Kafka配置文件失败:{}",e.getMessage());
      }

      //获取配置文件中的value
      KAFKA_SERVER = properties.getProperty("kafka.servers");

    }

    /**
   * 获取flink的kafka消费者
   * @param topic
   * @param groupId
   * @return
   */
    public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic,String groupId){
      Properties properties = new Properties();
      properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupId);
      properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);

      return new FlinkKafkaConsumer<String>(topic,new SimpleStringSchema(),properties);
    }

    /**
   * 获取flink的kafka生产者
   * @param topic
   * @return
   */
    public static FlinkKafkaProducer<String> getKafkaProducer(String topic){
      return new FlinkKafkaProducer<String>(KAFKA_SERVER,topic,new SimpleStringSchema());
    }
}
[*]TimeUtil
public class TimeUtil {

    /**
   * 默认日期格式
   */
    private static final String DEFAULT_PATTERN = "yyyy-MM-dd";

    /**
   * 默认日期格式
   */
    private static final DateTimeFormatter DEFAULT_DATE_TIME_FORMATTER= DateTimeFormatter.ofPattern(DEFAULT_PATTERN);

    private static final ZoneId DEFAULT_ZONE_ID = ZoneId.systemDefault();


    /**
   * LocalDateTime 转 字符串,指定日期格式
   * @param localDateTime
   * @param pattern
   * @return
   */
    public static String format(LocalDateTime localDateTime, String pattern){
      DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
      String timeStr = formatter.format(localDateTime.atZone(DEFAULT_ZONE_ID));
      return timeStr;
    }


    /**
   * Date 转 字符串, 指定日期格式
   * @param time
   * @param pattern
   * @return
   */
    public static String format(Date time, String pattern){
      DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
      String timeStr = formatter.format(time.toInstant().atZone(DEFAULT_ZONE_ID));
      return timeStr;
    }

    /**
   *Date 转 字符串,默认日期格式
   * @param time
   * @return
   */
    public static String format(Date time){

      String timeStr = DEFAULT_DATE_TIME_FORMATTER.format(time.toInstant().atZone(DEFAULT_ZONE_ID));
      return timeStr;
    }

    /**
   * timestamp 转 字符串,默认日期格式
   *
   * @param timestamp
   * @return
   */
    public static String format(long timestamp) {
      String timeStr = DEFAULT_DATE_TIME_FORMATTER.format(new Date(timestamp).toInstant().atZone(DEFAULT_ZONE_ID));
      return timeStr;
    }


    /**
   * 字符串 转 Date
   *
   * @param time
   * @return
   */
    public static Date strToDate(String time) {
      LocalDateTime localDateTime = LocalDateTime.parse(time, DEFAULT_DATE_TIME_FORMATTER);
      return Date.from(localDateTime.atZone(DEFAULT_ZONE_ID).toInstant());

    }

}

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: Kafka+Fink 实战+工具类