【Redis7】10大数据范例之Stream范例

打印 上一主题 下一主题

主题 504|帖子 504|积分 1512

1. Stream简介

Redis Stream 是在Redis 5.0版本中引入的一种新的数据布局,它主要用于实时数据处理场景,如消息队列、日志记录和实时数据分析等。Stream的设计灵感部分泉源于消息队列系统,如Kafka,但它提供了更直接集成到Redis生态系统中的本领。
   简单来说:Stream就是消息中央件+阻塞队列
  Redis Stream 范例自己设计时就充分考虑了生产者消耗者模子的需求。它不仅包含了这一模子,还对其进行了优化和扩展,以便更好地顺应当代分布式系统中的消息通报场景。


  • 生产者:任何可以向Redis Stream写入消息的客户端都可以视为生产者。生产者使用XADD下令向Stream中添加消息,每个消息都附有一个全局唯一的ID,确保消息的顺序性和可追踪性。
  • 消耗者:消耗者使用XREAD或XREADGROUP下令从Stream中读取消息。特殊是XREADGROUP下令,它支持消耗者组(Consumer Group)的概念,这是生产者消耗者模子中的一个紧张组成部分。消耗者组让多个消耗者可以协作处理Stream中的消息,同时保证了消息不会被重复处理。
  • 消耗者组:Stream支持消耗者组,组内的消耗者共享消息,但每个消息只由组内的一个消耗者处理,从而实现了消息的有序和公中分配。消耗者组还允许消耗者在处理失败时重新分配消息,以及通过XACK下令确认消息已成功处理,实现消息的确认机制。
  • 消息持久化与顺序性:Stream中的消息是持久化的,确保了纵然在Redis服务器重启后,消息也不会丢失。同时,Stream保持消息的严酷顺序,这对于某些依赖消息顺序的应用场景至关紧张。
  • 阻塞读取与自定义读取策略:消耗者可以选择阻塞读取模式,这意味着当没有新消息时,消耗者会等待直至新消息到达。此外,还可以通过参数定制读取的起始位置、消息数量或时间范围,提供了高度的机动性。
Stream范例主要特点:

  • 有序性:Stream中的消息按照ID排序,每个消息都有一个全局唯一的ID,确保了消息的顺序。
  • 持久化:Stream中的数据是持久化的,纵然Redis服务器重启,消息也不会丢失。
  • 多播与分组消耗:支持多个消耗者同时消耗同一流中的消息,而且可以将消耗者构造成消耗组,实现消息的分组消耗,每个消息可以被一个或多个组消耗,但组内每个消息只会被此中一个消耗者消耗(雷同于Kafka的分区消耗者模子)。
  • 机动的数据布局:每条消息可以包含多个字段(field-value对),提供了高度的机动性来携带复杂的数据。
  • 消耗者进度跟踪:消耗者可以在读取消息时自动追踪自己的消耗进度,Redis使用Last Seen Displacement (LSD) 来跟踪消耗者的读取位置。
  • 读取控制:支持多种读取模式,包括从特定消息ID读取、读取近来的N个消息、读取某个时间范围内的消息等。
  • 阻塞读取:可以使用XREAD和XREADGROUP下令以阻塞的方式等待新消息,直到有新消息到达或超时。
Stream数据布局图:
一个消息链表,将全部参加的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容

内容含义Message Content消息内容Consumer group消耗组,通过XGROUP CREATE 下令创建,同一个消耗组可以有多个消耗者Last_delivered_id游标,每个消耗组会有个游标 last_delivered_id,任意一个消耗者读取了消息都会使游标 last_delivered_id 往前移动。Consumer消耗者,消耗组中的消耗者Pending_ids消耗者会有一个状态变量,用于记录被当前消耗已读取但未ack的消息Id,如果客户端没有ack,这个变量内里的消息ID会越来越多,一旦某个消息被ack它就开始减少。这个pending_ids变量在Redis官方被称之为 PEL(Pending Entries List),记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符),它用来确保客户端至少消耗了消息一次,而不会在网络传输的中途丢失了没处理 2. 生产消息下令(XADD)

XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|id field value [field value ...]:向Stream(key)中添加一条消息,ID可以是自动天生或指定的唯一标识符,后面跟着一个或多个字段值对。
这个下令有三个注意点:


  • 消息id要比上个id大
  • 默认用*表示自动天生规矩
  • *:用于XADD下令,表示让系统自动天生id(雷同于MySQL的自增主键)
示例:
  1. 127.0.0.1:6379> xadd k1 * name zhangsan age 18
  2. 1715525079420-0 # 系统自动生成的id
  3. 127.0.0.1:6379> xadd k1 * name lisi age 19
  4. 1715525087805-0
  5. 127.0.0.1:6379> xadd k1 * name wangwu age 20
  6. 1715525094621-0
  7. 127.0.0.1:6379> xadd k1 1715525094621-0 name wangwu age 20 # 重复的id会出错
  8. ERR The ID specified in XADD is equal or smaller than the target stream top item
  9. 127.0.0.1:6379> xadd k1 1715525094621-1 name zhaoliu age 21 # 注意这里是 -1
  10. 1715525094621-1
  11. 127.0.0.1:6379>
复制代码
天生的消息ID,有两部分组成,毫秒时间戳-该毫秒内产生的第1条消息
   信息条目指的是序列号,在雷同的毫秒下序列号从0开始递增,序列号是64位长度,理论上在同一毫秒内天生的数据量无法到达这个级别,因此不用担心序列号会不敷用。millisecondsTime指的是Redis节点服务器的本地时间,如果存在当前的毫秒时间戳比以前已经存在的数据的时间戳小的话(本地时间钟后跳),那么系统将会接纳以前雷同的毫秒创建新的ID,也即redis 在增加信息条目时会检查当前 id 与上一条目标 id, 自动纠正错误的情况,一定要保证后面的 id 比前面大,一个流中信息条目标ID必须是单调增的,这是流的基础。
    客户端显示传入规则:
  Redis对于ID有逼迫要求,格式必须是时间戳-自增Id这样的方式,且后续ID不能小于前一个ID
  Stream的消息内容,它的布局雷同Hash布局,以key-value的情势存在。
3. 查询相关下令

3.1 获取指定范围内的消息(XRANGE)

XRANGE key start end [COUNT count]:获取Stream中指定范围内的消息,start和end定义了消息ID的范围,COUNT限定返回结果数量。
  1. 127.0.0.1:6379> xrange k1 - + count 2
  2. 1715525079420-0
  3. name
  4. zhangsan
  5. age
  6. 18
  7. 1715525087805-0
  8. name
  9. lisi
  10. age
  11. 19
  12. 127.0.0.1:6379>
复制代码


  • -:表示Stream中的最小ID
  • +:表示Stream中的最大ID
3.2 逆序获取指定范围内的消息(XREVRANGE)

XREVRANGE key end start [COUNT count]:雷同于XRANGE,但消息按逆序返回。
示例:
  1. 127.0.0.1:6379> xrevrange k1 + - count 2
  2. 1715525094621-1
  3. name
  4. zhaoliu
  5. age
  6. 21
  7. 1715525094621-0
  8. name
  9. wangwu
  10. age
  11. 20
  12. 127.0.0.1:6379>
复制代码
3.3 返回消息的数量(XLEN)

XLEN key:返回Stream中消息的数量。
  1. 127.0.0.1:6379> xlen k1
  2. 4
  3. 127.0.0.1:6379>
复制代码
4. 删除消息下令(XDEL)

XDEL key ID [ID ...]:删除Stream中指定ID的消息。
  1. 127.0.0.1:6379> xlen k1
  2. 4
  3. 127.0.0.1:6379>
  4. xdel k1 1715525079420-01127.0.0.1:6379> xlen k13127.0.0.1:6379>
复制代码
5. 截取消息下令(XTRIM)

XTRIM key MAXLEN|MINID [=|~] threshold [LIMIT count]:截取Stream


  • MAXLEN:表示允许的最大长度,保留大的
  • MIDID:允许的最小id,这个id之前的消息会被截取掉
  1. 127.0.0.1:6379> xrange k1 - +
  2. 1715525087805-0
  3. name
  4. lisi
  5. age
  6. 19
  7. 1715525094621-0
  8. name
  9. wangwu
  10. age
  11. 20
  12. 1715525094621-1
  13. name
  14. zhaoliu
  15. age
  16. 21
  17. 127.0.0.1:6379> xtrim k1 maxlen 2
  18. 1
  19. 127.0.0.1:6379> xrange k1 - +
  20. 1715525094621-0
  21. name
  22. wangwu
  23. age
  24. 20
  25. 1715525094621-1
  26. name
  27. zhaoliu
  28. age
  29. 21
  30. 127.0.0.1:6379> xtrim k1 minid 1715525094621-1
  31. 1
  32. 127.0.0.1:6379> xrange k1 - +
  33. 1715525094621-1
  34. name
  35. zhaoliu
  36. age
  37. 21
  38. 127.0.0.1:6379>
复制代码
6. 消耗消息下令(XREAD)

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]:非阻塞或阻塞式读取一个或多个Stream中的消息。


  • COUNT指定最多读取的消息数量
  • BLOCK milliseconds指定阻塞等待新消息的最长时间(毫秒)。默认不阻塞,如果milliseconds设置为0,则永远阻塞
  1. 127.0.0.1:6379> xread count 2 streams k1 0-0
  2. k1
  3. 1715525094621-1
  4. name
  5. zhaoliu
  6. age
  7. 21
  8. 1715526892718-0
  9. name
  10. aaa
  11. 127.0.0.1:6379>
复制代码
  0-0代表从最小的ID开始获取Stream中的消息,当不指定count,将会返回Stream中的全部消息,注意也可以使用0(00/000也都是可以的)
  阻塞读取示例:
客户端1:
  1. 127.0.0.1:6379> xread count 1 block 0 streams k1 $
  2. k1
  3. 1715527170695-0
  4. name
  5. ddd
  6. 127.0.0.1:6379>
复制代码
客户端2:
  1. 127.0.0.1:6379> xadd k1 * name ddd
  2. "1715527170695-0"
  3. 127.0.0.1:6379>
复制代码
$:表示只消耗新的消息,比当前id还要大的id
只有当客户端2添加数据之后,客户端1才会进行消耗
7. 消耗者组管理下令

消息创建好之后,就必要消耗者来进行消耗.而创建消耗者要分组进行创建.
7.1 创建消耗者组(XGROUP)

XGROUP create key groupname id|$ [MKSTREAM] [ENTRIESREAD entries_read]:创建一个新的消耗者组。
  1. 127.0.0.1:6379> XGROUP create k1 group1 $
  2. OK
  3. 127.0.0.1:6379> XGROUP create k1 group2 0
  4. OK
  5. 127.0.0.1:6379>
复制代码


  • $表示消耗新来de
  • 0表示从Stream头部开始消耗
7.2 在消耗者组中读取消息(XREADGROUP)

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]:在消耗者组中读取消息,支持消息确认机制(通过ACK或NOACK)。
  1. 127.0.0.1:6379> XGROUP create k1 group2 0
  2. OK
  3. 127.0.0.1:6379> xreadgroup group group2 consumer1 streams k1 >
  4. k1
  5. 1715570723943-0
  6. f1
  7. v1
  8. 1715570728646-0
  9. f2
  10. v2
  11. 1715570732271-0
  12. f3
  13. v3
  14. 1715570735727-0
  15. f4
  16. v4
  17. 127.0.0.1:6379> xreadgroup group group2 consumer2 streams k1 >
  18. 127.0.0.1:6379>
复制代码


  • >:用于XREADGROUP下令,表示迄今还没有发送给组中使用者的信息,会更新消耗者组的末了ID
  • 消耗者不存在时,会自动创建
group2这个组中的consumer1读完了k1中的全部消息,那么当group2中consumer2再来读时,就读不到任何消息了.
但当我新建一个group3组,在group3组中读数据时,又可以或许读到消息.
  1. 127.0.0.1:6379> XGROUP create k1 group3 0
  2. OK
  3. 127.0.0.1:6379> xreadgroup group group3 consumer3 streams k1 >
  4. k1
  5. 1715570723943-0
  6. f1
  7. v1
  8. 1715570728646-0
  9. f2
  10. v2
  11. 1715570732271-0
  12. f3
  13. v3
  14. 1715570735727-0
  15. f4
  16. v4
  17. 127.0.0.1:6379>
复制代码
  这是由于同组共享一个队列信息.这个组就相称mq中的队列,不同消耗者读取同一个队列的时间每条消息只能被消耗一次,但是同一条消息可以被不同队列的不同消耗者同时消耗,也就是广播
  为了防止一个消耗者读完全部消息,我们可以使用count参数来限定消耗者读取几条消息,以此实现负载均衡
示例:
  1. 127.0.0.1:6379> xreadgroup group group4 consumer1 count 2 streams k1 > # 限制只能读2条消息
  2. k1
  3. 1715570723943-0
  4. f1
  5. v1
  6. 1715570728646-0
  7. f2
  8. v2
  9. 127.0.0.1:6379> xreadgroup group group4 consumer2 count 2 streams k1 >
  10. k1
  11. 1715570732271-0
  12. f3
  13. v3
  14. 1715570735727-0
  15. f4
  16. v4
  17. 127.0.0.1:6379>
复制代码
7.3 查询消耗者组的信息(XPENDING)

XPENDING key group [[IDLE min-idle-time] start end count [consumer]]:查询消耗者组的信息,包括待处理消息的数量等。
示例:
  1. 127.0.0.1:6379> xpending k1 group2 # group2组中消费读取情况
  2. 4
  3. 1715570723943-0
  4. 1715570735727-0
  5. consumer1
  6. 4
  7. 127.0.0.1:6379> xpending k1 group4 # group4组中消费读取情况
  8. 4
  9. 1715570723943-0
  10. 1715570735727-0
  11. consumer1
  12. 2
  13. consumer2
  14. 2
  15. 127.0.0.1:6379>
复制代码
7.4 确认消耗者成功处理了消息(XACK)

XACK key group id [id ...]:用于确认(acknowledge)消耗者已经成功处理了消息。
示例:
  1. 127.0.0.1:6379> xpending k1 group4 - + 5 consumer1
  2. 1715570723943-0
  3. consumer1
  4. 543165
  5. 1
  6. 1715570728646-0
  7. consumer1
  8. 543165
  9. 1
  10. 127.0.0.1:6379> xack k1 group4 1715570723943-0
  11. 1
  12. 127.0.0.1:6379> xpending k1 group4 - + 5 consumer1
  13. 1715570728646-0
  14. consumer1
  15. 574237
  16. 1
  17. 127.0.0.1:6379>
复制代码
当客户端从Stream中读取消息后,会使用XACK下令告知Redis,它所指定的消息ID对应的消息已经得到了妥善处理。Redis吸收到XACK下令后,会将这些消息ID从消耗者组的待确认(pending)消息列表中移除,这样,这些消息就不会再次被同一个消耗者组内的消耗者获取到,实现了消息的确认与去重处理。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

渣渣兔

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表