Kafka+Flink 实战+工具类
[*]LogServiceImpl
/**
 * {@link LogService} implementation that turns a short-link visit into a JSON log record
 * and publishes it to Kafka.
 */
@Service
@Slf4j
public class LogServiceImpl implements LogService {

    /** Kafka topic that receives the serialized visit log records. */
    private static final String TOPIC_NAME = "ods_link_visit_topic";

    // NOTE(review): raw type kept on purpose so bean injection is unchanged; consider
    // KafkaTemplate<String, String> once the template bean's generic signature is confirmed.
    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * Builds a log record for one short-link visit, logs it, and sends it to {@code TOPIC_NAME}.
     *
     * <p>The record carries the client IP (as resolved by {@code CommonUtil.getIpAddr}), the
     * {@code user-agent} and {@code referer} request headers, the account number, the current
     * timestamp and the short-link code as business id.
     *
     * @param request       the incoming HTTP request of the visit
     * @param shortLinkCode the short-link code; stored as the record's {@code bizId}
     * @param accountNo     the account number; may be {@code null}, in which case the
     *                      {@code accountNo} entry is stored with a {@code null} value
     */
    @Override
    public void recodeShortLinkLog(HttpServletRequest request, String shortLinkCode, Long accountNo) {
        // Client IP and request headers.
        String ip = CommonUtil.getIpAddr(request);
        Map<String, String> headerMap = CommonUtil.getAllRequestHeader(request);

        // Only a subset of the headers is forwarded downstream.
        Map<String, String> availableMap = new HashMap<>();
        availableMap.put("user-agent", headerMap.get("user-agent"));
        availableMap.put("referer", headerMap.get("referer"));
        // accountNo is a boxed Long: guard against null instead of failing with an NPE.
        availableMap.put("accountNo", accountNo == null ? null : accountNo.toString());

        LogRecord logRecord = LogRecord.builder()
                // log type
                .event(LogTypeEnum.SHORT_LINK_TYPE.name())
                // log payload
                .data(availableMap)
                // client ip
                .ip(ip)
                // event time
                .ts(CommonUtil.getCurrentTimestamp())
                // business id (short-link code)
                .bizId(shortLinkCode)
                .build();

        String jsonLog = JsonUtil.obj2Json(logRecord);

        // Pass the payload as an argument so it is never interpreted as a format string.
        log.info("{}", jsonLog);

        // Publish to Kafka.
        kafkaTemplate.send(TOPIC_NAME, jsonLog);
    }
}
[*]DwdShortLinkLogApp
@Slf4j
public class DwdShortLinkLogApp {
//定义 topic
public static final String SOURCE_TOPIC = "ods_link_visit_topic";
//定义 消费组
public static final String SINK_TOPIC = "dwd_link_visit_topic";
//定义 消费组
public static final String GROUP_ID = "dwd_short_link_group";
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// DataStream<String> ds = env.socketTextStream("192.168.75.146", 8888);
FlinkKafkaConsumer<String> kafkaConsumer = KafkaUtil.getKafkaConsumer(SOURCE_TOPIC, GROUP_ID);
DataStreamSource<String> ds = env.addSource(kafkaConsumer);
ds.print();
SingleOutputStreamOperator<JSONObject> jsonDs = ds.flatMap(new FlatMapFunction<String, JSONObject>() {
@Override
public void flatMap(String value, Collector<JSONObject> out) throws Exception {
JSONObject jsonObject = JSON.parseObject(value);
// 生成web端设备唯一标识
String udid = getDeviceId(jsonObject);
jsonObject.put("udid",udid);
String referer = getReferer(jsonObject);
jsonObject.put("referer",referer);
out.collect(jsonObject);
}
});
// 分组
KeyedStream<JSONObject, String> keyedStream = jsonDs.keyBy(new KeySelector<JSONObject, String>() {
@Override
public String getKey(JSONObject value) throws Exception {
return value.getString("udid");
}
});
// 识别新老访客 richMap open函数,对状态以及日期格式进行初始化
SingleOutputStreamOperator<String> jsonDSWithVisitorState = keyedStream.map(new VisitorMapFunction());
jsonDSWithVisitorState.print("ods新老访客");
// 存储到dwd
FlinkKafkaProducer<String> kafkaProducer = KafkaUtil.getKafkaProducer(SINK_TOPIC);
jsonDSWithVisitorState.addSink(kafkaProducer);
env.execute();
}
/**
* 获取referer
* @param jsonObject
* @return
*/
public static String getReferer(JSONObject jsonObject){
JSONObject dataJsonObj = jsonObject.getJSONObject("data");
if(dataJsonObj.containsKey("referer")){
String referer = dataJsonObj.getString("referer");
if(StringUtils.isNotBlank(referer)){
try {
URL url = new URL(referer);
return url.getHost();
} catch (MalformedURLException e) {
log.error("提取referer失败:{}",e.toString());
}
}
}
return "";
}
/**
* 生成设备唯一标识
*
* @param jsonObject
* @return
*/
public static String getDeviceId(JSONObject jsonObject){
Map<String,String> map= new TreeMap<>();
try{
map.put("ip",jsonObject.getString("ip"));
map.put("event",jsonObject.getString("event"));
map.put("bizId",jsonObject.getString("bizId"));
map.put("userAgent",jsonObject.getJSONObject("data").getString("userAgent"));
return DeviceUtil.geneWebUniqueDeviceId(map);
}catch (Exception e){
log.error("生产唯一deviceId异常:{}", jsonObject);
return null;
}
}
}
[*]KafkaUtil
@Slf4j
public class KafkaUtil {
/**
* kafka 的 broker 地址
*/
private static String KAFKA_SERVER = null;
static {
Properties properties = new Properties();
InputStream in = KafkaUtil.class.getClassLoader().getResourceAsStream("application.properties");
try {
properties.load(in);
} catch (IOException e) {
e.printStackTrace();
log.error("加载Kafka配置文件失败:{}",e.getMessage());
}
//获取配置文件中的value
KAFKA_SERVER = properties.getProperty("kafka.servers");
}
/**
* 获取flink的kafka消费者
* @param topic
* @param groupId
* @return
*/
public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic,String groupId){
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupId);
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);
return new FlinkKafkaConsumer<String>(topic,new SimpleStringSchema(),properties);
}
/**
* 获取flink的kafka生产者
* @param topic
* @return
*/
public static FlinkKafkaProducer<String> getKafkaProducer(String topic){
return new FlinkKafkaProducer<String>(KAFKA_SERVER,topic,new SimpleStringSchema());
}
}
[*]TimeUtil
/**
 * Date/time formatting and parsing helpers. All conversions use the JVM's default time zone,
 * captured once when the class is loaded.
 */
public class TimeUtil {

    /** Default date pattern (date only, no time-of-day). */
    private static final String DEFAULT_PATTERN = "yyyy-MM-dd";

    /** Formatter for {@link #DEFAULT_PATTERN}. */
    private static final DateTimeFormatter DEFAULT_DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern(DEFAULT_PATTERN);

    /** Time zone used for every conversion in this class. */
    private static final ZoneId DEFAULT_ZONE_ID = ZoneId.systemDefault();

    /**
     * Formats a {@link LocalDateTime} with the given pattern.
     *
     * @param localDateTime the value to format
     * @param pattern       a {@link DateTimeFormatter} pattern
     * @return the formatted text
     */
    public static String format(LocalDateTime localDateTime, String pattern) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
        return formatter.format(localDateTime.atZone(DEFAULT_ZONE_ID));
    }

    /**
     * Formats a {@link Date} with the given pattern.
     *
     * @param time    the value to format
     * @param pattern a {@link DateTimeFormatter} pattern
     * @return the formatted text
     */
    public static String format(Date time, String pattern) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
        return formatter.format(time.toInstant().atZone(DEFAULT_ZONE_ID));
    }

    /**
     * Formats a {@link Date} with the default pattern {@code yyyy-MM-dd}.
     *
     * @param time the value to format
     * @return the formatted date
     */
    public static String format(Date time) {
        return DEFAULT_DATE_TIME_FORMATTER.format(time.toInstant().atZone(DEFAULT_ZONE_ID));
    }

    /**
     * Formats an epoch-millisecond timestamp with the default pattern {@code yyyy-MM-dd}.
     *
     * @param timestamp milliseconds since the epoch
     * @return the formatted date
     */
    public static String format(long timestamp) {
        return DEFAULT_DATE_TIME_FORMATTER.format(new Date(timestamp).toInstant().atZone(DEFAULT_ZONE_ID));
    }

    /**
     * Parses a {@code yyyy-MM-dd} string into a {@link Date} at the start of that day.
     *
     * @param time the date text in the default pattern
     * @return the corresponding {@link Date} at start of day in the default zone
     * @throws java.time.format.DateTimeParseException if the text does not match the pattern
     */
    public static Date strToDate(String time) {
        // The default pattern has no time fields, so LocalDateTime.parse can never succeed
        // with it; parse as a date and take the start of the day instead.
        // (Fully-qualified to avoid requiring a new import in the file.)
        java.time.LocalDate localDate = java.time.LocalDate.parse(time, DEFAULT_DATE_TIME_FORMATTER);
        return Date.from(localDate.atStartOfDay(DEFAULT_ZONE_ID).toInstant());
    }
}
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页:
[1]