《黑马头条》 内容安全 主动审核 feign 延迟任务精准发布 kafka ...

十念  金牌会员 | 2024-6-23 16:01:40 | 来自手机 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 886|帖子 886|积分 2658



  目录
  《黑马头条》SpringBoot+SpringCloud+ Nacos等企业级微服务架构项目_黑马头条项目_软工菜鸡的博客-CSDN博客
  04自媒体文章-主动审核
  1)自媒体文章主动审核流程
  2)内容安全第三方接口
  2.1)概述
  2.2)预备工作
  2.3)文本内容审核接口
  2.4)图片审核接口
  2.5)项目集成
  3)app端文章生存接口
  3.1)表结构说明
  3.2)分布式id
  分布式id-技能选型
  3.3)思路分析
  3.4)feign接口
  4)自媒体文章主动审核功能实现
  4.1)表结构说明
  4.2)实现
  4.3)单元测试
  4.4)feign长途接口调用方式
  4.5)服务降级处理
  5)发布文章提交审核集成
  5.1)同步调用与异步调用
  5.2)Springboot集成异步线程调用
  6)文章审核功能-综合测试
  6.1)服务启动列表
  6.2)测试情况列表
  7)新需求-自管理敏感词
  7.1)需求分析
  7.2)敏感词-过滤
  7.3)DFA实现原理
  7.4)自管理敏感词集成到文章审核中
  测试了一下 源码不能检测 标题的敏感词汇;加了个这: wmNews.getTitle()+
  8)新需求-图片识别文字审核敏感词
  8.1)需求分析
  8.2)图片文字识别
  8 .3)Tess4j案例
  8.4)管理敏感词和图片文字识别集成到文章审核
  9)文章详情-静态文件生成
  9.1)思路分析
  9.2)实现步骤
  05延迟任务精准发布文章
  1)文章定时发布
  2)延迟任务概述
  2.1)什么是延迟任务
  2.2)技能对比
  2.2.1)DelayQueue
  2.2.2)RabbitMQ实现延迟任务
  2.2.3)redis实现
  3)redis实现延迟任务
  锐评:完全为了学list zset而编出来的场景,现实工作中延迟队列要计划成这样只能说太蠢了
  4)延迟任务服务实现
  4.1)搭建heima-leadnews-schedule模块
  4.2)数据库预备
  乐观锁/悲观锁
  4.3)安装redis
  4.4)项目集成redis
  4.5)添加任务
  4.6)取消任务
  4.7)消耗任务
  4.8)未来数据定时刷新
  4.8.1)reids key值匹配
  4.8.2)reids管道
  4.8.3)未来数据定时刷新-功能完成
  4.9)分布式锁办理集群下的方法抢占执行
  4.9.1)问题描述
  4.9.2)分布式锁
  4.9.3)redis分布式锁
  4.9.4)在工具类CacheService中添加方法
  4.10)数据库同步到redis
  5)延迟队列办理精准时间发布文章
  5.1)延迟队列服务提供对外接口
  5.2)发布文章集成添加延迟队列接口
  序列化工具对比
  5.3)消耗任务举行审核文章
  06kafka及异步关照文章上下架
  1)自媒体文章上下架
  2)kafka概述
  消息中心件对比-选择建议
  kafka介绍-名词表明
  3)kafka安装配置
  4)kafka入门
  分区机制—topic剖析
  5)kafka高可用计划
  5.1)集群
  5.2)备份机制(Replication)
  6)kafka生产者详解
  6.1)发送范例
  6.2)参数详解
  ack确认机制
  retries 重试次数
  消息压缩
  7)kafka消耗者详解
  7.1)消耗者组
  7.2)消息有序性
  7.3)提交和偏移量
  1.提交当前偏移量(同步提交)
  2.异步提交
  3.同步和异步组合提交
  8)springboot集成kafka
  8.1)入门
  8.2)通报消息为对象
  9)自媒体文章上下架功能完成
  9.1)需求分析
  9.2)流程说明
  9.3)接口定义
  9.4)自媒体文章上下架-功能实现
  9.5)消息关照article端文章上下架
  
  
  


  04自媒体文章-主动审核

              
           1)自媒体文章主动审核流程

              
              1 自媒体端发布文章后,开始审核文章   
2 审核的重要是审核文章的   内容(文本内容和图片)   
3 借助   第三方提供的接口审核文本   
4 借助第三方提供的接口审核图片,由于图片存储到minIO中,需要先下载才气审核   
5 假如审核失败,则需要修改自媒体文章的状态,status:2 审核失败 status:3 转到人工审核   
6 假如审核成功,则需要在文章微服务中创建app端需要的文章    2)内容安全第三方接口

  2.1)概述

  内容安全是识别服务,支持对图片、视频、文本、语音等对象多样化场景检测,有效降低内容违规风险
  现在很多平台都支持内容检测,如阿里云、腾讯云、百度AI、网易云等国内大型互联网公司都对外提供了API。
  按照性能和收费来看,黑马头条项目使用的就是阿里云的内容安全接口,使用到了图片和文本的审核
  阿里云收费尺度:https://www.aliyun.com/price/product/?spm=a2c4g.11186623.2.10.4146401eg5oeu8#/lvwang/detail
  2.2)预备工作

  您在使用内容检测API之前,需要先注册阿里云账号,添加Access Key并签约云盾内容安全
  操纵步骤
  

  • 前去阿里云官网注册账号。假如已有注册账号,请跳过此步骤。
  进入阿里云首页后,假如没有阿里云的账户需要先举行注册,才可以举行登录。由于注册较为简单,课程和课本不在举行体现(注册可以使用多种方式,如淘宝账号、支付宝账号、微博账号等...)。
  需要实名认证和活体认证。
  

  • 打开云盾内容安全产品试用页面,单击立即开通,正式开通服务。
              
           内容安全控制台
              
           

  • 在AccessKey管理页面管理您的AccessKeyID和AccessKeySecret。
              
           管理自己的AccessKey,可以新建和删除AccessKey
              
           查看自己的AccessKey,
  AccessKey默认是隐藏的,第一次申请的时间可以生存AccessKey,点击表现,通过验证手机号后也可以查看
              
           2.3)文本内容审核接口

  文本垃圾内容检测:怎样调用文本检测接口举行文本内容审核_内容安全-阿里云资助中心
              
           文本垃圾内容Java SDK: 怎样使用JavaSDK文本反垃圾接口_内容安全-阿里云资助中心
  2.4)图片审核接口

  图片垃圾内容检测:调用图片同步检测接口/green/image/scan审核图片内容_内容安全-阿里云资助中心
              
           图片垃圾内容Java SDK: 怎样使用JavaSDK接口检测图片是否包含风险内容_内容安全-阿里云资助中心
  2.5)项目集成

  ①:拷贝资料文件夹中的类到common模块下面,并添加到主动配置
  包括了GreenImageScan和GreenTextScan及对应的工具类
              
           添加到主动配置中
              
           ②: accessKeyId和secret(需自己申请)
  在heima-leadnews-wemedia中的nacos配置中心添加以下配置:
  1. aliyun:
  2. accessKeyId: ...
  3. secret: ...
  4. #aliyun.scenes=porn,terrorism,ad,qrcode,live,logo
  5. scenes: terrorism
复制代码
③:在自媒体微服务中测试类中注入审核文本和图片的bean举行测试
  1. package com.heima.wemedia;
  2. import java.util.Arrays;
  3. import java.util.Map;
  4. @SpringBootTest(classes = WemediaApplication.class)
  5. @RunWith(SpringRunner.class)
  6. public class AliyunTest {
  7.     @Autowired
  8.     private GreenTextScan greenTextScan;
  9.     @Autowired
  10.     private GreenImageScan greenImageScan;
  11.     @Autowired
  12.     private FileStorageService fileStorageService;
  13.     @Test
  14.     public void testScanText() throws Exception {
  15.         Map map = greenTextScan.greeTextScan("我是一个好人,冰毒");
  16.         System.out.println(map);
  17.     }
  18.     @Test
  19.     public void testScanImage() throws Exception {
  20.         byte[] bytes = fileStorageService.downLoadFile("http://192.168.200.130:9000/leadnews/2021/04/26/ef3cbe458db249f7bd6fb4339e593e55.jpg");
  21.         Map map = greenImageScan.imageScan(Arrays.asList(bytes));
  22.         System.out.println(map);
  23.     }
  24. }
复制代码
我用的是 阿里云 云安全 加强版1小时,没审核出效果为null;估计是阿里 改接口了;
              
           图片审核页报错
  1. java.lang.RuntimeException: upload file fail.
  2.     at com.heima.common.aliyun.util.ClientUploader.uploadBytes(ClientUploader.java:129)
  3.     at com.heima.common.aliyun.GreenImageScan.imageScan(GreenImageScan.java:71)
  4.     at com.heima.wemedia.test.AliyunTest.testScanImage(AliyunTest.java:51)
  5.     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  6.     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  7.     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  8.     at java.lang.reflect.Method.invoke(Method.java:498)
  9.     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
  10.     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
  11.     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
  12.     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
  13.     at org.springframework.test.context.junit4.statements.RunBeforeTestExecutionCallbacks.evaluate(RunBeforeTestExecutionCallbacks.java:74)
  14.     at org.springframework.test.context.junit4.statements.RunAfterTestExecutionC
复制代码
3)app端文章生存接口

  3.1)表结构说明

              
           3.2)分布式id

  随着业务的增长,文章表可能要占用很大的物理存储空间,为了办理该问题,后期使用数据库分片技能。将一个数据库举行拆分,通过数据库中心件连接。假如数据库中该表选用ID自增策略,则可能产生重复的ID,此时应该使用分布式ID生成策略来生成ID。
              
           分布式id-技能选型

              
           snowflake是Twitter开源的分布式ID生成算法,结果是一个long型的ID。
  其焦点头脑是:使用41bit作为毫秒数,10bit作为呆板的ID(5个bit是数据中心,5个bit的呆板ID)(最多32个机房*32台呆板(也可以自己设)),12bit作为毫秒内的流水号(意味着每个节点在每毫秒可以产生 4096 个 ID),末了还有一个符号位,永世是0(1为负数)
              
           文章端相关的表使用雪花算法生成id,包括ap_article、 ap_article_config、 ap_article_content
  mybatis-plus已经集成了雪花算法,完成以下两步即可在项目中集成雪花算法
  第一:在实体类中的id上加入如下配置,指定范例为id_worker
  1. @TableId(value = "id",type = IdType.ID_WORKER)
  2. private Long id;
复制代码
第二:在application.yml文件中配置数据中心id和呆板id
  1. mybatis-plus:
  2.   mapper-locations: classpath*:mapper/*.xml
  3.   # 设置别名包扫描路径,通过该属性可以给包中的类注册别名
  4.   type-aliases-package: com.heima.model.article.pojos
  5.   global-config:
  6.     datacenter-id: 1
  7.     workerId: 1
复制代码
datacenter-id:数据中心id(取值范围:0-31) ;workerId:呆板id(取值范围:0-31)
  3.3)思路分析

  在文章审核成功以后需要在app的article库中新增文章数据
  1.生存文章信息 ap_article
  2.生存文章配置信息 ap_article_config
  3.生存文章内容 ap_article_content
  实现思路:
              
           3.4)feign接口

              
           ArticleDto
  1. package com.heima.model.article.dtos;
  2. import com.heima.model.article.pojos.ApArticle;
  3. import lombok.Data;
  4. @Data
  5. public class ArticleDto  extends ApArticle {
  6.     /**
  7.      * 文章内容
  8.      */
  9.     private String content;
  10. }
复制代码
功能实现:
  ①:在heima-leadnews-feign-api中新增接口
  第一:线导入feign的依靠
  1. <dependency>
  2.     <groupId>org.springframework.cloud</groupId>
  3.     <artifactId>spring-cloud-starter-openfeign</artifactId>
  4. </dependency>
复制代码
第二:定义文章端的接口
  1. package com.heima.apis.article;
  2. import org.springframework.web.bind.annotation.RequestBody;
  3. @FeignClient(value = "leadnews-article")
  4. public interface IArticleClient {
  5.     @PostMapping("/api/v1/article/save")
  6.     public ResponseResult saveArticle(@RequestBody ArticleDto dto) ;
  7. }
复制代码
②:在heima-leadnews-article中实现该方法
  1. package com.heima.article.feign;
  2. import java.io.IOException;
  3. @RestController
  4. public class ArticleClient implements IArticleClient {
  5.     @Autowired
  6.     private ApArticleService apArticleService;
  7.     @Override
  8.     @PostMapping("/api/v1/article/save")
  9.     public ResponseResult saveArticle(@RequestBody ArticleDto dto) {
  10.         return apArticleService.saveArticle(dto);
  11.     }
  12. }
复制代码
③:拷贝mapper
  在资料文件夹中拷贝ApArticleConfigMapper类到mapper文件夹中
  同时,修改ApArticleConfig类,添加如下构造函数
  1. package com.heima.model.article.pojos;
  2. import java.io.Serializable;
  3. /**
  4. * <p>
  5. * APP已发布文章配置表
  6. * </p>
  7. *
  8. * @author itheima
  9. */
  10. @Data
  11. @NoArgsConstructor
  12. @TableName("ap_article_config")
  13. public class ApArticleConfig implements Serializable {
  14.     public ApArticleConfig(Long articleId){
  15.         this.articleId = articleId;
  16.         this.isComment = true;
  17.         this.isForward = true;
  18.         this.isDelete = false;
  19.         this.isDown = false;
  20.     }
  21.     @TableId(value = "id",type = IdType.ID_WORKER)
  22.     private Long id;
  23.     /**
  24.      * 文章id
  25.      */
  26.     @TableField("article_id")
  27.     private Long articleId;
  28.     /**
  29.      * 是否可评论
  30.      * true: 可以评论   1
  31.      * false: 不可评论  0
  32.      */
  33.     @TableField("is_comment")
  34.     private Boolean isComment;
  35.     /**
  36.      * 是否转发
  37.      * true: 可以转发   1
  38.      * false: 不可转发  0
  39.      */
  40.     @TableField("is_forward")
  41.     private Boolean isForward;
  42.     /**
  43.      * 是否下架
  44.      * true: 下架   1
  45.      * false: 没有下架  0
  46.      */
  47.     @TableField("is_down")
  48.     private Boolean isDown;
  49.     /**
  50.      * 是否已删除
  51.      * true: 删除   1
  52.      * false: 没有删除  0
  53.      */
  54.     @TableField("is_delete")
  55.     private Boolean isDelete;
  56. }
复制代码
④:在ApArticleService中新增方法
  1. /**
  2.      * 保存app端相关文章
  3.      * @param dto
  4.      * @return
  5.      */
  6. ResponseResult saveArticle(ArticleDto dto) ;
复制代码
实现类:
  1. @Autowired
  2. private ApArticleConfigMapper apArticleConfigMapper;
  3. @Autowired
  4. private ApArticleContentMapper apArticleContentMapper;
  5. /**
  6.      * 保存app端相关文章
  7.      * @param dto
  8.      * @return
  9.      */
  10. @Override
  11. public ResponseResult saveArticle(ArticleDto dto) {
  12.     //1.检查参数
  13.     if(dto == null){
  14.         return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
  15.     }
  16.     ApArticle apArticle = new ApArticle();
  17.     BeanUtils.copyProperties(dto,apArticle);
  18.     //2.判断是否存在id
  19.     if(dto.getId() == null){
  20.         //2.1 不存在id  保存  文章  文章配置  文章内容
  21.         //保存文章
  22.         save(apArticle);
  23.         //保存配置
  24.         ApArticleConfig apArticleConfig = new ApArticleConfig(apArticle.getId());
  25.         apArticleConfigMapper.insert(apArticleConfig);
  26.         //保存 文章内容
  27.         ApArticleContent apArticleContent = new ApArticleContent();
  28.         apArticleContent.setArticleId(apArticle.getId());
  29.         apArticleContent.setContent(dto.getContent());
  30.         apArticleContentMapper.insert(apArticleContent);
  31.     }else {
  32.         //2.2 存在id   修改  文章  文章内容
  33.         //修改  文章
  34.         updateById(apArticle);
  35.         //修改文章内容
  36.         ApArticleContent apArticleContent = apArticleContentMapper.selectOne(Wrappers.<ApArticleContent>lambdaQuery().eq(ApArticleContent::getArticleId, dto.getId()));
  37.         apArticleContent.setContent(dto.getContent());
  38.         apArticleContentMapper.updateById(apArticleContent);
  39.     }
  40.     //3.结果返回  文章的id
  41.     return ResponseResult.okResult(apArticle.getId());
  42. }
复制代码
⑤:测试
  编写junit单元测试,或使用postman举行测试
  http://localhost:51802/api/v1/article/save
  1. {
  2.         "id":这个id要去数据库自己找 ,
  3.     "title":"黑马头条项目背景22222222222222",
  4.     "authoId":1102,
  5.     "layout":1,
  6.     "labels":"黑马头条",
  7.     "publishTime":"2028-03-14T11:35:49.000Z",
  8.     "images": "http://192.168.200.130:9000/leadnews/2021/04/26/5ddbdb5c68094ce393b08a47860da275.jpg",
  9.     "content":"22222222222222222黑马头条项目背景,黑马头条项目背景,黑马头条项目背景,黑马头条项目背景,黑马头条项目背景"
  10. }
复制代码
4)自媒体文章主动审核功能实现

  4.1)表结构说明

  wm_news 自媒体文章表
              
           status字段:0 草稿 1 待审核 2 审核失败 3 人工审核 4 人工审核通过 8 审核通过(待发布) 9 已发布
  4.2)实现

              
           在heima-leadnews-wemedia中的service新增接口
  1. package com.heima.wemedia.service;
  2. public interface WmNewsAutoScanService {
  3.     /**
  4.      * 自媒体文章审核
  5.      * @param id  自媒体文章id
  6.      */
  7.     public void autoScanWmNews(Integer id);
  8. }
复制代码
实现类:
  1. package com.heima.wemedia.service.impl;
  2. import java.util.*;
  3. import java.util.stream.Collectors;
  4. @Service
  5. @Slf4j
  6. @Transactional
  7. public class WmNewsAutoScanServiceImpl implements WmNewsAutoScanService {
  8.     @Autowired
  9.     private WmNewsMapper wmNewsMapper;
  10.     /**
  11.      * 自媒体文章审核
  12.      *
  13.      * @param id 自媒体文章id
  14.      */
  15.     @Override
  16.     public void autoScanWmNews(Integer id) {
  17.         //1.查询自媒体文章
  18.         WmNews wmNews = wmNewsMapper.selectById(id);
  19.         if(wmNews == null){
  20.             throw new RuntimeException("WmNewsAutoScanServiceImpl-文章不存在");
  21.         }
  22.         if(wmNews.getStatus().equals(WmNews.Status.SUBMIT.getCode())){
  23.             //从内容中提取纯文本内容和图片
  24.             Map<String,Object> textAndImages = handleTextAndImages(wmNews);
  25.             //2.审核文本内容  阿里云接口
  26.             boolean isTextScan = handleTextScan((String) textAndImages.get("content"),wmNews);
  27.             if(!isTextScan)return;
  28.             //3.审核图片  阿里云接口
  29.             boolean isImageScan =  handleImageScan((List<String>) textAndImages.get("images"),wmNews);
  30.             if(!isImageScan)return;
  31.             //4.审核成功,保存app端的相关的文章数据
  32.             ResponseResult responseResult = saveAppArticle(wmNews);
  33.             if(!responseResult.getCode().equals(200)){
  34.                 throw new RuntimeException("WmNewsAutoScanServiceImpl-文章审核,保存app端相关文章数据失败");
  35.             }
  36.             //回填article_id
  37.             wmNews.setArticleId((Long) responseResult.getData());
  38.             updateWmNews(wmNews,(short) 9,"审核成功");
  39.         }
  40.     }
  41.     @Autowired
  42.     private IArticleClient articleClient;
  43.     @Autowired
  44.     private WmChannelMapper wmChannelMapper;
  45.     @Autowired
  46.     private WmUserMapper wmUserMapper;
  47.     /**
  48.      * 保存app端相关的文章数据
  49.      * @param wmNews
  50.      */
  51.     private ResponseResult saveAppArticle(WmNews wmNews) {
  52.         ArticleDto dto = new ArticleDto();
  53.         //属性的拷贝
  54.         BeanUtils.copyProperties(wmNews,dto);
  55.         //文章的布局
  56.         dto.setLayout(wmNews.getType());
  57.         //频道
  58.         WmChannel wmChannel = wmChannelMapper.selectById(wmNews.getChannelId());
  59.         if(wmChannel != null){
  60.             dto.setChannelName(wmChannel.getName());
  61.         }
  62.         //作者
  63.         dto.setAuthorId(wmNews.getUserId().longValue());
  64.         WmUser wmUser = wmUserMapper.selectById(wmNews.getUserId());
  65.         if(wmUser != null){
  66.             dto.setAuthorName(wmUser.getName());
  67.         }
  68.         //设置文章id
  69.         if(wmNews.getArticleId() != null){
  70.             dto.setId(wmNews.getArticleId());
  71.         }
  72.         dto.setCreatedTime(new Date());
  73.         ResponseResult responseResult = articleClient.saveArticle(dto);
  74.         return responseResult;
  75.     }
  76.     @Autowired
  77.     private FileStorageService fileStorageService;
  78.     @Autowired
  79.     private GreenImageScan greenImageScan;
  80.     /**
  81.      * 审核图片
  82.      * @param images
  83.      * @param wmNews
  84.      * @return
  85.      */
  86.     private boolean handleImageScan(List<String> images, WmNews wmNews) {
  87.         boolean flag = true;
  88.         if(images == null || images.size() == 0){
  89.             return flag;
  90.         }
  91.         //下载图片 minIO
  92.         //图片去重
  93.         images = images.stream().distinct().collect(Collectors.toList());
  94.         List<byte[]> imageList = new ArrayList<>();
  95.         for (String image : images) {
  96.             byte[] bytes = fileStorageService.downLoadFile(image);
  97.             imageList.add(bytes);
  98.         }
  99.         //审核图片
  100.         try {
  101.             Map map = greenImageScan.imageScan(imageList);
  102.             if(map != null){
  103.                 //审核失败
  104.                 if(map.get("suggestion").equals("block")){
  105.                     flag = false;
  106.                     updateWmNews(wmNews, (short) 2, "当前文章中存在违规内容");
  107.                 }
  108.                 //不确定信息  需要人工审核
  109.                 if(map.get("suggestion").equals("review")){
  110.                     flag = false;
  111.                     updateWmNews(wmNews, (short) 3, "当前文章中存在不确定内容");
  112.                 }
  113.             }
  114.         } catch (Exception e) {
  115.             flag = false;
  116.             e.printStackTrace();
  117.         }
  118.         return flag;
  119.     }
  120.     @Autowired
  121.     private GreenTextScan greenTextScan;
  122.     /**
  123.      * 审核纯文本内容
  124.      * @param content
  125.      * @param wmNews
  126.      * @return
  127.      */
  128.     private boolean handleTextScan(String content, WmNews wmNews) {
  129.         boolean flag = true;
  130.         if((wmNews.getTitle()+"-"+content).length() == 0){
  131.             return flag;
  132.         }
  133.         try {
  134.             Map map = greenTextScan.greeTextScan((wmNews.getTitle()+"-"+content));
  135.             if(map != null){
  136.                 //审核失败
  137.                 if(map.get("suggestion").equals("block")){
  138.                     flag = false;
  139.                     updateWmNews(wmNews, (short) 2, "当前文章中存在违规内容");
  140.                 }
  141.                 //不确定信息  需要人工审核
  142.                 if(map.get("suggestion").equals("review")){
  143.                     flag = false;
  144.                     updateWmNews(wmNews, (short) 3, "当前文章中存在不确定内容");
  145.                 }
  146.             }
  147.         } catch (Exception e) {
  148.             flag = false;
  149.             e.printStackTrace();
  150.         }
  151.         return flag;
  152.     }
  153.     /**
  154.      * 修改文章内容
  155.      * @param wmNews
  156.      * @param status
  157.      * @param reason
  158.      */
  159.     private void updateWmNews(WmNews wmNews, short status, String reason) {
  160.         wmNews.setStatus(status);
  161.         wmNews.setReason(reason);
  162.         wmNewsMapper.updateById(wmNews);
  163.     }
  164.     /**
  165.      * 1。从自媒体文章的内容中提取文本和图片
  166.      * 2.提取文章的封面图片
  167.      * @param wmNews
  168.      * @return
  169.      */
  170.     private Map<String, Object> handleTextAndImages(WmNews wmNews) {
  171.         //存储纯文本内容
  172.         StringBuilder stringBuilder = new StringBuilder();
  173.         List<String> images = new ArrayList<>();
  174.         //1。从自媒体文章的内容中提取文本和图片
  175.         if(StringUtils.isNotBlank(wmNews.getContent())){
  176.             List<Map> maps = JSONArray.parseArray(wmNews.getContent(), Map.class);
  177.             for (Map map : maps) {
  178.                 if (map.get("type").equals("text")){
  179.                     stringBuilder.append(map.get("value"));
  180.                 }
  181.                 if (map.get("type").equals("image")){
  182.                     images.add((String) map.get("value"));
  183.                 }
  184.             }
  185.         }
  186.         //2.提取文章的封面图片
  187.         if(StringUtils.isNotBlank(wmNews.getImages())){
  188.             String[] split = wmNews.getImages().split(",");
  189.             images.addAll(Arrays.asList(split));
  190.         }
  191.         Map<String, Object> resultMap = new HashMap<>();
  192.         resultMap.put("content",stringBuilder.toString());
  193.         resultMap.put("images",images);
  194.         return resultMap;
  195.     }
  196. }
复制代码
4.3)单元测试

  1. package com.heima.wemedia.service;
  2. import com.heima.wemedia.WemediaApplication;
  3. import org.junit.Test;
  4. import org.junit.runner.RunWith;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.boot.test.context.SpringBootTest;
  7. import org.springframework.test.context.junit4.SpringRunner;
  8. import static org.junit.Assert.*;
  9. @SpringBootTest(classes = WemediaApplication.class)
  10. @RunWith(SpringRunner.class)
  11. public class WmNewsAutoScanServiceTest {
  12.     @Autowired
  13.     private WmNewsAutoScanService wmNewsAutoScanService;
  14.     @Test
  15.     public void autoScanWmNews() {
  16.         wmNewsAutoScanService.autoScanWmNews(6238);
  17.     }
  18. }
复制代码
4.4)feign长途接口调用方式

              
           在heima-leadnews-wemedia服务中已经依靠了heima-leadnews-feign-apis工程,只需要在自媒体的引导类中开启feign的长途调用即可
  注解为:@EnableFeignClients(basePackages = "com.heima.apis") 需要指向apis这个包
              
           4.5)服务降级处理

              
           

  • 服务降级是服务自我保护的一种方式,大概保护下游服务的一种方式,用于确保服务不会受哀求突增影响变得不可用,确保服务不会瓦解
  

  • 服务降级虽然会导致哀求失败,但是不会导致壅闭。
  实现步骤:
  ①:在heima-leadnews-feign-api编写降级逻辑
  1. package com.heima.apis.article.fallback;
  2. import org.springframework.stereotype.Component;
  3. /**
  4. * feign失败配置
  5. * @author itheima
  6. */
  7. @Component
  8. public class IArticleClientFallback implements IArticleClient {
  9.     @Override
  10.     public ResponseResult saveArticle(ArticleDto dto)  {
  11.         return ResponseResult.errorResult(AppHttpCodeEnum.SERVER_ERROR,"获取数据失败");
  12.     }
  13. }
复制代码
在自媒体微服务中添加类,扫描降级代码类的包
  1. package com.heima.wemedia.config;
  2. import org.springframework.context.annotation.ComponentScan;
  3. import org.springframework.context.annotation.Configuration;
  4. @Configuration
  5. @ComponentScan("com.heima.apis.article.fallback")
  6. public class InitConfig {
  7. }
复制代码
②:长途接口中指向降级代码
  1. package com.heima.apis.article;
  2. import org.springframework.web.bind.annotation.RequestBody;
  3. @FeignClient(value = "leadnews-article",fallback = IArticleClientFallback.class)
  4. public interface IArticleClient {
  5.     @PostMapping("/api/v1/article/save")
  6.     public ResponseResult saveArticle(@RequestBody ArticleDto dto);
  7. }
复制代码
③:客户端开启降级heima-leadnews-wemedia
  在wemedia的nacos配置中心里添加如下内容,开启服务降级,也可以指定服务相应的超时的时间
  1. feign:
  2.   # 开启feign对hystrix熔断降级的支持
  3.   hystrix:
  4.     enabled: true
  5.   # 修改调用超时时间
  6.   client:
  7.     config:
  8.       default:
  9.         connectTimeout: 2000
  10.         readTimeout: 2000
复制代码
④:测试
  在ApArticleServiceImpl类中saveArticle方法添加代码
  1. try {
  2.     Thread.sleep(3000);
  3. } catch (InterruptedException e) {
  4.     e.printStackTrace();
  5. }
复制代码
在自媒体端举行审核测试,会出现服务降级的现象
  5)发布文章提交审核集成

  5.1)同步调用与异步调用

  同步:就是在发出一个调用时,在没有得到结果之前, 该调用就不返回(实时处理)
  异步:调用在发出之后,这个调用就直接返回了,没有返回结果(分时处理)
              
           异步线程的方式审核文章
  5.2)Springboot集成异步线程调用

  ①:在主动审核的方法上加上@Async注解(标明要异步调用)
  1. @Override
  2. @Async  //标明当前方法是一个异步方法
  3. public void autoScanWmNews(Integer id) {
  4.         //代码略
  5. }
复制代码
②:在文章发布成功后调用审核的方法
  1. @Autowired
  2. private WmNewsAutoScanService wmNewsAutoScanService;
  3. /**
  4. * 发布修改文章或保存为草稿
  5. * @param dto
  6. * @return
  7. */
  8. @Override
  9. public ResponseResult submitNews(WmNewsDto dto) {
  10.     //代码略
  11.     //审核文章
  12.     wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
  13.     return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
  14. }
复制代码
③:在自媒体引导类中使用@EnableAsync注解开启异步调用
  1. @SpringBootApplication
  2. @EnableDiscoveryClient
  3. @MapperScan("com.heima.wemedia.mapper")
  4. @EnableFeignClients(basePackages = "com.heima.apis")
  5. @EnableAsync  //开启异步调用
  6. public class WemediaApplication {
  7.     public static void main(String[] args) {
  8.         SpringApplication.run(WemediaApplication.class,args);
  9.     }
  10.     @Bean
  11.     public MybatisPlusInterceptor mybatisPlusInterceptor() {
  12.         MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
  13.         interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
  14.         return interceptor;
  15.     }
  16. }
复制代码
6)文章审核功能-综合测试

  6.1)服务启动列表

  1,nacos服务端
  2,article微服务
  3,wemedia微服务
  4,启动wemedia网关微服务
  5,启动前端体系wemedia
  6.2)测试情况列表

  1,自媒体前端发布一篇正常的文章
  审核成功后,app端的article相关数据是否可以正常生存,自媒体文章状态和app端文章id是否回显
  2,自媒体前端发布一篇包含敏感词的文章
  正常是审核失败, wm_news表中的状态是否改变,成功和失败原因正常生存
  3,自媒体前端发布一篇包含敏感图片的文章
  正常是审核失败, wm_news表中的状态是否改变,成功和失败原因正常生存
  7)新需求-自管理敏感词

  7.1)需求分析

  文章审核功能已经交付了,文章也能正常发布审核。突然,产品经理过来说要开会。
  会议的内容焦点有以下内容:
  

  • 文章审核不能过滤一些敏感词
  私家侦探、针孔摄象、名誉卡提现、广告署理、代开发票、刻章办、出售答案、小额贷款…
  需要完成的功能:
  需要自己维护一套敏感词,在文章审核的时间,需要验证文章是否包含这些敏感词
  7.2)敏感词-过滤

  技能选型
     
方案

说明

数据库模糊查询

服从太低

String.indexOf("")查找

数据库量大的话也是比较慢

全文检索

分词再匹配

DFA算法

确定有穷主动机(一种数据结构)

    7.3)DFA实现原理

  DFA全称为:Deterministic Finite Automaton,即确定有穷主动机
  存储:一次性的把所有的敏感词存储到了多个map中,就是下图表示这种结构
  敏感词:冰毒、大麻、大坏蛋
              
           检索的过程
              
           7.4)自管理敏感词集成到文章审核中

  ①:创建敏感词表,导入资料中wm_sensitive到leadnews_wemedia库中
              
         
  1. package com.heima.model.wemedia.pojos;
  2. import java.io.Serializable;
  3. import java.util.Date;
  4. /**
  5. * <p>
  6. * 敏感词信息表
  7. * </p>
  8. *
  9. * @author itheima
  10. */
  11. @Data
  12. @TableName("wm_sensitive")
  13. public class WmSensitive implements Serializable {
  14.     private static final long serialVersionUID = 1L;
  15.     /**
  16.      * 主键
  17.      */
  18.     @TableId(value = "id", type = IdType.AUTO)
  19.     private Integer id;
  20.     /**
  21.      * 敏感词
  22.      */
  23.     @TableField("sensitives")
  24.     private String sensitives;
  25.     /**
  26.      * 创建时间
  27.      */
  28.     @TableField("created_time")
  29.     private Date createdTime;
  30. }
复制代码
②:拷贝对应的wm_sensitive的mapper到项目中
  1. package com.heima.wemedia.mapper;
  2. import com.baomidou.mybatisplus.core.mapper.BaseMapper;
  3. import com.heima.model.wemedia.pojos.WmSensitive;
  4. import org.apache.ibatis.annotations.Mapper;
  5. @Mapper
  6. public interface WmSensitiveMapper extends BaseMapper<WmSensitive> {
  7. }
复制代码
③:在文章审核的代码中添加自管理敏感词审核
  第一:在WmNewsAutoScanServiceImpl中的autoScanWmNews方法上添加如下代码
  1. //从内容中提取纯文本内容和图片
  2. //.....省略
  3. //自管理的敏感词过滤
  4. boolean isSensitive = handleSensitiveScan((String) textAndImages.get("content"), wmNews);
  5. if(!isSensitive) return;
  6. //2.审核文本内容  阿里云接口
  7. //.....省略
复制代码
测试了一下 源码不能检测 标题的敏感词汇;加了个这: wmNews.getTitle()+

  
//自管理的敏感词过滤

  
boolean isSensitive = handleSensitiveScan(

  
wmNews.getTitle()+textAndImages.get("content"), wmNews);

  新增自管理敏感词审核代码
  1. @Autowired
  2. private WmSensitiveMapper wmSensitiveMapper;
  3. /**
  4.      * 自管理的敏感词审核
  5.      * @param content
  6.      * @param wmNews
  7.      * @return
  8.      */
  9. private boolean handleSensitiveScan(String content, WmNews wmNews) {
  10.     boolean flag = true;
  11.     //获取所有的敏感词
  12.     List<WmSensitive> wmSensitives = wmSensitiveMapper.selectList(Wrappers.<WmSensitive>lambdaQuery().select(WmSensitive::getSensitives));
  13.     List<String> sensitiveList = wmSensitives.stream().map(WmSensitive::getSensitives).collect(Collectors.toList());
  14.     //初始化敏感词库
  15.     SensitiveWordUtil.initMap(sensitiveList);
  16.     //查看文章中是否包含敏感词
  17.     Map<String, Integer> map = SensitiveWordUtil.matchWords(content);
  18.     if(map.size() >0){
  19.         updateWmNews(wmNews,(short) 2,"当前文章中存在违规内容"+map);
  20.         flag = false;
  21.     }
  22.     return flag;
  23. }
复制代码
8)新需求-图片识别文字审核敏感词

  8.1)需求分析

  产品经理调集开会,文章审核功能已经交付了,文章也能正常发布审核。对于前次提出的自管理敏感词也很满意,这次会议焦点的内容如下:
  

  • 文章中包含的图片要识别文字,过滤掉图片文字的敏感词
              
           8.2)图片文字识别

  什么是OCR?
  OCR (Optical Character Recognition,光学字符识别)是指电子装备(比方扫描仪或数码相机)查抄纸上打印的字符,通过检测暗、亮的模式确定其形状,然后用字符识别方法将形状翻译成盘算机文字的过程
     
方案
说明
百度OCR
收费
Tesseract-OCR
Google维护的开源OCR引擎,支持Java,Python等语言调用
Tess4J
封装了Tesseract-OCR ,支持Java调用
               
           8 .3)Tess4j案例

  ①:创建项目导入tess4j对应的依靠
  1. <dependency>
  2.     <groupId>net.sourceforge.tess4j</groupId>
  3.     <artifactId>tess4j</artifactId>
  4.     <version>4.1.1</version>
  5. </dependency>
复制代码
②:导入中文字体库, 把资料中的tessdata文件夹拷贝到自己的工作空间下
              
           ③:编写测试类举行测试
  1. package com.heima.tess4j;
  2. import net.sourceforge.tess4j.ITesseract;
  3. import net.sourceforge.tess4j.Tesseract;
  4. import java.io.File;
  5. public class Application {
  6.     public static void main(String[] args) {
  7.         try {
  8.             //获取本地图片
  9.             File file = new File("D:\\26.png");
  10.             //创建Tesseract对象
  11.             ITesseract tesseract = new Tesseract();
  12.             //设置字体库路径
  13.             tesseract.setDatapath("D:\\workspace\\tessdata");
  14.             //中文识别
  15.             tesseract.setLanguage("chi_sim");
  16.             //执行ocr识别
  17.             String result = tesseract.doOCR(file);
  18.             //替换回车和tal键  使结果为一行
  19.             result = result.replaceAll("\\r|\\n","-").replaceAll(" ","");
  20.             System.out.println("识别的结果为:"+result);
  21.         } catch (Exception e) {
  22.             e.printStackTrace();
  23.         }
  24.     }
  25. }
复制代码
8.4)管理敏感词和图片文字识别集成到文章审核

  ①:在heima-leadnews-common中创建工具类,简单封装一下tess4j
  需要先导入pom
  1. <dependency>
  2.     <groupId>net.sourceforge.tess4j</groupId>
  3.     <artifactId>tess4j</artifactId>
  4.     <version>4.1.1</version>
  5. </dependency>
复制代码
工具类
  1. package com.heima.common.tess4j;
  2. import lombok.Getter;
  3. import lombok.Setter;
  4. import net.sourceforge.tess4j.ITesseract;
  5. import net.sourceforge.tess4j.Tesseract;
  6. import net.sourceforge.tess4j.TesseractException;
  7. import org.springframework.boot.context.properties.ConfigurationProperties;
  8. import org.springframework.stereotype.Component;
  9. import java.awt.image.BufferedImage;
  10. @Getter
  11. @Setter
  12. @Component
  13. @ConfigurationProperties(prefix = "tess4j")
  14. public class Tess4jClient {
  15.     private String dataPath;
  16.     private String language;
  17.     public String doOCR(BufferedImage image) throws TesseractException {
  18.         //创建Tesseract对象
  19.         ITesseract tesseract = new Tesseract();
  20.         //设置字体库路径
  21.         tesseract.setDatapath(dataPath);
  22.         //中文识别
  23.         tesseract.setLanguage(language);
  24.         //执行ocr识别
  25.         String result = tesseract.doOCR(image);
  26.         //替换回车和tal键  使结果为一行
  27.         result = result.replaceAll("\\r|\\n", "-").replaceAll(" ", "");
  28.         return result;
  29.     }
  30. }
复制代码
在spring.factories配置中添加该类,完整如下:
  1. org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  2.   com.heima.common.exception.ExceptionCatch,\
  3.   com.heima.common.swagger.SwaggerConfiguration,\
  4.   com.heima.common.swagger.Swagger2Configuration,\
  5.   com.heima.common.aliyun.GreenTextScan,\
  6.   com.heima.common.aliyun.GreenImageScan,\
  7.   com.heima.common.tess4j.Tess4jClient
复制代码
②:在heima-leadnews-wemedia中的配置中添加两个属性
  1. tess4j:
  2.   data-path: D:\workspace\tessdata
  3.   language: chi_sim
复制代码
③:在WmNewsAutoScanServiceImpl中的handleImageScan方法上添加如下代码
  1. try {
  2.     for (String image : images) {
  3.         byte[] bytes = fileStorageService.downLoadFile(image);
  4.         //图片识别文字审核---begin-----
  5.         //从byte[]转换为butteredImage
  6.         ByteArrayInputStream in = new ByteArrayInputStream(bytes);
  7.         BufferedImage imageFile = ImageIO.read(in);
  8.         //识别图片的文字
  9.         String result = tess4jClient.doOCR(imageFile);
  10.         //审核是否包含自管理的敏感词
  11.         boolean isSensitive = handleSensitiveScan(result, wmNews);
  12.         if(!isSensitive){
  13.             return isSensitive;
  14.         }
  15.         //图片识别文字审核---end-----
  16.         imageList.add(bytes);
  17.     }
  18. }catch (Exception e){
  19.     e.printStackTrace();
  20. }
复制代码
末了附上文章审核的完整代码如下:
  1. package com.heima.wemedia.service.impl;
  2. import com.alibaba.fastjson.JSONArray;
  3. import com.baomidou.mybatisplus.core.toolkit.Wrappers;
  4. import com.heima.apis.article.IArticleClient;
  5. import com.heima.common.aliyun.GreenImageScan;
  6. import com.heima.common.aliyun.GreenTextScan;
  7. import com.heima.common.tess4j.Tess4jClient;
  8. import com.heima.file.service.FileStorageService;
  9. import com.heima.model.article.dtos.ArticleDto;
  10. import com.heima.model.common.dtos.ResponseResult;
  11. import com.heima.model.wemedia.pojos.WmChannel;
  12. import com.heima.model.wemedia.pojos.WmNews;
  13. import com.heima.model.wemedia.pojos.WmSensitive;
  14. import com.heima.model.wemedia.pojos.WmUser;
  15. import com.heima.utils.common.SensitiveWordUtil;
  16. import com.heima.wemedia.mapper.WmChannelMapper;
  17. import com.heima.wemedia.mapper.WmNewsMapper;
  18. import com.heima.wemedia.mapper.WmSensitiveMapper;
  19. import com.heima.wemedia.mapper.WmUserMapper;
  20. import com.heima.wemedia.service.WmNewsAutoScanService;
  21. import lombok.extern.slf4j.Slf4j;
  22. import org.apache.commons.lang3.StringUtils;
  23. import org.springframework.beans.BeanUtils;
  24. import org.springframework.beans.factory.annotation.Autowired;
  25. import org.springframework.scheduling.annotation.Async;
  26. import org.springframework.stereotype.Service;
  27. import org.springframework.transaction.annotation.Transactional;
  28. import javax.imageio.ImageIO;
  29. import java.awt.image.BufferedImage;
  30. import java.io.ByteArrayInputStream;
  31. import java.util.*;
  32. import java.util.stream.Collectors;
  33. @Service
  34. @Slf4j
  35. @Transactional
  36. public class WmNewsAutoScanServiceImpl implements WmNewsAutoScanService {
  37.     @Autowired
  38.     private WmNewsMapper wmNewsMapper;
  39.     /**
  40.      * 自媒体文章审核
  41.      *
  42.      * @param id 自媒体文章id
  43.      */
  44.     @Override
  45.     @Async  //标明当前方法是一个异步方法
  46.     public void autoScanWmNews(Integer id) {
  47. //        int a = 1/0;
  48.         //1.查询自媒体文章
  49.         WmNews wmNews = wmNewsMapper.selectById(id);
  50.         if (wmNews == null) {
  51.             throw new RuntimeException("WmNewsAutoScanServiceImpl-文章不存在");
  52.         }
  53.         if (wmNews.getStatus().equals(WmNews.Status.SUBMIT.getCode())) {
  54.             //从内容中提取纯文本内容和图片
  55.             Map<String, Object> textAndImages = handleTextAndImages(wmNews);
  56.             //自管理的敏感词过滤
  57.             boolean isSensitive = handleSensitiveScan((String) textAndImages.get("content"), wmNews);
  58.             if(!isSensitive) return;
  59.             //2.审核文本内容  阿里云接口
  60.             boolean isTextScan = handleTextScan((String) textAndImages.get("content"), wmNews);
  61.             if (!isTextScan) return;
  62.             //3.审核图片  阿里云接口
  63.             boolean isImageScan = handleImageScan((List<String>) textAndImages.get("images"), wmNews);
  64.             if (!isImageScan) return;
  65.             //4.审核成功,保存app端的相关的文章数据
  66.             ResponseResult responseResult = saveAppArticle(wmNews);
  67.             if (!responseResult.getCode().equals(200)) {
  68.                 throw new RuntimeException("WmNewsAutoScanServiceImpl-文章审核,保存app端相关文章数据失败");
  69.             }
  70.             //回填article_id
  71.             wmNews.setArticleId((Long) responseResult.getData());
  72.             updateWmNews(wmNews, (short) 9, "审核成功");
  73.         }
  74.     }
  75.     @Autowired
  76.     private WmSensitiveMapper wmSensitiveMapper;
  77.     /**
  78.      * 自管理的敏感词审核
  79.      * @param content
  80.      * @param wmNews
  81.      * @return
  82.      */
  83.     private boolean handleSensitiveScan(String content, WmNews wmNews) {
  84.         boolean flag = true;
  85.         //获取所有的敏感词
  86.         List<WmSensitive> wmSensitives = wmSensitiveMapper.selectList(Wrappers.<WmSensitive>lambdaQuery().select(WmSensitive::getSensitives));
  87.         List<String> sensitiveList = wmSensitives.stream().map(WmSensitive::getSensitives).collect(Collectors.toList());
  88.         //初始化敏感词库
  89.         SensitiveWordUtil.initMap(sensitiveList);
  90.         //查看文章中是否包含敏感词
  91.         Map<String, Integer> map = SensitiveWordUtil.matchWords(content);
  92.         if(map.size() >0){
  93.             updateWmNews(wmNews,(short) 2,"当前文章中存在违规内容"+map);
  94.             flag = false;
  95.         }
  96.         return flag;
  97.     }
  98.     @Autowired
  99.     private IArticleClient articleClient;
  100.     @Autowired
  101.     private WmChannelMapper wmChannelMapper;
  102.     @Autowired
  103.     private WmUserMapper wmUserMapper;
  104.     /**
  105.      * 保存app端相关的文章数据
  106.      *
  107.      * @param wmNews
  108.      */
  109.     private ResponseResult saveAppArticle(WmNews wmNews) {
  110.         ArticleDto dto = new ArticleDto();
  111.         //属性的拷贝
  112.         BeanUtils.copyProperties(wmNews, dto);
  113.         //文章的布局
  114.         dto.setLayout(wmNews.getType());
  115.         //频道
  116.         WmChannel wmChannel = wmChannelMapper.selectById(wmNews.getChannelId());
  117.         if (wmChannel != null) {
  118.             dto.setChannelName(wmChannel.getName());
  119.         }
  120.         //作者
  121.         dto.setAuthorId(wmNews.getUserId().longValue());
  122.         WmUser wmUser = wmUserMapper.selectById(wmNews.getUserId());
  123.         if (wmUser != null) {
  124.             dto.setAuthorName(wmUser.getName());
  125.         }
  126.         //设置文章id
  127.         if (wmNews.getArticleId() != null) {
  128.             dto.setId(wmNews.getArticleId());
  129.         }
  130.         dto.setCreatedTime(new Date());
  131.         ResponseResult responseResult = articleClient.saveArticle(dto);
  132.         return responseResult;
  133.     }
  134.     @Autowired
  135.     private FileStorageService fileStorageService;
  136.     @Autowired
  137.     private GreenImageScan greenImageScan;
  138.     @Autowired
  139.     private Tess4jClient tess4jClient;
  140.     /**
  141.      * 审核图片
  142.      *
  143.      * @param images
  144.      * @param wmNews
  145.      * @return
  146.      */
  147.     private boolean handleImageScan(List<String> images, WmNews wmNews) {
  148.         boolean flag = true;
  149.         if (images == null || images.size() == 0) {
  150.             return flag;
  151.         }
  152.         //下载图片 minIO
  153.         //图片去重
  154.         images = images.stream().distinct().collect(Collectors.toList());
  155.         List<byte[]> imageList = new ArrayList<>();
  156.         try {
  157.             for (String image : images) {
  158.                 byte[] bytes = fileStorageService.downLoadFile(image);
  159.                 //图片识别文字审核---begin-----
  160.                 //从byte[]转换为butteredImage
  161.                 ByteArrayInputStream in = new ByteArrayInputStream(bytes);
  162.                 BufferedImage imageFile = ImageIO.read(in);
  163.                 //识别图片的文字
  164.                 String result = tess4jClient.doOCR(imageFile);
  165.                 //审核是否包含自管理的敏感词
  166.                 boolean isSensitive = handleSensitiveScan(result, wmNews);
  167.                 if(!isSensitive){
  168.                     return isSensitive;
  169.                 }
  170.                 //图片识别文字审核---end-----
  171.                 imageList.add(bytes);
  172.             }
  173.         }catch (Exception e){
  174.             e.printStackTrace();
  175.         }
  176.         //审核图片
  177.         try {
  178.             Map map = greenImageScan.imageScan(imageList);
  179.             if (map != null) {
  180.                 //审核失败
  181.                 if (map.get("suggestion").equals("block")) {
  182.                     flag = false;
  183.                     updateWmNews(wmNews, (short) 2, "当前文章中存在违规内容");
  184.                 }
  185.                 //不确定信息  需要人工审核
  186.                 if (map.get("suggestion").equals("review")) {
  187.                     flag = false;
  188.                     updateWmNews(wmNews, (short) 3, "当前文章中存在不确定内容");
  189.                 }
  190.             }
  191.         } catch (Exception e) {
  192.             flag = false;
  193.             e.printStackTrace();
  194.         }
  195.         return flag;
  196.     }
  197.     @Autowired
  198.     private GreenTextScan greenTextScan;
  199.     /**
  200.      * 审核纯文本内容
  201.      *
  202.      * @param content
  203.      * @param wmNews
  204.      * @return
  205.      */
  206.     private boolean handleTextScan(String content, WmNews wmNews) {
  207.         boolean flag = true;
  208.         if ((wmNews.getTitle() + "-" + content).length() == 0) {
  209.             return flag;
  210.         }
  211.         try {
  212.             Map map = greenTextScan.greeTextScan((wmNews.getTitle() + "-" + content));
  213.             if (map != null) {
  214.                 //审核失败
  215.                 if (map.get("suggestion").equals("block")) {
  216.                     flag = false;
  217.                     updateWmNews(wmNews, (short) 2, "当前文章中存在违规内容");
  218.                 }
  219.                 //不确定信息  需要人工审核
  220.                 if (map.get("suggestion").equals("review")) {
  221.                     flag = false;
  222.                     updateWmNews(wmNews, (short) 3, "当前文章中存在不确定内容");
  223.                 }
  224.             }
  225.         } catch (Exception e) {
  226.             flag = false;
  227.             e.printStackTrace();
  228.         }
  229.         return flag;
  230.     }
  231.     /**
  232.      * 修改文章内容
  233.      *
  234.      * @param wmNews
  235.      * @param status
  236.      * @param reason
  237.      */
  238.     private void updateWmNews(WmNews wmNews, short status, String reason) {
  239.         wmNews.setStatus(status);
  240.         wmNews.setReason(reason);
  241.         wmNewsMapper.updateById(wmNews);
  242.     }
  243.     /**
  244.      * 1。从自媒体文章的内容中提取文本和图片
  245.      * 2.提取文章的封面图片
  246.      *
  247.      * @param wmNews
  248.      * @return
  249.      */
  250.     private Map<String, Object> handleTextAndImages(WmNews wmNews) {
  251.         //存储纯文本内容
  252.         StringBuilder stringBuilder = new StringBuilder();
  253.         List<String> images = new ArrayList<>();
  254.         //1。从自媒体文章的内容中提取文本和图片
  255.         if (StringUtils.isNotBlank(wmNews.getContent())) {
  256.             List<Map> maps = JSONArray.parseArray(wmNews.getContent(), Map.class);
  257.             for (Map map : maps) {
  258.                 if (map.get("type").equals("text")) {
  259.                     stringBuilder.append(map.get("value"));
  260.                 }
  261.                 if (map.get("type").equals("image")) {
  262.                     images.add((String) map.get("value"));
  263.                 }
  264.             }
  265.         }
  266.         //2.提取文章的封面图片
  267.         if (StringUtils.isNotBlank(wmNews.getImages())) {
  268.             String[] split = wmNews.getImages().split(",");
  269.             images.addAll(Arrays.asList(split));
  270.         }
  271.         Map<String, Object> resultMap = new HashMap<>();
  272.         resultMap.put("content", stringBuilder.toString());
  273.         resultMap.put("images", images);
  274.         return resultMap;
  275.     }
  276. }
复制代码
9)文章详情-静态文件生成

  9.1)思路分析

  文章端创建app相关文章时,生成文章详情静态页上传到MinIO中
              
           9.2)实现步骤

  1.新建ArticleFreemarkerService创建静态文件并上传到minIO中
  1. package com.heima.article.service;
  2. import com.heima.model.article.pojos.ApArticle;
  3. public interface ArticleFreemarkerService {
  4.     /**
  5.      * 生成静态文件上传到minIO中
  6.      * @param apArticle
  7.      * @param content
  8.      */
  9.     public void buildArticleToMinIO(ApArticle apArticle,String content);
  10. }
复制代码
实现
  1. package com.heima.article.service.impl;
  2. import java.util.Map;
  3. @Service
  4. @Slf4j
  5. @Transactional
  6. public class ArticleFreemarkerServiceImpl implements ArticleFreemarkerService {
  7.     @Autowired
  8.     private ApArticleContentMapper apArticleContentMapper;
  9.     @Autowired
  10.     private Configuration configuration;
  11.     @Autowired
  12.     private FileStorageService fileStorageService;
  13.     @Autowired
  14.     private ApArticleService apArticleService;
  15.     /**
  16.      * 生成静态文件上传到minIO中
  17.      * @param apArticle
  18.      * @param content
  19.      */
  20.     @Async
  21.     @Override
  22.     public void buildArticleToMinIO(ApArticle apArticle, String content) {
  23.         //已知文章的id
  24.         //4.1 获取文章内容
  25.         if(StringUtils.isNotBlank(content)){
  26.             //4.2 文章内容通过freemarker生成html文件
  27.             Template template = null;
  28.             StringWriter out = new StringWriter();
  29.             try {
  30.                 template = configuration.getTemplate("article.ftl");
  31.                 //数据模型
  32.                 Map<String,Object> contentDataModel = new HashMap<>();
  33.                 contentDataModel.put("content", JSONArray.parseArray(content));
  34.                 //合成
  35.                 template.process(contentDataModel,out);
  36.             } catch (Exception e) {
  37.                 e.printStackTrace();
  38.             }
  39.             //4.3 把html文件上传到minio中
  40.             InputStream in = new ByteArrayInputStream(out.toString().getBytes());
  41.             String path = fileStorageService.uploadHtmlFile("", apArticle.getId() + ".html", in);
  42.             //4.4 修改ap_article表,保存static_url字段
  43.             apArticleService.update(Wrappers.<ApArticle>lambdaUpdate().eq(ApArticle::getId,apArticle.getId())
  44.                     .set(ApArticle::getStaticUrl,path));
  45.         }
  46.     }
  47. }
复制代码
2.在ApArticleService的saveArticle实现方法中添加调用生成文件的方法
  1. /**
  2.      * 保存app端相关文章
  3.      * @param dto
  4.      * @return
  5.      */
  6. @Override
  7. public ResponseResult saveArticle(ArticleDto dto) {
  8.     //        try {
  9.     //            Thread.sleep(3000);
  10.     //        } catch (InterruptedException e) {
  11.     //            e.printStackTrace();
  12.     //        }
  13.     //1.检查参数
  14.     if(dto == null){
  15.         return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
  16.     }
  17.     ApArticle apArticle = new ApArticle();
  18.     BeanUtils.copyProperties(dto,apArticle);
  19.     //2.判断是否存在id
  20.     if(dto.getId() == null){
  21.         //2.1 不存在id  保存  文章  文章配置  文章内容
  22.         //保存文章
  23.         save(apArticle);
  24.         //保存配置
  25.         ApArticleConfig apArticleConfig = new ApArticleConfig(apArticle.getId());
  26.         apArticleConfigMapper.insert(apArticleConfig);
  27.         //保存 文章内容
  28.         ApArticleContent apArticleContent = new ApArticleContent();
  29.         apArticleContent.setArticleId(apArticle.getId());
  30.         apArticleContent.setContent(dto.getContent());
  31.         apArticleContentMapper.insert(apArticleContent);
  32.     }else {
  33.         //2.2 存在id   修改  文章  文章内容
  34.         //修改  文章
  35.         updateById(apArticle);
  36.         //修改文章内容
  37.         ApArticleContent apArticleContent = apArticleContentMapper.selectOne(Wrappers.<ApArticleContent>lambdaQuery().eq(ApArticleContent::getArticleId, dto.getId()));
  38.         apArticleContent.setContent(dto.getContent());
  39.         apArticleContentMapper.updateById(apArticleContent);
  40.     }
  41.     //异步调用 生成静态文件上传到minio中
  42.     articleFreemarkerService.buildArticleToMinIO(apArticle,dto.getContent());
  43.     //3.结果返回  文章的id
  44.     return ResponseResult.okResult(apArticle.getId());
  45. }
复制代码
3.文章微服务开启异步调用
              
           
  05延迟任务精准发布文章

              
           1)文章定时发布

  2)延迟任务概述

  2.1)什么是延迟任务

  

  • 定时任务:有固定周期的,有明确的触发时间
  

  • 延迟队列:没有固定的开始时间,它常常是由一个事件触发的,而在这个事件触发之后的一段时间内触发另一个事件,任务可以立即执行,也可以延迟
              
           应用场景:
  场景一:
  订单下单之后30分钟后,假如用户没有付钱,则体系主动取消订单;假如期间下单成功,任务取消
  场景二:接口对接出现网络问题,1分钟后重试,假如失败,2分钟重试,直到出现阈值终止
  2.2)技能对比

  2.2.1)DelayQueue

  JDK自带DelayQueue 是一个支持延时获取元素的壅闭队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才气从队列中提取元素
              
           DelayQueue属于排序队列,它的特殊之处在于队列的元素必须实现Delayed接口,该接口需要实现compareTo和getDelay方法
  getDelay方法:获取元素在队列中的剩余时间,只有当剩余时间为0时元素才可以出队列
  compareTo方法:用于排序,确定元素出队列的顺序
  实现:
  1:在测试包jdk下创建延迟任务元素对象DelayedTask,实现compareTo和getDelay方法,
  2:在main方法中创建DelayQueue并向延迟队列中添加三个延迟任务,
  3:循环的从延迟队列中拉取任务
  1. public class DelayedTask  implements Delayed{
  2.    
  3.     // 任务的执行时间
  4.     private int executeTime = 0;
  5.    
  6.     public DelayedTask(int delay){
  7.         Calendar calendar = Calendar.getInstance();
  8.         calendar.add(Calendar.SECOND,delay);
  9.         this.executeTime = (int)(calendar.getTimeInMillis() /1000 );
  10.     }
  11.     /**
  12.      * 元素在队列中的剩余时间
  13.      * @param unit
  14.      * @return
  15.      */
  16.     @Override
  17.     public long getDelay(TimeUnit unit) {
  18.         Calendar calendar = Calendar.getInstance();
  19.         return executeTime - (calendar.getTimeInMillis()/1000);
  20.     }
  21.     /**
  22.      * 元素排序
  23.      * @param o
  24.      * @return
  25.      */
  26.     @Override
  27.     public int compareTo(Delayed o) {
  28.         long val = this.getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
  29.         return val == 0 ? 0 : ( val < 0 ? -1: 1 );
  30.     }
  31.     public static void main(String[] args) {
  32.         DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();
  33.         
  34.         queue.add(new DelayedTask(5));
  35.         queue.add(new DelayedTask(10));
  36.         queue.add(new DelayedTask(15));
  37.         System.out.println(System.currentTimeMillis()/1000+" start consume ");
  38.         while(queue.size() != 0){
  39.             DelayedTask delayedTask = queue.poll();
  40.             if(delayedTask !=null ){
  41.                 System.out.println(System.currentTimeMillis()/1000+" cosume task");
  42.             }
  43.             //每隔一秒消费一次
  44.             try {
  45.                 Thread.sleep(1000);
  46.             } catch (InterruptedException e) {
  47.                 e.printStackTrace();
  48.             }
  49.         }     
  50.     }
  51. }
复制代码
DelayQueue实现完成之后思索一个问题:
  使用线程池大概原生DelayQueue程序挂掉之后,任务都是放在内存,需要考虑未处理消息的丢失带来的影响,怎样保证数据不丢失,需要长期化(磁盘)
  2.2.2)RabbitMQ实现延迟任务

  

  • TTL:Time To Live (消息存活时间)
  

  • 死信队列:Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以重新发送另一个交换机(死信交换机)
              
           2.2.3)redis实现

  zset数据范例的去重有序(分数排序)特点举行延迟。比方:时间戳作为score举行排序
              
                       
           3)redis实现延迟任务

  实现思路
              
           问题思路
  1.为什么任务需要存储在数据库中?
  延迟任务是一个通用的服务,任何需要延迟得任务都可以调用该服务,需要考虑数据长期化的问题,存储数据库中是一种数据安全的考虑
  2.为什么redis中使用两种数据范例,list和zset?
  服从问题,算法的时间复杂度; list是双向链表
              
           3.在添加zset数据的时间,为什么不需要预加载?
  任务模块是一个通用的模块,项目中任何需要延迟队列的地方,都可以调用这个接口,要考虑到数据量的问题,假如数据量特殊大,为了防止zset壅闭,只需要把未来几分钟要执行的数据存入缓存即可
  锐评:完全为了学list zset而编出来的场景,现实工作中延迟队列要计划成这样只能说太蠢了

  现实工作绝对用MQ
  4)延迟任务服务实现

  4.1)搭建heima-leadnews-schedule模块

  leadnews-schedule是一个通用的服务,单独创建模块来管理任何范例的延迟任务
  ①:导入资料文件夹的heima-leadnews-schedule模块到heima-leadnews-service下,如下图所示:
              
           ②:添加bootstrap.yml
  1. server:
  2.   port: 51701
  3. spring:
  4.   application:
  5.     name: leadnews-schedule
  6.   cloud:
  7.     nacos:
  8.       discovery:
  9.         server-addr: 192.168.200.130:8848
  10.       config:
  11.         server-addr: 192.168.200.130:8848
  12.         file-extension: yml
复制代码
③:在nacos中添加对应配置,并添加数据库及mybatis-plus的配置
  1. spring:
  2.   datasource:
  3.     driver-class-name: com.mysql.jdbc.Driver
  4.     url: jdbc:mysql://localhost:3306/leadnews_schedule?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
  5.     username: root
  6.     password: root
  7. # 设置Mapper接口所对应的XML文件位置,如果你在Mapper接口中有自定义方法,需要进行该配置
  8. mybatis-plus:
  9.   mapper-locations: classpath*:mapper/*.xml
  10.   # 设置别名包扫描路径,通过该属性可以给包中的类注册别名
  11.   type-aliases-package: com.heima.model.schedule.pojos
复制代码
4.2)数据库预备

  导入资料中leadnews_schedule数据库
  taskinfo 任务表
              
           实体类
  1. package com.heima.model.schedule.pojos;
  2. import java.io.Serializable;
  3. import java.util.Date;
  4. /**
  5. * <p>
  6. *
  7. * </p>
  8. *
  9. * @author itheima
  10. */
  11. @Data
  12. @TableName("taskinfo")
  13. public class Taskinfo implements Serializable {
  14.     private static final long serialVersionUID = 1L;
  15.     /**
  16.      * 任务id
  17.      */
  18.     @TableId(type = IdType.ID_WORKER)
  19.     private Long taskId;
  20.     /**
  21.      * 执行时间
  22.      */
  23.     @TableField("execute_time")
  24.     private Date executeTime;
  25.     /**
  26.      * 参数
  27.      */
  28.     @TableField("parameters")
  29.     private byte[] parameters;
  30.     /**
  31.      * 优先级
  32.      */
  33.     @TableField("priority")
  34.     private Integer priority;
  35.     /**
  36.      * 任务类型
  37.      */
  38.     @TableField("task_type")
  39.     private Integer taskType;
  40. }
复制代码
taskinfo_logs 任务日记表
              
                       
           实体类
  1. package com.heima.model.schedule.pojos;
  2. import java.io.Serializable;
  3. import java.util.Date;
  4. /**
  5. * <p>
  6. *
  7. * </p>
  8. *
  9. * @author itheima
  10. */
  11. @Data
  12. @TableName("taskinfo_logs")
  13. public class TaskinfoLogs implements Serializable {
  14.     private static final long serialVersionUID = 1L;
  15.     /**
  16.      * 任务id
  17.      */
  18.     @TableId(type = IdType.ID_WORKER)
  19.     private Long taskId;
  20.     /**
  21.      * 执行时间
  22.      */
  23.     @TableField("execute_time")
  24.     private Date executeTime;
  25.     /**
  26.      * 参数
  27.      */
  28.     @TableField("parameters")
  29.     private byte[] parameters;
  30.     /**
  31.      * 优先级
  32.      */
  33.     @TableField("priority")
  34.     private Integer priority;
  35.     /**
  36.      * 任务类型
  37.      */
  38.     @TableField("task_type")
  39.     private Integer taskType;
  40.     /**
  41.      * 版本号,用乐观锁
  42.      */
  43.     @Version
  44.     private Integer version;
  45.     /**
  46.      * 状态 0=int 1=EXECUTED 2=CANCELLED
  47.      */
  48.     @TableField("status")
  49.     private Integer status;
  50. }
复制代码
乐观锁/悲观锁

              
           悲观锁服从低;
              
           乐观锁支持:
  1. /**
  2.      * mybatis-plus乐观锁支持
  3.      * @return
  4.      */
  5. @Bean
  6. public MybatisPlusInterceptor optimisticLockerInterceptor(){
  7.     MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
  8.     interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());
  9.     return interceptor;
  10. }
复制代码
4.3)安装redis

  ①拉取镜像
  1. docker pull redis
复制代码
② 创建容器
  1. docker run -d --name redis --restart=always -p 6379:6379 redis --requirepass "leadnews"
复制代码
③链接测试
  打开资料中的Redis Desktop Manager,输入host、port、password链接测试
              
           能链接成功,即可
  4.4)项目集成redis

  ① 在项目导入redis相关依靠,已经完成
  1. <dependency>
  2.     <groupId>org.springframework.boot</groupId>
  3.     <artifactId>spring-boot-starter-data-redis</artifactId>
  4. </dependency>
  5. <!-- redis依赖commons-pool 这个依赖一定要添加 -->
  6. <dependency>
  7.     <groupId>org.apache.commons</groupId>
  8.     <artifactId>commons-pool2</artifactId>
  9. </dependency>
复制代码
② 在heima-leadnews-schedule中集成redis,添加以下nacos配置,链接上redis
  1. spring:
  2.   redis:
  3.     host: 192.168.200.130
  4.     password: leadnews
  5.     port: 6379
复制代码
③ 拷贝资料文件夹下的类:CacheService到heima-leadnews-common模块下,并添加主动配置
              
           ④:测试
  1. package com.heima.schedule.test;
  2. import java.util.Set;
  3. @SpringBootTest(classes = ScheduleApplication.class)
  4. @RunWith(SpringRunner.class)
  5. public class RedisTest {
  6.     @Autowired
  7.     private CacheService cacheService;
  8.     @Test
  9.     public void testList(){
  10.         //在list的左边添加元素
  11. //        cacheService.lLeftPush("list_001","hello,redis");
  12.         //在list的右边获取元素,并删除
  13.         String list_001 = cacheService.lRightPop("list_001");
  14.         System.out.println(list_001);
  15.     }
  16.     @Test
  17.     public void testZset(){
  18.         //添加数据到zset中  分值
  19.         /*cacheService.zAdd("zset_key_001","hello zset 001",1000);
  20.         cacheService.zAdd("zset_key_001","hello zset 002",8888);
  21.         cacheService.zAdd("zset_key_001","hello zset 003",7777);
  22.         cacheService.zAdd("zset_key_001","hello zset 004",999999);*/
  23.         //按照分值获取数据
  24.         Set<String> zset_key_001 = cacheService.zRangeByScore("zset_key_001", 0, 8888);
  25.         System.out.println(zset_key_001);
  26.     }
  27. }
复制代码
4.5)添加任务

  ①:拷贝mybatis-plus生成的文件,mapper
  ②:创建task类,用于接收添加任务的参数
  1. package com.heima.model.schedule.dtos;
  2. import lombok.Data;
  3. import java.io.Serializable;
  4. @Data
  5. public class Task implements Serializable {
  6.     /**
  7.      * 任务id
  8.      */
  9.     private Long taskId;
  10.     /**
  11.      * 类型
  12.      */
  13.     private Integer taskType;
  14.     /**
  15.      * 优先级
  16.      */
  17.     private Integer priority;
  18.     /**
  19.      * 执行id
  20.      */
  21.     private long executeTime;
  22.     /**
  23.      * task参数
  24.      */
  25.     private byte[] parameters;
  26.    
  27. }
复制代码
③:创建TaskService
  1. package com.heima.schedule.service;
  2. import com.heima.model.schedule.dtos.Task;
  3. /**
  4. * 对外访问接口
  5. */
  6. public interface TaskService {
  7.     /**
  8.      * 添加任务
  9.      * @param task   任务对象
  10.      * @return       任务id
  11.      */
  12.     public long addTask(Task task) ;
  13. }
复制代码
实现:
  1. package com.heima.schedule.service.impl;
  2. import java.util.Calendar;
  3. import java.util.Date;
  4. @Service
  5. @Transactional
  6. @Slf4j
  7. public class TaskServiceImpl implements TaskService {
  8.     /**
  9.      * 添加延迟任务
  10.      *
  11.      * @param task
  12.      * @return
  13.      */
  14.     @Override
  15.     public long addTask(Task task) {
  16.         //1.添加任务到数据库中
  17.         boolean success = addTaskToDb(task);
  18.         if (success) {
  19.             //2.添加任务到redis
  20.             addTaskToCache(task);
  21.         }
  22.         return task.getTaskId();
  23.     }
  24.     @Autowired
  25.     private CacheService cacheService;
  26.     /**
  27.      * 把任务添加到redis中
  28.      *
  29.      * @param task
  30.      */
  31.     private void addTaskToCache(Task task) {
  32.         String key = task.getTaskType() + "_" + task.getPriority();
  33.         //获取5分钟之后的时间  毫秒值
  34.         Calendar calendar = Calendar.getInstance();
  35.         calendar.add(Calendar.MINUTE, 5);
  36.         long nextScheduleTime = calendar.getTimeInMillis();
  37.         //2.1 如果任务的执行时间小于等于当前时间,存入list
  38.         if (task.getExecuteTime() <= System.currentTimeMillis()) {
  39.             cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task));
  40.         } else if (task.getExecuteTime() <= nextScheduleTime) {
  41.             //2.2 如果任务的执行时间大于当前时间 && 小于等于预设时间(未来5分钟) 存入zset中
  42.             cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime());
  43.         }
  44.     }
  45.     @Autowired
  46.     private TaskinfoMapper taskinfoMapper;
  47.     @Autowired
  48.     private TaskinfoLogsMapper taskinfoLogsMapper;
  49.     /**
  50.      * 添加任务到数据库中
  51.      *
  52.      * @param task
  53.      * @return
  54.      */
  55.     private boolean addTaskToDb(Task task) {
  56.         boolean flag = false;
  57.         try {
  58.             //保存任务表
  59.             Taskinfo taskinfo = new Taskinfo();
  60.             BeanUtils.copyProperties(task, taskinfo);
  61.             taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
  62.             taskinfoMapper.insert(taskinfo);
  63.             //设置taskID
  64.             task.setTaskId(taskinfo.getTaskId());
  65.             //保存任务日志数据
  66.             TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
  67.             BeanUtils.copyProperties(taskinfo, taskinfoLogs);
  68.             taskinfoLogs.setVersion(1);
  69.             taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
  70.             taskinfoLogsMapper.insert(taskinfoLogs);
  71.             flag = true;
  72.         } catch (Exception e) {
  73.             e.printStackTrace();
  74.         }
  75.         return flag;
  76.     }
  77. }
复制代码
ScheduleConstants常量类
  1. package com.heima.common.constants;
  2. public class ScheduleConstants {
  3.     //task状态
  4.     public static final int SCHEDULED=0;   //初始化状态
  5.     public static final int EXECUTED=1;       //已执行状态
  6.     public static final int CANCELLED=2;   //已取消状态
  7.     public static String FUTURE="future_";   //未来数据key前缀
  8.     public static String TOPIC="topic_";     //当前数据key前缀
  9. }
复制代码
④:测试
  4.6)取消任务

  在TaskService中添加方法
  1. /**
  2.      * 取消任务
  3.      * @param taskId        任务id
  4.      * @return              取消结果
  5.      */
  6. public boolean cancelTask(long taskId);
复制代码
实现
  1. /**
  2.      * 取消任务
  3.      * @param taskId
  4.      * @return
  5.      */
  6. @Override
  7. public boolean cancelTask(long taskId) {
  8.     boolean flag = false;
  9.     //删除任务,更新日志
  10.     Task task = updateDb(taskId,ScheduleConstants.EXECUTED);
  11.     //删除redis的数据
  12.     if(task != null){
  13.         removeTaskFromCache(task);
  14.         flag = true;
  15.     }
  16.     return false;
  17. }
  18. /**
  19.      * 删除redis中的任务数据
  20.      * @param task
  21.      */
  22. private void removeTaskFromCache(Task task) {
  23.     String key = task.getTaskType()+"_"+task.getPriority();
  24.     if(task.getExecuteTime()<=System.currentTimeMillis()){
  25.         cacheService.lRemove(ScheduleConstants.TOPIC+key,0,JSON.toJSONString(task));
  26.     }else {
  27.         cacheService.zRemove(ScheduleConstants.FUTURE+key, JSON.toJSONString(task));
  28.     }
  29. }
  30. /**
  31.      * 删除任务,更新任务日志状态
  32.      * @param taskId
  33.      * @param status
  34.      * @return
  35.      */
  36. private Task updateDb(long taskId, int status) {
  37.     Task task = null;
  38.     try {
  39.         //删除任务
  40.         taskinfoMapper.deleteById(taskId);
  41.         TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);
  42.         taskinfoLogs.setStatus(status);
  43.         taskinfoLogsMapper.updateById(taskinfoLogs);
  44.         task = new Task();
  45.         BeanUtils.copyProperties(taskinfoLogs,task);
  46.         task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());
  47.     }catch (Exception e){
  48.         log.error("task cancel exception taskid={}",taskId);
  49.     }
  50.     return task;
  51. }
复制代码
测试
  4.7)消耗任务

  在TaskService中添加方法
  1. /**
  2. * 按照类型和优先级来拉取任务
  3. * @param type
  4. * @param priority
  5. * @return
  6. */
  7. public Task poll(int type,int priority);
复制代码
实现
  1. /**
  2.      * 按照类型和优先级拉取任务
  3.      * @return
  4.      */
  5. @Override
  6. public Task poll(int type,int priority) {
  7.     Task task = null;
  8.     try {
  9.         String key = type+"_"+priority;
  10.         String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);
  11.         if(StringUtils.isNotBlank(task_json)){
  12.             task = JSON.parseObject(task_json, Task.class);
  13.             //更新数据库信息
  14.             updateDb(task.getTaskId(),ScheduleConstants.EXECUTED);
  15.         }
  16.     }catch (Exception e){
  17.         e.printStackTrace();
  18.         log.error("poll task exception");
  19.     }
  20.     return task;
  21. }
复制代码
4.8)未来数据定时刷新

  4.8.1)reids key值匹配

  方案1:keys 模糊匹配
  keys的模糊匹配功能很方便也很强盛,但是在生产情况需要慎用!开发中使用keys的模糊匹配却发现redis的CPU使用率极高,以是公司的redis生产情况将keys命令禁用了!redis是单线程,会被堵塞
              
           方案2:scan
  SCAN 命令是一个基于游标的迭代器,SCAN命令每次被调用之后, 都会向用户返回一个新的游标, 用户在下次迭代时需要使用这个新游标作为SCAN命令的游标参数, 以此来延续之前的迭代过程。
              
           代码案例:
  1. @Test
  2. public void testKeys(){
  3.     Set<String> keys = cacheService.keys("future_*");
  4.     System.out.println(keys);
  5.     Set<String> scan = cacheService.scan("future_*");
  6.     System.out.println(scan);
  7. }
复制代码
4.8.2)reids管道

  平凡redis客户端和服务器交互模式 性能很低
              
           Pipeline哀求模型
              
           官方测试结果数据对比
              
           测试案例对比:
  1. //耗时6151
  2. @Test
  3. public  void testPiple1(){
  4.     long start =System.currentTimeMillis();
  5.     for (int i = 0; i <10000 ; i++) {
  6.         Task task = new Task();
  7.         task.setTaskType(1001);
  8.         task.setPriority(1);
  9.         task.setExecuteTime(new Date().getTime());
  10.         cacheService.lLeftPush("1001_1", JSON.toJSONString(task));
  11.     }
  12.     System.out.println("耗时"+(System.currentTimeMillis()- start));
  13. }
  14. @Test
  15. public void testPiple2(){
  16.     long start  = System.currentTimeMillis();
  17.     //使用管道技术
  18.     List<Object> objectList = cacheService.getstringRedisTemplate().executePipelined(new RedisCallback<Object>() {
  19.         @Nullable
  20.         @Override
  21.         public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
  22.             for (int i = 0; i <10000 ; i++) {
  23.                 Task task = new Task();
  24.                 task.setTaskType(1001);
  25.                 task.setPriority(1);
  26.                 task.setExecuteTime(new Date().getTime());
  27.                 redisConnection.lPush("1001_1".getBytes(), JSON.toJSONString(task).getBytes());
  28.             }
  29.             return null;
  30.         }
  31.     });
  32.     System.out.println("使用管道技术执行10000次自增操作共耗时:"+(System.currentTimeMillis()-start)+"毫秒");
  33. }
复制代码
4.8.3)未来数据定时刷新-功能完成

              
           在TaskService中添加方法
  1. @Scheduled(cron = "0 */1 * * * ?")//定时 (每分钟执行一次
  2. //{秒数} {分钟} {小时} {日期} {月份} {星期} {年份(可为空)}
  3. public void refresh() {
  4.     System.out.println(System.currentTimeMillis() / 1000 + "执行了定时任务");
  5.     // 获取所有未来数据集合的key值
  6.     Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*
  7.     for (String futureKey : futureKeys) { // future_250_250
  8.         String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];
  9.         //获取该组key下当前需要消费的任务数据
  10.         Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
  11.         if (!tasks.isEmpty()) {
  12.             //将这些任务数据添加到消费者队列中
  13.             cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
  14.             System.out.println("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");
  15.         }
  16.     }
  17. }
复制代码
引导类中添加开启任务调度注解:@EnableScheduling
  4.9)分布式锁办理集群下的方法抢占执行

  4.9.1)问题描述

  启动两台heima-leadnews-schedule服务,每台服务都会去执行refresh定时任务方法
              
                       
                       
           4.9.2)分布式锁

  分布式锁:控制分布式体系有序的去对共享资源举行操纵,通过互斥来保证数据的一致性
  办理方案:
              
           4.9.3)redis分布式锁

  sexnx (SET if Not eXists)命令在指定的 key 不存在时,为 key 设置指定的值
              
           这种加锁的思路是,假如 key 不存在则为 key 设置 value,假如 key 已存在则 SETNX 命令不做任何操纵
  

  • 客户端A哀求服务器设置key的值,假如设置成功就表示加锁成功
  

  • 客户端B也去哀求服务器设置key的值,假如返回失败,那么就代表加锁失败
  

  • 客户端A执行代码完成,删除锁
  

  • 客户端B在等待一段时间后再去哀求设置key的值,设置成功
  

  • 客户端B执行代码完成,删除锁
  4.9.4)在工具类CacheService中添加方法

  1. /**
  2. * 加锁
  3. *
  4. * @param name
  5. * @param expire
  6. * @return
  7. */
  8. public String tryLock(String name, long expire) {
  9.     name = name + "_lock";
  10.     String token = UUID.randomUUID().toString();
  11.     RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();
  12.     RedisConnection conn = factory.getConnection();
  13.     try {
  14.         //参考redis命令:
  15.         //set key value [EX seconds] [PX milliseconds] [NX|XX]
  16.         Boolean result = conn.set(
  17.                 name.getBytes(),
  18.                 token.getBytes(),
  19.                 Expiration.from(expire, TimeUnit.MILLISECONDS),
  20.                 RedisStringCommands.SetOption.SET_IF_ABSENT //NX
  21.         );
  22.         if (result != null && result)
  23.             return token;
  24.     } finally {
  25.         RedisConnectionUtils.releaseConnection(conn, factory,false);
  26.     }
  27.     return null;
  28. }
复制代码
修改未来数据定时刷新的方法,如下:
  1. /**
  2. * 未来数据定时刷新
  3. */
  4. @Scheduled(cron = "0 */1 * * * ?")
  5. public void refresh(){
  6.     String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);
  7.     if(StringUtils.isNotBlank(token)){
  8.         log.info("未来数据定时刷新---定时任务");
  9.         //获取所有未来数据的集合key
  10.         Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
  11.         for (String futureKey : futureKeys) {//future_100_50
  12.             //获取当前数据的key  topic
  13.             String topicKey = ScheduleConstants.TOPIC+futureKey.split(ScheduleConstants.FUTURE)[1];
  14.             //按照key和分值查询符合条件的数据
  15.             Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
  16.             //同步数据
  17.             if(!tasks.isEmpty()){
  18.                 cacheService.refreshWithPipeline(futureKey,topicKey,tasks);
  19.                 log.info("成功的将"+futureKey+"刷新到了"+topicKey);
  20.             }
  21.         }
  22.     }
  23. }
复制代码
4.10)数据库同步到redis

              
         
  1. @Scheduled(cron = "0 */5 * * * ?")
  2. @PostConstruct
  3. public void reloadData() {
  4.     clearCache();
  5.     log.info("数据库数据同步到缓存");
  6.     Calendar calendar = Calendar.getInstance();
  7.     calendar.add(Calendar.MINUTE, 5);
  8.     //查看小于未来5分钟的所有任务
  9.     List<Taskinfo> allTasks = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime,calendar.getTime()));
  10.     if(allTasks != null && allTasks.size() > 0){
  11.         for (Taskinfo taskinfo : allTasks) {
  12.             Task task = new Task();
  13.             BeanUtils.copyProperties(taskinfo,task);
  14.             task.setExecuteTime(taskinfo.getExecuteTime().getTime());
  15.             addTaskToCache(task);
  16.         }
  17.     }
  18. }
  19. private void clearCache(){
  20.     // 删除缓存中未来数据集合和当前消费者队列的所有key
  21.     Set<String> futurekeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_
  22.     Set<String> topickeys = cacheService.scan(ScheduleConstants.TOPIC + "*");// topic_
  23.     cacheService.delete(futurekeys);
  24.     cacheService.delete(topickeys);
  25. }
复制代码
5)延迟队列办理精准时间发布文章

  5.1)延迟队列服务提供对外接口

  提供长途的feign接口,在heima-leadnews-feign-api编写类如下:
  1. package com.heima.apis.schedule;
  2. import org.springframework.web.bind.annotation.RequestBody;
  3. @FeignClient("leadnews-schedule")
  4. public interface IScheduleClient {
  5.     /**
  6.      * 添加任务
  7.      * @param task   任务对象
  8.      * @return       任务id
  9.      */
  10.     @PostMapping("/api/v1/task/add")
  11.     public ResponseResult  addTask(@RequestBody Task task);
  12.     /**
  13.      * 取消任务
  14.      * @param taskId        任务id
  15.      * @return              取消结果
  16.      */
  17.     @GetMapping("/api/v1/task/cancel/{taskId}")
  18.     public ResponseResult cancelTask(@PathVariable("taskId") long taskId);
  19.     /**
  20.      * 按照类型和优先级来拉取任务
  21.      * @param type
  22.      * @param priority
  23.      * @return
  24.      */
  25.     @GetMapping("/api/v1/task/poll/{type}/{priority}")
  26.     public ResponseResult poll(@PathVariable("type") int type,@PathVariable("priority")  int priority);
  27. }
复制代码
在heima-leadnews-schedule微服务下提供对应的实现
  1. package com.heima.schedule.feign;
  2. import org.springframework.web.bind.annotation.*;
  3. @RestController
  4. public class ScheduleClient  implements IScheduleClient {
  5.     @Autowired
  6.     private TaskService taskService;
  7.     /**
  8.      * 添加任务
  9.      * @param task 任务对象
  10.      * @return 任务id
  11.      */
  12.     @PostMapping("/api/v1/task/add")
  13.     @Override
  14.     public ResponseResult addTask(@RequestBody Task task) {
  15.         return ResponseResult.okResult(taskService.addTask(task));
  16.     }
  17.     /**
  18.      * 取消任务
  19.      * @param taskId 任务id
  20.      * @return 取消结果
  21.      */
  22.     @GetMapping("/api/v1/task/cancel/{taskId}")
  23.     @Override
  24.     public ResponseResult cancelTask(@PathVariable("taskId") long taskId) {
  25.         return ResponseResult.okResult(taskService.cancelTask(taskId));
  26.     }
  27.     /**
  28.      * 按照类型和优先级来拉取任务
  29.      * @param type
  30.      * @param priority
  31.      * @return
  32.      */
  33.     @GetMapping("/api/v1/task/poll/{type}/{priority}")
  34.     @Override
  35.     public ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority") int priority) {
  36.         return ResponseResult.okResult(taskService.poll(type,priority));
  37.     }
  38. }
复制代码
5.2)发布文章集成添加延迟队列接口

  在创建WmNewsTaskService
  1. package com.heima.wemedia.service;
  2. import com.heima.model.wemedia.pojos.WmNews;
  3. public interface WmNewsTaskService {
  4.     /**
  5.      * 添加任务到延迟队列中
  6.      * @param id  文章的id
  7.      * @param publishTime  发布的时间  可以做为任务的执行时间
  8.      */
  9.     public void addNewsToTask(Integer id, Date publishTime);
  10. }
复制代码
实现:
  1. package com.heima.wemedia.service.impl;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. @Slf4j
  5. public class WmNewsTaskServiceImpl  implements WmNewsTaskService {
  6.     @Autowired
  7.     private IScheduleClient scheduleClient;
  8.     /**
  9.      * 添加任务到延迟队列中
  10.      * @param id          文章的id
  11.      * @param publishTime 发布的时间  可以做为任务的执行时间
  12.      */
  13.     @Override
  14.     @Async
  15.     public void addNewsToTask(Integer id, Date publishTime) {
  16.         log.info("添加任务到延迟服务中----begin");
  17.         Task task = new Task();
  18.         task.setExecuteTime(publishTime.getTime());
  19.         task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());
  20.         task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
  21.         WmNews wmNews = new WmNews();
  22.         wmNews.setId(id);
  23.         task.setParameters(ProtostuffUtil.serialize(wmNews));
  24.         scheduleClient.addTask(task);
  25.         log.info("添加任务到延迟服务中----end");
  26.     }
  27.    
  28. }
复制代码
罗列类:
  1. package com.heima.model.common.enums;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Getter;
  4. @Getter
  5. @AllArgsConstructor
  6. public enum TaskTypeEnum {
  7.     NEWS_SCAN_TIME(1001, 1,"文章定时审核"),
  8.     REMOTEERROR(1002, 2,"第三方接口调用失败,重试");
  9.     private final int taskType; //对应具体业务
  10.     private final int priority; //业务不同级别
  11.     private final String desc; //描述信息
  12. }
复制代码
序列化工具对比

  

  • JdkSerialize:java内置的序列化能将实现了Serilazable接口的对象举行序列化和反序列化, ObjectOutputStream的writeObject()方法可序列化对象生成字节数组
  

  • Protostuff:google开源的protostuff采用更为紧凑的二进制数组,表现更加优异,然后使用protostuff的编译工具生成pojo
  拷贝资料中的两个类到heima-leadnews-utils下
  Protostuff需要引导依靠:
  1. <dependency>
  2.     <groupId>io.protostuff</groupId>
  3.     <artifactId>protostuff-core</artifactId>
  4.     <version>1.6.0</version>
  5. </dependency>
  6. <dependency>
  7.     <groupId>io.protostuff</groupId>
  8.     <artifactId>protostuff-runtime</artifactId>
  9.     <version>1.6.0</version>
  10. </dependency>
复制代码
修改发布文章代码:
  把之前的异步调用修改为调用延迟任务
  1. @Autowired
  2. private WmNewsTaskService wmNewsTaskService;
  3. /**
  4.      * 发布修改文章或保存为草稿
  5.      * @param dto
  6.      * @return
  7.      */
  8. @Override
  9. public ResponseResult submitNews(WmNewsDto dto) {
  10.     //0.条件判断
  11.     if(dto == null || dto.getContent() == null){
  12.         return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
  13.     }
  14.     //1.保存或修改文章
  15.     WmNews wmNews = new WmNews();
  16.     //属性拷贝 属性名词和类型相同才能拷贝
  17.     BeanUtils.copyProperties(dto,wmNews);
  18.     //封面图片  list---> string
  19.     if(dto.getImages() != null && dto.getImages().size() > 0){
  20.         //[1dddfsd.jpg,sdlfjldk.jpg]-->   1dddfsd.jpg,sdlfjldk.jpg
  21.         String imageStr = StringUtils.join(dto.getImages(), ",");
  22.         wmNews.setImages(imageStr);
  23.     }
  24.     //如果当前封面类型为自动 -1
  25.     if(dto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)){
  26.         wmNews.setType(null);
  27.     }
  28.     saveOrUpdateWmNews(wmNews);
  29.     //2.判断是否为草稿  如果为草稿结束当前方法
  30.     if(dto.getStatus().equals(WmNews.Status.NORMAL.getCode())){
  31.         return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
  32.     }
  33.     //3.不是草稿,保存文章内容图片与素材的关系
  34.     //获取到文章内容中的图片信息
  35.     List<String> materials =  ectractUrlInfo(dto.getContent());
  36.     saveRelativeInfoForContent(materials,wmNews.getId());
  37.     //4.不是草稿,保存文章封面图片与素材的关系,如果当前布局是自动,需要匹配封面图片
  38.     saveRelativeInfoForCover(dto,wmNews,materials);
  39.     //审核文章
  40.     //        wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
  41.     wmNewsTaskService.addNewsToTask(wmNews.getId(),wmNews.getPublishTime());
  42.     return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
  43. }
复制代码
5.3)消耗任务举行审核文章

  WmNewsTaskService中添加方法
  1. /**
  2. * 消费延迟队列数据
  3. */
  4. public void scanNewsByTask();
复制代码
实现
  1. @Autowired
  2. private WmNewsAutoScanServiceImpl wmNewsAutoScanService;
  3. /**
  4.      * 消费延迟队列数据
  5.      */
  6. @Scheduled(fixedRate = 1000)
  7. @Override
  8. @SneakyThrows
  9. public void scanNewsByTask() {
  10.     log.info("文章审核---消费任务执行---begin---");
  11.     ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
  12.     if(responseResult.getCode().equals(200) && responseResult.getData() != null){
  13.         String json_str = JSON.toJSONString(responseResult.getData());
  14.         Task task = JSON.parseObject(json_str, Task.class);
  15.         byte[] parameters = task.getParameters();
  16.         WmNews wmNews = ProtostuffUtil.deserialize(parameters, WmNews.class);
  17.         System.out.println(wmNews.getId()+"-----------");
  18.         wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
  19.     }
  20.     log.info("文章审核---消费任务执行---end---");
  21. }
复制代码
在WemediaApplication自媒体的引导类中添加开启任务调度注解@EnableScheduling
  
  06kafka及异步关照文章上下架

              
           1)自媒体文章上下架

  需求分析
              
                       
           2)kafka概述

  消息中心件对比
              
           消息中心件对比-选择建议

     
消息中心件
建议
Kafka
追求高吞吐量,得当产生大量数据的互联网服务的数据收集业务
RocketMQ
可靠性要求很高的金互联网范畴,稳定性高,经历了多次阿里双11考验
RabbitMQ
性能较好,社区活跃度高,数据量没有那么大,优先选择功能比较完备的RabbitMQ
    kafka介绍
  Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息通报体系
  kafka官网:Apache Kafka
              
           kafka介绍-名词表明

              
           
  

  • producer:发布消息的对象称之为主题生产者(Kafka topic producer)
  

  • topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
  

  • consumer:订阅消息并处剃头布的消息的对象称之为主题消耗者(consumers)
  

  • broker:已发布的消息生存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个署理(Broker)。 消耗者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消耗这些已发布的消息。
  3)kafka安装配置

  Kafka对于zookeeper是强依靠,生存kafka相关的节点数据,以是安装Kafka之前必须先安装zookeeper
  

  • Docker安装zookeeper
  下载镜像:
  1. docker pull zookeeper:3.4.14
复制代码
创建容器
  1. docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
复制代码


  • Docker安装kafka
  下载镜像:
  1. docker pull wurstmeister/kafka:2.12-2.3.1
复制代码
创建容器
  1. docker run -d --name kafka \
  2. --env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
  3. --env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
  4. --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
  5. --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
  6. --env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
  7. --net=host wurstmeister/kafka:2.12-2.3.1
复制代码
            
           云主机无法使用--net
  4)kafka入门

              
           

  • 生产者发送消息,多个消耗者只能有一个消耗者接收到消息
  

  • 生产者发送消息,多个消耗者都可以接收到消息
  (1)创建kafka-demo项目,导入依靠
  1. <dependency>
  2.     <groupId>org.apache.kafka</groupId>
  3.     <artifactId>kafka-clients</artifactId>
  4. </dependency>
复制代码
(2)生产者发送消息
  1. package com.heima.kafka.sample;
  2. import java.util.Properties;
  3. /**
  4. * 生产者
  5. */
  6. public class ProducerQuickStart {
  7.     public static void main(String[] args) {
  8.         //1.kafka的配置信息
  9.         Properties properties = new Properties();
  10.         //kafka的连接地址
  11.         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
  12.         //发送失败,失败的重试次数
  13.         properties.put(ProducerConfig.RETRIES_CONFIG,5);
  14.         //消息key的序列化器
  15.         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
  16.         //消息value的序列化器
  17.         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
  18.         //2.生产者对象
  19.         KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
  20.         //封装发送的消息
  21.         ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");
  22.         //3.发送消息
  23.         producer.send(record);
  24.         //4.关闭消息通道,必须关闭,否则消息发送不成功
  25.         producer.close();
  26.     }
  27. }
复制代码
(3)消耗者接收消息
  1. package com.heima.kafka.sample;
  2. import java.util.Properties;
  3. /**
  4. * 消费者
  5. */
  6. public class ConsumerQuickStart {
  7.     public static void main(String[] args) {
  8.         //1.添加kafka的配置信息
  9.         Properties properties = new Properties();
  10.         //kafka的连接地址
  11.         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
  12.         //消费者组
  13.         properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
  14.         //消息的反序列化器
  15.         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  16.         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  17.         //2.消费者对象
  18.         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
  19.         //3.订阅主题
  20.         consumer.subscribe(Collections.singletonList("itheima-topic"));
  21.         //当前线程一直处于监听状态
  22.         while (true) {
  23.             //4.获取消息
  24.             ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
  25.             for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  26.                 System.out.println(consumerRecord.key());
  27.                 System.out.println(consumerRecord.value());
  28.             }
  29.         }
  30.     }
  31. }
复制代码
总结
  

  • 生产者发送消息,多个消耗者订阅同一个主题,只能有一个消耗者收到消息(一对一)同一个组
  

  • 生产者发送消息,多个消耗者订阅同一个主题,所有消耗者都能收到消息(一对多)多个组
  分区机制—topic剖析

              
                       
                       
           5)kafka高可用计划

  5.1)集群

              
           

  • Kafka 的服务器端由被称为 Broker 的服务进程构成,即一个 Kafka 集群由多个 Broker 组成
  

  • 这样假如集群中某一台呆板宕机,其他呆板上的 Broker 也依然可以大概对外提供服务。这其实就是 Kafka 提供高可用的本领之一
  5.2)备份机制(Replication)

              
           Kafka 中消息的备份又叫做 副本(Replica)
  Kafka 定义了两类副本:
  

  • 领导者副本(Leader Replica)
  

  • 跟随者副本(Follower Replica)
  备份机制—同步方式
              
           ISR(in-sync replica)需要同步复制生存的follower
  假如leader失效后,需要选出新的leader,推举的原则如下:
  第一:推举时优先从ISR中选定,因为这个列表中follower的数据是与leader同步
  第二:假如ISR列表中的follower都不行了,就只能从其他follower中选取
  
  极度情况,就是所有副本都失效了,这时有两种方案
  第一:等待ISR中的一个活过来,选为Leader,数据可靠,但活过来的时间不确定
  第二:选择第一个活过来的Replication,不肯定是ISR中的,选为leader,以最快速率规复可用性,但数据不肯定完整
  6)kafka生产者详解

  6.1)发送范例

  

  • 同步发送
  使用send()方法发送,它会返回一个Future对象,调用get()方法举行等待,就可以知道消息是否发送成功
  1. RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();
  2. System.out.println(recordMetadata.offset());
复制代码


  • 异步发送
  调用send()方法,并指定一个回调函数,服务器在返回相应时调用函数
  1. //异步消息发送
  2. producer.send(kvProducerRecord, new Callback() {
  3.     @Override
  4.     public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  5.         if(e != null){
  6.             System.out.println("记录异常信息到日志表中");
  7.         }
  8.         System.out.println(recordMetadata.offset());
  9.     }
  10. });
复制代码
6.2)参数详解

  

  • ack确认机制
              
           
  代码的配置方式:
  1. //ack配置  消息确认机制
  2. prop.put(ProducerConfig.ACKS_CONFIG,"all");
复制代码
参数的选择说明
     
确认机制
说明
acks=0
生产者在成功写入消息之前不会等待(不需要)任何来自服务器的相应,消息有丢失的风险,但是速率最快
acks=1(默认值)
只要集群Leader节点收到消息,生产者就会收到一个来自服务器的成功相应
acks=all
只有当所有加入赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功相应
   

  • retries 重试次数
              
           生产者从服务器收到的错误有可能是暂时性错误,在这种情况下,retries参数的值决定了生产者可以重发消息的次数,假如到达这个次数,生产者会放弃重试返回错误,默认情况下,生产者会在每次重试之间等待100ms
  代码中配置方式:
  1. //重试次数
  2. prop.put(ProducerConfig.RETRIES_CONFIG,10);
复制代码


  • 消息压缩
  默认情况下, 消息发送时不会被压缩。
  代码中配置方式:
  1. //数据压缩
  2. prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
复制代码
   
压缩算法
说明
snappy
占用较少的 CPU, 却能提供较好的性能和相称可观的压缩比, 假如看重性能和网络带宽,建议采用
lz4
占用较少的 CPU, 压缩和解压缩速率较快,压缩比也很客观
gzip
占用较多的 CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法
    使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。
  7)kafka消耗者详解

  7.1)消耗者组

              
           

  • 消耗者组(Consumer Group) :指的就是由一个或多个消耗者组成的群体
  

  • 一个发布在Topic上消息被分发给此消耗者组中的一个消耗者
  

  • 所有的消耗者都在一个组中,那么这就变成了queue模型 消息队列 一对一
  

  • 所有的消耗者都在不同的组中,那么就完全变成了发布-订阅模型 一对多消耗者
  7.2)消息有序性

  应用场景:
  

  • 即时消息中的单对单聊天和群聊,保证发送方消息发送顺序与接收方的顺序一致
  

  • 充值转账两个渠道在同一个时间举行余额变更,短信关照必须要有顺序
              
           topic分区中消息只能由消耗者组中的唯逐一个消耗者处理,以是消息肯定是按照先后顺序举行处理的。但是它也仅仅是保证Topic的一个分区顺序处理不能保证跨分区的消息先后处理顺序。 以是,假如你想要顺序的处理Topic的所有消息,那就只提供一个分区
  7.3)提交和偏移量

  kafka不会像其他JMS队列那样需要得到消耗者简直认,消耗者可以使用kafka来追踪消息在分区的位置(偏移量)
  消耗者会往一个叫做_consumer_offset的特殊主题发送消息,消息里包含了每个分区的偏移量。假如消费者发生瓦解或有新的消耗者加入群组,就会触发再均衡
              
           正常的情况
              
           假如消耗者2挂掉以后,会发生再均衡,消耗者2负责的分区会被其他消耗者举行消耗
  再均衡后不可避免会出现一些问题
  问题一:
              
           假如提交偏移量2小于客户端处理的末了一个消息10的偏移量,那么处于两个偏移量之间的消息就会被重复处理
  问题二:
              
           假如提交的偏移量5大于客户端末了一个消息11的偏移量,那么处于两个偏移量之间的消息将会丢失
  假如想要办理这些问题,还要知道现在kafka提交偏移量的方式
  提交偏移量的方式有两种,分别是主动提交偏移量和手动提交
  

  • 主动提交偏移量
  当enable.auto.commit被设置为true提交方式就是让消耗者主动提交偏移量,每隔5秒消耗者会主动把从poll()方法接收的最大偏移量提交上去
  

  • 手动提交 ,当enable.auto.commit被设置为false可以有以下三种提交方式
  

  • 提交当前偏移量(同步提交)
  

  • 异步提交
  

  • 同步和异步组合提交
  1.提交当前偏移量(同步提交)

  把enable.auto.commit设置为false,让应用程序决定何时提交偏移量。使用commitSync()提交偏移量,commitSync()将会提交poll返回的最新的偏移量,以是在处理完所有记录后要确保调用了commitSync()方法。否则还是会有消息丢失的风险。
  只要没有发生不可规复的错误,commitSync()方法会一直尝试直至提交成功,假如提交失败也可以记录到错误日记里。
  1. while (true){
  2.     ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  3.     for (ConsumerRecord<String, String> record : records) {
  4.         System.out.println(record.value());
  5.         System.out.println(record.key());
  6.         try {
  7.             consumer.commitSync();//同步提交当前最新的偏移量
  8.         }catch (CommitFailedException e){
  9.             System.out.println("记录提交失败的异常:"+e);
  10.         }
  11.     }
  12. }
复制代码
2.异步提交

  手动提交有一个缺点,那就是当发起提交调用时应用会壅闭。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和主动提交一样)。别的一个办理办法是,使用异步提交的API :commitAsync()
  1. while (true){
  2.     ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  3.     for (ConsumerRecord<String, String> record : records) {
  4.         System.out.println(record.value());
  5.         System.out.println(record.key());
  6.     }
  7.     consumer.commitAsync(new OffsetCommitCallback() {
  8.         @Override
  9.         public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
  10.             if(e!=null){
  11.                 System.out.println("记录错误的提交偏移量:"+ map+",异常信息"+e);
  12.             }
  13.         }
  14.     });
  15. }
复制代码
3.同步和异步组合提交

  异步提交也有个缺点,那就是假如服务器返回提交失败,异步提交不会举行重试
  相比较起来,同步提交会举行重试直到成功大概末了抛出异常给应用。异步提交没有实现重试是因为,假如同时存在多个异步提交,举行重试可能会导致位移覆盖
  举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA举行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消耗
  1. try {
  2.     while (true){
  3.         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  4.         for (ConsumerRecord<String, String> record : records) {
  5.             System.out.println(record.value());
  6.             System.out.println(record.key());
  7.         }
  8.         consumer.commitAsync();
  9.     }
  10. }catch (Exception e){+
  11.     e.printStackTrace();
  12.     System.out.println("记录错误信息:"+e);
  13. }finally {
  14.     try {
  15.         consumer.commitSync();
  16.     }finally {
  17.         consumer.close();
  18.     }
  19. }
复制代码
8)springboot集成kafka

  8.1)入门

  1.导入spring-kafka依靠信息
  1. <dependencies>
  2.     <dependency>
  3.         <groupId>org.springframework.boot</groupId>
  4.         <artifactId>spring-boot-starter-web</artifactId>
  5.     </dependency>
  6.     <!-- kafkfa -->
  7.     <dependency>
  8.         <groupId>org.springframework.kafka</groupId>
  9.         <artifactId>spring-kafka</artifactId>
  10.         <exclusions>
  11.             <exclusion>
  12.                 <groupId>org.apache.kafka</groupId>
  13.                 <artifactId>kafka-clients</artifactId>
  14.             </exclusion>
  15.         </exclusions>
  16.     </dependency>
  17.     <dependency>
  18.         <groupId>org.apache.kafka</groupId>
  19.         <artifactId>kafka-clients</artifactId>
  20.     </dependency>
  21.     <dependency>
  22.         <groupId>com.alibaba</groupId>
  23.         <artifactId>fastjson</artifactId>
  24.     </dependency>
  25. </dependencies>
复制代码
2.在resources下创建文件application.yml
  1. server:
  2.   port: 9991
  3. spring:
  4.   application:
  5.     name: kafka-demo
  6.   kafka:
  7.     bootstrap-servers: 192.168.200.130:9092
  8.     producer:
  9.       retries: 10
  10.       key-serializer: org.apache.kafka.common.serialization.StringSerializer
  11.       value-serializer: org.apache.kafka.common.serialization.StringSerializer
  12.     consumer:
  13.       group-id: ${spring.application.name}-test
  14.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  15.       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
复制代码
3.消息生产者
  1. package com.heima.kafka.controller;
  2. import org.springframework.web.bind.annotation.RestController;
  3. @RestController
  4. public class HelloController {
  5.     @Autowired
  6.     private KafkaTemplate<String,String> kafkaTemplate;
  7.     @GetMapping("/hello")
  8.     public String hello(){
  9.         kafkaTemplate.send("itcast-topic","黑马程序员");
  10.         return "ok";
  11.     }
  12. }
复制代码
4.消息消耗者
  1. package com.heima.kafka.listener;
  2. import org.springframework.kafka.annotation.KafkaListener;
  3. import org.springframework.stereotype.Component;
  4. import org.springframework.util.StringUtils;
  5. @Component
  6. public class HelloListener {
  7.     @KafkaListener(topics = "itcast-topic")
  8.     public void onMessage(String message){
  9.         if(!StringUtils.isEmpty(message)){
  10.             System.out.println(message);
  11.         }
  12.     }
  13. }
复制代码
8.2)通报消息为对象

  现在springboot整合后的kafka,因为序列化器是StringSerializer,这个时间假如需要通报对象可以有两种方式
  方式一:可以自定义序列化器,对象范例浩繁,这种方式通用性不强,本章节不介绍
  方式二:可以把要通报的对象举行转json字符串,接收消息后再转为对象即可,本项目采用这种方式
  

  • 发送消息
  1. @GetMapping("/hello")
  2. public String hello(){
  3.     User user = new User();
  4.     user.setUsername("xiaowang");
  5.     user.setAge(18);
  6.    
  7.     kafkaTemplate.send("user-topic", JSON.toJSONString(user));
  8.     return "ok";
  9. }
复制代码


  • 接收消息
  1. package com.heima.kafka.listener;
  2. import org.springframework.util.StringUtils;
  3. @Component
  4. public class HelloListener {
  5.     @KafkaListener(topics = "user-topic")
  6.     public void onMessage(String message){
  7.         if(!StringUtils.isEmpty(message)){
  8.             User user = JSON.parseObject(message, User.class);
  9.             System.out.println(user);
  10.         }
  11.     }
  12. }
复制代码
9)自媒体文章上下架功能完成

  9.1)需求分析

              
           

  • 已发表且已上架的文章可以下架
  

  • 已发表且已下架的文章可以上架
  9.2)流程说明

              
           9.3)接口定义

     

说明
接口路径
/api/v1/news/down_or_up
哀求方式
POST
参数
DTO
相应结果
ResponseResult
    DTO
  1. @Data
  2. public class WmNewsDto {
  3.    
  4.     private Integer id;
  5.     /**
  6.     * 是否上架  0 下架  1 上架
  7.     */
  8.     private Short enable;
  9.                        
  10. }
复制代码
ResponseResult
              
           9.4)自媒体文章上下架-功能实现

  9.4.1)接口定义
  在heima-leadnews-wemedia工程下的WmNewsController新增方法
  1. @PostMapping("/down_or_up")
  2. public ResponseResult downOrUp(@RequestBody WmNewsDto dto){
  3.     return null;
  4. }
复制代码
在WmNewsDto中新增enable属性 ,完整的代码如下:
  1. package com.heima.model.wemedia.dtos;
  2. import lombok.Data;
  3. import java.util.Date;
  4. import java.util.List;
  5. @Data
  6. public class WmNewsDto {
  7.    
  8.     private Integer id;
  9.      /**
  10.      * 标题
  11.      */
  12.     private String title;
  13.      /**
  14.      * 频道id
  15.      */
  16.     private Integer channelId;
  17.      /**
  18.      * 标签
  19.      */
  20.     private String labels;
  21.      /**
  22.      * 发布时间
  23.      */
  24.     private Date publishTime;
  25.      /**
  26.      * 文章内容
  27.      */
  28.     private String content;
  29.      /**
  30.      * 文章封面类型  0 无图 1 单图 3 多图 -1 自动
  31.      */
  32.     private Short type;
  33.      /**
  34.      * 提交时间
  35.      */
  36.     private Date submitedTime;
  37.      /**
  38.      * 状态 提交为1  草稿为0
  39.      */
  40.     private Short status;
  41.      
  42.      /**
  43.      * 封面图片列表 多张图以逗号隔开
  44.      */
  45.     private List<String> images;
  46.     /**
  47.      * 上下架 0 下架  1 上架
  48.      */
  49.     private Short enable;
  50. }
复制代码
9.4.2)业务层编写
  在WmNewsService新增方法
  1. /**
  2. * 文章的上下架
  3. * @param dto
  4. * @return
  5. */
  6. public ResponseResult downOrUp(WmNewsDto dto);
复制代码
实现方法
  1. /**
  2. * 文章的上下架
  3. * @param dto
  4. * @return
  5. */
  6. @Override
  7. public ResponseResult downOrUp(WmNewsDto dto) {
  8.     //1.检查参数
  9.     if(dto.getId() == null){
  10.         return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
  11.     }
  12.     //2.查询文章
  13.     WmNews wmNews = getById(dto.getId());
  14.     if(wmNews == null){
  15.         return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章不存在");
  16.     }
  17.     //3.判断文章是否已发布
  18.     if(!wmNews.getStatus().equals(WmNews.Status.PUBLISHED.getCode())){
  19.         return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"当前文章不是发布状态,不能上下架");
  20.     }
  21.     //4.修改文章enable
  22.     if(dto.getEnable() != null && dto.getEnable() > -1 && dto.getEnable() < 2){
  23.         update(Wrappers.<WmNews>lambdaUpdate().set(WmNews::getEnable,dto.getEnable())
  24.                 .eq(WmNews::getId,wmNews.getId()));
  25.     }
  26.     return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
  27. }
复制代码
9.4.3)控制器
  1. @PostMapping("/down_or_up")
  2. public ResponseResult downOrUp(@RequestBody WmNewsDto dto){
  3.     return wmNewsService.downOrUp(dto);
  4. }
复制代码
9.4.4)测试
  9.5)消息关照article端文章上下架

  9.5.1)在heima-leadnews-common模块下导入kafka依靠
  1. <!-- kafkfa --><dependency>    <groupId>org.springframework.kafka</groupId>    <artifactId>spring-kafka</artifactId></dependency><dependency>
  2.     <groupId>org.apache.kafka</groupId>
  3.     <artifactId>kafka-clients</artifactId>
  4. </dependency>
复制代码
9.5.2)在自媒体端的nacos配置中心配置kafka的生产者
  1. spring:
  2.   kafka:
  3.     bootstrap-servers: 192.168.200.130:9092
  4.     producer:
  5.       retries: 10
  6.       key-serializer: org.apache.kafka.common.serialization.StringSerializer
  7.       value-serializer: org.apache.kafka.common.serialization.StringSerializer
复制代码
9.5.3)在自媒体端文章上下架后发送消息
  1. //发送消息,通知article端修改文章配置
  2. if(wmNews.getArticleId() != null){
  3.     Map<String,Object> map = new HashMap<>();
  4.     map.put("articleId",wmNews.getArticleId());
  5.     map.put("enable",dto.getEnable());
  6.     kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map));
  7. }
复制代码
常量类:
  1. public class WmNewsMessageConstants {
  2.     public static final String WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic";
  3. }
复制代码
9.5.4)在article端的nacos配置中心配置kafka的消耗者
  1. spring:
  2.   kafka:
  3.     bootstrap-servers: 192.168.200.130:9092
  4.     consumer:
  5.       group-id: ${spring.application.name}
  6.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  7.       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
复制代码
9.5.5)在article端编写监听,接收数据
  1. package com.heima.article.listener;
  2. import com.alibaba.fastjson.JSON;
  3. import com.heima.article.service.ApArticleConfigService;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.apache.commons.lang3.StringUtils;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.kafka.annotation.KafkaListener;
  8. import org.springframework.stereotype.Component;
  9. import java.util.Map;
  10. @Component
  11. @Slf4j
  12. public class ArtilceIsDownListener {
  13.     @Autowired
  14.     private ApArticleConfigService apArticleConfigService;
  15.     @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
  16.     public void onMessage(String message){
  17.         if(StringUtils.isNotBlank(message)){
  18.             Map map = JSON.parseObject(message, Map.class);
  19.             apArticleConfigService.updateByMap(map);
  20.             log.info("article端文章配置修改,articleId={}",map.get("articleId"));
  21.         }
  22.     }
  23. }
复制代码
9.5.6)修改ap_article_config表的数据
  新建ApArticleConfigService
  1. package com.heima.article.service;
  2. import com.baomidou.mybatisplus.extension.service.IService;
  3. import com.heima.model.article.pojos.ApArticleConfig;
  4. import java.util.Map;
  5. public interface ApArticleConfigService extends IService<ApArticleConfig> {
  6.     /**
  7.      * 修改文章配置
  8.      * @param map
  9.      */
  10.     public void updateByMap(Map map);
  11. }
复制代码
实现类:
  1. package com.heima.article.service.impl;
  2. import com.baomidou.mybatisplus.core.toolkit.Wrappers;
  3. import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
  4. import com.heima.article.mapper.ApArticleConfigMapper;
  5. import com.heima.article.service.ApArticleConfigService;
  6. import com.heima.model.article.pojos.ApArticleConfig;
  7. import lombok.extern.slf4j.Slf4j;
  8. import org.springframework.stereotype.Service;
  9. import org.springframework.transaction.annotation.Transactional;
  10. import java.util.Map;
  11. @Service
  12. @Slf4j
  13. @Transactional
  14. public class ApArticleConfigServiceImpl extends ServiceImpl<ApArticleConfigMapper, ApArticleConfig> implements ApArticleConfigService {
  15.     /**
  16.      * 修改文章配置
  17.      * @param map
  18.      */
  19.     @Override
  20.     public void updateByMap(Map map) {
  21.         //0 下架 1 上架
  22.         Object enable = map.get("enable");
  23.         boolean isDown = true;
  24.         if(enable.equals(1)){
  25.             isDown = false;
  26.         }
  27.         //修改文章配置
  28.         update(Wrappers.<ApArticleConfig>lambdaUpdate().eq(ApArticleConfig::getArticleId,map.get("articleId")).set(ApArticleConfig::getIsDown,isDown));
  29.     }
  30. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

十念

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表