写过一篇 发表于 2024-7-26 14:13:43

Java使用定时任务开始-竣事活动

        有一种场景,好比抢购活动,活动的开始肯定不是到点后手动点击开始按钮的,而是通过配置活动开始时间和竣事时间,然后开启定时任务,通过定时任务来控制活动的开始和竣事。

1. 活动上架--触发定时任务的开启

@Override
@Transactional(rollbackFor = Exception.class)
public void publishIntegralActivity(List<Long> idList) {
    // 修改上架状态
    String accountName = SessionUtils.getAccountNameSession();
    integralActivityDao.updatePublishStatus(idList, PublishStatusEnum.PUBLISH_YES.getCode(), accountName);
    // 活动开始定时任务
    for (Long id : idList) {
      IntegralActivity integralActivity = integralActivityDao.getById(id);
      integralActivityCronService.addStartCron(id, integralActivity.getStartTime());
    }
} 2. 开启活动开始定时任务

public void addStartCron(long id, Date startTime) {
    long millSecond = startTime.getTime() - System.currentTimeMillis();
    ScheduledFuture<?> scheduledFuture = scheduleExecutor.schedule(new Runnable() {
      @Override
      public void run() {
            log.debug("integral activity start cron begin execute");
            String key = RedisConstant.INTEGRAL_ACTIVITY_CRON + ":" + ACTIVITY_START_CRON_PREFX + id;
            String value = UUID.randomUUID().toString().replace("-", "");
            if (!getExecuteLock(key, value)) {
                return;
            }
            try {
                IntegralActivityQueryDto queryDto = new IntegralActivityQueryDto();
                queryDto.setId(id);
                List<IntegralActivity> activityList = integralActivityDao.getActivityList(queryDto);
                if (CollectionUtils.isEmpty(activityList) ||
                        !ActivityStatusEnum.NOT_START.getCode().equals(activityList.get(0).getActivityStatus()) ||
                        startTime.getTime() != activityList.get(0).getStartTime().getTime()) {
                  log.warn("when execute start cron, integral activity not exists or status not match");
                  lockUtils.unLock(key, value);
                  return;
                }
                // 更改活动状态
                integralActivityDao.updateActivityStatus(Lists.newArrayList(id), ActivityStatusEnum.IN_PROGRESS.getCode(), null, null, activityList.get(0).getEndTime());
                // 清除活动结束定时任务
                clearCronById(ACTIVITY_END_CRON_PREFX + id);
                // 添加活动结束定时任务
                addEndCron(id, activityList.get(0).getEndTime());
                lockUtils.unLock(key, value);
            } catch (Exception e) {
                log.error("integral activity start error, retry", e);
                lockUtils.unLock(key, value);
                addStartCron(id, startTime);
            }
      }
    }, millSecond, TimeUnit.MILLISECONDS);
    // 加入任务
    currentCronMap.put(ACTIVITY_START_CRON_PREFX + id, scheduledFuture);
} 3. 开启定时任务时要加锁,防止并发哀求

private boolean getExecuteLock(String key, String value) {
    for (int i = 1; i <= 5; i++) {
      try {
            if (lockUtils.getLock(key, value, 10 * 60 * 1000)) {
                return true;
            }
      } catch (Exception e) {
            log.warn("redis lock get error, continue");
      }
    }
    return false;
} 4. 开启活动竣事定时任务

private void addEndCron(long id, Date endTime) {
    long millSecond = endTime.getTime() - System.currentTimeMillis();
    log.debug("integral activity end cron begin execute");
    ScheduledFuture<?> scheduledFuture = scheduleExecutor.schedule(new Runnable() {
      @Override
      public void run() {
            try {
                IntegralActivityQueryDto queryDto = new IntegralActivityQueryDto();
                queryDto.setId(id);
                List<IntegralActivity> activityList = integralActivityDao.getActivityList(queryDto);
                if (CollectionUtils.isEmpty(activityList) ||
                        !ActivityStatusEnum.IN_PROGRESS.getCode().equals(activityList.get(0).getActivityStatus()) ||
                        endTime.getTime() != activityList.get(0).getEndTime().getTime()) {
                  return;
                }
                // 更改活动状态
                integralActivityDao.updateActivityStatus(Lists.newArrayList(id), ActivityStatusEnum.FINISHED.getCode(), null, null, new Date());
                // 更改上架状态
                integralActivityDao.updatePublishStatus(Lists.newArrayList(id), PublishStatusEnum.PUBLISH_NO.getCode(), null);
            } catch (Exception e) {
                log.error("integral activity end error, retry", e);
                addEndCron(id, endTime);
            }
      }
    }, millSecond, TimeUnit.MILLISECONDS);
    // 加入任务
    currentCronMap.put(ACTIVITY_END_CRON_PREFX + id, scheduledFuture);
} 5.  活动下架

@Override
@Transactional(rollbackFor = Exception.class)
public void offIntegralActivity(List<Long> idList) {
    String accountName = SessionUtils.getAccountNameSession();
    integralActivityDao.updateActivityStatus(idList, ActivityStatusEnum.EARLY_STOP.getCode(), null, accountName, new Date());
    integralActivityDao.updatePublishStatus(idList, PublishStatusEnum.PUBLISH_NO.getCode(), accountName);
    // 活动结束定时任务
    for (Long id : idList) {
      integralActivityCronService.clearActivityCron(id);
    }
} 6. 活动下架后要清除定时任务

public void clearActivityCron(long id) {
    this.clearCronById(ACTIVITY_START_CRON_PREFX + id);
    this.clearCronById(ACTIVITY_END_CRON_PREFX + id);
}

@SuppressWarnings("rawtypes")
private void clearCronById(String cronId) {
    if (currentCronMap.get(cronId) == null) {
      log.debug("cron not exists");
    }
    Future future = currentCronMap.get(cronId);
    if (null != future && !future.isDone()) {
      future.cancel(true); // 正在运行是否干扰
    }
    currentCronMap.remove(cronId);
} 7. 程序启动时开启全部活动的定时任务

@Component
@Slf4j
public class InitIntegralActivityCron implements CommandLineRunner {
   
    @Autowired
    private LockUtils lockUtils;
    @Autowired
    private IntegralActivityCronService integralActivityCronService;

    @Override
    public void run(String... args) throws Exception {
      // 初始化积分活动定时任务
      String key = RedisConstant.LOAD_INTEGRAL_ACTIVITY_CRON;
      String value = UUID.randomUUID().toString().replace("-", "");
      if (!lockUtils.getLock(key, value, 30 * 60 * 1000)) {
            log.info("integral activity cron load lock get failed, return!!!");
            return;
      }
      try {
            if (!integralActivityCronService.createAllActivityCron()) {
                lockUtils.unLock(key, value);
                log.error("partial integral activity cron load error, system will exit");
                System.exit(0);
            }
      } catch (Exception e) {
            log.error("integral activity cron load error, system will exit", e);
            lockUtils.unLock(key, value);
            System.exit(0);
      }
    }
} @Override
public boolean createAllActivityCron() {
    boolean isAllSuccess = true;// 是否都创建成功
    List<String> activityStatusList = new ArrayList<String>();
    activityStatusList.add(ActivityStatusEnum.NOT_START.getCode());
    activityStatusList.add(ActivityStatusEnum.IN_PROGRESS.getCode());
    IntegralActivityQueryDto queryDto = new IntegralActivityQueryDto();
    queryDto.setActivityStatusList(activityStatusList);
    queryDto.setPublishStatusList(Lists.newArrayList(PublishStatusEnum.PUBLISH_YES.getCode()));
    List<IntegralActivity> activityList = integralActivityDao.getActivityList(queryDto);
    if (CollectionUtils.isNotEmpty(activityList)) {
      for (IntegralActivity integralActivity : activityList) {
            long id = integralActivity.getId();
            try {
                if (ActivityStatusEnum.NOT_START.getCode().equals(integralActivity.getActivityStatus())) {
                  this.addStartCron(id, integralActivity.getStartTime());
                } else {
                  this.addEndCron(id, integralActivity.getEndTime());
                }
            } catch (Exception e) {
                isAllSuccess = false;
                log.error("integral activity cron create error", e);
            }
      }
    }
    log.info("create integral activity cron end, activity count =" + activityList.size() + " " + (isAllSuccess ? "all success" : "partial success"));
    return isAllSuccess;
}
上面是定时任务的完整流程,下面补充完整代码:
1. 定时任务完整代码

@Service
@Slf4j
public class IntegralActivityCronServiceImpl implements IntegralActivityCronService {
   
    // 执行定时对象池
    private ScheduledExecutorService scheduleExecutor = Executors.newScheduledThreadPool(2);

    // 所有的定时任务
    @SuppressWarnings("rawtypes")
    private static Map<String, Future> currentCronMap = new HashMap<String, Future>();
    /**
   * 活动开始定时任务ID前缀
   */
    private static final String ACTIVITY_START_CRON_PREFX = "ACTIVITY_START:";
    /**
   * 活动结束定时任务ID前缀
   */
    private static final String ACTIVITY_END_CRON_PREFX = "ACTIVITY_END:";

    @Autowired
    private IntegralActivityDao integralActivityDao;
    @Autowired
    private LockUtils lockUtils;

    @Override
    public boolean createAllActivityCron() {
      boolean isAllSuccess = true;// 是否都创建成功
      List<String> activityStatusList = new ArrayList<String>();
      activityStatusList.add(ActivityStatusEnum.NOT_START.getCode());
      activityStatusList.add(ActivityStatusEnum.IN_PROGRESS.getCode());
      IntegralActivityQueryDto queryDto = new IntegralActivityQueryDto();
      queryDto.setActivityStatusList(activityStatusList);
      queryDto.setPublishStatusList(Lists.newArrayList(PublishStatusEnum.PUBLISH_YES.getCode()));
      List<IntegralActivity> activityList = integralActivityDao.getActivityList(queryDto);
      if (CollectionUtils.isNotEmpty(activityList)) {
            for (IntegralActivity integralActivity : activityList) {
                long id = integralActivity.getId();
                try {
                  if (ActivityStatusEnum.NOT_START.getCode().equals(integralActivity.getActivityStatus())) {
                        this.addStartCron(id, integralActivity.getStartTime());
                  } else {
                        this.addEndCron(id, integralActivity.getEndTime());
                  }
                } catch (Exception e) {
                  isAllSuccess = false;
                  log.error("integral activity cron create error", e);
                }
            }
      }
      log.info("create integral activity cron end, activity count =" + activityList.size() + " " + (isAllSuccess ? "all success" : "partial success"));
      return isAllSuccess;
    }

    /**
   * 添加启动定时任务
   *
   * @param id
   * @param startTime
   */
    public void addStartCron(long id, Date startTime) {
      long millSecond = startTime.getTime() - System.currentTimeMillis();
      ScheduledFuture<?> scheduledFuture = scheduleExecutor.schedule(new Runnable() {
            @Override
            public void run() {
                log.debug("integral activity start cron begin execute");
                String key = RedisConstant.INTEGRAL_ACTIVITY_CRON + ":" + ACTIVITY_START_CRON_PREFX + id;
                String value = UUID.randomUUID().toString().replace("-", "");
                if (!getExecuteLock(key, value)) {
                  return;
                }
                try {
                  IntegralActivityQueryDto queryDto = new IntegralActivityQueryDto();
                  queryDto.setId(id);
                  List<IntegralActivity> activityList = integralActivityDao.getActivityList(queryDto);
                  if (CollectionUtils.isEmpty(activityList) ||
                            !ActivityStatusEnum.NOT_START.getCode().equals(activityList.get(0).getActivityStatus()) ||
                            startTime.getTime() != activityList.get(0).getStartTime().getTime()) {
                        log.warn("when execute start cron, integral activity not exists or status not match");
                        lockUtils.unLock(key, value);
                        return;
                  }
                  // 更改活动状态
                  integralActivityDao.updateActivityStatus(Lists.newArrayList(id), ActivityStatusEnum.IN_PROGRESS.getCode(), null, null, activityList.get(0).getEndTime());
                  // 清除活动结束定时任务
                  clearCronById(ACTIVITY_END_CRON_PREFX + id);
                  // 添加活动结束定时任务
                  addEndCron(id, activityList.get(0).getEndTime());
                  lockUtils.unLock(key, value);
                } catch (Exception e) {
                  log.error("integral activity start error, retry", e);
                  lockUtils.unLock(key, value);
                  addStartCron(id, startTime);
                }
            }
      }, millSecond, TimeUnit.MILLISECONDS);
      // 加入任务
      currentCronMap.put(ACTIVITY_START_CRON_PREFX + id, scheduledFuture);
    }

    /**
   * 添加结束定时任务
   *
   * @param id
   * @param endTime
   */
    private void addEndCron(long id, Date endTime) {
      long millSecond = endTime.getTime() - System.currentTimeMillis();
      log.debug("integral activity end cron begin execute");
      ScheduledFuture<?> scheduledFuture = scheduleExecutor.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                  IntegralActivityQueryDto queryDto = new IntegralActivityQueryDto();
                  queryDto.setId(id);
                  List<IntegralActivity> activityList = integralActivityDao.getActivityList(queryDto);
                  if (CollectionUtils.isEmpty(activityList) ||
                            !ActivityStatusEnum.IN_PROGRESS.getCode().equals(activityList.get(0).getActivityStatus()) ||
                            endTime.getTime() != activityList.get(0).getEndTime().getTime()) {
                        return;
                  }
                  // 更改活动状态
                  integralActivityDao.updateActivityStatus(Lists.newArrayList(id), ActivityStatusEnum.FINISHED.getCode(), null, null, new Date());
                  // 更改上架状态
                  integralActivityDao.updatePublishStatus(Lists.newArrayList(id), PublishStatusEnum.PUBLISH_NO.getCode(), null);
                } catch (Exception e) {
                  log.error("integral activity end error, retry", e);
                  addEndCron(id, endTime);
                }
            }
      }, millSecond, TimeUnit.MILLISECONDS);
      // 加入任务
      currentCronMap.put(ACTIVITY_END_CRON_PREFX + id, scheduledFuture);
    }

    /**
   * 清除活动定时任务
   *
   * @param id
   */
    public void clearActivityCron(long id) {
      this.clearCronById(ACTIVITY_START_CRON_PREFX + id);
      this.clearCronById(ACTIVITY_END_CRON_PREFX + id);
    }

    /**
   * 通过id清除任务
   *
   * @param cronId
   */
    @SuppressWarnings("rawtypes")
    private void clearCronById(String cronId) {
      if (currentCronMap.get(cronId) == null) {
            log.debug("cron not exists");
      }
      Future future = currentCronMap.get(cronId);
      if (null != future && !future.isDone()) {
            future.cancel(true); // 正在运行是否干扰
      }
      currentCronMap.remove(cronId);
    }
   
    /**
   * 获取活动执行锁
   *
   * @param key
   * @param value
   * @return
   */
    private boolean getExecuteLock(String key, String value) {
      for (int i = 1; i <= 5; i++) {
            try {
                if (lockUtils.getLock(key, value, 10 * 60 * 1000)) {
                  return true;
                }
            } catch (Exception e) {
                log.warn("redis lock get error, continue");
            }
      }
      return false;
    }
} 2. Redis工具类

@Component()
public class LockUtils {

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    public LockUtils() {
    }

    public boolean getLock(String key) {
      return this.redisTemplate.opsForValue().setIfAbsent("lock_key_" + key, "lock_default_value", Duration.ofSeconds(60L));
    }

    public boolean getLock(String key, String value) {
      return this.redisTemplate.opsForValue().setIfAbsent("lock_key_" + key, value, Duration.ofSeconds(60L));
    }

    public boolean unLock(String key) {
      return this.redisTemplate.delete("lock_key_" + key);
    }

    public boolean unLock(String key, String value) {
      if (value == null) {
            return this.unLock(key);
      } else {
            return value.equals(this.redisTemplate.opsForValue().get("lock_key_" + key)) ? this.redisTemplate.delete("lock_key_" + key) : false;
      }
    }

    public boolean getLock(String key, long expireTime) {
      return this.redisTemplate.opsForValue().setIfAbsent("lock_key_" + key, "lock_default_value", expireTime, TimeUnit.MILLISECONDS);
    }

    public boolean getLock(String key, String value, long expireTime) {
      return this.redisTemplate.opsForValue().setIfAbsent("lock_key_" + key, value, expireTime, TimeUnit.MILLISECONDS);
    }
} 3. 活动状态枚举类

public enum ActivityStatusEnum {

    NOT_START("1", "未开始"),
    IN_PROGRESS("2", "进行中"),
    FINISHED("3", "已结束"),
    EARLY_STOP("4", "提前结束");
   
    private String code;
    private String value;

    private ActivityStatusEnum(String code, String value) {
      this.code = code;
      this.value = value;
    }

    public String getCode() {
      return code;
    }

    public void setCode(String code) {
      this.code = code;
    }

    public String getValue() {
      return value;
    }

    public void setValue(String value) {
      this.value = value;
    }

    /**
   * 根据code获取value
   * @param code
   * @return
   */
    public static String getValueByCode(String code) {
      if (StringUtils.isEmpty(code)) {
            return "";
      }
      for (ActivityStatusEnum item : ActivityStatusEnum.values()) {
            if (item.getCode().equals(code)) {
                return item.getValue();
            }
      }
      return "";
    }
   
    /**
   * 根据code获取value
   * @param code
   * @return
   */
    public static String getCodeByValue(String value) {
      if (StringUtils.isEmpty(value)) {
            return "";
      }
      for (ActivityStatusEnum item : ActivityStatusEnum.values()) {
            if (item.getValue().equals(value)) {
                return item.getCode();
            }
      }
      return "";
    }
} 其它业务代码没有参考价值不再贴出。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Java使用定时任务开始-竣事活动