十念 发表于 2024-6-23 16:01:40

《黑马头条》 内容安全 主动审核 feign 延迟任务精准发布 kafka



目录
《黑马头条》SpringBoot+SpringCloud+ Nacos等企业级微服务架构项目_黑马头条项目_软工菜鸡的博客-CSDN博客
04自媒体文章-主动审核
1)自媒体文章主动审核流程
2)内容安全第三方接口
2.1)概述
2.2)预备工作
2.3)文本内容审核接口
2.4)图片审核接口
2.5)项目集成
3)app端文章生存接口
3.1)表结构说明
3.2)分布式id
分布式id-技能选型
3.3)思路分析
3.4)feign接口
4)自媒体文章主动审核功能实现
4.1)表结构说明
4.2)实现
4.3)单元测试
4.4)feign长途接口调用方式
4.5)服务降级处理
5)发布文章提交审核集成
5.1)同步调用与异步调用
5.2)Springboot集成异步线程调用
6)文章审核功能-综合测试
6.1)服务启动列表
6.2)测试情况列表
7)新需求-自管理敏感词
7.1)需求分析
7.2)敏感词-过滤
7.3)DFA实现原理
7.4)自管理敏感词集成到文章审核中
测试了一下 源码不能检测 标题的敏感词汇;加了个这: wmNews.getTitle()+
8)新需求-图片识别文字审核敏感词
8.1)需求分析
8.2)图片文字识别
8 .3)Tess4j案例
8.4)管理敏感词和图片文字识别集成到文章审核
9)文章详情-静态文件生成
9.1)思路分析
9.2)实现步骤
05延迟任务精准发布文章
1)文章定时发布
2)延迟任务概述
2.1)什么是延迟任务
2.2)技能对比
2.2.1)DelayQueue
2.2.2)RabbitMQ实现延迟任务
2.2.3)redis实现
3)redis实现延迟任务
锐评:完全为了学list zset而编出来的场景,现实工作中延迟队列要计划成这样只能说太蠢了
4)延迟任务服务实现
4.1)搭建heima-leadnews-schedule模块
4.2)数据库预备
乐观锁/悲观锁
4.3)安装redis
4.4)项目集成redis
4.5)添加任务
4.6)取消任务
4.7)消耗任务
4.8)未来数据定时刷新
4.8.1)reids key值匹配
4.8.2)reids管道
4.8.3)未来数据定时刷新-功能完成
4.9)分布式锁办理集群下的方法抢占执行
4.9.1)问题描述
4.9.2)分布式锁
4.9.3)redis分布式锁
4.9.4)在工具类CacheService中添加方法
4.10)数据库同步到redis
5)延迟队列办理精准时间发布文章
5.1)延迟队列服务提供对外接口
5.2)发布文章集成添加延迟队列接口
序列化工具对比
5.3)消耗任务举行审核文章
06kafka及异步关照文章上下架
1)自媒体文章上下架
2)kafka概述
消息中心件对比-选择建议
kafka介绍-名词表明
3)kafka安装配置
4)kafka入门
分区机制—topic剖析
5)kafka高可用计划
5.1)集群
5.2)备份机制(Replication)
6)kafka生产者详解
6.1)发送范例
6.2)参数详解
ack确认机制
retries 重试次数
消息压缩
7)kafka消耗者详解
7.1)消耗者组
7.2)消息有序性
7.3)提交和偏移量
1.提交当前偏移量(同步提交)
2.异步提交
3.同步和异步组合提交
8)springboot集成kafka
8.1)入门
8.2)通报消息为对象
9)自媒体文章上下架功能完成
9.1)需求分析
9.2)流程说明
9.3)接口定义
9.4)自媒体文章上下架-功能实现
9.5)消息关照article端文章上下架




04自媒体文章-主动审核

            https://img-blog.csdnimg.cn/img_convert/9c6ae6bec7d841ccb1d65bf4d1973db8.png         1)自媒体文章主动审核流程

            https://img-blog.csdnimg.cn/img_convert/31e726ad421f462ab111b05449eabff7.png            1 自媒体端发布文章后,开始审核文章   
2 审核的重要是审核文章的   内容(文本内容和图片)   
3 借助   第三方提供的接口审核文本   
4 借助第三方提供的接口审核图片,由于图片存储到minIO中,需要先下载才气审核   
5 假如审核失败,则需要修改自媒体文章的状态,status:2 审核失败 status:3 转到人工审核   
6 假如审核成功,则需要在文章微服务中创建app端需要的文章    2)内容安全第三方接口

2.1)概述

内容安全是识别服务,支持对图片、视频、文本、语音等对象多样化场景检测,有效降低内容违规风险
现在很多平台都支持内容检测,如阿里云、腾讯云、百度AI、网易云等国内大型互联网公司都对外提供了API。
按照性能和收费来看,黑马头条项目使用的就是阿里云的内容安全接口,使用到了图片和文本的审核。
阿里云收费尺度:https://www.aliyun.com/price/product/?spm=a2c4g.11186623.2.10.4146401eg5oeu8#/lvwang/detail
2.2)预备工作

您在使用内容检测API之前,需要先注册阿里云账号,添加Access Key并签约云盾内容安全。
操纵步骤

[*] 前去阿里云官网注册账号。假如已有注册账号,请跳过此步骤。
进入阿里云首页后,假如没有阿里云的账户需要先举行注册,才可以举行登录。由于注册较为简单,课程和课本不在举行体现(注册可以使用多种方式,如淘宝账号、支付宝账号、微博账号等...)。
需要实名认证和活体认证。

[*] 打开云盾内容安全产品试用页面,单击立即开通,正式开通服务。
            https://img-blog.csdnimg.cn/img_convert/2dfaf53ce335424f98cde5a716b2ded9.png         内容安全控制台
            https://img-blog.csdnimg.cn/img_convert/e1f33b40411044bc973cb0311636cc65.png         
[*] 在AccessKey管理页面管理您的AccessKeyID和AccessKeySecret。
            https://img-blog.csdnimg.cn/img_convert/b89dfae0224c44ffa3e21d10de9050b1.png         管理自己的AccessKey,可以新建和删除AccessKey
            https://img-blog.csdnimg.cn/img_convert/c3d86dbc5edd46dbaa2cc3570700d482.png         查看自己的AccessKey,
AccessKey默认是隐藏的,第一次申请的时间可以生存AccessKey,点击表现,通过验证手机号后也可以查看
            https://img-blog.csdnimg.cn/img_convert/be02e544ea0949c786396e956740977f.png         2.3)文本内容审核接口

文本垃圾内容检测:怎样调用文本检测接口举行文本内容审核_内容安全-阿里云资助中心
            https://img-blog.csdnimg.cn/img_convert/8e63b70c2f914bdcb8e793ba2507ebe2.png         文本垃圾内容Java SDK: 怎样使用JavaSDK文本反垃圾接口_内容安全-阿里云资助中心
2.4)图片审核接口

图片垃圾内容检测:调用图片同步检测接口/green/image/scan审核图片内容_内容安全-阿里云资助中心
            https://img-blog.csdnimg.cn/img_convert/29675ee9a5434bc2b567e8847587579d.png         图片垃圾内容Java SDK: 怎样使用JavaSDK接口检测图片是否包含风险内容_内容安全-阿里云资助中心
2.5)项目集成

①:拷贝资料文件夹中的类到common模块下面,并添加到主动配置
包括了GreenImageScan和GreenTextScan及对应的工具类
            https://img-blog.csdnimg.cn/img_convert/a0ddb865b4c649e094c70bdcc64f2f74.png         添加到主动配置中
            https://img-blog.csdnimg.cn/img_convert/318088556f164f4a91a17d130ea57bb2.png         ②: accessKeyId和secret(需自己申请)
在heima-leadnews-wemedia中的nacos配置中心添加以下配置:
aliyun:
accessKeyId: ...
secret: ...
#aliyun.scenes=porn,terrorism,ad,qrcode,live,logo
scenes: terrorism③:在自媒体微服务中测试类中注入审核文本和图片的bean举行测试

package com.heima.wemedia;

import java.util.Arrays;
import java.util.Map;

@SpringBootTest(classes = WemediaApplication.class)
@RunWith(SpringRunner.class)
public class AliyunTest {

    @Autowired
    private GreenTextScan greenTextScan;

    @Autowired
    private GreenImageScan greenImageScan;

    @Autowired
    private FileStorageService fileStorageService;

    @Test
    public void testScanText() throws Exception {
      Map map = greenTextScan.greeTextScan("我是一个好人,冰毒");
      System.out.println(map);
    }

    @Test
    public void testScanImage() throws Exception {
      byte[] bytes = fileStorageService.downLoadFile("http://192.168.200.130:9000/leadnews/2021/04/26/ef3cbe458db249f7bd6fb4339e593e55.jpg");
      Map map = greenImageScan.imageScan(Arrays.asList(bytes));
      System.out.println(map);
    }
}我用的是 阿里云 云安全 加强版1小时,没审核出效果为null;估计是阿里 改接口了;
            https://img-blog.csdnimg.cn/img_convert/4a95dd2df2ef431e8951de6a2bd7f26c.png         图片审核页报错
java.lang.RuntimeException: upload file fail.

    at com.heima.common.aliyun.util.ClientUploader.uploadBytes(ClientUploader.java:129)
    at com.heima.common.aliyun.GreenImageScan.imageScan(GreenImageScan.java:71)
    at com.heima.wemedia.test.AliyunTest.testScanImage(AliyunTest.java:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.springframework.test.context.junit4.statements.RunBeforeTestExecutionCallbacks.evaluate(RunBeforeTestExecutionCallbacks.java:74)
    at org.springframework.test.context.junit4.statements.RunAfterTestExecutionC3)app端文章生存接口

3.1)表结构说明

            https://img-blog.csdnimg.cn/img_convert/d61f710e7cf5481a84bf0ec394ea5695.png         3.2)分布式id

随着业务的增长,文章表可能要占用很大的物理存储空间,为了办理该问题,后期使用数据库分片技能。将一个数据库举行拆分,通过数据库中心件连接。假如数据库中该表选用ID自增策略,则可能产生重复的ID,此时应该使用分布式ID生成策略来生成ID。
            https://img-blog.csdnimg.cn/img_convert/0718006851834573be15b68cebd1a2e0.png         分布式id-技能选型

            https://img-blog.csdnimg.cn/img_convert/2ca8e92044fc4b1bae681dea72c68bc0.png         snowflake是Twitter开源的分布式ID生成算法,结果是一个long型的ID。
其焦点头脑是:使用41bit作为毫秒数,10bit作为呆板的ID(5个bit是数据中心,5个bit的呆板ID)(最多32个机房*32台呆板(也可以自己设)),12bit作为毫秒内的流水号(意味着每个节点在每毫秒可以产生 4096 个 ID),末了还有一个符号位,永世是0(1为负数)
            https://img-blog.csdnimg.cn/img_convert/1e2aa8d907fe4539841527a6d0d45509.png         文章端相关的表都使用雪花算法生成id,包括ap_article、 ap_article_config、 ap_article_content
mybatis-plus已经集成了雪花算法,完成以下两步即可在项目中集成雪花算法
第一:在实体类中的id上加入如下配置,指定范例为id_worker

@TableId(value = "id",type = IdType.ID_WORKER)
private Long id;第二:在application.yml文件中配置数据中心id和呆板id
mybatis-plus:
mapper-locations: classpath*:mapper/*.xml
# 设置别名包扫描路径,通过该属性可以给包中的类注册别名
type-aliases-package: com.heima.model.article.pojos
global-config:
    datacenter-id: 1
    workerId: 1datacenter-id:数据中心id(取值范围:0-31) ;workerId:呆板id(取值范围:0-31)
3.3)思路分析

在文章审核成功以后需要在app的article库中新增文章数据
1.生存文章信息 ap_article
2.生存文章配置信息 ap_article_config
3.生存文章内容 ap_article_content
实现思路:
            https://img-blog.csdnimg.cn/img_convert/3b2d6eb7fe80461b806a9f32e0916480.png         3.4)feign接口

            https://img-blog.csdnimg.cn/img_convert/72bd3a342fb2409bad3a669e15938b03.png         ArticleDto

package com.heima.model.article.dtos;

import com.heima.model.article.pojos.ApArticle;
import lombok.Data;

@Data
public class ArticleDtoextends ApArticle {
    /**
   * 文章内容
   */
    private String content;
}功能实现:
①:在heima-leadnews-feign-api中新增接口
第一:线导入feign的依靠
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>第二:定义文章端的接口

package com.heima.apis.article;
import org.springframework.web.bind.annotation.RequestBody;

@FeignClient(value = "leadnews-article")
public interface IArticleClient {

    @PostMapping("/api/v1/article/save")
    public ResponseResult saveArticle(@RequestBody ArticleDto dto) ;
}②:在heima-leadnews-article中实现该方法

package com.heima.article.feign;
import java.io.IOException;

@RestController
public class ArticleClient implements IArticleClient {

    @Autowired
    private ApArticleService apArticleService;

    @Override
    @PostMapping("/api/v1/article/save")
    public ResponseResult saveArticle(@RequestBody ArticleDto dto) {
      return apArticleService.saveArticle(dto);
    }
}③:拷贝mapper
在资料文件夹中拷贝ApArticleConfigMapper类到mapper文件夹中
同时,修改ApArticleConfig类,添加如下构造函数

package com.heima.model.article.pojos;

import java.io.Serializable;

/**
* <p>
* APP已发布文章配置表
* </p>
*
* @author itheima
*/

@Data
@NoArgsConstructor
@TableName("ap_article_config")
public class ApArticleConfig implements Serializable {


    public ApArticleConfig(Long articleId){
      this.articleId = articleId;
      this.isComment = true;
      this.isForward = true;
      this.isDelete = false;
      this.isDown = false;
    }

    @TableId(value = "id",type = IdType.ID_WORKER)
    private Long id;

    /**
   * 文章id
   */
    @TableField("article_id")
    private Long articleId;

    /**
   * 是否可评论
   * true: 可以评论   1
   * false: 不可评论0
   */
    @TableField("is_comment")
    private Boolean isComment;

    /**
   * 是否转发
   * true: 可以转发   1
   * false: 不可转发0
   */
    @TableField("is_forward")
    private Boolean isForward;

    /**
   * 是否下架
   * true: 下架   1
   * false: 没有下架0
   */
    @TableField("is_down")
    private Boolean isDown;

    /**
   * 是否已删除
   * true: 删除   1
   * false: 没有删除0
   */
    @TableField("is_delete")
    private Boolean isDelete;
}④:在ApArticleService中新增方法

/**
   * 保存app端相关文章
   * @param dto
   * @return
   */
ResponseResult saveArticle(ArticleDto dto) ;实现类:

@Autowired
private ApArticleConfigMapper apArticleConfigMapper;

@Autowired
private ApArticleContentMapper apArticleContentMapper;

/**
   * 保存app端相关文章
   * @param dto
   * @return
   */
@Override
public ResponseResult saveArticle(ArticleDto dto) {
    //1.检查参数
    if(dto == null){
      return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }

    ApArticle apArticle = new ApArticle();
    BeanUtils.copyProperties(dto,apArticle);

    //2.判断是否存在id
    if(dto.getId() == null){
      //2.1 不存在id保存文章文章配置文章内容

      //保存文章
      save(apArticle);

      //保存配置
      ApArticleConfig apArticleConfig = new ApArticleConfig(apArticle.getId());
      apArticleConfigMapper.insert(apArticleConfig);

      //保存 文章内容
      ApArticleContent apArticleContent = new ApArticleContent();
      apArticleContent.setArticleId(apArticle.getId());
      apArticleContent.setContent(dto.getContent());
      apArticleContentMapper.insert(apArticleContent);

    }else {
      //2.2 存在id   修改文章文章内容

      //修改文章
      updateById(apArticle);

      //修改文章内容
      ApArticleContent apArticleContent = apArticleContentMapper.selectOne(Wrappers.<ApArticleContent>lambdaQuery().eq(ApArticleContent::getArticleId, dto.getId()));
      apArticleContent.setContent(dto.getContent());
      apArticleContentMapper.updateById(apArticleContent);
    }


    //3.结果返回文章的id
    return ResponseResult.okResult(apArticle.getId());
}⑤:测试
编写junit单元测试,或使用postman举行测试
http://localhost:51802/api/v1/article/save
{
        "id":这个id要去数据库自己找 ,
    "title":"黑马头条项目背景22222222222222",
    "authoId":1102,
    "layout":1,
    "labels":"黑马头条",
    "publishTime":"2028-03-14T11:35:49.000Z",
    "images": "http://192.168.200.130:9000/leadnews/2021/04/26/5ddbdb5c68094ce393b08a47860da275.jpg",
    "content":"22222222222222222黑马头条项目背景,黑马头条项目背景,黑马头条项目背景,黑马头条项目背景,黑马头条项目背景"
}4)自媒体文章主动审核功能实现

4.1)表结构说明

wm_news 自媒体文章表
            https://img-blog.csdnimg.cn/img_convert/f96d05f284db429890632c60a331884b.png         status字段:0 草稿 1 待审核 2 审核失败 3 人工审核 4 人工审核通过 8 审核通过(待发布) 9 已发布
4.2)实现

            https://img-blog.csdnimg.cn/img_convert/0021ad32e95d45398fd960ae0710a676.png         在heima-leadnews-wemedia中的service新增接口

package com.heima.wemedia.service;
public interface WmNewsAutoScanService {

    /**
   * 自媒体文章审核
   * @param id自媒体文章id
   */
    public void autoScanWmNews(Integer id);
}实现类:

package com.heima.wemedia.service.impl;
import java.util.*;
import java.util.stream.Collectors;

@Service
@Slf4j
@Transactional
public class WmNewsAutoScanServiceImpl implements WmNewsAutoScanService {

    @Autowired
    private WmNewsMapper wmNewsMapper;

    /**
   * 自媒体文章审核
   *
   * @param id 自媒体文章id
   */
    @Override
    public void autoScanWmNews(Integer id) {
      //1.查询自媒体文章
      WmNews wmNews = wmNewsMapper.selectById(id);
      if(wmNews == null){
            throw new RuntimeException("WmNewsAutoScanServiceImpl-文章不存在");
      }

      if(wmNews.getStatus().equals(WmNews.Status.SUBMIT.getCode())){
            //从内容中提取纯文本内容和图片
            Map<String,Object> textAndImages = handleTextAndImages(wmNews);

            //2.审核文本内容阿里云接口
            boolean isTextScan = handleTextScan((String) textAndImages.get("content"),wmNews);
            if(!isTextScan)return;

            //3.审核图片阿里云接口
            boolean isImageScan =handleImageScan((List<String>) textAndImages.get("images"),wmNews);
            if(!isImageScan)return;

            //4.审核成功,保存app端的相关的文章数据
            ResponseResult responseResult = saveAppArticle(wmNews);
            if(!responseResult.getCode().equals(200)){
                throw new RuntimeException("WmNewsAutoScanServiceImpl-文章审核,保存app端相关文章数据失败");
            }
            //回填article_id
            wmNews.setArticleId((Long) responseResult.getData());
            updateWmNews(wmNews,(short) 9,"审核成功");

      }
    }

    @Autowired
    private IArticleClient articleClient;

    @Autowired
    private WmChannelMapper wmChannelMapper;

    @Autowired
    private WmUserMapper wmUserMapper;

    /**
   * 保存app端相关的文章数据
   * @param wmNews
   */
    private ResponseResult saveAppArticle(WmNews wmNews) {

      ArticleDto dto = new ArticleDto();
      //属性的拷贝
      BeanUtils.copyProperties(wmNews,dto);
      //文章的布局
      dto.setLayout(wmNews.getType());
      //频道
      WmChannel wmChannel = wmChannelMapper.selectById(wmNews.getChannelId());
      if(wmChannel != null){
            dto.setChannelName(wmChannel.getName());
      }

      //作者
      dto.setAuthorId(wmNews.getUserId().longValue());
      WmUser wmUser = wmUserMapper.selectById(wmNews.getUserId());
      if(wmUser != null){
            dto.setAuthorName(wmUser.getName());
      }

      //设置文章id
      if(wmNews.getArticleId() != null){
            dto.setId(wmNews.getArticleId());
      }
      dto.setCreatedTime(new Date());

      ResponseResult responseResult = articleClient.saveArticle(dto);
      return responseResult;
    }


    @Autowired
    private FileStorageService fileStorageService;
    @Autowired
    private GreenImageScan greenImageScan;

    /**
   * 审核图片
   * @param images
   * @param wmNews
   * @return
   */
    private boolean handleImageScan(List<String> images, WmNews wmNews) {

      boolean flag = true;

      if(images == null || images.size() == 0){
            return flag;
      }

      //下载图片 minIO
      //图片去重
      images = images.stream().distinct().collect(Collectors.toList());

      List<byte[]> imageList = new ArrayList<>();

      for (String image : images) {
            byte[] bytes = fileStorageService.downLoadFile(image);
            imageList.add(bytes);
      }


      //审核图片
      try {
            Map map = greenImageScan.imageScan(imageList);
            if(map != null){
                //审核失败
                if(map.get("suggestion").equals("block")){
                  flag = false;
                  updateWmNews(wmNews, (short) 2, "当前文章中存在违规内容");
                }

                //不确定信息需要人工审核
                if(map.get("suggestion").equals("review")){
                  flag = false;
                  updateWmNews(wmNews, (short) 3, "当前文章中存在不确定内容");
                }
            }

      } catch (Exception e) {
            flag = false;
            e.printStackTrace();
      }
      return flag;
    }

    @Autowired
    private GreenTextScan greenTextScan;

    /**
   * 审核纯文本内容
   * @param content
   * @param wmNews
   * @return
   */
    private boolean handleTextScan(String content, WmNews wmNews) {

      boolean flag = true;

      if((wmNews.getTitle()+"-"+content).length() == 0){
            return flag;
      }

      try {
            Map map = greenTextScan.greeTextScan((wmNews.getTitle()+"-"+content));
            if(map != null){
                //审核失败
                if(map.get("suggestion").equals("block")){
                  flag = false;
                  updateWmNews(wmNews, (short) 2, "当前文章中存在违规内容");
                }

                //不确定信息需要人工审核
                if(map.get("suggestion").equals("review")){
                  flag = false;
                  updateWmNews(wmNews, (short) 3, "当前文章中存在不确定内容");
                }
            }
      } catch (Exception e) {
            flag = false;
            e.printStackTrace();
      }

      return flag;

    }

    /**
   * 修改文章内容
   * @param wmNews
   * @param status
   * @param reason
   */
    private void updateWmNews(WmNews wmNews, short status, String reason) {
      wmNews.setStatus(status);
      wmNews.setReason(reason);
      wmNewsMapper.updateById(wmNews);
    }

    /**
   * 1。从自媒体文章的内容中提取文本和图片
   * 2.提取文章的封面图片
   * @param wmNews
   * @return
   */
    private Map<String, Object> handleTextAndImages(WmNews wmNews) {

      //存储纯文本内容
      StringBuilder stringBuilder = new StringBuilder();

      List<String> images = new ArrayList<>();

      //1。从自媒体文章的内容中提取文本和图片
      if(StringUtils.isNotBlank(wmNews.getContent())){
            List<Map> maps = JSONArray.parseArray(wmNews.getContent(), Map.class);
            for (Map map : maps) {
                if (map.get("type").equals("text")){
                  stringBuilder.append(map.get("value"));
                }

                if (map.get("type").equals("image")){
                  images.add((String) map.get("value"));
                }
            }
      }
      //2.提取文章的封面图片
      if(StringUtils.isNotBlank(wmNews.getImages())){
            String[] split = wmNews.getImages().split(",");
            images.addAll(Arrays.asList(split));
      }

      Map<String, Object> resultMap = new HashMap<>();
      resultMap.put("content",stringBuilder.toString());
      resultMap.put("images",images);
      return resultMap;

    }
}4.3)单元测试


package com.heima.wemedia.service;

import com.heima.wemedia.WemediaApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import static org.junit.Assert.*;


@SpringBootTest(classes = WemediaApplication.class)
@RunWith(SpringRunner.class)
public class WmNewsAutoScanServiceTest {

    @Autowired
    private WmNewsAutoScanService wmNewsAutoScanService;

    @Test
    public void autoScanWmNews() {

      wmNewsAutoScanService.autoScanWmNews(6238);
    }
}4.4)feign长途接口调用方式

            https://img-blog.csdnimg.cn/img_convert/af44cb5296934a0dbcf6b29026ae04c2.png         在heima-leadnews-wemedia服务中已经依靠了heima-leadnews-feign-apis工程,只需要在自媒体的引导类中开启feign的长途调用即可
注解为:@EnableFeignClients(basePackages = "com.heima.apis") 需要指向apis这个包
            https://img-blog.csdnimg.cn/img_convert/d79f9feb1b3e4abfa64bcc2f0da9c143.png         4.5)服务降级处理

            https://img-blog.csdnimg.cn/img_convert/8cb570ffdb994094a5cc542b6a16e021.png         

[*] 服务降级是服务自我保护的一种方式,大概保护下游服务的一种方式,用于确保服务不会受哀求突增影响变得不可用,确保服务不会瓦解


[*] 服务降级虽然会导致哀求失败,但是不会导致壅闭。
实现步骤:
①:在heima-leadnews-feign-api编写降级逻辑

package com.heima.apis.article.fallback;
import org.springframework.stereotype.Component;

/**
* feign失败配置
* @author itheima
*/
@Component
public class IArticleClientFallback implements IArticleClient {
    @Override
    public ResponseResult saveArticle(ArticleDto dto){
      return ResponseResult.errorResult(AppHttpCodeEnum.SERVER_ERROR,"获取数据失败");
    }
}在自媒体微服务中添加类,扫描降级代码类的包

package com.heima.wemedia.config;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

@Configuration
@ComponentScan("com.heima.apis.article.fallback")
public class InitConfig {
}②:长途接口中指向降级代码

package com.heima.apis.article;
import org.springframework.web.bind.annotation.RequestBody;
@FeignClient(value = "leadnews-article",fallback = IArticleClientFallback.class)
public interface IArticleClient {

    @PostMapping("/api/v1/article/save")
    public ResponseResult saveArticle(@RequestBody ArticleDto dto);
}③:客户端开启降级heima-leadnews-wemedia
在wemedia的nacos配置中心里添加如下内容,开启服务降级,也可以指定服务相应的超时的时间
feign:
# 开启feign对hystrix熔断降级的支持
hystrix:
    enabled: true
# 修改调用超时时间
client:
    config:
      default:
      connectTimeout: 2000
      readTimeout: 2000④:测试
在ApArticleServiceImpl类中saveArticle方法添加代码

try {
    Thread.sleep(3000);
} catch (InterruptedException e) {
    e.printStackTrace();
}在自媒体端举行审核测试,会出现服务降级的现象
5)发布文章提交审核集成

5.1)同步调用与异步调用

同步:就是在发出一个调用时,在没有得到结果之前, 该调用就不返回(实时处理)
异步:调用在发出之后,这个调用就直接返回了,没有返回结果(分时处理)
            https://img-blog.csdnimg.cn/img_convert/086f90d90d274a9e8422301e16018222.png         异步线程的方式审核文章
5.2)Springboot集成异步线程调用

①:在主动审核的方法上加上@Async注解(标明要异步调用)

@Override
@Async//标明当前方法是一个异步方法
public void autoScanWmNews(Integer id) {
        //代码略
}②:在文章发布成功后调用审核的方法

@Autowired
private WmNewsAutoScanService wmNewsAutoScanService;

/**
* 发布修改文章或保存为草稿
* @param dto
* @return
*/
@Override
public ResponseResult submitNews(WmNewsDto dto) {

    //代码略

    //审核文章
    wmNewsAutoScanService.autoScanWmNews(wmNews.getId());

    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);

}③:在自媒体引导类中使用@EnableAsync注解开启异步调用

@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("com.heima.wemedia.mapper")
@EnableFeignClients(basePackages = "com.heima.apis")
@EnableAsync//开启异步调用
public class WemediaApplication {

    public static void main(String[] args) {
      SpringApplication.run(WemediaApplication.class,args);
    }

    @Bean
    public MybatisPlusInterceptor mybatisPlusInterceptor() {
      MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
      interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
      return interceptor;
    }
}6)文章审核功能-综合测试

6.1)服务启动列表

1,nacos服务端
2,article微服务
3,wemedia微服务
4,启动wemedia网关微服务
5,启动前端体系wemedia
6.2)测试情况列表

1,自媒体前端发布一篇正常的文章
审核成功后,app端的article相关数据是否可以正常生存,自媒体文章状态和app端文章id是否回显
2,自媒体前端发布一篇包含敏感词的文章
正常是审核失败, wm_news表中的状态是否改变,成功和失败原因正常生存
3,自媒体前端发布一篇包含敏感图片的文章
正常是审核失败, wm_news表中的状态是否改变,成功和失败原因正常生存
7)新需求-自管理敏感词

7.1)需求分析

文章审核功能已经交付了,文章也能正常发布审核。突然,产品经理过来说要开会。
会议的内容焦点有以下内容:


[*] 文章审核不能过滤一些敏感词:
私家侦探、针孔摄象、名誉卡提现、广告署理、代开发票、刻章办、出售答案、小额贷款…
需要完成的功能:
需要自己维护一套敏感词,在文章审核的时间,需要验证文章是否包含这些敏感词
7.2)敏感词-过滤

技能选型
    方案
说明
数据库模糊查询
服从太低
String.indexOf("")查找
数据库量大的话也是比较慢
全文检索
分词再匹配
DFA算法
确定有穷主动机(一种数据结构)
    7.3)DFA实现原理

DFA全称为:Deterministic Finite Automaton,即确定有穷主动机。
存储:一次性的把所有的敏感词存储到了多个map中,就是下图表示这种结构
敏感词:冰毒、大麻、大坏蛋
            https://img-blog.csdnimg.cn/img_convert/8b379fc33c484f25a2cd0d0e431f487e.png         检索的过程
            https://img-blog.csdnimg.cn/img_convert/8180ea8fdca5440daac784279014c294.png         7.4)自管理敏感词集成到文章审核中

①:创建敏感词表,导入资料中wm_sensitive到leadnews_wemedia库中
            https://img-blog.csdnimg.cn/img_convert/a92f6657d27c4640b55cbbed05033d94.png         
package com.heima.model.wemedia.pojos;
import java.io.Serializable;
import java.util.Date;

/**
* <p>
* 敏感词信息表
* </p>
*
* @author itheima
*/
@Data
@TableName("wm_sensitive")
public class WmSensitive implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
   * 主键
   */
    @TableId(value = "id", type = IdType.AUTO)
    private Integer id;

    /**
   * 敏感词
   */
    @TableField("sensitives")
    private String sensitives;

    /**
   * 创建时间
   */
    @TableField("created_time")
    private Date createdTime;

}②:拷贝对应的wm_sensitive的mapper到项目中

package com.heima.wemedia.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.heima.model.wemedia.pojos.WmSensitive;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface WmSensitiveMapper extends BaseMapper<WmSensitive> {
}③:在文章审核的代码中添加自管理敏感词审核
第一:在WmNewsAutoScanServiceImpl中的autoScanWmNews方法上添加如下代码

//从内容中提取纯文本内容和图片
//.....省略

//自管理的敏感词过滤
boolean isSensitive = handleSensitiveScan((String) textAndImages.get("content"), wmNews);
if(!isSensitive) return;

//2.审核文本内容阿里云接口
//.....省略测试了一下 源码不能检测 标题的敏感词汇;加了个这: wmNews.getTitle()+

//自管理的敏感词过滤
boolean isSensitive = handleSensitiveScan(
wmNews.getTitle()+textAndImages.get("content"), wmNews);
新增自管理敏感词审核代码

@Autowired
private WmSensitiveMapper wmSensitiveMapper;

/**
   * 自管理的敏感词审核
   * @param content
   * @param wmNews
   * @return
   */
private boolean handleSensitiveScan(String content, WmNews wmNews) {

    boolean flag = true;

    //获取所有的敏感词
    List<WmSensitive> wmSensitives = wmSensitiveMapper.selectList(Wrappers.<WmSensitive>lambdaQuery().select(WmSensitive::getSensitives));
    List<String> sensitiveList = wmSensitives.stream().map(WmSensitive::getSensitives).collect(Collectors.toList());

    //初始化敏感词库
    SensitiveWordUtil.initMap(sensitiveList);

    //查看文章中是否包含敏感词
    Map<String, Integer> map = SensitiveWordUtil.matchWords(content);
    if(map.size() >0){
      updateWmNews(wmNews,(short) 2,"当前文章中存在违规内容"+map);
      flag = false;
    }

    return flag;
}8)新需求-图片识别文字审核敏感词

8.1)需求分析

产品经理调集开会,文章审核功能已经交付了,文章也能正常发布审核。对于前次提出的自管理敏感词也很满意,这次会议焦点的内容如下:


[*] 文章中包含的图片要识别文字,过滤掉图片文字的敏感词
            https://img-blog.csdnimg.cn/img_convert/bf25ed672b7d4852b60c6f166e100730.png         8.2)图片文字识别

什么是OCR?
OCR (Optical Character Recognition,光学字符识别)是指电子装备(比方扫描仪或数码相机)查抄纸上打印的字符,通过检测暗、亮的模式确定其形状,然后用字符识别方法将形状翻译成盘算机文字的过程
    方案
说明
百度OCR
收费
Tesseract-OCR
Google维护的开源OCR引擎,支持Java,Python等语言调用
Tess4J
封装了Tesseract-OCR ,支持Java调用
                https://img-blog.csdnimg.cn/img_convert/6991251ce03c4280866d2944d44220f6.png         8 .3)Tess4j案例

①:创建项目导入tess4j对应的依靠
<dependency>
    <groupId>net.sourceforge.tess4j</groupId>
    <artifactId>tess4j</artifactId>
    <version>4.1.1</version>
</dependency>②:导入中文字体库, 把资料中的tessdata文件夹拷贝到自己的工作空间下
            https://img-blog.csdnimg.cn/img_convert/924ed8da589d4e46a66a2ddcb621acd7.png         ③:编写测试类举行测试

package com.heima.tess4j;

import net.sourceforge.tess4j.ITesseract;
import net.sourceforge.tess4j.Tesseract;

import java.io.File;

public class Application {

    public static void main(String[] args) {
      try {
            //获取本地图片
            File file = new File("D:\\26.png");
            //创建Tesseract对象
            ITesseract tesseract = new Tesseract();
            //设置字体库路径
            tesseract.setDatapath("D:\\workspace\\tessdata");
            //中文识别
            tesseract.setLanguage("chi_sim");
            //执行ocr识别
            String result = tesseract.doOCR(file);
            //替换回车和tal键使结果为一行
            result = result.replaceAll("\\r|\\n","-").replaceAll(" ","");
            System.out.println("识别的结果为:"+result);
      } catch (Exception e) {
            e.printStackTrace();
      }
    }
}8.4)管理敏感词和图片文字识别集成到文章审核

①:在heima-leadnews-common中创建工具类,简单封装一下tess4j
需要先导入pom
<dependency>
    <groupId>net.sourceforge.tess4j</groupId>
    <artifactId>tess4j</artifactId>
    <version>4.1.1</version>
</dependency>工具类

package com.heima.common.tess4j;

import lombok.Getter;
import lombok.Setter;
import net.sourceforge.tess4j.ITesseract;
import net.sourceforge.tess4j.Tesseract;
import net.sourceforge.tess4j.TesseractException;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import java.awt.image.BufferedImage;

@Getter
@Setter
@Component
@ConfigurationProperties(prefix = "tess4j")
public class Tess4jClient {

    private String dataPath;
    private String language;

    public String doOCR(BufferedImage image) throws TesseractException {
      //创建Tesseract对象
      ITesseract tesseract = new Tesseract();
      //设置字体库路径
      tesseract.setDatapath(dataPath);
      //中文识别
      tesseract.setLanguage(language);
      //执行ocr识别
      String result = tesseract.doOCR(image);
      //替换回车和tal键使结果为一行
      result = result.replaceAll("\\r|\\n", "-").replaceAll(" ", "");
      return result;
    }

}在spring.factories配置中添加该类,完整如下:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.heima.common.exception.ExceptionCatch,\
com.heima.common.swagger.SwaggerConfiguration,\
com.heima.common.swagger.Swagger2Configuration,\
com.heima.common.aliyun.GreenTextScan,\
com.heima.common.aliyun.GreenImageScan,\
com.heima.common.tess4j.Tess4jClient②:在heima-leadnews-wemedia中的配置中添加两个属性
tess4j:
data-path: D:\workspace\tessdata
language: chi_sim③:在WmNewsAutoScanServiceImpl中的handleImageScan方法上添加如下代码

try {
    for (String image : images) {
      byte[] bytes = fileStorageService.downLoadFile(image);

      //图片识别文字审核---begin-----

      //从byte[]转换为butteredImage
      ByteArrayInputStream in = new ByteArrayInputStream(bytes);
      BufferedImage imageFile = ImageIO.read(in);
      //识别图片的文字
      String result = tess4jClient.doOCR(imageFile);

      //审核是否包含自管理的敏感词
      boolean isSensitive = handleSensitiveScan(result, wmNews);
      if(!isSensitive){
            return isSensitive;
      }

      //图片识别文字审核---end-----


      imageList.add(bytes);

    }
}catch (Exception e){
    e.printStackTrace();
}末了附上文章审核的完整代码如下:

package com.heima.wemedia.service.impl;

import com.alibaba.fastjson.JSONArray;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.heima.apis.article.IArticleClient;
import com.heima.common.aliyun.GreenImageScan;
import com.heima.common.aliyun.GreenTextScan;
import com.heima.common.tess4j.Tess4jClient;
import com.heima.file.service.FileStorageService;
import com.heima.model.article.dtos.ArticleDto;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.wemedia.pojos.WmChannel;
import com.heima.model.wemedia.pojos.WmNews;
import com.heima.model.wemedia.pojos.WmSensitive;
import com.heima.model.wemedia.pojos.WmUser;
import com.heima.utils.common.SensitiveWordUtil;
import com.heima.wemedia.mapper.WmChannelMapper;
import com.heima.wemedia.mapper.WmNewsMapper;
import com.heima.wemedia.mapper.WmSensitiveMapper;
import com.heima.wemedia.mapper.WmUserMapper;
import com.heima.wemedia.service.WmNewsAutoScanService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.imageio.ImageIO;
import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;
import java.util.*;
import java.util.stream.Collectors;


@Service
@Slf4j
@Transactional
public class WmNewsAutoScanServiceImpl implements WmNewsAutoScanService {

    @Autowired
    private WmNewsMapper wmNewsMapper;

    /**
   * 自媒体文章审核
   *
   * @param id 自媒体文章id
   */
    @Override
    @Async//标明当前方法是一个异步方法
    public void autoScanWmNews(Integer id) {

//      int a = 1/0;

      //1.查询自媒体文章
      WmNews wmNews = wmNewsMapper.selectById(id);
      if (wmNews == null) {
            throw new RuntimeException("WmNewsAutoScanServiceImpl-文章不存在");
      }

      if (wmNews.getStatus().equals(WmNews.Status.SUBMIT.getCode())) {
            //从内容中提取纯文本内容和图片
            Map<String, Object> textAndImages = handleTextAndImages(wmNews);

            //自管理的敏感词过滤
            boolean isSensitive = handleSensitiveScan((String) textAndImages.get("content"), wmNews);
            if(!isSensitive) return;

            //2.审核文本内容阿里云接口
            boolean isTextScan = handleTextScan((String) textAndImages.get("content"), wmNews);
            if (!isTextScan) return;

            //3.审核图片阿里云接口
            boolean isImageScan = handleImageScan((List<String>) textAndImages.get("images"), wmNews);
            if (!isImageScan) return;

            //4.审核成功,保存app端的相关的文章数据
            ResponseResult responseResult = saveAppArticle(wmNews);
            if (!responseResult.getCode().equals(200)) {
                throw new RuntimeException("WmNewsAutoScanServiceImpl-文章审核,保存app端相关文章数据失败");
            }
            //回填article_id
            wmNews.setArticleId((Long) responseResult.getData());
            updateWmNews(wmNews, (short) 9, "审核成功");

      }
    }

    @Autowired
    private WmSensitiveMapper wmSensitiveMapper;

    /**
   * 自管理的敏感词审核
   * @param content
   * @param wmNews
   * @return
   */
    private boolean handleSensitiveScan(String content, WmNews wmNews) {

      boolean flag = true;

      //获取所有的敏感词
      List<WmSensitive> wmSensitives = wmSensitiveMapper.selectList(Wrappers.<WmSensitive>lambdaQuery().select(WmSensitive::getSensitives));
      List<String> sensitiveList = wmSensitives.stream().map(WmSensitive::getSensitives).collect(Collectors.toList());

      //初始化敏感词库
      SensitiveWordUtil.initMap(sensitiveList);

      //查看文章中是否包含敏感词
      Map<String, Integer> map = SensitiveWordUtil.matchWords(content);
      if(map.size() >0){
            updateWmNews(wmNews,(short) 2,"当前文章中存在违规内容"+map);
            flag = false;
      }

      return flag;
    }

    @Autowired
    private IArticleClient articleClient;

    @Autowired
    private WmChannelMapper wmChannelMapper;

    @Autowired
    private WmUserMapper wmUserMapper;

    /**
   * 保存app端相关的文章数据
   *
   * @param wmNews
   */
    private ResponseResult saveAppArticle(WmNews wmNews) {

      ArticleDto dto = new ArticleDto();
      //属性的拷贝
      BeanUtils.copyProperties(wmNews, dto);
      //文章的布局
      dto.setLayout(wmNews.getType());
      //频道
      WmChannel wmChannel = wmChannelMapper.selectById(wmNews.getChannelId());
      if (wmChannel != null) {
            dto.setChannelName(wmChannel.getName());
      }

      //作者
      dto.setAuthorId(wmNews.getUserId().longValue());
      WmUser wmUser = wmUserMapper.selectById(wmNews.getUserId());
      if (wmUser != null) {
            dto.setAuthorName(wmUser.getName());
      }

      //设置文章id
      if (wmNews.getArticleId() != null) {
            dto.setId(wmNews.getArticleId());
      }
      dto.setCreatedTime(new Date());

      ResponseResult responseResult = articleClient.saveArticle(dto);
      return responseResult;

    }


    @Autowired
    private FileStorageService fileStorageService;

    @Autowired
    private GreenImageScan greenImageScan;

    @Autowired
    private Tess4jClient tess4jClient;

    /**
   * 审核图片
   *
   * @param images
   * @param wmNews
   * @return
   */
    private boolean handleImageScan(List<String> images, WmNews wmNews) {

      boolean flag = true;

      if (images == null || images.size() == 0) {
            return flag;
      }

      //下载图片 minIO
      //图片去重
      images = images.stream().distinct().collect(Collectors.toList());

      List<byte[]> imageList = new ArrayList<>();

      try {
            for (String image : images) {
                byte[] bytes = fileStorageService.downLoadFile(image);

                //图片识别文字审核---begin-----

                //从byte[]转换为butteredImage
                ByteArrayInputStream in = new ByteArrayInputStream(bytes);
                BufferedImage imageFile = ImageIO.read(in);
                //识别图片的文字
                String result = tess4jClient.doOCR(imageFile);

                //审核是否包含自管理的敏感词
                boolean isSensitive = handleSensitiveScan(result, wmNews);
                if(!isSensitive){
                  return isSensitive;
                }

                //图片识别文字审核---end-----


                imageList.add(bytes);

            }
      }catch (Exception e){
            e.printStackTrace();
      }


      //审核图片
      try {
            Map map = greenImageScan.imageScan(imageList);
            if (map != null) {
                //审核失败
                if (map.get("suggestion").equals("block")) {
                  flag = false;
                  updateWmNews(wmNews, (short) 2, "当前文章中存在违规内容");
                }

                //不确定信息需要人工审核
                if (map.get("suggestion").equals("review")) {
                  flag = false;
                  updateWmNews(wmNews, (short) 3, "当前文章中存在不确定内容");
                }
            }

      } catch (Exception e) {
            flag = false;
            e.printStackTrace();
      }
      return flag;
    }

    @Autowired
    private GreenTextScan greenTextScan;

    /**
   * 审核纯文本内容
   *
   * @param content
   * @param wmNews
   * @return
   */
    private boolean handleTextScan(String content, WmNews wmNews) {

      boolean flag = true;

      if ((wmNews.getTitle() + "-" + content).length() == 0) {
            return flag;
      }
      try {
            Map map = greenTextScan.greeTextScan((wmNews.getTitle() + "-" + content));
            if (map != null) {
                //审核失败
                if (map.get("suggestion").equals("block")) {
                  flag = false;
                  updateWmNews(wmNews, (short) 2, "当前文章中存在违规内容");
                }

                //不确定信息需要人工审核
                if (map.get("suggestion").equals("review")) {
                  flag = false;
                  updateWmNews(wmNews, (short) 3, "当前文章中存在不确定内容");
                }
            }
      } catch (Exception e) {
            flag = false;
            e.printStackTrace();
      }

      return flag;

    }

    /**
   * 修改文章内容
   *
   * @param wmNews
   * @param status
   * @param reason
   */
    private void updateWmNews(WmNews wmNews, short status, String reason) {
      wmNews.setStatus(status);
      wmNews.setReason(reason);
      wmNewsMapper.updateById(wmNews);
    }

    /**
   * 1。从自媒体文章的内容中提取文本和图片
   * 2.提取文章的封面图片
   *
   * @param wmNews
   * @return
   */
    private Map<String, Object> handleTextAndImages(WmNews wmNews) {

      //存储纯文本内容
      StringBuilder stringBuilder = new StringBuilder();

      List<String> images = new ArrayList<>();

      //1。从自媒体文章的内容中提取文本和图片
      if (StringUtils.isNotBlank(wmNews.getContent())) {
            List<Map> maps = JSONArray.parseArray(wmNews.getContent(), Map.class);
            for (Map map : maps) {
                if (map.get("type").equals("text")) {
                  stringBuilder.append(map.get("value"));
                }

                if (map.get("type").equals("image")) {
                  images.add((String) map.get("value"));
                }
            }
      }
      //2.提取文章的封面图片
      if (StringUtils.isNotBlank(wmNews.getImages())) {
            String[] split = wmNews.getImages().split(",");
            images.addAll(Arrays.asList(split));
      }

      Map<String, Object> resultMap = new HashMap<>();
      resultMap.put("content", stringBuilder.toString());
      resultMap.put("images", images);
      return resultMap;

    }
}9)文章详情-静态文件生成

9.1)思路分析

文章端创建app相关文章时,生成文章详情静态页上传到MinIO中
            https://img-blog.csdnimg.cn/img_convert/70e36c9feba54e7e908152b926a608aa.png         9.2)实现步骤

1.新建ArticleFreemarkerService创建静态文件并上传到minIO中

package com.heima.article.service;
import com.heima.model.article.pojos.ApArticle;

public interface ArticleFreemarkerService {

    /**
   * 生成静态文件上传到minIO中
   * @param apArticle
   * @param content
   */
    public void buildArticleToMinIO(ApArticle apArticle,String content);
}实现

package com.heima.article.service.impl;
import java.util.Map;

@Service
@Slf4j
@Transactional
public class ArticleFreemarkerServiceImpl implements ArticleFreemarkerService {

    @Autowired
    private ApArticleContentMapper apArticleContentMapper;

    @Autowired
    private Configuration configuration;

    @Autowired
    private FileStorageService fileStorageService;

    @Autowired
    private ApArticleService apArticleService;

    /**
   * 生成静态文件上传到minIO中
   * @param apArticle
   * @param content
   */
    @Async
    @Override
    public void buildArticleToMinIO(ApArticle apArticle, String content) {
      //已知文章的id
      //4.1 获取文章内容
      if(StringUtils.isNotBlank(content)){
            //4.2 文章内容通过freemarker生成html文件
            Template template = null;
            StringWriter out = new StringWriter();
            try {
                template = configuration.getTemplate("article.ftl");
                //数据模型
                Map<String,Object> contentDataModel = new HashMap<>();
                contentDataModel.put("content", JSONArray.parseArray(content));
                //合成
                template.process(contentDataModel,out);
            } catch (Exception e) {
                e.printStackTrace();
            }

            //4.3 把html文件上传到minio中
            InputStream in = new ByteArrayInputStream(out.toString().getBytes());
            String path = fileStorageService.uploadHtmlFile("", apArticle.getId() + ".html", in);


            //4.4 修改ap_article表,保存static_url字段
            apArticleService.update(Wrappers.<ApArticle>lambdaUpdate().eq(ApArticle::getId,apArticle.getId())
                  .set(ApArticle::getStaticUrl,path));
      }
    }
}2.在ApArticleService的saveArticle实现方法中添加调用生成文件的方法

/**
   * 保存app端相关文章
   * @param dto
   * @return
   */
@Override
public ResponseResult saveArticle(ArticleDto dto) {

    //      try {
    //            Thread.sleep(3000);
    //      } catch (InterruptedException e) {
    //            e.printStackTrace();
    //      }
    //1.检查参数
    if(dto == null){
      return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }

    ApArticle apArticle = new ApArticle();
    BeanUtils.copyProperties(dto,apArticle);

    //2.判断是否存在id
    if(dto.getId() == null){
      //2.1 不存在id保存文章文章配置文章内容

      //保存文章
      save(apArticle);

      //保存配置
      ApArticleConfig apArticleConfig = new ApArticleConfig(apArticle.getId());
      apArticleConfigMapper.insert(apArticleConfig);

      //保存 文章内容
      ApArticleContent apArticleContent = new ApArticleContent();
      apArticleContent.setArticleId(apArticle.getId());
      apArticleContent.setContent(dto.getContent());
      apArticleContentMapper.insert(apArticleContent);

    }else {
      //2.2 存在id   修改文章文章内容

      //修改文章
      updateById(apArticle);

      //修改文章内容
      ApArticleContent apArticleContent = apArticleContentMapper.selectOne(Wrappers.<ApArticleContent>lambdaQuery().eq(ApArticleContent::getArticleId, dto.getId()));
      apArticleContent.setContent(dto.getContent());
      apArticleContentMapper.updateById(apArticleContent);
    }

    //异步调用 生成静态文件上传到minio中
    articleFreemarkerService.buildArticleToMinIO(apArticle,dto.getContent());


    //3.结果返回文章的id
    return ResponseResult.okResult(apArticle.getId());
}3.文章微服务开启异步调用
            https://img-blog.csdnimg.cn/img_convert/f68f1c624b62451c84d24a51286f5332.png         05延迟任务精准发布文章

            https://img-blog.csdnimg.cn/img_convert/68901ec274b244229880e61171dd43a0.png         1)文章定时发布

2)延迟任务概述

2.1)什么是延迟任务



[*] 定时任务:有固定周期的,有明确的触发时间


[*] 延迟队列:没有固定的开始时间,它常常是由一个事件触发的,而在这个事件触发之后的一段时间内触发另一个事件,任务可以立即执行,也可以延迟
            https://img-blog.csdnimg.cn/img_convert/96e4d595d6364da18f5f2df62ed6ad4c.png         应用场景:
场景一:
订单下单之后30分钟后,假如用户没有付钱,则体系主动取消订单;假如期间下单成功,任务取消
场景二:接口对接出现网络问题,1分钟后重试,假如失败,2分钟重试,直到出现阈值终止
2.2)技能对比

2.2.1)DelayQueue

JDK自带DelayQueue 是一个支持延时获取元素的壅闭队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才气从队列中提取元素
            https://img-blog.csdnimg.cn/img_convert/2f3261ee9cb14551bb2ddf59319fd665.png         DelayQueue属于排序队列,它的特殊之处在于队列的元素必须实现Delayed接口,该接口需要实现compareTo和getDelay方法
getDelay方法:获取元素在队列中的剩余时间,只有当剩余时间为0时元素才可以出队列。
compareTo方法:用于排序,确定元素出队列的顺序。
实现:
1:在测试包jdk下创建延迟任务元素对象DelayedTask,实现compareTo和getDelay方法,
2:在main方法中创建DelayQueue并向延迟队列中添加三个延迟任务,
3:循环的从延迟队列中拉取任务

public class DelayedTaskimplements Delayed{
   
    // 任务的执行时间
    private int executeTime = 0;
   
    public DelayedTask(int delay){
      Calendar calendar = Calendar.getInstance();
      calendar.add(Calendar.SECOND,delay);
      this.executeTime = (int)(calendar.getTimeInMillis() /1000 );
    }

    /**
   * 元素在队列中的剩余时间
   * @param unit
   * @return
   */
    @Override
    public long getDelay(TimeUnit unit) {
      Calendar calendar = Calendar.getInstance();
      return executeTime - (calendar.getTimeInMillis()/1000);
    }

    /**
   * 元素排序
   * @param o
   * @return
   */
    @Override
    public int compareTo(Delayed o) {
      long val = this.getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
      return val == 0 ? 0 : ( val < 0 ? -1: 1 );
    }


    public static void main(String[] args) {
      DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();
      
      queue.add(new DelayedTask(5));
      queue.add(new DelayedTask(10));
      queue.add(new DelayedTask(15));

      System.out.println(System.currentTimeMillis()/1000+" start consume ");
      while(queue.size() != 0){
            DelayedTask delayedTask = queue.poll();
            if(delayedTask !=null ){
                System.out.println(System.currentTimeMillis()/1000+" cosume task");
            }
            //每隔一秒消费一次
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
      }   
    }
}DelayQueue实现完成之后思索一个问题:
使用线程池大概原生DelayQueue程序挂掉之后,任务都是放在内存,需要考虑未处理消息的丢失带来的影响,怎样保证数据不丢失,需要长期化(磁盘)
2.2.2)RabbitMQ实现延迟任务



[*] TTL:Time To Live (消息存活时间)


[*] 死信队列:Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以重新发送另一个交换机(死信交换机)
            https://img-blog.csdnimg.cn/img_convert/e0aeff7a43bb4634b42b3d815ffaa954.png         2.2.3)redis实现

zset数据范例的去重有序(分数排序)特点举行延迟。比方:时间戳作为score举行排序
            https://img-blog.csdnimg.cn/img_convert/690a660243a0437dac775ddd155dd23c.png                     https://img-blog.csdnimg.cn/img_convert/d6149982be934a9dab6884949df47ea1.png         3)redis实现延迟任务

实现思路
            https://img-blog.csdnimg.cn/img_convert/376e4a56bc1d46d6b3dd1a26720e27ff.png         问题思路
1.为什么任务需要存储在数据库中?
延迟任务是一个通用的服务,任何需要延迟得任务都可以调用该服务,需要考虑数据长期化的问题,存储数据库中是一种数据安全的考虑。
2.为什么redis中使用两种数据范例,list和zset?
服从问题,算法的时间复杂度; list是双向链表
            https://img-blog.csdnimg.cn/img_convert/3ffbbb05274547f2b76043b621e2f318.png         3.在添加zset数据的时间,为什么不需要预加载?
任务模块是一个通用的模块,项目中任何需要延迟队列的地方,都可以调用这个接口,要考虑到数据量的问题,假如数据量特殊大,为了防止zset壅闭,只需要把未来几分钟要执行的数据存入缓存即可。
锐评:完全为了学list zset而编出来的场景,现实工作中延迟队列要计划成这样只能说太蠢了

现实工作绝对用MQ
4)延迟任务服务实现

4.1)搭建heima-leadnews-schedule模块

leadnews-schedule是一个通用的服务,单独创建模块来管理任何范例的延迟任务
①:导入资料文件夹的heima-leadnews-schedule模块到heima-leadnews-service下,如下图所示:
            https://img-blog.csdnimg.cn/img_convert/75782be82c394d10b3a841bc06102e5e.png         ②:添加bootstrap.yml
server:
port: 51701
spring:
application:
    name: leadnews-schedule
cloud:
    nacos:
      discovery:
      server-addr: 192.168.200.130:8848
      config:
      server-addr: 192.168.200.130:8848
      file-extension: yml③:在nacos中添加对应配置,并添加数据库及mybatis-plus的配置
spring:
datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/leadnews_schedule?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
    username: root
    password: root
# 设置Mapper接口所对应的XML文件位置,如果你在Mapper接口中有自定义方法,需要进行该配置
mybatis-plus:
mapper-locations: classpath*:mapper/*.xml
# 设置别名包扫描路径,通过该属性可以给包中的类注册别名
type-aliases-package: com.heima.model.schedule.pojos4.2)数据库预备

导入资料中leadnews_schedule数据库
taskinfo 任务表
            https://img-blog.csdnimg.cn/img_convert/68c0bf4db9f9463584e8883fcf32f20d.png         实体类

package com.heima.model.schedule.pojos;
import java.io.Serializable;
import java.util.Date;

/**
* <p>
*
* </p>
*
* @author itheima
*/
@Data
@TableName("taskinfo")
public class Taskinfo implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
   * 任务id
   */
    @TableId(type = IdType.ID_WORKER)
    private Long taskId;

    /**
   * 执行时间
   */
    @TableField("execute_time")
    private Date executeTime;

    /**
   * 参数
   */
    @TableField("parameters")
    private byte[] parameters;

    /**
   * 优先级
   */
    @TableField("priority")
    private Integer priority;

    /**
   * 任务类型
   */
    @TableField("task_type")
    private Integer taskType;


}taskinfo_logs 任务日记表
            https://img-blog.csdnimg.cn/img_convert/7a374a590cc4425db2be2c2725459670.png                     https://img-blog.csdnimg.cn/img_convert/a5c66c98cf584f1080d5c41e3128feca.png         实体类

package com.heima.model.schedule.pojos;
import java.io.Serializable;
import java.util.Date;

/**
* <p>
*
* </p>
*
* @author itheima
*/
@Data
@TableName("taskinfo_logs")
public class TaskinfoLogs implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
   * 任务id
   */
    @TableId(type = IdType.ID_WORKER)
    private Long taskId;

    /**
   * 执行时间
   */
    @TableField("execute_time")
    private Date executeTime;

    /**
   * 参数
   */
    @TableField("parameters")
    private byte[] parameters;

    /**
   * 优先级
   */
    @TableField("priority")
    private Integer priority;

    /**
   * 任务类型
   */
    @TableField("task_type")
    private Integer taskType;

    /**
   * 版本号,用乐观锁
   */
    @Version
    private Integer version;

    /**
   * 状态 0=int 1=EXECUTED 2=CANCELLED
   */
    @TableField("status")
    private Integer status;


}乐观锁/悲观锁

            https://img-blog.csdnimg.cn/img_convert/e58e0188efcb4420943a037689880b98.png         悲观锁服从低;
            https://img-blog.csdnimg.cn/img_convert/13ad81fbdd9f4edfb3b39848e23e2fca.png         乐观锁支持:

/**
   * mybatis-plus乐观锁支持
   * @return
   */
@Bean
public MybatisPlusInterceptor optimisticLockerInterceptor(){
    MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
    interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());
    return interceptor;
}4.3)安装redis

①拉取镜像
docker pull redis② 创建容器
docker run -d --name redis --restart=always -p 6379:6379 redis --requirepass "leadnews"③链接测试
打开资料中的Redis Desktop Manager,输入host、port、password链接测试
            https://img-blog.csdnimg.cn/img_convert/16659b9fe8b24457a833772df8e4c6b7.png         能链接成功,即可
4.4)项目集成redis

① 在项目导入redis相关依靠,已经完成
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- redis依赖commons-pool 这个依赖一定要添加 -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>② 在heima-leadnews-schedule中集成redis,添加以下nacos配置,链接上redis
spring:
redis:
    host: 192.168.200.130
    password: leadnews
    port: 6379③ 拷贝资料文件夹下的类:CacheService到heima-leadnews-common模块下,并添加主动配置
            https://img-blog.csdnimg.cn/img_convert/4a517ad143df4fedaffa3c8f5f2541e0.png         ④:测试

package com.heima.schedule.test;
import java.util.Set;

@SpringBootTest(classes = ScheduleApplication.class)
@RunWith(SpringRunner.class)
public class RedisTest {

    @Autowired
    private CacheService cacheService;

    @Test
    public void testList(){

      //在list的左边添加元素
//      cacheService.lLeftPush("list_001","hello,redis");

      //在list的右边获取元素,并删除
      String list_001 = cacheService.lRightPop("list_001");
      System.out.println(list_001);
    }

    @Test
    public void testZset(){
      //添加数据到zset中分值
      /*cacheService.zAdd("zset_key_001","hello zset 001",1000);
      cacheService.zAdd("zset_key_001","hello zset 002",8888);
      cacheService.zAdd("zset_key_001","hello zset 003",7777);
      cacheService.zAdd("zset_key_001","hello zset 004",999999);*/

      //按照分值获取数据
      Set<String> zset_key_001 = cacheService.zRangeByScore("zset_key_001", 0, 8888);
      System.out.println(zset_key_001);
    }
}4.5)添加任务

①:拷贝mybatis-plus生成的文件,mapper
②:创建task类,用于接收添加任务的参数

package com.heima.model.schedule.dtos;
import lombok.Data;
import java.io.Serializable;

@Data
public class Task implements Serializable {

    /**
   * 任务id
   */
    private Long taskId;
    /**
   * 类型
   */
    private Integer taskType;

    /**
   * 优先级
   */
    private Integer priority;

    /**
   * 执行id
   */
    private long executeTime;

    /**
   * task参数
   */
    private byte[] parameters;
   
}③:创建TaskService

package com.heima.schedule.service;
import com.heima.model.schedule.dtos.Task;
/**
* 对外访问接口
*/
public interface TaskService {

    /**
   * 添加任务
   * @param task   任务对象
   * @return       任务id
   */
    public long addTask(Task task) ;

}实现:

package com.heima.schedule.service.impl;
import java.util.Calendar;
import java.util.Date;

@Service
@Transactional
@Slf4j
public class TaskServiceImpl implements TaskService {
    /**
   * 添加延迟任务
   *
   * @param task
   * @return
   */
    @Override
    public long addTask(Task task) {
      //1.添加任务到数据库中

      boolean success = addTaskToDb(task);

      if (success) {
            //2.添加任务到redis
            addTaskToCache(task);
      }
      return task.getTaskId();
    }

    @Autowired
    private CacheService cacheService;

    /**
   * 把任务添加到redis中
   *
   * @param task
   */
    private void addTaskToCache(Task task) {

      String key = task.getTaskType() + "_" + task.getPriority();

      //获取5分钟之后的时间毫秒值
      Calendar calendar = Calendar.getInstance();
      calendar.add(Calendar.MINUTE, 5);
      long nextScheduleTime = calendar.getTimeInMillis();

      //2.1 如果任务的执行时间小于等于当前时间,存入list
      if (task.getExecuteTime() <= System.currentTimeMillis()) {
            cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task));
      } else if (task.getExecuteTime() <= nextScheduleTime) {
            //2.2 如果任务的执行时间大于当前时间 && 小于等于预设时间(未来5分钟) 存入zset中
            cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime());
      }
    }

    @Autowired
    private TaskinfoMapper taskinfoMapper;

    @Autowired
    private TaskinfoLogsMapper taskinfoLogsMapper;

    /**
   * 添加任务到数据库中
   *
   * @param task
   * @return
   */
    private boolean addTaskToDb(Task task) {

      boolean flag = false;

      try {
            //保存任务表
            Taskinfo taskinfo = new Taskinfo();
            BeanUtils.copyProperties(task, taskinfo);
            taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
            taskinfoMapper.insert(taskinfo);

            //设置taskID
            task.setTaskId(taskinfo.getTaskId());

            //保存任务日志数据
            TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
            BeanUtils.copyProperties(taskinfo, taskinfoLogs);
            taskinfoLogs.setVersion(1);
            taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
            taskinfoLogsMapper.insert(taskinfoLogs);

            flag = true;
      } catch (Exception e) {
            e.printStackTrace();
      }

      return flag;
    }
}ScheduleConstants常量类

package com.heima.common.constants;

public class ScheduleConstants {

    //task状态
    public static final int SCHEDULED=0;   //初始化状态

    public static final int EXECUTED=1;       //已执行状态

    public static final int CANCELLED=2;   //已取消状态

    public static String FUTURE="future_";   //未来数据key前缀

    public static String TOPIC="topic_";   //当前数据key前缀
}④:测试
4.6)取消任务

在TaskService中添加方法

/**
   * 取消任务
   * @param taskId      任务id
   * @return            取消结果
   */
public boolean cancelTask(long taskId);实现

/**
   * 取消任务
   * @param taskId
   * @return
   */
@Override
public boolean cancelTask(long taskId) {

    boolean flag = false;

    //删除任务,更新日志
    Task task = updateDb(taskId,ScheduleConstants.EXECUTED);

    //删除redis的数据
    if(task != null){
      removeTaskFromCache(task);
      flag = true;
    }
    return false;
}

/**
   * 删除redis中的任务数据
   * @param task
   */
private void removeTaskFromCache(Task task) {

    String key = task.getTaskType()+"_"+task.getPriority();

    if(task.getExecuteTime()<=System.currentTimeMillis()){
      cacheService.lRemove(ScheduleConstants.TOPIC+key,0,JSON.toJSONString(task));
    }else {
      cacheService.zRemove(ScheduleConstants.FUTURE+key, JSON.toJSONString(task));
    }
}

/**
   * 删除任务,更新任务日志状态
   * @param taskId
   * @param status
   * @return
   */
private Task updateDb(long taskId, int status) {
    Task task = null;
    try {
      //删除任务
      taskinfoMapper.deleteById(taskId);

      TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);
      taskinfoLogs.setStatus(status);
      taskinfoLogsMapper.updateById(taskinfoLogs);

      task = new Task();
      BeanUtils.copyProperties(taskinfoLogs,task);
      task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());
    }catch (Exception e){
      log.error("task cancel exception taskid={}",taskId);
    }
    return task;
}测试
4.7)消耗任务

在TaskService中添加方法

/**
* 按照类型和优先级来拉取任务
* @param type
* @param priority
* @return
*/
public Task poll(int type,int priority);实现

/**
   * 按照类型和优先级拉取任务
   * @return
   */
@Override
public Task poll(int type,int priority) {
    Task task = null;
    try {
      String key = type+"_"+priority;
      String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);
      if(StringUtils.isNotBlank(task_json)){
            task = JSON.parseObject(task_json, Task.class);
            //更新数据库信息
            updateDb(task.getTaskId(),ScheduleConstants.EXECUTED);
      }
    }catch (Exception e){
      e.printStackTrace();
      log.error("poll task exception");
    }

    return task;
}4.8)未来数据定时刷新

4.8.1)reids key值匹配

方案1:keys 模糊匹配
keys的模糊匹配功能很方便也很强盛,但是在生产情况需要慎用!开发中使用keys的模糊匹配却发现redis的CPU使用率极高,以是公司的redis生产情况将keys命令禁用了!redis是单线程,会被堵塞
            https://img-blog.csdnimg.cn/img_convert/a1ed55a337914743afeb0a0c56db74d7.png         方案2:scan
SCAN 命令是一个基于游标的迭代器,SCAN命令每次被调用之后, 都会向用户返回一个新的游标, 用户在下次迭代时需要使用这个新游标作为SCAN命令的游标参数, 以此来延续之前的迭代过程。
            https://img-blog.csdnimg.cn/img_convert/6795bc0fb0ee4d948772042aba1d1edf.png         代码案例:

@Test
public void testKeys(){
    Set<String> keys = cacheService.keys("future_*");
    System.out.println(keys);

    Set<String> scan = cacheService.scan("future_*");
    System.out.println(scan);
}4.8.2)reids管道

平凡redis客户端和服务器交互模式 性能很低
            https://img-blog.csdnimg.cn/img_convert/06910e9620f844c6a92195c663fc71e4.png         Pipeline哀求模型
            https://img-blog.csdnimg.cn/img_convert/842604ad29b44ec1b9ca97bc571ace42.png         官方测试结果数据对比
            https://img-blog.csdnimg.cn/img_convert/43da2b4da7bc412184dc4709c887849a.png         测试案例对比:

//耗时6151
@Test
publicvoid testPiple1(){
    long start =System.currentTimeMillis();
    for (int i = 0; i <10000 ; i++) {
      Task task = new Task();
      task.setTaskType(1001);
      task.setPriority(1);
      task.setExecuteTime(new Date().getTime());
      cacheService.lLeftPush("1001_1", JSON.toJSONString(task));
    }
    System.out.println("耗时"+(System.currentTimeMillis()- start));
}

@Test
public void testPiple2(){
    long start= System.currentTimeMillis();
    //使用管道技术
    List<Object> objectList = cacheService.getstringRedisTemplate().executePipelined(new RedisCallback<Object>() {
      @Nullable
      @Override
      public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
            for (int i = 0; i <10000 ; i++) {
                Task task = new Task();
                task.setTaskType(1001);
                task.setPriority(1);
                task.setExecuteTime(new Date().getTime());
                redisConnection.lPush("1001_1".getBytes(), JSON.toJSONString(task).getBytes());
            }
            return null;
      }
    });
    System.out.println("使用管道技术执行10000次自增操作共耗时:"+(System.currentTimeMillis()-start)+"毫秒");
}4.8.3)未来数据定时刷新-功能完成

            https://img-blog.csdnimg.cn/img_convert/f2ec0e83dfe2498fbda255b1f956509c.png         在TaskService中添加方法

@Scheduled(cron = "0 */1 * * * ?")//定时 (每分钟执行一次
//{秒数} {分钟} {小时} {日期} {月份} {星期} {年份(可为空)}
public void refresh() {
    System.out.println(System.currentTimeMillis() / 1000 + "执行了定时任务");

    // 获取所有未来数据集合的key值
    Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*
    for (String futureKey : futureKeys) { // future_250_250

      String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE);
      //获取该组key下当前需要消费的任务数据
      Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
      if (!tasks.isEmpty()) {
            //将这些任务数据添加到消费者队列中
            cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
            System.out.println("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");
      }
    }
}在引导类中添加开启任务调度注解:@EnableScheduling
4.9)分布式锁办理集群下的方法抢占执行

4.9.1)问题描述

启动两台heima-leadnews-schedule服务,每台服务都会去执行refresh定时任务方法
            https://img-blog.csdnimg.cn/img_convert/b2ecc0c6ec194ae5b24e1446fa0e7207.png                     https://img-blog.csdnimg.cn/img_convert/fc5865c618574477b3cd7bb1bda60b90.png                     https://img-blog.csdnimg.cn/img_convert/7f178c8eca4c4a638958a360db9e3d3c.png         4.9.2)分布式锁

分布式锁:控制分布式体系有序的去对共享资源举行操纵,通过互斥来保证数据的一致性。
办理方案:
            https://img-blog.csdnimg.cn/img_convert/07acc9e0282746a9a002d5e8c987d4c8.png         4.9.3)redis分布式锁

sexnx (SET if Not eXists)命令在指定的 key 不存在时,为 key 设置指定的值。
            https://img-blog.csdnimg.cn/img_convert/f01d349d6d8f4aa7a898d1595ce0fb55.png         这种加锁的思路是,假如 key 不存在则为 key 设置 value,假如 key 已存在则 SETNX 命令不做任何操纵


[*] 客户端A哀求服务器设置key的值,假如设置成功就表示加锁成功


[*] 客户端B也去哀求服务器设置key的值,假如返回失败,那么就代表加锁失败


[*] 客户端A执行代码完成,删除锁


[*] 客户端B在等待一段时间后再去哀求设置key的值,设置成功


[*] 客户端B执行代码完成,删除锁
4.9.4)在工具类CacheService中添加方法


/**
* 加锁
*
* @param name
* @param expire
* @return
*/
public String tryLock(String name, long expire) {
    name = name + "_lock";
    String token = UUID.randomUUID().toString();
    RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();
    RedisConnection conn = factory.getConnection();
    try {

      //参考redis命令:
      //set key value
      Boolean result = conn.set(
                name.getBytes(),
                token.getBytes(),
                Expiration.from(expire, TimeUnit.MILLISECONDS),
                RedisStringCommands.SetOption.SET_IF_ABSENT //NX
      );
      if (result != null && result)
            return token;
    } finally {
      RedisConnectionUtils.releaseConnection(conn, factory,false);
    }
    return null;
}修改未来数据定时刷新的方法,如下:

/**
* 未来数据定时刷新
*/
@Scheduled(cron = "0 */1 * * * ?")
public void refresh(){

    String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);
    if(StringUtils.isNotBlank(token)){
      log.info("未来数据定时刷新---定时任务");

      //获取所有未来数据的集合key
      Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
      for (String futureKey : futureKeys) {//future_100_50

            //获取当前数据的keytopic
            String topicKey = ScheduleConstants.TOPIC+futureKey.split(ScheduleConstants.FUTURE);

            //按照key和分值查询符合条件的数据
            Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());

            //同步数据
            if(!tasks.isEmpty()){
                cacheService.refreshWithPipeline(futureKey,topicKey,tasks);
                log.info("成功的将"+futureKey+"刷新到了"+topicKey);
            }
      }
    }
}4.10)数据库同步到redis

            https://img-blog.csdnimg.cn/img_convert/91b01d98de5944f78ce0ecec4c1bce11.png         
@Scheduled(cron = "0 */5 * * * ?")
@PostConstruct
public void reloadData() {
    clearCache();
    log.info("数据库数据同步到缓存");
    Calendar calendar = Calendar.getInstance();
    calendar.add(Calendar.MINUTE, 5);

    //查看小于未来5分钟的所有任务
    List<Taskinfo> allTasks = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime,calendar.getTime()));
    if(allTasks != null && allTasks.size() > 0){
      for (Taskinfo taskinfo : allTasks) {
            Task task = new Task();
            BeanUtils.copyProperties(taskinfo,task);
            task.setExecuteTime(taskinfo.getExecuteTime().getTime());
            addTaskToCache(task);
      }
    }
}

private void clearCache(){
    // 删除缓存中未来数据集合和当前消费者队列的所有key
    Set<String> futurekeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_
    Set<String> topickeys = cacheService.scan(ScheduleConstants.TOPIC + "*");// topic_
    cacheService.delete(futurekeys);
    cacheService.delete(topickeys);
}5)延迟队列办理精准时间发布文章

5.1)延迟队列服务提供对外接口

提供长途的feign接口,在heima-leadnews-feign-api编写类如下:

package com.heima.apis.schedule;
import org.springframework.web.bind.annotation.RequestBody;

@FeignClient("leadnews-schedule")
public interface IScheduleClient {

    /**
   * 添加任务
   * @param task   任务对象
   * @return       任务id
   */
    @PostMapping("/api/v1/task/add")
    public ResponseResultaddTask(@RequestBody Task task);

    /**
   * 取消任务
   * @param taskId      任务id
   * @return            取消结果
   */
    @GetMapping("/api/v1/task/cancel/{taskId}")
    public ResponseResult cancelTask(@PathVariable("taskId") long taskId);

    /**
   * 按照类型和优先级来拉取任务
   * @param type
   * @param priority
   * @return
   */
    @GetMapping("/api/v1/task/poll/{type}/{priority}")
    public ResponseResult poll(@PathVariable("type") int type,@PathVariable("priority")int priority);
}在heima-leadnews-schedule微服务下提供对应的实现

package com.heima.schedule.feign;
import org.springframework.web.bind.annotation.*;

@RestController
public class ScheduleClientimplements IScheduleClient {

    @Autowired
    private TaskService taskService;

    /**
   * 添加任务
   * @param task 任务对象
   * @return 任务id
   */
    @PostMapping("/api/v1/task/add")
    @Override
    public ResponseResult addTask(@RequestBody Task task) {
      return ResponseResult.okResult(taskService.addTask(task));
    }

    /**
   * 取消任务
   * @param taskId 任务id
   * @return 取消结果
   */
    @GetMapping("/api/v1/task/cancel/{taskId}")
    @Override
    public ResponseResult cancelTask(@PathVariable("taskId") long taskId) {
      return ResponseResult.okResult(taskService.cancelTask(taskId));
    }

    /**
   * 按照类型和优先级来拉取任务
   * @param type
   * @param priority
   * @return
   */
    @GetMapping("/api/v1/task/poll/{type}/{priority}")
    @Override
    public ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority") int priority) {
      return ResponseResult.okResult(taskService.poll(type,priority));
    }
}5.2)发布文章集成添加延迟队列接口

在创建WmNewsTaskService

package com.heima.wemedia.service;

import com.heima.model.wemedia.pojos.WmNews;


public interface WmNewsTaskService {

    /**
   * 添加任务到延迟队列中
   * @param id文章的id
   * @param publishTime发布的时间可以做为任务的执行时间
   */
    public void addNewsToTask(Integer id, Date publishTime);
}实现:

package com.heima.wemedia.service.impl;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class WmNewsTaskServiceImplimplements WmNewsTaskService {


    @Autowired
    private IScheduleClient scheduleClient;

    /**
   * 添加任务到延迟队列中
   * @param id          文章的id
   * @param publishTime 发布的时间可以做为任务的执行时间
   */
    @Override
    @Async
    public void addNewsToTask(Integer id, Date publishTime) {

      log.info("添加任务到延迟服务中----begin");

      Task task = new Task();
      task.setExecuteTime(publishTime.getTime());
      task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());
      task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
      WmNews wmNews = new WmNews();
      wmNews.setId(id);
      task.setParameters(ProtostuffUtil.serialize(wmNews));

      scheduleClient.addTask(task);

      log.info("添加任务到延迟服务中----end");

    }
   
}罗列类:

package com.heima.model.common.enums;

import lombok.AllArgsConstructor;
import lombok.Getter;

@Getter
@AllArgsConstructor
public enum TaskTypeEnum {

    NEWS_SCAN_TIME(1001, 1,"文章定时审核"),
    REMOTEERROR(1002, 2,"第三方接口调用失败,重试");
    private final int taskType; //对应具体业务
    private final int priority; //业务不同级别
    private final String desc; //描述信息
}序列化工具对比



[*] JdkSerialize:java内置的序列化能将实现了Serilazable接口的对象举行序列化和反序列化, ObjectOutputStream的writeObject()方法可序列化对象生成字节数组


[*] Protostuff:google开源的protostuff采用更为紧凑的二进制数组,表现更加优异,然后使用protostuff的编译工具生成pojo类
拷贝资料中的两个类到heima-leadnews-utils下
Protostuff需要引导依靠:
<dependency>
    <groupId>io.protostuff</groupId>
    <artifactId>protostuff-core</artifactId>
    <version>1.6.0</version>
</dependency>

<dependency>
    <groupId>io.protostuff</groupId>
    <artifactId>protostuff-runtime</artifactId>
    <version>1.6.0</version>
</dependency>修改发布文章代码:
把之前的异步调用修改为调用延迟任务

@Autowired
private WmNewsTaskService wmNewsTaskService;

/**
   * 发布修改文章或保存为草稿
   * @param dto
   * @return
   */
@Override
public ResponseResult submitNews(WmNewsDto dto) {

    //0.条件判断
    if(dto == null || dto.getContent() == null){
      return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }

    //1.保存或修改文章

    WmNews wmNews = new WmNews();
    //属性拷贝 属性名词和类型相同才能拷贝
    BeanUtils.copyProperties(dto,wmNews);
    //封面图片list---> string
    if(dto.getImages() != null && dto.getImages().size() > 0){
      //-->   1dddfsd.jpg,sdlfjldk.jpg
      String imageStr = StringUtils.join(dto.getImages(), ",");
      wmNews.setImages(imageStr);
    }
    //如果当前封面类型为自动 -1
    if(dto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)){
      wmNews.setType(null);
    }

    saveOrUpdateWmNews(wmNews);

    //2.判断是否为草稿如果为草稿结束当前方法
    if(dto.getStatus().equals(WmNews.Status.NORMAL.getCode())){
      return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
    }

    //3.不是草稿,保存文章内容图片与素材的关系
    //获取到文章内容中的图片信息
    List<String> materials =ectractUrlInfo(dto.getContent());
    saveRelativeInfoForContent(materials,wmNews.getId());

    //4.不是草稿,保存文章封面图片与素材的关系,如果当前布局是自动,需要匹配封面图片
    saveRelativeInfoForCover(dto,wmNews,materials);

    //审核文章
    //      wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
    wmNewsTaskService.addNewsToTask(wmNews.getId(),wmNews.getPublishTime());

    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);

}5.3)消耗任务举行审核文章

WmNewsTaskService中添加方法

/**
* 消费延迟队列数据
*/
public void scanNewsByTask();实现

@Autowired
private WmNewsAutoScanServiceImpl wmNewsAutoScanService;

/**
   * 消费延迟队列数据
   */
@Scheduled(fixedRate = 1000)
@Override
@SneakyThrows
public void scanNewsByTask() {

    log.info("文章审核---消费任务执行---begin---");

    ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
    if(responseResult.getCode().equals(200) && responseResult.getData() != null){
      String json_str = JSON.toJSONString(responseResult.getData());
      Task task = JSON.parseObject(json_str, Task.class);
      byte[] parameters = task.getParameters();
      WmNews wmNews = ProtostuffUtil.deserialize(parameters, WmNews.class);
      System.out.println(wmNews.getId()+"-----------");
      wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
    }
    log.info("文章审核---消费任务执行---end---");
}在WemediaApplication自媒体的引导类中添加开启任务调度注解@EnableScheduling
06kafka及异步关照文章上下架

            https://img-blog.csdnimg.cn/img_convert/dc08a1e086c344c99bdcb9e32851e8b4.png         1)自媒体文章上下架

需求分析
            https://img-blog.csdnimg.cn/img_convert/0042d08997fd4be7add4d1fa7ba5ed20.png                     https://img-blog.csdnimg.cn/img_convert/3bbc397ae1fb490e9ef5df0d3d93420d.png         2)kafka概述

消息中心件对比
            https://img-blog.csdnimg.cn/img_convert/a101c7ab381b46ecba5c7557849d2116.png         消息中心件对比-选择建议

    消息中心件
建议
Kafka
追求高吞吐量,得当产生大量数据的互联网服务的数据收集业务
RocketMQ
可靠性要求很高的金融互联网范畴,稳定性高,经历了多次阿里双11考验
RabbitMQ
性能较好,社区活跃度高,数据量没有那么大,优先选择功能比较完备的RabbitMQ
    kafka介绍
Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息通报体系。
kafka官网:Apache Kafka
            https://img-blog.csdnimg.cn/img_convert/e7f4fdb787084d5aae50b3544bb05b14.png         kafka介绍-名词表明

            https://img-blog.csdnimg.cn/img_convert/3f7d667d60cf4d3286ecbe4afd5e9e78.png         


[*] producer:发布消息的对象称之为主题生产者(Kafka topic producer)


[*] topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)


[*] consumer:订阅消息并处剃头布的消息的对象称之为主题消耗者(consumers)


[*] broker:已发布的消息生存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个署理(Broker)。 消耗者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消耗这些已发布的消息。
3)kafka安装配置

Kafka对于zookeeper是强依靠,生存kafka相关的节点数据,以是安装Kafka之前必须先安装zookeeper


[*] Docker安装zookeeper
下载镜像:
docker pull zookeeper:3.4.14创建容器
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14

[*] Docker安装kafka
下载镜像:
docker pull wurstmeister/kafka:2.12-2.3.1创建容器
docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1            https://img-blog.csdnimg.cn/img_convert/be6dae2136bd45299c158de9d1b978de.png         云主机无法使用--net
4)kafka入门

            https://img-blog.csdnimg.cn/img_convert/18a9af76b8cb4ac2aa372ab7c01639ca.png         

[*] 生产者发送消息,多个消耗者只能有一个消耗者接收到消息


[*] 生产者发送消息,多个消耗者都可以接收到消息
(1)创建kafka-demo项目,导入依靠
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>(2)生产者发送消息

package com.heima.kafka.sample;
import java.util.Properties;

/**
* 生产者
*/
public class ProducerQuickStart {

    public static void main(String[] args) {
      //1.kafka的配置信息
      Properties properties = new Properties();
      //kafka的连接地址
      properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
      //发送失败,失败的重试次数
      properties.put(ProducerConfig.RETRIES_CONFIG,5);
      //消息key的序列化器
      properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
      //消息value的序列化器
      properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

      //2.生产者对象
      KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);

      //封装发送的消息
      ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");

      //3.发送消息
      producer.send(record);

      //4.关闭消息通道,必须关闭,否则消息发送不成功
      producer.close();
    }

}(3)消耗者接收消息

package com.heima.kafka.sample;
import java.util.Properties;

/**
* 消费者
*/
public class ConsumerQuickStart {

    public static void main(String[] args) {
      //1.添加kafka的配置信息
      Properties properties = new Properties();
      //kafka的连接地址
      properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
      //消费者组
      properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
      //消息的反序列化器
      properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
      properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

      //2.消费者对象
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

      //3.订阅主题
      consumer.subscribe(Collections.singletonList("itheima-topic"));

      //当前线程一直处于监听状态
      while (true) {
            //4.获取消息
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key());
                System.out.println(consumerRecord.value());
            }
      }

    }

}总结


[*] 生产者发送消息,多个消耗者订阅同一个主题,只能有一个消耗者收到消息(一对一)同一个组


[*] 生产者发送消息,多个消耗者订阅同一个主题,所有消耗者都能收到消息(一对多)多个组
分区机制—topic剖析

            https://img-blog.csdnimg.cn/img_convert/79a017d3ff5a4f12a6804d24044e11dd.png                     https://img-blog.csdnimg.cn/img_convert/5acff32d323044e3829a7b1333f41c04.png                     https://img-blog.csdnimg.cn/img_convert/f38af17724c54054a0146f381e35bc00.png         5)kafka高可用计划

5.1)集群

            https://img-blog.csdnimg.cn/img_convert/d5058a3865314a83b2e2405230af3b17.png         

[*] Kafka 的服务器端由被称为 Broker 的服务进程构成,即一个 Kafka 集群由多个 Broker 组成


[*] 这样假如集群中某一台呆板宕机,其他呆板上的 Broker 也依然可以大概对外提供服务。这其实就是 Kafka 提供高可用的本领之一
5.2)备份机制(Replication)

            https://img-blog.csdnimg.cn/img_convert/e1759d7595d34af8915261a748cd82b8.png         Kafka 中消息的备份又叫做 副本(Replica)
Kafka 定义了两类副本:


[*] 领导者副本(Leader Replica)


[*] 跟随者副本(Follower Replica)
备份机制—同步方式
            https://img-blog.csdnimg.cn/img_convert/06faf41c560c4eb2b9af121b122975f0.png         ISR(in-sync replica)需要同步复制生存的follower
假如leader失效后,需要选出新的leader,推举的原则如下:
第一:推举时优先从ISR中选定,因为这个列表中follower的数据是与leader同步的
第二:假如ISR列表中的follower都不行了,就只能从其他follower中选取

极度情况,就是所有副本都失效了,这时有两种方案
第一:等待ISR中的一个活过来,选为Leader,数据可靠,但活过来的时间不确定
第二:选择第一个活过来的Replication,不肯定是ISR中的,选为leader,以最快速率规复可用性,但数据不肯定完整
6)kafka生产者详解

6.1)发送范例



[*] 同步发送
使用send()方法发送,它会返回一个Future对象,调用get()方法举行等待,就可以知道消息是否发送成功

RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();
System.out.println(recordMetadata.offset());

[*] 异步发送
调用send()方法,并指定一个回调函数,服务器在返回相应时调用函数

//异步消息发送
producer.send(kvProducerRecord, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
      if(e != null){
            System.out.println("记录异常信息到日志表中");
      }
      System.out.println(recordMetadata.offset());
    }
});6.2)参数详解



[*] ack确认机制
            https://img-blog.csdnimg.cn/img_convert/f19afbf9ecbb45bfb3e7cc383951b375.png         
代码的配置方式:

//ack配置消息确认机制
prop.put(ProducerConfig.ACKS_CONFIG,"all");参数的选择说明
    确认机制
说明
acks=0
生产者在成功写入消息之前不会等待(不需要)任何来自服务器的相应,消息有丢失的风险,但是速率最快
acks=1(默认值)
只要集群Leader节点收到消息,生产者就会收到一个来自服务器的成功相应
acks=all
只有当所有加入赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功相应
   

[*] retries 重试次数
            https://img-blog.csdnimg.cn/img_convert/287a5580b12740f7ada0f3f8b2118d07.png         生产者从服务器收到的错误有可能是暂时性错误,在这种情况下,retries参数的值决定了生产者可以重发消息的次数,假如到达这个次数,生产者会放弃重试返回错误,默认情况下,生产者会在每次重试之间等待100ms
代码中配置方式:

//重试次数
prop.put(ProducerConfig.RETRIES_CONFIG,10);

[*] 消息压缩
默认情况下, 消息发送时不会被压缩。
代码中配置方式:

//数据压缩
prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");    压缩算法
说明
snappy
占用较少的 CPU, 却能提供较好的性能和相称可观的压缩比, 假如看重性能和网络带宽,建议采用
lz4
占用较少的 CPU, 压缩和解压缩速率较快,压缩比也很客观
gzip
占用较多的 CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法
    使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。
7)kafka消耗者详解

7.1)消耗者组

            https://img-blog.csdnimg.cn/img_convert/59edfdbf56cf4b3d8c4f90cd43980a3d.png         

[*] 消耗者组(Consumer Group) :指的就是由一个或多个消耗者组成的群体


[*] 一个发布在Topic上消息被分发给此消耗者组中的一个消耗者


[*] 所有的消耗者都在一个组中,那么这就变成了queue模型 消息队列 一对一


[*] 所有的消耗者都在不同的组中,那么就完全变成了发布-订阅模型 一对多消耗者
7.2)消息有序性

应用场景:


[*] 即时消息中的单对单聊天和群聊,保证发送方消息发送顺序与接收方的顺序一致


[*] 充值转账两个渠道在同一个时间举行余额变更,短信关照必须要有顺序
            https://img-blog.csdnimg.cn/img_convert/79f5bdce3cff45a6a04a72393df6bf86.png         topic分区中消息只能由消耗者组中的唯逐一个消耗者处理,以是消息肯定是按照先后顺序举行处理的。但是它也仅仅是保证Topic的一个分区顺序处理,不能保证跨分区的消息先后处理顺序。 以是,假如你想要顺序的处理Topic的所有消息,那就只提供一个分区。
7.3)提交和偏移量

kafka不会像其他JMS队列那样需要得到消耗者简直认,消耗者可以使用kafka来追踪消息在分区的位置(偏移量)
消耗者会往一个叫做_consumer_offset的特殊主题发送消息,消息里包含了每个分区的偏移量。假如消费者发生瓦解或有新的消耗者加入群组,就会触发再均衡
            https://img-blog.csdnimg.cn/img_convert/24312c8c723b4cb58f7e538470ac984a.png         正常的情况
            https://img-blog.csdnimg.cn/img_convert/7df765d4921b4091b75f3e0b1c6502c7.png         假如消耗者2挂掉以后,会发生再均衡,消耗者2负责的分区会被其他消耗者举行消耗
再均衡后不可避免会出现一些问题
问题一:
            https://img-blog.csdnimg.cn/img_convert/46b0f9cd6fca428a864dd5298d9b4ccb.png         假如提交偏移量2小于客户端处理的末了一个消息10的偏移量,那么处于两个偏移量之间的消息就会被重复处理。
问题二:
            https://img-blog.csdnimg.cn/img_convert/e4027f783ad04e5aa66fe673f9e3dea1.png         假如提交的偏移量5大于客户端末了一个消息11的偏移量,那么处于两个偏移量之间的消息将会丢失。
假如想要办理这些问题,还要知道现在kafka提交偏移量的方式:
提交偏移量的方式有两种,分别是主动提交偏移量和手动提交


[*] 主动提交偏移量
当enable.auto.commit被设置为true,提交方式就是让消耗者主动提交偏移量,每隔5秒消耗者会主动把从poll()方法接收的最大偏移量提交上去


[*] 手动提交 ,当enable.auto.commit被设置为false可以有以下三种提交方式


[*] 提交当前偏移量(同步提交)


[*] 异步提交


[*] 同步和异步组合提交
1.提交当前偏移量(同步提交)

把enable.auto.commit设置为false,让应用程序决定何时提交偏移量。使用commitSync()提交偏移量,commitSync()将会提交poll返回的最新的偏移量,以是在处理完所有记录后要确保调用了commitSync()方法。否则还是会有消息丢失的风险。
只要没有发生不可规复的错误,commitSync()方法会一直尝试直至提交成功,假如提交失败也可以记录到错误日记里。

while (true){
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
      System.out.println(record.value());
      System.out.println(record.key());
      try {
            consumer.commitSync();//同步提交当前最新的偏移量
      }catch (CommitFailedException e){
            System.out.println("记录提交失败的异常:"+e);
      }
    }
}2.异步提交

手动提交有一个缺点,那就是当发起提交调用时应用会壅闭。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和主动提交一样)。别的一个办理办法是,使用异步提交的API :commitAsync()

while (true){
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
      System.out.println(record.value());
      System.out.println(record.key());
    }
    consumer.commitAsync(new OffsetCommitCallback() {
      @Override
      public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
            if(e!=null){
                System.out.println("记录错误的提交偏移量:"+ map+",异常信息"+e);
            }
      }
    });
}3.同步和异步组合提交

异步提交也有个缺点,那就是假如服务器返回提交失败,异步提交不会举行重试。
相比较起来,同步提交会举行重试直到成功大概末了抛出异常给应用。异步提交没有实现重试是因为,假如同时存在多个异步提交,举行重试可能会导致位移覆盖。
举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA举行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消耗。

try {
    while (true){
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
      for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
            System.out.println(record.key());
      }
      consumer.commitAsync();
    }
}catch (Exception e){+
    e.printStackTrace();
    System.out.println("记录错误信息:"+e);
}finally {
    try {
      consumer.commitSync();
    }finally {
      consumer.close();
    }
}8)springboot集成kafka

8.1)入门

1.导入spring-kafka依靠信息
<dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- kafkfa -->
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
    </dependency>
</dependencies>2.在resources下创建文件application.yml
server:
port: 9991
spring:
application:
    name: kafka-demo
kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer3.消息生产者

package com.heima.kafka.controller;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloController {

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @GetMapping("/hello")
    public String hello(){
      kafkaTemplate.send("itcast-topic","黑马程序员");
      return "ok";
    }
}4.消息消耗者

package com.heima.kafka.listener;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

@Component
public class HelloListener {

    @KafkaListener(topics = "itcast-topic")
    public void onMessage(String message){
      if(!StringUtils.isEmpty(message)){
            System.out.println(message);
      }
    }
}8.2)通报消息为对象

现在springboot整合后的kafka,因为序列化器是StringSerializer,这个时间假如需要通报对象可以有两种方式
方式一:可以自定义序列化器,对象范例浩繁,这种方式通用性不强,本章节不介绍
方式二:可以把要通报的对象举行转json字符串,接收消息后再转为对象即可,本项目采用这种方式


[*] 发送消息

@GetMapping("/hello")
public String hello(){
    User user = new User();
    user.setUsername("xiaowang");
    user.setAge(18);
   
    kafkaTemplate.send("user-topic", JSON.toJSONString(user));

    return "ok";
}

[*] 接收消息

package com.heima.kafka.listener;
import org.springframework.util.StringUtils;

@Component
public class HelloListener {

    @KafkaListener(topics = "user-topic")
    public void onMessage(String message){
      if(!StringUtils.isEmpty(message)){
            User user = JSON.parseObject(message, User.class);
            System.out.println(user);
      }
    }
}9)自媒体文章上下架功能完成

9.1)需求分析

            https://img-blog.csdnimg.cn/img_convert/d9a874f5d2a54ac88e188ed543f1e918.png         

[*] 已发表且已上架的文章可以下架


[*] 已发表且已下架的文章可以上架
9.2)流程说明

            https://img-blog.csdnimg.cn/img_convert/e471156bda06424ca34995e0fccb9e5d.png         9.3)接口定义

   
说明
接口路径
/api/v1/news/down_or_up
哀求方式
POST
参数
DTO
相应结果
ResponseResult
    DTO

@Data
public class WmNewsDto {
   
    private Integer id;
    /**
    * 是否上架0 下架1 上架
    */
    private Short enable;
                     
}ResponseResult
            https://img-blog.csdnimg.cn/img_convert/2d74e0d9d7a8486dbcc163b7e002db12.png         9.4)自媒体文章上下架-功能实现

9.4.1)接口定义
在heima-leadnews-wemedia工程下的WmNewsController新增方法

@PostMapping("/down_or_up")
public ResponseResult downOrUp(@RequestBody WmNewsDto dto){
    return null;
}在WmNewsDto中新增enable属性 ,完整的代码如下:

package com.heima.model.wemedia.dtos;

import lombok.Data;

import java.util.Date;
import java.util.List;

@Data
public class WmNewsDto {
   
    private Integer id;
   /**
   * 标题
   */
    private String title;
   /**
   * 频道id
   */
    private Integer channelId;
   /**
   * 标签
   */
    private String labels;
   /**
   * 发布时间
   */
    private Date publishTime;
   /**
   * 文章内容
   */
    private String content;
   /**
   * 文章封面类型0 无图 1 单图 3 多图 -1 自动
   */
    private Short type;
   /**
   * 提交时间
   */
    private Date submitedTime;
   /**
   * 状态 提交为1草稿为0
   */
    private Short status;
   
   /**
   * 封面图片列表 多张图以逗号隔开
   */
    private List<String> images;

    /**
   * 上下架 0 下架1 上架
   */
    private Short enable;
}9.4.2)业务层编写
在WmNewsService新增方法

/**
* 文章的上下架
* @param dto
* @return
*/
public ResponseResult downOrUp(WmNewsDto dto);实现方法

/**
* 文章的上下架
* @param dto
* @return
*/
@Override
public ResponseResult downOrUp(WmNewsDto dto) {
    //1.检查参数
    if(dto.getId() == null){
      return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }

    //2.查询文章
    WmNews wmNews = getById(dto.getId());
    if(wmNews == null){
      return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章不存在");
    }

    //3.判断文章是否已发布
    if(!wmNews.getStatus().equals(WmNews.Status.PUBLISHED.getCode())){
      return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"当前文章不是发布状态,不能上下架");
    }

    //4.修改文章enable
    if(dto.getEnable() != null && dto.getEnable() > -1 && dto.getEnable() < 2){
      update(Wrappers.<WmNews>lambdaUpdate().set(WmNews::getEnable,dto.getEnable())
                .eq(WmNews::getId,wmNews.getId()));
    }
    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}9.4.3)控制器

@PostMapping("/down_or_up")
public ResponseResult downOrUp(@RequestBody WmNewsDto dto){
    return wmNewsService.downOrUp(dto);
}9.4.4)测试
9.5)消息关照article端文章上下架

9.5.1)在heima-leadnews-common模块下导入kafka依靠
<!-- kafkfa --><dependency>    <groupId>org.springframework.kafka</groupId>    <artifactId>spring-kafka</artifactId></dependency><dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>9.5.2)在自媒体端的nacos配置中心配置kafka的生产者
spring:
kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer9.5.3)在自媒体端文章上下架后发送消息

//发送消息,通知article端修改文章配置
if(wmNews.getArticleId() != null){
    Map<String,Object> map = new HashMap<>();
    map.put("articleId",wmNews.getArticleId());
    map.put("enable",dto.getEnable());
    kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map));
}常量类:

public class WmNewsMessageConstants {

    public static final String WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic";
}9.5.4)在article端的nacos配置中心配置kafka的消耗者
spring:
kafka:
    bootstrap-servers: 192.168.200.130:9092
    consumer:
      group-id: ${spring.application.name}
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer9.5.5)在article端编写监听,接收数据

package com.heima.article.listener;

import com.alibaba.fastjson.JSON;
import com.heima.article.service.ApArticleConfigService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@Slf4j
public class ArtilceIsDownListener {

    @Autowired
    private ApArticleConfigService apArticleConfigService;

    @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
    public void onMessage(String message){
      if(StringUtils.isNotBlank(message)){
            Map map = JSON.parseObject(message, Map.class);
            apArticleConfigService.updateByMap(map);
            log.info("article端文章配置修改,articleId={}",map.get("articleId"));
      }
    }
}9.5.6)修改ap_article_config表的数据
新建ApArticleConfigService

package com.heima.article.service;

import com.baomidou.mybatisplus.extension.service.IService;
import com.heima.model.article.pojos.ApArticleConfig;

import java.util.Map;

public interface ApArticleConfigService extends IService<ApArticleConfig> {

    /**
   * 修改文章配置
   * @param map
   */
    public void updateByMap(Map map);
}实现类:

package com.heima.article.service.impl;

import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.heima.article.mapper.ApArticleConfigMapper;
import com.heima.article.service.ApArticleConfigService;
import com.heima.model.article.pojos.ApArticleConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.Map;

@Service
@Slf4j
@Transactional
public class ApArticleConfigServiceImpl extends ServiceImpl<ApArticleConfigMapper, ApArticleConfig> implements ApArticleConfigService {


    /**
   * 修改文章配置
   * @param map
   */
    @Override
    public void updateByMap(Map map) {
      //0 下架 1 上架
      Object enable = map.get("enable");
      boolean isDown = true;
      if(enable.equals(1)){
            isDown = false;
      }
      //修改文章配置
      update(Wrappers.<ApArticleConfig>lambdaUpdate().eq(ApArticleConfig::getArticleId,map.get("articleId")).set(ApArticleConfig::getIsDown,isDown));

    }
}
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 《黑马头条》 内容安全 主动审核 feign 延迟任务精准发布 kafka