Go Redis 管道和事务之 go-redis

小秦哥  金牌会员 | 2023-6-16 17:44:56 | 来自手机 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 863|帖子 863|积分 2589

Go Redis 管道和事务之 go-redis

Go Redis 管道和事务官方文档介绍

Redis pipelines(管道) 允许一次性发送多个命令来提高性能,go-redis支持同样的操作, 你可以使用go-redis一次性发送多个命令到服务器,并一次读取返回结果,而不是一个个命令的操作。
Go Redis 管道和事务: https://redis.uptrace.dev/zh/guide/go-redis-pipelines.html
#管道

通过 go-redis Pipeline 一次执行多个命令并读取返回值:
  1. pipe := rdb.Pipeline()
  2. incr := pipe.Incr(ctx, "pipeline_counter")
  3. pipe.Expire(ctx, "pipeline_counter", time.Hour)
  4. cmds, err := pipe.Exec(ctx)
  5. if err != nil {
  6.         panic(err)
  7. }
  8. // 结果你需要再调用 Exec 后才可以使用
  9. fmt.Println(incr.Val())
复制代码
或者你也可以使用 Pipelined 方法,它将自动调用 Exec:
  1. var incr *redis.IntCmd
  2. cmds, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
  3.         incr = pipe.Incr(ctx, "pipelined_counter")
  4.         pipe.Expire(ctx, "pipelined_counter", time.Hour)
  5.         return nil
  6. })
  7. if err != nil {
  8.         panic(err)
  9. }
  10. fmt.Println(incr.Val())
复制代码
同时会返回每个命令的结果,你可以遍历结果集:
  1. cmds, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
  2.         for i := 0; i < 100; i++ {
  3.                 pipe.Get(ctx, fmt.Sprintf("key%d", i))
  4.         }
  5.         return nil
  6. })
  7. if err != nil {
  8.         panic(err)
  9. }
  10. for _, cmd := range cmds {
  11.     fmt.Println(cmd.(*redis.StringCmd).Val())
  12. }
复制代码
#Watch 监听

使用 Redis 事务, 监听key的状态,仅当key未被其他客户端修改才会执行命令, 这种方式也被成为 乐观锁
Redis 事务https://redis.io/docs/manual/transactions/
乐观锁
  1. WATCH mykey
  2. val = GET mykey
  3. val = val + 1
  4. MULTI
  5. SET mykey $val
  6. EXEC
复制代码
#事务

你可以使用 TxPipelined 和 TxPipeline 方法,把命令包装在 MULTI 、 EXEC 中, 但这种做法没什么意义:
  1. cmds, err := rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
  2.         for i := 0; i < 100; i++ {
  3.                 pipe.Get(ctx, fmt.Sprintf("key%d", i))
  4.         }
  5.         return nil
  6. })
  7. if err != nil {
  8.         panic(err)
  9. }
  10. // MULTI
  11. // GET key0
  12. // GET key1
  13. // ...
  14. // GET key99
  15. // EXEC
复制代码
你应该正确的使用 Watch + 事务管道, 比如以下示例,我们使用 GET, SET 和 WATCH 命令,来实现 INCR 操作, 注意示例中使用 redis.TxFailedErr 来判断失败:
  1. const maxRetries = 1000
  2. // increment 方法,使用 GET + SET + WATCH 来实现Key递增效果,类似命令 INCR
  3. func increment(key string) error {
  4.         // 事务函数
  5.         txf := func(tx *redis.Tx) error {
  6.    // // 获得当前值或零值
  7.                 n, err := tx.Get(ctx, key).Int()
  8.                 if err != nil && err != redis.Nil {
  9.                         return err
  10.                 }
  11.                 n++  // 实际操作
  12.     // 仅在监视的Key保持不变的情况下运行
  13.                 _, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
  14.                         pipe.Set(ctx, key, n, 0)
  15.                         return nil
  16.                 })
  17.                 return err
  18.         }
  19.        
  20.         for i := 0; i < maxRetries; i++ {
  21.                 err := rdb.Watch(ctx, txf, key)
  22.                 if err == nil {
  23.                         // Success.
  24.                         return nil
  25.                 }
  26.                 if err == redis.TxFailedErr {
  27.                         // 乐观锁失败
  28.                         continue
  29.                 }
  30.                 return err
  31.         }
  32.         return errors.New("increment reached maximum number of retries")
  33. }
复制代码
Go Redis 管道和事务 实操
  1. package main
  2. import (
  3.         "context"
  4.         "fmt"
  5.         "github.com/redis/go-redis/v9"
  6.         "time"
  7. )
  8. // 声明一个全局的 rdb 变量
  9. var rdb *redis.Client
  10. // 初始化连接
  11. func initRedisClient() (err error) {
  12.         // NewClient将客户端返回给Options指定的Redis Server。
  13.         // Options保留设置以建立redis连接。
  14.         rdb = redis.NewClient(&redis.Options{
  15.                 Addr:     "localhost:6379",
  16.                 Password: "", // 没有密码,默认值
  17.                 DB:       0,  // 默认DB 0 连接到服务器后要选择的数据库。
  18.                 PoolSize: 20, // 最大套接字连接数。 默认情况下,每个可用CPU有10个连接,由runtime.GOMAXPROCS报告。
  19.         })
  20.         // Background返回一个非空的Context。它永远不会被取消,没有值,也没有截止日期。
  21.         // 它通常由main函数、初始化和测试使用,并作为传入请求的顶级上下文
  22.         ctx := context.Background()
  23.         _, err = rdb.Ping(ctx).Result()
  24.         if err != nil {
  25.                 return err
  26.         }
  27.         return nil
  28. }
  29. // watchDemo 在key值不变的情况下将其值+1
  30. func watchKeyDemo(key string) error {
  31.         ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
  32.         defer cancel()
  33.         // Watch准备一个事务,并标记要监视的密钥,以便有条件执行(如果有密钥的话)。
  34.         // 当fn退出时,事务将自动关闭。
  35.         // func (c *Client) Watch(ctx context.Context, fn func(*Tx) error, keys ...string)
  36.         return rdb.Watch(ctx, func(tx *redis.Tx) error {
  37.                 // Get Redis `GET key` command. It returns redis.Nil error when key does not exist.
  38.                 // 获取 Key 的值 n
  39.                 n, err := tx.Get(ctx, key).Int()
  40.                 if err != nil && err != redis.Nil {
  41.                         fmt.Printf("redis get failed, err: %v\n", err)
  42.                         return err
  43.                 }
  44.                 // 假设操作耗时5秒
  45.                 // 5秒内我们通过其他的客户端修改key,当前事务就会失败
  46.                 time.Sleep(5 * time.Second)
  47.                 // txpipeline 执行事务中fn队列中的命令。
  48.                 // 当使用WATCH时,EXEC只会在被监视的键没有被修改的情况下执行命令,从而允许检查和设置机制。
  49.                 // Exec总是返回命令列表。如果事务失败,则返回TxFailedErr。否则Exec返回第一个失败命令的错误或nil
  50.                 _, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
  51.                         // 业务逻辑 如果 Key 没有变化,则在原来的基础上加 1
  52.                         pipe.Set(ctx, key, n+1, time.Hour)
  53.                         return nil
  54.                 })
  55.                 return err
  56.         }, key)
  57. }
  58. func main() {
  59.         if err := initRedisClient(); err != nil {
  60.                 fmt.Printf("initRedisClient failed: %v\n", err)
  61.                 return
  62.         }
  63.         fmt.Println("initRedisClient started successfully")
  64.         defer rdb.Close() // Close 关闭客户端,释放所有打开的资源。关闭客户端是很少见的,因为客户端是长期存在的,并在许多例程之间共享。
  65.         err := watchKeyDemo("watch_key")
  66.         if err != nil {
  67.                 fmt.Printf("watchKeyDemo failed: %v\n", err)
  68.                 return
  69.         }
  70.         fmt.Printf("watchKeyDemo succeeded!\n")
  71. }
复制代码
运行
[code]Code/go/redis_demo via
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

小秦哥

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表