Flink 任务启动脚本-V2(包括ck启动)

[复制链接]
发表于 2024-11-29 01:09:41 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

×
  1. #!/bin/bash
  2. #crontab时设置,如果依赖其他环境变量配置,可以在脚本执行一下环境变量脚本
  3. source /etc/profile
  4. # 进入脚本目录
  5. curdir=`dirname "$0"`
  6. curdir=`cd "$curdir"; pwd`
  7. echo "进入启动脚本目录 $curdir"
  8. # 定义应用程序名称
  9. APP_NAME="orderTest"
  10. # 定义checkpoint路径
  11. CHECKPOINT_BASE_PATH="hdfs:///jobs/flink/checkpoints/$APP_NAME/"
  12. MAIN_CLASS="com.test.mainTest"
  13. #绝对路径
  14. JAR_PATH="/$curdir/flink-test-1.0.0.jar"
  15. # 默认从checkpoint启动
  16. USE_CHECKPOINT=true
  17. # 显示使用说明
  18. usage() {
  19.     echo "用法: $0 [-n] [-h]"
  20.     echo "选项:"
  21.     echo "  -n    不从checkpoint启动任务(默认从最新的checkpoint启动)"
  22.     echo "  -h    显示此帮助信息"
  23.     echo
  24.     echo "示例:"
  25.     echo "  $0        # 从最新的checkpoint启动任务"
  26.     echo "  $0 -n     # 不使用checkpoint启动任务"
  27.     exit 1
  28. }
  29. # 解析命令行参数
  30. while getopts ":nh" opt; do
  31.     case $opt in
  32.         n)
  33.             USE_CHECKPOINT=false
  34.             echo "已设置:不从checkpoint启动任务"
  35.             ;;
  36.         h)
  37.             usage
  38.             ;;
  39.         \?)
  40.             echo "错误:无效的选项 -$OPTARG"
  41.             echo "使用 -h 查看帮助信息"
  42.             exit 1
  43.             ;;
  44.     esac
  45. done
  46. # 如果设置了无效参数,显示使用说明
  47. if [ $OPTIND -gt 1 ]; then
  48.     shift $((OPTIND-1))
  49.     if [ "$#" -gt 0 ]; then
  50.         echo "错误:存在额外的参数 $@"
  51.         echo "使用 -h 查看帮助信息"
  52.         exit 1
  53.     fi
  54. fi
  55. # 检查是否存在指定应用程序在运行中
  56. is_running=$(yarn application -list | grep -w "$APP_NAME" | grep -c "RUNNING")
  57. if [ $is_running -gt 0 ]; then
  58.     echo "应用程序 $APP_NAME 在运行中,退出脚本"
  59.     exit 1
  60. else
  61.     echo "应用程序 $APP_NAME 不在运行中,准备拉起任务"
  62. fi
  63. # 函数:获取最新成功的checkpoint地址
  64. get_latest_checkpoint() {
  65.     latest_checkpoint=$(hdfs dfs -ls -t -R $CHECKPOINT_BASE_PATH  | grep '_metadata' | sort -k6,7r | head -n 1 | awk '{print $8}'  )
  66.     if [ -z "$latest_checkpoint" ]; then
  67.         return 1
  68.     fi
  69.     # 去掉文件名,只保留目录路径
  70.     checkpoint_dir=$(dirname "$latest_checkpoint")
  71.     echo $checkpoint_dir
  72.     return 0
  73. }
  74. # 构建基础命令
  75. CMD="flink run \
  76. -t yarn-per-job \
  77. -d \
  78. -p 1 \
  79. -Dyarn.application.queue=realtime \
  80. -Dyarn.application.name=$APP_NAME \
  81. -Dtaskmanager.memory.process.size=4096mb \
  82. -Dtaskmanager.memory.network.max=64mb \
  83. -Dtaskmanager.memory.managed.size=1024mb \
  84. -Dtaskmanager.numberOfTaskSlots=1 \
  85. -c $MAIN_CLASS"
  86. if [ "$USE_CHECKPOINT" = true ]; then
  87.     # 获取最新的checkpoint地址
  88.     LATEST_CHECKPOINT=$(get_latest_checkpoint)
  89.     if [ $? -ne 0 ]; then
  90.         echo "没有找到适合的ck,退出执行"
  91.         exit 1
  92.     fi
  93.     echo "'$APP_NAME' 任务将从 '$LATEST_CHECKPOINT' 启动"
  94.     CMD="$CMD \
  95.     -s $LATEST_CHECKPOINT"
  96. else
  97.     echo "'$APP_NAME' 任务将不从checkpoint启动"
  98. fi
  99. # 添加最终的jar包和配置文件参数
  100. CMD="$CMD \
  101.    $JAR_PATH "
  102. # 执行命令
  103. echo " 任务启动命令: '$CMD' "
  104. eval $CMD
复制代码
本次新增一个不从 ck 启动的选择,默认从 最新 ck 启动,用法 sh start.sh -n 。
备注:由于Flink checkpoint 个别情况下,不一定能保证落地的checkpoint文件一定有效,所以需要人工介入支持。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
继续阅读请点击广告
回复

使用道具 举报

快速回复 返回顶部 返回列表