1.小文件上传
1.单文件上传
- import uvicorn
- from fastapi import FastAPI
- from fastapi import File
- app = FastAPI()
- @app.post("/file")
- async def file_upload(file: bytes = File(..., max_length=2097152)):
- """使用File类,文件内容会以bytes的形式读入内存,适合小上传文件"""
- with open("D:\\lufei.jpg", "wb") as f:
- f.write(file)
- return {"file_size": len(file)}
- if __name__ == '__main__':
- uvicorn.run('main:app', host='0.0.0.0', port=8000, reload=True,
- debug=True, workers=1)
复制代码 2.多文件上传
- import uvicorn
- from fastapi import FastAPI
- from fastapi import File
- app = FastAPI()
- @app.post("/file")
- async def file_upload(file: List[bytes] = File(..., max_length=2097152)):
- """使用File类,文件内容会以bytes的形式读入内存,适合小上传文件"""
- return {"file_size": len(file)}
- if __name__ == '__main__':
- uvicorn.run('main:app', host='0.0.0.0', port=8000, reload=True,
- debug=True, workers=1)
复制代码 2.大文件上传
1.单文件上传
- import uvicorn
- from fastapi import FastAPI
- from fastapi import UploadFile
- app = FastAPI()
- @users.post("/upload_file")
- async def file_upload(file: UploadFile = File(...)):
- """
- 使用UploadFile类的优势:
- 1.文件开始存储在内存中,使用内存达到阈值后,将被保存在磁盘中
- 2.适合于图片、视频大文件
- 3.可以获取上传的文件的元数据,如文件名,创建时间等
- 4.有文件对象的异步接口
- 5.上传的文件是Python文件对象,可以使用write()、read()、seek()、close()等操做
- :param file:
- :return:
- """
- with open(f"D:\\{file.filename}", 'wb') as f:
- for i in iter(lambda: file.file.read(1024 * 1024 * 10), b''):
- f.write(i)
- f.close()
- return {"file_name": file.filename}
- if __name__ == '__main__':
- uvicorn.run('main:app', host='0.0.0.0', port=8000, reload=True,
- debug=True, workers=1)
复制代码 2.多文件上传
- import uvicorn
- from fastapi import FastAPI
- from fastapi import UploadFile
- app = FastAPI()
- @users.post("/upload_file")
- async def file_upload(files: List[UploadFile] = File(...)):
- for file in files:
- with open(f"D:\\{file.filename}", 'wb') as f:
- for i in iter(lambda: file.file.read(1024 * 1024 * 10), b''):
- f.write(i)
- f.close()
- return {"file_name": [file.filename for file in files]}
- if __name__ == '__main__':
- uvicorn.run('main:app', host='0.0.0.0', port=8000, reload=True,
- debug=True, workers=1)
复制代码 3.大文件切片上传和断点续传
1.逻辑思路
data:image/s3,"s3://crabby-images/0cc13/0cc13e76152a3e049274b2cd87e9f6b31d64f503" alt=""
2.代码示例
- import os
- import shutil
- from pathlib import Path
- from fastapi import FastAPI
- import uvicorn
- import aiofiles
- from fastapi import Body, File, UploadFile
- app = FastAPI()
- base_dir = os.path.dirname(os.path.abspath(__file__))
- upload_file_path = Path(base_dir, './uploads')
- @app.post("/upload")
- async def upload_file(
- identifier: str = Body(..., description="文件唯一标识符"),
- number: str = Body(..., description="文件分片序号(初值为0)"),
- file: UploadFile = File(..., description="文件")
- ):
- """文件分片上传"""
- path = Path(upload_file_path, identifier)
- if not os.path.exists(path):
- os.makedirs(path)
- file_name = Path(path, f'{identifier}_{number}')
- if not os.path.exists(file_name):
- async with aiofiles.open(file_name, 'wb') as f:
- await f.write(await file.read())
- return {
- 'code': 1,
- 'chunk': f'{identifier}_{number}'
- }
- @app.put("/merge")
- async def merge_file(
- name: str = Body(..., description="文件名称(不含后缀)"),
- file_type: str = Body(..., description="文件类型/后缀"),
- identifier: str = Body(..., description="文件唯一标识符")
- ):
- """合并分片文件"""
- target_file_name = Path(upload_file_path, f'{name}.{file_type}')
- path = Path(upload_file_path, identifier)
- try:
- async with aiofiles.open(target_file_name, 'wb+') as target_file: # 打开目标文件
- for i in range(len(os.listdir(path))):
- temp_file_name = Path(path, f'{identifier}_{i}')
- async with aiofiles.open(temp_file_name, 'rb') as temp_file: # 按序打开每个分片
- data = await temp_file.read()
- await target_file.write(data) # 分片内容写入目标文件
- except Exception as e:
- return {
- 'code': 0,
- 'error': f'合并失败:{e}'
- }
- shutil.rmtree(path) # 删除临时目录
- return {
- 'code': 1,
- 'name': f'{name}.{file_type}'
- }
- if __name__ == '__main__':
- uvicorn.run('main:app', host='0.0.0.0', port=8000, reload=True,
- debug=True, workers=1)
复制代码 4.大文件切片下载,断点续下
1.代码示例
- import os
- import re
- import stat
- from email.utils import formatdate
- from mimetypes import guess_type
- from pathlib import Path
- from urllib.parse import quote
- import uvicorn
- from fastapi import FastAPI, Header
- from fastapi import Path as F_Path
- from starlette.responses import StreamingResponse
- app = FastAPI()
- base_dir = os.path.dirname(os.path.abspath(__file__))
- upload_file_path = Path(base_dir, './uploads')
- @app.get("/downloads/{file_name}")
- async def download_file(
- scope: str = Header(default="bytes=0", regex=r"bytes=\d+"),
- file_name: str = F_Path(..., description="文件名称(含后缀)")
- ):
- """分片下载文件,支持断点续传"""
- # 检查文件是否存在
- file_path = Path(upload_file_path, file_name)
- if not os.path.exists(file_path):
- return {
- 'code': 0,
- 'error': '文件不存在'
- }
- # 获取文件的信息
- stat_result = os.stat(file_path)
- content_type, encoding = guess_type(file_path)
- content_type = content_type or 'application/octet-stream'
- # 读取文件的起始位置和终止位置
- range_match = re.search(r'bytes=(\d+)-(\d+)', scope, re.S) or re.search(r'bytes=(\d+)-', scope, re.S)
- if range_match:
- start_bytes = int(range_match.group(1))
- end_bytes = int(range_match.group(2)) if range_match.lastindex == 2 else stat_result.st_size - 1
- else:
- start_bytes = 0
- end_bytes = stat_result.st_size - 1
- # 这里 content_length 表示剩余待传输的文件字节长度 判断是否为一般文件
- content_length = stat_result.st_size - start_bytes if stat.S_ISREG(stat_result.st_mode) else stat_result.st_size
- # 构建文件名称
- name, *suffix = file_name.rsplit('.', 1)
- suffix = f'.{suffix[0]}' if suffix else ''
- filename = quote(f'{name}{suffix}') # 文件名编码,防止中文名报错
- # 打开文件从起始位置开始分片读取文件
- return StreamingResponse(
- file_iterator(file_path, start_bytes, 1024 * 1024 * 1), # 每次读取 1M
- media_type=content_type,
- headers={
- 'content-disposition': f'attachment; filename="{filename}"',
- 'accept-ranges': 'bytes',
- 'connection': 'keep-alive',
- 'content-length': str(content_length),
- 'content-range': f'bytes {start_bytes}-{end_bytes}/{stat_result.st_size}',
- 'last-modified': formatdate(stat_result.st_mtime, usegmt=True),
- },
- status_code=206 if start_bytes > 0 else 200
- )
- def file_iterator(file_path, offset, chunk_size):
- """
- 文件生成器
- :param file_path: 文件绝对路径
- :param offset: 文件读取的起始位置
- :param chunk_size: 文件读取的块大小
- :return: yield
- """
- with open(file_path, 'rb') as f:
- f.seek(offset, os.SEEK_SET)
- while True:
- data = f.read(chunk_size)
- if data:
- yield data
- else:
- break
- if __name__ == '__main__':
- uvicorn.run('main:app', host='0.0.0.0', port=8000, reload=True,
- debug=True, workers=1)
复制代码 2.相关模块解读
- import os
- stat_result = os.stat("D:\lufei.jpg")
- """
- stat_result(
- st_mode=33206, # 保护模式
- st_ino=844424930132018, # 索引号
- st_dev=2222159220, # 设备名
- st_nlink=1, # 硬链接号(被连接数目)
- st_uid=0, # 用户 ID
- st_gid=0, # 组 ID
- st_size=20152, # 文件大小,以字节为单位
- st_atime=1656044665, # 最后一次访问时间
- st_mtime=1656038259, # 文件最后一次修改时间
- st_ctime=1656038259 # 最后一次状态变化的时间,操作系统不会,该属性对应的结果也不同,例如在 Windows 操作系统下返回的就是文件的创建时间。
- )
- """
复制代码- import stat
- if stat.S_ISREG(st_mode): #判断是否一般文件
- print("xxxxx")
- elif stat.S_ISLNK(st_mode): #判断是否链接文件
- print("xxxxx")
- elif stat.S_ISSOCK(st_mode): #判断是否套接字文件
- print("xxxxx")
- elif stat.S_ISFIFO(st_mode): #判断是否命名管道
- print("xxxxx")
- elif stat.S_ISBLK(st_mode): #判断是否块设备
- print("xxxxx")
- elif stat.S_ISCHR(st_mode): #判断是否字符设置
- print("xxxxx")
- elif stat.S_ISDIR(st_mode): #判断是否目录
- print("xxxxx")
- # 查看文件权限
- perm_code = stat.S_IMODE(st_mode)
- perm = oct(perm_code)
- # 获取文件类型
- stat.S_IFMT(st_mode)
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |