SpringBoot整合Flink,实现一个用户个性化推荐系统

打印 上一主题 下一主题

主题 979|帖子 979|积分 2937

通过SpringBoot联合Flink的高性能流处理本事,开发者可以构建出高效、可靠的及时数据处理办理方案。这些应用不仅可以或许提高企业的运营服从,还能增强竞争力。

  应用场景

  1. 推荐系统:

  

  • 根据用户的及时行为生成个性化推荐。

  • 更新用户画像,以适应不断变化的兴趣和偏好。

  2. 变乱驱动架构:

  

  • 在微服务架构中处理跨服务的消息通报,确保系统的低耽误和高吞吐量。

  • 处理来自不同泉源的变乱,并根据业务规则执行相应的操作。

  3. 欺诈检测:

  

  • 及时监控金融交易或网络活动,辨认异常模式并触发警报。

  • 提供快速相应机制,减少欺诈损失。

  4. 流式数据分析:

  

  • 及时分析传感器数据,如物联网设备的数据流。

  • 监控和分析用户行为数据,提供即时反馈。

  5. 日志处理:

  

  • 及时收集、解析和聚合日志数据,帮助进行性能调优和故障清除。

  • 提供及时陈诉和仪表板,便于监控系统状态。

  6. 及时报表:

  

  • 生成及时销售陈诉、市场趋势分析等。

  • 支持决议订定者获得最新的业务洞察力。

  7. 供应链管理:

  

  • 及时跟踪库存水平、订单状态和物流信息。

  • 主动调整生产计划以满足需求变化。

  8. 交际媒体分析:

  

  • 及时监测舆情,分析公众对特定话题的见解。

  • 提供及时的情感分析,帮助企业相识品牌形象。

  9. 网络安全:

  

  • 及时监控网络流量,检测埋伏的安全威胁。

  • 快速相应攻击,保护企业免受损害。

  代码演示

  
我们现在用简朴代码例子演示怎样根据用户的及时行为生成个性化推荐。以下代码将展示怎样从Kafka读取用户行为数据,并使用Flink进行及时处理,最后生成个性化的推荐效果并将其写回到另一个Kafka主题中。

  1. 依赖项

  
在pom.xml中添加须要的依赖项:

  1. <dependencies>
  2.     <!-- Spring Boot Starter Web -->
  3.     <dependency>
  4.         <groupId>org.springframework.boot</groupId>
  5.         <artifactId>spring-boot-starter-web</artifactId>
  6.     </dependency>
  7.     <!-- Flink dependencies -->
  8.     <dependency>
  9.         <groupId>org.apache.flink</groupId>
  10.         <artifactId>flink-java</artifactId>
  11.         <version>1.14.6</version>
  12.     </dependency>
  13.     <dependency>
  14.         <groupId>org.apache.flink</groupId>
  15.         <artifactId>flink-streaming-java_2.12</artifactId>
  16.         <version>1.14.6</version>
  17.     </dependency>
  18.     <dependency>
  19.         <groupId>org.apache.flink</groupId>
  20.         <artifactId>flink-clients_2.12</artifactId>
  21.         <version>1.14.6</version>
  22.     </dependency>
  23.     <dependency>
  24.         <groupId>org.apache.flink</groupId>
  25.         <artifactId>flink-connector-kafka_2.12</artifactId>
  26.         <version>1.14.6</version>
  27.     </dependency>
  28.     <!-- JSON processing with Jackson -->
  29.     <dependency>
  30.         <groupId>com.fasterxml.jackson.core</groupId>
  31.         <artifactId>jackson-databind</artifactId>
  32.     </dependency>
  33. </dependencies>
复制代码
2. 界说数据模型

  
界说用户行为数据和推荐效果的数据模型。

  1. package com.example.demo.model;
  2. publicclass UserBehavior {
  3.     private String userId;
  4.     private String productId;
  5.     private String action; // e.g., "view", "click", "purchase"
  6.     privatelong timestamp;
  7.     // Getters and setters
  8.     public String getUserId() {
  9.         return userId;
  10.     }
  11.     public void setUserId(String userId) {
  12.         this.userId = userId;
  13.     }
  14.     public String getProductId() {
  15.         return productId;
  16.     }
  17.     public void setProductId(String productId) {
  18.         this.productId = productId;
  19.     }
  20.     public String getAction() {
  21.         return action;
  22.     }
  23.     public void setAction(String action) {
  24.         this.action = action;
  25.     }
  26.     public long getTimestamp() {
  27.         return timestamp;
  28.     }
  29.     public void setTimestamp(long timestamp) {
  30.         this.timestamp = timestamp;
  31.     }
  32. }
复制代码
  1. package com.example.demo.model;
  2. import java.util.List;
  3. publicclass RecommendationResult {
  4.     private String userId;
  5.     private List<String> recommendedProducts;
  6.     // Getters and setters
  7.     public String getUserId() {
  8.         return userId;
  9.     }
  10.     public void setUserId(String userId) {
  11.         this.userId = userId;
  12.     }
  13.     public List<String> getRecommendedProducts() {
  14.         return recommendedProducts;
  15.     }
  16.     public void setRecommendedProducts(List<String> recommendedProducts) {
  17.         this.recommendedProducts = recommendedProducts;
  18.     }
  19. }
复制代码
3. 编写Flink Job

  
创建一个简朴的Flink作业,它从Kafka读取用户行为数据,计算每个用户的热门产品,并将推荐效果写入另一个Kafka主题。

  1. package com.example.demo;
  2. import com.example.demo.model.RecommendationResult;
  3. import com.example.demo.model.UserBehavior;
  4. import org.apache.flink.api.common.functions.MapFunction;
  5. import org.apache.flink.api.java.tuple.Tuple2;
  6. import org.apache.flink.streaming.api.datastream.DataStream;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
  9. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
  10. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  11. import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
  12. import java.util.ArrayList;
  13. import java.util.HashMap;
  14. import java.util.List;
  15. import java.util.Map;
  16. import java.util.Properties;
  17. publicclass RecommendationJob {
  18.     public static void main(String[] args) throws Exception {
  19.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  20.         Properties properties = new Properties();
  21.         properties.setProperty("bootstrap.servers", "localhost:9092");
  22.         properties.setProperty("group.id", "test");
  23.         FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("user-behavior-topic", new SimpleStringSchema(), properties);
  24.         kafkaConsumer.setStartFromEarliest();
  25.         ObjectMapper objectMapper = new ObjectMapper();
  26.         DataStream<UserBehavior> userBehaviors = env.addSource(kafkaConsumer)
  27.                 .map((MapFunction<String, UserBehavior>) value -> objectMapper.readValue(value, UserBehavior.class));
  28.         DataStream<Tuple2<String, Map<String, Integer>>> productCountsPerUser = userBehaviors
  29.                 .filter(behavior -> behavior.getAction().equals("purchase"))
  30.                 .keyBy(UserBehavior::getUserId)
  31.                 .flatMap((FlatMapFunction<UserBehavior, Tuple2<String, Map<String, Integer>>>) (value, out) -> {
  32.                     Map<String, Integer> productCounts = new HashMap<>();
  33.                     if (productCounts.containsKey(value.getProductId())) {
  34.                         productCounts.put(value.getProductId(), productCounts.get(value.getProductId()) + 1);
  35.                     } else {
  36.                         productCounts.put(value.getProductId(), 1);
  37.                     }
  38.                     out.collect(Tuple2.of(value.getUserId(), productCounts));
  39.                 })
  40.                 .keyBy(Tuple2::f0)
  41.                 .reduce((t1, t2) -> {
  42.                     t1.f1.forEach((productId, count) -> t2.f1.merge(productId, count, Integer::sum));
  43.                     return t1;
  44.                 });
  45.         DataStream<RecommendationResult> recommendations = productCountsPerUser
  46.                 .map((MapFunction<Tuple2<String, Map<String, Integer>>, RecommendationResult>) value -> {
  47.                     RecommendationResult result = new RecommendationResult();
  48.                     result.setUserId(value.f0);
  49.                     List<String> recommendedProducts = new ArrayList<>(value.f1.keySet());
  50.                     result.setRecommendedProducts(recommendedProducts.subList(0, Math.min(5, recommendedProducts.size())));
  51.                     return result;
  52.                 });
  53.         FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("recommendations-topic",
  54.                 (RecommendationResult recommendation) -> objectMapper.writeValueAsString(recommendation),
  55.                 properties);
  56.         recommendations.map(ObjectMapper::writeValueAsString).addSink(kafkaProducer);
  57.         env.execute("Recommendation Job");
  58.     }
  59. }
复制代码
4. 创建Spring Boot Application

  
创建一个简朴的Spring Boot应用步伐来启动Flink作业。

  1. package com.example.demo;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. import org.springframework.context.annotation.Bean;
  5. @SpringBootApplication
  6. publicclass DemoApplication {
  7.     public static void main(String[] args) {
  8.         SpringApplication.run(DemoApplication.class, args);
  9.     }
  10.     @Bean
  11.     public Runnable flinkRunner() {
  12.         return () -> {
  13.             try {
  14.                 RecommendationJob.main(new String[]{});
  15.             } catch (Exception e) {
  16.                 thrownew RuntimeException(e);
  17.             }
  18.         };
  19.     }
  20. }
复制代码
测试效果

     “   
请确保你的环境中已经设置好了Kafka和其他须要的服务,以便代码可以或许正常运行。

   
我本地的Kafka集群正在运行,并且有两个主题:user-behavior-topic 和 recommendations-topic。

  

  • 向user-behavior-topic发送消息:
    1. {"userId": "user1", "productId": "productA", "action": "purchase", "timestamp": 1672531200000}
    2. {"userId": "user1", "productId": "productB", "action": "purchase", "timestamp": 1672531200000}
    3. {"userId": "user1", "productId": "productC", "action": "purchase", "timestamp": 1672531200000}
    4. {"userId": "user2", "productId": "productB", "action": "purchase", "timestamp": 1672531200000}
    5. {"userId": "user2", "productId": "productD", "action": "purchase", "timestamp": 1672531200000}
    复制代码
  • 检查recommendations-topic中的消息:
    1. {"userId":"user1","recommendedProducts":["productA","productB","productC"]}
    2. {"userId":"user2","recommendedProducts":["productB","productD"]}
    复制代码
  关注我,送Java福利

  1. /**
  2.  * 这段代码只有Java开发者才能看得懂!
  3.  * 关注我微信公众号之后,
  4.  * 发送:"666",
  5.  * 即可获得一本由Java大神一手面试经验诚意出品
  6.  * 《Java开发者面试百宝书》Pdf电子书
  7.  * 福利截止日期为2025年01月22日止
  8.  * 手快有手慢没!!!
  9. */
  10. System.out.println("请关注我的微信公众号:");
  11. System.out.println("Java知识日历");
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

天空闲话

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表