Golang Gin Redis+Mysql 同步查询更新删除操纵(我的小GO笔记) ...

立山  金牌会员 | 2024-12-23 20:42:23 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 904|帖子 904|积分 2712

我的需求是在处置惩罚几百上千万数据时避免缓存穿透以及缓存击穿情况出现,并且确保数据库和redis同步,为了方便我查询数据操纵加了一些条件精准查询和模糊查询以及全字段模糊查询、分页、排序一些小玩意,redis存储是hash表key值也就是数据ID,name值是数据表名和redis同步的,别问为什么,我懒!!
  1. 使用示例:
  2. params := utils.QueryParams{
  3.         Name:  "users",
  4.         Limit: 10,
  5.         Order: "id",
  6.         Sort:  1,
  7.         Where: map[string]interface{}{
  8.             "name": "张",  // 将进行模糊查询
  9.             "age":  18,   // 将进行精确匹配
  10.             "*": "李",  // 将进行全字段模糊查询
  11.         },
  12. }
  13. results, err := utils.CustomRedisQuery(db, redisClient, params)
  14. if err != nil {
  15. // 处理错误
  16. }
  17. result, err := GetRedisById(rdb, "users", 1)
  18. result, err := GetRedisByWhere(rdb, "users", map[string]interface{}{"status": 1, "type": "vip"}, 1)
  19. err = DeleteRedisById(rdb, "users", 1)
  20. err = UpdateRedisById(db, rdb, "users", 1)
复制代码
完备代码 
  1. /*
  2. +--------------------------------------------------------------------------------
  3. | If this code works, it was written by Xven. If not, I don't know who wrote it.
  4. +--------------------------------------------------------------------------------
  5. | Statement: An Ordinary Person
  6. +--------------------------------------------------------------------------------
  7. | Author: Xven <QQ:270988107>
  8. +--------------------------------------------------------------------------------
  9. | Copyright (c) 2024 Xven All rights reserved.
  10. +--------------------------------------------------------------------------------
  11. */
  12. package utils
  13. import (
  14.         "context"
  15.         "encoding/json"
  16.         "fmt"
  17.         "sort"
  18.         "strings"
  19.         "sync"
  20.         "time"
  21.         "github.com/go-redis/redis/v8"
  22.         "gorm.io/gorm"
  23. )
  24. type QueryParams struct {
  25.         Name  string                 // 表名
  26.         Limit int                    // 分页数量
  27.         Order string                 // 排序字段
  28.         Sort  int                    // 排序方式 1:升序 2:降序
  29.         Where map[string]interface{} // 查询条件
  30. }
  31. /**
  32. * 检查字符串是否包含子串
  33. * @Author Xven <270988107@qq.com>
  34. * @param {string} str
  35. * @param {string} substr
  36. * @return {bool}
  37. */
  38. func containsString(str, substr string) bool {
  39.         return strings.Contains(strings.ToLower(str), strings.ToLower(substr))
  40. }
  41. /**
  42. * 检查字符串是否包含通配符
  43. * @Author Xven <270988107@qq.com>
  44. * @param {string} str
  45. * @return {bool}
  46. */
  47. func hasWildcard(str string) bool {
  48.         return strings.Contains(str, "*")
  49. }
  50. /**
  51. * 对数据进行排序
  52. * @Author Xven <270988107@qq.com>
  53. * @param {[]map[string]interface{}} data
  54. * @param {string} orderField
  55. * @param {int} sortType
  56. * @return {void}
  57. */
  58. func sortData(data []map[string]interface{}, orderField string, sortType int) {
  59.         sort.Slice(data, func(i, j int) bool {
  60.                 if sortType == 1 { // 升序
  61.                         return fmt.Sprint(data[i][orderField]) < fmt.Sprint(data[j][orderField])
  62.                 }
  63.                 return fmt.Sprint(data[i][orderField]) > fmt.Sprint(data[j][orderField])
  64.         })
  65. }
  66. /**
  67. * 自定义redis查询
  68. * @Author Xven <270988107@qq.com>
  69. * @param {*gorm.DB} db
  70. * @param {*redis.Client} rdb
  71. * @param {QueryParams} params
  72. * @return {[]map[string]interface{}, error}
  73. */
  74. func CustomRedisQuery(db *gorm.DB, rdb *redis.Client, params QueryParams) ([]map[string]interface{}, error) {
  75.         ctx := context.Background()
  76.         var result []map[string]interface{}
  77.         // 参数校验,防止缓存穿透
  78.         if params.Name == "" {
  79.                 return nil, fmt.Errorf("表名不能为空")
  80.         }
  81.         // 构建 Redis key
  82.         redisKey := params.Name + ":list"
  83.         // 使用分布式锁防止缓存击穿
  84.         lockKey := fmt.Sprintf("lock:%s", redisKey)
  85.         lock := rdb.SetNX(ctx, lockKey, "1", 10*time.Second)
  86.         if !lock.Val() {
  87.                 // 等待100ms后重试
  88.                 time.Sleep(100 * time.Millisecond)
  89.                 return CustomRedisQuery(db, rdb, params)
  90.         }
  91.         defer rdb.Del(ctx, lockKey)
  92.         // 1. 先查询 Redis 缓存
  93.         vals, err := rdb.HGetAll(ctx, redisKey).Result()
  94.         if err == nil && len(vals) > 0 {
  95.                 // 将缓存数据解析为结果集
  96.                 for _, v := range vals {
  97.                         var item map[string]interface{}
  98.                         if err := json.Unmarshal([]byte(v), &item); err == nil {
  99.                                 result = append(result, item)
  100.                         }
  101.                 }
  102.                 // 如果有查询条件,则进行过滤
  103.                 if len(params.Where) > 0 {
  104.                         result = filterData(result, params.Where)
  105.                 }
  106.                 // 处理排序
  107.                 if params.Order != "" {
  108.                         sortData(result, params.Order, params.Sort)
  109.                 }
  110.                 // 处理分页
  111.                 if params.Limit > 0 && len(result) > params.Limit {
  112.                         result = result[:params.Limit]
  113.                 }
  114.                 return result, nil
  115.         }
  116.         // 2. Redis 没有数据,从数据库查询
  117.         var dbResult []map[string]interface{}
  118.         // 使用连接池控制并发
  119.         pool := make(chan struct{}, 10)
  120.         var wg sync.WaitGroup
  121.         var mu sync.Mutex
  122.         // 使用游标分批查询数据库,避免一次性加载过多数据
  123.         err = db.Table(params.Name).FindInBatches(&dbResult, 1000, func(tx *gorm.DB, batch int) error {
  124.                 wg.Add(1)
  125.                 pool <- struct{}{} // 获取连接
  126.                 go func(data []map[string]interface{}) {
  127.                         defer func() {
  128.                                 <-pool // 释放连接
  129.                                 wg.Done()
  130.                         }()
  131.                         pipe := rdb.Pipeline()
  132.                         // 批量写入Redis
  133.                         for _, item := range data {
  134.                                 // 将每条记录序列化为JSON
  135.                                 jsonData, err := json.Marshal(item)
  136.                                 if err != nil {
  137.                                         continue
  138.                                 }
  139.                                 // 使用ID作为field,JSON作为value写入hash
  140.                                 id := fmt.Sprint(item["id"])
  141.                                 pipe.HSet(ctx, redisKey, id, string(jsonData))
  142.                         }
  143.                         // 执行管道命令
  144.                         _, err := pipe.Exec(ctx)
  145.                         if err != nil {
  146.                                 // 写入失败时重试写入数据
  147.                                 for _, item := range data {
  148.                                         jsonData, _ := json.Marshal(item)
  149.                                         id := fmt.Sprint(item["id"])
  150.                                         rdb.HSet(ctx, redisKey, id, string(jsonData))
  151.                                 }
  152.                         }
  153.                         mu.Lock()
  154.                         result = append(result, data...)
  155.                         mu.Unlock()
  156.                 }(dbResult)
  157.                 return nil
  158.         }).Error
  159.         wg.Wait()
  160.         if err != nil {
  161.                 // 设置空值缓存,防止缓存穿透
  162.                 rdb.Set(ctx, redisKey+"_empty", "1", 5*time.Minute)
  163.                 return nil, err
  164.         }
  165.         // 处理排序
  166.         if params.Order != "" {
  167.                 sortData(result, params.Order, params.Sort)
  168.         }
  169.         // 处理分页
  170.         if params.Limit > 0 && len(result) > params.Limit {
  171.                 result = result[:params.Limit]
  172.         }
  173.         return result, nil
  174. }
  175. /**
  176. * 过滤数据
  177. * @Author Xven <270988107@qq.com>
  178. * @param {[]map[string]interface{}} data
  179. * @param {map[string]interface{}} where
  180. * @return {[]map[string]interface{}}
  181. */
  182. func filterData(data []map[string]interface{}, where map[string]interface{}) []map[string]interface{} {
  183.         var filteredResult []map[string]interface{}
  184.         // 先处理精确匹配条件
  185.         hasExactMatch := false
  186.         for field, value := range where {
  187.                 if field != "*" {
  188.                         if strValue, ok := value.(string); ok && !hasWildcard(strValue) {
  189.                                 hasExactMatch = true
  190.                                 break
  191.                         } else if !ok {
  192.                                 hasExactMatch = true
  193.                                 break
  194.                         }
  195.                 }
  196.         }
  197.         if hasExactMatch {
  198.                 filteredResult = exactMatch(data, where)
  199.                 if len(filteredResult) > 0 {
  200.                         filteredResult = fuzzyMatch(filteredResult, where)
  201.                 }
  202.         } else {
  203.                 filteredResult = fuzzyMatch(data, where)
  204.         }
  205.         return filteredResult
  206. }
  207. /**
  208. * 精确匹配
  209. * @Author Xven <270988107@qq.com>
  210. * @param {[]map[string]interface{}} data
  211. * @param {map[string]interface{}} where
  212. * @return {[]map[string]interface{}}
  213. */
  214. func exactMatch(data []map[string]interface{}, where map[string]interface{}) []map[string]interface{} {
  215.         var result []map[string]interface{}
  216.         for _, item := range data {
  217.                 matched := true
  218.                 for field, value := range where {
  219.                         if field == "*" {
  220.                                 continue
  221.                         }
  222.                         if strValue, ok := value.(string); ok {
  223.                                 if !hasWildcard(strValue) {
  224.                                         if itemValue, exists := item[field]; !exists || itemValue != value {
  225.                                                 matched = false
  226.                                                 break
  227.                                         }
  228.                                 }
  229.                         } else {
  230.                                 if itemValue, exists := item[field]; !exists || itemValue != value {
  231.                                         matched = false
  232.                                         break
  233.                                 }
  234.                         }
  235.                 }
  236.                 if matched {
  237.                         result = append(result, item)
  238.                 }
  239.         }
  240.         return result
  241. }
  242. /**
  243. * 模糊匹配
  244. * @Author Xven <270988107@qq.com>
  245. * @param {[]map[string]interface{}} data
  246. * @param {map[string]interface{}} where
  247. * @return {[]map[string]interface{}}
  248. */
  249. func fuzzyMatch(data []map[string]interface{}, where map[string]interface{}) []map[string]interface{} {
  250.         var result []map[string]interface{}
  251.         // 处理指定字段的模糊查询
  252.         for _, item := range data {
  253.                 matched := true
  254.                 for field, value := range where {
  255.                         if field == "*" {
  256.                                 continue
  257.                         }
  258.                         if strValue, ok := value.(string); ok && hasWildcard(strValue) {
  259.                                 if itemValue, exists := item[field]; exists {
  260.                                         if strItemValue, ok := itemValue.(string); ok {
  261.                                                 pattern := strings.ReplaceAll(strValue, "*", "")
  262.                                                 if !strings.Contains(strings.ToLower(strItemValue), strings.ToLower(pattern)) {
  263.                                                         matched = false
  264.                                                         break
  265.                                                 }
  266.                                         }
  267.                                 }
  268.                         }
  269.                 }
  270.                 if matched {
  271.                         result = append(result, item)
  272.                 }
  273.         }
  274.         // 处理全字段模糊查询
  275.         if wildcardValue, exists := where["*"]; exists {
  276.                 var globalResult []map[string]interface{}
  277.                 searchData := result
  278.                 if len(searchData) == 0 {
  279.                         searchData = data
  280.                 }
  281.                 if strValue, ok := wildcardValue.(string); ok {
  282.                         for _, item := range searchData {
  283.                                 matched := false
  284.                                 for _, fieldValue := range item {
  285.                                         if strFieldValue, ok := fieldValue.(string); ok {
  286.                                                 if containsString(strFieldValue, strValue) {
  287.                                                         matched = true
  288.                                                         break
  289.                                                 }
  290.                                         }
  291.                                 }
  292.                                 if matched {
  293.                                         globalResult = append(globalResult, item)
  294.                                 }
  295.                         }
  296.                 }
  297.                 result = globalResult
  298.         }
  299.         return result
  300. }
  301. /**
  302. * 根据ID查询单条数据
  303. * @Author Xven <270988107@qq.com>
  304. * @param {*redis.Client} rdb
  305. * @param {string} name
  306. * @param {interface{}} id
  307. * @return {map[string]interface{}, error}
  308. */
  309. func GetRedisById(rdb *redis.Client, name string, id interface{}) (map[string]interface{}, error) {
  310.         ctx := context.Background()
  311.         redisKey := name + ":list"
  312.         // 从Redis查询
  313.         jsonData, err := rdb.HGet(ctx, redisKey, fmt.Sprint(id)).Result()
  314.         if err == nil {
  315.                 // Redis命中,解析JSON数据
  316.                 var result map[string]interface{}
  317.                 err = json.Unmarshal([]byte(jsonData), &result)
  318.                 if err == nil {
  319.                         return result, nil
  320.                 }
  321.         }
  322.         return nil, err
  323. }
  324. /**
  325. * 根据条件查询数据
  326. * @Author Xven <270988107@qq.com>
  327. * @param {*redis.Client} rdb
  328. * @param {string} name
  329. * @param {map[string]interface{}} where
  330. * @param {int} is
  331. * @return {interface{}, error}
  332. */
  333. func GetRedisByWhere(rdb *redis.Client, name string, where map[string]interface{}, is int) (interface{}, error) {
  334.         ctx := context.Background()
  335.         redisKey := name + ":list"
  336.         // 获取所有数据
  337.         values, err := rdb.HGetAll(ctx, redisKey).Result()
  338.         if err != nil {
  339.                 return nil, err
  340.         }
  341.         var results []map[string]interface{}
  342.         // 遍历所有数据进行条件匹配
  343.         for _, jsonStr := range values {
  344.                 var item map[string]interface{}
  345.                 err := json.Unmarshal([]byte(jsonStr), &item)
  346.                 if err != nil {
  347.                         continue
  348.                 }
  349.                 // 检查是否匹配所有条件
  350.                 match := true
  351.                 for k, v := range where {
  352.                         if item[k] != v {
  353.                                 match = false
  354.                                 break
  355.                         }
  356.                 }
  357.                 if match {
  358.                         results = append(results, item)
  359.                         // 如果是单条查询且已找到,则直接返回
  360.                         if is == 0 {
  361.                                 return item, nil
  362.                         }
  363.                 }
  364.         }
  365.         if is == 0 {
  366.                 return nil, nil
  367.         }
  368.         return results, nil
  369. }
  370. /**
  371. * 删除指定ID的数据
  372. * @Author Xven <270988107@qq.com>
  373. * @param {*redis.Client} rdb
  374. * @param {string} name
  375. * @param {interface{}} id
  376. * @return {error}
  377. */
  378. func DeleteRedisById(rdb *redis.Client, name string, id interface{}) error {
  379.         ctx := context.Background()
  380.         redisKey := name + ":list"
  381.         maxRetries := 3
  382.         for i := 0; i < maxRetries; i++ {
  383.                 err := rdb.HDel(ctx, redisKey, fmt.Sprint(id)).Err()
  384.                 if err == nil {
  385.                         return nil
  386.                 }
  387.                 // 重试前等待短暂时间
  388.                 time.Sleep(time.Millisecond * 100)
  389.         }
  390.         return fmt.Errorf("failed to delete after %d retries", maxRetries)
  391. }
  392. /**
  393. * 更新指定ID的数据
  394. * @Author Xven <270988107@qq.com>
  395. * @param {*gorm.DB} db
  396. * @param {*redis.Client} rdb
  397. * @param {string} name
  398. * @param {interface{}} id
  399. * @return {error}
  400. */
  401. func UpdateRedisById(db *gorm.DB, rdb *redis.Client, name string, id interface{}) error {
  402.         ctx := context.Background()
  403.         redisKey := name + ":list"
  404.         maxRetries := 3
  405.         // 从数据库查询数据
  406.         var result map[string]interface{}
  407.         err := db.Table(name).Where("id = ?", id).Take(&result).Error
  408.         if err != nil {
  409.                 return err
  410.         }
  411.         // 序列化数据
  412.         jsonData, err := json.Marshal(result)
  413.         if err != nil {
  414.                 return err
  415.         }
  416.         // 重试更新Redis
  417.         for i := 0; i < maxRetries; i++ {
  418.                 err = rdb.HSet(ctx, redisKey, fmt.Sprint(id), string(jsonData)).Err()
  419.                 if err == nil {
  420.                         return nil
  421.                 }
  422.                 time.Sleep(time.Millisecond * 100)
  423.         }
  424.         return fmt.Errorf("failed to update after %d retries", maxRetries)
  425. }
复制代码
 
 

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

立山

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表