每天40分玩转Django:Django性能优化

打印 上一主题 下一主题

主题 866|帖子 866|积分 2598

Django性能优化

一、性能优化要点总览表

优化类别具体措施预期结果数据库优化索引优化、查询优化、N+1问题提拔查询速率、淘汰数据库负载缓存优化Redis缓存、页面缓存、查询缓存淘汰数据库访问、提拔响应速率异步处理Celery任务队列、异步视图、异步ORM提拔并发性能、优化用户体验 二、数据库优化实现

1. 模子设计和索引优化

  1. # models.py
  2. from django.db import models
  3. from django.db.models import Index
  4. class Category(models.Model):
  5.     name = models.CharField(max_length=100)
  6.     slug = models.SlugField(unique=True)
  7.    
  8.     class Meta:
  9.         indexes = [
  10.             models.Index(fields=['name']),
  11.             models.Index(fields=['slug'])
  12.         ]
  13. class Article(models.Model):
  14.     title = models.CharField(max_length=200)
  15.     content = models.TextField()
  16.     category = models.ForeignKey(Category, on_delete=models.CASCADE)
  17.     author = models.ForeignKey('auth.User', on_delete=models.CASCADE)
  18.     created_at = models.DateTimeField(auto_now_add=True)
  19.     updated_at = models.DateTimeField(auto_now=True)
  20.     views = models.IntegerField(default=0)
  21.    
  22.     class Meta:
  23.         indexes = [
  24.             models.Index(fields=['created_at', '-views']),
  25.             models.Index(fields=['category', 'author']),
  26.         ]
复制代码
2. 查询优化

  1. # views.py
  2. from django.db.models import Prefetch, Count, Q
  3. from django.views.generic import ListView
  4. class ArticleListView(ListView):
  5.     model = Article
  6.     template_name = 'articles/article_list.html'
  7.     context_object_name = 'articles'
  8.    
  9.     def get_queryset(self):
  10.         # 使用select_related减少外键查询
  11.         queryset = Article.objects.select_related(
  12.             'category', 'author'
  13.         ).prefetch_related(
  14.             'tags',
  15.             Prefetch(
  16.                 'comments',
  17.                 queryset=Comment.objects.select_related('user')
  18.             )
  19.         )
  20.         
  21.         # 添加过滤条件
  22.         category = self.request.GET.get('category')
  23.         if category:
  24.             queryset = queryset.filter(category__slug=category)
  25.             
  26.         # 添加聚合查询
  27.         queryset = queryset.annotate(
  28.             comment_count=Count('comments'),
  29.             like_count=Count('likes')
  30.         )
  31.         
  32.         return queryset.order_by('-created_at')
复制代码
3. 批量操纵优化

  1. # utils.py
  2. from django.db import transaction
  3. def bulk_create_articles(articles_data):
  4.     """批量创建文章"""
  5.     with transaction.atomic():
  6.         articles = [
  7.             Article(
  8.                 title=data['title'],
  9.                 content=data['content'],
  10.                 category_id=data['category_id'],
  11.                 author_id=data['author_id']
  12.             )
  13.             for data in articles_data
  14.         ]
  15.         return Article.objects.bulk_create(articles)
  16. def bulk_update_views(article_ids):
  17.     """批量更新文章浏览量"""
  18.     with transaction.atomic():
  19.         Article.objects.filter(id__in=article_ids).update(
  20.             views=models.F('views') + 1
  21.         )
复制代码
三、缓存优化实现

1. Redis缓存配置

  1. # settings.py
  2. CACHES = {
  3.     'default': {
  4.         'BACKEND': 'django_redis.cache.RedisCache',
  5.         'LOCATION': 'redis://127.0.0.1:6379/1',
  6.         'OPTIONS': {
  7.             'CLIENT_CLASS': 'django_redis.client.DefaultClient',
  8.         }
  9.     }
  10. }
  11. # 使用Redis作为Session后端
  12. SESSION_ENGINE = 'django.contrib.sessions.backends.cache'
  13. SESSION_CACHE_ALIAS = 'default'
复制代码
2. 视图缓存实现

  1. # views.py
  2. from django.views.decorators.cache import cache_page
  3. from django.utils.decorators import method_decorator
  4. from django.core.cache import cache
  5. from django.conf import settings
  6. @method_decorator(cache_page(60 * 15), name='dispatch')
  7. class CategoryListView(ListView):
  8.     model = Category
  9.     template_name = 'categories/category_list.html'
  10. class ArticleDetailView(DetailView):
  11.     model = Article
  12.    
  13.     def get_object(self):
  14.         article_id = self.kwargs['pk']
  15.         cache_key = f'article_{article_id}'
  16.         
  17.         # 尝试从缓存获取
  18.         article = cache.get(cache_key)
  19.         if article is None:
  20.             # 缓存未命中,从数据库获取
  21.             article = super().get_object()
  22.             # 存入缓存
  23.             cache.set(cache_key, article, timeout=60*30)
  24.         
  25.         return article
  26. def get_popular_articles():
  27.     cache_key = 'popular_articles'
  28.     articles = cache.get(cache_key)
  29.    
  30.     if articles is None:
  31.         articles = Article.objects.annotate(
  32.             total_score=Count('likes') + Count('comments')
  33.         ).order_by('-total_score')[:10]
  34.         cache.set(cache_key, articles, timeout=60*60)
  35.    
  36.     return articles
复制代码
3. 缓存装饰器

  1. # decorators.py
  2. from functools import wraps
  3. from django.core.cache import cache
  4. def cache_result(timeout=300):
  5.     def decorator(func):
  6.         @wraps(func)
  7.         def wrapper(*args, **kwargs):
  8.             # 生成缓存键
  9.             cache_key = f"{func.__name__}:{args}:{kwargs}"
  10.             result = cache.get(cache_key)
  11.             
  12.             if result is None:
  13.                 result = func(*args, **kwargs)
  14.                 cache.set(cache_key, result, timeout=timeout)
  15.             
  16.             return result
  17.         return wrapper
  18.     return decorator
  19. # 使用示例
  20. @cache_result(timeout=60*5)
  21. def get_article_stats(article_id):
  22.     return {
  23.         'views': Article.objects.get(id=article_id).views,
  24.         'comments': Comment.objects.filter(article_id=article_id).count(),
  25.         'likes': Like.objects.filter(article_id=article_id).count()
  26.     }
复制代码
四、异步处理实现

1. Celery配置和任务

  1. # celery.py
  2. from celery import Celery
  3. import os
  4. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
  5. app = Celery('myproject')
  6. app.config_from_object('django.conf:settings', namespace='CELERY')
  7. app.autodiscover_tasks()
  8. # tasks.py
  9. from celery import shared_task
  10. from django.core.mail import send_mail
  11. from .models import Article
  12. @shared_task
  13. def send_article_notification(article_id):
  14.     article = Article.objects.get(id=article_id)
  15.     subscribers = article.category.subscribers.all()
  16.    
  17.     for subscriber in subscribers:
  18.         send_mail(
  19.             f'新文章: {article.title}',
  20.             f'查看最新文章:{article.get_absolute_url()}',
  21.             'noreply@example.com',
  22.             [subscriber.email],
  23.             fail_silently=False,
  24.         )
  25. @shared_task
  26. def update_article_stats():
  27.     """定期更新文章统计信息"""
  28.     for article in Article.objects.all():
  29.         stats = get_article_stats(article.id)
  30.         article.stats_cache = stats
  31.         article.save()
复制代码
2. 异步视图实现

  1. # views.py
  2. from django.http import JsonResponse
  3. from asgiref.sync import sync_to_async
  4. from channels.layers import get_channel_layer
  5. from asgiref.sync import async_to_sync
  6. async def async_article_detail(request, pk):
  7.     article = await sync_to_async(Article.objects.get)(id=pk)
  8.     context = {
  9.         'title': article.title,
  10.         'content': article.content,
  11.         'author': article.author.username
  12.     }
  13.     return JsonResponse(context)
  14. class CommentCreateView(View):
  15.     def post(self, request, article_id):
  16.         comment = Comment.objects.create(
  17.             article_id=article_id,
  18.             user=request.user,
  19.             content=request.POST['content']
  20.         )
  21.         
  22.         # 异步发送WebSocket通知
  23.         channel_layer = get_channel_layer()
  24.         async_to_sync(channel_layer.group_send)(
  25.             f"article_{article_id}",
  26.             {
  27.                 "type": "comment.notification",
  28.                 "message": {
  29.                     "comment_id": comment.id,
  30.                     "user": comment.user.username,
  31.                     "content": comment.content
  32.                 }
  33.             }
  34.         )
  35.         
  36.         return JsonResponse({'status': 'success'})
复制代码
3. 异步中央件

  1. # middleware.py
  2. from django.core.cache import cache
  3. from asgiref.sync import sync_to_async
  4. class AsyncCacheMiddleware:
  5.     def __init__(self, get_response):
  6.         self.get_response = get_response
  7.    
  8.     async def __call__(self, request):
  9.         cache_key = f"page_cache:{request.path}"
  10.         response = await sync_to_async(cache.get)(cache_key)
  11.         
  12.         if response is None:
  13.             response = await self.get_response(request)
  14.             await sync_to_async(cache.set)(
  15.                 cache_key,
  16.                 response,
  17.                 timeout=300
  18.             )
  19.         
  20.         return response
复制代码
五、性能监控

1. 查询日志记录

  1. # middleware.py
  2. import time
  3. import logging
  4. from django.db import connection
  5. logger = logging.getLogger(__name__)
  6. class QueryCountMiddleware:
  7.     def __init__(self, get_response):
  8.         self.get_response = get_response
  9.    
  10.     def __call__(self, request):
  11.         start_time = time.time()
  12.         initial_queries = len(connection.queries)
  13.         
  14.         response = self.get_response(request)
  15.         
  16.         total_time = time.time() - start_time
  17.         total_queries = len(connection.queries) - initial_queries
  18.         
  19.         if total_queries > 10:  # 设置查询数量阈值
  20.             logger.warning(
  21.                 f'Path: {request.path} - '
  22.                 f'Queries: {total_queries} - '
  23.                 f'Time: {total_time:.2f}s'
  24.             )
  25.         
  26.         return response
复制代码
六、性能优化流程图


最佳实践建议:

  • 数据库优化:

    • 公道使用索引
    • 避免N+1查询问题
    • 使用批量操纵替换循环操纵
    • 定期分析和优化慢查询

  • 缓存策略:

    • 公道设置缓存时间
    • 分层缓存架构
    • 实时更新缓存
    • 避免缓存雪崩

  • 异步处理:

    • 公道使用任务队列
    • 异步处理耗时操纵
    • 恰当使用异步视图
    • 监控任务实验状态

这就是关于Django性能优化的详细内容。通过实践这些优化策略,你可以明显提拔Django应用的性能。如果有任何问题,接待随时提出!

怎么样今天的内容还满意吗?再次感谢朋友们的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财政自由,还请给个赞,谢谢!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

农妇山泉一亩田

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表