惊落一身雪 发表于 2024-6-14 23:34:48

黑马点评项目全部功能实现及详细笔记--Redis练手项目

一.项目详情

1.1 项目简介

#捉住2022的尾巴,分享年末记载笔记
黑马点评项目是前后端分离项目,前端部署在nginx服务器上,后端部署在tomcat上。整个项目必要实现的功能如下图。
https://img-blog.csdnimg.cn/dc8beea2ebd04a6c8e7e5eaab50c5445.png
1.2 数据库表设计

创建一个数据库 hmdp,涉及到的表及含义如下
https://img-blog.csdnimg.cn/e1c2459958d64964a30e402cc84918a2.png
1.3 前端部署

由于是前后端分离项目,前端页面设计样式都已经完成好了,我们后端只必要部署前端项目即可。
https://img-blog.csdnimg.cn/0018cdc367554435b477474d96ca5fb9.png
输入以下下令,启动nginx
start nginx.exe
然后访问 http://localhost:8080 ,即可看到页面:如果访问失败,详细解决方法我写在另一篇文章了。地址:nginx部署前端项目访问失败解决方法
https://img-blog.csdnimg.cn/74fdfd3b54004f76a0eb23e105e27aba.png
1.4 后端搭建

后端搭建还是分3步
1.创建Springboot工程项目
2.pom文件导入依赖坐标
3.yml设置文件添加设置信息
这里我直接导入黑马的项目工程来做脚手架,整个架构其实不复杂,就是用mybatis-plus做持久层,业务层,控制层,加上对应注解,调mp提供的api就完了,实体类这些更不用多说,按表映射成实体类。
项目整体布局如下图
https://img-blog.csdnimg.cn/1b649587da1c477b804813e378220c60.png
二.短信登录

涉及到Redis的共享session应用
2.1 发送验证码

点击我的,未登录会自动跳转到登录界面
https://img-blog.csdnimg.cn/93f2a1a0c6e245c5b51d6bd81d3f63bc.png
这里有一个小细节
   这里虽然后端设置的端口时8081,但这里的请求仍然时8080,是由于前端使用nginx,反向代理,请求先发送到了nginx服务,然后反向代理再去请求tomcat服务器
https://img-blog.csdnimg.cn/8d04a7487fed46239e9677f8b88b5a58.png
将生成的验证码根据key-value的情势存入redis
   @Resource有两个属性name和type。Spring将@Resource注解的name属性分析为bean的名字,而type属性则分析为bean的类型。所以如果使用name属性,则使用byName的自动注入计谋,而使用type属性时则使用byType自动注入计谋。如果既不指定name也不指定type属性,这时将通过反射机制使用byName自动注入计谋。
@Autowired只根据type举行注入,不会去匹配name。
https://img-blog.csdnimg.cn/cc6caf2505c948c297bbb4da693251da.png
2.2 验证码登录

这里验证码登录逻辑有点复杂,先捋清楚
1.校验提交过来的手机号是否为空
2.获取表单提交过来的手机号和验证码
3.根据key获取redis存的验证码
4.验证码匹配判定
5.不匹配,直接返回提示信息
6.匹配,根据phone查询用户信息
7.判定用户是否存在
8.不存在,创建该用户到数据库中
9.将用户信息存入redis
重点来了
9.1生成随机的token
9.2将User对象转为HashMap存储
9.3存入redis中
9.4设置key有效时间
注意:这里设置key的有效时间是死的,就是不管如何,到时间就key就失效,而我们想要的结果是和session一样,每次登录后,key的有效时间就革新,只有不停不登录时间达到了设置的时间才会失效(这项功能我们在拦截器里去实现)
10.返回token
      //8.1随机生成token,作为登录令牌;生成key
      String token = UUID.randomUUID().toString();
      //8.2将User对象转为HashMap存储;生成value
      UserDTO userDTO = new UserDTO();
      BeanUtils.copyProperties(user,userDTO);
      Map<String, Object> userMap = BeanUtil.beanToMap(userDTO);
      //8.3存储
      stringRedisTemplate.opsForHash().putAll(LOGIN_USER_KEY+token,userMap);//用定义的常量拼接key值
      stringRedisTemplate.expire(LOGIN_USER_KEY+token,30,TimeUnit.MINUTES);//设置有效时间
      //返回token
      return Result.ok(token);
2.3 登录校验拦截器

填入手机号和验证码之后,点击登录,前端发送如下请求,我们必要把登录用户的根本信息封装传递已往,获取当前登录的用户并返回
https://img-blog.csdnimg.cn/b0e2ecef188e48e59ca08104b2aa6388.png
那么接下来,如何获取登录用户的信息呢?
   ThreadLocal:为每个线程提供一份单独存储空间,只有在线程内才能获取对应的值
答案是我们在登录时就将用户的根本信息封装为userDto对象,存入ThreadLocal中,在me这里直接调用get方法,获取ThreadLocal存的对象根本信息
https://img-blog.csdnimg.cn/9be9555ad7dc493db764d188cc8ffaee.png
以下是使用ThreadLocal的根本工具类代码
public class UserHolder {
    private static final ThreadLocal<UserDTO> tl = new ThreadLocal<>();

    public static void saveUser(UserDTO user){
      tl.set(user);
    }

    public static UserDTO getUser(){
      return tl.get();
    }

    public static void removeUser(){
      tl.remove();
    }
}
如果直接传递一个user对象,会造成信息泄露,用户的所有信息随着请求提交都可以在负载中看到,如下图
https://img-blog.csdnimg.cn/f9f70c468b0045edaaae13d217be64f3.png
所以我们必要创建一个userDto,只界说几个简单属性,如许只是表现一些公开字段,不会泄露暗码等敏感信息,返回的时间也是返回userDto对象
https://img-blog.csdnimg.cn/dfe80a26ad98444382b3358922217acc.png
https://img-blog.csdnimg.cn/37f8d27e60a54cc29b2b1c4dead13a53.png
使用session时的登录验证凭据,就是session-id,根据客户端发送的session-id获取session,再从session中获取数据
https://img-blog.csdnimg.cn/c8dfc89409c14d33892ba377bc4bfaa5.png
由于session不适合集群模式,就是好几个tomcat服务器,运行时采用轮询计谋,每个请求处理的服务器差别,不大概每个服务器都存一份session的用户数据,同份数据存多次,既造成了访问延迟数据更新不及时,也造成了内存浪费。而redis是非常适合的,redis支持横向扩展。
使用redis代替session作为缓存后,无法从客户端发送的请求来获取key,在不明确key的情况下,无法获取存入的value值,如下是使用redis缓存来代替session存user
https://img-blog.csdnimg.cn/3f1bc8f60bf345afbd2f1dc391cda073.png
我们使用token来作为唯一key值,token是自动生成且不重复的值
接下来分析前端代码,如何发送请求时携带key值,其实就是相称于session的发送请求时携带cookie一样。
   前端的意思是设置了一个拦截器,当要访问后端请求时拦截器会拦截住,在上面绑定一个token请求,如许到后端就会携带token信息,后端就让你访问对应的redis数据
https://img-blog.csdnimg.cn/903b80845152483985186a784dc312df.png
至此虽然我们获取到了对象信息,但是不但一个业务必要获取登录用户的信息,好比发布博客,发表评论,都必要获取用户名称等信息。我们也不大概每个业务都去添加这段代码,要想再方法执行前统一做一些操作,就用到了我们的拦截器,也可以拦截未登录用户。
拦截器开发:我们使用springmvc提供的拦截器,拦截请求,在执行前做一些操作
如果要使用redis作为缓存对象信息,首先拦截器要获取到stringRedisTemplate对象,才能调用缓存api
   必要注意
拦截器在Bean初始化之前它就执行了,所以它肯定是无法获取SpringIOC容器中的内容的。那么我们就让拦截器执行的时间实例化拦截器Bean,在拦截器设置类内里先实例化拦截器,然后再获取
https://img-blog.csdnimg.cn/234454e343de421d8abfbf4fcb7c3f49.png
用构造器方法注入
   不能使用autowried或resource的原因,这个类的对象是在注入拦截器时,自己new出来的,不是由spring创建的,用构造器注入
    而注入拦截器的MvcConfig是由spring构建的,它可以使用自动装配,所以在这里获取stringRedisTemplate,在new的时间作为形参传进去
https://img-blog.csdnimg.cn/1411e0663ad34df394512f797c1badf9.png
业务逻辑分析
1.获取请求头的token,并判定是否为空
2.根据请求头的token,获取redis存的value–>用户信息,以map类型接收
3.判定用户是否存在
4.不存在,返回错误信息
5.将查询到value–>map数据转换为userDTO对象
6.将userDTO对象存入ThreadLocal中
7.革新token有效期
8.放行
拦截器代码很通用,我就直接上代码,内里也给了很详细的注释
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
      //1.获取请求头的token
      String token = request.getHeader("authorization");//前端页面定义的请求头
      if (StrUtil.isBlank(token)){
            //不存在拦截,返回401状态码
            response.setStatus(401);
            return false;
      }
      //2.基于token获取redis中的用户
      Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(RedisConstants.LOGIN_USER_KEY + token);
      //3.判读用户是否存在
      if (userMap.isEmpty()){
            //4.不存在拦截到登录页面,返回401状态码,未授权
            response.setStatus(401);
            return false;
      }
      //5.将查询到的Hash数据转为UserDTO对象
      //将map中的数据自动填充到对象中,false是忽视错误,有异常直接往外抛就行了
      UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
      //6.存在,保存用户信息到TreadLocal中
      UserHolder.saveUser(userDTO);
      //7.刷新token的有效期
      stringRedisTemplate.expire(RedisConstants.LOGIN_USER_KEY + token,30, TimeUnit.MINUTES);
      //8.放行
      return true;
    }
拦截器开发完毕后还必要注入,不然不会起作用
@Configuration
public class MvcConfig implements WebMvcConfigurer {
    //刚刚拦截器是编写好了,但是还没有注册

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
      registry.addInterceptor(new LoginInterceptor(stringRedisTemplate))
                .excludePathPatterns(//排除哪些不被拦截,放行
                        "/user/code",
                        "/voucher/**",
                        "/blog/hot",
                        "/shop/**",
                        "/shop-type/**",
                        "/upload/**",
                        "/user/login"
                );
    }
}
拦截器优化
由于之前那个拦截器不是拦截所有请求,好比访问首页,拦截器直接放行,如许使得token登录令牌的有效时间没有被革新,以下可以解决这个问题。
https://img-blog.csdnimg.cn/78ed142c48c3448a8fb240194f22fce8.png
拦截器的执行序次是,先注入的先执行,最严谨的还是.order(),设置的值越小,执行序次越前
https://img-blog.csdnimg.cn/39961f6198794905bb3a37152e45c70c.png
   登录验证总结:从前用的是session,用户登录后将数据放在session中,拦截器那里判定登录验证,可以根据请求传来的cookie获取session-id,从而获取数据
使用redis后,就不能根据请求获取key了,这里用到前端的token来作为key,登录时存入数据,拦截器做登录验证时,根据key取value,判定value是否为空,来判定是否拦截
而存入ThreadLocal中,是由于后续业务必要获取到用户信息
2.4 退出登录(增补)

https://img-blog.csdnimg.cn/7db6187bfcf84c9cab6513e510ce4796.png
用户点击退出登录,我们必要清除ThreadLocal里存放的用户信息,如许前端发送的请求获取不到用户信息,登录拦截器从ThreadLocal中获取用户也为null,被拦截跳到登录界面。
https://img-blog.csdnimg.cn/e43855f106a44e5b9a9e88381a6351a7.png
https://img-blog.csdnimg.cn/fb9f5782bb6a4d1baeaef8fd432517c5.png
https://img-blog.csdnimg.cn/7c0878802a364d268a6d266663da64e2.png
最后退出登录代码如下
    /**
   * 登出功能
   * @return 无
   */
    @PostMapping("/logout")
    public Result logout(){
      // TODO 实现登出功能
      UserHolder.removeUser();
      return Result.ok();
    }
三.商家查询缓存

涉及到企业的缓存使用本领,缓存雪崩,穿透等问题解决
3.1 添加商家缓存

业务逻辑
   1.先根据id查询redis中的value
2.判定value是否为空
3.若不为空,返回value值
注意,这里从redis中获取的value是json字符串,必要转换成对象才能返回。
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
    4.若redis未掷中,从数据库查询数据
5.判定数据是否为空
6.为空返回错误信息
7.非空存入redis
注意存入redis时,必要将数据库查询出的对象转换为json字符串
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(shop));
最后设置redis缓存key的有效时间
代码如下
@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public Result queryById(Long id) {
      //1.从redis根据id查数据
      String shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);
      //2.判断是否存在
      if (StrUtil.isNotBlank(shopJson)) {
            Shop shop = JSONUtil.toBean(shopJson, Shop.class);
            //3.若存在直接返回数据
            return Result.ok(shop);
      }

      //4.从数据库中根据id查数据
      Shop shop = getById(id);
      //5.若不存在,返回错误信息
      if (shop == null){
            return Result.fail("店铺不存在");
      }
      //6.将数据写入redis
      stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id,JSONUtil.toJsonStr(shop));
      stringRedisTemplate.expire(CACHE_SHOP_KEY + id,30, TimeUnit.MINUTES);
      //7.返回数据
      return Result.ok(shop);
    }
}
3.2 首页列表缓存(p37作业)

首页的这块列表信息是不变动的,因此我们可以将它存入缓存中,避免每次访问时都去查询数据库
https://img-blog.csdnimg.cn/d88488e761f74925a19320dd8936cb18.png
前端发送的请求
请求URL:http://localhost:8080/api/shop-type/list(GET)
准备工作
https://img-blog.csdnimg.cn/e1c9cdf7dc3043aabce37a802f77f3a2.png
https://img-blog.csdnimg.cn/4e553786a8054fb89b024efa1160b86e.png
以下三种计谋都测试成功了
3.2.1 Stirng缓存计谋实现

@Override
public Result queryShopTypeString() {
    // 1.从 Redis 中查询商铺缓存
    String shopTypeJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_TYPE_LIST_KEY);

    // 2.判断 Redis 中是否存在数据
    if (StrUtil.isNotBlank(shopTypeJson)) {
      // 2.1.存在,则返回
      List<ShopType> shopTypes = JSONUtil.toList(shopTypeJson, ShopType.class);
      return Result.ok(shopTypes);
    }
    // 2.2.Redis 中不存在,则从数据库中查询
    List<ShopType> shopTypes = query().orderByAsc("sort").list();

    // 3.判断数据库中是否存在
    if (shopTypes == null) {
      // 3.1.数据库中也不存在,则返回 false
      return Result.fail("分类不存在!");
    }
    // 3.2.数据库中存在,则将查询到的信息存入 Redis
    stringRedisTemplate.opsForValue().set(CACHE_SHOP_TYPE_LIST_KEY, JSONUtil.toJsonStr(shopTypes));
    // 3.3返回
    return Result.ok(shopTypes);
}
https://img-blog.csdnimg.cn/371103cc1ed74827ba95af925a8825c0.png
3.2.2 List缓存计谋实现

@Slf4j
@Service
public class ShopTypeServiceImpl extends ServiceImpl<ShopTypeMapper, ShopType> implements IShopTypeService {
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Override
    public Result queryShopType() {

      // 1.从 Redis 中查询商铺缓存
      List<String> shopTypeJsonList = stringRedisTemplate.opsForList().range(CACHE_SHOP_TYPE_LIST_KEY, 0, -1);

      // 2.判断 Redis 中是否有该缓存
      if (shopTypeJsonList != null && !shopTypeJsonList.isEmpty()) {
            // 2.1.若 Redis 中存在该缓存,则直接返回
            ArrayList<ShopType> typeList = new ArrayList<>();
            for (String str : shopTypeJsonList) {
                typeList.add(JSONUtil.toBean(str, ShopType.class));
            }
            //也可以用下面的stream流的方式,其实大差不差,都是遍历,每个数据都转换类型后再操作
//            List typeList = shopTypeJsonList.stream().map((shopTypeJson)->{
//                ShopType shopType = JSONUtil.toBean(shopTypeJson, ShopType.class);
//                return shopType;
//            }).collect(Collectors.toList());
            return Result.ok(typeList);
      }
      // 2.2.Redis 中若不存在该数据,则从数据库中查询
      List<ShopType> typeList = query().orderByAsc("sort").list();

      // 3.判断数据库中是否存在
      if (typeList == null || typeList.isEmpty()) {
            // 3.1.数据库中也不存在,则返回 false
            return Result.fail("分类不存在!");
      }

      // 3.2数据库中存在,则将查询到的信息存入 Redis
      for (ShopType shopType : typeList) {
            stringRedisTemplate.opsForList().rightPushAll(CACHE_SHOP_TYPE_LIST_KEY, JSONUtil.toJsonStr(shopType));
      }
      //下面是stream流的方式
//      List shopTypeJson = typeList.stream().map((shopType)-> {
//            String jsonStr = JSONUtil.toJsonStr(shopType);
//            return jsonStr;
//      }).collect(Collectors.toList());
//      stringRedisTemplate.opsForList().rightPushAll(CACHE_SHOP_TYPE_LIST_KEY,shopTypeJson);

      // 3.3返回
      return Result.ok(typeList);
    }
}

https://img-blog.csdnimg.cn/9b5b7d46bda44fba8c05357bc1662f79.png
3.2.3 Zset缓存计谋实现

@Override
public Result queryShopTypeZSet() {
    // 1.从 Redis 中查询商铺缓存
    Set<String> shopTypeJsonSet = stringRedisTemplate.opsForZSet().range(CACHE_SHOP_TYPE_LIST_KEY, 0, -1);

    // 2.判断 Redis 中是否有该缓存
    if (shopTypeJsonSet.size() != 0) {
      // 2.1.若 Redis 中存在该缓存,则直接返回
      List<ShopType> shopTypes = new ArrayList<>();
      for (String str : shopTypeJsonSet) {
            shopTypes.add(JSONUtil.toBean(str, ShopType.class));
      }
      return Result.ok(shopTypes);
    }

    // 2.2.若 Redis 中无该数据的缓存,则查询数据库
    List<ShopType> shopTypes = query().orderByAsc("sort").list();

    // 3.判断数据库中是否存在
    if (shopTypes == null || shopTypes.isEmpty()) {
      // 3.1.数据库中也不存在,则返回 false
      return Result.fail("分类不存在!");
    }

    // 3.2.数据库中存在,则将查询到的信息存入 Redis
    for (ShopType shopType : shopTypes) {
      stringRedisTemplate.opsForZSet().add(CACHE_SHOP_TYPE_LIST_KEY,JSONUtil.toJsonStr(shopType),shopType.getSort());
    }

    // 3.3返回
    return Result.ok(shopTypes);
}

https://img-blog.csdnimg.cn/1936fadc758443368a8440beba7f2f7f.png
3.2.4 列表图片不表现的问题解决

首先看看你的路径改了没
https://img-blog.csdnimg.cn/61ac7fcbd1294194a52082a457e27ccf.png
图片路径往nginx目次的html下找
https://img-blog.csdnimg.cn/ab43145b58b34126b9676a79f4138680.png
如果你的图片路径改正确了,可以看看下面的另一种情况
访问不出列表数据,查redis缓存,内里却又有数据
https://img-blog.csdnimg.cn/2a213df1f4dd4f0fb282cce3fac81445.png
https://img-blog.csdnimg.cn/ed0c6d5ac8174cbf88b75a4527ec3d75.png
而我们无论是断点debug还是redis中查,都发现redis已经缓存了查询的数据,而且数据和从数据库查出来的同等
https://img-blog.csdnimg.cn/49981787d43042308de07d56e83e7df9.png
最后经过不停摸索,终于发现问题地点
   首先是controller 返回的时间不要再次返回result.ok(xxx)不然会出现嵌套 的问题返回两个data,serviceImpl返回即可。
https://img-blog.csdnimg.cn/67335fa939694251abfdba8e0c6970fb.png
如果你的service实现类中最后返回的是Result的对象,这里只必要返回调用方法即可,如下
https://img-blog.csdnimg.cn/380daf29f18f48c8aec553b6bbc04334.png
更改后,访问出结果
https://img-blog.csdnimg.cn/256c1ec807154f629908f3ca3de91047.png
3.3 商家缓存自动更新计谋

超时剔除
在根据id查询商家信息时,把信息存入缓存,同时设置有效时间
https://img-blog.csdnimg.cn/6ab89234e2284c5cbc93342d7c1e749c.png
https://img-blog.csdnimg.cn/d8eaa9cbf6724993b0897c16e3cc2264.png
自动更新计谋
同时操作数据库和缓存,必要到场事务,同成功同失败
先更新数据库,再删除缓存,下次查询就把更新的数据存入缓存。由于数据库更新的时间是较长的,而删除缓存,写入缓存是很快速的,在多线程并发情况下,若使用先删除缓存,再更新数据库的计谋,这时注意,在该线程更新数据库时,另一线程举行查询操作,又把旧数据写入到缓存了,后续再查询,直接掷中缓存,不查询数据库,造成缓存和数据库不同等。
https://img-blog.csdnimg.cn/430c7af86f2f4f3092720bea140bc818.png
3.4 缓存穿透

不停发送如许的请求,redis不停未掷中,不停查询数据库,给数据库造成很大压力
https://img-blog.csdnimg.cn/a02f825784334868ab723d38abd7b24a.png
两种解决方法
缓存空对象
对于不存在的数据也在redis中创建缓存,并设置一个较短的TTL时间,实现比力简单。
根据id查询店铺信息上应用
当数据库查询出的数据为空时,返回错误信息还要把空对象存入缓存中,并设置较短的有效时间。用户再次发送该请求时,直接掷中redis缓存的空对象,返回错误信息,不再向下查询数据库,低落数据库压力。
https://img-blog.csdnimg.cn/c705131a65f74cb68f94135ddce6d968.png
https://img-blog.csdnimg.cn/a84bb3c53b0f4141801cd836a257ec86.png
https://img-blog.csdnimg.cn/9f32e731a7c6477889e587fc1fceb06f.png
布隆过滤
利用布隆过滤算法,在请求进入Redis之前判定是否存在,不存在直接拒绝请求,但布隆过滤器实现比力复杂。
3.5 缓存雪崩

https://img-blog.csdnimg.cn/2b48523964534bb6879e5449fa1c8047.png
3.6 缓存击穿

https://img-blog.csdnimg.cn/a72dc276a2e94414b5894c57e77c0bcf.png
https://img-blog.csdnimg.cn/ec78aeb0d6e9444290eec3fb8abefb6d.png
逻辑逾期设置的一样平常不是TTL,设置缓存根本上是不停有效到活动竣事后,才移除缓存中数据
之所以会逻辑逾期,不是由于有效时间,而是由于数据更新了,缓存也必要更新数据,这时逻辑逾期。
https://img-blog.csdnimg.cn/8498971ea5d74ac09ac88452fb25a7a5.png
互斥锁方式解决缓存击穿问题
https://img-blog.csdnimg.cn/442b100d8dd342edb49bf48faa8a1527.png
自界说互斥锁
获取锁
    private boolean tryLock(String key){
      Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
      return BooleanUtil.isTrue(flag);
    }
开释锁
    private void unlock(String key){
      stringRedisTemplate.delete(key);
    }
https://img-blog.csdnimg.cn/1bf215853d7e4c5b9a942abfaaec254f.png
3.7 封装Redis工具类

   注意做逻辑逾期时,要先预热,缓存热点key
调用工具类将热点key都set到缓存
https://img-blog.csdnimg.cn/4bd50629212749f4a43aab784265355d.png
   使用工具类,实现缓存穿透解决代码,就两步,第一步导入工具类,第二部调用工具类
https://img-blog.csdnimg.cn/4fd45256752b422a99ab5956423a8c64.png
最后测试都通过了,结果图就不展示了
RedisData类封装的内容
https://img-blog.csdnimg.cn/a47d950b2679436b92558902a4aa7dae.png
工具类代码,老师封装的很好,有许多知识点,封装类值得反复咀嚼消化
@Slf4j@Componentpublic class CacheClient {    private final StringRedisTemplate stringRedisTemplate;    public CacheClient(StringRedisTemplate stringRedisTemplate){      this.stringRedisTemplate = stringRedisTemplate;    }    public voidset(String key, Object value, Long time, TimeUnit unit){      stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value),time,unit);    }    public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit){      //设置逻辑逾期      RedisData redisData = new RedisData();      redisData.setData(value);      redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));      //写入Redis      stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));    }    public <R,ID> R queryWithPassThrough(            String keyPrefix, ID id, Class<R> type, Function<ID,R>dbFallback,Long time, TimeUnit unit){      String key = keyPrefix + id;      //1.从redis查询商店缓存      String json = stringRedisTemplate.opsForValue().get(key);      //2.判定是否存在      if (StrUtil.isNotBlank(json)){            //3.存在,直接返回            return JSONUtil.toBean(json, type);      }      //判定掷中的是否是空值      if (json != null){            //返回一个错误信息            return null;      }      //4.不存在,根据id查询数据库      R r = dbFallback.apply(id);      //5.不存在,返回错误      if (r == null){            //将空值写入redis            stringRedisTemplate.opsForValue().set(key,"",2,TimeUnit.MINUTES);            //返回错误信息            return null;      }      //6.存在写入redis      this.set(key,r,time,unit);      return r;    }    private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);    public <R,ID> R queryWithLogicalExpire(            String keyPrefix, ID id, Class<R> type, Function<ID,R> dbFallback, Long time, TimeUnit unit) {      String key = keyPrefix + id;      //1.从redis查商店缓存      String json = stringRedisTemplate.opsForValue().get(key);      //2.判定是否存在      if (StrUtil.isBlank(json)) {            //3.存在,缓存中存的null            return null;      }      //4.掷中,先把json反序列化为对象      RedisData redisData = JSONUtil.toBean(json, RedisData.class);      R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);      LocalDateTime expireTime = redisData.getExpireTime();      //5.判定是否逾期      if (expireTime.isAfter(LocalDateTime.now())) {            //5.1未逾期,直接返回店铺信息            return r;      }      //5.2已逾期,必要缓存重修      //6.缓存重修      //6.1获取互斥锁      String lockKey = LOCK_SHOP_KEY + id;      boolean isLock = tryLock(lockKey);      //6.2判定是否获取锁成功      if (isLock) {            //6.3 成功,再举行二次判定,检察缓存中是否有数据,由于有大概是别人刚刚重修完开释锁,刚好获取到了            //6.4 开启独立线程,实现缓存重修            CACHE_REBUILD_EXECUTOR.submit(() -> {                try {                  //查询数据库                  R r1 = dbFallback.apply(id);                  //写入redis                  this.setWithLogicalExpire(key, r1, time, unit);                } catch (Exception e) {                  throw new RuntimeException(e);                } finally {                  //开释锁                  unlock(lockKey);                }            });      }      //6.6返回逾期的商店信息      return r;    }    private boolean tryLock(String key){
      Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
      return BooleanUtil.isTrue(flag);
    }
    private void unlock(String key){
      stringRedisTemplate.delete(key);
    }
} 四.达人探店

涉及基于List的点赞列表
基于SortedSet的点赞排行榜
4.1 发布博客

https://img-blog.csdnimg.cn/381576c0e5904b72a884ecca846224d2.png
根据前端页面提交的数据,我们用blog对象来接收,调用mp提供的接口向博客表里save一条数据,但是用户id前端没有传递过来,必要自己去ThreadLocal中去获取
https://img-blog.csdnimg.cn/e0fb0e9008de4ef8870acce46717e81b.png
4.2 检察博客

https://img-blog.csdnimg.cn/1a79d4ff77c245319c9eb58605732df1.png
这里可以使用Dto对象,把两表查询结果放入dto,然后返回dto对象,但是过于繁琐,这里直接在Blog实体类,添加用户头像和用户名称两个属性,并加上mp提供的注解@TableField(exist = false) //当前属性不属于表中字段
https://img-blog.csdnimg.cn/cf3720a7cf8d403697e55bfa63d64abf.png
逻辑分两步,一是根据传过来的id调用mp提供方法查到blog对象,二是根据userId查询user对象,将user中的icon头像,nickname昵称,存入set到blog对象中,再将blog对象返回
https://img-blog.csdnimg.cn/a89a9ad4377447b7a76b3c26b9b7c315.png
4.3 点赞

https://img-blog.csdnimg.cn/ec7c9f32c2184c89bbbbe719b564ccb4.png
https://img-blog.csdnimg.cn/b198066e4e46433690718c55b0383a61.png
这里使用到redis的set集合,key为blog的id,value为user的id,用set的ismembet方法判定,当前集合是否有userId,来判读该博客,用户是否已经点赞过了。每个key代表每条博客,每个key下的value集合代表所有点赞的用户id集合。
   点赞功能实现及逻辑代码
    /**
   * 点赞功能实现及判读逻辑
   * @param id
   * @return
   */
    @Override
    public Result likeBlog(Long id) {
      //1.获取登录用户
      Long userId = UserHolder.getUser().getId();
      //2.判读登录用户是否点赞
      String key = BLOG_LIKED_KEY + id;
      Boolean isMembet = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
      //3.若未点赞,可以点赞
      if (BooleanUtil.isFalse(isMembet)){
            //3.1 数据库点赞数+1
            boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
            //3.2 保存用户到redis的set集合中
            if (isSuccess){
                stringRedisTemplate.opsForSet().add(key, userId.toString());
            }
      }else {
            //4.若已点赞,取消点赞
            //4.1 数据库点赞数-1
            boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
            //4.2 清除redis的set集合里的userId
            if (isSuccess){
                stringRedisTemplate.opsForSet().remove(key, userId.toString());
            }
      }
      return Result.ok();
    }

   分页查询下的点赞业务
    @Override
    public Result queryHotBlog(Integer current) {
      // 根据用户查询
      Page<Blog> page = this.query()
                .orderByDesc("liked")
                .page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
      // 获取当前页数据
      List<Blog> records = page.getRecords();
      // 查询用户
      records.forEach(blog ->{
            this.queryBlogUser(blog);
            this.isBlogLinked(blog);
      });
      return Result.ok(records);
    }
   根据id查询博客详情页下的点赞业务
    @Override
    public Result queryBlogById(Long id) {
      //获取博客对象
      Blog blog = this.getById(id);
      if (blog == null){
            return Result.fail("博客不存在");
      }
      //存入用户名和头像
      queryBlogUser(blog);
      //存入是否点赞信息
      isBlogLinked(blog);
      return Result.ok(blog);
    }
   调用的工具方法
    /**
   * 网blog对象存入用户名和头像
   * @param blog
   */
    private void queryBlogUser(Blog blog) {
      Long userId = blog.getUserId();
      User user = userService.getById(userId);
      blog.setIcon(user.getIcon());
      blog.setName(user.getNickName());
    }
    /**
   * 往blog对象填入isLike信息(是否点赞)
   * @param blog
   */
    private void isBlogLinked(Blog blog) {
      //1.获取登录用户
      Long userId = UserHolder.getUser().getId();
      //2.判读登录用户是否点赞
      String key = BLOG_LIKED_KEY + blog.getId();
      Boolean isMembet = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
      //3.将是否点赞信息set到blog中
      blog.setIsLike(BooleanUtil.isTrue(isMembet));
    }
4.4 点赞排行榜

我们点击博客,进入详情页,其实是发送了两个请求,一个是根据id返回博客详细信息,另一个是根据id返回点赞排行榜,两个请求如下。
https://img-blog.csdnimg.cn/fb6087f4e284482e945c32717eeefeb5.png
https://img-blog.csdnimg.cn/b6adbbdd60b04054aaca7dc2735f216a.png
这里使用的计谋是用redis的sortSet来代替set,由于set集合是无序的,但排行榜必要表现前5个点赞的用户,要想有序且不重复,只能用sortSet了,把之前的点赞功能,用户id存入set集合改为存入ZSet,使用sorce(key,value)方法来获取该键值的sorce,若没有则返回null,用来代替之前set的ismembet方法
https://img-blog.csdnimg.cn/47371a6a3c9d43deb1fe3f81f955319e.png
判读是否点赞的工具方法也做了改动
    /**
   * 往blog对象填入isLike信息(是否点赞)
   * @param blog
   */
    private void isBlogLinked(Blog blog) {
      //1.获取登录用户
      UserDTO user = UserHolder.getUser();
      if (user == null){
            return;//用户未登录,无需查询是否点赞
      }
      Long userId = user.getId();
      //2.判读登录用户是否点赞
      String key = BLOG_LIKED_KEY + blog.getId();
      Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
      //3.将是否点赞信息set到blog中
      blog.setIsLike(score != null?true:false);
    }
注意:sql语句的 list.in(…, …)查询出来的是后点赞的在前,先点赞的在后,必要我们自界说sql查询,用last最后一条sql语句,手写order by的sql,具体代码如下
    /**
   * 根据博客id查询点赞排行榜
   * @param id
   * @return
   */
    @Override
    public Result queryBlogLikes(Long id) {
      String key = BLOG_LIKED_KEY + id;
      //1.查询top5的点赞用户 zrange key 0 4
      Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
      if (top5 == null || top5.isEmpty()){
            return Result.ok(Collections.emptyList());
      }
      //2.解析除其中的用户id
      List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
      String idStr = StrUtil.join(",", ids);
      //3.根据用户id查询用户将user处理为userDTO对象    where id (5 , 1)   order by field(id, 5, 1)
      List<UserDTO> userDTOS = userService.query()
                .in("id", ids).last("order by field(id," + idStr + ")").list()
                .stream()
                .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
                .collect(Collectors.toList());
      //4.返回
      return Result.ok(userDTOS);
    }
https://img-blog.csdnimg.cn/a1b568b675264b81b513f3c80817dfb8.png
五.好友关注

基于Set集合的关注,取关,共同关注,消息推送等功能
5.1 关注和取关

https://img-blog.csdnimg.cn/6d65efc1200c4285a2af9a41a1830f2d.png
业务逻辑:
①关注用户功能的实现,根据前端传递的isFollow的值判定,用户是否已关注该博主,若未关注,传过来的是true,则有关注资格,new出follow对象,将userId和userFollowId都set到follow对象里,再将follow对象save到数据库表中
②检察博客详情页的另一个请求,判定用户是否关注了博主,根据ThreadLocal中获取的userId和传递过来的userFollowId查询count是否大于0,结果返回true或false
代码比力简单,如下
    @Override //关注取关功能
    public Result follow(Long userFollowId, boolean isFollow) {
      Long userId = UserHolder.getUser().getId();
      if (isFollow){
            Follow follow = new Follow();
            follow.setUserId(userId);
            follow.setFollowUserId(userFollowId);
            save(follow);
      }else {//取关,删除delete from tb_follow where user_id = ? and follow_user_id = ?
            remove(new QueryWrapper<Follow>().eq("user_id", userId).eq("follow_user_id", userFollowId));
      }
      return Result.ok();
    }


    @Override//用户是否关注了博主
    public Result isFolow(Long userFollowId) {
      Long userId = UserHolder.getUser().getId();
      //查询是否关注select count(*) from tb_follow where user_id = ? and follow_user_id = ?
      Integer count = query().eq("user_id", userId).eq("follow_user_id", userFollowId).count();
      return Result.ok(count>0);
    }
结果图
https://img-blog.csdnimg.cn/6abe3a4b94b84eceb75659b5f8ed8d56.png
5.2 检察他人主页

https://img-blog.csdnimg.cn/e819b941b2874e70a62b16fc1d4f3ebb.png
一个根据userId查询用户,一个根据userId对博客分页查询

// UserController 根据id查询用户

@GetMapping("/{id}")
public Result queryUserById(@PathVariable("id") Long userId){
        // 查询详情
        User user = userService.getById(userId);
        if (user == null) {
                return Result.ok();
        }
        UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
        // 返回
        return Result.ok(userDTO);
}

// BlogController
@GetMapping("/of/user")
public Result queryBlogByUserId(
                @RequestParam(value = "current", defaultValue = "1") Integer current,
                @RequestParam("id") Long id) {
        // 根据用户查询
        Page<Blog> page = blogService.query()
                        .eq("user_id", id).page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
        // 获取当前页数据
        List<Blog> records = page.getRecords();
        return Result.ok(records);
}
5.3 共同关注

共同关注可以使用redis中的set数据布局,来求两个用户关注集合的交集,那么我们就必要更改关注功能的接口了,不仅要把数据存入数据库follow表中,还有把userId存入redis的set集合里
场景
https://img-blog.csdnimg.cn/4d4ba105d8b646a4a5bdba00f63002d2.png
关注功能接口所做的更改
https://img-blog.csdnimg.cn/42c91434ec6449eebd2c0f27d02812eb.png
到这里,我们把关注的用户存入了set集合里,key为用户id,value为被关注用户id。下面必要根据上面图片的请求,编写求交集的方法,交集里都是用户id,先将String转为Long型,再根据useId批量查询user转为userDTO,返回集合。代码如下
    @Override
    public Result followCommons(Long id) {
      Long userId = UserHolder.getUser().getId();
      String key1 = FOLLOW_USER_ID + userId;//当前登录用户的关注列表集合
      String key2 = FOLLOW_USER_ID + id;//点击查看的用户的关注列表集合
      //求交集
      Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key1, key2);
      if (intersect == null || intersect.isEmpty()){
            //无交集
            return Result.ok(Collections.emptyList());
      }
      //解析id集合
      List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
      //批量查询用户并转换为userDTO对象
      List<UserDTO> userDTOList = userService.listByIds(ids).stream().map(user ->
                        BeanUtil.copyProperties(user, UserDTO.class))
                .collect(Collectors.toList());
      return Result.ok(userDTOList);
    }
最后结果图
https://img-blog.csdnimg.cn/56e74539b41245b6931e6f947b5cd420.png
5.4 关注推送

关注推送也叫做 Feed 流,直译为投喂。为用户连续的提供 “陶醉式” 的体验,通过无穷下拉革新获取新的信息。
5.4.1 Feed 流分析

https://img-blog.csdnimg.cn/5029b837998c4bd1ae8f886a7af21555.png
Feed 流产品有两种常见模式:
Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。
例如朋友圈
长处:信息全面,不会有缺失。并且实现也相对简单
缺点:信息噪音较多,用户不肯定感兴趣,内容获取效率低
智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣信息来吸引用户
例如抖音,快手
长处:投喂用户感兴趣信息,用户粘度很高,容易沉迷
缺点:如果算法不精准,大概起到反作用
本例中的个人页面,是基于关注的好友来做 Feed 流,因此采用 Timeline 的模式。
https://img-blog.csdnimg.cn/68106373c6864a23a4a53755f6bcf134.png
该模式的实现方案有三种:拉模式、推模式、推拉结合
拉模式:也叫做读扩散
每次读的时间获取消息,内存斲丧小,但读操作过于频繁,若用户关注了许多博主,一次要读的消息也是十分多,造成延迟较高
https://img-blog.csdnimg.cn/a492bd69a0c344edac54f38856c53f13.png
推模式:也叫做写扩散。
发消息时写入粉丝收件箱,内存占用更高,写操作频繁,若博主有许多粉丝,写操作更加繁重
https://img-blog.csdnimg.cn/9ccb773eca2f4a34b04d095439071ac4.png
推拉结合模式:也叫做读写混合,兼具推和拉两种模式的长处。
普通博主,粉丝少,可以采用推模式,写操作并不是很繁重
大v博主,粉丝多;分两种粉丝,生动粉,普通粉;生动粉,数量少,可以采用推模式;普通粉,数量多,但上线检察少,采用拉模式,什么时间看什么时间拉取。
https://img-blog.csdnimg.cn/84049ddd45a94dddbde89fddeda0d2ba.png
https://img-blog.csdnimg.cn/d6cc0c5ded2e4e5093aa6ea69e94bed0.png
5.4.2 推送到粉丝收件箱

案例:基于推模式实现关注推送功能
需求:
①修改新增探店笔记的业务,在保存 blog 到数据库的同时,推送到粉丝的收件箱
②收件箱满足可以根据时间戳排序,必须用 Redis 的数据布局实现
③查询收件箱数据时,可以实现分页查询


[*]Feed 流中的数据会不停更新,所以数据的角标也在变革,因此不能采用传统的分页模式。
https://img-blog.csdnimg.cn/f3f8948f9ba34c9d960e4de50eaf92b8.png


[*]滚动分页
https://img-blog.csdnimg.cn/9d8c420c31c8466d86ac410f48d5cbc8.png
这里使用sortSet来实现收件箱 ,先将之前新增保存博客的功能接口修改一下,使得博客发布就能推送到粉丝
代码修改如下
    public Result saveBlog(Blog blog) {
      // 获取登录用户
      UserDTO user = UserHolder.getUser();
      blog.setUserId(user.getId());
      // 保存探店博文
      boolean isSuccess = save(blog);
      if (!isSuccess){
            return Result.fail("发布失败,请检查重试");
      }
      // 查询博文作者的所有粉丝
      List<Follow> follows = followService.query().eq("follow_user_id", user.getId()).list();
      for (Follow follow : follows) {
            // 获取粉丝id
            Long userId = follow.getUserId();
            // 推送笔记id给所有粉丝
            String key = "feed:" + userId;
            stringRedisTemplate.opsForZSet().add(key, blog.getId().toString(), System.currentTimeMillis());
      }
      // 返回id
      return Result.ok(blog.getId());
    }
5.4.3 滚动分页

下面就是展示推送的blog消息,使用的滚动分页,稍许复杂
https://img-blog.csdnimg.cn/cea5bb9f8940414ca0e31b5ccf2b7671.png
https://img-blog.csdnimg.cn/7593d4eeeb6d4874ac29b2a7fee869ca.png
redis中分析滚动查询
https://img-blog.csdnimg.cn/b6ce88618c2e4e999a969128531d1bb4.png
   详细代码,分析与注解都在代码上
    /**
   * 滚动查询,展示博主推送的笔记, 新发布的滚动查询查不到,但是往上滚,前端做了处理,就是刷新重新查询,开始位置在当前最新位置
   * @param max
   * @param offset
   * @return
   */
    @Override
    public Result queryBlogOfFollow(Long max, Integer offset) {
      //获取当前用户
      Long userId = UserHolder.getUser().getId();
      //查询收件箱 ZREVRANGEBYSCORE key Max Min LIMIT offset count    limit是小于等于的意思,小于等于查询的最后时间戳
      String key = "feed:" + userId;
      Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet()
                .reverseRangeByScoreWithScores(key, 0, max, offset, 2);
      //非空判断
      if (typedTuples == null || typedTuples.isEmpty()){
            return Result.ok();
      }
      //解析数据: blogId,minTime(时间戳), offset
      ArrayList<Object> ids = new ArrayList<>(typedTuples.size());
      long minTime = 0;//这个minTime是上次查询的最小时间戳,作为当次查询的最大时间戳来开始查
      int os = 1;
      for (ZSetOperations.TypedTuple<String> typedTuple : typedTuples) {
            //获取博客id转换为Long型并存入ids数组中
            ids.add(Long.valueOf(typedTuple.getValue()));
            //获取分数      判读得到最后一次的时间戳,以及偏移量
            long time = typedTuple.getScore().longValue();
            if (time == minTime){
                os++;
            }else {
                minTime = time;
                os = 1;
            }
      }

      //根据id查询blog,先把前面保存id的ids数组转为字符串
      String idStr = StrUtil.join(",", ids); //由于用mp提供的listByIds是用in方法查,不能保证顺序
      List<Blog> blogs = query().in("id", ids).last("order by field(id," + idStr + ")").list();
      for (Blog blog : blogs) {
            //查询blog有关用户信息
            queryBlogUser(blog);
            //查询blog是否已被点赞
            isBlogLinked(blog);
      }

      //封装并返回
      ScrollResult r = new ScrollResult();
      r.setList(blogs);
      r.setOffset(os);
      r.setMinTime(minTime);
      return Result.ok(r);
    }
六.优惠券秒杀

涉及Redis的计数器,Lua脚本Redis
分布式锁
Redis的三种消息队列
6.1 全局ID生成器

全局id生成器,是一种在分布式体系下用俩生玉成局唯一ID的工具,满足以下特性
https://img-blog.csdnimg.cn/103b091ec91f422bb4a9fc0b2c9bd74c.png
https://img-blog.csdnimg.cn/98910536004e41a9bd21d729fe1af748.png
ID生成器工具类代码,效率根本可以达到每秒1w个
@Component
public class RedisIdWorker {
    //开始时间戳
    private static final long BEGIN_TIMESTAMP = 1674086400L;

    //序列号位数
    private static final int COUNT_BITS = 32;

    private StringRedisTemplate stringRedisTemplate;

    public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
      this.stringRedisTemplate = stringRedisTemplate;
    }

    public long nextId(String keyPrefix){
      //1.生成时间戳
      LocalDateTime time = LocalDateTime.now();
      long nowSecond = time.toEpochSecond(ZoneOffset.UTC);
      long timestamp = nowSecond - BEGIN_TIMESTAMP;
      //2.生成序列号,redis自增长,redis单个key自增长有上限,2的64次方
      //2.1获取当前日期,精确到天
      String date = time.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
      long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
      //3.拼接并返回,不能使用字符串方式拼接
      return timestamp << COUNT_BITS | count;//先向左移32位,那么低32位全为0,跟序列号进行或操作

    }

    /**
   * 生成开始时间戳
   * @param args
   */
    public static void main(String[] args) {
      LocalDateTime time = LocalDateTime.of(2023, 1, 19, 0, 0, 0);
      long second = time.toEpochSecond(ZoneOffset.UTC);
      System.out.println(second);
    }
}
6.2 秒杀下单

下单业务本质上就是修改优惠券表中的number字段,再在order表中新增一条订单数据;点击抢购优惠券,发送请求如下
https://img-blog.csdnimg.cn/eeb9c5564c2445cfa86dfaa0072580f2.png
https://img-blog.csdnimg.cn/47c66abdd0aa4b1e9375ca664e157c78.png
以上是业务逻辑,其中涉及到的表操作
1.seckill_voucher表的根据id查询操作
2.seckill_voucher表的根据id修改剩余数量的操作
3.voucher_order表的新增订单数据操作
涉及到多表操作必要添加事务,同成功,同失败
https://img-blog.csdnimg.cn/5117b75d3b6e418cb07a45326b37b5d7.png
voucher_order表的新增订单数据
https://img-blog.csdnimg.cn/def87a1f35d1483c95d1d090980e82ea.png
6.3 超卖问题

https://img-blog.csdnimg.cn/5ef26a8ec28c4ea6baa69b934b2e8a2f.png
通过加锁的方式行止理多线程问题
https://img-blog.csdnimg.cn/0be3134eb35b45ccb2ce691b689a4892.png
悲观锁实现比力简单,操作前获取锁,操作竣事才开释锁,让多个线程串行执行,但是你让并发线程串行,效率十分低下
乐观锁设计
   第一种加版本号的方式
逻辑如下
https://img-blog.csdnimg.cn/e5b74b9d1e2e49bf8bb8e8e2a5f687d2.png
   第二种方式CAS法
用版本号方式,发现,用每次查的版本跟操作前查的版本作对比是否同等,那还不如直接查库存,用库存对比,更加简化,逻辑如下
https://img-blog.csdnimg.cn/9b7fa5b44f734bb5845f4e0642ff1442.png
主要是在扣减操作上加上条件,查询的值不同等,就不执行操作
在idea中,代码修改,在更新数据库操作,也就是减库存的时间,加上判定条件,eq比力
      //5.扣减库存
      boolean success = seckillVoucherService.update()
                .setSql("stock = stock - 1")//set stock = stock -1
                .eq("voucher_id",voucherId).eq("stock",voucher.getStock()) //where id = ? and stock =?
                .update();
如果是匹配库存,成功率很低,如下压力测试,100张只售出23张,但是我是开的有200个线程
https://img-blog.csdnimg.cn/4759821f544d4ce296489689869f98ea.png
成功率低的原因
https://img-blog.csdnimg.cn/9c4ae03a094e472d9e623cbd1797e7eb.png
所以,我们更改库存操作时,加的条件不必是必须前面查到的库存数和更改时查到的库存数同等,我们只必要让更改时的条件为查到的库存数大于0即可,代码修改如下
      //5.扣减库存
      boolean success = seckillVoucherService.update()
                .setSql("stock = stock - 1")//set stock = stock -1
                .eq("voucher_id",voucherId).gt("stock",0) //where id = ? and stock > 0
                .update();
6.4 一人一单

业务逻辑修改,防止黄牛批量刷券
https://img-blog.csdnimg.cn/2629ea4a291b45aab0cc50dc80c836e4.png
加锁方式,在并发请求时,包管一个用户只能下一单
https://img-blog.csdnimg.cn/419bb5e773e64249ab6bd7199046b6c7.png
以下是最后结果图。这个一人一单的单体项目这一节,知识点太多了,像spring框架事务失效、aop代理对象、synchronized锁对象等等,这一节值得多看几遍,我自认为我是没有本领将这些知识点论述的非常清晰的,所以直接多看这节视频,知识点我就不记载了,就算记载也是模糊不清不能像老师那么浅近易懂。
https://img-blog.csdnimg.cn/265bbd0399b44295bba47767d4d83b3a.png
https://img-blog.csdnimg.cn/11a4180c57ee40f38ef903aaba011347.png
通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。
我们将服务启动两份,端口分别为 8081 和 8082:ctrl+d复制启动类,添加端口设置
https://img-blog.csdnimg.cn/0aca80b6f1e3443ea5dd2c94d1e4a56b.png
https://img-blog.csdnimg.cn/464730447f644f95881e2bda35667902.png
然后修改 nginx 的 conf 目次下的 nginx.conf 文件,设置反向代理;更改完毕后必要重新启动nginx服务
nginx.exe -s reload
https://img-blog.csdnimg.cn/aa015252546340b49454edc69448cba2.png
如今,用户请求会在这两个节点上负载平衡,再次测试下发现存在线程安全问题。
但是在集群模式下,加锁只是该台jvm给当前这台服务器处理的请求加锁,而集群是多台服务器轮询处理请求,会造成每台服务器都有一个加锁的线程,每台服务器都会有一个新订单创建处理
https://img-blog.csdnimg.cn/12e75c8ab47f4f71a00dbeec746fc406.png
6.5 分布式锁

分布式锁:满足分布式体系或集群模式下多进程可见并且互斥的锁。
https://img-blog.csdnimg.cn/f1030e64d3524e408aace3195c529a5a.png
分布式锁的特点:多进程可见、互斥、高可用、高性能(高并发)、安全性
分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有许多,常见的分布式锁有以下三种
https://img-blog.csdnimg.cn/1805b72252e74ab7af85655e88b0f7a0.png
6.5.1 初级版本

用redis实现分布式锁
https://img-blog.csdnimg.cn/6ad8cb14dcbf479e8f7d22203423a3c9.png
   Redis分布式锁原理:基于setnx下令–>key存在的情况下,不更新value,而是返回nil
那么利用key是唯一的特性来加锁,好比一人一单业务,key名称精确到userId,那么同一个用户无论发多少次请求,能成功创建键值的只有一个,由于setnx下令,背面的请求在获取锁创建键值就会失败
redis获取锁下令测试,设置逾期时间为10秒的锁,nx–>唯一
https://img-blog.csdnimg.cn/f926e68ba835453ba39828c1f155d15a.png
以下是结果图,两台机器发送请求,只有一台能够获取锁成功,这就是分布式锁的作用,它的作用域不再是单体项目,单机模式,而是在整个集群模式下,它的锁都生效。
https://img-blog.csdnimg.cn/1066533dd4dd4b22927162a432d96e0f.png
https://img-blog.csdnimg.cn/d797c119c28d446c8fb7278864199b5c.png
Redis中存的也是唯一的键值
https://img-blog.csdnimg.cn/fde13c1227f2473abea102d39d8689f0.png
数据库的表中数据变革
https://img-blog.csdnimg.cn/ed482bcb2a3941818424a0e2d1de5693.png
6.5.2 误删问题

场景形貌
https://img-blog.csdnimg.cn/3cdf34b691ba4ccba8678e4fe153a724.png
解决逻辑如下图
https://img-blog.csdnimg.cn/552752d76b094dc9ade79a51d2b0b7cc.png
   其实就是在每次开释锁的时间举行判定,判定当前锁与自己是否同等,不同等大概是别人的锁,不开释
业务流程图
https://img-blog.csdnimg.cn/dd758b503770429cbc9d23cbbb070f4b.png
6.5.3 原子性问题

场景形貌
https://img-blog.csdnimg.cn/8c9d3db48b79420799d88e2a6f77f900.png
   判定锁的操作和开释锁的操作得成一个原子性操作,一起执行,要阻塞都阻塞,要通过都通过
Redis 提供了 Lua 脚本功能,在一个脚本中编写多条 Redis 下令,确保多条下令执行时的原子性。
Lua是一种编程语言,它的根本语法大家可以参考网站:https://www.runoob.com/lua/lua-tutorial.html
我们将开释锁的操作写到Lua脚本中去,直接调用脚本
开释锁的业务逻辑是如许的:
①获取锁中的线程标示
②判定是否与指定的标示(当前线程标示)同等
③如果同等则开释锁(删除)
④如果不同等则什么都不做
Lua脚本代码
-- 这里的 KEYS 就是锁的 key,这里的 ARGV 就是当前线程标识
-- 获取锁中的线程标识 get key
local id = redis.call('get', KEYS);
-- 比较线程标识与锁中的标识是否一致
if (id == ARGV) then
    -- 释放锁 del key
    return redis.call('del', KEYS)
end
return 0
6.5.4 Redisson 分布式锁

基于 setnx 实现的分布式锁存在下面的问题
1.不可重入:同一个线程无法多次获取同一把锁
2.不可重试:获取锁只尝试一次就返回 false,没有重试机制
3.超时开释:锁超时开释虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁开释,存在安全隐患
4.主从同等性:如果 Redis 提供了主从集群,主从延同步在延迟,当主机宕机时,如果从机同步主机中的数据,则会出现锁失效
Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格
它不仅提供了一系列的分布式的 Java 常用对象,还提供了许多分布式服务,其中就包罗了各种分布式锁的实现。
说白了就是一个封装各种锁且十分完善的工具,前面又白雪了,分布式锁轮子人家已经做的很完善了,咱学的就是个头脑
   官网地址:https://redisson.org
GitHub 地址:https://github.com/redisson/redisson
要使用Redisson,先导入它的坐标
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.6</version>
</dependency>
Redisson类
@Configuration
public class RedisConfig {
    @Bean
    public RedissonClient redissionClient() {
      // 配置类
      Config config = new Config();

      // 添加 Redis 地址,此处添加了单点的地址,也可以使用 config.useClusterServers() 添加集群地址
      config.useSingleServer().setAddress("redis://192.168.2.12:6379").setPassword("123321");

      // 创建客户端
      return Redisson.create(config);
    }
}
使用 Redisson 的分布式锁
@Resource
private RedissonClient redissonClient;

@Test
void testRedisson() throws InterruptedException {
    // 获取锁(可重入),指定锁的名称
    RLock lock = redissonClient.getLock("anyLock");
    // 尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试过),锁自动释放时间,时间单位
    boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
    // 判断锁是否获取成功
    if (isLock) {
      try {
            System.out.println("执行业务");
      } finally {
            //释放锁
            lock.unlock();
      }
    }
}
   Redisson 分布式锁原理
https://img-blog.csdnimg.cn/33f1fe67018d4c9e996cb68bbc5b68d9.png
   连锁计谋:不再有主从节点,都获取成功才能获取锁成功,有一个节点获取锁不成功就获取锁失败
https://img-blog.csdnimg.cn/d5f50e66e68b448bb1f1e7e89373e431.png
如果多个主节点包管锁的话,一个主节点宕机了,其它线程只能获得一个新主节点的锁,获取不到其它两个锁,还会获取失败
这里主要是防止主节点宕机后,其它线程获得新主节点的锁,引起线程安全问题
   总结
①不可重入Redis 分布式锁
原理:利用 setnx 的互斥性;利用 ex 避免死锁;开释锁时判定线程标示
缺陷:不可重入、无法重试、锁超时失效
②可重入的 Redis 分布式锁
原理:利用 hash 布局,记载线程标示和重入次数;利用 watchDog 延续锁时间;利用信号量控制锁重试等待
缺陷:Redis 宕机引起锁失效问题
③Redisson 的 multiLock连锁
原理:多个独立的 Redis 节点,必须在所有节点都获取重入锁,才算获取锁成功
缺陷:运维本钱高、实现复杂
6.5.5 秒杀优化

流程图
https://img-blog.csdnimg.cn/b9c54dc26288497ab23ff3832435c029.png
为避免所有操作都在数据库上执行,分离成两个线程,一个线程判定用户的购买资格,发现用户有购买资格后再开启一个独立的线程来处理耗时较久的减库存、写订单的操作。
可以将耗时较短的两步操作放到 Redis 中,在 Redis 中处理对应的秒杀资格的判定。Redis 的性能是比 MySQL 要好的。别的,还必要引入异队伍列记载相关的信息。
redis部分处理逻辑, Lua脚本封装操作包管原子性, redis这里选择的存储类型为set,由于key不能重复,而set恰恰是无序不重复的
https://img-blog.csdnimg.cn/7c28b3c9c2cb47848da9ecdf630d41cd.png
案例:改进秒杀业务,提高并发性能
需求:
1.新增秒杀优惠券的同时,将优惠券信息保存到 Redis 中
2.基于 Lua 脚本,判定秒杀库存、一人一单,决定用户是否抢购成功
3.如果抢购成功,将优惠券 id 和用户 id 封装后存入阻塞队列
4.开启线程任务,不停从阻塞队列中获取信息,实现异步下单功能
分步实现如下
1.在新增优惠券的业务实现类中,把秒杀优惠券的库存信息保存到redis里
VoucherServiceImpl类中
https://img-blog.csdnimg.cn/218d54fb25e64eb6b22d365760207abb.png
2.编写lua脚本,按照下面的业务流程逻辑,在脚本中完成业务实现
https://img-blog.csdnimg.cn/e1397310d7374a17833ed4852a343328.png
以下是代码及详细注解
-- 1.参数列表
-- 1.1 优惠券id
local voucherId = ARGV
-- 1.2 用户id
local userId = ARGV

-- 2.数据key
-- 2.1 库存key   key 是优惠的业务名称加优惠券idvalue 是优惠券的库存数
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 订单key   key 也是拼接的业务名称加优惠权id而value是用户id, 这是一个set集合,凡购买该优惠券的用户都会将其id存入集合中
local orderKey = 'seckill:order:' .. voucherId

-- 3.脚本业务
-- 3.1 判断库存是否充足 get stockKey
if (tonumber(redis.call('get', stockKey)) <= 0)then--将get的value先转为数字类型才能判断比较
    -- 3.2 库存不足,返回1
    return 1
end
-- 3.3 判断用户是否下单 sismember orderKey userId命令,判断当前key集合中,是否存在该value;返回1存在,0不存在
if (redis.call('sismember', orderKey, userId) == 1) then
    --3.4 存在说明是重复下单,返回2
    return 2
end
-- 3.5 扣库存
redis.call('incrby', stockKey, -1)
-- 3.6 下单(保存用户)
redis.call('sadd', orderKey, userId)
return 0
3.java代码中执行lua脚本,并判定,抢购成功的生成订单并存入阻塞队列
首先注入脚本
    private IVoucherOrderService proxy;//定义代理对象,提前定义后面会用到
    //注入脚本
    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
    static {
      SECKILL_SCRIPT = new DefaultRedisScript<>();
      SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
      SECKILL_SCRIPT.setResultType(Long.class);
    }
其次运行脚本,且判定不满足的请求直接返回提示信息
    @Override
    public Result seckillVoucher(Long voucherId) { //使用lua脚本
      //获取用户
      Long userId = UserHolder.getUser().getId();
      //1.执行lua脚本
      Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(), //这里是key数组,没有key,就传的一个空集合
                voucherId.toString(), userId.toString()
      );
      //2.判断结果是0
      int r = result.intValue();//Long型转为int型,便于下面比较
      if (r != 0){
            //2.1 不为0,代表没有购买资格
            returnResult.fail(r == 1?"优惠券已售罄":"不能重复购买");

      }
最后是将满足条件的给存放进阻塞队列中
创建一个BlockingQueue阻塞队列
   BlockingQueue这个阻塞队列特点:当一个线程尝试从队列获取元素的时间,如果没有元素该线程阻塞,直到队列中有元素才会被唤醒并获取元素
    //创建阻塞队列这个阻塞队列特点:当一个线程尝试从队列获取元素的时候,如果没有元素该线程阻塞,直到队列中有元素才会被唤醒获取
    private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);//初始化阻塞队列的大小
接下来就是将满足条件的请求,给生成订单,并把订单对象add到阻塞队列中,接上面的代码,完成整个第三步
      //2.2 为0,有购买资格,把下单信息保存到阻塞队列中
      //7.创建订单   向订单表新增一条数据,除默认字段,其他字段的值需要set
      VoucherOrder voucherOrder = new VoucherOrder();
      //7.1订单id
      long orderId = redisIdWorker.nextId("order");
      voucherOrder.setId(orderId);
      //7.2用户id
      voucherOrder.setUserId(userId);
      //7.3代金券id
      voucherOrder.setVoucherId(voucherId);
      //放入阻塞对列中
      orderTasks.add(voucherOrder);
      //获取代理对象
      proxy = (IVoucherOrderService) AopContext.currentProxy();
      //3.返回订单id
      return Result.ok(orderId);
    }
4.开启线程任务,实现异步下单功能
首先创建一个线程池
再界说一个线程任务,但是注意,线程任务必要在用户秒杀订单之前开始,用户一但开始秒杀,队列就会有新的订单,线程任务就应该立即取出订单信息,这里利用spring提供的注解,在类初始化完毕后立即执行线程任务,详细代码如下
    //创建线程池
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
    //利用spring提供的注解,在类初始化完毕后立即执行线程任务
    @PostConstruct
    private void init(){
      SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }
线程任务代码如下
    //创建线程任务,内部类方式
    private class VoucherOrderHandler implements Runnable{

      @Override
      public void run() {
            //1.获取队列中的订单信息
            try {
                VoucherOrder voucherOrder = orderTasks.take();
                //2.创建订单,这是调之前那个创建订单的方法,需要稍作改动
                handleVoucherOrder(voucherOrder);
            } catch (Exception e) {
                log.info("异常信息:",e);
            }
      }
    }
创建调用的handleVoucherOrder方法,这里的获取锁操作只是做最后的兜底,以防万一,由于前面lua脚本都已经判定过了
    private void handleVoucherOrder(VoucherOrder voucherOrder) {
      Long userId = voucherOrder.getUserId();
      //创建锁对象
      SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
      //获取锁
      boolean isLock = lock.tryLock(1200);
      //判断是否获取锁成功
      if (!isLock){
            log.error("您已购买过该商品,不能重复购买");
      }
      try {
            proxy.createVoucherOrder(voucherOrder);//使用代理对象,最后用于提交事务
      } catch (IllegalStateException e) {
            throw new RuntimeException(e);
      } finally {
            lock.unlock();//释放锁
      }
    }
createVoucherOrder创建订单方法,这里一人一单的其实也不必判读了,lua脚本都写好了,这里只是兜底
@Transactional
    public void createVoucherOrder(VoucherOrder voucherOrder){
      Long voucherId = voucherOrder.getVoucherId();
      //5.一人一单
      Long userId = voucherOrder.getId();
      //5.1查询订单
      int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
      //5.2判断是否存在
      if (count > 0){
            log.error("您已经购买过了");
      }
      //6.扣减库存
      boolean success = seckillVoucherService.update()
                .setSql("stock = stock - 1")//set stock = stock -1
                .eq("voucher_id",voucherId).gt("stock",0) //where id = ? and stock > 0
                .update();
      if (!success){
            log.error("库存不足!");
      }
      this.save(voucherOrder);

    }
最后来分析以下整个优化思路
①编写lua脚本,对于超卖问题和一人一单举行解决处理,超卖用CAS方法判定库存是否大于0,一人一单用redis的set集合的sismenber判读该优惠券(key)下的用户id(value)是否唯一
②Java代码中注入脚本,并执行脚本判定脚本返回结果,若不为脚本结果0,直接返回错误提示
③若脚本结果为0,代表有购买优惠券资格,将new VoucherOrder创建订单对象,并set orderId,userId,voucherId。再把订单对象放入阻塞队列中,返回订单id给用户
④创建线程池,并界说线程任务,但注意,线程任务必须在方法执行前执行,使用到spring提供的注解在类初始化完成后执行线程任务
⑤线程任务中获取阻塞队列的订单对象,然后调用handleVoucherOrder方法传入voucherOrder
⑥handleVoucherOrder方法其实是再次获取锁,这个就是个纯兜底,作用不大。并在获取锁成功后调用createVoucherOrder方法扣减库存创建订单,由于都是对数据库的操作,因此要提交事务
至此,整个秒杀业务优化完毕
   总结
秒杀业务的优化思路是什么?
先利用 Redis 完成库存余量、一人一单判定,完成抢单业务
再将下单业务放入阻塞队列,利用独立线程异步下单
基于阻塞队列的异步秒杀存在哪些问题?
内存限制问题 —> 我们使用的是JDK里的阻塞队列,是基于JVM的内存,高并发海量请求下造成内存溢出还有服务宕机情况下内存数据丢失
数据安全问题
个人认为还存在的问题
队列满了怎么办 ?
子线程下单失败怎么办?
订单太多了超过阻塞队列大小了怎么办?
拒绝计谋怎么设计?
待消耗的消息是否应该持久化,不然宕机了消息不就丢失了?
还有如何确保消息确实被消耗成功了,不然消耗失败了无法重试
6.5.6 消息队列

由于前面的阻塞队列是基于JVM的内存实现,那么不可避免的两个大问题,①高并发海量访问,创建订单,队列很快就超出上限造成内存溢出;②JVM内存没有持久化机制,若服务出现重启或宕机,阻塞队列中的所有任务都会丢失。所以我们使用MQ
https://img-blog.csdnimg.cn/04d6993ea3d14929ae618890624b8d2d.png
MQ是JVM以外的服务,不受JVM内存限制,且MQ中的所有消息会做持久化,如许即使重启或宕机,数据不会丢失。消息投递给消耗者后必要消耗者确认,未确认消息会不停存在下一次继承投递,确保消息至少被消耗一次
6.5.6.1 基于 List 布局模拟消息队列

Redis 的 list 数据布局是一个双向链表
队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP 来实现。
不外要注意的是,当队列中没有消息时 RPOP 或 LPOP 操作会返回 null,并不像 JVM 的阻塞队列那样会阻塞并等待消息。
因此这里应该使用 BRPOP 或者 BLPOP 来实现阻塞结果。
https://img-blog.csdnimg.cn/314909a95a764f63b5957c026824bddf.png
基于 List 的消息队列有哪些优缺点
长处
利用 Redis 存储,不受限于 JVM 内存上限
基于 Redis 的持久化机制,数据安全性有包管
可以满足消息有序性
缺点
无法避免消息丢失
只支持单消耗者
6.5.6.2 基于 PubSub 的消息队列

PubSub(发布订阅) 是 Redis 2.0 版本引入的消息传递模型。
顾名思义,消耗者可以订阅一个或多个channel,生产者向对应 channel 发送消息后,所有订阅者都能收到相关消息。
SUBSCRIBE channel :订阅一个或多个频道
PUBLISH channel msg :向一个频道发送消息
PSUBSCRIBE pattern :订阅与 pattern 格式匹配的所有频道
pattern – 通配符方式
?:匹配一个字符
*:匹配多个字符
ae:匹配括号内存在的字符
https://img-blog.csdnimg.cn/13b98faa3b6b4a8dbe49c8f527541e2a.png
基于 PubSub 的消息队列有哪些优缺点
长处:采用发布订阅模型,支持多生产、多消耗
缺点:
不支持数据持久化
无法避免消息丢失
消息堆积有上限,超出时数据丢失
6.5.6.3 基于 Stream 的消息队列

   单消耗模式
发送消息的下令
https://img-blog.csdnimg.cn/5fae09c3c8b14147adba00ea0b4b6eff.png
   XADD key threshold ] *|ID field value
key:队列名称
:如果队列不存在时,确定是否自动创建队列,默认自动创建
threshold ]:设置消息队列的最大消息数量
|ID:消息的唯一 ID, 代表由 Redis 自动生成,格式是 ”时间戳-递增数字“,例如:”1666161469358-0“
field value :发送到队列中的消息,称为 Entry。格式为多个 Key-Value 键值对。
例如:创建名为 users 的队列,并向其中发送一个消息,内容是:{name=jack,age=21},并且使用 Redis 自动生成 ID
   127.0.0.1:6379> XADD users * name jack age 21 “1644805700523-0”
读取消息下令
https://img-blog.csdnimg.cn/1c7f8ae2bdcf41809131cab83c68beef.png
读取消息的方式之一:XREAD
:每次读取消息的最大数量;
:当没有消息时,确定是否阻塞,阻塞则添加具体的 milliseconds (阻塞时长)
STREAMS key :从哪个队列读取消息,Key 就是队列名;
ID :起始 ID,只返回大于该 ID 的消息;0 代表从第一个消息开始,$ 代表从最新的消息开始。
例如,使用 XREAD 读取第一个消息
127.0.0.1:6379> XREAD COUNT 1 STREAMS users 0
1) 1) "queue"
   2) 1) 1) "1666169070359-0"
         2) 1) "name"
            2) "jack"
            3) "age"
            4) 20
XREAD 阻塞方式,读取最新的消息
XREAD COUNT 1 BLOCK STREAMS queue $
注意:
当我们指定起始 ID 为 $ 时,代表读取最新的消息
如果我们处理一条消息的过程中,又有超过 1 条以上的消息到达队列,则下次获取时也只能获取到最新的一条
云云便会出现漏读消息的问题
STREAM 类型消息队列的 XREAD 下令特点:
1.消息可回溯(消息永世的保存在消息队列中)
2.一个消息可以被多个消耗者读取
3.可以阻塞读取
4.有消息漏读的风险(缺点)
   消耗者组模式
消耗者组(Consumer Group):将多个消耗者划分到一个组中,监听同一个队列。
其具备下列特点:
消息分流:队列中的 消息会分流给组内差别的消耗者,而不是重复消耗,从而加快消息处理的速度。
消息标示:消耗者组会维护一个标示,记载最后一个被处理的消息,即使消耗者宕机重启,还会从标示之后读取消息,确保每一个消息都会被消耗。(解决漏读问题)
消息确认:消耗者获取消息后,消息处于 pending 状态,并存入一个 pending-list。
当处理完成后必要通过 XACK 下令来确认消息,标记消息为已处理,才会从 pending-list 中移除。(解决消息丢失问题)
创建消耗者组
XGROUP CREATE key groupName ID
key:队列名称
groupName:消耗者组名称
ID:起始 ID 标示,$ 代表队列中最后一个消息,0 则代表队列中第一个消息
MKSTREAM:队列不存在时自动创建队列
# 删除指定的消费者组
XGROUP DESTORY key groupName

# 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername

# 删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername

从消耗者组读取消息
   XREADGROUP GROUP group consumer STREAMS key ID
group:消耗组名称
consumer:消耗者名称,如果消耗者不存在,会自动创建一个消耗者
count:本次查询的最大数量
BLOCK milliseconds:当没有消息时最长等待时间
NOACK:无需手动 ACK,获取到消息后自动确认
STREAMS key:指定队列名称
ID:获取消息的起始 ID:
“>”:从下一个未消耗的消息开始
其它:根据指定 id 从 pending-list 中获取已消耗但未确认的消息。
例如 0,是从 pending-list 中的第一个消息开始
   STREAM 类型消息队列的 XREADGROUP 下令特点
消息可回溯
可以多消耗者争抢消息,加快消耗速度
可以阻塞读取
没有消息漏读的风险
有消息确认机制,包管消息至少被消耗一次
   Redis 三种消息队列的对比
https://img-blog.csdnimg.cn/ca943be27aee4b1986e4c919e503a271.png
   Stream消息队列异步秒杀下单
需求:
①创建一个 Stream 类型的消息队列,名为 stream.orders
②修改之前的秒杀下单 Lua 脚本,在认定有抢购资格后,直接向 stream.orders 中添加消息,内容包罗 voucherId、userId、orderId
③项目启动时,开启一个线程任务,尝试获取 stream.orders 中的消息,完成下单
redis客户端下令行执行如下下令,创建消息队列
XGROUP CREATE stream.orders g1 0 MKSTREAM
Lua脚本改动
-- 1.参数列表
-- 1.1.优惠券 id
local voucherId = ARGV
-- 1.2.用户 id
local userId = ARGV
-- 1.3.订单 id
local orderId = ARGV

-- 2.数据 key
-- 2.1.库存 key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单 key
local orderKey = 'seckill:order:' .. voucherId

local stockKey_value = redis.call('get', stockKey)

-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if (tonumber(stockKey_value) <= 0) then
    -- 3.2.库存不足,返回 1
    return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if (redis.call('sismember', orderKey, userId) == 1) then
    -- 3.3.存在,则说明该用户是重复下单(这是不允许的),则返回 2
    return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户) sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 3.6.发送消息到队列中:XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)

return 0

七.附近商户

Redis的GeoHash应用
7.1 GEO 数据布局根本用法

GEO 就是 Geolocation 的简写情势,代表地理坐标。
Redis 在 3.2 版本中到场了对 GEO 的支持,允许存储地理坐标信息,帮助我们根据经纬度来检索数据。
常见的下令有:


[*] GEOADD:添加一个地理空间信息,包罗:经度(longitude)、纬度(latitude)、值(member)
[*] GEODIST:计算指定的两个点之间的距离并返回
[*] GEOHASH:将指定 member 的坐标转为 hash 字符串情势并返回
[*] GEOPOS:返回指定member的坐标
[*] GEORADIUS:指定圆心、半径,找到该圆内包罗的所有 member,并按照与圆心之间的距离排序后返回。6.2 以后已废弃
[*] GEOSEARCH:在指定范围内搜刮 member,并按照与指定点之间的距离排序后返回。范围可以是圆形或矩形。6.2 新功能
[*] GEOSEARCHSTORE:与 GEOSEARCH 功能同等,不外可以把结果存储到一个指定的 key。 6.2.新功能
   Redis中使用示范
https://img-blog.csdnimg.cn/23985f4ad39142e4a27345f58bdc0624.png
https://img-blog.csdnimg.cn/cbe0ebd3963c4642a55d160309b618ae.png
7.2 查找附近商店功能

https://img-blog.csdnimg.cn/8c69878753d54ab2b0bd588fabbe128d.png
7.2.1导入店铺数据到Redis的GEO

https://img-blog.csdnimg.cn/87e3a309ed1542418d5bbd8f6b26aca7.png
    @Test
    void loadShopData(){
      //查询店铺信息
      List<Shop> list = shopService.list();
      //把店铺分组,按照typeId分组,typeId一致的放到一个集合
      Map<Long, List<Shop>> map = list.stream().collect(Collectors.groupingBy(Shop::getTypeId));
      //分批完成写入Redis
      for (Map.Entry<Long, List<Shop>> entry : map.entrySet()) {
            //获取类型id
            Long typeId = entry.getKey();
            String key = "shop:geo" + typeId;
            //获取同类型的店铺的集合
            List<Shop> value = entry.getValue();
            List<RedisGeoCommands.GeoLocation<String>> locations = new ArrayList<>(value.size());
            //写入redisGEOADD key 经度 纬度 member
            for (Shop shop : value) {
                //stringRedisTemplate.opsForGeo().add(key, new Point(shop.getX(), shop.getY()), shop.getId().toString());
                locations.add(new RedisGeoCommands.GeoLocation<>(
                        shop.getId().toString(),
                        new Point(shop.getX(),shop.getY())
                ));
            }
            stringRedisTemplate.opsForGeo().add(key, locations);
      }

    }
https://img-blog.csdnimg.cn/bc43ba10290542278bf7510b580711a1.png
7.2.2 实现查找附近商店功能

SpringDataRedis 的 2.3.9 版本并不支持 Redis 6.2 提供的 GEOSEARCH 下令
因此我们必要提示其版本,修改自己的 pom.xml,内容如下
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <exclusions>
      <exclusion>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-redis</artifactId>
      </exclusion>
      <exclusion>
            <artifactId>lettuce-core</artifactId>
            <groupId>io.lettuce</groupId>
      </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-redis</artifactId>
    <version>2.6.2</version>
</dependency>
<dependency>
    <artifactId>lettuce-core</artifactId>
    <groupId>io.lettuce</groupId>
    <version>6.1.6.RELEASE</version>
</dependency>

这其中逻辑我也没消化完全,等后期我功力有成,再来详细表述。
最后结果图
https://img-blog.csdnimg.cn/5f4ab5844cc7461fb3552b33c5270e27.png
八.用户签到

Redis的BitMap数据统计功能
8.1 BitMap

假如我们用一张表来存储用户签到信息,其布局应该如下
https://img-blog.csdnimg.cn/22deb036394f4da79a2ff71cc1baa8b7.png
假如用户数量庞大,平均每人每年签到次数为 10 次,则这张表一年的数据量为 1 亿条
每签到一次必要使用(8 + 8 + 1 + 1 + 3 + 1)共 22 字节的内存,一个月则最多必要 600 多字节
我们按月来统计用户签到信息,签到记载为 1,未签到则记载为 0
https://img-blog.csdnimg.cn/addb1323835a4d7fba137dd003296202.png
把每一个 bit 位对应当月的每一天,形成了映射关系。用0和1标示业务状态,如许一个月也只斲丧31位(4字节)这种思路就称为位图(BitMap)
Redis 中 是利用 string 类型数据布局实现 BitMap,因此最大上限是 512M,转换为 bit 则是 2^32个 bit 位。
BitMap 的操作下令有:


[*] SETBIT:向指定位置(offset)存入一个 0 或 1
[*] GETBIT :获取指定位置(offset)的 bit 值
[*] BITCOUNT :统计 BitMap 中值为 1 的 bit 位的数量
[*] BITFIELD :操作(查询、修改、自增)BitMap中bit数组中的指定位置(offset)的值
[*] BITFIELD_RO :获取 BitMap 中 bit 数组,并以十进制情势返回
[*] BITOP :将多个 BitMap 的结果做位运算(与 、或、异或)
[*] BITPOS :查找 bit 数组中指定范围内第一个 0 或 1 出现的位置
https://img-blog.csdnimg.cn/4d93596563d74cab9a5b3e6ec7da7718.png
8.2 签到功能

https://img-blog.csdnimg.cn/8ad337d38e154a25811142c38efd8d0a.png
由于 BitMap 底层是基于 String 数据布局,因此其操作也都封装在字符串相关操作中了
实现类代码
    @Override
    public Result sign() {
      //获取登录用户
      Long userId = UserHolder.getUser().getId();
      //获取当前日期
      LocalDateTime now = LocalDateTime.now();
      //拼接key
      String format = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
      String key = "sign:"+ userId + format;
      //获取今天是本月的第几天
      int dayOfMonth = now.getDayOfMonth();
      //写入redis select key offset 1
      stringRedisTemplate.opsForValue().setBit(key, dayOfMonth -1, true);//true代表 1为签到,0为未签到
      return Result.ok();
    }
测试通过
https://img-blog.csdnimg.cn/220af41ea9e04e228e6fdcb179cfc8ae.png
https://img-blog.csdnimg.cn/cc19d5c57cb64354a23538a928787475.png
8.3 签到统计

https://img-blog.csdnimg.cn/3df03cb214374c448a9eecca8a96b0b3.png
https://img-blog.csdnimg.cn/0930bd5a078b4bc2bec5d3eaa651a77c.png
   /**
   * 统计签到次数
   * @return
   */
    @Override
    public Result signCount() {
      //获取登录用户
      Long userId = UserHolder.getUser().getId();
      //获取当前日期
      LocalDateTime now = LocalDateTime.now();
      //拼接key
      String format = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
      String key = "sign:"+ userId + format;
      //获取今天是本月的第几天
      int dayOfMonth = now.getDayOfMonth();
      //获取本月截至今天为止的所有签到记录,返回的是一个十进制的数字 BITFIELD sign:5:202203 GET u14 0
      List<Long> result = stringRedisTemplate.opsForValue().bitField(
                key,
                BitFieldSubCommands.create()
                        .get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0)
      );
      if (result == null || result.isEmpty()){
            //没有任何签到结果
            returnResult.ok(0);
      }
      Long num = result.get(0);
      if (num == null || num == 0){
            return Result.ok(0);
      }
      //循环遍历
      int count = 0;
      while (true){
            //让这个数字与1做与运算,得到数字的最后一个bit位   //判读这个bit位是否为0
            if ((num & 1) == 0){
                //如果为0,说明未签到,结束
                break;
            }else {
                //如果不为0,说明已签到,计数器加1
                count++;
            }
            //把数字右移一位,抛弃最后一个bit位,继续下一个bit位
            num >>>= 1;
      }
      return Result.ok(count);
    }

九.UV统计

Redis的HyperLogLog的统计功能


[*] UV:全称 Unique Visitor,也叫独立访客量,是指通过互联网访问、欣赏这个网页的天然人。1 天内同一个用户多次访问该网站,只记载1次。
[*] PV:全称Page View,也叫页面访问量或点击量,用户每访问网站的一个页面,记载 1 次PV,用户多次打开页面,则记载多次PV。
往往用来衡量网站的流量。
Hyperloglog(HLL)是从 Loglog 算法派生的概率算法,用于确定非常大的集合的基数,而不必要存储其所有值。
相关算法原理大家可以参考:https://juejin.cn/post/6844903785744056333#heading-0
Redis 中的 HLL 是基于 string 布局实现的,单个 HLL 的内存永久小于 16 kb,内存占用低,但相对的其丈量结果是概率性的,有小于 0.81% 的偏差。不外对于 UV 统计的庞大数量来说,这完全可以忽略。
   127.0.0.1:6379> PFADD hl1 e1 e2 e3 e4 e5
(integer) 1
127.0.0.1:6379> pfcount hl1
(integer) 5
127.0.0.1:6379> PFADD hl1 e1 e2 e3 e4 e5
(integer) 0
127.0.0.1:6379> pfcount hl1
(integer) 5
由上可以看出,Hyperloglog天生就适合做UV统计,雷同元素只能统计一次
通过单元测试,向 HyperLogLog 中添加 100 万条数据,看看内存占用和统计结果如何
@Test
void testHyperLogLog() {
    String[] values = new String;
    int j = 0;
    for (int i = 0; i < 1000000; i++) {
      j = i % 1000;
      values = "user_" + i;
      if (j == 999) {
            // 发送到 Redis
            stringRedisTemplate.opsForHyperLogLog().add("hl2", values);
      }
    }
    //统计数量
    Long count = stringRedisTemplate.opsForHyperLogLog().size("hl2");
    System.out.println("count = " + count);
}

总结:


[*] HyperLogLog 的作用:做海量数据的统计工作
[*] HyperLogLog 的长处:内存占用极低、性能非常好
[*] HyperLogLog 的缺点:有肯定的偏差
尚未完结

视频中虽然完结了,但是对于这个项目而言,还有一些功能未实现


[*]好比按人气排序和按评分排序,如果还是滚动分页的话,还必要存入redis,以后偶然间再继承完善
https://img-blog.csdnimg.cn/d11ad9a91944435688ec079cde8e231c.png


[*]用户个人信息修改功能,前端都没有实现点击事件,等我学完vue在回头继承完善尽美
https://img-blog.csdnimg.cn/402a8bd2df094d76b8d5822bc7c7c411.png


[*]地图功能和消息功能也没有实现,等我会前端了,就给他办了
https://img-blog.csdnimg.cn/6a1e54e1e0614db798284b8dd9c7310d.png


[*]暗码登录校验功能实现,以及忘记暗码跳验证码登录
https://img-blog.csdnimg.cn/9cc38c10574341a4afbab42e6c073c02.png
已经5w字了,再写有点卡,背面功能完善了我再增补吧。
https://img-blog.csdnimg.cn/91c00223c4ad4038b2f5bd1c6647e365.png
至此感谢您的阅览,个人本领有限,文中不足之处诸位多多包涵,也欢迎您的指正。等功能都实现了,我再来增补。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 黑马点评项目全部功能实现及详细笔记--Redis练手项目