ToB企服应用市场:ToB评测及商务社交产业平台

标题: 实操教程 | 触发器实现 Apache DolphinScheduler 失败钉钉自动告警 [打印本页]

作者: 温锦文欧普厨电及净水器总代理    时间: 2023-9-2 15:23
标题: 实操教程 | 触发器实现 Apache DolphinScheduler 失败钉钉自动告警

作者 | sqlboy-yuzhenc
背景介绍

在实际应用中,我们经常需要将特定的任务通知给特定的人,虽然 Apache DolphinScheduler 在安全中心提供了告警组和告警实例,但是配置起来相对复杂,并且还需要在定时调度时指定告警组。通过这篇文章,你将学到一个简单的方法,无需任何配置,只需要在用户表(t_ds_user)表中增加字段钉钉名称(dignding_name),创建用户时指定用户的手机号码和维护对应的钉钉名称,就能轻松实现 Apache DolphinScheduler 任务失败时钉钉告警到指定的人。
安装插件plpython3u
  1. psql etl -U postgres
  2. create extension plpython3u
复制代码
pip安装requests
  1. cd /opt && wget https://bootstrap.pypa.io/get-pip.py
  2. python get-pip.py
  3. pip install requests
复制代码
创建发送钉钉的存储过程

  1. sql
  2. create or replace function tool.sp_send(
  3.       message json
  4.      ,webhook varchar
  5.      ,secret varchar
  6. )
  7.     returns text
  8.     language plpython3u
  9.     security definer
  10. as $function$
  11. import requests
  12. import json
  13. import time
  14. import hmac
  15. import hashlib
  16. import base64
  17. import urllib.parse
  18. """
  19. /*
  20. * 作者 : v-yuzhenc
  21. * 功能 : 给钉钉发送一条消息
  22. * message : 需要发送的消息,json格式,详情参考https://open.dingtalk.com/document/robots/custom-robot-access
  23. * webhook : 钉钉机器人的webhook
  24. * secret : 钉钉机器人的secret
  25. * */
  26. """
  27. v_timestamp = str(round(time.time() * 1000))
  28. p_secret = secret
  29. secret_enc = p_secret.encode('utf-8')
  30. string_to_sign = '{}\n{}'.format(v_timestamp, p_secret)
  31. string_to_sign_enc = string_to_sign.encode('utf-8')
  32. hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
  33. v_sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
  34. # 钉钉自定义机器人的webhook地址
  35. p_webhook = webhook
  36. webhook_url = p_webhook+"&timestamp="+v_timestamp+"&sign="+v_sign
  37. # 要发送的消息内容
  38. p_message = json.loads(message)
  39. # 发送POST请求
  40. response = requests.post(webhook_url, data=json.dumps(p_message), headers={"Content-Type": "application/json"})
  41. # 打印响应结果
  42. return response.text
  43. $function$;
  44. alter function tool.sp_send(json,varchar,varchar) owner to tool;
  45. grant execute on function tool.sp_send(json,varchar,varchar) to public;
复制代码
测试发送钉钉的存储过程
  1. select sp_send('{
  2.     "msgtype": "actionCard",
  3.     "actionCard": {
  4.         "title": "我 20 年前想打造一间苹果咖啡厅,而它正是 Apple Store 的前身",
  5.         "text": "![screenshot](https://img2023.cnblogs.com/other/2685289/202308/2685289-20230829152524866-1747807117.png) \n\n #### 乔布斯 20 年前想打造的苹果咖啡厅 \n\n Apple Store 的设计正从原来满满的科技感走向生活化,而其生活化的走向其实可以追溯到 20 年前苹果一个建立咖啡馆的计划",
  6.         "btnOrientation": "0",
  7.         "btns": [
  8.             {
  9.                 "title": "内容不错",
  10.                 "actionURL": "https://www.dingtalk.com/"
  11.             },
  12.             {
  13.                 "title": "不感兴趣",
  14.                 "actionURL": "https://www.dingtalk.com/"
  15.             }
  16.         ]
  17.     }
  18. }'::json);
复制代码

参考

自定义机器人安全设置 - 钉钉开放平台
自定义机器人接入 - 钉钉开放平台
t_ds_user增加字段
  1. alter table t_ds_user add column dingding_name varchar(100);
  2. --人为将海豚账号对应的钉钉用户名更新上去
复制代码
编写触发器
  1. CREATE OR REPLACE FUNCTION dp.tg_ds_udef_alert_ding()
  2. RETURNS trigger
  3. LANGUAGE plpgsql
  4. AS $function$
  5. /*
  6. * 作者:v-yuzhenc
  7. * 功能:海豚调度工作流失败自动告警
  8. * */
  9. declare
  10.     i record;
  11.     v_user varchar;
  12.     v_mobile varchar;
  13.     v_content text;
  14.     v_message varchar;
  15. begin
  16.     if new.state in (4,5,6) then
  17.         for i in (
  18.             select
  19.                              d.user_name
  20.                             ,d.phone
  21.                             ,d.dingding_name
  22.                             ,g.name project_name
  23.                             ,e.name process_name
  24.                             ,string_agg(distinct b.name||' '||to_char(b.end_time,'yyyy-mm-dd hh24:mi:ss'),'\r\n') task_name
  25.                         from t_ds_process_instance a
  26.                         inner join t_ds_task_instance b
  27.                         on (a.id = b.process_instance_id)
  28.                         inner join t_ds_task_definition c
  29.                         on (b.task_code = c.code and b.task_definition_version = c."version")
  30.                         inner join t_ds_user d
  31.                         on (c.user_id = d.id)
  32.                         inner join t_ds_process_definition e
  33.                         on (a.process_definition_code = e.code and a.process_definition_version = e."version")
  34.                         inner join t_ds_project g
  35.             on (e.project_code = g.code)
  36.                         where c.task_type <> 'SUB_PROCESS'
  37.                             and a.state = 6
  38.                             and b.state = 6
  39.                             and a.id = new.id
  40.                         group by d.user_name
  41.                                 ,d.phone
  42.                                 ,d.dingding_name
  43.                                 ,g.name
  44.                                 ,e.name
  45.         ) loop
  46.             v_mobile := i.phone;
  47.             v_user := i.dingding_name;
  48.             v_content := '海豚工作流执行失败,请尽快处理!\r\n项目名称:\r\n'||i.project_name||'\r\n工作流名称:\r\n'||i.process_name||'\r\n任务名称:\r\n'||i.task_name;
  49.             v_message := $v_message${
  50.     "at": {
  51.         "atMobiles":[
  52.             "$v_message$||v_mobile||$v_message$"
  53.         ],
  54.         "atUserIds":[
  55.             "$v_message$||v_user||$v_message$"
  56.         ],
  57.         "isAtAll": false
  58.     },
  59.     "text": {
  60.         "content":"$v_message$||v_content||$v_message$"
  61.     },
  62.     "msgtype":"text"
  63. }$v_message$;
  64.             --告警
  65.             perform tool.sp_send(v_message::json);
  66.         end loop;
  67.     end if;
  68.     return new;
  69. end;
  70. $function$
  71. ;
  72. create trigger tg_state_ds_process_instance after update on t_ds_process_instance for each row execute procedure tg_ds_udef_alert_ding();
复制代码
测试


本文转载自CSDN博主sqlboy-yuzhenc文章:https://blog.csdn.net/qq_33445829/article/details/131073349
本文由 白鲸开源 提供发布支持!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4