欢乐狗 发表于 2024-9-2 08:30:23

PHP转Go系列 | ThinkPHP与Gin框架之Redis延时消息队列技能实践

大家好,我是码农先森。
我们在某宝或某多多上抢购商品时,如果只是下了订单但没有进行实际的支付,那在订单页面会有一个支付倒计时,要是过了这个时间点那么订单便会主动取消。在如许的业务场景中,一般情况下就会使用到延时队列。
通常在客户下单之后,就会将订单数据推送到延时队列中并且会对该消息设置一个延时时长,好比设置五分钟、十分钟、或十五分钟等,具体的时长应该照旧要结合当前的业务进行衡量,然后消费端会在指定时间到达后就对该消息进行支付支付状态判断,如果已经支付则不予处理,要照旧未支付,则会取消该订单,并且释放商品库存。
我们这次分享的内容,重要是基于 Redis 延时队列的实现方式,当然除了 Redis 还可以用其他的技能,好比 RabbitMQ、Kafka、RocketMQ 等专业的消息队列。但是我用 Redis 的原因是,它的应用场景比较广泛,我们平时接触也比较多,而且相对于专业的消息队列它没有过多复杂的配置,学起来容易上手,出了问题解决起来也快,学东西的路径都是由易到难嘛。
另外,如果你对上面提到的专业消息队列使用很纯熟,也可以将 Redis 更换成它们,这里只是存储介质的不同,技能的实现逻辑上没有太大区别,重要的是设计头脑,大家各取所需吧。
https://img2024.cnblogs.com/blog/737276/202408/737276-20240816225242975-610694914.png
好了,我先介绍一下这次延时队列的实现逻辑。重要分为三个部分,一是:消息的发送,如果设置了延时时间则会将消息存储到 Redis 的延时队列中,反之会直接将消息推送到 Redis 的停当队列中等待消费。二是:将到期的消息从 Redis 延时队列中取出,并且推送到 Redis 的停当队列中等待消费。三是:消费端会从 Redis 的停当队列中按序次读取出消息,并且执行对应的业务处理逻辑,如果处理失败则会将该消息,再次推送到 Redis 的延时队列中进行下一次的重试。
这里说到的延时队列是使用 Redis 有序集合来实现的,它每间隔一秒钟就会被轮询一次,如果有到期的消息,则就会将该消息推送到 Redis 停当队列,并且从该集合中移除过期的消息,至此就可以等待着消费端进行消费了。接下来我们就从实际的代码出发,来看一下如何实现基于 Redis 的延时队列。
话不多说,开整!我们先来看一下整体的项目目录结构,内容重要分为 PHP 和 Go 两部分。
$ tree -L 2
.
├── go_delay
│   ├── app
│   │   ├── controller
│   │   │   └── notify.go
│   │   ├── config
│   │   │   └── config.go
│   │   ├── extend
│   │   │   └── queue.go
│   │   └── route.go
│   ├── go.mod
│   ├── go.sum
│   └── main.go
└── php_delay
│   ├── app
│   │   ├── controller
│   │   │   └── Notify.php
│   ├── composer.json
│   ├── composer.lock
│   ├── command
│   │   └── Consumer.php
│   ├── route
│   │   └── app.php
│   ├── extend
│   │   └── Queue.php
│   ├── think
│   ├── vendor
│   └── .envThinkPHP

使用 composer 创建基于 ThinkPHP 框架的 php_delay 项目。
## 当前目录
$ pwd
/home/manongsen/workspace/php_to_go/php_delay

## 安装 ThinkPHP 框架
$ composer create-project topthink/think php_delay
$ cp .example.env .env

## 安装 Composer 依赖包
$ composer require predis/predis
## 创建一个消费者脚本
$ php think make:command Consumer
## 创建一个生产者脚本,用于测试
$ php think make:command Producer这个就是延时队列实现的核心类,定义了停当、延时、失败三个消息队列。send() 方法用于发送消息,其中可以指定 $delay 参数设置延时时间单位是秒。wait() 方法用于消费端监听消息,从下面的代码可以看出这里还使用多进程,父进程的作用是每间隔一秒钟,就从 Redis 有序集合中读取到期的消息,并将该消息推送到 Redis 停当队列,子进程则阻塞监听停当队列的消息,并且将接收到的消息回调到用户自定义的业务函数中。
页: [1]
查看完整版本: PHP转Go系列 | ThinkPHP与Gin框架之Redis延时消息队列技能实践