万万哇 发表于 2022-8-9 14:41:46

Fastapi之文件上传

1.小文件上传

1.单文件上传

import uvicorn
from fastapi import FastAPI
from fastapi import File


app = FastAPI()


@app.post("/file")
async def file_upload(file: bytes = File(..., max_length=2097152)):
    """使用File类,文件内容会以bytes的形式读入内存,适合小上传文件"""
    with open("D:\\lufei.jpg", "wb") as f:
      f.write(file)
    return {"file_size": len(file)}


if __name__ == '__main__':
    uvicorn.run('main:app', host='0.0.0.0', port=8000, reload=True,
                debug=True, workers=1)2.多文件上传

import uvicorn
from fastapi import FastAPI
from fastapi import File


app = FastAPI()


@app.post("/file")
async def file_upload(file: List = File(..., max_length=2097152)):
    """使用File类,文件内容会以bytes的形式读入内存,适合小上传文件"""
    return {"file_size": len(file)}


if __name__ == '__main__':
    uvicorn.run('main:app', host='0.0.0.0', port=8000, reload=True,
                debug=True, workers=1)2.大文件上传

1.单文件上传

import uvicorn
from fastapi import FastAPI
from fastapi import UploadFile


app = FastAPI()


@users.post("/upload_file")
async def file_upload(file: UploadFile = File(...)):
    """
    使用UploadFile类的优势:
    1.文件开始存储在内存中,使用内存达到阈值后,将被保存在磁盘中
    2.适合于图片、视频大文件
    3.可以获取上传的文件的元数据,如文件名,创建时间等
    4.有文件对象的异步接口
    5.上传的文件是Python文件对象,可以使用write()、read()、seek()、close()等操做
    :param file:
    :return:
    """

    with open(f"D:\\{file.filename}", 'wb') as f:
      for i in iter(lambda: file.file.read(1024 * 1024 * 10), b''):
            f.write(i)
    f.close()
    return {"file_name": file.filename}

if __name__ == '__main__':
    uvicorn.run('main:app', host='0.0.0.0', port=8000, reload=True,
                debug=True, workers=1)2.多文件上传

import uvicorn
from fastapi import FastAPI
from fastapi import UploadFile


app = FastAPI()


@users.post("/upload_file")
async def file_upload(files: List = File(...)):
    for file in files:
      with open(f"D:\\{file.filename}", 'wb') as f:
            for i in iter(lambda: file.file.read(1024 * 1024 * 10), b''):
                f.write(i)
      f.close()
    return {"file_name": }


if __name__ == '__main__':
    uvicorn.run('main:app', host='0.0.0.0', port=8000, reload=True,
                debug=True, workers=1)3.大文件切片上传和断点续传

1.逻辑思路

https://img2022.cnblogs.com/blog/2057523/202206/2057523-20220624120125623-271308785.png
2.代码示例

import os
import shutil
from pathlib import Path

from fastapi import FastAPI
import uvicorn
import aiofiles
from fastapi import Body, File, UploadFile

app = FastAPI()

base_dir = os.path.dirname(os.path.abspath(__file__))
upload_file_path = Path(base_dir, './uploads')


@app.post("/upload")
async def upload_file(
      identifier: str = Body(..., description="文件唯一标识符"),
      number: str = Body(..., description="文件分片序号(初值为0)"),
      file: UploadFile = File(..., description="文件")
):
    """文件分片上传"""
    path = Path(upload_file_path, identifier)
    if not os.path.exists(path):
      os.makedirs(path)
    file_name = Path(path, f'{identifier}_{number}')
    if not os.path.exists(file_name):
      async with aiofiles.open(file_name, 'wb') as f:
            await f.write(await file.read())
    return {
      'code': 1,
      'chunk': f'{identifier}_{number}'
    }


@app.put("/merge")
async def merge_file(
      name: str = Body(..., description="文件名称(不含后缀)"),
      file_type: str = Body(..., description="文件类型/后缀"),
      identifier: str = Body(..., description="文件唯一标识符")
):
    """合并分片文件"""
    target_file_name = Path(upload_file_path, f'{name}.{file_type}')
    path = Path(upload_file_path, identifier)
    try:
      async with aiofiles.open(target_file_name, 'wb+') as target_file:# 打开目标文件
            for i in range(len(os.listdir(path))):
                temp_file_name = Path(path, f'{identifier}_{i}')
                async with aiofiles.open(temp_file_name, 'rb') as temp_file:# 按序打开每个分片
                  data = await temp_file.read()
                  await target_file.write(data)# 分片内容写入目标文件
    except Exception as e:
      return {
            'code': 0,
            'error': f'合并失败:{e}'
      }
    shutil.rmtree(path)# 删除临时目录
    return {
      'code': 1,
      'name': f'{name}.{file_type}'
    }


if __name__ == '__main__':
    uvicorn.run('main:app', host='0.0.0.0', port=8000, reload=True,
                debug=True, workers=1)4.大文件切片下载,断点续下

1.代码示例

import os
import re
import stat
from email.utils import formatdate
from mimetypes import guess_type
from pathlib import Path
from urllib.parse import quote

import uvicorn
from fastapi import FastAPI, Header
from fastapi import Path as F_Path
from starlette.responses import StreamingResponse


app = FastAPI()

base_dir = os.path.dirname(os.path.abspath(__file__))
upload_file_path = Path(base_dir, './uploads')


@app.get("/downloads/{file_name}")
async def download_file(
      scope: str = Header(default="bytes=0", regex=r"bytes=\d+"),
      file_name: str = F_Path(..., description="文件名称(含后缀)")
):
    """分片下载文件,支持断点续传"""
    # 检查文件是否存在
    file_path = Path(upload_file_path, file_name)
    if not os.path.exists(file_path):
      return {
            'code': 0,
            'error': '文件不存在'
      }
    # 获取文件的信息
    stat_result = os.stat(file_path)
    content_type, encoding = guess_type(file_path)
    content_type = content_type or 'application/octet-stream'
    # 读取文件的起始位置和终止位置
    range_match = re.search(r'bytes=(\d+)-(\d+)', scope, re.S) or re.search(r'bytes=(\d+)-', scope, re.S)
    if range_match:
      start_bytes = int(range_match.group(1))
      end_bytes = int(range_match.group(2)) if range_match.lastindex == 2 else stat_result.st_size - 1
    else:
      start_bytes = 0
      end_bytes = stat_result.st_size - 1
    # 这里 content_length 表示剩余待传输的文件字节长度       判断是否为一般文件
    content_length = stat_result.st_size - start_bytes if stat.S_ISREG(stat_result.st_mode) else stat_result.st_size
    # 构建文件名称
    name, *suffix = file_name.rsplit('.', 1)
    suffix = f'.{suffix}' if suffix else ''
    filename = quote(f'{name}{suffix}')# 文件名编码,防止中文名报错
    # 打开文件从起始位置开始分片读取文件
    return StreamingResponse(
      file_iterator(file_path, start_bytes, 1024 * 1024 * 1),# 每次读取 1M
      media_type=content_type,
      headers={
            'content-disposition': f'attachment; filename="{filename}"',
            'accept-ranges': 'bytes',
            'connection': 'keep-alive',
            'content-length': str(content_length),
            'content-range': f'bytes {start_bytes}-{end_bytes}/{stat_result.st_size}',
            'last-modified': formatdate(stat_result.st_mtime, usegmt=True),
      },
      status_code=206 if start_bytes > 0 else 200
    )


def file_iterator(file_path, offset, chunk_size):
    """
    文件生成器
    :param file_path: 文件绝对路径
    :param offset: 文件读取的起始位置
    :param chunk_size: 文件读取的块大小
    :return: yield
    """
    with open(file_path, 'rb') as f:
      f.seek(offset, os.SEEK_SET)
      while True:
            data = f.read(chunk_size)
            if data:
                yield data
            else:
                break


if __name__ == '__main__':
    uvicorn.run('main:app', host='0.0.0.0', port=8000, reload=True,
                debug=True, workers=1)2.相关模块解读


[*]os.stat
import os


stat_result = os.stat("D:\lufei.jpg")

"""
stat_result(
st_mode=33206,   # 保护模式
st_ino=844424930132018,# 索引号
st_dev=2222159220,# 设备名
st_nlink=1,# 硬链接号(被连接数目)
st_uid=0,# 用户 ID
st_gid=0,# 组 ID
st_size=20152,# 文件大小,以字节为单位
st_atime=1656044665,# 最后一次访问时间
st_mtime=1656038259,# 文件最后一次修改时间
st_ctime=1656038259# 最后一次状态变化的时间,操作系统不会,该属性对应的结果也不同,例如在 Windows 操作系统下返回的就是文件的创建时间。
)
"""

[*]stat
import stat

if stat.S_ISREG(st_mode):         #判断是否一般文件
   print("xxxxx")
elif stat.S_ISLNK(st_mode):         #判断是否链接文件
   print("xxxxx")
elif stat.S_ISSOCK(st_mode):      #判断是否套接字文件   
   print("xxxxx")
elif stat.S_ISFIFO(st_mode):      #判断是否命名管道
   print("xxxxx")
elif stat.S_ISBLK(st_mode):         #判断是否块设备
   print("xxxxx")
elif stat.S_ISCHR(st_mode):         #判断是否字符设置
  print("xxxxx")
elif stat.S_ISDIR(st_mode):         #判断是否目录
  print("xxxxx")

# 查看文件权限
perm_code = stat.S_IMODE(st_mode)
perm = oct(perm_code)

# 获取文件类型
stat.S_IFMT(st_mode)
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: Fastapi之文件上传