马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
×
缓存是性能的杠杆
一个QPS 10000的体系,如果70%的哀求掷中缓存,数据库只必要遭受3000 QPS。缓存计划好,体系性能提升10倍不是标题。
这篇文章用MonkeyCode天生完备的Redis缓存方案,涵盖:缓存战略、分布式锁、耽误队列、排行榜、限流等实战场景。
给MonkeyCode的同一Prompt
- 用Python + Redis实现高并发缓存系统,要求:
- 1. 缓存策略:Cache-Aside、Write-Through、Write-Behind
- 2. 缓存失效:TTL、LRU淘汰
- 3. 分布式锁(RedLock)
- 4. 延迟队列
- 5. 排行榜(有序集合)
- 6. 限流(滑动窗口、令牌桶)
- 7. 消息发布订阅
- 8. 热点数据探测
- 9. 缓存异常降级处理
- 需要完整可运行的代码,包含异常处理和监控
埋点
复制代码 1. 缓存战略实现
- # cache_strategies.py - MonkeyCode生成
- import redis
- import json
- import hashlib
- import time
- from functools import wraps
- from typing import Any, Callable, Optional
- from dataclasses import dataclass
- redis_client = redis.Redis(
- host='localhost',
- port=6379,
- db=0,
- decode_responses=True
- )
- # ============================================================
- # Cache-Aside(旁路缓存):最常用策略
- # ============================================================
- class CacheAside:
- """读多写少的场景首选"""
-
- def __init__(self, prefix: str = 'cache'):
- self.prefix = prefix
- self.default_ttl = 300 # 5分钟
-
- def _make_key(self, key: str) -> str:
- return f'{self.prefix}:{key}'
-
- def get(self, key: str, loader: Callable = None, ttl: int = None) -> Any:
- """获取缓存,未命中时加载并缓存"""
- redis_key = self._make_key(key)
-
- # 1. 读缓存
- cached = redis_client.get(redis_key)
- if cached:
- return json.loads(cached)
-
- # 2. 缓存未命中,调用loader加载
- if loader:
- data = loader()
- # 3. 写入缓存
- self.set(key, data, ttl or self.default_ttl)
- return data
-
- return None
-
- def set(self, key: str, value: Any, ttl: int = None):
- """写入缓存"""
- redis_key = self._make_key(key)
- redis_client.setex(
- redis_key,
- ttl or self.default_ttl,
- json.dumps(value)
- )
-
- def delete(self, key: str):
- """删除缓存"""
- redis_client.delete(self._make_key(key))
-
- def invalidate_pattern(self, pattern: str):
- """批量删除(匹配模式)"""
- keys = redis_client.keys(f'{self.prefix}:{pattern}')
- if keys:
- redis_client.delete(*keys)
- # 使用示例
- cache = CacheAside('user')
- def get_user(user_id: int) -> dict:
- """获取用户(缓存优先)"""
- return cache.get(
- f'user:{user_id}',
- loader=lambda: load_user_from_db(user_id),
- ttl=600 # 用户信息缓存10分钟
- )
- def update_user(user_id: int, data: dict):
- """更新用户时删除缓存"""
- update_user_to_db(user_id, data)
- cache.delete(f'user:{user_id}')
- # ============================================================
- # Write-Through(写穿透):数据一致性强保证
- # ============================================================
- class WriteThrough:
- """写操作同时更新缓存和数据库"""
-
- def __init__(self, redis_client):
- self.redis = redis_client
-
- def write(self, key: str, value: Any, ttl: int = 3600):
- """同步写缓存和DB"""
- # 1. 写DB
- write_to_db(key, value)
-
- # 2. 写缓存
- self.redis.setex(key, ttl, json.dumps(value))
-
- def delete(self, key: str):
- """删除时同时清理缓存"""
- delete_from_db(key)
- self.redis.delete(key)
- # ============================================================
- # Write-Behind(写回):写入性能最高
- # ============================================================
- class WriteBehind:
- """异步写回,批量写入减少IO"""
-
- def __init__(self, redis_client, queue_size: int = 1000):
- self.redis = redis_client
- self.queue_size = queue_size
- self.write_queue = []
-
- def write(self, key: str, value: Any):
- """先写缓存,异步批量写DB"""
- # 1. 写缓存
- self.redis.set(key, json.dumps(value))
-
- # 2. 加入写队列
- self.write_queue.append({'key': key, 'value': value})
-
- # 3. 达到阈值批量写DB
- if len(self.write_queue) >= self.queue_size:
- self.flush()
-
- def flush(self):
- """批量写DB"""
- if not self.write_queue:
- return
-
- batch_write_to_db(self.write_queue)
- self.write_queue.clear()
复制代码 2. 缓存装饰器
- # cache_decorator.py - MonkeyCode生成
- import redis
- import json
- import hashlib
- import time
- import functools
- redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
- def cached(key_prefix: str, ttl: int = 300, key_builder: Callable = None):
- """缓存装饰器"""
- def decorator(func):
- @functools.wraps(func)
- def wrapper(*args, **kwargs):
- # 构建缓存key
- if key_builder:
- cache_key = key_builder(*args, **kwargs)
- else:
- key_parts = [key_prefix, str(args), str(sorted(kwargs.items()))]
- key_str = '|'.join(key_parts)
- cache_key = f'{key_prefix}:{hashlib.md5(key_str.encode()).hexdigest()}'
-
- # 尝试获取缓存
- cached_value = redis_client.get(cache_key)
- if cached_value:
- return json.loads(cached_value)
-
- # 执行原函数
- result = func(*args, **kwargs)
-
- # 写入缓存
- redis_client.setex(cache_key, ttl, json.dumps(result))
-
- return result
- return wrapper
- return decorator
- def cache_invalidate(key_prefix: str, *args):
- """缓存失效装饰器"""
- def decorator(func):
- @functools.wraps(func)
- def wrapper(*args, **kwargs):
- result = func(*args, **kwargs)
-
- # 清理相关缓存
- pattern = f'{key_prefix}:*'
- keys = redis_client.keys(pattern)
- if keys:
- redis_client.delete(*keys)
-
- return result
- return wrapper
- return decorator
- # 使用
- @cached('product', ttl=600)
- def get_product(product_id: int) -> dict:
- """获取商品(缓存600秒)"""
- return query_product_from_db(product_id)
- @cache_invalidate('product')
- def update_product(product_id: int, data: dict):
- """更新商品时自动清理缓存"""
- return update_product_in_db(product_id, data)
复制代码 3. 分布式锁
- # distributed_lock.py - MonkeyCode生成
- import redis
- import uuid
- import time
- import threading
- from contextlib import contextmanager
- from typing import Optional
- class DistributedLock:
- """Redis分布式锁(RedLock简化版)"""
-
- def __init__(self, redis_client: redis.Redis, lock_name: str, timeout: int = 30):
- self.redis = redis_client
- self.lock_name = f'lock:{lock_name}'
- self.timeout = timeout
- self.token = str(uuid.uuid4()) # 锁持有者标识
-
- def acquire(self, blocking: bool = True, blocking_timeout: int = 10) -> bool:
- """
- 获取锁
- blocking: 是否阻塞等待
- blocking_timeout: 阻塞超时时间(秒)
- """
- end_time = time.time() + blocking_timeout
-
- while True:
- # SET NX EX:原子操作
- acquired = self.redis.set(
- self.lock_name,
- self.token,
- nx=True, # 只有key不存在时设置
- ex=self.timeout # 自动过期
- )
-
- if acquired:
- return True
-
- if not blocking:
- return False
-
- if time.time() > end_time:
- return False
-
- time.sleep(0.01) # 避免CPU空转
-
- def release(self) -> bool:
- """
- 释放锁(Lua脚本保证原子性)
- 只有持有者才能释放自己的锁
- """
- lua_script = """
- if redis.call("get", KEYS[1]) == ARGV[1] then
- return redis.call("del", KEYS[1])
- else
- return 0
- end
- """
- result = self.redis.eval(lua_script, 1, self.lock_name, self.token)
- return result == 1
-
- def extend(self, additional_time: int = 30) -> bool:
- """延长锁的过期时间"""
- lua_script = """
- if redis.call("get", KEYS[1]) == ARGV[1] then
- return redis.call("expire", KEYS[1], ARGV[2])
- else
- return 0
- end
- """
- new_timeout = self.timeout + additional_time
- result = self.redis.eval(lua_script, 1, self.lock_name, self.token, new_timeout)
- return result == 1
-
- @contextmanager
- def lock(self, blocking: bool = True, blocking_timeout: int = 10):
- """上下文管理器方式使用锁"""
- acquired = self.acquire(blocking, blocking_timeout)
- if not acquired:
- raise RuntimeError(f'Could not acquire lock: {self.lock_name}')
- try:
- yield
- finally:
- self.release()
- # 使用示例
- lock = DistributedLock(redis_client, 'order:create:123', timeout=60)
- # 方式1:手动获取/释放
- if lock.acquire():
- try:
- # 临界区操作
- create_order(123)
- finally:
- lock.release()
- # 方式2:上下文管理器(推荐)
- with lock.lock():
- # 临界区操作
- create_order(123)
复制代码 4. 耽误队列
- # delay_queue.py - MonkeyCode生成
- import redis
- import json
- import time
- import threading
- from typing import Callable, Any, Optional
- class DelayQueue:
- """Redis延迟队列(基于Sorted Set)"""
-
- def __init__(self, redis_client: redis.Redis, queue_name: str):
- self.redis = redis_client
- self.queue_name = f'delayq:{queue_name}'
- self.consumer_name = f'consumer:{queue_name}:{threading.current_thread().name}'
-
- def push(self, job_data: dict, delay_seconds: int):
- """添加延迟任务"""
- job_id = f'job:{time.time()}:{id(job_data)}'
-
- # 任务数据存在Hash里
- self.redis.hset(job_id, mapping={
- 'data': json.dumps(job_data),
- 'status': 'pending',
- 'created_at': time.time()
- })
-
- # 任务ID和执行时间存入Sorted Set
- execute_at = time.time() + delay_seconds
- self.redis.zadd(self.queue_name, {job_id: execute_at})
-
- return job_id
-
- def pop(self, max_wait: int = 5) -> Optional[dict]:
- """
- 阻塞获取可执行的任务
- """
- end_time = time.time() + max_wait
-
- while time.time() < end_time:
- # 获取1秒内过期的任务
- now = time.time()
- jobs = self.redis.zrangebyscore(
- self.queue_name,
- 0,
- now,
- start=0,
- num=1
- )
-
- if not jobs:
- time.sleep(0.1)
- continue
-
- job_id = jobs[0]
-
- # 原子性地将任务转移到处理中
- acquired = self.redis.zrem(self.queue_name, job_id)
- if not acquired:
- # 被其他消费者抢走了
- continue
-
- # 获取任务数据
- job_data = self.redis.hgetall(job_id)
- if not job_data:
- continue
-
- # 更新状态为处理中
- self.redis.hset(job_id, 'status', 'processing')
- self.redis.hset(job_id, 'consumer', self.consumer_name)
- self.redis.hset(job_id, 'started_at', time.time())
-
- # 重新入队(如果超时未完成)
- self.redis.zadd(self.queue_name, {job_id: time.time() + 300}) # 5分钟超时
-
- return {
- 'job_id': job_id,
- 'data': json.loads(job_data['data']),
- 'created_at': float(job_data['created_at'])
- }
-
- return None
-
- def complete(self, job_id: str):
- """标记任务完成"""
- self.redis.hset(job_id, 'status', 'completed')
- self.redis.hset(job_id, 'completed_at', time.time())
-
- def fail(self, job_id: str, error: str):
- """标记任务失败"""
- self.redis.hset(job_id, 'status', 'failed')
- self.redis.hset(job_id, 'error', error)
- self.redis.hset(job_id, 'failed_at', time.time())
-
- def retry(self, job_id: str, delay_seconds: int = 60):
- """重试任务"""
- self.redis.hset(job_id, 'retry_count',
- int(self.redis.hget(job_id, 'retry_count') or 0) + 1)
- self.redis.hset(job_id, 'status', 'pending')
-
- execute_at = time.time() + delay_seconds
- self.redis.zadd(self.queue_name, {job_id: execute_at})
- # 使用示例
- queue = DelayQueue(redis_client, 'order_timeout')
- # 生产者:订单超时15分钟未支付则取消
- def create_order(order_id: int):
- save_order_to_db(order_id)
- # 15分钟后检查订单支付状态
- queue.push({'order_id': order_id, 'action': 'cancel'}, delay_seconds=900)
- # 消费者
- def order_timeout_worker():
- while True:
- job = queue.pop(max_wait=5)
- if job:
- order_id = job['data']['order_id']
- if not is_order_paid(order_id):
- cancel_order(order_id)
- print(f'Order {order_id} cancelled due to timeout')
- queue.complete(job['job_id'])
- # 启动消费者
- worker_thread = threading.Thread(target=order_timeout_worker, daemon=True)
- worker_thread.start()
复制代码 5. 排行榜
- # leaderboard.py - MonkeyCode生成
- import redis
- from typing import List, Dict, Any
- class Leaderboard:
- """Redis有序集合实现的排行榜"""
-
- def __init__(self, redis_client: redis.Redis, leaderboard_name: str):
- self.redis = redis_client
- self.key = f'leaderboard:{leaderboard_name}'
-
- def set_score(self, member: str, score: float):
- """设置/更新用户分数"""
- self.redis.zadd(self.key, {member: score})
-
- def increment(self, member: str, increment: float = 1):
- """增加分数"""
- self.redis.zincrby(self.key, increment, member)
-
- def get_rank(self, member: str, reverse: bool = False) -> int:
- """获取排名(0-based)"""
- if reverse:
- # 降序:第1名返回0
- return self.redis.zrevrank(self.key, member) or -1
- else:
- return self.redis.zrank(self.key, member) or -1
-
- def get_score(self, member: str) -> float:
- """获取分数"""
- return self.redis.zscore(self.key, member) or 0
-
- def get_top(self, top_n: int = 10, with_scores: bool = True) -> List[Dict[str, Any]]:
- """获取Top N"""
- if with_scores:
- results = self.redis.zrevrange(self.key, 0, top_n - 1, withscores=True)
- return [
- {'rank': i + 1, 'member': member, 'score': score}
- for i, (member, score) in enumerate(results)
- ]
- else:
- members = self.redis.zrevrange(self.key, 0, top_n - 1)
- return [{'rank': i + 1, 'member': m} for i, m in enumerate(members)]
-
- def get_around_me(self, member: str, count: int = 5) -> List[Dict[str, Any]]:
- """获取用户周围的排名"""
- rank = self.redis.zrevrank(self.key, member)
- if rank is None:
- return []
-
- start = max(0, rank - count)
- end = rank + count
-
- results = self.redis.zrevrange(self.key, start, end, withscores=True)
-
- return [
- {'rank': start + i + 1, 'member': m, 'score': s}
- for i, (m, s) in enumerate(results)
- ]
-
- def get_rank_by_score(self, min_score: float, max_score: float) -> int:
- """根据分数范围获取排名"""
- return self.redis.zcount(self.key, min_score, max_score)
- # 使用示例
- leaderboard = Leaderboard(redis_client, 'game_scores')
- # 增加分数
- leaderboard.increment('player_001', 100)
- leaderboard.increment('player_002', 80)
- leaderboard.increment('player_003', 120)
- # 获取Top 10
- top10 = leaderboard.get_top(10)
- for p in top10:
- print(f"#{p['rank']} {p['member']}: {p['score']}")
- # 查询特定玩家排名
- player_rank = leaderboard.get_rank('player_001')
- print(f"player_001 排名: #{player_rank + 1}")
复制代码 6. 限流
- # rate_limiter.py - MonkeyCode生成
- import redis
- import time
- from typing import Tuple
- class SlidingWindowRateLimiter:
- """滑动窗口限流"""
-
- def __init__(self, redis_client: redis.Redis, key: str, max_requests: int, window_seconds: int):
- self.redis = redis_client
- self.key = f'ratelimit:{key}'
- self.max_requests = max_requests
- self.window_seconds = window_seconds
-
- def is_allowed(self) -> bool:
- """检查是否允许请求"""
- now = time.time()
- window_start = now - self.window_seconds
-
- # 移除窗口外的请求记录
- self.redis.zremrangebyscore(self.key, 0, window_start)
-
- # 检查当前请求数
- current_count = self.redis.zcard(self.key)
-
- if current_count >= self.max_requests:
- return False
-
- # 添加当前请求
- self.redis.zadd(self.key, {f'{now}': now})
- self.redis.expire(self.key, self.window_seconds)
-
- return True
-
- def get_remaining(self) -> int:
- """获取剩余可用次数"""
- now = time.time()
- window_start = now - self.window_seconds
- self.redis.zremrangebyscore(self.key, 0, window_start)
- return max(0, self.max_requests - self.redis.zcard(self.key))
- class TokenBucketRateLimiter:
- """令牌桶限流"""
-
- def __init__(self, redis_client: redis.Redis, key: str,
- capacity: int, refill_rate: float):
- self.redis = redis_client
- self.key = f'tokenbucket:{key}'
- self.capacity = capacity
- self.refill_rate = refill_rate # 每秒补充令牌数
-
- def _refill(self) -> Tuple[float, float]:
- """补充令牌"""
- now = time.time()
- last_refill = float(self.redis.hget(self.key, 'last_refill') or now)
-
- # 计算应该补充的令牌数
- elapsed = now - last_refill
- tokens_to_add = elapsed * self.refill_rate
-
- current_tokens = float(self.redis.hget(self.key, 'tokens') or self.capacity)
- new_tokens = min(self.capacity, current_tokens + tokens_to_add)
-
- # 保存状态
- self.redis.hset(self.key, 'tokens', new_tokens)
- self.redis.hset(self.key, 'last_refill', now)
- self.redis.expire(self.key, 60) # 1分钟过期
-
- return new_tokens
-
- def is_allowed(self, tokens_needed: int = 1) -> bool:
- """检查并消费令牌"""
- current_tokens = self._refill()
-
- if current_tokens >= tokens_needed:
- # 消费令牌
- new_tokens = current_tokens - tokens_needed
- self.redis.hset(self.key, 'tokens', new_tokens)
- return True
-
- return False
- # 使用示例
- api_limiter = SlidingWindowRateLimiter(redis_client, 'api:user:123', 60, 60) # 每分钟60次
- def api_handler(user_id: int):
- limiter = SlidingWindowRateLimiter(redis_client, f'api:user:{user_id}', 60, 60)
-
- if not limiter.is_allowed():
- raise Exception('请求过于频繁,请稍后再试')
-
- # 处理请求
- process_api_request(user_id)
复制代码 7. 热门数据探测
- # hot_key_detection.py - MonkeyCode生成
- import redis
- import time
- from collections import defaultdict
- class HotKeyDetector:
- """热点Key探测(基于统计)"""
-
- def __init__(self, redis_client: redis.Redis, sample_rate: int = 100):
- self.redis = redis_client
- self.sample_rate = sample_rate # 采样率(1/100)
- self.hot_keys = set()
-
- def record_access(self, key: str):
- """记录访问(采样)"""
- if time.time() % self.sample_rate == 0:
- self.redis.pfadd('access:keys', key)
-
- def get_hot_keys(self, threshold: int = 1000) -> set:
- """获取热点Key(HyperLogLog估算)"""
- # 导出统计
- keys = self.redis.pfcount('access:keys')
- if keys < threshold:
- return set()
-
- # 获取所有被访问过的Key
- # 实际应该用有序集合记录访问次数
- return self.hot_keys
- # 基于访问频率的热Key缓存
- class HotKeyCache:
- """热Key自动缓存"""
-
- def __init__(self, redis_client: redis.Redis):
- self.redis = redis_client
- self.prefix = 'hotcache:'
- self.hot_threshold = 1000 # 访问1000次以上
-
- def get(self, key: str, loader: Callable) -> Any:
- """获取数据,自动缓存热Key"""
- # 记录访问
- access_key = f'access:count:{key}'
- access_count = self.redis.incr(access_key)
-
- # 检查是否是热Key
- if access_count > self.hot_threshold and not self.redis.exists(f'{self.prefix}{key}'):
- # 预热缓存
- data = loader()
- self.redis.setex(f'{self.prefix}{key}', 3600, json.dumps(data))
-
- # 读缓存
- cached = self.redis.get(f'{self.prefix}{key}')
- if cached:
- return json.loads(cached)
-
- return loader()
复制代码 用Redis做缓存的关键是根据场景选战略:读多写少用Cache-Aside,写多读少用Write-Through,高并发抢锁用分布式锁,超时使命用耽误队列。MonkeyCode能帮你天生完备代码,但战略选择必要团毕业务特点。
免责声明:如果侵犯了您的权益,请联系站长及时删除侵权内容,谢谢合作!qidao123.com:ToB企服之家,中国第一个企服评测及软件市场,开放入驻,技术点评得现金. |