MonkeyCode与Redis实战:高并发体系的缓存计划完全指南

[复制链接]
发表于 昨天 19:08 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

×
缓存是性能的杠杆

一个QPS 10000的体系,如果70%的哀求掷中缓存,数据库只必要遭受3000 QPS。缓存计划好,体系性能提升10倍不是标题。
这篇文章用MonkeyCode天生完备的Redis缓存方案,涵盖:缓存战略、分布式锁、耽误队列、排行榜、限流等实战场景。
给MonkeyCode的同一Prompt
  1. 用Python + Redis实现高并发缓存系统,要求:
  2. 1. 缓存策略:Cache-Aside、Write-Through、Write-Behind
  3. 2. 缓存失效:TTL、LRU淘汰
  4. 3. 分布式锁(RedLock)
  5. 4. 延迟队列
  6. 5. 排行榜(有序集合)
  7. 6. 限流(滑动窗口、令牌桶)
  8. 7. 消息发布订阅
  9. 8. 热点数据探测
  10. 9. 缓存异常降级处理
  11. 需要完整可运行的代码,包含异常处理和监控监控埋点
复制代码
1. 缓存战略实现
  1. # cache_strategies.py - MonkeyCode生成
  2. import redis
  3. import json
  4. import hashlib
  5. import time
  6. from functools import wraps
  7. from typing import Any, Callable, Optional
  8. from dataclasses import dataclass
  9. redis_client = redis.Redis(
  10.     host='localhost',
  11.     port=6379,
  12.     db=0,
  13.     decode_responses=True
  14. )
  15. # ============================================================
  16. # Cache-Aside(旁路缓存):最常用策略
  17. # ============================================================
  18. class CacheAside:
  19.     """读多写少的场景首选"""
  20.    
  21.     def __init__(self, prefix: str = 'cache'):
  22.         self.prefix = prefix
  23.         self.default_ttl = 300  # 5分钟
  24.    
  25.     def _make_key(self, key: str) -> str:
  26.         return f'{self.prefix}:{key}'
  27.    
  28.     def get(self, key: str, loader: Callable = None, ttl: int = None) -> Any:
  29.         """获取缓存,未命中时加载并缓存"""
  30.         redis_key = self._make_key(key)
  31.         
  32.         # 1. 读缓存
  33.         cached = redis_client.get(redis_key)
  34.         if cached:
  35.             return json.loads(cached)
  36.         
  37.         # 2. 缓存未命中,调用loader加载
  38.         if loader:
  39.             data = loader()
  40.             # 3. 写入缓存
  41.             self.set(key, data, ttl or self.default_ttl)
  42.             return data
  43.         
  44.         return None
  45.    
  46.     def set(self, key: str, value: Any, ttl: int = None):
  47.         """写入缓存"""
  48.         redis_key = self._make_key(key)
  49.         redis_client.setex(
  50.             redis_key,
  51.             ttl or self.default_ttl,
  52.             json.dumps(value)
  53.         )
  54.    
  55.     def delete(self, key: str):
  56.         """删除缓存"""
  57.         redis_client.delete(self._make_key(key))
  58.    
  59.     def invalidate_pattern(self, pattern: str):
  60.         """批量删除(匹配模式)"""
  61.         keys = redis_client.keys(f'{self.prefix}:{pattern}')
  62.         if keys:
  63.             redis_client.delete(*keys)
  64. # 使用示例
  65. cache = CacheAside('user')
  66. def get_user(user_id: int) -> dict:
  67.     """获取用户(缓存优先)"""
  68.     return cache.get(
  69.         f'user:{user_id}',
  70.         loader=lambda: load_user_from_db(user_id),
  71.         ttl=600  # 用户信息缓存10分钟
  72.     )
  73. def update_user(user_id: int, data: dict):
  74.     """更新用户时删除缓存"""
  75.     update_user_to_db(user_id, data)
  76.     cache.delete(f'user:{user_id}')
  77. # ============================================================
  78. # Write-Through(写穿透):数据一致性强保证
  79. # ============================================================
  80. class WriteThrough:
  81.     """写操作同时更新缓存和数据库"""
  82.    
  83.     def __init__(self, redis_client):
  84.         self.redis = redis_client
  85.    
  86.     def write(self, key: str, value: Any, ttl: int = 3600):
  87.         """同步写缓存和DB"""
  88.         # 1. 写DB
  89.         write_to_db(key, value)
  90.         
  91.         # 2. 写缓存
  92.         self.redis.setex(key, ttl, json.dumps(value))
  93.    
  94.     def delete(self, key: str):
  95.         """删除时同时清理缓存"""
  96.         delete_from_db(key)
  97.         self.redis.delete(key)
  98. # ============================================================
  99. # Write-Behind(写回):写入性能最高
  100. # ============================================================
  101. class WriteBehind:
  102.     """异步写回,批量写入减少IO"""
  103.    
  104.     def __init__(self, redis_client, queue_size: int = 1000):
  105.         self.redis = redis_client
  106.         self.queue_size = queue_size
  107.         self.write_queue = []
  108.    
  109.     def write(self, key: str, value: Any):
  110.         """先写缓存,异步批量写DB"""
  111.         # 1. 写缓存
  112.         self.redis.set(key, json.dumps(value))
  113.         
  114.         # 2. 加入写队列
  115.         self.write_queue.append({'key': key, 'value': value})
  116.         
  117.         # 3. 达到阈值批量写DB
  118.         if len(self.write_queue) >= self.queue_size:
  119.             self.flush()
  120.    
  121.     def flush(self):
  122.         """批量写DB"""
  123.         if not self.write_queue:
  124.             return
  125.         
  126.         batch_write_to_db(self.write_queue)
  127.         self.write_queue.clear()
复制代码
2. 缓存装饰器
  1. # cache_decorator.py - MonkeyCode生成
  2. import redis
  3. import json
  4. import hashlib
  5. import time
  6. import functools
  7. redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
  8. def cached(key_prefix: str, ttl: int = 300, key_builder: Callable = None):
  9.     """缓存装饰器"""
  10.     def decorator(func):
  11.         @functools.wraps(func)
  12.         def wrapper(*args, **kwargs):
  13.             # 构建缓存key
  14.             if key_builder:
  15.                 cache_key = key_builder(*args, **kwargs)
  16.             else:
  17.                 key_parts = [key_prefix, str(args), str(sorted(kwargs.items()))]
  18.                 key_str = '|'.join(key_parts)
  19.                 cache_key = f'{key_prefix}:{hashlib.md5(key_str.encode()).hexdigest()}'
  20.             
  21.             # 尝试获取缓存
  22.             cached_value = redis_client.get(cache_key)
  23.             if cached_value:
  24.                 return json.loads(cached_value)
  25.             
  26.             # 执行原函数
  27.             result = func(*args, **kwargs)
  28.             
  29.             # 写入缓存
  30.             redis_client.setex(cache_key, ttl, json.dumps(result))
  31.             
  32.             return result
  33.         return wrapper
  34.     return decorator
  35. def cache_invalidate(key_prefix: str, *args):
  36.     """缓存失效装饰器"""
  37.     def decorator(func):
  38.         @functools.wraps(func)
  39.         def wrapper(*args, **kwargs):
  40.             result = func(*args, **kwargs)
  41.             
  42.             # 清理相关缓存
  43.             pattern = f'{key_prefix}:*'
  44.             keys = redis_client.keys(pattern)
  45.             if keys:
  46.                 redis_client.delete(*keys)
  47.             
  48.             return result
  49.         return wrapper
  50.     return decorator
  51. # 使用
  52. @cached('product', ttl=600)
  53. def get_product(product_id: int) -> dict:
  54.     """获取商品(缓存600秒)"""
  55.     return query_product_from_db(product_id)
  56. @cache_invalidate('product')
  57. def update_product(product_id: int, data: dict):
  58.     """更新商品时自动清理缓存"""
  59.     return update_product_in_db(product_id, data)
复制代码
3. 分布式锁
  1. # distributed_lock.py - MonkeyCode生成
  2. import redis
  3. import uuid
  4. import time
  5. import threading
  6. from contextlib import contextmanager
  7. from typing import Optional
  8. class DistributedLock:
  9.     """Redis分布式锁(RedLock简化版)"""
  10.    
  11.     def __init__(self, redis_client: redis.Redis, lock_name: str, timeout: int = 30):
  12.         self.redis = redis_client
  13.         self.lock_name = f'lock:{lock_name}'
  14.         self.timeout = timeout
  15.         self.token = str(uuid.uuid4())  # 锁持有者标识
  16.    
  17.     def acquire(self, blocking: bool = True, blocking_timeout: int = 10) -> bool:
  18.         """
  19.         获取锁
  20.         blocking: 是否阻塞等待
  21.         blocking_timeout: 阻塞超时时间(秒)
  22.         """
  23.         end_time = time.time() + blocking_timeout
  24.         
  25.         while True:
  26.             # SET NX EX:原子操作
  27.             acquired = self.redis.set(
  28.                 self.lock_name,
  29.                 self.token,
  30.                 nx=True,  # 只有key不存在时设置
  31.                 ex=self.timeout  # 自动过期
  32.             )
  33.             
  34.             if acquired:
  35.                 return True
  36.             
  37.             if not blocking:
  38.                 return False
  39.             
  40.             if time.time() > end_time:
  41.                 return False
  42.             
  43.             time.sleep(0.01)  # 避免CPU空转
  44.    
  45.     def release(self) -> bool:
  46.         """
  47.         释放锁(Lua脚本保证原子性)
  48.         只有持有者才能释放自己的锁
  49.         """
  50.         lua_script = """
  51.         if redis.call("get", KEYS[1]) == ARGV[1] then
  52.             return redis.call("del", KEYS[1])
  53.         else
  54.             return 0
  55.         end
  56.         """
  57.         result = self.redis.eval(lua_script, 1, self.lock_name, self.token)
  58.         return result == 1
  59.    
  60.     def extend(self, additional_time: int = 30) -> bool:
  61.         """延长锁的过期时间"""
  62.         lua_script = """
  63.         if redis.call("get", KEYS[1]) == ARGV[1] then
  64.             return redis.call("expire", KEYS[1], ARGV[2])
  65.         else
  66.             return 0
  67.         end
  68.         """
  69.         new_timeout = self.timeout + additional_time
  70.         result = self.redis.eval(lua_script, 1, self.lock_name, self.token, new_timeout)
  71.         return result == 1
  72.    
  73.     @contextmanager
  74.     def lock(self, blocking: bool = True, blocking_timeout: int = 10):
  75.         """上下文管理器方式使用锁"""
  76.         acquired = self.acquire(blocking, blocking_timeout)
  77.         if not acquired:
  78.             raise RuntimeError(f'Could not acquire lock: {self.lock_name}')
  79.         try:
  80.             yield
  81.         finally:
  82.             self.release()
  83. # 使用示例
  84. lock = DistributedLock(redis_client, 'order:create:123', timeout=60)
  85. # 方式1:手动获取/释放
  86. if lock.acquire():
  87.     try:
  88.         # 临界区操作
  89.         create_order(123)
  90.     finally:
  91.         lock.release()
  92. # 方式2:上下文管理器(推荐)
  93. with lock.lock():
  94.     # 临界区操作
  95.     create_order(123)
复制代码
4. 耽误队列
  1. # delay_queue.py - MonkeyCode生成
  2. import redis
  3. import json
  4. import time
  5. import threading
  6. from typing import Callable, Any, Optional
  7. class DelayQueue:
  8.     """Redis延迟队列(基于Sorted Set)"""
  9.    
  10.     def __init__(self, redis_client: redis.Redis, queue_name: str):
  11.         self.redis = redis_client
  12.         self.queue_name = f'delayq:{queue_name}'
  13.         self.consumer_name = f'consumer:{queue_name}:{threading.current_thread().name}'
  14.    
  15.     def push(self, job_data: dict, delay_seconds: int):
  16.         """添加延迟任务"""
  17.         job_id = f'job:{time.time()}:{id(job_data)}'
  18.         
  19.         # 任务数据存在Hash里
  20.         self.redis.hset(job_id, mapping={
  21.             'data': json.dumps(job_data),
  22.             'status': 'pending',
  23.             'created_at': time.time()
  24.         })
  25.         
  26.         # 任务ID和执行时间存入Sorted Set
  27.         execute_at = time.time() + delay_seconds
  28.         self.redis.zadd(self.queue_name, {job_id: execute_at})
  29.         
  30.         return job_id
  31.    
  32.     def pop(self, max_wait: int = 5) -> Optional[dict]:
  33.         """
  34.         阻塞获取可执行的任务
  35.         """
  36.         end_time = time.time() + max_wait
  37.         
  38.         while time.time() < end_time:
  39.             # 获取1秒内过期的任务
  40.             now = time.time()
  41.             jobs = self.redis.zrangebyscore(
  42.                 self.queue_name,
  43.                 0,
  44.                 now,
  45.                 start=0,
  46.                 num=1
  47.             )
  48.             
  49.             if not jobs:
  50.                 time.sleep(0.1)
  51.                 continue
  52.             
  53.             job_id = jobs[0]
  54.             
  55.             # 原子性地将任务转移到处理中
  56.             acquired = self.redis.zrem(self.queue_name, job_id)
  57.             if not acquired:
  58.                 # 被其他消费者抢走了
  59.                 continue
  60.             
  61.             # 获取任务数据
  62.             job_data = self.redis.hgetall(job_id)
  63.             if not job_data:
  64.                 continue
  65.             
  66.             # 更新状态为处理中
  67.             self.redis.hset(job_id, 'status', 'processing')
  68.             self.redis.hset(job_id, 'consumer', self.consumer_name)
  69.             self.redis.hset(job_id, 'started_at', time.time())
  70.             
  71.             # 重新入队(如果超时未完成)
  72.             self.redis.zadd(self.queue_name, {job_id: time.time() + 300})  # 5分钟超时
  73.             
  74.             return {
  75.                 'job_id': job_id,
  76.                 'data': json.loads(job_data['data']),
  77.                 'created_at': float(job_data['created_at'])
  78.             }
  79.         
  80.         return None
  81.    
  82.     def complete(self, job_id: str):
  83.         """标记任务完成"""
  84.         self.redis.hset(job_id, 'status', 'completed')
  85.         self.redis.hset(job_id, 'completed_at', time.time())
  86.    
  87.     def fail(self, job_id: str, error: str):
  88.         """标记任务失败"""
  89.         self.redis.hset(job_id, 'status', 'failed')
  90.         self.redis.hset(job_id, 'error', error)
  91.         self.redis.hset(job_id, 'failed_at', time.time())
  92.    
  93.     def retry(self, job_id: str, delay_seconds: int = 60):
  94.         """重试任务"""
  95.         self.redis.hset(job_id, 'retry_count',
  96.                        int(self.redis.hget(job_id, 'retry_count') or 0) + 1)
  97.         self.redis.hset(job_id, 'status', 'pending')
  98.         
  99.         execute_at = time.time() + delay_seconds
  100.         self.redis.zadd(self.queue_name, {job_id: execute_at})
  101. # 使用示例
  102. queue = DelayQueue(redis_client, 'order_timeout')
  103. # 生产者:订单超时15分钟未支付则取消
  104. def create_order(order_id: int):
  105.     save_order_to_db(order_id)
  106.     # 15分钟后检查订单支付状态
  107.     queue.push({'order_id': order_id, 'action': 'cancel'}, delay_seconds=900)
  108. # 消费者
  109. def order_timeout_worker():
  110.     while True:
  111.         job = queue.pop(max_wait=5)
  112.         if job:
  113.             order_id = job['data']['order_id']
  114.             if not is_order_paid(order_id):
  115.                 cancel_order(order_id)
  116.                 print(f'Order {order_id} cancelled due to timeout')
  117.             queue.complete(job['job_id'])
  118. # 启动消费者
  119. worker_thread = threading.Thread(target=order_timeout_worker, daemon=True)
  120. worker_thread.start()
复制代码
5. 排行榜
  1. # leaderboard.py - MonkeyCode生成
  2. import redis
  3. from typing import List, Dict, Any
  4. class Leaderboard:
  5.     """Redis有序集合实现的排行榜"""
  6.    
  7.     def __init__(self, redis_client: redis.Redis, leaderboard_name: str):
  8.         self.redis = redis_client
  9.         self.key = f'leaderboard:{leaderboard_name}'
  10.    
  11.     def set_score(self, member: str, score: float):
  12.         """设置/更新用户分数"""
  13.         self.redis.zadd(self.key, {member: score})
  14.    
  15.     def increment(self, member: str, increment: float = 1):
  16.         """增加分数"""
  17.         self.redis.zincrby(self.key, increment, member)
  18.    
  19.     def get_rank(self, member: str, reverse: bool = False) -> int:
  20.         """获取排名(0-based)"""
  21.         if reverse:
  22.             # 降序:第1名返回0
  23.             return self.redis.zrevrank(self.key, member) or -1
  24.         else:
  25.             return self.redis.zrank(self.key, member) or -1
  26.    
  27.     def get_score(self, member: str) -> float:
  28.         """获取分数"""
  29.         return self.redis.zscore(self.key, member) or 0
  30.    
  31.     def get_top(self, top_n: int = 10, with_scores: bool = True) -> List[Dict[str, Any]]:
  32.         """获取Top N"""
  33.         if with_scores:
  34.             results = self.redis.zrevrange(self.key, 0, top_n - 1, withscores=True)
  35.             return [
  36.                 {'rank': i + 1, 'member': member, 'score': score}
  37.                 for i, (member, score) in enumerate(results)
  38.             ]
  39.         else:
  40.             members = self.redis.zrevrange(self.key, 0, top_n - 1)
  41.             return [{'rank': i + 1, 'member': m} for i, m in enumerate(members)]
  42.    
  43.     def get_around_me(self, member: str, count: int = 5) -> List[Dict[str, Any]]:
  44.         """获取用户周围的排名"""
  45.         rank = self.redis.zrevrank(self.key, member)
  46.         if rank is None:
  47.             return []
  48.         
  49.         start = max(0, rank - count)
  50.         end = rank + count
  51.         
  52.         results = self.redis.zrevrange(self.key, start, end, withscores=True)
  53.         
  54.         return [
  55.             {'rank': start + i + 1, 'member': m, 'score': s}
  56.             for i, (m, s) in enumerate(results)
  57.         ]
  58.    
  59.     def get_rank_by_score(self, min_score: float, max_score: float) -> int:
  60.         """根据分数范围获取排名"""
  61.         return self.redis.zcount(self.key, min_score, max_score)
  62. # 使用示例
  63. leaderboard = Leaderboard(redis_client, 'game_scores')
  64. # 增加分数
  65. leaderboard.increment('player_001', 100)
  66. leaderboard.increment('player_002', 80)
  67. leaderboard.increment('player_003', 120)
  68. # 获取Top 10
  69. top10 = leaderboard.get_top(10)
  70. for p in top10:
  71.     print(f"#{p['rank']} {p['member']}: {p['score']}")
  72. # 查询特定玩家排名
  73. player_rank = leaderboard.get_rank('player_001')
  74. print(f"player_001 排名: #{player_rank + 1}")
复制代码
6. 限流
  1. # rate_limiter.py - MonkeyCode生成
  2. import redis
  3. import time
  4. from typing import Tuple
  5. class SlidingWindowRateLimiter:
  6.     """滑动窗口限流"""
  7.    
  8.     def __init__(self, redis_client: redis.Redis, key: str, max_requests: int, window_seconds: int):
  9.         self.redis = redis_client
  10.         self.key = f'ratelimit:{key}'
  11.         self.max_requests = max_requests
  12.         self.window_seconds = window_seconds
  13.    
  14.     def is_allowed(self) -> bool:
  15.         """检查是否允许请求"""
  16.         now = time.time()
  17.         window_start = now - self.window_seconds
  18.         
  19.         # 移除窗口外的请求记录
  20.         self.redis.zremrangebyscore(self.key, 0, window_start)
  21.         
  22.         # 检查当前请求数
  23.         current_count = self.redis.zcard(self.key)
  24.         
  25.         if current_count >= self.max_requests:
  26.             return False
  27.         
  28.         # 添加当前请求
  29.         self.redis.zadd(self.key, {f'{now}': now})
  30.         self.redis.expire(self.key, self.window_seconds)
  31.         
  32.         return True
  33.    
  34.     def get_remaining(self) -> int:
  35.         """获取剩余可用次数"""
  36.         now = time.time()
  37.         window_start = now - self.window_seconds
  38.         self.redis.zremrangebyscore(self.key, 0, window_start)
  39.         return max(0, self.max_requests - self.redis.zcard(self.key))
  40. class TokenBucketRateLimiter:
  41.     """令牌桶限流"""
  42.    
  43.     def __init__(self, redis_client: redis.Redis, key: str,
  44.                  capacity: int, refill_rate: float):
  45.         self.redis = redis_client
  46.         self.key = f'tokenbucket:{key}'
  47.         self.capacity = capacity
  48.         self.refill_rate = refill_rate  # 每秒补充令牌数
  49.    
  50.     def _refill(self) -> Tuple[float, float]:
  51.         """补充令牌"""
  52.         now = time.time()
  53.         last_refill = float(self.redis.hget(self.key, 'last_refill') or now)
  54.         
  55.         # 计算应该补充的令牌数
  56.         elapsed = now - last_refill
  57.         tokens_to_add = elapsed * self.refill_rate
  58.         
  59.         current_tokens = float(self.redis.hget(self.key, 'tokens') or self.capacity)
  60.         new_tokens = min(self.capacity, current_tokens + tokens_to_add)
  61.         
  62.         # 保存状态
  63.         self.redis.hset(self.key, 'tokens', new_tokens)
  64.         self.redis.hset(self.key, 'last_refill', now)
  65.         self.redis.expire(self.key, 60)  # 1分钟过期
  66.         
  67.         return new_tokens
  68.    
  69.     def is_allowed(self, tokens_needed: int = 1) -> bool:
  70.         """检查并消费令牌"""
  71.         current_tokens = self._refill()
  72.         
  73.         if current_tokens >= tokens_needed:
  74.             # 消费令牌
  75.             new_tokens = current_tokens - tokens_needed
  76.             self.redis.hset(self.key, 'tokens', new_tokens)
  77.             return True
  78.         
  79.         return False
  80. # 使用示例
  81. api_limiter = SlidingWindowRateLimiter(redis_client, 'api:user:123', 60, 60)  # 每分钟60次
  82. def api_handler(user_id: int):
  83.     limiter = SlidingWindowRateLimiter(redis_client, f'api:user:{user_id}', 60, 60)
  84.    
  85.     if not limiter.is_allowed():
  86.         raise Exception('请求过于频繁,请稍后再试')
  87.    
  88.     # 处理请求
  89.     process_api_request(user_id)
复制代码
7. 热门数据探测
  1. # hot_key_detection.py - MonkeyCode生成
  2. import redis
  3. import time
  4. from collections import defaultdict
  5. class HotKeyDetector:
  6.     """热点Key探测(基于统计)"""
  7.    
  8.     def __init__(self, redis_client: redis.Redis, sample_rate: int = 100):
  9.         self.redis = redis_client
  10.         self.sample_rate = sample_rate  # 采样率(1/100)
  11.         self.hot_keys = set()
  12.    
  13.     def record_access(self, key: str):
  14.         """记录访问(采样)"""
  15.         if time.time() % self.sample_rate == 0:
  16.             self.redis.pfadd('access:keys', key)
  17.    
  18.     def get_hot_keys(self, threshold: int = 1000) -> set:
  19.         """获取热点Key(HyperLogLog估算)"""
  20.         # 导出统计
  21.         keys = self.redis.pfcount('access:keys')
  22.         if keys < threshold:
  23.             return set()
  24.         
  25.         # 获取所有被访问过的Key
  26.         # 实际应该用有序集合记录访问次数
  27.         return self.hot_keys
  28. # 基于访问频率的热Key缓存
  29. class HotKeyCache:
  30.     """热Key自动缓存"""
  31.    
  32.     def __init__(self, redis_client: redis.Redis):
  33.         self.redis = redis_client
  34.         self.prefix = 'hotcache:'
  35.         self.hot_threshold = 1000  # 访问1000次以上
  36.    
  37.     def get(self, key: str, loader: Callable) -> Any:
  38.         """获取数据,自动缓存热Key"""
  39.         # 记录访问
  40.         access_key = f'access:count:{key}'
  41.         access_count = self.redis.incr(access_key)
  42.         
  43.         # 检查是否是热Key
  44.         if access_count > self.hot_threshold and not self.redis.exists(f'{self.prefix}{key}'):
  45.             # 预热缓存
  46.             data = loader()
  47.             self.redis.setex(f'{self.prefix}{key}', 3600, json.dumps(data))
  48.         
  49.         # 读缓存
  50.         cached = self.redis.get(f'{self.prefix}{key}')
  51.         if cached:
  52.             return json.loads(cached)
  53.         
  54.         return loader()
复制代码
用Redis做缓存的关键是根据场景选战略:读多写少用Cache-Aside,写多读少用Write-Through,高并发抢锁用分布式锁,超时使命用耽误队列。MonkeyCode能帮你天生完备代码,但战略选择必要团毕业务特点。

免责声明:如果侵犯了您的权益,请联系站长及时删除侵权内容,谢谢合作!qidao123.com:ToB企服之家,中国第一个企服评测及软件市场,开放入驻,技术点评得现金.
回复

使用道具 举报

登录后关闭弹窗

登录参与点评抽奖  加入IT实名职场社区
去登录
快速回复 返回顶部 返回列表