实现了批量生成DolphinScheduler的任务,当导入时发现只能逐个导入,因此通过接口实现会更方便。
DolphinScheduler接口文档
DolphinScheduler是有接口文档的,地址是- http://IP:12345/dolphinscheduler/swagger-ui/index.html?language=zh_CN&lang=cn
复制代码 不外这文档写的比较大略,自己需要研究研究。
token:所有的接口都需要用到token
在安全中心-令牌管理 创建一个token 。记着这个token,后面所有的接口都需要用到 。
header:根据上面的token组成哀求要用的header- token = ''
- headers = {
- 'Accept': 'application/json',
- 'token': token
- }
复制代码 项目ID project_id 可以在查看项目工作流时,在url中找到。
DolphinScheduler导入任务接口
导入任务的接口是- import_url = 'http://IP:12345/dolphinscheduler/projects/{project_id}/process-definition/import'
复制代码 知道接口 就可以导入了。- def import_job(file_path):
- # 打开文件并读取为二进制数据
- with open(file_path, 'rb') as file:
- files = {'file': file}
- # 导入工作流
- response = requests.post(import_url, headers=headers, files=files)
- print(response.status_code)
- if response.status_code != 200:
- print('上传失败 '+file_path)
复制代码 需要注意的是,导入任务时 只支持二进制。
file_path 是工作流文件,详细实现 可以工作流中导出一个作为参考。
重复利用上述方法,就可以实现批量导入任务。
工作流上线
利用上述方法批量完成任务上传后,仍旧有问题,逐个上线工作量也是个不小的工作量,因此继续利用接口。
经过研究发现,上线工作流需要先获取工作流的调度ID 。
获取工作流列表 - > 获取工作流code -> 获取所有工作流的调度ID -> 工作流上线
- jobs_url = 'http://IP:12345/dolphinscheduler/projects/{project_id}/process-definition'
复制代码 不外这个要分页查询,轻微有一点点麻烦- def get_jobs_list():
- # 分页查询
- # 初始化分页参数
- pageNo = 1
- pageSize = 10
- url = f'{jobs_url}?pageSize=10&pageNo=1&searchVal='
- # 构建完整的URL
- # 存储所有结果
- all_items = list()
- while True:
- # 构建完整的URL
- url = f'{jobs_url}?pageSize={pageSize}&pageNo={pageNo}&searchVal='
- # 发送GET请求
- response = requests.get(url, headers=headers)
- # 检查响应状态码
- if response.status_code == 200:
- # 请求成功,处理响应数据
- items = response.content.decode()
- total = json.loads(items)["data"]["total"]
- item = json.loads(items)["data"]["totalList"]
- # 将当前页的数据添加到结果列表中
- for i in item:
- all_items.append(i)
- # 如果当前页没有数据,退出循环
- if pageNo * pageSize > total:
- break
- if not items:
- break
- # 增加页码
- pageNo += 1
- else:
- # 请求失败,打印错误信息
- print('请求失败:', response.status_code, response.text)
- break
- return all_items
复制代码 all_items 是所有工作流的详细内容,需要提取一下- all_jobs = get_jobs_list()
- job_codes = [job['code'] for job in all_jobs]
复制代码 这样就是所有的工作流code。
- 获取调度ID
下面是调度ID的接口,因为不想分页,直接一页1000个。
- schedules_url = 'http://36.133.140.132:12345/dolphinscheduler/projects/{project_id}/schedules?pageSize=1000&pageNo=1&processDefinitionCode='
复制代码 利用这个接口就能拿到所有的调度ID- def schedule_id(job_code):
- url = schedules_url+str(job_code)
- response = requests.get(url, headers=headers)
- if response.status_code == 200:
- data = response.content.decode()
- js = json.loads(data)
- if len(js['data']['totalList'])>0 and js['data']['totalList'][0]['releaseState']=='OFFLINE':
- return js['data']['totalList'][0]['id']
- else:return ''
复制代码 这里过滤了已经上线的调度ID 。
- online_url = 'http://36.133.140.132:12345/dolphinscheduler/projects/{project_id}/schedules/{scheduler_id}/online'
复制代码 详细实现:- def online_job(scheduler_id):
- url = online_url.format(scheduler_id=scheduler_id)
- response = requests.post(url, headers=headers)
- if response.status_code == 200:
- print('success')
- else:
- print('online job failed')
复制代码 到此 就可以实现导入-批量全自动了。
打完收工,祝你不加班。
原文链接:https://blog.csdn.net/weixin_45399602/article/details/143226396
本文由 白鲸开源 提供发布支持!
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |