Fastapi之文件上传

打印 上一主题 下一主题

主题 506|帖子 506|积分 1518

1.小文件上传

1.单文件上传
  1. import uvicorn
  2. from fastapi import FastAPI
  3. from fastapi import File
  4. app = FastAPI()
  5. @app.post("/file")
  6. async def file_upload(file: bytes = File(..., max_length=2097152)):
  7.     """使用File类,文件内容会以bytes的形式读入内存,适合小上传文件"""
  8.     with open("D:\\lufei.jpg", "wb") as f:
  9.         f.write(file)
  10.     return {"file_size": len(file)}
  11. if __name__ == '__main__':
  12.     uvicorn.run('main:app', host='0.0.0.0', port=8000, reload=True,
  13.                 debug=True, workers=1)
复制代码
2.多文件上传
  1. import uvicorn
  2. from fastapi import FastAPI
  3. from fastapi import File
  4. app = FastAPI()
  5. @app.post("/file")
  6. async def file_upload(file: List[bytes] = File(..., max_length=2097152)):
  7.     """使用File类,文件内容会以bytes的形式读入内存,适合小上传文件"""
  8.     return {"file_size": len(file)}
  9. if __name__ == '__main__':
  10.     uvicorn.run('main:app', host='0.0.0.0', port=8000, reload=True,
  11.                 debug=True, workers=1)
复制代码
2.大文件上传

1.单文件上传
  1. import uvicorn
  2. from fastapi import FastAPI
  3. from fastapi import UploadFile
  4. app = FastAPI()
  5. @users.post("/upload_file")
  6. async def file_upload(file: UploadFile = File(...)):
  7.     """
  8.     使用UploadFile类的优势:
  9.     1.文件开始存储在内存中,使用内存达到阈值后,将被保存在磁盘中
  10.     2.适合于图片、视频大文件
  11.     3.可以获取上传的文件的元数据,如文件名,创建时间等
  12.     4.有文件对象的异步接口
  13.     5.上传的文件是Python文件对象,可以使用write()、read()、seek()、close()等操做
  14.     :param file:
  15.     :return:
  16.     """
  17.     with open(f"D:\\{file.filename}", 'wb') as f:
  18.         for i in iter(lambda: file.file.read(1024 * 1024 * 10), b''):
  19.             f.write(i)
  20.     f.close()
  21.     return {"file_name": file.filename}
  22. if __name__ == '__main__':
  23.     uvicorn.run('main:app', host='0.0.0.0', port=8000, reload=True,
  24.                 debug=True, workers=1)
复制代码
2.多文件上传
  1. import uvicorn
  2. from fastapi import FastAPI
  3. from fastapi import UploadFile
  4. app = FastAPI()
  5. @users.post("/upload_file")
  6. async def file_upload(files: List[UploadFile] = File(...)):
  7.     for file in files:
  8.         with open(f"D:\\{file.filename}", 'wb') as f:
  9.             for i in iter(lambda: file.file.read(1024 * 1024 * 10), b''):
  10.                 f.write(i)
  11.         f.close()
  12.     return {"file_name": [file.filename for file in files]}
  13. if __name__ == '__main__':
  14.     uvicorn.run('main:app', host='0.0.0.0', port=8000, reload=True,
  15.                 debug=True, workers=1)
复制代码
3.大文件切片上传和断点续传

1.逻辑思路


2.代码示例
  1. import os
  2. import shutil
  3. from pathlib import Path
  4. from fastapi import FastAPI
  5. import uvicorn
  6. import aiofiles
  7. from fastapi import Body, File, UploadFile
  8. app = FastAPI()
  9. base_dir = os.path.dirname(os.path.abspath(__file__))
  10. upload_file_path = Path(base_dir, './uploads')
  11. @app.post("/upload")
  12. async def upload_file(
  13.         identifier: str = Body(..., description="文件唯一标识符"),
  14.         number: str = Body(..., description="文件分片序号(初值为0)"),
  15.         file: UploadFile = File(..., description="文件")
  16. ):
  17.     """文件分片上传"""
  18.     path = Path(upload_file_path, identifier)
  19.     if not os.path.exists(path):
  20.         os.makedirs(path)
  21.     file_name = Path(path, f'{identifier}_{number}')
  22.     if not os.path.exists(file_name):
  23.         async with aiofiles.open(file_name, 'wb') as f:
  24.             await f.write(await file.read())
  25.     return {
  26.         'code': 1,
  27.         'chunk': f'{identifier}_{number}'
  28.     }
  29. @app.put("/merge")
  30. async def merge_file(
  31.         name: str = Body(..., description="文件名称(不含后缀)"),
  32.         file_type: str = Body(..., description="文件类型/后缀"),
  33.         identifier: str = Body(..., description="文件唯一标识符")
  34. ):
  35.     """合并分片文件"""
  36.     target_file_name = Path(upload_file_path, f'{name}.{file_type}')
  37.     path = Path(upload_file_path, identifier)
  38.     try:
  39.         async with aiofiles.open(target_file_name, 'wb+') as target_file:  # 打开目标文件
  40.             for i in range(len(os.listdir(path))):
  41.                 temp_file_name = Path(path, f'{identifier}_{i}')
  42.                 async with aiofiles.open(temp_file_name, 'rb') as temp_file:  # 按序打开每个分片
  43.                     data = await temp_file.read()
  44.                     await target_file.write(data)  # 分片内容写入目标文件
  45.     except Exception as e:
  46.         return {
  47.             'code': 0,
  48.             'error': f'合并失败:{e}'
  49.         }
  50.     shutil.rmtree(path)  # 删除临时目录
  51.     return {
  52.         'code': 1,
  53.         'name': f'{name}.{file_type}'
  54.     }
  55. if __name__ == '__main__':
  56.     uvicorn.run('main:app', host='0.0.0.0', port=8000, reload=True,
  57.                 debug=True, workers=1)
复制代码
4.大文件切片下载,断点续下

1.代码示例
  1. import os
  2. import re
  3. import stat
  4. from email.utils import formatdate
  5. from mimetypes import guess_type
  6. from pathlib import Path
  7. from urllib.parse import quote
  8. import uvicorn
  9. from fastapi import FastAPI, Header
  10. from fastapi import Path as F_Path
  11. from starlette.responses import StreamingResponse
  12. app = FastAPI()
  13. base_dir = os.path.dirname(os.path.abspath(__file__))
  14. upload_file_path = Path(base_dir, './uploads')
  15. @app.get("/downloads/{file_name}")
  16. async def download_file(
  17.         scope: str = Header(default="bytes=0", regex=r"bytes=\d+"),
  18.         file_name: str = F_Path(..., description="文件名称(含后缀)")
  19. ):
  20.     """分片下载文件,支持断点续传"""
  21.     # 检查文件是否存在
  22.     file_path = Path(upload_file_path, file_name)
  23.     if not os.path.exists(file_path):
  24.         return {
  25.             'code': 0,
  26.             'error': '文件不存在'
  27.         }
  28.     # 获取文件的信息
  29.     stat_result = os.stat(file_path)
  30.     content_type, encoding = guess_type(file_path)
  31.     content_type = content_type or 'application/octet-stream'
  32.     # 读取文件的起始位置和终止位置
  33.     range_match = re.search(r'bytes=(\d+)-(\d+)', scope, re.S) or re.search(r'bytes=(\d+)-', scope, re.S)
  34.     if range_match:
  35.         start_bytes = int(range_match.group(1))
  36.         end_bytes = int(range_match.group(2)) if range_match.lastindex == 2 else stat_result.st_size - 1
  37.     else:
  38.         start_bytes = 0
  39.         end_bytes = stat_result.st_size - 1
  40.     # 这里 content_length 表示剩余待传输的文件字节长度       判断是否为一般文件
  41.     content_length = stat_result.st_size - start_bytes if stat.S_ISREG(stat_result.st_mode) else stat_result.st_size
  42.     # 构建文件名称
  43.     name, *suffix = file_name.rsplit('.', 1)
  44.     suffix = f'.{suffix[0]}' if suffix else ''
  45.     filename = quote(f'{name}{suffix}')  # 文件名编码,防止中文名报错
  46.     # 打开文件从起始位置开始分片读取文件
  47.     return StreamingResponse(
  48.         file_iterator(file_path, start_bytes, 1024 * 1024 * 1),  # 每次读取 1M
  49.         media_type=content_type,
  50.         headers={
  51.             'content-disposition': f'attachment; filename="{filename}"',
  52.             'accept-ranges': 'bytes',
  53.             'connection': 'keep-alive',
  54.             'content-length': str(content_length),
  55.             'content-range': f'bytes {start_bytes}-{end_bytes}/{stat_result.st_size}',
  56.             'last-modified': formatdate(stat_result.st_mtime, usegmt=True),
  57.         },
  58.         status_code=206 if start_bytes > 0 else 200
  59.     )
  60. def file_iterator(file_path, offset, chunk_size):
  61.     """
  62.     文件生成器
  63.     :param file_path: 文件绝对路径
  64.     :param offset: 文件读取的起始位置
  65.     :param chunk_size: 文件读取的块大小
  66.     :return: yield
  67.     """
  68.     with open(file_path, 'rb') as f:
  69.         f.seek(offset, os.SEEK_SET)
  70.         while True:
  71.             data = f.read(chunk_size)
  72.             if data:
  73.                 yield data
  74.             else:
  75.                 break
  76. if __name__ == '__main__':
  77.     uvicorn.run('main:app', host='0.0.0.0', port=8000, reload=True,
  78.                 debug=True, workers=1)
复制代码
2.相关模块解读


  • os.stat
  1. import os
  2. stat_result = os.stat("D:\lufei.jpg")
  3. """
  4. stat_result(
  5.   st_mode=33206,   # 保护模式
  6.   st_ino=844424930132018,  # 索引号
  7.   st_dev=2222159220,  # 设备名
  8.   st_nlink=1,  # 硬链接号(被连接数目)
  9.   st_uid=0,  # 用户 ID
  10.   st_gid=0,  # 组 ID
  11.   st_size=20152,  # 文件大小,以字节为单位
  12.   st_atime=1656044665,  # 最后一次访问时间
  13.   st_mtime=1656038259,  # 文件最后一次修改时间
  14.   st_ctime=1656038259  # 最后一次状态变化的时间,操作系统不会,该属性对应的结果也不同,例如在 Windows 操作系统下返回的就是文件的创建时间。
  15. )
  16. """
复制代码

  • stat
  1. import stat
  2. if stat.S_ISREG(st_mode):           #判断是否一般文件
  3.    print("xxxxx")
  4. elif stat.S_ISLNK(st_mode):         #判断是否链接文件
  5.    print("xxxxx")
  6. elif stat.S_ISSOCK(st_mode):        #判断是否套接字文件   
  7.    print("xxxxx")
  8. elif stat.S_ISFIFO(st_mode):        #判断是否命名管道
  9.    print("xxxxx")
  10. elif stat.S_ISBLK(st_mode):         #判断是否块设备
  11.    print("xxxxx")
  12. elif stat.S_ISCHR(st_mode):         #判断是否字符设置
  13.   print("xxxxx")
  14. elif stat.S_ISDIR(st_mode):         #判断是否目录
  15.   print("xxxxx")
  16. # 查看文件权限
  17. perm_code = stat.S_IMODE(st_mode)
  18. perm = oct(perm_code)
  19. # 获取文件类型
  20. stat.S_IFMT(st_mode)
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

万万哇

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表