- @Service
- @Slf4j
- public class LogServiceImpl implements LogService {
- private static final String TOPIC_NAME = "ods_link_visit_topic";
- @Autowired
- private KafkaTemplate kafkaTemplate;
- /**
- * 记录日志
- *
- * @param request
- * @param shortLinkCode
- * @param accountNo
- * @return
- */
- @Override
- public void recodeShortLinkLog(HttpServletRequest request, String shortLinkCode, Long accountNo) {
- // ip、 浏览器信息
- String ip = CommonUtil.getIpAddr(request);
- // 全部请求头
- Map<String, String> headerMap = CommonUtil.getAllRequestHeader(request);
- Map<String,String> availableMap = new HashMap<>();
- availableMap.put("user-agent",headerMap.get("user-agent"));
- availableMap.put("referer",headerMap.get("referer"));
- availableMap.put("accountNo",accountNo.toString());
- LogRecord logRecord = LogRecord.builder()
- //日志类型
- .event(LogTypeEnum.SHORT_LINK_TYPE.name())
- //日志内容
- .data(availableMap)
- //客户端ip
- .ip(ip)
- // 时间
- .ts(CommonUtil.getCurrentTimestamp())
- //业务唯一标识(短链码)
- .bizId(shortLinkCode).build();
- String jsonLog = JsonUtil.obj2Json(logRecord);
- //打印日志 in 控制台
- log.info(jsonLog);
- // 发送kafka
- kafkaTemplate.send(TOPIC_NAME,jsonLog);
- }
- }
复制代码- @Slf4j
- public class DwdShortLinkLogApp {
- //定义 topic
- public static final String SOURCE_TOPIC = "ods_link_visit_topic";
- //定义 消费组
- public static final String SINK_TOPIC = "dwd_link_visit_topic";
- //定义 消费组
- public static final String GROUP_ID = "dwd_short_link_group";
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- // DataStream<String> ds = env.socketTextStream("192.168.75.146", 8888);
- FlinkKafkaConsumer<String> kafkaConsumer = KafkaUtil.getKafkaConsumer(SOURCE_TOPIC, GROUP_ID);
- DataStreamSource<String> ds = env.addSource(kafkaConsumer);
- ds.print();
- SingleOutputStreamOperator<JSONObject> jsonDs = ds.flatMap(new FlatMapFunction<String, JSONObject>() {
- @Override
- public void flatMap(String value, Collector<JSONObject> out) throws Exception {
- JSONObject jsonObject = JSON.parseObject(value);
- // 生成web端设备唯一标识
- String udid = getDeviceId(jsonObject);
- jsonObject.put("udid",udid);
- String referer = getReferer(jsonObject);
- jsonObject.put("referer",referer);
- out.collect(jsonObject);
- }
- });
- // 分组
- KeyedStream<JSONObject, String> keyedStream = jsonDs.keyBy(new KeySelector<JSONObject, String>() {
- @Override
- public String getKey(JSONObject value) throws Exception {
- return value.getString("udid");
- }
- });
- // 识别新老访客 richMap open函数,对状态以及日期格式进行初始化
- SingleOutputStreamOperator<String> jsonDSWithVisitorState = keyedStream.map(new VisitorMapFunction());
- jsonDSWithVisitorState.print("ods新老访客");
- // 存储到dwd
- FlinkKafkaProducer<String> kafkaProducer = KafkaUtil.getKafkaProducer(SINK_TOPIC);
- jsonDSWithVisitorState.addSink(kafkaProducer);
- env.execute();
- }
- /**
- * 获取referer
- * @param jsonObject
- * @return
- */
- public static String getReferer(JSONObject jsonObject){
- JSONObject dataJsonObj = jsonObject.getJSONObject("data");
- if(dataJsonObj.containsKey("referer")){
- String referer = dataJsonObj.getString("referer");
- if(StringUtils.isNotBlank(referer)){
- try {
- URL url = new URL(referer);
- return url.getHost();
- } catch (MalformedURLException e) {
- log.error("提取referer失败:{}",e.toString());
- }
- }
- }
- return "";
- }
- /**
- * 生成设备唯一标识
- *
- * @param jsonObject
- * @return
- */
- public static String getDeviceId(JSONObject jsonObject){
- Map<String,String> map= new TreeMap<>();
- try{
- map.put("ip",jsonObject.getString("ip"));
- map.put("event",jsonObject.getString("event"));
- map.put("bizId",jsonObject.getString("bizId"));
- map.put("userAgent",jsonObject.getJSONObject("data").getString("userAgent"));
- return DeviceUtil.geneWebUniqueDeviceId(map);
- }catch (Exception e){
- log.error("生产唯一deviceId异常:{}", jsonObject);
- return null;
- }
- }
- }
复制代码
- KafkaUtil
- @Slf4j
- public class KafkaUtil {
- /**
- * kafka 的 broker 地址
- */
- private static String KAFKA_SERVER = null;
- static {
- Properties properties = new Properties();
- InputStream in = KafkaUtil.class.getClassLoader().getResourceAsStream("application.properties");
- try {
- properties.load(in);
- } catch (IOException e) {
- e.printStackTrace();
- log.error("加载Kafka配置文件失败:{}",e.getMessage());
- }
- //获取配置文件中的value
- KAFKA_SERVER = properties.getProperty("kafka.servers");
- }
- /**
- * 获取flink的kafka消费者
- * @param topic
- * @param groupId
- * @return
- */
- public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic,String groupId){
- Properties properties = new Properties();
- properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupId);
- properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);
- return new FlinkKafkaConsumer<String>(topic,new SimpleStringSchema(),properties);
- }
- /**
- * 获取flink的kafka生产者
- * @param topic
- * @return
- */
- public static FlinkKafkaProducer<String> getKafkaProducer(String topic){
- return new FlinkKafkaProducer<String>(KAFKA_SERVER,topic,new SimpleStringSchema());
- }
- }
复制代码 - TimeUtil
- public class TimeUtil {
- /**
- * 默认日期格式
- */
- private static final String DEFAULT_PATTERN = "yyyy-MM-dd";
- /**
- * 默认日期格式
- */
- private static final DateTimeFormatter DEFAULT_DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern(DEFAULT_PATTERN);
- private static final ZoneId DEFAULT_ZONE_ID = ZoneId.systemDefault();
- /**
- * LocalDateTime 转 字符串,指定日期格式
- * @param localDateTime
- * @param pattern
- * @return
- */
- public static String format(LocalDateTime localDateTime, String pattern){
- DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
- String timeStr = formatter.format(localDateTime.atZone(DEFAULT_ZONE_ID));
- return timeStr;
- }
- /**
- * Date 转 字符串, 指定日期格式
- * @param time
- * @param pattern
- * @return
- */
- public static String format(Date time, String pattern){
- DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
- String timeStr = formatter.format(time.toInstant().atZone(DEFAULT_ZONE_ID));
- return timeStr;
- }
- /**
- * Date 转 字符串,默认日期格式
- * @param time
- * @return
- */
- public static String format(Date time){
- String timeStr = DEFAULT_DATE_TIME_FORMATTER.format(time.toInstant().atZone(DEFAULT_ZONE_ID));
- return timeStr;
- }
- /**
- * timestamp 转 字符串,默认日期格式
- *
- * @param timestamp
- * @return
- */
- public static String format(long timestamp) {
- String timeStr = DEFAULT_DATE_TIME_FORMATTER.format(new Date(timestamp).toInstant().atZone(DEFAULT_ZONE_ID));
- return timeStr;
- }
- /**
- * 字符串 转 Date
- *
- * @param time
- * @return
- */
- public static Date strToDate(String time) {
- LocalDateTime localDateTime = LocalDateTime.parse(time, DEFAULT_DATE_TIME_FORMATTER);
- return Date.from(localDateTime.atZone(DEFAULT_ZONE_ID).toInstant());
- }
- }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |