Java分布式流处理惩罚,flink+kafka实现电商网站个性化商品推荐体系 ...

打印 上一主题 下一主题

主题 844|帖子 844|积分 2532

在当代电商情况中,用户每天都会浏览大量商品页面,而这些举动数据中蕴藏着丰富的信息。通过分析用户的浏览汗青、购买记载以及对特定商品的兴趣程度,我们可以为用户提供更加个性化的商品推荐,从而提升用户体验和转化率。为了实现实时的个性化推荐,我们需要构建一个可以或许快速处理惩罚并响应用户举动变化的数据处理惩罚管道。
戳底部名片,一起变现

技术栈选择



  • Apache Kafka:作为消息队列服务,Kafka 可以或许高效地网络来自前端应用的日志变乱(如页面点击、商品查看等),并且具备高吞吐量和低耽误的特点,非常得当用来传输实时数据流。
  • Apache Flink:Flink 是一款强大的流处理惩罚框架,它提供了丰富的 API 和内置算法库,可以轻松地对流入的数据举行清洗、转换、聚合等操作,并且支持变乱时间语义,确保即使在网络故障或节点重启的情况下也能保持同等性和准确性。
  • Elasticsearch (ES):作为一个分布式搜索和分析引擎,ES 不仅能存储大量的结构化和非结构化数据,还允许我们执行复杂的查询和排序任务,是构建推荐体系的理想选择之一。别的,它的 RESTful API 方便与其他服务集成。
设计实现思路


  • 数据收罗:从电商网站捕获用户的交互举动(如点击、添加到购物车、购买等),并通过 Kafka 发布这些变乱。
  • 数据预处理惩罚:利用 Flink 消费 Kafka 中的数据,举行初步的清洗和格式化,例如去除无效记载、解析 JSON 字符串等。
  • 特征工程:基于用户的举动模式提取有用的特征向量,比如最近浏览的商品类别、偏好品牌、价格区间等。
  • 模子训练:利用机器学习算法(如协同过滤、内容推荐)构建推荐模子,并定期更新以适应新的趋势。
  • 效果输出:将天生的推荐列表存入 Elasticsearch 中,以便快速检索和展示给用户。
  • 前端展示:当用户访问商品详情页或其他相干页面时,调用 ES 的 API 获取个性化推荐效果,并动态加载到页面上。
实现步骤及示例代码

1. 数据收罗

首先,我们需要在电商网站上部署 JavaScript 或服务器端代码来跟踪用户的每一次互动,并将这些变乱发送到 Kafka 主题 user_events 中。这里给出一个简单的 Python 示例,演示怎样创建一个 Kafka 生产者来模拟用户举动日志的发送。
  1. from kafka import KafkaProducer
  2. import json
  3. import time
  4. import random
  5. # 初始化 Kafka 生产者
  6. producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8'))
  7. topic_name = 'user_events'
  8. def simulate_user_behavior():
  9.     user_ids = ["user_001", "user_002", "user_003"]
  10.     product_ids = ["prod_001", "prod_002", "prod_003"]
  11.     actions = ["view", "add_to_cart", "purchase"]
  12.     while True:
  13.         # 模拟生成用户行为
  14.         user_id = random.choice(user_ids)
  15.         product_id = random.choice(product_ids)
  16.         action = random.choice(actions)
  17.         event = {
  18.             "event_type": action,
  19.             "user_id": user_id,
  20.             "product_id": product_id,
  21.             "timestamp": int(time.time() * 1000)
  22.         }
  23.         # 将事件发送到 Kafka 主题
  24.         producer.send(topic_name, value=event)
  25.         print(f"Sent event: {event}")
  26.         # 每隔一秒发送一次事件
  27.         time.sleep(1)
  28. if __name__ == "__main__":
  29.     try:
  30.         simulate_user_behavior()
  31.     except KeyboardInterrupt:
  32.         producer.close()
复制代码
2. 数据预处理惩罚

接下来,我们将编写一个 Flink 应用步伐来消费 Kafka 中的数据,并对其举行必要的预处理惩罚。这包括解析原始变乱、过滤掉不完整的记载,以及为后续的特征提取做准备。
  1. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  2. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  3. import org.apache.flink.streaming.api.datastream.DataStream;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
  6. import com.fasterxml.jackson.databind.JsonNode;
  7. import com.fasterxml.jackson.databind.ObjectMapper;
  8. public class UserBehaviorPreprocessing {
  9.     public static void main(String[] args) throws Exception {
  10.         // 设置执行环境
  11.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  12.         // 配置 Kafka 消费者参数
  13.         Properties properties = new Properties();
  14.         properties.setProperty("bootstrap.servers", "localhost:9092");
  15.         properties.setProperty("group.id", "flink-user-behavior-group");
  16.         // 创建 Kafka 消费者
  17.         FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(
  18.             "user_events",
  19.             new SimpleStringSchema(),
  20.             properties
  21.         );
  22.         // 解析 Kafka 消息为 UserEvent 对象
  23.         DataStream<UserEvent> events = env.addSource(kafkaSource)
  24.             .map(message -> parseJsonToUserEvent(message));
  25.         // 输出预处理后的事件到控制台(实际应用中应继续处理)
  26.         events.print();
  27.         // 执行程序
  28.         env.execute("User Behavior Preprocessing");
  29.     }
  30.     // 辅助方法:将 JSON 字符串解析成 UserEvent 对象
  31.     private static UserEvent parseJsonToUserEvent(String jsonStr) {
  32.         ObjectMapper mapper = new ObjectMapper();
  33.         try {
  34.             JsonNode rootNode = mapper.readTree(jsonStr);
  35.             UserEvent event = new UserEvent();
  36.             event.eventType = rootNode.path("event_type").asText();
  37.             event.userId = rootNode.path("user_id").asText();
  38.             event.productId = rootNode.path("product_id").asText();
  39.             event.timestamp = rootNode.path("timestamp").asLong();
  40.             return event;
  41.         } catch (Exception e) {
  42.             throw new RuntimeException("Failed to parse JSON", e);
  43.         }
  44.     }
  45.     // 辅助类:表示单个用户行为事件
  46.     public static class UserEvent {
  47.         public String eventType;
  48.         public String userId;
  49.         public String productId;
  50.         public long timestamp;
  51.     }
  52. }
复制代码
3. 特征工程

在此阶段,我们会根据用户的举动模式计算出一系列特征向量,用于后续的推荐模子训练。例如,我们可以统计每个用户在过去一周内最常浏览的商品类别,大概他们倾向于购买哪个价位段的产品。这部门逻辑可以在 Flink 中通过窗口操作来实现。
  1. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  2. import org.apache.flink.api.common.functions.AggregateFunction;
  3. import org.apache.flink.api.java.tuple.Tuple2;
  4. import org.apache.flink.streaming.api.datastream.DataStream;
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  6. import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
  7. import org.apache.flink.streaming.api.windowing.time.Time;
  8. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  9. import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
  10. import org.apache.flink.util.Collector;
  11. public class FeatureEngineering {
  12.     public static void main(String[] args) throws Exception {
  13.         // 设置执行环境
  14.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  15.         // 假设我们已经有了一个预处理过的用户行为流
  16.         DataStream<UserEvent> events = ...; // 省略具体来源
  17.         // 计算每个用户在过去7天内的浏览商品类别分布
  18.         DataStream<Tuple2<String, Tuple2<String, Integer>>> categoryDistribution =
  19.             events
  20.                 .assignTimestampsAndWatermarks(WatermarkStrategy.<UserEvent>forMonotonousTimestamps()
  21.                     .withTimestampAssigner((event, timestamp) -> event.timestamp))
  22.                 .keyBy(event -> event.userId) // 按用户 ID 分组
  23.                 .window(SlidingEventTimeWindows.of(Time.days(7), Time.hours(1))) // 每小时滑动窗口,覆盖过去7天的数据
  24.                 .aggregate(new CategoryAggregator(), new CategoryFormatter());
  25.         // 输出特征向量到控制台(实际应用中应保存到 ES 或其他存储)
  26.         categoryDistribution.print();
  27.         // 执行程序
  28.         env.execute("Feature Engineering for Recommendations");
  29.     }
  30.     // 自定义聚合函数,用于计算每个用户浏览的商品类别分布
  31.     public static class CategoryAggregator implements AggregateFunction<UserEvent, CategoryAccumulator, Tuple2<String, Integer>> {
  32.         @Override
  33.         public CategoryAccumulator createAccumulator() {
  34.             return new CategoryAccumulator();
  35.         }
  36.         @Override
  37.         public CategoryAccumulator add(UserEvent value, CategoryAccumulator accumulator) {
  38.             if ("view".equals(value.eventType)) {
  39.                 // 这里假设产品 ID 包含了分类信息,例如 prod_001_gadgets 表示这是一个小工具类别的商品
  40.                 String categoryId = value.productId.split("_")[2];
  41.                 accumulator.categoryCounts.put(categoryId, accumulator.categoryCounts.getOrDefault(categoryId, 0) + 1);
  42.             }
  43.             return accumulator;
  44.         }
  45.         @Override
  46.         public Tuple2<String, Integer> getResult(CategoryAccumulator accumulator) {
  47.             // 返回浏览次数最多的商品类别及其出现次数
  48.             Entry<String, Integer> maxEntry = null;
  49.             for (Entry<String, Integer> entry : accumulator.categoryCounts.entrySet()) {
  50.                 if (maxEntry == null || entry.getValue().compareTo(maxEntry.getValue()) > 0) {
  51.                     maxEntry = entry;
  52.                 }
  53.             }
  54.             return maxEntry != null ? Tuple2.of(maxEntry.getKey(), maxEntry.getValue()) : null;
  55.         }
  56.         @Override
  57.         public CategoryAccumulator merge(CategoryAccumulator a, CategoryAccumulator b) {
  58.             for (Entry<String, Integer> entry : b.categoryCounts.entrySet()) {
  59.                 a.categoryCounts.put(entry.getKey(), a.categoryCounts.getOrDefault(entry.getKey(), 0) + entry.getValue());
  60.             }
  61.             return a;
  62.         }
  63.     }
  64.     // 格式化输出结果
  65.     public static class CategoryFormatter implements WindowFunction<Tuple2<String, Integer>, Tuple2<String, Tuple2<String, Integer>>, String, TimeWindow> {
  66.         @Override
  67.         public void apply(
  68.             String key,
  69.             TimeWindow window,
  70.             Iterable<Tuple2<String, Integer>> input,
  71.             Collector<Tuple2<String, Tuple2<String, Integer>>> out
  72.         ) {
  73.             for (Tuple2<String, Integer> result : input) {
  74.                 if (result != null) {
  75.                     out.collect(Tuple2.of(key, result));
  76.                 }
  77.             }
  78.         }
  79.     }
  80.     // 辅助类:用于保存中间状态
  81.     public static class CategoryAccumulator {
  82.         public Map<String, Integer> categoryCounts = new HashMap<>();
  83.     }
  84. }
复制代码
4. 模子训练

对于推荐模子的训练部门,通常会涉及到较为复杂的机器学习算法,如矩阵分解、深度神经网络等。由于这部门内容相对独立于 Flink 流程,因此可以思量利用 Spark MLlib 或 TensorFlow 等工具来举行离线训练,并将训练好的模子导出为 PMML 文件或其他格式供在线预测利用。
5. 效果输出

末了一步是将天生的推荐列表存入 Elasticsearch 中,以便快速检索和展示给用户。这里我们利用 Java 客户端 API 来插入文档。
  1. import org.elasticsearch.action.index.IndexRequest;
  2. import org.elasticsearch.client.RequestOptions;
  3. import org.elasticsearch.client.RestHighLevelClient;
  4. import org.elasticsearch.common.xcontent.XContentType;
  5. import java.io.IOException;
  6. public class SaveRecommendationsToES {
  7.     private static RestHighLevelClient client = new RestHighLevelClient(
  8.         RestClient.builder(new HttpHost("localhost", 9200, "http")));
  9.     public static void saveRecommendation(String userId, List<String> recommendedProducts) throws IOException {
  10.         IndexRequest request = new IndexRequest("recommendations")
  11.             .id(userId)
  12.             .source(XContentType.JSON, "products", recommendedProducts);
  13.         client.index(request, RequestOptions.DEFAULT);
  14.     }
  15.     public static void closeClient() throws IOException {
  16.         client.close();
  17.     }
  18. }
复制代码
6. 前端展示

当用户访问商品详情页或其他相干页面时,可以通过 AJAX 请求调用 ES 的 API 获取个性化推荐效果,并动态加载到页面上。以下是 JavaScript 示例代码片段:
  1. function loadPersonalizedRecommendations(userId) {
  2.     fetch(`/api/recommendations/${userId}`)
  3.         .then(response => response.json())
  4.         .then(data => {
  5.             const recommendationsContainer = document.getElementById('recommendations');
  6.             data.products.forEach(productId => {
  7.                 // 动态创建 HTML 元素并添加到容器中
  8.                 const productElement = document.createElement('div');
  9.                 productElement.className = 'recommended-product';
  10.                 productElement.innerHTML = `<a href="/product/${productId}">${productId}</a>`;
  11.                 recommendationsContainer.appendChild(productElement);
  12.             });
  13.         })
  14.         .catch(error => console.error('Error loading personalized recommendations:', error));
  15. }
复制代码
戳底部名片,一起变现


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

忿忿的泥巴坨

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表