魏晓东 发表于 2024-12-10 23:55:22

Apache DolphinScheduler 限制秒级别的定时调度

背景

Apache DolphinScheduler 定时任务配置采用的 7 位 Crontab 表达式,分别对应秒、分、时、月天、月、周天、年。
在团队一样平常开发工作中,工作流的定时调度一样平常不会细化到秒级别。但汗青上出现过因配置的疏忽大意而产生故障时间,如应该配置每分钟实验的工作流被配置长了每秒实验,造成短时间内产生大量工作流实例,对 Apache DolphinScheduler 服务可用性和提交任务的 Hadoop 集群造成影响。
https://img2024.cnblogs.com/other/2685289/202412/2685289-20241210222954831-136734127.jpg
基于此,团队决定将 DolphinScheduler 中定时任务配置模块的 Crontab 表达式做限制,从平台侧杜绝此类变乱发生。
方案

我们的方案是从前后端双方面限制 Crontab 表达式的第一位:

[*]前端配置选择不提供“每一秒钟”选项
[*]服务端接口判定第一位为 * 时,返回错误
前端修改

在前端项目中,秒、分、时 均为同一模版(CrontabTime),因此新增 dolphinscheduler-ui/src/components/crontab/modules/second.tsx
只保存两种模式:intervalTime 和 specificTime
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import _ from 'lodash'
import { defineComponent, onMounted, PropType, ref, toRefs, watch } from 'vue'
import { NInputNumber, NRadio, NRadioGroup, NSelect } from 'naive-ui'
import { useI18n } from 'vue-i18n'
import { ICrontabI18n } from '../types'
import { isStr, specificList } from '../common'
import styles from '../index.module.scss'

const props = {
    timeMin: {
      type: Number as PropType<number>,
      default: 0
    },
    timeMax: {
      type: Number as PropType<number>,
      default: 60
    },
    intervalPerform: {
      type: Number as PropType<number>,
      default: 5
    },
    intervalStart: {
      type: Number as PropType<number>,
      default: 3
    },
    timeSpecial: {
      type: Number as PropType<number | string>,
      default: 60
    },
    timeValue: {
      type: String as PropType<string>,
      default: '*'
    },
    timeI18n: {
      type: Object as PropType<ICrontabI18n>,
      require: true
    }
}

export default defineComponent({
    name: 'CrontabSecond',
    props,
    emits: ['update:timeValue'],
    setup(props, ctx) {
      const options = Array.from({ length: 60 }, (x, i) => ({
            label: i.toString(),
            value: i
      }))

      const timeRef = ref()
      const radioRef = ref()
      const intervalStartRef = ref(props.intervalStart)
      const intervalPerformRef = ref(props.intervalPerform)
      const specificTimesRef = ref<Array<number>>([])

      /**
         * Parse parameter value
         */
      const analyticalValue = () => {
            const $timeVal = props.timeValue
            // Interval time
            const $interval = isStr($timeVal, '/')
            // Specific time
            const $specific = isStr($timeVal, ',')

            // Positive integer (times)
            if (
                ($timeVal.length === 1 ||
                  $timeVal.length === 2 ||
                  $timeVal.length === 4) &&
                _.isInteger(parseInt($timeVal))
            ) {
                radioRef.value = 'specificTime'
                specificTimesRef.value =
                return
            }

            // Interval times
            if ($interval) {
                radioRef.value = 'intervalTime'
                intervalStartRef.value = parseInt($interval)
                intervalPerformRef.value = parseInt($interval)
                timeRef.value = `${intervalStartRef.value}/${intervalPerformRef.value}`
                return
            }

            // Specific times
            if ($specific) {
                radioRef.value = 'specificTime'
                specificTimesRef.value = $specific.map((item) => parseInt(item))
                return
            }
      }

      // Interval start time(1)
      const onIntervalStart = (value: number | null) => {
            intervalStartRef.value = value || 0
            if (radioRef.value === 'intervalTime') {
                timeRef.value = `${intervalStartRef.value}/${intervalPerformRef.value}`
            }
      }

      // Interval execution time(2)
      const onIntervalPerform = (value: number | null) => {
            intervalPerformRef.value = value || 0
            if (radioRef.value === 'intervalTime') {
                timeRef.value = `${intervalStartRef.value}/${intervalPerformRef.value}`
            }
      }

      // Specific time
      const onSpecificTimes = (arr: Array<number>) => {
            specificTimesRef.value = arr
            if (radioRef.value === 'specificTime') {
                specificReset()
            }
      }

      // Reset interval time
      const intervalReset = () => {
            timeRef.value = `${intervalStartRef.value}/${intervalPerformRef.value}`
      }

      // Reset specific time
      const specificReset = () => {
            let timeValue = '0'
            if (specificTimesRef.value.length) {
                timeValue = specificTimesRef.value.join(',')
            }
            timeRef.value = timeValue
      }

      const updateRadioTime = (value: string) => {
            switch (value) {
                case 'intervalTime':
                  intervalReset()
                  break
                case 'specificTime':
                  specificReset()
                  break
            }
      }

      watch(
            () => timeRef.value,
            () => ctx.emit('update:timeValue', timeRef.value.toString())
      )

      onMounted(() => analyticalValue())

      return {
            options,
            radioRef,
            intervalStartRef,
            intervalPerformRef,
            specificTimesRef,
            updateRadioTime,
            onIntervalStart,
            onIntervalPerform,
            onSpecificTimes,
            ...toRefs(props)
      }
    },
    render() {
      const { t } = useI18n()

      return (
            <NRadioGroup
                v-model:value={this.radioRef}
                onUpdateValue={this.updateRadioTime}
            >
               
                  <NRadio value={'intervalTime'} />
                  
                        {t(this.timeI18n!.every)}
                        
                            <NInputNumber
                              defaultValue={5}
                              min={this.timeMin}
                              max={this.timeMax}
                              v-model:value={this.intervalPerformRef}
                              onUpdateValue={this.onIntervalPerform}
                            />
                        
                        
                              {t(this.timeI18n!.timeCarriedOut)}
                        
                        
                            <NInputNumber
                              defaultValue={3}
                              min={this.timeMin}
                              max={this.timeMax}
                              v-model:value={this.intervalStartRef}
                              onUpdateValue={this.onIntervalStart}
                            />
                        
                        {t(this.timeI18n!.timeStart)}
                  
               
               
                  <NRadio value={'specificTime'} />
                  
                        {t(this.timeI18n!.specificTime)}
                        
                            <NSelect
                              multiple
                              options={specificList}
                              placeholder={t(this.timeI18n!.specificTimeTip)}
                              v-model:value={this.specificTimesRef}
                              onUpdateValue={this.onSpecificTimes}
                            />
                        
                  
               
            </NRadioGroup>
      )
    }
})服务端

添加Crontab表达式检验(有两处:一处是新增Post接口、另一处是修改PUT接口),直接添加个检测方法供这两处调用:
      if (scheduleParam.getCrontab().startsWith("*")) {
            logger.error("The crontab must not start with *");
            putMsg(result, Status.CRONTAB_EVERY_SECOND_ERROR);
            return result;
      }本文完!
本文由 白鲸开源 提供发布支持!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Apache DolphinScheduler 限制秒级别的定时调度