| 优化类别 | 具体措施 | 预期结果 |
| --- | --- | --- |
| 数据库优化 | 索引优化、查询优化、N+1问题 | 提升查询速度、减少数据库负载 |
| 缓存优化 | Redis缓存、页面缓存、查询缓存 | 减少数据库访问、提升响应速度 |
| 异步处理 | Celery任务队列、异步视图、异步ORM | 提升并发性能、优化用户体验 |

二、数据库优化实现
1. 模型设计和索引优化
- #
- from django.db import models
- from django.db.models import Index
class Category(models.Model):
    """Article category with a display name and a unique URL slug."""

    name = models.CharField(max_length=100)
    # unique=True already makes the database build a unique index on this
    # column, so a second explicit index on slug would only add write cost.
    slug = models.SlugField(unique=True)

    class Meta:
        indexes = [
            # Supports lookups and ordering by category name.
            models.Index(fields=['name']),
        ]
class Article(models.Model):
    """An article that belongs to one category and is written by one user."""

    title = models.CharField(max_length=200)
    content = models.TextField()
    # Deleting the category or the author also deletes the article.
    category = models.ForeignKey(Category, on_delete=models.CASCADE)
    author = models.ForeignKey('auth.User', on_delete=models.CASCADE)
    # Set once on insert / refreshed on every save, respectively.
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    views = models.IntegerField(default=0)

    class Meta:
        indexes = [
            # Composite index: creation time ascending, view count descending.
            Index(fields=['created_at', '-views']),
            # Composite index for filtering by category and author together.
            Index(fields=['category', 'author']),
        ]
2. 查询优化
- #
- from django.db.models import Prefetch, Count, Q
- from django.views.generic import ListView
class ArticleListView(ListView):
    """List articles, newest first, optionally filtered by category slug.

    The optional ``category`` query-string parameter is matched against the
    slug of the article's category.
    """

    model = Article
    template_name = 'articles/article_list.html'
    context_object_name = 'articles'

    def get_queryset(self):
        """Return the annotated, prefetched article queryset for this request."""
        # select_related pulls the foreign keys in the same query;
        # prefetch_related loads the multi-valued relations in separate queries.
        queryset = Article.objects.select_related(
            'category', 'author'
        ).prefetch_related(
            'tags',
            Prefetch(
                'comments',
                queryset=Comment.objects.select_related('user'),
            ),
        )

        # Optional filter taken from the query string.
        category = self.request.GET.get('category')
        if category:
            queryset = queryset.filter(category__slug=category)

        # Both counts join a different multi-valued relation. Without
        # distinct=True the joins multiply each other's rows and every count
        # is inflated (comments x likes).
        queryset = queryset.annotate(
            comment_count=Count('comments', distinct=True),
            like_count=Count('likes', distinct=True),
        )
        return queryset.order_by('-created_at')
3. 批量操作优化
- #
- from django.db import transaction
def bulk_create_articles(articles_data):
    """Create one Article per entry of ``articles_data`` via ``bulk_create``.

    Each entry must provide the keys ``title``, ``content``, ``category_id``
    and ``author_id``. Returns whatever ``Article.objects.bulk_create`` returns.
    """
    with transaction.atomic():
        pending = []
        for row in articles_data:
            pending.append(
                Article(
                    title=row['title'],
                    content=row['content'],
                    category_id=row['category_id'],
                    author_id=row['author_id'],
                )
            )
        return Article.objects.bulk_create(pending)
def bulk_update_views(article_ids):
    """Add one to ``views`` for every article whose id is in ``article_ids``."""
    # F() makes the database compute the increment, so the current value is
    # never read into Python first.
    incremented = models.F('views') + 1
    with transaction.atomic():
        targets = Article.objects.filter(id__in=article_ids)
        targets.update(views=incremented)
三、缓存优化实现
1. Redis缓存配置
- #
# Default cache backed by Redis through django-redis.
CACHES = {
    'default': {
        'BACKEND': 'django_redis.cache.RedisCache',
        # A bare 'redis://' names no server. This is the conventional local
        # placeholder (host, port, database number); replace per deployment.
        'LOCATION': 'redis://127.0.0.1:6379/1',
        'OPTIONS': {
            'CLIENT_CLASS': 'django_redis.client.DefaultClient',
        },
    },
}

# Store sessions in the cache configured above instead of the database.
SESSION_ENGINE = 'django.contrib.sessions.backends.cache'
2. 视图缓存实现
- #
- from django.views.decorators.cache import cache_page
- from django.utils.decorators import method_decorator
- from django.core.cache import cache
- from django.conf import settings
# cache_page is applied to dispatch, so the whole response is cached for 15 minutes.
@method_decorator(cache_page(60 * 15), name='dispatch')
class CategoryListView(ListView):
    """Category listing whose rendered page is cached."""

    template_name = 'categories/category_list.html'
    model = Category
class ArticleDetailView(DetailView):
    """Article detail page that caches the loaded Article for 30 minutes."""

    model = Article

    def get_object(self, queryset=None):
        """Return the article for the URL's ``pk``, preferring the cache.

        ``queryset`` is accepted to stay compatible with the signature of the
        overridden ``get_object``. When a caller passes one, the cache is
        bypassed so the caller's restrictions are honoured.
        """
        if queryset is not None:
            return super().get_object(queryset)

        article_id = self.kwargs['pk']
        cache_key = f'article_{article_id}'

        # Try the cache first.
        article = cache.get(cache_key)
        if article is None:
            # Cache miss: load from the database, then store for next time.
            article = super().get_object()
            cache.set(cache_key, article, timeout=60 * 30)
        return article
def get_popular_articles():
    """Return the ten articles with the most likes plus comments.

    The result is cached for one hour under the key ``popular_articles``.
    """
    cache_key = 'popular_articles'
    articles = cache.get(cache_key)
    if articles is None:
        # distinct=True is required: counting two multi-valued relations in
        # one query joins both, and the joins would otherwise multiply each
        # other's rows and inflate the score.
        articles = Article.objects.annotate(
            total_score=(
                Count('likes', distinct=True)
                + Count('comments', distinct=True)
            )
        ).order_by('-total_score')[:10]
        cache.set(cache_key, articles, timeout=60 * 60)
    return articles
3. 缓存装饰器
- #
- from functools import wraps
- from django.core.cache import cache
def cache_result(timeout=300):
    """Decorator factory that caches a function's return value.

    Args:
        timeout: Lifetime of each cached entry in seconds.

    The cache key is built from the function's module and qualified name plus
    the ``repr`` of its positional arguments and its keyword arguments sorted
    by name, so argument order in the call does not create duplicate entries.
    """
    def decorator(func):
        # Private marker to tell "nothing cached" apart from a cached None;
        # comparing against None would recompute None results on every call.
        missing = object()
        # Module-qualified so same-named functions in different modules do
        # not share cache entries.
        key_prefix = f"{func.__module__}.{func.__qualname__}"

        @wraps(func)
        def wrapper(*args, **kwargs):
            # Build the cache key.
            cache_key = f"{key_prefix}:{args!r}:{sorted(kwargs.items())!r}"
            result = cache.get(cache_key, missing)
            if result is missing:
                result = func(*args, **kwargs)
                cache.set(cache_key, result, timeout=timeout)
            return result
        return wrapper
    return decorator
# Usage example
@cache_result(timeout=60*5)
def get_article_stats(article_id):
    """Return the view, comment and like counts of one article as a dict.

    The article itself is fetched with ``get``, so a missing id raises the
    model's ``DoesNotExist`` before either count query runs.
    """
    view_total = Article.objects.get(id=article_id).views
    comment_total = Comment.objects.filter(article_id=article_id).count()
    like_total = Like.objects.filter(article_id=article_id).count()
    return {
        'views': view_total,
        'comments': comment_total,
        'likes': like_total,
    }
四、异步处理实现
1. Celery配置和任务
- #
- from celery import Celery
- import os
# Must run before the Celery app reads Django settings; setdefault leaves an
# already exported DJANGO_SETTINGS_MODULE untouched.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')

# Configuration comes from Django settings, using the CELERY namespace.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Let Celery look up task modules on its own.
app.autodiscover_tasks()
- #
- from celery import shared_task
- from django.core.mail import send_mail
- from .models import Article
@shared_task
def send_article_notification(article_id):
    """Email every subscriber of the article's category about the new article.

    Args:
        article_id: Primary key of the article to announce.

    Raises:
        Article.DoesNotExist: If no article has this id.
    """
    article = Article.objects.get(id=article_id)
    subject = f'新文章: {article.title}'
    message = f'查看最新文章:{article.get_absolute_url()}'

    for subscriber in article.category.subscribers.all():
        # NOTE(review): assumes subscribers are user-like objects with an
        # ``email`` attribute - confirm against the subscribers relation.
        if not subscriber.email:
            continue
        send_mail(
            subject,
            message,
            None,  # None makes Django use settings.DEFAULT_FROM_EMAIL
            [subscriber.email],
            fail_silently=False,
        )
@shared_task
def update_article_stats():
    """Periodically refresh the statistics of every article."""
    for article in Article.objects.all():
        # The call was missing its argument and closing parenthesis.
        stats = get_article_stats(article.id)
        # NOTE(review): this only sets an attribute on the in-memory instance;
        # nothing visible here persists it. Add a model field and a save()
        # call if the stats are meant to be stored on the article.
        article.stats_cache = stats
2. 异步视图实现
- #
- from django.http import JsonResponse
- from asgiref.sync import sync_to_async
- from channels.layers import get_channel_layer
- from asgiref.sync import async_to_sync
async def async_article_detail(request, pk):
    """Return the title, content and author of one article as JSON.

    Raises:
        Article.DoesNotExist: If no article has primary key ``pk``.
    """
    # select_related loads the author in the same query. Reading
    # article.author afterwards would otherwise run a synchronous query from
    # async code, which Django rejects.
    fetch_article = sync_to_async(
        Article.objects.select_related('author').get
    )
    article = await fetch_article(id=pk)
    context = {
        'title': article.title,
        'content': article.content,
        # The value was missing here; username matches the field used for
        # comment authors elsewhere in this file.
        'author': article.author.username,
    }
    return JsonResponse(context)
class CommentCreateView(View):
    """Create a comment and notify the article's WebSocket group."""

    def post(self, request, article_id):
        """Store the posted comment, broadcast it, and report success as JSON.

        Responds with HTTP 400 when the ``content`` form field is missing or
        blank, instead of failing with a server error.
        """
        content = request.POST.get('content', '')
        if not content.strip():
            return JsonResponse(
                {'status': 'error', 'message': 'content is required'},
                status=400,
            )

        comment = Comment.objects.create(
            article_id=article_id,
            user=request.user,
            content=content,
        )

        # group_send is a coroutine; async_to_sync lets this synchronous view
        # call it to push the WebSocket notification.
        channel_layer = get_channel_layer()
        async_to_sync(channel_layer.group_send)(
            f"article_{article_id}",
            {
                "type": "comment.notification",
                "message": {
                    # The value was missing here.
                    "comment_id": comment.id,
                    "user": comment.user.username,
                    "content": comment.content,
                },
            },
        )
        return JsonResponse({'status': 'success'})
3. 异步中间件
- #
- from django.core.cache import cache
- from asgiref.sync import sync_to_async
class AsyncCacheMiddleware:
    """Async middleware that caches successful GET responses for 5 minutes.

    NOTE(review): the cache key is the request path plus query string only,
    so responses are shared between users. Do not enable this for pages with
    per-user content.
    """

    # Declare the middleware async-only so Django always passes an awaitable
    # get_response and treats __call__ as a coroutine.
    async_capable = True
    sync_capable = False

    def __init__(self, get_response):
        # Imported here so the block does not depend on extra file-level imports.
        from asgiref.sync import iscoroutinefunction, markcoroutinefunction

        self.get_response = get_response
        if iscoroutinefunction(self.get_response):
            markcoroutinefunction(self)

    async def __call__(self, request):
        # Only GET is safe to answer from cache; everything else passes through.
        if request.method != 'GET':
            return await self.get_response(request)

        # Include the query string so ?page=1 and ?page=2 are cached separately.
        cache_key = f"page_cache:{request.get_full_path()}"
        response = await sync_to_async(cache.get)(cache_key)
        if response is None:
            response = await self.get_response(request)
            # Do not cache errors, redirects or streaming responses.
            cacheable = (
                response.status_code == 200
                and not getattr(response, 'streaming', False)
            )
            if cacheable:
                await sync_to_async(cache.set)(
                    cache_key,
                    response,
                    timeout=300,
                )
        return response
五、性能监控
1. 查询日志记录
- #
- import time
- import logging
- from django.db import connection
logger = logging.getLogger(__name__)


class QueryCountMiddleware:
    """Log a warning for requests that run more than 10 database queries.

    NOTE(review): Django only fills ``connection.queries`` when ``DEBUG`` is
    true, so this presumably reports nothing in production - confirm.
    """

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        # perf_counter is monotonic, so the duration cannot go negative or
        # jump when the system clock is adjusted.
        start_time = time.perf_counter()
        initial_queries = len(connection.queries)

        response = self.get_response(request)

        total_time = time.perf_counter() - start_time
        total_queries = len(connection.queries) - initial_queries

        if total_queries > 10:  # query-count threshold
            # Lazy %-formatting: the message is only built if it is emitted.
            logger.warning(
                'Path: %s - Queries: %s - Time: %.2fs',
                request.path,
                total_queries,
                total_time,
            )
        return response
六、性能优化流程图
- 数据库优化:
  - 合理使用索引
  - 避免N+1查询问题
  - 使用批量操作替换循环操作
  - 定期分析和优化慢查询
- 缓存策略:
  - 合理设置缓存时间
  - 分层缓存架构
  - 及时更新缓存
  - 避免缓存雪崩
- 异步处理:
  - 合理使用任务队列
  - 异步处理耗时操作
  - 适当使用异步视图
  - 监控任务执行状态
