Redis发布订阅
目录一 、redis发布订阅模式:
二、通道与模式两者订阅模式原理实现:
三 、redis发布订阅PubSub与Stream模式区分
1)Pub-Sub
2)Stream消息队列
四、Stream与其他中间件的区别
1.数据结构
2.持久化
3.消息确认
4.重试机制
5.业务利用场景
五、SubPub与stream实际应用中的利用场景
1)利用发布/订阅(pub_sub):
2)利用流(stream):
六、Pub_Sub代码实现
1)引入依靠
2)RedisConfig设置
3)发布订阅通过channel标识对应业务场景
4)生产者消息发送
5)消费者订阅生产者对应的发送消息通道
七、stream流代码实现
1) 依靠(redis5.0以上版本以及spring data redis版本为2.2.0版本以上)*
2)定义一个监听
3)定义抽象类AbstractMsgService
4)创建生产者
5)创建消费者
6)创建容器
八、利用消息队列会遇到什么样的题目以及有哪办理措施
1.对于生产者/来说(网络抖动,服务器宕机)
1.1 网络抖动导致发送失败
1.2 由于服务器宕机了导致发送失败:
2.对于消费者来说(网络抖动,重复消费)
2.1 网络抖动导致确认失败
2.2 重复消费
3.对于消息队列来说(重复消费、消息积存)
3.1 消息积存
3.2 消息顺序性
3.3 消息耽误
九、利用redis有哪些实用场景
一 、redis发布订阅模式:
[*]队列:基于 list 结构的消息队列,是一种 Publisher 与 Consumer 点对点的强关联关系
[*]通道channel:可实现消费者订阅多个通道
[*]模式Pattern:与通道的区别是 1、模式无需完整的通道名称订阅,用模糊订阅(通过?,*来代替)
二、通道与模式两者订阅模式原理实现:
通道:内部实现是通过(PubSub_Channel)键对值,key(数组元素)为channel,value就是某个client
https://i-blog.csdnimg.cn/direct/31064d6fd215450eacc87ade45c34dc6.png
Pattern:内部实现是通过(PubSub_patterns)链表,程序创建包含客户端信息和被订阅模式的 pubsubPattern 结构,并将该结构添加到 redisServer.pubsub_patterns 链表中
https://i-blog.csdnimg.cn/direct/166ffdaafdd94a75bea93ef906c553c8.png
三 、redis发布订阅PubSub与Stream模式区分
1)Pub-Sub
在于PubSub 的生产者传递过来一个消息,Redis会直接找到相应的消费者传递已往。如果一个消费者都没有,那么消息直接丢弃。如果开始有三个消费者,一个消费者忽然挂掉了,生产者会继续发送消息,另外两个消费者可以持续收到消息。但是挂掉的消费者重新连上的时候,这断连期间生产者发送的消息,对于这个消费者来说就是彻底丢失了
https://i-blog.csdnimg.cn/direct/fcbd668d93be405794fa41a98fcb2fd5.png
2)Stream消息队列
能提供持久性、顺序的队列功能
实现顺序队列实现的内部结构
https://i-blog.csdnimg.cn/direct/da7d5652282c4a898c30557a1bd1a27c.png消息ID:是按照毫秒数-序列号
消息内容:平常的键对值
消费者组下有last_deliveres_id记载着消费到那个id下的消息,
pendin_id表示还没有确认的消息的id,基于这个实现消息重试机制 s
四、Stream与其他中间件的区别
1.数据结构
Stream 是一个有序的消息日志,支持以时间顺序追加消息。来保证了数据的顺序性,每条消息都有一个唯一的 ID(包括时间戳和序列号),并且可以附加多个字段及其值。
RocketMQ 利用主题(Topic)和队列(Queue)模子,消息通过主题举行发布,消费者从队列中拉取消息。
Kafka 利用主题(Topic)和分区(Partion)模子,消息通过主题举行发布,消费者从队列中拉取消息。
2.持久化
Stream数据持久化由 Redis 的内存存储消息 并通过持久化机制(如 RDB 和 AOF)举行实现持久化
rocketmq持久化是通过commit log举行存储消息体文件 后通过文件体系举行实现持久化
kafka数据持久化是通过日志文件举行实现持久化机制
3.消息确认
Stream支持消息确认机制,消费者可以确认消息是否处理乐成
rocketmq支持多种消息确认机制,包括同步确认和异步确认。
kafka支持多种消息确认机制,包括同步确认和异步确认。
4.重试机制
Redis Streams自己不提供内建的重试机制,但可以通过客户端逻辑实现。你需要在消费者那里捕获处理失败的消息
rocketmq可以实现主动重试16次 还失败就会进入死信队列内里
Kafka 自己不提供内建的重试机制,但可以通过客户端逻辑实现。你需要在消费者那里捕获处理失败的消息
5.业务利用场景
Stream用于需要高性能、低耽误的,能够快读响应和高并发能力场景,如及时数据处理和日志体系、及时消息处理等等
rocketmq用户高级消息传递特性,如事件消息、定时消息、顺序消息,适用于需要高吞吐量、可靠性和分布式特性的场景,如大规模数据传输,分布式体系的事件驱动架构等。
kafka是一个分布式流处理平台,提供了高吞吐量的体系 实现了日志收集与分析以及是大数据集成
五、SubPub与stream实际应用中的利用场景
1)利用发布/订阅(pub_sub):
及时谈天应用程序:用于用户之间的及时消息传递。
及时新闻推送:用于向订阅者发送最新新闻。
事件处理体系:用于触发和处理事件。
2)利用流(stream):
任务队列:用于分发和处理任务,如后台任务处理。
数据同步:用于数据更新和同步,确保多个体系之间的划一性。
时间序列数据库:用于存储和查询时间序列数据,如监控和日志记载。
选择发布/订阅或流取决于你的具体需求和应用场景。
六、Pub_Sub代码实现
1)引入依靠
<strong><dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency></strong> 2)RedisConfig设置
<strong>@Configuration
/*@EnableCaching*/
public class RedisConfig {
@Autowired
private RedisConnectionFactory redisConnectionFactory;
@Autowired
private RedisMessageListener redisMessageListener;
/*配置Redis消息监听器容器:在Java Spring的配置类中配置Redis消息监听器容器,用于注册并管理消息监听器。*/
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer() {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
Arrays.stream(RedisMessageChannelEnum.values()).forEach(item ->container.addMessageListener(redisMessageListener, new ChannelTopic(item.getCode())));
return container;
}
}</strong> 3)发布订阅通过channel标识对应业务场景
<strong>public enum RedisMessageChannelEnum {
...
}</strong> 4)生产者消息发送
<strong>@Log4j2
@Component
public class MessagePublisher {
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
/**
* 服务端发布消息
*
* @param channel 通道名
* @param message 待发送的消息
*/
public void sendMessage(String channel, String message) {
redisTemplate.convertAndSend(channel, message);
log.info("消息发送成功...channel={}, message={}", channel, message);
redisTemplate.opsForList().leftPush("topic1",message);
}
}</strong> 5)消费者订阅生产者对应的发送消息通道
<strong>@Component
@Log4j2
public class RedisMessageListener implements MessageListener {
@Autowired
private RetrySendMessageServiceRouteAdaptor retrySendMessageServiceRouteAdaptor;
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private ObjectMapper objectMapper;
@Override
public void onMessage(Message message, byte[] bytes) {
// 获取消息
byte[] messageBody = message.getBody();
// 使用值序列化器转换
Object msg = redisTemplate.getValueSerializer().deserialize(messageBody);
// 获取监听的频道
byte[] channelByte = message.getChannel();
// 使用字符串序列化器转换
Object channel = redisTemplate.getStringSerializer().deserialize(channelByte);
log.info("---频道---: " + channel);
log.info("---消息内容---: " + msg);
String channelString = objectMapper.convertValue(channel, String.class);
RedisMessageChannelEnum redisMessageChannelEnum =RedisMessageChannelEnum.getEnumByCode(channelString);
// 通过对应的消息通道匹配到对应的是适配器服务执行业务
retrySendMessageServiceRouteAdaptor.<RetrySendMessageService>find(ServiceModeEnum.WX_SYSTEM, redisMessageChannelEnum).retrySendMassage(retrySendMassageDto);
}</strong> 七、stream流代码实现
1) 依靠(redis5.0以上版本以及spring data redis版本为2.2.0版本以上)*
<strong> <dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.2.0.RELEASE</version>
</dependency></strong> 2)定义一个监听
<strong>@Target(ElementType.METHOD)
@Retention(value = RetentionPolicy.RUNTIME)
public @interface MsgStreamListener
{
String stream();
String group();
String name();
}</strong> 3)定义抽象类AbstractMsgService
<strong>public abstract class AbstractMsgService {
}</strong> 4)创建生产者
<strong>@Service
@RequiredArgsConstructor
public class RedisMessageProducer
{
private final RedisTemplate<String,String> redisTemplate;
public void sendMsg(String streamKey,String msgKey,String msg){
Map<String,String> msgMap = new HashMap<>();
msgMap.put(msgKey,msg);
RecordId recordId = redisTemplate.opsForStream().add(streamKey,msgMap);
if(recordId == null){
throw new RuntimeException("发送消息失败");
}
}
}</strong> 5)创建消费者
<strong>@Service
@Slf4j
public class MessageHandlerService extends AbstractMsgService
{
@MsgStreamListener(group = "test1",name = "test1",stream = "test1")
public void onMessage(ObjectRecord<String, String> message)
{
var stream = message.getStream();
var msgId = message.getId().toString();
var msgBody = message.getValue();
log.info("receive test1 msg stream:{} msgId:{} msgBody:{}",stream,msgId,msgBody);
}
}
</strong> 6)创建容器
<strong>@Configuration
@Slf4j
public class RedisStreamConfig
{
private final RedisTemplate<String,String> redisTemplate;
private final ApplicationContext applicationContext;
public RedisStreamConfig(RedisTemplate<String, String> redisTemplate, ApplicationContext applicationContext) {
this.redisTemplate = redisTemplate;
this.applicationContext = applicationContext;
}
@Bean
public StreamMessageListenerContainer<String, ObjectRecord<String,String>> streamMessageListenerContainer(RedisConnectionFactory connectionFactory)
{
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String,String>> options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
.pollTimeout(java.time.Duration.ofSeconds(1))
.targetType(String.class)
.build();
//创建监听redis流的消息监听容器
StreamMessageListenerContainer<String, ObjectRecord<String,String>> listenerContainer =
StreamMessageListenerContainer.create(connectionFactory, options);
//找到所有继承AbstractMsgService的类
var serviceArray = applicationContext.getBeansOfType(AbstractMsgService.class).values().toArray();
for (var service : serviceArray) {
for (Method method : service.getClass().getMethods()) {
if(method.isAnnotationPresent(MsgStreamListener.class)){
MsgStreamListener annotation = method.getAnnotation(MsgStreamListener.class);
String stream = annotation.stream();
String group = annotation.group();
String name = annotation.name();
StreamListener<String,ObjectRecord<String,String>> listener = (StreamListener<String, ObjectRecord<String,String>>) message -> {
try {
method.invoke(service,message);
}catch (Exception e){
log.warn(e.getMessage());
}
};
//创建redis流的消息监听器
listenerContainer.receive(Consumer.from(group,name),
StreamOffset.create(stream, ReadOffset.lastConsumed()),
listener);
initializeStream(stream,name);
}
}
}
listenerContainer.start();
return listenerContainer;
}
public void initializeStream(String stream,String group) {
StreamOperations<String, Object, Object> streamOperations = redisTemplate.opsForStream();
// 创建一个流
try {
streamOperations.createGroup(stream, ReadOffset.from("0"), group);
} catch (Exception e) {
// 流可能已存在,忽略异常
}
}
}
</strong> 八、利用消息队列会遇到什么样的题目以及有哪办理措施
1.对于生产者/来说(网络抖动,服务器宕机)
1.1 网络抖动导致发送失败
redis中stream没有重试机制 但是你可以通过捕获异常记载日志再举行处理
kafka、rocketmq会主动重试机制
1.2 由于服务器宕机了导致发送失败:
可以通过集群的方式举行实现
2.对于消费者来说(网络抖动,重复消费)
2.1 网络抖动导致确认失败
可以通过手动ack举行确认
2.2 重复消费
可以通过幂等性来办理
3.对于消息队列来说(重复消费、消息积存)
3.1 消息积存
看下是否是因为消费者过少 可以适当增长消费者
是否能优化消费者处理逻辑
进步服务器性能(增长硬件资源、优化设置或升级版本)
是否可扩大消息队列长度
3.2 消息顺序性
rocketmq:利用同一个MQ队列,并且针对单个队列只开启一个消费者消费
kafka:发送同一个partiion分区,并且针对单个队列只开启一个消费者消费
3.3 消息耽误
3.3.1 由于网络耽误:确保保消息队列服务器与消息生产者和消费者之间的网络毗连稳定,淘汰网络耽误的影响
3.3.2 服务器负载: 增长消息队列服务器的硬件资源,例如增长CPU、内存和磁盘容量等,以进步消息处理的速度
3.3.3 消息处理时间过长:
[*]消息消费者的并行处理:将消息的处理逻辑举行拆分和并行化,可以通过增长消费者的数量或者利用多线程处理的方式来进步消息的处理速度
[*]异步处理:对于一些耗时操作,可以利用异步处理的方式,将消息放入消息队列之后立刻返回,然后由后台异步处理完成,从而淘汰消息发送和接收的时间
[*]检查业务逻辑:检查消息的处理逻辑是否存在瓶颈或者低效的操作,有可能通过重新设计业务逻辑来进步消息的处理速度。
九、利用redis有哪些实用场景
[*]查询缓存
[*]分布式锁
[*]短信验证码登录 或者举行缓存登任命户信息:
[*]实现登录控制功能,要求用户输入电话号,获取验证码,后台天生验证码并将验证码存储在Redis中,有效期5分钟。当用户在5分钟内输入准确的验证码,验证通过。
[*]获取附近地理信息: Redis提供了Geospatial数据范例,如有序聚集(Sorted Set)来存储地理位置数据。
[*]zset有序聚集:排行榜
[*]获取分类树:
[*]incr的命令的原子性的自增属性各类计数器、统计网站访问量、点赞数等
[*]限流:执行lua脚本,基于redis中incrby举行一个累加,通过传参是哪个接口,限流次数 失效时间,通过切面的前置通知去执行我们的脚本
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]